use anyhow::{Context, Result};
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams};
use serde_json::json;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Settings that control how the warm pod pool is sized and how each pod is built.
#[derive(Debug, Clone)]
pub struct KubernetesPoolConfig {
    /// Number of warm (pre-created, idle) pods the pool tries to keep available.
    pub warm_pool_size: usize,
    /// Upper bound on warm plus active pods; replenishing stops once it is reached.
    pub max_pool_size: usize,
    /// Container image used for every pool pod.
    pub image: String,
    /// Kubernetes namespace in which pool pods are listed, created and deleted.
    pub namespace: String,
    /// Optional `runtimeClassName` set on the pod spec; `None` leaves it unset.
    pub runtime_class: Option<String>,
    /// Memory limit of the sandbox container, in mebibytes (rendered as `<n>Mi`).
    pub memory_mb: u64,
    /// CPU limit of the sandbox container, in whole cores (rendered as millicores).
    pub vcpus: u32,
}
impl Default for KubernetesPoolConfig {
    /// Returns the stock configuration: 10 warm pods, at most 50 pods in total,
    /// the `alpine:3.20` image in the `agentkernel` namespace, no runtime class,
    /// and a limit of 512 MiB of memory and one vCPU per pod.
    fn default() -> Self {
        let image = String::from("alpine:3.20");
        let namespace = String::from("agentkernel");

        Self {
            warm_pool_size: 10,
            max_pool_size: 50,
            image,
            namespace,
            runtime_class: None,
            memory_mb: 512,
            vcpus: 1,
        }
    }
}
/// Point-in-time snapshot of the pool's pod counts and configured limits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolStats {
    /// Number of pods currently labelled `agentkernel/pool=warm`.
    pub warm: usize,
    /// Number of pods currently labelled `agentkernel/pool=active`.
    pub active: usize,
    /// Configured number of warm pods the pool aims to keep ready.
    pub target_warm: usize,
    /// Configured upper bound on warm plus active pods.
    pub max_total: usize,
}
/// A pool of pre-created ("warm") Kubernetes pods that can be handed out as sandboxes.
///
/// Pool membership is tracked through the `agentkernel/pool` pod label (`warm` or
/// `active`) rather than in-process state.
pub struct KubernetesPool {
    /// Kubernetes API client used for every pod operation.
    client: Client,
    /// Sizing and pod-template settings.
    config: KubernetesPoolConfig,
    /// Serialises the pool operations that list and then modify pods.
    lock: Arc<Mutex<()>>,
}
impl KubernetesPool {
/// Creates a pool that talks to the cluster through `client` and is sized by `config`.
///
/// No pods are created here; this only assembles the pool handle.
pub fn new(client: Client, config: KubernetesPoolConfig) -> Self {
    let lock = Arc::new(Mutex::new(()));
    Self {
        client,
        config,
        lock,
    }
}
/// Returns the labels shared by every warm pod: the `agentkernel/managed-by`
/// ownership marker and `agentkernel/pool=warm`.
fn warm_labels() -> BTreeMap<String, String> {
    const BASE_LABELS: [(&str, &str); 2] = [
        ("agentkernel/managed-by", "agentkernel"),
        ("agentkernel/pool", "warm"),
    ];

    BASE_LABELS
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
/// Builds the manifest for the warm pod with the given pool `index`.
///
/// The pod is named `agentkernel-warm-{index}`, carries the warm-pool labels plus an
/// `agentkernel/warm-index` label, and runs one `sandbox` container that idles on
/// `sleep infinity`. The container runs as non-root UID 1000 with privilege escalation
/// disabled and all capabilities dropped, and is limited to the configured memory and
/// CPU. The pod never restarts and does not mount a service-account token.
fn build_warm_pod(&self, index: usize) -> Pod {
    use k8s_openapi::api::core::v1::{Capabilities, ResourceRequirements, SecurityContext};
    use k8s_openapi::apimachinery::pkg::api::resource::Quantity;

    let pod_name = format!("agentkernel-warm-{}", index);

    let mut labels = Self::warm_labels();
    labels.insert("agentkernel/warm-index".to_string(), index.to_string());

    let security_context = SecurityContext {
        privileged: Some(false),
        allow_privilege_escalation: Some(false),
        run_as_non_root: Some(true),
        run_as_user: Some(1000),
        capabilities: Some(Capabilities {
            drop: Some(vec!["ALL".to_string()]),
            ..Default::default()
        }),
        ..Default::default()
    };

    // Widen before multiplying: `vcpus * 1000` in u32 overflows for very large core
    // counts (a panic in debug builds, a silently wrong limit in release builds).
    let cpu_millicores = u64::from(self.config.vcpus) * 1000;

    let mut resource_limits = BTreeMap::new();
    resource_limits.insert(
        "memory".to_string(),
        Quantity(format!("{}Mi", self.config.memory_mb)),
    );
    resource_limits.insert("cpu".to_string(), Quantity(format!("{}m", cpu_millicores)));

    let container = Container {
        name: "sandbox".to_string(),
        image: Some(self.config.image.clone()),
        // Keep the container alive and idle until the pod is handed out or deleted.
        command: Some(vec![
            "sh".to_string(),
            "-c".to_string(),
            "sleep infinity".to_string(),
        ]),
        security_context: Some(security_context),
        resources: Some(ResourceRequirements {
            limits: Some(resource_limits),
            ..Default::default()
        }),
        ..Default::default()
    };

    Pod {
        metadata: ObjectMeta {
            name: Some(pod_name),
            namespace: Some(self.config.namespace.clone()),
            labels: Some(labels),
            ..Default::default()
        },
        spec: Some(PodSpec {
            containers: vec![container],
            restart_policy: Some("Never".to_string()),
            automount_service_account_token: Some(false),
            runtime_class_name: self.config.runtime_class.clone(),
            ..Default::default()
        }),
        ..Default::default()
    }
}
/// Tops the pool up to `warm_pool_size` warm pods.
///
/// Existing warm pods are counted and only the shortfall is created, never exceeding
/// `max_pool_size` pods (warm plus active) in total. New pods are numbered after the
/// highest `agentkernel/warm-index` found on any warm or active pod, because pod names
/// are derived from that index and an acquired pod keeps its `agentkernel-warm-N` name.
///
/// Individual creation failures are logged to stderr and do not abort the call.
///
/// # Errors
///
/// Returns an error if the existing pool pods cannot be listed.
pub async fn initialize(&self) -> Result<()> {
    let _guard = self.lock.lock().await;
    let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);

    // One set-based selector gives a single snapshot of warm and active pods.
    let lp = ListParams::default().labels("agentkernel/pool in (warm,active)");
    let pooled = pods
        .list(&lp)
        .await
        .context("Failed to list pool pods during initialization")?;

    let total = pooled.items.len();
    let warm_count = pooled
        .items
        .iter()
        .filter(|p| {
            p.metadata
                .labels
                .as_ref()
                .and_then(|l| l.get("agentkernel/pool"))
                .map(String::as_str)
                == Some("warm")
        })
        .count();

    // Continue numbering after the highest index already in use; start at 0 when the
    // pool is empty. Using the pod count instead would collide whenever indices have
    // gaps or active pods still hold higher-numbered names.
    let next_index = pooled
        .items
        .iter()
        .filter_map(|p| {
            p.metadata
                .labels
                .as_ref()
                .and_then(|l| l.get("agentkernel/warm-index"))
                .and_then(|v| v.parse::<usize>().ok())
        })
        .max()
        .map_or(0, |highest| highest.saturating_add(1));

    let needed = self.config.warm_pool_size.saturating_sub(warm_count);
    let available_capacity = self.config.max_pool_size.saturating_sub(total);
    let to_create = needed.min(available_capacity);

    for offset in 0..to_create {
        let index = next_index + offset;
        let pod = self.build_warm_pod(index);
        if let Err(e) = pods.create(&PostParams::default(), &pod).await {
            eprintln!("Warning: Failed to create warm pod {}: {}", index, e);
        }
    }
    Ok(())
}
/// Claims a warm pod for `sandbox_name` and returns the pod's name.
///
/// The first warm pod that is `Running` and not already marked for deletion is
/// relabelled to `agentkernel/pool=active` and tagged with
/// `agentkernel/sandbox=<sandbox_name>`. Only labels are patched, so the returned name
/// is still the pod's original `agentkernel-warm-N` name.
///
/// # Errors
///
/// Returns an error if the warm pods cannot be listed, if no usable warm pod exists,
/// if the chosen pod has no name, or if the relabel patch fails.
pub async fn acquire(&self, sandbox_name: &str) -> Result<String> {
    let _guard = self.lock.lock().await;
    let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);

    let lp = ListParams::default().labels("agentkernel/pool=warm");
    let warm_pods = pods
        .list(&lp)
        .await
        .context("Failed to list warm pods")?;

    let warm_pod = warm_pods
        .items
        .into_iter()
        .find(|p| {
            let running =
                p.status.as_ref().and_then(|s| s.phase.as_deref()) == Some("Running");
            // A pod being deleted can still report `Running` while it terminates;
            // handing it out would give the caller a sandbox that is about to vanish.
            let terminating = p.metadata.deletion_timestamp.is_some();
            running && !terminating
        })
        .ok_or_else(|| anyhow::anyhow!("No warm pods available in pool"))?;

    let pod_name = warm_pod
        .metadata
        .name
        .ok_or_else(|| anyhow::anyhow!("Warm pod has no name"))?;

    let patch = json!({
        "metadata": {
            "labels": {
                "agentkernel/pool": "active",
                "agentkernel/sandbox": sandbox_name,
            }
        }
    });
    pods.patch(&pod_name, &PatchParams::default(), &Patch::Merge(&patch))
        .await
        .context("Failed to relabel warm pod to active")?;

    Ok(pod_name)
}
pub async fn release(&self, pod_name: &str) -> Result<()> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
let _ = pods.delete(pod_name, &DeleteParams::default()).await;
self.replenish().await?;
Ok(())
}
/// Creates warm pods until the pool holds `warm_pool_size` of them, without letting
/// warm plus active pods exceed `max_pool_size`.
///
/// New pods are numbered after the highest `agentkernel/warm-index` found on any warm
/// or active pod. Active pods must be included: acquiring a pod only changes its
/// labels, so it keeps its `agentkernel-warm-N` name and a new pod reusing that index
/// would fail to be created.
///
/// Individual creation failures are logged to stderr and do not abort the call.
///
/// # Errors
///
/// Returns an error if the warm or active pods cannot be listed.
pub async fn replenish(&self) -> Result<()> {
    let _guard = self.lock.lock().await;
    let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);

    let warm_lp = ListParams::default().labels("agentkernel/pool=warm");
    let warm_pods = pods.list(&warm_lp).await?;
    let warm_count = warm_pods.items.len();

    let active_lp = ListParams::default().labels("agentkernel/pool=active");
    let active_pods = pods.list(&active_lp).await?;
    let active_count = active_pods.items.len();

    let total = warm_count + active_count;
    if total >= self.config.max_pool_size {
        return Ok(());
    }

    let needed = self.config.warm_pool_size.saturating_sub(warm_count);
    let available_capacity = self.config.max_pool_size.saturating_sub(total);
    let to_create = needed.min(available_capacity);

    // Highest index in use across warm AND active pods; start at 0 when none carry one.
    let next_index = warm_pods
        .items
        .iter()
        .chain(active_pods.items.iter())
        .filter_map(|p| {
            p.metadata
                .labels
                .as_ref()
                .and_then(|l| l.get("agentkernel/warm-index"))
                .and_then(|v| v.parse::<usize>().ok())
        })
        .max()
        .map_or(0, |highest| highest.saturating_add(1));

    for offset in 0..to_create {
        let pod = self.build_warm_pod(next_index + offset);
        if let Err(e) = pods.create(&PostParams::default(), &pod).await {
            eprintln!("Warning: Failed to replenish warm pod: {}", e);
        }
    }
    Ok(())
}
/// Reports how many warm and active pods currently exist, alongside the configured
/// warm target and total cap.
///
/// The two counts come from two separate list calls made without taking the pool lock.
///
/// # Errors
///
/// Returns an error if either list call fails.
pub async fn stats(&self) -> Result<PoolStats> {
    let api: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);

    let warm_selector = ListParams::default().labels("agentkernel/pool=warm");
    let warm = api.list(&warm_selector).await?.items.len();

    let active_selector = ListParams::default().labels("agentkernel/pool=active");
    let active = api.list(&active_selector).await?.items.len();

    let snapshot = PoolStats {
        warm,
        active,
        target_warm: self.config.warm_pool_size,
        max_total: self.config.max_pool_size,
    };
    Ok(snapshot)
}
/// Spawns a background Tokio task that calls [`Self::replenish`] on a 30-second
/// interval for as long as the task lives.
///
/// Replenish failures are logged to stderr and the loop keeps going. The loop never
/// exits on its own; the returned handle is the caller's only reference to the task.
pub fn spawn_replenish_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
    let period = std::time::Duration::from_secs(30);

    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            match self.replenish().await {
                Ok(()) => {}
                Err(err) => eprintln!("Warm pool replenish error: {}", err),
            }
        }
    })
}
pub async fn cleanup(&self) -> Result<()> {
let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
let lp = ListParams::default()
.labels("agentkernel/managed-by=agentkernel,agentkernel/pool=warm");
let warm_pods = pods.list(&lp).await?;
for pod in warm_pods.items {
if let Some(name) = pod.metadata.name {
let _ = pods.delete(&name, &DeleteParams::default()).await;
}
}
Ok(())
}
}