#![allow(dead_code)]
use anyhow::{Result, anyhow};
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use std::collections::VecDeque;
use tracing::{info, error};
use crate::models::claude::ClaudeCodeOutput;
use super::claude_manager::ClaudeManager;
#[derive(Clone)]
pub struct ProcessPool {
inner: Arc<ProcessPoolInner>,
}
struct ProcessPoolInner {
manager: Arc<ClaudeManager>,
pool: Mutex<Pool>,
config: PoolConfig,
}
struct Pool {
idle: VecDeque<PooledProcess>,
active: Vec<ActiveProcess>,
}
struct PooledProcess {
session_id: String,
model: String,
created_at: std::time::Instant,
}
struct ActiveProcess {
session_id: String,
in_use_since: std::time::Instant,
}
#[derive(Clone)]
pub struct PoolConfig {
pub min_idle: usize,
pub max_idle: usize,
pub max_active: usize,
pub idle_timeout_secs: u64,
pub default_model: String,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
min_idle: 2,
max_idle: 5,
max_active: 20,
idle_timeout_secs: 300, default_model: "claude-opus-4-20250514".to_string(),
}
}
}
impl ProcessPool {
pub fn new(manager: Arc<ClaudeManager>, config: PoolConfig) -> Self {
let pool = ProcessPool {
inner: Arc::new(ProcessPoolInner {
manager,
pool: Mutex::new(Pool {
idle: VecDeque::new(),
active: Vec::new(),
}),
config,
}),
};
let pool_clone = pool.clone();
tokio::spawn(async move {
pool_clone.maintain_min_idle().await;
});
let pool_clone = pool.clone();
tokio::spawn(async move {
pool_clone.cleanup_loop().await;
});
pool
}
pub async fn acquire(&self, model: Option<String>) -> Result<(String, mpsc::Receiver<ClaudeCodeOutput>)> {
let model = model.unwrap_or_else(|| self.inner.config.default_model.clone());
let session_id = {
let mut pool = self.inner.pool.lock();
let position = pool.idle.iter().position(|p| p.model == model);
if let Some(pos) = position {
let process = pool.idle.remove(pos).unwrap();
let session_id = process.session_id.clone();
pool.active.push(ActiveProcess {
session_id: session_id.clone(),
in_use_since: std::time::Instant::now(),
});
info!("Acquired process from pool: {}", session_id);
Some(session_id)
} else {
None
}
};
if let Some(session_id) = session_id {
let (_tx, rx) = mpsc::channel(100);
Ok((session_id, rx))
} else {
{
let pool = self.inner.pool.lock();
if pool.active.len() >= self.inner.config.max_active {
return Err(anyhow!("Process pool exhausted"));
}
}
info!("Creating new process for model: {}", model);
let result = self.inner.manager.create_interactive_session(None, None, Some(model.clone())).await?;
{
let mut pool = self.inner.pool.lock();
pool.active.push(ActiveProcess {
session_id: result.0.clone(),
in_use_since: std::time::Instant::now(),
});
}
Ok(result)
}
}
pub async fn release(&self, session_id: String, model: String) {
let should_close = {
let mut pool = self.inner.pool.lock();
pool.active.retain(|p| p.session_id != session_id);
if pool.idle.len() < self.inner.config.max_idle {
pool.idle.push_back(PooledProcess {
session_id: session_id.clone(),
model,
created_at: std::time::Instant::now(),
});
info!("Released process back to pool");
false
} else {
true
}
};
if should_close {
let _ = self.inner.manager.close_session(&session_id).await;
info!("Pool full, closed process: {}", session_id);
}
}
async fn maintain_min_idle(&self) {
loop {
let needed = {
let pool = self.inner.pool.lock();
let current_idle = pool.idle.len();
self.inner.config.min_idle.saturating_sub(current_idle)
};
for _ in 0..needed {
match self.inner.manager.create_interactive_session(
None,
None,
Some(self.inner.config.default_model.clone())
).await {
Ok((session_id, _)) => {
let mut pool = self.inner.pool.lock();
pool.idle.push_back(PooledProcess {
session_id,
model: self.inner.config.default_model.clone(),
created_at: std::time::Instant::now(),
});
info!("Pre-warmed process added to pool");
}
Err(e) => {
error!("Failed to create pre-warmed process: {}", e);
}
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
}
async fn cleanup_loop(&self) {
let timeout = std::time::Duration::from_secs(self.inner.config.idle_timeout_secs);
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
let expired = {
let mut pool = self.inner.pool.lock();
let mut expired = Vec::new();
pool.idle.retain(|p| {
if p.created_at.elapsed() > timeout {
expired.push(p.session_id.clone());
false
} else {
true
}
});
expired
};
for session_id in expired {
let _ = self.inner.manager.close_session(&session_id).await;
info!("Closed idle process due to timeout: {}", session_id);
}
}
}
}