use std::fs::{File, OpenOptions};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context, Result};
use chrono::Local;
use fs2::FileExt;
use crate::handshake::{verify_compatible, HandshakeError};
/// Fallback for `spawn_timeout()`, in seconds, used when `TFD_SPAWN_TIMEOUT_SECS`
/// is unset, does not parse as a `u64`, or is zero.
const DEFAULT_SPAWN_TIMEOUT_SECS: u64 = 10;
/// How long `SpawnLock::acquire` keeps retrying before reporting a timeout.
const SPAWN_LOCK_TIMEOUT: Duration = Duration::from_secs(5);
/// Guard for the per-socket spawn lock file.
///
/// Obtained through `SpawnLock::acquire` or `SpawnLock::try_acquire`; the lock is
/// released when the guard is dropped.
#[derive(Debug)]
pub struct SpawnLock {
    /// Open handle to the lock file; the exclusive lock is taken on this handle.
    file: File,
}
impl SpawnLock {
pub fn acquire(socket: &str) -> Result<Self> {
let path = lock_path(socket);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create spawn lock dir {}", parent.display()))?;
}
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(&path)
.with_context(|| format!("open spawn lock {}", path.display()))?;
let deadline = Instant::now() + SPAWN_LOCK_TIMEOUT;
loop {
match file.try_lock_exclusive() {
Ok(()) => return Ok(Self { file }),
Err(err) if Instant::now() < deadline => {
tracing::debug!(
path = %path.display(),
error = %err,
"spawn lock busy; retrying"
);
std::thread::sleep(Duration::from_millis(50));
}
Err(err) => {
return Err(err).with_context(|| {
format!(
"timed out after {}s waiting for spawn lock {}",
SPAWN_LOCK_TIMEOUT.as_secs(),
path.display()
)
});
}
}
}
}
pub fn try_acquire(socket: &str) -> Result<Self> {
let path = lock_path(socket);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create spawn lock dir {}", parent.display()))?;
}
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(&path)
.with_context(|| format!("open spawn lock {}", path.display()))?;
file.try_lock_exclusive()
.with_context(|| format!("try-lock spawn lock {}", path.display()))?;
Ok(Self { file })
}
}
impl Drop for SpawnLock {
    /// Releases the file lock when the guard goes out of scope.
    fn drop(&mut self) {
        // Best effort: an unlock failure is deliberately discarded because a
        // destructor has no way to report it.
        // The fully qualified call pins this to fs2's `FileExt::unlock`.
        FileExt::unlock(&self.file).ok();
    }
}
/// Returns the spawn lock file path for `socket`: the socket string with
/// `.spawn.lock` appended verbatim (no path normalisation is performed).
pub fn lock_path(socket: &str) -> PathBuf {
    const SUFFIX: &str = ".spawn.lock";
    let mut raw = String::with_capacity(socket.len() + SUFFIX.len());
    raw.push_str(socket);
    raw.push_str(SUFFIX);
    PathBuf::from(raw)
}
/// Returns today's daemon log path: `$HOME/.tail-fin/logs/daemon-YYYYMMDD.log`.
///
/// The date is taken from the local clock at call time, so two calls that
/// straddle midnight return different paths. When `HOME` is not set the path
/// is rooted at the current directory (`.`).
pub fn log_path() -> PathBuf {
    // `var_os` instead of `var`: a HOME that is not valid Unicode is still a
    // perfectly usable path and must not be mistaken for "HOME is unset".
    let root = std::env::var_os("HOME")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."));
    let file_name = format!("daemon-{}.log", Local::now().format("%Y%m%d"));
    root.join(".tail-fin").join("logs").join(file_name)
}
/// Opens the current daemon log file (see `log_path`) for appending, creating
/// the file and its parent directory when they do not exist yet.
///
/// # Errors
/// Fails if the log directory cannot be created or the file cannot be opened.
pub fn open_log_file() -> Result<File> {
    let target = log_path();
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("create daemon log dir {}", dir.display()))?;
    }
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    options
        .open(&target)
        .with_context(|| format!("open daemon log {}", target.display()))
}
/// Ensures a compatible daemon is reachable at `socket`, using the built-in
/// defaults (`idle_timeout = 300`, `daemon_idle = 900`, `max_sessions = 3`).
///
/// Delegates to `ensure_daemon_safe_with_options`.
pub async fn ensure_daemon_safe(socket: &str) -> Result<()> {
    let (idle_timeout, daemon_idle, max_sessions) = (300, 900, 3);
    ensure_daemon_safe_with_options(socket, idle_timeout, daemon_idle, max_sessions).await
}
/// Ensures a compatible daemon is reachable at `socket`, spawning one with the
/// given options if necessary.
///
/// This variant honours `TFD_NO_AUTO_SPAWN`: when that variable is set and no
/// daemon answers, an error is returned instead of spawning.
pub async fn ensure_daemon_safe_with_options(
    socket: &str,
    idle_timeout: u64,
    daemon_idle: u64,
    max_sessions: usize,
) -> Result<()> {
    let respect_no_auto_spawn = true;
    ensure_daemon_safe_inner(
        socket,
        idle_timeout,
        daemon_idle,
        max_sessions,
        respect_no_auto_spawn,
    )
    .await
}
/// Starts a detached daemon for `socket` (unless a compatible one already
/// answers) and waits until it accepts a handshake.
///
/// Unlike `ensure_daemon_safe_with_options`, this ignores `TFD_NO_AUTO_SPAWN`.
pub async fn start_detached_and_wait(
    socket: &str,
    idle_timeout: u64,
    daemon_idle: u64,
    max_sessions: usize,
) -> Result<()> {
    let respect_no_auto_spawn = false;
    ensure_daemon_safe_inner(
        socket,
        idle_timeout,
        daemon_idle,
        max_sessions,
        respect_no_auto_spawn,
    )
    .await
}
async fn ensure_daemon_safe_inner(
socket: &str,
idle_timeout: u64,
daemon_idle: u64,
max_sessions: usize,
respect_no_auto_spawn: bool,
) -> Result<()> {
match verify_compatible(socket).await {
Ok(_) => return Ok(()),
Err(first_err) => {
if is_version_mismatch(&first_err) {
return Err(first_err);
}
}
}
if respect_no_auto_spawn && std::env::var_os("TFD_NO_AUTO_SPAWN").is_some() {
return Err(anyhow!(
"daemon not running at {socket} and TFD_NO_AUTO_SPAWN is set; run `tfd daemon start --detach` to start it"
));
}
let _lock = SpawnLock::acquire(socket)?;
match verify_compatible(socket).await {
Ok(_) => return Ok(()),
Err(err) if is_version_mismatch(&err) => return Err(err),
Err(_) => {}
}
start_detached_with_options(socket, idle_timeout, daemon_idle, max_sessions)?;
wait_for_compatible(socket).await
}
async fn wait_for_compatible(socket: &str) -> Result<()> {
let timeout = spawn_timeout();
let deadline = Instant::now() + timeout;
let mut last_err: Option<anyhow::Error> = None;
while Instant::now() < deadline {
match verify_compatible(socket).await {
Ok(_) => return Ok(()),
Err(err) if is_version_mismatch(&err) => return Err(err),
Err(err) => last_err = Some(err),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(anyhow!(
"timed out after {}s waiting for daemon at {socket}{}",
timeout.as_secs(),
last_err.map(|e| format!(": {e}")).unwrap_or_default()
))
}
fn start_detached_with_options(
socket: &str,
idle_timeout: u64,
daemon_idle: u64,
max_sessions: usize,
) -> Result<()> {
if let Some(parent) = Path::new(socket).parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create socket dir {}", parent.display()))?;
}
let log = open_log_file()?;
let log_err = log.try_clone().context("clone daemon log handle")?;
let exe = std::env::current_exe().context("resolve current executable")?;
Command::new(exe)
.arg("--socket")
.arg(socket)
.arg("start")
.arg("--detach")
.arg("--idle-timeout")
.arg(idle_timeout.to_string())
.arg("--daemon-idle")
.arg(daemon_idle.to_string())
.arg("--max-sessions")
.arg(max_sessions.to_string())
.stdin(Stdio::null())
.stdout(Stdio::from(log))
.stderr(Stdio::from(log_err))
.spawn()
.context("spawn tfd daemon")?;
Ok(())
}
/// Returns how long to wait for a spawned daemon to answer.
///
/// Reads `TFD_SPAWN_TIMEOUT_SECS` as a whole number of seconds. When the
/// variable is unset, not valid Unicode, not a `u64`, or zero, falls back to
/// `DEFAULT_SPAWN_TIMEOUT_SECS`.
fn spawn_timeout() -> Duration {
    let configured = match std::env::var("TFD_SPAWN_TIMEOUT_SECS") {
        Ok(raw) => raw.parse::<u64>().ok(),
        Err(_) => None,
    };
    let secs = match configured {
        Some(value) if value > 0 => value,
        _ => DEFAULT_SPAWN_TIMEOUT_SECS,
    };
    Duration::from_secs(secs)
}
/// Reports whether `err` is a `HandshakeError::VersionMismatch`.
///
/// Uses a typed downcast, so it does not depend on the error's message text.
fn is_version_mismatch(err: &anyhow::Error) -> bool {
    err.downcast_ref::<HandshakeError>()
        .is_some_and(|handshake| matches!(handshake, HandshakeError::VersionMismatch { .. }))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::{OsStr, OsString};
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Serialises tests that mutate process-wide environment variables.
    ///
    /// A poisoned mutex is recovered rather than unwrapped: the mutex guards
    /// no data, so one failing env test must not fail every later one.
    fn env_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Overrides one environment variable and restores its previous state on
    /// drop, so the original value comes back even when an assertion panics.
    struct EnvVarGuard {
        key: &'static str,
        previous: Option<OsString>,
    }

    impl EnvVarGuard {
        /// Sets `key` to `value`, remembering what was there before.
        fn set(key: &'static str, value: impl AsRef<OsStr>) -> Self {
            let previous = std::env::var_os(key);
            std::env::set_var(key, value);
            Self { key, previous }
        }

        /// Removes `key`, remembering what was there before.
        fn unset(key: &'static str) -> Self {
            let previous = std::env::var_os(key);
            std::env::remove_var(key);
            Self { key, previous }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }

    #[test]
    fn spawn_lock_excludes_second_holder() {
        let dir = tempfile::tempdir().unwrap();
        let socket = dir.path().join("daemon.sock");
        let socket = socket.to_str().unwrap();
        let _lock = SpawnLock::try_acquire(socket).unwrap();
        assert!(SpawnLock::try_acquire(socket).is_err());
    }

    #[test]
    fn spawn_lock_releases_on_drop() {
        let dir = tempfile::tempdir().unwrap();
        let socket = dir.path().join("daemon.sock");
        let socket = socket.to_str().unwrap();
        drop(SpawnLock::try_acquire(socket).unwrap());
        assert!(SpawnLock::try_acquire(socket).is_ok());
    }

    #[test]
    fn log_path_uses_tail_fin_logs_and_date() {
        let _guard = env_lock();
        let _home = EnvVarGuard::set("HOME", "/tmp/tfd-home-test");
        let path = log_path();
        assert!(path.starts_with("/tmp/tfd-home-test/.tail-fin/logs"));
        assert!(path
            .file_name()
            .unwrap()
            .to_string_lossy()
            .starts_with("daemon-"));
    }

    #[test]
    fn open_log_file_creates_parent_and_appends() {
        let _guard = env_lock();
        let dir = tempfile::tempdir().unwrap();
        let _home = EnvVarGuard::set("HOME", dir.path());
        drop(open_log_file().unwrap());
        assert!(log_path().exists());
    }

    #[test]
    fn spawn_timeout_defaults_and_env_override() {
        let _guard = env_lock();
        let _timeout = EnvVarGuard::unset("TFD_SPAWN_TIMEOUT_SECS");
        assert_eq!(spawn_timeout(), Duration::from_secs(10));
        std::env::set_var("TFD_SPAWN_TIMEOUT_SECS", "30");
        assert_eq!(spawn_timeout(), Duration::from_secs(30));
        // Zero and unparsable values fall back to the default.
        std::env::set_var("TFD_SPAWN_TIMEOUT_SECS", "0");
        assert_eq!(spawn_timeout(), Duration::from_secs(10));
        std::env::set_var("TFD_SPAWN_TIMEOUT_SECS", "not-a-number");
        assert_eq!(spawn_timeout(), Duration::from_secs(10));
    }

    // The env mutex is deliberately held across the `.await`: the variable
    // must stay set (and other env tests stay out) until the call completes.
    #[tokio::test]
    #[allow(clippy::await_holding_lock)]
    async fn no_auto_spawn_env_returns_deterministic_error() {
        let _guard = env_lock();
        let dir = tempfile::tempdir().unwrap();
        let socket = dir.path().join("missing.sock");
        let _no_spawn = EnvVarGuard::set("TFD_NO_AUTO_SPAWN", "1");
        let err = ensure_daemon_safe(socket.to_str().unwrap())
            .await
            .unwrap_err();
        assert!(err.to_string().contains("TFD_NO_AUTO_SPAWN is set"));
    }

    #[test]
    fn version_mismatch_detection_uses_typed_error() {
        let mismatch: anyhow::Error = HandshakeError::VersionMismatch {
            daemon: "0.0.1".to_string(),
            cli: "0.0.2".to_string(),
        }
        .into();
        let protocol: anyhow::Error = HandshakeError::Protocol("bad payload".to_string()).into();
        assert!(is_version_mismatch(&mismatch));
        assert!(!is_version_mismatch(&protocol));
    }
}