Skip to main content

codetether_agent/bus/
mod.rs

1//! Central in-process agent message bus
2//!
3//! Provides a broadcast-based pub/sub bus that all local agents (sub-agents,
4//! workers, the A2A server) can plug into.  Remote agents talk to the bus
5//! through the gRPC / JSON-RPC bridge; local agents get zero-copy clones
6//! via `BusHandle`.
7//!
8//! # Topic routing
9//!
10//! Every envelope carries a `topic` string that follows a hierarchical scheme:
11//!
12//! | Pattern | Semantics |
13//! |---------|-----------|
14//! | `agent.{id}` | Messages *to* a specific agent |
15//! | `agent.{id}.events` | Events *from* a specific agent |
16//! | `task.{id}` | All updates for a task |
17//! | `swarm.{id}` | Swarm-level coordination |
18//! | `broadcast` | Global announcements |
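//! | `tools.{name}` | Tool-specific channels |
//! | `results.{key}` | Shared results published by sub-agents |
//! | `ralph.{prd_id}` | Ralph loop learnings, handoffs and progress |
//! | `voice.{room}` | Voice session lifecycle and transcripts |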
20
21pub mod global;
22pub mod registry;
23pub mod relay;
24pub mod s3_sink;
25
26pub use global::{global, set_global};
27
28use crate::a2a::types::{Artifact, Part, TaskState};
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31use std::sync::Arc;
32use tokio::sync::broadcast;
33use uuid::Uuid;
34
35// ─── Envelope & Messages ─────────────────────────────────────────────────
36
37/// Metadata wrapper for every message that travels through the bus.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BusEnvelope {
40    /// Unique id for this envelope
41    pub id: String,
42    /// Hierarchical topic for routing (e.g. `agent.{id}`, `task.{id}`)
43    pub topic: String,
44    /// The agent that originated this message
45    pub sender_id: String,
46    /// Optional correlation id (links request → response)
47    pub correlation_id: Option<String>,
48    /// When the envelope was created
49    pub timestamp: DateTime<Utc>,
50    /// The payload
51    pub message: BusMessage,
52}
53
54/// The set of messages the bus can carry.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56#[serde(tag = "kind", rename_all = "snake_case")]
57pub enum BusMessage {
58    /// An agent has come online and is ready to receive work
59    AgentReady {
60        agent_id: String,
61        capabilities: Vec<String>,
62    },
63    /// An agent is shutting down
64    AgentShutdown { agent_id: String },
65    /// Free-form message from one agent to another (mirrors A2A `Message`)
66    AgentMessage {
67        from: String,
68        to: String,
69        parts: Vec<Part>,
70    },
71    /// Task status changed
72    TaskUpdate {
73        task_id: String,
74        state: TaskState,
75        message: Option<String>,
76    },
77    /// A new artifact was produced for a task
78    ArtifactUpdate { task_id: String, artifact: Artifact },
79    /// A sub-agent published a shared result (replaces raw `ResultStore` publish)
80    SharedResult {
81        key: String,
82        value: serde_json::Value,
83        tags: Vec<String>,
84    },
85    /// Tool execution request (for shared tool dispatch)
86    ToolRequest {
87        request_id: String,
88        agent_id: String,
89        tool_name: String,
90        arguments: serde_json::Value,
91        step: usize,
92    },
93    /// Tool execution response
94    ToolResponse {
95        request_id: String,
96        agent_id: String,
97        tool_name: String,
98        result: String,
99        success: bool,
100        step: usize,
101    },
102    /// Heartbeat (keep-alive / health signal)
103    Heartbeat { agent_id: String, status: String },
104
105    // ── Ralph loop messages ──────────────────────────────────────────
106    /// An agent shares learnings from a Ralph iteration (insights,
107    /// blockers, patterns discovered) so subsequent iterations or
108    /// co-ordinating agents can build on them.
109    RalphLearning {
110        prd_id: String,
111        story_id: String,
112        iteration: usize,
113        learnings: Vec<String>,
114        context: serde_json::Value,
115    },
116
117    /// Context handoff between sequential Ralph stories.  The finishing
118    /// story publishes what the *next* story should know.
119    RalphHandoff {
120        prd_id: String,
121        from_story: String,
122        to_story: String,
123        context: serde_json::Value,
124        progress_summary: String,
125    },
126
127    /// PRD-level progress update (aggregated across all stories).
128    RalphProgress {
129        prd_id: String,
130        passed: usize,
131        total: usize,
132        iteration: usize,
133        status: String,
134    },
135
136    // ── Full tool output ─────────────────────────────────────────────
137    /// Full, untruncated tool output from an agent loop step.
138    /// Published *before* the result is truncated for the LLM context
139    /// window, so other agents and the bus log see the complete output.
140    ToolOutputFull {
141        agent_id: String,
142        tool_name: String,
143        output: String,
144        success: bool,
145        step: usize,
146    },
147
148    /// Internal reasoning / chain-of-thought from an agent step.
149    /// Published so the training pipeline captures the model's
150    /// thinking process alongside its actions.
151    AgentThinking {
152        agent_id: String,
153        thinking: String,
154        step: usize,
155    },
156
157    // ── Voice session messages ───────────────────────────────────────
158    /// A voice session (LiveKit room) has been created.
159    VoiceSessionStarted {
160        room_name: String,
161        agent_id: String,
162        voice_id: String,
163    },
164
165    /// A transcript fragment from a voice session.
166    VoiceTranscript {
167        room_name: String,
168        text: String,
169        /// "user" or "agent"
170        role: String,
171        is_final: bool,
172    },
173
174    /// The voice agent's state changed (e.g. listening, thinking, speaking).
175    VoiceAgentStateChanged { room_name: String, state: String },
176
177    /// A voice session has ended.
178    VoiceSessionEnded { room_name: String, reason: String },
179}
180
181// ─── Constants ───────────────────────────────────────────────────────────
182
/// Number of envelopes the broadcast channel of a bus built by
/// `AgentBus::new` can buffer (4096, i.e. 2^12).
const DEFAULT_BUS_CAPACITY: usize = 1 << 12;
185
186// ─── AgentBus ────────────────────────────────────────────────────────────
187
188/// The central in-process message bus.
189///
190/// Internally this is a `tokio::sync::broadcast` channel so every subscriber
191/// receives every message.  Filtering by topic is done on the consumer side
192/// through `BusHandle::subscribe_topic` / `BusHandle::recv_filtered`.
193pub struct AgentBus {
194    tx: broadcast::Sender<BusEnvelope>,
195    /// Registry of connected agents
196    pub registry: Arc<registry::AgentRegistry>,
197}
198
199impl std::fmt::Debug for AgentBus {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        f.debug_struct("AgentBus")
202            .field("subscribers", &self.tx.receiver_count())
203            .finish()
204    }
205}
206
207impl AgentBus {
208    /// Create a new bus with default capacity.
209    pub fn new() -> Self {
210        Self::with_capacity(DEFAULT_BUS_CAPACITY)
211    }
212
213    /// Create a new bus with a specific channel capacity.
214    pub fn with_capacity(capacity: usize) -> Self {
215        let (tx, _) = broadcast::channel(capacity);
216        Self {
217            tx,
218            registry: Arc::new(registry::AgentRegistry::new()),
219        }
220    }
221
222    /// Wrap `self` in an `Arc` for sharing across tasks.
223    pub fn into_arc(self) -> Arc<Self> {
224        Arc::new(self)
225    }
226
227    /// Create a `BusHandle` scoped to a specific agent.
228    pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
229        BusHandle {
230            agent_id: agent_id.into(),
231            bus: Arc::clone(self),
232            rx: self.tx.subscribe(),
233        }
234    }
235
236    /// Publish an envelope directly (low-level).
237    pub fn publish(&self, envelope: BusEnvelope) -> usize {
238        match &envelope.message {
239            BusMessage::AgentReady {
240                agent_id,
241                capabilities,
242            } => {
243                self.registry.register_ready(agent_id, capabilities);
244            }
245            BusMessage::AgentShutdown { agent_id } => {
246                self.registry.deregister(agent_id);
247            }
248            _ => {}
249        }
250
251        // Returns the number of receivers that got the message.
252        // If there are no receivers the message is silently dropped.
253        self.tx.send(envelope).unwrap_or(0)
254    }
255
256    /// Number of active receivers.
257    pub fn receiver_count(&self) -> usize {
258        self.tx.receiver_count()
259    }
260}
261
262impl Default for AgentBus {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268// ─── BusHandle ───────────────────────────────────────────────────────────
269
270/// A scoped handle that a single agent uses to send and receive messages.
271///
272/// The handle knows the agent's id and uses it as `sender_id` on outgoing
273/// envelopes.  It also provides filtered receive helpers.
274pub struct BusHandle {
275    agent_id: String,
276    bus: Arc<AgentBus>,
277    rx: broadcast::Receiver<BusEnvelope>,
278}
279
280impl BusHandle {
281    /// The agent id this handle belongs to.
282    pub fn agent_id(&self) -> &str {
283        &self.agent_id
284    }
285
286    /// Send a message on the given topic.
287    pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
288        self.send_with_correlation(topic, message, None)
289    }
290
291    /// Send a message with a correlation id.
292    pub fn send_with_correlation(
293        &self,
294        topic: impl Into<String>,
295        message: BusMessage,
296        correlation_id: Option<String>,
297    ) -> usize {
298        let envelope = BusEnvelope {
299            id: Uuid::new_v4().to_string(),
300            topic: topic.into(),
301            sender_id: self.agent_id.clone(),
302            correlation_id,
303            timestamp: Utc::now(),
304            message,
305        };
306        self.bus.publish(envelope)
307    }
308
309    /// Announce this agent as ready.
310    pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
311        self.send(
312            "broadcast",
313            BusMessage::AgentReady {
314                agent_id: self.agent_id.clone(),
315                capabilities,
316            },
317        )
318    }
319
320    /// Announce this agent is shutting down.
321    pub fn announce_shutdown(&self) -> usize {
322        self.send(
323            "broadcast",
324            BusMessage::AgentShutdown {
325                agent_id: self.agent_id.clone(),
326            },
327        )
328    }
329
330    /// Announce a task status update.
331    pub fn send_task_update(
332        &self,
333        task_id: &str,
334        state: TaskState,
335        message: Option<String>,
336    ) -> usize {
337        self.send(
338            format!("task.{task_id}"),
339            BusMessage::TaskUpdate {
340                task_id: task_id.to_string(),
341                state,
342                message,
343            },
344        )
345    }
346
347    /// Announce an artifact update.
348    pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
349        self.send(
350            format!("task.{task_id}"),
351            BusMessage::ArtifactUpdate {
352                task_id: task_id.to_string(),
353                artifact,
354            },
355        )
356    }
357
358    /// Send a direct message to another agent.
359    pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
360        self.send(
361            format!("agent.{to}"),
362            BusMessage::AgentMessage {
363                from: self.agent_id.clone(),
364                to: to.to_string(),
365                parts,
366            },
367        )
368    }
369
370    /// Publish a shared result (visible to the entire swarm).
371    pub fn publish_shared_result(
372        &self,
373        key: impl Into<String>,
374        value: serde_json::Value,
375        tags: Vec<String>,
376    ) -> usize {
377        let key = key.into();
378        self.send(
379            format!("results.{}", &key),
380            BusMessage::SharedResult { key, value, tags },
381        )
382    }
383
384    // ── Ralph helpers ────────────────────────────────────────────────
385
386    /// Publish learnings from a Ralph iteration so other agents / future
387    /// iterations can build on them.
388    pub fn publish_ralph_learning(
389        &self,
390        prd_id: &str,
391        story_id: &str,
392        iteration: usize,
393        learnings: Vec<String>,
394        context: serde_json::Value,
395    ) -> usize {
396        self.send(
397            format!("ralph.{prd_id}"),
398            BusMessage::RalphLearning {
399                prd_id: prd_id.to_string(),
400                story_id: story_id.to_string(),
401                iteration,
402                learnings,
403                context,
404            },
405        )
406    }
407
408    /// Publish a context handoff between sequential Ralph stories.
409    pub fn publish_ralph_handoff(
410        &self,
411        prd_id: &str,
412        from_story: &str,
413        to_story: &str,
414        context: serde_json::Value,
415        progress_summary: &str,
416    ) -> usize {
417        self.send(
418            format!("ralph.{prd_id}"),
419            BusMessage::RalphHandoff {
420                prd_id: prd_id.to_string(),
421                from_story: from_story.to_string(),
422                to_story: to_story.to_string(),
423                context,
424                progress_summary: progress_summary.to_string(),
425            },
426        )
427    }
428
429    /// Publish PRD-level progress.
430    pub fn publish_ralph_progress(
431        &self,
432        prd_id: &str,
433        passed: usize,
434        total: usize,
435        iteration: usize,
436        status: &str,
437    ) -> usize {
438        self.send(
439            format!("ralph.{prd_id}"),
440            BusMessage::RalphProgress {
441                prd_id: prd_id.to_string(),
442                passed,
443                total,
444                iteration,
445                status: status.to_string(),
446            },
447        )
448    }
449
450    /// Drain all accumulated Ralph learnings for a PRD (non-blocking).
451    pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
452        let prefix = format!("ralph.{prd_id}");
453        let mut out = Vec::new();
454        while let Some(env) = self.try_recv() {
455            if env.topic.starts_with(&prefix)
456                && matches!(
457                    &env.message,
458                    BusMessage::RalphLearning { .. } | BusMessage::RalphHandoff { .. }
459                )
460            {
461                out.push(env);
462            }
463        }
464        out
465    }
466
467    // ── Voice helpers ────────────────────────────────────────────────
468
469    /// Announce that a voice session has started.
470    pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
471        self.send(
472            format!("voice.{room_name}"),
473            BusMessage::VoiceSessionStarted {
474                room_name: room_name.to_string(),
475                agent_id: self.agent_id.clone(),
476                voice_id: voice_id.to_string(),
477            },
478        )
479    }
480
481    /// Publish a transcript fragment from a voice session.
482    pub fn send_voice_transcript(
483        &self,
484        room_name: &str,
485        text: &str,
486        role: &str,
487        is_final: bool,
488    ) -> usize {
489        self.send(
490            format!("voice.{room_name}"),
491            BusMessage::VoiceTranscript {
492                room_name: room_name.to_string(),
493                text: text.to_string(),
494                role: role.to_string(),
495                is_final,
496            },
497        )
498    }
499
500    /// Announce a voice agent state change.
501    pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
502        self.send(
503            format!("voice.{room_name}"),
504            BusMessage::VoiceAgentStateChanged {
505                room_name: room_name.to_string(),
506                state: state.to_string(),
507            },
508        )
509    }
510
511    /// Announce that a voice session has ended.
512    pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
513        self.send(
514            format!("voice.{room_name}"),
515            BusMessage::VoiceSessionEnded {
516                room_name: room_name.to_string(),
517                reason: reason.to_string(),
518            },
519        )
520    }
521
522    /// Receive the next envelope (blocks until available).
523    pub async fn recv(&mut self) -> Option<BusEnvelope> {
524        loop {
525            match self.rx.recv().await {
526                Ok(env) => return Some(env),
527                Err(broadcast::error::RecvError::Lagged(n)) => {
528                    tracing::warn!(
529                        agent_id = %self.agent_id,
530                        skipped = n,
531                        "Bus handle lagged, skipping messages"
532                    );
533                    continue;
534                }
535                Err(broadcast::error::RecvError::Closed) => return None,
536            }
537        }
538    }
539
540    /// Receive the next envelope whose topic starts with the given prefix.
541    pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
542        loop {
543            match self.recv().await {
544                Some(env) if env.topic.starts_with(prefix) => return Some(env),
545                Some(_) => continue, // wrong topic, skip
546                None => return None,
547            }
548        }
549    }
550
551    /// Receive the next envelope addressed to this agent.
552    pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
553        let prefix = format!("agent.{}", self.agent_id);
554        self.recv_topic(&prefix).await
555    }
556
557    /// Try to receive without blocking. Returns `None` if no message is
558    /// queued.
559    pub fn try_recv(&mut self) -> Option<BusEnvelope> {
560        loop {
561            match self.rx.try_recv() {
562                Ok(env) => return Some(env),
563                Err(broadcast::error::TryRecvError::Lagged(n)) => {
564                    tracing::warn!(
565                        agent_id = %self.agent_id,
566                        skipped = n,
567                        "Bus handle lagged (try_recv), skipping"
568                    );
569                    continue;
570                }
571                Err(broadcast::error::TryRecvError::Empty)
572                | Err(broadcast::error::TryRecvError::Closed) => return None,
573            }
574        }
575    }
576
577    /// Access the agent registry.
578    pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
579        &self.bus.registry
580    }
581
582    /// Get mutable access to the underlying receiver.
583    /// This allows consuming the receiver in stream operations.
584    pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
585        self.rx
586    }
587}
588
589// ─── Tests ───────────────────────────────────────────────────────────────
590
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, shareable bus for a single test.
    fn new_bus() -> Arc<AgentBus> {
        AgentBus::new().into_arc()
    }

    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = new_bus();
        let mut sender = bus.handle("agent-a");
        let mut recipient = bus.handle("agent-b");

        let greeting = vec![Part::Text {
            text: "hello".into(),
        }];
        sender.send_to_agent("agent-b", greeting);

        // The addressee receives the envelope...
        let received = recipient.recv().await.unwrap();
        assert_eq!(received.topic, "agent.agent-b");
        if let BusMessage::AgentMessage { from, to, .. } = &received.message {
            assert_eq!(from, "agent-a");
            assert_eq!(to, "agent-b");
        } else {
            panic!("unexpected message: {:?}", received.message);
        }

        // ...and so does the sender, because the bus broadcasts to everyone.
        let echoed = sender.try_recv().unwrap();
        assert_eq!(echoed.topic, "agent.agent-b");
    }

    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = new_bus();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let envelope = observer.recv().await.unwrap();
        assert_eq!(envelope.topic, "task.task-42");
        if let BusMessage::TaskUpdate { task_id, state, .. } = &envelope.message {
            assert_eq!(task_id, "task-42");
            assert_eq!(*state, TaskState::Working);
        } else {
            panic!("unexpected: {:?}", envelope.message);
        }
    }

    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = new_bus();
        // Nobody has subscribed, so publishing succeeds but reaches no one.
        let heartbeat = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: BusMessage::Heartbeat {
                agent_id: "nobody".into(),
                status: "ok".into(),
            },
        };
        assert_eq!(bus.publish(heartbeat), 0);
    }

    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = new_bus();
        let publisher = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        // One update on each of two distinct topics.
        for (id, state) in [("1", TaskState::Working), ("2", TaskState::Completed)] {
            publisher.send(
                format!("task.{id}"),
                BusMessage::TaskUpdate {
                    task_id: id.into(),
                    state,
                    message: None,
                },
            );
        }

        // `recv_topic` has to skip `task.1` and hand back `task.2`.
        let envelope = listener.recv_topic("task.2").await.unwrap();
        if let BusMessage::TaskUpdate { task_id, .. } = &envelope.message {
            assert_eq!(task_id, "2");
        } else {
            panic!("unexpected: {:?}", envelope.message);
        }
    }

    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = new_bus();
        let planner = bus.handle("planner-1");

        planner.announce_ready(vec![String::from("plan"), String::from("review")]);
        assert!(bus.registry.get("planner-1").is_some());

        planner.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}