use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::cost_tracker::AgentMetrics;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedAgentMetrics {
pub version: u32,
pub updated_at: String,
pub agents: HashMap<String, AgentMetrics>,
pub fleet: AgentMetrics,
}
impl PersistedAgentMetrics {
pub fn new(agents: HashMap<String, AgentMetrics>, fleet: AgentMetrics) -> Self {
Self {
version: 1,
updated_at: chrono::Utc::now().to_rfc3339(),
agents,
fleet,
}
}
}
#[derive(Debug, Clone)]
pub struct MetricsPersistenceConfig {
pub key_prefix: String,
pub compress: bool,
}
impl Default for MetricsPersistenceConfig {
fn default() -> Self {
Self {
key_prefix: "adf/metrics".to_string(),
compress: true,
}
}
}
#[async_trait]
pub trait MetricsPersistence: Send + Sync {
async fn save_metrics(
&self,
agent_name: &str,
metrics: &AgentMetrics,
) -> Result<(), MetricsPersistenceError>;
async fn load_metrics(
&self,
agent_name: &str,
) -> Result<Option<AgentMetrics>, MetricsPersistenceError>;
async fn save_fleet_metrics(
&self,
metrics: &AgentMetrics,
) -> Result<(), MetricsPersistenceError>;
async fn load_fleet_metrics(&self) -> Result<Option<AgentMetrics>, MetricsPersistenceError>;
async fn list_agents(&self) -> Result<Vec<String>, MetricsPersistenceError>;
async fn delete_metrics(&self, agent_name: &str) -> Result<(), MetricsPersistenceError>;
}
#[derive(Debug, thiserror::Error)]
pub enum MetricsPersistenceError {
#[error("storage error: {0}")]
Storage(String),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("agent not found: {0}")]
NotFound(String),
}
pub struct InMemoryMetricsPersistence {
data: std::sync::RwLock<HashMap<String, AgentMetrics>>,
fleet: std::sync::RwLock<Option<AgentMetrics>>,
}
impl InMemoryMetricsPersistence {
pub fn new() -> Self {
Self {
data: std::sync::RwLock::new(HashMap::new()),
fleet: std::sync::RwLock::new(None),
}
}
}
impl Default for InMemoryMetricsPersistence {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl MetricsPersistence for InMemoryMetricsPersistence {
async fn save_metrics(
&self,
agent_name: &str,
metrics: &AgentMetrics,
) -> Result<(), MetricsPersistenceError> {
let mut data = self
.data
.write()
.map_err(|e| MetricsPersistenceError::Storage(format!("Lock poisoned: {}", e)))?;
data.insert(agent_name.to_string(), metrics.clone());
Ok(())
}
async fn load_metrics(
&self,
agent_name: &str,
) -> Result<Option<AgentMetrics>, MetricsPersistenceError> {
let data = self
.data
.read()
.map_err(|e| MetricsPersistenceError::Storage(format!("Lock poisoned: {}", e)))?;
Ok(data.get(agent_name).cloned())
}
async fn save_fleet_metrics(
&self,
metrics: &AgentMetrics,
) -> Result<(), MetricsPersistenceError> {
let mut fleet = self
.fleet
.write()
.map_err(|e| MetricsPersistenceError::Storage(format!("Lock poisoned: {}", e)))?;
*fleet = Some(metrics.clone());
Ok(())
}
async fn load_fleet_metrics(&self) -> Result<Option<AgentMetrics>, MetricsPersistenceError> {
let fleet = self
.fleet
.read()
.map_err(|e| MetricsPersistenceError::Storage(format!("Lock poisoned: {}", e)))?;
Ok(fleet.clone())
}
async fn list_agents(&self) -> Result<Vec<String>, MetricsPersistenceError> {
let data = self
.data
.read()
.map_err(|e| MetricsPersistenceError::Storage(format!("Lock poisoned: {}", e)))?;
Ok(data.keys().cloned().collect())
}
async fn delete_metrics(&self, agent_name: &str) -> Result<(), MetricsPersistenceError> {
let mut data = self
.data
.write()
.map_err(|e| MetricsPersistenceError::Storage(format!("Lock poisoned: {}", e)))?;
data.remove(agent_name);
Ok(())
}
}
pub struct FileMetricsPersistence {
config: MetricsPersistenceConfig,
}
impl FileMetricsPersistence {
pub fn new(config: MetricsPersistenceConfig) -> Self {
Self { config }
}
fn agent_key(&self, agent_name: &str) -> String {
format!("{}/{}", self.config.key_prefix, agent_name)
}
fn fleet_key(&self) -> String {
format!("{}/fleet", self.config.key_prefix)
}
}
#[async_trait]
impl MetricsPersistence for FileMetricsPersistence {
async fn save_metrics(
&self,
agent_name: &str,
_metrics: &AgentMetrics,
) -> Result<(), MetricsPersistenceError> {
tracing::debug!(
"Saving metrics for agent {} (key: {})",
agent_name,
self.agent_key(agent_name)
);
Ok(())
}
async fn load_metrics(
&self,
agent_name: &str,
) -> Result<Option<AgentMetrics>, MetricsPersistenceError> {
tracing::debug!(
"Loading metrics for agent {} (key: {})",
agent_name,
self.agent_key(agent_name)
);
Ok(None)
}
async fn save_fleet_metrics(
&self,
_metrics: &AgentMetrics,
) -> Result<(), MetricsPersistenceError> {
tracing::debug!("Saving fleet metrics (key: {})", self.fleet_key());
Ok(())
}
async fn load_fleet_metrics(&self) -> Result<Option<AgentMetrics>, MetricsPersistenceError> {
tracing::debug!("Loading fleet metrics (key: {})", self.fleet_key());
Ok(None)
}
async fn list_agents(&self) -> Result<Vec<String>, MetricsPersistenceError> {
Ok(vec![])
}
async fn delete_metrics(&self, agent_name: &str) -> Result<(), MetricsPersistenceError> {
tracing::debug!(
"Deleting metrics for agent {} (key: {})",
agent_name,
self.agent_key(agent_name)
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_in_memory_save_and_load() {
let persistence = InMemoryMetricsPersistence::new();
let mut metrics = AgentMetrics::new("test-agent".to_string());
metrics.total_executions = 10;
metrics.total_tokens = 5000;
persistence
.save_metrics("test-agent", &metrics)
.await
.unwrap();
let loaded = persistence.load_metrics("test-agent").await.unwrap();
assert!(loaded.is_some());
let loaded = loaded.unwrap();
assert_eq!(loaded.agent_name, "test-agent");
assert_eq!(loaded.total_executions, 10);
assert_eq!(loaded.total_tokens, 5000);
}
#[tokio::test]
async fn test_in_memory_load_not_found() {
let persistence = InMemoryMetricsPersistence::new();
let loaded = persistence.load_metrics("non-existent").await.unwrap();
assert!(loaded.is_none());
}
#[tokio::test]
async fn test_in_memory_fleet_metrics() {
let persistence = InMemoryMetricsPersistence::new();
let mut fleet = AgentMetrics::new("fleet".to_string());
fleet.total_executions = 100;
fleet.total_cost_usd = 5.0;
persistence.save_fleet_metrics(&fleet).await.unwrap();
let loaded = persistence.load_fleet_metrics().await.unwrap();
assert!(loaded.is_some());
let loaded = loaded.unwrap();
assert_eq!(loaded.agent_name, "fleet");
assert_eq!(loaded.total_executions, 100);
assert!((loaded.total_cost_usd - 5.0).abs() < 0.001);
}
#[tokio::test]
async fn test_in_memory_list_agents() {
let persistence = InMemoryMetricsPersistence::new();
let metrics1 = AgentMetrics::new("agent-1".to_string());
let metrics2 = AgentMetrics::new("agent-2".to_string());
persistence
.save_metrics("agent-1", &metrics1)
.await
.unwrap();
persistence
.save_metrics("agent-2", &metrics2)
.await
.unwrap();
let agents = persistence.list_agents().await.unwrap();
assert_eq!(agents.len(), 2);
assert!(agents.contains(&"agent-1".to_string()));
assert!(agents.contains(&"agent-2".to_string()));
}
#[tokio::test]
async fn test_in_memory_delete() {
let persistence = InMemoryMetricsPersistence::new();
let metrics = AgentMetrics::new("test-agent".to_string());
persistence
.save_metrics("test-agent", &metrics)
.await
.unwrap();
persistence.delete_metrics("test-agent").await.unwrap();
let loaded = persistence.load_metrics("test-agent").await.unwrap();
assert!(loaded.is_none());
}
#[test]
fn test_persisted_agent_metrics_new() {
let mut agents = HashMap::new();
agents.insert(
"agent-1".to_string(),
AgentMetrics::new("agent-1".to_string()),
);
let fleet = AgentMetrics::new("fleet".to_string());
let persisted = PersistedAgentMetrics::new(agents, fleet);
assert_eq!(persisted.version, 1);
assert_eq!(persisted.agents.len(), 1);
assert_eq!(persisted.fleet.agent_name, "fleet");
}
}