Skip to main content

statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
3use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
4use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
5use crate::sdk_diagnostics::diagnostics::ContextType;
6use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
7use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
8use crate::statsig_err::StatsigErr;
9use crate::statsig_metadata::StatsigMetadata;
10use crate::utils::get_api_from_url;
11use crate::DEFAULT_INIT_TIMEOUT_MS;
12use crate::{
13    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
14};
15use async_trait::async_trait;
16use chrono::Utc;
17use parking_lot::RwLock;
18use percent_encoding::percent_encode;
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::Duration;
23use tokio::sync::Notify;
24use tokio::time::sleep;
25
26use super::SpecsInfo;
27
28pub struct NetworkResponse {
29    pub data: ResponseData,
30    pub loggable_api: String,
31    pub requested_deltas: bool,
32}
33
34pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
35pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
36
37#[allow(unused)]
38pub const INIT_DICT_ID: &str = "null";
39
40const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
41const CONFIG_SYNC_OVERALL_LATENCY_METRIC: &str = "config_sync_overall.latency";
42const CONFIG_SYNC_OVERALL_FORMAT_TAG: &str = "format";
43const CONFIG_SYNC_OVERALL_SOURCE_API_TAG: &str = "source_api";
44const CONFIG_SYNC_OVERALL_ERROR_TAG: &str = "error";
45const CONFIG_SYNC_OVERALL_NETWORK_SUCCESS_TAG: &str = "network_success";
46const CONFIG_SYNC_OVERALL_PROCESS_SUCCESS_TAG: &str = "process_success";
47
48pub struct StatsigHttpSpecsAdapter {
49    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
50    network: NetworkClient,
51    sdk_key: String,
52    specs_url: String,
53    fallback_url: Option<String>,
54    init_timeout_ms: u64,
55    sync_interval_duration: Duration,
56    ops_stats: Arc<OpsStatsForInstance>,
57    shutdown_notify: Arc<Notify>,
58    allow_dcs_deltas: bool,
59    use_deltas_next_request: AtomicBool,
60}
61
62// OB client -- START
63// These types are only for the config_sync_overall.latency observability metric added in this change.
64enum ResponseFormat {
65    Json,
66    PlainText,
67    Protobuf,
68    Unknown,
69}
70
71enum NetworkSyncOutcome {
72    Success,
73    Failure,
74}
75
76impl NetworkSyncOutcome {
77    fn as_bool(&self) -> bool {
78        matches!(self, Self::Success)
79    }
80}
81
82impl ResponseFormat {
83    fn as_str(&self) -> &str {
84        match self {
85            ResponseFormat::Json => "json",
86            ResponseFormat::PlainText => "plain_text",
87            ResponseFormat::Protobuf => "protobuf",
88            ResponseFormat::Unknown => "unknown",
89        }
90    }
91}
92// OB client -- END
93
94impl StatsigHttpSpecsAdapter {
95    #[must_use]
96    pub fn new(
97        sdk_key: &str,
98        options: Option<&StatsigOptions>,
99        override_url: Option<String>,
100    ) -> Self {
101        let default_options = StatsigOptions::default();
102        let options_ref = options.unwrap_or(&default_options);
103
104        let init_timeout_ms = options_ref
105            .init_timeout_ms
106            .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
107
108        let specs_url = match override_url {
109            Some(url) => url,
110            None => options_ref
111                .specs_url
112                .as_ref()
113                .map(|u| u.to_string())
114                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
115        };
116
117        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
118        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
119            && specs_url != DEFAULT_SPECS_URL
120        {
121            Some(DEFAULT_SPECS_URL.to_string())
122        } else {
123            None
124        };
125
126        let headers = StatsigMetadata::get_constant_request_headers(
127            sdk_key,
128            options_ref.service_name.as_deref(),
129        );
130        let enable_dcs_deltas = options_ref.enable_dcs_deltas.unwrap_or(false);
131
132        Self {
133            listener: RwLock::new(None),
134            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
135            sdk_key: sdk_key.to_string(),
136            specs_url,
137            fallback_url,
138            init_timeout_ms,
139            sync_interval_duration: Duration::from_millis(u64::from(
140                options_ref
141                    .specs_sync_interval_ms
142                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
143            )),
144            ops_stats: OPS_STATS.get_for_instance(sdk_key),
145            shutdown_notify: Arc::new(Notify::new()),
146            allow_dcs_deltas: enable_dcs_deltas,
147            use_deltas_next_request: AtomicBool::new(enable_dcs_deltas),
148        }
149    }
150
151    pub fn force_shutdown(&self) {
152        self.shutdown_notify.notify_one();
153    }
154
155    pub async fn fetch_specs_from_network(
156        &self,
157        current_specs_info: SpecsInfo,
158        trigger: SpecsSyncTrigger,
159    ) -> Result<NetworkResponse, NetworkError> {
160        let request_args = self.get_request_args(&current_specs_info, trigger);
161        let url = request_args.url.clone();
162        let requested_deltas = request_args.deltas_enabled;
163        match self.handle_specs_request(request_args).await {
164            Ok(response) => Ok(NetworkResponse {
165                data: response,
166                loggable_api: get_api_from_url(&url),
167                requested_deltas,
168            }),
169            Err(e) => Err(e),
170        }
171    }
172
173    fn get_request_args(
174        &self,
175        current_specs_info: &SpecsInfo,
176        trigger: SpecsSyncTrigger,
177    ) -> RequestArgs {
178        let mut params = HashMap::new();
179
180        params.insert("supports_proto".to_string(), "true".to_string());
181        let headers = Some(HashMap::from([
182            ("statsig-supports-proto".to_string(), "true".to_string()),
183            (
184                "accept-encoding".to_string(),
185                "statsig-br, gzip, deflate, br".to_string(),
186            ),
187        ]));
188
189        if let Some(lcut) = current_specs_info.lcut {
190            if lcut > 0 {
191                params.insert("sinceTime".to_string(), lcut.to_string());
192            }
193        }
194
195        let is_init_request = trigger == SpecsSyncTrigger::Initial;
196
197        let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
198            self.init_timeout_ms
199        } else {
200            0
201        };
202
203        if let Some(cs) = &current_specs_info.checksum {
204            params.insert(
205                "checksum".to_string(),
206                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
207            );
208        }
209
210        let use_deltas_next_req = self.use_deltas_next_request.load(Ordering::SeqCst);
211        if use_deltas_next_req {
212            params.insert("accept_deltas".to_string(), "true".to_string());
213        }
214
215        RequestArgs {
216            url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
217            retries: match trigger {
218                SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
219                SpecsSyncTrigger::Background => 3,
220            },
221            query_params: Some(params),
222            deltas_enabled: use_deltas_next_req,
223            accept_gzip_response: true,
224            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
225            timeout_ms,
226            headers,
227            ..RequestArgs::new()
228        }
229    }
230
231    async fn handle_fallback_request(
232        &self,
233        mut request_args: RequestArgs,
234    ) -> Result<NetworkResponse, NetworkError> {
235        let requested_deltas = request_args.deltas_enabled;
236        let fallback_url = match &self.fallback_url {
237            Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
238            None => {
239                return Err(NetworkError::RequestFailed(
240                    request_args.url.clone(),
241                    None,
242                    "No fallback URL".to_string(),
243                ))
244            }
245        };
246
247        request_args.url = fallback_url.clone();
248
249        // TODO logging
250
251        let response = self.handle_specs_request(request_args).await?;
252        Ok(NetworkResponse {
253            data: response,
254            loggable_api: get_api_from_url(&fallback_url),
255            requested_deltas,
256        })
257    }
258
259    async fn handle_specs_request(
260        &self,
261        request_args: RequestArgs,
262    ) -> Result<ResponseData, NetworkError> {
263        let url = request_args.url.clone();
264        let response = self.network.get(request_args).await?;
265        match response.data {
266            Some(data) => Ok(data),
267            None => Err(NetworkError::RequestFailed(
268                url,
269                None,
270                response.error.unwrap_or("No data in response".to_string()),
271            )),
272        }
273    }
274
275    pub async fn run_background_sync(self: Arc<Self>) {
276        let specs_info = match self
277            .listener
278            .try_read_for(std::time::Duration::from_secs(5))
279        {
280            Some(lock) => match lock.as_ref() {
281                Some(listener) => listener.get_current_specs_info(),
282                None => SpecsInfo::empty(),
283            },
284            None => SpecsInfo::error(),
285        };
286
287        self.ops_stats
288            .set_diagnostics_context(ContextType::ConfigSync);
289        if let Err(e) = self
290            .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
291            .await
292        {
293            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
294                return;
295            }
296            log_e!(TAG, "Background specs sync failed: {}", e);
297        }
298        self.ops_stats.enqueue_diagnostics_event(
299            Some(KeyType::DownloadConfigSpecs),
300            Some(ContextType::ConfigSync),
301        );
302    }
303
304    async fn manually_sync_specs(
305        &self,
306        current_specs_info: SpecsInfo,
307        trigger: SpecsSyncTrigger,
308    ) -> Result<(), StatsigErr> {
309        if let Some(lock) = self
310            .listener
311            .try_read_for(std::time::Duration::from_secs(5))
312        {
313            if lock.is_none() {
314                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
315            }
316        }
317
318        let sync_start_ms = Utc::now().timestamp_millis() as u64;
319        let response = self
320            .fetch_specs_from_network(current_specs_info.clone(), trigger)
321            .await;
322        let (mut source_api, mut response_format, mut network_success) = match &response {
323            Ok(response) => (
324                response.loggable_api.clone(),
325                Self::get_response_format(&response.data),
326                NetworkSyncOutcome::Success,
327            ),
328            Err(_) => (
329                get_api_from_url(&construct_specs_url(
330                    self.specs_url.as_str(),
331                    self.sdk_key.as_str(),
332                )),
333                ResponseFormat::Unknown,
334                NetworkSyncOutcome::Failure,
335            ),
336        };
337
338        let mut result = self.process_spec_data(response).await;
339
340        if result.is_err() && self.fallback_url.is_some() {
341            log_d!(TAG, "Falling back to statsig api");
342            let response = self
343                .handle_fallback_request(self.get_request_args(&current_specs_info, trigger))
344                .await;
345            match &response {
346                Ok(response) => {
347                    source_api = response.loggable_api.clone();
348                    response_format = Self::get_response_format(&response.data);
349                    network_success = NetworkSyncOutcome::Success;
350                }
351                Err(_) => {
352                    // Backup request failed, so no successful network payload was returned.
353                    if let Some(fallback_url) = self.fallback_url.as_ref() {
354                        source_api = get_api_from_url(&construct_specs_url(
355                            fallback_url.as_str(),
356                            self.sdk_key.as_str(),
357                        ));
358                    }
359                    network_success = NetworkSyncOutcome::Failure;
360                }
361            }
362            result = self.process_spec_data(response).await;
363        }
364
365        let process_success = !matches!(result.as_ref(), Err(StatsigErr::NetworkError(_)));
366        self.log_config_sync_overall_latency(
367            sync_start_ms,
368            &source_api,
369            response_format.as_str(),
370            network_success.as_bool(),
371            process_success,
372            result
373                .as_ref()
374                .err()
375                .map_or_else(String::new, |e| e.to_string()),
376        );
377
378        result
379    }
380
381    // --------- START - Observability helpers ---------
382    fn get_response_format(response_data: &ResponseData) -> ResponseFormat {
383        if Self::is_response_protobuf(response_data) {
384            return ResponseFormat::Protobuf;
385        }
386
387        let content_type = match response_data.get_header_ref("content-type") {
388            Some(content_type) => content_type.to_ascii_lowercase(),
389            None => return ResponseFormat::Unknown,
390        };
391
392        if content_type.contains("application/json") || content_type.contains("+json") {
393            return ResponseFormat::Json;
394        }
395
396        if content_type.contains("text/plain") {
397            return ResponseFormat::PlainText;
398        }
399
400        ResponseFormat::Unknown
401    }
402
403    fn is_response_protobuf(response_data: &ResponseData) -> bool {
404        let content_type = response_data.get_header_ref("content-type");
405        if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
406            return false;
407        }
408
409        let content_encoding = response_data.get_header_ref("content-encoding");
410        content_encoding.map(|s| s.as_str().contains("statsig-br")) == Some(true)
411    }
412
413    fn log_config_sync_overall_latency(
414        &self,
415        sync_start_ms: u64,
416        source_api: &str,
417        response_format: &str,
418        network_success: bool,
419        process_success: bool,
420        error: String,
421    ) {
422        let latency_ms =
423            (Utc::now().timestamp_millis() as u64).saturating_sub(sync_start_ms) as f64;
424        self.ops_stats.log(ObservabilityEvent::new_event(
425            MetricType::Dist,
426            CONFIG_SYNC_OVERALL_LATENCY_METRIC.to_string(),
427            latency_ms,
428            Some(HashMap::from([
429                (
430                    CONFIG_SYNC_OVERALL_SOURCE_API_TAG.to_string(),
431                    get_api_from_url(source_api),
432                ),
433                (
434                    CONFIG_SYNC_OVERALL_FORMAT_TAG.to_string(),
435                    response_format.to_string(),
436                ),
437                (CONFIG_SYNC_OVERALL_ERROR_TAG.to_string(), error),
438                (
439                    CONFIG_SYNC_OVERALL_NETWORK_SUCCESS_TAG.to_string(),
440                    network_success.to_string(),
441                ),
442                (
443                    CONFIG_SYNC_OVERALL_PROCESS_SUCCESS_TAG.to_string(),
444                    process_success.to_string(),
445                ),
446            ])),
447        ));
448    }
449    // --------- END - Observability helpers ---------
450
451    async fn process_spec_data(
452        &self,
453        response: Result<NetworkResponse, NetworkError>,
454    ) -> Result<(), StatsigErr> {
455        let resp = response.map_err(StatsigErr::NetworkError)?;
456        let requested_deltas = resp.requested_deltas;
457
458        let update = SpecsUpdate {
459            data: resp.data,
460            source: SpecsSource::Network,
461            received_at: Utc::now().timestamp_millis() as u64,
462            source_api: Some(resp.loggable_api),
463        };
464
465        self.ops_stats.add_marker(
466            Marker::new(
467                KeyType::DownloadConfigSpecs,
468                ActionType::Start,
469                Some(StepType::Process),
470            ),
471            None,
472        );
473
474        let result = match self
475            .listener
476            .try_read_for(std::time::Duration::from_secs(5))
477        {
478            Some(lock) => match lock.as_ref() {
479                Some(listener) => listener.did_receive_specs_update(update),
480                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
481            },
482            None => {
483                let err =
484                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
485                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
486                Err(err)
487            }
488        };
489
490        if matches!(&result, Err(StatsigErr::ChecksumFailure(_))) {
491            let was_deltas_used = self.use_deltas_next_request.swap(false, Ordering::SeqCst);
492            if was_deltas_used {
493                log_d!(TAG, "Disabling delta requests after checksum failure");
494            }
495        } else if result.is_ok() && !requested_deltas && self.allow_dcs_deltas {
496            let was_deltas_used = self.use_deltas_next_request.swap(true, Ordering::SeqCst);
497            if !was_deltas_used {
498                log_d!(
499                    TAG,
500                    "Re-enabling delta requests after successful non-delta specs update"
501                );
502            }
503        }
504
505        self.ops_stats.add_marker(
506            Marker::new(
507                KeyType::DownloadConfigSpecs,
508                ActionType::End,
509                Some(StepType::Process),
510            )
511            .with_is_success(result.is_ok()),
512            None,
513        );
514
515        result
516    }
517}
518
519#[async_trait]
520impl SpecsAdapter for StatsigHttpSpecsAdapter {
521    async fn start(
522        self: Arc<Self>,
523        _statsig_runtime: &Arc<StatsigRuntime>,
524    ) -> Result<(), StatsigErr> {
525        let specs_info = match self
526            .listener
527            .try_read_for(std::time::Duration::from_secs(5))
528        {
529            Some(lock) => match lock.as_ref() {
530                Some(listener) => listener.get_current_specs_info(),
531                None => SpecsInfo::empty(),
532            },
533            None => SpecsInfo::error(),
534        };
535        self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
536            .await
537    }
538
539    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
540        match self
541            .listener
542            .try_write_for(std::time::Duration::from_secs(5))
543        {
544            Some(mut lock) => *lock = Some(listener),
545            None => {
546                log_e!(TAG, "Failed to acquire write lock on listener");
547            }
548        }
549    }
550
551    async fn schedule_background_sync(
552        self: Arc<Self>,
553        statsig_runtime: &Arc<StatsigRuntime>,
554    ) -> Result<(), StatsigErr> {
555        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
556        let interval_duration = self.sync_interval_duration;
557        let shutdown_notify = self.shutdown_notify.clone();
558
559        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
560            loop {
561                tokio::select! {
562                    () = sleep(interval_duration) => {
563                        if let Some(strong_self) = weak_self.upgrade() {
564                            Self::run_background_sync(strong_self).await;
565                        } else {
566                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
567                            break;
568                        }
569                    }
570                    () = rt_shutdown_notify.notified() => {
571                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
572                        break;
573                    },
574                    () = shutdown_notify.notified() => {
575                        log_d!(TAG, "Shutting down specs background sync");
576                        break;
577                    }
578                }
579            }
580        })?;
581
582        Ok(())
583    }
584
585    async fn shutdown(
586        &self,
587        _timeout: Duration,
588        _statsig_runtime: &Arc<StatsigRuntime>,
589    ) -> Result<(), StatsigErr> {
590        self.shutdown_notify.notify_one();
591        Ok(())
592    }
593
594    fn get_type_name(&self) -> String {
595        stringify!(StatsigHttpSpecsAdapter).to_string()
596    }
597}
598
599#[allow(unused)]
600fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
601    format!("{spec_url}/{sdk_key}.json")
602}
603
604#[derive(Debug, Clone, Copy, PartialEq, Eq)]
605pub enum SpecsSyncTrigger {
606    Initial,
607    Background,
608    Manual,
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use crate::{networking::ResponseData, specs_adapter::SpecsUpdate, StatsigOptions};
615    use std::collections::HashMap;
616    use std::sync::atomic::AtomicUsize;
617
618    struct ChecksumFailingListener;
619
620    impl SpecsUpdateListener for ChecksumFailingListener {
621        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
622            Err(StatsigErr::ChecksumFailure(
623                "simulated checksum failure".to_string(),
624            ))
625        }
626
627        fn get_current_specs_info(&self) -> SpecsInfo {
628            SpecsInfo::empty()
629        }
630    }
631
632    struct ChecksumFailingThenSuccessListener {
633        calls: AtomicUsize,
634    }
635
636    impl SpecsUpdateListener for ChecksumFailingThenSuccessListener {
637        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
638            let curr = self.calls.fetch_add(1, Ordering::SeqCst);
639            if curr == 0 {
640                Err(StatsigErr::ChecksumFailure(
641                    "simulated checksum failure".to_string(),
642                ))
643            } else {
644                Ok(())
645            }
646        }
647
648        fn get_current_specs_info(&self) -> SpecsInfo {
649            SpecsInfo::empty()
650        }
651    }
652
653    #[tokio::test]
654    async fn test_disable_accept_deltas_after_checksum_failure() {
655        let options = StatsigOptions {
656            enable_dcs_deltas: Some(true),
657            ..StatsigOptions::default()
658        };
659        let adapter = StatsigHttpSpecsAdapter::new(
660            "secret-key",
661            Some(&options),
662            Some("https://example.com/v2/download_config_specs".to_string()),
663        );
664        let specs_info = SpecsInfo::empty();
665
666        let request_before = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
667        assert_eq!(
668            request_before
669                .query_params
670                .as_ref()
671                .and_then(|p| p.get("accept_deltas"))
672                .map(String::as_str),
673            Some("true")
674        );
675
676        adapter.initialize(Arc::new(ChecksumFailingListener));
677        let result = adapter
678            .process_spec_data(Ok(NetworkResponse {
679                data: ResponseData::from_bytes(vec![]),
680                loggable_api: "test-api".to_string(),
681                requested_deltas: true,
682            }))
683            .await;
684
685        assert!(matches!(result, Err(StatsigErr::ChecksumFailure(_))));
686
687        let request_after = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
688        assert!(request_after
689            .query_params
690            .as_ref()
691            .is_none_or(|p| !p.contains_key("accept_deltas")));
692    }
693
694    #[tokio::test]
695    async fn test_reenable_accept_deltas_after_successful_non_delta_update() {
696        let options = StatsigOptions {
697            enable_dcs_deltas: Some(true),
698            ..StatsigOptions::default()
699        };
700        let adapter = StatsigHttpSpecsAdapter::new(
701            "secret-key",
702            Some(&options),
703            Some("https://example.com/v2/download_config_specs".to_string()),
704        );
705        let specs_info = SpecsInfo::empty();
706
707        adapter.initialize(Arc::new(ChecksumFailingThenSuccessListener {
708            calls: AtomicUsize::new(0),
709        }));
710
711        let first_result = adapter
712            .process_spec_data(Ok(NetworkResponse {
713                data: ResponseData::from_bytes(vec![]),
714                loggable_api: "test-api".to_string(),
715                requested_deltas: true,
716            }))
717            .await;
718
719        assert!(matches!(first_result, Err(StatsigErr::ChecksumFailure(_))));
720
721        let request_after_failure = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
722        assert!(request_after_failure
723            .query_params
724            .as_ref()
725            .is_none_or(|p| !p.contains_key("accept_deltas")));
726
727        let second_result = adapter
728            .process_spec_data(Ok(NetworkResponse {
729                data: ResponseData::from_bytes(vec![]),
730                loggable_api: "test-api".to_string(),
731                requested_deltas: false,
732            }))
733            .await;
734
735        assert!(second_result.is_ok());
736
737        let request_after_success = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
738        assert_eq!(
739            request_after_success
740                .query_params
741                .as_ref()
742                .and_then(|p| p.get("accept_deltas"))
743                .map(String::as_str),
744            Some("true")
745        );
746    }
747
748    #[test]
749    fn test_get_response_format_json() {
750        let mut headers = HashMap::new();
751        headers.insert("content-type".to_string(), "application/json".to_string());
752        let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
753        assert!(matches!(
754            StatsigHttpSpecsAdapter::get_response_format(&data),
755            ResponseFormat::Json
756        ));
757    }
758
759    #[test]
760    fn test_get_response_format_plain_text() {
761        let mut headers = HashMap::new();
762        headers.insert(
763            "content-type".to_string(),
764            "text/plain; charset=utf-8".to_string(),
765        );
766        let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
767        assert!(matches!(
768            StatsigHttpSpecsAdapter::get_response_format(&data),
769            ResponseFormat::PlainText
770        ));
771    }
772
773    #[test]
774    fn test_get_response_format_protobuf() {
775        let mut headers = HashMap::new();
776        headers.insert(
777            "content-type".to_string(),
778            "application/octet-stream".to_string(),
779        );
780        headers.insert("content-encoding".to_string(), "statsig-br".to_string());
781        let data = ResponseData::from_bytes_with_headers(vec![], Some(headers));
782        assert!(matches!(
783            StatsigHttpSpecsAdapter::get_response_format(&data),
784            ResponseFormat::Protobuf
785        ));
786    }
787
788    #[test]
789    fn test_get_response_format_unknown_without_content_type() {
790        let data = ResponseData::from_bytes(vec![]);
791        assert!(matches!(
792            StatsigHttpSpecsAdapter::get_response_format(&data),
793            ResponseFormat::Unknown
794        ));
795    }
796}