statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::sdk_diagnostics::diagnostics::ContextType;
5use crate::sdk_diagnostics::marker::{ActionType, KeyType, Marker, StepType};
6use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
7use crate::statsig_err::StatsigErr;
8use crate::statsig_metadata::StatsigMetadata;
9use crate::{
10 log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigOptions, StatsigRuntime,
11};
12use async_trait::async_trait;
13use chrono::Utc;
14use percent_encoding::percent_encode;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock, Weak};
17use std::time::Duration;
18use tokio::sync::Notify;
19use tokio::time::sleep;
20
21use super::{ConfigCompressionMode, SpecsInfo};
22
23pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
24pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
25
26const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
27pub struct StatsigHttpSpecsAdapter {
28 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
29 network: NetworkClient,
30 sdk_key: String,
31 specs_url: String,
32 fallback_url: Option<String>,
33 sync_interval_duration: Duration,
34 config_compression_mode: ConfigCompressionMode,
35 ops_stats: Arc<OpsStatsForInstance>,
36 shutdown_notify: Arc<Notify>,
37}
38
39impl StatsigHttpSpecsAdapter {
40 #[must_use]
41 pub fn new(sdk_key: &str, options: Option<&StatsigOptions>) -> Self {
42 let default_options = StatsigOptions::default();
43 let options_ref = options.unwrap_or(&default_options);
44
45 let specs_url = options_ref
46 .specs_url
47 .as_ref()
48 .map(|u| u.to_string())
49 .unwrap_or(DEFAULT_SPECS_URL.to_string());
50
51 let fallback_url = if options_ref.fallback_to_statsig_api.unwrap_or(false)
53 && specs_url != DEFAULT_SPECS_URL
54 {
55 Some(DEFAULT_SPECS_URL.to_string())
56 } else {
57 None
58 };
59
60 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
61
62 Self {
63 listener: RwLock::new(None),
64 network: NetworkClient::new(sdk_key, Some(headers), Some(options_ref)),
65 sdk_key: sdk_key.to_string(),
66 specs_url,
67 fallback_url,
68 sync_interval_duration: Duration::from_millis(u64::from(
69 options_ref
70 .specs_sync_interval_ms
71 .unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
72 )),
73 ops_stats: OPS_STATS.get_for_instance(sdk_key),
74 shutdown_notify: Arc::new(Notify::new()),
75 config_compression_mode: options_ref
76 .config_compression_mode
77 .clone()
78 .unwrap_or(ConfigCompressionMode::Gzip),
79 }
80 }
81
82 pub async fn fetch_specs_from_network(
83 &self,
84 current_specs_info: SpecsInfo,
85 ) -> Result<Vec<u8>, NetworkError> {
86 let request_args = self.get_request_args(¤t_specs_info);
87 match self.handle_specs_request(request_args).await {
88 Ok(response) => Ok(response),
89 Err(e) => Err(e),
90 }
91 }
92
93 fn get_request_args(&self, current_specs_info: &SpecsInfo) -> RequestArgs {
94 let mut params = HashMap::new();
95 if let Some(lcut) = current_specs_info.lcut {
96 params.insert("sinceTime".to_string(), lcut.to_string());
97 }
98 if let Some(cs) = ¤t_specs_info.checksum {
99 params.insert(
100 "checksum".to_string(),
101 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
102 );
103 }
104
105 RequestArgs {
106 url: construct_specs_url(
107 &self.config_compression_mode,
108 self.specs_url.as_str(),
109 self.sdk_key.as_str(),
110 current_specs_info.zstd_dict_id.as_deref(),
111 ),
112 query_params: Some(params),
113 accept_gzip_response: true,
114 diagnostics_key: Some(KeyType::DownloadConfigSpecs),
115 ..RequestArgs::new()
116 }
117 }
118
119 async fn handle_fallback_request(
120 &self,
121 mut request_args: RequestArgs,
122 current_specs_info: SpecsInfo,
123 ) -> Result<Vec<u8>, NetworkError> {
124 let fallback_url = match &self.fallback_url {
125 Some(url) => construct_specs_url(
126 &self.config_compression_mode,
127 url.as_str(),
128 &self.sdk_key,
129 current_specs_info.zstd_dict_id.as_deref(),
130 ),
131 None => return Err(NetworkError::RequestFailed),
132 };
133
134 request_args.url = fallback_url;
135
136 let response = self.handle_specs_request(request_args).await?;
139 Ok(response)
140 }
141
142 async fn handle_specs_request(
144 &self,
145 request_args: RequestArgs,
146 ) -> Result<Vec<u8>, NetworkError> {
147 let response = self.network.get(request_args).await?;
148 match response.data {
149 Some(data) => Ok(data),
150 None => Err(NetworkError::RequestFailed),
151 }
152 }
153
154 pub async fn run_background_sync(weak_self: &Weak<Self>) {
155 let strong_self = if let Some(s) = weak_self.upgrade() {
156 s
157 } else {
158 log_e!(TAG, "No strong reference found");
159 return;
160 };
161
162 let specs_info = match strong_self.listener.read() {
163 Ok(lock) => match lock.as_ref() {
164 Some(listener) => listener.get_current_specs_info(),
165 None => SpecsInfo::empty(),
166 },
167 Err(_) => SpecsInfo::error(),
168 };
169
170 strong_self
171 .ops_stats
172 .set_diagnostics_context(ContextType::ConfigSync);
173 if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
174 if let StatsigErr::NetworkError(NetworkError::DisableNetworkOn, _) = e {
175 return;
176 }
177 log_e!(TAG, "Background specs sync failed: {}", e);
178 }
179 strong_self.ops_stats.enqueue_diagnostics_event(
180 Some(KeyType::DownloadConfigSpecs),
181 Some(ContextType::ConfigSync),
182 );
183 }
184
185 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
186 if let Ok(lock) = self.listener.read() {
187 if lock.is_none() {
188 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
189 }
190 }
191
192 let response = self
193 .fetch_specs_from_network(current_specs_info.clone())
194 .await;
195 let result = self.process_spec_data(response).await;
196
197 if result.is_err() && self.fallback_url.is_some() {
198 log_d!(TAG, "Falling back to statsig api");
199 let request_args = self.get_request_args(¤t_specs_info);
200 let response = self
201 .handle_fallback_request(request_args, current_specs_info)
202 .await;
203 return self.process_spec_data(response).await;
204 }
205
206 result
207 }
208
209 async fn process_spec_data(
210 &self,
211 response: Result<Vec<u8>, NetworkError>,
212 ) -> Result<(), StatsigErr> {
213 let data = response.map_err(|e| {
214 let msg = "No specs result from network";
215 StatsigErr::NetworkError(e, Some(msg.to_string()))
216 })?;
217
218 let update = SpecsUpdate {
219 data,
220 source: SpecsSource::Network,
221 received_at: Utc::now().timestamp_millis() as u64,
222 };
223
224 self.ops_stats.add_marker(
225 Marker::new(
226 KeyType::DownloadConfigSpecs,
227 ActionType::Start,
228 Some(StepType::Process),
229 ),
230 None,
231 );
232
233 let result = match self.listener.read() {
234 Ok(lock) => match lock.as_ref() {
235 Some(listener) => listener.did_receive_specs_update(update),
236 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
237 },
238 Err(e) => {
239 let err = StatsigErr::LockFailure(format!(
240 "Failed to acquire read lock on listener: {}",
241 e
242 ));
243 log_error_to_statsig_and_console!(&self.ops_stats, TAG, err.clone());
244 Err(err)
245 }
246 };
247
248 self.ops_stats.add_marker(
249 Marker::new(
250 KeyType::DownloadConfigSpecs,
251 ActionType::End,
252 Some(StepType::Process),
253 )
254 .with_is_success(result.is_ok()),
255 None,
256 );
257
258 result
259 }
260}
261
262#[async_trait]
263impl SpecsAdapter for StatsigHttpSpecsAdapter {
264 async fn start(
265 self: Arc<Self>,
266 _statsig_runtime: &Arc<StatsigRuntime>,
267 ) -> Result<(), StatsigErr> {
268 let specs_info = match self.listener.read() {
269 Ok(lock) => match lock.as_ref() {
270 Some(listener) => listener.get_current_specs_info(),
271 None => SpecsInfo::empty(),
272 },
273 Err(_) => SpecsInfo::error(),
274 };
275 self.manually_sync_specs(specs_info).await
276 }
277
278 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
279 match self.listener.write() {
280 Ok(mut lock) => *lock = Some(listener),
281 Err(e) => {
282 log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
283 }
284 }
285 }
286
287 async fn schedule_background_sync(
288 self: Arc<Self>,
289 statsig_runtime: &Arc<StatsigRuntime>,
290 ) -> Result<(), StatsigErr> {
291 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
292 let interval_duration = self.sync_interval_duration;
293 let shutdown_notify = self.shutdown_notify.clone();
294
295 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
296 loop {
297 tokio::select! {
298 () = sleep(interval_duration) => {
299 Self::run_background_sync(&weak_self).await;
300 }
301 () = rt_shutdown_notify.notified() => {
302 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
303 break;
304 },
305 () = shutdown_notify.notified() => {
306 log_d!(TAG, "Shutting down specs background sync");
307 break;
308 }
309 }
310 }
311 });
312
313 Ok(())
314 }
315
316 async fn shutdown(
317 &self,
318 _timeout: Duration,
319 _statsig_runtime: &Arc<StatsigRuntime>,
320 ) -> Result<(), StatsigErr> {
321 self.shutdown_notify.notify_one();
322 Ok(())
323 }
324
325 fn get_type_name(&self) -> String {
326 stringify!(StatsigHttpSpecsAdapter).to_string()
327 }
328}
329
330fn construct_specs_url(
331 compression_mode: &ConfigCompressionMode,
332 spec_url: &str,
333 sdk_key: &str,
334 dict_id: Option<&str>,
335) -> String {
336 match compression_mode {
337 ConfigCompressionMode::Gzip => format!("{spec_url}/{sdk_key}.json"),
338 ConfigCompressionMode::Dictionary => {
339 let dict_id = dict_id.unwrap_or("null");
340 format!("{spec_url}/d/{dict_id}/{sdk_key}.json")
341 }
342 }
343}