pub mod cursor;
pub mod description;
pub mod implementations;
pub mod layers;
pub mod wrappers;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;
pub use self::cursor::Cursor;
pub use self::description::Description;
/// Reader-side interface to a trace, expressed in terms of its batches and cursors.
///
/// Implementors provide `cursor_through`, the four compaction-frontier accessors and
/// `map_batches`; every other method has a default body built on those.
///
/// NOTE(review): the precise meaning of "logical" versus "physical" compaction is
/// defined by implementors and is not visible in this trait — consult the crate docs.
pub trait TraceReader {
    /// Key type forwarded to `Cursor` and `BatchReader`.
    type Key;
    /// Value type forwarded to `Cursor` and `BatchReader`.
    type Val;
    /// Element type of the frontiers (`Antichain`s) used throughout this trait.
    type Time;
    /// Fourth type parameter forwarded to `Cursor` and `BatchReader`.
    // NOTE(review): presumably the update "difference" type — confirm.
    type R;
    /// Batch type handed to the closure given to `map_batches`.
    type Batch: BatchReader<Self::Key, Self::Val, Self::Time, Self::R> + Clone + 'static;
    /// Cursor type produced by `cursor` and `cursor_through`.
    type Cursor: Cursor<Self::Key, Self::Val, Self::Time, Self::R>;

    /// Returns a cursor, paired with its storage, obtained by calling
    /// `cursor_through` with an empty antichain as the upper bound.
    ///
    /// # Panics
    ///
    /// Panics if `cursor_through` yields `None` for the empty antichain.
    fn cursor(&mut self) -> (Self::Cursor, <Self::Cursor as Cursor<Self::Key, Self::Val, Self::Time, Self::R>>::Storage) {
        match self.cursor_through(Antichain::new().borrow()) {
            Some(complete) => complete,
            None => panic!("unable to acquire complete cursor for trace; is it closed?"),
        }
    }

    /// Attempts to build a cursor (and its storage) bounded by `upper`.
    ///
    /// Returns `None` when the implementor cannot supply such a cursor.
    fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, <Self::Cursor as Cursor<Self::Key, Self::Val, Self::Time, Self::R>>::Storage)>;

    /// Sets the logical compaction frontier to `frontier`.
    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);

    /// Deprecated alias that forwards to `set_logical_compaction`.
    #[deprecated(since = "0.11", note = "please use `set_logical_compaction`")]
    fn advance_by(&mut self, frontier: AntichainRef<Self::Time>) {
        self.set_logical_compaction(frontier)
    }

    /// Returns the logical compaction frontier as a borrowed antichain.
    fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;

    /// Deprecated alias that forwards to `get_logical_compaction`.
    #[deprecated(since = "0.11", note = "please use `get_logical_compaction`")]
    fn advance_frontier(&mut self) -> AntichainRef<Self::Time> {
        let frontier = self.get_logical_compaction();
        frontier
    }

    /// Sets the physical compaction frontier to `frontier`.
    fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);

    /// Deprecated alias that forwards to `set_physical_compaction`.
    #[deprecated(since = "0.11", note = "please use `set_physical_compaction`")]
    fn distinguish_since(&mut self, frontier: AntichainRef<Self::Time>) {
        self.set_physical_compaction(frontier)
    }

    /// Returns the physical compaction frontier as a borrowed antichain.
    fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;

    /// Deprecated alias that forwards to `get_physical_compaction`.
    #[deprecated(since = "0.11", note = "please use `get_physical_compaction`")]
    fn distinguish_frontier(&mut self) -> AntichainRef<Self::Time> {
        let frontier = self.get_physical_compaction();
        frontier
    }

    /// Invokes `f` on each batch of the trace, in an order chosen by the implementor.
    fn map_batches<F>(&self, f: F)
    where
        F: FnMut(&Self::Batch);

    /// Overwrites `target` with the upper frontier of the last batch visited by
    /// `map_batches`, or with the singleton minimum-time frontier when no batch
    /// is visited.
    fn read_upper(&mut self, target: &mut Antichain<Self::Time>)
    where
        Self::Time: Timestamp,
    {
        // Seed with `{minimum}` so the result is defined even without batches.
        target.clear();
        target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
        // Every visited batch replaces `target`; the final visit determines the result.
        self.map_batches(|batch| target.clone_from(batch.upper()));
    }

    /// Extends `upper` across empty batches.
    ///
    /// For each batch visited by `map_batches`, if the batch is empty and its lower
    /// frontier equals the current `upper`, `upper` is replaced by that batch's
    /// upper frontier.
    fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>)
    where
        Self::Time: Timestamp,
    {
        self.map_batches(|batch| {
            let continues_upper = batch.is_empty() && batch.lower() == upper;
            if continues_upper {
                upper.clone_from(batch.upper());
            }
        });
    }
}
/// Write-side extension of `TraceReader`: construction, batch insertion and shutdown.
///
/// The trait is only usable when the reader's `Batch` type also implements `Batch`
/// for the reader's key, value, time and `R` types.
pub trait Trace : TraceReader
where <Self as TraceReader>::Batch: Batch<Self::Key, Self::Val, Self::Time, Self::R> {
/// Builds a new trace from the hosting operator's `info`, an optional logger and
/// an optional activator.
// NOTE(review): how implementors use `logging` and `activator` is not visible here.
fn new(
info: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<::logging::Logger>,
activator: Option<timely::scheduling::activate::Activator>,
) -> Self;
/// Asks the trace to perform work, with `effort` passed by mutable reference.
// NOTE(review): presumably `effort` is a work budget that implementors decrement — confirm.
fn exert(&mut self, effort: &mut isize);
/// Hands ownership of `batch` to the trace.
fn insert(&mut self, batch: Self::Batch);
/// Signals that the trace is being closed.
// NOTE(review): presumably no further `insert` calls are expected afterwards — confirm.
fn close(&mut self);
}
/// Read access to a single batch parameterized by key `K`, value `V`, time `T`
/// and a fourth type `R`.
///
/// A batch exposes a cursor whose storage is the batch itself, a length, and a
/// `Description` from which its lower and upper frontiers are read.
pub trait BatchReader<K, V, T, R>: Sized {
    /// Cursor type for navigating the batch; its `Storage` is the batch type itself.
    type Cursor: Cursor<K, V, T, R, Storage = Self>;

    /// Creates a cursor for this batch.
    fn cursor(&self) -> Self::Cursor;

    /// Returns the length reported by the batch.
    // NOTE(review): presumably the number of updates contained — confirm.
    fn len(&self) -> usize;

    /// Returns `true` exactly when `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the description attached to this batch.
    fn description(&self) -> &Description<T>;

    /// Returns the lower frontier recorded in the batch's description.
    fn lower(&self) -> &Antichain<T> {
        let description = self.description();
        description.lower()
    }

    /// Returns the upper frontier recorded in the batch's description.
    fn upper(&self) -> &Antichain<T> {
        let description = self.description();
        description.upper()
    }
}
/// A `BatchReader` that additionally names the types able to construct and merge it.
pub trait Batch<K, V, T, R>: BatchReader<K, V, T, R> + Sized {
    /// Type that accepts vectors of `((K, V), T, R)` and seals them into batches.
    type Batcher: Batcher<K, V, T, R, Self>;
    /// Type that accepts `(K, V, T, R)` tuples one at a time and finishes into a batch.
    type Builder: Builder<K, V, T, R, Self>;
    /// Type that merges two batches into one.
    type Merger: Merger<K, V, T, R, Self>;

    /// Creates a merger for `self` and `other` by calling `Merger::new` with the
    /// optional `compaction_frontier`.
    fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<T>>) -> Self::Merger {
        <Self::Merger as Merger<K, V, T, R, Self>>::new(self, other, compaction_frontier)
    }

    /// Produces a batch by finishing a freshly created builder, without pushing any
    /// element, using the supplied `lower`, `upper` and `since` frontiers.
    fn empty(lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> Self {
        let builder = <Self::Builder as Builder<K, V, T, R, Self>>::new();
        builder.done(lower, upper, since)
    }
}
/// Accumulates vectors of `((K, V), T, R)` tuples and seals them into `Output` batches.
pub trait Batcher<K, V, T, R, Output>
where
    Output: Batch<K, V, T, R>,
{
    /// Creates a new batcher.
    fn new() -> Self;

    /// Submits the contents of `batch` to the batcher.
    // NOTE(review): `batch` is borrowed mutably, presumably so implementors may
    // drain or swap the vector — confirm what state callers get back.
    fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>);

    /// Produces an `Output` batch for the given `upper` frontier.
    fn seal(&mut self, upper: Antichain<T>) -> Output;

    /// Returns a frontier, as a borrowed antichain, describing the batcher's state.
    // NOTE(review): presumably a lower bound on times still held — confirm.
    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T>;
}
/// Assembles an `Output` batch from `(K, V, T, R)` tuples supplied one at a time.
pub trait Builder<K, V, T, R, Output>
where
    Output: Batch<K, V, T, R>,
{
    /// Creates a new builder.
    fn new() -> Self;

    /// Creates a new builder given a capacity hint `cap`.
    // NOTE(review): whether `cap` counts tuples or something else is up to implementors.
    fn with_capacity(cap: usize) -> Self;

    /// Adds a single `element` to the batch under construction.
    fn push(&mut self, element: (K, V, T, R));

    /// Pushes every item yielded by `iter`, in iteration order, via `push`.
    fn extend<I>(&mut self, iter: I)
    where
        I: Iterator<Item = (K, V, T, R)>,
    {
        iter.for_each(|element| self.push(element));
    }

    /// Consumes the builder and returns the finished batch described by the
    /// `lower`, `upper` and `since` frontiers.
    fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> Output;
}
/// Merges two `Output` batches into one, with the work driven by a `fuel` counter.
pub trait Merger<K, V, T, R, Output>
where
    Output: Batch<K, V, T, R>,
{
    /// Creates a merger for `source1` and `source2`, with an optional
    /// `compaction_frontier`.
    fn new(source1: &Output, source2: &Output, compaction_frontier: Option<AntichainRef<T>>) -> Self;

    /// Performs merge work on `source1` and `source2`, with `fuel` passed by
    /// mutable reference.
    // NOTE(review): presumably `fuel` bounds the work done and is decremented;
    // callers likely must pass the same sources given to `new` — confirm.
    fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);

    /// Consumes the merger and returns the merged batch.
    fn done(self) -> Output;
}
/// Blanket implementations that let `Rc<B>` be used wherever a batch type `B` is.
///
/// Every wrapper here forwards to the corresponding item of `B`, dereferencing
/// the `Rc` on the way in and wrapping results in `Rc::new` on the way out.
pub mod rc_blanket_impls {

    use std::rc::Rc;

    use timely::progress::{Antichain, frontier::AntichainRef};
    use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};

    impl<K, V, T, R, B> BatchReader<K, V, T, R> for Rc<B>
    where
        B: BatchReader<K, V, T, R>,
    {
        /// Wrapper around `B::Cursor` whose storage is `Rc<B>`.
        type Cursor = RcBatchCursor<K, V, T, R, B>;

        /// Wraps the inner batch's cursor.
        fn cursor(&self) -> Self::Cursor {
            let inner: &B = &**self;
            RcBatchCursor::new(inner.cursor())
        }

        /// Forwards to the inner batch's `len`.
        fn len(&self) -> usize {
            let inner: &B = &**self;
            inner.len()
        }

        /// Forwards to the inner batch's `description`.
        fn description(&self) -> &Description<T> {
            let inner: &B = &**self;
            inner.description()
        }
    }

    /// Cursor over an `Rc<B>` that delegates every operation to a `B::Cursor`.
    pub struct RcBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
        phantom: ::std::marker::PhantomData<(K, V, T, R)>,
        cursor: B::Cursor,
    }

    impl<K, V, T, R, B> RcBatchCursor<K, V, T, R, B>
    where
        B: BatchReader<K, V, T, R>,
    {
        /// Wraps an existing `B::Cursor`.
        fn new(cursor: B::Cursor) -> Self {
            let phantom = ::std::marker::PhantomData;
            RcBatchCursor { cursor, phantom }
        }
    }

    // Each method dereferences the `Rc<B>` storage to `&B` and forwards to the
    // wrapped cursor.
    impl<K, V, T, R, B> Cursor<K, V, T, R> for RcBatchCursor<K, V, T, R, B>
    where
        B: BatchReader<K, V, T, R>,
    {
        type Storage = Rc<B>;

        #[inline]
        fn key_valid(&self, storage: &Self::Storage) -> bool {
            self.cursor.key_valid(&**storage)
        }

        #[inline]
        fn val_valid(&self, storage: &Self::Storage) -> bool {
            self.cursor.val_valid(&**storage)
        }

        #[inline]
        fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K {
            self.cursor.key(&**storage)
        }

        #[inline]
        fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V {
            self.cursor.val(&**storage)
        }

        #[inline]
        fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
            self.cursor.map_times(&**storage, logic)
        }

        #[inline]
        fn step_key(&mut self, storage: &Self::Storage) {
            self.cursor.step_key(&**storage)
        }

        #[inline]
        fn seek_key(&mut self, storage: &Self::Storage, key: &K) {
            self.cursor.seek_key(&**storage, key)
        }

        #[inline]
        fn step_val(&mut self, storage: &Self::Storage) {
            self.cursor.step_val(&**storage)
        }

        #[inline]
        fn seek_val(&mut self, storage: &Self::Storage, val: &V) {
            self.cursor.seek_val(&**storage, val)
        }

        #[inline]
        fn rewind_keys(&mut self, storage: &Self::Storage) {
            self.cursor.rewind_keys(&**storage)
        }

        #[inline]
        fn rewind_vals(&mut self, storage: &Self::Storage) {
            self.cursor.rewind_vals(&**storage)
        }
    }

    /// `Rc<B>` is a `Batch` whose batcher, builder and merger wrap those of `B`.
    impl<K, V, T, R, B> Batch<K, V, T, R> for Rc<B>
    where
        B: Batch<K, V, T, R>,
    {
        type Batcher = RcBatcher<K, V, T, R, B>;
        type Builder = RcBuilder<K, V, T, R, B>;
        type Merger = RcMerger<K, V, T, R, B>;
    }

    /// Batcher that delegates to `B::Batcher` and wraps sealed batches in `Rc`.
    pub struct RcBatcher<K, V, T, R, B: Batch<K, V, T, R>> {
        batcher: B::Batcher,
    }

    impl<K, V, T, R, B> Batcher<K, V, T, R, Rc<B>> for RcBatcher<K, V, T, R, B>
    where
        B: Batch<K, V, T, R>,
    {
        fn new() -> Self {
            let batcher = <B::Batcher as Batcher<K, V, T, R, B>>::new();
            RcBatcher { batcher }
        }

        fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) {
            self.batcher.push_batch(batch)
        }

        fn seal(&mut self, upper: Antichain<T>) -> Rc<B> {
            let sealed = self.batcher.seal(upper);
            Rc::new(sealed)
        }

        fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
            self.batcher.frontier()
        }
    }

    /// Builder that delegates to `B::Builder` and wraps the finished batch in `Rc`.
    pub struct RcBuilder<K, V, T, R, B: Batch<K, V, T, R>> {
        builder: B::Builder,
    }

    impl<K, V, T, R, B> Builder<K, V, T, R, Rc<B>> for RcBuilder<K, V, T, R, B>
    where
        B: Batch<K, V, T, R>,
    {
        fn new() -> Self {
            let builder = <B::Builder as Builder<K, V, T, R, B>>::new();
            RcBuilder { builder }
        }

        fn with_capacity(cap: usize) -> Self {
            let builder = <B::Builder as Builder<K, V, T, R, B>>::with_capacity(cap);
            RcBuilder { builder }
        }

        fn push(&mut self, element: (K, V, T, R)) {
            self.builder.push(element)
        }

        fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> Rc<B> {
            let finished = self.builder.done(lower, upper, since);
            Rc::new(finished)
        }
    }

    /// Merger that delegates to `B::Merger` and wraps the merged batch in `Rc`.
    pub struct RcMerger<K, V, T, R, B: Batch<K, V, T, R>> {
        merger: B::Merger,
    }

    impl<K, V, T, R, B> Merger<K, V, T, R, Rc<B>> for RcMerger<K, V, T, R, B>
    where
        B: Batch<K, V, T, R>,
    {
        fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: Option<AntichainRef<T>>) -> Self {
            let merger = B::begin_merge(&**source1, &**source2, compaction_frontier);
            RcMerger { merger }
        }

        fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) {
            self.merger.work(&**source1, &**source2, fuel)
        }

        fn done(self) -> Rc<B> {
            let merged = self.merger.done();
            Rc::new(merged)
        }
    }
}
/// Blanket implementations that let `Abomonated<B, Vec<u8>>` be used wherever a
/// batch type `B` is.
///
/// Reads dereference the `Abomonated` wrapper to `&B`; every produced batch is
/// encoded into a byte vector and wrapped in `Abomonated`.
pub mod abomonated_blanket_impls {

    extern crate abomonation;

    use abomonation::{Abomonation, measure};
    use abomonation::abomonated::Abomonated;
    use timely::progress::{Antichain, frontier::AntichainRef};
    use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};

    /// Encodes `batch` into a freshly allocated byte vector and wraps those bytes
    /// as an `Abomonated<B, Vec<u8>>`.
    ///
    /// # Panics
    ///
    /// Panics if `abomonation::encode` returns an error or if `Abomonated::new`
    /// returns `None`.
    fn abomonate<B: Abomonation>(batch: &B) -> Abomonated<B, Vec<u8>> {
        // Size the buffer up front from abomonation's own measurement of `batch`.
        let mut bytes: Vec<u8> = Vec::with_capacity(measure(batch));
        // SAFETY: NOTE(review) — `encode` is an unsafe fn of abomonation; soundness is
        // assumed to rest on `B`'s `Abomonation` implementation. Confirm.
        unsafe { abomonation::encode(batch, &mut bytes).unwrap() };
        // SAFETY: `bytes` was filled just above by encoding a value of type `B`, which
        // is the type `Abomonated::<B, _>::new` is asked to interpret it as.
        unsafe { Abomonated::<B, _>::new(bytes).unwrap() }
    }

    impl<K, V, T, R, B> BatchReader<K, V, T, R> for Abomonated<B, Vec<u8>>
    where
        B: BatchReader<K, V, T, R> + Abomonation,
    {
        /// Wrapper around `B::Cursor` whose storage is `Abomonated<B, Vec<u8>>`.
        type Cursor = AbomonatedBatchCursor<K, V, T, R, B>;

        /// Wraps the inner batch's cursor.
        fn cursor(&self) -> Self::Cursor {
            let inner: &B = &**self;
            AbomonatedBatchCursor::new(inner.cursor())
        }

        /// Forwards to the inner batch's `len`.
        fn len(&self) -> usize {
            let inner: &B = &**self;
            inner.len()
        }

        /// Forwards to the inner batch's `description`.
        fn description(&self) -> &Description<T> {
            let inner: &B = &**self;
            inner.description()
        }
    }

    /// Cursor over an `Abomonated<B, Vec<u8>>` that delegates to a `B::Cursor`.
    pub struct AbomonatedBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
        phantom: ::std::marker::PhantomData<(K, V, T, R)>,
        cursor: B::Cursor,
    }

    impl<K, V, T, R, B> AbomonatedBatchCursor<K, V, T, R, B>
    where
        B: BatchReader<K, V, T, R>,
    {
        /// Wraps an existing `B::Cursor`.
        fn new(cursor: B::Cursor) -> Self {
            let phantom = ::std::marker::PhantomData;
            AbomonatedBatchCursor { cursor, phantom }
        }
    }

    // Each method dereferences the `Abomonated` storage to `&B` and forwards to the
    // wrapped cursor.
    impl<K, V, T, R, B> Cursor<K, V, T, R> for AbomonatedBatchCursor<K, V, T, R, B>
    where
        B: BatchReader<K, V, T, R> + Abomonation,
    {
        type Storage = Abomonated<B, Vec<u8>>;

        #[inline]
        fn key_valid(&self, storage: &Self::Storage) -> bool {
            self.cursor.key_valid(&**storage)
        }

        #[inline]
        fn val_valid(&self, storage: &Self::Storage) -> bool {
            self.cursor.val_valid(&**storage)
        }

        #[inline]
        fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K {
            self.cursor.key(&**storage)
        }

        #[inline]
        fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V {
            self.cursor.val(&**storage)
        }

        #[inline]
        fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
            self.cursor.map_times(&**storage, logic)
        }

        #[inline]
        fn step_key(&mut self, storage: &Self::Storage) {
            self.cursor.step_key(&**storage)
        }

        #[inline]
        fn seek_key(&mut self, storage: &Self::Storage, key: &K) {
            self.cursor.seek_key(&**storage, key)
        }

        #[inline]
        fn step_val(&mut self, storage: &Self::Storage) {
            self.cursor.step_val(&**storage)
        }

        #[inline]
        fn seek_val(&mut self, storage: &Self::Storage, val: &V) {
            self.cursor.seek_val(&**storage, val)
        }

        #[inline]
        fn rewind_keys(&mut self, storage: &Self::Storage) {
            self.cursor.rewind_keys(&**storage)
        }

        #[inline]
        fn rewind_vals(&mut self, storage: &Self::Storage) {
            self.cursor.rewind_vals(&**storage)
        }
    }

    /// `Abomonated<B, Vec<u8>>` is a `Batch` whose batcher, builder and merger wrap
    /// those of `B`.
    impl<K, V, T, R, B> Batch<K, V, T, R> for Abomonated<B, Vec<u8>>
    where
        B: Batch<K, V, T, R> + Abomonation,
    {
        type Batcher = AbomonatedBatcher<K, V, T, R, B>;
        type Builder = AbomonatedBuilder<K, V, T, R, B>;
        type Merger = AbomonatedMerger<K, V, T, R, B>;
    }

    /// Batcher that delegates to `B::Batcher` and encodes each sealed batch.
    pub struct AbomonatedBatcher<K, V, T, R, B: Batch<K, V, T, R>> {
        batcher: B::Batcher,
    }

    impl<K, V, T, R, B> Batcher<K, V, T, R, Abomonated<B, Vec<u8>>> for AbomonatedBatcher<K, V, T, R, B>
    where
        B: Batch<K, V, T, R> + Abomonation,
    {
        fn new() -> Self {
            let batcher = <B::Batcher as Batcher<K, V, T, R, B>>::new();
            AbomonatedBatcher { batcher }
        }

        fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) {
            self.batcher.push_batch(batch)
        }

        fn seal(&mut self, upper: Antichain<T>) -> Abomonated<B, Vec<u8>> {
            let sealed = self.batcher.seal(upper);
            abomonate(&sealed)
        }

        fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
            self.batcher.frontier()
        }
    }

    /// Builder that delegates to `B::Builder` and encodes the finished batch.
    pub struct AbomonatedBuilder<K, V, T, R, B: Batch<K, V, T, R>> {
        builder: B::Builder,
    }

    impl<K, V, T, R, B> Builder<K, V, T, R, Abomonated<B, Vec<u8>>> for AbomonatedBuilder<K, V, T, R, B>
    where
        B: Batch<K, V, T, R> + Abomonation,
    {
        fn new() -> Self {
            let builder = <B::Builder as Builder<K, V, T, R, B>>::new();
            AbomonatedBuilder { builder }
        }

        fn with_capacity(cap: usize) -> Self {
            let builder = <B::Builder as Builder<K, V, T, R, B>>::with_capacity(cap);
            AbomonatedBuilder { builder }
        }

        fn push(&mut self, element: (K, V, T, R)) {
            self.builder.push(element)
        }

        fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> Abomonated<B, Vec<u8>> {
            let finished = self.builder.done(lower, upper, since);
            abomonate(&finished)
        }
    }

    /// Merger that delegates to `B::Merger` and encodes the merged batch.
    pub struct AbomonatedMerger<K, V, T, R, B: Batch<K, V, T, R>> {
        merger: B::Merger,
    }

    impl<K, V, T, R, B> Merger<K, V, T, R, Abomonated<B, Vec<u8>>> for AbomonatedMerger<K, V, T, R, B>
    where
        B: Batch<K, V, T, R> + Abomonation,
    {
        fn new(source1: &Abomonated<B, Vec<u8>>, source2: &Abomonated<B, Vec<u8>>, compaction_frontier: Option<AntichainRef<T>>) -> Self {
            let merger = B::begin_merge(&**source1, &**source2, compaction_frontier);
            AbomonatedMerger { merger }
        }

        fn work(&mut self, source1: &Abomonated<B, Vec<u8>>, source2: &Abomonated<B, Vec<u8>>, fuel: &mut isize) {
            self.merger.work(&**source1, &**source2, fuel)
        }

        fn done(self) -> Abomonated<B, Vec<u8>> {
            let merged = self.merger.done();
            abomonate(&merged)
        }
    }
}