use super::Channel;
use crate::error::SendError as LifelineSendError;
use crate::{error::type_name, impl_channel_clone, impl_channel_take};
use async_trait::async_trait;
use log::debug;
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc, oneshot, watch};
impl<T: Send + 'static> Channel for mpsc::Sender<T> {
type Tx = Self;
type Rx = mpsc::Receiver<T>;
fn channel(capacity: usize) -> (Self::Tx, Self::Rx) {
mpsc::channel(capacity)
}
fn default_capacity() -> usize {
16
}
}
impl_channel_clone!(mpsc::Sender<T>);
impl_channel_take!(mpsc::Receiver<T>);
#[async_trait]
impl<T> crate::Sender<T> for mpsc::Sender<T>
where
T: Debug + Send,
{
async fn send(&mut self, value: T) -> Result<(), LifelineSendError<T>> {
mpsc::Sender::send(self, value)
.await
.map_err(|err| LifelineSendError::Return(err.0))
}
}
#[async_trait]
impl<T> crate::Receiver<T> for mpsc::Receiver<T>
where
T: Debug + Send,
{
async fn recv(&mut self) -> Option<T> {
mpsc::Receiver::recv(self).await
}
}
impl<T: Send + Clone + 'static> Channel for broadcast::Sender<T> {
type Tx = Self;
type Rx = broadcast::Receiver<T>;
fn channel(capacity: usize) -> (Self::Tx, Self::Rx) {
broadcast::channel(capacity)
}
fn default_capacity() -> usize {
16
}
fn clone_rx(rx: &mut Option<Self::Rx>, tx: Option<&Self::Tx>) -> Option<Self::Rx> {
rx.take().or_else(|| tx.map(|tx| tx.subscribe()))
}
}
impl_channel_clone!(broadcast::Sender<T>);
impl_channel_take!(broadcast::Receiver<T>);
#[async_trait]
impl<T> crate::Sender<T> for broadcast::Sender<T>
where
T: Debug + Send,
{
async fn send(&mut self, value: T) -> Result<(), LifelineSendError<T>> {
broadcast::Sender::send(self, value)
.map(|_| ())
.map_err(|err| LifelineSendError::Return(err.0))
}
}
#[async_trait]
impl<T> crate::Receiver<T> for broadcast::Receiver<T>
where
T: Clone + Debug + Send,
{
async fn recv(&mut self) -> Option<T> {
loop {
let result = broadcast::Receiver::recv(self).await;
match result {
Ok(t) => return Some(t),
Err(broadcast::error::RecvError::Closed) => return None,
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("LAGGED {} {}", n, type_name::<T>());
continue;
}
}
}
}
}
impl<T: Send + 'static> Channel for oneshot::Sender<T> {
type Tx = Self;
type Rx = oneshot::Receiver<T>;
fn channel(_capacity: usize) -> (Self::Tx, Self::Rx) {
oneshot::channel()
}
fn default_capacity() -> usize {
1
}
}
impl_channel_take!(oneshot::Sender<T>);
impl_channel_take!(oneshot::Receiver<T>);
impl<T> Channel for watch::Sender<T>
where
T: Default + Clone + Send + Sync + 'static,
{
type Tx = Self;
type Rx = watch::Receiver<T>;
fn channel(_capacity: usize) -> (Self::Tx, Self::Rx) {
watch::channel(T::default())
}
fn default_capacity() -> usize {
1
}
}
impl_channel_take!(watch::Sender<T>);
impl_channel_clone!(watch::Receiver<T>);
#[async_trait]
impl<T> crate::Sender<T> for watch::Sender<T>
where
T: Clone + Debug + Send + Sync,
{
async fn send(&mut self, value: T) -> Result<(), LifelineSendError<T>> {
watch::Sender::send(self, value).map_err(|_| LifelineSendError::Closed)
}
}
#[async_trait]
impl<T> crate::Receiver<T> for watch::Receiver<T>
where
T: Clone + Debug + Send + Sync,
{
async fn recv(&mut self) -> Option<T> {
match self.changed().await {
Ok(_) => Some(self.borrow().clone()),
Err(_) => None,
}
}
}