rt_graph/
observable_value.rs

1//! An implementation of the Observer pattern.
2
3use std::{
4    cell::RefCell,
5    rc::Rc,
6};
7
8/// A value that implements the Observer pattern.
9///
10/// Consumers can connect to receive callbacks when the value changes.
11pub struct ObservableValue<T>
12    where T: Clone
13{
14    value: T,
15    subs: Vec<Subscription<T>>,
16    new_id: usize,
17}
18
/// The identifier for a subscription, used to disconnect it when no longer required.
///
/// Ids are cheap `Copy` values. They can be compared, hashed (for example to
/// key a `HashMap` of live subscriptions) and printed with `{:?}`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubscriptionId(usize);
22
23struct Subscription<T> {
24    id: SubscriptionId,
25    callback: Box<dyn Fn(&T)>
26}
27
28impl<T> ObservableValue<T>
29    where T: Clone
30{
31    /// Construct an `ObservableValue`.
32    pub fn new(initial_value: T) -> ObservableValue<T> {
33        ObservableValue {
34            value: initial_value,
35            new_id: 0,
36            subs: Vec::with_capacity(0),
37        }
38    }
39
40    /// Get the current value
41    pub fn get(&self) -> &T {
42        &self.value
43    }
44
45    /// Set a new value and notify all connected subscribers.
46    pub fn set(&mut self, new_value: &T) {
47        self.value = new_value.clone();
48        self.call_subscribers();
49    }
50
51    fn call_subscribers(&self) {
52        for sub in self.subs.iter() {
53            (sub.callback)(&self.value)
54        }
55    }
56
57    /// Connect a new subscriber that will receive callbacks when the
58    /// value is set.
59    ///
60    /// Returns a SubscriptionId to disconnect the subscription when
61    /// no longer required.
62    pub fn connect<F>(&mut self, callback: F) -> SubscriptionId
63        where F: (Fn(&T)) + 'static
64    {
65        let id = SubscriptionId(self.new_id);
66        self.new_id = self.new_id.checked_add(1).expect("No overflow");
67
68        self.subs.push(Subscription {
69            id,
70            callback: Box::new(callback),
71        });
72        self.subs.shrink_to_fit();
73        id
74    }
75
76    /// Disconnect an existing subscription.
77    pub fn disconnect(&mut self, sub_id: SubscriptionId) {
78        self.subs.retain(|sub| sub.id != sub_id);
79        self.subs.shrink_to_fit();
80    }
81
82    /// Divide this instance into a read half (can listen for updates, but cannot
83    /// write new values) and a write half (can write new values).
84    pub fn split(self) -> (ReadHalf<T>, WriteHalf<T>) {
85        let inner = Rc::new(RefCell::new(self));
86        (
87            ReadHalf {
88                inner: inner.clone(),
89            },
90            WriteHalf {
91                inner: inner
92            }
93        )
94    }
95}
96
97/// The read half of an `ObservableValue`, which can only listen for
98/// updates and read the current value.
99pub struct ReadHalf<T>
100    where T: Clone
101{
102    inner: Rc<RefCell<ObservableValue<T>>>,
103}
104
105/// The write half of an `ObservableValue`, which can write new values.
106pub struct WriteHalf<T>
107    where T: Clone
108{
109    inner: Rc<RefCell<ObservableValue<T>>>,
110}
111
112impl<T> ReadHalf<T>
113    where T: Clone
114{
115    /// Get the current value
116    pub fn get(&self) -> T {
117        self.inner.borrow().get().clone()
118    }
119
120    /// Connect a new subscriber that will receive callbacks when the
121    /// value is set.
122    ///
123    /// Returns a SubscriptionId to disconnect the subscription when
124    /// no longer required.
125    pub fn connect<F>(&mut self, callback: F) -> SubscriptionId
126        where F: (Fn(&T)) + 'static
127    {
128        self.inner.borrow_mut().connect(callback)
129    }
130
131    /// Disconnect an existing subscription.
132    pub fn disconnect(&mut self, sub_id: SubscriptionId) {
133        self.inner.borrow_mut().disconnect(sub_id)
134    }
135}
136
137impl<T> WriteHalf<T>
138    where T: Clone
139{
140    /// Set a new value and notify all connected subscribers.
141    pub fn set(&mut self, new_value: &T) {
142        self.inner.borrow_mut().set(new_value)
143    }
144}
145
#[cfg(test)]
mod test {
    use super::ObservableValue;
    use std::{cell::Cell, rc::Rc};

    /// `get` returns the initial value, then whatever `set` stored last.
    #[test]
    fn new_get_set() {
        let mut observable = ObservableValue::new(17);
        assert_eq!(*observable.get(), 17);

        observable.set(&18);
        assert_eq!(*observable.get(), 18);
    }

    /// A connected callback stays silent until `set` runs, then sees the new value.
    #[test]
    fn connect_set() {
        let mut observable = ObservableValue::<u32>::new(17);
        let mirror = Rc::new(Cell::new(0_u32));

        let sink = Rc::clone(&mirror);
        observable.connect(move |value| sink.set(*value));

        // Connecting alone must not invoke the callback.
        assert_eq!(mirror.get(), 0);

        observable.set(&18);

        // The callback ran and received the value passed to `set`.
        assert_eq!(mirror.get(), 18);
    }

    /// After `disconnect`, only the remaining subscription is notified.
    #[test]
    fn disconnect() {
        let mut observable = ObservableValue::<u32>::new(17);
        let first_mirror = Rc::new(Cell::new(0_u32));
        let second_mirror = Rc::new(Cell::new(0_u32));

        let first_sink = Rc::clone(&first_mirror);
        let first_id = observable.connect(move |value| first_sink.set(*value));

        let second_sink = Rc::clone(&second_mirror);
        let _second_id = observable.connect(move |value| second_sink.set(*value));

        // With both callbacks connected, one `set` reaches both mirrors.
        observable.set(&18);
        assert_eq!(first_mirror.get(), 18);
        assert_eq!(second_mirror.get(), 18);

        observable.disconnect(first_id);

        // The first mirror keeps its old value; only the second one follows.
        observable.set(&19);
        assert_eq!(first_mirror.get(), 18);
        assert_eq!(second_mirror.get(), 19);
    }

    /// A callback connected on the read half fires when the write half sets a value.
    #[test]
    fn split() {
        let (mut reader, mut writer) = ObservableValue::<u32>::new(17).split();

        let mirror = Rc::new(Cell::new(0_u32));

        let sink = Rc::clone(&mirror);
        reader.connect(move |value| sink.set(*value));

        // Connecting alone must not invoke the callback.
        assert_eq!(mirror.get(), 0);

        writer.set(&18);

        // The callback ran and received the value passed to `set`.
        assert_eq!(mirror.get(), 18);
    }
}
231}