1use std::{
4 io::{self, ErrorKind},
5 net::SocketAddr,
6 sync::Arc,
7};
8
9use tokio::{
10 io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
11 net::{TcpListener as TokioTcpListener, TcpStream as TokioTcpStream, ToSocketAddrs},
12};
13
14use crate::{
15 context::Ctx,
16 crypto::cipher::Method,
17 net::{
18 lookup_host,
19 stream::{TcpStream as SsTcpStream, TimeoutStream},
20 },
21 socks5::{self, Socks5Addr},
22};
23
/// Tunables used by the TCP relay functions in this file.
mod constants {
    /// Duration passed to `TimeoutStream::new` by `make_timed_stream`.
    pub const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
}
29
30pub struct SsTcpListener {
32 inner_listener: TokioTcpListener,
33 cipher_method: Method,
34 cipher_key: Vec<u8>,
35 ctx: Arc<Ctx>,
36}
37
38impl SsTcpListener {
39 pub async fn bind<A: ToSocketAddrs>(
42 addr: A,
43 cipher_method: Method,
44 cipher_key: &[u8],
45 ctx: Arc<Ctx>,
46 ) -> io::Result<Self> {
47 let inner_listener = TokioTcpListener::bind(addr).await?;
48 Ok(SsTcpListener {
49 inner_listener,
50 cipher_method,
51 cipher_key: cipher_key.to_owned(),
52 ctx,
53 })
54 }
55
56 pub async fn accept(&self) -> io::Result<(SsTcpStream<TokioTcpStream>, SocketAddr)> {
58 let (stream, addr) = self.inner_listener.accept().await?;
59 Ok((
60 SsTcpStream::new(
61 stream,
62 self.cipher_method,
63 &self.cipher_key,
64 self.ctx.clone(),
65 ),
66 addr,
67 ))
68 }
69}
70
71pub async fn ss_remote(
73 addr: SocketAddr,
74 method: Method,
75 key: Vec<u8>,
76 ctx: Arc<Ctx>,
77) -> io::Result<()> {
78 let listener = SsTcpListener::bind(addr, method, &key, ctx.clone()).await?;
79
80 log::info!("ss-remote listening on {}", addr);
81
82 loop {
83 match listener.accept().await {
84 Ok((encrypted_stream, peer)) => {
85 log::debug!("Accept {}", peer);
86 tokio::spawn(handle_ss_remote(encrypted_stream, peer, ctx.clone()));
87 }
88 Err(e) => log::warn!("Accept error: {}", e),
89 }
90 }
91}
92
93pub async fn ss_local(
95 local_addr: SocketAddr,
96 remote_addr: SocketAddr,
97 method: Method,
98 key: Vec<u8>,
99 ctx: Arc<Ctx>,
100) -> io::Result<()> {
101 let listener = TokioTcpListener::bind(local_addr).await?;
102
103 log::info!("ss-local listening on {}", local_addr);
104 log::info!("The remote server address is {}", remote_addr);
105
106 loop {
107 match listener.accept().await {
108 Ok((stream, peer)) => {
109 log::debug!("Accept {}", peer);
110 tokio::spawn(handle_ss_local(
111 stream,
112 peer,
113 remote_addr,
114 method,
115 key.clone(),
116 ctx.clone(),
117 ));
118 }
119 Err(e) => log::warn!("Accept error: {}", e),
120 }
121 }
122}
123
124pub async fn handle_ss_remote<T>(stream: SsTcpStream<T>, peer: SocketAddr, ctx: Arc<Ctx>)
126where
127 T: AsyncRead + AsyncWrite + Unpin + Send,
128{
129 let mut stream = make_timed_stream(stream);
130
131 if ctx.is_bypass(peer.ip(), None) {
133 log::warn!("Reject the client: peer {}", peer);
134 return;
135 }
136
137 let target_addr = match Socks5Addr::construct(&mut stream).await {
139 Ok(addr) => addr,
140 Err(e) => {
141 match e.kind() {
142 ErrorKind::Other => {
143 log::warn!("Read target address failed: {}, peer {}", e, peer);
144 read_to_end(&mut stream).await.unwrap_or_default();
147 }
148 _ => log::debug!("Read target address failed: {}, peer {}", e, peer),
149 }
150 return;
151 }
152 };
153
154 let target_socket_addr = match lookup_host(&target_addr.to_string()).await {
156 Ok(addr) => addr,
157 Err(e) => {
158 log::warn!("Resolve {} failed: {}, peer {}", target_addr, e, peer);
159 return;
160 }
161 };
162 let target_ip = target_socket_addr.ip();
163
164 if ctx.is_block_outbound(target_ip, Some(&target_addr.to_string())) {
166 log::warn!(
167 "Block outbound address: {} -> {} ({})",
168 peer,
169 target_addr,
170 target_ip
171 );
172 return;
173 }
174
175 log::debug!(
176 "Allow outbound address: {} -> {} ({})",
177 peer,
178 target_addr,
179 target_ip
180 );
181
182 let mut target_stream = match TokioTcpStream::connect(target_socket_addr).await {
184 Ok(stream) => make_timed_stream(stream),
185 Err(e) => {
186 log::debug!(
187 "Unable to connect to {} ({}): {}, peer {}",
188 target_addr,
189 target_ip,
190 e,
191 peer
192 );
193 return;
194 }
195 };
196
197 let trans = format!("{} <=> {} ({})", peer, target_addr, target_ip);
199 transfer(&mut stream, &mut target_stream, &trans).await;
200}
201
202pub async fn handle_ss_local(
204 stream: TokioTcpStream,
205 peer: SocketAddr,
206 remote_addr: SocketAddr,
207 method: Method,
208 key: Vec<u8>,
209 ctx: Arc<Ctx>,
210) {
211 let mut stream = make_timed_stream(stream);
212
213 let target_addr = match socks5::handshake(&mut stream).await {
215 Ok(addr) => addr,
216 Err(e) => {
217 match e.kind() {
218 ErrorKind::Other => log::warn!("Read target address failed: {}, peer {}", e, peer),
219 _ => log::debug!("Read target address failed: {}, peer {}", e, peer),
220 }
221 return;
222 }
223 };
224
225 let target_socket_addr = match lookup_host(&target_addr.to_string()).await {
227 Ok(addr) => Some(addr),
228 Err(e) => {
229 log::debug!("Resolve {} failed: {}, peer {}", target_addr, e, peer);
230 None
231 }
232 };
233
234 let trans: String;
236 let host = target_addr
237 .to_string()
238 .split(':')
239 .next()
240 .map(str::to_owned)
241 .unwrap_or_default();
242 match target_socket_addr {
243 Some(addr) if ctx.is_bypass(addr.ip(), Some(&host)) => {
244 trans = format!("{} <=> {} ({})", peer, target_addr, addr.ip());
245
246 log::debug!(
247 "Bypass target address: {} -> {} ({})",
248 peer,
249 target_addr,
250 addr.ip()
251 );
252
253 let mut target_stream = match TokioTcpStream::connect(addr).await {
255 Ok(stream) => make_timed_stream(stream),
256 Err(e) => {
257 log::error!(
258 "Unable to connect to {} ({}): {}, peer {}",
259 target_addr,
260 addr.ip(),
261 e,
262 peer
263 );
264 return;
265 }
266 };
267
268 transfer(&mut stream, &mut target_stream, &trans).await;
270 }
271 _ => {
272 trans = format!("{} <=> {}", peer, target_addr);
273
274 if log::log_enabled!(log::Level::Debug) {
275 let mut str = format!("Proxy target address: {} -> {}", peer, target_addr);
276
277 if let Some(addr) = target_socket_addr {
278 str.push_str(&format!(" ({})", addr.ip()));
279 }
280
281 log::debug!("{}", str);
282 }
283
284 let mut target_stream = match TokioTcpStream::connect(remote_addr).await {
286 Ok(stream) => make_timed_stream(SsTcpStream::new(stream, method, &key, ctx)),
287 Err(e) => {
288 log::error!("Unable to connect to {}: {}, peer {}", remote_addr, e, peer);
289 return;
290 }
291 };
292
293 let target_addr_bytes = target_addr.get_raw_parts();
295 match target_stream.write_all(&target_addr_bytes).await {
296 Ok(_) => {}
297 Err(e) => {
298 log::error!(
299 "Write target address to {} failed: {}, peer {}",
300 remote_addr,
301 e,
302 peer
303 );
304 return;
305 }
306 }
307
308 transfer(&mut stream, &mut target_stream, &trans).await;
310 }
311 }
312}
313
314async fn transfer<A, B>(a: &mut A, b: &mut B, trans: &str)
315where
316 A: AsyncRead + AsyncWrite + Unpin + ?Sized,
317 B: AsyncRead + AsyncWrite + Unpin + ?Sized,
318{
319 match tokio::io::copy_bidirectional(a, b).await {
320 Ok((atob, btoa)) => log::trace!("{} done: ltor {} bytes, rtol {} bytes", trans, atob, btoa),
321 Err(e) => match e.kind() {
322 ErrorKind::Other => log::warn!("{} error: {}", trans, e),
323 _ => log::debug!("{} error: {}", trans, e),
324 },
325 }
326}
327
328async fn read_to_end<R>(reader: &mut R) -> io::Result<()>
329where
330 R: AsyncRead + Unpin + ?Sized,
331{
332 let mut buf = [0; 2048];
333
334 loop {
335 let n = reader.read(&mut buf).await?;
336 if n == 0 {
337 break;
338 }
339 }
340
341 Ok(())
342}
343
344fn make_timed_stream<T>(stream: T) -> TimeoutStream<T> {
345 TimeoutStream::new(stream, constants::DEFAULT_TIMEOUT)
346}