Skip to main content

statsig_rust/event_logging/
event_logger.rs

1use super::{
2    event_queue::{
3        batch::EventBatch,
4        queue::{EventQueue, QueueReconcileResult},
5        queued_event::{EnqueueOperation, QueuedEvent},
6        queued_secondary_expo::EnqueueSecondaryExposureAsPrimaryOp,
7    },
8    exposure_sampling::ExposureSampling,
9    flush_interval::FlushInterval,
10    flush_type::FlushType,
11    sec_expo_as_primary_experiment::SecExpoAsPrimaryExperiment,
12    statsig_event_internal::StatsigEventInternal,
13};
14use crate::{
15    event_logging::{
16        event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
17    },
18    log_d, log_e, log_w,
19    networking::NetworkError,
20    observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
21    statsig_metadata::StatsigMetadata,
22    write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
23};
24use parking_lot::RwLock;
25use std::{collections::HashMap, sync::Arc};
26use std::{
27    sync::atomic::{AtomicU64, Ordering},
28    time::Duration,
29};
30use tokio::sync::{Notify, Semaphore};
31
/// Lower bound for the event batch size. Not referenced in the visible part of this
/// file; presumably enforced where the configured size is validated — TODO confirm.
pub const MIN_BATCH_SIZE: u32 = 10;
/// Upper bound for the event batch size; also the value behind `DEFAULT_BATCH_SIZE`.
pub const MAX_BATCH_SIZE: u32 = 2000;

/// Lower bound for the number of pending batches. Not referenced in the visible part
/// of this file; presumably enforced by the queue — TODO confirm.
pub const MIN_PENDING_BATCH_COUNT: u32 = 1;
/// Fallback used by `EventLogger::new` when
/// `event_logging_max_pending_batch_queue_size` is not set in the options.
pub const DEFAULT_PENDING_BATCH_COUNT_MAX: u32 = 100;

/// Tag handed to `StatsigRuntime::spawn` for the background loop task.
const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
/// Tag handed to `StatsigRuntime::spawn` for limit flush tasks.
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";
/// Fallback used by `EventLogger::new` when `event_logging_max_queue_size` is not set.
const DEFAULT_BATCH_SIZE: u32 = MAX_BATCH_SIZE;
/// Number of permits in the limit flush semaphore, i.e. the maximum number of limit
/// flush tasks that may hold a permit at the same time.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
42
/// How an exposure was triggered.
///
/// NOTE(review): this enum is not used in the visible part of this file; the variant
/// descriptions below are inferred from their names — confirm against callers.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExposureTrigger {
    /// Presumably an exposure logged automatically as part of an evaluation.
    Auto,
    /// Presumably an exposure logged through an explicit caller request.
    Manual,
}
48
/// Tag passed to the logging macros (`log_d!`, `log_w!`, `log_e!`) and to
/// `write_lock_or_noop!` in this file. `stringify!` yields the literal `"EvtLogger"`.
const TAG: &str = stringify!(EvtLogger);
50
/// Collects events, batches them through an `EventQueue`, and sends the batches via an
/// `EventLoggingAdapter`, either from a background loop or on explicit flush/shutdown.
pub struct EventLogger {
    /// Queue that accepts events and hands back batches to flush.
    queue: EventQueue,
    /// SDK options; `disable_all_logging` is consulted on every `enqueue`.
    options: Arc<StatsigOptions>,
    /// Adapter that `log_batch` sends each batch request through.
    logging_adapter: Arc<dyn EventLoggingAdapter>,
    /// Produces the sampling decision for each enqueued operation; the background loop
    /// calls `try_reset_all_sampling` on it after every iteration.
    event_sampler: ExposureSampling,
    /// Per-name counters bumped by `increment_non_exposure_checks` and drained into a
    /// single event when a flush is prepared.
    non_exposed_checks: RwLock<HashMap<String, u64>>,
    /// Signaled when the queue reports that it needs a flush; wakes the background loop
    /// so it can spawn a limit flush task.
    limit_flush_notify: Notify,
    /// Permits that bound how many limit flush tasks run at once
    /// (`MAX_LIMIT_FLUSH_TASKS`).
    limit_flush_semaphore: Arc<Semaphore>,
    /// Flush timing state; adjusted after flush successes and failures.
    flush_interval: FlushInterval,
    /// Signaled by `shutdown` / `force_shutdown` to stop the background loop.
    shutdown_notify: Notify,
    /// Per-instance stats sink for request successes, failures, and dropped events.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Decides whether secondary exposures should be logged as primary events.
    sec_expo_experiment: SecExpoAsPrimaryExperiment,
    /// Events reported as dropped by the queue at enqueue time; read and reset to zero
    /// the next time a flush is prepared.
    enqueue_dropped_events_count: AtomicU64,
}
65
66impl EventLogger {
67    pub fn new(
68        sdk_key: &str,
69        options: &Arc<StatsigOptions>,
70        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
71        statsig_rt: &Arc<StatsigRuntime>,
72    ) -> Arc<Self> {
73        let me = Arc::new(Self {
74            queue: EventQueue::new(
75                options
76                    .event_logging_max_queue_size
77                    .unwrap_or(DEFAULT_BATCH_SIZE),
78                options
79                    .event_logging_max_pending_batch_queue_size
80                    .unwrap_or(DEFAULT_PENDING_BATCH_COUNT_MAX),
81            ),
82            event_sampler: ExposureSampling::new(sdk_key),
83            flush_interval: FlushInterval::new(),
84            options: options.clone(),
85            logging_adapter: event_logging_adapter.clone(),
86            non_exposed_checks: RwLock::new(HashMap::new()),
87            shutdown_notify: Notify::new(),
88            limit_flush_notify: Notify::new(),
89            limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
90            ops_stats: OPS_STATS.get_for_instance(sdk_key),
91            sec_expo_experiment: SecExpoAsPrimaryExperiment::new(sdk_key, options),
92            enqueue_dropped_events_count: AtomicU64::new(0),
93        });
94
95        me.spawn_background_task(statsig_rt);
96        me
97    }
98
99    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
100        if self.options.disable_all_logging == Some(true) {
101            return;
102        }
103
104        let sec_expo_as_primary_active = self.sec_expo_experiment.should_log_as_primary();
105        let decision = self
106            .event_sampler
107            .get_sampling_decision(&operation, sec_expo_as_primary_active);
108
109        if sec_expo_as_primary_active {
110            let should_log_parent = decision.should_log();
111            let pending_event = operation.into_queued_event(decision);
112            self.enqueue_event_with_secondary_exposures_as_primary(
113                pending_event,
114                should_log_parent,
115            );
116            return;
117        }
118
119        if !decision.should_log() {
120            return;
121        }
122
123        let pending_event = operation.into_queued_event(decision);
124        self.add_pending_event(pending_event);
125    }
126
127    fn enqueue_event_with_secondary_exposures_as_primary(
128        &self,
129        mut pending_event: QueuedEvent,
130        should_log_parent: bool,
131    ) {
132        let Some(exposure_time) = pending_event.exposure_time() else {
133            if should_log_parent {
134                self.add_pending_event(pending_event);
135            }
136            return;
137        };
138        let secondary_exposures = pending_event.take_secondary_exposures_for_primary_logging();
139        if secondary_exposures.is_empty() {
140            if should_log_parent {
141                self.add_pending_event(pending_event);
142            }
143            return;
144        }
145
146        let Some(user) = pending_event.user_for_secondary_exposures() else {
147            if should_log_parent {
148                self.add_pending_event(pending_event);
149            }
150            return;
151        };
152
153        if should_log_parent {
154            self.add_pending_event(pending_event);
155        }
156
157        for secondary_exposure in secondary_exposures {
158            let operation = EnqueueSecondaryExposureAsPrimaryOp {
159                user: user.clone(),
160                secondary_exposure,
161                exposure_time,
162            };
163
164            let decision = self.event_sampler.get_sampling_decision(&operation, false);
165            if decision.should_log() {
166                self.add_pending_event(operation.into_queued_event(decision));
167            }
168        }
169    }
170
171    fn add_pending_event(&self, pending_event: QueuedEvent) {
172        match self.queue.add(pending_event) {
173            QueueAddResult::Noop => (),
174            QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
175            QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
176                self.enqueue_dropped_events_count
177                    .fetch_add(dropped_events_count, Ordering::Relaxed);
178                self.limit_flush_notify.notify_one();
179            }
180        }
181    }
182
183    pub fn increment_non_exposure_checks(&self, name: &str) {
184        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
185
186        match non_exposed_checks.get_mut(name) {
187            Some(count) => *count += 1,
188            None => {
189                non_exposed_checks.insert(name.into(), 1);
190            }
191        }
192    }
193
194    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
195        self.try_flush_all_pending_events(FlushType::Manual).await
196    }
197
198    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
199        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
200        self.shutdown_notify.notify_one();
201        result
202    }
203
204    pub fn force_shutdown(&self) {
205        self.shutdown_notify.notify_one();
206    }
207
    /// Spawns the background loop on `rt` under `BG_LOOP_TAG`.
    ///
    /// Each iteration waits for whichever happens first: the tick interval elapsing
    /// (attempt a scheduled flush), the runtime's shutdown notification, this logger's
    /// `shutdown_notify`, or `limit_flush_notify` (spawn a limit flush task). Both
    /// shutdown branches end the task. A spawn failure is logged and otherwise ignored.
    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        // The task owns its own handles to the logger and the runtime.
        let me = self.clone();
        let rt_clone = rt.clone();

        let spawn_result = rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
            let tick_interval = Duration::from_millis(tick_interval_ms);

            loop {
                // Evaluated once per iteration; gates the limit flush branch below.
                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();

                tokio::select! {
                    // A fresh sleep is created on every iteration, so the tick restarts
                    // whenever another branch (e.g. a limit flush) completes first.
                    () = tokio::time::sleep(tick_interval) => {
                        me.try_scheduled_flush().await;
                    }
                    () = rt_shutdown_notify.notified() => {
                        return; // Runtime Shutdown
                    }
                    _ = me.shutdown_notify.notified() => {
                        return; // EvtLogger Shutdown
                    }
                    // Branch is disabled while the flush interval has not fully
                    // recovered from backoff.
                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
                    }
                }

                // Runs after every iteration that did not end the task.
                me.event_sampler.try_reset_all_sampling();
            }
        });

        if let Err(e) = spawn_result {
            log_e!(TAG, "Failed to spawn background task: {e}");
        }
    }
242
243    fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
244        let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
245            Ok(permit) => permit,
246            Err(_) => return,
247        };
248
249        let me = inst.clone();
250        let spawn_result = rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
251            log_d!(TAG, "Attempting limit flush");
252            if !me.flush_next_batch(FlushType::Limit).await {
253                return;
254            }
255
256            loop {
257                if !me.flush_interval.has_completely_recovered_from_backoff() {
258                    break;
259                }
260
261                if !me.queue.contains_at_least_one_full_batch() {
262                    break;
263                }
264
265                if !me.flush_next_batch(FlushType::Limit).await {
266                    break;
267                }
268            }
269
270            drop(permit);
271        });
272
273        if let Err(e) = spawn_result {
274            log_e!(TAG, "Failed to spawn limit flush task: {e}");
275        }
276    }
277
278    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
279        self.prepare_event_queue_for_flush(flush_type);
280
281        let batches = self.queue.take_all_batches();
282
283        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
284            match self.log_batch(&mut batch, flush_type).await {
285                Ok(_) => Ok(()),
286                Err(e) => {
287                    if flush_type == FlushType::Manual {
288                        self.flush_interval.adjust_for_failure();
289                        self.try_requeue_failed_batch(&e, batch, flush_type);
290                        return Err(e);
291                    }
292
293                    self.drop_events_for_failure(&e, batch, flush_type);
294                    Err(e)
295                }
296            }
297        }))
298        .await;
299
300        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
301    }
302
303    async fn try_scheduled_flush(&self) {
304        if !self.flush_interval.has_cooled_from_most_recent_failure() {
305            return;
306        }
307
308        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
309        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
310
311        if !should_flush_by_time && !should_flush_by_size {
312            return;
313        }
314
315        self.flush_interval.mark_scheduled_flush_attempt();
316
317        self.flush_next_batch(if should_flush_by_size {
318            FlushType::ScheduledFullBatch
319        } else {
320            FlushType::ScheduledMaxTime
321        })
322        .await;
323    }
324
325    async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
326        self.prepare_event_queue_for_flush(flush_type);
327
328        let mut batch = match self.queue.take_next_batch() {
329            Some(batch) => batch,
330            None => return false,
331        };
332
333        let error = match self.log_batch(&mut batch, flush_type).await {
334            Err(e) => e,
335            Ok(()) => {
336                self.flush_interval.adjust_for_success();
337                return true;
338            }
339        };
340
341        self.flush_interval.adjust_for_failure();
342        self.try_requeue_failed_batch(&error, batch, flush_type);
343
344        false
345    }
346
347    async fn log_batch(
348        &self,
349        batch: &mut EventBatch,
350        flush_type: FlushType,
351    ) -> Result<(), StatsigErr> {
352        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
353            self.flush_interval.get_current_flush_interval_ms(),
354            self.queue.batch_size,
355            self.queue.max_pending_batches,
356            flush_type.to_string(),
357        );
358
359        let result = self
360            .logging_adapter
361            .log_events(batch.get_log_event_request(statsig_metadata))
362            .await;
363
364        batch.attempts += 1;
365
366        match result {
367            Ok(true) => {
368                self.ops_stats.log_event_request_success(batch.events.len());
369                Ok(())
370            }
371            Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
372            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
373            Err(e) => Err(e),
374        }
375    }
376
377    fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
378        self.try_add_non_exposed_checks_event();
379        self.try_log_enqueue_dropped_events();
380
381        let dropped_events_count = match self.queue.reconcile_batching() {
382            QueueReconcileResult::Success => return,
383            QueueReconcileResult::LockFailure => {
384                log_e!(TAG, "prepare_event_queue_for_flush lock failure");
385                return;
386            }
387            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
388        };
389
390        if dropped_events_count > 0 {
391            self.log_dropped_event_warning(dropped_events_count);
392
393            self.ops_stats.log_batching_dropped_events(
394                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
395                dropped_events_count,
396                &self.flush_interval,
397                &self.queue,
398                flush_type,
399            );
400        }
401    }
402
403    fn try_requeue_failed_batch(
404        &self,
405        error: &StatsigErr,
406        batch: EventBatch,
407        flush_type: FlushType,
408    ) {
409        let is_non_retryable = matches!(
410            error,
411            StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
412        );
413
414        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
415
416        if is_non_retryable || is_max_retries {
417            self.drop_events_for_failure(error, batch, flush_type);
418            return;
419        }
420
421        let dropped_events_count = match self.queue.requeue_batch(batch) {
422            QueueReconcileResult::Success => return,
423            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
424            QueueReconcileResult::LockFailure => {
425                log_e!(TAG, "try_requeue_failed_batch lock failure");
426                return;
427            }
428        };
429
430        if dropped_events_count == 0 {
431            return;
432        }
433
434        self.log_dropped_event_warning(dropped_events_count);
435
436        self.ops_stats.log_batching_dropped_events(
437            StatsigErr::LogEventError(
438                "Dropped events due to max pending event batches limit".to_string(),
439            ),
440            dropped_events_count,
441            &self.flush_interval,
442            &self.queue,
443            flush_type,
444        );
445    }
446
447    fn drop_events_for_failure(
448        &self,
449        error: &StatsigErr,
450        batch: EventBatch,
451        flush_type: FlushType,
452    ) {
453        let dropped_events_count = batch.events.len() as u64;
454
455        let kind = match flush_type {
456            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
457            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
458            FlushType::Limit => "Limit",
459            FlushType::Manual => "Manual",
460            FlushType::Shutdown => "Shutdown",
461        };
462
463        log_w!(
464            TAG,
465            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
466            kind,
467            batch.attempts,
468            dropped_events_count,
469            error
470        );
471
472        self.ops_stats
473            .log_event_request_failure(dropped_events_count, flush_type);
474    }
475
476    fn try_add_non_exposed_checks_event(&self) {
477        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
478        if non_exposed_checks.is_empty() {
479            return;
480        }
481
482        let checks = std::mem::take(&mut *non_exposed_checks);
483        let result = self.queue.add(QueuedEvent::Passthrough(
484            StatsigEventInternal::new_non_exposed_checks_event(checks),
485        ));
486
487        if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
488            self.enqueue_dropped_events_count
489                .fetch_add(dropped_events_count, Ordering::Relaxed);
490        }
491    }
492
493    fn try_log_enqueue_dropped_events(&self) {
494        let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
495        if dropped_events_count == 0 {
496            return;
497        }
498
499        self.log_dropped_event_warning(dropped_events_count);
500
501        self.ops_stats.log_batching_dropped_events(
502            StatsigErr::LogEventError(
503                "Dropped events due to max pending event batches limit".to_string(),
504            ),
505            dropped_events_count,
506            &self.flush_interval,
507            &self.queue,
508            FlushType::Limit,
509        );
510    }
511
512    fn log_dropped_event_warning(&self, dropped_events_count: u64) {
513        let approximate_pending_events_count = self.queue.approximate_pending_events_count();
514        log_w!(
515            TAG,
516            "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
517            dropped_events_count,
518            approximate_pending_events_count,
519            self.queue.max_pending_batches,
520            self.queue.batch_size
521        );
522    }
523}