octoproxy_lib/proxy_client/
mod.rs1use futures_util::Future;
2use hyper::service::Service;
3use std::{
4 fmt, io,
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tokio::io::{AsyncRead, AsyncWrite};
9
10use http::Uri;
11
12use self::stream::ProxyStream;
13
14mod stream;
15mod tunnel;
16
/// Wraps any value convertible into a boxed error in an [`io::Error`] whose
/// kind is [`io::ErrorKind::Other`].
///
/// The original error is kept as the payload of the returned `io::Error`.
#[inline]
pub(crate) fn io_err<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    // Box first so the conversion step is explicit; `io::Error::new` accepts
    // the boxed error as-is.
    let payload: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::Other, payload)
}
21
/// Connector that pairs an inner connector `C` with the URI of a proxy.
///
/// Cloning requires `C: Clone` (derived).
#[derive(Clone)]
pub struct ProxyConnector<C> {
    /// URI of the proxy; `proxy_dst` reads its scheme and authority when
    /// building the URI passed to the inner connector.
    proxy: Uri,
    /// Inner connector that is polled for readiness and called to open the
    /// underlying connection.
    connector: C,
}
28
29impl<C: fmt::Debug> fmt::Debug for ProxyConnector<C> {
30 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
31 write!(
32 f,
33 "ProxyConnector (unsecured){{ proxies: {:?}, connector: {:?} }}",
34 self.proxy, self.connector
35 )
36 }
37}
38
39impl<C> ProxyConnector<C> {
40 pub fn new(connector: C, proxy: Uri) -> Self {
42 Self { proxy, connector }
43 }
44}
45
46type BoxError = Box<dyn std::error::Error + Send + Sync>;
47
48impl<C> Service<Uri> for ProxyConnector<C>
49where
50 C: Service<Uri>,
51 C::Response: AsyncRead + AsyncWrite + Send + Unpin + 'static,
52 C::Future: Send + 'static,
53 C::Error: Into<BoxError>,
54{
55 type Response = ProxyStream<C::Response>;
56 type Error = io::Error;
57 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
58
59 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
60 match self.connector.poll_ready(cx) {
61 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
62 Poll::Ready(Err(e)) => Poll::Ready(Err(io_err(e.into()))),
63 Poll::Pending => Poll::Pending,
64 }
65 }
66
67 fn call(&mut self, uri: Uri) -> Self::Future {
68 let host = uri.authority().map(|auth| auth.to_string()).unwrap();
69
70 let tunnel = tunnel::new(&host);
71 let connection =
72 proxy_dst(uri, &self.proxy).map(|proxy_url| self.connector.call(proxy_url));
73
74 Box::pin(async move {
75 let proxy_stream = match connection {
76 Ok(connection) => match connection.await.map_err(io_err) {
77 Ok(proxy_stream) => proxy_stream,
78 Err(e) => return Err(e),
79 },
80 Err(e) => return Err(e),
81 };
82
83 let tunnel_stream = match tunnel.with_stream(proxy_stream).await {
84 Ok(tunnel_stream) => tunnel_stream,
85 Err(e) => return Err(e),
86 };
87
88 Ok(ProxyStream(tunnel_stream))
89 })
90 }
91}
92
93fn proxy_dst(dst: Uri, proxy: &Uri) -> io::Result<Uri> {
94 Uri::builder()
95 .scheme(
96 proxy
97 .scheme_str()
98 .ok_or_else(|| io_err(format!("proxy uri missing scheme: {}", proxy)))?,
99 )
100 .authority(
101 proxy
102 .authority()
103 .ok_or_else(|| io_err(format!("proxy uri missing host: {}", proxy)))?
104 .clone(),
105 )
106 .path_and_query(dst.path_and_query().unwrap().clone())
107 .build()
108 .map_err(|err| io_err(format!("other error: {}", err)))
109}