use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::{Enter, Map};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::PartialOrder;
use timely::progress::Antichain;
use timely::progress::Timestamp;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::merge_batcher::container::MergerChunk;
use crate::trace::implementations::{
KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine,
};
use crate::trace::{self, BatchReader, Batcher, Builder, Cursor, Trace, TraceReader};
use crate::{AsCollection, Data, ExchangeData, Hashable, VecCollection};
use trace::wrappers::enter::{BatchEnter, TraceEnter};
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use super::TraceAgent;
/// A collection arranged into batches: a stream of batches paired with a trace reader.
///
/// When built by `arrange_core`, every batch sent on `stream` is also inserted into the trace
/// that `trace` reads from, so downstream operators can consume either the incremental batches
/// or the accumulated trace.
pub struct Arranged<G, Tr>
where
    G: Scope<Timestamp: Lattice + Ord>,
    Tr: TraceReader + Clone,
{
    /// Stream of batches of updates, as they are produced.
    pub stream: Stream<G, Tr::Batch>,
    /// Reader for the trace holding the arranged updates; `Clone`, so it can be shared.
    pub trace: Tr,
}
impl<G, Tr> Clone for Arranged<G, Tr>
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: TraceReader + Clone,
{
    /// Clones both halves of the arrangement: the batch stream handle and the trace reader.
    fn clone(&self) -> Self {
        let stream = self.stream.clone();
        let trace = self.trace.clone();
        Self { stream, trace }
    }
}
use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::container::PushInto;
use timely::Container;
impl<G, Tr> Arranged<G, Tr>
where
G: Scope<Timestamp = Tr::Time>,
Tr: TraceReader + Clone,
{
pub fn enter<'a, TInner>(
&self,
child: &Child<'a, G, TInner>,
) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
where
TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone,
{
Arranged {
stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
trace: TraceEnter::make_from(self.trace.clone()),
}
}
pub fn enter_region<'a>(
&self,
child: &Child<'a, G, G::Timestamp>,
) -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
Arranged {
stream: self.stream.enter(child),
trace: self.trace.clone(),
}
}
pub fn enter_at<'a, TInner, F, P>(
&self,
child: &Child<'a, G, TInner>,
logic: F,
prior: P,
) -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
where
TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone + 'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>) -> TInner + Clone + 'static,
P: FnMut(&TInner) -> Tr::Time + Clone + 'static,
{
let logic1 = logic.clone();
let logic2 = logic.clone();
Arranged {
trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
stream: self
.stream
.enter(child)
.map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
}
}
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
where
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
{
self.flat_map_ref(move |key, val| Some(logic(key, val)))
}
pub fn flat_map_ref<I, L>(&self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
where
I: IntoIterator<Item: Data>,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,
{
Self::flat_map_batches(&self.stream, logic)
}
pub fn flat_map_batches<I, L>(
stream: &Stream<G, Tr::Batch>,
mut logic: L,
) -> VecCollection<G, I::Item, Tr::Diff>
where
I: IntoIterator<Item: Data>,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,
{
stream
.unary(Pipeline, "AsCollection", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.iter() {
let batch = &wrapper;
let mut cursor = batch.cursor();
while let Some(key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((
datum.clone(),
Tr::owned_time(time),
Tr::owned_diff(diff),
));
});
}
cursor.step_val(batch);
}
cursor.step_key(batch);
}
}
});
}
})
.as_collection()
}
}
use crate::difference::Multiply;
impl<G, T1> Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: TraceReader + Clone + 'static,
{
    /// Joins `self` with `other` on their shared key type.
    ///
    /// For each pair of values matched under a key, `result(key, v1, v2)` is called. Every item
    /// it yields is emitted at the time supplied by the join, with difference `r1 * r2`
    /// computed via `Multiply`.
    pub fn join_core<T2, I, L>(
        &self,
        other: &Arranged<G, T2>,
        mut result: L,
    ) -> VecCollection<G, I::Item, <T1::Diff as Multiply<T2::Diff>>::Output>
    where
        T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
        T1::Diff: Multiply<T2::Diff, Output: Semigroup + 'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>) -> I + 'static,
    {
        // Adapt the data-only `result` into the lower-level form that returns full
        // `(data, time, diff)` triples.
        let with_time_and_diff = move |key: T1::Key<'_>,
                                       val1: T1::Val<'_>,
                                       val2: T2::Val<'_>,
                                       time: &G::Timestamp,
                                       diff1: &T1::Diff,
                                       diff2: &T2::Diff| {
            let time = time.clone();
            let product = diff1.clone().multiply(diff2);
            result(key, val1, val2)
                .into_iter()
                .map(move |datum| (datum, time.clone(), product.clone()))
        };
        self.join_core_internal_unsafe(other, with_time_and_diff)
    }

    /// Lower-level join in which `result` produces complete `(data, time, diff)` triples.
    ///
    /// `result` receives the key, both values, the join time and both differences. Every triple
    /// it yields is pushed into a `ConsolidatingContainerBuilder`.
    /// NOTE(review): the `_unsafe` suffix is not Rust `unsafe`. Presumably it means the caller
    /// is responsible for the emitted times and diffs; confirm.
    pub fn join_core_internal_unsafe<T2, I, L, D, ROut>(
        &self,
        other: &Arranged<G, T2>,
        mut result: L,
    ) -> VecCollection<G, D, ROut>
    where
        T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
        D: Data,
        ROut: Semigroup + 'static,
        I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
        L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>, &G::Timestamp, &T1::Diff, &T2::Diff) -> I
            + 'static,
    {
        use crate::operators::join::join_traces;
        join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
            self,
            other,
            move |key, val1, val2, time, diff1, diff2, builder| {
                result(key, val1, val2, time, diff1, diff2)
                    .into_iter()
                    .for_each(|triple| builder.give(triple));
            },
        )
        .as_collection()
    }
}
use crate::difference::Abelian;
impl<G, T1> Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: TraceReader + Clone + 'static,
{
    /// Reduces each key's values with `logic`, for output differences that can be negated.
    ///
    /// `logic` runs only for keys with non-empty input. It receives the key, the input
    /// `(value, diff)` pairs, and a buffer into which it writes the desired output. The
    /// operator then appends the negation of the output buffer supplied by `reduce_core`
    /// (presumably the key's existing output; confirm against `reduce_trace`) and
    /// consolidates the result.
    pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<
                Key<'a> = T1::Key<'a>,
                KeyOwn = T1::KeyOwn,
                ValOwn: Data,
                Time = T1::Time,
                Diff: Abelian,
            > + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Output = T2::Batch,
            Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
        >,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
            + 'static,
    {
        self.reduce_core::<_, Bu, T2>(name, move |key, input, output, change| {
            // With no input there is no desired output; only the retraction below applies.
            if !input.is_empty() {
                logic(key, input, change);
            }
            // Retract the supplied output by appending its negation. This drains `output`.
            for (val, mut diff) in output.drain(..) {
                diff.negate();
                change.push((val, diff));
            }
            crate::consolidation::consolidate(change);
        })
    }

    /// General per-key reduction, delegating to `reduce_trace`.
    ///
    /// `logic` receives the key, its input `(value, diff)` pairs, and two output buffers. In
    /// `reduce_abelian` the first buffer is drained as existing output and the second collects
    /// changes; confirm that contract against `reduce_trace`.
    pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
    where
        T1: TraceReader<KeyOwn: Ord>,
        T2: for<'a> Trace<Key<'a> = T1::Key<'a>, KeyOwn = T1::KeyOwn, ValOwn: Data, Time = T1::Time>
            + 'static,
        Bu: Builder<
            Time = G::Timestamp,
            Output = T2::Batch,
            Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
        >,
        L: FnMut(
                T1::Key<'_>,
                &[(T1::Val<'_>, T1::Diff)],
                &mut Vec<(T2::ValOwn, T2::Diff)>,
                &mut Vec<(T2::ValOwn, T2::Diff)>,
            ) + 'static,
    {
        crate::operators::reduce::reduce_trace::<_, _, Bu, _, _>(self, name, logic)
    }
}
impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: TraceReader + Clone,
{
    /// Moves an arrangement out of a region back into the parent scope.
    ///
    /// The region shares the parent's timestamp type, so only the stream leaves the scope; the
    /// trace reader is cloned unchanged.
    pub fn leave_region(&self) -> Arranged<G, Tr> {
        use timely::dataflow::operators::Leave;
        let stream = self.stream.leave();
        let trace = self.trace.clone();
        Arranged { stream, trace }
    }
}
/// Arranges a collection into a trace, with caller-chosen batcher, builder and trace types.
///
/// `C` is the container type that the batcher `Ba` consumes.
pub trait Arrange<G, C>
where
    G: Scope<Timestamp: Lattice>,
{
    /// Arranges `self` under the default operator name `"Arrange"`.
    fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input = C, Time = G::Timestamp> + 'static,
        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time = G::Timestamp> + 'static,
    {
        let default_name = "Arrange";
        self.arrange_named::<Ba, Bu, Tr>(default_name)
    }

    /// Arranges `self`, naming the arrangement operator `name`.
    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input = C, Time = G::Timestamp> + 'static,
        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time = G::Timestamp> + 'static;
}
impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for VecCollection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice>,
    K: ExchangeData + Hashable,
    V: ExchangeData,
    R: ExchangeData + Semigroup,
{
    /// Routes each `((key, val), time, diff)` update to a worker by the hash of `key`, then
    /// arranges the exchanged stream with `arrange_core`.
    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input = Vec<((K, V), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time = G::Timestamp> + 'static,
    {
        let by_key_hash = Exchange::new(move |((key, _), _, _): &((K, V), G::Timestamp, R)| {
            key.hashed().into()
        });
        arrange_core::<_, _, Ba, Bu, _>(&self.inner, by_key_hash, name)
    }
}
/// Builds an arrangement operator over `stream`, distributing input with `pact`.
///
/// Incoming updates are pushed into a batcher of type `Ba`. Whenever the input frontier
/// advances, the operator:
/// * seals batches of updates no longer in advance of the frontier (using builder `Bu`);
/// * inserts each batch into a trace of type `Tr`;
/// * sends each batch on the output stream.
///
/// The returned `Arranged` pairs that output stream with a `TraceAgent` reader for the trace.
///
/// # Panics
/// * If the input frontier ever regresses (checked by an `assert!`).
/// * If no held capability is `<=` some time in the batcher's frontier
///   ("failed to find capability").
pub fn arrange_core<G, P, Ba, Bu, Tr>(
    stream: &StreamCore<G, Ba::Input>,
    pact: P,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time = G::Timestamp, Input: Container> + 'static,
    Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time = G::Timestamp> + 'static,
{
    // Filled in by the operator constructor below. `reader.unwrap()` at the end relies on
    // `unary_frontier` running that constructor before it returns.
    let mut reader: Option<TraceAgent<Tr>> = None;
    let reader_ref = &mut reader;
    let scope = stream.scope();
    let stream = stream.unary_frontier(pact, name, move |_capability, info| {
        // Optional logger for arrange events, shared by the batcher, trace, and trace agent.
        let logger = scope
            .logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange")
            .map(Into::into);
        // Buffers received updates until they can be sealed into batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);
        // Capabilities retained for updates still held by `batcher`. They are downgraded to the
        // batcher's frontier after each round of sealing.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // Install default exertion logic if the worker config provides one.
        if let Some(exert_logic) = scope
            .config()
            .get::<trace::ExertionLogic>("differential/default_exert_logic")
            .cloned()
        {
            empty_trace.set_exert_logic(exert_logic);
        }
        // `writer` stays with the operator; the reader is handed back to the caller.
        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
        *reader_ref = Some(reader_local);
        // Frontier observed at the previous activation, starting at the minimal timestamp.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
        move |(input, frontier), output| {
            // Stash incoming data and keep its capability so output can be sent at that time.
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });
            // The input frontier must never move backwards.
            assert!(PartialOrder::less_equal(
                &prev_frontier.borrow(),
                &frontier.frontier()
            ));
            // There is only work to do when the frontier has strictly advanced.
            if prev_frontier.borrow() != frontier.frontier() {
                // Case 1: some held capability is no longer in advance of the frontier, so
                // updates at those times are complete and must be sealed and sent.
                if capabilities
                    .elements()
                    .iter()
                    .any(|c| !frontier.less_equal(c.time()))
                {
                    // Allocation reused for each batch's upper bound.
                    let mut upper = Antichain::new();
                    for (index, capability) in capabilities.elements().iter().enumerate() {
                        if !frontier.less_equal(capability.time()) {
                            // Each output message goes out under a single capability. The batch
                            // for this capability is therefore bounded by the input frontier AND
                            // the later capabilities, retiring capabilities one at a time.
                            upper.clear();
                            for time in frontier.frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1)..] {
                                upper.insert(other_capability.time().clone());
                            }
                            // Extract updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());
                            // Record the batch in the trace, passing the capability time along,
                            // then send it downstream, even if it is empty.
                            writer.insert(batch.clone(), Some(capability.time().clone()));
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }
                    // Downgrade the held capabilities to the batcher's remaining frontier. This
                    // drops capabilities that no longer cover any buffered update.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities
                            .elements()
                            .iter()
                            .find(|c| c.time().less_equal(time))
                        {
                            new_capabilities.insert(capability.delayed(time));
                        } else {
                            panic!("failed to find capability");
                        }
                    }
                    capabilities = new_capabilities;
                } else {
                    // Case 2: no held capability is behind the frontier. The sealed batch is
                    // discarded (presumably empty, if `capabilities` covers every buffered
                    // update); only the trace's upper bound is advanced, so progress is
                    // announced without sending data.
                    let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
                    writer.seal(frontier.frontier().to_owned());
                }
                prev_frontier.clear();
                prev_frontier.extend(frontier.frontier().iter().cloned());
            }
            // Give the trace an opportunity to do maintenance work on every activation.
            writer.exert();
        }
    });
    Arranged {
        stream,
        trace: reader.unwrap(),
    }
}
impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup>
    Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for VecCollection<G, K, R>
where
    G: Scope<Timestamp: Lattice + Ord>,
{
    /// Arranges a key-only collection: each key is paired with `()`, routed to a worker by the
    /// key's hash, and arranged with `arrange_core`.
    fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        Ba: Batcher<Input = Vec<((K, ()), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
        Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
        Tr: Trace<Time = G::Timestamp> + 'static,
    {
        let by_key_hash = Exchange::new(move |((key, _), _, _): &((K, ()), G::Timestamp, R)| {
            key.hashed().into()
        });
        let keyed = self.map(|key| (key, ()));
        arrange_core::<_, _, Ba, Bu, _>(&keyed.inner, by_key_hash, name)
    }
}
/// Arranges a collection of `(key, val)` pairs by key into a `ValSpine`-backed trace.
pub trait ArrangeByKey<G: Scope, K: Data + Hashable, V: Data, R: Ord + Semigroup + 'static>
where
    G: Scope<Timestamp: Lattice + Ord>,
{
    /// Arranges by key using a default operator name.
    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
    /// Arranges by key, naming the arrangement operator `name`.
    fn arrange_by_key_named(
        &self,
        name: &str,
    ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
}
impl<G, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup>
    ArrangeByKey<G, K, V, R> for VecCollection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice + Ord>,
{
    /// Arranges by key under the default operator name `"ArrangeByKey"`.
    fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
        const DEFAULT_NAME: &str = "ArrangeByKey";
        self.arrange_by_key_named(DEFAULT_NAME)
    }

    /// Arranges by key with the `ValBatcher` / `ValBuilder` pair, naming the operator `name`.
    /// The spine type is inferred from the return type.
    fn arrange_by_key_named(
        &self,
        name: &str,
    ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
        self.arrange_named::<ValBatcher<_, _, _, _>, ValBuilder<_, _, _, _>, _>(name)
    }
}
/// Arranges a collection of keys (with no values) into a `KeySpine`-backed trace.
pub trait ArrangeBySelf<G, K: Data + Hashable, R: Ord + Semigroup + 'static>
where
    G: Scope<Timestamp: Lattice + Ord>,
{
    /// Arranges the keys using a default operator name.
    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
    /// Arranges the keys, naming the arrangement operator `name`.
    fn arrange_by_self_named(
        &self,
        name: &str,
    ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
}
impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ArrangeBySelf<G, K, R>
    for VecCollection<G, K, R>
where
    G: Scope<Timestamp: Lattice + Ord>,
{
    /// Arranges the keys under the default operator name `"ArrangeBySelf"`.
    fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
        const DEFAULT_NAME: &str = "ArrangeBySelf";
        self.arrange_by_self_named(DEFAULT_NAME)
    }

    /// Pairs every key with the unit value `()`, then arranges with the key-only
    /// `KeyBatcher` / `KeyBuilder` pair, naming the operator `name`.
    fn arrange_by_self_named(
        &self,
        name: &str,
    ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
        let keyed = self.map(|key| (key, ()));
        keyed.arrange_named::<KeyBatcher<_, _, _>, KeyBuilder<_, _, _>, _>(name)
    }
}