#![allow(dead_code)]
use anyhow::{Result, bail};
use std::collections::VecDeque;
use std::io::Write;
use std::process::{Child, ChildStdin, Stdio};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{Duration, interval};
use crate::docker_backend::{ContainerRuntime, ContainerSandbox, detect_container_runtime};
use crate::permissions::Permissions;
/// Marker text echoed after every command sent through a `PersistentShell`.
/// `PersistentShell::run_command` reads the shell's stdout until this text
/// shows up and cuts the captured output at that point.
const OUTPUT_SENTINEL: &str = "___AGENTKERNEL_DONE___";
/// A long-lived `sh` process started inside a container through the runtime's
/// `exec -i`, to which commands are fed over stdin.
pub struct PersistentShell {
/// Handle to the spawned `<runtime> exec -i <container> sh` process.
child: Child,
/// Write end of the shell's stdin; command lines are written here.
stdin: ChildStdin,
/// Name of the container the shell was started in, as passed to `new`.
container_name: String,
}
impl PersistentShell {
pub fn new(runtime: ContainerRuntime, container_name: &str) -> Result<Self> {
let mut child = std::process::Command::new(runtime.cmd())
.args(["exec", "-i", container_name, "sh"])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to get stdin"))?;
Ok(Self {
child,
stdin,
container_name: container_name.to_string(),
})
}
pub fn run_command(&mut self, cmd: &[String]) -> Result<String> {
let cmd_str = cmd.join(" ");
let full_cmd = format!("({}) 2>&1; echo '{}'\n", cmd_str, OUTPUT_SENTINEL);
self.stdin.write_all(full_cmd.as_bytes())?;
self.stdin.flush()?;
let stdout = self
.child
.stdout
.as_mut()
.ok_or_else(|| anyhow::anyhow!("Failed to get stdout"))?;
let mut output = String::new();
let mut buf = [0u8; 4096];
use std::io::Read;
loop {
let n = stdout.read(&mut buf)?;
if n == 0 {
break;
}
output.push_str(&String::from_utf8_lossy(&buf[..n]));
if output.contains(OUTPUT_SENTINEL) {
break;
}
}
if let Some(pos) = output.find(OUTPUT_SENTINEL) {
output.truncate(pos);
}
Ok(output.trim_end().to_string())
}
pub fn is_alive(&mut self) -> bool {
matches!(self.child.try_wait(), Ok(None))
}
}
impl Drop for PersistentShell {
    /// Terminates the shell process and reaps it. Both steps are best-effort:
    /// their errors are discarded.
    fn drop(&mut self) {
        self.child.kill().ok();
        self.child.wait().ok();
    }
}
/// Default `target_size`: how many warm containers `ContainerPool::new` aims for.
const DEFAULT_POOL_SIZE: usize = 10;
/// Default `max_size`: upper bound on warm containers kept by `release`.
const DEFAULT_MAX_POOL_SIZE: usize = 50;
/// Image used for pooled containers unless `with_config` overrides it.
const DEFAULT_IMAGE: &str = "alpine:3.20";
/// Period, in milliseconds, of the background cleanup task's timer.
const GC_INTERVAL_MS: u64 = 1000;
/// Maximum number of queued container names removed per cleanup pass.
const GC_BATCH_SIZE: usize = 10;
/// A container handed out by the pool, optionally with a persistent shell
/// attached for running commands.
pub struct PooledContainer {
/// Short pool name (e.g. `pool-3`); the runtime-level container name is
/// `agentkernel-<name>`.
pub name: String,
/// Trimmed stdout of `<runtime> inspect -f {{.Id}} agentkernel-<name>`,
/// captured when the container was created.
#[allow(dead_code)] pub container_id: String,
/// Container runtime whose CLI is used to exec into this container.
runtime: ContainerRuntime,
/// Shell set up by `init_persistent_shell`; `None` until that succeeds.
persistent_shell: Option<std::sync::Mutex<PersistentShell>>,
}
impl std::fmt::Debug for PooledContainer {
    /// Formats the container for diagnostics. The persistent shell is shown
    /// only as a boolean saying whether one is attached.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_shell = self.persistent_shell.is_some();
        let mut builder = f.debug_struct("PooledContainer");
        builder.field("name", &self.name);
        builder.field("container_id", &self.container_id);
        builder.field("runtime", &self.runtime);
        builder.field("persistent_shell", &has_shell);
        builder.finish()
    }
}
impl PooledContainer {
pub async fn run_command(&self, cmd: &[String]) -> Result<String> {
if let Some(ref shell_mutex) = self.persistent_shell
&& let Ok(mut shell) = shell_mutex.lock()
&& shell.is_alive()
{
return shell.run_command(cmd);
}
self.run_command_exec(cmd).await
}
pub async fn run_command_exec(&self, cmd: &[String]) -> Result<String> {
let runtime_cmd = self.runtime.cmd();
let container_name = format!("agentkernel-{}", self.name);
let mut args = vec!["exec", &container_name];
let cmd_refs: Vec<&str> = cmd.iter().map(|s| s.as_str()).collect();
args.extend(cmd_refs);
let output = std::process::Command::new(runtime_cmd)
.args(&args)
.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!("Command failed: {}", stderr);
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
pub fn init_persistent_shell(&mut self) -> Result<()> {
let container_name = format!("agentkernel-{}", self.name);
let shell = PersistentShell::new(self.runtime, &container_name)?;
self.persistent_shell = Some(std::sync::Mutex::new(shell));
Ok(())
}
}
/// Pool of pre-started containers with a background cleanup queue.
pub struct ContainerPool {
/// Idle containers ready to be handed out by `acquire`.
warm_pool: Arc<Mutex<VecDeque<PooledContainer>>>,
/// Pool names of containers waiting to be removed with `<runtime> rm -f`.
cleanup_queue: Arc<Mutex<VecDeque<String>>>,
/// Acquired before each container start; created with 10 permits in `new`.
start_semaphore: Arc<Semaphore>,
/// Source of the numeric suffix in `pool-<n>` names made by `create_container`.
name_counter: AtomicUsize,
/// Container runtime detected in `new`.
runtime: ContainerRuntime,
/// Image that pooled containers are started from.
image: String,
/// Number of warm containers `warm_pool_to_target` fills up to.
target_size: usize,
/// `release` keeps a returned container only while the warm pool holds fewer
/// than this many.
max_size: usize,
/// Set by `start` and cleared by `stop`; the cleanup task loops while it is true.
running: Arc<std::sync::atomic::AtomicBool>,
}
impl ContainerPool {
pub fn new() -> Result<Self> {
let runtime = detect_container_runtime()
.ok_or_else(|| anyhow::anyhow!("No container runtime available"))?;
Ok(Self {
warm_pool: Arc::new(Mutex::new(VecDeque::new())),
cleanup_queue: Arc::new(Mutex::new(VecDeque::new())),
start_semaphore: Arc::new(Semaphore::new(10)), name_counter: AtomicUsize::new(0),
runtime,
image: DEFAULT_IMAGE.to_string(),
target_size: DEFAULT_POOL_SIZE,
max_size: DEFAULT_MAX_POOL_SIZE,
running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
})
}
pub fn with_config(target_size: usize, max_size: usize, image: &str) -> Result<Self> {
let mut pool = Self::new()?;
pool.target_size = target_size;
pool.max_size = max_size;
pool.image = image.to_string();
Ok(pool)
}
pub async fn start(&self) -> Result<()> {
self.running.store(true, Ordering::SeqCst);
self.warm_pool_to_target().await?;
self.spawn_gc_task();
Ok(())
}
pub async fn stop(&self) -> Result<()> {
self.running.store(false, Ordering::SeqCst);
{
let mut warm = self.warm_pool.lock().await;
let mut cleanup = self.cleanup_queue.lock().await;
while let Some(container) = warm.pop_front() {
cleanup.push_back(container.name);
}
}
self.gc_all().await;
Ok(())
}
pub async fn acquire(&self) -> Result<PooledContainer> {
{
let mut pool = self.warm_pool.lock().await;
if let Some(container) = pool.pop_front() {
self.spawn_refill_task();
return Ok(container);
}
}
self.create_container().await
}
pub async fn release(&self, container: PooledContainer) {
let pool_size = {
let pool = self.warm_pool.lock().await;
pool.len()
};
if pool_size < self.max_size {
let mut pool = self.warm_pool.lock().await;
pool.push_back(container);
} else {
let mut cleanup = self.cleanup_queue.lock().await;
cleanup.push_back(container.name);
}
}
pub async fn release_for_cleanup(&self, name: String) {
let mut cleanup = self.cleanup_queue.lock().await;
cleanup.push_back(name);
}
pub async fn stats(&self) -> PoolStats {
let warm = self.warm_pool.lock().await;
let cleanup = self.cleanup_queue.lock().await;
PoolStats {
warm_count: warm.len(),
cleanup_pending: cleanup.len(),
target_size: self.target_size,
max_size: self.max_size,
}
}
async fn create_container(&self) -> Result<PooledContainer> {
let _permit = self.start_semaphore.acquire().await?;
let id = self.name_counter.fetch_add(1, Ordering::SeqCst);
let name = format!("pool-{}", id);
let mut sandbox = ContainerSandbox::with_runtime(&name, self.runtime);
sandbox
.start_with_permissions(&self.image, &Permissions::default())
.await?;
let container_name = format!("agentkernel-{}", name);
let output = std::process::Command::new(self.runtime.cmd())
.args(["inspect", "-f", "{{.Id}}", &container_name])
.output()?;
let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
let mut container = PooledContainer {
name,
container_id,
runtime: self.runtime,
persistent_shell: None,
};
if let Err(e) = container.init_persistent_shell() {
eprintln!("Warning: Failed to init persistent shell: {}", e);
}
Ok(container)
}
async fn warm_pool_to_target(&self) -> Result<()> {
let current_size = {
let pool = self.warm_pool.lock().await;
pool.len()
};
let needed = self.target_size.saturating_sub(current_size);
if needed == 0 {
return Ok(());
}
eprintln!("Warming pool: creating {} containers...", needed);
let mut handles = Vec::new();
for _ in 0..needed {
let pool = self.clone_for_task();
handles.push(tokio::spawn(async move { pool.create_container().await }));
}
let mut created = 0;
for handle in handles {
if let Ok(Ok(container)) = handle.await {
let mut pool = self.warm_pool.lock().await;
pool.push_back(container);
created += 1;
}
}
eprintln!("Pool warmed: {} containers ready", created);
Ok(())
}
fn spawn_refill_task(&self) {
let pool = self.clone_for_task();
tokio::spawn(async move {
let _ = pool.warm_pool_to_target().await;
});
}
fn spawn_gc_task(&self) {
let pool = self.clone_for_task();
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(GC_INTERVAL_MS));
while pool.running.load(Ordering::SeqCst) {
interval.tick().await;
pool.gc_batch().await;
}
});
}
async fn gc_batch(&self) {
let to_cleanup: Vec<String> = {
let mut queue = self.cleanup_queue.lock().await;
let mut batch = Vec::new();
for _ in 0..GC_BATCH_SIZE {
if let Some(name) = queue.pop_front() {
batch.push(name);
} else {
break;
}
}
batch
};
if to_cleanup.is_empty() {
return;
}
let runtime = self.runtime;
let handles: Vec<_> = to_cleanup
.into_iter()
.map(|name| {
let container_name = format!("agentkernel-{}", name);
tokio::spawn(async move {
let _ = std::process::Command::new(runtime.cmd())
.args(["rm", "-f", &container_name])
.output();
})
})
.collect();
for handle in handles {
let _ = handle.await;
}
}
async fn gc_all(&self) {
loop {
let remaining = {
let queue = self.cleanup_queue.lock().await;
queue.len()
};
if remaining == 0 {
break;
}
self.gc_batch().await;
}
}
fn clone_for_task(&self) -> ContainerPoolHandle {
ContainerPoolHandle {
warm_pool: Arc::clone(&self.warm_pool),
cleanup_queue: Arc::clone(&self.cleanup_queue),
start_semaphore: Arc::clone(&self.start_semaphore),
name_counter: self.name_counter.load(Ordering::SeqCst),
runtime: self.runtime,
image: self.image.clone(),
target_size: self.target_size,
running: Arc::clone(&self.running),
}
}
}
/// Copy of a `ContainerPool`'s state, built by `clone_for_task`, that can be
/// moved into spawned tasks.
struct ContainerPoolHandle {
/// Shared with the pool: idle containers ready to be handed out.
warm_pool: Arc<Mutex<VecDeque<PooledContainer>>>,
/// Shared with the pool: names of containers waiting to be removed.
cleanup_queue: Arc<Mutex<VecDeque<String>>>,
/// Shared with the pool: acquired before each container start.
start_semaphore: Arc<Semaphore>,
/// Value of the pool's `name_counter` when this handle was created
/// (a plain copy, not shared with the pool).
name_counter: usize,
/// Container runtime copied from the pool.
runtime: ContainerRuntime,
/// Image that containers are started from.
image: String,
/// Number of warm containers to fill up to.
target_size: usize,
/// Shared with the pool: running flag checked by the cleanup task.
running: Arc<std::sync::atomic::AtomicBool>,
}
impl ContainerPoolHandle {
async fn warm_pool_to_target(&self) -> Result<()> {
let current_size = {
let pool = self.warm_pool.lock().await;
pool.len()
};
let needed = self.target_size.saturating_sub(current_size);
if needed == 0 {
return Ok(());
}
for i in 0..needed {
let _permit = self.start_semaphore.acquire().await?;
let name = format!("pool-{}", self.name_counter + i);
let mut sandbox = ContainerSandbox::with_runtime(&name, self.runtime);
if sandbox
.start_with_permissions(&self.image, &Permissions::default())
.await
.is_ok()
{
let container_name = format!("agentkernel-{}", name);
if let Ok(output) = std::process::Command::new(self.runtime.cmd())
.args(["inspect", "-f", "{{.Id}}", &container_name])
.output()
{
let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
let mut container = PooledContainer {
name,
container_id,
runtime: self.runtime,
persistent_shell: None,
};
let _ = container.init_persistent_shell();
let mut pool = self.warm_pool.lock().await;
pool.push_back(container);
}
}
}
Ok(())
}
async fn gc_batch(&self) {
let to_cleanup: Vec<String> = {
let mut queue = self.cleanup_queue.lock().await;
let mut batch = Vec::new();
for _ in 0..GC_BATCH_SIZE {
if let Some(name) = queue.pop_front() {
batch.push(name);
} else {
break;
}
}
batch
};
if to_cleanup.is_empty() {
return;
}
let runtime = self.runtime;
for name in to_cleanup {
let container_name = format!("agentkernel-{}", name);
let _ = std::process::Command::new(runtime.cmd())
.args(["rm", "-f", &container_name])
.output();
}
}
async fn create_container(&self) -> Result<PooledContainer> {
let _permit = self.start_semaphore.acquire().await?;
let id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let name = format!("pool-{}", id);
let mut sandbox = ContainerSandbox::with_runtime(&name, self.runtime);
sandbox
.start_with_permissions(&self.image, &Permissions::default())
.await?;
let container_name = format!("agentkernel-{}", name);
let output = std::process::Command::new(self.runtime.cmd())
.args(["inspect", "-f", "{{.Id}}", &container_name])
.output()?;
let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
let mut container = PooledContainer {
name,
container_id,
runtime: self.runtime,
persistent_shell: None,
};
let _ = container.init_persistent_shell();
Ok(container)
}
}
/// Point-in-time counts and configured sizes of a container pool.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of containers currently in the warm pool.
    pub warm_count: usize,
    /// Number of container names waiting in the cleanup queue.
    pub cleanup_pending: usize,
    /// Configured target size of the warm pool.
    pub target_size: usize,
    /// Configured maximum size of the warm pool.
    pub max_size: usize,
}

impl std::fmt::Display for PoolStats {
    /// Renders a one-line summary: warm count over target size, then the
    /// number of containers pending cleanup. `max_size` is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            warm_count,
            cleanup_pending,
            target_size,
            ..
        } = self;
        f.write_fmt(format_args!(
            "Pool: {}/{} warm, {} pending cleanup",
            warm_count, target_size, cleanup_pending
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `PoolStats` from positional values to keep the tests short.
    fn stats_of(
        warm_count: usize,
        cleanup_pending: usize,
        target_size: usize,
        max_size: usize,
    ) -> PoolStats {
        PoolStats {
            warm_count,
            cleanup_pending,
            target_size,
            max_size,
        }
    }

    // Display shows "<warm>/<target> warm" and the pending-cleanup count.
    #[test]
    fn test_pool_stats_display() {
        let rendered = stats_of(5, 2, 10, 50).to_string();
        assert!(rendered.contains("5/10 warm"));
        assert!(rendered.contains("2 pending cleanup"));
    }

    // Display also works when every count is zero.
    #[test]
    fn test_pool_stats_display_zero() {
        let rendered = stats_of(0, 0, 5, 20).to_string();
        assert!(rendered.contains("0/5 warm"));
        assert!(rendered.contains("0 pending cleanup"));
    }

    // The derived Debug output lists every field with its value.
    #[test]
    fn test_pool_stats_debug() {
        let rendered = format!("{:?}", stats_of(3, 1, 5, 10));
        for fragment in [
            "warm_count: 3",
            "cleanup_pending: 1",
            "target_size: 5",
            "max_size: 10",
        ] {
            assert!(rendered.contains(fragment));
        }
    }

    // Cloning copies every field.
    #[test]
    fn test_pool_stats_clone() {
        let original = stats_of(5, 2, 10, 50);
        let PoolStats {
            warm_count,
            cleanup_pending,
            target_size,
            max_size,
        } = original.clone();
        assert_eq!(warm_count, 5);
        assert_eq!(cleanup_pending, 2);
        assert_eq!(target_size, 10);
        assert_eq!(max_size, 50);
    }

    // Pins the module's default constants.
    #[test]
    fn test_default_constants() {
        assert_eq!(DEFAULT_POOL_SIZE, 10);
        assert_eq!(DEFAULT_MAX_POOL_SIZE, 50);
        assert_eq!(DEFAULT_IMAGE, "alpine:3.20");
        assert_eq!(GC_INTERVAL_MS, 1000);
        assert_eq!(GC_BATCH_SIZE, 10);
    }

    // The sentinel is wrapped in triple underscores and names the project.
    #[test]
    fn test_output_sentinel_is_unique() {
        let sentinel = OUTPUT_SENTINEL;
        assert!(sentinel.starts_with("___"));
        assert!(sentinel.ends_with("___"));
        assert!(sentinel.contains("AGENTKERNEL"));
    }

    // `with_config` stores the supplied sizes and image (needs a runtime).
    #[test]
    fn test_container_pool_with_config_values() {
        let Some(_runtime) = detect_container_runtime() else {
            eprintln!("Skipping test: No container runtime available");
            return;
        };
        let configured = ContainerPool::with_config(3, 15, "python:3.12-alpine").unwrap();
        assert_eq!(configured.target_size, 3);
        assert_eq!(configured.max_size, 15);
        assert_eq!(configured.image, "python:3.12-alpine");
    }

    // `new` applies the default sizes and image (needs a runtime).
    #[test]
    fn test_container_pool_default_values() {
        let Some(_runtime) = detect_container_runtime() else {
            eprintln!("Skipping test: No container runtime available");
            return;
        };
        let defaults = ContainerPool::new().unwrap();
        assert_eq!(defaults.target_size, DEFAULT_POOL_SIZE);
        assert_eq!(defaults.max_size, DEFAULT_MAX_POOL_SIZE);
        assert_eq!(defaults.image, DEFAULT_IMAGE);
    }

    // End-to-end: start, acquire, run a command, release, stop.
    #[tokio::test]
    #[ignore]
    async fn test_pool_basic() {
        let pool = ContainerPool::with_config(2, 5, "alpine:3.20").unwrap();
        pool.start().await.unwrap();

        let container = pool.acquire().await.unwrap();
        assert!(!container.name.is_empty());

        let argv = vec!["echo".to_string(), "hello".to_string()];
        let output = container.run_command(&argv).await.unwrap();
        assert!(output.contains("hello"));

        pool.release(container).await;
        let stats = pool.stats().await;
        assert!(stats.warm_count >= 1);

        pool.stop().await.unwrap();
    }

    // Repeated acquire/run/release cycles keep working.
    #[tokio::test]
    #[ignore]
    async fn test_pool_acquire_release_cycle() {
        let pool = ContainerPool::with_config(2, 5, "alpine:3.20").unwrap();
        pool.start().await.unwrap();

        for i in 0..3 {
            let expected = format!("iteration-{}", i);
            let container = pool.acquire().await.unwrap();
            let argv = vec!["echo".to_string(), expected.clone()];
            let output = container.run_command(&argv).await.unwrap();
            assert!(output.contains(&expected));
            pool.release(container).await;
        }

        pool.stop().await.unwrap();
    }

    // Stats reflect the configuration and a released container.
    #[tokio::test]
    #[ignore]
    async fn test_pool_stats_after_operations() {
        let pool = ContainerPool::with_config(2, 5, "alpine:3.20").unwrap();
        pool.start().await.unwrap();

        let initial = pool.stats().await;
        assert_eq!(initial.target_size, 2);
        assert_eq!(initial.max_size, 5);

        let container = pool.acquire().await.unwrap();
        pool.release(container).await;

        let after_release = pool.stats().await;
        assert!(after_release.warm_count >= 1);

        pool.stop().await.unwrap();
    }
}