statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::{
10 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
11};
12use async_trait::async_trait;
13use chrono::Utc;
14use percent_encoding::percent_encode;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock, Weak};
17use std::time::Duration;
18use tokio::sync::Notify;
19use tokio::time::sleep;
20
21use super::SpecsInfo;
22
/// Endpoint used to download config specs when the options do not supply a `specs_url`.
pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";

/// Delay between background spec syncs, in milliseconds, used when the options
/// do not supply `specs_sync_interval_ms`.
pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;

/// Tag passed to the logging macros by this adapter.
const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
27pub struct StatsigHttpSpecsAdapter {
28 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
29 network: NetworkClient,
30 sdk_key: String,
31 specs_url: String,
32 fallback_url: Option<String>,
33 sync_interval_duration: Duration,
34 ops_stats: Arc<OpsStatsForInstance>,
35 shutdown_notify: Arc<Notify>,
36}
37
38impl StatsigHttpSpecsAdapter {
39 #[must_use]
40 pub fn new(sdk_key: &str, options: Option<&StatsigOptions>) -> Self {
41 let default_options = StatsigOptions::default();
42 let options_ref = options.unwrap_or(&default_options);
43
44 let specs_url = options_ref
45 .specs_url
46 .as_ref()
47 .map(|u| u.to_string())
48 .unwrap_or(DEFAULT_SPECS_URL.to_string());
49
50 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
52 && specs_url != DEFAULT_SPECS_URL
53 {
54 Some(DEFAULT_SPECS_URL.to_string())
55 } else {
56 None
57 };
58
59 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
60
61 Self {
62 listener: RwLock::new(None),
63 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
64 sdk_key: sdk_key.to_string(),
65 specs_url,
66 fallback_url,
67 sync_interval_duration: Duration::from_millis(u64::from(
68 options_ref
69 .specs_sync_interval_ms
70 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
71 )),
72 ops_stats: OPS_STATS.get_for_instance(sdk_key),
73 shutdown_notify: Arc::new(Notify::new()),
74 }
75 }
76
77 pub async fn fetch_specs_from_network(
78 &self,
79 current_specs_info: SpecsInfo,
80 ) -> Result<Vec<u8>, NetworkError> {
81 let request_args = self.get_request_args(¤t_specs_info);
82 match self.handle_specs_request(request_args).await {
83 Ok(response) => Ok(response),
84 Err(e) => Err(e),
85 }
86 }
87
88 fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
89 let mut params = HashMap::new();
90 if let Some(lcut) = current_specs_info.lcut {
91 params.insert("sinceTime".to_string(), lcut.to_string());
92 }
93 if let Some(cs) = ¤t_specs_info.checksum {
94 params.insert(
95 "checksum".to_string(),
96 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
97 );
98 }
99
100 RequestArgs {
101 url: construct_specs_url(
102 self.specs_url.as_str(),
103 self.sdk_key.as_str(),
104 current_specs_info.zstd_dict_id.as_deref(),
105 ),
106 query_params: Some(params),
107 accept_gzip_response: true,
108 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
109 ..RequestArgs::new()
110 }
111 }
112
113 async fn handle_fallback_request(
114 &self,
115 mut request_args: RequestArgs,
116 current_specs_info: SpecsInfo,
117 ) -> Result<Vec<u8>, NetworkError> {
118 let fallback_url = match &self.fallback_url {
119 Some(url) => construct_specs_url(
120 url.as_str(),
121 &self.sdk_key,
122 current_specs_info.zstd_dict_id.as_deref(),
123 ),
124 None => return Err(NetworkError::RequestFailed),
125 };
126
127 request_args.url = fallback_url;
128
129 let response = self.handle_specs_request(request_args).await?;
132 Ok(response)
133 }
134
135 async fn handle_specs_request(
137 &self,
138 request_args: RequestArgs,
139 ) -> Result<Vec<u8>, NetworkError> {
140 let response = self.network.get(request_args).await?;
141 match response.data {
142 Some(data) => Ok(data),
143 None => Err(NetworkError::RequestFailed),
144 }
145 }
146
147 pub async fn run_background_sync(weak_self: &Weak<Self>) {
148 let strong_self = if let Some(s) = weak_self.upgrade() {
149 s
150 } else {
151 log_e!(TAG, "No strong reference found");
152 return;
153 };
154
155 let specs_info = match strong_self.listener.read() {
156 Ok(lock) => match lock.as_ref() {
157 Some(listener) => listener.get_current_specs_info(),
158 None => SpecsInfo::empty(),
159 },
160 Err(_) => SpecsInfo::error(),
161 };
162
163 strong_self
164 .ops_stats
165 .set_diagnostics_context(ContextType::ConfigSync);
166 if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
167 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
168 return;
169 }
170 log_e!(TAG, "Background specs sync failed: {}", e);
171 }
172 strong_self.ops_stats.enqueue_diagnostics_event(
173 Some(KeyType::DownloadConfigSpecs),
174 Some(ContextType::ConfigSync),
175 );
176 }
177
178 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
179 if let Ok(lock) = self.listener.read() {
180 if lock.is_none() {
181 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
182 }
183 }
184
185 let response = self
186 .fetch_specs_from_network(current_specs_info.clone())
187 .await;
188 let result = self.process_spec_data(response).await;
189
190 if result.is_err() && self.fallback_url.is_some() {
191 log_d!(TAG, "Falling back to statsig api");
192 let request_args = self.get_request_args(¤t_specs_info);
193 let response = self
194 .handle_fallback_request(request_args, current_specs_info)
195 .await;
196 return self.process_spec_data(response).await;
197 }
198
199 result
200 }
201
202 async fn process_spec_data(
203 &self,
204 response: Result<Vec<u8>, NetworkError>,
205 ) -> Result<(), StatsigErr> {
206 let data = response.map_err(|e| {
207 let msg = "No specs result from network";
208 StatsigErr::NetworkError(e, Some(msg.to_string()))
209 })?;
210
211 let update = SpecsUpdate {
212 data,
213 source: SpecsSource::Network,
214 received_at: Utc::now().timestamp_millis() as u64,
215 };
216
217 self.ops_stats.add_marker(
218 Marker::new(
219 KeyType::DownloadConfigSpecs,
220 ActionType::Start,
221 Some(StepType::Process),
222 ),
223 None,
224 );
225
226 let result = match self.listener.read() {
227 Ok(lock) => match lock.as_ref() {
228 Some(listener) => listener.did_receive_specs_update(update),
229 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
230 },
231 Err(e) => {
232 let err = StatsigErr::LockFailure(format!(
233 "Failed to acquire read lock on listener: {}",
234 e
235 ));
236 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
237 Err(err)
238 }
239 };
240
241 self.ops_stats.add_marker(
242 Marker::new(
243 KeyType::DownloadConfigSpecs,
244 ActionType::End,
245 Some(StepType::Process),
246 )
247 .with_is_success(result.is_ok()),
248 None,
249 );
250
251 result
252 }
253}
254
255#[async_trait]
256impl SpecsAdapter for StatsigHttpSpecsAdapter {
257 async fn start(
258 self: Arc<Self>,
259 _statsig_runtime: &Arc<StatsigRuntime>,
260 ) -> Result<(), StatsigErr> {
261 let specs_info = match self.listener.read() {
262 Ok(lock) => match lock.as_ref() {
263 Some(listener) => listener.get_current_specs_info(),
264 None => SpecsInfo::empty(),
265 },
266 Err(_) => SpecsInfo::error(),
267 };
268 self.manually_sync_specs(specs_info).await
269 }
270
271 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
272 match self.listener.write() {
273 Ok(mut lock) => *lock = Some(listener),
274 Err(e) => {
275 log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
276 }
277 }
278 }
279
280 async fn schedule_background_sync(
281 self: Arc<Self>,
282 statsig_runtime: &Arc<StatsigRuntime>,
283 ) -> Result<(), StatsigErr> {
284 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
285 let interval_duration = self.sync_interval_duration;
286 let shutdown_notify = self.shutdown_notify.clone();
287
288 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
289 loop {
290 tokio::select! {
291 () = sleep(interval_duration) => {
292 Self::run_background_sync(&weak_self).await;
293 }
294 () = rt_shutdown_notify.notified() => {
295 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
296 break;
297 },
298 () = shutdown_notify.notified() => {
299 log_d!(TAG, "Shutting down specs background sync");
300 break;
301 }
302 }
303 }
304 });
305
306 Ok(())
307 }
308
309 async fn shutdown(
310 &self,
311 _timeout: Duration,
312 _statsig_runtime: &Arc<StatsigRuntime>,
313 ) -> Result<(), StatsigErr> {
314 self.shutdown_notify.notify_one();
315 Ok(())
316 }
317
318 fn get_type_name(&self) -> String {
319 stringify!(StatsigHttpSpecsAdapter).to_string()
320 }
321}
322
/// Builds the URL of the specs file for `sdk_key` under the base `spec_url`.
///
/// * Without the `with_shared_dict_compression` feature: `{spec_url}/{sdk_key}.json`.
/// * With it: `{spec_url}/d/{dict_id}/{sdk_key}.json`, where a missing `dict_id`
///   is written as the literal `null`.
///
/// Trailing `/` characters on `spec_url` are dropped first, so a base such as
/// `https://proxy.example.com/specs/` does not yield a `//` path segment.
// `dict_id` is only read when `with_shared_dict_compression` is enabled; the
// lint is silenced for builds without that feature.
#[allow(unused)]
fn construct_specs_url(spec_url: &str, sdk_key: &str, dict_id: Option<&str>) -> String {
    let base_url = spec_url.trim_end_matches('/');

    #[cfg(feature = "with_shared_dict_compression")]
    {
        let dict_id = dict_id.unwrap_or("null");
        format!("{base_url}/d/{dict_id}/{sdk_key}.json")
    }
    #[cfg(not(feature = "with_shared_dict_compression"))]
    format!("{base_url}/{sdk_key}.json")
}