use timely::dataflow::operators::{Enter, vec::Map};
use timely::order::PartialOrder;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;
use crate::{Data, VecCollection, AsCollection};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use super::TraceAgent;
/// An arranged collection: a stream of batches paired with a trace handle.
///
/// The stream announces each batch of updates as it is formed, while the trace
/// allows subsequent operators to randomly access the accumulated collection.
pub struct Arranged<'scope, Tr>
where
Tr: TraceReader+Clone,
{
/// A stream containing arranged updates, grouped into batches of type `Tr::Batch`.
pub stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>,
/// A handle to the trace accumulating the batches above.
pub trace: Tr,
}
impl<'scope, Tr> Clone for Arranged<'scope, Tr>
where
    Tr: TraceReader + Clone,
{
    /// Duplicates the arrangement by cloning both the batch stream and the trace handle.
    fn clone(&self) -> Self {
        let stream = self.stream.clone();
        let trace = self.trace.clone();
        Self { stream, trace }
    }
}
use ::timely::progress::timestamp::Refines;
use timely::Container;
impl<'scope, Tr> Arranged<'scope, Tr>
where
    Tr: TraceReader + Clone,
{
    /// Brings an arranged collection into a nested scope.
    ///
    /// Both the trace and each batch are wrapped so that their timestamps appear
    /// refined into `TInner`; the underlying data are not copied.
    pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter<Tr, TInner>>
    where
        TInner: Refines<Tr::Time>+Lattice,
    {
        Arranged {
            // `make_from` is a unary conversion; pass the function item directly
            // rather than wrapping it in a redundant closure (clippy: redundant_closure).
            stream: self.stream.enter(child).map(BatchEnter::make_from),
            trace: TraceEnter::make_from(self.trace),
        }
    }
    /// Brings an arranged collection into a nested region.
    ///
    /// The timestamp type is unchanged, so the stream enters as-is and the trace
    /// handle is reused directly.
    pub fn enter_region<'inner>(self, child: Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> {
        Arranged {
            stream: self.stream.enter(child),
            trace: self.trace,
        }
    }
    /// Brings an arranged collection into a nested scope, placing each update at a
    /// timestamp computed by `logic`.
    ///
    /// `prior` maps an inner timestamp back to an outer timestamp; presumably it must
    /// agree with `logic` for progress tracking to remain sound — confirm at call sites.
    pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt<Tr, TInner, F, P>>
    where
        TInner: Refines<Tr::Time>+Lattice+'static,
        F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
        P: FnMut(&TInner)->Tr::Time+Clone+'static,
    {
        // The trace wrapper and the per-batch wrappers each need their own copy of `logic`.
        let logic1 = logic.clone();
        let logic2 = logic.clone();
        Arranged {
            trace: TraceEnterAt::make_from(self.trace, logic1, prior),
            // The closure runs once per batch, so it clones its captured logic each time.
            stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
        }
    }
    /// Flattens the stream of batches into a collection of the containers that
    /// `logic` produces from each batch.
    pub fn as_container<I, L>(self, mut logic: L) -> crate::Collection<'scope, Tr::Time, I::Item>
    where
        I: IntoIterator<Item: Container>,
        L: FnMut(Tr::Batch) -> I+'static,
    {
        self.stream.unary(Pipeline, "AsContainer", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.drain(..) {
                    for mut container in logic(wrapper) {
                        session.give_container(&mut container);
                    }
                }
            });
        })
        .as_collection()
    }
    /// Flattens the arrangement into a collection of owned data, applying `logic`
    /// to each key/value pair.
    pub fn as_collection<D: Data, L>(self, mut logic: L) -> VecCollection<'scope, Tr::Time, D, Tr::Diff>
    where
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
    {
        self.flat_map_ref(move |key, val| Some(logic(key,val)))
    }
    /// Flattens the arrangement into a collection of owned `(key, value)` pairs,
    /// for traces whose keys and values are references.
    pub fn as_vecs<K, V>(self) -> VecCollection<'scope, Tr::Time, (K, V), Tr::Diff>
    where
        K: crate::ExchangeData,
        V: crate::ExchangeData,
        Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>,
    {
        self.flat_map_ref(move |key, val| [(key.clone(), val.clone())])
    }
    /// Extracts elements from the arrangement as a collection, applying `logic` to
    /// each key/value pair and emitting every datum it yields.
    ///
    /// This is a thin wrapper around [`Self::flat_map_batches`] applied to `self.stream`.
    pub fn flat_map_ref<I, L>(self, logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        Self::flat_map_batches(self.stream, logic)
    }
    /// Extracts elements from a stream of batches as a collection.
    ///
    /// Walks every batch with a cursor, applies `logic` to each `(key, val)` pair,
    /// and emits one `(datum, time, diff)` update per recorded time for each datum.
    /// Exposed so the logic can be applied to batch streams that are not paired
    /// with a trace handle.
    pub fn flat_map_batches<I, L>(stream: Stream<'scope, Tr::Time, Vec<Tr::Batch>>, mut logic: L) -> VecCollection<'scope, Tr::Time, I::Item, Tr::Diff>
    where
        I: IntoIterator<Item: Data>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
    {
        stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                let mut session = output.session(&time);
                for wrapper in data.iter() {
                    let batch = &wrapper;
                    let mut cursor = batch.cursor();
                    // Standard cursor walk: keys, then values per key, then times per value.
                    while let Some(key) = cursor.get_key(batch) {
                        while let Some(val) = cursor.get_val(batch) {
                            for datum in logic(key, val) {
                                cursor.map_times(batch, |time, diff| {
                                    session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
                                });
                            }
                            cursor.step_val(batch);
                        }
                        cursor.step_key(batch);
                    }
                }
            });
        })
        .as_collection()
    }
}
use crate::difference::Multiply;
impl<'scope, Tr1> Arranged<'scope, Tr1>
where
Tr1: TraceReader + Clone + 'static,
{
pub fn join_core<Tr2,I,L>(self, other: Arranged<'scope, Tr2>, mut result: L) -> VecCollection<'scope, Tr1::Time,I::Item,<Tr1::Diff as Multiply<Tr2::Diff>>::Output>
where
Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>,Time=Tr1::Time>+Clone+'static,
Tr1::Diff: Multiply<Tr2::Diff, Output: Semigroup+'static>,
I: IntoIterator<Item: Data>,
L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>)->I+'static
{
let mut result = move |k: Tr1::Key<'_>, v1: Tr1::Val<'_>, v2: Tr2::Val<'_>, t: &Tr1::Time, r1: &Tr1::Diff, r2: &Tr2::Diff| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
use crate::operators::join::join_traces;
join_traces::<_, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
self,
other,
move |k, v1, v2, t, d1, d2, c| {
for datum in result(k, v1, v2, t, d1, d2) {
c.give(datum);
}
}
)
.as_collection()
}
}
use crate::difference::Abelian;
impl<'scope, Tr1> Arranged<'scope, Tr1>
where
Tr1: TraceReader + Clone + 'static,
{
/// Reduces the values for each key, producing an arranged output with an abelian diff type.
///
/// `logic` writes the *desired* output values for a key; this method converts them into
/// incremental changes by subtracting (negating) the current output and consolidating,
/// so only the net difference is recorded.
pub fn reduce_abelian<L, Bu, Tr2, P>(self, name: &str, mut logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
where
Tr2: for<'a> Trace<
Key<'a>= Tr1::Key<'a>,
ValOwn: Data,
Time=Tr1::Time,
Diff: Abelian,
>+'static,
Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
{
self.reduce_core::<_,Bu,Tr2,_>(name, move |key, input, output, change| {
// With no input values the desired output is empty; skip user logic and
// fall through to retracting whatever is currently present.
if !input.is_empty() {
logic(key, input, change);
}
// Subtract the current output, then cancel matching additions/retractions.
change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
crate::consolidation::consolidate(change);
}, push)
}
/// General reduction: `logic` receives the key, its input values, the current output
/// values, and a buffer into which it writes output *changes* directly.
pub fn reduce_core<L, Bu, Tr2, P>(self, name: &str, logic: L, push: P) -> Arranged<'scope, TraceAgent<Tr2>>
where
Tr2: for<'a> Trace<
Key<'a>=Tr1::Key<'a>,
ValOwn: Data,
Time=Tr1::Time,
>+'static,
Bu: Builder<Time=Tr1::Time, Output = Tr2::Batch, Input: Default>,
L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn, Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,Bu,_,_,_>(self, name, logic, push)
}
}
impl<'scope, Tr> Arranged<'scope, Tr>
where
    Tr: TraceReader + Clone,
{
    /// Returns the arrangement to the containing region.
    ///
    /// The timestamp type is unchanged, so only the stream needs to leave the
    /// region; the trace handle is reused directly.
    pub fn leave_region<'outer>(self, outer: Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> {
        use timely::dataflow::operators::Leave;
        let Arranged { stream, trace } = self;
        Arranged {
            stream: stream.leave(outer),
            trace,
        }
    }
}
/// Types that can be arranged into a trace, yielding an [`Arranged`].
///
/// `T` is the timestamp type and `C` the input container type consumed by the batcher.
pub trait Arrange<'scope, T, C> : Sized
where
T: Timestamp + Lattice,
{
/// Arranges the contents using the default operator name, "Arrange".
fn arrange<Ba, Bu, Tr>(self) -> Arranged<'scope, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=T> + 'static,
Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=T> + 'static,
{
self.arrange_named::<Ba, Bu, Tr>("Arrange")
}
/// Arranges the contents, naming the arrangement operator `name`.
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=T> + 'static,
Bu: Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=T> + 'static,
;
}
/// Arranges a stream of updates into a shared trace, using a caller-supplied
/// parallelization contract `pact`, batcher `Ba`, and builder `Bu`.
///
/// Returns an [`Arranged`] pairing the stream of sealed batches with a
/// [`TraceAgent`] handle onto the trace that accumulates them.
pub fn arrange_core<'scope, P, Ba, Bu, Tr>(stream: Stream<'scope, Tr::Time, Ba::Input>, pact: P, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
where
P: ParallelizationContract<Tr::Time, Ba::Input>,
Ba: Batcher<Time=Tr::Time,Input: Container> + 'static,
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace+'static,
{
// The trace handle is created inside the operator constructor; smuggle it out
// through this mutable slot so it can be returned alongside the output stream.
let mut reader: Option<TraceAgent<Tr>> = None;
let reader_ref = &mut reader;
let scope = stream.scope();
let stream = stream.unary_frontier(pact, name, move |_capability, info| {
let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
// Accumulates incoming updates until the input frontier allows sealing a batch.
let mut batcher = Ba::new(logger.clone(), info.global_id);
// Capabilities for timestamps at which we may still need to emit batches.
let mut capabilities = Antichain::<Capability<Tr::Time>>::new();
let activator = Some(scope.activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// Install a worker-wide default exert logic, if one is configured.
if let Some(exert_logic) = scope.worker().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
*reader_ref = Some(reader_local);
// The previously observed input frontier; work happens only when it advances.
let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());
move |(input, frontier), output| {
// Stash all arriving updates and retain a capability for each input time.
input.for_each(|cap, data| {
capabilities.insert(cap.retain(0));
batcher.push_container(data);
});
// Frontiers must only ever advance.
assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));
if prev_frontier.borrow() != frontier.frontier() {
// Seal batches only if some held capability is no longer ahead of the frontier.
if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
let mut upper = Antichain::new();
for (index, capability) in capabilities.elements().iter().enumerate() {
if !frontier.less_equal(capability.time()) {
// The batch's upper bound: the input frontier joined with the times of
// all not-yet-processed capabilities (their updates remain pending).
upper.clear();
for time in frontier.frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1) .. ] {
upper.insert(other_capability.time().clone());
}
// Seal everything below `upper`, install the batch in the trace, and
// announce it downstream at this capability.
let batch = batcher.seal::<Bu>(upper.clone());
writer.insert(batch.clone(), Some(capability.time().clone()));
output.session(&capabilities.elements()[index]).give(batch);
}
}
// Downgrade retained capabilities to the batcher's frontier: every pending
// time must still be reachable from some capability we hold.
let mut new_capabilities = Antichain::new();
for time in batcher.frontier().iter() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
}
else {
// Invariant violation: a pending update time with no covering capability.
panic!("failed to find capability");
}
}
capabilities = new_capabilities;
}
else {
// Nothing to emit: still seal (discarding the empty batch) and advance the
// trace's frontier so readers observe progress.
let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
writer.seal(frontier.frontier().to_owned());
}
prev_frontier.clear();
prev_frontier.extend(frontier.frontier().iter().cloned());
}
// Give the trace an opportunity to perform merge work.
writer.exert();
}
});
Arranged { stream, trace: reader.unwrap() }
}