1use super::{
2 event_queue::{
3 batch::EventBatch,
4 queue::{EventQueue, QueueReconcileResult},
5 queued_event::{EnqueueOperation, QueuedEvent},
6 queued_secondary_expo::EnqueueSecondaryExposureAsPrimaryOp,
7 },
8 exposure_sampling::ExposureSampling,
9 flush_interval::FlushInterval,
10 flush_type::FlushType,
11 sec_expo_as_primary_experiment::SecExpoAsPrimaryExperiment,
12 statsig_event_internal::StatsigEventInternal,
13};
14use crate::{
15 event_logging::{
16 event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
17 },
18 log_d, log_e, log_w,
19 networking::NetworkError,
20 observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
21 statsig_metadata::StatsigMetadata,
22 write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
23};
24use parking_lot::RwLock;
25use std::{collections::HashMap, sync::Arc};
26use std::{
27 sync::atomic::{AtomicU64, Ordering},
28 time::Duration,
29};
30use tokio::sync::{Notify, Semaphore};
31
/// Lower bound for an event batch size (not enforced in this file — presumably by the queue/options layer).
pub const MIN_BATCH_SIZE: u32 = 10;
/// Upper bound for an event batch size; also the default batch size used here.
pub const MAX_BATCH_SIZE: u32 = 2_000;

/// Lower bound for the number of pending batches (not enforced in this file).
pub const MIN_PENDING_BATCH_COUNT: u32 = 1;
/// Pending-batch cap used when `event_logging_max_pending_batch_queue_size` is unset.
pub const DEFAULT_PENDING_BATCH_COUNT_MAX: u32 = 100;

/// Queue size used when `event_logging_max_queue_size` is unset.
const DEFAULT_BATCH_SIZE: u32 = MAX_BATCH_SIZE;
/// Maximum number of limit-flush tasks allowed to run concurrently.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;

// Task names handed to the runtime when spawning.
const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";
42
/// What caused an exposure to be logged.
///
/// Not referenced in this file; presumably `Auto` marks exposures logged implicitly
/// by an evaluation and `Manual` marks ones the caller logged explicitly — confirm
/// against callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExposureTrigger {
    /// Exposure logged automatically.
    Auto,
    /// Exposure logged manually.
    Manual,
}
48
/// Log tag for messages emitted by the event logger.
const TAG: &str = "EvtLogger";
50
51pub struct EventLogger {
52 queue: EventQueue,
53 options: Arc<StatsigOptions>,
54 logging_adapter: Arc<dyn EventLoggingAdapter>,
55 event_sampler: ExposureSampling,
56 non_exposed_checks: RwLock<HashMap<String, u64>>,
57 limit_flush_notify: Notify,
58 limit_flush_semaphore: Arc<Semaphore>,
59 flush_interval: FlushInterval,
60 shutdown_notify: Notify,
61 ops_stats: Arc<OpsStatsForInstance>,
62 sec_expo_experiment: SecExpoAsPrimaryExperiment,
63 enqueue_dropped_events_count: AtomicU64,
64}
65
66impl EventLogger {
67 pub fn new(
68 sdk_key: &str,
69 options: &Arc<StatsigOptions>,
70 event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
71 statsig_rt: &Arc<StatsigRuntime>,
72 ) -> Arc<Self> {
73 let me = Arc::new(Self {
74 queue: EventQueue::new(
75 options
76 .event_logging_max_queue_size
77 .unwrap_or(DEFAULT_BATCH_SIZE),
78 options
79 .event_logging_max_pending_batch_queue_size
80 .unwrap_or(DEFAULT_PENDING_BATCH_COUNT_MAX),
81 ),
82 event_sampler: ExposureSampling::new(sdk_key),
83 flush_interval: FlushInterval::new(),
84 options: options.clone(),
85 logging_adapter: event_logging_adapter.clone(),
86 non_exposed_checks: RwLock::new(HashMap::new()),
87 shutdown_notify: Notify::new(),
88 limit_flush_notify: Notify::new(),
89 limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
90 ops_stats: OPS_STATS.get_for_instance(sdk_key),
91 sec_expo_experiment: SecExpoAsPrimaryExperiment::new(sdk_key, options),
92 enqueue_dropped_events_count: AtomicU64::new(0),
93 });
94
95 me.spawn_background_task(statsig_rt);
96 me
97 }
98
99 pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
100 if self.options.disable_all_logging == Some(true) {
101 return;
102 }
103
104 let sec_expo_as_primary_active = self.sec_expo_experiment.should_log_as_primary();
105 let decision = self
106 .event_sampler
107 .get_sampling_decision(&operation, sec_expo_as_primary_active);
108
109 if sec_expo_as_primary_active {
110 let should_log_parent = decision.should_log();
111 let pending_event = operation.into_queued_event(decision);
112 self.enqueue_event_with_secondary_exposures_as_primary(
113 pending_event,
114 should_log_parent,
115 );
116 return;
117 }
118
119 if !decision.should_log() {
120 return;
121 }
122
123 let pending_event = operation.into_queued_event(decision);
124 self.add_pending_event(pending_event);
125 }
126
127 fn enqueue_event_with_secondary_exposures_as_primary(
128 &self,
129 mut pending_event: QueuedEvent,
130 should_log_parent: bool,
131 ) {
132 let Some(exposure_time) = pending_event.exposure_time() else {
133 if should_log_parent {
134 self.add_pending_event(pending_event);
135 }
136 return;
137 };
138 let secondary_exposures = pending_event.take_secondary_exposures_for_primary_logging();
139 if secondary_exposures.is_empty() {
140 if should_log_parent {
141 self.add_pending_event(pending_event);
142 }
143 return;
144 }
145
146 let Some(user) = pending_event.user_for_secondary_exposures() else {
147 if should_log_parent {
148 self.add_pending_event(pending_event);
149 }
150 return;
151 };
152
153 if should_log_parent {
154 self.add_pending_event(pending_event);
155 }
156
157 for secondary_exposure in secondary_exposures {
158 let operation = EnqueueSecondaryExposureAsPrimaryOp {
159 user: user.clone(),
160 secondary_exposure,
161 exposure_time,
162 };
163
164 let decision = self.event_sampler.get_sampling_decision(&operation, false);
165 if decision.should_log() {
166 self.add_pending_event(operation.into_queued_event(decision));
167 }
168 }
169 }
170
171 fn add_pending_event(&self, pending_event: QueuedEvent) {
172 match self.queue.add(pending_event) {
173 QueueAddResult::Noop => (),
174 QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
175 QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
176 self.enqueue_dropped_events_count
177 .fetch_add(dropped_events_count, Ordering::Relaxed);
178 self.limit_flush_notify.notify_one();
179 }
180 }
181 }
182
183 pub fn increment_non_exposure_checks(&self, name: &str) {
184 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
185
186 match non_exposed_checks.get_mut(name) {
187 Some(count) => *count += 1,
188 None => {
189 non_exposed_checks.insert(name.into(), 1);
190 }
191 }
192 }
193
194 pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
195 self.try_flush_all_pending_events(FlushType::Manual).await
196 }
197
198 pub async fn shutdown(&self) -> Result<(), StatsigErr> {
199 let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
200 self.shutdown_notify.notify_one();
201 result
202 }
203
204 pub fn force_shutdown(&self) {
205 self.shutdown_notify.notify_one();
206 }
207
208 fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
209 let me = self.clone();
210 let rt_clone = rt.clone();
211
212 let spawn_result = rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
213 let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
214 let tick_interval = Duration::from_millis(tick_interval_ms);
215
216 loop {
217 let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();
218
219 tokio::select! {
220 () = tokio::time::sleep(tick_interval) => {
221 me.try_scheduled_flush().await;
222 }
223 () = rt_shutdown_notify.notified() => {
224 return; }
226 _ = me.shutdown_notify.notified() => {
227 return; }
229 _ = me.limit_flush_notify.notified(), if can_limit_flush => {
230 Self::spawn_new_limit_flush_task(&me, &rt_clone);
231 }
232 }
233
234 me.event_sampler.try_reset_all_sampling();
235 }
236 });
237
238 if let Err(e) = spawn_result {
239 log_e!(TAG, "Failed to spawn background task: {e}");
240 }
241 }
242
243 fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
244 let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
245 Ok(permit) => permit,
246 Err(_) => return,
247 };
248
249 let me = inst.clone();
250 let spawn_result = rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
251 log_d!(TAG, "Attempting limit flush");
252 if !me.flush_next_batch(FlushType::Limit).await {
253 return;
254 }
255
256 loop {
257 if !me.flush_interval.has_completely_recovered_from_backoff() {
258 break;
259 }
260
261 if !me.queue.contains_at_least_one_full_batch() {
262 break;
263 }
264
265 if !me.flush_next_batch(FlushType::Limit).await {
266 break;
267 }
268 }
269
270 drop(permit);
271 });
272
273 if let Err(e) = spawn_result {
274 log_e!(TAG, "Failed to spawn limit flush task: {e}");
275 }
276 }
277
278 async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
279 self.prepare_event_queue_for_flush(flush_type);
280
281 let batches = self.queue.take_all_batches();
282
283 let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
284 match self.log_batch(&mut batch, flush_type).await {
285 Ok(_) => Ok(()),
286 Err(e) => {
287 if flush_type == FlushType::Manual {
288 self.flush_interval.adjust_for_failure();
289 self.try_requeue_failed_batch(&e, batch, flush_type);
290 return Err(e);
291 }
292
293 self.drop_events_for_failure(&e, batch, flush_type);
294 Err(e)
295 }
296 }
297 }))
298 .await;
299
300 results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
301 }
302
303 async fn try_scheduled_flush(&self) {
304 if !self.flush_interval.has_cooled_from_most_recent_failure() {
305 return;
306 }
307
308 let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
309 let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
310
311 if !should_flush_by_time && !should_flush_by_size {
312 return;
313 }
314
315 self.flush_interval.mark_scheduled_flush_attempt();
316
317 self.flush_next_batch(if should_flush_by_size {
318 FlushType::ScheduledFullBatch
319 } else {
320 FlushType::ScheduledMaxTime
321 })
322 .await;
323 }
324
325 async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
326 self.prepare_event_queue_for_flush(flush_type);
327
328 let mut batch = match self.queue.take_next_batch() {
329 Some(batch) => batch,
330 None => return false,
331 };
332
333 let error = match self.log_batch(&mut batch, flush_type).await {
334 Err(e) => e,
335 Ok(()) => {
336 self.flush_interval.adjust_for_success();
337 return true;
338 }
339 };
340
341 self.flush_interval.adjust_for_failure();
342 self.try_requeue_failed_batch(&error, batch, flush_type);
343
344 false
345 }
346
347 async fn log_batch(
348 &self,
349 batch: &mut EventBatch,
350 flush_type: FlushType,
351 ) -> Result<(), StatsigErr> {
352 let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
353 self.flush_interval.get_current_flush_interval_ms(),
354 self.queue.batch_size,
355 self.queue.max_pending_batches,
356 flush_type.to_string(),
357 );
358
359 let result = self
360 .logging_adapter
361 .log_events(batch.get_log_event_request(statsig_metadata))
362 .await;
363
364 batch.attempts += 1;
365
366 match result {
367 Ok(true) => {
368 self.ops_stats.log_event_request_success(batch.events.len());
369 Ok(())
370 }
371 Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
372 Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
373 Err(e) => Err(e),
374 }
375 }
376
377 fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
378 self.try_add_non_exposed_checks_event();
379 self.try_log_enqueue_dropped_events();
380
381 let dropped_events_count = match self.queue.reconcile_batching() {
382 QueueReconcileResult::Success => return,
383 QueueReconcileResult::LockFailure => {
384 log_e!(TAG, "prepare_event_queue_for_flush lock failure");
385 return;
386 }
387 QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
388 };
389
390 if dropped_events_count > 0 {
391 self.log_dropped_event_warning(dropped_events_count);
392
393 self.ops_stats.log_batching_dropped_events(
394 StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
395 dropped_events_count,
396 &self.flush_interval,
397 &self.queue,
398 flush_type,
399 );
400 }
401 }
402
403 fn try_requeue_failed_batch(
404 &self,
405 error: &StatsigErr,
406 batch: EventBatch,
407 flush_type: FlushType,
408 ) {
409 let is_non_retryable = matches!(
410 error,
411 StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
412 );
413
414 let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
415
416 if is_non_retryable || is_max_retries {
417 self.drop_events_for_failure(error, batch, flush_type);
418 return;
419 }
420
421 let dropped_events_count = match self.queue.requeue_batch(batch) {
422 QueueReconcileResult::Success => return,
423 QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
424 QueueReconcileResult::LockFailure => {
425 log_e!(TAG, "try_requeue_failed_batch lock failure");
426 return;
427 }
428 };
429
430 if dropped_events_count == 0 {
431 return;
432 }
433
434 self.log_dropped_event_warning(dropped_events_count);
435
436 self.ops_stats.log_batching_dropped_events(
437 StatsigErr::LogEventError(
438 "Dropped events due to max pending event batches limit".to_string(),
439 ),
440 dropped_events_count,
441 &self.flush_interval,
442 &self.queue,
443 flush_type,
444 );
445 }
446
447 fn drop_events_for_failure(
448 &self,
449 error: &StatsigErr,
450 batch: EventBatch,
451 flush_type: FlushType,
452 ) {
453 let dropped_events_count = batch.events.len() as u64;
454
455 let kind = match flush_type {
456 FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
457 FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
458 FlushType::Limit => "Limit",
459 FlushType::Manual => "Manual",
460 FlushType::Shutdown => "Shutdown",
461 };
462
463 log_w!(
464 TAG,
465 "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
466 kind,
467 batch.attempts,
468 dropped_events_count,
469 error
470 );
471
472 self.ops_stats
473 .log_event_request_failure(dropped_events_count, flush_type);
474 }
475
476 fn try_add_non_exposed_checks_event(&self) {
477 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
478 if non_exposed_checks.is_empty() {
479 return;
480 }
481
482 let checks = std::mem::take(&mut *non_exposed_checks);
483 let result = self.queue.add(QueuedEvent::Passthrough(
484 StatsigEventInternal::new_non_exposed_checks_event(checks),
485 ));
486
487 if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
488 self.enqueue_dropped_events_count
489 .fetch_add(dropped_events_count, Ordering::Relaxed);
490 }
491 }
492
493 fn try_log_enqueue_dropped_events(&self) {
494 let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
495 if dropped_events_count == 0 {
496 return;
497 }
498
499 self.log_dropped_event_warning(dropped_events_count);
500
501 self.ops_stats.log_batching_dropped_events(
502 StatsigErr::LogEventError(
503 "Dropped events due to max pending event batches limit".to_string(),
504 ),
505 dropped_events_count,
506 &self.flush_interval,
507 &self.queue,
508 FlushType::Limit,
509 );
510 }
511
512 fn log_dropped_event_warning(&self, dropped_events_count: u64) {
513 let approximate_pending_events_count = self.queue.approximate_pending_events_count();
514 log_w!(
515 TAG,
516 "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
517 dropped_events_count,
518 approximate_pending_events_count,
519 self.queue.max_pending_batches,
520 self.queue.batch_size
521 );
522 }
523}