use std::{
ops::{Deref, DerefMut},
pin::Pin,
};
use anyhow::Result;
use async_trait::async_trait;
use futures::{TryStream, TryStreamExt};
/// Publisher adapter that runs a transformation over each payload and forwards
/// the result to a wrapped publisher.
///
/// Type parameters:
/// * `P` – the wrapped publisher that receives transformed payloads.
/// * `F` – the transformation function.
/// * `O` – the output type produced by `F` and accepted by `P`.
pub struct With<P, F, O> {
    /// Wrapped publisher that receives the transformed payloads.
    p: P,
    /// Transformation applied to each payload before it is handed to `p`.
    f: F,
    /// Records the output type without storing a value of it.
    ///
    /// `fn() -> O` (rather than a bare `O`) keeps the struct covariant in `O`
    /// while stopping the marker from influencing auto traits: the adapter
    /// never owns an `O`, so whether it is `Send`/`Sync`/`Unpin` should be
    /// decided by `P` and `F` alone.
    _marker: std::marker::PhantomData<fn() -> O>,
}
/// `With` publishes `Input` values by converting each one to `Output` with `f`
/// and handing the converted value to the wrapped publisher `p`.
#[async_trait]
impl<P, F, Input, Output> Publisher<Input> for With<P, F, Output>
where
    P: Publisher<Output> + Sync,
    F: Fn(&Input) -> Result<Output> + Sync,
    Input: Sync,
    Output: Send + Sync,
{
    /// Transforms `payload` and publishes the result through the wrapped publisher.
    ///
    /// # Errors
    ///
    /// Returns the transformation's error without touching the wrapped
    /// publisher, or otherwise whatever the wrapped publisher's `publish` returns.
    async fn publish(&self, payload: &Input) -> Result<()> {
        let mapped: Output = (self.f)(payload)?;
        let downstream = &self.p;
        downstream.publish(&mapped).await
    }

    /// Closes the wrapped publisher and returns its result unchanged.
    async fn close(&self) -> Result<()> {
        let downstream = &self.p;
        downstream.close().await
    }
}
/// Blanket implementation: every [`Publisher`], sized or not, gets the
/// [`PublisherExt`] combinators.
impl<P, Item> PublisherExt<Item> for P where P: Publisher<Item> + ?Sized {}
/// Convenience combinators layered on top of [`Publisher`].
#[async_trait]
pub trait PublisherExt<T>: Publisher<T> {
    /// Wraps this publisher in a [`With`] adapter that applies `f` to every
    /// payload before it is published.
    ///
    /// The returned adapter publishes `Input` values; each one is converted by
    /// `f` into an `Output` that is then forwarded to `self`.
    fn with<F, Input, Output>(self, f: F) -> With<Self, F, Output>
    where
        F: Fn(&Input) -> Result<Output>,
        Self: Sized,
    {
        let _marker = std::marker::PhantomData;
        With { p: self, f, _marker }
    }

    /// Publishes every item yielded by `stream` through [`Publisher::publish`].
    ///
    /// `max_concurrency` is passed straight to
    /// `TryStreamExt::try_for_each_concurrent` as its concurrency limit.
    /// This method does not call `close`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `try_for_each_concurrent`, which
    /// originates either from the stream itself or from a failed `publish`.
    async fn publish_all<S: TryStream<Ok = T, Error = anyhow::Error> + Send + Unpin>(
        &self,
        stream: S,
        max_concurrency: impl Into<Option<usize>> + Send,
    ) -> Result<()>
    where
        T: Send,
    {
        // Stream errors and publish errors are both `anyhow::Error`, so the
        // results can be returned directly without re-wrapping.
        stream
            .try_for_each_concurrent(max_concurrency, |payload| async move {
                self.publish(&payload).await
            })
            .await
    }
}
/// An asynchronous sink for payloads of type `T`.
///
/// Both methods take `&self`, so a publisher is driven through a shared
/// reference.
#[async_trait]
pub trait Publisher<T> {
/// Publishes a single payload, borrowed for the duration of the call.
///
/// Returns an error if the payload could not be published.
async fn publish(&self, payload: &T) -> Result<()>;
/// Closes the publisher.
///
/// The exact semantics (e.g. flushing, whether `publish` may be called
/// afterwards) are defined by each implementation and are not specified here.
async fn close(&self) -> Result<()>;
}
/// A boxed publisher (including an unsized one) is itself a publisher; every
/// call is forwarded to the boxed value.
#[async_trait]
impl<T, P> Publisher<T> for Box<P>
where
    T: Sync,
    P: Publisher<T> + Sync + ?Sized,
{
    /// Forwards `payload` to the boxed publisher.
    async fn publish(&self, payload: &T) -> Result<()> {
        // Bind the inner value explicitly so the call resolves to `P`'s
        // implementation rather than recursing into this one.
        let inner: &P = self;
        inner.publish(payload).await
    }

    /// Closes the boxed publisher.
    async fn close(&self) -> Result<()> {
        let inner: &P = self;
        inner.close().await
    }
}
/// A pinned pointer to a publisher is itself a publisher; every call is
/// forwarded to the pointee.
///
/// NOTE(review): the bodies only need shared access through `Deref`; the
/// `DerefMut + Unpin` bounds look stricter than required — confirm before
/// relaxing.
#[async_trait]
impl<T, P> Publisher<T> for Pin<P>
where
    T: Sync,
    P: DerefMut + Unpin + Sync,
    P::Target: Publisher<T> + Sync,
{
    /// Forwards `payload` to the pinned publisher.
    async fn publish(&self, payload: &T) -> Result<()> {
        let target: &P::Target = Deref::deref(self);
        target.publish(payload).await
    }

    /// Closes the pinned publisher.
    async fn close(&self) -> Result<()> {
        let target: &P::Target = Deref::deref(self);
        target.close().await
    }
}