statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::{Arc, Weak};
20use std::time::Duration;
21use tokio::sync::Notify;
22use tokio::time::sleep;
23
24use super::SpecsInfo;
25
/// Result of a successful specs download, paired with the API it was fetched from.
pub struct NetworkResponse {
    /// Response payload handed back by the network client.
    pub data: ResponseData,
    /// API identifier derived from the request URL via `get_api_from_url`.
    pub api: String,
}
30
/// Base URL used when neither an override URL nor `StatsigOptions::specs_url` is
/// provided; also the target of the optional fallback request.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Background sync interval (milliseconds) used when
/// `StatsigOptions::specs_sync_interval_ms` is unset.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

// NOTE(review): not referenced in this file; presumably consumed by other modules — confirm.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag passed to the logging macros in this file.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
/// Specs adapter that downloads config specs over HTTP and forwards them to a
/// registered `SpecsUpdateListener`.
pub struct StatsigHttpSpecsAdapter {
    /// Listener that receives spec updates; `None` until `initialize` is called.
    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
    /// HTTP client used for every specs request.
    network: NetworkClient,
    /// SDK key; used to build the request URL (`<base>/<sdk_key>.json`).
    sdk_key: String,
    /// Base URL of the primary specs endpoint.
    specs_url: String,
    /// Base URL retried when the primary sync fails. Only set when
    /// `fallback_to_statsig_api` is enabled and `specs_url` is not the default.
    fallback_url: Option<String>,
    /// Request timeout (milliseconds) applied to `SpecsSyncTrigger::Initial` requests.
    init_timeout_ms: u64,
    /// Delay between background sync attempts.
    sync_interval_duration: Duration,
    /// Per-instance sink for diagnostics markers and error reporting.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Signalled by `shutdown` / `force_shutdown` to stop the background sync loop.
    shutdown_notify: Arc<Notify>,
}
49
50impl StatsigHttpSpecsAdapter {
51    #[must_use]
52    pub fn new(
53        sdk_key: &str,
54        options: Option<&StatsigOptions>,
55        override_url: Option<String>,
56    ) -> Self {
57        let default_options = StatsigOptions::default();
58        let options_ref = options.unwrap_or(&default_options);
59
60        let init_timeout_ms = options_ref
61            .init_timeout_ms
62            .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
63
64        let specs_url = match override_url {
65            Some(url) => url,
66            None => options_ref
67                .specs_url
68                .as_ref()
69                .map(|u| u.to_string())
70                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
71        };
72
73        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
74        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
75            && specs_url != DEFAULT_SPECS_URL
76        {
77            Some(DEFAULT_SPECS_URL.to_string())
78        } else {
79            None
80        };
81
82        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
83
84        Self {
85            listener: RwLock::new(None),
86            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
87            sdk_key: sdk_key.to_string(),
88            specs_url,
89            fallback_url,
90            init_timeout_ms,
91            sync_interval_duration: Duration::from_millis(u64::from(
92                options_ref
93                    .specs_sync_interval_ms
94                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
95            )),
96            ops_stats: OPS_STATS.get_for_instance(sdk_key),
97            shutdown_notify: Arc::new(Notify::new()),
98        }
99    }
100
101    pub fn force_shutdown(&self) {
102        self.shutdown_notify.notify_one();
103    }
104
105    pub async fn fetch_specs_from_network(
106        &self,
107        current_specs_info: SpecsInfo,
108        trigger: SpecsSyncTrigger,
109    ) -> Result<NetworkResponse, NetworkError> {
110        let request_args = self.get_request_args(&current_specs_info, trigger);
111        let url = request_args.url.clone();
112        match self.handle_specs_request(request_args).await {
113            Ok(response) => Ok(NetworkResponse {
114                data: response,
115                api: get_api_from_url(&url),
116            }),
117            Err(e) => Err(e),
118        }
119    }
120
121    fn get_request_args(
122        &self,
123        current_specs_info: &SpecsInfo,
124        trigger: SpecsSyncTrigger,
125    ) -> RequestArgs {
126        let mut params = HashMap::new();
127        if let Some(lcut) = current_specs_info.lcut {
128            if lcut > 0 {
129                params.insert("sinceTime".to_string(), lcut.to_string());
130            }
131        }
132
133        let is_init_request = trigger == SpecsSyncTrigger::Initial;
134
135        let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
136            self.init_timeout_ms
137        } else {
138            0
139        };
140
141        if let Some(cs) = &current_specs_info.checksum {
142            params.insert(
143                "checksum".to_string(),
144                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
145            );
146        }
147
148        RequestArgs {
149            url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
150            retries: match trigger {
151                SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
152                SpecsSyncTrigger::Background => 3,
153            },
154            query_params: Some(params),
155            accept_gzip_response: true,
156            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
157            timeout_ms,
158            ..RequestArgs::new()
159        }
160    }
161
162    async fn handle_fallback_request(
163        &self,
164        mut request_args: RequestArgs,
165    ) -> Result<NetworkResponse, NetworkError> {
166        let fallback_url = match &self.fallback_url {
167            Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
168            None => {
169                return Err(NetworkError::RequestFailed(
170                    request_args.url.clone(),
171                    None,
172                    "No fallback URL".to_string(),
173                ))
174            }
175        };
176
177        request_args.url = fallback_url.clone();
178
179        // TODO logging
180
181        let response = self.handle_specs_request(request_args).await?;
182        Ok(NetworkResponse {
183            data: response,
184            api: get_api_from_url(&fallback_url),
185        })
186    }
187
188    async fn handle_specs_request(
189        &self,
190        request_args: RequestArgs,
191    ) -> Result<ResponseData, NetworkError> {
192        let url = request_args.url.clone();
193        let response = self.network.get(request_args).await?;
194        match response.data {
195            Some(data) => Ok(data),
196            None => Err(NetworkError::RequestFailed(
197                url,
198                None,
199                "No data in response".to_string(),
200            )),
201        }
202    }
203
204    pub async fn run_background_sync(self: Arc<Self>) {
205        let specs_info = match self
206            .listener
207            .try_read_for(std::time::Duration::from_secs(5))
208        {
209            Some(lock) => match lock.as_ref() {
210                Some(listener) => listener.get_current_specs_info(),
211                None => SpecsInfo::empty(),
212            },
213            None => SpecsInfo::error(),
214        };
215
216        self.ops_stats
217            .set_diagnostics_context(ContextType::ConfigSync);
218        if let Err(e) = self
219            .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
220            .await
221        {
222            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
223                return;
224            }
225            log_e!(TAG, "Background specs sync failed: {}", e);
226        }
227        self.ops_stats.enqueue_diagnostics_event(
228            Some(KeyType::DownloadConfigSpecs),
229            Some(ContextType::ConfigSync),
230        );
231    }
232
233    async fn manually_sync_specs(
234        &self,
235        current_specs_info: SpecsInfo,
236        trigger: SpecsSyncTrigger,
237    ) -> Result<(), StatsigErr> {
238        if let Some(lock) = self
239            .listener
240            .try_read_for(std::time::Duration::from_secs(5))
241        {
242            if lock.is_none() {
243                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
244            }
245        }
246
247        let response = self
248            .fetch_specs_from_network(current_specs_info.clone(), trigger)
249            .await;
250        let result = self.process_spec_data(response).await;
251
252        if result.is_err() && self.fallback_url.is_some() {
253            log_d!(TAG, "Falling back to statsig api");
254            let response = self
255                .handle_fallback_request(self.get_request_args(&current_specs_info, trigger))
256                .await;
257            return self.process_spec_data(response).await;
258        }
259
260        result
261    }
262
263    async fn process_spec_data(
264        &self,
265        response: Result<NetworkResponse, NetworkError>,
266    ) -> Result<(), StatsigErr> {
267        let resp = response.map_err(StatsigErr::NetworkError)?;
268
269        let update = SpecsUpdate {
270            data: resp.data,
271            source: SpecsSource::Network,
272            received_at: Utc::now().timestamp_millis() as u64,
273            source_api: Some(resp.api),
274        };
275
276        self.ops_stats.add_marker(
277            Marker::new(
278                KeyType::DownloadConfigSpecs,
279                ActionType::Start,
280                Some(StepType::Process),
281            ),
282            None,
283        );
284
285        let result = match self
286            .listener
287            .try_read_for(std::time::Duration::from_secs(5))
288        {
289            Some(lock) => match lock.as_ref() {
290                Some(listener) => listener.did_receive_specs_update(update),
291                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
292            },
293            None => {
294                let err =
295                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
296                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
297                Err(err)
298            }
299        };
300
301        self.ops_stats.add_marker(
302            Marker::new(
303                KeyType::DownloadConfigSpecs,
304                ActionType::End,
305                Some(StepType::Process),
306            )
307            .with_is_success(result.is_ok()),
308            None,
309        );
310
311        result
312    }
313}
314
315#[async_trait]
316impl SpecsAdapter for StatsigHttpSpecsAdapter {
317    async fn start(
318        self: Arc<Self>,
319        _statsig_runtime: &Arc<StatsigRuntime>,
320    ) -> Result<(), StatsigErr> {
321        let specs_info = match self
322            .listener
323            .try_read_for(std::time::Duration::from_secs(5))
324        {
325            Some(lock) => match lock.as_ref() {
326                Some(listener) => listener.get_current_specs_info(),
327                None => SpecsInfo::empty(),
328            },
329            None => SpecsInfo::error(),
330        };
331        self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
332            .await
333    }
334
335    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
336        match self
337            .listener
338            .try_write_for(std::time::Duration::from_secs(5))
339        {
340            Some(mut lock) => *lock = Some(listener),
341            None => {
342                log_e!(TAG, "Failed to acquire write lock on listener");
343            }
344        }
345    }
346
347    async fn schedule_background_sync(
348        self: Arc<Self>,
349        statsig_runtime: &Arc<StatsigRuntime>,
350    ) -> Result<(), StatsigErr> {
351        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
352        let interval_duration = self.sync_interval_duration;
353        let shutdown_notify = self.shutdown_notify.clone();
354
355        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
356            loop {
357                tokio::select! {
358                    () = sleep(interval_duration) => {
359                        if let Some(strong_self) = weak_self.upgrade() {
360                            Self::run_background_sync(strong_self).await;
361                        } else {
362                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
363                            break;
364                        }
365                    }
366                    () = rt_shutdown_notify.notified() => {
367                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
368                        break;
369                    },
370                    () = shutdown_notify.notified() => {
371                        log_d!(TAG, "Shutting down specs background sync");
372                        break;
373                    }
374                }
375            }
376        })?;
377
378        Ok(())
379    }
380
381    async fn shutdown(
382        &self,
383        _timeout: Duration,
384        _statsig_runtime: &Arc<StatsigRuntime>,
385    ) -> Result<(), StatsigErr> {
386        self.shutdown_notify.notify_one();
387        Ok(())
388    }
389
390    fn get_type_name(&self) -> String {
391        stringify!(StatsigHttpSpecsAdapter).to_string()
392    }
393}
394
/// Builds the full specs download URL: `<spec_url>/<sdk_key>.json`.
///
/// Trailing `/` characters on `spec_url` are dropped before joining, so a
/// configured base URL such as `https://host/specs/` does not produce a
/// doubled slash in the request path.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
399
/// Reason a specs sync was started; selects the request's retry count and timeout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpecsSyncTrigger {
    /// First sync performed by `start`: no retries, uses the init timeout.
    Initial,
    /// Periodic sync from the background task: 3 retries.
    Background,
    /// No retries. NOTE(review): not produced within this file; presumably used
    /// by external callers of `fetch_specs_from_network` — confirm.
    Manual,
}