use std::sync::Arc;
use anyhow::Result;
use futures::future::join_all;
use serde_json::Value;
use tokio::sync::oneshot;
use tracing::debug;
use super::registry::{AgentHandle, AgentMessage, AgentRegistry, AgentReply};
/// Describes how a group of agents is combined to handle one request.
///
/// NOTE(review): nothing in the visible part of this file consumes this enum,
/// so the per-variant notes below are inferred from the variant names and
/// should be confirmed against the caller.
#[derive(Debug, Clone)]
pub enum CollabMode {
/// Presumably the ids of the agents to run one after another, in order.
Sequential(Vec<String>),
/// Presumably the ids of the agents to run side by side on the same input.
Parallel(Vec<String>),
/// Carries no agent list; presumably an orchestrator agent picks sub-agents
/// itself (TODO confirm).
Orchestrated,
}
/// Runs the given agents one after another as a pipeline.
///
/// The first agent receives `initial_input`; every following agent receives
/// the reply text of the agent before it. All agents are invoked with the same
/// `session_key`, `channel` and `peer_id`.
///
/// The returned reply carries only text: the final agent's reply text, with
/// `is_empty` reflecting whether that text is empty and every other field
/// unset. When `agent_ids` is empty, an empty reply (`is_empty: true`) is
/// returned and `initial_input` is not echoed back.
///
/// # Errors
///
/// Returns the first error from `registry.get` or from invoking an agent;
/// later agents in the list are not invoked.
pub async fn run_sequential(
    agent_ids: &[String],
    initial_input: &str,
    session_key: &str,
    channel: &str,
    peer_id: &str,
    registry: &AgentRegistry,
) -> Result<AgentReply> {
    /// Builds a reply that carries nothing but `text`.
    fn text_only_reply(text: String) -> AgentReply {
        AgentReply {
            // Evaluated before `text` is moved into the struct.
            is_empty: text.is_empty(),
            text,
            tool_calls: None,
            images: vec![],
            files: vec![],
            pending_analysis: None,
            needs_outer_done_emit: false,
        }
    }

    if agent_ids.is_empty() {
        return Ok(text_only_reply(String::new()));
    }

    let mut current_text = initial_input.to_owned();
    for id in agent_ids {
        let handle = registry.get(id)?;
        let reply = invoke_agent(&handle, &current_text, session_key, channel, peer_id).await?;
        debug!(agent = %id, chars = reply.text.len(), "sequential step complete");
        // Only the text is forwarded to the next step.
        current_text = reply.text;
    }

    Ok(text_only_reply(current_text))
}
/// Invokes every listed agent with the same `input` and gathers the replies.
///
/// All invocations are driven together through `join_all`, and the replies
/// come back in the same order as `agent_ids`. An empty `agent_ids` yields an
/// empty vector.
///
/// # Errors
///
/// After all invocations have finished, returns the first error (in
/// `agent_ids` order) from either the registry lookup or the agent
/// invocation.
pub async fn run_parallel(
    agent_ids: &[String],
    input: &str,
    session_key: &str,
    channel: &str,
    peer_id: &str,
    registry: &AgentRegistry,
) -> Result<Vec<AgentReply>> {
    if agent_ids.is_empty() {
        return Ok(Vec::new());
    }

    let mut pending = Vec::with_capacity(agent_ids.len());
    for agent_id in agent_ids {
        // The lookup happens up front; a failed lookup surfaces when the
        // corresponding future is awaited.
        let lookup = registry.get(agent_id);
        let owned_input = input.to_owned();
        let owned_session = session_key.to_owned();
        let owned_channel = channel.to_owned();
        let owned_peer = peer_id.to_owned();
        pending.push(async move {
            let agent = lookup?;
            invoke_agent(
                &agent,
                &owned_input,
                &owned_session,
                &owned_channel,
                &owned_peer,
            )
            .await
        });
    }

    let outcomes = join_all(pending).await;
    let mut replies = Vec::with_capacity(outcomes.len());
    for outcome in outcomes {
        replies.push(outcome?);
    }
    Ok(replies)
}
/// Dispatches an agent-to-agent (A2A) call to the sub-agent `agent_id` and
/// returns its reply text as a JSON string value.
///
/// The task text is the string field `"message"` of `args`. When that field is
/// missing or is not a string, the whole of `args` serialized as JSON is sent
/// instead. The sub-agent runs under the derived session key
/// `"{session_key}:a2a:{agent_id}"`.
///
/// # Errors
///
/// Returns an error when `registry.get` fails for `agent_id` or when invoking
/// the sub-agent fails.
pub async fn dispatch_a2a(
    agent_id: &str,
    args: Value,
    session_key: &str,
    channel: &str,
    peer_id: &str,
    registry: &AgentRegistry,
) -> Result<Value> {
    // Borrow the message when it is present; the serialized fallback is an
    // owned string that is freed normally instead of being leaked.
    let message: std::borrow::Cow<'_, str> = match args.get("message").and_then(Value::as_str) {
        Some(text) => std::borrow::Cow::Borrowed(text),
        None => std::borrow::Cow::Owned(args.to_string()),
    };
    let child_session_key = format!("{session_key}:a2a:{agent_id}");
    let handle = registry.get(agent_id)?;
    debug!(
        orchestrator_session = %session_key,
        sub_agent = %agent_id,
        "A2A dispatch"
    );
    let reply = invoke_agent(&handle, &message, &child_session_key, channel, peer_id).await?;
    Ok(Value::String(reply.text))
}
/// Builds one tool definition for each registered agent other than
/// `orchestrator_id`.
///
/// Each tool is named `agent_<id>` and declares a single required string
/// parameter, `message`.
pub fn build_a2a_tool_defs(
    registry: &AgentRegistry,
    orchestrator_id: &str,
) -> Vec<crate::provider::ToolDef> {
    registry
        .all()
        .into_iter()
        .filter_map(|agent| {
            // The orchestrator must not be offered itself as a tool.
            if agent.id == orchestrator_id {
                return None;
            }

            let parameters = serde_json::json!({
                "type": "object",
                "properties": {
                    "message": {
                        "type": "string",
                        "description": "The task or question for the sub-agent"
                    }
                },
                "required": ["message"]
            });
            let description = format!(
                "Invoke sub-agent '{}'. Provide a 'message' parameter with the task.",
                agent.id
            );
            let name = format!("agent_{}", agent.id);

            Some(crate::provider::ToolDef {
                name,
                description,
                parameters,
            })
        })
        .collect()
}
/// Sends one message to the agent behind `handle` and waits for its reply.
///
/// The message carries the given `session_key`, `text`, `channel` and
/// `peer_id`, an empty `chat_id`, no extra tools, images or files, and no
/// account. The reply is delivered over a fresh oneshot channel whose sender
/// travels inside the message.
///
/// # Errors
///
/// Returns an error when the message cannot be sent to the agent ("mailbox
/// closed"), or when the reply sender is dropped without a reply being sent.
pub(crate) async fn invoke_agent(
    handle: &Arc<AgentHandle>,
    text: &str,
    session_key: &str,
    channel: &str,
    peer_id: &str,
) -> Result<AgentReply> {
    let (reply_tx, reply_rx) = oneshot::channel();

    let request = AgentMessage {
        session_key: session_key.to_owned(),
        text: text.to_owned(),
        channel: channel.to_owned(),
        peer_id: peer_id.to_owned(),
        chat_id: String::new(),
        reply_tx,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
        account: None,
    };

    if handle.tx.send(request).await.is_err() {
        return Err(anyhow::anyhow!("agent `{}` mailbox closed", handle.id));
    }

    match reply_rx.await {
        Ok(reply) => Ok(reply),
        Err(_) => Err(anyhow::anyhow!(
            "agent `{}` dropped reply sender",
            handle.id
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        agent::registry::AgentRegistry,
        config::{
            runtime::{
                AgentsRuntime, ChannelRuntime, ExtRuntime, GatewayRuntime, ModelRuntime,
                OpsRuntime, RuntimeConfig,
            },
            schema::{AgentEntry, BindMode, GatewayMode, ReloadMode, SessionConfig},
        },
    };

    /// Builds an `AgentRegistry` from a minimal `RuntimeConfig` that lists one
    /// `AgentEntry` per id in `ids`.
    ///
    /// The first id is marked as the default agent; every optional setting is
    /// left unset.
    fn make_registry_with_echo(ids: &[&str]) -> AgentRegistry {
        let agents: Vec<AgentEntry> = ids
            .iter()
            .enumerate()
            .map(|(i, id)| AgentEntry {
                id: id.to_string(),
                name: None,
                default: if i == 0 { Some(true) } else { None },
                workspace: None,
                model: None,
                flash_model: None,
                lane: None,
                lane_concurrency: None,
                group_chat: None,
                channels: None,
                commands: None,
                allowed_commands: None,
                opencode: None,
                claudecode: None,
                codex: None,
                agent_dir: None,
                system: None,
                temperature: None,
            })
            .collect();
        let cfg = RuntimeConfig {
            gateway: GatewayRuntime {
                port: 18888,
                mode: GatewayMode::Local,
                bind: BindMode::Loopback,
                bind_address: None,
                reload: ReloadMode::Hybrid,
                auth_token: None,
                allow_tailscale: false,
                channel_health_check_minutes: 5,
                channel_stale_event_threshold_minutes: 30,
                channel_max_restarts_per_hour: 10,
                auth_token_configured: false,
                auth_token_is_plaintext: false,
                user_agent: None,
                language: None,
            },
            agents: AgentsRuntime {
                defaults: Default::default(),
                list: agents,
                bindings: vec![],
                external: vec![],
            },
            channel: ChannelRuntime {
                channels: Default::default(),
                session: SessionConfig {
                    dm_scope: None,
                    thread_bindings: None,
                    reset: None,
                    identity_links: None,
                    maintenance: None,
                },
            },
            model: ModelRuntime {
                models: None,
                auth: None,
            },
            ext: ExtRuntime {
                tools: None,
                skills: None,
                plugins: None,
                evolution: None,
            },
            ops: OpsRuntime {
                cron: None,
                hooks: None,
                sandbox: None,
                logging: None,
                secrets: None,
            },
            raw: Default::default(),
        };
        AgentRegistry::from_config(&cfg)
    }

    /// Every `CollabMode` variant can be constructed.
    #[test]
    fn collab_mode_variants_exist() {
        let _s = CollabMode::Sequential(vec!["a".into(), "b".into()]);
        let _p = CollabMode::Parallel(vec!["a".into()]);
        let _o = CollabMode::Orchestrated;
    }

    /// The orchestrator itself is never offered as an A2A tool.
    #[test]
    fn build_a2a_tools_excludes_self() {
        let reg = make_registry_with_echo(&["main", "researcher", "writer"]);
        let tools = build_a2a_tool_defs(&reg, "main");
        assert_eq!(tools.len(), 2);
        let names: Vec<_> = tools.iter().map(|t| t.name.as_str()).collect();
        assert!(names.contains(&"agent_researcher"));
        assert!(names.contains(&"agent_writer"));
        assert!(!names.contains(&"agent_main"));
    }

    /// The generated tool schema declares a `message` property.
    #[test]
    fn a2a_tool_schema_has_message_param() {
        let reg = make_registry_with_echo(&["main", "sub"]);
        let tools = build_a2a_tool_defs(&reg, "main");
        let props = &tools[0].parameters["properties"];
        assert!(props.get("message").is_some());
    }
}