1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
use futures::{pin_mut, stream}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ iter, ops::{Range, RangeFrom, RangeInclusive}, pin::Pin, task::{Context, Poll} }; use super::{ DistributedStream, IntoDistributedStream, IntoParallelStream, ParallelStream, StreamTask, StreamTaskAsync }; use crate::{pool::ProcessSend, sink::Sink}; pub trait IteratorExt: Iterator + Sized { fn par(self) -> IterParStream<Self> { IterParStream(self) } fn dist(self) -> IterDistStream<Self> { IterDistStream(self) } } impl<I: Iterator + Sized> IteratorExt for I {} impl_par_dist_rename! { pub struct IterParStream<I>(pub(crate) I); impl<I: Iterator> ParallelStream for IterParStream<I> where I::Item: Send + 'static, { type Item = I::Item; type Task = IterStreamTask<I::Item>; fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() } fn next_task(&mut self) -> Option<Self::Task> { self.0.next().map(IterStreamTask::new) } } } #[pin_project] #[derive(Serialize, Deserialize)] pub struct IterStreamTask<T>(Option<T>); impl<T> IterStreamTask<T> { fn new(t: T) -> Self { Self(Some(t)) } } impl<T> StreamTask for IterStreamTask<T> { type Item = T; type Async = IterStreamTask<T>; fn into_async(self) -> Self::Async { self } } impl<T> StreamTaskAsync for IterStreamTask<T> { type Item = T; fn poll_run( mut self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Item = Self::Item>>, ) -> Poll<()> { let stream = stream::iter(iter::from_fn(|| self.0.take())); pin_mut!(stream); sink.poll_forward(cx, stream) } } impl_par_dist_rename! { impl<Idx> IntoParallelStream for Range<Idx> where Self: Iterator, <Self as Iterator>::Item: Send + 'static, { type ParStream = IterParStream<Self>; type Item = <Self as Iterator>::Item; fn into_par_stream(self) -> Self::ParStream where Self: Sized, { IterParStream(self) } } impl<Idx> IntoParallelStream for RangeFrom<Idx> where Self: Iterator, <Self as Iterator>::Item: Send + 'static, { type ParStream = IterParStream<Self>; type Item = <Self as Iterator>::Item; fn into_par_stream(self) -> Self::ParStream where Self: Sized, { IterParStream(self) } } impl<Idx> IntoParallelStream for RangeInclusive<Idx> where Self: Iterator, <Self as Iterator>::Item: Send + 'static, { type ParStream = IterParStream<Self>; type Item = <Self as Iterator>::Item; fn into_par_stream(self) -> Self::ParStream where Self: Sized, { IterParStream(self) } } }