use super::{AgentRecord, AgentRegistry, PresenceState, RoutingStrategy, select_best_agent};
use async_trait::async_trait;
use sea_orm::DatabaseConnection;
use std::collections::HashMap;
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::info;
/// Agent registry whose state, as far as this struct shows, lives entirely in
/// process memory behind async read/write locks.
///
/// NOTE(review): despite the name, no database handle is stored here; confirm
/// whether persistence is still to be wired in.
pub struct DbRegistry {
/// Agent records keyed by agent id.
cache: RwLock<HashMap<String, AgentRecord>>,
/// Counter handed to `select_best_agent` on every selection
/// (presumably the round-robin cursor — confirm against that helper).
rr_counter: RwLock<u64>,
/// Callbacks invoked with a copy of the agent record after a change.
event_handlers: RwLock<Vec<super::AgentEventHandler>>,
/// Cache time-to-live in seconds. It is set by the constructor and builder,
/// but nothing in the visible code reads it yet.
cache_ttl_secs: u64,
}
impl DbRegistry {
    /// Cache TTL, in seconds, applied when the caller does not override it.
    const DEFAULT_CACHE_TTL_SECS: u64 = 30;

    /// Creates an empty registry with the default cache TTL.
    ///
    /// The `_db` connection is accepted but not retained: it is dropped when
    /// this function returns, so the registry starts (and stays) in-memory.
    pub fn new(_db: DatabaseConnection) -> Self {
        Self {
            cache_ttl_secs: Self::DEFAULT_CACHE_TTL_SECS,
            event_handlers: RwLock::new(Vec::new()),
            rr_counter: RwLock::new(0),
            cache: RwLock::new(HashMap::new()),
        }
    }

    /// Builder-style setter: returns the registry with its cache TTL replaced
    /// by `ttl_secs` (seconds).
    pub fn with_cache_ttl(self, ttl_secs: u64) -> Self {
        let mut configured = self;
        configured.cache_ttl_secs = ttl_secs;
        configured
    }

    /// Calls every registered event handler with `record`, in list order.
    ///
    /// Handlers run synchronously while the read lock on the handler list is
    /// held.
    async fn notify_handlers(&self, record: &AgentRecord) {
        self.event_handlers
            .read()
            .await
            .iter()
            .for_each(|callback| {
                callback(record);
            });
    }
}
#[async_trait]
impl AgentRegistry for DbRegistry {
/// Adds an agent to the registry and notifies the event handlers.
///
/// The new record starts `Available` with zeroed call counters. An existing
/// entry under the same `agent_id` is replaced. Only the in-memory cache is
/// written here, even though the log line mentions the database.
///
/// Always returns `Ok(())` in the current implementation.
async fn register(
    &self,
    agent_id: String,
    display_name: String,
    uri: String,
    skills: Vec<String>,
    max_concurrency: u32,
) -> anyhow::Result<()> {
    let registered_at = Instant::now();
    let record = AgentRecord {
        agent_id,
        display_name,
        uri,
        skills,
        max_concurrency,
        current_calls: 0,
        presence: PresenceState::Available,
        last_state_change: registered_at,
        total_calls_handled: 0,
        total_talk_time_secs: 0,
        last_call_end: None,
        custom_data: HashMap::new(),
    };

    // The write guard is a temporary of this statement, so the lock is
    // released before logging and before the handlers run.
    self.cache
        .write()
        .await
        .insert(record.agent_id.clone(), record.clone());

    info!(agent_id = %record.agent_id, "Agent registered in database");
    self.notify_handlers(&record).await;
    Ok(())
}
/// Removes the agent with the given id from the registry.
///
/// # Errors
/// Returns an error if no agent with `agent_id` is registered.
///
/// Event handlers are not notified on removal.
async fn unregister(&self, agent_id: &str) -> anyhow::Result<()> {
    // The write guard lives for the whole `match`, matching the original
    // lock scope.
    match self.cache.write().await.remove(agent_id) {
        Some(_removed) => {
            info!(agent_id = %agent_id, "Agent unregistered from database");
            Ok(())
        }
        None => anyhow::bail!("Agent {} not found", agent_id),
    }
}
/// Returns a copy of the record for `agent_id`, or `None` if the id is not
/// in the cache.
async fn get_agent(&self, agent_id: &str) -> Option<AgentRecord> {
    self.cache.read().await.get(agent_id).cloned()
}
/// Returns copies of all cached agent records, in the map's iteration order
/// (i.e. no particular order).
async fn list_agents(&self) -> Vec<AgentRecord> {
    let agents = self.cache.read().await;
    let mut snapshot = Vec::with_capacity(agents.len());
    snapshot.extend(agents.values().cloned());
    snapshot
}
async fn update_presence(
&self,
agent_id: &str,
new_state: PresenceState,
) -> anyhow::Result<()> {
let mut cache = self.cache.write().await;
let agent = cache
.get_mut(agent_id)
.ok_or_else(|| anyhow::anyhow!("Agent {} not found", agent_id))?;
let old_state = agent.presence.clone();
agent.presence = new_state;
agent.last_state_change = Instant::now();
info!(
agent_id = %agent_id,
old = %old_state.as_str(),
new = %agent.presence.as_str(),
"Presence updated in database"
);
let record = agent.clone();
drop(cache);
self.notify_handlers(&record).await;
Ok(())
}
/// Records that the agent has picked up a call: increments the active-call
/// count, marks the agent `Busy`, stamps the state change, and notifies the
/// event handlers.
///
/// No check against `max_concurrency` is made here.
///
/// # Errors
/// Returns an error if no agent with `agent_id` is registered.
async fn start_call(&self, agent_id: &str) -> anyhow::Result<()> {
    let snapshot = {
        let mut agents = self.cache.write().await;
        match agents.get_mut(agent_id) {
            Some(agent) => {
                agent.current_calls += 1;
                agent.presence = PresenceState::Busy { call_id: None };
                agent.last_state_change = Instant::now();
                agent.clone()
            }
            None => anyhow::bail!("Agent {} not found", agent_id),
        }
    };

    // The write lock is already released at this point.
    self.notify_handlers(&snapshot).await;
    Ok(())
}
/// Records the end of a call for the agent and notifies the event handlers.
///
/// Decrements the active-call count (never below zero), adds one handled
/// call and `talk_time_secs` to the running totals, and stamps
/// `last_call_end`. When no calls remain, the agent moves to `Wrapup` and
/// `last_state_change` is refreshed, as every other presence transition in
/// this registry does.
///
/// # Errors
/// Returns an error if no agent with `agent_id` is registered.
async fn end_call(&self, agent_id: &str, talk_time_secs: u64) -> anyhow::Result<()> {
    let mut cache = self.cache.write().await;
    let agent = cache
        .get_mut(agent_id)
        .ok_or_else(|| anyhow::anyhow!("Agent {} not found", agent_id))?;

    // Guarded decrement: an unmatched `end_call` must not underflow.
    if agent.current_calls > 0 {
        agent.current_calls -= 1;
    }

    // One timestamp for both fields so they agree exactly.
    let now = Instant::now();
    agent.total_calls_handled += 1;
    agent.total_talk_time_secs += talk_time_secs;
    agent.last_call_end = Some(now);

    if agent.current_calls == 0 {
        agent.presence = PresenceState::Wrapup { call_id: None };
        // Previously omitted: without this, the time spent in `Wrapup`
        // would be measured from the earlier `Busy` transition.
        agent.last_state_change = now;
    }

    let record = agent.clone();
    // Release the write lock before running the handlers.
    drop(cache);
    self.notify_handlers(&record).await;
    Ok(())
}
/// Returns copies of every cached agent that reports spare capacity and
/// satisfies `required_skills`, in the map's iteration order.
async fn find_available_agents(&self, required_skills: &[String]) -> Vec<AgentRecord> {
    let agents = self.cache.read().await;
    let mut eligible = Vec::new();
    for agent in agents.values() {
        if agent.has_capacity() && agent.has_skills(required_skills) {
            eligible.push(agent.clone());
        }
    }
    eligible
}
/// Picks one agent for a call requiring `required_skills`.
///
/// The candidates are gathered first; the shared counter is then locked for
/// writing and passed, with `strategy`, to `select_best_agent`, whose result
/// is returned unchanged.
async fn select_agent(
    &self,
    required_skills: &[String],
    strategy: RoutingStrategy,
) -> Option<AgentRecord> {
    let pool = self.find_available_agents(required_skills).await;
    let mut cursor = self.rr_counter.write().await;
    select_best_agent(pool, strategy, &mut cursor)
}
/// Stub: ignores `_target_uri` and always returns an empty list.
async fn resolve_target(&self, _target_uri: &str) -> Vec<String> {
    Vec::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Registers one agent, reads it back, then flips its presence to `Busy`
    /// and checks that the change is visible through `get_agent`.
    #[tokio::test]
    async fn test_agent_registry_basic() {
        let conn = sea_orm::Database::connect("sqlite::memory:").await.unwrap();
        let registry = DbRegistry::new(conn);

        let skills = vec![String::from("support")];
        registry
            .register(
                String::from("agent-001"),
                String::from("Alice"),
                String::from("sip:1001@localhost"),
                skills,
                2,
            )
            .await
            .unwrap();

        let registered = registry.get_agent("agent-001").await.unwrap();
        assert_eq!(registered.display_name, "Alice");

        let busy = PresenceState::Busy { call_id: None };
        registry.update_presence("agent-001", busy).await.unwrap();

        let updated = registry.get_agent("agent-001").await.unwrap();
        assert!(matches!(
            updated.presence,
            PresenceState::Busy { call_id: None }
        ));
    }
}