use std::io;
use std::time::{Duration, Instant};
use crate::broker::backend_lifecycle::identity::IdentityError;
use crate::broker::backend_lifecycle::probe::{self, ProbeError};
use crate::broker::backend_lifecycle::verify_pid::{self, ProcessHandle, VerifyPidError};
use crate::broker::protocol::{CacheManifest, Endpoint};
pub use crate::broker::backend_lifecycle::DaemonProcess;
pub type Result<T> = std::result::Result<T, BackendHandleError>;
pub struct BackendHandle {
pub service_name: String,
pub service_version: String,
pub daemon_process: DaemonProcess,
#[cfg(unix)]
pub(crate) pid_handle: Option<ProcessHandle>,
#[cfg(windows)]
pub(crate) process_handle: Option<ProcessHandle>,
}
impl BackendHandle {
pub fn probe(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
Self::probe_with_service("", "", endpoint, expected).ok()
}
#[cfg(feature = "client-async")]
pub async fn probe_async(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
Self::probe_with_service_async("", "", endpoint, expected)
.await
.ok()
}
pub fn probe_with_service(
service_name: impl Into<String>,
service_version: impl Into<String>,
endpoint: &Endpoint,
expected: &DaemonProcess,
) -> Result<Self> {
let process_handle = probe::probe_endpoint(endpoint, expected)?;
Ok(Self::from_verified(
service_name.into(),
service_version.into(),
expected.clone(),
process_handle,
))
}
#[cfg(feature = "client-async")]
pub async fn probe_with_service_async(
service_name: impl Into<String>,
service_version: impl Into<String>,
endpoint: &Endpoint,
expected: &DaemonProcess,
) -> Result<Self> {
let process_handle =
crate::broker::backend_lifecycle::probe_async::probe_endpoint_async(endpoint, expected)
.await?;
Ok(Self::from_verified(
service_name.into(),
service_version.into(),
expected.clone(),
process_handle,
))
}
pub fn probe_manifest(manifest: &CacheManifest) -> Option<Self> {
Self::try_from_manifest(manifest).ok().flatten()
}
pub fn try_from_manifest(manifest: &CacheManifest) -> Result<Option<Self>> {
let Some(daemon_process) = DaemonProcess::from_manifest_current_daemon(manifest)? else {
return Ok(None);
};
let handle = Self::probe_with_service(
manifest.service_name.clone(),
manifest.service_version.clone(),
&daemon_process.ipc_endpoint,
&daemon_process,
)?;
Ok(Some(handle))
}
pub fn is_alive(&self) -> bool {
self.platform_handle()
.map(|handle| handle.is_alive())
.unwrap_or_else(|| verify_pid::process_is_alive(self.daemon_process.pid))
}
pub async fn connect(&self) -> Result<Connection> {
Connection::connect(&self.daemon_process.ipc_endpoint).map_err(BackendHandleError::Connect)
}
#[cfg(windows)]
pub fn try_duplicate_windows_handoff_handle(
&self,
pipe_handle: crate::broker::server::handoff::WindowsHandleValue,
handoff_token: crate::broker::server::handoff::HandoffToken,
) -> crate::broker::server::handoff::DuplicateHandleResult {
let attempt = crate::broker::server::handoff::DuplicateHandleAttempt::new(
pipe_handle,
self.daemon_process.pid,
handoff_token,
);
crate::broker::server::handoff::try_duplicate_handle(&attempt)
}
pub async fn shutdown(self, timeout: Duration) -> Result<()> {
verify_pid::signal_terminate(self.daemon_process.pid)?;
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if !self.is_alive() {
return Ok(());
}
std::thread::sleep(Duration::from_millis(20));
}
Err(BackendHandleError::ShutdownTimeout {
pid: self.daemon_process.pid,
})
}
pub fn force_kill(self) -> Result<()> {
verify_pid::force_kill_pid(self.daemon_process.pid)?;
Ok(())
}
fn from_verified(
service_name: String,
service_version: String,
daemon_process: DaemonProcess,
process_handle: ProcessHandle,
) -> Self {
#[cfg(unix)]
{
Self {
service_name,
service_version,
daemon_process,
pid_handle: Some(process_handle),
}
}
#[cfg(windows)]
{
Self {
service_name,
service_version,
daemon_process,
process_handle: Some(process_handle),
}
}
}
fn platform_handle(&self) -> Option<&ProcessHandle> {
#[cfg(unix)]
{
self.pid_handle.as_ref()
}
#[cfg(windows)]
{
self.process_handle.as_ref()
}
}
}
pub struct Connection {
stream: interprocess::local_socket::Stream,
}
impl Connection {
pub fn connect(endpoint: &Endpoint) -> io::Result<Self> {
if endpoint.path.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"backend endpoint path is empty",
));
}
let name = endpoint_name(&endpoint.path)?;
use interprocess::local_socket::traits::Stream as _;
let stream = interprocess::local_socket::Stream::connect(name)?;
Ok(Self { stream })
}
pub fn into_inner(self) -> interprocess::local_socket::Stream {
self.stream
}
}
#[derive(Debug, thiserror::Error)]
pub enum BackendHandleError {
#[error(transparent)]
Identity(#[from] IdentityError),
#[error(transparent)]
Probe(#[from] ProbeError),
#[error("backend IPC connection failed: {0}")]
Connect(io::Error),
#[error(transparent)]
VerifyPid(#[from] VerifyPidError),
#[error("backend shutdown timed out for pid {pid}")]
ShutdownTimeout {
pid: u32,
},
}
fn endpoint_name(path: &str) -> io::Result<interprocess::local_socket::Name<'_>> {
use interprocess::local_socket::prelude::*;
#[cfg(unix)]
{
use interprocess::local_socket::GenericFilePath;
path.to_fs_name::<GenericFilePath>()
}
#[cfg(windows)]
{
use interprocess::local_socket::GenericNamespaced;
path.to_ns_name::<GenericNamespaced>()
}
}