1use super::{
2 event_queue::{
3 batch::EventBatch,
4 queue::{EventQueue, QueueReconcileResult},
5 queued_event::{EnqueueOperation, QueuedEvent},
6 },
7 exposure_sampling::ExposureSampling,
8 flush_interval::FlushInterval,
9 flush_type::FlushType,
10 statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13 event_logging::{
14 event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
15 },
16 log_d, log_e, log_w,
17 networking::NetworkError,
18 observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
19 statsig_metadata::StatsigMetadata,
20 write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
21};
22use parking_lot::RwLock;
23use std::{collections::HashMap, sync::Arc};
24use std::{
25 sync::atomic::{AtomicU64, Ordering},
26 time::Duration,
27};
28use tokio::sync::{Notify, Semaphore};
29
// Bounds for the configurable per-batch event count.
// NOTE(review): the MIN_* constants are not referenced in this chunk —
// presumably EventQueue clamps its configuration with them; confirm there.
pub const MIN_BATCH_SIZE: u32 = 10;
pub const MAX_BATCH_SIZE: u32 = 2000;

// Bounds for how many fully-formed batches may sit waiting to be flushed.
pub const MIN_PENDING_BATCH_COUNT: u32 = 1;
pub const DEFAULT_PENDING_BATCH_COUNT_MAX: u32 = 100;

// Task names handed to StatsigRuntime::spawn for the background loop and
// the on-demand "queue is full" flush tasks.
const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";

// Used when options.event_logging_max_queue_size is unset.
const DEFAULT_BATCH_SIZE: u32 = MAX_BATCH_SIZE;

// Number of permits on `limit_flush_semaphore`, i.e. the cap on concurrent
// limit-flush tasks.
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
40
/// How an exposure came to be logged.
/// NOTE(review): not referenced in this chunk — presumably `Auto` marks
/// exposures recorded during evaluation and `Manual` marks ones logged via
/// an explicit "log exposure" API; confirm against the callers.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}
46
// Tag prepended to this module's log_d!/log_w!/log_e! output.
const TAG: &str = stringify!(EvtLogger);
48
/// Batches, samples, and flushes Statsig events through the configured
/// `EventLoggingAdapter`, with adaptive flush intervals, bounded queues,
/// and drop accounting when limits are exceeded.
pub struct EventLogger {
    // Bounded storage for pending events/batches, shared with the flush tasks.
    queue: EventQueue,
    // SDK options; consulted for disable_all_logging and queue sizing.
    options: Arc<StatsigOptions>,
    // Transport that actually sends log-event requests.
    logging_adapter: Arc<dyn EventLoggingAdapter>,
    // Decides per-operation whether an exposure should be logged.
    event_sampler: ExposureSampling,
    // Name -> count of checks evaluated without an exposure; periodically
    // converted into a single summary event.
    non_exposed_checks: RwLock<HashMap<String, u64>>,
    // Signals the background loop that the queue holds a full batch.
    limit_flush_notify: Notify,
    // Caps concurrent limit-flush tasks at MAX_LIMIT_FLUSH_TASKS permits.
    limit_flush_semaphore: Arc<Semaphore>,
    // Adaptive flush timing / failure backoff state.
    flush_interval: FlushInterval,
    // Signals the background loop to exit (shutdown / force_shutdown).
    shutdown_notify: Notify,
    // Observability sink for success/failure/drop telemetry.
    ops_stats: Arc<OpsStatsForInstance>,
    // Events dropped at enqueue time, tallied here and reported on the next
    // flush preparation.
    enqueue_dropped_events_count: AtomicU64,
}
62
impl EventLogger {
    /// Builds the logger from `options` (falling back to the module defaults
    /// for queue sizing) and immediately spawns the background flush loop on
    /// `statsig_rt`. Returned as `Arc` because the background task keeps a
    /// clone of the instance alive.
    pub fn new(
        sdk_key: &str,
        options: &Arc<StatsigOptions>,
        event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
        statsig_rt: &Arc<StatsigRuntime>,
    ) -> Arc<Self> {
        let me = Arc::new(Self {
            queue: EventQueue::new(
                options
                    .event_logging_max_queue_size
                    .unwrap_or(DEFAULT_BATCH_SIZE),
                options
                    .event_logging_max_pending_batch_queue_size
                    .unwrap_or(DEFAULT_PENDING_BATCH_COUNT_MAX),
            ),
            event_sampler: ExposureSampling::new(sdk_key),
            flush_interval: FlushInterval::new(),
            options: options.clone(),
            logging_adapter: event_logging_adapter.clone(),
            non_exposed_checks: RwLock::new(HashMap::new()),
            shutdown_notify: Notify::new(),
            limit_flush_notify: Notify::new(),
            limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
            ops_stats: OPS_STATS.get_for_instance(sdk_key),
            enqueue_dropped_events_count: AtomicU64::new(0),
        });

        me.spawn_background_task(statsig_rt);
        me
    }

    /// Applies sampling and queues the event produced by `operation`.
    ///
    /// No-op when `disable_all_logging` is set or the sampling decision says
    /// not to log. If the queue reports a full batch, the background loop is
    /// woken to attempt a limit flush; any events the queue had to drop are
    /// tallied in `enqueue_dropped_events_count` for later reporting.
    pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
        if self.options.disable_all_logging == Some(true) {
            return;
        }

        let decision = self.event_sampler.get_sampling_decision(&operation);
        if !decision.should_log() {
            return;
        }

        let pending_event = operation.into_queued_event(decision);
        match self.queue.add(pending_event) {
            QueueAddResult::Noop => (),
            QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
            QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
                self.enqueue_dropped_events_count
                    .fetch_add(dropped_events_count, Ordering::Relaxed);
                self.limit_flush_notify.notify_one();
            }
        }
    }

    /// Bumps the per-name count of checks that were evaluated without an
    /// exposure being logged; the counts are later folded into a single
    /// summary event by `try_add_non_exposed_checks_event`.
    pub fn increment_non_exposure_checks(&self, name: &str) {
        // write_lock_or_noop! silently skips the update if the lock is
        // unavailable rather than blocking or panicking.
        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);

        match non_exposed_checks.get_mut(name) {
            Some(count) => *count += 1,
            None => {
                non_exposed_checks.insert(name.into(), 1);
            }
        }
    }

    /// Public manual flush: drains and sends every pending batch.
    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
        self.try_flush_all_pending_events(FlushType::Manual).await
    }

    /// Flushes everything as a Shutdown-type flush, then signals the
    /// background loop to exit. The flush result is returned even though the
    /// loop is stopped regardless.
    pub async fn shutdown(&self) -> Result<(), StatsigErr> {
        let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
        self.shutdown_notify.notify_one();
        result
    }

    /// Stops the background loop without flushing pending events.
    pub fn force_shutdown(&self) {
        self.shutdown_notify.notify_one();
    }

    /// Spawns the long-lived background loop that drives flushing.
    ///
    /// Each iteration races: the periodic tick (scheduled flush), runtime or
    /// logger shutdown (exit), and the limit-flush notification. The
    /// limit-flush arm is only armed while the flush interval has fully
    /// recovered from backoff — `can_limit_flush` is sampled before the
    /// select! so the guard is stable for that iteration. After every wakeup
    /// the sampler is given a chance to reset its state.
    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let me = self.clone();
        let rt_clone = rt.clone();

        let spawn_result = rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
            let tick_interval = Duration::from_millis(tick_interval_ms);

            loop {
                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();

                tokio::select! {
                    () = tokio::time::sleep(tick_interval) => {
                        me.try_scheduled_flush().await;
                    }
                    () = rt_shutdown_notify.notified() => {
                        return;
                    }
                    _ = me.shutdown_notify.notified() => {
                        return;
                    }
                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
                    }
                }

                me.event_sampler.try_reset_all_sampling();
            }
        });

        if let Err(e) = spawn_result {
            log_e!(TAG, "Failed to spawn background task: {e}");
        }
    }

    /// Starts a task that drains full batches while conditions allow.
    ///
    /// An owned semaphore permit caps concurrency at MAX_LIMIT_FLUSH_TASKS;
    /// when no permit is available the notification is simply dropped. The
    /// task flushes one batch, then continues only while backoff is fully
    /// recovered, a full batch remains, and flushes keep succeeding. The
    /// permit lives until the task finishes, releasing a slot on exit.
    fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => return,
        };

        let me = inst.clone();
        let spawn_result = rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
            log_d!(TAG, "Attempting limit flush");
            if !me.flush_next_batch(FlushType::Limit).await {
                return;
            }

            loop {
                if !me.flush_interval.has_completely_recovered_from_backoff() {
                    break;
                }

                if !me.queue.contains_at_least_one_full_batch() {
                    break;
                }

                if !me.flush_next_batch(FlushType::Limit).await {
                    break;
                }
            }

            // Explicitly release the concurrency slot (also happens on the
            // early return above when the permit is dropped).
            drop(permit);
        });

        if let Err(e) = spawn_result {
            log_e!(TAG, "Failed to spawn limit flush task: {e}");
        }
    }

    /// Drains every batch from the queue and flushes them concurrently.
    ///
    /// On a failed batch: Manual flushes record the failure for backoff and
    /// requeue the batch so a later flush can retry it; all other flush
    /// types reaching here (e.g. Shutdown) drop the batch's events. Returns
    /// the first error found, or Ok(()) if every batch succeeded.
    async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
        self.prepare_event_queue_for_flush(flush_type);

        let batches = self.queue.take_all_batches();

        let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
            match self.log_batch(&mut batch, flush_type).await {
                Ok(_) => Ok(()),
                Err(e) => {
                    if flush_type == FlushType::Manual {
                        self.flush_interval.adjust_for_failure();
                        self.try_requeue_failed_batch(&e, batch, flush_type);
                        return Err(e);
                    }

                    self.drop_events_for_failure(&e, batch, flush_type);
                    Err(e)
                }
            }
        }))
        .await;

        // Surface the first failure; otherwise success.
        results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
    }

    /// Tick handler for the background loop: flushes one batch when either
    /// the max allowed interval has elapsed or a full batch is waiting —
    /// but only once the interval has cooled down from the most recent
    /// failure. The flush type records which condition triggered it.
    async fn try_scheduled_flush(&self) {
        if !self.flush_interval.has_cooled_from_most_recent_failure() {
            return;
        }

        let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
        let should_flush_by_size = self.queue.contains_at_least_one_full_batch();

        if !should_flush_by_time && !should_flush_by_size {
            return;
        }

        self.flush_interval.mark_scheduled_flush_attempt();

        self.flush_next_batch(if should_flush_by_size {
            FlushType::ScheduledFullBatch
        } else {
            FlushType::ScheduledMaxTime
        })
        .await;
    }

    /// Flushes a single batch; returns true on success.
    ///
    /// The outcome feeds the adaptive flush interval (success shrinks it,
    /// failure backs off), and failed batches are handed to
    /// `try_requeue_failed_batch`. Returns false when the queue has no batch
    /// to flush or the send failed.
    async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
        self.prepare_event_queue_for_flush(flush_type);

        let mut batch = match self.queue.take_next_batch() {
            Some(batch) => batch,
            None => return false,
        };

        let error = match self.log_batch(&mut batch, flush_type).await {
            Err(e) => e,
            Ok(()) => {
                self.flush_interval.adjust_for_success();
                return true;
            }
        };

        self.flush_interval.adjust_for_failure();
        self.try_requeue_failed_batch(&error, batch, flush_type);

        false
    }

    /// Sends one batch through the logging adapter.
    ///
    /// Attaches flush diagnostics (current interval, queue sizing, flush
    /// type) to the request metadata and increments the batch's attempt
    /// counter regardless of outcome. `Ok(false)` from the adapter is
    /// surfaced as an "Unknown Failure" error; a disabled-network error is
    /// treated as success so the batch is not retried.
    async fn log_batch(
        &self,
        batch: &mut EventBatch,
        flush_type: FlushType,
    ) -> Result<(), StatsigErr> {
        let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
            self.flush_interval.get_current_flush_interval_ms(),
            self.queue.batch_size,
            self.queue.max_pending_batches,
            flush_type.to_string(),
        );

        let result = self
            .logging_adapter
            .log_events(batch.get_log_event_request(statsig_metadata))
            .await;

        batch.attempts += 1;

        match result {
            Ok(true) => {
                self.ops_stats.log_event_request_success(batch.events.len());
                Ok(())
            }
            Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
            Err(e) => Err(e),
        }
    }

    /// Moves loose state into the queue before any flush: folds pending
    /// non-exposed-check counts into an event, reports enqueue-time drops,
    /// and asks the queue to reconcile its batching. Drops reported by
    /// reconciliation are logged and emitted to ops stats.
    fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
        self.try_add_non_exposed_checks_event();
        self.try_log_enqueue_dropped_events();

        let dropped_events_count = match self.queue.reconcile_batching() {
            QueueReconcileResult::Success => return,
            QueueReconcileResult::LockFailure => {
                log_e!(TAG, "prepare_event_queue_for_flush lock failure");
                return;
            }
            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
        };

        if dropped_events_count > 0 {
            self.log_dropped_event_warning(dropped_events_count);

            self.ops_stats.log_batching_dropped_events(
                StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
                dropped_events_count,
                &self.flush_interval,
                &self.queue,
                flush_type,
            );
        }
    }

    /// Returns a failed batch to the queue for a later retry — unless the
    /// error is non-retryable or the batch has exhausted its retry budget,
    /// in which case its events are dropped. Requeueing may itself evict
    /// events (pending-batch limit); those drops are logged and reported.
    fn try_requeue_failed_batch(
        &self,
        error: &StatsigErr,
        batch: EventBatch,
        flush_type: FlushType,
    ) {
        let is_non_retryable = matches!(
            error,
            StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
        );

        let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();

        if is_non_retryable || is_max_retries {
            self.drop_events_for_failure(error, batch, flush_type);
            return;
        }

        let dropped_events_count = match self.queue.requeue_batch(batch) {
            QueueReconcileResult::Success => return,
            QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
            QueueReconcileResult::LockFailure => {
                log_e!(TAG, "try_requeue_failed_batch lock failure");
                return;
            }
        };

        if dropped_events_count == 0 {
            return;
        }

        self.log_dropped_event_warning(dropped_events_count);

        self.ops_stats.log_batching_dropped_events(
            StatsigErr::LogEventError(
                "Dropped events due to max pending event batches limit".to_string(),
            ),
            dropped_events_count,
            &self.flush_interval,
            &self.queue,
            flush_type,
        );
    }

    /// Discards a batch that will not be retried: logs a warning describing
    /// the flush kind, attempts, and event count, and records the drop in
    /// ops stats.
    fn drop_events_for_failure(
        &self,
        error: &StatsigErr,
        batch: EventBatch,
        flush_type: FlushType,
    ) {
        let dropped_events_count = batch.events.len() as u64;

        let kind = match flush_type {
            FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
            FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
            FlushType::Limit => "Limit",
            FlushType::Manual => "Manual",
            FlushType::Shutdown => "Shutdown",
        };

        log_w!(
            TAG,
            "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
            kind,
            batch.attempts,
            dropped_events_count,
            error
        );

        self.ops_stats
            .log_event_request_failure(dropped_events_count, flush_type);
    }

    /// Converts the accumulated non-exposed-check counts into a single
    /// passthrough event and queues it, resetting the counters via
    /// mem::take. Drops reported by the queue are added to the enqueue drop
    /// tally (no limit-flush wakeup here — a flush is already in progress
    /// when this runs).
    fn try_add_non_exposed_checks_event(&self) {
        let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
        if non_exposed_checks.is_empty() {
            return;
        }

        let checks = std::mem::take(&mut *non_exposed_checks);
        let result = self.queue.add(QueuedEvent::Passthrough(
            StatsigEventInternal::new_non_exposed_checks_event(checks),
        ));

        if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
            self.enqueue_dropped_events_count
                .fetch_add(dropped_events_count, Ordering::Relaxed);
        }
    }

    /// Reports (and resets) the count of events dropped at enqueue time.
    /// The swap makes read-and-clear atomic so concurrent flushes don't
    /// double-report.
    fn try_log_enqueue_dropped_events(&self) {
        let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
        if dropped_events_count == 0 {
            return;
        }

        self.log_dropped_event_warning(dropped_events_count);

        self.ops_stats.log_batching_dropped_events(
            StatsigErr::LogEventError(
                "Dropped events due to max pending event batches limit".to_string(),
            ),
            dropped_events_count,
            &self.flush_interval,
            &self.queue,
            FlushType::Limit,
        );
    }

    /// Shared warning for any drop path, including the queue's current
    /// approximate occupancy and configured limits for context.
    fn log_dropped_event_warning(&self, dropped_events_count: u64) {
        let approximate_pending_events_count = self.queue.approximate_pending_events_count();
        log_w!(
            TAG,
            "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
            dropped_events_count,
            approximate_pending_events_count,
            self.queue.max_pending_batches,
            self.queue.batch_size
        );
    }
}