shadowsocks_service/server/
tcprelay.rs1use std::{
4 future::Future,
5 io::{self, ErrorKind},
6 net::SocketAddr,
7 sync::Arc,
8 time::Duration,
9};
10
11use log::{debug, error, info, trace, warn};
12use shadowsocks::{
13 ProxyListener, ServerConfig,
14 crypto::CipherKind,
15 net::{AcceptOpts, TcpStream as OutboundTcpStream},
16 relay::tcprelay::{ProxyServerStream, utils::copy_encrypted_bidirectional},
17};
18use tokio::{
19 io::{AsyncReadExt, AsyncWriteExt},
20 net::TcpStream as TokioTcpStream,
21 time,
22};
23
24use crate::net::{MonProxyStream, utils::ignore_until_end};
25
26use super::context::ServiceContext;
27
28pub struct TcpServer {
30 context: Arc<ServiceContext>,
31 svr_cfg: ServerConfig,
32 listener: ProxyListener,
33}
34
35impl TcpServer {
36 pub(crate) async fn new(
37 context: Arc<ServiceContext>,
38 svr_cfg: ServerConfig,
39 accept_opts: AcceptOpts,
40 ) -> io::Result<Self> {
41 let listener = ProxyListener::bind_with_opts(context.context(), &svr_cfg, accept_opts).await?;
42 Ok(Self {
43 context,
44 svr_cfg,
45 listener,
46 })
47 }
48
49 pub fn server_config(&self) -> &ServerConfig {
51 &self.svr_cfg
52 }
53
54 pub fn local_addr(&self) -> io::Result<SocketAddr> {
56 self.listener.local_addr()
57 }
58
59 pub async fn run(self) -> io::Result<()> {
61 info!(
62 "shadowsocks tcp server listening on {}, inbound address {}",
63 self.listener.local_addr().expect("listener.local_addr"),
64 self.svr_cfg.addr()
65 );
66
67 loop {
68 let flow_stat = self.context.flow_stat();
69
70 let (local_stream, peer_addr) = match self
71 .listener
72 .accept_map(|s| MonProxyStream::from_stream(s, flow_stat))
73 .await
74 {
75 Ok(s) => s,
76 Err(err) => {
77 error!("tcp server accept failed with error: {}", err);
78 time::sleep(Duration::from_secs(1)).await;
79 continue;
80 }
81 };
82
83 if self.context.check_client_blocked(&peer_addr) {
84 warn!("access denied from {} by ACL rules", peer_addr);
85 continue;
86 }
87
88 let client = TcpServerClient {
89 context: self.context.clone(),
90 method: self.svr_cfg.method(),
91 peer_addr,
92 stream: local_stream,
93 timeout: self.svr_cfg.timeout(),
94 };
95
96 tokio::spawn(async move {
97 if let Err(err) = client.serve().await {
98 debug!("tcp server stream aborted with error: {}", err);
99 }
100 });
101 }
102 }
103}
104
105#[inline]
106async fn timeout_fut<F, R>(duration: Option<Duration>, f: F) -> io::Result<R>
107where
108 F: Future<Output = io::Result<R>>,
109{
110 match duration {
111 None => f.await,
112 Some(d) => match time::timeout(d, f).await {
113 Ok(o) => o,
114 Err(..) => Err(ErrorKind::TimedOut.into()),
115 },
116 }
117}
118
119struct TcpServerClient {
120 context: Arc<ServiceContext>,
121 method: CipherKind,
122 peer_addr: SocketAddr,
123 stream: ProxyServerStream<MonProxyStream<TokioTcpStream>>,
124 timeout: Option<Duration>,
125}
126
127impl TcpServerClient {
128 async fn serve(mut self) -> io::Result<()> {
129 let target_addr = match timeout_fut(self.timeout, self.stream.handshake()).await {
131 Ok(a) => a,
132 Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
140 debug!(
141 "tcp handshake failed, received EOF before a complete target Address, peer: {}",
142 self.peer_addr
143 );
144 return Ok(());
145 }
146 Err(err) if err.kind() == ErrorKind::TimedOut => {
147 debug!(
148 "tcp handshake failed, timeout before a complete target Address, peer: {}",
149 self.peer_addr
150 );
151 return Ok(());
152 }
153 Err(err) => {
154 warn!("tcp handshake failed. peer: {}, {}", self.peer_addr, err);
158
159 #[cfg(feature = "aead-cipher-2022")]
160 if self.method.is_aead_2022() {
161 let stream = self.stream.into_inner().into_inner();
165 let _ = stream.set_linger(Some(Duration::ZERO));
166
167 return Ok(());
168 }
169
170 debug!("tcp silent-drop peer: {}", self.peer_addr);
171
172 let mut stream = self.stream.into_inner();
177
178 let res = ignore_until_end(&mut stream).await;
179
180 trace!(
181 "tcp silent-drop peer: {} is now closing with result {:?}",
182 self.peer_addr, res
183 );
184
185 return Ok(());
186 }
187 };
188
189 trace!(
190 "accepted tcp client connection {}, establishing tunnel to {}",
191 self.peer_addr, target_addr
192 );
193
194 if self.context.check_outbound_blocked(&target_addr).await {
195 error!(
196 "tcp client {} outbound {} blocked by ACL rules",
197 self.peer_addr, target_addr
198 );
199 return Ok(());
200 }
201
202 let mut remote_stream = match timeout_fut(
203 self.timeout,
204 OutboundTcpStream::connect_remote_with_opts(
205 self.context.context_ref(),
206 &target_addr,
207 self.context.connect_opts_ref(),
208 ),
209 )
210 .await
211 {
212 Ok(s) => s,
213 Err(err) => {
214 error!(
215 "tcp tunnel {} -> {} connect failed, error: {}",
216 self.peer_addr, target_addr, err
217 );
218 return Err(err);
219 }
220 };
221
222 if self.context.connect_opts_ref().tcp.fastopen {
228 let mut buffer = [0u8; 8192];
229 match time::timeout(Duration::from_millis(500), self.stream.read(&mut buffer)).await {
230 Ok(Ok(0)) => {
231 return Ok(());
233 }
234 Ok(Ok(n)) => {
235 timeout_fut(self.timeout, remote_stream.write_all(&buffer[..n])).await?;
237 }
238 Ok(Err(err)) => return Err(err),
239 Err(..) => {
240 timeout_fut(self.timeout, remote_stream.write(&[])).await?;
242
243 trace!(
244 "tcp tunnel {} -> {} sent TFO connect without data",
245 self.peer_addr, target_addr
246 );
247 }
248 }
249 }
250
251 debug!(
252 "established tcp tunnel {} <-> {} with {:?}",
253 self.peer_addr,
254 target_addr,
255 self.context.connect_opts_ref()
256 );
257
258 match copy_encrypted_bidirectional(self.method, &mut self.stream, &mut remote_stream).await {
259 Ok((rn, wn)) => {
260 trace!(
261 "tcp tunnel {} <-> {} closed, L2R {} bytes, R2L {} bytes",
262 self.peer_addr, target_addr, rn, wn
263 );
264 }
265 Err(err) => {
266 trace!(
267 "tcp tunnel {} <-> {} closed with error: {}",
268 self.peer_addr, target_addr, err
269 );
270 }
271 }
272
273 Ok(())
274 }
275}