1use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use std::path::{Path, PathBuf};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tokio::sync::broadcast;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct Event {
22 pub id: String,
24 pub topic: String,
26 pub source: String,
28 pub payload: String,
30 pub privacy_boundary: String,
32 pub timestamp_ms: u64,
34 pub correlation_id: Option<String>,
37}
38
39impl Event {
40 pub fn new(
42 topic: impl Into<String>,
43 source: impl Into<String>,
44 payload: impl Into<String>,
45 ) -> Self {
46 Self {
47 id: new_event_id(),
48 topic: topic.into(),
49 source: source.into(),
50 payload: payload.into(),
51 privacy_boundary: String::new(),
52 timestamp_ms: now_ms(),
53 correlation_id: None,
54 }
55 }
56
57 pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
59 self.correlation_id = Some(id.into());
60 self
61 }
62
63 pub fn with_boundary(mut self, boundary: impl Into<String>) -> Self {
65 self.privacy_boundary = boundary.into();
66 self
67 }
68}
69
70#[async_trait]
72pub trait EventBus: Send + Sync {
73 async fn publish(&self, event: Event) -> anyhow::Result<()>;
75
76 fn subscribe(&self) -> Box<dyn EventSubscriber>;
78
79 fn subscriber_count(&self) -> usize;
81
82 async fn replay_since(
86 &self,
87 _topic: Option<&str>,
88 _since_id: Option<&str>,
89 ) -> anyhow::Result<Vec<Event>> {
90 Ok(Vec::new())
91 }
92
93 async fn gc_older_than(&self, _max_age: std::time::Duration) -> anyhow::Result<usize> {
96 Ok(0)
97 }
98}
99
100#[async_trait]
102pub trait EventSubscriber: Send {
103 async fn recv(&mut self) -> anyhow::Result<Event>;
105
106 async fn recv_filtered(&mut self, topic_prefix: &str) -> anyhow::Result<Event> {
109 loop {
110 let event = self.recv().await?;
111 if event.topic.starts_with(topic_prefix) {
112 return Ok(event);
113 }
114 }
115 }
116}
117
118pub struct InMemoryBus {
120 tx: broadcast::Sender<Event>,
121}
122
123impl InMemoryBus {
124 pub fn new(capacity: usize) -> Self {
127 let (tx, _) = broadcast::channel(capacity);
128 Self { tx }
129 }
130
131 pub fn default_capacity() -> Self {
133 Self::new(256)
134 }
135}
136
137#[async_trait]
138impl EventBus for InMemoryBus {
139 async fn publish(&self, event: Event) -> anyhow::Result<()> {
140 let _ = self.tx.send(event);
143 Ok(())
144 }
145
146 fn subscribe(&self) -> Box<dyn EventSubscriber> {
147 Box::new(InMemorySubscriber {
148 rx: self.tx.subscribe(),
149 })
150 }
151
152 fn subscriber_count(&self) -> usize {
153 self.tx.receiver_count()
154 }
155}
156
157pub struct InMemorySubscriber {
159 rx: broadcast::Receiver<Event>,
160}
161
162#[async_trait]
163impl EventSubscriber for InMemorySubscriber {
164 async fn recv(&mut self) -> anyhow::Result<Event> {
165 loop {
166 match self.rx.recv().await {
167 Ok(event) => return Ok(event),
168 Err(broadcast::error::RecvError::Lagged(n)) => {
169 tracing::warn!(skipped = n, "event bus subscriber lagged, skipping events");
170 }
172 Err(broadcast::error::RecvError::Closed) => {
173 anyhow::bail!("event bus closed");
174 }
175 }
176 }
177 }
178}
179
180pub struct FileBackedBus {
186 inner: InMemoryBus,
187 log_path: PathBuf,
188 writer: tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>,
189}
190
191impl FileBackedBus {
192 pub async fn open(path: impl AsRef<Path>, capacity: usize) -> anyhow::Result<Self> {
194 let log_path = path.as_ref().to_path_buf();
195 if let Some(parent) = log_path.parent() {
196 if !parent.as_os_str().is_empty() {
197 tokio::fs::create_dir_all(parent).await?;
198 }
199 }
200
201 let file = tokio::fs::OpenOptions::new()
202 .create(true)
203 .append(true)
204 .open(&log_path)
205 .await?;
206
207 Ok(Self {
208 inner: InMemoryBus::new(capacity),
209 log_path,
210 writer: tokio::sync::Mutex::new(tokio::io::BufWriter::new(file)),
211 })
212 }
213
214 pub async fn replay(&self, topic_filter: Option<&str>) -> anyhow::Result<Vec<Event>> {
216 use tokio::io::AsyncBufReadExt;
217
218 let file = tokio::fs::File::open(&self.log_path).await?;
219 let reader = tokio::io::BufReader::new(file);
220 let mut lines = reader.lines();
221 let mut events = Vec::new();
222
223 while let Some(line) = lines.next_line().await? {
224 if line.trim().is_empty() {
225 continue;
226 }
227 match serde_json::from_str::<Event>(&line) {
228 Ok(evt) => {
229 if let Some(prefix) = topic_filter {
230 if !evt.topic.starts_with(prefix) {
231 continue;
232 }
233 }
234 events.push(evt);
235 }
236 Err(e) => {
237 tracing::warn!(error = %e, "skipping malformed event line in log");
238 }
239 }
240 }
241
242 Ok(events)
243 }
244
245 pub fn log_path(&self) -> &Path {
247 &self.log_path
248 }
249}
250
251#[async_trait]
252impl EventBus for FileBackedBus {
253 async fn publish(&self, event: Event) -> anyhow::Result<()> {
254 use tokio::io::AsyncWriteExt;
255
256 let mut line = serde_json::to_string(&event)?;
258 line.push('\n');
259
260 {
261 let mut w = self.writer.lock().await;
262 w.write_all(line.as_bytes()).await?;
263 w.flush().await?;
264 }
265
266 self.inner.publish(event).await
267 }
268
269 fn subscribe(&self) -> Box<dyn EventSubscriber> {
270 self.inner.subscribe()
271 }
272
273 fn subscriber_count(&self) -> usize {
274 self.inner.subscriber_count()
275 }
276
277 async fn replay_since(
278 &self,
279 topic: Option<&str>,
280 since_id: Option<&str>,
281 ) -> anyhow::Result<Vec<Event>> {
282 let all = self.replay(topic).await?;
283 if let Some(sid) = since_id {
284 if let Some(pos) = all.iter().position(|e| e.id == sid) {
286 Ok(all[pos + 1..].to_vec())
287 } else {
288 Ok(all)
289 }
290 } else {
291 Ok(all)
292 }
293 }
294}
295
296pub fn topic_matches(pattern: &str, topic: &str) -> bool {
298 if pattern == "*" {
299 return true;
300 }
301 if pattern.ends_with(".*") {
302 let prefix = &pattern[..pattern.len() - 1];
303 topic.starts_with(prefix)
304 } else {
305 pattern == topic
306 }
307}
308
309fn new_event_id() -> String {
311 use std::sync::atomic::{AtomicU64, Ordering};
312 static COUNTER: AtomicU64 = AtomicU64::new(0);
313 let ts = now_ms();
314 let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
315 format!("evt-{ts}-{seq}")
316}
317
318fn now_ms() -> u64 {
319 now_ms_public()
320}
321
322pub fn now_ms_public() -> u64 {
324 SystemTime::now()
325 .duration_since(UNIX_EPOCH)
326 .unwrap_or_default()
327 .as_millis() as u64
328}
329
330pub fn is_boundary_compatible(source_boundary: &str, consumer_boundary: &str) -> bool {
334 fn level(b: &str) -> u8 {
336 match b {
337 "local_only" => 3,
338 "encrypted_only" => 2,
339 "any" => 1,
340 _ => 0,
341 }
342 }
343 level(consumer_boundary) >= level(source_boundary)
348}
349
350#[cfg(test)]
351mod tests {
352 use super::*;
353 use std::sync::Arc;
354
355 #[test]
356 fn topic_matching() {
357 assert!(topic_matches("task.image.*", "task.image.complete"));
358 assert!(topic_matches("task.image.*", "task.image.error"));
359 assert!(!topic_matches("task.image.*", "task.text.complete"));
360 assert!(topic_matches(
361 "channel.telegram.message",
362 "channel.telegram.message"
363 ));
364 assert!(!topic_matches(
365 "channel.telegram.message",
366 "channel.slack.message"
367 ));
368 assert!(topic_matches("*", "anything.at.all"));
369 }
370
371 #[test]
372 fn boundary_compatibility() {
373 assert!(is_boundary_compatible("", ""));
375 assert!(is_boundary_compatible("", "any"));
376 assert!(is_boundary_compatible("", "local_only"));
377
378 assert!(is_boundary_compatible("any", "any"));
380 assert!(is_boundary_compatible("any", "encrypted_only"));
381 assert!(is_boundary_compatible("any", "local_only"));
382
383 assert!(is_boundary_compatible("local_only", "local_only"));
385 assert!(!is_boundary_compatible("local_only", "any"));
386 assert!(!is_boundary_compatible("local_only", "encrypted_only"));
387 assert!(!is_boundary_compatible("local_only", ""));
388
389 assert!(is_boundary_compatible("encrypted_only", "encrypted_only"));
391 assert!(is_boundary_compatible("encrypted_only", "local_only"));
392 assert!(!is_boundary_compatible("encrypted_only", "any"));
393 }
394
395 #[test]
396 fn event_builder() {
397 let event = Event::new("task.test", "agent-1", r#"{"result":"ok"}"#)
398 .with_correlation("corr-123")
399 .with_boundary("local_only");
400
401 assert_eq!(event.topic, "task.test");
402 assert_eq!(event.source, "agent-1");
403 assert_eq!(event.correlation_id.as_deref(), Some("corr-123"));
404 assert_eq!(event.privacy_boundary, "local_only");
405 assert!(event.timestamp_ms > 0);
406 assert!(event.id.starts_with("evt-"));
407 }
408
409 #[tokio::test]
410 async fn in_memory_bus_publish_subscribe() {
411 let bus = InMemoryBus::new(16);
412 let mut sub = bus.subscribe();
413
414 bus.publish(Event::new("test.topic", "src", "hello"))
415 .await
416 .unwrap();
417
418 let event = sub.recv().await.unwrap();
419 assert_eq!(event.topic, "test.topic");
420 assert_eq!(event.payload, "hello");
421 }
422
423 #[tokio::test]
424 async fn in_memory_bus_multiple_subscribers() {
425 let bus = InMemoryBus::new(16);
426 let mut sub1 = bus.subscribe();
427 let mut sub2 = bus.subscribe();
428
429 assert_eq!(bus.subscriber_count(), 2);
430
431 bus.publish(Event::new("t", "s", "data")).await.unwrap();
432
433 let e1 = sub1.recv().await.unwrap();
434 let e2 = sub2.recv().await.unwrap();
435 assert_eq!(e1.payload, "data");
436 assert_eq!(e2.payload, "data");
437 }
438
439 #[tokio::test]
440 async fn filtered_recv() {
441 let bus = InMemoryBus::new(16);
442 let mut sub = bus.subscribe();
443
444 bus.publish(Event::new("task.text.complete", "a", "text"))
445 .await
446 .unwrap();
447 bus.publish(Event::new("task.image.complete", "b", "image"))
448 .await
449 .unwrap();
450
451 let event = sub.recv_filtered("task.image.").await.unwrap();
452 assert_eq!(event.payload, "image");
453 }
454
455 #[tokio::test]
456 async fn publish_with_no_subscribers_is_ok() {
457 let bus = InMemoryBus::new(16);
458 bus.publish(Event::new("orphan", "system", ""))
460 .await
461 .unwrap();
462 }
463
464 #[tokio::test]
465 async fn bus_closed_returns_error() {
466 let bus = Arc::new(InMemoryBus::new(16));
467 let mut sub = bus.subscribe();
468
469 drop(bus);
471
472 let result = sub.recv().await;
473 assert!(result.is_err());
474 }
475
476 fn temp_event_log(suffix: &str) -> PathBuf {
479 use std::sync::atomic::{AtomicU64, Ordering};
480 static SEQ: AtomicU64 = AtomicU64::new(0);
481 let ts = now_ms();
482 let seq = SEQ.fetch_add(1, Ordering::Relaxed);
483 std::env::temp_dir().join(format!(
484 "agentzero-core-events-{}-{ts}-{seq}-{suffix}.jsonl",
485 std::process::id()
486 ))
487 }
488
489 #[tokio::test]
490 async fn file_backed_publish_and_replay() {
491 let path = temp_event_log("roundtrip");
492 let bus = FileBackedBus::open(&path, 16).await.expect("open");
493
494 bus.publish(Event::new("task.research.raw", "researcher", "step-1"))
495 .await
496 .unwrap();
497 bus.publish(Event::new("task.research.raw", "researcher", "step-2"))
498 .await
499 .unwrap();
500 bus.publish(Event::new("task.alert", "system", "disk-low"))
501 .await
502 .unwrap();
503
504 let all = bus.replay(None).await.unwrap();
505 assert_eq!(all.len(), 3);
506
507 let research = bus.replay(Some("task.research.")).await.unwrap();
508 assert_eq!(research.len(), 2);
509 assert_eq!(research[0].payload, "step-1");
510 assert_eq!(research[1].payload, "step-2");
511
512 tokio::fs::remove_file(path).await.ok();
513 }
514
515 #[tokio::test]
516 async fn file_backed_live_subscribers() {
517 let path = temp_event_log("live");
518 let bus = FileBackedBus::open(&path, 16).await.expect("open");
519
520 let mut sub = bus.subscribe();
521 bus.publish(Event::new("t", "s", "hello")).await.unwrap();
522
523 let event = sub.recv().await.unwrap();
524 assert_eq!(event.payload, "hello");
525
526 tokio::fs::remove_file(path).await.ok();
527 }
528
529 #[tokio::test]
530 async fn file_backed_survives_reopen() {
531 let path = temp_event_log("reopen");
532
533 {
534 let bus = FileBackedBus::open(&path, 16).await.expect("open");
535 bus.publish(Event::new("pipeline.a", "agent", "first"))
536 .await
537 .unwrap();
538 bus.publish(Event::new("pipeline.a", "agent", "second"))
539 .await
540 .unwrap();
541 }
542
543 {
544 let bus = FileBackedBus::open(&path, 16).await.expect("reopen");
545 let events = bus.replay(Some("pipeline.")).await.unwrap();
546 assert_eq!(events.len(), 2);
547 assert_eq!(events[0].payload, "first");
548 assert_eq!(events[1].payload, "second");
549
550 bus.publish(Event::new("pipeline.a", "agent", "third"))
552 .await
553 .unwrap();
554 let events = bus.replay(None).await.unwrap();
555 assert_eq!(events.len(), 3);
556 }
557
558 tokio::fs::remove_file(path).await.ok();
559 }
560
561 #[tokio::test]
562 async fn file_backed_subscriber_count() {
563 let path = temp_event_log("subcount");
564 let bus = FileBackedBus::open(&path, 16).await.expect("open");
565
566 assert_eq!(bus.subscriber_count(), 0);
567 let _s1 = bus.subscribe();
568 assert_eq!(bus.subscriber_count(), 1);
569 let _s2 = bus.subscribe();
570 assert_eq!(bus.subscriber_count(), 2);
571
572 tokio::fs::remove_file(path).await.ok();
573 }
574}