use std::collections::BTreeMap;
use crate::container::{DrainContainer, PushInto};
use crate::progress::Timestamp;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::Stream;
use crate::{Container, ContainerBuilder};
/// Splits a stream of containers into several output streams.
///
/// `C` is the container type of the stream being split; its drained items are what the
/// routing function receives.
pub trait Partition<'scope, T: Timestamp, C: DrainContainer> {
/// Produces a vector of output streams whose records are chosen and assigned by `route`.
///
/// `route` is called with each item drained from the input containers and returns a pair
/// `(index, record)`: `record` (of type `D2`) is pushed into a container builder `CB`, and
/// `index` selects which output stream receives it. The index must be less than `parts`;
/// the `Stream` implementation in this file creates `parts` outputs and indexes them with
/// this value.
///
/// The output container type is `CB::Container`, so the caller picks the container builder
/// through the `CB` type parameter.
fn partition<CB, D2, F>(self, parts: u64, route: F) -> Vec<Stream<'scope, T, CB::Container>>
where
CB: ContainerBuilder + PushInto<D2>,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
}
impl<'scope, T: Timestamp, C: Container + DrainContainer> Partition<'scope, T, C> for Stream<'scope, T, C> {
    /// Builds a "Partition" operator with one input and `parts` outputs and returns the
    /// output streams in index order.
    ///
    /// For every time delivered on the input, each drained item is passed to `route`, the
    /// resulting records are grouped by their target output, and each group is packed into
    /// containers by a single shared `CB` and given to the matching output at that time.
    ///
    /// # Panics
    ///
    /// The running operator panics if `route` returns an output index that is not less
    /// than `parts`.
    fn partition<CB, D2, F>(self, parts: u64, mut route: F) -> Vec<Stream<'scope, T, CB::Container>>
    where
        CB: ContainerBuilder + PushInto<D2>,
        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
    {
        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
        let mut input = builder.new_input(self, Pipeline);
        // NOTE(review): presumably declares that this operator does not need frontier
        // updates for input 0 -- confirm against `FrontierInterest`.
        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);

        // The cast only sizes the pre-allocation; the loop below is what decides how many
        // outputs actually exist.
        let mut outputs = Vec::with_capacity(parts as usize);
        let mut streams = Vec::with_capacity(parts as usize);

        // One builder is shared by all outputs; it is fully flushed after each group, so
        // records destined for different outputs never share a container.
        let mut c_build = CB::default();

        for _ in 0..parts {
            let (output, stream) = builder.new_output::<CB::Container>();
            outputs.push(output);
            streams.push(stream);
        }

        builder.build(move |_| {
            move |_frontiers| {
                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
                // Staging area: target output index -> records routed to it. It is emptied
                // again before each invocation of the per-time closure returns.
                let mut targets = BTreeMap::<u64, Vec<_>>::default();
                input.for_each_time(|time, data| {
                    // Group the records of this time by their intended output.
                    for datum in data.flat_map(|d| d.drain()) {
                        let (part, datum) = route(datum);
                        targets.entry(part).or_default().push(datum);
                    }
                    // Turn each group into containers and ship them, in ascending output order.
                    while let Some((part, data)) = targets.pop_first() {
                        // Check once per group, comparing in `u64` so that an oversized
                        // index cannot be truncated into a valid-looking one by the cast.
                        assert!(
                            part < parts,
                            "Partition: `route` selected output {part}, but only {parts} outputs exist",
                        );
                        // Lossless: `part < parts`, and `handles` holds exactly `parts` entries.
                        let handle = &mut handles[part as usize];
                        for datum in data {
                            c_build.push_into(datum);
                            // Ship any containers the builder has completed so far.
                            while let Some(container) = c_build.extract() {
                                handle.give(&time, container);
                            }
                        }
                        // Flush whatever remains so the builder is empty for the next group.
                        while let Some(container) = c_build.finish() {
                            handle.give(&time, container);
                        }
                    }
                });
            }
        });

        streams
    }
}