1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::{Arc, Weak};
20use std::time::Duration;
21use tokio::sync::Notify;
22use tokio::time::sleep;
23
24use super::SpecsInfo;
25
26pub struct NetworkResponse {
27 pub data: ResponseData,
28 pub api: String,
29}
30
31pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
32pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
33
34#[allow(unused)]
35pub const INIT_DICT_ID: &str = "null";
36
37const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
38pub struct StatsigHttpSpecsAdapter {
39 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
40 network: NetworkClient,
41 sdk_key: String,
42 specs_url: String,
43 fallback_url: Option<String>,
44 init_timeout_ms: u64,
45 sync_interval_duration: Duration,
46 ops_stats: Arc<OpsStatsForInstance>,
47 shutdown_notify: Arc<Notify>,
48}
49
50impl StatsigHttpSpecsAdapter {
51 #[must_use]
52 pub fn new(
53 sdk_key: &str,
54 options: Option<&StatsigOptions>,
55 override_url: Option<String>,
56 ) -> Self {
57 let default_options = StatsigOptions::default();
58 let options_ref = options.unwrap_or(&default_options);
59
60 let init_timeout_ms = options_ref
61 .init_timeout_ms
62 .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
63
64 let specs_url = match override_url {
65 Some(url) => url,
66 None => options_ref
67 .specs_url
68 .as_ref()
69 .map(|u| u.to_string())
70 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
71 };
72
73 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
75 && specs_url != DEFAULT_SPECS_URL
76 {
77 Some(DEFAULT_SPECS_URL.to_string())
78 } else {
79 None
80 };
81
82 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
83
84 Self {
85 listener: RwLock::new(None),
86 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
87 sdk_key: sdk_key.to_string(),
88 specs_url,
89 fallback_url,
90 init_timeout_ms,
91 sync_interval_duration: Duration::from_millis(u64::from(
92 options_ref
93 .specs_sync_interval_ms
94 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
95 )),
96 ops_stats: OPS_STATS.get_for_instance(sdk_key),
97 shutdown_notify: Arc::new(Notify::new()),
98 }
99 }
100
101 pub fn force_shutdown(&self) {
102 self.shutdown_notify.notify_one();
103 }
104
105 pub async fn fetch_specs_from_network(
106 &self,
107 current_specs_info: SpecsInfo,
108 trigger: SpecsSyncTrigger,
109 ) -> Result<NetworkResponse, NetworkError> {
110 let request_args = self.get_request_args(¤t_specs_info, trigger);
111 let url = request_args.url.clone();
112 match self.handle_specs_request(request_args).await {
113 Ok(response) => Ok(NetworkResponse {
114 data: response,
115 api: get_api_from_url(&url),
116 }),
117 Err(e) => Err(e),
118 }
119 }
120
121 fn get_request_args(
122 &self,
123 current_specs_info: &SpecsInfo,
124 trigger: SpecsSyncTrigger,
125 ) -> RequestArgs {
126 let mut params = HashMap::new();
127
128 params.insert("supports_proto".to_string(), "true".to_string());
129 let headers = Some(HashMap::from([
130 ("statsig-supports-proto".to_string(), "true".to_string()),
131 (
132 "accept-encoding".to_string(),
133 "statsig-br, gzip, deflate, br".to_string(),
134 ),
135 ]));
136
137 if let Some(lcut) = current_specs_info.lcut {
138 if lcut > 0 {
139 params.insert("sinceTime".to_string(), lcut.to_string());
140 }
141 }
142
143 let is_init_request = trigger == SpecsSyncTrigger::Initial;
144
145 let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
146 self.init_timeout_ms
147 } else {
148 0
149 };
150
151 if let Some(cs) = ¤t_specs_info.checksum {
152 params.insert(
153 "checksum".to_string(),
154 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
155 );
156 }
157
158 RequestArgs {
159 url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
160 retries: match trigger {
161 SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
162 SpecsSyncTrigger::Background => 3,
163 },
164 query_params: Some(params),
165 accept_gzip_response: true,
166 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
167 timeout_ms,
168 headers,
169 ..RequestArgs::new()
170 }
171 }
172
173 async fn handle_fallback_request(
174 &self,
175 mut request_args: RequestArgs,
176 ) -> Result<NetworkResponse, NetworkError> {
177 let fallback_url = match &self.fallback_url {
178 Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
179 None => {
180 return Err(NetworkError::RequestFailed(
181 request_args.url.clone(),
182 None,
183 "No fallback URL".to_string(),
184 ))
185 }
186 };
187
188 request_args.url = fallback_url.clone();
189
190 let response = self.handle_specs_request(request_args).await?;
193 Ok(NetworkResponse {
194 data: response,
195 api: get_api_from_url(&fallback_url),
196 })
197 }
198
199 async fn handle_specs_request(
200 &self,
201 request_args: RequestArgs,
202 ) -> Result<ResponseData, NetworkError> {
203 let url = request_args.url.clone();
204 let response = self.network.get(request_args).await?;
205 match response.data {
206 Some(data) => Ok(data),
207 None => Err(NetworkError::RequestFailed(
208 url,
209 None,
210 response.error.unwrap_or("No data in response".to_string()),
211 )),
212 }
213 }
214
215 pub async fn run_background_sync(self: Arc<Self>) {
216 let specs_info = match self
217 .listener
218 .try_read_for(std::time::Duration::from_secs(5))
219 {
220 Some(lock) => match lock.as_ref() {
221 Some(listener) => listener.get_current_specs_info(),
222 None => SpecsInfo::empty(),
223 },
224 None => SpecsInfo::error(),
225 };
226
227 self.ops_stats
228 .set_diagnostics_context(ContextType::ConfigSync);
229 if let Err(e) = self
230 .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
231 .await
232 {
233 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
234 return;
235 }
236 log_e!(TAG, "Background specs sync failed: {}", e);
237 }
238 self.ops_stats.enqueue_diagnostics_event(
239 Some(KeyType::DownloadConfigSpecs),
240 Some(ContextType::ConfigSync),
241 );
242 }
243
244 async fn manually_sync_specs(
245 &self,
246 current_specs_info: SpecsInfo,
247 trigger: SpecsSyncTrigger,
248 ) -> Result<(), StatsigErr> {
249 if let Some(lock) = self
250 .listener
251 .try_read_for(std::time::Duration::from_secs(5))
252 {
253 if lock.is_none() {
254 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
255 }
256 }
257
258 let response = self
259 .fetch_specs_from_network(current_specs_info.clone(), trigger)
260 .await;
261 let result = self.process_spec_data(response).await;
262
263 if result.is_err() && self.fallback_url.is_some() {
264 log_d!(TAG, "Falling back to statsig api");
265 let response = self
266 .handle_fallback_request(self.get_request_args(¤t_specs_info, trigger))
267 .await;
268 return self.process_spec_data(response).await;
269 }
270
271 result
272 }
273
274 async fn process_spec_data(
275 &self,
276 response: Result<NetworkResponse, NetworkError>,
277 ) -> Result<(), StatsigErr> {
278 let resp = response.map_err(StatsigErr::NetworkError)?;
279
280 let update = SpecsUpdate {
281 data: resp.data,
282 source: SpecsSource::Network,
283 received_at: Utc::now().timestamp_millis() as u64,
284 source_api: Some(resp.api),
285 };
286
287 self.ops_stats.add_marker(
288 Marker::new(
289 KeyType::DownloadConfigSpecs,
290 ActionType::Start,
291 Some(StepType::Process),
292 ),
293 None,
294 );
295
296 let result = match self
297 .listener
298 .try_read_for(std::time::Duration::from_secs(5))
299 {
300 Some(lock) => match lock.as_ref() {
301 Some(listener) => listener.did_receive_specs_update(update),
302 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
303 },
304 None => {
305 let err =
306 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
307 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
308 Err(err)
309 }
310 };
311
312 self.ops_stats.add_marker(
313 Marker::new(
314 KeyType::DownloadConfigSpecs,
315 ActionType::End,
316 Some(StepType::Process),
317 )
318 .with_is_success(result.is_ok()),
319 None,
320 );
321
322 result
323 }
324}
325
326#[async_trait]
327impl SpecsAdapter for StatsigHttpSpecsAdapter {
328 async fn start(
329 self: Arc<Self>,
330 _statsig_runtime: &Arc<StatsigRuntime>,
331 ) -> Result<(), StatsigErr> {
332 let specs_info = match self
333 .listener
334 .try_read_for(std::time::Duration::from_secs(5))
335 {
336 Some(lock) => match lock.as_ref() {
337 Some(listener) => listener.get_current_specs_info(),
338 None => SpecsInfo::empty(),
339 },
340 None => SpecsInfo::error(),
341 };
342 self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
343 .await
344 }
345
346 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
347 match self
348 .listener
349 .try_write_for(std::time::Duration::from_secs(5))
350 {
351 Some(mut lock) => *lock = Some(listener),
352 None => {
353 log_e!(TAG, "Failed to acquire write lock on listener");
354 }
355 }
356 }
357
358 async fn schedule_background_sync(
359 self: Arc<Self>,
360 statsig_runtime: &Arc<StatsigRuntime>,
361 ) -> Result<(), StatsigErr> {
362 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
363 let interval_duration = self.sync_interval_duration;
364 let shutdown_notify = self.shutdown_notify.clone();
365
366 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
367 loop {
368 tokio::select! {
369 () = sleep(interval_duration) => {
370 if let Some(strong_self) = weak_self.upgrade() {
371 Self::run_background_sync(strong_self).await;
372 } else {
373 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
374 break;
375 }
376 }
377 () = rt_shutdown_notify.notified() => {
378 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
379 break;
380 },
381 () = shutdown_notify.notified() => {
382 log_d!(TAG, "Shutting down specs background sync");
383 break;
384 }
385 }
386 }
387 })?;
388
389 Ok(())
390 }
391
392 async fn shutdown(
393 &self,
394 _timeout: Duration,
395 _statsig_runtime: &Arc<StatsigRuntime>,
396 ) -> Result<(), StatsigErr> {
397 self.shutdown_notify.notify_one();
398 Ok(())
399 }
400
401 fn get_type_name(&self) -> String {
402 stringify!(StatsigHttpSpecsAdapter).to_string()
403 }
404}
405
406#[allow(unused)]
407fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
408 format!("{spec_url}/{sdk_key}.json")
409}
410
411#[derive(Debug, Clone, Copy, PartialEq, Eq)]
412pub enum SpecsSyncTrigger {
413 Initial,
414 Background,
415 Manual,
416}