use std::{
net::{TcpListener, TcpStream},
path::Path,
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
};
use anyhow::{Context, Result};
use objects::error::HeddleError;
use repo::daemon::{
EndpointState, HELPER_HOST, MOUNT_PROTOCOL_VERSION, MountDaemonRequest, MountDaemonResponse,
MountStatus, mount_daemon_endpoint_path, mount_idle_policy, persist_endpoint, remove_endpoint,
server::{DaemonHandler, IdleDecision, handle_json_connection},
};
use tracing::info;
use super::registry::{MountOutcome, MountRegistry};
pub fn run_mount_daemon(repo_root: &Path) -> Result<()> {
let endpoint_path = mount_daemon_endpoint_path(repo_root);
if let Some(parent) = endpoint_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = TcpListener::bind((HELPER_HOST, 0))?;
listener.set_nonblocking(true)?;
let port = listener.local_addr()?.port();
persist_endpoint(
&endpoint_path,
&EndpointState {
version: MOUNT_PROTOCOL_VERSION,
host: HELPER_HOST.to_string(),
port,
pid: Some(std::process::id()),
},
)
.context("persist daemon endpoint")?;
info!(port, pid = std::process::id(), "heddle daemon serving");
let registry = Arc::new(Mutex::new(MountRegistry::new(repo_root.to_path_buf())));
let started = Instant::now();
let shutdown_requested = Arc::new(AtomicBool::new(false));
let mut handler = MountDaemonHandler {
registry: Arc::clone(®istry),
started,
shutdown_requested: Arc::clone(&shutdown_requested),
};
let result = repo::daemon::run_server_loop(&listener, &mut handler);
{
let mut guard = registry.lock().expect("mount registry lock");
guard.shutdown_all();
}
remove_endpoint(&endpoint_path);
info!("heddle daemon exiting");
result.map_err(Into::into)
}
struct MountDaemonHandler {
registry: Arc<Mutex<MountRegistry>>,
started: Instant,
shutdown_requested: Arc<AtomicBool>,
}
impl DaemonHandler for MountDaemonHandler {
fn handle(&mut self, stream: TcpStream) -> Result<(), HeddleError> {
let registry = Arc::clone(&self.registry);
let started = self.started;
let shutdown_requested = Arc::clone(&self.shutdown_requested);
handle_json_connection(stream, move |request: MountDaemonRequest| {
dispatch(®istry, started, &shutdown_requested, request)
})
}
fn on_tick(&mut self, idle_for: Duration) -> IdleDecision {
let shutdown = self.shutdown_requested.load(Ordering::Acquire);
let live_count = self.registry.lock().expect("mount registry lock").len();
mount_idle_policy(shutdown, live_count, idle_for)
}
}
fn dispatch(
registry: &Mutex<MountRegistry>,
started: Instant,
shutdown_requested: &AtomicBool,
request: MountDaemonRequest,
) -> MountDaemonResponse {
match request {
MountDaemonRequest::Mount {
thread_id,
mount_path,
repo_root: _,
} => {
let mut guard = registry.lock().expect("mount registry lock");
match guard.mount(&thread_id, &mount_path) {
Ok(MountOutcome::Created) => MountDaemonResponse::Mount {
version: MOUNT_PROTOCOL_VERSION,
ok: true,
mount_path,
status: MountStatus::Created,
},
Ok(MountOutcome::Existing) => MountDaemonResponse::Mount {
version: MOUNT_PROTOCOL_VERSION,
ok: true,
mount_path,
status: MountStatus::AlreadyMounted,
},
Err(error) => MountDaemonResponse::Error {
version: MOUNT_PROTOCOL_VERSION,
code: repo::daemon::ERR_MOUNT_CONFLICT.to_string(),
message: error.to_string(),
},
}
}
MountDaemonRequest::Unmount { thread_id } => {
let mut guard = registry.lock().expect("mount registry lock");
match guard.unmount(&thread_id) {
Ok(was_mounted) => MountDaemonResponse::Unmount {
version: MOUNT_PROTOCOL_VERSION,
ok: true,
was_mounted,
},
Err(error) => MountDaemonResponse::Error {
version: MOUNT_PROTOCOL_VERSION,
code: "unmount_failed".to_string(),
message: error.to_string(),
},
}
}
MountDaemonRequest::ListMounts {} => {
let guard = registry.lock().expect("mount registry lock");
MountDaemonResponse::ListMounts {
version: MOUNT_PROTOCOL_VERSION,
mounts: guard.snapshot(),
}
}
MountDaemonRequest::Health {} => {
let guard = registry.lock().expect("mount registry lock");
MountDaemonResponse::Health {
version: MOUNT_PROTOCOL_VERSION,
ok: true,
uptime_s: started.elapsed().as_secs(),
mount_count: guard.len(),
}
}
MountDaemonRequest::Shutdown {} => {
shutdown_requested.store(true, Ordering::Release);
MountDaemonResponse::Shutdown {
version: MOUNT_PROTOCOL_VERSION,
ok: true,
}
}
MountDaemonRequest::Unknown => MountDaemonResponse::Error {
version: MOUNT_PROTOCOL_VERSION,
code: "unknown_command".to_string(),
message: "daemon received an unrecognized command (likely client/server skew)"
.to_string(),
},
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use repo::daemon::HELPER_IDLE_TIMEOUT_SECS;
use tempfile::TempDir;
use super::*;
#[test]
fn idle_exit_blocked_while_mount_is_live() {
let tmp = TempDir::new().unwrap();
let registry = Arc::new(Mutex::new(MountRegistry::new(tmp.path().to_path_buf())));
registry
.lock()
.unwrap()
.__test_inject_phantom_mount("phantom", tmp.path().to_path_buf());
let mut handler = MountDaemonHandler {
registry: Arc::clone(®istry),
started: Instant::now(),
shutdown_requested: Arc::new(AtomicBool::new(false)),
};
let decision = handler.on_tick(Duration::from_secs(HELPER_IDLE_TIMEOUT_SECS * 10));
assert_eq!(decision, IdleDecision::Continue);
}
#[test]
fn idle_exit_when_registry_empty() {
let tmp = TempDir::new().unwrap();
let registry = Arc::new(Mutex::new(MountRegistry::new(tmp.path().to_path_buf())));
let mut handler = MountDaemonHandler {
registry: Arc::clone(®istry),
started: Instant::now(),
shutdown_requested: Arc::new(AtomicBool::new(false)),
};
let decision = handler.on_tick(Duration::from_secs(HELPER_IDLE_TIMEOUT_SECS + 1));
assert_eq!(decision, IdleDecision::Exit);
}
#[test]
fn shutdown_request_short_circuits_idle_check() {
let tmp = TempDir::new().unwrap();
let registry = Arc::new(Mutex::new(MountRegistry::new(tmp.path().to_path_buf())));
registry
.lock()
.unwrap()
.__test_inject_phantom_mount("phantom", tmp.path().to_path_buf());
let shutdown = Arc::new(AtomicBool::new(true));
let mut handler = MountDaemonHandler {
registry: Arc::clone(®istry),
started: Instant::now(),
shutdown_requested: Arc::clone(&shutdown),
};
let decision = handler.on_tick(Duration::from_millis(0));
assert_eq!(decision, IdleDecision::Exit);
}
}