use std::{fmt, hash::Hash, mem, ops, ptr};
use readlock::Shared;
#[cfg(feature = "async-lock")]
use readlock_tokio::Shared as SharedAsync;
#[cfg(feature = "async-lock")]
use crate::AsyncLock;
use crate::{lock::Lock, shared::SharedObservable, state::ObservableState, Subscriber, SyncLock};
/// A value of type `T` wrapped together with the bookkeeping state needed to
/// observe it, parameterized by a lock strategy `L`.
///
/// `L` defaults to [`SyncLock`]; the concrete container used for the state is
/// chosen by the lock strategy through `L::Shared`.
pub struct Observable<T, L: Lock = SyncLock> {
// The observable state (value plus whatever `ObservableState` tracks), held
// in the shared container provided by the lock strategy.
state: L::Shared<ObservableState<T>>,
}
/// Synchronous API, available when the default [`SyncLock`] strategy is used.
///
/// Everything here is an associated function taking `this` instead of a
/// method taking `self`, so calls look like `Observable::set(&mut obs, v)`.
impl<T> Observable<T> {
    /// Creates a new `Observable` whose state is initialized from `value`.
    #[must_use]
    pub fn new(value: T) -> Self {
        Self::from_inner(Shared::new(ObservableState::new(value)))
    }

    /// Creates a [`Subscriber`] from a read lock on the state and the
    /// state's current version.
    pub fn subscribe(this: &Self) -> Subscriber<T> {
        let read_lock = Shared::get_read_lock(&this.state);
        let version = this.state.version();
        Subscriber::new(read_lock, version)
    }

    /// Creates a [`Subscriber`] from a read lock on the state, starting at
    /// version `0` instead of the state's current version.
    ///
    /// NOTE(review): presumably this makes the subscriber treat the current
    /// value as not yet seen; confirm against `Subscriber`.
    pub fn subscribe_reset(this: &Self) -> Subscriber<T> {
        let read_lock = Shared::get_read_lock(&this.state);
        Subscriber::new(read_lock, 0)
    }

    /// Returns a reference to the inner value.
    pub fn get(this: &Self) -> &T {
        let state = &this.state;
        state.get()
    }

    /// Locks the state and forwards `value` to `ObservableState::set`,
    /// returning whatever it returns (presumably the previous value; confirm
    /// in `ObservableState`).
    pub fn set(this: &mut Self, value: T) -> T {
        let mut state = Shared::lock(&mut this.state);
        state.set(value)
    }

    /// Locks the state and forwards `value` to
    /// `ObservableState::set_if_not_eq`, returning its result.
    pub fn set_if_not_eq(this: &mut Self, value: T) -> Option<T>
    where
        T: PartialEq,
    {
        let mut state = Shared::lock(&mut this.state);
        state.set_if_not_eq(value)
    }

    /// Locks the state and forwards `value` to
    /// `ObservableState::set_if_hash_not_eq`, returning its result.
    pub fn set_if_hash_not_eq(this: &mut Self, value: T) -> Option<T>
    where
        T: Hash,
    {
        let mut state = Shared::lock(&mut this.state);
        state.set_if_hash_not_eq(value)
    }

    /// Sets the inner value to `T::default()` and returns the result of
    /// [`Observable::set`].
    pub fn take(this: &mut Self) -> T
    where
        T: Default,
    {
        let replacement = T::default();
        Self::set(this, replacement)
    }

    /// Locks the state and passes `f` to `ObservableState::update`.
    pub fn update(this: &mut Self, f: impl FnOnce(&mut T)) {
        let mut state = Shared::lock(&mut this.state);
        state.update(f);
    }

    /// Locks the state and passes `f` to `ObservableState::update_if`.
    ///
    /// NOTE(review): the `bool` returned by `f` presumably tells the state
    /// whether the value changed; confirm in `ObservableState`.
    pub fn update_if(this: &mut Self, f: impl FnOnce(&mut T) -> bool) {
        let mut state = Shared::lock(&mut this.state);
        state.update_if(f);
    }
}
/// Asynchronous API, available with the `async-lock` feature when the
/// [`AsyncLock`] strategy is used.
///
/// Mirrors the synchronous functions; the mutating ones `.await` the write
/// lock before touching the state.
#[cfg(feature = "async-lock")]
impl<T: Send + Sync + 'static> Observable<T, AsyncLock> {
    /// Creates a new async-locked `Observable` whose state is initialized
    /// from `value`.
    #[must_use]
    pub fn new_async(value: T) -> Self {
        Self::from_inner(SharedAsync::new(ObservableState::new(value)))
    }

    /// Creates a [`Subscriber`] from a read lock on the state and the
    /// state's current version.
    pub fn subscribe_async(this: &Self) -> Subscriber<T, AsyncLock> {
        let read_lock = SharedAsync::get_read_lock(&this.state);
        let version = this.state.version();
        Subscriber::new_async(read_lock, version)
    }

    /// Creates a [`Subscriber`] from a read lock on the state, starting at
    /// version `0` instead of the state's current version.
    ///
    /// NOTE(review): presumably this makes the subscriber treat the current
    /// value as not yet seen; confirm against `Subscriber`.
    pub fn subscribe_reset_async(this: &Self) -> Subscriber<T, AsyncLock> {
        let read_lock = SharedAsync::get_read_lock(&this.state);
        Subscriber::new_async(read_lock, 0)
    }

    /// Returns a reference to the inner value.
    pub fn get_async(this: &Self) -> &T {
        let state = &this.state;
        state.get()
    }

    /// Awaits the write lock and forwards `value` to
    /// `ObservableState::set`, returning whatever it returns (presumably the
    /// previous value; confirm in `ObservableState`).
    pub async fn set_async(this: &mut Self, value: T) -> T {
        let mut state = SharedAsync::lock(&mut this.state).await;
        state.set(value)
    }

    /// Awaits the write lock and forwards `value` to
    /// `ObservableState::set_if_not_eq`, returning its result.
    pub async fn set_if_not_eq_async(this: &mut Self, value: T) -> Option<T>
    where
        T: PartialEq,
    {
        let mut state = SharedAsync::lock(&mut this.state).await;
        state.set_if_not_eq(value)
    }

    /// Awaits the write lock and forwards `value` to
    /// `ObservableState::set_if_hash_not_eq`, returning its result.
    pub async fn set_if_hash_not_eq_async(this: &mut Self, value: T) -> Option<T>
    where
        T: Hash,
    {
        let mut state = SharedAsync::lock(&mut this.state).await;
        state.set_if_hash_not_eq(value)
    }

    /// Sets the inner value to `T::default()` and returns the result of
    /// [`Observable::set_async`].
    pub async fn take_async(this: &mut Self) -> T
    where
        T: Default,
    {
        let replacement = T::default();
        Self::set_async(this, replacement).await
    }

    /// Awaits the write lock and passes `f` to `ObservableState::update`.
    pub async fn update_async(this: &mut Self, f: impl FnOnce(&mut T)) {
        let mut state = SharedAsync::lock(&mut this.state).await;
        state.update(f);
    }

    /// Awaits the write lock and passes `f` to
    /// `ObservableState::update_if`.
    ///
    /// NOTE(review): the `bool` returned by `f` presumably tells the state
    /// whether the value changed; confirm in `ObservableState`.
    pub async fn update_if_async(this: &mut Self, f: impl FnOnce(&mut T) -> bool) {
        let mut state = SharedAsync::lock(&mut this.state).await;
        state.update_if(f);
    }
}
impl<T, L: Lock> Observable<T, L> {
pub(crate) fn from_inner(state: L::Shared<ObservableState<T>>) -> Self {
Self { state }
}
#[must_use]
pub fn subscriber_count(this: &Self) -> usize {
L::shared_read_count(&this.state)
}
pub fn into_shared(this: Self) -> SharedObservable<T, L> {
let state = unsafe { ptr::read(&this.state) };
mem::forget(this);
let rwlock = L::shared_into_inner(state);
SharedObservable::from_inner(rwlock)
}
}
/// Debug formatting, available whenever the lock strategy's shared state
/// container is itself `Debug`.
impl<T, L: Lock> fmt::Debug for Observable<T, L>
where
    L::Shared<ObservableState<T>>: fmt::Debug,
{
    /// Formats the value as a struct named `Observable` with a single
    /// `state` field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The struct name must match this type; it previously printed
        // "SharedObservable", which is a different type in this crate.
        f.debug_struct("Observable").field("state", &self.state).finish()
    }
}
/// Builds an `Observable` around `T::default()` for any lock strategy.
impl<T: Default, L: Lock> Default for Observable<T, L> {
    /// Creates the state from `T::default()`, wraps it with the lock
    /// strategy's `new_shared`, and hands it to `from_inner`.
    fn default() -> Self {
        Self::from_inner(L::new_shared(ObservableState::new(T::default())))
    }
}
/// Lets a sync-locked `Observable<T>` be read like a `&T`.
impl<T> ops::Deref for Observable<T> {
    type Target = T;

    /// Returns a reference to the inner value; same result as
    /// [`Observable::get`].
    fn deref(&self) -> &T {
        Self::get(self)
    }
}
/// Calls `close` on the state when the `Observable` goes away.
impl<T, L: Lock> Drop for Observable<T, L> {
    /// Invokes `close` on the shared state.
    ///
    /// NOTE(review): presumably `close` signals subscribers that no further
    /// updates will arrive; confirm in `ObservableState::close`.
    fn drop(&mut self) {
        let Self { state } = self;
        state.close();
    }
}