amadeus_core/par_stream/
identity.rs

1use futures::Stream;
2use serde::{Deserialize, Serialize};
3use serde_closure::traits;
4use std::{
5	iter, pin::Pin, task::{Context, Poll}
6};
7
8use super::{
9	All, Any, Collect, Combine, Count, Filter, FlatMap, Fold, ForEach, Fork, GroupBy, Histogram, Inspect, Map, Max, MaxBy, MaxByKey, Mean, Min, MinBy, MinByKey, MostDistinct, MostFrequent, ParallelPipe, Pipe, PipeTask, SampleUnstable, StdDev, Sum, Update
10};
11
// TODO: add a type parameter to Identity when the type system includes HRTB in the ParallelPipe impl https://github.com/dtolnay/ghost/
13
/// Zero-sized pipe whose `ParallelPipe` output type is the same as its input item type.
///
/// The combinator methods defined on this type wrap it in further adapters.
#[derive(Debug, Clone, Copy)]
pub struct Identity;
16
17impl_par_dist! {
18	impl<Item> ParallelPipe<Item> for Identity {
19		type Output = Item;
20		type Task = IdentityTask;
21
22		#[inline]
23		fn task(&self) -> Self::Task {
24			IdentityTask
25		}
26	}
27}
28
// These sort of work around https://github.com/rust-lang/rust/issues/73433
30mod workaround {
31	use super::*;
32
33	#[cfg_attr(not(nightly), serde_closure::desugar)]
34	#[doc(hidden)]
35	impl Identity {
36		#[inline]
37		pub fn pipe<S>(self, sink: S) -> Pipe<Self, S> {
38			Pipe::new(self, sink)
39		}
40
41		#[inline]
42		pub fn fork<A, B, RefAItem>(self, sink: A, sink_ref: B) -> Fork<Self, A, B, RefAItem> {
43			Fork::new(self, sink, sink_ref)
44		}
45
46		#[inline]
47		pub fn inspect<F>(self, f: F) -> Inspect<Self, F>
48		where
49			F: Clone + Send + 'static,
50		{
51			Inspect::new(self, f)
52		}
53
54		#[inline]
55		pub fn update<T, F>(self, f: F) -> Update<Self, F>
56		where
57			F: Clone + Send + 'static,
58		{
59			Update::new(self, f)
60		}
61
62		#[inline]
63		pub fn map<F>(self, f: F) -> Map<Self, F>
64		where
65			F: Clone + Send + 'static,
66		{
67			Map::new(self, f)
68		}
69
70		#[inline]
71		pub fn flat_map<F>(self, f: F) -> FlatMap<Self, F>
72		where
73			F: Clone + Send + 'static,
74		{
75			FlatMap::new(self, f)
76		}
77
78		#[inline]
79		pub fn filter<F>(self, f: F) -> Filter<Self, F>
80		where
81			F: Clone + Send + 'static,
82		{
83			Filter::new(self, f)
84		}
85
86		// #[must_use]
87		// #[inline]
88		// pub fn chain<C>(self, chain: C) -> Chain<Self, C::Iter>
89		// where
90		// 	C: IntoParallelStream<Item = Self::Item>,
91		// {
92		// 	Chain::new(self, chain.into_par_stream())
93		// }
94
95		#[inline]
96		pub fn for_each<F>(self, f: F) -> ForEach<Self, F>
97		where
98			F: Clone + Send + 'static,
99		{
100			ForEach::new(self, f)
101		}
102
103		#[inline]
104		pub fn fold<ID, F, B>(self, identity: ID, op: F) -> Fold<Self, ID, F, B>
105		where
106			ID: traits::FnMut() -> B + Clone + Send + 'static,
107			F: Clone + Send + 'static,
108			B: Send + 'static,
109		{
110			Fold::new(self, identity, op)
111		}
112
113		#[inline]
114		pub fn group_by<S>(self, sink: S) -> GroupBy<Self, S> {
115			GroupBy::new(self, sink)
116		}
117
118		#[inline]
119		pub fn histogram(self) -> Histogram<Self> {
120			Histogram::new(self)
121		}
122
123		#[inline]
124		pub fn count(self) -> Count<Self> {
125			Count::new(self)
126		}
127
128		#[inline]
129		pub fn sum<B>(self) -> Sum<Self, B>
130		where
131			B: iter::Sum<B> + Send + 'static,
132		{
133			Sum::new(self)
134		}
135
136		#[inline]
137		pub fn mean(self) -> Mean<Self> {
138			Mean::new(self)
139		}
140
141		#[inline]
142		pub fn stddev(self) -> StdDev<Self> {
143			StdDev::new(self)
144		}
145
146		#[inline]
147		pub fn combine<F>(self, f: F) -> Combine<Self, F>
148		where
149			F: Clone + Send + 'static,
150		{
151			Combine::new(self, f)
152		}
153
154		#[inline]
155		pub fn max(self) -> Max<Self> {
156			Max::new(self)
157		}
158
159		#[inline]
160		pub fn max_by<F>(self, f: F) -> MaxBy<Self, F>
161		where
162			F: Clone + Send + 'static,
163		{
164			MaxBy::new(self, f)
165		}
166
167		#[inline]
168		pub fn max_by_key<F>(self, f: F) -> MaxByKey<Self, F>
169		where
170			F: Clone + Send + 'static,
171		{
172			MaxByKey::new(self, f)
173		}
174
175		#[inline]
176		pub fn min(self) -> Min<Self> {
177			Min::new(self)
178		}
179
180		#[inline]
181		pub fn min_by<F>(self, f: F) -> MinBy<Self, F>
182		where
183			F: Clone + Send + 'static,
184		{
185			MinBy::new(self, f)
186		}
187
188		#[inline]
189		pub fn min_by_key<F>(self, f: F) -> MinByKey<Self, F>
190		where
191			F: Clone + Send + 'static,
192		{
193			MinByKey::new(self, f)
194		}
195
196		#[inline]
197		pub fn most_frequent(
198			self, n: usize, probability: f64, tolerance: f64,
199		) -> MostFrequent<Self> {
200			MostFrequent::new(self, n, probability, tolerance)
201		}
202
203		#[inline]
204		pub fn most_distinct(
205			self, n: usize, probability: f64, tolerance: f64, error_rate: f64,
206		) -> MostDistinct<Self> {
207			MostDistinct::new(self, n, probability, tolerance, error_rate)
208		}
209
210		#[inline]
211		pub fn sample_unstable(self, samples: usize) -> SampleUnstable<Self> {
212			SampleUnstable::new(self, samples)
213		}
214
215		#[inline]
216		pub fn all<F>(self, f: F) -> All<Self, F>
217		where
218			F: Clone + Send + 'static,
219		{
220			All::new(self, f)
221		}
222
223		#[inline]
224		pub fn any<F>(self, f: F) -> Any<Self, F>
225		where
226			F: Clone + Send + 'static,
227		{
228			Any::new(self, f)
229		}
230
231		#[inline]
232		pub fn collect<B>(self) -> Collect<Self, B> {
233			Collect::new(self)
234		}
235	}
236}
237
238#[derive(Clone, Serialize, Deserialize)]
239pub struct IdentityTask;
240impl<Item> PipeTask<Item> for IdentityTask {
241	type Output = Item;
242	type Async = IdentityTask;
243
244	#[inline]
245	fn into_async(self) -> Self::Async {
246		IdentityTask
247	}
248}
249impl<Item> crate::pipe::Pipe<Item> for IdentityTask {
250	type Output = Item;
251
252	#[inline(always)]
253	fn poll_next(
254		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
255	) -> Poll<Option<Self::Output>> {
256		stream.poll_next(cx)
257	}
258}