use std::ops::ControlFlow;
pub mod oneshot;
pub mod parker;
#[cfg(test)]
mod tests;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum Sent {
Closed,
Ok,
}
impl From<ControlFlow<Sent>> for Sent {
fn from(value: ControlFlow<Sent>) -> Self {
match value {
ControlFlow::Continue(_) => Sent::Ok,
ControlFlow::Break(v) => v,
}
}
}
pub trait Sender {
type Value;
fn send(&mut self, value: Self::Value) -> ControlFlow<Sent>;
fn may_send(&self) -> bool {
true
}
}
pub struct SenderRef<'a, T> {
pub(super) sender: &'a mut dyn Sender<Value = T>,
pub(super) flag: &'a mut bool,
}
impl<'a, T> SenderRef<'a, T> {
pub fn did_send(&self) -> bool {
*self.flag
}
pub fn send(&mut self, value: T) -> ControlFlow<Sent> {
let result = self.sender.send(value);
*self.flag |= !matches!(result, ControlFlow::Break(Sent::Closed));
result
}
pub fn may_send(&self) -> bool {
self.sender.may_send()
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct ClosedSender<T>(std::marker::PhantomData<fn(T)>);
impl<T> Sender for ClosedSender<T> {
type Value = T;
fn send(&mut self, _value: T) -> ControlFlow<Sent> {
ControlFlow::Break(Sent::Closed)
}
fn may_send(&self) -> bool {
false
}
}
impl<T> Sender for std::sync::mpsc::Sender<T> {
type Value = T;
fn send(&mut self, value: T) -> ControlFlow<Sent> {
if std::sync::mpsc::Sender::send(&*self, value).is_ok() {
ControlFlow::Continue(())
} else {
ControlFlow::Break(Sent::Closed)
}
}
}
#[cfg(feature = "tokio")]
impl<T> Sender for Option<tokio::sync::oneshot::Sender<T>> {
type Value = T;
fn send(&mut self, value: Self::Value) -> ControlFlow<Sent> {
if let Some(sender) = self.take() {
if sender.send(value).is_ok() {
return ControlFlow::Break(Sent::Ok);
}
}
ControlFlow::Break(Sent::Closed)
}
}
#[cfg(feature = "tokio")]
impl<T> Sender for tokio::sync::mpsc::UnboundedSender<T> {
type Value = T;
fn send(&mut self, value: T) -> ControlFlow<Sent> {
if tokio::sync::mpsc::UnboundedSender::send(&*self, value).is_ok() {
ControlFlow::Continue(())
} else {
ControlFlow::Break(Sent::Closed)
}
}
}
#[cfg(feature = "tokio")]
impl<T> Sender for tokio::sync::mpsc::WeakUnboundedSender<T> {
type Value = T;
fn send(&mut self, value: T) -> ControlFlow<Sent> {
if let Some(sender) = self.upgrade() {
if sender.send(value).is_ok() {
return ControlFlow::Continue(());
}
}
ControlFlow::Break(Sent::Closed)
}
}
pub trait ChannelSpec {
type Oneshot<T>;
type Queue<T>;
fn new_oneshot<T: 'static + Send>(
&self,
) -> (Box<dyn Sender<Value = T> + Send>, Self::Oneshot<T>);
fn new_queue<T: 'static + Send>(&self) -> (Box<dyn Sender<Value = T> + Send>, Self::Queue<T>);
}
pub struct SyncChannels;
#[cfg(feature = "tokio")]
pub struct TokioChannels;
impl ChannelSpec for SyncChannels {
type Oneshot<T> = (self::oneshot::Receiver<T>, self::parker::Parker);
type Queue<T> = std::sync::mpsc::Receiver<T>;
fn new_oneshot<T: 'static + Send>(
&self,
) -> (Box<dyn Sender<Value = T> + Send>, Self::Oneshot<T>) {
let (send, recv) = self::oneshot::channel();
let (unparker, parker) = self::parker::new(Some(send));
(Box::new(unparker), (recv, parker))
}
fn new_queue<T: 'static + Send>(&self) -> (Box<dyn Sender<Value = T> + Send>, Self::Queue<T>) {
let (send, recv) = std::sync::mpsc::channel();
(Box::new(send), recv)
}
}
#[cfg(feature = "tokio")]
impl ChannelSpec for TokioChannels {
type Oneshot<T> = tokio::sync::oneshot::Receiver<T>;
type Queue<T> = tokio::sync::mpsc::UnboundedReceiver<T>;
fn new_oneshot<T: 'static + Send>(
&self,
) -> (Box<dyn Sender<Value = T> + Send>, Self::Oneshot<T>) {
let (send, recv) = tokio::sync::oneshot::channel();
(Box::new(Some(send)), recv)
}
fn new_queue<T: 'static + Send>(&self) -> (Box<dyn Sender<Value = T> + Send>, Self::Queue<T>) {
let (send, recv) = tokio::sync::mpsc::unbounded_channel();
(Box::new(send), recv)
}
}