Skip to main content

fluers_runtime/
event.rs

1//! The event stream.
2//!
3//! Mirrors Flue's `observe` / `FlueEventSubscriber` and `event-stream-store`.
4//! Observers subscribe to a stream of [`Event`]s (= [`fluers_core::RunEvent`])
5//! emitted as a session runs.
6//!
7//! The [`Event`] type and the [`EventSink`](fluers_core::EventSink) trait live
8//! in `fluers-core` so that `run_agent` (which lives in core) can emit events
9//! without creating a `core → runtime` dependency cycle. [`EventBus`] implements
10//! that trait here.
11
12use fluers_core::{EventSink, RunEvent};
13use tokio::sync::broadcast;
14
/// Alias for the core run-lifecycle event type, [`RunEvent`].
///
/// The type itself is defined in `fluers-core`; this alias is retained so
/// that existing code importing `fluers_runtime::Event` keeps compiling.
pub type Event = RunEvent;
20
21/// A fan-out event bus backed by a bounded broadcast channel.
22///
23/// Each subscriber receives an independent [`broadcast::Receiver`] to drain on
24/// its own task. Sending never blocks and no lock is held while receivers handle
25/// events. If a receiver falls behind its bounded buffer, Tokio's broadcast
26/// channel reports a lag error to that receiver on its next receive.
27#[derive(Clone)]
28pub struct EventBus {
29    sender: broadcast::Sender<RunEvent>,
30}
31
32impl EventBus {
33    /// The default event buffer capacity.
34    pub const DEFAULT_CAPACITY: usize = 256;
35
36    /// Create a bus with the given bounded buffer capacity.
37    ///
38    /// Tokio broadcast channels require a non-zero capacity, so a capacity of
39    /// zero is treated as one.
40    #[must_use]
41    pub fn new(capacity: usize) -> Self {
42        let capacity = capacity.max(1);
43        let (sender, _) = broadcast::channel(capacity);
44        Self { sender }
45    }
46
47    /// Create a bus with the default buffer capacity.
48    #[must_use]
49    pub fn new_default() -> Self {
50        Self::new(Self::DEFAULT_CAPACITY)
51    }
52
53    /// Subscribe to future events.
54    ///
55    /// The caller owns the returned receiver and should drain it, typically on
56    /// a dedicated task.
57    #[must_use]
58    pub fn subscribe(&self) -> broadcast::Receiver<RunEvent> {
59        self.sender.subscribe()
60    }
61
62    /// Emit an event to all current subscribers.
63    ///
64    /// Returns `false` when there are no active receivers. Sending is
65    /// non-blocking; slow receivers observe [`broadcast::error::RecvError::Lagged`]
66    /// when they next receive.
67    pub fn emit(&self, event: RunEvent) -> bool {
68        self.sender.send(event).is_ok()
69    }
70}
71
72/// `EventBus` satisfies the core [`EventSink`] trait so it can be passed to
73/// [`run_agent`](fluers_core::run_agent) via [`RunHooks`](fluers_core::RunHooks).
74impl EventSink for EventBus {
75    fn emit(&self, event: RunEvent) {
76        // Ignore the "no receivers" return — the agent loop doesn't care.
77        let _ = EventBus::emit(self, event);
78    }
79}
80
81impl Default for EventBus {
82    fn default() -> Self {
83        Self::new_default()
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use fluers_core::{EventSink, RunEvent};
    use tokio::time::timeout;
    use uuid::Uuid;

    use super::EventBus;

    /// Upper bound on any single wait, so a missing event fails the test
    /// instead of hanging it.
    const TEST_TIMEOUT: Duration = Duration::from_secs(2);

    /// Lets the async tests propagate timeout and join errors with `?`.
    type TestResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// An event emitted on the bus reaches a receiver drained on another task.
    #[tokio::test]
    async fn emit_delivers_to_subscriber() -> TestResult {
        let session = Uuid::nil();
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        let receiving = tokio::spawn(async move { rx.recv().await.ok() });

        let delivered = bus.emit(RunEvent::SessionStarted { session });
        assert!(delivered);

        // The first `?` surfaces a timeout, the second a failed join.
        let outcome = timeout(TEST_TIMEOUT, receiving).await??;
        let is_expected = matches!(
            outcome,
            Some(RunEvent::SessionStarted { session: seen }) if seen == session
        );
        assert!(is_expected);

        Ok(())
    }

    /// Emitting through the `EventSink` trait reaches subscribers just like
    /// the inherent method.
    #[tokio::test]
    async fn event_sink_trait_delegates_to_emit() -> TestResult {
        let session = Uuid::nil();
        let bus = EventBus::new(16);
        let mut rx = bus.subscribe();

        // Fully qualified so the trait method, not the inherent one, is called.
        <EventBus as EventSink>::emit(&bus, RunEvent::SessionStarted { session });

        let outcome = timeout(TEST_TIMEOUT, rx.recv()).await?;
        let is_expected = matches!(
            outcome,
            Ok(RunEvent::SessionStarted { session: seen }) if seen == session
        );
        assert!(is_expected);

        Ok(())
    }

    /// With nobody subscribed, `emit` reports that the event went nowhere.
    #[tokio::test]
    async fn emit_with_no_receivers_returns_false() {
        let bus = EventBus::new(16);

        let delivered = bus.emit(RunEvent::SessionStarted {
            session: Uuid::nil(),
        });

        assert!(!delivered);
    }

    /// Every subscriber gets its own copy of an emitted event.
    #[tokio::test]
    async fn multiple_subscribers_each_receive() -> TestResult {
        let session = Uuid::nil();
        let bus = EventBus::new(16);
        let receivers = [bus.subscribe(), bus.subscribe()];

        assert!(bus.emit(RunEvent::TurnStarted { session, turn: 1 }));

        for mut rx in receivers {
            let outcome = timeout(TEST_TIMEOUT, rx.recv()).await?;
            assert!(matches!(
                outcome,
                Ok(RunEvent::TurnStarted { session: seen, turn: 1 }) if seen == session
            ));
        }

        Ok(())
    }
}