// statsig_rust/event_logging_adapter/statsig_http_event_logging_adapter.rs

1use crate::compression::compression_helper::{compress_data, get_compression_format};
2use crate::event_logging_adapter::EventLoggingAdapter;
3use crate::log_event_payload::LogEventRequest;
4use crate::networking::{NetworkClient, NetworkError, RequestArgs};
5use crate::statsig_metadata::StatsigMetadata;
6use crate::{log_d, StatsigErr, StatsigOptions, StatsigRuntime};
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde_json::json;
10use std::collections::HashMap;
11use std::sync::Arc;
12
/// Endpoint that event batches are sent to when the caller's `StatsigOptions`
/// does not provide a `log_event_url` override.
const DEFAULT_LOG_EVENT_URL: &str = "https://prodregistryv2.org/v1/log_event";
14
15#[derive(Deserialize)]
16struct LogEventResult {
17    success: Option<bool>,
18}
19
/// Tag handed to the logging macros for every message emitted from this file.
const TAG: &str = stringify!(StatsigHttpEventLoggingAdapter);
21
22pub struct StatsigHttpEventLoggingAdapter {
23    log_event_url: String,
24    network: NetworkClient,
25}
26
27impl StatsigHttpEventLoggingAdapter {
28    #[must_use]
29    pub fn new(sdk_key: &str, options: Option<&StatsigOptions>) -> Self {
30        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
31
32        let log_event_url = options
33            .and_then(|opts| opts.log_event_url.as_ref())
34            .map(|u| u.to_string())
35            .unwrap_or_else(|| DEFAULT_LOG_EVENT_URL.to_string());
36
37        Self {
38            log_event_url,
39            network: NetworkClient::new(sdk_key, Some(headers), options),
40        }
41    }
42
43    pub async fn send_events_over_http(
44        &self,
45        request: &LogEventRequest,
46    ) -> Result<bool, StatsigErr> {
47        log_d!(
48            TAG,
49            "Logging Events ({}): {}",
50            &request.event_count,
51            json!(&request.payload).to_string()
52        );
53
54        let compression_format = get_compression_format();
55
56        // Set headers
57        let headers = HashMap::from([
58            (
59                "statsig-event-count".to_string(),
60                request.event_count.to_string(),
61            ),
62            (
63                "statsig-retry-count".to_string(),
64                request.retries.to_string(),
65            ),
66            ("Content-Encoding".to_owned(), compression_format.clone()),
67            ("Content-Type".to_owned(), "application/json".to_owned()),
68        ]);
69
70        // Compress data before sending it
71        let bytes = serde_json::to_vec(&request.payload)
72            .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
73        let compressed = match compress_data(&bytes) {
74            Ok(c) => c,
75            Err(e) => return Err(e),
76        };
77
78        // Make request
79        let response = self
80            .network
81            .post(
82                RequestArgs {
83                    url: self.log_event_url.clone(),
84                    headers: Some(headers),
85                    accept_gzip_response: true,
86                    ..RequestArgs::new()
87                },
88                Some(compressed),
89            )
90            .await
91            .map_err(|err| StatsigErr::NetworkError(err, Some("Log event failure".into())))?;
92        let response_slice = match response.data {
93            Some(data) => data,
94            None => {
95                return Err(StatsigErr::NetworkError(
96                    NetworkError::RequestFailed,
97                    Some("Empty response from network".to_string()),
98                ));
99            }
100        };
101
102        serde_json::from_slice::<LogEventResult>(&response_slice)
103            .map(|result| result.success != Some(false))
104            .map_err(|e| {
105                StatsigErr::JsonParseError(stringify!(LogEventResult).to_string(), e.to_string())
106            })
107    }
108}
109
110#[async_trait]
111impl EventLoggingAdapter for StatsigHttpEventLoggingAdapter {
112    async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
113        Ok(())
114    }
115
116    #[allow(clippy::manual_inspect)]
117    async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
118        match self.send_events_over_http(&request).await {
119            Ok(_) => Ok(true),
120            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _)) => Ok(false),
121            Err(e) => Err(e),
122        } //TODO: surface retryable code status for the logger to know if it should put back into the queue
123    }
124
125    async fn shutdown(&self) -> Result<(), StatsigErr> {
126        Ok(())
127    }
128
129    fn should_schedule_background_flush(&self) -> bool {
130        true
131    }
132}
133
134#[cfg(not(feature = "with_zstd"))]
135#[tokio::test]
136async fn test_event_logging() {
137    use crate::log_event_payload::{LogEventPayload, LogEventRequest};
138    use std::env;
139
140    let sdk_key = env::var("test_api_key").expect("test_api_key environment variable not set");
141
142    let adapter = StatsigHttpEventLoggingAdapter::new(&sdk_key, None);
143
144    let payload_str = r#"{"events":[{"eventName":"statsig::config_exposure","metadata":{"config":"running_exp_in_unlayered_with_holdout","ruleID":"5suobe8yyvznqasn9Ph1dI"},"secondaryExposures":[{"gate":"global_holdout","gateValue":"false","ruleID":"3QoA4ncNdVGBaMt3N1KYjz:0.50:1"},{"gate":"exp_holdout","gateValue":"false","ruleID":"1rEqLOpCROaRafv7ubGgax"}],"time":1722386636538,"user":{"appVersion":null,"country":null,"custom":null,"customIDs":null,"email":"daniel@statsig.com","ip":null,"locale":null,"privateAttributes":null,"statsigEnvironment":null,"userAgent":null,"userID":"a-user"},"value":null}],"statsigMetadata":{"sdk_type":"statsig-server-core","sdk_version":"0.0.1"}}"#;
145    let payload = serde_json::from_str::<LogEventPayload>(payload_str).unwrap();
146
147    let request = LogEventRequest {
148        payload,
149        event_count: 1,
150        retries: 0,
151    };
152
153    let result = adapter.log_events(request).await;
154
155    assert!(result.is_ok(), "Error logging events: {:?}", result.err());
156}