use anyhow::{Result, bail};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;
use super::pool::{FirecrackerPool, PoolConfig};
use super::protocol::{DaemonCompatibilityMode, DaemonRequest, DaemonResponse};
use crate::permissions::CompatibilityMode;
use crate::vsock::{AGENT_PORT, VsockClient, VsockConnection};
fn to_internal_mode(mode: DaemonCompatibilityMode) -> CompatibilityMode {
match mode {
DaemonCompatibilityMode::Native => CompatibilityMode::Native,
DaemonCompatibilityMode::Claude => CompatibilityMode::ClaudeCode,
DaemonCompatibilityMode::Codex => CompatibilityMode::Codex,
DaemonCompatibilityMode::Gemini => CompatibilityMode::Gemini,
}
}
type ConnectionCache = Arc<Mutex<HashMap<String, VsockConnection>>>;
pub struct DaemonServer {
pool: Arc<FirecrackerPool>,
socket_path: PathBuf,
connections: ConnectionCache,
}
impl DaemonServer {
pub fn new(config: PoolConfig, kernel_path: PathBuf, rootfs_dir: PathBuf) -> Self {
let socket_path = Self::default_socket_path();
let pool = Arc::new(FirecrackerPool::new(config, kernel_path, rootfs_dir));
let connections = Arc::new(Mutex::new(HashMap::new()));
Self {
pool,
socket_path,
connections,
}
}
pub fn default_socket_path() -> PathBuf {
if let Some(home) = std::env::var_os("HOME") {
let dir = PathBuf::from(home).join(".agentkernel");
let _ = std::fs::create_dir_all(&dir);
dir.join("daemon.sock")
} else {
PathBuf::from("/tmp/agentkernel-daemon.sock")
}
}
pub fn is_running(socket_path: &Path) -> bool {
std::os::unix::net::UnixStream::connect(socket_path).is_ok()
}
pub async fn run(&self) -> Result<()> {
if Self::is_running(&self.socket_path) {
bail!(
"Daemon is already running at {}",
self.socket_path.display()
);
}
let _ = std::fs::remove_file(&self.socket_path);
if let Some(parent) = self.socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let listener = UnixListener::bind(&self.socket_path)?;
eprintln!("Daemon listening on {}", self.socket_path.display());
eprintln!("Warming up pool...");
self.pool.warm_up().await?;
let (warm, in_use) = self.pool.stats().await;
eprintln!("Pool ready: {} warm, {} in use", warm, in_use);
let pool_clone = Arc::clone(&self.pool);
tokio::spawn(async move {
pool_clone.run_maintenance().await;
});
loop {
match listener.accept().await {
Ok((stream, _)) => {
let pool = Arc::clone(&self.pool);
let connections = Arc::clone(&self.connections);
tokio::spawn(async move {
if let Err(e) = handle_client(stream, pool, connections).await {
eprintln!("Client error: {}", e);
}
});
}
Err(e) => {
eprintln!("Accept error: {}", e);
}
}
}
}
#[allow(dead_code)]
pub fn pool(&self) -> &Arc<FirecrackerPool> {
&self.pool
}
#[allow(dead_code)]
pub async fn shutdown(&self) {
self.pool.shutdown();
self.pool.destroy_all().await;
let _ = std::fs::remove_file(&self.socket_path);
}
}
async fn handle_client(
stream: UnixStream,
pool: Arc<FirecrackerPool>,
connections: ConnectionCache,
) -> Result<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
let n = reader.read_line(&mut line).await?;
if n == 0 {
break;
}
let request: DaemonRequest = match serde_json::from_str(&line) {
Ok(req) => req,
Err(e) => {
let response = DaemonResponse::error(format!("Invalid request: {}", e));
let json = serde_json::to_string(&response)? + "\n";
writer.write_all(json.as_bytes()).await?;
continue;
}
};
let response = handle_request(request, &pool, &connections).await;
let json = serde_json::to_string(&response)? + "\n";
writer.write_all(json.as_bytes()).await?;
if matches!(response, DaemonResponse::ShuttingDown) {
break;
}
}
Ok(())
}
async fn handle_request(
request: DaemonRequest,
pool: &FirecrackerPool,
connections: &ConnectionCache,
) -> DaemonResponse {
use super::protocol::DaemonBackend;
match request {
DaemonRequest::Acquire {
runtime,
backend,
compatibility_mode,
} => {
if !matches!(backend, DaemonBackend::Firecracker) {
return DaemonResponse::error(format!(
"Backend {:?} not yet supported in daemon mode",
backend
));
}
let internal_mode = to_internal_mode(compatibility_mode);
match pool.acquire_with_mode(&runtime, internal_mode).await {
Ok(vm) => DaemonResponse::Acquired {
id: vm.id,
cid: Some(vm.cid),
vsock_path: Some(vm.vsock_path.to_string_lossy().to_string()),
backend: DaemonBackend::Firecracker,
},
Err(e) => DaemonResponse::error(format!("Failed to acquire VM: {}", e)),
}
}
DaemonRequest::Release { id } => match pool.release(&id).await {
Ok(_) => DaemonResponse::Released,
Err(e) => DaemonResponse::error(format!("Failed to release VM: {}", e)),
},
DaemonRequest::Exec {
runtime,
command,
backend,
compatibility_mode,
} => {
if !matches!(backend, DaemonBackend::Firecracker) {
return DaemonResponse::error(format!(
"Backend {:?} not yet supported in daemon mode",
backend
));
}
let internal_mode = to_internal_mode(compatibility_mode);
let vm = match pool.acquire_with_mode(&runtime, internal_mode).await {
Ok(vm) => vm,
Err(e) => return DaemonResponse::error(format!("Failed to acquire VM: {}", e)),
};
let vsock_path = vm.vsock_path.to_string_lossy().to_string();
let result = {
let mut cache = connections.lock().await;
if let Some(conn) = cache.get_mut(&vsock_path) {
conn.run_command(&command).await
} else {
drop(cache);
match VsockConnection::connect(&vm.vsock_path, AGENT_PORT).await {
Ok(mut conn) => {
let result = conn.run_command(&command).await;
if result.is_ok() {
connections.lock().await.insert(vsock_path.clone(), conn);
}
result
}
Err(e) => {
let vsock_client = VsockClient::for_firecracker(&vm.vsock_path);
vsock_client.run_command(&command).await.map_err(|_| e)
}
}
}
};
let _ = pool.release(&vm.id).await;
match result {
Ok(run_result) => DaemonResponse::Executed {
exit_code: run_result.exit_code,
stdout: run_result.stdout,
stderr: run_result.stderr,
},
Err(e) => DaemonResponse::error(format!("Command failed: {}", e)),
}
}
DaemonRequest::Prewarm { compatibility_mode } => {
let internal_mode = to_internal_mode(compatibility_mode);
match pool.warm_up_for_agent(internal_mode).await {
Ok(_) => {
let (warm, _) = pool.stats().await;
DaemonResponse::Prewarmed {
compatibility_mode,
count: warm,
}
}
Err(e) => DaemonResponse::error(format!("Failed to prewarm: {}", e)),
}
}
DaemonRequest::Status => {
let (warm, in_use) = pool.stats().await;
let agent_stats = pool.stats_by_agent().await;
DaemonResponse::Status {
warm,
in_use,
min_warm: 3, max_warm: 5,
backends: vec!["firecracker".to_string()],
agent_stats,
}
}
DaemonRequest::Shutdown => {
pool.shutdown();
DaemonResponse::ShuttingDown
}
}
}