tail-fin-daemon 0.6.4

Long-running browser-session daemon for tail-fin (tfd binary). Keeps Chrome tabs warm across invocations via a Unix-socket protocol; registers Site implementations through a runtime Arc<dyn Site> registry.
Documentation
//! Pool-aware daemon server. Intercepts `session.acquire` / `session.release`
//! before falling back to stock registry routing.
//!
//! Wire protocol is identical to night-fury-daemon-core: newline-delimited JSON
//! using `Request` / `Response`.

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use night_fury_daemon_core::protocol::{Request, Response};
use night_fury_daemon_core::{SessionDriver, SessionRegistry};
use serde_json::json;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio::task::JoinSet;

use crate::driver::TailFinSession;
use crate::pool::{AcquireOutcome, PoolKey, PoolManager, PoolState};

/// Production server entry: runs the daemon with the `TailFinSession` driver.
///
/// All four arguments are forwarded unchanged to [`serve_with_driver`], which
/// supplies the default reaper interval and keepalive period.
///
/// * `socket_path` - filesystem path of the Unix socket to listen on.
/// * `idle_timeout_secs` - per-session idle timeout handed to the registry.
/// * `daemon_idle_secs` - daemon-level idle setting handed to the reaper.
/// * `max_per_pool` - capacity limit handed to the pool manager.
pub async fn serve(
    socket_path: &str,
    idle_timeout_secs: u64,
    daemon_idle_secs: u64,
    max_per_pool: usize,
) -> Result<()> {
    let server = serve_with_driver::<TailFinSession>(
        socket_path,
        idle_timeout_secs,
        daemon_idle_secs,
        max_per_pool,
    );
    server.await
}

/// Generic server entry. `serve()` specializes to `TailFinSession`.
///
/// Uses the production defaults for reaper interval (60s) and keepalive
/// period (idle_timeout / 3, floor 30s). For tests that need faster
/// timings, use `serve_with_options` via the doc(hidden) escape hatch.
///
/// # Errors
///
/// Propagates any error returned by `serve_with_options` (for example a
/// failure to bind the Unix socket).
pub async fn serve_with_driver<D: SessionDriver>(
    socket_path: &str,
    idle_timeout_secs: u64,
    daemon_idle_secs: u64,
    max_per_pool: usize,
) -> Result<()> {
    // Divide first, then clamp: the 30s floor applies to the keepalive period
    // itself. Clamping the timeout before dividing would let the period fall
    // to 10s, contradicting the documented floor.
    let keepalive_period = Duration::from_secs((idle_timeout_secs / 3).max(30));

    serve_with_options::<D>(
        socket_path,
        idle_timeout_secs,
        daemon_idle_secs,
        max_per_pool,
        Duration::from_secs(60),
        keepalive_period,
    )
    .await
}

/// Fully-parameterized server entry. Not public API — the doc(hidden) marker
/// signals this, and production callers must use `serve` or `serve_with_driver`
/// which apply sensible defaults. Test code uses this to bypass the production
/// 30s keepalive floor and 60s reaper interval.
///
/// Binds a Unix socket at `socket_path` (removing any stale file first),
/// starts the registry reaper and the keepalive task, then accepts
/// connections until a shutdown is signalled on the internal watch channel.
/// On shutdown, in-flight connection tasks get up to 5 seconds to finish
/// before the socket file is removed and the function returns.
///
/// # Errors
///
/// Returns an error if `reap_interval` or `keepalive_period` is zero, or if
/// the socket cannot be bound.
#[doc(hidden)]
pub async fn serve_with_options<D: SessionDriver>(
    socket_path: &str,
    idle_timeout_secs: u64,
    daemon_idle_secs: u64,
    max_per_pool: usize,
    reap_interval: Duration,
    keepalive_period: Duration,
) -> Result<()> {
    if reap_interval.is_zero() {
        anyhow::bail!("reap_interval must be > 0");
    }
    if keepalive_period.is_zero() {
        anyhow::bail!("keepalive_period must be > 0");
    }

    // Best-effort cleanup of a socket file left behind by a previous run;
    // a missing file is the normal case, so the result is ignored.
    let _ = std::fs::remove_file(socket_path);
    let listener = UnixListener::bind(socket_path)?;
    let registry = Arc::new(SessionRegistry::<D>::new(idle_timeout_secs));
    let pool = PoolManager::new(max_per_pool);
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);

    SessionRegistry::<D>::spawn_reaper(
        registry.clone(),
        shutdown_tx.clone(),
        daemon_idle_secs,
        reap_interval,
    );

    spawn_keepalive::<D>(
        registry.clone(),
        pool.clone(),
        shutdown_rx.clone(),
        keepalive_period,
    );

    tracing::info!(socket_path, "tfd listening");

    let mut tasks: JoinSet<()> = JoinSet::new();

    loop {
        tokio::select! {
            accept = listener.accept() => {
                match accept {
                    Ok((stream, _)) => {
                        let registry = registry.clone();
                        let pool = pool.clone();
                        let shutdown_tx = shutdown_tx.clone();
                        tasks.spawn(async move {
                            handle_connection::<D>(stream, registry, pool, shutdown_tx).await;
                        });
                    }
                    Err(e) => tracing::warn!(error = %e, "accept error"),
                }
            }
            // Reap connection tasks as they finish. A JoinSet retains every
            // completed task until it is joined, so without this arm a
            // long-running daemon would accumulate one entry per connection
            // ever served. When the set is empty `join_next` yields `None`,
            // the pattern fails to match, and this arm is simply disabled
            // for the current iteration.
            Some(joined) = tasks.join_next() => {
                if let Err(e) = joined {
                    tracing::warn!(error = %e, "connection task failed");
                }
            }
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    tracing::info!("shutting down");
                    break;
                }
            }
        }
    }

    // Give in-flight connections a bounded grace period; anything still
    // running after 5s is abandoned when `tasks` goes out of scope.
    let drain = async { while tasks.join_next().await.is_some() {} };
    let _ = tokio::time::timeout(Duration::from_secs(5), drain).await;

    let _ = std::fs::remove_file(socket_path);
    Ok(())
}

async fn handle_connection<D: SessionDriver>(
    stream: tokio::net::UnixStream,
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
    shutdown_tx: tokio::sync::watch::Sender<bool>,
) {
    let (reader, mut writer) = stream.into_split();
    let mut lines = BufReader::new(reader).lines();

    loop {
        match lines.next_line().await {
            Ok(Some(line)) => {
                let req: Request = match serde_json::from_str(&line) {
                    Ok(r) => r,
                    Err(e) => {
                        let resp = Response::err("?", format!("parse error: {e}"));
                        let _ = write_response(&mut writer, &resp).await;
                        continue;
                    }
                };
                let (resp, should_shutdown) =
                    handle_request::<D>(req, registry.clone(), pool.clone()).await;
                if write_response(&mut writer, &resp).await.is_err() {
                    break;
                }
                if should_shutdown {
                    let _ = shutdown_tx.send(true);
                    break;
                }
            }
            Ok(None) => break,
            Err(e) => {
                tracing::warn!(error = %e, "connection read error");
                break;
            }
        }
    }
}

async fn handle_request<D: SessionDriver>(
    req: Request,
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
) -> (Response, bool) {
    let id = req.id.clone();

    match req.cmd.as_str() {
        "session.acquire" => (acquire::<D>(&id, req.params, registry, pool).await, false),
        "session.release" => {
            let Some(sid) = req.session_id.as_deref() else {
                return (Response::err(&id, "session_id required"), false);
            };
            if pool.mark(sid, PoolState::Idle) {
                (Response::ok(&id, json!({"released": sid})), false)
            } else {
                (
                    Response::err(&id, format!("session not found: {sid}")),
                    false,
                )
            }
        }
        "session.list" => (
            Response::ok(&id, json!({"sessions": snapshot(&pool)})),
            false,
        ),
        "session.destroy" => {
            let Some(sid) = req.session_id.as_deref() else {
                return (Response::err(&id, "session_id required"), false);
            };
            let removed_pool = pool.remove(sid).is_some();
            let removed_reg = registry.destroy(sid);
            if removed_pool || removed_reg {
                (Response::ok(&id, json!({"destroyed": sid})), false)
            } else {
                (
                    Response::err(&id, format!("session not found: {sid}")),
                    false,
                )
            }
        }
        "daemon.status" => (
            Response::ok(&id, json!({"ok": true, "sessions": registry.count()})),
            false,
        ),
        "daemon.shutdown" => (Response::ok(&id, json!({"message": "shutting down"})), true),

        cmd => {
            let Some(sid) = req.session_id.as_deref() else {
                return (Response::err(&id, "session_id required"), false);
            };
            let session = match registry.with(sid, |h| h.session()) {
                Ok(s) => s,
                Err(e) => return (Response::err(&id, e.to_string()), false),
            };
            let (resp, shutdown) = session.handle(&id, cmd, &req.params).await;
            if !resp.ok && should_evict_on_business_error(cmd, resp.error.as_deref()) {
                let _ = pool.remove(sid);
                let _ = registry.destroy(sid);
            }
            (resp, shutdown)
        }
    }
}

/// Handles `session.acquire`: hands out a pooled session for the requested
/// `(site, host)` pair, launching a new one when the pool allows it.
///
/// Request parameters read from `params`:
/// * `site` (string, required).
/// * `host` (string, optional) - falls back to `crate::cli::DEFAULT_CDP_HOST`.
/// * `mode` (string, optional) - `"acquired"` (default) or `"in_use"`.
///
/// While the pool reports `Busy`, the call polls every 250ms and gives up
/// with an error response after 30 seconds.
async fn acquire<D: SessionDriver>(
    id: &str,
    params: serde_json::Value,
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
) -> Response {
    let Some(site_str) = params
        .get("site")
        .and_then(|v| v.as_str())
        .map(str::to_owned)
    else {
        return Response::err(id, "'site' required");
    };

    let host = params
        .get("host")
        .and_then(|v| v.as_str())
        .unwrap_or(crate::cli::DEFAULT_CDP_HOST)
        .to_string();

    // The site name is not validated here; that is left to D::launch
    // (e.g. TailFinSession::launch rejects unknown sites).

    let requested_mode = params
        .get("mode")
        .and_then(|v| v.as_str())
        .unwrap_or("acquired");
    let mode: PoolState = match requested_mode {
        "acquired" => PoolState::Acquired,
        "in_use" => PoolState::InUse,
        other => return Response::err(id, format!("invalid mode '{other}' (in_use|acquired)")),
    };

    let key = PoolKey::new(site_str.clone(), host.clone());
    let give_up_at = std::time::Instant::now() + Duration::from_secs(30);

    loop {
        match pool.acquire_atomic(&key, mode) {
            AcquireOutcome::Busy => {
                if std::time::Instant::now() >= give_up_at {
                    return Response::err(
                        id,
                        "pool at capacity; timed out after 30s waiting for an idle session",
                    );
                }
                tokio::time::sleep(Duration::from_millis(250)).await;
            }
            AcquireOutcome::Existing(sid) => {
                return Response::ok(id, json!({"session_id": sid, "reused": true}));
            }
            AcquireOutcome::CreateNew => {
                let launched = registry
                    .create(json!({"site": site_str, "host": host}))
                    .await;
                return match launched {
                    Err(e) => Response::err(id, format!("launch failed: {e}")),
                    Ok(sid) => {
                        pool.register(key, sid.clone(), mode);
                        Response::ok(id, json!({"session_id": sid, "reused": false}))
                    }
                };
            }
        }
    }
}

/// Decides whether a session should be evicted after a command returned an
/// error response.
///
/// Returns `true` for every failure except one: a `grok.ask` command whose
/// error text contains `x-statsig-id harvest timed out`.
fn should_evict_on_business_error(cmd: &str, error: Option<&str>) -> bool {
    // A Grok statsig harvest timeout is usually transient/environmental
    // (wrong tab attached, momentary event miss, Cloudflare hiccup), so the
    // session is kept: callers can retry in place and still benefit from any
    // warmed state once a later request succeeds.
    let is_statsig_timeout =
        matches!(error, Some(msg) if msg.contains("x-statsig-id harvest timed out"));
    let keep_session = cmd == "grok.ask" && is_statsig_timeout;
    !keep_session
}

/// Renders the pool's current contents as a list of JSON objects, one per
/// session, with the fields `session_id`, `site`, `host`, `state` and
/// `idle_secs`. The state is encoded as `"idle"`, `"acquired"` or `"in_use"`.
fn snapshot(pool: &PoolManager) -> Vec<serde_json::Value> {
    let mut rows = Vec::new();
    for (sid, key, state, idle) in pool.snapshot() {
        let label = match state {
            PoolState::InUse => "in_use",
            PoolState::Acquired => "acquired",
            PoolState::Idle => "idle",
        };
        rows.push(json!({
            "session_id": sid,
            "site": key.site,
            "host": key.host,
            "state": label,
            "idle_secs": idle,
        }));
    }
    rows
}

/// Spawns a background task that calls `touch_acquired` once per `period`
/// until shutdown is signalled.
///
/// The task stops when `true` is observed on `shutdown_rx`, or when the
/// watch channel is closed because every sender has been dropped.
///
/// `period` must be non-zero (`tokio::time::interval` panics on a zero
/// period); `serve_with_options` validates this before calling.
fn spawn_keepalive<D: SessionDriver>(
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    period: Duration,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        // An interval's first tick completes immediately; consume it so the
        // first keepalive pass happens one full period after startup.
        ticker.tick().await;
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    touch_acquired::<D>(&registry, &pool);
                }
                changed = shutdown_rx.changed() => {
                    // `changed()` returns Err once all senders are gone, and
                    // keeps doing so immediately on every call. Treat that
                    // as a shutdown too; otherwise this loop would spin.
                    if changed.is_err() || *shutdown_rx.borrow() {
                        break;
                    }
                }
            }
        }
    });
}

/// Performs a no-op `registry.with` lookup for every session the pool
/// currently reports as `Acquired`. Lookup failures are ignored.
///
/// NOTE(review): presumably `SessionRegistry::with` refreshes the session's
/// last-used time, which is what would keep acquired sessions from being
/// reaped as idle — confirm against the registry implementation.
fn touch_acquired<D: SessionDriver>(registry: &SessionRegistry<D>, pool: &PoolManager) {
    pool.snapshot()
        .into_iter()
        .filter(|(_, _, state, _)| *state == PoolState::Acquired)
        .for_each(|(sid, ..)| {
            let _ = registry.with(&sid, |_| ());
        });
}

/// Serializes `resp` as a single line of JSON (terminated by `\n`), writes it
/// to `writer`, and flushes.
///
/// # Errors
///
/// Returns an error if serialization fails or if writing/flushing the socket
/// fails.
async fn write_response(
    writer: &mut tokio::net::unix::OwnedWriteHalf,
    resp: &Response,
) -> Result<()> {
    // Encode straight to bytes and append the newline frame delimiter.
    let mut frame = serde_json::to_vec(resp)?;
    frame.push(b'\n');

    writer.write_all(&frame).await?;
    writer.flush().await?;
    Ok(())
}