1#[cfg(unix)]
4use std::path::PathBuf;
5use std::{collections::HashMap, io, net::SocketAddr, sync::Arc, time::Duration};
6
7use cfg_if::cfg_if;
8use log::{error, info, trace};
9use shadowsocks::{
10 ManagerListener, ServerAddr,
11 config::{Mode, ServerConfig, ServerType, ServerUser, ServerUserManager},
12 context::{Context, SharedContext},
13 crypto::CipherKind,
14 dns_resolver::DnsResolver,
15 manager::{
16 datagram::ManagerSocketAddr,
17 protocol::{
18 self, AddRequest, AddResponse, ErrorResponse, ListResponse, ManagerRequest, PingResponse, RemoveRequest,
19 RemoveResponse, ServerUserConfig, StatRequest,
20 },
21 },
22 net::{AcceptOpts, ConnectOpts},
23 plugin::PluginConfig,
24};
25use tokio::{sync::Mutex, task::JoinHandle};
26
27use crate::{
28 acl::AccessControl,
29 config::{ManagerConfig, ManagerServerHost, ManagerServerMode, SecurityConfig},
30 net::FlowStat,
31 server::ServerBuilder,
32};
33
34enum ServerInstanceMode {
35 Builtin {
36 flow_stat: Arc<FlowStat>,
37 abortable: JoinHandle<io::Result<()>>,
38 },
39
40 #[cfg(unix)]
41 Standalone { flow_stat: u64 },
42}
43
44struct ServerInstance {
45 mode: ServerInstanceMode,
46 svr_cfg: ServerConfig,
47}
48
49impl Drop for ServerInstance {
50 fn drop(&mut self) {
51 #[allow(irrefutable_let_patterns)]
52 if let ServerInstanceMode::Builtin { ref abortable, .. } = self.mode {
53 abortable.abort();
54 }
55 }
56}
57
58impl ServerInstance {
59 fn flow_stat(&self) -> u64 {
60 match self.mode {
61 ServerInstanceMode::Builtin { ref flow_stat, .. } => flow_stat.tx() + flow_stat.rx(),
62 #[cfg(unix)]
63 ServerInstanceMode::Standalone { flow_stat } => flow_stat,
64 }
65 }
66}
67
68pub struct ManagerBuilder {
70 context: SharedContext,
71 svr_cfg: ManagerConfig,
72 connect_opts: ConnectOpts,
73 accept_opts: AcceptOpts,
74 udp_expiry_duration: Option<Duration>,
75 udp_capacity: Option<usize>,
76 acl: Option<Arc<AccessControl>>,
77 ipv6_first: bool,
78 security: SecurityConfig,
79}
80
81impl ManagerBuilder {
82 pub fn new(svr_cfg: ManagerConfig) -> Self {
84 Self::with_context(svr_cfg, Context::new_shared(ServerType::Server))
85 }
86
87 pub(crate) fn with_context(svr_cfg: ManagerConfig, context: SharedContext) -> Self {
89 Self {
90 context,
91 svr_cfg,
92 connect_opts: ConnectOpts::default(),
93 accept_opts: AcceptOpts::default(),
94 udp_expiry_duration: None,
95 udp_capacity: None,
96 acl: None,
97 ipv6_first: false,
98 security: SecurityConfig::default(),
99 }
100 }
101
102 pub fn set_connect_opts(&mut self, opts: ConnectOpts) {
104 self.connect_opts = opts;
105 }
106
107 pub fn set_accept_opts(&mut self, opts: AcceptOpts) {
109 self.accept_opts = opts;
110 }
111
112 pub fn set_udp_expiry_duration(&mut self, d: Duration) {
114 self.udp_expiry_duration = Some(d);
115 }
116
117 pub fn set_udp_capacity(&mut self, c: usize) {
119 self.udp_capacity = Some(c);
120 }
121
122 pub fn config(&self) -> &ManagerConfig {
124 &self.svr_cfg
125 }
126
127 pub fn set_dns_resolver(&mut self, resolver: Arc<DnsResolver>) {
129 let context = Arc::get_mut(&mut self.context).expect("cannot set DNS resolver on a shared context");
130 context.set_dns_resolver(resolver)
131 }
132
133 pub fn set_acl(&mut self, acl: Arc<AccessControl>) {
135 self.acl = Some(acl);
136 }
137
138 pub fn set_ipv6_first(&mut self, ipv6_first: bool) {
140 self.ipv6_first = ipv6_first;
141 }
142
143 pub fn set_security_config(&mut self, security: SecurityConfig) {
145 self.security = security;
146 }
147
148 pub async fn build(self) -> io::Result<Manager> {
150 let listener = ManagerListener::bind(&self.context, &self.svr_cfg.addr).await?;
151 Ok(Manager {
152 context: self.context,
153 servers: Mutex::new(HashMap::new()),
154 svr_cfg: self.svr_cfg,
155 connect_opts: self.connect_opts,
156 accept_opts: self.accept_opts,
157 udp_expiry_duration: self.udp_expiry_duration,
158 udp_capacity: self.udp_capacity,
159 acl: self.acl,
160 ipv6_first: self.ipv6_first,
161 security: self.security,
162 listener,
163 })
164 }
165}
166
167pub struct Manager {
169 context: SharedContext,
170 servers: Mutex<HashMap<u16, ServerInstance>>,
171 svr_cfg: ManagerConfig,
172 connect_opts: ConnectOpts,
173 accept_opts: AcceptOpts,
174 udp_expiry_duration: Option<Duration>,
175 udp_capacity: Option<usize>,
176 acl: Option<Arc<AccessControl>>,
177 ipv6_first: bool,
178 security: SecurityConfig,
179 listener: ManagerListener,
180}
181
182impl Manager {
183 pub fn manager_config(&self) -> &ManagerConfig {
185 &self.svr_cfg
186 }
187
188 pub fn local_addr(&self) -> io::Result<ManagerSocketAddr> {
190 self.listener.local_addr()
191 }
192
193 pub async fn run(mut self) -> io::Result<()> {
195 let local_addr = self.listener.local_addr()?;
196 info!("shadowsocks manager server listening on {}", local_addr);
197
198 loop {
199 let (req, peer_addr) = match self.listener.recv_from().await {
200 Ok(r) => r,
201 Err(err) => {
202 error!("manager recv_from error: {}", err);
203 continue;
204 }
205 };
206
207 trace!("received {:?} from {:?}", req, peer_addr);
208
209 match req {
210 ManagerRequest::Add(ref req) => match self.handle_add(req).await {
211 Ok(rsp) => {
212 let _ = self.listener.send_to(&rsp, &peer_addr).await;
213 }
214 Err(err) => {
215 error!("add server_port: {} failed, error: {}", req.server_port, err);
216 let rsp = ErrorResponse(err);
217 let _ = self.listener.send_to(&rsp, &peer_addr).await;
218 }
219 },
220 ManagerRequest::Remove(ref req) => {
221 let rsp = self.handle_remove(req).await;
222 let _ = self.listener.send_to(&rsp, &peer_addr).await;
223 }
224 ManagerRequest::List(..) => {
225 let rsp = self.handle_list().await;
226 let _ = self.listener.send_to(&rsp, &peer_addr).await;
227 }
228 ManagerRequest::Ping(..) => {
229 let rsp = self.handle_ping().await;
230 let _ = self.listener.send_to(&rsp, &peer_addr).await;
231 }
232 ManagerRequest::Stat(ref stat) => self.handle_stat(stat).await,
233 }
234 }
235 }
236
237 pub async fn add_server(&self, svr_cfg: ServerConfig) {
239 match self.svr_cfg.server_mode {
240 ManagerServerMode::Builtin => self.add_server_builtin(svr_cfg).await,
241 #[cfg(unix)]
242 ManagerServerMode::Standalone => self.add_server_standalone(svr_cfg).await,
243 }
244 }
245
246 async fn add_server_builtin(&self, svr_cfg: ServerConfig) {
247 let mut server_builder = ServerBuilder::new(svr_cfg.clone());
252
253 server_builder.set_connect_opts(self.connect_opts.clone());
254 server_builder.set_accept_opts(self.accept_opts.clone());
255 server_builder.set_dns_resolver(self.context.dns_resolver().clone());
256
257 if let Some(d) = self.udp_expiry_duration {
258 server_builder.set_udp_expiry_duration(d);
259 }
260
261 if let Some(c) = self.udp_capacity {
262 server_builder.set_udp_capacity(c);
263 }
264
265 if let Some(ref acl) = self.acl {
266 server_builder.set_acl(acl.clone());
267 }
268
269 if self.ipv6_first {
270 server_builder.set_ipv6_first(self.ipv6_first);
271 }
272
273 server_builder.set_security_config(&self.security);
274
275 let server_port = server_builder.server_config().addr().port();
276
277 let mut servers = self.servers.lock().await;
278 if let Some(v) = servers.remove(&server_port) {
280 info!(
281 "closed managed server listening on {}, inbound address {}",
282 v.svr_cfg.addr(),
283 v.svr_cfg.tcp_external_addr()
284 );
285 }
286
287 let flow_stat = server_builder.flow_stat();
288 let server = match server_builder.build().await {
289 Ok(s) => s,
290 Err(err) => {
291 error!("failed to start server ({}), error: {}", svr_cfg.addr(), err);
292 return;
293 }
294 };
295
296 let abortable = tokio::spawn(async move { server.run().await });
297
298 servers.insert(
299 server_port,
300 ServerInstance {
301 mode: ServerInstanceMode::Builtin { flow_stat, abortable },
302 svr_cfg,
303 },
304 );
305 }
306
307 #[cfg(unix)]
308 fn server_pid_path(&self, port: u16) -> PathBuf {
309 let pid_file_name = format!("shadowsocks-server-{port}.pid");
310 let mut pid_path = self.svr_cfg.server_working_directory.clone();
311 pid_path.push(&pid_file_name);
312 pid_path
313 }
314
315 #[cfg(unix)]
316 fn server_config_path(&self, port: u16) -> PathBuf {
317 let config_file_name = format!("shadowsocks-server-{port}.json");
318 let mut config_file_path = self.svr_cfg.server_working_directory.clone();
319 config_file_path.push(&config_file_name);
320 config_file_path
321 }
322
323 #[cfg(unix)]
324 fn kill_standalone_server(&self, port: u16) {
325 use log::{debug, warn};
326 use std::{
327 fs::{self, File},
328 io::Read,
329 };
330
331 let pid_path = self.server_pid_path(port);
332 if pid_path.exists()
333 && let Ok(mut pid_file) = File::open(&pid_path) {
334 let mut pid_content = String::new();
335 if pid_file.read_to_string(&mut pid_content).is_ok() {
336 let pid_content = pid_content.trim();
337
338 match pid_content.parse::<libc::pid_t>() {
339 Ok(pid) => {
340 let _ = unsafe { libc::kill(pid, libc::SIGTERM) };
341 debug!("killed standalone server port {}, pid: {}", port, pid);
342 }
343 Err(..) => {
344 warn!("failed to read pid from {}", pid_path.display());
345 }
346 }
347 }
348 }
349
350 let server_config_path = self.server_config_path(port);
351
352 let _ = fs::remove_file(pid_path);
353 let _ = fs::remove_file(server_config_path);
354 }
355
356 #[cfg(unix)]
357 async fn add_server_standalone(&self, svr_cfg: ServerConfig) {
358 use std::{
359 fs::{self, OpenOptions},
360 io::Write,
361 };
362
363 use tokio::process::Command;
364
365 use crate::config::{Config, ConfigType, ServerInstanceConfig};
366
367 let mut servers = self.servers.lock().await;
369
370 if !self.svr_cfg.server_working_directory.exists() {
372 fs::create_dir_all(&self.svr_cfg.server_working_directory).expect("create working_directory");
373 }
374
375 let port = svr_cfg.addr().port();
376
377 self.kill_standalone_server(port);
379
380 let config_file_path = self.server_config_path(port);
382 let pid_path = self.server_pid_path(port);
383
384 let server_instance = ServerInstanceConfig {
385 config: svr_cfg.clone(),
386 acl: None, #[cfg(any(target_os = "linux", target_os = "android"))]
388 outbound_fwmark: None,
389 #[cfg(target_os = "freebsd")]
390 outbound_user_cookie: None,
391 outbound_bind_addr: None,
392 outbound_bind_interface: None,
393 outbound_udp_allow_fragmentation: None,
394 };
395
396 let mut config = Config::new(ConfigType::Server);
397 config.server.push(server_instance);
398
399 trace!("created standalone server with config {:?}", config);
400
401 let config_file_content = format!("{config}");
402
403 match OpenOptions::new()
404 .write(true)
405 .create(true)
406 .truncate(true)
407 .open(&config_file_path)
408 {
409 Err(err) => {
410 error!(
411 "failed to open {} for writing, error: {}",
412 config_file_path.display(),
413 err
414 );
415 return;
416 }
417 Ok(mut file) => {
418 if let Err(err) = file.write_all(config_file_content.as_bytes()) {
419 error!("failed to write {}, error: {}", config_file_path.display(), err);
420 return;
421 }
422 let _ = file.sync_data();
423 }
424 }
425
426 let manager_addr = self.svr_cfg.addr.to_string();
427
428 let mut child_command = Command::new(&self.svr_cfg.server_program);
430 child_command
431 .arg("-c")
432 .arg(&config_file_path)
433 .arg("--daemonize")
434 .arg("--daemonize-pid")
435 .arg(&pid_path)
436 .arg("--manager-addr")
437 .arg(&manager_addr);
438
439 if let Some(ref acl) = self.acl {
440 child_command.arg("--acl").arg(acl.file_path().to_str().expect("acl"));
441 }
442
443 let child_result = child_command.kill_on_drop(false).spawn();
444
445 if let Err(err) = child_result {
446 error!(
447 "failed to spawn process of {}, error: {}",
448 self.svr_cfg.server_program, err
449 );
450 return;
451 }
452
453 servers.insert(
455 port,
456 ServerInstance {
457 mode: ServerInstanceMode::Standalone { flow_stat: 0 },
458 svr_cfg,
459 },
460 );
461 }
462
463 async fn handle_add(&self, req: &AddRequest) -> io::Result<AddResponse> {
464 let addr = match self.svr_cfg.server_host {
465 ManagerServerHost::Domain(ref dname) => ServerAddr::DomainName(dname.clone(), req.server_port),
466 ManagerServerHost::Ip(ip) => ServerAddr::SocketAddr(SocketAddr::new(ip, req.server_port)),
467 };
468
469 let method = match req.method {
470 Some(ref m) => match m.parse::<CipherKind>() {
471 Ok(method) => method,
472 Err(..) => {
473 error!("unrecognized method \"{}\", req: {:?}", m, req);
474
475 let err = format!("unrecognized method \"{m}\"");
476 return Ok(AddResponse(err));
477 }
478 },
479 None => match self.svr_cfg.method {
480 Some(m) => m,
481 None => {
482 cfg_if! {
483 if #[cfg(feature = "aead-cipher")] {
484 CipherKind::CHACHA20_POLY1305
487 } else {
488 return Ok(AddResponse("method is required".to_string()));
491 }
492 }
493 }
494 },
495 };
496
497 let mut svr_cfg = match ServerConfig::new(addr, req.password.clone(), method) {
498 Ok(svr_cfg) => svr_cfg,
499 Err(err) => {
500 error!("failed to create ServerConfig, error: {}", err);
501 return Ok(AddResponse("invalid server".to_string()));
502 }
503 };
504
505 if let Some(ref plugin) = req.plugin {
506 let p = PluginConfig {
507 plugin: plugin.clone(),
508 plugin_opts: req.plugin_opts.clone(),
509 plugin_args: Vec::new(),
510 plugin_mode: match req.plugin_mode {
511 None => Mode::TcpOnly,
512 Some(ref mode) => match mode.parse::<Mode>() {
513 Ok(m) => m,
514 Err(..) => {
515 error!("unrecognized plugin_mode \"{}\", req: {:?}", mode, req);
516
517 let err = format!("unrecognized plugin_mode \"{}\"", mode);
518 return Ok(AddResponse(err));
519 }
520 },
521 },
522 };
523 svr_cfg.set_plugin(p);
524 } else if let Some(ref plugin) = self.svr_cfg.plugin {
525 svr_cfg.set_plugin(plugin.clone());
526 }
527
528 let mode = match req.mode {
529 None => None,
530 Some(ref mode) => match mode.parse::<Mode>() {
531 Ok(m) => Some(m),
532 Err(..) => {
533 error!("unrecognized mode \"{}\", req: {:?}", mode, req);
534
535 let err = format!("unrecognized mode \"{mode}\"");
536 return Ok(AddResponse(err));
537 }
538 },
539 };
540
541 svr_cfg.set_mode(mode.unwrap_or(self.svr_cfg.mode));
542
543 if let Some(ref users) = req.users {
544 let mut user_manager = ServerUserManager::new();
545
546 for user in users.iter() {
547 let user = match ServerUser::with_encoded_key(&user.name, &user.password) {
548 Ok(u) => u,
549 Err(..) => {
550 error!(
551 "users[].password must be encoded with base64, but found: {}",
552 user.password
553 );
554
555 return Err(io::Error::other("users[].password must be encoded with base64"));
556 }
557 };
558
559 user_manager.add_user(user);
560 }
561
562 svr_cfg.set_user_manager(user_manager);
563 }
564
565 self.add_server(svr_cfg).await;
566
567 Ok(AddResponse("ok".to_owned()))
568 }
569
570 async fn handle_remove(&self, req: &RemoveRequest) -> RemoveResponse {
571 let mut servers = self.servers.lock().await;
572 servers.remove(&req.server_port);
573
574 #[cfg(unix)]
575 if self.svr_cfg.server_mode == ManagerServerMode::Standalone {
576 self.kill_standalone_server(req.server_port);
577 }
578
579 RemoveResponse("ok".to_owned())
580 }
581
582 async fn handle_list(&self) -> ListResponse {
583 let instances = self.servers.lock().await;
584
585 let mut servers = Vec::new();
586
587 for (_, server) in instances.iter() {
588 let svr_cfg = &server.svr_cfg;
589
590 let mut users = None;
591 if let Some(user_manager) = server.svr_cfg.user_manager() {
592 let mut vu = Vec::with_capacity(user_manager.user_count());
593
594 for user in user_manager.users_iter() {
595 vu.push(ServerUserConfig {
596 name: user.name().to_owned(),
597 password: user.encoded_key(),
598 });
599 }
600
601 users = Some(vu);
602 }
603
604 let sc = protocol::ServerConfig {
605 server_port: svr_cfg.addr().port(),
606 password: svr_cfg.password().to_owned(),
607 method: None,
608 no_delay: None,
609 plugin: None,
610 plugin_opts: None,
611 plugin_mode: None,
612 mode: None,
613 users,
614 };
615 servers.push(sc);
616 }
617
618 ListResponse { servers }
619 }
620
621 async fn handle_ping(&self) -> PingResponse {
622 let instances = self.servers.lock().await;
623
624 let mut stat = HashMap::new();
625 for (port, server) in instances.iter() {
626 stat.insert(*port, server.flow_stat());
627 }
628
629 PingResponse { stat }
630 }
631
632 #[cfg(not(unix))]
633 async fn handle_stat(&self, _: &StatRequest) {}
634
635 #[cfg(unix)]
636 async fn handle_stat(&self, stat: &StatRequest) {
637 use log::warn;
638 use std::collections::hash_map::Entry;
639
640 use crate::config::{Config, ConfigType};
641
642 if self.svr_cfg.server_mode != ManagerServerMode::Standalone {
644 return;
645 }
646
647 let mut instances = self.servers.lock().await;
648
649 for (port, flow) in stat.stat.iter() {
651 match instances.entry(*port) {
652 Entry::Occupied(mut occ) => match occ.get_mut().mode {
653 ServerInstanceMode::Builtin { .. } => {
654 error!("received `stat` for port {} that is running a builtin server", *port)
655 }
656 ServerInstanceMode::Standalone { ref mut flow_stat } => *flow_stat = *flow,
657 },
658 Entry::Vacant(vac) => {
659 let server_config_path = self.server_config_path(*port);
662 if !server_config_path.exists() {
663 warn!(
664 "received `stat` for port {} but file {} doesn't exist",
665 *port,
666 server_config_path.display()
667 );
668 continue;
669 }
670
671 match Config::load_from_file(&server_config_path, ConfigType::Server) {
672 Err(err) => {
673 error!(
674 "failed to load {} for server port {}, error: {}",
675 server_config_path.display(),
676 *port,
677 err
678 );
679 continue;
680 }
681 Ok(config) => {
682 trace!(
683 "loaded {} for server port {}, {:?}",
684 server_config_path.display(),
685 *port,
686 config
687 );
688
689 if config.server.len() != 1 {
690 error!(
691 "invalid config {} for server port {}, containing {} servers",
692 server_config_path.display(),
693 *port,
694 config.server.len()
695 );
696 continue;
697 }
698
699 let svr_cfg = config.server[0].config.clone();
700
701 vac.insert(ServerInstance {
702 mode: ServerInstanceMode::Standalone { flow_stat: *flow },
703 svr_cfg,
704 });
705 }
706 }
707 }
708 }
709 }
710 }
711}