axum_request_send/
impl_isahc.rs

1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use axum::{
4    body::{Body as AxumBody, StreamBody as AxumStreamBody},
5    http::Request as HttpRequest,
6    response::Response as AxumResponse,
7};
8use futures_util::TryStreamExt as _;
9use isahc::{AsyncBody as IsahcAsyncBody, Error as IsahcError, HttpClient};
10
11//
12pub async fn send(
13    client: &HttpClient,
14    http_request: HttpRequest<AxumBody>,
15) -> Result<AxumResponse, IsahcError> {
16    let isahc_request = {
17        let (parts, body) = http_request.into_parts();
18        let body = body.map_ok(|x| x.to_vec()).map_err(|err| {
19            // Ref https://docs.rs/hyper/0.14.25/src/hyper/error.rs.html#301-313
20            if let Some(cause) = err.into_cause() {
21                IoError::new(IoErrorKind::Other, cause)
22            } else {
23                IoError::new(IoErrorKind::Other, "Unknown".to_string())
24            }
25        });
26        let body = IsahcAsyncBody::from_reader(body.into_async_read());
27        HttpRequest::from_parts(parts, body)
28    };
29    let isahc_response = client.send_async(isahc_request).await?;
30    let http_response = {
31        let mut response = AxumResponse::new(());
32        *response.status_mut() = isahc_response.status();
33        *response.version_mut() = isahc_response.version();
34        *response.headers_mut() = isahc_response.headers().to_owned();
35
36        let body_stream = futures_stream_reader::reader(isahc_response.into_body());
37
38        let body = AxumStreamBody::new(body_stream);
39
40        let (parts, _) = response.into_parts();
41        AxumResponse::from_parts(parts, axum::body::boxed(body))
42    };
43    Ok(http_response)
44}
45
#[cfg(test)]
mod tests {
    use super::*;

    use std::net::SocketAddr;

    use axum::{routing::get, Router, Server};
    use isahc::AsyncReadResponseExt as _;

    /// End-to-end check of `send`: a front server forwards `GET /` to a
    /// backend server through `send`, and the client must receive the
    /// backend's body.
    #[tokio::test]
    async fn test_send() -> Result<(), Box<dyn std::error::Error>> {
        // Two loopback addresses on ports reported as unused: backend first,
        // then the forwarding server.
        let pick_local_addr = || {
            let port = portpicker::pick_unused_port().expect("No ports free");
            SocketAddr::from(([127, 0, 0, 1], port))
        };
        let backend_listen_addr = pick_local_addr();
        let server_listen_addr = pick_local_addr();

        // Backend: answers `GET /` with the fixed body "backend".
        let backend_task = tokio::task::spawn(async move {
            let backend_app = Router::new().route("/", get(|| async { "backend" }));

            Server::bind(&backend_listen_addr)
                .serve(backend_app.into_make_service())
                .await
                .expect("backend start failed");
        });

        // Forwarding server: points the incoming request at the backend and
        // relays it with `send`.
        let server_task = tokio::task::spawn(async move {
            use axum::{body::Body, http::Request};

            let forward = move |mut request: Request<Body>| async move {
                let backend_url = format!("http://{}{}", backend_listen_addr, "/");
                *request.uri_mut() = backend_url.parse().unwrap();

                let client = isahc::HttpClient::new().unwrap();
                send(&client, request).await.unwrap()
            };
            let proxy_app = Router::new().route("/", get(forward));

            Server::bind(&server_listen_addr)
                .serve(proxy_app.into_make_service())
                .await
                .expect("server start failed");
        });

        // Fixed pause before the first request; presumably gives both spawned
        // servers time to start listening.
        let startup_wait = tokio::time::Duration::from_millis(200);
        tokio::time::sleep(startup_wait).await;

        // Request through the forwarding server and expect the backend's reply.
        let proxy_url = format!("http://{}{}", server_listen_addr, "/");
        let mut resp = isahc::get_async(proxy_url).await?;
        assert!(resp.status().is_success());
        let payload = resp.bytes().await.unwrap();
        assert_eq!(payload, b"backend");

        // Stop the forwarding server first, then the backend; each task must
        // report that it ended by cancellation.
        for task in [server_task, backend_task] {
            task.abort();
            let join_error = task.await.unwrap_err();
            assert!(join_error.is_cancelled());
        }

        Ok(())
    }
}