pub mod cursor;
pub mod description;
pub mod implementations;
pub mod layers;
pub mod wrappers;
use ::difference::Monoid;
pub use self::cursor::Cursor;
pub use self::description::Description;
pub trait TraceReader<Key, Val, Time, R> {
type Batch: BatchReader<Key, Val, Time, R>+Clone+'static;
type Cursor: Cursor<Key, Val, Time, R>;
fn cursor(&mut self) -> (Self::Cursor, <Self::Cursor as Cursor<Key, Val, Time, R>>::Storage) {
if let Some(cursor) = self.cursor_through(&[]) {
cursor
}
else {
panic!("unable to acquire complete cursor for trace; is it closed?");
}
}
fn cursor_through(&mut self, upper: &[Time]) -> Option<(Self::Cursor, <Self::Cursor as Cursor<Key, Val, Time, R>>::Storage)>;
fn advance_by(&mut self, frontier: &[Time]);
fn advance_frontier(&mut self) -> &[Time];
fn distinguish_since(&mut self, frontier: &[Time]);
fn distinguish_frontier(&mut self) -> &[Time];
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F);
}
pub trait Trace<Key, Val, Time, R> : TraceReader<Key, Val, Time, R>
where <Self as TraceReader<Key, Val, Time, R>>::Batch: Batch<Key, Val, Time, R> {
fn new(info: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> Self;
fn insert(&mut self, batch: Self::Batch);
fn close(&mut self);
}
pub trait BatchReader<K, V, T, R> where Self: ::std::marker::Sized
{
type Cursor: Cursor<K, V, T, R, Storage=Self>;
fn cursor(&self) -> Self::Cursor;
fn len(&self) -> usize;
fn description(&self) -> &Description<T>;
fn lower(&self) -> &[T] { self.description().lower() }
fn upper(&self) -> &[T] { self.description().upper() }
}
pub trait Batch<K, V, T, R> : BatchReader<K, V, T, R> where Self: ::std::marker::Sized {
type Batcher: Batcher<K, V, T, R, Self>;
type Builder: Builder<K, V, T, R, Self>;
type Merger: Merger<K, V, T, R, Self>;
fn begin_merge(&self, other: &Self) -> Self::Merger {
Self::Merger::new(self, other)
}
}
pub trait Batcher<K, V, T, R, Output: Batch<K, V, T, R>> {
fn new() -> Self;
fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>);
fn seal(&mut self, upper: &[T]) -> Output;
fn frontier(&mut self) -> &[T];
}
pub trait Builder<K, V, T, R, Output: Batch<K, V, T, R>> {
fn new() -> Self;
fn with_capacity(cap: usize) -> Self;
fn push(&mut self, element: (K, V, T, R));
fn extend<I: Iterator<Item=(K,V,T,R)>>(&mut self, iter: I) {
for item in iter { self.push(item); }
}
fn done(self, lower: &[T], upper: &[T], since: &[T]) -> Output;
}
pub trait Merger<K, V, T, R, Output: Batch<K, V, T, R>> {
fn new(source1: &Output, source2: &Output) -> Self;
fn work(&mut self, source1: &Output, source2: &Output, frontier: &Option<Vec<T>>, fuel: &mut usize);
fn done(self) -> Output;
}
pub mod rc_blanket_impls {
use std::rc::Rc;
use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};
impl<K, V, T, R, B: BatchReader<K,V,T,R>> BatchReader<K,V,T,R> for Rc<B> {
type Cursor = RcBatchCursor<K, V, T, R, B>;
fn cursor(&self) -> Self::Cursor {
RcBatchCursor::new((&**self).cursor())
}
fn len(&self) -> usize { (&**self).len() }
fn description(&self) -> &Description<T> { (&**self).description() }
}
pub struct RcBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
phantom: ::std::marker::PhantomData<(K, V, T, R)>,
cursor: B::Cursor,
}
impl<K, V, T, R, B: BatchReader<K, V, T, R>> RcBatchCursor<K, V, T, R, B> {
fn new(cursor: B::Cursor) -> Self {
RcBatchCursor {
cursor,
phantom: ::std::marker::PhantomData,
}
}
}
impl<K, V, T, R, B: BatchReader<K, V, T, R>> Cursor<K, V, T, R> for RcBatchCursor<K, V, T, R, B> {
type Storage = Rc<B>;
#[inline(always)] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
#[inline(always)] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
#[inline(always)] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) }
#[inline(always)] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) }
#[inline(always)]
fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
}
#[inline(always)] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
#[inline(always)] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) }
#[inline(always)] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
#[inline(always)] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) }
#[inline(always)] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
#[inline(always)] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}
impl<K,V,T,R,B: Batch<K,V,T,R>> Batch<K, V, T, R> for Rc<B> {
type Batcher = RcBatcher<K, V, T, R, B>;
type Builder = RcBuilder<K, V, T, R, B>;
type Merger = RcMerger<K, V, T, R, B>;
}
pub struct RcBatcher<K,V,T,R,B:Batch<K,V,T,R>> { batcher: B::Batcher }
impl<K,V,T,R,B:Batch<K,V,T,R>> Batcher<K, V, T, R, Rc<B>> for RcBatcher<K,V,T,R,B> {
fn new() -> Self { RcBatcher { batcher: <B::Batcher as Batcher<K,V,T,R,B>>::new() } }
fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) }
fn seal(&mut self, upper: &[T]) -> Rc<B> { Rc::new(self.batcher.seal(upper)) }
fn frontier(&mut self) -> &[T] { self.batcher.frontier() }
}
pub struct RcBuilder<K,V,T,R,B:Batch<K,V,T,R>> { builder: B::Builder }
impl<K,V,T,R,B:Batch<K,V,T,R>> Builder<K, V, T, R, Rc<B>> for RcBuilder<K,V,T,R,B> {
fn new() -> Self { RcBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::new() } }
fn with_capacity(cap: usize) -> Self { RcBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::with_capacity(cap) } }
fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) }
fn done(self, lower: &[T], upper: &[T], since: &[T]) -> Rc<B> { Rc::new(self.builder.done(lower, upper, since)) }
}
pub struct RcMerger<K,V,T,R,B:Batch<K,V,T,R>> { merger: B::Merger }
impl<K,V,T,R,B:Batch<K,V,T,R>> Merger<K, V, T, R, Rc<B>> for RcMerger<K,V,T,R,B> {
fn new(source1: &Rc<B>, source2: &Rc<B>) -> Self { RcMerger { merger: B::begin_merge(source1, source2) } }
fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, frontier: &Option<Vec<T>>, fuel: &mut usize) { self.merger.work(source1, source2, frontier, fuel) }
fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }
}
}
pub mod abomonated_blanket_impls {
extern crate abomonation;
use abomonation::{Abomonation, measure};
use abomonation::abomonated::Abomonated;
use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};
impl<K, V, T, R, B: BatchReader<K,V,T,R>+Abomonation> BatchReader<K,V,T,R> for Abomonated<B, Vec<u8>> {
type Cursor = AbomonatedBatchCursor<K, V, T, R, B>;
fn cursor(&self) -> Self::Cursor {
AbomonatedBatchCursor::new((&**self).cursor())
}
fn len(&self) -> usize { (&**self).len() }
fn description(&self) -> &Description<T> { (&**self).description() }
}
pub struct AbomonatedBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
phantom: ::std::marker::PhantomData<(K, V, T, R)>,
cursor: B::Cursor,
}
impl<K, V, T, R, B: BatchReader<K, V, T, R>> AbomonatedBatchCursor<K, V, T, R, B> {
fn new(cursor: B::Cursor) -> Self {
AbomonatedBatchCursor {
cursor,
phantom: ::std::marker::PhantomData,
}
}
}
impl<K, V, T, R, B: BatchReader<K, V, T, R>+Abomonation> Cursor<K, V, T, R> for AbomonatedBatchCursor<K, V, T, R, B> {
type Storage = Abomonated<B, Vec<u8>>;
#[inline(always)] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
#[inline(always)] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
#[inline(always)] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) }
#[inline(always)] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) }
#[inline(always)]
fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
}
#[inline(always)] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
#[inline(always)] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) }
#[inline(always)] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
#[inline(always)] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) }
#[inline(always)] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
#[inline(always)] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}
impl<K,V,T,R,B: Batch<K,V,T,R>+Abomonation> Batch<K, V, T, R> for Abomonated<B, Vec<u8>> {
type Batcher = AbomonatedBatcher<K, V, T, R, B>;
type Builder = AbomonatedBuilder<K, V, T, R, B>;
type Merger = AbomonatedMerger<K, V, T, R, B>;
}
pub struct AbomonatedBatcher<K,V,T,R,B:Batch<K,V,T,R>> { batcher: B::Batcher }
impl<K,V,T,R,B:Batch<K,V,T,R>+Abomonation> Batcher<K, V, T, R, Abomonated<B,Vec<u8>>> for AbomonatedBatcher<K,V,T,R,B> {
fn new() -> Self { AbomonatedBatcher { batcher: <B::Batcher as Batcher<K,V,T,R,B>>::new() } }
fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) }
fn seal(&mut self, upper: &[T]) -> Abomonated<B, Vec<u8>> {
let batch = self.batcher.seal(upper);
let mut bytes = Vec::with_capacity(measure(&batch));
unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
unsafe { Abomonated::<B,_>::new(bytes).unwrap() }
}
fn frontier(&mut self) -> &[T] { self.batcher.frontier() }
}
pub struct AbomonatedBuilder<K,V,T,R,B:Batch<K,V,T,R>> { builder: B::Builder }
impl<K,V,T,R,B:Batch<K,V,T,R>+Abomonation> Builder<K, V, T, R, Abomonated<B,Vec<u8>>> for AbomonatedBuilder<K,V,T,R,B> {
fn new() -> Self { AbomonatedBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::new() } }
fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::with_capacity(cap) } }
fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) }
fn done(self, lower: &[T], upper: &[T], since: &[T]) -> Abomonated<B, Vec<u8>> {
let batch = self.builder.done(lower, upper, since);
let mut bytes = Vec::with_capacity(measure(&batch));
unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
unsafe { Abomonated::<B,_>::new(bytes).unwrap() }
}
}
pub struct AbomonatedMerger<K,V,T,R,B:Batch<K,V,T,R>> { merger: B::Merger }
impl<K,V,T,R,B:Batch<K,V,T,R>+Abomonation> Merger<K, V, T, R, Abomonated<B,Vec<u8>>> for AbomonatedMerger<K,V,T,R,B> {
fn new(source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>) -> Self {
AbomonatedMerger { merger: B::begin_merge(source1, source2) }
}
fn work(&mut self, source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, frontier: &Option<Vec<T>>, fuel: &mut usize) {
self.merger.work(source1, source2, frontier, fuel)
}
fn done(self) -> Abomonated<B, Vec<u8>> {
let batch = self.merger.done();
let mut bytes = Vec::with_capacity(measure(&batch));
unsafe { abomonation::encode(&batch, &mut bytes).unwrap() };
unsafe { Abomonated::<B,_>::new(bytes).unwrap() }
}
}
}
pub fn consolidate<T: Ord+Clone, R: Monoid>(vec: &mut Vec<(T, R)>, off: usize) {
consolidate_by(vec, off, |x,y| x.cmp(&y));
}
pub fn consolidate_by<T: Eq+Clone, L: Fn(&T, &T)->::std::cmp::Ordering, R: Monoid>(vec: &mut Vec<(T, R)>, off: usize, cmp: L) {
vec[off..].sort_by(|x,y| cmp(&x.0, &y.0));
for index in (off + 1) .. vec.len() {
if vec[index].0 == vec[index - 1].0 {
let prev = ::std::mem::replace(&mut vec[index - 1].1, R::zero());
vec[index].1 += &prev;
}
}
let mut cursor = off;
for index in off .. vec.len() {
if !vec[index].1.is_zero() {
vec.swap(cursor, index);
cursor += 1;
}
}
vec.truncate(cursor);
}