statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::{
11 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
12};
13use async_trait::async_trait;
14use chrono::Utc;
15use parking_lot::RwLock;
16use percent_encoding::percent_encode;
17use std::collections::HashMap;
18use std::sync::{Arc, Weak};
19use std::time::Duration;
20use tokio::sync::Notify;
21use tokio::time::sleep;
22
23use super::SpecsInfo;
24
/// Result of a successful specs download.
///
/// Produced by the adapter's fetch helpers and consumed when building a
/// `SpecsUpdate` for the registered listener.
pub struct NetworkResponse {
    /// Raw response body bytes as returned by the network client.
    pub data: Vec<u8>,
    /// API identifier derived from the request URL via `get_api_from_url`.
    pub api: String,
}
29
/// Base URL used for spec downloads when neither an override URL nor
/// `StatsigOptions::specs_url` is provided; also the fallback target when
/// falling back to the Statsig API is enabled.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Background sync interval (milliseconds) applied when
/// `StatsigOptions::specs_sync_interval_ms` is not set.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Dictionary id placed in the request path when no zstd dictionary id is
/// known yet.
// Only referenced when the `with_shared_dict_compression` feature is enabled,
// hence the lint allowance.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag passed to the logging macros in this module.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
36pub struct StatsigHttpSpecsAdapter {
37 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
38 network: NetworkClient,
39 sdk_key: String,
40 specs_url: String,
41 fallback_url: Option<String>,
42 sync_interval_duration: Duration,
43 ops_stats: Arc<OpsStatsForInstance>,
44 shutdown_notify: Arc<Notify>,
45}
46
47impl StatsigHttpSpecsAdapter {
48 #[must_use]
49 pub fn new(
50 sdk_key: &str,
51 options: Option<&StatsigOptions>,
52 override_url: Option<String>,
53 ) -> Self {
54 let default_options = StatsigOptions::default();
55 let options_ref = options.unwrap_or(&default_options);
56
57 let specs_url = match override_url {
58 Some(url) => url,
59 None => options_ref
60 .specs_url
61 .as_ref()
62 .map(|u| u.to_string())
63 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
64 };
65
66 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
68 && specs_url != DEFAULT_SPECS_URL
69 {
70 Some(DEFAULT_SPECS_URL.to_string())
71 } else {
72 None
73 };
74
75 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
76
77 Self {
78 listener: RwLock::new(None),
79 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
80 sdk_key: sdk_key.to_string(),
81 specs_url,
82 fallback_url,
83 sync_interval_duration: Duration::from_millis(u64::from(
84 options_ref
85 .specs_sync_interval_ms
86 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
87 )),
88 ops_stats: OPS_STATS.get_for_instance(sdk_key),
89 shutdown_notify: Arc::new(Notify::new()),
90 }
91 }
92
93 pub fn force_shutdown(&self) {
94 self.shutdown_notify.notify_one();
95 }
96
97 pub async fn fetch_specs_from_network(
98 &self,
99 current_specs_info: SpecsInfo,
100 ) -> Result<NetworkResponse, NetworkError> {
101 let request_args = self.get_request_args(¤t_specs_info);
102 let url = request_args.url.clone();
103 match self.handle_specs_request(request_args).await {
104 Ok(response) => Ok(NetworkResponse {
105 data: response,
106 api: get_api_from_url(&url),
107 }),
108 Err(e) => Err(e),
109 }
110 }
111
112 fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
113 let mut params = HashMap::new();
114 if let Some(lcut) = current_specs_info.lcut {
115 params.insert("sinceTime".to_string(), lcut.to_string());
116 }
117 if let Some(cs) = ¤t_specs_info.checksum {
118 params.insert(
119 "checksum".to_string(),
120 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
121 );
122 }
123
124 RequestArgs {
125 url: construct_specs_url(
126 self.specs_url.as_str(),
127 self.sdk_key.as_str(),
128 current_specs_info.zstd_dict_id.as_deref(),
129 ),
130 query_params: Some(params),
131 accept_gzip_response: true,
132 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
133 ..RequestArgs::new()
134 }
135 }
136
137 async fn handle_fallback_request(
138 &self,
139 mut request_args: RequestArgs,
140 current_specs_info: SpecsInfo,
141 ) -> Result<NetworkResponse, NetworkError> {
142 let fallback_url = match &self.fallback_url {
143 Some(url) => construct_specs_url(
144 url.as_str(),
145 &self.sdk_key,
146 current_specs_info.zstd_dict_id.as_deref(),
147 ),
148 None => {
149 return Err(NetworkError::RequestFailed(
150 request_args.url.clone(),
151 0,
152 "No fallback URL".to_string(),
153 ))
154 }
155 };
156
157 request_args.url = fallback_url.clone();
158
159 let response = self.handle_specs_request(request_args).await?;
162 Ok(NetworkResponse {
163 data: response,
164 api: get_api_from_url(&fallback_url),
165 })
166 }
167
168 async fn handle_specs_request(
170 &self,
171 request_args: RequestArgs,
172 ) -> Result<Vec<u8>, NetworkError> {
173 let url = request_args.url.clone();
174 let response = self.network.get(request_args).await?;
175 match response.data {
176 Some(data) => Ok(data),
177 None => Err(NetworkError::RequestFailed(
178 url,
179 0,
180 "No data in response".to_string(),
181 )),
182 }
183 }
184
185 pub async fn run_background_sync(self: Arc<Self>) {
186 let specs_info = match self
187 .listener
188 .try_read_for(std::time::Duration::from_secs(5))
189 {
190 Some(lock) => match lock.as_ref() {
191 Some(listener) => listener.get_current_specs_info(),
192 None => SpecsInfo::empty(),
193 },
194 None => SpecsInfo::error(),
195 };
196
197 self.ops_stats
198 .set_diagnostics_context(ContextType::ConfigSync);
199 if let Err(e) = self.manually_sync_specs(specs_info).await {
200 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
201 return;
202 }
203 log_e!(TAG, "Background specs sync failed: {}", e);
204 }
205 self.ops_stats.enqueue_diagnostics_event(
206 Some(KeyType::DownloadConfigSpecs),
207 Some(ContextType::ConfigSync),
208 );
209 }
210
211 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
212 if let Some(lock) = self
213 .listener
214 .try_read_for(std::time::Duration::from_secs(5))
215 {
216 if lock.is_none() {
217 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
218 }
219 }
220
221 let response = self
222 .fetch_specs_from_network(current_specs_info.clone())
223 .await;
224 let result = self.process_spec_data(response).await;
225
226 if result.is_err() && self.fallback_url.is_some() {
227 log_d!(TAG, "Falling back to statsig api");
228 let response = self
229 .handle_fallback_request(
230 self.get_request_args(¤t_specs_info),
231 current_specs_info,
232 )
233 .await;
234 return self.process_spec_data(response).await;
235 }
236
237 result
238 }
239
240 async fn process_spec_data(
241 &self,
242 response: Result<NetworkResponse, NetworkError>,
243 ) -> Result<(), StatsigErr> {
244 let resp = response.map_err(StatsigErr::NetworkError)?;
245
246 let update = SpecsUpdate {
247 data: resp.data,
248 source: SpecsSource::Network,
249 received_at: Utc::now().timestamp_millis() as u64,
250 source_api: Some(resp.api),
251 };
252
253 self.ops_stats.add_marker(
254 Marker::new(
255 KeyType::DownloadConfigSpecs,
256 ActionType::Start,
257 Some(StepType::Process),
258 ),
259 None,
260 );
261
262 let result = match self
263 .listener
264 .try_read_for(std::time::Duration::from_secs(5))
265 {
266 Some(lock) => match lock.as_ref() {
267 Some(listener) => listener.did_receive_specs_update(update),
268 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
269 },
270 None => {
271 let err =
272 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
273 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
274 Err(err)
275 }
276 };
277
278 self.ops_stats.add_marker(
279 Marker::new(
280 KeyType::DownloadConfigSpecs,
281 ActionType::End,
282 Some(StepType::Process),
283 )
284 .with_is_success(result.is_ok()),
285 None,
286 );
287
288 result
289 }
290}
291
292#[async_trait]
293impl SpecsAdapter for StatsigHttpSpecsAdapter {
294 async fn start(
295 self: Arc<Self>,
296 _statsig_runtime: &Arc<StatsigRuntime>,
297 ) -> Result<(), StatsigErr> {
298 let specs_info = match self
299 .listener
300 .try_read_for(std::time::Duration::from_secs(5))
301 {
302 Some(lock) => match lock.as_ref() {
303 Some(listener) => listener.get_current_specs_info(),
304 None => SpecsInfo::empty(),
305 },
306 None => SpecsInfo::error(),
307 };
308 self.manually_sync_specs(specs_info).await
309 }
310
311 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
312 match self
313 .listener
314 .try_write_for(std::time::Duration::from_secs(5))
315 {
316 Some(mut lock) => *lock = Some(listener),
317 None => {
318 log_e!(TAG, "Failed to acquire write lock on listener");
319 }
320 }
321 }
322
323 async fn schedule_background_sync(
324 self: Arc<Self>,
325 statsig_runtime: &Arc<StatsigRuntime>,
326 ) -> Result<(), StatsigErr> {
327 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
328 let interval_duration = self.sync_interval_duration;
329 let shutdown_notify = self.shutdown_notify.clone();
330
331 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
332 loop {
333 tokio::select! {
334 () = sleep(interval_duration) => {
335 if let Some(strong_self) = weak_self.upgrade() {
336 Self::run_background_sync(strong_self).await;
337 } else {
338 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
339 break;
340 }
341 }
342 () = rt_shutdown_notify.notified() => {
343 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
344 break;
345 },
346 () = shutdown_notify.notified() => {
347 log_d!(TAG, "Shutting down specs background sync");
348 break;
349 }
350 }
351 }
352 })?;
353
354 Ok(())
355 }
356
357 async fn shutdown(
358 &self,
359 _timeout: Duration,
360 _statsig_runtime: &Arc<StatsigRuntime>,
361 ) -> Result<(), StatsigErr> {
362 self.shutdown_notify.notify_one();
363 Ok(())
364 }
365
366 fn get_type_name(&self) -> String {
367 stringify!(StatsigHttpSpecsAdapter).to_string()
368 }
369}
370
/// Builds the full specs download URL for `sdk_key` under `spec_url`.
///
/// Trailing `/` characters on `spec_url` are ignored, so a custom base URL
/// written with or without a trailing slash produces the same request path.
///
/// - Without the `with_shared_dict_compression` feature:
///   `{spec_url}/{sdk_key}.json`.
/// - With the feature: `{spec_url}/d/{dict_id}/{sdk_key}.json`, where a
///   missing `dict_id` is replaced by `INIT_DICT_ID`.
// `dict_id` is only read when `with_shared_dict_compression` is enabled.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
    // Avoid emitting `//` in the path when the base URL ends with a slash.
    let base_url = spec_url.trim_end_matches('/');

    #[cfg(feature = "with_shared_dict_compression")]
    {
        let dict_id = dict_id.unwrap_or(INIT_DICT_ID);
        format!("{base_url}/d/{dict_id}/{sdk_key}.json")
    }
    #[cfg(not(feature = "with_shared_dict_compression"))]
    format!("{base_url}/{sdk_key}.json")
}