Skip to main content

codetether_agent/a2a/
spawn.rs

1//! Spawn an autonomous A2A agent runtime with auto peer discovery.
2
3use crate::a2a::client::A2AClient;
4use crate::a2a::server::A2AServer;
5use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
6use crate::bus::AgentBus;
7use crate::cli::SpawnArgs;
8use anyhow::{Context, Result};
9use axum::Router;
10use reqwest::Url;
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15
16/// Run the spawn command.
17pub async fn run(args: SpawnArgs) -> Result<()> {
18    let bind_addr = format!("{}:{}", args.hostname, args.port);
19    let public_url = args
20        .public_url
21        .clone()
22        .unwrap_or_else(|| format!("http://{bind_addr}"));
23    let public_url = normalize_base_url(&public_url)?;
24
25    let agent_name = args
26        .name
27        .clone()
28        .unwrap_or_else(|| format!("spawned-agent-{}", std::process::id()));
29
30    let mut card = A2AServer::default_card(&public_url);
31    card.name = agent_name.clone();
32    if let Some(description) = args.description.clone() {
33        card.description = description;
34    }
35
36    let bus = AgentBus::new().into_arc();
37
38    // Auto-start S3 sink if MinIO is configured
39    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
40    bus.registry.register(card.clone());
41    let handle = bus.handle(&agent_name);
42    let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
43    handle.announce_ready(capabilities);
44
45    let peers = collect_peers(&args.peer, &public_url);
46    if peers.is_empty() {
47        tracing::info!(
48            agent = %agent_name,
49            "Spawned without peer seeds; pass --peer or CODETETHER_A2A_PEERS for discovery"
50        );
51    } else {
52        tracing::info!(
53            agent = %agent_name,
54            peer_count = peers.len(),
55            "Spawned with peer discovery seeds"
56        );
57    }
58
59    let discovery_task = tokio::spawn(discovery_loop(
60        Arc::clone(&bus),
61        peers,
62        normalize_base_url(&public_url)?,
63        agent_name.clone(),
64        args.discovery_interval_secs.max(5),
65        args.auto_introduce,
66    ));
67
68    let router: Router = A2AServer::new(card).router();
69    let listener = tokio::net::TcpListener::bind(&bind_addr)
70        .await
71        .with_context(|| format!("Failed to bind spawn agent on {bind_addr}"))?;
72
73    tracing::info!(
74        agent = %agent_name,
75        bind_addr = %bind_addr,
76        public_url = %public_url,
77        "Spawned A2A agent runtime"
78    );
79
80    axum::serve(listener, router)
81        .with_graceful_shutdown(shutdown_signal())
82        .await
83        .context("Spawned A2A server failed")?;
84
85    discovery_task.abort();
86    tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
87    Ok(())
88}
89
90async fn shutdown_signal() {
91    let _ = tokio::signal::ctrl_c().await;
92    tracing::info!("Shutdown signal received");
93}
94
95fn normalize_base_url(url: &str) -> Result<String> {
96    let trimmed = url.trim();
97    if trimmed.is_empty() {
98        anyhow::bail!("URL cannot be empty");
99    }
100
101    let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
102        trimmed.to_string()
103    } else {
104        format!("http://{trimmed}")
105    };
106
107    let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
108    let mut cleaned = parsed.to_string();
109    if cleaned.ends_with('/') {
110        cleaned.pop();
111    }
112    Ok(cleaned)
113}
114
115fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
116    let mut dedup = HashSet::new();
117    let self_url = self_url.trim_end_matches('/');
118
119    for raw in raw_peers {
120        if raw.trim().is_empty() {
121            continue;
122        }
123
124        if let Ok(normalized) = normalize_base_url(raw) {
125            if normalized.trim_end_matches('/') != self_url {
126                dedup.insert(normalized);
127            }
128        }
129    }
130
131    dedup.into_iter().collect()
132}
133
134async fn discovery_loop(
135    bus: Arc<AgentBus>,
136    peers: Vec<String>,
137    self_url: String,
138    agent_name: String,
139    interval_secs: u64,
140    auto_introduce: bool,
141) {
142    let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
143    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
144
145    loop {
146        ticker.tick().await;
147
148        for peer_seed in &peers {
149            let candidates = peer_candidates(peer_seed);
150            let mut discovered_card = None;
151
152            for candidate in candidates {
153                match try_fetch_agent_card(&candidate).await {
154                    Ok(card) => {
155                        discovered_card = Some((candidate, card));
156                        break;
157                    }
158                    Err(error) => {
159                        tracing::debug!(
160                            agent = %agent_name,
161                            peer = %candidate,
162                            error = %error,
163                            "Peer probe failed"
164                        );
165                    }
166                }
167            }
168
169            let Some((endpoint, card)) = discovered_card else {
170                continue;
171            };
172
173            let peer_id = format!("{}::{}", endpoint, card.name);
174            let is_new = {
175                let mut known = discovered.lock().await;
176                known.insert(peer_id)
177            };
178
179            bus.registry.register(card.clone());
180
181            if is_new {
182                tracing::info!(
183                    agent = %agent_name,
184                    peer_name = %card.name,
185                    peer_url = %card.url,
186                    endpoint = %endpoint,
187                    "Discovered A2A peer"
188                );
189
190                if auto_introduce {
191                    if let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await {
192                        tracing::warn!(
193                            agent = %agent_name,
194                            peer = %endpoint,
195                            error = %error,
196                            "Auto-intro message failed"
197                        );
198                    }
199                }
200            }
201        }
202    }
203}
204
/// List the endpoints to probe for a peer seed, in probing order.
///
/// The seed itself always comes first. Unless the seed already ends with `/a2a`, the
/// seed with `/a2a` appended is added as a second candidate.
fn peer_candidates(seed: &str) -> Vec<String> {
    let mut endpoints = vec![seed.to_owned()];
    if !seed.ends_with("/a2a") {
        endpoints.push(format!("{seed}/a2a"));
    }
    endpoints
}
212
213async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
214    let client = A2AClient::new(endpoint);
215    let card = client.get_agent_card().await?;
216    Ok(card)
217}
218
219async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
220    let client = A2AClient::new(endpoint);
221    let payload = MessageSendParams {
222        message: Message {
223            message_id: uuid::Uuid::new_v4().to_string(),
224            role: MessageRole::User,
225            parts: vec![Part::Text {
226                text: format!(
227                    "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
228                ),
229            }],
230            context_id: None,
231            task_id: None,
232            metadata: std::collections::HashMap::new(),
233            extensions: vec![],
234        },
235        configuration: Some(MessageSendConfiguration {
236            accepted_output_modes: vec!["text/plain".to_string()],
237            blocking: Some(false),
238            history_length: Some(0),
239            push_notification_config: None,
240        }),
241    };
242
243    let _ = client.send_message(payload).await?;
244    tracing::info!(peer = %endpoint, "Auto-intro message sent");
245    Ok(())
246}
247
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    /// Equivalent spellings of one peer collapse to a single entry, and the agent's own
    /// URL is removed from the result.
    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        let seeds: Vec<String> = [
            "localhost:5000",
            "http://localhost:5000/",
            "http://localhost:5001",
            "http://localhost:5002",
        ]
        .iter()
        .map(|seed| seed.to_string())
        .collect();

        let mut collected = collect_peers(&seeds, "http://localhost:5001");
        // Sort so the assertion does not depend on the order peers are returned in.
        collected.sort();

        let expected = vec![
            "http://localhost:5000".to_string(),
            "http://localhost:5002".to_string(),
        ];
        assert_eq!(collected, expected);
    }

    /// A bare base URL yields itself plus an `/a2a` variant; a seed that already ends
    /// in `/a2a` is returned unchanged as the only candidate.
    #[test]
    fn peer_candidates_adds_a2a_variant() {
        let from_base = peer_candidates("http://localhost:4096");
        let expected_from_base = vec![
            "http://localhost:4096".to_string(),
            "http://localhost:4096/a2a".to_string(),
        ];
        assert_eq!(from_base, expected_from_base);

        let from_a2a = peer_candidates("http://localhost:4096/a2a");
        assert_eq!(from_a2a, vec!["http://localhost:4096/a2a".to_string()]);
    }
}