// statsig_rust/event_logging/event_logger.rs

1use super::{
2    event_queue::{
3        batch::EventBatch,
4        queue::{EventQueue, QueueReconcileResult},
5        queued_event::{EnqueueOperation, QueuedEvent},
6    },
7    exposure_sampling::ExposureSampling,
8    flush_interval::FlushInterval,
9    flush_type::FlushType,
10    statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13    event_logging::{
14        event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
15    },
16    log_d, log_e, log_w,
17    networking::NetworkError,
18    observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
19    statsig_metadata::StatsigMetadata,
20    write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
21};
22use parking_lot::RwLock;
23use std::{collections::HashMap, sync::Arc};
24use std::{
25    sync::atomic::{AtomicU64, Ordering},
26    time::Duration,
27};
28use tokio::sync::{Notify, Semaphore};
29
// Bounds on events-per-batch. NOTE(review): MIN_BATCH_SIZE / MAX_BATCH_SIZE are
// exported but not referenced in this file — presumably callers clamp the
// configured queue size with them; confirm at call sites.
pub const MIN_BATCH_SIZE: u32 = 10;
pub const MAX_BATCH_SIZE: u32 = 2000;

// Bounds on how many formed batches may wait to be flushed. MIN_PENDING_BATCH_COUNT
// is likewise exported for callers; the default max is used in `EventLogger::new`.
pub const MIN_PENDING_BATCH_COUNT: u32 = 1;
pub const DEFAULT_PENDING_BATCH_COUNT_MAX: u32 = 100;

// Tags identifying the tasks this logger spawns on the Statsig runtime.
const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";
// Fallback when `StatsigOptions.event_logging_max_queue_size` is unset.
const DEFAULT_BATCH_SIZE: u32 = MAX_BATCH_SIZE;
// Permit count for the semaphore capping concurrent limit-flush tasks.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
40
/// How an exposure came to be logged.
/// NOTE(review): not referenced in this chunk — presumably `Auto` marks
/// exposures fired by regular checks and `Manual` ones fired via the manual
/// exposure APIs; confirm at call sites.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}

// Component tag used in log_d!/log_e!/log_w! output and lock-failure messages.
const TAG: &str = stringify!(EvtLogger);
48
/// Samples, batches, and flushes Statsig events through an
/// `EventLoggingAdapter`, driven by a background tick loop plus on-demand
/// "limit" flush tasks that fire when the queue fills up.
pub struct EventLogger {
    // Pending events organized into batches.
    queue: EventQueue,
    options: Arc<StatsigOptions>,
    // Transport used to actually send batches over the network.
    logging_adapter: Arc<dyn EventLoggingAdapter>,
    // Decides per-operation whether an exposure should be logged or sampled away.
    event_sampler: ExposureSampling,
    // Per-check-name counters of checks that produced no exposure event.
    non_exposed_checks: RwLock<HashMap<String, u64>>,
    // Woken when the queue reports it is full enough to flush early.
    limit_flush_notify: Notify,
    // Caps concurrent limit-flush tasks at MAX_LIMIT_FLUSH_TASKS.
    limit_flush_semaphore: Arc<Semaphore>,
    // Adaptive flush scheduling and failure-backoff state.
    flush_interval: FlushInterval,
    // Signals the background loop to exit (shutdown / force_shutdown).
    shutdown_notify: Notify,
    ops_stats: Arc<OpsStatsForInstance>,
    // Events dropped during `enqueue` that have not yet been reported.
    enqueue_dropped_events_count: AtomicU64,
}
62
63impl EventLogger {
64    pub fn new(
65        sdk_key: &str,
66        options: &Arc<StatsigOptions>,
67        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
68        statsig_rt: &Arc<StatsigRuntime>,
69    ) -> Arc<Self> {
70        let me = Arc::new(Self {
71            queue: EventQueue::new(
72                options
73                    .event_logging_max_queue_size
74                    .unwrap_or(DEFAULT_BATCH_SIZE),
75                options
76                    .event_logging_max_pending_batch_queue_size
77                    .unwrap_or(DEFAULT_PENDING_BATCH_COUNT_MAX),
78            ),
79            event_sampler: ExposureSampling::new(sdk_key),
80            flush_interval: FlushInterval::new(),
81            options: options.clone(),
82            logging_adapter: event_logging_adapter.clone(),
83            non_exposed_checks: RwLock::new(HashMap::new()),
84            shutdown_notify: Notify::new(),
85            limit_flush_notify: Notify::new(),
86            limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
87            ops_stats: OPS_STATS.get_for_instance(sdk_key),
88            enqueue_dropped_events_count: AtomicU64::new(0),
89        });
90
91        me.spawn_background_task(statsig_rt);
92        me
93    }
94
95    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
96        if self.options.disable_all_logging == Some(true) {
97            return;
98        }
99
100        let decision = self.event_sampler.get_sampling_decision(&operation);
101        if !decision.should_log() {
102            return;
103        }
104
105        let pending_event = operation.into_queued_event(decision);
106        match self.queue.add(pending_event) {
107            QueueAddResult::Noop => (),
108            QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
109            QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
110                self.enqueue_dropped_events_count
111                    .fetch_add(dropped_events_count, Ordering::Relaxed);
112                self.limit_flush_notify.notify_one();
113            }
114        }
115    }
116
117    pub fn increment_non_exposure_checks(&self, name: &str) {
118        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
119
120        match non_exposed_checks.get_mut(name) {
121            Some(count) => *count += 1,
122            None => {
123                non_exposed_checks.insert(name.into(), 1);
124            }
125        }
126    }
127
    /// Flushes every queued batch right now as a `Manual` flush, returning the
    /// first error if any batch failed to send. Failed batches are re-queued
    /// for a later retry (see `try_flush_all_pending_events`).
    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
        self.try_flush_all_pending_events(FlushType::Manual).await
    }
131
    /// Performs a final `Shutdown` flush of all pending events, then signals
    /// the background loop to exit. The shutdown notification is sent even if
    /// the flush failed; the flush result is returned to the caller.
    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
        self.shutdown_notify.notify_one();
        result
    }
137
    /// Stops the background loop immediately without flushing pending events.
    pub fn force_shutdown(&self) {
        self.shutdown_notify.notify_one();
    }
141
    /// Starts the long-lived background loop that drives scheduled flushes,
    /// reacts to queue-limit notifications, and exits on runtime or logger
    /// shutdown. A spawn failure is logged but otherwise ignored.
    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let me = self.clone();
        let rt_clone = rt.clone();

        let spawn_result = rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
            let tick_interval = Duration::from_millis(tick_interval_ms);

            loop {
                // Re-evaluated every iteration: limit flushes are serviced only
                // while the flush interval is not backing off from failures.
                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();

                tokio::select! {
                    () = tokio::time::sleep(tick_interval) => {
                        me.try_scheduled_flush().await;
                    }
                    () = rt_shutdown_notify.notified() => {
                        return; // Runtime Shutdown
                    }
                    _ = me.shutdown_notify.notified() => {
                        return; // EvtLogger Shutdown
                    }
                    // Arm is disabled (not polled) while backing off; tokio's
                    // `Notify` stores a permit, so a `notify_one` sent in the
                    // meantime is picked up once the arm is re-enabled.
                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
                    }
                }

                me.event_sampler.try_reset_all_sampling();
            }
        });

        if let Err(e) = spawn_result {
            log_e!(TAG, "Failed to spawn background task: {e}");
        }
    }
176
177    fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
178        let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
179            Ok(permit) => permit,
180            Err(_) => return,
181        };
182
183        let me = inst.clone();
184        let spawn_result = rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
185            log_d!(TAG, "Attempting limit flush");
186            if !me.flush_next_batch(FlushType::Limit).await {
187                return;
188            }
189
190            loop {
191                if !me.flush_interval.has_completely_recovered_from_backoff() {
192                    break;
193                }
194
195                if !me.queue.contains_at_least_one_full_batch() {
196                    break;
197                }
198
199                if !me.flush_next_batch(FlushType::Limit).await {
200                    break;
201                }
202            }
203
204            drop(permit);
205        });
206
207        if let Err(e) = spawn_result {
208            log_e!(TAG, "Failed to spawn limit flush task: {e}");
209        }
210    }
211
212    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
213        self.prepare_event_queue_for_flush(flush_type);
214
215        let batches = self.queue.take_all_batches();
216
217        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
218            match self.log_batch(&mut batch, flush_type).await {
219                Ok(_) => Ok(()),
220                Err(e) => {
221                    if flush_type == FlushType::Manual {
222                        self.flush_interval.adjust_for_failure();
223                        self.try_requeue_failed_batch(&e, batch, flush_type);
224                        return Err(e);
225                    }
226
227                    self.drop_events_for_failure(&e, batch, flush_type);
228                    Err(e)
229                }
230            }
231        }))
232        .await;
233
234        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
235    }
236
237    async fn try_scheduled_flush(&self) {
238        if !self.flush_interval.has_cooled_from_most_recent_failure() {
239            return;
240        }
241
242        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
243        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
244
245        if !should_flush_by_time && !should_flush_by_size {
246            return;
247        }
248
249        self.flush_interval.mark_scheduled_flush_attempt();
250
251        self.flush_next_batch(if should_flush_by_size {
252            FlushType::ScheduledFullBatch
253        } else {
254            FlushType::ScheduledMaxTime
255        })
256        .await;
257    }
258
259    async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
260        self.prepare_event_queue_for_flush(flush_type);
261
262        let mut batch = match self.queue.take_next_batch() {
263            Some(batch) => batch,
264            None => return false,
265        };
266
267        let error = match self.log_batch(&mut batch, flush_type).await {
268            Err(e) => e,
269            Ok(()) => {
270                self.flush_interval.adjust_for_success();
271                return true;
272            }
273        };
274
275        self.flush_interval.adjust_for_failure();
276        self.try_requeue_failed_batch(&error, batch, flush_type);
277
278        false
279    }
280
281    async fn log_batch(
282        &self,
283        batch: &mut EventBatch,
284        flush_type: FlushType,
285    ) -> Result<(), StatsigErr> {
286        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
287            self.flush_interval.get_current_flush_interval_ms(),
288            self.queue.batch_size,
289            self.queue.max_pending_batches,
290            flush_type.to_string(),
291        );
292
293        let result = self
294            .logging_adapter
295            .log_events(batch.get_log_event_request(statsig_metadata))
296            .await;
297
298        batch.attempts += 1;
299
300        match result {
301            Ok(true) => {
302                self.ops_stats.log_event_request_success(batch.events.len());
303                Ok(())
304            }
305            Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
306            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
307            Err(e) => Err(e),
308        }
309    }
310
311    fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
312        self.try_add_non_exposed_checks_event();
313        self.try_log_enqueue_dropped_events();
314
315        let dropped_events_count = match self.queue.reconcile_batching() {
316            QueueReconcileResult::Success => return,
317            QueueReconcileResult::LockFailure => {
318                log_e!(TAG, "prepare_event_queue_for_flush lock failure");
319                return;
320            }
321            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
322        };
323
324        if dropped_events_count > 0 {
325            self.log_dropped_event_warning(dropped_events_count);
326
327            self.ops_stats.log_batching_dropped_events(
328                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
329                dropped_events_count,
330                &self.flush_interval,
331                &self.queue,
332                flush_type,
333            );
334        }
335    }
336
337    fn try_requeue_failed_batch(
338        &self,
339        error: &StatsigErr,
340        batch: EventBatch,
341        flush_type: FlushType,
342    ) {
343        let is_non_retryable = matches!(
344            error,
345            StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
346        );
347
348        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
349
350        if is_non_retryable || is_max_retries {
351            self.drop_events_for_failure(error, batch, flush_type);
352            return;
353        }
354
355        let dropped_events_count = match self.queue.requeue_batch(batch) {
356            QueueReconcileResult::Success => return,
357            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
358            QueueReconcileResult::LockFailure => {
359                log_e!(TAG, "try_requeue_failed_batch lock failure");
360                return;
361            }
362        };
363
364        if dropped_events_count == 0 {
365            return;
366        }
367
368        self.log_dropped_event_warning(dropped_events_count);
369
370        self.ops_stats.log_batching_dropped_events(
371            StatsigErr::LogEventError(
372                "Dropped events due to max pending event batches limit".to_string(),
373            ),
374            dropped_events_count,
375            &self.flush_interval,
376            &self.queue,
377            flush_type,
378        );
379    }
380
381    fn drop_events_for_failure(
382        &self,
383        error: &StatsigErr,
384        batch: EventBatch,
385        flush_type: FlushType,
386    ) {
387        let dropped_events_count = batch.events.len() as u64;
388
389        let kind = match flush_type {
390            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
391            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
392            FlushType::Limit => "Limit",
393            FlushType::Manual => "Manual",
394            FlushType::Shutdown => "Shutdown",
395        };
396
397        log_w!(
398            TAG,
399            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
400            kind,
401            batch.attempts,
402            dropped_events_count,
403            error
404        );
405
406        self.ops_stats
407            .log_event_request_failure(dropped_events_count, flush_type);
408    }
409
410    fn try_add_non_exposed_checks_event(&self) {
411        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
412        if non_exposed_checks.is_empty() {
413            return;
414        }
415
416        let checks = std::mem::take(&mut *non_exposed_checks);
417        let result = self.queue.add(QueuedEvent::Passthrough(
418            StatsigEventInternal::new_non_exposed_checks_event(checks),
419        ));
420
421        if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
422            self.enqueue_dropped_events_count
423                .fetch_add(dropped_events_count, Ordering::Relaxed);
424        }
425    }
426
427    fn try_log_enqueue_dropped_events(&self) {
428        let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
429        if dropped_events_count == 0 {
430            return;
431        }
432
433        self.log_dropped_event_warning(dropped_events_count);
434
435        self.ops_stats.log_batching_dropped_events(
436            StatsigErr::LogEventError(
437                "Dropped events due to max pending event batches limit".to_string(),
438            ),
439            dropped_events_count,
440            &self.flush_interval,
441            &self.queue,
442            FlushType::Limit,
443        );
444    }
445
446    fn log_dropped_event_warning(&self, dropped_events_count: u64) {
447        let approximate_pending_events_count = self.queue.approximate_pending_events_count();
448        log_w!(
449            TAG,
450            "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
451            dropped_events_count,
452            approximate_pending_events_count,
453            self.queue.max_pending_batches,
454            self.queue.batch_size
455        );
456    }
457}