// statsig_rust/specs_adapter/statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::{log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigRuntime};
10use async_trait::async_trait;
11use chrono::Utc;
12use percent_encoding::percent_encode;
13use std::collections::HashMap;
14use std::sync::{Arc, RwLock, Weak};
15use std::time::Duration;
16use tokio::sync::Notify;
17use tokio::time::sleep;
18
19use super::SpecsInfo;
20
/// Base URL that specs are downloaded from when the caller does not supply one.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Background sync interval, in milliseconds, used when the caller does not supply one.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Tag handed to the logging macros used throughout this file.
const TAG: &str = "StatsigHttpSpecsAdapter";
25pub struct StatsigHttpSpecsAdapter {
26    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
27    network: NetworkClient,
28    sdk_key: String,
29    specs_url: String,
30    fallback_url: Option<String>,
31    sync_interval_duration: Duration,
32    ops_stats: Arc<OpsStatsForInstance>,
33    shutdown_notify: Arc<Notify>,
34}
35
36impl StatsigHttpSpecsAdapter {
37    #[must_use]
38    pub fn new(
39        sdk_key: &str,
40        specs_url: Option<&String>,
41        fallback_to_statsig_api: bool,
42        sync_interval: Option<u32>,
43        disable_network: Option<bool>,
44    ) -> Self {
45        let specs_url = specs_url
46            .map(|u| u.to_string())
47            .unwrap_or(DEFAULT_SPECS_URL.to_string());
48        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
49        let fallback_url = if fallback_to_statsig_api && specs_url != DEFAULT_SPECS_URL {
50            Some(DEFAULT_SPECS_URL.to_string())
51        } else {
52            None
53        };
54
55        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
56
57        Self {
58            listener: RwLock::new(None),
59            network: NetworkClient::new(sdk_key, Some(headers), disable_network),
60            sdk_key: sdk_key.to_string(),
61            specs_url,
62            fallback_url,
63            sync_interval_duration: Duration::from_millis(u64::from(
64                sync_interval.unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
65            )),
66            ops_stats: OPS_STATS.get_for_instance(sdk_key),
67            shutdown_notify: Arc::new(Notify::new()),
68        }
69    }
70
71    pub async fn fetch_specs_from_network(
72        &self,
73        current_specs_info: SpecsInfo,
74    ) -> Result<Vec<u8>, NetworkError> {
75        let request_args = self.get_request_args(&current_specs_info);
76        match self.handle_specs_request(request_args).await {
77            Ok(response) => Ok(response),
78            Err(e) => Err(e),
79        }
80    }
81
82    fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
83        let mut params = HashMap::new();
84        if let Some(lcut) = current_specs_info.lcut {
85            params.insert("sinceTime".to_string(), lcut.to_string());
86        }
87        if let Some(cs) = &current_specs_info.checksum {
88            params.insert(
89                "checksum".to_string(),
90                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
91            );
92        }
93
94        RequestArgs {
95            url: construct_specs_url(
96                self.specs_url.as_str(),
97                self.sdk_key.as_str(),
98                current_specs_info.zstd_dict_id.as_deref(),
99            ),
100            query_params: Some(params),
101            accept_gzip_response: true,
102            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
103            ..RequestArgs::new()
104        }
105    }
106
107    async fn handle_fallback_request(
108        &self,
109        mut request_args: RequestArgs,
110        current_specs_info: SpecsInfo,
111    ) -> Result<Vec<u8>, NetworkError> {
112        let fallback_url = match &self.fallback_url {
113            Some(url) => construct_specs_url(
114                url.as_str(),
115                &self.sdk_key,
116                current_specs_info.zstd_dict_id.as_deref(),
117            ),
118            None => return Err(NetworkError::RequestFailed),
119        };
120
121        request_args.url = fallback_url;
122
123        // TODO logging
124
125        let response = self.handle_specs_request(request_args).await?;
126        Ok(response)
127    }
128
129    // TODO: return a decompressed and parsed SpecsResponse
130    async fn handle_specs_request(
131        &self,
132        request_args: RequestArgs,
133    ) -> Result<Vec<u8>, NetworkError> {
134        let response = self.network.get(request_args).await?;
135        match response.data {
136            Some(data) => Ok(data),
137            None => Err(NetworkError::RequestFailed),
138        }
139    }
140
141    pub async fn run_background_sync(weak_self: &Weak<Self>) {
142        let strong_self = if let Some(s) = weak_self.upgrade() {
143            s
144        } else {
145            log_e!(TAG, "No strong reference found");
146            return;
147        };
148
149        let specs_info = match strong_self.listener.read() {
150            Ok(lock) => match lock.as_ref() {
151                Some(listener) => listener.get_current_specs_info(),
152                None => SpecsInfo::empty(),
153            },
154            Err(_) => SpecsInfo::error(),
155        };
156
157        strong_self
158            .ops_stats
159            .set_diagnostics_context(ContextType::ConfigSync);
160        if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
161            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
162                return;
163            }
164            log_e!(TAG, "Background specs sync failed: {}", e);
165        }
166        strong_self.ops_stats.enqueue_diagnostics_event(
167            Some(KeyType::DownloadConfigSpecs),
168            Some(ContextType::ConfigSync),
169        );
170    }
171
172    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
173        if let Ok(lock) = self.listener.read() {
174            if lock.is_none() {
175                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
176            }
177        }
178
179        let response = self
180            .fetch_specs_from_network(current_specs_info.clone())
181            .await;
182        let result = self.process_spec_data(response).await;
183
184        if result.is_err() && self.fallback_url.is_some() {
185            log_d!(TAG, "Falling back to statsig api");
186            let request_args = self.get_request_args(&current_specs_info);
187            let response = self
188                .handle_fallback_request(request_args, current_specs_info)
189                .await;
190            return self.process_spec_data(response).await;
191        }
192
193        result
194    }
195
196    async fn process_spec_data(
197        &self,
198        response: Result<Vec<u8>, NetworkError>,
199    ) -> Result<(), StatsigErr> {
200        let data = response.map_err(|e| {
201            let msg = "No specs result from network";
202            StatsigErr::NetworkError(e, Some(msg.to_string()))
203        })?;
204
205        let update = SpecsUpdate {
206            data,
207            source: SpecsSource::Network,
208            received_at: Utc::now().timestamp_millis() as u64,
209        };
210
211        self.ops_stats.add_marker(
212            Marker::new(
213                KeyType::DownloadConfigSpecs,
214                ActionType::Start,
215                Some(StepType::Process),
216            ),
217            None,
218        );
219
220        let result = match self.listener.read() {
221            Ok(lock) => match lock.as_ref() {
222                Some(listener) => listener.did_receive_specs_update(update),
223                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
224            },
225            Err(e) => {
226                let err = StatsigErr::LockFailure(format!(
227                    "Failed to acquire read lock on listener: {}",
228                    e
229                ));
230                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
231                Err(err)
232            }
233        };
234
235        self.ops_stats.add_marker(
236            Marker::new(
237                KeyType::DownloadConfigSpecs,
238                ActionType::End,
239                Some(StepType::Process),
240            )
241            .with_is_success(result.is_ok()),
242            None,
243        );
244
245        result
246    }
247}
248
249#[async_trait]
250impl SpecsAdapter for StatsigHttpSpecsAdapter {
251    async fn start(
252        self: Arc<Self>,
253        _statsig_runtime: &Arc<StatsigRuntime>,
254    ) -> Result<(), StatsigErr> {
255        let specs_info = match self.listener.read() {
256            Ok(lock) => match lock.as_ref() {
257                Some(listener) => listener.get_current_specs_info(),
258                None => SpecsInfo::empty(),
259            },
260            Err(_) => SpecsInfo::error(),
261        };
262        self.manually_sync_specs(specs_info).await
263    }
264
265    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
266        match self.listener.write() {
267            Ok(mut lock) => *lock = Some(listener),
268            Err(e) => {
269                log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
270            }
271        }
272    }
273
274    async fn schedule_background_sync(
275        self: Arc<Self>,
276        statsig_runtime: &Arc<StatsigRuntime>,
277    ) -> Result<(), StatsigErr> {
278        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
279        let interval_duration = self.sync_interval_duration;
280        let shutdown_notify = self.shutdown_notify.clone();
281
282        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
283            loop {
284                tokio::select! {
285                    () = sleep(interval_duration) => {
286                        Self::run_background_sync(&weak_self).await;
287                    }
288                    () = rt_shutdown_notify.notified() => {
289                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
290                        break;
291                    },
292                    () = shutdown_notify.notified() => {
293                        log_d!(TAG, "Shutting down specs background sync");
294                        break;
295                    }
296                }
297            }
298        });
299
300        Ok(())
301    }
302
303    async fn shutdown(
304        &self,
305        _timeout: Duration,
306        _statsig_runtime: &Arc<StatsigRuntime>,
307    ) -> Result<(), StatsigErr> {
308        self.shutdown_notify.notify_one();
309        Ok(())
310    }
311
312    fn get_type_name(&self) -> String {
313        stringify!(StatsigHttpSpecsAdapter).to_string()
314    }
315}
316
/// Builds the URL of the specs file for `sdk_key` under the base URL `spec_url`.
///
/// * Without the `with_shared_dict_compression` feature the result is
///   `{spec_url}/{sdk_key}.json` and `dict_id` is ignored.
/// * With the feature the result is `{spec_url}/d/{dict_id}/{sdk_key}.json`, where a
///   missing `dict_id` is written as the literal `null`.
fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
    // `cfg!` keeps both branches type-checked and `dict_id` used in every build, so
    // no lint suppression is needed on this function.
    if cfg!(feature = "with_shared_dict_compression") {
        let dict_id = dict_id.unwrap_or("null");
        format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
    } else {
        format!("{spec_url}/{sdk_key}.json")
    }
}