shadowsocks_service/local/
mod.rs1use std::{io, net::SocketAddr, sync::Arc, time::Duration};
4
5use futures::future;
6use log::trace;
7use shadowsocks::{
8 config::Mode,
9 net::{AcceptOpts, ConnectOpts},
10};
11
12#[cfg(feature = "local-flow-stat")]
13use crate::{config::LocalFlowStatAddress, net::FlowStat};
14use crate::{
15 config::{Config, ConfigType, ProtocolType},
16 dns::build_dns_resolver,
17 utils::ServerHandle,
18};
19
20use self::{
21 context::ServiceContext,
22 loadbalancing::{PingBalancer, PingBalancerBuilder},
23};
24
25#[cfg(feature = "local-dns")]
26use self::dns::{Dns, DnsBuilder};
27#[cfg(feature = "local-fake-dns")]
28use self::fake_dns::{FakeDns, FakeDnsBuilder};
29#[cfg(feature = "local-http")]
30use self::http::{Http, HttpBuilder};
31#[cfg(feature = "local-online-config")]
32use self::online_config::{OnlineConfigService, OnlineConfigServiceBuilder};
33#[cfg(feature = "local-redir")]
34use self::redir::{Redir, RedirBuilder};
35use self::socks::{Socks, SocksBuilder};
36#[cfg(feature = "local-tun")]
37use self::tun::{Tun, TunBuilder};
38#[cfg(feature = "local-tunnel")]
39use self::tunnel::{Tunnel, TunnelBuilder};
40
41pub mod context;
42#[cfg(feature = "local-dns")]
43pub mod dns;
44#[cfg(feature = "local-fake-dns")]
45pub mod fake_dns;
46#[cfg(feature = "local-http")]
47pub mod http;
48pub mod loadbalancing;
49pub mod net;
50#[cfg(feature = "local-online-config")]
51pub mod online_config;
52#[cfg(feature = "local-redir")]
53pub mod redir;
54pub mod socks;
55#[cfg(feature = "local-tun")]
56pub mod tun;
57#[cfg(feature = "local-tunnel")]
58pub mod tunnel;
59pub mod utils;
60
/// Default TCP keep-alive timeout for local services.
///
/// Applied to both the outbound (connect) and inbound (accept) TCP options
/// whenever the configuration does not specify its own `keep_alive` value.
pub(crate) const LOCAL_DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(15);
65
66pub struct Server {
68 balancer: PingBalancer,
69 socks_servers: Vec<Socks>,
70 #[cfg(feature = "local-tunnel")]
71 tunnel_servers: Vec<Tunnel>,
72 #[cfg(feature = "local-http")]
73 http_servers: Vec<Http>,
74 #[cfg(feature = "local-tun")]
75 tun_servers: Vec<Tun>,
76 #[cfg(feature = "local-dns")]
77 dns_servers: Vec<Dns>,
78 #[cfg(feature = "local-redir")]
79 redir_servers: Vec<Redir>,
80 #[cfg(feature = "local-fake-dns")]
81 fake_dns_servers: Vec<FakeDns>,
82 #[cfg(feature = "local-flow-stat")]
83 local_stat_addr: Option<LocalFlowStatAddress>,
84 #[cfg(feature = "local-flow-stat")]
85 flow_stat: Arc<FlowStat>,
86 #[cfg(feature = "local-online-config")]
87 online_config: Option<OnlineConfigService>,
88}
89
90impl Server {
91 pub async fn new(config: Config) -> io::Result<Self> {
93 assert!(config.config_type == ConfigType::Local && !config.local.is_empty());
94
95 trace!("{:?}", config);
96
97 #[cfg(feature = "stream-cipher")]
100 for inst in config.server.iter() {
101 let server = &inst.config;
102
103 if server.method().is_stream() {
104 log::warn!(
105 "stream cipher {} for server {} have inherent weaknesses (see discussion in https://github.com/shadowsocks/shadowsocks-org/issues/36). \
106 DO NOT USE. It will be removed in the future.",
107 server.method(),
108 server.addr()
109 );
110 }
111 }
112
113 #[cfg(all(unix, not(target_os = "android")))]
114 if let Some(nofile) = config.nofile {
115 use crate::sys::set_nofile;
116 if let Err(err) = set_nofile(nofile) {
117 log::warn!("set_nofile {} failed, error: {}", nofile, err);
118 }
119 }
120
121 let mut context = ServiceContext::new();
124
125 let mut connect_opts = ConnectOpts {
126 #[cfg(any(target_os = "linux", target_os = "android"))]
127 fwmark: config.outbound_fwmark,
128 #[cfg(target_os = "freebsd")]
129 user_cookie: config.outbound_user_cookie,
130
131 #[cfg(target_os = "android")]
132 vpn_protect_path: config.outbound_vpn_protect_path,
133
134 bind_interface: config.outbound_bind_interface,
135 bind_local_addr: config.outbound_bind_addr.map(|ip| SocketAddr::new(ip, 0)),
136
137 ..Default::default()
138 };
139 connect_opts.tcp.send_buffer_size = config.outbound_send_buffer_size;
140 connect_opts.tcp.recv_buffer_size = config.outbound_recv_buffer_size;
141 connect_opts.tcp.nodelay = config.no_delay;
142 connect_opts.tcp.fastopen = config.fast_open;
143 connect_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
144 connect_opts.tcp.mptcp = config.mptcp;
145 connect_opts.udp.mtu = config.udp_mtu;
146 connect_opts.udp.allow_fragmentation = config.outbound_udp_allow_fragmentation;
147 context.set_connect_opts(connect_opts);
148
149 let mut accept_opts = AcceptOpts {
150 ipv6_only: config.ipv6_only,
151 ..Default::default()
152 };
153 accept_opts.tcp.send_buffer_size = config.inbound_send_buffer_size;
154 accept_opts.tcp.recv_buffer_size = config.inbound_recv_buffer_size;
155 accept_opts.tcp.nodelay = config.no_delay;
156 accept_opts.tcp.fastopen = config.fast_open;
157 accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
158 accept_opts.tcp.mptcp = config.mptcp;
159 accept_opts.udp.mtu = config.udp_mtu;
160 context.set_accept_opts(accept_opts);
161
162 if let Some(resolver) = build_dns_resolver(
163 config.dns,
164 config.ipv6_first,
165 config.dns_cache_size,
166 context.connect_opts_ref(),
167 )
168 .await
169 {
170 context.set_dns_resolver(Arc::new(resolver));
171 }
172
173 if config.ipv6_first {
174 context.set_ipv6_first(config.ipv6_first);
175 }
176
177 if let Some(acl) = config.acl {
178 context.set_acl(Arc::new(acl));
179 }
180
181 context.set_security_config(&config.security);
182
183 assert!(!config.local.is_empty(), "no valid local server configuration");
184
185 let balancer = {
187 let mut mode: Option<Mode> = None;
188
189 for local in &config.local {
190 mode = Some(match mode {
191 None => local.config.mode,
192 Some(m) => m.merge(local.config.mode),
193 });
194 }
195
196 let mode = mode.unwrap_or(Mode::TcpOnly);
197
198 let mut balancer_builder = PingBalancerBuilder::new(Arc::new(context.clone()), mode);
200
201 if let Some(rtt) = config.balancer.max_server_rtt {
203 balancer_builder.max_server_rtt(rtt);
204 }
205
206 if let Some(intv) = config.balancer.check_interval {
207 balancer_builder.check_interval(intv);
208 }
209
210 if let Some(intv) = config.balancer.check_best_interval {
211 balancer_builder.check_best_interval(intv);
212 }
213
214 for server in config.server {
215 balancer_builder.add_server(server);
216 }
217
218 balancer_builder.build().await?
219 };
220
221 let mut local_server = Self {
222 balancer: balancer.clone(),
223 socks_servers: Vec::new(),
224 #[cfg(feature = "local-tunnel")]
225 tunnel_servers: Vec::new(),
226 #[cfg(feature = "local-http")]
227 http_servers: Vec::new(),
228 #[cfg(feature = "local-tun")]
229 tun_servers: Vec::new(),
230 #[cfg(feature = "local-dns")]
231 dns_servers: Vec::new(),
232 #[cfg(feature = "local-redir")]
233 redir_servers: Vec::new(),
234 #[cfg(feature = "local-fake-dns")]
235 fake_dns_servers: Vec::new(),
236 #[cfg(feature = "local-flow-stat")]
237 local_stat_addr: config.local_stat_addr,
238 #[cfg(feature = "local-flow-stat")]
239 flow_stat: context.flow_stat(),
240 #[cfg(feature = "local-online-config")]
241 online_config: match config.online_config {
242 None => None,
243 Some(online_config) => {
244 let mut builder = OnlineConfigServiceBuilder::new(
245 Arc::new(context.clone()),
246 online_config.config_url,
247 balancer.clone(),
248 );
249 if let Some(update_interval) = online_config.update_interval {
250 builder.set_update_interval(update_interval);
251 }
252 Some(builder.build().await?)
253 }
254 },
255 };
256
257 for local_instance in config.local {
258 let local_config = local_instance.config;
259
260 let mut context = context.clone();
263
264 if let Some(acl) = local_instance.acl {
266 context.set_acl(Arc::new(acl))
267 }
268
269 let context = Arc::new(context);
270 let balancer = balancer.clone();
271
272 match local_config.protocol {
273 ProtocolType::Socks => {
274 let client_addr = match local_config.addr {
275 Some(a) => a,
276 None => return Err(io::Error::other("socks requires local address")),
277 };
278
279 let mut server_builder = SocksBuilder::with_context(context.clone(), client_addr, balancer);
280 server_builder.set_mode(local_config.mode);
281 server_builder.set_socks5_auth(local_config.socks5_auth);
282
283 if let Some(c) = config.udp_max_associations {
284 server_builder.set_udp_capacity(c);
285 }
286 if let Some(d) = config.udp_timeout {
287 server_builder.set_udp_expiry_duration(d);
288 }
289 if let Some(b) = local_config.udp_addr {
290 server_builder.set_udp_bind_addr(b.clone());
291 }
292 if let Some(b) = local_config.udp_associate_addr {
293 server_builder.set_udp_associate_addr(b.clone());
294 }
295
296 #[cfg(target_os = "macos")]
297 if let Some(n) = local_config.launchd_tcp_socket_name {
298 server_builder.set_launchd_tcp_socket_name(n);
299 }
300 #[cfg(target_os = "macos")]
301 if let Some(n) = local_config.launchd_udp_socket_name {
302 server_builder.set_launchd_udp_socket_name(n);
303 }
304
305 let server = server_builder.build().await?;
306 local_server.socks_servers.push(server);
307 }
308 #[cfg(feature = "local-tunnel")]
309 ProtocolType::Tunnel => {
310 let client_addr = match local_config.addr {
311 Some(a) => a,
312 None => return Err(io::Error::other("tunnel requires local address")),
313 };
314
315 let forward_addr = local_config.forward_addr.expect("tunnel requires forward address");
316
317 let mut server_builder =
318 TunnelBuilder::with_context(context.clone(), forward_addr.clone(), client_addr, balancer);
319
320 if let Some(c) = config.udp_max_associations {
321 server_builder.set_udp_capacity(c);
322 }
323 if let Some(d) = config.udp_timeout {
324 server_builder.set_udp_expiry_duration(d);
325 }
326 server_builder.set_mode(local_config.mode);
327 if let Some(udp_addr) = local_config.udp_addr {
328 server_builder.set_udp_bind_addr(udp_addr);
329 }
330
331 #[cfg(target_os = "macos")]
332 if let Some(n) = local_config.launchd_tcp_socket_name {
333 server_builder.set_launchd_tcp_socket_name(n);
334 }
335 #[cfg(target_os = "macos")]
336 if let Some(n) = local_config.launchd_udp_socket_name {
337 server_builder.set_launchd_udp_socket_name(n);
338 }
339
340 let server = server_builder.build().await?;
341 local_server.tunnel_servers.push(server);
342 }
343 #[cfg(feature = "local-http")]
344 ProtocolType::Http => {
345 let client_addr = match local_config.addr {
346 Some(a) => a,
347 None => return Err(io::Error::other("http requires local address")),
348 };
349
350 #[allow(unused_mut)]
351 let mut builder = HttpBuilder::with_context(context.clone(), client_addr, balancer);
352
353 #[cfg(target_os = "macos")]
354 if let Some(n) = local_config.launchd_tcp_socket_name {
355 builder.set_launchd_tcp_socket_name(n);
356 }
357
358 let server = builder.build().await?;
359 local_server.http_servers.push(server);
360 }
361 #[cfg(feature = "local-redir")]
362 ProtocolType::Redir => {
363 let client_addr = match local_config.addr {
364 Some(a) => a,
365 None => return Err(io::Error::other("redir requires local address")),
366 };
367
368 let mut server_builder = RedirBuilder::with_context(context.clone(), client_addr, balancer);
369 if let Some(c) = config.udp_max_associations {
370 server_builder.set_udp_capacity(c);
371 }
372 if let Some(d) = config.udp_timeout {
373 server_builder.set_udp_expiry_duration(d);
374 }
375 server_builder.set_mode(local_config.mode);
376 server_builder.set_tcp_redir(local_config.tcp_redir);
377 server_builder.set_udp_redir(local_config.udp_redir);
378 if let Some(udp_addr) = local_config.udp_addr {
379 server_builder.set_udp_bind_addr(udp_addr);
380 }
381
382 let server = server_builder.build().await?;
383 local_server.redir_servers.push(server);
384 }
385 #[cfg(feature = "local-dns")]
386 ProtocolType::Dns => {
387 let client_addr = match local_config.addr {
388 Some(a) => a,
389 None => return Err(io::Error::other("dns requires local address")),
390 };
391
392 let mut server_builder = {
393 let local_addr = local_config.local_dns_addr.expect("missing local_dns_addr");
394 let remote_addr = local_config.remote_dns_addr.expect("missing remote_dns_addr");
395 let client_cache_size = local_config.client_cache_size.unwrap_or(5);
396
397 DnsBuilder::with_context(
398 context.clone(),
399 client_addr,
400 local_addr.clone(),
401 remote_addr.clone(),
402 balancer,
403 client_cache_size,
404 )
405 };
406 server_builder.set_mode(local_config.mode);
407
408 #[cfg(target_os = "macos")]
409 if let Some(n) = local_config.launchd_tcp_socket_name {
410 server_builder.set_launchd_tcp_socket_name(n);
411 }
412 #[cfg(target_os = "macos")]
413 if let Some(n) = local_config.launchd_udp_socket_name {
414 server_builder.set_launchd_udp_socket_name(n);
415 }
416
417 let server = server_builder.build().await?;
418 local_server.dns_servers.push(server);
419 }
420 #[cfg(feature = "local-tun")]
421 ProtocolType::Tun => {
422 let mut builder = TunBuilder::new(context.clone(), balancer);
423 if let Some(address) = local_config.tun_interface_address {
424 builder.address(address);
425 }
426 if let Some(address) = local_config.tun_interface_destination {
427 builder.destination(address);
428 }
429 if let Some(name) = local_config.tun_interface_name {
430 builder.name(&name);
431 }
432 if let Some(c) = config.udp_max_associations {
433 builder.udp_capacity(c);
434 }
435 if let Some(d) = config.udp_timeout {
436 builder.udp_expiry_duration(d);
437 }
438 builder.mode(local_config.mode);
439 #[cfg(unix)]
440 if let Some(fd) = local_config.tun_device_fd {
441 builder.file_descriptor(fd);
442 } else if let Some(ref fd_path) = local_config.tun_device_fd_from_path {
443 use std::fs;
444
445 use log::info;
446 use shadowsocks::net::UnixListener;
447
448 let _ = fs::remove_file(fd_path);
449
450 let listener = match UnixListener::bind(fd_path) {
451 Ok(l) => l,
452 Err(err) => {
453 log::error!("failed to bind uds path \"{}\", error: {}", fd_path.display(), err);
454 return Err(err);
455 }
456 };
457
458 info!("waiting tun's file descriptor from {}", fd_path.display());
459
460 loop {
461 let (mut stream, peer_addr) = listener.accept().await?;
462 trace!("accepted {:?} for receiving tun file descriptor", peer_addr);
463
464 let mut buffer = [0u8; 1024];
465 let mut fd_buffer = [0];
466
467 match stream.recv_with_fd(&mut buffer, &mut fd_buffer).await {
468 Ok((n, fd_size)) => {
469 if fd_size == 0 {
470 log::error!(
471 "client {:?} didn't send file descriptors with buffer.size {} bytes",
472 peer_addr,
473 n
474 );
475 continue;
476 }
477
478 info!("got file descriptor {} for tun from {:?}", fd_buffer[0], peer_addr);
479
480 builder.file_descriptor(fd_buffer[0]);
481 break;
482 }
483 Err(err) => {
484 log::error!(
485 "failed to receive file descriptors from {:?}, error: {}",
486 peer_addr,
487 err
488 );
489 }
490 }
491 }
492 }
493 let server = builder.build().await?;
494 local_server.tun_servers.push(server);
495 }
496 #[cfg(feature = "local-fake-dns")]
497 ProtocolType::FakeDns => {
498 let client_addr = match local_config.addr {
499 Some(a) => a,
500 None => return Err(io::Error::other("dns requires local address")),
501 };
502
503 let mut builder = FakeDnsBuilder::new(client_addr);
504 if let Some(n) = local_config.fake_dns_ipv4_network {
505 builder.set_ipv4_network(n);
506 }
507 if let Some(n) = local_config.fake_dns_ipv6_network {
508 builder.set_ipv6_network(n);
509 }
510 if let Some(exp) = local_config.fake_dns_record_expire_duration {
511 builder.set_expire_duration(exp);
512 }
513 if let Some(p) = local_config.fake_dns_database_path {
514 builder.set_database_path(p);
515 }
516 let server = builder.build().await?;
517 #[cfg(feature = "local-fake-dns")]
518 context.add_fake_dns_manager(server.clone_manager()).await;
519
520 local_server.fake_dns_servers.push(server);
521 }
522 }
523 }
524
525 Ok(local_server)
526 }
527
528 pub async fn run(self) -> io::Result<()> {
530 let mut vfut = Vec::new();
531
532 for svr in self.socks_servers {
533 vfut.push(ServerHandle(tokio::spawn(svr.run())));
534 }
535
536 #[cfg(feature = "local-tunnel")]
537 for svr in self.tunnel_servers {
538 vfut.push(ServerHandle(tokio::spawn(svr.run())));
539 }
540
541 #[cfg(feature = "local-http")]
542 for svr in self.http_servers {
543 vfut.push(ServerHandle(tokio::spawn(svr.run())));
544 }
545
546 #[cfg(feature = "local-tun")]
547 for svr in self.tun_servers {
548 vfut.push(ServerHandle(tokio::spawn(svr.run())));
549 }
550
551 #[cfg(feature = "local-dns")]
552 for svr in self.dns_servers {
553 vfut.push(ServerHandle(tokio::spawn(svr.run())));
554 }
555
556 #[cfg(feature = "local-redir")]
557 for svr in self.redir_servers {
558 vfut.push(ServerHandle(tokio::spawn(svr.run())));
559 }
560
561 #[cfg(feature = "local-fake-dns")]
562 for svr in self.fake_dns_servers {
563 vfut.push(ServerHandle(tokio::spawn(svr.run())));
564 }
565
566 #[cfg(feature = "local-flow-stat")]
567 if let Some(stat_addr) = self.local_stat_addr {
568 let report_fut = flow_report_task(stat_addr, self.flow_stat);
571 vfut.push(ServerHandle(tokio::spawn(report_fut)));
572 }
573
574 #[cfg(feature = "local-online-config")]
575 if let Some(online_config) = self.online_config {
576 vfut.push(ServerHandle(tokio::spawn(online_config.run())));
577 }
578
579 let (res, ..) = future::select_all(vfut).await;
580 res
581 }
582
583 pub fn server_balancer(&self) -> &PingBalancer {
585 &self.balancer
586 }
587
588 pub fn socks_servers(&self) -> &[Socks] {
590 &self.socks_servers
591 }
592
593 #[cfg(feature = "local-tunnel")]
595 pub fn tunnel_servers(&self) -> &[Tunnel] {
596 &self.tunnel_servers
597 }
598
599 #[cfg(feature = "local-http")]
601 pub fn http_servers(&self) -> &[Http] {
602 &self.http_servers
603 }
604
605 #[cfg(feature = "local-tun")]
607 pub fn tun_servers(&self) -> &[Tun] {
608 &self.tun_servers
609 }
610
611 #[cfg(feature = "local-dns")]
613 pub fn dns_servers(&self) -> &[Dns] {
614 &self.dns_servers
615 }
616
617 #[cfg(feature = "local-redir")]
619 pub fn redir_servers(&self) -> &[Redir] {
620 &self.redir_servers
621 }
622
623 #[cfg(feature = "local-fake-dns")]
625 pub fn fake_dns_servers(&self) -> &[FakeDns] {
626 &self.fake_dns_servers
627 }
628}
629
/// Connects with `connect` and writes `payload`, bounding each step by `timeout`.
///
/// Failures and timeouts are logged at debug level and otherwise ignored, so
/// the caller can simply try again on its next tick.
#[cfg(feature = "local-flow-stat")]
async fn send_flow_report<S, F>(connect: F, payload: &[u8], timeout: Duration)
where
    S: tokio::io::AsyncWrite + Unpin,
    F: std::future::Future<Output = io::Result<S>>,
{
    use log::debug;
    use tokio::{io::AsyncWriteExt, time};

    let mut stream = match time::timeout(timeout, connect).await {
        Ok(Ok(s)) => s,
        Ok(Err(err)) => {
            debug!("send client flow statistic error: {}", err);
            return;
        }
        Err(..) => {
            debug!("send client flow statistic error: timeout");
            return;
        }
    };

    match time::timeout(timeout, stream.write_all(payload)).await {
        Ok(Ok(..)) => {}
        Ok(Err(err)) => {
            debug!("send client flow statistic error: {}", err);
        }
        Err(..) => {
            debug!("send client flow statistic error: timeout");
        }
    }
}

/// Reports the flow counters to `stat_addr` every 500 milliseconds, forever.
///
/// Each report opens a fresh connection and sends 16 bytes: the `tx` counter
/// followed by the `rx` counter, both as native-endian `u64`. Connecting and
/// writing are each limited to one second; failures are logged at debug level
/// and the next report is attempted after the next sleep. The function never
/// returns on its own.
#[cfg(feature = "local-flow-stat")]
async fn flow_report_task(stat_addr: LocalFlowStatAddress, flow_stat: Arc<FlowStat>) -> io::Result<()> {
    use tokio::time;

    // Upper bound for each of the connect and write steps.
    let timeout = Duration::from_secs(1);

    loop {
        time::sleep(Duration::from_millis(500)).await;

        let tx: u64 = flow_stat.tx();
        let rx: u64 = flow_stat.rx();

        // Same bytes as viewing `[tx, rx]` in memory, built without `unsafe`.
        let mut buf = [0u8; 16];
        buf[..8].copy_from_slice(&tx.to_ne_bytes());
        buf[8..].copy_from_slice(&rx.to_ne_bytes());

        match stat_addr {
            #[cfg(unix)]
            LocalFlowStatAddress::UnixStreamPath(ref stat_path) => {
                use tokio::net::UnixStream;

                send_flow_report(UnixStream::connect(stat_path), &buf, timeout).await;
            }
            LocalFlowStatAddress::TcpStreamAddr(stat_addr) => {
                use tokio::net::TcpStream;

                send_flow_report(TcpStream::connect(stat_addr), &buf, timeout).await;
            }
        }
    }
}
705
706pub async fn run(config: Config) -> io::Result<()> {
708 Server::new(config).await?.run().await
709}