statsig_rust/event_logging/event_queue/
queue.rs1use crate::{
2 event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
3 write_lock_or_return,
4};
5use std::{collections::VecDeque, sync::RwLock};
6
7use super::{batch::EventBatch, queued_event::QueuedEvent};
8
9const TAG: &str = stringify!(EventQueue);
10
11pub enum QueueResult {
12 Success,
13 DroppedEvents(u64),
14}
15
16pub struct EventQueue {
17 pub batch_size: usize,
18 pub max_pending_batches: usize,
19
20 pending_events: RwLock<Vec<QueuedEvent>>,
21 batches: RwLock<VecDeque<EventBatch>>,
22}
23
24impl EventQueue {
25 pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
26 Self {
27 pending_events: RwLock::new(Vec::new()),
28 batches: RwLock::new(VecDeque::new()),
29 batch_size: batch_size as usize,
30 max_pending_batches: max_queue_size as usize,
31 }
32 }
33
34 pub fn add(&self, pending_event: QueuedEvent) -> bool {
35 let mut pending_events = write_lock_or_return!(TAG, self.pending_events, false);
36 pending_events.push(pending_event);
37 pending_events.len() % self.batch_size == 0
38 }
39
40 pub fn requeue_batch(&self, batch: EventBatch) -> QueueResult {
41 let len = batch.events.len() as u64;
42 let mut batches = write_lock_or_return!(TAG, self.batches, QueueResult::DroppedEvents(len));
43
44 if batches.len() > self.max_pending_batches {
45 return QueueResult::DroppedEvents(len);
46 }
47
48 log_d!(
49 TAG,
50 "Requeueing batch with {} events and {} attempts to flush",
51 batch.events.len(),
52 batch.attempts
53 );
54
55 batches.push_back(batch);
56 QueueResult::Success
57 }
58
59 pub fn contains_at_least_one_full_batch(&self) -> bool {
60 let pending_events_count = self.pending_events.read().map(|e| e.len()).unwrap_or(0);
61 if pending_events_count >= self.batch_size {
62 return true;
63 }
64
65 let batches = read_lock_or_return!(TAG, self.batches, false);
66 for batch in batches.iter() {
67 if batch.events.len() >= self.batch_size {
68 return true;
69 }
70 }
71
72 false
73 }
74
75 pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
76 let mut batches = self.batches.write().unwrap();
77 std::mem::take(&mut *batches)
78 }
79
80 pub fn take_next_batch(&self) -> Option<EventBatch> {
81 let mut batches = self.batches.write().unwrap();
82 batches.pop_front()
83 }
84
85 pub fn reconcile_batching(&self) -> QueueResult {
86 let mut pending_events: Vec<StatsigEventInternal> = self
87 .take_all_pending_events()
88 .into_iter()
89 .map(|evt| evt.into_statsig_event_internal())
90 .collect();
91
92 if pending_events.is_empty() {
93 return QueueResult::Success;
94 }
95
96 let mut batches = self.batches.write().unwrap();
97 let old_batches = std::mem::take(&mut *batches);
98
99 let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
100 .into_iter()
101 .partition(|batch| batch.events.len() >= self.batch_size);
102
103 for batch in partial_batches {
104 pending_events.extend(batch.events);
105 }
106
107 let new_batches = self.create_batches(pending_events);
108
109 batches.extend(full_batches);
110 batches.extend(new_batches);
111
112 let dropped_events_count = self.clamp_batches(&mut batches);
113 if dropped_events_count > 0 {
114 return QueueResult::DroppedEvents(dropped_events_count);
115 }
116
117 QueueResult::Success
118 }
119
120 fn take_all_pending_events(&self) -> Vec<QueuedEvent> {
121 let mut pending_events = write_lock_or_return!(TAG, self.pending_events, Vec::new());
122 std::mem::take(&mut *pending_events)
123 }
124
125 fn create_batches(&self, mut pending_events: Vec<StatsigEventInternal>) -> Vec<EventBatch> {
126 let mut batches = Vec::new();
127 while !pending_events.is_empty() {
128 let drain_count = self.batch_size.min(pending_events.len());
129 let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
130 batches.push(EventBatch::new(chunk));
131 }
132
133 batches
134 }
135
136 fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
137 if batches.len() <= self.max_pending_batches {
138 return 0;
139 }
140
141 let mut dropped_events_count = 0;
142 while batches.len() > self.max_pending_batches {
143 if let Some(batch) = batches.pop_front() {
144 dropped_events_count += batch.events.len() as u64;
145 }
146 }
147
148 dropped_events_count
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use std::borrow::Cow;
155
156 use super::*;
157 use crate::event_logging::event_queue::queued_event::EnqueueOperation;
158 use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
159 use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
160 use crate::{
161 event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
162 user::StatsigUserInternal, EvaluationDetails, StatsigUser,
163 };
164
165 #[test]
166 fn test_adding_single_to_queue() {
167 let (queue, user, gate) = setup(10, 10);
168 let user_internal = StatsigUserInternal::new(&user, None);
169
170 let enqueue_op = EnqueueGateExpoOp {
171 user: &user_internal,
172 queried_gate_name: &gate.name,
173 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
174 details: EvaluationDetails::unrecognized_no_data(),
175 trigger: ExposureTrigger::Auto,
176 };
177
178 let queued_event = enqueue_op.into_queued_event(ForceSampled);
179
180 let has_exceeded_limit = queue.add(queued_event);
181
182 assert!(!has_exceeded_limit);
183 assert_eq!(queue.pending_events.read().unwrap().len(), 1);
184 }
185
186 #[test]
187 fn test_adding_multiple_to_queue() {
188 let (queue, user, gate) = setup(100, 20);
189 let user_internal = StatsigUserInternal::new(&user, None);
190
191 let mut triggered_count = 0;
192 for _ in 0..4567 {
193 let enqueue_op = EnqueueGateExpoOp {
194 user: &user_internal,
195 queried_gate_name: &gate.name,
196 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
197 details: EvaluationDetails::unrecognized_no_data(),
198 trigger: ExposureTrigger::Auto,
199 };
200
201 let did_trigger = queue.add(enqueue_op.into_queued_event(ForceSampled));
202
203 if did_trigger {
204 triggered_count += 1;
205 }
206 }
207
208 assert_eq!(queue.pending_events.read().unwrap().len(), 4567);
209 assert_eq!(triggered_count, (4567 / 100) as usize);
210 }
211
212 #[test]
213 fn test_take_all_batches() {
214 let batch_size = 200;
215 let max_pending_batches = 40;
216
217 let (queue, user, gate) = setup(batch_size, max_pending_batches);
218 let user_internal = StatsigUserInternal::new(&user, None);
219
220 for _ in 0..4567 {
221 let enqueue_op = EnqueueGateExpoOp {
222 user: &user_internal,
223 queried_gate_name: &gate.name,
224 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
225 details: EvaluationDetails::unrecognized_no_data(),
226 trigger: ExposureTrigger::Auto,
227 };
228 queue.add(enqueue_op.into_queued_event(ForceSampled));
229 }
230
231 queue.reconcile_batching();
232 let batches = queue.take_all_batches();
233 assert_eq!(batches.len(), (4567.0 / batch_size as f64).ceil() as usize,);
234 }
235
236 #[test]
237 fn test_take_next_batch() {
238 let batch_size = 200;
239 let max_pending_batches = 20;
240
241 let (queue, user, gate) = setup(batch_size, max_pending_batches);
242 let user_internal = StatsigUserInternal::new(&user, None);
243
244 for _ in 0..4567 {
245 let enqueue_op = EnqueueGateExpoOp {
246 user: &user_internal,
247 queried_gate_name: &gate.name,
248 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
249 details: EvaluationDetails::unrecognized_no_data(),
250 trigger: ExposureTrigger::Auto,
251 };
252 queue.add(enqueue_op.into_queued_event(ForceSampled));
253 }
254
255 queue.reconcile_batching();
256 let batch = queue.take_next_batch();
257 assert_eq!(batch.unwrap().events.len(), batch_size as usize);
258
259 assert_eq!(
260 queue.batches.read().unwrap().len(),
261 (max_pending_batches - 1) as usize
262 ); }
264
265 fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
266 let queue = EventQueue::new(batch_size, max_queue_size);
267 let user = StatsigUser::with_user_id("user-id");
268 let gate = FeatureGate {
269 name: "gate-name".into(),
270 value: true,
271 rule_id: "rule-id".into(),
272 id_type: "user-id".into(),
273 details: EvaluationDetails::unrecognized_no_data(),
274 __evaluation: None,
275 };
276 (queue, user, gate)
277 }
278}