statsig_rust/event_logging_adapter/statsig_local_file_event_logging_adapter.rs

1use std::collections::HashSet;
2use std::io::{Read, Write};
3use std::sync::Arc;
4
5use super::log_event_payload::LogEventRequest;
6use crate::event_logging::statsig_event_internal::StatsigEventInternal;
7use crate::event_logging_adapter::EventLoggingAdapter;
8use crate::hashing::djb2;
9use crate::log_event_payload::LogEventPayload;
10use crate::statsig_metadata::StatsigMetadata;
11use crate::{log_d, log_e, StatsigErr, StatsigHttpEventLoggingAdapter, StatsigRuntime};
12use async_trait::async_trait;
13use file_guard::Lock;
14use rand::Rng;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17
/// Log tag attached to every log line emitted from this module.
const TAG: &str = "StatsigLocalFileEventLoggingAdapter";
19#[derive(Serialize, Deserialize)]
20pub struct PendingLogRequests {
21    requests: Vec<LogEventRequest>,
22}
23
24pub struct StatsigLocalFileEventLoggingAdapter {
25    file_path: String,
26    http_adapter: StatsigHttpEventLoggingAdapter,
27}
28
29impl StatsigLocalFileEventLoggingAdapter {
30    #[must_use]
31    pub fn new(
32        sdk_key: &str,
33        output_directory: &str,
34        log_event_url: Option<String>,
35        disable_network: bool,
36    ) -> Self {
37        let hashed_key = djb2(sdk_key);
38        let file_path = format!("{output_directory}/{hashed_key}_events.json");
39
40        Self {
41            file_path,
42            http_adapter: StatsigHttpEventLoggingAdapter::new(
43                sdk_key,
44                log_event_url.as_ref(),
45                Some(disable_network),
46            ),
47        }
48    }
49
50    pub async fn send_pending_events(&self) -> Result<(), StatsigErr> {
51        log_d!(TAG, "Sending pending events...");
52        let current_requests = if let Some(requests) = read_and_clear_file(&self.file_path)? {
53            requests
54        } else {
55            log_d!(TAG, "No events found");
56            return Ok(());
57        };
58
59        let processed_events = process_events(&current_requests);
60
61        let chunks = processed_events.chunks(1000);
62        let tasks = chunks.map(|chunk| async move {
63            let request = LogEventRequest {
64                event_count: chunk.len() as u64,
65                payload: LogEventPayload {
66                    events: json!(chunk),
67                    statsig_metadata: StatsigMetadata::get_as_json(),
68                },
69            };
70
71            let result = self.http_adapter.send_events_over_http(&request).await;
72            (request, result)
73        });
74
75        let results = futures::future::join_all(tasks).await;
76        let mut failed_requests = Vec::new();
77
78        for (request, result) in results {
79            match result {
80                Ok(true) => (),
81                _ => failed_requests.push(request),
82            }
83        }
84
85        if !failed_requests.is_empty() {
86            self.log_events(failed_requests.remove(0)).await?;
87        }
88
89        log_d!(TAG, "All events sent");
90        Ok(())
91    }
92}
93
94fn read_and_clear_file(file_path: &str) -> Result<Option<String>, StatsigErr> {
95    log_d!(TAG, "Retrieving pending events from {}", file_path);
96
97    let path = std::path::Path::new(file_path);
98    if !path.exists() {
99        return Ok(None);
100    }
101
102    let mut file = std::fs::OpenOptions::new()
103        .read(true)
104        .write(true)
105        .open(file_path)
106        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
107
108    let mut lock = file_guard::lock(&mut file, Lock::Exclusive, 0, 1)
109        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
110
111    let mut file_contents = String::new();
112    (*lock)
113        .read_to_string(&mut file_contents)
114        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
115
116    // Truncate the file to clear its contents
117    (*lock)
118        .set_len(0)
119        .map_err(|e| StatsigErr::FileError(e.to_string()))?;
120
121    Ok(Some(file_contents))
122}
123
124fn process_events(current_requests: &str) -> Vec<StatsigEventInternal> {
125    let mut seen_exposures = HashSet::new();
126    let mut processed_events = vec![];
127
128    for line in current_requests.lines() {
129        let events: Vec<StatsigEventInternal> = match serde_json::from_str(line) {
130            Ok(events) => events,
131            Err(e) => {
132                log_e!(TAG, "Failed to parse events in file: {}", e.to_string());
133                continue;
134            }
135        };
136
137        for event in events {
138            if event.is_diagnostic_event() && !should_sample_sdk_diagnostics() {
139                continue;
140            }
141
142            if !event.is_exposure_event() {
143                processed_events.push(event);
144                continue;
145            }
146
147            let key = create_merge_key(&event);
148            if seen_exposures.contains(&key) {
149                continue;
150            }
151
152            seen_exposures.insert(key);
153            processed_events.push(event);
154        }
155    }
156
157    processed_events
158}
159
160fn create_merge_key(event: &StatsigEventInternal) -> String {
161    let mut metadata_parts = Vec::new();
162    if let Some(metadata) = &event.event_data.metadata {
163        if let Some(name) = metadata.get("gate") {
164            metadata_parts.push(format!("g.{name}|"));
165        }
166
167        if let Some(name) = metadata.get("config") {
168            metadata_parts.push(format!("c.{name}|"));
169        }
170
171        if let Some(name) = metadata.get("parameterName") {
172            metadata_parts.push(format!("pn.{name}|"));
173        }
174
175        if let Some(name) = metadata.get("allocatedExperiment") {
176            metadata_parts.push(format!("ae.{name}|"));
177        }
178
179        if let Some(rule_id) = metadata.get("ruleID") {
180            metadata_parts.push(format!("r.{rule_id}|"));
181        }
182    }
183
184    format!(
185        "{}|{}|{}",
186        event.event_data.event_name,
187        event.user.value,
188        metadata_parts.concat()
189    )
190}
191
192// PHP initializes per request, so we get a diagnostics event per request.
193// This samples quite aggressively to compensate for that
194fn should_sample_sdk_diagnostics() -> bool {
195    let random_number = rand::thread_rng().gen_range(0..10000);
196    random_number < 1
197}
198
199#[async_trait]
200impl EventLoggingAdapter for StatsigLocalFileEventLoggingAdapter {
201    async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
202        Ok(())
203    }
204
205    async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
206        let json = request.payload.events.to_string();
207
208        let mut file = std::fs::OpenOptions::new()
209            .append(true)
210            .create(true)
211            .open(&self.file_path)
212            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
213
214        let mut lock = file_guard::lock(&mut file, Lock::Exclusive, 0, 1)
215            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
216
217        (*lock)
218            .write_all(format!("{json}\n").as_bytes())
219            .map_err(|e| StatsigErr::FileError(e.to_string()))?;
220
221        Ok(true)
222    }
223
224    async fn shutdown(&self) -> Result<(), StatsigErr> {
225        Ok(())
226    }
227
228    fn should_schedule_background_flush(&self) -> bool {
229        false
230    }
231}