streamies/streamies/
merge_round_robin.rs

1use core::num::NonZeroUsize;
2use core::pin::Pin;
3use core::task::Context;
4use core::task::Poll;
5
6use futures::ready;
7use futures::stream::FusedStream;
8use futures::Stream;
9use pin_project_lite::pin_project;
10
11pin_project! {
12    /// Stream for the [`merge_round_robin`](crate::Streamies::merge_round_robin) method.
13    #[derive(Debug)]
14    #[must_use = "streams do nothing unless polled"]
15    pub struct MergeRoundRobin<St1, St2> {
16        #[pin]
17        first: Option<St1>,
18        #[pin]
19        second: Option<St2>,
20
21        first_nb_ele: NonZeroUsize,
22        second_nb_ele: NonZeroUsize,
23
24        first_count: usize,
25        second_count: usize
26    }
27}
28
29impl<St1, St2> MergeRoundRobin<St1, St2>
30where
31    St1: Stream,
32    St2: Stream<Item = St1::Item>,
33{
34    pub(super) fn new(
35        stream1: St1,
36        stream2: St2,
37        first_nb_ele: usize,
38        second_nb_ele: usize,
39    ) -> Self {
40        Self {
41            first: Some(stream1),
42            second: Some(stream2),
43            first_nb_ele: NonZeroUsize::new(first_nb_ele).expect(
44                "Couldn't convert `first_nb_ele` to `NonZeroUsize`. The value must no be 0",
45            ),
46            second_nb_ele: NonZeroUsize::new(second_nb_ele).expect(
47                "Couldn't convert `second_nb_ele` to `NonZeroUsize`. The value must no be 0",
48            ),
49            first_count: 0,
50            second_count: 0,
51        }
52    }
53}
54
55impl<St1, St2> FusedStream for MergeRoundRobin<St1, St2>
56where
57    St1: FusedStream,
58    St2: FusedStream<Item = St1::Item>,
59{
60    fn is_terminated(&self) -> bool {
61        self.first.as_ref().is_none_or(|s| s.is_terminated())
62            && self.second.as_ref().is_none_or(|s| s.is_terminated())
63    }
64}
65
66impl<St1, St2> Stream for MergeRoundRobin<St1, St2>
67where
68    St1: Stream,
69    St2: Stream<Item = St1::Item>,
70{
71    type Item = St1::Item;
72
73    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74        let mut this = self.project();
75        // Check if second stream has finished it's turn
76        if this.second_count == &mut this.second_nb_ele.get() {
77            // It finished. Let's reset the turns
78            *this.first_count = 0;
79            *this.second_count = 0;
80        }
81
82        // Check if we should be polling from the first stream.
83        // This means:
84        //     - It's our turn to be polled AND the se
85        //     - The stream isn't ended
86        if this.first_count < &mut this.first_nb_ele.get() {
87            if let Some(first) = this.first.as_mut().as_pin_mut() {
88                if let Some(item) = ready!(first.poll_next(cx)) {
89                    // We have an item! Increment the count for the next poll
90                    *this.first_count += 1;
91                    return Poll::Ready(Some(item));
92                }
93
94                // The stream has finished. Let's dispose of the stream
95                this.first.set(None);
96            } else {
97                // The stream is empty. We can just poll `second` now
98                return this
99                    .second
100                    .as_mut()
101                    .as_pin_mut()
102                    .map(|second| second.poll_next(cx))
103                    .unwrap_or_else(|| Poll::Ready(None));
104            }
105        }
106
107        // First stream wasn't polled, so we poll the second stream
108        if let Some(second) = this.second.as_mut().as_pin_mut() {
109            if let Some(item) = ready!(second.poll_next(cx)) {
110                // We have an item! Increment the count for the next poll
111                *this.second_count += 1;
112                return Poll::Ready(Some(item));
113            }
114
115            // The stream has finished. Let's dispose of the stream
116            this.second.set(None);
117        }
118
119        // The second stream is empty. We can just poll `first` now
120        this.first
121            .as_mut()
122            .as_pin_mut()
123            .map(|first| first.poll_next(cx))
124            .unwrap_or_else(|| Poll::Ready(None))
125    }
126
127    fn size_hint(&self) -> (usize, Option<usize>) {
128        match &self.first {
129            Some(first) => match &self.second {
130                Some(second) => {
131                    let first_size = first.size_hint();
132                    let second_size = second.size_hint();
133
134                    (
135                        first_size.0.saturating_add(second_size.0),
136                        match (first_size.1, second_size.1) {
137                            (Some(x), Some(y)) => x.checked_add(y),
138                            _ => None,
139                        },
140                    )
141                }
142                None => first.size_hint(),
143            },
144            None => match &self.second {
145                Some(second) => second.size_hint(),
146                None => (0, Some(0)),
147            },
148        }
149    }
150}