use core::cell::Ref;
use core::fmt::Debug;
use core::marker::PhantomData;
use core::pin::{Pin, pin};
use pin_project::pin_project;
use crate::datastore::Storable;
use crate::datastore::initialized_reader::InitializedReader;
use crate::datastore::slot::{self, Slot};
/// Read handle for the value of a [`Storable`] type `T` held in a [`Slot`].
///
/// A `Reader` is built from a pinned slot reference (see `from_slot`) and wraps
/// the [`slot::Waiter`] handed out by that slot. All reads and all waiting go
/// through that waiter.
#[derive(Debug)]
#[pin_project]
pub struct Reader<'a, T>
where
T: Storable + 'static,
{
/// Waiter obtained from the slot; used both to access the current value and
/// to await writes. Marked `#[pin]` so `pin_project` projects it as pinned.
#[pin]
waiter: slot::Waiter<'a, T>,
/// Zero-sized marker mentioning `T`.
// NOTE(review): `fn(T)` instead of `T` presumably avoids implying ownership
// of a `T` value — confirm the intended variance/auto-trait behaviour.
marker: PhantomData<fn(T)>,
}
impl<T> Reader<'_, T>
where
    T: Storable + 'static,
{
    /// Runs `f` on the current value and returns whatever `f` returns.
    ///
    /// `f` receives `Some(&value)` when the waiter currently holds a value and
    /// `None` otherwise. The reference handed to `f` is only usable inside the
    /// closure.
    ///
    /// With the `veecle-telemetry` feature enabled, a `"Slot read."` trace
    /// event carrying the slot's type name is emitted before `f` is invoked.
    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
    pub fn read<U>(&self, f: impl FnOnce(Option<&T::DataType>) -> U) -> U {
        self.waiter.read(|stored| {
            #[cfg(feature = "veecle-telemetry")]
            veecle_telemetry::trace!("Slot read.", type_name = self.waiter.inner_type_name());
            f(stored.as_ref())
        })
    }

    /// Returns a clone of the current value, or `None` if there is none.
    ///
    /// Shorthand for calling [`Self::read`] with a closure that clones the
    /// borrowed value.
    pub fn read_cloned(&self) -> Option<T::DataType>
    where
        T::DataType: Clone,
    {
        self.read(|current| current.cloned())
    }

    /// Awaits the underlying waiter, then calls `update_generation` on it.
    ///
    /// Returns `self` so that a call such as [`Self::read`] can be chained
    /// directly onto the awaited result.
    #[cfg_attr(feature = "veecle-telemetry", veecle_telemetry::instrument)]
    pub async fn wait_for_update(&mut self) -> &mut Self {
        // Order matters: the generation is only updated after the wait resolved.
        self.waiter.wait().await;
        self.waiter.update_generation();
        self
    }
}
impl<'a, T> Reader<'a, T>
where
    T: Storable + 'static,
{
    /// Builds a reader on top of the waiter handed out by `slot`.
    pub(crate) fn from_slot(slot: Pin<&'a Slot<T>>) -> Self {
        let waiter = slot.waiter();
        Self {
            waiter,
            marker: PhantomData,
        }
    }

    /// Consumes the reader and turns it into an [`InitializedReader`].
    ///
    /// If a value is already present this resolves without waiting; otherwise
    /// it first awaits the underlying waiter. Unlike
    /// [`Self::wait_for_update`], this does not call `update_generation`.
    pub async fn wait_init(self) -> InitializedReader<'a, T> {
        let is_empty = self.read(|current| current.is_none());
        if is_empty {
            self.waiter.wait().await;
        }
        InitializedReader::new(self.waiter)
    }
}
// Empty marker impl of `combined_readers::Sealed` for `Reader`.
// NOTE(review): presumably the sealed-trait marker that `CombinableReader`
// requires of its implementors — confirm in `combined_readers`.
impl<T> super::combined_readers::Sealed for Reader<'_, T> where T: Storable {}
impl<T> super::combined_readers::CombinableReader for Reader<'_, T>
where
T: Storable,
{
type ToBeRead = Option<T::DataType>;
fn borrow(&self) -> Ref<'_, Self::ToBeRead> {
self.waiter.borrow()
}
async fn wait_for_update(&mut self) {
self.wait_for_update().await;
}
}
#[cfg(test)]
mod tests {
    use core::pin::pin;
    use futures::FutureExt;
    use crate::datastore::{Reader, Slot, Storable, Writer, generational};

    /// `wait_for_update` stays pending until a value is written, then resolves
    /// and lets the written value be read.
    #[test]
    fn wait_for_update() {
        #[derive(Eq, PartialEq, Debug, Clone, Storable)]
        #[storable(crate = crate)]
        struct Sensor(u8);

        let generation_source = pin!(generational::Source::new());
        let sensor_slot = pin!(Slot::<Sensor>::new());
        let mut sensor_reader = Reader::from_slot(sensor_slot.as_ref());
        let mut sensor_writer =
            Writer::new(generation_source.as_ref().waiter(), sensor_slot.as_ref());

        // Nothing has been written yet, so a single poll must not complete.
        let first_poll = sensor_reader.wait_for_update().now_or_never();
        assert!(first_poll.is_none());

        // Advance the generation and write a value.
        generation_source.as_ref().increment_generation();
        sensor_writer.write(Sensor(1)).now_or_never().unwrap();

        // The update is now observable immediately and carries the written value.
        let updated = sensor_reader.wait_for_update().now_or_never().unwrap();
        updated.read(|seen| assert_eq!(seen, Some(&Sensor(1))));
    }
}