use std::{
future::Future,
mem::ManuallyDrop,
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use crate::err::Error;
#[derive(Default)]
enum DropAction {
#[default]
ReturnToQueue,
Drop,
Nothing
}
pub struct MustHandle {
sh: Arc<super::Shared>,
inner: ManuallyDrop<Vec<u8>>,
drop_action: DropAction
}
impl MustHandle {
fn new(sh: Arc<super::Shared>, inner: Vec<u8>) -> Self {
Self {
sh,
inner: ManuallyDrop::new(inner),
drop_action: DropAction::default()
}
}
pub fn handled(mut self) {
self.drop_action = DropAction::Drop;
}
#[must_use]
pub fn into_inner(mut self) -> Vec<u8> {
self.drop_action = DropAction::Nothing;
unsafe { ManuallyDrop::take(&mut self.inner) }
}
}
impl Deref for MustHandle {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.inner
}
}
impl DerefMut for MustHandle {
fn deref_mut(&mut self) -> &mut [u8] {
&mut self.inner
}
}
impl Drop for MustHandle {
fn drop(&mut self) {
match self.drop_action {
DropAction::ReturnToQueue => {
let t = unsafe { ManuallyDrop::take(&mut self.inner) };
let mut inner = self.sh.inner.lock();
let _ = inner.q.try_return(t);
}
DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
DropAction::Nothing => {}
}
}
}
#[repr(transparent)]
pub struct Receiver(pub(super) Arc<super::Shared>);
impl Receiver {
#[allow(clippy::significant_drop_tightening)]
#[must_use]
pub fn pop(&self) -> Option<Vec<u8>> {
let mut inner = self.0.inner.lock();
loop {
if let Some(buf) = inner.pop() {
self.0.wake_senders(&mut inner);
break Some(buf);
}
if inner.tx_count == 0 {
break None;
}
self.0.signal.wait(&mut inner);
}
}
#[must_use]
pub fn pop_managed(&self) -> Option<MustHandle> {
let n = self.pop()?;
Some(MustHandle::new(Arc::clone(&self.0), n))
}
pub fn try_pop(&self) -> Result<Option<Vec<u8>>, Error> {
let mut inner = self.0.inner.lock();
self.0.pop(&mut inner)
}
pub fn try_pop_managed(&self) -> Result<Option<MustHandle>, Error> {
let mut inner = self.0.inner.lock();
Ok(
self
.0
.pop(&mut inner)?
.map(|n| MustHandle::new(Arc::clone(&self.0), n))
)
}
#[must_use]
pub fn apop(&self) -> RecvFuture {
RecvFuture {
sh: Arc::clone(&self.0),
waker_id: None
}
}
#[must_use]
pub fn apop_managed(&self) -> RecvManagedFuture {
RecvManagedFuture {
sh: Arc::clone(&self.0),
waker_id: None
}
}
}
impl Drop for Receiver {
fn drop(&mut self) {
let mut inner = self.0.inner.lock();
inner.rx_count -= 1;
if inner.rx_count == 0 {
self.0.wake_senders(&mut inner);
}
}
}
pub struct RecvFuture {
sh: Arc<super::Shared>,
waker_id: Option<u32>
}
impl Future for RecvFuture {
type Output = Result<Vec<u8>, Error>;
fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Self::Output> {
let mut inner = self.sh.lock_inner();
match self.sh.pop(&mut inner) {
Ok(Some(buf)) => Poll::Ready(Ok(buf)),
Ok(None) => {
let id = loop {
inner.idgen = inner.idgen.wrapping_add(1);
if !inner.rx_wakers.contains_key(&inner.idgen) {
break inner.idgen;
}
};
inner.rx_wakers.insert(id, ctx.waker().clone());
drop(inner);
self.waker_id = Some(id);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e))
}
}
}
impl Drop for RecvFuture {
fn drop(&mut self) {
if let Some(id) = self.waker_id.take() {
let mut inner = self.sh.lock_inner();
inner.rx_wakers.remove(&id);
}
}
}
pub struct RecvManagedFuture {
sh: Arc<super::Shared>,
waker_id: Option<u32>
}
impl Future for RecvManagedFuture {
type Output = Result<MustHandle, Error>;
fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Self::Output> {
let mut inner = self.sh.lock_inner();
match self.sh.pop(&mut inner) {
Ok(Some(buf)) => {
let ret = MustHandle::new(Arc::clone(&self.sh), buf);
Poll::Ready(Ok(ret))
}
Ok(None) => {
let id = loop {
inner.idgen = inner.idgen.wrapping_add(1);
if !inner.rx_wakers.contains_key(&inner.idgen) {
break inner.idgen;
}
};
inner.rx_wakers.insert(id, ctx.waker().clone());
drop(inner);
self.waker_id = Some(id);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e))
}
}
}
impl Drop for RecvManagedFuture {
fn drop(&mut self) {
if let Some(id) = self.waker_id.take() {
let mut inner = self.sh.lock_inner();
inner.rx_wakers.remove(&id);
}
}
}