// statsig_rust/event_logging_adapter/statsig_http_event_logging_adapter.rs
use crate::compression::compression_helper::{compress_data, get_compression_format};
2use crate::event_logging_adapter::EventLoggingAdapter;
3use crate::log_event_payload::LogEventRequest;
4use crate::networking::{NetworkClient, NetworkError, RequestArgs};
5use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
6use crate::observability::ErrorBoundaryEvent;
7use crate::statsig_metadata::StatsigMetadata;
8use crate::{log_d, StatsigErr, StatsigRuntime};
9use async_trait::async_trait;
10use serde::Deserialize;
11use serde_json::json;
12use std::collections::HashMap;
13use std::sync::Arc;
14
/// Endpoint events are posted to when the caller does not supply its own
/// `log_event_url` to `StatsigHttpEventLoggingAdapter::new`.
const DEFAULT_LOG_EVENT_URL: &str = "https://prodregistryv2.org/v1/log_event";
16
17#[derive(Deserialize)]
18struct LogEventResult {
19 success: Option<bool>,
20}
21
/// Tag passed to the logging macros so messages can be attributed to this adapter.
const TAG: &str = stringify!(StatsigHttpEventLoggingAdapter);
23
24pub struct StatsigHttpEventLoggingAdapter {
25 log_event_url: String,
26 network: NetworkClient,
27 ops_stats: Arc<OpsStatsForInstance>,
28}
29
30impl StatsigHttpEventLoggingAdapter {
31 #[must_use]
32 pub fn new(
33 sdk_key: &str,
34 log_event_url: Option<&String>,
35 disable_network: Option<bool>,
36 ) -> Self {
37 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
38
39 let log_event_url = match log_event_url {
40 Some(u) => u,
41 _ => DEFAULT_LOG_EVENT_URL,
42 }
43 .to_string();
44 Self {
45 log_event_url,
46 network: NetworkClient::new(sdk_key, Some(headers), disable_network),
47 ops_stats: OPS_STATS.get_for_instance(sdk_key),
48 }
49 }
50
51 pub async fn send_events_over_http(
52 &self,
53 request: &LogEventRequest,
54 ) -> Result<bool, StatsigErr> {
55 log_d!(
56 TAG,
57 "Logging Events ({}): {}",
58 &request.event_count,
59 json!(&request.payload).to_string()
60 );
61
62 let compression_format = get_compression_format();
63
64 let headers = HashMap::from([
66 (
67 "statsig-event-count".to_string(),
68 request.event_count.to_string(),
69 ),
70 ("Content-Encoding".to_owned(), compression_format.clone()),
71 ("Content-Type".to_owned(), "application/json".to_owned()),
72 ]);
73
74 let bytes = serde_json::to_vec(&request.payload)
76 .map_err(|e| StatsigErr::SerializationError(e.to_string()))?;
77 let compressed = match compress_data(&bytes) {
78 Ok(c) => c,
79 Err(e) => return Err(e),
80 };
81
82 let response_str = self
84 .network
85 .post(
86 RequestArgs {
87 url: self.log_event_url.clone(),
88 retries: 3,
89 headers: Some(headers),
90 accept_gzip_response: true,
91 ..RequestArgs::default()
92 },
93 Some(compressed),
94 )
95 .await
96 .map_err(|err| StatsigErr::NetworkError(err, Some("Log event failure".into())))?;
97
98 serde_json::from_str::<LogEventResult>(&response_str)
99 .map(|result| result.success != Some(false))
100 .map_err(|e| {
101 StatsigErr::JsonParseError(stringify!(LogEventResult).to_string(), e.to_string())
102 })
103 }
104}
105
106#[async_trait]
107impl EventLoggingAdapter for StatsigHttpEventLoggingAdapter {
108 async fn start(&self, _statsig_runtime: &Arc<StatsigRuntime>) -> Result<(), StatsigErr> {
109 Ok(())
110 }
111
112 #[allow(clippy::manual_inspect)]
113 async fn log_events(&self, request: LogEventRequest) -> Result<bool, StatsigErr> {
114 match self.send_events_over_http(&request).await {
115 Ok(_) => Ok(true),
116 Err(StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _)) => Ok(false),
117 Err(e) => {
118 self.ops_stats.log_error(ErrorBoundaryEvent {
119 exception: "LogEventFailed".to_string(),
120 tag: "statsig::log_event_failed".to_string(),
121 extra: Some(HashMap::from([(
122 "eventCount".to_string(),
123 request.event_count.to_string(),
124 )])),
125 });
126 Err(e)
127 }
128 }
129 }
130
131 async fn shutdown(&self) -> Result<(), StatsigErr> {
132 Ok(())
133 }
134
135 fn should_schedule_background_flush(&self) -> bool {
136 true
137 }
138}
139
140#[cfg(not(feature = "with_zstd"))]
141#[tokio::test]
142async fn test_event_logging() {
143 use crate::log_event_payload::{LogEventPayload, LogEventRequest};
144 use std::env;
145
146 let sdk_key = env::var("test_api_key").expect("test_api_key environment variable not set");
147
148 let adapter = StatsigHttpEventLoggingAdapter::new(&sdk_key, None, None);
149
150 let payload_str = r#"{"events":[{"eventName":"statsig::config_exposure","metadata":{"config":"running_exp_in_unlayered_with_holdout","ruleID":"5suobe8yyvznqasn9Ph1dI"},"secondaryExposures":[{"gate":"global_holdout","gateValue":"false","ruleID":"3QoA4ncNdVGBaMt3N1KYjz:0.50:1"},{"gate":"exp_holdout","gateValue":"false","ruleID":"1rEqLOpCROaRafv7ubGgax"}],"time":1722386636538,"user":{"appVersion":null,"country":null,"custom":null,"customIDs":null,"email":"daniel@statsig.com","ip":null,"locale":null,"privateAttributes":null,"statsigEnvironment":null,"userAgent":null,"userID":"a-user"},"value":null}],"statsigMetadata":{"sdk_type":"statsig-server-core","sdk_version":"0.0.1"}}"#;
151 let payload = serde_json::from_str::<LogEventPayload>(payload_str).unwrap();
152
153 let request = LogEventRequest {
154 payload,
155 event_count: 1,
156 };
157
158 let result = adapter.log_events(request).await;
159
160 assert!(result.is_ok(), "Error logging events: {:?}", result.err());
161}