use anyhow::{Context, Result, bail};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use super::nomad::shared_http_client;
struct PoolNomadClient {
addr: String,
token: Option<String>,
http: &'static reqwest::Client,
}
impl PoolNomadClient {
fn new(addr: &str, token: Option<String>) -> Self {
Self {
addr: addr.to_string(),
token,
http: shared_http_client(),
}
}
async fn get(&self, path: &str) -> Result<serde_json::Value> {
let url = format!("{}{}", self.addr, path);
let mut req = self.http.get(&url);
if let Some(ref token) = self.token {
req = req.header("X-Nomad-Token", token);
}
let resp = req.send().await.context("Nomad API GET failed")?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
bail!("Nomad API error: {}", body);
}
resp.json().await.context("Failed to parse Nomad response")
}
async fn put(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
let url = format!("{}{}", self.addr, path);
let mut req = self.http.put(&url).json(body);
if let Some(ref token) = self.token {
req = req.header("X-Nomad-Token", token);
}
let resp = req.send().await.context("Nomad API PUT failed")?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
bail!("Nomad API error: {}", body);
}
resp.json().await.context("Failed to parse Nomad response")
}
async fn post(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
let url = format!("{}{}", self.addr, path);
let mut req = self.http.post(&url).json(body);
if let Some(ref token) = self.token {
req = req.header("X-Nomad-Token", token);
}
let resp = req.send().await.context("Nomad API POST failed")?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
bail!("Nomad API error: {}", body);
}
resp.json().await.context("Failed to parse Nomad response")
}
async fn delete(&self, path: &str) -> Result<()> {
let url = format!("{}{}", self.addr, path);
let mut req = self.http.delete(&url);
if let Some(ref token) = self.token {
req = req.header("X-Nomad-Token", token);
}
let resp = req.send().await.context("Nomad API DELETE failed")?;
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
bail!("Nomad API error: {}", body);
}
Ok(())
}
}
/// Settings for the Nomad warm pool.
///
/// `Default` reads `NOMAD_ADDR` and `NOMAD_TOKEN` from the environment; every other field
/// gets a fixed default.
pub struct NomadPoolConfig {
    /// Nomad HTTP API address (default `http://127.0.0.1:4646`).
    pub nomad_addr: String,
    /// Optional ACL token sent as `X-Nomad-Token`.
    pub nomad_token: Option<String>,
    /// Number of idle (warm) sandbox jobs to keep dispatched.
    pub warm_pool_size: usize,
    /// Maximum total pool size (warm + in use); reported as `NomadPoolStats::max_total`.
    pub max_pool_size: usize,
    /// Container image used when `driver` is `"docker"`.
    pub image: String,
    /// Nomad task driver name (e.g. `"docker"`).
    pub driver: String,
    /// Nomad datacenter the pool job is registered in.
    pub datacenter: String,
    /// Memory requested per sandbox task, in MB (Nomad `MemoryMB`).
    pub memory_mb: u64,
    /// CPU requested per sandbox task, in MHz (Nomad `CPU`).
    pub cpu_mhz: u32,
}

impl Default for NomadPoolConfig {
    fn default() -> Self {
        // A variable that is set but empty is treated as unset: an empty NOMAD_ADDR would
        // produce relative request URLs and an empty NOMAD_TOKEN a blank auth header.
        let non_empty_env = |key: &str| std::env::var(key).ok().filter(|v| !v.is_empty());
        Self {
            nomad_addr: non_empty_env("NOMAD_ADDR")
                .unwrap_or_else(|| "http://127.0.0.1:4646".to_string()),
            nomad_token: non_empty_env("NOMAD_TOKEN"),
            warm_pool_size: 10,
            max_pool_size: 50,
            image: "alpine:3.20".to_string(),
            driver: "docker".to_string(),
            datacenter: "dc1".to_string(),
            memory_mb: 512,
            cpu_mhz: 1000,
        }
    }
}
/// Point-in-time snapshot of the warm pool's size.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NomadPoolStats {
    /// Dispatched jobs that are idle and available.
    pub warm: usize,
    /// Pool jobs currently in use (not warm).
    pub active: usize,
    /// Configured warm target (`NomadPoolConfig::warm_pool_size`).
    pub target_warm: usize,
    /// Configured maximum pool size (`NomadPoolConfig::max_pool_size`).
    pub max_total: usize,
}
/// Job ID of the parameterized parent job; warm sandboxes are its dispatched children
/// (IDs of the form `agentkernel-warm-pool/dispatch-...`).
const POOL_JOB_ID: &str = "agentkernel-warm-pool";

/// Set of dispatched job IDs.
type JobIds = std::collections::HashSet<String>;

/// Warm pool of pre-dispatched Nomad sandbox jobs.
///
/// Keeps up to `warm_pool_size` idle dispatched jobs ready while never exceeding
/// `max_pool_size` warm + in-use jobs. `acquire` hands out a running job and records it as in
/// use; `release` purges it and tops the pool back up.
///
/// NOTE: in-use tracking is in memory only. After a process restart, jobs acquired by a
/// previous process look warm again.
pub struct NomadPool {
    config: NomadPoolConfig,
    client: PoolNomadClient,
    /// Serializes all pool mutations (register / dispatch / acquire / release / cleanup) and
    /// holds the IDs of jobs handed out by `acquire` that have not been purged yet.
    acquired: Mutex<JobIds>,
}

impl NomadPool {
    /// Creates the pool handle without contacting Nomad.
    pub fn new(config: NomadPoolConfig) -> Self {
        let client = PoolNomadClient::new(&config.nomad_addr, config.nomad_token.clone());
        Self {
            config,
            client,
            acquired: Mutex::new(JobIds::new()),
        }
    }

    /// Registers (or updates) the parameterized parent job that warm sandboxes are
    /// dispatched from.
    async fn register_pool_job(&self) -> Result<()> {
        let driver_config = match self.config.driver.as_str() {
            "docker" => {
                json!({
                    "image": self.config.image,
                    "command": "sh",
                    "args": ["-c", "sleep infinity"],
                    "cap_drop": ["ALL"],
                    "privileged": false,
                })
            }
            _ => {
                json!({
                    "command": "sh",
                    "args": ["-c", "sleep infinity"],
                })
            }
        };
        let job_spec = json!({
            "Job": {
                "ID": POOL_JOB_ID,
                "Name": POOL_JOB_ID,
                "Type": "batch",
                "Datacenters": [self.config.datacenter],
                "Parameterized": {
                    "Payload": "forbidden",
                    "MetaRequired": [],
                    // Nomad rejects dispatch requests carrying meta keys that are neither
                    // required nor optional, so every key `dispatch_warm` sends is listed.
                    "MetaOptional": ["sandbox_name", "agentkernel-pool-status"]
                },
                "Meta": {
                    "agentkernel-managed": "true",
                    "agentkernel-pool": "warm"
                },
                "TaskGroups": [{
                    "Name": "sandbox",
                    "Count": 1,
                    "Tasks": [{
                        "Name": "sandbox",
                        "Driver": self.config.driver,
                        "Config": driver_config,
                        "Resources": {
                            "CPU": self.config.cpu_mhz,
                            "MemoryMB": self.config.memory_mb
                        },
                        "Meta": {
                            "agentkernel-pool": "warm"
                        }
                    }]
                }]
            }
        });
        self.client.put("/v1/jobs", &job_spec).await?;
        Ok(())
    }

    /// Dispatches one child of the pool job and returns its `DispatchedJobID`.
    async fn dispatch_warm(&self) -> Result<String> {
        let dispatch_body = json!({
            "Meta": {
                "sandbox_name": "",
                "agentkernel-pool-status": "warm"
            }
        });
        let result = self
            .client
            .post(&format!("/v1/job/{}/dispatch", POOL_JOB_ID), &dispatch_body)
            .await?;
        let dispatch_id = result
            .get("DispatchedJobID")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("No DispatchedJobID in response"))?
            .to_string();
        Ok(dispatch_id)
    }

    /// Lists the dispatched children of [`POOL_JOB_ID`] as `(job_id, status)` pairs.
    /// Entries without a non-empty `ID` are skipped.
    async fn list_dispatched(&self) -> Result<Vec<(String, String)>> {
        let jobs = self
            .client
            .get(&format!("/v1/jobs?prefix={}/dispatch", POOL_JOB_ID))
            .await?;
        let listed: Vec<(String, String)> = jobs
            .as_array()
            .map(|arr| {
                arr.iter()
                    .filter_map(|job| {
                        let id = job.get("ID")?.as_str().filter(|id| !id.is_empty())?;
                        let status = job.get("Status").and_then(|v| v.as_str()).unwrap_or("");
                        Some((id.to_string(), status.to_string()))
                    })
                    .collect()
            })
            .unwrap_or_default();
        Ok(listed)
    }

    /// Returns `(warm, active)` and prunes `acquired` of jobs that are no longer live.
    ///
    /// A dispatched job is live while its status is `pending` or `running`. Pending jobs
    /// count as warm so a slow scheduler does not cause repeated over-dispatching.
    async fn pool_counts(&self, acquired: &mut JobIds) -> Result<(usize, usize)> {
        let live: JobIds = self
            .list_dispatched()
            .await?
            .into_iter()
            .filter(|(_, status)| status == "pending" || status == "running")
            .map(|(id, _)| id)
            .collect();
        acquired.retain(|id| live.contains(id));
        let warm = live.iter().filter(|id| !acquired.contains(*id)).count();
        Ok((warm, acquired.len()))
    }

    /// Dispatches warm jobs until the warm target is met, without exceeding
    /// `max_pool_size` warm + in-use jobs. Individual dispatch failures are logged as
    /// `"<failure_msg>: <error>"` and skipped.
    async fn top_up(&self, acquired: &mut JobIds, failure_msg: &str) -> Result<()> {
        let (warm, active) = self.pool_counts(acquired).await?;
        let below_target = self.config.warm_pool_size.saturating_sub(warm);
        let headroom = self.config.max_pool_size.saturating_sub(warm + active);
        for _ in 0..below_target.min(headroom) {
            if let Err(e) = self.dispatch_warm().await {
                eprintln!("{}: {}", failure_msg, e);
            }
        }
        Ok(())
    }

    /// Registers the pool job and dispatches warm jobs up to the configured target.
    ///
    /// # Errors
    /// Fails if registration or listing jobs fails; individual dispatch failures are only
    /// logged.
    pub async fn initialize(&self) -> Result<()> {
        let mut acquired = self.acquired.lock().await;
        self.register_pool_job().await?;
        self.top_up(&mut acquired, "Warning: Failed to dispatch warm allocation")
            .await
    }

    /// Claims a running warm job and returns `(dispatched_job_id, alloc_id)`.
    ///
    /// The job is recorded as in use until [`NomadPool::release`] purges it, so it is
    /// neither returned again nor counted as warm. `_sandbox_name` is currently unused.
    ///
    /// # Errors
    /// Fails if listing jobs or allocations fails, or if no running allocation is
    /// available.
    pub async fn acquire(&self, _sandbox_name: &str) -> Result<(String, String)> {
        let mut acquired = self.acquired.lock().await;
        for (job_id, status) in self.list_dispatched().await? {
            if status != "running" || acquired.contains(&job_id) {
                continue;
            }
            let allocs = self
                .client
                .get(&format!("/v1/job/{}/allocations", job_id))
                .await?;
            let running_alloc = allocs.as_array().and_then(|arr| {
                arr.iter().find_map(|alloc| {
                    let client_status = alloc
                        .get("ClientStatus")
                        .and_then(|v| v.as_str())
                        .unwrap_or("");
                    let alloc_id = alloc.get("ID").and_then(|v| v.as_str()).unwrap_or("");
                    (client_status == "running" && !alloc_id.is_empty())
                        .then(|| alloc_id.to_string())
                })
            });
            if let Some(alloc_id) = running_alloc {
                acquired.insert(job_id.clone());
                return Ok((job_id, alloc_id));
            }
        }
        bail!("No warm allocations available in pool")
    }

    /// Purges a job returned by [`NomadPool::acquire`] and tops the pool back up.
    ///
    /// A failed purge is logged and the job stays marked as in use, so a used sandbox is
    /// never handed out again as warm.
    pub async fn release(&self, dispatched_job_id: &str) -> Result<()> {
        let mut acquired = self.acquired.lock().await;
        let path = format!("/v1/job/{}?purge=true", dispatched_job_id);
        match self.client.delete(&path).await {
            Ok(()) => {
                acquired.remove(dispatched_job_id);
            }
            Err(e) => eprintln!(
                "Warning: Failed to purge released job {}: {}",
                dispatched_job_id, e
            ),
        }
        self.top_up(&mut acquired, "Warning: Failed to replenish warm allocation")
            .await
    }

    /// Dispatches warm jobs up to the target (bounded by `max_pool_size`).
    pub async fn replenish(&self) -> Result<()> {
        let mut acquired = self.acquired.lock().await;
        self.top_up(&mut acquired, "Warning: Failed to replenish warm allocation")
            .await
    }

    /// Returns warm / in-use counts plus the configured limits.
    ///
    /// `active` counts jobs handed out by `acquire` that are still `pending`/`running`.
    pub async fn stats(&self) -> Result<NomadPoolStats> {
        let mut acquired = self.acquired.lock().await;
        let (warm, active) = self.pool_counts(&mut acquired).await?;
        Ok(NomadPoolStats {
            warm,
            active,
            target_warm: self.config.warm_pool_size,
            max_total: self.config.max_pool_size,
        })
    }

    /// Spawns a task that calls [`NomadPool::replenish`] every 30 seconds (the first tick
    /// fires immediately) and logs errors. It runs until the returned handle is aborted.
    pub fn spawn_replenish_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
            loop {
                interval.tick().await;
                if let Err(e) = self.replenish().await {
                    eprintln!("Nomad warm pool replenish error: {}", e);
                }
            }
        })
    }

    /// Purges every job whose ID starts with [`POOL_JOB_ID`] (parent and dispatched
    /// children). Best-effort: individual purge failures are logged, not returned.
    ///
    /// # Errors
    /// Fails only if listing the jobs fails.
    pub async fn cleanup(&self) -> Result<()> {
        let mut acquired = self.acquired.lock().await;
        let jobs = self
            .client
            .get(&format!("/v1/jobs?prefix={}", POOL_JOB_ID))
            .await?;
        if let Some(jobs_arr) = jobs.as_array() {
            for job in jobs_arr {
                let Some(job_id) = job.get("ID").and_then(|v| v.as_str()) else {
                    continue;
                };
                let path = format!("/v1/job/{}?purge=true", job_id);
                match self.client.delete(&path).await {
                    Ok(()) => {
                        acquired.remove(job_id);
                    }
                    Err(e) => eprintln!("Warning: Failed to purge pool job {}: {}", job_id, e),
                }
            }
        }
        Ok(())
    }
}