1use super::common::{pack_udp, parse_udp, Address};
2use super::protocol::{
3 self, AuthMethod, AuthRequest, AuthResponse, CommandRequest, CommandResponse, Version,
4};
5use protocol::Command;
6use rd_interface::{
7 async_trait,
8 util::{connect_tcp, connect_udp},
9 Context, IServer, IUdpChannel, IntoAddress, IntoDyn, Net, Result, TcpStream, UdpSocket,
10};
11use std::{
12 net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4},
13 sync::{Arc, RwLock},
14};
15use tokio::io::{split, AsyncWriteExt, BufWriter};
16
17struct Config {
18 net: Net,
19 listen_net: Net,
20}
21
22#[derive(Clone)]
23pub struct Socks5Server {
24 cfg: Arc<Config>,
25}
26
27impl Socks5Server {
28 pub async fn serve_connection(self, socket: TcpStream, addr: SocketAddr) -> anyhow::Result<()> {
29 let default_addr: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
30 let Config { net, listen_net } = &*self.cfg;
31 let local_ip = socket.local_addr().await?.ip();
32 let (mut rx, tx) = split(socket);
33 let mut tx = BufWriter::with_capacity(512, tx);
34
35 let version = Version::read(&mut rx).await?;
36 let auth_req = AuthRequest::read(&mut rx).await?;
37
38 let method = auth_req.select_from(&[AuthMethod::Noauth]);
39 let auth_resp = AuthResponse::new(method);
40 version.write(&mut tx).await?;
43 auth_resp.write(&mut tx).await?;
44 tx.flush().await?;
45
46 let cmd_req = CommandRequest::read(&mut rx).await?;
47
48 match cmd_req.command {
49 Command::Connect => {
50 let dst = cmd_req.address.into();
51 let out = match net
52 .tcp_connect(&mut Context::from_socketaddr(addr), dst)
53 .await
54 {
55 Ok(socket) => socket,
56 Err(e) => {
57 CommandResponse::error(e).write(&mut tx).await?;
58 tx.flush().await?;
59 return Ok(());
60 }
61 };
62
63 let addr: Address = out.local_addr().await.unwrap_or(default_addr).into();
64 CommandResponse::success(addr).write(&mut tx).await?;
65 tx.flush().await?;
66
67 let socket = rx.unsplit(tx.into_inner());
68
69 connect_tcp(out, socket).await?;
70 }
71 Command::UdpAssociate => {
72 let dst = match cmd_req.address {
73 Address::SocketAddr(SocketAddr::V4(_)) => rd_interface::Address::SocketAddr(
74 SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0),
75 ),
76 Address::SocketAddr(SocketAddr::V6(_)) => rd_interface::Address::SocketAddr(
77 SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
78 ),
79 _ => {
80 CommandResponse::reply_error(
81 protocol::CommandReply::AddressTypeNotSupported,
82 )
83 .write(&mut tx)
84 .await?;
85
86 tx.flush().await?;
87 return Ok(());
88 }
89 };
90 let out = match net
91 .udp_bind(&mut Context::from_socketaddr(addr), dst.into())
92 .await
93 {
94 Ok(socket) => socket,
95 Err(e) => {
96 CommandResponse::error(e).write(&mut tx).await?;
97 tx.flush().await?;
98 return Ok(());
99 }
100 };
101 let udp = listen_net
102 .udp_bind(
103 &mut Context::from_socketaddr(addr),
104 "0.0.0.0:0".into_address()?,
105 )
106 .await?;
107
108 let udp_port = match udp.local_addr().await {
110 Ok(a) => a.port(),
111 Err(e) => {
112 CommandResponse::error(e).write(&mut tx).await?;
113 tx.flush().await?;
114 return Ok(());
115 }
116 };
117 let addr: SocketAddr = (local_ip, udp_port).into();
118 let addr: Address = addr.into();
119
120 CommandResponse::success(addr).write(&mut tx).await?;
121 tx.flush().await?;
122
123 let socket = rx.unsplit(tx.into_inner());
124
125 let udp_channel = Socks5UdpSocket(udp, socket, RwLock::new(None));
126 connect_udp(udp_channel.into_dyn(), out).await?;
127 }
128 _ => {
129 return Ok(());
130 }
131 };
132
133 Ok(())
134 }
135 pub fn new(listen_net: Net, net: Net) -> Self {
136 Self {
137 cfg: Arc::new(Config { net, listen_net }),
138 }
139 }
140}
141
142pub struct Socks5UdpSocket(UdpSocket, TcpStream, RwLock<Option<SocketAddr>>);
143
144#[async_trait]
145impl IUdpChannel for Socks5UdpSocket {
146 async fn recv_send_to(&self, buf: &mut [u8]) -> Result<(usize, rd_interface::Address)> {
147 let bytes_size = 259 + buf.len();
149 let mut bytes = vec![0u8; bytes_size];
150 let (recv_len, from_addr) = self.0.recv_from(&mut bytes).await?;
151 let saved_addr = { *self.2.read().unwrap() };
152 if let None = saved_addr {
153 *self.2.write().unwrap() = Some(from_addr);
154 }
155 bytes.truncate(recv_len);
156
157 let (addr, payload) = parse_udp(&bytes).await?;
158 let to_copy = payload.len().min(buf.len());
159 buf[..to_copy].copy_from_slice(&payload[..to_copy]);
160
161 Ok((to_copy, addr.into()))
162 }
163
164 async fn send_recv_from(&self, buf: &[u8], addr: SocketAddr) -> Result<usize> {
165 let saddr: Address = addr.into();
166
167 let bytes = pack_udp(saddr, buf).await?;
168
169 let addr = { *self.2.read().unwrap() };
170 Ok(if let Some(addr) = addr {
171 self.0.send_to(&bytes, addr.into()).await?
172 } else {
173 0
174 })
175 }
176}
177
178pub struct Socks5 {
179 server: Socks5Server,
180 listen_net: Net,
181 bind: String,
182}
183
184#[async_trait]
185impl IServer for Socks5 {
186 async fn start(&self) -> Result<()> {
187 let listener = self
188 .listen_net
189 .tcp_bind(&mut Context::new(), self.bind.into_address()?)
190 .await?;
191
192 loop {
193 let (socket, addr) = listener.accept().await?;
194 let server = self.server.clone();
195 let _ = tokio::spawn(async move {
196 if let Err(e) = server.serve_connection(socket, addr).await {
197 log::error!("Error when serve_connection: {:?}", e)
198 }
199 });
200 }
201 }
202}
203
204impl Socks5 {
205 pub fn new(listen_net: Net, net: Net, bind: String) -> Self {
206 Socks5 {
207 server: Socks5Server::new(listen_net.clone(), net),
208 listen_net,
209 bind,
210 }
211 }
212}