statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::{
11 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
12};
13use async_trait::async_trait;
14use chrono::Utc;
15use percent_encoding::percent_encode;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock, Weak};
18use std::time::Duration;
19use tokio::sync::Notify;
20use tokio::time::sleep;
21
22use super::SpecsInfo;
23
24pub struct NetworkResponse {
25 pub data: Vec<u8>,
26 pub api: String,
27}
28
29pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
30pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
31#[allow(unused)]
32pub const INIT_DICT_ID: &str = "null";
33
34const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
35pub struct StatsigHttpSpecsAdapter {
36 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
37 network: NetworkClient,
38 sdk_key: String,
39 specs_url: String,
40 fallback_url: Option<String>,
41 sync_interval_duration: Duration,
42 ops_stats: Arc<OpsStatsForInstance>,
43 shutdown_notify: Arc<Notify>,
44}
45
46impl StatsigHttpSpecsAdapter {
47 #[must_use]
48 pub fn new(
49 sdk_key: &str,
50 options: Option<&StatsigOptions>,
51 override_url: Option<String>,
52 ) -> Self {
53 let default_options = StatsigOptions::default();
54 let options_ref = options.unwrap_or(&default_options);
55
56 let specs_url = match override_url {
57 Some(url) => url,
58 None => options_ref
59 .specs_url
60 .as_ref()
61 .map(|u| u.to_string())
62 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
63 };
64
65 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
67 && specs_url != DEFAULT_SPECS_URL
68 {
69 Some(DEFAULT_SPECS_URL.to_string())
70 } else {
71 None
72 };
73
74 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
75
76 Self {
77 listener: RwLock::new(None),
78 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
79 sdk_key: sdk_key.to_string(),
80 specs_url,
81 fallback_url,
82 sync_interval_duration: Duration::from_millis(u64::from(
83 options_ref
84 .specs_sync_interval_ms
85 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
86 )),
87 ops_stats: OPS_STATS.get_for_instance(sdk_key),
88 shutdown_notify: Arc::new(Notify::new()),
89 }
90 }
91
92 pub fn force_shutdown(&self) {
93 self.shutdown_notify.notify_one();
94 }
95
96 pub async fn fetch_specs_from_network(
97 &self,
98 current_specs_info: SpecsInfo,
99 ) -> Result<NetworkResponse, NetworkError> {
100 let request_args = self.get_request_args(¤t_specs_info);
101 let url = request_args.url.clone();
102 match self.handle_specs_request(request_args).await {
103 Ok(response) => Ok(NetworkResponse {
104 data: response,
105 api: get_api_from_url(&url),
106 }),
107 Err(e) => Err(e),
108 }
109 }
110
111 fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
112 let mut params = HashMap::new();
113 if let Some(lcut) = current_specs_info.lcut {
114 params.insert("sinceTime".to_string(), lcut.to_string());
115 }
116 if let Some(cs) = ¤t_specs_info.checksum {
117 params.insert(
118 "checksum".to_string(),
119 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
120 );
121 }
122
123 RequestArgs {
124 url: construct_specs_url(
125 self.specs_url.as_str(),
126 self.sdk_key.as_str(),
127 current_specs_info.zstd_dict_id.as_deref(),
128 ),
129 query_params: Some(params),
130 accept_gzip_response: true,
131 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
132 ..RequestArgs::new()
133 }
134 }
135
136 async fn handle_fallback_request(
137 &self,
138 mut request_args: RequestArgs,
139 current_specs_info: SpecsInfo,
140 ) -> Result<NetworkResponse, NetworkError> {
141 let fallback_url = match &self.fallback_url {
142 Some(url) => construct_specs_url(
143 url.as_str(),
144 &self.sdk_key,
145 current_specs_info.zstd_dict_id.as_deref(),
146 ),
147 None => {
148 return Err(NetworkError::RequestFailed(
149 request_args.url.clone(),
150 0,
151 "No fallback URL".to_string(),
152 ))
153 }
154 };
155
156 request_args.url = fallback_url.clone();
157
158 let response = self.handle_specs_request(request_args).await?;
161 Ok(NetworkResponse {
162 data: response,
163 api: get_api_from_url(&fallback_url),
164 })
165 }
166
167 async fn handle_specs_request(
169 &self,
170 request_args: RequestArgs,
171 ) -> Result<Vec<u8>, NetworkError> {
172 let url = request_args.url.clone();
173 let response = self.network.get(request_args).await?;
174 match response.data {
175 Some(data) => Ok(data),
176 None => Err(NetworkError::RequestFailed(
177 url,
178 0,
179 "No data in response".to_string(),
180 )),
181 }
182 }
183
184 pub async fn run_background_sync(self: Arc<Self>) {
185 let specs_info = match self.listener.read() {
186 Ok(lock) => match lock.as_ref() {
187 Some(listener) => listener.get_current_specs_info(),
188 None => SpecsInfo::empty(),
189 },
190 Err(_) => SpecsInfo::error(),
191 };
192
193 self.ops_stats
194 .set_diagnostics_context(ContextType::ConfigSync);
195 if let Err(e) = self.manually_sync_specs(specs_info).await {
196 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
197 return;
198 }
199 log_e!(TAG, "Background specs sync failed: {}", e);
200 }
201 self.ops_stats.enqueue_diagnostics_event(
202 Some(KeyType::DownloadConfigSpecs),
203 Some(ContextType::ConfigSync),
204 );
205 }
206
207 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
208 if let Ok(lock) = self.listener.read() {
209 if lock.is_none() {
210 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
211 }
212 }
213
214 let response = self
215 .fetch_specs_from_network(current_specs_info.clone())
216 .await;
217 let result = self.process_spec_data(response).await;
218
219 if result.is_err() && self.fallback_url.is_some() {
220 log_d!(TAG, "Falling back to statsig api");
221 let response = self
222 .handle_fallback_request(
223 self.get_request_args(¤t_specs_info),
224 current_specs_info,
225 )
226 .await;
227 return self.process_spec_data(response).await;
228 }
229
230 result
231 }
232
233 async fn process_spec_data(
234 &self,
235 response: Result<NetworkResponse, NetworkError>,
236 ) -> Result<(), StatsigErr> {
237 let resp = response.map_err(StatsigErr::NetworkError)?;
238
239 let update = SpecsUpdate {
240 data: resp.data,
241 source: SpecsSource::Network,
242 received_at: Utc::now().timestamp_millis() as u64,
243 source_api: Some(resp.api),
244 };
245
246 self.ops_stats.add_marker(
247 Marker::new(
248 KeyType::DownloadConfigSpecs,
249 ActionType::Start,
250 Some(StepType::Process),
251 ),
252 None,
253 );
254
255 let result = match self.listener.read() {
256 Ok(lock) => match lock.as_ref() {
257 Some(listener) => listener.did_receive_specs_update(update),
258 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
259 },
260 Err(e) => {
261 let err = StatsigErr::LockFailure(format!(
262 "Failed to acquire read lock on listener: {e}"
263 ));
264 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
265 Err(err)
266 }
267 };
268
269 self.ops_stats.add_marker(
270 Marker::new(
271 KeyType::DownloadConfigSpecs,
272 ActionType::End,
273 Some(StepType::Process),
274 )
275 .with_is_success(result.is_ok()),
276 None,
277 );
278
279 result
280 }
281}
282
283#[async_trait]
284impl SpecsAdapter for StatsigHttpSpecsAdapter {
285 async fn start(
286 self: Arc<Self>,
287 _statsig_runtime: &Arc<StatsigRuntime>,
288 ) -> Result<(), StatsigErr> {
289 let specs_info = match self.listener.read() {
290 Ok(lock) => match lock.as_ref() {
291 Some(listener) => listener.get_current_specs_info(),
292 None => SpecsInfo::empty(),
293 },
294 Err(_) => SpecsInfo::error(),
295 };
296 self.manually_sync_specs(specs_info).await
297 }
298
299 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
300 match self.listener.write() {
301 Ok(mut lock) => *lock = Some(listener),
302 Err(e) => {
303 log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
304 }
305 }
306 }
307
308 async fn schedule_background_sync(
309 self: Arc<Self>,
310 statsig_runtime: &Arc<StatsigRuntime>,
311 ) -> Result<(), StatsigErr> {
312 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
313 let interval_duration = self.sync_interval_duration;
314 let shutdown_notify = self.shutdown_notify.clone();
315
316 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
317 loop {
318 tokio::select! {
319 () = sleep(interval_duration) => {
320 if let Some(strong_self) = weak_self.upgrade() {
321 Self::run_background_sync(strong_self).await;
322 } else {
323 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
324 break;
325 }
326 }
327 () = rt_shutdown_notify.notified() => {
328 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
329 break;
330 },
331 () = shutdown_notify.notified() => {
332 log_d!(TAG, "Shutting down specs background sync");
333 break;
334 }
335 }
336 }
337 });
338
339 Ok(())
340 }
341
342 async fn shutdown(
343 &self,
344 _timeout: Duration,
345 _statsig_runtime: &Arc<StatsigRuntime>,
346 ) -> Result<(), StatsigErr> {
347 self.shutdown_notify.notify_one();
348 Ok(())
349 }
350
351 fn get_type_name(&self) -> String {
352 stringify!(StatsigHttpSpecsAdapter).to_string()
353 }
354}
355
356#[allow(unused)]
357fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
358 #[cfg(feature = "with_shared_dict_compression")]
359 {
360 let dict_id = dict_id.unwrap_or(INIT_DICT_ID);
361 format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
362 }
363 #[cfg(not(feature = "with_shared_dict_compression"))]
364 format!("{spec_url}/{sdk_key}.json")
365}