1pub mod global;
22pub mod registry;
23pub mod relay;
24pub mod s3_sink;
25
26pub use global::{global, set_global};
27
28use crate::a2a::types::{Artifact, Part, TaskState};
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31use std::sync::Arc;
32use tokio::sync::broadcast;
33use uuid::Uuid;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BusEnvelope {
40 pub id: String,
42 pub topic: String,
44 pub sender_id: String,
46 pub correlation_id: Option<String>,
48 pub timestamp: DateTime<Utc>,
50 pub message: BusMessage,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56#[serde(tag = "kind", rename_all = "snake_case")]
57pub enum BusMessage {
58 AgentReady {
60 agent_id: String,
61 capabilities: Vec<String>,
62 },
63 AgentShutdown { agent_id: String },
65 AgentMessage {
67 from: String,
68 to: String,
69 parts: Vec<Part>,
70 },
71 TaskUpdate {
73 task_id: String,
74 state: TaskState,
75 message: Option<String>,
76 },
77 ArtifactUpdate { task_id: String, artifact: Artifact },
79 SharedResult {
81 key: String,
82 value: serde_json::Value,
83 tags: Vec<String>,
84 },
85 ToolRequest {
87 request_id: String,
88 agent_id: String,
89 tool_name: String,
90 arguments: serde_json::Value,
91 step: usize,
92 },
93 ToolResponse {
95 request_id: String,
96 agent_id: String,
97 tool_name: String,
98 result: String,
99 success: bool,
100 step: usize,
101 },
102 Heartbeat { agent_id: String, status: String },
104
105 RalphLearning {
110 prd_id: String,
111 story_id: String,
112 iteration: usize,
113 learnings: Vec<String>,
114 context: serde_json::Value,
115 },
116
117 RalphHandoff {
120 prd_id: String,
121 from_story: String,
122 to_story: String,
123 context: serde_json::Value,
124 progress_summary: String,
125 },
126
127 RalphProgress {
129 prd_id: String,
130 passed: usize,
131 total: usize,
132 iteration: usize,
133 status: String,
134 },
135
136 ToolOutputFull {
141 agent_id: String,
142 tool_name: String,
143 output: String,
144 success: bool,
145 step: usize,
146 },
147
148 AgentThinking {
152 agent_id: String,
153 thinking: String,
154 step: usize,
155 },
156
157 VoiceSessionStarted {
160 room_name: String,
161 agent_id: String,
162 voice_id: String,
163 },
164
165 VoiceTranscript {
167 room_name: String,
168 text: String,
169 role: String,
171 is_final: bool,
172 },
173
174 VoiceAgentStateChanged { room_name: String, state: String },
176
177 VoiceSessionEnded { room_name: String, reason: String },
179}
180
181const DEFAULT_BUS_CAPACITY: usize = 4096;
185
186pub struct AgentBus {
194 tx: broadcast::Sender<BusEnvelope>,
195 pub registry: Arc<registry::AgentRegistry>,
197}
198
199impl std::fmt::Debug for AgentBus {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 f.debug_struct("AgentBus")
202 .field("subscribers", &self.tx.receiver_count())
203 .finish()
204 }
205}
206
207impl AgentBus {
208 pub fn new() -> Self {
210 Self::with_capacity(DEFAULT_BUS_CAPACITY)
211 }
212
213 pub fn with_capacity(capacity: usize) -> Self {
215 let (tx, _) = broadcast::channel(capacity);
216 Self {
217 tx,
218 registry: Arc::new(registry::AgentRegistry::new()),
219 }
220 }
221
222 pub fn into_arc(self) -> Arc<Self> {
224 Arc::new(self)
225 }
226
227 pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
229 BusHandle {
230 agent_id: agent_id.into(),
231 bus: Arc::clone(self),
232 rx: self.tx.subscribe(),
233 }
234 }
235
236 pub fn publish(&self, envelope: BusEnvelope) -> usize {
238 match &envelope.message {
239 BusMessage::AgentReady {
240 agent_id,
241 capabilities,
242 } => {
243 self.registry.register_ready(agent_id, capabilities);
244 }
245 BusMessage::AgentShutdown { agent_id } => {
246 self.registry.deregister(agent_id);
247 }
248 _ => {}
249 }
250
251 self.tx.send(envelope).unwrap_or(0)
254 }
255
256 pub fn receiver_count(&self) -> usize {
258 self.tx.receiver_count()
259 }
260}
261
262impl Default for AgentBus {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268pub struct BusHandle {
275 agent_id: String,
276 bus: Arc<AgentBus>,
277 rx: broadcast::Receiver<BusEnvelope>,
278}
279
280impl BusHandle {
281 pub fn agent_id(&self) -> &str {
283 &self.agent_id
284 }
285
286 pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
288 self.send_with_correlation(topic, message, None)
289 }
290
291 pub fn send_with_correlation(
293 &self,
294 topic: impl Into<String>,
295 message: BusMessage,
296 correlation_id: Option<String>,
297 ) -> usize {
298 let envelope = BusEnvelope {
299 id: Uuid::new_v4().to_string(),
300 topic: topic.into(),
301 sender_id: self.agent_id.clone(),
302 correlation_id,
303 timestamp: Utc::now(),
304 message,
305 };
306 self.bus.publish(envelope)
307 }
308
309 pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
311 self.send(
312 "broadcast",
313 BusMessage::AgentReady {
314 agent_id: self.agent_id.clone(),
315 capabilities,
316 },
317 )
318 }
319
320 pub fn announce_shutdown(&self) -> usize {
322 self.send(
323 "broadcast",
324 BusMessage::AgentShutdown {
325 agent_id: self.agent_id.clone(),
326 },
327 )
328 }
329
330 pub fn send_task_update(
332 &self,
333 task_id: &str,
334 state: TaskState,
335 message: Option<String>,
336 ) -> usize {
337 self.send(
338 format!("task.{task_id}"),
339 BusMessage::TaskUpdate {
340 task_id: task_id.to_string(),
341 state,
342 message,
343 },
344 )
345 }
346
347 pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
349 self.send(
350 format!("task.{task_id}"),
351 BusMessage::ArtifactUpdate {
352 task_id: task_id.to_string(),
353 artifact,
354 },
355 )
356 }
357
358 pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
360 self.send(
361 format!("agent.{to}"),
362 BusMessage::AgentMessage {
363 from: self.agent_id.clone(),
364 to: to.to_string(),
365 parts,
366 },
367 )
368 }
369
370 pub fn publish_shared_result(
372 &self,
373 key: impl Into<String>,
374 value: serde_json::Value,
375 tags: Vec<String>,
376 ) -> usize {
377 let key = key.into();
378 self.send(
379 format!("results.{}", &key),
380 BusMessage::SharedResult { key, value, tags },
381 )
382 }
383
384 pub fn publish_ralph_learning(
389 &self,
390 prd_id: &str,
391 story_id: &str,
392 iteration: usize,
393 learnings: Vec<String>,
394 context: serde_json::Value,
395 ) -> usize {
396 self.send(
397 format!("ralph.{prd_id}"),
398 BusMessage::RalphLearning {
399 prd_id: prd_id.to_string(),
400 story_id: story_id.to_string(),
401 iteration,
402 learnings,
403 context,
404 },
405 )
406 }
407
408 pub fn publish_ralph_handoff(
410 &self,
411 prd_id: &str,
412 from_story: &str,
413 to_story: &str,
414 context: serde_json::Value,
415 progress_summary: &str,
416 ) -> usize {
417 self.send(
418 format!("ralph.{prd_id}"),
419 BusMessage::RalphHandoff {
420 prd_id: prd_id.to_string(),
421 from_story: from_story.to_string(),
422 to_story: to_story.to_string(),
423 context,
424 progress_summary: progress_summary.to_string(),
425 },
426 )
427 }
428
429 pub fn publish_ralph_progress(
431 &self,
432 prd_id: &str,
433 passed: usize,
434 total: usize,
435 iteration: usize,
436 status: &str,
437 ) -> usize {
438 self.send(
439 format!("ralph.{prd_id}"),
440 BusMessage::RalphProgress {
441 prd_id: prd_id.to_string(),
442 passed,
443 total,
444 iteration,
445 status: status.to_string(),
446 },
447 )
448 }
449
450 pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
452 let prefix = format!("ralph.{prd_id}");
453 let mut out = Vec::new();
454 while let Some(env) = self.try_recv() {
455 if env.topic.starts_with(&prefix)
456 && matches!(
457 &env.message,
458 BusMessage::RalphLearning { .. } | BusMessage::RalphHandoff { .. }
459 )
460 {
461 out.push(env);
462 }
463 }
464 out
465 }
466
467 pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
471 self.send(
472 format!("voice.{room_name}"),
473 BusMessage::VoiceSessionStarted {
474 room_name: room_name.to_string(),
475 agent_id: self.agent_id.clone(),
476 voice_id: voice_id.to_string(),
477 },
478 )
479 }
480
481 pub fn send_voice_transcript(
483 &self,
484 room_name: &str,
485 text: &str,
486 role: &str,
487 is_final: bool,
488 ) -> usize {
489 self.send(
490 format!("voice.{room_name}"),
491 BusMessage::VoiceTranscript {
492 room_name: room_name.to_string(),
493 text: text.to_string(),
494 role: role.to_string(),
495 is_final,
496 },
497 )
498 }
499
500 pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
502 self.send(
503 format!("voice.{room_name}"),
504 BusMessage::VoiceAgentStateChanged {
505 room_name: room_name.to_string(),
506 state: state.to_string(),
507 },
508 )
509 }
510
511 pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
513 self.send(
514 format!("voice.{room_name}"),
515 BusMessage::VoiceSessionEnded {
516 room_name: room_name.to_string(),
517 reason: reason.to_string(),
518 },
519 )
520 }
521
522 pub async fn recv(&mut self) -> Option<BusEnvelope> {
524 loop {
525 match self.rx.recv().await {
526 Ok(env) => return Some(env),
527 Err(broadcast::error::RecvError::Lagged(n)) => {
528 tracing::warn!(
529 agent_id = %self.agent_id,
530 skipped = n,
531 "Bus handle lagged, skipping messages"
532 );
533 continue;
534 }
535 Err(broadcast::error::RecvError::Closed) => return None,
536 }
537 }
538 }
539
540 pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
542 loop {
543 match self.recv().await {
544 Some(env) if env.topic.starts_with(prefix) => return Some(env),
545 Some(_) => continue, None => return None,
547 }
548 }
549 }
550
551 pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
553 let prefix = format!("agent.{}", self.agent_id);
554 self.recv_topic(&prefix).await
555 }
556
557 pub fn try_recv(&mut self) -> Option<BusEnvelope> {
560 loop {
561 match self.rx.try_recv() {
562 Ok(env) => return Some(env),
563 Err(broadcast::error::TryRecvError::Lagged(n)) => {
564 tracing::warn!(
565 agent_id = %self.agent_id,
566 skipped = n,
567 "Bus handle lagged (try_recv), skipping"
568 );
569 continue;
570 }
571 Err(broadcast::error::TryRecvError::Empty)
572 | Err(broadcast::error::TryRecvError::Closed) => return None,
573 }
574 }
575 }
576
577 pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
579 &self.bus.registry
580 }
581
582 pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
585 self.rx
586 }
587}
588
589#[cfg(test)]
592mod tests {
593 use super::*;
594
595 #[tokio::test]
596 async fn test_bus_send_recv() {
597 let bus = AgentBus::new().into_arc();
598 let mut handle_a = bus.handle("agent-a");
599 let mut handle_b = bus.handle("agent-b");
600
601 handle_a.send_to_agent(
602 "agent-b",
603 vec![Part::Text {
604 text: "hello".into(),
605 }],
606 );
607
608 let env = handle_b.recv().await.unwrap();
610 assert_eq!(env.topic, "agent.agent-b");
611 match &env.message {
612 BusMessage::AgentMessage { from, to, .. } => {
613 assert_eq!(from, "agent-a");
614 assert_eq!(to, "agent-b");
615 }
616 other => panic!("unexpected message: {other:?}"),
617 }
618
619 let env_a = handle_a.try_recv().unwrap();
621 assert_eq!(env_a.topic, "agent.agent-b");
622 }
623
624 #[tokio::test]
625 async fn test_bus_task_update() {
626 let bus = AgentBus::new().into_arc();
627 let handle = bus.handle("worker-1");
628
629 let h2 = bus.handle("observer");
630 let mut h2 = h2;
632
633 handle.send_task_update("task-42", TaskState::Working, Some("processing".into()));
634
635 let env = h2.recv().await.unwrap();
636 assert_eq!(env.topic, "task.task-42");
637 match &env.message {
638 BusMessage::TaskUpdate { task_id, state, .. } => {
639 assert_eq!(task_id, "task-42");
640 assert_eq!(*state, TaskState::Working);
641 }
642 other => panic!("unexpected: {other:?}"),
643 }
644 }
645
646 #[tokio::test]
647 async fn test_bus_no_receivers() {
648 let bus = AgentBus::new().into_arc();
649 let env = BusEnvelope {
651 id: "test".into(),
652 topic: "broadcast".into(),
653 sender_id: "nobody".into(),
654 correlation_id: None,
655 timestamp: Utc::now(),
656 message: BusMessage::Heartbeat {
657 agent_id: "nobody".into(),
658 status: "ok".into(),
659 },
660 };
661 let count = bus.publish(env);
662 assert_eq!(count, 0);
663 }
664
665 #[tokio::test]
666 async fn test_recv_topic_filter() {
667 let bus = AgentBus::new().into_arc();
668 let handle = bus.handle("agent-x");
669 let mut listener = bus.handle("listener");
670
671 handle.send(
673 "task.1",
674 BusMessage::TaskUpdate {
675 task_id: "1".into(),
676 state: TaskState::Working,
677 message: None,
678 },
679 );
680 handle.send(
681 "task.2",
682 BusMessage::TaskUpdate {
683 task_id: "2".into(),
684 state: TaskState::Completed,
685 message: None,
686 },
687 );
688
689 let env = listener.recv_topic("task.2").await.unwrap();
691 match &env.message {
692 BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
693 other => panic!("unexpected: {other:?}"),
694 }
695 }
696
697 #[tokio::test]
698 async fn test_ready_shutdown_syncs_registry() {
699 let bus = AgentBus::new().into_arc();
700 let handle = bus.handle("planner-1");
701
702 handle.announce_ready(vec!["plan".to_string(), "review".to_string()]);
703 assert!(bus.registry.get("planner-1").is_some());
704
705 handle.announce_shutdown();
706 assert!(bus.registry.get("planner-1").is_none());
707 }
708}