statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::{
10    log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
11};
12use async_trait::async_trait;
13use chrono::Utc;
14use percent_encoding::percent_encode;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock, Weak};
17use std::time::Duration;
18use tokio::sync::Notify;
19use tokio::time::sleep;
20
21use super::{ConfigCompressionMode, SpecsInfo};
22
/// Base URL that specs are downloaded from when `StatsigOptions::specs_url` is not set.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Delay, in milliseconds, between background sync passes when
/// `StatsigOptions::specs_sync_interval_ms` is not set.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10 * 1_000;

/// Tag attached to every log line emitted by this adapter.
const TAG: &str = "StatsigHttpSpecsAdapter";
27pub struct StatsigHttpSpecsAdapter {
28    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
29    network: NetworkClient,
30    sdk_key: String,
31    specs_url: String,
32    fallback_url: Option<String>,
33    sync_interval_duration: Duration,
34    config_compression_mode: ConfigCompressionMode,
35    ops_stats: Arc<OpsStatsForInstance>,
36    shutdown_notify: Arc<Notify>,
37}
38
39impl StatsigHttpSpecsAdapter {
40    #[must_use]
41    pub fn new(sdk_key: &str, options: Option<&StatsigOptions>) -> Self {
42        let default_options = StatsigOptions::default();
43        let options_ref = options.unwrap_or(&default_options);
44
45        let specs_url = options_ref
46            .specs_url
47            .as_ref()
48            .map(|u| u.to_string())
49            .unwrap_or(DEFAULT_SPECS_URL.to_string());
50
51        // only fallback when the spec_url is not the DEFAULT_SPECS_URL
52        let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
53            && specs_url != DEFAULT_SPECS_URL
54        {
55            Some(DEFAULT_SPECS_URL.to_string())
56        } else {
57            None
58        };
59
60        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
61
62        Self {
63            listener: RwLock::new(None),
64            network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
65            sdk_key: sdk_key.to_string(),
66            specs_url,
67            fallback_url,
68            sync_interval_duration: Duration::from_millis(u64::from(
69                options_ref
70                    .specs_sync_interval_ms
71                    .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
72            )),
73            ops_stats: OPS_STATS.get_for_instance(sdk_key),
74            shutdown_notify: Arc::new(Notify::new()),
75            config_compression_mode: options_ref
76                .config_compression_mode
77                .clone()
78                .unwrap_or(ConfigCompressionMode::Gzip),
79        }
80    }
81
82    pub async fn fetch_specs_from_network(
83        &self,
84        current_specs_info: SpecsInfo,
85    ) -> Result<Vec<u8>, NetworkError> {
86        let request_args = self.get_request_args(&current_specs_info);
87        match self.handle_specs_request(request_args).await {
88            Ok(response) => Ok(response),
89            Err(e) => Err(e),
90        }
91    }
92
93    fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
94        let mut params = HashMap::new();
95        if let Some(lcut) = current_specs_info.lcut {
96            params.insert("sinceTime".to_string(), lcut.to_string());
97        }
98        if let Some(cs) = &current_specs_info.checksum {
99            params.insert(
100                "checksum".to_string(),
101                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
102            );
103        }
104
105        RequestArgs {
106            url: construct_specs_url(
107                &self.config_compression_mode,
108                self.specs_url.as_str(),
109                self.sdk_key.as_str(),
110                current_specs_info.zstd_dict_id.as_deref(),
111            ),
112            query_params: Some(params),
113            accept_gzip_response: true,
114            diagnostics_key: Some(KeyType::DownloadConfigSpecs),
115            ..RequestArgs::new()
116        }
117    }
118
119    async fn handle_fallback_request(
120        &self,
121        mut request_args: RequestArgs,
122        current_specs_info: SpecsInfo,
123    ) -> Result<Vec<u8>, NetworkError> {
124        let fallback_url = match &self.fallback_url {
125            Some(url) => construct_specs_url(
126                &self.config_compression_mode,
127                url.as_str(),
128                &self.sdk_key,
129                current_specs_info.zstd_dict_id.as_deref(),
130            ),
131            None => return Err(NetworkError::RequestFailed),
132        };
133
134        request_args.url = fallback_url;
135
136        // TODO logging
137
138        let response = self.handle_specs_request(request_args).await?;
139        Ok(response)
140    }
141
142    // TODO: return a decompressed and parsed SpecsResponse
143    async fn handle_specs_request(
144        &self,
145        request_args: RequestArgs,
146    ) -> Result<Vec<u8>, NetworkError> {
147        let response = self.network.get(request_args).await?;
148        match response.data {
149            Some(data) => Ok(data),
150            None => Err(NetworkError::RequestFailed),
151        }
152    }
153
154    pub async fn run_background_sync(weak_self: &Weak<Self>) {
155        let strong_self = if let Some(s) = weak_self.upgrade() {
156            s
157        } else {
158            log_e!(TAG, "No strong reference found");
159            return;
160        };
161
162        let specs_info = match strong_self.listener.read() {
163            Ok(lock) => match lock.as_ref() {
164                Some(listener) => listener.get_current_specs_info(),
165                None => SpecsInfo::empty(),
166            },
167            Err(_) => SpecsInfo::error(),
168        };
169
170        strong_self
171            .ops_stats
172            .set_diagnostics_context(ContextType::ConfigSync);
173        if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
174            if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
175                return;
176            }
177            log_e!(TAG, "Background specs sync failed: {}", e);
178        }
179        strong_self.ops_stats.enqueue_diagnostics_event(
180            Some(KeyType::DownloadConfigSpecs),
181            Some(ContextType::ConfigSync),
182        );
183    }
184
185    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
186        if let Ok(lock) = self.listener.read() {
187            if lock.is_none() {
188                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
189            }
190        }
191
192        let response = self
193            .fetch_specs_from_network(current_specs_info.clone())
194            .await;
195        let result = self.process_spec_data(response).await;
196
197        if result.is_err() && self.fallback_url.is_some() {
198            log_d!(TAG, "Falling back to statsig api");
199            let request_args = self.get_request_args(&current_specs_info);
200            let response = self
201                .handle_fallback_request(request_args, current_specs_info)
202                .await;
203            return self.process_spec_data(response).await;
204        }
205
206        result
207    }
208
209    async fn process_spec_data(
210        &self,
211        response: Result<Vec<u8>, NetworkError>,
212    ) -> Result<(), StatsigErr> {
213        let data = response.map_err(|e| {
214            let msg = "No specs result from network";
215            StatsigErr::NetworkError(e, Some(msg.to_string()))
216        })?;
217
218        let update = SpecsUpdate {
219            data,
220            source: SpecsSource::Network,
221            received_at: Utc::now().timestamp_millis() as u64,
222        };
223
224        self.ops_stats.add_marker(
225            Marker::new(
226                KeyType::DownloadConfigSpecs,
227                ActionType::Start,
228                Some(StepType::Process),
229            ),
230            None,
231        );
232
233        let result = match self.listener.read() {
234            Ok(lock) => match lock.as_ref() {
235                Some(listener) => listener.did_receive_specs_update(update),
236                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
237            },
238            Err(e) => {
239                let err = StatsigErr::LockFailure(format!(
240                    "Failed to acquire read lock on listener: {}",
241                    e
242                ));
243                log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
244                Err(err)
245            }
246        };
247
248        self.ops_stats.add_marker(
249            Marker::new(
250                KeyType::DownloadConfigSpecs,
251                ActionType::End,
252                Some(StepType::Process),
253            )
254            .with_is_success(result.is_ok()),
255            None,
256        );
257
258        result
259    }
260}
261
262#[async_trait]
263impl SpecsAdapter for StatsigHttpSpecsAdapter {
264    async fn start(
265        self: Arc<Self>,
266        _statsig_runtime: &Arc<StatsigRuntime>,
267    ) -> Result<(), StatsigErr> {
268        let specs_info = match self.listener.read() {
269            Ok(lock) => match lock.as_ref() {
270                Some(listener) => listener.get_current_specs_info(),
271                None => SpecsInfo::empty(),
272            },
273            Err(_) => SpecsInfo::error(),
274        };
275        self.manually_sync_specs(specs_info).await
276    }
277
278    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
279        match self.listener.write() {
280            Ok(mut lock) => *lock = Some(listener),
281            Err(e) => {
282                log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
283            }
284        }
285    }
286
287    async fn schedule_background_sync(
288        self: Arc<Self>,
289        statsig_runtime: &Arc<StatsigRuntime>,
290    ) -> Result<(), StatsigErr> {
291        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
292        let interval_duration = self.sync_interval_duration;
293        let shutdown_notify = self.shutdown_notify.clone();
294
295        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
296            loop {
297                tokio::select! {
298                    () = sleep(interval_duration) => {
299                        Self::run_background_sync(&weak_self).await;
300                    }
301                    () = rt_shutdown_notify.notified() => {
302                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
303                        break;
304                    },
305                    () = shutdown_notify.notified() => {
306                        log_d!(TAG, "Shutting down specs background sync");
307                        break;
308                    }
309                }
310            }
311        });
312
313        Ok(())
314    }
315
316    async fn shutdown(
317        &self,
318        _timeout: Duration,
319        _statsig_runtime: &Arc<StatsigRuntime>,
320    ) -> Result<(), StatsigErr> {
321        self.shutdown_notify.notify_one();
322        Ok(())
323    }
324
325    fn get_type_name(&self) -> String {
326        stringify!(StatsigHttpSpecsAdapter).to_string()
327    }
328}
329
330fn construct_specs_url(
331    compression_mode: &ConfigCompressionMode,
332    spec_url: &str,
333    sdk_key: &str,
334    dict_id: Option<&str>,
335) -> String {
336    match compression_mode {
337        ConfigCompressionMode::Gzip => format!("{spec_url}/{sdk_key}.json"),
338        ConfigCompressionMode::Dictionary => {
339            let dict_id = dict_id.unwrap_or("null");
340            format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
341        }
342    }
343}