shadowsocks_service/manager/
server.rs

1//! Shadowsocks Manager server
2
3#[cfg(unix)]
4use std::path::PathBuf;
5use std::{collections::HashMap, io, net::SocketAddr, sync::Arc, time::Duration};
6
7use cfg_if::cfg_if;
8use log::{error, info, trace};
9use shadowsocks::{
10    ManagerListener, ServerAddr,
11    config::{Mode, ServerConfig, ServerType, ServerUser, ServerUserManager},
12    context::{Context, SharedContext},
13    crypto::CipherKind,
14    dns_resolver::DnsResolver,
15    manager::{
16        datagram::ManagerSocketAddr,
17        protocol::{
18            self, AddRequest, AddResponse, ErrorResponse, ListResponse, ManagerRequest, PingResponse, RemoveRequest,
19            RemoveResponse, ServerUserConfig, StatRequest,
20        },
21    },
22    net::{AcceptOpts, ConnectOpts},
23    plugin::PluginConfig,
24};
25use tokio::{sync::Mutex, task::JoinHandle};
26
27use crate::{
28    acl::AccessControl,
29    config::{ManagerConfig, ManagerServerHost, ManagerServerMode, SecurityConfig},
30    net::FlowStat,
31    server::ServerBuilder,
32};
33
34enum ServerInstanceMode {
35    Builtin {
36        flow_stat: Arc<FlowStat>,
37        abortable: JoinHandle<io::Result<()>>,
38    },
39
40    #[cfg(unix)]
41    Standalone { flow_stat: u64 },
42}
43
44struct ServerInstance {
45    mode: ServerInstanceMode,
46    svr_cfg: ServerConfig,
47}
48
49impl Drop for ServerInstance {
50    fn drop(&mut self) {
51        #[allow(irrefutable_let_patterns)]
52        if let ServerInstanceMode::Builtin { ref abortable, .. } = self.mode {
53            abortable.abort();
54        }
55    }
56}
57
58impl ServerInstance {
59    fn flow_stat(&self) -> u64 {
60        match self.mode {
61            ServerInstanceMode::Builtin { ref flow_stat, .. } => flow_stat.tx() + flow_stat.rx(),
62            #[cfg(unix)]
63            ServerInstanceMode::Standalone { flow_stat } => flow_stat,
64        }
65    }
66}
67
68/// Manager server builder
69pub struct ManagerBuilder {
70    context: SharedContext,
71    svr_cfg: ManagerConfig,
72    connect_opts: ConnectOpts,
73    accept_opts: AcceptOpts,
74    udp_expiry_duration: Option<Duration>,
75    udp_capacity: Option<usize>,
76    acl: Option<Arc<AccessControl>>,
77    ipv6_first: bool,
78    security: SecurityConfig,
79}
80
81impl ManagerBuilder {
82    /// Create a new manager server builder from configuration
83    pub fn new(svr_cfg: ManagerConfig) -> Self {
84        Self::with_context(svr_cfg, Context::new_shared(ServerType::Server))
85    }
86
87    /// Create a new manager server builder with context and configuration
88    pub(crate) fn with_context(svr_cfg: ManagerConfig, context: SharedContext) -> Self {
89        Self {
90            context,
91            svr_cfg,
92            connect_opts: ConnectOpts::default(),
93            accept_opts: AcceptOpts::default(),
94            udp_expiry_duration: None,
95            udp_capacity: None,
96            acl: None,
97            ipv6_first: false,
98            security: SecurityConfig::default(),
99        }
100    }
101
102    /// Set `ConnectOpts`
103    pub fn set_connect_opts(&mut self, opts: ConnectOpts) {
104        self.connect_opts = opts;
105    }
106
107    /// Set `AcceptOpts`
108    pub fn set_accept_opts(&mut self, opts: AcceptOpts) {
109        self.accept_opts = opts;
110    }
111
112    /// Set UDP association's expiry duration
113    pub fn set_udp_expiry_duration(&mut self, d: Duration) {
114        self.udp_expiry_duration = Some(d);
115    }
116
117    /// Set total UDP associations to be kept in one server
118    pub fn set_udp_capacity(&mut self, c: usize) {
119        self.udp_capacity = Some(c);
120    }
121
122    /// Get the manager's configuration
123    pub fn config(&self) -> &ManagerConfig {
124        &self.svr_cfg
125    }
126
127    /// Get customized DNS resolver
128    pub fn set_dns_resolver(&mut self, resolver: Arc<DnsResolver>) {
129        let context = Arc::get_mut(&mut self.context).expect("cannot set DNS resolver on a shared context");
130        context.set_dns_resolver(resolver)
131    }
132
133    /// Set access control list
134    pub fn set_acl(&mut self, acl: Arc<AccessControl>) {
135        self.acl = Some(acl);
136    }
137
138    /// Try to connect IPv6 addresses first if hostname could be resolved to both IPv4 and IPv6
139    pub fn set_ipv6_first(&mut self, ipv6_first: bool) {
140        self.ipv6_first = ipv6_first;
141    }
142
143    /// Set security config
144    pub fn set_security_config(&mut self, security: SecurityConfig) {
145        self.security = security;
146    }
147
148    /// Build the manager server instance
149    pub async fn build(self) -> io::Result<Manager> {
150        let listener = ManagerListener::bind(&self.context, &self.svr_cfg.addr).await?;
151        Ok(Manager {
152            context: self.context,
153            servers: Mutex::new(HashMap::new()),
154            svr_cfg: self.svr_cfg,
155            connect_opts: self.connect_opts,
156            accept_opts: self.accept_opts,
157            udp_expiry_duration: self.udp_expiry_duration,
158            udp_capacity: self.udp_capacity,
159            acl: self.acl,
160            ipv6_first: self.ipv6_first,
161            security: self.security,
162            listener,
163        })
164    }
165}
166
167/// Manager server
168pub struct Manager {
169    context: SharedContext,
170    servers: Mutex<HashMap<u16, ServerInstance>>,
171    svr_cfg: ManagerConfig,
172    connect_opts: ConnectOpts,
173    accept_opts: AcceptOpts,
174    udp_expiry_duration: Option<Duration>,
175    udp_capacity: Option<usize>,
176    acl: Option<Arc<AccessControl>>,
177    ipv6_first: bool,
178    security: SecurityConfig,
179    listener: ManagerListener,
180}
181
182impl Manager {
183    /// Manager server's configuration
184    pub fn manager_config(&self) -> &ManagerConfig {
185        &self.svr_cfg
186    }
187
188    /// Manager server's listen address
189    pub fn local_addr(&self) -> io::Result<ManagerSocketAddr> {
190        self.listener.local_addr()
191    }
192
193    /// Start serving
194    pub async fn run(mut self) -> io::Result<()> {
195        let local_addr = self.listener.local_addr()?;
196        info!("shadowsocks manager server listening on {}", local_addr);
197
198        loop {
199            let (req, peer_addr) = match self.listener.recv_from().await {
200                Ok(r) => r,
201                Err(err) => {
202                    error!("manager recv_from error: {}", err);
203                    continue;
204                }
205            };
206
207            trace!("received {:?} from {:?}", req, peer_addr);
208
209            match req {
210                ManagerRequest::Add(ref req) => match self.handle_add(req).await {
211                    Ok(rsp) => {
212                        let _ = self.listener.send_to(&rsp, &peer_addr).await;
213                    }
214                    Err(err) => {
215                        error!("add server_port: {} failed, error: {}", req.server_port, err);
216                        let rsp = ErrorResponse(err);
217                        let _ = self.listener.send_to(&rsp, &peer_addr).await;
218                    }
219                },
220                ManagerRequest::Remove(ref req) => {
221                    let rsp = self.handle_remove(req).await;
222                    let _ = self.listener.send_to(&rsp, &peer_addr).await;
223                }
224                ManagerRequest::List(..) => {
225                    let rsp = self.handle_list().await;
226                    let _ = self.listener.send_to(&rsp, &peer_addr).await;
227                }
228                ManagerRequest::Ping(..) => {
229                    let rsp = self.handle_ping().await;
230                    let _ = self.listener.send_to(&rsp, &peer_addr).await;
231                }
232                ManagerRequest::Stat(ref stat) => self.handle_stat(stat).await,
233            }
234        }
235    }
236
237    /// Add a server programmatically
238    pub async fn add_server(&self, svr_cfg: ServerConfig) {
239        match self.svr_cfg.server_mode {
240            ManagerServerMode::Builtin => self.add_server_builtin(svr_cfg).await,
241            #[cfg(unix)]
242            ManagerServerMode::Standalone => self.add_server_standalone(svr_cfg).await,
243        }
244    }
245
246    async fn add_server_builtin(&self, svr_cfg: ServerConfig) {
247        // Each server should use a separate Context, but shares
248        //
249        // * AccessControlList
250        // * DNS Resolver
251        let mut server_builder = ServerBuilder::new(svr_cfg.clone());
252
253        server_builder.set_connect_opts(self.connect_opts.clone());
254        server_builder.set_accept_opts(self.accept_opts.clone());
255        server_builder.set_dns_resolver(self.context.dns_resolver().clone());
256
257        if let Some(d) = self.udp_expiry_duration {
258            server_builder.set_udp_expiry_duration(d);
259        }
260
261        if let Some(c) = self.udp_capacity {
262            server_builder.set_udp_capacity(c);
263        }
264
265        if let Some(ref acl) = self.acl {
266            server_builder.set_acl(acl.clone());
267        }
268
269        if self.ipv6_first {
270            server_builder.set_ipv6_first(self.ipv6_first);
271        }
272
273        server_builder.set_security_config(&self.security);
274
275        let server_port = server_builder.server_config().addr().port();
276
277        let mut servers = self.servers.lock().await;
278        // Close existed server
279        if let Some(v) = servers.remove(&server_port) {
280            info!(
281                "closed managed server listening on {}, inbound address {}",
282                v.svr_cfg.addr(),
283                v.svr_cfg.tcp_external_addr()
284            );
285        }
286
287        let flow_stat = server_builder.flow_stat();
288        let server = match server_builder.build().await {
289            Ok(s) => s,
290            Err(err) => {
291                error!("failed to start server ({}), error: {}", svr_cfg.addr(), err);
292                return;
293            }
294        };
295
296        let abortable = tokio::spawn(async move { server.run().await });
297
298        servers.insert(
299            server_port,
300            ServerInstance {
301                mode: ServerInstanceMode::Builtin { flow_stat, abortable },
302                svr_cfg,
303            },
304        );
305    }
306
307    #[cfg(unix)]
308    fn server_pid_path(&self, port: u16) -> PathBuf {
309        let pid_file_name = format!("shadowsocks-server-{port}.pid");
310        let mut pid_path = self.svr_cfg.server_working_directory.clone();
311        pid_path.push(&pid_file_name);
312        pid_path
313    }
314
315    #[cfg(unix)]
316    fn server_config_path(&self, port: u16) -> PathBuf {
317        let config_file_name = format!("shadowsocks-server-{port}.json");
318        let mut config_file_path = self.svr_cfg.server_working_directory.clone();
319        config_file_path.push(&config_file_name);
320        config_file_path
321    }
322
323    #[cfg(unix)]
324    fn kill_standalone_server(&self, port: u16) {
325        use log::{debug, warn};
326        use std::{
327            fs::{self, File},
328            io::Read,
329        };
330
331        let pid_path = self.server_pid_path(port);
332        if pid_path.exists()
333            && let Ok(mut pid_file) = File::open(&pid_path) {
334                let mut pid_content = String::new();
335                if pid_file.read_to_string(&mut pid_content).is_ok() {
336                    let pid_content = pid_content.trim();
337
338                    match pid_content.parse::<libc::pid_t>() {
339                        Ok(pid) => {
340                            let _ = unsafe { libc::kill(pid, libc::SIGTERM) };
341                            debug!("killed standalone server port {}, pid: {}", port, pid);
342                        }
343                        Err(..) => {
344                            warn!("failed to read pid from {}", pid_path.display());
345                        }
346                    }
347                }
348            }
349
350        let server_config_path = self.server_config_path(port);
351
352        let _ = fs::remove_file(pid_path);
353        let _ = fs::remove_file(server_config_path);
354    }
355
356    #[cfg(unix)]
357    async fn add_server_standalone(&self, svr_cfg: ServerConfig) {
358        use std::{
359            fs::{self, OpenOptions},
360            io::Write,
361        };
362
363        use tokio::process::Command;
364
365        use crate::config::{Config, ConfigType, ServerInstanceConfig};
366
367        // Lock the map first incase there are multiple requests to create one server instance
368        let mut servers = self.servers.lock().await;
369
370        // Check if working_directory exists
371        if !self.svr_cfg.server_working_directory.exists() {
372            fs::create_dir_all(&self.svr_cfg.server_working_directory).expect("create working_directory");
373        }
374
375        let port = svr_cfg.addr().port();
376
377        // Check if there is already a running process
378        self.kill_standalone_server(port);
379
380        // Create configuration file for server
381        let config_file_path = self.server_config_path(port);
382        let pid_path = self.server_pid_path(port);
383
384        let server_instance = ServerInstanceConfig {
385            config: svr_cfg.clone(),
386            acl: None, // Set with --acl command line argument
387            #[cfg(any(target_os = "linux", target_os = "android"))]
388            outbound_fwmark: None,
389            #[cfg(target_os = "freebsd")]
390            outbound_user_cookie: None,
391            outbound_bind_addr: None,
392            outbound_bind_interface: None,
393            outbound_udp_allow_fragmentation: None,
394        };
395
396        let mut config = Config::new(ConfigType::Server);
397        config.server.push(server_instance);
398
399        trace!("created standalone server with config {:?}", config);
400
401        let config_file_content = format!("{config}");
402
403        match OpenOptions::new()
404            .write(true)
405            .create(true)
406            .truncate(true)
407            .open(&config_file_path)
408        {
409            Err(err) => {
410                error!(
411                    "failed to open {} for writing, error: {}",
412                    config_file_path.display(),
413                    err
414                );
415                return;
416            }
417            Ok(mut file) => {
418                if let Err(err) = file.write_all(config_file_content.as_bytes()) {
419                    error!("failed to write {}, error: {}", config_file_path.display(), err);
420                    return;
421                }
422                let _ = file.sync_data();
423            }
424        }
425
426        let manager_addr = self.svr_cfg.addr.to_string();
427
428        // Start server process
429        let mut child_command = Command::new(&self.svr_cfg.server_program);
430        child_command
431            .arg("-c")
432            .arg(&config_file_path)
433            .arg("--daemonize")
434            .arg("--daemonize-pid")
435            .arg(&pid_path)
436            .arg("--manager-addr")
437            .arg(&manager_addr);
438
439        if let Some(ref acl) = self.acl {
440            child_command.arg("--acl").arg(acl.file_path().to_str().expect("acl"));
441        }
442
443        let child_result = child_command.kill_on_drop(false).spawn();
444
445        if let Err(err) = child_result {
446            error!(
447                "failed to spawn process of {}, error: {}",
448                self.svr_cfg.server_program, err
449            );
450            return;
451        }
452
453        // Greate. Record into the map
454        servers.insert(
455            port,
456            ServerInstance {
457                mode: ServerInstanceMode::Standalone { flow_stat: 0 },
458                svr_cfg,
459            },
460        );
461    }
462
463    async fn handle_add(&self, req: &AddRequest) -> io::Result<AddResponse> {
464        let addr = match self.svr_cfg.server_host {
465            ManagerServerHost::Domain(ref dname) => ServerAddr::DomainName(dname.clone(), req.server_port),
466            ManagerServerHost::Ip(ip) => ServerAddr::SocketAddr(SocketAddr::new(ip, req.server_port)),
467        };
468
469        let method = match req.method {
470            Some(ref m) => match m.parse::<CipherKind>() {
471                Ok(method) => method,
472                Err(..) => {
473                    error!("unrecognized method \"{}\", req: {:?}", m, req);
474
475                    let err = format!("unrecognized method \"{m}\"");
476                    return Ok(AddResponse(err));
477                }
478            },
479            None => match self.svr_cfg.method {
480                Some(m) => m,
481                None => {
482                    cfg_if! {
483                        if #[cfg(feature = "aead-cipher")] {
484                            // If AEAD cipher is enabled, use chacha20-poly1305 as default method
485                            // NOTE: This behavior is defined in shadowsocks-libev's `manager.c`
486                            CipherKind::CHACHA20_POLY1305
487                        } else {
488                            // AEAD cipher is disabled, default method is not defined in any standard or implementations.
489                            // TODO: Complete this after discussion.
490                            return Ok(AddResponse("method is required".to_string()));
491                        }
492                    }
493                }
494            },
495        };
496
497        let mut svr_cfg = match ServerConfig::new(addr, req.password.clone(), method) {
498            Ok(svr_cfg) => svr_cfg,
499            Err(err) => {
500                error!("failed to create ServerConfig, error: {}", err);
501                return Ok(AddResponse("invalid server".to_string()));
502            }
503        };
504
505        if let Some(ref plugin) = req.plugin {
506            let p = PluginConfig {
507                plugin: plugin.clone(),
508                plugin_opts: req.plugin_opts.clone(),
509                plugin_args: Vec::new(),
510                plugin_mode: match req.plugin_mode {
511                    None => Mode::TcpOnly,
512                    Some(ref mode) => match mode.parse::<Mode>() {
513                        Ok(m) => m,
514                        Err(..) => {
515                            error!("unrecognized plugin_mode \"{}\", req: {:?}", mode, req);
516
517                            let err = format!("unrecognized plugin_mode \"{}\"", mode);
518                            return Ok(AddResponse(err));
519                        }
520                    },
521                },
522            };
523            svr_cfg.set_plugin(p);
524        } else if let Some(ref plugin) = self.svr_cfg.plugin {
525            svr_cfg.set_plugin(plugin.clone());
526        }
527
528        let mode = match req.mode {
529            None => None,
530            Some(ref mode) => match mode.parse::<Mode>() {
531                Ok(m) => Some(m),
532                Err(..) => {
533                    error!("unrecognized mode \"{}\", req: {:?}", mode, req);
534
535                    let err = format!("unrecognized mode \"{mode}\"");
536                    return Ok(AddResponse(err));
537                }
538            },
539        };
540
541        svr_cfg.set_mode(mode.unwrap_or(self.svr_cfg.mode));
542
543        if let Some(ref users) = req.users {
544            let mut user_manager = ServerUserManager::new();
545
546            for user in users.iter() {
547                let user = match ServerUser::with_encoded_key(&user.name, &user.password) {
548                    Ok(u) => u,
549                    Err(..) => {
550                        error!(
551                            "users[].password must be encoded with base64, but found: {}",
552                            user.password
553                        );
554
555                        return Err(io::Error::other("users[].password must be encoded with base64"));
556                    }
557                };
558
559                user_manager.add_user(user);
560            }
561
562            svr_cfg.set_user_manager(user_manager);
563        }
564
565        self.add_server(svr_cfg).await;
566
567        Ok(AddResponse("ok".to_owned()))
568    }
569
570    async fn handle_remove(&self, req: &RemoveRequest) -> RemoveResponse {
571        let mut servers = self.servers.lock().await;
572        servers.remove(&req.server_port);
573
574        #[cfg(unix)]
575        if self.svr_cfg.server_mode == ManagerServerMode::Standalone {
576            self.kill_standalone_server(req.server_port);
577        }
578
579        RemoveResponse("ok".to_owned())
580    }
581
582    async fn handle_list(&self) -> ListResponse {
583        let instances = self.servers.lock().await;
584
585        let mut servers = Vec::new();
586
587        for (_, server) in instances.iter() {
588            let svr_cfg = &server.svr_cfg;
589
590            let mut users = None;
591            if let Some(user_manager) = server.svr_cfg.user_manager() {
592                let mut vu = Vec::with_capacity(user_manager.user_count());
593
594                for user in user_manager.users_iter() {
595                    vu.push(ServerUserConfig {
596                        name: user.name().to_owned(),
597                        password: user.encoded_key(),
598                    });
599                }
600
601                users = Some(vu);
602            }
603
604            let sc = protocol::ServerConfig {
605                server_port: svr_cfg.addr().port(),
606                password: svr_cfg.password().to_owned(),
607                method: None,
608                no_delay: None,
609                plugin: None,
610                plugin_opts: None,
611                plugin_mode: None,
612                mode: None,
613                users,
614            };
615            servers.push(sc);
616        }
617
618        ListResponse { servers }
619    }
620
621    async fn handle_ping(&self) -> PingResponse {
622        let instances = self.servers.lock().await;
623
624        let mut stat = HashMap::new();
625        for (port, server) in instances.iter() {
626            stat.insert(*port, server.flow_stat());
627        }
628
629        PingResponse { stat }
630    }
631
632    #[cfg(not(unix))]
633    async fn handle_stat(&self, _: &StatRequest) {}
634
635    #[cfg(unix)]
636    async fn handle_stat(&self, stat: &StatRequest) {
637        use log::warn;
638        use std::collections::hash_map::Entry;
639
640        use crate::config::{Config, ConfigType};
641
642        // `stat` is only supported for Standalone mode
643        if self.svr_cfg.server_mode != ManagerServerMode::Standalone {
644            return;
645        }
646
647        let mut instances = self.servers.lock().await;
648
649        // Get or create a new instance then record the data statistic numbers
650        for (port, flow) in stat.stat.iter() {
651            match instances.entry(*port) {
652                Entry::Occupied(mut occ) => match occ.get_mut().mode {
653                    ServerInstanceMode::Builtin { .. } => {
654                        error!("received `stat` for port {} that is running a builtin server", *port)
655                    }
656                    ServerInstanceMode::Standalone { ref mut flow_stat } => *flow_stat = *flow,
657                },
658                Entry::Vacant(vac) => {
659                    // Read config from file
660
661                    let server_config_path = self.server_config_path(*port);
662                    if !server_config_path.exists() {
663                        warn!(
664                            "received `stat` for port {} but file {} doesn't exist",
665                            *port,
666                            server_config_path.display()
667                        );
668                        continue;
669                    }
670
671                    match Config::load_from_file(&server_config_path, ConfigType::Server) {
672                        Err(err) => {
673                            error!(
674                                "failed to load {} for server port {}, error: {}",
675                                server_config_path.display(),
676                                *port,
677                                err
678                            );
679                            continue;
680                        }
681                        Ok(config) => {
682                            trace!(
683                                "loaded {} for server port {}, {:?}",
684                                server_config_path.display(),
685                                *port,
686                                config
687                            );
688
689                            if config.server.len() != 1 {
690                                error!(
691                                    "invalid config {} for server port {}, containing {} servers",
692                                    server_config_path.display(),
693                                    *port,
694                                    config.server.len()
695                                );
696                                continue;
697                            }
698
699                            let svr_cfg = config.server[0].config.clone();
700
701                            vac.insert(ServerInstance {
702                                mode: ServerInstanceMode::Standalone { flow_stat: *flow },
703                                svr_cfg,
704                            });
705                        }
706                    }
707                }
708            }
709        }
710    }
711}