use anyhow::{Context, Result, bail};
use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Semaphore};
use tokio::time::sleep;
use crate::firecracker_client::{BootSource, Drive, FirecrackerClient, MachineConfig, VsockDevice};
use crate::permissions::CompatibilityMode;
use crate::vsock::VsockClient;
/// Lightweight, cloneable description of a VM that the pool has handed out.
///
/// It carries only the identifiers a caller needs to reach the guest; the
/// process handle itself stays inside the pool.
#[derive(Clone, Debug)]
pub struct VmHandle {
    /// Pool-assigned identifier of the VM.
    pub id: String,
    /// Guest vsock context ID configured for the VM.
    pub cid: u32,
    /// Host-side Unix socket path configured for the VM's vsock device.
    pub vsock_path: PathBuf,
}
/// Sizing and runtime settings for the VMs backing one agent flavour.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct AgentPoolConfig {
    /// Number of warm VMs the pool tries to keep ready for this agent.
    pub min_warm: usize,
    /// Upper bound on warm VMs for this agent.
    // NOTE(review): not read anywhere in this file — confirm whether it is enforced elsewhere.
    pub max_warm: usize,
    /// Runtime name; the pool uses it to pick the rootfs image for the VM.
    pub runtime: String,
    /// Guest memory size in MiB passed to the machine configuration.
    pub mem_size_mib: u64,
    /// Number of vCPUs passed to the machine configuration.
    pub vcpu_count: u32,
}

#[allow(dead_code)]
impl AgentPoolConfig {
    /// Shared constructor: every preset uses 512 MiB of memory and one vCPU,
    /// differing only in warm-pool bounds and runtime.
    fn preset(min_warm: usize, max_warm: usize, runtime: &str) -> Self {
        Self {
            min_warm,
            max_warm,
            runtime: runtime.to_owned(),
            mem_size_mib: 512,
            vcpu_count: 1,
        }
    }

    /// Preset for the Claude Code agent: 2–4 warm VMs on the `python` runtime.
    pub fn claude_code() -> Self {
        Self::preset(2, 4, "python")
    }

    /// Preset for the Codex agent: 1–3 warm VMs on the `python` runtime.
    pub fn codex() -> Self {
        Self::preset(1, 3, "python")
    }

    /// Preset for the Gemini agent: 1–3 warm VMs on the `python` runtime.
    pub fn gemini() -> Self {
        Self::preset(1, 3, "python")
    }

    /// Preset for native mode: 2–5 warm VMs on the `base` runtime.
    pub fn native() -> Self {
        Self::preset(2, 5, "base")
    }
}

impl Default for AgentPoolConfig {
    /// The default configuration is the native preset.
    fn default() -> Self {
        Self::native()
    }
}
/// Pool-wide settings for the Firecracker warm pool.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Warm VMs to keep ready when no per-agent pre-warming is configured.
    pub min_warm: usize,
    /// Maximum number of idle VMs kept when a VM is returned to the pool.
    pub max_warm: usize,
    /// Age in seconds after which a VM is no longer reused.
    pub max_age_secs: u64,
    /// Seconds between maintenance passes.
    pub health_interval_secs: u64,
    /// Runtime used for generic (non agent-specific) warm-up.
    pub default_runtime: String,
    /// Per-agent overrides; modes without an entry fall back to
    /// `AgentPoolConfig::default()`.
    pub agent_configs: HashMap<CompatibilityMode, AgentPoolConfig>,
    /// Agent modes to pre-warm. When non-empty, warm-up is done per agent
    /// instead of using `min_warm` / `default_runtime`.
    pub prewarm_agents: Vec<CompatibilityMode>,
}
impl Default for PoolConfig {
    /// Generic pool: 3–5 warm `base` VMs, a five-minute VM lifetime, a
    /// 30-second maintenance interval, and no agent-specific settings.
    fn default() -> Self {
        let default_runtime = String::from("base");
        Self {
            min_warm: 3,
            max_warm: 5,
            max_age_secs: 300,
            health_interval_secs: 30,
            default_runtime,
            agent_configs: HashMap::new(),
            prewarm_agents: Vec::new(),
        }
    }
}
#[allow(dead_code)]
impl PoolConfig {
    /// Builds a configuration with presets registered for the Native,
    /// Claude Code, Codex and Gemini modes. Only Native is pre-warmed.
    pub fn with_all_agents() -> Self {
        let agent_configs = HashMap::from([
            (CompatibilityMode::Native, AgentPoolConfig::native()),
            (CompatibilityMode::ClaudeCode, AgentPoolConfig::claude_code()),
            (CompatibilityMode::Codex, AgentPoolConfig::codex()),
            (CompatibilityMode::Gemini, AgentPoolConfig::gemini()),
        ]);
        Self {
            min_warm: 3,
            max_warm: 10,
            max_age_secs: 300,
            health_interval_secs: 30,
            default_runtime: String::from("base"),
            agent_configs,
            prewarm_agents: vec![CompatibilityMode::Native],
        }
    }

    /// Returns a copy of the configuration registered for `mode`, or
    /// `AgentPoolConfig::default()` when the mode has no entry.
    pub fn get_agent_config(&self, mode: CompatibilityMode) -> AgentPoolConfig {
        match self.agent_configs.get(&mode) {
            Some(registered) => registered.clone(),
            None => AgentPoolConfig::default(),
        }
    }
}
/// Pool-side record of a running Firecracker VM, including its process handle.
#[derive(Debug)]
pub struct PooledVm {
    /// Pool-assigned identifier of the VM.
    pub id: String,
    /// Guest vsock context ID.
    pub cid: u32,
    /// Host-side Unix socket path of the vsock device.
    pub vsock_path: PathBuf,
    /// Unix socket path of the Firecracker API for this VM.
    pub api_socket_path: PathBuf,
    /// Handle of the spawned Firecracker process.
    process: Child,
    /// Runtime the VM was started with.
    pub runtime: String,
    /// Agent mode the VM was started for.
    pub compatibility_mode: CompatibilityMode,
    /// When the VM finished starting; used for age-based eviction.
    pub created_at: Instant,
    /// Last time the VM was handed out or returned.
    pub last_used: Instant,
}
impl PooledVm {
    /// Reports whether the Firecracker process behind this VM is still running.
    ///
    /// The check shells out to `ps` and asks for the process state column.
    /// A plain `ps -p <pid>` is not enough: the pool holds the `Child` without
    /// reaping it, so a Firecracker that has exited lingers as a zombie and
    /// `ps -p` still succeeds for it. A state starting with `Z` (defunct) is
    /// therefore treated as dead.
    ///
    /// Returns `false` when `ps` cannot be run, exits unsuccessfully, or
    /// prints no state.
    pub fn is_alive(&self) -> bool {
        Command::new("ps")
            .arg("-o")
            .arg("stat=")
            .arg("-p")
            .arg(self.process.id().to_string())
            .output()
            .map(|out| {
                let state = String::from_utf8_lossy(&out.stdout);
                let state = state.trim();
                out.status.success() && !state.is_empty() && !state.starts_with('Z')
            })
            .unwrap_or(false)
    }

    /// Terminates the VM and cleans up after it.
    ///
    /// Kills and reaps the Firecracker process, then removes the API and vsock
    /// socket files. Every step is best-effort: errors (for example, the
    /// process already being gone) are deliberately ignored.
    pub fn kill(&mut self) {
        let _ = self.process.kill();
        let _ = self.process.wait();
        let _ = std::fs::remove_file(&self.api_socket_path);
        let _ = std::fs::remove_file(&self.vsock_path);
    }
}
/// Pool of Firecracker microVMs that keeps idle ("warm") VMs ready for reuse
/// and tracks the ones currently handed out.
pub struct FirecrackerPool {
    /// Idle VMs waiting to be acquired.
    warm_pool: Mutex<VecDeque<PooledVm>>,
    /// VMs currently handed out, keyed by VM id.
    in_use: Mutex<HashMap<String, PooledVm>>,
    /// Pool-wide and per-agent settings.
    config: PoolConfig,
    /// Limits how many VMs may be starting at the same time.
    start_semaphore: Semaphore,
    /// Counter that supplies the guest CID (and id suffix) of the next VM.
    next_cid: AtomicU32,
    /// Kernel image used as the boot source of every VM.
    kernel_path: PathBuf,
    /// Directory containing the `<runtime>.ext4` root filesystem images.
    rootfs_dir: PathBuf,
    /// Set once shutdown is requested; checked by warm-up and maintenance.
    shutdown: std::sync::atomic::AtomicBool,
}
impl FirecrackerPool {
/// Creates an empty pool.
///
/// No VM is started here. At most two VMs may start concurrently, and guest
/// CIDs are handed out starting at 100.
pub fn new(config: PoolConfig, kernel_path: PathBuf, rootfs_dir: PathBuf) -> Self {
    const MAX_CONCURRENT_STARTS: usize = 2;
    const FIRST_GUEST_CID: u32 = 100;

    let warm_pool = Mutex::new(VecDeque::new());
    let in_use = Mutex::new(HashMap::new());
    let start_semaphore = Semaphore::new(MAX_CONCURRENT_STARTS);
    let next_cid = AtomicU32::new(FIRST_GUEST_CID);
    let shutdown = std::sync::atomic::AtomicBool::new(false);

    Self {
        warm_pool,
        in_use,
        config,
        start_semaphore,
        next_cid,
        kernel_path,
        rootfs_dir,
        shutdown,
    }
}
/// Returns `(warm, in_use)`: the number of idle VMs and the number of VMs
/// currently handed out.
///
/// The two locks are taken one after the other, so the pair is not an
/// atomic snapshot.
pub async fn stats(&self) -> (usize, usize) {
    let warm_count = self.warm_pool.lock().await.len();
    let busy_count = self.in_use.lock().await.len();
    (warm_count, busy_count)
}
/// Counts the idle VMs per agent mode.
///
/// Keys are the short mode labels (`"native"`, `"claude"`, `"codex"`,
/// `"gemini"`, `"amp"`, `"pi"`); modes with no idle VM are absent from the
/// map. VMs that are currently handed out are not counted.
pub async fn stats_by_agent(&self) -> HashMap<String, usize> {
    let warm = self.warm_pool.lock().await;
    let counts = warm.iter().fold(HashMap::new(), |mut acc, vm| {
        let label = match vm.compatibility_mode {
            CompatibilityMode::Native => "native",
            CompatibilityMode::ClaudeCode => "claude",
            CompatibilityMode::Codex => "codex",
            CompatibilityMode::Gemini => "gemini",
            CompatibilityMode::Amp => "amp",
            CompatibilityMode::Pi => "pi",
        };
        *acc.entry(label.to_string()).or_default() += 1;
        acc
    });
    counts
}
/// Acquires a VM for `runtime` in native mode.
///
/// Shorthand for `acquire_with_mode(runtime, CompatibilityMode::Native)`.
#[allow(dead_code)]
pub async fn acquire(&self, runtime: &str) -> Result<VmHandle> {
    let native = CompatibilityMode::Native;
    self.acquire_with_mode(runtime, native).await
}
#[allow(dead_code)]
pub async fn acquire_for_agent(&self, mode: CompatibilityMode) -> Result<VmHandle> {
let agent_config = self.config.get_agent_config(mode);
self.acquire_with_mode(&agent_config.runtime, mode).await
}
/// Hands out a VM for `runtime`, preferring an idle one.
///
/// Selection order among idle VMs:
/// 1. a live VM with the same runtime *and* the same agent mode;
/// 2. otherwise any live VM with the same runtime.
///
/// When no idle VM qualifies, a fresh one is started. The chosen VM is
/// recorded as in use and a [`VmHandle`] describing it is returned. Idle
/// VMs that fail the liveness check are skipped but left in the pool.
///
/// # Errors
/// Returns the error from starting a new VM when one has to be started.
pub async fn acquire_with_mode(
    &self,
    runtime: &str,
    mode: CompatibilityMode,
) -> Result<VmHandle> {
    let reused = {
        let mut warm = self.warm_pool.lock().await;
        let exact = warm.iter().enumerate().find_map(|(i, vm)| {
            (vm.runtime == runtime && vm.compatibility_mode == mode && vm.is_alive())
                .then_some(i)
        });
        let slot = match exact {
            Some(i) => Some(i),
            None => warm
                .iter()
                .position(|vm| vm.runtime == runtime && vm.is_alive()),
        };
        slot.and_then(|i| warm.remove(i)).map(|mut vm| {
            vm.last_used = Instant::now();
            vm
        })
    };

    // Fall back to a cold start only after the pool lock has been released.
    let vm = match reused {
        Some(vm) => vm,
        None => self.start_vm_with_mode(runtime, mode).await?,
    };

    let handle = VmHandle {
        id: vm.id.clone(),
        cid: vm.cid,
        vsock_path: vm.vsock_path.clone(),
    };
    self.in_use.lock().await.insert(vm.id.clone(), vm);
    Ok(handle)
}
/// Returns the VM identified by `id` to the pool.
///
/// The VM goes back to the warm pool only if its process is still alive, it
/// is younger than `max_age_secs`, and the warm pool holds fewer than
/// `max_warm` VMs; otherwise it is killed. An unknown `id` is ignored.
///
/// Always returns `Ok(())`.
pub async fn release(&self, id: &str) -> Result<()> {
    let Some(mut vm) = self.in_use.lock().await.remove(id) else {
        return Ok(());
    };

    let age = vm.created_at.elapsed();
    let reusable = vm.is_alive() && age < Duration::from_secs(self.config.max_age_secs);
    if !reusable {
        vm.kill();
        return Ok(());
    }

    vm.last_used = Instant::now();
    let mut warm = self.warm_pool.lock().await;
    if warm.len() >= self.config.max_warm {
        vm.kill();
    } else {
        warm.push_back(vm);
    }
    Ok(())
}
/// Starts a new VM for `runtime` in native mode.
async fn start_vm(&self, runtime: &str) -> Result<PooledVm> {
    let native = CompatibilityMode::Native;
    self.start_vm_with_mode(runtime, native).await
}
async fn start_vm_with_mode(&self, runtime: &str, mode: CompatibilityMode) -> Result<PooledVm> {
let _permit = self.start_semaphore.acquire().await?;
let agent_config = self.config.get_agent_config(mode);
let cid = self.next_cid.fetch_add(1, Ordering::SeqCst);
let mode_str = match mode {
CompatibilityMode::Native => "native",
CompatibilityMode::ClaudeCode => "claude",
CompatibilityMode::Codex => "codex",
CompatibilityMode::Gemini => "gemini",
CompatibilityMode::Amp => "amp",
CompatibilityMode::Pi => "pi",
};
let id = format!("pool-{}-{}-{}", mode_str, runtime, cid);
let api_socket_path = PathBuf::from(format!("/tmp/agentkernel-{}.sock", id));
let vsock_path = PathBuf::from(format!("/tmp/agentkernel-{}-vsock.sock", id));
let _ = std::fs::remove_file(&api_socket_path);
let _ = std::fs::remove_file(&vsock_path);
let firecracker_bin = Self::find_firecracker()?;
let process = Command::new(&firecracker_bin)
.arg("--api-sock")
.arg(&api_socket_path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| {
format!("Failed to start firecracker: {}", firecracker_bin.display())
})?;
for _ in 0..50 {
if api_socket_path.exists() {
break;
}
sleep(Duration::from_millis(100)).await;
}
if !api_socket_path.exists() {
bail!("Firecracker API socket not available after 5 seconds");
}
let client = FirecrackerClient::new(&api_socket_path);
let boot_source = BootSource {
kernel_image_path: self.kernel_path.to_string_lossy().to_string(),
boot_args: "console=ttyS0 reboot=k panic=1 pci=off init=/init quiet loglevel=4 i8042.nokbd i8042.noaux".to_string(),
};
client.set_boot_source(&boot_source).await?;
let rootfs_path = self.rootfs_dir.join(format!("{}.ext4", runtime));
if !rootfs_path.exists() {
bail!("Rootfs not found: {}", rootfs_path.display());
}
let drive = Drive {
drive_id: "rootfs".to_string(),
path_on_host: rootfs_path.to_string_lossy().to_string(),
is_root_device: true,
is_read_only: false,
};
client.set_drive("rootfs", &drive).await?;
let machine = MachineConfig {
vcpu_count: agent_config.vcpu_count,
mem_size_mib: agent_config.mem_size_mib,
};
client.set_machine_config(&machine).await?;
let vsock = VsockDevice {
guest_cid: cid,
uds_path: vsock_path.to_string_lossy().to_string(),
};
client.set_vsock(&vsock).await?;
client.start_instance().await?;
let vsock_client = VsockClient::for_firecracker(vsock_path.clone());
for i in 0..100 {
if vsock_client.ping().await.unwrap_or(false) {
break;
}
if i == 99 {
bail!("Guest agent not available after 10 seconds");
}
if i % 20 == 0 && i > 0 {
eprintln!("Waiting for guest agent... ({}s)", i / 10);
}
sleep(Duration::from_millis(100)).await;
}
let now = Instant::now();
Ok(PooledVm {
id,
cid,
vsock_path,
api_socket_path,
process,
runtime: runtime.to_string(),
compatibility_mode: mode,
created_at: now,
last_used: now,
})
}
fn find_firecracker() -> Result<PathBuf> {
if let Ok(path) = std::env::var("FIRECRACKER_BIN") {
let path = PathBuf::from(path);
if path.exists() {
return Ok(path);
}
}
if let Some(home) = std::env::var_os("HOME") {
let local_fc = PathBuf::from(home).join(".local/share/agentkernel/bin/firecracker");
if local_fc.exists() {
return Ok(local_fc);
}
}
let locations = [
"/usr/local/bin/firecracker",
"/usr/bin/firecracker",
"./firecracker",
];
for loc in locations {
let path = PathBuf::from(loc);
if path.exists() {
return Ok(path);
}
}
if let Ok(output) = Command::new("which").arg("firecracker").output()
&& output.status.success()
{
let path = String::from_utf8_lossy(&output.stdout).trim().to_string();
if !path.is_empty() {
return Ok(PathBuf::from(path));
}
}
bail!("Firecracker binary not found")
}
/// Tops the warm pool up to its configured minimum.
///
/// When `prewarm_agents` is non-empty this delegates to
/// `warm_up_agents`. Otherwise it starts native-mode VMs on the default
/// runtime until the pool holds `min_warm` VMs. Individual start failures
/// are logged to stderr and skipped, and the loop stops early once shutdown
/// has been requested.
pub async fn warm_up(&self) -> Result<()> {
    if !self.config.prewarm_agents.is_empty() {
        return self.warm_up_agents().await;
    }

    let have = self.warm_pool.lock().await.len();
    let mut remaining = self.config.min_warm.saturating_sub(have);
    while remaining > 0 && !self.shutdown.load(Ordering::SeqCst) {
        remaining -= 1;
        match self.start_vm(&self.config.default_runtime).await {
            Ok(vm) => {
                self.warm_pool.lock().await.push_back(vm);
            }
            Err(err) => {
                eprintln!("Failed to warm up VM: {}", err);
            }
        }
    }
    Ok(())
}
pub async fn warm_up_agents(&self) -> Result<()> {
for mode in &self.config.prewarm_agents {
if self.shutdown.load(Ordering::SeqCst) {
break;
}
let agent_config = self.config.get_agent_config(*mode);
let mode_str = match mode {
CompatibilityMode::Native => "native",
CompatibilityMode::ClaudeCode => "claude",
CompatibilityMode::Codex => "codex",
CompatibilityMode::Gemini => "gemini",
CompatibilityMode::Amp => "amp",
CompatibilityMode::Pi => "pi",
};
let current = {
let pool = self.warm_pool.lock().await;
pool.iter()
.filter(|vm| vm.compatibility_mode == *mode)
.count()
};
let needed = agent_config.min_warm.saturating_sub(current);
if needed == 0 {
continue;
}
eprintln!("Pre-warming {} VMs for {} mode...", needed, mode_str);
for _ in 0..needed {
if self.shutdown.load(Ordering::SeqCst) {
break;
}
match self.start_vm_with_mode(&agent_config.runtime, *mode).await {
Ok(vm) => {
self.warm_pool.lock().await.push_back(vm);
}
Err(e) => {
eprintln!("Failed to warm up {} VM: {}", mode_str, e);
}
}
}
}
Ok(())
}
pub async fn warm_up_for_agent(&self, mode: CompatibilityMode) -> Result<()> {
let agent_config = self.config.get_agent_config(mode);
let mode_str = match mode {
CompatibilityMode::Native => "native",
CompatibilityMode::ClaudeCode => "claude",
CompatibilityMode::Codex => "codex",
CompatibilityMode::Gemini => "gemini",
CompatibilityMode::Amp => "amp",
CompatibilityMode::Pi => "pi",
};
let current = {
let pool = self.warm_pool.lock().await;
pool.iter()
.filter(|vm| vm.compatibility_mode == mode)
.count()
};
let needed = agent_config.min_warm.saturating_sub(current);
if needed == 0 {
return Ok(());
}
eprintln!("Pre-warming {} VMs for {} mode...", needed, mode_str);
for _ in 0..needed {
if self.shutdown.load(Ordering::SeqCst) {
break;
}
match self.start_vm_with_mode(&agent_config.runtime, mode).await {
Ok(vm) => {
self.warm_pool.lock().await.push_back(vm);
}
Err(e) => {
eprintln!("Failed to warm up {} VM: {}", mode_str, e);
}
}
}
Ok(())
}
pub async fn run_maintenance(&self) {
let interval = Duration::from_secs(self.config.health_interval_secs);
while !self.shutdown.load(Ordering::SeqCst) {
sleep(interval).await;
{
let mut pool = self.warm_pool.lock().await;
let max_age = Duration::from_secs(self.config.max_age_secs);
pool.retain(|vm| {
let alive = vm.is_alive();
let young = vm.created_at.elapsed() < max_age;
alive && young
});
}
let _ = self.warm_up().await;
}
}
/// Requests shutdown by setting the shutdown flag.
///
/// Warm-up and maintenance loops check the flag and stop; running VMs are
/// not touched here.
pub fn shutdown(&self) {
    let flag = &self.shutdown;
    flag.store(true, Ordering::SeqCst);
}
/// Kills every VM the pool knows about: first the idle ones, then the ones
/// currently handed out. Both collections are left empty.
#[allow(dead_code)]
pub async fn destroy_all(&self) {
    let mut warm = self.warm_pool.lock().await;
    warm.drain(..).for_each(|mut vm| vm.kill());
    // Release the warm-pool lock before taking the in-use lock.
    drop(warm);

    let mut busy = self.in_use.lock().await;
    busy.drain().for_each(|(_, mut vm)| vm.kill());
}
}
impl Drop for FirecrackerPool {
fn drop(&mut self) {
self.shutdown();
}
}