Skip to main content

statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
3use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
4use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
5use crate::sdk_diagnostics::diagnostics::ContextType;
6use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
7use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
8use crate::statsig_err::StatsigErr;
9use crate::statsig_metadata::StatsigMetadata;
10use crate::utils::get_api_from_url;
11use crate::DEFAULT_INIT_TIMEOUT_MS;
12use crate::{
13    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
14};
15use async_trait::async_trait;
16use chrono::Utc;
17use parking_lot::RwLock;
18use percent_encoding::percent_encode;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::Duration;
23use tokio::sync::Notify;
24use tokio::time::sleep;
25
26use super::SpecsInfo;
27
28pub struct NetworkResponse {
29    pub data: ResponseData,
30    pub loggable_api: String,
31    pub requested_deltas: bool,
32}
33
/// Specs endpoint used when neither an override URL nor `StatsigOptions::specs_url` is given.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Background sync period, in milliseconds, used when `specs_sync_interval_ms` is unset.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag attached to every log line emitted by this adapter.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);

// Name and tag keys of the overall config-sync latency metric.
const CONFIG_SYNC_OVERALL_LATENCY_METRIC: &str = "config_sync_overall.latency";
const CONFIG_SYNC_OVERALL_FORMAT_TAG: &str = "format";
const CONFIG_SYNC_OVERALL_SOURCE_API_TAG: &str = "source_api";
const CONFIG_SYNC_OVERALL_ERROR_TAG: &str = "error";
const CONFIG_SYNC_OVERALL_NETWORK_SUCCESS_TAG: &str = "network_success";
const CONFIG_SYNC_OVERALL_PROCESS_SUCCESS_TAG: &str = "process_success";
const CONFIG_SYNC_OVERALL_DELTAS_USED_TAG: &str = "deltas_used";

/// A failed background sync tries the fallback URL on every N-th failure, where N is this value.
const STATSIG_NETWORK_FALLBACK_THRESHOLD: u32 = 5;
49
50pub struct StatsigHttpSpecsAdapter {
51    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
52    network: NetworkClient,
53    sdk_key: String,
54    specs_url: String,
55    fallback_url: Option<String>,
56    init_timeout_ms: u64,
57    sync_interval_duration: Duration,
58    ops_stats: Arc<OpsStatsForInstance>,
59    shutdown_notify: Arc<Notify>,
60    allow_dcs_deltas: bool,
61    use_deltas_next_request: AtomicBool,
62    background_sync_failure_count: AtomicU32,
63}
64
65// OB client -- START
66// These types are only for the config_sync_overall.latency observability metric added in this change.
/// Payload format reported in the `format` tag of the config-sync latency metric.
enum ResponseFormat {
    Json,
    PlainText,
    Protobuf,
    Unknown,
}

/// Whether the network leg of a sync attempt returned a payload.
enum NetworkSyncOutcome {
    Success,
    Failure,
}

impl NetworkSyncOutcome {
    /// `true` for [`NetworkSyncOutcome::Success`], `false` otherwise.
    fn as_bool(&self) -> bool {
        match self {
            Self::Success => true,
            Self::Failure => false,
        }
    }
}

impl ResponseFormat {
    /// Metric tag value for this format.
    fn as_str(&self) -> &str {
        match self {
            Self::Json => "json",
            Self::PlainText => "plain_text",
            Self::Protobuf => "protobuf",
            Self::Unknown => "unknown",
        }
    }
}
95// OB client -- END
96
97impl StatsigHttpSpecsAdapter {
98    #[must_use]
99    pub fn new(
100        sdk_key: &str,
101        options: Option<&StatsigOptions>,
102        override_url: Option<String>,
103    ) -> Self {
104        let default_options = StatsigOptions::default();
105        let options_ref = options.unwrap_or(&default_options);
106
107        let init_timeout_ms = options_ref
108            .init_timeout_ms
109            .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
110
111        let specs_url = match override_url {
112            Some(url) => url,
113            None => options_ref
114                .specs_url
115                .as_ref()
116                .map(|u| u.to_string())
117                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
118        };
119
120        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
121        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
122            && specs_url != DEFAULT_SPECS_URL
123        {
124            Some(DEFAULT_SPECS_URL.to_string())
125        } else {
126            None
127        };
128
129        let headers = StatsigMetadata::get_constant_request_headers(
130            sdk_key,
131            options_ref.service_name.as_deref(),
132        );
133        let enable_dcs_deltas = options_ref.enable_dcs_deltas.unwrap_or(false);
134
135        Self {
136            listener: RwLock::new(None),
137            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
138            sdk_key: sdk_key.to_string(),
139            specs_url,
140            fallback_url,
141            init_timeout_ms,
142            sync_interval_duration: Duration::from_millis(u64::from(
143                options_ref
144                    .specs_sync_interval_ms
145                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
146            )),
147            ops_stats: OPS_STATS.get_for_instance(sdk_key),
148            shutdown_notify: Arc::new(Notify::new()),
149            allow_dcs_deltas: enable_dcs_deltas,
150            use_deltas_next_request: AtomicBool::new(enable_dcs_deltas),
151            background_sync_failure_count: AtomicU32::new(0),
152        }
153    }
154
155    pub fn force_shutdown(&self) {
156        self.shutdown_notify.notify_one();
157    }
158
159    pub async fn fetch_specs_from_network(
160        &self,
161        current_specs_info: SpecsInfo,
162        trigger: SpecsSyncTrigger,
163    ) -> Result<NetworkResponse, NetworkError> {
164        let request_args = self.get_request_args(&current_specs_info, trigger);
165        let url = request_args.url.clone();
166        let requested_deltas = request_args.deltas_enabled;
167        match self.handle_specs_request(request_args).await {
168            Ok(response) => Ok(NetworkResponse {
169                data: response,
170                loggable_api: get_api_from_url(&url),
171                requested_deltas,
172            }),
173            Err(e) => Err(e),
174        }
175    }
176
177    fn get_request_args(
178        &self,
179        current_specs_info: &SpecsInfo,
180        trigger: SpecsSyncTrigger,
181    ) -> RequestArgs {
182        let mut params = HashMap::new();
183
184        params.insert("supports_proto".to_string(), "true".to_string());
185        let headers = Some(HashMap::from([
186            ("statsig-supports-proto".to_string(), "true".to_string()),
187            (
188                "accept-encoding".to_string(),
189                "statsig-br, gzip, deflate, br".to_string(),
190            ),
191        ]));
192
193        if let Some(lcut) = current_specs_info.lcut {
194            if lcut > 0 {
195                params.insert("sinceTime".to_string(), lcut.to_string());
196            }
197        }
198
199        let is_init_request = trigger == SpecsSyncTrigger::Initial;
200
201        let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
202            self.init_timeout_ms
203        } else {
204            0
205        };
206
207        if let Some(cs) = &current_specs_info.checksum {
208            params.insert(
209                "checksum".to_string(),
210                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
211            );
212        }
213
214        let use_deltas_next_req = self.use_deltas_next_request.load(Ordering::SeqCst);
215        if use_deltas_next_req {
216            params.insert("accept_deltas".to_string(), "true".to_string());
217        }
218
219        RequestArgs {
220            url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
221            retries: match trigger {
222                SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
223                SpecsSyncTrigger::Background => 3,
224            },
225            query_params: Some(params),
226            deltas_enabled: use_deltas_next_req,
227            accept_gzip_response: true,
228            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
229            timeout_ms,
230            headers,
231            ..RequestArgs::new()
232        }
233    }
234
235    async fn handle_fallback_request(
236        &self,
237        mut request_args: RequestArgs,
238    ) -> Result<NetworkResponse, NetworkError> {
239        let requested_deltas = request_args.deltas_enabled;
240        let fallback_url = match &self.fallback_url {
241            Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
242            None => {
243                return Err(NetworkError::RequestFailed(
244                    request_args.url.clone(),
245                    None,
246                    "No fallback URL".to_string(),
247                ))
248            }
249        };
250
251        request_args.url = fallback_url.clone();
252
253        // TODO logging
254
255        let response = self.handle_specs_request(request_args).await?;
256        Ok(NetworkResponse {
257            data: response,
258            loggable_api: get_api_from_url(&fallback_url),
259            requested_deltas,
260        })
261    }
262
263    async fn handle_specs_request(
264        &self,
265        request_args: RequestArgs,
266    ) -> Result<ResponseData, NetworkError> {
267        let url = request_args.url.clone();
268        let response = self.network.get(request_args).await?;
269        match response.data {
270            Some(data) => Ok(data),
271            None => Err(NetworkError::RequestFailed(
272                url,
273                None,
274                response.error.unwrap_or("No data in response".to_string()),
275            )),
276        }
277    }
278
279    fn should_attempt_fallback(
280        &self,
281        trigger: SpecsSyncTrigger,
282        result: &Result<(), StatsigErr>,
283    ) -> bool {
284        if result.is_ok() || self.fallback_url.is_none() {
285            return false;
286        }
287
288        if trigger != SpecsSyncTrigger::Background {
289            return true;
290        }
291
292        let failure_count = self
293            .background_sync_failure_count
294            .fetch_add(1, Ordering::SeqCst)
295            + 1;
296
297        if failure_count.is_multiple_of(STATSIG_NETWORK_FALLBACK_THRESHOLD) {
298            return true;
299        }
300
301        log_d!(
302            TAG,
303            "Skipping fallback on background sync failure {}. Retrying fallback every {} failures.",
304            failure_count,
305            STATSIG_NETWORK_FALLBACK_THRESHOLD
306        );
307
308        false
309    }
310
311    pub async fn run_background_sync(self: Arc<Self>) {
312        let specs_info = match self
313            .listener
314            .try_read_for(std::time::Duration::from_secs(5))
315        {
316            Some(lock) => match lock.as_ref() {
317                Some(listener) => listener.get_current_specs_info(),
318                None => SpecsInfo::empty(),
319            },
320            None => SpecsInfo::error(),
321        };
322
323        self.ops_stats
324            .set_diagnostics_context(ContextType::ConfigSync);
325        if let Err(e) = self
326            .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
327            .await
328        {
329            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
330                return;
331            }
332            log_e!(TAG, "Background specs sync failed: {}", e);
333        }
334        self.ops_stats.enqueue_diagnostics_event(
335            Some(KeyType::DownloadConfigSpecs),
336            Some(ContextType::ConfigSync),
337        );
338    }
339
340    async fn manually_sync_specs(
341        &self,
342        current_specs_info: SpecsInfo,
343        trigger: SpecsSyncTrigger,
344    ) -> Result<(), StatsigErr> {
345        if let Some(lock) = self
346            .listener
347            .try_read_for(std::time::Duration::from_secs(5))
348        {
349            if lock.is_none() {
350                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
351            }
352        }
353
354        let sync_start_ms = Utc::now().timestamp_millis() as u64;
355        let mut deltas_used = self.use_deltas_next_request.load(Ordering::SeqCst);
356        let response = self
357            .fetch_specs_from_network(current_specs_info.clone(), trigger)
358            .await;
359        let (mut source_api, mut response_format, mut network_success) = match &response {
360            Ok(response) => (
361                response.loggable_api.clone(),
362                Self::get_response_format(&response.data),
363                NetworkSyncOutcome::Success,
364            ),
365            Err(_) => (
366                get_api_from_url(&construct_specs_url(
367                    self.specs_url.as_str(),
368                    self.sdk_key.as_str(),
369                )),
370                ResponseFormat::Unknown,
371                NetworkSyncOutcome::Failure,
372            ),
373        };
374        if let Ok(response) = &response {
375            deltas_used = response.requested_deltas;
376        }
377
378        let mut result = self.process_spec_data(response).await;
379
380        if self.should_attempt_fallback(trigger, &result) {
381            log_d!(TAG, "Falling back to statsig api");
382            let fallback_args = self.get_request_args(&current_specs_info, trigger);
383            deltas_used = fallback_args.deltas_enabled;
384            let response = self.handle_fallback_request(fallback_args).await;
385            match &response {
386                Ok(response) => {
387                    source_api = response.loggable_api.clone();
388                    response_format = Self::get_response_format(&response.data);
389                    network_success = NetworkSyncOutcome::Success;
390                    deltas_used = response.requested_deltas;
391                }
392                Err(_) => {
393                    // Backup request failed, so no successful network payload was returned.
394                    if let Some(fallback_url) = self.fallback_url.as_ref() {
395                        source_api = get_api_from_url(&construct_specs_url(
396                            fallback_url.as_str(),
397                            self.sdk_key.as_str(),
398                        ));
399                    }
400                    network_success = NetworkSyncOutcome::Failure;
401                }
402            }
403            result = self.process_spec_data(response).await;
404        }
405
406        let process_success = !matches!(result.as_ref(), Err(StatsigErr::NetworkError(_)));
407        self.log_config_sync_overall_latency(
408            sync_start_ms,
409            &source_api,
410            response_format.as_str(),
411            network_success.as_bool(),
412            process_success,
413            result
414                .as_ref()
415                .err()
416                .map_or_else(String::new, |e| e.to_string()),
417            deltas_used,
418        );
419
420        result
421    }
422
423    // --------- START - Observability helpers ---------
424    fn get_response_format(response_data: &ResponseData) -> ResponseFormat {
425        if Self::is_response_protobuf(response_data) {
426            return ResponseFormat::Protobuf;
427        }
428
429        let content_type = match response_data.get_header_ref("content-type") {
430            Some(content_type) => content_type.to_ascii_lowercase(),
431            None => return ResponseFormat::Unknown,
432        };
433
434        if content_type.contains("application/json") || content_type.contains("+json") {
435            return ResponseFormat::Json;
436        }
437
438        if content_type.contains("text/plain") {
439            return ResponseFormat::PlainText;
440        }
441
442        ResponseFormat::Unknown
443    }
444
445    fn is_response_protobuf(response_data: &ResponseData) -> bool {
446        let content_type = response_data.get_header_ref("content-type");
447        if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
448            return false;
449        }
450
451        let content_encoding = response_data.get_header_ref("content-encoding");
452        content_encoding.map(|s| s.as_str().contains("statsig-br")) == Some(true)
453    }
454
455    #[allow(clippy::too_many_arguments)]
456    fn log_config_sync_overall_latency(
457        &self,
458        sync_start_ms: u64,
459        source_api: &str,
460        response_format: &str,
461        network_success: bool,
462        process_success: bool,
463        error: String,
464        deltas_used: bool,
465    ) {
466        let latency_ms =
467            (Utc::now().timestamp_millis() as u64).saturating_sub(sync_start_ms) as f64;
468        self.ops_stats.log(ObservabilityEvent::new_event(
469            MetricType::Dist,
470            CONFIG_SYNC_OVERALL_LATENCY_METRIC.to_string(),
471            latency_ms,
472            Some(HashMap::from([
473                (
474                    CONFIG_SYNC_OVERALL_SOURCE_API_TAG.to_string(),
475                    get_api_from_url(source_api),
476                ),
477                (
478                    CONFIG_SYNC_OVERALL_FORMAT_TAG.to_string(),
479                    response_format.to_string(),
480                ),
481                (CONFIG_SYNC_OVERALL_ERROR_TAG.to_string(), error),
482                (
483                    CONFIG_SYNC_OVERALL_NETWORK_SUCCESS_TAG.to_string(),
484                    network_success.to_string(),
485                ),
486                (
487                    CONFIG_SYNC_OVERALL_PROCESS_SUCCESS_TAG.to_string(),
488                    process_success.to_string(),
489                ),
490                (
491                    CONFIG_SYNC_OVERALL_DELTAS_USED_TAG.to_string(),
492                    deltas_used.to_string(),
493                ),
494            ])),
495        ));
496    }
497    // --------- END - Observability helpers ---------
498
499    async fn process_spec_data(
500        &self,
501        response: Result<NetworkResponse, NetworkError>,
502    ) -> Result<(), StatsigErr> {
503        let resp = response.map_err(StatsigErr::NetworkError)?;
504        let requested_deltas = resp.requested_deltas;
505
506        let update = SpecsUpdate {
507            data: resp.data,
508            source: SpecsSource::Network,
509            received_at: Utc::now().timestamp_millis() as u64,
510            source_api: Some(resp.loggable_api),
511        };
512
513        self.ops_stats.add_marker(
514            Marker::new(
515                KeyType::DownloadConfigSpecs,
516                ActionType::Start,
517                Some(StepType::Process),
518            ),
519            None,
520        );
521
522        let result = match self
523            .listener
524            .try_read_for(std::time::Duration::from_secs(5))
525        {
526            Some(lock) => match lock.as_ref() {
527                Some(listener) => listener.did_receive_specs_update(update),
528                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
529            },
530            None => {
531                let err =
532                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
533                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
534                Err(err)
535            }
536        };
537
538        if matches!(&result, Err(StatsigErr::ChecksumFailure(_))) {
539            let was_deltas_used = self.use_deltas_next_request.swap(false, Ordering::SeqCst);
540            if was_deltas_used {
541                log_d!(TAG, "Disabling delta requests after checksum failure");
542            }
543        } else if result.is_ok() && !requested_deltas && self.allow_dcs_deltas {
544            let was_deltas_used = self.use_deltas_next_request.swap(true, Ordering::SeqCst);
545            if !was_deltas_used {
546                log_d!(
547                    TAG,
548                    "Re-enabling delta requests after successful non-delta specs update"
549                );
550            }
551        }
552
553        self.ops_stats.add_marker(
554            Marker::new(
555                KeyType::DownloadConfigSpecs,
556                ActionType::End,
557                Some(StepType::Process),
558            )
559            .with_is_success(result.is_ok()),
560            None,
561        );
562
563        result
564    }
565}
566
567#[async_trait]
568impl SpecsAdapter for StatsigHttpSpecsAdapter {
569    async fn start(
570        self: Arc<Self>,
571        _statsig_runtime: &Arc<StatsigRuntime>,
572    ) -> Result<(), StatsigErr> {
573        let specs_info = match self
574            .listener
575            .try_read_for(std::time::Duration::from_secs(5))
576        {
577            Some(lock) => match lock.as_ref() {
578                Some(listener) => listener.get_current_specs_info(),
579                None => SpecsInfo::empty(),
580            },
581            None => SpecsInfo::error(),
582        };
583        self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
584            .await
585    }
586
587    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
588        match self
589            .listener
590            .try_write_for(std::time::Duration::from_secs(5))
591        {
592            Some(mut lock) => *lock = Some(listener),
593            None => {
594                log_e!(TAG, "Failed to acquire write lock on listener");
595            }
596        }
597    }
598
599    async fn schedule_background_sync(
600        self: Arc<Self>,
601        statsig_runtime: &Arc<StatsigRuntime>,
602    ) -> Result<(), StatsigErr> {
603        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
604        let interval_duration = self.sync_interval_duration;
605        let shutdown_notify = self.shutdown_notify.clone();
606
607        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
608            loop {
609                tokio::select! {
610                    () = sleep(interval_duration) => {
611                        if let Some(strong_self) = weak_self.upgrade() {
612                            Self::run_background_sync(strong_self).await;
613                        } else {
614                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
615                            break;
616                        }
617                    }
618                    () = rt_shutdown_notify.notified() => {
619                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
620                        break;
621                    },
622                    () = shutdown_notify.notified() => {
623                        log_d!(TAG, "Shutting down specs background sync");
624                        break;
625                    }
626                }
627            }
628        })?;
629
630        Ok(())
631    }
632
633    async fn shutdown(
634        &self,
635        _timeout: Duration,
636        _statsig_runtime: &Arc<StatsigRuntime>,
637    ) -> Result<(), StatsigErr> {
638        self.shutdown_notify.notify_one();
639        Ok(())
640    }
641
642    fn get_type_name(&self) -> String {
643        stringify!(StatsigHttpSpecsAdapter).to_string()
644    }
645}
646
/// Builds the full specs download URL, `<spec_url>/<sdk_key>.json`.
///
/// Trailing slashes on `spec_url` are dropped first, so a configured base
/// such as `https://host/v2/download_config_specs/` does not produce a
/// `//` before the file name.
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
651
/// What caused a specs sync. It selects the request timeout and retry count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpecsSyncTrigger {
    /// First sync, issued from `start`; the only trigger that applies the init timeout.
    Initial,
    /// Periodic sync from the background task; the only trigger that retries.
    Background,
    /// Caller-requested sync; no timeout and no retries.
    Manual,
}
658
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{networking::ResponseData, specs_adapter::SpecsUpdate, StatsigOptions};
    use std::collections::HashMap;
    use std::sync::atomic::AtomicUsize;

    /// Listener whose every update fails with a checksum error.
    struct ChecksumFailingListener;

    impl SpecsUpdateListener for ChecksumFailingListener {
        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
            Err(StatsigErr::ChecksumFailure(
                "simulated checksum failure".to_string(),
            ))
        }

        fn get_current_specs_info(&self) -> SpecsInfo {
            SpecsInfo::empty()
        }
    }

    /// Listener that fails the first update with a checksum error and accepts the rest.
    struct ChecksumFailingThenSuccessListener {
        calls: AtomicUsize,
    }

    impl SpecsUpdateListener for ChecksumFailingThenSuccessListener {
        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
            match self.calls.fetch_add(1, Ordering::SeqCst) {
                0 => Err(StatsigErr::ChecksumFailure(
                    "simulated checksum failure".to_string(),
                )),
                _ => Ok(()),
            }
        }

        fn get_current_specs_info(&self) -> SpecsInfo {
            SpecsInfo::empty()
        }
    }

    /// Adapter with delta requests enabled, pointed at a non-default URL.
    fn delta_enabled_adapter() -> StatsigHttpSpecsAdapter {
        let options = StatsigOptions {
            enable_dcs_deltas: Some(true),
            ..StatsigOptions::default()
        };
        StatsigHttpSpecsAdapter::new(
            "secret-key",
            Some(&options),
            Some("https://example.com/v2/download_config_specs".to_string()),
        )
    }

    /// Value of the `accept_deltas` query parameter on the next manual request, if present.
    fn accept_deltas_param(adapter: &StatsigHttpSpecsAdapter) -> Option<String> {
        adapter
            .get_request_args(&SpecsInfo::empty(), SpecsSyncTrigger::Manual)
            .query_params
            .and_then(|mut params| params.remove("accept_deltas"))
    }

    /// Empty payload flagged as coming from a delta or non-delta request.
    fn empty_response(requested_deltas: bool) -> NetworkResponse {
        NetworkResponse {
            data: ResponseData::from_bytes(vec![]),
            loggable_api: "test-api".to_string(),
            requested_deltas,
        }
    }

    /// Format detected for an empty payload carrying the given headers.
    fn format_for_headers(pairs: &[(&str, &str)]) -> ResponseFormat {
        let headers: HashMap<String, String> = pairs
            .iter()
            .map(|(name, value)| (name.to_string(), value.to_string()))
            .collect();
        let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
        StatsigHttpSpecsAdapter::get_response_format(&data)
    }

    #[tokio::test]
    async fn test_disable_accept_deltas_after_checksum_failure() {
        let adapter = delta_enabled_adapter();
        assert_eq!(accept_deltas_param(&adapter).as_deref(), Some("true"));

        adapter.initialize(Arc::new(ChecksumFailingListener));
        let result = adapter.process_spec_data(Ok(empty_response(true))).await;

        assert!(matches!(result, Err(StatsigErr::ChecksumFailure(_))));
        assert!(accept_deltas_param(&adapter).is_none());
    }

    #[tokio::test]
    async fn test_reenable_accept_deltas_after_successful_non_delta_update() {
        let adapter = delta_enabled_adapter();
        adapter.initialize(Arc::new(ChecksumFailingThenSuccessListener {
            calls: AtomicUsize::new(0),
        }));

        let first_result = adapter.process_spec_data(Ok(empty_response(true))).await;
        assert!(matches!(first_result, Err(StatsigErr::ChecksumFailure(_))));
        assert!(accept_deltas_param(&adapter).is_none());

        let second_result = adapter.process_spec_data(Ok(empty_response(false))).await;
        assert!(second_result.is_ok());
        assert_eq!(accept_deltas_param(&adapter).as_deref(), Some("true"));
    }

    #[test]
    fn test_get_response_format_json() {
        let format = format_for_headers(&[("content-type", "application/json")]);
        assert!(matches!(format, ResponseFormat::Json));
    }

    #[test]
    fn test_get_response_format_plain_text() {
        let format = format_for_headers(&[("content-type", "text/plain; charset=utf-8")]);
        assert!(matches!(format, ResponseFormat::PlainText));
    }

    #[test]
    fn test_get_response_format_protobuf() {
        let format = format_for_headers(&[
            ("content-type", "application/octet-stream"),
            ("content-encoding", "statsig-br"),
        ]);
        assert!(matches!(format, ResponseFormat::Protobuf));
    }

    #[test]
    fn test_get_response_format_unknown_without_content_type() {
        let data = ResponseData::from_bytes(vec![]);
        assert!(matches!(
            StatsigHttpSpecsAdapter::get_response_format(&data),
            ResponseFormat::Unknown
        ));
    }
}