1use log::debug;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::sync::mpsc;
8use tokio::sync::mpsc::error::SendError;
9use tokio::sync::mpsc::{Receiver, Sender};
10use std::fmt::{self, Debug, Formatter};
11
12#[derive(Debug)]
13struct StoredObserver<T> {
14 tx: Sender<T>,
15 id: u32,
16}
17
18impl<T> StoredObserver<T> {
19 pub fn new(id: u32, tx: Sender<T>) -> Self {
20 StoredObserver { tx, id }
21 }
22}
23
24pub struct ChObservable<T: Clone> {
26 observers: Arc<Mutex<Vec<StoredObserver<T>>>>,
28 next_id: u32,
30}
31
32impl<T: Clone + Debug> Debug for ChObservable<T> {
33 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
34 f.debug_struct("ChObservable")
35 .field("observers", &self.observers)
36 .field("next_id", &self.next_id)
37 .finish()
38 }
39}
40
41impl<T: Clone> ChObservable<T> {
42 pub fn new() -> Self {
43 ChObservable {
45 observers: Arc::new(Mutex::new(Vec::new())),
46 next_id: 1,
47 }
48 }
49
50 pub async fn register(&mut self) -> (u32, Receiver<T>) {
54 let mut g = self.observers.lock().await;
55 let observers: &mut Vec<StoredObserver<T>> = &mut g;
56 let id = self.next_id;
57 self.next_id += 1;
58 let (tx, rx): (Sender<T>, Receiver<T>) = mpsc::channel(10);
59 observers.push(StoredObserver::new(id, tx));
60 debug!("register observer: id={}", id);
61 (id, rx)
62 }
63
64 pub async fn unregister(&mut self, observer_id: u32) {
70 let mut g = self.observers.lock().await;
71 let observers: &mut Vec<StoredObserver<T>> = &mut g;
72 let mut found: Option<usize> = None;
73 debug!("receive unregister observer request: id={}", observer_id);
74 for (i, e) in observers.iter().enumerate() {
75 if e.id == observer_id {
76 found = Some(i);
77 break;
78 }
79 }
80 if let Some(index_to_remove) = found {
81 debug!("unregister observer request: id={}", observer_id);
82 observers.remove(index_to_remove);
83 }
84 }
85
86 pub async fn notify(&self, data: &T) -> Result<(), SendError<T>> {
91 debug!("received notify request");
92 let mut g = self.observers.lock().await;
93 let observers: &mut Vec<StoredObserver<T>> = &mut g;
94 debug!("start to notify ...");
95 for o in observers {
96 o.tx.send(data.clone()).await?;
97 }
98 debug!("notified.");
99 Ok(())
100 }
101}
102
103pub struct ChObservedValue<T: Clone> {
105 value: Arc<Mutex<Option<T>>>,
107 observable: Arc<Mutex<ChObservable<Option<T>>>>,
109}
110
111impl<T: Clone> ChObservedValue<T> {
112 pub fn new() -> Self {
114 ChObservedValue {
115 observable: Arc::new(Mutex::new(ChObservable::<Option<T>>::new())),
116 value: Arc::new(Mutex::new(None)),
117 }
118 }
119
120
121 async fn set_value_impl(&mut self, v: Option<T>) {
122 let mut g = self.value.lock().await;
123 let o: &mut Option<T> = &mut g;
124 *o = v;
125 }
126
127 async fn notify_impl(&mut self, v: &Option<T>) {
128 let mut g = self.observable.lock().await;
129 let o: &mut ChObservable<Option<T>> = &mut g;
130 let _ = o.notify(v).await;
131 }
132
133 pub async fn set_value(&mut self, v: &T) {
140 let new_v = Some(v.clone());
141 self.set_value_impl(new_v.clone()).await;
142 self.notify_impl(&new_v).await;
143 }
144
145 pub async fn reset_value(&mut self) {
149 let new_v = None;
150 self.set_value_impl(None).await;
151 self.notify_impl(&new_v).await;
152 }
153
154 pub async fn register(&mut self) -> (u32, Receiver<Option<T>>) {
158 let mut g = self.observable.lock().await;
159 let o: &mut ChObservable<Option<T>> = &mut g;
160 o.register().await
161 }
162
163 pub async fn unregister(&mut self, observer_id: u32) {
169 let mut g = self.observable.lock().await;
170 let o: &mut ChObservable<Option<T>> = &mut g;
171 o.unregister(observer_id).await;
172 }
173
174 pub fn value_ref(&self) -> &Arc<Mutex<Option<T>>> {
176 &self.value
177 }
178
179 pub fn value_mutref(&mut self) -> &mut Arc<Mutex<Option<T>>> {
181 &mut self.value
182 }
183
184}
185
186#[cfg(test)]
187mod tests {
188 use log::debug;
189 use std::sync::Arc;
190 use tokio::sync::Mutex;
191 use tokio::task::JoinHandle;
192 use tokio::sync::mpsc::Receiver;
193
194 use crate::chobservable::{ChObservable, ChObservedValue};
195
196 #[derive(Debug)]
197 struct ObserverObj {
198 pub v: Arc<Mutex<Option<String>>>,
199 observable: Arc<Mutex<ChObservable<String>>>,
200 pub id: Option<u32>,
201 h: Option<JoinHandle<()>>,
202 }
203
204
205 impl ObserverObj {
206 pub fn new() -> Self {
207 let o = ObserverObj {
208 v: Arc::new(Mutex::new(None)),
209 observable: Arc::new(Mutex::new(ChObservable::new())),
210 id: None,
211 h: None,
212 };
213 o
214 }
215
216 pub async fn observe(&mut self)-> (u32, Receiver<String>) {
217 let mut g = self.observable.lock().await;
218 let o: &mut ChObservable<String> = &mut g;
219 o.register().await
220 }
221
222 pub async fn register(&mut self, cho: &mut ChObservable<String>) {
223 let (id, mut rx) = cho.register().await;
224 self.id = Some(id);
225 let value = self.v.clone();
226 let o = self.observable.clone();
227 let h = tokio::spawn(async move {
228 loop {
229 match rx.recv().await {
230 Some(s) => {
231 {
232 debug!("[id={}]received value, request lock ...", id);
233 let mut g = value.lock().await;
234 debug!("[id={}]received value, got lock.", id);
235 let v: &mut Option<String> = &mut g;
236 *v = Some(s.clone());
237 }
238 {
239 let x: &mut ChObservable<String>;
240 debug!("[id={}]request lock, to inform about values ...", id);
241 let mut og = o.lock().await;
242 debug!("[id={}]got lock, to inform about values", id);
243 x = &mut og;
244 let _ = x.notify(&s).await;
245 };
246 },
247 None => debug!("[id={}]received NONE value.", id),
248 };
249 };
250 });
251 self.h = Some(h);
252 }
253 }
254
255 async fn check_val(id: u32, ov: &Arc<Mutex<Option<String>>>, expected: &Option<String>) {
256 let g = ov.lock().await;
257 let v: &Option<String> = &g;
258 println!("Observer [id={}], content: {:?}", id, v);
259 assert_eq!(v, expected);
260 }
261 async fn check_val2(id: u32, rx: &mut Receiver<String>, expected: &String) {
262 debug!("[id2={}]i am waiting to get informed ...", id);
263 match rx.recv().await {
264 Some(v) => {
265 debug!("[id2={}]i was informed", id);
266 assert_eq!(v, *expected);
267 },
268 None => {
269 debug!("[id2={}]i was informed 2", id);
270 assert!(false);
271 },
272 };
273 }
274
275 #[tokio::test(flavor = "current_thread")]
276 async fn test_chobservable_single() {
277
278 let mut cho: ChObservable<String> = ChObservable::new();
279 let mut o1: ObserverObj = ObserverObj::new();
280 o1.register(&mut cho).await;
281 let (_, mut o1_rx) = o1.observe().await;
282 let mut o2: ObserverObj = ObserverObj::new();
283 o2.register(&mut cho).await;
284 let (_, mut o2_rx) = o2.observe().await;
285 let mut o3: ObserverObj = ObserverObj::new();
286 o3.register(&mut cho).await;
287 let (_, mut o3_rx) = o3.observe().await;
288 let expected_none = None;
289 check_val(o1.id.unwrap(), &o1.v, &expected_none).await;
290 check_val(o2.id.unwrap(), &o2.v, &expected_none).await;
291 check_val(o3.id.unwrap(), &o3.v, &expected_none).await;
292 let t1 = "test-99".to_string();
293 match cho.notify(&t1).await {
294 Ok(()) => (),
295 Err(_) => assert!(false, "receive error while notify"),
296 };
297
298 let expected_1 = Some(t1.clone());
299 check_val2(o1.id.unwrap(), &mut o1_rx, &t1).await;
301 check_val2(o2.id.unwrap(), &mut o2_rx, &t1).await;
302 check_val2(o3.id.unwrap(), &mut o3_rx, &t1).await;
303
304 let mut o4: ObserverObj = ObserverObj::new();
305 o4.register(&mut cho).await;
306 let (_, mut o4_rx) = o4.observe().await;
307 check_val(o1.id.unwrap(), &o1.v, &expected_1).await;
308 check_val(o2.id.unwrap(), &o2.v, &expected_1).await;
309 check_val(o3.id.unwrap(), &o3.v, &expected_1).await;
310 check_val(o4.id.unwrap(), &o4.v, &expected_none).await;
311
312 let t2 = "test-999".to_string();
313 match cho.notify(&t2).await {
314 Ok(()) => (),
315 Err(_) => assert!(false, "receive error while notify"),
316 };
317 check_val2(o1.id.unwrap(), &mut o1_rx, &t2).await;
318 check_val2(o2.id.unwrap(), &mut o2_rx, &t2).await;
319 check_val2(o3.id.unwrap(), &mut o3_rx, &t2).await;
320 check_val2(o4.id.unwrap(), &mut o4_rx, &t2).await;
321 }
322
323 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
324 async fn test_chobservable() {
325 let mut cho: ChObservable<String> = ChObservable::new();
326 let mut o1: ObserverObj = ObserverObj::new();
327 o1.register(&mut cho).await;
328 let (_, mut o1_rx) = o1.observe().await;
329 let mut o2: ObserverObj = ObserverObj::new();
330 o2.register(&mut cho).await;
331 let (_, mut o2_rx) = o2.observe().await;
332 let mut o3: ObserverObj = ObserverObj::new();
333 o3.register(&mut cho).await;
334 let (_, mut o3_rx) = o3.observe().await;
335 let expected_none = None;
336 check_val(o1.id.unwrap(), &o1.v, &expected_none).await;
337 check_val(o2.id.unwrap(), &o2.v, &expected_none).await;
338 check_val(o3.id.unwrap(), &o3.v, &expected_none).await;
339 let t1 = "test-99".to_string();
340 match cho.notify(&t1).await {
341 Ok(()) => (),
342 Err(_) => assert!(false, "receive error while notify"),
343 };
344
345 let expected_1 = Some(t1.clone());
346 check_val2(o1.id.unwrap(), &mut o1_rx, &t1).await;
348 check_val2(o2.id.unwrap(), &mut o2_rx, &t1).await;
349 check_val2(o3.id.unwrap(), &mut o3_rx, &t1).await;
350
351 let mut o4: ObserverObj = ObserverObj::new();
352 o4.register(&mut cho).await;
353 let (_, mut o4_rx) = o4.observe().await;
354 check_val(o1.id.unwrap(), &o1.v, &expected_1).await;
355 check_val(o2.id.unwrap(), &o2.v, &expected_1).await;
356 check_val(o3.id.unwrap(), &o3.v, &expected_1).await;
357 check_val(o4.id.unwrap(), &o4.v, &expected_none).await;
358
359 let t2 = "test-999".to_string();
360 match cho.notify(&t2).await {
361 Ok(()) => (),
362 Err(_) => assert!(false, "receive error while notify"),
363 };
364 check_val2(o1.id.unwrap(), &mut o1_rx, &t2).await;
365 check_val2(o2.id.unwrap(), &mut o2_rx, &t2).await;
366 check_val2(o3.id.unwrap(), &mut o3_rx, &t2).await;
367 check_val2(o4.id.unwrap(), &mut o4_rx, &t2).await;
368 }
369
370 async fn check_val3(id: u32, rx: &mut Receiver<Option<String>>, expected: &String) {
371 debug!("[id2={}]i am waiting to get informed ...", id);
372 match rx.recv().await {
373 Some(v) => {
374 debug!("[id2={}]i was informed", id);
375 assert_eq!(v.unwrap(), *expected);
376 },
377 None => {
378 debug!("[id2={}]i was informed 2", id);
379 assert!(false);
380 },
381 };
382 }
383
384 async fn check_val5(id: u32, rx: &mut Receiver<Option<String>>) {
385 debug!("[id2={}]i am waiting to get informed ...", id);
386 match rx.recv().await {
387 Some(o) => {
388 debug!("[id2={}]i was informed", id);
389 assert_eq!(o, Option::None);
390 },
391 None => {
392 debug!("[id2={}]i was informed 2", id);
393 assert!(false);
394 },
395 };
396 }
397
398 async fn check_val4(cho: &ChObservedValue<String>, expected: &Option<String>) {
399 let r = cho.value_ref();
400 let g = r.lock().await;
401 let os: &Option<String> = &g;
402 assert_eq!(*os, *expected);
403 }
404
405 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
406 async fn test_chobservedvalue() {
407 let mut cho: ChObservedValue<String> = ChObservedValue::new();
408 let (id1,mut rx1) = cho.register().await;
409 let (id2,mut rx2) = cho.register().await;
410 let (id3,mut rx3) = cho.register().await;
411
412 check_val4(&cho, &Option::None).await;
413
414 let t1 = "test-99".to_string();
415 cho.set_value(&t1).await;
416
417 let expected_1 = Some(t1.clone());
418 check_val3(id1, &mut rx1, &t1).await;
420 check_val3(id2, &mut rx2, &t1).await;
421 check_val3(id3, &mut rx3, &t1).await;
422
423 let (id4,mut rx4) = cho.register().await;
424
425 check_val4(&cho, &expected_1).await;
426
427 let t2 = "test-999".to_string();
428 cho.set_value(&t2).await;
429
430 check_val3(id1, &mut rx1, &t2).await;
431 check_val3(id2, &mut rx2, &t2).await;
432 check_val3(id3, &mut rx3, &t2).await;
433 check_val3(id4, &mut rx4, &t2).await;
434
435 let expected_2 = Some(t2);
436 check_val4(&cho, &expected_2).await;
437
438 cho.reset_value().await;
439
440 check_val5(id1, &mut rx1).await;
441 check_val5(id2, &mut rx2).await;
442 check_val5(id3, &mut rx3).await;
443 check_val5(id4, &mut rx4).await;
444 }
445
446}