statsig_rust/event_logging/
event_logger.rs1use super::{
2 event_queue::{
3 batch::EventBatch,
4 queue::{EventQueue, QueueResult},
5 queued_event::{EnqueueOperation, QueuedEvent},
6 },
7 exposure_sampling::ExposureSampling,
8 flush_interval::FlushInterval,
9 statsig_event_internal::StatsigEventInternal,
10};
11use crate::{
12 event_logging::event_logger_constants::EventLoggerConstants,
13 log_d, log_w,
14 networking::NetworkError,
15 observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
16 statsig_metadata::StatsigMetadata,
17 write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
18};
19use std::time::Duration;
20use std::{
21 collections::HashMap,
22 sync::{Arc, RwLock},
23};
24use tokio::sync::Notify;
25
/// Tag under which the background flush task is spawned on the runtime.
const BG_TASK_TAG: &str = "EVENT_LOGGER_BG_TASK";

/// Fallback passed to the event queue when `event_logging_max_queue_size` is unset.
const DEFAULT_BATCH_SIZE: u32 = 2_000;

/// Fallback passed to the event queue when `event_logging_max_pending_batch_queue_size` is unset.
const DEFAULT_PENDING_BATCH_MAX: u32 = 20;
29
/// How an exposure was triggered.
///
/// NOTE(review): presumably `Auto` marks exposures logged implicitly by the SDK and `Manual`
/// marks exposures logged explicitly by the caller; nothing in this file uses the variants,
/// so confirm against the call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}
35
/// Tag handed to the logging and lock macros used throughout this file.
const TAG: &str = "EvtLogger";
37
38pub struct EventLogger {
39 queue: EventQueue,
40 options: Arc<StatsigOptions>,
41 logging_adapter: Arc<dyn EventLoggingAdapter>,
42 event_sampler: ExposureSampling,
43 non_exposed_checks: RwLock<HashMap<String, u64>>,
44 limit_flush_notify: Notify,
45 flush_interval: FlushInterval,
46 shutdown_notify: Notify,
47 ops_stats: Arc<OpsStatsForInstance>,
48}
49
50impl EventLogger {
51 pub fn new(
52 sdk_key: &str,
53 options: &Arc<StatsigOptions>,
54 event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
55 statsig_rt: &Arc<StatsigRuntime>,
56 ) -> Arc<Self> {
57 let me = Arc::new(Self {
58 queue: EventQueue::new(
59 options
60 .event_logging_max_queue_size
61 .unwrap_or(DEFAULT_BATCH_SIZE),
62 options
63 .event_logging_max_pending_batch_queue_size
64 .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
65 ),
66 event_sampler: ExposureSampling::new(sdk_key),
67 flush_interval: FlushInterval::new(),
68 options: options.clone(),
69 logging_adapter: event_logging_adapter.clone(),
70 non_exposed_checks: RwLock::new(HashMap::new()),
71 shutdown_notify: Notify::new(),
72 limit_flush_notify: Notify::new(),
73 ops_stats: OPS_STATS.get_for_instance(sdk_key),
74 });
75
76 me.spawn_background_task(statsig_rt);
77 me
78 }
79
80 pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
81 if self.options.disable_all_logging == Some(true) {
82 return;
83 }
84
85 let decision = self.event_sampler.get_sampling_decision(&operation);
86 if !decision.should_log() {
87 return;
88 }
89
90 let pending_event = operation.into_queued_event(decision);
91 if self.queue.add(pending_event) {
92 self.limit_flush_notify.notify_one();
93 }
94 }
95
96 pub fn increment_non_exposure_checks(&self, name: &str) {
97 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
98
99 match non_exposed_checks.get_mut(name) {
100 Some(count) => *count += 1,
101 None => {
102 non_exposed_checks.insert(name.into(), 1);
103 }
104 }
105 }
106
107 pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
108 self.try_flush_all_pending_events(FlushType::Manual).await
109 }
110
111 pub async fn shutdown(&self) -> Result<(), StatsigErr> {
112 let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
113 self.shutdown_notify.notify_one();
114 result
115 }
116
117 fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
118 let me = self.clone();
119
120 rt.spawn(BG_TASK_TAG, |rt_shutdown_notify| async move {
121 let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
122 let tick_interval = Duration::from_millis(tick_interval_ms);
123
124 loop {
125 let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();
126
127 tokio::select! {
128 () = tokio::time::sleep(tick_interval) => {
129 me.try_scheduled_flush().await;
130 }
131 () = rt_shutdown_notify.notified() => {
132 return; }
134 _ = me.shutdown_notify.notified() => {
135 return; }
137 _ = me.limit_flush_notify.notified(), if can_limit_flush => {
138 me.try_limit_flush().await;
139 }
140 }
141
142 me.event_sampler.try_reset_all_sampling();
143 }
144 });
145 }
146
147 async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
148 self.prepare_event_queue_for_flush();
149
150 let batches = self.queue.take_all_batches();
151
152 let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
153 match self.log_batch(&mut batch, flush_type).await {
154 Ok(_) => Ok(()),
155 Err(e) => {
156 if flush_type == FlushType::Manual {
157 self.flush_interval.adjust_for_failure();
158 self.try_requeue_failed_batch(&e, batch, flush_type);
159 return Err(e);
160 }
161
162 self.drop_failed_events(&e, batch, flush_type);
163 Err(e)
164 }
165 }
166 }))
167 .await;
168
169 results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
170 }
171
172 async fn try_scheduled_flush(&self) {
173 if !self.flush_interval.has_cooled_from_most_recent_failure() {
174 return;
175 }
176
177 let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
178 let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
179
180 if !should_flush_by_time && !should_flush_by_size {
181 return;
182 }
183
184 self.flush_interval.mark_scheduled_flush_attempt();
185
186 self.flush_next_batch(if should_flush_by_size {
187 FlushType::ScheduledFullBatch
188 } else {
189 FlushType::ScheduledMaxTime
190 })
191 .await;
192 }
193
194 async fn try_limit_flush(&self) {
195 log_d!(TAG, "Attempting limit flush");
196 self.flush_next_batch(FlushType::Limit).await;
197 }
198
199 async fn flush_next_batch(&self, flush_type: FlushType) {
200 self.prepare_event_queue_for_flush();
201
202 let mut batch = match self.queue.take_next_batch() {
203 Some(batch) => batch,
204 None => return,
205 };
206
207 let error = match self.log_batch(&mut batch, flush_type).await {
208 Err(e) => e,
209 Ok(()) => {
210 self.flush_interval.adjust_for_success();
211 return;
212 }
213 };
214
215 self.flush_interval.adjust_for_failure();
216 self.try_requeue_failed_batch(&error, batch, flush_type);
217 }
218
219 async fn log_batch(
220 &self,
221 batch: &mut EventBatch,
222 flush_type: FlushType,
223 ) -> Result<(), StatsigErr> {
224 let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
225 self.flush_interval.get_current_flush_interval_ms(),
226 self.queue.batch_size,
227 self.queue.max_pending_batches,
228 flush_type.as_string(),
229 );
230
231 let result = self
232 .logging_adapter
233 .log_events(batch.get_log_event_request(statsig_metadata))
234 .await;
235
236 batch.attempts += 1;
237
238 match result {
239 Ok(true) => {
240 self.ops_stats.log_event_request_success(batch.events.len());
241 Ok(())
242 }
243 Ok(false) => Err(StatsigErr::LockFailure("Unknown Failure".into())),
244 Err(e) => Err(e),
245 }
246 }
247
248 fn prepare_event_queue_for_flush(&self) {
249 self.try_add_non_exposed_checks_event();
250
251 let dropped_events_count = match self.queue.reconcile_batching() {
252 QueueResult::Success => return,
253 QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
254 };
255
256 if dropped_events_count > 0 {
257 self.ops_stats.log_batching_dropped_events(
258 StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
259 dropped_events_count,
260 &self.flush_interval,
261 &self.queue,
262 );
263 }
264 }
265
266 fn try_requeue_failed_batch(
267 &self,
268 error: &StatsigErr,
269 batch: EventBatch,
270 flush_type: FlushType,
271 ) {
272 let is_non_retryable = matches!(
273 error,
274 StatsigErr::NetworkError(NetworkError::RequestNotRetryable, _)
275 );
276
277 let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
278
279 if is_non_retryable || is_max_retries {
280 self.drop_failed_events(error, batch, flush_type);
281 return;
282 }
283
284 let dropped_events_count = match self.queue.requeue_batch(batch) {
285 QueueResult::Success => return,
286 QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
287 };
288
289 if dropped_events_count == 0 {
290 return;
291 }
292
293 log_w!(
294 TAG,
295 "Max pending event batches reached, dropping {} event(s). Configuration: batch_size: {}, max_pending_batches: {}",
296 dropped_events_count,
297 self.queue.batch_size,
298 self.queue.max_pending_batches
299 );
300
301 self.ops_stats.log_batching_dropped_events(
302 StatsigErr::LogEventError(
303 "Dropped events due to max pending event batches limit".to_string(),
304 ),
305 dropped_events_count,
306 &self.flush_interval,
307 &self.queue,
308 );
309 }
310
311 fn drop_failed_events(&self, error: &StatsigErr, batch: EventBatch, flush_type: FlushType) {
312 let dropped_events_count = batch.events.len() as u64;
313
314 let kind = match flush_type {
315 FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
316 FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
317 FlushType::Limit => "Limit",
318 FlushType::Manual => "Manual",
319 FlushType::Shutdown => "Shutdown",
320 };
321
322 log_w!(
323 TAG,
324 "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
325 kind,
326 batch.attempts,
327 dropped_events_count,
328 error
329 );
330
331 self.ops_stats
332 .log_event_request_failure(dropped_events_count);
333
334 self.ops_stats.log_batching_dropped_events(
335 StatsigErr::LogEventError("Dropped events due flush failure".to_string()),
336 dropped_events_count,
337 &self.flush_interval,
338 &self.queue,
339 );
340 }
341
342 fn try_add_non_exposed_checks_event(&self) {
343 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
344 if non_exposed_checks.is_empty() {
345 return;
346 }
347
348 let checks = std::mem::take(&mut *non_exposed_checks);
349 self.queue.add(QueuedEvent::Passthrough(
350 StatsigEventInternal::new_non_exposed_checks_event(checks),
351 ));
352 }
353}
354
355#[derive(Debug, PartialEq, Eq, Clone, Copy)]
356enum FlushType {
357 ScheduledMaxTime,
358 ScheduledFullBatch,
359 Limit,
360 Manual,
361 Shutdown,
362}
363
364impl FlushType {
365 fn as_string(&self) -> String {
366 match self {
367 FlushType::ScheduledMaxTime => "scheduled:max_time",
368 FlushType::ScheduledFullBatch => "scheduled:full_batch",
369 FlushType::Limit => "limit",
370 FlushType::Manual => "manual",
371 FlushType::Shutdown => "shutdown",
372 }
373 .to_string()
374 }
375}