use std::{collections::btree_map::Values, default, sync::{Arc, Weak}, time::Duration};
use crossbeam_queue::ArrayQueue;
use futures::executor::block_on;
use crate::{BoundedSendError, ChannelSharedDetails, LimitedWakerPermitQueue, SendResult, TimeoutBoundedSendError, crossbeam_queue::mpmc::array_queue::WeakSender};
use delegate::delegate;
use std::fmt::Debug;
#[cfg(feature="tokio")]
use tokio::time::{Instant, timeout, timeout_at};
/// Sending half of a bounded channel backed by an [`ArrayQueue`].
///
/// Every `Sender` for the same channel shares one `ChannelSharedDetails`
/// (message queue plus permit notifier) and carries two reference-counted
/// tokens that are used purely for counting live endpoints.
pub struct Sender<T> {
    /// Queue and notifier shared by every endpoint of the channel.
    shared_details: Arc<ChannelSharedDetails<ArrayQueue<T>, LimitedWakerPermitQueue>>,
    /// Token whose strong count is reported as the number of senders.
    senders_count: Arc<()>,
    /// Weak view of the receivers' token; a strong count of zero is what
    /// `is_closed` reports as "closed".
    receivers_count: Weak<()>,
}
impl<T> Sender<T> {
    /// Builds a sender from the channel's shared state and the two counting tokens.
    ///
    /// * `shared_details` - queue and notifier shared by all endpoints.
    /// * `senders_count` - token counted by [`Sender::strong_count`].
    /// * `receivers_count` - weak handle to the receivers' token, read by
    ///   [`Sender::is_closed`].
    pub fn new(
        shared_details: Arc<ChannelSharedDetails<ArrayQueue<T>, LimitedWakerPermitQueue>>,
        senders_count: Arc<()>,
        receivers_count: Weak<()>,
    ) -> Self {
        Self {
            shared_details,
            senders_count,
            receivers_count,
        }
    }

    /// Attempts to send `value` without waiting for capacity.
    ///
    /// # Errors
    ///
    /// * [`BoundedSendError::Full`] when the notifier refuses a permit
    ///   (`add_permit` returned `Some(false)`).
    /// * [`BoundedSendError::Closed`] when the notifier reports the channel as
    ///   closed, either up front (`add_permit` returned `None`) or while
    ///   retrying the push.
    ///
    /// The rejected value is handed back inside the error.
    pub fn try_send(&self, value: T) -> Result<(), BoundedSendError<T>> {
        match self.shared_details.notifier_ref().add_permit() {
            Some(true) => {
                let queue = self.shared_details.message_queue_ref();
                let mut pending = value;
                // A permit was granted, yet the queue can still reject the push.
                // NOTE(review): presumably a receiver has released its permit but
                // has not finished popping yet, so the wait is short - confirm
                // against `LimitedWakerPermitQueue`.
                loop {
                    match queue.push(pending) {
                        Ok(()) => return Ok(()),
                        Err(rejected) => {
                            if self.shared_details.notifier_ref().is_closed() {
                                return Err(BoundedSendError::Closed(rejected));
                            }
                            pending = rejected;
                            // Tell the CPU we are busy-waiting.
                            std::hint::spin_loop();
                        }
                    }
                }
            }
            Some(false) => Err(BoundedSendError::Full(value)),
            None => Err(BoundedSendError::Closed(value)),
        }
    }

    /// Pushes `value` once a permit has already been acquired.
    ///
    /// Retries until the queue accepts the value or [`Sender::is_closed`]
    /// becomes true, in which case the value is returned as `Err`.
    fn push_with_permit(&self, value: T) -> Result<(), T> {
        let queue = self.shared_details.message_queue_ref();
        let mut pending = value;
        loop {
            match queue.push(pending) {
                Ok(()) => return Ok(()),
                Err(rejected) => {
                    if self.is_closed() {
                        return Err(rejected);
                    }
                    pending = rejected;
                    // NOTE(review): the permit is already held, so the slot is
                    // expected to free up shortly - confirm against the notifier.
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Sends `value`, waiting asynchronously until a permit is available.
    ///
    /// # Errors
    ///
    /// Returns the value back as `Err` when waiting for a permit fails or when
    /// no receivers remain while the push is being retried.
    pub async fn send(&self, value: T) -> SendResult<T> {
        let acquired = self
            .shared_details
            .notifier_ref()
            .increment_permits_or_wait()
            .await;
        match acquired {
            Ok(_) => self.push_with_permit(value),
            Err(_) => Err(value),
        }
    }

    /// Blocking counterpart of [`Sender::send`]: drives the send future to
    /// completion on the current thread via `futures::executor::block_on`.
    pub fn blocking_send(&self, value: T) -> SendResult<T> {
        block_on(self.send(value))
    }

    /// Turns the outcome of a timed permit wait into the final send result.
    ///
    /// `outcome` is `Ok(inner)` when the wait finished in time (with `inner`
    /// being the permit wait's own result) and `Err(_)` when the timer fired.
    #[cfg(feature = "tokio")]
    fn finish_timed_send<P, E, X>(
        &self,
        value: T,
        outcome: Result<Result<P, E>, X>,
    ) -> Result<(), TimeoutBoundedSendError<T>> {
        match outcome {
            // Permit acquired: push with the same retry policy as `send`.
            // Bailing out with `Full` here would leave the acquired permit
            // without a queued message.
            Ok(Ok(_)) => self.push_with_permit(value).map_err(|rejected| {
                TimeoutBoundedSendError::NotTimedOut(BoundedSendError::Closed(rejected))
            }),
            // The permit wait itself failed; `send` treats this as closed.
            Ok(Err(_)) => Err(TimeoutBoundedSendError::NotTimedOut(
                BoundedSendError::Closed(value),
            )),
            // Timer fired, but a closed channel takes precedence over the timeout.
            Err(_) if self.is_closed() => Err(TimeoutBoundedSendError::NotTimedOut(
                BoundedSendError::Closed(value),
            )),
            Err(_) => Err(TimeoutBoundedSendError::TimedOut(value)),
        }
    }

    /// Sends `value`, waiting at most `duration` for a permit (tokio timer).
    ///
    /// # Errors
    ///
    /// * [`TimeoutBoundedSendError::TimedOut`] when no permit was obtained
    ///   within `duration` and receivers still exist.
    /// * [`TimeoutBoundedSendError::NotTimedOut`] wrapping
    ///   [`BoundedSendError::Closed`] when the permit wait fails or no
    ///   receivers remain.
    #[cfg(feature = "tokio")]
    pub async fn send_timeout_tokio(
        &self,
        value: T,
        duration: Duration,
    ) -> Result<(), TimeoutBoundedSendError<T>> {
        let acquire = self.shared_details.notifier_ref().increment_permits_or_wait();
        let outcome = timeout(duration, acquire).await;
        self.finish_timed_send(value, outcome)
    }

    /// Sends `value`, waiting for a permit until `deadline` (tokio timer).
    ///
    /// # Errors
    ///
    /// Same as [`Sender::send_timeout_tokio`], with the limit expressed as an
    /// absolute instant.
    #[cfg(feature = "tokio")]
    pub async fn send_timeout_at_tokio(
        &self,
        value: T,
        deadline: Instant,
    ) -> Result<(), TimeoutBoundedSendError<T>> {
        let acquire = self.shared_details.notifier_ref().increment_permits_or_wait();
        let outcome = timeout_at(deadline, acquire).await;
        self.finish_timed_send(value, outcome)
    }

    // Queue introspection, forwarded unchanged to the shared message queue.
    delegate! {
        to self.shared_details.message_queue_ref() {
            pub fn is_empty(&self) -> bool;
            pub fn len(&self) -> usize;
            pub fn capacity(&self) -> usize;
            pub fn is_full(&self) -> bool;
        }
    }

    /// Number of strong references to the senders' token.
    pub fn strong_count(&self) -> usize {
        Arc::strong_count(&self.senders_count)
    }

    /// Number of weak references to the senders' token.
    pub fn weak_count(&self) -> usize {
        Arc::weak_count(&self.senders_count)
    }

    // Strong / weak reference counts of the receivers' token, read through the
    // `Weak` handle held by this sender.
    delegate! {
        to self.receivers_count {
            #[call(strong_count)]
            pub fn receivers_strong_count(&self) -> usize;
            #[call(weak_count)]
            pub fn receivers_weak_count(&self) -> usize;
        }
    }

    /// Returns `true` when the receivers' token has no strong references left.
    pub fn is_closed(&self) -> bool {
        self.receivers_strong_count() == 0
    }

    /// Free slots in the queue: capacity minus the current length.
    ///
    /// The two reads are not atomic with respect to other endpoints, so the
    /// value is only a snapshot.
    pub fn head_room(&self) -> usize {
        let queue = self.shared_details.message_queue_ref();
        queue.capacity() - queue.len()
    }

    /// Creates a [`WeakSender`] from this sender's shared state and tokens.
    pub fn downgrade(&self) -> WeakSender<T> {
        WeakSender::new(
            &self.shared_details,
            &self.senders_count,
            &self.receivers_count,
        )
    }

    /// Returns `true` when both senders point at the same shared state.
    pub fn same_channel(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.shared_details, &other.shared_details)
    }

    /// Address of the shared state allocation, usable as a channel identity.
    pub fn shared_details_ptr_addr(&self) -> usize {
        Arc::as_ptr(&self.shared_details).addr()
    }

    /// Returns `true` when `other` reports the same shared-state address as
    /// this sender.
    pub fn same_channel_receiver(&self, other: &super::Receiver<T>) -> bool {
        self.shared_details_ptr_addr() == other.shared_details_ptr_addr()
    }
}
impl<T> Clone for Sender<T>
{
fn clone(&self) -> Self
{
Self
{
shared_details: self.shared_details.clone(),
senders_count: self.senders_count.clone(),
receivers_count: self.receivers_count.clone()
}
}
}
/// Formats the sender as a struct listing its three fields.
impl<T> Debug for Sender<T>
where
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Sender");
        repr.field("shared_details", &self.shared_details);
        repr.field("senders_count", &self.senders_count);
        repr.field("receivers_count", &self.receivers_count);
        repr.finish()
    }
}
impl<T> Drop for Sender<T>
{
fn drop(&mut self)
{
if self.strong_count() == 1
{
self.shared_details.notifier_ref().close();
}
}
}