axum_request_send/
impl_isahc.rs1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use axum::{
4 body::{Body as AxumBody, StreamBody as AxumStreamBody},
5 http::Request as HttpRequest,
6 response::Response as AxumResponse,
7};
8use futures_util::TryStreamExt as _;
9use isahc::{AsyncBody as IsahcAsyncBody, Error as IsahcError, HttpClient};
10
11pub async fn send(
13 client: &HttpClient,
14 http_request: HttpRequest<AxumBody>,
15) -> Result<AxumResponse, IsahcError> {
16 let isahc_request = {
17 let (parts, body) = http_request.into_parts();
18 let body = body.map_ok(|x| x.to_vec()).map_err(|err| {
19 if let Some(cause) = err.into_cause() {
21 IoError::new(IoErrorKind::Other, cause)
22 } else {
23 IoError::new(IoErrorKind::Other, "Unknown".to_string())
24 }
25 });
26 let body = IsahcAsyncBody::from_reader(body.into_async_read());
27 HttpRequest::from_parts(parts, body)
28 };
29 let isahc_response = client.send_async(isahc_request).await?;
30 let http_response = {
31 let mut response = AxumResponse::new(());
32 *response.status_mut() = isahc_response.status();
33 *response.version_mut() = isahc_response.version();
34 *response.headers_mut() = isahc_response.headers().to_owned();
35
36 let body_stream = futures_stream_reader::reader(isahc_response.into_body());
37
38 let body = AxumStreamBody::new(body_stream);
39
40 let (parts, _) = response.into_parts();
41 AxumResponse::from_parts(parts, axum::body::boxed(body))
42 };
43 Ok(http_response)
44}
45
46#[cfg(test)]
47mod tests {
48 use super::*;
49
50 use std::net::SocketAddr;
51
52 use axum::{routing::get, Router, Server};
53 use isahc::AsyncReadResponseExt as _;
54
55 #[tokio::test]
56 async fn test_send() -> Result<(), Box<dyn std::error::Error>> {
57 let backend_listen_addr = SocketAddr::from((
59 [127, 0, 0, 1],
60 portpicker::pick_unused_port().expect("No ports free"),
61 ));
62 let server_listen_addr = SocketAddr::from((
63 [127, 0, 0, 1],
64 portpicker::pick_unused_port().expect("No ports free"),
65 ));
66
67 let backend_task = tokio::task::spawn(async move {
69 let app = Router::new().route("/", get(|| async { "backend" }));
70
71 let server = Server::bind(&backend_listen_addr).serve(app.into_make_service());
72
73 server.await.expect("backend start failed");
74 });
75
76 let server_task = tokio::task::spawn(async move {
78 use axum::{body::Body, http::Request};
79
80 let app = Router::new().route(
81 "/",
82 get(move |mut request: Request<Body>| async move {
83 *request.uri_mut() = format!("http://{}{}", backend_listen_addr, "/")
84 .parse()
85 .unwrap();
86 let client = isahc::HttpClient::new().unwrap();
87 send(&client, request).await.unwrap()
88 }),
89 );
90
91 let server = Server::bind(&server_listen_addr).serve(app.into_make_service());
92
93 server.await.expect("server start failed");
94 });
95
96 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
98
99 let mut resp = isahc::get_async(format!("http://{}{}", server_listen_addr, "/")).await?;
101 assert!(resp.status().is_success());
102 assert_eq!(resp.bytes().await.unwrap(), b"backend");
103
104 server_task.abort();
106 assert!(server_task.await.unwrap_err().is_cancelled());
107
108 backend_task.abort();
109 assert!(backend_task.await.unwrap_err().is_cancelled());
110
111 Ok(())
112 }
113}