use cognis_core::Message;
use serde::{Deserialize, Serialize};
use crate::agent_bus::{AgentBus, SubscribeError, Subscription};
use crate::multi_agent::AgentMessage;
/// Topic an [`AgentEventBus`] publishes to and subscribes on unless a
/// different one is chosen with [`AgentEventBus::with_topic`].
pub const DEFAULT_EVENTS_TOPIC: &str = "agent.events";
/// Structured event reported about an agent and carried over an
/// [`AgentEventBus`].
///
/// With the serde attributes below a value serializes as an internally tagged
/// object: the variant name, converted to `snake_case`, is stored under the
/// `"kind"` key alongside the variant's own fields, e.g.
/// `{"kind": "tool_called", "agent_id": "...", "tool_name": "...", "args": ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AgentEvent {
/// An agent started working on `input`.
Started {
/// Identifier of the agent the event is about.
agent_id: String,
/// Input text the agent was started with.
input: String,
},
/// An agent finished and produced `output`.
Finished {
/// Identifier of the agent the event is about.
agent_id: String,
/// Output text the agent produced.
output: String,
/// Iteration count reported by the emitter (presumably the number of
/// agent-loop iterations the run took; confirm against the emitter).
iterations: u32,
},
/// An agent invoked a tool.
ToolCalled {
/// Identifier of the agent the event is about.
agent_id: String,
/// Name of the tool that was invoked.
tool_name: String,
/// Arguments passed to the tool, as arbitrary JSON.
args: serde_json::Value,
},
/// A tool invocation produced a result.
ToolResult {
/// Identifier of the agent the event is about.
agent_id: String,
/// Name of the tool that produced the result.
tool_name: String,
/// The tool's result, as arbitrary JSON.
result: serde_json::Value,
},
/// An agent reported an error.
Errored {
/// Identifier of the agent the event is about.
agent_id: String,
/// Error description as text.
error: String,
},
/// Free-form event for anything the other variants do not cover.
Custom {
/// Identifier of the agent the event is about.
agent_id: String,
/// Caller-chosen label for the event.
tag: String,
/// Arbitrary JSON payload attached to the event.
payload: serde_json::Value,
},
}
impl AgentEvent {
pub fn agent_id(&self) -> &str {
match self {
AgentEvent::Started { agent_id, .. }
| AgentEvent::Finished { agent_id, .. }
| AgentEvent::ToolCalled { agent_id, .. }
| AgentEvent::ToolResult { agent_id, .. }
| AgentEvent::Errored { agent_id, .. }
| AgentEvent::Custom { agent_id, .. } => agent_id,
}
}
}
/// Typed wrapper around an [`AgentBus`] that publishes and receives
/// [`AgentEvent`]s on a single topic.
///
/// Events are wrapped in an [`AgentMessage`] envelope when published and
/// unwrapped again by [`EventSubscription::recv`].
#[derive(Clone)]
pub struct AgentEventBus {
/// Underlying bus that actually carries the envelopes.
inner: AgentBus,
/// Topic used for every publish and subscribe made through this wrapper.
topic: String,
}
impl Default for AgentEventBus {
fn default() -> Self {
Self::new()
}
}
impl AgentEventBus {
    /// Creates an event bus on top of a newly constructed [`AgentBus`],
    /// using [`DEFAULT_EVENTS_TOPIC`].
    pub fn new() -> Self {
        let bus = AgentBus::new();
        Self::with_bus(bus)
    }

    /// Wraps an existing [`AgentBus`], using [`DEFAULT_EVENTS_TOPIC`].
    pub fn with_bus(bus: AgentBus) -> Self {
        let topic = String::from(DEFAULT_EVENTS_TOPIC);
        Self { inner: bus, topic }
    }

    /// Builder-style override of the topic this wrapper publishes to and
    /// subscribes on.
    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
        let topic: String = topic.into();
        self.topic = topic;
        self
    }

    /// Borrows the underlying [`AgentBus`].
    pub fn bus(&self) -> &AgentBus {
        &self.inner
    }

    /// Publishes `event` on this wrapper's topic.
    ///
    /// The event is serialized to JSON and placed under the `"event"` key of
    /// the envelope's metadata; the envelope's `from` is the event's agent id
    /// and its `to` is the topic. If serialization fails, a JSON `null` is
    /// sent in place of the event.
    ///
    /// Returns the value produced by `AgentBus::publish` (judging by the
    /// tests in this file, the number of subscribers reached).
    pub async fn publish(&self, event: AgentEvent) -> usize {
        let sender = event.agent_id().to_owned();
        let payload = match serde_json::to_value(&event) {
            Ok(value) => value,
            Err(_) => serde_json::Value::Null,
        };
        let metadata = serde_json::json!({ "event": payload });

        let envelope = AgentMessage {
            from: sender,
            to: self.topic.clone(),
            content: Message::system("agent-event"),
            metadata,
            ..Default::default()
        };
        self.inner.publish(&self.topic, envelope).await
    }

    /// Subscribes to this wrapper's topic and returns a typed subscription.
    pub async fn subscribe(&self) -> EventSubscription {
        let inner = self.inner.subscribe(&self.topic).await;
        EventSubscription { inner }
    }

    /// Topic count reported by the underlying bus (not limited to this
    /// wrapper's topic).
    pub async fn topic_count(&self) -> usize {
        let bus = &self.inner;
        bus.topic_count().await
    }

    /// Subscriber count the underlying bus reports for this wrapper's topic.
    pub async fn subscriber_count(&self) -> usize {
        let bus = &self.inner;
        bus.subscriber_count(&self.topic).await
    }
}
/// Typed view over a raw bus [`Subscription`] that yields decoded
/// [`AgentEvent`]s instead of [`AgentMessage`] envelopes.
pub struct EventSubscription {
/// Raw subscription the envelopes are read from.
inner: Subscription,
}
impl EventSubscription {
    /// Topic reported by the underlying subscription.
    pub fn topic(&self) -> &str {
        let raw = &self.inner;
        raw.topic()
    }

    /// Waits for the next envelope and decodes the event it carries.
    ///
    /// # Errors
    ///
    /// Propagates any [`SubscribeError`] from the underlying subscription.
    /// An envelope whose `"event"` metadata cannot be decoded is also
    /// reported as [`SubscribeError::Closed`].
    ///
    /// NOTE(review): a malformed envelope is therefore indistinguishable from
    /// a closed channel; consider a dedicated error variant if callers need
    /// to tell the two apart.
    pub async fn recv(&mut self) -> Result<AgentEvent, SubscribeError> {
        let envelope = self.inner.recv().await?;
        match decode_event(&envelope) {
            Ok(event) => Ok(event),
            Err(_) => Err(SubscribeError::Closed),
        }
    }
}
fn decode_event(env: &AgentMessage) -> Result<AgentEvent, serde_json::Error> {
let v = env
.metadata
.get("event")
.cloned()
.unwrap_or(serde_json::Value::Null);
serde_json::from_value(v)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A published event comes back from a subscription with its fields intact.
    #[tokio::test]
    async fn started_event_round_trip() {
        let bus = AgentEventBus::new();
        let mut sub = bus.subscribe().await;
        let n = bus
            .publish(AgentEvent::Started {
                agent_id: "writer".into(),
                input: "hello".into(),
            })
            .await;
        assert_eq!(n, 1, "subscriber should receive the event");
        match sub.recv().await.unwrap() {
            AgentEvent::Started { agent_id, input } => {
                assert_eq!(agent_id, "writer");
                assert_eq!(input, "hello");
            }
            other => panic!("expected Started, got {other:?}"),
        }
    }

    /// Every subscriber of the topic receives its own copy of the event.
    #[tokio::test]
    async fn fanout_to_every_subscriber() {
        let bus = AgentEventBus::new();
        let mut a = bus.subscribe().await;
        let mut b = bus.subscribe().await;
        bus.publish(AgentEvent::Errored {
            agent_id: "x".into(),
            error: "boom".into(),
        })
        .await;
        for sub in [&mut a, &mut b] {
            match sub.recv().await.unwrap() {
                AgentEvent::Errored { error, .. } => assert_eq!(error, "boom"),
                _ => panic!("variant"),
            }
        }
    }

    /// Events published on a custom topic reach only that topic's subscribers.
    #[tokio::test]
    async fn custom_topic_isolates_subscriptions() {
        // Both wrappers share ONE underlying bus, so the topic is the only
        // thing that can keep them apart. With two independent buses the
        // test would pass even if topics provided no isolation at all.
        let inner = AgentBus::new();
        let other = AgentEventBus::with_bus(inner.clone()).with_topic("alerts");
        let mut sub = other.subscribe().await;
        let unrelated = AgentEventBus::with_bus(inner);
        let _unrelated_sub = unrelated.subscribe().await;
        let delivered = other
            .publish(AgentEvent::Custom {
                agent_id: "watchdog".into(),
                tag: "alert".into(),
                payload: serde_json::json!({"severity": "high"}),
            })
            .await;
        // The default-topic subscriber must not be counted as a recipient.
        assert_eq!(delivered, 1, "only the `alerts` subscriber should be reached");
        match sub.recv().await.unwrap() {
            AgentEvent::Custom { tag, .. } => assert_eq!(tag, "alert"),
            _ => panic!("variant"),
        }
    }

    /// Two wrappers built over clones of the same bus see each other's events.
    #[tokio::test]
    async fn shared_underlying_bus() {
        let inner = AgentBus::new();
        let a = AgentEventBus::with_bus(inner.clone());
        let b = AgentEventBus::with_bus(inner);
        let mut sub = b.subscribe().await;
        a.publish(AgentEvent::Started {
            agent_id: "shared".into(),
            input: "ok".into(),
        })
        .await;
        match sub.recv().await.unwrap() {
            AgentEvent::Started { agent_id, .. } => assert_eq!(agent_id, "shared"),
            _ => panic!("variant"),
        }
    }

    /// `subscriber_count` reflects the subscriptions that are still alive.
    #[tokio::test]
    async fn subscriber_count_tracks_active() {
        let bus = AgentEventBus::new();
        let _a = bus.subscribe().await;
        let _b = bus.subscribe().await;
        assert_eq!(bus.subscriber_count().await, 2);
    }
}