// statsig_rust/specs_adapter/statsig_http_specs_adapter.rs
use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::{
11 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
12};
13use async_trait::async_trait;
14use chrono::Utc;
15use percent_encoding::percent_encode;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock, Weak};
18use std::time::Duration;
19use tokio::sync::Notify;
20use tokio::time::sleep;
21
22use super::{ConfigCompressionMode, SpecsInfo};
23
/// Payload of a successful specs download together with the API it came from.
///
/// Built by `fetch_specs_from_network` and `handle_fallback_request`, and
/// consumed by `process_spec_data`.
#[derive(Debug)]
pub struct NetworkResponse {
    /// Response body bytes taken from the network client's response.
    pub data: Vec<u8>,
    /// API label computed from the request URL with `get_api_from_url`.
    pub api: String,
}
28
/// Base URL used for spec downloads when neither an override URL nor
/// `StatsigOptions::specs_url` is supplied; also the fallback target.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Background sync interval, in milliseconds, applied when
/// `StatsigOptions::specs_sync_interval_ms` is not set.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Dictionary id placed in the URL in `Dictionary` compression mode when the
/// current specs info carries no dictionary id.
pub const INIT_DICT_ID: &str = "null";

/// Tag attached to every log line emitted by this adapter.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
34pub struct StatsigHttpSpecsAdapter {
35 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
36 network: NetworkClient,
37 sdk_key: String,
38 specs_url: String,
39 fallback_url: Option<String>,
40 sync_interval_duration: Duration,
41 config_compression_mode: ConfigCompressionMode,
42 ops_stats: Arc<OpsStatsForInstance>,
43 shutdown_notify: Arc<Notify>,
44}
45
46impl StatsigHttpSpecsAdapter {
47 #[must_use]
48 pub fn new(
49 sdk_key: &str,
50 options: Option<&StatsigOptions>,
51 override_url: Option<String>,
52 ) -> Self {
53 let default_options = StatsigOptions::default();
54 let options_ref = options.unwrap_or(&default_options);
55
56 let specs_url = match override_url {
57 Some(url) => url,
58 None => options_ref
59 .specs_url
60 .as_ref()
61 .map(|u| u.to_string())
62 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
63 };
64
65 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
67 && specs_url != DEFAULT_SPECS_URL
68 {
69 Some(DEFAULT_SPECS_URL.to_string())
70 } else {
71 None
72 };
73
74 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
75
76 Self {
77 listener: RwLock::new(None),
78 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
79 sdk_key: sdk_key.to_string(),
80 specs_url,
81 fallback_url,
82 sync_interval_duration: Duration::from_millis(u64::from(
83 options_ref
84 .specs_sync_interval_ms
85 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
86 )),
87 ops_stats: OPS_STATS.get_for_instance(sdk_key),
88 shutdown_notify: Arc::new(Notify::new()),
89 config_compression_mode: options_ref
90 .config_compression_mode
91 .clone()
92 .unwrap_or(ConfigCompressionMode::Gzip),
93 }
94 }
95
96 pub async fn fetch_specs_from_network(
97 &self,
98 current_specs_info: SpecsInfo,
99 ) -> Result<NetworkResponse, NetworkError> {
100 let request_args = self.get_request_args(¤t_specs_info);
101 let url = request_args.url.clone();
102 match self.handle_specs_request(request_args).await {
103 Ok(response) => Ok(NetworkResponse {
104 data: response,
105 api: get_api_from_url(&url),
106 }),
107 Err(e) => Err(e),
108 }
109 }
110
111 fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
112 let mut params = HashMap::new();
113 if let Some(lcut) = current_specs_info.lcut {
114 params.insert("sinceTime".to_string(), lcut.to_string());
115 }
116 if let Some(cs) = ¤t_specs_info.checksum {
117 params.insert(
118 "checksum".to_string(),
119 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
120 );
121 }
122
123 RequestArgs {
124 url: construct_specs_url(
125 &self.config_compression_mode,
126 self.specs_url.as_str(),
127 self.sdk_key.as_str(),
128 current_specs_info.zstd_dict_id.as_deref(),
129 ),
130 query_params: Some(params),
131 accept_gzip_response: true,
132 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
133 ..RequestArgs::new()
134 }
135 }
136
137 async fn handle_fallback_request(
138 &self,
139 mut request_args: RequestArgs,
140 current_specs_info: SpecsInfo,
141 ) -> Result<NetworkResponse, NetworkError> {
142 let fallback_url = match &self.fallback_url {
143 Some(url) => construct_specs_url(
144 &self.config_compression_mode,
145 url.as_str(),
146 &self.sdk_key,
147 current_specs_info.zstd_dict_id.as_deref(),
148 ),
149 None => return Err(NetworkError::RequestFailed),
150 };
151
152 request_args.url = fallback_url.clone();
153
154 let response = self.handle_specs_request(request_args).await?;
157 Ok(NetworkResponse {
158 data: response,
159 api: get_api_from_url(&fallback_url),
160 })
161 }
162
163 async fn handle_specs_request(
165 &self,
166 request_args: RequestArgs,
167 ) -> Result<Vec<u8>, NetworkError> {
168 let response = self.network.get(request_args).await?;
169 match response.data {
170 Some(data) => Ok(data),
171 None => Err(NetworkError::RequestFailed),
172 }
173 }
174
175 pub async fn run_background_sync(weak_self: &Weak<Self>) {
176 let strong_self = if let Some(s) = weak_self.upgrade() {
177 s
178 } else {
179 log_e!(TAG, "No strong reference found");
180 return;
181 };
182
183 let specs_info = match strong_self.listener.read() {
184 Ok(lock) => match lock.as_ref() {
185 Some(listener) => listener.get_current_specs_info(),
186 None => SpecsInfo::empty(),
187 },
188 Err(_) => SpecsInfo::error(),
189 };
190
191 strong_self
192 .ops_stats
193 .set_diagnostics_context(ContextType::ConfigSync);
194 if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
195 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
196 return;
197 }
198 log_e!(TAG, "Background specs sync failed: {}", e);
199 }
200 strong_self.ops_stats.enqueue_diagnostics_event(
201 Some(KeyType::DownloadConfigSpecs),
202 Some(ContextType::ConfigSync),
203 );
204 }
205
206 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
207 if let Ok(lock) = self.listener.read() {
208 if lock.is_none() {
209 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
210 }
211 }
212
213 let response = self
214 .fetch_specs_from_network(current_specs_info.clone())
215 .await;
216 let result = self.process_spec_data(response).await;
217
218 if result.is_err() && self.fallback_url.is_some() {
219 log_d!(TAG, "Falling back to statsig api");
220 let response = self
221 .handle_fallback_request(
222 self.get_request_args(¤t_specs_info),
223 current_specs_info,
224 )
225 .await;
226 return self.process_spec_data(response).await;
227 }
228
229 result
230 }
231
232 async fn process_spec_data(
233 &self,
234 response: Result<NetworkResponse, NetworkError>,
235 ) -> Result<(), StatsigErr> {
236 let resp = response.map_err(|e| {
237 let msg = "No specs result from network";
238 StatsigErr::NetworkError(e, Some(msg.to_string()))
239 })?;
240
241 let update = SpecsUpdate {
242 data: resp.data,
243 source: SpecsSource::Network,
244 received_at: Utc::now().timestamp_millis() as u64,
245 source_api: Some(resp.api),
246 };
247
248 self.ops_stats.add_marker(
249 Marker::new(
250 KeyType::DownloadConfigSpecs,
251 ActionType::Start,
252 Some(StepType::Process),
253 ),
254 None,
255 );
256
257 let result = match self.listener.read() {
258 Ok(lock) => match lock.as_ref() {
259 Some(listener) => listener.did_receive_specs_update(update),
260 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
261 },
262 Err(e) => {
263 let err = StatsigErr::LockFailure(format!(
264 "Failed to acquire read lock on listener: {}",
265 e
266 ));
267 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
268 Err(err)
269 }
270 };
271
272 self.ops_stats.add_marker(
273 Marker::new(
274 KeyType::DownloadConfigSpecs,
275 ActionType::End,
276 Some(StepType::Process),
277 )
278 .with_is_success(result.is_ok()),
279 None,
280 );
281
282 result
283 }
284}
285
286#[async_trait]
287impl SpecsAdapter for StatsigHttpSpecsAdapter {
288 async fn start(
289 self: Arc<Self>,
290 _statsig_runtime: &Arc<StatsigRuntime>,
291 ) -> Result<(), StatsigErr> {
292 let specs_info = match self.listener.read() {
293 Ok(lock) => match lock.as_ref() {
294 Some(listener) => listener.get_current_specs_info(),
295 None => SpecsInfo::empty(),
296 },
297 Err(_) => SpecsInfo::error(),
298 };
299 self.manually_sync_specs(specs_info).await
300 }
301
302 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
303 match self.listener.write() {
304 Ok(mut lock) => *lock = Some(listener),
305 Err(e) => {
306 log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
307 }
308 }
309 }
310
311 async fn schedule_background_sync(
312 self: Arc<Self>,
313 statsig_runtime: &Arc<StatsigRuntime>,
314 ) -> Result<(), StatsigErr> {
315 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
316 let interval_duration = self.sync_interval_duration;
317 let shutdown_notify = self.shutdown_notify.clone();
318
319 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
320 loop {
321 tokio::select! {
322 () = sleep(interval_duration) => {
323 Self::run_background_sync(&weak_self).await;
324 }
325 () = rt_shutdown_notify.notified() => {
326 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
327 break;
328 },
329 () = shutdown_notify.notified() => {
330 log_d!(TAG, "Shutting down specs background sync");
331 break;
332 }
333 }
334 }
335 });
336
337 Ok(())
338 }
339
340 async fn shutdown(
341 &self,
342 _timeout: Duration,
343 _statsig_runtime: &Arc<StatsigRuntime>,
344 ) -> Result<(), StatsigErr> {
345 self.shutdown_notify.notify_one();
346 Ok(())
347 }
348
349 fn get_type_name(&self) -> String {
350 stringify!(StatsigHttpSpecsAdapter).to_string()
351 }
352}
353
354fn construct_specs_url(
355 compression_mode: &ConfigCompressionMode,
356 spec_url: &str,
357 sdk_key: &str,
358 dict_id: Option<&str>,
359) -> String {
360 match compression_mode {
361 ConfigCompressionMode::Gzip => format!("{spec_url}/{sdk_key}.json"),
362 ConfigCompressionMode::Dictionary => {
363 let dict_id = dict_id.unwrap_or(INIT_DICT_ID);
364 format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
365 }
366 }
367}