use std::{
future::{poll_fn, Future},
pin::Pin,
task::{Context, Poll},
};
use futures_core::Stream;
use readlock::SharedReadLock;
use crate::{state::ObservableState, ObservableReadGuard};
#[derive(Debug)]
pub struct Subscriber<T> {
state: SharedReadLock<ObservableState<T>>,
observed_version: u64,
}
impl<T> Subscriber<T> {
pub(crate) fn new(read_lock: SharedReadLock<ObservableState<T>>, version: u64) -> Self {
Self { state: read_lock, observed_version: version }
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Next<'_, T>
where
T: Clone,
{
Next::new(self)
}
pub fn next_now(&mut self) -> T
where
T: Clone,
{
let lock = self.state.lock();
self.observed_version = lock.version();
lock.get().clone()
}
pub fn get(&self) -> T
where
T: Clone,
{
self.read().clone()
}
pub async fn next_ref(&mut self) -> Option<ObservableReadGuard<'_, T>> {
poll_fn(|cx| self.poll_next_ref(cx).map(|opt| opt.map(|_| {}))).await?;
Some(self.next_ref_now())
}
pub fn next_ref_now(&mut self) -> ObservableReadGuard<'_, T> {
let lock = self.state.lock();
self.observed_version = lock.version();
ObservableReadGuard::new(lock)
}
pub fn read(&self) -> ObservableReadGuard<'_, T> {
ObservableReadGuard::new(self.state.lock())
}
fn poll_next_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<ObservableReadGuard<'_, T>>> {
let state = self.state.lock();
let version = state.version();
if version == 0 {
Poll::Ready(None)
} else if self.observed_version < version {
self.observed_version = version;
Poll::Ready(Some(ObservableReadGuard::new(state)))
} else {
state.add_waker(cx.waker().clone());
Poll::Pending
}
}
}
impl<T: Clone> Stream for Subscriber<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_ref(cx).map(opt_guard_to_owned)
}
}
#[derive(Debug)]
pub struct Next<'a, T> {
subscriber: &'a mut Subscriber<T>,
}
impl<'a, T> Next<'a, T> {
fn new(subscriber: &'a mut Subscriber<T>) -> Self {
Self { subscriber }
}
}
impl<'a, T: Clone> Future for Next<'a, T> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.subscriber.poll_next_ref(cx).map(opt_guard_to_owned)
}
}
fn opt_guard_to_owned<T: Clone>(value: Option<ObservableReadGuard<'_, T>>) -> Option<T> {
value.map(|guard| guard.to_owned())
}