statsig_rust/event_logging/event_queue/
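//! In-memory event queue: collects pending events and groups them into
//! fixed-size batches for flushing.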

use crate::{
    event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
    write_lock_or_return,
};
use std::{collections::VecDeque, sync::RwLock};

use super::{batch::EventBatch, queued_event::QueuedEvent};

const TAG: &str = stringify!(EventQueue);

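/// Outcome of a queue operation: either everything was accepted, or the
/// given number of events had to be dropped.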
pub enum QueueResult {
    Success,
    DroppedEvents(u64),
}

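/// Thread-safe store of pending events and the batches built from them.
/// Events accumulate until they are grouped into `EventBatch`es of up to
/// `batch_size` events; at most `max_pending_batches` batches are kept.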
pub struct EventQueue {
    pub batch_size: usize,
    pub max_pending_batches: usize,

    pending_events: RwLock<Vec<QueuedEvent>>,
    batches: RwLock<VecDeque<EventBatch>>,
}

impl EventQueue {
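    /// Creates an empty queue. `max_queue_size` bounds the number of pending
    /// batches that will be retained.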
    pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
        Self {
            pending_events: RwLock::new(Vec::new()),
            batches: RwLock::new(VecDeque::new()),
            batch_size: batch_size as usize,
            max_pending_batches: max_queue_size as usize,
        }
    }

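    /// Appends an event to the pending list. Returns `true` when the pending
    /// count has reached a multiple of `batch_size`, signalling that a flush
    /// should be triggered, or `false` if the lock could not be acquired.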
    pub fn add(&self, pending_event: QueuedEvent) -> bool {
        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, false);
        pending_events.push(pending_event);
        pending_events.len() % self.batch_size == 0
    }

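    /// Puts a previously taken batch back onto the queue, e.g. after a failed
    /// flush attempt. The batch is dropped instead if the queue already holds
    /// more than `max_pending_batches` batches.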
    pub fn requeue_batch(&self, batch: EventBatch) -> QueueResult {
        let len = batch.events.len() as u64;
        let mut batches = write_lock_or_return!(TAG, self.batches, QueueResult::DroppedEvents(len));

        if batches.len() > self.max_pending_batches {
            return QueueResult::DroppedEvents(len);
        }

        log_d!(
            TAG,
            "Requeueing batch with {} events and {} attempts to flush",
            batch.events.len(),
            batch.attempts
        );

        batches.push_back(batch);
        QueueResult::Success
    }

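    /// Returns `true` if either the pending events or any existing batch
    /// contains at least `batch_size` events.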
    pub fn contains_at_least_one_full_batch(&self) -> bool {
        let pending_events_count = self.pending_events.read().map(|e| e.len()).unwrap_or(0);
        if pending_events_count >= self.batch_size {
            return true;
        }

        let batches = read_lock_or_return!(TAG, self.batches, false);
        for batch in batches.iter() {
            if batch.events.len() >= self.batch_size {
                return true;
            }
        }

        false
    }

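    /// Removes and returns every batch currently in the queue.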
    pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
        let mut batches = self.batches.write().unwrap();
        std::mem::take(&mut *batches)
    }

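    /// Removes and returns the oldest batch, if any.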
    pub fn take_next_batch(&self) -> Option<EventBatch> {
        let mut batches = self.batches.write().unwrap();
        batches.pop_front()
    }

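    /// Rebuilds the batch list: pending events and any partially filled
    /// batches are combined and re-chunked into batches of `batch_size`,
    /// while already-full batches are kept as-is. If the result exceeds
    /// `max_pending_batches`, excess batches are dropped from the front of
    /// the queue and the number of dropped events is reported.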
    pub fn reconcile_batching(&self) -> QueueResult {
        let mut pending_events: Vec<StatsigEventInternal> = self
            .take_all_pending_events()
            .into_iter()
            .map(|evt| evt.into_statsig_event_internal())
            .collect();

        if pending_events.is_empty() {
            return QueueResult::Success;
        }

        let mut batches = self.batches.write().unwrap();
        let old_batches = std::mem::take(&mut *batches);

        let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
            .into_iter()
            .partition(|batch| batch.events.len() >= self.batch_size);

        for batch in partial_batches {
            pending_events.extend(batch.events);
        }

        let new_batches = self.create_batches(pending_events);

        batches.extend(full_batches);
        batches.extend(new_batches);

        let dropped_events_count = self.clamp_batches(&mut batches);
        if dropped_events_count > 0 {
            return QueueResult::DroppedEvents(dropped_events_count);
        }

        QueueResult::Success
    }

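    /// Drains and returns all currently pending events.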
    fn take_all_pending_events(&self) -> Vec<QueuedEvent> {
        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, Vec::new());
        std::mem::take(&mut *pending_events)
    }

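    /// Splits the given events into `EventBatch`es of at most `batch_size`
    /// events each.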
    fn create_batches(&self, mut pending_events: Vec<StatsigEventInternal>) -> Vec<EventBatch> {
        let mut batches = Vec::new();
        while !pending_events.is_empty() {
            let drain_count = self.batch_size.min(pending_events.len());
            let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
            batches.push(EventBatch::new(chunk));
        }

        batches
    }

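    /// Drops batches from the front of the queue until it is within
    /// `max_pending_batches`, returning the number of events dropped.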
    fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
        if batches.len() <= self.max_pending_batches {
            return 0;
        }

        let mut dropped_events_count = 0;
        while batches.len() > self.max_pending_batches {
            if let Some(batch) = batches.pop_front() {
                dropped_events_count += batch.events.len() as u64;
            }
        }

        dropped_events_count
    }
}

#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::event_logging::event_queue::queued_event::EnqueueOperation;
    use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
    use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
    use crate::{
        event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
        user::StatsigUserInternal, EvaluationDetails, StatsigUser,
    };

    #[test]
    fn test_adding_single_to_queue() {
        let (queue, user, gate) = setup(10, 10);
        let user_internal = StatsigUserInternal::new(&user, None);

        let enqueue_op = EnqueueGateExpoOp {
            user: &user_internal,
            queried_gate_name: &gate.name,
            evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
            details: EvaluationDetails::unrecognized_no_data(),
            trigger: ExposureTrigger::Auto,
        };

        let queued_event = enqueue_op.into_queued_event(ForceSampled);

        let has_exceeded_limit = queue.add(queued_event);

        assert!(!has_exceeded_limit);
        assert_eq!(queue.pending_events.read().unwrap().len(), 1);
    }

    #[test]
    fn test_adding_multiple_to_queue() {
        let (queue, user, gate) = setup(100, 20);
        let user_internal = StatsigUserInternal::new(&user, None);

        let mut triggered_count = 0;
        for _ in 0..4567 {
            let enqueue_op = EnqueueGateExpoOp {
                user: &user_internal,
                queried_gate_name: &gate.name,
                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            };

            let did_trigger = queue.add(enqueue_op.into_queued_event(ForceSampled));

            if did_trigger {
                triggered_count += 1;
            }
        }

        assert_eq!(queue.pending_events.read().unwrap().len(), 4567);
        assert_eq!(triggered_count, (4567 / 100) as usize);
    }

    #[test]
    fn test_take_all_batches() {
        let batch_size = 200;
        let max_pending_batches = 40;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..4567 {
            let enqueue_op = EnqueueGateExpoOp {
                user: &user_internal,
                queried_gate_name: &gate.name,
                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            };
            queue.add(enqueue_op.into_queued_event(ForceSampled));
        }

        queue.reconcile_batching();
        let batches = queue.take_all_batches();
        assert_eq!(batches.len(), (4567.0 / batch_size as f64).ceil() as usize);
    }

    #[test]
    fn test_take_next_batch() {
        let batch_size = 200;
        let max_pending_batches = 20;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..4567 {
            let enqueue_op = EnqueueGateExpoOp {
                user: &user_internal,
                queried_gate_name: &gate.name,
                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            };
            queue.add(enqueue_op.into_queued_event(ForceSampled));
        }

        queue.reconcile_batching();
        let batch = queue.take_next_batch();
        assert_eq!(batch.unwrap().events.len(), batch_size as usize);

        assert_eq!(
            queue.batches.read().unwrap().len(),
            (max_pending_batches - 1) as usize
        ); // max minus the one we just took
    }

    fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
        let queue = EventQueue::new(batch_size, max_queue_size);
        let user = StatsigUser::with_user_id("user-id");
        let gate = FeatureGate {
            name: "gate-name".into(),
            value: true,
            rule_id: "rule-id".into(),
            id_type: "user-id".into(),
            details: EvaluationDetails::unrecognized_no_data(),
            __evaluation: None,
        };
        (queue, user, gate)
    }
}