statsig_rust/event_logging/event_queue/
queue.rs

1use super::{batch::EventBatch, queued_event::QueuedEvent};
2use crate::{
3    event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
4    write_lock_or_return,
5};
6use parking_lot::RwLock;
7use std::collections::VecDeque;
8
// Tag handed as the first argument to `log_d!`, `read_lock_or_return!` and
// `write_lock_or_return!` in this file; expands to the string "EventQueue".
const TAG: &str = stringify!(EventQueue);
10
/// Outcome of [`EventQueue::add`].
pub enum QueueAddResult {
    /// Nothing further is requested from the caller. Also the fallback value
    /// handed to `write_lock_or_return!` in `add`.
    Noop,
    /// After the insert, the number of pending events is an exact multiple of
    /// the batch size.
    NeedsFlush,
    /// The pending queue exceeded its cap; carries how many of the oldest
    /// pending events were evicted to make room.
    NeedsFlushAndDropped(u64),
}
16
/// Outcome of [`EventQueue::requeue_batch`] and
/// [`EventQueue::reconcile_batching`].
pub enum QueueReconcileResult {
    /// The operation completed without discarding any events.
    Success,
    /// Events were discarded; carries the number of events lost.
    DroppedEvents(u64),
    /// Fallback value handed to `write_lock_or_return!` for the `batches`
    /// lock in `reconcile_batching`.
    LockFailure,
}
22
/// Two-stage event buffer: individual events accumulate in `pending_events`
/// and are later grouped into `batches` by `reconcile_batching`.
pub struct EventQueue {
    /// Number of events placed in each batch built by `create_batches`
    /// (the last batch may be smaller).
    pub batch_size: usize,
    /// Maximum number of batches kept in `batches` after clamping.
    pub max_pending_batches: usize,

    /// Events that have been added but not yet grouped into batches.
    pending_events: RwLock<VecDeque<QueuedEvent>>,
    /// Batches waiting to be taken via `take_next_batch` / `take_all_batches`.
    batches: RwLock<VecDeque<EventBatch>>,
    /// Cap on `pending_events`; set in `new` to the product of the batch size
    /// and the maximum number of batches.
    max_pending_events: usize,
}
31
32impl EventQueue {
33    pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
34        let batch_size = batch_size as usize;
35        let max_queue_size = max_queue_size as usize;
36
37        Self {
38            pending_events: RwLock::new(VecDeque::new()),
39            batches: RwLock::new(VecDeque::new()),
40            batch_size,
41            max_pending_batches: max_queue_size,
42            max_pending_events: batch_size * max_queue_size,
43        }
44    }
45
46    pub fn approximate_pending_events_count(&self) -> usize {
47        let pending_len = read_lock_or_return!(TAG, self.pending_events, 0).len();
48        let batches_len = read_lock_or_return!(TAG, self.batches, 0).len();
49        pending_len + (batches_len * self.batch_size)
50    }
51
52    pub fn add(&self, pending_event: QueuedEvent) -> QueueAddResult {
53        let mut pending_events =
54            write_lock_or_return!(TAG, self.pending_events, QueueAddResult::Noop);
55        pending_events.push_back(pending_event);
56
57        let mut dropped_events = 0;
58        while pending_events.len() > self.max_pending_events {
59            pending_events.pop_front();
60            dropped_events += 1;
61        }
62
63        if dropped_events > 0 {
64            return QueueAddResult::NeedsFlushAndDropped(dropped_events);
65        }
66
67        if pending_events.len() % self.batch_size == 0 {
68            return QueueAddResult::NeedsFlush;
69        }
70
71        QueueAddResult::Noop
72    }
73
74    pub fn requeue_batch(&self, batch: EventBatch) -> QueueReconcileResult {
75        let len = batch.events.len() as u64;
76        let mut batches =
77            write_lock_or_return!(TAG, self.batches, QueueReconcileResult::DroppedEvents(len));
78
79        if batches.len() > self.max_pending_batches {
80            return QueueReconcileResult::DroppedEvents(len);
81        }
82
83        log_d!(
84            TAG,
85            "Requeueing batch with {} events and {} attempts to flush",
86            batch.events.len(),
87            batch.attempts
88        );
89
90        batches.push_back(batch);
91        QueueReconcileResult::Success
92    }
93
94    pub fn contains_at_least_one_full_batch(&self) -> bool {
95        let pending_events_count = self
96            .pending_events
97            .try_read_for(std::time::Duration::from_secs(5))
98            .map(|e| e.len())
99            .unwrap_or(0);
100        if pending_events_count >= self.batch_size {
101            return true;
102        }
103
104        let batches = read_lock_or_return!(TAG, self.batches, false);
105        for batch in batches.iter() {
106            if batch.events.len() >= self.batch_size {
107                return true;
108            }
109        }
110
111        false
112    }
113
114    pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
115        let mut batches = write_lock_or_return!(TAG, self.batches, VecDeque::new());
116        std::mem::take(&mut *batches)
117    }
118
119    pub fn take_next_batch(&self) -> Option<EventBatch> {
120        let mut batches = write_lock_or_return!(TAG, self.batches, None);
121        batches.pop_front()
122    }
123
124    pub fn reconcile_batching(&self) -> QueueReconcileResult {
125        let mut pending_events: VecDeque<StatsigEventInternal> = self
126            .take_all_pending_events()
127            .into_iter()
128            .map(|evt| evt.into_statsig_event_internal())
129            .collect();
130
131        if pending_events.is_empty() {
132            return QueueReconcileResult::Success;
133        }
134
135        let mut batches =
136            write_lock_or_return!(TAG, self.batches, QueueReconcileResult::LockFailure);
137        let old_batches = std::mem::take(&mut *batches);
138
139        let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
140            .into_iter()
141            .partition(|batch| batch.events.len() >= self.batch_size);
142
143        for batch in partial_batches {
144            pending_events.extend(batch.events);
145        }
146
147        let new_batches = self.create_batches(pending_events);
148
149        batches.extend(full_batches);
150        batches.extend(new_batches);
151
152        let dropped_events_count = self.clamp_batches(&mut batches);
153        if dropped_events_count > 0 {
154            return QueueReconcileResult::DroppedEvents(dropped_events_count);
155        }
156
157        QueueReconcileResult::Success
158    }
159
160    fn take_all_pending_events(&self) -> VecDeque<QueuedEvent> {
161        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, VecDeque::new());
162        std::mem::take(&mut *pending_events)
163    }
164
165    fn create_batches(
166        &self,
167        mut pending_events: VecDeque<StatsigEventInternal>,
168    ) -> Vec<EventBatch> {
169        let mut batches = Vec::new();
170        while !pending_events.is_empty() {
171            let drain_count = self.batch_size.min(pending_events.len());
172            let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
173            batches.push(EventBatch::new(chunk));
174        }
175
176        batches
177    }
178
179    fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
180        if batches.len() <= self.max_pending_batches {
181            return 0;
182        }
183
184        let mut dropped_events_count = 0;
185        while batches.len() > self.max_pending_batches {
186            if let Some(batch) = batches.pop_front() {
187                dropped_events_count += batch.events.len() as u64;
188            }
189        }
190
191        dropped_events_count
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use std::borrow::Cow;
198
199    use super::*;
200    use crate::event_logging::event_queue::queued_event::EnqueueOperation;
201    use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
202    use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
203    use crate::{
204        event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
205        user::StatsigUserInternal, EvaluationDetails, StatsigUser,
206    };
207
208    #[test]
209    fn test_adding_single_to_queue() {
210        let (queue, user, gate) = setup(10, 10);
211        let user_internal = StatsigUserInternal::new(&user, None);
212
213        let enqueue_op = EnqueueGateExpoOp {
214            exposure_time: 1,
215            user: &user_internal,
216            queried_gate_name: &gate.name,
217            evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
218            details: EvaluationDetails::unrecognized_no_data(),
219            trigger: ExposureTrigger::Auto,
220        };
221
222        let queued_event = enqueue_op.into_queued_event(ForceSampled);
223
224        let result = queue.add(queued_event);
225
226        assert!(matches!(result, QueueAddResult::Noop));
227        assert_eq!(
228            queue
229                .pending_events
230                .try_read_for(std::time::Duration::from_secs(5))
231                .unwrap()
232                .len(),
233            1
234        );
235    }
236
237    #[test]
238    fn test_adding_multiple_to_queue() {
239        let (queue, user, gate) = setup(1000, 20);
240        let user_internal = StatsigUserInternal::new(&user, None);
241
242        let mut triggered_count = 0;
243        for _ in 0..4567 {
244            let enqueue_op = EnqueueGateExpoOp {
245                exposure_time: 1,
246                user: &user_internal,
247                queried_gate_name: &gate.name,
248                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
249                details: EvaluationDetails::unrecognized_no_data(),
250                trigger: ExposureTrigger::Auto,
251            };
252
253            let result = queue.add(enqueue_op.into_queued_event(ForceSampled));
254
255            if let QueueAddResult::NeedsFlush = result {
256                triggered_count += 1;
257            }
258        }
259
260        assert_eq!(
261            queue
262                .pending_events
263                .try_read_for(std::time::Duration::from_secs(5))
264                .unwrap()
265                .len(),
266            4567
267        );
268        assert_eq!(triggered_count, (4567 / 1000) as usize);
269    }
270
271    #[test]
272    fn test_take_all_batches() {
273        let batch_size = 200;
274        let max_pending_batches = 40;
275
276        let (queue, user, gate) = setup(batch_size, max_pending_batches);
277        let user_internal = StatsigUserInternal::new(&user, None);
278
279        for _ in 0..4567 {
280            let enqueue_op = EnqueueGateExpoOp {
281                exposure_time: 1,
282                user: &user_internal,
283                queried_gate_name: &gate.name,
284                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
285                details: EvaluationDetails::unrecognized_no_data(),
286                trigger: ExposureTrigger::Auto,
287            };
288            queue.add(enqueue_op.into_queued_event(ForceSampled));
289        }
290
291        queue.reconcile_batching();
292        let batches = queue.take_all_batches();
293        assert_eq!(batches.len(), (4567.0 / batch_size as f64).ceil() as usize,);
294    }
295
296    #[test]
297    fn test_take_next_batch() {
298        let batch_size = 200;
299        let max_pending_batches = 20;
300
301        let (queue, user, gate) = setup(batch_size, max_pending_batches);
302        let user_internal = StatsigUserInternal::new(&user, None);
303
304        for _ in 0..4567 {
305            let enqueue_op = EnqueueGateExpoOp {
306                exposure_time: 1,
307                user: &user_internal,
308                queried_gate_name: &gate.name,
309                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
310                details: EvaluationDetails::unrecognized_no_data(),
311                trigger: ExposureTrigger::Auto,
312            };
313            queue.add(enqueue_op.into_queued_event(ForceSampled));
314        }
315
316        queue.reconcile_batching();
317        let batch = queue.take_next_batch();
318        assert_eq!(batch.unwrap().events.len(), batch_size as usize);
319
320        assert_eq!(
321            queue
322                .batches
323                .try_read_for(std::time::Duration::from_secs(5))
324                .unwrap()
325                .len(),
326            (max_pending_batches - 1) as usize
327        ); // max minus the one we just took
328    }
329
330    fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
331        let queue = EventQueue::new(batch_size, max_queue_size);
332        let user = StatsigUser::with_user_id("user-id");
333        let gate = FeatureGate {
334            name: "gate-name".into(),
335            value: true,
336            rule_id: "rule-id".into(),
337            id_type: "user-id".into(),
338            details: EvaluationDetails::unrecognized_no_data(),
339            __evaluation: None,
340        };
341        (queue, user, gate)
342    }
343}