swink_agent/
agent_subscriptions.rs1use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10use tracing::warn;
11
12use crate::loop_::AgentEvent;
13
/// Opaque handle identifying a single listener registration.
///
/// Wraps a `u64` taken from a counter shared by every caller of
/// [`SubscriptionId::next`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);

impl SubscriptionId {
    /// Allocates a fresh id.
    ///
    /// The backing counter starts at 1; each call takes the current value and
    /// advances the counter by one.
    pub fn next() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);

        // The increment is a single atomic read-modify-write, so concurrent
        // callers never observe the same value.
        let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }
}
27
28pub type ListenerFn = Box<dyn Fn(&AgentEvent) + Send + Sync>;
32
33pub struct ListenerRegistry {
40 listeners: HashMap<SubscriptionId, ListenerFn>,
41}
42
43impl ListenerRegistry {
44 pub fn new() -> Self {
46 Self {
47 listeners: HashMap::new(),
48 }
49 }
50
51 pub fn subscribe(
53 &mut self,
54 callback: impl Fn(&AgentEvent) + Send + Sync + 'static,
55 ) -> SubscriptionId {
56 let id = SubscriptionId::next();
57 self.listeners.insert(id, Box::new(callback));
58 id
59 }
60
61 pub fn unsubscribe(&mut self, id: SubscriptionId) -> bool {
63 self.listeners.remove(&id).is_some()
64 }
65
66 pub fn dispatch(&mut self, event: &AgentEvent) {
71 let mut panicked = Vec::new();
72 for (id, listener) in &self.listeners {
73 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| listener(event)));
74 if let Err(e) = result {
75 eprintln!("listener panic: {e:?}");
76 panicked.push(*id);
77 }
78 }
79 for id in panicked {
80 self.listeners.remove(&id);
81 warn!("removed panicking listener {id:?}");
82 }
83 }
84
85 pub fn len(&self) -> usize {
87 self.listeners.len()
88 }
89}