hyper_tor_connector/
socks.rs

1use std::net::SocketAddr;
2use std::task::Poll;
3
4use futures::future::BoxFuture;
5use futures::FutureExt;
6use hyper::client::connect::{Connected, Connection};
7use hyper::Uri;
8use tokio::io::{AsyncRead, AsyncWrite};
9use tokio::net::TcpStream;
10use tokio_socks::tcp::Socks5Stream;
11pub use tokio_socks::Error;
12use tower::Service;
13
14#[derive(Debug, Clone)]
15pub struct TorConnector {
16    proxy_addr: SocketAddr,
17}
18impl TorConnector {
19    pub fn new(proxy_addr: SocketAddr) -> Result<Self, Error> {
20        Ok(Self { proxy_addr })
21    }
22}
23
24#[pin_project::pin_project]
25pub struct TorStream(#[pin] pub Socks5Stream<TcpStream>);
26impl Connection for TorStream {
27    fn connected(&self) -> Connected {
28        self.0.connected()
29    }
30}
31impl AsyncWrite for TorStream {
32    fn is_write_vectored(&self) -> bool {
33        self.0.is_write_vectored()
34    }
35    fn poll_flush(
36        self: std::pin::Pin<&mut Self>,
37        cx: &mut std::task::Context<'_>,
38    ) -> Poll<Result<(), std::io::Error>> {
39        self.project().0.poll_flush(cx)
40    }
41    fn poll_shutdown(
42        self: std::pin::Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44    ) -> Poll<Result<(), std::io::Error>> {
45        self.project().0.poll_shutdown(cx)
46    }
47    fn poll_write(
48        self: std::pin::Pin<&mut Self>,
49        cx: &mut std::task::Context<'_>,
50        buf: &[u8],
51    ) -> Poll<Result<usize, std::io::Error>> {
52        self.project().0.poll_write(cx, buf)
53    }
54    fn poll_write_vectored(
55        self: std::pin::Pin<&mut Self>,
56        cx: &mut std::task::Context<'_>,
57        bufs: &[std::io::IoSlice<'_>],
58    ) -> Poll<Result<usize, std::io::Error>> {
59        self.project().0.poll_write_vectored(cx, bufs)
60    }
61}
62
63impl AsyncRead for TorStream {
64    fn poll_read(
65        self: std::pin::Pin<&mut Self>,
66        cx: &mut std::task::Context<'_>,
67        buf: &mut tokio::io::ReadBuf<'_>,
68    ) -> Poll<std::io::Result<()>> {
69        self.project().0.poll_read(cx, buf)
70    }
71}
72
73impl Service<Uri> for TorConnector {
74    type Response = TorStream;
75    type Error = Error;
76    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
77
78    fn poll_ready(
79        &mut self,
80        _cx: &mut std::task::Context<'_>,
81    ) -> std::task::Poll<Result<(), Self::Error>> {
82        Poll::Ready(Ok(()))
83    }
84
85    fn call(&mut self, req: Uri) -> Self::Future {
86        let proxy = self.proxy_addr;
87        async move {
88            Ok::<_, Error>(TorStream(
89                Socks5Stream::connect(
90                    proxy,
91                    (
92                        req.host().unwrap_or_default(),
93                        req.port_u16().unwrap_or(match req.scheme_str() {
94                            Some("https") | Some("wss") => 443,
95                            _ => 80,
96                        }),
97                    ),
98                )
99                .await?,
100            ))
101        }
102        .boxed()
103    }
104}