1use crate::error::Result;
2use core::{future::Future, marker::PhantomData};
3
4pub trait Adapter: Send {
10 type Item;
11
12 fn notify(&mut self, a: Self::Item) -> impl Future<Output = ()> + Send
15 where
16 Self::Item: 'static;
17
18 fn clone_notify(&mut self, a: &Self::Item) -> impl Future<Output = ()> + Send
21 where
22 Self::Item: Clone + 'static,
23 {
24 self.notify(a.clone())
25 }
26
27 fn merge<T>(self, other: T) -> impl Adapter<Item = Self::Item>
29 where
30 T: Adapter<Item = Self::Item>,
31 Self: Sized + Send,
32 Self::Item: Send + Clone,
33 {
34 Merge {
35 first: self,
36 next: other,
37 }
38 }
39
40 fn with_filter_map<A>(
43 self,
44 func: impl Fn(A) -> Option<Self::Item> + Send,
45 ) -> impl Adapter<Item = A>
46 where
47 Self: Sized + Send,
48 Self::Item: Send + 'static,
49 A: Send,
50 {
51 FilterMap {
52 func,
53 inner: self,
54 marker: PhantomData,
55 }
56 }
57
58 fn with_map<A>(self, func: impl Fn(A) -> Self::Item + Send) -> impl Adapter<Item = A>
60 where
61 Self: Sized + Send,
62 Self::Item: Send + 'static,
63 A: Send,
64 {
65 self.with_filter_map(move |a| Some(func(a)))
66 }
67
68 fn adapt<A>(self) -> impl Adapter<Item = A>
71 where
72 Self: Sized + Send,
73 Self::Item: Send + 'static,
74 A: Into<Self::Item> + Send,
75 {
76 self.with_filter_map::<A>(move |a| Some(a.into()))
77 }
78
79 fn adapt_fallible<A>(self) -> impl Adapter<Item = A>
82 where
83 Self: Sized + Send,
84 Self::Item: Send + 'static,
85 A: TryInto<Self::Item> + Send,
86 {
87 self.with_filter_map::<A>(move |a| a.try_into().ok())
88 }
89}
90
91#[derive(Debug)]
95pub struct Placeholder<Event>(PhantomData<Event>);
96
97impl<A> Default for Placeholder<A> {
98 fn default() -> Self {
99 Self(PhantomData)
100 }
101}
102
103impl<A> Adapter for Placeholder<A>
104where
105 A: Send,
106{
107 type Item = A;
108
109 async fn notify(&mut self, _e: Self::Item) {}
111
112 #[allow(clippy::manual_async_fn)]
114 fn clone_notify(&mut self, _a: &Self::Item) -> impl Future<Output = ()> + Send {
115 async {}
116 }
117
118 fn merge<N>(self, other: N) -> impl Adapter<Item = Self::Item>
120 where
121 N: Adapter<Item = Self::Item>,
122 {
123 other
124 }
125}
126
127#[derive(Debug)]
129pub struct Merge<S, T> {
130 first: S,
131 next: T,
132}
133
134impl<A, S, T> Adapter for Merge<S, T>
135where
136 S: Adapter<Item = A> + Send,
137 T: Adapter<Item = A> + Send,
138 A: Send + Clone,
139{
140 type Item = A;
141
142 async fn notify(&mut self, a: Self::Item)
143 where
144 Self::Item: 'static,
145 {
146 self.first.notify(a.clone()).await;
147 self.next.notify(a).await
148 }
149}
150
151#[derive(Debug)]
154pub struct FilterMap<A, F, G> {
155 func: F,
156 inner: G,
157 marker: PhantomData<A>,
158}
159
160impl<F, G, A, B> Adapter for FilterMap<A, F, G>
161where
162 F: Fn(A) -> Option<B> + Send,
163 B: Send + 'static,
164 G: Adapter<Item = B> + Send,
165 A: Send,
166{
167 type Item = A;
168
169 async fn notify(&mut self, a: Self::Item)
170 where
171 Self::Item: 'static,
172 {
173 if let Some(b) = (self.func)(a) {
174 self.inner.notify(b).await;
175 }
176 }
177}
178
179#[cfg(feature = "std")]
181impl<A> Adapter for std::vec::Vec<A>
182where
183 A: Send,
184{
185 type Item = A;
186
187 async fn notify(&mut self, a: Self::Item)
188 where
189 Self::Item: 'static,
190 {
191 self.push(a);
192 }
193}
194
195#[cfg(feature = "std")]
197impl<A> Feed for std::vec::Vec<A>
198where
199 A: Clone + Send + Sync + 'static,
200{
201 type Item = A;
202
203 async fn feed(&self, output: &mut impl Adapter<Item = Self::Item>) -> Result<()> {
204 for a in self.iter().cloned() {
205 output.notify(a).await;
206 }
207 Ok(())
208 }
209}
210
211#[cfg(feature = "tokio")]
213pub mod adapt_tokio {
214 use crate::adapter::Adapter;
215 use tokio::sync::{broadcast, mpsc};
216
217 impl<A> Adapter for mpsc::Sender<A>
218 where
219 A: Send,
220 {
221 type Item = A;
222
223 async fn notify(&mut self, a: Self::Item) {
224 let _ = self.send(a).await;
225 }
226 }
227
228 impl<A> Adapter for broadcast::Sender<A>
229 where
230 A: Send,
231 {
232 type Item = A;
233
234 async fn notify(&mut self, a: Self::Item) {
235 let _ = self.send(a);
236 }
237 }
238}
239
240#[cfg(feature = "async-broadcast")]
241impl<A> Adapter for async_broadcast::Sender<A>
242where
243 A: Send + Clone,
244{
245 type Item = A;
246
247 async fn notify(&mut self, a: Self::Item) {
248 let _ = self.broadcast(a).await;
249 }
250}
251
252pub trait Feed {
254 type Item;
255
256 fn feed(
258 &self,
259 output: &mut impl Adapter<Item = Self::Item>,
260 ) -> impl Future<Output = Result<()>> + Send;
261}
262
263impl<A> Feed for Placeholder<A>
264where
265 A: Send,
266 Self: Sync,
267{
268 type Item = A;
269
270 async fn feed(&self, _: &mut impl Adapter<Item = Self::Item>) -> Result<()> {
271 Ok(())
272 }
273}
274
275#[cfg(feature = "streambed")]
277mod adapt_streambed {
278 use crate::{
279 adapter::{Adapter, Feed},
280 error::Result,
281 };
282 use futures_util::StreamExt;
283 use streambed_codec::{Codec, CommitLog, LogAdapter};
284
285 impl<L, C, A> Feed for LogAdapter<L, C, A>
286 where
287 C: Codec<A> + Sync + Send,
288 L: CommitLog + Sync + Send,
289 A: Send + Sync + 'static,
290 {
291 type Item = A;
292
293 async fn feed(&self, output: &mut impl Adapter<Item = Self::Item>) -> Result<()> {
294 let mut s = self.history().await;
295 while let Some(a) = s.next().await {
296 output.notify(a).await;
297 }
298 Ok(())
299 }
300 }
301
302 impl<L, C, A> Adapter for LogAdapter<L, C, A>
303 where
304 C: Codec<A> + Sync + Send,
305 L: CommitLog + Sync + Send,
306 A: Sync + Send,
307 {
308 type Item = A;
309
310 async fn notify(&mut self, a: Self::Item)
311 where
312 Self::Item: 'static,
313 {
314 let _ = self.produce(a).await;
315 }
316 }
317}