1use super::{
2 event_queue::{
3 batch::EventBatch,
4 queue::{EventQueue, QueueReconcileResult},
5 queued_event::{EnqueueOperation, QueuedEvent},
6 },
7 exposure_sampling::ExposureSampling,
8 flush_interval::FlushInterval,
9 flush_type::FlushType,
10 statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13 event_logging::{
14 event_logger_constants::EventLoggerConstants, event_queue::queue::QueueAddResult,
15 },
16 log_d, log_e, log_w,
17 networking::NetworkError,
18 observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
19 statsig_metadata::StatsigMetadata,
20 write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
21};
22use std::{
23 collections::HashMap,
24 sync::{Arc, RwLock},
25};
26use std::{
27 sync::atomic::{AtomicU64, Ordering},
28 time::Duration,
29};
30use tokio::sync::{Notify, Semaphore};
31
32const BG_LOOP_TAG: &str = "EVT_LOG_BG_LOOP";
33const LIMIT_FLUSH_TAG: &str = "EVT_LOG_LIMIT_FLUSH";
34const DEFAULT_BATCH_SIZE: u32 = 2000;
35const DEFAULT_PENDING_BATCH_MAX: u32 = 100;
36const MAX_LIMIT_FLUSH_TASKS: usize = 5;
37
/// How an exposure came to be logged.
///
/// NOTE(review): not referenced in this file; presumably `Auto` marks exposures logged as a
/// side effect of evaluation and `Manual` marks explicitly requested ones — confirm at call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExposureTrigger {
    /// Presumably: logged automatically by the SDK.
    Auto,
    /// Presumably: logged through an explicit manual-exposure API.
    Manual,
}
43
/// Log tag used by every `log_*!` call and `write_lock_or_noop!` in this module.
const TAG: &str = "EvtLogger";
45
/// Batches, samples and ships SDK events through an `EventLoggingAdapter`.
///
/// Events are pushed via `enqueue`, grouped into batches by `EventQueue`, and flushed by a
/// background loop (on a timer), by short-lived "limit" tasks (when the queue fills up), or on
/// demand (`flush_all_pending_events` / `shutdown`).
///
/// NOTE: field declaration order is also drop order.
pub struct EventLogger {
    // Pending events, grouped into batches.
    queue: EventQueue,
    // SDK options; read for `disable_all_logging` and queue sizing.
    options: Arc<StatsigOptions>,
    // Transport that actually sends each batch.
    logging_adapter: Arc<dyn EventLoggingAdapter>,
    // Decides per event whether it should be logged at all.
    event_sampler: ExposureSampling,
    // Per-name counts of checks that did not log an exposure; emitted as one event per flush.
    non_exposed_checks: RwLock<HashMap<String, u64>>,
    // Wakes the background loop when the queue reports it needs a flush.
    limit_flush_notify: Notify,
    // Bounds how many limit-flush tasks may run concurrently.
    limit_flush_semaphore: Arc<Semaphore>,
    // Flush timing and failure backoff state.
    flush_interval: FlushInterval,
    // Signals the background loop to exit.
    shutdown_notify: Notify,
    // Per-SDK-key operational metrics sink.
    ops_stats: Arc<OpsStatsForInstance>,
    // Events the queue dropped at enqueue time, reported (and reset) on the next flush prep.
    enqueue_dropped_events_count: AtomicU64,
}
59
60impl EventLogger {
61 pub fn new(
62 sdk_key: &str,
63 options: &Arc<StatsigOptions>,
64 event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
65 statsig_rt: &Arc<StatsigRuntime>,
66 ) -> Arc<Self> {
67 let me = Arc::new(Self {
68 queue: EventQueue::new(
69 options
70 .event_logging_max_queue_size
71 .unwrap_or(DEFAULT_BATCH_SIZE),
72 options
73 .event_logging_max_pending_batch_queue_size
74 .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
75 ),
76 event_sampler: ExposureSampling::new(sdk_key),
77 flush_interval: FlushInterval::new(),
78 options: options.clone(),
79 logging_adapter: event_logging_adapter.clone(),
80 non_exposed_checks: RwLock::new(HashMap::new()),
81 shutdown_notify: Notify::new(),
82 limit_flush_notify: Notify::new(),
83 limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
84 ops_stats: OPS_STATS.get_for_instance(sdk_key),
85 enqueue_dropped_events_count: AtomicU64::new(0),
86 });
87
88 me.spawn_background_task(statsig_rt);
89 me
90 }
91
92 pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
93 if self.options.disable_all_logging == Some(true) {
94 return;
95 }
96
97 let decision = self.event_sampler.get_sampling_decision(&operation);
98 if !decision.should_log() {
99 return;
100 }
101
102 let pending_event = operation.into_queued_event(decision);
103 match self.queue.add(pending_event) {
104 QueueAddResult::Noop => (),
105 QueueAddResult::NeedsFlush => self.limit_flush_notify.notify_one(),
106 QueueAddResult::NeedsFlushAndDropped(dropped_events_count) => {
107 self.enqueue_dropped_events_count
108 .fetch_add(dropped_events_count, Ordering::Relaxed);
109 self.limit_flush_notify.notify_one();
110 }
111 }
112 }
113
114 pub fn increment_non_exposure_checks(&self, name: &str) {
115 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
116
117 match non_exposed_checks.get_mut(name) {
118 Some(count) => *count += 1,
119 None => {
120 non_exposed_checks.insert(name.into(), 1);
121 }
122 }
123 }
124
125 pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
126 self.try_flush_all_pending_events(FlushType::Manual).await
127 }
128
129 pub async fn shutdown(&self) -> Result<(), StatsigErr> {
130 let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
131 self.shutdown_notify.notify_one();
132 result
133 }
134
135 pub fn force_shutdown(&self) {
136 self.shutdown_notify.notify_one();
137 }
138
139 fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
140 let me = self.clone();
141 let rt_clone = rt.clone();
142
143 rt.spawn(BG_LOOP_TAG, |rt_shutdown_notify| async move {
144 let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
145 let tick_interval = Duration::from_millis(tick_interval_ms);
146
147 loop {
148 let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();
149
150 tokio::select! {
151 () = tokio::time::sleep(tick_interval) => {
152 me.try_scheduled_flush().await;
153 }
154 () = rt_shutdown_notify.notified() => {
155 return; }
157 _ = me.shutdown_notify.notified() => {
158 return; }
160 _ = me.limit_flush_notify.notified(), if can_limit_flush => {
161 Self::spawn_new_limit_flush_task(&me, &rt_clone);
162 }
163 }
164
165 me.event_sampler.try_reset_all_sampling();
166 }
167 });
168 }
169
170 fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
171 let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
172 Ok(permit) => permit,
173 Err(_) => return,
174 };
175
176 let me = inst.clone();
177 rt.spawn(LIMIT_FLUSH_TAG, |_| async move {
178 log_d!(TAG, "Attempting limit flush");
179 if !me.flush_next_batch(FlushType::Limit).await {
180 return;
181 }
182
183 loop {
184 if !me.flush_interval.has_completely_recovered_from_backoff() {
185 break;
186 }
187
188 if !me.queue.contains_at_least_one_full_batch() {
189 break;
190 }
191
192 if !me.flush_next_batch(FlushType::Limit).await {
193 break;
194 }
195 }
196
197 drop(permit);
198 });
199 }
200
201 async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
202 self.prepare_event_queue_for_flush(flush_type);
203
204 let batches = self.queue.take_all_batches();
205
206 let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
207 match self.log_batch(&mut batch, flush_type).await {
208 Ok(_) => Ok(()),
209 Err(e) => {
210 if flush_type == FlushType::Manual {
211 self.flush_interval.adjust_for_failure();
212 self.try_requeue_failed_batch(&e, batch, flush_type);
213 return Err(e);
214 }
215
216 self.drop_events_for_failure(&e, batch, flush_type);
217 Err(e)
218 }
219 }
220 }))
221 .await;
222
223 results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
224 }
225
226 async fn try_scheduled_flush(&self) {
227 if !self.flush_interval.has_cooled_from_most_recent_failure() {
228 return;
229 }
230
231 let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
232 let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
233
234 if !should_flush_by_time && !should_flush_by_size {
235 return;
236 }
237
238 self.flush_interval.mark_scheduled_flush_attempt();
239
240 self.flush_next_batch(if should_flush_by_size {
241 FlushType::ScheduledFullBatch
242 } else {
243 FlushType::ScheduledMaxTime
244 })
245 .await;
246 }
247
248 async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
249 self.prepare_event_queue_for_flush(flush_type);
250
251 let mut batch = match self.queue.take_next_batch() {
252 Some(batch) => batch,
253 None => return false,
254 };
255
256 let error = match self.log_batch(&mut batch, flush_type).await {
257 Err(e) => e,
258 Ok(()) => {
259 self.flush_interval.adjust_for_success();
260 return true;
261 }
262 };
263
264 self.flush_interval.adjust_for_failure();
265 self.try_requeue_failed_batch(&error, batch, flush_type);
266
267 false
268 }
269
270 async fn log_batch(
271 &self,
272 batch: &mut EventBatch,
273 flush_type: FlushType,
274 ) -> Result<(), StatsigErr> {
275 let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
276 self.flush_interval.get_current_flush_interval_ms(),
277 self.queue.batch_size,
278 self.queue.max_pending_batches,
279 flush_type.to_string(),
280 );
281
282 let result = self
283 .logging_adapter
284 .log_events(batch.get_log_event_request(statsig_metadata))
285 .await;
286
287 batch.attempts += 1;
288
289 match result {
290 Ok(true) => {
291 self.ops_stats.log_event_request_success(batch.events.len());
292 Ok(())
293 }
294 Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
295 Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_))) => Ok(()),
296 Err(e) => Err(e),
297 }
298 }
299
300 fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
301 self.try_add_non_exposed_checks_event();
302 self.try_log_enqueue_dropped_events();
303
304 let dropped_events_count = match self.queue.reconcile_batching() {
305 QueueReconcileResult::Success => return,
306 QueueReconcileResult::LockFailure => {
307 log_e!(TAG, "prepare_event_queue_for_flush lock failure");
308 return;
309 }
310 QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
311 };
312
313 if dropped_events_count > 0 {
314 self.log_dropped_event_warning(dropped_events_count);
315
316 self.ops_stats.log_batching_dropped_events(
317 StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
318 dropped_events_count,
319 &self.flush_interval,
320 &self.queue,
321 flush_type,
322 );
323 }
324 }
325
326 fn try_requeue_failed_batch(
327 &self,
328 error: &StatsigErr,
329 batch: EventBatch,
330 flush_type: FlushType,
331 ) {
332 let is_non_retryable = matches!(
333 error,
334 StatsigErr::NetworkError(NetworkError::RequestNotRetryable(_, _, _))
335 );
336
337 let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
338
339 if is_non_retryable || is_max_retries {
340 self.drop_events_for_failure(error, batch, flush_type);
341 return;
342 }
343
344 let dropped_events_count = match self.queue.requeue_batch(batch) {
345 QueueReconcileResult::Success => return,
346 QueueReconcileResult::DroppedEvents(dropped_events_count) => dropped_events_count,
347 QueueReconcileResult::LockFailure => {
348 log_e!(TAG, "try_requeue_failed_batch lock failure");
349 return;
350 }
351 };
352
353 if dropped_events_count == 0 {
354 return;
355 }
356
357 self.log_dropped_event_warning(dropped_events_count);
358
359 self.ops_stats.log_batching_dropped_events(
360 StatsigErr::LogEventError(
361 "Dropped events due to max pending event batches limit".to_string(),
362 ),
363 dropped_events_count,
364 &self.flush_interval,
365 &self.queue,
366 flush_type,
367 );
368 }
369
370 fn drop_events_for_failure(
371 &self,
372 error: &StatsigErr,
373 batch: EventBatch,
374 flush_type: FlushType,
375 ) {
376 let dropped_events_count = batch.events.len() as u64;
377
378 let kind = match flush_type {
379 FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
380 FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
381 FlushType::Limit => "Limit",
382 FlushType::Manual => "Manual",
383 FlushType::Shutdown => "Shutdown",
384 };
385
386 log_w!(
387 TAG,
388 "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
389 kind,
390 batch.attempts,
391 dropped_events_count,
392 error
393 );
394
395 self.ops_stats
396 .log_event_request_failure(dropped_events_count, flush_type);
397 }
398
399 fn try_add_non_exposed_checks_event(&self) {
400 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
401 if non_exposed_checks.is_empty() {
402 return;
403 }
404
405 let checks = std::mem::take(&mut *non_exposed_checks);
406 let result = self.queue.add(QueuedEvent::Passthrough(
407 StatsigEventInternal::new_non_exposed_checks_event(checks),
408 ));
409
410 if let QueueAddResult::NeedsFlushAndDropped(dropped_events_count) = result {
411 self.enqueue_dropped_events_count
412 .fetch_add(dropped_events_count, Ordering::Relaxed);
413 }
414 }
415
416 fn try_log_enqueue_dropped_events(&self) {
417 let dropped_events_count = self.enqueue_dropped_events_count.swap(0, Ordering::Relaxed);
418 if dropped_events_count == 0 {
419 return;
420 }
421
422 self.log_dropped_event_warning(dropped_events_count);
423
424 self.ops_stats.log_batching_dropped_events(
425 StatsigErr::LogEventError(
426 "Dropped events due to max pending event batches limit".to_string(),
427 ),
428 dropped_events_count,
429 &self.flush_interval,
430 &self.queue,
431 FlushType::Limit,
432 );
433 }
434
435 fn log_dropped_event_warning(&self, dropped_events_count: u64) {
436 let approximate_pending_events_count = self.queue.approximate_pending_events_count();
437 log_w!(
438 TAG,
439 "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
440 dropped_events_count,
441 approximate_pending_events_count,
442 self.queue.max_pending_batches,
443 self.queue.batch_size
444 );
445 }
446}