Skip to main content

proxies/server/
mod.rs

1mod http;
2mod socks5;
3
4use std::io::Error;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::{Context, Poll};
9
10pub use http::HttpHandle;
11pub use socks5::Socks5Handle;
12
13use std::fmt::Debug;
14use tokio::io::{AsyncRead, AsyncWrite, BufReader};
15use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
16use tokio_stream::{Stream, StreamExt};
17
18use crate::{ProxyError, connector::Connector, util::BufIoExt};
19
/// A proxy server that pulls inbound connections from `incoming` and serves
/// each one through a shared `ClientHandle` built around the connector `C`.
///
/// `I` defaults to [`TcpIncoming`], the TCP-listener-backed connection source
/// defined in this module.
pub struct ProxyServer<C, I = TcpIncoming> {
    /// Source of accepted connections.
    incoming: I,
    /// Per-connection handler state; kept in an `Arc` so a clone can be handed
    /// to each connection's task.
    client_handle: Arc<ClientHandle<C>>,
}
24
25impl<C> ProxyServer<C, TcpIncoming> {
26    pub async fn bind<A>(connector: C, addr: A) -> Result<Self, ProxyError>
27    where
28        A: ToSocketAddrs + Clone + Debug,
29    {
30        let listener = match TcpListener::bind(addr.clone()).await {
31            Ok(l) => l,
32            Err(e) => {
33                bail!("bind {:?} fail: {}", addr, e);
34            }
35        };
36        Ok(ProxyServer {
37            incoming: TcpIncoming { listener },
38            client_handle: Arc::new(ClientHandle::new(connector)),
39        })
40    }
41
42    pub fn from_listener(connector: C, listener: TcpListener) -> Self {
43        ProxyServer {
44            incoming: TcpIncoming { listener },
45            client_handle: Arc::new(ClientHandle::new(connector)),
46        }
47    }
48}
49
50impl<C, I> ProxyServer<C, I> {
51    pub fn from_incoming(connector: C, incoming: I) -> Self {
52        Self {
53            incoming,
54            client_handle: Arc::new(ClientHandle::new(connector)),
55        }
56    }
57}
58
59impl<C, I, T> ProxyServer<C, I>
60where
61    C: Connector + Send + Sync + 'static,
62    <C as Connector>::Transport: Unpin + Send,
63    I: Stream<Item = Result<(T, SocketAddr), Error>> + Unpin,
64    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
65{
66    pub fn new(connector: C, incoming: I) -> Self {
67        Self {
68            incoming,
69            client_handle: Arc::new(ClientHandle::new(connector)),
70        }
71    }
72
73    pub async fn run(mut self) -> Result<(), ProxyError> {
74        while let Some(result) = self.incoming.next().await {
75            match result {
76                Ok((sock, addr)) => {
77                    let client_handle = self.client_handle.clone();
78                    tokio::spawn(async move {
79                        if let Err(e) = client_handle.handle(sock, addr).await {
80                            warn!("handle {} fail: {}", addr, e);
81                        }
82                    });
83                }
84                Err(e) => {
85                    bail!("accept incoming fail: {}", e);
86                }
87            }
88        }
89        Ok(())
90    }
91}
92
/// State shared by every connection: the connector plus one handler per
/// supported inbound protocol.
struct ClientHandle<C> {
    /// Connector passed by reference to whichever protocol handler is chosen.
    connector: C,

    /// Handler used when the first byte of a connection is `0x05`.
    socks5_handle: Socks5Handle,
    /// Handler used for any other first byte.
    http_handle: HttpHandle,
}
99
100impl<C> ClientHandle<C> {
101    fn new(connector: C) -> Self {
102        Self {
103            connector,
104            socks5_handle: Socks5Handle::new(),
105            http_handle: HttpHandle::new(),
106        }
107    }
108}
109
110impl<C> ClientHandle<C>
111where
112    C: Connector,
113    <C as Connector>::Transport: Unpin,
114{
115    async fn handle<T>(&self, sock: T, addr: SocketAddr) -> Result<(), ProxyError>
116    where
117        T: AsyncRead + AsyncWrite + Unpin,
118    {
119        let mut stream = BufReader::new(sock);
120        match stream.try_peek_byte().await {
121            Ok(Some(0x05)) => self.socks5_handle.handle(&self.connector, stream).await?,
122            Ok(Some(_)) => self.http_handle.handle(&self.connector, stream).await?,
123            Ok(None) => {
124                debug!("local socket({}) EOF with no data", addr);
125            }
126            Err(e) => {
127                bail!("read local socket({}) fail: {}", addr, e);
128            }
129        }
130        Ok(())
131    }
132}
133
134pub struct TcpIncoming {
135    listener: TcpListener,
136}
137
138impl Stream for TcpIncoming {
139    type Item = Result<(TcpStream, SocketAddr), Error>;
140
141    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142        let (sock, addr) = ready!(Pin::new(&mut self.listener).poll_accept(cx))?;
143        sock.set_nodelay(true)?;
144        Poll::Ready(Some(Ok((sock, addr))))
145    }
146}