1use futures::Stream;
2use serde::{Deserialize, Serialize};
3use serde_closure::traits;
4use std::{
5 iter, pin::Pin, task::{Context, Poll}
6};
7
8use super::{
9 All, Any, Collect, Combine, Count, Filter, FlatMap, Fold, ForEach, Fork, GroupBy, Histogram, Inspect, Map, Max, MaxBy, MaxByKey, Mean, Min, MinBy, MinByKey, MostDistinct, MostFrequent, ParallelPipe, Pipe, PipeTask, SampleUnstable, StdDev, Sum, Update
10};
11
12#[derive(Clone, Copy, Debug)]
15pub struct Identity;
16
17impl_par_dist! {
18 impl<Item> ParallelPipe<Item> for Identity {
19 type Output = Item;
20 type Task = IdentityTask;
21
22 #[inline]
23 fn task(&self) -> Self::Task {
24 IdentityTask
25 }
26 }
27}
28
29mod workaround {
31 use super::*;
32
33 #[cfg_attr(not(nightly), serde_closure::desugar)]
34 #[doc(hidden)]
35 impl Identity {
36 #[inline]
37 pub fn pipe<S>(self, sink: S) -> Pipe<Self, S> {
38 Pipe::new(self, sink)
39 }
40
41 #[inline]
42 pub fn fork<A, B, RefAItem>(self, sink: A, sink_ref: B) -> Fork<Self, A, B, RefAItem> {
43 Fork::new(self, sink, sink_ref)
44 }
45
46 #[inline]
47 pub fn inspect<F>(self, f: F) -> Inspect<Self, F>
48 where
49 F: Clone + Send + 'static,
50 {
51 Inspect::new(self, f)
52 }
53
54 #[inline]
55 pub fn update<T, F>(self, f: F) -> Update<Self, F>
56 where
57 F: Clone + Send + 'static,
58 {
59 Update::new(self, f)
60 }
61
62 #[inline]
63 pub fn map<F>(self, f: F) -> Map<Self, F>
64 where
65 F: Clone + Send + 'static,
66 {
67 Map::new(self, f)
68 }
69
70 #[inline]
71 pub fn flat_map<F>(self, f: F) -> FlatMap<Self, F>
72 where
73 F: Clone + Send + 'static,
74 {
75 FlatMap::new(self, f)
76 }
77
78 #[inline]
79 pub fn filter<F>(self, f: F) -> Filter<Self, F>
80 where
81 F: Clone + Send + 'static,
82 {
83 Filter::new(self, f)
84 }
85
86 #[inline]
96 pub fn for_each<F>(self, f: F) -> ForEach<Self, F>
97 where
98 F: Clone + Send + 'static,
99 {
100 ForEach::new(self, f)
101 }
102
103 #[inline]
104 pub fn fold<ID, F, B>(self, identity: ID, op: F) -> Fold<Self, ID, F, B>
105 where
106 ID: traits::FnMut() -> B + Clone + Send + 'static,
107 F: Clone + Send + 'static,
108 B: Send + 'static,
109 {
110 Fold::new(self, identity, op)
111 }
112
113 #[inline]
114 pub fn group_by<S>(self, sink: S) -> GroupBy<Self, S> {
115 GroupBy::new(self, sink)
116 }
117
118 #[inline]
119 pub fn histogram(self) -> Histogram<Self> {
120 Histogram::new(self)
121 }
122
123 #[inline]
124 pub fn count(self) -> Count<Self> {
125 Count::new(self)
126 }
127
128 #[inline]
129 pub fn sum<B>(self) -> Sum<Self, B>
130 where
131 B: iter::Sum<B> + Send + 'static,
132 {
133 Sum::new(self)
134 }
135
136 #[inline]
137 pub fn mean(self) -> Mean<Self> {
138 Mean::new(self)
139 }
140
141 #[inline]
142 pub fn stddev(self) -> StdDev<Self> {
143 StdDev::new(self)
144 }
145
146 #[inline]
147 pub fn combine<F>(self, f: F) -> Combine<Self, F>
148 where
149 F: Clone + Send + 'static,
150 {
151 Combine::new(self, f)
152 }
153
154 #[inline]
155 pub fn max(self) -> Max<Self> {
156 Max::new(self)
157 }
158
159 #[inline]
160 pub fn max_by<F>(self, f: F) -> MaxBy<Self, F>
161 where
162 F: Clone + Send + 'static,
163 {
164 MaxBy::new(self, f)
165 }
166
167 #[inline]
168 pub fn max_by_key<F>(self, f: F) -> MaxByKey<Self, F>
169 where
170 F: Clone + Send + 'static,
171 {
172 MaxByKey::new(self, f)
173 }
174
175 #[inline]
176 pub fn min(self) -> Min<Self> {
177 Min::new(self)
178 }
179
180 #[inline]
181 pub fn min_by<F>(self, f: F) -> MinBy<Self, F>
182 where
183 F: Clone + Send + 'static,
184 {
185 MinBy::new(self, f)
186 }
187
188 #[inline]
189 pub fn min_by_key<F>(self, f: F) -> MinByKey<Self, F>
190 where
191 F: Clone + Send + 'static,
192 {
193 MinByKey::new(self, f)
194 }
195
196 #[inline]
197 pub fn most_frequent(
198 self, n: usize, probability: f64, tolerance: f64,
199 ) -> MostFrequent<Self> {
200 MostFrequent::new(self, n, probability, tolerance)
201 }
202
203 #[inline]
204 pub fn most_distinct(
205 self, n: usize, probability: f64, tolerance: f64, error_rate: f64,
206 ) -> MostDistinct<Self> {
207 MostDistinct::new(self, n, probability, tolerance, error_rate)
208 }
209
210 #[inline]
211 pub fn sample_unstable(self, samples: usize) -> SampleUnstable<Self> {
212 SampleUnstable::new(self, samples)
213 }
214
215 #[inline]
216 pub fn all<F>(self, f: F) -> All<Self, F>
217 where
218 F: Clone + Send + 'static,
219 {
220 All::new(self, f)
221 }
222
223 #[inline]
224 pub fn any<F>(self, f: F) -> Any<Self, F>
225 where
226 F: Clone + Send + 'static,
227 {
228 Any::new(self, f)
229 }
230
231 #[inline]
232 pub fn collect<B>(self) -> Collect<Self, B> {
233 Collect::new(self)
234 }
235 }
236}
237
238#[derive(Clone, Serialize, Deserialize)]
239pub struct IdentityTask;
240impl<Item> PipeTask<Item> for IdentityTask {
241 type Output = Item;
242 type Async = IdentityTask;
243
244 #[inline]
245 fn into_async(self) -> Self::Async {
246 IdentityTask
247 }
248}
249impl<Item> crate::pipe::Pipe<Item> for IdentityTask {
250 type Output = Item;
251
252 #[inline(always)]
253 fn poll_next(
254 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
255 ) -> Poll<Option<Self::Output>> {
256 stream.poll_next(cx)
257 }
258}