futures_concurrency/concurrent_stream/
from_stream.rs1use super::Consumer;
2use crate::concurrent_stream::ConsumerState;
3use crate::prelude::*;
4
5use core::future::{ready, Ready};
6use core::num::NonZeroUsize;
7use core::pin::pin;
8use futures_lite::{Stream, StreamExt};
9
10#[pin_project::pin_project]
12#[derive(Debug)]
13pub struct FromStream<S: Stream> {
14 #[pin]
15 stream: S,
16}
17
18impl<S: Stream> FromStream<S> {
19 pub(crate) fn new(stream: S) -> Self {
20 Self { stream }
21 }
22}
23
24impl<S> ConcurrentStream for FromStream<S>
25where
26 S: Stream,
27{
28 type Item = S::Item;
29 type Future = Ready<Self::Item>;
30
31 async fn drive<C>(self, mut consumer: C) -> C::Output
32 where
33 C: Consumer<Self::Item, Self::Future>,
34 {
35 let mut iter = pin!(self.stream);
36 let mut consumer = pin!(consumer);
37
38 loop {
49 let a = async {
51 let item = iter.next().await;
52 State::Item(item)
53 };
54
55 let b = async {
57 let control_flow = consumer.as_mut().progress().await;
58 State::Progress(control_flow)
59 };
60
61 match (b, a).race().await {
64 State::Progress(control_flow) => match control_flow {
65 ConsumerState::Break => break,
66 ConsumerState::Continue => continue,
67 ConsumerState::Empty => match iter.next().await {
68 Some(item) => match consumer.as_mut().send(ready(item)).await {
69 ConsumerState::Break => break,
70 ConsumerState::Empty | ConsumerState::Continue => continue,
71 },
72 None => break,
73 },
74 },
75 State::Item(Some(item)) => match consumer.as_mut().send(ready(item)).await {
76 ConsumerState::Break => break,
77 ConsumerState::Empty | ConsumerState::Continue => continue,
78 },
79 State::Item(None) => break,
80 }
81 }
82
83 consumer.as_mut().flush().await
86 }
87
88 fn concurrency_limit(&self) -> Option<NonZeroUsize> {
89 None
90 }
91
92 fn size_hint(&self) -> (usize, Option<usize>) {
93 self.stream.size_hint()
94 }
95}
96
97enum State<T> {
98 Progress(super::ConsumerState),
99 Item(T),
100}