1use chrono::Utc;
2
3use super::network_error::NetworkError;
4use super::providers::get_network_provider;
5use super::{HttpMethod, NetworkProvider, RequestArgs, Response};
6use crate::networking::proxy_config::ProxyConfig;
7use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
8use crate::observability::ErrorBoundaryEvent;
9use crate::sdk_diagnostics::marker::{ActionType, Marker, StepType};
10use crate::{log_d, log_i, log_w, StatsigOptions};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Weak};
14use std::time::Duration;
15
16const NON_RETRY_CODES: [u16; 4] = [400, 403, 405, 501];
17const SHUTDOWN_ERROR: &str = "Request was aborted because the client is shutting down";
18
19const TAG: &str = stringify!(NetworkClient);
20
21pub struct NetworkClient {
22 headers: HashMap<String, String>,
23 is_shutdown: Arc<AtomicBool>,
24 ops_stats: Arc<OpsStatsForInstance>,
25 net_provider: Weak<dyn NetworkProvider>,
26 disable_network: bool,
27 proxy_config: Option<ProxyConfig>,
28 silent_on_network_failure: bool,
29}
30
31impl NetworkClient {
32 #[must_use]
33 pub fn new(
34 sdk_key: &str,
35 headers: Option<HashMap<String, String>>,
36 options: Option<&StatsigOptions>,
37 ) -> Self {
38 let net_provider = get_network_provider();
39 let (disable_network, proxy_config) = options
40 .map(|opts| {
41 (
42 opts.disable_network.unwrap_or(false),
43 opts.proxy_config.clone(),
44 )
45 })
46 .unwrap_or((false, None));
47
48 NetworkClient {
49 headers: headers.unwrap_or_default(),
50 is_shutdown: Arc::new(AtomicBool::new(false)),
51 net_provider,
52 ops_stats: OPS_STATS.get_for_instance(sdk_key),
53 disable_network,
54 proxy_config,
55 silent_on_network_failure: false,
56 }
57 }
58
59 pub fn shutdown(&self) {
60 self.is_shutdown.store(true, Ordering::SeqCst);
61 }
62
63 pub async fn get(&self, request_args: RequestArgs) -> Result<Response, NetworkError> {
64 self.make_request(HttpMethod::GET, request_args).await
65 }
66
67 pub async fn post(
68 &self,
69 mut request_args: RequestArgs,
70 body: Option<Vec<u8>>,
71 ) -> Result<Response, NetworkError> {
72 request_args.body = body;
73 self.make_request(HttpMethod::POST, request_args).await
74 }
75
76 async fn make_request(
77 &self,
78 method: HttpMethod,
79 mut request_args: RequestArgs,
80 ) -> Result<Response, NetworkError> {
81 let is_shutdown = if let Some(is_shutdown) = &request_args.is_shutdown {
82 is_shutdown.clone()
83 } else {
84 self.is_shutdown.clone()
85 };
86
87 if self.disable_network {
88 log_d!(TAG, "Network is disabled, not making requests");
89 return Err(NetworkError::DisableNetworkOn(request_args.url));
90 }
91
92 request_args.populate_headers(self.headers.clone());
93
94 let mut merged_headers = request_args.headers.unwrap_or_default();
95 if !self.headers.is_empty() {
96 merged_headers.extend(self.headers.clone());
97 }
98 merged_headers.insert(
99 "STATSIG-CLIENT-TIME".into(),
100 Utc::now().timestamp_millis().to_string(),
101 );
102 request_args.headers = Some(merged_headers);
103
104 if let Some(proxy_config) = &self.proxy_config {
106 request_args.proxy_config = Some(proxy_config.clone());
107 }
108
109 let mut attempt = 0;
110
111 loop {
112 if let Some(key) = request_args.diagnostics_key {
113 self.ops_stats.add_marker(
114 Marker::new(key, ActionType::Start, Some(StepType::NetworkRequest))
115 .with_attempt(attempt)
116 .with_url(request_args.url.clone()),
117 None,
118 );
119 }
120 if is_shutdown.load(Ordering::SeqCst) {
121 log_i!(TAG, "{}", SHUTDOWN_ERROR);
122 return Err(NetworkError::ShutdownError(request_args.url));
123 }
124
125 let mut response = match self.net_provider.upgrade() {
126 Some(net_provider) => net_provider.send(&method, &request_args).await,
127 None => {
128 return Err(NetworkError::RequestFailed(
129 request_args.url,
130 None,
131 "Failed to get a NetworkProvider instance".to_string(),
132 ));
133 }
134 };
135
136 log_d!(
137 TAG,
138 "Response ({}): {:?}",
139 &request_args.url,
140 response.status_code
141 );
142
143 let status = response.status_code;
144 let sdk_region_str = response
145 .headers
146 .as_ref()
147 .and_then(|h| h.get("x-statsig-region"));
148 let success = (200..300).contains(&status.unwrap_or(0));
149
150 let error_message = response
151 .error
152 .clone()
153 .unwrap_or_else(|| get_error_message_for_status(status, response.data.as_mut()));
154
155 if let Some(key) = request_args.diagnostics_key {
156 let mut end_marker =
157 Marker::new(key, ActionType::End, Some(StepType::NetworkRequest))
158 .with_attempt(attempt)
159 .with_url(request_args.url.clone())
160 .with_is_success(success)
161 .with_sdk_region(sdk_region_str.map(|s| s.to_owned()));
162
163 if let Some(status_code) = status {
164 end_marker = end_marker.with_status_code(status_code);
165 }
166
167 let error_map = if !error_message.is_empty() {
168 let mut map = HashMap::new();
169 map.insert("name".to_string(), "NetworkError".to_string());
170 map.insert("message".to_string(), error_message.clone());
171 let status_string = match status {
172 Some(code) => code.to_string(),
173 None => "None".to_string(),
174 };
175 map.insert("code".to_string(), status_string);
176 Some(map)
177 } else {
178 None
179 };
180
181 if let Some(error_map) = error_map {
182 end_marker = end_marker.with_error(error_map);
183 }
184
185 self.ops_stats.add_marker(end_marker, None);
186 }
187
188 if success {
189 return Ok(response);
190 }
191
192 if NON_RETRY_CODES.contains(&status.unwrap_or(0)) {
193 let error = NetworkError::RequestNotRetryable(
194 request_args.url.clone(),
195 status,
196 error_message,
197 );
198 self.log_warning(&error, &request_args);
199 return Err(error);
200 }
201
202 if attempt >= request_args.retries {
203 let error = NetworkError::RetriesExhausted(
204 request_args.url.clone(),
205 status,
206 attempt + 1,
207 error_message,
208 );
209 self.log_warning(&error, &request_args);
210 return Err(error);
211 }
212
213 attempt += 1;
214 let backoff_ms = 2_u64.pow(attempt) * 100;
215
216 log_i!(
217 TAG, "Network request failed with status code {} (attempt {}/{}), will retry after {}ms...\n{}",
218 status.map_or("unknown".to_string(), |s| s.to_string()),
219 attempt,
220 request_args.retries + 1,
221 backoff_ms,
222 error_message
223 );
224
225 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
226 }
227 }
228
229 pub fn mute_network_error_log(mut self) -> Self {
230 self.silent_on_network_failure = true;
231 self
232 }
233
234 fn log_warning(&self, error: &NetworkError, args: &RequestArgs) {
235 let exception = error.name();
236
237 log_w!(TAG, "{}", error);
238 if !self.silent_on_network_failure {
239 let dedupe_key = format!("{:?}", args.diagnostics_key);
240 self.ops_stats.log_error(ErrorBoundaryEvent {
241 tag: TAG.to_string(),
242 exception: exception.to_string(),
243 bypass_dedupe: false,
244 info: serde_json::to_string(error).unwrap_or_default(),
245 dedupe_key: Some(dedupe_key),
246 extra: None,
247 });
248 }
249 }
250}
251
252fn get_error_message_for_status(
253 status: Option<u16>,
254 data: Option<&mut super::ResponseData>,
255) -> String {
256 if (200..300).contains(&status.unwrap_or(0)) {
257 return String::new();
258 }
259
260 let mut message = String::new();
261 if let Some(data) = data {
262 let lossy_str = data.read_to_string().unwrap_or_default();
263 if lossy_str.is_ascii() {
264 message = lossy_str.to_string();
265 }
266 }
267
268 let status_value = match status {
269 Some(code) => code,
270 None => return format!("HTTP Error None: {message}"),
271 };
272
273 let generic_message = match status_value {
274 400 => "Bad Request",
275 401 => "Unauthorized",
276 403 => "Forbidden",
277 404 => "Not Found",
278 405 => "Method Not Allowed",
279 406 => "Not Acceptable",
280 408 => "Request Timeout",
281 500 => "Internal Server Error",
282 502 => "Bad Gateway",
283 503 => "Service Unavailable",
284 504 => "Gateway Timeout",
285 0 => "Unknown Error",
286 _ => return format!("HTTP Error {status_value}: {message}"),
287 };
288
289 if message.is_empty() {
290 return generic_message.to_string();
291 }
292
293 format!("{generic_message}: {message}")
294}