use crate::types::{
device::{Base, Name, Path, Value},
Error,
};
use std::future::Future;
use std::{convert::Infallible, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use toml::value;
use super::Result;
pub type DriverConfig = value::Table;
pub type SettingRequest = (Value, oneshot::Sender<Result<Value>>);
pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;
pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;
pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;
pub type SettingStream<T> =
Pin<Box<dyn Stream<Item = (T, SettingReply<T>)> + Send + Sync>>;
pub type ReportReading<T> =
Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub enum Request {
AddReadonlyDevice {
driver_name: String,
dev_name: Name,
dev_units: Option<String>,
max_history: Option<usize>,
rpy_chan:
oneshot::Sender<Result<(ReportReading<Value>, Option<Value>)>>,
},
AddReadWriteDevice {
driver_name: String,
dev_name: Name,
dev_units: Option<String>,
max_history: Option<usize>,
rpy_chan: oneshot::Sender<
Result<(ReportReading<Value>, RxDeviceSetting, Option<Value>)>,
>,
},
}
#[derive(Clone)]
pub struct RequestChan {
driver_name: String,
prefix: Path,
req_chan: mpsc::Sender<Request>,
}
impl RequestChan {
pub fn new(
driver_name: &str,
prefix: &Path,
req_chan: &mpsc::Sender<Request>,
) -> Self {
RequestChan {
driver_name: String::from(driver_name),
prefix: prefix.clone(),
req_chan: req_chan.clone(),
}
}
pub async fn add_ro_device<T: Into<Value> + TryFrom<Value>>(
&self,
name: Base,
units: Option<&str>,
max_history: Option<usize>,
) -> super::Result<(ReportReading<T>, Option<T>)> {
let (tx, rx) = oneshot::channel();
let result = self
.req_chan
.send(Request::AddReadonlyDevice {
driver_name: self.driver_name.clone(),
dev_name: Name::build(self.prefix.clone(), name),
dev_units: units.map(String::from),
max_history,
rpy_chan: tx,
})
.await;
if result.is_ok() {
if let Ok(v) = rx.await {
return v.map(|(rr, prev)| {
(
Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
prev.and_then(|v| T::try_from(v).ok()),
)
});
}
}
Err(Error::MissingPeer(String::from(
"can't communicate with core",
)))
}
fn create_setting_stream<T: TryFrom<Value> + Into<Value>>(
rx: RxDeviceSetting,
) -> SettingStream<T> {
Box::pin(ReceiverStream::new(rx).filter_map(|(v, tx_rpy)| {
match T::try_from(v) {
Ok(v) => {
let f: SettingReply<T> = Box::new(|v: Result<T>| {
let _ = tx_rpy.send(v.map(T::into));
});
Some((v, f))
}
Err(_) => {
let _ = tx_rpy.send(Err(Error::TypeError));
None
}
}
}))
}
pub async fn add_rw_device<T: Into<Value> + TryFrom<Value>>(
&self,
name: Base,
units: Option<&str>,
max_history: Option<usize>,
) -> Result<(ReportReading<T>, SettingStream<T>, Option<T>)> {
let (tx, rx) = oneshot::channel();
let result = self
.req_chan
.send(Request::AddReadWriteDevice {
driver_name: self.driver_name.clone(),
dev_name: Name::build(self.prefix.clone(), name),
dev_units: units.map(String::from),
max_history,
rpy_chan: tx,
})
.await;
if result.is_ok() {
if let Ok(v) = rx.await {
return v.map(|(rr, rs, prev)| {
(
Box::new(move |a: T| rr(a.into())) as ReportReading<T>,
RequestChan::create_setting_stream(rs),
prev.and_then(|v| T::try_from(v).ok()),
)
});
}
}
Err(Error::MissingPeer(String::from(
"can't communicate with core",
)))
}
}
pub type DriverType<T> = Box<dyn API<DeviceSet = <T as API>::DeviceSet>>;
pub trait API: Send {
type DeviceSet: Send + Sync;
fn register_devices(
drc: RequestChan,
cfg: &DriverConfig,
max_history: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>>;
fn create_instance(
cfg: &DriverConfig,
) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>>
where
Self: Sized;
fn run<'a>(
&'a mut self,
devices: Arc<Mutex<Self::DeviceSet>>,
) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>>;
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::sync::{mpsc, oneshot};
#[tokio::test]
async fn test_setting_stream() {
let (tx, rx) = mpsc::channel(20);
let mut s: SettingStream<bool> = RequestChan::create_setting_stream(rx);
let (os_tx, os_rx) = oneshot::channel();
assert_eq!(tx.send((true.into(), os_tx)).await.unwrap(), ());
let (v, f) = s.next().await.unwrap();
assert_eq!(v, true);
f(Ok(false));
assert_eq!(os_rx.await.unwrap().unwrap(), false.into());
let (os_tx, os_rx) = oneshot::channel();
assert_eq!(tx.send(((1.0).into(), os_tx)).await.unwrap(), ());
std::mem::drop(tx);
assert!(s.next().await.is_none());
assert!(os_rx.await.unwrap().is_err());
}
}