1use super::{
2 event_queue::{
3 batch::EventBatch,
4 queue::{EventQueue, QueueReconcileResult},
5 queued_event::{EnqueueOperation, QueuedEvent},
6 },
7 exposure_sampling::ExposureSampling,
8 flush_interval::FlushInterval,
9 flush_type::FlushType,
10 statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13 event_logging::{
14 event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
15 },
16 log_d, log_e, log_w,
17 networking::NetworkError,
18 observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
19 statsig_metadata::StatsigMetadata,
20 write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
21};
22use parking_lot::RwLock;
23use std::{collections::HashMap, sync::Arc};
24use std::{
25 sync::atomic::{AtomicU64, Ordering},
26 time::Duration,
27};
28use tokio::sync::{Notify, Semaphore};
29
/// Tag passed to the runtime when spawning the background flush loop.
const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
/// Tag passed to the runtime when spawning a limit flush task.
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";
/// Fallback for `event_logging_max_queue_size` when the option is unset.
const DEFAULT_BATCH_SIZE: u32 = 2_000;
/// Fallback for `event_logging_max_pending_batch_queue_size` when the option is unset.
const DEFAULT_PENDING_BATCH_MAX: u32 = 100;
/// Number of permits in the semaphore that gates concurrent limit flush tasks.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
35
/// Marks whether an exposure was triggered automatically or manually.
///
/// NOTE(review): this type is not referenced elsewhere in this file; the
/// meaning above is presumed from the variant names — confirm with callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}
41
/// Tag attached to every log line emitted from this file.
const TAG: &str = "EvtLogger";
43
44pub struct EventLogger {
45 queue: EventQueue,
46 options: Arc<StatsigOptions>,
47 logging_adapter: Arc<dyn EventLoggingAdapter>,
48 event_sampler: ExposureSampling,
49 non_exposed_checks: RwLock<HashMap<String, u64>>,
50 limit_flush_notify: Notify,
51 limit_flush_semaphore: Arc<Semaphore>,
52 flush_interval: FlushInterval,
53 shutdown_notify: Notify,
54 ops_stats: Arc<OpsStatsForInstance>,
55 enqueue_dropped_events_count: AtomicU64,
56}
57
58impl EventLogger {
59 pub fn new(
60 sdk_key: &str,
61 options: &Arc<StatsigOptions>,
62 event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
63 statsig_rt: &Arc<StatsigRuntime>,
64 ) -> Arc<Self> {
65 let me = Arc::new(Self {
66 queue: EventQueue::new(
67 options
68 .event_logging_max_queue_size
69 .unwrap_or(DEFAULT_BATCH_SIZE),
70 options
71 .event_logging_max_pending_batch_queue_size
72 .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
73 ),
74 event_sampler: ExposureSampling::new(sdk_key),
75 flush_interval: FlushInterval::new(),
76 options: options.clone(),
77 logging_adapter: event_logging_adapter.clone(),
78 non_exposed_checks: RwLock::new(HashMap::new()),
79 shutdown_notify: Notify::new(),
80 limit_flush_notify: Notify::new(),
81 limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
82 ops_stats: OPS_STATS.get_for_instance(sdk_key),
83 enqueue_dropped_events_count: AtomicU64::new(0),
84 });
85
86 me.spawn_background_task(statsig_rt);
87 me
88 }
89
90 pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
91 if self.options.disable_all_logging == Some(true) {
92 return;
93 }
94
95 let decision = self.event_sampler.get_sampling_decision(&operation);
96 if !decision.should_log() {
97 return;
98 }
99
100 let pending_event = operation.into_queued_event(decision);
101 match self.queue.add(pending_event) {
102 QueueAddResult::Noop => (),
103 QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
104 QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
105 self.enqueue_dropped_events_count
106 .fetch_add(dropped_events_count, Ordering::Relaxed);
107 self.limit_flush_notify.notify_one();
108 }
109 }
110 }
111
112 pub fn increment_non_exposure_checks(&self, name: &str) {
113 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
114
115 match non_exposed_checks.get_mut(name) {
116 Some(count) => *count += 1,
117 None => {
118 non_exposed_checks.insert(name.into(), 1);
119 }
120 }
121 }
122
123 pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
124 self.try_flush_all_pending_events(FlushType::Manual).await
125 }
126
127 pub async fn shutdown(&self) -> Result<(), StatsigErr> {
128 let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
129 self.shutdown_notify.notify_one();
130 result
131 }
132
133 pub fn force_shutdown(&self) {
134 self.shutdown_notify.notify_one();
135 }
136
137 fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
138 let me = self.clone();
139 let rt_clone = rt.clone();
140
141 let spawn_result = rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
142 let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
143 let tick_interval = Duration::from_millis(tick_interval_ms);
144
145 loop {
146 let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();
147
148 tokio::select! {
149 () = tokio::time::sleep(tick_interval) => {
150 me.try_scheduled_flush().await;
151 }
152 () = rt_shutdown_notify.notified() => {
153 return; }
155 _ = me.shutdown_notify.notified() => {
156 return; }
158 _ = me.limit_flush_notify.notified(), if can_limit_flush => {
159 Self::spawn_new_limit_flush_task(&me, &rt_clone);
160 }
161 }
162
163 me.event_sampler.try_reset_all_sampling();
164 }
165 });
166
167 if let Err(e) = spawn_result {
168 log_e!(TAG, "Failed to spawn background task: {e}");
169 }
170 }
171
172 fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
173 let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
174 Ok(permit) => permit,
175 Err(_) => return,
176 };
177
178 let me = inst.clone();
179 let spawn_result = rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
180 log_d!(TAG, "Attempting limit flush");
181 if !me.flush_next_batch(FlushType::Limit).await {
182 return;
183 }
184
185 loop {
186 if !me.flush_interval.has_completely_recovered_from_backoff() {
187 break;
188 }
189
190 if !me.queue.contains_at_least_one_full_batch() {
191 break;
192 }
193
194 if !me.flush_next_batch(FlushType::Limit).await {
195 break;
196 }
197 }
198
199 drop(permit);
200 });
201
202 if let Err(e) = spawn_result {
203 log_e!(TAG, "Failed to spawn limit flush task: {e}");
204 }
205 }
206
207 async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
208 self.prepare_event_queue_for_flush(flush_type);
209
210 let batches = self.queue.take_all_batches();
211
212 let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
213 match self.log_batch(&mut batch, flush_type).await {
214 Ok(_) => Ok(()),
215 Err(e) => {
216 if flush_type == FlushType::Manual {
217 self.flush_interval.adjust_for_failure();
218 self.try_requeue_failed_batch(&e, batch, flush_type);
219 return Err(e);
220 }
221
222 self.drop_events_for_failure(&e, batch, flush_type);
223 Err(e)
224 }
225 }
226 }))
227 .await;
228
229 results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
230 }
231
232 async fn try_scheduled_flush(&self) {
233 if !self.flush_interval.has_cooled_from_most_recent_failure() {
234 return;
235 }
236
237 let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
238 let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
239
240 if !should_flush_by_time && !should_flush_by_size {
241 return;
242 }
243
244 self.flush_interval.mark_scheduled_flush_attempt();
245
246 self.flush_next_batch(if should_flush_by_size {
247 FlushType::ScheduledFullBatch
248 } else {
249 FlushType::ScheduledMaxTime
250 })
251 .await;
252 }
253
254 async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
255 self.prepare_event_queue_for_flush(flush_type);
256
257 let mut batch = match self.queue.take_next_batch() {
258 Some(batch) => batch,
259 None => return false,
260 };
261
262 let error = match self.log_batch(&mut batch, flush_type).await {
263 Err(e) => e,
264 Ok(()) => {
265 self.flush_interval.adjust_for_success();
266 return true;
267 }
268 };
269
270 self.flush_interval.adjust_for_failure();
271 self.try_requeue_failed_batch(&error, batch, flush_type);
272
273 false
274 }
275
276 async fn log_batch(
277 &self,
278 batch: &mut EventBatch,
279 flush_type: FlushType,
280 ) -> Result<(), StatsigErr> {
281 let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
282 self.flush_interval.get_current_flush_interval_ms(),
283 self.queue.batch_size,
284 self.queue.max_pending_batches,
285 flush_type.to_string(),
286 );
287
288 let result = self
289 .logging_adapter
290 .log_events(batch.get_log_event_request(statsig_metadata))
291 .await;
292
293 batch.attempts += 1;
294
295 match result {
296 Ok(true) => {
297 self.ops_stats.log_event_request_success(batch.events.len());
298 Ok(())
299 }
300 Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
301 Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
302 Err(e) => Err(e),
303 }
304 }
305
306 fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
307 self.try_add_non_exposed_checks_event();
308 self.try_log_enqueue_dropped_events();
309
310 let dropped_events_count = match self.queue.reconcile_batching() {
311 QueueReconcileResult::Success => return,
312 QueueReconcileResult::LockFailure => {
313 log_e!(TAG, "prepare_event_queue_for_flush lock failure");
314 return;
315 }
316 QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
317 };
318
319 if dropped_events_count > 0 {
320 self.log_dropped_event_warning(dropped_events_count);
321
322 self.ops_stats.log_batching_dropped_events(
323 StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
324 dropped_events_count,
325 &self.flush_interval,
326 &self.queue,
327 flush_type,
328 );
329 }
330 }
331
332 fn try_requeue_failed_batch(
333 &self,
334 error: &StatsigErr,
335 batch: EventBatch,
336 flush_type: FlushType,
337 ) {
338 let is_non_retryable = matches!(
339 error,
340 StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
341 );
342
343 let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
344
345 if is_non_retryable || is_max_retries {
346 self.drop_events_for_failure(error, batch, flush_type);
347 return;
348 }
349
350 let dropped_events_count = match self.queue.requeue_batch(batch) {
351 QueueReconcileResult::Success => return,
352 QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
353 QueueReconcileResult::LockFailure => {
354 log_e!(TAG, "try_requeue_failed_batch lock failure");
355 return;
356 }
357 };
358
359 if dropped_events_count == 0 {
360 return;
361 }
362
363 self.log_dropped_event_warning(dropped_events_count);
364
365 self.ops_stats.log_batching_dropped_events(
366 StatsigErr::LogEventError(
367 "Dropped events due to max pending event batches limit".to_string(),
368 ),
369 dropped_events_count,
370 &self.flush_interval,
371 &self.queue,
372 flush_type,
373 );
374 }
375
376 fn drop_events_for_failure(
377 &self,
378 error: &StatsigErr,
379 batch: EventBatch,
380 flush_type: FlushType,
381 ) {
382 let dropped_events_count = batch.events.len() as u64;
383
384 let kind = match flush_type {
385 FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
386 FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
387 FlushType::Limit => "Limit",
388 FlushType::Manual => "Manual",
389 FlushType::Shutdown => "Shutdown",
390 };
391
392 log_w!(
393 TAG,
394 "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
395 kind,
396 batch.attempts,
397 dropped_events_count,
398 error
399 );
400
401 self.ops_stats
402 .log_event_request_failure(dropped_events_count, flush_type);
403 }
404
405 fn try_add_non_exposed_checks_event(&self) {
406 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
407 if non_exposed_checks.is_empty() {
408 return;
409 }
410
411 let checks = std::mem::take(&mut *non_exposed_checks);
412 let result = self.queue.add(QueuedEvent::Passthrough(
413 StatsigEventInternal::new_non_exposed_checks_event(checks),
414 ));
415
416 if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
417 self.enqueue_dropped_events_count
418 .fetch_add(dropped_events_count, Ordering::Relaxed);
419 }
420 }
421
422 fn try_log_enqueue_dropped_events(&self) {
423 let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
424 if dropped_events_count == 0 {
425 return;
426 }
427
428 self.log_dropped_event_warning(dropped_events_count);
429
430 self.ops_stats.log_batching_dropped_events(
431 StatsigErr::LogEventError(
432 "Dropped events due to max pending event batches limit".to_string(),
433 ),
434 dropped_events_count,
435 &self.flush_interval,
436 &self.queue,
437 FlushType::Limit,
438 );
439 }
440
441 fn log_dropped_event_warning(&self, dropped_events_count: u64) {
442 let approximate_pending_events_count = self.queue.approximate_pending_events_count();
443 log_w!(
444 TAG,
445 "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
446 dropped_events_count,
447 approximate_pending_events_count,
448 self.queue.max_pending_batches,
449 self.queue.batch_size
450 );
451 }
452}