plane_dynamic_proxy/
connector.rs1use core::task;
2use http::Uri;
3use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioIo};
4use std::{future::Future, pin::Pin, task::Poll, time::Duration};
5use tokio::net::TcpStream;
6
7#[derive(Clone)]
10pub struct TimeoutHttpConnector {
11 pub timeout: Duration,
12 pub connector: HttpConnector,
13}
14
15impl Default for TimeoutHttpConnector {
16 fn default() -> Self {
17 TimeoutHttpConnector {
18 timeout: Duration::from_secs(10),
19 connector: HttpConnector::new(),
20 }
21 }
22}
23
24impl tower_service::Service<Uri> for TimeoutHttpConnector {
25 type Response = TokioIo<TcpStream>;
26 type Error = TimeoutHttpConnectorError;
27 type Future = Pin<Box<dyn Future<Output = Result<TokioIo<TcpStream>, Self::Error>> + Send>>;
28
29 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
30 self.connector
31 .poll_ready(cx)
32 .map_err(|e| TimeoutHttpConnectorError::Boxed(Box::new(e)))
33 }
34
35 fn call(&mut self, dst: Uri) -> Self::Future {
36 let fut = self.connector.call(dst);
37 let timeout = self.timeout;
38 Box::pin(async move {
39 let result = tokio::time::timeout(timeout, fut).await;
40 match result {
41 Ok(Ok(io)) => Ok(io),
42 Ok(Err(e)) => Err(TimeoutHttpConnectorError::Boxed(Box::new(e))),
43 Err(_) => Err(TimeoutHttpConnectorError::Timeout),
44 }
45 })
46 }
47}
48
49#[derive(thiserror::Error, Debug)]
50pub enum TimeoutHttpConnectorError {
51 #[error("Timeout")]
52 Timeout,
53
54 #[error("Non-timeout error: {0}")]
55 Boxed(#[from] Box<dyn std::error::Error + Send + Sync>),
56}