statsig_rust/event_logging/event_queue/queue.rs

use crate::{
    event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
    write_lock_or_return,
};
use std::{collections::VecDeque, sync::RwLock};

use super::{batch::EventBatch, queued_event::QueuedEvent};

const TAG: &str = stringify!(EventQueue);

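/// Outcome of [`EventQueue::add`]: signals whether the caller should flush,
/// and how many events were dropped to stay within the pending-event cap.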
pub enum QueueAddResult {
    Noop,
    NeedsFlush,
    NeedsFlushAndDropped(u64),
}

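/// Outcome of batch-level operations such as [`EventQueue::requeue_batch`]
/// and [`EventQueue::reconcile_batching`].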
pub enum QueueReconcileResult {
    Success,
    DroppedEvents(u64),
    LockFailure,
}

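/// A two-stage event queue: events accumulate in `pending_events` until they
/// are grouped into fixed-size `batches`. Both stages sit behind `RwLock`s so
/// the queue can be shared across threads.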
pub struct EventQueue {
    pub batch_size: usize,
    pub max_pending_batches: usize,

    pending_events: RwLock<Vec<QueuedEvent>>,
    batches: RwLock<VecDeque<EventBatch>>,
    max_pending_events: usize,
}

impl EventQueue {
    pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
        let batch_size = batch_size as usize;
        let max_queue_size = max_queue_size as usize;

        Self {
            pending_events: RwLock::new(Vec::new()),
            batches: RwLock::new(VecDeque::new()),
            batch_size,
            max_pending_batches: max_queue_size,
            max_pending_events: batch_size * max_queue_size,
        }
    }

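    /// Estimates the total number of queued events. Every batch is assumed to
    /// be full, and a failed lock read counts as zero, so the result is only
    /// approximate.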
    pub fn approximate_pending_events_count(&self) -> usize {
        let pending_len = read_lock_or_return!(TAG, self.pending_events, 0).len();
        let batches_len = read_lock_or_return!(TAG, self.batches, 0).len();
        pending_len + (batches_len * self.batch_size)
    }

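    /// Pushes an event onto the pending list. If the list has reached the cap
    /// of `batch_size * max_pending_batches` events, the oldest overflow
    /// events are dropped; otherwise a flush is requested whenever the length
    /// reaches a multiple of `batch_size`.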
    pub fn add(&self, pending_event: QueuedEvent) -> QueueAddResult {
        let mut pending_events =
            write_lock_or_return!(TAG, self.pending_events, QueueAddResult::Noop);
        pending_events.push(pending_event);

        let len = pending_events.len();
        if len >= self.max_pending_events {
            let delta = len - self.max_pending_events;
            pending_events.drain(0..delta);
            return QueueAddResult::NeedsFlushAndDropped(delta as u64);
        }

        if len % self.batch_size == 0 {
            return QueueAddResult::NeedsFlush;
        }

        QueueAddResult::Noop
    }

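    /// Puts a batch that failed to flush back at the end of the queue for a
    /// later retry. If the batch list is already over capacity, the batch is
    /// discarded and its events are reported as dropped.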
    pub fn requeue_batch(&self, batch: EventBatch) -> QueueReconcileResult {
        let len = batch.events.len() as u64;
        let mut batches =
            write_lock_or_return!(TAG, self.batches, QueueReconcileResult::DroppedEvents(len));

        if batches.len() > self.max_pending_batches {
            return QueueReconcileResult::DroppedEvents(len);
        }

        log_d!(
            TAG,
            "Requeueing batch with {} events and {} attempts to flush",
            batch.events.len(),
            batch.attempts
        );

        batches.push_back(batch);
        QueueReconcileResult::Success
    }

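    /// Returns true if the pending list holds at least one batch worth of
    /// events, or if any already-created batch is full.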
    pub fn contains_at_least_one_full_batch(&self) -> bool {
        let pending_events_count = self.pending_events.read().map(|e| e.len()).unwrap_or(0);
        if pending_events_count >= self.batch_size {
            return true;
        }

        let batches = read_lock_or_return!(TAG, self.batches, false);
        for batch in batches.iter() {
            if batch.events.len() >= self.batch_size {
                return true;
            }
        }

        false
    }

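    /// Removes and returns every batch, leaving the batch queue empty.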
    pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
        let mut batches = write_lock_or_return!(TAG, self.batches, VecDeque::new());
        std::mem::take(&mut *batches)
    }

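    /// Removes and returns the oldest batch, if any.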
    pub fn take_next_batch(&self) -> Option<EventBatch> {
        let mut batches = write_lock_or_return!(TAG, self.batches, None);
        batches.pop_front()
    }

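    /// Converts all pending events into batches. Existing partial batches are
    /// merged with the pending events and re-chunked into full batches, full
    /// batches are kept as-is, and the oldest batches are dropped if the total
    /// exceeds `max_pending_batches`.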
    pub fn reconcile_batching(&self) -> QueueReconcileResult {
        let mut pending_events: Vec<StatsigEventInternal> = self
            .take_all_pending_events()
            .into_iter()
            .map(|evt| evt.into_statsig_event_internal())
            .collect();

        if pending_events.is_empty() {
            return QueueReconcileResult::Success;
        }

        let mut batches =
            write_lock_or_return!(TAG, self.batches, QueueReconcileResult::LockFailure);
        let old_batches = std::mem::take(&mut *batches);

        let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
            .into_iter()
            .partition(|batch| batch.events.len() >= self.batch_size);

        for batch in partial_batches {
            pending_events.extend(batch.events);
        }

        let new_batches = self.create_batches(pending_events);

        batches.extend(full_batches);
        batches.extend(new_batches);

        let dropped_events_count = self.clamp_batches(&mut batches);
        if dropped_events_count > 0 {
            return QueueReconcileResult::DroppedEvents(dropped_events_count);
        }

        QueueReconcileResult::Success
    }

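    /// Drains the pending list, returning every queued event.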
    fn take_all_pending_events(&self) -> Vec<QueuedEvent> {
        let mut pending_events = write_lock_or_return!(TAG, self.pending_events, Vec::new());
        std::mem::take(&mut *pending_events)
    }

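    /// Chunks events into batches of at most `batch_size` events; only the
    /// final batch may be partial.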
    fn create_batches(&self, mut pending_events: Vec<StatsigEventInternal>) -> Vec<EventBatch> {
        let mut batches = Vec::new();
        while !pending_events.is_empty() {
            let drain_count = self.batch_size.min(pending_events.len());
            let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
            batches.push(EventBatch::new(chunk));
        }

        batches
    }

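    /// Drops the oldest batches until at most `max_pending_batches` remain,
    /// returning the number of events dropped.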
    fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
        if batches.len() <= self.max_pending_batches {
            return 0;
        }

        let mut dropped_events_count = 0;
        while batches.len() > self.max_pending_batches {
            if let Some(batch) = batches.pop_front() {
                dropped_events_count += batch.events.len() as u64;
            }
        }

        dropped_events_count
    }
}

#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::event_logging::event_queue::queued_event::EnqueueOperation;
    use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
    use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
    use crate::{
        event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
        user::StatsigUserInternal, EvaluationDetails, StatsigUser,
    };

    #[test]
    fn test_adding_single_to_queue() {
        let (queue, user, gate) = setup(10, 10);
        let user_internal = StatsigUserInternal::new(&user, None);

        let enqueue_op = EnqueueGateExpoOp {
            user: &user_internal,
            queried_gate_name: &gate.name,
            evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
            details: EvaluationDetails::unrecognized_no_data(),
            trigger: ExposureTrigger::Auto,
        };

        let queued_event = enqueue_op.into_queued_event(ForceSampled);

        let result = queue.add(queued_event);

        assert!(matches!(result, QueueAddResult::Noop));
        assert_eq!(queue.pending_events.read().unwrap().len(), 1);
    }

    #[test]
    fn test_adding_multiple_to_queue() {
        let (queue, user, gate) = setup(1000, 20);
        let user_internal = StatsigUserInternal::new(&user, None);

        let mut triggered_count = 0;
        for _ in 0..4567 {
            let enqueue_op = EnqueueGateExpoOp {
                user: &user_internal,
                queried_gate_name: &gate.name,
                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            };

            let result = queue.add(enqueue_op.into_queued_event(ForceSampled));

            if let QueueAddResult::NeedsFlush = result {
                triggered_count += 1;
            }
        }

        assert_eq!(queue.pending_events.read().unwrap().len(), 4567);
        assert_eq!(triggered_count, (4567 / 1000) as usize);
    }

    #[test]
    fn test_take_all_batches() {
        let batch_size = 200;
        let max_pending_batches = 40;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..4567 {
            let enqueue_op = EnqueueGateExpoOp {
                user: &user_internal,
                queried_gate_name: &gate.name,
                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            };
            queue.add(enqueue_op.into_queued_event(ForceSampled));
        }

        queue.reconcile_batching();
        let batches = queue.take_all_batches();
        assert_eq!(batches.len(), (4567.0 / batch_size as f64).ceil() as usize);
    }

    #[test]
    fn test_take_next_batch() {
        let batch_size = 200;
        let max_pending_batches = 20;

        let (queue, user, gate) = setup(batch_size, max_pending_batches);
        let user_internal = StatsigUserInternal::new(&user, None);

        for _ in 0..4567 {
            let enqueue_op = EnqueueGateExpoOp {
                user: &user_internal,
                queried_gate_name: &gate.name,
                evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
                details: EvaluationDetails::unrecognized_no_data(),
                trigger: ExposureTrigger::Auto,
            };
            queue.add(enqueue_op.into_queued_event(ForceSampled));
        }

        queue.reconcile_batching();
        let batch = queue.take_next_batch();
        assert_eq!(batch.unwrap().events.len(), batch_size as usize);

        assert_eq!(
            queue.batches.read().unwrap().len(),
            (max_pending_batches - 1) as usize
        );
    }

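    /// Test helper: builds a queue plus a user and gate to enqueue gate
    /// exposures against.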
    fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
        let queue = EventQueue::new(batch_size, max_queue_size);
        let user = StatsigUser::with_user_id("user-id");
        let gate = FeatureGate {
            name: "gate-name".into(),
            value: true,
            rule_id: "rule-id".into(),
            id_type: "user-id".into(),
            details: EvaluationDetails::unrecognized_no_data(),
            __evaluation: None,
        };
        (queue, user, gate)
    }
}