use std::sync::Mutex;
use futures::StreamExt;
use futures::stream::BoxStream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::event::AgentEvent;
/// Per-subscriber channel buffer size used when an emitter is built with
/// `EventEmitter::new` (or `Default`).
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
/// Sending half of a single subscriber's bounded channel; the matching
/// receiver is handed to the subscriber as a stream by `EventEmitter::subscribe`.
type SubscriberHandle = mpsc::Sender<AgentEvent>;
/// Fan-out broadcaster for [`AgentEvent`]s.
///
/// Each call to `subscribe` creates a dedicated bounded channel and returns
/// its receiving end as a stream; `emit` clones the event into every
/// registered channel. Senders whose `send` fails during `emit` are removed
/// from the list afterwards.
pub struct EventEmitter {
/// Buffer size passed to `mpsc::channel` for every new subscriber.
capacity: usize,
/// Sending halves of all subscriber channels registered so far. Guarded by
/// a `std::sync::Mutex`; `emit` clones the list out so the lock is not held
/// while it awaits sends.
senders: Mutex<Vec<SubscriberHandle>>,
}
impl EventEmitter {
    /// Creates an emitter whose subscriber channels each buffer up to
    /// `DEFAULT_CHANNEL_CAPACITY` events.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
    }

    /// Creates an emitter whose subscriber channels each buffer up to
    /// `capacity` events.
    ///
    /// A `capacity` of `0` is raised to `1`. `tokio::sync::mpsc::channel`
    /// panics when asked for a zero-sized buffer, so accepting `0` here would
    /// only defer the panic to the first [`subscribe`](Self::subscribe) call.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            capacity: capacity.max(1),
            senders: Mutex::new(Vec::new()),
        }
    }

    /// Registers a new subscriber and returns the stream of events it will
    /// receive.
    ///
    /// The subscriber sees events from `emit` calls that start after it has
    /// been registered. Dropping the returned stream closes its channel; the
    /// stale sender is removed by a later [`emit`](Self::emit) once a send to
    /// it fails.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn subscribe(&self) -> BoxStream<'static, AgentEvent> {
        let (tx, rx) = mpsc::channel(self.capacity);
        self.lock_senders().push(tx);
        ReceiverStream::new(rx).boxed()
    }

    /// Delivers a clone of `event` to every currently registered subscriber.
    ///
    /// Sends are awaited one subscriber at a time, in registration order, so
    /// a subscriber whose buffer is full applies backpressure to this call
    /// and delays delivery to the subscribers after it. Subscribers whose
    /// channel is closed are removed from the emitter before returning.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub async fn emit(&self, event: AgentEvent) {
        // Work on a cloned list so the `std::sync::Mutex` guard is released
        // before the first `.await`.
        let snapshot: Vec<SubscriberHandle> = {
            let guard = self.lock_senders();
            guard.clone()
        };

        let mut dead_indices: Vec<usize> = Vec::new();
        for (idx, tx) in snapshot.iter().enumerate() {
            // `send` only fails when the receiving side is gone.
            if tx.send(event.clone()).await.is_err() {
                dead_indices.push(idx);
            }
        }

        if !dead_indices.is_empty() {
            self.prune(&snapshot, &dead_indices);
        }
    }

    /// Removes from the live sender list every sender that shares a channel
    /// with one of the `snapshot` entries named by `dead_indices`.
    ///
    /// Matching is done by channel identity rather than by position because
    /// the live list may have changed (new subscribers, concurrent prunes)
    /// since `snapshot` was taken. Out-of-range indices are ignored.
    fn prune(&self, snapshot: &[SubscriberHandle], dead_indices: &[usize]) {
        // Resolve indices to handles once instead of once per retained sender.
        let dead: Vec<&SubscriberHandle> = dead_indices
            .iter()
            .filter_map(|&i| snapshot.get(i))
            .collect();

        self.lock_senders()
            .retain(|tx| !dead.iter().any(|gone| gone.same_channel(tx)));
    }

    /// Locks the sender list, panicking with a uniform message if the mutex
    /// has been poisoned.
    fn lock_senders(&self) -> std::sync::MutexGuard<'_, Vec<SubscriberHandle>> {
        self.senders
            .lock()
            .expect("EventEmitter senders mutex poisoned")
    }
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
// Unit tests live in the out-of-line `tests` submodule and are compiled only
// for test builds.
#[cfg(test)]
mod tests;