use std::{
collections::VecDeque,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use parking_lot::{Mutex, MutexGuard};
use super::Inner;
use crate::err::Error;
pub struct Sender<T, E>(pub(super) Arc<super::Shared<T, E>>);
impl<T, E> Sender<T, E> {
#[inline]
fn lock_inner(&self) -> MutexGuard<'_, Inner<T, E>> {
self.0.inner.lock()
}
#[inline]
fn signal_receiver(&self, inner: &mut Inner<T, E>) {
self.0.signal.notify_all();
if let Some(ref mut rctx) = inner.rctx {
if let Some(waker) = rctx.waker.take() {
waker.wake();
}
}
}
#[inline]
fn check_error(&self, inner: &mut Inner<T, E>) -> Result<(), Error<E>> {
if let Some(ref mut sctx) = inner.sctx {
if let Some(err) = sctx.error.take() {
return Err(err);
}
}
if inner.rctx.is_none() {
return Err(Error::ReceiverDisappeared);
}
Ok(())
}
#[inline]
fn queue_full(&self, inner: &mut Inner<T, E>) -> bool {
self.0.queue_full(inner)
}
#[inline]
fn blocking_wait(&self, inner: &mut MutexGuard<'_, Inner<T, E>>) {
self.0.signal.wait(inner);
}
fn wait_for_space(
&self,
inner: &mut MutexGuard<'_, Inner<T, E>>
) -> Result<(), Error<E>> {
self.check_error(inner)?;
loop {
if !self.queue_full(inner) {
break;
}
self.blocking_wait(inner);
self.check_error(inner)?;
}
Ok(())
}
#[inline]
fn expected(&self, inner: &mut Inner<T, E>) -> Option<(usize, usize)> {
if let (Some(expected), Some(sent)) = (self.0.num_records, inner.sent_recs)
{
Some((expected, sent))
} else {
None
}
}
}
impl<T, E> Sender<T, E> {
pub fn send(&self, n: T) -> Result<(), Error<E>> {
let mut inner = self.lock_inner();
self.wait_for_space(&mut inner)?;
inner.push(n);
self.signal_receiver(&mut inner);
Ok(())
}
pub fn send_batch<I>(&self, it: I) -> Result<(), Error<E>>
where
I: Iterator<Item = T>
{
let mut inner = self.lock_inner();
self.check_error(&mut inner)?;
let mut did_push = false;
for n in it {
while self.queue_full(&mut inner) {
if did_push {
self.signal_receiver(&mut inner);
did_push = false;
}
self.wait_for_space(&mut inner)?;
}
inner.push(n);
did_push = true;
}
if did_push {
self.signal_receiver(&mut inner);
}
Ok(())
}
pub fn try_send(&self, n: T) -> Result<(), Error<E>> {
let mut inner = self.lock_inner();
self.check_error(&mut inner)?;
if self.queue_full(&mut inner) {
Err(Error::QueueFull)
} else {
inner.push(n);
self.signal_receiver(&mut inner);
Ok(())
}
}
pub fn send_async(&self, n: T) -> SendFuture<T, E> {
let mut q = VecDeque::new();
q.push_back(n);
SendFuture {
snd: self,
outq: Mutex::new(q)
}
}
pub fn send_batch_async<I>(&self, it: I) -> SendFuture<T, E>
where
I: Iterator<Item = T>
{
let q = VecDeque::from_iter(it);
SendFuture {
snd: self,
outq: Mutex::new(q)
}
}
pub fn fail(self, e: E) {
let mut inner = self.lock_inner();
if let Some(ref mut rctx) = inner.rctx {
rctx.error = Some(Error::App(e));
self.signal_receiver(&mut inner);
}
}
}
impl<T, E> Drop for Sender<T, E> {
fn drop(&mut self) {
let mut inner = self.lock_inner();
let _ = inner.sctx.take();
if let Some((expected, sent)) = self.expected(&mut inner) {
if sent < expected {
if let Some(ref mut rctx) = inner.rctx {
rctx.error = Some(Error::RecordsUnderflow);
}
}
}
self.signal_receiver(&mut inner);
}
}
pub struct SendFuture<'a, T, E> {
snd: &'a Sender<T, E>,
outq: Mutex<VecDeque<T>>
}
impl<'a, T, E> Future for SendFuture<'a, T, E> {
type Output = Result<(), Error<E>>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut inner = self.snd.lock_inner();
if let Err(e) = self.snd.check_error(&mut inner) {
return Poll::Ready(Err(e));
};
if self.snd.queue_full(&mut inner) {
if let Some(ref mut sctx) = inner.sctx {
sctx.waker = Some(ctx.waker().clone());
return Poll::Pending;
} else {
return Poll::Ready(Ok(()));
}
}
let mut outq = self.outq.lock();
let mut did_push = false;
let ret = loop {
let Some(n) = outq.pop_front() else {
break Poll::Ready(Ok(()));
};
inner.push(n);
did_push = true;
if outq.is_empty() {
break Poll::Ready(Ok(()));
};
if self.snd.queue_full(&mut inner) {
if let Some(ref mut sctx) = inner.sctx {
sctx.waker = Some(ctx.waker().clone());
break Poll::Pending;
} else {
break Poll::Ready(Ok(()));
}
}
};
if did_push {
self.snd.signal_receiver(&mut inner);
}
ret
}
}