drmem_api/driver/
rw_device.rs1use crate::{device, driver::ReportReading, Error, Result};
2use std::{future::Future, pin::Pin};
3use tokio::sync::{mpsc, oneshot};
4use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
5
6pub type SettingRequest =
9 (device::Value, oneshot::Sender<Result<device::Value>>);
10
11pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;
13
14pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;
16
17pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;
21
22pub type SettingTransaction<T> = (T, SettingReply<T>);
23
24type SettingStream<T> =
29 Pin<Box<dyn Stream<Item = SettingTransaction<T>> + Send + Sync>>;
30
31fn create_setting_stream<T>(rx: RxDeviceSetting) -> SettingStream<T>
38where
39 T: TryFrom<device::Value> + Into<device::Value> + Clone,
40{
41 Box::pin(ReceiverStream::new(rx).filter_map(
42 |(v, tx_rpy)| match T::try_from(v) {
43 Ok(v) => {
44 let f: SettingReply<T> = Box::new(|v: Result<T>| {
45 let _ = tx_rpy.send(v.map(T::into));
46 });
47
48 Some((v, f))
49 }
50 Err(_) => {
51 let _ = tx_rpy.send(Err(Error::TypeError));
52
53 None
54 }
55 },
56 ))
57}
58
59pub struct ReadWriteDevice<
60 T: TryFrom<device::Value> + Into<device::Value> + Clone,
61> {
62 report_chan: ReportReading,
63 set_stream: SettingStream<T>,
64 prev_val: Option<T>,
65}
66
67impl<T> ReadWriteDevice<T>
68where
69 T: TryFrom<device::Value> + Into<device::Value> + Clone,
70{
71 pub fn new(
72 report_chan: ReportReading,
73 setting_chan: RxDeviceSetting,
74 prev_val: Option<T>,
75 ) -> Self {
76 ReadWriteDevice {
77 report_chan,
78 set_stream: create_setting_stream(setting_chan),
79 prev_val,
80 }
81 }
82
83 pub fn report_update(
86 &mut self,
87 value: T,
88 ) -> impl Future<Output = ()> + use<'_, T> {
89 self.prev_val = Some(value.clone());
90 (self.report_chan)(value.into())
91 }
92
93 pub fn get_last(&self) -> Option<&T> {
97 self.prev_val.as_ref()
98 }
99
100 pub fn next_setting(
101 &mut self,
102 ) -> impl Future<Output = Option<SettingTransaction<T>>> + use<'_, T> {
103 self.set_stream.next()
104 }
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::{mpsc, oneshot};

    /// Exercises both paths of `create_setting_stream`: a correctly
    /// typed setting is delivered with a working reply callback, and
    /// an incorrectly typed setting is answered with an error and
    /// never appears on the stream.
    #[tokio::test]
    async fn test_setting_stream() {
        let (tx, rx) = mpsc::channel(20);
        let mut s: SettingStream<bool> = create_setting_stream(rx);
        let (os_tx, os_rx) = oneshot::channel();

        // A successful `send` yields `()`, so unwrapping is the whole
        // check; there is nothing to compare it against.
        tx.send((true.into(), os_tx)).await.unwrap();

        // The boolean setting passes the type conversion and is
        // forwarded along with its reply callback.
        let (v, f) = s.next().await.unwrap();

        assert!(v);

        // Replying through the callback delivers the value, converted
        // back to a `device::Value`, on the one-shot channel.
        f(Ok(false));

        assert_eq!(os_rx.await.unwrap().unwrap(), false.into());

        // A floating point setting cannot be converted to `bool`.
        let (os_tx, os_rx) = oneshot::channel();

        tx.send(((1.0).into(), os_tx)).await.unwrap();

        // Close the channel so the stream terminates once the queued
        // request has been consumed.
        drop(tx);

        // The mistyped request is filtered out, so the stream simply
        // ends, while the requester receives an error.
        assert!(s.next().await.is_none());
        assert!(os_rx.await.unwrap().is_err());
    }
}
155}