statsig_rust/event_logging/event_logger.rs

use super::{
    event_queue::{
        batch::EventBatch,
        queue::{EventQueue, QueueReconcileResult},
        queued_event::{EnqueueOperation, QueuedEvent},
    },
    exposure_sampling::ExposureSampling,
    flush_interval::FlushInterval,
    flush_type::FlushType,
    statsig_event_internal::StatsigEventInternal,
};
use crate::{
    event_logging::{
        event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
    },
    log_d, log_e, log_w,
    networking::NetworkError,
    observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
    statsig_metadata::StatsigMetadata,
    write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
};
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
use std::{
    sync::atomic::{AtomicU64, Ordering},
    time::Duration,
};
use tokio::sync::{Notify, Semaphore};

const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";
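// Queue sizing defaults (used when the corresponding StatsigOptions values are unset)
// and the cap on concurrently running limit-flush tasks.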
const DEFAULT_BATCH_SIZE: u32 = 2000;
const DEFAULT_PENDING_BATCH_MAX: u32 = 100;
const MAX_LIMIT_FLUSH_TASKS: usize = 5;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}

const TAG: &str = stringify!(EvtLogger);

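/// Collects Statsig events into batches and flushes them through the
/// configured `EventLoggingAdapter`, applying exposure sampling, flush
/// interval backoff, and queue-limit protections along the way.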
pub struct EventLogger {
    queue: EventQueue,
    options: Arc<StatsigOptions>,
    logging_adapter: Arc<dyn EventLoggingAdapter>,
    event_sampler: ExposureSampling,
    non_exposed_checks: RwLock<HashMap<String, u64>>,
    limit_flush_notify: Notify,
    limit_flush_semaphore: Arc<Semaphore>,
    flush_interval: FlushInterval,
    shutdown_notify: Notify,
    ops_stats: Arc<OpsStatsForInstance>,
    enqueue_dropped_events_count: AtomicU64,
}

impl EventLogger {
    pub fn new(
        sdk_key: &str,
        options: &Arc<StatsigOptions>,
        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
        statsig_rt: &Arc<StatsigRuntime>,
    ) -> Arc<Self> {
        let me = Arc::new(Self {
            queue: EventQueue::new(
                options
                    .event_logging_max_queue_size
                    .unwrap_or(DEFAULT_BATCH_SIZE),
                options
                    .event_logging_max_pending_batch_queue_size
                    .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
            ),
            event_sampler: ExposureSampling::new(sdk_key),
            flush_interval: FlushInterval::new(),
            options: options.clone(),
            logging_adapter: event_logging_adapter.clone(),
            non_exposed_checks: RwLock::new(HashMap::new()),
            shutdown_notify: Notify::new(),
            limit_flush_notify: Notify::new(),
            limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
            ops_stats: OPS_STATS.get_for_instance(sdk_key),
            enqueue_dropped_events_count: AtomicU64::new(0),
        });

        me.spawn_background_task(statsig_rt);
        me
    }

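    /// Runs the sampling decision for the operation and, if it should be
    /// logged, adds it to the queue. When the queue reports a full batch (or
    /// dropped events), wakes the limit-flush task via `limit_flush_notify`.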
    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
        if self.options.disable_all_logging == Some(true) {
            return;
        }

        let decision = self.event_sampler.get_sampling_decision(&operation);
        if !decision.should_log() {
            return;
        }

        let pending_event = operation.into_queued_event(decision);
        match self.queue.add(pending_event) {
            QueueAddResult::Noop => (),
            QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
            QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
                self.enqueue_dropped_events_count
                    .fetch_add(dropped_events_count, Ordering::Relaxed);
                self.limit_flush_notify.notify_one();
            }
        }
    }

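    /// Counts a check that was evaluated without logging an exposure. The
    /// accumulated counts are emitted later as a single summary event.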
    pub fn increment_non_exposure_checks(&self, name: &str) {
        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);

        match non_exposed_checks.get_mut(name) {
            Some(count) => *count += 1,
            None => {
                non_exposed_checks.insert(name.into(), 1);
            }
        }
    }

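    /// Flushes every pending batch on demand.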
    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
        self.try_flush_all_pending_events(FlushType::Manual).await
    }

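    /// Flushes all pending events, then signals the background loop to stop.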
    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
        self.shutdown_notify.notify_one();
        result
    }

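    /// Stops the background loop without flushing pending events.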
    pub fn force_shutdown(&self) {
        self.shutdown_notify.notify_one();
    }

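    /// Spawns the background loop that runs scheduled flushes on a fixed tick,
    /// dispatches limit flushes when notified, and exits on shutdown.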
    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let me = self.clone();
        let rt_clone = rt.clone();

        let spawn_result = rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
            let tick_interval = Duration::from_millis(tick_interval_ms);

            loop {
                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();

                tokio::select! {
                    () = tokio::time::sleep(tick_interval) => {
                        me.try_scheduled_flush().await;
                    }
                    () = rt_shutdown_notify.notified() => {
                        return; // Runtime Shutdown
                    }
                    _ = me.shutdown_notify.notified() => {
                        return; // EvtLogger Shutdown
                    }
                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
                    }
                }

                me.event_sampler.try_reset_all_sampling();
            }
        });

        if let Err(e) = spawn_result {
            log_e!(TAG, "Failed to spawn background task: {e}");
        }
    }

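    /// Spawns a task that flushes full batches back to back for as long as the
    /// flush interval stays healthy. Concurrency is capped by
    /// `limit_flush_semaphore`; if no permit is available the call is a no-op.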
    fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => return,
        };

        let me = inst.clone();
        let spawn_result = rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
            log_d!(TAG, "Attempting limit flush");
            if !me.flush_next_batch(FlushType::Limit).await {
                return;
            }

            loop {
                if !me.flush_interval.has_completely_recovered_from_backoff() {
                    break;
                }

                if !me.queue.contains_at_least_one_full_batch() {
                    break;
                }

                if !me.flush_next_batch(FlushType::Limit).await {
                    break;
                }
            }

            drop(permit);
        });

        if let Err(e) = spawn_result {
            log_e!(TAG, "Failed to spawn limit flush task: {e}");
        }
    }

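    /// Flushes every pending batch concurrently. Failed batches are requeued
    /// for manual flushes and dropped for shutdown flushes. Returns the first
    /// error encountered, if any.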
    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
        self.prepare_event_queue_for_flush(flush_type);

        let batches = self.queue.take_all_batches();

        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
            match self.log_batch(&mut batch, flush_type).await {
                Ok(_) => Ok(()),
                Err(e) => {
                    if flush_type == FlushType::Manual {
                        self.flush_interval.adjust_for_failure();
                        self.try_requeue_failed_batch(&e, batch, flush_type);
                        return Err(e);
                    }

                    self.drop_events_for_failure(&e, batch, flush_type);
                    Err(e)
                }
            }
        }))
        .await;

        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
    }

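    /// Flushes one batch when either the maximum flush interval has elapsed or
    /// a full batch is waiting, provided the failure cooldown has passed.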
    async fn try_scheduled_flush(&self) {
        if !self.flush_interval.has_cooled_from_most_recent_failure() {
            return;
        }

        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();

        if !should_flush_by_time && !should_flush_by_size {
            return;
        }

        self.flush_interval.mark_scheduled_flush_attempt();

        self.flush_next_batch(if should_flush_by_size {
            FlushType::ScheduledFullBatch
        } else {
            FlushType::ScheduledMaxTime
        })
        .await;
    }

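    /// Takes the next batch from the queue and attempts to send it, adjusting
    /// the flush interval on success or failure. Returns true only when a
    /// batch was sent successfully.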
    async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
        self.prepare_event_queue_for_flush(flush_type);

        let mut batch = match self.queue.take_next_batch() {
            Some(batch) => batch,
            None => return false,
        };

        let error = match self.log_batch(&mut batch, flush_type).await {
            Err(e) => e,
            Ok(()) => {
                self.flush_interval.adjust_for_success();
                return true;
            }
        };

        self.flush_interval.adjust_for_failure();
        self.try_requeue_failed_batch(&error, batch, flush_type);

        false
    }

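    /// Sends a single batch through the logging adapter, attaching flush
    /// metadata to the request and incrementing the batch's attempt count.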
    async fn log_batch(
        &self,
        batch: &mut EventBatch,
        flush_type: FlushType,
    ) -> Result<(), StatsigErr> {
        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
            self.flush_interval.get_current_flush_interval_ms(),
            self.queue.batch_size,
            self.queue.max_pending_batches,
            flush_type.to_string(),
        );

        let result = self
            .logging_adapter
            .log_events(batch.get_log_event_request(statsig_metadata))
            .await;

        batch.attempts += 1;

        match result {
            Ok(true) => {
                self.ops_stats.log_event_request_success(batch.events.len());
                Ok(())
            }
            Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
            Err(e) => Err(e),
        }
    }

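    /// Moves pending bookkeeping events into the queue and reconciles batching,
    /// reporting any events dropped because of queue limits.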
    fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
        self.try_add_non_exposed_checks_event();
        self.try_log_enqueue_dropped_events();

        let dropped_events_count = match self.queue.reconcile_batching() {
            QueueReconcileResult::Success => return,
            QueueReconcileResult::LockFailure => {
                log_e!(TAG, "prepare_event_queue_for_flush lock failure");
                return;
            }
            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
        };

        if dropped_events_count > 0 {
            self.log_dropped_event_warning(dropped_events_count);

            self.ops_stats.log_batching_dropped_events(
                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
                dropped_events_count,
                &self.flush_interval,
                &self.queue,
                flush_type,
            );
        }
    }

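    /// Puts a failed batch back in the queue unless the error is non-retryable
    /// or the batch has exhausted its retries, in which case its events are
    /// dropped.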
    fn try_requeue_failed_batch(
        &self,
        error: &StatsigErr,
        batch: EventBatch,
        flush_type: FlushType,
    ) {
        let is_non_retryable = matches!(
            error,
            StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
        );

        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();

        if is_non_retryable || is_max_retries {
            self.drop_events_for_failure(error, batch, flush_type);
            return;
        }

        let dropped_events_count = match self.queue.requeue_batch(batch) {
            QueueReconcileResult::Success => return,
            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
            QueueReconcileResult::LockFailure => {
                log_e!(TAG, "try_requeue_failed_batch lock failure");
                return;
            }
        };

        if dropped_events_count == 0 {
            return;
        }

        self.log_dropped_event_warning(dropped_events_count);

        self.ops_stats.log_batching_dropped_events(
            StatsigErr::LogEventError(
                "Dropped events due to max pending event batches limit".to_string(),
            ),
            dropped_events_count,
            &self.flush_interval,
            &self.queue,
            flush_type,
        );
    }

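    /// Logs a warning and records ops stats for a batch whose events are being
    /// dropped after a failed flush.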
    fn drop_events_for_failure(
        &self,
        error: &StatsigErr,
        batch: EventBatch,
        flush_type: FlushType,
    ) {
        let dropped_events_count = batch.events.len() as u64;

        let kind = match flush_type {
            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
            FlushType::Limit => "Limit",
            FlushType::Manual => "Manual",
            FlushType::Shutdown => "Shutdown",
        };

        log_w!(
            TAG,
            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
            kind,
            batch.attempts,
            dropped_events_count,
            error
        );

        self.ops_stats
            .log_event_request_failure(dropped_events_count, flush_type);
    }

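    /// Drains the accumulated non-exposed check counts into a single
    /// passthrough event and enqueues it, tracking any events dropped in the
    /// process.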
    fn try_add_non_exposed_checks_event(&self) {
        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
        if non_exposed_checks.is_empty() {
            return;
        }

        let checks = std::mem::take(&mut *non_exposed_checks);
        let result = self.queue.add(QueuedEvent::Passthrough(
            StatsigEventInternal::new_non_exposed_checks_event(checks),
        ));

        if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
            self.enqueue_dropped_events_count
                .fetch_add(dropped_events_count, Ordering::Relaxed);
        }
    }

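    /// Reports, and resets, the count of events that were dropped at enqueue
    /// time because the queue was full.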
    fn try_log_enqueue_dropped_events(&self) {
        let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
        if dropped_events_count == 0 {
            return;
        }

        self.log_dropped_event_warning(dropped_events_count);

        self.ops_stats.log_batching_dropped_events(
            StatsigErr::LogEventError(
                "Dropped events due to max pending event batches limit".to_string(),
            ),
            dropped_events_count,
            &self.flush_interval,
            &self.queue,
            FlushType::Limit,
        );
    }

    fn log_dropped_event_warning(&self, dropped_events_count: u64) {
        let approximate_pending_events_count = self.queue.approximate_pending_events_count();
        log_w!(
            TAG,
            "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
            dropped_events_count,
            approximate_pending_events_count,
            self.queue.max_pending_batches,
            self.queue.batch_size
        );
    }
}
452}