use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use night_fury_daemon_core::protocol::{Request, Response};
use night_fury_daemon_core::{SessionDriver, SessionRegistry};
use serde_json::json;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use tokio::task::JoinSet;
use crate::driver::TailFinSession;
use crate::pool::{AcquireOutcome, PoolKey, PoolManager, PoolState};
/// Starts the daemon using the production [`TailFinSession`] driver.
///
/// Convenience entry point: every argument is forwarded unchanged to
/// [`serve_with_driver`], which picks the default reaper/keepalive periods.
pub async fn serve(
    socket_path: &str,
    idle_timeout_secs: u64,
    daemon_idle_secs: u64,
    max_per_pool: usize,
) -> Result<()> {
    let daemon = serve_with_driver::<TailFinSession>(
        socket_path,
        idle_timeout_secs,
        daemon_idle_secs,
        max_per_pool,
    );
    daemon.await
}
/// Starts the daemon with a caller-chosen [`SessionDriver`] and default
/// background-task periods.
///
/// The reaper runs every 60 seconds. The keepalive period is one third of
/// `idle_timeout_secs`, with the timeout floored at 30 seconds first, so the
/// period is never shorter than 10 seconds.
pub async fn serve_with_driver<D: SessionDriver>(
    socket_path: &str,
    idle_timeout_secs: u64,
    daemon_idle_secs: u64,
    max_per_pool: usize,
) -> Result<()> {
    let reap_interval = Duration::from_secs(60);
    let keepalive_period = Duration::from_secs(idle_timeout_secs.max(30) / 3);
    serve_with_options::<D>(
        socket_path,
        idle_timeout_secs,
        daemon_idle_secs,
        max_per_pool,
        reap_interval,
        keepalive_period,
    )
    .await
}
/// Core daemon loop with tunable background-task periods.
///
/// Binds a Unix socket at `socket_path`, removing any existing file there
/// first. It then starts the registry reaper (driven by `reap_interval`) and
/// the keepalive task (driven by `keepalive_period`). Each accepted connection
/// is served on its own task until the shared shutdown flag becomes `true`.
///
/// On shutdown, in-flight connection tasks get up to 5 seconds to finish.
/// Whatever remains is aborted when the `JoinSet` is dropped, and the socket
/// file is removed.
///
/// # Errors
/// Fails if either period is zero, or if the socket cannot be bound.
#[doc(hidden)]
pub async fn serve_with_options<D: SessionDriver>(
    socket_path: &str,
    idle_timeout_secs: u64,
    daemon_idle_secs: u64,
    max_per_pool: usize,
    reap_interval: Duration,
    keepalive_period: Duration,
) -> Result<()> {
    // Pause applied after a failed accept() so persistent errors (e.g. fd
    // exhaustion) do not turn the accept loop into a busy spin.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_millis(100);

    crate::handshake::mark_daemon_started();
    if reap_interval.is_zero() {
        anyhow::bail!("reap_interval must be > 0");
    }
    if keepalive_period.is_zero() {
        anyhow::bail!("keepalive_period must be > 0");
    }
    // Remove a leftover socket file so bind() does not fail; errors such as
    // "file not found" are deliberately ignored.
    let _ = std::fs::remove_file(socket_path);
    let listener = UnixListener::bind(socket_path)?;
    let registry = Arc::new(SessionRegistry::<D>::new(idle_timeout_secs));
    let pool = PoolManager::new(max_per_pool);
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
    SessionRegistry::<D>::spawn_reaper(
        registry.clone(),
        shutdown_tx.clone(),
        daemon_idle_secs,
        reap_interval,
    );
    spawn_keepalive::<D>(
        registry.clone(),
        pool.clone(),
        shutdown_rx.clone(),
        keepalive_period,
    );
    tracing::info!(socket_path, "tfd listening");
    let mut tasks: JoinSet<()> = JoinSet::new();
    loop {
        tokio::select! {
            accept = listener.accept() => {
                match accept {
                    Ok((stream, _)) => {
                        let registry = registry.clone();
                        let pool = pool.clone();
                        let shutdown_tx = shutdown_tx.clone();
                        tasks.spawn(async move {
                            handle_connection::<D>(stream, registry, pool, shutdown_tx).await;
                        });
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "accept error");
                        tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                    }
                }
            }
            // A JoinSet stores each finished task's output until it is joined.
            // Reap completed connection tasks here so a long-lived daemon does
            // not keep one entry per connection it has ever served.
            Some(res) = tasks.join_next(), if !tasks.is_empty() => {
                if let Err(e) = res {
                    tracing::warn!(error = %e, "connection task failed");
                }
            }
            changed = shutdown_rx.changed() => {
                // `changed()` only errors if every sender is gone; treat that
                // as shutdown too rather than looping on an immediate error.
                if changed.is_err() || *shutdown_rx.borrow() {
                    tracing::info!("shutting down");
                    break;
                }
            }
        }
    }
    // Give in-flight connections a bounded grace period. Tasks still running
    // afterwards are aborted when `tasks` is dropped.
    let drain = async { while tasks.join_next().await.is_some() {} };
    let _ = tokio::time::timeout(Duration::from_secs(5), drain).await;
    let _ = std::fs::remove_file(socket_path);
    Ok(())
}
/// Serves one client connection over a newline-delimited JSON protocol.
///
/// Each input line is parsed as a [`Request`], and exactly one [`Response`]
/// line is written back. The loop ends on any of:
/// - EOF;
/// - a read error;
/// - a write error;
/// - a request whose handler asks for shutdown. The shutdown flag is then
///   raised even if the client already hung up.
async fn handle_connection<D: SessionDriver>(
    stream: tokio::net::UnixStream,
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
    shutdown_tx: tokio::sync::watch::Sender<bool>,
) {
    let (reader, mut writer) = stream.into_split();
    let mut lines = BufReader::new(reader).lines();
    loop {
        let line = match lines.next_line().await {
            Ok(Some(line)) => line,
            Ok(None) => break,
            Err(e) => {
                tracing::warn!(error = %e, "connection read error");
                break;
            }
        };
        let req: Request = match serde_json::from_str(&line) {
            Ok(r) => r,
            Err(e) => {
                // No request id can be recovered from malformed input, hence "?".
                let resp = Response::err("?", format!("parse error: {e}"));
                // If even the error reply cannot be written, the peer is gone.
                if write_response(&mut writer, &resp).await.is_err() {
                    break;
                }
                continue;
            }
        };
        let (resp, should_shutdown) =
            handle_request::<D>(req, registry.clone(), pool.clone()).await;
        let write_result = write_response(&mut writer, &resp).await;
        // Honour a shutdown request even when the acknowledgement could not be
        // delivered, e.g. a client that sends `daemon.shutdown` and disconnects.
        if should_shutdown {
            let _ = shutdown_tx.send(true);
            break;
        }
        if write_result.is_err() {
            break;
        }
    }
}
/// Dispatches one parsed request and returns `(response, shutdown_requested)`.
///
/// Commands handled here:
/// - `daemon.handshake`, `daemon.status`, `daemon.shutdown`;
/// - `session.acquire`, `session.release`, `session.list`, `session.destroy`.
///
/// Any other command is forwarded to the session named by `req.session_id`.
/// The returned flag is `true` for `daemon.shutdown`, or whenever the
/// session's own handler returns `true`.
async fn handle_request<D: SessionDriver>(
    req: Request,
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
) -> (Response, bool) {
    let id = req.id.clone();
    match req.cmd.as_str() {
        "daemon.handshake" => (crate::handshake::handshake_response(&id), false),
        "session.acquire" => (acquire::<D>(&id, req.params, registry, pool).await, false),
        "session.release" => {
            let Some(sid) = req.session_id.as_deref() else {
                return (Response::err(&id, "session_id required"), false);
            };
            // Releasing only flips the pool state; the session stays alive.
            if pool.mark(sid, PoolState::Idle) {
                (Response::ok(&id, json!({"released": sid})), false)
            } else {
                (
                    Response::err(&id, format!("session not found: {sid}")),
                    false,
                )
            }
        }
        "session.list" => (
            Response::ok(&id, json!({"sessions": snapshot(&pool)})),
            false,
        ),
        "session.destroy" => {
            let Some(sid) = req.session_id.as_deref() else {
                return (Response::err(&id, "session_id required"), false);
            };
            // Remove from both sides; success if either side knew the id.
            let removed_pool = pool.remove(sid).is_some();
            let removed_reg = registry.destroy(sid);
            if removed_pool || removed_reg {
                (Response::ok(&id, json!({"destroyed": sid})), false)
            } else {
                (
                    Response::err(&id, format!("session not found: {sid}")),
                    false,
                )
            }
        }
        "daemon.status" => (
            Response::ok(&id, json!({"ok": true, "sessions": registry.count()})),
            false,
        ),
        "daemon.shutdown" => (Response::ok(&id, json!({"message": "shutting down"})), true),
        cmd => {
            let Some(sid) = req.session_id.as_deref() else {
                return (Response::err(&id, "session_id required"), false);
            };
            let session = match registry.with(sid, |h| h.session()) {
                Ok(s) => s,
                Err(e) => {
                    // The registry reaper is spawned without the pool, so the
                    // registry can drop a session the pool still lists. Purge
                    // that dangling pool entry so it is not handed out again.
                    let _ = pool.remove(sid);
                    return (Response::err(&id, e.to_string()), false);
                }
            };
            let (resp, shutdown) = session.handle(&id, cmd, &req.params).await;
            // Failed commands tear the session down unless the error is
            // explicitly classified as non-fatal.
            if !resp.ok && should_evict_on_business_error(cmd, resp.error.as_deref()) {
                let _ = pool.remove(sid);
                let _ = registry.destroy(sid);
            }
            (resp, shutdown)
        }
    }
}
/// Handles `session.acquire` by returning a pooled session for the requested
/// `(site, host)` key, launching a new one when the pool allows it.
///
/// Params:
/// - `site`: required.
/// - `host`: defaults to `crate::cli::DEFAULT_CDP_HOST`.
/// - `mode`: `"acquired"` by default, or `"in_use"`.
///
/// If the pool reports `Busy`, this polls every 250 ms for up to 30 s before
/// giving up. Successful replies carry `session_id` and `reused`.
async fn acquire<D: SessionDriver>(
    id: &str,
    params: serde_json::Value,
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
) -> Response {
    let site_str = match params.get("site").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => return Response::err(id, "'site' required"),
    };
    let host = params
        .get("host")
        .and_then(|v| v.as_str())
        .unwrap_or(crate::cli::DEFAULT_CDP_HOST)
        .to_string();
    let mode: PoolState = match params
        .get("mode")
        .and_then(|v| v.as_str())
        .unwrap_or("acquired")
    {
        "in_use" => PoolState::InUse,
        "acquired" => PoolState::Acquired,
        other => return Response::err(id, format!("invalid mode '{other}' (in_use|acquired)")),
    };
    let key = PoolKey::new(site_str.clone(), host.clone());
    // Keep in sync with the "30s" wording of the timeout message below.
    let deadline = std::time::Instant::now() + Duration::from_secs(30);
    loop {
        match pool.acquire_atomic(&key, mode) {
            AcquireOutcome::Existing(sid) => {
                // The registry reaper runs without access to the pool, so a
                // pooled id can outlive its registry session. Probe it with the
                // same no-op lookup the keepalive uses. If it is gone, drop the
                // stale pool entry and ask the pool again.
                if registry.with(&sid, |_| ()).is_err() {
                    let _ = pool.remove(&sid);
                    continue;
                }
                return Response::ok(id, json!({"session_id": sid, "reused": true}));
            }
            AcquireOutcome::CreateNew => {
                let launch_params = json!({"site": site_str, "host": host});
                return match registry.create(launch_params).await {
                    Ok(sid) => {
                        pool.register(key, sid.clone(), mode);
                        Response::ok(id, json!({"session_id": sid, "reused": false}))
                    }
                    Err(e) => Response::err(id, format!("launch failed: {e}")),
                };
            }
            AcquireOutcome::Busy => {
                if std::time::Instant::now() >= deadline {
                    return Response::err(
                        id,
                        "pool at capacity; timed out after 30s waiting for an idle session",
                    );
                }
                tokio::time::sleep(Duration::from_millis(250)).await;
            }
        }
    }
}
/// Decides whether a session whose command failed should be evicted.
///
/// Returns `false` only for a `grok.ask` failure whose error message mentions
/// `x-statsig-id harvest timed out`. Every other failure, including one with
/// no error message at all, returns `true`.
fn should_evict_on_business_error(cmd: &str, error: Option<&str>) -> bool {
    let statsig_harvest_timeout = cmd == "grok.ask"
        && matches!(error, Some(msg) if msg.contains("x-statsig-id harvest timed out"));
    !statsig_harvest_timeout
}
/// Renders the pool's current entries as JSON objects for `session.list`.
///
/// Each object has the keys `session_id`, `site`, `host`, `state` and
/// `idle_secs`. `state` is one of `"idle"`, `"acquired"` or `"in_use"`.
fn snapshot(pool: &PoolManager) -> Vec<serde_json::Value> {
    let mut rows = Vec::new();
    for (session_id, key, state, idle_secs) in pool.snapshot() {
        let label = match state {
            PoolState::Idle => "idle",
            PoolState::Acquired => "acquired",
            PoolState::InUse => "in_use",
        };
        rows.push(json!({
            "session_id": session_id,
            "site": key.site,
            "host": key.host,
            "state": label,
            "idle_secs": idle_secs,
        }));
    }
    rows
}
/// Spawns a detached task that calls [`touch_acquired`] every `period` until
/// the shutdown flag becomes `true` or every shutdown sender is dropped.
///
/// `period` must be non-zero; `tokio::time::interval` panics on a zero
/// period, and `serve_with_options` rejects zero before calling this.
fn spawn_keepalive<D: SessionDriver>(
    registry: Arc<SessionRegistry<D>>,
    pool: PoolManager,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    period: Duration,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        // The first tick of an interval completes immediately; consume it so
        // the first touch happens one full `period` after startup.
        ticker.tick().await;
        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    touch_acquired::<D>(&registry, &pool);
                }
                changed = shutdown_rx.changed() => {
                    // `changed()` errors once every sender is dropped, and then
                    // keeps resolving immediately. Treat that as shutdown
                    // instead of spinning.
                    if changed.is_err() || *shutdown_rx.borrow() {
                        break;
                    }
                }
            }
        }
    });
}
/// Touches, through the registry, every pool session currently in the
/// `Acquired` state.
///
/// Each touch is a no-op `registry.with` lookup. It presumably refreshes the
/// registry's idle timer for that session (TODO: confirm in
/// `SessionRegistry::with`). Lookup failures are ignored.
fn touch_acquired<D: SessionDriver>(registry: &SessionRegistry<D>, pool: &PoolManager) {
    let acquired = pool
        .snapshot()
        .into_iter()
        .filter(|(_, _, state, _)| *state == PoolState::Acquired);
    for (sid, ..) in acquired {
        let _ = registry.with(&sid, |_| ());
    }
}
/// Writes `resp` to `writer` as one line of JSON and then flushes the writer.
///
/// The trailing `\n` is the message delimiter that `handle_connection` also
/// uses when reading requests.
///
/// # Errors
/// Fails if serialization fails, or if the write or flush fails.
async fn write_response(
    writer: &mut tokio::net::unix::OwnedWriteHalf,
    resp: &Response,
) -> Result<()> {
    let mut frame = serde_json::to_vec(resp)?;
    frame.push(b'\n');
    writer.write_all(&frame).await?;
    writer.flush().await?;
    Ok(())
}