apalis_core/backend/poll_strategy/
race_next.rs1use core::fmt;
2use futures_core::Stream;
3use std::pin::Pin;
4use std::task::Context;
5use std::task::Poll;
6
7pub struct RaceNext<T> {
10 streams: Vec<Option<Pin<Box<dyn Stream<Item = T> + Send>>>>,
11 pending_skips: Vec<bool>,
12}
13
14impl<T> fmt::Debug for RaceNext<T> {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 f.debug_struct("RaceNext")
17 .field("active_streams", &self.active_count())
18 .finish()
19 }
20}
21
22impl<T: 'static + Send> RaceNext<T> {
23 pub fn new(streams: Vec<Pin<Box<dyn Stream<Item = T> + Send>>>) -> Self {
25 let len = streams.len();
26 Self {
27 streams: streams.into_iter().map(Some).collect(),
28 pending_skips: vec![false; len],
29 }
30 }
31}
32
33impl<T: 'static + Send> Stream for RaceNext<T> {
34 type Item = (usize, T);
35
36 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 let this = self.as_mut().get_mut();
38
39 for i in 0..this.streams.len() {
41 if this.pending_skips[i] {
42 if let Some(ref mut stream) = this.streams[i] {
43 match stream.as_mut().poll_next(cx) {
44 Poll::Ready(Some(_)) => {
45 this.pending_skips[i] = false;
47 }
48 Poll::Ready(None) => {
49 this.streams[i] = None;
51 this.pending_skips[i] = false;
52 }
53 Poll::Pending => {
54 }
56 }
57 }
58 }
59 }
60
61 let mut any_pending = false;
63 for i in 0..this.streams.len() {
64 if this.pending_skips[i] {
66 any_pending = true;
67 continue;
68 }
69
70 if let Some(ref mut stream) = this.streams[i] {
71 match stream.as_mut().poll_next(cx) {
72 Poll::Ready(Some(item)) => {
73 for j in 0..this.streams.len() {
75 if j != i && this.streams[j].is_some() {
76 this.pending_skips[j] = true;
77 }
78 }
79 return Poll::Ready(Some((i, item)));
80 }
81 Poll::Ready(None) => {
82 this.streams[i] = None;
84 }
85 Poll::Pending => {
86 any_pending = true;
87 }
88 }
89 }
90 }
91
92 if this.streams.iter().all(|s| s.is_none()) {
94 return Poll::Ready(None);
95 }
96
97 if any_pending {
98 Poll::Pending
99 } else {
100 Poll::Ready(None)
102 }
103 }
104}
105
106impl<T> RaceNext<T> {
107 #[must_use]
109 pub fn active_count(&self) -> usize {
110 self.streams.iter().filter(|s| s.is_some()).count()
111 }
112
113 #[must_use]
115 pub fn has_active_streams(&self) -> bool {
116 self.streams.iter().any(|s| s.is_some())
117 }
118}