use core::cell::Ref;
use core::fmt::Debug;
use core::marker::PhantomData;
use core::pin::Pin;
use crate::datastore::Storable;
use crate::datastore::slot::{self, Slot};
#[derive(Debug)]
pub struct ExclusiveReader<'a, T>
where
T: Storable + 'static,
{
waiter: slot::Waiter<'a, T>,
marker: PhantomData<fn(T)>,
}
impl<T> ExclusiveReader<'_, T>
where
T: Storable + 'static,
{
#[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
self.waiter.read(|value| {
let value = value.as_ref();
#[cfg(feature = "veecle-telemetry")]
veecle_telemetry::trace!("Slot read.", type_name = self.waiter.inner_type_name());
f(value)
})
}
#[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
pub fn take(&mut self) -> Option<T::DataType> {
let value = self.waiter.take();
#[cfg(feature = "veecle-telemetry")]
veecle_telemetry::trace!(
"Slot value taken.",
type_name = self.waiter.inner_type_name()
);
value
}
pub fn read_cloned(&self) -> Option<T::DataType>
where
T::DataType: Clone,
{
self.read(|t| t.cloned())
}
#[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
pub async fn wait_for_update(&mut self) -> &mut Self {
self.waiter.wait().await;
self.waiter.update_generation();
self
}
}
impl<'a, T> ExclusiveReader<'a, T>
where
T: Storable + 'static,
{
pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
ExclusiveReader {
waiter: slot.waiter(),
marker: PhantomData,
}
}
}
impl<T> super::combined_readers::Sealed for ExclusiveReader<'_, T> where T: Storable {}
impl<T> super::combined_readers::CombinableReader for ExclusiveReader<'_, T>
where
T: Storable,
{
type ToBeRead = Option<T::DataType>;
fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
self.waiter.borrow()
}
async fn wait_for_update(&mut self) {
self.wait_for_update().await;
}
}
#[cfg(test)]
mod tests {
use core::pin::pin;
use futures::FutureExt;
use crate::datastore::{ExclusiveReader, Slot, Storable, Writer, generational};
#[test]
fn read() {
#[derive(Eq, PartialEq, Debug, Clone, Storable)]
#[storable(crate = crate)]
struct Sensor(u8);
let source = pin!(generational::Source::new());
let slot = pin!(Slot::<Sensor>::new());
let reader = ExclusiveReader::from_slot(slot.as_ref());
let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
assert_eq!(reader.read(|x| x.cloned()), None);
assert_eq!(reader.read_cloned(), None);
source.as_ref().increment_generation();
writer.write(Sensor(1)).now_or_never().unwrap();
assert_eq!(
reader.read(|x: Option<&Sensor>| x.cloned()),
Some(Sensor(1))
);
assert_eq!(reader.read_cloned(), Some(Sensor(1)));
}
#[test]
fn take() {
#[derive(Eq, PartialEq, Debug, Clone, Storable)]
#[storable(crate = crate)]
struct Sensor(u8);
let source = pin!(generational::Source::new());
let slot = pin!(Slot::<Sensor>::new());
let mut reader = ExclusiveReader::from_slot(slot.as_ref());
let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
assert_eq!(reader.take(), None);
source.as_ref().increment_generation();
writer.write(Sensor(10)).now_or_never().unwrap();
assert_eq!(reader.take(), Some(Sensor(10)));
assert_eq!(reader.take(), None);
}
#[test]
fn wait_for_update() {
#[derive(Eq, PartialEq, Debug, Clone, Storable)]
#[storable(crate = crate)]
struct Sensor(u8);
let source = pin!(generational::Source::new());
let slot = pin!(Slot::<Sensor>::new());
let mut reader = ExclusiveReader::from_slot(slot.as_ref());
let mut writer = Writer::new(source.as_ref().waiter(), slot.as_ref());
assert!(reader.wait_for_update().now_or_never().is_none());
source.as_ref().increment_generation();
writer.write(Sensor(1)).now_or_never().unwrap();
reader
.wait_for_update()
.now_or_never()
.unwrap()
.read(|x| assert_eq!(x, Some(&Sensor(1))));
}
}