// statsig_rust/event_logging/event_logger.rs
use super::{
2 event_queue::{
3 batch::EventBatch,
4 queue::{EventQueue, QueueResult},
5 queued_event::{EnqueueOperation, QueuedEvent},
6 },
7 exposure_sampling::ExposureSampling,
8 flush_interval::FlushInterval,
9 flush_type::FlushType,
10 statsig_event_internal::StatsigEventInternal,
11};
12use crate::{
13 event_logging::event_logger_constants::EventLoggerConstants,
14 log_d, log_w,
15 networking::NetworkError,
16 observability::ops_stats::{OpsStatsForInstance, OPS_STATS},
17 statsig_metadata::StatsigMetadata,
18 write_lock_or_noop, EventLoggingAdapter, StatsigErr, StatsigOptions, StatsigRuntime,
19};
20use std::time::Duration;
21use std::{
22 collections::HashMap,
23 sync::{Arc, RwLock},
24};
25use tokio::sync::{Notify, Semaphore};
26
// Tag used when spawning this logger's background tasks on the Statsig runtime.
const BG_TASK_TAG: &str = "EVENT_LOGGER_BG_TASK";
// Default max events per batch when `event_logging_max_queue_size` is unset.
const DEFAULT_BATCH_SIZE: u32 = 2000;
// Default cap on batched-but-unsent batches when the corresponding option is unset.
const DEFAULT_PENDING_BATCH_MAX: u32 = 100;
// Upper bound on concurrently running "limit" flush tasks (enforced via a semaphore).
const MAX_LIMIT_FLUSH_TASKS: usize = 5;
31
/// Distinguishes exposures produced automatically by an SDK check from those
/// logged manually by the caller.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExposureTrigger {
    Auto,
    Manual,
}

// Log tag for messages emitted by this module.
const TAG: &str = stringify!(EvtLogger);
39
40pub struct EventLogger {
41 queue: EventQueue,
42 options: Arc<StatsigOptions>,
43 logging_adapter: Arc<dyn EventLoggingAdapter>,
44 event_sampler: ExposureSampling,
45 non_exposed_checks: RwLock<HashMap<String, u64>>,
46 limit_flush_notify: Notify,
47 limit_flush_semaphore: Arc<Semaphore>,
48 flush_interval: FlushInterval,
49 shutdown_notify: Notify,
50 ops_stats: Arc<OpsStatsForInstance>,
51}
52
53impl EventLogger {
54 pub fn new(
55 sdk_key: &str,
56 options: &Arc<StatsigOptions>,
57 event_logging_adapter: &Arc<dyn EventLoggingAdapter>,
58 statsig_rt: &Arc<StatsigRuntime>,
59 ) -> Arc<Self> {
60 let me = Arc::new(Self {
61 queue: EventQueue::new(
62 options
63 .event_logging_max_queue_size
64 .unwrap_or(DEFAULT_BATCH_SIZE),
65 options
66 .event_logging_max_pending_batch_queue_size
67 .unwrap_or(DEFAULT_PENDING_BATCH_MAX),
68 ),
69 event_sampler: ExposureSampling::new(sdk_key),
70 flush_interval: FlushInterval::new(),
71 options: options.clone(),
72 logging_adapter: event_logging_adapter.clone(),
73 non_exposed_checks: RwLock::new(HashMap::new()),
74 shutdown_notify: Notify::new(),
75 limit_flush_notify: Notify::new(),
76 limit_flush_semaphore: Arc::new(Semaphore::new(MAX_LIMIT_FLUSH_TASKS)),
77 ops_stats: OPS_STATS.get_for_instance(sdk_key),
78 });
79
80 me.spawn_background_task(statsig_rt);
81 me
82 }
83
84 pub fn enqueue(self: &Arc<Self>, operation: impl EnqueueOperation) {
85 if self.options.disable_all_logging == Some(true) {
86 return;
87 }
88
89 let decision = self.event_sampler.get_sampling_decision(&operation);
90 if !decision.should_log() {
91 return;
92 }
93
94 let pending_event = operation.into_queued_event(decision);
95 if self.queue.add(pending_event) {
96 self.limit_flush_notify.notify_one();
97 }
98 }
99
100 pub fn increment_non_exposure_checks(&self, name: &str) {
101 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
102
103 match non_exposed_checks.get_mut(name) {
104 Some(count) => *count += 1,
105 None => {
106 non_exposed_checks.insert(name.into(), 1);
107 }
108 }
109 }
110
    /// Flushes every queued batch now (user-initiated flush).
    ///
    /// Returns the first error encountered if any batch fails to send.
    pub async fn flush_all_pending_events(&self) -> Result<(), StatsigErr> {
        self.try_flush_all_pending_events(FlushType::Manual).await
    }
114
115 pub async fn shutdown(&self) -> Result<(), StatsigErr> {
116 let result = self.try_flush_all_pending_events(FlushType::Shutdown).await;
117 self.shutdown_notify.notify_one();
118 result
119 }
120
    /// Spawns the long-lived background loop that drives scheduled flushes,
    /// limit flushes, sampling resets, and shutdown handling.
    fn spawn_background_task(self: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
        let me = self.clone();
        let rt_clone = rt.clone();

        rt.spawn(BG_TASK_TAG, |rt_shutdown_notify| async move {
            let tick_interval_ms = EventLoggerConstants::tick_interval_ms();
            let tick_interval = Duration::from_millis(tick_interval_ms);

            loop {
                // Only honor limit-flush wakeups once the flush interval has
                // fully recovered from any prior failure backoff.
                let can_limit_flush = me.flush_interval.has_completely_recovered_from_backoff();

                tokio::select! {
                    // Periodic tick: attempt a scheduled flush.
                    () = tokio::time::sleep(tick_interval) => {
                        me.try_scheduled_flush().await;
                    }
                    // Runtime-level shutdown.
                    () = rt_shutdown_notify.notified() => {
                        return; }
                    // Logger-level shutdown (from `shutdown()`).
                    _ = me.shutdown_notify.notified() => {
                        return; }
                    // Queue signaled a notify-worthy add; spawn a limit flush.
                    _ = me.limit_flush_notify.notified(), if can_limit_flush => {
                        Self::spawn_new_limit_flush_task(&me, &rt_clone);
                    }
                }

                me.event_sampler.try_reset_all_sampling();
            }
        });
    }
151
152 fn spawn_new_limit_flush_task(inst: &Arc<Self>, rt: &Arc<StatsigRuntime>) {
153 let permit = match inst.limit_flush_semaphore.clone().try_acquire_owned() {
154 Ok(permit) => permit,
155 Err(_) => return,
156 };
157
158 let me = inst.clone();
159 rt.spawn(BG_TASK_TAG, |_| async move {
160 log_d!(TAG, "Attempting limit flush");
161 if !me.flush_next_batch(FlushType::Limit).await {
162 return;
163 }
164
165 loop {
166 if !me.flush_interval.has_completely_recovered_from_backoff() {
167 break;
168 }
169
170 if !me.queue.contains_at_least_one_full_batch() {
171 break;
172 }
173
174 if !me.flush_next_batch(FlushType::Limit).await {
175 break;
176 }
177 }
178
179 drop(permit);
180 });
181 }
182
183 async fn try_flush_all_pending_events(&self, flush_type: FlushType) -> Result<(), StatsigErr> {
184 self.prepare_event_queue_for_flush(flush_type);
185
186 let batches = self.queue.take_all_batches();
187
188 let results = futures::future::join_all(batches.into_iter().map(|mut batch| async {
189 match self.log_batch(&mut batch, flush_type).await {
190 Ok(_) => Ok(()),
191 Err(e) => {
192 if flush_type == FlushType::Manual {
193 self.flush_interval.adjust_for_failure();
194 self.try_requeue_failed_batch(&e, batch, flush_type);
195 return Err(e);
196 }
197
198 self.drop_events_for_failure(&e, batch, flush_type);
199 Err(e)
200 }
201 }
202 }))
203 .await;
204
205 results.into_iter().find(|r| r.is_err()).unwrap_or(Ok(()))
206 }
207
208 async fn try_scheduled_flush(&self) {
209 if !self.flush_interval.has_cooled_from_most_recent_failure() {
210 return;
211 }
212
213 let should_flush_by_time = self.flush_interval.has_waited_max_allowed_interval();
214 let should_flush_by_size = self.queue.contains_at_least_one_full_batch();
215
216 if !should_flush_by_time && !should_flush_by_size {
217 return;
218 }
219
220 self.flush_interval.mark_scheduled_flush_attempt();
221
222 self.flush_next_batch(if should_flush_by_size {
223 FlushType::ScheduledFullBatch
224 } else {
225 FlushType::ScheduledMaxTime
226 })
227 .await;
228 }
229
230 async fn flush_next_batch(&self, flush_type: FlushType) -> bool {
231 self.prepare_event_queue_for_flush(flush_type);
232
233 let mut batch = match self.queue.take_next_batch() {
234 Some(batch) => batch,
235 None => return false,
236 };
237
238 let error = match self.log_batch(&mut batch, flush_type).await {
239 Err(e) => e,
240 Ok(()) => {
241 self.flush_interval.adjust_for_success();
242 return true;
243 }
244 };
245
246 self.flush_interval.adjust_for_failure();
247 self.try_requeue_failed_batch(&error, batch, flush_type);
248
249 false
250 }
251
252 async fn log_batch(
253 &self,
254 batch: &mut EventBatch,
255 flush_type: FlushType,
256 ) -> Result<(), StatsigErr> {
257 let statsig_metadata = StatsigMetadata::get_with_log_event_extras(
258 self.flush_interval.get_current_flush_interval_ms(),
259 self.queue.batch_size,
260 self.queue.max_pending_batches,
261 flush_type.to_string(),
262 );
263
264 let result = self
265 .logging_adapter
266 .log_events(batch.get_log_event_request(statsig_metadata))
267 .await;
268
269 batch.attempts += 1;
270
271 match result {
272 Ok(true) => {
273 self.ops_stats.log_event_request_success(batch.events.len());
274 Ok(())
275 }
276 Ok(false) => Err(StatsigErr::LogEventError("Unknown Failure".into())),
277 Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _)) => Ok(()),
278 Err(e) => Err(e),
279 }
280 }
281
282 fn prepare_event_queue_for_flush(&self, flush_type: FlushType) {
283 self.try_add_non_exposed_checks_event();
284
285 let dropped_events_count = match self.queue.reconcile_batching() {
286 QueueResult::Success => return,
287 QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
288 };
289
290 if dropped_events_count > 0 {
291 self.log_dropped_event_warning(dropped_events_count);
292
293 self.ops_stats.log_batching_dropped_events(
294 StatsigErr::LogEventError("Dropped events due to event queue limit".to_string()),
295 dropped_events_count,
296 &self.flush_interval,
297 &self.queue,
298 flush_type,
299 );
300 }
301 }
302
303 fn try_requeue_failed_batch(
304 &self,
305 error: &StatsigErr,
306 batch: EventBatch,
307 flush_type: FlushType,
308 ) {
309 let is_non_retryable = matches!(
310 error,
311 StatsigErr::NetworkError(NetworkError::RequestNotRetryable, _)
312 );
313
314 let is_max_retries = batch.attempts > EventLoggerConstants::max_log_event_retries();
315
316 if is_non_retryable || is_max_retries {
317 self.drop_events_for_failure(error, batch, flush_type);
318 return;
319 }
320
321 let dropped_events_count = match self.queue.requeue_batch(batch) {
322 QueueResult::Success => return,
323 QueueResult::DroppedEvents(dropped_events_count) => dropped_events_count,
324 };
325
326 if dropped_events_count == 0 {
327 return;
328 }
329
330 self.log_dropped_event_warning(dropped_events_count);
331
332 self.ops_stats.log_batching_dropped_events(
333 StatsigErr::LogEventError(
334 "Dropped events due to max pending event batches limit".to_string(),
335 ),
336 dropped_events_count,
337 &self.flush_interval,
338 &self.queue,
339 flush_type,
340 );
341 }
342
343 fn drop_events_for_failure(
344 &self,
345 error: &StatsigErr,
346 batch: EventBatch,
347 flush_type: FlushType,
348 ) {
349 let dropped_events_count = batch.events.len() as u64;
350
351 let kind = match flush_type {
352 FlushType::ScheduledMaxTime => "Scheduled (Max Time)",
353 FlushType::ScheduledFullBatch => "Scheduled (Full Batch)",
354 FlushType::Limit => "Limit",
355 FlushType::Manual => "Manual",
356 FlushType::Shutdown => "Shutdown",
357 };
358
359 log_w!(
360 TAG,
361 "{} flush failed after {} attempt(s). {} Event(s) will be dropped. {}",
362 kind,
363 batch.attempts,
364 dropped_events_count,
365 error
366 );
367
368 self.ops_stats
369 .log_event_request_failure(dropped_events_count, flush_type);
370 }
371
372 fn try_add_non_exposed_checks_event(&self) {
373 let mut non_exposed_checks = write_lock_or_noop!(TAG, self.non_exposed_checks);
374 if non_exposed_checks.is_empty() {
375 return;
376 }
377
378 let checks = std::mem::take(&mut *non_exposed_checks);
379 self.queue.add(QueuedEvent::Passthrough(
380 StatsigEventInternal::new_non_exposed_checks_event(checks),
381 ));
382 }
383
384 fn log_dropped_event_warning(&self, dropped_events_count: u64) {
385 let approximate_pending_events_count = self.queue.approximate_pending_events_count();
386 log_w!(
387 TAG,
388 "Too many events. Dropped {}. Approx pending events {}. Max pending batches {}. Max queue size {}",
389 dropped_events_count,
390 approximate_pending_events_count,
391 self.queue.max_pending_batches,
392 self.queue.batch_size
393 );
394 }
395}