1pub mod global; pub mod payload;
21pub mod registry;
22pub mod relay;
23pub mod s3_sink;
24
25pub use global::{global, set_global};
26
27use crate::a2a::types::{Artifact, Part, TaskState};
28use chrono::{DateTime, Utc};
29use serde::{Deserialize, Serialize};
30use std::sync::Arc;
31use tokio::sync::broadcast;
32use uuid::Uuid;
33
/// A single message travelling over the [`AgentBus`], together with its routing metadata.
///
/// Envelopes built by [`BusHandle::send_with_correlation`] get a fresh UUID,
/// the sending handle's agent id and the current UTC time; callers of
/// [`AgentBus::publish`] may also construct one by hand.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusEnvelope {
    /// Identifier of this envelope (a UUID v4 string when created through a [`BusHandle`]).
    pub id: String,
    /// Routing topic. Topics used in this file are `"broadcast"`, `task.{task_id}`,
    /// `agent.{agent_id}`, `results.{key}`, `ralph.{prd_id}` and `voice.{room_name}`.
    pub topic: String,
    /// Id of the agent whose handle sent the envelope.
    pub sender_id: String,
    /// Optional caller-supplied id; presumably ties a reply to the request it
    /// answers (nothing in this file reads it — confirm against callers).
    pub correlation_id: Option<String>,
    /// Creation time of the envelope, in UTC.
    pub timestamp: DateTime<Utc>,
    /// The payload being delivered.
    pub message: BusMessage,
}
52
/// Payload carried inside a [`BusEnvelope`].
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `kind` field and rendered in `snake_case` (for example `agent_ready`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum BusMessage {
    /// An agent announces that it is ready. [`AgentBus::publish`] registers the
    /// agent and its capabilities in the bus registry when it sees this variant.
    AgentReady {
        /// Id of the agent that became ready.
        agent_id: String,
        /// Capability names advertised by the agent.
        capabilities: Vec<String>,
    },
    /// An agent announces that it is shutting down. [`AgentBus::publish`]
    /// removes the agent from the bus registry when it sees this variant.
    AgentShutdown { agent_id: String },
    /// A message addressed from one agent to another
    /// (sent by [`BusHandle::send_to_agent`] on topic `agent.{to}`).
    AgentMessage {
        /// Id of the sending agent.
        from: String,
        /// Id of the receiving agent.
        to: String,
        /// Content parts of the message.
        parts: Vec<Part>,
    },
    /// A task changed state (sent by [`BusHandle::send_task_update`] on topic `task.{task_id}`).
    TaskUpdate {
        /// Id of the task being reported on.
        task_id: String,
        /// The state being reported.
        state: TaskState,
        /// Optional free-form text accompanying the update.
        message: Option<String>,
    },
    /// A task produced or updated an artifact
    /// (sent by [`BusHandle::send_artifact_update`] on topic `task.{task_id}`).
    ArtifactUpdate { task_id: String, artifact: Artifact },
    /// A keyed result shared with other agents
    /// (sent by [`BusHandle::publish_shared_result`] on topic `results.{key}`).
    SharedResult {
        /// Key under which the result is published.
        key: String,
        /// The result value as arbitrary JSON.
        value: serde_json::Value,
        /// Tags attached to the result.
        tags: Vec<String>,
    },
    /// A tool invocation requested by an agent.
    /// NOTE(review): no sender is visible in this file; field meanings below
    /// are read off their names — confirm against the producer.
    ToolRequest {
        /// Presumably pairs this request with its `ToolResponse`.
        request_id: String,
        /// Agent issuing the request.
        agent_id: String,
        /// Name of the tool to invoke.
        tool_name: String,
        /// Tool arguments as arbitrary JSON.
        arguments: serde_json::Value,
        /// Step counter of the requesting agent (presumably its loop iteration).
        step: usize,
    },
    /// The outcome of a tool invocation.
    /// NOTE(review): no sender is visible in this file — confirm field semantics.
    ToolResponse {
        /// Presumably the `request_id` of the matching `ToolRequest`.
        request_id: String,
        /// Agent the response belongs to.
        agent_id: String,
        /// Name of the tool that ran.
        tool_name: String,
        /// Textual result of the tool.
        result: String,
        /// Whether the tool call succeeded.
        success: bool,
        /// Step counter of the agent (presumably its loop iteration).
        step: usize,
    },
    /// A liveness signal from an agent carrying a free-form status string.
    Heartbeat { agent_id: String, status: String },

    /// Learnings recorded for one story of a PRD
    /// (sent by [`BusHandle::publish_ralph_learning`] on topic `ralph.{prd_id}`).
    RalphLearning {
        /// Id of the PRD the learnings belong to.
        prd_id: String,
        /// Id of the story the learnings were gathered from.
        story_id: String,
        /// Iteration number reported by the sender.
        iteration: usize,
        /// The learnings themselves, one entry per item.
        learnings: Vec<String>,
        /// Additional context as arbitrary JSON.
        context: serde_json::Value,
    },

    /// Context handed from one story to the next within a PRD
    /// (sent by [`BusHandle::publish_ralph_handoff`] on topic `ralph.{prd_id}`).
    RalphHandoff {
        /// Id of the PRD both stories belong to.
        prd_id: String,
        /// Story handing over.
        from_story: String,
        /// Story taking over.
        to_story: String,
        /// Context passed along as arbitrary JSON.
        context: serde_json::Value,
        /// Human-readable summary of progress so far.
        progress_summary: String,
    },

    /// A progress report for a PRD
    /// (sent by [`BusHandle::publish_ralph_progress`] on topic `ralph.{prd_id}`).
    RalphProgress {
        /// Id of the PRD being reported on.
        prd_id: String,
        /// Count reported as passed (presumably stories — confirm).
        passed: usize,
        /// Total count the `passed` figure is measured against.
        total: usize,
        /// Iteration number reported by the sender.
        iteration: usize,
        /// Free-form status string.
        status: String,
    },

    /// The output of a tool call.
    /// NOTE(review): the name suggests this carries the untruncated output,
    /// but no producer is visible in this file — confirm.
    ToolOutputFull {
        /// Agent that ran the tool.
        agent_id: String,
        /// Name of the tool that ran.
        tool_name: String,
        /// The tool output text.
        output: String,
        /// Whether the tool call succeeded.
        success: bool,
        /// Step counter of the agent (presumably its loop iteration).
        step: usize,
    },

    /// Reasoning text emitted by an agent.
    /// NOTE(review): no producer is visible in this file — confirm semantics.
    AgentThinking {
        /// Agent that produced the text.
        agent_id: String,
        /// The reasoning text.
        thinking: String,
        /// Step counter of the agent (presumably its loop iteration).
        step: usize,
    },

    /// A voice session started in a room
    /// (sent by [`BusHandle::send_voice_session_started`] on topic `voice.{room_name}`).
    VoiceSessionStarted {
        /// Name of the room hosting the session.
        room_name: String,
        /// Agent that started the session (the sending handle's agent id).
        agent_id: String,
        /// Identifier of the voice in use.
        voice_id: String,
    },

    /// A piece of transcript from a voice session
    /// (sent by [`BusHandle::send_voice_transcript`] on topic `voice.{room_name}`).
    VoiceTranscript {
        /// Name of the room hosting the session.
        room_name: String,
        /// The transcribed text.
        text: String,
        /// Speaker role as a free-form string (allowed values are not defined here).
        role: String,
        /// Whether this transcript segment is final (as opposed to, presumably, interim).
        is_final: bool,
    },

    /// The voice agent in a room changed state
    /// (sent by [`BusHandle::send_voice_agent_state`] on topic `voice.{room_name}`).
    VoiceAgentStateChanged { room_name: String, state: String },

    /// A voice session ended
    /// (sent by [`BusHandle::send_voice_session_ended`] on topic `voice.{room_name}`).
    VoiceSessionEnded { room_name: String, reason: String },
}
179
/// Channel capacity, in envelopes, used by [`AgentBus::new`].
const DEFAULT_BUS_CAPACITY: usize = 4096;
184
/// Message bus shared by all agents: a tokio broadcast channel of
/// [`BusEnvelope`]s plus a registry of the agents that announced themselves.
///
/// Agents do not use the bus directly; they obtain a [`BusHandle`] via
/// [`AgentBus::handle`].
pub struct AgentBus {
    /// Sending half of the broadcast channel; each handle subscribes to it.
    tx: broadcast::Sender<BusEnvelope>,
    /// Registry kept in sync by [`AgentBus::publish`] on
    /// `AgentReady` / `AgentShutdown` messages.
    pub registry: Arc<registry::AgentRegistry>,
}
197
198impl std::fmt::Debug for AgentBus {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("AgentBus")
201 .field("subscribers", &self.tx.receiver_count())
202 .finish()
203 }
204}
205
206impl AgentBus {
207 pub fn new() -> Self {
209 Self::with_capacity(DEFAULT_BUS_CAPACITY)
210 }
211
212 pub fn with_capacity(capacity: usize) -> Self {
214 let (tx, _) = broadcast::channel(capacity);
215 Self {
216 tx,
217 registry: Arc::new(registry::AgentRegistry::new()),
218 }
219 }
220
221 pub fn into_arc(self) -> Arc<Self> {
223 Arc::new(self)
224 }
225
226 pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
228 BusHandle {
229 agent_id: agent_id.into(),
230 bus: Arc::clone(self),
231 rx: self.tx.subscribe(),
232 }
233 }
234
235 pub fn publish(&self, envelope: BusEnvelope) -> usize {
237 match &envelope.message {
238 BusMessage::AgentReady {
239 agent_id,
240 capabilities,
241 } => {
242 self.registry.register_ready(agent_id, capabilities);
243 }
244 BusMessage::AgentShutdown { agent_id } => {
245 self.registry.deregister(agent_id);
246 }
247 _ => {}
248 }
249
250 self.tx.send(envelope).unwrap_or(0)
253 }
254
255 pub fn receiver_count(&self) -> usize {
257 self.tx.receiver_count()
258 }
259}
260
261impl Default for AgentBus {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
/// An agent's connection to the [`AgentBus`], created by [`AgentBus::handle`].
///
/// Sending goes through the shared bus; receiving uses the handle's own
/// broadcast subscription, which is why the receive methods take `&mut self`.
pub struct BusHandle {
    /// Id of the owning agent; used as `sender_id` on outgoing envelopes.
    agent_id: String,
    /// The bus this handle publishes to.
    bus: Arc<AgentBus>,
    /// This handle's subscription to the bus channel.
    rx: broadcast::Receiver<BusEnvelope>,
}
278
279impl BusHandle {
280 pub fn agent_id(&self) -> &str {
282 &self.agent_id
283 }
284
285 pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
287 self.send_with_correlation(topic, message, None)
288 }
289
290 pub fn send_with_correlation(
292 &self,
293 topic: impl Into<String>,
294 message: BusMessage,
295 correlation_id: Option<String>,
296 ) -> usize {
297 let envelope = BusEnvelope {
298 id: Uuid::new_v4().to_string(),
299 topic: topic.into(),
300 sender_id: self.agent_id.clone(),
301 correlation_id,
302 timestamp: Utc::now(),
303 message,
304 };
305 self.bus.publish(envelope)
306 }
307
308 pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
310 self.send(
311 "broadcast",
312 BusMessage::AgentReady {
313 agent_id: self.agent_id.clone(),
314 capabilities,
315 },
316 )
317 }
318
319 pub fn announce_shutdown(&self) -> usize {
321 self.send(
322 "broadcast",
323 BusMessage::AgentShutdown {
324 agent_id: self.agent_id.clone(),
325 },
326 )
327 }
328
329 pub fn send_task_update(
331 &self,
332 task_id: &str,
333 state: TaskState,
334 message: Option<String>,
335 ) -> usize {
336 self.send(
337 format!("task.{task_id}"),
338 BusMessage::TaskUpdate {
339 task_id: task_id.to_string(),
340 state,
341 message,
342 },
343 )
344 }
345
346 pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
348 self.send(
349 format!("task.{task_id}"),
350 BusMessage::ArtifactUpdate {
351 task_id: task_id.to_string(),
352 artifact,
353 },
354 )
355 }
356
357 pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
359 self.send(
360 format!("agent.{to}"),
361 BusMessage::AgentMessage {
362 from: self.agent_id.clone(),
363 to: to.to_string(),
364 parts,
365 },
366 )
367 }
368
369 pub fn publish_shared_result(
371 &self,
372 key: impl Into<String>,
373 value: serde_json::Value,
374 tags: Vec<String>,
375 ) -> usize {
376 let key = key.into();
377 self.send(
378 format!("results.{}", &key),
379 BusMessage::SharedResult { key, value, tags },
380 )
381 }
382
383 pub fn publish_ralph_learning(
388 &self,
389 prd_id: &str,
390 story_id: &str,
391 iteration: usize,
392 learnings: Vec<String>,
393 context: serde_json::Value,
394 ) -> usize {
395 self.send(
396 format!("ralph.{prd_id}"),
397 BusMessage::RalphLearning {
398 prd_id: prd_id.to_string(),
399 story_id: story_id.to_string(),
400 iteration,
401 learnings,
402 context,
403 },
404 )
405 }
406
407 pub fn publish_ralph_handoff(
409 &self,
410 prd_id: &str,
411 from_story: &str,
412 to_story: &str,
413 context: serde_json::Value,
414 progress_summary: &str,
415 ) -> usize {
416 self.send(
417 format!("ralph.{prd_id}"),
418 BusMessage::RalphHandoff {
419 prd_id: prd_id.to_string(),
420 from_story: from_story.to_string(),
421 to_story: to_story.to_string(),
422 context,
423 progress_summary: progress_summary.to_string(),
424 },
425 )
426 }
427
428 pub fn publish_ralph_progress(
430 &self,
431 prd_id: &str,
432 passed: usize,
433 total: usize,
434 iteration: usize,
435 status: &str,
436 ) -> usize {
437 self.send(
438 format!("ralph.{prd_id}"),
439 BusMessage::RalphProgress {
440 prd_id: prd_id.to_string(),
441 passed,
442 total,
443 iteration,
444 status: status.to_string(),
445 },
446 )
447 }
448
449 pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
451 let prefix = format!("ralph.{prd_id}");
452 let mut out = Vec::new();
453 while let Some(env) = self.try_recv() {
454 if env.topic.starts_with(&prefix)
455 && matches!(
456 &env.message,
457 BusMessage::RalphLearning { .. } | BusMessage::RalphHandoff { .. }
458 )
459 {
460 out.push(env);
461 }
462 }
463 out
464 }
465
466 pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
470 self.send(
471 format!("voice.{room_name}"),
472 BusMessage::VoiceSessionStarted {
473 room_name: room_name.to_string(),
474 agent_id: self.agent_id.clone(),
475 voice_id: voice_id.to_string(),
476 },
477 )
478 }
479
480 pub fn send_voice_transcript(
482 &self,
483 room_name: &str,
484 text: &str,
485 role: &str,
486 is_final: bool,
487 ) -> usize {
488 self.send(
489 format!("voice.{room_name}"),
490 BusMessage::VoiceTranscript {
491 room_name: room_name.to_string(),
492 text: text.to_string(),
493 role: role.to_string(),
494 is_final,
495 },
496 )
497 }
498
499 pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
501 self.send(
502 format!("voice.{room_name}"),
503 BusMessage::VoiceAgentStateChanged {
504 room_name: room_name.to_string(),
505 state: state.to_string(),
506 },
507 )
508 }
509
510 pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
512 self.send(
513 format!("voice.{room_name}"),
514 BusMessage::VoiceSessionEnded {
515 room_name: room_name.to_string(),
516 reason: reason.to_string(),
517 },
518 )
519 }
520
521 pub async fn recv(&mut self) -> Option<BusEnvelope> {
523 loop {
524 match self.rx.recv().await {
525 Ok(env) => return Some(env),
526 Err(broadcast::error::RecvError::Lagged(n)) => {
527 tracing::warn!(
528 agent_id = %self.agent_id,
529 skipped = n,
530 "Bus handle lagged, skipping messages"
531 );
532 continue;
533 }
534 Err(broadcast::error::RecvError::Closed) => return None,
535 }
536 }
537 }
538
539 pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
541 loop {
542 match self.recv().await {
543 Some(env) if env.topic.starts_with(prefix) => return Some(env),
544 Some(_) => continue, None => return None,
546 }
547 }
548 }
549
550 pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
552 let prefix = format!("agent.{}", self.agent_id);
553 self.recv_topic(&prefix).await
554 }
555
556 pub fn try_recv(&mut self) -> Option<BusEnvelope> {
559 loop {
560 match self.rx.try_recv() {
561 Ok(env) => return Some(env),
562 Err(broadcast::error::TryRecvError::Lagged(n)) => {
563 tracing::warn!(
564 agent_id = %self.agent_id,
565 skipped = n,
566 "Bus handle lagged (try_recv), skipping"
567 );
568 continue;
569 }
570 Err(broadcast::error::TryRecvError::Empty)
571 | Err(broadcast::error::TryRecvError::Closed) => return None,
572 }
573 }
574 }
575
576 pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
578 &self.bus.registry
579 }
580
581 pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
584 self.rx
585 }
586}
587
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TaskUpdate` payload that carries no status text.
    fn bare_task_update(task_id: &str, state: TaskState) -> BusMessage {
        BusMessage::TaskUpdate {
            task_id: task_id.into(),
            state,
            message: None,
        }
    }

    /// A direct message reaches the addressee, and the sender's own
    /// subscription sees it as well.
    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = AgentBus::new().into_arc();
        let mut handle_a = bus.handle("agent-a");
        let mut handle_b = bus.handle("agent-b");

        let greeting = Part::Text {
            text: "hello".into(),
        };
        handle_a.send_to_agent("agent-b", vec![greeting]);

        let received = handle_b.recv().await.unwrap();
        assert_eq!(received.topic, "agent.agent-b");
        match &received.message {
            BusMessage::AgentMessage { from, to, .. } => {
                assert_eq!(from, "agent-a");
                assert_eq!(to, "agent-b");
            }
            other => panic!("unexpected message: {other:?}"),
        }

        // Every handle is a subscriber, including the one that sent the message.
        let echoed = handle_a.try_recv().unwrap();
        assert_eq!(echoed.topic, "agent.agent-b");
    }

    /// A task update sent by one handle is observed by another on `task.{id}`.
    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = AgentBus::new().into_arc();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let seen = observer.recv().await.unwrap();
        assert_eq!(seen.topic, "task.task-42");
        match &seen.message {
            BusMessage::TaskUpdate { task_id, state, .. } => {
                assert_eq!(task_id, "task-42");
                assert_eq!(*state, TaskState::Working);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Publishing on a bus without any handle reports zero receivers.
    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = AgentBus::new().into_arc();
        let heartbeat = BusMessage::Heartbeat {
            agent_id: "nobody".into(),
            status: "ok".into(),
        };
        let envelope = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: heartbeat,
        };

        assert_eq!(bus.publish(envelope), 0);
    }

    /// `recv_topic` skips envelopes on other topics and returns the first match.
    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = AgentBus::new().into_arc();
        let publisher = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        publisher.send("task.1", bare_task_update("1", TaskState::Working));
        publisher.send("task.2", bare_task_update("2", TaskState::Completed));

        // The "task.1" envelope is queued first and must be filtered out.
        let matched = listener.recv_topic("task.2").await.unwrap();
        match &matched.message {
            BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Ready and shutdown announcements add and remove the registry entry.
    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = AgentBus::new().into_arc();
        let planner = bus.handle("planner-1");

        let capabilities = vec!["plan".to_string(), "review".to_string()];
        planner.announce_ready(capabilities);
        assert!(bus.registry.get("planner-1").is_some());

        planner.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}