1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
//! This module defines types and interfaces that drivers use to
//! interact with the core of DrMem.

use crate::types::{
    device::{Base, Name, Path, Value},
    Error,
};
use std::future::Future;
use std::{convert::Infallible, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use toml::value;

use super::Result;

/// Represents how configuration information is given to a driver.
///
/// Since each driver can have vastly different requirements, the
/// config structure needs to be as general as possible. A
/// `DriverConfig` is an alias for `toml::value::Table`, i.e. a map
/// with `String` keys and `toml::Value` values.
pub type DriverConfig = value::Table;

/// This type represents the data that is transferred in a device's
/// setting channel.
///
/// The first element is the requested `Value`. The second element
/// is a one-shot channel on which the receiver of the request sends
/// back its reply, a `Result<Value>`. Naming the pair keeps the
/// sender and receiver aliases that follow short.
pub type SettingRequest = (Value, oneshot::Sender<Result<Value>>);

/// Used by client APIs to send setting requests to a driver. It is
/// the sending half of an `mpsc` channel carrying `SettingRequest`
/// values.
pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;

/// Used by a driver to receive settings from a client. It is the
/// receiving half of an `mpsc` channel carrying `SettingRequest`
/// values.
pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;

/// A closure type that defines how a driver replies to a setting
/// request. The driver calls it with `Ok()` to show what value was
/// actually used or with `Err()` to indicate the setting failed.
/// Since it is an `FnOnce`, it can be called at most once.
pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;

/// The driver is given a stream that yields setting requests. If the
/// driver uses a type that can be converted to and from a
/// `device::Value`, this stream will automatically reject settings
/// that aren't of the correct type and pass on converted values.
///
/// Each item is a pair: the setting, already converted to `T`, and
/// the `SettingReply` closure used to answer that request.
pub type SettingStream<T> =
    Pin<Box<dyn Stream<Item = (T, SettingReply<T>)> + Send + Sync>>;

/// A function that drivers use to report updated values of a device.
///
/// Calling it with a value returns a boxed future whose output is
/// `()`. Futures do nothing until polled, so the caller is expected
/// to `.await` the result.
pub type ReportReading<T> =
    Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

/// Defines the requests that can be sent to core. Drivers don't use
/// this type directly. They are indirectly used by `RequestChan`.
pub enum Request {
    /// Registers a read-only device with core.
    ///
    /// The reply is a pair where the first element is a function
    /// used to report updated values of the device. The second
    /// element, if not `None`, is the last saved value of the device.
    AddReadonlyDevice {
        /// Name of the driver making the request.
        driver_name: String,
        /// Full name of the device being registered.
        dev_name: Name,
        /// Optional units string for the device.
        dev_units: Option<String>,
        /// Optional limit on the number of saved data points.
        max_history: Option<usize>,
        /// One-shot channel on which the reply is delivered.
        rpy_chan:
            oneshot::Sender<Result<(ReportReading<Value>, Option<Value>)>>,
    },

    /// Registers a writable device with core.
    ///
    /// The reply is a 3-tuple where the first element is a function
    /// used to report updated values of the device. The second
    /// element is the receiving end of the channel that yields
    /// incoming settings to the device. The last element, if not
    /// `None`, is the last saved value of the device.
    AddReadWriteDevice {
        /// Name of the driver making the request.
        driver_name: String,
        /// Full name of the device being registered.
        dev_name: Name,
        /// Optional units string for the device.
        dev_units: Option<String>,
        /// Optional limit on the number of saved data points.
        max_history: Option<usize>,
        /// One-shot channel on which the reply is delivered.
        rpy_chan: oneshot::Sender<
            Result<(ReportReading<Value>, RxDeviceSetting, Option<Value>)>,
        >,
    },
}

/// A handle which is used to communicate with the core of DrMem.
/// When a driver is created, it will be given a handle to be used
/// throughout its life.
///
/// This type wraps the `mpsc::Sender<>` and defines a set of helper
/// methods to send requests and receive replies with the core.
#[derive(Clone)]
pub struct RequestChan {
    /// Name of the driver that owns this handle. A copy is included
    /// in every device registration request.
    driver_name: String,
    /// Path that is combined with a device's base name to form the
    /// full device name when registering.
    prefix: Path,
    /// Sending end of the channel on which requests travel to core.
    req_chan: mpsc::Sender<Request>,
}

impl RequestChan {
    pub fn new(
        driver_name: &str,
        prefix: &Path,
        req_chan: &mpsc::Sender<Request>,
    ) -> Self {
        RequestChan {
            driver_name: String::from(driver_name),
            prefix: prefix.clone(),
            req_chan: req_chan.clone(),
        }
    }

    /// Registers a read-only device with the framework. `name` is the
    /// last section of the full device name. Typically a driver will
    /// register several devices, each representing a portion of the
    /// hardware being controlled. All devices for a given driver
    /// instance will have the same prefix; the `name` parameter is
    /// appended to it.
    ///
    /// If it returns `Ok()`, the value is a broadcast channel that
    /// the driver uses to announce new values of the associated
    /// hardware.
    ///
    /// If it returns `Err()`, the underlying value could be `InUse`,
    /// meaning the device name is already registered. If the error is
    /// `InternalError`, then the core has exited and the
    /// `RequestChan` has been closed. Since the driver can't report
    /// any more updates, it may as well shutdown.
    pub async fn add_ro_device<T: Into<Value> + TryFrom<Value>>(
        &self,
        name: Base,
        units: Option<&str>,
        max_history: Option<usize>,
    ) -> super::Result<(ReportReading<T>, Option<T>)> {
        // Create a location for the reply.

        let (tx, rx) = oneshot::channel();

        // Send a request to Core to register the given name.

        let result = self
            .req_chan
            .send(Request::AddReadonlyDevice {
                driver_name: self.driver_name.clone(),
                dev_name: Name::build(self.prefix.clone(), name),
                dev_units: units.map(String::from),
                max_history,
                rpy_chan: tx,
            })
            .await;

        // If the request was sent successfully and we successfully
        // received a reply, process the payload.

        if result.is_ok() {
            if let Ok(v) = rx.await {
                return v.map(|(rr, prev)| {
                    (
                        Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
                        prev.and_then(|v| T::try_from(v).ok()),
                    )
                });
            }
        }

        Err(Error::MissingPeer(String::from(
            "can't communicate with core",
        )))
    }

    // Creates a stream of incoming settings. Since settings are
    // provided as `device::Value` types, we try to map them to the
    // desired type. If the conversion can't be done, an error is
    // automatically sent back to the client and the message isn't
    // forwarded to the driver. Otherwise the converted value is
    // yielded.

    fn create_setting_stream<T: TryFrom<Value> + Into<Value>>(
        rx: RxDeviceSetting,
    ) -> SettingStream<T> {
        Box::pin(ReceiverStream::new(rx).filter_map(|(v, tx_rpy)| {
            match T::try_from(v) {
                Ok(v) => {
                    let f: SettingReply<T> = Box::new(|v: Result<T>| {
                        let _ = tx_rpy.send(v.map(T::into));
                    });

                    Some((v, f))
                }
                Err(_) => {
                    let _ = tx_rpy.send(Err(Error::TypeError));

                    None
                }
            }
        }))
    }

    /// Registers a read-write device with the framework. `name` is the
    /// last section of the full device name. Typically a driver will
    /// register several devices, each representing a portion of the
    /// hardware being controlled. All devices for a given driver
    /// instance will have the same prefix; the `name` parameter is
    /// appended to it.
    ///
    /// If it returns `Ok()`, the value is a pair containing a
    /// broadcast channel that the driver uses to announce new values
    /// of the associated hardware and a receive channel for incoming
    /// settings to be applied to the hardware.
    ///
    /// If it returns `Err()`, the underlying value could be `InUse`,
    /// meaning the device name is already registered. If the error is
    /// `InternalError`, then the core has exited and the
    /// `RequestChan` has been closed. Since the driver can't report
    /// any more updates or accept new settings, it may as well shutdown.
    pub async fn add_rw_device<T: Into<Value> + TryFrom<Value>>(
        &self,
        name: Base,
        units: Option<&str>,
        max_history: Option<usize>,
    ) -> Result<(ReportReading<T>, SettingStream<T>, Option<T>)> {
        let (tx, rx) = oneshot::channel();
        let result = self
            .req_chan
            .send(Request::AddReadWriteDevice {
                driver_name: self.driver_name.clone(),
                dev_name: Name::build(self.prefix.clone(), name),
                dev_units: units.map(String::from),
                max_history,
                rpy_chan: tx,
            })
            .await;

        if result.is_ok() {
            if let Ok(v) = rx.await {
                return v.map(|(rr, rs, prev)| {
                    (
                        Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
                        RequestChan::create_setting_stream(rs),
                        prev.and_then(|v| T::try_from(v).ok()),
                    )
                });
            }
        }

        Err(Error::MissingPeer(String::from(
            "can't communicate with core",
        )))
    }
}

/// Defines a boxed type that supports the `driver::API` trait.
///
/// For a driver type `T`, this is a boxed trait object for any
/// `API` implementation whose `DeviceSet` is the same as `T`'s.

pub type DriverType<T> = Box<dyn API<DeviceSet = <T as API>::DeviceSet>>;

/// All drivers implement the `driver::API` trait.
///
/// The `API` trait defines methods that are expected to be available
/// from a driver instance. By supporting this API, the framework can
/// create driver instances and monitor them as they run.

pub trait API: Send {
    /// The driver-defined type that holds the driver's devices. It
    /// is produced by `register_devices()` and a shared, locked
    /// handle to a value of this type is given to `run()`.
    type DeviceSet: Send + Sync;

    /// Produces the set of devices used by the driver.
    ///
    /// `drc` is a communication channel with which the driver makes
    /// requests to the core. Its typical use is to register devices
    /// with the framework, which is expected to be done in this
    /// method. As other request types are added, they can be used
    /// while the driver is running.
    ///
    /// `cfg` contains the driver parameters, as specified in the
    /// `drmem.toml` configuration file.
    ///
    /// `max_history` is specified in the configuration file. It is a
    /// hint as to the maximum number of data point to save for each
    /// of the devices created by this driver. A backend can choose to
    /// interpret this in its own way. For instance, the simple
    /// backend can only ever save one data point. Redis will take
    /// this as a hint and will choose the most efficient way to prune
    /// the history. That means, if more than the limit is present,
    /// redis won't prune the history to less than the limit. However
    /// there may be more than the limit -- it just won't grow without
    /// bound.
    ///
    /// The returned future resolves to the driver's `DeviceSet` or
    /// to an error.
    fn register_devices(
        drc: RequestChan,
        cfg: &DriverConfig,
        max_history: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>>;

    /// Creates an instance of the driver.
    ///
    /// `cfg` contains the driver parameters, as specified in the
    /// `drmem.toml` configuration file. It is a `toml::Table` type so
    /// the keys for the parameter names are strings and the
    /// associated data are `toml::Value` types. This method should
    /// validate the parameters and convert them into forms useful to
    /// the driver. By convention, if any errors are found in the
    /// configuration, this method should return `Error::BadConfig`.
    ///
    /// The returned future resolves to the boxed driver instance or
    /// to an error.

    fn create_instance(
        cfg: &DriverConfig,
    ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>>
    where
        Self: Sized;

    /// Runs the instance of the driver.
    ///
    /// `devices` is a shared, mutex-protected handle to the driver's
    /// `DeviceSet` (presumably the one produced by
    /// `register_devices()` -- confirm against the framework).
    ///
    /// Since drivers provide access to hardware, this method should
    /// never return unless something severe occurs and, in that case,
    /// it should use `panic!()`. The output type of the returned
    /// future is `Infallible`, so the future cannot complete by
    /// producing a value. All drivers are monitored by a task and if
    /// a driver panics, it gets reported in the log and then, after a
    /// short delay, the driver is restarted.

    fn run<'a>(
        &'a mut self,
        devices: Arc<Mutex<Self::DeviceSet>>,
    ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::{mpsc, oneshot};

    // Verifies that a `SettingStream` converts well-typed settings,
    // relays the driver's reply back to the client and rejects
    // settings of the wrong type without handing them to the driver.

    #[tokio::test]
    async fn test_setting_stream() {
        // Build communication channels, including wrapping the
        // receive handle in a `SettingStream`.

        let (tx, rx) = mpsc::channel(20);
        let mut s: SettingStream<bool> = RequestChan::create_setting_stream(rx);
        let (os_tx, os_rx) = oneshot::channel();

        // We must be able to send to an active channel.

        tx.send((true.into(), os_tx))
            .await
            .expect("setting channel should be open");

        // Assert there's an item in the stream and that it's been
        // converted to a `bool` type.

        let (v, f) = s.next().await.unwrap();

        assert!(v);

        // Send back the reply -- changing it to `false`. Verify the
        // received reply is also `false`.

        f(Ok(false));

        assert_eq!(os_rx.await.unwrap().unwrap(), false.into());

        // Now try to send the wrong type to the channel. The stream
        // should reject the bad setting and reply with an error, so
        // nothing is yielded and calling `.next()` would block. To
        // keep the test from blocking forever, we drop the
        // `mpsc::Sender` handle so the stream reports end-of-stream.
        // We can then check to see if our reply was an error.

        let (os_tx, os_rx) = oneshot::channel();

        tx.send(((1.0).into(), os_tx))
            .await
            .expect("setting channel should be open");

        drop(tx);

        assert!(s.next().await.is_none());
        assert!(os_rx.await.unwrap().is_err());
    }
}