// statsig_rust/event_logging_adapter/statsig_http_event_logging_adapter.rs
use crate::compression::compression_helper::{compress_data, get_compression_format};
use crate::event_logging_adapter::EventLoggingAdapter;
use crate::log_event_payload::LogEventRequest;
use crate::networking::{NetworkClient, NetworkError, RequestArgs};
use crate::statsig_metadata::StatsigMetadata;
use crate::{log_d, StatsigErr, StatsigOptions, StatsigRuntime};
use async_trait::async_trait;
use serde::Deserialize;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;

/// URL that event batches are posted to when the caller's options do not
/// provide a `log_event_url` override.
const DEFAULT_LOG_EVENT_URL: &str = "https://prodregistryv2.org/v1/log_event";

15#[derive(Deserialize)]
16struct LogEventResult {
17 success: Option<bool>,
18}
19
/// Tag handed to the logging macros; expands to the adapter's type name.
const TAG: &str = stringify!(StatsigHttpEventLoggingAdapter);

22pub struct StatsigHttpEventLoggingAdapter {
23 log_event_url: String,
24 network: NetworkClient,
25}
26
27impl StatsigHttpEventLoggingAdapter {
28 #[must_use]
29 pub fn new(sdk_key: &str, options: Option<&StatsigOptions>) -> Self {
30 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
31
32 let log_event_url = options
33 .and_then(|opts| opts.log_event_url.as_ref())
34 .map(|u| u.to_string())
35 .unwrap_or_else(|| DEFAULT_LOG_EVENT_URL.to_string());
36
37 Self {
38 log_event_url,
39 network: NetworkClient::new(sdk_key, Some(headers), options),
40 }
41 }
42
43 pub async fn send_events_over_http(&self, request: &LogEventRequest) -> Result<(), StatsigErr> {
44 log_d!(
45 TAG,
46 "Logging Events ({}): {}",
47 &request.event_count,
48 json!(&request.payload).to_string()
49 );
50
51 let compression_format = get_compression_format();
52
53 let headers = HashMap::from([
55 (
56 "statsig-event-count".to_string(),
57 request.event_count.to_string(),
58 ),
59 (
60 "statsig-retry-count".to_string(),
61 request.retries.to_string(),
62 ),
63 ("Content-Encoding".to_owned(), compression_format.clone()),
64 ("Content-Type".to_owned(), "application/json".to_owned()),
65 ]);
66
67 let bytes = serde_json::to_vec(&request.payload)
69 .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
70 let compressed = match compress_data(&bytes) {
71 Ok(c) => c,
72 Err(e) => return Err(e),
73 };
74
75 let response = self
77 .network
78 .post(
79 RequestArgs {
80 url: self.log_event_url.clone(),
81 headers: Some(headers),
82 accept_gzip_response: true,
83 ..RequestArgs::new()
84 },
85 Some(compressed),
86 )
87 .await
88 .map_err(StatsigErr::NetworkError)?;
89
90 let mut res_data = match response.data {
91 Some(data) => data,
92 None => {
93 return Err(StatsigErr::NetworkError(NetworkError::RequestFailed(
94 self.log_event_url.clone(),
95 response.status_code,
96 "Empty response from network".to_string(),
97 )));
98 }
99 };
100
101 let result = res_data
102 .deserialize_into::<LogEventResult>()
103 .map(|result| result.success != Some(false))
104 .map_err(|e| {
105 StatsigErr::JsonParseError(stringify!(LogEventResult).to_string(), e.to_string())
106 })?;
107
108 if result {
109 Ok(())
110 } else {
111 Err(StatsigErr::LogEventError(
112 "Unsuccessful response from network".into(),
113 ))
114 }
115 }
116}
117
118#[async_trait]
119impl EventLoggingAdapter for StatsigHttpEventLoggingAdapter {
120 async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
121 Ok(())
122 }
123
124 async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
125 match self.send_events_over_http(&request).await {
126 Ok(()) => Ok(true),
127 Err(e) => Err(e),
128 }
129 }
130
131 async fn shutdown(&self) -> Result<(), StatsigErr> {
132 Ok(())
133 }
134
135 fn should_schedule_background_flush(&self) -> bool {
136 true
137 }
138}
139
140#[cfg(not(feature = "with_zstd"))]
141#[tokio::test]
142async fn test_event_logging() {
143 use crate::log_event_payload::{LogEventPayload, LogEventRequest};
144 use std::env;
145
146 let sdk_key = env::var("test_api_key").expect("test_api_key environment variable not set");
147
148 let adapter = StatsigHttpEventLoggingAdapter::new(&sdk_key, None);
149
150 let payload_str = r#"{"events":[{"eventName":"statsig::config_exposure","metadata":{"config":"running_exp_in_unlayered_with_holdout","ruleID":"5suobe8yyvznqasn9Ph1dI"},"secondaryExposures":[{"gate":"global_holdout","gateValue":"false","ruleID":"3QoA4ncNdVGBaMt3N1KYjz:0.50:1"},{"gate":"exp_holdout","gateValue":"false","ruleID":"1rEqLOpCROaRafv7ubGgax"}],"time":1722386636538,"user":{"appVersion":null,"country":null,"custom":null,"customIDs":null,"email":"daniel@statsig.com","ip":null,"locale":null,"privateAttributes":null,"statsigEnvironment":null,"userAgent":null,"userID":"a-user"},"value":null}],"statsigMetadata":{"sdk_type":"statsig-server-core","sdk_version":"0.0.1"}}"#;
151 let payload = serde_json::from_str::<LogEventPayload>(payload_str).unwrap();
152
153 let request = LogEventRequest {
154 payload,
155 event_count: 1,
156 retries: 0,
157 };
158
159 let result = adapter.log_events(request).await;
160
161 assert!(result.is_ok(), "Error logging events: {:?}", result.err());
162}