statsig_rust/event_logging_adapter/
statsig_local_file_event_logging_adapter.rs

1use std::collections::HashSet;
2use std::io::{Read, Write};
3use std::sync::Arc;
4
5use super::log_event_payload::LogEventRequest;
6use crate::event_logging::statsig_event_internal::StatsigEventInternal;
7use crate::event_logging_adapter::EventLoggingAdapter;
8use crate::hashing::djb2;
9use crate::log_event_payload::LogEventPayload;
10use crate::statsig_metadata::StatsigMetadata;
11use crate::{
12    log_d, log_e, StatsigErr, StatsigHttpEventLoggingAdapter, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use file_guard::Lock;
16use rand::Rng;
17use serde::{Deserialize, Serialize};
18use serde_json::json;
19
/// Tag passed to `log_d!` / `log_e!` for every log line emitted by this adapter.
const TAG: &str = "StatsigLocalFileEventLoggingAdapter";
21#[derive(Serialize, Deserialize)]
22pub struct PendingLogRequests {
23    requests: Vec<LogEventRequest>,
24}
25
26pub struct StatsigLocalFileEventLoggingAdapter {
27    file_path: String,
28    http_adapter: StatsigHttpEventLoggingAdapter,
29}
30
31impl StatsigLocalFileEventLoggingAdapter {
32    #[must_use]
33    pub fn new(
34        sdk_key: &str,
35        output_directory: &str,
36        log_event_url: Option<String>,
37        disable_network: bool,
38    ) -> Self {
39        let hashed_key = djb2(sdk_key);
40        let file_path = format!("{output_directory}/{hashed_key}_events.json");
41
42        let options = StatsigOptions {
43            log_event_url,
44            disable_network: Some(disable_network),
45            ..Default::default()
46        };
47
48        Self {
49            file_path,
50            http_adapter: StatsigHttpEventLoggingAdapter::new(sdk_key, Some(&options)),
51        }
52    }
53
54    pub async fn send_pending_events(&self) -> Result<(), StatsigErr> {
55        log_d!(TAG, "Sending pending events...");
56        let current_requests = if let Some(requests) = read_and_clear_file(&self.file_path)? {
57            requests
58        } else {
59            log_d!(TAG, "No events found");
60            return Ok(());
61        };
62
63        let processed_events = process_events(&current_requests);
64        let chunks = processed_events.chunks(1000);
65        let tasks = chunks.map(|chunk| async move {
66            let request = LogEventRequest {
67                event_count: chunk.len() as u64,
68                payload: LogEventPayload {
69                    events: json!(chunk),
70                    statsig_metadata: StatsigMetadata::get_as_json(),
71                },
72                retries: 0,
73            };
74
75            let result = self.http_adapter.send_events_over_http(&request).await;
76            (request, result)
77        });
78
79        let results = futures::future::join_all(tasks).await;
80        let mut failed_requests = Vec::new();
81
82        for (request, result) in results {
83            match result {
84                Ok(_) => (),
85                Err(_) => failed_requests.push(request),
86            }
87        }
88
89        if !failed_requests.is_empty() {
90            self.log_events(failed_requests.remove(0)).await?;
91        }
92
93        log_d!(TAG, "All events sent");
94        Ok(())
95    }
96}
97
98fn read_and_clear_file(file_path: &str) -> Result<Option<String>, StatsigErr> {
99    log_d!(TAG, "Retrieving pending events from {}", file_path);
100
101    let path = std::path::Path::new(file_path);
102    if !path.exists() {
103        return Ok(None);
104    }
105
106    let mut file = std::fs::OpenOptions::new()
107        .read(true)
108        .write(true)
109        .open(file_path)
110        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
111
112    let mut lock = file_guard::lock(&mut file, Lock::Exclusive, 0, 1)
113        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
114
115    let mut file_contents = String::new();
116    (*lock)
117        .read_to_string(&mut file_contents)
118        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
119
120    // Truncate the file to clear its contents
121    (*lock)
122        .set_len(0)
123        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
124
125    Ok(Some(file_contents))
126}
127
128fn process_events(current_requests: &str) -> Vec<StatsigEventInternal> {
129    let mut seen_exposures = HashSet::new();
130    let mut processed_events = vec![];
131
132    for line in current_requests.lines() {
133        let events: Vec<StatsigEventInternal> = match serde_json::from_str(line) {
134            Ok(events) => events,
135            Err(e) => {
136                log_e!(TAG, "Failed to parse events in file: {}", e.to_string());
137                continue;
138            }
139        };
140
141        for event in events {
142            if event.is_diagnostic_event() && !should_sample_sdk_diagnostics() {
143                continue;
144            }
145
146            if !event.is_exposure_event() {
147                processed_events.push(event);
148                continue;
149            }
150
151            let key = create_merge_key(&event);
152            if seen_exposures.contains(&key) {
153                continue;
154            }
155
156            seen_exposures.insert(key);
157            processed_events.push(event);
158        }
159    }
160
161    processed_events
162}
163
164fn create_merge_key(event: &StatsigEventInternal) -> String {
165    let mut metadata_parts = Vec::new();
166    if let Some(metadata) = &event.event_data.metadata {
167        if let Some(name) = metadata.get("gate") {
168            metadata_parts.push(format!("g.{name}|"));
169        }
170
171        if let Some(name) = metadata.get("config") {
172            metadata_parts.push(format!("c.{name}|"));
173        }
174
175        if let Some(name) = metadata.get("parameterName") {
176            metadata_parts.push(format!("pn.{name}|"));
177        }
178
179        if let Some(name) = metadata.get("allocatedExperiment") {
180            metadata_parts.push(format!("ae.{name}|"));
181        }
182
183        if let Some(rule_id) = metadata.get("ruleID") {
184            metadata_parts.push(format!("r.{rule_id}|"));
185        }
186    }
187
188    format!(
189        "{}|{}|{}",
190        event.event_data.event_name,
191        event.user.data.create_user_values_hash(),
192        metadata_parts.concat()
193    )
194}
195
196// PHP initializes per request, so we get a diagnostics event per request.
197// This samples quite aggressively to compensate for that
198fn should_sample_sdk_diagnostics() -> bool {
199    let random_number = rand::thread_rng().gen_range(0..10000);
200    random_number < 1
201}
202
203#[async_trait]
204impl EventLoggingAdapter for StatsigLocalFileEventLoggingAdapter {
205    async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
206        Ok(())
207    }
208
209    async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
210        let json = request.payload.events.to_string();
211
212        let mut file = std::fs::OpenOptions::new()
213            .append(true)
214            .create(true)
215            .open(&self.file_path)
216            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
217
218        let mut lock = file_guard::lock(&mut file, Lock::Exclusive, 0, 1)
219            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
220
221        (*lock)
222            .write_all(format!("{json}\n").as_bytes())
223            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
224
225        Ok(true)
226    }
227
228    async fn shutdown(&self) -> Result<(), StatsigErr> {
229        Ok(())
230    }
231
232    fn should_schedule_background_flush(&self) -> bool {
233        false
234    }
235}