pub mod registry;
pub mod relay;
pub mod s3_sink;
use crate::a2a::types::{Artifact, Part, TaskState};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
/// A single message travelling on the [`AgentBus`]: a [`BusMessage`] payload
/// wrapped with routing and tracing metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BusEnvelope {
/// Identifier of this envelope. [`BusHandle::send_with_correlation`] fills it
/// with a freshly generated UUID v4 string.
pub id: String,
/// Topic the envelope was published on, e.g. `"broadcast"`,
/// `"task.{task_id}"` or `"agent.{agent_id}"`.
pub topic: String,
/// Id of the publisher. For envelopes built by a [`BusHandle`] this is the
/// handle's agent id.
pub sender_id: String,
/// Optional id supplied by the sender to relate this envelope to others.
pub correlation_id: Option<String>,
/// UTC time at which the envelope was created.
pub timestamp: DateTime<Utc>,
/// The payload.
pub message: BusMessage,
}
/// Payload carried by a [`BusEnvelope`].
///
/// Serialized by serde as an internally tagged enum: a `kind` field holds the
/// variant name in snake_case (for example `agent_ready`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum BusMessage {
/// An agent announces that it is ready. [`AgentBus::publish`] registers the
/// agent and its capabilities in the bus registry when it sees this variant.
AgentReady {
agent_id: String,
capabilities: Vec<String>,
},
/// An agent announces that it is shutting down. [`AgentBus::publish`]
/// removes the agent from the bus registry when it sees this variant.
AgentShutdown { agent_id: String },
/// Message from one agent to another. [`BusHandle::send_to_agent`]
/// publishes it on the topic `agent.{to}`.
AgentMessage {
from: String,
to: String,
parts: Vec<Part>,
},
/// State change of a task. [`BusHandle::send_task_update`] publishes it on
/// the topic `task.{task_id}`.
TaskUpdate {
task_id: String,
state: TaskState,
message: Option<String>,
},
/// New or updated artifact of a task. [`BusHandle::send_artifact_update`]
/// publishes it on the topic `task.{task_id}`.
ArtifactUpdate { task_id: String, artifact: Artifact },
/// A keyed JSON value with tags. [`BusHandle::publish_shared_result`]
/// publishes it on the topic `results.{key}`.
SharedResult {
key: String,
value: serde_json::Value,
tags: Vec<String>,
},
/// A tool invocation with JSON arguments.
/// NOTE(review): presumably paired with `ToolResponse` through
/// `request_id` — confirm against the producers/consumers.
ToolRequest {
request_id: String,
agent_id: String,
tool_name: String,
arguments: serde_json::Value,
},
/// Result text and success flag of a tool invocation.
ToolResponse {
request_id: String,
agent_id: String,
tool_name: String,
result: String,
success: bool,
},
/// Heartbeat carrying an agent id and a free-form status string.
Heartbeat { agent_id: String, status: String },
/// Learnings recorded for one story of a PRD.
/// [`BusHandle::publish_ralph_learning`] publishes it on `ralph.{prd_id}`.
RalphLearning {
prd_id: String,
story_id: String,
iteration: usize,
learnings: Vec<String>,
context: serde_json::Value,
},
/// Context handed over from one story of a PRD to another.
/// [`BusHandle::publish_ralph_handoff`] publishes it on `ralph.{prd_id}`.
RalphHandoff {
prd_id: String,
from_story: String,
to_story: String,
context: serde_json::Value,
progress_summary: String,
},
/// Progress counters for a PRD.
/// [`BusHandle::publish_ralph_progress`] publishes it on `ralph.{prd_id}`.
RalphProgress {
prd_id: String,
passed: usize,
total: usize,
iteration: usize,
status: String,
},
/// Output of a tool call together with its success flag and step index.
/// NOTE(review): the name suggests the output is untruncated — confirm.
ToolOutputFull {
agent_id: String,
tool_name: String,
output: String,
success: bool,
step: usize,
},
/// Thinking text of an agent for a given step index.
AgentThinking {
agent_id: String,
thinking: String,
step: usize,
},
/// A voice session started. [`BusHandle::send_voice_session_started`]
/// publishes it on the topic `voice.{room_name}`.
VoiceSessionStarted {
room_name: String,
agent_id: String,
voice_id: String,
},
/// A piece of transcript. [`BusHandle::send_voice_transcript`] publishes it
/// on the topic `voice.{room_name}`.
VoiceTranscript {
room_name: String,
text: String,
role: String,
is_final: bool,
},
/// The voice agent changed state. [`BusHandle::send_voice_agent_state`]
/// publishes it on the topic `voice.{room_name}`.
VoiceAgentStateChanged { room_name: String, state: String },
/// A voice session ended. [`BusHandle::send_voice_session_ended`]
/// publishes it on the topic `voice.{room_name}`.
VoiceSessionEnded { room_name: String, reason: String },
}
/// Capacity of the broadcast channel created by [`AgentBus::new`].
const DEFAULT_BUS_CAPACITY: usize = 4096;
/// In-process message bus: a tokio broadcast channel of [`BusEnvelope`]s plus
/// a registry of agents.
pub struct AgentBus {
/// Sending half of the broadcast channel; every [`BusHandle`] subscribes
/// its own receiver to it.
tx: broadcast::Sender<BusEnvelope>,
/// Agent registry, updated by [`AgentBus::publish`] on `AgentReady` and
/// `AgentShutdown` messages.
pub registry: Arc<registry::AgentRegistry>,
}
/// Debug output shows only the current number of subscribed receivers.
impl std::fmt::Debug for AgentBus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let subscribers = self.tx.receiver_count();
        let mut out = f.debug_struct("AgentBus");
        out.field("subscribers", &subscribers);
        out.finish()
    }
}
impl AgentBus {
    /// Creates a bus whose channel holds `DEFAULT_BUS_CAPACITY` envelopes.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_BUS_CAPACITY)
    }

    /// Creates a bus whose broadcast channel holds `capacity` envelopes, with
    /// an empty agent registry.
    ///
    /// # Panics
    ///
    /// tokio's `broadcast::channel` panics when `capacity` is `0` or larger
    /// than `usize::MAX / 2`.
    pub fn with_capacity(capacity: usize) -> Self {
        // The initial receiver is dropped on purpose; handles subscribe later.
        let (tx, _initial_rx) = broadcast::channel(capacity);
        let registry = Arc::new(registry::AgentRegistry::new());
        Self { tx, registry }
    }

    /// Wraps the bus in an `Arc`, which [`AgentBus::handle`] requires.
    pub fn into_arc(self) -> Arc<Self> {
        Arc::new(self)
    }

    /// Creates a handle that publishes as `agent_id` and owns a fresh receiver
    /// subscribed to this bus.
    pub fn handle(self: &Arc<Self>, agent_id: impl Into<String>) -> BusHandle {
        let agent_id = agent_id.into();
        let bus = Arc::clone(self);
        let rx = self.tx.subscribe();
        BusHandle { agent_id, bus, rx }
    }

    /// Publishes `envelope` to all subscribed receivers.
    ///
    /// Before sending, `AgentReady` registers the agent in the registry and
    /// `AgentShutdown` removes it; all other messages leave the registry
    /// untouched.
    ///
    /// Returns the receiver count reported by the channel, or `0` when the
    /// send fails (tokio reports an error when there are no receivers).
    pub fn publish(&self, envelope: BusEnvelope) -> usize {
        if let BusMessage::AgentReady {
            agent_id,
            capabilities,
        } = &envelope.message
        {
            self.registry.register_ready(agent_id, capabilities);
        } else if let BusMessage::AgentShutdown { agent_id } = &envelope.message {
            self.registry.deregister(agent_id);
        }

        match self.tx.send(envelope) {
            Ok(delivered) => delivered,
            Err(_) => 0,
        }
    }

    /// Returns the number of receivers currently subscribed to the bus.
    pub fn receiver_count(&self) -> usize {
        self.tx.receiver_count()
    }
}
impl Default for AgentBus {
fn default() -> Self {
Self::new()
}
}
/// Per-agent endpoint of an [`AgentBus`], created by [`AgentBus::handle`].
///
/// A handle publishes envelopes under its agent id and owns its own receiver,
/// so it also receives the envelopes it publishes itself.
pub struct BusHandle {
/// Id used as `sender_id` of every envelope sent through this handle.
agent_id: String,
/// The bus this handle publishes to.
bus: Arc<AgentBus>,
/// This handle's own subscription to the bus.
rx: broadcast::Receiver<BusEnvelope>,
}
impl BusHandle {
/// Returns the id of the agent this handle publishes as.
pub fn agent_id(&self) -> &str {
    self.agent_id.as_str()
}
/// Publishes `message` on `topic` without a correlation id.
///
/// Returns the receiver count reported by [`AgentBus::publish`].
pub fn send(&self, topic: impl Into<String>, message: BusMessage) -> usize {
    let correlation_id = None;
    self.send_with_correlation(topic, message, correlation_id)
}
/// Wraps `message` in a [`BusEnvelope`] and publishes it on the bus.
///
/// The envelope gets a fresh UUID v4 as `id`, this handle's agent id as
/// `sender_id` and the current UTC time as `timestamp`.
///
/// Returns the receiver count reported by [`AgentBus::publish`].
pub fn send_with_correlation(
    &self,
    topic: impl Into<String>,
    message: BusMessage,
    correlation_id: Option<String>,
) -> usize {
    let id = Uuid::new_v4().to_string();
    let topic = topic.into();
    let sender_id = self.agent_id.clone();
    let timestamp = Utc::now();

    self.bus.publish(BusEnvelope {
        id,
        topic,
        sender_id,
        correlation_id,
        timestamp,
        message,
    })
}
/// Publishes an `AgentReady` message for this agent on the `broadcast` topic.
///
/// [`AgentBus::publish`] registers the agent and its `capabilities` in the
/// bus registry as part of handling this message.
pub fn announce_ready(&self, capabilities: Vec<String>) -> usize {
    let agent_id = self.agent_id.clone();
    let ready = BusMessage::AgentReady {
        agent_id,
        capabilities,
    };
    self.send("broadcast", ready)
}
/// Publishes an `AgentShutdown` message for this agent on the `broadcast`
/// topic.
///
/// [`AgentBus::publish`] removes the agent from the bus registry as part of
/// handling this message.
pub fn announce_shutdown(&self) -> usize {
    let agent_id = self.agent_id.clone();
    let shutdown = BusMessage::AgentShutdown { agent_id };
    self.send("broadcast", shutdown)
}
/// Publishes a `TaskUpdate` for `task_id` on the topic `task.{task_id}`.
pub fn send_task_update(
    &self,
    task_id: &str,
    state: TaskState,
    message: Option<String>,
) -> usize {
    let topic = format!("task.{task_id}");
    let update = BusMessage::TaskUpdate {
        task_id: task_id.to_owned(),
        state,
        message,
    };
    self.send(topic, update)
}
/// Publishes an `ArtifactUpdate` for `task_id` on the topic `task.{task_id}`.
pub fn send_artifact_update(&self, task_id: &str, artifact: Artifact) -> usize {
    let topic = format!("task.{task_id}");
    let update = BusMessage::ArtifactUpdate {
        task_id: task_id.to_owned(),
        artifact,
    };
    self.send(topic, update)
}
/// Publishes an `AgentMessage` from this agent to agent `to` on the topic
/// `agent.{to}`.
pub fn send_to_agent(&self, to: &str, parts: Vec<Part>) -> usize {
    let topic = format!("agent.{to}");
    let direct = BusMessage::AgentMessage {
        from: self.agent_id.clone(),
        to: to.to_owned(),
        parts,
    };
    self.send(topic, direct)
}
/// Publishes a `SharedResult` holding `value` and `tags` on the topic
/// `results.{key}`.
pub fn publish_shared_result(
    &self,
    key: impl Into<String>,
    value: serde_json::Value,
    tags: Vec<String>,
) -> usize {
    let key: String = key.into();
    // Build the topic first; `key` is moved into the message afterwards.
    let topic = format!("results.{key}");
    self.send(topic, BusMessage::SharedResult { key, value, tags })
}
/// Publishes a `RalphLearning` for one story of a PRD on the topic
/// `ralph.{prd_id}`.
pub fn publish_ralph_learning(
    &self,
    prd_id: &str,
    story_id: &str,
    iteration: usize,
    learnings: Vec<String>,
    context: serde_json::Value,
) -> usize {
    let topic = format!("ralph.{prd_id}");
    let learning = BusMessage::RalphLearning {
        prd_id: prd_id.to_owned(),
        story_id: story_id.to_owned(),
        iteration,
        learnings,
        context,
    };
    self.send(topic, learning)
}
/// Publishes a `RalphHandoff` from `from_story` to `to_story` on the topic
/// `ralph.{prd_id}`.
pub fn publish_ralph_handoff(
    &self,
    prd_id: &str,
    from_story: &str,
    to_story: &str,
    context: serde_json::Value,
    progress_summary: &str,
) -> usize {
    let topic = format!("ralph.{prd_id}");
    let handoff = BusMessage::RalphHandoff {
        prd_id: prd_id.to_owned(),
        from_story: from_story.to_owned(),
        to_story: to_story.to_owned(),
        context,
        progress_summary: progress_summary.to_owned(),
    };
    self.send(topic, handoff)
}
/// Publishes a `RalphProgress` with the given counters and status on the
/// topic `ralph.{prd_id}`.
pub fn publish_ralph_progress(
    &self,
    prd_id: &str,
    passed: usize,
    total: usize,
    iteration: usize,
    status: &str,
) -> usize {
    let topic = format!("ralph.{prd_id}");
    let progress = BusMessage::RalphProgress {
        prd_id: prd_id.to_owned(),
        passed,
        total,
        iteration,
        status: status.to_owned(),
    };
    self.send(topic, progress)
}
/// Drains every envelope currently pending on this handle and returns the
/// `RalphLearning` / `RalphHandoff` envelopes that belong to `prd_id`.
///
/// An envelope belongs to `prd_id` when its topic starts with
/// `ralph.{prd_id}` **and** the `prd_id` carried in the message equals the
/// requested one. The second check matters because the topic test is a plain
/// prefix match: without it, draining PRD `1` would also return the learnings
/// of PRD `10`.
///
/// This never waits: it stops as soon as the receiver has nothing pending.
/// Note that *all* pending envelopes are consumed; the ones that do not match
/// are dropped and cannot be received again through this handle.
pub fn drain_ralph_learnings(&mut self, prd_id: &str) -> Vec<BusEnvelope> {
    let prefix = format!("ralph.{prd_id}");
    let mut out = Vec::new();
    while let Some(env) = self.try_recv() {
        if !env.topic.starts_with(&prefix) {
            continue;
        }
        // Only learnings and handoffs are of interest, and only those whose
        // payload names exactly this PRD.
        let belongs_to_prd = match &env.message {
            BusMessage::RalphLearning {
                prd_id: message_prd,
                ..
            }
            | BusMessage::RalphHandoff {
                prd_id: message_prd,
                ..
            } => message_prd.as_str() == prd_id,
            _ => false,
        };
        if belongs_to_prd {
            out.push(env);
        }
    }
    out
}
/// Publishes a `VoiceSessionStarted` for this agent on the topic
/// `voice.{room_name}`.
pub fn send_voice_session_started(&self, room_name: &str, voice_id: &str) -> usize {
    let topic = format!("voice.{room_name}");
    let started = BusMessage::VoiceSessionStarted {
        room_name: room_name.to_owned(),
        agent_id: self.agent_id.clone(),
        voice_id: voice_id.to_owned(),
    };
    self.send(topic, started)
}
/// Publishes a `VoiceTranscript` on the topic `voice.{room_name}`.
pub fn send_voice_transcript(
    &self,
    room_name: &str,
    text: &str,
    role: &str,
    is_final: bool,
) -> usize {
    let topic = format!("voice.{room_name}");
    let transcript = BusMessage::VoiceTranscript {
        room_name: room_name.to_owned(),
        text: text.to_owned(),
        role: role.to_owned(),
        is_final,
    };
    self.send(topic, transcript)
}
/// Publishes a `VoiceAgentStateChanged` on the topic `voice.{room_name}`.
pub fn send_voice_agent_state(&self, room_name: &str, state: &str) -> usize {
    let topic = format!("voice.{room_name}");
    let changed = BusMessage::VoiceAgentStateChanged {
        room_name: room_name.to_owned(),
        state: state.to_owned(),
    };
    self.send(topic, changed)
}
/// Publishes a `VoiceSessionEnded` on the topic `voice.{room_name}`.
pub fn send_voice_session_ended(&self, room_name: &str, reason: &str) -> usize {
    let topic = format!("voice.{room_name}");
    let ended = BusMessage::VoiceSessionEnded {
        room_name: room_name.to_owned(),
        reason: reason.to_owned(),
    };
    self.send(topic, ended)
}
pub async fn recv(&mut self) -> Option<BusEnvelope> {
loop {
match self.rx.recv().await {
Ok(env) => return Some(env),
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
agent_id = %self.agent_id,
skipped = n,
"Bus handle lagged, skipping messages"
);
continue;
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
/// Waits for the next envelope whose topic starts with `prefix`.
///
/// Envelopes on other topics are consumed and discarded while waiting.
/// Returns `None` once the channel is closed.
pub async fn recv_topic(&mut self, prefix: &str) -> Option<BusEnvelope> {
    while let Some(envelope) = self.recv().await {
        if envelope.topic.starts_with(prefix) {
            return Some(envelope);
        }
    }
    None
}
/// Waits for the next envelope addressed to this agent.
///
/// An envelope counts as addressed to this agent when its topic is exactly
/// `agent.{agent_id}` or a dot-separated sub-topic of it
/// (`agent.{agent_id}.…`). A plain prefix match is not enough: it would also
/// hand agent `worker-1` the messages sent to `worker-10`.
///
/// Envelopes for other topics are consumed and discarded while waiting.
/// Returns `None` once the channel is closed.
pub async fn recv_mine(&mut self) -> Option<BusEnvelope> {
    let own_topic = format!("agent.{}", self.agent_id);
    while let Some(envelope) = self.recv().await {
        // What follows the own topic must be nothing or a `.` separator.
        let addressed_to_me = matches!(
            envelope.topic.strip_prefix(own_topic.as_str()),
            Some(rest) if rest.is_empty() || rest.starts_with('.')
        );
        if addressed_to_me {
            return Some(envelope);
        }
    }
    None
}
pub fn try_recv(&mut self) -> Option<BusEnvelope> {
loop {
match self.rx.try_recv() {
Ok(env) => return Some(env),
Err(broadcast::error::TryRecvError::Lagged(n)) => {
tracing::warn!(
agent_id = %self.agent_id,
skipped = n,
"Bus handle lagged (try_recv), skipping"
);
continue;
}
Err(broadcast::error::TryRecvError::Empty)
| Err(broadcast::error::TryRecvError::Closed) => return None,
}
}
}
pub fn registry(&self) -> &Arc<registry::AgentRegistry> {
&self.bus.registry
}
pub fn into_receiver(self) -> broadcast::Receiver<BusEnvelope> {
self.rx
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default-capacity bus already wrapped in an `Arc`.
    fn new_bus() -> Arc<AgentBus> {
        AgentBus::new().into_arc()
    }

    /// A direct message reaches the addressee and, because every handle owns
    /// a receiver, the sender as well.
    #[tokio::test]
    async fn test_bus_send_recv() {
        let bus = new_bus();
        let mut sender = bus.handle("agent-a");
        let mut receiver = bus.handle("agent-b");

        let greeting = vec![Part::Text {
            text: "hello".into(),
        }];
        sender.send_to_agent("agent-b", greeting);

        let delivered = receiver.recv().await.unwrap();
        assert_eq!(delivered.topic, "agent.agent-b");
        if let BusMessage::AgentMessage { from, to, .. } = &delivered.message {
            assert_eq!(from, "agent-a");
            assert_eq!(to, "agent-b");
        } else {
            let other = &delivered.message;
            panic!("unexpected message: {other:?}");
        }

        let echoed = sender.try_recv().unwrap();
        assert_eq!(echoed.topic, "agent.agent-b");
    }

    /// A task update is published on `task.{task_id}` with the given state.
    #[tokio::test]
    async fn test_bus_task_update() {
        let bus = new_bus();
        let worker = bus.handle("worker-1");
        let mut observer = bus.handle("observer");

        worker.send_task_update("task-42", TaskState::Working, Some("processing".into()));

        let update = observer.recv().await.unwrap();
        assert_eq!(update.topic, "task.task-42");
        if let BusMessage::TaskUpdate { task_id, state, .. } = &update.message {
            assert_eq!(task_id, "task-42");
            assert_eq!(*state, TaskState::Working);
        } else {
            let other = &update.message;
            panic!("unexpected: {other:?}");
        }
    }

    /// Publishing on a bus without any handle reports zero receivers.
    #[tokio::test]
    async fn test_bus_no_receivers() {
        let bus = new_bus();
        let heartbeat = BusMessage::Heartbeat {
            agent_id: "nobody".into(),
            status: "ok".into(),
        };
        let envelope = BusEnvelope {
            id: "test".into(),
            topic: "broadcast".into(),
            sender_id: "nobody".into(),
            correlation_id: None,
            timestamp: Utc::now(),
            message: heartbeat,
        };

        let delivered = bus.publish(envelope);
        assert_eq!(delivered, 0);
    }

    /// `recv_topic` skips envelopes whose topic does not match the prefix.
    #[tokio::test]
    async fn test_recv_topic_filter() {
        let bus = new_bus();
        let publisher = bus.handle("agent-x");
        let mut listener = bus.handle("listener");

        let first = BusMessage::TaskUpdate {
            task_id: "1".into(),
            state: TaskState::Working,
            message: None,
        };
        publisher.send("task.1", first);

        let second = BusMessage::TaskUpdate {
            task_id: "2".into(),
            state: TaskState::Completed,
            message: None,
        };
        publisher.send("task.2", second);

        let matched = listener.recv_topic("task.2").await.unwrap();
        if let BusMessage::TaskUpdate { task_id, .. } = &matched.message {
            assert_eq!(task_id, "2");
        } else {
            let other = &matched.message;
            panic!("unexpected: {other:?}");
        }
    }

    /// Ready / shutdown announcements add and remove the agent in the registry.
    #[tokio::test]
    async fn test_ready_shutdown_syncs_registry() {
        let bus = new_bus();
        let planner = bus.handle("planner-1");

        let capabilities = vec!["plan".to_string(), "review".to_string()];
        planner.announce_ready(capabilities);
        assert!(bus.registry.get("planner-1").is_some());

        planner.announce_shutdown();
        assert!(bus.registry.get("planner-1").is_none());
    }
}