statsig_rust/event_logging_adapter/
statsig_http_event_logging_adapter.rs

1use crate::compression::compression_helper::{compress_data, get_compression_format};
2use crate::event_logging_adapter::EventLoggingAdapter;
3use crate::log_event_payload::LogEventRequest;
4use crate::networking::{NetworkClient, RequestArgs};
5use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
6use crate::observability::ErrorBoundaryEvent;
7use crate::statsig_metadata::StatsigMetadata;
8use crate::{log_d, StatsigErr, StatsigRuntime};
9use async_trait::async_trait;
10use serde::Deserialize;
11use serde_json::json;
12use std::collections::HashMap;
13use std::sync::Arc;
14
15const DEFAULT_LOG_EVENT_URL: &str = "https://prodregistryv2.org/v1/log_event";
16
17#[derive(Deserialize)]
18struct LogEventResult {
19    success: Option<bool>,
20}
21
22const TAG: &str = stringify!(StatsigHttpEventLoggingAdapter);
23
24pub struct StatsigHttpEventLoggingAdapter {
25    log_event_url: String,
26    network: NetworkClient,
27    ops_stats: Arc<OpsStatsForInstance>,
28}
29
30impl StatsigHttpEventLoggingAdapter {
31    #[must_use]
32    pub fn new(sdk_key: &str, log_event_url: Option<&String>) -> Self {
33        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
34
35        let log_event_url = match log_event_url {
36            Some(u) => u,
37            _ => DEFAULT_LOG_EVENT_URL,
38        }
39        .to_string();
40        Self {
41            log_event_url,
42            network: NetworkClient::new(sdk_key, Some(headers)),
43            ops_stats: OPS_STATS.get_for_instance(sdk_key),
44        }
45    }
46
47    pub async fn send_events_over_http(
48        &self,
49        request: &LogEventRequest,
50    ) -> Result<bool, StatsigErr> {
51        log_d!(
52            TAG,
53            "Logging Events ({}): {}",
54            &request.event_count,
55            json!(&request.payload).to_string()
56        );
57
58        let compression_format = get_compression_format();
59
60        // Set headers
61        let headers = HashMap::from([
62            (
63                "statsig-event-count".to_string(),
64                request.event_count.to_string(),
65            ),
66            ("Content-Encoding".to_owned(), compression_format.clone()),
67            ("Content-Type".to_owned(), "application/json".to_owned()),
68        ]);
69
70        // Compress data before sending it
71        let bytes = serde_json::to_vec(&request.payload)
72            .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
73        let compressed = match compress_data(&bytes) {
74            Ok(c) => c,
75            Err(e) => return Err(e),
76        };
77
78        // Make request
79        let response_str = self
80            .network
81            .post(
82                RequestArgs {
83                    url: self.log_event_url.clone(),
84                    retries: 3,
85                    headers: Some(headers),
86                    accept_gzip_response: true,
87                    ..RequestArgs::new()
88                },
89                Some(compressed.into()),
90            )
91            .await
92            .map_err(|_err| StatsigErr::NetworkError("Log event failure".into()))?;
93
94        serde_json::from_str::<LogEventResult>(&response_str)
95            .map(|result| result.success != Some(false))
96            .map_err(|e| {
97                StatsigErr::JsonParseError(stringify!(LogEventResult).to_string(), e.to_string())
98            })
99    }
100}
101
102#[async_trait]
103impl EventLoggingAdapter for StatsigHttpEventLoggingAdapter {
104    async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
105        Ok(())
106    }
107
108    #[allow(clippy::manual_inspect)]
109    async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
110        self.send_events_over_http(&request).await.map_err(|e| {
111            self.ops_stats.log_error(ErrorBoundaryEvent {
112                exception: "LogEventFailed".to_string(),
113                tag: "statsig::log_event_failed".to_string(),
114                extra: Some(HashMap::from([(
115                    "eventCount".to_string(),
116                    request.event_count.to_string(),
117                )])),
118            });
119            e
120        })
121    }
122
123    async fn shutdown(&self) -> Result<(), StatsigErr> {
124        Ok(())
125    }
126
127    fn should_schedule_background_flush(&self) -> bool {
128        true
129    }
130}
131
132#[cfg(not(feature = "with_zstd"))]
133#[tokio::test]
134async fn test_event_logging() {
135    use crate::log_event_payload::{LogEventPayload, LogEventRequest};
136    use std::env;
137
138    let sdk_key = env::var("test_api_key").expect("test_api_key environment variable not set");
139
140    let adapter = StatsigHttpEventLoggingAdapter::new(&sdk_key, None);
141
142    let payload_str = r#"{"events":[{"eventName":"statsig::config_exposure","metadata":{"config":"running_exp_in_unlayered_with_holdout","ruleID":"5suobe8yyvznqasn9Ph1dI"},"secondaryExposures":[{"gate":"global_holdout","gateValue":"false","ruleID":"3QoA4ncNdVGBaMt3N1KYjz:0.50:1"},{"gate":"exp_holdout","gateValue":"false","ruleID":"1rEqLOpCROaRafv7ubGgax"}],"time":1722386636538,"user":{"appVersion":null,"country":null,"custom":null,"customIDs":null,"email":"daniel@statsig.com","ip":null,"locale":null,"privateAttributes":null,"statsigEnvironment":null,"userAgent":null,"userID":"a-user"},"value":null}],"statsigMetadata":{"sdk_type":"statsig-server-core","sdk_version":"0.0.1"}}"#;
143    let payload = serde_json::from_str::<LogEventPayload>(payload_str).unwrap();
144
145    let request = LogEventRequest {
146        payload,
147        event_count: 1,
148    };
149
150    let result = adapter.log_events(request).await;
151
152    assert!(result.is_ok(), "Error logging events: {:?}", result.err());
153}