#![cfg(windows)]
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::io;
use std::path::{Path, PathBuf};
use std::time::Duration;
use rmux_ipc::{BlockingLocalStream, LocalEndpoint};
use crate::bootstrap::deadline::StartupDeadline;
#[path = "startup_windows/mutex.rs"]
mod mutex;
#[path = "startup_windows/name.rs"]
mod name;
#[path = "startup_windows/probe.rs"]
mod probe;
use mutex::acquire_startup_mutex;
use name::{startup_mutex_name, validate_pipe_name};
use probe::{probe_responsive, wait_for_daemon};
const PIPE_PREFIX: &str = r"\\.\pipe\";
const STARTUP_MUTEX_PREFIX: &str = r"Local\rmux-startup-";
const PROBE_CONNECT_TIMEOUT: Duration = Duration::from_millis(200);
const PROBE_IO_TIMEOUT: Duration = Duration::from_millis(250);
const PROBE_SESSION_NAME: &str = "__rmux_startup_probe__";
pub const DEFAULT_STARTUP_DEADLINE: Duration = Duration::from_secs(5);
pub const STARTUP_POLL_INTERVAL: Duration = Duration::from_millis(50);
#[derive(Debug)]
pub enum StartupOutcome {
Started(BlockingLocalStream),
JoinedExisting(BlockingLocalStream),
}
impl StartupOutcome {
#[must_use]
pub fn into_stream(self) -> BlockingLocalStream {
match self {
Self::Started(stream) | Self::JoinedExisting(stream) => stream,
}
}
#[must_use]
pub const fn is_owner(&self) -> bool {
matches!(self, Self::Started(_))
}
}
#[derive(Debug)]
pub enum StartupError {
InvalidPipeName {
reason: String,
pipe_name: PathBuf,
},
InvalidMutexName {
reason: String,
pipe_name: PathBuf,
},
Mutex {
pipe_name: PathBuf,
source: io::Error,
},
MutexTimeout {
pipe_name: PathBuf,
waited: Duration,
},
MutexAccessDenied {
pipe_name: PathBuf,
source: io::Error,
},
PipeBusy {
pipe_name: PathBuf,
},
PipeNotFound {
pipe_name: PathBuf,
},
PipeNoData {
pipe_name: PathBuf,
},
PipeAccessDenied {
pipe_name: PathBuf,
},
PipeIo {
operation: &'static str,
pipe_name: PathBuf,
source: io::Error,
},
Launcher {
source: io::Error,
},
StartupTimeout {
pipe_name: PathBuf,
waited: Duration,
},
}
impl StartupError {
#[must_use]
pub const fn is_recoverable(&self) -> bool {
matches!(
self,
Self::Mutex { .. }
| Self::MutexTimeout { .. }
| Self::PipeBusy { .. }
| Self::PipeNotFound { .. }
| Self::PipeNoData { .. }
| Self::Launcher { .. }
| Self::StartupTimeout { .. }
)
}
}
impl fmt::Display for StartupError {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidPipeName { reason, pipe_name } => write!(
formatter,
"rmux startup rejected pipe '{}': {reason}",
pipe_name.display()
),
Self::InvalidMutexName { reason, pipe_name } => write!(
formatter,
"rmux startup rejected mutex name for '{}': {reason}",
pipe_name.display()
),
Self::Mutex { pipe_name, source } => write!(
formatter,
"rmux startup mutex for '{}' failed: {source}",
pipe_name.display()
),
Self::MutexTimeout { pipe_name, waited } => write!(
formatter,
"rmux startup mutex for '{}' timed out after {}ms",
pipe_name.display(),
waited.as_millis()
),
Self::MutexAccessDenied { pipe_name, source } => write!(
formatter,
"rmux startup mutex for '{}' denied for current user: {source}",
pipe_name.display()
),
Self::PipeBusy { pipe_name } => write!(
formatter,
"rmux pipe '{}' is busy on every instance",
pipe_name.display()
),
Self::PipeNotFound { pipe_name } => write!(
formatter,
"rmux pipe '{}' is not currently served",
pipe_name.display()
),
Self::PipeNoData { pipe_name } => write!(
formatter,
"rmux pipe '{}' closed mid-handshake",
pipe_name.display()
),
Self::PipeAccessDenied { pipe_name } => write!(
formatter,
"rmux pipe '{}' denied current user access",
pipe_name.display()
),
Self::PipeIo {
operation,
pipe_name,
source,
} => write!(
formatter,
"rmux pipe '{}' failed to {operation}: {source}",
pipe_name.display()
),
Self::Launcher { source } => {
write!(formatter, "rmux startup launcher failed: {source}")
}
Self::StartupTimeout { pipe_name, waited } => write!(
formatter,
"rmux startup timed out after {}ms waiting for '{}' to answer",
waited.as_millis(),
pipe_name.display()
),
}
}
}
impl Error for StartupError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Mutex { source, .. }
| Self::MutexAccessDenied { source, .. }
| Self::PipeIo { source, .. }
| Self::Launcher { source } => Some(source),
_ => None,
}
}
}
pub async fn connect_or_start<L, F>(
pipe_name: &Path,
launcher: L,
) -> Result<StartupOutcome, StartupError>
where
L: FnOnce() -> F,
F: Future<Output = io::Result<()>>,
{
connect_or_start_with(
pipe_name,
launcher,
DEFAULT_STARTUP_DEADLINE,
STARTUP_POLL_INTERVAL,
)
.await
}
pub async fn connect_or_start_with<L, F>(
pipe_name: &Path,
launcher: L,
deadline: Duration,
poll_interval: Duration,
) -> Result<StartupOutcome, StartupError>
where
L: FnOnce() -> F,
F: Future<Output = io::Result<()>>,
{
connect_or_start_with_timeout(pipe_name, launcher, Some(deadline), poll_interval).await
}
pub async fn connect_or_start_with_timeout<L, F>(
pipe_name: &Path,
launcher: L,
deadline: Option<Duration>,
poll_interval: Duration,
) -> Result<StartupOutcome, StartupError>
where
L: FnOnce() -> F,
F: Future<Output = io::Result<()>>,
{
let deadline = StartupDeadline::from_timeout(deadline);
validate_pipe_name(pipe_name)?;
let endpoint = LocalEndpoint::from_path(pipe_name.to_path_buf());
if let Some(stream) = probe_responsive(&endpoint, pipe_name).await? {
return Ok(StartupOutcome::JoinedExisting(stream));
}
let mutex_name = startup_mutex_name(pipe_name)?;
let _guard = acquire_startup_mutex(pipe_name, &mutex_name, deadline).await?;
if let Some(stream) = probe_responsive(&endpoint, pipe_name).await? {
return Ok(StartupOutcome::JoinedExisting(stream));
}
launcher()
.await
.map_err(|source| StartupError::Launcher { source })?;
let stream = wait_for_daemon(&endpoint, pipe_name, deadline, poll_interval).await?;
drop(_guard);
Ok(StartupOutcome::Started(stream))
}
#[cfg(test)]
#[path = "startup_windows/tests.rs"]
mod tests;