1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
//! Extension methods for `Stream` based on record-by-record transformation. use Data; use dataflow::{Stream, Scope}; use dataflow::channels::pact::Pipeline; use dataflow::operators::generic::unary::Unary; /// Extension trait for `Stream`. pub trait Map<S: Scope, D: Data> { /// Consumes each element of the stream and yields a new element. /// /// #Examples /// ``` /// use timely::dataflow::operators::{ToStream, Map, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .map(|x| x + 1) /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` fn map<D2: Data, L: Fn(D)->D2+'static>(&self, logic: L) -> Stream<S, D2>; /// Updates each element of the stream and yields the element, re-using memory where possible. /// /// #Examples /// ``` /// use timely::dataflow::operators::{ToStream, Map, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .map_in_place(|x| *x += 1) /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` fn map_in_place<L: Fn(&mut D)+'static>(&self, logic: L) -> Stream<S, D>; /// Consumes each element of the stream and yields some number of new elements. /// /// #Examples /// ``` /// use timely::dataflow::operators::{ToStream, Map, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) /// .flat_map(|x| (0..x)) /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` fn flat_map<I: IntoIterator, L: Fn(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data; } impl<S: Scope, D: Data> Map<S, D> for Stream<S, D> { fn map<D2: Data, L: Fn(D)->D2+'static>(&self, logic: L) -> Stream<S, D2> { self.unary_stream(Pipeline, "Map", move |input, output| { input.for_each(|time, data| { output.session(&time).give_iterator(data.drain(..).map(|x| logic(x))); }); }) } fn map_in_place<L: Fn(&mut D)+'static>(&self, logic: L) -> Stream<S, D> { self.unary_stream(Pipeline, "MapInPlace", move |input, output| { input.for_each(|time, data| { for datum in data.iter_mut() { logic(datum); } output.session(&time).give_content(data); }) }) } // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. fn flat_map<I: IntoIterator, L: Fn(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data { self.unary_stream(Pipeline, "FlatMap", move |input, output| { input.for_each(|time, data| { output.session(&time).give_iterator(data.drain(..).flat_map(|x| logic(x).into_iter())); }); }) } // fn filter_map<D2: Data, L: Fn(D)->Option<D2>+'static>(&self, logic: L) -> Stream<S, D2> { // self.unary_stream(Pipeline, "FilterMap", move |input, output| { // while let Some((time, data)) = input.next() { // output.session(time).give_iterator(data.drain(..).filter_map(|x| logic(x))); // } // }) // } }