timely/dataflow/operators/
partition.rs1use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
5use crate::dataflow::{Scope, Stream};
6use crate::Data;
7
8pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
10 fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>;
26}
27
28impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
29 fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
30 let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
31
32 let mut input = builder.new_input(self, Pipeline);
33 let mut outputs = Vec::with_capacity(parts as usize);
34 let mut streams = Vec::with_capacity(parts as usize);
35
36 for _ in 0 .. parts {
37 let (output, stream) = builder.new_output();
38 outputs.push(output);
39 streams.push(stream);
40 }
41
42 builder.build(move |_| {
43 let mut vector = Vec::new();
44 move |_frontiers| {
45 let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
46 input.for_each(|time, data| {
47 data.swap(&mut vector);
48 let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();
49
50 for datum in vector.drain(..) {
51 let (part, datum2) = route(datum);
52 sessions[part as usize].give(datum2);
53 }
54 });
55 }
56 });
57
58 streams
59 }
60}