1mod http;
2mod socks5;
3
4use std::io::Error;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::{Context, Poll};
9
10pub use http::HttpHandle;
11pub use socks5::Socks5Handle;
12
13use std::fmt::Debug;
14use tokio::io::{AsyncRead, AsyncWrite, BufReader};
15use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
16use tokio_stream::{Stream, StreamExt};
17
18use crate::{ProxyError, connector::Connector, util::BufIoExt};
19
20pub struct ProxyServer<C, I = TcpIncoming> {
21 incoming: I,
22 client_handle: Arc<ClientHandle<C>>,
23}
24
25impl<C> ProxyServer<C, TcpIncoming> {
26 pub async fn bind<A>(connector: C, addr: A) -> Result<Self, ProxyError>
27 where
28 A: ToSocketAddrs + Clone + Debug,
29 {
30 let listener = match TcpListener::bind(addr.clone()).await {
31 Ok(l) => l,
32 Err(e) => {
33 bail!("bind {:?} fail: {}", addr, e);
34 }
35 };
36 Ok(ProxyServer {
37 incoming: TcpIncoming { listener },
38 client_handle: Arc::new(ClientHandle::new(connector)),
39 })
40 }
41
42 pub fn from_listener(connector: C, listener: TcpListener) -> Self {
43 ProxyServer {
44 incoming: TcpIncoming { listener },
45 client_handle: Arc::new(ClientHandle::new(connector)),
46 }
47 }
48}
49
50impl<C, I> ProxyServer<C, I> {
51 pub fn from_incoming(connector: C, incoming: I) -> Self {
52 Self {
53 incoming,
54 client_handle: Arc::new(ClientHandle::new(connector)),
55 }
56 }
57}
58
59impl<C, I, T> ProxyServer<C, I>
60where
61 C: Connector + Send + Sync + 'static,
62 <C as Connector>::Transport: Unpin + Send,
63 I: Stream<Item = Result<(T, SocketAddr), Error>> + Unpin,
64 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
65{
66 pub fn new(connector: C, incoming: I) -> Self {
67 Self {
68 incoming,
69 client_handle: Arc::new(ClientHandle::new(connector)),
70 }
71 }
72
73 pub async fn run(mut self) -> Result<(), ProxyError> {
74 while let Some(result) = self.incoming.next().await {
75 match result {
76 Ok((sock, addr)) => {
77 let client_handle = self.client_handle.clone();
78 tokio::spawn(async move {
79 if let Err(e) = client_handle.handle(sock, addr).await {
80 warn!("handle {} fail: {}", addr, e);
81 }
82 });
83 }
84 Err(e) => {
85 bail!("accept incoming fail: {}", e);
86 }
87 }
88 }
89 Ok(())
90 }
91}
92
93struct ClientHandle<C> {
94 connector: C,
95
96 socks5_handle: Socks5Handle,
97 http_handle: HttpHandle,
98}
99
100impl<C> ClientHandle<C> {
101 fn new(connector: C) -> Self {
102 Self {
103 connector,
104 socks5_handle: Socks5Handle::new(),
105 http_handle: HttpHandle::new(),
106 }
107 }
108}
109
110impl<C> ClientHandle<C>
111where
112 C: Connector,
113 <C as Connector>::Transport: Unpin,
114{
115 async fn handle<T>(&self, sock: T, addr: SocketAddr) -> Result<(), ProxyError>
116 where
117 T: AsyncRead + AsyncWrite + Unpin,
118 {
119 let mut stream = BufReader::new(sock);
120 match stream.try_peek_byte().await {
121 Ok(Some(0x05)) => self.socks5_handle.handle(&self.connector, stream).await?,
122 Ok(Some(_)) => self.http_handle.handle(&self.connector, stream).await?,
123 Ok(None) => {
124 debug!("local socket({}) EOF with no data", addr);
125 }
126 Err(e) => {
127 bail!("read local socket({}) fail: {}", addr, e);
128 }
129 }
130 Ok(())
131 }
132}
133
134pub struct TcpIncoming {
135 listener: TcpListener,
136}
137
138impl Stream for TcpIncoming {
139 type Item = Result<(TcpStream, SocketAddr), Error>;
140
141 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142 let (sock, addr) = ready!(Pin::new(&mut self.listener).poll_accept(cx))?;
143 sock.set_nodelay(true)?;
144 Poll::Ready(Some(Ok((sock, addr))))
145 }
146}