1use async_trait::async_trait;
13use chrono::*;
14use drmem_api::{
15 client, device,
16 driver::{ReportReading, RxDeviceSetting, TxDeviceSetting},
17 Error, Result, Store,
18};
19use std::collections::{hash_map, HashMap};
20use std::{
21 sync::{Arc, Mutex},
22 time,
23};
24use tokio::sync::{broadcast, mpsc, oneshot};
25use tokio_stream::{
26 wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
27 StreamExt,
28};
29use tracing::{error, warn};
30
31const CHAN_SIZE: usize = 20;
32
33type ReadingState = (
34 broadcast::Sender<device::Reading>,
35 Option<device::Reading>,
36 time::SystemTime,
37);
38
39pub mod config;
40mod glob;
41
42struct DeviceInfo {
43 owner: String,
44 units: Option<String>,
45 tx_setting: Option<TxDeviceSetting>,
46 reading: Arc<Mutex<ReadingState>>,
47}
48
49impl DeviceInfo {
50 pub fn create(
51 owner: String,
52 units: Option<String>,
53 tx_setting: Option<TxDeviceSetting>,
54 ) -> DeviceInfo {
55 let (tx, _) = broadcast::channel(CHAN_SIZE);
56
57 DeviceInfo {
60 owner,
61 units,
62 tx_setting,
63 reading: Arc::new(Mutex::new((tx, None, time::UNIX_EPOCH))),
64 }
65 }
66}
67
68struct SimpleStore(HashMap<device::Name, DeviceInfo>);
69
70pub async fn open(_cfg: &config::Config) -> Result<impl Store> {
71 Ok(SimpleStore(HashMap::new()))
72}
73
74fn mk_report_func(
78 di: &DeviceInfo,
79 name: &device::Name,
80) -> ReportReading<device::Value> {
81 let reading = di.reading.clone();
82 let name = name.to_string();
83
84 Box::new(move |v| {
85 let mut ts = time::SystemTime::now();
90
91 if let Ok(mut data) = reading.lock() {
99 if ts <= data.2 {
109 if let Some(nts) =
110 data.2.checked_add(time::Duration::from_micros(1))
111 {
112 ts = nts
113 } else {
114 ts = time::UNIX_EPOCH
115 .checked_add(time::Duration::new(
116 i64::MAX as u64,
117 999_999_999,
118 ))
119 .unwrap()
120 }
121 }
122
123 let reading = device::Reading { ts, value: v };
124 let _ = data.0.send(reading.clone());
125
126 data.1 = Some(reading);
129 data.2 = ts
130 } else {
131 error!("couldn't set current value of {}", &name)
132 }
133 Box::pin(async {})
134 })
135}
136
137#[async_trait]
138impl Store for SimpleStore {
139 async fn register_read_only_device(
145 &mut self,
146 driver: &str,
147 name: &device::Name,
148 units: &Option<String>,
149 _max_history: &Option<usize>,
150 ) -> Result<(ReportReading<device::Value>, Option<device::Value>)> {
151 match self.0.entry((*name).clone()) {
154 hash_map::Entry::Vacant(e) => {
157 let di = e.insert(DeviceInfo::create(
160 String::from(driver),
161 units.clone(),
162 None,
163 ));
164
165 Ok((mk_report_func(di, name), None))
169 }
170
171 hash_map::Entry::Occupied(e) => {
175 let dev_info = e.get();
176
177 if dev_info.owner == driver {
178 let func = mk_report_func(dev_info, name);
179 let guard = dev_info.reading.lock();
180
181 Ok((
182 func,
183 if let Ok(data) = guard {
184 data.1.as_ref().map(
185 |device::Reading { value, .. }| value.clone(),
186 )
187 } else {
188 None
189 },
190 ))
191 } else {
192 Err(Error::InUse)
193 }
194 }
195 }
196 }
197
198 async fn register_read_write_device(
203 &mut self,
204 driver: &str,
205 name: &device::Name,
206 units: &Option<String>,
207 _max_history: &Option<usize>,
208 ) -> Result<(
209 ReportReading<device::Value>,
210 RxDeviceSetting,
211 Option<device::Value>,
212 )> {
213 match self.0.entry((*name).clone()) {
216 hash_map::Entry::Vacant(e) => {
219 let (tx_sets, rx_sets) = mpsc::channel(CHAN_SIZE);
222
223 let di = e.insert(DeviceInfo::create(
226 String::from(driver),
227 units.clone(),
228 Some(tx_sets),
229 ));
230
231 Ok((mk_report_func(di, name), rx_sets, None))
235 }
236
237 hash_map::Entry::Occupied(mut e) => {
241 let dev_info = e.get_mut();
242
243 if dev_info.owner == driver {
244 let (tx_sets, rx_sets) = mpsc::channel(CHAN_SIZE);
247
248 dev_info.tx_setting = Some(tx_sets);
249
250 let func = mk_report_func(dev_info, name);
251 let guard = dev_info.reading.lock();
252
253 Ok((
254 func,
255 rx_sets,
256 if let Ok(data) = guard {
257 data.1.as_ref().map(
258 |device::Reading { value, .. }| value.clone(),
259 )
260 } else {
261 None
262 },
263 ))
264 } else {
265 Err(Error::InUse)
266 }
267 }
268 }
269 }
270
271 async fn get_device_info(
272 &mut self,
273 pattern: &Option<String>,
274 ) -> Result<Vec<client::DevInfoReply>> {
275 let pred: Box<dyn FnMut(&(&device::Name, &DeviceInfo)) -> bool> =
276 if let Some(pattern) = pattern {
277 if let Ok(pattern) = pattern.parse::<device::Name>() {
278 Box::new(move |(k, _)| pattern == **k)
279 } else {
280 Box::new(move |(k, _)| {
281 glob::Pattern::create(pattern).matches(&k.to_string())
282 })
283 }
284 } else {
285 Box::new(|_| true)
286 };
287 let res: Vec<client::DevInfoReply> = self
288 .0
289 .iter()
290 .filter(pred)
291 .map(|(k, v)| {
292 let (tot, rdg) = if let Ok(data) = v.reading.lock() {
293 if data.1.is_some() {
294 (1, data.1.clone())
295 } else {
296 (0, None)
297 }
298 } else {
299 (0, None)
300 };
301
302 client::DevInfoReply {
303 name: k.clone(),
304 units: v.units.clone(),
305 settable: v.tx_setting.is_some(),
306 driver: v.owner.clone(),
307 total_points: tot,
308 first_point: rdg.clone(),
309 last_point: rdg,
310 }
311 })
312 .collect();
313
314 Ok(res)
315 }
316
317 async fn set_device(
318 &self,
319 name: device::Name,
320 value: device::Value,
321 ) -> Result<device::Value> {
322 if let Some(di) = self.0.get(&name) {
323 if let Some(tx) = &di.tx_setting {
324 let (tx_rpy, rx_rpy) = oneshot::channel();
325
326 match tx.send((value, tx_rpy)).await {
327 Ok(()) => match rx_rpy.await {
328 Ok(reply) => reply,
329 Err(_) => Err(Error::MissingPeer(
330 "driver broke connection".to_string(),
331 )),
332 },
333 Err(_) => Err(Error::MissingPeer(
334 "driver is ignoring settings".to_string(),
335 )),
336 }
337 } else {
338 Err(Error::OperationError)
339 }
340 } else {
341 Err(Error::NotFound)
342 }
343 }
344
345 async fn get_setting_chan(
346 &self,
347 name: device::Name,
348 _own: bool,
349 ) -> Result<TxDeviceSetting> {
350 if let Some(di) = self.0.get(&name) {
351 if let Some(tx) = &di.tx_setting {
352 return Ok(tx.clone());
353 }
354 }
355 Err(Error::NotFound)
356 }
357
358 async fn monitor_device(
364 &mut self,
365 name: device::Name,
366 start: Option<DateTime<Utc>>,
367 end: Option<DateTime<Utc>>,
368 ) -> Result<device::DataStream<device::Reading>> {
369 if let Some(di) = self.0.get(&name) {
373 if let Ok(guard) = di.reading.lock() {
377 let chan = guard.0.subscribe();
378
379 let strm =
387 BroadcastStream::new(chan).filter_map(move |entry| {
388 match entry {
389 Ok(v) => Some(v),
390 Err(BroadcastStreamRecvError::Lagged(count)) => {
391 warn!("missed {} readings of {}", count, &name);
392 None
393 }
394 }
395 });
396
397 match (start.map(|v| v.into()), end.map(|v| v.into())) {
398 (None, None) => {
399 if let Some(prev) = &guard.1 {
400 Ok(Box::pin(
401 tokio_stream::once(prev.clone()).chain(strm),
402 ))
403 } else {
404 Ok(Box::pin(strm))
405 }
406 }
407 (None, Some(end)) => {
408 let not_end = move |v: &device::Reading| v.ts <= end;
409
410 if let Some(prev) = &guard.1 {
411 Ok(Box::pin(
412 tokio_stream::once(prev.clone())
413 .chain(strm)
414 .take_while(not_end),
415 ))
416 } else {
417 Ok(Box::pin(strm.take_while(not_end)))
418 }
419 }
420 (Some(start), None) => {
421 let valid = move |v: &device::Reading| v.ts >= start;
422
423 if let Some(prev) = &guard.1 {
424 Ok(Box::pin(
425 tokio_stream::once(prev.clone())
426 .chain(strm)
427 .filter(valid),
428 ))
429 } else {
430 Ok(Box::pin(strm.filter(valid)))
431 }
432 }
433 (Some(start_tmp), Some(end_tmp)) => {
434 let start: time::SystemTime =
438 std::cmp::min(start_tmp, end_tmp);
439 let end: time::SystemTime =
440 std::cmp::max(start_tmp, end_tmp);
441
442 let valid = move |v: &device::Reading| v.ts >= start;
445 let not_end = move |v: &device::Reading| v.ts <= end;
446
447 if let Some(prev) = &guard.1 {
448 Ok(Box::pin(
449 tokio_stream::once(prev.clone())
450 .chain(strm)
451 .filter(valid)
452 .take_while(not_end),
453 ))
454 } else {
455 Ok(Box::pin(strm.filter(valid).take_while(not_end)))
456 }
457 }
458 }
459 } else {
460 Err(Error::OperationError)
461 }
462 } else {
463 Err(Error::NotFound)
464 }
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use crate::{mk_report_func, DeviceInfo, SimpleStore};
471 use drmem_api::{device, Store};
472 use std::{collections::HashMap, time};
473 use tokio::sync::{mpsc::error::TryRecvError, oneshot};
474 use tokio::time::interval;
475 use tokio_stream::StreamExt;
476
477 #[test]
478 fn test_timestamp() {
479 assert!(time::UNIX_EPOCH
480 .checked_add(time::Duration::new(i64::MAX as u64, 999_999_999))
481 .is_some())
482 }
483
484 #[tokio::test]
485 async fn test_read_live_stream() {
486 let mut db = SimpleStore(HashMap::new());
487 let name = "test:device".parse::<device::Name>().unwrap();
488
489 if let Ok((f, None)) = db
490 .register_read_only_device("test", &name, &None, &None)
491 .await
492 {
493 {
497 let data = vec![1, 2, 3];
498
499 f(device::Value::Int(data[0]));
500
501 let s = db
502 .monitor_device(name.clone(), None, None)
503 .await
504 .unwrap()
505 .timeout(time::Duration::from_millis(100));
506
507 tokio::pin!(s);
508
509 for ii in &data[1..] {
510 f(device::Value::Int(*ii));
511 }
512
513 assert_eq!(
514 s.try_next().await.unwrap().unwrap().value,
515 device::Value::Int(1)
516 );
517 assert_eq!(
518 s.try_next().await.unwrap().unwrap().value,
519 device::Value::Int(2)
520 );
521 assert_eq!(
522 s.try_next().await.unwrap().unwrap().value,
523 device::Value::Int(3)
524 );
525 assert!(s.try_next().await.is_err());
526 }
527
528 {
532 let data = vec![1, 2, 3, 4];
533
534 f(device::Value::Int(data[0]));
535 f(device::Value::Int(data[1]));
536
537 let s = db
538 .monitor_device(name.clone(), None, None)
539 .await
540 .unwrap()
541 .timeout(time::Duration::from_millis(100));
542
543 tokio::pin!(s);
544
545 for ii in &data[2..] {
546 f(device::Value::Int(*ii));
547 }
548
549 assert_eq!(
550 s.try_next().await.unwrap().unwrap().value,
551 device::Value::Int(2)
552 );
553 assert_eq!(
554 s.try_next().await.unwrap().unwrap().value,
555 device::Value::Int(3)
556 );
557 assert_eq!(
558 s.try_next().await.unwrap().unwrap().value,
559 device::Value::Int(4)
560 );
561 assert!(s.try_next().await.is_err());
562 }
563 } else {
564 panic!("error registering read-only device on empty database")
565 }
566 }
567
568 #[tokio::test]
569 async fn test_read_start_stream() {
570 let mut db = SimpleStore(HashMap::new());
571 let name = "test:device".parse::<device::Name>().unwrap();
572
573 if let Ok((f, None)) = db
574 .register_read_only_device("test", &name, &None, &None)
575 .await
576 {
577 {
581 let data = vec![1, 2, 3];
582
583 let s = db
584 .monitor_device(
585 name.clone(),
586 Some(time::SystemTime::now().into()),
587 None,
588 )
589 .await
590 .unwrap()
591 .timeout(time::Duration::from_millis(100));
592
593 tokio::pin!(s);
594
595 assert!(s.try_next().await.is_err());
596
597 for ii in data {
598 f(device::Value::Int(ii));
599 }
600
601 assert_eq!(
602 s.try_next().await.unwrap().unwrap().value,
603 device::Value::Int(1)
604 );
605 assert_eq!(
606 s.try_next().await.unwrap().unwrap().value,
607 device::Value::Int(2)
608 );
609 assert_eq!(
610 s.try_next().await.unwrap().unwrap().value,
611 device::Value::Int(3)
612 );
613 assert!(s.try_next().await.is_err());
614 }
615
616 {
620 let data = vec![1, 2, 3];
621
622 f(device::Value::Int(data[0]));
623
624 let s = db
625 .monitor_device(
626 name.clone(),
627 Some(time::SystemTime::now().into()),
628 None,
629 )
630 .await
631 .unwrap()
632 .timeout(time::Duration::from_millis(100));
633
634 tokio::pin!(s);
635
636 for ii in &data[1..] {
637 f(device::Value::Int(*ii));
638 }
639
640 assert_eq!(
641 s.try_next().await.unwrap().unwrap().value,
642 device::Value::Int(2)
643 );
644 assert_eq!(
645 s.try_next().await.unwrap().unwrap().value,
646 device::Value::Int(3)
647 );
648 assert!(s.try_next().await.is_err());
649 }
650 } else {
651 panic!("error registering read-only device on empty database")
652 }
653 }
654
655 #[tokio::test]
656 async fn test_read_end_stream() {
657 let mut db = SimpleStore(HashMap::new());
658 let name = "test:device".parse::<device::Name>().unwrap();
659
660 if let Ok((f, None)) = db
661 .register_read_only_device("test", &name, &None, &None)
662 .await
663 {
664 {
668 let data = vec![1, 2, 3];
669
670 f(device::Value::Int(data[0]));
671
672 let s = db
673 .monitor_device(
674 name.clone(),
675 None,
676 Some(time::SystemTime::now().into()),
677 )
678 .await
679 .unwrap()
680 .timeout(time::Duration::from_millis(100));
681
682 tokio::pin!(s);
683
684 for ii in &data[1..] {
685 f(device::Value::Int(*ii));
686 }
687
688 assert_eq!(
689 s.try_next().await.unwrap().unwrap().value,
690 device::Value::Int(1)
691 );
692 assert_eq!(s.try_next().await.unwrap(), None);
693 }
694 } else {
695 panic!("error registering read-only device on empty database")
696 }
697 }
698
699 #[tokio::test]
700 async fn test_read_start_end_stream() {
701 let mut db = SimpleStore(HashMap::new());
702 let name = "test:device".parse::<device::Name>().unwrap();
703
704 if let Ok((f, None)) = db
705 .register_read_only_device("test", &name, &None, &None)
706 .await
707 {
708 {
712 let data = vec![1, 2, 3, 4, 5];
713
714 let mut interval = interval(time::Duration::from_millis(100));
715
716 let now = time::SystemTime::now();
717 let s = db
718 .monitor_device(
719 name.clone(),
720 Some(
721 now.checked_sub(time::Duration::from_millis(500))
722 .unwrap()
723 .into(),
724 ),
725 Some(
726 now.checked_sub(time::Duration::from_millis(250))
727 .unwrap()
728 .into(),
729 ),
730 )
731 .await
732 .unwrap()
733 .timeout(time::Duration::from_millis(100));
734
735 tokio::pin!(s);
736
737 for ii in data {
738 interval.tick().await;
739 f(device::Value::Int(ii));
740 }
741
742 assert_eq!(s.try_next().await.unwrap(), None);
743 }
744
745 {
749 let data = vec![1, 2, 3, 4, 5];
750
751 f(device::Value::Int(data[0]));
752 let mut interval = interval(time::Duration::from_millis(100));
753
754 let now = time::SystemTime::now();
755 let s = db
756 .monitor_device(
757 name.clone(),
758 Some(now.into()),
759 Some(
760 now.checked_add(time::Duration::from_millis(250))
761 .unwrap()
762 .into(),
763 ),
764 )
765 .await
766 .unwrap()
767 .timeout(time::Duration::from_millis(100));
768
769 tokio::pin!(s);
770
771 for ii in &data[1..] {
772 interval.tick().await;
773 f(device::Value::Int(*ii));
774 }
775
776 assert_eq!(
777 s.try_next().await.unwrap().unwrap().value,
778 device::Value::Int(2)
779 );
780 assert_eq!(
781 s.try_next().await.unwrap().unwrap().value,
782 device::Value::Int(3)
783 );
784 assert_eq!(
785 s.try_next().await.unwrap().unwrap().value,
786 device::Value::Int(4)
787 );
788 assert_eq!(s.try_next().await.unwrap(), None);
789 }
790
791 {
795 let data = vec![1, 2, 3, 4, 5];
796
797 let mut interval = interval(time::Duration::from_millis(100));
798
799 let now = time::SystemTime::now();
800 let s = db
801 .monitor_device(
802 name.clone(),
803 Some(now.into()),
804 Some(
805 now.checked_add(time::Duration::from_millis(250))
806 .unwrap()
807 .into(),
808 ),
809 )
810 .await
811 .unwrap()
812 .timeout(time::Duration::from_millis(100));
813
814 tokio::pin!(s);
815
816 for ii in data {
817 interval.tick().await;
818 f(device::Value::Int(ii));
819 }
820
821 assert_eq!(
822 s.try_next().await.unwrap().unwrap().value,
823 device::Value::Int(1)
824 );
825 assert_eq!(
826 s.try_next().await.unwrap().unwrap().value,
827 device::Value::Int(2)
828 );
829 assert_eq!(
830 s.try_next().await.unwrap().unwrap().value,
831 device::Value::Int(3)
832 );
833 assert_eq!(s.try_next().await.unwrap(), None);
834 }
835
836 {
840 let data = vec![1, 2, 3, 4, 5];
841
842 let mut interval = interval(time::Duration::from_millis(100));
843
844 let now = time::SystemTime::now();
845 let s = db
846 .monitor_device(
847 name.clone(),
848 Some(
849 now.checked_add(time::Duration::from_millis(150))
850 .unwrap()
851 .into(),
852 ),
853 Some(
854 now.checked_add(time::Duration::from_millis(350))
855 .unwrap()
856 .into(),
857 ),
858 )
859 .await
860 .unwrap()
861 .timeout(time::Duration::from_millis(100));
862
863 tokio::pin!(s);
864
865 for ii in data {
866 interval.tick().await;
867 f(device::Value::Int(ii));
868 }
869
870 assert_eq!(
871 s.try_next().await.unwrap().unwrap().value,
872 device::Value::Int(3)
873 );
874 assert_eq!(
875 s.try_next().await.unwrap().unwrap().value,
876 device::Value::Int(4)
877 );
878 assert_eq!(s.try_next().await.unwrap(), None);
879 }
880 } else {
881 panic!("error registering read-only device on empty database")
882 }
883 }
884
885 #[tokio::test]
886 async fn test_ro_registration() {
887 let mut db = SimpleStore(HashMap::new());
888 let name = "misc:junk".parse::<device::Name>().unwrap();
889
890 if let Ok((f, None)) = db
894 .register_read_only_device("test", &name, &None, &None)
895 .await
896 {
897 assert!(db.0.get(&name).unwrap().tx_setting.is_none());
901
902 f(device::Value::Int(1)).await;
905
906 let mut rx =
909 db.0.get(&name)
910 .unwrap()
911 .reading
912 .lock()
913 .unwrap()
914 .0
915 .subscribe();
916
917 assert!(db
921 .register_read_only_device("test2", &name, &None, &None)
922 .await
923 .is_err());
924
925 if let Ok((f, Some(device::Value::Int(1)))) = db
929 .register_read_only_device("test", &name, &None, &None)
930 .await
931 {
932 f(device::Value::Int(2)).await;
937 assert_eq!(rx.try_recv().unwrap().value, device::Value::Int(2));
938 } else {
939 panic!("error registering read-only device from same driver")
940 }
941 } else {
942 panic!("error registering read-only device on empty database")
943 }
944 }
945
946 #[tokio::test]
947 async fn test_rw_registration() {
948 let mut db = SimpleStore(HashMap::new());
949 let name = "misc:junk".parse::<device::Name>().unwrap();
950
951 if let Ok((f, mut set_chan, None)) = db
955 .register_read_write_device("test", &name, &None, &None)
956 .await
957 {
958 assert!(db.0.get(&name).unwrap().tx_setting.is_some());
962
963 {
966 let tx_set =
967 db.0.get(&name).unwrap().tx_setting.clone().unwrap();
968
969 assert_eq!(tx_set.is_closed(), false);
970
971 let (tx_os, _rx_os) = oneshot::channel();
972
973 assert!(tx_set
974 .send((device::Value::Int(2), tx_os))
975 .await
976 .is_ok());
977 assert_eq!(
978 set_chan.try_recv().unwrap().0,
979 device::Value::Int(2)
980 );
981 }
982
983 f(device::Value::Int(1)).await;
986
987 let mut rx =
990 db.0.get(&name)
991 .unwrap()
992 .reading
993 .lock()
994 .unwrap()
995 .0
996 .subscribe();
997
998 assert!(db
1003 .register_read_only_device("test2", &name, &None, &None)
1004 .await
1005 .is_err());
1006 assert_eq!(
1007 Err(TryRecvError::Empty),
1008 set_chan.try_recv().map(|_| ())
1009 );
1010
1011 if let Ok((f, _, Some(device::Value::Int(1)))) = db
1015 .register_read_write_device("test", &name, &None, &None)
1016 .await
1017 {
1018 assert_eq!(
1019 Err(TryRecvError::Disconnected),
1020 set_chan.try_recv().map(|_| ())
1021 );
1022
1023 f(device::Value::Int(2)).await;
1028 assert_eq!(rx.try_recv().unwrap().value, device::Value::Int(2));
1029 } else {
1030 panic!("error registering read-only device from same driver")
1031 }
1032 } else {
1033 panic!("error registering read-only device on empty database")
1034 }
1035 }
1036
1037 #[tokio::test]
1038 async fn test_closure() {
1039 let di = DeviceInfo::create(String::from("test"), None, None);
1040 let name = "misc:junk".parse::<device::Name>().unwrap();
1041 let f = mk_report_func(&di, &name);
1042
1043 assert_eq!(di.reading.lock().unwrap().1, None);
1044 f(device::Value::Int(1)).await;
1045 assert_eq!(
1046 di.reading.lock().unwrap().1.as_ref().unwrap().value,
1047 device::Value::Int(1)
1048 );
1049
1050 {
1051 let ts1 = di.reading.lock().unwrap().1.as_ref().unwrap().ts;
1052 let mut rx = di.reading.lock().unwrap().0.subscribe();
1053
1054 f(device::Value::Int(2)).await;
1055 assert_eq!(rx.try_recv().unwrap().value, device::Value::Int(2));
1056 assert_eq!(
1057 di.reading.lock().unwrap().1.as_ref().unwrap().value,
1058 device::Value::Int(2)
1059 );
1060 assert!(ts1 < di.reading.lock().unwrap().1.as_ref().unwrap().ts);
1061 }
1062
1063 f(device::Value::Int(3)).await;
1064 assert_eq!(
1065 di.reading.lock().unwrap().1.as_ref().unwrap().value,
1066 device::Value::Int(3)
1067 );
1068
1069 {
1070 let mut rx1 = di.reading.lock().unwrap().0.subscribe();
1071 let mut rx2 = di.reading.lock().unwrap().0.subscribe();
1072
1073 f(device::Value::Int(4)).await;
1074 assert_eq!(rx1.try_recv().unwrap().value, device::Value::Int(4));
1075 assert_eq!(rx2.try_recv().unwrap().value, device::Value::Int(4));
1076 assert_eq!(
1077 di.reading.lock().unwrap().1.as_ref().unwrap().value,
1078 device::Value::Int(4)
1079 );
1080 }
1081 }
1082}