use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use anyhow::Result;
/// A resource whose metrics can be collected and health-checked by
/// [`UnifiedMonitor`].
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait MonitorableResource: Send + Sync + 'static {
/// Metrics snapshot type produced by [`collect_metrics`](Self::collect_metrics).
///
/// Must be cloneable, thread-safe and (de)serializable via serde.
type Metrics: Clone + Send + Sync + Serialize + for<'a> Deserialize<'a>;
/// Returns the identifier for this resource.
///
/// `UnifiedMonitor` uses this value as the key under which both the
/// resource and its cached metrics are stored.
fn identifier(&self) -> String;
/// Asynchronously collects a fresh metrics snapshot.
///
/// # Errors
/// Returns an error when collection fails; `UnifiedMonitor::update_all`
/// logs such errors and moves on to the next resource.
async fn collect_metrics(&self) -> Result<Self::Metrics>;
/// Returns `true` when `metrics` are acceptable for this resource.
///
/// `UnifiedMonitor::find_exceeding_limits` reports every resource for which
/// this returns `false`.
fn is_healthy(&self, metrics: &Self::Metrics) -> bool;
/// Returns the limits configured for this resource.
fn get_limits(&self) -> ResourceLimits;
}
/// Optional upper bounds that a resource's metrics are checked against.
///
/// A `None` limit means "no limit configured" for that dimension.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceLimits {
/// Upper bound for CPU usage (`Default` uses `80.0`).
// NOTE(review): presumably a percentage — confirm against the metrics producer.
pub cpu_limit: Option<f64>,
/// Upper bound for memory usage (`Default` uses `1024 * 1024 * 1024`).
// NOTE(review): presumably bytes (i.e. 1 GiB by default) — confirm.
pub memory_limit: Option<u64>,
/// Upper bound for disk usage (`Default` uses `10 * 1024 * 1024 * 1024`).
// NOTE(review): presumably bytes (i.e. 10 GiB by default) — confirm.
pub disk_limit: Option<u64>,
/// Additional named limits keyed by name; for example `SessionResource`
/// looks up the `"max_tokens"` entry.
pub custom_limits: HashMap<String, f64>,
}
impl Default for ResourceLimits {
    /// Builds the default limits: CPU `80.0`, memory `1024^3`, disk
    /// `10 * 1024^3`, and no custom limits.
    fn default() -> Self {
        // 1024 * 1024 * 1024, named so the two size limits read as multiples of it.
        const GIB: u64 = 1024 * 1024 * 1024;

        Self {
            cpu_limit: Some(80.0),
            memory_limit: Some(GIB),
            disk_limit: Some(10 * GIB),
            custom_limits: HashMap::new(),
        }
    }
}
/// Tracks a set of resources of one type `R` and caches the most recent
/// metrics collected from each of them.
pub struct UnifiedMonitor<R: MonitorableResource> {
// Registered resources, keyed by `MonitorableResource::identifier`.
resources: Arc<RwLock<HashMap<String, R>>>,
// Latest successfully collected metrics, keyed by the same identifier.
metrics_cache: Arc<RwLock<HashMap<String, R::Metrics>>>,
// Period between refreshes performed by the background task started in
// `start_monitoring`.
update_interval: std::time::Duration,
}
impl<R: MonitorableResource> UnifiedMonitor<R> {
pub fn new(update_interval: std::time::Duration) -> Self {
Self {
resources: Arc::new(RwLock::new(HashMap::new())),
metrics_cache: Arc::new(RwLock::new(HashMap::new())),
update_interval,
}
}
pub async fn register(&self, resource: R) -> Result<()> {
let id = resource.identifier();
self.resources.write().await.insert(id.clone(), resource);
Ok(())
}
pub async fn update_all(&self) -> Result<()> {
let resources = self.resources.read().await;
let mut metrics = self.metrics_cache.write().await;
for (id, resource) in resources.iter() {
match resource.collect_metrics().await {
Ok(m) => {
metrics.insert(id.clone(), m);
}
Err(e) => {
log::warn!("Failed to collect metrics for {}: {}", id, e);
}
}
}
Ok(())
}
pub async fn get_metrics(&self, id: &str) -> Option<R::Metrics> {
self.metrics_cache.read().await.get(id).cloned()
}
pub async fn find_exceeding_limits(&self) -> Vec<(String, R::Metrics)> {
let resources = self.resources.read().await;
let metrics = self.metrics_cache.read().await;
let mut exceeding = Vec::new();
for (id, resource) in resources.iter() {
if let Some(m) = metrics.get(id) {
if !resource.is_healthy(m) {
exceeding.push((id.clone(), m.clone()));
}
}
}
exceeding
}
pub async fn get_all_states(&self) -> HashMap<String, R::Metrics> {
self.metrics_cache.read().await.clone()
}
pub async fn start_monitoring(self: Arc<Self>) {
let monitor = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(monitor.update_interval);
loop {
interval.tick().await;
if let Err(e) = monitor.update_all().await {
log::error!("Monitoring update failed: {}", e);
}
}
});
}
}
/// A monitorable agent, identified by its `name`.
pub struct AgentResource {
/// Agent name; returned as the resource identifier.
pub name: String,
/// Optional process id.
// NOTE(review): not read anywhere in this file — presumably the OS pid of
// the agent process; confirm against callers.
pub pid: Option<u32>,
/// Limits this agent's metrics are checked against.
pub limits: ResourceLimits,
}
/// Metrics snapshot for an [`AgentResource`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentMetrics {
/// CPU usage; compared against `ResourceLimits::cpu_limit`.
// NOTE(review): unit not established here — presumably a percentage; confirm.
pub cpu_usage: f64,
/// Memory usage; compared against `ResourceLimits::memory_limit`.
// NOTE(review): presumably bytes — confirm.
pub memory_usage: u64,
/// Disk usage.
// NOTE(review): presumably bytes — confirm.
pub disk_usage: u64,
/// Number of active tasks.
pub active_tasks: usize,
/// Time at which this snapshot was produced.
pub last_updated: std::time::SystemTime,
}
#[async_trait]
impl MonitorableResource for AgentResource {
    type Metrics = AgentMetrics;

    /// Returns the agent's name as its identifier.
    fn identifier(&self) -> String {
        self.name.clone()
    }

    /// Produces a metrics snapshot for this agent.
    ///
    /// Currently a placeholder: all usage figures are zero and only
    /// `last_updated` carries real data (the current system time).
    ///
    /// # Errors
    /// Currently never fails.
    async fn collect_metrics(&self) -> Result<Self::Metrics> {
        Ok(AgentMetrics {
            cpu_usage: 0.0,
            memory_usage: 0,
            disk_usage: 0,
            active_tasks: 0,
            last_updated: std::time::SystemTime::now(),
        })
    }

    /// Returns `true` unless CPU, memory or disk usage strictly exceeds its
    /// configured limit. A limit set to `None` is not checked.
    fn is_healthy(&self, metrics: &Self::Metrics) -> bool {
        let limits = &self.limits;

        // Expressed as "usage > limit" so a NaN CPU reading is treated as not
        // exceeding the limit.
        let cpu_exceeded = limits
            .cpu_limit
            .map_or(false, |limit| metrics.cpu_usage > limit);
        let memory_exceeded = limits
            .memory_limit
            .map_or(false, |limit| metrics.memory_usage > limit);
        // The disk limit is enforced alongside CPU and memory so that a
        // configured `disk_limit` is not silently ignored.
        let disk_exceeded = limits
            .disk_limit
            .map_or(false, |limit| metrics.disk_usage > limit);

        !(cpu_exceeded || memory_exceeded || disk_exceeded)
    }

    /// Returns a clone of this agent's configured limits.
    fn get_limits(&self) -> ResourceLimits {
        self.limits.clone()
    }
}
/// A monitorable session, identified by its `id`.
pub struct SessionResource {
/// Session id; returned as the resource identifier.
pub id: String,
/// Name of an agent associated with this session.
// NOTE(review): not read anywhere in this file — presumably the owning
// agent's `AgentResource::name`; confirm against callers.
pub agent_name: String,
/// Limits this session's metrics are checked against; the `"max_tokens"`
/// entry of `custom_limits` is the one consulted by the health check.
pub limits: ResourceLimits,
}
/// Metrics snapshot for a [`SessionResource`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMetrics {
/// Token usage; compared against the `"max_tokens"` custom limit.
pub token_usage: usize,
/// Context size.
// NOTE(review): unit not established here (tokens? bytes?) — confirm.
pub context_size: usize,
/// Session duration.
pub duration: std::time::Duration,
/// Time of the last activity recorded for the session.
pub last_activity: std::time::SystemTime,
}
#[async_trait]
impl MonitorableResource for SessionResource {
    type Metrics = SessionMetrics;

    /// Returns the session id as the identifier.
    fn identifier(&self) -> String {
        self.id.clone()
    }

    /// Produces a metrics snapshot for this session.
    ///
    /// Placeholder implementation: counters and duration are zero, and
    /// `last_activity` is the current system time.
    async fn collect_metrics(&self) -> Result<Self::Metrics> {
        let snapshot = SessionMetrics {
            token_usage: 0,
            context_size: 0,
            duration: std::time::Duration::from_secs(0),
            last_activity: std::time::SystemTime::now(),
        };
        Ok(snapshot)
    }

    /// Returns `false` only when a `"max_tokens"` custom limit is present and
    /// the token usage strictly exceeds it; otherwise returns `true`.
    fn is_healthy(&self, metrics: &Self::Metrics) -> bool {
        match self.limits.custom_limits.get("max_tokens") {
            // Kept as "usage > cap" so a NaN cap never marks the session unhealthy.
            Some(&cap) if metrics.token_usage as f64 > cap => false,
            _ => true,
        }
    }

    /// Returns a clone of this session's configured limits.
    fn get_limits(&self) -> ResourceLimits {
        self.limits.clone()
    }
}
/// Namespace type for constructors of preconfigured [`UnifiedMonitor`]s.
pub struct MonitorFactory;
impl MonitorFactory {
    /// Creates a shared monitor for [`AgentResource`]s with a 5-second
    /// update interval.
    pub fn create_agent_monitor() -> Arc<UnifiedMonitor<AgentResource>> {
        let period = std::time::Duration::from_secs(5);
        Arc::new(UnifiedMonitor::new(period))
    }

    /// Creates a shared monitor for [`SessionResource`]s with a 10-second
    /// update interval.
    pub fn create_session_monitor() -> Arc<UnifiedMonitor<SessionResource>> {
        let period = std::time::Duration::from_secs(10);
        Arc::new(UnifiedMonitor::new(period))
    }
}