1pub mod registry;
22pub mod relay;
23pub mod s3_sink;
24
25use crate::a2a::types::{Artifact, Part, TaskState};
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28use std::sync::Arc;
29use tokio::sync::broadcast;
30use uuid::Uuid;
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct BusEnvelope {
37 pub id: String,
39 pub topic: String,
41 pub sender_id: String,
43 pub correlation_id: Option<String>,
45 pub timestamp: DateTime<Utc>,
47 pub message: BusMessage,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(tag = "kind", rename_all = "snake_case")]
54pub enum BusMessage {
55 AgentReady {
57 agent_id: String,
58 capabilities: Vec<String>,
59 },
60 AgentShutdown { agent_id: String },
62 AgentMessage {
64 from: String,
65 to: String,
66 parts: Vec<Part>,
67 },
68 TaskUpdate {
70 task_id: String,
71 state: TaskState,
72 message: Option<String>,
73 },
74 ArtifactUpdate { task_id: String, artifact: Artifact },
76 SharedResult {
78 key: String,
79 value: serde_json::Value,
80 tags: Vec<String>,
81 },
82 ToolRequest {
84 request_id: String,
85 agent_id: String,
86 tool_name: String,
87 arguments: serde_json::Value,
88 },
89 ToolResponse {
91 request_id: String,
92 agent_id: String,
93 tool_name: String,
94 result: String,
95 success: bool,
96 },
97 Heartbeat { agent_id: String, status: String },
99
100 RalphLearning {
105 prd_id: String,
106 story_id: String,
107 iteration: usize,
108 learnings: Vec<String>,
109 context: serde_json::Value,
110 },
111
112 RalphHandoff {
115 prd_id: String,
116 from_story: String,
117 to_story: String,
118 context: serde_json::Value,
119 progress_summary: String,
120 },
121
122 RalphProgress {
124 prd_id: String,
125 passed: usize,
126 total: usize,
127 iteration: usize,
128 status: String,
129 },
130
131 ToolOutputFull {
136 agent_id: String,
137 tool_name: String,
138 output: String,
139 success: bool,
140 step: usize,
141 },
142
143 AgentThinking {
147 agent_id: String,
148 thinking: String,
149 step: usize,
150 },
151
152 VoiceSessionStarted {
155 room_name: String,
156 agent_id: String,
157 voice_id: String,
158 },
159
160 VoiceTranscript {
162 room_name: String,
163 text: String,
164 role: String,
166 is_final: bool,
167 },
168
169 VoiceAgentStateChanged { room_name: String, state: String },
171
172 VoiceSessionEnded { room_name: String, reason: String },
174}
175
/// Capacity of the broadcast channel created by `AgentBus::new`, counted in
/// envelopes.
const DEFAULT_BUS_CAPACITY: usize = 4096;
180
181pub struct AgentBus {
189 tx: broadcast::Sender<BusEnvelope>,
190 pub registry: Arc<registry::AgentRegistry>,
192}
193
194impl std::fmt::Debug for AgentBus {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 f.debug_struct("AgentBus")
197 .field("subscribers", &self.tx.receiver_count())
198 .finish()
199 }
200}
201
202impl AgentBus {
203 pub fn new() -> Self {
205 Self::with_capacity(DEFAULT_BUS_CAPACITY)
206 }
207
208 pub fn with_capacity(capacity: usize) -> Self {
210 let (tx, _) = broadcast::channel(capacity);
211 Self {
212 tx,
213 registry: Arc::new(registry::AgentRegistry::new()),
214 }
215 }
216
217 pub fn into_arc(self) -> Arc<Self> {
219 Arc::new(self)
220 }
221
222 pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
224 BusHandle {
225 agent_id: agent_id.into(),
226 bus: Arc::clone(self),
227 rx: self.tx.subscribe(),
228 }
229 }
230
231 pub fn publish(&self, envelope: BusEnvelope) -> usize {
233 match &envelope.message {
234 BusMessage::AgentReady {
235 agent_id,
236 capabilities,
237 } => {
238 self.registry.register_ready(agent_id, capabilities);
239 }
240 BusMessage::AgentShutdown { agent_id } => {
241 self.registry.deregister(agent_id);
242 }
243 _ => {}
244 }
245
246 self.tx.send(envelope).unwrap_or(0)
249 }
250
251 pub fn receiver_count(&self) -> usize {
253 self.tx.receiver_count()
254 }
255}
256
257impl Default for AgentBus {
258 fn default() -> Self {
259 Self::new()
260 }
261}
262
263pub struct BusHandle {
270 agent_id: String,
271 bus: Arc<AgentBus>,
272 rx: broadcast::Receiver<BusEnvelope>,
273}
274
275impl BusHandle {
276 pub fn agent_id(&self) -> &str {
278 &self.agent_id
279 }
280
281 pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
283 self.send_with_correlation(topic, message, None)
284 }
285
286 pub fn send_with_correlation(
288 &self,
289 topic: impl Into<String>,
290 message: BusMessage,
291 correlation_id: Option<String>,
292 ) -> usize {
293 let envelope = BusEnvelope {
294 id: Uuid::new_v4().to_string(),
295 topic: topic.into(),
296 sender_id: self.agent_id.clone(),
297 correlation_id,
298 timestamp: Utc::now(),
299 message,
300 };
301 self.bus.publish(envelope)
302 }
303
304 pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
306 self.send(
307 "broadcast",
308 BusMessage::AgentReady {
309 agent_id: self.agent_id.clone(),
310 capabilities,
311 },
312 )
313 }
314
315 pub fn announce_shutdown(&self) -> usize {
317 self.send(
318 "broadcast",
319 BusMessage::AgentShutdown {
320 agent_id: self.agent_id.clone(),
321 },
322 )
323 }
324
325 pub fn send_task_update(
327 &self,
328 task_id: &str,
329 state: TaskState,
330 message: Option<String>,
331 ) -> usize {
332 self.send(
333 format!("task.{task_id}"),
334 BusMessage::TaskUpdate {
335 task_id: task_id.to_string(),
336 state,
337 message,
338 },
339 )
340 }
341
342 pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
344 self.send(
345 format!("task.{task_id}"),
346 BusMessage::ArtifactUpdate {
347 task_id: task_id.to_string(),
348 artifact,
349 },
350 )
351 }
352
353 pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
355 self.send(
356 format!("agent.{to}"),
357 BusMessage::AgentMessage {
358 from: self.agent_id.clone(),
359 to: to.to_string(),
360 parts,
361 },
362 )
363 }
364
365 pub fn publish_shared_result(
367 &self,
368 key: impl Into<String>,
369 value: serde_json::Value,
370 tags: Vec<String>,
371 ) -> usize {
372 let key = key.into();
373 self.send(
374 format!("results.{}", &key),
375 BusMessage::SharedResult { key, value, tags },
376 )
377 }
378
379 pub fn publish_ralph_learning(
384 &self,
385 prd_id: &str,
386 story_id: &str,
387 iteration: usize,
388 learnings: Vec<String>,
389 context: serde_json::Value,
390 ) -> usize {
391 self.send(
392 format!("ralph.{prd_id}"),
393 BusMessage::RalphLearning {
394 prd_id: prd_id.to_string(),
395 story_id: story_id.to_string(),
396 iteration,
397 learnings,
398 context,
399 },
400 )
401 }
402
403 pub fn publish_ralph_handoff(
405 &self,
406 prd_id: &str,
407 from_story: &str,
408 to_story: &str,
409 context: serde_json::Value,
410 progress_summary: &str,
411 ) -> usize {
412 self.send(
413 format!("ralph.{prd_id}"),
414 BusMessage::RalphHandoff {
415 prd_id: prd_id.to_string(),
416 from_story: from_story.to_string(),
417 to_story: to_story.to_string(),
418 context,
419 progress_summary: progress_summary.to_string(),
420 },
421 )
422 }
423
424 pub fn publish_ralph_progress(
426 &self,
427 prd_id: &str,
428 passed: usize,
429 total: usize,
430 iteration: usize,
431 status: &str,
432 ) -> usize {
433 self.send(
434 format!("ralph.{prd_id}"),
435 BusMessage::RalphProgress {
436 prd_id: prd_id.to_string(),
437 passed,
438 total,
439 iteration,
440 status: status.to_string(),
441 },
442 )
443 }
444
445 pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
447 let prefix = format!("ralph.{prd_id}");
448 let mut out = Vec::new();
449 while let Some(env) = self.try_recv() {
450 if env.topic.starts_with(&prefix) {
451 if matches!(
452 &env.message,
453 BusMessage::RalphLearning { .. } | BusMessage::RalphHandoff { .. }
454 ) {
455 out.push(env);
456 }
457 }
458 }
459 out
460 }
461
462 pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
466 self.send(
467 format!("voice.{room_name}"),
468 BusMessage::VoiceSessionStarted {
469 room_name: room_name.to_string(),
470 agent_id: self.agent_id.clone(),
471 voice_id: voice_id.to_string(),
472 },
473 )
474 }
475
476 pub fn send_voice_transcript(
478 &self,
479 room_name: &str,
480 text: &str,
481 role: &str,
482 is_final: bool,
483 ) -> usize {
484 self.send(
485 format!("voice.{room_name}"),
486 BusMessage::VoiceTranscript {
487 room_name: room_name.to_string(),
488 text: text.to_string(),
489 role: role.to_string(),
490 is_final,
491 },
492 )
493 }
494
495 pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
497 self.send(
498 format!("voice.{room_name}"),
499 BusMessage::VoiceAgentStateChanged {
500 room_name: room_name.to_string(),
501 state: state.to_string(),
502 },
503 )
504 }
505
506 pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
508 self.send(
509 format!("voice.{room_name}"),
510 BusMessage::VoiceSessionEnded {
511 room_name: room_name.to_string(),
512 reason: reason.to_string(),
513 },
514 )
515 }
516
517 pub async fn recv(&mut self) -> Option<BusEnvelope> {
519 loop {
520 match self.rx.recv().await {
521 Ok(env) => return Some(env),
522 Err(broadcast::error::RecvError::Lagged(n)) => {
523 tracing::warn!(
524 agent_id = %self.agent_id,
525 skipped = n,
526 "Bus handle lagged, skipping messages"
527 );
528 continue;
529 }
530 Err(broadcast::error::RecvError::Closed) => return None,
531 }
532 }
533 }
534
535 pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
537 loop {
538 match self.recv().await {
539 Some(env) if env.topic.starts_with(prefix) => return Some(env),
540 Some(_) => continue, None => return None,
542 }
543 }
544 }
545
546 pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
548 let prefix = format!("agent.{}", self.agent_id);
549 self.recv_topic(&prefix).await
550 }
551
552 pub fn try_recv(&mut self) -> Option<BusEnvelope> {
555 loop {
556 match self.rx.try_recv() {
557 Ok(env) => return Some(env),
558 Err(broadcast::error::TryRecvError::Lagged(n)) => {
559 tracing::warn!(
560 agent_id = %self.agent_id,
561 skipped = n,
562 "Bus handle lagged (try_recv), skipping"
563 );
564 continue;
565 }
566 Err(broadcast::error::TryRecvError::Empty)
567 | Err(broadcast::error::TryRecvError::Closed) => return None,
568 }
569 }
570 }
571
572 pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
574 &self.bus.registry
575 }
576
577 pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
580 self.rx
581 }
582}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// A direct message reaches the addressee, and the sender's own handle
    /// sees it as well because the channel is a broadcast.
    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = AgentBus::new().into_arc();
        let mut sender = bus.handle("agent-a");
        let mut addressee = bus.handle("agent-b");

        let greeting = vec![Part::Text {
            text: "hello".into(),
        }];
        sender.send_to_agent("agent-b", greeting);

        let delivered = addressee.recv().await.unwrap();
        assert_eq!(delivered.topic, "agent.agent-b");
        match &delivered.message {
            BusMessage::AgentMessage { from, to, .. } => {
                assert_eq!(from, "agent-a");
                assert_eq!(to, "agent-b");
            }
            other => panic!("unexpected message: {other:?}"),
        }

        // The sender's receiver got its own message too.
        let echoed = sender.try_recv().unwrap();
        assert_eq!(echoed.topic, "agent.agent-b");
    }

    /// A task update is published on `task.{id}` and visible to an observer.
    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = AgentBus::new().into_arc();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let seen = observer.recv().await.unwrap();
        assert_eq!(seen.topic, "task.task-42");
        match &seen.message {
            BusMessage::TaskUpdate { task_id, state, .. } => {
                assert_eq!(task_id, "task-42");
                assert_eq!(*state, TaskState::Working);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Publishing with no subscribed receivers reports zero deliveries.
    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = AgentBus::new().into_arc();
        let heartbeat = BusMessage::Heartbeat {
            agent_id: "nobody".into(),
            status: "ok".into(),
        };
        let orphan = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: heartbeat,
        };

        assert_eq!(bus.publish(orphan), 0);
    }

    /// `recv_topic` skips envelopes on other topics.
    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = AgentBus::new().into_arc();
        let publisher = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        let first = BusMessage::TaskUpdate {
            task_id: "1".into(),
            state: TaskState::Working,
            message: None,
        };
        let second = BusMessage::TaskUpdate {
            task_id: "2".into(),
            state: TaskState::Completed,
            message: None,
        };
        publisher.send("task.1", first);
        publisher.send("task.2", second);

        // The envelope on "task.1" is skipped over.
        let picked = listener.recv_topic("task.2").await.unwrap();
        match &picked.message {
            BusMessage::TaskUpdate { task_id, .. } => assert_eq!(task_id, "2"),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Ready/shutdown announcements add and remove the agent in the registry.
    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = AgentBus::new().into_arc();
        let planner = bus.handle("planner-1");

        let capabilities = vec!["plan".to_string(), "review".to_string()];
        planner.announce_ready(capabilities);
        assert!(bus.registry.get("planner-1").is_some());

        planner.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}