statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::{log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigRuntime};
10use async_trait::async_trait;
11use chrono::Utc;
12use percent_encoding::percent_encode;
13use std::collections::HashMap;
14use std::sync::{Arc, RwLock, Weak};
15use std::time::Duration;
16use tokio::sync::Notify;
17use tokio::time::sleep;
18
19use super::SpecsInfo;
20
/// Endpoint specs are downloaded from when the caller supplies no custom URL.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
/// Delay between background sync attempts, in milliseconds, when none is configured.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Tag passed to the logging macros used by this adapter.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
25pub struct StatsigHttpSpecsAdapter {
26 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
27 network: NetworkClient,
28 sdk_key: String,
29 specs_url: String,
30 fallback_url: Option<String>,
31 sync_interval_duration: Duration,
32 ops_stats: Arc<OpsStatsForInstance>,
33 shutdown_notify: Arc<Notify>,
34}
35
36impl StatsigHttpSpecsAdapter {
37 #[must_use]
38 pub fn new(
39 sdk_key: &str,
40 specs_url: Option<&String>,
41 fallback_to_statsig_api: bool,
42 sync_interval: Option<u32>,
43 disable_network: Option<bool>,
44 ) -> Self {
45 let specs_url = specs_url
46 .map(|u| u.to_string())
47 .unwrap_or(DEFAULT_SPECS_URL.to_string());
48 let fallback_url = if fallback_to_statsig_api && specs_url != DEFAULT_SPECS_URL {
50 Some(DEFAULT_SPECS_URL.to_string())
51 } else {
52 None
53 };
54
55 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
56
57 Self {
58 listener: RwLock::new(None),
59 network: NetworkClient::new(sdk_key, Some(headers), disable_network),
60 sdk_key: sdk_key.to_string(),
61 specs_url,
62 fallback_url,
63 sync_interval_duration: Duration::from_millis(u64::from(
64 sync_interval.unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
65 )),
66 ops_stats: OPS_STATS.get_for_instance(sdk_key),
67 shutdown_notify: Arc::new(Notify::new()),
68 }
69 }
70
71 pub async fn fetch_specs_from_network(
72 &self,
73 current_specs_info: SpecsInfo,
74 ) -> Result<Vec<u8>, NetworkError> {
75 let request_args = self.get_request_args(¤t_specs_info);
76 match self.handle_specs_request(request_args).await {
77 Ok(response) => Ok(response),
78 Err(e) => Err(e),
79 }
80 }
81
82 fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
83 let mut params = HashMap::new();
84 if let Some(lcut) = current_specs_info.lcut {
85 params.insert("sinceTime".to_string(), lcut.to_string());
86 }
87 if let Some(cs) = ¤t_specs_info.checksum {
88 params.insert(
89 "checksum".to_string(),
90 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
91 );
92 }
93
94 RequestArgs {
95 url: construct_specs_url(
96 self.specs_url.as_str(),
97 self.sdk_key.as_str(),
98 current_specs_info.zstd_dict_id.as_deref(),
99 ),
100 query_params: Some(params),
101 accept_gzip_response: true,
102 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
103 ..RequestArgs::new()
104 }
105 }
106
107 async fn handle_fallback_request(
108 &self,
109 mut request_args: RequestArgs,
110 current_specs_info: SpecsInfo,
111 ) -> Result<Vec<u8>, NetworkError> {
112 let fallback_url = match &self.fallback_url {
113 Some(url) => construct_specs_url(
114 url.as_str(),
115 &self.sdk_key,
116 current_specs_info.zstd_dict_id.as_deref(),
117 ),
118 None => return Err(NetworkError::RequestFailed),
119 };
120
121 request_args.url = fallback_url;
122
123 let response = self.handle_specs_request(request_args).await?;
126 Ok(response)
127 }
128
129 async fn handle_specs_request(
131 &self,
132 request_args: RequestArgs,
133 ) -> Result<Vec<u8>, NetworkError> {
134 let response = self.network.get(request_args).await?;
135 match response.data {
136 Some(data) => Ok(data),
137 None => Err(NetworkError::RequestFailed),
138 }
139 }
140
141 pub async fn run_background_sync(weak_self: &Weak<Self>) {
142 let strong_self = if let Some(s) = weak_self.upgrade() {
143 s
144 } else {
145 log_e!(TAG, "No strong reference found");
146 return;
147 };
148
149 let specs_info = match strong_self.listener.read() {
150 Ok(lock) => match lock.as_ref() {
151 Some(listener) => listener.get_current_specs_info(),
152 None => SpecsInfo::empty(),
153 },
154 Err(_) => SpecsInfo::error(),
155 };
156
157 strong_self
158 .ops_stats
159 .set_diagnostics_context(ContextType::ConfigSync);
160 if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
161 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
162 return;
163 }
164 log_e!(TAG, "Background specs sync failed: {}", e);
165 }
166 strong_self.ops_stats.enqueue_diagnostics_event(
167 Some(KeyType::DownloadConfigSpecs),
168 Some(ContextType::ConfigSync),
169 );
170 }
171
172 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
173 if let Ok(lock) = self.listener.read() {
174 if lock.is_none() {
175 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
176 }
177 }
178
179 let response = self
180 .fetch_specs_from_network(current_specs_info.clone())
181 .await;
182 let result = self.process_spec_data(response).await;
183
184 if result.is_err() && self.fallback_url.is_some() {
185 log_d!(TAG, "Falling back to statsig api");
186 let request_args = self.get_request_args(¤t_specs_info);
187 let response = self
188 .handle_fallback_request(request_args, current_specs_info)
189 .await;
190 return self.process_spec_data(response).await;
191 }
192
193 result
194 }
195
196 async fn process_spec_data(
197 &self,
198 response: Result<Vec<u8>, NetworkError>,
199 ) -> Result<(), StatsigErr> {
200 let data = response.map_err(|e| {
201 let msg = "No specs result from network";
202 StatsigErr::NetworkError(e, Some(msg.to_string()))
203 })?;
204
205 let update = SpecsUpdate {
206 data,
207 source: SpecsSource::Network,
208 received_at: Utc::now().timestamp_millis() as u64,
209 };
210
211 self.ops_stats.add_marker(
212 Marker::new(
213 KeyType::DownloadConfigSpecs,
214 ActionType::Start,
215 Some(StepType::Process),
216 ),
217 None,
218 );
219
220 let result = match self.listener.read() {
221 Ok(lock) => match lock.as_ref() {
222 Some(listener) => listener.did_receive_specs_update(update),
223 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
224 },
225 Err(e) => {
226 let err = StatsigErr::LockFailure(format!(
227 "Failed to acquire read lock on listener: {}",
228 e
229 ));
230 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
231 Err(err)
232 }
233 };
234
235 self.ops_stats.add_marker(
236 Marker::new(
237 KeyType::DownloadConfigSpecs,
238 ActionType::End,
239 Some(StepType::Process),
240 )
241 .with_is_success(result.is_ok()),
242 None,
243 );
244
245 result
246 }
247}
248
249#[async_trait]
250impl SpecsAdapter for StatsigHttpSpecsAdapter {
251 async fn start(
252 self: Arc<Self>,
253 _statsig_runtime: &Arc<StatsigRuntime>,
254 ) -> Result<(), StatsigErr> {
255 let specs_info = match self.listener.read() {
256 Ok(lock) => match lock.as_ref() {
257 Some(listener) => listener.get_current_specs_info(),
258 None => SpecsInfo::empty(),
259 },
260 Err(_) => SpecsInfo::error(),
261 };
262 self.manually_sync_specs(specs_info).await
263 }
264
265 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
266 match self.listener.write() {
267 Ok(mut lock) => *lock = Some(listener),
268 Err(e) => {
269 log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
270 }
271 }
272 }
273
274 async fn schedule_background_sync(
275 self: Arc<Self>,
276 statsig_runtime: &Arc<StatsigRuntime>,
277 ) -> Result<(), StatsigErr> {
278 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
279 let interval_duration = self.sync_interval_duration;
280 let shutdown_notify = self.shutdown_notify.clone();
281
282 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
283 loop {
284 tokio::select! {
285 () = sleep(interval_duration) => {
286 Self::run_background_sync(&weak_self).await;
287 }
288 () = rt_shutdown_notify.notified() => {
289 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
290 break;
291 },
292 () = shutdown_notify.notified() => {
293 log_d!(TAG, "Shutting down specs background sync");
294 break;
295 }
296 }
297 }
298 });
299
300 Ok(())
301 }
302
303 async fn shutdown(
304 &self,
305 _timeout: Duration,
306 _statsig_runtime: &Arc<StatsigRuntime>,
307 ) -> Result<(), StatsigErr> {
308 self.shutdown_notify.notify_one();
309 Ok(())
310 }
311
312 fn get_type_name(&self) -> String {
313 stringify!(StatsigHttpSpecsAdapter).to_string()
314 }
315}
316
/// Builds the full download URL for `sdk_key` under the base `spec_url`.
///
/// Without the `with_shared_dict_compression` feature the result is
/// `{spec_url}/{sdk_key}.json` and `dict_id` is ignored. With the feature it is
/// `{spec_url}/d/{dict_id}/{sdk_key}.json`, where a missing `dict_id` is written as
/// the literal `null`.
// `dict_id` is only read when shared-dict compression is compiled in.
#[allow(unused_variables)]
fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
    #[cfg(feature = "with_shared_dict_compression")]
    {
        let dict_id = dict_id.unwrap_or("null");
        format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
    }
    #[cfg(not(feature = "with_shared_dict_compression"))]
    format!("{spec_url}/{sdk_key}.json")
}