ss_rs/
tcp.rs

1//! Shadowsocks tcp services.
2
3use std::{
4    io::{self, ErrorKind},
5    net::SocketAddr,
6    sync::Arc,
7};
8
9use tokio::{
10    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
11    net::{TcpListener as TokioTcpListener, TcpStream as TokioTcpStream, ToSocketAddrs},
12};
13
14use crate::{
15    context::Ctx,
16    crypto::cipher::Method,
17    net::{
18        lookup_host,
19        stream::{TcpStream as SsTcpStream, TimeoutStream},
20    },
21    socks5::{self, Socks5Addr},
22};
23
/// Tunables shared by the TCP services in this file.
mod constants {
    /// Timeout handed to every `TimeoutStream` created by `make_timed_stream`.
    pub const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
}
29
30/// TCP Listener for incoming shadowsocks connection.
31pub struct SsTcpListener {
32    inner_listener: TokioTcpListener,
33    cipher_method: Method,
34    cipher_key: Vec<u8>,
35    ctx: Arc<Ctx>,
36}
37
38impl SsTcpListener {
39    /// Creates a new TcpListener for incoming shadowsocks connection,
40    /// which will be bound to the specified address.
41    pub async fn bind<A: ToSocketAddrs>(
42        addr: A,
43        cipher_method: Method,
44        cipher_key: &[u8],
45        ctx: Arc<Ctx>,
46    ) -> io::Result<Self> {
47        let inner_listener = TokioTcpListener::bind(addr).await?;
48        Ok(SsTcpListener {
49            inner_listener,
50            cipher_method,
51            cipher_key: cipher_key.to_owned(),
52            ctx,
53        })
54    }
55
56    /// Accepts a new incoming shadowsocks connection from this listener.
57    pub async fn accept(&self) -> io::Result<(SsTcpStream<TokioTcpStream>, SocketAddr)> {
58        let (stream, addr) = self.inner_listener.accept().await?;
59        Ok((
60            SsTcpStream::new(
61                stream,
62                self.cipher_method,
63                &self.cipher_key,
64                self.ctx.clone(),
65            ),
66            addr,
67        ))
68    }
69}
70
71/// Starts a shadowsocks remote server.
72pub async fn ss_remote(
73    addr: SocketAddr,
74    method: Method,
75    key: Vec<u8>,
76    ctx: Arc<Ctx>,
77) -> io::Result<()> {
78    let listener = SsTcpListener::bind(addr, method, &key, ctx.clone()).await?;
79
80    log::info!("ss-remote listening on {}", addr);
81
82    loop {
83        match listener.accept().await {
84            Ok((encrypted_stream, peer)) => {
85                log::debug!("Accept {}", peer);
86                tokio::spawn(handle_ss_remote(encrypted_stream, peer, ctx.clone()));
87            }
88            Err(e) => log::warn!("Accept error: {}", e),
89        }
90    }
91}
92
93/// Starts a shadowsocks local server.
94pub async fn ss_local(
95    local_addr: SocketAddr,
96    remote_addr: SocketAddr,
97    method: Method,
98    key: Vec<u8>,
99    ctx: Arc<Ctx>,
100) -> io::Result<()> {
101    let listener = TokioTcpListener::bind(local_addr).await?;
102
103    log::info!("ss-local listening on {}", local_addr);
104    log::info!("The remote server address is {}", remote_addr);
105
106    loop {
107        match listener.accept().await {
108            Ok((stream, peer)) => {
109                log::debug!("Accept {}", peer);
110                tokio::spawn(handle_ss_local(
111                    stream,
112                    peer,
113                    remote_addr,
114                    method,
115                    key.clone(),
116                    ctx.clone(),
117                ));
118            }
119            Err(e) => log::warn!("Accept error: {}", e),
120        }
121    }
122}
123
124/// Handles incoming connection from ss-remote.
125pub async fn handle_ss_remote<T>(stream: SsTcpStream<T>, peer: SocketAddr, ctx: Arc<Ctx>)
126where
127    T: AsyncRead + AsyncWrite + Unpin + Send,
128{
129    let mut stream = make_timed_stream(stream);
130
131    // 1. Checks whether or not to reject the client
132    if ctx.is_bypass(peer.ip(), None) {
133        log::warn!("Reject the client: peer {}", peer);
134        return;
135    }
136
137    // 2. Constructs a socks5 address with timeout
138    let target_addr = match Socks5Addr::construct(&mut stream).await {
139        Ok(addr) => addr,
140        Err(e) => {
141            match e.kind() {
142                ErrorKind::Other => {
143                    log::warn!("Read target address failed: {}, peer {}", e, peer);
144                    // We shouldn't close the connection,
145                    // See https://github.com/shadowsocks/shadowsocks-rust/issues/292
146                    read_to_end(&mut stream).await.unwrap_or_default();
147                }
148                _ => log::debug!("Read target address failed: {}, peer {}", e, peer),
149            }
150            return;
151        }
152    };
153
154    // 3. Resolves target socket address
155    let target_socket_addr = match lookup_host(&target_addr.to_string()).await {
156        Ok(addr) => addr,
157        Err(e) => {
158            log::warn!("Resolve {} failed: {}, peer {}", target_addr, e, peer);
159            return;
160        }
161    };
162    let target_ip = target_socket_addr.ip();
163
164    // 4. Checks whether or not to block outbound
165    if ctx.is_block_outbound(target_ip, Some(&target_addr.to_string())) {
166        log::warn!(
167            "Block outbound address: {} -> {} ({})",
168            peer,
169            target_addr,
170            target_ip
171        );
172        return;
173    }
174
175    log::debug!(
176        "Allow outbound address: {} -> {} ({})",
177        peer,
178        target_addr,
179        target_ip
180    );
181
182    // 5. Connects to target address
183    let mut target_stream = match TokioTcpStream::connect(target_socket_addr).await {
184        Ok(stream) => make_timed_stream(stream),
185        Err(e) => {
186            log::debug!(
187                "Unable to connect to {} ({}): {}, peer {}",
188                target_addr,
189                target_ip,
190                e,
191                peer
192            );
193            return;
194        }
195    };
196
197    // 6. Establishes connection between ss-local and target
198    let trans = format!("{} <=> {} ({})", peer, target_addr, target_ip);
199    transfer(&mut stream, &mut target_stream, &trans).await;
200}
201
202/// Handles incoming connection from ss-local.
203pub async fn handle_ss_local(
204    stream: TokioTcpStream,
205    peer: SocketAddr,
206    remote_addr: SocketAddr,
207    method: Method,
208    key: Vec<u8>,
209    ctx: Arc<Ctx>,
210) {
211    let mut stream = make_timed_stream(stream);
212
213    // 1. Constructs a socks5 address with timeout
214    let target_addr = match socks5::handshake(&mut stream).await {
215        Ok(addr) => addr,
216        Err(e) => {
217            match e.kind() {
218                ErrorKind::Other => log::warn!("Read target address failed: {}, peer {}", e, peer),
219                _ => log::debug!("Read target address failed: {}, peer {}", e, peer),
220            }
221            return;
222        }
223    };
224
225    // 2. Resolves target socket address
226    let target_socket_addr = match lookup_host(&target_addr.to_string()).await {
227        Ok(addr) => Some(addr),
228        Err(e) => {
229            log::debug!("Resolve {} failed: {}, peer {}", target_addr, e, peer);
230            None
231        }
232    };
233
234    // 3. Relays target address, bypass or proxy
235    let trans: String;
236    let host = target_addr
237        .to_string()
238        .split(':')
239        .next()
240        .map(str::to_owned)
241        .unwrap_or_default();
242    match target_socket_addr {
243        Some(addr) if ctx.is_bypass(addr.ip(), Some(&host)) => {
244            trans = format!("{} <=> {} ({})", peer, target_addr, addr.ip());
245
246            log::debug!(
247                "Bypass target address: {} -> {} ({})",
248                peer,
249                target_addr,
250                addr.ip()
251            );
252
253            // 3.1 Connects to target host
254            let mut target_stream = match TokioTcpStream::connect(addr).await {
255                Ok(stream) => make_timed_stream(stream),
256                Err(e) => {
257                    log::error!(
258                        "Unable to connect to {} ({}): {}, peer {}",
259                        target_addr,
260                        addr.ip(),
261                        e,
262                        peer
263                    );
264                    return;
265                }
266            };
267
268            // 3.2 Establishes connection between ss-local and target
269            transfer(&mut stream, &mut target_stream, &trans).await;
270        }
271        _ => {
272            trans = format!("{} <=> {}", peer, target_addr);
273
274            if log::log_enabled!(log::Level::Debug) {
275                let mut str = format!("Proxy target address: {} -> {}", peer, target_addr);
276
277                if let Some(addr) = target_socket_addr {
278                    str.push_str(&format!(" ({})", addr.ip()));
279                }
280
281                log::debug!("{}", str);
282            }
283
284            // 3.1 Connects to ss-remote
285            let mut target_stream = match TokioTcpStream::connect(remote_addr).await {
286                Ok(stream) => make_timed_stream(SsTcpStream::new(stream, method, &key, ctx)),
287                Err(e) => {
288                    log::error!("Unable to connect to {}: {}, peer {}", remote_addr, e, peer);
289                    return;
290                }
291            };
292
293            // 3.2 Writes target address
294            let target_addr_bytes = target_addr.get_raw_parts();
295            match target_stream.write_all(&target_addr_bytes).await {
296                Ok(_) => {}
297                Err(e) => {
298                    log::error!(
299                        "Write target address to {} failed: {}, peer {}",
300                        remote_addr,
301                        e,
302                        peer
303                    );
304                    return;
305                }
306            }
307
308            // 3.3 Establishes connection between ss-local and ss-remote
309            transfer(&mut stream, &mut target_stream, &trans).await;
310        }
311    }
312}
313
314async fn transfer<A, B>(a: &mut A, b: &mut B, trans: &str)
315where
316    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
317    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
318{
319    match tokio::io::copy_bidirectional(a, b).await {
320        Ok((atob, btoa)) => log::trace!("{} done: ltor {} bytes, rtol {} bytes", trans, atob, btoa),
321        Err(e) => match e.kind() {
322            ErrorKind::Other => log::warn!("{} error: {}", trans, e),
323            _ => log::debug!("{} error: {}", trans, e),
324        },
325    }
326}
327
328async fn read_to_end<R>(reader: &mut R) -> io::Result<()>
329where
330    R: AsyncRead + Unpin + ?Sized,
331{
332    let mut buf = [0; 2048];
333
334    loop {
335        let n = reader.read(&mut buf).await?;
336        if n == 0 {
337            break;
338        }
339    }
340
341    Ok(())
342}
343
344fn make_timed_stream<T>(stream: T) -> TimeoutStream<T> {
345    TimeoutStream::new(stream, constants::DEFAULT_TIMEOUT)
346}