Skip to main content

codetether_agent/a2a/
spawn.rs

1//! Spawn an autonomous A2A agent runtime with auto peer discovery.
2
3use crate::a2a::client::A2AClient;
4use crate::a2a::server::A2AServer;
5use crate::a2a::types::{Message, MessageRole, MessageSendConfiguration, MessageSendParams, Part};
6use crate::bus::AgentBus;
7use crate::cli::SpawnArgs;
8use anyhow::{Context, Result};
9use axum::Router;
10use reqwest::Url;
11use std::collections::HashSet;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15
16/// Run the spawn command.
17pub async fn run(args: SpawnArgs) -> Result<()> {
18    let bind_addr = format!("{}:{}", args.hostname, args.port);
19    let public_url = args
20        .public_url
21        .clone()
22        .unwrap_or_else(|| format!("http://{bind_addr}"));
23    let public_url = normalize_base_url(&public_url)?;
24
25    let agent_name = args
26        .name
27        .clone()
28        .unwrap_or_else(|| format!("spawned-agent-{}", std::process::id()));
29
30    let mut card = A2AServer::default_card(&public_url);
31    card.name = agent_name.clone();
32    if let Some(description) = args.description.clone() {
33        card.description = description;
34    }
35
36    let bus = AgentBus::new().into_arc();
37
38    // Auto-start S3 sink if MinIO is configured
39    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
40    bus.registry.register(card.clone());
41    let handle = bus.handle(&agent_name);
42    let capabilities = card.skills.iter().map(|skill| skill.id.clone()).collect();
43    handle.announce_ready(capabilities);
44
45    let peers = collect_peers(&args.peer, &public_url);
46    if peers.is_empty() {
47        tracing::info!(
48            agent = %agent_name,
49            "Spawned without peer seeds; pass --peer or CODETETHER_A2A_PEERS for discovery"
50        );
51    } else {
52        tracing::info!(
53            agent = %agent_name,
54            peer_count = peers.len(),
55            "Spawned with peer discovery seeds"
56        );
57    }
58
59    let discovery_task = tokio::spawn(discovery_loop(
60        Arc::clone(&bus),
61        peers,
62        normalize_base_url(&public_url)?,
63        agent_name.clone(),
64        args.discovery_interval_secs.max(5),
65        args.auto_introduce,
66    ));
67
68    let router: Router = A2AServer::new(card).router();
69    let listener = tokio::net::TcpListener::bind(&bind_addr)
70        .await
71        .with_context(|| format!("Failed to bind spawn agent on {bind_addr}"))?;
72
73    tracing::info!(
74        agent = %agent_name,
75        bind_addr = %bind_addr,
76        public_url = %public_url,
77        "Spawned A2A agent runtime"
78    );
79
80    axum::serve(listener, router)
81        .with_graceful_shutdown(shutdown_signal())
82        .await
83        .context("Spawned A2A server failed")?;
84
85    discovery_task.abort();
86    tracing::info!(agent = %agent_name, "Spawned A2A agent shut down");
87    Ok(())
88}
89
90async fn shutdown_signal() {
91    let _ = tokio::signal::ctrl_c().await;
92    tracing::info!("Shutdown signal received");
93}
94
95fn normalize_base_url(url: &str) -> Result<String> {
96    let trimmed = url.trim();
97    if trimmed.is_empty() {
98        anyhow::bail!("URL cannot be empty");
99    }
100
101    let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
102        trimmed.to_string()
103    } else {
104        format!("http://{trimmed}")
105    };
106
107    let parsed = Url::parse(&normalized).with_context(|| format!("Invalid URL: {normalized}"))?;
108    let mut cleaned = parsed.to_string();
109    if cleaned.ends_with('/') {
110        cleaned.pop();
111    }
112    Ok(cleaned)
113}
114
115fn collect_peers(raw_peers: &[String], self_url: &str) -> Vec<String> {
116    let mut dedup = HashSet::new();
117    let self_url = self_url.trim_end_matches('/');
118
119    for raw in raw_peers {
120        if raw.trim().is_empty() {
121            continue;
122        }
123
124        if let Ok(normalized) = normalize_base_url(raw)
125            && normalized.trim_end_matches('/') != self_url
126        {
127            dedup.insert(normalized);
128        }
129    }
130
131    dedup.into_iter().collect()
132}
133
134async fn discovery_loop(
135    bus: Arc<AgentBus>,
136    peers: Vec<String>,
137    self_url: String,
138    agent_name: String,
139    interval_secs: u64,
140    auto_introduce: bool,
141) {
142    let discovered = Arc::new(Mutex::new(HashSet::<String>::new()));
143    let mut ticker = tokio::time::interval(Duration::from_secs(interval_secs));
144
145    loop {
146        ticker.tick().await;
147
148        for peer_seed in &peers {
149            let candidates = peer_candidates(peer_seed);
150            let mut discovered_card = None;
151
152            for candidate in candidates {
153                match try_fetch_agent_card(&candidate).await {
154                    Ok(card) => {
155                        discovered_card = Some((candidate, card));
156                        break;
157                    }
158                    Err(error) => {
159                        tracing::debug!(
160                            agent = %agent_name,
161                            peer = %candidate,
162                            error = %error,
163                            "Peer probe failed"
164                        );
165                    }
166                }
167            }
168
169            let Some((endpoint, card)) = discovered_card else {
170                continue;
171            };
172
173            let peer_id = format!("{}::{}", endpoint, card.name);
174            let is_new = {
175                let mut known = discovered.lock().await;
176                known.insert(peer_id)
177            };
178
179            bus.registry.register(card.clone());
180
181            if is_new {
182                tracing::info!(
183                    agent = %agent_name,
184                    peer_name = %card.name,
185                    peer_url = %card.url,
186                    endpoint = %endpoint,
187                    "Discovered A2A peer"
188                );
189
190                if auto_introduce
191                    && let Err(error) = send_intro(&endpoint, &agent_name, &self_url).await
192                {
193                    tracing::warn!(
194                        agent = %agent_name,
195                        peer = %endpoint,
196                        error = %error,
197                        "Auto-intro message failed"
198                    );
199                }
200            }
201        }
202    }
203}
204
/// List the endpoints to probe for a peer seed.
///
/// The seed itself always comes first. Unless it already ends in `/a2a`,
/// the seed with `/a2a` appended is offered as a second candidate.
fn peer_candidates(seed: &str) -> Vec<String> {
    let mut endpoints = vec![seed.to_owned()];
    if !seed.ends_with("/a2a") {
        endpoints.push(format!("{seed}/a2a"));
    }
    endpoints
}
212
213async fn try_fetch_agent_card(endpoint: &str) -> Result<crate::a2a::types::AgentCard> {
214    let mut client = A2AClient::new(endpoint);
215    if let Ok(token) = std::env::var("CODETETHER_AUTH_TOKEN") {
216        client = client.with_token(token);
217    }
218    let card = client.get_agent_card().await?;
219    Ok(card)
220}
221
222async fn send_intro(endpoint: &str, agent_name: &str, self_url: &str) -> Result<()> {
223    let mut client = A2AClient::new(endpoint);
224    if let Ok(token) = std::env::var("CODETETHER_AUTH_TOKEN") {
225        client = client.with_token(token);
226    }
227    let payload = MessageSendParams {
228        message: Message {
229            message_id: uuid::Uuid::new_v4().to_string(),
230            role: MessageRole::User,
231            parts: vec![Part::Text {
232                text: format!(
233                    "Hello from {agent_name} ({self_url}). I am online and available for A2A collaboration."
234                ),
235            }],
236            context_id: None,
237            task_id: None,
238            metadata: std::collections::HashMap::new(),
239            extensions: vec![],
240        },
241        configuration: Some(MessageSendConfiguration {
242            accepted_output_modes: vec!["text/plain".to_string()],
243            blocking: Some(false),
244            history_length: Some(0),
245            push_notification_config: None,
246        }),
247    };
248
249    let _ = client.send_message(payload).await?;
250    tracing::info!(peer = %endpoint, "Auto-intro message sent");
251    Ok(())
252}
253
#[cfg(test)]
mod tests {
    use super::{collect_peers, peer_candidates};

    /// Seeds that normalize to the same URL collapse into one entry, and the
    /// agent's own URL is left out.
    #[test]
    fn collect_peers_deduplicates_and_skips_self() {
        let seeds = Vec::from(
            [
                "localhost:5000",
                "http://localhost:5000/",
                "http://localhost:5001",
                "http://localhost:5002",
            ]
            .map(String::from),
        );

        // Sort before comparing so the check does not depend on output order.
        let mut found = collect_peers(&seeds, "http://localhost:5001");
        found.sort();

        assert_eq!(found, ["http://localhost:5000", "http://localhost:5002"]);
    }

    /// A bare seed yields itself plus an `/a2a` variant; a seed already
    /// ending in `/a2a` yields only itself.
    #[test]
    fn peer_candidates_adds_a2a_variant() {
        let bare = peer_candidates("http://localhost:4096");
        assert_eq!(
            bare,
            ["http://localhost:4096", "http://localhost:4096/a2a"]
        );

        let suffixed = peer_candidates("http://localhost:4096/a2a");
        assert_eq!(suffixed, ["http://localhost:4096/a2a"]);
    }
}