streamies/streamies/
merge_round_robin.rs1use core::num::NonZeroUsize;
2use core::pin::Pin;
3use core::task::Context;
4use core::task::Poll;
5
6use futures::ready;
7use futures::stream::FusedStream;
8use futures::Stream;
9use pin_project_lite::pin_project;
10
11pin_project! {
12 #[derive(Debug)]
14 #[must_use = "streams do nothing unless polled"]
15 pub struct MergeRoundRobin<St1, St2> {
16 #[pin]
17 first: Option<St1>,
18 #[pin]
19 second: Option<St2>,
20
21 first_nb_ele: NonZeroUsize,
22 second_nb_ele: NonZeroUsize,
23
24 first_count: usize,
25 second_count: usize
26 }
27}
28
29impl<St1, St2> MergeRoundRobin<St1, St2>
30where
31 St1: Stream,
32 St2: Stream<Item = St1::Item>,
33{
34 pub(super) fn new(
35 stream1: St1,
36 stream2: St2,
37 first_nb_ele: usize,
38 second_nb_ele: usize,
39 ) -> Self {
40 Self {
41 first: Some(stream1),
42 second: Some(stream2),
43 first_nb_ele: NonZeroUsize::new(first_nb_ele).expect(
44 "Couldn't convert `first_nb_ele` to `NonZeroUsize`. The value must no be 0",
45 ),
46 second_nb_ele: NonZeroUsize::new(second_nb_ele).expect(
47 "Couldn't convert `second_nb_ele` to `NonZeroUsize`. The value must no be 0",
48 ),
49 first_count: 0,
50 second_count: 0,
51 }
52 }
53}
54
55impl<St1, St2> FusedStream for MergeRoundRobin<St1, St2>
56where
57 St1: FusedStream,
58 St2: FusedStream<Item = St1::Item>,
59{
60 fn is_terminated(&self) -> bool {
61 self.first.as_ref().is_none_or(|s| s.is_terminated())
62 && self.second.as_ref().is_none_or(|s| s.is_terminated())
63 }
64}
65
66impl<St1, St2> Stream for MergeRoundRobin<St1, St2>
67where
68 St1: Stream,
69 St2: Stream<Item = St1::Item>,
70{
71 type Item = St1::Item;
72
73 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74 let mut this = self.project();
75 if this.second_count == &mut this.second_nb_ele.get() {
77 *this.first_count = 0;
79 *this.second_count = 0;
80 }
81
82 if this.first_count < &mut this.first_nb_ele.get() {
87 if let Some(first) = this.first.as_mut().as_pin_mut() {
88 if let Some(item) = ready!(first.poll_next(cx)) {
89 *this.first_count += 1;
91 return Poll::Ready(Some(item));
92 }
93
94 this.first.set(None);
96 } else {
97 return this
99 .second
100 .as_mut()
101 .as_pin_mut()
102 .map(|second| second.poll_next(cx))
103 .unwrap_or_else(|| Poll::Ready(None));
104 }
105 }
106
107 if let Some(second) = this.second.as_mut().as_pin_mut() {
109 if let Some(item) = ready!(second.poll_next(cx)) {
110 *this.second_count += 1;
112 return Poll::Ready(Some(item));
113 }
114
115 this.second.set(None);
117 }
118
119 this.first
121 .as_mut()
122 .as_pin_mut()
123 .map(|first| first.poll_next(cx))
124 .unwrap_or_else(|| Poll::Ready(None))
125 }
126
127 fn size_hint(&self) -> (usize, Option<usize>) {
128 match &self.first {
129 Some(first) => match &self.second {
130 Some(second) => {
131 let first_size = first.size_hint();
132 let second_size = second.size_hint();
133
134 (
135 first_size.0.saturating_add(second_size.0),
136 match (first_size.1, second_size.1) {
137 (Some(x), Some(y)) => x.checked_add(y),
138 _ => None,
139 },
140 )
141 }
142 None => first.size_hint(),
143 },
144 None => match &self.second {
145 Some(second) => second.size_hint(),
146 None => (0, Some(0)),
147 },
148 }
149 }
150}