use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use crate::err::Error;
#[repr(transparent)]
pub struct Sender(pub(super) Arc<super::Shared>);
impl Sender {
pub fn push(&self, mut buf: Vec<u8>) -> Result<(), Error> {
let mut inner = self.0.inner.lock();
loop {
match self.0.try_push(&mut inner, buf) {
Ok(()) => break Ok(()),
Err(Error::WontFit(b)) => {
buf = b;
self.0.signal.wait(&mut inner);
}
Err(e) => break Err(e)
}
}
}
pub async fn push_async(&self, buf: Vec<u8>) -> Result<(), Error> {
{
let inner = self.0.inner.lock();
if let Some(max_size) = inner.q.max_size() {
if buf.len() > max_size {
return Err(Error::WontFit(buf));
}
}
}
let mut buf2 = buf;
loop {
match self.try_push(buf2) {
Ok(()) => break Ok(()),
Err(Error::WontFit(buf)) => {
let wsf = WaitSpaceFuture {
sender: self,
size: buf.len(),
waker_id: None
};
wsf.await;
buf2 = buf;
}
Err(e) => break Err(e)
}
}
}
pub fn force_push(&self, buf: Vec<u8>) -> Result<(), Error> {
let mut inner = self.0.inner.lock();
self.0.force_push(&mut inner, buf)
}
pub fn try_push(&self, buf: Vec<u8>) -> Result<(), Error> {
let mut inner = self.0.inner.lock();
self.0.try_push(&mut inner, buf)
}
}
impl Drop for Sender {
fn drop(&mut self) {
let mut inner = self.0.inner.lock();
inner.tx_count -= 1;
if inner.tx_count == 0 {
self.0.wake_senders(&mut inner);
}
}
}
pub struct WaitSpaceFuture<'snd> {
sender: &'snd Sender,
size: usize,
waker_id: Option<u32>
}
impl<'snd> Future for WaitSpaceFuture<'snd> {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Self::Output> {
let mut inner = self.sender.0.lock_inner();
let is_full_len = inner
.q
.max_len()
.map_or(false, |max_len| inner.q.len() >= max_len);
let is_full_size = if let Some(max_size) = inner.q.max_size() {
inner.q.size() + self.size > max_size
} else {
false
};
if is_full_len || is_full_size {
let id = loop {
inner.idgen = inner.idgen.wrapping_add(1);
if !inner.tx_wakers.contains_key(&inner.idgen) {
break inner.idgen;
}
};
inner.tx_wakers.insert(id, ctx.waker().clone());
drop(inner);
self.waker_id = Some(id);
Poll::Pending
} else {
Poll::Ready(())
}
}
}
impl<'snd> Drop for WaitSpaceFuture<'snd> {
fn drop(&mut self) {
if let Some(id) = self.waker_id.take() {
let mut inner = self.sender.0.lock_inner();
inner.tx_wakers.remove(&id);
}
}
}