pub mod cursor;
pub mod description;
pub mod implementations;
pub mod wrappers;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;
use crate::logging::Logger;
pub use self::cursor::Cursor;
pub use self::description::Description;
use crate::trace::implementations::LayoutExt;
/// A shareable, thread-safe callback that can be installed on a trace with
/// `Trace::set_exert_logic`.
///
/// The callback receives a slice of `(usize, usize, usize)` triples and returns an
/// `Option<usize>`.
///
/// NOTE(review): the meaning of the three tuple fields and of the returned value is not
/// defined in this file; confirm against the trace implementations that invoke it.
pub type ExertionLogic = std::sync::Arc<
    dyn Fn(&[(usize, usize, usize)]) -> Option<usize> + Send + Sync,
>;
/// Read-side interface to a trace: a collection of batches that can be explored with cursors.
///
/// Both associated types, `Batch` and `Cursor`, are pinned to this type's layout, so keys,
/// values, times, and diffs (and their containers) have identical types whether they are
/// reached through the trace, through one of its batches, or through a cursor.
pub trait TraceReader: LayoutExt {
    /// The type of batch held by the trace, as visited by `map_batches`.
    ///
    /// Must be `Clone + 'static` and agree with the trace on every layout-derived type.
    type Batch:
        'static +
        Clone +
        BatchReader +
        WithLayout<Layout = Self::Layout> +
        for<'a> LayoutExt<
            Key<'a> = Self::Key<'a>,
            Val<'a> = Self::Val<'a>,
            ValOwn = Self::ValOwn,
            Time = Self::Time,
            TimeGat<'a> = Self::TimeGat<'a>,
            Diff = Self::Diff,
            DiffGat<'a> = Self::DiffGat<'a>,
            KeyContainer = Self::KeyContainer,
            ValContainer = Self::ValContainer,
            TimeContainer = Self::TimeContainer,
            DiffContainer = Self::DiffContainer,
        >;

    /// The storage a `Cursor` reads from; handed out together with the cursor.
    type Storage;

    /// The type used to navigate the contents of the trace.
    ///
    /// Must navigate `Self::Storage` and agree with the trace on every layout-derived type.
    type Cursor:
        Cursor<Storage = Self::Storage> +
        WithLayout<Layout = Self::Layout> +
        for<'a> LayoutExt<
            Key<'a> = Self::Key<'a>,
            Val<'a> = Self::Val<'a>,
            ValOwn = Self::ValOwn,
            Time = Self::Time,
            TimeGat<'a> = Self::TimeGat<'a>,
            Diff = Self::Diff,
            DiffGat<'a> = Self::DiffGat<'a>,
            KeyContainer = Self::KeyContainer,
            ValContainer = Self::ValContainer,
            TimeContainer = Self::TimeContainer,
            DiffContainer = Self::DiffContainer,
        >;

    /// Returns a cursor over the complete trace, together with the storage it reads from.
    ///
    /// Implemented by asking `cursor_through` for a cursor bounded by the empty antichain.
    ///
    /// # Panics
    ///
    /// Panics if `cursor_through` returns `None` for the empty antichain.
    fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
        match self.cursor_through(Antichain::new().borrow()) {
            Some(cursor_and_storage) => cursor_and_storage,
            None => panic!("unable to acquire complete cursor for trace; is it closed?"),
        }
    }

    /// Returns a cursor (and its storage) for the trace bounded by `upper`, or `None` if no
    /// such cursor can be produced.
    ///
    /// NOTE(review): which values of `upper` are accepted, and when `None` is returned, is
    /// decided by implementors and is not visible in this file.
    fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;

    /// Sets the frontier used for logical compaction.
    ///
    /// NOTE(review): the effect on the times later observed through cursors is defined by
    /// implementors; confirm before relying on it.
    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);

    /// Reports the logical compaction frontier (presumably the one most recently set).
    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;

    /// Sets the frontier used for physical compaction.
    ///
    /// NOTE(review): the effect on batch merging and on `cursor_through` is defined by
    /// implementors; confirm before relying on it.
    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Self::Time>);

    /// Reports the physical compaction frontier (presumably the one most recently set).
    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;

    /// Applies `f` to batches of the trace.
    ///
    /// NOTE(review): presumably every batch is visited in order; `read_upper` depends on the
    /// last visited batch carrying the trace's upper bound. Confirm against implementors.
    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);

    /// Writes the upper bound of the trace into `target`.
    ///
    /// `target` is first reset to contain only the minimum timestamp, and is then overwritten
    /// with the upper bound of each batch visited by `map_batches`. It therefore ends up
    /// holding the upper bound of the last batch visited, or the minimum timestamp when no
    /// batch is visited.
    #[inline]
    fn read_upper(&mut self, target: &mut Antichain<Self::Time>) {
        target.clear();
        let minimum = <Self::Time as timely::progress::Timestamp>::minimum();
        target.insert(minimum);
        // Each visited batch replaces the running answer; the last one wins.
        self.map_batches(|batch| target.clone_from(batch.upper()));
    }

    /// Advances `upper` across empty batches.
    ///
    /// For each batch visited by `map_batches`, if the batch is empty and its lower bound
    /// equals the current contents of `upper`, then `upper` is replaced by that batch's
    /// upper bound.
    fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) {
        self.map_batches(|batch| {
            let continues_upper = batch.is_empty() && batch.lower() == upper;
            if continues_upper {
                upper.clone_from(batch.upper());
            }
        });
    }
}
/// Write-side extension of `TraceReader`: a trace that can be constructed, fed batches, and
/// closed. Its batch type must implement `Batch`.
pub trait Trace: TraceReader<Batch: Batch> {
    /// Creates a new trace from operator information, an optional logger, and an optional
    /// activator.
    ///
    /// NOTE(review): how the logger and activator are used is up to implementors and is not
    /// visible in this file.
    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self;

    /// Asks the trace to exert itself.
    ///
    /// NOTE(review): presumably performs maintenance work steered by the logic installed with
    /// `set_exert_logic`; confirm against implementors.
    fn exert(&mut self);

    /// Installs the `ExertionLogic` callback on this trace.
    fn set_exert_logic(&mut self, logic: ExertionLogic);

    /// Hands a batch to the trace.
    ///
    /// NOTE(review): any ordering or contiguity requirements on inserted batches are imposed
    /// by implementors and are not visible in this file.
    fn insert(&mut self, batch: Self::Batch);

    /// Closes the trace.
    ///
    /// NOTE(review): presumably signals that no further batches will be inserted; confirm.
    fn close(&mut self);
}
use crate::trace::implementations::WithLayout;
/// Read-side interface to a single batch: it can be navigated with a cursor and is described
/// by a `Description` carrying lower and upper time bounds.
pub trait BatchReader: LayoutExt + Sized {
    /// The type used to navigate the batch.
    ///
    /// The cursor uses the batch itself as its storage and must agree with the batch on every
    /// layout-derived type.
    type Cursor:
        Cursor<Storage = Self> +
        WithLayout<Layout = Self::Layout> +
        for<'a> LayoutExt<
            Key<'a> = Self::Key<'a>,
            Val<'a> = Self::Val<'a>,
            ValOwn = Self::ValOwn,
            Time = Self::Time,
            TimeGat<'a> = Self::TimeGat<'a>,
            Diff = Self::Diff,
            DiffGat<'a> = Self::DiffGat<'a>,
            KeyContainer = Self::KeyContainer,
            ValContainer = Self::ValContainer,
            TimeContainer = Self::TimeContainer,
            DiffContainer = Self::DiffContainer,
        >;

    /// Returns a cursor for navigating this batch.
    fn cursor(&self) -> Self::Cursor;

    /// The size of the batch as reported by the implementor.
    ///
    /// NOTE(review): presumably the number of updates in the batch; confirm.
    fn len(&self) -> usize;

    /// True exactly when `len` reports zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// The description of this batch, from which `lower` and `upper` are read.
    fn description(&self) -> &Description<Self::Time>;

    /// The lower bound recorded in the batch's description.
    fn lower(&self) -> &Antichain<Self::Time> {
        let description = self.description();
        description.lower()
    }

    /// The upper bound recorded in the batch's description.
    fn upper(&self) -> &Antichain<Self::Time> {
        let description = self.description();
        description.upper()
    }
}
/// A batch that can additionally be merged with another batch of the same type, and that can
/// be created empty for a given pair of bounds.
pub trait Batch: BatchReader + Sized {
    /// The type that carries out a merge of two batches of this type.
    type Merger: Merger<Self>;

    /// Starts a merge of `self` with `other` by constructing a `Self::Merger` from both
    /// batches and `compaction_frontier`.
    ///
    /// The merge itself is driven afterwards through the returned merger's `work` and `done`.
    fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
        <Self::Merger as Merger<Self>>::new(self, other, compaction_frontier)
    }

    /// Creates a batch from the given `lower` and `upper` bounds.
    ///
    /// NOTE(review): presumably the result contains no updates, as the name suggests; confirm
    /// against implementors.
    fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self;
}
/// Accepts input containers and, on request, seals what it holds into an output produced by a
/// `Builder`.
pub trait Batcher {
    /// The type of container pushed into the batcher.
    type Input;

    /// The type handed to the builder when sealing (the builder's `Input`).
    type Output;

    /// The timestamp type of the frontiers this batcher works with.
    type Time: Timestamp;

    /// Creates a batcher from an optional logger and the identifier of the owning operator.
    fn new(logger: Option<Logger>, operator_id: usize) -> Self;

    /// Pushes a container into the batcher.
    ///
    /// The container is taken by mutable reference.
    /// NOTE(review): presumably implementors may drain or swap its contents; confirm.
    fn push_container(&mut self, batch: &mut Self::Input);

    /// Seals the batcher up to `upper`, returning the output built by a builder of type `B`.
    ///
    /// NOTE(review): presumably only updates at times not beyond `upper` are included, with
    /// the rest retained for later calls; confirm against implementors.
    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;

    /// Reports a frontier for the batcher.
    ///
    /// NOTE(review): presumably a lower bound on the times of updates still held; confirm.
    fn frontier(&mut self) -> AntichainRef<'_, Self::Time>;
}
/// Assembles an output (typically a batch) from chunks of input and a `Description`.
pub trait Builder: Sized {
    /// The type of chunk the builder consumes.
    type Input;

    /// The timestamp type of the description attached to the output.
    type Time: Timestamp;

    /// The type produced when building completes.
    type Output;

    /// Creates a builder with all three capacities set to zero.
    fn new() -> Self {
        Self::with_capacity(0, 0, 0)
    }

    /// Creates a builder given capacities for keys, values, and updates.
    ///
    /// NOTE(review): presumably these are pre-allocation hints rather than hard limits;
    /// confirm against implementors.
    fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;

    /// Adds a chunk of input to the builder.
    ///
    /// The chunk is taken by mutable reference.
    /// NOTE(review): presumably implementors may drain it; confirm.
    fn push(&mut self, chunk: &mut Self::Input);

    /// Consumes the builder and produces the output labelled with `description`.
    fn done(self, description: Description<Self::Time>) -> Self::Output;

    /// Produces an output directly from a chain of input chunks and a `description`, without
    /// an existing builder instance.
    fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
}
/// Represents an in-progress merge of two batches of type `Output` into one.
pub trait Merger<Output: Batch> {
    /// Creates a merger for the two source batches and a compaction frontier.
    fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;

    /// Performs some of the merge, given the same two sources and a mutable fuel budget.
    ///
    /// NOTE(review): presumably `fuel` is decremented by the work done and the merge is
    /// complete once fuel is left over; confirm against implementors.
    fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);

    /// Consumes the merger and returns the merged batch.
    ///
    /// NOTE(review): presumably only valid once `work` has completed the merge; confirm.
    fn done(self) -> Output;
}
pub mod rc_blanket_impls {
use std::rc::Rc;
use timely::progress::{Antichain, frontier::AntichainRef};
use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
/// A reference-counted batch uses the same layout as the batch it wraps.
impl<B: BatchReader> WithLayout for Rc<B> {
    type Layout = B::Layout;
}
/// A reference-counted batch is itself a batch reader: every method forwards to the wrapped
/// batch, and the cursor is wrapped so that it accepts `Rc<B>` as its storage.
impl<B: BatchReader> BatchReader for Rc<B> {
    /// The wrapped batch's cursor, adapted to read from `Rc<B>`.
    type Cursor = RcBatchCursor<B::Cursor>;

    /// Builds the wrapped batch's cursor and wraps it in an `RcBatchCursor`.
    fn cursor(&self) -> Self::Cursor {
        let inner: &B = self;
        RcBatchCursor::new(inner.cursor())
    }

    /// The length reported by the wrapped batch.
    fn len(&self) -> usize {
        let inner: &B = self;
        inner.len()
    }

    /// The description of the wrapped batch.
    fn description(&self) -> &Description<Self::Time> {
        let inner: &B = self;
        inner.description()
    }
}
/// Wraps a cursor so that it can be used with reference-counted storage.
///
/// The wrapper's `Cursor` implementation takes `Rc<C::Storage>` as storage and forwards every
/// operation to the wrapped cursor.
pub struct RcBatchCursor<C> {
    /// The wrapped cursor that does the actual navigation.
    cursor: C,
}
use crate::trace::implementations::WithLayout;
/// The wrapper cursor uses the same layout as the cursor it wraps.
impl<C: Cursor> WithLayout for RcBatchCursor<C> {
    type Layout = C::Layout;
}
impl<C> RcBatchCursor<C> {
fn new(cursor: C) -> Self {
RcBatchCursor {
cursor,
}
}
}
/// Every method forwards to the wrapped cursor. The `&Rc<C::Storage>` argument is passed
/// straight through and reaches the wrapped cursor as `&C::Storage` via deref coercion.
impl<C: Cursor> Cursor for RcBatchCursor<C> {
    /// The wrapped cursor's storage behind a reference count.
    type Storage = Rc<C::Storage>;

    // Validity checks.
    #[inline]
    fn key_valid(&self, storage: &Self::Storage) -> bool {
        self.cursor.key_valid(storage)
    }
    #[inline]
    fn val_valid(&self, storage: &Self::Storage) -> bool {
        self.cursor.val_valid(storage)
    }

    // Accessors for the current key and value.
    #[inline]
    fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
        self.cursor.key(storage)
    }
    #[inline]
    fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
        self.cursor.val(storage)
    }
    #[inline]
    fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
        self.cursor.get_key(storage)
    }
    #[inline]
    fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
        self.cursor.get_val(storage)
    }

    // Visits the (time, diff) pairs at the current position with `logic`.
    #[inline]
    fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
        self.cursor.map_times(storage, logic)
    }

    // Key navigation.
    #[inline]
    fn step_key(&mut self, storage: &Self::Storage) {
        self.cursor.step_key(storage)
    }
    #[inline]
    fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
        self.cursor.seek_key(storage, key)
    }

    // Value navigation.
    #[inline]
    fn step_val(&mut self, storage: &Self::Storage) {
        self.cursor.step_val(storage)
    }
    #[inline]
    fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
        self.cursor.seek_val(storage, val)
    }

    // Rewinding.
    #[inline]
    fn rewind_keys(&mut self, storage: &Self::Storage) {
        self.cursor.rewind_keys(storage)
    }
    #[inline]
    fn rewind_vals(&mut self, storage: &Self::Storage) {
        self.cursor.rewind_vals(storage)
    }
}
/// A reference-counted batch can be merged like the batch it wraps; merging is delegated to
/// `RcMerger`.
impl<B: Batch> Batch for Rc<B> {
    /// Merges the wrapped batches and re-wraps the result in an `Rc`.
    type Merger = RcMerger<B>;

    /// Creates the wrapped batch type's empty batch for the given bounds and wraps it in a
    /// fresh `Rc`.
    fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
        let inner = B::empty(lower, upper);
        Rc::new(inner)
    }
}
/// Wraps a builder so that its output is delivered inside an `Rc`.
pub struct RcBuilder<B: Builder> {
    /// The wrapped builder that does the actual building.
    builder: B,
}

/// Every method forwards to the wrapped builder; `done` and `seal` wrap the result in a fresh
/// `Rc`.
impl<B: Builder> Builder for RcBuilder<B> {
    type Input = B::Input;
    type Time = B::Time;
    type Output = Rc<B::Output>;

    /// Creates the wrapped builder with the same capacities.
    fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
        let builder = B::with_capacity(keys, vals, upds);
        Self { builder }
    }

    /// Pushes `input` into the wrapped builder.
    fn push(&mut self, input: &mut Self::Input) {
        self.builder.push(input)
    }

    /// Finishes the wrapped builder and wraps its output in an `Rc`.
    fn done(self, description: Description<Self::Time>) -> Rc<B::Output> {
        let output = self.builder.done(description);
        Rc::new(output)
    }

    /// Seals `chain` with the wrapped builder type and wraps its output in an `Rc`.
    fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
        let output = B::seal(chain, description);
        Rc::new(output)
    }
}
/// Wraps a batch type's merger so that it merges reference-counted batches.
pub struct RcMerger<B: Batch> {
    /// The wrapped batch type's own merger.
    merger: B::Merger,
}

/// Every method forwards to the wrapped merger. The `&Rc<B>` sources reach it as `&B` via
/// deref coercion, and the merged batch is wrapped in a fresh `Rc`.
impl<B: Batch> Merger<Rc<B>> for RcMerger<B> {
    /// Starts a merge of the two wrapped batches through `B::begin_merge`.
    fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self {
        let merger = B::begin_merge(source1, source2, compaction_frontier);
        Self { merger }
    }

    /// Lets the wrapped merger work on the two sources with the given fuel.
    fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) {
        self.merger.work(source1, source2, fuel)
    }

    /// Finishes the wrapped merger and wraps the merged batch in an `Rc`.
    fn done(self) -> Rc<B> {
        let merged = self.merger.done();
        Rc::new(merged)
    }
}
}