apalis_core/backend/poll_strategy/
race_next.rs

1use core::fmt;
2use futures_core::Stream;
3use std::pin::Pin;
4use std::task::Context;
5use std::task::Poll;
6
7/// A stream that polls multiple streams, always returning the first ready item,
8/// and skipping one item from all other streams each round.
9pub struct RaceNext<T> {
10    streams: Vec<Option<Pin<Box<dyn Stream<Item = T> + Send>>>>,
11    pending_skips: Vec<bool>,
12}
13
14impl<T> fmt::Debug for RaceNext<T> {
15    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16        f.debug_struct("RaceNext")
17            .field("active_streams", &self.active_count())
18            .finish()
19    }
20}
21
22impl<T: 'static + Send> RaceNext<T> {
23    /// Create a new RaceNext stream from a vector of streams
24    pub fn new(streams: Vec<Pin<Box<dyn Stream<Item = T> + Send>>>) -> Self {
25        let len = streams.len();
26        Self {
27            streams: streams.into_iter().map(Some).collect(),
28            pending_skips: vec![false; len],
29        }
30    }
31}
32
33impl<T: 'static + Send> Stream for RaceNext<T> {
34    type Item = (usize, T);
35
36    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        let this = self.as_mut().get_mut();
38
39        // First, handle any pending skips from the previous round
40        for i in 0..this.streams.len() {
41            if this.pending_skips[i] {
42                if let Some(ref mut stream) = this.streams[i] {
43                    match stream.as_mut().poll_next(cx) {
44                        Poll::Ready(Some(_)) => {
45                            // Successfully skipped an item
46                            this.pending_skips[i] = false;
47                        }
48                        Poll::Ready(None) => {
49                            // Stream ended while trying to skip
50                            this.streams[i] = None;
51                            this.pending_skips[i] = false;
52                        }
53                        Poll::Pending => {
54                            // Still pending, keep the skip flag
55                        }
56                    }
57                }
58            }
59        }
60
61        // Now poll for the next ready item
62        let mut any_pending = false;
63        for i in 0..this.streams.len() {
64            // Skip streams that are still pending a skip operation
65            if this.pending_skips[i] {
66                any_pending = true;
67                continue;
68            }
69
70            if let Some(ref mut stream) = this.streams[i] {
71                match stream.as_mut().poll_next(cx) {
72                    Poll::Ready(Some(item)) => {
73                        // Found a ready item! Mark other streams for skipping
74                        for j in 0..this.streams.len() {
75                            if j != i && this.streams[j].is_some() {
76                                this.pending_skips[j] = true;
77                            }
78                        }
79                        return Poll::Ready(Some((i, item)));
80                    }
81                    Poll::Ready(None) => {
82                        // This stream ended, remove it
83                        this.streams[i] = None;
84                    }
85                    Poll::Pending => {
86                        any_pending = true;
87                    }
88                }
89            }
90        }
91
92        // Check if all streams are exhausted
93        if this.streams.iter().all(|s| s.is_none()) {
94            return Poll::Ready(None);
95        }
96
97        if any_pending {
98            Poll::Pending
99        } else {
100            // All remaining streams are exhausted
101            Poll::Ready(None)
102        }
103    }
104}
105
106impl<T> RaceNext<T> {
107    /// Returns the number of active streams remaining
108    #[must_use]
109    pub fn active_count(&self) -> usize {
110        self.streams.iter().filter(|s| s.is_some()).count()
111    }
112
113    /// Checks if any streams are still active
114    #[must_use]
115    pub fn has_active_streams(&self) -> bool {
116        self.streams.iter().any(|s| s.is_some())
117    }
118}