statsig_rust/specs_adapter/
statsig_http_specs_adapter.rs1use crate::networking::{NetworkClient, NetworkError, RequestArgs};
2use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
3use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
4use crate::specs_adapter::{SpecsAdapter, SpecsUpdate, SpecsUpdateListener};
5use crate::statsig_err::StatsigErr;
6use crate::statsig_metadata::StatsigMetadata;
7use crate::{log_d, log_e, log_error_to_statsig_and_console, SpecsSource, StatsigRuntime};
8use async_trait::async_trait;
9use chrono::Utc;
10use percent_encoding::percent_encode;
11use std::collections::HashMap;
12use std::sync::{Arc, RwLock, Weak};
13use std::time::Duration;
14use tokio::sync::Notify;
15use tokio::time::sleep;
16
17use super::SpecsInfo;
18
19pub const DEFAULT_SPECS_URL: &str = "https://api.statsigcdn.com/v2/download_config_specs";
20pub const DEFAULT_SYNC_INTERVAL_MS: u32 = 10_000;
21
22const TAG: &str = stringify!(StatsigHttpSpecsAdapter);
23pub struct StatsigHttpSpecsAdapter {
24 listener: RwLock<Option<Arc<dyn SpecsUpdateListener>>>,
25 network: NetworkClient,
26 specs_url: String,
27 fallback_url: Option<String>,
28 sync_interval_duration: Duration,
29 ops_stats: Arc<OpsStatsForInstance>,
30 shutdown_notify: Arc<Notify>,
31}
32
33impl StatsigHttpSpecsAdapter {
34 #[must_use]
35 pub fn new(
36 sdk_key: &str,
37 specs_url: Option<&String>,
38 fallback_to_statsig_api: bool,
39 sync_interval: Option<u32>,
40 ) -> Self {
41 let fallback_url = if fallback_to_statsig_api {
42 construct_fallback_specs_url(sdk_key, specs_url)
43 } else {
44 None
45 };
46
47 let headers = StatsigMetadata::get_constant_request_headers(sdk_key);
48
49 Self {
50 listener: RwLock::new(None),
51 network: NetworkClient::new(sdk_key, Some(headers)),
52 specs_url: construct_specs_url(sdk_key, specs_url),
53 fallback_url,
54 sync_interval_duration: Duration::from_millis(u64::from(
55 sync_interval.unwrap_or(DEFAULT_SYNC_INTERVAL_MS),
56 )),
57 ops_stats: OPS_STATS.get_for_instance(sdk_key),
58 shutdown_notify: Arc::new(Notify::new()),
59 }
60 }
61
62 pub async fn fetch_specs_from_network(&self, current_specs_info: SpecsInfo) -> Option<String> {
63 let mut params = HashMap::new();
64 if let Some(lcut) = current_specs_info.lcut {
65 params.insert("sinceTime".to_string(), lcut.to_string());
66 }
67 if let Some(cs) = current_specs_info.checksum {
68 params.insert(
69 "checksum".to_string(),
70 percent_encode(cs.as_bytes(), percent_encoding::NON_ALPHANUMERIC).to_string(),
71 );
72 }
73
74 let request_args = RequestArgs {
75 url: self.specs_url.clone(),
76 retries: 2,
77 query_params: Some(params),
78 accept_gzip_response: true,
79 ..RequestArgs::new()
80 };
81
82 match self.network.get(request_args.clone()).await {
83 Ok(response) => Some(response),
84 Err(NetworkError::RetriesExhausted) => self.handle_fallback_request(request_args).await,
85 Err(_) => None,
86 }
87 }
88
89 async fn handle_fallback_request(&self, mut request_args: RequestArgs) -> Option<String> {
90 let fallback_url = match &self.fallback_url {
91 Some(url) => url.clone(),
92 None => return None,
93 };
94
95 request_args.url = fallback_url;
96
97 match self.network.get(request_args).await {
100 Ok(response) => Some(response),
101 Err(_) => None,
102 }
103 }
104
105 pub async fn run_background_sync(weak_self: &Weak<Self>) {
106 let strong_self = if let Some(s) = weak_self.upgrade() {
107 s
108 } else {
109 log_e!(TAG, "No strong reference found");
110 return;
111 };
112
113 let specs_info = match strong_self.listener.read() {
114 Ok(lock) => match lock.as_ref() {
115 Some(listener) => listener.get_current_specs_info(),
116 None => SpecsInfo::empty(),
117 },
118 Err(_) => SpecsInfo::error(),
119 };
120
121 if let Err(e) = strong_self.manually_sync_specs(specs_info).await {
122 log_e!(TAG, "Background specs sync failed: {}", e);
123 }
124 }
125
126 async fn manually_sync_specs(&self, current_specs_info: SpecsInfo) -> Result<(), StatsigErr> {
127 if let Ok(lock) = self.listener.read() {
128 if lock.is_none() {
129 return Err(StatsigErr::UnstartedAdapter("Listener not set".to_string()));
130 }
131 }
132
133 let response = self.fetch_specs_from_network(current_specs_info).await;
134
135 let data = if let Some(r) = response {
136 r
137 } else {
138 let msg = "No specs result from network";
139 log_e!(TAG, "{}", msg);
140 return Err(StatsigErr::NetworkError(msg.to_string()));
141 };
142
143 let update = SpecsUpdate {
144 data,
145 source: SpecsSource::Network,
146 received_at: Utc::now().timestamp_millis() as u64,
147 };
148
149 match self.listener.read() {
150 Ok(lock) => match lock.as_ref() {
151 Some(listener) => listener.did_receive_specs_update(update),
152 None => Err(StatsigErr::UnstartedAdapter("Listener not set".to_string())),
153 },
154 Err(e) => {
155 log_error_to_statsig_and_console!(
156 &self.ops_stats,
157 TAG,
158 "Failed to acquire read lock on listener: {}",
159 e
160 );
161 Err(StatsigErr::LockFailure(e.to_string()))
162 }
163 }
164 }
165}
166
167#[async_trait]
168impl SpecsAdapter for StatsigHttpSpecsAdapter {
169 async fn start(
170 self: Arc<Self>,
171 _statsig_runtime: &Arc<StatsigRuntime>,
172 ) -> Result<(), StatsigErr> {
173 let specs_info = match self.listener.read() {
174 Ok(lock) => match lock.as_ref() {
175 Some(listener) => listener.get_current_specs_info(),
176 None => SpecsInfo::empty(),
177 },
178 Err(_) => SpecsInfo::error(),
179 };
180 self.manually_sync_specs(specs_info).await
181 }
182
183 fn initialize(&self, listener: Arc<dyn SpecsUpdateListener>) {
184 match self.listener.write() {
185 Ok(mut lock) => *lock = Some(listener),
186 Err(e) => {
187 log_e!(TAG, "Failed to acquire write lock on listener: {}", e);
188 }
189 }
190 }
191
192 async fn schedule_background_sync(
193 self: Arc<Self>,
194 statsig_runtime: &Arc<StatsigRuntime>,
195 ) -> Result<(), StatsigErr> {
196 let weak_self: Weak<StatsigHttpSpecsAdapter> = Arc::downgrade(&self);
197 let interval_duration = self.sync_interval_duration;
198 let shutdown_notify = self.shutdown_notify.clone();
199
200 statsig_runtime.spawn("http_specs_bg_sync", move |rt_shutdown_notify| async move {
201 loop {
202 tokio::select! {
203 () = sleep(interval_duration) => {
204 Self::run_background_sync(&weak_self).await;
205 }
206 () = rt_shutdown_notify.notified() => {
207 log_d!(TAG, "Runtime shutdown. Shutting down specs background sync");
208 break;
209 },
210 () = shutdown_notify.notified() => {
211 log_d!(TAG, "Shutting down specs background sync");
212 break;
213 }
214 }
215 }
216 });
217
218 Ok(())
219 }
220
221 async fn shutdown(
222 &self,
223 _timeout: Duration,
224 _statsig_runtime: &Arc<StatsigRuntime>,
225 ) -> Result<(), StatsigErr> {
226 self.shutdown_notify.notify_one();
227 Ok(())
228 }
229
230 fn get_type_name(&self) -> String {
231 stringify!(StatsigHttpSpecsAdapter).to_string()
232 }
233}
234
235fn construct_specs_url(sdk_key: &str, spec_url: Option<&String>) -> String {
236 let base = match spec_url {
237 Some(u) => u,
238 _ => DEFAULT_SPECS_URL,
239 };
240
241 format!("{base}/{sdk_key}.json")
242}
243
244fn construct_fallback_specs_url(sdk_key: &str, spec_url: Option<&String>) -> Option<String> {
246 match spec_url {
247 Some(u) if u != DEFAULT_SPECS_URL => Some(format!("{u}/{sdk_key}.json")),
248 _ => None,
249 }
250}