1use crate::networking::{NetworkClient, NetworkError, RequestArgs, ResponseData};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::utils::get_api_from_url;
10use crate::DEFAULT_INIT_TIMEOUT_MS;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
13};
14use async_trait::async_trait;
15use chrono::Utc;
16use parking_lot::RwLock;
17use percent_encoding::percent_encode;
18use std::collections::HashMap;
19use std::sync::{Arc, Weak};
20use std::time::Duration;
21use tokio::sync::Notify;
22use tokio::time::sleep;
23
24use super::SpecsInfo;
25
26pub struct NetworkResponse {
27 pub data: ResponseData,
28 pub api: String,
29}
30
/// Endpoint used when neither an override URL nor `StatsigOptions::specs_url` is given.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Delay between background syncs when `specs_sync_interval_ms` is not configured.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

// NOTE(review): not referenced anywhere in this file; presumably a placeholder
// dictionary id consumed elsewhere in the crate — confirm against callers.
#[allow(unused)]
pub const INIT_DICT_ID: &str = "null";

/// Tag attached to every log line emitted by this adapter.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
38pub struct StatsigHttpSpecsAdapter {
39 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
40 network: NetworkClient,
41 sdk_key: String,
42 specs_url: String,
43 fallback_url: Option<String>,
44 init_timeout_ms: u64,
45 sync_interval_duration: Duration,
46 ops_stats: Arc<OpsStatsForInstance>,
47 shutdown_notify: Arc<Notify>,
48 enable_proto_spec_support: bool,
49}
50
51impl StatsigHttpSpecsAdapter {
52 #[must_use]
53 pub fn new(
54 sdk_key: &str,
55 options: Option<&StatsigOptions>,
56 override_url: Option<String>,
57 ) -> Self {
58 let default_options = StatsigOptions::default();
59 let options_ref = options.unwrap_or(&default_options);
60
61 let init_timeout_ms = options_ref
62 .init_timeout_ms
63 .unwrap_or(DEFAULT_INIT_TIMEOUT_MS);
64
65 let specs_url = match override_url {
66 Some(url) => url,
67 None => options_ref
68 .specs_url
69 .as_ref()
70 .map(|u| u.to_string())
71 .unwrap_or(DEFAULT_SPECS_URL.to_string()),
72 };
73
74 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
76 && specs_url != DEFAULT_SPECS_URL
77 {
78 Some(DEFAULT_SPECS_URL.to_string())
79 } else {
80 None
81 };
82
83 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
84
85 let enable_proto_spec_support = options_ref
86 .experimental_flags
87 .as_ref()
88 .is_some_and(|flags| flags.contains("enable_proto_spec_support"));
89
90 Self {
91 listener: RwLock::new(None),
92 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
93 sdk_key: sdk_key.to_string(),
94 specs_url,
95 fallback_url,
96 init_timeout_ms,
97 sync_interval_duration: Duration::from_millis(u64::from(
98 options_ref
99 .specs_sync_interval_ms
100 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
101 )),
102 ops_stats: OPS_STATS.get_for_instance(sdk_key),
103 shutdown_notify: Arc::new(Notify::new()),
104 enable_proto_spec_support,
105 }
106 }
107
108 pub fn force_shutdown(&self) {
109 self.shutdown_notify.notify_one();
110 }
111
112 pub async fn fetch_specs_from_network(
113 &self,
114 current_specs_info: SpecsInfo,
115 trigger: SpecsSyncTrigger,
116 ) -> Result<NetworkResponse, NetworkError> {
117 let request_args = self.get_request_args(¤t_specs_info, trigger);
118 let url = request_args.url.clone();
119 match self.handle_specs_request(request_args).await {
120 Ok(response) => Ok(NetworkResponse {
121 data: response,
122 api: get_api_from_url(&url),
123 }),
124 Err(e) => Err(e),
125 }
126 }
127
128 fn get_request_args(
129 &self,
130 current_specs_info: &SpecsInfo,
131 trigger: SpecsSyncTrigger,
132 ) -> RequestArgs {
133 let mut params = HashMap::new();
134 let mut headers = None;
135
136 if self.enable_proto_spec_support {
137 params.insert("supports_proto".to_string(), "true".to_string());
138 headers = Some(HashMap::from([
139 ("statsig-supports-proto".to_string(), "true".to_string()),
140 (
141 "accept-encoding".to_string(),
142 "statsig-br, gzip, deflate, br".to_string(),
143 ),
144 ]));
145 }
146
147 if let Some(lcut) = current_specs_info.lcut {
148 if lcut > 0 {
149 params.insert("sinceTime".to_string(), lcut.to_string());
150 }
151 }
152
153 let is_init_request = trigger == SpecsSyncTrigger::Initial;
154
155 let timeout_ms = if is_init_request && self.init_timeout_ms > 0 {
156 self.init_timeout_ms
157 } else {
158 0
159 };
160
161 if let Some(cs) = ¤t_specs_info.checksum {
162 params.insert(
163 "checksum".to_string(),
164 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
165 );
166 }
167
168 RequestArgs {
169 url: construct_specs_url(self.specs_url.as_str(), self.sdk_key.as_str()),
170 retries: match trigger {
171 SpecsSyncTrigger::Initial | SpecsSyncTrigger::Manual => 0,
172 SpecsSyncTrigger::Background => 3,
173 },
174 query_params: Some(params),
175 accept_gzip_response: true,
176 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
177 timeout_ms,
178 headers,
179 ..RequestArgs::new()
180 }
181 }
182
183 async fn handle_fallback_request(
184 &self,
185 mut request_args: RequestArgs,
186 ) -> Result<NetworkResponse, NetworkError> {
187 let fallback_url = match &self.fallback_url {
188 Some(url) => construct_specs_url(url.as_str(), &self.sdk_key),
189 None => {
190 return Err(NetworkError::RequestFailed(
191 request_args.url.clone(),
192 None,
193 "No fallback URL".to_string(),
194 ))
195 }
196 };
197
198 request_args.url = fallback_url.clone();
199
200 let response = self.handle_specs_request(request_args).await?;
203 Ok(NetworkResponse {
204 data: response,
205 api: get_api_from_url(&fallback_url),
206 })
207 }
208
209 async fn handle_specs_request(
210 &self,
211 request_args: RequestArgs,
212 ) -> Result<ResponseData, NetworkError> {
213 let url = request_args.url.clone();
214 let response = self.network.get(request_args).await?;
215 match response.data {
216 Some(data) => Ok(data),
217 None => Err(NetworkError::RequestFailed(
218 url,
219 None,
220 response.error.unwrap_or("No data in response".to_string()),
221 )),
222 }
223 }
224
225 pub async fn run_background_sync(self: Arc<Self>) {
226 let specs_info = match self
227 .listener
228 .try_read_for(std::time::Duration::from_secs(5))
229 {
230 Some(lock) => match lock.as_ref() {
231 Some(listener) => listener.get_current_specs_info(),
232 None => SpecsInfo::empty(),
233 },
234 None => SpecsInfo::error(),
235 };
236
237 self.ops_stats
238 .set_diagnostics_context(ContextType::ConfigSync);
239 if let Err(e) = self
240 .manually_sync_specs(specs_info, SpecsSyncTrigger::Background)
241 .await
242 {
243 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn(_)) = e {
244 return;
245 }
246 log_e!(TAG, "Background specs sync failed: {}", e);
247 }
248 self.ops_stats.enqueue_diagnostics_event(
249 Some(KeyType::DownloadConfigSpecs),
250 Some(ContextType::ConfigSync),
251 );
252 }
253
254 async fn manually_sync_specs(
255 &self,
256 current_specs_info: SpecsInfo,
257 trigger: SpecsSyncTrigger,
258 ) -> Result<(), StatsigErr> {
259 if let Some(lock) = self
260 .listener
261 .try_read_for(std::time::Duration::from_secs(5))
262 {
263 if lock.is_none() {
264 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
265 }
266 }
267
268 let response = self
269 .fetch_specs_from_network(current_specs_info.clone(), trigger)
270 .await;
271 let result = self.process_spec_data(response).await;
272
273 if result.is_err() && self.fallback_url.is_some() {
274 log_d!(TAG, "Falling back to statsig api");
275 let response = self
276 .handle_fallback_request(self.get_request_args(¤t_specs_info, trigger))
277 .await;
278 return self.process_spec_data(response).await;
279 }
280
281 result
282 }
283
284 async fn process_spec_data(
285 &self,
286 response: Result<NetworkResponse, NetworkError>,
287 ) -> Result<(), StatsigErr> {
288 let resp = response.map_err(StatsigErr::NetworkError)?;
289
290 let update = SpecsUpdate {
291 data: resp.data,
292 source: SpecsSource::Network,
293 received_at: Utc::now().timestamp_millis() as u64,
294 source_api: Some(resp.api),
295 };
296
297 self.ops_stats.add_marker(
298 Marker::new(
299 KeyType::DownloadConfigSpecs,
300 ActionType::Start,
301 Some(StepType::Process),
302 ),
303 None,
304 );
305
306 let result = match self
307 .listener
308 .try_read_for(std::time::Duration::from_secs(5))
309 {
310 Some(lock) => match lock.as_ref() {
311 Some(listener) => listener.did_receive_specs_update(update),
312 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
313 },
314 None => {
315 let err =
316 StatsigErr::LockFailure("Failed to acquire read lock on listener".to_string());
317 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
318 Err(err)
319 }
320 };
321
322 self.ops_stats.add_marker(
323 Marker::new(
324 KeyType::DownloadConfigSpecs,
325 ActionType::End,
326 Some(StepType::Process),
327 )
328 .with_is_success(result.is_ok()),
329 None,
330 );
331
332 result
333 }
334}
335
336#[async_trait]
337impl SpecsAdapter for StatsigHttpSpecsAdapter {
338 async fn start(
339 self: Arc<Self>,
340 _statsig_runtime: &Arc<StatsigRuntime>,
341 ) -> Result<(), StatsigErr> {
342 let specs_info = match self
343 .listener
344 .try_read_for(std::time::Duration::from_secs(5))
345 {
346 Some(lock) => match lock.as_ref() {
347 Some(listener) => listener.get_current_specs_info(),
348 None => SpecsInfo::empty(),
349 },
350 None => SpecsInfo::error(),
351 };
352 self.manually_sync_specs(specs_info, SpecsSyncTrigger::Initial)
353 .await
354 }
355
356 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
357 match self
358 .listener
359 .try_write_for(std::time::Duration::from_secs(5))
360 {
361 Some(mut lock) => *lock = Some(listener),
362 None => {
363 log_e!(TAG, "Failed to acquire write lock on listener");
364 }
365 }
366 }
367
368 async fn schedule_background_sync(
369 self: Arc<Self>,
370 statsig_runtime: &Arc<StatsigRuntime>,
371 ) -> Result<(), StatsigErr> {
372 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
373 let interval_duration = self.sync_interval_duration;
374 let shutdown_notify = self.shutdown_notify.clone();
375
376 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
377 loop {
378 tokio::select! {
379 () = sleep(interval_duration) => {
380 if let Some(strong_self) = weak_self.upgrade() {
381 Self::run_background_sync(strong_self).await;
382 } else {
383 log_e!(TAG, "Strong reference to StatsigHttpSpecsAdapter lost. Stopping background sync");
384 break;
385 }
386 }
387 () = rt_shutdown_notify.notified() => {
388 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
389 break;
390 },
391 () = shutdown_notify.notified() => {
392 log_d!(TAG, "Shutting down specs background sync");
393 break;
394 }
395 }
396 }
397 })?;
398
399 Ok(())
400 }
401
402 async fn shutdown(
403 &self,
404 _timeout: Duration,
405 _statsig_runtime: &Arc<StatsigRuntime>,
406 ) -> Result<(), StatsigErr> {
407 self.shutdown_notify.notify_one();
408 Ok(())
409 }
410
411 fn get_type_name(&self) -> String {
412 stringify!(StatsigHttpSpecsAdapter).to_string()
413 }
414}
415
/// Builds the download URL for an SDK key: `<spec_url>/<sdk_key>.json`.
///
/// Trailing `/` characters on `spec_url` are dropped first, so a base URL
/// configured with a trailing slash does not produce a `//` in the path.
// NOTE(review): the function is used within this file; the `allow` is kept in
// case some build configurations compile the callers out — confirm and remove.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str) -> String {
    let base = spec_url.trim_end_matches('/');
    format!("{base}/{sdk_key}.json")
}
420
/// Reason a specs sync was started; decides the timeout and retry policy of
/// the request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SpecsSyncTrigger {
    /// First sync performed by `start`; bounded by the init timeout, no retries.
    Initial,
    /// Periodic sync from the background task; retried up to three times.
    Background,
    /// Sync requested on demand; no retries.
    Manual,
}