1use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use std::path::{Path, PathBuf};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::broadcast;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct Event {
22 pub id: String,
24 pub topic: String,
26 pub source: String,
28 pub payload: String,
30 pub privacy_boundary: String,
32 pub timestamp_ms: u64,
34 pub correlation_id: Option<String>,
37}
38
39impl Event {
40 pub fn new(
42 topic: impl Into<String>,
43 source: impl Into<String>,
44 payload: impl Into<String>,
45 ) -> Self {
46 Self {
47 id: new_event_id(),
48 topic: topic.into(),
49 source: source.into(),
50 payload: payload.into(),
51 privacy_boundary: String::new(),
52 timestamp_ms: now_ms(),
53 correlation_id: None,
54 }
55 }
56
57 pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
59 self.correlation_id = Some(id.into());
60 self
61 }
62
63 pub fn with_boundary(mut self, boundary: impl Into<String>) -> Self {
65 self.privacy_boundary = boundary.into();
66 self
67 }
68}
69
70#[async_trait]
72pub trait EventBus: Send + Sync {
73 async fn publish(&self, event: Event) -> anyhow::Result<()>;
75
76 fn subscribe(&self) -> Box<dyn EventSubscriber>;
78
79 fn subscriber_count(&self) -> usize;
81}
82
83#[async_trait]
85pub trait EventSubscriber: Send {
86 async fn recv(&mut self) -> anyhow::Result<Event>;
88
89 async fn recv_filtered(&mut self, topic_prefix: &str) -> anyhow::Result<Event> {
92 loop {
93 let event = self.recv().await?;
94 if event.topic.starts_with(topic_prefix) {
95 return Ok(event);
96 }
97 }
98 }
99}
100
101pub struct InMemoryBus {
103 tx: broadcast::Sender<Event>,
104}
105
106impl InMemoryBus {
107 pub fn new(capacity: usize) -> Self {
110 let (tx, _) = broadcast::channel(capacity);
111 Self { tx }
112 }
113
114 pub fn default_capacity() -> Self {
116 Self::new(256)
117 }
118}
119
120#[async_trait]
121impl EventBus for InMemoryBus {
122 async fn publish(&self, event: Event) -> anyhow::Result<()> {
123 let _ = self.tx.send(event);
126 Ok(())
127 }
128
129 fn subscribe(&self) -> Box<dyn EventSubscriber> {
130 Box::new(InMemorySubscriber {
131 rx: self.tx.subscribe(),
132 })
133 }
134
135 fn subscriber_count(&self) -> usize {
136 self.tx.receiver_count()
137 }
138}
139
140pub struct InMemorySubscriber {
142 rx: broadcast::Receiver<Event>,
143}
144
145#[async_trait]
146impl EventSubscriber for InMemorySubscriber {
147 async fn recv(&mut self) -> anyhow::Result<Event> {
148 loop {
149 match self.rx.recv().await {
150 Ok(event) => return Ok(event),
151 Err(broadcast::error::RecvError::Lagged(n)) => {
152 tracing::warn!(skipped = n, "event bus subscriber lagged, skipping events");
153 }
155 Err(broadcast::error::RecvError::Closed) => {
156 anyhow::bail!("event bus closed");
157 }
158 }
159 }
160 }
161}
162
163pub struct FileBackedBus {
169 inner: InMemoryBus,
170 log_path: PathBuf,
171 writer: tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>,
172}
173
174impl FileBackedBus {
175 pub async fn open(path: impl AsRef<Path>, capacity: usize) -> anyhow::Result<Self> {
177 let log_path = path.as_ref().to_path_buf();
178 if let Some(parent) = log_path.parent() {
179 if !parent.as_os_str().is_empty() {
180 tokio::fs::create_dir_all(parent).await?;
181 }
182 }
183
184 let file = tokio::fs::OpenOptions::new()
185 .create(true)
186 .append(true)
187 .open(&log_path)
188 .await?;
189
190 Ok(Self {
191 inner: InMemoryBus::new(capacity),
192 log_path,
193 writer: tokio::sync::Mutex::new(tokio::io::BufWriter::new(file)),
194 })
195 }
196
197 pub async fn replay(&self, topic_filter: Option<&str>) -> anyhow::Result<Vec<Event>> {
199 use tokio::io::AsyncBufReadExt;
200
201 let file = tokio::fs::File::open(&self.log_path).await?;
202 let reader = tokio::io::BufReader::new(file);
203 let mut lines = reader.lines();
204 let mut events = Vec::new();
205
206 while let Some(line) = lines.next_line().await? {
207 if line.trim().is_empty() {
208 continue;
209 }
210 match serde_json::from_str::<Event>(&line) {
211 Ok(evt) => {
212 if let Some(prefix) = topic_filter {
213 if !evt.topic.starts_with(prefix) {
214 continue;
215 }
216 }
217 events.push(evt);
218 }
219 Err(e) => {
220 tracing::warn!(error = %e, "skipping malformed event line in log");
221 }
222 }
223 }
224
225 Ok(events)
226 }
227
228 pub fn log_path(&self) -> &Path {
230 &self.log_path
231 }
232}
233
234#[async_trait]
235impl EventBus for FileBackedBus {
236 async fn publish(&self, event: Event) -> anyhow::Result<()> {
237 use tokio::io::AsyncWriteExt;
238
239 let mut line = serde_json::to_string(&event)?;
241 line.push('\n');
242
243 {
244 let mut w = self.writer.lock().await;
245 w.write_all(line.as_bytes()).await?;
246 w.flush().await?;
247 }
248
249 self.inner.publish(event).await
250 }
251
252 fn subscribe(&self) -> Box<dyn EventSubscriber> {
253 self.inner.subscribe()
254 }
255
256 fn subscriber_count(&self) -> usize {
257 self.inner.subscriber_count()
258 }
259}
260
261pub fn topic_matches(pattern: &str, topic: &str) -> bool {
263 if pattern == "*" {
264 return true;
265 }
266 if pattern.ends_with(".*") {
267 let prefix = &pattern[..pattern.len() - 1];
268 topic.starts_with(prefix)
269 } else {
270 pattern == topic
271 }
272}
273
274fn new_event_id() -> String {
276 use std::sync::atomic::{AtomicU64, Ordering};
277 static COUNTER: AtomicU64 = AtomicU64::new(0);
278 let ts = now_ms();
279 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
280 format!("evt-{ts}-{seq}")
281}
282
283fn now_ms() -> u64 {
284 SystemTime::now()
285 .duration_since(UNIX_EPOCH)
286 .unwrap_or_default()
287 .as_millis() as u64
288}
289
290pub fn is_boundary_compatible(source_boundary: &str, consumer_boundary: &str) -> bool {
294 fn level(b: &str) -> u8 {
296 match b {
297 "local_only" => 3,
298 "encrypted_only" => 2,
299 "any" => 1,
300 _ => 0,
301 }
302 }
303 level(consumer_boundary) >= level(source_boundary)
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313 use std::sync::Arc;
314
315 #[test]
316 fn topic_matching() {
317 assert!(topic_matches("task.image.*", "task.image.complete"));
318 assert!(topic_matches("task.image.*", "task.image.error"));
319 assert!(!topic_matches("task.image.*", "task.text.complete"));
320 assert!(topic_matches(
321 "channel.telegram.message",
322 "channel.telegram.message"
323 ));
324 assert!(!topic_matches(
325 "channel.telegram.message",
326 "channel.slack.message"
327 ));
328 assert!(topic_matches("*", "anything.at.all"));
329 }
330
331 #[test]
332 fn boundary_compatibility() {
333 assert!(is_boundary_compatible("", ""));
335 assert!(is_boundary_compatible("", "any"));
336 assert!(is_boundary_compatible("", "local_only"));
337
338 assert!(is_boundary_compatible("any", "any"));
340 assert!(is_boundary_compatible("any", "encrypted_only"));
341 assert!(is_boundary_compatible("any", "local_only"));
342
343 assert!(is_boundary_compatible("local_only", "local_only"));
345 assert!(!is_boundary_compatible("local_only", "any"));
346 assert!(!is_boundary_compatible("local_only", "encrypted_only"));
347 assert!(!is_boundary_compatible("local_only", ""));
348
349 assert!(is_boundary_compatible("encrypted_only", "encrypted_only"));
351 assert!(is_boundary_compatible("encrypted_only", "local_only"));
352 assert!(!is_boundary_compatible("encrypted_only", "any"));
353 }
354
355 #[test]
356 fn event_builder() {
357 let event = Event::new("task.test", "agent-1", r#"{"result":"ok"}"#)
358 .with_correlation("corr-123")
359 .with_boundary("local_only");
360
361 assert_eq!(event.topic, "task.test");
362 assert_eq!(event.source, "agent-1");
363 assert_eq!(event.correlation_id.as_deref(), Some("corr-123"));
364 assert_eq!(event.privacy_boundary, "local_only");
365 assert!(event.timestamp_ms > 0);
366 assert!(event.id.starts_with("evt-"));
367 }
368
369 #[tokio::test]
370 async fn in_memory_bus_publish_subscribe() {
371 let bus = InMemoryBus::new(16);
372 let mut sub = bus.subscribe();
373
374 bus.publish(Event::new("test.topic", "src", "hello"))
375 .await
376 .unwrap();
377
378 let event = sub.recv().await.unwrap();
379 assert_eq!(event.topic, "test.topic");
380 assert_eq!(event.payload, "hello");
381 }
382
383 #[tokio::test]
384 async fn in_memory_bus_multiple_subscribers() {
385 let bus = InMemoryBus::new(16);
386 let mut sub1 = bus.subscribe();
387 let mut sub2 = bus.subscribe();
388
389 assert_eq!(bus.subscriber_count(), 2);
390
391 bus.publish(Event::new("t", "s", "data")).await.unwrap();
392
393 let e1 = sub1.recv().await.unwrap();
394 let e2 = sub2.recv().await.unwrap();
395 assert_eq!(e1.payload, "data");
396 assert_eq!(e2.payload, "data");
397 }
398
399 #[tokio::test]
400 async fn filtered_recv() {
401 let bus = InMemoryBus::new(16);
402 let mut sub = bus.subscribe();
403
404 bus.publish(Event::new("task.text.complete", "a", "text"))
405 .await
406 .unwrap();
407 bus.publish(Event::new("task.image.complete", "b", "image"))
408 .await
409 .unwrap();
410
411 let event = sub.recv_filtered("task.image.").await.unwrap();
412 assert_eq!(event.payload, "image");
413 }
414
415 #[tokio::test]
416 async fn publish_with_no_subscribers_is_ok() {
417 let bus = InMemoryBus::new(16);
418 bus.publish(Event::new("orphan", "system", ""))
420 .await
421 .unwrap();
422 }
423
424 #[tokio::test]
425 async fn bus_closed_returns_error() {
426 let bus = Arc::new(InMemoryBus::new(16));
427 let mut sub = bus.subscribe();
428
429 drop(bus);
431
432 let result = sub.recv().await;
433 assert!(result.is_err());
434 }
435
436 fn temp_event_log(suffix: &str) -> PathBuf {
439 use std::sync::atomic::{AtomicU64, Ordering};
440 static SEQ: AtomicU64 = AtomicU64::new(0);
441 let ts = now_ms();
442 let seq = SEQ.fetch_add(1, Ordering::Relaxed);
443 std::env::temp_dir().join(format!(
444 "agentzero-core-events-{}-{ts}-{seq}-{suffix}.jsonl",
445 std::process::id()
446 ))
447 }
448
449 #[tokio::test]
450 async fn file_backed_publish_and_replay() {
451 let path = temp_event_log("roundtrip");
452 let bus = FileBackedBus::open(&path, 16).await.expect("open");
453
454 bus.publish(Event::new("task.research.raw", "researcher", "step-1"))
455 .await
456 .unwrap();
457 bus.publish(Event::new("task.research.raw", "researcher", "step-2"))
458 .await
459 .unwrap();
460 bus.publish(Event::new("task.alert", "system", "disk-low"))
461 .await
462 .unwrap();
463
464 let all = bus.replay(None).await.unwrap();
465 assert_eq!(all.len(), 3);
466
467 let research = bus.replay(Some("task.research.")).await.unwrap();
468 assert_eq!(research.len(), 2);
469 assert_eq!(research[0].payload, "step-1");
470 assert_eq!(research[1].payload, "step-2");
471
472 tokio::fs::remove_file(path).await.ok();
473 }
474
475 #[tokio::test]
476 async fn file_backed_live_subscribers() {
477 let path = temp_event_log("live");
478 let bus = FileBackedBus::open(&path, 16).await.expect("open");
479
480 let mut sub = bus.subscribe();
481 bus.publish(Event::new("t", "s", "hello")).await.unwrap();
482
483 let event = sub.recv().await.unwrap();
484 assert_eq!(event.payload, "hello");
485
486 tokio::fs::remove_file(path).await.ok();
487 }
488
489 #[tokio::test]
490 async fn file_backed_survives_reopen() {
491 let path = temp_event_log("reopen");
492
493 {
494 let bus = FileBackedBus::open(&path, 16).await.expect("open");
495 bus.publish(Event::new("pipeline.a", "agent", "first"))
496 .await
497 .unwrap();
498 bus.publish(Event::new("pipeline.a", "agent", "second"))
499 .await
500 .unwrap();
501 }
502
503 {
504 let bus = FileBackedBus::open(&path, 16).await.expect("reopen");
505 let events = bus.replay(Some("pipeline.")).await.unwrap();
506 assert_eq!(events.len(), 2);
507 assert_eq!(events[0].payload, "first");
508 assert_eq!(events[1].payload, "second");
509
510 bus.publish(Event::new("pipeline.a", "agent", "third"))
512 .await
513 .unwrap();
514 let events = bus.replay(None).await.unwrap();
515 assert_eq!(events.len(), 3);
516 }
517
518 tokio::fs::remove_file(path).await.ok();
519 }
520
521 #[tokio::test]
522 async fn file_backed_subscriber_count() {
523 let path = temp_event_log("subcount");
524 let bus = FileBackedBus::open(&path, 16).await.expect("open");
525
526 assert_eq!(bus.subscriber_count(), 0);
527 let _s1 = bus.subscribe();
528 assert_eq!(bus.subscriber_count(), 1);
529 let _s2 = bus.subscribe();
530 assert_eq!(bus.subscriber_count(), 2);
531
532 tokio::fs::remove_file(path).await.ok();
533 }
534}