Skip to main content

defect_agent/session/
events.rs

1//! Event publishing: mpsc bounded + fan-out.
2//!
3//! ## Shape
4//!
5//! The main loop only calls [`EventEmitter::emit`]; subscribers get an independent mpsc
6//! receiver via [`EventEmitter::subscribe`]. `emit` sends to all receivers serially.
7//! **Slow consumers block `emit`** (backpressure) — this is the desired "no event loss"
8//! behavior.
9//!
10//! ## Why not broadcast
11//!
12//! [`tokio::sync::broadcast`] returns `Lagged` and skips events when receivers fall
13//! behind, violating the "no drop" invariant of [`AgentEvent`].
14
15use std::sync::Mutex;
16
17use futures::StreamExt;
18use futures::stream::BoxStream;
19use tokio::sync::mpsc;
20use tokio_stream::wrappers::ReceiverStream;
21
22use crate::event::AgentEvent;
23
24/// Default mpsc capacity. The main loop blocks on the 257th event when subscribers fall
25/// behind.
26const DEFAULT_CHANNEL_CAPACITY: usize = 256;
27
28/// A single subscriber's sender handle. It is wrapped in `Mutex` only because
29/// [`EventEmitter::emit`] is `&self` + `async` — either `DashMap` or `RwLock` would work;
30/// `std::Mutex` is chosen here because we only briefly hold the lock during `emit` to
31/// snapshot the list, and `send` happens outside the lock.
32type SubscriberHandle = mpsc::Sender<AgentEvent>;
33
34/// Event bus. One instance per session.
35pub struct EventEmitter {
36    capacity: usize,
37    /// Subscribers currently registered. Dropping a receiver will be cleaned up
38    /// automatically on the next `emit`.
39    senders: Mutex<Vec<SubscriberHandle>>,
40}
41
42impl EventEmitter {
43    pub fn new() -> Self {
44        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
45    }
46
47    pub fn with_capacity(capacity: usize) -> Self {
48        Self {
49            capacity,
50            senders: Mutex::new(Vec::new()),
51        }
52    }
53
54    /// Subscribes a new listener. The returned stream ends naturally when [`Self`] is
55    /// dropped.
56    pub fn subscribe(&self) -> BoxStream<'static, AgentEvent> {
57        let (tx, rx) = mpsc::channel(self.capacity);
58        self.senders
59            .lock()
60            .expect("EventEmitter senders mutex poisoned")
61            .push(tx);
62        ReceiverStream::new(rx).boxed()
63    }
64
65    /// Delivers the event to every subscriber.
66    ///
67    /// Awaits each sender serially. If a subscriber's channel is full, this call blocks
68    /// until the subscriber consumes — this is intentional backpressure.
69    pub async fn emit(&self, event: AgentEvent) {
70        // Take a snapshot to avoid holding the lock across an await point.
71        let snapshot: Vec<SubscriberHandle> = {
72            let guard = self
73                .senders
74                .lock()
75                .expect("EventEmitter senders mutex poisoned");
76            guard.clone()
77        };
78
79        let mut dead_indices: Vec<usize> = Vec::new();
80        for (idx, tx) in snapshot.iter().enumerate() {
81            if tx.send(event.clone()).await.is_err() {
82                dead_indices.push(idx);
83            }
84        }
85
86        if !dead_indices.is_empty() {
87            self.prune(&snapshot, &dead_indices);
88        }
89    }
90
91    /// Remove senders whose receivers have been dropped.
92    fn prune(&self, snapshot: &[SubscriberHandle], dead_indices: &[usize]) {
93        let mut guard = self
94            .senders
95            .lock()
96            .expect("EventEmitter senders mutex poisoned");
97        // The snapshot and `*guard` may have different lengths due to concurrent
98        // `subscribe` calls; we compare by pointer equality to avoid removing the wrong
99        // sender.
100        guard.retain(|tx| {
101            !dead_indices.iter().any(|&i| {
102                snapshot
103                    .get(i)
104                    .map(|dead| dead.same_channel(tx))
105                    .unwrap_or(false)
106            })
107        });
108    }
109}
110
111impl Default for EventEmitter {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117#[cfg(test)]
118mod tests;