use async_trait::async_trait;
#[must_use = "transmit do nothing unless polled"]
#[async_trait]
pub trait Transmit {
type Item;
type Error;
async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
where
Self::Item: 'async_trait;
}
#[async_trait]
impl<T> Transmit for &mut T
where
T: Transmit + Send + ?Sized,
T::Item: Send,
{
type Item = T::Item;
type Error = T::Error;
async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
where
Self::Item: 'async_trait,
{
let this = &mut **self;
this.transmit(item).await
}
}
#[allow(dead_code)]
pub(crate) fn assert_transmit<I, E, T>(t: T) -> T
where
T: Transmit<Item = I, Error = E>,
{
t
}
#[cfg(feature = "with-async-channel")]
mod async_channel;
#[cfg(feature = "with-async-channel")]
pub use self::async_channel::*;
#[cfg(feature = "with-tokio")]
mod tokio;
#[cfg(feature = "with-tokio")]
pub use self::tokio::*;
#[cfg(feature = "with-sink")]
mod from_sink;
#[cfg(feature = "with-sink")]
pub use from_sink::FromSink;
mod with;
pub use with::With;
mod transmit_map_err;
pub use transmit_map_err::TransmitMapErr;
impl<T: ?Sized> TransmitExt for T where T: Transmit {}
#[cfg(feature = "with-sink")]
pub fn from_sink<S, I>(sink: S) -> FromSink<S, I>
where
S: futures_sink::Sink<I> + Unpin + Send,
I: Send,
S::Error: Send,
{
assert_transmit::<I, S::Error, _>(from_sink::FromSink::from(sink))
}
pub trait TransmitExt: Transmit {
fn with<F, U>(self, f: F) -> with::With<Self, F, Self::Item, U, Self::Error>
where
Self: Sized + Send,
Self::Item: Send,
Self::Error: Send,
F: FnMut(U) -> Self::Item + Send,
U: Send,
{
assert_transmit::<U, Self::Error, _>(with::With::new(self, f))
}
fn transmit_map_err<E, F>(self, f: F) -> TransmitMapErr<Self, F>
where
Self: Sized + Send,
Self::Item: Send,
F: FnOnce(Self::Error) -> E + Send,
{
assert_transmit::<Self::Item, E, _>(TransmitMapErr::new(self, f))
}
}
#[cfg(test)]
mod tests {
use super::assert_transmit;
use super::*;
use anyhow::Result;
use futures::channel::mpsc;
use futures::prelude::*;
use futures_await_test::async_test;
#[test]
fn transmit_mut_ref_ok() -> Result<()> {
struct DummyTransmitter {}
#[async_trait]
impl Transmit for DummyTransmitter {
type Item = String;
type Error = anyhow::Error;
async fn transmit(&mut self, _item: Self::Item) -> Result<(), Self::Error>
where
Self::Item: 'async_trait,
{
unimplemented!();
}
}
let mut t = DummyTransmitter {};
let mut_t = &mut t;
assert_transmit(mut_t);
Ok(())
}
#[cfg(feature = "with-sink")]
#[async_test]
async fn transmit_ext_from_sink_is_transmit() -> Result<()> {
let (s, mut r) = mpsc::unbounded::<&'static str>();
let mut t = assert_transmit(from_sink(s));
t.transmit("Hello").await?;
t.transmit("World").await?;
drop(t);
assert_eq!(r.next().await, Some("Hello"));
assert_eq!(r.next().await, Some("World"));
assert_eq!(r.next().await, None);
Ok(())
}
#[async_test]
async fn transmit_ext_with_is_transmit() -> Result<()> {
let (s, mut r) = mpsc::unbounded::<String>();
struct DummyTransmitter<T> {
sender: mpsc::UnboundedSender<T>,
}
#[async_trait]
impl<T> Transmit for DummyTransmitter<T>
where
T: Send,
{
type Item = T;
type Error = anyhow::Error;
async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
where
Self::Item: 'async_trait,
{
self.sender.send(item).await.map_err(Into::into)
}
}
let t = assert_transmit(DummyTransmitter { sender: s });
let mut t = t.with(|s| format!("!!!{}!!!", s));
t.transmit("Hello").await?;
t.transmit("World").await?;
drop(t);
assert_eq!(r.next().await, Some("!!!Hello!!!".to_string()));
assert_eq!(r.next().await, Some("!!!World!!!".to_string()));
assert_eq!(r.next().await, None);
Ok(())
}
#[async_test]
async fn transmit_ext_transmit_map_err_is_transmit() -> Result<()> {
struct DummyTransmitter {}
#[async_trait]
impl Transmit for DummyTransmitter {
type Item = &'static str;
type Error = String;
async fn transmit(&mut self, _item: Self::Item) -> Result<(), Self::Error>
where
Self::Item: 'async_trait,
{
Err("Hello World".to_string())
}
}
let mut t = assert_transmit(DummyTransmitter {});
assert_eq!(
"Hello World".to_string(),
t.transmit("Hello").await.err().unwrap()
);
let mut t = t.transmit_map_err(|e| anyhow::anyhow!("!!!{}!!!", e));
assert_eq!(
"!!!Hello World!!!",
format!("{:?}", t.transmit("Hello").await.err().unwrap())
);
Ok(())
}
}