1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge;

pub enum MergedEvent<E> {
    Dora(super::Event),
    External(E),
}

pub enum Either<A, B> {
    First(A),
    Second(B),
}

impl<A> Either<A, A> {
    pub fn flatten(self) -> A {
        match self {
            Either::First(a) => a,
            Either::Second(a) => a,
        }
    }
}

pub trait MergeExternal<'a, E> {
    type Item;

    fn merge_external(
        self,
        external_events: impl Stream<Item = E> + Send + Unpin + 'a,
    ) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a>;
}

impl<'a, E> MergeExternal<'a, E> for super::EventStream
where
    E: 'static,
{
    type Item = MergedEvent<E>;

    fn merge_external(
        self,
        external_events: impl Stream<Item = E> + Send + Unpin + 'a,
    ) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a> {
        let dora = self.map(MergedEvent::Dora);
        let external = external_events.map(MergedEvent::External);
        Box::new((dora, external).merge())
    }
}

impl<'a, E, F, S> MergeExternal<'a, F> for S
where
    S: Stream<Item = MergedEvent<E>> + Send + Unpin + 'a,
    E: 'a,
    F: 'a,
{
    type Item = MergedEvent<Either<E, F>>;

    fn merge_external(
        self,
        external_events: impl Stream<Item = F> + Send + Unpin + 'a,
    ) -> Box<dyn Stream<Item = Self::Item> + Send + Unpin + 'a> {
        let first = self.map(|e| match e {
            MergedEvent::Dora(d) => MergedEvent::Dora(d),
            MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
        });
        let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
        Box::new((first, second).merge())
    }
}