Skip to main content

codetether_agent/bus/
mod.rs

//! Central in-process agent message bus
//!
//! Provides a broadcast-based pub/sub bus that all local agents (sub-agents,
//! workers, the A2A server) can plug into.  Remote agents talk to the bus
//! through the gRPC / JSON-RPC bridge; local agents receive cloned envelopes
//! directly, without any serialization, via `BusHandle`.
//!
//! # Topic routing
//!
//! Every envelope carries a `topic` string that follows a hierarchical,
//! dot-separated scheme:
//!
//! | Pattern | Semantics |
//! |---------|-----------|
//! | `agent.{id}` | Messages *to* a specific agent |
//! | `agent.{id}.events` | Events *from* a specific agent |
//! | `task.{id}` | All updates for a task |
//! | `swarm.{id}` | Swarm-level coordination |
//! | `broadcast` | Global announcements |
//! | `tools.{name}` | Tool-specific channels |
//! | `results.{key}` | Shared results published by sub-agents |
//! | `ralph.{prd_id}` | Ralph learnings, handoffs and progress for a PRD |
//! | `voice.{room}` | Voice-session events for a LiveKit room |

21pub mod registry;
22pub mod relay;
23pub mod s3_sink;
24
25use crate::a2a::types::{Artifact, Part, TaskState};
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28use std::sync::Arc;
29use tokio::sync::broadcast;
30use uuid::Uuid;
31
32// ─── Envelope & Messages ─────────────────────────────────────────────────
33
34/// Metadata wrapper for every message that travels through the bus.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct BusEnvelope {
37    /// Unique id for this envelope
38    pub id: String,
39    /// Hierarchical topic for routing (e.g. `agent.{id}`, `task.{id}`)
40    pub topic: String,
41    /// The agent that originated this message
42    pub sender_id: String,
43    /// Optional correlation id (links request → response)
44    pub correlation_id: Option<String>,
45    /// When the envelope was created
46    pub timestamp: DateTime<Utc>,
47    /// The payload
48    pub message: BusMessage,
49}
50
51/// The set of messages the bus can carry.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(tag = "kind", rename_all = "snake_case")]
54pub enum BusMessage {
55    /// An agent has come online and is ready to receive work
56    AgentReady {
57        agent_id: String,
58        capabilities: Vec<String>,
59    },
60    /// An agent is shutting down
61    AgentShutdown { agent_id: String },
62    /// Free-form message from one agent to another (mirrors A2A `Message`)
63    AgentMessage {
64        from: String,
65        to: String,
66        parts: Vec<Part>,
67    },
68    /// Task status changed
69    TaskUpdate {
70        task_id: String,
71        state: TaskState,
72        message: Option<String>,
73    },
74    /// A new artifact was produced for a task
75    ArtifactUpdate { task_id: String, artifact: Artifact },
76    /// A sub-agent published a shared result (replaces raw `ResultStore` publish)
77    SharedResult {
78        key: String,
79        value: serde_json::Value,
80        tags: Vec<String>,
81    },
82    /// Tool execution request (for shared tool dispatch)
83    ToolRequest {
84        request_id: String,
85        agent_id: String,
86        tool_name: String,
87        arguments: serde_json::Value,
88    },
89    /// Tool execution response
90    ToolResponse {
91        request_id: String,
92        agent_id: String,
93        tool_name: String,
94        result: String,
95        success: bool,
96    },
97    /// Heartbeat (keep-alive / health signal)
98    Heartbeat { agent_id: String, status: String },
99
100    // ── Ralph loop messages ──────────────────────────────────────────
101    /// An agent shares learnings from a Ralph iteration (insights,
102    /// blockers, patterns discovered) so subsequent iterations or
103    /// co-ordinating agents can build on them.
104    RalphLearning {
105        prd_id: String,
106        story_id: String,
107        iteration: usize,
108        learnings: Vec<String>,
109        context: serde_json::Value,
110    },
111
112    /// Context handoff between sequential Ralph stories.  The finishing
113    /// story publishes what the *next* story should know.
114    RalphHandoff {
115        prd_id: String,
116        from_story: String,
117        to_story: String,
118        context: serde_json::Value,
119        progress_summary: String,
120    },
121
122    /// PRD-level progress update (aggregated across all stories).
123    RalphProgress {
124        prd_id: String,
125        passed: usize,
126        total: usize,
127        iteration: usize,
128        status: String,
129    },
130
131    // ── Full tool output ─────────────────────────────────────────────
132    /// Full, untruncated tool output from an agent loop step.
133    /// Published *before* the result is truncated for the LLM context
134    /// window, so other agents and the bus log see the complete output.
135    ToolOutputFull {
136        agent_id: String,
137        tool_name: String,
138        output: String,
139        success: bool,
140        step: usize,
141    },
142
143    /// Internal reasoning / chain-of-thought from an agent step.
144    /// Published so the training pipeline captures the model's
145    /// thinking process alongside its actions.
146    AgentThinking {
147        agent_id: String,
148        thinking: String,
149        step: usize,
150    },
151
152    // ── Voice session messages ───────────────────────────────────────
153    /// A voice session (LiveKit room) has been created.
154    VoiceSessionStarted {
155        room_name: String,
156        agent_id: String,
157        voice_id: String,
158    },
159
160    /// A transcript fragment from a voice session.
161    VoiceTranscript {
162        room_name: String,
163        text: String,
164        /// "user" or "agent"
165        role: String,
166        is_final: bool,
167    },
168
169    /// The voice agent's state changed (e.g. listening, thinking, speaking).
170    VoiceAgentStateChanged { room_name: String, state: String },
171
172    /// A voice session has ended.
173    VoiceSessionEnded { room_name: String, reason: String },
174}
175
// ─── Constants ───────────────────────────────────────────────────────────

/// Broadcast-channel capacity used by `AgentBus::new`: 4096 (2^12)
/// envelopes can be buffered before slow receivers start to lag.
const DEFAULT_BUS_CAPACITY: usize = 1 << 12;
180
181// ─── AgentBus ────────────────────────────────────────────────────────────
182
183/// The central in-process message bus.
184///
185/// Internally this is a `tokio::sync::broadcast` channel so every subscriber
186/// receives every message.  Filtering by topic is done on the consumer side
187/// through `BusHandle::subscribe_topic` / `BusHandle::recv_filtered`.
188pub struct AgentBus {
189    tx: broadcast::Sender<BusEnvelope>,
190    /// Registry of connected agents
191    pub registry: Arc<registry::AgentRegistry>,
192}
193
194impl std::fmt::Debug for AgentBus {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        f.debug_struct("AgentBus")
197            .field("subscribers", &self.tx.receiver_count())
198            .finish()
199    }
200}
201
202impl AgentBus {
203    /// Create a new bus with default capacity.
204    pub fn new() -> Self {
205        Self::with_capacity(DEFAULT_BUS_CAPACITY)
206    }
207
208    /// Create a new bus with a specific channel capacity.
209    pub fn with_capacity(capacity: usize) -> Self {
210        let (tx, _) = broadcast::channel(capacity);
211        Self {
212            tx,
213            registry: Arc::new(registry::AgentRegistry::new()),
214        }
215    }
216
217    /// Wrap `self` in an `Arc` for sharing across tasks.
218    pub fn into_arc(self) -> Arc<Self> {
219        Arc::new(self)
220    }
221
222    /// Create a `BusHandle` scoped to a specific agent.
223    pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
224        BusHandle {
225            agent_id: agent_id.into(),
226            bus: Arc::clone(self),
227            rx: self.tx.subscribe(),
228        }
229    }
230
231    /// Publish an envelope directly (low-level).
232    pub fn publish(&self, envelope: BusEnvelope) -> usize {
233        match &envelope.message {
234            BusMessage::AgentReady {
235                agent_id,
236                capabilities,
237            } => {
238                self.registry.register_ready(agent_id, capabilities);
239            }
240            BusMessage::AgentShutdown { agent_id } => {
241                self.registry.deregister(agent_id);
242            }
243            _ => {}
244        }
245
246        // Returns the number of receivers that got the message.
247        // If there are no receivers the message is silently dropped.
248        self.tx.send(envelope).unwrap_or(0)
249    }
250
251    /// Number of active receivers.
252    pub fn receiver_count(&self) -> usize {
253        self.tx.receiver_count()
254    }
255}
256
257impl Default for AgentBus {
258    fn default() -> Self {
259        Self::new()
260    }
261}
262
263// ─── BusHandle ───────────────────────────────────────────────────────────
264
265/// A scoped handle that a single agent uses to send and receive messages.
266///
267/// The handle knows the agent's id and uses it as `sender_id` on outgoing
268/// envelopes.  It also provides filtered receive helpers.
269pub struct BusHandle {
270    agent_id: String,
271    bus: Arc<AgentBus>,
272    rx: broadcast::Receiver<BusEnvelope>,
273}
274
275impl BusHandle {
276    /// The agent id this handle belongs to.
277    pub fn agent_id(&self) -> &str {
278        &self.agent_id
279    }
280
281    /// Send a message on the given topic.
282    pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
283        self.send_with_correlation(topic, message, None)
284    }
285
286    /// Send a message with a correlation id.
287    pub fn send_with_correlation(
288        &self,
289        topic: impl Into<String>,
290        message: BusMessage,
291        correlation_id: Option<String>,
292    ) -> usize {
293        let envelope = BusEnvelope {
294            id: Uuid::new_v4().to_string(),
295            topic: topic.into(),
296            sender_id: self.agent_id.clone(),
297            correlation_id,
298            timestamp: Utc::now(),
299            message,
300        };
301        self.bus.publish(envelope)
302    }
303
304    /// Announce this agent as ready.
305    pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
306        self.send(
307            "broadcast",
308            BusMessage::AgentReady {
309                agent_id: self.agent_id.clone(),
310                capabilities,
311            },
312        )
313    }
314
315    /// Announce this agent is shutting down.
316    pub fn announce_shutdown(&self) -> usize {
317        self.send(
318            "broadcast",
319            BusMessage::AgentShutdown {
320                agent_id: self.agent_id.clone(),
321            },
322        )
323    }
324
325    /// Announce a task status update.
326    pub fn send_task_update(
327        &self,
328        task_id: &str,
329        state: TaskState,
330        message: Option<String>,
331    ) -> usize {
332        self.send(
333            format!("task.{task_id}"),
334            BusMessage::TaskUpdate {
335                task_id: task_id.to_string(),
336                state,
337                message,
338            },
339        )
340    }
341
342    /// Announce an artifact update.
343    pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
344        self.send(
345            format!("task.{task_id}"),
346            BusMessage::ArtifactUpdate {
347                task_id: task_id.to_string(),
348                artifact,
349            },
350        )
351    }
352
353    /// Send a direct message to another agent.
354    pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
355        self.send(
356            format!("agent.{to}"),
357            BusMessage::AgentMessage {
358                from: self.agent_id.clone(),
359                to: to.to_string(),
360                parts,
361            },
362        )
363    }
364
365    /// Publish a shared result (visible to the entire swarm).
366    pub fn publish_shared_result(
367        &self,
368        key: impl Into<String>,
369        value: serde_json::Value,
370        tags: Vec<String>,
371    ) -> usize {
372        let key = key.into();
373        self.send(
374            format!("results.{}", &key),
375            BusMessage::SharedResult { key, value, tags },
376        )
377    }
378
379    // ── Ralph helpers ────────────────────────────────────────────────
380
381    /// Publish learnings from a Ralph iteration so other agents / future
382    /// iterations can build on them.
383    pub fn publish_ralph_learning(
384        &self,
385        prd_id: &str,
386        story_id: &str,
387        iteration: usize,
388        learnings: Vec<String>,
389        context: serde_json::Value,
390    ) -> usize {
391        self.send(
392            format!("ralph.{prd_id}"),
393            BusMessage::RalphLearning {
394                prd_id: prd_id.to_string(),
395                story_id: story_id.to_string(),
396                iteration,
397                learnings,
398                context,
399            },
400        )
401    }
402
403    /// Publish a context handoff between sequential Ralph stories.
404    pub fn publish_ralph_handoff(
405        &self,
406        prd_id: &str,
407        from_story: &str,
408        to_story: &str,
409        context: serde_json::Value,
410        progress_summary: &str,
411    ) -> usize {
412        self.send(
413            format!("ralph.{prd_id}"),
414            BusMessage::RalphHandoff {
415                prd_id: prd_id.to_string(),
416                from_story: from_story.to_string(),
417                to_story: to_story.to_string(),
418                context,
419                progress_summary: progress_summary.to_string(),
420            },
421        )
422    }
423
424    /// Publish PRD-level progress.
425    pub fn publish_ralph_progress(
426        &self,
427        prd_id: &str,
428        passed: usize,
429        total: usize,
430        iteration: usize,
431        status: &str,
432    ) -> usize {
433        self.send(
434            format!("ralph.{prd_id}"),
435            BusMessage::RalphProgress {
436                prd_id: prd_id.to_string(),
437                passed,
438                total,
439                iteration,
440                status: status.to_string(),
441            },
442        )
443    }
444
445    /// Drain all accumulated Ralph learnings for a PRD (non-blocking).
446    pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
447        let prefix = format!("ralph.{prd_id}");
448        let mut out = Vec::new();
449        while let Some(env) = self.try_recv() {
450            if env.topic.starts_with(&prefix) {
451                if matches!(
452                    &env.message,
453                    BusMessage::RalphLearning { .. } | BusMessage::RalphHandoff { .. }
454                ) {
455                    out.push(env);
456                }
457            }
458        }
459        out
460    }
461
462    // ── Voice helpers ────────────────────────────────────────────────
463
464    /// Announce that a voice session has started.
465    pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
466        self.send(
467            format!("voice.{room_name}"),
468            BusMessage::VoiceSessionStarted {
469                room_name: room_name.to_string(),
470                agent_id: self.agent_id.clone(),
471                voice_id: voice_id.to_string(),
472            },
473        )
474    }
475
476    /// Publish a transcript fragment from a voice session.
477    pub fn send_voice_transcript(
478        &self,
479        room_name: &str,
480        text: &str,
481        role: &str,
482        is_final: bool,
483    ) -> usize {
484        self.send(
485            format!("voice.{room_name}"),
486            BusMessage::VoiceTranscript {
487                room_name: room_name.to_string(),
488                text: text.to_string(),
489                role: role.to_string(),
490                is_final,
491            },
492        )
493    }
494
495    /// Announce a voice agent state change.
496    pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
497        self.send(
498            format!("voice.{room_name}"),
499            BusMessage::VoiceAgentStateChanged {
500                room_name: room_name.to_string(),
501                state: state.to_string(),
502            },
503        )
504    }
505
506    /// Announce that a voice session has ended.
507    pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
508        self.send(
509            format!("voice.{room_name}"),
510            BusMessage::VoiceSessionEnded {
511                room_name: room_name.to_string(),
512                reason: reason.to_string(),
513            },
514        )
515    }
516
517    /// Receive the next envelope (blocks until available).
518    pub async fn recv(&mut self) -> Option<BusEnvelope> {
519        loop {
520            match self.rx.recv().await {
521                Ok(env) => return Some(env),
522                Err(broadcast::error::RecvError::Lagged(n)) => {
523                    tracing::warn!(
524                        agent_id = %self.agent_id,
525                        skipped = n,
526                        "Bus handle lagged, skipping messages"
527                    );
528                    continue;
529                }
530                Err(broadcast::error::RecvError::Closed) => return None,
531            }
532        }
533    }
534
535    /// Receive the next envelope whose topic starts with the given prefix.
536    pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
537        loop {
538            match self.recv().await {
539                Some(env) if env.topic.starts_with(prefix) => return Some(env),
540                Some(_) => continue, // wrong topic, skip
541                None => return None,
542            }
543        }
544    }
545
546    /// Receive the next envelope addressed to this agent.
547    pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
548        let prefix = format!("agent.{}", self.agent_id);
549        self.recv_topic(&prefix).await
550    }
551
552    /// Try to receive without blocking. Returns `None` if no message is
553    /// queued.
554    pub fn try_recv(&mut self) -> Option<BusEnvelope> {
555        loop {
556            match self.rx.try_recv() {
557                Ok(env) => return Some(env),
558                Err(broadcast::error::TryRecvError::Lagged(n)) => {
559                    tracing::warn!(
560                        agent_id = %self.agent_id,
561                        skipped = n,
562                        "Bus handle lagged (try_recv), skipping"
563                    );
564                    continue;
565                }
566                Err(broadcast::error::TryRecvError::Empty)
567                | Err(broadcast::error::TryRecvError::Closed) => return None,
568            }
569        }
570    }
571
572    /// Access the agent registry.
573    pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
574        &self.bus.registry
575    }
576
577    /// Get mutable access to the underlying receiver.
578    /// This allows consuming the receiver in stream operations.
579    pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
580        self.rx
581    }
582}
583
// ─── Tests ───────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = AgentBus::new().into_arc();
        let mut handle_a = bus.handle("agent-a");
        let mut handle_b = bus.handle("agent-b");

        handle_a.send_to_agent(
            "agent-b",
            vec![Part::Text {
                text: "hello".into(),
            }],
        );

        // The recipient sees the envelope on its inbox topic...
        let env = handle_b.recv().await.unwrap();
        assert_eq!(env.topic, "agent.agent-b");
        match &env.message {
            BusMessage::AgentMessage { from, to, .. } => {
                assert_eq!(from, "agent-a");
                assert_eq!(to, "agent-b");
            }
            other => panic!("unexpected message: {other:?}"),
        }

        // ...and, with broadcast semantics, the sender receives it too.
        let env_a = handle_a.try_recv().unwrap();
        assert_eq!(env_a.topic, "agent.agent-b");
    }

    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = AgentBus::new().into_arc();
        let handle = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        handle.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let env = observer.recv().await.unwrap();
        assert_eq!(env.topic, "task.task-42");
        match &env.message {
            BusMessage::TaskUpdate { task_id, state, .. } => {
                assert_eq!(task_id, "task-42");
                assert_eq!(*state, TaskState::Working);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = AgentBus::new().into_arc();
        // No handles created — publishing succeeds but reaches 0 receivers.
        let env = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: BusMessage::Heartbeat {
                agent_id: "nobody".into(),
                status: "ok".into(),
            },
        };
        let count = bus.publish(env);
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = AgentBus::new().into_arc();
        let handle = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        // Send to two different topics.
        handle.send(
            "task.1",
            BusMessage::TaskUpdate {
                task_id: "1".into(),
                state: TaskState::Working,
                message: None,
            },
        );
        handle.send(
            "task.2",
            BusMessage::TaskUpdate {
                task_id: "2".into(),
                state: TaskState::Completed,
                message: None,
            },
        );

        // recv_topic("task.2") should skip the first and return the second.
        let env = listener.recv_topic("task.2").await.unwrap();
        match &env.message {
            BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = AgentBus::new().into_arc();
        let handle = bus.handle("planner-1");

        handle.announce_ready(vec!["plan".to_string(), "review".to_string()]);
        assert!(bus.registry.get("planner-1").is_some());

        handle.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }

    #[tokio::test]
    async fn test_drain_ralph_learnings_keeps_learnings_and_handoffs() {
        let bus = AgentBus::new().into_arc();
        let mut handle = bus.handle("ralph");

        handle.publish_ralph_learning(
            "prd-1",
            "story-1",
            1,
            vec!["insight".into()],
            serde_json::Value::Null,
        );
        handle.publish_ralph_progress("prd-1", 0, 2, 1, "running");
        handle.publish_ralph_handoff(
            "prd-1",
            "story-1",
            "story-2",
            serde_json::Value::Null,
            "half done",
        );
        handle.publish_ralph_learning("prd-2", "story-9", 1, vec![], serde_json::Value::Null);

        let drained = handle.drain_ralph_learnings("prd-1");
        assert_eq!(drained.len(), 2);
        assert!(matches!(drained[0].message, BusMessage::RalphLearning { .. }));
        assert!(matches!(drained[1].message, BusMessage::RalphHandoff { .. }));

        // Draining consumes the whole queue, unrelated envelopes included.
        assert!(handle.try_recv().is_none());
    }
}