pollable_map/futures/
ordered.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use futures::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

/// An unbounded queue of futures imposed a FIFO order while polling one future at a time
/// and returning the output to stream before popping the next future in queue to be polled.
pub struct OrderedFutureSet<F> {
    queue: VecDeque<F>,
    current_future: Option<F>,
    waker: Option<Waker>,
}

impl<F> OrderedFutureSet<F>
where
    F: Future + Send + Unpin + 'static,
{
    /// Constructs a new, empty [`OrderedFutureSet`]
    pub fn new() -> Self {
        Self {
            queue: VecDeque::new(),
            current_future: None,
            waker: None,
        }
    }

    /// Furshes a future to the back of the queue
    pub fn push(&mut self, fut: F) {
        self.queue.push_back(fut);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

impl<F> Stream for OrderedFutureSet<F>
where
    F: Future + Send + Unpin + 'static,
{
    type Item = F::Output;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = &mut *self;

        loop {
            if this.current_future.is_none() {
                let Some(fut) = this.queue.pop_front() else {
                    break;
                };
                this.current_future.replace(fut);
            }

            match this.current_future.as_mut() {
                Some(fut) => {
                    let output = futures::ready!(Pin::new(fut).poll(cx));
                    this.current_future.take();
                    cx.waker().wake_by_ref();
                    return Poll::Ready(Some(output));
                }
                None => {
                    this.waker.replace(cx.waker().clone());
                }
            }
        }

        this.waker.replace(cx.waker().clone());
        Poll::Pending
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.queue.len(), None)
    }
}

#[cfg(test)]
mod tests {
    use crate::futures::ordered::OrderedFutureSet;
    use futures::{FutureExt, StreamExt};

    #[test]
    fn fifo_futures() {
        futures::executor::block_on(async move {
            let mut fifo = OrderedFutureSet::new();
            fifo.push(futures::future::ready(1));
            fifo.push(futures::future::ready(2));
            fifo.push(futures::future::ready(4));
            fifo.push(futures::future::ready(3));

            let items = fifo.take(4).collect::<Vec<u8>>().now_or_never().unwrap();

            assert_eq!(items, vec![1, 2, 4, 3]);
        });
    }
}