dora_node_api/event_stream/
merged.rs

1use futures::{Stream, StreamExt};
2use futures_concurrency::stream::Merge;
3
/// Item type yielded by a stream that combines Dora events with events
/// from an additional, caller-supplied stream.
///
/// `E` is the item type of the caller-supplied stream.
#[derive(Debug)]
pub enum MergedEvent<E> {
    /// An event wrapping a [`super::Event`].
    Dora(super::Event),
    /// An event taken from the caller-supplied stream.
    External(E),
}
9
/// A value that is one of two alternatives.
///
/// Used as the external payload when more than one external stream is merged:
/// items of the earlier stream are tagged [`Either::First`], items of the
/// stream added later are tagged [`Either::Second`].
///
/// The derives keep `MergedEvent<Either<E, F>>` debuggable whenever `E` and
/// `F` are, matching the `Debug` derive on `MergedEvent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<A, B> {
    /// The first alternative.
    First(A),
    /// The second alternative.
    Second(B),
}

impl<A> Either<A, A> {
    /// Unwraps the contained value when both alternatives share one type,
    /// discarding the information about which variant held it.
    pub fn flatten(self) -> A {
        match self {
            Either::First(value) | Either::Second(value) => value,
        }
    }
}
23
// TODO: use impl trait return type once stable
/// Extension trait for combining a stream with an additional stream of
/// external events.
///
/// `'a` bounds the lifetime of the external stream and of the returned
/// stream; `E` is the item type of the external stream.
pub trait MergeExternal<'a, E> {
    /// Item type yielded by the combined stream.
    type Item;

    /// Consumes `self` and `external_events` and returns a single boxed
    /// stream yielding [`Self::Item`](MergeExternal::Item).
    ///
    /// The result is boxed as a trait object (see the TODO above the trait).
    fn merge_external(
        self,
        external_events: impl Stream<Item = E> + Unpin + 'a,
    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
}
33
/// Variant of [`MergeExternal`] whose input and output streams are
/// additionally required to be `Send + Sync`.
///
/// `'a` bounds the lifetime of the external stream and of the returned
/// stream; `E` is the item type of the external stream.
pub trait MergeExternalSend<'a, E> {
    /// Item type yielded by the combined stream.
    type Item;

    /// Consumes `self` and `external_events` and returns a single boxed
    /// `Send + Sync` stream yielding
    /// [`Self::Item`](MergeExternalSend::Item).
    fn merge_external_send(
        self,
        external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a>;
}
42
43impl<'a, E> MergeExternal<'a, E> for super::EventStream
44where
45    E: 'static,
46{
47    type Item = MergedEvent<E>;
48
49    fn merge_external(
50        self,
51        external_events: impl Stream<Item = E> + Unpin + 'a,
52    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
53        let dora = self.map(MergedEvent::Dora);
54        let external = external_events.map(MergedEvent::External);
55        Box::new((dora, external).merge())
56    }
57}
58
59impl<'a, E> MergeExternalSend<'a, E> for super::EventStream
60where
61    E: 'static,
62{
63    type Item = MergedEvent<E>;
64
65    fn merge_external_send(
66        self,
67        external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
68    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
69        let dora = self.map(MergedEvent::Dora);
70        let external = external_events.map(MergedEvent::External);
71        Box::new((dora, external).merge())
72    }
73}
74
75impl<'a, E, F, S> MergeExternal<'a, F> for S
76where
77    S: Stream<Item = MergedEvent<E>> + Unpin + 'a,
78    E: 'a,
79    F: 'a,
80{
81    type Item = MergedEvent<Either<E, F>>;
82
83    fn merge_external(
84        self,
85        external_events: impl Stream<Item = F> + Unpin + 'a,
86    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
87        let first = self.map(|e| match e {
88            MergedEvent::Dora(d) => MergedEvent::Dora(d),
89            MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
90        });
91        let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
92        Box::new((first, second).merge())
93    }
94}
95
96impl<'a, E, F, S> MergeExternalSend<'a, F> for S
97where
98    S: Stream<Item = MergedEvent<E>> + Unpin + Send + Sync + 'a,
99    E: 'a,
100    F: 'a,
101{
102    type Item = MergedEvent<Either<E, F>>;
103
104    fn merge_external_send(
105        self,
106        external_events: impl Stream<Item = F> + Unpin + Send + Sync + 'a,
107    ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
108        let first = self.map(|e| match e {
109            MergedEvent::Dora(d) => MergedEvent::Dora(d),
110            MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
111        });
112        let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
113        Box::new((first, second).merge())
114    }
115}