Skip to main content

codetether_agent/bus/
mod.rs

1//! Central in-process agent message bus
2//!
3//! Provides a broadcast-based pub/sub bus that all local agents (sub-agents,
4//! workers, the A2A server) can plug into.  Remote agents talk to the bus
5//! through the gRPC / JSON-RPC bridge; local agents get zero-copy clones
6//! via `BusHandle`.
7//!
8//! # Topic routing
9//!
10//! Every envelope carries a `topic` string that follows a hierarchical scheme:
11//!
12//! | Pattern | Semantics |
13//! |---------|-----------|
14//! | `agent.{id}` | Messages *to* a specific agent |
15//! | `agent.{id}.events` | Events *from* a specific agent |
16//! | `task.{id}` | All updates for a task |
17//! | `swarm.{id}` | Swarm-level coordination |
18//! | `broadcast` | Global announcements |
19//! | `tools.{name}` | Tool-specific channels |
20
21pub mod registry;
22pub mod relay;
23
24use crate::a2a::types::{Artifact, Part, TaskState};
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27use std::sync::Arc;
28use tokio::sync::broadcast;
29use uuid::Uuid;
30
31// ─── Envelope & Messages ─────────────────────────────────────────────────
32
33/// Metadata wrapper for every message that travels through the bus.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct BusEnvelope {
36    /// Unique id for this envelope
37    pub id: String,
38    /// Hierarchical topic for routing (e.g. `agent.{id}`, `task.{id}`)
39    pub topic: String,
40    /// The agent that originated this message
41    pub sender_id: String,
42    /// Optional correlation id (links request → response)
43    pub correlation_id: Option<String>,
44    /// When the envelope was created
45    pub timestamp: DateTime<Utc>,
46    /// The payload
47    pub message: BusMessage,
48}
49
50/// The set of messages the bus can carry.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(tag = "kind", rename_all = "snake_case")]
53pub enum BusMessage {
54    /// An agent has come online and is ready to receive work
55    AgentReady {
56        agent_id: String,
57        capabilities: Vec<String>,
58    },
59    /// An agent is shutting down
60    AgentShutdown { agent_id: String },
61    /// Free-form message from one agent to another (mirrors A2A `Message`)
62    AgentMessage {
63        from: String,
64        to: String,
65        parts: Vec<Part>,
66    },
67    /// Task status changed
68    TaskUpdate {
69        task_id: String,
70        state: TaskState,
71        message: Option<String>,
72    },
73    /// A new artifact was produced for a task
74    ArtifactUpdate { task_id: String, artifact: Artifact },
75    /// A sub-agent published a shared result (replaces raw `ResultStore` publish)
76    SharedResult {
77        key: String,
78        value: serde_json::Value,
79        tags: Vec<String>,
80    },
81    /// Tool execution request (for shared tool dispatch)
82    ToolRequest {
83        request_id: String,
84        agent_id: String,
85        tool_name: String,
86        arguments: serde_json::Value,
87    },
88    /// Tool execution response
89    ToolResponse {
90        request_id: String,
91        agent_id: String,
92        tool_name: String,
93        result: String,
94        success: bool,
95    },
96    /// Heartbeat (keep-alive / health signal)
97    Heartbeat { agent_id: String, status: String },
98}
99
100// ─── Constants ───────────────────────────────────────────────────────────
101
/// Capacity of the broadcast channel created by `AgentBus::new`.
///
/// This is the number of envelopes the channel retains per bus instance; a
/// subscriber that falls further behind than this skips the overwritten
/// messages (reported as "lagged" by the receive helpers).
const DEFAULT_BUS_CAPACITY: usize = 4_096;
104
105// ─── AgentBus ────────────────────────────────────────────────────────────
106
107/// The central in-process message bus.
108///
109/// Internally this is a `tokio::sync::broadcast` channel so every subscriber
110/// receives every message.  Filtering by topic is done on the consumer side
111/// through `BusHandle::subscribe_topic` / `BusHandle::recv_filtered`.
112pub struct AgentBus {
113    tx: broadcast::Sender<BusEnvelope>,
114    /// Registry of connected agents
115    pub registry: Arc<registry::AgentRegistry>,
116}
117
118impl AgentBus {
119    /// Create a new bus with default capacity.
120    pub fn new() -> Self {
121        Self::with_capacity(DEFAULT_BUS_CAPACITY)
122    }
123
124    /// Create a new bus with a specific channel capacity.
125    pub fn with_capacity(capacity: usize) -> Self {
126        let (tx, _) = broadcast::channel(capacity);
127        Self {
128            tx,
129            registry: Arc::new(registry::AgentRegistry::new()),
130        }
131    }
132
133    /// Wrap `self` in an `Arc` for sharing across tasks.
134    pub fn into_arc(self) -> Arc<Self> {
135        Arc::new(self)
136    }
137
138    /// Create a `BusHandle` scoped to a specific agent.
139    pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
140        BusHandle {
141            agent_id: agent_id.into(),
142            bus: Arc::clone(self),
143            rx: self.tx.subscribe(),
144        }
145    }
146
147    /// Publish an envelope directly (low-level).
148    pub fn publish(&self, envelope: BusEnvelope) -> usize {
149        match &envelope.message {
150            BusMessage::AgentReady {
151                agent_id,
152                capabilities,
153            } => {
154                self.registry.register_ready(agent_id, capabilities);
155            }
156            BusMessage::AgentShutdown { agent_id } => {
157                self.registry.deregister(agent_id);
158            }
159            _ => {}
160        }
161
162        // Returns the number of receivers that got the message.
163        // If there are no receivers the message is silently dropped.
164        self.tx.send(envelope).unwrap_or(0)
165    }
166
167    /// Number of active receivers.
168    pub fn receiver_count(&self) -> usize {
169        self.tx.receiver_count()
170    }
171}
172
173impl Default for AgentBus {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179// ─── BusHandle ───────────────────────────────────────────────────────────
180
181/// A scoped handle that a single agent uses to send and receive messages.
182///
183/// The handle knows the agent's id and uses it as `sender_id` on outgoing
184/// envelopes.  It also provides filtered receive helpers.
185pub struct BusHandle {
186    agent_id: String,
187    bus: Arc<AgentBus>,
188    rx: broadcast::Receiver<BusEnvelope>,
189}
190
191impl BusHandle {
192    /// The agent id this handle belongs to.
193    pub fn agent_id(&self) -> &str {
194        &self.agent_id
195    }
196
197    /// Send a message on the given topic.
198    pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
199        self.send_with_correlation(topic, message, None)
200    }
201
202    /// Send a message with a correlation id.
203    pub fn send_with_correlation(
204        &self,
205        topic: impl Into<String>,
206        message: BusMessage,
207        correlation_id: Option<String>,
208    ) -> usize {
209        let envelope = BusEnvelope {
210            id: Uuid::new_v4().to_string(),
211            topic: topic.into(),
212            sender_id: self.agent_id.clone(),
213            correlation_id,
214            timestamp: Utc::now(),
215            message,
216        };
217        self.bus.publish(envelope)
218    }
219
220    /// Announce this agent as ready.
221    pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
222        self.send(
223            "broadcast",
224            BusMessage::AgentReady {
225                agent_id: self.agent_id.clone(),
226                capabilities,
227            },
228        )
229    }
230
231    /// Announce this agent is shutting down.
232    pub fn announce_shutdown(&self) -> usize {
233        self.send(
234            "broadcast",
235            BusMessage::AgentShutdown {
236                agent_id: self.agent_id.clone(),
237            },
238        )
239    }
240
241    /// Announce a task status update.
242    pub fn send_task_update(
243        &self,
244        task_id: &str,
245        state: TaskState,
246        message: Option<String>,
247    ) -> usize {
248        self.send(
249            format!("task.{task_id}"),
250            BusMessage::TaskUpdate {
251                task_id: task_id.to_string(),
252                state,
253                message,
254            },
255        )
256    }
257
258    /// Announce an artifact update.
259    pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
260        self.send(
261            format!("task.{task_id}"),
262            BusMessage::ArtifactUpdate {
263                task_id: task_id.to_string(),
264                artifact,
265            },
266        )
267    }
268
269    /// Send a direct message to another agent.
270    pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
271        self.send(
272            format!("agent.{to}"),
273            BusMessage::AgentMessage {
274                from: self.agent_id.clone(),
275                to: to.to_string(),
276                parts,
277            },
278        )
279    }
280
281    /// Publish a shared result (visible to the entire swarm).
282    pub fn publish_shared_result(
283        &self,
284        key: impl Into<String>,
285        value: serde_json::Value,
286        tags: Vec<String>,
287    ) -> usize {
288        let key = key.into();
289        self.send(
290            format!("results.{}", &key),
291            BusMessage::SharedResult { key, value, tags },
292        )
293    }
294
295    /// Receive the next envelope (blocks until available).
296    pub async fn recv(&mut self) -> Option<BusEnvelope> {
297        loop {
298            match self.rx.recv().await {
299                Ok(env) => return Some(env),
300                Err(broadcast::error::RecvError::Lagged(n)) => {
301                    tracing::warn!(
302                        agent_id = %self.agent_id,
303                        skipped = n,
304                        "Bus handle lagged, skipping messages"
305                    );
306                    continue;
307                }
308                Err(broadcast::error::RecvError::Closed) => return None,
309            }
310        }
311    }
312
313    /// Receive the next envelope whose topic starts with the given prefix.
314    pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
315        loop {
316            match self.recv().await {
317                Some(env) if env.topic.starts_with(prefix) => return Some(env),
318                Some(_) => continue, // wrong topic, skip
319                None => return None,
320            }
321        }
322    }
323
324    /// Receive the next envelope addressed to this agent.
325    pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
326        let prefix = format!("agent.{}", self.agent_id);
327        self.recv_topic(&prefix).await
328    }
329
330    /// Try to receive without blocking. Returns `None` if no message is
331    /// queued.
332    pub fn try_recv(&mut self) -> Option<BusEnvelope> {
333        loop {
334            match self.rx.try_recv() {
335                Ok(env) => return Some(env),
336                Err(broadcast::error::TryRecvError::Lagged(n)) => {
337                    tracing::warn!(
338                        agent_id = %self.agent_id,
339                        skipped = n,
340                        "Bus handle lagged (try_recv), skipping"
341                    );
342                    continue;
343                }
344                Err(broadcast::error::TryRecvError::Empty)
345                | Err(broadcast::error::TryRecvError::Closed) => return None,
346            }
347        }
348    }
349
350    /// Access the agent registry.
351    pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
352        &self.bus.registry
353    }
354}
355
356// ─── Tests ───────────────────────────────────────────────────────────────
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh shared bus with the default capacity.
    fn new_bus() -> Arc<AgentBus> {
        AgentBus::new().into_arc()
    }

    /// A direct message reaches the addressee and, because the channel is a
    /// broadcast, the sender's own handle as well.
    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = new_bus();
        let mut sender = bus.handle("agent-a");
        let mut receiver = bus.handle("agent-b");

        let parts = vec![Part::Text {
            text: "hello".into(),
        }];
        sender.send_to_agent("agent-b", parts);

        // The addressee sees the envelope on its inbox topic.
        let received = receiver.recv().await.unwrap();
        assert_eq!(received.topic, "agent.agent-b");
        match &received.message {
            BusMessage::AgentMessage { from, to, .. } => {
                assert_eq!(from, "agent-a");
                assert_eq!(to, "agent-b");
            }
            other => panic!("unexpected message: {other:?}"),
        }

        // The sender's own subscription got a copy too (broadcast semantics).
        let echoed = sender.try_recv().unwrap();
        assert_eq!(echoed.topic, "agent.agent-b");
    }

    /// Task updates are published on `task.{id}` and carry the id and state.
    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = new_bus();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let received = observer.recv().await.unwrap();
        assert_eq!(received.topic, "task.task-42");
        match &received.message {
            BusMessage::TaskUpdate { task_id, state, .. } => {
                assert_eq!(task_id, "task-42");
                assert_eq!(*state, TaskState::Working);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Publishing with no handles at all succeeds and reports zero deliveries.
    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = new_bus();
        let heartbeat = BusMessage::Heartbeat {
            agent_id: "nobody".into(),
            status: "ok".into(),
        };
        let envelope = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: heartbeat,
        };

        let delivered = bus.publish(envelope);
        assert_eq!(delivered, 0);
    }

    /// `recv_topic` discards envelopes on other topics until one matches.
    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = new_bus();
        let publisher = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        // Two updates on two different topics, in this order.
        publisher.send(
            "task.1",
            BusMessage::TaskUpdate {
                task_id: "1".into(),
                state: TaskState::Working,
                message: None,
            },
        );
        publisher.send(
            "task.2",
            BusMessage::TaskUpdate {
                task_id: "2".into(),
                state: TaskState::Completed,
                message: None,
            },
        );

        // Filtering on "task.2" must skip the first and yield the second.
        let received = listener.recv_topic("task.2").await.unwrap();
        match &received.message {
            BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Ready / shutdown announcements register and deregister the agent.
    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = new_bus();
        let planner = bus.handle("planner-1");

        planner.announce_ready(vec!["plan".to_string(), "review".to_string()]);
        assert!(bus.registry.get("planner-1").is_some());

        planner.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}
475}