// statsig_rust/event_logging/event_logger.rs

1use super::{
2    event_queue::{
3        batch::EventBatch,
4        queue::{EventQueue, QueueResult},
5        queued_event::{EnqueueOperation, QueuedEvent},
6    },
7    exposure_sampling::ExposureSampling,
8    flush_interval::FlushInterval,
9    statsig_event_internal::StatsigEventInternal,
10};
11use crate::{
12    event_logging::event_logger_constants::EventLoggerConstants,
13    log_d, log_w,
14    networking::NetworkError,
15    observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
16    statsig_metadata::StatsigMetadata,
17    write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
18};
19use std::time::Duration;
20use std::{
21    collections::HashMap,
22    sync::{Arc, RwLock},
23};
24use tokio::sync::Notify;
25
/// Tag passed to `StatsigRuntime::spawn` for the logger's background flush task.
const BG_TASK_TAG: &str = "EVENT_LOGGER_BG_TASK";

/// Fallback for `StatsigOptions::event_logging_max_queue_size` when the option is unset.
const DEFAULT_BATCH_SIZE: u32 = 2_000;

/// Fallback for `StatsigOptions::event_logging_max_pending_batch_queue_size` when unset.
const DEFAULT_PENDING_BATCH_MAX: u32 = 20;
29
/// How an exposure was triggered.
///
/// NOTE(review): presumably `Auto` marks exposures logged as a side effect of an evaluation
/// and `Manual` marks exposures the caller logged explicitly — confirm against call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}
35
/// Log tag used by this module's `log_d!` / `log_w!` calls and `write_lock_or_noop!`.
const TAG: &str = "EvtLogger";
37
38pub struct EventLogger {
39    queue: EventQueue,
40    options: Arc<StatsigOptions>,
41    logging_adapter: Arc<dyn EventLoggingAdapter>,
42    event_sampler: ExposureSampling,
43    non_exposed_checks: RwLock<HashMap<String, u64>>,
44    limit_flush_notify: Notify,
45    flush_interval: FlushInterval,
46    shutdown_notify: Notify,
47    ops_stats: Arc<OpsStatsForInstance>,
48}
49
50impl EventLogger {
51    pub fn new(
52        sdk_key: &str,
53        options: &Arc<StatsigOptions>,
54        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
55        statsig_rt: &Arc<StatsigRuntime>,
56    ) -> Arc<Self> {
57        let me = Arc::new(Self {
58            queue: EventQueue::new(
59                options
60                    .event_logging_max_queue_size
61                    .unwrap_or(DEFAULT_BATCH_SIZE),
62                options
63                    .event_logging_max_pending_batch_queue_size
64                    .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
65            ),
66            event_sampler: ExposureSampling::new(sdk_key),
67            flush_interval: FlushInterval::new(),
68            options: options.clone(),
69            logging_adapter: event_logging_adapter.clone(),
70            non_exposed_checks: RwLock::new(HashMap::new()),
71            shutdown_notify: Notify::new(),
72            limit_flush_notify: Notify::new(),
73            ops_stats: OPS_STATS.get_for_instance(sdk_key),
74        });
75
76        me.spawn_background_task(statsig_rt);
77        me
78    }
79
80    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
81        if self.options.disable_all_logging == Some(true) {
82            return;
83        }
84
85        let decision = self.event_sampler.get_sampling_decision(&operation);
86        if !decision.should_log() {
87            return;
88        }
89
90        let pending_event = operation.into_queued_event(decision);
91        if self.queue.add(pending_event) {
92            self.limit_flush_notify.notify_one();
93        }
94    }
95
96    pub fn increment_non_exposure_checks(&self, name: &str) {
97        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
98
99        match non_exposed_checks.get_mut(name) {
100            Some(count) => *count += 1,
101            None => {
102                non_exposed_checks.insert(name.into(), 1);
103            }
104        }
105    }
106
107    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
108        self.try_flush_all_pending_events(FlushType::Manual).await
109    }
110
111    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
112        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
113        self.shutdown_notify.notify_one();
114        result
115    }
116
117    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
118        let me = self.clone();
119
120        rt.spawn(BG_TASK_TAG, |rt_shutdown_notify| async move {
121            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
122            let tick_interval = Duration::from_millis(tick_interval_ms);
123
124            loop {
125                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();
126
127                tokio::select! {
128                    () = tokio::time::sleep(tick_interval) => {
129                        me.try_scheduled_flush().await;
130                    }
131                    () = rt_shutdown_notify.notified() => {
132                        return; // Runtime Shutdown
133                    }
134                    _ = me.shutdown_notify.notified() => {
135                        return; // EvtLogger Shutdown
136                    }
137                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
138                        me.try_limit_flush().await;
139                    }
140                }
141
142                me.event_sampler.try_reset_all_sampling();
143            }
144        });
145    }
146
147    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
148        self.prepare_event_queue_for_flush();
149
150        let batches = self.queue.take_all_batches();
151
152        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
153            match self.log_batch(&mut batch, flush_type).await {
154                Ok(_) => Ok(()),
155                Err(e) => {
156                    if flush_type == FlushType::Manual {
157                        self.flush_interval.adjust_for_failure();
158                        self.try_requeue_failed_batch(&e, batch, flush_type);
159                        return Err(e);
160                    }
161
162                    self.drop_failed_events(&e, batch, flush_type);
163                    Err(e)
164                }
165            }
166        }))
167        .await;
168
169        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
170    }
171
172    async fn try_scheduled_flush(&self) {
173        if !self.flush_interval.has_cooled_from_most_recent_failure() {
174            return;
175        }
176
177        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
178        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
179
180        if !should_flush_by_time && !should_flush_by_size {
181            return;
182        }
183
184        self.flush_interval.mark_scheduled_flush_attempt();
185
186        self.flush_next_batch(if should_flush_by_size {
187            FlushType::ScheduledFullBatch
188        } else {
189            FlushType::ScheduledMaxTime
190        })
191        .await;
192    }
193
194    async fn try_limit_flush(&self) {
195        log_d!(TAG, "Attempting limit flush");
196        self.flush_next_batch(FlushType::Limit).await;
197    }
198
199    async fn flush_next_batch(&self, flush_type: FlushType) {
200        self.prepare_event_queue_for_flush();
201
202        let mut batch = match self.queue.take_next_batch() {
203            Some(batch) => batch,
204            None => return,
205        };
206
207        let error = match self.log_batch(&mut batch, flush_type).await {
208            Err(e) => e,
209            Ok(()) => {
210                self.flush_interval.adjust_for_success();
211                return;
212            }
213        };
214
215        self.flush_interval.adjust_for_failure();
216        self.try_requeue_failed_batch(&error, batch, flush_type);
217    }
218
219    async fn log_batch(
220        &self,
221        batch: &mut EventBatch,
222        flush_type: FlushType,
223    ) -> Result<(), StatsigErr> {
224        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
225            self.flush_interval.get_current_flush_interval_ms(),
226            self.queue.batch_size,
227            self.queue.max_pending_batches,
228            flush_type.as_string(),
229        );
230
231        let result = self
232            .logging_adapter
233            .log_events(batch.get_log_event_request(statsig_metadata))
234            .await;
235
236        batch.attempts += 1;
237
238        match result {
239            Ok(true) => {
240                self.ops_stats.log_event_request_success(batch.events.len());
241                Ok(())
242            }
243            Ok(false) => Err(StatsigErr::LockFailure("Unknown Failure".into())),
244            Err(e) => Err(e),
245        }
246    }
247
248    fn prepare_event_queue_for_flush(&self) {
249        self.try_add_non_exposed_checks_event();
250
251        let dropped_events_count = match self.queue.reconcile_batching() {
252            QueueResult::Success => return,
253            QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
254        };
255
256        if dropped_events_count > 0 {
257            self.ops_stats.log_batching_dropped_events(
258                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
259                dropped_events_count,
260                &self.flush_interval,
261                &self.queue,
262            );
263        }
264    }
265
266    fn try_requeue_failed_batch(
267        &self,
268        error: &StatsigErr,
269        batch: EventBatch,
270        flush_type: FlushType,
271    ) {
272        let is_non_retryable = matches!(
273            error,
274            StatsigErr::NetworkError(NetworkError::RequestNotRetryable, _)
275        );
276
277        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
278
279        if is_non_retryable || is_max_retries {
280            self.drop_failed_events(error, batch, flush_type);
281            return;
282        }
283
284        let dropped_events_count = match self.queue.requeue_batch(batch) {
285            QueueResult::Success => return,
286            QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
287        };
288
289        if dropped_events_count == 0 {
290            return;
291        }
292
293        log_w!(
294            TAG,
295            "Max pending event batches reached, dropping {} event(s). Configuration: batch_size: {}, max_pending_batches: {}",
296            dropped_events_count,
297            self.queue.batch_size,
298            self.queue.max_pending_batches
299        );
300
301        self.ops_stats.log_batching_dropped_events(
302            StatsigErr::LogEventError(
303                "Dropped events due to max pending event batches limit".to_string(),
304            ),
305            dropped_events_count,
306            &self.flush_interval,
307            &self.queue,
308        );
309    }
310
311    fn drop_failed_events(&self, error: &StatsigErr, batch: EventBatch, flush_type: FlushType) {
312        let dropped_events_count = batch.events.len() as u64;
313
314        let kind = match flush_type {
315            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
316            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
317            FlushType::Limit => "Limit",
318            FlushType::Manual => "Manual",
319            FlushType::Shutdown => "Shutdown",
320        };
321
322        log_w!(
323            TAG,
324            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
325            kind,
326            batch.attempts,
327            dropped_events_count,
328            error
329        );
330
331        self.ops_stats
332            .log_event_request_failure(dropped_events_count);
333
334        self.ops_stats.log_batching_dropped_events(
335            StatsigErr::LogEventError("Dropped events due flush failure".to_string()),
336            dropped_events_count,
337            &self.flush_interval,
338            &self.queue,
339        );
340    }
341
342    fn try_add_non_exposed_checks_event(&self) {
343        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
344        if non_exposed_checks.is_empty() {
345            return;
346        }
347
348        let checks = std::mem::take(&mut *non_exposed_checks);
349        self.queue.add(QueuedEvent::Passthrough(
350            StatsigEventInternal::new_non_exposed_checks_event(checks),
351        ));
352    }
353}
354
/// What triggered a flush.
///
/// Sent with each request via `as_string`, and used to label drop warnings.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FlushType {
    /// Background tick after the maximum flush interval elapsed, with no full batch waiting.
    ScheduledMaxTime,
    /// Background tick while at least one full batch was waiting.
    ScheduledFullBatch,
    /// Woken by `enqueue` after the queue signalled its limit.
    Limit,
    /// Explicit `flush_all_pending_events` call.
    Manual,
    /// Final flush performed by `shutdown`.
    Shutdown,
}
363
364impl FlushType {
365    fn as_string(&self) -> String {
366        match self {
367            FlushType::ScheduledMaxTime => "scheduled:max_time",
368            FlushType::ScheduledFullBatch => "scheduled:full_batch",
369            FlushType::Limit => "limit",
370            FlushType::Manual => "manual",
371            FlushType::Shutdown => "shutdown",
372        }
373        .to_string()
374    }
375}