shadowsocks_service/local/
mod.rs1use std::{
4 io::{self, ErrorKind},
5 net::SocketAddr,
6 sync::Arc,
7 time::Duration,
8};
9
10use futures::future;
11use log::trace;
12use shadowsocks::{
13 config::Mode,
14 net::{AcceptOpts, ConnectOpts},
15};
16
17#[cfg(feature = "local-flow-stat")]
18use crate::{config::LocalFlowStatAddress, net::FlowStat};
19use crate::{
20 config::{Config, ConfigType, ProtocolType},
21 dns::build_dns_resolver,
22 utils::ServerHandle,
23};
24
25use self::{
26 context::ServiceContext,
27 loadbalancing::{PingBalancer, PingBalancerBuilder},
28};
29
30#[cfg(feature = "local-dns")]
31use self::dns::{Dns, DnsBuilder};
32#[cfg(feature = "local-fake-dns")]
33use self::fake_dns::{FakeDns, FakeDnsBuilder};
34#[cfg(feature = "local-http")]
35use self::http::{Http, HttpBuilder};
36#[cfg(feature = "local-online-config")]
37use self::online_config::{OnlineConfigService, OnlineConfigServiceBuilder};
38#[cfg(feature = "local-redir")]
39use self::redir::{Redir, RedirBuilder};
40use self::socks::{Socks, SocksBuilder};
41#[cfg(feature = "local-tun")]
42use self::tun::{Tun, TunBuilder};
43#[cfg(feature = "local-tunnel")]
44use self::tunnel::{Tunnel, TunnelBuilder};
45
46pub mod context;
47#[cfg(feature = "local-dns")]
48pub mod dns;
49#[cfg(feature = "local-fake-dns")]
50pub mod fake_dns;
51#[cfg(feature = "local-http")]
52pub mod http;
53pub mod loadbalancing;
54pub mod net;
55#[cfg(feature = "local-online-config")]
56pub mod online_config;
57#[cfg(feature = "local-redir")]
58pub mod redir;
59pub mod socks;
60#[cfg(feature = "local-tun")]
61pub mod tun;
62#[cfg(feature = "local-tunnel")]
63pub mod tunnel;
64pub mod utils;
65
/// Default TCP keep-alive timeout (15 seconds).
///
/// `Server::new` falls back to this value for both outbound (connect) and inbound (accept)
/// TCP options when the configuration does not set `keep_alive`.
pub(crate) const LOCAL_DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(15);
70
/// Local server instance.
///
/// Holds the server balancer together with every listener that `Server::new` created from the
/// local configuration, grouped by protocol. Protocol-specific fields only exist when the
/// matching cargo feature is enabled.
pub struct Server {
    /// Balancer built from the configured servers; shared (cloned) into every listener.
    balancer: PingBalancer,
    /// Listeners created for `ProtocolType::Socks` local instances.
    socks_servers: Vec<Socks>,
    /// Listeners created for `ProtocolType::Tunnel` local instances.
    #[cfg(feature = "local-tunnel")]
    tunnel_servers: Vec<Tunnel>,
    /// Listeners created for `ProtocolType::Http` local instances.
    #[cfg(feature = "local-http")]
    http_servers: Vec<Http>,
    /// Servers created for `ProtocolType::Tun` local instances.
    #[cfg(feature = "local-tun")]
    tun_servers: Vec<Tun>,
    /// Listeners created for `ProtocolType::Dns` local instances.
    #[cfg(feature = "local-dns")]
    dns_servers: Vec<Dns>,
    /// Listeners created for `ProtocolType::Redir` local instances.
    #[cfg(feature = "local-redir")]
    redir_servers: Vec<Redir>,
    /// Listeners created for `ProtocolType::FakeDns` local instances.
    #[cfg(feature = "local-fake-dns")]
    fake_dns_servers: Vec<FakeDns>,
    /// Where flow statistics are reported to; no report task is spawned when `None`.
    #[cfg(feature = "local-flow-stat")]
    local_stat_addr: Option<LocalFlowStatAddress>,
    /// Flow statistic counters taken from the service context.
    #[cfg(feature = "local-flow-stat")]
    flow_stat: Arc<FlowStat>,
    /// Online configuration service, present only when `online_config` was configured.
    #[cfg(feature = "local-online-config")]
    online_config: Option<OnlineConfigService>,
}
94
95impl Server {
96 pub async fn new(config: Config) -> io::Result<Server> {
98 assert!(config.config_type == ConfigType::Local && !config.local.is_empty());
99
100 trace!("{:?}", config);
101
102 #[cfg(feature = "stream-cipher")]
105 for inst in config.server.iter() {
106 let server = &inst.config;
107
108 if server.method().is_stream() {
109 log::warn!(
110 "stream cipher {} for server {} have inherent weaknesses (see discussion in https://github.com/shadowsocks/shadowsocks-org/issues/36). \
111 DO NOT USE. It will be removed in the future.",
112 server.method(),
113 server.addr()
114 );
115 }
116 }
117
118 #[cfg(all(unix, not(target_os = "android")))]
119 if let Some(nofile) = config.nofile {
120 use crate::sys::set_nofile;
121 if let Err(err) = set_nofile(nofile) {
122 log::warn!("set_nofile {} failed, error: {}", nofile, err);
123 }
124 }
125
126 let mut context = ServiceContext::new();
129
130 let mut connect_opts = ConnectOpts {
131 #[cfg(any(target_os = "linux", target_os = "android"))]
132 fwmark: config.outbound_fwmark,
133
134 #[cfg(target_os = "android")]
135 vpn_protect_path: config.outbound_vpn_protect_path,
136
137 bind_interface: config.outbound_bind_interface,
138 bind_local_addr: config.outbound_bind_addr.map(|ip| SocketAddr::new(ip, 0)),
139
140 ..Default::default()
141 };
142 connect_opts.tcp.send_buffer_size = config.outbound_send_buffer_size;
143 connect_opts.tcp.recv_buffer_size = config.outbound_recv_buffer_size;
144 connect_opts.tcp.nodelay = config.no_delay;
145 connect_opts.tcp.fastopen = config.fast_open;
146 connect_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
147 connect_opts.tcp.mptcp = config.mptcp;
148 connect_opts.udp.mtu = config.udp_mtu;
149 connect_opts.udp.allow_fragmentation = config.outbound_udp_allow_fragmentation;
150 context.set_connect_opts(connect_opts);
151
152 let mut accept_opts = AcceptOpts {
153 ipv6_only: config.ipv6_only,
154 ..Default::default()
155 };
156 accept_opts.tcp.send_buffer_size = config.inbound_send_buffer_size;
157 accept_opts.tcp.recv_buffer_size = config.inbound_recv_buffer_size;
158 accept_opts.tcp.nodelay = config.no_delay;
159 accept_opts.tcp.fastopen = config.fast_open;
160 accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
161 accept_opts.tcp.mptcp = config.mptcp;
162 accept_opts.udp.mtu = config.udp_mtu;
163 context.set_accept_opts(accept_opts);
164
165 if let Some(resolver) = build_dns_resolver(
166 config.dns,
167 config.ipv6_first,
168 config.dns_cache_size,
169 context.connect_opts_ref(),
170 )
171 .await
172 {
173 context.set_dns_resolver(Arc::new(resolver));
174 }
175
176 if config.ipv6_first {
177 context.set_ipv6_first(config.ipv6_first);
178 }
179
180 if let Some(acl) = config.acl {
181 context.set_acl(Arc::new(acl));
182 }
183
184 context.set_security_config(&config.security);
185
186 assert!(!config.local.is_empty(), "no valid local server configuration");
187
188 let balancer = {
190 let mut mode: Option<Mode> = None;
191
192 for local in &config.local {
193 mode = Some(match mode {
194 None => local.config.mode,
195 Some(m) => m.merge(local.config.mode),
196 });
197 }
198
199 let mode = mode.unwrap_or(Mode::TcpOnly);
200
201 let mut balancer_builder = PingBalancerBuilder::new(Arc::new(context.clone()), mode);
203
204 if let Some(rtt) = config.balancer.max_server_rtt {
206 balancer_builder.max_server_rtt(rtt);
207 }
208
209 if let Some(intv) = config.balancer.check_interval {
210 balancer_builder.check_interval(intv);
211 }
212
213 if let Some(intv) = config.balancer.check_best_interval {
214 balancer_builder.check_best_interval(intv);
215 }
216
217 for server in config.server {
218 balancer_builder.add_server(server);
219 }
220
221 balancer_builder.build().await?
222 };
223
224 let mut local_server = Server {
225 balancer: balancer.clone(),
226 socks_servers: Vec::new(),
227 #[cfg(feature = "local-tunnel")]
228 tunnel_servers: Vec::new(),
229 #[cfg(feature = "local-http")]
230 http_servers: Vec::new(),
231 #[cfg(feature = "local-tun")]
232 tun_servers: Vec::new(),
233 #[cfg(feature = "local-dns")]
234 dns_servers: Vec::new(),
235 #[cfg(feature = "local-redir")]
236 redir_servers: Vec::new(),
237 #[cfg(feature = "local-fake-dns")]
238 fake_dns_servers: Vec::new(),
239 #[cfg(feature = "local-flow-stat")]
240 local_stat_addr: config.local_stat_addr,
241 #[cfg(feature = "local-flow-stat")]
242 flow_stat: context.flow_stat(),
243 #[cfg(feature = "local-online-config")]
244 online_config: match config.online_config {
245 None => None,
246 Some(online_config) => {
247 let mut builder = OnlineConfigServiceBuilder::new(
248 Arc::new(context.clone()),
249 online_config.config_url,
250 balancer.clone(),
251 );
252 if let Some(update_interval) = online_config.update_interval {
253 builder.set_update_interval(update_interval);
254 }
255 Some(builder.build().await?)
256 }
257 },
258 };
259
260 for local_instance in config.local {
261 let local_config = local_instance.config;
262
263 let mut context = context.clone();
266
267 if let Some(acl) = local_instance.acl {
269 context.set_acl(Arc::new(acl))
270 }
271
272 let context = Arc::new(context);
273 let balancer = balancer.clone();
274
275 match local_config.protocol {
276 ProtocolType::Socks => {
277 let client_addr = match local_config.addr {
278 Some(a) => a,
279 None => return Err(io::Error::new(ErrorKind::Other, "socks requires local address")),
280 };
281
282 let mut server_builder = SocksBuilder::with_context(context.clone(), client_addr, balancer);
283 server_builder.set_mode(local_config.mode);
284 server_builder.set_socks5_auth(local_config.socks5_auth);
285
286 if let Some(c) = config.udp_max_associations {
287 server_builder.set_udp_capacity(c);
288 }
289 if let Some(d) = config.udp_timeout {
290 server_builder.set_udp_expiry_duration(d);
291 }
292 if let Some(b) = local_config.udp_addr {
293 server_builder.set_udp_bind_addr(b.clone());
294 }
295 if let Some(b) = local_config.udp_associate_addr {
296 server_builder.set_udp_associate_addr(b.clone());
297 }
298
299 #[cfg(target_os = "macos")]
300 if let Some(n) = local_config.launchd_tcp_socket_name {
301 server_builder.set_launchd_tcp_socket_name(n);
302 }
303 #[cfg(target_os = "macos")]
304 if let Some(n) = local_config.launchd_udp_socket_name {
305 server_builder.set_launchd_udp_socket_name(n);
306 }
307
308 let server = server_builder.build().await?;
309 local_server.socks_servers.push(server);
310 }
311 #[cfg(feature = "local-tunnel")]
312 ProtocolType::Tunnel => {
313 let client_addr = match local_config.addr {
314 Some(a) => a,
315 None => return Err(io::Error::new(ErrorKind::Other, "tunnel requires local address")),
316 };
317
318 let forward_addr = local_config.forward_addr.expect("tunnel requires forward address");
319
320 let mut server_builder =
321 TunnelBuilder::with_context(context.clone(), forward_addr.clone(), client_addr, balancer);
322
323 if let Some(c) = config.udp_max_associations {
324 server_builder.set_udp_capacity(c);
325 }
326 if let Some(d) = config.udp_timeout {
327 server_builder.set_udp_expiry_duration(d);
328 }
329 server_builder.set_mode(local_config.mode);
330 if let Some(udp_addr) = local_config.udp_addr {
331 server_builder.set_udp_bind_addr(udp_addr);
332 }
333
334 #[cfg(target_os = "macos")]
335 if let Some(n) = local_config.launchd_tcp_socket_name {
336 server_builder.set_launchd_tcp_socket_name(n);
337 }
338 #[cfg(target_os = "macos")]
339 if let Some(n) = local_config.launchd_udp_socket_name {
340 server_builder.set_launchd_udp_socket_name(n);
341 }
342
343 let server = server_builder.build().await?;
344 local_server.tunnel_servers.push(server);
345 }
346 #[cfg(feature = "local-http")]
347 ProtocolType::Http => {
348 let client_addr = match local_config.addr {
349 Some(a) => a,
350 None => return Err(io::Error::new(ErrorKind::Other, "http requires local address")),
351 };
352
353 #[allow(unused_mut)]
354 let mut builder = HttpBuilder::with_context(context.clone(), client_addr, balancer);
355
356 #[cfg(target_os = "macos")]
357 if let Some(n) = local_config.launchd_tcp_socket_name {
358 builder.set_launchd_tcp_socket_name(n);
359 }
360
361 let server = builder.build().await?;
362 local_server.http_servers.push(server);
363 }
364 #[cfg(feature = "local-redir")]
365 ProtocolType::Redir => {
366 let client_addr = match local_config.addr {
367 Some(a) => a,
368 None => return Err(io::Error::new(ErrorKind::Other, "redir requires local address")),
369 };
370
371 let mut server_builder = RedirBuilder::with_context(context.clone(), client_addr, balancer);
372 if let Some(c) = config.udp_max_associations {
373 server_builder.set_udp_capacity(c);
374 }
375 if let Some(d) = config.udp_timeout {
376 server_builder.set_udp_expiry_duration(d);
377 }
378 server_builder.set_mode(local_config.mode);
379 server_builder.set_tcp_redir(local_config.tcp_redir);
380 server_builder.set_udp_redir(local_config.udp_redir);
381 if let Some(udp_addr) = local_config.udp_addr {
382 server_builder.set_udp_bind_addr(udp_addr);
383 }
384
385 let server = server_builder.build().await?;
386 local_server.redir_servers.push(server);
387 }
388 #[cfg(feature = "local-dns")]
389 ProtocolType::Dns => {
390 let client_addr = match local_config.addr {
391 Some(a) => a,
392 None => return Err(io::Error::new(ErrorKind::Other, "dns requires local address")),
393 };
394
395 let mut server_builder = {
396 let local_addr = local_config.local_dns_addr.expect("missing local_dns_addr");
397 let remote_addr = local_config.remote_dns_addr.expect("missing remote_dns_addr");
398 let client_cache_size = local_config.client_cache_size.unwrap_or(5);
399
400 DnsBuilder::with_context(
401 context.clone(),
402 client_addr,
403 local_addr.clone(),
404 remote_addr.clone(),
405 balancer,
406 client_cache_size,
407 )
408 };
409 server_builder.set_mode(local_config.mode);
410
411 #[cfg(target_os = "macos")]
412 if let Some(n) = local_config.launchd_tcp_socket_name {
413 server_builder.set_launchd_tcp_socket_name(n);
414 }
415 #[cfg(target_os = "macos")]
416 if let Some(n) = local_config.launchd_udp_socket_name {
417 server_builder.set_launchd_udp_socket_name(n);
418 }
419
420 let server = server_builder.build().await?;
421 local_server.dns_servers.push(server);
422 }
423 #[cfg(feature = "local-tun")]
424 ProtocolType::Tun => {
425 let mut builder = TunBuilder::new(context.clone(), balancer);
426 if let Some(address) = local_config.tun_interface_address {
427 builder.address(address);
428 }
429 if let Some(address) = local_config.tun_interface_destination {
430 builder.destination(address);
431 }
432 if let Some(name) = local_config.tun_interface_name {
433 builder.name(&name);
434 }
435 if let Some(c) = config.udp_max_associations {
436 builder.udp_capacity(c);
437 }
438 if let Some(d) = config.udp_timeout {
439 builder.udp_expiry_duration(d);
440 }
441 builder.mode(local_config.mode);
442 #[cfg(unix)]
443 if let Some(fd) = local_config.tun_device_fd {
444 builder.file_descriptor(fd);
445 } else if let Some(ref fd_path) = local_config.tun_device_fd_from_path {
446 use std::fs;
447
448 use log::info;
449 use shadowsocks::net::UnixListener;
450
451 let _ = fs::remove_file(fd_path);
452
453 let listener = match UnixListener::bind(fd_path) {
454 Ok(l) => l,
455 Err(err) => {
456 log::error!("failed to bind uds path \"{}\", error: {}", fd_path.display(), err);
457 return Err(err);
458 }
459 };
460
461 info!("waiting tun's file descriptor from {}", fd_path.display());
462
463 loop {
464 let (mut stream, peer_addr) = listener.accept().await?;
465 trace!("accepted {:?} for receiving tun file descriptor", peer_addr);
466
467 let mut buffer = [0u8; 1024];
468 let mut fd_buffer = [0];
469
470 match stream.recv_with_fd(&mut buffer, &mut fd_buffer).await {
471 Ok((n, fd_size)) => {
472 if fd_size == 0 {
473 log::error!(
474 "client {:?} didn't send file descriptors with buffer.size {} bytes",
475 peer_addr,
476 n
477 );
478 continue;
479 }
480
481 info!("got file descriptor {} for tun from {:?}", fd_buffer[0], peer_addr);
482
483 builder.file_descriptor(fd_buffer[0]);
484 break;
485 }
486 Err(err) => {
487 log::error!(
488 "failed to receive file descriptors from {:?}, error: {}",
489 peer_addr,
490 err
491 );
492 }
493 }
494 }
495 }
496 let server = builder.build().await?;
497 local_server.tun_servers.push(server);
498 }
499 #[cfg(feature = "local-fake-dns")]
500 ProtocolType::FakeDns => {
501 let client_addr = match local_config.addr {
502 Some(a) => a,
503 None => return Err(io::Error::new(ErrorKind::Other, "dns requires local address")),
504 };
505
506 let mut builder = FakeDnsBuilder::new(client_addr);
507 if let Some(n) = local_config.fake_dns_ipv4_network {
508 builder.set_ipv4_network(n);
509 }
510 if let Some(n) = local_config.fake_dns_ipv6_network {
511 builder.set_ipv6_network(n);
512 }
513 if let Some(exp) = local_config.fake_dns_record_expire_duration {
514 builder.set_expire_duration(exp);
515 }
516 if let Some(p) = local_config.fake_dns_database_path {
517 builder.set_database_path(p);
518 }
519 let server = builder.build().await?;
520 #[cfg(feature = "local-fake-dns")]
521 context.add_fake_dns_manager(server.clone_manager()).await;
522
523 local_server.fake_dns_servers.push(server);
524 }
525 }
526 }
527
528 Ok(local_server)
529 }
530
531 pub async fn run(self) -> io::Result<()> {
533 let mut vfut = Vec::new();
534
535 for svr in self.socks_servers {
536 vfut.push(ServerHandle(tokio::spawn(svr.run())));
537 }
538
539 #[cfg(feature = "local-tunnel")]
540 for svr in self.tunnel_servers {
541 vfut.push(ServerHandle(tokio::spawn(svr.run())));
542 }
543
544 #[cfg(feature = "local-http")]
545 for svr in self.http_servers {
546 vfut.push(ServerHandle(tokio::spawn(svr.run())));
547 }
548
549 #[cfg(feature = "local-tun")]
550 for svr in self.tun_servers {
551 vfut.push(ServerHandle(tokio::spawn(svr.run())));
552 }
553
554 #[cfg(feature = "local-dns")]
555 for svr in self.dns_servers {
556 vfut.push(ServerHandle(tokio::spawn(svr.run())));
557 }
558
559 #[cfg(feature = "local-redir")]
560 for svr in self.redir_servers {
561 vfut.push(ServerHandle(tokio::spawn(svr.run())));
562 }
563
564 #[cfg(feature = "local-fake-dns")]
565 for svr in self.fake_dns_servers {
566 vfut.push(ServerHandle(tokio::spawn(svr.run())));
567 }
568
569 #[cfg(feature = "local-flow-stat")]
570 if let Some(stat_addr) = self.local_stat_addr {
571 let report_fut = flow_report_task(stat_addr, self.flow_stat);
574 vfut.push(ServerHandle(tokio::spawn(report_fut)));
575 }
576
577 #[cfg(feature = "local-online-config")]
578 if let Some(online_config) = self.online_config {
579 vfut.push(ServerHandle(tokio::spawn(online_config.run())));
580 }
581
582 let (res, ..) = future::select_all(vfut).await;
583 res
584 }
585
586 pub fn server_balancer(&self) -> &PingBalancer {
588 &self.balancer
589 }
590
591 pub fn socks_servers(&self) -> &[Socks] {
593 &self.socks_servers
594 }
595
596 #[cfg(feature = "local-tunnel")]
598 pub fn tunnel_servers(&self) -> &[Tunnel] {
599 &self.tunnel_servers
600 }
601
602 #[cfg(feature = "local-http")]
604 pub fn http_servers(&self) -> &[Http] {
605 &self.http_servers
606 }
607
608 #[cfg(feature = "local-tun")]
610 pub fn tun_servers(&self) -> &[Tun] {
611 &self.tun_servers
612 }
613
614 #[cfg(feature = "local-dns")]
616 pub fn dns_servers(&self) -> &[Dns] {
617 &self.dns_servers
618 }
619
620 #[cfg(feature = "local-redir")]
622 pub fn redir_servers(&self) -> &[Redir] {
623 &self.redir_servers
624 }
625
626 #[cfg(feature = "local-fake-dns")]
628 pub fn fake_dns_servers(&self) -> &[FakeDns] {
629 &self.fake_dns_servers
630 }
631}
632
/// Periodically reports the flow statistic counters to `stat_addr`.
///
/// Every 500 milliseconds the current `tx` and `rx` counters are read from `flow_stat` and sent
/// over a freshly opened connection as 16 bytes: `tx` followed by `rx`, each a `u64` in native
/// byte order. Connecting and writing are each limited to one second. Failures are logged at
/// debug level and the task simply waits for the next round, so this function never returns.
#[cfg(feature = "local-flow-stat")]
async fn flow_report_task(stat_addr: LocalFlowStatAddress, flow_stat: Arc<FlowStat>) -> io::Result<()> {
    use tokio::time;

    // Upper bound for each connect attempt and each write.
    let timeout = Duration::from_secs(1);

    loop {
        time::sleep(Duration::from_millis(500)).await;

        let tx: u64 = flow_stat.tx();
        let rx: u64 = flow_stat.rx();

        // Same wire layout as the in-memory `[u64; 2]` `[tx, rx]`, built without `unsafe`.
        let mut payload = [0u8; 16];
        payload[..8].copy_from_slice(&tx.to_ne_bytes());
        payload[8..].copy_from_slice(&rx.to_ne_bytes());

        match stat_addr {
            #[cfg(unix)]
            LocalFlowStatAddress::UnixStreamPath(ref stat_path) => {
                send_flow_stat(tokio::net::UnixStream::connect(stat_path), &payload, timeout).await;
            }
            LocalFlowStatAddress::TcpStreamAddr(stat_addr) => {
                send_flow_stat(tokio::net::TcpStream::connect(stat_addr), &payload, timeout).await;
            }
        }
    }
}

/// Awaits `connect` and writes `payload` to the resulting stream.
///
/// Both steps are limited by `timeout`. Errors and timeouts are logged at debug level and
/// otherwise ignored, because reporting is best effort.
#[cfg(feature = "local-flow-stat")]
async fn send_flow_stat<S, F>(connect: F, payload: &[u8], timeout: Duration)
where
    S: tokio::io::AsyncWrite + Unpin,
    F: std::future::Future<Output = io::Result<S>>,
{
    use log::debug;
    use tokio::{io::AsyncWriteExt, time};

    let mut stream = match time::timeout(timeout, connect).await {
        Ok(Ok(s)) => s,
        Ok(Err(err)) => {
            debug!("send client flow statistic error: {}", err);
            return;
        }
        Err(..) => {
            debug!("send client flow statistic error: timeout");
            return;
        }
    };

    match time::timeout(timeout, stream.write_all(payload)).await {
        Ok(Ok(..)) => {}
        Ok(Err(err)) => {
            debug!("send client flow statistic error: {}", err);
        }
        Err(..) => {
            debug!("send client flow statistic error: timeout");
        }
    }
}
708
709pub async fn run(config: Config) -> io::Result<()> {
711 Server::new(config).await?.run().await
712}