Skip to main content

brainos_observe/
observer.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use thiserror::Error;
5use tokio::sync::broadcast;
6
7use crate::event::BrainEvent;
8
9/// Bus capacity. Slow consumers see `Lagged(n)` rather than block the pipeline.
10pub const DEFAULT_BROADCAST_CAPACITY: usize = 4096;
11
12#[derive(Debug, Error)]
13pub enum ObserveError {
14    #[error("bus closed: no remaining subscribers")]
15    BusClosed,
16}
17
18/// Single ingestion point for every consequential event in the system.
19///
20/// Implementations MUST be cheap to clone (via `Arc`) and MUST not block in
21/// `publish` — slow subscribers should be dropped, not back-pressure the pipeline.
22#[async_trait]
23pub trait Observer: Send + Sync {
24    /// Publish an event to all current subscribers. Errors are surfaced for
25    /// liveness checks (e.g. "is anything listening?") but publishers SHOULD
26    /// treat them as informational, not fatal.
27    async fn publish(&self, ev: BrainEvent) -> Result<(), ObserveError>;
28
29    /// Subscribe to all future events. Returns a `broadcast::Receiver` with
30    /// lag-drop semantics — slow readers see `Err(RecvError::Lagged(n))`.
31    fn subscribe(&self) -> broadcast::Receiver<BrainEvent>;
32}
33
34/// Default in-process implementation backed by `tokio::sync::broadcast`.
35pub struct BroadcastObserver {
36    tx: broadcast::Sender<BrainEvent>,
37}
38
39impl BroadcastObserver {
40    pub fn new() -> Arc<Self> {
41        Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
42    }
43
44    pub fn with_capacity(capacity: usize) -> Arc<Self> {
45        let (tx, _) = broadcast::channel(capacity);
46        Arc::new(Self { tx })
47    }
48
49    pub fn receiver_count(&self) -> usize {
50        self.tx.receiver_count()
51    }
52}
53
54#[async_trait]
55impl Observer for BroadcastObserver {
56    async fn publish(&self, ev: BrainEvent) -> Result<(), ObserveError> {
57        match self.tx.send(ev) {
58            Ok(_n) => Ok(()),
59            Err(_) => Err(ObserveError::BusClosed),
60        }
61    }
62
63    fn subscribe(&self) -> broadcast::Receiver<BrainEvent> {
64        self.tx.subscribe()
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use super::*;
71    use crate::event::BrainEvent;
72    use chrono::Utc;
73    use tokio::sync::broadcast::error::RecvError;
74    use uuid::Uuid;
75
76    fn err_event(msg: &str) -> BrainEvent {
77        BrainEvent::Error {
78            id: Uuid::new_v4(),
79            source: "test".into(),
80            message: msg.into(),
81            ts: Utc::now(),
82        }
83    }
84
85    #[tokio::test]
86    async fn publish_with_no_subscribers_returns_bus_closed() {
87        let obs = BroadcastObserver::new();
88        let res = obs.publish(err_event("noone")).await;
89        assert!(matches!(res, Err(ObserveError::BusClosed)));
90    }
91
92    #[tokio::test]
93    async fn publish_reaches_subscriber() {
94        let obs = BroadcastObserver::new();
95        let mut rx = obs.subscribe();
96        obs.publish(err_event("hi")).await.unwrap();
97
98        let got = rx.recv().await.unwrap();
99        assert_eq!(got.kind(), "error");
100    }
101
102    #[tokio::test]
103    async fn slow_subscriber_sees_lagged_not_block() {
104        let obs = BroadcastObserver::with_capacity(4);
105        let mut rx = obs.subscribe();
106
107        // Overrun the buffer without consuming.
108        for i in 0..16 {
109            obs.publish(err_event(&format!("burst-{i}"))).await.unwrap();
110        }
111
112        // First recv should report lag, then deliver subsequent events normally.
113        match rx.recv().await {
114            Err(RecvError::Lagged(n)) => assert!(n > 0),
115            other => panic!("expected Lagged, got {other:?}"),
116        }
117        // Drain remainder without panicking.
118        while let Ok(ev) = rx.try_recv() {
119            assert_eq!(ev.kind(), "error");
120        }
121    }
122}