timely/dataflow/operators/partition.rs

1//! Partition a stream of records into multiple streams.
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
5use crate::dataflow::{Scope, Stream};
6use crate::Data;
7
8/// Partition a stream of records into multiple streams.
9pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
10    /// Produces `parts` output streams, containing records produced and assigned by `route`.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Partition, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///     let streams = (0..10).to_stream(scope)
18    ///                          .partition(3, |x| (x % 3, x));
19    ///
20    ///     streams[0].inspect(|x| println!("seen 0: {:?}", x));
21    ///     streams[1].inspect(|x| println!("seen 1: {:?}", x));
22    ///     streams[2].inspect(|x| println!("seen 2: {:?}", x));
23    /// });
24    /// ```
25    fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>;
26}
27
28impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
29    fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
30        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
31
32        let mut input = builder.new_input(self, Pipeline);
33        let mut outputs = Vec::with_capacity(parts as usize);
34        let mut streams = Vec::with_capacity(parts as usize);
35
36        for _ in 0 .. parts {
37            let (output, stream) = builder.new_output();
38            outputs.push(output);
39            streams.push(stream);
40        }
41
42        builder.build(move |_| {
43            let mut vector = Vec::new();
44            move |_frontiers| {
45                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
46                input.for_each(|time, data| {
47                    data.swap(&mut vector);
48                    let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();
49
50                    for datum in vector.drain(..) {
51                        let (part, datum2) = route(datum);
52                        sessions[part as usize].give(datum2);
53                    }
54                });
55            }
56        });
57
58        streams
59    }
60}