1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::Duration;
22use tokio::sync::Notify;
23use tokio::time::sleep;
24
25use super::SpecsInfo;
26
27pub struct NetworkResponse {
28 pub data: ResponseData,
29 pub api: String,
30 pub requested_deltas: bool,
31}
32
/// Specs endpoint used when neither an override URL nor
/// `StatsigOptions::specs_url` is supplied.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Background sync interval, in milliseconds, used when
/// `StatsigOptions::specs_sync_interval_ms` is unset.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

// NOTE(review): not referenced anywhere in this file; presumably the dictionary
// id reported before any dictionary has been received — confirm against callers.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag passed to the logging macros by this adapter.
const TAG: &str = "StatsigHttpSpecsAdapter";
40pub struct StatsigHttpSpecsAdapter {
41 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
42 network: NetworkClient,
43 sdk_key: String,
44 specs_url: String,
45 fallback_url: Option<String>,
46 init_timeout_ms: u64,
47 sync_interval_duration: Duration,
48 ops_stats: Arc<OpsStatsForInstance>,
49 shutdown_notify: Arc<Notify>,
50 allow_dcs_deltas: bool,
51 use_deltas_next_request: AtomicBool,
52}
53
54impl StatsigHttpSpecsAdapter {
55 #[must_use]
56 pub fn new(
57 sdk_key: &str,
58 options: Option<&StatsigOptions>,
59 override_url: Option<String>,
60 ) -> Self {
61 let default_options = StatsigOptions::default();
62 let options_ref = options.unwrap_or(&default_options);
63
64 let init_timeout_ms = options_ref
65 .init_timeout_ms
66 .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
67
68 let specs_url = match override_url {
69 Some(url) => url,
70 None => options_ref
71 .specs_url
72 .as_ref()
73 .map(|u| u.to_string())
74 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
75 };
76
77 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
79 && specs_url != DEFAULT_SPECS_URL
80 {
81 Some(DEFAULT_SPECS_URL.to_string())
82 } else {
83 None
84 };
85
86 let headers = StatsigMetadata::get_constant_request_headers(
87 sdk_key,
88 options_ref.service_name.as_deref(),
89 );
90 let enable_dcs_deltas = options_ref.enable_dcs_deltas.unwrap_or(false);
91
92 Self {
93 listener: RwLock::new(None),
94 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
95 sdk_key: sdk_key.to_string(),
96 specs_url,
97 fallback_url,
98 init_timeout_ms,
99 sync_interval_duration: Duration::from_millis(u64::from(
100 options_ref
101 .specs_sync_interval_ms
102 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
103 )),
104 ops_stats: OPS_STATS.get_for_instance(sdk_key),
105 shutdown_notify: Arc::new(Notify::new()),
106 allow_dcs_deltas: enable_dcs_deltas,
107 use_deltas_next_request: AtomicBool::new(enable_dcs_deltas),
108 }
109 }
110
111 pub fn force_shutdown(&self) {
112 self.shutdown_notify.notify_one();
113 }
114
115 pub async fn fetch_specs_from_network(
116 &self,
117 current_specs_info: SpecsInfo,
118 trigger: SpecsSyncTrigger,
119 ) -> Result<NetworkResponse, NetworkError> {
120 let request_args = self.get_request_args(¤t_specs_info, trigger);
121 let url = request_args.url.clone();
122 let requested_deltas = request_args.deltas_enabled;
123 match self.handle_specs_request(request_args).await {
124 Ok(response) => Ok(NetworkResponse {
125 data: response,
126 api: get_api_from_url(&url),
127 requested_deltas,
128 }),
129 Err(e) => Err(e),
130 }
131 }
132
133 fn get_request_args(
134 &self,
135 current_specs_info: &SpecsInfo,
136 trigger: SpecsSyncTrigger,
137 ) -> RequestArgs {
138 let mut params = HashMap::new();
139
140 params.insert("supports_proto".to_string(), "true".to_string());
141 let headers = Some(HashMap::from([
142 ("statsig-supports-proto".to_string(), "true".to_string()),
143 (
144 "accept-encoding".to_string(),
145 "statsig-br, gzip, deflate, br".to_string(),
146 ),
147 ]));
148
149 if let Some(lcut) = current_specs_info.lcut {
150 if lcut > 0 {
151 params.insert("sinceTime".to_string(), lcut.to_string());
152 }
153 }
154
155 let is_init_request = trigger == SpecsSyncTrigger::Initial;
156
157 let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
158 self.init_timeout_ms
159 } else {
160 0
161 };
162
163 if let Some(cs) = ¤t_specs_info.checksum {
164 params.insert(
165 "checksum".to_string(),
166 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
167 );
168 }
169
170 let use_deltas_next_req = self.use_deltas_next_request.load(Ordering::SeqCst);
171 if use_deltas_next_req {
172 params.insert("accept_deltas".to_string(), "true".to_string());
173 }
174
175 RequestArgs {
176 url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
177 retries: match trigger {
178 SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
179 SpecsSyncTrigger::Background => 3,
180 },
181 query_params: Some(params),
182 deltas_enabled: use_deltas_next_req,
183 accept_gzip_response: true,
184 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
185 timeout_ms,
186 headers,
187 ..RequestArgs::new()
188 }
189 }
190
191 async fn handle_fallback_request(
192 &self,
193 mut request_args: RequestArgs,
194 ) -> Result<NetworkResponse, NetworkError> {
195 let requested_deltas = request_args.deltas_enabled;
196 let fallback_url = match &self.fallback_url {
197 Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
198 None => {
199 return Err(NetworkError::RequestFailed(
200 request_args.url.clone(),
201 None,
202 "No fallback URL".to_string(),
203 ))
204 }
205 };
206
207 request_args.url = fallback_url.clone();
208
209 let response = self.handle_specs_request(request_args).await?;
212 Ok(NetworkResponse {
213 data: response,
214 api: get_api_from_url(&fallback_url),
215 requested_deltas,
216 })
217 }
218
219 async fn handle_specs_request(
220 &self,
221 request_args: RequestArgs,
222 ) -> Result<ResponseData, NetworkError> {
223 let url = request_args.url.clone();
224 let response = self.network.get(request_args).await?;
225 match response.data {
226 Some(data) => Ok(data),
227 None => Err(NetworkError::RequestFailed(
228 url,
229 None,
230 response.error.unwrap_or("No data in response".to_string()),
231 )),
232 }
233 }
234
235 pub async fn run_background_sync(self: Arc<Self>) {
236 let specs_info = match self
237 .listener
238 .try_read_for(std::time::Duration::from_secs(5))
239 {
240 Some(lock) => match lock.as_ref() {
241 Some(listener) => listener.get_current_specs_info(),
242 None => SpecsInfo::empty(),
243 },
244 None => SpecsInfo::error(),
245 };
246
247 self.ops_stats
248 .set_diagnostics_context(ContextType::ConfigSync);
249 if let Err(e) = self
250 .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
251 .await
252 {
253 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
254 return;
255 }
256 log_e!(TAG, "Background specs sync failed: {}", e);
257 }
258 self.ops_stats.enqueue_diagnostics_event(
259 Some(KeyType::DownloadConfigSpecs),
260 Some(ContextType::ConfigSync),
261 );
262 }
263
264 async fn manually_sync_specs(
265 &self,
266 current_specs_info: SpecsInfo,
267 trigger: SpecsSyncTrigger,
268 ) -> Result<(), StatsigErr> {
269 if let Some(lock) = self
270 .listener
271 .try_read_for(std::time::Duration::from_secs(5))
272 {
273 if lock.is_none() {
274 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
275 }
276 }
277
278 let response = self
279 .fetch_specs_from_network(current_specs_info.clone(), trigger)
280 .await;
281 let result = self.process_spec_data(response).await;
282
283 if result.is_err() && self.fallback_url.is_some() {
284 log_d!(TAG, "Falling back to statsig api");
285 let response = self
286 .handle_fallback_request(self.get_request_args(¤t_specs_info, trigger))
287 .await;
288 return self.process_spec_data(response).await;
289 }
290
291 result
292 }
293
294 async fn process_spec_data(
295 &self,
296 response: Result<NetworkResponse, NetworkError>,
297 ) -> Result<(), StatsigErr> {
298 let resp = response.map_err(StatsigErr::NetworkError)?;
299 let requested_deltas = resp.requested_deltas;
300
301 let update = SpecsUpdate {
302 data: resp.data,
303 source: SpecsSource::Network,
304 received_at: Utc::now().timestamp_millis() as u64,
305 source_api: Some(resp.api),
306 };
307
308 self.ops_stats.add_marker(
309 Marker::new(
310 KeyType::DownloadConfigSpecs,
311 ActionType::Start,
312 Some(StepType::Process),
313 ),
314 None,
315 );
316
317 let result = match self
318 .listener
319 .try_read_for(std::time::Duration::from_secs(5))
320 {
321 Some(lock) => match lock.as_ref() {
322 Some(listener) => listener.did_receive_specs_update(update),
323 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
324 },
325 None => {
326 let err =
327 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
328 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
329 Err(err)
330 }
331 };
332
333 if matches!(&result, Err(StatsigErr::ChecksumFailure(_))) {
334 let was_deltas_used = self.use_deltas_next_request.swap(false, Ordering::SeqCst);
335 if was_deltas_used {
336 log_d!(TAG, "Disabling delta requests after checksum failure");
337 }
338 } else if result.is_ok() && !requested_deltas && self.allow_dcs_deltas {
339 let was_deltas_used = self.use_deltas_next_request.swap(true, Ordering::SeqCst);
340 if !was_deltas_used {
341 log_d!(
342 TAG,
343 "Re-enabling delta requests after successful non-delta specs update"
344 );
345 }
346 }
347
348 self.ops_stats.add_marker(
349 Marker::new(
350 KeyType::DownloadConfigSpecs,
351 ActionType::End,
352 Some(StepType::Process),
353 )
354 .with_is_success(result.is_ok()),
355 None,
356 );
357
358 result
359 }
360}
361
362#[async_trait]
363impl SpecsAdapter for StatsigHttpSpecsAdapter {
364 async fn start(
365 self: Arc<Self>,
366 _statsig_runtime: &Arc<StatsigRuntime>,
367 ) -> Result<(), StatsigErr> {
368 let specs_info = match self
369 .listener
370 .try_read_for(std::time::Duration::from_secs(5))
371 {
372 Some(lock) => match lock.as_ref() {
373 Some(listener) => listener.get_current_specs_info(),
374 None => SpecsInfo::empty(),
375 },
376 None => SpecsInfo::error(),
377 };
378 self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
379 .await
380 }
381
382 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
383 match self
384 .listener
385 .try_write_for(std::time::Duration::from_secs(5))
386 {
387 Some(mut lock) => *lock = Some(listener),
388 None => {
389 log_e!(TAG, "Failed to acquire write lock on listener");
390 }
391 }
392 }
393
394 async fn schedule_background_sync(
395 self: Arc<Self>,
396 statsig_runtime: &Arc<StatsigRuntime>,
397 ) -> Result<(), StatsigErr> {
398 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
399 let interval_duration = self.sync_interval_duration;
400 let shutdown_notify = self.shutdown_notify.clone();
401
402 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
403 loop {
404 tokio::select! {
405 () = sleep(interval_duration) => {
406 if let Some(strong_self) = weak_self.upgrade() {
407 Self::run_background_sync(strong_self).await;
408 } else {
409 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
410 break;
411 }
412 }
413 () = rt_shutdown_notify.notified() => {
414 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
415 break;
416 },
417 () = shutdown_notify.notified() => {
418 log_d!(TAG, "Shutting down specs background sync");
419 break;
420 }
421 }
422 }
423 })?;
424
425 Ok(())
426 }
427
428 async fn shutdown(
429 &self,
430 _timeout: Duration,
431 _statsig_runtime: &Arc<StatsigRuntime>,
432 ) -> Result<(), StatsigErr> {
433 self.shutdown_notify.notify_one();
434 Ok(())
435 }
436
437 fn get_type_name(&self) -> String {
438 stringify!(StatsigHttpSpecsAdapter).to_string()
439 }
440}
441
/// Builds the download URL for `sdk_key` under the `spec_url` base, in the
/// form `{spec_url}/{sdk_key}.json`.
///
/// Trailing `/` characters on `spec_url` are dropped first, so a base URL
/// written with a trailing slash does not produce `//` before the file name.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
446
/// Reason a specs sync was started. `get_request_args` uses it to pick the
/// retry count and timeout.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SpecsSyncTrigger {
    /// First sync, issued from `start`; the init timeout applies and there are no retries.
    Initial,
    /// Periodic sync issued by the background task; retried up to 3 times.
    Background,
    /// Explicitly requested sync; no retries.
    Manual,
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{networking::ResponseData, specs_adapter::SpecsUpdate, StatsigOptions};
    use std::sync::atomic::AtomicUsize;

    /// Listener that rejects every update with a checksum failure.
    struct ChecksumFailingListener;

    impl SpecsUpdateListener for ChecksumFailingListener {
        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
            Err(simulated_checksum_failure())
        }

        fn get_current_specs_info(&self) -> SpecsInfo {
            SpecsInfo::empty()
        }
    }

    /// Listener that rejects only its first update and accepts all later ones.
    struct ChecksumFailingThenSuccessListener {
        calls: AtomicUsize,
    }

    impl SpecsUpdateListener for ChecksumFailingThenSuccessListener {
        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
            match self.calls.fetch_add(1, Ordering::SeqCst) {
                0 => Err(simulated_checksum_failure()),
                _ => Ok(()),
            }
        }

        fn get_current_specs_info(&self) -> SpecsInfo {
            SpecsInfo::empty()
        }
    }

    fn simulated_checksum_failure() -> StatsigErr {
        StatsigErr::ChecksumFailure("simulated checksum failure".to_string())
    }

    /// Adapter pointed at a dummy URL with DCS deltas enabled.
    fn adapter_with_deltas_enabled() -> StatsigHttpSpecsAdapter {
        let options = StatsigOptions {
            enable_dcs_deltas: Some(true),
            ..StatsigOptions::default()
        };
        StatsigHttpSpecsAdapter::new(
            "secret-key",
            Some(&options),
            Some("https://example.com/v2/download_config_specs".to_string()),
        )
    }

    /// Empty-bodied response, tagged with whether deltas were requested.
    fn empty_response(requested_deltas: bool) -> Result<NetworkResponse, NetworkError> {
        Ok(NetworkResponse {
            data: ResponseData::from_bytes(vec![]),
            api: "test-api".to_string(),
            requested_deltas,
        })
    }

    /// Value of the `accept_deltas` query parameter on the next manual request.
    fn accept_deltas_param(adapter: &StatsigHttpSpecsAdapter) -> Option<String> {
        adapter
            .get_request_args(&SpecsInfo::empty(), SpecsSyncTrigger::Manual)
            .query_params
            .as_ref()
            .and_then(|params| params.get("accept_deltas"))
            .cloned()
    }

    #[tokio::test]
    async fn test_disable_accept_deltas_after_checksum_failure() {
        let adapter = adapter_with_deltas_enabled();
        assert_eq!(accept_deltas_param(&adapter).as_deref(), Some("true"));

        adapter.initialize(Arc::new(ChecksumFailingListener));
        let result = adapter.process_spec_data(empty_response(true)).await;
        assert!(matches!(result, Err(StatsigErr::ChecksumFailure(_))));

        assert!(accept_deltas_param(&adapter).is_none());
    }

    #[tokio::test]
    async fn test_reenable_accept_deltas_after_successful_non_delta_update() {
        let adapter = adapter_with_deltas_enabled();
        adapter.initialize(Arc::new(ChecksumFailingThenSuccessListener {
            calls: AtomicUsize::new(0),
        }));

        // The first update fails its checksum, so deltas are switched off.
        let first_result = adapter.process_spec_data(empty_response(true)).await;
        assert!(matches!(first_result, Err(StatsigErr::ChecksumFailure(_))));
        assert!(accept_deltas_param(&adapter).is_none());

        // A successful non-delta update switches deltas back on.
        let second_result = adapter.process_spec_data(empty_response(false)).await;
        assert!(second_result.is_ok());
        assert_eq!(accept_deltas_param(&adapter).as_deref(), Some("true"));
    }
}