use crate::{device, driver::ReportReading, Error, Result};
use std::{future::Future, pin::Pin};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
/// A raw setting request: the requested [`device::Value`] paired with the
/// one-shot sender over which a `Result<device::Value>` reply is sent back.
pub type SettingRequest =
(device::Value, oneshot::Sender<Result<device::Value>>);
/// Sending half of the `mpsc` channel that carries [`SettingRequest`]s.
pub type TxDeviceSetting = mpsc::Sender<SettingRequest>;
/// Receiving half of the `mpsc` channel that carries [`SettingRequest`]s.
pub type RxDeviceSetting = mpsc::Receiver<SettingRequest>;
/// Boxed, single-use callback that accepts the typed outcome (`Result<T>`)
/// of a setting request. It is `Send`, so it can be moved to another thread
/// before being invoked.
pub type SettingReply<T> = Box<dyn FnOnce(Result<T>) + Send>;
/// A typed setting request: the requested value together with the
/// [`SettingReply`] callback used to answer it.
pub type SettingTransaction<T> = (T, SettingReply<T>);
// Pinned, boxed stream of typed setting transactions. The trait object is
// required to be both `Send` and `Sync`.
type SettingStream<T> =
Pin<Box<dyn Stream<Item = SettingTransaction<T>> + Send + Sync>>;
fn create_setting_stream<T>(rx: RxDeviceSetting) -> SettingStream<T>
where
T: TryFrom<device::Value> + Into<device::Value> + Clone,
{
Box::pin(ReceiverStream::new(rx).filter_map(
|(v, tx_rpy)| match T::try_from(v) {
Ok(v) => {
let f: SettingReply<T> = Box::new(|v: Result<T>| {
let _ = tx_rpy.send(v.map(T::into));
});
Some((v, f))
}
Err(_) => {
let _ = tx_rpy.send(Err(Error::TypeError));
None
}
},
))
}
/// State for a device whose value can be both reported and set.
///
/// `T` is the device's native value type; it must be convertible to and
/// from [`device::Value`] and be `Clone` so the last reported value can be
/// cached.
pub struct ReadWriteDevice<
T: TryFrom<device::Value> + Into<device::Value> + Clone,
> {
// Callable invoked with each reported value (after conversion to
// `device::Value`); it produces the future returned by `report_update`.
report_chan: ReportReading,
// Stream of typed setting transactions built from the settings channel.
set_stream: SettingStream<T>,
// Value most recently passed to `report_update`, or the initial value
// handed to `new` if nothing has been reported yet.
prev_val: Option<T>,
}
impl<T> ReadWriteDevice<T>
where
T: TryFrom<device::Value> + Into<device::Value> + Clone,
{
pub fn new(
report_chan: ReportReading,
setting_chan: RxDeviceSetting,
prev_val: Option<T>,
) -> Self {
ReadWriteDevice {
report_chan,
set_stream: create_setting_stream(setting_chan),
prev_val,
}
}
pub fn report_update(
&mut self,
value: T,
) -> impl Future<Output = ()> + use<'_, T> {
self.prev_val = Some(value.clone());
(self.report_chan)(value.into())
}
pub fn get_last(&self) -> Option<&T> {
self.prev_val.as_ref()
}
pub fn next_setting(
&mut self,
) -> impl Future<Output = Option<SettingTransaction<T>>> + use<'_, T> {
self.set_stream.next()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::{mpsc, oneshot};

    /// Exercises both paths of `create_setting_stream`: a request whose
    /// value converts to the target type, and one whose value does not.
    #[tokio::test]
    async fn test_setting_stream() {
        let (tx, rx) = mpsc::channel(20);
        let mut s: SettingStream<bool> = create_setting_stream(rx);

        // A boolean request converts cleanly: the stream yields it and the
        // reply callback forwards our answer to the requester.
        let (os_tx, os_rx) = oneshot::channel();

        tx.send((true.into(), os_tx)).await.unwrap();

        let (v, f) = s.next().await.unwrap();

        assert!(v);
        f(Ok(false));
        assert_eq!(os_rx.await.unwrap().unwrap(), false.into());

        // A float request cannot become a `bool`: it is filtered out of the
        // stream (which then ends because the sender is dropped) and the
        // requester receives an error.
        let (os_tx, os_rx) = oneshot::channel();

        tx.send(((1.0).into(), os_tx)).await.unwrap();
        std::mem::drop(tx);

        assert!(s.next().await.is_none());
        assert!(os_rx.await.unwrap().is_err());
    }
}