1use std::env;
4use std::fmt;
5use std::io;
6use std::path::{Path, PathBuf};
7use std::process::{Command, Stdio};
8#[cfg(any(all(test, unix), not(any(unix, windows))))]
9use std::thread;
10use std::time::Duration;
11#[cfg(any(all(test, unix), not(any(unix, windows))))]
12use std::time::Instant;
13
14#[cfg(not(windows))]
15use rmux_proto::{ListSessionsRequest, Response};
16#[cfg(unix)]
17use rmux_sdk::bootstrap::startup_unix::{
18 connect_or_start_with, StartupError, StartupOutcome, DEFAULT_STARTUP_DEADLINE,
19 STARTUP_POLL_INTERVAL,
20};
21#[cfg(windows)]
22use rmux_sdk::bootstrap::startup_windows::{
23 connect_or_start_with, StartupError, StartupOutcome, DEFAULT_STARTUP_DEADLINE,
24 STARTUP_POLL_INTERVAL,
25};
26
27#[cfg(not(any(unix, windows)))]
28use crate::connect_or_absent;
29#[cfg(any(all(test, unix), not(any(unix, windows))))]
30use crate::ConnectResult;
31use crate::{ClientError, Connection};
32
33#[cfg(not(any(unix, windows)))]
34const AUTO_START_TIMEOUT: Duration = Duration::from_secs(5);
35#[cfg(not(any(unix, windows)))]
36const POLL_INTERVAL: Duration = Duration::from_millis(50);
37
38pub const INTERNAL_DAEMON_FLAG: &str = "--__internal-daemon";
43
44const BINARY_OVERRIDE_ENV: &str = "RMUX_INTERNAL_BINARY_PATH";
45const BINARY_OVERRIDE_TEST_OPT_IN_ENV: &str = "RMUX_ALLOW_INTERNAL_BINARY_OVERRIDE";
46
47#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct AutoStartConfig {
50 selection: AutoStartConfigSelection,
51 quiet: bool,
52 cwd: Option<PathBuf>,
53}
54
55impl AutoStartConfig {
56 #[must_use]
58 pub const fn disabled() -> Self {
59 Self {
60 selection: AutoStartConfigSelection::Disabled,
61 quiet: true,
62 cwd: None,
63 }
64 }
65
66 #[must_use]
68 pub fn default_files(quiet: bool, cwd: Option<PathBuf>) -> Self {
69 Self {
70 selection: AutoStartConfigSelection::Default,
71 quiet,
72 cwd,
73 }
74 }
75
76 #[must_use]
78 pub fn custom_files(files: Vec<PathBuf>, quiet: bool, cwd: Option<PathBuf>) -> Self {
79 Self {
80 selection: AutoStartConfigSelection::Files(files),
81 quiet,
82 cwd,
83 }
84 }
85
86 #[cfg(not(windows))]
87 fn loads_startup_config(&self) -> bool {
88 !matches!(self.selection, AutoStartConfigSelection::Disabled)
89 }
90
91 fn append_hidden_daemon_args(&self, command: &mut Command) {
92 match &self.selection {
93 AutoStartConfigSelection::Disabled => {}
94 AutoStartConfigSelection::Default => {
95 command.arg("--config-default");
96 }
97 AutoStartConfigSelection::Files(files) => {
98 for file in files {
99 command.arg("--config-file").arg(file);
100 }
101 }
102 }
103
104 if self.quiet {
105 command.arg("--config-quiet");
106 }
107 if let Some(cwd) = &self.cwd {
108 command.arg("--config-cwd").arg(cwd);
109 }
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
115pub enum AutoStartConfigSelection {
116 Disabled,
118 Default,
120 Files(Vec<PathBuf>),
122}
123
124pub fn ensure_server_running(socket_path: &Path) -> Result<Connection, AutoStartError> {
131 ensure_server_running_with_config(socket_path, AutoStartConfig::disabled())
132}
133
134#[cfg(unix)]
136pub fn ensure_server_running_with_config(
137 socket_path: &Path,
138 config: AutoStartConfig,
139) -> Result<Connection, AutoStartError> {
140 ensure_server_running_unix(socket_path, config)
141}
142
143#[cfg(windows)]
145pub fn ensure_server_running_with_config(
146 socket_path: &Path,
147 config: AutoStartConfig,
148) -> Result<Connection, AutoStartError> {
149 ensure_server_running_windows(socket_path, config)
150}
151
152#[cfg(not(any(unix, windows)))]
154pub fn ensure_server_running_with_config(
155 socket_path: &Path,
156 config: AutoStartConfig,
157) -> Result<Connection, AutoStartError> {
158 ensure_server_running_polling(socket_path, config)
159}
160
161#[cfg(unix)]
162fn ensure_server_running_unix(
163 socket_path: &Path,
164 config: AutoStartConfig,
165) -> Result<Connection, AutoStartError> {
166 let binary_path = rmux_binary_path().map_err(AutoStartError::BinaryPath)?;
167 let launcher_binary_path = binary_path.clone();
168 let launcher_socket_path = socket_path.to_path_buf();
169 let launcher_config = config.clone();
170
171 let runtime = tokio::runtime::Builder::new_current_thread()
172 .enable_all()
173 .build()
174 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
175 let outcome = runtime.block_on(connect_or_start_with(
176 socket_path,
177 move || async move {
178 spawn_hidden_daemon_for(
179 &launcher_binary_path,
180 &launcher_socket_path,
181 &launcher_config,
182 )
183 },
184 DEFAULT_STARTUP_DEADLINE,
185 STARTUP_POLL_INTERVAL,
186 ));
187
188 let mut connection = startup_outcome_into_connection(
189 outcome.map_err(|error| auto_start_error_from_startup(error, &binary_path, socket_path))?,
190 )?;
191 if !config.loads_startup_config() {
192 probe_server_readiness(&mut connection).map_err(AutoStartError::Client)?;
193 }
194
195 Ok(connection)
196}
197
198#[cfg(unix)]
199fn startup_outcome_into_connection(outcome: StartupOutcome) -> Result<Connection, AutoStartError> {
200 let stream = outcome
201 .into_stream()
202 .into_std()
203 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
204 stream
205 .set_nonblocking(false)
206 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
207 Connection::new(stream).map_err(AutoStartError::Client)
208}
209
210#[cfg(windows)]
211fn ensure_server_running_windows(
212 socket_path: &Path,
213 config: AutoStartConfig,
214) -> Result<Connection, AutoStartError> {
215 let binary_path = rmux_binary_path().map_err(AutoStartError::BinaryPath)?;
216 let launcher_binary_path = binary_path.clone();
217 let launcher_socket_path = socket_path.to_path_buf();
218 let launcher_config = config;
219
220 let runtime = tokio::runtime::Builder::new_current_thread()
221 .enable_all()
222 .build()
223 .map_err(|error| AutoStartError::Client(ClientError::Io(error)))?;
224 let outcome = runtime.block_on(connect_or_start_with(
225 socket_path,
226 move || async move {
227 spawn_hidden_daemon_for(
228 &launcher_binary_path,
229 &launcher_socket_path,
230 &launcher_config,
231 )
232 },
233 DEFAULT_STARTUP_DEADLINE,
234 STARTUP_POLL_INTERVAL,
235 ));
236
237 startup_outcome_into_connection(
238 outcome.map_err(|error| auto_start_error_from_startup(error, &binary_path, socket_path))?,
239 )
240}
241
242#[cfg(windows)]
243fn startup_outcome_into_connection(outcome: StartupOutcome) -> Result<Connection, AutoStartError> {
244 Connection::new(outcome.into_stream()).map_err(AutoStartError::Client)
245}
246
247#[cfg(unix)]
248fn auto_start_error_from_startup(
249 error: StartupError,
250 binary_path: &Path,
251 socket_path: &Path,
252) -> AutoStartError {
253 match error {
254 StartupError::Launcher { source } => AutoStartError::Launch {
255 path: binary_path.to_path_buf(),
256 error: source,
257 },
258 StartupError::StartupTimeout { waited, .. } => AutoStartError::TimedOut {
259 socket_path: socket_path.to_path_buf(),
260 waited,
261 },
262 error => AutoStartError::Client(ClientError::Io(io::Error::new(
263 startup_error_kind(&error),
264 error.to_string(),
265 ))),
266 }
267}
268
269#[cfg(windows)]
270fn auto_start_error_from_startup(
271 error: StartupError,
272 binary_path: &Path,
273 socket_path: &Path,
274) -> AutoStartError {
275 match error {
276 StartupError::Launcher { source } => AutoStartError::Launch {
277 path: binary_path.to_path_buf(),
278 error: source,
279 },
280 StartupError::StartupTimeout { waited, .. } => AutoStartError::TimedOut {
281 socket_path: socket_path.to_path_buf(),
282 waited,
283 },
284 error => AutoStartError::Client(ClientError::Io(io::Error::new(
285 startup_error_kind(&error),
286 error.to_string(),
287 ))),
288 }
289}
290
291#[cfg(unix)]
292fn startup_error_kind(error: &StartupError) -> io::ErrorKind {
293 match error {
294 StartupError::InvalidPath { .. } | StartupError::SymlinkRejected { .. } => {
295 io::ErrorKind::InvalidInput
296 }
297 StartupError::UnsafeOwner { .. }
298 | StartupError::UnsafePermissions { .. }
299 | StartupError::PeerCredentialMismatch { .. } => io::ErrorKind::PermissionDenied,
300 StartupError::Lock { source, .. } | StartupError::Filesystem { source, .. } => {
301 source.kind()
302 }
303 StartupError::Launcher { source } => source.kind(),
304 StartupError::StartupTimeout { .. } => io::ErrorKind::TimedOut,
305 }
306}
307
308#[cfg(windows)]
309fn startup_error_kind(error: &StartupError) -> io::ErrorKind {
310 match error {
311 StartupError::InvalidPipeName { .. } | StartupError::InvalidMutexName { .. } => {
312 io::ErrorKind::InvalidInput
313 }
314 StartupError::MutexAccessDenied { .. } | StartupError::PipeAccessDenied { .. } => {
315 io::ErrorKind::PermissionDenied
316 }
317 StartupError::MutexTimeout { .. }
318 | StartupError::PipeBusy { .. }
319 | StartupError::StartupTimeout { .. } => io::ErrorKind::TimedOut,
320 StartupError::PipeNotFound { .. } | StartupError::PipeNoData { .. } => {
321 io::ErrorKind::NotFound
322 }
323 StartupError::Mutex { source, .. } | StartupError::PipeIo { source, .. } => source.kind(),
324 StartupError::Launcher { source } => source.kind(),
325 }
326}
327
328#[cfg(not(any(unix, windows)))]
329fn ensure_server_running_polling(
330 socket_path: &Path,
331 config: AutoStartConfig,
332) -> Result<Connection, AutoStartError> {
333 if config.loads_startup_config() {
334 return ensure_server_running_with_probe(
335 socket_path,
336 AUTO_START_TIMEOUT,
337 POLL_INTERVAL,
338 || connect_or_absent(socket_path),
339 || launch_hidden_daemon(socket_path, &config),
340 |_| Ok(()),
341 );
342 }
343
344 ensure_server_running_with(
345 socket_path,
346 AUTO_START_TIMEOUT,
347 POLL_INTERVAL,
348 || connect_or_absent(socket_path),
349 || launch_hidden_daemon(socket_path, &config),
350 )
351}
352
353#[derive(Debug)]
355pub enum AutoStartError {
356 Client(ClientError),
358 BinaryPath(io::Error),
360 Launch {
362 path: PathBuf,
364 error: io::Error,
366 },
367 TimedOut {
369 socket_path: PathBuf,
371 waited: Duration,
373 },
374}
375
376impl fmt::Display for AutoStartError {
377 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
378 match self {
379 Self::Client(error) => write!(formatter, "{error}"),
380 Self::BinaryPath(error) => {
381 write!(formatter, "failed to resolve rmux binary path: {error}")
382 }
383 Self::Launch { path, error } => {
384 write!(
385 formatter,
386 "failed to launch hidden rmux daemon '{}': {error}",
387 path.display()
388 )
389 }
390 Self::TimedOut {
391 socket_path,
392 waited,
393 } => write!(
394 formatter,
395 "timed out after {}s waiting for rmux server socket '{}'",
396 waited.as_secs(),
397 socket_path.display()
398 ),
399 }
400 }
401}
402
403impl std::error::Error for AutoStartError {
404 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
405 match self {
406 Self::Client(error) => Some(error),
407 Self::BinaryPath(error) => Some(error),
408 Self::Launch { error, .. } => Some(error),
409 Self::TimedOut { .. } => None,
410 }
411 }
412}
413
414impl From<ClientError> for AutoStartError {
415 fn from(error: ClientError) -> Self {
416 Self::Client(error)
417 }
418}
419
420#[cfg(not(any(unix, windows)))]
421fn ensure_server_running_with<ConnectFn, LaunchFn>(
422 socket_path: &Path,
423 timeout: Duration,
424 poll_interval: Duration,
425 connect: ConnectFn,
426 launch: LaunchFn,
427) -> Result<Connection, AutoStartError>
428where
429 ConnectFn: FnMut() -> Result<ConnectResult, ClientError>,
430 LaunchFn: FnMut() -> Result<(), AutoStartError>,
431{
432 ensure_server_running_with_probe(
433 socket_path,
434 timeout,
435 poll_interval,
436 connect,
437 launch,
438 probe_server_readiness,
439 )
440}
441
442#[cfg(any(all(test, unix), not(any(unix, windows))))]
443fn ensure_server_running_with_probe<ConnectFn, LaunchFn, ProbeFn>(
444 socket_path: &Path,
445 timeout: Duration,
446 poll_interval: Duration,
447 mut connect: ConnectFn,
448 mut launch: LaunchFn,
449 mut probe: ProbeFn,
450) -> Result<Connection, AutoStartError>
451where
452 ConnectFn: FnMut() -> Result<ConnectResult, ClientError>,
453 LaunchFn: FnMut() -> Result<(), AutoStartError>,
454 ProbeFn: FnMut(&mut Connection) -> Result<(), ClientError>,
455{
456 match connect().map_err(AutoStartError::Client)? {
457 ConnectResult::Connected(mut connection) => {
458 probe(&mut connection).map_err(AutoStartError::Client)?;
459 return Ok(connection);
460 }
461 ConnectResult::Absent => {}
462 }
463
464 launch()?;
465 wait_for_server(
466 socket_path,
467 timeout,
468 poll_interval,
469 &mut connect,
470 &mut probe,
471 )
472}
473
474#[cfg(any(all(test, unix), not(any(unix, windows))))]
475fn wait_for_server<ConnectFn, ProbeFn>(
476 socket_path: &Path,
477 timeout: Duration,
478 poll_interval: Duration,
479 connect: &mut ConnectFn,
480 probe: &mut ProbeFn,
481) -> Result<Connection, AutoStartError>
482where
483 ConnectFn: FnMut() -> Result<ConnectResult, ClientError>,
484 ProbeFn: FnMut(&mut Connection) -> Result<(), ClientError>,
485{
486 let start = Instant::now();
487 let deadline = start + timeout;
488
489 loop {
490 match connect() {
491 Ok(ConnectResult::Connected(mut connection)) => match probe(&mut connection) {
492 Ok(()) => return Ok(connection),
493 Err(error) if is_transient_connect_error(&error) => {}
494 Err(error) => return Err(AutoStartError::Client(error)),
495 },
496 Ok(ConnectResult::Absent) => {}
497 Err(error) if is_transient_connect_error(&error) => {}
498 Err(error) => return Err(AutoStartError::Client(error)),
499 }
500
501 let now = Instant::now();
502 if now >= deadline {
503 return Err(AutoStartError::TimedOut {
504 socket_path: socket_path.to_path_buf(),
505 waited: timeout,
506 });
507 }
508
509 thread::sleep(poll_interval.min(deadline.saturating_duration_since(now)));
510 }
511}
512
513#[cfg(any(all(test, unix), not(any(unix, windows))))]
514fn is_transient_connect_error(error: &ClientError) -> bool {
515 matches!(
516 error,
517 ClientError::Io(io_error)
518 if matches!(
519 io_error.kind(),
520 io::ErrorKind::WouldBlock
521 | io::ErrorKind::Interrupted
522 | io::ErrorKind::TimedOut
523 )
524 )
525}
526
527#[cfg(not(windows))]
528fn probe_server_readiness(connection: &mut Connection) -> Result<(), ClientError> {
529 let response = connection.list_sessions(ListSessionsRequest {
530 format: None,
531 filter: None,
532 sort_order: None,
533 reversed: false,
534 })?;
535 match response {
536 Response::ListSessions(_) => Ok(()),
537 other => Err(ClientError::Protocol(rmux_proto::RmuxError::Server(
538 format!("unexpected readiness response: {other:?}"),
539 ))),
540 }
541}
542
543#[cfg(not(any(unix, windows)))]
544fn launch_hidden_daemon(
545 socket_path: &Path,
546 config: &AutoStartConfig,
547) -> Result<(), AutoStartError> {
548 let binary_path = rmux_binary_path().map_err(AutoStartError::BinaryPath)?;
549 spawn_hidden_daemon_for(&binary_path, socket_path, config).map_err(|error| {
550 AutoStartError::Launch {
551 path: binary_path,
552 error,
553 }
554 })
555}
556
557fn spawn_hidden_daemon_for(
558 binary_path: &Path,
559 socket_path: &Path,
560 config: &AutoStartConfig,
561) -> io::Result<()> {
562 let command = hidden_daemon_command(binary_path, socket_path, config, true);
563 match spawn_hidden_daemon(command) {
564 Ok(()) => Ok(()),
565 Err(error) if rmux_os::daemon::should_retry_hidden_daemon_without_breakaway(&error) => {
566 let command = hidden_daemon_command(binary_path, socket_path, config, false);
567 spawn_hidden_daemon(command)
568 }
569 Err(error) => Err(error),
570 }
571}
572
573fn hidden_daemon_command(
574 binary_path: &Path,
575 socket_path: &Path,
576 config: &AutoStartConfig,
577 allow_job_breakaway: bool,
578) -> Command {
579 let mut command = Command::new(binary_path);
580 command
581 .arg(INTERNAL_DAEMON_FLAG)
582 .arg(socket_path)
583 .stdin(Stdio::null())
584 .stdout(Stdio::null())
585 .stderr(Stdio::null());
586 config.append_hidden_daemon_args(&mut command);
587 rmux_os::daemon::configure_hidden_daemon_command(&mut command, allow_job_breakaway);
588 command
589}
590
591fn spawn_hidden_daemon(mut command: Command) -> io::Result<()> {
592 let child = command.spawn()?;
593 drop(child);
596 Ok(())
597}
598
599fn rmux_binary_path() -> io::Result<PathBuf> {
600 let current_exe = env::current_exe()?;
601 match env::var_os(BINARY_OVERRIDE_ENV).filter(|_| binary_override_enabled_for_tests()) {
602 Some(path) => Ok(PathBuf::from(path)),
603 None => Ok(current_exe),
604 }
605}
606
607fn binary_override_enabled_for_tests() -> bool {
608 cfg!(debug_assertions)
609 && env::var_os(BINARY_OVERRIDE_TEST_OPT_IN_ENV).is_some_and(|value| value == "1")
610}
611
612#[cfg(all(test, unix))]
613#[path = "auto_start/tests.rs"]
614mod tests;