// statsig_rust/specs_adapter/statsig_http_specs_adapter.rs

use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
use crate::sdk_diagnostics::diagnostics::ContextType;
use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
use crate::statsig_err::StatsigErr;
use crate::statsig_metadata::StatsigMetadata;
use crate::utils::get_api_from_url;
use crate::DEFAULT_INIT_TIMEOUT_MS;
use crate::{
    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
};
use async_trait::async_trait;
use chrono::Utc;
use parking_lot::RwLock;
use percent_encoding::percent_encode;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::sleep;

use super::SpecsInfo;
25
26pub struct NetworkResponse {
27    pub data: ResponseData,
28    pub api: String,
29}
30
/// Specs endpoint used when neither an override URL nor `StatsigOptions::specs_url` is given.
/// It is also the fallback target when `fallback_to_statsig_api` is enabled.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Delay between background syncs, in milliseconds, when
/// `StatsigOptions::specs_sync_interval_ms` is unset.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Not referenced by the code visible in this file.
// NOTE(review): presumably the dictionary id sent before any compression dictionary is
// known; the lint is silenced because nothing here uses it. Confirm against callers.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag passed to the logging macros; expands to `"StatsigHttpSpecsAdapter"`.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
38pub struct StatsigHttpSpecsAdapter {
39    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
40    network: NetworkClient,
41    sdk_key: String,
42    specs_url: String,
43    fallback_url: Option<String>,
44    init_timeout_ms: u64,
45    sync_interval_duration: Duration,
46    ops_stats: Arc<OpsStatsForInstance>,
47    shutdown_notify: Arc<Notify>,
48}
49
50impl StatsigHttpSpecsAdapter {
51    #[must_use]
52    pub fn new(
53        sdk_key: &str,
54        options: Option<&StatsigOptions>,
55        override_url: Option<String>,
56    ) -> Self {
57        let default_options = StatsigOptions::default();
58        let options_ref = options.unwrap_or(&default_options);
59
60        let init_timeout_ms = options_ref
61            .init_timeout_ms
62            .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
63
64        let specs_url = match override_url {
65            Some(url) => url,
66            None => options_ref
67                .specs_url
68                .as_ref()
69                .map(|u| u.to_string())
70                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
71        };
72
73        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
74        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
75            && specs_url != DEFAULT_SPECS_URL
76        {
77            Some(DEFAULT_SPECS_URL.to_string())
78        } else {
79            None
80        };
81
82        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
83
84        Self {
85            listener: RwLock::new(None),
86            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
87            sdk_key: sdk_key.to_string(),
88            specs_url,
89            fallback_url,
90            init_timeout_ms,
91            sync_interval_duration: Duration::from_millis(u64::from(
92                options_ref
93                    .specs_sync_interval_ms
94                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
95            )),
96            ops_stats: OPS_STATS.get_for_instance(sdk_key),
97            shutdown_notify: Arc::new(Notify::new()),
98        }
99    }
100
101    pub fn force_shutdown(&self) {
102        self.shutdown_notify.notify_one();
103    }
104
105    pub async fn fetch_specs_from_network(
106        &self,
107        current_specs_info: SpecsInfo,
108        trigger: SpecsSyncTrigger,
109    ) -> Result<NetworkResponse, NetworkError> {
110        let request_args = self.get_request_args(&current_specs_info, trigger);
111        let url = request_args.url.clone();
112        match self.handle_specs_request(request_args).await {
113            Ok(response) => Ok(NetworkResponse {
114                data: response,
115                api: get_api_from_url(&url),
116            }),
117            Err(e) => Err(e),
118        }
119    }
120
121    fn get_request_args(
122        &self,
123        current_specs_info: &SpecsInfo,
124        trigger: SpecsSyncTrigger,
125    ) -> RequestArgs {
126        let mut params = HashMap::new();
127
128        params.insert("supports_proto".to_string(), "true".to_string());
129        let headers = Some(HashMap::from([
130            ("statsig-supports-proto".to_string(), "true".to_string()),
131            (
132                "accept-encoding".to_string(),
133                "statsig-br, gzip, deflate, br".to_string(),
134            ),
135        ]));
136
137        if let Some(lcut) = current_specs_info.lcut {
138            if lcut > 0 {
139                params.insert("sinceTime".to_string(), lcut.to_string());
140            }
141        }
142
143        let is_init_request = trigger == SpecsSyncTrigger::Initial;
144
145        let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
146            self.init_timeout_ms
147        } else {
148            0
149        };
150
151        if let Some(cs) = &current_specs_info.checksum {
152            params.insert(
153                "checksum".to_string(),
154                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
155            );
156        }
157
158        RequestArgs {
159            url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
160            retries: match trigger {
161                SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
162                SpecsSyncTrigger::Background => 3,
163            },
164            query_params: Some(params),
165            accept_gzip_response: true,
166            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
167            timeout_ms,
168            headers,
169            ..RequestArgs::new()
170        }
171    }
172
173    async fn handle_fallback_request(
174        &self,
175        mut request_args: RequestArgs,
176    ) -> Result<NetworkResponse, NetworkError> {
177        let fallback_url = match &self.fallback_url {
178            Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
179            None => {
180                return Err(NetworkError::RequestFailed(
181                    request_args.url.clone(),
182                    None,
183                    "No fallback URL".to_string(),
184                ))
185            }
186        };
187
188        request_args.url = fallback_url.clone();
189
190        // TODO logging
191
192        let response = self.handle_specs_request(request_args).await?;
193        Ok(NetworkResponse {
194            data: response,
195            api: get_api_from_url(&fallback_url),
196        })
197    }
198
199    async fn handle_specs_request(
200        &self,
201        request_args: RequestArgs,
202    ) -> Result<ResponseData, NetworkError> {
203        let url = request_args.url.clone();
204        let response = self.network.get(request_args).await?;
205        match response.data {
206            Some(data) => Ok(data),
207            None => Err(NetworkError::RequestFailed(
208                url,
209                None,
210                response.error.unwrap_or("No data in response".to_string()),
211            )),
212        }
213    }
214
215    pub async fn run_background_sync(self: Arc<Self>) {
216        let specs_info = match self
217            .listener
218            .try_read_for(std::time::Duration::from_secs(5))
219        {
220            Some(lock) => match lock.as_ref() {
221                Some(listener) => listener.get_current_specs_info(),
222                None => SpecsInfo::empty(),
223            },
224            None => SpecsInfo::error(),
225        };
226
227        self.ops_stats
228            .set_diagnostics_context(ContextType::ConfigSync);
229        if let Err(e) = self
230            .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
231            .await
232        {
233            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
234                return;
235            }
236            log_e!(TAG, "Background specs sync failed: {}", e);
237        }
238        self.ops_stats.enqueue_diagnostics_event(
239            Some(KeyType::DownloadConfigSpecs),
240            Some(ContextType::ConfigSync),
241        );
242    }
243
244    async fn manually_sync_specs(
245        &self,
246        current_specs_info: SpecsInfo,
247        trigger: SpecsSyncTrigger,
248    ) -> Result<(), StatsigErr> {
249        if let Some(lock) = self
250            .listener
251            .try_read_for(std::time::Duration::from_secs(5))
252        {
253            if lock.is_none() {
254                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
255            }
256        }
257
258        let response = self
259            .fetch_specs_from_network(current_specs_info.clone(), trigger)
260            .await;
261        let result = self.process_spec_data(response).await;
262
263        if result.is_err() && self.fallback_url.is_some() {
264            log_d!(TAG, "Falling back to statsig api");
265            let response = self
266                .handle_fallback_request(self.get_request_args(&current_specs_info, trigger))
267                .await;
268            return self.process_spec_data(response).await;
269        }
270
271        result
272    }
273
274    async fn process_spec_data(
275        &self,
276        response: Result<NetworkResponse, NetworkError>,
277    ) -> Result<(), StatsigErr> {
278        let resp = response.map_err(StatsigErr::NetworkError)?;
279
280        let update = SpecsUpdate {
281            data: resp.data,
282            source: SpecsSource::Network,
283            received_at: Utc::now().timestamp_millis() as u64,
284            source_api: Some(resp.api),
285        };
286
287        self.ops_stats.add_marker(
288            Marker::new(
289                KeyType::DownloadConfigSpecs,
290                ActionType::Start,
291                Some(StepType::Process),
292            ),
293            None,
294        );
295
296        let result = match self
297            .listener
298            .try_read_for(std::time::Duration::from_secs(5))
299        {
300            Some(lock) => match lock.as_ref() {
301                Some(listener) => listener.did_receive_specs_update(update),
302                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
303            },
304            None => {
305                let err =
306                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
307                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
308                Err(err)
309            }
310        };
311
312        self.ops_stats.add_marker(
313            Marker::new(
314                KeyType::DownloadConfigSpecs,
315                ActionType::End,
316                Some(StepType::Process),
317            )
318            .with_is_success(result.is_ok()),
319            None,
320        );
321
322        result
323    }
324}
325
326#[async_trait]
327impl SpecsAdapter for StatsigHttpSpecsAdapter {
328    async fn start(
329        self: Arc<Self>,
330        _statsig_runtime: &Arc<StatsigRuntime>,
331    ) -> Result<(), StatsigErr> {
332        let specs_info = match self
333            .listener
334            .try_read_for(std::time::Duration::from_secs(5))
335        {
336            Some(lock) => match lock.as_ref() {
337                Some(listener) => listener.get_current_specs_info(),
338                None => SpecsInfo::empty(),
339            },
340            None => SpecsInfo::error(),
341        };
342        self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
343            .await
344    }
345
346    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
347        match self
348            .listener
349            .try_write_for(std::time::Duration::from_secs(5))
350        {
351            Some(mut lock) => *lock = Some(listener),
352            None => {
353                log_e!(TAG, "Failed to acquire write lock on listener");
354            }
355        }
356    }
357
358    async fn schedule_background_sync(
359        self: Arc<Self>,
360        statsig_runtime: &Arc<StatsigRuntime>,
361    ) -> Result<(), StatsigErr> {
362        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
363        let interval_duration = self.sync_interval_duration;
364        let shutdown_notify = self.shutdown_notify.clone();
365
366        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
367            loop {
368                tokio::select! {
369                    () = sleep(interval_duration) => {
370                        if let Some(strong_self) = weak_self.upgrade() {
371                            Self::run_background_sync(strong_self).await;
372                        } else {
373                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
374                            break;
375                        }
376                    }
377                    () = rt_shutdown_notify.notified() => {
378                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
379                        break;
380                    },
381                    () = shutdown_notify.notified() => {
382                        log_d!(TAG, "Shutting down specs background sync");
383                        break;
384                    }
385                }
386            }
387        })?;
388
389        Ok(())
390    }
391
392    async fn shutdown(
393        &self,
394        _timeout: Duration,
395        _statsig_runtime: &Arc<StatsigRuntime>,
396    ) -> Result<(), StatsigErr> {
397        self.shutdown_notify.notify_one();
398        Ok(())
399    }
400
401    fn get_type_name(&self) -> String {
402        stringify!(StatsigHttpSpecsAdapter).to_string()
403    }
404}
405
/// Builds the full download URL `<spec_url>/<sdk_key>.json`.
///
/// Trailing `/` characters on `spec_url` are dropped first, so a configured base
/// URL such as `https://host/specs/` does not produce `https://host/specs//<key>.json`.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
410
/// Reason a specs sync was started; selects the request's retry count and timeout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpecsSyncTrigger {
    /// First sync, issued from `start`. No retries; the init timeout applies.
    Initial,
    /// Periodic sync from the background task. Retried up to 3 times.
    Background,
    /// Caller-requested sync. No retries. Not constructed in this file.
    Manual,
}