shadowsocks_service/server/
tcprelay.rs

1//! Shadowsocks TCP server
2
3use std::{
4    future::Future,
5    io::{self, ErrorKind},
6    net::SocketAddr,
7    sync::Arc,
8    time::Duration,
9};
10
11use log::{debug, error, info, trace, warn};
12use shadowsocks::{
13    ProxyListener, ServerConfig,
14    crypto::CipherKind,
15    net::{AcceptOpts, TcpStream as OutboundTcpStream},
16    relay::tcprelay::{ProxyServerStream, utils::copy_encrypted_bidirectional},
17};
18use tokio::{
19    io::{AsyncReadExt, AsyncWriteExt},
20    net::TcpStream as TokioTcpStream,
21    time,
22};
23
24use crate::net::{MonProxyStream, utils::ignore_until_end};
25
26use super::context::ServiceContext;
27
28/// TCP server instance
29pub struct TcpServer {
30    context: Arc<ServiceContext>,
31    svr_cfg: ServerConfig,
32    listener: ProxyListener,
33}
34
35impl TcpServer {
36    pub(crate) async fn new(
37        context: Arc<ServiceContext>,
38        svr_cfg: ServerConfig,
39        accept_opts: AcceptOpts,
40    ) -> io::Result<Self> {
41        let listener = ProxyListener::bind_with_opts(context.context(), &svr_cfg, accept_opts).await?;
42        Ok(Self {
43            context,
44            svr_cfg,
45            listener,
46        })
47    }
48
49    /// Server's configuration
50    pub fn server_config(&self) -> &ServerConfig {
51        &self.svr_cfg
52    }
53
54    /// Server's listen address
55    pub fn local_addr(&self) -> io::Result<SocketAddr> {
56        self.listener.local_addr()
57    }
58
59    /// Start server's accept loop
60    pub async fn run(self) -> io::Result<()> {
61        info!(
62            "shadowsocks tcp server listening on {}, inbound address {}",
63            self.listener.local_addr().expect("listener.local_addr"),
64            self.svr_cfg.addr()
65        );
66
67        loop {
68            let flow_stat = self.context.flow_stat();
69
70            let (local_stream, peer_addr) = match self
71                .listener
72                .accept_map(|s| MonProxyStream::from_stream(s, flow_stat))
73                .await
74            {
75                Ok(s) => s,
76                Err(err) => {
77                    error!("tcp server accept failed with error: {}", err);
78                    time::sleep(Duration::from_secs(1)).await;
79                    continue;
80                }
81            };
82
83            if self.context.check_client_blocked(&peer_addr) {
84                warn!("access denied from {} by ACL rules", peer_addr);
85                continue;
86            }
87
88            let client = TcpServerClient {
89                context: self.context.clone(),
90                method: self.svr_cfg.method(),
91                peer_addr,
92                stream: local_stream,
93                timeout: self.svr_cfg.timeout(),
94            };
95
96            tokio::spawn(async move {
97                if let Err(err) = client.serve().await {
98                    debug!("tcp server stream aborted with error: {}", err);
99                }
100            });
101        }
102    }
103}
104
105#[inline]
106async fn timeout_fut<F, R>(duration: Option<Duration>, f: F) -> io::Result<R>
107where
108    F: Future<Output = io::Result<R>>,
109{
110    match duration {
111        None => f.await,
112        Some(d) => match time::timeout(d, f).await {
113            Ok(o) => o,
114            Err(..) => Err(ErrorKind::TimedOut.into()),
115        },
116    }
117}
118
119struct TcpServerClient {
120    context: Arc<ServiceContext>,
121    method: CipherKind,
122    peer_addr: SocketAddr,
123    stream: ProxyServerStream<MonProxyStream<TokioTcpStream>>,
124    timeout: Option<Duration>,
125}
126
127impl TcpServerClient {
128    async fn serve(mut self) -> io::Result<()> {
129        // let target_addr = match Address::read_from(&mut self.stream).await {
130        let target_addr = match timeout_fut(self.timeout, self.stream.handshake()).await {
131            Ok(a) => a,
132            // Err(Socks5Error::IoError(ref err)) if err.kind() == ErrorKind::UnexpectedEof => {
133            //     debug!(
134            //         "handshake failed, received EOF before a complete target Address, peer: {}",
135            //         self.peer_addr
136            //     );
137            //     return Ok(());
138            // }
139            Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
140                debug!(
141                    "tcp handshake failed, received EOF before a complete target Address, peer: {}",
142                    self.peer_addr
143                );
144                return Ok(());
145            }
146            Err(err) if err.kind() == ErrorKind::TimedOut => {
147                debug!(
148                    "tcp handshake failed, timeout before a complete target Address, peer: {}",
149                    self.peer_addr
150                );
151                return Ok(());
152            }
153            Err(err) => {
154                // https://github.com/shadowsocks/shadowsocks-rust/issues/292
155                //
156                // Keep connection open. Except AEAD-2022
157                warn!("tcp handshake failed. peer: {}, {}", self.peer_addr, err);
158
159                #[cfg(feature = "aead-cipher-2022")]
160                if self.method.is_aead_2022() {
161                    // Set SO_LINGER(0) for misbehave clients, which will eventually receive RST. (ECONNRESET)
162                    // This will also prevent the socket entering TIME_WAIT state.
163
164                    let stream = self.stream.into_inner().into_inner();
165                    let _ = stream.set_linger(Some(Duration::ZERO));
166
167                    return Ok(());
168                }
169
170                debug!("tcp silent-drop peer: {}", self.peer_addr);
171
172                // Unwrap and get the plain stream.
173                // Otherwise it will keep reporting decryption error before reaching EOF.
174                //
175                // Note: This will drop all data in the decryption buffer, which is no going back.
176                let mut stream = self.stream.into_inner();
177
178                let res = ignore_until_end(&mut stream).await;
179
180                trace!(
181                    "tcp silent-drop peer: {} is now closing with result {:?}",
182                    self.peer_addr, res
183                );
184
185                return Ok(());
186            }
187        };
188
189        trace!(
190            "accepted tcp client connection {}, establishing tunnel to {}",
191            self.peer_addr, target_addr
192        );
193
194        if self.context.check_outbound_blocked(&target_addr).await {
195            error!(
196                "tcp client {} outbound {} blocked by ACL rules",
197                self.peer_addr, target_addr
198            );
199            return Ok(());
200        }
201
202        let mut remote_stream = match timeout_fut(
203            self.timeout,
204            OutboundTcpStream::connect_remote_with_opts(
205                self.context.context_ref(),
206                &target_addr,
207                self.context.connect_opts_ref(),
208            ),
209        )
210        .await
211        {
212            Ok(s) => s,
213            Err(err) => {
214                error!(
215                    "tcp tunnel {} -> {} connect failed, error: {}",
216                    self.peer_addr, target_addr, err
217                );
218                return Err(err);
219            }
220        };
221
222        // https://github.com/shadowsocks/shadowsocks-rust/issues/232
223        //
224        // Protocols like FTP, clients will wait for servers to send Welcome Message without sending anything.
225        //
226        // Wait at most 500ms, and then sends handshake packet to remote servers.
227        if self.context.connect_opts_ref().tcp.fastopen {
228            let mut buffer = [0u8; 8192];
229            match time::timeout(Duration::from_millis(500), self.stream.read(&mut buffer)).await {
230                Ok(Ok(0)) => {
231                    // EOF. Just terminate right here.
232                    return Ok(());
233                }
234                Ok(Ok(n)) => {
235                    // Send the first packet.
236                    timeout_fut(self.timeout, remote_stream.write_all(&buffer[..n])).await?;
237                }
238                Ok(Err(err)) => return Err(err),
239                Err(..) => {
240                    // Timeout. Send handshake to server.
241                    timeout_fut(self.timeout, remote_stream.write(&[])).await?;
242
243                    trace!(
244                        "tcp tunnel {} -> {} sent TFO connect without data",
245                        self.peer_addr, target_addr
246                    );
247                }
248            }
249        }
250
251        debug!(
252            "established tcp tunnel {} <-> {} with {:?}",
253            self.peer_addr,
254            target_addr,
255            self.context.connect_opts_ref()
256        );
257
258        match copy_encrypted_bidirectional(self.method, &mut self.stream, &mut remote_stream).await {
259            Ok((rn, wn)) => {
260                trace!(
261                    "tcp tunnel {} <-> {} closed, L2R {} bytes, R2L {} bytes",
262                    self.peer_addr, target_addr, rn, wn
263                );
264            }
265            Err(err) => {
266                trace!(
267                    "tcp tunnel {} <-> {} closed with error: {}",
268                    self.peer_addr, target_addr, err
269                );
270            }
271        }
272
273        Ok(())
274    }
275}