Skip to main content

codetether_agent/a2a/
spawn.rs

1//! Spawn an autonomous A2A agent runtime with auto peer discovery.
2//!
//! Stands up an [`A2AServer`] (Axum) on a configurable host:port, publishes an
//! agent card at `/.well-known/agent.json`, and discovers peers in the
//! background: a polling loop probes each `--peer` seed for its agent card,
//! and (when mDNS is enabled) mDNS announce/browse plus LAN broadcast feed
//! peers into an event-driven intake loop. New peers are registered in the
//! local bus and (unless `--no-auto-introduce` is set) sent a one-shot
//! non-blocking `message/send` introduction.
8//!
9//! Each spawned process is a self-contained A2A node: there is no central
10//! broker. Two `codetether spawn` instances pointing at each other form the
11//! minimal mesh; `n` instances form a fully-connected mesh as long as the
12//! seed graph is connected.
13//!
14//! # Lifecycle
15//!
16//! 1. Resolve identity (`--name`, bind addr, public URL).
17//! 2. Build the default [`AgentCard`](crate::a2a::types::AgentCard) and let
18//!    `--description` override the default text.
19//! 3. Initialize [`AgentBus`], start the best-effort training-record S3 sink,
20//!    register self in the registry, and announce-ready with the card's skill
21//!    ids as capabilities.
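//! 4. Spawn [`discovery_loop`] for `--peer` seeds and, when mDNS is enabled,
//!    start mDNS announce + browse and LAN broadcast discovery.
//! 5. Serve the [`A2AServer::router`] on the already-bound listener until
//!    SIGINT.
//! 6. On shutdown: abort the discovery, mDNS-intake, and LAN tasks, release
//!    the mDNS registration, and exit cleanly.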
25//!
26//! # Discovery
27//!
28//! Every `discovery_interval_secs` (clamped to ≥ 5):
29//! - For each seed, build candidates via [`peer_candidates`] (tries `seed`
30//!   and `seed/a2a` unless the seed already ends in `/a2a`).
31//! - First successful agent-card fetch wins; the card is registered in
32//!   `bus.registry`.
33//! - On *first* sighting of a given `endpoint::card.name` pair: emit
34//!   `Discovered A2A peer` and (if `auto_introduce`) call
35//!   [`send_intro`] over the A2A client.
36//! - Re-sightings re-register the card (refresh) but do not re-introduce.
37//!
38//! Outbound discovery and intro calls attach `CODETETHER_AUTH_TOKEN` as a
39//! bearer if set.
40//!
41//! # See also
42//!
43//! Full prose documentation lives at `docs/a2a-spawn.md` (CLI reference,
44//! HTTP/JSON-RPC API, curl recipes, cross-host setup, troubleshooting,
45//! source map).
46
47use crate::a2a::client::A2AClient;
48use crate::a2a::lan;
49use crate::a2a::mdns::{self, DiscoveredPeer};
50use crate::a2a::server::A2AServer;
51use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
52use crate::bus::AgentBus;
53use crate::cli::SpawnArgs;
54use anyhow::{Context, Result};
55use axum::Router;
56use reqwest::Url;
57use std::collections::HashSet;
58use std::sync::Arc;
59use std::time::Duration;
60use tokio::sync::Mutex;
61use tokio::sync::mpsc;
62
/// Inputs for starting an A2A peer runtime.
///
/// Used by `codetether spawn` (one-shot CLI, built via
/// [`SpawnOptions::from_spawn_args`]) and by `codetether tui`, which runs the
/// same A2A surface in the background alongside the interactive session.
/// [`SpawnOptions::auto`] gives the zero-config defaults: wildcard bind,
/// OS-assigned port, auto-picked name, mDNS on, auto-intro on.
#[derive(Debug, Clone)]
pub struct SpawnOptions {
    /// Agent name. `None` → auto-pick `<host>-<repo>-<short-pid>`
    /// (see [`auto_agent_name`]).
    pub name: Option<String>,
    /// Bind hostname. `0.0.0.0` is the zero-config default so mDNS can
    /// announce real interfaces. Use `127.0.0.1` for loopback-only peers.
    pub hostname: String,
    /// Bind port. `0` → the OS picks an available port.
    pub port: u16,
    /// Public URL published in the agent card. `None` → derived from the
    /// effective bound address after binding (a wildcard host is replaced by
    /// the first LAN IPv4 address, falling back to `127.0.0.1`).
    pub public_url: Option<String>,
    /// Optional card description; replaces the default card text.
    pub description: Option<String>,
    /// Explicit peer seeds — used in addition to mDNS discovery, mainly
    /// for cross-host setups where mDNS isn't routable.
    pub peer: Vec<String>,
    /// Polling interval, in seconds, for explicit peer seeds (mDNS is
    /// event-driven). Values below 5 are clamped up to 5.
    pub discovery_interval_secs: u64,
    /// Send a non-blocking intro `message/send` on first sighting of a peer.
    pub auto_introduce: bool,
    /// Enable mDNS announce + browse (true P2P discovery, no central state).
    /// This flag also gates LAN broadcast discovery.
    pub mdns: bool,
}
94
95impl SpawnOptions {
96    /// Zero-config defaults: wildcard host, auto port/name, mDNS, intro on.
97    pub fn auto() -> Self {
98        Self {
99            name: None,
100            hostname: "0.0.0.0".to_string(),
101            port: 0,
102            public_url: None,
103            description: None,
104            peer: Vec::new(),
105            discovery_interval_secs: 15,
106            auto_introduce: true,
107            mdns: true,
108        }
109    }
110
111    /// Materialize options from the `codetether spawn` CLI args.
112    pub fn from_spawn_args(args: &SpawnArgs) -> Self {
113        Self {
114            name: args.name.clone(),
115            hostname: args.hostname.clone(),
116            port: args.port,
117            public_url: args.public_url.clone(),
118            description: args.description.clone(),
119            peer: args.peer.clone(),
120            discovery_interval_secs: args.discovery_interval_secs,
121            auto_introduce: args.auto_introduce,
122            mdns: args.mdns,
123        }
124    }
125}
126
127/// Auto-derive a stable, human-readable agent name from
128/// `<short-host>-<repo-basename>-<short-pid>`. Falls back gracefully if any
129/// component is unavailable.
130pub fn auto_agent_name() -> String {
131    let host_full = gethostname::gethostname()
132        .into_string()
133        .unwrap_or_else(|_| "host".to_string());
134    let host_short = host_full
135        .split('.')
136        .next()
137        .unwrap_or(&host_full)
138        .to_string();
139    let repo = std::env::current_dir()
140        .ok()
141        .as_ref()
142        .and_then(|p| p.file_name())
143        .and_then(|n| n.to_str())
144        .unwrap_or("repo")
145        .to_string();
146    let pid = std::process::id();
147    let short_pid = format!("{:04x}", pid & 0xffff);
148    let raw = format!("{host_short}-{repo}-{short_pid}");
149    sanitize_name(&raw)
150}
151
/// Map `input` to a conservative identifier, one output char per input char.
///
/// ASCII letters are lowercased, ASCII digits and `-` pass through, and every
/// other character (including non-ASCII) becomes `-`.
fn sanitize_name(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for ch in input.chars() {
        let mapped = match ch {
            'a'..='z' | '0'..='9' | '-' => ch,
            'A'..='Z' => ch.to_ascii_lowercase(),
            _ => '-',
        };
        out.push(mapped);
    }
    out
}
164
165/// First non-loopback non-link-local IPv4 address on any UP interface.
166/// Used to substitute for `0.0.0.0` in the public agent-card URL so the
167/// card advertises a reachable address.
168fn first_lan_ipv4() -> Option<String> {
169    let ifaces = if_addrs::get_if_addrs().ok()?;
170    ifaces
171        .into_iter()
172        .filter_map(|iface| {
173            let std::net::IpAddr::V4(v4) = iface.ip() else {
174                return None;
175            };
176            if v4.is_loopback() || v4.is_link_local() || v4.is_unspecified() {
177                return None;
178            }
179            // Skip docker bridge and similar internal addresses by preferring
180            // the actual host LAN interface — but we don't filter by name
181            // since interface naming is platform-specific. The first match
182            // is good enough; users with strict requirements can pass
183            // `--public-url` explicitly.
184            Some(v4.to_string())
185        })
186        .next()
187}
188
189/// Handle to A2A peer background tasks (server + discovery loop + mDNS).
190///
191/// Returned by [`start_a2a_in_background`]. The handle MUST be kept alive
192/// (bound to a non-`_` variable, or stashed in app state) for the lifetime
193/// of the peer — dropping it aborts every background task and unregisters
194/// the mDNS service via the `Drop` impl below. `JoinHandle::drop()` on its
195/// own only *detaches* tasks; the explicit `Drop` here is what actually
196/// stops them.
197pub struct A2APeerHandle {
198    pub agent_name: String,
199    pub bind_addr: String,
200    pub public_url: String,
201    server_task: tokio::task::JoinHandle<()>,
202    discovery_task: tokio::task::JoinHandle<()>,
203    mdns_intake_task: Option<tokio::task::JoinHandle<()>>,
204    lan_tasks: Vec<tokio::task::JoinHandle<()>>,
205    /// Held to keep the daemon alive; on drop unregisters the service.
206    _mdns_handle: Option<mdns::MdnsHandle>,
207}
208
209impl A2APeerHandle {
210    /// Abort all background tasks and shut down mDNS immediately.
211    pub fn abort(self) {
212        // Drop runs on `self` falling out of scope — it does the work.
213        drop(self);
214    }
215}
216
217impl Drop for A2APeerHandle {
218    fn drop(&mut self) {
219        self.server_task.abort();
220        self.discovery_task.abort();
221        if let Some(t) = self.mdns_intake_task.as_ref() {
222            t.abort();
223        }
224        for task in &self.lan_tasks {
225            task.abort();
226        }
227        // _mdns_handle's own Drop unregisters the mDNS service.
228    }
229}
230
/// Everything `prepare_a2a` produces.
///
/// It bundles the bound listener and router that still need to be served,
/// the background tasks already spawned, and the resolved identity used for
/// logging and for building the caller's handle.
struct A2APreparation {
    /// Already-bound listener; a requested port of `0` has been resolved.
    listener: tokio::net::TcpListener,
    /// A2A router built from the agent card and the shared bus.
    router: Router,
    /// Explicit `--peer` seed polling loop.
    discovery_task: tokio::task::JoinHandle<()>,
    /// Loop draining peers discovered over mDNS / LAN; `None` when no intake
    /// loop was started.
    mdns_intake_task: Option<tokio::task::JoinHandle<()>>,
    /// LAN broadcast discovery tasks (empty if disabled or unavailable).
    lan_tasks: Vec<tokio::task::JoinHandle<()>>,
    /// mDNS registration; must outlive the period the peer should be
    /// announced.
    mdns_handle: Option<mdns::MdnsHandle>,
    /// Resolved agent name (explicit or auto-picked).
    agent_name: String,
    /// `hostname:effective_port` display string, used for logging.
    bind_addr: String,
    /// Public base URL advertised in the card, normalized by
    /// `normalize_base_url`.
    public_url: String,
}
242
243/// Shared setup: bind listener (port 0 → OS-assigned), auto-pick name if
244/// missing, build card, register on bus, spawn explicit-seed discovery
245/// loop, and start mDNS announce + browse if enabled.
246///
247/// Bind happens first so we know the effective port before publishing a
248/// URL. The TCP bind is the only step that can fail synchronously — and
249/// because nothing is registered on the bus and no tasks are spawned
250/// before bind succeeds, a bind failure leaves no leaked state.
251async fn prepare_a2a(opts: SpawnOptions, bus: Arc<AgentBus>) -> Result<A2APreparation> {
252    // 1. Bind first so we know the effective port (handles --port 0) AND so
253    //    a bind failure short-circuits cleanly without leaving any
254    //    background task or stale bus registry entry behind.
255    let requested_bind_addr = format!("{}:{}", opts.hostname, opts.port);
256    let listener = tokio::net::TcpListener::bind(&requested_bind_addr)
257        .await
258        .with_context(|| format!("Failed to bind A2A peer on {requested_bind_addr}"))?;
259    let local_addr = listener
260        .local_addr()
261        .context("Failed to read local addr from listener")?;
262    let effective_port = local_addr.port();
263    let bind_addr = format!("{}:{}", opts.hostname, effective_port);
264
265    // 2. Auto-pick name if not provided.
266    let agent_name = opts.name.clone().unwrap_or_else(auto_agent_name);
267
268    // 3. Derive public_url from effective bind addr if not supplied.
269    //    A wildcard host (`0.0.0.0` / `::`) is not a usable URL — pick the
270    //    first non-loopback IPv4 address on a real interface so the card
271    //    advertises something callers can actually reach.
272    let public_host = match opts.hostname.as_str() {
273        "0.0.0.0" | "::" | "[::]" => first_lan_ipv4().unwrap_or_else(|| "127.0.0.1".to_string()),
274        other => other.to_string(),
275    };
276    let public_url = opts
277        .public_url
278        .clone()
279        .unwrap_or_else(|| format!("http://{public_host}:{effective_port}"));
280    let public_url = normalize_base_url(&public_url)?;
281
282    // 4. Build card.
283    let mut card = A2AServer::default_card(&public_url);
284    card.name = agent_name.clone();
285    if let Some(description) = opts.description.clone() {
286        card.description = description;
287    }
288
289    bus.registry.register(card.clone());
290    let bus_handle = bus.handle(&agent_name);
291    let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
292    bus_handle.announce_ready(capabilities);
293
294    // 5. Spawn explicit-seed discovery loop (additive to mDNS-discovered
295    //    peers — useful for cross-LAN/WAN seeds where multicast doesn't
296    //    route).
297    let peers = collect_peers(&opts.peer, &public_url);
298    if !peers.is_empty() {
299        tracing::info!(
300            agent = %agent_name,
301            peer_count = peers.len(),
302            "A2A peer started with explicit --peer seeds (additive to mDNS)"
303        );
304    }
305    let discovery_task = tokio::spawn(discovery_loop(
306        Arc::clone(&bus),
307        peers,
308        public_url.clone(),
309        agent_name.clone(),
310        opts.discovery_interval_secs.max(5),
311        opts.auto_introduce,
312    ));
313
314    // 6. mDNS announce + browse (true P2P, default on).
315    //
316    // Pass concrete bound addrs to mdns-sd that match what the HTTP server
317    // will actually accept — anything else risks advertising a URL no
318    // peer can reach. For a wildcard bind (`0.0.0.0`) we leave the addr
319    // list empty so mdns-sd's `enable_addr_auto` enumerates the real
320    // interface IPs; for a loopback bind we pass loopback so we don't
321    // advertise LAN addrs the server isn't bound to.
322    let mdns_bind_addrs: Vec<std::net::IpAddr> = match opts.hostname.as_str() {
323        "0.0.0.0" | "::" | "[::]" => vec![],
324        other => other.parse::<std::net::IpAddr>().ok().into_iter().collect(),
325    };
326    let mut lan_tasks = Vec::new();
327    let (mdns_handle, mdns_intake_task) = if opts.mdns {
328        let (peer_tx, peer_rx) = mpsc::channel::<DiscoveredPeer>(64);
329        match lan::announce_and_listen(agent_name.clone(), public_url.clone(), peer_tx.clone())
330            .await
331        {
332            Ok(tasks) => lan_tasks = tasks,
333            Err(error) => tracing::warn!(%error, "A2A LAN broadcast discovery unavailable"),
334        }
335        match mdns::announce_and_browse(&agent_name, effective_port, mdns_bind_addrs, peer_tx) {
336            Ok(handle) => {
337                let intake = tokio::spawn(mdns_intake_loop(
338                    Arc::clone(&bus),
339                    peer_rx,
340                    public_url.clone(),
341                    agent_name.clone(),
342                    opts.auto_introduce,
343                ));
344                (Some(handle), Some(intake))
345            }
346            Err(e) => {
347                tracing::warn!(
348                    agent = %agent_name,
349                    error = %e,
350                    "mDNS unavailable; falling back to explicit --peer seeds only"
351                );
352                (None, None)
353            }
354        }
355    } else {
356        (None, None)
357    };
358
359    let router: Router = A2AServer::with_bus(card, Arc::clone(&bus)).router();
360
361    Ok(A2APreparation {
362        listener,
363        router,
364        discovery_task,
365        mdns_intake_task,
366        lan_tasks,
367        mdns_handle,
368        agent_name,
369        bind_addr,
370        public_url,
371    })
372}
373
/// Upper bound on a single agent-card fetch for an mDNS-discovered peer.
///
/// mDNS may resolve one peer to several addresses (LAN, VPN, docker bridge,
/// link-local v6), and some may be unreachable from here. Bounding each
/// attempt lets the prober move on to the next address quickly instead of
/// hanging on a dead one.
const MDNS_PEER_PROBE_TIMEOUT: Duration = Duration::from_millis(5_000);
378
379/// Drain peers discovered over mDNS, fetch their cards, register them, and
380/// optionally send the same auto-intro the explicit-seed loop sends.
381///
382/// Each discovered peer is processed on its own tokio task so a slow or
383/// unreachable peer cannot block the rest of the queue. Dedup is keyed
384/// by **agent name** (not by endpoint URL), so an agent reachable via
385/// multiple network interfaces — common when binding `0.0.0.0` on a
386/// multi-homed host — only triggers one discovery log line and one
387/// auto-intro across the lifetime of the process. The known-set is
388/// also pre-checked by mDNS instance name before any fetch is issued,
389/// which short-circuits the duplicate announcements that mdns-sd
390/// emits per interface.
391async fn mdns_intake_loop(
392    bus: Arc<AgentBus>,
393    mut peer_rx: mpsc::Receiver<DiscoveredPeer>,
394    self_url: String,
395    agent_name: String,
396    auto_introduce: bool,
397) {
398    let self_url = Arc::new(self_url.trim_end_matches('/').to_string());
399    let known: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
400
401    while let Some(peer) = peer_rx.recv().await {
402        // Cheap pre-check by instance name to skip the per-interface
403        // duplicate announcements before we spawn anything.
404        let already_known_by_instance = {
405            let k = known.lock().await;
406            k.contains(&peer.instance_name)
407        };
408        if already_known_by_instance {
409            continue;
410        }
411
412        // Spawn one task per discovered peer so a slow/unreachable peer
413        // can't block the next event in the queue.
414        let bus = Arc::clone(&bus);
415        let known = Arc::clone(&known);
416        let self_url = Arc::clone(&self_url);
417        let agent_name = agent_name.clone();
418        tokio::spawn(async move {
419            handle_mdns_peer(bus, known, self_url, agent_name, peer, auto_introduce).await;
420        });
421    }
422}
423
424/// Try each URL the resolver gave us until one yields a valid agent card,
425/// then register and (optionally) intro. Each fetch attempt is bounded by
426/// `MDNS_PEER_PROBE_TIMEOUT`. Multi-homed peers commonly resolve to
427/// several addrs (LAN/VPN/docker); we cycle through them in order.
428async fn handle_mdns_peer(
429    bus: Arc<AgentBus>,
430    known: Arc<Mutex<HashSet<String>>>,
431    self_url: Arc<String>,
432    agent_name: String,
433    peer: DiscoveredPeer,
434    auto_introduce: bool,
435) {
436    let mut resolved: Option<(String, crate::a2a::types::AgentCard)> = None;
437
438    'urls: for url in &peer.urls {
439        if url.trim_end_matches('/') == self_url.as_str() {
440            continue;
441        }
442        for candidate in peer_candidates(url) {
443            match tokio::time::timeout(MDNS_PEER_PROBE_TIMEOUT, try_fetch_agent_card(&candidate))
444                .await
445            {
446                Ok(Ok(card)) => {
447                    resolved = Some((candidate, card));
448                    break 'urls;
449                }
450                Ok(Err(error)) => {
451                    tracing::debug!(
452                        agent = %agent_name,
453                        peer = %candidate,
454                        error = %error,
455                        "mDNS peer probe failed"
456                    );
457                }
458                Err(_) => {
459                    tracing::debug!(
460                        agent = %agent_name,
461                        peer = %candidate,
462                        timeout_secs = MDNS_PEER_PROBE_TIMEOUT.as_secs(),
463                        "mDNS peer probe timed out"
464                    );
465                }
466            }
467        }
468    }
469
470    let Some((endpoint, card)) = resolved else {
471        return;
472    };
473
474    let is_new = {
475        let mut k = known.lock().await;
476        // Insert both keys so future probes via either path short-circuit.
477        let inserted = k.insert(card.name.clone());
478        k.insert(peer.instance_name.clone());
479        inserted
480    };
481
482    bus.registry.register(card.clone());
483
484    if is_new {
485        let handle = bus.handle("a2a-discovery");
486        handle.send(
487            "broadcast",
488            crate::bus::BusMessage::Heartbeat {
489                agent_id: card.name.clone(),
490                status: format!("discovered via A2A at {endpoint}"),
491            },
492        );
493        tracing::info!(
494            agent = %agent_name,
495            peer_name = %card.name,
496            peer_url = %card.url,
497            endpoint = %endpoint,
498            via = "mdns",
499            "Discovered A2A peer"
500        );
501        if auto_introduce
502            && let Err(error) = send_intro(&endpoint, &agent_name, self_url.as_str()).await
503        {
504            tracing::warn!(
505                agent = %agent_name,
506                peer = %endpoint,
507                error = %error,
508                "Auto-intro message failed"
509            );
510        }
511    }
512}
513
514/// Start an A2A peer runtime in the background, attached to the given bus.
515///
516/// Caller is responsible for keeping the returned [`A2APeerHandle`] alive
517/// for as long as the peer should keep serving. Drop or call `abort()` on
518/// shutdown. Used by the TUI on every launch (unless `--no-a2a` was set).
519pub async fn start_a2a_in_background(
520    opts: SpawnOptions,
521    bus: Arc<AgentBus>,
522) -> Result<A2APeerHandle> {
523    let prep = prepare_a2a(opts, bus).await?;
524    tracing::info!(
525        agent = %prep.agent_name,
526        bind_addr = %prep.bind_addr,
527        public_url = %prep.public_url,
528        mdns = prep.mdns_handle.is_some(),
529        "A2A peer listening (background mode)"
530    );
531    let agent_name = prep.agent_name.clone();
532    let bind_addr = prep.bind_addr.clone();
533    let public_url = prep.public_url.clone();
534    let server_task = tokio::spawn(async move {
535        if let Err(e) = axum::serve(prep.listener, prep.router).await {
536            tracing::error!(error = %e, "A2A peer server task exited with error");
537        }
538    });
539    Ok(A2APeerHandle {
540        agent_name,
541        bind_addr,
542        public_url,
543        server_task,
544        discovery_task: prep.discovery_task,
545        mdns_intake_task: prep.mdns_intake_task,
546        lan_tasks: prep.lan_tasks,
547        _mdns_handle: prep.mdns_handle,
548    })
549}
550
551/// Run the `codetether spawn` command. Owns its own bus and S3 sink, blocks
552/// on ctrl_c with graceful shutdown.
553pub async fn run(args: SpawnArgs) -> Result<()> {
554    let bus = AgentBus::new().into_arc();
555
556    // Auto-start S3 sink if MinIO is configured
557    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
558
559    let prep = prepare_a2a(SpawnOptions::from_spawn_args(&args), bus).await?;
560
561    tracing::info!(
562        agent = %prep.agent_name,
563        bind_addr = %prep.bind_addr,
564        public_url = %prep.public_url,
565        mdns = prep.mdns_handle.is_some(),
566        "Spawned A2A agent runtime"
567    );
568
569    let agent_name = prep.agent_name.clone();
570    let discovery_task = prep.discovery_task;
571    let mdns_intake_task = prep.mdns_intake_task;
572    let lan_tasks = prep.lan_tasks;
573    let _mdns_handle = prep.mdns_handle; // dropped at end of fn → unregisters
574
575    axum::serve(prep.listener, prep.router)
576        .with_graceful_shutdown(shutdown_signal())
577        .await
578        .context("Spawned A2A server failed")?;
579
580    discovery_task.abort();
581    if let Some(t) = mdns_intake_task {
582        t.abort();
583    }
584    for task in lan_tasks {
585        task.abort();
586    }
587    tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
588    Ok(())
589}
590
591async fn shutdown_signal() {
592    let _ = tokio::signal::ctrl_c().await;
593    tracing::info!("Shutdown signal received");
594}
595
596fn normalize_base_url(url: &str) -> Result<String> {
597    let trimmed = url.trim();
598    if trimmed.is_empty() {
599        anyhow::bail!("URL cannot be empty");
600    }
601
602    let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
603        trimmed.to_string()
604    } else {
605        format!("http://{trimmed}")
606    };
607
608    let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
609    let mut cleaned = parsed.to_string();
610    if cleaned.ends_with('/') {
611        cleaned.pop();
612    }
613    Ok(cleaned)
614}
615
616fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
617    let mut dedup = HashSet::new();
618    let self_url = self_url.trim_end_matches('/');
619
620    for raw in raw_peers {
621        if raw.trim().is_empty() {
622            continue;
623        }
624
625        if let Ok(normalized) = normalize_base_url(raw)
626            && normalized.trim_end_matches('/') != self_url
627        {
628            dedup.insert(normalized);
629        }
630    }
631
632    dedup.into_iter().collect()
633}
634
635async fn discovery_loop(
636    bus: Arc<AgentBus>,
637    peers: Vec<String>,
638    self_url: String,
639    agent_name: String,
640    interval_secs: u64,
641    auto_introduce: bool,
642) {
643    let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
644    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
645
646    loop {
647        ticker.tick().await;
648
649        for peer_seed in &peers {
650            let candidates = peer_candidates(peer_seed);
651            let mut discovered_card = None;
652
653            for candidate in candidates {
654                match try_fetch_agent_card(&candidate).await {
655                    Ok(card) => {
656                        discovered_card = Some((candidate, card));
657                        break;
658                    }
659                    Err(error) => {
660                        tracing::debug!(
661                            agent = %agent_name,
662                            peer = %candidate,
663                            error = %error,
664                            "Peer probe failed"
665                        );
666                    }
667                }
668            }
669
670            let Some((endpoint, card)) = discovered_card else {
671                continue;
672            };
673
674            let peer_id = format!("{}::{}", endpoint, card.name);
675            let is_new = {
676                let mut known = discovered.lock().await;
677                known.insert(peer_id)
678            };
679
680            bus.registry.register(card.clone());
681
682            if is_new {
683                tracing::info!(
684                    agent = %agent_name,
685                    peer_name = %card.name,
686                    peer_url = %card.url,
687                    endpoint = %endpoint,
688                    "Discovered A2A peer"
689                );
690
691                if auto_introduce
692                    && let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await
693                {
694                    tracing::warn!(
695                        agent = %agent_name,
696                        peer = %endpoint,
697                        error = %error,
698                        "Auto-intro message failed"
699                    );
700                }
701            }
702        }
703    }
704}
705
/// Candidate A2A endpoints for a seed URL, in probe order.
///
/// - A seed already ending in `/a2a` (optionally followed by `/`) is used
///   as-is.
/// - Otherwise the seed itself is tried first, then `<seed>/a2a`.
///
/// Trailing slashes are trimmed before appending, so `http://h:1/` yields
/// `http://h:1/a2a` rather than `http://h:1//a2a`. This matters because
/// mDNS-resolved URLs reach this function without normalization.
fn peer_candidates(seed: &str) -> Vec<String> {
    let base = seed.trim_end_matches('/');
    if base.ends_with("/a2a") {
        return vec![seed.to_string()];
    }

    vec![seed.to_string(), format!("{base}/a2a")]
}
713
714async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
715    let mut client = A2AClient::new(endpoint);
716    if let Ok(token) = std::env::var("CODETETHER_AUTH_TOKEN") {
717        client = client.with_token(token);
718    }
719    let card = client.get_agent_card().await?;
720    Ok(card)
721}
722
723async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
724    let mut client = A2AClient::new(endpoint);
725    if let Ok(token) = std::env::var("CODETETHER_AUTH_TOKEN") {
726        client = client.with_token(token);
727    }
728    let payload = MessageSendParams {
729        message: Message {
730            message_id: uuid::Uuid::new_v4().to_string(),
731            role: MessageRole::User,
732            parts: vec![Part::Text {
733                text: format!(
734                    "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
735                ),
736            }],
737            context_id: None,
738            task_id: None,
739            metadata: std::collections::HashMap::new(),
740            extensions: vec![],
741        },
742        configuration: Some(MessageSendConfiguration {
743            accepted_output_modes: vec!["text/plain".to_string()],
744            blocking: Some(false),
745            history_length: Some(0),
746            push_notification_config: None,
747        }),
748    };
749
750    let _ = client.send_message(payload).await?;
751    tracing::info!(peer = %endpoint, "Auto-intro message sent");
752    Ok(())
753}
754
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        // Two spellings of :5000 collapse into one; :5001 is our own URL.
        let raw: Vec<String> = [
            "localhost:5000",
            "http://localhost:5000/",
            "http://localhost:5001",
            "http://localhost:5002",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect();

        let mut peers = collect_peers(&raw, "http://localhost:5001");
        peers.sort();

        assert_eq!(peers, ["http://localhost:5000", "http://localhost:5002"]);
    }

    #[test]
    fn peer_candidates_adds_a2a_variant() {
        assert_eq!(
            peer_candidates("http://localhost:4096"),
            ["http://localhost:4096", "http://localhost:4096/a2a"]
        );
        // A seed that already targets `/a2a` is not extended again.
        assert_eq!(
            peer_candidates("http://localhost:4096/a2a"),
            ["http://localhost:4096/a2a"]
        );
    }
}