statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::{
11    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
12};
13use async_trait::async_trait;
14use chrono::Utc;
15use percent_encoding::percent_encode;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock, Weak};
18use std::time::Duration;
19use tokio::sync::Notify;
20use tokio::time::sleep;
21
22use super::{ConfigCompressionMode, SpecsInfo};
23
24pub struct NetworkResponse {
25    pub data: Vec<u8>,
26    pub api: String,
27}
28
29pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
30pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
31pub const INIT_DICT_ID: &str = "null";
32
33const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
34pub struct StatsigHttpSpecsAdapter {
35    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
36    network: NetworkClient,
37    sdk_key: String,
38    specs_url: String,
39    fallback_url: Option<String>,
40    sync_interval_duration: Duration,
41    config_compression_mode: ConfigCompressionMode,
42    ops_stats: Arc<OpsStatsForInstance>,
43    shutdown_notify: Arc<Notify>,
44}
45
46impl StatsigHttpSpecsAdapter {
47    #[must_use]
48    pub fn new(
49        sdk_key: &str,
50        options: Option<&StatsigOptions>,
51        override_url: Option<String>,
52    ) -> Self {
53        let default_options = StatsigOptions::default();
54        let options_ref = options.unwrap_or(&default_options);
55
56        let specs_url = match override_url {
57            Some(url) => url,
58            None => options_ref
59                .specs_url
60                .as_ref()
61                .map(|u| u.to_string())
62                .unwrap_or(DEFAULT_SPECS_URL.to_string()),
63        };
64
65        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
66        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
67            && specs_url != DEFAULT_SPECS_URL
68        {
69            Some(DEFAULT_SPECS_URL.to_string())
70        } else {
71            None
72        };
73
74        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
75
76        Self {
77            listener: RwLock::new(None),
78            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
79            sdk_key: sdk_key.to_string(),
80            specs_url,
81            fallback_url,
82            sync_interval_duration: Duration::from_millis(u64::from(
83                options_ref
84                    .specs_sync_interval_ms
85                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
86            )),
87            ops_stats: OPS_STATS.get_for_instance(sdk_key),
88            shutdown_notify: Arc::new(Notify::new()),
89            config_compression_mode: options_ref
90                .config_compression_mode
91                .clone()
92                .unwrap_or(ConfigCompressionMode::Gzip),
93        }
94    }
95
96    pub async fn fetch_specs_from_network(
97        &self,
98        current_specs_info: SpecsInfo,
99    ) -> Result<NetworkResponse, NetworkError> {
100        let request_args = self.get_request_args(&current_specs_info);
101        let url = request_args.url.clone();
102        match self.handle_specs_request(request_args).await {
103            Ok(response) => Ok(NetworkResponse {
104                data: response,
105                api: get_api_from_url(&url),
106            }),
107            Err(e) => Err(e),
108        }
109    }
110
111    fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
112        let mut params = HashMap::new();
113        if let Some(lcut) = current_specs_info.lcut {
114            params.insert("sinceTime".to_string(), lcut.to_string());
115        }
116        if let Some(cs) = &current_specs_info.checksum {
117            params.insert(
118                "checksum".to_string(),
119                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
120            );
121        }
122
123        RequestArgs {
124            url: construct_specs_url(
125                &self.config_compression_mode,
126                self.specs_url.as_str(),
127                self.sdk_key.as_str(),
128                current_specs_info.zstd_dict_id.as_deref(),
129            ),
130            query_params: Some(params),
131            accept_gzip_response: true,
132            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
133            ..RequestArgs::new()
134        }
135    }
136
137    async fn handle_fallback_request(
138        &self,
139        mut request_args: RequestArgs,
140        current_specs_info: SpecsInfo,
141    ) -> Result<NetworkResponse, NetworkError> {
142        let fallback_url = match &self.fallback_url {
143            Some(url) => construct_specs_url(
144                &self.config_compression_mode,
145                url.as_str(),
146                &self.sdk_key,
147                current_specs_info.zstd_dict_id.as_deref(),
148            ),
149            None => return Err(NetworkError::RequestFailed),
150        };
151
152        request_args.url = fallback_url.clone();
153
154        // TODO logging
155
156        let response = self.handle_specs_request(request_args).await?;
157        Ok(NetworkResponse {
158            data: response,
159            api: get_api_from_url(&fallback_url),
160        })
161    }
162
163    // TODO: return a decompressed and parsed SpecsResponse
164    async fn handle_specs_request(
165        &self,
166        request_args: RequestArgs,
167    ) -> Result<Vec<u8>, NetworkError> {
168        let response = self.network.get(request_args).await?;
169        match response.data {
170            Some(data) => Ok(data),
171            None => Err(NetworkError::RequestFailed),
172        }
173    }
174
175    pub async fn run_background_sync(weak_self: &Weak<Self>) {
176        let strong_self = if let Some(s) = weak_self.upgrade() {
177            s
178        } else {
179            log_e!(TAG, "No strong reference found");
180            return;
181        };
182
183        let specs_info = match strong_self.listener.read() {
184            Ok(lock) => match lock.as_ref() {
185                Some(listener) => listener.get_current_specs_info(),
186                None => SpecsInfo::empty(),
187            },
188            Err(_) => SpecsInfo::error(),
189        };
190
191        strong_self
192            .ops_stats
193            .set_diagnostics_context(ContextType::ConfigSync);
194        if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
195            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
196                return;
197            }
198            log_e!(TAG, "Background specs sync failed: {}", e);
199        }
200        strong_self.ops_stats.enqueue_diagnostics_event(
201            Some(KeyType::DownloadConfigSpecs),
202            Some(ContextType::ConfigSync),
203        );
204    }
205
206    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
207        if let Ok(lock) = self.listener.read() {
208            if lock.is_none() {
209                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
210            }
211        }
212
213        let response = self
214            .fetch_specs_from_network(current_specs_info.clone())
215            .await;
216        let result = self.process_spec_data(response).await;
217
218        if result.is_err() && self.fallback_url.is_some() {
219            log_d!(TAG, "Falling back to statsig api");
220            let response = self
221                .handle_fallback_request(
222                    self.get_request_args(&current_specs_info),
223                    current_specs_info,
224                )
225                .await;
226            return self.process_spec_data(response).await;
227        }
228
229        result
230    }
231
232    async fn process_spec_data(
233        &self,
234        response: Result<NetworkResponse, NetworkError>,
235    ) -> Result<(), StatsigErr> {
236        let resp = response.map_err(|e| {
237            let msg = "No specs result from network";
238            StatsigErr::NetworkError(e, Some(msg.to_string()))
239        })?;
240
241        let update = SpecsUpdate {
242            data: resp.data,
243            source: SpecsSource::Network,
244            received_at: Utc::now().timestamp_millis() as u64,
245            source_api: Some(resp.api),
246        };
247
248        self.ops_stats.add_marker(
249            Marker::new(
250                KeyType::DownloadConfigSpecs,
251                ActionType::Start,
252                Some(StepType::Process),
253            ),
254            None,
255        );
256
257        let result = match self.listener.read() {
258            Ok(lock) => match lock.as_ref() {
259                Some(listener) => listener.did_receive_specs_update(update),
260                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
261            },
262            Err(e) => {
263                let err = StatsigErr::LockFailure(format!(
264                    "Failed to acquire read lock on listener: {}",
265                    e
266                ));
267                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
268                Err(err)
269            }
270        };
271
272        self.ops_stats.add_marker(
273            Marker::new(
274                KeyType::DownloadConfigSpecs,
275                ActionType::End,
276                Some(StepType::Process),
277            )
278            .with_is_success(result.is_ok()),
279            None,
280        );
281
282        result
283    }
284}
285
286#[async_trait]
287impl SpecsAdapter for StatsigHttpSpecsAdapter {
288    async fn start(
289        self: Arc<Self>,
290        _statsig_runtime: &Arc<StatsigRuntime>,
291    ) -> Result<(), StatsigErr> {
292        let specs_info = match self.listener.read() {
293            Ok(lock) => match lock.as_ref() {
294                Some(listener) => listener.get_current_specs_info(),
295                None => SpecsInfo::empty(),
296            },
297            Err(_) => SpecsInfo::error(),
298        };
299        self.manually_sync_specs(specs_info).await
300    }
301
302    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
303        match self.listener.write() {
304            Ok(mut lock) => *lock = Some(listener),
305            Err(e) => {
306                log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
307            }
308        }
309    }
310
311    async fn schedule_background_sync(
312        self: Arc<Self>,
313        statsig_runtime: &Arc<StatsigRuntime>,
314    ) -> Result<(), StatsigErr> {
315        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
316        let interval_duration = self.sync_interval_duration;
317        let shutdown_notify = self.shutdown_notify.clone();
318
319        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
320            loop {
321                tokio::select! {
322                    () = sleep(interval_duration) => {
323                        Self::run_background_sync(&weak_self).await;
324                    }
325                    () = rt_shutdown_notify.notified() => {
326                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
327                        break;
328                    },
329                    () = shutdown_notify.notified() => {
330                        log_d!(TAG, "Shutting down specs background sync");
331                        break;
332                    }
333                }
334            }
335        });
336
337        Ok(())
338    }
339
340    async fn shutdown(
341        &self,
342        _timeout: Duration,
343        _statsig_runtime: &Arc<StatsigRuntime>,
344    ) -> Result<(), StatsigErr> {
345        self.shutdown_notify.notify_one();
346        Ok(())
347    }
348
349    fn get_type_name(&self) -> String {
350        stringify!(StatsigHttpSpecsAdapter).to_string()
351    }
352}
353
354fn construct_specs_url(
355    compression_mode: &ConfigCompressionMode,
356    spec_url: &str,
357    sdk_key: &str,
358    dict_id: Option<&str>,
359) -> String {
360    match compression_mode {
361        ConfigCompressionMode::Gzip => format!("{spec_url}/{sdk_key}.json"),
362        ConfigCompressionMode::Dictionary => {
363            let dict_id = dict_id.unwrap_or(INIT_DICT_ID);
364            format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
365        }
366    }
367}