plane_dynamic_proxy/
connector.rs

1use core::task;
2use http::Uri;
3use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioIo};
4use std::{future::Future, pin::Pin, task::Poll, time::Duration};
5use tokio::net::TcpStream;
6
7/// Light wrapper of `hyper_util::client::legacy::connect::HttpConnector` that adds a timeout to the initial
8/// connection being established.
9#[derive(Clone)]
10pub struct TimeoutHttpConnector {
11    pub timeout: Duration,
12    pub connector: HttpConnector,
13}
14
15impl Default for TimeoutHttpConnector {
16    fn default() -> Self {
17        TimeoutHttpConnector {
18            timeout: Duration::from_secs(10),
19            connector: HttpConnector::new(),
20        }
21    }
22}
23
24impl tower_service::Service<Uri> for TimeoutHttpConnector {
25    type Response = TokioIo<TcpStream>;
26    type Error = TimeoutHttpConnectorError;
27    type Future = Pin<Box<dyn Future<Output = Result<TokioIo<TcpStream>, Self::Error>> + Send>>;
28
29    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
30        self.connector
31            .poll_ready(cx)
32            .map_err(|e| TimeoutHttpConnectorError::Boxed(Box::new(e)))
33    }
34
35    fn call(&mut self, dst: Uri) -> Self::Future {
36        let fut = self.connector.call(dst);
37        let timeout = self.timeout;
38        Box::pin(async move {
39            let result = tokio::time::timeout(timeout, fut).await;
40            match result {
41                Ok(Ok(io)) => Ok(io),
42                Ok(Err(e)) => Err(TimeoutHttpConnectorError::Boxed(Box::new(e))),
43                Err(_) => Err(TimeoutHttpConnectorError::Timeout),
44            }
45        })
46    }
47}
48
49#[derive(thiserror::Error, Debug)]
50pub enum TimeoutHttpConnectorError {
51    #[error("Timeout")]
52    Timeout,
53
54    #[error("Non-timeout error: {0}")]
55    Boxed(#[from] Box<dyn std::error::Error + Send + Sync>),
56}