Skip to main content

codetether_agent/a2a/
spawn.rs

1//! Spawn an autonomous A2A agent runtime with auto peer discovery.
2
3use crate::a2a::client::A2AClient;
4use crate::a2a::server::A2AServer;
5use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
6use crate::bus::AgentBus;
7use crate::cli::SpawnArgs;
8use anyhow::{Context, Result};
9use axum::Router;
10use reqwest::Url;
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15
16/// Run the spawn command.
17pub async fn run(args: SpawnArgs) -> Result<()> {
18    let bind_addr = format!("{}:{}", args.hostname, args.port);
19    let public_url = args
20        .public_url
21        .clone()
22        .unwrap_or_else(|| format!("http://{bind_addr}"));
23    let public_url = normalize_base_url(&public_url)?;
24
25    let agent_name = args
26        .name
27        .clone()
28        .unwrap_or_else(|| format!("spawned-agent-{}", std::process::id()));
29
30    let mut card = A2AServer::default_card(&public_url);
31    card.name = agent_name.clone();
32    if let Some(description) = args.description.clone() {
33        card.description = description;
34    }
35
36    let bus = AgentBus::new().into_arc();
37    bus.registry.register(card.clone());
38    let handle = bus.handle(&agent_name);
39    let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
40    handle.announce_ready(capabilities);
41
42    let peers = collect_peers(&args.peer, &public_url);
43    if peers.is_empty() {
44        tracing::info!(
45            agent = %agent_name,
46            "Spawned without peer seeds; pass --peer or CODETETHER_A2A_PEERS for discovery"
47        );
48    } else {
49        tracing::info!(
50            agent = %agent_name,
51            peer_count = peers.len(),
52            "Spawned with peer discovery seeds"
53        );
54    }
55
56    let discovery_task = tokio::spawn(discovery_loop(
57        Arc::clone(&bus),
58        peers,
59        normalize_base_url(&public_url)?,
60        agent_name.clone(),
61        args.discovery_interval_secs.max(5),
62        args.auto_introduce,
63    ));
64
65    let router: Router = A2AServer::new(card).router();
66    let listener = tokio::net::TcpListener::bind(&bind_addr)
67        .await
68        .with_context(|| format!("Failed to bind spawn agent on {bind_addr}"))?;
69
70    tracing::info!(
71        agent = %agent_name,
72        bind_addr = %bind_addr,
73        public_url = %public_url,
74        "Spawned A2A agent runtime"
75    );
76
77    axum::serve(listener, router)
78        .with_graceful_shutdown(shutdown_signal())
79        .await
80        .context("Spawned A2A server failed")?;
81
82    discovery_task.abort();
83    tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
84    Ok(())
85}
86
87async fn shutdown_signal() {
88    let _ = tokio::signal::ctrl_c().await;
89    tracing::info!("Shutdown signal received");
90}
91
92fn normalize_base_url(url: &str) -> Result<String> {
93    let trimmed = url.trim();
94    if trimmed.is_empty() {
95        anyhow::bail!("URL cannot be empty");
96    }
97
98    let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
99        trimmed.to_string()
100    } else {
101        format!("http://{trimmed}")
102    };
103
104    let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
105    let mut cleaned = parsed.to_string();
106    if cleaned.ends_with('/') {
107        cleaned.pop();
108    }
109    Ok(cleaned)
110}
111
112fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
113    let mut dedup = HashSet::new();
114    let self_url = self_url.trim_end_matches('/');
115
116    for raw in raw_peers {
117        if raw.trim().is_empty() {
118            continue;
119        }
120
121        if let Ok(normalized) = normalize_base_url(raw) {
122            if normalized.trim_end_matches('/') != self_url {
123                dedup.insert(normalized);
124            }
125        }
126    }
127
128    dedup.into_iter().collect()
129}
130
131async fn discovery_loop(
132    bus: Arc<AgentBus>,
133    peers: Vec<String>,
134    self_url: String,
135    agent_name: String,
136    interval_secs: u64,
137    auto_introduce: bool,
138) {
139    let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
140    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
141
142    loop {
143        ticker.tick().await;
144
145        for peer_seed in &peers {
146            let candidates = peer_candidates(peer_seed);
147            let mut discovered_card = None;
148
149            for candidate in candidates {
150                match try_fetch_agent_card(&candidate).await {
151                    Ok(card) => {
152                        discovered_card = Some((candidate, card));
153                        break;
154                    }
155                    Err(error) => {
156                        tracing::debug!(
157                            agent = %agent_name,
158                            peer = %candidate,
159                            error = %error,
160                            "Peer probe failed"
161                        );
162                    }
163                }
164            }
165
166            let Some((endpoint, card)) = discovered_card else {
167                continue;
168            };
169
170            let peer_id = format!("{}::{}", endpoint, card.name);
171            let is_new = {
172                let mut known = discovered.lock().await;
173                known.insert(peer_id)
174            };
175
176            bus.registry.register(card.clone());
177
178            if is_new {
179                tracing::info!(
180                    agent = %agent_name,
181                    peer_name = %card.name,
182                    peer_url = %card.url,
183                    endpoint = %endpoint,
184                    "Discovered A2A peer"
185                );
186
187                if auto_introduce {
188                    if let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await {
189                        tracing::warn!(
190                            agent = %agent_name,
191                            peer = %endpoint,
192                            error = %error,
193                            "Auto-intro message failed"
194                        );
195                    }
196                }
197            }
198        }
199    }
200}
201
/// List the endpoints to probe for a peer seed.
///
/// The seed itself is always tried first. Unless it already ends in `/a2a`,
/// the same seed with `/a2a` appended is tried as a second candidate.
fn peer_candidates(seed: &str) -> Vec<String> {
    let mut candidates = vec![seed.to_owned()];
    if !seed.ends_with("/a2a") {
        candidates.push(format!("{seed}/a2a"));
    }
    candidates
}
209
210async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
211    let client = A2AClient::new(endpoint);
212    let card = client.get_agent_card().await?;
213    Ok(card)
214}
215
216async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
217    let client = A2AClient::new(endpoint);
218    let payload = MessageSendParams {
219        message: Message {
220            message_id: uuid::Uuid::new_v4().to_string(),
221            role: MessageRole::User,
222            parts: vec![Part::Text {
223                text: format!(
224                    "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
225                ),
226            }],
227            context_id: None,
228            task_id: None,
229            metadata: std::collections::HashMap::new(),
230            extensions: vec![],
231        },
232        configuration: Some(MessageSendConfiguration {
233            accepted_output_modes: vec!["text/plain".to_string()],
234            blocking: Some(false),
235            history_length: Some(0),
236            push_notification_config: None,
237        }),
238    };
239
240    let _ = client.send_message(payload).await?;
241    tracing::info!(peer = %endpoint, "Auto-intro message sent");
242    Ok(())
243}
244
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    /// Equivalent seeds collapse to one entry and the agent's own URL is dropped.
    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        let seeds: Vec<String> = [
            "localhost:5000",
            "http://localhost:5000/",
            "http://localhost:5001",
            "http://localhost:5002",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let mut found = collect_peers(&seeds, "http://localhost:5001");
        found.sort();

        assert_eq!(found, ["http://localhost:5000", "http://localhost:5002"]);
    }

    /// A bare seed gains an `/a2a` variant; a seed already ending in `/a2a` does not.
    #[test]
    fn peer_candidates_adds_a2a_variant() {
        let bare = peer_candidates("http://localhost:4096");
        assert_eq!(bare, ["http://localhost:4096", "http://localhost:4096/a2a"]);

        let suffixed = peer_candidates("http://localhost:4096/a2a");
        assert_eq!(suffixed, ["http://localhost:4096/a2a"]);
    }
}