Skip to main content

a2ui_base/observable/
event_stream.rs

1//! Simple pub/sub event stream for A2UI state models.
2//!
3//! Supports multiple listeners and provides disposable subscriptions.
4
5use std::sync::Arc;
6
7/// A subscription handle — unsubscribes on drop.
8pub struct EventSubscription {
9    slot: usize,
10    drop_fn: Box<dyn Fn(usize) + Send + Sync>,
11}
12
13impl Drop for EventSubscription {
14    fn drop(&mut self) {
15        (self.drop_fn)(self.slot);
16    }
17}
18
19type Listener<T> = Box<dyn Fn(&T) + Send + Sync>;
20
21/// A simple multi-cast event stream.
22///
23/// Listeners are called synchronously when `emit()` is invoked.
24/// Clone-safe — cloning shares the underlying listener list.
25pub struct EventStream<T: 'static> {
26    listeners: Arc<std::sync::Mutex<Vec<Option<Listener<T>>>>>,
27    next_id: Arc<std::sync::Mutex<usize>>,
28}
29
30impl<T: 'static> Default for EventStream<T> {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl<T: 'static> EventStream<T> {
37    pub fn new() -> Self {
38        Self {
39            listeners: Arc::new(std::sync::Mutex::new(Vec::new())),
40            next_id: Arc::new(std::sync::Mutex::new(0)),
41        }
42    }
43
44    /// Subscribe to events. Returns an `EventSubscription` that unsubscribes on drop.
45    pub fn on<F>(&self, listener: F) -> EventSubscription
46    where
47        F: Fn(&T) + Send + Sync + 'static,
48    {
49        let id = {
50            let mut next = self.next_id.lock().unwrap();
51            let id = *next;
52            *next += 1;
53            id
54        };
55
56        {
57            let mut guard = self.listeners.lock().unwrap();
58            if id >= guard.len() {
59                guard.resize_with(id + 1, || None);
60            }
61            guard[id] = Some(Box::new(listener));
62        }
63
64        let listeners = Arc::clone(&self.listeners);
65        EventSubscription {
66            slot: id,
67            drop_fn: Box::new(move |slot: usize| {
68                let mut guard = listeners.lock().unwrap();
69                if slot < guard.len() {
70                    guard[slot] = None;
71                }
72            }),
73        }
74    }
75
76    /// Emit an event to all active listeners.
77    pub fn emit(&self, event: &T) {
78        let guard = self.listeners.lock().unwrap();
79        for listener in guard.iter().flatten() {
80            listener(event);
81        }
82    }
83
84    /// Returns the number of active listeners.
85    #[allow(dead_code)]
86    pub fn listener_count(&self) -> usize {
87        self.listeners.lock().unwrap().iter().flatten().count()
88    }
89}
90
91impl<T: 'static> Clone for EventStream<T> {
92    fn clone(&self) -> Self {
93        Self {
94            listeners: Arc::clone(&self.listeners),
95            next_id: Arc::clone(&self.next_id),
96        }
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103    use std::sync::atomic::{AtomicUsize, Ordering};
104
105    #[test]
106    fn test_subscribe_and_emit() {
107        let stream: EventStream<i32> = EventStream::new();
108        let count = Arc::new(AtomicUsize::new(0));
109        let c = Arc::clone(&count);
110
111        let _sub = stream.on(move |val: &i32| {
112            if *val == 42 {
113                c.fetch_add(1, Ordering::SeqCst);
114            }
115        });
116
117        stream.emit(&42);
118        stream.emit(&10);
119        stream.emit(&42);
120        assert_eq!(count.load(Ordering::SeqCst), 2);
121    }
122
123    #[test]
124    fn test_unsubscribe_on_drop() {
125        let stream: EventStream<i32> = EventStream::new();
126        let count = Arc::new(AtomicUsize::new(0));
127
128        {
129            let c = Arc::clone(&count);
130            let sub = stream.on(move |_: &i32| {
131                c.fetch_add(1, Ordering::SeqCst);
132            });
133            stream.emit(&1);
134            assert_eq!(count.load(Ordering::SeqCst), 1);
135            drop(sub);
136        }
137
138        stream.emit(&1);
139        assert_eq!(count.load(Ordering::SeqCst), 1);
140    }
141
142    #[test]
143    fn test_multiple_listeners() {
144        let stream: EventStream<i32> = EventStream::new();
145        let a = Arc::new(AtomicUsize::new(0));
146        let b = Arc::new(AtomicUsize::new(0));
147
148        let ac = Arc::clone(&a);
149        let _sa = stream.on(move |_: &i32| {
150            ac.fetch_add(1, Ordering::SeqCst);
151        });
152        let bc = Arc::clone(&b);
153        let _sb = stream.on(move |_: &i32| {
154            bc.fetch_add(1, Ordering::SeqCst);
155        });
156
157        stream.emit(&1);
158        assert_eq!(a.load(Ordering::SeqCst), 1);
159        assert_eq!(b.load(Ordering::SeqCst), 1);
160    }
161}