timely/dataflow/operators/
map.rs

1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::Data;
4use crate::dataflow::{Stream, Scope};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::operator::Operator;
7
8/// Extension trait for `Stream`.
9pub trait Map<S: Scope, D: Data> {
10    /// Consumes each element of the stream and yields a new element.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///     (0..10).to_stream(scope)
18    ///            .map(|x| x + 1)
19    ///            .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn map<D2: Data, L: FnMut(D)->D2+'static>(&self, logic: L) -> Stream<S, D2>;
23    /// Updates each element of the stream and yields the element, re-using memory where possible.
24    ///
25    /// # Examples
26    /// ```
27    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
28    ///
29    /// timely::example(|scope| {
30    ///     (0..10).to_stream(scope)
31    ///            .map_in_place(|x| *x += 1)
32    ///            .inspect(|x| println!("seen: {:?}", x));
33    /// });
34    /// ```
35    fn map_in_place<L: FnMut(&mut D)+'static>(&self, logic: L) -> Stream<S, D>;
36    /// Consumes each element of the stream and yields some number of new elements.
37    ///
38    /// # Examples
39    /// ```
40    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
41    ///
42    /// timely::example(|scope| {
43    ///     (0..10).to_stream(scope)
44    ///            .flat_map(|x| (0..x))
45    ///            .inspect(|x| println!("seen: {:?}", x));
46    /// });
47    /// ```
48    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data;
49}
50
51impl<S: Scope, D: Data> Map<S, D> for Stream<S, D> {
52    fn map<D2: Data, L: FnMut(D)->D2+'static>(&self, mut logic: L) -> Stream<S, D2> {
53        let mut vector = Vec::new();
54        self.unary(Pipeline, "Map", move |_,_| move |input, output| {
55            input.for_each(|time, data| {
56                data.swap(&mut vector);
57                output.session(&time).give_iterator(vector.drain(..).map(|x| logic(x)));
58            });
59        })
60    }
61    fn map_in_place<L: FnMut(&mut D)+'static>(&self, mut logic: L) -> Stream<S, D> {
62        let mut vector = Vec::new();
63        self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| {
64            input.for_each(|time, data| {
65                data.swap(&mut vector);
66                for datum in vector.iter_mut() { logic(datum); }
67                output.session(&time).give_vec(&mut vector);
68            })
69        })
70    }
71    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
72    // TODO : number of elements from the iterator. This would allow iterators that produce many
73    // TODO : records without taking arbitrarily long and arbitrarily much memory.
74    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, mut logic: L) -> Stream<S, I::Item> where I::Item: Data {
75        let mut vector = Vec::new();
76        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
77            input.for_each(|time, data| {
78                data.swap(&mut vector);
79                output.session(&time).give_iterator(vector.drain(..).flat_map(|x| logic(x).into_iter()));
80            });
81        })
82    }
83}