statsig_rust/event_logging/event_queue/
queue.rs1use crate::{
2 event_logging::statsig_event_internal::StatsigEventInternal, log_d, read_lock_or_return,
3 write_lock_or_return,
4};
5use std::{collections::VecDeque, sync::RwLock};
6
7use super::{batch::EventBatch, queued_event::QueuedEvent};
8
9const TAG: &str = stringify!(EventQueue);
10
11pub enum QueueResult {
12 Success,
13 DroppedEvents(u64),
14}
15
16pub struct EventQueue {
17 pub batch_size: usize,
18 pub max_pending_batches: usize,
19
20 pending_events: RwLock<Vec<QueuedEvent>>,
21 batches: RwLock<VecDeque<EventBatch>>,
22}
23
24impl EventQueue {
25 pub fn new(batch_size: u32, max_queue_size: u32) -> Self {
26 Self {
27 pending_events: RwLock::new(Vec::new()),
28 batches: RwLock::new(VecDeque::new()),
29 batch_size: batch_size as usize,
30 max_pending_batches: max_queue_size as usize,
31 }
32 }
33
34 pub fn approximate_pending_events_count(&self) -> usize {
35 let pending_len = read_lock_or_return!(TAG, self.pending_events, 0).len();
36 let batches_len = read_lock_or_return!(TAG, self.batches, 0).len();
37 pending_len + (batches_len * self.batch_size)
38 }
39
40 pub fn add(&self, pending_event: QueuedEvent) -> bool {
41 let mut pending_events = write_lock_or_return!(TAG, self.pending_events, false);
42 pending_events.push(pending_event);
43 pending_events.len() % self.batch_size == 0
44 }
45
46 pub fn requeue_batch(&self, batch: EventBatch) -> QueueResult {
47 let len = batch.events.len() as u64;
48 let mut batches = write_lock_or_return!(TAG, self.batches, QueueResult::DroppedEvents(len));
49
50 if batches.len() > self.max_pending_batches {
51 return QueueResult::DroppedEvents(len);
52 }
53
54 log_d!(
55 TAG,
56 "Requeueing batch with {} events and {} attempts to flush",
57 batch.events.len(),
58 batch.attempts
59 );
60
61 batches.push_back(batch);
62 QueueResult::Success
63 }
64
65 pub fn contains_at_least_one_full_batch(&self) -> bool {
66 let pending_events_count = self.pending_events.read().map(|e| e.len()).unwrap_or(0);
67 if pending_events_count >= self.batch_size {
68 return true;
69 }
70
71 let batches = read_lock_or_return!(TAG, self.batches, false);
72 for batch in batches.iter() {
73 if batch.events.len() >= self.batch_size {
74 return true;
75 }
76 }
77
78 false
79 }
80
81 pub fn take_all_batches(&self) -> VecDeque<EventBatch> {
82 let mut batches = self.batches.write().unwrap();
83 std::mem::take(&mut *batches)
84 }
85
86 pub fn take_next_batch(&self) -> Option<EventBatch> {
87 let mut batches = self.batches.write().unwrap();
88 batches.pop_front()
89 }
90
91 pub fn reconcile_batching(&self) -> QueueResult {
92 let mut pending_events: Vec<StatsigEventInternal> = self
93 .take_all_pending_events()
94 .into_iter()
95 .map(|evt| evt.into_statsig_event_internal())
96 .collect();
97
98 if pending_events.is_empty() {
99 return QueueResult::Success;
100 }
101
102 let mut batches = self.batches.write().unwrap();
103 let old_batches = std::mem::take(&mut *batches);
104
105 let (full_batches, partial_batches): (VecDeque<_>, VecDeque<_>) = old_batches
106 .into_iter()
107 .partition(|batch| batch.events.len() >= self.batch_size);
108
109 for batch in partial_batches {
110 pending_events.extend(batch.events);
111 }
112
113 let new_batches = self.create_batches(pending_events);
114
115 batches.extend(full_batches);
116 batches.extend(new_batches);
117
118 let dropped_events_count = self.clamp_batches(&mut batches);
119 if dropped_events_count > 0 {
120 return QueueResult::DroppedEvents(dropped_events_count);
121 }
122
123 QueueResult::Success
124 }
125
126 fn take_all_pending_events(&self) -> Vec<QueuedEvent> {
127 let mut pending_events = write_lock_or_return!(TAG, self.pending_events, Vec::new());
128 std::mem::take(&mut *pending_events)
129 }
130
131 fn create_batches(&self, mut pending_events: Vec<StatsigEventInternal>) -> Vec<EventBatch> {
132 let mut batches = Vec::new();
133 while !pending_events.is_empty() {
134 let drain_count = self.batch_size.min(pending_events.len());
135 let chunk = pending_events.drain(..drain_count).collect::<Vec<_>>();
136 batches.push(EventBatch::new(chunk));
137 }
138
139 batches
140 }
141
142 fn clamp_batches(&self, batches: &mut VecDeque<EventBatch>) -> u64 {
143 if batches.len() <= self.max_pending_batches {
144 return 0;
145 }
146
147 let mut dropped_events_count = 0;
148 while batches.len() > self.max_pending_batches {
149 if let Some(batch) = batches.pop_front() {
150 dropped_events_count += batch.events.len() as u64;
151 }
152 }
153
154 dropped_events_count
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use std::borrow::Cow;
161
162 use super::*;
163 use crate::event_logging::event_queue::queued_event::EnqueueOperation;
164 use crate::event_logging::event_queue::queued_gate_expo::EnqueueGateExpoOp;
165 use crate::event_logging::exposure_sampling::EvtSamplingDecision::ForceSampled;
166 use crate::{
167 event_logging::event_logger::ExposureTrigger, statsig_types::FeatureGate,
168 user::StatsigUserInternal, EvaluationDetails, StatsigUser,
169 };
170
171 #[test]
172 fn test_adding_single_to_queue() {
173 let (queue, user, gate) = setup(10, 10);
174 let user_internal = StatsigUserInternal::new(&user, None);
175
176 let enqueue_op = EnqueueGateExpoOp {
177 user: &user_internal,
178 queried_gate_name: &gate.name,
179 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
180 details: EvaluationDetails::unrecognized_no_data(),
181 trigger: ExposureTrigger::Auto,
182 };
183
184 let queued_event = enqueue_op.into_queued_event(ForceSampled);
185
186 let has_exceeded_limit = queue.add(queued_event);
187
188 assert!(!has_exceeded_limit);
189 assert_eq!(queue.pending_events.read().unwrap().len(), 1);
190 }
191
192 #[test]
193 fn test_adding_multiple_to_queue() {
194 let (queue, user, gate) = setup(100, 20);
195 let user_internal = StatsigUserInternal::new(&user, None);
196
197 let mut triggered_count = 0;
198 for _ in 0..4567 {
199 let enqueue_op = EnqueueGateExpoOp {
200 user: &user_internal,
201 queried_gate_name: &gate.name,
202 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
203 details: EvaluationDetails::unrecognized_no_data(),
204 trigger: ExposureTrigger::Auto,
205 };
206
207 let did_trigger = queue.add(enqueue_op.into_queued_event(ForceSampled));
208
209 if did_trigger {
210 triggered_count += 1;
211 }
212 }
213
214 assert_eq!(queue.pending_events.read().unwrap().len(), 4567);
215 assert_eq!(triggered_count, (4567 / 100) as usize);
216 }
217
218 #[test]
219 fn test_take_all_batches() {
220 let batch_size = 200;
221 let max_pending_batches = 40;
222
223 let (queue, user, gate) = setup(batch_size, max_pending_batches);
224 let user_internal = StatsigUserInternal::new(&user, None);
225
226 for _ in 0..4567 {
227 let enqueue_op = EnqueueGateExpoOp {
228 user: &user_internal,
229 queried_gate_name: &gate.name,
230 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
231 details: EvaluationDetails::unrecognized_no_data(),
232 trigger: ExposureTrigger::Auto,
233 };
234 queue.add(enqueue_op.into_queued_event(ForceSampled));
235 }
236
237 queue.reconcile_batching();
238 let batches = queue.take_all_batches();
239 assert_eq!(batches.len(), (4567.0 / batch_size as f64).ceil() as usize,);
240 }
241
242 #[test]
243 fn test_take_next_batch() {
244 let batch_size = 200;
245 let max_pending_batches = 20;
246
247 let (queue, user, gate) = setup(batch_size, max_pending_batches);
248 let user_internal = StatsigUserInternal::new(&user, None);
249
250 for _ in 0..4567 {
251 let enqueue_op = EnqueueGateExpoOp {
252 user: &user_internal,
253 queried_gate_name: &gate.name,
254 evaluation: gate.__evaluation.as_ref().map(Cow::Borrowed),
255 details: EvaluationDetails::unrecognized_no_data(),
256 trigger: ExposureTrigger::Auto,
257 };
258 queue.add(enqueue_op.into_queued_event(ForceSampled));
259 }
260
261 queue.reconcile_batching();
262 let batch = queue.take_next_batch();
263 assert_eq!(batch.unwrap().events.len(), batch_size as usize);
264
265 assert_eq!(
266 queue.batches.read().unwrap().len(),
267 (max_pending_batches - 1) as usize
268 ); }
270
271 fn setup(batch_size: u32, max_queue_size: u32) -> (EventQueue, StatsigUser, FeatureGate) {
272 let queue = EventQueue::new(batch_size, max_queue_size);
273 let user = StatsigUser::with_user_id("user-id");
274 let gate = FeatureGate {
275 name: "gate-name".into(),
276 value: true,
277 rule_id: "rule-id".into(),
278 id_type: "user-id".into(),
279 details: EvaluationDetails::unrecognized_no_data(),
280 __evaluation: None,
281 };
282 (queue, user, gate)
283 }
284}