codetether_agent/a2a/
spawn.rs1use crate::a2a::client::A2AClient;
4use crate::a2a::server::A2AServer;
5use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
6use crate::bus::AgentBus;
7use crate::cli::SpawnArgs;
8use anyhow::{Context, Result};
9use axum::Router;
10use reqwest::Url;
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15
16pub async fn run(args: SpawnArgs) -> Result<()> {
18 let bind_addr = format!("{}:{}", args.hostname, args.port);
19 let public_url = args
20 .public_url
21 .clone()
22 .unwrap_or_else(|| format!("http://{bind_addr}"));
23 let public_url = normalize_base_url(&public_url)?;
24
25 let agent_name = args
26 .name
27 .clone()
28 .unwrap_or_else(|| format!("spawned-agent-{}", std::process::id()));
29
30 let mut card = A2AServer::default_card(&public_url);
31 card.name = agent_name.clone();
32 if let Some(description) = args.description.clone() {
33 card.description = description;
34 }
35
36 let bus = AgentBus::new().into_arc();
37 bus.registry.register(card.clone());
38 let handle = bus.handle(&agent_name);
39 let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
40 handle.announce_ready(capabilities);
41
42 let peers = collect_peers(&args.peer, &public_url);
43 if peers.is_empty() {
44 tracing::info!(
45 agent = %agent_name,
46 "Spawned without peer seeds; pass --peer or CODETETHER_A2A_PEERS for discovery"
47 );
48 } else {
49 tracing::info!(
50 agent = %agent_name,
51 peer_count = peers.len(),
52 "Spawned with peer discovery seeds"
53 );
54 }
55
56 let discovery_task = tokio::spawn(discovery_loop(
57 Arc::clone(&bus),
58 peers,
59 normalize_base_url(&public_url)?,
60 agent_name.clone(),
61 args.discovery_interval_secs.max(5),
62 args.auto_introduce,
63 ));
64
65 let router: Router = A2AServer::new(card).router();
66 let listener = tokio::net::TcpListener::bind(&bind_addr)
67 .await
68 .with_context(|| format!("Failed to bind spawn agent on {bind_addr}"))?;
69
70 tracing::info!(
71 agent = %agent_name,
72 bind_addr = %bind_addr,
73 public_url = %public_url,
74 "Spawned A2A agent runtime"
75 );
76
77 axum::serve(listener, router)
78 .with_graceful_shutdown(shutdown_signal())
79 .await
80 .context("Spawned A2A server failed")?;
81
82 discovery_task.abort();
83 tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
84 Ok(())
85}
86
87async fn shutdown_signal() {
88 let _ = tokio::signal::ctrl_c().await;
89 tracing::info!("Shutdown signal received");
90}
91
92fn normalize_base_url(url: &str) -> Result<String> {
93 let trimmed = url.trim();
94 if trimmed.is_empty() {
95 anyhow::bail!("URL cannot be empty");
96 }
97
98 let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
99 trimmed.to_string()
100 } else {
101 format!("http://{trimmed}")
102 };
103
104 let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
105 let mut cleaned = parsed.to_string();
106 if cleaned.ends_with('/') {
107 cleaned.pop();
108 }
109 Ok(cleaned)
110}
111
112fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
113 let mut dedup = HashSet::new();
114 let self_url = self_url.trim_end_matches('/');
115
116 for raw in raw_peers {
117 if raw.trim().is_empty() {
118 continue;
119 }
120
121 if let Ok(normalized) = normalize_base_url(raw) {
122 if normalized.trim_end_matches('/') != self_url {
123 dedup.insert(normalized);
124 }
125 }
126 }
127
128 dedup.into_iter().collect()
129}
130
131async fn discovery_loop(
132 bus: Arc<AgentBus>,
133 peers: Vec<String>,
134 self_url: String,
135 agent_name: String,
136 interval_secs: u64,
137 auto_introduce: bool,
138) {
139 let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
140 let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
141
142 loop {
143 ticker.tick().await;
144
145 for peer_seed in &peers {
146 let candidates = peer_candidates(peer_seed);
147 let mut discovered_card = None;
148
149 for candidate in candidates {
150 match try_fetch_agent_card(&candidate).await {
151 Ok(card) => {
152 discovered_card = Some((candidate, card));
153 break;
154 }
155 Err(error) => {
156 tracing::debug!(
157 agent = %agent_name,
158 peer = %candidate,
159 error = %error,
160 "Peer probe failed"
161 );
162 }
163 }
164 }
165
166 let Some((endpoint, card)) = discovered_card else {
167 continue;
168 };
169
170 let peer_id = format!("{}::{}", endpoint, card.name);
171 let is_new = {
172 let mut known = discovered.lock().await;
173 known.insert(peer_id)
174 };
175
176 bus.registry.register(card.clone());
177
178 if is_new {
179 tracing::info!(
180 agent = %agent_name,
181 peer_name = %card.name,
182 peer_url = %card.url,
183 endpoint = %endpoint,
184 "Discovered A2A peer"
185 );
186
187 if auto_introduce {
188 if let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await {
189 tracing::warn!(
190 agent = %agent_name,
191 peer = %endpoint,
192 error = %error,
193 "Auto-intro message failed"
194 );
195 }
196 }
197 }
198 }
199 }
200}
201
202fn peer_candidates(seed: &str) -> Vec<String> {
203 if seed.ends_with("/a2a") {
204 return vec![seed.to_string()];
205 }
206
207 vec![seed.to_string(), format!("{seed}/a2a")]
208}
209
210async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
211 let client = A2AClient::new(endpoint);
212 let card = client.get_agent_card().await?;
213 Ok(card)
214}
215
216async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
217 let client = A2AClient::new(endpoint);
218 let payload = MessageSendParams {
219 message: Message {
220 message_id: uuid::Uuid::new_v4().to_string(),
221 role: MessageRole::User,
222 parts: vec![Part::Text {
223 text: format!(
224 "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
225 ),
226 }],
227 context_id: None,
228 task_id: None,
229 metadata: std::collections::HashMap::new(),
230 extensions: vec![],
231 },
232 configuration: Some(MessageSendConfiguration {
233 accepted_output_modes: vec!["text/plain".to_string()],
234 blocking: Some(false),
235 history_length: Some(0),
236 push_notification_config: None,
237 }),
238 };
239
240 let _ = client.send_message(payload).await?;
241 tracing::info!(peer = %endpoint, "Auto-intro message sent");
242 Ok(())
243}
244
245#[cfg(test)]
246mod tests {
247 use super::{collect_peers, peer_candidates};
248
249 #[test]
250 fn collect_peers_deduplicates_and_skips_self() {
251 let peers = vec![
252 "localhost:5000".to_string(),
253 "http://localhost:5000/".to_string(),
254 "http://localhost:5001".to_string(),
255 "http://localhost:5002".to_string(),
256 ];
257
258 let mut out = collect_peers(&peers, "http://localhost:5001");
259 out.sort();
260
261 assert_eq!(
262 out,
263 vec![
264 "http://localhost:5000".to_string(),
265 "http://localhost:5002".to_string(),
266 ]
267 );
268 }
269
270 #[test]
271 fn peer_candidates_adds_a2a_variant() {
272 let out = peer_candidates("http://localhost:4096");
273 assert_eq!(
274 out,
275 vec![
276 "http://localhost:4096".to_string(),
277 "http://localhost:4096/a2a".to_string()
278 ]
279 );
280
281 let out2 = peer_candidates("http://localhost:4096/a2a");
282 assert_eq!(out2, vec!["http://localhost:4096/a2a".to_string()]);
283 }
284}