use std::{sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Weak}, thread::{sleep, sleep_ms, Thread}, time::Duration};
use crossbeam::queue::SegQueue;
use tokio::{sync::Notify, time::timeout};
use crate::{BoundedSharedDetails, ReceiveError, ReceiveResult, SharedDetails, TimeoutReceiveError};
use crate::crossbeam::mpmc::base::seg_queue::{Sender, Receiver as BaseReceiver};
use delegate::delegate;
use std::clone::Clone;
use crate::tokio_helpers::SemaphoreController;
use std::fmt::Debug;
/// Receiving handle that wraps a [`BaseReceiver`] parameterised with a
/// [`SemaphoreController`] as its notifier.
///
/// Every operation on this type is forwarded to the wrapped base receiver or
/// to the notifier it exposes through `receivers_notifier_ref()`.
pub struct Receiver<T> {
    /// The wrapped receiver that owns the queue access and the notifier.
    base: BaseReceiver<T, SemaphoreController>,
}
impl<T> Receiver<T> {
    /// Builds a receiver by forwarding all three arguments, unchanged, to
    /// [`BaseReceiver::new`].
    pub fn new(
        shared_details: Arc<SharedDetails<SegQueue<T>, SemaphoreController>>,
        sender_count: Weak<()>,
        receiver_count: Arc<()>,
    ) -> Self {
        Self {
            base: BaseReceiver::new(shared_details, sender_count, receiver_count),
        }
    }

    // `is_empty` and `len` are answered directly by the wrapped base receiver.
    delegate! {
        to self.base {
            pub fn is_empty(&self) -> bool;
            pub fn len(&self) -> usize;
        }
    }

    // `is_closed` is answered by the notifier exposed by the base receiver.
    delegate! {
        to self.base.receivers_notifier_ref() {
            pub fn is_closed(&self) -> bool;
        }
    }

    /// Attempts to take an item without waiting.
    ///
    /// The result of the base receiver's `try_recv` is returned as-is. When it
    /// is `Ok`, `forget_permit()` is additionally called on the notifier.
    pub fn try_recv(&self) -> ReceiveResult<T> {
        let received = self.base.try_recv();
        if received.is_ok() {
            // NOTE(review): presumably this keeps the notifier's permit count
            // in step with the queue, since this path takes an item without
            // going through `acquire` — confirm against `SemaphoreController`.
            self.base.receivers_notifier_ref().forget_permit();
        }
        received
    }

    /// Waits on the notifier's `acquire()` and then makes a single `try_recv`
    /// attempt on the base receiver, returning that attempt's result.
    ///
    /// # Errors
    ///
    /// Returns whatever error the base receiver's `try_recv` reports.
    pub async fn recv(&self) -> ReceiveResult<T> {
        let acquired = self.base.receivers_notifier_ref().acquire().await;
        if let Ok(permit) = acquired {
            // Forget the permit instead of dropping it, so it is not handed
            // back to the notifier.
            permit.forget();
        }
        // A failed acquire still falls through to one receive attempt.
        // NOTE(review): presumably acquire fails once the notifier is closed
        // and this lets already-queued items be drained — confirm.
        self.base.try_recv()
    }

    /// Like [`Receiver::recv`], but gives up once `duration` has elapsed while
    /// waiting on the notifier's `acquire_timeout`.
    ///
    /// # Errors
    ///
    /// * [`TimeoutReceiveError::TimedOut`] when `acquire_timeout` itself
    ///   returns an error.
    /// * [`TimeoutReceiveError::NotTimedOut`] wrapping the error of the base
    ///   receiver's `try_recv` otherwise.
    pub async fn recv_or_timeout(&self, duration: Duration) -> Result<T, TimeoutReceiveError> {
        let outcome = self
            .base
            .receivers_notifier_ref()
            .acquire_timeout(duration)
            .await;
        let acquired = match outcome {
            Ok(acquired) => acquired,
            Err(_elapsed) => return Err(TimeoutReceiveError::TimedOut),
        };

        // The receive attempt is made whether or not a permit was obtained.
        let received = self.base.try_recv();
        if let Ok(permit) = acquired {
            // Forget the permit instead of dropping it, so it is not handed
            // back to the notifier.
            permit.forget();
        }
        received.map_err(TimeoutReceiveError::NotTimedOut)
    }
}
/// Implemented by hand rather than derived so that no `T: Clone` bound is
/// placed on the item type.
impl<T> Clone for Receiver<T> {
    /// Returns a new handle wrapping a clone of the base receiver.
    fn clone(&self) -> Self {
        let base = self.base.clone();
        Self { base }
    }
}
/// Debug output is available whenever the item type is itself `Debug`.
impl<T: Debug> Debug for Receiver<T> {
    /// Formats the handle as a `Receiver` struct with its single `base` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendered = f.debug_struct("Receiver");
        rendered.field("base", &self.base);
        rendered.finish()
    }
}
impl<T> Drop for Receiver<T> {
    /// Closes the receivers notifier when the base receiver reports a
    /// receiver strong count of exactly one at the time of the drop.
    fn drop(&mut self) {
        // NOTE(review): a count of 1 presumably means this is the last live
        // receiver handle — confirm against `BaseReceiver`.
        if self.base.receiver_strong_count() != 1 {
            return;
        }
        self.base.receivers_notifier_ref().close();
    }
}