statsig_rust/event_logging/event_logger.rs

1use super::{
2    event_queue::{
3        batch::EventBatch,
4        queue::{EventQueue, QueueResult},
5        queued_event::{EnqueueOperation, QueuedEvent},
6    },
7    exposure_sampling::ExposureSampling,
8    flush_interval::FlushInterval,
9    flush_type::FlushType,
10    statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13    event_logging::event_logger_constants::EventLoggerConstants,
14    log_d, log_w,
15    networking::NetworkError,
16    observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
17    statsig_metadata::StatsigMetadata,
18    write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
19};
20use std::time::Duration;
21use std::{
22    collections::HashMap,
23    sync::{Arc, RwLock},
24};
25use tokio::sync::{Notify, Semaphore};
26
// Tag attached to background tasks spawned on the StatsigRuntime.
const BG_TASK_TAG: &str = "EVENT_LOGGER_BG_TASK";
// Default events-per-batch, used when `event_logging_max_queue_size` is unset.
const DEFAULT_BATCH_SIZE: u32 = 2000;
// Default cap on pending batches, used when
// `event_logging_max_pending_batch_queue_size` is unset.
const DEFAULT_PENDING_BATCH_MAX: u32 = 100;
// Semaphore permit count bounding concurrent limit-flush tasks.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
31
/// How an exposure came to be logged.
///
/// NOTE(review): neither variant is referenced within this file — semantics
/// inferred from the names (`Auto` = SDK-initiated check, `Manual` = explicit
/// manual-log API call); confirm at the call sites.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}
37
38const TAG: &str = stringify!(EvtLogger);
39
/// Buffers, samples, batches, and flushes Statsig events through the
/// configured [`EventLoggingAdapter`], with a background task driving
/// scheduled and limit-triggered flushes.
pub struct EventLogger {
    // Batching queue holding events until they are flushed.
    queue: EventQueue,
    options: Arc<StatsigOptions>,
    // Transport that actually delivers batches.
    logging_adapter: Arc<dyn EventLoggingAdapter>,
    // Decides which exposures get logged vs. sampled away.
    event_sampler: ExposureSampling,
    // Per-name counters for checks that produced no exposure event.
    non_exposed_checks: RwLock<HashMap<String, u64>>,
    // Signaled by `enqueue` when a full batch becomes available.
    limit_flush_notify: Notify,
    // Caps concurrent limit-flush tasks at MAX_LIMIT_FLUSH_TASKS.
    limit_flush_semaphore: Arc<Semaphore>,
    // Tracks flush timing, backoff, and failure cooldowns.
    flush_interval: FlushInterval,
    // Signaled by `shutdown` to stop the background task.
    shutdown_notify: Notify,
    ops_stats: Arc<OpsStatsForInstance>,
}
52
53impl EventLogger {
54    pub fn new(
55        sdk_key: &str,
56        options: &Arc<StatsigOptions>,
57        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
58        statsig_rt: &Arc<StatsigRuntime>,
59    ) -> Arc<Self> {
60        let me = Arc::new(Self {
61            queue: EventQueue::new(
62                options
63                    .event_logging_max_queue_size
64                    .unwrap_or(DEFAULT_BATCH_SIZE),
65                options
66                    .event_logging_max_pending_batch_queue_size
67                    .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
68            ),
69            event_sampler: ExposureSampling::new(sdk_key),
70            flush_interval: FlushInterval::new(),
71            options: options.clone(),
72            logging_adapter: event_logging_adapter.clone(),
73            non_exposed_checks: RwLock::new(HashMap::new()),
74            shutdown_notify: Notify::new(),
75            limit_flush_notify: Notify::new(),
76            limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
77            ops_stats: OPS_STATS.get_for_instance(sdk_key),
78        });
79
80        me.spawn_background_task(statsig_rt);
81        me
82    }
83
84    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
85        if self.options.disable_all_logging == Some(true) {
86            return;
87        }
88
89        let decision = self.event_sampler.get_sampling_decision(&operation);
90        if !decision.should_log() {
91            return;
92        }
93
94        let pending_event = operation.into_queued_event(decision);
95        if self.queue.add(pending_event) {
96            self.limit_flush_notify.notify_one();
97        }
98    }
99
100    pub fn increment_non_exposure_checks(&self, name: &str) {
101        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
102
103        match non_exposed_checks.get_mut(name) {
104            Some(count) => *count += 1,
105            None => {
106                non_exposed_checks.insert(name.into(), 1);
107            }
108        }
109    }
110
    /// Flushes everything currently queued using the `Manual` flush policy
    /// (failed batches back off and are requeued for a later retry).
    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
        self.try_flush_all_pending_events(FlushType::Manual).await
    }
114
115    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
116        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
117        self.shutdown_notify.notify_one();
118        result
119    }
120
    /// Spawns the long-lived background loop that drives scheduled flushes,
    /// reacts to limit-flush signals, and exits on runtime or logger shutdown.
    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let me = self.clone();
        let rt_clone = rt.clone();

        rt.spawn(BG_TASK_TAG, |rt_shutdown_notify| async move {
            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
            let tick_interval = Duration::from_millis(tick_interval_ms);

            loop {
                // Sampled once per iteration so the select! guard below is
                // stable across the awaits in this pass.
                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();

                tokio::select! {
                    // Periodic tick: attempt a scheduled flush.
                    () = tokio::time::sleep(tick_interval) => {
                        me.try_scheduled_flush().await;
                    }
                    () = rt_shutdown_notify.notified() => {
                        return; // Runtime Shutdown
                    }
                    _ = me.shutdown_notify.notified() => {
                        return; // EvtLogger Shutdown
                    }
                    // Full-batch signal from enqueue(); the guard disables
                    // this arm while still recovering from backoff.
                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
                    }
                }

                me.event_sampler.try_reset_all_sampling();
            }
        });
    }
151
152    fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
153        let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
154            Ok(permit) => permit,
155            Err(_) => return,
156        };
157
158        let me = inst.clone();
159        rt.spawn(BG_TASK_TAG, |_| async move {
160            log_d!(TAG, "Attempting limit flush");
161            if !me.flush_next_batch(FlushType::Limit).await {
162                return;
163            }
164
165            loop {
166                if !me.flush_interval.has_completely_recovered_from_backoff() {
167                    break;
168                }
169
170                if !me.queue.contains_at_least_one_full_batch() {
171                    break;
172                }
173
174                if !me.flush_next_batch(FlushType::Limit).await {
175                    break;
176                }
177            }
178
179            drop(permit);
180        });
181    }
182
    /// Takes every batch off the queue and flushes them all concurrently.
    ///
    /// Failure policy depends on `flush_type`: `Manual` flushes back off and
    /// requeue the failed batch for a later retry; any other type (e.g.
    /// `Shutdown`) drops the failed batch's events. Returns the first error
    /// observed, or `Ok(())` when every batch succeeded.
    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
        self.prepare_event_queue_for_flush(flush_type);

        let batches = self.queue.take_all_batches();

        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
            match self.log_batch(&mut batch, flush_type).await {
                Ok(_) => Ok(()),
                Err(e) => {
                    if flush_type == FlushType::Manual {
                        self.flush_interval.adjust_for_failure();
                        self.try_requeue_failed_batch(&e, batch, flush_type);
                        return Err(e);
                    }

                    // Non-manual failures are terminal for this batch.
                    self.drop_events_for_failure(&e, batch, flush_type);
                    Err(e)
                }
            }
        }))
        .await;

        // Surface the first failure, if any.
        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
    }
207
208    async fn try_scheduled_flush(&self) {
209        if !self.flush_interval.has_cooled_from_most_recent_failure() {
210            return;
211        }
212
213        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
214        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
215
216        if !should_flush_by_time && !should_flush_by_size {
217            return;
218        }
219
220        self.flush_interval.mark_scheduled_flush_attempt();
221
222        self.flush_next_batch(if should_flush_by_size {
223            FlushType::ScheduledFullBatch
224        } else {
225            FlushType::ScheduledMaxTime
226        })
227        .await;
228    }
229
230    async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
231        self.prepare_event_queue_for_flush(flush_type);
232
233        let mut batch = match self.queue.take_next_batch() {
234            Some(batch) => batch,
235            None => return false,
236        };
237
238        let error = match self.log_batch(&mut batch, flush_type).await {
239            Err(e) => e,
240            Ok(()) => {
241                self.flush_interval.adjust_for_success();
242                return true;
243            }
244        };
245
246        self.flush_interval.adjust_for_failure();
247        self.try_requeue_failed_batch(&error, batch, flush_type);
248
249        false
250    }
251
252    async fn log_batch(
253        &self,
254        batch: &mut EventBatch,
255        flush_type: FlushType,
256    ) -> Result<(), StatsigErr> {
257        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
258            self.flush_interval.get_current_flush_interval_ms(),
259            self.queue.batch_size,
260            self.queue.max_pending_batches,
261            flush_type.to_string(),
262        );
263
264        let result = self
265            .logging_adapter
266            .log_events(batch.get_log_event_request(statsig_metadata))
267            .await;
268
269        batch.attempts += 1;
270
271        match result {
272            Ok(true) => {
273                self.ops_stats.log_event_request_success(batch.events.len());
274                Ok(())
275            }
276            Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
277            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _)) => Ok(()),
278            Err(e) => Err(e),
279        }
280    }
281
282    fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
283        self.try_add_non_exposed_checks_event();
284
285        let dropped_events_count = match self.queue.reconcile_batching() {
286            QueueResult::Success => return,
287            QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
288        };
289
290        if dropped_events_count > 0 {
291            self.log_dropped_event_warning(dropped_events_count);
292
293            self.ops_stats.log_batching_dropped_events(
294                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
295                dropped_events_count,
296                &self.flush_interval,
297                &self.queue,
298                flush_type,
299            );
300        }
301    }
302
303    fn try_requeue_failed_batch(
304        &self,
305        error: &StatsigErr,
306        batch: EventBatch,
307        flush_type: FlushType,
308    ) {
309        let is_non_retryable = matches!(
310            error,
311            StatsigErr::NetworkError(NetworkError::RequestNotRetryable, _)
312        );
313
314        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
315
316        if is_non_retryable || is_max_retries {
317            self.drop_events_for_failure(error, batch, flush_type);
318            return;
319        }
320
321        let dropped_events_count = match self.queue.requeue_batch(batch) {
322            QueueResult::Success => return,
323            QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
324        };
325
326        if dropped_events_count == 0 {
327            return;
328        }
329
330        self.log_dropped_event_warning(dropped_events_count);
331
332        self.ops_stats.log_batching_dropped_events(
333            StatsigErr::LogEventError(
334                "Dropped events due to max pending event batches limit".to_string(),
335            ),
336            dropped_events_count,
337            &self.flush_interval,
338            &self.queue,
339            flush_type,
340        );
341    }
342
343    fn drop_events_for_failure(
344        &self,
345        error: &StatsigErr,
346        batch: EventBatch,
347        flush_type: FlushType,
348    ) {
349        let dropped_events_count = batch.events.len() as u64;
350
351        let kind = match flush_type {
352            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
353            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
354            FlushType::Limit => "Limit",
355            FlushType::Manual => "Manual",
356            FlushType::Shutdown => "Shutdown",
357        };
358
359        log_w!(
360            TAG,
361            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
362            kind,
363            batch.attempts,
364            dropped_events_count,
365            error
366        );
367
368        self.ops_stats
369            .log_event_request_failure(dropped_events_count, flush_type);
370    }
371
372    fn try_add_non_exposed_checks_event(&self) {
373        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
374        if non_exposed_checks.is_empty() {
375            return;
376        }
377
378        let checks = std::mem::take(&mut *non_exposed_checks);
379        self.queue.add(QueuedEvent::Passthrough(
380            StatsigEventInternal::new_non_exposed_checks_event(checks),
381        ));
382    }
383
384    fn log_dropped_event_warning(&self, dropped_events_count: u64) {
385        let approximate_pending_events_count = self.queue.approximate_pending_events_count();
386        log_w!(
387            TAG,
388            "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
389            dropped_events_count,
390            approximate_pending_events_count,
391            self.queue.max_pending_batches,
392            self.queue.batch_size
393        );
394    }
395}