use std::{
fmt,
future::{poll_fn, Future},
hash::Hash,
ops,
pin::Pin,
task::{Context, Poll},
};
use futures_core::Stream;
use readlock::{Shared, SharedReadGuard, SharedReadLock};
use crate::state::ObservableState;
/// A value whose changes can be observed through [`Subscriber`]s.
///
/// The value lives inside a `Shared` container together with the bookkeeping
/// kept by `ObservableState`; subscribers created via
/// [`Observable::subscribe`] hold a read lock onto that same state.
///
/// Shared access to the inner value is available through `Deref`. The rest of
/// the API consists of associated functions taking `this` rather than methods
/// taking `self` — presumably so that it cannot shadow methods of `T` that are
/// reachable through `Deref`.
#[derive(Debug)]
pub struct Observable<T> {
// The wrapped value plus its version bookkeeping (see `ObservableState`).
state: Shared<ObservableState<T>>,
}
impl<T> Observable<T> {
pub fn new(value: T) -> Self {
Self { state: Shared::new(ObservableState::new(value)) }
}
pub fn subscribe(this: &Self) -> Subscriber<T> {
Subscriber::new(Shared::get_read_lock(&this.state), this.state.version())
}
pub fn get(this: &Self) -> &T {
this.state.get()
}
pub fn set(this: &mut Self, value: T) {
Shared::lock(&mut this.state).set(value);
}
pub fn set_eq(this: &mut Self, value: T)
where
T: Clone + PartialEq,
{
Self::update_eq(this, |inner| {
*inner = value;
});
}
pub fn set_hash(this: &mut Self, value: T)
where
T: Hash,
{
Self::update_hash(this, |inner| {
*inner = value;
});
}
pub fn replace(this: &mut Self, value: T) -> T {
Shared::lock(&mut this.state).replace(value)
}
pub fn take(this: &mut Self) -> T
where
T: Default,
{
Self::replace(this, T::default())
}
pub fn update(this: &mut Self, f: impl FnOnce(&mut T)) {
Shared::lock(&mut this.state).update(f);
}
pub fn update_eq(this: &mut Self, f: impl FnOnce(&mut T))
where
T: Clone + PartialEq,
{
Shared::lock(&mut this.state).update_eq(f);
}
pub fn update_hash(this: &mut Self, f: impl FnOnce(&mut T))
where
T: Hash,
{
Shared::lock(&mut this.state).update_hash(f);
}
}
impl<T: Default> Default for Observable<T> {
    /// Creates an `Observable` wrapping `T::default()`.
    fn default() -> Self {
        let initial = T::default();
        Self::new(initial)
    }
}
impl<T> ops::Deref for Observable<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.state.get()
}
}
impl<T> Drop for Observable<T> {
    /// Closes the shared state when the observable goes away.
    ///
    /// NOTE(review): presumably `close` is what makes subscribers resolve to
    /// `None` afterwards — confirm in `ObservableState::close`.
    fn drop(&mut self) {
        let Self { state } = self;
        Shared::lock(state).close();
    }
}
/// A handle for observing the value of an [`Observable`].
///
/// Obtained from [`Observable::subscribe`]. It can be polled as a `Stream`
/// (when `T: Clone`) or queried through its inherent methods.
#[derive(Debug)]
pub struct Subscriber<T> {
// Read-only handle onto the state shared with the `Observable`.
state: SharedReadLock<ObservableState<T>>,
// State version this subscriber has most recently observed; compared against
// the current version to decide whether an update is pending.
observed_version: u64,
}
impl<T> Subscriber<T> {
pub(crate) fn new(read_lock: SharedReadLock<ObservableState<T>>, version: u64) -> Self {
Self { state: read_lock, observed_version: version }
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Next<T>
where
T: Clone,
{
Next::new(self)
}
pub fn next_now(&mut self) -> T
where
T: Clone,
{
let lock = self.state.lock();
self.observed_version = lock.version();
lock.get().clone()
}
pub fn get(&self) -> T
where
T: Clone,
{
self.read().clone()
}
pub async fn next_ref(&mut self) -> Option<ObservableReadGuard<'_, T>> {
poll_fn(|cx| self.poll_next_ref(cx).map(|opt| opt.map(|_| {}))).await?;
Some(self.next_ref_now())
}
pub fn next_ref_now(&mut self) -> ObservableReadGuard<'_, T> {
let lock = self.state.lock();
self.observed_version = lock.version();
ObservableReadGuard::new(lock)
}
pub fn read(&self) -> ObservableReadGuard<'_, T> {
ObservableReadGuard::new(self.state.lock())
}
fn poll_next_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<ObservableReadGuard<'_, T>>> {
let state = self.state.lock();
let version = state.version();
if version == 0 {
Poll::Ready(None)
} else if self.observed_version < version {
self.observed_version = version;
Poll::Ready(Some(ObservableReadGuard::new(state)))
} else {
state.add_waker(cx.waker().clone());
Poll::Pending
}
}
}
impl<T: Clone> Stream for Subscriber<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_ref(cx).map(opt_guard_to_owned)
}
}
/// A read guard for the inner value of an observable.
///
/// Wraps a `SharedReadGuard` over the observable's state and dereferences to
/// the inner value.
///
/// NOTE(review): flagged for clippy as having a significant drop — presumably
/// because the state stays read-locked while the guard is alive.
#[clippy::has_significant_drop]
pub struct ObservableReadGuard<'a, T> {
// Guard over the shared state; the value is reached via `ObservableState::get`.
inner: SharedReadGuard<'a, ObservableState<T>>,
}
impl<'a, T> ObservableReadGuard<'a, T> {
fn new(inner: SharedReadGuard<'a, ObservableState<T>>) -> Self {
Self { inner }
}
}
/// Formats the guard by forwarding to the `Debug` output of the wrapped guard.
impl<T: fmt::Debug> fmt::Debug for ObservableReadGuard<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { inner } = self;
        inner.fmt(f)
    }
}
impl<T> ops::Deref for ObservableReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.inner.get()
}
}
/// Future returned by [`Subscriber::next`].
///
/// Resolves to a clone of the next value the subscriber has not observed yet,
/// or to `None` once the state's version is `0`.
#[derive(Debug)]
pub struct Next<'a, T> {
// The subscriber being polled; borrowed mutably so its observed version can
// be advanced.
subscriber: &'a mut Subscriber<T>,
}
impl<'a, T> Next<'a, T> {
fn new(subscriber: &'a mut Subscriber<T>) -> Self {
Self { subscriber }
}
}
impl<'a, T: Clone> Future for Next<'a, T> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.subscriber.poll_next_ref(cx).map(opt_guard_to_owned)
}
}
/// Converts an optional read guard into an optional owned value by cloning the
/// value behind the guard; `None` stays `None`.
fn opt_guard_to_owned<T: Clone>(value: Option<ObservableReadGuard<'_, T>>) -> Option<T> {
    let guard = value?;
    // `&guard` coerces to `&T` through the guard's `Deref` impl.
    Some(T::clone(&guard))
}