statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::{Arc, Weak};
20use std::time::Duration;
21use tokio::sync::Notify;
22use tokio::time::sleep;
23
24use super::SpecsInfo;
25
26pub struct NetworkResponse {
27 pub data: ResponseData,
28 pub api: String,
29}
30
/// Base URL used for spec downloads when neither an override URL nor
/// `StatsigOptions::specs_url` is supplied.
pub const DEFAULT_SPECS_URL: &'static str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Background sync period, in milliseconds, used when
/// `StatsigOptions::specs_sync_interval_ms` is not set.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

// NOTE(review): not referenced in this file; presumably consumed elsewhere — confirm
// before removing the lint allowance.
#[allow(unused)]
pub const INIT_DICT_ID: &'static str = "null";

/// Tag attached to every log line emitted by this adapter.
const TAG: &'static str = stringify!(StatsigHttpSpecsAdapter);
38pub struct StatsigHttpSpecsAdapter {
39 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
40 network: NetworkClient,
41 sdk_key: String,
42 specs_url: String,
43 fallback_url: Option<String>,
44 init_timeout_ms: u64,
45 sync_interval_duration: Duration,
46 ops_stats: Arc<OpsStatsForInstance>,
47 shutdown_notify: Arc<Notify>,
48}
49
50impl StatsigHttpSpecsAdapter {
51 #[must_use]
52 pub fn new(
53 sdk_key: &str,
54 options: Option<&StatsigOptions>,
55 override_url: Option<String>,
56 ) -> Self {
57 let default_options = StatsigOptions::default();
58 let options_ref = options.unwrap_or(&default_options);
59
60 let init_timeout_ms = options_ref
61 .init_timeout_ms
62 .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
63
64 let specs_url = match override_url {
65 Some(url) => url,
66 None => options_ref
67 .specs_url
68 .as_ref()
69 .map(|u| u.to_string())
70 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
71 };
72
73 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
75 && specs_url != DEFAULT_SPECS_URL
76 {
77 Some(DEFAULT_SPECS_URL.to_string())
78 } else {
79 None
80 };
81
82 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
83
84 Self {
85 listener: RwLock::new(None),
86 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
87 sdk_key: sdk_key.to_string(),
88 specs_url,
89 fallback_url,
90 init_timeout_ms,
91 sync_interval_duration: Duration::from_millis(u64::from(
92 options_ref
93 .specs_sync_interval_ms
94 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
95 )),
96 ops_stats: OPS_STATS.get_for_instance(sdk_key),
97 shutdown_notify: Arc::new(Notify::new()),
98 }
99 }
100
101 pub fn force_shutdown(&self) {
102 self.shutdown_notify.notify_one();
103 }
104
105 pub async fn fetch_specs_from_network(
106 &self,
107 current_specs_info: SpecsInfo,
108 trigger: SpecsSyncTrigger,
109 ) -> Result<NetworkResponse, NetworkError> {
110 let request_args = self.get_request_args(¤t_specs_info, trigger);
111 let url = request_args.url.clone();
112 match self.handle_specs_request(request_args).await {
113 Ok(response) => Ok(NetworkResponse {
114 data: response,
115 api: get_api_from_url(&url),
116 }),
117 Err(e) => Err(e),
118 }
119 }
120
121 fn get_request_args(
122 &self,
123 current_specs_info: &SpecsInfo,
124 trigger: SpecsSyncTrigger,
125 ) -> RequestArgs {
126 let mut params = HashMap::new();
127 if let Some(lcut) = current_specs_info.lcut {
128 if lcut > 0 {
129 params.insert("sinceTime".to_string(), lcut.to_string());
130 }
131 }
132
133 let is_init_request = trigger == SpecsSyncTrigger::Initial;
134
135 let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
136 self.init_timeout_ms
137 } else {
138 0
139 };
140
141 if let Some(cs) = ¤t_specs_info.checksum {
142 params.insert(
143 "checksum".to_string(),
144 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
145 );
146 }
147
148 RequestArgs {
149 url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
150 retries: match trigger {
151 SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
152 SpecsSyncTrigger::Background => 3,
153 },
154 query_params: Some(params),
155 accept_gzip_response: true,
156 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
157 timeout_ms,
158 ..RequestArgs::new()
159 }
160 }
161
162 async fn handle_fallback_request(
163 &self,
164 mut request_args: RequestArgs,
165 ) -> Result<NetworkResponse, NetworkError> {
166 let fallback_url = match &self.fallback_url {
167 Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
168 None => {
169 return Err(NetworkError::RequestFailed(
170 request_args.url.clone(),
171 None,
172 "No fallback URL".to_string(),
173 ))
174 }
175 };
176
177 request_args.url = fallback_url.clone();
178
179 let response = self.handle_specs_request(request_args).await?;
182 Ok(NetworkResponse {
183 data: response,
184 api: get_api_from_url(&fallback_url),
185 })
186 }
187
188 async fn handle_specs_request(
189 &self,
190 request_args: RequestArgs,
191 ) -> Result<ResponseData, NetworkError> {
192 let url = request_args.url.clone();
193 let response = self.network.get(request_args).await?;
194 match response.data {
195 Some(data) => Ok(data),
196 None => Err(NetworkError::RequestFailed(
197 url,
198 None,
199 "No data in response".to_string(),
200 )),
201 }
202 }
203
204 pub async fn run_background_sync(self: Arc<Self>) {
205 let specs_info = match self
206 .listener
207 .try_read_for(std::time::Duration::from_secs(5))
208 {
209 Some(lock) => match lock.as_ref() {
210 Some(listener) => listener.get_current_specs_info(),
211 None => SpecsInfo::empty(),
212 },
213 None => SpecsInfo::error(),
214 };
215
216 self.ops_stats
217 .set_diagnostics_context(ContextType::ConfigSync);
218 if let Err(e) = self
219 .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
220 .await
221 {
222 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
223 return;
224 }
225 log_e!(TAG, "Background specs sync failed: {}", e);
226 }
227 self.ops_stats.enqueue_diagnostics_event(
228 Some(KeyType::DownloadConfigSpecs),
229 Some(ContextType::ConfigSync),
230 );
231 }
232
233 async fn manually_sync_specs(
234 &self,
235 current_specs_info: SpecsInfo,
236 trigger: SpecsSyncTrigger,
237 ) -> Result<(), StatsigErr> {
238 if let Some(lock) = self
239 .listener
240 .try_read_for(std::time::Duration::from_secs(5))
241 {
242 if lock.is_none() {
243 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
244 }
245 }
246
247 let response = self
248 .fetch_specs_from_network(current_specs_info.clone(), trigger)
249 .await;
250 let result = self.process_spec_data(response).await;
251
252 if result.is_err() && self.fallback_url.is_some() {
253 log_d!(TAG, "Falling back to statsig api");
254 let response = self
255 .handle_fallback_request(self.get_request_args(¤t_specs_info, trigger))
256 .await;
257 return self.process_spec_data(response).await;
258 }
259
260 result
261 }
262
263 async fn process_spec_data(
264 &self,
265 response: Result<NetworkResponse, NetworkError>,
266 ) -> Result<(), StatsigErr> {
267 let resp = response.map_err(StatsigErr::NetworkError)?;
268
269 let update = SpecsUpdate {
270 data: resp.data,
271 source: SpecsSource::Network,
272 received_at: Utc::now().timestamp_millis() as u64,
273 source_api: Some(resp.api),
274 };
275
276 self.ops_stats.add_marker(
277 Marker::new(
278 KeyType::DownloadConfigSpecs,
279 ActionType::Start,
280 Some(StepType::Process),
281 ),
282 None,
283 );
284
285 let result = match self
286 .listener
287 .try_read_for(std::time::Duration::from_secs(5))
288 {
289 Some(lock) => match lock.as_ref() {
290 Some(listener) => listener.did_receive_specs_update(update),
291 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
292 },
293 None => {
294 let err =
295 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
296 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
297 Err(err)
298 }
299 };
300
301 self.ops_stats.add_marker(
302 Marker::new(
303 KeyType::DownloadConfigSpecs,
304 ActionType::End,
305 Some(StepType::Process),
306 )
307 .with_is_success(result.is_ok()),
308 None,
309 );
310
311 result
312 }
313}
314
315#[async_trait]
316impl SpecsAdapter for StatsigHttpSpecsAdapter {
317 async fn start(
318 self: Arc<Self>,
319 _statsig_runtime: &Arc<StatsigRuntime>,
320 ) -> Result<(), StatsigErr> {
321 let specs_info = match self
322 .listener
323 .try_read_for(std::time::Duration::from_secs(5))
324 {
325 Some(lock) => match lock.as_ref() {
326 Some(listener) => listener.get_current_specs_info(),
327 None => SpecsInfo::empty(),
328 },
329 None => SpecsInfo::error(),
330 };
331 self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
332 .await
333 }
334
335 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
336 match self
337 .listener
338 .try_write_for(std::time::Duration::from_secs(5))
339 {
340 Some(mut lock) => *lock = Some(listener),
341 None => {
342 log_e!(TAG, "Failed to acquire write lock on listener");
343 }
344 }
345 }
346
347 async fn schedule_background_sync(
348 self: Arc<Self>,
349 statsig_runtime: &Arc<StatsigRuntime>,
350 ) -> Result<(), StatsigErr> {
351 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
352 let interval_duration = self.sync_interval_duration;
353 let shutdown_notify = self.shutdown_notify.clone();
354
355 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
356 loop {
357 tokio::select! {
358 () = sleep(interval_duration) => {
359 if let Some(strong_self) = weak_self.upgrade() {
360 Self::run_background_sync(strong_self).await;
361 } else {
362 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
363 break;
364 }
365 }
366 () = rt_shutdown_notify.notified() => {
367 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
368 break;
369 },
370 () = shutdown_notify.notified() => {
371 log_d!(TAG, "Shutting down specs background sync");
372 break;
373 }
374 }
375 }
376 })?;
377
378 Ok(())
379 }
380
381 async fn shutdown(
382 &self,
383 _timeout: Duration,
384 _statsig_runtime: &Arc<StatsigRuntime>,
385 ) -> Result<(), StatsigErr> {
386 self.shutdown_notify.notify_one();
387 Ok(())
388 }
389
390 fn get_type_name(&self) -> String {
391 stringify!(StatsigHttpSpecsAdapter).to_string()
392 }
393}
394
/// Builds the download URL `<spec_url>/<sdk_key>.json`.
///
/// Trailing slashes on `spec_url` are ignored so that a configured base such
/// as `https://host/path/` does not yield a `//` in the resulting path.
// NOTE(review): the lint allowance predates this change; the function is used in
// this file, so it is presumably there for some build configuration — confirm.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
399
/// Reason a specs sync was started; it determines the retry count and whether
/// the init timeout applies to the request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SpecsSyncTrigger {
    /// First sync, performed by `start`. Not retried; uses the init timeout.
    Initial,
    /// Periodic sync performed by the background task. Retried up to 3 times.
    Background,
    /// Sync requested explicitly. Not retried.
    Manual,
}