statsig_rust/event_logging_adapter/
statsig_http_event_logging_adapter.rs

1use crate::compression::compression_helper::{compress_data, get_compression_format};
2use crate::event_logging_adapter::EventLoggingAdapter;
3use crate::log_event_payload::LogEventRequest;
4use crate::networking::{NetworkClient, NetworkError, RequestArgs};
5use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
6use crate::observability::ErrorBoundaryEvent;
7use crate::statsig_metadata::StatsigMetadata;
8use crate::{log_d, StatsigErr, StatsigRuntime};
9use async_trait::async_trait;
10use serde::Deserialize;
11use serde_json::json;
12use std::collections::HashMap;
13use std::sync::Arc;
14
/// Endpoint events are posted to when `StatsigHttpEventLoggingAdapter::new`
/// is not given a `log_event_url` override.
const DEFAULT_LOG_EVENT_URL: &str = "https://prodregistryv2.org/v1/log_event";
16
17#[derive(Deserialize)]
18struct LogEventResult {
19    success: Option<bool>,
20}
21
/// Tag handed to `log_d!` so debug output from this adapter is attributable.
const TAG: &str = stringify!(StatsigHttpEventLoggingAdapter);
23
24pub struct StatsigHttpEventLoggingAdapter {
25    log_event_url: String,
26    network: NetworkClient,
27    ops_stats: Arc<OpsStatsForInstance>,
28}
29
30impl StatsigHttpEventLoggingAdapter {
31    #[must_use]
32    pub fn new(
33        sdk_key: &str,
34        log_event_url: Option<&String>,
35        disable_network: Option<bool>,
36    ) -> Self {
37        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
38
39        let log_event_url = match log_event_url {
40            Some(u) => u,
41            _ => DEFAULT_LOG_EVENT_URL,
42        }
43        .to_string();
44        Self {
45            log_event_url,
46            network: NetworkClient::new(sdk_key, Some(headers), disable_network),
47            ops_stats: OPS_STATS.get_for_instance(sdk_key),
48        }
49    }
50
51    pub async fn send_events_over_http(
52        &self,
53        request: &LogEventRequest,
54    ) -> Result<bool, StatsigErr> {
55        log_d!(
56            TAG,
57            "Logging Events ({}): {}",
58            &request.event_count,
59            json!(&request.payload).to_string()
60        );
61
62        let compression_format = get_compression_format();
63
64        // Set headers
65        let headers = HashMap::from([
66            (
67                "statsig-event-count".to_string(),
68                request.event_count.to_string(),
69            ),
70            ("Content-Encoding".to_owned(), compression_format.clone()),
71            ("Content-Type".to_owned(), "application/json".to_owned()),
72        ]);
73
74        // Compress data before sending it
75        let bytes = serde_json::to_vec(&request.payload)
76            .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
77        let compressed = match compress_data(&bytes) {
78            Ok(c) => c,
79            Err(e) => return Err(e),
80        };
81
82        // Make request
83        let response_str = self
84            .network
85            .post(
86                RequestArgs {
87                    url: self.log_event_url.clone(),
88                    retries: 3,
89                    headers: Some(headers),
90                    accept_gzip_response: true,
91                    ..RequestArgs::default()
92                },
93                Some(compressed),
94            )
95            .await
96            .map_err(|err| StatsigErr::NetworkError(err, Some("Log event failure".into())))?;
97
98        serde_json::from_str::<LogEventResult>(&response_str)
99            .map(|result| result.success != Some(false))
100            .map_err(|e| {
101                StatsigErr::JsonParseError(stringify!(LogEventResult).to_string(), e.to_string())
102            })
103    }
104}
105
106#[async_trait]
107impl EventLoggingAdapter for StatsigHttpEventLoggingAdapter {
108    async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
109        Ok(())
110    }
111
112    #[allow(clippy::manual_inspect)]
113    async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
114        match self.send_events_over_http(&request).await {
115            Ok(_) => Ok(true),
116            Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _)) => Ok(false),
117            Err(e) => {
118                self.ops_stats.log_error(ErrorBoundaryEvent {
119                    exception: "LogEventFailed".to_string(),
120                    tag: "statsig::log_event_failed".to_string(),
121                    extra: Some(HashMap::from([(
122                        "eventCount".to_string(),
123                        request.event_count.to_string(),
124                    )])),
125                });
126                Err(e)
127            }
128        }
129    }
130
131    async fn shutdown(&self) -> Result<(), StatsigErr> {
132        Ok(())
133    }
134
135    fn should_schedule_background_flush(&self) -> bool {
136        true
137    }
138}
139
140#[cfg(not(feature = "with_zstd"))]
141#[tokio::test]
142async fn test_event_logging() {
143    use crate::log_event_payload::{LogEventPayload, LogEventRequest};
144    use std::env;
145
146    let sdk_key = env::var("test_api_key").expect("test_api_key environment variable not set");
147
148    let adapter = StatsigHttpEventLoggingAdapter::new(&sdk_key, None, None);
149
150    let payload_str = r#"{"events":[{"eventName":"statsig::config_exposure","metadata":{"config":"running_exp_in_unlayered_with_holdout","ruleID":"5suobe8yyvznqasn9Ph1dI"},"secondaryExposures":[{"gate":"global_holdout","gateValue":"false","ruleID":"3QoA4ncNdVGBaMt3N1KYjz:0.50:1"},{"gate":"exp_holdout","gateValue":"false","ruleID":"1rEqLOpCROaRafv7ubGgax"}],"time":1722386636538,"user":{"appVersion":null,"country":null,"custom":null,"customIDs":null,"email":"daniel@statsig.com","ip":null,"locale":null,"privateAttributes":null,"statsigEnvironment":null,"userAgent":null,"userID":"a-user"},"value":null}],"statsigMetadata":{"sdk_type":"statsig-server-core","sdk_version":"0.0.1"}}"#;
151    let payload = serde_json::from_str::<LogEventPayload>(payload_str).unwrap();
152
153    let request = LogEventRequest {
154        payload,
155        event_count: 1,
156    };
157
158    let result = adapter.log_events(request).await;
159
160    assert!(result.is_ok(), "Error logging events: {:?}", result.err());
161}