futures_concurrency/collections/
vec.rs1use crate::concurrent_stream::{self, FromStream};
9use crate::prelude::*;
10use crate::utils::{from_iter, FromIter};
11#[cfg(all(feature = "alloc", not(feature = "std")))]
12use alloc::vec::Vec;
13use core::future::Ready;
14
15pub use crate::future::join::vec::Join;
16pub use crate::future::race::vec::Race;
17pub use crate::future::race_ok::vec::{AggregateError, RaceOk};
18pub use crate::future::try_join::vec::TryJoin;
19pub use crate::stream::chain::vec::Chain;
20pub use crate::stream::merge::vec::Merge;
21pub use crate::stream::zip::vec::Zip;
22
23#[derive(Debug)]
25pub struct IntoConcurrentStream<T>(FromStream<FromIter<alloc::vec::IntoIter<T>>>);
26
27impl<T> ConcurrentStream for IntoConcurrentStream<T> {
28 type Item = T;
29
30 type Future = Ready<T>;
31
32 async fn drive<C>(self, consumer: C) -> C::Output
33 where
34 C: concurrent_stream::Consumer<Self::Item, Self::Future>,
35 {
36 self.0.drive(consumer).await
37 }
38
39 fn concurrency_limit(&self) -> Option<core::num::NonZeroUsize> {
40 self.0.concurrency_limit()
41 }
42}
43
44impl<T> concurrent_stream::IntoConcurrentStream for Vec<T> {
45 type Item = T;
46
47 type IntoConcurrentStream = IntoConcurrentStream<T>;
48
49 fn into_co_stream(self) -> Self::IntoConcurrentStream {
50 let stream = from_iter(self);
51 let co_stream = stream.co();
52 IntoConcurrentStream(co_stream)
53 }
54}
55
56#[cfg(test)]
57mod test {
58 use crate::prelude::*;
59
60 #[test]
61 fn collect() {
62 futures_lite::future::block_on(async {
63 let v: Vec<_> = vec![1, 2, 3, 4, 5].into_co_stream().collect().await;
64 assert_eq!(v, &[1, 2, 3, 4, 5]);
65 });
66 }
67}