use std::{sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Weak}, time::Duration};
use crossbeam::queue::SegQueue;
use tokio::sync::{Notify, Semaphore};
use crate::{BoundedSendError, BoundedSendResult, BoundedSharedDetails, SendResult, SharedDetails, TimeoutBoundedSendError};
use crate::crossbeam::mpmc::base::seg_queue::{Sender as BaseSender, Receiver};
use delegate::delegate;
use std::clone::Clone;
use tokio::time::timeout;
use crate::tokio_helpers::SemaphoreController;
use std::fmt::Debug;
pub struct Sender<T>
{
base: BaseSender<T, SemaphoreController>
}
impl<T> Sender<T>
{
pub fn new(shared_details: &Arc<SharedDetails<SegQueue<T>, SemaphoreController>>, sender_count: Arc<()>, receiver_count: &Arc<()>) -> Self {
Self
{
base: BaseSender::new(shared_details, sender_count, receiver_count)
}
}
delegate!
{
to self.base
{
pub fn is_empty(&self) -> bool;
pub fn len(&self) -> usize;
}
}
delegate!
{
to self.base.receivers_notifier_ref()
{
pub fn is_closed(&self) -> bool;
}
}
pub fn send(&self, value: T) -> SendResult<T>
{
let res = self.base.send(value);
if res.is_ok()
{
self.base.receivers_notifier_ref().add_permit();
}
res
}
}
impl<T> Clone for Sender<T>
{
fn clone(&self) -> Self
{
Self
{
base: self.base.clone()
}
}
}
impl<T> Debug for Sender<T>
where T: Debug
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Sender").field("base", &self.base).finish()
}
}
impl<T> Drop for Sender<T>
{
fn drop(&mut self)
{
if self.base.sender_strong_count() == 1
{
self.base.receivers_notifier_ref().close();
}
}
}