use crate::a2a::client::A2AClient;
use crate::a2a::lan;
use crate::a2a::mdns::{self, DiscoveredPeer};
use crate::a2a::server::A2AServer;
use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
use crate::bus::AgentBus;
use crate::cli::SpawnArgs;
use anyhow::{Context, Result};
use axum::Router;
use reqwest::Url;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
/// Settings used to start an A2A peer.
///
/// Consumed by `prepare_a2a`, which binds the listener, builds the agent card
/// and starts the discovery tasks from these values.
#[derive(Debug, Clone)]
pub struct SpawnOptions {
/// Agent name placed on the card; `auto_agent_name()` is used when `None`.
pub name: Option<String>,
/// Host part of the address the listener binds to.
pub hostname: String,
/// Port part of the bind address; the effective port is read back from the
/// listener after binding.
pub port: u16,
/// Advertised base URL; derived from the bind host and effective port when `None`.
pub public_url: Option<String>,
/// Replaces the default card description when set.
pub description: Option<String>,
/// Explicit peer seeds to probe periodically.
pub peer: Vec<String>,
/// Seconds between probing rounds of the explicit seeds (`prepare_a2a`
/// raises values below 5 to 5).
pub discovery_interval_secs: u64,
/// Whether an intro message is sent to each newly discovered peer.
pub auto_introduce: bool,
/// Whether LAN broadcast and mDNS discovery are started.
pub mdns: bool,
}
impl SpawnOptions {
pub fn auto() -> Self {
Self {
name: None,
hostname: "0.0.0.0".to_string(),
port: 0,
public_url: None,
description: None,
peer: Vec::new(),
discovery_interval_secs: 15,
auto_introduce: true,
mdns: true,
}
}
pub fn from_spawn_args(args: &SpawnArgs) -> Self {
Self {
name: args.name.clone(),
hostname: args.hostname.clone(),
port: args.port,
public_url: args.public_url.clone(),
description: args.description.clone(),
peer: args.peer.clone(),
discovery_interval_secs: args.discovery_interval_secs,
auto_introduce: args.auto_introduce,
mdns: args.mdns,
}
}
}
/// Builds a default agent name of the form `<host>-<dir>-<pid>`.
///
/// * `<host>` is the part of the machine hostname before the first `.`
///   (`"host"` when the hostname is not valid Unicode).
/// * `<dir>` is the final component of the current working directory
///   (`"repo"` when it cannot be determined).
/// * `<pid>` is the low 16 bits of the process id as at least four hex digits.
///
/// The combined string is passed through `sanitize_name`.
pub fn auto_agent_name() -> String {
    let hostname = gethostname::gethostname()
        .into_string()
        .unwrap_or_else(|_| "host".to_string());
    let short_host = match hostname.split_once('.') {
        Some((head, _)) => head,
        None => hostname.as_str(),
    };

    let cwd = std::env::current_dir().ok();
    let dir_name = cwd
        .as_deref()
        .and_then(|path| path.file_name())
        .and_then(|name| name.to_str())
        .unwrap_or("repo");

    let pid_low = std::process::id() & 0xffff;
    sanitize_name(&format!("{short_host}-{dir_name}-{pid_low:04x}"))
}
/// Maps `input` to lowercase ASCII letters, digits and `-`.
///
/// ASCII alphanumerics are lowercased, `-` is kept, and every other character
/// (including each non-ASCII character) becomes a single `-`.
fn sanitize_name(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());
    for ch in input.chars() {
        let keep = ch == '-' || ch.is_ascii_alphanumeric();
        cleaned.push(if keep { ch.to_ascii_lowercase() } else { '-' });
    }
    cleaned
}
/// Returns the first interface IPv4 address that is not loopback, link-local
/// or unspecified, formatted as a string.
///
/// Yields `None` when the interface list cannot be read or no address qualifies.
fn first_lan_ipv4() -> Option<String> {
    let interfaces = if_addrs::get_if_addrs().ok()?;
    interfaces.into_iter().find_map(|iface| match iface.ip() {
        std::net::IpAddr::V4(addr)
            if !(addr.is_loopback() || addr.is_link_local() || addr.is_unspecified()) =>
        {
            Some(addr.to_string())
        }
        _ => None,
    })
}
/// Handle to an A2A peer running in the background.
///
/// Dropping the handle aborts every task it owns (see the `Drop` impl).
pub struct A2APeerHandle {
/// Name the agent registered under.
pub agent_name: String,
/// `hostname:port` string built from the requested hostname and the
/// effective listener port.
pub bind_addr: String,
/// Normalized base URL advertised on the agent card.
pub public_url: String,
// Task running the axum server.
server_task: tokio::task::JoinHandle<()>,
// Task probing the explicit peer seeds.
discovery_task: tokio::task::JoinHandle<()>,
// Task consuming discovered-peer events, when one was started.
mdns_intake_task: Option<tokio::task::JoinHandle<()>>,
// Tasks returned by `lan::announce_and_listen`.
lan_tasks: Vec<tokio::task::JoinHandle<()>>,
// Held only so the value lives as long as the handle.
// NOTE(review): presumably keeps the mDNS registration alive — confirm against `mdns::MdnsHandle`.
_mdns_handle: Option<mdns::MdnsHandle>,
}
impl A2APeerHandle {
    /// Stops the peer by consuming the handle; the `Drop` impl aborts the tasks.
    pub fn abort(self) {
        std::mem::drop(self);
    }
}
impl Drop for A2APeerHandle {
    /// Aborts the server, discovery, intake and LAN tasks, in that order.
    fn drop(&mut self) {
        let owned_tasks = std::iter::once(&self.server_task)
            .chain(std::iter::once(&self.discovery_task))
            .chain(self.mdns_intake_task.iter())
            .chain(self.lan_tasks.iter());
        for handle in owned_tasks {
            handle.abort();
        }
    }
}
/// Everything `prepare_a2a` sets up before the HTTP server starts serving.
struct A2APreparation {
// Bound TCP listener, not yet serving.
listener: tokio::net::TcpListener,
// Router produced by `A2AServer::with_bus(..).router()`.
router: Router,
// Already-spawned task probing the explicit peer seeds.
discovery_task: tokio::task::JoinHandle<()>,
// Already-spawned task consuming discovered-peer events, if any.
mdns_intake_task: Option<tokio::task::JoinHandle<()>>,
// Tasks returned by `lan::announce_and_listen` (empty when unavailable or disabled).
lan_tasks: Vec<tokio::task::JoinHandle<()>>,
// Handle returned by `mdns::announce_and_browse`, if it succeeded.
mdns_handle: Option<mdns::MdnsHandle>,
// Resolved agent name (explicit or auto-generated).
agent_name: String,
// Requested hostname joined with the effective port.
bind_addr: String,
// Normalized advertised base URL.
public_url: String,
}
/// Binds the listener and starts all discovery machinery for an A2A peer.
///
/// Steps, in order:
/// 1. Bind `hostname:port` and read back the effective port.
/// 2. Resolve the agent name and the advertised public URL (wildcard bind
///    hosts are replaced by the first LAN IPv4 address, or `127.0.0.1`).
/// 3. Build the agent card, register it on the bus and announce readiness.
/// 4. Spawn the periodic probe of the explicit `--peer` seeds.
/// 5. When `opts.mdns` is set, start LAN broadcast and mDNS discovery and a
///    task that consumes the peers they report.
///
/// # Errors
/// Fails when the listener cannot be bound, its local address cannot be read,
/// or the public URL does not parse. LAN and mDNS failures are logged and
/// tolerated.
async fn prepare_a2a(opts: SpawnOptions, bus: Arc<AgentBus>) -> Result<A2APreparation> {
    // NOTE(review): a bare IPv6 literal hostname is not bracketed here or in the
    // derived public URL — confirm whether that needs support.
    let requested_bind_addr = format!("{}:{}", opts.hostname, opts.port);
    let listener = tokio::net::TcpListener::bind(&requested_bind_addr)
        .await
        .with_context(|| format!("Failed to bind A2A peer on {requested_bind_addr}"))?;
    let local_addr = listener
        .local_addr()
        .context("Failed to read local addr from listener")?;
    let effective_port = local_addr.port();
    let bind_addr = format!("{}:{}", opts.hostname, effective_port);
    let agent_name = opts.name.clone().unwrap_or_else(auto_agent_name);

    // A wildcard bind address is not reachable by peers, so advertise a concrete one.
    let public_host = match opts.hostname.as_str() {
        "0.0.0.0" | "::" | "[::]" => first_lan_ipv4().unwrap_or_else(|| "127.0.0.1".to_string()),
        other => other.to_string(),
    };
    let public_url = opts
        .public_url
        .clone()
        .unwrap_or_else(|| format!("http://{public_host}:{effective_port}"));
    let public_url = normalize_base_url(&public_url)?;

    let mut card = A2AServer::default_card(&public_url);
    card.name = agent_name.clone();
    if let Some(description) = opts.description.clone() {
        card.description = description;
    }
    bus.registry.register(card.clone());
    let bus_handle = bus.handle(&agent_name);
    let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
    bus_handle.announce_ready(capabilities);

    let peers = collect_peers(&opts.peer, &public_url);
    if !peers.is_empty() {
        tracing::info!(
            agent = %agent_name,
            peer_count = peers.len(),
            "A2A peer started with explicit --peer seeds (additive to mDNS)"
        );
    }
    let discovery_task = tokio::spawn(discovery_loop(
        Arc::clone(&bus),
        peers,
        public_url.clone(),
        agent_name.clone(),
        opts.discovery_interval_secs.max(5),
        opts.auto_introduce,
    ));

    // Only a concrete bind address is passed to mDNS; a wildcard passes none.
    let mdns_bind_addrs: Vec<std::net::IpAddr> = match opts.hostname.as_str() {
        "0.0.0.0" | "::" | "[::]" => vec![],
        other => other.parse::<std::net::IpAddr>().ok().into_iter().collect(),
    };

    let mut lan_tasks = Vec::new();
    let (mdns_handle, mdns_intake_task) = if opts.mdns {
        let (peer_tx, peer_rx) = mpsc::channel::<DiscoveredPeer>(64);
        match lan::announce_and_listen(agent_name.clone(), public_url.clone(), peer_tx.clone())
            .await
        {
            Ok(tasks) => lan_tasks = tasks,
            Err(error) => tracing::warn!(%error, "A2A LAN broadcast discovery unavailable"),
        }
        let handle =
            match mdns::announce_and_browse(&agent_name, effective_port, mdns_bind_addrs, peer_tx) {
                Ok(handle) => Some(handle),
                Err(e) => {
                    tracing::warn!(
                        agent = %agent_name,
                        error = %e,
                        "mDNS unavailable; falling back to explicit --peer seeds only"
                    );
                    None
                }
            };
        // Both discovery sources feed the same channel. The receiver must stay
        // alive while either one is running; otherwise peers reported by the
        // LAN tasks would be dropped just because mDNS failed to start.
        let intake = if handle.is_some() || !lan_tasks.is_empty() {
            Some(tokio::spawn(mdns_intake_loop(
                Arc::clone(&bus),
                peer_rx,
                public_url.clone(),
                agent_name.clone(),
                opts.auto_introduce,
            )))
        } else {
            None
        };
        (handle, intake)
    } else {
        (None, None)
    };

    let router: Router = A2AServer::with_bus(card, Arc::clone(&bus)).router();
    Ok(A2APreparation {
        listener,
        router,
        discovery_task,
        mdns_intake_task,
        lan_tasks,
        mdns_handle,
        agent_name,
        bind_addr,
        public_url,
    })
}
/// Upper bound on a single agent-card fetch attempted for an mDNS-discovered candidate URL.
const MDNS_PEER_PROBE_TIMEOUT: Duration = Duration::from_secs(5);
/// Consumes discovered-peer events until the channel closes.
///
/// Each event whose instance name has not been recorded yet is handed to
/// `handle_mdns_peer` on its own task, so one slow probe does not hold up the
/// rest. `self_url` is stored without trailing slashes for comparison.
async fn mdns_intake_loop(
    bus: Arc<AgentBus>,
    mut peer_rx: mpsc::Receiver<DiscoveredPeer>,
    self_url: String,
    agent_name: String,
    auto_introduce: bool,
) {
    let own_url = Arc::new(self_url.trim_end_matches('/').to_string());
    let seen: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
    loop {
        let Some(peer) = peer_rx.recv().await else {
            break;
        };
        // The lock guard is released as soon as the condition has been evaluated.
        if seen.lock().await.contains(&peer.instance_name) {
            continue;
        }
        tokio::spawn(handle_mdns_peer(
            Arc::clone(&bus),
            Arc::clone(&seen),
            Arc::clone(&own_url),
            agent_name.clone(),
            peer,
            auto_introduce,
        ));
    }
}
/// Probes one discovered peer and registers it on the bus when it answers.
///
/// Every advertised URL (except our own) is expanded with `peer_candidates`
/// and probed with a per-attempt timeout of `MDNS_PEER_PROBE_TIMEOUT`; the
/// first candidate that returns an agent card wins. The card is registered on
/// every successful resolution. The first time a given card name is seen, a
/// heartbeat is broadcast and, when `auto_introduce` is set, an intro message
/// is sent.
///
/// A peer that cannot be resolved is not recorded in `known`, so a later
/// event for the same instance is probed again.
async fn handle_mdns_peer(
    bus: Arc<AgentBus>,
    known: Arc<Mutex<HashSet<String>>>,
    self_url: Arc<String>,
    agent_name: String,
    peer: DiscoveredPeer,
    auto_introduce: bool,
) {
    let mut resolved: Option<(String, crate::a2a::types::AgentCard)> = None;
    'urls: for url in &peer.urls {
        if url.trim_end_matches('/') == self_url.as_str() {
            continue;
        }
        for candidate in peer_candidates(url) {
            match tokio::time::timeout(MDNS_PEER_PROBE_TIMEOUT, try_fetch_agent_card(&candidate))
                .await
            {
                Ok(Ok(card)) => {
                    resolved = Some((candidate, card));
                    break 'urls;
                }
                Ok(Err(error)) => {
                    tracing::debug!(
                        agent = %agent_name,
                        peer = %candidate,
                        error = %error,
                        "mDNS peer probe failed"
                    );
                }
                Err(_) => {
                    tracing::debug!(
                        agent = %agent_name,
                        peer = %candidate,
                        timeout_secs = MDNS_PEER_PROBE_TIMEOUT.as_secs(),
                        "mDNS peer probe timed out"
                    );
                }
            }
        }
    }
    let Some((endpoint, card)) = resolved else {
        return;
    };

    // The URL filter above only catches the one address we advertise. Our own
    // service reached through another address would pass it, so the card's
    // URL is compared as a second guard. This relies on our card being built
    // from the public URL, as `prepare_a2a` does via `A2AServer::default_card`.
    if card.url.to_string().trim_end_matches('/') == self_url.as_str() {
        known.lock().await.insert(peer.instance_name.clone());
        tracing::debug!(
            agent = %agent_name,
            endpoint = %endpoint,
            "Ignoring discovered record that resolves to this agent"
        );
        return;
    }

    let is_new = {
        let mut k = known.lock().await;
        let inserted = k.insert(card.name.clone());
        // Record the instance too, so the intake loop skips repeat events.
        k.insert(peer.instance_name.clone());
        inserted
    };
    bus.registry.register(card.clone());
    if is_new {
        let handle = bus.handle("a2a-discovery");
        handle.send(
            "broadcast",
            crate::bus::BusMessage::Heartbeat {
                agent_id: card.name.clone(),
                status: format!("discovered via A2A at {endpoint}"),
            },
        );
        tracing::info!(
            agent = %agent_name,
            peer_name = %card.name,
            peer_url = %card.url,
            endpoint = %endpoint,
            via = "mdns",
            "Discovered A2A peer"
        );
        if auto_introduce
            && let Err(error) = send_intro(&endpoint, &agent_name, self_url.as_str()).await
        {
            tracing::warn!(
                agent = %agent_name,
                peer = %endpoint,
                error = %error,
                "Auto-intro message failed"
            );
        }
    }
}
pub async fn start_a2a_in_background(
opts: SpawnOptions,
bus: Arc<AgentBus>,
) -> Result<A2APeerHandle> {
let prep = prepare_a2a(opts, bus).await?;
tracing::info!(
agent = %prep.agent_name,
bind_addr = %prep.bind_addr,
public_url = %prep.public_url,
mdns = prep.mdns_handle.is_some(),
"A2A peer listening (background mode)"
);
let agent_name = prep.agent_name.clone();
let bind_addr = prep.bind_addr.clone();
let public_url = prep.public_url.clone();
let server_task = tokio::spawn(async move {
if let Err(e) = axum::serve(prep.listener, prep.router).await {
tracing::error!(error = %e, "A2A peer server task exited with error");
}
});
Ok(A2APeerHandle {
agent_name,
bind_addr,
public_url,
server_task,
discovery_task: prep.discovery_task,
mdns_intake_task: prep.mdns_intake_task,
lan_tasks: prep.lan_tasks,
_mdns_handle: prep.mdns_handle,
})
}
/// Runs a spawned A2A agent in the foreground until a shutdown signal arrives.
///
/// Creates a fresh bus, starts the bus S3 sink via `spawn_bus_s3_sink`,
/// prepares the peer with options taken from `args`, and serves HTTP with
/// graceful shutdown driven by `shutdown_signal`.
///
/// # Errors
/// Propagates failures from `prepare_a2a` and from the HTTP server. The
/// background discovery tasks are aborted whether the server ends cleanly or
/// with an error.
pub async fn run(args: SpawnArgs) -> Result<()> {
    let bus = AgentBus::new().into_arc();
    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
    let prep = prepare_a2a(SpawnOptions::from_spawn_args(&args), bus).await?;
    tracing::info!(
        agent = %prep.agent_name,
        bind_addr = %prep.bind_addr,
        public_url = %prep.public_url,
        mdns = prep.mdns_handle.is_some(),
        "Spawned A2A agent runtime"
    );
    let agent_name = prep.agent_name.clone();
    let discovery_task = prep.discovery_task;
    let mdns_intake_task = prep.mdns_intake_task;
    let lan_tasks = prep.lan_tasks;
    // Held until the function returns.
    let _mdns_handle = prep.mdns_handle;

    let served = axum::serve(prep.listener, prep.router)
        .with_graceful_shutdown(shutdown_signal())
        .await;

    // Stop the background tasks before looking at the result, so a server
    // error does not leave them running detached.
    discovery_task.abort();
    if let Some(t) = mdns_intake_task {
        t.abort();
    }
    for task in lan_tasks {
        task.abort();
    }

    served.context("Spawned A2A server failed")?;
    tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
    Ok(())
}
async fn shutdown_signal() {
let _ = tokio::signal::ctrl_c().await;
tracing::info!("Shutdown signal received");
}
/// Normalizes a user-supplied peer or public URL into a canonical base URL.
///
/// Surrounding whitespace is trimmed, `http://` is prepended when the input
/// does not start with an `http://` or `https://` scheme (matched
/// case-insensitively), the result is parsed with `Url`, and one trailing `/`
/// is removed from the serialized form.
///
/// # Errors
/// Returns an error when the input is empty after trimming or does not parse
/// as a URL.
fn normalize_base_url(url: &str) -> Result<String> {
    let trimmed = url.trim();
    if trimmed.is_empty() {
        anyhow::bail!("URL cannot be empty");
    }
    // URL schemes are case-insensitive. A case-sensitive check would prepend a
    // second scheme to inputs such as `HTTP://host`. `get` returns `None` for
    // inputs that are too short or would split a multi-byte character.
    let has_scheme = |scheme: &str| {
        trimmed
            .get(..scheme.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(scheme))
    };
    let normalized = if has_scheme("http://") || has_scheme("https://") {
        trimmed.to_string()
    } else {
        format!("http://{trimmed}")
    };
    let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
    let mut cleaned = parsed.to_string();
    if cleaned.ends_with('/') {
        cleaned.pop();
    }
    Ok(cleaned)
}
fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
let mut dedup = HashSet::new();
let self_url = self_url.trim_end_matches('/');
for raw in raw_peers {
if raw.trim().is_empty() {
continue;
}
if let Ok(normalized) = normalize_base_url(raw)
&& normalized.trim_end_matches('/') != self_url
{
dedup.insert(normalized);
}
}
dedup.into_iter().collect()
}
/// Periodically probes the explicit peer seeds and registers those that answer.
///
/// On every tick each seed is expanded with `peer_candidates`; the first
/// candidate that returns an agent card is registered on the bus. The first
/// time a given `(endpoint, card name)` pair is seen it is logged and, when
/// `auto_introduce` is set, sent an intro message.
///
/// Returns immediately when there are no seeds; otherwise runs until the task
/// is aborted.
async fn discovery_loop(
    bus: Arc<AgentBus>,
    peers: Vec<String>,
    self_url: String,
    agent_name: String,
    interval_secs: u64,
    auto_introduce: bool,
) {
    if peers.is_empty() {
        // Nothing to probe; do not keep a timer ticking for no work.
        return;
    }
    // The set is only touched from this task, so no lock is needed.
    let mut discovered: HashSet<String> = HashSet::new();
    // `tokio::time::interval` panics on a zero period.
    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs.max(1)));
    // A round slower than the interval should push the next round back
    // instead of triggering a burst of catch-up rounds.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    loop {
        ticker.tick().await;
        for peer_seed in &peers {
            let mut discovered_card = None;
            for candidate in peer_candidates(peer_seed) {
                match try_fetch_agent_card(&candidate).await {
                    Ok(card) => {
                        discovered_card = Some((candidate, card));
                        break;
                    }
                    Err(error) => {
                        tracing::debug!(
                            agent = %agent_name,
                            peer = %candidate,
                            error = %error,
                            "Peer probe failed"
                        );
                    }
                }
            }
            let Some((endpoint, card)) = discovered_card else {
                continue;
            };
            let is_new = discovered.insert(format!("{}::{}", endpoint, card.name));
            bus.registry.register(card.clone());
            if is_new {
                tracing::info!(
                    agent = %agent_name,
                    peer_name = %card.name,
                    peer_url = %card.url,
                    endpoint = %endpoint,
                    "Discovered A2A peer"
                );
                if auto_introduce
                    && let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await
                {
                    tracing::warn!(
                        agent = %agent_name,
                        peer = %endpoint,
                        error = %error,
                        "Auto-intro message failed"
                    );
                }
            }
        }
    }
}
/// Lists the endpoint URLs to try for a peer seed, in probing order.
///
/// Trailing slashes on the seed are ignored. A seed that already ends in
/// `/a2a` is returned alone; otherwise the seed itself is tried first,
/// followed by the seed with `/a2a` appended.
fn peer_candidates(seed: &str) -> Vec<String> {
    // Some callers pass URLs that were never normalized. Without trimming,
    // `http://host/` would expand to `http://host//a2a`.
    let base = seed.trim_end_matches('/');
    if base.ends_with("/a2a") {
        return vec![base.to_string()];
    }
    vec![base.to_string(), format!("{base}/a2a")]
}
/// Fetches the agent card served at `endpoint`.
///
/// When the `CODETETHER_AUTH_TOKEN` environment variable is set, its value is
/// attached to the client as a token.
///
/// # Errors
/// Propagates the error from `A2AClient::get_agent_card`.
async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
    let base_client = A2AClient::new(endpoint);
    let client = match std::env::var("CODETETHER_AUTH_TOKEN") {
        Ok(token) => base_client.with_token(token),
        Err(_) => base_client,
    };
    Ok(client.get_agent_card().await?)
}
/// Sends a one-off introduction message to the peer at `endpoint`.
///
/// The message is a single text part naming this agent and its URL. It is
/// sent non-blocking with a history length of zero and `text/plain` as the
/// accepted output mode. When `CODETETHER_AUTH_TOKEN` is set, its value is
/// attached to the client as a token.
///
/// # Errors
/// Propagates the error from `A2AClient::send_message`.
async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
    let base_client = A2AClient::new(endpoint);
    let client = match std::env::var("CODETETHER_AUTH_TOKEN") {
        Ok(token) => base_client.with_token(token),
        Err(_) => base_client,
    };

    let greeting = format!(
        "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
    );
    let message = Message {
        message_id: uuid::Uuid::new_v4().to_string(),
        role: MessageRole::User,
        parts: vec![Part::Text { text: greeting }],
        context_id: None,
        task_id: None,
        metadata: std::collections::HashMap::new(),
        extensions: vec![],
    };
    let configuration = MessageSendConfiguration {
        accepted_output_modes: vec!["text/plain".to_string()],
        blocking: Some(false),
        history_length: Some(0),
        push_notification_config: None,
    };

    client
        .send_message(MessageSendParams {
            message,
            configuration: Some(configuration),
        })
        .await?;
    tracing::info!(peer = %endpoint, "Auto-intro message sent");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    /// Duplicate seeds collapse to one entry and our own URL is filtered out.
    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        let seeds: Vec<String> = [
            "localhost:5000",
            "http://localhost:5000/",
            "http://localhost:5001",
            "http://localhost:5002",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let mut collected = collect_peers(&seeds, "http://localhost:5001");
        collected.sort();

        assert_eq!(collected, ["http://localhost:5000", "http://localhost:5002"]);
    }

    /// A bare base URL gains an `/a2a` variant; a URL already ending in `/a2a` does not.
    #[test]
    fn peer_candidates_adds_a2a_variant() {
        let from_base = peer_candidates("http://localhost:4096");
        assert_eq!(
            from_base,
            ["http://localhost:4096", "http://localhost:4096/a2a"]
        );

        let from_a2a = peer_candidates("http://localhost:4096/a2a");
        assert_eq!(from_a2a, ["http://localhost:4096/a2a"]);
    }
}