// statsig_rust/event_logging_adapter/statsig_local_file_event_logging_adapter.rs

1use std::collections::HashSet;
2use std::io::{Read, Write};
3use std::sync::Arc;
4
5use super::log_event_payload::LogEventRequest;
6use crate::event_logging::statsig_event_internal::StatsigEventInternal;
7use crate::event_logging_adapter::EventLoggingAdapter;
8use crate::hashing::djb2;
9use crate::log_event_payload::LogEventPayload;
10use crate::statsig_metadata::StatsigMetadata;
11use crate::{log_d, log_e, StatsigErr, StatsigHttpEventLoggingAdapter, StatsigRuntime};
12use async_trait::async_trait;
13use file_guard::Lock;
14use rand::Rng;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17
/// Tag passed as the first argument to the `log_d!` / `log_e!` calls in this file.
const TAG: &str = "StatsigLocalFileEventLoggingAdapter";
19#[derive(Serialize, Deserialize)]
20pub struct PendingLogRequests {
21    requests: Vec<LogEventRequest>,
22}
23
24pub struct StatsigLocalFileEventLoggingAdapter {
25    file_path: String,
26    http_adapter: StatsigHttpEventLoggingAdapter,
27}
28
29impl StatsigLocalFileEventLoggingAdapter {
30    #[must_use]
31    pub fn new(sdk_key: &str, output_directory: &str, log_event_url: Option<String>) -> Self {
32        let hashed_key = djb2(sdk_key);
33        let file_path = format!("{output_directory}/{hashed_key}_events.json");
34
35        Self {
36            file_path,
37            http_adapter: StatsigHttpEventLoggingAdapter::new(sdk_key, log_event_url.as_ref()),
38        }
39    }
40
41    pub async fn send_pending_events(&self) -> Result<(), StatsigErr> {
42        log_d!(TAG, "Sending pending events...");
43        let current_requests = if let Some(requests) = read_and_clear_file(&self.file_path)? {
44            requests
45        } else {
46            log_d!(TAG, "No events found");
47            return Ok(());
48        };
49
50        let processed_events = process_events(&current_requests);
51
52        let chunks = processed_events.chunks(1000);
53        let tasks = chunks.map(|chunk| async move {
54            let request = LogEventRequest {
55                event_count: chunk.len() as u64,
56                payload: LogEventPayload {
57                    events: json!(chunk),
58                    statsig_metadata: StatsigMetadata::get_as_json(),
59                },
60            };
61
62            let result = self.http_adapter.send_events_over_http(&request).await;
63            (request, result)
64        });
65
66        let results = futures::future::join_all(tasks).await;
67        let mut failed_requests = Vec::new();
68
69        for (request, result) in results {
70            match result {
71                Ok(true) => (),
72                _ => failed_requests.push(request),
73            }
74        }
75
76        if !failed_requests.is_empty() {
77            self.log_events(failed_requests.remove(0)).await?;
78        }
79
80        log_d!(TAG, "All events sent");
81        Ok(())
82    }
83}
84
85fn read_and_clear_file(file_path: &str) -> Result<Option<String>, StatsigErr> {
86    log_d!(TAG, "Retrieving pending events from {}", file_path);
87
88    let path = std::path::Path::new(file_path);
89    if !path.exists() {
90        return Ok(None);
91    }
92
93    let mut file = std::fs::OpenOptions::new()
94        .read(true)
95        .write(true)
96        .open(file_path)
97        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
98
99    let mut lock = file_guard::lock(&mut file, Lock::Exclusive, 0, 1)
100        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
101
102    let mut file_contents = String::new();
103    (*lock)
104        .read_to_string(&mut file_contents)
105        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
106
107    // Truncate the file to clear its contents
108    (*lock)
109        .set_len(0)
110        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
111
112    Ok(Some(file_contents))
113}
114
115fn process_events(current_requests: &str) -> Vec<StatsigEventInternal> {
116    let mut seen_exposures = HashSet::new();
117    let mut processed_events = vec![];
118
119    for line in current_requests.lines() {
120        let events: Vec<StatsigEventInternal> = match serde_json::from_str(line) {
121            Ok(events) => events,
122            Err(e) => {
123                log_e!(TAG, "Failed to parse events in file: {}", e.to_string());
124                continue;
125            }
126        };
127
128        for event in events {
129            if event.is_diagnostic_event() && !should_sample_sdk_diagnostics() {
130                continue;
131            }
132
133            if !event.is_exposure_event() {
134                processed_events.push(event);
135                continue;
136            }
137
138            let key = create_merge_key(&event);
139            if seen_exposures.contains(&key) {
140                continue;
141            }
142
143            seen_exposures.insert(key);
144            processed_events.push(event);
145        }
146    }
147
148    processed_events
149}
150
151fn create_merge_key(event: &StatsigEventInternal) -> String {
152    let mut metadata_parts = Vec::new();
153    if let Some(metadata) = &event.event_data.metadata {
154        if let Some(name) = metadata.get("gate") {
155            metadata_parts.push(format!("g.{name}|"));
156        }
157
158        if let Some(name) = metadata.get("config") {
159            metadata_parts.push(format!("c.{name}|"));
160        }
161
162        if let Some(name) = metadata.get("parameterName") {
163            metadata_parts.push(format!("pn.{name}|"));
164        }
165
166        if let Some(name) = metadata.get("allocatedExperiment") {
167            metadata_parts.push(format!("ae.{name}|"));
168        }
169
170        if let Some(rule_id) = metadata.get("ruleID") {
171            metadata_parts.push(format!("r.{rule_id}|"));
172        }
173    }
174
175    format!(
176        "{}|{}|{}",
177        event.event_data.event_name,
178        event.user.value,
179        metadata_parts.concat()
180    )
181}
182
183// PHP initializes per request, so we get a diagnostics event per request.
184// This samples quite aggressively to compensate for that
185fn should_sample_sdk_diagnostics() -> bool {
186    let random_number = rand::thread_rng().gen_range(0..10000);
187    random_number < 1
188}
189
190#[async_trait]
191impl EventLoggingAdapter for StatsigLocalFileEventLoggingAdapter {
192    async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
193        Ok(())
194    }
195
196    async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
197        let json = request.payload.events.to_string();
198
199        let mut file = std::fs::OpenOptions::new()
200            .append(true)
201            .create(true)
202            .open(&self.file_path)
203            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
204
205        let mut lock = file_guard::lock(&mut file, Lock::Exclusive, 0, 1)
206            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
207
208        (*lock)
209            .write_all(format!("{json}\n").as_bytes())
210            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
211
212        Ok(true)
213    }
214
215    async fn shutdown(&self) -> Result<(), StatsigErr> {
216        Ok(())
217    }
218
219    fn should_schedule_background_flush(&self) -> bool {
220        false
221    }
222}