drmem_api/driver/
rw_device.rs

1use crate::{device, driver::ReportReading, Error, Result};
2use std::{future::Future, pin::Pin};
3use tokio::sync::{mpsc, oneshot};
4use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
5
6/// This type represents the data that is transferred in the
7/// communication channel. It simplifies the next two types.
8pub type SettingRequest =
9    (device::Value, oneshot::Sender<Result<device::Value>>);
10
11/// Used by client APIs to send setting requests to a driver.
12pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;
13
14/// Used by a driver to receive settings from a client.
15pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;
16
17/// A closure type that defines how a driver replies to a setting
18/// request. It can return `Ok()` to show what value was actually used
19/// or `Err()` to indicate the setting failed.
20pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;
21
22pub type SettingTransaction<T> = (T, SettingReply<T>);
23
24// The driver is given a stream that yields setting requests. If the
25// driver uses a type that can be converted to and from a
26// `device::Value`, this stream will automatically reject settings
27// that aren't of the correct type and pass on converted values.
28type SettingStream<T> =
29    Pin<Box<dyn Stream<Item = SettingTransaction<T>> + Send + Sync>>;
30
31// Creates a stream of incoming settings. Since settings are provided
32// as `device::Value` types, we try to map them to the desired
33// type. If the conversion can't be done, an error is automatically
34// sent back to the client and the message isn't forwarded to the
35// driver. Otherwise the converted value is yielded.
36
37fn create_setting_stream<T>(rx: RxDeviceSetting) -> SettingStream<T>
38where
39    T: TryFrom<device::Value> + Into<device::Value> + Clone,
40{
41    Box::pin(ReceiverStream::new(rx).filter_map(
42        |(v, tx_rpy)| match T::try_from(v) {
43            Ok(v) => {
44                let f: SettingReply<T> = Box::new(|v: Result<T>| {
45                    let _ = tx_rpy.send(v.map(T::into));
46                });
47
48                Some((v, f))
49            }
50            Err(_) => {
51                let _ = tx_rpy.send(Err(Error::TypeError));
52
53                None
54            }
55        },
56    ))
57}
58
59pub struct ReadWriteDevice<
60    T: TryFrom<device::Value> + Into<device::Value> + Clone,
61> {
62    report_chan: ReportReading,
63    set_stream: SettingStream<T>,
64    prev_val: Option<T>,
65}
66
67impl<T> ReadWriteDevice<T>
68where
69    T: TryFrom<device::Value> + Into<device::Value> + Clone,
70{
71    pub fn new(
72        report_chan: ReportReading,
73        setting_chan: RxDeviceSetting,
74        prev_val: Option<T>,
75    ) -> Self {
76        ReadWriteDevice {
77            report_chan,
78            set_stream: create_setting_stream(setting_chan),
79            prev_val,
80        }
81    }
82
83    /// Saves a new value, returned by the device, to the backend
84    /// storage.
85    pub fn report_update(
86        &mut self,
87        value: T,
88    ) -> impl Future<Output = ()> + use<'_, T> {
89        self.prev_val = Some(value.clone());
90        (self.report_chan)(value.into())
91    }
92
93    /// Gets the last value of the device. If DrMem is built with
94    /// persistent storage, this value will be initialized with the
95    /// last value saved to storage.
96    pub fn get_last(&self) -> Option<&T> {
97        self.prev_val.as_ref()
98    }
99
100    pub fn next_setting(
101        &mut self,
102    ) -> impl Future<Output = Option<SettingTransaction<T>>> + use<'_, T> {
103        self.set_stream.next()
104    }
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110    use tokio::sync::{mpsc, oneshot};
111
112    #[tokio::test]
113    async fn test_setting_stream() {
114        // Build communication channels, including wrapping the
115        // receive handle in a `SettingStream`.
116
117        let (tx, rx) = mpsc::channel(20);
118        let mut s: SettingStream<bool> = create_setting_stream(rx);
119        let (os_tx, os_rx) = oneshot::channel();
120
121        // Assert we can send to an active channel.
122
123        assert_eq!(tx.send((true.into(), os_tx)).await.unwrap(), ());
124
125        // Assert there's an item in the stream and that it's been
126        // converted to a `bool` type.
127
128        let (v, f) = s.next().await.unwrap();
129
130        assert_eq!(v, true);
131
132        // Send back the reply -- changing it to `false`. Verify the
133        // received reply is also `false`.
134
135        f(Ok(false));
136
137        assert_eq!(os_rx.await.unwrap().unwrap(), false.into());
138
139        // Now try to send the wrong type to the channel. The stream
140        // should reject the bad settings and return an error. This
141        // means calling `.next()` will block. To avoid our tests from
142        // blocking forever, we drop the `mpsc::Send` handle so the
143        // stream reports end-of-stream. We can then check to see if
144        // our reply was an error.
145
146        let (os_tx, os_rx) = oneshot::channel();
147
148        assert_eq!(tx.send(((1.0).into(), os_tx)).await.unwrap(), ());
149
150        std::mem::drop(tx);
151
152        assert!(s.next().await.is_none());
153        assert!(os_rx.await.unwrap().is_err());
154    }
155}