// statsig_rust/event_logging/event_queue/queue.rs

1use crate::{
2    event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
3    write_lock_or_return,
4};
5use std::{collections::VecDeque, sync::RwLock};
6
7use super::{batch::EventBatch, queued_event::QueuedEvent};
8
// Label handed to the logging and lock macros used throughout this file.
const TAG: &str = "EventQueue";
10
/// Outcome of an operation that places events or batches on the queue.
///
/// The derives let callers print a result and compare it directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueResult {
    /// The operation completed without discarding any events.
    Success,
    /// Events were discarded; the payload is the number of events lost.
    DroppedEvents(u64),
}
15
16pub struct EventQueue {
17    pub batch_size: usize,
18    pub max_pending_batches: usize,
19
20    pending_events: RwLock<Vec<QueuedEvent>>,
21    batches: RwLock<VecDeque<EventBatch>>,
22}
23
24impl EventQueue {
25    pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
26        Self {
27            pending_events: RwLock::new(Vec::new()),
28            batches: RwLock::new(VecDeque::new()),
29            batch_size: batch_size as usize,
30            max_pending_batches: max_queue_size as usize,
31        }
32    }
33
34    pub fn approximate_pending_events_count(&self) -> usize {
35        let pending_len = read_lock_or_return!(TAG, self.pending_events, 0).len();
36        let batches_len = read_lock_or_return!(TAG, self.batches, 0).len();
37        pending_len + (batches_len * self.batch_size)
38    }
39
40    pub fn add(&self, pending_event: QueuedEvent) -> bool {
41        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, false);
42        pending_events.push(pending_event);
43        pending_events.len() % self.batch_size == 0
44    }
45
46    pub fn requeue_batch(&self, batch: EventBatch) -> QueueResult {
47        let len = batch.events.len() as u64;
48        let mut batches = write_lock_or_return!(TAG, self.batches, QueueResult::DroppedEvents(len));
49
50        if batches.len() > self.max_pending_batches {
51            return QueueResult::DroppedEvents(len);
52        }
53
54        log_d!(
55            TAG,
56            "Requeueing batch with {} events and {} attempts to flush",
57            batch.events.len(),
58            batch.attempts
59        );
60
61        batches.push_back(batch);
62        QueueResult::Success
63    }
64
65    pub fn contains_at_least_one_full_batch(&self) -> bool {
66        let pending_events_count = self.pending_events.read().map(|e| e.len()).unwrap_or(0);
67        if pending_events_count >= self.batch_size {
68            return true;
69        }
70
71        let batches = read_lock_or_return!(TAG, self.batches, false);
72        for batch in batches.iter() {
73            if batch.events.len() >= self.batch_size {
74                return true;
75            }
76        }
77
78        false
79    }
80
81    pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
82        let mut batches = self.batches.write().unwrap();
83        std::mem::take(&mut *batches)
84    }
85
86    pub fn take_next_batch(&self) -> Option<EventBatch> {
87        let mut batches = self.batches.write().unwrap();
88        batches.pop_front()
89    }
90
91    pub fn reconcile_batching(&self) -> QueueResult {
92        let mut pending_events: Vec<StatsigEventInternal> = self
93            .take_all_pending_events()
94            .into_iter()
95            .map(|evt| evt.into_statsig_event_internal())
96            .collect();
97
98        if pending_events.is_empty() {
99            return QueueResult::Success;
100        }
101
102        let mut batches = self.batches.write().unwrap();
103        let old_batches = std::mem::take(&mut *batches);
104
105        let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
106            .into_iter()
107            .partition(|batch| batch.events.len() >= self.batch_size);
108
109        for batch in partial_batches {
110            pending_events.extend(batch.events);
111        }
112
113        let new_batches = self.create_batches(pending_events);
114
115        batches.extend(full_batches);
116        batches.extend(new_batches);
117
118        let dropped_events_count = self.clamp_batches(&mut batches);
119        if dropped_events_count > 0 {
120            return QueueResult::DroppedEvents(dropped_events_count);
121        }
122
123        QueueResult::Success
124    }
125
126    fn take_all_pending_events(&self) -> Vec<QueuedEvent> {
127        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, Vec::new());
128        std::mem::take(&mut *pending_events)
129    }
130
131    fn create_batches(&self, mut pending_events: Vec<StatsigEventInternal>) -> Vec<EventBatch> {
132        let mut batches = Vec::new();
133        while !pending_events.is_empty() {
134            let drain_count = self.batch_size.min(pending_events.len());
135            let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
136            batches.push(EventBatch::new(chunk));
137        }
138
139        batches
140    }
141
142    fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
143        if batches.len() <= self.max_pending_batches {
144            return 0;
145        }
146
147        let mut dropped_events_count = 0;
148        while batches.len() > self.max_pending_batches {
149            if let Some(batch) = batches.pop_front() {
150                dropped_events_count += batch.events.len() as u64;
151            }
152        }
153
154        dropped_events_count
155    }
156}
157
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::event_logging::event_queue::queued_event::EnqueueOperation;
    use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
    use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
    use crate::{
        event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
        user::StatsigUserInternal, EvaluationDetails, StatsigUser,
    };

    // Number of events pushed by the bulk tests; deliberately not a multiple of
    // any batch size used below so a partial trailing batch is produced.
    const EVENT_COUNT: usize = 4567;

    // Expands to a force-sampled gate-exposure event built from the given
    // `StatsigUserInternal` and `FeatureGate` locals, ready to pass to `add`.
    macro_rules! gate_exposure_event {
        ($user_internal:ident, $gate:ident) => {
            EnqueueGateExpoOp {
                user: &$user_internal,
                queried_gate_name: &$gate.name,
                evaluation: $gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            }
            .into_queued_event(ForceSampled)
        };
    }

    // Builds an empty queue together with the user and gate fixtures every test uses.
    fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
        let gate = FeatureGate {
            name: "gate-name".into(),
            value: true,
            rule_id: "rule-id".into(),
            id_type: "user-id".into(),
            details: EvaluationDetails::unrecognized_no_data(),
            __evaluation: None,
        };

        (
            EventQueue::new(batch_size, max_queue_size),
            StatsigUser::with_user_id("user-id"),
            gate,
        )
    }

    #[test]
    fn test_adding_single_to_queue() {
        let (queue, user, gate) = setup(10, 10);
        let user_internal = StatsigUserInternal::new(&user, None);

        let reached_batch_boundary = queue.add(gate_exposure_event!(user_internal, gate));

        assert!(!reached_batch_boundary);
        assert_eq!(queue.pending_events.read().unwrap().len(), 1);
    }

    #[test]
    fn test_adding_multiple_to_queue() {
        let (queue, user, gate) = setup(100, 20);
        let user_internal = StatsigUserInternal::new(&user, None);

        let mut triggered_count: usize = 0;
        for _ in 0..EVENT_COUNT {
            triggered_count += usize::from(queue.add(gate_exposure_event!(user_internal, gate)));
        }

        assert_eq!(queue.pending_events.read().unwrap().len(), EVENT_COUNT);
        assert_eq!(triggered_count, EVENT_COUNT / 100);
    }

    #[test]
    fn test_take_all_batches() {
        let batch_size = 200;
        let max_pending_batches = 40;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..EVENT_COUNT {
            queue.add(gate_exposure_event!(user_internal, gate));
        }

        queue.reconcile_batching();

        let expected_batches = (EVENT_COUNT as f64 / f64::from(batch_size)).ceil() as usize;
        assert_eq!(queue.take_all_batches().len(), expected_batches);
    }

    #[test]
    fn test_take_next_batch() {
        let batch_size = 200;
        let max_pending_batches = 20;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..EVENT_COUNT {
            queue.add(gate_exposure_event!(user_internal, gate));
        }

        queue.reconcile_batching();

        let first_batch = queue.take_next_batch();
        assert_eq!(first_batch.unwrap().events.len(), batch_size as usize);

        // The queue was clamped to the maximum, and one batch has just been taken.
        let remaining_batches = queue.batches.read().unwrap().len();
        assert_eq!(remaining_batches, (max_pending_batches - 1) as usize);
    }
}