statsig_rust/networking/
network_client.rs

1use chrono::Utc;
2
3use super::network_error::NetworkError;
4use super::providers::get_network_provider;
5use super::{HttpMethod, NetworkProvider, RequestArgs, Response};
6use crate::networking::proxy_config::ProxyConfig;
7use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
8use crate::observability::ErrorBoundaryEvent;
9use crate::sdk_diagnostics::marker::{ActionType, Marker, StepType};
10use crate::{log_d, log_i, log_w, StatsigOptions};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Weak};
14use std::time::Duration;
15
16const NON_RETRY_CODES: [u16; 4] = [400, 403, 405, 501];
17const SHUTDOWN_ERROR: &str = "Request was aborted because the client is shutting down";
18
19const TAG: &str = stringify!(NetworkClient);
20
21pub struct NetworkClient {
22    headers: HashMap<String, String>,
23    is_shutdown: Arc<AtomicBool>,
24    ops_stats: Arc<OpsStatsForInstance>,
25    net_provider: Weak<dyn NetworkProvider>,
26    disable_network: bool,
27    proxy_config: Option<ProxyConfig>,
28    silent_on_network_failure: bool,
29    disable_file_streaming: bool,
30}
31
32impl NetworkClient {
33    #[must_use]
34    pub fn new(
35        sdk_key: &str,
36        headers: Option<HashMap<String, String>>,
37        options: Option<&StatsigOptions>,
38    ) -> Self {
39        let net_provider = get_network_provider();
40        let (disable_network, proxy_config) = options
41            .map(|opts| {
42                (
43                    opts.disable_network.unwrap_or(false),
44                    opts.proxy_config.clone(),
45                )
46            })
47            .unwrap_or((false, None));
48
49        NetworkClient {
50            headers: headers.unwrap_or_default(),
51            is_shutdown: Arc::new(AtomicBool::new(false)),
52            net_provider,
53            ops_stats: OPS_STATS.get_for_instance(sdk_key),
54            disable_network,
55            proxy_config,
56            silent_on_network_failure: false,
57            disable_file_streaming: options
58                .map(|opts| opts.disable_disk_access.unwrap_or(false))
59                .unwrap_or(false),
60        }
61    }
62
63    pub fn shutdown(&self) {
64        self.is_shutdown.store(true, Ordering::SeqCst);
65    }
66
67    pub async fn get(&self, request_args: RequestArgs) -> Result<Response, NetworkError> {
68        self.make_request(HttpMethod::GET, request_args).await
69    }
70
71    pub async fn post(
72        &self,
73        mut request_args: RequestArgs,
74        body: Option<Vec<u8>>,
75    ) -> Result<Response, NetworkError> {
76        request_args.body = body;
77        self.make_request(HttpMethod::POST, request_args).await
78    }
79
80    async fn make_request(
81        &self,
82        method: HttpMethod,
83        mut request_args: RequestArgs,
84    ) -> Result<Response, NetworkError> {
85        let is_shutdown = if let Some(is_shutdown) = &request_args.is_shutdown {
86            is_shutdown.clone()
87        } else {
88            self.is_shutdown.clone()
89        };
90
91        if self.disable_network {
92            log_d!(TAG, "Network is disabled, not making requests");
93            return Err(NetworkError::DisableNetworkOn(request_args.url));
94        }
95
96        request_args.populate_headers(self.headers.clone());
97
98        if request_args.disable_file_streaming.is_none() {
99            request_args.disable_file_streaming = Some(self.disable_file_streaming);
100        }
101
102        let mut merged_headers = request_args.headers.unwrap_or_default();
103        if !self.headers.is_empty() {
104            merged_headers.extend(self.headers.clone());
105        }
106        merged_headers.insert(
107            "STATSIG-CLIENT-TIME".into(),
108            Utc::now().timestamp_millis().to_string(),
109        );
110        request_args.headers = Some(merged_headers);
111
112        // passing down proxy config through request args
113        if let Some(proxy_config) = &self.proxy_config {
114            request_args.proxy_config = Some(proxy_config.clone());
115        }
116
117        let mut attempt = 0;
118
119        loop {
120            if let Some(key) = request_args.diagnostics_key {
121                self.ops_stats.add_marker(
122                    Marker::new(key, ActionType::Start, Some(StepType::NetworkRequest))
123                        .with_attempt(attempt)
124                        .with_url(request_args.url.clone()),
125                    None,
126                );
127            }
128            if is_shutdown.load(Ordering::SeqCst) {
129                log_i!(TAG, "{}", SHUTDOWN_ERROR);
130                return Err(NetworkError::ShutdownError(request_args.url));
131            }
132
133            let mut response = match self.net_provider.upgrade() {
134                Some(net_provider) => net_provider.send(&method, &request_args).await,
135                None => {
136                    return Err(NetworkError::RequestFailed(
137                        request_args.url,
138                        None,
139                        "Failed to get a NetworkProvider instance".to_string(),
140                    ));
141                }
142            };
143
144            log_d!(
145                TAG,
146                "Response ({}): {:?}",
147                &request_args.url,
148                response.status_code
149            );
150
151            let status = response.status_code;
152            let sdk_region_str = response
153                .headers
154                .as_ref()
155                .and_then(|h| h.get("x-statsig-region"));
156            let success = (200..300).contains(&status.unwrap_or(0));
157
158            let error_message = response
159                .error
160                .clone()
161                .unwrap_or_else(|| get_error_message_for_status(status, response.data.as_mut()));
162
163            if let Some(key) = request_args.diagnostics_key {
164                let mut end_marker =
165                    Marker::new(key, ActionType::End, Some(StepType::NetworkRequest))
166                        .with_attempt(attempt)
167                        .with_url(request_args.url.clone())
168                        .with_is_success(success)
169                        .with_sdk_region(sdk_region_str.map(|s| s.to_owned()));
170
171                if let Some(status_code) = status {
172                    end_marker = end_marker.with_status_code(status_code);
173                }
174
175                let error_map = if !error_message.is_empty() {
176                    let mut map = HashMap::new();
177                    map.insert("name".to_string(), "NetworkError".to_string());
178                    map.insert("message".to_string(), error_message.clone());
179                    let status_string = match status {
180                        Some(code) => code.to_string(),
181                        None => "None".to_string(),
182                    };
183                    map.insert("code".to_string(), status_string);
184                    Some(map)
185                } else {
186                    None
187                };
188
189                if let Some(error_map) = error_map {
190                    end_marker = end_marker.with_error(error_map);
191                }
192
193                self.ops_stats.add_marker(end_marker, None);
194            }
195
196            if success {
197                return Ok(response);
198            }
199
200            if NON_RETRY_CODES.contains(&status.unwrap_or(0)) {
201                let error = NetworkError::RequestNotRetryable(
202                    request_args.url.clone(),
203                    status,
204                    error_message,
205                );
206                self.log_warning(&error, &request_args);
207                return Err(error);
208            }
209
210            if attempt >= request_args.retries {
211                let error = NetworkError::RetriesExhausted(
212                    request_args.url.clone(),
213                    status,
214                    attempt + 1,
215                    error_message,
216                );
217                self.log_warning(&error, &request_args);
218                return Err(error);
219            }
220
221            attempt += 1;
222            let backoff_ms = 2_u64.pow(attempt) * 100;
223
224            log_i!(
225                TAG, "Network request failed with status code {} (attempt {}/{}), will retry after {}ms...\n{}",
226                status.map_or("unknown".to_string(), |s| s.to_string()),
227                attempt,
228                request_args.retries + 1,
229                backoff_ms,
230                error_message
231            );
232
233            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
234        }
235    }
236
237    pub fn mute_network_error_log(mut self) -> Self {
238        self.silent_on_network_failure = true;
239        self
240    }
241
242    fn log_warning(&self, error: &NetworkError, args: &RequestArgs) {
243        let exception = error.name();
244
245        log_w!(TAG, "{}", error);
246        if !self.silent_on_network_failure {
247            let dedupe_key = format!("{:?}", args.diagnostics_key);
248            self.ops_stats.log_error(ErrorBoundaryEvent {
249                tag: TAG.to_string(),
250                exception: exception.to_string(),
251                bypass_dedupe: false,
252                info: serde_json::to_string(error).unwrap_or_default(),
253                dedupe_key: Some(dedupe_key),
254                extra: None,
255            });
256        }
257    }
258}
259
260fn get_error_message_for_status(
261    status: Option<u16>,
262    data: Option<&mut super::ResponseData>,
263) -> String {
264    if (200..300).contains(&status.unwrap_or(0)) {
265        return String::new();
266    }
267
268    let mut message = String::new();
269    if let Some(data) = data {
270        let lossy_str = data.read_to_string().unwrap_or_default();
271        if lossy_str.is_ascii() {
272            message = lossy_str.to_string();
273        }
274    }
275
276    let status_value = match status {
277        Some(code) => code,
278        None => return format!("HTTP Error None: {message}"),
279    };
280
281    let generic_message = match status_value {
282        400 => "Bad Request",
283        401 => "Unauthorized",
284        403 => "Forbidden",
285        404 => "Not Found",
286        405 => "Method Not Allowed",
287        406 => "Not Acceptable",
288        408 => "Request Timeout",
289        500 => "Internal Server Error",
290        502 => "Bad Gateway",
291        503 => "Service Unavailable",
292        504 => "Gateway Timeout",
293        0 => "Unknown Error",
294        _ => return format!("HTTP Error {status_value}: {message}"),
295    };
296
297    if message.is_empty() {
298        return generic_message.to_string();
299    }
300
301    format!("{generic_message}: {message}")
302}