renoir/operator/mod.rs
1//! Operators that can be applied to a stream.
2//!
//! The full list of operators can be found among the methods implemented on [`Stream`],
//! [`KeyedStream`] and [`crate::WindowedStream`].
5
6use std::cmp::Ordering;
7use std::collections::HashMap;
8use std::fmt::Display;
9use std::hash::Hash;
10use std::ops::{AddAssign, Div};
11
12use cache::{CacheRegistry, CacheSink, CachedStream, Cacher, VecCacher};
13use flume::{unbounded, Receiver};
14#[cfg(feature = "tokio")]
15use futures::Future;
16use limit_sorted::LimitSorted;
17use serde::{Deserialize, Serialize};
18
19pub(crate) use start::*;
20
21pub use rich_map_custom::ElementGenerator;
22
23use crate::block::{group_by_hash, BlockStructure, GroupHasherBuilder, NextStrategy, Replication};
24use crate::scheduler::ExecutionMetadata;
25
26use crate::block::BatchMode;
27use crate::stream::KeyedItem;
28use crate::{KeyedStream, Stream};
29
30#[cfg(feature = "tokio")]
31use self::map_async::MapAsync;
32use self::map_memo::MapMemo;
33use self::sink::collect::Collect;
34use self::sink::collect_channel::CollectChannelSink;
35use self::sink::collect_count::CollectCountSink;
36use self::sink::collect_vec::CollectVecSink;
37use self::sink::for_each::ForEach;
38use self::sink::{StreamOutput, StreamOutputRef};
39#[cfg(feature = "timestamp")]
40use self::{
41 add_timestamps::{AddTimestamp, DropTimestamp},
42 interval_join::IntervalJoin,
43};
44use self::{
45 end::End,
46 filter::Filter,
47 filter_map::FilterMap,
48 flat_map::{FlatMap, KeyedFlatMap},
49 flatten::{Flatten, KeyedFlatten},
50 fold::Fold,
51 inspect::Inspect,
52 key_by::KeyBy,
53 keyed_fold::KeyedFold,
54 map::Map,
55 merge::MergeElement,
56 reorder::Reorder,
57 rich_map::RichMap,
58 rich_map_custom::RichMapCustom,
59 route::RouterBuilder,
60 zip::Zip,
61};
62
63#[cfg(feature = "timestamp")]
64mod add_timestamps;
65mod batch_mode;
66mod boxed;
67pub mod cache;
68pub(crate) mod end;
69mod filter;
70mod filter_map;
71mod flat_map;
72mod flatten;
73mod fold;
74mod inspect;
75#[cfg(feature = "timestamp")]
76mod interval_join;
77pub mod iteration;
78pub mod join;
79mod key_by;
80mod keyed_fold;
81mod limit_sorted;
82mod map;
83#[cfg(feature = "tokio")]
84mod map_async;
85mod map_memo;
86mod merge;
87mod reorder;
88mod replication;
89mod rich_map;
90mod rich_map_custom;
91mod route;
92pub mod sink;
93pub mod source;
94mod start;
95pub mod window;
96mod zip;
97
/// Marker trait implemented by every type that can travel inside a stream.
///
/// It is blanket-implemented for all `Clone + Send + 'static` types, so it never has to be
/// implemented by hand.
pub trait Data: Clone + Send + 'static {}
impl<T> Data for T where T: Clone + Send + 'static {}
101
102/// Marker trait for data types that are used to communicate between different blocks.
103pub trait ExchangeData: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static {}
104impl<T: Serialize + for<'a> Deserialize<'a> + Clone + Send + 'static> ExchangeData for T {}
105
/// Marker trait required for every key type.
///
/// Besides the [`Data`] bounds, keys must be hashable and comparable for equality. The trait is
/// blanket-implemented for all such types.
pub trait DataKey: Clone + Send + Hash + Eq + 'static {}
impl<K> DataKey for K where K: Clone + Send + Hash + Eq + 'static {}
109
110/// Marker trait for key types that are used when communicating between different blocks.
111pub trait ExchangeDataKey: DataKey + ExchangeData {}
112impl<T: DataKey + ExchangeData> ExchangeDataKey for T {}
113
/// Marker trait for the functions that extract a key of type `Key` from a reference to an
/// element of type `Out`.
///
/// Every `Fn(&Out) -> Key` that is also `Clone + Send + 'static` implements it.
pub trait KeyerFn<Key, Out>: Fn(&Out) -> Key + Clone + Send + 'static {}
impl<Key, Out, T> KeyerFn<Key, Out> for T where T: Fn(&Out) -> Key + Clone + Send + 'static {}
117
/// When using timestamps and watermarks, this type expresses the timestamp of a message or of a
/// watermark.
#[cfg(feature = "timestamp")]
pub type Timestamp = i64;

/// Placeholder timestamp type used when the `timestamp` feature is disabled.
///
/// In this configuration timestamps carry no information, so this is the zero-sized unit type.
#[cfg(not(feature = "timestamp"))]
pub type Timestamp = ();
125
126/// An element of the stream. This is what enters and exits from the operators.
127///
128/// An operator may need to change the content of a `StreamElement` (e.g. a `Map` may change the
129/// value of the `Item`). Usually `Watermark` and `FlushAndRestart` are simply forwarded to the next
130/// operator in the chain.
131///
132/// In general a stream may be composed of a sequence of this kind:
133///
134/// `((Item | Timestamped | Watermark | FlushBatch)* FlushAndRestart)+ Terminate`
135#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
136pub enum StreamElement<Out> {
137 /// A normal element containing just the value of the message.
138 Item(Out),
139 /// Like `Item`, but it's attached with a timestamp, it's used to ensure the ordering of the
140 /// messages.
141 Timestamped(Out, Timestamp),
142 /// When an operator receives a `Watermark` with timestamp `t`, the operator will never see any
143 /// message with timestamp less or equal to `t`.
144 Watermark(Timestamp),
145 /// Flush the internal batch since there will be too much delay till the next message to come.
146 FlushBatch,
147 /// The stream has ended, and the operators should exit as soon as possible.
148 ///
149 /// No messages should be generated by the operator between a `FlushAndRestart` and a
150 /// `Terminate`.
151 Terminate,
152 /// Mark the end of a stream of data.
153 ///
154 /// Note that this does not mean that the entire stream has ended, for example this is used to
155 /// mark the end of an iteration. Therefore an operator may be prepared to received new data
156 /// after this message, but should not retain the internal state.
157 FlushAndRestart,
158}
159
160/// An operator represents a unit of computation. It's always included inside a chain of operators,
161/// inside a block.
162///
163/// Each operator implements the `Operator<Out>` trait, it produced a stream of `Out` elements.
164///
165/// An `Operator` must be Clone since it is part of a single chain when it's built, but it has to
166/// be cloned to spawn the replicas of the block.
167pub trait Operator: Clone + Send + Display {
168 type Out: Send;
169 /// Setup the operator chain. This is called before any call to `next` and it's used to
170 /// initialize the operator. When it's called the operator has already been cloned and it will
171 /// never be cloned again. Therefore it's safe to store replica-specific metadata inside of it.
172 ///
173 /// It's important that each operator (except the start of a chain) calls `.setup()` recursively
174 /// on the previous operators.
175 fn setup(&mut self, metadata: &mut ExecutionMetadata);
176
177 /// Take a value from the previous operator, process it and return it.
178 fn next(&mut self) -> StreamElement<Self::Out>;
179
180 /// A more refined representation of the operator and its predecessors.
181 fn structure(&self) -> BlockStructure;
182}
183
184impl<Out> StreamElement<Out> {
185 /// Create a new `StreamElement` with an `Item(())` if `self` contains an item, otherwise it
186 /// returns the same variant of `self`.
187 pub fn variant(&self) -> StreamElement<()> {
188 match self {
189 StreamElement::Item(_) => StreamElement::Item(()),
190 StreamElement::Timestamped(_, _) => StreamElement::Item(()),
191 StreamElement::Watermark(w) => StreamElement::Watermark(*w),
192 StreamElement::Terminate => StreamElement::Terminate,
193 StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
194 StreamElement::FlushBatch => StreamElement::FlushBatch,
195 }
196 }
197
198 /// Change the type of the element inside the `StreamElement`.
199 pub fn map<NewOut>(self, f: impl FnOnce(Out) -> NewOut) -> StreamElement<NewOut> {
200 match self {
201 StreamElement::Item(item) => StreamElement::Item(f(item)),
202 StreamElement::Timestamped(item, ts) => StreamElement::Timestamped(f(item), ts),
203 StreamElement::Watermark(w) => StreamElement::Watermark(w),
204 StreamElement::Terminate => StreamElement::Terminate,
205 StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
206 StreamElement::FlushBatch => StreamElement::FlushBatch,
207 }
208 }
209
210 /// Change the type of the element inside the `StreamElement`.
211 #[cfg(feature = "tokio")]
212 pub async fn map_async<NewOut, F, Fut>(self, f: F) -> StreamElement<NewOut>
213 where
214 F: Fn(Out) -> Fut,
215 Fut: Future<Output = NewOut>,
216 {
217 match self {
218 StreamElement::Item(item) => StreamElement::Item(f(item).await),
219 StreamElement::Timestamped(item, ts) => StreamElement::Timestamped(f(item).await, ts),
220 StreamElement::Watermark(w) => StreamElement::Watermark(w),
221 StreamElement::Terminate => StreamElement::Terminate,
222 StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
223 StreamElement::FlushBatch => StreamElement::FlushBatch,
224 }
225 }
226
227 /// A string representation of the variant of this `StreamElement`.
228 pub fn variant_str(&self) -> &'static str {
229 match self {
230 StreamElement::Item(_) => "Item",
231 StreamElement::Timestamped(_, _) => "Timestamped",
232 StreamElement::Watermark(_) => "Watermark",
233 StreamElement::FlushBatch => "FlushBatch",
234 StreamElement::Terminate => "Terminate",
235 StreamElement::FlushAndRestart => "FlushAndRestart",
236 }
237 }
238
239 /// A string representation of the variant of this `StreamElement`.
240 pub fn timestamp(&self) -> Option<&Timestamp> {
241 match self {
242 StreamElement::Timestamped(_, ts) | StreamElement::Watermark(ts) => Some(ts),
243 _ => None,
244 }
245 }
246
247 pub fn add_key<Key>(self, k: Key) -> StreamElement<(Key, Out)> {
248 match self {
249 StreamElement::Item(v) => StreamElement::Item((k, v)),
250 StreamElement::Timestamped(v, ts) => StreamElement::Timestamped((k, v), ts),
251 StreamElement::Watermark(w) => StreamElement::Watermark(w),
252 StreamElement::Terminate => StreamElement::Terminate,
253 StreamElement::FlushAndRestart => StreamElement::FlushAndRestart,
254 StreamElement::FlushBatch => StreamElement::FlushBatch,
255 }
256 }
257
258 pub fn value(&self) -> Option<&Out> {
259 match self {
260 StreamElement::Item(v) => Some(v),
261 StreamElement::Timestamped(v, _) => Some(v),
262 StreamElement::Watermark(_) => None,
263 StreamElement::FlushBatch => None,
264 StreamElement::Terminate => None,
265 StreamElement::FlushAndRestart => None,
266 }
267 }
268}
269
270impl<Key, Out> StreamElement<(Key, Out)> {
271 /// Map a `StreamElement<KeyValue(Key, Out)>` to a `StreamElement<Out>`,
272 /// returning the key if possible
273 pub fn take_key(self) -> (Option<Key>, StreamElement<Out>) {
274 match self {
275 StreamElement::Item((k, v)) => (Some(k), StreamElement::Item(v)),
276 StreamElement::Timestamped((k, v), ts) => (Some(k), StreamElement::Timestamped(v, ts)),
277 StreamElement::Watermark(w) => (None, StreamElement::Watermark(w)),
278 StreamElement::Terminate => (None, StreamElement::Terminate),
279 StreamElement::FlushAndRestart => (None, StreamElement::FlushAndRestart),
280 StreamElement::FlushBatch => (None, StreamElement::FlushBatch),
281 }
282 }
283
284 pub fn key(self) -> Option<Key> {
285 match self {
286 StreamElement::Item((k, _)) => Some(k),
287 StreamElement::Timestamped((k, _), _) => Some(k),
288 StreamElement::Watermark(_) => None,
289 StreamElement::Terminate => None,
290 StreamElement::FlushAndRestart => None,
291 StreamElement::FlushBatch => None,
292 }
293 }
294}
295
296impl<Op> Stream<Op>
297where
298 Op: Operator + 'static,
299{
300 /// Given a stream without timestamps nor watermarks, tag each item with a timestamp and insert
301 /// watermarks.
302 ///
303 /// The two functions given to this operator are the following:
304 /// - `timestamp_gen` returns the timestamp assigned to the provided element of the stream
305 /// - `watermark_gen` returns an optional watermark to add after the provided element
306 ///
307 /// Note that the two functions **must** follow the watermark semantics.
308 /// TODO: link to watermark semantics
309 ///
310 /// ## Example
311 ///
312 /// In this example the stream contains the integers from 0 to 9, each will be tagged with a
313 /// timestamp with the value of the item as milliseconds, and after each even number a watermark
314 /// will be inserted.
315 ///
316 /// ```
317 /// # use renoir::{StreamContext, RuntimeConfig};
318 /// # use renoir::operator::source::IteratorSource;
319 /// use renoir::operator::Timestamp;
320 /// # let mut env = StreamContext::new_local();
321 ///
322 /// let s = env.stream_iter(0..10);
323 /// s.add_timestamps(
324 /// |&n| n,
325 /// |&n, &ts| if n % 2 == 0 { Some(ts) } else { None }
326 /// );
327 /// ```
328 #[cfg(feature = "timestamp")]
329 pub fn add_timestamps<F, G>(
330 self,
331 timestamp_gen: F,
332 watermark_gen: G,
333 ) -> Stream<AddTimestamp<F, G, Op>>
334 where
335 F: FnMut(&Op::Out) -> Timestamp + Clone + Send + 'static,
336 G: FnMut(&Op::Out, &Timestamp) -> Option<Timestamp> + Clone + Send + 'static,
337 {
338 self.add_operator(|prev| AddTimestamp::new(prev, timestamp_gen, watermark_gen))
339 }
340
341 #[cfg(feature = "timestamp")]
342 pub fn drop_timestamps(self) -> Stream<DropTimestamp<Op>> {
343 self.add_operator(|prev| DropTimestamp::new(prev))
344 }
345 /// Change the batch mode for this stream.
346 ///
347 /// This change will be propagated to all the operators following, even of the next blocks,
348 /// until it's changed again.
349 ///
350 /// ## Example
351 ///
352 /// ```
353 /// # use renoir::{StreamContext, RuntimeConfig};
354 /// # use renoir::operator::source::IteratorSource;
355 /// use renoir::BatchMode;
356 /// # let mut env = StreamContext::new_local();
357 ///
358 /// let s = env.stream_iter(0..10);
359 /// s.batch_mode(BatchMode::fixed(1024));
360 /// ```
361 pub fn batch_mode(mut self, batch_mode: BatchMode) -> Self {
362 self.block.batch_mode = batch_mode;
363 self
364 }
365
366 /// Remove from the stream all the elements for which the provided function returns `None` and
367 /// keep the elements that returned `Some(_)`.
368 ///
369 /// **Note**: this is very similar to [`Iteartor::filter_map`](std::iter::Iterator::filter_map)
370 ///
371 /// ## Example
372 ///
373 /// ```
374 /// # use renoir::{StreamContext, RuntimeConfig};
375 /// # use renoir::operator::source::IteratorSource;
376 /// # let mut env = StreamContext::new_local();
377 /// let s = env.stream_iter(0..10);
378 /// let res = s.filter_map(|n| if n % 2 == 0 { Some(n * 3) } else { None }).collect_vec();
379 ///
380 /// env.execute_blocking();
381 ///
382 /// assert_eq!(res.get().unwrap(), vec![0, 6, 12, 18, 24])
383 /// ```
384 pub fn filter_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
385 where
386 F: Fn(Op::Out) -> Option<O> + Send + Clone + 'static,
387 O: Data,
388 {
389 self.add_operator(|prev| FilterMap::new(prev, f))
390 }
391
392 /// Remove from the stream all the elements for which the provided predicate returns `false`.
393 ///
394 /// **Note**: this is very similar to [`Iteartor::filter`](std::iter::Iterator::filter)
395 ///
396 /// ## Example
397 ///
398 /// ```
399 /// # use renoir::{StreamContext, RuntimeConfig};
400 /// # use renoir::operator::source::IteratorSource;
401 /// # let mut env = StreamContext::new_local();
402 /// let s = env.stream_iter(0..10);
403 /// let res = s.filter(|&n| n % 2 == 0).collect_vec();
404 ///
405 /// env.execute_blocking();
406 ///
407 /// assert_eq!(res.get().unwrap(), vec![0, 2, 4, 6, 8])
408 /// ```
409 pub fn filter<F>(self, predicate: F) -> Stream<impl Operator<Out = Op::Out>>
410 where
411 F: Fn(&Op::Out) -> bool + Clone + Send + 'static,
412 {
413 self.add_operator(|prev| Filter::new(prev, predicate))
414 }
415
416 /// Reorder timestamped items
417 ///
418 /// # Example
419 /// ### TODO
420 pub fn reorder(self) -> Stream<impl Operator<Out = Op::Out>> {
421 self.add_operator(|prev| Reorder::new(prev))
422 }
423
424 /// Remove from the stream all the elements for which the provided function returns `None` and
425 /// keep the elements that returned `Some(_)`. The mapping function can be stateful.
426 ///
427 /// This is equivalent to [`Stream::filter_map`] but with a stateful function.
428 ///
429 /// Since the mapping function can be stateful, it is a `FnMut`. This allows expressing simple
430 /// algorithms with very few lines of code (see examples).
431 ///
432 /// The mapping function is _cloned_ inside each replica, and they will not share state between
433 /// each other. If you want that only a single replica handles all the items you may want to
434 /// change the parallelism of this operator with [`Stream::replication`].
435 ///
436 /// ## Examples
437 ///
438 /// This will emit only the _positive prefix-sums_.
439 ///
440 /// ```
441 /// # use renoir::{StreamContext, RuntimeConfig};
442 /// # use renoir::operator::source::IteratorSource;
443 /// # let mut env = StreamContext::new_local();
444 /// let s = env.stream_iter((std::array::IntoIter::new([1, 2, -5, 3, 1])));
445 /// let res = s.rich_filter_map({
446 /// let mut sum = 0;
447 /// move |x| {
448 /// sum += x;
449 /// if sum >= 0 {
450 /// Some(sum)
451 /// } else {
452 /// None
453 /// }
454 /// }
455 /// }).collect_vec();
456 ///
457 /// env.execute_blocking();
458 ///
459 /// assert_eq!(res.get().unwrap(), vec![1, 1 + 2, /* 1 + 2 - 5, */ 1 + 2 - 5 + 3, 1 + 2 - 5 + 3 + 1]);
460 /// ```
461 pub fn rich_filter_map<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
462 where
463 F: FnMut(Op::Out) -> Option<O> + Send + Clone + 'static,
464 O: Data,
465 {
466 self.rich_map(f).filter(|x| x.is_some()).map(|x| x.unwrap())
467 }
468
469 /// Map the elements of the stream into new elements. The mapping function can be stateful.
470 ///
471 /// This is equivalent to [`Stream::map`] but with a stateful function.
472 ///
473 /// Since the mapping function can be stateful, it is a `FnMut`. This allows expressing simple
474 /// algorithms with very few lines of code (see examples).
475 ///
476 /// The mapping function is _cloned_ inside each replica, and they will not share state between
477 /// each other. If you want that only a single replica handles all the items you may want to
478 /// change the parallelism of this operator with [`Stream::replication`].
479 ///
480 /// ## Examples
481 ///
482 /// This is a simple implementation of the prefix-sum using a single replica (i.e. each element
483 /// is mapped to the sum of all the elements up to that point). Note that this won't work if
484 /// there are more replicas.
485 ///
486 /// ```
487 /// # use renoir::{StreamContext, RuntimeConfig};
488 /// # use renoir::operator::source::IteratorSource;
489 /// # let mut env = StreamContext::new_local();
490 /// let s = env.stream_iter(1..=5);
491 /// let res = s.rich_map({
492 /// let mut sum = 0;
493 /// move |x| {
494 /// sum += x;
495 /// sum
496 /// }
497 /// }).collect_vec();
498 ///
499 /// env.execute_blocking();
500 ///
501 /// assert_eq!(res.get().unwrap(), vec![1, 1 + 2, 1 + 2 + 3, 1 + 2 + 3 + 4, 1 + 2 + 3 + 4 + 5]);
502 /// ```
503 ///
504 /// This will enumerate all the elements that reach a replica. This is basically equivalent to
505 /// the `enumerate` function in Python.
506 ///
507 /// ```
508 /// # use renoir::{StreamContext, RuntimeConfig};
509 /// # use renoir::operator::source::IteratorSource;
510 /// # let mut env = StreamContext::new_local();
511 /// let s = env.stream_iter(1..=5);
512 /// let res = s.rich_map({
513 /// let mut id = 0;
514 /// move |x| {
515 /// id += 1;
516 /// (id - 1, x)
517 /// }
518 /// }).collect_vec();
519 ///
520 /// env.execute_blocking();
521 ///
522 /// assert_eq!(res.get().unwrap(), vec![(0, 1), (1, 2), (2, 3), (3, 4), (4, 5)]);
523 /// ```
524 pub fn rich_map<O, F>(self, mut f: F) -> Stream<impl Operator<Out = O>>
525 where
526 F: FnMut(Op::Out) -> O + Send + Clone + 'static,
527 O: Send + 'static,
528 {
529 self.key_by(|_| ())
530 .add_operator(|prev| RichMap::new(prev, move |(_, value)| f(value)))
531 .drop_key()
532 }
533
534 /// Map the elements of the stream into new elements.
535 ///
536 /// **Note**: this is very similar to [`Iteartor::map`](std::iter::Iterator::map).
537 ///
538 /// ## Example
539 ///
540 /// ```
541 /// # use renoir::{StreamContext, RuntimeConfig};
542 /// # use renoir::operator::source::IteratorSource;
543 /// # let mut env = StreamContext::new_local();
544 /// let s = env.stream_iter(0..5);
545 /// let res = s.map(|n| n * 10).collect_vec();
546 ///
547 /// env.execute_blocking();
548 ///
549 /// assert_eq!(res.get().unwrap(), vec![0, 10, 20, 30, 40]);
550 /// ```
551 pub fn map<O: Send, F>(self, f: F) -> Stream<impl Operator<Out = O>>
552 where
553 F: Fn(Op::Out) -> O + Send + Clone + 'static,
554 {
555 self.add_operator(|prev| Map::new(prev, f))
556 }
557
558 /// Map the elements of the stream into new elements by evaluating a future for each one.
559 /// Use memoization to cache outputs for previously seen inputs.
560 ///
561 /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
562 /// The maximum number of elements to be cached is passed as the `capacity` parameter.
563 ///
564 /// The outputs are cached according to the key produced by the `fk` function.
565 ///
566 /// ## Example
567 ///
568 /// ```
569 /// # use renoir::{StreamContext, RuntimeConfig};
570 /// # use renoir::operator::source::IteratorSource;
571 /// # tokio::runtime::Runtime::new()
572 /// # .unwrap()
573 /// # .block_on(base());
574 /// # async fn base() {
575 /// # let mut env = StreamContext::new_local();
576 /// let s = env.stream_iter(5..15);
577 /// let res = s.map_async_memo_by(
578 /// |n| async move {(n * n) % 7}, |n| n % 7, 5
579 /// ).collect_vec();
580 /// env.execute().await;
581 /// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
582 /// # }
583 /// ```
584 #[cfg(feature = "tokio")]
585 pub fn map_async_memo_by<O, K, F, Fk, Fut>(
586 self,
587 f: F,
588 fk: Fk,
589 capacity: usize,
590 ) -> Stream<impl Operator<Out = O>>
591 where
592 F: Fn(Op::Out) -> Fut + Send + Sync + 'static + Clone,
593 Fk: Fn(&Op::Out) -> K + Send + Sync + Clone + 'static,
594 Fut: futures::Future<Output = O> + Send,
595 O: Clone + Send + Sync + 'static,
596 K: DataKey + Sync,
597 {
598 use futures::FutureExt;
599 use quick_cache::{sync::Cache, UnitWeighter};
600 use std::{convert::Infallible, sync::Arc};
601
602 let cache: Arc<Cache<K, O, _, GroupHasherBuilder>> = Arc::new(Cache::with(
603 capacity,
604 capacity as u64,
605 UnitWeighter,
606 Default::default(),
607 Default::default(),
608 ));
609 self.add_operator(|prev| {
610 MapAsync::new(
611 prev,
612 move |el| {
613 let fk = fk.clone();
614 let f = f.clone();
615 let cache = cache.clone();
616 let k = fk(&el);
617 async move {
618 cache
619 .get_or_insert_async(&k, (f)(el).map(Result::Ok::<_, Infallible>))
620 .await
621 .unwrap()
622 }
623 },
624 4,
625 )
626 })
627 }
628
629 /// Map the elements of the stream into new elements by evaluating a future for each one.
630 ///
631 /// ## Example
632 ///
633 /// ```
634 /// # use renoir::{StreamContext, RuntimeConfig};
635 /// # use renoir::operator::source::IteratorSource;
636 /// # tokio::runtime::Runtime::new()
637 /// # .unwrap()
638 /// # .block_on(base());
639 /// # async fn base() {
640 /// # let mut env = StreamContext::new_local();
641 /// let s = env.stream_iter(5..15);
642 /// let res = s.map_async(|n| async move {(n * n) % 7}).collect_vec();
643 /// env.execute().await;
644 /// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
645 /// # }
646 /// ```
647 #[cfg(feature = "tokio")]
648 pub fn map_async<O: Data, F, Fut>(self, f: F) -> Stream<impl Operator<Out = O>>
649 where
650 F: Fn(Op::Out) -> Fut + Send + Sync + 'static + Clone,
651 Fut: futures::Future<Output = O> + Send + 'static,
652 {
653 self.add_operator(|prev| MapAsync::new(prev, f, 4))
654 }
655
656 /// Map the elements of the stream into new elements. Use memoization
657 /// to cache outputs for previously seen inputs.
658 ///
659 /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
660 /// The maximum number of elements to be cached is passed as the `capacity` parameter.
661 ///
662 /// The outputs are cached according to the key produced by the `fk` function.
663 ///
664 /// ## Example
665 ///
666 /// ```
667 /// # use renoir::{StreamContext, RuntimeConfig};
668 /// # use renoir::operator::source::IteratorSource;
669 /// # let mut env = StreamContext::new_local();
670 /// let s = env.stream_iter(5..15);
671 /// let res = s.map_memo_by(|n| (n * n) % 7, |n| n % 7, 5).collect_vec();
672 ///
673 /// env.execute_blocking();
674 ///
675 /// assert_eq!(res.get().unwrap(), vec![4, 1, 0, 1, 4, 2, 2, 4, 1, 0]);
676 /// ```
677 pub fn map_memo_by<K: DataKey + Sync, O: Clone + Send + Sync + 'static, F, Fk>(
678 self,
679 f: F,
680 fk: Fk,
681 capacity: usize,
682 ) -> Stream<impl Operator<Out = O>>
683 where
684 F: Fn(Op::Out) -> O + Send + Clone + 'static,
685 Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
686 {
687 self.add_operator(|prev| MapMemo::new(prev, f, fk, capacity))
688 }
689
690 /// Fold the stream into a stream that emits a single value.
691 ///
692 /// The folding operator consists in adding to the current accumulation value (initially the
693 /// value provided as `init`) the value of the current item in the stream.
694 ///
695 /// The folding function is provided with a mutable reference to the current accumulator and the
696 /// owned item of the stream. The function should modify the accumulator without returning
697 /// anything.
698 ///
699 /// Note that the output type may be different from the input type. Consider using
700 /// [`Stream::reduce`] if the output type is the same as the input type.
701 ///
702 /// **Note**: this operator will retain all the messages of the stream and emit the values only
703 /// when the stream ends. Therefore this is not properly _streaming_.
704 ///
705 /// **Note**: this operator is not parallelized, it creates a bottleneck where all the stream
706 /// elements are sent to and the folding is done using a single thread.
707 ///
708 /// **Note**: this is very similar to [`Iteartor::fold`](std::iter::Iterator::fold).
709 ///
710 /// **Note**: this operator will split the current block.
711 ///
712 /// ## Example
713 ///
714 /// ```
715 /// # use renoir::{StreamContext, RuntimeConfig};
716 /// # use renoir::operator::source::IteratorSource;
717 /// # let mut env = StreamContext::new_local();
718 /// let s = env.stream_iter(0..5);
719 /// let res = s.fold(0, |acc, value| *acc += value).collect_vec();
720 ///
721 /// env.execute_blocking();
722 ///
723 /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
724 /// ```
725 pub fn fold<O, F>(self, init: O, f: F) -> Stream<impl Operator<Out = O>>
726 where
727 F: Fn(&mut O, Op::Out) + Send + Clone + 'static,
728 Op::Out: ExchangeData,
729 O: Send + Clone,
730 {
731 self.replication(Replication::One)
732 .add_operator(|prev| Fold::new(prev, init, f))
733 }
734
735 /// Fold the stream into a stream that emits a single value.
736 ///
737 /// The folding operator consists in adding to the current accumulation value (initially the
738 /// value provided as `init`) the value of the current item in the stream.
739 ///
740 /// This method is very similary to [`Stream::fold`], but performs the folding distributely. To
741 /// do so the folding function must be _associative_, in particular the folding process is
742 /// performed in 2 steps:
743 ///
744 /// - `local`: the local function is used to fold the elements present in each replica of the
745 /// stream independently. All those replicas will start with the same `init` value.
746 /// - `global`: all the partial results (the elements produced by the `local` step) have to be
747 /// aggregated into a single result. This is done using the `global` folding function.
748 ///
749 /// Note that the output type may be different from the input type, therefore requireing
750 /// different function for the aggregation. Consider using [`Stream::reduce_assoc`] if the
751 /// output type is the same as the input type.
752 ///
753 /// **Note**: this operator will retain all the messages of the stream and emit the values only
754 /// when the stream ends. Therefore this is not properly _streaming_.
755 ///
756 /// **Note**: this operator will split the current block.
757 ///
758 /// ## Example
759 ///
760 /// ```
761 /// # use renoir::{StreamContext, RuntimeConfig};
762 /// # use renoir::operator::source::IteratorSource;
763 /// # let mut env = StreamContext::new_local();
764 /// let s = env.stream_iter(0..5);
765 /// let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec();
766 ///
767 /// env.execute_blocking();
768 ///
769 /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
770 /// ```
771 pub fn fold_assoc<O, F, G>(self, init: O, local: F, global: G) -> Stream<impl Operator<Out = O>>
772 where
773 F: Fn(&mut O, Op::Out) + Send + Clone + 'static,
774 G: Fn(&mut O, O) + Send + Clone + 'static,
775 O: ExchangeData,
776 {
777 self.add_operator(|prev| Fold::new(prev, init.clone(), local))
778 .replication(Replication::One)
779 .add_operator(|prev| Fold::new(prev, init, global))
780 }
781
782 /// Perform the folding operation separately for each key.
783 ///
784 /// This is equivalent of partitioning the stream using the `keyer` function, and then applying
785 /// [`Stream::fold_assoc`] to each partition separately.
786 ///
787 /// Note however that there is a difference between `stream.group_by(keyer).fold(...)` and
788 /// `stream.group_by_fold(keyer, ...)`. The first performs the network shuffle of every item in
789 /// the stream, and **later** performs the folding (i.e. nearly all the elements will be sent to
790 /// the network). The latter avoids sending the items by performing first a local reduction on
791 /// each host, and then send only the locally folded results (i.e. one message per replica, per
792 /// key); then the global step is performed aggregating the results.
793 ///
794 /// The resulting stream will still be keyed and will contain only a single message per key (the
795 /// final result).
796 ///
797 /// Note that the output type may be different from the input type, therefore requireing
798 /// different function for the aggregation. Consider using [`Stream::group_by_reduce`] if the
799 /// output type is the same as the input type.
800 ///
801 /// **Note**: this operator will retain all the messages of the stream and emit the values only
802 /// when the stream ends. Therefore this is not properly _streaming_.
803 ///
804 /// **Note**: this operator will split the current block.
805 ///
806 /// ## Example
807 /// ```
808 /// # use renoir::{StreamContext, RuntimeConfig};
809 /// # use renoir::operator::source::IteratorSource;
810 /// # let mut env = StreamContext::new_local();
811 /// let s = env.stream_iter(0..5);
812 /// let res = s
813 /// .group_by_fold(|&n| n % 2, 0, |acc, value| *acc += value, |acc, value| *acc += value)
814 /// .collect_vec();
815 ///
816 /// env.execute_blocking();
817 ///
818 /// let mut res = res.get().unwrap();
819 /// res.sort_unstable();
820 /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
821 /// ```
822 pub fn group_by_fold<K, O, Fk, F, G>(
823 self,
824 keyer: Fk,
825 init: O,
826 local: F,
827 global: G,
828 ) -> KeyedStream<impl Operator<Out = (K, O)>>
829 where
830 Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
831 F: Fn(&mut O, Op::Out) + Send + Clone + 'static,
832 G: Fn(&mut O, O) + Send + Clone + 'static,
833 K: ExchangeDataKey,
834 O: ExchangeData,
835 Op::Out: Clone,
836 {
837 // GroupBy based on key
838 let next_strategy = NextStrategy::GroupBy(
839 move |(key, _): &(K, O)| group_by_hash(&key),
840 Default::default(),
841 );
842
843 let new_stream = self
844 // key_by with given keyer
845 .add_operator(|prev| KeyBy::new(prev, keyer.clone()))
846 // local fold
847 .add_operator(|prev| KeyedFold::new(prev, init.clone(), local))
848 // group by key
849 .split_block(End::new, next_strategy)
850 // global fold
851 .add_operator(|prev| KeyedFold::new(prev, init.clone(), global));
852
853 KeyedStream(new_stream)
854 }
855
    /// Two-pass scan: aggregate the whole stream into a global state, then map every element
    /// using that state.
    ///
    /// The operator is built on top of `iterate`, using two iterations:
    ///
    /// 1. In the first pass every element is folded into a per-replica `SL` accumulator with
    ///    `local_fold`. The accumulators are then combined with `global_fold` into an `SG` state,
    ///    which is initialized from a clone of `global_init` the first time it is needed.
    /// 2. In the second pass every element is passed to `map`, together with a reference to the
    ///    global state, and the result of `map` is emitted.
    ///
    /// The final global state itself is discarded; only the mapped elements are returned.
    // NOTE(review): the per-replica accumulator presumably starts from `SL::default()` (hence the
    // `Default` bound) — confirm against `iterate`.
    pub fn fold_scan<O, SL, SG, L, G, F>(
        self,
        local_fold: L,
        global_fold: G,
        global_init: SG,
        map: F,
    ) -> Stream<impl Operator<Out = O>>
    where
        Op::Out: ExchangeData,
        L: Fn(&mut SL, Op::Out) + Send + Clone + 'static,
        G: Fn(&mut SG, SL) + Send + Clone + 'static,
        F: Fn(Op::Out, &SG) -> O + Send + Clone + 'static,
        SL: ExchangeData + Default,
        SG: ExchangeData + Sync,
        O: ExchangeData,
    {
        // Tags an element with the pass it is in: `First` elements enter the iteration, `Second`
        // elements are the ones aggregated into the state, `Output` carries the mapped result.
        #[derive(Serialize, Deserialize, Clone)]
        enum TwoPass<I, O> {
            First(I),
            Second(I),
            Output(O),
        }

        // `2` is the number of iterations passed to `iterate` (presumably its iteration limit);
        // the initial global state is `None`.
        let (state, s) = self.map(|el| TwoPass::First(el)).iterate(
            2,
            None,
            // Body: pass 1 turns `First` into `Second`; pass 2 maps `Second` into `Output` using
            // the global state (presumably the one computed at the end of pass 1).
            |s, state| {
                s.map(move |el| match el {
                    TwoPass::First(el) => TwoPass::Second(el),
                    TwoPass::Second(el) => {
                        TwoPass::Output((map)(el, state.get().as_ref().unwrap()))
                    }
                    TwoPass::Output(_) => unreachable!(),
                })
            },
            // Local fold: only `Second` elements contribute to the per-replica accumulator.
            move |local: &mut SL, el| match el {
                TwoPass::First(_) => {}
                TwoPass::Second(el) => local_fold(local, el),
                TwoPass::Output(_) => {}
            },
            // Global fold: lazily initialize the state from `global_init`, then merge.
            move |global: &mut Option<SG>, local| {
                global_fold(global.get_or_insert(global_init.clone()), local)
            },
            // Loop condition: never stop early.
            |_| true,
        );

        // The final state is not needed: consume and discard it.
        state.for_each(std::mem::drop);
        // After the second pass only `Output` elements can leave the iteration.
        s.map(|t| match t {
            TwoPass::First(_) | TwoPass::Second(_) => unreachable!(),
            TwoPass::Output(o) => o,
        })
    }
908
909 pub fn reduce_scan<O, S, F1, F2, R>(
910 self,
911 first_map: F1,
912 reduce: R,
913 second_map: F2,
914 ) -> Stream<impl Operator<Out = O>>
915 where
916 Op::Out: ExchangeData,
917 F1: Fn(Op::Out) -> S + Send + Clone + 'static,
918 F2: Fn(Op::Out, &S) -> O + Send + Clone + 'static,
919 R: Fn(S, S) -> S + Send + Clone + 'static,
920 S: ExchangeData + Sync,
921 O: ExchangeData,
922 {
923 let reduce2 = reduce.clone();
924 self.fold_scan(
925 move |acc: &mut Option<S>, x| {
926 let map = (first_map)(x);
927 let cur = acc.take();
928 *acc = Some(match cur {
929 Some(cur) => (reduce)(cur, map),
930 None => map,
931 });
932 },
933 move |global, local| {
934 let cur = global.take();
935 *global = match (cur, local) {
936 (Some(cur), Some(local)) => Some((reduce2)(cur, local)),
937 (Some(a), None) | (None, Some(a)) => Some(a),
938 (None, None) => None,
939 };
940 },
941 None,
942 move |x, state| (second_map)(x, state.as_ref().unwrap()),
943 )
944 }
945
946 /// Deduplicate elements. The resulting stream will contain exactly one occurrence
947 /// for each unique element in the input stream
948 ///
949 /// The current implementation requires `Hash` and `Eq` and it will repartition the stream
950 /// setting replication to Unlimited
951 pub fn unique_assoc(self) -> Stream<impl Operator<Out = Op::Out>>
952 where
953 Op::Out: Hash + Eq + Clone + ExchangeData + Sync,
954 {
955 use dashmap::DashSet;
956 use std::collections::HashSet;
957 use std::sync::Arc;
958
959 let local_set = Arc::new(DashSet::<_, GroupHasherBuilder>::default());
960 let mut final_set = HashSet::<_, GroupHasherBuilder>::default();
961
962 self.rich_flat_map(move |el| {
963 if !local_set.contains(&el) {
964 local_set.insert(el.clone());
965 Some(el)
966 } else {
967 None
968 }
969 })
970 .repartition_by(Replication::Unlimited, group_by_hash)
971 .rich_flat_map(move |el| {
972 if !final_set.contains(&el) {
973 final_set.insert(el.clone());
974 Some(el)
975 } else {
976 None
977 }
978 })
979 }
980
981 /// Deduplicate elements. The resulting stream will contain exactly one occurrence
982 /// for each unique key computed from the item in the input stream
983 ///
984 /// The current implementation requires `Hash` and `Eq` and it will repartition the stream
985 /// setting replication to Unlimited
986 pub fn unique_assoc_by_key<F, K>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
987 where
988 Op::Out: Hash + Eq + Clone + ExchangeData + Sync,
989 F: Fn(&Op::Out) -> K + Clone + Send + 'static,
990 K: Hash + Eq + Clone + Send + Sync + 'static,
991 {
992 use dashmap::DashSet;
993 use std::collections::HashSet;
994 use std::sync::Arc;
995 let f2 = f.clone();
996
997 let local_set = Arc::new(DashSet::<_, GroupHasherBuilder>::default());
998 let mut final_set = HashSet::<_, GroupHasherBuilder>::default();
999
1000 self.rich_flat_map(move |el| {
1001 let k = (f)(&el);
1002 if !local_set.insert(k) {
1003 Some(el)
1004 } else {
1005 None
1006 }
1007 })
1008 .repartition_by(Replication::Unlimited, group_by_hash)
1009 .rich_flat_map(move |el| {
1010 let k = (f2)(&el);
1011 if !final_set.insert(k) {
1012 Some(el)
1013 } else {
1014 None
1015 }
1016 })
1017 }
1018
1019 /// Construct a [`KeyedStream`] from a [`Stream`] without shuffling the data.
1020 ///
1021 /// **Note**: this violates the semantics of [`KeyedStream`], without sending all the values
1022 /// with the same key to the same replica some of the following operators may misbehave. You
1023 /// probably need to use [`Stream::group_by`] instead.
1024 ///
1025 /// ## Example
1026 /// ```
1027 /// # use renoir::{StreamContext, RuntimeConfig};
1028 /// # use renoir::operator::source::IteratorSource;
1029 /// # let mut env = StreamContext::new_local();
1030 /// let s = env.stream_iter(0..5);
1031 /// let res = s.key_by(|&n| n % 2).collect_vec();
1032 ///
1033 /// env.execute_blocking();
1034 ///
1035 /// let mut res = res.get().unwrap();
1036 /// res.sort_unstable();
1037 /// assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
1038 /// ```
1039 pub fn key_by<K, Fk>(self, keyer: Fk) -> KeyedStream<impl Operator<Out = (K, Op::Out)>>
1040 where
1041 Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
1042 K: DataKey,
1043 {
1044 KeyedStream(self.add_operator(|prev| KeyBy::new(prev, keyer)))
1045 }
1046
1047 /// Apply the given function to all the elements of the stream, consuming the stream.
1048 ///
1049 /// ## Example
1050 ///
1051 /// ```
1052 /// # use renoir::{StreamContext, RuntimeConfig};
1053 /// # use renoir::operator::source::IteratorSource;
1054 /// # let mut env = StreamContext::new_local();
1055 /// let s = env.stream_iter(0..5);
1056 /// s.inspect(|n| println!("Item: {}", n)).for_each(std::mem::drop);
1057 ///
1058 /// env.execute_blocking();
1059 /// ```
1060 pub fn inspect<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
1061 where
1062 F: FnMut(&Op::Out) + Send + Clone + 'static,
1063 {
1064 self.add_operator(|prev| Inspect::new(prev, f))
1065 }
1066
1067 /// Apply a mapping operation to each element of the stream, the resulting stream will be the
1068 /// flattened values of the result of the mapping. The mapping function can be stateful.
1069 ///
1070 /// This is equivalent to [`Stream::flat_map`] but with a stateful function.
1071 ///
1072 /// Since the mapping function can be stateful, it is a `FnMut`. This allows expressing simple
1073 /// algorithms with very few lines of code (see examples).
1074 ///
1075 /// The mapping function is _cloned_ inside each replica, and they will not share state between
1076 /// each other. If you want that only a single replica handles all the items you may want to
1077 /// change the parallelism of this operator with [`Stream::replication`].
1078 ///
1079 /// ## Examples
1080 ///
1081 /// This will emit only the _positive prefix-sums_.
1082 ///
1083 /// ```
1084 /// # use renoir::{StreamContext, RuntimeConfig};
1085 /// # use renoir::operator::source::IteratorSource;
1086 /// # let mut env = StreamContext::new_local();
1087 /// let s = env.stream_iter(0..=3);
1088 /// let res = s.rich_flat_map({
1089 /// let mut elements = Vec::new();
1090 /// move |y| {
1091 /// let new_pairs = elements
1092 /// .iter()
1093 /// .map(|&x: &u32| (x, y))
1094 /// .collect::<Vec<_>>();
1095 /// elements.push(y);
1096 /// new_pairs
1097 /// }
1098 /// }).collect_vec();
1099 ///
1100 /// env.execute_blocking();
1101 ///
1102 /// assert_eq!(res.get().unwrap(), vec![(0, 1), (0, 2), (1, 2), (0, 3), (1, 3), (2, 3)]);
1103 /// ```
1104 pub fn rich_flat_map<It, F>(self, f: F) -> Stream<impl Operator<Out = It::Item>>
1105 where
1106 It: IntoIterator + Send + 'static,
1107 <It as IntoIterator>::IntoIter: Send + 'static,
1108 <It as IntoIterator>::Item: Send,
1109 F: FnMut(Op::Out) -> It + Send + Clone + 'static,
1110 {
1111 self.rich_map(f).flatten()
1112 }
1113
1114 /// Map the elements of the stream into new elements. The mapping function can be stateful.
1115 ///
1116 /// This version of `rich_flat_map` is a lower level primitive that gives full control over the
1117 /// inner types used in streams. It can be used to define custom unary operators.
1118 ///
1119 /// The closure must follow these rules to ensure the correct behaviour of renoir:
1120 /// + `Watermark` messages must be sent when no more items with lower timestamp will ever be produced
1121 /// + `FlushBatch` messages must be forwarded if received
1122 /// + For each `FlushAndRestart` and `Terminate` message received, the operator must generate
1123 /// one and only one message of the same kind. No other messages of this kind should be created
1124 ///
1125 /// The mapping function is _cloned_ inside each replica, and they will not share state between
1126 /// each other. If you want that only a single replica handles all the items you may want to
1127 /// change the parallelism of this operator with [`Stream::replication`].
1128 ///
1129 /// ## Examples
1130 ///
1131 /// TODO
1132 pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
1133 where
1134 F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send + 'static,
1135 O: Send,
1136 {
1137 self.add_operator(|prev| RichMapCustom::new(prev, f))
1138 }
1139
1140 /// Apply a mapping operation to each element of the stream, the resulting stream will be the
1141 /// flatMaped values of the result of the mapping.
1142 ///
1143 /// **Note**: this is very similar to [`Iteartor::flat_map`](std::iter::Iterator::flat_map)
1144 ///
1145 /// ## Example
1146 ///
1147 /// ```
1148 /// # use renoir::{StreamContext, RuntimeConfig};
1149 /// # use renoir::operator::source::IteratorSource;
1150 /// # let mut env = StreamContext::new_local();
1151 /// let s = env.stream_iter(0..3);
1152 /// let res = s.flat_map(|n| vec![n, n]).collect_vec();
1153 ///
1154 /// env.execute_blocking();
1155 ///
1156 /// assert_eq!(res.get().unwrap(), vec![0, 0, 1, 1, 2, 2]);
1157 /// ```
1158 pub fn flat_map<It, F>(self, f: F) -> Stream<impl Operator<Out = It::Item>>
1159 where
1160 It: IntoIterator,
1161 It::IntoIter: Send,
1162 It::Item: Send,
1163 F: Fn(Op::Out) -> It + Send + Clone,
1164 {
1165 self.add_operator(|prev| FlatMap::new(prev, f))
1166 }
1167
1168 /// Apply the given function to all the elements of the stream, consuming the stream.
1169 ///
1170 /// ## Example
1171 ///
1172 /// ```
1173 /// # use renoir::{StreamContext, RuntimeConfig};
1174 /// # use renoir::operator::source::IteratorSource;
1175 /// # let mut env = StreamContext::new_local();
1176 /// let s = env.stream_iter(0..5);
1177 /// s.for_each(|n| println!("Item: {}", n));
1178 ///
1179 /// env.execute_blocking();
1180 /// ```
1181 pub fn for_each<F>(self, f: F)
1182 where
1183 F: FnMut(Op::Out) + Send + Clone + 'static,
1184 {
1185 self.add_operator(|prev| ForEach::new(prev, f))
1186 .finalize_block();
1187 }
1188
1189 /// Transform this stream of containers into a stream of all the contained values.
1190 ///
1191 /// **Note**: this is very similar to [`Iteartor::flatten`](std::iter::Iterator::flatten)
1192 ///
1193 /// ## Example
1194 ///
1195 /// ```
1196 /// # use renoir::{StreamContext, RuntimeConfig};
1197 /// # use renoir::operator::source::IteratorSource;
1198 /// # let mut env = StreamContext::new_local();
1199 /// let s = env.stream_iter((vec![
1200 /// vec![1, 2, 3],
1201 /// vec![],
1202 /// vec![4, 5],
1203 /// ].into_iter()));
1204 /// let res = s.flatten().collect_vec();
1205 ///
1206 /// env.execute_blocking();
1207 ///
1208 /// assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]);
1209 /// ```
1210 pub fn flatten(self) -> Stream<impl Operator<Out = <Op::Out as IntoIterator>::Item>>
1211 where
1212 Op::Out: IntoIterator,
1213 <Op::Out as IntoIterator>::IntoIter: Send,
1214 <Op::Out as IntoIterator>::Item: Send,
1215 {
1216 self.add_operator(|prev| Flatten::new(prev))
1217 }
1218
1219 /// Sort the items in the stream using the provided comparison function.
1220 ///
1221 /// **Note**: This is a blocking operator an will retain items until the end
1222 /// of the stream or a restart.
1223 ///
1224 /// ## Example
1225 ///
1226 /// ```
1227 /// # use renoir::{StreamContext, RuntimeConfig};
1228 /// # use renoir::operator::source::IteratorSource;
1229 /// # use rand::{rng, Rng};
1230 /// # use rand::seq::SliceRandom;
1231 /// # let mut env = StreamContext::new_local();
1232 ///
1233 /// let mut vec: Vec<_> = (0..20).collect::<Vec<_>>();
1234 /// vec.shuffle(&mut rng());
1235 ///
1236 /// let s = env.stream_iter(vec.into_iter());
1237 /// let res = s.sorted_by(|a,b| a.cmp(b)).collect_vec();
1238 ///
1239 /// env.execute_blocking();
1240 ///
1241 /// assert_eq!(res.get().unwrap(), (0..20).collect::<Vec<_>>());
1242 /// ```
1243 pub fn sorted_by<F>(self, compare: F) -> Stream<LimitSorted<F, Op>>
1244 where
1245 F: Fn(&Op::Out, &Op::Out) -> std::cmp::Ordering + Clone + Send,
1246 {
1247 self.add_operator(|prev| LimitSorted::new(prev, compare, None, None, true))
1248 }
1249
1250 /// Keep only the first `limit` items. If an `offset` is specified, keep only
1251 /// the first `limit` items after skipping `offset` elements. The order of the
1252 /// items across partitions is unspecified.
1253 ///
1254 /// **Note**: This is a blocking operator an will retain items until the end
1255 /// of the stream or a restart.
1256 /// **Note**: This operator will run all the previous computations to completion
1257 /// even if requires processing more elements than limit in order to preserve
1258 /// correctness
1259 ///
1260 /// ## Example
1261 ///
1262 /// ```
1263 /// # use renoir::{StreamContext, RuntimeConfig};
1264 /// # use renoir::operator::source::IteratorSource;
1265 /// # let mut env = StreamContext::new_local();
1266 ///
1267 /// let mut vec: Vec<_> = (0..80).collect::<Vec<_>>();
1268 ///
1269 /// let s = env.stream_iter(vec.into_iter());
1270 /// let res = s.limit(20, Some(40)).collect_vec();
1271 ///
1272 /// env.execute_blocking();
1273 ///
1274 /// assert_eq!(res.get().unwrap(), (40..60).collect::<Vec<_>>());
1275 /// ```
1276 pub fn limit(
1277 self,
1278 limit: usize,
1279 offset: Option<usize>,
1280 ) -> Stream<LimitSorted<impl Fn(&Op::Out, &Op::Out) -> Ordering + Clone + Send, Op>> {
1281 use std::cmp::Ordering;
1282
1283 self.add_operator(|prev| {
1284 LimitSorted::new(prev, |_, _| Ordering::Equal, Some(limit), offset, false)
1285 })
1286 }
1287
1288 /// Sort the items in the stream using the provided comparison function.
1289 /// Keep only the first `limit` items. If an `offset` is specified, keep only
1290 /// the first `limit` items after skipping `offset` elements.
1291 ///
1292 /// **Note**: This is a blocking operator an will retain items until the end
1293 /// of the stream or a restart.
1294 /// **Note**: This operator will run all the previous computations to completion
1295 /// even if requires processing more elements than limit in order to preserve
1296 /// correctness
1297 ///
1298 /// ## Example
1299 ///
1300 /// ```
1301 /// # use renoir::{StreamContext, RuntimeConfig};
1302 /// # use renoir::operator::source::IteratorSource;
1303 /// # use rand::{rng, Rng};
1304 /// # use rand::seq::SliceRandom;
1305 /// # let mut env = StreamContext::new_local();
1306 ///
1307 /// let mut vec: Vec<_> = (0..80).collect::<Vec<_>>();
1308 /// vec.shuffle(&mut rng());
1309 ///
1310 /// let s = env.stream_iter(vec.into_iter());
1311 /// let res = s.sorted_limit_by(|a,b| a.cmp(b), 20, Some(40)).collect_vec();
1312 ///
1313 /// env.execute_blocking();
1314 ///
1315 /// assert_eq!(res.get().unwrap(), (40..60).collect::<Vec<_>>());
1316 /// ```
1317 pub fn sorted_limit_by<F>(
1318 self,
1319 compare: F,
1320 limit: usize,
1321 offset: Option<usize>,
1322 ) -> Stream<LimitSorted<F, Op>>
1323 where
1324 F: Fn(&Op::Out, &Op::Out) -> std::cmp::Ordering + Clone + Send,
1325 {
1326 self.add_operator(|prev| LimitSorted::new(prev, compare, Some(limit), offset, true))
1327 }
1328}
1329
1330impl<I, Op> Stream<Op>
1331where
1332 I: ExchangeData,
1333 Op: Operator<Out = I> + 'static,
1334{
1335 /// Duplicate each element of the stream and forward it to all the replicas of the next block.
1336 ///
1337 /// **Note**: this will duplicate the elements of the stream, this is potentially a very
1338 /// expensive operation.
1339 ///
1340 /// **Note**: this operator will split the current block.
1341 ///
1342 /// ## Example
1343 ///
1344 /// ```
1345 /// # use renoir::{StreamContext, RuntimeConfig};
1346 /// # use renoir::operator::source::IteratorSource;
1347 /// # let mut env = StreamContext::new_local();
1348 /// let s = env.stream_iter(0..10);
1349 /// s.broadcast();
1350 /// ```
1351 pub fn broadcast(self) -> Stream<impl Operator<Out = Op::Out>> {
1352 self.split_block(End::new, NextStrategy::all())
1353 }
1354
1355 /// Given a stream, make a [`KeyedStream`] partitioning the values according to a key generated
1356 /// by the `keyer` function provided.
1357 ///
1358 /// The returned [`KeyedStream`] is partitioned by key, and all the operators added to it will
1359 /// be evaluated _after_ the network shuffle. Therefore all the items are sent to the network
1360 /// (if their destination is not the local host). In many cases this behaviour can be avoided by
1361 /// using the associative variant of the operators (e.g. [`Stream::group_by_reduce`],
1362 /// [`Stream::group_by_sum`], ...).
1363 ///
1364 /// **Note**: the keys are not sent to the network, they are built on the sending side, and
1365 /// rebuilt on the receiving side.
1366 ///
1367 /// **Note**: this operator will split the current block.
1368 ///
1369 /// ## Example
1370 /// ```
1371 /// # use renoir::{StreamContext, RuntimeConfig};
1372 /// # use renoir::operator::source::IteratorSource;
1373 /// # let mut env = StreamContext::new_local();
1374 /// let s = env.stream_iter(0..5);
1375 /// let keyed = s.group_by(|&n| n % 2); // partition even and odd elements
1376 /// ```
1377 pub fn group_by<K, Fk>(self, keyer: Fk) -> KeyedStream<impl Operator<Out = (K, I)>>
1378 where
1379 Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
1380 K: DataKey,
1381 {
1382 let next_strategy = NextStrategy::group_by(keyer.clone());
1383 let new_stream = self
1384 .split_block(End::new, next_strategy)
1385 .add_operator(|prev| KeyBy::new(prev, keyer));
1386 KeyedStream(new_stream)
1387 }
1388
1389 /// Find, for each partition of the stream, the item with the largest value.
1390 ///
1391 /// The stream is partitioned using the `keyer` function and the value to compare is obtained
1392 /// with `get_value`.
1393 ///
1394 /// This operation is associative, therefore the computation is done in parallel before sending
1395 /// all the elements to the network.
1396 ///
1397 /// **Note**: the comparison is done using the value returned by `get_value`, but the resulting
1398 /// items have the same type as the input.
1399 ///
1400 /// **Note**: this operator will split the current block.
1401 ///
1402 /// ## Example
1403 /// ```
1404 /// # use renoir::{StreamContext, RuntimeConfig};
1405 /// # use renoir::operator::source::IteratorSource;
1406 /// # let mut env = StreamContext::new_local();
1407 /// let s = env.stream_iter(0..5);
1408 /// let res = s
1409 /// .group_by_max_element(|&n| n % 2, |&n| n)
1410 /// .collect_vec();
1411 ///
1412 /// env.execute_blocking();
1413 ///
1414 /// let mut res = res.get().unwrap();
1415 /// res.sort_unstable();
1416 /// assert_eq!(res, vec![(0, 4), (1, 3)]);
1417 /// ```
1418 pub fn group_by_max_element<K, V, Fk, Fv>(
1419 self,
1420 keyer: Fk,
1421 get_value: Fv,
1422 ) -> KeyedStream<impl Operator<Out = (K, I)>>
1423 where
1424 Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1425 Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V,
1426 K: ExchangeDataKey,
1427 V: Ord,
1428 {
1429 self.group_by_reduce(keyer, move |out, value| {
1430 if get_value(&value) > get_value(out) {
1431 *out = value;
1432 }
1433 })
1434 }
1435
1436 /// Find, for each partition of the stream, the sum of the values of the items.
1437 ///
1438 /// The stream is partitioned using the `keyer` function and the value to sum is obtained
1439 /// with `get_value`.
1440 ///
1441 /// This operation is associative, therefore the computation is done in parallel before sending
1442 /// all the elements to the network.
1443 ///
1444 /// **Note**: this is similar to the SQL: `SELECT SUM(value) ... GROUP BY key`
1445 ///
1446 /// **Note**: the type of the result does not have to be a number, any type that implements
1447 /// `AddAssign` is accepted.
1448 ///
1449 /// **Note**: this operator will split the current block.
1450 ///
1451 /// ## Example
1452 /// ```
1453 /// # use renoir::{StreamContext, RuntimeConfig};
1454 /// # use renoir::operator::source::IteratorSource;
1455 /// # let mut env = StreamContext::new_local();
1456 /// let s = env.stream_iter(0..5);
1457 /// let res = s
1458 /// .group_by_sum(|&n| n % 2, |n| n)
1459 /// .collect_vec();
1460 ///
1461 /// env.execute_blocking();
1462 ///
1463 /// let mut res = res.get().unwrap();
1464 /// res.sort_unstable();
1465 /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
1466 /// ```
1467 pub fn group_by_sum<K, V, Fk, Fv>(
1468 self,
1469 keyer: Fk,
1470 get_value: Fv,
1471 ) -> KeyedStream<impl Operator<Out = (K, V)>>
1472 where
1473 Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1474 Fv: Fn(Op::Out) -> V + Clone + Send + 'static,
1475 V: ExchangeData + AddAssign,
1476 K: ExchangeDataKey,
1477 {
1478 let s = self.group_by_fold(
1479 keyer,
1480 None,
1481 move |acc, value| {
1482 if let Some(acc) = acc {
1483 *acc += get_value(value);
1484 } else {
1485 *acc = Some(get_value(value));
1486 }
1487 },
1488 |acc, value| match acc {
1489 None => *acc = value,
1490 Some(acc) => {
1491 if let Some(value) = value {
1492 *acc += value
1493 }
1494 }
1495 },
1496 );
1497 s.map(|(_, o)| o.unwrap())
1498 }
1499
1500 /// Find, for each partition of the stream, the average of the values of the items.
1501 ///
1502 /// The stream is partitioned using the `keyer` function and the value to average is obtained
1503 /// with `get_value`.
1504 ///
1505 /// This operation is associative, therefore the computation is done in parallel before sending
1506 /// all the elements to the network.
1507 ///
1508 /// **Note**: this is similar to the SQL: `SELECT AVG(value) ... GROUP BY key`
1509 ///
1510 /// **Note**: the type of the result does not have to be a number, any type that implements
1511 /// `AddAssign` and can be divided by `f64` is accepted.
1512 ///
1513 /// **Note**: this operator will split the current block.
1514 ///
1515 /// ## Example
1516 /// ```
1517 /// # use renoir::{StreamContext, RuntimeConfig};
1518 /// # use renoir::operator::source::IteratorSource;
1519 /// # let mut env = StreamContext::new_local();
1520 /// let s = env.stream_iter(0..5);
1521 /// let res = s
1522 /// .group_by_avg(|&n| n % 2, |&n| n as f64)
1523 /// .collect_vec();
1524 ///
1525 /// env.execute_blocking();
1526 ///
1527 /// let mut res = res.get().unwrap();
1528 /// res.sort_by_key(|(k, _)| *k);
1529 /// assert_eq!(res, vec![(0, (0.0 + 2.0 + 4.0) / 3.0), (1, (1.0 + 3.0) / 2.0)]);
1530 /// ```
1531 pub fn group_by_avg<K, V, Fk, Fv>(
1532 self,
1533 keyer: Fk,
1534 get_value: Fv,
1535 ) -> KeyedStream<impl Operator<Out = (K, V)>>
1536 where
1537 Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1538 Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V,
1539 V: ExchangeData + AddAssign + Div<f64, Output = V>,
1540 K: ExchangeDataKey,
1541 {
1542 self.group_by_fold(
1543 keyer,
1544 (None, 0usize),
1545 move |(sum, count), value| {
1546 *count += 1;
1547 match sum {
1548 Some(sum) => *sum += get_value(&value),
1549 None => *sum = Some(get_value(&value)),
1550 }
1551 },
1552 |(sum, count), (local_sum, local_count)| {
1553 *count += local_count;
1554 match sum {
1555 None => *sum = local_sum,
1556 Some(sum) => {
1557 if let Some(local_sum) = local_sum {
1558 *sum += local_sum;
1559 }
1560 }
1561 }
1562 },
1563 )
1564 .map(|(_, (sum, count))| sum.unwrap() / (count as f64))
1565 }
1566
1567 /// Count, for each partition of the stream, the number of items.
1568 ///
1569 /// The stream is partitioned using the `keyer` function.
1570 ///
1571 /// This operation is associative, therefore the computation is done in parallel before sending
1572 /// all the elements to the network.
1573 ///
1574 /// **Note**: this is similar to the SQL: `SELECT COUNT(*) ... GROUP BY key`
1575 ///
1576 /// **Note**: this operator will split the current block.
1577 ///
1578 /// ## Example
1579 /// ```
1580 /// # use renoir::{StreamContext, RuntimeConfig};
1581 /// # use renoir::operator::source::IteratorSource;
1582 /// # let mut env = StreamContext::new_local();
1583 /// let s = env.stream_iter(0..5);
1584 /// let res = s
1585 /// .group_by_count(|&n| n % 2)
1586 /// .collect_vec();
1587 ///
1588 /// env.execute_blocking();
1589 ///
1590 /// let mut res = res.get().unwrap();
1591 /// res.sort_by_key(|(k, _)| *k);
1592 /// assert_eq!(res, vec![(0, 3), (1, 2)]);
1593 /// ```
1594 pub fn group_by_count<K, Fk>(self, keyer: Fk) -> KeyedStream<impl Operator<Out = (K, usize)>>
1595 where
1596 Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1597 K: ExchangeDataKey,
1598 {
1599 self.group_by_fold(
1600 keyer,
1601 0,
1602 move |count, _| *count += 1,
1603 |count, local_count| *count += local_count,
1604 )
1605 }
1606
1607 /// Find, for each partition of the stream, the item with the smallest value.
1608 ///
1609 /// The stream is partitioned using the `keyer` function and the value to compare is obtained
1610 /// with `get_value`.
1611 ///
1612 /// This operation is associative, therefore the computation is done in parallel before sending
1613 /// all the elements to the network.
1614 ///
1615 /// **Note**: the comparison is done using the value returned by `get_value`, but the resulting
1616 /// items have the same type as the input.
1617 ///
1618 /// **Note**: this operator will split the current block.
1619 ///
1620 /// ## Example
1621 /// ```
1622 /// # use renoir::{StreamContext, RuntimeConfig};
1623 /// # use renoir::operator::source::IteratorSource;
1624 /// # let mut env = StreamContext::new_local();
1625 /// let s = env.stream_iter(0..5);
1626 /// let res = s
1627 /// .group_by_min_element(|&n| n % 2, |&n| n)
1628 /// .collect_vec();
1629 ///
1630 /// env.execute_blocking();
1631 ///
1632 /// let mut res = res.get().unwrap();
1633 /// res.sort_unstable();
1634 /// assert_eq!(res, vec![(0, 0), (1, 1)]);
1635 /// ```
1636 pub fn group_by_min_element<K, V, Fk, Fv>(
1637 self,
1638 keyer: Fk,
1639 get_value: Fv,
1640 ) -> KeyedStream<impl Operator<Out = (K, I)>>
1641 where
1642 Fk: KeyerFn<K, Op::Out> + Fn(&Op::Out) -> K,
1643 Fv: KeyerFn<V, Op::Out> + Fn(&Op::Out) -> V,
1644 K: ExchangeDataKey,
1645 V: Ord,
1646 {
1647 self.group_by_reduce(keyer, move |out, value| {
1648 if get_value(&value) < get_value(out) {
1649 *out = value;
1650 }
1651 })
1652 }
1653
1654 /// Perform the reduction operation separately for each key.
1655 ///
1656 /// This is equivalent of partitioning the stream using the `keyer` function, and then applying
1657 /// [`Stream::reduce_assoc`] to each partition separately.
1658 ///
1659 /// Note however that there is a difference between `stream.group_by(keyer).reduce(...)` and
1660 /// `stream.group_by_reduce(keyer, ...)`. The first performs the network shuffle of every item in
1661 /// the stream, and **later** performs the reduction (i.e. nearly all the elements will be sent to
1662 /// the network). The latter avoids sending the items by performing first a local reduction on
1663 /// each host, and then send only the locally reduced results (i.e. one message per replica, per
1664 /// key); then the global step is performed aggregating the results.
1665 ///
1666 /// The resulting stream will still be keyed and will contain only a single message per key (the
1667 /// final result).
1668 ///
1669 /// Note that the output type must be the same as the input type, if you need a different type
1670 /// consider using [`Stream::group_by_fold`].
1671 ///
1672 /// **Note**: this operator will retain all the messages of the stream and emit the values only
1673 /// when the stream ends. Therefore this is not properly _streaming_.
1674 ///
1675 /// **Note**: this operator will split the current block.
1676 ///
1677 /// ## Example
1678 /// ```
1679 /// # use renoir::{StreamContext, RuntimeConfig};
1680 /// # use renoir::operator::source::IteratorSource;
1681 /// # let mut env = StreamContext::new_local();
1682 /// let s = env.stream_iter(0..5);
1683 /// let res = s
1684 /// .group_by_reduce(|&n| n % 2, |acc, value| *acc += value)
1685 /// .collect_vec();
1686 ///
1687 /// env.execute_blocking();
1688 ///
1689 /// let mut res = res.get().unwrap();
1690 /// res.sort_unstable();
1691 /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
1692 /// ```
1693 pub fn group_by_reduce<K, Fk, F>(
1694 self,
1695 keyer: Fk,
1696 f: F,
1697 ) -> KeyedStream<impl Operator<Out = (K, I)>>
1698 where
1699 Fk: Fn(&Op::Out) -> K + Send + Clone + 'static,
1700 F: Fn(&mut I, I) + Send + Clone + 'static,
1701 K: ExchangeDataKey,
1702 {
1703 let f2 = f.clone();
1704
1705 self.group_by_fold(
1706 keyer,
1707 None,
1708 move |acc, value| match acc {
1709 None => *acc = Some(value),
1710 Some(acc) => f(acc, value),
1711 },
1712 move |acc1, acc2| match acc1 {
1713 None => *acc1 = acc2,
1714 Some(acc1) => {
1715 if let Some(acc2) = acc2 {
1716 f2(acc1, acc2)
1717 }
1718 }
1719 },
1720 )
1721 .map(|(_, value)| value.unwrap())
1722 }
1723
1724 /// Given two streams **with timestamps** join them according to an interval centered around the
1725 /// timestamp of the left side.
1726 ///
1727 /// This means that an element on the left side with timestamp T will be joined to all the
1728 /// elements on the right with timestamp Q such that `T - lower_bound <= Q <= T + upper_bound`.
1729 ///
1730 /// **Note**: this operator is not parallelized, all the elements are sent to a single node to
1731 /// perform the join.
1732 ///
1733 /// **Note**: this operator will split the current block.
1734 ///
1735 /// ## Example
1736 /// TODO: example
1737 #[cfg(feature = "timestamp")]
1738 pub fn interval_join<I2, Op2>(
1739 self,
1740 right: Stream<Op2>,
1741 lower_bound: Timestamp,
1742 upper_bound: Timestamp,
1743 ) -> Stream<impl Operator<Out = (I, I2)>>
1744 where
1745 I2: ExchangeData,
1746 Op2: Operator<Out = I2> + 'static,
1747 {
1748 let left = self.replication(Replication::One);
1749 let right = right.replication(Replication::One);
1750 left.merge_distinct(right)
1751 .key_by(|_| ())
1752 .add_operator(Reorder::new)
1753 .add_operator(|prev| IntervalJoin::new(prev, lower_bound, upper_bound))
1754 .drop_key()
1755 }
1756
1757 /// Change the maximum parallelism of the following operators.
1758 ///
1759 /// **Note**: this operator is pretty advanced, some operators may need to be fully replicated
1760 /// and will fail otherwise.
1761 pub fn replication(self, replication: Replication) -> Stream<impl Operator<Out = Op::Out>> {
1762 let mut new_stream = self.split_block(End::new, NextStrategy::only_one());
1763 // TODO: Cannot scale up
1764 new_stream.block.scheduling.replication(replication);
1765 new_stream
1766 }
1767
1768 /// Advanced operator that allows changing the replication and forwarding strategy
1769 ///
1770 /// **Note**: this operator is advanced and is only intended to add functionality
1771 /// that is not achievable with other operators. Use with care
1772 pub(crate) fn repartition<Fk: KeyerFn<u64, Op::Out>>(
1773 self,
1774 replication: Replication,
1775 next_strategy: NextStrategy<Op::Out, Fk>,
1776 ) -> Stream<impl Operator<Out = Op::Out>> {
1777 let mut new_stream = self.split_block(End::new, next_strategy);
1778 new_stream.block.scheduling.replication(replication);
1779 new_stream
1780 }
1781
1782 /// Advanced operator that allows changing the replication and forwarding strategy
1783 ///
1784 /// **Note**: this operator is advanced and is only intended to add functionality
1785 /// that is not achievable with other operators. Use with care
1786 pub fn repartition_by<Fk: KeyerFn<u64, Op::Out>>(
1787 self,
1788 replication: Replication,
1789 partition_fn: Fk,
1790 ) -> Stream<impl Operator<Out = Op::Out>> {
1791 let mut new_stream = self.split_block(End::new, NextStrategy::group_by(partition_fn));
1792 new_stream.block.scheduling.replication(replication);
1793 new_stream
1794 }
1795
1796 /// Reduce the stream into a stream that emits a single value.
1797 ///
1798 /// The reducing operator consists in adding to the current accumulation value the value of the
1799 /// current item in the stream.
1800 ///
1801 /// The reducing function is provided with a mutable reference to the current accumulator and the
1802 /// owned item of the stream. The function should modify the accumulator without returning
1803 /// anything.
1804 ///
1805 /// Note that the output type must be the same as the input type, if you need a different type
1806 /// consider using [`Stream::fold`].
1807 ///
1808 /// **Note**: this operator will retain all the messages of the stream and emit the values only
1809 /// when the stream ends. Therefore this is not properly _streaming_.
1810 ///
1811 /// **Note**: this operator is not parallelized, it creates a bottleneck where all the stream
1812 /// elements are sent to and the folding is done using a single thread.
1813 ///
1814 /// **Note**: this is very similar to [`Iteartor::reduce`](std::iter::Iterator::reduce).
1815 ///
1816 /// **Note**: this operator will split the current block.
1817 ///
1818 /// ## Example
1819 ///
1820 /// ```
1821 /// # use renoir::{StreamContext, RuntimeConfig};
1822 /// # use renoir::operator::source::IteratorSource;
1823 /// # let mut env = StreamContext::new_local();
1824 /// let s = env.stream_iter(0..5);
1825 /// let res = s.reduce(|a, b| a + b).collect::<Vec<_>>();
1826 ///
1827 /// env.execute_blocking();
1828 ///
1829 /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
1830 /// ```
1831 pub fn reduce<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
1832 where
1833 F: Fn(I, I) -> I + Send + Clone + 'static,
1834 {
1835 self.fold(None, move |acc, b| {
1836 *acc = Some(if let Some(a) = acc.take() { f(a, b) } else { b })
1837 })
1838 .map(|value| value.unwrap())
1839 }
1840
1841 /// Reduce the stream into a stream that emits a single value.
1842 ///
1843 /// The reducing operator consists in adding to the current accumulation value the value of the
1844 /// current item in the stream.
1845 ///
1846 /// This method is very similary to [`Stream::reduce`], but performs the reduction distributely.
1847 /// To do so the reducing function must be _associative_, in particular the reducing process is
1848 /// performed in 2 steps:
1849 ///
1850 /// - local: the reducing function is used to reduce the elements present in each replica of
1851 /// the stream independently.
1852 /// - global: all the partial results (the elements produced by the local step) have to be
1853 /// aggregated into a single result.
1854 ///
1855 /// Note that the output type must be the same as the input type, if you need a different type
1856 /// consider using [`Stream::fold_assoc`].
1857 ///
1858 /// **Note**: this operator will retain all the messages of the stream and emit the values only
1859 /// when the stream ends. Therefore this is not properly _streaming_.
1860 ///
1861 /// **Note**: this operator will split the current block.
1862 ///
1863 /// ## Example
1864 ///
1865 /// ```
1866 /// # use renoir::{StreamContext, RuntimeConfig};
1867 /// # use renoir::operator::source::IteratorSource;
1868 /// # let mut env = StreamContext::new_local();
1869 /// let s = env.stream_iter(0..5);
1870 /// let res = s.reduce_assoc(|a, b| a + b).collect_vec();
1871 ///
1872 /// env.execute_blocking();
1873 ///
1874 /// assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
1875 /// ```
1876 pub fn reduce_assoc<F>(self, f: F) -> Stream<impl Operator<Out = Op::Out>>
1877 where
1878 F: Fn(I, I) -> I + Send + Clone + 'static,
1879 {
1880 let f2 = f.clone();
1881
1882 self.fold_assoc(
1883 None,
1884 move |acc, b| *acc = Some(if let Some(a) = acc.take() { f(a, b) } else { b }),
1885 move |acc1, mut acc2| {
1886 *acc1 = match (acc1.take(), acc2.take()) {
1887 (Some(a), Some(b)) => Some(f2(a, b)),
1888 (None, Some(a)) | (Some(a), None) => Some(a),
1889 (None, None) => None,
1890 }
1891 },
1892 )
1893 .map(|value| value.unwrap())
1894 }
1895
1896 /// Route each element depending on its content.
1897 ///
1898 /// + Routes are created with the `add_route` method, a new stream is created for each route.
1899 /// + Each element is routed to the first stream for which the routing condition evaluates to true.
1900 /// + If no route condition is satisfied, the element is dropped
1901 ///
1902 /// **Note**: this operator will split the current block.
1903 ///
1904 /// ## Example
1905 ///
1906 /// ```
1907 /// # use renoir::prelude::*;
1908 /// # let mut env = StreamContext::new_local();
1909 /// # let s = env.stream_iter(0..10);
1910 /// let mut routes = s.route()
1911 /// .add_route(|&i| i < 5)
1912 /// .add_route(|&i| i % 2 == 0)
1913 /// .build()
1914 /// .into_iter();
1915 /// assert_eq!(routes.len(), 2);
1916 /// // 0 1 2 3 4
1917 /// routes.next().unwrap().for_each(|i| eprintln!("route1: {i}"));
1918 /// // 6 8
1919 /// routes.next().unwrap().for_each(|i| eprintln!("route2: {i}"));
1920 /// // 5 7 9 ignored
1921 /// env.execute_blocking();
1922 /// ```
1923 pub fn route(self) -> RouterBuilder<I, Op> {
1924 RouterBuilder::new(self)
1925 }
1926
1927 /// Perform a network shuffle sending the messages to a random replica.
1928 ///
1929 /// This can be useful if for some reason the load is very unbalanced (e.g. after a very
1930 /// unbalanced [`Stream::group_by`]).
1931 ///
1932 /// **Note**: this operator will split the current block.
1933 ///
1934 /// ## Example
1935 ///
1936 /// ```
1937 /// # use renoir::{StreamContext, RuntimeConfig};
1938 /// # use renoir::operator::source::IteratorSource;
1939 /// # let mut env = StreamContext::new_local();
1940 /// let s = env.stream_iter(0..5);
1941 /// let res = s.shuffle();
1942 /// ```
1943 pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>> {
1944 self.split_block(End::new, NextStrategy::random())
1945 }
1946
1947 /// Split the stream into `splits` streams, each with all the elements of the first one.
1948 ///
1949 /// This will effectively duplicate every item in the stream into the newly created streams.
1950 ///
1951 /// **Note**: this operator will split the current block.
1952 ///
1953 /// ## Example
1954 ///
1955 /// ```
1956 /// # use renoir::{StreamContext, RuntimeConfig};
1957 /// # use renoir::operator::source::IteratorSource;
1958 /// # let mut env = StreamContext::new_local();
1959 /// let s = env.stream_iter(0..5);
1960 /// let mut splits = s.split(3);
1961 /// let a = splits.pop().unwrap();
1962 /// let b = splits.pop().unwrap();
1963 /// let c = splits.pop().unwrap();
1964 /// ```
1965 pub fn split(self, splits: usize) -> Vec<Stream<impl Operator<Out = Op::Out>>> {
1966 // This is needed to maintain the same parallelism of the split block
1967 let scheduler_requirements = self.block.scheduling.clone();
1968 let mut new_stream = self.split_block(End::new, NextStrategy::only_one());
1969 new_stream.block.scheduling = scheduler_requirements;
1970
1971 let mut streams = Vec::with_capacity(splits);
1972 for _ in 0..splits - 1 {
1973 streams.push(new_stream.clone());
1974 }
1975 streams.push(new_stream);
1976
1977 streams
1978 }
1979
1980 /// Given two [`Stream`]s, zip their elements together: the resulting stream will be a stream of
1981 /// pairs, each of which is an element from both streams respectively.
1982 ///
1983 /// **Note**: all the elements after the end of one of the streams are discarded (i.e. the
1984 /// resulting stream will have a number of elements that is the minimum between the lengths of
1985 /// the two input streams).
1986 ///
1987 /// **Note**: this operator will split the current block.
1988 ///
1989 /// ## Example
1990 ///
1991 /// ```
1992 /// # use renoir::{StreamContext, RuntimeConfig};
1993 /// # use renoir::operator::source::IteratorSource;
1994 /// # let mut env = StreamContext::new_local();
1995 /// let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter()));
1996 /// let s2 = env.stream_iter((vec![1, 2, 3].into_iter()));
1997 /// let res = s1.zip(s2).collect_vec();
1998 ///
1999 /// env.execute_blocking();
2000 ///
2001 /// assert_eq!(res.get().unwrap(), vec![('A', 1), ('B', 2), ('C', 3)]);
2002 /// ```
2003 pub fn zip<I2, Op2>(self, oth: Stream<Op2>) -> Stream<impl Operator<Out = (I, I2)>>
2004 where
2005 Op2: Operator<Out = I2> + 'static,
2006 I2: ExchangeData,
2007 {
2008 let mut new_stream = self.binary_connection(
2009 oth,
2010 Zip::new,
2011 NextStrategy::only_one(),
2012 NextStrategy::only_one(),
2013 );
2014 // if the zip operator is partitioned there could be some loss of data
2015 new_stream.block.scheduling.replication(Replication::One);
2016 new_stream
2017 }
2018
2019 /// Close the stream and send resulting items to a channel on a single host.
2020 ///
2021 /// If the stream is distributed among multiple replicas, parallelism will
2022 /// be set to 1 to gather all results
2023 ///
2024 /// **Note**: the order of items and keys is unspecified.
2025 ///
2026 /// **Note**: this operator will split the current block.
2027 ///
2028 /// ## Example
2029 ///
2030 /// ```
2031 /// # use renoir::{StreamContext, RuntimeConfig};
2032 /// # use renoir::operator::source::IteratorSource;
2033 /// # let mut env = StreamContext::new_local();
2034 /// let s = env.stream_iter(0..10u32);
2035 /// let rx = s.collect_channel();
2036 ///
2037 /// env.execute_blocking();
2038 /// let mut v = Vec::new();
2039 /// while let Ok(x) = rx.recv() {
2040 /// v.push(x)
2041 /// }
2042 /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
2043 /// ```
2044 pub fn collect_channel(self) -> Receiver<I> {
2045 let (tx, rx) = unbounded();
2046 self.replication(Replication::One)
2047 .add_operator(|prev| CollectChannelSink::new(prev, tx))
2048 .finalize_block();
2049 rx
2050 }
2051 /// Close the stream and send resulting items to a channel on each single host.
2052 ///
2053 /// Each host sends its outputs to the channel without repartitioning.
2054 /// Elements will be sent to the channel on the same host that produced
2055 /// the output.
2056 ///
2057 /// **Note**: the order of items and keys is unspecified.
2058 ///
2059 /// ## Example
2060 ///
2061 /// ```
2062 /// # use renoir::{StreamContext, RuntimeConfig};
2063 /// # use renoir::operator::source::IteratorSource;
2064 /// # let mut env = StreamContext::new_local();
2065 /// let s = env.stream_iter(0..10u32);
2066 /// let rx = s.collect_channel();
2067 ///
2068 /// env.execute_blocking();
2069 /// let mut v = Vec::new();
2070 /// while let Ok(x) = rx.recv() {
2071 /// v.push(x)
2072 /// }
2073 /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
2074 /// ```
2075 pub fn collect_channel_parallel(self) -> Receiver<I> {
2076 let (tx, rx) = unbounded();
2077 self.add_operator(|prev| CollectChannelSink::new(prev, tx))
2078 .finalize_block();
2079 rx
2080 }
2081
2082 /// Close the stream and store all the resulting items into a [`Vec`] on a single host.
2083 ///
2084 /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
2085 /// replicas sends the items to.
2086 ///
2087 /// **Note**: the order of items and keys is unspecified.
2088 ///
2089 /// **Note**: this operator will split the current block.
2090 ///
2091 /// ## Example
2092 ///
2093 /// ```
2094 /// # use renoir::{StreamContext, RuntimeConfig};
2095 /// # use renoir::operator::source::IteratorSource;
2096 /// # let mut env = StreamContext::new_local();
2097 /// let s = env.stream_iter(0..10);
2098 /// let res = s.collect_vec();
2099 ///
2100 /// env.execute_blocking();
2101 ///
2102 /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2103 /// ```
2104 pub fn collect_count(self) -> StreamOutput<usize> {
2105 let output = StreamOutputRef::default();
2106 self.add_operator(|prev| Fold::new(prev, 0, |acc, _| *acc += 1))
2107 .replication(Replication::One)
2108 .add_operator(|prev| CollectCountSink::new(prev, output.clone()))
2109 .finalize_block();
2110 StreamOutput::from(output)
2111 }
2112
2113 /// Close the stream and store all the resulting items into a [`Vec`] on a single host.
2114 ///
2115 /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
2116 /// replicas sends the items to.
2117 ///
2118 /// **Note**: the order of items and keys is unspecified.
2119 ///
2120 /// **Note**: this operator will split the current block.
2121 ///
2122 /// ## Example
2123 ///
2124 /// ```
2125 /// # use renoir::{StreamContext, RuntimeConfig};
2126 /// # use renoir::operator::source::IteratorSource;
2127 /// # let mut env = StreamContext::new_local();
2128 /// let s = env.stream_iter(0..10);
2129 /// let res = s.collect_vec();
2130 ///
2131 /// env.execute_blocking();
2132 ///
2133 /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2134 /// ```
2135 pub fn collect_vec(self) -> StreamOutput<Vec<I>> {
2136 let output = StreamOutputRef::default();
2137 self.replication(Replication::One)
2138 .add_operator(|prev| CollectVecSink::new(prev, output.clone()))
2139 .finalize_block();
2140 StreamOutput::from(output)
2141 }
2142
2143 /// Close the stream and store all the resulting items into a [`Vec`] on a single host.
2144 ///
2145 /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
2146 /// replicas sends the items to.
2147 ///
2148 /// **Note**: the order of items and keys is unspecified.
2149 ///
2150 /// **Note**: this operator will split the current block.
2151 ///
2152 /// ## Example
2153 ///
2154 /// ```
2155 /// # use renoir::{StreamContext, RuntimeConfig};
2156 /// # use renoir::operator::source::IteratorSource;
2157 /// # let mut env = StreamContext::new_local();
2158 /// let s = env.stream_iter(0..10);
2159 /// let res = s.collect_vec();
2160 ///
2161 /// env.execute_blocking();
2162 ///
2163 /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2164 /// ```
2165 pub fn collect_vec_all(self) -> StreamOutput<Vec<I>> {
2166 let output = StreamOutputRef::default();
2167 self.repartition(Replication::Host, NextStrategy::all())
2168 .add_operator(|prev| CollectVecSink::new(prev, output.clone()))
2169 .finalize_block();
2170 StreamOutput::from(output)
2171 }
2172
2173 /// Close the stream and store all the resulting items into a collection on a single host.
2174 ///
2175 /// If the stream is distributed among multiple replicas, parallelism will
2176 /// be set to 1 to gather all results
2177 ///
2178 /// **Note**: the order of items and keys is unspecified.
2179 ///
2180 /// **Note**: this operator will split the current block.
2181 ///
2182 /// ## Example
2183 ///
2184 /// ```
2185 /// # use renoir::{StreamContext, RuntimeConfig};
2186 /// # use renoir::operator::source::IteratorSource;
2187 /// # let mut env = StreamContext::new_local();
2188 /// let s = env.stream_iter(0..10);
2189 /// let res = s.collect_vec();
2190 ///
2191 /// env.execute_blocking();
2192 ///
2193 /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2194 /// ```
2195 pub fn collect<C: FromIterator<I> + Send + 'static>(self) -> StreamOutput<C> {
2196 let output = StreamOutputRef::default();
2197 self.replication(Replication::One)
2198 .add_operator(|prev| Collect::new(prev, output.clone()))
2199 .finalize_block();
2200 StreamOutput::from(output)
2201 }
2202
2203 /// Close the stream and store all the resulting items into a collection on each single host.
2204 ///
2205 /// Partitioning will be set to Host and results will be replicated
2206 ///
2207 /// **Note**: the order of items and keys is unspecified.
2208 ///
2209 /// **Note**: this operator will split the current block.
2210 ///
2211 /// ## Example
2212 ///
2213 /// ```
2214 /// # use renoir::{StreamContext, RuntimeConfig};
2215 /// # use renoir::operator::source::IteratorSource;
2216 /// # let mut env = StreamContext::new_local();
2217 /// let s = env.stream_iter(0..10);
2218 /// let res = s.collect_vec();
2219 ///
2220 /// env.execute_blocking();
2221 ///
2222 /// assert_eq!(res.get().unwrap(), (0..10).collect::<Vec<_>>());
2223 /// ```
2224 pub fn collect_all<C: FromIterator<I> + Send + 'static>(self) -> StreamOutput<C> {
2225 let output = StreamOutputRef::default();
2226 self.repartition(Replication::Host, NextStrategy::all())
2227 .add_operator(|prev| Collect::new(prev, output.clone()))
2228 .finalize_block();
2229 StreamOutput::from(output)
2230 }
2231
2232 /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2233 /// create a [Stream] with its content. Returns the cache and consumes the stream.
2234 ///
2235 /// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same**
2236 /// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method.
2237 ///
2238 /// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
2239 /// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
2240 ///
2241 /// ## Example
2242 ///
2243 /// ```
2244 /// # use renoir::prelude::*;
2245 /// use renoir::operator::cache::VecCacher;
2246 /// let mut ctx = StreamContext::new_local(); ///
2247 /// // Create a cached stream by applying filters and caching the results
2248 /// let cache = ctx.stream_iter(0..10).filter(|x| x % 3 == 0).collect_cache::<VecCacher<_>>(());
2249 ///
2250 /// // Execute the stream interactively to capture and store the results in the cache
2251 /// ctx.execute_blocking();
2252 ///
2253 /// // Further process the cached stream, applying additional filters and collecting the results
2254 /// let ctx = StreamContext::new(cache.config());
2255 /// let res = cache.stream_in(&ctx).filter(|x| x % 2 == 0).collect_vec();
2256 ///
2257 /// // Execute the environment to finalize the processing
2258 /// ctx.execute_blocking();
2259 ///
2260 /// // Assert the final result matches the expected values
2261 /// let expected = (0..10).filter(|x| x % 3 == 0 && x % 2 == 0).collect::<Vec<_>>();
2262 /// assert_eq!(expected, res.get().unwrap());
2263 /// ```
2264 pub fn collect_cache<C: Cacher<I> + 'static>(self, config: C::Config) -> CachedStream<I, C> {
2265 let rt_config = self.ctx.lock().config.clone();
2266 let replication = self.block.scheduling.replication;
2267 let output = CacheRegistry::<I, C::Handle>::new();
2268 self.add_operator(|prev| {
2269 CacheSink::<I, C, _>::new(prev, rt_config.clone(), config, output.clone())
2270 })
2271 .finalize_block();
2272 CachedStream::new(rt_config, replication, output)
2273 }
2274
2275 /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2276 /// create a [Stream] with its content. Returns the cache and consumes the stream.
2277 ///
2278 /// **See [Stream::collect_cache]**
2279 pub fn collect_cache_vec(self) -> CachedStream<I, VecCacher<I>> {
2280 self.collect_cache(())
2281 }
2282
2283 /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2284 /// create a [Stream] with its content. Returns the cache and a copy of the current stream.
2285 ///
2286 /// To resume the cache, create a new [StreamContext](crate::StreamContext) with the **same**
2287 /// [RuntimeConfig](crate::RuntimeConfig) and call the [StreamCache::stream_in] method.
2288 ///
2289 /// **Warning**: [StreamCache] methods must only be called **after** the original `StreamContext`
2290 /// has finished executing. Calling `stream_in` or `inner_cloned` on an incomplete cache will panic!
2291 /// ## Example
2292 /// ```
2293 /// # use renoir::prelude::*;
2294 /// use renoir::operator::cache::VecCacher;
2295 /// let ctx = StreamContext::new_local();
2296 ///
2297 /// // Create a cached stream by applying filters and caching the results
2298 /// let (cache, stream) = ctx.stream_iter(0..10).filter(|x| x % 3 == 0).cache::<VecCacher<_>>(());
2299 ///
2300 /// // Further process the cached stream, applying additional filters and collecting the results
2301 /// let result = stream.filter(|x| x % 2 == 0).collect_vec();
2302 ///
2303 /// // Execute the environment to finalize the processing
2304 /// ctx.execute_blocking();
2305 ///
2306 /// // Assert the final result matches the expected values
2307 /// assert_eq!(result.get().unwrap(), (0..10).filter(|x| x % 3 == 0 && x % 2 == 0).collect::<Vec<_>>());
2308 ///
2309 /// // Further process the cached stream, applying additional filters and collecting the results
2310 /// let ctx = StreamContext::new(cache.config());
2311 /// let res = cache.stream_in(&ctx).filter(|x| x % 2 == 0).collect_vec();
2312 ///
2313 /// // Execute the environment to finalize the processing
2314 /// ctx.execute_blocking();
2315 ///
2316 /// // Assert the final result matches the expected values
2317 /// let expected = (0..10).filter(|x| x % 3 == 0 && x % 2 == 0).collect::<Vec<_>>();
2318 /// assert_eq!(expected, res.get().unwrap());
2319 /// ```
2320 pub fn cache<C: Cacher<I> + 'static>(
2321 self,
2322 config: C::Config,
2323 ) -> (CachedStream<I, C>, Stream<impl Operator<Out = Op::Out>>) {
2324 let mut splits = self.split(2);
2325 (
2326 splits.pop().unwrap().collect_cache(config),
2327 splits.pop().unwrap(),
2328 )
2329 }
2330
2331 /// Collect the output of the stream to a [StreamCache] that can later be resumed to
2332 /// create a [Stream] with its content. Returns the cache and a copy of the current stream.
2333 ///
2334 /// **See [Stream::cache]**
2335 pub fn cache_vec(
2336 self,
2337 ) -> (
2338 CachedStream<I, VecCacher<I>>,
2339 Stream<impl Operator<Out = Op::Out>>,
2340 ) {
2341 self.cache(())
2342 }
2343}
2344
2345impl<Op> Stream<Op>
2346where
2347 Op: Operator + 'static,
2348 Op::Out: Clone + Hash + Eq + Sync,
2349{
2350 /// Map the elements of the stream into new elements by evaluating a future for each one.
2351 /// Use memoization to cache outputs for previously seen inputs.
2352 ///
2353 /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
2354 /// The maximum number of elements to be cached is passed as the `capacity` parameter.
2355 ///
2356 /// ## Example
2357 ///
2358 /// ```
2359 /// # use renoir::{StreamContext, RuntimeConfig};
2360 /// # use renoir::operator::source::IteratorSource;
2361 /// # tokio::runtime::Runtime::new()
2362 /// # .unwrap()
2363 /// # .block_on(base());
2364 /// # async fn base() {
2365 /// # let mut env = StreamContext::new_local();
2366 /// let s = env.stream_iter((0..4).cycle().take(10));
2367 /// let res = s.map_async_memo(|n| async move {n * n}, 100).collect_vec();
2368 /// env.execute().await;
2369 /// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
2370 /// # }
2371 /// ```
2372 #[cfg(feature = "tokio")]
2373 pub fn map_async_memo<O: Clone + Send + Sync + 'static, F, Fut>(
2374 self,
2375 f: F,
2376 capacity: usize,
2377 ) -> Stream<impl Operator<Out = O>>
2378 where
2379 F: Fn(Op::Out) -> Fut + Send + Sync + Clone + 'static,
2380 Fut: Future<Output = O> + Send,
2381 {
2382 self.map_async_memo_by(f, |x: &Op::Out| x.clone(), capacity)
2383 }
2384}
2385
2386impl<Op> Stream<Op>
2387where
2388 Op::Out: Clone + Hash + Eq + Sync,
2389 Op: Operator + 'static,
2390{
2391 /// Map the elements of the stream into new elements. Use memoization
2392 /// to cache outputs for previously seen inputs.
2393 ///
2394 /// The cache is implemented through a *per-process* [`quick_cache::sync::Cache`].
2395 /// The maximum number of elements to be cached is passed as the `capacity` parameter.
2396 ///
2397 /// ## Example
2398 ///
2399 /// ```
2400 /// # use renoir::{StreamContext, RuntimeConfig};
2401 /// # use renoir::operator::source::IteratorSource;
2402 /// # let mut env = StreamContext::new_local();
2403 /// let s = env.stream_iter((0..4).cycle().take(10));
2404 /// let res = s.map_memo(|n| n * n, 5).collect_vec();
2405 ///
2406 /// env.execute_blocking();
2407 ///
2408 /// assert_eq!(res.get().unwrap(), vec![0, 1, 4, 9, 0, 1, 4, 9, 0, 1]);
2409 /// ```
2410 pub fn map_memo<O: Data + Sync, F>(
2411 self,
2412 f: F,
2413 capacity: usize,
2414 ) -> Stream<impl Operator<Out = O>>
2415 where
2416 F: Fn(Op::Out) -> O + Send + Clone + 'static,
2417 {
2418 self.add_operator(|prev| MapMemo::new(prev, f, |x| x.clone(), capacity))
2419 }
2420}
2421
2422impl<Op, K, I> KeyedStream<Op>
2423where
2424 K: DataKey,
2425 I: Send + 'static,
2426 Op: Operator<Out = (K, I)> + 'static,
2427{
2428 /// Given a keyed stream without timestamps nor watermarks, tag each item with a timestamp and insert
2429 /// watermarks.
2430 ///
2431 /// The two functions given to this operator are the following:
2432 /// - `timestamp_gen` returns the timestamp assigned to the provided element of the stream
2433 /// - `watermark_gen` returns an optional watermark to add after the provided element
2434 ///
2435 /// Note that the two functions **must** follow the watermark semantics.
2436 /// TODO: link to watermark semantics
2437 ///
2438 /// ## Example
2439 ///
2440 /// In this example the stream contains the integers from 0 to 9 and group them by parity, each will be tagged with a
2441 /// timestamp with the value of the item as milliseconds, and after each even number a watermark
2442 /// will be inserted.
2443 ///
2444 /// ```
2445 /// # use renoir::{StreamContext, RuntimeConfig};
2446 /// # use renoir::operator::source::IteratorSource;
2447 /// use renoir::operator::Timestamp;
2448 /// # let mut env = StreamContext::new_local();
2449 ///
2450 /// let s = env.stream_iter(0..10);
2451 /// s
2452 /// .group_by(|i| i % 2)
2453 /// .add_timestamps(
2454 /// |&(_k, n)| n,
2455 /// |&(_k, n), &ts| if n % 2 == 0 { Some(ts) } else { None }
2456 /// );
2457 /// ```
2458 #[cfg(feature = "timestamp")]
2459 pub fn add_timestamps<F, G>(
2460 self,
2461 timestamp_gen: F,
2462 watermark_gen: G,
2463 ) -> KeyedStream<impl Operator<Out = Op::Out>>
2464 where
2465 F: FnMut(&Op::Out) -> Timestamp + Clone + Send + 'static,
2466 G: FnMut(&Op::Out, &Timestamp) -> Option<Timestamp> + Clone + Send + 'static,
2467 {
2468 self.add_operator(|prev| AddTimestamp::new(prev, timestamp_gen, watermark_gen))
2469 }
2470
2471 #[cfg(feature = "timestamp")]
2472 pub fn drop_timestamps(self) -> KeyedStream<impl Operator<Out = Op::Out>> {
2473 self.add_operator(|prev| DropTimestamp::new(prev))
2474 }
2475
2476 /// Change the batch mode for this stream.
2477 ///
2478 /// This change will be propagated to all the operators following, even of the next blocks,
2479 /// until it's changed again.
2480 ///
2481 /// ## Example
2482 ///
2483 /// ```
2484 /// # use renoir::{StreamContext, RuntimeConfig};
2485 /// # use renoir::operator::source::IteratorSource;
2486 /// use renoir::BatchMode;
2487 /// # let mut env = StreamContext::new_local();
2488 ///
2489 /// let s = env.stream_iter(0..10).group_by(|&n| n % 2);
2490 /// s.batch_mode(BatchMode::fixed(1024));
2491 /// ```
2492 pub fn batch_mode(mut self, batch_mode: BatchMode) -> Self {
2493 self.0.block.batch_mode = batch_mode;
2494 self
2495 }
2496
2497 /// Remove from the stream all the elements for which the provided function returns `None` and
2498 /// keep the elements that returned `Some(_)`.
2499 ///
2500 /// **Note**: this is very similar to [`Iteartor::filter_map`](std::iter::Iterator::filter_map)
2501 ///
2502 /// ## Example
2503 ///
2504 /// ```
2505 /// # use renoir::{StreamContext, RuntimeConfig};
2506 /// # use renoir::operator::source::IteratorSource;
2507 /// # let mut env = StreamContext::new_local();
2508 /// let s = env.stream_iter(0..10).group_by(|&n| n % 2);
2509 /// let res = s.filter_map(|(_key, n)| if n % 3 == 0 { Some(n * 4) } else { None }).collect_vec();
2510 ///
2511 /// env.execute_blocking();
2512 ///
2513 /// let mut res = res.get().unwrap();
2514 /// res.sort_unstable();
2515 /// assert_eq!(res, vec![(0, 0), (0, 24), (1, 12), (1, 36)]);
2516 /// ```
2517 pub fn filter_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2518 where
2519 F: Fn((&K, I)) -> Option<O> + Send + Clone + 'static,
2520 O: Send + 'static,
2521 {
2522 self.map(f)
2523 .filter(|(_, x)| x.is_some())
2524 .map(|(_, x)| x.unwrap())
2525 }
2526
2527 /// Remove from the stream all the elements for which the provided predicate returns `false`.
2528 ///
2529 /// **Note**: this is very similar to [`Iteartor::filter`](std::iter::Iterator::filter)
2530 ///
2531 /// ## Example
2532 ///
2533 /// ```
2534 /// # use renoir::{StreamContext, RuntimeConfig};
2535 /// # use renoir::operator::source::IteratorSource;
2536 /// # let mut env = StreamContext::new_local();
2537 /// let s = env.stream_iter(0..10).group_by(|&n| n % 2);
2538 /// let res = s.filter(|&(_key, n)| n % 3 == 0).collect_vec();
2539 ///
2540 /// env.execute_blocking();
2541 ///
2542 /// let mut res = res.get().unwrap();
2543 /// res.sort_unstable();
2544 /// assert_eq!(res, vec![(0, 0), (0, 6), (1, 3), (1, 9)]);
2545 /// ```
2546 pub fn filter<F>(self, predicate: F) -> KeyedStream<impl Operator<Out = (K, I)>>
2547 where
2548 F: Fn(&(K, I)) -> bool + Clone + Send + 'static,
2549 {
2550 self.add_operator(|prev| Filter::new(prev, predicate))
2551 }
2552
2553 /// Apply a mapping operation to each element of the stream, the resulting stream will be the
2554 /// flatMaped values of the result of the mapping.
2555 ///
2556 /// **Note**: this is very similar to [`Iteartor::flat_map`](std::iter::Iterator::flat_map).
2557 ///
2558 /// ## Example
2559 ///
2560 /// ```
2561 /// # use renoir::{StreamContext, RuntimeConfig};
2562 /// # use renoir::operator::source::IteratorSource;
2563 /// # let mut env = StreamContext::new_local();
2564 /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
2565 /// let res = s.flat_map(|(_key, n)| vec![n, n]).collect_vec();
2566 ///
2567 /// env.execute_blocking();
2568 ///
2569 /// let mut res = res.get().unwrap();
2570 /// res.sort_unstable();
2571 /// assert_eq!(res, vec![(0, 0), (0, 0), (0, 2), (0, 2), (1, 1), (1, 1)]);
2572 /// ```
2573 pub fn flat_map<O, It, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2574 where
2575 It: IntoIterator<Item = O>,
2576 <It as IntoIterator>::IntoIter: Send + 'static,
2577 F: Fn(Op::Out) -> It + Send + Clone + 'static,
2578 O: Data,
2579 It: 'static,
2580 {
2581 self.add_operator(|prev| KeyedFlatMap::new(prev, f))
2582 }
2583
2584 /// Apply the given function to all the elements of the stream, consuming the stream.
2585 ///
2586 /// ## Example
2587 ///
2588 /// ```
2589 /// # use renoir::{StreamContext, RuntimeConfig};
2590 /// # use renoir::operator::source::IteratorSource;
2591 /// # let mut env = StreamContext::new_local();
2592 /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2593 /// s.inspect(|(key, n)| println!("Item: {} has key {}", n, key)).for_each(std::mem::drop);
2594 ///
2595 /// env.execute_blocking();
2596 /// ```
2597 pub fn inspect<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
2598 where
2599 F: FnMut(&(K, I)) + Send + Clone + 'static,
2600 {
2601 self.add_operator(|prev| Inspect::new(prev, f))
2602 }
2603
2604 /// Perform the folding operation separately for each key.
2605 ///
2606 /// Note that there is a difference between `stream.group_by(keyer).fold(...)` and
2607 /// `stream.group_by_fold(keyer, ...)`. The first performs the network shuffle of every item in
2608 /// the stream, and **later** performs the folding (i.e. nearly all the elements will be sent to
2609 /// the network). The latter avoids sending the items by performing first a local reduction on
2610 /// each host, and then send only the locally folded results (i.e. one message per replica, per
2611 /// key); then the global step is performed aggregating the results.
2612 ///
2613 /// The resulting stream will still be keyed and will contain only a single message per key (the
2614 /// final result).
2615 ///
2616 /// Note that the output type may be different from the input type. Consider using
2617 /// [`KeyedStream::reduce`] if the output type is the same as the input type.
2618 ///
2619 /// **Note**: this operator will retain all the messages of the stream and emit the values only
2620 /// when the stream ends. Therefore this is not properly _streaming_.
2621 ///
2622 /// **Note**: this operator will split the current block.
2623 ///
2624 /// ## Example
2625 ///
2626 /// ```
2627 /// # use renoir::{StreamContext, RuntimeConfig};
2628 /// # use renoir::operator::source::IteratorSource;
2629 /// # let mut env = StreamContext::new_local();
2630 /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2631 /// let res = s
2632 /// .fold(0, |acc, value| *acc += value)
2633 /// .collect_vec();
2634 ///
2635 /// env.execute_blocking();
2636 ///
2637 /// let mut res = res.get().unwrap();
2638 /// res.sort_unstable();
2639 /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
2640 /// ```
2641 pub fn fold<O, F>(self, init: O, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2642 where
2643 F: Fn(&mut O, <Op::Out as KeyedItem>::Value) + Send + Clone + 'static,
2644 O: Send + Clone,
2645 {
2646 self.add_operator(|prev| KeyedFold::new(prev, init, f))
2647 }
2648
2649 /// Perform the reduction operation separately for each key.
2650 ///
2651 /// Note that there is a difference between `stream.group_by(keyer).reduce(...)` and
2652 /// `stream.group_by_reduce(keyer, ...)`. The first performs the network shuffle of every item in
2653 /// the stream, and **later** performs the reduction (i.e. nearly all the elements will be sent to
2654 /// the network). The latter avoids sending the items by performing first a local reduction on
2655 /// each host, and then send only the locally reduced results (i.e. one message per replica, per
2656 /// key); then the global step is performed aggregating the results.
2657 ///
2658 /// The resulting stream will still be keyed and will contain only a single message per key (the
2659 /// final result).
2660 ///
2661 /// Note that the output type must be the same as the input type, if you need a different type
2662 /// consider using [`KeyedStream::fold`].
2663 ///
2664 /// **Note**: this operator will retain all the messages of the stream and emit the values only
2665 /// when the stream ends. Therefore this is not properly _streaming_.
2666 ///
2667 /// **Note**: this operator will split the current block.
2668 ///
2669 /// ## Example
2670 ///
2671 /// ```
2672 /// # use renoir::{StreamContext, RuntimeConfig};
2673 /// # use renoir::operator::source::IteratorSource;
2674 /// # let mut env = StreamContext::new_local();
2675 /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2676 /// let res = s
2677 /// .reduce(|acc, value| *acc += value)
2678 /// .collect_vec();
2679 ///
2680 /// env.execute_blocking();
2681 ///
2682 /// let mut res = res.get().unwrap();
2683 /// res.sort_unstable();
2684 /// assert_eq!(res, vec![(0, 0 + 2 + 4), (1, 1 + 3)]);
2685 /// ```
2686 pub fn reduce<F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, I)>>
2687 where
2688 I: Clone + 'static,
2689 F: Fn(&mut I, I) + Send + Clone + 'static,
2690 {
2691 self.fold(None, move |acc, value| match acc {
2692 None => *acc = Some(value),
2693 Some(acc) => f(acc, value),
2694 })
2695 .map(|(_, value)| value.unwrap())
2696 }
2697
2698 /// Map the elements of the stream into new elements.
2699 ///
2700 /// **Note**: this is very similar to [`Iteartor::map`](std::iter::Iterator::map).
2701 ///
2702 /// ## Example
2703 ///
2704 /// ```
2705 /// # use renoir::{StreamContext, RuntimeConfig};
2706 /// # use renoir::operator::source::IteratorSource;
2707 /// # let mut env = StreamContext::new_local();
2708 /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2709 /// let res = s.map(|(_key, n)| 10 * n).collect_vec();
2710 ///
2711 /// env.execute_blocking();
2712 ///
2713 /// let mut res = res.get().unwrap();
2714 /// res.sort_unstable();
2715 /// assert_eq!(res, vec![(0, 0), (0, 20), (0, 40), (1, 10), (1, 30)]);
2716 /// ```
2717 pub fn map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2718 where
2719 F: Fn((&K, I)) -> O + Send + Clone + 'static,
2720 O: Send,
2721 {
2722 self.add_operator(|prev| {
2723 Map::new(prev, move |(k, v)| {
2724 let mapped_value = f((&k, v));
2725 (k, mapped_value)
2726 })
2727 })
2728 }
2729
2730 /// # TODO
2731 /// Reorder timestamped items
2732 pub fn reorder(self) -> KeyedStream<impl Operator<Out = (K, I)>> {
2733 self.add_operator(|prev| Reorder::new(prev))
2734 }
2735
2736 /// Map the elements of the stream into new elements. The mapping function can be stateful.
2737 ///
2738 /// This is exactly like [`Stream::rich_map`], but the function is cloned for each key. This
2739 /// means that each key will have a unique mapping function (and therefore a unique state).
2740 pub fn rich_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2741 where
2742 F: FnMut((&K, I)) -> O + Clone + Send + 'static,
2743 O: Data,
2744 {
2745 self.add_operator(|prev| RichMap::new(prev, f))
2746 }
2747
2748 /// Apply a mapping operation to each element of the stream, the resulting stream will be the
2749 /// flattened values of the result of the mapping. The mapping function can be stateful.
2750 ///
2751 /// This is exactly like [`Stream::rich_flat_map`], but the function is cloned for each key.
2752 /// This means that each key will have a unique mapping function (and therefore a unique state).
2753 pub fn rich_flat_map<O, It, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2754 where
2755 It: IntoIterator<Item = O>,
2756 <It as IntoIterator>::IntoIter: Clone + Send + 'static,
2757 F: FnMut((&K, I)) -> It + Clone + Send + 'static,
2758 O: Data,
2759 It: Data,
2760 {
2761 self.rich_map(f).flatten()
2762 }
2763
2764 /// Remove from the stream all the elements for which the provided function returns `None` and
2765 /// keep the elements that returned `Some(_)`. The mapping function can be stateful.
2766 ///
2767 /// This is exactly like [`Stream::rich_filter_map`], but the function is cloned for each key.
2768 /// This means that each key will have a unique mapping function (and therefore a unique state).
2769 pub fn rich_filter_map<O, F>(self, f: F) -> KeyedStream<impl Operator<Out = (K, O)>>
2770 where
2771 F: FnMut((&K, I)) -> Option<O> + Send + Clone + 'static,
2772 O: Data,
2773 {
2774 self.rich_map(f)
2775 .filter(|(_, x)| x.is_some())
2776 .map(|(_, x)| x.unwrap())
2777 }
2778
2779 /// Map the elements of the stream into new elements. The mapping function can be stateful.
2780 ///
2781 /// This is exactly like [`Stream::rich_map`], but the function is cloned for each key. This
2782 /// means that each key will have a unique mapping function (and therefore a unique state).
2783 pub fn rich_map_custom<O, F>(self, f: F) -> Stream<impl Operator<Out = O>>
2784 where
2785 F: FnMut(ElementGenerator<Op>) -> StreamElement<O> + Clone + Send + 'static,
2786 O: Data,
2787 {
2788 self.0.add_operator(|prev| RichMapCustom::new(prev, f))
2789 }
2790
2791 /// Make this [`KeyedStream`] a normal [`Stream`] of key-value pairs.
2792 ///
2793 /// ## Example
2794 ///
2795 /// ```
2796 /// # use renoir::{StreamContext, RuntimeConfig};
2797 /// # use renoir::operator::source::IteratorSource;
2798 /// # let mut env = StreamContext::new_local();
2799 /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
2800 /// let res = stream.unkey().collect_vec();
2801 ///
2802 /// env.execute_blocking();
2803 ///
2804 /// let mut res = res.get().unwrap();
2805 /// res.sort_unstable(); // the output order is nondeterministic
2806 /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1), (1, 3)]);
2807 /// ```
2808 pub fn unkey(self) -> Stream<impl Operator<Out = (K, I)>> {
2809 self.0
2810 }
2811
2812 /// Forget about the key of this [`KeyedStream`] and return a [`Stream`] containing just the
2813 /// values.
2814 ///
2815 /// ## Example
2816 ///
2817 /// ```
2818 /// # use renoir::{StreamContext, RuntimeConfig};
2819 /// # use renoir::operator::source::IteratorSource;
2820 /// # let mut env = StreamContext::new_local();
2821 /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2);
2822 /// let res = stream.drop_key().collect_vec();
2823 ///
2824 /// env.execute_blocking();
2825 ///
2826 /// let mut res = res.get().unwrap();
2827 /// res.sort_unstable(); // the output order is nondeterministic
2828 /// assert_eq!(res, (0..4).collect::<Vec<_>>());
2829 /// ```
2830 pub fn drop_key(self) -> Stream<impl Operator<Out = I>> {
2831 self.0.map(|(_k, v)| v)
2832 }
2833
2834 /// Apply the given function to all the elements of the stream, consuming the stream.
2835 ///
2836 /// ## Example
2837 ///
2838 /// ```
2839 /// # use renoir::{StreamContext, RuntimeConfig};
2840 /// # use renoir::operator::source::IteratorSource;
2841 /// # let mut env = StreamContext::new_local();
2842 /// let s = env.stream_iter(0..5).group_by(|&n| n % 2);
2843 /// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key));
2844 ///
2845 /// env.execute_blocking();
2846 /// ```
2847 pub fn for_each<F>(self, f: F)
2848 where
2849 F: FnMut((K, I)) + Send + Clone + 'static,
2850 {
2851 self.0
2852 .add_operator(|prev| ForEach::new(prev, f))
2853 .finalize_block();
2854 }
2855}
2856
2857impl<K, I, Op> KeyedStream<Op>
2858where
2859 Op: Operator<Out = (K, I)> + 'static,
2860 K: ExchangeDataKey,
2861 I: ExchangeData,
2862{
2863 /// Given two streams **with timestamps** join them according to an interval centered around the
2864 /// timestamp of the left side.
2865 ///
2866 /// This means that an element on the left side with timestamp T will be joined to all the
2867 /// elements on the right with timestamp Q such that `T - lower_bound <= Q <= T + upper_bound`.
2868 /// Only items with the same key can be joined together.
2869 ///
2870 /// **Note**: this operator will split the current block.
2871 ///
2872 /// ## Example
2873 /// TODO: example
2874 #[cfg(feature = "timestamp")]
2875 pub fn interval_join<I2, Op2>(
2876 self,
2877 right: KeyedStream<Op2>,
2878 lower_bound: Timestamp,
2879 upper_bound: Timestamp,
2880 ) -> KeyedStream<impl Operator<Out = (K, (I, I2))>>
2881 where
2882 I2: ExchangeData,
2883 Op2: Operator<Out = (K, I2)> + 'static,
2884 {
2885 self.merge_distinct(right)
2886 .add_operator(Reorder::new)
2887 .add_operator(|prev| IntervalJoin::new(prev, lower_bound, upper_bound))
2888 }
2889
2890 /// Merge the items of this stream with the items of another stream with the same type.
2891 ///
2892 /// **Note**: the order of the resulting items is not specified.
2893 ///
2894 /// **Note**: this operator will split the current block.
2895 ///
2896 /// ## Example
2897 ///
2898 /// ```
2899 /// # use renoir::{StreamContext, RuntimeConfig};
2900 /// # use renoir::operator::source::IteratorSource;
2901 /// # let mut env = StreamContext::new_local();
2902 /// let s1 = env.stream_iter(0..3).group_by(|&n| n % 2);
2903 /// let s2 = env.stream_iter(3..5).group_by(|&n| n % 2);
2904 /// let res = s1.merge(s2).collect_vec();
2905 ///
2906 /// env.execute_blocking();
2907 ///
2908 /// let mut res = res.get().unwrap();
2909 /// res.sort_unstable(); // the output order is nondeterministic
2910 /// assert_eq!(res, vec![(0, 0), (0, 2), (0, 4), (1, 1), (1, 3)]);
2911 /// ```
2912 pub fn merge<Op2>(self, oth: KeyedStream<Op2>) -> KeyedStream<impl Operator<Out = (K, I)>>
2913 where
2914 Op2: Operator<Out = (K, I)> + 'static,
2915 {
2916 KeyedStream(self.0.merge(oth.0))
2917 }
2918
2919 pub(crate) fn merge_distinct<I2, Op2>(
2920 self,
2921 right: KeyedStream<Op2>,
2922 ) -> KeyedStream<impl Operator<Out = (K, MergeElement<I, I2>)>>
2923 where
2924 I2: ExchangeData,
2925 Op2: Operator<Out = (K, I2)> + 'static,
2926 {
2927 // map the left and right streams to the same type
2928 let left = self.map(|(_, x)| MergeElement::Left(x));
2929 let right = right.map(|(_, x)| MergeElement::Right(x));
2930
2931 left.merge(right)
2932 }
2933
2934 /// Perform a network shuffle sending the messages to a random replica.
2935 ///
2936 /// This operator returns a `Stream` instead of a `KeyedStream` as after
2937 /// shuffling the messages between replicas, the keyed semantics are lost.
2938 ///
2939 /// **Note**: this operator will split the current block.
2940 ///
2941 /// ## Example
2942 ///
2943 /// ```
2944 /// # use renoir::{StreamContext, RuntimeConfig};
2945 /// # use renoir::operator::source::IteratorSource;
2946 /// # let mut env = StreamContext::new_local();
2947 /// let s = env.stream_iter(0..5);
2948 /// let res = s.shuffle();
2949 /// ```
2950 pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>> {
2951 self.0.split_block(End::new, NextStrategy::random())
2952 }
2953
    /// Two-pass keyed scan: fold every value into a per-key state, then map every value again
    /// using the state accumulated for its key.
    ///
    /// For each key the state starts from a clone of `init`. `fold(&key, &mut state, value)` is
    /// applied to every value of that key. Afterwards each value is emitted as
    /// `(key, map(&key, &state, value))`, so there is one output item per input item.
    ///
    /// Implementation: an `iterate` with exactly two iterations.
    /// - The first iteration turns `TwoPass::First` into `TwoPass::Second`, and `Second` values are
    ///   folded into the per-key state.
    /// - The second iteration turns `TwoPass::Second` into `TwoPass::Output` by calling `map` with
    ///   the state of the key.
    ///
    /// The final state stream returned by `iterate` is discarded.
    ///
    /// NOTE(review): this relies on `iterate` making the state folded during the first iteration
    /// visible to the body of the second one; confirm against `iterate`'s contract.
    ///
    /// NOTE(review): per-replica states are merged with `HashMap::extend`, which *overwrites*
    /// rather than combines the states of a key. This assumes every key is folded by a single
    /// replica; confirm. A key missing from the state in the second pass would make the
    /// `unwrap` in the body panic.
    pub fn fold_scan<O, S, L, F>(
        self,
        init: S,
        fold: L,
        map: F,
    ) -> KeyedStream<impl Operator<Out = (K, O)>>
    where
        Op::Out: ExchangeData,
        I: Send,
        K: ExchangeDataKey + Sync,
        L: Fn(&K, &mut S, I) + Send + Clone + 'static,
        F: Fn(&K, &S, I) -> O + Send + Clone + 'static,
        S: ExchangeData + Sync,
        O: ExchangeData,
    {
        // Tag telling each element which pass of the scan it is in.
        #[derive(Serialize, Deserialize, Clone)]
        enum TwoPass<I, O> {
            First(I),
            Second(I),
            Output(O),
        }

        let (state, s) = self.map(|el| TwoPass::First(el.1)).unkey().iterate(
            2,
            HashMap::<K, S>::default(),
            // Iteration body: advance every element by one pass.
            |s, state| {
                s.to_keyed()
                    .map(move |(k, el)| match el {
                        TwoPass::First(el) => TwoPass::Second(el),
                        TwoPass::Second(el) => {
                            TwoPass::Output((map)(k, state.get().get(k).unwrap(), el))
                        }
                        TwoPass::Output(_) => unreachable!(),
                    })
                    .unkey()
            },
            // Local fold: only `Second` elements contribute to the per-key state.
            move |local: &mut HashMap<K, S>, (k, el)| match el {
                TwoPass::First(_) => {}
                TwoPass::Second(el) => fold(
                    &k,
                    local.entry(k.clone()).or_insert_with(|| init.clone()),
                    el,
                ),
                TwoPass::Output(_) => {}
            },
            // Global fold: move the local per-key states into the global map (overwrite on clash).
            move |global: &mut HashMap<K, S>, mut local| {
                global.extend(local.drain());
            },
            |_| true,
        );

        // The state stream is not needed by the caller; consume and discard it.
        state.for_each(std::mem::drop);
        s.to_keyed().map(|(_, t)| match t {
            TwoPass::First(_) | TwoPass::Second(_) => unreachable!(),
            TwoPass::Output(o) => o,
        })
    }
3011
3012 pub fn reduce_scan<O, S, F1, F2, R>(
3013 self,
3014 first_map: F1,
3015 reduce: R,
3016 second_map: F2,
3017 ) -> KeyedStream<impl Operator<Out = (K, O)>>
3018 where
3019 Op::Out: ExchangeData,
3020 F1: Fn(&K, I) -> S + Send + Clone + 'static,
3021 F2: Fn(&K, &S, I) -> O + Send + Clone + 'static,
3022 R: Fn(&K, S, S) -> S + Send + Clone + 'static,
3023 K: Sync,
3024 S: ExchangeData + Sync,
3025 O: ExchangeData,
3026 {
3027 self.fold_scan(
3028 None,
3029 move |k, acc: &mut Option<S>, x| {
3030 let map = (first_map)(k, x);
3031 *acc = Some(match acc.take() {
3032 Some(v) => (reduce)(k, v, map),
3033 None => map,
3034 });
3035 },
3036 move |k, state, x| (second_map)(k, state.as_ref().unwrap(), x),
3037 )
3038 }
3039
3040 /// Close the stream and send resulting items to a channel on a single host.
3041 ///
3042 /// If the stream is distributed among multiple replicas, parallelism will
3043 /// be set to 1 to gather all results
3044 ///
3045 /// **Note**: the order of items and keys is unspecified.
3046 ///
3047 /// **Note**: this operator will split the current block.
3048 ///
3049 /// ## Example
3050 ///
3051 /// ```
3052 /// # use renoir::{StreamContext, RuntimeConfig};
3053 /// # use renoir::operator::source::IteratorSource;
3054 /// # let mut env = StreamContext::new_local();
3055 /// let s = env.stream_iter(0..10u32);
3056 /// let rx = s.collect_channel();
3057 ///
3058 /// env.execute_blocking();
3059 /// let mut v = Vec::new();
3060 /// while let Ok(x) = rx.recv() {
3061 /// v.push(x)
3062 /// }
3063 /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
3064 /// ```
3065 pub fn collect_channel(self) -> Receiver<(K, I)> {
3066 self.unkey().collect_channel()
3067 }
3068 /// Close the stream and send resulting items to a channel on each single host.
3069 ///
3070 /// Each host sends its outputs to the channel without repartitioning.
3071 /// Elements will be sent to the channel on the same host that produced
3072 /// the output.
3073 ///
3074 /// **Note**: the order of items and keys is unspecified.
3075 ///
3076 /// ## Example
3077 ///
3078 /// ```
3079 /// # use renoir::{StreamContext, RuntimeConfig};
3080 /// # use renoir::operator::source::IteratorSource;
3081 /// # let mut env = StreamContext::new_local();
3082 /// let s = env.stream_iter(0..10u32);
3083 /// let rx = s.collect_channel();
3084 ///
3085 /// env.execute_blocking();
3086 /// let mut v = Vec::new();
3087 /// while let Ok(x) = rx.recv() {
3088 /// v.push(x)
3089 /// }
3090 /// assert_eq!(v, (0..10u32).collect::<Vec<_>>());
3091 /// ```
3092 pub fn collect_channel_parallel(self) -> Receiver<(K, I)> {
3093 self.unkey().collect_channel_parallel()
3094 }
3095
3096 /// Close the stream and store all the resulting items into a [`Vec`] on a single host.
3097 ///
3098 /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
3099 /// replicas sends the items to.
3100 ///
3101 /// **Note**: the collected items are the pairs `(key, value)`.
3102 ///
3103 /// **Note**: the order of items and keys is unspecified.
3104 ///
3105 /// **Note**: this operator will split the current block.
3106 ///
3107 /// ## Example
3108 ///
3109 /// ```
3110 /// # use renoir::{StreamContext, RuntimeConfig};
3111 /// # use renoir::operator::source::IteratorSource;
3112 /// # let mut env = StreamContext::new_local();
3113 /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3114 /// let res = s.collect_vec();
3115 ///
3116 /// env.execute_blocking();
3117 ///
3118 /// let mut res = res.get().unwrap();
3119 /// res.sort_unstable(); // the output order is nondeterministic
3120 /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3121 /// ```
3122 pub fn collect_vec(self) -> StreamOutput<Vec<(K, I)>> {
3123 self.unkey().collect_vec()
3124 }
3125
3126 /// Close the stream and store all the resulting items into replicated [`Vec`] on all hosts.
3127 ///
3128 /// If the stream is distributed among multiple replicas, a bottleneck is placed where all the
3129 /// replicas sends the items to.
3130 ///
3131 /// **Note**: the collected items are the pairs `(key, value)`.
3132 ///
3133 /// **Note**: the order of items and keys is unspecified.
3134 ///
3135 /// **Note**: this operator will split the current block.
3136 ///
3137 /// ## Example
3138 ///
3139 /// ```
3140 /// # use renoir::{StreamContext, RuntimeConfig};
3141 /// # use renoir::operator::source::IteratorSource;
3142 /// # let mut env = StreamContext::new_local();
3143 /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3144 /// let res = s.collect_vec_all();
3145 ///
3146 /// env.execute_blocking();
3147 ///
3148 /// let mut res = res.get().unwrap();
3149 /// res.sort_unstable(); // the output order is nondeterministic
3150 /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3151 /// ```
3152 pub fn collect_vec_all(self) -> StreamOutput<Vec<(K, I)>> {
3153 self.unkey().collect_vec_all()
3154 }
3155
3156 /// Close the stream and store all the resulting items into a collection on a single host.
3157 ///
3158 /// If the stream is distributed among multiple replicas, parallelism will
3159 /// be set to 1 to gather all results
3160 ///
3161 ///
3162 /// **Note**: the order of items and keys is unspecified.
3163 ///
3164 /// **Note**: this operator will split the current block.
3165 ///
3166 /// ## Example
3167 ///
3168 /// ```
3169 /// # use renoir::{StreamContext, RuntimeConfig};
3170 /// # use renoir::operator::source::IteratorSource;
3171 /// # let mut env = StreamContext::new_local();
3172 /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3173 /// let res = s.collect_vec();
3174 ///
3175 /// env.execute_blocking();
3176 ///
3177 /// let mut res = res.get().unwrap();
3178 /// res.sort_unstable(); // the output order is nondeterministic
3179 /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3180 /// ```
3181 pub fn collect<C: FromIterator<(K, I)> + Send + 'static>(self) -> StreamOutput<C> {
3182 self.unkey().collect()
3183 }
3184
3185 /// Close the stream and store all the resulting items into a collection on each single host.
3186 ///
3187 /// Partitioning will be set to Host and results will be replicated
3188 ///
3189 ///
3190 /// **Note**: the order of items and keys is unspecified.
3191 ///
3192 /// **Note**: this operator will split the current block.
3193 ///
3194 /// ## Example
3195 ///
3196 /// ```
3197 /// # use renoir::{StreamContext, RuntimeConfig};
3198 /// # use renoir::operator::source::IteratorSource;
3199 /// # let mut env = StreamContext::new_local();
3200 /// let s = env.stream_iter(0..3).group_by(|&n| n % 2);
3201 /// let res = s.collect_vec();
3202 ///
3203 /// env.execute_blocking();
3204 ///
3205 /// let mut res = res.get().unwrap();
3206 /// res.sort_unstable(); // the output order is nondeterministic
3207 /// assert_eq!(res, vec![(0, 0), (0, 2), (1, 1)]);
3208 /// ```
3209 pub fn collect_all<C: FromIterator<(K, I)> + Send + 'static>(self) -> StreamOutput<C> {
3210 self.unkey().collect_all()
3211 }
3212}
3213
3214impl<K, I, O, It, Op> KeyedStream<Op>
3215where
3216 K: DataKey,
3217 Op: Operator<Out = (K, I)> + 'static,
3218 It: Iterator<Item = O> + Clone + Send + 'static,
3219 I: Data + IntoIterator<IntoIter = It, Item = It::Item>,
3220 O: Data + Clone,
3221 K: DataKey,
3222{
3223 /// Transform this stream of containers into a stream of all the contained values.
3224 ///
3225 /// **Note**: this is very similar to [`Iteartor::flatten`](std::iter::Iterator::flatten)
3226 ///
3227 /// ## Example
3228 ///
3229 /// ```
3230 /// # use renoir::{StreamContext, RuntimeConfig};
3231 /// # use renoir::operator::source::IteratorSource;
3232 /// # let mut env = StreamContext::new_local();
3233 /// let s = env
3234 /// .stream_iter((vec![
3235 /// vec![0, 1, 2],
3236 /// vec![3, 4, 5],
3237 /// vec![6, 7]
3238 /// ].into_iter()))
3239 /// .group_by(|v| v[0] % 2);
3240 /// let res = s.flatten().collect_vec();
3241 ///
3242 /// env.execute_blocking();
3243 ///
3244 /// let mut res = res.get().unwrap();
3245 /// res.sort_unstable();
3246 /// assert_eq!(res, vec![(0, 0), (0, 1), (0, 2), (0, 6), (0, 7), (1, 3), (1, 4), (1, 5)]);
3247 /// ```
3248 pub fn flatten(self) -> KeyedStream<impl Operator<Out = (K, O)>> {
3249 self.add_operator(|prev| KeyedFlatten::new(prev))
3250 }
3251}