a2ui_base/observable/
event_stream.rs1use std::sync::Arc;
6
7pub struct EventSubscription {
9 slot: usize,
10 drop_fn: Box<dyn Fn(usize) + Send + Sync>,
11}
12
13impl Drop for EventSubscription {
14 fn drop(&mut self) {
15 (self.drop_fn)(self.slot);
16 }
17}
18
19type Listener<T> = Box<dyn Fn(&T) + Send + Sync>;
20
21pub struct EventStream<T: 'static> {
26 listeners: Arc<std::sync::Mutex<Vec<Option<Listener<T>>>>>,
27 next_id: Arc<std::sync::Mutex<usize>>,
28}
29
30impl<T: 'static> Default for EventStream<T> {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl<T: 'static> EventStream<T> {
37 pub fn new() -> Self {
38 Self {
39 listeners: Arc::new(std::sync::Mutex::new(Vec::new())),
40 next_id: Arc::new(std::sync::Mutex::new(0)),
41 }
42 }
43
44 pub fn on<F>(&self, listener: F) -> EventSubscription
46 where
47 F: Fn(&T) + Send + Sync + 'static,
48 {
49 let id = {
50 let mut next = self.next_id.lock().unwrap();
51 let id = *next;
52 *next += 1;
53 id
54 };
55
56 {
57 let mut guard = self.listeners.lock().unwrap();
58 if id >= guard.len() {
59 guard.resize_with(id + 1, || None);
60 }
61 guard[id] = Some(Box::new(listener));
62 }
63
64 let listeners = Arc::clone(&self.listeners);
65 EventSubscription {
66 slot: id,
67 drop_fn: Box::new(move |slot: usize| {
68 let mut guard = listeners.lock().unwrap();
69 if slot < guard.len() {
70 guard[slot] = None;
71 }
72 }),
73 }
74 }
75
76 pub fn emit(&self, event: &T) {
78 let guard = self.listeners.lock().unwrap();
79 for listener in guard.iter().flatten() {
80 listener(event);
81 }
82 }
83
84 #[allow(dead_code)]
86 pub fn listener_count(&self) -> usize {
87 self.listeners.lock().unwrap().iter().flatten().count()
88 }
89}
90
91impl<T: 'static> Clone for EventStream<T> {
92 fn clone(&self) -> Self {
93 Self {
94 listeners: Arc::clone(&self.listeners),
95 next_id: Arc::clone(&self.next_id),
96 }
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use std::sync::atomic::{AtomicUsize, Ordering};
104
105 #[test]
106 fn test_subscribe_and_emit() {
107 let stream: EventStream<i32> = EventStream::new();
108 let count = Arc::new(AtomicUsize::new(0));
109 let c = Arc::clone(&count);
110
111 let _sub = stream.on(move |val: &i32| {
112 if *val == 42 {
113 c.fetch_add(1, Ordering::SeqCst);
114 }
115 });
116
117 stream.emit(&42);
118 stream.emit(&10);
119 stream.emit(&42);
120 assert_eq!(count.load(Ordering::SeqCst), 2);
121 }
122
123 #[test]
124 fn test_unsubscribe_on_drop() {
125 let stream: EventStream<i32> = EventStream::new();
126 let count = Arc::new(AtomicUsize::new(0));
127
128 {
129 let c = Arc::clone(&count);
130 let sub = stream.on(move |_: &i32| {
131 c.fetch_add(1, Ordering::SeqCst);
132 });
133 stream.emit(&1);
134 assert_eq!(count.load(Ordering::SeqCst), 1);
135 drop(sub);
136 }
137
138 stream.emit(&1);
139 assert_eq!(count.load(Ordering::SeqCst), 1);
140 }
141
142 #[test]
143 fn test_multiple_listeners() {
144 let stream: EventStream<i32> = EventStream::new();
145 let a = Arc::new(AtomicUsize::new(0));
146 let b = Arc::new(AtomicUsize::new(0));
147
148 let ac = Arc::clone(&a);
149 let _sa = stream.on(move |_: &i32| {
150 ac.fetch_add(1, Ordering::SeqCst);
151 });
152 let bc = Arc::clone(&b);
153 let _sb = stream.on(move |_: &i32| {
154 bc.fetch_add(1, Ordering::SeqCst);
155 });
156
157 stream.emit(&1);
158 assert_eq!(a.load(Ordering::SeqCst), 1);
159 assert_eq!(b.load(Ordering::SeqCst), 1);
160 }
161}