timely/dataflow/operators/map.rs
1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::Data;
4use crate::dataflow::{Stream, Scope};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::operator::Operator;
7
8/// Extension trait for `Stream`.
9pub trait Map<S: Scope, D: Data> {
10 /// Consumes each element of the stream and yields a new element.
11 ///
12 /// # Examples
13 /// ```
14 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
15 ///
16 /// timely::example(|scope| {
17 /// (0..10).to_stream(scope)
18 /// .map(|x| x + 1)
19 /// .inspect(|x| println!("seen: {:?}", x));
20 /// });
21 /// ```
22 fn map<D2: Data, L: FnMut(D)->D2+'static>(&self, logic: L) -> Stream<S, D2>;
23 /// Updates each element of the stream and yields the element, re-using memory where possible.
24 ///
25 /// # Examples
26 /// ```
27 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
28 ///
29 /// timely::example(|scope| {
30 /// (0..10).to_stream(scope)
31 /// .map_in_place(|x| *x += 1)
32 /// .inspect(|x| println!("seen: {:?}", x));
33 /// });
34 /// ```
35 fn map_in_place<L: FnMut(&mut D)+'static>(&self, logic: L) -> Stream<S, D>;
36 /// Consumes each element of the stream and yields some number of new elements.
37 ///
38 /// # Examples
39 /// ```
40 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
41 ///
42 /// timely::example(|scope| {
43 /// (0..10).to_stream(scope)
44 /// .flat_map(|x| (0..x))
45 /// .inspect(|x| println!("seen: {:?}", x));
46 /// });
47 /// ```
48 fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data;
49}
50
51impl<S: Scope, D: Data> Map<S, D> for Stream<S, D> {
52 fn map<D2: Data, L: FnMut(D)->D2+'static>(&self, mut logic: L) -> Stream<S, D2> {
53 let mut vector = Vec::new();
54 self.unary(Pipeline, "Map", move |_,_| move |input, output| {
55 input.for_each(|time, data| {
56 data.swap(&mut vector);
57 output.session(&time).give_iterator(vector.drain(..).map(|x| logic(x)));
58 });
59 })
60 }
61 fn map_in_place<L: FnMut(&mut D)+'static>(&self, mut logic: L) -> Stream<S, D> {
62 let mut vector = Vec::new();
63 self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| {
64 input.for_each(|time, data| {
65 data.swap(&mut vector);
66 for datum in vector.iter_mut() { logic(datum); }
67 output.session(&time).give_vec(&mut vector);
68 })
69 })
70 }
71 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
72 // TODO : number of elements from the iterator. This would allow iterators that produce many
73 // TODO : records without taking arbitrarily long and arbitrarily much memory.
74 fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, mut logic: L) -> Stream<S, I::Item> where I::Item: Data {
75 let mut vector = Vec::new();
76 self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
77 input.for_each(|time, data| {
78 data.swap(&mut vector);
79 output.session(&time).give_iterator(vector.drain(..).flat_map(|x| logic(x).into_iter()));
80 });
81 })
82 }
83}