paladin/queue/
publisher.rs1use std::{
2 ops::{Deref, DerefMut},
3 pin::Pin,
4};
5
6use anyhow::Result;
7use async_trait::async_trait;
8use futures::{TryStream, TryStreamExt};
9
/// Publisher adapter that pairs an inner publisher with a payload-mapping
/// function.
///
/// `O` names the type the mapping function produces. No value of type `O` is
/// ever stored in the struct, so the parameter is carried by a zero-sized
/// [`PhantomData`](std::marker::PhantomData) marker only.
pub struct With<P, F, O> {
    /// Inner publisher that receives the mapped payload.
    p: P,
    /// Mapping function applied to a payload before it is forwarded to `p`.
    f: F,
    /// Ties the otherwise unused `O` parameter to the struct.
    _marker: std::marker::PhantomData<O>,
}
16
17#[async_trait]
18impl<P, F, Input, Output> Publisher<Input> for With<P, F, Output>
19where
20 P: Publisher<Output> + Sync,
21 Input: Sync,
22 Output: Send + Sync,
23 F: Fn(&Input) -> Result<Output> + Sync,
24{
25 async fn publish(&self, payload: &Input) -> Result<()> {
26 let transformed = (self.f)(payload)?;
27 self.p.publish(&transformed).await
28 }
29
30 async fn close(&self) -> Result<()> {
31 self.p.close().await
32 }
33}
34
35impl<T: ?Sized, Item> PublisherExt<Item> for T where T: Publisher<Item> {}
36
37#[async_trait]
42pub trait PublisherExt<T>: Publisher<T> {
43 fn with<F, Input, Output>(self, f: F) -> With<Self, F, Output>
45 where
46 F: Fn(&Input) -> Result<Output>,
47 Self: Sized,
48 {
49 With {
50 p: self,
51 f,
52 _marker: std::marker::PhantomData,
53 }
54 }
55
56 async fn publish_all<S: TryStream<Ok = T, Error = anyhow::Error> + Send + Unpin>(
59 &self,
60 stream: S,
61 max_concurrency: impl Into<Option<usize>> + Send,
62 ) -> Result<()>
63 where
64 T: Send,
65 {
66 stream
67 .try_for_each_concurrent(max_concurrency, |item| async move {
68 self.publish(&item).await?;
69 Ok(())
70 })
71 .await?;
72
73 Ok(())
74 }
75}
76
77#[async_trait]
91pub trait Publisher<T> {
92 async fn publish(&self, payload: &T) -> Result<()>;
97
98 async fn close(&self) -> Result<()>;
101}
102
103#[async_trait]
104impl<T: Sync, P: ?Sized + Publisher<T> + Sync> Publisher<T> for Box<P> {
105 async fn publish(&self, payload: &T) -> Result<()> {
106 (**self).publish(payload).await
107 }
108
109 async fn close(&self) -> Result<()> {
110 (**self).close().await
111 }
112}
113
114#[async_trait]
115impl<T: Sync, P> Publisher<T> for Pin<P>
116where
117 P: DerefMut + Unpin + Sync,
118 P::Target: Publisher<T> + Sync,
119{
120 async fn publish(&self, payload: &T) -> Result<()> {
121 self.deref().publish(payload).await
122 }
123
124 async fn close(&self) -> Result<()> {
125 self.deref().close().await
126 }
127}