1pub mod registry;
22pub mod relay;
23
24use crate::a2a::types::{Artifact, Part, TaskState};
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use std::sync::Arc;
28use tokio::sync::broadcast;
29use uuid::Uuid;
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct BusEnvelope {
36 pub id: String,
38 pub topic: String,
40 pub sender_id: String,
42 pub correlation_id: Option<String>,
44 pub timestamp: DateTime<Utc>,
46 pub message: BusMessage,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(tag = "kind", rename_all = "snake_case")]
53pub enum BusMessage {
54 AgentReady {
56 agent_id: String,
57 capabilities: Vec<String>,
58 },
59 AgentShutdown { agent_id: String },
61 AgentMessage {
63 from: String,
64 to: String,
65 parts: Vec<Part>,
66 },
67 TaskUpdate {
69 task_id: String,
70 state: TaskState,
71 message: Option<String>,
72 },
73 ArtifactUpdate { task_id: String, artifact: Artifact },
75 SharedResult {
77 key: String,
78 value: serde_json::Value,
79 tags: Vec<String>,
80 },
81 ToolRequest {
83 request_id: String,
84 agent_id: String,
85 tool_name: String,
86 arguments: serde_json::Value,
87 },
88 ToolResponse {
90 request_id: String,
91 agent_id: String,
92 tool_name: String,
93 result: String,
94 success: bool,
95 },
96 Heartbeat { agent_id: String, status: String },
98}
99
/// Broadcast-channel capacity (in envelopes) used by `AgentBus::new`: 4096.
const DEFAULT_BUS_CAPACITY: usize = 4 * 1024;
104
105pub struct AgentBus {
113 tx: broadcast::Sender<BusEnvelope>,
114 pub registry: Arc<registry::AgentRegistry>,
116}
117
118impl AgentBus {
119 pub fn new() -> Self {
121 Self::with_capacity(DEFAULT_BUS_CAPACITY)
122 }
123
124 pub fn with_capacity(capacity: usize) -> Self {
126 let (tx, _) = broadcast::channel(capacity);
127 Self {
128 tx,
129 registry: Arc::new(registry::AgentRegistry::new()),
130 }
131 }
132
133 pub fn into_arc(self) -> Arc<Self> {
135 Arc::new(self)
136 }
137
138 pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
140 BusHandle {
141 agent_id: agent_id.into(),
142 bus: Arc::clone(self),
143 rx: self.tx.subscribe(),
144 }
145 }
146
147 pub fn publish(&self, envelope: BusEnvelope) -> usize {
149 match &envelope.message {
150 BusMessage::AgentReady {
151 agent_id,
152 capabilities,
153 } => {
154 self.registry.register_ready(agent_id, capabilities);
155 }
156 BusMessage::AgentShutdown { agent_id } => {
157 self.registry.deregister(agent_id);
158 }
159 _ => {}
160 }
161
162 self.tx.send(envelope).unwrap_or(0)
165 }
166
167 pub fn receiver_count(&self) -> usize {
169 self.tx.receiver_count()
170 }
171}
172
173impl Default for AgentBus {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
179pub struct BusHandle {
186 agent_id: String,
187 bus: Arc<AgentBus>,
188 rx: broadcast::Receiver<BusEnvelope>,
189}
190
191impl BusHandle {
192 pub fn agent_id(&self) -> &str {
194 &self.agent_id
195 }
196
197 pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
199 self.send_with_correlation(topic, message, None)
200 }
201
202 pub fn send_with_correlation(
204 &self,
205 topic: impl Into<String>,
206 message: BusMessage,
207 correlation_id: Option<String>,
208 ) -> usize {
209 let envelope = BusEnvelope {
210 id: Uuid::new_v4().to_string(),
211 topic: topic.into(),
212 sender_id: self.agent_id.clone(),
213 correlation_id,
214 timestamp: Utc::now(),
215 message,
216 };
217 self.bus.publish(envelope)
218 }
219
220 pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
222 self.send(
223 "broadcast",
224 BusMessage::AgentReady {
225 agent_id: self.agent_id.clone(),
226 capabilities,
227 },
228 )
229 }
230
231 pub fn announce_shutdown(&self) -> usize {
233 self.send(
234 "broadcast",
235 BusMessage::AgentShutdown {
236 agent_id: self.agent_id.clone(),
237 },
238 )
239 }
240
241 pub fn send_task_update(
243 &self,
244 task_id: &str,
245 state: TaskState,
246 message: Option<String>,
247 ) -> usize {
248 self.send(
249 format!("task.{task_id}"),
250 BusMessage::TaskUpdate {
251 task_id: task_id.to_string(),
252 state,
253 message,
254 },
255 )
256 }
257
258 pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
260 self.send(
261 format!("task.{task_id}"),
262 BusMessage::ArtifactUpdate {
263 task_id: task_id.to_string(),
264 artifact,
265 },
266 )
267 }
268
269 pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
271 self.send(
272 format!("agent.{to}"),
273 BusMessage::AgentMessage {
274 from: self.agent_id.clone(),
275 to: to.to_string(),
276 parts,
277 },
278 )
279 }
280
281 pub fn publish_shared_result(
283 &self,
284 key: impl Into<String>,
285 value: serde_json::Value,
286 tags: Vec<String>,
287 ) -> usize {
288 let key = key.into();
289 self.send(
290 format!("results.{}", &key),
291 BusMessage::SharedResult { key, value, tags },
292 )
293 }
294
295 pub async fn recv(&mut self) -> Option<BusEnvelope> {
297 loop {
298 match self.rx.recv().await {
299 Ok(env) => return Some(env),
300 Err(broadcast::error::RecvError::Lagged(n)) => {
301 tracing::warn!(
302 agent_id = %self.agent_id,
303 skipped = n,
304 "Bus handle lagged, skipping messages"
305 );
306 continue;
307 }
308 Err(broadcast::error::RecvError::Closed) => return None,
309 }
310 }
311 }
312
313 pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
315 loop {
316 match self.recv().await {
317 Some(env) if env.topic.starts_with(prefix) => return Some(env),
318 Some(_) => continue, None => return None,
320 }
321 }
322 }
323
324 pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
326 let prefix = format!("agent.{}", self.agent_id);
327 self.recv_topic(&prefix).await
328 }
329
330 pub fn try_recv(&mut self) -> Option<BusEnvelope> {
333 loop {
334 match self.rx.try_recv() {
335 Ok(env) => return Some(env),
336 Err(broadcast::error::TryRecvError::Lagged(n)) => {
337 tracing::warn!(
338 agent_id = %self.agent_id,
339 skipped = n,
340 "Bus handle lagged (try_recv), skipping"
341 );
342 continue;
343 }
344 Err(broadcast::error::TryRecvError::Empty)
345 | Err(broadcast::error::TryRecvError::Closed) => return None,
346 }
347 }
348 }
349
350 pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
352 &self.bus.registry
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;

    /// A direct message is delivered to every subscribed handle, the sender's included.
    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = AgentBus::new().into_arc();
        let mut sender = bus.handle("agent-a");
        let mut recipient = bus.handle("agent-b");

        let greeting = vec![Part::Text {
            text: "hello".into(),
        }];
        sender.send_to_agent("agent-b", greeting);

        let delivered = recipient.recv().await.unwrap();
        assert_eq!(delivered.topic, "agent.agent-b");
        match &delivered.message {
            BusMessage::AgentMessage { from, to, .. } => {
                assert_eq!(from, "agent-a");
                assert_eq!(to, "agent-b");
            }
            other => panic!("unexpected message: {other:?}"),
        }

        // The bus is a broadcast: the sender's own subscription sees the envelope too.
        let echoed = sender.try_recv().unwrap();
        assert_eq!(echoed.topic, "agent.agent-b");
    }

    /// Task updates are published on `task.<id>` and carry the id and state.
    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = AgentBus::new().into_arc();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let seen = observer.recv().await.unwrap();
        assert_eq!(seen.topic, "task.task-42");
        match &seen.message {
            BusMessage::TaskUpdate { task_id, state, .. } => {
                assert_eq!(task_id, "task-42");
                assert_eq!(*state, TaskState::Working);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Publishing with no subscribers reports zero receivers instead of failing.
    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = AgentBus::new().into_arc();
        let heartbeat = BusMessage::Heartbeat {
            agent_id: "nobody".into(),
            status: "ok".into(),
        };
        let delivered_to = bus.publish(BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: heartbeat,
        });
        assert_eq!(delivered_to, 0);
    }

    /// `recv_topic` skips envelopes whose topic does not match the prefix.
    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = AgentBus::new().into_arc();
        let publisher = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        let first = BusMessage::TaskUpdate {
            task_id: "1".into(),
            state: TaskState::Working,
            message: None,
        };
        let second = BusMessage::TaskUpdate {
            task_id: "2".into(),
            state: TaskState::Completed,
            message: None,
        };
        publisher.send("task.1", first);
        publisher.send("task.2", second);

        let matched = listener.recv_topic("task.2").await.unwrap();
        match &matched.message {
            BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Ready / shutdown announcements add and remove the agent in the registry.
    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = AgentBus::new().into_arc();
        let planner = bus.handle("planner-1");

        planner.announce_ready(vec!["plan".to_string(), "review".to_string()]);
        assert!(bus.registry.get("planner-1").is_some());

        planner.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}