observable_rs/
observable.rs

1use std::{cell::Ref, cell::RefCell, rc::Rc};
2
3#[derive(Default)]
4struct ListenerSet<T> {
5    nextid: usize,
6    items: Vec<ListenerItem<T>>,
7}
8
/// A registered listener together with the id that its [`ListenerHandle`] carries.
struct ListenerItem<T> {
    /// Monotonically increasing id; `unsubscribe` binary-searches the listener list on it.
    id: usize,
    /// The callback, either one-shot or durable.
    listener: Listener<T>,
}
14
/// A callback registered on an [`Observable`].
pub enum Listener<T> {
    /// One-shot callback: it is taken out of the listener set by the first notification
    /// that reaches it.
    Once(Box<dyn Fn(&T)>),
    /// Callback that stays registered across notifications; the `Rc` is cloned for each
    /// dispatch so the original can remain in the listener set.
    Durable(Rc<dyn Fn(&T)>),
}
19
/// Opaque token identifying one registered listener.
///
/// Returned by `Observable::subscribe` / `Observable::once` and consumed by
/// `Observable::unsubscribe`. It wraps the numeric id assigned at registration time.
#[derive(Debug)]
pub struct ListenerHandle(usize);
21
/// A value whose registered listeners are called after it is changed through `set` or `push`.
///
/// Both fields sit behind `Rc`, so every clone of an `Observable` refers to the same value
/// and the same listener set.
pub struct Observable<T> {
    /// The current value.
    value: Rc<RefCell<T>>,
    /// The listeners to call when the value changes.
    listener_set: Rc<RefCell<ListenerSet<T>>>,
}
26
27// Implemented manually because `T` does not need to be Clone
28impl<T> Clone for Observable<T> {
29    fn clone(&self) -> Self {
30        Observable {
31            value: self.value.clone(),
32            listener_set: self.listener_set.clone(),
33        }
34    }
35}
36
37impl<T> Observable<T> {
38    pub fn new(value: T) -> Self {
39        Self {
40            value: Rc::new(RefCell::new(value)),
41            listener_set: Rc::new(RefCell::new(ListenerSet {
42                nextid: 0,
43                items: Vec::new(),
44            })),
45        }
46    }
47    fn notify(&self) {
48        let mut working_set: Vec<Listener<T>>;
49        {
50            let mut listenerset = self.listener_set.borrow_mut();
51            // It's possible to add listeners while we are firing a listener
52            // so we need to make a copy of the listeners vec so we're not mutating it while calling listener functions
53
54            working_set = Vec::with_capacity(listenerset.items.len());
55
56            // Take all Listener::Once entries, and clone the others
57            let mut i = 0;
58            while i != listenerset.items.len() {
59                match listenerset.items[i].listener {
60                    Listener::Once(_) => {
61                        // Just take it
62                        working_set.push(listenerset.items.remove(i).listener);
63                    }
64                    Listener::Durable(ref f) => {
65                        working_set.push(Listener::Durable(f.clone()));
66                        i += 1;
67                    }
68                }
69            }
70        }
71
72        let r = self.get();
73
74        // Now that the borrow on the listeners vec is over, we can safely call them
75        // We can also be confident that we won't call any listeners which were attached during our dispatch
76        for listener in working_set {
77            match listener {
78                Listener::Once(f) => f(&r),
79                Listener::Durable(f) => f(&r),
80            }
81        }
82    }
83
84    fn _subscribe(&self, listener: Listener<T>) -> ListenerHandle {
85        let mut listener_set = self.listener_set.borrow_mut();
86
87        let id = listener_set.nextid;
88        listener_set.nextid += 1;
89        listener_set.items.push(ListenerItem { id, listener });
90        ListenerHandle(id)
91    }
92
93    // impl<T> Set<T> for Observable<T> {
94    pub fn set(&self, value: T) {
95        {
96            *(self.value.borrow_mut()) = value;
97        };
98
99        self.notify();
100    }
101    // }
102    // impl<T> Observe<T> for Observable<T> {
103    pub fn get(&self) -> Ref<T> {
104        self.value.borrow()
105    }
106    pub fn subscribe(&self, cb: Box<dyn Fn(&T)>) -> ListenerHandle {
107        let listener = Listener::Durable(cb.into());
108        self._subscribe(listener)
109    }
110    pub fn once(&self, cb: Box<dyn Fn(&T)>) -> ListenerHandle {
111        let listener = Listener::Once(cb);
112        self._subscribe(listener)
113    }
114
115    pub fn unsubscribe(&self, handle: ListenerHandle) -> bool {
116        let mut listener_set = self.listener_set.borrow_mut();
117
118        // Find the current listener offset
119        match listener_set
120            .items
121            .binary_search_by(|probe| probe.id.cmp(&handle.0))
122        {
123            Ok(offset) => {
124                listener_set.items.remove(offset);
125                true
126            }
127            Err(_) => false,
128        }
129    }
130}
131
132impl<T, V> Observable<V>
133where
134    V: Pushable<Value = T>,
135{
136    pub fn push(&self, item: T) {
137        {
138            let mut ref_mut = self.value.borrow_mut();
139            let vec = &mut *ref_mut;
140            vec.push(item);
141        }
142        self.notify();
143    }
144}
145
146impl<T> Default for Observable<T>
147where
148    T: Default,
149{
150    fn default() -> Self {
151        Observable::new(T::default())
152    }
153}
154
/// A container that accepts values one at a time.
///
/// `Observable::push` is available whenever the wrapped type implements this trait.
pub trait Pushable {
    /// The type of value accepted by [`Pushable::push`].
    type Value;
    /// Adds `value` to the container.
    fn push(&mut self, value: Self::Value);
}
159
160impl<T> Pushable for Vec<T> {
161    type Value = T;
162    fn push(&mut self, value: Self::Value) {
163        self.push(value)
164    }
165}
166
#[cfg(test)]
mod test {
    use std::{
        cell::{Cell, RefCell},
        rc::Rc,
    };

    // Imported from the parent module so the tests do not depend on what the crate root
    // happens to re-export.
    use super::{Observable, Pushable};

    /// `push` on an observable `Vec` appends the item and notifies subscribers.
    #[test]
    fn observable_vec_push() {
        let obs = Observable::new(vec![1, 2, 3]);

        let counter: Rc<RefCell<Option<usize>>> = Rc::new(RefCell::new(None));

        {
            let counter = counter.clone();
            obs.subscribe(Box::new(move |v: &Vec<u32>| {
                *(counter.borrow_mut()) = Some(v.len());
            }));
        }

        assert_eq!(*counter.borrow(), None);
        obs.push(0);
        assert_eq!(*counter.borrow(), Some(4));
    }

    /// A user-defined collection that opts into `Observable::push` via `Pushable`.
    struct Wrapper<T>(Vec<T>);

    impl<T> Pushable for Wrapper<T> {
        type Value = T;

        fn push(&mut self, value: Self::Value) {
            self.0.push(value)
        }
    }

    /// `push` works for any `Pushable` implementor, not only `Vec`.
    #[test]
    fn observable_vec_wrapper_push() {
        let obs = Observable::new(Wrapper(vec![1, 2, 3]));

        let counter: Rc<RefCell<Option<usize>>> = Rc::new(RefCell::new(None));

        {
            let counter = counter.clone();
            obs.subscribe(Box::new(move |v: &Wrapper<u32>| {
                *(counter.borrow_mut()) = Some(v.0.len());
            }));
        }

        assert_eq!(*counter.borrow(), None);
        obs.push(0);
        assert_eq!(*counter.borrow(), Some(4));
    }

    /// A `once` listener is called for the first notification only.
    #[test]
    fn once_listener_fires_a_single_time() {
        let obs = Observable::new(0u32);
        let calls = Rc::new(Cell::new(0u32));

        {
            let calls = calls.clone();
            obs.once(Box::new(move |_: &u32| calls.set(calls.get() + 1)));
        }

        obs.set(1);
        obs.set(2);
        assert_eq!(calls.get(), 1);
    }

    /// After `unsubscribe` returns `true` the listener is no longer called.
    #[test]
    fn unsubscribed_listener_is_not_called() {
        let obs = Observable::new(0u32);
        let calls = Rc::new(Cell::new(0u32));

        let handle = {
            let calls = calls.clone();
            obs.subscribe(Box::new(move |_: &u32| calls.set(calls.get() + 1)))
        };

        obs.set(1);
        assert!(obs.unsubscribe(handle));
        obs.set(2);
        assert_eq!(calls.get(), 1);
    }
}