statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::{
11    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
12};
13use async_trait::async_trait;
14use chrono::Utc;
15use parking_lot::RwLock;
16use percent_encoding::percent_encode;
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::Duration;
20use tokio::sync::Notify;
21use tokio::time::sleep;
22
23use super::SpecsInfo;
24
25pub struct NetworkResponse {
26    pub data: Vec<u8>,
27    pub api: String,
28}
29
30pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
31pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
32#[allow(unused)]
33pub const INIT_DICT_ID: &str = "null";
34
35const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
36pub struct StatsigHttpSpecsAdapter {
37    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
38    network: NetworkClient,
39    sdk_key: String,
40    specs_url: String,
41    fallback_url: Option<String>,
42    sync_interval_duration: Duration,
43    ops_stats: Arc<OpsStatsForInstance>,
44    shutdown_notify: Arc<Notify>,
45}
46
47impl StatsigHttpSpecsAdapter {
48    #[must_use]
49    pub fn new(
50        sdk_key: &str,
51        options: Option<&StatsigOptions>,
52        override_url: Option<String>,
53    ) -> Self {
54        let default_options = StatsigOptions::default();
55        let options_ref = options.unwrap_or(&default_options);
56
57        let specs_url = match override_url {
58            Some(url) => url,
59            None => options_ref
60                .specs_url
61                .as_ref()
62                .map(|u| u.to_string())
63                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
64        };
65
66        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
67        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
68            && specs_url != DEFAULT_SPECS_URL
69        {
70            Some(DEFAULT_SPECS_URL.to_string())
71        } else {
72            None
73        };
74
75        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
76
77        Self {
78            listener: RwLock::new(None),
79            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
80            sdk_key: sdk_key.to_string(),
81            specs_url,
82            fallback_url,
83            sync_interval_duration: Duration::from_millis(u64::from(
84                options_ref
85                    .specs_sync_interval_ms
86                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
87            )),
88            ops_stats: OPS_STATS.get_for_instance(sdk_key),
89            shutdown_notify: Arc::new(Notify::new()),
90        }
91    }
92
93    pub fn force_shutdown(&self) {
94        self.shutdown_notify.notify_one();
95    }
96
97    pub async fn fetch_specs_from_network(
98        &self,
99        current_specs_info: SpecsInfo,
100    ) -> Result<NetworkResponse, NetworkError> {
101        let request_args = self.get_request_args(&current_specs_info);
102        let url = request_args.url.clone();
103        match self.handle_specs_request(request_args).await {
104            Ok(response) => Ok(NetworkResponse {
105                data: response,
106                api: get_api_from_url(&url),
107            }),
108            Err(e) => Err(e),
109        }
110    }
111
112    fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
113        let mut params = HashMap::new();
114        if let Some(lcut) = current_specs_info.lcut {
115            params.insert("sinceTime".to_string(), lcut.to_string());
116        }
117        if let Some(cs) = &current_specs_info.checksum {
118            params.insert(
119                "checksum".to_string(),
120                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
121            );
122        }
123
124        RequestArgs {
125            url: construct_specs_url(
126                self.specs_url.as_str(),
127                self.sdk_key.as_str(),
128                current_specs_info.zstd_dict_id.as_deref(),
129            ),
130            query_params: Some(params),
131            accept_gzip_response: true,
132            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
133            ..RequestArgs::new()
134        }
135    }
136
137    async fn handle_fallback_request(
138        &self,
139        mut request_args: RequestArgs,
140        current_specs_info: SpecsInfo,
141    ) -> Result<NetworkResponse, NetworkError> {
142        let fallback_url = match &self.fallback_url {
143            Some(url) => construct_specs_url(
144                url.as_str(),
145                &self.sdk_key,
146                current_specs_info.zstd_dict_id.as_deref(),
147            ),
148            None => {
149                return Err(NetworkError::RequestFailed(
150                    request_args.url.clone(),
151                    0,
152                    "No fallback URL".to_string(),
153                ))
154            }
155        };
156
157        request_args.url = fallback_url.clone();
158
159        // TODO logging
160
161        let response = self.handle_specs_request(request_args).await?;
162        Ok(NetworkResponse {
163            data: response,
164            api: get_api_from_url(&fallback_url),
165        })
166    }
167
168    // TODO: return a decompressed and parsed SpecsResponse
169    async fn handle_specs_request(
170        &self,
171        request_args: RequestArgs,
172    ) -> Result<Vec<u8>, NetworkError> {
173        let url = request_args.url.clone();
174        let response = self.network.get(request_args).await?;
175        match response.data {
176            Some(data) => Ok(data),
177            None => Err(NetworkError::RequestFailed(
178                url,
179                0,
180                "No data in response".to_string(),
181            )),
182        }
183    }
184
185    pub async fn run_background_sync(self: Arc<Self>) {
186        let specs_info = match self
187            .listener
188            .try_read_for(std::time::Duration::from_secs(5))
189        {
190            Some(lock) => match lock.as_ref() {
191                Some(listener) => listener.get_current_specs_info(),
192                None => SpecsInfo::empty(),
193            },
194            None => SpecsInfo::error(),
195        };
196
197        self.ops_stats
198            .set_diagnostics_context(ContextType::ConfigSync);
199        if let Err(e) = self.manually_sync_specs(specs_info).await {
200            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
201                return;
202            }
203            log_e!(TAG, "Background specs sync failed: {}", e);
204        }
205        self.ops_stats.enqueue_diagnostics_event(
206            Some(KeyType::DownloadConfigSpecs),
207            Some(ContextType::ConfigSync),
208        );
209    }
210
211    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
212        if let Some(lock) = self
213            .listener
214            .try_read_for(std::time::Duration::from_secs(5))
215        {
216            if lock.is_none() {
217                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
218            }
219        }
220
221        let response = self
222            .fetch_specs_from_network(current_specs_info.clone())
223            .await;
224        let result = self.process_spec_data(response).await;
225
226        if result.is_err() && self.fallback_url.is_some() {
227            log_d!(TAG, "Falling back to statsig api");
228            let response = self
229                .handle_fallback_request(
230                    self.get_request_args(&current_specs_info),
231                    current_specs_info,
232                )
233                .await;
234            return self.process_spec_data(response).await;
235        }
236
237        result
238    }
239
240    async fn process_spec_data(
241        &self,
242        response: Result<NetworkResponse, NetworkError>,
243    ) -> Result<(), StatsigErr> {
244        let resp = response.map_err(StatsigErr::NetworkError)?;
245
246        let update = SpecsUpdate {
247            data: resp.data,
248            source: SpecsSource::Network,
249            received_at: Utc::now().timestamp_millis() as u64,
250            source_api: Some(resp.api),
251        };
252
253        self.ops_stats.add_marker(
254            Marker::new(
255                KeyType::DownloadConfigSpecs,
256                ActionType::Start,
257                Some(StepType::Process),
258            ),
259            None,
260        );
261
262        let result = match self
263            .listener
264            .try_read_for(std::time::Duration::from_secs(5))
265        {
266            Some(lock) => match lock.as_ref() {
267                Some(listener) => listener.did_receive_specs_update(update),
268                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
269            },
270            None => {
271                let err =
272                    StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
273                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
274                Err(err)
275            }
276        };
277
278        self.ops_stats.add_marker(
279            Marker::new(
280                KeyType::DownloadConfigSpecs,
281                ActionType::End,
282                Some(StepType::Process),
283            )
284            .with_is_success(result.is_ok()),
285            None,
286        );
287
288        result
289    }
290}
291
292#[async_trait]
293impl SpecsAdapter for StatsigHttpSpecsAdapter {
294    async fn start(
295        self: Arc<Self>,
296        _statsig_runtime: &Arc<StatsigRuntime>,
297    ) -> Result<(), StatsigErr> {
298        let specs_info = match self
299            .listener
300            .try_read_for(std::time::Duration::from_secs(5))
301        {
302            Some(lock) => match lock.as_ref() {
303                Some(listener) => listener.get_current_specs_info(),
304                None => SpecsInfo::empty(),
305            },
306            None => SpecsInfo::error(),
307        };
308        self.manually_sync_specs(specs_info).await
309    }
310
311    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
312        match self
313            .listener
314            .try_write_for(std::time::Duration::from_secs(5))
315        {
316            Some(mut lock) => *lock = Some(listener),
317            None => {
318                log_e!(TAG, "Failed to acquire write lock on listener");
319            }
320        }
321    }
322
323    async fn schedule_background_sync(
324        self: Arc<Self>,
325        statsig_runtime: &Arc<StatsigRuntime>,
326    ) -> Result<(), StatsigErr> {
327        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
328        let interval_duration = self.sync_interval_duration;
329        let shutdown_notify = self.shutdown_notify.clone();
330
331        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
332            loop {
333                tokio::select! {
334                    () = sleep(interval_duration) => {
335                        if let Some(strong_self) = weak_self.upgrade() {
336                            Self::run_background_sync(strong_self).await;
337                        } else {
338                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
339                            break;
340                        }
341                    }
342                    () = rt_shutdown_notify.notified() => {
343                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
344                        break;
345                    },
346                    () = shutdown_notify.notified() => {
347                        log_d!(TAG, "Shutting down specs background sync");
348                        break;
349                    }
350                }
351            }
352        })?;
353
354        Ok(())
355    }
356
357    async fn shutdown(
358        &self,
359        _timeout: Duration,
360        _statsig_runtime: &Arc<StatsigRuntime>,
361    ) -> Result<(), StatsigErr> {
362        self.shutdown_notify.notify_one();
363        Ok(())
364    }
365
366    fn get_type_name(&self) -> String {
367        stringify!(StatsigHttpSpecsAdapter).to_string()
368    }
369}
370
371#[allow(unused)]
372fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
373    #[cfg(feature = "with_shared_dict_compression")]
374    {
375        let dict_id = dict_id.unwrap_or(INIT_DICT_ID);
376        format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
377    }
378    #[cfg(not(feature = "with_shared_dict_compression"))]
379    format!("{spec_url}/{sdk_key}.json")
380}