1use super::config_spec_background_sync_metrics::log_config_sync_overall_latency;
2use super::response_format::{get_specs_response_format, SpecsResponseFormat};
3use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
4use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
5use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
6use crate::sdk_diagnostics::diagnostics::ContextType;
7use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
8use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
9use crate::statsig_err::StatsigErr;
10use crate::statsig_metadata::StatsigMetadata;
11use crate::utils::get_api_from_url;
12use crate::DEFAULT_INIT_TIMEOUT_MS;
13use crate::{
14 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
15};
16use async_trait::async_trait;
17use chrono::Utc;
18use parking_lot::RwLock;
19use percent_encoding::percent_encode;
20use std::collections::HashMap;
21use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
22use std::sync::{Arc, Weak};
23use std::time::Duration;
24use tokio::sync::Notify;
25use tokio::time::sleep;
26
27use super::SpecsInfo;
28
29pub struct NetworkResponse {
30 pub data: ResponseData,
31 pub loggable_api: String,
32 pub requested_deltas: bool,
33}
34
35pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
36pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
37
38#[allow(unused)]
39pub const INIT_DICT_ID: &str = "null";
40
41const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
42const STATSIG_NETWORK_FALLBACK_THRESHOLD: u32 = 5;
43
44pub struct StatsigHttpSpecsAdapter {
45 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
46 network: NetworkClient,
47 sdk_key: String,
48 specs_url: String,
49 fallback_url: Option<String>,
50 init_timeout_ms: u64,
51 sync_interval_duration: Duration,
52 ops_stats: Arc<OpsStatsForInstance>,
53 shutdown_notify: Arc<Notify>,
54 allow_dcs_deltas: bool,
55 use_deltas_next_request: AtomicBool,
56 background_sync_failure_count: AtomicU32,
57}
58
59enum NetworkSyncOutcome {
63 Success,
64 Failure,
65}
66
67impl NetworkSyncOutcome {
68 fn as_bool(&self) -> bool {
69 matches!(self, Self::Success)
70 }
71}
72impl StatsigHttpSpecsAdapter {
75 #[must_use]
76 pub fn new(
77 sdk_key: &str,
78 options: Option<&StatsigOptions>,
79 override_url: Option<String>,
80 ) -> Self {
81 let default_options = StatsigOptions::default();
82 let options_ref = options.unwrap_or(&default_options);
83
84 let init_timeout_ms = options_ref
85 .init_timeout_ms
86 .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
87
88 let specs_url = match override_url {
89 Some(url) => url,
90 None => options_ref
91 .specs_url
92 .as_ref()
93 .map(|u| u.to_string())
94 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
95 };
96
97 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
99 && specs_url != DEFAULT_SPECS_URL
100 {
101 Some(DEFAULT_SPECS_URL.to_string())
102 } else {
103 None
104 };
105
106 let headers = StatsigMetadata::get_constant_request_headers(
107 sdk_key,
108 options_ref.service_name.as_deref(),
109 );
110 let enable_dcs_deltas = options_ref.enable_dcs_deltas.unwrap_or(false);
111
112 Self {
113 listener: RwLock::new(None),
114 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
115 sdk_key: sdk_key.to_string(),
116 specs_url,
117 fallback_url,
118 init_timeout_ms,
119 sync_interval_duration: Duration::from_millis(u64::from(
120 options_ref
121 .specs_sync_interval_ms
122 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
123 )),
124 ops_stats: OPS_STATS.get_for_instance(sdk_key),
125 shutdown_notify: Arc::new(Notify::new()),
126 allow_dcs_deltas: enable_dcs_deltas,
127 use_deltas_next_request: AtomicBool::new(enable_dcs_deltas),
128 background_sync_failure_count: AtomicU32::new(0),
129 }
130 }
131
132 pub fn force_shutdown(&self) {
133 self.shutdown_notify.notify_one();
134 }
135
136 pub async fn fetch_specs_from_network(
137 &self,
138 current_specs_info: SpecsInfo,
139 trigger: SpecsSyncTrigger,
140 ) -> Result<NetworkResponse, NetworkError> {
141 let request_args = self.get_request_args(¤t_specs_info, trigger);
142 let url = request_args.url.clone();
143 let requested_deltas = request_args.deltas_enabled;
144 match self.handle_specs_request(request_args).await {
145 Ok(response) => Ok(NetworkResponse {
146 data: response,
147 loggable_api: get_api_from_url(&url),
148 requested_deltas,
149 }),
150 Err(e) => Err(e),
151 }
152 }
153
154 fn get_request_args(
155 &self,
156 current_specs_info: &SpecsInfo,
157 trigger: SpecsSyncTrigger,
158 ) -> RequestArgs {
159 let mut params = HashMap::new();
160
161 params.insert("supports_proto".to_string(), "true".to_string());
162 let headers = Some(HashMap::from([
163 ("statsig-supports-proto".to_string(), "true".to_string()),
164 (
165 "accept-encoding".to_string(),
166 "statsig-br, gzip, deflate, br".to_string(),
167 ),
168 ]));
169
170 if let Some(lcut) = current_specs_info.lcut {
171 if lcut > 0 {
172 params.insert("sinceTime".to_string(), lcut.to_string());
173 }
174 }
175
176 let is_init_request = trigger == SpecsSyncTrigger::Initial;
177
178 let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
179 self.init_timeout_ms
180 } else {
181 0
182 };
183
184 if let Some(cs) = ¤t_specs_info.checksum {
185 params.insert(
186 "checksum".to_string(),
187 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
188 );
189 }
190
191 let use_deltas_next_req = self.use_deltas_next_request.load(Ordering::SeqCst);
192 if use_deltas_next_req {
193 params.insert("accept_deltas".to_string(), "true".to_string());
194 }
195
196 RequestArgs {
197 url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
198 retries: match trigger {
199 SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
200 SpecsSyncTrigger::Background => 3,
201 },
202 query_params: Some(params),
203 deltas_enabled: use_deltas_next_req,
204 accept_gzip_response: true,
205 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
206 timeout_ms,
207 headers,
208 ..RequestArgs::new()
209 }
210 }
211
212 async fn handle_fallback_request(
213 &self,
214 mut request_args: RequestArgs,
215 ) -> Result<NetworkResponse, NetworkError> {
216 let requested_deltas = request_args.deltas_enabled;
217 let fallback_url = match &self.fallback_url {
218 Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
219 None => {
220 return Err(NetworkError::RequestFailed(
221 request_args.url.clone(),
222 None,
223 "No fallback URL".to_string(),
224 ))
225 }
226 };
227
228 request_args.url = fallback_url.clone();
229
230 let response = self.handle_specs_request(request_args).await?;
233 Ok(NetworkResponse {
234 data: response,
235 loggable_api: get_api_from_url(&fallback_url),
236 requested_deltas,
237 })
238 }
239
240 async fn handle_specs_request(
241 &self,
242 request_args: RequestArgs,
243 ) -> Result<ResponseData, NetworkError> {
244 let url = request_args.url.clone();
245 let response = self.network.get(request_args).await?;
246 match response.data {
247 Some(data) => Ok(data),
248 None => Err(NetworkError::RequestFailed(
249 url,
250 None,
251 response.error.unwrap_or("No data in response".to_string()),
252 )),
253 }
254 }
255
256 fn should_attempt_fallback(
257 &self,
258 trigger: SpecsSyncTrigger,
259 result: &Result<(), StatsigErr>,
260 ) -> bool {
261 if result.is_ok() || self.fallback_url.is_none() {
262 return false;
263 }
264
265 if trigger != SpecsSyncTrigger::Background {
266 return true;
267 }
268
269 let failure_count = self
270 .background_sync_failure_count
271 .fetch_add(1, Ordering::SeqCst)
272 + 1;
273
274 if failure_count.is_multiple_of(STATSIG_NETWORK_FALLBACK_THRESHOLD) {
275 return true;
276 }
277
278 log_d!(
279 TAG,
280 "Skipping fallback on background sync failure {}. Retrying fallback every {} failures.",
281 failure_count,
282 STATSIG_NETWORK_FALLBACK_THRESHOLD
283 );
284
285 false
286 }
287
288 pub async fn run_background_sync(self: Arc<Self>) {
289 let specs_info = match self
290 .listener
291 .try_read_for(std::time::Duration::from_secs(5))
292 {
293 Some(lock) => match lock.as_ref() {
294 Some(listener) => listener.get_current_specs_info(),
295 None => SpecsInfo::empty(),
296 },
297 None => SpecsInfo::error(),
298 };
299
300 self.ops_stats
301 .set_diagnostics_context(ContextType::ConfigSync);
302 if let Err(e) = self
303 .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
304 .await
305 {
306 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
307 return;
308 }
309 log_e!(TAG, "Background specs sync failed: {}", e);
310 }
311 self.ops_stats.enqueue_diagnostics_event(
312 Some(KeyType::DownloadConfigSpecs),
313 Some(ContextType::ConfigSync),
314 );
315 }
316
317 async fn manually_sync_specs(
318 &self,
319 current_specs_info: SpecsInfo,
320 trigger: SpecsSyncTrigger,
321 ) -> Result<(), StatsigErr> {
322 if let Some(lock) = self
323 .listener
324 .try_read_for(std::time::Duration::from_secs(5))
325 {
326 if lock.is_none() {
327 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
328 }
329 }
330
331 let sync_start_ms = Utc::now().timestamp_millis() as u64;
332 let mut deltas_used = self.use_deltas_next_request.load(Ordering::SeqCst);
333 let response = self
334 .fetch_specs_from_network(current_specs_info.clone(), trigger)
335 .await;
336 let (mut source_api, mut response_format, mut network_success) = match &response {
337 Ok(response) => (
338 response.loggable_api.clone(),
339 get_specs_response_format(&response.data),
340 NetworkSyncOutcome::Success,
341 ),
342 Err(_) => (
343 get_api_from_url(&construct_specs_url(
344 self.specs_url.as_str(),
345 self.sdk_key.as_str(),
346 )),
347 SpecsResponseFormat::Unknown,
348 NetworkSyncOutcome::Failure,
349 ),
350 };
351 if let Ok(response) = &response {
352 deltas_used = response.requested_deltas;
353 }
354
355 let mut result = self.process_spec_data(response).await;
356
357 if self.should_attempt_fallback(trigger, &result) {
358 log_d!(TAG, "Falling back to statsig api");
359 let fallback_args = self.get_request_args(¤t_specs_info, trigger);
360 deltas_used = fallback_args.deltas_enabled;
361 let response = self.handle_fallback_request(fallback_args).await;
362 match &response {
363 Ok(response) => {
364 source_api = response.loggable_api.clone();
365 response_format = get_specs_response_format(&response.data);
366 network_success = NetworkSyncOutcome::Success;
367 deltas_used = response.requested_deltas;
368 }
369 Err(_) => {
370 if let Some(fallback_url) = self.fallback_url.as_ref() {
372 source_api = get_api_from_url(&construct_specs_url(
373 fallback_url.as_str(),
374 self.sdk_key.as_str(),
375 ));
376 }
377 network_success = NetworkSyncOutcome::Failure;
378 }
379 }
380 result = self.process_spec_data(response).await;
381 }
382
383 let process_success = !matches!(result.as_ref(), Err(StatsigErr::NetworkError(_)));
384 log_config_sync_overall_latency(
385 &self.ops_stats,
386 sync_start_ms,
387 source_api.as_str(),
388 response_format.as_str(),
389 network_success.as_bool(),
390 process_success,
391 result
392 .as_ref()
393 .err()
394 .map_or_else(String::new, |e| e.to_string()),
395 deltas_used,
396 );
397
398 result
399 }
400
401 async fn process_spec_data(
402 &self,
403 response: Result<NetworkResponse, NetworkError>,
404 ) -> Result<(), StatsigErr> {
405 let resp = response.map_err(StatsigErr::NetworkError)?;
406 let requested_deltas = resp.requested_deltas;
407
408 let update = SpecsUpdate {
409 data: resp.data,
410 source: SpecsSource::Network,
411 received_at: Utc::now().timestamp_millis() as u64,
412 source_api: Some(resp.loggable_api),
413 };
414
415 self.ops_stats.add_marker(
416 Marker::new(
417 KeyType::DownloadConfigSpecs,
418 ActionType::Start,
419 Some(StepType::Process),
420 ),
421 None,
422 );
423
424 let result = match self
425 .listener
426 .try_read_for(std::time::Duration::from_secs(5))
427 {
428 Some(lock) => match lock.as_ref() {
429 Some(listener) => listener.did_receive_specs_update(update),
430 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
431 },
432 None => {
433 let err =
434 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
435 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
436 Err(err)
437 }
438 };
439
440 if matches!(&result, Err(StatsigErr::ChecksumFailure(_))) {
441 let was_deltas_used = self.use_deltas_next_request.swap(false, Ordering::SeqCst);
442 if was_deltas_used {
443 log_d!(TAG, "Disabling delta requests after checksum failure");
444 }
445 } else if result.is_ok() && !requested_deltas && self.allow_dcs_deltas {
446 let was_deltas_used = self.use_deltas_next_request.swap(true, Ordering::SeqCst);
447 if !was_deltas_used {
448 log_d!(
449 TAG,
450 "Re-enabling delta requests after successful non-delta specs update"
451 );
452 }
453 }
454
455 self.ops_stats.add_marker(
456 Marker::new(
457 KeyType::DownloadConfigSpecs,
458 ActionType::End,
459 Some(StepType::Process),
460 )
461 .with_is_success(result.is_ok()),
462 None,
463 );
464
465 result
466 }
467}
468
469#[async_trait]
470impl SpecsAdapter for StatsigHttpSpecsAdapter {
471 async fn start(
472 self: Arc<Self>,
473 _statsig_runtime: &Arc<StatsigRuntime>,
474 ) -> Result<(), StatsigErr> {
475 let specs_info = match self
476 .listener
477 .try_read_for(std::time::Duration::from_secs(5))
478 {
479 Some(lock) => match lock.as_ref() {
480 Some(listener) => listener.get_current_specs_info(),
481 None => SpecsInfo::empty(),
482 },
483 None => SpecsInfo::error(),
484 };
485 self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
486 .await
487 }
488
489 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
490 match self
491 .listener
492 .try_write_for(std::time::Duration::from_secs(5))
493 {
494 Some(mut lock) => *lock = Some(listener),
495 None => {
496 log_e!(TAG, "Failed to acquire write lock on listener");
497 }
498 }
499 }
500
501 async fn schedule_background_sync(
502 self: Arc<Self>,
503 statsig_runtime: &Arc<StatsigRuntime>,
504 ) -> Result<(), StatsigErr> {
505 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
506 let interval_duration = self.sync_interval_duration;
507 let shutdown_notify = self.shutdown_notify.clone();
508
509 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
510 loop {
511 tokio::select! {
512 () = sleep(interval_duration) => {
513 if let Some(strong_self) = weak_self.upgrade() {
514 Self::run_background_sync(strong_self).await;
515 } else {
516 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
517 break;
518 }
519 }
520 () = rt_shutdown_notify.notified() => {
521 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
522 break;
523 },
524 () = shutdown_notify.notified() => {
525 log_d!(TAG, "Shutting down specs background sync");
526 break;
527 }
528 }
529 }
530 })?;
531
532 Ok(())
533 }
534
535 async fn shutdown(
536 &self,
537 _timeout: Duration,
538 _statsig_runtime: &Arc<StatsigRuntime>,
539 ) -> Result<(), StatsigErr> {
540 self.shutdown_notify.notify_one();
541 Ok(())
542 }
543
544 fn get_type_name(&self) -> String {
545 stringify!(StatsigHttpSpecsAdapter).to_string()
546 }
547}
548
549#[allow(unused)]
550fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
551 format!("{spec_url}/{sdk_key}.json")
552}
553
554#[derive(Debug, Clone, Copy, PartialEq, Eq)]
555pub enum SpecsSyncTrigger {
556 Initial,
557 Background,
558 Manual,
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::{networking::ResponseData, specs_adapter::SpecsUpdate, StatsigOptions};
565 use std::collections::HashMap;
566 use std::sync::atomic::AtomicUsize;
567
568 struct ChecksumFailingListener;
569
570 impl SpecsUpdateListener for ChecksumFailingListener {
571 fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
572 Err(StatsigErr::ChecksumFailure(
573 "simulated checksum failure".to_string(),
574 ))
575 }
576
577 fn get_current_specs_info(&self) -> SpecsInfo {
578 SpecsInfo::empty()
579 }
580 }
581
582 struct ChecksumFailingThenSuccessListener {
583 calls: AtomicUsize,
584 }
585
586 impl SpecsUpdateListener for ChecksumFailingThenSuccessListener {
587 fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
588 let curr = self.calls.fetch_add(1, Ordering::SeqCst);
589 if curr == 0 {
590 Err(StatsigErr::ChecksumFailure(
591 "simulated checksum failure".to_string(),
592 ))
593 } else {
594 Ok(())
595 }
596 }
597
598 fn get_current_specs_info(&self) -> SpecsInfo {
599 SpecsInfo::empty()
600 }
601 }
602
603 #[tokio::test]
604 async fn test_disable_accept_deltas_after_checksum_failure() {
605 let options = StatsigOptions {
606 enable_dcs_deltas: Some(true),
607 ..StatsigOptions::default()
608 };
609 let adapter = StatsigHttpSpecsAdapter::new(
610 "secret-key",
611 Some(&options),
612 Some("https://example.com/v2/download_config_specs".to_string()),
613 );
614 let specs_info = SpecsInfo::empty();
615
616 let request_before = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
617 assert_eq!(
618 request_before
619 .query_params
620 .as_ref()
621 .and_then(|p| p.get("accept_deltas"))
622 .map(String::as_str),
623 Some("true")
624 );
625
626 adapter.initialize(Arc::new(ChecksumFailingListener));
627 let result = adapter
628 .process_spec_data(Ok(NetworkResponse {
629 data: ResponseData::from_bytes(vec![]),
630 loggable_api: "test-api".to_string(),
631 requested_deltas: true,
632 }))
633 .await;
634
635 assert!(matches!(result, Err(StatsigErr::ChecksumFailure(_))));
636
637 let request_after = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
638 assert!(request_after
639 .query_params
640 .as_ref()
641 .is_none_or(|p| !p.contains_key("accept_deltas")));
642 }
643
644 #[tokio::test]
645 async fn test_reenable_accept_deltas_after_successful_non_delta_update() {
646 let options = StatsigOptions {
647 enable_dcs_deltas: Some(true),
648 ..StatsigOptions::default()
649 };
650 let adapter = StatsigHttpSpecsAdapter::new(
651 "secret-key",
652 Some(&options),
653 Some("https://example.com/v2/download_config_specs".to_string()),
654 );
655 let specs_info = SpecsInfo::empty();
656
657 adapter.initialize(Arc::new(ChecksumFailingThenSuccessListener {
658 calls: AtomicUsize::new(0),
659 }));
660
661 let first_result = adapter
662 .process_spec_data(Ok(NetworkResponse {
663 data: ResponseData::from_bytes(vec![]),
664 loggable_api: "test-api".to_string(),
665 requested_deltas: true,
666 }))
667 .await;
668
669 assert!(matches!(first_result, Err(StatsigErr::ChecksumFailure(_))));
670
671 let request_after_failure = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
672 assert!(request_after_failure
673 .query_params
674 .as_ref()
675 .is_none_or(|p| !p.contains_key("accept_deltas")));
676
677 let second_result = adapter
678 .process_spec_data(Ok(NetworkResponse {
679 data: ResponseData::from_bytes(vec![]),
680 loggable_api: "test-api".to_string(),
681 requested_deltas: false,
682 }))
683 .await;
684
685 assert!(second_result.is_ok());
686
687 let request_after_success = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
688 assert_eq!(
689 request_after_success
690 .query_params
691 .as_ref()
692 .and_then(|p| p.get("accept_deltas"))
693 .map(String::as_str),
694 Some("true")
695 );
696 }
697
698 #[test]
699 fn test_get_response_format_json() {
700 let mut headers = HashMap::new();
701 headers.insert("content-type".to_string(), "application/json".to_string());
702 let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
703 assert!(matches!(
704 get_specs_response_format(&data),
705 SpecsResponseFormat::Json
706 ));
707 }
708
709 #[test]
710 fn test_get_response_format_plain_text() {
711 let mut headers = HashMap::new();
712 headers.insert(
713 "content-type".to_string(),
714 "text/plain; charset=utf-8".to_string(),
715 );
716 let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
717 assert!(matches!(
718 get_specs_response_format(&data),
719 SpecsResponseFormat::PlainText
720 ));
721 }
722
723 #[test]
724 fn test_get_response_format_protobuf() {
725 let mut headers = HashMap::new();
726 headers.insert(
727 "content-type".to_string(),
728 "application/octet-stream".to_string(),
729 );
730 headers.insert("content-encoding".to_string(), "statsig-br".to_string());
731 let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
732 assert!(matches!(
733 get_specs_response_format(&data),
734 SpecsResponseFormat::Protobuf
735 ));
736 }
737
738 #[test]
739 fn test_get_response_format_unknown_without_content_type() {
740 let data = ResponseData::from_bytes(vec![]);
741 assert!(matches!(
742 get_specs_response_format(&data),
743 SpecsResponseFormat::Unknown
744 ));
745 }
746}