futures_concurrency/concurrent_stream/
from_stream.rs

1use super::Consumer;
2use crate::concurrent_stream::ConsumerState;
3use crate::prelude::*;
4
5use core::future::{ready, Ready};
6use core::num::NonZeroUsize;
7use core::pin::pin;
8use futures_lite::{Stream, StreamExt};
9
10/// A concurrent for each implementation from a `Stream`
11#[pin_project::pin_project]
12#[derive(Debug)]
13pub struct FromStream<S: Stream> {
14    #[pin]
15    stream: S,
16}
17
18impl<S: Stream> FromStream<S> {
19    pub(crate) fn new(stream: S) -> Self {
20        Self { stream }
21    }
22}
23
24impl<S> ConcurrentStream for FromStream<S>
25where
26    S: Stream,
27{
28    type Item = S::Item;
29    type Future = Ready<Self::Item>;
30
31    async fn drive<C>(self, mut consumer: C) -> C::Output
32    where
33        C: Consumer<Self::Item, Self::Future>,
34    {
35        let mut iter = pin!(self.stream);
36        let mut consumer = pin!(consumer);
37
38        // Concurrently progress the consumer as well as the stream. Whenever
39        // there is an item from the stream available, we submit it to the
40        // consumer and we wait.
41        //
42        // NOTE(yosh): we're relying on the fact that `Stream::next` can be
43        // dropped and recreated freely. That's also true for
44        // `Consumer::progress`; though that is intentional. It should be
45        // possible to write a combinator which does not drop the `Stream::next`
46        // future repeatedly. However for now we're happy to rely on this
47        // property here.
48        loop {
49            // Drive the stream forward
50            let a = async {
51                let item = iter.next().await;
52                State::Item(item)
53            };
54
55            // Drive the consumer forward
56            let b = async {
57                let control_flow = consumer.as_mut().progress().await;
58                State::Progress(control_flow)
59            };
60
61            // If an item is available, submit it to the consumer and wait for
62            // it to be ready.
63            match (b, a).race().await {
64                State::Progress(control_flow) => match control_flow {
65                    ConsumerState::Break => break,
66                    ConsumerState::Continue => continue,
67                    ConsumerState::Empty => match iter.next().await {
68                        Some(item) => match consumer.as_mut().send(ready(item)).await {
69                            ConsumerState::Break => break,
70                            ConsumerState::Empty | ConsumerState::Continue => continue,
71                        },
72                        None => break,
73                    },
74                },
75                State::Item(Some(item)) => match consumer.as_mut().send(ready(item)).await {
76                    ConsumerState::Break => break,
77                    ConsumerState::Empty | ConsumerState::Continue => continue,
78                },
79                State::Item(None) => break,
80            }
81        }
82
83        // We will no longer receive items from the underlying stream, which
84        // means we're ready to wait for the consumer to finish up.
85        consumer.as_mut().flush().await
86    }
87
88    fn concurrency_limit(&self) -> Option<NonZeroUsize> {
89        None
90    }
91
92    fn size_hint(&self) -> (usize, Option<usize>) {
93        self.stream.size_hint()
94    }
95}
96
97enum State<T> {
98    Progress(super::ConsumerState),
99    Item(T),
100}