1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
3use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
4use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
5use crate::sdk_diagnostics::diagnostics::ContextType;
6use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
7use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
8use crate::statsig_err::StatsigErr;
9use crate::statsig_metadata::StatsigMetadata;
10use crate::utils::get_api_from_url;
11use crate::DEFAULT_INIT_TIMEOUT_MS;
12use crate::{
13 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
14};
15use async_trait::async_trait;
16use chrono::Utc;
17use parking_lot::RwLock;
18use percent_encoding::percent_encode;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::Duration;
23use tokio::sync::Notify;
24use tokio::time::sleep;
25
26use super::SpecsInfo;
27
28pub struct NetworkResponse {
29 pub data: ResponseData,
30 pub loggable_api: String,
31 pub requested_deltas: bool,
32}
33
34pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
35pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
36
37#[allow(unused)]
38pub const INIT_DICT_ID: &str = "null";
39
40const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
41const CONFIG_SYNC_OVERALL_LATENCY_METRIC: &str = "config_sync_overall.latency";
42const CONFIG_SYNC_OVERALL_FORMAT_TAG: &str = "format";
43const CONFIG_SYNC_OVERALL_SOURCE_API_TAG: &str = "source_api";
44const CONFIG_SYNC_OVERALL_ERROR_TAG: &str = "error";
45const CONFIG_SYNC_OVERALL_NETWORK_SUCCESS_TAG: &str = "network_success";
46const CONFIG_SYNC_OVERALL_PROCESS_SUCCESS_TAG: &str = "process_success";
47const CONFIG_SYNC_OVERALL_DELTAS_USED_TAG: &str = "deltas_used";
48const STATSIG_NETWORK_FALLBACK_THRESHOLD: u32 = 5;
49
50pub struct StatsigHttpSpecsAdapter {
51 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
52 network: NetworkClient,
53 sdk_key: String,
54 specs_url: String,
55 fallback_url: Option<String>,
56 init_timeout_ms: u64,
57 sync_interval_duration: Duration,
58 ops_stats: Arc<OpsStatsForInstance>,
59 shutdown_notify: Arc<Notify>,
60 allow_dcs_deltas: bool,
61 use_deltas_next_request: AtomicBool,
62 background_sync_failure_count: AtomicU32,
63}
64
65enum ResponseFormat {
68 Json,
69 PlainText,
70 Protobuf,
71 Unknown,
72}
73
74enum NetworkSyncOutcome {
75 Success,
76 Failure,
77}
78
79impl NetworkSyncOutcome {
80 fn as_bool(&self) -> bool {
81 matches!(self, Self::Success)
82 }
83}
84
85impl ResponseFormat {
86 fn as_str(&self) -> &str {
87 match self {
88 ResponseFormat::Json => "json",
89 ResponseFormat::PlainText => "plain_text",
90 ResponseFormat::Protobuf => "protobuf",
91 ResponseFormat::Unknown => "unknown",
92 }
93 }
94}
95impl StatsigHttpSpecsAdapter {
98 #[must_use]
99 pub fn new(
100 sdk_key: &str,
101 options: Option<&StatsigOptions>,
102 override_url: Option<String>,
103 ) -> Self {
104 let default_options = StatsigOptions::default();
105 let options_ref = options.unwrap_or(&default_options);
106
107 let init_timeout_ms = options_ref
108 .init_timeout_ms
109 .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
110
111 let specs_url = match override_url {
112 Some(url) => url,
113 None => options_ref
114 .specs_url
115 .as_ref()
116 .map(|u| u.to_string())
117 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
118 };
119
120 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
122 && specs_url != DEFAULT_SPECS_URL
123 {
124 Some(DEFAULT_SPECS_URL.to_string())
125 } else {
126 None
127 };
128
129 let headers = StatsigMetadata::get_constant_request_headers(
130 sdk_key,
131 options_ref.service_name.as_deref(),
132 );
133 let enable_dcs_deltas = options_ref.enable_dcs_deltas.unwrap_or(false);
134
135 Self {
136 listener: RwLock::new(None),
137 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
138 sdk_key: sdk_key.to_string(),
139 specs_url,
140 fallback_url,
141 init_timeout_ms,
142 sync_interval_duration: Duration::from_millis(u64::from(
143 options_ref
144 .specs_sync_interval_ms
145 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
146 )),
147 ops_stats: OPS_STATS.get_for_instance(sdk_key),
148 shutdown_notify: Arc::new(Notify::new()),
149 allow_dcs_deltas: enable_dcs_deltas,
150 use_deltas_next_request: AtomicBool::new(enable_dcs_deltas),
151 background_sync_failure_count: AtomicU32::new(0),
152 }
153 }
154
155 pub fn force_shutdown(&self) {
156 self.shutdown_notify.notify_one();
157 }
158
159 pub async fn fetch_specs_from_network(
160 &self,
161 current_specs_info: SpecsInfo,
162 trigger: SpecsSyncTrigger,
163 ) -> Result<NetworkResponse, NetworkError> {
164 let request_args = self.get_request_args(¤t_specs_info, trigger);
165 let url = request_args.url.clone();
166 let requested_deltas = request_args.deltas_enabled;
167 match self.handle_specs_request(request_args).await {
168 Ok(response) => Ok(NetworkResponse {
169 data: response,
170 loggable_api: get_api_from_url(&url),
171 requested_deltas,
172 }),
173 Err(e) => Err(e),
174 }
175 }
176
177 fn get_request_args(
178 &self,
179 current_specs_info: &SpecsInfo,
180 trigger: SpecsSyncTrigger,
181 ) -> RequestArgs {
182 let mut params = HashMap::new();
183
184 params.insert("supports_proto".to_string(), "true".to_string());
185 let headers = Some(HashMap::from([
186 ("statsig-supports-proto".to_string(), "true".to_string()),
187 (
188 "accept-encoding".to_string(),
189 "statsig-br, gzip, deflate, br".to_string(),
190 ),
191 ]));
192
193 if let Some(lcut) = current_specs_info.lcut {
194 if lcut > 0 {
195 params.insert("sinceTime".to_string(), lcut.to_string());
196 }
197 }
198
199 let is_init_request = trigger == SpecsSyncTrigger::Initial;
200
201 let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
202 self.init_timeout_ms
203 } else {
204 0
205 };
206
207 if let Some(cs) = ¤t_specs_info.checksum {
208 params.insert(
209 "checksum".to_string(),
210 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
211 );
212 }
213
214 let use_deltas_next_req = self.use_deltas_next_request.load(Ordering::SeqCst);
215 if use_deltas_next_req {
216 params.insert("accept_deltas".to_string(), "true".to_string());
217 }
218
219 RequestArgs {
220 url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
221 retries: match trigger {
222 SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
223 SpecsSyncTrigger::Background => 3,
224 },
225 query_params: Some(params),
226 deltas_enabled: use_deltas_next_req,
227 accept_gzip_response: true,
228 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
229 timeout_ms,
230 headers,
231 ..RequestArgs::new()
232 }
233 }
234
235 async fn handle_fallback_request(
236 &self,
237 mut request_args: RequestArgs,
238 ) -> Result<NetworkResponse, NetworkError> {
239 let requested_deltas = request_args.deltas_enabled;
240 let fallback_url = match &self.fallback_url {
241 Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
242 None => {
243 return Err(NetworkError::RequestFailed(
244 request_args.url.clone(),
245 None,
246 "No fallback URL".to_string(),
247 ))
248 }
249 };
250
251 request_args.url = fallback_url.clone();
252
253 let response = self.handle_specs_request(request_args).await?;
256 Ok(NetworkResponse {
257 data: response,
258 loggable_api: get_api_from_url(&fallback_url),
259 requested_deltas,
260 })
261 }
262
263 async fn handle_specs_request(
264 &self,
265 request_args: RequestArgs,
266 ) -> Result<ResponseData, NetworkError> {
267 let url = request_args.url.clone();
268 let response = self.network.get(request_args).await?;
269 match response.data {
270 Some(data) => Ok(data),
271 None => Err(NetworkError::RequestFailed(
272 url,
273 None,
274 response.error.unwrap_or("No data in response".to_string()),
275 )),
276 }
277 }
278
279 fn should_attempt_fallback(
280 &self,
281 trigger: SpecsSyncTrigger,
282 result: &Result<(), StatsigErr>,
283 ) -> bool {
284 if result.is_ok() || self.fallback_url.is_none() {
285 return false;
286 }
287
288 if trigger != SpecsSyncTrigger::Background {
289 return true;
290 }
291
292 let failure_count = self
293 .background_sync_failure_count
294 .fetch_add(1, Ordering::SeqCst)
295 + 1;
296
297 if failure_count.is_multiple_of(STATSIG_NETWORK_FALLBACK_THRESHOLD) {
298 return true;
299 }
300
301 log_d!(
302 TAG,
303 "Skipping fallback on background sync failure {}. Retrying fallback every {} failures.",
304 failure_count,
305 STATSIG_NETWORK_FALLBACK_THRESHOLD
306 );
307
308 false
309 }
310
311 pub async fn run_background_sync(self: Arc<Self>) {
312 let specs_info = match self
313 .listener
314 .try_read_for(std::time::Duration::from_secs(5))
315 {
316 Some(lock) => match lock.as_ref() {
317 Some(listener) => listener.get_current_specs_info(),
318 None => SpecsInfo::empty(),
319 },
320 None => SpecsInfo::error(),
321 };
322
323 self.ops_stats
324 .set_diagnostics_context(ContextType::ConfigSync);
325 if let Err(e) = self
326 .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
327 .await
328 {
329 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
330 return;
331 }
332 log_e!(TAG, "Background specs sync failed: {}", e);
333 }
334 self.ops_stats.enqueue_diagnostics_event(
335 Some(KeyType::DownloadConfigSpecs),
336 Some(ContextType::ConfigSync),
337 );
338 }
339
340 async fn manually_sync_specs(
341 &self,
342 current_specs_info: SpecsInfo,
343 trigger: SpecsSyncTrigger,
344 ) -> Result<(), StatsigErr> {
345 if let Some(lock) = self
346 .listener
347 .try_read_for(std::time::Duration::from_secs(5))
348 {
349 if lock.is_none() {
350 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
351 }
352 }
353
354 let sync_start_ms = Utc::now().timestamp_millis() as u64;
355 let mut deltas_used = self.use_deltas_next_request.load(Ordering::SeqCst);
356 let response = self
357 .fetch_specs_from_network(current_specs_info.clone(), trigger)
358 .await;
359 let (mut source_api, mut response_format, mut network_success) = match &response {
360 Ok(response) => (
361 response.loggable_api.clone(),
362 Self::get_response_format(&response.data),
363 NetworkSyncOutcome::Success,
364 ),
365 Err(_) => (
366 get_api_from_url(&construct_specs_url(
367 self.specs_url.as_str(),
368 self.sdk_key.as_str(),
369 )),
370 ResponseFormat::Unknown,
371 NetworkSyncOutcome::Failure,
372 ),
373 };
374 if let Ok(response) = &response {
375 deltas_used = response.requested_deltas;
376 }
377
378 let mut result = self.process_spec_data(response).await;
379
380 if self.should_attempt_fallback(trigger, &result) {
381 log_d!(TAG, "Falling back to statsig api");
382 let fallback_args = self.get_request_args(¤t_specs_info, trigger);
383 deltas_used = fallback_args.deltas_enabled;
384 let response = self.handle_fallback_request(fallback_args).await;
385 match &response {
386 Ok(response) => {
387 source_api = response.loggable_api.clone();
388 response_format = Self::get_response_format(&response.data);
389 network_success = NetworkSyncOutcome::Success;
390 deltas_used = response.requested_deltas;
391 }
392 Err(_) => {
393 if let Some(fallback_url) = self.fallback_url.as_ref() {
395 source_api = get_api_from_url(&construct_specs_url(
396 fallback_url.as_str(),
397 self.sdk_key.as_str(),
398 ));
399 }
400 network_success = NetworkSyncOutcome::Failure;
401 }
402 }
403 result = self.process_spec_data(response).await;
404 }
405
406 let process_success = !matches!(result.as_ref(), Err(StatsigErr::NetworkError(_)));
407 self.log_config_sync_overall_latency(
408 sync_start_ms,
409 &source_api,
410 response_format.as_str(),
411 network_success.as_bool(),
412 process_success,
413 result
414 .as_ref()
415 .err()
416 .map_or_else(String::new, |e| e.to_string()),
417 deltas_used,
418 );
419
420 result
421 }
422
423 fn get_response_format(response_data: &ResponseData) -> ResponseFormat {
425 if Self::is_response_protobuf(response_data) {
426 return ResponseFormat::Protobuf;
427 }
428
429 let content_type = match response_data.get_header_ref("content-type") {
430 Some(content_type) => content_type.to_ascii_lowercase(),
431 None => return ResponseFormat::Unknown,
432 };
433
434 if content_type.contains("application/json") || content_type.contains("+json") {
435 return ResponseFormat::Json;
436 }
437
438 if content_type.contains("text/plain") {
439 return ResponseFormat::PlainText;
440 }
441
442 ResponseFormat::Unknown
443 }
444
445 fn is_response_protobuf(response_data: &ResponseData) -> bool {
446 let content_type = response_data.get_header_ref("content-type");
447 if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
448 return false;
449 }
450
451 let content_encoding = response_data.get_header_ref("content-encoding");
452 content_encoding.map(|s| s.as_str().contains("statsig-br")) == Some(true)
453 }
454
455 #[allow(clippy::too_many_arguments)]
456 fn log_config_sync_overall_latency(
457 &self,
458 sync_start_ms: u64,
459 source_api: &str,
460 response_format: &str,
461 network_success: bool,
462 process_success: bool,
463 error: String,
464 deltas_used: bool,
465 ) {
466 let latency_ms =
467 (Utc::now().timestamp_millis() as u64).saturating_sub(sync_start_ms) as f64;
468 self.ops_stats.log(ObservabilityEvent::new_event(
469 MetricType::Dist,
470 CONFIG_SYNC_OVERALL_LATENCY_METRIC.to_string(),
471 latency_ms,
472 Some(HashMap::from([
473 (
474 CONFIG_SYNC_OVERALL_SOURCE_API_TAG.to_string(),
475 get_api_from_url(source_api),
476 ),
477 (
478 CONFIG_SYNC_OVERALL_FORMAT_TAG.to_string(),
479 response_format.to_string(),
480 ),
481 (CONFIG_SYNC_OVERALL_ERROR_TAG.to_string(), error),
482 (
483 CONFIG_SYNC_OVERALL_NETWORK_SUCCESS_TAG.to_string(),
484 network_success.to_string(),
485 ),
486 (
487 CONFIG_SYNC_OVERALL_PROCESS_SUCCESS_TAG.to_string(),
488 process_success.to_string(),
489 ),
490 (
491 CONFIG_SYNC_OVERALL_DELTAS_USED_TAG.to_string(),
492 deltas_used.to_string(),
493 ),
494 ])),
495 ));
496 }
497 async fn process_spec_data(
500 &self,
501 response: Result<NetworkResponse, NetworkError>,
502 ) -> Result<(), StatsigErr> {
503 let resp = response.map_err(StatsigErr::NetworkError)?;
504 let requested_deltas = resp.requested_deltas;
505
506 let update = SpecsUpdate {
507 data: resp.data,
508 source: SpecsSource::Network,
509 received_at: Utc::now().timestamp_millis() as u64,
510 source_api: Some(resp.loggable_api),
511 };
512
513 self.ops_stats.add_marker(
514 Marker::new(
515 KeyType::DownloadConfigSpecs,
516 ActionType::Start,
517 Some(StepType::Process),
518 ),
519 None,
520 );
521
522 let result = match self
523 .listener
524 .try_read_for(std::time::Duration::from_secs(5))
525 {
526 Some(lock) => match lock.as_ref() {
527 Some(listener) => listener.did_receive_specs_update(update),
528 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
529 },
530 None => {
531 let err =
532 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
533 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
534 Err(err)
535 }
536 };
537
538 if matches!(&result, Err(StatsigErr::ChecksumFailure(_))) {
539 let was_deltas_used = self.use_deltas_next_request.swap(false, Ordering::SeqCst);
540 if was_deltas_used {
541 log_d!(TAG, "Disabling delta requests after checksum failure");
542 }
543 } else if result.is_ok() && !requested_deltas && self.allow_dcs_deltas {
544 let was_deltas_used = self.use_deltas_next_request.swap(true, Ordering::SeqCst);
545 if !was_deltas_used {
546 log_d!(
547 TAG,
548 "Re-enabling delta requests after successful non-delta specs update"
549 );
550 }
551 }
552
553 self.ops_stats.add_marker(
554 Marker::new(
555 KeyType::DownloadConfigSpecs,
556 ActionType::End,
557 Some(StepType::Process),
558 )
559 .with_is_success(result.is_ok()),
560 None,
561 );
562
563 result
564 }
565}
566
567#[async_trait]
568impl SpecsAdapter for StatsigHttpSpecsAdapter {
569 async fn start(
570 self: Arc<Self>,
571 _statsig_runtime: &Arc<StatsigRuntime>,
572 ) -> Result<(), StatsigErr> {
573 let specs_info = match self
574 .listener
575 .try_read_for(std::time::Duration::from_secs(5))
576 {
577 Some(lock) => match lock.as_ref() {
578 Some(listener) => listener.get_current_specs_info(),
579 None => SpecsInfo::empty(),
580 },
581 None => SpecsInfo::error(),
582 };
583 self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
584 .await
585 }
586
587 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
588 match self
589 .listener
590 .try_write_for(std::time::Duration::from_secs(5))
591 {
592 Some(mut lock) => *lock = Some(listener),
593 None => {
594 log_e!(TAG, "Failed to acquire write lock on listener");
595 }
596 }
597 }
598
599 async fn schedule_background_sync(
600 self: Arc<Self>,
601 statsig_runtime: &Arc<StatsigRuntime>,
602 ) -> Result<(), StatsigErr> {
603 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
604 let interval_duration = self.sync_interval_duration;
605 let shutdown_notify = self.shutdown_notify.clone();
606
607 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
608 loop {
609 tokio::select! {
610 () = sleep(interval_duration) => {
611 if let Some(strong_self) = weak_self.upgrade() {
612 Self::run_background_sync(strong_self).await;
613 } else {
614 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
615 break;
616 }
617 }
618 () = rt_shutdown_notify.notified() => {
619 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
620 break;
621 },
622 () = shutdown_notify.notified() => {
623 log_d!(TAG, "Shutting down specs background sync");
624 break;
625 }
626 }
627 }
628 })?;
629
630 Ok(())
631 }
632
633 async fn shutdown(
634 &self,
635 _timeout: Duration,
636 _statsig_runtime: &Arc<StatsigRuntime>,
637 ) -> Result<(), StatsigErr> {
638 self.shutdown_notify.notify_one();
639 Ok(())
640 }
641
642 fn get_type_name(&self) -> String {
643 stringify!(StatsigHttpSpecsAdapter).to_string()
644 }
645}
646
647#[allow(unused)]
648fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
649 format!("{spec_url}/{sdk_key}.json")
650}
651
652#[derive(Debug, Clone, Copy, PartialEq, Eq)]
653pub enum SpecsSyncTrigger {
654 Initial,
655 Background,
656 Manual,
657}
658
659#[cfg(test)]
660mod tests {
661 use super::*;
662 use crate::{networking::ResponseData, specs_adapter::SpecsUpdate, StatsigOptions};
663 use std::collections::HashMap;
664 use std::sync::atomic::AtomicUsize;
665
666 struct ChecksumFailingListener;
667
668 impl SpecsUpdateListener for ChecksumFailingListener {
669 fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
670 Err(StatsigErr::ChecksumFailure(
671 "simulated checksum failure".to_string(),
672 ))
673 }
674
675 fn get_current_specs_info(&self) -> SpecsInfo {
676 SpecsInfo::empty()
677 }
678 }
679
680 struct ChecksumFailingThenSuccessListener {
681 calls: AtomicUsize,
682 }
683
684 impl SpecsUpdateListener for ChecksumFailingThenSuccessListener {
685 fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
686 let curr = self.calls.fetch_add(1, Ordering::SeqCst);
687 if curr == 0 {
688 Err(StatsigErr::ChecksumFailure(
689 "simulated checksum failure".to_string(),
690 ))
691 } else {
692 Ok(())
693 }
694 }
695
696 fn get_current_specs_info(&self) -> SpecsInfo {
697 SpecsInfo::empty()
698 }
699 }
700
701 #[tokio::test]
702 async fn test_disable_accept_deltas_after_checksum_failure() {
703 let options = StatsigOptions {
704 enable_dcs_deltas: Some(true),
705 ..StatsigOptions::default()
706 };
707 let adapter = StatsigHttpSpecsAdapter::new(
708 "secret-key",
709 Some(&options),
710 Some("https://example.com/v2/download_config_specs".to_string()),
711 );
712 let specs_info = SpecsInfo::empty();
713
714 let request_before = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
715 assert_eq!(
716 request_before
717 .query_params
718 .as_ref()
719 .and_then(|p| p.get("accept_deltas"))
720 .map(String::as_str),
721 Some("true")
722 );
723
724 adapter.initialize(Arc::new(ChecksumFailingListener));
725 let result = adapter
726 .process_spec_data(Ok(NetworkResponse {
727 data: ResponseData::from_bytes(vec![]),
728 loggable_api: "test-api".to_string(),
729 requested_deltas: true,
730 }))
731 .await;
732
733 assert!(matches!(result, Err(StatsigErr::ChecksumFailure(_))));
734
735 let request_after = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
736 assert!(request_after
737 .query_params
738 .as_ref()
739 .is_none_or(|p| !p.contains_key("accept_deltas")));
740 }
741
742 #[tokio::test]
743 async fn test_reenable_accept_deltas_after_successful_non_delta_update() {
744 let options = StatsigOptions {
745 enable_dcs_deltas: Some(true),
746 ..StatsigOptions::default()
747 };
748 let adapter = StatsigHttpSpecsAdapter::new(
749 "secret-key",
750 Some(&options),
751 Some("https://example.com/v2/download_config_specs".to_string()),
752 );
753 let specs_info = SpecsInfo::empty();
754
755 adapter.initialize(Arc::new(ChecksumFailingThenSuccessListener {
756 calls: AtomicUsize::new(0),
757 }));
758
759 let first_result = adapter
760 .process_spec_data(Ok(NetworkResponse {
761 data: ResponseData::from_bytes(vec![]),
762 loggable_api: "test-api".to_string(),
763 requested_deltas: true,
764 }))
765 .await;
766
767 assert!(matches!(first_result, Err(StatsigErr::ChecksumFailure(_))));
768
769 let request_after_failure = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
770 assert!(request_after_failure
771 .query_params
772 .as_ref()
773 .is_none_or(|p| !p.contains_key("accept_deltas")));
774
775 let second_result = adapter
776 .process_spec_data(Ok(NetworkResponse {
777 data: ResponseData::from_bytes(vec![]),
778 loggable_api: "test-api".to_string(),
779 requested_deltas: false,
780 }))
781 .await;
782
783 assert!(second_result.is_ok());
784
785 let request_after_success = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
786 assert_eq!(
787 request_after_success
788 .query_params
789 .as_ref()
790 .and_then(|p| p.get("accept_deltas"))
791 .map(String::as_str),
792 Some("true")
793 );
794 }
795
796 #[test]
797 fn test_get_response_format_json() {
798 let mut headers = HashMap::new();
799 headers.insert("content-type".to_string(), "application/json".to_string());
800 let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
801 assert!(matches!(
802 StatsigHttpSpecsAdapter::get_response_format(&data),
803 ResponseFormat::Json
804 ));
805 }
806
807 #[test]
808 fn test_get_response_format_plain_text() {
809 let mut headers = HashMap::new();
810 headers.insert(
811 "content-type".to_string(),
812 "text/plain; charset=utf-8".to_string(),
813 );
814 let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
815 assert!(matches!(
816 StatsigHttpSpecsAdapter::get_response_format(&data),
817 ResponseFormat::PlainText
818 ));
819 }
820
821 #[test]
822 fn test_get_response_format_protobuf() {
823 let mut headers = HashMap::new();
824 headers.insert(
825 "content-type".to_string(),
826 "application/octet-stream".to_string(),
827 );
828 headers.insert("content-encoding".to_string(), "statsig-br".to_string());
829 let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
830 assert!(matches!(
831 StatsigHttpSpecsAdapter::get_response_format(&data),
832 ResponseFormat::Protobuf
833 ));
834 }
835
836 #[test]
837 fn test_get_response_format_unknown_without_content_type() {
838 let data = ResponseData::from_bytes(vec![]);
839 assert!(matches!(
840 StatsigHttpSpecsAdapter::get_response_format(&data),
841 ResponseFormat::Unknown
842 ));
843 }
844}