use std::hash::Hash;
use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::scopes::{Child, child::Iterative};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::*;
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::hashable::Hashable;
/// A collection of updates carried by a timely dataflow stream.
///
/// Each element of `inner` is a `(data, time, diff)` triple: a datum of type `D`, a timestamp of
/// the scope `G`, and a difference of type `R` (which defaults to `isize`).
#[derive(Clone)]
pub struct Collection<G, D, R = isize>
where
    G: Scope,
    R: Semigroup,
{
    /// The underlying stream of `(data, time, diff)` updates.
    pub inner: Stream<G, (D, G::Timestamp, R)>,
}
impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
    /// Wraps `stream`, a stream of `(data, time, diff)` triples, as a collection.
    pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
        Collection { inner: stream }
    }

    /// Applies `logic` to the data of every update; times and diffs pass through unchanged.
    pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
    where
        D2: Data,
        L: FnMut(D) -> D2 + 'static,
    {
        self.inner
            .map(move |(data, time, delta)| (logic(data), time, delta))
            .as_collection()
    }

    /// Mutates the data of every update in place with `logic`; times and diffs are untouched.
    pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
    where
        L: FnMut(&mut D) + 'static,
    {
        self.inner
            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
            .as_collection()
    }

    /// Replaces every update with one update per item yielded by `logic(data)`.
    ///
    /// Each produced item carries a clone of the original update's time and diff.
    pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
    where
        G::Timestamp: Clone,
        I: IntoIterator,
        I::Item: Data,
        L: FnMut(D) -> I + 'static,
    {
        self.inner
            .flat_map(move |(data, time, delta)| {
                logic(data)
                    .into_iter()
                    .map(move |x| (x, time.clone(), delta.clone()))
            })
            .as_collection()
    }

    /// Keeps only the updates whose data satisfies `logic`.
    pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
    where
        L: FnMut(&D) -> bool + 'static,
    {
        self.inner
            .filter(move |(data, _, _)| logic(data))
            .as_collection()
    }

    /// Merges the updates of `self` and `other` into one collection.
    ///
    /// This only concatenates the two streams; no updates are combined or cancelled here.
    pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
        self.inner
            .concat(&other.inner)
            .as_collection()
    }

    /// Merges `self` with every collection yielded by `sources`.
    pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
    where
        I: IntoIterator<Item=Collection<G, D, R>>,
    {
        self.inner
            .concatenate(sources.into_iter().map(|x| x.inner))
            .as_collection()
    }

    /// Replaces every update with the `(data, diff)` pairs yielded by `logic(data)`.
    ///
    /// Each output keeps the original time. Its diff is `d2.multiply(&d)`, where `d2` is the diff
    /// yielded by `logic` and `d` is the original update's diff.
    pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        D2: Data,
        R2: Semigroup+Multiply<R>,
        <R2 as Multiply<R>>::Output: Data+Semigroup,
        I: IntoIterator<Item=(D2,R2)>,
        L: FnMut(D)->I+'static,
    {
        self.inner
            .flat_map(move |(x, t, d)| {
                logic(x)
                    .into_iter()
                    .map(move |(x, d2)| (x, t.clone(), d2.multiply(&d)))
            })
            .as_collection()
    }

    /// Replaces every update with the `(data, time, diff)` triples yielded by `logic(data)`.
    ///
    /// Each output's time is `t.join(&t2)`, the lattice join of the original time `t` and the
    /// yielded time `t2`. Its diff is the yielded diff multiplied by the original diff.
    pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
    where
        G::Timestamp: Lattice,
        D2: Data,
        R2: Semigroup+Multiply<R>,
        <R2 as Multiply<R>>::Output: Data+Semigroup,
        I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
        L: FnMut(D)->I+'static,
    {
        self.inner
            .flat_map(move |(x, t, d)| {
                logic(x)
                    .into_iter()
                    .map(move |(x, t2, d2)| (x, t.join(&t2), d2.multiply(&d)))
            })
            .as_collection()
    }

    /// Brings the collection into the child scope `child`.
    ///
    /// Every update's time is converted to the child timestamp type with `T::to_inner`.
    pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
    where
        T: Refines<<G as ScopeParent>::Timestamp>,
    {
        self.inner
            .enter(child)
            .map(|(data, time, diff)| (data, T::to_inner(time), diff))
            .as_collection()
    }

    /// Brings the collection into the iterative child scope `child`.
    ///
    /// `initial` is used twice. A clone is passed to timely's `enter_at`, and each update's time
    /// is rewritten to `Product::new(outer_time, initial(&data))`.
    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection<Iterative<'a, G, T>, D, R>
    where
        T: Timestamp+Hash,
        F: FnMut(&D) -> T + Clone + 'static,
        G::Timestamp: Hash,
    {
        // Two independently owned copies of `initial` are needed, one per closure below. The
        // second one takes ownership of `initial` instead of cloning it a second time.
        let mut initial1 = initial.clone();
        let mut initial2 = initial;
        self.inner
            .enter_at(child, move |x| initial1(&x.0))
            .map(move |(data, time, diff)| {
                let new_time = Product::new(time, initial2(&data));
                (data, new_time, diff)
            })
            .as_collection()
    }

    /// Brings the collection into a child scope whose timestamp type equals `G`'s.
    ///
    /// Because the timestamp types match, no time conversion is applied.
    pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R>
    {
        self.inner
            .enter(child)
            .as_collection()
    }

    /// Rewrites each update's time to `func(&time)`.
    ///
    /// `func` is used both by timely's `delay_batch` and to overwrite the time stored in each
    /// update, so the two uses should presumably agree (i.e. `func` should be deterministic).
    /// NOTE(review): timely capabilities can only move forward, so `func` is presumably expected
    /// to return a time no earlier than its input. Confirm against timely's `delay_batch`.
    pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
    where
        F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
    {
        // One copy per closure. The second takes ownership of `func` rather than cloning again.
        let mut func1 = func.clone();
        let mut func2 = func;
        self.inner
            .delay_batch(move |x| func1(x))
            .map_in_place(move |x| x.1 = func2(&x.1))
            .as_collection()
    }

    /// Calls `func` on every update as it flows past, and returns the inspected stream as a
    /// collection.
    pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
    where
        F: FnMut(&(D, G::Timestamp, R))+'static,
    {
        self.inner
            .inspect(func)
            .as_collection()
    }

    /// Calls `func` with a timestamp and a slice of updates for every batch that flows past, and
    /// returns the inspected stream as a collection.
    pub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
    where
        F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
    {
        self.inner
            .inspect_batch(func)
            .as_collection()
    }

    /// Attaches a new probe to the underlying stream and returns its handle.
    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
        self.inner
            .probe()
    }

    /// Attaches the caller-supplied probe `handle` to the underlying stream and returns the
    /// probed stream as a collection.
    pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
        self.inner
            .probe_with(handle)
            .as_collection()
    }

    /// Asserts at dataflow runtime that the collection is empty.
    ///
    /// The collection is passed through `consolidate`, which is defined elsewhere in the crate.
    /// Presumably that cancels updates summing to zero, so only a genuinely non-empty collection
    /// trips the check.
    ///
    /// # Panics
    /// Panics inside the `inspect` operator, with "Assertion failed: non-empty collection: ..."
    /// for the first update that reaches it.
    pub fn assert_empty(&self)
    where
        D: crate::ExchangeData+Hashable,
        R: crate::ExchangeData+Hashable,
        G::Timestamp: Lattice+Ord,
    {
        self.consolidate()
            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
    }

    /// Returns the scope that contains this collection's stream.
    pub fn scope(&self) -> G {
        self.inner.scope()
    }
}
use timely::dataflow::scopes::ScopeParent;
use timely::progress::timestamp::Refines;
impl<'a, G, T, D, R> Collection<Child<'a, G, T>, D, R>
where
    G: Scope,
    T: Timestamp + Refines<<G as ScopeParent>::Timestamp>,
    D: Data,
    R: Semigroup,
{
    /// Returns the collection from the child scope to its parent scope `G`.
    ///
    /// Each update's time is converted back to the outer timestamp type with `to_outer`.
    pub fn leave(&self) -> Collection<G, D, R> {
        let exited = self.inner.leave();
        exited
            .map(|(datum, inner_time, delta)| (datum, inner_time.to_outer(), delta))
            .as_collection()
    }
}
impl<'a, G, D, R> Collection<Child<'a, G, G::Timestamp>, D, R>
where
    G: Scope,
    D: Data,
    R: Semigroup,
{
    /// Returns the collection from a child scope whose timestamp type equals `G`'s to `G`.
    ///
    /// Because the timestamp types match, updates are passed through without time conversion.
    pub fn leave_region(&self) -> Collection<G, D, R> {
        let exited = self.inner.leave();
        exited.as_collection()
    }
}
impl<G, D, R> Collection<G, D, R>
where
    G: Scope,
    G::Timestamp: Data,
    D: Data,
    R: Abelian,
{
    /// Replaces the diff of every update with its negation; data and times are unchanged.
    pub fn negate(&self) -> Collection<G, D, R> {
        let negated = self
            .inner
            .map_in_place(|&mut (_, _, ref mut diff)| *diff = diff.clone().negate());
        negated.as_collection()
    }

    /// Asserts at dataflow runtime that `self` and `other` hold the same updates.
    ///
    /// It negates `self`, concatenates `other`, and hands the result to `assert_empty`, so a
    /// mismatch surfaces as a panic there.
    pub fn assert_eq(&self, other: &Self)
    where
        D: crate::ExchangeData + Hashable,
        R: crate::ExchangeData + Hashable,
        G::Timestamp: Lattice + Ord,
    {
        let difference = self.negate().concat(other);
        difference.assert_empty();
    }
}
/// Conversion of a stream of `(data, time, diff)` triples into a `Collection`.
pub trait AsCollection<G, D, R>
where
    G: Scope,
    D: Data,
    R: Semigroup,
{
    /// Wraps `self` as a `Collection` with scope `G`, data `D`, and difference `R`.
    fn as_collection(&self) -> Collection<G, D, R>;
}
impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
fn as_collection(&self) -> Collection<G, D, R> {
Collection::new(self.clone())
}
}
/// Merges every collection yielded by `iterator` into a single collection within `scope`.
///
/// The underlying streams are only concatenated; no updates are combined or cancelled here.
pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
where
    G: Scope,
    D: Data,
    R: Semigroup,
    I: IntoIterator<Item=Collection<G, D, R>>,
{
    let streams = iterator.into_iter().map(|collection| collection.inner);
    let merged = scope.concatenate(streams);
    merged.as_collection()
}