dora_node_api/event_stream/
merged.rs1use futures::{Stream, StreamExt};
2use futures_concurrency::stream::Merge;
3
4#[derive(Debug)]
5pub enum MergedEvent<E> {
6 Dora(super::Event),
7 External(E),
8}
9
/// A value that is one of two alternatives.
///
/// The chained merge implementations in this module use it as the external
/// payload: `First` tags items from the stream that was already merged and
/// `Second` tags items from the newly added external stream.
///
/// `Debug` is derived so that `MergedEvent<Either<A, B>>` can itself be
/// debug-printed whenever `A` and `B` can.
#[derive(Debug)]
pub enum Either<A, B> {
    /// The first alternative.
    First(A),
    /// The second alternative.
    Second(B),
}

impl<A> Either<A, A> {
    /// Returns the wrapped value when both alternatives share the same type,
    /// discarding the information about which side it came from.
    pub fn flatten(self) -> A {
        match self {
            Either::First(value) | Either::Second(value) => value,
        }
    }
}
23
24pub trait MergeExternal<'a, E> {
26 type Item;
27
28 fn merge_external(
29 self,
30 external_events: impl Stream<Item = E> + Unpin + 'a,
31 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
32}
33
34pub trait MergeExternalSend<'a, E> {
35 type Item;
36
37 fn merge_external_send(
38 self,
39 external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
40 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a>;
41}
42
43impl<'a, E> MergeExternal<'a, E> for super::EventStream
44where
45 E: 'static,
46{
47 type Item = MergedEvent<E>;
48
49 fn merge_external(
50 self,
51 external_events: impl Stream<Item = E> + Unpin + 'a,
52 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
53 let dora = self.map(MergedEvent::Dora);
54 let external = external_events.map(MergedEvent::External);
55 Box::new((dora, external).merge())
56 }
57}
58
59impl<'a, E> MergeExternalSend<'a, E> for super::EventStream
60where
61 E: 'static,
62{
63 type Item = MergedEvent<E>;
64
65 fn merge_external_send(
66 self,
67 external_events: impl Stream<Item = E> + Unpin + Send + Sync + 'a,
68 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
69 let dora = self.map(MergedEvent::Dora);
70 let external = external_events.map(MergedEvent::External);
71 Box::new((dora, external).merge())
72 }
73}
74
75impl<'a, E, F, S> MergeExternal<'a, F> for S
76where
77 S: Stream<Item = MergedEvent<E>> + Unpin + 'a,
78 E: 'a,
79 F: 'a,
80{
81 type Item = MergedEvent<Either<E, F>>;
82
83 fn merge_external(
84 self,
85 external_events: impl Stream<Item = F> + Unpin + 'a,
86 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
87 let first = self.map(|e| match e {
88 MergedEvent::Dora(d) => MergedEvent::Dora(d),
89 MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
90 });
91 let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
92 Box::new((first, second).merge())
93 }
94}
95
96impl<'a, E, F, S> MergeExternalSend<'a, F> for S
97where
98 S: Stream<Item = MergedEvent<E>> + Unpin + Send + Sync + 'a,
99 E: 'a,
100 F: 'a,
101{
102 type Item = MergedEvent<Either<E, F>>;
103
104 fn merge_external_send(
105 self,
106 external_events: impl Stream<Item = F> + Unpin + Send + Sync + 'a,
107 ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + Sync + 'a> {
108 let first = self.map(|e| match e {
109 MergedEvent::Dora(d) => MergedEvent::Dora(d),
110 MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
111 });
112 let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
113 Box::new((first, second).merge())
114 }
115}