use std::sync::Arc;
#[cfg(feature="use_std_sync")]
use std::sync::{ RwLockReadGuard, RwLockWriteGuard, TryLockError };
#[cfg(feature="tokio")]
use std::time::Duration;
#[cfg(feature="tokio")]
use tokio::time::{error::Elapsed, timeout, timeout_at};
use accessorise::impl_get_ref;
use futures::executor::block_on;
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
use parking_lot::{ RwLockReadGuard, RwLockWriteGuard };
use crate::{ItemUpdater, PreferredRwLockType, shared_reading_and_writing::NotifyingSharedInternals};
use pastey::paste;
use delegate::delegate;
use super::{Reader, WeakNotifyingSharedReader};
pub struct NotifyingSharedReader<T, I, U>
where U: ItemUpdater<I>,
I: Clone + PartialEq + Unpin
{
internals: Arc<NotifyingSharedInternals<T, I, U>>,
current_item: I
}
impl<T, I, U> NotifyingSharedReader<T, I, U>
where U: ItemUpdater<I>,
I: Clone + PartialEq + Unpin
{
pub fn new(internals: Arc<NotifyingSharedInternals<T, I, U>>) -> Self {
Self
{
internals,
current_item: U::init()
}
}
pub fn new_current_item_from_notifier(internals: Arc<NotifyingSharedInternals<T, I, U>>) -> Self {
let current_item;
if let Some(val) = internals.notifier.get_item()
{
current_item = val;
}
else
{
current_item = U::init();
}
Self
{
internals,
current_item
}
}
impl_get_ref!(current_item, I);
#[cfg(feature="use_std_sync")]
fn read_get_rg(&self) -> RwLockReadGuard<'_, T>
{
let lock_result = self.internals.rw_lock.read();
match lock_result
{
Ok(mg) =>
{
mg
}
Err(err) =>
{
self.internals.rw_lock.clear_poison();
err.into_inner()
}
}
}
async fn wake_me_with_item(&mut self)
{
match self.internals.notifier.wake_me_with_item(self.current_item.clone()).await
{
Ok(val) =>
{
self.current_item = val;
}
Err(_) => {}
}
}
#[cfg(feature="use_std_sync")]
pub async fn read(&mut self) -> Reader<'_, T>
{
self.wake_me_with_item().await;
Reader::new(self.read_get_rg())
}
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
pub async fn read(&mut self) -> Reader<'_, T>
{
self.wake_me_with_item().await;
Reader::new(self.internals.rw_lock.read())
}
pub fn blocking_read(&mut self) -> Reader<'_, T>
{
block_on(self.read())
}
#[cfg(feature="tokio")]
pub async fn read_timeout_tokio(&mut self, duration: Duration) -> Result<Reader<'_, T>, Elapsed>
{
timeout(duration, self.read()).await
}
#[cfg(feature="tokio")]
pub async fn read_timeout_at_tokio(&mut self, deadline: tokio::time::Instant) -> Result<Reader<'_, T>, Elapsed>
{
timeout_at(deadline, self.read()).await
}
pub async fn read_clone(&mut self) -> T
where T: Clone
{
self.wake_me_with_item().await;
(*self.read().await).clone()
}
pub fn blocking_read_clone(&mut self) -> T
where T: Clone
{
block_on(self.read_clone())
}
#[cfg(feature="tokio")]
pub async fn read_clone_timeout_tokio(&mut self, duration: Duration) -> Result<T, Elapsed>
where T: Clone
{
timeout(duration, self.read_clone()).await
}
#[cfg(feature="tokio")]
pub async fn read_clone_timeout_at_tokio(&mut self, deadline: tokio::time::Instant) -> Result<T, Elapsed>
where T: Clone
{
timeout_at(deadline, self.read_clone()).await
}
#[cfg(feature="use_std_sync")]
pub fn read_dont_wait(&self) -> Reader<'_, T>
{
Reader::new(self.read_get_rg())
}
#[cfg(any(feature="use_parking_lot_sync", feature="use_parking_lot_fair_sync"))]
pub fn read_dont_wait(&self) -> Reader<'_, T>
{
Reader::new(self.internals.rw_lock.read())
}
pub fn read_clone_dont_wait(&self) -> T
where T: Clone
{
(*self.read_dont_wait()).clone()
}
pub fn strong_count(&self) -> usize
{
Arc::strong_count(&self.internals)
}
pub fn weak_count(&self) -> usize
{
Arc::weak_count(&self.internals)
}
pub fn downgrade(&self) -> WeakNotifyingSharedReader<T, I, U>
{
WeakNotifyingSharedReader::new(&self.internals)
}
delegate!
{
to self.internals
{
pub fn is_closed(&self);
}
}
}