Skip to main content

agentzero_core/
event_bus.rs

1//! Event bus for inter-agent communication.
2//!
3//! The bus is the central nervous system for all messages in AgentZero.
4//! Agents, channels, and system components publish and subscribe to events
5//! on the bus. The `EventBus` trait abstracts over transport so a future
6//! multi-node implementation (e.g. iroh QUIC) can be swapped in without
7//! changing any agent code.
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::time::{SystemTime, UNIX_EPOCH};
12use tokio::sync::broadcast;
13
14/// A message on the bus. Agents produce and consume these.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct Event {
17    /// Unique event identifier.
18    pub id: String,
19    /// Topic string for pub/sub routing (e.g. "task.image.complete").
20    pub topic: String,
21    /// Who published this event (agent_id, channel name, or "system").
22    pub source: String,
23    /// Event payload — typically JSON, but can be any string.
24    pub payload: String,
25    /// Privacy boundary inherited from the source (e.g. "local_only", "any").
26    pub privacy_boundary: String,
27    /// Unix timestamp in milliseconds.
28    pub timestamp_ms: u64,
29    /// Traces a chain of events back to the original trigger.
30    /// All events in an agent chain share the same correlation_id.
31    pub correlation_id: Option<String>,
32}
33
34impl Event {
35    /// Create a new event with a generated id and current timestamp.
36    pub fn new(
37        topic: impl Into<String>,
38        source: impl Into<String>,
39        payload: impl Into<String>,
40    ) -> Self {
41        Self {
42            id: new_event_id(),
43            topic: topic.into(),
44            source: source.into(),
45            payload: payload.into(),
46            privacy_boundary: String::new(),
47            timestamp_ms: now_ms(),
48            correlation_id: None,
49        }
50    }
51
52    /// Set the correlation id (builder pattern).
53    pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
54        self.correlation_id = Some(id.into());
55        self
56    }
57
58    /// Set the privacy boundary (builder pattern).
59    pub fn with_boundary(mut self, boundary: impl Into<String>) -> Self {
60        self.privacy_boundary = boundary.into();
61        self
62    }
63}
64
65/// Trait for the event bus — abstracts over in-memory vs distributed transports.
66#[async_trait]
67pub trait EventBus: Send + Sync {
68    /// Publish an event to all subscribers.
69    async fn publish(&self, event: Event) -> anyhow::Result<()>;
70
71    /// Create a new subscriber that receives all future events.
72    fn subscribe(&self) -> Box<dyn EventSubscriber>;
73
74    /// Number of active subscribers.
75    fn subscriber_count(&self) -> usize;
76}
77
78/// Subscriber that can filter and receive events.
79#[async_trait]
80pub trait EventSubscriber: Send {
81    /// Receive the next event.
82    async fn recv(&mut self) -> anyhow::Result<Event>;
83
84    /// Receive the next event whose topic starts with the given prefix.
85    /// Events that don't match are silently skipped.
86    async fn recv_filtered(&mut self, topic_prefix: &str) -> anyhow::Result<Event> {
87        loop {
88            let event = self.recv().await?;
89            if event.topic.starts_with(topic_prefix) {
90                return Ok(event);
91            }
92        }
93    }
94}
95
96/// In-memory bus backed by `tokio::sync::broadcast`.
97pub struct InMemoryBus {
98    tx: broadcast::Sender<Event>,
99}
100
101impl InMemoryBus {
102    /// Create a new bus with the given channel capacity.
103    /// Lagged receivers will skip missed events (lossy).
104    pub fn new(capacity: usize) -> Self {
105        let (tx, _) = broadcast::channel(capacity);
106        Self { tx }
107    }
108
109    /// Default capacity suitable for most single-process deployments.
110    pub fn default_capacity() -> Self {
111        Self::new(256)
112    }
113}
114
115#[async_trait]
116impl EventBus for InMemoryBus {
117    async fn publish(&self, event: Event) -> anyhow::Result<()> {
118        // send returns Err if there are no receivers — that's fine,
119        // the event is simply dropped.
120        let _ = self.tx.send(event);
121        Ok(())
122    }
123
124    fn subscribe(&self) -> Box<dyn EventSubscriber> {
125        Box::new(InMemorySubscriber {
126            rx: self.tx.subscribe(),
127        })
128    }
129
130    fn subscriber_count(&self) -> usize {
131        self.tx.receiver_count()
132    }
133}
134
135/// Subscriber for the in-memory bus.
136pub struct InMemorySubscriber {
137    rx: broadcast::Receiver<Event>,
138}
139
140#[async_trait]
141impl EventSubscriber for InMemorySubscriber {
142    async fn recv(&mut self) -> anyhow::Result<Event> {
143        loop {
144            match self.rx.recv().await {
145                Ok(event) => return Ok(event),
146                Err(broadcast::error::RecvError::Lagged(n)) => {
147                    tracing::warn!(skipped = n, "event bus subscriber lagged, skipping events");
148                    // Continue to receive the next available event.
149                }
150                Err(broadcast::error::RecvError::Closed) => {
151                    anyhow::bail!("event bus closed");
152                }
153            }
154        }
155    }
156}
157
158/// Glob-style topic matching: "task.image.*" matches "task.image.complete".
159pub fn topic_matches(pattern: &str, topic: &str) -> bool {
160    if pattern == "*" {
161        return true;
162    }
163    if pattern.ends_with(".*") {
164        let prefix = &pattern[..pattern.len() - 1];
165        topic.starts_with(prefix)
166    } else {
167        pattern == topic
168    }
169}
170
171/// Generate a simple unique event ID (timestamp + random suffix).
172fn new_event_id() -> String {
173    use std::sync::atomic::{AtomicU64, Ordering};
174    static COUNTER: AtomicU64 = AtomicU64::new(0);
175    let ts = now_ms();
176    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
177    format!("evt-{ts}-{seq}")
178}
179
180fn now_ms() -> u64 {
181    SystemTime::now()
182        .duration_since(UNIX_EPOCH)
183        .unwrap_or_default()
184        .as_millis() as u64
185}
186
187/// Helper to check if two privacy boundaries are compatible.
188/// An event can flow from `source_boundary` to a consumer with `consumer_boundary`
189/// only if the consumer's boundary is at least as restrictive.
190pub fn is_boundary_compatible(source_boundary: &str, consumer_boundary: &str) -> bool {
191    // Boundary hierarchy: local_only > encrypted_only > any > "" (no restriction)
192    fn level(b: &str) -> u8 {
193        match b {
194            "local_only" => 3,
195            "encrypted_only" => 2,
196            "any" => 1,
197            _ => 0,
198        }
199    }
200    // A consumer can receive events from a source if the consumer's restriction
201    // level is >= the source's level (i.e., consumer is at least as restrictive).
202    // A "local_only" event can only go to "local_only" consumers.
203    // An unrestricted event can go anywhere.
204    level(consumer_boundary) >= level(source_boundary)
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use std::sync::Arc;
211
212    #[test]
213    fn topic_matching() {
214        assert!(topic_matches("task.image.*", "task.image.complete"));
215        assert!(topic_matches("task.image.*", "task.image.error"));
216        assert!(!topic_matches("task.image.*", "task.text.complete"));
217        assert!(topic_matches(
218            "channel.telegram.message",
219            "channel.telegram.message"
220        ));
221        assert!(!topic_matches(
222            "channel.telegram.message",
223            "channel.slack.message"
224        ));
225        assert!(topic_matches("*", "anything.at.all"));
226    }
227
228    #[test]
229    fn boundary_compatibility() {
230        // Unrestricted events go anywhere
231        assert!(is_boundary_compatible("", ""));
232        assert!(is_boundary_compatible("", "any"));
233        assert!(is_boundary_compatible("", "local_only"));
234
235        // "any" events go to "any" or more restrictive
236        assert!(is_boundary_compatible("any", "any"));
237        assert!(is_boundary_compatible("any", "encrypted_only"));
238        assert!(is_boundary_compatible("any", "local_only"));
239
240        // "local_only" events only go to "local_only"
241        assert!(is_boundary_compatible("local_only", "local_only"));
242        assert!(!is_boundary_compatible("local_only", "any"));
243        assert!(!is_boundary_compatible("local_only", "encrypted_only"));
244        assert!(!is_boundary_compatible("local_only", ""));
245
246        // "encrypted_only" events go to "encrypted_only" or "local_only"
247        assert!(is_boundary_compatible("encrypted_only", "encrypted_only"));
248        assert!(is_boundary_compatible("encrypted_only", "local_only"));
249        assert!(!is_boundary_compatible("encrypted_only", "any"));
250    }
251
252    #[test]
253    fn event_builder() {
254        let event = Event::new("task.test", "agent-1", r#"{"result":"ok"}"#)
255            .with_correlation("corr-123")
256            .with_boundary("local_only");
257
258        assert_eq!(event.topic, "task.test");
259        assert_eq!(event.source, "agent-1");
260        assert_eq!(event.correlation_id.as_deref(), Some("corr-123"));
261        assert_eq!(event.privacy_boundary, "local_only");
262        assert!(event.timestamp_ms > 0);
263        assert!(event.id.starts_with("evt-"));
264    }
265
266    #[tokio::test]
267    async fn in_memory_bus_publish_subscribe() {
268        let bus = InMemoryBus::new(16);
269        let mut sub = bus.subscribe();
270
271        bus.publish(Event::new("test.topic", "src", "hello"))
272            .await
273            .unwrap();
274
275        let event = sub.recv().await.unwrap();
276        assert_eq!(event.topic, "test.topic");
277        assert_eq!(event.payload, "hello");
278    }
279
280    #[tokio::test]
281    async fn in_memory_bus_multiple_subscribers() {
282        let bus = InMemoryBus::new(16);
283        let mut sub1 = bus.subscribe();
284        let mut sub2 = bus.subscribe();
285
286        assert_eq!(bus.subscriber_count(), 2);
287
288        bus.publish(Event::new("t", "s", "data")).await.unwrap();
289
290        let e1 = sub1.recv().await.unwrap();
291        let e2 = sub2.recv().await.unwrap();
292        assert_eq!(e1.payload, "data");
293        assert_eq!(e2.payload, "data");
294    }
295
296    #[tokio::test]
297    async fn filtered_recv() {
298        let bus = InMemoryBus::new(16);
299        let mut sub = bus.subscribe();
300
301        bus.publish(Event::new("task.text.complete", "a", "text"))
302            .await
303            .unwrap();
304        bus.publish(Event::new("task.image.complete", "b", "image"))
305            .await
306            .unwrap();
307
308        let event = sub.recv_filtered("task.image.").await.unwrap();
309        assert_eq!(event.payload, "image");
310    }
311
312    #[tokio::test]
313    async fn publish_with_no_subscribers_is_ok() {
314        let bus = InMemoryBus::new(16);
315        // No subscribers — should not error
316        bus.publish(Event::new("orphan", "system", ""))
317            .await
318            .unwrap();
319    }
320
321    #[tokio::test]
322    async fn bus_closed_returns_error() {
323        let bus = Arc::new(InMemoryBus::new(16));
324        let mut sub = bus.subscribe();
325
326        // Drop the bus (and its sender)
327        drop(bus);
328
329        let result = sub.recv().await;
330        assert!(result.is_err());
331    }
332}