1use std::{borrow::Cow, collections::HashMap, str::FromStr, sync::Arc, time::Duration};
19
20use nautilus_core::collections::into_ustr_vec;
21use nautilus_cryptography::providers::install_cryptographic_provider;
22use reqwest::{
23 Method, Response, Url,
24 header::{HeaderMap, HeaderName, HeaderValue},
25};
26use ustr::Ustr;
27
28use super::{HttpClientError, HttpResponse, HttpStatus};
29use crate::ratelimiter::{RateLimiter, clock::MonotonicClock, quota::Quota};
30
31const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 32;
33
34const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 60;
36
37const DEFAULT_HTTP2_KEEP_ALIVE_SECS: u64 = 30;
39
40#[derive(Clone, Debug)]
50#[cfg_attr(
51 feature = "python",
52 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network", from_py_object)
53)]
54#[cfg_attr(
55 feature = "python",
56 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.network")
57)]
58pub struct HttpClient {
59 pub(crate) client: InnerHttpClient,
61 pub(crate) rate_limiter: Arc<RateLimiter<Ustr, MonotonicClock>>,
63}
64
65impl HttpClient {
66 pub fn new(
73 headers: HashMap<String, String>,
74 header_keys: Vec<String>,
75 keyed_quotas: Vec<(String, Quota)>,
76 default_quota: Option<Quota>,
77 timeout_secs: Option<u64>,
78 proxy_url: Option<String>,
79 ) -> Result<Self, HttpClientError> {
80 install_cryptographic_provider();
81
82 let mut header_map = HeaderMap::new();
84 for (key, value) in headers {
85 let header_name = HeaderName::from_str(&key)
86 .map_err(|e| HttpClientError::Error(format!("Invalid header name '{key}': {e}")))?;
87 let header_value = HeaderValue::from_str(&value).map_err(|e| {
88 HttpClientError::Error(format!("Invalid header value '{value}': {e}"))
89 })?;
90 header_map.insert(header_name, header_value);
91 }
92
93 let mut client_builder = reqwest::Client::builder()
94 .default_headers(header_map)
95 .tcp_nodelay(true)
96 .pool_max_idle_per_host(DEFAULT_POOL_MAX_IDLE_PER_HOST)
97 .pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS))
98 .http2_keep_alive_interval(Duration::from_secs(DEFAULT_HTTP2_KEEP_ALIVE_SECS))
99 .http2_keep_alive_while_idle(true)
100 .http2_adaptive_window(true);
101
102 if let Some(timeout_secs) = timeout_secs {
103 client_builder = client_builder.timeout(Duration::from_secs(timeout_secs));
104 }
105
106 if let Some(proxy_url) = proxy_url {
108 let proxy = reqwest::Proxy::all(&proxy_url)
109 .map_err(|e| HttpClientError::InvalidProxy(format!("{proxy_url}: {e}")))?;
110 client_builder = client_builder.proxy(proxy);
111 }
112
113 let client = client_builder
114 .build()
115 .map_err(|e| HttpClientError::ClientBuildError(e.to_string()))?;
116
117 let (valid_keys, header_names): (Vec<String>, Vec<HeaderName>) = header_keys
119 .into_iter()
120 .filter_map(|k| HeaderName::from_str(&k).ok().map(|name| (k, name)))
121 .unzip();
122
123 let client = InnerHttpClient {
124 client,
125 header_keys: Arc::new(valid_keys),
126 header_names: Arc::new(header_names),
127 };
128
129 let keyed_quotas = keyed_quotas
130 .into_iter()
131 .map(|(key, quota)| (Ustr::from(&key), quota))
132 .collect();
133
134 let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));
135
136 Ok(Self {
137 client,
138 rate_limiter,
139 })
140 }
141
142 #[allow(clippy::too_many_arguments)]
152 pub async fn request(
153 &self,
154 method: Method,
155 url: String,
156 params: Option<&HashMap<String, Vec<String>>>,
157 headers: Option<HashMap<String, String>>,
158 body: Option<Vec<u8>>,
159 timeout_secs: Option<u64>,
160 keys: Option<Vec<String>>,
161 ) -> Result<HttpResponse, HttpClientError> {
162 let keys = keys.map(into_ustr_vec);
163
164 self.request_with_ustr_keys(method, url, params, headers, body, timeout_secs, keys)
165 .await
166 }
167
168 #[allow(clippy::too_many_arguments)]
178 pub async fn request_with_params<P: serde::Serialize>(
179 &self,
180 method: Method,
181 url: String,
182 params: Option<&P>,
183 headers: Option<HashMap<String, String>>,
184 body: Option<Vec<u8>>,
185 timeout_secs: Option<u64>,
186 keys: Option<Vec<String>>,
187 ) -> Result<HttpResponse, HttpClientError> {
188 let keys = keys.map(into_ustr_vec);
189 let rate_limiter = self.rate_limiter.clone();
190 rate_limiter.await_keys_ready(keys.as_deref()).await;
191
192 self.client
193 .send_request_with_query(method, url, params, headers, body, timeout_secs)
194 .await
195 }
196
197 #[allow(clippy::too_many_arguments)]
203 pub async fn request_with_ustr_keys(
204 &self,
205 method: Method,
206 url: String,
207 params: Option<&HashMap<String, Vec<String>>>,
208 headers: Option<HashMap<String, String>>,
209 body: Option<Vec<u8>>,
210 timeout_secs: Option<u64>,
211 keys: Option<Vec<Ustr>>,
212 ) -> Result<HttpResponse, HttpClientError> {
213 let rate_limiter = self.rate_limiter.clone();
214 rate_limiter.await_keys_ready(keys.as_deref()).await;
215
216 self.client
217 .send_request(method, url, params, headers, body, timeout_secs)
218 .await
219 }
220
221 pub async fn get(
227 &self,
228 url: String,
229 params: Option<&HashMap<String, Vec<String>>>,
230 headers: Option<HashMap<String, String>>,
231 timeout_secs: Option<u64>,
232 keys: Option<Vec<String>>,
233 ) -> Result<HttpResponse, HttpClientError> {
234 self.request(Method::GET, url, params, headers, None, timeout_secs, keys)
235 .await
236 }
237
238 pub async fn post(
244 &self,
245 url: String,
246 params: Option<&HashMap<String, Vec<String>>>,
247 headers: Option<HashMap<String, String>>,
248 body: Option<Vec<u8>>,
249 timeout_secs: Option<u64>,
250 keys: Option<Vec<String>>,
251 ) -> Result<HttpResponse, HttpClientError> {
252 self.request(Method::POST, url, params, headers, body, timeout_secs, keys)
253 .await
254 }
255
256 pub async fn patch(
262 &self,
263 url: String,
264 params: Option<&HashMap<String, Vec<String>>>,
265 headers: Option<HashMap<String, String>>,
266 body: Option<Vec<u8>>,
267 timeout_secs: Option<u64>,
268 keys: Option<Vec<String>>,
269 ) -> Result<HttpResponse, HttpClientError> {
270 self.request(
271 Method::PATCH,
272 url,
273 params,
274 headers,
275 body,
276 timeout_secs,
277 keys,
278 )
279 .await
280 }
281
282 pub async fn delete(
288 &self,
289 url: String,
290 params: Option<&HashMap<String, Vec<String>>>,
291 headers: Option<HashMap<String, String>>,
292 timeout_secs: Option<u64>,
293 keys: Option<Vec<String>>,
294 ) -> Result<HttpResponse, HttpClientError> {
295 self.request(
296 Method::DELETE,
297 url,
298 params,
299 headers,
300 None,
301 timeout_secs,
302 keys,
303 )
304 .await
305 }
306}
307
308#[derive(Clone, Debug)]
317pub struct InnerHttpClient {
318 pub(crate) client: reqwest::Client,
319 pub(crate) header_keys: Arc<Vec<String>>,
320 pub(crate) header_names: Arc<Vec<HeaderName>>,
321}
322
323impl InnerHttpClient {
324 pub async fn send_request(
330 &self,
331 method: Method,
332 url: String,
333 params: Option<&HashMap<String, Vec<String>>>,
334 headers: Option<HashMap<String, String>>,
335 body: Option<Vec<u8>>,
336 timeout_secs: Option<u64>,
337 ) -> Result<HttpResponse, HttpClientError> {
338 let full_url = encode_url_params(&url, params)?;
339 self.send_request_internal(
340 method,
341 full_url.as_ref(),
342 None::<&()>,
343 headers,
344 body,
345 timeout_secs,
346 )
347 .await
348 }
349
350 pub async fn send_request_with_query<Q: serde::Serialize>(
359 &self,
360 method: Method,
361 url: String,
362 query: Option<&Q>,
363 headers: Option<HashMap<String, String>>,
364 body: Option<Vec<u8>>,
365 timeout_secs: Option<u64>,
366 ) -> Result<HttpResponse, HttpClientError> {
367 self.send_request_internal(method, &url, query, headers, body, timeout_secs)
368 .await
369 }
370
371 async fn send_request_internal<Q: serde::Serialize>(
377 &self,
378 method: Method,
379 url: &str,
380 query: Option<&Q>,
381 headers: Option<HashMap<String, String>>,
382 body: Option<Vec<u8>>,
383 timeout_secs: Option<u64>,
384 ) -> Result<HttpResponse, HttpClientError> {
385 let reqwest_url =
386 Url::parse(url).map_err(|e| HttpClientError::from(format!("URL parse error: {e}")))?;
387
388 let mut request_builder = self.client.request(method, reqwest_url);
389
390 if let Some(headers) = headers {
391 let mut header_map = HeaderMap::with_capacity(headers.len());
392 for (header_key, header_value) in &headers {
393 let key = HeaderName::from_bytes(header_key.as_bytes())
394 .map_err(|e| HttpClientError::from(format!("Invalid header name: {e}")))?;
395
396 if let Some(old_value) = header_map.insert(
397 key.clone(),
398 header_value
399 .parse()
400 .map_err(|e| HttpClientError::from(format!("Invalid header value: {e}")))?,
401 ) {
402 log::trace!("Replaced header '{key}': old={old_value:?}, new={header_value}");
403 }
404 }
405 request_builder = request_builder.headers(header_map);
406 }
407
408 if let Some(q) = query {
409 request_builder = request_builder.query(q);
410 }
411
412 if let Some(timeout_secs) = timeout_secs {
413 request_builder = request_builder.timeout(Duration::new(timeout_secs, 0));
414 }
415
416 let request = match body {
417 Some(b) => request_builder
418 .body(b)
419 .build()
420 .map_err(HttpClientError::from)?,
421 None => request_builder.build().map_err(HttpClientError::from)?,
422 };
423
424 log::trace!("{} {}", request.method(), request.url());
425
426 let response = self
427 .client
428 .execute(request)
429 .await
430 .map_err(HttpClientError::from)?;
431
432 self.to_response(response).await
433 }
434
435 pub async fn to_response(&self, response: Response) -> Result<HttpResponse, HttpClientError> {
443 log::trace!("{response:?}");
444
445 let resp_headers = response.headers();
446 let mut headers =
447 HashMap::with_capacity(std::cmp::min(self.header_names.len(), resp_headers.len()));
448
449 for (name, key_str) in self.header_names.iter().zip(self.header_keys.iter()) {
450 if let Some(val) = resp_headers.get(name)
451 && let Ok(v) = val.to_str()
452 {
453 headers.insert(key_str.clone(), v.to_owned());
454 }
455 }
456
457 let status = HttpStatus::new(response.status());
458 let body = response.bytes().await.map_err(HttpClientError::from)?;
459
460 Ok(HttpResponse {
461 status,
462 headers,
463 body,
464 })
465 }
466}
467
468impl Default for InnerHttpClient {
469 fn default() -> Self {
473 install_cryptographic_provider();
474 let client = reqwest::Client::new();
475 Self {
476 client,
477 header_keys: Arc::default(),
478 header_names: Arc::default(),
479 }
480 }
481}
482
483fn encode_url_params<'a>(
489 url: &'a str,
490 params: Option<&HashMap<String, Vec<String>>>,
491) -> Result<Cow<'a, str>, HttpClientError> {
492 let Some(params) = params else {
493 return Ok(Cow::Borrowed(url));
494 };
495
496 let pairs: Vec<(&str, &str)> = params
497 .iter()
498 .flat_map(|(key, values)| {
499 values
500 .iter()
501 .map(move |value| (key.as_str(), value.as_str()))
502 })
503 .collect();
504
505 if pairs.is_empty() {
506 return Ok(Cow::Borrowed(url));
507 }
508
509 let query_string = serde_urlencoded::to_string(pairs)
510 .map_err(|e| HttpClientError::Error(format!("Failed to encode params: {e}")))?;
511
512 let separator = if url.contains('?') { '&' } else { '?' };
513 Ok(Cow::Owned(format!("{url}{separator}{query_string}")))
514}
515
516#[cfg(test)]
517#[cfg(target_os = "linux")] mod tests {
519 use std::net::SocketAddr;
520
521 use axum::{
522 Router,
523 routing::{delete, get, patch, post},
524 serve,
525 };
526 use http::status::StatusCode;
527 use rstest::rstest;
528
529 use super::*;
530
531 fn create_router() -> Router {
532 Router::new()
533 .route("/get", get(|| async { "hello-world!" }))
534 .route("/post", post(|| async { StatusCode::OK }))
535 .route("/patch", patch(|| async { StatusCode::OK }))
536 .route("/delete", delete(|| async { StatusCode::OK }))
537 .route("/notfound", get(|| async { StatusCode::NOT_FOUND }))
538 .route(
539 "/slow",
540 get(|| async {
541 tokio::time::sleep(Duration::from_secs(2)).await;
542 "Eventually responded"
543 }),
544 )
545 }
546
547 async fn start_test_server() -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
548 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
549 let addr = listener.local_addr().unwrap();
550
551 tokio::spawn(async move {
552 serve(listener, create_router()).await.unwrap();
553 });
554
555 Ok(addr)
556 }
557
558 #[tokio::test]
559 async fn test_get() {
560 let addr = start_test_server().await.unwrap();
561 let url = format!("http://{addr}");
562
563 let client = InnerHttpClient::default();
564 let response = client
565 .send_request(
566 reqwest::Method::GET,
567 format!("{url}/get"),
568 None,
569 None,
570 None,
571 None,
572 )
573 .await
574 .unwrap();
575
576 assert!(response.status.is_success());
577 assert_eq!(String::from_utf8_lossy(&response.body), "hello-world!");
578 }
579
580 #[tokio::test]
581 async fn test_post() {
582 let addr = start_test_server().await.unwrap();
583 let url = format!("http://{addr}");
584
585 let client = InnerHttpClient::default();
586 let response = client
587 .send_request(
588 reqwest::Method::POST,
589 format!("{url}/post"),
590 None,
591 None,
592 None,
593 None,
594 )
595 .await
596 .unwrap();
597
598 assert!(response.status.is_success());
599 }
600
601 #[tokio::test]
602 async fn test_post_with_body() {
603 let addr = start_test_server().await.unwrap();
604 let url = format!("http://{addr}");
605
606 let client = InnerHttpClient::default();
607
608 let mut body = HashMap::new();
609 body.insert(
610 "key1".to_string(),
611 serde_json::Value::String("value1".to_string()),
612 );
613 body.insert(
614 "key2".to_string(),
615 serde_json::Value::String("value2".to_string()),
616 );
617
618 let body_string = serde_json::to_string(&body).unwrap();
619 let body_bytes = body_string.into_bytes();
620
621 let response = client
622 .send_request(
623 reqwest::Method::POST,
624 format!("{url}/post"),
625 None,
626 None,
627 Some(body_bytes),
628 None,
629 )
630 .await
631 .unwrap();
632
633 assert!(response.status.is_success());
634 }
635
636 #[tokio::test]
637 async fn test_patch() {
638 let addr = start_test_server().await.unwrap();
639 let url = format!("http://{addr}");
640
641 let client = InnerHttpClient::default();
642 let response = client
643 .send_request(
644 reqwest::Method::PATCH,
645 format!("{url}/patch"),
646 None,
647 None,
648 None,
649 None,
650 )
651 .await
652 .unwrap();
653
654 assert!(response.status.is_success());
655 }
656
657 #[tokio::test]
658 async fn test_delete() {
659 let addr = start_test_server().await.unwrap();
660 let url = format!("http://{addr}");
661
662 let client = InnerHttpClient::default();
663 let response = client
664 .send_request(
665 reqwest::Method::DELETE,
666 format!("{url}/delete"),
667 None,
668 None,
669 None,
670 None,
671 )
672 .await
673 .unwrap();
674
675 assert!(response.status.is_success());
676 }
677
678 #[tokio::test]
679 async fn test_not_found() {
680 let addr = start_test_server().await.unwrap();
681 let url = format!("http://{addr}/notfound");
682 let client = InnerHttpClient::default();
683
684 let response = client
685 .send_request(reqwest::Method::GET, url, None, None, None, None)
686 .await
687 .unwrap();
688
689 assert!(response.status.is_client_error());
690 assert_eq!(response.status.as_u16(), 404);
691 }
692
693 #[tokio::test]
694 async fn test_timeout() {
695 let addr = start_test_server().await.unwrap();
696 let url = format!("http://{addr}/slow");
697 let client = InnerHttpClient::default();
698
699 let result = client
701 .send_request(reqwest::Method::GET, url, None, None, None, Some(1))
702 .await;
703
704 match result {
705 Err(HttpClientError::TimeoutError(msg)) => {
706 println!("Got expected timeout error: {msg}");
707 }
708 Err(e) => panic!("Expected a timeout error, was: {e:?}"),
709 Ok(resp) => panic!("Expected a timeout error, but was a successful response: {resp:?}"),
710 }
711 }
712
713 #[rstest]
714 fn test_http_client_without_proxy() {
715 let result = HttpClient::new(
717 HashMap::new(),
718 vec![],
719 vec![],
720 None,
721 None,
722 None, );
724
725 assert!(result.is_ok());
726 }
727
728 #[rstest]
729 fn test_http_client_with_valid_proxy() {
730 let result = HttpClient::new(
732 HashMap::new(),
733 vec![],
734 vec![],
735 None,
736 None,
737 Some("http://proxy.example.com:8080".to_string()),
738 );
739
740 assert!(result.is_ok());
741 }
742
743 #[rstest]
744 fn test_http_client_with_socks5_proxy() {
745 let result = HttpClient::new(
747 HashMap::new(),
748 vec![],
749 vec![],
750 None,
751 None,
752 Some("socks5://127.0.0.1:1080".to_string()),
753 );
754
755 assert!(result.is_ok());
756 }
757
758 #[rstest]
759 fn test_http_client_with_malformed_proxy() {
760 let result = HttpClient::new(
764 HashMap::new(),
765 vec![],
766 vec![],
767 None,
768 None,
769 Some("://invalid".to_string()),
770 );
771
772 assert!(result.is_err());
773 assert!(matches!(result, Err(HttpClientError::InvalidProxy(_))));
774 }
775
776 #[rstest]
777 fn test_http_client_with_empty_proxy_string() {
778 let result = HttpClient::new(
780 HashMap::new(),
781 vec![],
782 vec![],
783 None,
784 None,
785 Some(String::new()),
786 );
787
788 assert!(result.is_err());
789 assert!(matches!(result, Err(HttpClientError::InvalidProxy(_))));
790 }
791
792 #[tokio::test]
793 async fn test_http_client_get() {
794 let addr = start_test_server().await.unwrap();
795 let url = format!("http://{addr}/get");
796
797 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
798 let response = client.get(url, None, None, None, None).await.unwrap();
799
800 assert!(response.status.is_success());
801 assert_eq!(String::from_utf8_lossy(&response.body), "hello-world!");
802 }
803
804 #[tokio::test]
805 async fn test_http_client_post() {
806 let addr = start_test_server().await.unwrap();
807 let url = format!("http://{addr}/post");
808
809 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
810 let response = client
811 .post(url, None, None, None, None, None)
812 .await
813 .unwrap();
814
815 assert!(response.status.is_success());
816 }
817
818 #[tokio::test]
819 async fn test_http_client_patch() {
820 let addr = start_test_server().await.unwrap();
821 let url = format!("http://{addr}/patch");
822
823 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
824 let response = client
825 .patch(url, None, None, None, None, None)
826 .await
827 .unwrap();
828
829 assert!(response.status.is_success());
830 }
831
832 #[tokio::test]
833 async fn test_http_client_delete() {
834 let addr = start_test_server().await.unwrap();
835 let url = format!("http://{addr}/delete");
836
837 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
838 let response = client.delete(url, None, None, None, None).await.unwrap();
839
840 assert!(response.status.is_success());
841 }
842}