Skip to main content

agentzero_core/
event_bus.rs

1//! Event bus for inter-agent communication.
2//!
3//! The bus is the central nervous system for all messages in AgentZero.
4//! Agents, channels, and system components publish and subscribe to events
5//! on the bus. The `EventBus` trait abstracts over transport so a future
6//! multi-node implementation (e.g. iroh QUIC) can be swapped in without
7//! changing any agent code.
8//!
9//! `FileBackedBus` wraps the in-memory broadcast with append-only JSONL
10//! persistence so events survive process restarts (useful for research
11//! pipelines and audit trails).
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use std::path::{Path, PathBuf};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::broadcast;
18
19/// A message on the bus. Agents produce and consume these.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct Event {
22    /// Unique event identifier.
23    pub id: String,
24    /// Topic string for pub/sub routing (e.g. "task.image.complete").
25    pub topic: String,
26    /// Who published this event (agent_id, channel name, or "system").
27    pub source: String,
28    /// Event payload — typically JSON, but can be any string.
29    pub payload: String,
30    /// Privacy boundary inherited from the source (e.g. "local_only", "any").
31    pub privacy_boundary: String,
32    /// Unix timestamp in milliseconds.
33    pub timestamp_ms: u64,
34    /// Traces a chain of events back to the original trigger.
35    /// All events in an agent chain share the same correlation_id.
36    pub correlation_id: Option<String>,
37}
38
39impl Event {
40    /// Create a new event with a generated id and current timestamp.
41    pub fn new(
42        topic: impl Into<String>,
43        source: impl Into<String>,
44        payload: impl Into<String>,
45    ) -> Self {
46        Self {
47            id: new_event_id(),
48            topic: topic.into(),
49            source: source.into(),
50            payload: payload.into(),
51            privacy_boundary: String::new(),
52            timestamp_ms: now_ms(),
53            correlation_id: None,
54        }
55    }
56
57    /// Set the correlation id (builder pattern).
58    pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
59        self.correlation_id = Some(id.into());
60        self
61    }
62
63    /// Set the privacy boundary (builder pattern).
64    pub fn with_boundary(mut self, boundary: impl Into<String>) -> Self {
65        self.privacy_boundary = boundary.into();
66        self
67    }
68}
69
70/// Trait for the event bus — abstracts over in-memory vs distributed transports.
71#[async_trait]
72pub trait EventBus: Send + Sync {
73    /// Publish an event to all subscribers.
74    async fn publish(&self, event: Event) -> anyhow::Result<()>;
75
76    /// Create a new subscriber that receives all future events.
77    fn subscribe(&self) -> Box<dyn EventSubscriber>;
78
79    /// Number of active subscribers.
80    fn subscriber_count(&self) -> usize;
81
82    /// Replay events since a given event ID (exclusive). Returns events in order.
83    /// If `since_id` is `None`, replays all events. If the bus does not support
84    /// persistence, returns an empty vec.
85    async fn replay_since(
86        &self,
87        _topic: Option<&str>,
88        _since_id: Option<&str>,
89    ) -> anyhow::Result<Vec<Event>> {
90        Ok(Vec::new())
91    }
92
93    /// Delete events older than the given age. Returns count of deleted events.
94    /// No-op for buses that don't support persistence.
95    async fn gc_older_than(&self, _max_age: std::time::Duration) -> anyhow::Result<usize> {
96        Ok(0)
97    }
98}
99
100/// Subscriber that can filter and receive events.
101#[async_trait]
102pub trait EventSubscriber: Send {
103    /// Receive the next event.
104    async fn recv(&mut self) -> anyhow::Result<Event>;
105
106    /// Receive the next event whose topic starts with the given prefix.
107    /// Events that don't match are silently skipped.
108    async fn recv_filtered(&mut self, topic_prefix: &str) -> anyhow::Result<Event> {
109        loop {
110            let event = self.recv().await?;
111            if event.topic.starts_with(topic_prefix) {
112                return Ok(event);
113            }
114        }
115    }
116}
117
118/// In-memory bus backed by `tokio::sync::broadcast`.
119pub struct InMemoryBus {
120    tx: broadcast::Sender<Event>,
121}
122
123impl InMemoryBus {
124    /// Create a new bus with the given channel capacity.
125    /// Lagged receivers will skip missed events (lossy).
126    pub fn new(capacity: usize) -> Self {
127        let (tx, _) = broadcast::channel(capacity);
128        Self { tx }
129    }
130
131    /// Default capacity suitable for most single-process deployments.
132    pub fn default_capacity() -> Self {
133        Self::new(256)
134    }
135}
136
137#[async_trait]
138impl EventBus for InMemoryBus {
139    async fn publish(&self, event: Event) -> anyhow::Result<()> {
140        // send returns Err if there are no receivers — that's fine,
141        // the event is simply dropped.
142        let _ = self.tx.send(event);
143        Ok(())
144    }
145
146    fn subscribe(&self) -> Box<dyn EventSubscriber> {
147        Box::new(InMemorySubscriber {
148            rx: self.tx.subscribe(),
149        })
150    }
151
152    fn subscriber_count(&self) -> usize {
153        self.tx.receiver_count()
154    }
155}
156
157/// Subscriber for the in-memory bus.
158pub struct InMemorySubscriber {
159    rx: broadcast::Receiver<Event>,
160}
161
162#[async_trait]
163impl EventSubscriber for InMemorySubscriber {
164    async fn recv(&mut self) -> anyhow::Result<Event> {
165        loop {
166            match self.rx.recv().await {
167                Ok(event) => return Ok(event),
168                Err(broadcast::error::RecvError::Lagged(n)) => {
169                    tracing::warn!(skipped = n, "event bus subscriber lagged, skipping events");
170                    // Continue to receive the next available event.
171                }
172                Err(broadcast::error::RecvError::Closed) => {
173                    anyhow::bail!("event bus closed");
174                }
175            }
176        }
177    }
178}
179
180/// File-backed event bus: wraps `InMemoryBus` with append-only JSONL persistence.
181///
182/// Every published event is serialized to a JSONL log file before being
183/// broadcast to live subscribers. Use [`FileBackedBus::replay`] to read
184/// back all persisted events (e.g. after a restart).
185pub struct FileBackedBus {
186    inner: InMemoryBus,
187    log_path: PathBuf,
188    writer: tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>,
189}
190
191impl FileBackedBus {
192    /// Open (or create) a JSONL event log at `path` with the given broadcast capacity.
193    pub async fn open(path: impl AsRef<Path>, capacity: usize) -> anyhow::Result<Self> {
194        let log_path = path.as_ref().to_path_buf();
195        if let Some(parent) = log_path.parent() {
196            if !parent.as_os_str().is_empty() {
197                tokio::fs::create_dir_all(parent).await?;
198            }
199        }
200
201        let file = tokio::fs::OpenOptions::new()
202            .create(true)
203            .append(true)
204            .open(&log_path)
205            .await?;
206
207        Ok(Self {
208            inner: InMemoryBus::new(capacity),
209            log_path,
210            writer: tokio::sync::Mutex::new(tokio::io::BufWriter::new(file)),
211        })
212    }
213
214    /// Replay all persisted events, optionally filtered by topic prefix.
215    pub async fn replay(&self, topic_filter: Option<&str>) -> anyhow::Result<Vec<Event>> {
216        use tokio::io::AsyncBufReadExt;
217
218        let file = tokio::fs::File::open(&self.log_path).await?;
219        let reader = tokio::io::BufReader::new(file);
220        let mut lines = reader.lines();
221        let mut events = Vec::new();
222
223        while let Some(line) = lines.next_line().await? {
224            if line.trim().is_empty() {
225                continue;
226            }
227            match serde_json::from_str::<Event>(&line) {
228                Ok(evt) => {
229                    if let Some(prefix) = topic_filter {
230                        if !evt.topic.starts_with(prefix) {
231                            continue;
232                        }
233                    }
234                    events.push(evt);
235                }
236                Err(e) => {
237                    tracing::warn!(error = %e, "skipping malformed event line in log");
238                }
239            }
240        }
241
242        Ok(events)
243    }
244
245    /// Return the path to the JSONL log file.
246    pub fn log_path(&self) -> &Path {
247        &self.log_path
248    }
249}
250
251#[async_trait]
252impl EventBus for FileBackedBus {
253    async fn publish(&self, event: Event) -> anyhow::Result<()> {
254        use tokio::io::AsyncWriteExt;
255
256        // Persist first, then broadcast.
257        let mut line = serde_json::to_string(&event)?;
258        line.push('\n');
259
260        {
261            let mut w = self.writer.lock().await;
262            w.write_all(line.as_bytes()).await?;
263            w.flush().await?;
264        }
265
266        self.inner.publish(event).await
267    }
268
269    fn subscribe(&self) -> Box<dyn EventSubscriber> {
270        self.inner.subscribe()
271    }
272
273    fn subscriber_count(&self) -> usize {
274        self.inner.subscriber_count()
275    }
276
277    async fn replay_since(
278        &self,
279        topic: Option<&str>,
280        since_id: Option<&str>,
281    ) -> anyhow::Result<Vec<Event>> {
282        let all = self.replay(topic).await?;
283        if let Some(sid) = since_id {
284            // Find the position after the given event ID.
285            if let Some(pos) = all.iter().position(|e| e.id == sid) {
286                Ok(all[pos + 1..].to_vec())
287            } else {
288                Ok(all)
289            }
290        } else {
291            Ok(all)
292        }
293    }
294}
295
/// Glob-style topic matching: "task.image.*" matches "task.image.complete".
///
/// Supported pattern forms:
/// - `"*"` matches every topic;
/// - `"<prefix>.*"` matches any topic that starts with `"<prefix>."`;
/// - anything else must equal the topic exactly (a `*` elsewhere is literal).
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    match pattern.strip_suffix('*') {
        Some(dotted_prefix) if dotted_prefix.ends_with('.') => topic.starts_with(dotted_prefix),
        _ => topic == pattern,
    }
}
308
309/// Generate a simple unique event ID (timestamp + random suffix).
310fn new_event_id() -> String {
311    use std::sync::atomic::{AtomicU64, Ordering};
312    static COUNTER: AtomicU64 = AtomicU64::new(0);
313    let ts = now_ms();
314    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
315    format!("evt-{ts}-{seq}")
316}
317
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Private shorthand for [`now_ms_public`].
fn now_ms() -> u64 {
    now_ms_public()
}

/// Public helper for current time in milliseconds (used by storage crate for GC).
///
/// Returns `0` if the system clock reads earlier than the Unix epoch. The
/// `u128` millisecond count saturates at `u64::MAX` rather than being
/// silently truncated by an `as` cast.
pub fn now_ms_public() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
329
/// Decide whether an event labelled `source_boundary` may be delivered to a
/// consumer labelled `consumer_boundary`.
///
/// Labels are ranked from least to most restrictive:
/// `""` (or any unrecognized label) < `"any"` < `"encrypted_only"` < `"local_only"`.
/// Delivery is allowed only when the consumer ranks at least as high as the
/// source. So a `"local_only"` event reaches only `"local_only"` consumers,
/// while an unrestricted event reaches everyone.
///
/// NOTE(review): an unrecognized source label (e.g. a typo such as
/// `"local-only"`) ranks as unrestricted, i.e. this check fails open for
/// unknown source labels — confirm that is intended.
pub fn is_boundary_compatible(source_boundary: &str, consumer_boundary: &str) -> bool {
    // Position + 1 is the restriction rank; labels not in the list rank 0.
    const BY_RESTRICTION: [&str; 3] = ["any", "encrypted_only", "local_only"];
    let rank = |label: &str| {
        BY_RESTRICTION
            .iter()
            .position(|&known| known == label)
            .map_or(0, |idx| idx + 1)
    };
    rank(consumer_boundary) >= rank(source_boundary)
}
349
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn topic_matching() {
        assert!(topic_matches("task.image.*", "task.image.complete"));
        assert!(topic_matches("task.image.*", "task.image.error"));
        assert!(!topic_matches("task.image.*", "task.text.complete"));
        assert!(topic_matches(
            "channel.telegram.message",
            "channel.telegram.message"
        ));
        assert!(!topic_matches(
            "channel.telegram.message",
            "channel.slack.message"
        ));
        assert!(topic_matches("*", "anything.at.all"));
    }

    #[test]
    fn boundary_compatibility() {
        // Unrestricted events go anywhere
        assert!(is_boundary_compatible("", ""));
        assert!(is_boundary_compatible("", "any"));
        assert!(is_boundary_compatible("", "local_only"));

        // "any" events go to "any" or more restrictive
        assert!(is_boundary_compatible("any", "any"));
        assert!(is_boundary_compatible("any", "encrypted_only"));
        assert!(is_boundary_compatible("any", "local_only"));

        // "local_only" events only go to "local_only"
        assert!(is_boundary_compatible("local_only", "local_only"));
        assert!(!is_boundary_compatible("local_only", "any"));
        assert!(!is_boundary_compatible("local_only", "encrypted_only"));
        assert!(!is_boundary_compatible("local_only", ""));

        // "encrypted_only" events go to "encrypted_only" or "local_only"
        assert!(is_boundary_compatible("encrypted_only", "encrypted_only"));
        assert!(is_boundary_compatible("encrypted_only", "local_only"));
        assert!(!is_boundary_compatible("encrypted_only", "any"));
    }

    #[test]
    fn event_builder() {
        let event = Event::new("task.test", "agent-1", r#"{"result":"ok"}"#)
            .with_correlation("corr-123")
            .with_boundary("local_only");

        assert_eq!(event.topic, "task.test");
        assert_eq!(event.source, "agent-1");
        assert_eq!(event.correlation_id.as_deref(), Some("corr-123"));
        assert_eq!(event.privacy_boundary, "local_only");
        assert!(event.timestamp_ms > 0);
        assert!(event.id.starts_with("evt-"));
    }

    // --- InMemoryBus tests ---

    #[tokio::test]
    async fn in_memory_bus_publish_subscribe() {
        let bus = InMemoryBus::new(16);
        let mut sub = bus.subscribe();

        bus.publish(Event::new("test.topic", "src", "hello"))
            .await
            .unwrap();

        let event = sub.recv().await.unwrap();
        assert_eq!(event.topic, "test.topic");
        assert_eq!(event.payload, "hello");
    }

    #[tokio::test]
    async fn in_memory_bus_multiple_subscribers() {
        let bus = InMemoryBus::new(16);
        let mut sub1 = bus.subscribe();
        let mut sub2 = bus.subscribe();

        assert_eq!(bus.subscriber_count(), 2);

        bus.publish(Event::new("t", "s", "data")).await.unwrap();

        let e1 = sub1.recv().await.unwrap();
        let e2 = sub2.recv().await.unwrap();
        assert_eq!(e1.payload, "data");
        assert_eq!(e2.payload, "data");
    }

    #[tokio::test]
    async fn filtered_recv() {
        let bus = InMemoryBus::new(16);
        let mut sub = bus.subscribe();

        bus.publish(Event::new("task.text.complete", "a", "text"))
            .await
            .unwrap();
        bus.publish(Event::new("task.image.complete", "b", "image"))
            .await
            .unwrap();

        let event = sub.recv_filtered("task.image.").await.unwrap();
        assert_eq!(event.payload, "image");
    }

    #[tokio::test]
    async fn publish_with_no_subscribers_is_ok() {
        let bus = InMemoryBus::new(16);
        // No subscribers — should not error
        bus.publish(Event::new("orphan", "system", ""))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn bus_closed_returns_error() {
        let bus = InMemoryBus::new(16);
        let mut sub = bus.subscribe();

        // Dropping the bus drops its only sender, which closes the channel.
        drop(bus);

        assert!(sub.recv().await.is_err());
    }

    #[tokio::test]
    async fn in_memory_bus_has_no_persistence() {
        let bus = InMemoryBus::new(16);
        bus.publish(Event::new("t", "s", "x")).await.unwrap();

        // The trait defaults apply: nothing to replay, nothing to collect.
        assert!(bus.replay_since(None, None).await.unwrap().is_empty());
        assert_eq!(
            bus.gc_older_than(std::time::Duration::from_secs(0))
                .await
                .unwrap(),
            0
        );
    }

    // --- FileBackedBus tests ---

    /// Unique temporary JSONL log path that is deleted on drop, so the file is
    /// cleaned up even when an assertion fails mid-test.
    struct TempLog(PathBuf);

    impl TempLog {
        fn new(suffix: &str) -> Self {
            use std::sync::atomic::{AtomicU64, Ordering};
            static SEQ: AtomicU64 = AtomicU64::new(0);
            let ts = now_ms();
            let seq = SEQ.fetch_add(1, Ordering::Relaxed);
            Self(std::env::temp_dir().join(format!(
                "agentzero-core-events-{}-{ts}-{seq}-{suffix}.jsonl",
                std::process::id()
            )))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempLog {
        fn drop(&mut self) {
            // Best-effort cleanup: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[tokio::test]
    async fn file_backed_publish_and_replay() {
        let log = TempLog::new("roundtrip");
        let bus = FileBackedBus::open(log.path(), 16).await.expect("open");

        bus.publish(Event::new("task.research.raw", "researcher", "step-1"))
            .await
            .unwrap();
        bus.publish(Event::new("task.research.raw", "researcher", "step-2"))
            .await
            .unwrap();
        bus.publish(Event::new("task.alert", "system", "disk-low"))
            .await
            .unwrap();

        let all = bus.replay(None).await.unwrap();
        assert_eq!(all.len(), 3);

        let research = bus.replay(Some("task.research.")).await.unwrap();
        assert_eq!(research.len(), 2);
        assert_eq!(research[0].payload, "step-1");
        assert_eq!(research[1].payload, "step-2");
    }

    #[tokio::test]
    async fn file_backed_live_subscribers() {
        let log = TempLog::new("live");
        let bus = FileBackedBus::open(log.path(), 16).await.expect("open");

        let mut sub = bus.subscribe();
        bus.publish(Event::new("t", "s", "hello")).await.unwrap();

        let event = sub.recv().await.unwrap();
        assert_eq!(event.payload, "hello");
    }

    #[tokio::test]
    async fn file_backed_survives_reopen() {
        let log = TempLog::new("reopen");

        {
            let bus = FileBackedBus::open(log.path(), 16).await.expect("open");
            bus.publish(Event::new("pipeline.a", "agent", "first"))
                .await
                .unwrap();
            bus.publish(Event::new("pipeline.a", "agent", "second"))
                .await
                .unwrap();
        }

        {
            let bus = FileBackedBus::open(log.path(), 16).await.expect("reopen");
            let events = bus.replay(Some("pipeline.")).await.unwrap();
            assert_eq!(events.len(), 2);
            assert_eq!(events[0].payload, "first");
            assert_eq!(events[1].payload, "second");

            // New events append.
            bus.publish(Event::new("pipeline.a", "agent", "third"))
                .await
                .unwrap();
            let events = bus.replay(None).await.unwrap();
            assert_eq!(events.len(), 3);
        }
    }

    #[tokio::test]
    async fn file_backed_subscriber_count() {
        let log = TempLog::new("subcount");
        let bus = FileBackedBus::open(log.path(), 16).await.expect("open");

        assert_eq!(bus.subscriber_count(), 0);
        let _s1 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);
        let _s2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);
    }

    #[tokio::test]
    async fn file_backed_replay_since_cursor() {
        let log = TempLog::new("since");
        let bus = FileBackedBus::open(log.path(), 16).await.expect("open");

        let first = Event::new("job.run", "agent", "1");
        let cursor = first.id.clone();
        bus.publish(first).await.unwrap();
        bus.publish(Event::new("job.run", "agent", "2")).await.unwrap();
        bus.publish(Event::new("job.run", "agent", "3")).await.unwrap();

        let payloads =
            |events: Vec<Event>| events.into_iter().map(|e| e.payload).collect::<Vec<_>>();

        assert_eq!(
            payloads(bus.replay_since(None, Some(cursor.as_str())).await.unwrap()),
            ["2", "3"]
        );
        assert_eq!(
            payloads(
                bus.replay_since(Some("job."), Some(cursor.as_str()))
                    .await
                    .unwrap()
            ),
            ["2", "3"]
        );
        assert_eq!(bus.replay_since(None, None).await.unwrap().len(), 3);
        // An id that is not in the log falls back to replaying everything.
        assert_eq!(
            bus.replay_since(None, Some("evt-unknown"))
                .await
                .unwrap()
                .len(),
            3
        );
    }
}