// statsig_rust/event_logging/event_logger.rs

1use super::{
2    event_queue::{
3        batch::EventBatch,
4        queue::{EventQueue, QueueReconcileResult},
5        queued_event::{EnqueueOperation, QueuedEvent},
6    },
7    exposure_sampling::ExposureSampling,
8    flush_interval::FlushInterval,
9    flush_type::FlushType,
10    statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13    event_logging::{
14        event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
15    },
16    log_d, log_e, log_w,
17    networking::NetworkError,
18    observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
19    statsig_metadata::StatsigMetadata,
20    write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
21};
22use std::{
23    collections::HashMap,
24    sync::{Arc, RwLock},
25};
26use std::{
27    sync::atomic::{AtomicU64, Ordering},
28    time::Duration,
29};
30use tokio::sync::{Notify, Semaphore};
31
/// Task tag for the long-lived background flush loop started by `EventLogger::new`.
const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
/// Task tag for the short-lived tasks spawned when the queue signals it needs a flush.
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";

/// Fallback for `StatsigOptions::event_logging_max_queue_size` when it is unset.
const DEFAULT_BATCH_SIZE: u32 = 2_000;
/// Fallback for `StatsigOptions::event_logging_max_pending_batch_queue_size` when it is unset.
const DEFAULT_PENDING_BATCH_MAX: u32 = 100;

/// Number of permits in the limit-flush semaphore, i.e. the cap on concurrently running
/// limit-flush tasks.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
37
/// Records how an exposure came about.
///
/// NOTE(review): the variants are not consumed in this file; the descriptions below are
/// inferred from the variant names — confirm against callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExposureTrigger {
    /// Presumably: exposure logged implicitly as part of an evaluation.
    Auto,
    /// Presumably: exposure logged through an explicit, caller-initiated API.
    Manual,
}
43
/// Tag passed to this file's logging macros and `write_lock_or_noop!` calls.
const TAG: &str = "EvtLogger";
45
46pub struct EventLogger {
47    queue: EventQueue,
48    options: Arc<StatsigOptions>,
49    logging_adapter: Arc<dyn EventLoggingAdapter>,
50    event_sampler: ExposureSampling,
51    non_exposed_checks: RwLock<HashMap<String, u64>>,
52    limit_flush_notify: Notify,
53    limit_flush_semaphore: Arc<Semaphore>,
54    flush_interval: FlushInterval,
55    shutdown_notify: Notify,
56    ops_stats: Arc<OpsStatsForInstance>,
57    enqueue_dropped_events_count: AtomicU64,
58}
59
60impl EventLogger {
61    pub fn new(
62        sdk_key: &str,
63        options: &Arc<StatsigOptions>,
64        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
65        statsig_rt: &Arc<StatsigRuntime>,
66    ) -> Arc<Self> {
67        let me = Arc::new(Self {
68            queue: EventQueue::new(
69                options
70                    .event_logging_max_queue_size
71                    .unwrap_or(DEFAULT_BATCH_SIZE),
72                options
73                    .event_logging_max_pending_batch_queue_size
74                    .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
75            ),
76            event_sampler: ExposureSampling::new(sdk_key),
77            flush_interval: FlushInterval::new(),
78            options: options.clone(),
79            logging_adapter: event_logging_adapter.clone(),
80            non_exposed_checks: RwLock::new(HashMap::new()),
81            shutdown_notify: Notify::new(),
82            limit_flush_notify: Notify::new(),
83            limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
84            ops_stats: OPS_STATS.get_for_instance(sdk_key),
85            enqueue_dropped_events_count: AtomicU64::new(0),
86        });
87
88        me.spawn_background_task(statsig_rt);
89        me
90    }
91
92    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
93        if self.options.disable_all_logging == Some(true) {
94            return;
95        }
96
97        let decision = self.event_sampler.get_sampling_decision(&operation);
98        if !decision.should_log() {
99            return;
100        }
101
102        let pending_event = operation.into_queued_event(decision);
103        match self.queue.add(pending_event) {
104            QueueAddResult::Noop => (),
105            QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
106            QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
107                self.enqueue_dropped_events_count
108                    .fetch_add(dropped_events_count, Ordering::Relaxed);
109                self.limit_flush_notify.notify_one();
110            }
111        }
112    }
113
114    pub fn increment_non_exposure_checks(&self, name: &str) {
115        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
116
117        match non_exposed_checks.get_mut(name) {
118            Some(count) => *count += 1,
119            None => {
120                non_exposed_checks.insert(name.into(), 1);
121            }
122        }
123    }
124
125    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
126        self.try_flush_all_pending_events(FlushType::Manual).await
127    }
128
129    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
130        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
131        self.shutdown_notify.notify_one();
132        result
133    }
134
135    pub fn force_shutdown(&self) {
136        self.shutdown_notify.notify_one();
137    }
138
139    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
140        let me = self.clone();
141        let rt_clone = rt.clone();
142
143        rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
144            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
145            let tick_interval = Duration::from_millis(tick_interval_ms);
146
147            loop {
148                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();
149
150                tokio::select! {
151                    () = tokio::time::sleep(tick_interval) => {
152                        me.try_scheduled_flush().await;
153                    }
154                    () = rt_shutdown_notify.notified() => {
155                        return; // Runtime Shutdown
156                    }
157                    _ = me.shutdown_notify.notified() => {
158                        return; // EvtLogger Shutdown
159                    }
160                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
161                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
162                    }
163                }
164
165                me.event_sampler.try_reset_all_sampling();
166            }
167        });
168    }
169
170    fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
171        let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
172            Ok(permit) => permit,
173            Err(_) => return,
174        };
175
176        let me = inst.clone();
177        rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
178            log_d!(TAG, "Attempting limit flush");
179            if !me.flush_next_batch(FlushType::Limit).await {
180                return;
181            }
182
183            loop {
184                if !me.flush_interval.has_completely_recovered_from_backoff() {
185                    break;
186                }
187
188                if !me.queue.contains_at_least_one_full_batch() {
189                    break;
190                }
191
192                if !me.flush_next_batch(FlushType::Limit).await {
193                    break;
194                }
195            }
196
197            drop(permit);
198        });
199    }
200
201    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
202        self.prepare_event_queue_for_flush(flush_type);
203
204        let batches = self.queue.take_all_batches();
205
206        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
207            match self.log_batch(&mut batch, flush_type).await {
208                Ok(_) => Ok(()),
209                Err(e) => {
210                    if flush_type == FlushType::Manual {
211                        self.flush_interval.adjust_for_failure();
212                        self.try_requeue_failed_batch(&e, batch, flush_type);
213                        return Err(e);
214                    }
215
216                    self.drop_events_for_failure(&e, batch, flush_type);
217                    Err(e)
218                }
219            }
220        }))
221        .await;
222
223        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
224    }
225
226    async fn try_scheduled_flush(&self) {
227        if !self.flush_interval.has_cooled_from_most_recent_failure() {
228            return;
229        }
230
231        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
232        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
233
234        if !should_flush_by_time && !should_flush_by_size {
235            return;
236        }
237
238        self.flush_interval.mark_scheduled_flush_attempt();
239
240        self.flush_next_batch(if should_flush_by_size {
241            FlushType::ScheduledFullBatch
242        } else {
243            FlushType::ScheduledMaxTime
244        })
245        .await;
246    }
247
248    async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
249        self.prepare_event_queue_for_flush(flush_type);
250
251        let mut batch = match self.queue.take_next_batch() {
252            Some(batch) => batch,
253            None => return false,
254        };
255
256        let error = match self.log_batch(&mut batch, flush_type).await {
257            Err(e) => e,
258            Ok(()) => {
259                self.flush_interval.adjust_for_success();
260                return true;
261            }
262        };
263
264        self.flush_interval.adjust_for_failure();
265        self.try_requeue_failed_batch(&error, batch, flush_type);
266
267        false
268    }
269
270    async fn log_batch(
271        &self,
272        batch: &mut EventBatch,
273        flush_type: FlushType,
274    ) -> Result<(), StatsigErr> {
275        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
276            self.flush_interval.get_current_flush_interval_ms(),
277            self.queue.batch_size,
278            self.queue.max_pending_batches,
279            flush_type.to_string(),
280        );
281
282        let result = self
283            .logging_adapter
284            .log_events(batch.get_log_event_request(statsig_metadata))
285            .await;
286
287        batch.attempts += 1;
288
289        match result {
290            Ok(true) => {
291                self.ops_stats.log_event_request_success(batch.events.len());
292                Ok(())
293            }
294            Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
295            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
296            Err(e) => Err(e),
297        }
298    }
299
300    fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
301        self.try_add_non_exposed_checks_event();
302        self.try_log_enqueue_dropped_events();
303
304        let dropped_events_count = match self.queue.reconcile_batching() {
305            QueueReconcileResult::Success => return,
306            QueueReconcileResult::LockFailure => {
307                log_e!(TAG, "prepare_event_queue_for_flush lock failure");
308                return;
309            }
310            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
311        };
312
313        if dropped_events_count > 0 {
314            self.log_dropped_event_warning(dropped_events_count);
315
316            self.ops_stats.log_batching_dropped_events(
317                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
318                dropped_events_count,
319                &self.flush_interval,
320                &self.queue,
321                flush_type,
322            );
323        }
324    }
325
326    fn try_requeue_failed_batch(
327        &self,
328        error: &StatsigErr,
329        batch: EventBatch,
330        flush_type: FlushType,
331    ) {
332        let is_non_retryable = matches!(
333            error,
334            StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
335        );
336
337        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
338
339        if is_non_retryable || is_max_retries {
340            self.drop_events_for_failure(error, batch, flush_type);
341            return;
342        }
343
344        let dropped_events_count = match self.queue.requeue_batch(batch) {
345            QueueReconcileResult::Success => return,
346            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
347            QueueReconcileResult::LockFailure => {
348                log_e!(TAG, "try_requeue_failed_batch lock failure");
349                return;
350            }
351        };
352
353        if dropped_events_count == 0 {
354            return;
355        }
356
357        self.log_dropped_event_warning(dropped_events_count);
358
359        self.ops_stats.log_batching_dropped_events(
360            StatsigErr::LogEventError(
361                "Dropped events due to max pending event batches limit".to_string(),
362            ),
363            dropped_events_count,
364            &self.flush_interval,
365            &self.queue,
366            flush_type,
367        );
368    }
369
370    fn drop_events_for_failure(
371        &self,
372        error: &StatsigErr,
373        batch: EventBatch,
374        flush_type: FlushType,
375    ) {
376        let dropped_events_count = batch.events.len() as u64;
377
378        let kind = match flush_type {
379            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
380            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
381            FlushType::Limit => "Limit",
382            FlushType::Manual => "Manual",
383            FlushType::Shutdown => "Shutdown",
384        };
385
386        log_w!(
387            TAG,
388            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
389            kind,
390            batch.attempts,
391            dropped_events_count,
392            error
393        );
394
395        self.ops_stats
396            .log_event_request_failure(dropped_events_count, flush_type);
397    }
398
399    fn try_add_non_exposed_checks_event(&self) {
400        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
401        if non_exposed_checks.is_empty() {
402            return;
403        }
404
405        let checks = std::mem::take(&mut *non_exposed_checks);
406        let result = self.queue.add(QueuedEvent::Passthrough(
407            StatsigEventInternal::new_non_exposed_checks_event(checks),
408        ));
409
410        if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
411            self.enqueue_dropped_events_count
412                .fetch_add(dropped_events_count, Ordering::Relaxed);
413        }
414    }
415
416    fn try_log_enqueue_dropped_events(&self) {
417        let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
418        if dropped_events_count == 0 {
419            return;
420        }
421
422        self.log_dropped_event_warning(dropped_events_count);
423
424        self.ops_stats.log_batching_dropped_events(
425            StatsigErr::LogEventError(
426                "Dropped events due to max pending event batches limit".to_string(),
427            ),
428            dropped_events_count,
429            &self.flush_interval,
430            &self.queue,
431            FlushType::Limit,
432        );
433    }
434
435    fn log_dropped_event_warning(&self, dropped_events_count: u64) {
436        let approximate_pending_events_count = self.queue.approximate_pending_events_count();
437        log_w!(
438            TAG,
439            "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
440            dropped_events_count,
441            approximate_pending_events_count,
442            self.queue.max_pending_batches,
443            self.queue.batch_size
444        );
445    }
446}