http_client/
isahc.rs

1//! http-client implementation for isahc
2
3use std::convert::TryFrom;
4
5use async_std::io::BufReader;
6use isahc::config::Configurable;
7use isahc::{http, ResponseExt};
8
9use crate::Config;
10
11use super::{async_trait, Body, Error, HttpClient, Request, Response};
12
13/// Curl-based HTTP Client.
14#[derive(Debug)]
15pub struct IsahcClient {
16    client: isahc::HttpClient,
17    config: Config,
18}
19
20impl Default for IsahcClient {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl IsahcClient {
27    /// Create a new instance.
28    pub fn new() -> Self {
29        Self::from_client(isahc::HttpClient::new().unwrap())
30    }
31
32    /// Create from externally initialized and configured client.
33    pub fn from_client(client: isahc::HttpClient) -> Self {
34        Self {
35            client,
36            config: Config::default(),
37        }
38    }
39}
40
41#[async_trait]
42impl HttpClient for IsahcClient {
43    async fn send(&self, mut req: Request) -> Result<Response, Error> {
44        let mut builder = http::Request::builder()
45            .uri(req.url().as_str())
46            .method(http::Method::from_bytes(req.method().to_string().as_bytes()).unwrap());
47
48        for (name, value) in req.iter() {
49            builder = builder.header(name.as_str(), value.as_str());
50        }
51
52        let body = req.take_body();
53        let body = match body.len() {
54            Some(len) => isahc::Body::from_reader_sized(body, len as u64),
55            None => isahc::Body::from_reader(body),
56        };
57
58        let request = builder.body(body).unwrap();
59        let res = self.client.send_async(request).await.map_err(Error::from)?;
60        let maybe_metrics = res.metrics().cloned();
61        let (parts, body) = res.into_parts();
62        let body = Body::from_reader(BufReader::new(body), None);
63        let mut response = http_types::Response::new(parts.status.as_u16());
64        for (name, value) in &parts.headers {
65            response.append_header(name.as_str(), value.to_str().unwrap());
66        }
67
68        if let Some(metrics) = maybe_metrics {
69            response.ext_mut().insert(metrics);
70        }
71
72        response.set_body(body);
73        Ok(response)
74    }
75
76    /// Override the existing configuration with new configuration.
77    ///
78    /// Config options may not impact existing connections.
79    fn set_config(&mut self, config: Config) -> http_types::Result<()> {
80        let mut builder =
81            isahc::HttpClient::builder().max_connections_per_host(config.max_connections_per_host);
82
83        if !config.http_keep_alive {
84            builder = builder.connection_cache_size(0);
85        }
86        if config.tcp_no_delay {
87            builder = builder.tcp_nodelay();
88        }
89        if let Some(timeout) = config.timeout {
90            builder = builder.timeout(timeout);
91        }
92
93        self.client = builder.build()?;
94        self.config = config;
95
96        Ok(())
97    }
98
99    /// Get the current configuration.
100    fn config(&self) -> &Config {
101        &self.config
102    }
103}
104
105impl TryFrom<Config> for IsahcClient {
106    type Error = isahc::Error;
107
108    fn try_from(config: Config) -> Result<Self, Self::Error> {
109        let mut builder = isahc::HttpClient::builder();
110
111        if !config.http_keep_alive {
112            builder = builder.connection_cache_size(0);
113        }
114        if config.tcp_no_delay {
115            builder = builder.tcp_nodelay();
116        }
117        if let Some(timeout) = config.timeout {
118            builder = builder.timeout(timeout);
119        }
120
121        Ok(Self {
122            client: builder.build()?,
123            config,
124        })
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use async_std::prelude::*;
132    use async_std::task;
133    use http_types::url::Url;
134    use http_types::Result;
135    use std::time::Duration;
136
137    fn build_test_request(url: Url) -> Request {
138        let mut req = Request::new(http_types::Method::Post, url);
139        req.set_body("hello");
140        req.append_header("test", "value");
141        req
142    }
143
144    #[async_std::test]
145    async fn basic_functionality() -> Result<()> {
146        let port = portpicker::pick_unused_port().unwrap();
147        let mut app = tide::new();
148        app.at("/").all(|mut r: tide::Request<()>| async move {
149            let mut response = tide::Response::new(http_types::StatusCode::Ok);
150            response.set_body(r.body_bytes().await.unwrap());
151            Ok(response)
152        });
153
154        let server = task::spawn(async move {
155            app.listen(("localhost", port)).await?;
156            Result::Ok(())
157        });
158
159        let client = task::spawn(async move {
160            task::sleep(Duration::from_millis(100)).await;
161            let request =
162                build_test_request(Url::parse(&format!("http://localhost:{}/", port)).unwrap());
163            let mut response: Response = IsahcClient::new().send(request).await?;
164            assert_eq!(response.body_string().await.unwrap(), "hello");
165            Ok(())
166        });
167
168        server.race(client).await?;
169
170        Ok(())
171    }
172}