// statsig_rust/event_logging/event_queue/queue.rs
use super::{batch::EventBatch, queued_event::QueuedEvent};
2use crate::{
3 event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
4 write_lock_or_return,
5};
6use parking_lot::RwLock;
7use std::collections::VecDeque;
8
// Module tag passed to the lock macros and `log_d!` so log output and
// lock-failure messages identify this component.
const TAG: &str = stringify!(EventQueue);
10
/// Outcome of [`EventQueue::add`], telling the caller whether a flush
/// should be scheduled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueAddResult {
    /// Event stored; nothing further to do.
    Noop,
    /// The pending queue hit a full-batch boundary; the caller should flush.
    NeedsFlush,
    /// The queue overflowed: a flush is needed, and this many of the oldest
    /// pending events were discarded to stay under the cap.
    NeedsFlushAndDropped(u64),
}
16
/// Outcome of batch-queue maintenance
/// ([`EventQueue::requeue_batch`] / [`EventQueue::reconcile_batching`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueReconcileResult {
    /// The operation completed without losing any events.
    Success,
    /// Capacity limits (or a failed requeue) discarded this many events.
    DroppedEvents(u64),
    /// A lock could not be acquired; the operation did not run.
    LockFailure,
}
22
/// Thread-safe queue that accumulates events and groups them into
/// fixed-size batches for flushing.
pub struct EventQueue {
    // Target number of events per batch; also the interval at which `add`
    // reports `NeedsFlush` (every time the pending length is a multiple of it).
    pub batch_size: usize,
    // Maximum number of batches retained; `clamp_batches` drops the oldest
    // batches beyond this count.
    pub max_pending_batches: usize,

    // Loose events awaiting batching, oldest at the front
    // (push_back on add, pop_front on overflow).
    pending_events: RwLock<VecDeque<QueuedEvent>>,
    // Built (or requeued) batches awaiting flush, oldest at the front.
    batches: RwLock<VecDeque<EventBatch>>,
    // Cap on `pending_events` length; set to batch_size * max_queue_size in `new`.
    max_pending_events: usize,
}
31
32impl EventQueue {
33 pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
34 let batch_size = batch_size as usize;
35 let max_queue_size = max_queue_size as usize;
36
37 Self {
38 pending_events: RwLock::new(VecDeque::new()),
39 batches: RwLock::new(VecDeque::new()),
40 batch_size,
41 max_pending_batches: max_queue_size,
42 max_pending_events: batch_size * max_queue_size,
43 }
44 }
45
46 pub fn approximate_pending_events_count(&self) -> usize {
47 let pending_len = read_lock_or_return!(TAG, self.pending_events, 0).len();
48 let batches_len = read_lock_or_return!(TAG, self.batches, 0).len();
49 pending_len + (batches_len * self.batch_size)
50 }
51
52 pub fn add(&self, pending_event: QueuedEvent) -> QueueAddResult {
53 let mut pending_events =
54 write_lock_or_return!(TAG, self.pending_events, QueueAddResult::Noop);
55 pending_events.push_back(pending_event);
56
57 let mut dropped_events = 0;
58 while pending_events.len() > self.max_pending_events {
59 pending_events.pop_front();
60 dropped_events += 1;
61 }
62
63 if dropped_events > 0 {
64 return QueueAddResult::NeedsFlushAndDropped(dropped_events);
65 }
66
67 if pending_events.len() % self.batch_size == 0 {
68 return QueueAddResult::NeedsFlush;
69 }
70
71 QueueAddResult::Noop
72 }
73
74 pub fn requeue_batch(&self, batch: EventBatch) -> QueueReconcileResult {
75 let len = batch.events.len() as u64;
76 let mut batches =
77 write_lock_or_return!(TAG, self.batches, QueueReconcileResult::DroppedEvents(len));
78
79 if batches.len() > self.max_pending_batches {
80 return QueueReconcileResult::DroppedEvents(len);
81 }
82
83 log_d!(
84 TAG,
85 "Requeueing batch with {} events and {} attempts to flush",
86 batch.events.len(),
87 batch.attempts
88 );
89
90 batches.push_back(batch);
91 QueueReconcileResult::Success
92 }
93
94 pub fn contains_at_least_one_full_batch(&self) -> bool {
95 let pending_events_count = self
96 .pending_events
97 .try_read_for(std::time::Duration::from_secs(5))
98 .map(|e| e.len())
99 .unwrap_or(0);
100 if pending_events_count >= self.batch_size {
101 return true;
102 }
103
104 let batches = read_lock_or_return!(TAG, self.batches, false);
105 for batch in batches.iter() {
106 if batch.events.len() >= self.batch_size {
107 return true;
108 }
109 }
110
111 false
112 }
113
114 pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
115 let mut batches = write_lock_or_return!(TAG, self.batches, VecDeque::new());
116 std::mem::take(&mut *batches)
117 }
118
119 pub fn take_next_batch(&self) -> Option<EventBatch> {
120 let mut batches = write_lock_or_return!(TAG, self.batches, None);
121 batches.pop_front()
122 }
123
124 pub fn reconcile_batching(&self) -> QueueReconcileResult {
125 let mut pending_events: VecDeque<StatsigEventInternal> = self
126 .take_all_pending_events()
127 .into_iter()
128 .map(|evt| evt.into_statsig_event_internal())
129 .collect();
130
131 if pending_events.is_empty() {
132 return QueueReconcileResult::Success;
133 }
134
135 let mut batches =
136 write_lock_or_return!(TAG, self.batches, QueueReconcileResult::LockFailure);
137 let old_batches = std::mem::take(&mut *batches);
138
139 let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
140 .into_iter()
141 .partition(|batch| batch.events.len() >= self.batch_size);
142
143 for batch in partial_batches {
144 pending_events.extend(batch.events);
145 }
146
147 let new_batches = self.create_batches(pending_events);
148
149 batches.extend(full_batches);
150 batches.extend(new_batches);
151
152 let dropped_events_count = self.clamp_batches(&mut batches);
153 if dropped_events_count > 0 {
154 return QueueReconcileResult::DroppedEvents(dropped_events_count);
155 }
156
157 QueueReconcileResult::Success
158 }
159
160 fn take_all_pending_events(&self) -> VecDeque<QueuedEvent> {
161 let mut pending_events = write_lock_or_return!(TAG, self.pending_events, VecDeque::new());
162 std::mem::take(&mut *pending_events)
163 }
164
165 fn create_batches(
166 &self,
167 mut pending_events: VecDeque<StatsigEventInternal>,
168 ) -> Vec<EventBatch> {
169 let mut batches = Vec::new();
170 while !pending_events.is_empty() {
171 let drain_count = self.batch_size.min(pending_events.len());
172 let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
173 batches.push(EventBatch::new(chunk));
174 }
175
176 batches
177 }
178
179 fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
180 if batches.len() <= self.max_pending_batches {
181 return 0;
182 }
183
184 let mut dropped_events_count = 0;
185 while batches.len() > self.max_pending_batches {
186 if let Some(batch) = batches.pop_front() {
187 dropped_events_count += batch.events.len() as u64;
188 }
189 }
190
191 dropped_events_count
192 }
193}
194
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::event_logging::event_queue::queued_event::EnqueueOperation;
    use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
    use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
    use crate::{
        event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
        user::StatsigUserInternal, EvaluationDetails, StatsigUser,
    };

    /// Creates a queue plus the user/gate fixtures shared by every test.
    fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
        let queue = EventQueue::new(batch_size, max_queue_size);
        let user = StatsigUser::with_user_id("user-id");
        let gate = FeatureGate {
            name: "gate-name".into(),
            value: true,
            rule_id: "rule-id".into(),
            id_type: "user-id".into(),
            details: EvaluationDetails::unrecognized_no_data(),
            __evaluation: None,
        };
        (queue, user, gate)
    }

    /// Builds a single gate-exposure queued event, mirroring what the SDK
    /// enqueues for an automatic gate check.
    fn make_queued_event(user_internal: &StatsigUserInternal, gate: &FeatureGate) -> QueuedEvent {
        EnqueueGateExpoOp {
            exposure_time: 1,
            user: user_internal,
            queried_gate_name: &gate.name,
            evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
            details: EvaluationDetails::unrecognized_no_data(),
            trigger: ExposureTrigger::Auto,
        }
        .into_queued_event(ForceSampled)
    }

    /// Current number of events in the pending (un-batched) queue.
    fn pending_events_len(queue: &EventQueue) -> usize {
        queue
            .pending_events
            .try_read_for(std::time::Duration::from_secs(5))
            .unwrap()
            .len()
    }

    #[test]
    fn test_adding_single_to_queue() {
        let (queue, user, gate) = setup(10, 10);
        let user_internal = StatsigUserInternal::new(&user, None);

        let result = queue.add(make_queued_event(&user_internal, &gate));

        assert!(matches!(result, QueueAddResult::Noop));
        assert_eq!(pending_events_len(&queue), 1);
    }

    #[test]
    fn test_adding_multiple_to_queue() {
        let (queue, user, gate) = setup(1000, 20);
        let user_internal = StatsigUserInternal::new(&user, None);

        let mut triggered_count = 0;
        for _ in 0..4567 {
            let result = queue.add(make_queued_event(&user_internal, &gate));
            if let QueueAddResult::NeedsFlush = result {
                triggered_count += 1;
            }
        }

        assert_eq!(pending_events_len(&queue), 4567);
        // A flush is requested each time another full batch (1000 events)
        // accumulates: at 1000, 2000, 3000 and 4000.
        assert_eq!(triggered_count, (4567 / 1000) as usize);
    }

    #[test]
    fn test_take_all_batches() {
        let batch_size = 200;
        let max_pending_batches = 40;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..4567 {
            queue.add(make_queued_event(&user_internal, &gate));
        }

        queue.reconcile_batching();
        let batches = queue.take_all_batches();

        // 4567 events re-chunked at 200 per batch -> ceil(4567 / 200) = 23.
        let expected = (4567 + batch_size as usize - 1) / batch_size as usize;
        assert_eq!(batches.len(), expected);
    }

    #[test]
    fn test_take_next_batch() {
        let batch_size = 200;
        let max_pending_batches = 20;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..4567 {
            queue.add(make_queued_event(&user_internal, &gate));
        }

        queue.reconcile_batching();
        let batch = queue.take_next_batch();
        assert_eq!(batch.unwrap().events.len(), batch_size as usize);

        // Reconciling clamps the 23 rebuilt batches down to
        // `max_pending_batches` (20); taking one leaves 19.
        let remaining = queue
            .batches
            .try_read_for(std::time::Duration::from_secs(5))
            .unwrap()
            .len();
        assert_eq!(remaining, (max_pending_batches - 1) as usize);
    }
}