statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::{Arc, Weak};
20use std::time::Duration;
21use tokio::sync::Notify;
22use tokio::time::sleep;
23
24use super::SpecsInfo;
25
26pub struct NetworkResponse {
27    pub data: ResponseData,
28    pub api: String,
29}
30
31pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
32pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
33
34#[allow(unused)]
35pub const INIT_DICT_ID: &str = "null";
36
37const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
38pub struct StatsigHttpSpecsAdapter {
39    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
40    network: NetworkClient,
41    sdk_key: String,
42    specs_url: String,
43    fallback_url: Option<String>,
44    init_timeout_ms: u64,
45    sync_interval_duration: Duration,
46    ops_stats: Arc<OpsStatsForInstance>,
47    shutdown_notify: Arc<Notify>,
48    enable_proto_spec_support: bool,
49}
50
51impl StatsigHttpSpecsAdapter {
52    #[must_use]
53    pub fn new(
54        sdk_key: &str,
55        options: Option<&StatsigOptions>,
56        override_url: Option<String>,
57    ) -> Self {
58        let default_options = StatsigOptions::default();
59        let options_ref = options.unwrap_or(&default_options);
60
61        let init_timeout_ms = options_ref
62            .init_timeout_ms
63            .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
64
65        let specs_url = match override_url {
66            Some(url) => url,
67            None => options_ref
68                .specs_url
69                .as_ref()
70                .map(|u| u.to_string())
71                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
72        };
73
74        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
75        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
76            && specs_url != DEFAULT_SPECS_URL
77        {
78            Some(DEFAULT_SPECS_URL.to_string())
79        } else {
80            None
81        };
82
83        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
84
85        let enable_proto_spec_support = options_ref
86            .experimental_flags
87            .as_ref()
88            .is_some_and(|flags| flags.contains("enable_proto_spec_support"));
89
90        Self {
91            listener: RwLock::new(None),
92            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
93            sdk_key: sdk_key.to_string(),
94            specs_url,
95            fallback_url,
96            init_timeout_ms,
97            sync_interval_duration: Duration::from_millis(u64::from(
98                options_ref
99                    .specs_sync_interval_ms
100                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
101            )),
102            ops_stats: OPS_STATS.get_for_instance(sdk_key),
103            shutdown_notify: Arc::new(Notify::new()),
104            enable_proto_spec_support,
105        }
106    }
107
108    pub fn force_shutdown(&self) {
109        self.shutdown_notify.notify_one();
110    }
111
112    pub async fn fetch_specs_from_network(
113        &self,
114        current_specs_info: SpecsInfo,
115        trigger: SpecsSyncTrigger,
116    ) -> Result<NetworkResponse, NetworkError> {
117        let request_args = self.get_request_args(&current_specs_info, trigger);
118        let url = request_args.url.clone();
119        match self.handle_specs_request(request_args).await {
120            Ok(response) => Ok(NetworkResponse {
121                data: response,
122                api: get_api_from_url(&url),
123            }),
124            Err(e) => Err(e),
125        }
126    }
127
128    fn get_request_args(
129        &self,
130        current_specs_info: &SpecsInfo,
131        trigger: SpecsSyncTrigger,
132    ) -> RequestArgs {
133        let mut params = HashMap::new();
134        let mut headers = None;
135
136        if self.enable_proto_spec_support {
137            params.insert("supports_proto".to_string(), "true".to_string());
138            headers = Some(HashMap::from([
139                ("statsig-supports-proto".to_string(), "true".to_string()),
140                (
141                    "accept-encoding".to_string(),
142                    "statsig-br, gzip, deflate, br".to_string(),
143                ),
144            ]));
145        }
146
147        if let Some(lcut) = current_specs_info.lcut {
148            if lcut > 0 {
149                params.insert("sinceTime".to_string(), lcut.to_string());
150            }
151        }
152
153        let is_init_request = trigger == SpecsSyncTrigger::Initial;
154
155        let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
156            self.init_timeout_ms
157        } else {
158            0
159        };
160
161        if let Some(cs) = &current_specs_info.checksum {
162            params.insert(
163                "checksum".to_string(),
164                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
165            );
166        }
167
168        RequestArgs {
169            url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
170            retries: match trigger {
171                SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
172                SpecsSyncTrigger::Background => 3,
173            },
174            query_params: Some(params),
175            accept_gzip_response: true,
176            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
177            timeout_ms,
178            headers,
179            ..RequestArgs::new()
180        }
181    }
182
183    async fn handle_fallback_request(
184        &self,
185        mut request_args: RequestArgs,
186    ) -> Result<NetworkResponse, NetworkError> {
187        let fallback_url = match &self.fallback_url {
188            Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
189            None => {
190                return Err(NetworkError::RequestFailed(
191                    request_args.url.clone(),
192                    None,
193                    "No fallback URL".to_string(),
194                ))
195            }
196        };
197
198        request_args.url = fallback_url.clone();
199
200        // TODO logging
201
202        let response = self.handle_specs_request(request_args).await?;
203        Ok(NetworkResponse {
204            data: response,
205            api: get_api_from_url(&fallback_url),
206        })
207    }
208
209    async fn handle_specs_request(
210        &self,
211        request_args: RequestArgs,
212    ) -> Result<ResponseData, NetworkError> {
213        let url = request_args.url.clone();
214        let response = self.network.get(request_args).await?;
215        match response.data {
216            Some(data) => Ok(data),
217            None => Err(NetworkError::RequestFailed(
218                url,
219                None,
220                response.error.unwrap_or("No data in response".to_string()),
221            )),
222        }
223    }
224
225    pub async fn run_background_sync(self: Arc<Self>) {
226        let specs_info = match self
227            .listener
228            .try_read_for(std::time::Duration::from_secs(5))
229        {
230            Some(lock) => match lock.as_ref() {
231                Some(listener) => listener.get_current_specs_info(),
232                None => SpecsInfo::empty(),
233            },
234            None => SpecsInfo::error(),
235        };
236
237        self.ops_stats
238            .set_diagnostics_context(ContextType::ConfigSync);
239        if let Err(e) = self
240            .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
241            .await
242        {
243            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
244                return;
245            }
246            log_e!(TAG, "Background specs sync failed: {}", e);
247        }
248        self.ops_stats.enqueue_diagnostics_event(
249            Some(KeyType::DownloadConfigSpecs),
250            Some(ContextType::ConfigSync),
251        );
252    }
253
254    async fn manually_sync_specs(
255        &self,
256        current_specs_info: SpecsInfo,
257        trigger: SpecsSyncTrigger,
258    ) -> Result<(), StatsigErr> {
259        if let Some(lock) = self
260            .listener
261            .try_read_for(std::time::Duration::from_secs(5))
262        {
263            if lock.is_none() {
264                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
265            }
266        }
267
268        let response = self
269            .fetch_specs_from_network(current_specs_info.clone(), trigger)
270            .await;
271        let result = self.process_spec_data(response).await;
272
273        if result.is_err() && self.fallback_url.is_some() {
274            log_d!(TAG, "Falling back to statsig api");
275            let response = self
276                .handle_fallback_request(self.get_request_args(&current_specs_info, trigger))
277                .await;
278            return self.process_spec_data(response).await;
279        }
280
281        result
282    }
283
284    async fn process_spec_data(
285        &self,
286        response: Result<NetworkResponse, NetworkError>,
287    ) -> Result<(), StatsigErr> {
288        let resp = response.map_err(StatsigErr::NetworkError)?;
289
290        let update = SpecsUpdate {
291            data: resp.data,
292            source: SpecsSource::Network,
293            received_at: Utc::now().timestamp_millis() as u64,
294            source_api: Some(resp.api),
295        };
296
297        self.ops_stats.add_marker(
298            Marker::new(
299                KeyType::DownloadConfigSpecs,
300                ActionType::Start,
301                Some(StepType::Process),
302            ),
303            None,
304        );
305
306        let result = match self
307            .listener
308            .try_read_for(std::time::Duration::from_secs(5))
309        {
310            Some(lock) => match lock.as_ref() {
311                Some(listener) => listener.did_receive_specs_update(update),
312                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
313            },
314            None => {
315                let err =
316                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
317                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
318                Err(err)
319            }
320        };
321
322        self.ops_stats.add_marker(
323            Marker::new(
324                KeyType::DownloadConfigSpecs,
325                ActionType::End,
326                Some(StepType::Process),
327            )
328            .with_is_success(result.is_ok()),
329            None,
330        );
331
332        result
333    }
334}
335
336#[async_trait]
337impl SpecsAdapter for StatsigHttpSpecsAdapter {
338    async fn start(
339        self: Arc<Self>,
340        _statsig_runtime: &Arc<StatsigRuntime>,
341    ) -> Result<(), StatsigErr> {
342        let specs_info = match self
343            .listener
344            .try_read_for(std::time::Duration::from_secs(5))
345        {
346            Some(lock) => match lock.as_ref() {
347                Some(listener) => listener.get_current_specs_info(),
348                None => SpecsInfo::empty(),
349            },
350            None => SpecsInfo::error(),
351        };
352        self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
353            .await
354    }
355
356    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
357        match self
358            .listener
359            .try_write_for(std::time::Duration::from_secs(5))
360        {
361            Some(mut lock) => *lock = Some(listener),
362            None => {
363                log_e!(TAG, "Failed to acquire write lock on listener");
364            }
365        }
366    }
367
368    async fn schedule_background_sync(
369        self: Arc<Self>,
370        statsig_runtime: &Arc<StatsigRuntime>,
371    ) -> Result<(), StatsigErr> {
372        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
373        let interval_duration = self.sync_interval_duration;
374        let shutdown_notify = self.shutdown_notify.clone();
375
376        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
377            loop {
378                tokio::select! {
379                    () = sleep(interval_duration) => {
380                        if let Some(strong_self) = weak_self.upgrade() {
381                            Self::run_background_sync(strong_self).await;
382                        } else {
383                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
384                            break;
385                        }
386                    }
387                    () = rt_shutdown_notify.notified() => {
388                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
389                        break;
390                    },
391                    () = shutdown_notify.notified() => {
392                        log_d!(TAG, "Shutting down specs background sync");
393                        break;
394                    }
395                }
396            }
397        })?;
398
399        Ok(())
400    }
401
402    async fn shutdown(
403        &self,
404        _timeout: Duration,
405        _statsig_runtime: &Arc<StatsigRuntime>,
406    ) -> Result<(), StatsigErr> {
407        self.shutdown_notify.notify_one();
408        Ok(())
409    }
410
411    fn get_type_name(&self) -> String {
412        stringify!(StatsigHttpSpecsAdapter).to_string()
413    }
414}
415
416#[allow(unused)]
417fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
418    format!("{spec_url}/{sdk_key}.json")
419}
420
421#[derive(Debug, Clone, Copy, PartialEq, Eq)]
422pub enum SpecsSyncTrigger {
423    Initial,
424    Background,
425    Manual,
426}