1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
//! This module defines the types and interfaces that drivers use to
//! interact with the core of DrMem.
use crate::types::{
device::{Base, Name, Path, Value},
Error,
};
use std::future::Future;
use std::{convert::Infallible, pin::Pin};
use tokio::sync::{mpsc, oneshot};
use toml::value;
use super::Result;
/// Configuration parameters handed to a driver. This is an alias for
/// the `toml` crate's `value::Table`.
pub type DriverConfig = value::Table;
/// Sending half of a `tokio` mpsc channel that carries device
/// settings. Each message pairs the `Value` to apply with a oneshot
/// sender on which the receiving side answers with a `Result<Value>`.
pub type TxDeviceSetting =
mpsc::Sender<(Value, oneshot::Sender<Result<Value>>)>;
/// Receiving half of the settings channel. It yields the same
/// `(Value, reply sender)` pairs that a `TxDeviceSetting` sends. Core
/// hands one of these to a driver in its reply to
/// `Request::AddReadWriteDevice`.
pub type RxDeviceSetting =
mpsc::Receiver<(Value, oneshot::Sender<Result<Value>>)>;
/// Boxed closure (`Send + Sync + 'static`) that takes a `Value` and
/// returns a pinned, boxed, `Send` future resolving to `()`. Core
/// includes one of these in its reply to both device registration
/// requests; the driver calls it to report a new reading.
pub type ReportReading = Box<
dyn Fn(Value) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>;
/// Defines the requests that can be sent to core.
///
/// Every variant carries a oneshot sender (`rpy_chan`) on which core
/// delivers its reply to the requester.
pub enum Request {
/// Registers a read-only device with core.
///
/// The reply is a pair where the first element is the
/// `ReportReading` closure used to report values read from the
/// hardware. The second element is an optional `Value`.
/// NOTE(review): presumably the device's previously saved value
/// -- confirm against core.
AddReadonlyDevice {
// Name of the driver making the request.
driver_name: String,
// Full name of the device being registered.
dev_name: Name,
// Optional units string for the device.
dev_units: Option<String>,
// Hint as to the maximum number of data points to keep.
max_history: Option<usize>,
// Where core sends the outcome of the registration.
rpy_chan: oneshot::Sender<Result<(ReportReading, Option<Value>)>>,
},
/// Registers a writable device with core.
///
/// The reply is a triple where the first element is the
/// `ReportReading` closure used to report values read from the
/// hardware. The second element is a read-handle to accept
/// incoming settings to the device. The third element is an
/// optional `Value`.
/// NOTE(review): presumably the device's previously saved value
/// -- confirm against core.
AddReadWriteDevice {
// Name of the driver making the request.
driver_name: String,
// Full name of the device being registered.
dev_name: Name,
// Optional units string for the device.
dev_units: Option<String>,
// Hint as to the maximum number of data points to keep.
max_history: Option<usize>,
// Where core sends the outcome of the registration.
rpy_chan: oneshot::Sender<
Result<(ReportReading, RxDeviceSetting, Option<Value>)>,
>,
},
}
/// A handle which is used to communicate with the core of DrMem.
/// When a driver is created, it will be given a handle to be used
/// throughout its life.
///
/// This type wraps the `mpsc::Sender<>` and defines a set of helper
/// methods to send requests and receive replies with the core.
///
/// The handle derives `Clone`, so a driver can make copies of it.
#[derive(Clone)]
pub struct RequestChan {
// Copied into the `driver_name` field of every request sent
// through this handle.
driver_name: String,
// Combined with a device's base name (via `Name::build`) to form
// the full device name placed in a registration request.
prefix: Path,
// Channel on which `Request` values are sent to core.
req_chan: mpsc::Sender<Request>,
}
impl RequestChan {
pub fn new(
driver_name: &str, prefix: &Path, req_chan: &mpsc::Sender<Request>,
) -> Self {
RequestChan {
driver_name: String::from(driver_name),
prefix: prefix.clone(),
req_chan: req_chan.clone(),
}
}
/// Registers a read-only device with the framework. `name` is the
/// last section of the full device name. Typically a driver will
/// register several devices, each representing a portion of the
/// hardware being controlled. All devices for a given driver
/// instance will have the same prefix; the `name` parameter is
/// appended to it.
///
/// If it returns `Ok()`, the value is a broadcast channel that
/// the driver uses to announce new values of the associated
/// hardware.
///
/// If it returns `Err()`, the underlying value could be `InUse`,
/// meaning the device name is already registered. If the error is
/// `InternalError`, then the core has exited and the
/// `RequestChan` has been closed. Since the driver can't report
/// any more updates, it may as well shutdown.
pub async fn add_ro_device(
&self, name: Base, units: Option<&str>, max_history: Option<usize>,
) -> super::Result<(ReportReading, Option<Value>)> {
// Create a location for the reply.
let (tx, rx) = oneshot::channel();
// Send a request to Core to register the given name.
let result = self
.req_chan
.send(Request::AddReadonlyDevice {
driver_name: self.driver_name.clone(),
dev_name: Name::build(self.prefix.clone(), name),
dev_units: units.map(String::from),
max_history,
rpy_chan: tx,
})
.await;
// If the request was sent successfully and we successfully
// received a reply, process the payload.
if result.is_ok() {
if let Ok(v) = rx.await {
return v;
} else {
return Err(Error::MissingPeer(String::from(
"core didn't reply to request",
)));
}
}
// If either communication direction failed, return an error
// indicating we can't talk to core.
Err(Error::MissingPeer(String::from(
"core didn't accept request",
)))
}
/// Registers a read-write device with the framework. `name` is the
/// last section of the full device name. Typically a driver will
/// register several devices, each representing a portion of the
/// hardware being controlled. All devices for a given driver
/// instance will have the same prefix; the `name` parameter is
/// appended to it.
///
/// If it returns `Ok()`, the value is a pair containing a
/// broadcast channel that the driver uses to announce new values
/// of the associated hardware and a receive channel for incoming
/// settings to be applied to the hardware.
///
/// If it returns `Err()`, the underlying value could be `InUse`,
/// meaning the device name is already registered. If the error is
/// `InternalError`, then the core has exited and the
/// `RequestChan` has been closed. Since the driver can't report
/// any more updates or accept new settings, it may as well shutdown.
pub async fn add_rw_device(
&self, name: Base, units: Option<&str>, max_history: Option<usize>,
) -> Result<(ReportReading, RxDeviceSetting, Option<Value>)> {
let (tx, rx) = oneshot::channel();
let result = self
.req_chan
.send(Request::AddReadWriteDevice {
driver_name: self.driver_name.clone(),
dev_name: Name::build(self.prefix.clone(), name),
dev_units: units.map(String::from),
max_history,
rpy_chan: tx,
})
.await;
if result.is_ok() {
if let Ok(v) = rx.await {
return v;
} else {
return Err(Error::MissingPeer(String::from(
"core didn't reply to request",
)));
}
}
Err(Error::MissingPeer(String::from(
"core didn't accept request",
)))
}
}
/// A driver instance held as a boxed trait object. This is the type
/// that `API::create_instance()` resolves to on success.
pub type DriverType = Box<dyn API>;
/// All drivers implement the `driver::API` trait.
///
/// The `API` trait defines methods that are expected to be available
/// from a driver instance. By supporting this API, the framework can
/// create driver instances and monitor them as they run.
///
/// Implementors must be `Send`.
pub trait API: Send {
/// Creates an instance of the driver.
///
/// `cfg` contains the driver parameters, as specified in the
/// `drmem.toml` configuration file. It is a `toml::Table` type so
/// the keys for the parameter names are strings and the
/// associated data are `toml::Value` types. This method should
/// validate the parameters and convert them into forms useful to
/// the driver. By convention, if any errors are found in the
/// configuration, this method should return `Error::BadConfig`.
///
/// `drc` is the send handle to a device request channel. The
/// driver should store this handle and use it to communicate with
/// the framework. Its typical use is to register devices with the
/// framework, which is usually done in this method. As other
/// request types are added, they can be used while the driver is
/// running.
///
/// `max_history` is specified in the configuration file. It is a
/// hint as to the maximum number of data points to save for each
/// of the devices created by this driver. A backend can choose to
/// interpret this in its own way. For instance, the simple
/// backend can only ever save one data point. Redis will take
/// this as a hint and will choose the most efficient way to prune
/// the history. That means, if more than the limit is present,
/// redis won't prune the history to less than the limit. However
/// there may be more than the limit -- it just won't grow without
/// bound.
///
/// The returned future is boxed, `Send`, and `'static`; it
/// resolves to the new driver instance as a `DriverType`, or to
/// an error. The `Self: Sized` bound means this method can only
/// be called on a concrete driver type, not through `dyn API`.
fn create_instance(
cfg: DriverConfig, drc: RequestChan, max_history: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<DriverType>> + Send + 'static>>
where
Self: Sized;
/// Runs the instance of the driver.
///
/// Since drivers provide access to hardware, this method should
/// never return unless something severe occurs. All drivers are
/// monitored by a task and if a driver panics or returns an error
/// from this method, it gets reported in the log and then, after
/// a short delay, the driver is restarted.
///
/// NOTE(review): the future's output type is `Infallible`, so with
/// this signature the future cannot complete with a value, error
/// or otherwise. The "returns an error" wording above presumably
/// predates the signature -- confirm and reword.
///
/// The returned future borrows the driver mutably for `'a`.
fn run<'a>(
&'a mut self,
) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>>;
}