octoproxy_lib/proxy_client/
mod.rs

1use futures_util::Future;
2use hyper::service::Service;
3use std::{
4    fmt, io,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tokio::io::{AsyncRead, AsyncWrite};
9
10use http::Uri;
11
12use self::stream::ProxyStream;
13
14mod stream;
15mod tunnel;
16
17#[inline]
18pub(crate) fn io_err<E: Into<Box<dyn std::error::Error + Send + Sync>>>(e: E) -> io::Error {
19    io::Error::new(io::ErrorKind::Other, e)
20}
21
22/// A wrapper around `Proxy`s with a connector.
23#[derive(Clone)]
24pub struct ProxyConnector<C> {
25    proxy: Uri,
26    connector: C,
27}
28
29impl<C: fmt::Debug> fmt::Debug for ProxyConnector<C> {
30    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
31        write!(
32            f,
33            "ProxyConnector (unsecured){{ proxies: {:?}, connector: {:?} }}",
34            self.proxy, self.connector
35        )
36    }
37}
38
39impl<C> ProxyConnector<C> {
40    /// Create a proxy connector and attach a particular proxy
41    pub fn new(connector: C, proxy: Uri) -> Self {
42        Self { proxy, connector }
43    }
44}
45
46type BoxError = Box<dyn std::error::Error + Send + Sync>;
47
48impl<C> Service<Uri> for ProxyConnector<C>
49where
50    C: Service<Uri>,
51    C::Response: AsyncRead + AsyncWrite + Send + Unpin + 'static,
52    C::Future: Send + 'static,
53    C::Error: Into<BoxError>,
54{
55    type Response = ProxyStream<C::Response>;
56    type Error = io::Error;
57    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
58
59    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
60        match self.connector.poll_ready(cx) {
61            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
62            Poll::Ready(Err(e)) => Poll::Ready(Err(io_err(e.into()))),
63            Poll::Pending => Poll::Pending,
64        }
65    }
66
67    fn call(&mut self, uri: Uri) -> Self::Future {
68        let host = uri.authority().map(|auth| auth.to_string()).unwrap();
69
70        let tunnel = tunnel::new(&host);
71        let connection =
72            proxy_dst(uri, &self.proxy).map(|proxy_url| self.connector.call(proxy_url));
73
74        Box::pin(async move {
75            let proxy_stream = match connection {
76                Ok(connection) => match connection.await.map_err(io_err) {
77                    Ok(proxy_stream) => proxy_stream,
78                    Err(e) => return Err(e),
79                },
80                Err(e) => return Err(e),
81            };
82
83            let tunnel_stream = match tunnel.with_stream(proxy_stream).await {
84                Ok(tunnel_stream) => tunnel_stream,
85                Err(e) => return Err(e),
86            };
87
88            Ok(ProxyStream(tunnel_stream))
89        })
90    }
91}
92
93fn proxy_dst(dst: Uri, proxy: &Uri) -> io::Result<Uri> {
94    Uri::builder()
95        .scheme(
96            proxy
97                .scheme_str()
98                .ok_or_else(|| io_err(format!("proxy uri missing scheme: {}", proxy)))?,
99        )
100        .authority(
101            proxy
102                .authority()
103                .ok_or_else(|| io_err(format!("proxy uri missing host: {}", proxy)))?
104                .clone(),
105        )
106        .path_and_query(dst.path_and_query().unwrap().clone())
107        .build()
108        .map_err(|err| io_err(format!("other error: {}", err)))
109}