statsig_rust/event_logging/event_queue/
queue.rs

1use crate::{
2    event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
3    write_lock_or_return,
4};
5use std::{collections::VecDeque, sync::RwLock};
6
7use super::{batch::EventBatch, queued_event::QueuedEvent};
8
/// Tag handed to the logging and lock macros so their output can be attributed to this queue.
const TAG: &str = "EventQueue";
10
/// Outcome of [`EventQueue::add`], telling the caller whether a flush should be scheduled.
#[derive(Debug, PartialEq, Eq)]
pub enum QueueAddResult {
    /// Nothing for the caller to do. Also returned when the pending-events lock is unavailable.
    Noop,
    /// The number of pending events reached a multiple of the batch size; a flush is due.
    NeedsFlush,
    /// The pending-events buffer hit its capacity; a flush is due. The payload is the number of
    /// oldest pending events that were discarded to stay within the cap.
    NeedsFlushAndDropped(u64),
}
16
/// Outcome of [`EventQueue::reconcile_batching`] and [`EventQueue::requeue_batch`].
#[derive(Debug, PartialEq, Eq)]
pub enum QueueReconcileResult {
    /// The operation completed and every event was retained.
    Success,
    /// The operation completed but this many events were discarded.
    DroppedEvents(u64),
    /// The lock guarding the batch list could not be acquired.
    LockFailure,
}
22
/// Two-stage buffer for outgoing events: a flat list of pending events fed by `add`, and a
/// queue of batches built from them by `reconcile_batching`.
pub struct EventQueue {
    /// Number of events placed in each batch by `create_batches`; `add` signals a flush each
    /// time the pending count is a multiple of it.
    pub batch_size: usize,
    /// Upper bound on the number of queued batches, enforced by `clamp_batches`.
    pub max_pending_batches: usize,

    /// Events accepted by `add` that have not been grouped into batches yet.
    pending_events: RwLock<Vec<QueuedEvent>>,
    /// Batches waiting to be taken by `take_next_batch` / `take_all_batches`.
    batches: RwLock<VecDeque<EventBatch>>,
    /// Cap on `pending_events`, computed in `new` as `batch_size * max_queue_size`.
    max_pending_events: usize,
}
31
32impl EventQueue {
33    pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
34        let batch_size = batch_size as usize;
35        let max_queue_size = max_queue_size as usize;
36
37        Self {
38            pending_events: RwLock::new(Vec::new()),
39            batches: RwLock::new(VecDeque::new()),
40            batch_size,
41            max_pending_batches: max_queue_size,
42            max_pending_events: batch_size * max_queue_size,
43        }
44    }
45
46    pub fn approximate_pending_events_count(&self) -> usize {
47        let pending_len = read_lock_or_return!(TAG, self.pending_events, 0).len();
48        let batches_len = read_lock_or_return!(TAG, self.batches, 0).len();
49        pending_len + (batches_len * self.batch_size)
50    }
51
52    pub fn add(&self, pending_event: QueuedEvent) -> QueueAddResult {
53        let mut pending_events =
54            write_lock_or_return!(TAG, self.pending_events, QueueAddResult::Noop);
55        pending_events.push(pending_event);
56
57        let len = pending_events.len();
58        if len >= self.max_pending_events {
59            let delta = len - self.max_pending_events;
60            pending_events.drain(0..delta);
61            return QueueAddResult::NeedsFlushAndDropped(delta as u64);
62        }
63
64        if len % self.batch_size == 0 {
65            return QueueAddResult::NeedsFlush;
66        }
67
68        QueueAddResult::Noop
69    }
70
71    pub fn requeue_batch(&self, batch: EventBatch) -> QueueReconcileResult {
72        let len = batch.events.len() as u64;
73        let mut batches =
74            write_lock_or_return!(TAG, self.batches, QueueReconcileResult::DroppedEvents(len));
75
76        if batches.len() > self.max_pending_batches {
77            return QueueReconcileResult::DroppedEvents(len);
78        }
79
80        log_d!(
81            TAG,
82            "Requeueing batch with {} events and {} attempts to flush",
83            batch.events.len(),
84            batch.attempts
85        );
86
87        batches.push_back(batch);
88        QueueReconcileResult::Success
89    }
90
91    pub fn contains_at_least_one_full_batch(&self) -> bool {
92        let pending_events_count = self.pending_events.read().map(|e| e.len()).unwrap_or(0);
93        if pending_events_count >= self.batch_size {
94            return true;
95        }
96
97        let batches = read_lock_or_return!(TAG, self.batches, false);
98        for batch in batches.iter() {
99            if batch.events.len() >= self.batch_size {
100                return true;
101            }
102        }
103
104        false
105    }
106
107    pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
108        let mut batches = write_lock_or_return!(TAG, self.batches, VecDeque::new());
109        std::mem::take(&mut *batches)
110    }
111
112    pub fn take_next_batch(&self) -> Option<EventBatch> {
113        let mut batches = write_lock_or_return!(TAG, self.batches, None);
114        batches.pop_front()
115    }
116
117    pub fn reconcile_batching(&self) -> QueueReconcileResult {
118        let mut pending_events: Vec<StatsigEventInternal> = self
119            .take_all_pending_events()
120            .into_iter()
121            .map(|evt| evt.into_statsig_event_internal())
122            .collect();
123
124        if pending_events.is_empty() {
125            return QueueReconcileResult::Success;
126        }
127
128        let mut batches =
129            write_lock_or_return!(TAG, self.batches, QueueReconcileResult::LockFailure);
130        let old_batches = std::mem::take(&mut *batches);
131
132        let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
133            .into_iter()
134            .partition(|batch| batch.events.len() >= self.batch_size);
135
136        for batch in partial_batches {
137            pending_events.extend(batch.events);
138        }
139
140        let new_batches = self.create_batches(pending_events);
141
142        batches.extend(full_batches);
143        batches.extend(new_batches);
144
145        let dropped_events_count = self.clamp_batches(&mut batches);
146        if dropped_events_count > 0 {
147            return QueueReconcileResult::DroppedEvents(dropped_events_count);
148        }
149
150        QueueReconcileResult::Success
151    }
152
153    fn take_all_pending_events(&self) -> Vec<QueuedEvent> {
154        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, Vec::new());
155        std::mem::take(&mut *pending_events)
156    }
157
158    fn create_batches(&self, mut pending_events: Vec<StatsigEventInternal>) -> Vec<EventBatch> {
159        let mut batches = Vec::new();
160        while !pending_events.is_empty() {
161            let drain_count = self.batch_size.min(pending_events.len());
162            let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
163            batches.push(EventBatch::new(chunk));
164        }
165
166        batches
167    }
168
169    fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
170        if batches.len() <= self.max_pending_batches {
171            return 0;
172        }
173
174        let mut dropped_events_count = 0;
175        while batches.len() > self.max_pending_batches {
176            if let Some(batch) = batches.pop_front() {
177                dropped_events_count += batch.events.len() as u64;
178            }
179        }
180
181        dropped_events_count
182    }
183}
184
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::event_logging::event_queue::queued_event::EnqueueOperation;
    use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
    use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
    use crate::{
        event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
        user::StatsigUserInternal, EvaluationDetails, StatsigUser,
    };

    /// Number of exposures pushed by the bulk tests; not a multiple of any batch size used.
    const BULK_EVENT_COUNT: usize = 4567;

    /// Builds a queue with the given limits plus a user and gate to generate exposures from.
    fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
        let gate = FeatureGate {
            name: "gate-name".into(),
            value: true,
            rule_id: "rule-id".into(),
            id_type: "user-id".into(),
            details: EvaluationDetails::unrecognized_no_data(),
            __evaluation: None,
        };

        (
            EventQueue::new(batch_size, max_queue_size),
            StatsigUser::with_user_id("user-id"),
            gate,
        )
    }

    /// Adds `count` force-sampled gate exposures to `queue` and returns the result of each
    /// `add` call, in order.
    fn enqueue_gate_exposures(
        queue: &EventQueue,
        user: &StatsigUser,
        gate: &FeatureGate,
        count: usize,
    ) -> Vec<QueueAddResult> {
        let user_internal = StatsigUserInternal::new(user, None);

        (0..count)
            .map(|_| {
                let enqueue_op = EnqueueGateExpoOp {
                    user: &user_internal,
                    queried_gate_name: &gate.name,
                    evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                    details: EvaluationDetails::unrecognized_no_data(),
                    trigger: ExposureTrigger::Auto,
                };
                queue.add(enqueue_op.into_queued_event(ForceSampled))
            })
            .collect()
    }

    #[test]
    fn test_adding_single_to_queue() {
        let (queue, user, gate) = setup(10, 10);

        let results = enqueue_gate_exposures(&queue, &user, &gate, 1);

        assert!(matches!(results.as_slice(), [QueueAddResult::Noop]));
        assert_eq!(queue.pending_events.read().unwrap().len(), 1);
    }

    #[test]
    fn test_adding_multiple_to_queue() {
        let (queue, user, gate) = setup(1000, 20);

        let flush_signals = enqueue_gate_exposures(&queue, &user, &gate, BULK_EVENT_COUNT)
            .iter()
            .filter(|result| matches!(result, QueueAddResult::NeedsFlush))
            .count();

        assert_eq!(queue.pending_events.read().unwrap().len(), BULK_EVENT_COUNT);
        // One flush signal per completed group of 1000 events.
        assert_eq!(flush_signals, BULK_EVENT_COUNT / 1000);
    }

    #[test]
    fn test_take_all_batches() {
        let batch_size = 200;
        let max_pending_batches = 40;
        let (queue, user, gate) = setup(batch_size, max_pending_batches);

        enqueue_gate_exposures(&queue, &user, &gate, BULK_EVENT_COUNT);
        queue.reconcile_batching();

        let expected_batches = (BULK_EVENT_COUNT as f64 / batch_size as f64).ceil() as usize;
        assert_eq!(queue.take_all_batches().len(), expected_batches);
    }

    #[test]
    fn test_take_next_batch() {
        let batch_size = 200;
        let max_pending_batches = 20;
        let (queue, user, gate) = setup(batch_size, max_pending_batches);

        enqueue_gate_exposures(&queue, &user, &gate, BULK_EVENT_COUNT);
        queue.reconcile_batching();

        let first_batch = queue.take_next_batch();
        assert_eq!(first_batch.unwrap().events.len(), batch_size as usize);

        // The queue was clamped to its maximum; one batch has just been taken from it.
        let remaining_batches = queue.batches.read().unwrap().len();
        assert_eq!(remaining_batches, (max_pending_batches - 1) as usize);
    }
}