hyper_tor_connector/
socks.rs1use std::net::SocketAddr;
2use std::task::Poll;
3
4use futures::future::BoxFuture;
5use futures::FutureExt;
6use hyper::client::connect::{Connected, Connection};
7use hyper::Uri;
8use tokio::io::{AsyncRead, AsyncWrite};
9use tokio::net::TcpStream;
10use tokio_socks::tcp::Socks5Stream;
11pub use tokio_socks::Error;
12use tower::Service;
13
14#[derive(Debug, Clone)]
15pub struct TorConnector {
16 proxy_addr: SocketAddr,
17}
18impl TorConnector {
19 pub fn new(proxy_addr: SocketAddr) -> Result<Self, Error> {
20 Ok(Self { proxy_addr })
21 }
22}
23
24#[pin_project::pin_project]
25pub struct TorStream(#[pin] pub Socks5Stream<TcpStream>);
26impl Connection for TorStream {
27 fn connected(&self) -> Connected {
28 self.0.connected()
29 }
30}
31impl AsyncWrite for TorStream {
32 fn is_write_vectored(&self) -> bool {
33 self.0.is_write_vectored()
34 }
35 fn poll_flush(
36 self: std::pin::Pin<&mut Self>,
37 cx: &mut std::task::Context<'_>,
38 ) -> Poll<Result<(), std::io::Error>> {
39 self.project().0.poll_flush(cx)
40 }
41 fn poll_shutdown(
42 self: std::pin::Pin<&mut Self>,
43 cx: &mut std::task::Context<'_>,
44 ) -> Poll<Result<(), std::io::Error>> {
45 self.project().0.poll_shutdown(cx)
46 }
47 fn poll_write(
48 self: std::pin::Pin<&mut Self>,
49 cx: &mut std::task::Context<'_>,
50 buf: &[u8],
51 ) -> Poll<Result<usize, std::io::Error>> {
52 self.project().0.poll_write(cx, buf)
53 }
54 fn poll_write_vectored(
55 self: std::pin::Pin<&mut Self>,
56 cx: &mut std::task::Context<'_>,
57 bufs: &[std::io::IoSlice<'_>],
58 ) -> Poll<Result<usize, std::io::Error>> {
59 self.project().0.poll_write_vectored(cx, bufs)
60 }
61}
62
63impl AsyncRead for TorStream {
64 fn poll_read(
65 self: std::pin::Pin<&mut Self>,
66 cx: &mut std::task::Context<'_>,
67 buf: &mut tokio::io::ReadBuf<'_>,
68 ) -> Poll<std::io::Result<()>> {
69 self.project().0.poll_read(cx, buf)
70 }
71}
72
73impl Service<Uri> for TorConnector {
74 type Response = TorStream;
75 type Error = Error;
76 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
77
78 fn poll_ready(
79 &mut self,
80 _cx: &mut std::task::Context<'_>,
81 ) -> std::task::Poll<Result<(), Self::Error>> {
82 Poll::Ready(Ok(()))
83 }
84
85 fn call(&mut self, req: Uri) -> Self::Future {
86 let proxy = self.proxy_addr;
87 async move {
88 Ok::<_, Error>(TorStream(
89 Socks5Stream::connect(
90 proxy,
91 (
92 req.host().unwrap_or_default(),
93 req.port_u16().unwrap_or(match req.scheme_str() {
94 Some("https") | Some("wss") => 443,
95 _ => 80,
96 }),
97 ),
98 )
99 .await?,
100 ))
101 }
102 .boxed()
103 }
104}