use std::ffi::OsStr;
use std::io;
use std::path::Path;
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use rmux_ipc::{acquire_named_mutex, NamedMutexAcquire, NamedMutexError};
use super::StartupError;
use crate::bootstrap::deadline::StartupDeadline;
/// Handle to the background thread that holds the startup mutex guard.
///
/// `acquire_startup_mutex` builds this value after its worker thread reports a
/// successful acquisition. Calling `release` (or dropping the holder) signals
/// that thread and joins it.
pub(super) struct StartupMutexHolder {
/// Sending half of the release channel; `None` once `release` has taken it.
pub(super) release: Option<mpsc::SyncSender<()>>,
/// Join handle of the worker thread; `None` once `release` has taken it.
pub(super) thread: Option<JoinHandle<()>>,
}
impl StartupMutexHolder {
    /// Signals the worker thread to let go of the mutex and waits for it to exit.
    ///
    /// Both the sender and the join handle are taken out of the holder, so any
    /// later call (including the one made from `Drop`) finds `None` in both
    /// fields and does nothing. A failed send and a failed join are both
    /// ignored.
    pub(super) fn release(&mut self) {
        let (signal, worker) = (self.release.take(), self.thread.take());

        // The sender is consumed (and dropped) inside this block, before the
        // join below, so the worker's `recv` is guaranteed to return.
        if let Some(signal) = signal {
            let _ = signal.send(());
        }

        if let Some(worker) = worker {
            let _ = worker.join();
        }
    }
}
impl Drop for StartupMutexHolder {
fn drop(&mut self) {
self.release();
}
}
pub(super) async fn acquire_startup_mutex(
pipe_name: &Path,
mutex_name: &OsStr,
deadline: StartupDeadline,
) -> Result<StartupMutexHolder, StartupError> {
let pipe_owned = pipe_name.to_path_buf();
let mutex_owned = mutex_name.to_owned();
let (acquire_tx, acquire_rx) = tokio::sync::oneshot::channel();
let (release_tx, release_rx) = mpsc::sync_channel::<()>(1);
let thread = thread::Builder::new()
.name("rmux-startup-mutex".to_owned())
.spawn(move || {
let mutex_wait = deadline.requested_timeout().unwrap_or(Duration::MAX);
let outcome = acquire_named_mutex(&mutex_owned, mutex_wait);
match outcome {
Ok(NamedMutexAcquire::Created(guard))
| Ok(NamedMutexAcquire::Opened(guard))
| Ok(NamedMutexAcquire::Abandoned(guard)) => {
if acquire_tx.send(Ok(())).is_err() {
drop(guard);
return;
}
let _ = release_rx.recv();
drop(guard);
}
Err(error) => {
let _ = acquire_tx.send(Err(error));
}
}
})
.map_err(|source| StartupError::Mutex {
pipe_name: pipe_owned.clone(),
source,
})?;
let acquired = acquire_rx.await.map_err(|_canceled| StartupError::Mutex {
pipe_name: pipe_owned.clone(),
source: io::Error::other("startup mutex thread exited before reporting an outcome"),
})?;
match acquired {
Ok(()) => Ok(StartupMutexHolder {
release: Some(release_tx),
thread: Some(thread),
}),
Err(error) => {
let _ = thread.join();
Err(map_named_mutex_error(error, pipe_name, deadline))
}
}
}
/// Converts a `NamedMutexError` into the corresponding `StartupError`,
/// attaching an owned copy of `pipe_name` to every variant.
///
/// `deadline` is consulted only for `TimedOut`, where the reported `waited`
/// duration is the requested timeout (`Duration::MAX` when none was requested).
fn map_named_mutex_error(
    error: NamedMutexError,
    pipe_name: &Path,
    deadline: StartupDeadline,
) -> StartupError {
    // Every arm needs the owned path, so build it once up front.
    let pipe_name = pipe_name.to_path_buf();

    match error {
        NamedMutexError::TimedOut => {
            let waited = deadline.requested_timeout().unwrap_or(Duration::MAX);
            StartupError::MutexTimeout { pipe_name, waited }
        }
        NamedMutexError::AccessDenied(source) => {
            StartupError::MutexAccessDenied { pipe_name, source }
        }
        NamedMutexError::InvalidName { reason } => {
            StartupError::InvalidMutexName { reason, pipe_name }
        }
        // The remaining variants carry a source error and share the generic
        // mutex failure.
        NamedMutexError::SecurityDescriptor(source)
        | NamedMutexError::Create(source)
        | NamedMutexError::Wait(source) => StartupError::Mutex { pipe_name, source },
    }
}