paladin/queue/
publisher.rs

1use std::{
2    ops::{Deref, DerefMut},
3    pin::Pin,
4};
5
6use anyhow::Result;
7use async_trait::async_trait;
8use futures::{TryStream, TryStreamExt};
9
/// Adapter publisher returned by [`PublisherExt::with`].
///
/// Holds an inner publisher together with a function; the [`Publisher`]
/// implementation for this type applies the function to each payload and
/// publishes the function's output through the inner publisher.
pub struct With<P, F, Output> {
    /// The wrapped publisher that receives the transformed payloads.
    p: P,
    /// The function applied to every payload before it is published.
    f: F,
    /// Ties the otherwise unused `Output` type parameter to the struct.
    _marker: std::marker::PhantomData<Output>,
}
16
17#[async_trait]
18impl<P, F, Input, Output> Publisher<Input> for With<P, F, Output>
19where
20    P: Publisher<Output> + Sync,
21    Input: Sync,
22    Output: Send + Sync,
23    F: Fn(&Input) -> Result<Output> + Sync,
24{
25    async fn publish(&self, payload: &Input) -> Result<()> {
26        let transformed = (self.f)(payload)?;
27        self.p.publish(&transformed).await
28    }
29
30    async fn close(&self) -> Result<()> {
31        self.p.close().await
32    }
33}
34
35impl<T: ?Sized, Item> PublisherExt<Item> for T where T: Publisher<Item> {}
36
37/// Extension trait for [`Publisher`].
38///
39/// This trait provides additional functionality for [`Publisher`], similar to
40/// how [`futures::SinkExt`] does for [`futures::Sink`].
41#[async_trait]
42pub trait PublisherExt<T>: Publisher<T> {
43    /// Compose a function in _front_ of the publisher.
44    fn with<F, Input, Output>(self, f: F) -> With<Self, F, Output>
45    where
46        F: Fn(&Input) -> Result<Output>,
47        Self: Sized,
48    {
49        With {
50            p: self,
51            f,
52            _marker: std::marker::PhantomData,
53        }
54    }
55
56    /// Drive the given stream to completion, publishing each yielded item
57    /// concurrently.
58    async fn publish_all<S: TryStream<Ok = T, Error = anyhow::Error> + Send + Unpin>(
59        &self,
60        stream: S,
61        max_concurrency: impl Into<Option<usize>> + Send,
62    ) -> Result<()>
63    where
64        T: Send,
65    {
66        stream
67            .try_for_each_concurrent(max_concurrency, |item| async move {
68                self.publish(&item).await?;
69                Ok(())
70            })
71            .await?;
72
73        Ok(())
74    }
75}
76
77/// A non-locking replacement for [`Sink`](futures::Sink).
78///
79/// Interactions with a [`Sink`](futures::Sink) generally require exclusive
80/// mutable access to the underlying sink, which is problematic when working in
81/// a highly concurrent system. While implementations of
82/// [`Sink`](futures::Sink), like
83/// [`futures::channel::mpsc::Sender`](futures::channel::mpsc::Sender), are
84/// generally cloneable, the [`Sink`](futures::Sink) trait itself does not
85/// require any such guarantees, which means that code that simply relies on
86/// [`Sink`](futures::Sink) may not make any assumptions about the ability to
87/// clone the underlying [`Sink`](futures::Sink). As such, we provide a trait
88/// that is functionally equivalent to [`Sink`](futures::Sink), but does not
89/// require exclusive mutable access to publish messages.
90#[async_trait]
91pub trait Publisher<T> {
92    /// Publish a message.
93    ///
94    /// The implementation should be take care of serializing the payload before
95    /// publishing.
96    async fn publish(&self, payload: &T) -> Result<()>;
97
98    /// Close the publisher, signaling to any receivers that no more messages
99    /// will be published.
100    async fn close(&self) -> Result<()>;
101}
102
103#[async_trait]
104impl<T: Sync, P: ?Sized + Publisher<T> + Sync> Publisher<T> for Box<P> {
105    async fn publish(&self, payload: &T) -> Result<()> {
106        (**self).publish(payload).await
107    }
108
109    async fn close(&self) -> Result<()> {
110        (**self).close().await
111    }
112}
113
114#[async_trait]
115impl<T: Sync, P> Publisher<T> for Pin<P>
116where
117    P: DerefMut + Unpin + Sync,
118    P::Target: Publisher<T> + Sync,
119{
120    async fn publish(&self, payload: &T) -> Result<()> {
121        self.deref().publish(payload).await
122    }
123
124    async fn close(&self) -> Result<()> {
125        self.deref().close().await
126    }
127}