dora_node_api/event_stream/
merged.rs

//! Merge an external stream into an [`EventStream`][super::EventStream].
//!
//! Sometimes nodes need to listen to external events in addition to Dora events.
//! This module supports that use case through the [`MergeExternal`] trait.
5
6use futures::{Stream, StreamExt};
7use futures_concurrency::stream::Merge;
8
9/// A Dora event or an event from an external source.
10#[derive(Debug)]
11#[allow(clippy::large_enum_variant)]
12pub enum MergedEvent<E> {
13    /// A Dora event
14    Dora(super::Event),
15    /// An external event
16    ///
17    /// Yielded by the stream that was merged into the Dora [`EventStream`][super::EventStream].
18    External(E),
19}
20
/// A general enum to represent a value of one of two possible types.
///
/// Used as the external-event payload when more than one external stream is
/// merged: items of the stream that was already merged are wrapped in
/// [`Either::First`], items of the newly added stream in [`Either::Second`].
///
/// The standard traits are derived so that values wrapped in an `Either`
/// (for example inside a `Debug`-printed `MergedEvent`) remain printable,
/// clonable and comparable whenever both payload types are.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<A, B> {
    /// Value is of the first type, type `A`.
    First(A),
    /// Value is of the second type, type `B`.
    Second(B),
}

impl<A> Either<A, A> {
    /// Unwraps an `Either` whose two variants carry the same type.
    ///
    /// Returns the contained value regardless of which variant holds it.
    pub fn flatten(self) -> A {
        match self {
            Either::First(value) | Either::Second(value) => value,
        }
    }
}
38
39/// Allows merging an external event stream into an existing event stream.
40// TODO: use impl trait return type once stable
41pub trait MergeExternal<'a, E> {
42    /// The item type yielded from the merged stream.
43    type Item;
44
45    /// Merge the given stream into an existing event stream.
46    ///
47    /// Returns a new event stream that yields items from both streams.
48    /// The ordering between the two streams is not guaranteed.
49    fn merge_external(
50        self,
51        external_events: impl Stream<Item = E> + Unpin + 'a,
52    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
53}
54
55/// Allows merging a sendable external event stream into an existing (sendable) event stream.
56///
57/// By implementing [`Send`], the streams can be sent to different threads.
58pub trait MergeExternalSend<'a, E> {
59    /// The item type yielded from the merged stream.
60    type Item;
61
62    /// Merge the given stream into an existing event stream.
63    ///
64    /// Returns a new event stream that yields items from both streams.
65    /// The ordering between the two streams is not guaranteed.
66    fn merge_external_send(
67        self,
68        external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
69    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a>;
70}
71
72impl<'a, E> MergeExternal<'a, E> for super::EventStream
73where
74    E: 'static,
75{
76    type Item = MergedEvent<E>;
77
78    fn merge_external(
79        self,
80        external_events: impl Stream<Item = E> + Unpin + 'a,
81    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
82        let dora = self.map(MergedEvent::Dora);
83        let external = external_events.map(MergedEvent::External);
84        Box::new((dora, external).merge())
85    }
86}
87
88impl<'a, E> MergeExternalSend<'a, E> for super::EventStream
89where
90    E: 'static,
91{
92    type Item = MergedEvent<E>;
93
94    fn merge_external_send(
95        self,
96        external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
97    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
98        let dora = self.map(MergedEvent::Dora);
99        let external = external_events.map(MergedEvent::External);
100        Box::new((dora, external).merge())
101    }
102}
103
104impl<'a, E, F, S> MergeExternal<'a, F> for S
105where
106    S: Stream<Item = MergedEvent<E>> + Unpin + 'a,
107    E: 'a,
108    F: 'a,
109{
110    type Item = MergedEvent<Either<E, F>>;
111
112    fn merge_external(
113        self,
114        external_events: impl Stream<Item = F> + Unpin + 'a,
115    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
116        let first = self.map(|e| match e {
117            MergedEvent::Dora(d) => MergedEvent::Dora(d),
118            MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
119        });
120        let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
121        Box::new((first, second).merge())
122    }
123}
124
125impl<'a, E, F, S> MergeExternalSend<'a, F> for S
126where
127    S: Stream<Item = MergedEvent<E>> + Unpin + Send + Sync + 'a,
128    E: 'a,
129    F: 'a,
130{
131    type Item = MergedEvent<Either<E, F>>;
132
133    fn merge_external_send(
134        self,
135        external_events: impl Stream<Item = F> + Unpin + Send + Sync + 'a,
136    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
137        let first = self.map(|e| match e {
138            MergedEvent::Dora(d) => MergedEvent::Dora(d),
139            MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
140        });
141        let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
142        Box::new((first, second).merge())
143    }
144}