// statsig_rust/specs_adapter/statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::{
10    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
11};
12use async_trait::async_trait;
13use chrono::Utc;
14use percent_encoding::percent_encode;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock, Weak};
17use std::time::Duration;
18use tokio::sync::Notify;
19use tokio::time::sleep;
20
21use super::SpecsInfo;
22
/// Endpoint specs are downloaded from when `StatsigOptions::specs_url` is unset.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Background sync period, in milliseconds, used when `specs_sync_interval_ms` is unset.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Tag this adapter passes to the logging macros.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
27pub struct StatsigHttpSpecsAdapter {
28    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
29    network: NetworkClient,
30    sdk_key: String,
31    specs_url: String,
32    fallback_url: Option<String>,
33    sync_interval_duration: Duration,
34    ops_stats: Arc<OpsStatsForInstance>,
35    shutdown_notify: Arc<Notify>,
36}
37
38impl StatsigHttpSpecsAdapter {
39    #[must_use]
40    pub fn new(sdk_key: &str, options: Option<&StatsigOptions>) -> Self {
41        let default_options = StatsigOptions::default();
42        let options_ref = options.unwrap_or(&default_options);
43
44        let specs_url = options_ref
45            .specs_url
46            .as_ref()
47            .map(|u| u.to_string())
48            .unwrap_or(DEFAULT_SPECS_URL.to_string());
49
50        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
51        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
52            && specs_url != DEFAULT_SPECS_URL
53        {
54            Some(DEFAULT_SPECS_URL.to_string())
55        } else {
56            None
57        };
58
59        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
60
61        Self {
62            listener: RwLock::new(None),
63            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
64            sdk_key: sdk_key.to_string(),
65            specs_url,
66            fallback_url,
67            sync_interval_duration: Duration::from_millis(u64::from(
68                options_ref
69                    .specs_sync_interval_ms
70                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
71            )),
72            ops_stats: OPS_STATS.get_for_instance(sdk_key),
73            shutdown_notify: Arc::new(Notify::new()),
74        }
75    }
76
77    pub async fn fetch_specs_from_network(
78        &self,
79        current_specs_info: SpecsInfo,
80    ) -> Result<Vec<u8>, NetworkError> {
81        let request_args = self.get_request_args(&current_specs_info);
82        match self.handle_specs_request(request_args).await {
83            Ok(response) => Ok(response),
84            Err(e) => Err(e),
85        }
86    }
87
88    fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
89        let mut params = HashMap::new();
90        if let Some(lcut) = current_specs_info.lcut {
91            params.insert("sinceTime".to_string(), lcut.to_string());
92        }
93        if let Some(cs) = &current_specs_info.checksum {
94            params.insert(
95                "checksum".to_string(),
96                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
97            );
98        }
99
100        RequestArgs {
101            url: construct_specs_url(
102                self.specs_url.as_str(),
103                self.sdk_key.as_str(),
104                current_specs_info.zstd_dict_id.as_deref(),
105            ),
106            query_params: Some(params),
107            accept_gzip_response: true,
108            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
109            ..RequestArgs::new()
110        }
111    }
112
113    async fn handle_fallback_request(
114        &self,
115        mut request_args: RequestArgs,
116        current_specs_info: SpecsInfo,
117    ) -> Result<Vec<u8>, NetworkError> {
118        let fallback_url = match &self.fallback_url {
119            Some(url) => construct_specs_url(
120                url.as_str(),
121                &self.sdk_key,
122                current_specs_info.zstd_dict_id.as_deref(),
123            ),
124            None => return Err(NetworkError::RequestFailed),
125        };
126
127        request_args.url = fallback_url;
128
129        // TODO logging
130
131        let response = self.handle_specs_request(request_args).await?;
132        Ok(response)
133    }
134
135    // TODO: return a decompressed and parsed SpecsResponse
136    async fn handle_specs_request(
137        &self,
138        request_args: RequestArgs,
139    ) -> Result<Vec<u8>, NetworkError> {
140        let response = self.network.get(request_args).await?;
141        match response.data {
142            Some(data) => Ok(data),
143            None => Err(NetworkError::RequestFailed),
144        }
145    }
146
147    pub async fn run_background_sync(weak_self: &Weak<Self>) {
148        let strong_self = if let Some(s) = weak_self.upgrade() {
149            s
150        } else {
151            log_e!(TAG, "No strong reference found");
152            return;
153        };
154
155        let specs_info = match strong_self.listener.read() {
156            Ok(lock) => match lock.as_ref() {
157                Some(listener) => listener.get_current_specs_info(),
158                None => SpecsInfo::empty(),
159            },
160            Err(_) => SpecsInfo::error(),
161        };
162
163        strong_self
164            .ops_stats
165            .set_diagnostics_context(ContextType::ConfigSync);
166        if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
167            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
168                return;
169            }
170            log_e!(TAG, "Background specs sync failed: {}", e);
171        }
172        strong_self.ops_stats.enqueue_diagnostics_event(
173            Some(KeyType::DownloadConfigSpecs),
174            Some(ContextType::ConfigSync),
175        );
176    }
177
178    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
179        if let Ok(lock) = self.listener.read() {
180            if lock.is_none() {
181                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
182            }
183        }
184
185        let response = self
186            .fetch_specs_from_network(current_specs_info.clone())
187            .await;
188        let result = self.process_spec_data(response).await;
189
190        if result.is_err() && self.fallback_url.is_some() {
191            log_d!(TAG, "Falling back to statsig api");
192            let request_args = self.get_request_args(&current_specs_info);
193            let response = self
194                .handle_fallback_request(request_args, current_specs_info)
195                .await;
196            return self.process_spec_data(response).await;
197        }
198
199        result
200    }
201
202    async fn process_spec_data(
203        &self,
204        response: Result<Vec<u8>, NetworkError>,
205    ) -> Result<(), StatsigErr> {
206        let data = response.map_err(|e| {
207            let msg = "No specs result from network";
208            StatsigErr::NetworkError(e, Some(msg.to_string()))
209        })?;
210
211        let update = SpecsUpdate {
212            data,
213            source: SpecsSource::Network,
214            received_at: Utc::now().timestamp_millis() as u64,
215        };
216
217        self.ops_stats.add_marker(
218            Marker::new(
219                KeyType::DownloadConfigSpecs,
220                ActionType::Start,
221                Some(StepType::Process),
222            ),
223            None,
224        );
225
226        let result = match self.listener.read() {
227            Ok(lock) => match lock.as_ref() {
228                Some(listener) => listener.did_receive_specs_update(update),
229                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
230            },
231            Err(e) => {
232                let err = StatsigErr::LockFailure(format!(
233                    "Failed to acquire read lock on listener: {}",
234                    e
235                ));
236                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
237                Err(err)
238            }
239        };
240
241        self.ops_stats.add_marker(
242            Marker::new(
243                KeyType::DownloadConfigSpecs,
244                ActionType::End,
245                Some(StepType::Process),
246            )
247            .with_is_success(result.is_ok()),
248            None,
249        );
250
251        result
252    }
253}
254
255#[async_trait]
256impl SpecsAdapter for StatsigHttpSpecsAdapter {
257    async fn start(
258        self: Arc<Self>,
259        _statsig_runtime: &Arc<StatsigRuntime>,
260    ) -> Result<(), StatsigErr> {
261        let specs_info = match self.listener.read() {
262            Ok(lock) => match lock.as_ref() {
263                Some(listener) => listener.get_current_specs_info(),
264                None => SpecsInfo::empty(),
265            },
266            Err(_) => SpecsInfo::error(),
267        };
268        self.manually_sync_specs(specs_info).await
269    }
270
271    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
272        match self.listener.write() {
273            Ok(mut lock) => *lock = Some(listener),
274            Err(e) => {
275                log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
276            }
277        }
278    }
279
280    async fn schedule_background_sync(
281        self: Arc<Self>,
282        statsig_runtime: &Arc<StatsigRuntime>,
283    ) -> Result<(), StatsigErr> {
284        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
285        let interval_duration = self.sync_interval_duration;
286        let shutdown_notify = self.shutdown_notify.clone();
287
288        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
289            loop {
290                tokio::select! {
291                    () = sleep(interval_duration) => {
292                        Self::run_background_sync(&weak_self).await;
293                    }
294                    () = rt_shutdown_notify.notified() => {
295                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
296                        break;
297                    },
298                    () = shutdown_notify.notified() => {
299                        log_d!(TAG, "Shutting down specs background sync");
300                        break;
301                    }
302                }
303            }
304        });
305
306        Ok(())
307    }
308
309    async fn shutdown(
310        &self,
311        _timeout: Duration,
312        _statsig_runtime: &Arc<StatsigRuntime>,
313    ) -> Result<(), StatsigErr> {
314        self.shutdown_notify.notify_one();
315        Ok(())
316    }
317
318    fn get_type_name(&self) -> String {
319        stringify!(StatsigHttpSpecsAdapter).to_string()
320    }
321}
322
/// Builds the full download URL for `sdk_key` under `spec_url`.
///
/// With the `with_shared_dict_compression` feature the URL is
/// `{spec_url}/d/{dict_id}/{sdk_key}.json`, where a missing `dict_id` is sent as the literal
/// `null`. Without the feature `dict_id` is ignored and the URL is
/// `{spec_url}/{sdk_key}.json`.
// `dict_id` is only read when shared-dict compression is compiled in, so the lint is
// silenced for exactly that configuration instead of allowing every `unused` lint.
#[cfg_attr(
    not(feature = "with_shared_dict_compression"),
    allow(unused_variables)
)]
fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
    #[cfg(feature = "with_shared_dict_compression")]
    let url = {
        let dict_id = dict_id.unwrap_or("null");
        format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
    };

    #[cfg(not(feature = "with_shared_dict_compression"))]
    let url = format!("{spec_url}/{sdk_key}.json");

    url
}