use std::task::{Context, Poll};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::map::{Key, MapSlot};
/// Shorthand for the [`MapSlot`] with key type `T` and value type [`LaneTx`] that a
/// [`LaneRx`] stores alongside its receiver.
pub(crate) type LaneTxSlot<T, V> = MapSlot<T, LaneTx<T, V>>;
/// A sending half ([`LaneTx`]) and a receiving half ([`LaneRx`]) bundled together.
///
/// The two halves are expected to report the same tag; `from_parts` asserts this
/// when the pair is assembled.
#[derive(Debug)]
pub struct Lane<T: Key, V> {
/// Sending half of the lane.
tx: LaneTx<T, V>,
/// Receiving half of the lane.
rx: LaneRx<T, V>,
}
impl<T: Key, V> Lane<T, V> {
#[inline]
pub(crate) fn from_parts(tx: LaneTx<T, V>, rx: LaneRx<T, V>) -> Self {
assert_eq!(tx.tag(), rx.tag());
Self { tx, rx }
}
#[inline]
pub fn sender(&mut self) -> &mut LaneTx<T, V> {
&mut self.tx
}
#[inline]
pub fn receiver(&mut self) -> &mut LaneRx<T, V> {
&mut self.rx
}
#[inline]
pub fn split(self) -> (LaneTx<T, V>, LaneRx<T, V>) {
(self.tx, self.rx)
}
}
pin_project_lite::pin_project! {
/// Sending half of a lane: an mpsc [`Sender`] of `(tag, value)` pairs together with
/// the tag that `send` attaches to every value.
#[derive(Debug, Clone)]
pub struct LaneTx<T: Key, V> {
// Underlying channel sender; `#[pin]` marks it as structurally pinned for the
// projection that `pin_project!` generates.
#[pin]
inner: Sender<(T, V)>,
// Tag cloned into each outgoing message by `send`.
tag: T,
}
}
impl<T: Key, V> LaneTx<T, V> {
#[inline]
pub(crate) const fn new(inner: Sender<(T, V)>, tag: T) -> Self {
Self { tag, inner }
}
#[inline]
pub async fn send(&mut self, value: V) -> Result<(), SendError<(T, V)>> {
self.inner.send((self.tag.clone(), value)).await
}
#[inline]
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
#[inline]
pub const fn tag(&self) -> &T {
&self.tag
}
#[inline(always)]
pub(crate) fn into_inner(self) -> (T, Sender<(T, V)>) {
(self.tag, self.inner)
}
}
/// Receiving half of a lane: an mpsc [`Receiver`] of `(tag, value)` pairs together
/// with the [`LaneTxSlot`] whose validity decides whether the lane counts as closed.
#[derive(Debug)]
pub struct LaneRx<T: Key, V> {
/// Underlying channel receiver.
inner: Receiver<(T, V)>,
/// Slot consulted by `is_closed` and `tag`, and released with `manual_drop`
/// when the lane is closed or the channel yields `None`.
tx_slot: LaneTxSlot<T, V>,
}
impl<T: Key, V> LaneRx<T, V> {
#[inline]
pub(crate) const fn new(
inner: Receiver<(T, V)>,
tx_slot: MapSlot<T, LaneTx<T, V>>,
) -> Self {
Self { inner, tx_slot }
}
#[inline]
pub async fn recv(&mut self) -> Option<V> {
if self.is_closed() {
return None;
}
let value = self.inner.recv().await;
self.map_value(value)
}
#[inline]
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<V>>
where
T: Unpin,
{
if self.is_closed() {
return Poll::Ready(None);
}
self.inner.poll_recv(cx).map(|v| self.map_value(v))
}
#[inline]
pub const fn is_closed(&self) -> bool {
!self.tx_slot.is_valid()
}
#[inline]
pub fn close(&mut self) {
if !self.is_closed() {
self.inner.close();
self.tx_slot.manual_drop();
}
}
#[inline]
pub const fn tag(&self) -> &T {
self.tx_slot.key()
}
#[inline]
fn map_value(&mut self, value: Option<(T, V)>) -> Option<V> {
match value {
| Some((tag, value)) => {
debug_assert_eq!(self.tag(), &tag);
Some(value)
}
| None => {
self.tx_slot.manual_drop();
None
}
}
}
}