statsig_rust/networking/
network_client.rs

1use chrono::Utc;
2
3use super::network_error::NetworkError;
4use super::providers::get_network_provider;
5use super::{HttpMethod, NetworkProvider, RequestArgs, Response};
6use crate::networking::proxy_config::ProxyConfig;
7use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
8use crate::observability::ErrorBoundaryEvent;
9use crate::sdk_diagnostics::marker::{ActionType, Marker, StepType};
10use crate::{log_d, log_i, log_w, StatsigOptions};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Weak};
14use std::time::Duration;
15
/// HTTP status codes treated as permanent failures: `make_request` returns
/// `NetworkError::RequestNotRetryable` for these instead of scheduling a retry.
const NON_RETRY_CODES: [u16; 4] = [400, 403, 405, 501];
/// Message logged when a request is abandoned because the shutdown flag is set.
const SHUTDOWN_ERROR: &str = "Request was aborted because the client is shutting down";

/// Tag passed to the logging macros and recorded on error-boundary events raised here.
const TAG: &str = stringify!(NetworkClient);
20
/// HTTP client wrapper that adds default headers, diagnostics markers, shutdown
/// awareness and retry-with-backoff on top of a `NetworkProvider`.
pub struct NetworkClient {
    /// Default headers merged into the headers of every outgoing request.
    headers: HashMap<String, String>,
    /// Set by `shutdown`; checked before each attempt unless the request carries its own flag.
    is_shutdown: Arc<AtomicBool>,
    /// Receives diagnostics markers for each attempt and error events for failed requests.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Weak handle to the provider that performs the request; upgraded on every attempt.
    net_provider: Weak<dyn NetworkProvider>,
    /// When `true`, requests fail immediately with `NetworkError::DisableNetworkOn`.
    disable_network: bool,
    /// Optional proxy settings copied into the request arguments before sending.
    proxy_config: Option<ProxyConfig>,
    /// When `true`, failures are still logged as warnings but not reported via `ops_stats`.
    silent_on_network_failure: bool,
}
30
31impl NetworkClient {
32    #[must_use]
33    pub fn new(
34        sdk_key: &str,
35        headers: Option<HashMap<String, String>>,
36        options: Option<&StatsigOptions>,
37    ) -> Self {
38        let net_provider = get_network_provider();
39        let (disable_network, proxy_config) = options
40            .map(|opts| {
41                (
42                    opts.disable_network.unwrap_or(false),
43                    opts.proxy_config.clone(),
44                )
45            })
46            .unwrap_or((false, None));
47
48        NetworkClient {
49            headers: headers.unwrap_or_default(),
50            is_shutdown: Arc::new(AtomicBool::new(false)),
51            net_provider,
52            ops_stats: OPS_STATS.get_for_instance(sdk_key),
53            disable_network,
54            proxy_config,
55            silent_on_network_failure: false,
56        }
57    }
58
59    pub fn shutdown(&self) {
60        self.is_shutdown.store(true, Ordering::SeqCst);
61    }
62
63    pub async fn get(&self, request_args: RequestArgs) -> Result<Response, NetworkError> {
64        self.make_request(HttpMethod::GET, request_args).await
65    }
66
67    pub async fn post(
68        &self,
69        mut request_args: RequestArgs,
70        body: Option<Vec<u8>>,
71    ) -> Result<Response, NetworkError> {
72        request_args.body = body;
73        self.make_request(HttpMethod::POST, request_args).await
74    }
75
76    async fn make_request(
77        &self,
78        method: HttpMethod,
79        mut request_args: RequestArgs,
80    ) -> Result<Response, NetworkError> {
81        let is_shutdown = if let Some(is_shutdown) = &request_args.is_shutdown {
82            is_shutdown.clone()
83        } else {
84            self.is_shutdown.clone()
85        };
86
87        if self.disable_network {
88            log_d!(TAG, "Network is disabled, not making requests");
89            return Err(NetworkError::DisableNetworkOn(request_args.url));
90        }
91
92        request_args.populate_headers(self.headers.clone());
93
94        let mut merged_headers = request_args.headers.unwrap_or_default();
95        if !self.headers.is_empty() {
96            merged_headers.extend(self.headers.clone());
97        }
98        merged_headers.insert(
99            "STATSIG-CLIENT-TIME".into(),
100            Utc::now().timestamp_millis().to_string(),
101        );
102        request_args.headers = Some(merged_headers);
103
104        // passing down proxy config through request args
105        if let Some(proxy_config) = &self.proxy_config {
106            request_args.proxy_config = Some(proxy_config.clone());
107        }
108
109        let mut attempt = 0;
110
111        loop {
112            if let Some(key) = request_args.diagnostics_key {
113                self.ops_stats.add_marker(
114                    Marker::new(key, ActionType::Start, Some(StepType::NetworkRequest))
115                        .with_attempt(attempt)
116                        .with_url(request_args.url.clone()),
117                    None,
118                );
119            }
120            if is_shutdown.load(Ordering::SeqCst) {
121                log_i!(TAG, "{}", SHUTDOWN_ERROR);
122                return Err(NetworkError::ShutdownError(request_args.url));
123            }
124
125            let response = match self.net_provider.upgrade() {
126                Some(net_provider) => net_provider.send(&method, &request_args).await,
127                None => {
128                    return Err(NetworkError::RequestFailed(
129                        request_args.url,
130                        None,
131                        "Failed to get a NetworkProvider instance".to_string(),
132                    ));
133                }
134            };
135
136            log_d!(
137                TAG,
138                "Response ({}): {:?}",
139                &request_args.url,
140                response.status_code
141            );
142
143            let status = response.status_code;
144            let sdk_region_str = response
145                .headers
146                .as_ref()
147                .and_then(|h| h.get("x-statsig-region"));
148            let success = (200..300).contains(&status.unwrap_or(0));
149
150            let error_message = response
151                .error
152                .clone()
153                .unwrap_or_else(|| get_error_message_for_status(status, response.data.as_deref()));
154
155            if let Some(key) = request_args.diagnostics_key {
156                let mut end_marker =
157                    Marker::new(key, ActionType::End, Some(StepType::NetworkRequest))
158                        .with_attempt(attempt)
159                        .with_url(request_args.url.clone())
160                        .with_is_success(success)
161                        .with_sdk_region(sdk_region_str.map(|s| s.to_owned()));
162
163                if let Some(status_code) = status {
164                    end_marker = end_marker.with_status_code(status_code);
165                }
166
167                let error_map = if !error_message.is_empty() {
168                    let mut map = HashMap::new();
169                    map.insert("name".to_string(), "NetworkError".to_string());
170                    map.insert("message".to_string(), error_message.clone());
171                    let status_string = match status {
172                        Some(code) => code.to_string(),
173                        None => "None".to_string(),
174                    };
175                    map.insert("code".to_string(), status_string);
176                    Some(map)
177                } else {
178                    None
179                };
180
181                if let Some(error_map) = error_map {
182                    end_marker = end_marker.with_error(error_map);
183                }
184
185                self.ops_stats.add_marker(end_marker, None);
186            }
187
188            if success {
189                return Ok(response);
190            }
191
192            if NON_RETRY_CODES.contains(&status.unwrap_or(0)) {
193                let error = NetworkError::RequestNotRetryable(
194                    request_args.url.clone(),
195                    status,
196                    error_message,
197                );
198                self.log_warning(&error, &request_args);
199                return Err(error);
200            }
201
202            if attempt >= request_args.retries {
203                let error = NetworkError::RetriesExhausted(
204                    request_args.url.clone(),
205                    status,
206                    attempt + 1,
207                    error_message,
208                );
209                self.log_warning(&error, &request_args);
210                return Err(error);
211            }
212
213            attempt += 1;
214            let backoff_ms = 2_u64.pow(attempt) * 100;
215
216            log_i!(
217                TAG, "Network request failed with status code {} (attempt {}/{}), will retry after {}ms...\n{}",
218                status.map_or("unknown".to_string(), |s| s.to_string()),
219                attempt,
220                request_args.retries + 1,
221                backoff_ms,
222                error_message
223            );
224
225            tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
226        }
227    }
228
229    pub fn mute_network_error_log(mut self) -> Self {
230        self.silent_on_network_failure = true;
231        self
232    }
233
234    fn log_warning(&self, error: &NetworkError, args: &RequestArgs) {
235        let exception = error.name();
236
237        log_w!(TAG, "{}", error);
238        if !self.silent_on_network_failure {
239            let dedupe_key = format!("{:?}", args.diagnostics_key);
240            self.ops_stats.log_error(ErrorBoundaryEvent {
241                tag: TAG.to_string(),
242                exception: exception.to_string(),
243                bypass_dedupe: false,
244                info: serde_json::to_string(error).unwrap_or_default(),
245                dedupe_key: Some(dedupe_key),
246                extra: None,
247            });
248        }
249    }
250}
251
/// Produces a human-readable error message for an HTTP response status.
///
/// * 2xx statuses yield an empty string.
/// * The response body in `data` is included only when every byte is ASCII; any other
///   body is ignored.
/// * Well-known statuses map to their reason phrase, followed by `": <body>"` when a
///   body is available.
/// * A missing status or an unmapped status yields `"HTTP Error <status>: <body>"`.
fn get_error_message_for_status(status: Option<u16>, data: Option<&[u8]>) -> String {
    if matches!(status, Some(code) if (200..300).contains(&code)) {
        return String::new();
    }

    // Pure-ASCII input is always valid UTF-8, so the lossy conversion never substitutes here.
    let body = data
        .filter(|bytes| bytes.is_ascii())
        .map(|bytes| String::from_utf8_lossy(bytes).into_owned())
        .unwrap_or_default();

    let code = match status {
        None => return format!("HTTP Error None: {body}"),
        Some(code) => code,
    };

    let reason = match code {
        400 => "Bad Request",
        401 => "Unauthorized",
        403 => "Forbidden",
        404 => "Not Found",
        405 => "Method Not Allowed",
        406 => "Not Acceptable",
        408 => "Request Timeout",
        500 => "Internal Server Error",
        502 => "Bad Gateway",
        503 => "Service Unavailable",
        504 => "Gateway Timeout",
        0 => "Unknown Error",
        other => return format!("HTTP Error {other}: {body}"),
    };

    if body.is_empty() {
        reason.to_string()
    } else {
        format!("{reason}: {body}")
    }
}