use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use terraphim_config::Role;
use terraphim_persistence::DeviceStorage;
use crate::{
AgentPool, CommandInput, CommandOutput, LoadBalancingStrategy, MultiAgentError,
MultiAgentResult, PoolConfig, PoolStats, TerraphimAgent,
};
/// Tunable settings for a `PoolManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolManagerConfig {
/// Configuration handed to every pool the manager creates.
pub default_pool_config: PoolConfig,
/// Upper bound on the number of pools; `get_pool` returns an error instead
/// of creating a pool once this many exist.
pub max_pools: usize,
/// When `false`, `get_pool` returns an error for a role that has no pool yet
/// instead of creating one.
pub create_pools_on_demand: bool,
/// Period, in seconds, between passes of the background cleanup task.
pub cleanup_interval_seconds: u64,
/// A pool whose `last_used` timestamp is older than this many seconds is
/// shut down by the cleanup task.
pub pool_max_idle_duration_seconds: u64,
}
impl Default for PoolManagerConfig {
fn default() -> Self {
Self {
default_pool_config: PoolConfig::default(),
max_pools: 20,
create_pools_on_demand: true,
cleanup_interval_seconds: 300, pool_max_idle_duration_seconds: 1800, }
}
}
/// Bookkeeping record the manager keeps for each pool it has created.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolInfo {
/// Name of the role the pool serves; also the key under which the pool is stored.
pub role_name: String,
/// When the pool was created.
pub created_at: DateTime<Utc>,
/// When the pool was last handed out by `get_pool`; the cleanup task compares
/// this against the configured idle limit.
pub last_used: DateTime<Utc>,
/// Pool statistics captured when the record was created. Nothing in this
/// file refreshes them afterwards.
pub stats: PoolStats,
/// Set to `false` once the pool has been shut down; the record itself is kept.
pub is_active: bool,
}
/// Owns one `AgentPool` per role name and the statistics describing them.
pub struct PoolManager {
/// Settings supplied at construction (or the defaults).
config: PoolManagerConfig,
/// Storage handle passed to every pool that gets created.
persistence: Arc<DeviceStorage>,
/// Live pools keyed by role name.
pools: Arc<RwLock<HashMap<String, Arc<AgentPool>>>>,
/// Per-pool bookkeeping keyed by role name; entries remain (marked inactive)
/// after their pool is shut down.
pool_info: Arc<RwLock<HashMap<String, PoolInfo>>>,
/// Counters aggregated across all pools.
global_stats: Arc<RwLock<GlobalStats>>,
/// Handle of the background cleanup task spawned in `new`.
_cleanup_task: tokio::task::JoinHandle<()>,
}
/// Counters aggregated across every pool owned by a `PoolManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalStats {
/// Number of pools. Incremented on creation and decremented on shutdown;
/// `get_global_stats` overwrites it with the live map size before returning.
pub total_pools: usize,
/// Number of agents across all pools. Only populated by `get_global_stats`,
/// which sums each pool's idle and busy agent counts.
pub total_agents: usize,
/// Number of commands run through `PoolManager::execute_command`.
pub total_operations: u64,
/// Exponentially weighted moving average of command duration in
/// milliseconds (weight 0.95 on the previous average, 0.05 on the new sample).
pub average_operation_time_ms: f64,
/// Number of `get_pool` calls that found an existing pool.
pub total_pool_hits: u64,
/// Number of `get_pool` calls that had to create a pool.
pub total_pool_misses: u64,
/// Time of the most recent change to these counters.
pub last_updated: DateTime<Utc>,
}
impl Default for GlobalStats {
fn default() -> Self {
Self {
total_pools: 0,
total_agents: 0,
total_operations: 0,
average_operation_time_ms: 0.0,
total_pool_hits: 0,
total_pool_misses: 0,
last_updated: Utc::now(),
}
}
}
impl PoolManager {
    /// Builds a manager with no pools and starts the background cleanup task.
    ///
    /// `config` falls back to [`PoolManagerConfig::default`] when `None`. The
    /// cleanup task is started with `tokio::spawn`, so this must be called from
    /// inside a Tokio runtime. The current implementation never returns `Err`.
    pub async fn new(
        persistence: Arc<DeviceStorage>,
        config: Option<PoolManagerConfig>,
    ) -> MultiAgentResult<Self> {
        let config = config.unwrap_or_default();
        let pools = Arc::new(RwLock::new(HashMap::new()));
        let pool_info = Arc::new(RwLock::new(HashMap::new()));
        let global_stats = Arc::new(RwLock::new(GlobalStats::default()));
        let cleanup_task = Self::start_cleanup_task(
            pools.clone(),
            pool_info.clone(),
            global_stats.clone(),
            config.clone(),
        );
        Ok(Self {
            config,
            persistence,
            pools,
            pool_info,
            global_stats,
            _cleanup_task: cleanup_task,
        })
    }

    /// Returns the pool for `role`, creating it on first use.
    ///
    /// A lookup that finds an existing pool counts as a hit and refreshes the
    /// pool's `last_used` timestamp; a lookup that creates the pool counts as
    /// a miss.
    ///
    /// # Errors
    ///
    /// Returns `MultiAgentError::PoolError` when no pool exists and on-demand
    /// creation is disabled, or when `max_pools` pools already exist.
    /// Propagates any error from `AgentPool::new`.
    pub async fn get_pool(&self, role: &Role) -> MultiAgentResult<Arc<AgentPool>> {
        let role_name = role.name.to_string();

        // Fast path: clone the Arc so the map lock is released before any other
        // lock is taken.
        let existing = self.pools.read().await.get(&role_name).cloned();
        if let Some(pool) = existing {
            self.record_pool_hit(&role_name).await;
            return Ok(pool);
        }

        if !self.config.create_pools_on_demand {
            return Err(MultiAgentError::PoolError(
                "Pool creation on demand is disabled".to_string(),
            ));
        }
        // Cheap early rejection; the limit is enforced again under the write
        // lock below.
        let pools_count = self.pools.read().await.len();
        if pools_count >= self.config.max_pools {
            return Err(MultiAgentError::PoolError(format!(
                "Maximum number of pools ({}) reached",
                self.config.max_pools
            )));
        }

        log::info!("Creating new agent pool for role: {}", role_name);
        let pool = Arc::new(
            AgentPool::new(
                role.clone(),
                self.persistence.clone(),
                Some(self.config.default_pool_config.clone()),
            )
            .await?,
        );
        // Snapshot the stats before taking any bookkeeping lock.
        let initial_stats = pool.get_stats().await;

        // Pool construction ran without a lock, so another task may have
        // created a pool for this role (or filled the last slot) meanwhile.
        // Decide atomically under the write lock.
        let conflict: Option<MultiAgentResult<Arc<AgentPool>>> = {
            let mut pools = self.pools.write().await;
            let winner = pools.get(&role_name).cloned();
            match winner {
                Some(winner) => Some(Ok(winner)),
                None if pools.len() >= self.config.max_pools => {
                    Some(Err(MultiAgentError::PoolError(format!(
                        "Maximum number of pools ({}) reached",
                        self.config.max_pools
                    ))))
                }
                None => {
                    pools.insert(role_name.clone(), pool.clone());
                    None
                }
            }
        };
        if let Some(outcome) = conflict {
            // The freshly built pool was never published; release its
            // resources instead of leaking it.
            if let Err(e) = pool.shutdown().await {
                log::warn!(
                    "Failed to shut down surplus pool for role {}: {}",
                    role_name,
                    e
                );
            }
            if outcome.is_ok() {
                self.record_pool_hit(&role_name).await;
            }
            return outcome;
        }

        {
            let now = Utc::now();
            let mut pool_info = self.pool_info.write().await;
            pool_info.insert(
                role_name.clone(),
                PoolInfo {
                    role_name: role_name.clone(),
                    created_at: now,
                    last_used: now,
                    stats: initial_stats,
                    is_active: true,
                },
            );
        }
        {
            let mut stats = self.global_stats.write().await;
            stats.total_pools += 1;
            stats.total_pool_misses += 1;
            stats.last_updated = Utc::now();
        }
        log::info!("Successfully created agent pool for role: {}", role_name);
        Ok(pool)
    }

    /// Runs `input` on the pool for `role` and folds the elapsed time into the
    /// global statistics.
    ///
    /// The operation is counted and timed whether or not the command succeeds;
    /// the command's own result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`PoolManager::get_pool`] and from the pool's
    /// `execute_command`.
    pub async fn execute_command(
        &self,
        role: &Role,
        input: CommandInput,
    ) -> MultiAgentResult<CommandOutput> {
        let pool = self.get_pool(role).await?;
        let started = std::time::Instant::now();
        let result = pool.execute_command(input).await;
        // Fractional milliseconds, so sub-millisecond commands are not
        // recorded as zero.
        let duration_ms = started.elapsed().as_secs_f64() * 1000.0;
        {
            let mut stats = self.global_stats.write().await;
            stats.total_operations += 1;
            // Seed the moving average from the first sample. Using the
            // operation count rather than `average == 0.0` keeps a genuine
            // zero-length first sample from re-seeding later.
            stats.average_operation_time_ms = if stats.total_operations == 1 {
                duration_ms
            } else {
                0.95 * stats.average_operation_time_ms + 0.05 * duration_ms
            };
            stats.last_updated = Utc::now();
        }
        result
    }

    /// Obtains an agent from the pool for `role`.
    ///
    /// NOTE(review): the returned `Arc` is cloned out of the pool handle, which
    /// is dropped when this function returns; confirm that using the agent
    /// after its handle is gone is what the pool expects.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`PoolManager::get_pool`] and the pool's
    /// `get_agent`; returns `MultiAgentError::PoolError` when the handle holds
    /// no agent.
    pub async fn get_agent(&self, role: &Role) -> MultiAgentResult<Arc<TerraphimAgent>> {
        let pool = self.get_pool(role).await?;
        let handle = pool.get_agent().await?;
        match handle.agent() {
            Some(agent) => Ok(agent.clone()),
            None => Err(MultiAgentError::PoolError(
                "Agent handle is empty".to_string(),
            )),
        }
    }

    /// Returns a copy of the bookkeeping record of every pool ever created,
    /// including pools that have since been shut down (`is_active == false`).
    pub async fn list_pools(&self) -> Vec<PoolInfo> {
        let pool_info = self.pool_info.read().await;
        pool_info.values().cloned().collect()
    }

    /// Returns the current statistics of the live pool named `role_name`, or
    /// `None` when no such pool exists.
    pub async fn get_pool_stats(&self, role_name: &str) -> Option<PoolStats> {
        // Release the map lock before awaiting on the pool itself.
        let pool = self.pools.read().await.get(role_name).cloned()?;
        Some(pool.get_stats().await)
    }

    /// Returns the global counters with `total_pools` and `total_agents`
    /// recomputed from the live pools.
    pub async fn get_global_stats(&self) -> GlobalStats {
        let mut stats = self.global_stats.read().await.clone();
        // Snapshot the pools so the map lock is not held while each pool is
        // queried.
        let pools: Vec<Arc<AgentPool>> = self.pools.read().await.values().cloned().collect();
        stats.total_pools = pools.len();
        let mut total_agents = 0;
        for pool in &pools {
            let pool_stats = pool.get_stats().await;
            total_agents += pool_stats.current_pool_size + pool_stats.current_busy_agents;
        }
        stats.total_agents = total_agents;
        stats
    }

    /// Removes and shuts down the pool named `role_name`; a no-op when no such
    /// pool exists.
    ///
    /// # Errors
    ///
    /// Propagates the pool's shutdown error. The pool is removed from the
    /// manager and marked inactive even in that case, so the bookkeeping never
    /// refers to a pool the manager no longer holds.
    pub async fn shutdown_pool(&self, role_name: &str) -> MultiAgentResult<()> {
        let removed = self.pools.write().await.remove(role_name);
        let pool = match removed {
            Some(pool) => pool,
            None => return Ok(()),
        };
        let outcome = pool.shutdown().await;
        // Update the bookkeeping before propagating a failure: the pool is
        // already gone from the map either way.
        Self::mark_pool_removed(&self.pool_info, &self.global_stats, role_name).await;
        outcome?;
        log::info!("Shut down pool for role: {}", role_name);
        Ok(())
    }

    /// Shuts down every pool currently registered.
    ///
    /// Individual failures are logged and do not stop the remaining pools from
    /// being shut down; the function itself always returns `Ok(())`.
    pub async fn shutdown_all(&self) -> MultiAgentResult<()> {
        log::info!("Shutting down all agent pools");
        let pool_names: Vec<String> = {
            let pools = self.pools.read().await;
            pools.keys().cloned().collect()
        };
        for role_name in pool_names {
            if let Err(e) = self.shutdown_pool(&role_name).await {
                log::error!("Failed to shutdown pool {}: {}", role_name, e);
            }
        }
        log::info!("All agent pools shut down");
        Ok(())
    }

    /// Logs a request to change the load balancing strategy.
    ///
    /// NOTE(review): this currently only logs; the strategy is not forwarded to
    /// any pool.
    pub async fn set_load_balancing_strategy(&self, strategy: LoadBalancingStrategy) {
        log::info!("Load balancing strategy update requested: {:?}", strategy);
    }

    /// Records a successful lookup: refreshes the pool's `last_used` timestamp
    /// and bumps the global hit counter.
    async fn record_pool_hit(&self, role_name: &str) {
        {
            let mut pool_info = self.pool_info.write().await;
            if let Some(info) = pool_info.get_mut(role_name) {
                info.last_used = Utc::now();
            }
        }
        let mut stats = self.global_stats.write().await;
        stats.total_pool_hits += 1;
        stats.last_updated = Utc::now();
    }

    /// Marks the record for `role_name` inactive and decrements the global
    /// pool counter. Call exactly once per pool actually removed from the map.
    async fn mark_pool_removed(
        pool_info: &RwLock<HashMap<String, PoolInfo>>,
        global_stats: &RwLock<GlobalStats>,
        role_name: &str,
    ) {
        {
            let mut info_guard = pool_info.write().await;
            if let Some(info) = info_guard.get_mut(role_name) {
                info.is_active = false;
            }
        }
        let mut stats = global_stats.write().await;
        stats.total_pools = stats.total_pools.saturating_sub(1);
        stats.last_updated = Utc::now();
    }

    /// Spawns the background task that periodically shuts down idle pools.
    ///
    /// Every `cleanup_interval_seconds` the task removes each active pool whose
    /// `last_used` is older than `pool_max_idle_duration_seconds`, shuts it
    /// down, and updates the bookkeeping.
    fn start_cleanup_task(
        pools: Arc<RwLock<HashMap<String, Arc<AgentPool>>>>,
        pool_info: Arc<RwLock<HashMap<String, PoolInfo>>>,
        global_stats: Arc<RwLock<GlobalStats>>,
        config: PoolManagerConfig,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            // `tokio::time::interval` panics on a zero period, so clamp a
            // configured interval of 0 to one second.
            let period = std::time::Duration::from_secs(config.cleanup_interval_seconds.max(1));
            let mut interval = tokio::time::interval(period);
            let max_idle_duration =
                std::time::Duration::from_secs(config.pool_max_idle_duration_seconds);
            loop {
                interval.tick().await;

                let idle_pools: Vec<String> = {
                    let info_guard = pool_info.read().await;
                    let now = Utc::now();
                    info_guard
                        .iter()
                        .filter(|(_, info)| {
                            // A negative idle time (timestamp in the future)
                            // fails `to_std` and is treated as zero.
                            info.is_active
                                && (now - info.last_used).to_std().unwrap_or_default()
                                    > max_idle_duration
                        })
                        .map(|(name, _)| name.clone())
                        .collect()
                };

                for pool_name in idle_pools {
                    // Remove under the lock, then release it before the
                    // potentially slow shutdown so lookups are not blocked.
                    let removed = pools.write().await.remove(&pool_name);
                    let pool = match removed {
                        Some(pool) => pool,
                        // Already removed elsewhere; its bookkeeping was
                        // handled there.
                        None => continue,
                    };
                    log::info!("Cleaning up idle pool: {}", pool_name);
                    Self::mark_pool_removed(&pool_info, &global_stats, &pool_name).await;
                    if let Err(e) = pool.shutdown().await {
                        log::error!("Failed to shutdown pool {}: {}", pool_name, e);
                    }
                }

                let active_pools = pools.read().await.len();
                if active_pools > 0 {
                    log::debug!("Pool manager status: {} active pools", active_pools);
                }
            }
        })
    }
}

impl Drop for PoolManager {
    /// Stops the background cleanup task. Dropping a `JoinHandle` only detaches
    /// the task, so without this the loop would keep running, and keep the
    /// pool maps alive, after the manager is gone.
    fn drop(&mut self) {
        self._cleanup_task.abort();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::create_test_role;
    use terraphim_persistence::DeviceStorage;

    /// Builds a manager with default settings on top of in-memory storage.
    async fn memory_backed_manager() -> PoolManager {
        DeviceStorage::init_memory_only().await.unwrap();
        let storage = DeviceStorage::arc_memory_only().await.unwrap();
        PoolManager::new(storage, None).await.unwrap()
    }

    /// A fresh manager reports no pools and no agents.
    #[tokio::test]
    async fn test_pool_manager_creation() {
        let manager = memory_backed_manager().await;

        let snapshot = manager.get_global_stats().await;
        assert_eq!(snapshot.total_pools, 0);
        assert_eq!(snapshot.total_agents, 0);
    }

    /// The first lookup creates the pool (a miss); the second returns the same
    /// pool (a hit).
    #[tokio::test]
    async fn test_pool_creation_on_demand() {
        let manager = memory_backed_manager().await;
        let role = create_test_role();

        let first = manager.get_pool(&role).await.unwrap();
        assert!(first.get_stats().await.current_pool_size > 0);

        let second = manager.get_pool(&role).await.unwrap();
        assert!(Arc::ptr_eq(&first, &second));

        let snapshot = manager.get_global_stats().await;
        assert_eq!(snapshot.total_pools, 1);
        assert_eq!(snapshot.total_pool_hits, 1);
        assert_eq!(snapshot.total_pool_misses, 1);
    }

    /// Shutting a pool down removes it from the reported pool count.
    #[tokio::test]
    async fn test_pool_shutdown() {
        let manager = memory_backed_manager().await;
        let role = create_test_role();

        let _pool = manager.get_pool(&role).await.unwrap();
        assert_eq!(manager.get_global_stats().await.total_pools, 1);

        manager.shutdown_pool(&role.name.to_string()).await.unwrap();
        assert_eq!(manager.get_global_stats().await.total_pools, 0);
    }
}