statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs

1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
5use crate::statsig_err::StatsigErr;
6use crate::statsig_metadata::StatsigMetadata;
7use crate::{log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigRuntime};
8use async_trait::async_trait;
9use chrono::Utc;
10use percent_encoding::percent_encode;
11use std::collections::HashMap;
12use std::sync::{Arc, RwLock, Weak};
13use std::time::Duration;
14use tokio::sync::Notify;
15use tokio::time::sleep;
16
17use super::SpecsInfo;
18
/// Base URL for spec downloads; used when the caller supplies no custom specs URL.
/// The per-key path segment (`/<sdk_key>.json`) is appended when the request URL is built.
19pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Delay between background syncs, in milliseconds, used when no `sync_interval` is given.
20pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
21
// Tag passed as the first argument to the logging macros in this file.
22const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
/// Specs adapter that downloads specs over HTTP and hands them to a registered
/// [`SpecsUpdateListener`], either on demand (`start`) or from a periodic background task.
23pub struct StatsigHttpSpecsAdapter {
    /// Listener that receives downloaded specs; `None` until `initialize` is called.
24    listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
    /// HTTP client used for every specs request.
25    network: NetworkClient,
    /// Fully built URL of the primary specs endpoint.
26    specs_url: String,
    /// Optional URL retried when the primary request exhausts its retries.
27    fallback_url: Option<String>,
    /// Delay between two background sync attempts.
28    sync_interval_duration: Duration,
    /// Per-instance ops stats handle, passed to the error-reporting log macro.
29    ops_stats: Arc<OpsStatsForInstance>,
    /// Notified by `shutdown` to stop the background sync loop.
30    shutdown_notify: Arc<Notify>,
31}
32
33impl StatsigHttpSpecsAdapter {
34    #[must_use]
35    pub fn new(
36        sdk_key: &str,
37        specs_url: Option<&String>,
38        fallback_to_statsig_api: bool,
39        sync_interval: Option<u32>,
40    ) -> Self {
41        let fallback_url = if fallback_to_statsig_api {
42            construct_fallback_specs_url(sdk_key, specs_url)
43        } else {
44            None
45        };
46
47        let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
48
49        Self {
50            listener: RwLock::new(None),
51            network: NetworkClient::new(sdk_key, Some(headers)),
52            specs_url: construct_specs_url(sdk_key, specs_url),
53            fallback_url,
54            sync_interval_duration: Duration::from_millis(u64::from(
55                sync_interval.unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
56            )),
57            ops_stats: OPS_STATS.get_for_instance(sdk_key),
58            shutdown_notify: Arc::new(Notify::new()),
59        }
60    }
61
62    pub async fn fetch_specs_from_network(&self, current_specs_info: SpecsInfo) -> Option<String> {
63        let mut params = HashMap::new();
64        if let Some(lcut) = current_specs_info.lcut {
65            params.insert("sinceTime".to_string(), lcut.to_string());
66        }
67        if let Some(cs) = current_specs_info.checksum {
68            params.insert(
69                "checksum".to_string(),
70                percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
71            );
72        }
73
74        let request_args = RequestArgs {
75            url: self.specs_url.clone(),
76            retries: 2,
77            query_params: Some(params),
78            accept_gzip_response: true,
79            ..RequestArgs::new()
80        };
81
82        match self.network.get(request_args.clone()).await {
83            Ok(response) => Some(response),
84            Err(NetworkError::RetriesExhausted) => self.handle_fallback_request(request_args).await,
85            Err(_) => None,
86        }
87    }
88
89    async fn handle_fallback_request(&self, mut request_args: RequestArgs) -> Option<String> {
90        let fallback_url = match &self.fallback_url {
91            Some(url) => url.clone(),
92            None => return None,
93        };
94
95        request_args.url = fallback_url;
96
97        // TODO logging
98
99        match self.network.get(request_args).await {
100            Ok(response) => Some(response),
101            Err(_) => None,
102        }
103    }
104
105    pub async fn run_background_sync(weak_self: &Weak<Self>) {
106        let strong_self = if let Some(s) = weak_self.upgrade() {
107            s
108        } else {
109            log_e!(TAG, "No strong reference found");
110            return;
111        };
112
113        let specs_info = match strong_self.listener.read() {
114            Ok(lock) => match lock.as_ref() {
115                Some(listener) => listener.get_current_specs_info(),
116                None => SpecsInfo::empty(),
117            },
118            Err(_) => SpecsInfo::error(),
119        };
120
121        if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
122            log_e!(TAG, "Background specs sync failed: {}", e);
123        }
124    }
125
126    async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
127        if let Ok(lock) = self.listener.read() {
128            if lock.is_none() {
129                return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
130            }
131        }
132
133        let response = self.fetch_specs_from_network(current_specs_info).await;
134
135        let data = if let Some(r) = response {
136            r
137        } else {
138            let msg = "No specs result from network";
139            log_e!(TAG, "{}", msg);
140            return Err(StatsigErr::NetworkError(msg.to_string()));
141        };
142
143        let update = SpecsUpdate {
144            data,
145            source: SpecsSource::Network,
146            received_at: Utc::now().timestamp_millis() as u64,
147        };
148
149        match self.listener.read() {
150            Ok(lock) => match lock.as_ref() {
151                Some(listener) => listener.did_receive_specs_update(update),
152                None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
153            },
154            Err(e) => {
155                log_error_to_statsig_and_console!(
156                    &self.ops_stats,
157                    TAG,
158                    "Failed to acquire read lock on listener: {}",
159                    e
160                );
161                Err(StatsigErr::LockFailure(e.to_string()))
162            }
163        }
164    }
165}
166
167#[async_trait]
168impl SpecsAdapter for StatsigHttpSpecsAdapter {
169    async fn start(
170        self: Arc<Self>,
171        _statsig_runtime: &Arc<StatsigRuntime>,
172    ) -> Result<(), StatsigErr> {
173        let specs_info = match self.listener.read() {
174            Ok(lock) => match lock.as_ref() {
175                Some(listener) => listener.get_current_specs_info(),
176                None => SpecsInfo::empty(),
177            },
178            Err(_) => SpecsInfo::error(),
179        };
180        self.manually_sync_specs(specs_info).await
181    }
182
183    fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
184        match self.listener.write() {
185            Ok(mut lock) => *lock = Some(listener),
186            Err(e) => {
187                log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
188            }
189        }
190    }
191
192    async fn schedule_background_sync(
193        self: Arc<Self>,
194        statsig_runtime: &Arc<StatsigRuntime>,
195    ) -> Result<(), StatsigErr> {
196        let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
197        let interval_duration = self.sync_interval_duration;
198        let shutdown_notify = self.shutdown_notify.clone();
199
200        statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
201            loop {
202                tokio::select! {
203                    () = sleep(interval_duration) => {
204                        Self::run_background_sync(&weak_self).await;
205                    }
206                    () = rt_shutdown_notify.notified() => {
207                        log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
208                        break;
209                    },
210                    () = shutdown_notify.notified() => {
211                        log_d!(TAG, "Shutting down specs background sync");
212                        break;
213                    }
214                }
215            }
216        });
217
218        Ok(())
219    }
220
221    async fn shutdown(
222        &self,
223        _timeout: Duration,
224        _statsig_runtime: &Arc<StatsigRuntime>,
225    ) -> Result<(), StatsigErr> {
226        self.shutdown_notify.notify_one();
227        Ok(())
228    }
229
230    fn get_type_name(&self) -> String {
231        stringify!(StatsigHttpSpecsAdapter).to_string()
232    }
233}
234
235fn construct_specs_url(sdk_key: &str, spec_url: Option<&String>) -> String {
236    let base = match spec_url {
237        Some(u) => u,
238        _ => DEFAULT_SPECS_URL,
239    };
240
241    format!("{base}/{sdk_key}.json")
242}
243
244// only fallback when the spec_url is not the DEFAULT_SPECS_URL
245fn construct_fallback_specs_url(sdk_key: &str, spec_url: Option<&String>) -> Option<String> {
246    match spec_url {
247        Some(u) if u != DEFAULT_SPECS_URL => Some(format!("{u}/{sdk_key}.json")),
248        _ => None,
249    }
250}