// Adapters that expose plain iterators (and std ranges) as parallel / distributed streams.
use futures::Stream; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ ops::{Range, RangeFrom, RangeInclusive}, pin::Pin, task::{Context, Poll} }; use super::{ DistributedStream, IntoDistributedStream, IntoParallelStream, ParallelStream, StreamTask }; use crate::pool::ProcessSend; pub trait IteratorExt: Iterator + Sized { #[inline] fn par(self) -> IterParStream<Self> { IterParStream(self) } #[inline] fn dist(self) -> IterDistStream<Self> { IterDistStream(self) } } impl<I: Iterator + Sized> IteratorExt for I {} impl_par_dist_rename! { #[pin_project] pub struct IterParStream<I>(pub(crate) I); impl<I: Iterator> ParallelStream for IterParStream<I> where I::Item: Send + 'static, { type Item = I::Item; type Task = IterStreamTask<I::Item>; #[inline] fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() } #[inline] fn next_task(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> { Poll::Ready(self.0.next().map(IterStreamTask::new)) } } } #[pin_project] #[derive(Serialize, Deserialize)] pub struct IterStreamTask<T>(Option<T>); impl<T> IterStreamTask<T> { #[inline] fn new(t: T) -> Self { Self(Some(t)) } } impl<T> StreamTask for IterStreamTask<T> { type Item = T; type Async = IterStreamTask<T>; #[inline] fn into_async(self) -> Self::Async { self } } impl<T> Stream for IterStreamTask<T> { type Item = T; #[inline] fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> { Poll::Ready(self.project().0.take()) } } impl_par_dist_rename! 
{ impl<Idx> IntoParallelStream for Range<Idx> where Self: Iterator, <Self as Iterator>::Item: Send + 'static, { type ParStream = IterParStream<Self>; type Item = <Self as Iterator>::Item; #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, { IterParStream(self) } } impl<Idx> IntoParallelStream for RangeFrom<Idx> where Self: Iterator, <Self as Iterator>::Item: Send + 'static, { type ParStream = IterParStream<Self>; type Item = <Self as Iterator>::Item; #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, { IterParStream(self) } } impl<Idx> IntoParallelStream for RangeInclusive<Idx> where Self: Iterator, <Self as Iterator>::Item: Send + 'static, { type ParStream = IterParStream<Self>; type Item = <Self as Iterator>::Item; #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, { IterParStream(self) } } }