amadeus_core/into_par_stream/
iterator.rs1use futures::Stream;
2use pin_project::pin_project;
3use serde::{Deserialize, Serialize};
4use std::{
5 ops::{Range, RangeFrom, RangeInclusive}, pin::Pin, task::{Context, Poll}
6};
7
8use super::{
9 DistributedStream, IntoDistributedStream, IntoParallelStream, ParallelStream, StreamTask
10};
11use crate::pool::ProcessSend;
12
13pub trait IteratorExt: Iterator + Sized {
14 #[inline]
15 fn par(self) -> IterParStream<Self> {
16 IterParStream(self)
17 }
18 #[inline]
19 fn dist(self) -> IterDistStream<Self> {
20 IterDistStream(self)
21 }
22}
23impl<I: Iterator + Sized> IteratorExt for I {}
24
25impl_par_dist_rename! {
26 #[pin_project]
27 pub struct IterParStream<I>(pub(crate) I);
28
29 impl<I: Iterator> ParallelStream for IterParStream<I>
30 where
31 I::Item: Send + 'static,
32 {
33 type Item = I::Item;
34 type Task = IterStreamTask<I::Item>;
35
36 #[inline]
37 fn size_hint(&self) -> (usize, Option<usize>) {
38 self.0.size_hint()
39 }
40 #[inline]
41 fn next_task(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
42 Poll::Ready(self.0.next().map(IterStreamTask::new))
43 }
44 }
45}
46
47#[pin_project]
48#[derive(Serialize, Deserialize)]
49pub struct IterStreamTask<T>(Option<T>);
50impl<T> IterStreamTask<T> {
51 #[inline]
52 fn new(t: T) -> Self {
53 Self(Some(t))
54 }
55}
56
57impl<T> StreamTask for IterStreamTask<T> {
58 type Item = T;
59 type Async = IterStreamTask<T>;
60
61 #[inline]
62 fn into_async(self) -> Self::Async {
63 self
64 }
65}
66impl<T> Stream for IterStreamTask<T> {
67 type Item = T;
68
69 #[inline]
70 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
71 Poll::Ready(self.project().0.take())
72 }
73}
74
75impl_par_dist_rename! {
76 impl<Idx> IntoParallelStream for Range<Idx>
77 where
78 Self: Iterator,
79 <Self as Iterator>::Item: Send + 'static,
80 {
81 type ParStream = IterParStream<Self>;
82 type Item = <Self as Iterator>::Item;
83
84 #[inline]
85 fn into_par_stream(self) -> Self::ParStream
86 where
87 Self: Sized,
88 {
89 IterParStream(self)
90 }
91 }
92
93 impl<Idx> IntoParallelStream for RangeFrom<Idx>
94 where
95 Self: Iterator,
96 <Self as Iterator>::Item: Send + 'static,
97 {
98 type ParStream = IterParStream<Self>;
99 type Item = <Self as Iterator>::Item;
100
101 #[inline]
102 fn into_par_stream(self) -> Self::ParStream
103 where
104 Self: Sized,
105 {
106 IterParStream(self)
107 }
108 }
109
110 impl<Idx> IntoParallelStream for RangeInclusive<Idx>
111 where
112 Self: Iterator,
113 <Self as Iterator>::Item: Send + 'static,
114 {
115 type ParStream = IterParStream<Self>;
116 type Item = <Self as Iterator>::Item;
117
118 #[inline]
119 fn into_par_stream(self) -> Self::ParStream
120 where
121 Self: Sized,
122 {
123 IterParStream(self)
124 }
125 }
126}