use super::{state::Side, Cursor, PushError, Result, State};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Sending handle: a newtype over [`State<T>`].
///
/// Dropping a `Sender` closes the `Side::Sender` side of the wrapped state.
#[derive(Debug)]
pub struct Sender<T>(pub(super) State<T>);
impl<T> Sender<T> {
    /// Returns the capacity reported by the state's cursor.
    #[inline]
    pub fn capacity(&self) -> usize {
        let cursor = &self.0.cursor;
        cursor.capacity()
    }

    /// Returns a [`SendSlice`] over the current state without checking for
    /// capacity first.
    ///
    /// The cursor is copied at this point; the slice hands that copy to
    /// `persist_tail` when it is dropped.
    #[inline]
    pub fn slice(&mut self) -> SendSlice<'_, T> {
        let snapshot = self.0.cursor;
        let state = &mut self.0;
        SendSlice(state, snapshot)
    }

    /// Waits until [`Self::poll_slice`] resolves and discards the slice it yields.
    ///
    /// # Errors
    ///
    /// Returns whatever error `poll_slice` resolves with.
    #[inline]
    pub async fn acquire(&mut self) -> Result<()> {
        let pending = Acquire { sender: self };
        pending.await
    }

    /// Polls for send capacity.
    ///
    /// * `Poll::Ready(Ok(slice))` as soon as `acquire_capacity` reports `true`.
    /// * `Poll::Ready(Err(_))` if `acquire_capacity` fails.
    /// * `Poll::Pending` otherwise, after the task's waker has been registered
    ///   with the sender-side waker slot of the state.
    #[inline]
    pub fn poll_slice(&mut self, cx: &mut Context) -> Poll<Result<SendSlice<'_, T>>> {
        // Fast path: no need to touch the waker if capacity is already there.
        // `?` turns an `Err` into `Poll::Ready(Err(_))`.
        if self.0.acquire_capacity()? {
            return Poll::Ready(Ok(self.slice()));
        }

        self.0.sender.register(cx.waker());

        // Re-check after registering so that capacity freed between the first
        // check and the registration is not missed.
        if self.0.acquire_capacity()? {
            return Poll::Ready(Ok(self.slice()));
        }

        Poll::Pending
    }

    /// Non-blocking variant of [`Self::poll_slice`].
    ///
    /// Returns `Ok(Some(slice))` when `acquire_capacity` reports `true` and
    /// `Ok(None)` when it reports `false`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `acquire_capacity`.
    #[inline]
    pub fn try_slice(&mut self) -> Result<Option<SendSlice<'_, T>>> {
        if !self.0.acquire_capacity()? {
            return Ok(None);
        }
        Ok(Some(self.slice()))
    }
}
impl<T> Drop for Sender<T> {
    /// Closes the sender side of the wrapped state when the handle goes away.
    #[inline]
    fn drop(&mut self) {
        let side = Side::Sender;
        self.0.close(side);
    }
}
/// Borrowed view used to push entries through a [`Sender`].
///
/// Field `0` is the mutably borrowed state; field `1` is the copy of the
/// cursor taken when the slice was created. On drop, that copy is passed to
/// `persist_tail` on the state.
#[derive(Debug)]
pub struct SendSlice<'a, T>(&'a mut State<T>, Cursor);
impl<T> SendSlice<'_, T> {
    /// Writes a single `value` and advances the cursor's tail by one.
    ///
    /// # Errors
    ///
    /// * `PushError::Full(value)` hands the value back when the cursor is full
    ///   and `acquire_capacity` reports that no room was gained.
    /// * A failure of `acquire_capacity` is converted into a `PushError` by `?`.
    #[inline]
    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
        // `acquire_capacity` is only consulted when the local cursor already
        // looks full.
        if self.0.cursor.is_full() {
            let gained_room = self.0.acquire_capacity()?;
            if !gained_room {
                return Err(PushError::Full(value));
            }
        }

        let (_, free) = self.0.as_pairs();
        // SAFETY: reaching this point means the cursor was not full, or
        // `acquire_capacity` reported room, so index 0 of the second pair is
        // assumed to be a writable, unoccupied slot.
        // NOTE(review): confirm against the contract of `write`.
        unsafe {
            free.write(0, value);
        }
        self.0.cursor.increment_tail(1);

        Ok(())
    }

    /// Moves items from `iter` into the slice until the iterator is exhausted
    /// or [`Self::capacity`] items have been written, then advances the tail
    /// by the number written.
    ///
    /// When `acquire_capacity` reports `false`, nothing is pulled from `iter`.
    /// Items that do not fit are left in the iterator.
    ///
    /// # Errors
    ///
    /// Propagates the error from `acquire_capacity`.
    pub fn extend<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> Result<()> {
        if !self.0.acquire_capacity()? {
            return Ok(());
        }

        let (_, free) = self.0.as_pairs();
        let limit = self.capacity();
        let mut written = 0;

        // `take(limit)` stops pulling from `iter` once `limit` items are written.
        for value in iter.by_ref().take(limit) {
            // SAFETY: `written < limit` here, and `limit` is the send capacity
            // reported by the cursor, so the index is assumed to address a
            // writable, unoccupied slot.
            // NOTE(review): confirm against the contract of `write`.
            unsafe {
                free.write(written, value);
            }
            written += 1;
        }

        self.0.cursor.increment_tail(written);
        Ok(())
    }

    /// Returns the send capacity reported by the state's cursor.
    #[inline]
    pub fn capacity(&self) -> usize {
        let cursor = &self.0.cursor;
        cursor.send_capacity()
    }

    /// Calls `acquire_capacity` on the state and discards the boolean it
    /// returns.
    ///
    /// # Errors
    ///
    /// Returns the `acquire_capacity` error, converted to `ClosedError`.
    #[inline]
    pub fn sync(&mut self) -> Result<(), super::ClosedError> {
        match self.0.acquire_capacity() {
            Ok(_) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}
impl<T> Drop for SendSlice<'_, T> {
    /// Passes the cursor copy taken at creation time to `persist_tail` on the
    /// state.
    #[inline]
    fn drop(&mut self) {
        let snapshot = self.1;
        self.0.persist_tail(snapshot);
    }
}
/// Future created by `Sender::acquire`.
///
/// Holds the sender mutably for as long as the future is alive.
struct Acquire<'a, T> {
    /// Sender whose `poll_slice` is driven by this future.
    sender: &'a mut Sender<T>,
}
impl<T> Future for Acquire<'_, T> {
    type Output = Result<()>;

    /// Forwards to `Sender::poll_slice`, replacing a yielded slice with `()`.
    ///
    /// The slice is dropped as soon as it is received.
    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.sender
            .poll_slice(cx)
            .map(|outcome| outcome.map(|_| ()))
    }
}