brainos_observe/
observer.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use thiserror::Error;
5use tokio::sync::broadcast;
6
7use crate::event::BrainEvent;
8
9pub const DEFAULT_BROADCAST_CAPACITY: usize = 4096;
11
12#[derive(Debug, Error)]
13pub enum ObserveError {
14 #[error("bus closed: no remaining subscribers")]
15 BusClosed,
16}
17
18#[async_trait]
23pub trait Observer: Send + Sync {
24 async fn publish(&self, ev: BrainEvent) -> Result<(), ObserveError>;
28
29 fn subscribe(&self) -> broadcast::Receiver<BrainEvent>;
32}
33
34pub struct BroadcastObserver {
36 tx: broadcast::Sender<BrainEvent>,
37}
38
39impl BroadcastObserver {
40 pub fn new() -> Arc<Self> {
41 Self::with_capacity(DEFAULT_BROADCAST_CAPACITY)
42 }
43
44 pub fn with_capacity(capacity: usize) -> Arc<Self> {
45 let (tx, _) = broadcast::channel(capacity);
46 Arc::new(Self { tx })
47 }
48
49 pub fn receiver_count(&self) -> usize {
50 self.tx.receiver_count()
51 }
52}
53
54#[async_trait]
55impl Observer for BroadcastObserver {
56 async fn publish(&self, ev: BrainEvent) -> Result<(), ObserveError> {
57 match self.tx.send(ev) {
58 Ok(_n) => Ok(()),
59 Err(_) => Err(ObserveError::BusClosed),
60 }
61 }
62
63 fn subscribe(&self) -> broadcast::Receiver<BrainEvent> {
64 self.tx.subscribe()
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use super::*;
71 use crate::event::BrainEvent;
72 use chrono::Utc;
73 use tokio::sync::broadcast::error::RecvError;
74 use uuid::Uuid;
75
76 fn err_event(msg: &str) -> BrainEvent {
77 BrainEvent::Error {
78 id: Uuid::new_v4(),
79 source: "test".into(),
80 message: msg.into(),
81 ts: Utc::now(),
82 }
83 }
84
85 #[tokio::test]
86 async fn publish_with_no_subscribers_returns_bus_closed() {
87 let obs = BroadcastObserver::new();
88 let res = obs.publish(err_event("noone")).await;
89 assert!(matches!(res, Err(ObserveError::BusClosed)));
90 }
91
92 #[tokio::test]
93 async fn publish_reaches_subscriber() {
94 let obs = BroadcastObserver::new();
95 let mut rx = obs.subscribe();
96 obs.publish(err_event("hi")).await.unwrap();
97
98 let got = rx.recv().await.unwrap();
99 assert_eq!(got.kind(), "error");
100 }
101
102 #[tokio::test]
103 async fn slow_subscriber_sees_lagged_not_block() {
104 let obs = BroadcastObserver::with_capacity(4);
105 let mut rx = obs.subscribe();
106
107 for i in 0..16 {
109 obs.publish(err_event(&format!("burst-{i}"))).await.unwrap();
110 }
111
112 match rx.recv().await {
114 Err(RecvError::Lagged(n)) => assert!(n > 0),
115 other => panic!("expected Lagged, got {other:?}"),
116 }
117 while let Ok(ev) = rx.try_recv() {
119 assert_eq!(ev.kind(), "error");
120 }
121 }
122}