1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
//! This module defines types and interfaces that drivers use to
//! interact with the core of DrMem.
use crate::types::{
device::{Base, Name, Path, Value},
Error,
};
use std::future::Future;
use std::{convert::Infallible, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use toml::value;
use super::Result;
/// Represents how configuration information is given to a driver.
///
/// Since each driver can have vastly different requirements, the
/// config structure needs to be as general as possible. A
/// `DriverConfig` is an alias for `toml::value::Table`: a map with
/// `String` keys and `toml::Value` values.
pub type DriverConfig = value::Table;
/// This type represents the data that is transferred in the
/// setting communication channel. It simplifies the definitions of
/// `TxDeviceSetting` and `RxDeviceSetting`.
///
/// The first element is the requested value. The second element is
/// the one-shot sender on which the outcome of the setting -- the
/// resulting `Value` or an error -- is returned to the requester.
pub type SettingRequest = (Value, oneshot::Sender<Result<Value>>);
/// Used by client APIs to send setting requests to a driver. This is
/// the sending half of an `mpsc` channel carrying `SettingRequest`
/// messages.
pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;
/// Used by a driver to receive settings from a client. This is the
/// receiving half of an `mpsc` channel carrying `SettingRequest`
/// messages.
pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;
/// A closure type that defines how a driver replies to a setting
/// request. The driver calls it, at most once, passing `Ok()` with
/// the value that was actually used or `Err()` to indicate the
/// setting failed. The closure itself returns nothing.
pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;
/// The driver is given a stream that yields setting requests. If the
/// driver uses a type that can be converted to and from a
/// `device::Value`, this stream will automatically reject settings
/// that aren't of the correct type and pass on converted values.
///
/// Each item is a pair: the setting, already converted to `T`, and
/// the `SettingReply` closure used to answer that request.
pub type SettingStream<T> =
Pin<Box<dyn Stream<Item = (T, SettingReply<T>)> + Send + Sync>>;
/// A function that drivers use to report updated values of a device.
///
/// Calling it with a value returns a boxed future with no output.
/// NOTE(review): presumably the reading is only delivered once that
/// future has been awaited -- confirm against the implementation
/// supplied by core.
pub type ReportReading<T> =
Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Defines the requests that can be sent to core. Drivers don't use
/// this type directly. They are indirectly used by `RequestChan`.
pub enum Request {
/// Registers a read-only device with core.
///
/// The reply is a pair where the first element is the function
/// used to report updated values of the device. The second
/// element, if not `None`, is the last saved value of the device.
AddReadonlyDevice {
/// Name of the driver making the request.
driver_name: String,
/// Full name of the device being registered.
dev_name: Name,
/// Optional units string for the device.
dev_units: Option<String>,
/// Optional hint for the maximum number of readings to save
/// for the device.
max_history: Option<usize>,
/// One-shot channel on which core sends its reply.
rpy_chan:
oneshot::Sender<Result<(ReportReading<Value>, Option<Value>)>>,
},
/// Registers a writable device with core.
///
/// The reply is a 3-tuple where the first element is the function
/// used to report updated values of the device. The second
/// element is the receiving end of the channel that yields
/// incoming settings to the device. The last element, if not
/// `None`, is the last saved value of the device.
AddReadWriteDevice {
/// Name of the driver making the request.
driver_name: String,
/// Full name of the device being registered.
dev_name: Name,
/// Optional units string for the device.
dev_units: Option<String>,
/// Optional hint for the maximum number of readings to save
/// for the device.
max_history: Option<usize>,
/// One-shot channel on which core sends its reply.
rpy_chan: oneshot::Sender<
Result<(ReportReading<Value>, RxDeviceSetting, Option<Value>)>,
>,
},
}
/// A handle which is used to communicate with the core of DrMem.
/// When a driver is created, it will be given a handle to be used
/// throughout its life.
///
/// This type wraps the `mpsc::Sender<>` and defines a set of helper
/// methods to send requests and receive replies with the core.
#[derive(Clone)]
pub struct RequestChan {
// Copied into the `driver_name` field of every request that is
// sent through this handle.
driver_name: String,
// Combined with a device's base name, using `Name::build()`, to
// form the full name of each device that gets registered.
prefix: Path,
// The channel on which `Request` messages are sent to core.
req_chan: mpsc::Sender<Request>,
}
impl RequestChan {
pub fn new(
driver_name: &str,
prefix: &Path,
req_chan: &mpsc::Sender<Request>,
) -> Self {
RequestChan {
driver_name: String::from(driver_name),
prefix: prefix.clone(),
req_chan: req_chan.clone(),
}
}
/// Registers a read-only device with the framework. `name` is the
/// last section of the full device name. Typically a driver will
/// register several devices, each representing a portion of the
/// hardware being controlled. All devices for a given driver
/// instance will have the same prefix; the `name` parameter is
/// appended to it.
///
/// If it returns `Ok()`, the value is a broadcast channel that
/// the driver uses to announce new values of the associated
/// hardware.
///
/// If it returns `Err()`, the underlying value could be `InUse`,
/// meaning the device name is already registered. If the error is
/// `InternalError`, then the core has exited and the
/// `RequestChan` has been closed. Since the driver can't report
/// any more updates, it may as well shutdown.
pub async fn add_ro_device<T: Into<Value> + TryFrom<Value>>(
&self,
name: Base,
units: Option<&str>,
max_history: Option<usize>,
) -> super::Result<(ReportReading<T>, Option<T>)> {
// Create a location for the reply.
let (tx, rx) = oneshot::channel();
// Send a request to Core to register the given name.
let result = self
.req_chan
.send(Request::AddReadonlyDevice {
driver_name: self.driver_name.clone(),
dev_name: Name::build(self.prefix.clone(), name),
dev_units: units.map(String::from),
max_history,
rpy_chan: tx,
})
.await;
// If the request was sent successfully and we successfully
// received a reply, process the payload.
if result.is_ok() {
if let Ok(v) = rx.await {
return v.map(|(rr, prev)| {
(
Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
prev.and_then(|v| T::try_from(v).ok()),
)
});
}
}
Err(Error::MissingPeer(String::from(
"can't communicate with core",
)))
}
// Creates a stream of incoming settings. Since settings are
// provided as `device::Value` types, we try to map them to the
// desired type. If the conversion can't be done, an error is
// automatically sent back to the client and the message isn't
// forwarded to the driver. Otherwise the converted value is
// yielded.
fn create_setting_stream<T: TryFrom<Value> + Into<Value>>(
rx: RxDeviceSetting,
) -> SettingStream<T> {
Box::pin(ReceiverStream::new(rx).filter_map(|(v, tx_rpy)| {
match T::try_from(v) {
Ok(v) => {
let f: SettingReply<T> = Box::new(|v: Result<T>| {
let _ = tx_rpy.send(v.map(T::into));
});
Some((v, f))
}
Err(_) => {
let _ = tx_rpy.send(Err(Error::TypeError));
None
}
}
}))
}
/// Registers a read-write device with the framework. `name` is the
/// last section of the full device name. Typically a driver will
/// register several devices, each representing a portion of the
/// hardware being controlled. All devices for a given driver
/// instance will have the same prefix; the `name` parameter is
/// appended to it.
///
/// If it returns `Ok()`, the value is a pair containing a
/// broadcast channel that the driver uses to announce new values
/// of the associated hardware and a receive channel for incoming
/// settings to be applied to the hardware.
///
/// If it returns `Err()`, the underlying value could be `InUse`,
/// meaning the device name is already registered. If the error is
/// `InternalError`, then the core has exited and the
/// `RequestChan` has been closed. Since the driver can't report
/// any more updates or accept new settings, it may as well shutdown.
pub async fn add_rw_device<T: Into<Value> + TryFrom<Value>>(
&self,
name: Base,
units: Option<&str>,
max_history: Option<usize>,
) -> Result<(ReportReading<T>, SettingStream<T>, Option<T>)> {
let (tx, rx) = oneshot::channel();
let result = self
.req_chan
.send(Request::AddReadWriteDevice {
driver_name: self.driver_name.clone(),
dev_name: Name::build(self.prefix.clone(), name),
dev_units: units.map(String::from),
max_history,
rpy_chan: tx,
})
.await;
if result.is_ok() {
if let Ok(v) = rx.await {
return v.map(|(rr, rs, prev)| {
(
Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
RequestChan::create_setting_stream(rs),
prev.and_then(|v| T::try_from(v).ok()),
)
});
}
}
Err(Error::MissingPeer(String::from(
"can't communicate with core",
)))
}
}
/// Defines a boxed type that supports the `driver::API` trait. It is
/// a trait object whose `DeviceSet` associated type is the same as
/// the one declared by the concrete driver type `T`.
pub type DriverType<T> = Box<dyn API<DeviceSet = <T as API>::DeviceSet>>;
/// All drivers implement the `driver::API` trait.
///
/// The `API` trait defines methods that are expected to be available
/// from a driver instance. By supporting this API, the framework can
/// create driver instances and monitor them as they run.
pub trait API: Send {
/// The set of devices belonging to a driver. It is produced by
/// `register_devices()` and passed, wrapped in an `Arc<Mutex<>>`,
/// to `run()`.
type DeviceSet: Send + Sync;
/// Registers the driver's devices with the framework.
///
/// `drc` is a communication channel with which the driver makes
/// requests to the core. Its typical use is to register devices
/// with the framework, which is done in this method.
///
/// `cfg` is the driver's configuration table.
/// NOTE(review): presumably the same parameters that are given to
/// `create_instance()` -- confirm against the caller.
///
/// `max_history` is specified in the configuration file. It is a
/// hint as to the maximum number of data points to save for each
/// of the devices created by this driver. A backend can choose to
/// interpret this in its own way. For instance, the simple
/// backend can only ever save one data point. Redis will take
/// this as a hint and will choose the most efficient way to prune
/// the history. That means, if more than the limit is present,
/// redis won't prune the history to less than the limit. However
/// there may be more than the limit -- it just won't grow without
/// bound.
///
/// The returned future resolves to the driver's `DeviceSet` or to
/// an error.
fn register_devices(
drc: RequestChan,
cfg: &DriverConfig,
max_history: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>>;
/// Creates an instance of the driver.
///
/// `cfg` contains the driver parameters, as specified in the
/// `drmem.toml` configuration file. It is a `toml::Table` type so
/// the keys for the parameter names are strings and the
/// associated data are `toml::Value` types. This method should
/// validate the parameters and convert them into forms useful to
/// the driver. By convention, if any errors are found in the
/// configuration, this method should return `Error::BadConfig`.
///
/// The returned future resolves to the boxed driver instance or
/// to an error.
fn create_instance(
cfg: &DriverConfig,
) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>>
where
Self: Sized;
/// Runs the instance of the driver.
///
/// `devices` is the shared set of devices that belongs to this
/// driver.
///
/// Since drivers provide access to hardware, this method should
/// never return unless something severe occurs and, in that case,
/// it should use `panic!()`. The `Infallible` output type leaves
/// no way to return an error value. All drivers are monitored by
/// a task and if a driver panics, it gets reported in the log and
/// then, after a short delay, the driver is restarted.
fn run<'a>(
&'a mut self,
devices: Arc<Mutex<Self::DeviceSet>>,
) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::{mpsc, oneshot};

    // Verifies that a `SettingStream` forwards settings of the
    // expected type, converts the driver's reply back for the
    // client, and rejects settings of the wrong type with a
    // `TypeError` without forwarding them.
    #[tokio::test]
    async fn test_setting_stream() {
        // Build communication channels, including wrapping the
        // receive handle in a `SettingStream`.
        let (tx, rx) = mpsc::channel(20);
        let mut s: SettingStream<bool> =
            RequestChan::create_setting_stream(rx);
        let (os_tx, os_rx) = oneshot::channel();

        // Assert we can send to an active channel.
        assert!(tx.send((true.into(), os_tx)).await.is_ok());

        // Assert there's an item in the stream and that it's been
        // converted to a `bool` type.
        let (v, f) = s.next().await.unwrap();

        assert!(v);

        // Send back the reply -- changing it to `false`. Verify the
        // received reply is also `false`.
        f(Ok(false));
        assert_eq!(os_rx.await.unwrap().unwrap(), false.into());

        // Now try to send the wrong type to the channel. The stream
        // should reject the bad settings and return an error. This
        // means calling `.next()` will block. To avoid our tests from
        // blocking forever, we drop the `mpsc::Sender` handle so the
        // stream reports end-of-stream. We can then check to see if
        // our reply was the expected type error.
        let (os_tx, os_rx) = oneshot::channel();

        assert!(tx.send(((1.0).into(), os_tx)).await.is_ok());
        drop(tx);
        assert!(s.next().await.is_none());
        assert!(matches!(os_rx.await.unwrap(), Err(Error::TypeError)));
    }
}