1#[cfg(unix)]
2use std::fs;
3use std::io;
4#[cfg(windows)]
5use std::io::{Read, Write};
6#[cfg(unix)]
7use std::os::unix::fs::{DirBuilderExt, FileTypeExt, MetadataExt, PermissionsExt};
8#[cfg(unix)]
9use std::os::unix::net::UnixStream as StdUnixStream;
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex as StdMutex};
12#[cfg(windows)]
13use std::time::Duration;
14
15use tokio::sync::oneshot;
16use tokio::task::JoinHandle;
17#[cfg(unix)]
18use tracing::debug;
19
20use rmux_core::events::SubscriptionLimits;
21#[cfg(windows)]
22use rmux_ipc::connect_blocking;
23use rmux_ipc::{LocalEndpoint, LocalListener};
24#[cfg(windows)]
25use rmux_proto::{
26 encode_frame, FrameDecoder, HasSessionRequest, Request, Response, RmuxError, SessionName,
27};
28
29use crate::listener;
30#[cfg(windows)]
31use crate::server_access::current_owner_uid;
32
/// Directory tried by `socket_root_from_env` when the supplied temporary
/// directory is absent, empty, or cannot be canonicalized.
#[cfg(all(test, unix))]
const FALLBACK_SOCKET_ROOT: &str = "/tmp";
/// Mode applied to the socket file right after it is bound (owner read/write).
#[cfg(unix)]
const BOUND_SOCKET_MODE: u32 = 0o600;
/// Group and other permission bits; any of them being set on a managed socket
/// directory or on the bound socket is treated as unsafe.
#[cfg(unix)]
const UNSAFE_PERMISSION_MASK: u32 = 0o077;
/// Name prefix of the per-user socket directory, which is `rmux-<uid>`.
#[cfg(unix)]
const SOCKET_DIR_PREFIX: &str = "rmux";
41
42pub fn default_socket_path() -> io::Result<PathBuf> {
47 rmux_ipc::default_endpoint().map(LocalEndpoint::into_path)
48}
49
/// Resolves the directory under which rmux sockets live.
///
/// `tmpdir` is tried first when it is present and non-empty, then
/// `FALLBACK_SOCKET_ROOT`. The first candidate that can be canonicalized wins.
///
/// # Errors
///
/// Returns [`io::ErrorKind::NotFound`] when no candidate can be canonicalized.
#[cfg(all(test, unix))]
fn socket_root_from_env(tmpdir: Option<&std::ffi::OsStr>) -> io::Result<PathBuf> {
    let from_env = tmpdir.filter(|value| !value.is_empty()).map(PathBuf::from);
    let fallback = PathBuf::from(FALLBACK_SOCKET_ROOT);

    from_env
        .into_iter()
        .chain(std::iter::once(fallback))
        // Candidates that fail to canonicalize are silently skipped.
        .find_map(|candidate| fs::canonicalize(&candidate).ok())
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                "no suitable rmux socket directory",
            )
        })
}
69
/// Settings used by [`ServerDaemon`] when binding and serving.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DaemonConfig {
    // Path of the local endpoint the daemon binds to.
    socket_path: PathBuf,
    // Config-file loading options handed to the listener task.
    config_load: ConfigLoadOptions,
    // Subscription limits handed to the listener task.
    subscription_limits: SubscriptionLimits,
}
77
78impl DaemonConfig {
79 #[must_use]
81 pub fn new(socket_path: PathBuf) -> Self {
82 Self {
83 socket_path,
84 config_load: ConfigLoadOptions::disabled(),
85 subscription_limits: SubscriptionLimits::default(),
86 }
87 }
88
89 pub fn with_default_socket_path() -> io::Result<Self> {
91 Ok(Self::new(default_socket_path()?))
92 }
93
94 #[must_use]
96 pub fn socket_path(&self) -> &Path {
97 &self.socket_path
98 }
99
100 #[must_use]
102 pub const fn config_load(&self) -> &ConfigLoadOptions {
103 &self.config_load
104 }
105
106 #[must_use]
108 pub fn subscription_limits(&self) -> SubscriptionLimits {
109 self.subscription_limits
110 }
111
112 #[must_use]
114 pub fn with_default_config_load(mut self, quiet: bool, cwd: Option<PathBuf>) -> Self {
115 self.config_load = ConfigLoadOptions {
116 selection: ConfigFileSelection::Default,
117 quiet,
118 cwd,
119 };
120 self
121 }
122
123 #[must_use]
125 pub fn with_subscription_limits(mut self, subscription_limits: SubscriptionLimits) -> Self {
126 self.subscription_limits = subscription_limits;
127 self
128 }
129
130 #[must_use]
132 pub fn with_config_files(
133 mut self,
134 files: Vec<PathBuf>,
135 quiet: bool,
136 cwd: Option<PathBuf>,
137 ) -> Self {
138 self.config_load = ConfigLoadOptions {
139 selection: ConfigFileSelection::Files(files),
140 quiet,
141 cwd,
142 };
143 self
144 }
145}
146
/// Options describing which configuration files are loaded and how.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigLoadOptions {
    // Which configuration files (if any) are selected.
    selection: ConfigFileSelection,
    // NOTE(review): presumably suppresses config-load diagnostics; the
    // consumer is not visible here — confirm against the loader.
    quiet: bool,
    // NOTE(review): presumably the working directory used while loading;
    // confirm against the loader.
    cwd: Option<PathBuf>,
}
154
155impl ConfigLoadOptions {
156 #[must_use]
158 pub const fn disabled() -> Self {
159 Self {
160 selection: ConfigFileSelection::Disabled,
161 quiet: true,
162 cwd: None,
163 }
164 }
165
166 #[must_use]
168 pub const fn selection(&self) -> &ConfigFileSelection {
169 &self.selection
170 }
171
172 #[must_use]
174 pub const fn quiet(&self) -> bool {
175 self.quiet
176 }
177
178 #[must_use]
180 pub fn cwd(&self) -> Option<&Path> {
181 self.cwd.as_deref()
182 }
183}
184
/// Which configuration files a daemon is asked to load.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigFileSelection {
    /// Selection produced by [`ConfigLoadOptions::disabled`].
    Disabled,
    /// Selection produced by [`DaemonConfig::with_default_config_load`].
    Default,
    /// Explicit list of files supplied through
    /// [`DaemonConfig::with_config_files`].
    Files(Vec<PathBuf>),
}
195
/// An unbound server; call [`ServerDaemon::bind`] to start serving.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerDaemon {
    // Settings consumed by `bind`.
    config: DaemonConfig,
}
201
/// Cloneable handle that can signal the shutdown receiver.
#[derive(Debug, Clone)]
pub(crate) struct ShutdownHandle {
    // Shared by every clone; the `Option` is emptied by the first
    // `request_shutdown`, because a oneshot sender is consumed when it sends.
    sender: Arc<StdMutex<Option<oneshot::Sender<()>>>>,
}
206
207impl ShutdownHandle {
208 pub(crate) fn new() -> (Self, oneshot::Receiver<()>) {
209 let (sender, receiver) = oneshot::channel();
210 (
211 Self {
212 sender: Arc::new(StdMutex::new(Some(sender))),
213 },
214 receiver,
215 )
216 }
217
218 pub(crate) fn request_shutdown(&self) {
219 if let Some(sender) = self.sender.lock().expect("shutdown sender").take() {
220 let _ = sender.send(());
221 }
222 }
223}
224
225impl ServerDaemon {
226 #[must_use]
228 pub fn new(config: DaemonConfig) -> Self {
229 Self { config }
230 }
231
232 pub async fn bind(self) -> io::Result<ServerHandle> {
234 #[cfg(unix)]
235 {
236 prepare_socket_path(self.config.socket_path())?;
237 let endpoint = LocalEndpoint::from_path(self.config.socket_path().to_path_buf());
238 let listener = LocalListener::bind(&endpoint)?;
239 enforce_bound_socket_permissions(self.config.socket_path())?;
240 let (shutdown_handle, shutdown_receiver) = ShutdownHandle::new();
241 let socket_path = self.config.socket_path().to_path_buf();
242 let owner_uid = real_user_id()?;
243
244 let task = tokio::spawn(listener::serve(
245 listener,
246 socket_path.clone(),
247 shutdown_handle.clone(),
248 shutdown_receiver,
249 self.config.config_load().clone(),
250 self.config.subscription_limits(),
251 owner_uid,
252 ));
253
254 Ok(ServerHandle {
255 socket_path,
256 shutdown_handle,
257 task: Some(task),
258 })
259 }
260
261 #[cfg(windows)]
262 {
263 let endpoint = LocalEndpoint::from_path(self.config.socket_path().to_path_buf());
264 let listener = bind_windows_listener(&endpoint)?;
265 let (shutdown_handle, shutdown_receiver) = ShutdownHandle::new();
266 let socket_path = self.config.socket_path().to_path_buf();
267 let owner_uid = current_owner_uid();
268
269 let task = tokio::spawn(listener::serve(
270 listener,
271 socket_path.clone(),
272 shutdown_handle.clone(),
273 shutdown_receiver,
274 self.config.config_load().clone(),
275 self.config.subscription_limits(),
276 owner_uid,
277 ));
278
279 Ok(ServerHandle {
280 socket_path,
281 shutdown_handle,
282 task: Some(task),
283 })
284 }
285 }
286}
287
/// Binds the listener for `endpoint`, replacing a bind failure with the more
/// descriptive error built by `windows_bind_error`.
#[cfg(windows)]
fn bind_windows_listener(endpoint: &LocalEndpoint) -> io::Result<LocalListener> {
    LocalListener::bind(endpoint).map_err(|bind_error| windows_bind_error(endpoint, bind_error))
}
295
/// Builds the error reported when binding `endpoint` failed.
///
/// If a protocol probe of the endpoint succeeds, the result is an
/// [`io::ErrorKind::AddrInUse`] error. Otherwise the original error kind is
/// kept and the message is extended with the endpoint path.
#[cfg(windows)]
fn windows_bind_error(endpoint: &LocalEndpoint, bind_error: io::Error) -> io::Error {
    if windows_pipe_responds(endpoint) {
        let message = format!(
            "Windows named pipe '{}' is already held by a responsive rmux-compatible server",
            endpoint.as_path().display()
        );
        io::Error::new(io::ErrorKind::AddrInUse, message)
    } else {
        let message = format!(
            "failed to bind Windows named pipe '{}': {bind_error}. Another process may still be holding this endpoint",
            endpoint.as_path().display()
        );
        io::Error::new(bind_error.kind(), message)
    }
}
316
/// Reports whether a server answers the protocol probe on `endpoint`.
///
/// The probe runs on a dedicated thread that is joined before returning; a
/// probe error or a panicking probe thread both count as "not responding".
#[cfg(windows)]
fn windows_pipe_responds(endpoint: &LocalEndpoint) -> bool {
    let endpoint = endpoint.clone();
    let probe = std::thread::spawn(move || windows_protocol_probe(&endpoint).unwrap_or(false));
    probe.join().unwrap_or(false)
}
324
/// Sends a `HasSession` request to `endpoint` and reports whether a
/// `HasSession` response comes back.
///
/// Connect, write, and each read use a 100 ms timeout. `Ok(false)` is
/// returned when the peer closes the stream, a read times out, a different
/// response arrives, or decoding fails for a reason other than an incomplete
/// frame.
///
/// # Errors
///
/// Returns connection, timeout-setup, encoding, and write errors, plus read
/// errors other than [`io::ErrorKind::TimedOut`].
#[cfg(windows)]
fn windows_protocol_probe(endpoint: &LocalEndpoint) -> io::Result<bool> {
    let timeout = Duration::from_millis(100);
    let mut stream = connect_blocking(endpoint, timeout)?;
    stream.set_write_timeout(Some(timeout))?;
    stream.set_read_timeout(Some(timeout))?;

    let target = SessionName::new("__rmux_probe__").map_err(io::Error::other)?;
    let probe = Request::HasSession(HasSessionRequest { target });
    let frame = encode_frame(&probe).map_err(io::Error::other)?;
    stream.write_all(&frame)?;
    stream.flush()?;

    let mut decoder = FrameDecoder::new();
    let mut chunk = [0_u8; 512];
    loop {
        let received = match stream.read(&mut chunk) {
            Ok(0) => return Ok(false),
            Ok(count) => count,
            Err(error) if error.kind() == io::ErrorKind::TimedOut => return Ok(false),
            Err(error) => return Err(error),
        };

        decoder.push_bytes(&chunk[..received]);
        match decoder.next_frame::<Response>() {
            // Not enough bytes yet: keep reading.
            Ok(None) | Err(RmuxError::IncompleteFrame { .. }) => {}
            Ok(Some(Response::HasSession(_))) => return Ok(true),
            Ok(Some(_)) | Err(_) => return Ok(false),
        }
    }
}
357
/// Handle to a running server task returned by [`ServerDaemon::bind`].
///
/// Dropping the handle requests shutdown.
#[derive(Debug)]
pub struct ServerHandle {
    // Path of the endpoint the server was bound to.
    socket_path: PathBuf,
    // Used to signal the listener task to stop.
    shutdown_handle: ShutdownHandle,
    // The spawned listener task; taken out by `wait`/`shutdown` so it can be
    // awaited even though this type implements `Drop`.
    task: Option<JoinHandle<io::Result<()>>>,
}
365
366impl ServerHandle {
367 #[must_use]
369 pub fn socket_path(&self) -> &Path {
370 &self.socket_path
371 }
372
373 pub async fn wait(mut self) -> io::Result<()> {
375 if let Some(task) = self.task.take() {
376 return task.await.map_err(io::Error::other)?;
377 }
378
379 Ok(())
380 }
381
382 pub async fn shutdown(mut self) -> io::Result<()> {
384 self.request_shutdown();
385
386 if let Some(task) = self.task.take() {
387 return task.await.map_err(io::Error::other)?;
388 }
389
390 Ok(())
391 }
392
393 fn request_shutdown(&mut self) {
394 self.shutdown_handle.request_shutdown();
395 }
396}
397
398impl Drop for ServerHandle {
399 fn drop(&mut self) {
400 self.request_shutdown();
401 }
402}
403
404#[cfg(unix)]
405fn prepare_socket_path(socket_path: &Path) -> io::Result<()> {
406 let parent = socket_path.parent().ok_or_else(|| {
407 io::Error::new(
408 io::ErrorKind::InvalidInput,
409 format!(
410 "socket path '{}' has no parent directory",
411 socket_path.display()
412 ),
413 )
414 })?;
415
416 ensure_parent_directory(parent)?;
417 remove_stale_socket_if_needed(socket_path)
418}
419
420#[cfg(unix)]
421fn ensure_parent_directory(parent: &Path) -> io::Result<()> {
422 let mut builder = fs::DirBuilder::new();
423 builder.recursive(true);
424 builder.mode(0o700);
425 match builder.create(parent) {
426 Ok(()) => {}
427 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
428 if !fs::metadata(parent)?.is_dir() {
429 return Err(io::Error::new(
430 io::ErrorKind::AlreadyExists,
431 format!("'{}' exists and is not a directory", parent.display()),
432 ));
433 }
434 }
435 Err(error) => return Err(error),
436 }
437
438 ensure_directory(parent)?;
439 if let Some(managed_parent) = managed_rmux_socket_directory(parent)? {
440 ensure_safe_rmux_socket_directory(&managed_parent)?;
441 }
442
443 Ok(())
444}
445
/// Succeeds only when `path` itself is a directory.
///
/// The check uses `symlink_metadata`, so a symlink is rejected even when it
/// points at a directory.
///
/// # Errors
///
/// Returns [`io::ErrorKind::AlreadyExists`] for any non-directory entry, or
/// the metadata lookup error.
#[cfg(unix)]
fn ensure_directory(path: &Path) -> io::Result<()> {
    if fs::symlink_metadata(path)?.is_dir() {
        Ok(())
    } else {
        let message = format!("'{}' exists and is not a directory", path.display());
        Err(io::Error::new(io::ErrorKind::AlreadyExists, message))
    }
}
458
459#[cfg(unix)]
460fn managed_rmux_socket_directory(path: &Path) -> io::Result<Option<PathBuf>> {
461 let expected = format!("{SOCKET_DIR_PREFIX}-{}", real_user_id()?);
462 Ok(path.ancestors().find_map(|ancestor| {
463 ancestor
464 .file_name()
465 .and_then(|name| name.to_str())
466 .filter(|name| *name == expected)
467 .map(|_| ancestor.to_path_buf())
468 }))
469}
470
471#[cfg(unix)]
472fn ensure_safe_rmux_socket_directory(path: &Path) -> io::Result<()> {
473 let metadata = fs::symlink_metadata(path)?;
474 if !metadata.is_dir() {
475 return Err(io::Error::new(
476 io::ErrorKind::AlreadyExists,
477 format!("{} is not a directory", path.display()),
478 ));
479 }
480
481 let user_id = real_user_id()?;
482 if metadata.uid() != user_id || (metadata.mode() & UNSAFE_PERMISSION_MASK) != 0 {
483 return Err(io::Error::new(
484 io::ErrorKind::PermissionDenied,
485 format!("directory {} has unsafe permissions", path.display()),
486 ));
487 }
488
489 Ok(())
490}
491
492#[cfg(unix)]
493fn enforce_bound_socket_permissions(socket_path: &Path) -> io::Result<()> {
494 validate_bound_socket(socket_path, false)?;
495 fs::set_permissions(socket_path, fs::Permissions::from_mode(BOUND_SOCKET_MODE))?;
496 validate_bound_socket(socket_path, true)
497}
498
499#[cfg(unix)]
500fn validate_bound_socket(socket_path: &Path, require_owner_only: bool) -> io::Result<()> {
501 let metadata = fs::symlink_metadata(socket_path)?;
502 if metadata.file_type().is_symlink() || !metadata.file_type().is_socket() {
503 return Err(io::Error::new(
504 io::ErrorKind::AlreadyExists,
505 format!(
506 "socket path '{}' is not a Unix socket",
507 socket_path.display()
508 ),
509 ));
510 }
511
512 let user_id = real_user_id()?;
513 if metadata.uid() != user_id {
514 return Err(io::Error::new(
515 io::ErrorKind::PermissionDenied,
516 format!("socket {} has unsafe ownership", socket_path.display()),
517 ));
518 }
519
520 if require_owner_only && (metadata.mode() & UNSAFE_PERMISSION_MASK) != 0 {
521 return Err(io::Error::new(
522 io::ErrorKind::PermissionDenied,
523 format!("socket {} has unsafe permissions", socket_path.display()),
524 ));
525 }
526
527 Ok(())
528}
529
530#[cfg(unix)]
531fn remove_stale_socket_if_needed(socket_path: &Path) -> io::Result<()> {
532 let metadata = match fs::symlink_metadata(socket_path) {
533 Ok(metadata) => metadata,
534 Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(()),
535 Err(error) => return Err(error),
536 };
537
538 if !metadata.file_type().is_socket() {
539 return Err(io::Error::new(
540 io::ErrorKind::AlreadyExists,
541 format!(
542 "socket path '{}' exists but is not a Unix socket",
543 socket_path.display()
544 ),
545 ));
546 }
547
548 match StdUnixStream::connect(socket_path) {
549 Ok(_stream) => Err(io::Error::new(
550 io::ErrorKind::AddrInUse,
551 format!("socket '{}' is already in use", socket_path.display()),
552 )),
553 Err(error) if indicates_stale_socket(&error) => {
554 debug!(
555 "removing stale socket '{}' after failed connect probe: {error}",
556 socket_path.display()
557 );
558 match fs::remove_file(socket_path) {
559 Ok(()) => Ok(()),
560 Err(remove_error) if remove_error.kind() == io::ErrorKind::NotFound => Ok(()),
561 Err(remove_error) => Err(remove_error),
562 }
563 }
564 Err(error) => Err(error),
565 }
566}
567
/// Reports whether a failed connect probe means the socket file is stale,
/// i.e. the error kind is `ConnectionRefused` or `NotFound`.
#[cfg(unix)]
fn indicates_stale_socket(error: &io::Error) -> bool {
    let kind = error.kind();
    kind == io::ErrorKind::ConnectionRefused || kind == io::ErrorKind::NotFound
}
575
576#[cfg(unix)]
577pub(crate) fn real_user_id() -> io::Result<u32> {
578 Ok(rmux_os::identity::real_user_id())
579}
580
// Unix test suite; compiled only for test builds on Unix targets.
#[cfg(all(test, unix))]
#[path = "daemon_tests/unix.rs"]
mod tests;

// Windows test suite; compiled only for test builds on Windows targets.
#[cfg(all(test, windows))]
#[path = "daemon_tests/windows.rs"]
mod tests;