use std::{sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Weak}, time::Duration};
use crossbeam::queue::ArrayQueue;
use tokio::sync::{Notify, Semaphore};
use crate::{BoundedSendError, BoundedSendResult, BoundedSharedDetails, SendResult, TimeoutBoundedSendError};
use crate::crossbeam::mpmc::base::array_queue::{Sender as BaseSender, Receiver};
use delegate::delegate;
use std::clone::Clone;
use tokio::time::timeout;
use crate::tokio_helpers::SemaphoreController;
use std::fmt::Debug;
/// Sending half of the channel.
///
/// A thin wrapper around the base array-queue sender, parameterised with a
/// [`SemaphoreController`] as its notifier type. All queue operations go
/// through `base`; this type layers the permit bookkeeping on top.
pub struct Sender<T> {
    /// The wrapped base sender that owns the queue handle and both notifiers.
    base: BaseSender<T, SemaphoreController>,
}
impl<T> Sender<T> {
    /// Creates a sender by forwarding all three arguments, unchanged, to
    /// `BaseSender::new`.
    pub fn new(
        shared_details: &Arc<BoundedSharedDetails<ArrayQueue<T>, SemaphoreController>>,
        sender_count: Arc<()>,
        receiver_count: &Arc<()>,
    ) -> Self {
        let base = BaseSender::new(shared_details, sender_count, receiver_count);
        Self { base }
    }

    // Read-only queue accessors. Each one is forwarded as-is to the method of
    // the same name on the wrapped base sender.
    delegate! {
        to self.base {
            pub fn capacity(&self) -> usize;
            pub fn is_empty(&self) -> bool;
            pub fn is_full(&self) -> bool;
            pub fn len(&self) -> usize;
            pub fn len_capacity(&self) -> (usize, usize);
            pub fn remaining_capacity(&self) -> usize;
        }
    }

    /// Attempts to send `value` immediately, without waiting.
    ///
    /// On success one permit is added to the receivers notifier and one permit
    /// is forgotten on the senders notifier, in that order.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the base sender's `try_send` untouched;
    /// neither notifier is modified in that case.
    pub fn try_send(&self, value: T) -> Result<(), BoundedSendError<T>> {
        // Bail out early on failure so the notifiers are only touched once the
        // value has actually been accepted.
        self.base.try_send(value)?;
        self.base.receivers_notifier_ref().add_permit();
        self.base.senders_notifier_ref().forget_permit();
        Ok(())
    }

    /// Sends `value`, waiting on the senders notifier before every attempt.
    ///
    /// Each iteration acquires a permit from the senders notifier, performs one
    /// `try_send` on the base sender and then forgets the permit, whatever the
    /// outcome of the attempt was. A `Full` result hands the value back and the
    /// loop starts over with it; a successful attempt adds a permit to the
    /// receivers notifier.
    ///
    /// If acquiring the permit itself fails (presumably because the notifier
    /// has been closed - confirm against `SemaphoreController`), a single final
    /// `try_send` is made and its result is returned as-is. No receivers permit
    /// is added on that path.
    ///
    /// # Errors
    ///
    /// Returns `BoundedSendError::Closed` as soon as the base sender reports
    /// it, or whatever error the final attempt yields after a failed acquire.
    pub async fn send(&self, value: T) -> Result<(), BoundedSendError<T>> {
        let mut pending = value;
        loop {
            // Bind the awaited result first so nothing from the acquire
            // expression outlives this statement.
            let acquisition = self.base.senders_notifier_ref().acquire().await;
            let permit = match acquisition {
                Ok(permit) => permit,
                Err(_closed) => return self.base.try_send(pending),
            };

            // Order matters: attempt first, then forget the permit.
            let attempt = self.base.try_send(pending);
            permit.forget();

            match attempt {
                Ok(()) => {
                    self.base.receivers_notifier_ref().add_permit();
                    return Ok(());
                }
                // The queue handed the value back; go around again with it.
                Err(BoundedSendError::Full(returned)) => pending = returned,
                Err(closed @ BoundedSendError::Closed(_)) => return Err(closed),
            }
        }
    }

    /// Sends `value`, waiting at most `duration` on the senders notifier.
    ///
    /// Exactly one `try_send` is attempted once the wait finishes, whether the
    /// acquire produced a permit or an error. A permit, if obtained, is
    /// forgotten right after the attempt. On success a permit is added to the
    /// receivers notifier.
    ///
    /// Unlike [`Sender::send`], a `Full` result is not retried; it is reported
    /// to the caller.
    ///
    /// # Errors
    ///
    /// * `TimeoutBoundedSendError::TimedOut(value)` when `acquire_timeout`
    ///   reports its outer error; the value is returned unsent.
    /// * `TimeoutBoundedSendError::NotTimedOut(err)` when the single attempt
    ///   fails with `err`.
    pub async fn send_or_timeout(
        &self,
        value: T,
        duration: Duration,
    ) -> Result<(), TimeoutBoundedSendError<T>> {
        let acquisition = self
            .base
            .senders_notifier_ref()
            .acquire_timeout(duration)
            .await;

        let attempt = match acquisition {
            Err(_elapsed) => return Err(TimeoutBoundedSendError::TimedOut(value)),
            Ok(Ok(permit)) => {
                // Order matters: attempt first, then forget the permit.
                let attempt = self.base.try_send(value);
                permit.forget();
                attempt
            }
            // The acquire failed without timing out; still make one attempt.
            Ok(Err(_closed)) => self.base.try_send(value),
        };

        match attempt {
            Ok(()) => {
                self.base.receivers_notifier_ref().add_permit();
                Ok(())
            }
            Err(err) => Err(TimeoutBoundedSendError::NotTimedOut(err)),
        }
    }
}
// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, whereas only the wrapped base sender needs cloning.
impl<T> Clone for Sender<T> {
    /// Returns a new sender wrapping a clone of the base sender.
    fn clone(&self) -> Self {
        let base = self.base.clone();
        Self { base }
    }
}
impl<T: Debug> Debug for Sender<T> {
    /// Formats the sender as a `Sender` struct with its single `base` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Sender");
        out.field("base", &self.base);
        out.finish()
    }
}
impl<T> Drop for Sender<T> {
    /// Closes both notifiers when the base sender reports a sender strong
    /// count of exactly one at the time of the drop.
    ///
    /// The senders notifier is closed first, then the receivers notifier.
    fn drop(&mut self) {
        // NOTE(review): this is a check-then-act on a shared count. If two
        // senders are dropped concurrently, both could presumably observe a
        // count of 2 and neither would close the notifiers - confirm against
        // how `sender_strong_count` is implemented in the base sender.
        if self.base.sender_strong_count() != 1 {
            return;
        }

        self.base.senders_notifier_ref().close();
        self.base.receivers_notifier_ref().close();
    }
}