codetether_agent/a2a/
spawn.rs1use crate::a2a::client::A2AClient;
4use crate::a2a::server::A2AServer;
5use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
6use crate::bus::AgentBus;
7use crate::cli::SpawnArgs;
8use anyhow::{Context, Result};
9use axum::Router;
10use reqwest::Url;
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15
16pub async fn run(args: SpawnArgs) -> Result<()> {
18 let bind_addr = format!("{}:{}", args.hostname, args.port);
19 let public_url = args
20 .public_url
21 .clone()
22 .unwrap_or_else(|| format!("http://{bind_addr}"));
23 let public_url = normalize_base_url(&public_url)?;
24
25 let agent_name = args
26 .name
27 .clone()
28 .unwrap_or_else(|| format!("spawned-agent-{}", std::process::id()));
29
30 let mut card = A2AServer::default_card(&public_url);
31 card.name = agent_name.clone();
32 if let Some(description) = args.description.clone() {
33 card.description = description;
34 }
35
36 let bus = AgentBus::new().into_arc();
37
38 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
40 bus.registry.register(card.clone());
41 let handle = bus.handle(&agent_name);
42 let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
43 handle.announce_ready(capabilities);
44
45 let peers = collect_peers(&args.peer, &public_url);
46 if peers.is_empty() {
47 tracing::info!(
48 agent = %agent_name,
49 "Spawned without peer seeds; pass --peer or CODETETHER_A2A_PEERS for discovery"
50 );
51 } else {
52 tracing::info!(
53 agent = %agent_name,
54 peer_count = peers.len(),
55 "Spawned with peer discovery seeds"
56 );
57 }
58
59 let discovery_task = tokio::spawn(discovery_loop(
60 Arc::clone(&bus),
61 peers,
62 normalize_base_url(&public_url)?,
63 agent_name.clone(),
64 args.discovery_interval_secs.max(5),
65 args.auto_introduce,
66 ));
67
68 let router: Router = A2AServer::new(card).router();
69 let listener = tokio::net::TcpListener::bind(&bind_addr)
70 .await
71 .with_context(|| format!("Failed to bind spawn agent on {bind_addr}"))?;
72
73 tracing::info!(
74 agent = %agent_name,
75 bind_addr = %bind_addr,
76 public_url = %public_url,
77 "Spawned A2A agent runtime"
78 );
79
80 axum::serve(listener, router)
81 .with_graceful_shutdown(shutdown_signal())
82 .await
83 .context("Spawned A2A server failed")?;
84
85 discovery_task.abort();
86 tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
87 Ok(())
88}
89
90async fn shutdown_signal() {
91 let _ = tokio::signal::ctrl_c().await;
92 tracing::info!("Shutdown signal received");
93}
94
95fn normalize_base_url(url: &str) -> Result<String> {
96 let trimmed = url.trim();
97 if trimmed.is_empty() {
98 anyhow::bail!("URL cannot be empty");
99 }
100
101 let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
102 trimmed.to_string()
103 } else {
104 format!("http://{trimmed}")
105 };
106
107 let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
108 let mut cleaned = parsed.to_string();
109 if cleaned.ends_with('/') {
110 cleaned.pop();
111 }
112 Ok(cleaned)
113}
114
115fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
116 let mut dedup = HashSet::new();
117 let self_url = self_url.trim_end_matches('/');
118
119 for raw in raw_peers {
120 if raw.trim().is_empty() {
121 continue;
122 }
123
124 if let Ok(normalized) = normalize_base_url(raw) {
125 if normalized.trim_end_matches('/') != self_url {
126 dedup.insert(normalized);
127 }
128 }
129 }
130
131 dedup.into_iter().collect()
132}
133
134async fn discovery_loop(
135 bus: Arc<AgentBus>,
136 peers: Vec<String>,
137 self_url: String,
138 agent_name: String,
139 interval_secs: u64,
140 auto_introduce: bool,
141) {
142 let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
143 let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
144
145 loop {
146 ticker.tick().await;
147
148 for peer_seed in &peers {
149 let candidates = peer_candidates(peer_seed);
150 let mut discovered_card = None;
151
152 for candidate in candidates {
153 match try_fetch_agent_card(&candidate).await {
154 Ok(card) => {
155 discovered_card = Some((candidate, card));
156 break;
157 }
158 Err(error) => {
159 tracing::debug!(
160 agent = %agent_name,
161 peer = %candidate,
162 error = %error,
163 "Peer probe failed"
164 );
165 }
166 }
167 }
168
169 let Some((endpoint, card)) = discovered_card else {
170 continue;
171 };
172
173 let peer_id = format!("{}::{}", endpoint, card.name);
174 let is_new = {
175 let mut known = discovered.lock().await;
176 known.insert(peer_id)
177 };
178
179 bus.registry.register(card.clone());
180
181 if is_new {
182 tracing::info!(
183 agent = %agent_name,
184 peer_name = %card.name,
185 peer_url = %card.url,
186 endpoint = %endpoint,
187 "Discovered A2A peer"
188 );
189
190 if auto_introduce {
191 if let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await {
192 tracing::warn!(
193 agent = %agent_name,
194 peer = %endpoint,
195 error = %error,
196 "Auto-intro message failed"
197 );
198 }
199 }
200 }
201 }
202 }
203}
204
/// Lists the endpoints to probe for a peer seed.
///
/// The seed itself always comes first; a second entry with `/a2a` appended is
/// added unless the seed already ends with `/a2a`.
fn peer_candidates(seed: &str) -> Vec<String> {
    let mut endpoints = vec![seed.to_owned()];
    if !seed.ends_with("/a2a") {
        endpoints.push(format!("{seed}/a2a"));
    }
    endpoints
}
212
213async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
214 let client = A2AClient::new(endpoint);
215 let card = client.get_agent_card().await?;
216 Ok(card)
217}
218
219async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
220 let client = A2AClient::new(endpoint);
221 let payload = MessageSendParams {
222 message: Message {
223 message_id: uuid::Uuid::new_v4().to_string(),
224 role: MessageRole::User,
225 parts: vec![Part::Text {
226 text: format!(
227 "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
228 ),
229 }],
230 context_id: None,
231 task_id: None,
232 metadata: std::collections::HashMap::new(),
233 extensions: vec![],
234 },
235 configuration: Some(MessageSendConfiguration {
236 accepted_output_modes: vec!["text/plain".to_string()],
237 blocking: Some(false),
238 history_length: Some(0),
239 push_notification_config: None,
240 }),
241 };
242
243 let _ = client.send_message(payload).await?;
244 tracing::info!(peer = %endpoint, "Auto-intro message sent");
245 Ok(())
246}
247
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    /// Equivalent spellings of one seed collapse to a single entry, and the
    /// agent's own URL is removed from the result.
    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        let seeds: Vec<String> = [
            "localhost:5000",
            "http://localhost:5000/",
            "http://localhost:5001",
            "http://localhost:5002",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let mut found = collect_peers(&seeds, "http://localhost:5001");
        found.sort();

        let expected = vec![
            String::from("http://localhost:5000"),
            String::from("http://localhost:5002"),
        ];
        assert_eq!(found, expected);
    }

    /// A bare base URL gains an `/a2a` variant; a seed that already ends with
    /// `/a2a` is returned unchanged.
    #[test]
    fn peer_candidates_adds_a2a_variant() {
        let from_base = peer_candidates("http://localhost:4096");
        let expected_from_base = vec![
            String::from("http://localhost:4096"),
            String::from("http://localhost:4096/a2a"),
        ];
        assert_eq!(from_base, expected_from_base);

        let from_a2a = peer_candidates("http://localhost:4096/a2a");
        let expected_from_a2a = vec![String::from("http://localhost:4096/a2a")];
        assert_eq!(from_a2a, expected_from_a2a);
    }
}