renoir/operator/
mod.rs

1//! Operators that can be applied to a stream.
2//!
//! The actual operator list can be found among the methods implemented by [`Stream`],
//! [`KeyedStream`] and [`crate::WindowedStream`].
5
6use std::cmp::Ordering;
7use std::collections::HashMap;
8use std::fmt::Display;
9use std::hash::Hash;
10use std::ops::{AddAssign, Div};
11
12use cache::{CacheRegistry, CacheSink, CachedStream, Cacher, VecCacher};
13use flume::{unbounded, Receiver};
14#[cfg(feature = "tokio")]
15use futures::Future;
16use limit_sorted::LimitSorted;
17use serde::{Deserialize, Serialize};
18
19pub(crate) use start::*;
20
21pub use rich_map_custom::ElementGenerator;
22
23use crate::block::{group_by_hash, BlockStructure, GroupHasherBuilder, NextStrategy, Replication};
24use crate::scheduler::ExecutionMetadata;
25
26use crate::block::BatchMode;
27use crate::stream::KeyedItem;
28use crate::{KeyedStream, Stream};
29
30#[cfg(feature = "tokio")]
31use self::map_async::MapAsync;
32use self::map_memo::MapMemo;
33use self::sink::collect::Collect;
34use self::sink::collect_channel::CollectChannelSink;
35use self::sink::collect_count::CollectCountSink;
36use self::sink::collect_vec::CollectVecSink;
37use self::sink::for_each::ForEach;
38use self::sink::{StreamOutput, StreamOutputRef};
39#[cfg(feature = "timestamp")]
40use self::{
41    add_timestamps::{AddTimestamp, DropTimestamp},
42    interval_join::IntervalJoin,
43};
44use self::{
45    end::End,
46    filter::Filter,
47    filter_map::FilterMap,
48    flat_map::{FlatMap, KeyedFlatMap},
49    flatten::{Flatten, KeyedFlatten},
50    fold::Fold,
51    inspect::Inspect,
52    key_by::KeyBy,
53    keyed_fold::KeyedFold,
54    map::Map,
55    merge::MergeElement,
56    reorder::Reorder,
57    rich_map::RichMap,
58    rich_map_custom::RichMapCustom,
59    route::RouterBuilder,
60    zip::Zip,
61};
62
63#[cfg(feature = "timestamp")]
64mod add_timestamps;
65mod batch_mode;
66mod boxed;
67pub mod cache;
68pub(crate) mod end;
69mod filter;
70mod filter_map;
71mod flat_map;
72mod flatten;
73mod fold;
74mod inspect;
75#[cfg(feature = "timestamp")]
76mod interval_join;
77pub mod iteration;
78pub mod join;
79mod key_by;
80mod keyed_fold;
81mod limit_sorted;
82mod map;
83#[cfg(feature = "tokio")]
84mod map_async;
85mod map_memo;
86mod merge;
87mod reorder;
88mod replication;
89mod rich_map;
90mod rich_map_custom;
91mod route;
92pub mod sink;
93pub mod source;
94mod start;
95pub mod window;
96mod zip;
97
/// Marker trait satisfied by every type that is allowed to flow through a stream.
///
/// It is blanket-implemented for any `Clone + Send + 'static` type, so it never has to be
/// implemented by hand.
pub trait Data: Clone + Send + 'static {}
impl<T> Data for T where T: Clone + Send + 'static {}
101
102/// Marker trait for data types that are used to communicate between different blocks.
103pub trait ExchangeData: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static {}
104impl<T: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static> ExchangeData for T {}
105
/// Marker trait satisfied by every type that can be used as a key.
///
/// It is blanket-implemented for any `Clone + Send + Hash + Eq + 'static` type.
pub trait DataKey: Clone + Send + Hash + Eq + 'static {}
impl<T> DataKey for T where T: Clone + Send + Hash + Eq + 'static {}
109
110/// Marker trait for key types that are used when communicating between different blocks.
111pub trait ExchangeDataKey: DataKey + ExchangeData {}
112impl<T: DataKey + ExchangeData> ExchangeDataKey for T {}
113
/// Marker trait for the functions that extract a key of type `Key` out of a `&Out`.
///
/// It is blanket-implemented for every `Fn(&Out) -> Key` that is also `Clone + Send + 'static`.
pub trait KeyerFn<Key, Out>: Fn(&Out) -> Key + Clone + Send + 'static {}
impl<Key, Out, T> KeyerFn<Key, Out> for T where T: Fn(&Out) -> Key + Clone + Send + 'static {}
117
/// When using timestamps and watermarks, this type expresses the timestamp of a message or of a
/// watermark.
///
/// With the `timestamp` feature enabled it is a signed 64-bit integer.
#[cfg(feature = "timestamp")]
pub type Timestamp = i64;

/// Placeholder timestamp type used when the `timestamp` feature is disabled: it is the unit
/// type, so it carries no information.
#[cfg(not(feature = "timestamp"))]
pub type Timestamp = ();
125
/// An element of the stream. This is what enters and exits from the operators.
///
/// An operator may need to change the content of a `StreamElement` (e.g. a `Map` may change the
/// value of the `Item`). Usually `Watermark` and `FlushAndRestart` are simply forwarded to the next
/// operator in the chain.
///
/// In general a stream may be composed of a sequence of this kind:
///
/// `((Item | Timestamped | Watermark | FlushBatch)* FlushAndRestart)+ Terminate`
///
/// Note that `Ord` and `PartialOrd` are derived, so the declaration order of the variants below
/// determines how elements of different kinds compare with each other.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum StreamElement<Out> {
    /// A normal element containing just the value of the message.
    Item(Out),
    /// Like `Item`, but with a timestamp attached; the timestamp is used to ensure the ordering
    /// of the messages.
    Timestamped(Out, Timestamp),
    /// When an operator receives a `Watermark` with timestamp `t`, the operator will never see any
    /// message with timestamp less than or equal to `t`.
    Watermark(Timestamp),
    /// Flush the internal batch since there will be too much delay until the next message comes.
    FlushBatch,
    /// The stream has ended, and the operators should exit as soon as possible.
    ///
    /// No messages should be generated by the operator between a `FlushAndRestart` and a
    /// `Terminate`.
    Terminate,
    /// Mark the end of a stream of data.
    ///
    /// Note that this does not mean that the entire stream has ended, for example this is used to
    /// mark the end of an iteration. Therefore an operator should be prepared to receive new data
    /// after this message, but should not retain the internal state.
    FlushAndRestart,
}
159
/// An operator represents a unit of computation. It's always included inside a chain of operators,
/// inside a block.
///
/// Each operator implements the `Operator` trait and produces a stream of `Out` elements.
///
/// An `Operator` must be `Clone` since it is part of a single chain when it's built, but it has to
/// be cloned to spawn the replicas of the block.
pub trait Operator: Clone + Send + Display {
    /// The type of the items carried by the [`StreamElement`]s returned by [`Operator::next`].
    type Out: Send;
    /// Setup the operator chain. This is called before any call to `next` and it's used to
    /// initialize the operator. When it's called the operator has already been cloned and it will
    /// never be cloned again. Therefore it's safe to store replica-specific metadata inside of it.
    ///
    /// It's important that each operator (except the start of a chain) calls `.setup()` recursively
    /// on the previous operators.
    fn setup(&mut self, metadata: &mut ExecutionMetadata);

    /// Take a value from the previous operator, process it and return it.
    ///
    /// The returned [`StreamElement`] is either a data element or one of the control variants.
    fn next(&mut self) -> StreamElement<Self::Out>;

    /// A more refined representation of the operator and its predecessors.
    fn structure(&self) -> BlockStructure;
}
183
184impl<Out> StreamElement<Out> {
185    /// Create a new `StreamElement` with an `Item(())` if `self` contains an item, otherwise it
186    /// returns the same variant of `self`.
187    pub fn variant(&self) -> StreamElement<()> {
188        match self {
189            StreamElement::Item(_) => StreamElement::Item(()),
190            StreamElement::Timestamped(_, _) => StreamElement::Item(()),
191            StreamElement::Watermark(w) => StreamElement::Watermark(*w),
192            StreamElement::Terminate => StreamElement::Terminate,
193            StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
194            StreamElement::FlushBatch => StreamElement::FlushBatch,
195        }
196    }
197
198    /// Change the type of the element inside the `StreamElement`.
199    pub fn map<NewOut>(self, f: impl FnOnce(Out) -> NewOut) -> StreamElement<NewOut> {
200        match self {
201            StreamElement::Item(item) => StreamElement::Item(f(item)),
202            StreamElement::Timestamped(item, ts) => StreamElement::Timestamped(f(item), ts),
203            StreamElement::Watermark(w) => StreamElement::Watermark(w),
204            StreamElement::Terminate => StreamElement::Terminate,
205            StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
206            StreamElement::FlushBatch => StreamElement::FlushBatch,
207        }
208    }
209
210    /// Change the type of the element inside the `StreamElement`.
211    #[cfg(feature = "tokio")]
212    pub async fn map_async<NewOut, F, Fut>(self, f: F) -> StreamElement<NewOut>
213    where
214        F: Fn(Out) -> Fut,
215        Fut: Future<Output = NewOut>,
216    {
217        match self {
218            StreamElement::Item(item) => StreamElement::Item(f(item).await),
219            StreamElement::Timestamped(item, ts) => StreamElement::Timestamped(f(item).await, ts),
220            StreamElement::Watermark(w) => StreamElement::Watermark(w),
221            StreamElement::Terminate => StreamElement::Terminate,
222            StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
223            StreamElement::FlushBatch => StreamElement::FlushBatch,
224        }
225    }
226
227    /// A string representation of the variant of this `StreamElement`.
228    pub fn variant_str(&self) -> &'static str {
229        match self {
230            StreamElement::Item(_) => "Item",
231            StreamElement::Timestamped(_, _) => "Timestamped",
232            StreamElement::Watermark(_) => "Watermark",
233            StreamElement::FlushBatch => "FlushBatch",
234            StreamElement::Terminate => "Terminate",
235            StreamElement::FlushAndRestart => "FlushAndRestart",
236        }
237    }
238
239    /// A string representation of the variant of this `StreamElement`.
240    pub fn timestamp(&self) -> Option<&Timestamp> {
241        match self {
242            StreamElement::Timestamped(_, ts) | StreamElement::Watermark(ts) => Some(ts),
243            _ => None,
244        }
245    }
246
247    pub fn add_key<Key>(self, k: Key) -> StreamElement<(Key, Out)> {
248        match self {
249            StreamElement::Item(v) => StreamElement::Item((k, v)),
250            StreamElement::Timestamped(v, ts) => StreamElement::Timestamped((k, v), ts),
251            StreamElement::Watermark(w) => StreamElement::Watermark(w),
252            StreamElement::Terminate => StreamElement::Terminate,
253            StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
254            StreamElement::FlushBatch => StreamElement::FlushBatch,
255        }
256    }
257
258    pub fn value(&self) -> Option<&Out> {
259        match self {
260            StreamElement::Item(v) => Some(v),
261            StreamElement::Timestamped(v, _) => Some(v),
262            StreamElement::Watermark(_) => None,
263            StreamElement::FlushBatch => None,
264            StreamElement::Terminate => None,
265            StreamElement::FlushAndRestart => None,
266        }
267    }
268}
269
270impl<Key, Out> StreamElement<(Key, Out)> {
271    /// Map a `StreamElement<KeyValue(Key, Out)>` to a `StreamElement<Out>`,
272    /// returning the key if possible
273    pub fn take_key(self) -> (Option<Key>, StreamElement<Out>) {
274        match self {
275            StreamElement::Item((k, v)) => (Some(k), StreamElement::Item(v)),
276            StreamElement::Timestamped((k, v), ts) => (Some(k), StreamElement::Timestamped(v, ts)),
277            StreamElement::Watermark(w) => (None, StreamElement::Watermark(w)),
278            StreamElement::Terminate => (None, StreamElement::Terminate),
279            StreamElement::FlushAndRestart => (None, StreamElement::FlushAndRestart),
280            StreamElement::FlushBatch => (None, StreamElement::FlushBatch),
281        }
282    }
283
284    pub fn key(self) -> Option<Key> {
285        match self {
286            StreamElement::Item((k, _)) => Some(k),
287            StreamElement::Timestamped((k, _), _) => Some(k),
288            StreamElement::Watermark(_) => None,
289            StreamElement::Terminate => None,
290            StreamElement::FlushAndRestart => None,
291            StreamElement::FlushBatch => None,
292        }
293    }
294}
295
296impl<Op> Stream<Op>
297where
298    Op: Operator + 'static,
299{
300    /// Given a stream without timestamps nor watermarks, tag each item with a timestamp and insert
301    /// watermarks.
302    ///
303    /// The two functions given to this operator are the following:
304    /// - `timestamp_gen` returns the timestamp assigned to the provided element of the stream
305    /// - `watermark_gen` returns an optional watermark to add after the provided element
306    ///
307    /// Note that the two functions **must** follow the watermark semantics.
308    /// TODO: link to watermark semantics
309    ///
310    /// ## Example
311    ///
312    /// In this example the stream contains the integers from 0 to 9, each will be tagged with a
313    /// timestamp with the value of the item as milliseconds, and after each even number a watermark
314    /// will be inserted.
315    ///
316    /// ```
317    /// # use renoir::{StreamContext, RuntimeConfig};
318    /// # use renoir::operator::source::IteratorSource;
319    /// use renoir::operator::Timestamp;
320    /// # let mut env = StreamContext::new_local();
321    ///
322    /// let s = env.stream_iter(0..10);
323    /// s.add_timestamps(
324    ///     |&n| n,
325    ///     |&n, &ts| if n % 2 == 0 { Some(ts) } else { None }
326    /// );
327    /// ```
328    #[cfg(feature = "timestamp")]
329    pub fn add_timestamps<F, G>(
330        self,
331        timestamp_gen: F,
332        watermark_gen: G,
333    ) -> Stream<AddTimestamp<F, G, Op>>
334    where
335        F: FnMut(&Op::Out) -> Timestamp + Clone + Send + 'static,
336        G: FnMut(&Op::Out, &Timestamp) -> Option<Timestamp> + Clone + Send + 'static,
337    {
338        self.add_operator(|prev| AddTimestamp::new(prev, timestamp_gen, watermark_gen))
339    }
340
341    #[cfg(feature = "timestamp")]
342    pub fn drop_timestamps(self) -> Stream<DropTimestamp<Op>> {
343        self.add_operator(|prev| DropTimestamp::new(prev))
344    }
345    /// Change the batch mode for this stream.
346    ///
347    /// This change will be propagated to all the operators following, even of the next blocks,
348    /// until it's changed again.
349    ///
350    /// ## Example
351    ///
352    /// ```
353    /// # use renoir::{StreamContext, RuntimeConfig};
354    /// # use renoir::operator::source::IteratorSource;
355    /// use renoir::BatchMode;
356    /// # let mut env = StreamContext::new_local();
357    ///
358    /// let s = env.stream_iter(0..10);
359    /// s.batch_mode(BatchMode::fixed(1024));
360    /// ```
361    pub fn batch_mode(mut self, batch_mode: BatchMode) -> Self {
362        self.block.batch_mode = batch_mode;
363        self
364    }
365
366    /// Remove from the stream all the elements for which the provided function returns `None` and
367    /// keep the elements that returned `Some(_)`.
368    ///
369    /// **Note**: this is very similar to [`Iteartor::filter_map`](std::iter::Iterator::filter_map)
370    ///
371    /// ## Example
372    ///
373    /// ```
374    /// # use renoir::{StreamContext, RuntimeConfig};
375    /// # use renoir::operator::source::IteratorSource;
376    /// # let mut env = StreamContext::new_local();
377    /// let s = env.stream_iter(0..10);
378    /// let res = s.filter_map(|n| if n % 2 == 0 { Some(n * 3) } else { None }).collect_vec();
379    ///
380    /// env.execute_blocking();
381    ///
382    /// assert_eq!(res.get().unwrap(), vec![0, 6, 12, 18, 24])
383    /// ```
384    pub fn filter_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
385    where
386        F: Fn(Op::Out) -> Option<O> + Send + Clone + 'static,
387        O: Data,
388    {
389        self.add_operator(|prev| FilterMap::new(prev, f))
390    }
391
392    /// Remove from the stream all the elements for which the provided predicate returns `false`.
393    ///
394    /// **Note**: this is very similar to [`Iteartor::filter`](std::iter::Iterator::filter)
395    ///
396    /// ## Example
397    ///
398    /// ```
399    /// # use renoir::{StreamContext, RuntimeConfig};
400    /// # use renoir::operator::source::IteratorSource;
401    /// # let mut env = StreamContext::new_local();
402    /// let s = env.stream_iter(0..10);
403    /// let res = s.filter(|&n| n % 2 == 0).collect_vec();
404    ///
405    /// env.execute_blocking();
406    ///
407    /// assert_eq!(res.get().unwrap(), vec![0, 2, 4, 6, 8])
408    /// ```
409    pub fn filter<F>(self, predicate: F) -> Stream<impl Operator<Out = Op::Out>>
410    where
411        F: Fn(&Op::Out) -> bool + Clone + Send + 'static,
412    {
413        self.add_operator(|prev| Filter::new(prev, predicate))
414    }
415
416    /// Reorder timestamped items
417    ///
418    /// # Example
419    /// ### TODO
420    pub fn reorder(self) -> Stream<impl Operator<Out = Op::Out>> {
421        self.add_operator(|prev| Reorder::new(prev))
422    }
423
424    /// Remove from the stream all the elements for which the provided function returns `None` and
425    /// keep the elements that returned `Some(_)`. The mapping function can be stateful.
426    ///
427    /// This is equivalent to [`Stream::filter_map`] but with a stateful function.
428    ///
429    /// Since the mapping function can be stateful, it is a `FnMut`. This allows expressing simple
430    /// algorithms with very few lines of code (see examples).
431    ///
432    /// The mapping function is _cloned_ inside each replica, and they will not share state between
433    /// each other. If you want that only a single replica handles all the items you may want to
434    /// change the parallelism of this operator with [`Stream::replication`].
435    ///
436    /// ## Examples
437    ///
438    /// This will emit only the _positive prefix-sums_.
439    ///
440    /// ```
441    /// # use renoir::{StreamContext, RuntimeConfig};
442    /// # use renoir::operator::source::IteratorSource;
443    /// # let mut env = StreamContext::new_local();
444    /// let s = env.stream_iter((std::array::IntoIter::new([1, 2, -5, 3, 1])));
445    /// let res = s.rich_filter_map({
446    ///     let mut sum = 0;
447    ///     move |x| {
448    ///         sum += x;
449    ///         if sum >= 0 {
450    ///             Some(sum)
451    ///         } else {
452    ///             None
453    ///         }
454    ///     }
455    /// }).collect_vec();
456    ///
457    /// env.execute_blocking();
458    ///
459    /// assert_eq!(res.get().unwrap(), vec![1, 1 + 2, /* 1 + 2 - 5, */ 1 + 2 - 5 + 3, 1 + 2 - 5 + 3 + 1]);
460    /// ```
461    pub fn rich_filter_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
462    where
463        F: FnMut(Op::Out) -> Option<O> + Send + Clone + 'static,
464        O: Data,
465    {
466        self.rich_map(f).filter(|x| x.is_some()).map(|x| x.unwrap())
467    }
468
469    /// Map the elements of the stream into new elements. The mapping function can be stateful.
470    ///
471    /// This is equivalent to [`Stream::map`] but with a stateful function.
472    ///
473    /// Since the mapping function can be stateful, it is a `FnMut`. This allows expressing simple
474    /// algorithms with very few lines of code (see examples).
475    ///
476    /// The mapping function is _cloned_ inside each replica, and they will not share state between
477    /// each other. If you want that only a single replica handles all the items you may want to
478    /// change the parallelism of this operator with [`Stream::replication`].
479    ///
480    /// ## Examples
481    ///
482    /// This is a simple implementation of the prefix-sum using a single replica (i.e. each element
483    /// is mapped to the sum of all the elements up to that point). Note that this won't work if
484    /// there are more replicas.
485    ///
486    /// ```
487    /// # use renoir::{StreamContext, RuntimeConfig};
488    /// # use renoir::operator::source::IteratorSource;
489    /// # let mut env = StreamContext::new_local();
490    /// let s = env.stream_iter(1..=5);
491    /// let res = s.rich_map({
492    ///     let mut sum = 0;
493    ///     move |x| {
494    ///         sum += x;
495    ///         sum
496    ///     }
497    /// }).collect_vec();
498    ///
499    /// env.execute_blocking();
500    ///
501    /// assert_eq!(res.get().unwrap(), vec![1, 1 + 2, 1 + 2 + 3, 1 + 2 + 3 + 4, 1 + 2 + 3 + 4 + 5]);
502    /// ```    
503    ///
504    /// This will enumerate all the elements that reach a replica. This is basically equivalent to
505    /// the `enumerate` function in Python.
506    ///
507    /// ```
508    /// # use renoir::{StreamContext, RuntimeConfig};
509    /// # use renoir::operator::source::IteratorSource;
510    /// # let mut env = StreamContext::new_local();
511    /// let s = env.stream_iter(1..=5);
512    /// let res = s.rich_map({
513    ///     let mut id = 0;
514    ///     move |x| {
515    ///         id += 1;
516    ///         (id - 1, x)
517    ///     }
518    /// }).collect_vec();
519    ///
520    /// env.execute_blocking();
521    ///
522    /// assert_eq!(res.get().unwrap(), vec![(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]);
523    /// ```
524    pub fn rich_map<O, F>(self, mut f: F) -> Stream<impl Operator<Out = O>>
525    where
526        F: FnMut(Op::Out) -> O + Send + Clone + 'static,
527        O: Send + 'static,
528    {
529        self.key_by(|_| ())
530            .add_operator(|prev| RichMap::new(prev, move |(_, value)| f(value)))
531            .drop_key()
532    }
533
534    /// Map the elements of the stream into new elements.
535    ///
536    /// **Note**: this is very similar to [`Iteartor::map`](std::iter::Iterator::map).
537    ///
538    /// ## Example
539    ///
540    /// ```
541    /// # use renoir::{StreamContext, RuntimeConfig};
542    /// # use renoir::operator::source::IteratorSource;
543    /// # let mut env = StreamContext::new_local();
544    /// let s = env.stream_iter(0..5);
545    /// let res = s.map(|n| n * 10).collect_vec();
546    ///
547    /// env.execute_blocking();
548    ///
549    /// assert_eq!(res.get().unwrap(), vec![0, 10, 20, 30, 40]);
550    /// ```
551    pub fn map<O: Send, F>(self, f: F) -> Stream<impl Operator<Out = O>>
552    where
553        F: Fn(Op::Out) -> O + Send + Clone + 'static,
554    {
555        self.add_operator(|prev| Map::new(prev, f))
556    }
557
558    /// Map the elements of the stream into new elements by evaluating a future for each one.
559    /// Use memoization to cache outputs for previously seen inputs.
560    ///
561    /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
562    /// The maximum number of elements to be cached is passed as the `capacity` parameter.
563    ///
564    /// The outputs are cached according to the key produced by the `fk` function.
565    ///
566    /// ## Example
567    ///
568    /// ```
569    /// # use renoir::{StreamContext, RuntimeConfig};
570    /// # use renoir::operator::source::IteratorSource;
571    /// # tokio::runtime::Runtime::new()
572    /// #    .unwrap()
573    /// #    .block_on(base());
574    /// # async fn base() {
575    /// #    let mut env = StreamContext::new_local();
576    /// let s = env.stream_iter(5..15);
577    /// let res = s.map_async_memo_by(
578    ///     |n| async move {(n * n) % 7}, |n| n % 7, 5
579    /// ).collect_vec();
580    /// env.execute().await;
581    /// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
582    /// # }
583    /// ```
584    #[cfg(feature = "tokio")]
585    pub fn map_async_memo_by<O, K, F, Fk, Fut>(
586        self,
587        f: F,
588        fk: Fk,
589        capacity: usize,
590    ) -> Stream<impl Operator<Out = O>>
591    where
592        F: Fn(Op::Out) -> Fut + Send + Sync + 'static + Clone,
593        Fk: Fn(&Op::Out) -> K + Send + Sync + Clone + 'static,
594        Fut: futures::Future<Output = O> + Send,
595        O: Clone + Send + Sync + 'static,
596        K: DataKey + Sync,
597    {
598        use futures::FutureExt;
599        use quick_cache::{sync::Cache, UnitWeighter};
600        use std::{convert::Infallible, sync::Arc};
601
602        let cache: Arc<Cache<K, O, _, GroupHasherBuilder>> = Arc::new(Cache::with(
603            capacity,
604            capacity as u64,
605            UnitWeighter,
606            Default::default(),
607            Default::default(),
608        ));
609        self.add_operator(|prev| {
610            MapAsync::new(
611                prev,
612                move |el| {
613                    let fk = fk.clone();
614                    let f = f.clone();
615                    let cache = cache.clone();
616                    let k = fk(&el);
617                    async move {
618                        cache
619                            .get_or_insert_async(&k, (f)(el).map(Result::Ok::<_, Infallible>))
620                            .await
621                            .unwrap()
622                    }
623                },
624                4,
625            )
626        })
627    }
628
629    /// Map the elements of the stream into new elements by evaluating a future for each one.
630    ///
631    /// ## Example
632    ///
633    /// ```
634    /// # use renoir::{StreamContext, RuntimeConfig};
635    /// # use renoir::operator::source::IteratorSource;
636    /// # tokio::runtime::Runtime::new()
637    /// #    .unwrap()
638    /// #    .block_on(base());
639    /// # async fn base() {
640    /// #    let mut env = StreamContext::new_local();
641    /// let s = env.stream_iter(5..15);
642    /// let res = s.map_async(|n| async move {(n * n) % 7}).collect_vec();
643    /// env.execute().await;
644    /// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
645    /// # }
646    /// ```
647    #[cfg(feature = "tokio")]
648    pub fn map_async<O: Data, F, Fut>(self, f: F) -> Stream<impl Operator<Out = O>>
649    where
650        F: Fn(Op::Out) -> Fut + Send + Sync + 'static + Clone,
651        Fut: futures::Future<Output = O> + Send + 'static,
652    {
653        self.add_operator(|prev| MapAsync::new(prev, f, 4))
654    }
655
656    /// Map the elements of the stream into new elements. Use memoization
657    /// to cache outputs for previously seen inputs.
658    ///
659    /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
660    /// The maximum number of elements to be cached is passed as the `capacity` parameter.
661    ///
662    /// The outputs are cached according to the key produced by the `fk` function.
663    ///
664    /// ## Example
665    ///
666    /// ```
667    /// # use renoir::{StreamContext, RuntimeConfig};
668    /// # use renoir::operator::source::IteratorSource;
669    /// # let mut env = StreamContext::new_local();
670    /// let s = env.stream_iter(5..15);
671    /// let res = s.map_memo_by(|n| (n * n) % 7, |n| n % 7, 5).collect_vec();
672    ///
673    /// env.execute_blocking();
674    ///
675    /// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
676    /// ```
677    pub fn map_memo_by<K: DataKey + Sync, O: Clone + Send + Sync + 'static, F, Fk>(
678        self,
679        f: F,
680        fk: Fk,
681        capacity: usize,
682    ) -> Stream<impl Operator<Out = O>>
683    where
684        F: Fn(Op::Out) -> O + Send + Clone + 'static,
685        Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
686    {
687        self.add_operator(|prev| MapMemo::new(prev, f, fk, capacity))
688    }
689
690    /// Fold the stream into a stream that emits a single value.
691    ///
692    /// The folding operator consists in adding to the current accumulation value (initially the
693    /// value provided as `init`) the value of the current item in the stream.
694    ///
695    /// The folding function is provided with a mutable reference to the current accumulator and the
696    /// owned item of the stream. The function should modify the accumulator without returning
697    /// anything.
698    ///
699    /// Note that the output type may be different from the input type. Consider using
700    /// [`Stream::reduce`] if the output type is the same as the input type.
701    ///
702    /// **Note**: this operator will retain all the messages of the stream and emit the values only
703    /// when the stream ends. Therefore this is not properly _streaming_.
704    ///
705    /// **Note**: this operator is not parallelized, it creates a bottleneck where all the stream
706    /// elements are sent to and the folding is done using a single thread.
707    ///
708    /// **Note**: this is very similar to [`Iteartor::fold`](std::iter::Iterator::fold).
709    ///
710    /// **Note**: this operator will split the current block.
711    ///
712    /// ## Example
713    ///
714    /// ```
715    /// # use renoir::{StreamContext, RuntimeConfig};
716    /// # use renoir::operator::source::IteratorSource;
717    /// # let mut env = StreamContext::new_local();
718    /// let s = env.stream_iter(0..5);
719    /// let res = s.fold(0, |acc, value| *acc += value).collect_vec();
720    ///
721    /// env.execute_blocking();
722    ///
723    /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
724    /// ```
725    pub fn fold<O, F>(self, init: O, f: F) -> Stream<impl Operator<Out = O>>
726    where
727        F: Fn(&mut O, Op::Out) + Send + Clone + 'static,
728        Op::Out: ExchangeData,
729        O: Send + Clone,
730    {
731        self.replication(Replication::One)
732            .add_operator(|prev| Fold::new(prev, init, f))
733    }
734
735    /// Fold the stream into a stream that emits a single value.
736    ///
737    /// The folding operator consists in adding to the current accumulation value (initially the
738    /// value provided as `init`) the value of the current item in the stream.
739    ///
740    /// This method is very similary to [`Stream::fold`], but performs the folding distributely. To
741    /// do so the folding function must be _associative_, in particular the folding process is
742    /// performed in 2 steps:
743    ///
744    /// - `local`: the local function is used to fold the elements present in each replica of the
745    ///   stream independently. All those replicas will start with the same `init` value.
746    /// - `global`: all the partial results (the elements produced by the `local` step) have to be
747    ///   aggregated into a single result. This is done using the `global` folding function.
748    ///
749    /// Note that the output type may be different from the input type, therefore requireing
750    /// different function for the aggregation. Consider using [`Stream::reduce_assoc`] if the
751    /// output type is the same as the input type.
752    ///
753    /// **Note**: this operator will retain all the messages of the stream and emit the values only
754    /// when the stream ends. Therefore this is not properly _streaming_.
755    ///
756    /// **Note**: this operator will split the current block.
757    ///
758    /// ## Example
759    ///
760    /// ```
761    /// # use renoir::{StreamContext, RuntimeConfig};
762    /// # use renoir::operator::source::IteratorSource;
763    /// # let mut env = StreamContext::new_local();
764    /// let s = env.stream_iter(0..5);
765    /// let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec();
766    ///
767    /// env.execute_blocking();
768    ///
769    /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
770    /// ```
771    pub fn fold_assoc<O, F, G>(self, init: O, local: F, global: G) -> Stream<impl Operator<Out = O>>
772    where
773        F: Fn(&mut O, Op::Out) + Send + Clone + 'static,
774        G: Fn(&mut O, O) + Send + Clone + 'static,
775        O: ExchangeData,
776    {
777        self.add_operator(|prev| Fold::new(prev, init.clone(), local))
778            .replication(Replication::One)
779            .add_operator(|prev| Fold::new(prev, init, global))
780    }
781
782    /// Perform the folding operation separately for each key.
783    ///
    /// This is equivalent to partitioning the stream using the `keyer` function, and then applying
    /// [`Stream::fold_assoc`] to each partition separately.
786    ///
787    /// Note however that there is a difference between `stream.group_by(keyer).fold(...)` and
788    /// `stream.group_by_fold(keyer, ...)`. The first performs the network shuffle of every item in
789    /// the stream, and **later** performs the folding (i.e. nearly all the elements will be sent to
790    /// the network). The latter avoids sending the items by performing first a local reduction on
791    /// each host, and then send only the locally folded results (i.e. one message per replica, per
792    /// key); then the global step is performed aggregating the results.
793    ///
794    /// The resulting stream will still be keyed and will contain only a single message per key (the
795    /// final result).
796    ///
    /// Note that the output type may be different from the input type, therefore requiring a
    /// different function for the aggregation. Consider using [`Stream::group_by_reduce`] if the
    /// output type is the same as the input type.
800    ///
801    /// **Note**: this operator will retain all the messages of the stream and emit the values only
802    /// when the stream ends. Therefore this is not properly _streaming_.
803    ///
804    /// **Note**: this operator will split the current block.
805    ///
806    /// ## Example
807    /// ```
808    /// # use renoir::{StreamContext, RuntimeConfig};
809    /// # use renoir::operator::source::IteratorSource;
810    /// # let mut env = StreamContext::new_local();
811    /// let s = env.stream_iter(0..5);
812    /// let res = s
813    ///     .group_by_fold(|&n| n % 2, 0, |acc, value| *acc += value, |acc, value| *acc += value)
814    ///     .collect_vec();
815    ///
816    /// env.execute_blocking();
817    ///
818    /// let mut res = res.get().unwrap();
819    /// res.sort_unstable();
820    /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
821    /// ```
822    pub fn group_by_fold<K, O, Fk, F, G>(
823        self,
824        keyer: Fk,
825        init: O,
826        local: F,
827        global: G,
828    ) -> KeyedStream<impl Operator<Out = (K, O)>>
829    where
830        Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
831        F: Fn(&mut O, Op::Out) + Send + Clone + 'static,
832        G: Fn(&mut O, O) + Send + Clone + 'static,
833        K: ExchangeDataKey,
834        O: ExchangeData,
835        Op::Out: Clone,
836    {
837        // GroupBy based on key
838        let next_strategy = NextStrategy::GroupBy(
839            move |(key, _): &(K, O)| group_by_hash(&key),
840            Default::default(),
841        );
842
843        let new_stream = self
844            // key_by with given keyer
845            .add_operator(|prev| KeyBy::new(prev, keyer.clone()))
846            // local fold
847            .add_operator(|prev| KeyedFold::new(prev, init.clone(), local))
848            // group by key
849            .split_block(End::new, next_strategy)
850            // global fold
851            .add_operator(|prev| KeyedFold::new(prev, init.clone(), global));
852
853        KeyedStream(new_stream)
854    }
855
856    pub fn fold_scan<O, SL, SG, L, G, F>(
857        self,
858        local_fold: L,
859        global_fold: G,
860        global_init: SG,
861        map: F,
862    ) -> Stream<impl Operator<Out = O>>
863    where
864        Op::Out: ExchangeData,
865        L: Fn(&mut SL, Op::Out) + Send + Clone + 'static,
866        G: Fn(&mut SG, SL) + Send + Clone + 'static,
867        F: Fn(Op::Out, &SG) -> O + Send + Clone + 'static,
868        SL: ExchangeData + Default,
869        SG: ExchangeData + Sync,
870        O: ExchangeData,
871    {
872        #[derive(Serialize, Deserialize, Clone)]
873        enum TwoPass<I, O> {
874            First(I),
875            Second(I),
876            Output(O),
877        }
878
879        let (state, s) = self.map(|el| TwoPass::First(el)).iterate(
880            2,
881            None,
882            |s, state| {
883                s.map(move |el| match el {
884                    TwoPass::First(el) => TwoPass::Second(el),
885                    TwoPass::Second(el) => {
886                        TwoPass::Output((map)(el, state.get().as_ref().unwrap()))
887                    }
888                    TwoPass::Output(_) => unreachable!(),
889                })
890            },
891            move |local: &mut SL, el| match el {
892                TwoPass::First(_) => {}
893                TwoPass::Second(el) => local_fold(local, el),
894                TwoPass::Output(_) => {}
895            },
896            move |global: &mut Option<SG>, local| {
897                global_fold(global.get_or_insert(global_init.clone()), local)
898            },
899            |_| true,
900        );
901
902        state.for_each(std::mem::drop);
903        s.map(|t| match t {
904            TwoPass::First(_) | TwoPass::Second(_) => unreachable!(),
905            TwoPass::Output(o) => o,
906        })
907    }
908
909    pub fn reduce_scan<O, S, F1, F2, R>(
910        self,
911        first_map: F1,
912        reduce: R,
913        second_map: F2,
914    ) -> Stream<impl Operator<Out = O>>
915    where
916        Op::Out: ExchangeData,
917        F1: Fn(Op::Out) -> S + Send + Clone + 'static,
918        F2: Fn(Op::Out, &S) -> O + Send + Clone + 'static,
919        R: Fn(S, S) -> S + Send + Clone + 'static,
920        S: ExchangeData + Sync,
921        O: ExchangeData,
922    {
923        let reduce2 = reduce.clone();
924        self.fold_scan(
925            move |acc: &mut Option<S>, x| {
926                let map = (first_map)(x);
927                let cur = acc.take();
928                *acc = Some(match cur {
929                    Some(cur) => (reduce)(cur, map),
930                    None => map,
931                });
932            },
933            move |global, local| {
934                let cur = global.take();
935                *global = match (cur, local) {
936                    (Some(cur), Some(local)) => Some((reduce2)(cur, local)),
937                    (Some(a), None) | (None, Some(a)) => Some(a),
938                    (None, None) => None,
939                };
940            },
941            None,
942            move |x, state| (second_map)(x, state.as_ref().unwrap()),
943        )
944    }
945
946    /// Deduplicate elements. The resulting stream will contain exactly one occurrence
947    /// for each unique element in the input stream
948    ///
949    /// The current implementation requires `Hash` and `Eq` and it will repartition the stream
950    /// setting replication to Unlimited
951    pub fn unique_assoc(self) -> Stream<impl Operator<Out = Op::Out>>
952    where
953        Op::Out: Hash + Eq + Clone + ExchangeData + Sync,
954    {
955        use dashmap::DashSet;
956        use std::collections::HashSet;
957        use std::sync::Arc;
958
959        let local_set = Arc::new(DashSet::<_, GroupHasherBuilder>::default());
960        let mut final_set = HashSet::<_, GroupHasherBuilder>::default();
961
962        self.rich_flat_map(move |el| {
963            if !local_set.contains(&el) {
964                local_set.insert(el.clone());
965                Some(el)
966            } else {
967                None
968            }
969        })
970        .repartition_by(Replication::Unlimited, group_by_hash)
971        .rich_flat_map(move |el| {
972            if !final_set.contains(&el) {
973                final_set.insert(el.clone());
974                Some(el)
975            } else {
976                None
977            }
978        })
979    }
980
981    /// Deduplicate elements. The resulting stream will contain exactly one occurrence
982    /// for each unique key computed from the item in the input stream
983    ///
984    /// The current implementation requires `Hash` and `Eq` and it will repartition the stream
985    /// setting replication to Unlimited
986    pub fn unique_assoc_by_key<F, K>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
987    where
988        Op::Out: Hash + Eq + Clone + ExchangeData + Sync,
989        F: Fn(&Op::Out) -> K + Clone + Send + 'static,
990        K: Hash + Eq + Clone + Send + Sync + 'static,
991    {
992        use dashmap::DashSet;
993        use std::collections::HashSet;
994        use std::sync::Arc;
995        let f2 = f.clone();
996
997        let local_set = Arc::new(DashSet::<_, GroupHasherBuilder>::default());
998        let mut final_set = HashSet::<_, GroupHasherBuilder>::default();
999
1000        self.rich_flat_map(move |el| {
1001            let k = (f)(&el);
1002            if !local_set.insert(k) {
1003                Some(el)
1004            } else {
1005                None
1006            }
1007        })
1008        .repartition_by(Replication::Unlimited, group_by_hash)
1009        .rich_flat_map(move |el| {
1010            let k = (f2)(&el);
1011            if !final_set.insert(k) {
1012                Some(el)
1013            } else {
1014                None
1015            }
1016        })
1017    }
1018
1019    /// Construct a [`KeyedStream`] from a [`Stream`] without shuffling the data.
1020    ///
1021    /// **Note**: this violates the semantics of [`KeyedStream`], without sending all the values
1022    /// with the same key to the same replica some of the following operators may misbehave. You
1023    /// probably need to use [`Stream::group_by`] instead.
1024    ///
1025    /// ## Example
1026    /// ```
1027    /// # use renoir::{StreamContext, RuntimeConfig};
1028    /// # use renoir::operator::source::IteratorSource;
1029    /// # let mut env = StreamContext::new_local();
1030    /// let s = env.stream_iter(0..5);
1031    /// let res = s.key_by(|&n| n % 2).collect_vec();
1032    ///
1033    /// env.execute_blocking();
1034    ///
1035    /// let mut res = res.get().unwrap();
1036    /// res.sort_unstable();
1037    /// assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
1038    /// ```
1039    pub fn key_by<K, Fk>(self, keyer: Fk) -> KeyedStream<impl Operator<Out = (K, Op::Out)>>
1040    where
1041        Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
1042        K: DataKey,
1043    {
1044        KeyedStream(self.add_operator(|prev| KeyBy::new(prev, keyer)))
1045    }
1046
    /// Apply the given function to a reference to each element of the stream, without consuming
    /// the stream: the returned stream has the same element type as the original one.
1048    ///
1049    /// ## Example
1050    ///
1051    /// ```
1052    /// # use renoir::{StreamContext, RuntimeConfig};
1053    /// # use renoir::operator::source::IteratorSource;
1054    /// # let mut env = StreamContext::new_local();
1055    /// let s = env.stream_iter(0..5);
1056    /// s.inspect(|n| println!("Item: {}", n)).for_each(std::mem::drop);
1057    ///
1058    /// env.execute_blocking();
1059    /// ```
1060    pub fn inspect<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
1061    where
1062        F: FnMut(&Op::Out) + Send + Clone + 'static,
1063    {
1064        self.add_operator(|prev| Inspect::new(prev, f))
1065    }
1066
1067    /// Apply a mapping operation to each element of the stream, the resulting stream will be the
1068    /// flattened values of the result of the mapping. The mapping function can be stateful.
1069    ///
1070    /// This is equivalent to [`Stream::flat_map`] but with a stateful function.
1071    ///
1072    /// Since the mapping function can be stateful, it is a `FnMut`. This allows expressing simple
1073    /// algorithms with very few lines of code (see examples).
1074    ///
1075    /// The mapping function is _cloned_ inside each replica, and they will not share state between
1076    /// each other. If you want that only a single replica handles all the items you may want to
1077    /// change the parallelism of this operator with [`Stream::replication`].
1078    ///
1079    /// ## Examples
1080    ///
    /// This will emit, for each element, the pairs it forms with all the previously seen elements.
1082    ///
1083    /// ```
1084    /// # use renoir::{StreamContext, RuntimeConfig};
1085    /// # use renoir::operator::source::IteratorSource;
1086    /// # let mut env = StreamContext::new_local();
1087    /// let s = env.stream_iter(0..=3);
1088    /// let res = s.rich_flat_map({
1089    ///     let mut elements = Vec::new();
1090    ///     move |y| {
1091    ///         let new_pairs = elements
1092    ///             .iter()
1093    ///             .map(|&x: &u32| (x, y))
1094    ///             .collect::<Vec<_>>();
1095    ///         elements.push(y);
1096    ///         new_pairs
1097    ///     }
1098    /// }).collect_vec();
1099    ///
1100    /// env.execute_blocking();
1101    ///
1102    /// assert_eq!(res.get().unwrap(), vec![(0, 1), (0, 2), (1, 2), (0, 3), (1, 3), (2, 3)]);
1103    /// ```
1104    pub fn rich_flat_map<It, F>(self, f: F) -> Stream<impl Operator<Out = It::Item>>
1105    where
1106        It: IntoIterator + Send + 'static,
1107        <It as IntoIterator>::IntoIter: Send + 'static,
1108        <It as IntoIterator>::Item: Send,
1109        F: FnMut(Op::Out) -> It + Send + Clone + 'static,
1110    {
1111        self.rich_map(f).flatten()
1112    }
1113
1114    /// Map the elements of the stream into new elements. The mapping function can be stateful.
1115    ///
1116    /// This version of `rich_flat_map` is a lower level primitive that gives full control over the
1117    /// inner types used in streams. It can be used to define custom unary operators.
1118    ///
1119    /// The closure must follow these rules to ensure the correct behaviour of renoir:
1120    /// + `Watermark` messages must be sent when no more items with lower timestamp will ever be produced
1121    /// + `FlushBatch` messages must be forwarded if received
1122    /// + For each `FlushAndRestart` and `Terminate` message received, the operator must generate
1123    ///   one and only one message of the same kind. No other messages of this kind should be created
1124    ///
1125    /// The mapping function is _cloned_ inside each replica, and they will not share state between
1126    /// each other. If you want that only a single replica handles all the items you may want to
1127    /// change the parallelism of this operator with [`Stream::replication`].
1128    ///
1129    /// ## Examples
1130    ///
1131    /// TODO
1132    pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
1133    where
1134        F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send + 'static,
1135        O: Send,
1136    {
1137        self.add_operator(|prev| RichMapCustom::new(prev, f))
1138    }
1139
    /// Apply a mapping operation to each element of the stream, the resulting stream will be the
    /// flattened values of the result of the mapping.
    ///
    /// **Note**: this is very similar to [`Iterator::flat_map`](std::iter::Iterator::flat_map)
1144    ///
1145    /// ## Example
1146    ///
1147    /// ```
1148    /// # use renoir::{StreamContext, RuntimeConfig};
1149    /// # use renoir::operator::source::IteratorSource;
1150    /// # let mut env = StreamContext::new_local();
1151    /// let s = env.stream_iter(0..3);
1152    /// let res = s.flat_map(|n| vec![n, n]).collect_vec();
1153    ///
1154    /// env.execute_blocking();
1155    ///
1156    /// assert_eq!(res.get().unwrap(), vec![0, 0, 1, 1, 2, 2]);
1157    /// ```
1158    pub fn flat_map<It, F>(self, f: F) -> Stream<impl Operator<Out = It::Item>>
1159    where
1160        It: IntoIterator,
1161        It::IntoIter: Send,
1162        It::Item: Send,
1163        F: Fn(Op::Out) -> It + Send + Clone,
1164    {
1165        self.add_operator(|prev| FlatMap::new(prev, f))
1166    }
1167
1168    /// Apply the given function to all the elements of the stream, consuming the stream.
1169    ///
1170    /// ## Example
1171    ///
1172    /// ```
1173    /// # use renoir::{StreamContext, RuntimeConfig};
1174    /// # use renoir::operator::source::IteratorSource;
1175    /// # let mut env = StreamContext::new_local();
1176    /// let s = env.stream_iter(0..5);
1177    /// s.for_each(|n| println!("Item: {}", n));
1178    ///
1179    /// env.execute_blocking();
1180    /// ```
1181    pub fn for_each<F>(self, f: F)
1182    where
1183        F: FnMut(Op::Out) + Send + Clone + 'static,
1184    {
1185        self.add_operator(|prev| ForEach::new(prev, f))
1186            .finalize_block();
1187    }
1188
1189    /// Transform this stream of containers into a stream of all the contained values.
1190    ///
    /// **Note**: this is very similar to [`Iterator::flatten`](std::iter::Iterator::flatten)
1192    ///
1193    /// ## Example
1194    ///
1195    /// ```
1196    /// # use renoir::{StreamContext, RuntimeConfig};
1197    /// # use renoir::operator::source::IteratorSource;
1198    /// # let mut env = StreamContext::new_local();
1199    /// let s = env.stream_iter((vec![
1200    ///     vec![1, 2, 3],
1201    ///     vec![],
1202    ///     vec![4, 5],
1203    /// ].into_iter()));
1204    /// let res = s.flatten().collect_vec();
1205    ///
1206    /// env.execute_blocking();
1207    ///
1208    /// assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]);
1209    /// ```
1210    pub fn flatten(self) -> Stream<impl Operator<Out = <Op::Out as IntoIterator>::Item>>
1211    where
1212        Op::Out: IntoIterator,
1213        <Op::Out as IntoIterator>::IntoIter: Send,
1214        <Op::Out as IntoIterator>::Item: Send,
1215    {
1216        self.add_operator(|prev| Flatten::new(prev))
1217    }
1218
1219    /// Sort the items in the stream using the provided comparison function.
1220    ///
    /// **Note**: This is a blocking operator and will retain items until the end
    /// of the stream or a restart.
1223    ///
1224    /// ## Example
1225    ///
1226    /// ```
1227    /// # use renoir::{StreamContext, RuntimeConfig};
1228    /// # use renoir::operator::source::IteratorSource;
1229    /// # use rand::{rng, Rng};
1230    /// # use rand::seq::SliceRandom;
1231    /// # let mut env = StreamContext::new_local();
1232    ///
1233    /// let mut vec: Vec<_> = (0..20).collect::<Vec<_>>();
1234    /// vec.shuffle(&mut rng());
1235    ///
1236    /// let s = env.stream_iter(vec.into_iter());
1237    /// let res = s.sorted_by(|a,b| a.cmp(b)).collect_vec();
1238    ///
1239    /// env.execute_blocking();
1240    ///
1241    /// assert_eq!(res.get().unwrap(), (0..20).collect::<Vec<_>>());
1242    /// ```
1243    pub fn sorted_by<F>(self, compare: F) -> Stream<LimitSorted<F, Op>>
1244    where
1245        F: Fn(&Op::Out, &Op::Out) -> std::cmp::Ordering + Clone + Send,
1246    {
1247        self.add_operator(|prev| LimitSorted::new(prev, compare, None, None, true))
1248    }
1249
1250    /// Keep only the first `limit` items. If an `offset` is specified, keep only
1251    /// the first `limit` items after skipping `offset` elements. The order of the
1252    /// items across partitions is unspecified.
1253    ///
    /// **Note**: This is a blocking operator and will retain items until the end
    /// of the stream or a restart.
    /// **Note**: This operator will run all the previous computations to completion
    /// even if it requires processing more elements than limit in order to preserve
    /// correctness
1259    ///
1260    /// ## Example
1261    ///
1262    /// ```
1263    /// # use renoir::{StreamContext, RuntimeConfig};
1264    /// # use renoir::operator::source::IteratorSource;
1265    /// # let mut env = StreamContext::new_local();
1266    ///
1267    /// let mut vec: Vec<_> = (0..80).collect::<Vec<_>>();
1268    ///
1269    /// let s = env.stream_iter(vec.into_iter());
1270    /// let res = s.limit(20, Some(40)).collect_vec();
1271    ///
1272    /// env.execute_blocking();
1273    ///
1274    /// assert_eq!(res.get().unwrap(), (40..60).collect::<Vec<_>>());
1275    /// ```
1276    pub fn limit(
1277        self,
1278        limit: usize,
1279        offset: Option<usize>,
1280    ) -> Stream<LimitSorted<impl Fn(&Op::Out, &Op::Out) -> Ordering + Clone + Send, Op>> {
1281        use std::cmp::Ordering;
1282
1283        self.add_operator(|prev| {
1284            LimitSorted::new(prev, |_, _| Ordering::Equal, Some(limit), offset, false)
1285        })
1286    }
1287
1288    /// Sort the items in the stream using the provided comparison function.
1289    /// Keep only the first `limit` items. If an `offset` is specified, keep only
1290    /// the first `limit` items after skipping `offset` elements.
1291    ///
    /// **Note**: This is a blocking operator and will retain items until the end
    /// of the stream or a restart.
    /// **Note**: This operator will run all the previous computations to completion
    /// even if it requires processing more elements than limit in order to preserve
    /// correctness
1297    ///
1298    /// ## Example
1299    ///
1300    /// ```
1301    /// # use renoir::{StreamContext, RuntimeConfig};
1302    /// # use renoir::operator::source::IteratorSource;
1303    /// # use rand::{rng, Rng};
1304    /// # use rand::seq::SliceRandom;
1305    /// # let mut env = StreamContext::new_local();
1306    ///
1307    /// let mut vec: Vec<_> = (0..80).collect::<Vec<_>>();
1308    /// vec.shuffle(&mut rng());
1309    ///
1310    /// let s = env.stream_iter(vec.into_iter());
1311    /// let res = s.sorted_limit_by(|a,b| a.cmp(b), 20, Some(40)).collect_vec();
1312    ///
1313    /// env.execute_blocking();
1314    ///
1315    /// assert_eq!(res.get().unwrap(), (40..60).collect::<Vec<_>>());
1316    /// ```
1317    pub fn sorted_limit_by<F>(
1318        self,
1319        compare: F,
1320        limit: usize,
1321        offset: Option<usize>,
1322    ) -> Stream<LimitSorted<F, Op>>
1323    where
1324        F: Fn(&Op::Out, &Op::Out) -> std::cmp::Ordering + Clone + Send,
1325    {
1326        self.add_operator(|prev| LimitSorted::new(prev, compare, Some(limit), offset, true))
1327    }
1328}
1329
1330impl<I, Op> Stream<Op>
1331where
1332    I: ExchangeData,
1333    Op: Operator<Out = I> + 'static,
1334{
1335    /// Duplicate each element of the stream and forward it to all the replicas of the next block.
1336    ///
1337    /// **Note**: this will duplicate the elements of the stream, this is potentially a very
1338    /// expensive operation.
1339    ///
1340    /// **Note**: this operator will split the current block.
1341    ///
1342    /// ## Example
1343    ///
1344    /// ```
1345    /// # use renoir::{StreamContext, RuntimeConfig};
1346    /// # use renoir::operator::source::IteratorSource;
1347    /// # let mut env = StreamContext::new_local();
1348    /// let s = env.stream_iter(0..10);
1349    /// s.broadcast();
1350    /// ```
1351    pub fn broadcast(self) -> Stream<impl Operator<Out = Op::Out>> {
1352        self.split_block(End::new, NextStrategy::all())
1353    }
1354
1355    /// Given a stream, make a [`KeyedStream`] partitioning the values according to a key generated
1356    /// by the `keyer` function provided.
1357    ///
1358    /// The returned [`KeyedStream`] is partitioned by key, and all the operators added to it will
1359    /// be evaluated _after_ the network shuffle. Therefore all the items are sent to the network
1360    /// (if their destination is not the local host). In many cases this behaviour can be avoided by
1361    /// using the associative variant of the operators (e.g. [`Stream::group_by_reduce`],
1362    /// [`Stream::group_by_sum`], ...).
1363    ///
1364    /// **Note**: the keys are not sent to the network, they are built on the sending side, and
1365    /// rebuilt on the receiving side.
1366    ///
1367    /// **Note**: this operator will split the current block.
1368    ///
1369    /// ## Example
1370    /// ```
1371    /// # use renoir::{StreamContext, RuntimeConfig};
1372    /// # use renoir::operator::source::IteratorSource;
1373    /// # let mut env = StreamContext::new_local();
1374    /// let s = env.stream_iter(0..5);
1375    /// let keyed = s.group_by(|&n| n % 2); // partition even and odd elements
1376    /// ```
1377    pub fn group_by<K, Fk>(self, keyer: Fk) -> KeyedStream<impl Operator<Out = (K, I)>>
1378    where
1379        Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
1380        K: DataKey,
1381    {
1382        let next_strategy = NextStrategy::group_by(keyer.clone());
1383        let new_stream = self
1384            .split_block(End::new, next_strategy)
1385            .add_operator(|prev| KeyBy::new(prev, keyer));
1386        KeyedStream(new_stream)
1387    }
1388
1389    /// Find, for each partition of the stream, the item with the largest value.
1390    ///
1391    /// The stream is partitioned using the `keyer` function and the value to compare is obtained
1392    /// with `get_value`.
1393    ///
1394    /// This operation is associative, therefore the computation is done in parallel before sending
1395    /// all the elements to the network.
1396    ///
1397    /// **Note**: the comparison is done using the value returned by `get_value`, but the resulting
1398    /// items have the same type as the input.
1399    ///
1400    /// **Note**: this operator will split the current block.
1401    ///
1402    /// ## Example
1403    /// ```
1404    /// # use renoir::{StreamContext, RuntimeConfig};
1405    /// # use renoir::operator::source::IteratorSource;
1406    /// # let mut env = StreamContext::new_local();
1407    /// let s = env.stream_iter(0..5);
1408    /// let res = s
1409    ///     .group_by_max_element(|&n| n % 2, |&n| n)
1410    ///     .collect_vec();
1411    ///
1412    /// env.execute_blocking();
1413    ///
1414    /// let mut res = res.get().unwrap();
1415    /// res.sort_unstable();
1416    /// assert_eq!(res, vec![(0, 4), (1, 3)]);
1417    /// ```
1418    pub fn group_by_max_element<K, V, Fk, Fv>(
1419        self,
1420        keyer: Fk,
1421        get_value: Fv,
1422    ) -> KeyedStream<impl Operator<Out = (K, I)>>
1423    where
1424        Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1425        Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V,
1426        K: ExchangeDataKey,
1427        V: Ord,
1428    {
1429        self.group_by_reduce(keyer, move |out, value| {
1430            if get_value(&value) > get_value(out) {
1431                *out = value;
1432            }
1433        })
1434    }
1435
1436    /// Find, for each partition of the stream, the sum of the values of the items.
1437    ///
1438    /// The stream is partitioned using the `keyer` function and the value to sum is obtained
1439    /// with `get_value`.
1440    ///
1441    /// This operation is associative, therefore the computation is done in parallel before sending
1442    /// all the elements to the network.
1443    ///
1444    /// **Note**: this is similar to the SQL: `SELECT SUM(value) ... GROUP BY key`
1445    ///
1446    /// **Note**: the type of the result does not have to be a number, any type that implements
1447    /// `AddAssign` is accepted.
1448    ///
1449    /// **Note**: this operator will split the current block.
1450    ///
1451    /// ## Example
1452    /// ```
1453    /// # use renoir::{StreamContext, RuntimeConfig};
1454    /// # use renoir::operator::source::IteratorSource;
1455    /// # let mut env = StreamContext::new_local();
1456    /// let s = env.stream_iter(0..5);
1457    /// let res = s
1458    ///     .group_by_sum(|&n| n % 2, |n| n)
1459    ///     .collect_vec();
1460    ///
1461    /// env.execute_blocking();
1462    ///
1463    /// let mut res = res.get().unwrap();
1464    /// res.sort_unstable();
1465    /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
1466    /// ```
1467    pub fn group_by_sum<K, V, Fk, Fv>(
1468        self,
1469        keyer: Fk,
1470        get_value: Fv,
1471    ) -> KeyedStream<impl Operator<Out = (K, V)>>
1472    where
1473        Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1474        Fv: Fn(Op::Out) -> V + Clone + Send + 'static,
1475        V: ExchangeData + AddAssign,
1476        K: ExchangeDataKey,
1477    {
1478        let s = self.group_by_fold(
1479            keyer,
1480            None,
1481            move |acc, value| {
1482                if let Some(acc) = acc {
1483                    *acc += get_value(value);
1484                } else {
1485                    *acc = Some(get_value(value));
1486                }
1487            },
1488            |acc, value| match acc {
1489                None => *acc = value,
1490                Some(acc) => {
1491                    if let Some(value) = value {
1492                        *acc += value
1493                    }
1494                }
1495            },
1496        );
1497        s.map(|(_, o)| o.unwrap())
1498    }
1499
1500    /// Find, for each partition of the stream, the average of the values of the items.
1501    ///
1502    /// The stream is partitioned using the `keyer` function and the value to average is obtained
1503    /// with `get_value`.
1504    ///
1505    /// This operation is associative, therefore the computation is done in parallel before sending
1506    /// all the elements to the network.
1507    ///
1508    /// **Note**: this is similar to the SQL: `SELECT AVG(value) ... GROUP BY key`
1509    ///
1510    /// **Note**: the type of the result does not have to be a number, any type that implements
1511    /// `AddAssign` and can be divided by `f64` is accepted.
1512    ///
1513    /// **Note**: this operator will split the current block.
1514    ///
1515    /// ## Example
1516    /// ```
1517    /// # use renoir::{StreamContext, RuntimeConfig};
1518    /// # use renoir::operator::source::IteratorSource;
1519    /// # let mut env = StreamContext::new_local();
1520    /// let s = env.stream_iter(0..5);
1521    /// let res = s
1522    ///     .group_by_avg(|&n| n % 2, |&n| n as f64)
1523    ///     .collect_vec();
1524    ///
1525    /// env.execute_blocking();
1526    ///
1527    /// let mut res = res.get().unwrap();
1528    /// res.sort_by_key(|(k, _)| *k);
1529    /// assert_eq!(res, vec![(0, (0.0 + 2.0 + 4.0) / 3.0), (1, (1.0 + 3.0) / 2.0)]);
1530    /// ```
1531    pub fn group_by_avg<K, V, Fk, Fv>(
1532        self,
1533        keyer: Fk,
1534        get_value: Fv,
1535    ) -> KeyedStream<impl Operator<Out = (K, V)>>
1536    where
1537        Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1538        Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V,
1539        V: ExchangeData + AddAssign + Div<f64, Output = V>,
1540        K: ExchangeDataKey,
1541    {
1542        self.group_by_fold(
1543            keyer,
1544            (None, 0usize),
1545            move |(sum, count), value| {
1546                *count += 1;
1547                match sum {
1548                    Some(sum) => *sum += get_value(&value),
1549                    None => *sum = Some(get_value(&value)),
1550                }
1551            },
1552            |(sum, count), (local_sum, local_count)| {
1553                *count += local_count;
1554                match sum {
1555                    None => *sum = local_sum,
1556                    Some(sum) => {
1557                        if let Some(local_sum) = local_sum {
1558                            *sum += local_sum;
1559                        }
1560                    }
1561                }
1562            },
1563        )
1564        .map(|(_, (sum, count))| sum.unwrap() / (count as f64))
1565    }
1566
1567    /// Count, for each partition of the stream, the number of items.
1568    ///
1569    /// The stream is partitioned using the `keyer` function.
1570    ///
1571    /// This operation is associative, therefore the computation is done in parallel before sending
1572    /// all the elements to the network.
1573    ///
1574    /// **Note**: this is similar to the SQL: `SELECT COUNT(*) ... GROUP BY key`
1575    ///
1576    /// **Note**: this operator will split the current block.
1577    ///
1578    /// ## Example
1579    /// ```
1580    /// # use renoir::{StreamContext, RuntimeConfig};
1581    /// # use renoir::operator::source::IteratorSource;
1582    /// # let mut env = StreamContext::new_local();
1583    /// let s = env.stream_iter(0..5);
1584    /// let res = s
1585    ///     .group_by_count(|&n| n % 2)
1586    ///     .collect_vec();
1587    ///
1588    /// env.execute_blocking();
1589    ///
1590    /// let mut res = res.get().unwrap();
1591    /// res.sort_by_key(|(k, _)| *k);
1592    /// assert_eq!(res, vec![(0, 3), (1, 2)]);
1593    /// ```
1594    pub fn group_by_count<K, Fk>(self, keyer: Fk) -> KeyedStream<impl Operator<Out = (K, usize)>>
1595    where
1596        Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1597        K: ExchangeDataKey,
1598    {
1599        self.group_by_fold(
1600            keyer,
1601            0,
1602            move |count, _| *count += 1,
1603            |count, local_count| *count += local_count,
1604        )
1605    }
1606
1607    /// Find, for each partition of the stream, the item with the smallest value.
1608    ///
1609    /// The stream is partitioned using the `keyer` function and the value to compare is obtained
1610    /// with `get_value`.
1611    ///
1612    /// This operation is associative, therefore the computation is done in parallel before sending
1613    /// all the elements to the network.
1614    ///
1615    /// **Note**: the comparison is done using the value returned by `get_value`, but the resulting
1616    /// items have the same type as the input.
1617    ///
1618    /// **Note**: this operator will split the current block.
1619    ///
1620    /// ## Example
1621    /// ```
1622    /// # use renoir::{StreamContext, RuntimeConfig};
1623    /// # use renoir::operator::source::IteratorSource;
1624    /// # let mut env = StreamContext::new_local();
1625    /// let s = env.stream_iter(0..5);
1626    /// let res = s
1627    ///     .group_by_min_element(|&n| n % 2, |&n| n)
1628    ///     .collect_vec();
1629    ///
1630    /// env.execute_blocking();
1631    ///
1632    /// let mut res = res.get().unwrap();
1633    /// res.sort_unstable();
1634    /// assert_eq!(res, vec![(0, 0), (1, 1)]);
1635    /// ```
1636    pub fn group_by_min_element<K, V, Fk, Fv>(
1637        self,
1638        keyer: Fk,
1639        get_value: Fv,
1640    ) -> KeyedStream<impl Operator<Out = (K, I)>>
1641    where
1642        Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1643        Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V,
1644        K: ExchangeDataKey,
1645        V: Ord,
1646    {
1647        self.group_by_reduce(keyer, move |out, value| {
1648            if get_value(&value) < get_value(out) {
1649                *out = value;
1650            }
1651        })
1652    }
1653
1654    /// Perform the reduction operation separately for each key.
1655    ///
1656    /// This is equivalent of partitioning the stream using the `keyer` function, and then applying
1657    /// [`Stream::reduce_assoc`] to each partition separately.
1658    ///
1659    /// Note however that there is a difference between `stream.group_by(keyer).reduce(...)` and
1660    /// `stream.group_by_reduce(keyer, ...)`. The first performs the network shuffle of every item in
1661    /// the stream, and **later** performs the reduction (i.e. nearly all the elements will be sent to
1662    /// the network). The latter avoids sending the items by performing first a local reduction on
1663    /// each host, and then send only the locally reduced results (i.e. one message per replica, per
1664    /// key); then the global step is performed aggregating the results.
1665    ///
1666    /// The resulting stream will still be keyed and will contain only a single message per key (the
1667    /// final result).
1668    ///
1669    /// Note that the output type must be the same as the input type, if you need a different type
1670    /// consider using [`Stream::group_by_fold`].
1671    ///
1672    /// **Note**: this operator will retain all the messages of the stream and emit the values only
1673    /// when the stream ends. Therefore this is not properly _streaming_.
1674    ///
1675    /// **Note**: this operator will split the current block.
1676    ///
1677    /// ## Example
1678    /// ```
1679    /// # use renoir::{StreamContext, RuntimeConfig};
1680    /// # use renoir::operator::source::IteratorSource;
1681    /// # let mut env = StreamContext::new_local();
1682    /// let s = env.stream_iter(0..5);
1683    /// let res = s
1684    ///     .group_by_reduce(|&n| n % 2, |acc, value| *acc += value)
1685    ///     .collect_vec();
1686    ///
1687    /// env.execute_blocking();
1688    ///
1689    /// let mut res = res.get().unwrap();
1690    /// res.sort_unstable();
1691    /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
1692    /// ```
1693    pub fn group_by_reduce<K, Fk, F>(
1694        self,
1695        keyer: Fk,
1696        f: F,
1697    ) -> KeyedStream<impl Operator<Out = (K, I)>>
1698    where
1699        Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
1700        F: Fn(&mut I, I) + Send + Clone + 'static,
1701        K: ExchangeDataKey,
1702    {
1703        let f2 = f.clone();
1704
1705        self.group_by_fold(
1706            keyer,
1707            None,
1708            move |acc, value| match acc {
1709                None => *acc = Some(value),
1710                Some(acc) => f(acc, value),
1711            },
1712            move |acc1, acc2| match acc1 {
1713                None => *acc1 = acc2,
1714                Some(acc1) => {
1715                    if let Some(acc2) = acc2 {
1716                        f2(acc1, acc2)
1717                    }
1718                }
1719            },
1720        )
1721        .map(|(_, value)| value.unwrap())
1722    }
1723
1724    /// Given two streams **with timestamps** join them according to an interval centered around the
1725    /// timestamp of the left side.
1726    ///
1727    /// This means that an element on the left side with timestamp T will be joined to all the
1728    /// elements on the right with timestamp Q such that `T - lower_bound <= Q <= T + upper_bound`.
1729    ///
1730    /// **Note**: this operator is not parallelized, all the elements are sent to a single node to
1731    /// perform the join.
1732    ///
1733    /// **Note**: this operator will split the current block.
1734    ///
1735    /// ## Example
1736    /// TODO: example
1737    #[cfg(feature = "timestamp")]
1738    pub fn interval_join<I2, Op2>(
1739        self,
1740        right: Stream<Op2>,
1741        lower_bound: Timestamp,
1742        upper_bound: Timestamp,
1743    ) -> Stream<impl Operator<Out = (I, I2)>>
1744    where
1745        I2: ExchangeData,
1746        Op2: Operator<Out = I2> + 'static,
1747    {
1748        let left = self.replication(Replication::One);
1749        let right = right.replication(Replication::One);
1750        left.merge_distinct(right)
1751            .key_by(|_| ())
1752            .add_operator(Reorder::new)
1753            .add_operator(|prev| IntervalJoin::new(prev, lower_bound, upper_bound))
1754            .drop_key()
1755    }
1756
1757    /// Change the maximum parallelism of the following operators.
1758    ///
1759    /// **Note**: this operator is pretty advanced, some operators may need to be fully replicated
1760    /// and will fail otherwise.
1761    pub fn replication(self, replication: Replication) -> Stream<impl Operator<Out = Op::Out>> {
1762        let mut new_stream = self.split_block(End::new, NextStrategy::only_one());
1763        // TODO: Cannot scale up
1764        new_stream.block.scheduling.replication(replication);
1765        new_stream
1766    }
1767
1768    /// Advanced operator that allows changing the replication and forwarding strategy
1769    ///
1770    /// **Note**: this operator is advanced and is only intended to add functionality
1771    /// that is not achievable with other operators. Use with care
1772    pub(crate) fn repartition<Fk: KeyerFn<u64, Op::Out>>(
1773        self,
1774        replication: Replication,
1775        next_strategy: NextStrategy<Op::Out, Fk>,
1776    ) -> Stream<impl Operator<Out = Op::Out>> {
1777        let mut new_stream = self.split_block(End::new, next_strategy);
1778        new_stream.block.scheduling.replication(replication);
1779        new_stream
1780    }
1781
1782    /// Advanced operator that allows changing the replication and forwarding strategy
1783    ///
1784    /// **Note**: this operator is advanced and is only intended to add functionality
1785    /// that is not achievable with other operators. Use with care
1786    pub fn repartition_by<Fk: KeyerFn<u64, Op::Out>>(
1787        self,
1788        replication: Replication,
1789        partition_fn: Fk,
1790    ) -> Stream<impl Operator<Out = Op::Out>> {
1791        let mut new_stream = self.split_block(End::new, NextStrategy::group_by(partition_fn));
1792        new_stream.block.scheduling.replication(replication);
1793        new_stream
1794    }
1795
1796    /// Reduce the stream into a stream that emits a single value.
1797    ///
1798    /// The reducing operator consists in adding to the current accumulation value  the value of the
1799    /// current item in the stream.
1800    ///
    /// The reducing function is provided with the current accumulated value and the next item of
    /// the stream, both owned, and must return the new accumulated value.
1804    ///
1805    /// Note that the output type must be the same as the input type, if you need a different type
1806    /// consider using [`Stream::fold`].
1807    ///
1808    /// **Note**: this operator will retain all the messages of the stream and emit the values only
1809    /// when the stream ends. Therefore this is not properly _streaming_.
1810    ///
1811    /// **Note**: this operator is not parallelized, it creates a bottleneck where all the stream
1812    /// elements are sent to and the folding is done using a single thread.
1813    ///
    /// **Note**: this is very similar to [`Iterator::reduce`](std::iter::Iterator::reduce).
1815    ///
1816    /// **Note**: this operator will split the current block.
1817    ///
1818    /// ## Example
1819    ///
1820    /// ```
1821    /// # use renoir::{StreamContext, RuntimeConfig};
1822    /// # use renoir::operator::source::IteratorSource;
1823    /// # let mut env = StreamContext::new_local();
1824    /// let s = env.stream_iter(0..5);
1825    /// let res = s.reduce(|a, b| a + b).collect::<Vec<_>>();
1826    ///
1827    /// env.execute_blocking();
1828    ///
1829    /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
1830    /// ```
1831    pub fn reduce<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
1832    where
1833        F: Fn(I, I) -> I + Send + Clone + 'static,
1834    {
1835        self.fold(None, move |acc, b| {
1836            *acc = Some(if let Some(a) = acc.take() { f(a, b) } else { b })
1837        })
1838        .map(|value| value.unwrap())
1839    }
1840
1841    /// Reduce the stream into a stream that emits a single value.
1842    ///
1843    /// The reducing operator consists in adding to the current accumulation value the value of the
1844    /// current item in the stream.
1845    ///
    /// This method is very similar to [`Stream::reduce`], but performs the reduction in a
    /// distributed fashion. To do so the reducing function must be _associative_; in particular
    /// the reducing process is performed in 2 steps:
1849    ///
1850    /// - local: the reducing function is used to reduce the elements present in each replica of
1851    ///   the stream independently.
1852    /// - global: all the partial results (the elements produced by the local step) have to be
1853    ///   aggregated into a single result.
1854    ///
1855    /// Note that the output type must be the same as the input type, if you need a different type
1856    /// consider using [`Stream::fold_assoc`].
1857    ///
1858    /// **Note**: this operator will retain all the messages of the stream and emit the values only
1859    /// when the stream ends. Therefore this is not properly _streaming_.
1860    ///
1861    /// **Note**: this operator will split the current block.
1862    ///
1863    /// ## Example
1864    ///
1865    /// ```
1866    /// # use renoir::{StreamContext, RuntimeConfig};
1867    /// # use renoir::operator::source::IteratorSource;
1868    /// # let mut env = StreamContext::new_local();
1869    /// let s = env.stream_iter(0..5);
1870    /// let res = s.reduce_assoc(|a, b| a + b).collect_vec();
1871    ///
1872    /// env.execute_blocking();
1873    ///
1874    /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
1875    /// ```
1876    pub fn reduce_assoc<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
1877    where
1878        F: Fn(I, I) -> I + Send + Clone + 'static,
1879    {
1880        let f2 = f.clone();
1881
1882        self.fold_assoc(
1883            None,
1884            move |acc, b| *acc = Some(if let Some(a) = acc.take() { f(a, b) } else { b }),
1885            move |acc1, mut acc2| {
1886                *acc1 = match (acc1.take(), acc2.take()) {
1887                    (Some(a), Some(b)) => Some(f2(a, b)),
1888                    (None, Some(a)) | (Some(a), None) => Some(a),
1889                    (None, None) => None,
1890                }
1891            },
1892        )
1893        .map(|value| value.unwrap())
1894    }
1895
1896    /// Route each element depending on its content.
1897    ///
1898    /// + Routes are created with the `add_route` method, a new stream is created for each route.
1899    /// + Each element is routed to the first stream for which the routing condition evaluates to true.
1900    /// + If no route condition is satisfied, the element is dropped
1901    ///
1902    /// **Note**: this operator will split the current block.
1903    ///
1904    /// ## Example
1905    ///
1906    /// ```
1907    /// # use renoir::prelude::*;
1908    /// # let mut env = StreamContext::new_local();
1909    /// # let s = env.stream_iter(0..10);
1910    /// let mut routes = s.route()
1911    ///     .add_route(|&i| i < 5)
1912    ///     .add_route(|&i| i % 2 == 0)
1913    ///     .build()
1914    ///     .into_iter();
1915    /// assert_eq!(routes.len(), 2);
1916    /// // 0 1 2 3 4
1917    /// routes.next().unwrap().for_each(|i| eprintln!("route1: {i}"));
1918    /// // 6 8
1919    /// routes.next().unwrap().for_each(|i| eprintln!("route2: {i}"));
1920    /// // 5 7 9 ignored
1921    /// env.execute_blocking();
1922    /// ```
1923    pub fn route(self) -> RouterBuilder<I, Op> {
1924        RouterBuilder::new(self)
1925    }
1926
1927    /// Perform a network shuffle sending the messages to a random replica.
1928    ///
1929    /// This can be useful if for some reason the load is very unbalanced (e.g. after a very
1930    /// unbalanced [`Stream::group_by`]).
1931    ///
1932    /// **Note**: this operator will split the current block.
1933    ///
1934    /// ## Example
1935    ///
1936    /// ```
1937    /// # use renoir::{StreamContext, RuntimeConfig};
1938    /// # use renoir::operator::source::IteratorSource;
1939    /// # let mut env = StreamContext::new_local();
1940    /// let s = env.stream_iter(0..5);
1941    /// let res = s.shuffle();
1942    /// ```
1943    pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>> {
1944        self.split_block(End::new, NextStrategy::random())
1945    }
1946
1947    /// Split the stream into `splits` streams, each with all the elements of the first one.
1948    ///
1949    /// This will effectively duplicate every item in the stream into the newly created streams.
1950    ///
1951    /// **Note**: this operator will split the current block.
1952    ///
1953    /// ## Example
1954    ///
1955    /// ```
1956    /// # use renoir::{StreamContext, RuntimeConfig};
1957    /// # use renoir::operator::source::IteratorSource;
1958    /// # let mut env = StreamContext::new_local();
1959    /// let s = env.stream_iter(0..5);
1960    /// let mut splits = s.split(3);
1961    /// let a = splits.pop().unwrap();
1962    /// let b = splits.pop().unwrap();
1963    /// let c = splits.pop().unwrap();
1964    /// ```
1965    pub fn split(self, splits: usize) -> Vec<Stream<impl Operator<Out = Op::Out>>> {
1966        // This is needed to maintain the same parallelism of the split block
1967        let scheduler_requirements = self.block.scheduling.clone();
1968        let mut new_stream = self.split_block(End::new, NextStrategy::only_one());
1969        new_stream.block.scheduling = scheduler_requirements;
1970
1971        let mut streams = Vec::with_capacity(splits);
1972        for _ in 0..splits - 1 {
1973            streams.push(new_stream.clone());
1974        }
1975        streams.push(new_stream);
1976
1977        streams
1978    }
1979
1980    /// Given two [`Stream`]s, zip their elements together: the resulting stream will be a stream of
1981    /// pairs, each of which is an element from both streams respectively.
1982    ///
1983    /// **Note**: all the elements after the end of one of the streams are discarded (i.e. the
1984    /// resulting stream will have a number of elements that is the minimum between the lengths of
1985    /// the two input streams).
1986    ///
1987    /// **Note**: this operator will split the current block.
1988    ///
1989    /// ## Example
1990    ///
1991    /// ```
1992    /// # use renoir::{StreamContext, RuntimeConfig};
1993    /// # use renoir::operator::source::IteratorSource;
1994    /// # let mut env = StreamContext::new_local();
1995    /// let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter()));
1996    /// let s2 = env.stream_iter((vec![1, 2, 3].into_iter()));
1997    /// let res = s1.zip(s2).collect_vec();
1998    ///
1999    /// env.execute_blocking();
2000    ///
2001    /// assert_eq!(res.get().unwrap(), vec![('A', 1), ('B', 2), ('C', 3)]);
2002    /// ```
2003    pub fn zip<I2, Op2>(self, oth: Stream<Op2>) -> Stream<impl Operator<Out = (I, I2)>>
2004    where
2005        Op2: Operator<Out = I2> + 'static,
2006        I2: ExchangeData,
2007    {
2008        let mut new_stream = self.binary_connection(
2009            oth,
2010            Zip::new,
2011            NextStrategy::only_one(),
2012            NextStrategy::only_one(),
2013        );
2014        // if the zip operator is partitioned there could be some loss of data
2015        new_stream.block.scheduling.replication(Replication::One);
2016        new_stream
2017    }
2018
2019    /// Close the stream and send resulting items to a channel on a single host.
2020    ///
2021    /// If the stream is distributed among multiple replicas, parallelism will
2022    /// be set to 1 to gather all results
2023    ///
2024    /// **Note**: the order of items and keys is unspecified.
2025    ///
2026    /// **Note**: this operator will split the current block.
2027    ///
2028    /// ## Example
2029    ///
2030    /// ```
2031    /// # use renoir::{StreamContext, RuntimeConfig};
2032    /// # use renoir::operator::source::IteratorSource;
2033    /// # let mut env = StreamContext::new_local();
2034    /// let s = env.stream_iter(0..10u32);
2035    /// let rx = s.collect_channel();
2036    ///
2037    /// env.execute_blocking();
2038    /// let mut v = Vec::new();
2039    /// while let Ok(x) = rx.recv() {
2040    ///     v.push(x)
2041    /// }
2042    /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
2043    /// ```
2044    pub fn collect_channel(self) -> Receiver<I> {
2045        let (tx, rx) = unbounded();
2046        self.replication(Replication::One)
2047            .add_operator(|prev| CollectChannelSink::new(prev, tx))
2048            .finalize_block();
2049        rx
2050    }
2051    /// Close the stream and send resulting items to a channel on each single host.
2052    ///
2053    /// Each host sends its outputs to the channel without repartitioning.
2054    /// Elements will be sent to the channel on the same host that produced
2055    /// the output.
2056    ///
2057    /// **Note**: the order of items and keys is unspecified.
2058    ///
2059    /// ## Example
2060    ///
2061    /// ```
2062    /// # use renoir::{StreamContext, RuntimeConfig};
2063    /// # use renoir::operator::source::IteratorSource;
2064    /// # let mut env = StreamContext::new_local();
2065    /// let s = env.stream_iter(0..10u32);
    /// let rx = s.collect_channel_parallel();
2067    ///
2068    /// env.execute_blocking();
2069    /// let mut v = Vec::new();
2070    /// while let Ok(x) = rx.recv() {
2071    ///     v.push(x)
2072    /// }
2073    /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
2074    /// ```
2075    pub fn collect_channel_parallel(self) -> Receiver<I> {
2076        let (tx, rx) = unbounded();
2077        self.add_operator(|prev| CollectChannelSink::new(prev, tx))
2078            .finalize_block();
2079        rx
2080    }
2081
    /// Close the stream and count its items, making the total available on a single host.
    ///
    /// The items are first counted inside the current block, then the partial counts are
    /// gathered on a single replica.
2086    ///
2087    /// **Note**: the order of items and keys is unspecified.
2088    ///
2089    /// **Note**: this operator will split the current block.
2090    ///
2091    /// ## Example
2092    ///
2093    /// ```
2094    /// # use renoir::{StreamContext, RuntimeConfig};
2095    /// # use renoir::operator::source::IteratorSource;
2096    /// # let mut env = StreamContext::new_local();
2097    /// let s = env.stream_iter(0..10);
    /// let res = s.collect_count();
    ///
    /// env.execute_blocking();
    ///
    /// assert_eq!(res.get().unwrap(), 10);
2103    /// ```
2104    pub fn collect_count(self) -> StreamOutput<usize> {
2105        let output = StreamOutputRef::default();
2106        self.add_operator(|prev| Fold::new(prev, 0, |acc, _| *acc += 1))
2107            .replication(Replication::One)
2108            .add_operator(|prev| CollectCountSink::new(prev, output.clone()))
2109            .finalize_block();
2110        StreamOutput::from(output)
2111    }
2112
2113    /// Close the stream and store all the resulting items into a [`Vec`] on a single host.
2114    ///
    /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
    /// replicas send their items.
2117    ///
2118    /// **Note**: the order of items and keys is unspecified.
2119    ///
2120    /// **Note**: this operator will split the current block.
2121    ///
2122    /// ## Example
2123    ///
2124    /// ```
2125    /// # use renoir::{StreamContext, RuntimeConfig};
2126    /// # use renoir::operator::source::IteratorSource;
2127    /// # let mut env = StreamContext::new_local();
2128    /// let s = env.stream_iter(0..10);
2129    /// let res = s.collect_vec();
2130    ///
2131    /// env.execute_blocking();
2132    ///
2133    /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2134    /// ```
2135    pub fn collect_vec(self) -> StreamOutput<Vec<I>> {
2136        let output = StreamOutputRef::default();
2137        self.replication(Replication::One)
2138            .add_operator(|prev| CollectVecSink::new(prev, output.clone()))
2139            .finalize_block();
2140        StreamOutput::from(output)
2141    }
2142
    /// Close the stream and store all the resulting items into a [`Vec`] on each host.
    ///
    /// The replication is set to `Host` and the items are forwarded with the `all` strategy, so
    /// the results are replicated rather than gathered on a single host.
2147    ///
2148    /// **Note**: the order of items and keys is unspecified.
2149    ///
2150    /// **Note**: this operator will split the current block.
2151    ///
2152    /// ## Example
2153    ///
2154    /// ```
2155    /// # use renoir::{StreamContext, RuntimeConfig};
2156    /// # use renoir::operator::source::IteratorSource;
2157    /// # let mut env = StreamContext::new_local();
2158    /// let s = env.stream_iter(0..10);
2159    /// let res = s.collect_vec();
2160    ///
2161    /// env.execute_blocking();
2162    ///
2163    /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2164    /// ```
2165    pub fn collect_vec_all(self) -> StreamOutput<Vec<I>> {
2166        let output = StreamOutputRef::default();
2167        self.repartition(Replication::Host, NextStrategy::all())
2168            .add_operator(|prev| CollectVecSink::new(prev, output.clone()))
2169            .finalize_block();
2170        StreamOutput::from(output)
2171    }
2172
2173    /// Close the stream and store all the resulting items into a collection on a single host.
2174    ///
2175    /// If the stream is distributed among multiple replicas, parallelism will
2176    /// be set to 1 to gather all results
2177    ///
2178    /// **Note**: the order of items and keys is unspecified.
2179    ///
2180    /// **Note**: this operator will split the current block.
2181    ///
2182    /// ## Example
2183    ///
2184    /// ```
2185    /// # use renoir::{StreamContext, RuntimeConfig};
2186    /// # use renoir::operator::source::IteratorSource;
2187    /// # let mut env = StreamContext::new_local();
2188    /// let s = env.stream_iter(0..10);
2189    /// let res = s.collect_vec();
2190    ///
2191    /// env.execute_blocking();
2192    ///
2193    /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2194    /// ```
2195    pub fn collect<C: FromIterator<I> + Send + 'static>(self) -> StreamOutput<C> {
2196        let output = StreamOutputRef::default();
2197        self.replication(Replication::One)
2198            .add_operator(|prev| Collect::new(prev, output.clone()))
2199            .finalize_block();
2200        StreamOutput::from(output)
2201    }
2202
2203    /// Close the stream and store all the resulting items into a collection on each single host.
2204    ///
2205    /// Partitioning will be set to Host and results will be replicated
2206    ///
2207    /// **Note**: the order of items and keys is unspecified.
2208    ///
2209    /// **Note**: this operator will split the current block.
2210    ///
2211    /// ## Example
2212    ///
2213    /// ```
2214    /// # use renoir::{StreamContext, RuntimeConfig};
2215    /// # use renoir::operator::source::IteratorSource;
2216    /// # let mut env = StreamContext::new_local();
2217    /// let s = env.stream_iter(0..10);
2218    /// let res = s.collect_vec();
2219    ///
2220    /// env.execute_blocking();
2221    ///
2222    /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2223    /// ```
2224    pub fn collect_all<C: FromIterator<I> + Send + 'static>(self) -> StreamOutput<C> {
2225        let output = StreamOutputRef::default();
2226        self.repartition(Replication::Host, NextStrategy::all())
2227            .add_operator(|prev| Collect::new(prev, output.clone()))
2228            .finalize_block();
2229        StreamOutput::from(output)
2230    }
2231
2232    /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2233    /// create a [Stream] with its content. Returns the cache and consumes the stream.
2234    ///
2235    /// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same**
2236    /// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method.
2237    ///
2238    /// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
2239    /// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
2240    ///
2241    /// ## Example
2242    ///
2243    /// ```
2244    /// # use renoir::prelude::*;
2245    /// use renoir::operator::cache::VecCacher;
    /// let mut ctx = StreamContext::new_local();
2247    /// // Create a cached stream by applying filters and caching the results
2248    /// let cache = ctx.stream_iter(0..10).filter(|x| x % 3 == 0).collect_cache::<VecCacher<_>>(());
2249    ///
2250    /// // Execute the stream interactively to capture and store the results in the cache
2251    /// ctx.execute_blocking();
2252    ///
2253    /// // Further process the cached stream, applying additional filters and collecting the results
2254    /// let ctx = StreamContext::new(cache.config());
2255    /// let res = cache.stream_in(&ctx).filter(|x| x % 2 == 0).collect_vec();
2256    ///
2257    /// // Execute the environment to finalize the processing
2258    /// ctx.execute_blocking();
2259    ///
2260    /// // Assert the final result matches the expected values
2261    /// let expected = (0..10).filter(|x| x % 3 == 0 && x % 2 == 0).collect::<Vec<_>>();
2262    /// assert_eq!(expected, res.get().unwrap());
2263    /// ```
2264    pub fn collect_cache<C: Cacher<I> + 'static>(self, config: C::Config) -> CachedStream<I, C> {
2265        let rt_config = self.ctx.lock().config.clone();
2266        let replication = self.block.scheduling.replication;
2267        let output = CacheRegistry::<I, C::Handle>::new();
2268        self.add_operator(|prev| {
2269            CacheSink::<I, C, _>::new(prev, rt_config.clone(), config, output.clone())
2270        })
2271        .finalize_block();
2272        CachedStream::new(rt_config, replication, output)
2273    }
2274
2275    /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2276    /// create a [Stream] with its content. Returns the cache and consumes the stream.
2277    ///
2278    /// **See [Stream::collect_cache]**
2279    pub fn collect_cache_vec(self) -> CachedStream<I, VecCacher<I>> {
2280        self.collect_cache(())
2281    }
2282
2283    /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2284    /// create a [Stream] with its content. Returns the cache and a copy of the current stream.
2285    ///
2286    /// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same**
2287    /// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method.
2288    ///
2289    /// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
2290    /// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
2291    /// ## Example
2292    /// ```
2293    /// # use renoir::prelude::*;
2294    /// use renoir::operator::cache::VecCacher;
2295    /// let ctx = StreamContext::new_local();
2296    ///
2297    /// // Create a cached stream by applying filters and caching the results
2298    /// let (cache, stream) = ctx.stream_iter(0..10).filter(|x| x % 3 == 0).cache::<VecCacher<_>>(());
2299    ///
2300    /// // Further process the cached stream, applying additional filters and collecting the results
2301    /// let result = stream.filter(|x| x % 2 == 0).collect_vec();
2302    ///
2303    /// // Execute the environment to finalize the processing
2304    /// ctx.execute_blocking();
2305    ///
2306    /// // Assert the final result matches the expected values
2307    /// assert_eq!(result.get().unwrap(), (0..10).filter(|x| x % 3 == 0 && x % 2 == 0).collect::<Vec<_>>());
2308    ///
2309    /// // Further process the cached stream, applying additional filters and collecting the results
2310    /// let ctx = StreamContext::new(cache.config());
2311    /// let res = cache.stream_in(&ctx).filter(|x| x % 2 == 0).collect_vec();
2312    ///
2313    /// // Execute the environment to finalize the processing
2314    /// ctx.execute_blocking();
2315    ///
2316    /// // Assert the final result matches the expected values
2317    /// let expected = (0..10).filter(|x| x % 3 == 0 && x % 2 == 0).collect::<Vec<_>>();
2318    /// assert_eq!(expected, res.get().unwrap());
2319    /// ```
2320    pub fn cache<C: Cacher<I> + 'static>(
2321        self,
2322        config: C::Config,
2323    ) -> (CachedStream<I, C>, Stream<impl Operator<Out = Op::Out>>) {
2324        let mut splits = self.split(2);
2325        (
2326            splits.pop().unwrap().collect_cache(config),
2327            splits.pop().unwrap(),
2328        )
2329    }
2330
    /// Collect the output of the stream to a [CachedStream] that can later be resumed to
2332    /// create a [Stream] with its content. Returns the cache and a copy of the current stream.
2333    ///
2334    /// **See [Stream::cache]**
2335    pub fn cache_vec(
2336        self,
2337    ) -> (
2338        CachedStream<I, VecCacher<I>>,
2339        Stream<impl Operator<Out = Op::Out>>,
2340    ) {
2341        self.cache(())
2342    }
2343}
2344
2345impl<Op> Stream<Op>
2346where
2347    Op: Operator + 'static,
2348    Op::Out: Clone + Hash + Eq + Sync,
2349{
2350    /// Map the elements of the stream into new elements by evaluating a future for each one.
2351    /// Use memoization to cache outputs for previously seen inputs.
2352    ///
2353    /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
2354    /// The maximum number of elements to be cached is passed as the `capacity` parameter.
2355    ///
2356    /// ## Example
2357    ///
2358    /// ```
2359    /// # use renoir::{StreamContext, RuntimeConfig};
2360    /// # use renoir::operator::source::IteratorSource;
2361    /// # tokio::runtime::Runtime::new()
2362    /// #    .unwrap()
2363    /// #    .block_on(base());
2364    /// # async fn base() {
2365    /// #    let mut env = StreamContext::new_local();
2366    /// let s = env.stream_iter((0..4).cycle().take(10));
2367    /// let res = s.map_async_memo(|n| async move {n * n}, 100).collect_vec();
2368    /// env.execute().await;
2369    /// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
2370    /// # }
2371    /// ```
2372    #[cfg(feature = "tokio")]
2373    pub fn map_async_memo<O: Clone + Send + Sync + 'static, F, Fut>(
2374        self,
2375        f: F,
2376        capacity: usize,
2377    ) -> Stream<impl Operator<Out = O>>
2378    where
2379        F: Fn(Op::Out) -> Fut + Send + Sync + Clone + 'static,
2380        Fut: Future<Output = O> + Send,
2381    {
2382        self.map_async_memo_by(f, |x: &Op::Out| x.clone(), capacity)
2383    }
2384}
2385
2386impl<Op> Stream<Op>
2387where
2388    Op::Out: Clone + Hash + Eq + Sync,
2389    Op: Operator + 'static,
2390{
2391    /// Map the elements of the stream into new elements. Use memoization
2392    /// to cache outputs for previously seen inputs.
2393    ///
2394    /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
2395    /// The maximum number of elements to be cached is passed as the `capacity` parameter.
2396    ///
2397    /// ## Example
2398    ///
2399    /// ```
2400    /// # use renoir::{StreamContext, RuntimeConfig};
2401    /// # use renoir::operator::source::IteratorSource;
2402    /// # let mut env = StreamContext::new_local();
2403    /// let s = env.stream_iter((0..4).cycle().take(10));
2404    /// let res = s.map_memo(|n| n * n, 5).collect_vec();
2405    ///
2406    /// env.execute_blocking();
2407    ///
2408    /// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
2409    /// ```
2410    pub fn map_memo<O: Data + Sync, F>(
2411        self,
2412        f: F,
2413        capacity: usize,
2414    ) -> Stream<impl Operator<Out = O>>
2415    where
2416        F: Fn(Op::Out) -> O + Send + Clone + 'static,
2417    {
2418        self.add_operator(|prev| MapMemo::new(prev, f, |x| x.clone(), capacity))
2419    }
2420}
2421
2422impl<Op, K, I> KeyedStream<Op>
2423where
2424    K: DataKey,
2425    I: Send + 'static,
2426    Op: Operator<Out = (K, I)> + 'static,
2427{
2428    /// Given a keyed stream without timestamps nor watermarks, tag each item with a timestamp and insert
2429    /// watermarks.
2430    ///
2431    /// The two functions given to this operator are the following:
2432    /// - `timestamp_gen` returns the timestamp assigned to the provided element of the stream
2433    /// - `watermark_gen` returns an optional watermark to add after the provided element
2434    ///
2435    /// Note that the two functions **must** follow the watermark semantics.
2436    /// TODO: link to watermark semantics
2437    ///
2438    /// ## Example
2439    ///
    /// In this example the stream contains the integers from 0 to 9, grouped by parity. Each item
    /// will be tagged with a timestamp with the value of the item as milliseconds, and after each
    /// even number a watermark will be inserted.
2443    ///
2444    /// ```
2445    /// # use renoir::{StreamContext, RuntimeConfig};
2446    /// # use renoir::operator::source::IteratorSource;
2447    /// use renoir::operator::Timestamp;
2448    /// # let mut env = StreamContext::new_local();
2449    ///
2450    /// let s = env.stream_iter(0..10);
2451    /// s
2452    ///     .group_by(|i| i % 2)
2453    ///     .add_timestamps(
2454    ///     |&(_k, n)| n,
2455    ///     |&(_k, n), &ts| if n % 2 == 0 { Some(ts) } else { None }
2456    /// );
2457    /// ```
2458    #[cfg(feature = "timestamp")]
2459    pub fn add_timestamps<F, G>(
2460        self,
2461        timestamp_gen: F,
2462        watermark_gen: G,
2463    ) -> KeyedStream<impl Operator<Out = Op::Out>>
2464    where
2465        F: FnMut(&Op::Out) -> Timestamp + Clone + Send + 'static,
2466        G: FnMut(&Op::Out, &Timestamp) -> Option<Timestamp> + Clone + Send + 'static,
2467    {
2468        self.add_operator(|prev| AddTimestamp::new(prev, timestamp_gen, watermark_gen))
2469    }
2470
2471    #[cfg(feature = "timestamp")]
2472    pub fn drop_timestamps(self) -> KeyedStream<impl Operator<Out = Op::Out>> {
2473        self.add_operator(|prev| DropTimestamp::new(prev))
2474    }
2475
2476    /// Change the batch mode for this stream.
2477    ///
2478    /// This change will be propagated to all the operators following, even of the next blocks,
2479    /// until it's changed again.
2480    ///
2481    /// ## Example
2482    ///
2483    /// ```
2484    /// # use renoir::{StreamContext, RuntimeConfig};
2485    /// # use renoir::operator::source::IteratorSource;
2486    /// use renoir::BatchMode;
2487    /// # let mut env = StreamContext::new_local();
2488    ///
2489    /// let s = env.stream_iter(0..10).group_by(|&n| n % 2);
2490    /// s.batch_mode(BatchMode::fixed(1024));
2491    /// ```
2492    pub fn batch_mode(mut self, batch_mode: BatchMode) -> Self {
2493        self.0.block.batch_mode = batch_mode;
2494        self
2495    }
2496
2497    /// Remove from the stream all the elements for which the provided function returns `None` and
2498    /// keep the elements that returned `Some(_)`.
2499    ///
    /// **Note**: this is very similar to [`Iterator::filter_map`](std::iter::Iterator::filter_map)
2501    ///
2502    /// ## Example
2503    ///
2504    /// ```
2505    /// # use renoir::{StreamContext, RuntimeConfig};
2506    /// # use renoir::operator::source::IteratorSource;
2507    /// # let mut env = StreamContext::new_local();
2508    /// let s = env.stream_iter(0..10).group_by(|&n| n % 2);
2509    /// let res = s.filter_map(|(_key, n)| if n % 3 == 0 { Some(n * 4) } else { None }).collect_vec();
2510    ///
2511    /// env.execute_blocking();
2512    ///
2513    /// let mut res = res.get().unwrap();
2514    /// res.sort_unstable();
2515    /// assert_eq!(res, vec![(0, 0), (0, 24), (1, 12), (1, 36)]);
2516    /// ```
2517    pub fn filter_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2518    where
2519        F: Fn((&K, I)) -> Option<O> + Send + Clone + 'static,
2520        O: Send + 'static,
2521    {
2522        self.map(f)
2523            .filter(|(_, x)| x.is_some())
2524            .map(|(_, x)| x.unwrap())
2525    }
2526
2527    /// Remove from the stream all the elements for which the provided predicate returns `false`.
2528    ///
    /// **Note**: this is very similar to [`Iterator::filter`](std::iter::Iterator::filter)
2530    ///
2531    /// ## Example
2532    ///
2533    /// ```
2534    /// # use renoir::{StreamContext, RuntimeConfig};
2535    /// # use renoir::operator::source::IteratorSource;
2536    /// # let mut env = StreamContext::new_local();
2537    /// let s = env.stream_iter(0..10).group_by(|&n| n % 2);
2538    /// let res = s.filter(|&(_key, n)| n % 3 == 0).collect_vec();
2539    ///
2540    /// env.execute_blocking();
2541    ///
2542    /// let mut res = res.get().unwrap();
2543    /// res.sort_unstable();
2544    /// assert_eq!(res, vec![(0, 0), (0, 6), (1, 3), (1, 9)]);
2545    /// ```
2546    pub fn filter<F>(self, predicate: F) -> KeyedStream<impl Operator<Out = (K, I)>>
2547    where
2548        F: Fn(&(K, I)) -> bool + Clone + Send + 'static,
2549    {
2550        self.add_operator(|prev| Filter::new(prev, predicate))
2551    }
2552
2553    /// Apply a mapping operation to each element of the stream, the resulting stream will be the
    /// flattened values of the result of the mapping.
    ///
    /// **Note**: this is very similar to [`Iterator::flat_map`](std::iter::Iterator::flat_map).
2557    ///
2558    /// ## Example
2559    ///
2560    /// ```
2561    /// # use renoir::{StreamContext, RuntimeConfig};
2562    /// # use renoir::operator::source::IteratorSource;
2563    /// # let mut env = StreamContext::new_local();
2564    /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
2565    /// let res = s.flat_map(|(_key, n)| vec![n, n]).collect_vec();
2566    ///
2567    /// env.execute_blocking();
2568    ///
2569    /// let mut res = res.get().unwrap();
2570    /// res.sort_unstable();
2571    /// assert_eq!(res, vec![(0, 0), (0, 0), (0, 2), (0, 2), (1, 1), (1, 1)]);
2572    /// ```
2573    pub fn flat_map<O, It, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2574    where
2575        It: IntoIterator<Item = O>,
2576        <It as IntoIterator>::IntoIter: Send + 'static,
2577        F: Fn(Op::Out) -> It + Send + Clone + 'static,
2578        O: Data,
2579        It: 'static,
2580    {
2581        self.add_operator(|prev| KeyedFlatMap::new(prev, f))
2582    }
2583
    /// Call the given function on a reference to each element of the stream; the stream keeps
    /// the same item type and can be processed further.
2585    ///
2586    /// ## Example
2587    ///
2588    /// ```
2589    /// # use renoir::{StreamContext, RuntimeConfig};
2590    /// # use renoir::operator::source::IteratorSource;
2591    /// # let mut env = StreamContext::new_local();
2592    /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2593    /// s.inspect(|(key, n)| println!("Item: {} has key {}", n, key)).for_each(std::mem::drop);
2594    ///
2595    /// env.execute_blocking();
2596    /// ```
2597    pub fn inspect<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
2598    where
2599        F: FnMut(&(K, I)) + Send + Clone + 'static,
2600    {
2601        self.add_operator(|prev| Inspect::new(prev, f))
2602    }
2603
2604    /// Perform the folding operation separately for each key.
2605    ///
2606    /// Note that there is a difference between `stream.group_by(keyer).fold(...)` and
2607    /// `stream.group_by_fold(keyer, ...)`. The first performs the network shuffle of every item in
2608    /// the stream, and **later** performs the folding (i.e. nearly all the elements will be sent to
2609    /// the network). The latter avoids sending the items by performing first a local reduction on
2610    /// each host, and then send only the locally folded results (i.e. one message per replica, per
2611    /// key); then the global step is performed aggregating the results.
2612    ///
2613    /// The resulting stream will still be keyed and will contain only a single message per key (the
2614    /// final result).
2615    ///
2616    /// Note that the output type may be different from the input type. Consider using
2617    /// [`KeyedStream::reduce`] if the output type is the same as the input type.
2618    ///
2619    /// **Note**: this operator will retain all the messages of the stream and emit the values only
2620    /// when the stream ends. Therefore this is not properly _streaming_.
2621    ///
2622    /// **Note**: this operator will split the current block.
2623    ///
2624    /// ## Example
2625    ///
2626    /// ```
2627    /// # use renoir::{StreamContext, RuntimeConfig};
2628    /// # use renoir::operator::source::IteratorSource;
2629    /// # let mut env = StreamContext::new_local();
2630    /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2631    /// let res = s
2632    ///     .fold(0, |acc, value| *acc += value)
2633    ///     .collect_vec();
2634    ///
2635    /// env.execute_blocking();
2636    ///
2637    /// let mut res = res.get().unwrap();
2638    /// res.sort_unstable();
2639    /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
2640    /// ```
2641    pub fn fold<O, F>(self, init: O, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2642    where
2643        F: Fn(&mut O, <Op::Out as KeyedItem>::Value) + Send + Clone + 'static,
2644        O: Send + Clone,
2645    {
2646        self.add_operator(|prev| KeyedFold::new(prev, init, f))
2647    }
2648
2649    /// Perform the reduction operation separately for each key.
2650    ///
2651    /// Note that there is a difference between `stream.group_by(keyer).reduce(...)` and
2652    /// `stream.group_by_reduce(keyer, ...)`. The first performs the network shuffle of every item in
2653    /// the stream, and **later** performs the reduction (i.e. nearly all the elements will be sent to
2654    /// the network). The latter avoids sending the items by performing first a local reduction on
2655    /// each host, and then send only the locally reduced results (i.e. one message per replica, per
2656    /// key); then the global step is performed aggregating the results.
2657    ///
2658    /// The resulting stream will still be keyed and will contain only a single message per key (the
2659    /// final result).
2660    ///
2661    /// Note that the output type must be the same as the input type, if you need a different type
2662    /// consider using [`KeyedStream::fold`].
2663    ///
2664    /// **Note**: this operator will retain all the messages of the stream and emit the values only
2665    /// when the stream ends. Therefore this is not properly _streaming_.
2666    ///
2667    /// **Note**: this operator will split the current block.
2668    ///
2669    /// ## Example
2670    ///
2671    /// ```
2672    /// # use renoir::{StreamContext, RuntimeConfig};
2673    /// # use renoir::operator::source::IteratorSource;
2674    /// # let mut env = StreamContext::new_local();
2675    /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2676    /// let res = s
2677    ///     .reduce(|acc, value| *acc += value)
2678    ///     .collect_vec();
2679    ///
2680    /// env.execute_blocking();
2681    ///
2682    /// let mut res = res.get().unwrap();
2683    /// res.sort_unstable();
2684    /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
2685    /// ```
2686    pub fn reduce<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
2687    where
2688        I: Clone + 'static,
2689        F: Fn(&mut I, I) + Send + Clone + 'static,
2690    {
2691        self.fold(None, move |acc, value| match acc {
2692            None => *acc = Some(value),
2693            Some(acc) => f(acc, value),
2694        })
2695        .map(|(_, value)| value.unwrap())
2696    }
2697
2698    /// Map the elements of the stream into new elements.
2699    ///
    /// **Note**: this is very similar to [`Iterator::map`](std::iter::Iterator::map).
2701    ///
2702    /// ## Example
2703    ///
2704    /// ```
2705    /// # use renoir::{StreamContext, RuntimeConfig};
2706    /// # use renoir::operator::source::IteratorSource;
2707    /// # let mut env = StreamContext::new_local();
2708    /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2709    /// let res = s.map(|(_key, n)| 10 * n).collect_vec();
2710    ///
2711    /// env.execute_blocking();
2712    ///
2713    /// let mut res = res.get().unwrap();
2714    /// res.sort_unstable();
2715    /// assert_eq!(res, vec![(0, 0), (0, 20), (0, 40), (1, 10), (1, 30)]);
2716    /// ```
2717    pub fn map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2718    where
2719        F: Fn((&K, I)) -> O + Send + Clone + 'static,
2720        O: Send,
2721    {
2722        self.add_operator(|prev| {
2723            Map::new(prev, move |(k, v)| {
2724                let mapped_value = f((&k, v));
2725                (k, mapped_value)
2726            })
2727        })
2728    }
2729
2730    /// # TODO
2731    /// Reorder timestamped items
2732    pub fn reorder(self) -> KeyedStream<impl Operator<Out = (K, I)>> {
2733        self.add_operator(|prev| Reorder::new(prev))
2734    }
2735
2736    /// Map the elements of the stream into new elements. The mapping function can be stateful.
2737    ///
2738    /// This is exactly like [`Stream::rich_map`], but the function is cloned for each key. This
2739    /// means that each key will have a unique mapping function (and therefore a unique state).
2740    pub fn rich_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2741    where
2742        F: FnMut((&K, I)) -> O + Clone + Send + 'static,
2743        O: Data,
2744    {
2745        self.add_operator(|prev| RichMap::new(prev, f))
2746    }
2747
2748    /// Apply a mapping operation to each element of the stream, the resulting stream will be the
2749    /// flattened values of the result of the mapping. The mapping function can be stateful.
2750    ///
2751    /// This is exactly like [`Stream::rich_flat_map`], but the function is cloned for each key.
2752    /// This means that each key will have a unique mapping function (and therefore a unique state).
2753    pub fn rich_flat_map<O, It, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2754    where
2755        It: IntoIterator<Item = O>,
2756        <It as IntoIterator>::IntoIter: Clone + Send + 'static,
2757        F: FnMut((&K, I)) -> It + Clone + Send + 'static,
2758        O: Data,
2759        It: Data,
2760    {
2761        self.rich_map(f).flatten()
2762    }
2763
2764    /// Remove from the stream all the elements for which the provided function returns `None` and
2765    /// keep the elements that returned `Some(_)`. The mapping function can be stateful.
2766    ///
2767    /// This is exactly like [`Stream::rich_filter_map`], but the function is cloned for each key.
2768    /// This means that each key will have a unique mapping function (and therefore a unique state).
2769    pub fn rich_filter_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2770    where
2771        F: FnMut((&K, I)) -> Option<O> + Send + Clone + 'static,
2772        O: Data,
2773    {
2774        self.rich_map(f)
2775            .filter(|(_, x)| x.is_some())
2776            .map(|(_, x)| x.unwrap())
2777    }
2778
2779    /// Map the elements of the stream into new elements. The mapping function can be stateful.
2780    ///
2781    /// This is exactly like [`Stream::rich_map`], but the function is cloned for each key. This
2782    /// means that each key will have a unique mapping function (and therefore a unique state).
2783    pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
2784    where
2785        F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send + 'static,
2786        O: Data,
2787    {
2788        self.0.add_operator(|prev| RichMapCustom::new(prev, f))
2789    }
2790
2791    /// Make this [`KeyedStream`] a normal [`Stream`] of key-value pairs.
2792    ///
2793    /// ## Example
2794    ///
2795    /// ```
2796    /// # use renoir::{StreamContext, RuntimeConfig};
2797    /// # use renoir::operator::source::IteratorSource;
2798    /// # let mut env = StreamContext::new_local();
2799    /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
2800    /// let res = stream.unkey().collect_vec();
2801    ///
2802    /// env.execute_blocking();
2803    ///
2804    /// let mut res = res.get().unwrap();
2805    /// res.sort_unstable(); // the output order is nondeterministic
2806    /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1), (1, 3)]);
2807    /// ```
2808    pub fn unkey(self) -> Stream<impl Operator<Out = (K, I)>> {
2809        self.0
2810    }
2811
2812    /// Forget about the key of this [`KeyedStream`] and return a [`Stream`] containing just the
2813    /// values.
2814    ///
2815    /// ## Example
2816    ///
2817    /// ```
2818    /// # use renoir::{StreamContext, RuntimeConfig};
2819    /// # use renoir::operator::source::IteratorSource;
2820    /// # let mut env = StreamContext::new_local();
2821    /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
2822    /// let res = stream.drop_key().collect_vec();
2823    ///
2824    /// env.execute_blocking();
2825    ///
2826    /// let mut res = res.get().unwrap();
2827    /// res.sort_unstable(); // the output order is nondeterministic
2828    /// assert_eq!(res, (0..4).collect::<Vec<_>>());
2829    /// ```
2830    pub fn drop_key(self) -> Stream<impl Operator<Out = I>> {
2831        self.0.map(|(_k, v)| v)
2832    }
2833
2834    /// Apply the given function to all the elements of the stream, consuming the stream.
2835    ///
2836    /// ## Example
2837    ///
2838    /// ```
2839    /// # use renoir::{StreamContext, RuntimeConfig};
2840    /// # use renoir::operator::source::IteratorSource;
2841    /// # let mut env = StreamContext::new_local();
2842    /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2843    /// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key));
2844    ///
2845    /// env.execute_blocking();
2846    /// ```
2847    pub fn for_each<F>(self, f: F)
2848    where
2849        F: FnMut((K, I)) + Send + Clone + 'static,
2850    {
2851        self.0
2852            .add_operator(|prev| ForEach::new(prev, f))
2853            .finalize_block();
2854    }
2855}
2856
2857impl<K, I, Op> KeyedStream<Op>
2858where
2859    Op: Operator<Out = (K, I)> + 'static,
2860    K: ExchangeDataKey,
2861    I: ExchangeData,
2862{
2863    /// Given two streams **with timestamps** join them according to an interval centered around the
2864    /// timestamp of the left side.
2865    ///
2866    /// This means that an element on the left side with timestamp T will be joined to all the
2867    /// elements on the right with timestamp Q such that `T - lower_bound <= Q <= T + upper_bound`.
2868    /// Only items with the same key can be joined together.
2869    ///
2870    /// **Note**: this operator will split the current block.
2871    ///
2872    /// ## Example
2873    /// TODO: example
2874    #[cfg(feature = "timestamp")]
2875    pub fn interval_join<I2, Op2>(
2876        self,
2877        right: KeyedStream<Op2>,
2878        lower_bound: Timestamp,
2879        upper_bound: Timestamp,
2880    ) -> KeyedStream<impl Operator<Out = (K, (I, I2))>>
2881    where
2882        I2: ExchangeData,
2883        Op2: Operator<Out = (K, I2)> + 'static,
2884    {
2885        self.merge_distinct(right)
2886            .add_operator(Reorder::new)
2887            .add_operator(|prev| IntervalJoin::new(prev, lower_bound, upper_bound))
2888    }
2889
2890    /// Merge the items of this stream with the items of another stream with the same type.
2891    ///
2892    /// **Note**: the order of the resulting items is not specified.
2893    ///
2894    /// **Note**: this operator will split the current block.
2895    ///
2896    /// ## Example
2897    ///
2898    /// ```
2899    /// # use renoir::{StreamContext, RuntimeConfig};
2900    /// # use renoir::operator::source::IteratorSource;
2901    /// # let mut env = StreamContext::new_local();
2902    /// let s1 = env.stream_iter(0..3).group_by(|&n| n % 2);
2903    /// let s2 = env.stream_iter(3..5).group_by(|&n| n % 2);
2904    /// let res = s1.merge(s2).collect_vec();
2905    ///
2906    /// env.execute_blocking();
2907    ///
2908    /// let mut res = res.get().unwrap();
2909    /// res.sort_unstable(); // the output order is nondeterministic
2910    /// assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
2911    /// ```
2912    pub fn merge<Op2>(self, oth: KeyedStream<Op2>) -> KeyedStream<impl Operator<Out = (K, I)>>
2913    where
2914        Op2: Operator<Out = (K, I)> + 'static,
2915    {
2916        KeyedStream(self.0.merge(oth.0))
2917    }
2918
2919    pub(crate) fn merge_distinct<I2, Op2>(
2920        self,
2921        right: KeyedStream<Op2>,
2922    ) -> KeyedStream<impl Operator<Out = (K, MergeElement<I, I2>)>>
2923    where
2924        I2: ExchangeData,
2925        Op2: Operator<Out = (K, I2)> + 'static,
2926    {
2927        // map the left and right streams to the same type
2928        let left = self.map(|(_, x)| MergeElement::Left(x));
2929        let right = right.map(|(_, x)| MergeElement::Right(x));
2930
2931        left.merge(right)
2932    }
2933
2934    /// Perform a network shuffle sending the messages to a random replica.
2935    ///
2936    /// This operator returns a `Stream` instead of a `KeyedStream` as after
2937    /// shuffling the messages between replicas, the keyed semantics are lost.
2938    ///
2939    /// **Note**: this operator will split the current block.
2940    ///
2941    /// ## Example
2942    ///
2943    /// ```
2944    /// # use renoir::{StreamContext, RuntimeConfig};
2945    /// # use renoir::operator::source::IteratorSource;
2946    /// # let mut env = StreamContext::new_local();
2947    /// let s = env.stream_iter(0..5);
2948    /// let res = s.shuffle();
2949    /// ```
2950    pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>> {
2951        self.0.split_block(End::new, NextStrategy::random())
2952    }
2953
2954    pub fn fold_scan<O, S, L, F>(
2955        self,
2956        init: S,
2957        fold: L,
2958        map: F,
2959    ) -> KeyedStream<impl Operator<Out = (K, O)>>
2960    where
2961        Op::Out: ExchangeData,
2962        I: Send,
2963        K: ExchangeDataKey + Sync,
2964        L: Fn(&K, &mut S, I) + Send + Clone + 'static,
2965        F: Fn(&K, &S, I) -> O + Send + Clone + 'static,
2966        S: ExchangeData + Sync,
2967        O: ExchangeData,
2968    {
2969        #[derive(Serialize, Deserialize, Clone)]
2970        enum TwoPass<I, O> {
2971            First(I),
2972            Second(I),
2973            Output(O),
2974        }
2975
2976        let (state, s) = self.map(|el| TwoPass::First(el.1)).unkey().iterate(
2977            2,
2978            HashMap::<K, S>::default(),
2979            |s, state| {
2980                s.to_keyed()
2981                    .map(move |(k, el)| match el {
2982                        TwoPass::First(el) => TwoPass::Second(el),
2983                        TwoPass::Second(el) => {
2984                            TwoPass::Output((map)(k, state.get().get(k).unwrap(), el))
2985                        }
2986                        TwoPass::Output(_) => unreachable!(),
2987                    })
2988                    .unkey()
2989            },
2990            move |local: &mut HashMap<K, S>, (k, el)| match el {
2991                TwoPass::First(_) => {}
2992                TwoPass::Second(el) => fold(
2993                    &k,
2994                    local.entry(k.clone()).or_insert_with(|| init.clone()),
2995                    el,
2996                ),
2997                TwoPass::Output(_) => {}
2998            },
2999            move |global: &mut HashMap<K, S>, mut local| {
3000                global.extend(local.drain());
3001            },
3002            |_| true,
3003        );
3004
3005        state.for_each(std::mem::drop);
3006        s.to_keyed().map(|(_, t)| match t {
3007            TwoPass::First(_) | TwoPass::Second(_) => unreachable!(),
3008            TwoPass::Output(o) => o,
3009        })
3010    }
3011
3012    pub fn reduce_scan<O, S, F1, F2, R>(
3013        self,
3014        first_map: F1,
3015        reduce: R,
3016        second_map: F2,
3017    ) -> KeyedStream<impl Operator<Out = (K, O)>>
3018    where
3019        Op::Out: ExchangeData,
3020        F1: Fn(&K, I) -> S + Send + Clone + 'static,
3021        F2: Fn(&K, &S, I) -> O + Send + Clone + 'static,
3022        R: Fn(&K, S, S) -> S + Send + Clone + 'static,
3023        K: Sync,
3024        S: ExchangeData + Sync,
3025        O: ExchangeData,
3026    {
3027        self.fold_scan(
3028            None,
3029            move |k, acc: &mut Option<S>, x| {
3030                let map = (first_map)(k, x);
3031                *acc = Some(match acc.take() {
3032                    Some(v) => (reduce)(k, v, map),
3033                    None => map,
3034                });
3035            },
3036            move |k, state, x| (second_map)(k, state.as_ref().unwrap(), x),
3037        )
3038    }
3039
3040    /// Close the stream and send resulting items to a channel on a single host.
3041    ///
3042    /// If the stream is distributed among multiple replicas, parallelism will
3043    /// be set to 1 to gather all results
3044    ///
3045    /// **Note**: the order of items and keys is unspecified.
3046    ///
3047    /// **Note**: this operator will split the current block.
3048    ///
3049    /// ## Example
3050    ///
3051    /// ```
3052    /// # use renoir::{StreamContext, RuntimeConfig};
3053    /// # use renoir::operator::source::IteratorSource;
3054    /// # let mut env = StreamContext::new_local();
3055    /// let s = env.stream_iter(0..10u32);
3056    /// let rx = s.collect_channel();
3057    ///
3058    /// env.execute_blocking();
3059    /// let mut v = Vec::new();
3060    /// while let Ok(x) = rx.recv() {
3061    ///     v.push(x)
3062    /// }
3063    /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
3064    /// ```
3065    pub fn collect_channel(self) -> Receiver<(K, I)> {
3066        self.unkey().collect_channel()
3067    }
3068    /// Close the stream and send resulting items to a channel on each single host.
3069    ///
3070    /// Each host sends its outputs to the channel without repartitioning.
3071    /// Elements will be sent to the channel on the same host that produced
3072    /// the output.
3073    ///
3074    /// **Note**: the order of items and keys is unspecified.
3075    ///
3076    /// ## Example
3077    ///
3078    /// ```
3079    /// # use renoir::{StreamContext, RuntimeConfig};
3080    /// # use renoir::operator::source::IteratorSource;
3081    /// # let mut env = StreamContext::new_local();
3082    /// let s = env.stream_iter(0..10u32);
    /// let rx = s.collect_channel_parallel();
3084    ///
3085    /// env.execute_blocking();
3086    /// let mut v = Vec::new();
3087    /// while let Ok(x) = rx.recv() {
3088    ///     v.push(x)
3089    /// }
3090    /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
3091    /// ```
3092    pub fn collect_channel_parallel(self) -> Receiver<(K, I)> {
3093        self.unkey().collect_channel_parallel()
3094    }
3095
3096    /// Close the stream and store all the resulting items into a [`Vec`] on a single host.
3097    ///
3098    /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
3099    /// replicas sends the items to.
3100    ///
3101    /// **Note**: the collected items are the pairs `(key, value)`.
3102    ///
3103    /// **Note**: the order of items and keys is unspecified.
3104    ///
3105    /// **Note**: this operator will split the current block.
3106    ///
3107    /// ## Example
3108    ///
3109    /// ```
3110    /// # use renoir::{StreamContext, RuntimeConfig};
3111    /// # use renoir::operator::source::IteratorSource;
3112    /// # let mut env = StreamContext::new_local();
3113    /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3114    /// let res = s.collect_vec();
3115    ///
3116    /// env.execute_blocking();
3117    ///
3118    /// let mut res = res.get().unwrap();
3119    /// res.sort_unstable(); // the output order is nondeterministic
3120    /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3121    /// ```
3122    pub fn collect_vec(self) -> StreamOutput<Vec<(K, I)>> {
3123        self.unkey().collect_vec()
3124    }
3125
3126    /// Close the stream and store all the resulting items into replicated [`Vec`] on all hosts.
3127    ///
3128    /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
3129    /// replicas sends the items to.
3130    ///
3131    /// **Note**: the collected items are the pairs `(key, value)`.
3132    ///
3133    /// **Note**: the order of items and keys is unspecified.
3134    ///
3135    /// **Note**: this operator will split the current block.
3136    ///
3137    /// ## Example
3138    ///
3139    /// ```
3140    /// # use renoir::{StreamContext, RuntimeConfig};
3141    /// # use renoir::operator::source::IteratorSource;
3142    /// # let mut env = StreamContext::new_local();
3143    /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3144    /// let res = s.collect_vec_all();
3145    ///
3146    /// env.execute_blocking();
3147    ///
3148    /// let mut res = res.get().unwrap();
3149    /// res.sort_unstable(); // the output order is nondeterministic
3150    /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3151    /// ```
3152    pub fn collect_vec_all(self) -> StreamOutput<Vec<(K, I)>> {
3153        self.unkey().collect_vec_all()
3154    }
3155
3156    /// Close the stream and store all the resulting items into a collection on a single host.
3157    ///
3158    /// If the stream is distributed among multiple replicas, parallelism will
3159    /// be set to 1 to gather all results
3160    ///
3161    ///
3162    /// **Note**: the order of items and keys is unspecified.
3163    ///
3164    /// **Note**: this operator will split the current block.
3165    ///
3166    /// ## Example
3167    ///
3168    /// ```
3169    /// # use renoir::{StreamContext, RuntimeConfig};
3170    /// # use renoir::operator::source::IteratorSource;
3171    /// # let mut env = StreamContext::new_local();
3172    /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3173    /// let res = s.collect_vec();
3174    ///
3175    /// env.execute_blocking();
3176    ///
3177    /// let mut res = res.get().unwrap();
3178    /// res.sort_unstable(); // the output order is nondeterministic
3179    /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3180    /// ```
3181    pub fn collect<C: FromIterator<(K, I)> + Send + 'static>(self) -> StreamOutput<C> {
3182        self.unkey().collect()
3183    }
3184
3185    /// Close the stream and store all the resulting items into a collection on each single host.
3186    ///
3187    /// Partitioning will be set to Host and results will be replicated
3188    ///
3189    ///
3190    /// **Note**: the order of items and keys is unspecified.
3191    ///
3192    /// **Note**: this operator will split the current block.
3193    ///
3194    /// ## Example
3195    ///
3196    /// ```
3197    /// # use renoir::{StreamContext, RuntimeConfig};
3198    /// # use renoir::operator::source::IteratorSource;
3199    /// # let mut env = StreamContext::new_local();
3200    /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3201    /// let res = s.collect_vec();
3202    ///
3203    /// env.execute_blocking();
3204    ///
3205    /// let mut res = res.get().unwrap();
3206    /// res.sort_unstable(); // the output order is nondeterministic
3207    /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3208    /// ```
3209    pub fn collect_all<C: FromIterator<(K, I)> + Send + 'static>(self) -> StreamOutput<C> {
3210        self.unkey().collect_all()
3211    }
3212}
3213
3214impl<K, I, O, It, Op> KeyedStream<Op>
3215where
3216    K: DataKey,
3217    Op: Operator<Out = (K, I)> + 'static,
3218    It: Iterator<Item = O> + Clone + Send + 'static,
3219    I: Data + IntoIterator<IntoIter = It, Item = It::Item>,
3220    O: Data + Clone,
3221    K: DataKey,
3222{
3223    /// Transform this stream of containers into a stream of all the contained values.
3224    ///
3225    /// **Note**: this is very similar to [`Iteartor::flatten`](std::iter::Iterator::flatten)
3226    ///
3227    /// ## Example
3228    ///
3229    /// ```
3230    /// # use renoir::{StreamContext, RuntimeConfig};
3231    /// # use renoir::operator::source::IteratorSource;
3232    /// # let mut env = StreamContext::new_local();
3233    /// let s = env
3234    ///     .stream_iter((vec![
3235    ///         vec![0, 1, 2],
3236    ///         vec![3, 4, 5],
3237    ///         vec![6, 7]
3238    ///     ].into_iter()))
3239    ///     .group_by(|v| v[0] % 2);
3240    /// let res = s.flatten().collect_vec();
3241    ///
3242    /// env.execute_blocking();
3243    ///
3244    /// let mut res = res.get().unwrap();
3245    /// res.sort_unstable();
3246    /// assert_eq!(res, vec![(0, 0), (0, 1), (0, 2), (0, 6), (0, 7), (1, 3), (1, 4), (1, 5)]);
3247    /// ```
3248    pub fn flatten(self) -> KeyedStream<impl Operator<Out = (K, O)>> {
3249        self.add_operator(|prev| KeyedFlatten::new(prev))
3250    }
3251}