rs_observable/
chobservable.rs

1/// Implementation of async, tokio based observers. The approach
2/// uses async channels instead of trait callbacks
3
4use log::debug;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::sync::mpsc;
8use tokio::sync::mpsc::error::SendError;
9use tokio::sync::mpsc::{Receiver, Sender};
10use std::fmt::{self, Debug, Formatter};
11
12#[derive(Debug)]
13struct StoredObserver<T> {
14    tx: Sender<T>,
15    id: u32,
16}
17
18impl<T> StoredObserver<T> {
19    pub fn new(id: u32, tx: Sender<T>) -> Self {
20        StoredObserver { tx, id }
21    }
22}
23
24/// Async, multithreading-ready Observale that use channels instead of callbacks
25pub struct ChObservable<T: Clone> {
26    /// Registered bservers
27    observers: Arc<Mutex<Vec<StoredObserver<T>>>>,
28    /// Next available observerId for registrations
29    next_id: u32,
30}
31
32impl<T: Clone + Debug> Debug for ChObservable<T> {
33    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
34        f.debug_struct("ChObservable")
35            .field("observers", &self.observers)
36            .field("next_id", &self.next_id)
37            .finish()
38    }
39}
40
41impl<T: Clone> ChObservable<T> {
42    pub fn new() -> Self {
43        /// creates a new object
44        ChObservable {
45            observers: Arc::new(Mutex::new(Vec::new())),
46            next_id: 1,
47        }
48    }
49
50    /// This function registers a new observer. It returns the ID of the registered
51    /// observer and a channel receiver to get the new values
52    ///
53    pub async fn register(&mut self) -> (u32, Receiver<T>) {
54        let mut g = self.observers.lock().await;
55        let observers: &mut Vec<StoredObserver<T>> = &mut g;
56        let id = self.next_id;
57        self.next_id += 1;
58        let (tx, rx): (Sender<T>, Receiver<T>) = mpsc::channel(10);
59        observers.push(StoredObserver::new(id, tx));
60        debug!("register observer: id={}", id);
61        (id, rx)
62    }
63
64    /// This function unregisters an observer.
65    ///
66    /// ## Arguments
67    /// * `observer_id` - ID returned after the registration of an observer
68    ///
69    pub async fn unregister(&mut self, observer_id: u32) {
70        let mut g = self.observers.lock().await;
71        let observers: &mut Vec<StoredObserver<T>> = &mut g;
72        let mut found: Option<usize> = None;
73        debug!("receive unregister observer request: id={}", observer_id);
74        for (i, e) in observers.iter().enumerate() {
75            if e.id == observer_id {
76                found = Some(i);
77                break;
78            }
79        }
80        if let Some(index_to_remove) = found {
81            debug!("unregister observer request: id={}", observer_id);
82            observers.remove(index_to_remove);
83        }
84    }
85
86    /// Triggers the notification of the restistered observers.
87    ///
88    /// ## Arguments
89    /// * `data` - data that should be passed to the observers
90    pub async fn notify(&self, data: &T) -> Result<(), SendError<T>> {
91        debug!("received notify request");
92        let mut g = self.observers.lock().await;
93        let observers: &mut Vec<StoredObserver<T>> = &mut g;
94        debug!("start to notify ...");
95        for o in observers {
96            o.tx.send(data.clone()).await?;
97        }
98        debug!("notified.");
99        Ok(())
100    }
101}
102
103/// Observable wrapper around a specific value
104pub struct ChObservedValue<T: Clone> {
105    /// Value to be wrapped
106    value: Arc<Mutex<Option<T>>>,
107    /// Observable implementation
108    observable: Arc<Mutex<ChObservable<Option<T>>>>,
109}
110
111impl<T: Clone> ChObservedValue<T> {
112    /// Creates an new object
113    pub fn new() -> Self {
114        ChObservedValue {
115            observable: Arc::new(Mutex::new(ChObservable::<Option<T>>::new())),
116            value: Arc::new(Mutex::new(None)),
117        }
118    }
119
120
121    async fn set_value_impl(&mut self, v: Option<T>) {
122        let mut g = self.value.lock().await;
123        let o: &mut Option<T> = &mut g;
124        *o = v;
125    }
126
127    async fn notify_impl(&mut self, v: &Option<T>) {
128        let mut g = self.observable.lock().await;
129        let o: &mut ChObservable<Option<T>> = &mut g;
130        let _ = o.notify(v).await;
131    }
132
133    /// Set a new value to the object. All registered observers are
134    /// called to get notified.
135    ///
136    /// ## Arguments
137    /// * `v` - value to set
138    ///
139    pub async fn set_value(&mut self, v: &T) {
140        let new_v = Some(v.clone());
141        self.set_value_impl(new_v.clone()).await;
142        self.notify_impl(&new_v).await;
143    }
144
145    /// Reset the value of the object. All registered observers are
146    /// called to get notified.
147    ///
148    pub async fn reset_value(&mut self) {
149        let new_v = None;
150        self.set_value_impl(None).await;
151        self.notify_impl(&new_v).await;
152    }
153
154    /// This function registers a new observer. It returns the ID of the registered
155    /// observer and a channel receiver to get the new values
156    ///
157    pub async fn register(&mut self) -> (u32, Receiver<Option<T>>) {
158        let mut g = self.observable.lock().await;
159        let o: &mut ChObservable<Option<T>> = &mut g;
160        o.register().await
161    }
162
163    /// This function unregisters an observer.
164    ///
165    /// ## Arguments
166    /// * `observer_id` - ID returned after the registration of an observer
167    ///
168    pub async fn unregister(&mut self, observer_id: u32) {
169        let mut g = self.observable.lock().await;
170        let o: &mut ChObservable<Option<T>> = &mut g;
171        o.unregister(observer_id).await;
172    }
173
174    /// Returns a reference to the contained value
175    pub fn value_ref(&self) -> &Arc<Mutex<Option<T>>> {
176        &self.value
177    }
178
179    /// Returns a mutable reference to the contained value
180    pub fn value_mutref(&mut self) -> &mut Arc<Mutex<Option<T>>> {
181        &mut self.value
182    }
183
184}
185
186#[cfg(test)]
187mod tests {
188    use log::debug;
189    use std::sync::Arc;
190    use tokio::sync::Mutex;
191    use tokio::task::JoinHandle;
192    use tokio::sync::mpsc::Receiver;
193
194    use crate::chobservable::{ChObservable, ChObservedValue};
195
196    #[derive(Debug)]
197    struct ObserverObj {
198        pub v: Arc<Mutex<Option<String>>>,
199        observable: Arc<Mutex<ChObservable<String>>>,
200        pub id: Option<u32>,
201        h: Option<JoinHandle<()>>,
202    }
203
204
205    impl ObserverObj {
206        pub fn new() -> Self {
207            let o = ObserverObj {
208                v: Arc::new(Mutex::new(None)),
209                observable: Arc::new(Mutex::new(ChObservable::new())),
210                id: None,
211                h: None,
212            };
213            o
214        }
215
216        pub async fn observe(&mut self)-> (u32, Receiver<String>) {
217            let mut g = self.observable.lock().await;
218            let o: &mut ChObservable<String> = &mut g;
219            o.register().await
220        }
221
222        pub async fn register(&mut self, cho: &mut ChObservable<String>) {
223            let (id, mut rx) = cho.register().await;
224            self.id = Some(id);
225            let value = self.v.clone();
226            let o = self.observable.clone();
227            let h = tokio::spawn(async move {
228                loop {
229                    match rx.recv().await {
230                        Some(s) => {
231                            {
232                                debug!("[id={}]received value, request lock ...", id);
233                                let mut g = value.lock().await;
234                                debug!("[id={}]received value, got lock.", id);
235                                let v: &mut Option<String> = &mut g;
236                                *v = Some(s.clone());
237                            }
238                            {
239                                let x: &mut ChObservable<String>;
240                                debug!("[id={}]request lock, to inform about values ...", id);
241                                let mut og = o.lock().await;
242                                debug!("[id={}]got lock, to inform about values", id);
243                                x = &mut og;
244                                let _ = x.notify(&s).await;
245                            };
246                        },
247                        None => debug!("[id={}]received NONE value.", id),
248                    };
249                };
250            });
251            self.h = Some(h);
252        }
253    }
254
255    async fn check_val(id: u32, ov: &Arc<Mutex<Option<String>>>, expected: &Option<String>) {
256        let g = ov.lock().await;
257        let v: &Option<String> = &g;
258        println!("Observer [id={}], content: {:?}", id, v);
259        assert_eq!(v, expected);
260    }
261    async fn check_val2(id: u32, rx: &mut Receiver<String>, expected: &String) {
262        debug!("[id2={}]i am waiting to get informed ...", id);
263        match rx.recv().await {
264            Some(v) => {
265                debug!("[id2={}]i was informed", id);
266                assert_eq!(v, *expected);
267            },
268            None => {
269                debug!("[id2={}]i was informed 2", id);
270                assert!(false);
271            },
272        };
273    }
274
275    #[tokio::test(flavor = "current_thread")]
276    async fn test_chobservable_single() {
277
278        let mut cho: ChObservable<String> = ChObservable::new();
279        let mut o1: ObserverObj = ObserverObj::new();
280        o1.register(&mut cho).await;
281        let (_, mut o1_rx) = o1.observe().await;
282        let mut o2: ObserverObj = ObserverObj::new();
283        o2.register(&mut cho).await;
284        let (_, mut o2_rx) = o2.observe().await;
285        let mut o3: ObserverObj = ObserverObj::new();
286        o3.register(&mut cho).await;
287        let (_, mut o3_rx) = o3.observe().await;
288        let expected_none = None;
289        check_val(o1.id.unwrap(), &o1.v, &expected_none).await;
290        check_val(o2.id.unwrap(), &o2.v, &expected_none).await;
291        check_val(o3.id.unwrap(), &o3.v, &expected_none).await;
292        let t1 = "test-99".to_string();
293        match cho.notify(&t1).await {
294            Ok(()) => (),
295            Err(_) => assert!(false, "receive error while notify"),
296        };
297    
298        let expected_1 = Some(t1.clone());
299        // since notify is async we have to way until the value have changed
300        check_val2(o1.id.unwrap(), &mut o1_rx, &t1).await;
301        check_val2(o2.id.unwrap(), &mut o2_rx, &t1).await;
302        check_val2(o3.id.unwrap(), &mut o3_rx, &t1).await;
303    
304        let mut o4: ObserverObj = ObserverObj::new();
305        o4.register(&mut cho).await;
306        let (_, mut o4_rx) = o4.observe().await;
307        check_val(o1.id.unwrap(), &o1.v, &expected_1).await;
308        check_val(o2.id.unwrap(), &o2.v, &expected_1).await;
309        check_val(o3.id.unwrap(), &o3.v, &expected_1).await;
310        check_val(o4.id.unwrap(), &o4.v, &expected_none).await;
311    
312        let t2 = "test-999".to_string();
313        match cho.notify(&t2).await {
314            Ok(()) => (),
315            Err(_) => assert!(false, "receive error while notify"),
316        };
317        check_val2(o1.id.unwrap(), &mut o1_rx, &t2).await;
318        check_val2(o2.id.unwrap(), &mut o2_rx, &t2).await;
319        check_val2(o3.id.unwrap(), &mut o3_rx, &t2).await;
320        check_val2(o4.id.unwrap(), &mut o4_rx, &t2).await;
321    }
322
323    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
324    async fn test_chobservable() {
325        let mut cho: ChObservable<String> = ChObservable::new();
326        let mut o1: ObserverObj = ObserverObj::new();
327        o1.register(&mut cho).await;
328        let (_, mut o1_rx) = o1.observe().await;
329        let mut o2: ObserverObj = ObserverObj::new();
330        o2.register(&mut cho).await;
331        let (_, mut o2_rx) = o2.observe().await;
332        let mut o3: ObserverObj = ObserverObj::new();
333        o3.register(&mut cho).await;
334        let (_, mut o3_rx) = o3.observe().await;
335        let expected_none = None;
336        check_val(o1.id.unwrap(), &o1.v, &expected_none).await;
337        check_val(o2.id.unwrap(), &o2.v, &expected_none).await;
338        check_val(o3.id.unwrap(), &o3.v, &expected_none).await;
339        let t1 = "test-99".to_string();
340        match cho.notify(&t1).await {
341            Ok(()) => (),
342            Err(_) => assert!(false, "receive error while notify"),
343        };
344    
345        let expected_1 = Some(t1.clone());
346        // since notify is async we have to way until the value have changed
347        check_val2(o1.id.unwrap(), &mut o1_rx, &t1).await;
348        check_val2(o2.id.unwrap(), &mut o2_rx, &t1).await;
349        check_val2(o3.id.unwrap(), &mut o3_rx, &t1).await;
350    
351        let mut o4: ObserverObj = ObserverObj::new();
352        o4.register(&mut cho).await;
353        let (_, mut o4_rx) = o4.observe().await;
354        check_val(o1.id.unwrap(), &o1.v, &expected_1).await;
355        check_val(o2.id.unwrap(), &o2.v, &expected_1).await;
356        check_val(o3.id.unwrap(), &o3.v, &expected_1).await;
357        check_val(o4.id.unwrap(), &o4.v, &expected_none).await;
358    
359        let t2 = "test-999".to_string();
360        match cho.notify(&t2).await {
361            Ok(()) => (),
362            Err(_) => assert!(false, "receive error while notify"),
363        };
364        check_val2(o1.id.unwrap(), &mut o1_rx, &t2).await;
365        check_val2(o2.id.unwrap(), &mut o2_rx, &t2).await;
366        check_val2(o3.id.unwrap(), &mut o3_rx, &t2).await;
367        check_val2(o4.id.unwrap(), &mut o4_rx, &t2).await;
368    }
369
370    async fn check_val3(id: u32, rx: &mut Receiver<Option<String>>, expected: &String) {
371        debug!("[id2={}]i am waiting to get informed ...", id);
372        match rx.recv().await {
373            Some(v) => {
374                debug!("[id2={}]i was informed", id);
375                assert_eq!(v.unwrap(), *expected);
376            },
377            None => {
378                debug!("[id2={}]i was informed 2", id);
379                assert!(false);
380            },
381        };
382    }
383
384    async fn check_val5(id: u32, rx: &mut Receiver<Option<String>>) {
385        debug!("[id2={}]i am waiting to get informed ...", id);
386        match rx.recv().await {
387            Some(o) => {
388                debug!("[id2={}]i was informed", id);
389                assert_eq!(o, Option::None);
390            },
391            None => {
392                debug!("[id2={}]i was informed 2", id);
393                assert!(false);
394            },
395        };
396    }
397
398    async fn check_val4(cho: &ChObservedValue<String>, expected: &Option<String>) {
399        let r = cho.value_ref();
400        let g = r.lock().await;
401        let os: &Option<String> = &g;
402        assert_eq!(*os, *expected);
403    }
404
405    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
406    async fn test_chobservedvalue() {
407        let mut cho: ChObservedValue<String> = ChObservedValue::new();
408        let (id1,mut rx1) = cho.register().await;
409        let (id2,mut rx2) = cho.register().await;
410        let (id3,mut rx3) = cho.register().await;
411
412        check_val4(&cho, &Option::None).await;
413
414        let t1 = "test-99".to_string();
415        cho.set_value(&t1).await;
416
417        let expected_1 = Some(t1.clone());
418        // since notify is async we have to way until the value have changed
419        check_val3(id1, &mut rx1, &t1).await;
420        check_val3(id2, &mut rx2, &t1).await;
421        check_val3(id3, &mut rx3, &t1).await;
422
423        let (id4,mut rx4) = cho.register().await;
424
425        check_val4(&cho, &expected_1).await;
426
427        let t2 = "test-999".to_string();
428        cho.set_value(&t2).await;
429
430        check_val3(id1, &mut rx1, &t2).await;
431        check_val3(id2, &mut rx2, &t2).await;
432        check_val3(id3, &mut rx3, &t2).await;
433        check_val3(id4, &mut rx4, &t2).await;
434
435        let expected_2 = Some(t2);
436        check_val4(&cho, &expected_2).await;
437
438        cho.reset_value().await;
439
440        check_val5(id1, &mut rx1).await;
441        check_val5(id2, &mut rx2).await;
442        check_val5(id3, &mut rx3).await;
443        check_val5(id4, &mut rx4).await;
444    }
445
446}