1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::{
data::ArconType,
dataflow::{
builder::OperatorBuilder,
stream::{OperatorExt, Stream},
},
index::EmptyState,
stream::operator::function,
util::ArconFnBounds,
};
use std::sync::Arc;
/// Extension trait providing map-style transformations on a [`Stream`].
///
/// Every method takes the receiver by value and hands back a new [`Stream`];
/// each is marked `#[must_use]` so that discarding the returned stream
/// produces a compiler warning.
pub trait MapExt<T: ArconType> {
    /// Map each stream record to a possibly new type
    ///
    /// `f` receives each record by value and returns the record to emit.
    ///
    /// # Example
    /// ```rust
    /// use arcon::prelude::*;
    /// let stream: Stream<u64> = (0..100)
    ///     .to_stream(|conf| {
    ///         conf.set_arcon_time(ArconTime::Process);
    ///     })
    ///     .map(|x| x + 10);
    /// ```
    #[must_use]
    fn map<OUT: ArconType, F: Fn(T) -> OUT + ArconFnBounds>(self, f: F) -> Stream<OUT>;

    /// Map each record in place keeping the same stream type
    ///
    /// `f` receives a mutable reference to each record and modifies it directly.
    ///
    /// # Example
    /// ```rust
    /// use arcon::prelude::*;
    /// let stream: Stream<u64> = (0..100)
    ///     .to_stream(|conf| {
    ///         conf.set_arcon_time(ArconTime::Process);
    ///     })
    ///     .map_in_place(|x| *x += 10);
    /// ```
    #[must_use]
    fn map_in_place<F: Fn(&mut T) + ArconFnBounds>(self, f: F) -> Stream<T>;

    /// Akin to [`Iterator::flat_map`] but on a Stream
    ///
    /// `f` turns each record into an [`IntoIterator`]; the resulting stream's
    /// record type is that iterator's `Item`.
    ///
    /// # Example
    /// ```rust
    /// use arcon::prelude::*;
    /// let stream: Stream<u64> = (0..100)
    ///     .to_stream(|conf| {
    ///         conf.set_arcon_time(ArconTime::Process);
    ///     })
    ///     .flat_map(|x| (0..x));
    /// ```
    #[must_use]
    fn flat_map<I, F>(self, f: F) -> Stream<I::Item>
    where
        I: IntoIterator + 'static,
        I::Item: ArconType,
        F: Fn(T) -> I + ArconFnBounds;
}
impl<T: ArconType> MapExt<T> for Stream<T> {
#[must_use]
fn map<OUT: ArconType, F: Fn(T) -> OUT + ArconFnBounds>(self, f: F) -> Stream<OUT> {
self.operator(OperatorBuilder {
operator: Arc::new(move || function::Map::new(f.clone())),
state: Arc::new(|_| EmptyState),
conf: Default::default(),
})
}
#[must_use]
fn map_in_place<F: Fn(&mut T) + ArconFnBounds>(self, f: F) -> Stream<T> {
self.operator(OperatorBuilder {
operator: Arc::new(move || function::MapInPlace::new(f.clone())),
state: Arc::new(|_| EmptyState),
conf: Default::default(),
})
}
#[must_use]
fn flat_map<I, F>(self, f: F) -> Stream<I::Item>
where
I: IntoIterator + 'static,
I::Item: ArconType,
F: Fn(T) -> I + ArconFnBounds,
{
self.operator(OperatorBuilder {
operator: Arc::new(move || function::FlatMap::new(f.clone())),
state: Arc::new(|_| EmptyState),
conf: Default::default(),
})
}
}