defect_agent/session/events.rs
//! Event publishing: bounded mpsc channels with fan-out.
2//!
3//! ## Shape
4//!
5//! The main loop only calls [`EventEmitter::emit`]; subscribers get an independent mpsc
6//! receiver via [`EventEmitter::subscribe`]. `emit` sends to all receivers serially.
7//! **Slow consumers block `emit`** (backpressure) — this is the desired "no event loss"
8//! behavior.
9//!
10//! ## Why not broadcast
11//!
12//! [`tokio::sync::broadcast`] returns `Lagged` and skips events when receivers fall
13//! behind, violating the "no drop" invariant of [`AgentEvent`].
14
15use std::sync::Mutex;
16
17use futures::StreamExt;
18use futures::stream::BoxStream;
19use tokio::sync::mpsc;
20use tokio_stream::wrappers::ReceiverStream;
21
22use crate::event::AgentEvent;
23
/// Default capacity of each subscriber's mpsc channel, used by [`EventEmitter::new`].
///
/// With this default, a subscriber that has stopped consuming can buffer 256 events;
/// the main loop then blocks in `emit` on the 257th.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
27
/// Sending half of a single subscriber's channel.
///
/// [`EventEmitter`] stores these handles in a `Mutex<Vec<_>>` because
/// [`EventEmitter::emit`] takes `&self` and is `async`. `DashMap` or `RwLock` would also
/// work; `std::sync::Mutex` is chosen because the lock is held only briefly during
/// `emit` to snapshot the list, and `send` happens outside the lock.
type SubscriberHandle = mpsc::Sender<AgentEvent>;
33
34/// Event bus. One instance per session.
35pub struct EventEmitter {
36 capacity: usize,
37 /// Subscribers currently registered. Dropping a receiver will be cleaned up
38 /// automatically on the next `emit`.
39 senders: Mutex<Vec<SubscriberHandle>>,
40}
41
42impl EventEmitter {
43 pub fn new() -> Self {
44 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
45 }
46
47 pub fn with_capacity(capacity: usize) -> Self {
48 Self {
49 capacity,
50 senders: Mutex::new(Vec::new()),
51 }
52 }
53
54 /// Subscribes a new listener. The returned stream ends naturally when [`Self`] is
55 /// dropped.
56 pub fn subscribe(&self) -> BoxStream<'static, AgentEvent> {
57 let (tx, rx) = mpsc::channel(self.capacity);
58 self.senders
59 .lock()
60 .expect("EventEmitter senders mutex poisoned")
61 .push(tx);
62 ReceiverStream::new(rx).boxed()
63 }
64
65 /// Delivers the event to every subscriber.
66 ///
67 /// Awaits each sender serially. If a subscriber's channel is full, this call blocks
68 /// until the subscriber consumes — this is intentional backpressure.
69 pub async fn emit(&self, event: AgentEvent) {
70 // Take a snapshot to avoid holding the lock across an await point.
71 let snapshot: Vec<SubscriberHandle> = {
72 let guard = self
73 .senders
74 .lock()
75 .expect("EventEmitter senders mutex poisoned");
76 guard.clone()
77 };
78
79 let mut dead_indices: Vec<usize> = Vec::new();
80 for (idx, tx) in snapshot.iter().enumerate() {
81 if tx.send(event.clone()).await.is_err() {
82 dead_indices.push(idx);
83 }
84 }
85
86 if !dead_indices.is_empty() {
87 self.prune(&snapshot, &dead_indices);
88 }
89 }
90
91 /// Remove senders whose receivers have been dropped.
92 fn prune(&self, snapshot: &[SubscriberHandle], dead_indices: &[usize]) {
93 let mut guard = self
94 .senders
95 .lock()
96 .expect("EventEmitter senders mutex poisoned");
97 // The snapshot and `*guard` may have different lengths due to concurrent
98 // `subscribe` calls; we compare by pointer equality to avoid removing the wrong
99 // sender.
100 guard.retain(|tx| {
101 !dead_indices.iter().any(|&i| {
102 snapshot
103 .get(i)
104 .map(|dead| dead.same_channel(tx))
105 .unwrap_or(false)
106 })
107 });
108 }
109}
110
111impl Default for EventEmitter {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
// Unit tests are declared in a separate `tests` module, compiled only for test builds.
#[cfg(test)]
mod tests;