Skip to main content

codetether_agent/bus/
mod.rs

1//! Central in-process agent message bus
2//!
//! Provides a broadcast-based pub/sub bus that all local agents (sub-agents,
//! workers, the A2A server) can plug into. Remote agents talk to the bus
//! through the gRPC / JSON-RPC bridge; local agents receive in-memory clones
//! of each envelope (no serialization involved) through a `BusHandle`.
7//!
8//! # Topic routing
9//!
10//! Every envelope carries a `topic` string that follows a hierarchical scheme:
11//!
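//! | Pattern | Semantics |
//! |---------|-----------|
//! | `agent.{id}` | Messages *to* a specific agent |
//! | `agent.{id}.events` | Events *from* a specific agent |
//! | `task.{id}` | All updates for a task |
//! | `swarm.{id}` | Swarm-level coordination |
//! | `broadcast` | Global announcements (e.g. agent ready / shutdown) |
//! | `tools.{name}` | Tool-specific channels |
//! | `results.{key}` | Shared results published by sub-agents |
//! | `ralph.{prd_id}` | Ralph loop learnings, story handoffs and PRD progress |
//! | `voice.{room_name}` | Voice session lifecycle, transcripts and agent state |
//!
//! The bus itself does not route on topics: every subscriber receives every
//! envelope and filters by topic on its own side.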
20pub mod global; pub mod payload;
21pub mod registry;
22pub mod relay;
23pub mod s3_sink;
24
25pub use global::{global, set_global};
26
27use crate::a2a::types::{Artifact, Part, TaskState};
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use std::sync::Arc;
31use tokio::sync::broadcast;
32use uuid::Uuid;
33
34// ─── Envelope & Messages ─────────────────────────────────────────────────
35
36/// Metadata wrapper for every message that travels through the bus.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct BusEnvelope {
39    /// Unique id for this envelope
40    pub id: String,
41    /// Hierarchical topic for routing (e.g. `agent.{id}`, `task.{id}`)
42    pub topic: String,
43    /// The agent that originated this message
44    pub sender_id: String,
45    /// Optional correlation id (links request → response)
46    pub correlation_id: Option<String>,
47    /// When the envelope was created
48    pub timestamp: DateTime<Utc>,
49    /// The payload
50    pub message: BusMessage,
51}
52
53/// The set of messages the bus can carry.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55#[serde(tag = "kind", rename_all = "snake_case")]
56pub enum BusMessage {
57    /// An agent has come online and is ready to receive work
58    AgentReady {
59        agent_id: String,
60        capabilities: Vec<String>,
61    },
62    /// An agent is shutting down
63    AgentShutdown { agent_id: String },
64    /// Free-form message from one agent to another (mirrors A2A `Message`)
65    AgentMessage {
66        from: String,
67        to: String,
68        parts: Vec<Part>,
69    },
70    /// Task status changed
71    TaskUpdate {
72        task_id: String,
73        state: TaskState,
74        message: Option<String>,
75    },
76    /// A new artifact was produced for a task
77    ArtifactUpdate { task_id: String, artifact: Artifact },
78    /// A sub-agent published a shared result (replaces raw `ResultStore` publish)
79    SharedResult {
80        key: String,
81        value: serde_json::Value,
82        tags: Vec<String>,
83    },
84    /// Tool execution request (for shared tool dispatch)
85    ToolRequest {
86        request_id: String,
87        agent_id: String,
88        tool_name: String,
89        arguments: serde_json::Value,
90        step: usize,
91    },
92    /// Tool execution response
93    ToolResponse {
94        request_id: String,
95        agent_id: String,
96        tool_name: String,
97        result: String,
98        success: bool,
99        step: usize,
100    },
101    /// Heartbeat (keep-alive / health signal)
102    Heartbeat { agent_id: String, status: String },
103
104    // ── Ralph loop messages ──────────────────────────────────────────
105    /// An agent shares learnings from a Ralph iteration (insights,
106    /// blockers, patterns discovered) so subsequent iterations or
107    /// co-ordinating agents can build on them.
108    RalphLearning {
109        prd_id: String,
110        story_id: String,
111        iteration: usize,
112        learnings: Vec<String>,
113        context: serde_json::Value,
114    },
115
116    /// Context handoff between sequential Ralph stories.  The finishing
117    /// story publishes what the *next* story should know.
118    RalphHandoff {
119        prd_id: String,
120        from_story: String,
121        to_story: String,
122        context: serde_json::Value,
123        progress_summary: String,
124    },
125
126    /// PRD-level progress update (aggregated across all stories).
127    RalphProgress {
128        prd_id: String,
129        passed: usize,
130        total: usize,
131        iteration: usize,
132        status: String,
133    },
134
135    // ── Full tool output ─────────────────────────────────────────────
136    /// Full, untruncated tool output from an agent loop step.
137    /// Published *before* the result is truncated for the LLM context
138    /// window, so other agents and the bus log see the complete output.
139    ToolOutputFull {
140        agent_id: String,
141        tool_name: String,
142        output: String,
143        success: bool,
144        step: usize,
145    },
146
147    /// Internal reasoning / chain-of-thought from an agent step.
148    /// Published so the training pipeline captures the model's
149    /// thinking process alongside its actions.
150    AgentThinking {
151        agent_id: String,
152        thinking: String,
153        step: usize,
154    },
155
156    // ── Voice session messages ───────────────────────────────────────
157    /// A voice session (LiveKit room) has been created.
158    VoiceSessionStarted {
159        room_name: String,
160        agent_id: String,
161        voice_id: String,
162    },
163
164    /// A transcript fragment from a voice session.
165    VoiceTranscript {
166        room_name: String,
167        text: String,
168        /// "user" or "agent"
169        role: String,
170        is_final: bool,
171    },
172
173    /// The voice agent's state changed (e.g. listening, thinking, speaking).
174    VoiceAgentStateChanged { room_name: String, state: String },
175
176    /// A voice session has ended.
177    VoiceSessionEnded { room_name: String, reason: String },
178}
179
180// ─── Constants ───────────────────────────────────────────────────────────
181
/// Number of envelopes the broadcast channel of a bus created with
/// `AgentBus::new` can buffer.
///
/// A receiver that falls further behind than this lags: the oldest envelopes
/// are skipped for it (see the lag handling in `BusHandle::recv`).
const DEFAULT_BUS_CAPACITY: usize = 4_096;
184
185// ─── AgentBus ────────────────────────────────────────────────────────────
186
187/// The central in-process message bus.
188///
189/// Internally this is a `tokio::sync::broadcast` channel so every subscriber
190/// receives every message.  Filtering by topic is done on the consumer side
191/// through `BusHandle::subscribe_topic` / `BusHandle::recv_filtered`.
192pub struct AgentBus {
193    tx: broadcast::Sender<BusEnvelope>,
194    /// Registry of connected agents
195    pub registry: Arc<registry::AgentRegistry>,
196}
197
198impl std::fmt::Debug for AgentBus {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("AgentBus")
201            .field("subscribers", &self.tx.receiver_count())
202            .finish()
203    }
204}
205
206impl AgentBus {
207    /// Create a new bus with default capacity.
208    pub fn new() -> Self {
209        Self::with_capacity(DEFAULT_BUS_CAPACITY)
210    }
211
212    /// Create a new bus with a specific channel capacity.
213    pub fn with_capacity(capacity: usize) -> Self {
214        let (tx, _) = broadcast::channel(capacity);
215        Self {
216            tx,
217            registry: Arc::new(registry::AgentRegistry::new()),
218        }
219    }
220
221    /// Wrap `self` in an `Arc` for sharing across tasks.
222    pub fn into_arc(self) -> Arc<Self> {
223        Arc::new(self)
224    }
225
226    /// Create a `BusHandle` scoped to a specific agent.
227    pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
228        BusHandle {
229            agent_id: agent_id.into(),
230            bus: Arc::clone(self),
231            rx: self.tx.subscribe(),
232        }
233    }
234
235    /// Publish an envelope directly (low-level).
236    pub fn publish(&self, envelope: BusEnvelope) -> usize {
237        match &envelope.message {
238            BusMessage::AgentReady {
239                agent_id,
240                capabilities,
241            } => {
242                self.registry.register_ready(agent_id, capabilities);
243            }
244            BusMessage::AgentShutdown { agent_id } => {
245                self.registry.deregister(agent_id);
246            }
247            _ => {}
248        }
249
250        // Returns the number of receivers that got the message.
251        // If there are no receivers the message is silently dropped.
252        self.tx.send(envelope).unwrap_or(0)
253    }
254
255    /// Number of active receivers.
256    pub fn receiver_count(&self) -> usize {
257        self.tx.receiver_count()
258    }
259}
260
261impl Default for AgentBus {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267// ─── BusHandle ───────────────────────────────────────────────────────────
268
269/// A scoped handle that a single agent uses to send and receive messages.
270///
271/// The handle knows the agent's id and uses it as `sender_id` on outgoing
272/// envelopes.  It also provides filtered receive helpers.
273pub struct BusHandle {
274    agent_id: String,
275    bus: Arc<AgentBus>,
276    rx: broadcast::Receiver<BusEnvelope>,
277}
278
279impl BusHandle {
280    /// The agent id this handle belongs to.
281    pub fn agent_id(&self) -> &str {
282        &self.agent_id
283    }
284
285    /// Send a message on the given topic.
286    pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
287        self.send_with_correlation(topic, message, None)
288    }
289
290    /// Send a message with a correlation id.
291    pub fn send_with_correlation(
292        &self,
293        topic: impl Into<String>,
294        message: BusMessage,
295        correlation_id: Option<String>,
296    ) -> usize {
297        let envelope = BusEnvelope {
298            id: Uuid::new_v4().to_string(),
299            topic: topic.into(),
300            sender_id: self.agent_id.clone(),
301            correlation_id,
302            timestamp: Utc::now(),
303            message,
304        };
305        self.bus.publish(envelope)
306    }
307
308    /// Announce this agent as ready.
309    pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
310        self.send(
311            "broadcast",
312            BusMessage::AgentReady {
313                agent_id: self.agent_id.clone(),
314                capabilities,
315            },
316        )
317    }
318
319    /// Announce this agent is shutting down.
320    pub fn announce_shutdown(&self) -> usize {
321        self.send(
322            "broadcast",
323            BusMessage::AgentShutdown {
324                agent_id: self.agent_id.clone(),
325            },
326        )
327    }
328
329    /// Announce a task status update.
330    pub fn send_task_update(
331        &self,
332        task_id: &str,
333        state: TaskState,
334        message: Option<String>,
335    ) -> usize {
336        self.send(
337            format!("task.{task_id}"),
338            BusMessage::TaskUpdate {
339                task_id: task_id.to_string(),
340                state,
341                message,
342            },
343        )
344    }
345
346    /// Announce an artifact update.
347    pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
348        self.send(
349            format!("task.{task_id}"),
350            BusMessage::ArtifactUpdate {
351                task_id: task_id.to_string(),
352                artifact,
353            },
354        )
355    }
356
357    /// Send a direct message to another agent.
358    pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
359        self.send(
360            format!("agent.{to}"),
361            BusMessage::AgentMessage {
362                from: self.agent_id.clone(),
363                to: to.to_string(),
364                parts,
365            },
366        )
367    }
368
369    /// Publish a shared result (visible to the entire swarm).
370    pub fn publish_shared_result(
371        &self,
372        key: impl Into<String>,
373        value: serde_json::Value,
374        tags: Vec<String>,
375    ) -> usize {
376        let key = key.into();
377        self.send(
378            format!("results.{}", &key),
379            BusMessage::SharedResult { key, value, tags },
380        )
381    }
382
383    // ── Ralph helpers ────────────────────────────────────────────────
384
385    /// Publish learnings from a Ralph iteration so other agents / future
386    /// iterations can build on them.
387    pub fn publish_ralph_learning(
388        &self,
389        prd_id: &str,
390        story_id: &str,
391        iteration: usize,
392        learnings: Vec<String>,
393        context: serde_json::Value,
394    ) -> usize {
395        self.send(
396            format!("ralph.{prd_id}"),
397            BusMessage::RalphLearning {
398                prd_id: prd_id.to_string(),
399                story_id: story_id.to_string(),
400                iteration,
401                learnings,
402                context,
403            },
404        )
405    }
406
407    /// Publish a context handoff between sequential Ralph stories.
408    pub fn publish_ralph_handoff(
409        &self,
410        prd_id: &str,
411        from_story: &str,
412        to_story: &str,
413        context: serde_json::Value,
414        progress_summary: &str,
415    ) -> usize {
416        self.send(
417            format!("ralph.{prd_id}"),
418            BusMessage::RalphHandoff {
419                prd_id: prd_id.to_string(),
420                from_story: from_story.to_string(),
421                to_story: to_story.to_string(),
422                context,
423                progress_summary: progress_summary.to_string(),
424            },
425        )
426    }
427
428    /// Publish PRD-level progress.
429    pub fn publish_ralph_progress(
430        &self,
431        prd_id: &str,
432        passed: usize,
433        total: usize,
434        iteration: usize,
435        status: &str,
436    ) -> usize {
437        self.send(
438            format!("ralph.{prd_id}"),
439            BusMessage::RalphProgress {
440                prd_id: prd_id.to_string(),
441                passed,
442                total,
443                iteration,
444                status: status.to_string(),
445            },
446        )
447    }
448
449    /// Drain all accumulated Ralph learnings for a PRD (non-blocking).
450    pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
451        let prefix = format!("ralph.{prd_id}");
452        let mut out = Vec::new();
453        while let Some(env) = self.try_recv() {
454            if env.topic.starts_with(&prefix)
455                && matches!(
456                    &env.message,
457                    BusMessage::RalphLearning { .. } | BusMessage::RalphHandoff { .. }
458                )
459            {
460                out.push(env);
461            }
462        }
463        out
464    }
465
466    // ── Voice helpers ────────────────────────────────────────────────
467
468    /// Announce that a voice session has started.
469    pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
470        self.send(
471            format!("voice.{room_name}"),
472            BusMessage::VoiceSessionStarted {
473                room_name: room_name.to_string(),
474                agent_id: self.agent_id.clone(),
475                voice_id: voice_id.to_string(),
476            },
477        )
478    }
479
480    /// Publish a transcript fragment from a voice session.
481    pub fn send_voice_transcript(
482        &self,
483        room_name: &str,
484        text: &str,
485        role: &str,
486        is_final: bool,
487    ) -> usize {
488        self.send(
489            format!("voice.{room_name}"),
490            BusMessage::VoiceTranscript {
491                room_name: room_name.to_string(),
492                text: text.to_string(),
493                role: role.to_string(),
494                is_final,
495            },
496        )
497    }
498
499    /// Announce a voice agent state change.
500    pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
501        self.send(
502            format!("voice.{room_name}"),
503            BusMessage::VoiceAgentStateChanged {
504                room_name: room_name.to_string(),
505                state: state.to_string(),
506            },
507        )
508    }
509
510    /// Announce that a voice session has ended.
511    pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
512        self.send(
513            format!("voice.{room_name}"),
514            BusMessage::VoiceSessionEnded {
515                room_name: room_name.to_string(),
516                reason: reason.to_string(),
517            },
518        )
519    }
520
521    /// Receive the next envelope (blocks until available).
522    pub async fn recv(&mut self) -> Option<BusEnvelope> {
523        loop {
524            match self.rx.recv().await {
525                Ok(env) => return Some(env),
526                Err(broadcast::error::RecvError::Lagged(n)) => {
527                    tracing::warn!(
528                        agent_id = %self.agent_id,
529                        skipped = n,
530                        "Bus handle lagged, skipping messages"
531                    );
532                    continue;
533                }
534                Err(broadcast::error::RecvError::Closed) => return None,
535            }
536        }
537    }
538
539    /// Receive the next envelope whose topic starts with the given prefix.
540    pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
541        loop {
542            match self.recv().await {
543                Some(env) if env.topic.starts_with(prefix) => return Some(env),
544                Some(_) => continue, // wrong topic, skip
545                None => return None,
546            }
547        }
548    }
549
550    /// Receive the next envelope addressed to this agent.
551    pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
552        let prefix = format!("agent.{}", self.agent_id);
553        self.recv_topic(&prefix).await
554    }
555
556    /// Try to receive without blocking. Returns `None` if no message is
557    /// queued.
558    pub fn try_recv(&mut self) -> Option<BusEnvelope> {
559        loop {
560            match self.rx.try_recv() {
561                Ok(env) => return Some(env),
562                Err(broadcast::error::TryRecvError::Lagged(n)) => {
563                    tracing::warn!(
564                        agent_id = %self.agent_id,
565                        skipped = n,
566                        "Bus handle lagged (try_recv), skipping"
567                    );
568                    continue;
569                }
570                Err(broadcast::error::TryRecvError::Empty)
571                | Err(broadcast::error::TryRecvError::Closed) => return None,
572            }
573        }
574    }
575
576    /// Access the agent registry.
577    pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
578        &self.bus.registry
579    }
580
581    /// Get mutable access to the underlying receiver.
582    /// This allows consuming the receiver in stream operations.
583    pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
584        self.rx
585    }
586}
587
588// ─── Tests ───────────────────────────────────────────────────────────────
589
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a single plain-text message part.
    fn text_part(text: &str) -> Part {
        Part::Text { text: text.into() }
    }

    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = AgentBus::new().into_arc();
        let mut handle_a = bus.handle("agent-a");
        let mut handle_b = bus.handle("agent-b");

        handle_a.send_to_agent("agent-b", vec![text_part("hello")]);

        // The recipient sees the direct message...
        let env = handle_b.recv().await.unwrap();
        assert_eq!(env.topic, "agent.agent-b");
        match &env.message {
            BusMessage::AgentMessage { from, to, .. } => {
                assert_eq!(from, "agent-a");
                assert_eq!(to, "agent-b");
            }
            other => panic!("unexpected message: {other:?}"),
        }

        // ...and so does the sender (broadcast semantics).
        let env_a = handle_a.try_recv().unwrap();
        assert_eq!(env_a.topic, "agent.agent-b");
    }

    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = AgentBus::new().into_arc();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let env = observer.recv().await.unwrap();
        assert_eq!(env.topic, "task.task-42");
        match &env.message {
            BusMessage::TaskUpdate { task_id, state, .. } => {
                assert_eq!(task_id, "task-42");
                assert_eq!(*state, TaskState::Working);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = AgentBus::new().into_arc();
        // No handles exist, so publishing reaches zero receivers.
        let env = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: BusMessage::Heartbeat {
                agent_id: "nobody".into(),
                status: "ok".into(),
            },
        };
        assert_eq!(bus.publish(env), 0);
    }

    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = AgentBus::new().into_arc();
        let handle = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        handle.send(
            "task.1",
            BusMessage::TaskUpdate {
                task_id: "1".into(),
                state: TaskState::Working,
                message: None,
            },
        );
        handle.send(
            "task.2",
            BusMessage::TaskUpdate {
                task_id: "2".into(),
                state: TaskState::Completed,
                message: None,
            },
        );

        // recv_topic("task.2") must skip the first envelope.
        let env = listener.recv_topic("task.2").await.unwrap();
        match &env.message {
            BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_recv_mine_skips_other_topics() {
        let bus = AgentBus::new().into_arc();
        let sender = bus.handle("agent-a");
        let mut receiver = bus.handle("agent-b");

        sender.send_task_update("task-1", TaskState::Working, None);
        sender.send_to_agent("agent-b", vec![text_part("ping")]);

        let env = receiver.recv_mine().await.unwrap();
        assert_eq!(env.topic, "agent.agent-b");
        assert!(matches!(&env.message, BusMessage::AgentMessage { .. }));
    }

    #[tokio::test]
    async fn test_drain_ralph_learnings_keeps_learnings_and_handoffs() {
        let bus = AgentBus::new().into_arc();
        let mut handle = bus.handle("ralph");

        handle.publish_ralph_learning(
            "prd-1",
            "story-1",
            1,
            vec!["insight".to_string()],
            serde_json::Value::Null,
        );
        handle.publish_ralph_handoff(
            "prd-1",
            "story-1",
            "story-2",
            serde_json::Value::Null,
            "half done",
        );
        handle.publish_ralph_progress("prd-1", 1, 2, 1, "running");

        let drained = handle.drain_ralph_learnings("prd-1");
        assert_eq!(drained.len(), 2);
        assert!(matches!(&drained[0].message, BusMessage::RalphLearning { .. }));
        assert!(matches!(&drained[1].message, BusMessage::RalphHandoff { .. }));
        // The progress envelope was consumed by the drain as well.
        assert!(handle.try_recv().is_none());
    }

    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = AgentBus::new().into_arc();
        let handle = bus.handle("planner-1");

        handle.announce_ready(vec!["plan".to_string(), "review".to_string()]);
        assert!(bus.registry.get("planner-1").is_some());

        handle.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}