use std::{
collections::{HashMap, hash_map::DefaultHasher},
fs::File,
hash::{Hash, Hasher},
io::copy,
path::Path,
time::Duration,
};
use bytes::Bytes;
use nautilus_core::{
collections::into_ustr_vec,
python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err},
};
use pyo3::{create_exception, exceptions::PyException, prelude::*, types::PyDict};
use reqwest::blocking::Client;
use crate::{
http::{HttpClient, HttpClientError, HttpMethod, HttpResponse, HttpStatus},
ratelimiter::quota::Quota,
};
create_exception!(network, HttpError, PyException);
create_exception!(network, HttpTimeoutError, PyException);
create_exception!(network, HttpInvalidProxyError, PyException);
create_exception!(network, HttpClientBuildError, PyException);
impl HttpClientError {
#[must_use]
pub fn into_py_err(self) -> PyErr {
match self {
Self::Error(e) => PyErr::new::<HttpError, _>(e),
Self::TimeoutError(e) => PyErr::new::<HttpTimeoutError, _>(e),
Self::InvalidProxy(e) => PyErr::new::<HttpInvalidProxyError, _>(e),
Self::ClientBuildError(e) => PyErr::new::<HttpClientBuildError, _>(e),
}
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl HttpMethod {
fn __hash__(&self) -> isize {
let mut h = DefaultHasher::new();
self.hash(&mut h);
h.finish() as isize
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl HttpResponse {
#[new]
pub fn py_new(status: u16, body: Vec<u8>) -> PyResult<Self> {
Ok(Self {
status: HttpStatus::try_from(status).map_err(to_pyvalue_err)?,
headers: HashMap::new(),
body: Bytes::from(body),
})
}
#[getter]
#[pyo3(name = "status")]
pub const fn py_status(&self) -> u16 {
self.status.as_u16()
}
#[getter]
#[pyo3(name = "headers")]
pub fn py_headers(&self) -> HashMap<String, String> {
self.headers.clone()
}
#[getter]
#[pyo3(name = "body")]
#[gen_stub(override_return_type(type_repr = "bytes"))]
pub fn py_body(&self) -> &[u8] {
self.body.as_ref()
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl HttpClient {
#[new]
#[pyo3(signature = (default_headers=HashMap::new(), header_keys=Vec::new(), keyed_quotas=Vec::new(), default_quota=None, timeout_secs=None, proxy_url=None))]
pub fn py_new(
default_headers: HashMap<String, String>,
header_keys: Vec<String>,
keyed_quotas: Vec<(String, Quota)>,
default_quota: Option<Quota>,
timeout_secs: Option<u64>,
proxy_url: Option<String>,
) -> PyResult<Self> {
Self::new(
default_headers,
header_keys,
keyed_quotas,
default_quota,
timeout_secs,
proxy_url,
)
.map_err(HttpClientError::into_py_err)
}
#[allow(clippy::too_many_arguments)]
#[pyo3(name = "request")]
#[pyo3(signature = (method, url, params=None, headers=None, body=None, keys=None, timeout_secs=None))]
fn py_request<'py>(
&self,
method: HttpMethod,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
body: Option<Vec<u8>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.client.clone();
let rate_limiter = self.rate_limiter.clone();
let params = params_to_hashmap(params)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let keys = keys.map(into_ustr_vec);
rate_limiter.await_keys_ready(keys.as_deref()).await;
client
.send_request(
method.into(),
url,
params.as_ref(),
headers,
body,
timeout_secs,
)
.await
.map_err(HttpClientError::into_py_err)
})
}
#[pyo3(name = "get")]
#[pyo3(signature = (url, params=None, headers=None, keys=None, timeout_secs=None))]
fn py_get<'py>(
&self,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();
let params = params_to_hashmap(params)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.get(url, params.as_ref(), headers, timeout_secs, keys)
.await
.map_err(HttpClientError::into_py_err)
})
}
#[allow(clippy::too_many_arguments)]
#[pyo3(name = "post")]
#[pyo3(signature = (url, params=None, headers=None, body=None, keys=None, timeout_secs=None))]
fn py_post<'py>(
&self,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
body: Option<Vec<u8>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();
let params = params_to_hashmap(params)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.post(url, params.as_ref(), headers, body, timeout_secs, keys)
.await
.map_err(HttpClientError::into_py_err)
})
}
#[allow(clippy::too_many_arguments)]
#[pyo3(name = "patch")]
#[pyo3(signature = (url, params=None, headers=None, body=None, keys=None, timeout_secs=None))]
fn py_patch<'py>(
&self,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
body: Option<Vec<u8>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();
let params = params_to_hashmap(params)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.patch(url, params.as_ref(), headers, body, timeout_secs, keys)
.await
.map_err(HttpClientError::into_py_err)
})
}
#[pyo3(name = "delete")]
#[pyo3(signature = (url, params=None, headers=None, keys=None, timeout_secs=None))]
fn py_delete<'py>(
&self,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
keys: Option<Vec<String>>,
timeout_secs: Option<u64>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.clone();
let params = params_to_hashmap(params)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.delete(url, params.as_ref(), headers, timeout_secs, keys)
.await
.map_err(HttpClientError::into_py_err)
})
}
}
fn params_to_hashmap(
params: Option<&Bound<'_, PyAny>>,
) -> PyResult<Option<HashMap<String, Vec<String>>>> {
let Some(params) = params else {
return Ok(None);
};
let Ok(dict) = params.cast::<PyDict>() else {
return Err(to_pytype_err("params must be a dict"));
};
let mut result = HashMap::new();
for (key, value) in dict {
let key_str = key.str()?.to_str()?.to_string();
if let Ok(seq) = value.cast::<pyo3::types::PySequence>() {
if !value.is_instance_of::<pyo3::types::PyString>() {
let values: Vec<String> = (0..seq.len()?)
.map(|i| {
let item = seq.get_item(i)?;
Ok(item.str()?.to_str()?.to_string())
})
.collect::<PyResult<_>>()?;
result.insert(key_str, values);
continue;
}
}
let value_str = value.str()?.to_str()?.to_string();
result.insert(key_str, vec![value_str]);
}
Ok(Some(result))
}
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.network")]
#[pyo3(signature = (url, params=None, headers=None, timeout_secs=None))]
pub fn http_get(
_py: Python<'_>,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
timeout_secs: Option<u64>,
) -> PyResult<HttpResponse> {
let params_map = params_to_hashmap(params)?;
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");
runtime.block_on(async {
let client = HttpClient::new(HashMap::new(), vec![], vec![], None, timeout_secs, None)
.map_err(HttpClientError::into_py_err)?;
client
.get(url, params_map.as_ref(), headers, timeout_secs, None)
.await
.map_err(HttpClientError::into_py_err)
})
})
.join()
.expect("Thread panicked")
}
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.network")]
#[pyo3(signature = (url, params=None, headers=None, body=None, timeout_secs=None))]
pub fn http_post(
_py: Python<'_>,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
body: Option<Vec<u8>>,
timeout_secs: Option<u64>,
) -> PyResult<HttpResponse> {
let params_map = params_to_hashmap(params)?;
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");
runtime.block_on(async {
let client = HttpClient::new(HashMap::new(), vec![], vec![], None, timeout_secs, None)
.map_err(HttpClientError::into_py_err)?;
client
.post(url, params_map.as_ref(), headers, body, timeout_secs, None)
.await
.map_err(HttpClientError::into_py_err)
})
})
.join()
.expect("Thread panicked")
}
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.network")]
#[pyo3(signature = (url, params=None, headers=None, body=None, timeout_secs=None))]
pub fn http_patch(
_py: Python<'_>,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
body: Option<Vec<u8>>,
timeout_secs: Option<u64>,
) -> PyResult<HttpResponse> {
let params_map = params_to_hashmap(params)?;
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");
runtime.block_on(async {
let client = HttpClient::new(HashMap::new(), vec![], vec![], None, timeout_secs, None)
.map_err(HttpClientError::into_py_err)?;
client
.patch(url, params_map.as_ref(), headers, body, timeout_secs, None)
.await
.map_err(HttpClientError::into_py_err)
})
})
.join()
.expect("Thread panicked")
}
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.network")]
#[pyo3(signature = (url, params=None, headers=None, timeout_secs=None))]
pub fn http_delete(
_py: Python<'_>,
url: String,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
timeout_secs: Option<u64>,
) -> PyResult<HttpResponse> {
let params_map = params_to_hashmap(params)?;
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");
runtime.block_on(async {
let client = HttpClient::new(HashMap::new(), vec![], vec![], None, timeout_secs, None)
.map_err(HttpClientError::into_py_err)?;
client
.delete(url, params_map.as_ref(), headers, timeout_secs, None)
.await
.map_err(HttpClientError::into_py_err)
})
})
.join()
.expect("Thread panicked")
}
#[pyfunction]
#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.network")]
#[pyo3(signature = (url, filepath, params=None, headers=None, timeout_secs=None))]
pub fn http_download(
_py: Python<'_>,
url: String,
filepath: &str,
params: Option<&Bound<'_, PyAny>>,
headers: Option<HashMap<String, String>>,
timeout_secs: Option<u64>,
) -> PyResult<()> {
let params_map = params_to_hashmap(params)?;
let full_url = if let Some(ref params) = params_map {
let pairs: Vec<(String, String)> = params
.iter()
.flat_map(|(key, values)| values.iter().map(move |value| (key.clone(), value.clone())))
.collect();
if pairs.is_empty() {
url
} else {
let query_string = serde_urlencoded::to_string(pairs).map_err(to_pyvalue_err)?;
let separator = if url.contains('?') { '&' } else { '?' };
format!("{url}{separator}{query_string}")
}
} else {
url
};
let filepath = Path::new(filepath);
if let Some(parent) = filepath.parent() {
std::fs::create_dir_all(parent).map_err(to_pyvalue_err)?;
}
let mut client_builder = Client::builder();
if let Some(timeout) = timeout_secs {
client_builder = client_builder.timeout(Duration::from_secs(timeout));
}
let client = client_builder.build().map_err(to_pyvalue_err)?;
let mut request_builder = client.get(&full_url);
if let Some(headers_map) = headers {
for (key, value) in headers_map {
request_builder = request_builder.header(key, value);
}
}
let mut response = request_builder.send().map_err(to_pyvalue_err)?;
if !response.status().is_success() {
return Err(to_pyruntime_err(format!(
"HTTP error: {}",
response.status()
)));
}
let mut file = File::create(filepath).map_err(to_pyvalue_err)?;
copy(&mut response, &mut file).map_err(to_pyvalue_err)?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use axum::{Router, routing::get};
use pyo3::types::{PyDict, PyList, PyTuple};
use pyo3_async_runtimes::tokio::get_runtime;
use rstest::rstest;
use tokio::net::TcpListener;
use super::*;
#[rstest]
fn test_params_to_hashmap_none() {
pyo3::Python::initialize();
let result = Python::attach(|_py| params_to_hashmap(None)).unwrap();
assert!(result.is_none());
}
#[rstest]
fn test_params_to_hashmap_empty_dict() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
params_to_hashmap(Some(dict.as_any()))
})
.unwrap();
assert!(result.is_some());
assert!(result.unwrap().is_empty());
}
#[rstest]
fn test_params_to_hashmap_single_string_value() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("key", "value").unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("key").unwrap(), &vec!["value"]);
}
#[rstest]
fn test_params_to_hashmap_multiple_string_values() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("foo", "bar").unwrap();
dict.set_item("limit", "100").unwrap();
dict.set_item("offset", "0").unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result.get("foo").unwrap(), &vec!["bar"]);
assert_eq!(result.get("limit").unwrap(), &vec!["100"]);
assert_eq!(result.get("offset").unwrap(), &vec!["0"]);
}
#[rstest]
fn test_params_to_hashmap_int_value() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("limit", 100).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("limit").unwrap(), &vec!["100"]);
}
#[rstest]
fn test_params_to_hashmap_float_value() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("price", 123.45).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("price").unwrap(), &vec!["123.45"]);
}
#[rstest]
fn test_params_to_hashmap_bool_value() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("active", true).unwrap();
dict.set_item("closed", false).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result.get("active").unwrap(), &vec!["True"]);
assert_eq!(result.get("closed").unwrap(), &vec!["False"]);
}
#[rstest]
fn test_params_to_hashmap_list_value() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
let list = PyList::new(py, ["1", "2", "3"]).unwrap();
dict.set_item("id", list).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("id").unwrap(), &vec!["1", "2", "3"]);
}
#[rstest]
fn test_params_to_hashmap_tuple_value() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
let tuple = PyTuple::new(py, ["a", "b", "c"]).unwrap();
dict.set_item("letters", tuple).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("letters").unwrap(), &vec!["a", "b", "c"]);
}
#[rstest]
fn test_params_to_hashmap_list_with_mixed_types() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
let list = PyList::new(py, [1, 2, 3]).unwrap();
dict.set_item("nums", list).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("nums").unwrap(), &vec!["1", "2", "3"]);
}
#[rstest]
fn test_params_to_hashmap_mixed_values() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("name", "test").unwrap();
dict.set_item("limit", 50).unwrap();
let ids = PyList::new(py, ["1", "2"]).unwrap();
dict.set_item("id", ids).unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result.get("name").unwrap(), &vec!["test"]);
assert_eq!(result.get("limit").unwrap(), &vec!["50"]);
assert_eq!(result.get("id").unwrap(), &vec!["1", "2"]);
}
#[rstest]
fn test_params_to_hashmap_string_not_treated_as_sequence() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("text", "hello").unwrap();
params_to_hashmap(Some(dict.as_any()))
})
.unwrap()
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result.get("text").unwrap(), &vec!["hello"]);
}
#[rstest]
fn test_params_to_hashmap_invalid_non_dict() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let list = PyList::new(py, ["a", "b"]).unwrap();
params_to_hashmap(Some(list.as_any()))
});
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("params must be a dict"));
}
#[rstest]
fn test_params_to_hashmap_invalid_string_param() {
pyo3::Python::initialize();
let result = Python::attach(|py| {
let string = pyo3::types::PyString::new(py, "not a dict");
params_to_hashmap(Some(string.as_any()))
});
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("params must be a dict"));
}
async fn create_test_router() -> Router {
Router::new()
.route("/get", get(|| async { "hello-world!" }))
.route("/post", axum::routing::post(|| async { "posted" }))
.route("/patch", axum::routing::patch(|| async { "patched" }))
.route("/delete", axum::routing::delete(|| async { "deleted" }))
}
async fn start_test_server() -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
tokio::spawn(async move {
let app = create_test_router().await;
axum::serve(listener, app).await.unwrap();
});
Ok(addr)
}
#[rstest]
fn test_blocking_http_get() {
pyo3::Python::initialize();
let addr = get_runtime().block_on(async { start_test_server().await.unwrap() });
let url = format!("http://{addr}/get");
let response = Python::attach(|py| http_get(py, url, None, None, Some(10))).unwrap();
assert!(response.status.is_success());
assert_eq!(String::from_utf8_lossy(&response.body), "hello-world!");
}
#[rstest]
fn test_blocking_http_post() {
pyo3::Python::initialize();
let addr = get_runtime().block_on(async { start_test_server().await.unwrap() });
let url = format!("http://{addr}/post");
let response = Python::attach(|py| http_post(py, url, None, None, None, Some(10))).unwrap();
assert!(response.status.is_success());
assert_eq!(String::from_utf8_lossy(&response.body), "posted");
}
#[rstest]
fn test_blocking_http_patch() {
pyo3::Python::initialize();
let addr = get_runtime().block_on(async { start_test_server().await.unwrap() });
let url = format!("http://{addr}/patch");
let response =
Python::attach(|py| http_patch(py, url, None, None, None, Some(10))).unwrap();
assert!(response.status.is_success());
assert_eq!(String::from_utf8_lossy(&response.body), "patched");
}
#[rstest]
fn test_blocking_http_delete() {
pyo3::Python::initialize();
let addr = get_runtime().block_on(async { start_test_server().await.unwrap() });
let url = format!("http://{addr}/delete");
let response = Python::attach(|py| http_delete(py, url, None, None, Some(10))).unwrap();
assert!(response.status.is_success());
assert_eq!(String::from_utf8_lossy(&response.body), "deleted");
}
#[rstest]
fn test_blocking_http_download() {
pyo3::Python::initialize();
let addr = get_runtime().block_on(async { start_test_server().await.unwrap() });
let url = format!("http://{addr}/get");
let temp_dir = std::env::temp_dir();
let filepath = temp_dir.join("test_download.txt");
Python::attach(|py| {
http_download(py, url, filepath.to_str().unwrap(), None, None, Some(10)).unwrap();
});
assert!(filepath.exists());
let content = std::fs::read_to_string(&filepath).unwrap();
assert_eq!(content, "hello-world!");
std::fs::remove_file(&filepath).ok();
}
}