statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::{
11    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
12};
13use async_trait::async_trait;
14use chrono::Utc;
15use percent_encoding::percent_encode;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock, Weak};
18use std::time::Duration;
19use tokio::sync::Notify;
20use tokio::time::sleep;
21
22use super::SpecsInfo;
23
/// Payload of a successful specs download.
///
/// `data` carries the response body as returned by the network client (this
/// adapter does not parse it), and `api` is the value `get_api_from_url`
/// produced for the URL that was requested.
#[derive(Debug, Clone)]
pub struct NetworkResponse {
    /// Response body bytes.
    pub data: Vec<u8>,
    /// API identifier derived from the request URL.
    pub api: String,
}
28
/// Endpoint used for specs downloads when no other specs URL is configured.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Background sync period, in milliseconds, used when none is configured.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Dictionary id placed in the request path when no dictionary id is known.
/// Only read when the `with_shared_dict_compression` feature is enabled.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag passed to the logging macros in this module.
const TAG: &str = "StatsigHttpSpecsAdapter";
35pub struct StatsigHttpSpecsAdapter {
36    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
37    network: NetworkClient,
38    sdk_key: String,
39    specs_url: String,
40    fallback_url: Option<String>,
41    sync_interval_duration: Duration,
42    ops_stats: Arc<OpsStatsForInstance>,
43    shutdown_notify: Arc<Notify>,
44}
45
46impl StatsigHttpSpecsAdapter {
47    #[must_use]
48    pub fn new(
49        sdk_key: &str,
50        options: Option<&StatsigOptions>,
51        override_url: Option<String>,
52    ) -> Self {
53        let default_options = StatsigOptions::default();
54        let options_ref = options.unwrap_or(&default_options);
55
56        let specs_url = match override_url {
57            Some(url) => url,
58            None => options_ref
59                .specs_url
60                .as_ref()
61                .map(|u| u.to_string())
62                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
63        };
64
65        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
66        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
67            && specs_url != DEFAULT_SPECS_URL
68        {
69            Some(DEFAULT_SPECS_URL.to_string())
70        } else {
71            None
72        };
73
74        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
75
76        Self {
77            listener: RwLock::new(None),
78            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
79            sdk_key: sdk_key.to_string(),
80            specs_url,
81            fallback_url,
82            sync_interval_duration: Duration::from_millis(u64::from(
83                options_ref
84                    .specs_sync_interval_ms
85                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
86            )),
87            ops_stats: OPS_STATS.get_for_instance(sdk_key),
88            shutdown_notify: Arc::new(Notify::new()),
89        }
90    }
91
92    pub fn force_shutdown(&self) {
93        self.shutdown_notify.notify_one();
94    }
95
96    pub async fn fetch_specs_from_network(
97        &self,
98        current_specs_info: SpecsInfo,
99    ) -> Result<NetworkResponse, NetworkError> {
100        let request_args = self.get_request_args(&current_specs_info);
101        let url = request_args.url.clone();
102        match self.handle_specs_request(request_args).await {
103            Ok(response) => Ok(NetworkResponse {
104                data: response,
105                api: get_api_from_url(&url),
106            }),
107            Err(e) => Err(e),
108        }
109    }
110
111    fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
112        let mut params = HashMap::new();
113        if let Some(lcut) = current_specs_info.lcut {
114            params.insert("sinceTime".to_string(), lcut.to_string());
115        }
116        if let Some(cs) = &current_specs_info.checksum {
117            params.insert(
118                "checksum".to_string(),
119                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
120            );
121        }
122
123        RequestArgs {
124            url: construct_specs_url(
125                self.specs_url.as_str(),
126                self.sdk_key.as_str(),
127                current_specs_info.zstd_dict_id.as_deref(),
128            ),
129            query_params: Some(params),
130            accept_gzip_response: true,
131            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
132            ..RequestArgs::new()
133        }
134    }
135
136    async fn handle_fallback_request(
137        &self,
138        mut request_args: RequestArgs,
139        current_specs_info: SpecsInfo,
140    ) -> Result<NetworkResponse, NetworkError> {
141        let fallback_url = match &self.fallback_url {
142            Some(url) => construct_specs_url(
143                url.as_str(),
144                &self.sdk_key,
145                current_specs_info.zstd_dict_id.as_deref(),
146            ),
147            None => {
148                return Err(NetworkError::RequestFailed(
149                    request_args.url.clone(),
150                    0,
151                    "No fallback URL".to_string(),
152                ))
153            }
154        };
155
156        request_args.url = fallback_url.clone();
157
158        // TODO logging
159
160        let response = self.handle_specs_request(request_args).await?;
161        Ok(NetworkResponse {
162            data: response,
163            api: get_api_from_url(&fallback_url),
164        })
165    }
166
167    // TODO: return a decompressed and parsed SpecsResponse
168    async fn handle_specs_request(
169        &self,
170        request_args: RequestArgs,
171    ) -> Result<Vec<u8>, NetworkError> {
172        let url = request_args.url.clone();
173        let response = self.network.get(request_args).await?;
174        match response.data {
175            Some(data) => Ok(data),
176            None => Err(NetworkError::RequestFailed(
177                url,
178                0,
179                "No data in response".to_string(),
180            )),
181        }
182    }
183
184    pub async fn run_background_sync(self: Arc<Self>) {
185        let specs_info = match self.listener.read() {
186            Ok(lock) => match lock.as_ref() {
187                Some(listener) => listener.get_current_specs_info(),
188                None => SpecsInfo::empty(),
189            },
190            Err(_) => SpecsInfo::error(),
191        };
192
193        self.ops_stats
194            .set_diagnostics_context(ContextType::ConfigSync);
195        if let Err(e) = self.manually_sync_specs(specs_info).await {
196            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
197                return;
198            }
199            log_e!(TAG, "Background specs sync failed: {}", e);
200        }
201        self.ops_stats.enqueue_diagnostics_event(
202            Some(KeyType::DownloadConfigSpecs),
203            Some(ContextType::ConfigSync),
204        );
205    }
206
207    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
208        if let Ok(lock) = self.listener.read() {
209            if lock.is_none() {
210                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
211            }
212        }
213
214        let response = self
215            .fetch_specs_from_network(current_specs_info.clone())
216            .await;
217        let result = self.process_spec_data(response).await;
218
219        if result.is_err() && self.fallback_url.is_some() {
220            log_d!(TAG, "Falling back to statsig api");
221            let response = self
222                .handle_fallback_request(
223                    self.get_request_args(&current_specs_info),
224                    current_specs_info,
225                )
226                .await;
227            return self.process_spec_data(response).await;
228        }
229
230        result
231    }
232
233    async fn process_spec_data(
234        &self,
235        response: Result<NetworkResponse, NetworkError>,
236    ) -> Result<(), StatsigErr> {
237        let resp = response.map_err(StatsigErr::NetworkError)?;
238
239        let update = SpecsUpdate {
240            data: resp.data,
241            source: SpecsSource::Network,
242            received_at: Utc::now().timestamp_millis() as u64,
243            source_api: Some(resp.api),
244        };
245
246        self.ops_stats.add_marker(
247            Marker::new(
248                KeyType::DownloadConfigSpecs,
249                ActionType::Start,
250                Some(StepType::Process),
251            ),
252            None,
253        );
254
255        let result = match self.listener.read() {
256            Ok(lock) => match lock.as_ref() {
257                Some(listener) => listener.did_receive_specs_update(update),
258                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
259            },
260            Err(e) => {
261                let err = StatsigErr::LockFailure(format!(
262                    "Failed to acquire read lock on listener: {e}"
263                ));
264                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
265                Err(err)
266            }
267        };
268
269        self.ops_stats.add_marker(
270            Marker::new(
271                KeyType::DownloadConfigSpecs,
272                ActionType::End,
273                Some(StepType::Process),
274            )
275            .with_is_success(result.is_ok()),
276            None,
277        );
278
279        result
280    }
281}
282
283#[async_trait]
284impl SpecsAdapter for StatsigHttpSpecsAdapter {
285    async fn start(
286        self: Arc<Self>,
287        _statsig_runtime: &Arc<StatsigRuntime>,
288    ) -> Result<(), StatsigErr> {
289        let specs_info = match self.listener.read() {
290            Ok(lock) => match lock.as_ref() {
291                Some(listener) => listener.get_current_specs_info(),
292                None => SpecsInfo::empty(),
293            },
294            Err(_) => SpecsInfo::error(),
295        };
296        self.manually_sync_specs(specs_info).await
297    }
298
299    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
300        match self.listener.write() {
301            Ok(mut lock) => *lock = Some(listener),
302            Err(e) => {
303                log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
304            }
305        }
306    }
307
308    async fn schedule_background_sync(
309        self: Arc<Self>,
310        statsig_runtime: &Arc<StatsigRuntime>,
311    ) -> Result<(), StatsigErr> {
312        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
313        let interval_duration = self.sync_interval_duration;
314        let shutdown_notify = self.shutdown_notify.clone();
315
316        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
317            loop {
318                tokio::select! {
319                    () = sleep(interval_duration) => {
320                        if let Some(strong_self) = weak_self.upgrade() {
321                            Self::run_background_sync(strong_self).await;
322                        } else {
323                            log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
324                            break;
325                        }
326                    }
327                    () = rt_shutdown_notify.notified() => {
328                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
329                        break;
330                    },
331                    () = shutdown_notify.notified() => {
332                        log_d!(TAG, "Shutting down specs background sync");
333                        break;
334                    }
335                }
336            }
337        });
338
339        Ok(())
340    }
341
342    async fn shutdown(
343        &self,
344        _timeout: Duration,
345        _statsig_runtime: &Arc<StatsigRuntime>,
346    ) -> Result<(), StatsigErr> {
347        self.shutdown_notify.notify_one();
348        Ok(())
349    }
350
351    fn get_type_name(&self) -> String {
352        stringify!(StatsigHttpSpecsAdapter).to_string()
353    }
354}
355
/// Builds the download URL for `sdk_key` under the endpoint `spec_url`.
///
/// With the `with_shared_dict_compression` feature the result is
/// `{spec_url}/d/{dict_id}/{sdk_key}.json`, where a missing `dict_id` is
/// replaced by `INIT_DICT_ID`. Without the feature the result is
/// `{spec_url}/{sdk_key}.json` and `dict_id` is ignored.
///
/// Trailing `/` characters on `spec_url` are dropped, so an endpoint configured
/// as `https://host/specs/` does not produce `//` in the request path.
#[allow(unused)] // `dict_id` is only read when `with_shared_dict_compression` is enabled.
fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
    let base_url = spec_url.trim_end_matches('/');

    #[cfg(feature = "with_shared_dict_compression")]
    {
        let dict_id = dict_id.unwrap_or(INIT_DICT_ID);
        format!("{base_url}/d/{dict_id}/{sdk_key}.json")
    }
    #[cfg(not(feature = "with_shared_dict_compression"))]
    format!("{base_url}/{sdk_key}.json")
}