edfsm_machine/
adapter.rs

1use crate::error::Result;
2use core::{future::Future, marker::PhantomData};
3
4/// A trait to intercept messages in a `Machine` for logging and outbound communication.
5///
6/// Adapters can be combined and this is the basis of a wiring scheme for machines.  
7/// For the absence of doubt, all `Adapter`s are `Send` meaning they can be part of the
8/// state of a task in a multithreaded environment.
9pub trait Adapter: Send {
10    type Item;
11
12    /// Forward the given item to an asynchronous consumer, possibly converting the type
13    /// or possibly dropping the item if it cannot be converted.
14    fn notify(&mut self, a: Self::Item) -> impl Future<Output = ()> + Send
15    where
16        Self::Item: 'static;
17
18    /// Clone the referenced item and then forward it to an asynchonous consumer.
19    /// The clone operation can is avoid in the `Placeholder` implementation.
20    fn clone_notify(&mut self, a: &Self::Item) -> impl Future<Output = ()> + Send
21    where
22        Self::Item: Clone + 'static,
23    {
24        self.notify(a.clone())
25    }
26
27    /// Combine this with another adapter. The notify call is delegated to both adapters.
28    fn merge<T>(self, other: T) -> impl Adapter<Item = Self::Item>
29    where
30        T: Adapter<Item = Self::Item>,
31        Self: Sized + Send,
32        Self::Item: Send + Clone,
33    {
34        Merge {
35            first: self,
36            next: other,
37        }
38    }
39
40    /// Create an adapter that maps items with an optional function.
41    /// `Some` values are passed on, analogous to `Iterator::filter_map`.
42    fn with_filter_map<A>(
43        self,
44        func: impl Fn(A) -> Option<Self::Item> + Send,
45    ) -> impl Adapter<Item = A>
46    where
47        Self: Sized + Send,
48        Self::Item: Send + 'static,
49        A: Send,
50    {
51        FilterMap {
52            func,
53            inner: self,
54            marker: PhantomData,
55        }
56    }
57
58    /// Create an adapter that maps each item with a function.
59    fn with_map<A>(self, func: impl Fn(A) -> Self::Item + Send) -> impl Adapter<Item = A>
60    where
61        Self: Sized + Send,
62        Self::Item: Send + 'static,
63        A: Send,
64    {
65        self.with_filter_map(move |a| Some(func(a)))
66    }
67
68    /// Create an adapter that converts each item from another type.
69    /// This relies on an `Into` implementation for the conversion.
70    fn adapt<A>(self) -> impl Adapter<Item = A>
71    where
72        Self: Sized + Send,
73        Self::Item: Send + 'static,
74        A: Into<Self::Item> + Send,
75    {
76        self.with_filter_map::<A>(move |a| Some(a.into()))
77    }
78
79    /// Create an adapter that fallibly converts each item from another type.
80    /// This relies on an `TryInto` implementation for the conversion.
81    fn adapt_fallible<A>(self) -> impl Adapter<Item = A>
82    where
83        Self: Sized + Send,
84        Self::Item: Send + 'static,
85        A: TryInto<Self::Item> + Send,
86    {
87        self.with_filter_map::<A>(move |a| a.try_into().ok())
88    }
89}
90
/// A stand-in for an `Adapter` and/or a `Feed`.
///
/// Used as an `Adapter` it discards every item. Used as a `Feed` it provides no items.
/// The item type is only tracked through `PhantomData`; no item is ever stored.
#[derive(Debug)]
pub struct Placeholder<A>(PhantomData<A>);
96
97impl<A> Default for Placeholder<A> {
98    fn default() -> Self {
99        Self(PhantomData)
100    }
101}
102
103impl<A> Adapter for Placeholder<A>
104where
105    A: Send,
106{
107    type Item = A;
108
109    /// Discard the item
110    async fn notify(&mut self, _e: Self::Item) {}
111
112    /// Ignore the reference and avoid the clone.
113    #[allow(clippy::manual_async_fn)]
114    fn clone_notify(&mut self, _a: &Self::Item) -> impl Future<Output = ()> + Send {
115        async {}
116    }
117
118    /// Replace this placeholder with the given adapter.
119    fn merge<N>(self, other: N) -> impl Adapter<Item = Self::Item>
120    where
121        N: Adapter<Item = Self::Item>,
122    {
123        other
124    }
125}
126
/// An `Adapter` that bifurcates notifications between two downstream adapters.
#[derive(Debug)]
pub struct Merge<First, Second> {
    /// One of the two downstream adapters.
    first: First,
    /// The other downstream adapter.
    next: Second,
}
133
134impl<A, S, T> Adapter for Merge<S, T>
135where
136    S: Adapter<Item = A> + Send,
137    T: Adapter<Item = A> + Send,
138    A: Send + Clone,
139{
140    type Item = A;
141
142    async fn notify(&mut self, a: Self::Item)
143    where
144        Self::Item: 'static,
145    {
146        self.first.notify(a.clone()).await;
147        self.next.notify(a).await
148    }
149}
150
/// An `Adapter` that runs each item through a function returning an `Option`
/// and forwards only the `Some` values.
#[derive(Debug)]
pub struct FilterMap<In, Func, Inner> {
    /// The function applied to each incoming item.
    func: Func,
    /// The downstream adapter that receives the `Some` results.
    inner: Inner,
    /// Records the incoming item type, which is not otherwise stored.
    marker: PhantomData<In>,
}
159
160impl<F, G, A, B> Adapter for FilterMap<A, F, G>
161where
162    F: Fn(A) -> Option<B> + Send,
163    B: Send + 'static,
164    G: Adapter<Item = B> + Send,
165    A: Send,
166{
167    type Item = A;
168
169    async fn notify(&mut self, a: Self::Item)
170    where
171        Self::Item: 'static,
172    {
173        if let Some(b) = (self.func)(a) {
174            self.inner.notify(b).await;
175        }
176    }
177}
178
/// A vector is an `Adapter` that collects every item it is notified of.
#[cfg(feature = "std")]
impl<A> Adapter for std::vec::Vec<A>
where
    A: Send,
{
    type Item = A;

    /// Append the item to the end of the vector.
    async fn notify(&mut self, item: Self::Item)
    where
        Self::Item: 'static,
    {
        std::vec::Vec::push(self, item);
    }
}
194
/// A vector is a `Feed` that replays its elements.
#[cfg(feature = "std")]
impl<A> Feed for std::vec::Vec<A>
where
    A: Clone + Send + Sync + 'static,
{
    type Item = A;

    /// Notify `output` with a clone of each element, in order, awaiting each
    /// notification before moving to the next element. Always returns `Ok(())`.
    async fn feed(&self, output: &mut impl Adapter<Item = Self::Item>) -> Result<()> {
        for element in self {
            output.notify(element.clone()).await;
        }
        Ok(())
    }
}
210
/// Implementations of `Adapter` for tokio channel senders.
#[cfg(feature = "tokio")]
pub mod adapt_tokio {
    use crate::adapter::Adapter;
    use tokio::sync::{broadcast, mpsc};

    /// An mpsc sender forwards each item into its channel.
    impl<A> Adapter for mpsc::Sender<A>
    where
        A: Send,
    {
        type Item = A;

        /// Send the item and await the send. The result of the send is ignored,
        /// so a failed send drops the item silently.
        async fn notify(&mut self, item: Self::Item) {
            let _ = self.send(item).await;
        }
    }

    /// A broadcast sender forwards each item into its channel.
    impl<A> Adapter for broadcast::Sender<A>
    where
        A: Send,
    {
        type Item = A;

        /// Send the item; this `send` is not awaited. The result of the send is
        /// ignored, so a failed send drops the item silently.
        async fn notify(&mut self, item: Self::Item) {
            let _ = self.send(item);
        }
    }
}
239
/// An `async_broadcast` sender is an `Adapter` that broadcasts each item.
#[cfg(feature = "async-broadcast")]
impl<A> Adapter for async_broadcast::Sender<A>
where
    A: Send + Clone,
{
    type Item = A;

    /// Broadcast the item and await the broadcast. The result is ignored,
    /// so a failed broadcast drops the item silently.
    async fn notify(&mut self, item: Self::Item) {
        let _ = self.broadcast(item).await;
    }
}
251
252/// A source of messages that can `feed` an `Adapter`.
253pub trait Feed {
254    type Item;
255
256    /// Send a stream of messages into an adapter.
257    fn feed(
258        &self,
259        output: &mut impl Adapter<Item = Self::Item>,
260    ) -> impl Future<Output = Result<()>> + Send;
261}
262
263impl<A> Feed for Placeholder<A>
264where
265    A: Send,
266    Self: Sync,
267{
268    type Item = A;
269
270    async fn feed(&self, _: &mut impl Adapter<Item = Self::Item>) -> Result<()> {
271        Ok(())
272    }
273}
274
/// Implementations of `Feed` and `Adapter` for the streambed `LogAdapter`.
#[cfg(feature = "streambed")]
mod adapt_streambed {
    use crate::{
        adapter::{Adapter, Feed},
        error::Result,
    };
    use futures_util::StreamExt;
    use streambed_codec::{Codec, CommitLog, LogAdapter};

    /// A `LogAdapter` is a `Feed` of the items yielded by its `history` stream.
    impl<L, C, A> Feed for LogAdapter<L, C, A>
    where
        C: Codec<A> + Sync + Send,
        L: CommitLog + Sync + Send,
        A: Send + Sync + 'static,
    {
        type Item = A;

        /// Forward every item of the `history` stream to `output`, one at a time,
        /// until the stream ends. Always returns `Ok(())`.
        // NOTE(review): presumably `history` replays the records already in the
        // commit log — confirm against the streambed_codec documentation.
        async fn feed(&self, output: &mut impl Adapter<Item = Self::Item>) -> Result<()> {
            let mut history = self.history().await;
            while let Some(record) = history.next().await {
                output.notify(record).await;
            }
            Ok(())
        }
    }

    /// A `LogAdapter` is an `Adapter` that passes each item to `produce`.
    impl<L, C, A> Adapter for LogAdapter<L, C, A>
    where
        C: Codec<A> + Sync + Send,
        L: CommitLog + Sync + Send,
        A: Sync + Send,
    {
        type Item = A;

        /// Hand the item to `produce` and await it. The result is ignored,
        /// so a failure is not reported to the caller.
        async fn notify(&mut self, item: Self::Item)
        where
            Self::Item: 'static,
        {
            let _ = self.produce(item).await;
        }
    }
}