dora_node_api/event_stream/
merged.rs1use futures::{Stream, StreamExt};
7use futures_concurrency::stream::Merge;
8
9#[derive(Debug)]
11#[allow(clippy::large_enum_variant)]
12pub enum MergedEvent<E> {
13 Dora(super::Event),
15 External(E),
19}
20
/// A value that is one of two alternatives.
///
/// Used as the external-event payload when more than one external stream has
/// been merged: events from the earlier stream(s) are tagged [`Either::First`]
/// and events from the most recently merged stream are tagged
/// [`Either::Second`].
///
/// The standard traits are derived so that wrappers holding an `Either`
/// (for example a `Debug`-derived enum) keep working with nested payloads.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<A, B> {
    /// The first alternative.
    First(A),
    /// The second alternative.
    Second(B),
}

impl<A> Either<A, A> {
    /// Unwraps the inner value when both alternatives have the same type,
    /// discarding the information about which side it came from.
    pub fn flatten(self) -> A {
        match self {
            Either::First(inner) | Either::Second(inner) => inner,
        }
    }
}
38
39pub trait MergeExternal<'a, E> {
42 type Item;
44
45 fn merge_external(
50 self,
51 external_events: impl Stream<Item = E> + Unpin + 'a,
52 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
53}
54
55pub trait MergeExternalSend<'a, E> {
59 type Item;
61
62 fn merge_external_send(
67 self,
68 external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
69 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a>;
70}
71
72impl<'a, E> MergeExternal<'a, E> for super::EventStream
73where
74 E: 'static,
75{
76 type Item = MergedEvent<E>;
77
78 fn merge_external(
79 self,
80 external_events: impl Stream<Item = E> + Unpin + 'a,
81 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
82 let dora = self.map(MergedEvent::Dora);
83 let external = external_events.map(MergedEvent::External);
84 Box::new((dora, external).merge())
85 }
86}
87
88impl<'a, E> MergeExternalSend<'a, E> for super::EventStream
89where
90 E: 'static,
91{
92 type Item = MergedEvent<E>;
93
94 fn merge_external_send(
95 self,
96 external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
97 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
98 let dora = self.map(MergedEvent::Dora);
99 let external = external_events.map(MergedEvent::External);
100 Box::new((dora, external).merge())
101 }
102}
103
104impl<'a, E, F, S> MergeExternal<'a, F> for S
105where
106 S: Stream<Item = MergedEvent<E>> + Unpin + 'a,
107 E: 'a,
108 F: 'a,
109{
110 type Item = MergedEvent<Either<E, F>>;
111
112 fn merge_external(
113 self,
114 external_events: impl Stream<Item = F> + Unpin + 'a,
115 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
116 let first = self.map(|e| match e {
117 MergedEvent::Dora(d) => MergedEvent::Dora(d),
118 MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
119 });
120 let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
121 Box::new((first, second).merge())
122 }
123}
124
125impl<'a, E, F, S> MergeExternalSend<'a, F> for S
126where
127 S: Stream<Item = MergedEvent<E>> + Unpin + Send + Sync + 'a,
128 E: 'a,
129 F: 'a,
130{
131 type Item = MergedEvent<Either<E, F>>;
132
133 fn merge_external_send(
134 self,
135 external_events: impl Stream<Item = F> + Unpin + Send + Sync + 'a,
136 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
137 let first = self.map(|e| match e {
138 MergedEvent::Dora(d) => MergedEvent::Dora(d),
139 MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
140 });
141 let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
142 Box::new((first, second).merge())
143 }
144}