drmem_api/
driver.rs

1//! Defines types and interfaces that drivers use to interact with the
2//! core of DrMem.
3
4use crate::types::{device, Error};
5use std::future::Future;
6use std::{convert::Infallible, pin::Pin, sync::Arc};
7use tokio::sync::{mpsc, oneshot, Mutex};
8use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
9use toml::value;
10
11use super::Result;
12
/// Represents the type used to specify the name of a driver.
///
/// The name is stored as an `Arc<str>`, so clones share the same
/// text instead of copying it.

pub type Name = Arc<str>;

/// Represents how configuration information is given to a driver.
/// Since each driver can have vastly different requirements, the
/// config structure needs to be as general as possible. A
/// `DriverConfig` type is a map with `String` keys and `toml::Value`
/// values.
pub type DriverConfig = value::Table;

/// This type represents the data that is transferred in the
/// setting communication channel. It is a pair: the value being
/// requested and the sending half of a one-shot channel on which the
/// result of the setting is returned. It keeps the definitions of
/// `TxDeviceSetting` and `RxDeviceSetting` short.
pub type SettingRequest =
    (device::Value, oneshot::Sender<Result<device::Value>>);

/// Used by client APIs to send setting requests to a driver.
pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;

/// Used by a driver to receive settings from a client.
pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;

/// A closure type that defines how a driver replies to a setting
/// request. It can be passed `Ok()` to show what value was actually
/// used or `Err()` to indicate the setting failed. Being `FnOnce`, it
/// can be called at most once per request.
pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;

/// The driver is given a stream that yields setting requests. Each
/// item is a pair holding the requested value and the closure used
/// to reply to that request. If the driver uses a type that can be
/// converted to and from a `device::Value`, this stream will
/// automatically reject settings that aren't of the correct type and
/// pass on converted values.
pub type SettingStream<T> =
    Pin<Box<dyn Stream<Item = (T, SettingReply<T>)> + Send + Sync>>;

/// A function that drivers use to report updated values of a device.
/// Calling it returns a boxed future; the caller gets the future
/// back and is responsible for driving it (e.g. with `.await`).
pub type ReportReading<T> =
    Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
50
/// Defines the requests that can be sent to core. Drivers don't use
/// this type directly. They are indirectly used by `RequestChan`.
pub enum Request {
    /// Registers a read-only device with core.
    ///
    /// The reply is a pair where the first element is a function
    /// used to report updated values of the device. The second
    /// element, if not `None`, is the last saved value of the device.
    AddReadonlyDevice {
        /// Name of the driver making the request.
        driver_name: Name,
        /// Full name of the device being registered.
        dev_name: device::Name,
        /// Optional engineering units associated with the device.
        dev_units: Option<String>,
        /// Optional hint for how many readings should be retained.
        max_history: Option<usize>,
        /// One-shot channel on which core sends its reply.
        rpy_chan: oneshot::Sender<
            Result<(ReportReading<device::Value>, Option<device::Value>)>,
        >,
    },

    /// Registers a writable device with core.
    ///
    /// The reply is a 3-tuple where the first element is a function
    /// used to report updated values of the device. The second
    /// element is the receiving end of a channel that yields incoming
    /// settings to the device. The last element, if not `None`, is
    /// the last saved value of the device.
    AddReadWriteDevice {
        /// Name of the driver making the request.
        driver_name: Name,
        /// Full name of the device being registered.
        dev_name: device::Name,
        /// Optional engineering units associated with the device.
        dev_units: Option<String>,
        /// Optional hint for how many readings should be retained.
        max_history: Option<usize>,
        /// One-shot channel on which core sends its reply.
        rpy_chan: oneshot::Sender<
            Result<(
                ReportReading<device::Value>,
                RxDeviceSetting,
                Option<device::Value>,
            )>,
        >,
    },
}
89
/// A handle which is used to communicate with the core of DrMem.
/// When a driver is created, it will be given a handle to be used
/// throughout its life.
///
/// This type wraps the `mpsc::Sender<>` and defines a set of helper
/// methods to send requests and receive replies with the core.
#[derive(Clone)]
pub struct RequestChan {
    // Name of the driver; it is included in every request.
    driver_name: Name,
    // Path that is combined with a device's base name to form the
    // full device name.
    prefix: device::Path,
    // Channel on which requests are sent to core.
    req_chan: mpsc::Sender<Request>,
}
102
103impl RequestChan {
104    pub fn new(
105        driver_name: Name,
106        prefix: &device::Path,
107        req_chan: &mpsc::Sender<Request>,
108    ) -> Self {
109        RequestChan {
110            driver_name,
111            prefix: prefix.clone(),
112            req_chan: req_chan.clone(),
113        }
114    }
115
116    /// Registers a read-only device with the framework. `name` is the
117    /// last section of the full device name. Typically a driver will
118    /// register several devices, each representing a portion of the
119    /// hardware being controlled. All devices for a given driver
120    /// instance will have the same prefix; the `name` parameter is
121    /// appended to it.
122    ///
123    /// If it returns `Ok()`, the value is a broadcast channel that
124    /// the driver uses to announce new values of the associated
125    /// hardware.
126    ///
127    /// If it returns `Err()`, the underlying value could be `InUse`,
128    /// meaning the device name is already registered. If the error is
129    /// `InternalError`, then the core has exited and the
130    /// `RequestChan` has been closed. Since the driver can't report
131    /// any more updates, it may as well shutdown.
132    pub async fn add_ro_device<
133        T: Into<device::Value> + TryFrom<device::Value>,
134    >(
135        &self,
136        name: device::Base,
137        units: Option<&str>,
138        max_history: Option<usize>,
139    ) -> super::Result<(ReportReading<T>, Option<T>)> {
140        // Create a location for the reply.
141
142        let (tx, rx) = oneshot::channel();
143
144        // Send a request to Core to register the given name.
145
146        let result = self
147            .req_chan
148            .send(Request::AddReadonlyDevice {
149                driver_name: self.driver_name.clone(),
150                dev_name: device::Name::build(self.prefix.clone(), name),
151                dev_units: units.map(String::from),
152                max_history,
153                rpy_chan: tx,
154            })
155            .await;
156
157        // If the request was sent successfully and we successfully
158        // received a reply, process the payload.
159
160        if result.is_ok() {
161            if let Ok(v) = rx.await {
162                return v.map(|(rr, prev)| {
163                    (
164                        Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
165                        prev.and_then(|v| T::try_from(v).ok()),
166                    )
167                });
168            }
169        }
170
171        Err(Error::MissingPeer(String::from(
172            "can't communicate with core",
173        )))
174    }
175
176    // Creates a stream of incoming settings. Since settings are
177    // provided as `device::Value` types, we try to map them to the
178    // desired type. If the conversion can't be done, an error is
179    // automatically sent back to the client and the message isn't
180    // forwarded to the driver. Otherwise the converted value is
181    // yielded.
182
183    fn create_setting_stream<T>(rx: RxDeviceSetting) -> SettingStream<T>
184    where
185        T: TryFrom<device::Value> + Into<device::Value>,
186    {
187        Box::pin(ReceiverStream::new(rx).filter_map(|(v, tx_rpy)| {
188            match T::try_from(v) {
189                Ok(v) => {
190                    let f: SettingReply<T> = Box::new(|v: Result<T>| {
191                        let _ = tx_rpy.send(v.map(T::into));
192                    });
193
194                    Some((v, f))
195                }
196                Err(_) => {
197                    let _ = tx_rpy.send(Err(Error::TypeError));
198
199                    None
200                }
201            }
202        }))
203    }
204
205    /// Registers a read-write device with the framework. `name` is the
206    /// last section of the full device name. Typically a driver will
207    /// register several devices, each representing a portion of the
208    /// hardware being controlled. All devices for a given driver
209    /// instance will have the same prefix; the `name` parameter is
210    /// appended to it.
211    ///
212    /// If it returns `Ok()`, the value is a pair containing a
213    /// broadcast channel that the driver uses to announce new values
214    /// of the associated hardware and a receive channel for incoming
215    /// settings to be applied to the hardware.
216    ///
217    /// If it returns `Err()`, the underlying value could be `InUse`,
218    /// meaning the device name is already registered. If the error is
219    /// `InternalError`, then the core has exited and the
220    /// `RequestChan` has been closed. Since the driver can't report
221    /// any more updates or accept new settings, it may as well shutdown.
222    pub async fn add_rw_device<T>(
223        &self,
224        name: device::Base,
225        units: Option<&str>,
226        max_history: Option<usize>,
227    ) -> Result<(ReportReading<T>, SettingStream<T>, Option<T>)>
228    where
229        T: Into<device::Value> + TryFrom<device::Value>,
230    {
231        let (tx, rx) = oneshot::channel();
232        let result = self
233            .req_chan
234            .send(Request::AddReadWriteDevice {
235                driver_name: self.driver_name.clone(),
236                dev_name: device::Name::build(self.prefix.clone(), name),
237                dev_units: units.map(String::from),
238                max_history,
239                rpy_chan: tx,
240            })
241            .await;
242
243        if result.is_ok() {
244            if let Ok(v) = rx.await {
245                return v.map(|(rr, rs, prev)| {
246                    (
247                        Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
248                        RequestChan::create_setting_stream(rs),
249                        prev.and_then(|v| T::try_from(v).ok()),
250                    )
251                });
252            }
253        }
254
255        Err(Error::MissingPeer(String::from(
256            "can't communicate with core",
257        )))
258    }
259}
260
/// Defines a boxed type that supports the `driver::API` trait.
///
/// The boxed trait object uses the same `DeviceSet` associated type
/// as the driver type `T`.

pub type DriverType<T> = Box<dyn API<DeviceSet = <T as API>::DeviceSet>>;
264
/// All drivers implement the `driver::API` trait.
///
/// The `API` trait defines methods that are expected to be available
/// from a driver instance. By supporting this API, the framework can
/// create driver instances and monitor them as they run.

pub trait API: Send {
    /// The collection of device handles owned by the driver. It is
    /// produced by `register_devices()` and handed, wrapped in an
    /// `Arc<Mutex<>>`, to `run()`.
    type DeviceSet: Send + Sync;

    /// Registers the devices used by the driver.
    ///
    /// `drc` is a communication channel with which the driver makes
    /// requests to the core. Its typical use is to register devices
    /// with the framework, which is usually done in this method. As
    /// other request types are added, they can be used while the
    /// driver is running.
    ///
    /// `cfg` contains the driver parameters, as specified in the
    /// `drmem.toml` configuration file.
    ///
    /// `max_history` is specified in the configuration file. It is a
    /// hint as to the maximum number of data points to save for each
    /// of the devices created by this driver. A backend can choose to
    /// interpret this in its own way. For instance, the simple
    /// backend can only ever save one data point. Redis will take
    /// this as a hint and will choose the most efficient way to prune
    /// the history. That means, if more than the limit is present,
    /// redis won't prune the history to less than the limit. However
    /// there may be more than the limit -- it just won't grow without
    /// bound.
    ///
    /// The returned future resolves to the driver's `DeviceSet` or to
    /// an error if registration failed.
    fn register_devices(
        drc: RequestChan,
        cfg: &DriverConfig,
        max_history: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>>;

    /// Creates an instance of the driver.
    ///
    /// `cfg` contains the driver parameters, as specified in the
    /// `drmem.toml` configuration file. It is a `toml::Table` type so
    /// the keys for the parameter names are strings and the
    /// associated data are `toml::Value` types. This method should
    /// validate the parameters and convert them into forms useful to
    /// the driver. By convention, if any errors are found in the
    /// configuration, this method should return `Error::BadConfig`.
    ///
    /// The returned future resolves to the boxed driver instance or
    /// to an error.

    fn create_instance(
        cfg: &DriverConfig,
    ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>>
    where
        Self: Sized;

    /// Runs the instance of the driver.
    ///
    /// `devices` is the set of devices belonging to the driver,
    /// shared behind a mutex.
    ///
    /// Since drivers provide access to hardware, this method should
    /// never return. The output type of the returned future is
    /// `Infallible`, so it cannot complete normally; if something
    /// severe occurs, the driver should use `panic!()`. All drivers
    /// are monitored by a task and if a driver panics, it gets
    /// reported in the log and then, after a short delay, the driver
    /// is restarted.

    fn run<'a>(
        &'a mut self,
        devices: Arc<Mutex<Self::DeviceSet>>,
    ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>>;
}
327
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::{mpsc, oneshot};

    // Exercises `RequestChan::create_setting_stream()`: a setting of
    // the expected type is converted and forwarded along with a
    // working reply closure, while a setting of the wrong type is
    // rejected with `Error::TypeError` and never reaches the driver.

    #[tokio::test]
    async fn test_setting_stream() {
        // Build communication channels, including wrapping the
        // receive handle in a `SettingStream`.

        let (tx, rx) = mpsc::channel(20);
        let mut s: SettingStream<bool> = RequestChan::create_setting_stream(rx);
        let (os_tx, os_rx) = oneshot::channel();

        // Assert we can send to an active channel.

        assert!(tx.send((true.into(), os_tx)).await.is_ok());

        // Assert there's an item in the stream and that it's been
        // converted to a `bool` type.

        let (v, f) = s.next().await.unwrap();

        assert!(v);

        // Send back the reply -- changing it to `false`. Verify the
        // received reply is also `false`.

        f(Ok(false));

        assert_eq!(os_rx.await.unwrap().unwrap(), false.into());

        // Now try to send the wrong type to the channel. The stream
        // should reject the bad settings and return an error. This
        // means calling `.next()` will block. To avoid our tests from
        // blocking forever, we drop the `mpsc::Sender` handle so the
        // stream reports end-of-stream. We can then check to see if
        // our reply was the expected type error.

        let (os_tx, os_rx) = oneshot::channel();

        assert!(tx.send(((1.0).into(), os_tx)).await.is_ok());

        drop(tx);

        assert!(s.next().await.is_none());
        assert!(matches!(os_rx.await.unwrap(), Err(Error::TypeError)));
    }
}