Skip to main content

agentzero_core/
event_bus.rs

1//! Event bus for inter-agent communication.
2//!
3//! The bus is the central nervous system for all messages in AgentZero.
4//! Agents, channels, and system components publish and subscribe to events
5//! on the bus. The `EventBus` trait abstracts over transport so a future
6//! multi-node implementation (e.g. iroh QUIC) can be swapped in without
7//! changing any agent code.
8//!
9//! `FileBackedBus` wraps the in-memory broadcast with append-only JSONL
10//! persistence so events survive process restarts (useful for research
11//! pipelines and audit trails).
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use std::path::{Path, PathBuf};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::broadcast;
18
19/// A message on the bus. Agents produce and consume these.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct Event {
22    /// Unique event identifier.
23    pub id: String,
24    /// Topic string for pub/sub routing (e.g. "task.image.complete").
25    pub topic: String,
26    /// Who published this event (agent_id, channel name, or "system").
27    pub source: String,
28    /// Event payload — typically JSON, but can be any string.
29    pub payload: String,
30    /// Privacy boundary inherited from the source (e.g. "local_only", "any").
31    pub privacy_boundary: String,
32    /// Unix timestamp in milliseconds.
33    pub timestamp_ms: u64,
34    /// Traces a chain of events back to the original trigger.
35    /// All events in an agent chain share the same correlation_id.
36    pub correlation_id: Option<String>,
37}
38
39impl Event {
40    /// Create a new event with a generated id and current timestamp.
41    pub fn new(
42        topic: impl Into<String>,
43        source: impl Into<String>,
44        payload: impl Into<String>,
45    ) -> Self {
46        Self {
47            id: new_event_id(),
48            topic: topic.into(),
49            source: source.into(),
50            payload: payload.into(),
51            privacy_boundary: String::new(),
52            timestamp_ms: now_ms(),
53            correlation_id: None,
54        }
55    }
56
57    /// Set the correlation id (builder pattern).
58    pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
59        self.correlation_id = Some(id.into());
60        self
61    }
62
63    /// Set the privacy boundary (builder pattern).
64    pub fn with_boundary(mut self, boundary: impl Into<String>) -> Self {
65        self.privacy_boundary = boundary.into();
66        self
67    }
68}
69
70/// Trait for the event bus — abstracts over in-memory vs distributed transports.
71#[async_trait]
72pub trait EventBus: Send + Sync {
73    /// Publish an event to all subscribers.
74    async fn publish(&self, event: Event) -> anyhow::Result<()>;
75
76    /// Create a new subscriber that receives all future events.
77    fn subscribe(&self) -> Box<dyn EventSubscriber>;
78
79    /// Number of active subscribers.
80    fn subscriber_count(&self) -> usize;
81}
82
83/// Subscriber that can filter and receive events.
84#[async_trait]
85pub trait EventSubscriber: Send {
86    /// Receive the next event.
87    async fn recv(&mut self) -> anyhow::Result<Event>;
88
89    /// Receive the next event whose topic starts with the given prefix.
90    /// Events that don't match are silently skipped.
91    async fn recv_filtered(&mut self, topic_prefix: &str) -> anyhow::Result<Event> {
92        loop {
93            let event = self.recv().await?;
94            if event.topic.starts_with(topic_prefix) {
95                return Ok(event);
96            }
97        }
98    }
99}
100
101/// In-memory bus backed by `tokio::sync::broadcast`.
102pub struct InMemoryBus {
103    tx: broadcast::Sender<Event>,
104}
105
106impl InMemoryBus {
107    /// Create a new bus with the given channel capacity.
108    /// Lagged receivers will skip missed events (lossy).
109    pub fn new(capacity: usize) -> Self {
110        let (tx, _) = broadcast::channel(capacity);
111        Self { tx }
112    }
113
114    /// Default capacity suitable for most single-process deployments.
115    pub fn default_capacity() -> Self {
116        Self::new(256)
117    }
118}
119
120#[async_trait]
121impl EventBus for InMemoryBus {
122    async fn publish(&self, event: Event) -> anyhow::Result<()> {
123        // send returns Err if there are no receivers — that's fine,
124        // the event is simply dropped.
125        let _ = self.tx.send(event);
126        Ok(())
127    }
128
129    fn subscribe(&self) -> Box<dyn EventSubscriber> {
130        Box::new(InMemorySubscriber {
131            rx: self.tx.subscribe(),
132        })
133    }
134
135    fn subscriber_count(&self) -> usize {
136        self.tx.receiver_count()
137    }
138}
139
140/// Subscriber for the in-memory bus.
141pub struct InMemorySubscriber {
142    rx: broadcast::Receiver<Event>,
143}
144
145#[async_trait]
146impl EventSubscriber for InMemorySubscriber {
147    async fn recv(&mut self) -> anyhow::Result<Event> {
148        loop {
149            match self.rx.recv().await {
150                Ok(event) => return Ok(event),
151                Err(broadcast::error::RecvError::Lagged(n)) => {
152                    tracing::warn!(skipped = n, "event bus subscriber lagged, skipping events");
153                    // Continue to receive the next available event.
154                }
155                Err(broadcast::error::RecvError::Closed) => {
156                    anyhow::bail!("event bus closed");
157                }
158            }
159        }
160    }
161}
162
163/// File-backed event bus: wraps `InMemoryBus` with append-only JSONL persistence.
164///
165/// Every published event is serialized to a JSONL log file before being
166/// broadcast to live subscribers. Use [`FileBackedBus::replay`] to read
167/// back all persisted events (e.g. after a restart).
168pub struct FileBackedBus {
169    inner: InMemoryBus,
170    log_path: PathBuf,
171    writer: tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>,
172}
173
174impl FileBackedBus {
175    /// Open (or create) a JSONL event log at `path` with the given broadcast capacity.
176    pub async fn open(path: impl AsRef<Path>, capacity: usize) -> anyhow::Result<Self> {
177        let log_path = path.as_ref().to_path_buf();
178        if let Some(parent) = log_path.parent() {
179            if !parent.as_os_str().is_empty() {
180                tokio::fs::create_dir_all(parent).await?;
181            }
182        }
183
184        let file = tokio::fs::OpenOptions::new()
185            .create(true)
186            .append(true)
187            .open(&log_path)
188            .await?;
189
190        Ok(Self {
191            inner: InMemoryBus::new(capacity),
192            log_path,
193            writer: tokio::sync::Mutex::new(tokio::io::BufWriter::new(file)),
194        })
195    }
196
197    /// Replay all persisted events, optionally filtered by topic prefix.
198    pub async fn replay(&self, topic_filter: Option<&str>) -> anyhow::Result<Vec<Event>> {
199        use tokio::io::AsyncBufReadExt;
200
201        let file = tokio::fs::File::open(&self.log_path).await?;
202        let reader = tokio::io::BufReader::new(file);
203        let mut lines = reader.lines();
204        let mut events = Vec::new();
205
206        while let Some(line) = lines.next_line().await? {
207            if line.trim().is_empty() {
208                continue;
209            }
210            match serde_json::from_str::<Event>(&line) {
211                Ok(evt) => {
212                    if let Some(prefix) = topic_filter {
213                        if !evt.topic.starts_with(prefix) {
214                            continue;
215                        }
216                    }
217                    events.push(evt);
218                }
219                Err(e) => {
220                    tracing::warn!(error = %e, "skipping malformed event line in log");
221                }
222            }
223        }
224
225        Ok(events)
226    }
227
228    /// Return the path to the JSONL log file.
229    pub fn log_path(&self) -> &Path {
230        &self.log_path
231    }
232}
233
234#[async_trait]
235impl EventBus for FileBackedBus {
236    async fn publish(&self, event: Event) -> anyhow::Result<()> {
237        use tokio::io::AsyncWriteExt;
238
239        // Persist first, then broadcast.
240        let mut line = serde_json::to_string(&event)?;
241        line.push('\n');
242
243        {
244            let mut w = self.writer.lock().await;
245            w.write_all(line.as_bytes()).await?;
246            w.flush().await?;
247        }
248
249        self.inner.publish(event).await
250    }
251
252    fn subscribe(&self) -> Box<dyn EventSubscriber> {
253        self.inner.subscribe()
254    }
255
256    fn subscriber_count(&self) -> usize {
257        self.inner.subscriber_count()
258    }
259}
260
261/// Glob-style topic matching: "task.image.*" matches "task.image.complete".
262pub fn topic_matches(pattern: &str, topic: &str) -> bool {
263    if pattern == "*" {
264        return true;
265    }
266    if pattern.ends_with(".*") {
267        let prefix = &pattern[..pattern.len() - 1];
268        topic.starts_with(prefix)
269    } else {
270        pattern == topic
271    }
272}
273
274/// Generate a simple unique event ID (timestamp + random suffix).
275fn new_event_id() -> String {
276    use std::sync::atomic::{AtomicU64, Ordering};
277    static COUNTER: AtomicU64 = AtomicU64::new(0);
278    let ts = now_ms();
279    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
280    format!("evt-{ts}-{seq}")
281}
282
283fn now_ms() -> u64 {
284    SystemTime::now()
285        .duration_since(UNIX_EPOCH)
286        .unwrap_or_default()
287        .as_millis() as u64
288}
289
290/// Helper to check if two privacy boundaries are compatible.
291/// An event can flow from `source_boundary` to a consumer with `consumer_boundary`
292/// only if the consumer's boundary is at least as restrictive.
293pub fn is_boundary_compatible(source_boundary: &str, consumer_boundary: &str) -> bool {
294    // Boundary hierarchy: local_only > encrypted_only > any > "" (no restriction)
295    fn level(b: &str) -> u8 {
296        match b {
297            "local_only" => 3,
298            "encrypted_only" => 2,
299            "any" => 1,
300            _ => 0,
301        }
302    }
303    // A consumer can receive events from a source if the consumer's restriction
304    // level is >= the source's level (i.e., consumer is at least as restrictive).
305    // A "local_only" event can only go to "local_only" consumers.
306    // An unrestricted event can go anywhere.
307    level(consumer_boundary) >= level(source_boundary)
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use std::sync::Arc;
314
315    #[test]
316    fn topic_matching() {
317        assert!(topic_matches("task.image.*", "task.image.complete"));
318        assert!(topic_matches("task.image.*", "task.image.error"));
319        assert!(!topic_matches("task.image.*", "task.text.complete"));
320        assert!(topic_matches(
321            "channel.telegram.message",
322            "channel.telegram.message"
323        ));
324        assert!(!topic_matches(
325            "channel.telegram.message",
326            "channel.slack.message"
327        ));
328        assert!(topic_matches("*", "anything.at.all"));
329    }
330
331    #[test]
332    fn boundary_compatibility() {
333        // Unrestricted events go anywhere
334        assert!(is_boundary_compatible("", ""));
335        assert!(is_boundary_compatible("", "any"));
336        assert!(is_boundary_compatible("", "local_only"));
337
338        // "any" events go to "any" or more restrictive
339        assert!(is_boundary_compatible("any", "any"));
340        assert!(is_boundary_compatible("any", "encrypted_only"));
341        assert!(is_boundary_compatible("any", "local_only"));
342
343        // "local_only" events only go to "local_only"
344        assert!(is_boundary_compatible("local_only", "local_only"));
345        assert!(!is_boundary_compatible("local_only", "any"));
346        assert!(!is_boundary_compatible("local_only", "encrypted_only"));
347        assert!(!is_boundary_compatible("local_only", ""));
348
349        // "encrypted_only" events go to "encrypted_only" or "local_only"
350        assert!(is_boundary_compatible("encrypted_only", "encrypted_only"));
351        assert!(is_boundary_compatible("encrypted_only", "local_only"));
352        assert!(!is_boundary_compatible("encrypted_only", "any"));
353    }
354
355    #[test]
356    fn event_builder() {
357        let event = Event::new("task.test", "agent-1", r#"{"result":"ok"}"#)
358            .with_correlation("corr-123")
359            .with_boundary("local_only");
360
361        assert_eq!(event.topic, "task.test");
362        assert_eq!(event.source, "agent-1");
363        assert_eq!(event.correlation_id.as_deref(), Some("corr-123"));
364        assert_eq!(event.privacy_boundary, "local_only");
365        assert!(event.timestamp_ms > 0);
366        assert!(event.id.starts_with("evt-"));
367    }
368
369    #[tokio::test]
370    async fn in_memory_bus_publish_subscribe() {
371        let bus = InMemoryBus::new(16);
372        let mut sub = bus.subscribe();
373
374        bus.publish(Event::new("test.topic", "src", "hello"))
375            .await
376            .unwrap();
377
378        let event = sub.recv().await.unwrap();
379        assert_eq!(event.topic, "test.topic");
380        assert_eq!(event.payload, "hello");
381    }
382
383    #[tokio::test]
384    async fn in_memory_bus_multiple_subscribers() {
385        let bus = InMemoryBus::new(16);
386        let mut sub1 = bus.subscribe();
387        let mut sub2 = bus.subscribe();
388
389        assert_eq!(bus.subscriber_count(), 2);
390
391        bus.publish(Event::new("t", "s", "data")).await.unwrap();
392
393        let e1 = sub1.recv().await.unwrap();
394        let e2 = sub2.recv().await.unwrap();
395        assert_eq!(e1.payload, "data");
396        assert_eq!(e2.payload, "data");
397    }
398
399    #[tokio::test]
400    async fn filtered_recv() {
401        let bus = InMemoryBus::new(16);
402        let mut sub = bus.subscribe();
403
404        bus.publish(Event::new("task.text.complete", "a", "text"))
405            .await
406            .unwrap();
407        bus.publish(Event::new("task.image.complete", "b", "image"))
408            .await
409            .unwrap();
410
411        let event = sub.recv_filtered("task.image.").await.unwrap();
412        assert_eq!(event.payload, "image");
413    }
414
415    #[tokio::test]
416    async fn publish_with_no_subscribers_is_ok() {
417        let bus = InMemoryBus::new(16);
418        // No subscribers — should not error
419        bus.publish(Event::new("orphan", "system", ""))
420            .await
421            .unwrap();
422    }
423
424    #[tokio::test]
425    async fn bus_closed_returns_error() {
426        let bus = Arc::new(InMemoryBus::new(16));
427        let mut sub = bus.subscribe();
428
429        // Drop the bus (and its sender)
430        drop(bus);
431
432        let result = sub.recv().await;
433        assert!(result.is_err());
434    }
435
436    // --- FileBackedBus tests ---
437
438    fn temp_event_log(suffix: &str) -> PathBuf {
439        use std::sync::atomic::{AtomicU64, Ordering};
440        static SEQ: AtomicU64 = AtomicU64::new(0);
441        let ts = now_ms();
442        let seq = SEQ.fetch_add(1, Ordering::Relaxed);
443        std::env::temp_dir().join(format!(
444            "agentzero-core-events-{}-{ts}-{seq}-{suffix}.jsonl",
445            std::process::id()
446        ))
447    }
448
449    #[tokio::test]
450    async fn file_backed_publish_and_replay() {
451        let path = temp_event_log("roundtrip");
452        let bus = FileBackedBus::open(&path, 16).await.expect("open");
453
454        bus.publish(Event::new("task.research.raw", "researcher", "step-1"))
455            .await
456            .unwrap();
457        bus.publish(Event::new("task.research.raw", "researcher", "step-2"))
458            .await
459            .unwrap();
460        bus.publish(Event::new("task.alert", "system", "disk-low"))
461            .await
462            .unwrap();
463
464        let all = bus.replay(None).await.unwrap();
465        assert_eq!(all.len(), 3);
466
467        let research = bus.replay(Some("task.research.")).await.unwrap();
468        assert_eq!(research.len(), 2);
469        assert_eq!(research[0].payload, "step-1");
470        assert_eq!(research[1].payload, "step-2");
471
472        tokio::fs::remove_file(path).await.ok();
473    }
474
475    #[tokio::test]
476    async fn file_backed_live_subscribers() {
477        let path = temp_event_log("live");
478        let bus = FileBackedBus::open(&path, 16).await.expect("open");
479
480        let mut sub = bus.subscribe();
481        bus.publish(Event::new("t", "s", "hello")).await.unwrap();
482
483        let event = sub.recv().await.unwrap();
484        assert_eq!(event.payload, "hello");
485
486        tokio::fs::remove_file(path).await.ok();
487    }
488
489    #[tokio::test]
490    async fn file_backed_survives_reopen() {
491        let path = temp_event_log("reopen");
492
493        {
494            let bus = FileBackedBus::open(&path, 16).await.expect("open");
495            bus.publish(Event::new("pipeline.a", "agent", "first"))
496                .await
497                .unwrap();
498            bus.publish(Event::new("pipeline.a", "agent", "second"))
499                .await
500                .unwrap();
501        }
502
503        {
504            let bus = FileBackedBus::open(&path, 16).await.expect("reopen");
505            let events = bus.replay(Some("pipeline.")).await.unwrap();
506            assert_eq!(events.len(), 2);
507            assert_eq!(events[0].payload, "first");
508            assert_eq!(events[1].payload, "second");
509
510            // New events append.
511            bus.publish(Event::new("pipeline.a", "agent", "third"))
512                .await
513                .unwrap();
514            let events = bus.replay(None).await.unwrap();
515            assert_eq!(events.len(), 3);
516        }
517
518        tokio::fs::remove_file(path).await.ok();
519    }
520
521    #[tokio::test]
522    async fn file_backed_subscriber_count() {
523        let path = temp_event_log("subcount");
524        let bus = FileBackedBus::open(&path, 16).await.expect("open");
525
526        assert_eq!(bus.subscriber_count(), 0);
527        let _s1 = bus.subscribe();
528        assert_eq!(bus.subscriber_count(), 1);
529        let _s2 = bus.subscribe();
530        assert_eq!(bus.subscriber_count(), 2);
531
532        tokio::fs::remove_file(path).await.ok();
533    }
534}