use std::{collections::HashMap, sync::Arc};
use anyhow::Result;
use tokio::sync::{RwLock, mpsc};
use crate::config::{runtime::RuntimeConfig, schema::AgentEntry};
/// Number of concurrency permits given to an agent when neither the agent's
/// `lane_concurrency` nor `agents.defaults.max_concurrent` is set in the config.
const DEFAULT_MAX_CONCURRENT: u32 = 4;
/// Everything the registry stores for one registered agent.
///
/// The registry keeps handles behind `Arc`, and every shared field is itself
/// an `Arc`, so cloning a handle shares state rather than copying it.
#[derive(Clone)]
pub struct AgentHandle {
/// Agent identifier; also the key under which the registry stores the handle.
pub id: String,
/// Configuration entry this agent was created from.
pub config: AgentEntry,
/// Sending half of the agent's bounded mailbox channel.
pub tx: mpsc::Sender<AgentMessage>,
/// Semaphore sized from the agent's `lane_concurrency` (or the configured
/// default) when the handle is built from config.
// NOTE(review): presumably acquired by the agent runtime once per in-flight
// message — the acquire site is not visible in this file; confirm.
pub concurrency: Arc<tokio::sync::Semaphore>,
/// Shared, lock-protected live status; initialised to `LiveStatus::default()`
/// when the handle is built from config.
pub live_status: Arc<RwLock<crate::agent::runtime::LiveStatus>>,
/// Provider registry shared between all agents built from the same config.
pub providers: Arc<crate::provider::registry::ProviderRegistry>,
}
/// An image attached to an [`AgentMessage`].
#[derive(Debug, Clone)]
pub struct ImageAttachment {
/// Image payload as text.
// NOTE(review): presumably base64 or a data URL — the encoding is not
// established in this file; confirm against the producers.
pub data: String,
/// MIME type of the image.
pub mime_type: String,
}
/// A file attached to an [`AgentMessage`], carried as raw bytes.
///
/// Unlike [`ImageAttachment`], this type does not derive `Clone`.
#[derive(Debug)]
pub struct FileAttachment {
/// File name of the attachment.
pub filename: String,
/// Raw file contents.
pub data: Vec<u8>,
/// MIME type of the file.
pub mime_type: String,
}
/// A message delivered to an agent through its mailbox (`AgentHandle::tx`).
///
/// The message owns a one-shot reply sender, so it cannot be cloned.
#[derive(Debug)]
pub struct AgentMessage {
/// Key identifying the session this message belongs to.
pub session_key: String,
/// Message text.
pub text: String,
/// Name of the channel the message came from.
pub channel: String,
/// Identifier of the peer on that channel.
// NOTE(review): presumably the sending user — confirm against channel adapters.
pub peer_id: String,
/// Identifier of the chat on that channel.
pub chat_id: String,
/// One-shot sender on which the [`AgentReply`] for this message is delivered.
pub reply_tx: tokio::sync::oneshot::Sender<AgentReply>,
/// Additional tool definitions supplied with this message.
pub extra_tools: Vec<crate::provider::ToolDef>,
/// Images attached to the message.
pub images: Vec<ImageAttachment>,
/// Files attached to the message.
pub files: Vec<FileAttachment>,
}
/// Optional payload carried by [`AgentReply::pending_analysis`].
// NOTE(review): the name suggests follow-up analysis deferred until after the
// reply is sent — the consumer is not visible in this file; confirm.
#[derive(Debug)]
pub struct PendingAnalysis {
/// Text associated with the pending analysis.
pub text: String,
/// Session key the analysis is associated with.
pub session_key: String,
/// Channel name the analysis is associated with.
pub channel: String,
/// Peer identifier the analysis is associated with.
pub peer_id: String,
}
/// An agent's answer to an [`AgentMessage`], sent back over the message's
/// `reply_tx`.
#[derive(Debug)]
pub struct AgentReply {
/// Reply text.
pub text: String,
/// Flag marking the reply as empty.
// NOTE(review): presumably set when there is nothing to deliver to the
// user — the producer is not visible here; confirm.
pub is_empty: bool,
/// Tool calls as untyped JSON values, if any.
pub tool_calls: Option<Vec<serde_json::Value>>,
/// Images returned with the reply, as strings.
// NOTE(review): encoding (URL, path, base64?) is not established here; confirm.
pub images: Vec<String>,
/// Optional pending-analysis payload attached to the reply.
pub pending_analysis: Option<PendingAnalysis>,
}
/// Mutable state of an [`AgentRegistry`], kept behind the registry's lock.
struct RegistryInner {
/// Registered agents keyed by agent id.
agents: HashMap<String, Arc<AgentHandle>>,
/// Id of the agent used when routing finds no channel binding; `None` when
/// no default has been chosen.
default_id: Option<String>,
}
/// Registry of agent handles with lookup by id and routing by channel.
///
/// State is guarded by a synchronous `std::sync::RwLock`; the methods in this
/// file unwrap lock results, so they panic if the lock has been poisoned.
pub struct AgentRegistry {
/// Lock-protected agent map and default-agent id.
inner: std::sync::RwLock<RegistryInner>,
}
impl Default for AgentRegistry {
    /// Creates an empty registry: no agents and no default agent id.
    fn default() -> Self {
        let empty_state = RegistryInner {
            default_id: None,
            agents: HashMap::new(),
        };
        Self {
            inner: std::sync::RwLock::new(empty_state),
        }
    }
}
impl AgentRegistry {
pub fn new() -> Self {
Self::default()
}
/// Builds a registry from `cfg` with a freshly created provider registry.
///
/// The mailbox receivers produced during construction are discarded, so the
/// `tx` of every handle points at a channel whose receiving half is already
/// gone. Use `from_config_with_receivers` when the agents must actually
/// receive messages.
pub fn from_config(cfg: &RuntimeConfig) -> Self {
    let fresh_providers = crate::provider::registry::ProviderRegistry::new();
    Self::from_config_with_receivers(cfg, Arc::new(fresh_providers)).0
}
pub fn from_config_with_receivers(
cfg: &RuntimeConfig,
providers: Arc<crate::provider::registry::ProviderRegistry>,
) -> (Self, HashMap<String, mpsc::Receiver<AgentMessage>>) {
let registry = Self::new();
let mut receivers = HashMap::new();
let agent_list = if cfg.agents.list.is_empty() {
let model = cfg.agents.defaults.model.clone();
let workspace = cfg.agents.defaults.workspace.clone();
vec![crate::config::schema::AgentEntry {
id: "main".to_owned(),
default: Some(true),
name: Some("Main Agent".to_owned()),
workspace,
model,
lane: None,
lane_concurrency: None,
group_chat: None,
channels: None,
commands: None,
allowed_commands: None,
opencode: None,
agent_dir: None,
system: None,
}]
} else {
cfg.agents.list.clone()
};
let max_concurrent = cfg
.agents
.defaults
.max_concurrent
.unwrap_or(DEFAULT_MAX_CONCURRENT) as usize;
{
let mut inner = registry.inner.write().unwrap();
for entry in &agent_list {
let (tx, rx) = mpsc::channel::<AgentMessage>(32);
let permits = entry
.lane_concurrency
.map(|n| n as usize)
.unwrap_or(max_concurrent);
let handle = Arc::new(AgentHandle {
id: entry.id.clone(),
config: entry.clone(),
tx,
concurrency: Arc::new(tokio::sync::Semaphore::new(permits)),
live_status: Arc::new(
RwLock::new(crate::agent::runtime::LiveStatus::default()),
),
providers: Arc::clone(&providers),
});
inner.agents.insert(entry.id.clone(), handle);
receivers.insert(entry.id.clone(), rx);
if entry.default == Some(true) && inner.default_id.is_none() {
inner.default_id = Some(entry.id.clone());
}
}
if inner.default_id.is_none()
&& let Some(id) = inner.agents.keys().next().cloned()
{
inner.default_id = Some(id);
}
}
(registry, receivers)
}
/// Registers `handle` under its own id, replacing any handle already stored
/// under that id.
///
/// The default agent id is never touched here: a registry populated only
/// through this method has no default agent.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn insert_handle(&self, handle: Arc<AgentHandle>) {
    let key = handle.id.clone();
    self.inner.write().unwrap().agents.insert(key, handle);
}
/// Looks up the agent registered under `id`.
///
/// # Errors
/// Returns an error when no agent with that id is registered.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn get(&self, id: &str) -> Result<Arc<AgentHandle>> {
    let guard = self.inner.read().unwrap();
    match guard.agents.get(id) {
        Some(handle) => Ok(Arc::clone(handle)),
        None => Err(anyhow::anyhow!("agent not found: `{id}`")),
    }
}
/// Returns the handle of the default agent.
///
/// # Errors
/// Returns an error when no default id is set, or when the default id does
/// not resolve to a registered agent.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn default_agent(&self) -> Result<Arc<AgentHandle>> {
    // Copy the id out inside a scope so the read guard is released before
    // `get` takes the lock again.
    let default_id = {
        let guard = self.inner.read().unwrap();
        match guard.default_id.as_ref() {
            Some(id) => id.clone(),
            None => return Err(anyhow::anyhow!("no default agent configured")),
        }
    };
    self.get(&default_id)
}
/// Routes a message from `channel` to an agent without considering any
/// account; shorthand for `route_account(channel, None)`.
///
/// # Errors
/// Same as `route_account`.
pub fn route(&self, channel: &str) -> Result<Arc<AgentHandle>> {
    let no_account: Option<&str> = None;
    self.route_account(channel, no_account)
}
/// Picks the agent that should handle traffic from `channel`, optionally
/// narrowed to a specific `account` on that channel.
///
/// Selection order:
/// 1. Agents whose `channels` list contains the qualified name
///    `"{channel}:{account}"` (only when `account` is given).
/// 2. Otherwise, agents whose `channels` list contains the bare `channel`.
/// 3. Otherwise, the default agent.
///
/// When several agents match in the winning tier, the one with the
/// lexicographically smallest id is returned. Agents without a `channels`
/// list never match a binding.
///
/// # Errors
/// Returns an error when nothing is bound and no default agent is set, or the
/// default id does not resolve to a registered agent.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn route_account(&self, channel: &str, account: Option<&str>) -> Result<Arc<AgentHandle>> {
    let guard = self.inner.read().unwrap();
    let qualified_key = account.map(|acct| format!("{channel}:{acct}"));

    let mut qualified_hits: Vec<Arc<AgentHandle>> = Vec::new();
    let mut channel_hits: Vec<Arc<AgentHandle>> = Vec::new();
    for handle in guard.agents.values() {
        let Some(bindings) = handle.config.channels.as_ref() else {
            continue;
        };
        let matches_qualified = match &qualified_key {
            Some(key) => bindings.iter().any(|bound| bound == key),
            None => false,
        };
        if matches_qualified {
            qualified_hits.push(Arc::clone(handle));
        } else if bindings.iter().any(|bound| bound == channel) {
            channel_hits.push(Arc::clone(handle));
        }
    }

    // Account-qualified bindings take precedence over bare channel bindings.
    let candidates = if qualified_hits.is_empty() {
        channel_hits
    } else {
        qualified_hits
    };

    // One candidate is returned as-is; several are tie-broken by smallest id.
    if let Some(winner) = candidates
        .into_iter()
        .min_by(|left, right| left.id.cmp(&right.id))
    {
        return Ok(winner);
    }

    // Nothing bound: fall back to the default agent. The guard is released
    // before `get` takes the lock again.
    let fallback_id = guard
        .default_id
        .clone()
        .ok_or_else(|| anyhow::anyhow!("no default agent configured"))?;
    drop(guard);
    self.get(&fallback_id)
}
/// Number of registered agents.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn len(&self) -> usize {
    let guard = self.inner.read().unwrap();
    guard.agents.len()
}
/// Returns `true` when no agent is registered.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn is_empty(&self) -> bool {
    let guard = self.inner.read().unwrap();
    guard.agents.is_empty()
}
/// Returns a snapshot of every registered agent handle.
///
/// The order follows the internal `HashMap` and is therefore unspecified.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn all(&self) -> Vec<Arc<AgentHandle>> {
    let guard = self.inner.read().unwrap();
    let mut handles = Vec::with_capacity(guard.agents.len());
    for handle in guard.agents.values() {
        handles.push(Arc::clone(handle));
    }
    handles
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{
runtime::{
AgentsRuntime, ChannelRuntime, ExtRuntime, GatewayRuntime, ModelRuntime, OpsRuntime,
RuntimeConfig,
},
schema::{AgentEntry, BindMode, GatewayMode, ReloadMode, SessionConfig},
};
/// Test fixture: builds a `RuntimeConfig` whose agent list is `agents`.
///
/// Agent defaults, channels and `raw` use `Default::default()`; the optional
/// model, extension, ops and session settings are all `None`, and the gateway
/// fields are filled with fixed literal values.
fn make_runtime(agents: Vec<AgentEntry>) -> RuntimeConfig {
RuntimeConfig {
gateway: GatewayRuntime {
port: 18888,
mode: GatewayMode::Local,
bind: BindMode::Loopback,
reload: ReloadMode::Hybrid,
auth_token: None,
allow_tailscale: false,
channel_health_check_minutes: 5,
channel_stale_event_threshold_minutes: 30,
channel_max_restarts_per_hour: 10,
auth_token_configured: false,
auth_token_is_plaintext: false,
bind_address: None,
},
agents: AgentsRuntime {
defaults: Default::default(),
// The only part of the fixture that varies between tests.
list: agents,
bindings: vec![],
external: vec![],
},
channel: ChannelRuntime {
channels: Default::default(),
session: SessionConfig {
dm_scope: None,
thread_bindings: None,
reset: None,
identity_links: None,
maintenance: None,
},
},
model: ModelRuntime {
models: None,
auth: None,
},
ext: ExtRuntime {
tools: None,
skills: None,
plugins: None,
},
ops: OpsRuntime {
cron: None,
hooks: None,
sandbox: None,
logging: None,
secrets: None,
},
raw: Default::default(),
}
}
/// Test fixture: builds an `AgentEntry` with the given id, default flag and
/// optional channel bindings; every other field is `None`.
///
/// `default == false` is stored as `None`, not `Some(false)`.
fn entry(id: &str, default: bool, channels: Option<Vec<&str>>) -> AgentEntry {
    AgentEntry {
        id: id.to_owned(),
        name: None,
        default: default.then_some(true),
        channels: channels
            .map(|names| names.iter().map(|name| (*name).to_owned()).collect()),
        workspace: None,
        model: None,
        lane: None,
        lane_concurrency: None,
        group_chat: None,
        commands: None,
        allowed_commands: None,
        opencode: None,
        agent_dir: None,
        system: None,
    }
}
/// A channel that no agent is bound to is routed to the default agent.
#[test]
fn routes_to_default_when_no_binding() {
    let agents = vec![
        entry("main", true, None),
        entry("telegram_bot", false, Some(vec!["telegram"])),
    ];
    let registry = AgentRegistry::from_config(&make_runtime(agents));
    let routed = registry.route("slack").expect("route");
    assert_eq!(routed.id, "main");
}
/// A channel listed in an agent's `channels` is routed to that agent rather
/// than to the default agent.
#[test]
fn routes_to_bound_agent() {
    let agents = vec![
        entry("main", true, None),
        entry("tgbot", false, Some(vec!["telegram"])),
    ];
    let registry = AgentRegistry::from_config(&make_runtime(agents));
    let routed = registry.route("telegram").expect("route");
    assert_eq!(routed.id, "tgbot");
}
/// `get` succeeds for a registered id and fails for an unknown one.
#[test]
fn get_by_id() {
    let runtime = make_runtime(vec![entry("alpha", true, None)]);
    let registry = AgentRegistry::from_config(&runtime);
    assert!(registry.get("alpha").is_ok());
    assert!(registry.get("nonexistent").is_err());
}
}