1use crate::a2a::client::A2AClient;
48use crate::a2a::lan;
49use crate::a2a::mdns::{self, DiscoveredPeer};
50use crate::a2a::server::A2AServer;
51use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
52use crate::bus::AgentBus;
53use crate::cli::SpawnArgs;
54use anyhow::{Context, Result};
55use axum::Router;
56use reqwest::Url;
57use std::collections::HashSet;
58use std::sync::Arc;
59use std::time::Duration;
60use tokio::sync::Mutex;
61use tokio::sync::mpsc;
62
/// Configuration for starting an A2A peer.
///
/// Built from CLI arguments via `SpawnOptions::from_spawn_args` or with
/// built-in defaults via `SpawnOptions::auto`.
#[derive(Debug, Clone)]
pub struct SpawnOptions {
    /// Agent name; when `None`, `prepare_a2a` substitutes `auto_agent_name()`.
    pub name: Option<String>,
    /// Host the listener binds to; joined with `port` as `host:port`.
    pub hostname: String,
    /// Requested port. The port actually in use is read back from the bound listener.
    pub port: u16,
    /// Public base URL override; when `None` it is derived from the host and bound port.
    pub public_url: Option<String>,
    /// Optional replacement for the default agent card description.
    pub description: Option<String>,
    /// Explicit peer seed URLs probed by the discovery loop.
    pub peer: Vec<String>,
    /// Seconds between discovery passes; `prepare_a2a` raises values below 5 to 5.
    pub discovery_interval_secs: u64,
    /// Whether to send an introduction message to each newly discovered peer.
    pub auto_introduce: bool,
    /// Whether `prepare_a2a` starts LAN broadcast and mDNS discovery.
    pub mdns: bool,
}
94
95impl SpawnOptions {
96 pub fn auto() -> Self {
98 Self {
99 name: None,
100 hostname: "0.0.0.0".to_string(),
101 port: 0,
102 public_url: None,
103 description: None,
104 peer: Vec::new(),
105 discovery_interval_secs: 15,
106 auto_introduce: true,
107 mdns: true,
108 }
109 }
110
111 pub fn from_spawn_args(args: &SpawnArgs) -> Self {
113 Self {
114 name: args.name.clone(),
115 hostname: args.hostname.clone(),
116 port: args.port,
117 public_url: args.public_url.clone(),
118 description: args.description.clone(),
119 peer: args.peer.clone(),
120 discovery_interval_secs: args.discovery_interval_secs,
121 auto_introduce: args.auto_introduce,
122 mdns: args.mdns,
123 }
124 }
125}
126
127pub fn auto_agent_name() -> String {
131 let host_full = gethostname::gethostname()
132 .into_string()
133 .unwrap_or_else(|_| "host".to_string());
134 let host_short = host_full
135 .split('.')
136 .next()
137 .unwrap_or(&host_full)
138 .to_string();
139 let repo = std::env::current_dir()
140 .ok()
141 .as_ref()
142 .and_then(|p| p.file_name())
143 .and_then(|n| n.to_str())
144 .unwrap_or("repo")
145 .to_string();
146 let pid = std::process::id();
147 let short_pid = format!("{:04x}", pid & 0xffff);
148 let raw = format!("{host_short}-{repo}-{short_pid}");
149 sanitize_name(&raw)
150}
151
/// Lowercases ASCII letters, keeps ASCII digits and `-`, and replaces every
/// other character with `-` (one `-` per replaced character).
fn sanitize_name(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());
    for ch in input.chars() {
        cleaned.push(match ch {
            'A'..='Z' => ch.to_ascii_lowercase(),
            'a'..='z' | '0'..='9' | '-' => ch,
            _ => '-',
        });
    }
    cleaned
}
164
165fn first_lan_ipv4() -> Option<String> {
169 let ifaces = if_addrs::get_if_addrs().ok()?;
170 ifaces
171 .into_iter()
172 .filter_map(|iface| {
173 let std::net::IpAddr::V4(v4) = iface.ip() else {
174 return None;
175 };
176 if v4.is_loopback() || v4.is_link_local() || v4.is_unspecified() {
177 return None;
178 }
179 Some(v4.to_string())
185 })
186 .next()
187}
188
/// Handle to an A2A peer started by `start_a2a_in_background`.
///
/// Dropping the handle (or calling `abort`) aborts the server, discovery,
/// mDNS intake, and LAN tasks it owns.
pub struct A2APeerHandle {
    /// Name the agent card was registered under.
    pub agent_name: String,
    /// `host:port` built from the configured hostname and the bound port.
    pub bind_addr: String,
    /// Normalized public base URL placed on the agent card.
    pub public_url: String,
    // Task running `axum::serve` on the bound listener.
    server_task: tokio::task::JoinHandle<()>,
    // Task running `discovery_loop` over the explicit peer seeds.
    discovery_task: tokio::task::JoinHandle<()>,
    // Task running `mdns_intake_loop`, if it was started.
    mdns_intake_task: Option<tokio::task::JoinHandle<()>>,
    // Tasks returned by `lan::announce_and_listen`.
    lan_tasks: Vec<tokio::task::JoinHandle<()>>,
    // Stored but never read here; presumably keeps the mDNS service alive
    // until the handle is dropped — TODO confirm against `mdns::MdnsHandle`.
    _mdns_handle: Option<mdns::MdnsHandle>,
}
208
209impl A2APeerHandle {
210 pub fn abort(self) {
212 drop(self);
214 }
215}
216
217impl Drop for A2APeerHandle {
218 fn drop(&mut self) {
219 self.server_task.abort();
220 self.discovery_task.abort();
221 if let Some(t) = self.mdns_intake_task.as_ref() {
222 t.abort();
223 }
224 for task in &self.lan_tasks {
225 task.abort();
226 }
227 }
229}
230
/// Everything `prepare_a2a` sets up before the HTTP server starts serving.
///
/// Callers decide how to run the server: `run` serves in the foreground,
/// `start_a2a_in_background` spawns a task.
struct A2APreparation {
    // Bound TCP listener, not yet serving.
    listener: tokio::net::TcpListener,
    // Router built from the A2A server for this agent's card.
    router: Router,
    // Task running `discovery_loop` over the explicit peer seeds.
    discovery_task: tokio::task::JoinHandle<()>,
    // Task running `mdns_intake_loop`, if it was started.
    mdns_intake_task: Option<tokio::task::JoinHandle<()>>,
    // Tasks returned by `lan::announce_and_listen` (empty when unavailable).
    lan_tasks: Vec<tokio::task::JoinHandle<()>>,
    // Handle returned by `mdns::announce_and_browse`, if it succeeded.
    mdns_handle: Option<mdns::MdnsHandle>,
    // Name the agent card was registered under.
    agent_name: String,
    // `host:port` built from the configured hostname and the bound port.
    bind_addr: String,
    // Normalized public base URL placed on the agent card.
    public_url: String,
}
242
243async fn prepare_a2a(opts: SpawnOptions, bus: Arc<AgentBus>) -> Result<A2APreparation> {
252 let requested_bind_addr = format!("{}:{}", opts.hostname, opts.port);
256 let listener = tokio::net::TcpListener::bind(&requested_bind_addr)
257 .await
258 .with_context(|| format!("Failed to bind A2A peer on {requested_bind_addr}"))?;
259 let local_addr = listener
260 .local_addr()
261 .context("Failed to read local addr from listener")?;
262 let effective_port = local_addr.port();
263 let bind_addr = format!("{}:{}", opts.hostname, effective_port);
264
265 let agent_name = opts.name.clone().unwrap_or_else(auto_agent_name);
267
268 let public_host = match opts.hostname.as_str() {
273 "0.0.0.0" | "::" | "[::]" => first_lan_ipv4().unwrap_or_else(|| "127.0.0.1".to_string()),
274 other => other.to_string(),
275 };
276 let public_url = opts
277 .public_url
278 .clone()
279 .unwrap_or_else(|| format!("http://{public_host}:{effective_port}"));
280 let public_url = normalize_base_url(&public_url)?;
281
282 let mut card = A2AServer::default_card(&public_url);
284 card.name = agent_name.clone();
285 if let Some(description) = opts.description.clone() {
286 card.description = description;
287 }
288
289 bus.registry.register(card.clone());
290 let bus_handle = bus.handle(&agent_name);
291 let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
292 bus_handle.announce_ready(capabilities);
293
294 let peers = collect_peers(&opts.peer, &public_url);
298 if !peers.is_empty() {
299 tracing::info!(
300 agent = %agent_name,
301 peer_count = peers.len(),
302 "A2A peer started with explicit --peer seeds (additive to mDNS)"
303 );
304 }
305 let discovery_task = tokio::spawn(discovery_loop(
306 Arc::clone(&bus),
307 peers,
308 public_url.clone(),
309 agent_name.clone(),
310 opts.discovery_interval_secs.max(5),
311 opts.auto_introduce,
312 ));
313
314 let mdns_bind_addrs: Vec<std::net::IpAddr> = match opts.hostname.as_str() {
323 "0.0.0.0" | "::" | "[::]" => vec![],
324 other => other.parse::<std::net::IpAddr>().ok().into_iter().collect(),
325 };
326 let mut lan_tasks = Vec::new();
327 let (mdns_handle, mdns_intake_task) = if opts.mdns {
328 let (peer_tx, peer_rx) = mpsc::channel::<DiscoveredPeer>(64);
329 match lan::announce_and_listen(agent_name.clone(), public_url.clone(), peer_tx.clone())
330 .await
331 {
332 Ok(tasks) => lan_tasks = tasks,
333 Err(error) => tracing::warn!(%error, "A2A LAN broadcast discovery unavailable"),
334 }
335 match mdns::announce_and_browse(&agent_name, effective_port, mdns_bind_addrs, peer_tx) {
336 Ok(handle) => {
337 let intake = tokio::spawn(mdns_intake_loop(
338 Arc::clone(&bus),
339 peer_rx,
340 public_url.clone(),
341 agent_name.clone(),
342 opts.auto_introduce,
343 ));
344 (Some(handle), Some(intake))
345 }
346 Err(e) => {
347 tracing::warn!(
348 agent = %agent_name,
349 error = %e,
350 "mDNS unavailable; falling back to explicit --peer seeds only"
351 );
352 (None, None)
353 }
354 }
355 } else {
356 (None, None)
357 };
358
359 let router: Router = A2AServer::with_bus(card, Arc::clone(&bus)).router();
360
361 Ok(A2APreparation {
362 listener,
363 router,
364 discovery_task,
365 mdns_intake_task,
366 lan_tasks,
367 mdns_handle,
368 agent_name,
369 bind_addr,
370 public_url,
371 })
372}
373
/// Upper bound on a single agent-card fetch when probing a candidate
/// endpoint of an mDNS-discovered peer.
const MDNS_PEER_PROBE_TIMEOUT: Duration = Duration::from_secs(5);
378
379async fn mdns_intake_loop(
392 bus: Arc<AgentBus>,
393 mut peer_rx: mpsc::Receiver<DiscoveredPeer>,
394 self_url: String,
395 agent_name: String,
396 auto_introduce: bool,
397) {
398 let self_url = Arc::new(self_url.trim_end_matches('/').to_string());
399 let known: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
400
401 while let Some(peer) = peer_rx.recv().await {
402 let already_known_by_instance = {
405 let k = known.lock().await;
406 k.contains(&peer.instance_name)
407 };
408 if already_known_by_instance {
409 continue;
410 }
411
412 let bus = Arc::clone(&bus);
415 let known = Arc::clone(&known);
416 let self_url = Arc::clone(&self_url);
417 let agent_name = agent_name.clone();
418 tokio::spawn(async move {
419 handle_mdns_peer(bus, known, self_url, agent_name, peer, auto_introduce).await;
420 });
421 }
422}
423
424async fn handle_mdns_peer(
429 bus: Arc<AgentBus>,
430 known: Arc<Mutex<HashSet<String>>>,
431 self_url: Arc<String>,
432 agent_name: String,
433 peer: DiscoveredPeer,
434 auto_introduce: bool,
435) {
436 let mut resolved: Option<(String, crate::a2a::types::AgentCard)> = None;
437
438 'urls: for url in &peer.urls {
439 if url.trim_end_matches('/') == self_url.as_str() {
440 continue;
441 }
442 for candidate in peer_candidates(url) {
443 match tokio::time::timeout(MDNS_PEER_PROBE_TIMEOUT, try_fetch_agent_card(&candidate))
444 .await
445 {
446 Ok(Ok(card)) => {
447 resolved = Some((candidate, card));
448 break 'urls;
449 }
450 Ok(Err(error)) => {
451 tracing::debug!(
452 agent = %agent_name,
453 peer = %candidate,
454 error = %error,
455 "mDNS peer probe failed"
456 );
457 }
458 Err(_) => {
459 tracing::debug!(
460 agent = %agent_name,
461 peer = %candidate,
462 timeout_secs = MDNS_PEER_PROBE_TIMEOUT.as_secs(),
463 "mDNS peer probe timed out"
464 );
465 }
466 }
467 }
468 }
469
470 let Some((endpoint, card)) = resolved else {
471 return;
472 };
473
474 let is_new = {
475 let mut k = known.lock().await;
476 let inserted = k.insert(card.name.clone());
478 k.insert(peer.instance_name.clone());
479 inserted
480 };
481
482 bus.registry.register(card.clone());
483
484 if is_new {
485 let handle = bus.handle("a2a-discovery");
486 handle.send(
487 "broadcast",
488 crate::bus::BusMessage::Heartbeat {
489 agent_id: card.name.clone(),
490 status: format!("discovered via A2A at {endpoint}"),
491 },
492 );
493 tracing::info!(
494 agent = %agent_name,
495 peer_name = %card.name,
496 peer_url = %card.url,
497 endpoint = %endpoint,
498 via = "mdns",
499 "Discovered A2A peer"
500 );
501 if auto_introduce
502 && let Err(error) = send_intro(&endpoint, &agent_name, self_url.as_str()).await
503 {
504 tracing::warn!(
505 agent = %agent_name,
506 peer = %endpoint,
507 error = %error,
508 "Auto-intro message failed"
509 );
510 }
511 }
512}
513
514pub async fn start_a2a_in_background(
520 opts: SpawnOptions,
521 bus: Arc<AgentBus>,
522) -> Result<A2APeerHandle> {
523 let prep = prepare_a2a(opts, bus).await?;
524 tracing::info!(
525 agent = %prep.agent_name,
526 bind_addr = %prep.bind_addr,
527 public_url = %prep.public_url,
528 mdns = prep.mdns_handle.is_some(),
529 "A2A peer listening (background mode)"
530 );
531 let agent_name = prep.agent_name.clone();
532 let bind_addr = prep.bind_addr.clone();
533 let public_url = prep.public_url.clone();
534 let server_task = tokio::spawn(async move {
535 if let Err(e) = axum::serve(prep.listener, prep.router).await {
536 tracing::error!(error = %e, "A2A peer server task exited with error");
537 }
538 });
539 Ok(A2APeerHandle {
540 agent_name,
541 bind_addr,
542 public_url,
543 server_task,
544 discovery_task: prep.discovery_task,
545 mdns_intake_task: prep.mdns_intake_task,
546 lan_tasks: prep.lan_tasks,
547 _mdns_handle: prep.mdns_handle,
548 })
549}
550
551pub async fn run(args: SpawnArgs) -> Result<()> {
554 let bus = AgentBus::new().into_arc();
555
556 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
558
559 let prep = prepare_a2a(SpawnOptions::from_spawn_args(&args), bus).await?;
560
561 tracing::info!(
562 agent = %prep.agent_name,
563 bind_addr = %prep.bind_addr,
564 public_url = %prep.public_url,
565 mdns = prep.mdns_handle.is_some(),
566 "Spawned A2A agent runtime"
567 );
568
569 let agent_name = prep.agent_name.clone();
570 let discovery_task = prep.discovery_task;
571 let mdns_intake_task = prep.mdns_intake_task;
572 let lan_tasks = prep.lan_tasks;
573 let _mdns_handle = prep.mdns_handle; axum::serve(prep.listener, prep.router)
576 .with_graceful_shutdown(shutdown_signal())
577 .await
578 .context("Spawned A2A server failed")?;
579
580 discovery_task.abort();
581 if let Some(t) = mdns_intake_task {
582 t.abort();
583 }
584 for task in lan_tasks {
585 task.abort();
586 }
587 tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
588 Ok(())
589}
590
591async fn shutdown_signal() {
592 let _ = tokio::signal::ctrl_c().await;
593 tracing::info!("Shutdown signal received");
594}
595
596fn normalize_base_url(url: &str) -> Result<String> {
597 let trimmed = url.trim();
598 if trimmed.is_empty() {
599 anyhow::bail!("URL cannot be empty");
600 }
601
602 let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
603 trimmed.to_string()
604 } else {
605 format!("http://{trimmed}")
606 };
607
608 let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
609 let mut cleaned = parsed.to_string();
610 if cleaned.ends_with('/') {
611 cleaned.pop();
612 }
613 Ok(cleaned)
614}
615
616fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
617 let mut dedup = HashSet::new();
618 let self_url = self_url.trim_end_matches('/');
619
620 for raw in raw_peers {
621 if raw.trim().is_empty() {
622 continue;
623 }
624
625 if let Ok(normalized) = normalize_base_url(raw)
626 && normalized.trim_end_matches('/') != self_url
627 {
628 dedup.insert(normalized);
629 }
630 }
631
632 dedup.into_iter().collect()
633}
634
635async fn discovery_loop(
636 bus: Arc<AgentBus>,
637 peers: Vec<String>,
638 self_url: String,
639 agent_name: String,
640 interval_secs: u64,
641 auto_introduce: bool,
642) {
643 let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
644 let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
645
646 loop {
647 ticker.tick().await;
648
649 for peer_seed in &peers {
650 let candidates = peer_candidates(peer_seed);
651 let mut discovered_card = None;
652
653 for candidate in candidates {
654 match try_fetch_agent_card(&candidate).await {
655 Ok(card) => {
656 discovered_card = Some((candidate, card));
657 break;
658 }
659 Err(error) => {
660 tracing::debug!(
661 agent = %agent_name,
662 peer = %candidate,
663 error = %error,
664 "Peer probe failed"
665 );
666 }
667 }
668 }
669
670 let Some((endpoint, card)) = discovered_card else {
671 continue;
672 };
673
674 let peer_id = format!("{}::{}", endpoint, card.name);
675 let is_new = {
676 let mut known = discovered.lock().await;
677 known.insert(peer_id)
678 };
679
680 bus.registry.register(card.clone());
681
682 if is_new {
683 tracing::info!(
684 agent = %agent_name,
685 peer_name = %card.name,
686 peer_url = %card.url,
687 endpoint = %endpoint,
688 "Discovered A2A peer"
689 );
690
691 if auto_introduce
692 && let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await
693 {
694 tracing::warn!(
695 agent = %agent_name,
696 peer = %endpoint,
697 error = %error,
698 "Auto-intro message failed"
699 );
700 }
701 }
702 }
703 }
704}
705
/// Expands a peer seed URL into the endpoints worth probing, in order.
///
/// Trailing slashes are stripped first so the `/a2a` variant never contains a
/// doubled slash. A seed that already ends in `/a2a` yields just itself;
/// any other seed yields the seed followed by `<seed>/a2a`.
fn peer_candidates(seed: &str) -> Vec<String> {
    let base = seed.trim_end_matches('/');
    if base.ends_with("/a2a") {
        return vec![base.to_string()];
    }

    vec![base.to_string(), format!("{base}/a2a")]
}
713
714async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
715 let mut client = A2AClient::new(endpoint);
716 if let Ok(token) = std::env::var("CODETETHER_AUTH_TOKEN") {
717 client = client.with_token(token);
718 }
719 let card = client.get_agent_card().await?;
720 Ok(card)
721}
722
723async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
724 let mut client = A2AClient::new(endpoint);
725 if let Ok(token) = std::env::var("CODETETHER_AUTH_TOKEN") {
726 client = client.with_token(token);
727 }
728 let payload = MessageSendParams {
729 message: Message {
730 message_id: uuid::Uuid::new_v4().to_string(),
731 role: MessageRole::User,
732 parts: vec![Part::Text {
733 text: format!(
734 "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
735 ),
736 }],
737 context_id: None,
738 task_id: None,
739 metadata: std::collections::HashMap::new(),
740 extensions: vec![],
741 },
742 configuration: Some(MessageSendConfiguration {
743 accepted_output_modes: vec!["text/plain".to_string()],
744 blocking: Some(false),
745 history_length: Some(0),
746 push_notification_config: None,
747 }),
748 };
749
750 let _ = client.send_message(payload).await?;
751 tracing::info!(peer = %endpoint, "Auto-intro message sent");
752 Ok(())
753}
754
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    /// Equivalent spellings of one peer collapse to a single entry and the
    /// agent's own URL is excluded.
    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        let seeds: Vec<String> = [
            "localhost:5000",
            "http://localhost:5000/",
            "http://localhost:5001",
            "http://localhost:5002",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let mut collected = collect_peers(&seeds, "http://localhost:5001");
        collected.sort();

        assert_eq!(
            collected,
            ["http://localhost:5000", "http://localhost:5002"]
        );
    }

    /// A bare base URL gains an `/a2a` variant; a URL already ending in
    /// `/a2a` is returned unchanged.
    #[test]
    fn peer_candidates_adds_a2a_variant() {
        let expanded = peer_candidates("http://localhost:4096");
        assert_eq!(
            expanded,
            ["http://localhost:4096", "http://localhost:4096/a2a"]
        );

        let already_suffixed = peer_candidates("http://localhost:4096/a2a");
        assert_eq!(already_suffixed, ["http://localhost:4096/a2a"]);
    }
}