Skip to main content

swink_agent/
agent_subscriptions.rs

1//! Subscription management for agent event listeners.
2//!
3//! [`ListenerRegistry`] owns the map of callbacks and dispatches [`AgentEvent`]s
4//! to them, catching panics so a single misbehaving subscriber cannot crash the
5//! agent.
6
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use tracing::warn;
11
12use crate::loop_::AgentEvent;
13
14// ─── SubscriptionId ──────────────────────────────────────────────────────────
15
/// Opaque handle that identifies a single registered event subscription.
///
/// The wrapped `u64` is private; handles are obtained from
/// [`SubscriptionId::next`] and compared, copied, or hashed as-is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);

impl SubscriptionId {
    /// Hand out a fresh id that no earlier call in this process returned.
    pub fn next() -> Self {
        // Process-wide counter, starting at 1. The increment is a single
        // atomic read-modify-write, so concurrent callers each observe a
        // distinct previous value; no stronger ordering is requested.
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }
}
27
28// ─── ListenerFn ──────────────────────────────────────────────────────────────
29
30/// Type alias for a boxed event listener callback.
31pub type ListenerFn = Box<dyn Fn(&AgentEvent) + Send + Sync>;
32
33// ─── ListenerRegistry ────────────────────────────────────────────────────────
34
35/// Owns event listener callbacks and dispatches events to them.
36///
37/// Panicking listeners are automatically removed so a single bad subscriber
38/// cannot crash the agent.
39pub struct ListenerRegistry {
40    listeners: HashMap<SubscriptionId, ListenerFn>,
41}
42
43impl ListenerRegistry {
44    /// Create an empty registry.
45    pub fn new() -> Self {
46        Self {
47            listeners: HashMap::new(),
48        }
49    }
50
51    /// Register a callback and return its [`SubscriptionId`].
52    pub fn subscribe(
53        &mut self,
54        callback: impl Fn(&AgentEvent) + Send + Sync + 'static,
55    ) -> SubscriptionId {
56        let id = SubscriptionId::next();
57        self.listeners.insert(id, Box::new(callback));
58        id
59    }
60
61    /// Remove a subscription. Returns `true` if it existed.
62    pub fn unsubscribe(&mut self, id: SubscriptionId) -> bool {
63        self.listeners.remove(&id).is_some()
64    }
65
66    /// Dispatch an event to all listeners, catching panics.
67    ///
68    /// Any listener that panics is automatically removed to prevent future
69    /// disruption.
70    pub fn dispatch(&mut self, event: &AgentEvent) {
71        let mut panicked = Vec::new();
72        for (id, listener) in &self.listeners {
73            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| listener(event)));
74            if let Err(e) = result {
75                eprintln!("listener panic: {e:?}");
76                panicked.push(*id);
77            }
78        }
79        for id in panicked {
80            self.listeners.remove(&id);
81            warn!("removed panicking listener {id:?}");
82        }
83    }
84
85    /// Number of currently registered listeners.
86    pub fn len(&self) -> usize {
87        self.listeners.len()
88    }
89}