use super::{AgentRecord, AgentRegistry, PresenceState, RoutingStrategy, select_best_agent};
use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::info;
pub struct MemoryRegistry {
agents: RwLock<HashMap<String, AgentRecord>>,
rr_counter: RwLock<u64>,
event_handlers: RwLock<Vec<super::AgentEventHandler>>,
}
impl MemoryRegistry {
pub fn new() -> Self {
Self {
agents: RwLock::new(HashMap::new()),
rr_counter: RwLock::new(0),
event_handlers: RwLock::new(Vec::new()),
}
}
async fn notify_handlers(&self, record: &AgentRecord) {
let handlers = self.event_handlers.read().await;
for handler in handlers.iter() {
handler(record);
}
}
}
impl Default for MemoryRegistry {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl AgentRegistry for MemoryRegistry {
async fn register(
&self,
agent_id: String,
display_name: String,
uri: String,
skills: Vec<String>,
max_concurrency: u32,
) -> anyhow::Result<()> {
let mut agents = self.agents.write().await;
if agents.contains_key(&agent_id) {
anyhow::bail!("Agent {} already registered", agent_id);
}
let record = AgentRecord {
agent_id: agent_id.clone(),
display_name,
uri,
skills,
max_concurrency,
current_calls: 0,
presence: PresenceState::Available,
last_state_change: Instant::now(),
total_calls_handled: 0,
total_talk_time_secs: 0,
last_call_end: None,
custom_data: HashMap::new(),
};
agents.insert(agent_id.clone(), record.clone());
info!(agent_id = %agent_id, "Agent registered in memory");
drop(agents);
self.notify_handlers(&record).await;
Ok(())
}
async fn unregister(&self, agent_id: &str) -> anyhow::Result<()> {
let mut agents = self.agents.write().await;
if agents.remove(agent_id).is_some() {
info!(agent_id = %agent_id, "Agent unregistered from memory");
Ok(())
} else {
anyhow::bail!("Agent {} not found", agent_id)
}
}
async fn get_agent(&self, agent_id: &str) -> Option<AgentRecord> {
let agents = self.agents.read().await;
agents.get(agent_id).cloned()
}
async fn list_agents(&self) -> Vec<AgentRecord> {
let agents = self.agents.read().await;
agents.values().cloned().collect()
}
async fn update_presence(
&self,
agent_id: &str,
new_state: PresenceState,
) -> anyhow::Result<()> {
let mut agents = self.agents.write().await;
let agent = agents
.get_mut(agent_id)
.ok_or_else(|| anyhow::anyhow!("Agent {} not found", agent_id))?;
let old_state = agent.presence.clone();
agent.presence = new_state;
agent.last_state_change = Instant::now();
info!(
agent_id = %agent_id,
old = %old_state.as_str(),
new = %agent.presence.as_str(),
"Presence updated in memory"
);
let record = agent.clone();
drop(agents);
self.notify_handlers(&record).await;
Ok(())
}
async fn start_call(&self, agent_id: &str) -> anyhow::Result<()> {
let mut agents = self.agents.write().await;
let agent = agents
.get_mut(agent_id)
.ok_or_else(|| anyhow::anyhow!("Agent {} not found", agent_id))?;
agent.current_calls += 1;
agent.presence = PresenceState::Busy { call_id: None };
agent.last_state_change = Instant::now();
let record = agent.clone();
drop(agents);
self.notify_handlers(&record).await;
Ok(())
}
async fn end_call(&self, agent_id: &str, talk_time_secs: u64) -> anyhow::Result<()> {
let mut agents = self.agents.write().await;
let agent = agents
.get_mut(agent_id)
.ok_or_else(|| anyhow::anyhow!("Agent {} not found", agent_id))?;
if agent.current_calls > 0 {
agent.current_calls -= 1;
}
agent.total_calls_handled += 1;
agent.total_talk_time_secs += talk_time_secs;
agent.last_call_end = Some(Instant::now());
if agent.current_calls == 0 {
agent.presence = PresenceState::Wrapup { call_id: None };
}
let record = agent.clone();
drop(agents);
self.notify_handlers(&record).await;
Ok(())
}
async fn find_available_agents(&self, required_skills: &[String]) -> Vec<AgentRecord> {
let agents = self.agents.read().await;
agents
.values()
.filter(|a| a.has_capacity() && a.has_skills(required_skills))
.cloned()
.collect()
}
async fn select_agent(
&self,
required_skills: &[String],
strategy: RoutingStrategy,
) -> Option<AgentRecord> {
let candidates = self.find_available_agents(required_skills).await;
let mut rr_counter = self.rr_counter.write().await;
select_best_agent(candidates, strategy, &mut rr_counter)
}
async fn resolve_target(&self, _target_uri: &str) -> Vec<String> {
vec![]
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_memory_registry_lifecycle() {
let registry = MemoryRegistry::new();
registry
.register(
"agent-001".to_string(),
"Alice".to_string(),
"sip:1001@localhost".to_string(),
vec!["support".to_string()],
2,
)
.await
.unwrap();
let agent = registry.get_agent("agent-001").await.unwrap();
assert_eq!(agent.display_name, "Alice");
assert!(agent.has_capacity());
registry
.update_presence("agent-001", PresenceState::Busy { call_id: None })
.await
.unwrap();
let agent = registry.get_agent("agent-001").await.unwrap();
assert!(!agent.has_capacity());
registry.unregister("agent-001").await.unwrap();
assert!(registry.get_agent("agent-001").await.is_none());
}
#[tokio::test]
async fn test_memory_registry_routing() {
let registry = MemoryRegistry::new();
for i in 1..=3 {
registry
.register(
format!("agent-00{}", i),
format!("Agent {}", i),
format!("sip:100{}@localhost", i),
vec!["support".to_string()],
1,
)
.await
.unwrap();
}
let agent = registry
.select_agent(&["support".to_string()], RoutingStrategy::LongestIdle)
.await;
assert!(agent.is_some());
let a1 = registry
.select_agent(&["support".to_string()], RoutingStrategy::RoundRobin)
.await;
let a2 = registry
.select_agent(&["support".to_string()], RoutingStrategy::RoundRobin)
.await;
assert_ne!(a1.unwrap().agent_id, a2.unwrap().agent_id);
}
}