use std::hash::Hash;
use timely::dataflow::operators::*;
use timely::dataflow::scopes::{child::Iterative, Child};
use timely::dataflow::Scope;
use timely::dataflow::StreamCore;
use timely::order::Product;
use timely::progress::Timestamp;
use timely::{Container, Data};
use crate::difference::{Abelian, Multiply, Semigroup};
use crate::hashable::Hashable;
use crate::lattice::Lattice;
/// A [`Collection`] whose containers are `Vec`s of `(D, G::Timestamp, R)` triples.
///
/// The third type parameter `R` defaults to `isize` when it is not specified.
pub type VecCollection<G, D, R = isize> = Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
#[derive(Clone)]
pub struct Collection<G: Scope, C> {
pub inner: timely::dataflow::StreamCore<G, C>,
}
impl<G: Scope, C> Collection<G, C> {
pub fn new(stream: StreamCore<G, C>) -> Self {
Self { inner: stream }
}
}
impl<G: Scope, C: Container> Collection<G, C> {
pub fn concat(&self, other: &Self) -> Self {
self.inner.concat(&other.inner).as_collection()
}
pub fn concatenate<I>(&self, sources: I) -> Self
where
I: IntoIterator<Item = Self>,
{
self.inner
.concatenate(sources.into_iter().map(|x| x.inner))
.as_collection()
}
pub fn enter_region<'a>(
&self,
child: &Child<'a, G, <G as ScopeParent>::Timestamp>,
) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
self.inner.enter(child).as_collection()
}
pub fn inspect_container<F>(&self, func: F) -> Self
where
F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,
{
self.inner.inspect_container(func).as_collection()
}
pub fn probe(&self) -> probe::Handle<G::Timestamp> {
self.inner.probe()
}
pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
Self::new(self.inner.probe_with(handle))
}
pub fn scope(&self) -> G {
self.inner.scope()
}
pub fn negate(&self) -> Self
where
C: containers::Negate,
{
use timely::dataflow::channels::pact::Pipeline;
self.inner
.unary(Pipeline, "Negate", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
output
.session(&time)
.give_container(&mut std::mem::take(data).negate())
});
}
})
.as_collection()
}
pub fn enter<'a, T>(
&self,
child: &Child<'a, G, T>,
) -> Collection<
Child<'a, G, T>,
<C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer,
>
where
C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
T: Refines<<G as ScopeParent>::Timestamp>,
{
use timely::dataflow::channels::pact::Pipeline;
self.inner
.enter(child)
.unary(Pipeline, "Enter", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
output
.session(&time)
.give_container(&mut std::mem::take(data).enter())
});
}
})
.as_collection()
}
pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
where
C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
{
use timely::dataflow::channels::pact::Pipeline;
self.inner
.unary(Pipeline, "ResultsIn", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
output
.session(&time)
.give_container(&mut std::mem::take(data).results_in(&step))
});
}
})
.as_collection()
}
}
impl<G: Scope, D: Clone + 'static, R: Clone + 'static> VecCollection<G, D, R> {
    /// Creates a new collection by applying `logic` to the data of each record.
    ///
    /// The timestamp and difference of each record are passed through unchanged.
    pub fn map<D2, L>(&self, mut logic: L) -> VecCollection<G, D2, R>
    where
        D2: Data,
        L: FnMut(D) -> D2 + 'static,
    {
        self.inner
            .map(move |(data, time, delta)| (logic(data), time, delta))
            .as_collection()
    }

    /// Creates a new collection by letting `logic` mutate the data of each record
    /// in place.
    ///
    /// Only the data component is exposed to `logic`; timestamps and differences
    /// are untouched.
    pub fn map_in_place<L>(&self, mut logic: L) -> VecCollection<G, D, R>
    where
        L: FnMut(&mut D) + 'static,
    {
        self.inner
            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
            .as_collection()
    }

    /// Creates a new collection by applying `logic` to the data of each record and
    /// emitting one record per item of the returned iterator.
    ///
    /// Every emitted record carries a clone of the timestamp and difference of the
    /// record it was derived from.
    pub fn flat_map<I, L>(&self, mut logic: L) -> VecCollection<G, I::Item, R>
    where
        G::Timestamp: Clone,
        I: IntoIterator<Item: Data>,
        L: FnMut(D) -> I + 'static,
    {
        self.inner
            .flat_map(move |(data, time, delta)| {
                logic(data)
                    .into_iter()
                    .map(move |x| (x, time.clone(), delta.clone()))
            })
            .as_collection()
    }

    /// Creates a new collection containing only the records whose data satisfies
    /// `logic`.
    pub fn filter<L>(&self, mut logic: L) -> VecCollection<G, D, R>
    where
        L: FnMut(&D) -> bool + 'static,
    {
        self.inner
            .filter(move |(data, _, _)| logic(data))
            .as_collection()
    }

    /// Replaces each record with the `(data, diff)` pairs produced by `logic`.
    ///
    /// Each produced pair keeps the timestamp of the input record, and its
    /// difference is the produced `R2` multiplied by the input record's `R`.
    pub fn explode<D2, R2, I, L>(
        &self,
        mut logic: L,
    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: Data,
        R2: Semigroup + Multiply<R, Output: Semigroup + 'static>,
        I: IntoIterator<Item = (D2, R2)>,
        L: FnMut(D) -> I + 'static,
    {
        self.inner
            .flat_map(move |(x, t, d)| {
                logic(x)
                    .into_iter()
                    .map(move |(x, d2)| (x, t.clone(), d2.multiply(&d)))
            })
            .as_collection()
    }

    /// Replaces each record with the `(data, time, diff)` triples produced by `logic`.
    ///
    /// Each produced triple gets the lattice join of the input timestamp with the
    /// produced timestamp, and the produced `R2` multiplied by the input `R`.
    pub fn join_function<D2, R2, I, L>(
        &self,
        mut logic: L,
    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
    where
        G::Timestamp: Lattice,
        D2: Data,
        R2: Semigroup + Multiply<R, Output: Semigroup + 'static>,
        I: IntoIterator<Item = (D2, G::Timestamp, R2)>,
        L: FnMut(D) -> I + 'static,
    {
        self.inner
            .flat_map(move |(x, t, d)| {
                logic(x)
                    .into_iter()
                    .map(move |(x, t2, d2)| (x, t.join(&t2), d2.multiply(&d)))
            })
            .as_collection()
    }

    /// Brings the collection into the iterative scope `child`, using `initial` to
    /// choose the inner timestamp coordinate of each record.
    ///
    /// Each record's new timestamp is `Product::new(time, initial(&data))`, where
    /// `time` is the record's timestamp in the outer scope.
    pub fn enter_at<'a, T, F>(
        &self,
        child: &Iterative<'a, G, T>,
        mut initial: F,
    ) -> VecCollection<Iterative<'a, G, T>, D, R>
    where
        T: Timestamp + Hash,
        F: FnMut(&D) -> T + Clone + 'static,
        G::Timestamp: Hash,
    {
        self.inner
            .enter(child)
            .map(move |(data, time, diff)| {
                let new_time = Product::new(time, initial(&data));
                (data, new_time, diff)
            })
            .as_collection()
    }

    /// Delays the collection according to `func`.
    ///
    /// `func` is used twice: once by `delay_batch` on the timestamp it is handed,
    /// and once to rewrite the timestamp stored in each record.
    ///
    /// NOTE(review): presumably `func` must only advance timestamps; nothing in
    /// this method checks that. Confirm against the `delay_batch` contract.
    pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
    where
        G::Timestamp: Hash,
        F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
    {
        // Two independent copies are needed because each closure below takes
        // ownership of its own; the original is moved rather than cloned twice.
        let mut func1 = func.clone();
        let mut func2 = func;
        self.inner
            .delay_batch(move |x| func1(x))
            .map_in_place(move |x| x.1 = func2(&x.1))
            .as_collection()
    }

    /// Passes each `(data, time, diff)` record to `func` via `inspect` on the
    /// underlying stream, returning the inspected stream as a collection.
    pub fn inspect<F>(&self, func: F) -> VecCollection<G, D, R>
    where
        F: FnMut(&(D, G::Timestamp, R)) + 'static,
    {
        self.inner.inspect(func).as_collection()
    }

    /// Passes each batch of records, together with the timestamp supplied by
    /// `inspect_batch`, to `func`, returning the inspected stream as a collection.
    pub fn inspect_batch<F>(&self, mut func: F) -> VecCollection<G, D, R>
    where
        F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)]) + 'static,
    {
        self.inner
            .inspect_batch(move |time, data| func(time, data))
            .as_collection()
    }

    /// Installs a check that panics if the consolidated collection ever presents
    /// a record.
    ///
    /// The collection is consolidated first, and the result is inspected with a
    /// closure that panics unconditionally on any record it is handed.
    ///
    /// # Panics
    ///
    /// The installed closure panics with
    /// `"Assertion failed: non-empty collection: ..."` for any record it receives.
    pub fn assert_empty(&self)
    where
        D: crate::ExchangeData + Hashable,
        R: crate::ExchangeData + Hashable + Semigroup,
        G::Timestamp: Lattice + Ord,
    {
        self.consolidate()
            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
    }
}
use timely::dataflow::scopes::ScopeParent;
use timely::progress::timestamp::Refines;
impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
where
C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
T: Refines<<G as ScopeParent>::Timestamp>,
{
pub fn leave(
&self,
) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
use timely::dataflow::channels::pact::Pipeline;
self.inner
.leave()
.unary(Pipeline, "Leave", move |_, _| {
move |input, output| {
input.for_each(|time, data| {
output
.session(&time)
.give_container(&mut std::mem::take(data).leave())
});
}
})
.as_collection()
}
}
impl<G: Scope, C: Container + Data> Collection<Child<'_, G, G::Timestamp>, C> {
pub fn leave_region(&self) -> Collection<G, C> {
self.inner.leave().as_collection()
}
}
impl<G: Scope<Timestamp: Data>, D: Clone + 'static, R: Abelian + 'static> VecCollection<G, D, R> {
    /// Installs a check that `self` and `other` hold the same records.
    ///
    /// This negates `self`, concatenates the result with `other`, and calls
    /// `assert_empty` on the combination, so any panic comes from that check.
    pub fn assert_eq(&self, other: &Self)
    where
        D: crate::ExchangeData + Hashable,
        R: crate::ExchangeData + Hashable,
        G::Timestamp: Lattice + Ord,
    {
        let difference = self.negate().concat(other);
        difference.assert_empty();
    }
}
/// Conversion of a value into a [`Collection`] over scope `G` with containers `C`.
pub trait AsCollection<G: Scope, C> {
/// Returns a `Collection<G, C>` built from `self`.
fn as_collection(&self) -> Collection<G, C>;
}
impl<G: Scope, C: Clone> AsCollection<G, C> for StreamCore<G, C> {
fn as_collection(&self) -> Collection<G, C> {
Collection::<G, C>::new(self.clone())
}
}
pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
where
G: Scope,
C: Container,
I: IntoIterator<Item = Collection<G, C>>,
{
scope
.concatenate(iterator.into_iter().map(|x| x.inner))
.as_collection()
}
/// Container-level traits used by the `Collection` operators, with
/// implementations for `Vec<(D, T, R)>`.
pub mod containers {
    use crate::collection::Abelian;
    use timely::progress::{timestamp::Refines, Timestamp};

    /// A container whose contents can be negated as a whole.
    pub trait Negate {
        /// Consumes the container and returns its negated form.
        fn negate(self) -> Self;
    }

    impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
        /// Negates the difference of every triple in place and returns the vector.
        fn negate(mut self) -> Self {
            self.iter_mut().for_each(|(_, _, diff)| {
                diff.negate();
            });
            self
        }
    }

    /// A container that can be converted from timestamps `T1` to timestamps `T2`
    /// when entering a scope.
    pub trait Enter<T1, T2> {
        /// The container type produced by the conversion.
        type InnerContainer;
        /// Consumes the container and returns the converted container.
        fn enter(self) -> Self::InnerContainer;
    }

    impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
        type InnerContainer = Vec<(D, T2, R)>;

        /// Converts the timestamp of every triple with `T2::to_inner`, keeping
        /// data and difference as they are.
        fn enter(self) -> Self::InnerContainer {
            let to_inner = |(datum, outer, diff): (D, T1, R)| (datum, T2::to_inner(outer), diff);
            self.into_iter().map(to_inner).collect()
        }
    }

    /// A container that can be converted from timestamps `T1` to timestamps `T2`
    /// when leaving a scope.
    pub trait Leave<T1, T2> {
        /// The container type produced by the conversion.
        type OuterContainer;
        /// Consumes the container and returns the converted container.
        fn leave(self) -> Self::OuterContainer;
    }

    impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
        type OuterContainer = Vec<(D, T2, R)>;

        /// Converts the timestamp of every triple with `to_outer`, keeping data
        /// and difference as they are.
        fn leave(self) -> Self::OuterContainer {
            let to_outer = |(datum, inner, diff): (D, T1, R)| (datum, inner.to_outer(), diff);
            self.into_iter().map(to_outer).collect()
        }
    }

    /// A container whose timestamps can be advanced by a summary of type `TS`.
    pub trait ResultsIn<TS> {
        /// Consumes the container and returns it with `step` applied.
        fn results_in(self, step: &TS) -> Self;
    }

    impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
        /// Applies `step` to the timestamp of every triple.
        ///
        /// Triples for which `step.results_in` yields `None` are dropped.
        fn results_in(self, step: &T::Summary) -> Self {
            use timely::progress::PathSummary;

            self.into_iter()
                .filter_map(move |(datum, time, diff)| {
                    let advanced = step.results_in(&time)?;
                    Some((datum, advanced, diff))
                })
                .collect()
        }
    }
}