1use std::env;
4use std::fmt;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::process::{Command, Stdio};
8#[cfg(any(all(test, unix), not(any(unix, windows))))]
9use std::thread;
10use std::time::Duration;
11#[cfg(any(all(test, unix), not(any(unix, windows))))]
12use std::time::Instant;
13
14#[cfg(windows)]
15use std::os::windows::process::CommandExt;
16
17#[cfg(not(windows))]
18use rmux_proto::{ListSessionsRequest, Response};
19#[cfg(unix)]
20use rmux_sdk::bootstrap::startup_unix::{
21 connect_or_start_with, StartupError, StartupOutcome, DEFAULT_STARTUP_DEADLINE,
22 STARTUP_POLL_INTERVAL,
23};
24#[cfg(windows)]
25use rmux_sdk::bootstrap::startup_windows::{
26 connect_or_start_with, StartupError, StartupOutcome, DEFAULT_STARTUP_DEADLINE,
27 STARTUP_POLL_INTERVAL,
28};
29
30#[cfg(not(any(unix, windows)))]
31use crate::connect_or_absent;
32#[cfg(any(all(test, unix), not(any(unix, windows))))]
33use crate::ConnectResult;
34use crate::{ClientError, Connection};
35
36#[cfg(windows)]
37use windows_sys::Win32::Foundation::{ERROR_ACCESS_DENIED, ERROR_INVALID_PARAMETER};
38#[cfg(windows)]
39use windows_sys::Win32::System::Threading::{
40 CREATE_BREAKAWAY_FROM_JOB, CREATE_NEW_PROCESS_GROUP, CREATE_NO_WINDOW,
41 CREATE_UNICODE_ENVIRONMENT, DETACHED_PROCESS,
42};
43
44#[cfg(not(any(unix, windows)))]
45const AUTO_START_TIMEOUT: Duration = Duration::from_secs(5);
46#[cfg(not(any(unix, windows)))]
47const POLL_INTERVAL: Duration = Duration::from_millis(50);
48
49pub const INTERNAL_DAEMON_FLAG: &str = "--__internal-daemon";
54
55const BINARY_OVERRIDE_ENV: &str = "RMUX_INTERNAL_BINARY_PATH";
56const BINARY_OVERRIDE_TEST_OPT_IN_ENV: &str = "RMUX_ALLOW_INTERNAL_BINARY_OVERRIDE";
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub struct AutoStartConfig {
61 selection: AutoStartConfigSelection,
62 quiet: bool,
63 cwd: Option<PathBuf>,
64}
65
66impl AutoStartConfig {
67 #[must_use]
69 pub const fn disabled() -> Self {
70 Self {
71 selection: AutoStartConfigSelection::Disabled,
72 quiet: true,
73 cwd: None,
74 }
75 }
76
77 #[must_use]
79 pub fn default_files(quiet: bool, cwd: Option<PathBuf>) -> Self {
80 Self {
81 selection: AutoStartConfigSelection::Default,
82 quiet,
83 cwd,
84 }
85 }
86
87 #[must_use]
89 pub fn custom_files(files: Vec<PathBuf>, quiet: bool, cwd: Option<PathBuf>) -> Self {
90 Self {
91 selection: AutoStartConfigSelection::Files(files),
92 quiet,
93 cwd,
94 }
95 }
96
97 #[cfg(not(windows))]
98 fn loads_startup_config(&self) -> bool {
99 !matches!(self.selection, AutoStartConfigSelection::Disabled)
100 }
101
102 fn append_hidden_daemon_args(&self, command: &mut Command) {
103 match &self.selection {
104 AutoStartConfigSelection::Disabled => {}
105 AutoStartConfigSelection::Default => {
106 command.arg("--config-default");
107 }
108 AutoStartConfigSelection::Files(files) => {
109 for file in files {
110 command.arg("--config-file").arg(file);
111 }
112 }
113 }
114
115 if self.quiet {
116 command.arg("--config-quiet");
117 }
118 if let Some(cwd) = &self.cwd {
119 command.arg("--config-cwd").arg(cwd);
120 }
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
126pub enum AutoStartConfigSelection {
127 Disabled,
129 Default,
131 Files(Vec<PathBuf>),
133}
134
135pub fn ensure_server_running(socket_path: &Path) -> Result<Connection, AutoStartError> {
142 ensure_server_running_with_config(socket_path, AutoStartConfig::disabled())
143}
144
145#[cfg(unix)]
147pub fn ensure_server_running_with_config(
148 socket_path: &Path,
149 config: AutoStartConfig,
150) -> Result<Connection, AutoStartError> {
151 ensure_server_running_unix(socket_path, config)
152}
153
154#[cfg(windows)]
156pub fn ensure_server_running_with_config(
157 socket_path: &Path,
158 config: AutoStartConfig,
159) -> Result<Connection, AutoStartError> {
160 ensure_server_running_windows(socket_path, config)
161}
162
163#[cfg(not(any(unix, windows)))]
165pub fn ensure_server_running_with_config(
166 socket_path: &Path,
167 config: AutoStartConfig,
168) -> Result<Connection, AutoStartError> {
169 ensure_server_running_polling(socket_path, config)
170}
171
172#[cfg(unix)]
173fn ensure_server_running_unix(
174 socket_path: &Path,
175 config: AutoStartConfig,
176) -> Result<Connection, AutoStartError> {
177 let binary_path = rmux_binary_path().map_err(AutoStartError::BinaryPath)?;
178 let launcher_binary_path = binary_path.clone();
179 let launcher_socket_path = socket_path.to_path_buf();
180 let launcher_config = config.clone();
181
182 let runtime = tokio::runtime::Builder::new_current_thread()
183 .enable_all()
184 .build()
185 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
186 let outcome = runtime.block_on(connect_or_start_with(
187 socket_path,
188 move || async move {
189 spawn_hidden_daemon_for(
190 &launcher_binary_path,
191 &launcher_socket_path,
192 &launcher_config,
193 )
194 },
195 DEFAULT_STARTUP_DEADLINE,
196 STARTUP_POLL_INTERVAL,
197 ));
198
199 let mut connection = startup_outcome_into_connection(
200 outcome.map_err(|error| auto_start_error_from_startup(error, &binary_path, socket_path))?,
201 )?;
202 if !config.loads_startup_config() {
203 probe_server_readiness(&mut connection).map_err(AutoStartError::Client)?;
204 }
205
206 Ok(connection)
207}
208
209#[cfg(unix)]
210fn startup_outcome_into_connection(outcome: StartupOutcome) -> Result<Connection, AutoStartError> {
211 let stream = outcome
212 .into_stream()
213 .into_std()
214 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
215 stream
216 .set_nonblocking(false)
217 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
218 Connection::new(stream).map_err(AutoStartError::Client)
219}
220
221#[cfg(windows)]
222fn ensure_server_running_windows(
223 socket_path: &Path,
224 config: AutoStartConfig,
225) -> Result<Connection, AutoStartError> {
226 let binary_path = rmux_binary_path().map_err(AutoStartError::BinaryPath)?;
227 let launcher_binary_path = binary_path.clone();
228 let launcher_socket_path = socket_path.to_path_buf();
229 let launcher_config = config;
230
231 let runtime = tokio::runtime::Builder::new_current_thread()
232 .enable_all()
233 .build()
234 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
235 let outcome = runtime.block_on(connect_or_start_with(
236 socket_path,
237 move || async move {
238 spawn_hidden_daemon_for(
239 &launcher_binary_path,
240 &launcher_socket_path,
241 &launcher_config,
242 )
243 },
244 DEFAULT_STARTUP_DEADLINE,
245 STARTUP_POLL_INTERVAL,
246 ));
247
248 startup_outcome_into_connection(
249 outcome.map_err(|error| auto_start_error_from_startup(error, &binary_path, socket_path))?,
250 )
251}
252
253#[cfg(windows)]
254fn startup_outcome_into_connection(outcome: StartupOutcome) -> Result<Connection, AutoStartError> {
255 Connection::new(outcome.into_stream()).map_err(AutoStartError::Client)
256}
257
258#[cfg(unix)]
259fn auto_start_error_from_startup(
260 error: StartupError,
261 binary_path: &Path,
262 socket_path: &Path,
263) -> AutoStartError {
264 match error {
265 StartupError::Launcher { source } => AutoStartError::Launch {
266 path: binary_path.to_path_buf(),
267 error: source,
268 },
269 StartupError::StartupTimeout { waited, .. } => AutoStartError::TimedOut {
270 socket_path: socket_path.to_path_buf(),
271 waited,
272 },
273 error => AutoStartError::Client(ClientError::Io(io::Error::new(
274 startup_error_kind(&error),
275 error.to_string(),
276 ))),
277 }
278}
279
280#[cfg(windows)]
281fn auto_start_error_from_startup(
282 error: StartupError,
283 binary_path: &Path,
284 socket_path: &Path,
285) -> AutoStartError {
286 match error {
287 StartupError::Launcher { source } => AutoStartError::Launch {
288 path: binary_path.to_path_buf(),
289 error: source,
290 },
291 StartupError::StartupTimeout { waited, .. } => AutoStartError::TimedOut {
292 socket_path: socket_path.to_path_buf(),
293 waited,
294 },
295 error => AutoStartError::Client(ClientError::Io(io::Error::new(
296 startup_error_kind(&error),
297 error.to_string(),
298 ))),
299 }
300}
301
302#[cfg(unix)]
303fn startup_error_kind(error: &StartupError) -> io::ErrorKind {
304 match error {
305 StartupError::InvalidPath { .. } | StartupError::SymlinkRejected { .. } => {
306 io::ErrorKind::InvalidInput
307 }
308 StartupError::UnsafeOwner { .. }
309 | StartupError::UnsafePermissions { .. }
310 | StartupError::PeerCredentialMismatch { .. } => io::ErrorKind::PermissionDenied,
311 StartupError::Lock { source, .. } | StartupError::Filesystem { source, .. } => {
312 source.kind()
313 }
314 StartupError::Launcher { source } => source.kind(),
315 StartupError::StartupTimeout { .. } => io::ErrorKind::TimedOut,
316 }
317}
318
319#[cfg(windows)]
320fn startup_error_kind(error: &StartupError) -> io::ErrorKind {
321 match error {
322 StartupError::InvalidPipeName { .. } | StartupError::InvalidMutexName { .. } => {
323 io::ErrorKind::InvalidInput
324 }
325 StartupError::MutexAccessDenied { .. } | StartupError::PipeAccessDenied { .. } => {
326 io::ErrorKind::PermissionDenied
327 }
328 StartupError::MutexTimeout { .. }
329 | StartupError::PipeBusy { .. }
330 | StartupError::StartupTimeout { .. } => io::ErrorKind::TimedOut,
331 StartupError::PipeNotFound { .. } | StartupError::PipeNoData { .. } => {
332 io::ErrorKind::NotFound
333 }
334 StartupError::Mutex { source, .. } | StartupError::PipeIo { source, .. } => source.kind(),
335 StartupError::Launcher { source } => source.kind(),
336 }
337}
338
339#[cfg(not(any(unix, windows)))]
340fn ensure_server_running_polling(
341 socket_path: &Path,
342 config: AutoStartConfig,
343) -> Result<Connection, AutoStartError> {
344 if config.loads_startup_config() {
345 return ensure_server_running_with_probe(
346 socket_path,
347 AUTO_START_TIMEOUT,
348 POLL_INTERVAL,
349 || connect_or_absent(socket_path),
350 || launch_hidden_daemon(socket_path, &config),
351 |_| Ok(()),
352 );
353 }
354
355 ensure_server_running_with(
356 socket_path,
357 AUTO_START_TIMEOUT,
358 POLL_INTERVAL,
359 || connect_or_absent(socket_path),
360 || launch_hidden_daemon(socket_path, &config),
361 )
362}
363
364#[derive(Debug)]
366pub enum AutoStartError {
367 Client(ClientError),
369 BinaryPath(io::Error),
371 Launch {
373 path: PathBuf,
375 error: io::Error,
377 },
378 TimedOut {
380 socket_path: PathBuf,
382 waited: Duration,
384 },
385}
386
387impl fmt::Display for AutoStartError {
388 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
389 match self {
390 Self::Client(error) => write!(formatter, "{error}"),
391 Self::BinaryPath(error) => {
392 write!(formatter, "failed to resolve rmux binary path: {error}")
393 }
394 Self::Launch { path, error } => {
395 write!(
396 formatter,
397 "failed to launch hidden rmux daemon '{}': {error}",
398 path.display()
399 )
400 }
401 Self::TimedOut {
402 socket_path,
403 waited,
404 } => write!(
405 formatter,
406 "timed out after {}s waiting for rmux server socket '{}'",
407 waited.as_secs(),
408 socket_path.display()
409 ),
410 }
411 }
412}
413
414impl std::error::Error for AutoStartError {
415 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
416 match self {
417 Self::Client(error) => Some(error),
418 Self::BinaryPath(error) => Some(error),
419 Self::Launch { error, .. } => Some(error),
420 Self::TimedOut { .. } => None,
421 }
422 }
423}
424
425impl From<ClientError> for AutoStartError {
426 fn from(error: ClientError) -> Self {
427 Self::Client(error)
428 }
429}
430
431#[cfg(not(any(unix, windows)))]
432fn ensure_server_running_with<ConnectFn, LaunchFn>(
433 socket_path: &Path,
434 timeout: Duration,
435 poll_interval: Duration,
436 connect: ConnectFn,
437 launch: LaunchFn,
438) -> Result<Connection, AutoStartError>
439where
440 ConnectFn: FnMut() -> Result<ConnectResult, ClientError>,
441 LaunchFn: FnMut() -> Result<(), AutoStartError>,
442{
443 ensure_server_running_with_probe(
444 socket_path,
445 timeout,
446 poll_interval,
447 connect,
448 launch,
449 probe_server_readiness,
450 )
451}
452
453#[cfg(any(all(test, unix), not(any(unix, windows))))]
454fn ensure_server_running_with_probe<ConnectFn, LaunchFn, ProbeFn>(
455 socket_path: &Path,
456 timeout: Duration,
457 poll_interval: Duration,
458 mut connect: ConnectFn,
459 mut launch: LaunchFn,
460 mut probe: ProbeFn,
461) -> Result<Connection, AutoStartError>
462where
463 ConnectFn: FnMut() -> Result<ConnectResult, ClientError>,
464 LaunchFn: FnMut() -> Result<(), AutoStartError>,
465 ProbeFn: FnMut(&mut Connection) -> Result<(), ClientError>,
466{
467 match connect().map_err(AutoStartError::Client)? {
468 ConnectResult::Connected(mut connection) => {
469 probe(&mut connection).map_err(AutoStartError::Client)?;
470 return Ok(connection);
471 }
472 ConnectResult::Absent => {}
473 }
474
475 launch()?;
476 wait_for_server(
477 socket_path,
478 timeout,
479 poll_interval,
480 &mut connect,
481 &mut probe,
482 )
483}
484
485#[cfg(any(all(test, unix), not(any(unix, windows))))]
486fn wait_for_server<ConnectFn, ProbeFn>(
487 socket_path: &Path,
488 timeout: Duration,
489 poll_interval: Duration,
490 connect: &mut ConnectFn,
491 probe: &mut ProbeFn,
492) -> Result<Connection, AutoStartError>
493where
494 ConnectFn: FnMut() -> Result<ConnectResult, ClientError>,
495 ProbeFn: FnMut(&mut Connection) -> Result<(), ClientError>,
496{
497 let start = Instant::now();
498 let deadline = start + timeout;
499
500 loop {
501 match connect() {
502 Ok(ConnectResult::Connected(mut connection)) => match probe(&mut connection) {
503 Ok(()) => return Ok(connection),
504 Err(error) if is_transient_connect_error(&error) => {}
505 Err(error) => return Err(AutoStartError::Client(error)),
506 },
507 Ok(ConnectResult::Absent) => {}
508 Err(error) if is_transient_connect_error(&error) => {}
509 Err(error) => return Err(AutoStartError::Client(error)),
510 }
511
512 let now = Instant::now();
513 if now >= deadline {
514 return Err(AutoStartError::TimedOut {
515 socket_path: socket_path.to_path_buf(),
516 waited: timeout,
517 });
518 }
519
520 thread::sleep(poll_interval.min(deadline.saturating_duration_since(now)));
521 }
522}
523
524#[cfg(any(all(test, unix), not(any(unix, windows))))]
525fn is_transient_connect_error(error: &ClientError) -> bool {
526 matches!(
527 error,
528 ClientError::Io(io_error)
529 if matches!(
530 io_error.kind(),
531 io::ErrorKind::WouldBlock
532 | io::ErrorKind::Interrupted
533 | io::ErrorKind::TimedOut
534 )
535 )
536}
537
538#[cfg(not(windows))]
539fn probe_server_readiness(connection: &mut Connection) -> Result<(), ClientError> {
540 let response = connection.list_sessions(ListSessionsRequest {
541 format: None,
542 filter: None,
543 sort_order: None,
544 reversed: false,
545 })?;
546 match response {
547 Response::ListSessions(_) => Ok(()),
548 other => Err(ClientError::Protocol(rmux_proto::RmuxError::Server(
549 format!("unexpected readiness response: {other:?}"),
550 ))),
551 }
552}
553
554#[cfg(not(any(unix, windows)))]
555fn launch_hidden_daemon(
556 socket_path: &Path,
557 config: &AutoStartConfig,
558) -> Result<(), AutoStartError> {
559 let binary_path = rmux_binary_path().map_err(AutoStartError::BinaryPath)?;
560 spawn_hidden_daemon_for(&binary_path, socket_path, config).map_err(|error| {
561 AutoStartError::Launch {
562 path: binary_path,
563 error,
564 }
565 })
566}
567
568fn spawn_hidden_daemon_for(
569 binary_path: &Path,
570 socket_path: &Path,
571 config: &AutoStartConfig,
572) -> io::Result<()> {
573 let command = hidden_daemon_command(binary_path, socket_path, config, true);
574 match spawn_hidden_daemon(command) {
575 Ok(()) => Ok(()),
576 Err(error) if should_retry_hidden_daemon_without_breakaway(&error) => {
577 let command = hidden_daemon_command(binary_path, socket_path, config, false);
578 spawn_hidden_daemon(command)
579 }
580 Err(error) => Err(error),
581 }
582}
583
584fn hidden_daemon_command(
585 binary_path: &Path,
586 socket_path: &Path,
587 config: &AutoStartConfig,
588 allow_job_breakaway: bool,
589) -> Command {
590 let mut command = Command::new(binary_path);
591 command
592 .arg(INTERNAL_DAEMON_FLAG)
593 .arg(socket_path)
594 .stdin(Stdio::null())
595 .stdout(Stdio::null())
596 .stderr(Stdio::null());
597 config.append_hidden_daemon_args(&mut command);
598 configure_hidden_daemon_command(&mut command, allow_job_breakaway);
599 command
600}
601
602fn spawn_hidden_daemon(mut command: Command) -> io::Result<()> {
603 let child = command.spawn()?;
604 drop(child);
607 Ok(())
608}
609
610#[cfg(windows)]
611fn configure_hidden_daemon_command(command: &mut Command, allow_job_breakaway: bool) {
612 command.creation_flags(hidden_daemon_creation_flags(allow_job_breakaway));
613}
614
615#[cfg(not(windows))]
616fn configure_hidden_daemon_command(_command: &mut Command, _allow_job_breakaway: bool) {}
617
618#[cfg(windows)]
619fn hidden_daemon_creation_flags(allow_job_breakaway: bool) -> u32 {
620 let base =
621 DETACHED_PROCESS | CREATE_NO_WINDOW | CREATE_NEW_PROCESS_GROUP | CREATE_UNICODE_ENVIRONMENT;
622 if allow_job_breakaway {
623 base | CREATE_BREAKAWAY_FROM_JOB
624 } else {
625 base
626 }
627}
628
629#[cfg(windows)]
630fn should_retry_hidden_daemon_without_breakaway(error: &io::Error) -> bool {
631 matches!(
632 error.raw_os_error(),
633 Some(code)
634 if code == ERROR_ACCESS_DENIED as i32 || code == ERROR_INVALID_PARAMETER as i32
635 )
636}
637
638#[cfg(not(windows))]
639fn should_retry_hidden_daemon_without_breakaway(_error: &io::Error) -> bool {
640 false
641}
642
643fn rmux_binary_path() -> io::Result<PathBuf> {
644 let current_exe = env::current_exe()?;
645 match env::var_os(BINARY_OVERRIDE_ENV).filter(|_| binary_override_enabled_for_tests()) {
646 Some(path) => Ok(PathBuf::from(path)),
647 None => Ok(current_exe),
648 }
649}
650
651fn binary_override_enabled_for_tests() -> bool {
652 cfg!(debug_assertions)
653 && env::var_os(BINARY_OVERRIDE_TEST_OPT_IN_ENV).is_some_and(|value| value == "1")
654}
655
656#[cfg(all(test, unix))]
657#[path = "auto_start/tests.rs"]
658mod tests;
659
660#[cfg(all(test, windows))]
661mod windows_tests {
662 use std::io;
663
664 use super::*;
665
666 #[test]
667 fn hidden_daemon_flags_detach_console_and_preserve_unicode_env() {
668 let flags = hidden_daemon_creation_flags(true);
669
670 assert_ne!(flags & DETACHED_PROCESS, 0);
671 assert_ne!(flags & CREATE_NO_WINDOW, 0);
672 assert_ne!(flags & CREATE_NEW_PROCESS_GROUP, 0);
673 assert_ne!(flags & CREATE_UNICODE_ENVIRONMENT, 0);
674 assert_ne!(flags & CREATE_BREAKAWAY_FROM_JOB, 0);
675
676 let fallback_flags = hidden_daemon_creation_flags(false);
677 assert_ne!(fallback_flags & DETACHED_PROCESS, 0);
678 assert_ne!(fallback_flags & CREATE_NO_WINDOW, 0);
679 assert_eq!(fallback_flags & CREATE_BREAKAWAY_FROM_JOB, 0);
680 }
681
682 #[test]
683 fn hidden_daemon_retry_is_limited_to_breakaway_failures() {
684 assert!(should_retry_hidden_daemon_without_breakaway(
685 &io::Error::from_raw_os_error(ERROR_ACCESS_DENIED as i32)
686 ));
687 assert!(should_retry_hidden_daemon_without_breakaway(
688 &io::Error::from_raw_os_error(ERROR_INVALID_PARAMETER as i32)
689 ));
690 assert!(!should_retry_hidden_daemon_without_breakaway(
691 &io::Error::from_raw_os_error(2)
692 ));
693 }
694}