Skip to main content

rmux_server/
daemon.rs

1#[cfg(unix)]
2use std::fs;
3use std::io;
4#[cfg(windows)]
5use std::io::{Read, Write};
6#[cfg(unix)]
7use std::os::unix::fs::{DirBuilderExt, FileTypeExt, MetadataExt, PermissionsExt};
8#[cfg(unix)]
9use std::os::unix::net::UnixStream as StdUnixStream;
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex as StdMutex};
12#[cfg(windows)]
13use std::time::Duration;
14
15use tokio::sync::oneshot;
16use tokio::task::JoinHandle;
17#[cfg(unix)]
18use tracing::debug;
19
20use rmux_core::events::SubscriptionLimits;
21#[cfg(windows)]
22use rmux_ipc::connect_blocking;
23use rmux_ipc::{LocalEndpoint, LocalListener};
24#[cfg(windows)]
25use rmux_proto::{
26    encode_frame, FrameDecoder, HasSessionRequest, Request, Response, RmuxError, SessionName,
27};
28
29use crate::listener;
30#[cfg(windows)]
31use crate::server_access::current_owner_uid;
32
33#[cfg(all(test, unix))]
34const FALLBACK_SOCKET_ROOT: &str = "/tmp";
35#[cfg(unix)]
36const BOUND_SOCKET_MODE: u32 = 0o600;
37#[cfg(unix)]
38const UNSAFE_PERMISSION_MASK: u32 = 0o077;
39#[cfg(unix)]
40const SOCKET_DIR_PREFIX: &str = "rmux";
41
42/// Computes the default RMUX daemon socket path.
43///
44/// The path uses an rmux-specific per-user directory so it cannot collide with
45/// a real tmux server socket.
46pub fn default_socket_path() -> io::Result<PathBuf> {
47    rmux_ipc::default_endpoint().map(LocalEndpoint::into_path)
48}
49
50#[cfg(all(test, unix))]
51fn socket_root_from_env(tmpdir: Option<&std::ffi::OsStr>) -> io::Result<PathBuf> {
52    let tmpdir = tmpdir
53        .filter(|value| !value.is_empty())
54        .map(PathBuf::from)
55        .into_iter();
56    let candidates = tmpdir.chain(std::iter::once(PathBuf::from(FALLBACK_SOCKET_ROOT)));
57
58    for candidate in candidates {
59        if let Ok(resolved) = fs::canonicalize(&candidate) {
60            return Ok(resolved);
61        }
62    }
63
64    Err(io::Error::new(
65        io::ErrorKind::NotFound,
66        "no suitable rmux socket directory",
67    ))
68}
69
70/// Daemon configuration for a single RMUX server instance.
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct DaemonConfig {
73    socket_path: PathBuf,
74    config_load: ConfigLoadOptions,
75    subscription_limits: SubscriptionLimits,
76}
77
78impl DaemonConfig {
79    /// Builds a daemon configuration for the given socket path.
80    #[must_use]
81    pub fn new(socket_path: PathBuf) -> Self {
82        Self {
83            socket_path,
84            config_load: ConfigLoadOptions::disabled(),
85            subscription_limits: SubscriptionLimits::default(),
86        }
87    }
88
89    /// Builds a daemon configuration using the default spec socket path.
90    pub fn with_default_socket_path() -> io::Result<Self> {
91        Ok(Self::new(default_socket_path()?))
92    }
93
94    /// Returns the configured local IPC endpoint path.
95    #[must_use]
96    pub fn socket_path(&self) -> &Path {
97        &self.socket_path
98    }
99
100    /// Returns the startup config loading policy.
101    #[must_use]
102    pub const fn config_load(&self) -> &ConfigLoadOptions {
103        &self.config_load
104    }
105
106    /// Returns the pane-output subscription limits.
107    #[must_use]
108    pub fn subscription_limits(&self) -> SubscriptionLimits {
109        self.subscription_limits
110    }
111
112    /// Enables RMUX default startup config loading.
113    #[must_use]
114    pub fn with_default_config_load(mut self, quiet: bool, cwd: Option<PathBuf>) -> Self {
115        self.config_load = ConfigLoadOptions {
116            selection: ConfigFileSelection::Default,
117            quiet,
118            cwd,
119        };
120        self
121    }
122
123    /// Overrides pane-output subscription limits for this daemon.
124    #[must_use]
125    pub fn with_subscription_limits(mut self, subscription_limits: SubscriptionLimits) -> Self {
126        self.subscription_limits = subscription_limits;
127        self
128    }
129
130    /// Enables explicit `-f` startup config loading.
131    #[must_use]
132    pub fn with_config_files(
133        mut self,
134        files: Vec<PathBuf>,
135        quiet: bool,
136        cwd: Option<PathBuf>,
137    ) -> Self {
138        self.config_load = ConfigLoadOptions {
139            selection: ConfigFileSelection::Files(files),
140            quiet,
141            cwd,
142        };
143        self
144    }
145}
146
147/// Startup config loading policy for a daemon.
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct ConfigLoadOptions {
150    selection: ConfigFileSelection,
151    quiet: bool,
152    cwd: Option<PathBuf>,
153}
154
155impl ConfigLoadOptions {
156    /// Builds a config policy that performs no startup config loading.
157    #[must_use]
158    pub const fn disabled() -> Self {
159        Self {
160            selection: ConfigFileSelection::Disabled,
161            quiet: true,
162            cwd: None,
163        }
164    }
165
166    /// Returns the selected config files mode.
167    #[must_use]
168    pub const fn selection(&self) -> &ConfigFileSelection {
169        &self.selection
170    }
171
172    /// Returns whether missing files should be suppressed.
173    #[must_use]
174    pub const fn quiet(&self) -> bool {
175        self.quiet
176    }
177
178    /// Returns the startup client's current working directory.
179    #[must_use]
180    pub fn cwd(&self) -> Option<&Path> {
181        self.cwd.as_deref()
182    }
183}
184
185/// Config file selection mode for daemon startup.
186#[derive(Debug, Clone, PartialEq, Eq)]
187pub enum ConfigFileSelection {
188    /// Do not load config files.
189    Disabled,
190    /// Load tmux's default config search path.
191    Default,
192    /// Load the explicit `-f` files in order.
193    Files(Vec<PathBuf>),
194}
195
196/// RMUX daemon launcher — call [`bind`](Self::bind) to start listening.
197#[derive(Debug, Clone, PartialEq, Eq)]
198pub struct ServerDaemon {
199    config: DaemonConfig,
200}
201
202#[derive(Debug, Clone)]
203pub(crate) struct ShutdownHandle {
204    sender: Arc<StdMutex<Option<oneshot::Sender<()>>>>,
205}
206
207impl ShutdownHandle {
208    pub(crate) fn new() -> (Self, oneshot::Receiver<()>) {
209        let (sender, receiver) = oneshot::channel();
210        (
211            Self {
212                sender: Arc::new(StdMutex::new(Some(sender))),
213            },
214            receiver,
215        )
216    }
217
218    pub(crate) fn request_shutdown(&self) {
219        if let Some(sender) = self.sender.lock().expect("shutdown sender").take() {
220            let _ = sender.send(());
221        }
222    }
223}
224
225impl ServerDaemon {
226    /// Creates a daemon launcher for the given configuration.
227    #[must_use]
228    pub fn new(config: DaemonConfig) -> Self {
229        Self { config }
230    }
231
232    /// Binds the local IPC endpoint, starts accepting requests, and returns a handle.
233    pub async fn bind(self) -> io::Result<ServerHandle> {
234        #[cfg(unix)]
235        {
236            prepare_socket_path(self.config.socket_path())?;
237            let endpoint = LocalEndpoint::from_path(self.config.socket_path().to_path_buf());
238            let listener = LocalListener::bind(&endpoint)?;
239            enforce_bound_socket_permissions(self.config.socket_path())?;
240            let (shutdown_handle, shutdown_receiver) = ShutdownHandle::new();
241            let socket_path = self.config.socket_path().to_path_buf();
242            let owner_uid = real_user_id()?;
243
244            let task = tokio::spawn(listener::serve(
245                listener,
246                socket_path.clone(),
247                shutdown_handle.clone(),
248                shutdown_receiver,
249                self.config.config_load().clone(),
250                self.config.subscription_limits(),
251                owner_uid,
252            ));
253
254            Ok(ServerHandle {
255                socket_path,
256                shutdown_handle,
257                task: Some(task),
258            })
259        }
260
261        #[cfg(windows)]
262        {
263            let endpoint = LocalEndpoint::from_path(self.config.socket_path().to_path_buf());
264            let listener = bind_windows_listener(&endpoint)?;
265            let (shutdown_handle, shutdown_receiver) = ShutdownHandle::new();
266            let socket_path = self.config.socket_path().to_path_buf();
267            let owner_uid = current_owner_uid();
268
269            let task = tokio::spawn(listener::serve(
270                listener,
271                socket_path.clone(),
272                shutdown_handle.clone(),
273                shutdown_receiver,
274                self.config.config_load().clone(),
275                self.config.subscription_limits(),
276                owner_uid,
277            ));
278
279            Ok(ServerHandle {
280                socket_path,
281                shutdown_handle,
282                task: Some(task),
283            })
284        }
285    }
286}
287
288#[cfg(windows)]
289fn bind_windows_listener(endpoint: &LocalEndpoint) -> io::Result<LocalListener> {
290    match LocalListener::bind(endpoint) {
291        Ok(listener) => Ok(listener),
292        Err(bind_error) => Err(windows_bind_error(endpoint, bind_error)),
293    }
294}
295
296#[cfg(windows)]
297fn windows_bind_error(endpoint: &LocalEndpoint, bind_error: io::Error) -> io::Error {
298    if windows_pipe_responds(endpoint) {
299        return io::Error::new(
300            io::ErrorKind::AddrInUse,
301            format!(
302                "Windows named pipe '{}' is already held by a responsive rmux-compatible server",
303                endpoint.as_path().display()
304            ),
305        );
306    }
307
308    io::Error::new(
309        bind_error.kind(),
310        format!(
311            "failed to bind Windows named pipe '{}': {bind_error}. Another process may still be holding this endpoint",
312            endpoint.as_path().display()
313        ),
314    )
315}
316
317#[cfg(windows)]
318fn windows_pipe_responds(endpoint: &LocalEndpoint) -> bool {
319    let endpoint = endpoint.clone();
320    std::thread::spawn(move || windows_protocol_probe(&endpoint).unwrap_or(false))
321        .join()
322        .unwrap_or(false)
323}
324
325#[cfg(windows)]
326fn windows_protocol_probe(endpoint: &LocalEndpoint) -> io::Result<bool> {
327    let mut stream = connect_blocking(endpoint, Duration::from_millis(100))?;
328    stream.set_write_timeout(Some(Duration::from_millis(100)))?;
329    stream.set_read_timeout(Some(Duration::from_millis(100)))?;
330
331    let request = Request::HasSession(HasSessionRequest {
332        target: SessionName::new("__rmux_probe__").map_err(io::Error::other)?,
333    });
334    let frame = encode_frame(&request).map_err(io::Error::other)?;
335    stream.write_all(&frame)?;
336    stream.flush()?;
337
338    let mut decoder = FrameDecoder::new();
339    let mut buffer = [0_u8; 512];
340    loop {
341        let bytes_read = match stream.read(&mut buffer) {
342            Ok(0) => return Ok(false),
343            Ok(bytes_read) => bytes_read,
344            Err(error) if error.kind() == io::ErrorKind::TimedOut => return Ok(false),
345            Err(error) => return Err(error),
346        };
347        decoder.push_bytes(&buffer[..bytes_read]);
348        match decoder.next_frame::<Response>() {
349            Ok(Some(Response::HasSession(_))) => return Ok(true),
350            Ok(Some(_response)) => return Ok(false),
351            Ok(None) => continue,
352            Err(RmuxError::IncompleteFrame { .. }) => continue,
353            Err(_error) => return Ok(false),
354        }
355    }
356}
357
358/// Handle to a running RMUX daemon; dropping it triggers shutdown.
359#[derive(Debug)]
360pub struct ServerHandle {
361    socket_path: PathBuf,
362    shutdown_handle: ShutdownHandle,
363    task: Option<JoinHandle<io::Result<()>>>,
364}
365
366impl ServerHandle {
367    /// Returns the bound local IPC endpoint path for the running daemon.
368    #[must_use]
369    pub fn socket_path(&self) -> &Path {
370        &self.socket_path
371    }
372
373    /// Waits for the daemon task to exit after an external shutdown request.
374    pub async fn wait(mut self) -> io::Result<()> {
375        if let Some(task) = self.task.take() {
376            return task.await.map_err(io::Error::other)?;
377        }
378
379        Ok(())
380    }
381
382    /// Requests shutdown and waits for socket cleanup to complete.
383    pub async fn shutdown(mut self) -> io::Result<()> {
384        self.request_shutdown();
385
386        if let Some(task) = self.task.take() {
387            return task.await.map_err(io::Error::other)?;
388        }
389
390        Ok(())
391    }
392
393    fn request_shutdown(&mut self) {
394        self.shutdown_handle.request_shutdown();
395    }
396}
397
398impl Drop for ServerHandle {
399    fn drop(&mut self) {
400        self.request_shutdown();
401    }
402}
403
404#[cfg(unix)]
405fn prepare_socket_path(socket_path: &Path) -> io::Result<()> {
406    let parent = socket_path.parent().ok_or_else(|| {
407        io::Error::new(
408            io::ErrorKind::InvalidInput,
409            format!(
410                "socket path '{}' has no parent directory",
411                socket_path.display()
412            ),
413        )
414    })?;
415
416    ensure_parent_directory(parent)?;
417    remove_stale_socket_if_needed(socket_path)
418}
419
420#[cfg(unix)]
421fn ensure_parent_directory(parent: &Path) -> io::Result<()> {
422    let mut builder = fs::DirBuilder::new();
423    builder.recursive(true);
424    builder.mode(0o700);
425    match builder.create(parent) {
426        Ok(()) => {}
427        Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
428            if !fs::metadata(parent)?.is_dir() {
429                return Err(io::Error::new(
430                    io::ErrorKind::AlreadyExists,
431                    format!("'{}' exists and is not a directory", parent.display()),
432                ));
433            }
434        }
435        Err(error) => return Err(error),
436    }
437
438    ensure_directory(parent)?;
439    if let Some(managed_parent) = managed_rmux_socket_directory(parent)? {
440        ensure_safe_rmux_socket_directory(&managed_parent)?;
441    }
442
443    Ok(())
444}
445
446#[cfg(unix)]
447fn ensure_directory(path: &Path) -> io::Result<()> {
448    let metadata = fs::symlink_metadata(path)?;
449    if metadata.is_dir() {
450        return Ok(());
451    }
452
453    Err(io::Error::new(
454        io::ErrorKind::AlreadyExists,
455        format!("'{}' exists and is not a directory", path.display()),
456    ))
457}
458
459#[cfg(unix)]
460fn managed_rmux_socket_directory(path: &Path) -> io::Result<Option<PathBuf>> {
461    let expected = format!("{SOCKET_DIR_PREFIX}-{}", real_user_id()?);
462    Ok(path.ancestors().find_map(|ancestor| {
463        ancestor
464            .file_name()
465            .and_then(|name| name.to_str())
466            .filter(|name| *name == expected)
467            .map(|_| ancestor.to_path_buf())
468    }))
469}
470
471#[cfg(unix)]
472fn ensure_safe_rmux_socket_directory(path: &Path) -> io::Result<()> {
473    let metadata = fs::symlink_metadata(path)?;
474    if !metadata.is_dir() {
475        return Err(io::Error::new(
476            io::ErrorKind::AlreadyExists,
477            format!("{} is not a directory", path.display()),
478        ));
479    }
480
481    let user_id = real_user_id()?;
482    if metadata.uid() != user_id || (metadata.mode() & UNSAFE_PERMISSION_MASK) != 0 {
483        return Err(io::Error::new(
484            io::ErrorKind::PermissionDenied,
485            format!("directory {} has unsafe permissions", path.display()),
486        ));
487    }
488
489    Ok(())
490}
491
492#[cfg(unix)]
493fn enforce_bound_socket_permissions(socket_path: &Path) -> io::Result<()> {
494    validate_bound_socket(socket_path, false)?;
495    fs::set_permissions(socket_path, fs::Permissions::from_mode(BOUND_SOCKET_MODE))?;
496    validate_bound_socket(socket_path, true)
497}
498
499#[cfg(unix)]
500fn validate_bound_socket(socket_path: &Path, require_owner_only: bool) -> io::Result<()> {
501    let metadata = fs::symlink_metadata(socket_path)?;
502    if metadata.file_type().is_symlink() || !metadata.file_type().is_socket() {
503        return Err(io::Error::new(
504            io::ErrorKind::AlreadyExists,
505            format!(
506                "socket path '{}' is not a Unix socket",
507                socket_path.display()
508            ),
509        ));
510    }
511
512    let user_id = real_user_id()?;
513    if metadata.uid() != user_id {
514        return Err(io::Error::new(
515            io::ErrorKind::PermissionDenied,
516            format!("socket {} has unsafe ownership", socket_path.display()),
517        ));
518    }
519
520    if require_owner_only && (metadata.mode() & UNSAFE_PERMISSION_MASK) != 0 {
521        return Err(io::Error::new(
522            io::ErrorKind::PermissionDenied,
523            format!("socket {} has unsafe permissions", socket_path.display()),
524        ));
525    }
526
527    Ok(())
528}
529
530#[cfg(unix)]
531fn remove_stale_socket_if_needed(socket_path: &Path) -> io::Result<()> {
532    let metadata = match fs::symlink_metadata(socket_path) {
533        Ok(metadata) => metadata,
534        Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(()),
535        Err(error) => return Err(error),
536    };
537
538    if !metadata.file_type().is_socket() {
539        return Err(io::Error::new(
540            io::ErrorKind::AlreadyExists,
541            format!(
542                "socket path '{}' exists but is not a Unix socket",
543                socket_path.display()
544            ),
545        ));
546    }
547
548    match StdUnixStream::connect(socket_path) {
549        Ok(_stream) => Err(io::Error::new(
550            io::ErrorKind::AddrInUse,
551            format!("socket '{}' is already in use", socket_path.display()),
552        )),
553        Err(error) if indicates_stale_socket(&error) => {
554            debug!(
555                "removing stale socket '{}' after failed connect probe: {error}",
556                socket_path.display()
557            );
558            match fs::remove_file(socket_path) {
559                Ok(()) => Ok(()),
560                Err(remove_error) if remove_error.kind() == io::ErrorKind::NotFound => Ok(()),
561                Err(remove_error) => Err(remove_error),
562            }
563        }
564        Err(error) => Err(error),
565    }
566}
567
568#[cfg(unix)]
569fn indicates_stale_socket(error: &io::Error) -> bool {
570    matches!(
571        error.kind(),
572        io::ErrorKind::ConnectionRefused | io::ErrorKind::NotFound
573    )
574}
575
576#[cfg(unix)]
577pub(crate) fn real_user_id() -> io::Result<u32> {
578    Ok(rmux_os::identity::real_user_id())
579}
580
581#[cfg(all(test, unix))]
582#[path = "daemon_tests/unix.rs"]
583mod tests;
584
585#[cfg(all(test, windows))]
586#[path = "daemon_tests/windows.rs"]
587mod tests;