//! Adapters that expose ordinary iterators (including the standard range types) as distributed streams.
use futures::{pin_mut, stream}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ iter, ops::{Range, RangeFrom, RangeInclusive}, pin::Pin, task::{Context, Poll} }; use super::{DistributedStream, IntoDistributedStream, StreamTask, StreamTaskAsync}; use crate::{pool::ProcessSend, sink::Sink}; pub trait IteratorExt: Iterator + Sized { fn dist(self) -> IterIter<Self> { IterIter(self) } } impl<I: Iterator + Sized> IteratorExt for I {} pub struct IterIter<I>(pub(super) I); impl<I: Iterator> DistributedStream for IterIter<I> where I::Item: ProcessSend, { type Item = I::Item; type Task = IterIterTask<I::Item>; fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() } fn next_task(&mut self) -> Option<Self::Task> { self.0.next().map(IterIterTask::new) } } #[pin_project] #[derive(Serialize, Deserialize)] pub struct IterIterTask<T>(Option<T>); impl<T> IterIterTask<T> { fn new(t: T) -> Self { Self(Some(t)) } } impl<T> StreamTask for IterIterTask<T> { type Item = T; type Async = IterIterTask<T>; fn into_async(self) -> Self::Async { self } } impl<T> StreamTaskAsync for IterIterTask<T> { type Item = T; fn poll_run( mut self: Pin<&mut Self>, cx: &mut Context, sink: Pin<&mut impl Sink<Self::Item>>, ) -> Poll<()> { let stream = stream::iter(iter::from_fn(|| self.0.take())); pin_mut!(stream); sink.poll_forward(cx, stream) } } impl<Idx> IntoDistributedStream for Range<Idx> where Self: Iterator, <Self as Iterator>::Item: ProcessSend, { type DistStream = IterIter<Self>; type Item = <Self as Iterator>::Item; fn into_dist_stream(self) -> Self::DistStream where Self: Sized, { IterIter(self) } } impl<Idx> IntoDistributedStream for RangeFrom<Idx> where Self: Iterator, <Self as Iterator>::Item: ProcessSend, { type DistStream = IterIter<Self>; type Item = <Self as Iterator>::Item; fn into_dist_stream(self) -> Self::DistStream where Self: Sized, { IterIter(self) } } impl<Idx> IntoDistributedStream for RangeInclusive<Idx> where Self: Iterator, <Self as 
Iterator>::Item: ProcessSend, { type DistStream = IterIter<Self>; type Item = <Self as Iterator>::Item; fn into_dist_stream(self) -> Self::DistStream where Self: Sized, { IterIter(self) } }