Skip to main content

statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::Duration;
22use tokio::sync::Notify;
23use tokio::time::sleep;
24
25use super::SpecsInfo;
26
27pub struct NetworkResponse {
28    pub data: ResponseData,
29    pub api: String,
30    pub requested_deltas: bool,
31}
32
/// Specs endpoint used when neither an override URL nor `StatsigOptions::specs_url` is given.
/// It is also the target of the optional fallback request.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Background sync interval, in milliseconds, used when
/// `StatsigOptions::specs_sync_interval_ms` is unset.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

// Not referenced anywhere in this file, hence the lint allowance.
// NOTE(review): presumably consumed by other modules — confirm before removing.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag attached to log lines emitted by this adapter.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
40pub struct StatsigHttpSpecsAdapter {
41    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
42    network: NetworkClient,
43    sdk_key: String,
44    specs_url: String,
45    fallback_url: Option<String>,
46    init_timeout_ms: u64,
47    sync_interval_duration: Duration,
48    ops_stats: Arc<OpsStatsForInstance>,
49    shutdown_notify: Arc<Notify>,
50    allow_dcs_deltas: bool,
51    use_deltas_next_request: AtomicBool,
52}
53
54impl StatsigHttpSpecsAdapter {
55    #[must_use]
56    pub fn new(
57        sdk_key: &str,
58        options: Option<&StatsigOptions>,
59        override_url: Option<String>,
60    ) -> Self {
61        let default_options = StatsigOptions::default();
62        let options_ref = options.unwrap_or(&default_options);
63
64        let init_timeout_ms = options_ref
65            .init_timeout_ms
66            .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
67
68        let specs_url = match override_url {
69            Some(url) => url,
70            None => options_ref
71                .specs_url
72                .as_ref()
73                .map(|u| u.to_string())
74                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
75        };
76
77        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
78        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
79            && specs_url != DEFAULT_SPECS_URL
80        {
81            Some(DEFAULT_SPECS_URL.to_string())
82        } else {
83            None
84        };
85
86        let headers = StatsigMetadata::get_constant_request_headers(
87            sdk_key,
88            options_ref.service_name.as_deref(),
89        );
90        let enable_dcs_deltas = options_ref.enable_dcs_deltas.unwrap_or(false);
91
92        Self {
93            listener: RwLock::new(None),
94            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
95            sdk_key: sdk_key.to_string(),
96            specs_url,
97            fallback_url,
98            init_timeout_ms,
99            sync_interval_duration: Duration::from_millis(u64::from(
100                options_ref
101                    .specs_sync_interval_ms
102                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
103            )),
104            ops_stats: OPS_STATS.get_for_instance(sdk_key),
105            shutdown_notify: Arc::new(Notify::new()),
106            allow_dcs_deltas: enable_dcs_deltas,
107            use_deltas_next_request: AtomicBool::new(enable_dcs_deltas),
108        }
109    }
110
111    pub fn force_shutdown(&self) {
112        self.shutdown_notify.notify_one();
113    }
114
115    pub async fn fetch_specs_from_network(
116        &self,
117        current_specs_info: SpecsInfo,
118        trigger: SpecsSyncTrigger,
119    ) -> Result<NetworkResponse, NetworkError> {
120        let request_args = self.get_request_args(&current_specs_info, trigger);
121        let url = request_args.url.clone();
122        let requested_deltas = request_args.deltas_enabled;
123        match self.handle_specs_request(request_args).await {
124            Ok(response) => Ok(NetworkResponse {
125                data: response,
126                api: get_api_from_url(&url),
127                requested_deltas,
128            }),
129            Err(e) => Err(e),
130        }
131    }
132
133    fn get_request_args(
134        &self,
135        current_specs_info: &SpecsInfo,
136        trigger: SpecsSyncTrigger,
137    ) -> RequestArgs {
138        let mut params = HashMap::new();
139
140        params.insert("supports_proto".to_string(), "true".to_string());
141        let headers = Some(HashMap::from([
142            ("statsig-supports-proto".to_string(), "true".to_string()),
143            (
144                "accept-encoding".to_string(),
145                "statsig-br, gzip, deflate, br".to_string(),
146            ),
147        ]));
148
149        if let Some(lcut) = current_specs_info.lcut {
150            if lcut > 0 {
151                params.insert("sinceTime".to_string(), lcut.to_string());
152            }
153        }
154
155        let is_init_request = trigger == SpecsSyncTrigger::Initial;
156
157        let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
158            self.init_timeout_ms
159        } else {
160            0
161        };
162
163        if let Some(cs) = &current_specs_info.checksum {
164            params.insert(
165                "checksum".to_string(),
166                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
167            );
168        }
169
170        let use_deltas_next_req = self.use_deltas_next_request.load(Ordering::SeqCst);
171        if use_deltas_next_req {
172            params.insert("accept_deltas".to_string(), "true".to_string());
173        }
174
175        RequestArgs {
176            url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
177            retries: match trigger {
178                SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
179                SpecsSyncTrigger::Background => 3,
180            },
181            query_params: Some(params),
182            deltas_enabled: use_deltas_next_req,
183            accept_gzip_response: true,
184            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
185            timeout_ms,
186            headers,
187            ..RequestArgs::new()
188        }
189    }
190
191    async fn handle_fallback_request(
192        &self,
193        mut request_args: RequestArgs,
194    ) -> Result<NetworkResponse, NetworkError> {
195        let requested_deltas = request_args.deltas_enabled;
196        let fallback_url = match &self.fallback_url {
197            Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
198            None => {
199                return Err(NetworkError::RequestFailed(
200                    request_args.url.clone(),
201                    None,
202                    "No fallback URL".to_string(),
203                ))
204            }
205        };
206
207        request_args.url = fallback_url.clone();
208
209        // TODO logging
210
211        let response = self.handle_specs_request(request_args).await?;
212        Ok(NetworkResponse {
213            data: response,
214            api: get_api_from_url(&fallback_url),
215            requested_deltas,
216        })
217    }
218
219    async fn handle_specs_request(
220        &self,
221        request_args: RequestArgs,
222    ) -> Result<ResponseData, NetworkError> {
223        let url = request_args.url.clone();
224        let response = self.network.get(request_args).await?;
225        match response.data {
226            Some(data) => Ok(data),
227            None => Err(NetworkError::RequestFailed(
228                url,
229                None,
230                response.error.unwrap_or("No data in response".to_string()),
231            )),
232        }
233    }
234
235    pub async fn run_background_sync(self: Arc<Self>) {
236        let specs_info = match self
237            .listener
238            .try_read_for(std::time::Duration::from_secs(5))
239        {
240            Some(lock) => match lock.as_ref() {
241                Some(listener) => listener.get_current_specs_info(),
242                None => SpecsInfo::empty(),
243            },
244            None => SpecsInfo::error(),
245        };
246
247        self.ops_stats
248            .set_diagnostics_context(ContextType::ConfigSync);
249        if let Err(e) = self
250            .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
251            .await
252        {
253            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
254                return;
255            }
256            log_e!(TAG, "Background specs sync failed: {}", e);
257        }
258        self.ops_stats.enqueue_diagnostics_event(
259            Some(KeyType::DownloadConfigSpecs),
260            Some(ContextType::ConfigSync),
261        );
262    }
263
264    async fn manually_sync_specs(
265        &self,
266        current_specs_info: SpecsInfo,
267        trigger: SpecsSyncTrigger,
268    ) -> Result<(), StatsigErr> {
269        if let Some(lock) = self
270            .listener
271            .try_read_for(std::time::Duration::from_secs(5))
272        {
273            if lock.is_none() {
274                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
275            }
276        }
277
278        let response = self
279            .fetch_specs_from_network(current_specs_info.clone(), trigger)
280            .await;
281        let result = self.process_spec_data(response).await;
282
283        if result.is_err() && self.fallback_url.is_some() {
284            log_d!(TAG, "Falling back to statsig api");
285            let response = self
286                .handle_fallback_request(self.get_request_args(&current_specs_info, trigger))
287                .await;
288            return self.process_spec_data(response).await;
289        }
290
291        result
292    }
293
294    async fn process_spec_data(
295        &self,
296        response: Result<NetworkResponse, NetworkError>,
297    ) -> Result<(), StatsigErr> {
298        let resp = response.map_err(StatsigErr::NetworkError)?;
299        let requested_deltas = resp.requested_deltas;
300
301        let update = SpecsUpdate {
302            data: resp.data,
303            source: SpecsSource::Network,
304            received_at: Utc::now().timestamp_millis() as u64,
305            source_api: Some(resp.api),
306        };
307
308        self.ops_stats.add_marker(
309            Marker::new(
310                KeyType::DownloadConfigSpecs,
311                ActionType::Start,
312                Some(StepType::Process),
313            ),
314            None,
315        );
316
317        let result = match self
318            .listener
319            .try_read_for(std::time::Duration::from_secs(5))
320        {
321            Some(lock) => match lock.as_ref() {
322                Some(listener) => listener.did_receive_specs_update(update),
323                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
324            },
325            None => {
326                let err =
327                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
328                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
329                Err(err)
330            }
331        };
332
333        if matches!(&result, Err(StatsigErr::ChecksumFailure(_))) {
334            let was_deltas_used = self.use_deltas_next_request.swap(false, Ordering::SeqCst);
335            if was_deltas_used {
336                log_d!(TAG, "Disabling delta requests after checksum failure");
337            }
338        } else if result.is_ok() && !requested_deltas && self.allow_dcs_deltas {
339            let was_deltas_used = self.use_deltas_next_request.swap(true, Ordering::SeqCst);
340            if !was_deltas_used {
341                log_d!(
342                    TAG,
343                    "Re-enabling delta requests after successful non-delta specs update"
344                );
345            }
346        }
347
348        self.ops_stats.add_marker(
349            Marker::new(
350                KeyType::DownloadConfigSpecs,
351                ActionType::End,
352                Some(StepType::Process),
353            )
354            .with_is_success(result.is_ok()),
355            None,
356        );
357
358        result
359    }
360}
361
362#[async_trait]
363impl SpecsAdapter for StatsigHttpSpecsAdapter {
364    async fn start(
365        self: Arc<Self>,
366        _statsig_runtime: &Arc<StatsigRuntime>,
367    ) -> Result<(), StatsigErr> {
368        let specs_info = match self
369            .listener
370            .try_read_for(std::time::Duration::from_secs(5))
371        {
372            Some(lock) => match lock.as_ref() {
373                Some(listener) => listener.get_current_specs_info(),
374                None => SpecsInfo::empty(),
375            },
376            None => SpecsInfo::error(),
377        };
378        self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
379            .await
380    }
381
382    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
383        match self
384            .listener
385            .try_write_for(std::time::Duration::from_secs(5))
386        {
387            Some(mut lock) => *lock = Some(listener),
388            None => {
389                log_e!(TAG, "Failed to acquire write lock on listener");
390            }
391        }
392    }
393
394    async fn schedule_background_sync(
395        self: Arc<Self>,
396        statsig_runtime: &Arc<StatsigRuntime>,
397    ) -> Result<(), StatsigErr> {
398        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
399        let interval_duration = self.sync_interval_duration;
400        let shutdown_notify = self.shutdown_notify.clone();
401
402        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
403            loop {
404                tokio::select! {
405                    () = sleep(interval_duration) => {
406                        if let Some(strong_self) = weak_self.upgrade() {
407                            Self::run_background_sync(strong_self).await;
408                        } else {
409                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
410                            break;
411                        }
412                    }
413                    () = rt_shutdown_notify.notified() => {
414                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
415                        break;
416                    },
417                    () = shutdown_notify.notified() => {
418                        log_d!(TAG, "Shutting down specs background sync");
419                        break;
420                    }
421                }
422            }
423        })?;
424
425        Ok(())
426    }
427
428    async fn shutdown(
429        &self,
430        _timeout: Duration,
431        _statsig_runtime: &Arc<StatsigRuntime>,
432    ) -> Result<(), StatsigErr> {
433        self.shutdown_notify.notify_one();
434        Ok(())
435    }
436
437    fn get_type_name(&self) -> String {
438        stringify!(StatsigHttpSpecsAdapter).to_string()
439    }
440}
441
/// Builds the full download URL `<spec_url>/<sdk_key>.json`.
///
/// Trailing `/` characters on `spec_url` are dropped before joining, so a configured base URL
/// such as `https://host/v2/download_config_specs/` does not produce a `//` in the path.
// NOTE(review): this function is called within this file, so the lint allowance looks
// unnecessary — presumably kept for builds where the callers are compiled out; confirm.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
446
/// What caused a specs sync; the HTTP adapter uses it to pick the request timeout and retry
/// count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpecsSyncTrigger {
    /// First sync performed when the adapter starts: init timeout applied, no retries.
    Initial,
    /// Periodic sync from the background loop: retried up to 3 times.
    Background,
    /// Explicitly requested sync: no retries.
    Manual,
}
453
454#[cfg(test)]
455mod tests {
456    use super::*;
457    use crate::{networking::ResponseData, specs_adapter::SpecsUpdate, StatsigOptions};
458    use std::sync::atomic::AtomicUsize;
459
460    struct ChecksumFailingListener;
461
462    impl SpecsUpdateListener for ChecksumFailingListener {
463        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
464            Err(StatsigErr::ChecksumFailure(
465                "simulated checksum failure".to_string(),
466            ))
467        }
468
469        fn get_current_specs_info(&self) -> SpecsInfo {
470            SpecsInfo::empty()
471        }
472    }
473
474    struct ChecksumFailingThenSuccessListener {
475        calls: AtomicUsize,
476    }
477
478    impl SpecsUpdateListener for ChecksumFailingThenSuccessListener {
479        fn did_receive_specs_update(&self, _update: SpecsUpdate) -> Result<(), StatsigErr> {
480            let curr = self.calls.fetch_add(1, Ordering::SeqCst);
481            if curr == 0 {
482                Err(StatsigErr::ChecksumFailure(
483                    "simulated checksum failure".to_string(),
484                ))
485            } else {
486                Ok(())
487            }
488        }
489
490        fn get_current_specs_info(&self) -> SpecsInfo {
491            SpecsInfo::empty()
492        }
493    }
494
495    #[tokio::test]
496    async fn test_disable_accept_deltas_after_checksum_failure() {
497        let options = StatsigOptions {
498            enable_dcs_deltas: Some(true),
499            ..StatsigOptions::default()
500        };
501        let adapter = StatsigHttpSpecsAdapter::new(
502            "secret-key",
503            Some(&options),
504            Some("https://example.com/v2/download_config_specs".to_string()),
505        );
506        let specs_info = SpecsInfo::empty();
507
508        let request_before = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
509        assert_eq!(
510            request_before
511                .query_params
512                .as_ref()
513                .and_then(|p| p.get("accept_deltas"))
514                .map(String::as_str),
515            Some("true")
516        );
517
518        adapter.initialize(Arc::new(ChecksumFailingListener));
519        let result = adapter
520            .process_spec_data(Ok(NetworkResponse {
521                data: ResponseData::from_bytes(vec![]),
522                api: "test-api".to_string(),
523                requested_deltas: true,
524            }))
525            .await;
526
527        assert!(matches!(result, Err(StatsigErr::ChecksumFailure(_))));
528
529        let request_after = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
530        assert!(request_after
531            .query_params
532            .as_ref()
533            .is_none_or(|p| !p.contains_key("accept_deltas")));
534    }
535
536    #[tokio::test]
537    async fn test_reenable_accept_deltas_after_successful_non_delta_update() {
538        let options = StatsigOptions {
539            enable_dcs_deltas: Some(true),
540            ..StatsigOptions::default()
541        };
542        let adapter = StatsigHttpSpecsAdapter::new(
543            "secret-key",
544            Some(&options),
545            Some("https://example.com/v2/download_config_specs".to_string()),
546        );
547        let specs_info = SpecsInfo::empty();
548
549        adapter.initialize(Arc::new(ChecksumFailingThenSuccessListener {
550            calls: AtomicUsize::new(0),
551        }));
552
553        let first_result = adapter
554            .process_spec_data(Ok(NetworkResponse {
555                data: ResponseData::from_bytes(vec![]),
556                api: "test-api".to_string(),
557                requested_deltas: true,
558            }))
559            .await;
560
561        assert!(matches!(first_result, Err(StatsigErr::ChecksumFailure(_))));
562
563        let request_after_failure = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
564        assert!(request_after_failure
565            .query_params
566            .as_ref()
567            .is_none_or(|p| !p.contains_key("accept_deltas")));
568
569        let second_result = adapter
570            .process_spec_data(Ok(NetworkResponse {
571                data: ResponseData::from_bytes(vec![]),
572                api: "test-api".to_string(),
573                requested_deltas: false,
574            }))
575            .await;
576
577        assert!(second_result.is_ok());
578
579        let request_after_success = adapter.get_request_args(&specs_info, SpecsSyncTrigger::Manual);
580        assert_eq!(
581            request_after_success
582                .query_params
583                .as_ref()
584                .and_then(|p| p.get("accept_deltas"))
585                .map(String::as_str),
586            Some("true")
587        );
588    }
589}