use std::{
fmt,
future::{poll_fn, Future},
pin::Pin,
task::{Context, Poll},
};
use futures_core::Stream;
use crate::{lock::Lock, state::ObservableState, ObservableReadGuard, SyncLock};
#[cfg(feature = "async-lock")]
pub(crate) mod async_lock;
#[must_use]
pub struct Subscriber<T, L: Lock = SyncLock> {
state: L::SubscriberState<T>,
observed_version: u64,
}
impl<T> Subscriber<T> {
pub(crate) fn new(state: readlock::SharedReadLock<ObservableState<T>>, version: u64) -> Self {
Self { state, observed_version: version }
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Next<'_, T>
where
T: Clone,
{
Next::new(self)
}
#[must_use]
pub fn next_now(&mut self) -> T
where
T: Clone,
{
let lock = self.state.lock();
self.observed_version = lock.version();
lock.get().clone()
}
#[must_use]
pub fn get(&self) -> T
where
T: Clone,
{
self.read().clone()
}
#[must_use]
pub async fn next_ref(&mut self) -> Option<ObservableReadGuard<'_, T>> {
poll_fn(|cx| self.poll_next_ref(cx).map(|opt| opt.map(|_| {}))).await?;
Some(self.next_ref_now())
}
pub fn next_ref_now(&mut self) -> ObservableReadGuard<'_, T> {
let lock = self.state.lock();
self.observed_version = lock.version();
ObservableReadGuard::new(lock)
}
pub fn read(&self) -> ObservableReadGuard<'_, T> {
ObservableReadGuard::new(self.state.lock())
}
fn poll_next_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<ObservableReadGuard<'_, T>>> {
let state = self.state.lock();
let version = state.version();
if version == 0 {
Poll::Ready(None)
} else if self.observed_version < version {
self.observed_version = version;
Poll::Ready(Some(ObservableReadGuard::new(state)))
} else {
state.add_waker(cx.waker().clone());
Poll::Pending
}
}
}
impl<T, L: Lock> Subscriber<T, L> {
pub fn reset(&mut self) {
self.observed_version = 0;
}
pub fn clone_reset(&self) -> Self
where
L::SubscriberState<T>: Clone,
{
Self { state: self.state.clone(), observed_version: 0 }
}
}
impl<T, L: Lock> Clone for Subscriber<T, L>
where
L::SubscriberState<T>: Clone,
{
fn clone(&self) -> Self {
Self { state: self.state.clone(), observed_version: self.observed_version }
}
}
impl<T, L: Lock> fmt::Debug for Subscriber<T, L>
where
L::SubscriberState<T>: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Subscriber")
.field("state", &self.state)
.field("observed_version", &self.observed_version)
.finish()
}
}
impl<T: Clone> Stream for Subscriber<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_ref(cx).map(opt_guard_to_owned)
}
}
#[must_use]
#[allow(missing_debug_implementations)]
pub struct Next<'a, T, L: Lock = SyncLock> {
subscriber: &'a mut Subscriber<T, L>,
}
impl<'a, T> Next<'a, T> {
fn new(subscriber: &'a mut Subscriber<T>) -> Self {
Self { subscriber }
}
}
impl<T: Clone> Future for Next<'_, T> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.subscriber.poll_next_ref(cx).map(opt_guard_to_owned)
}
}
fn opt_guard_to_owned<T: Clone>(value: Option<ObservableReadGuard<'_, T>>) -> Option<T> {
value.map(|guard| guard.to_owned())
}