pub mod cursor;
pub mod description;
pub mod implementations;
pub mod wrappers;
use timely::progress::Timestamp;
use timely::progress::{frontier::AntichainRef, Antichain};
pub use self::cursor::Cursor;
pub use self::description::Description;
use crate::logging::Logger;
use crate::trace::implementations::LayoutExt;
pub type ExertionLogic =
std::sync::Arc<dyn for<'a> Fn(&'a [(usize, usize, usize)]) -> Option<usize> + Send + Sync>;
pub trait TraceReader: LayoutExt {
type Batch: 'static
+ Clone
+ BatchReader
+ WithLayout<Layout = Self::Layout>
+ for<'a> LayoutExt<
Key<'a> = Self::Key<'a>,
KeyOwn = Self::KeyOwn,
Val<'a> = Self::Val<'a>,
ValOwn = Self::ValOwn,
Time = Self::Time,
TimeGat<'a> = Self::TimeGat<'a>,
Diff = Self::Diff,
DiffGat<'a> = Self::DiffGat<'a>,
KeyContainer = Self::KeyContainer,
ValContainer = Self::ValContainer,
TimeContainer = Self::TimeContainer,
DiffContainer = Self::DiffContainer,
>;
type Storage;
type Cursor: Cursor<Storage = Self::Storage>
+ WithLayout<Layout = Self::Layout>
+ for<'a> LayoutExt<
Key<'a> = Self::Key<'a>,
KeyOwn = Self::KeyOwn,
Val<'a> = Self::Val<'a>,
ValOwn = Self::ValOwn,
Time = Self::Time,
TimeGat<'a> = Self::TimeGat<'a>,
Diff = Self::Diff,
DiffGat<'a> = Self::DiffGat<'a>,
KeyContainer = Self::KeyContainer,
ValContainer = Self::ValContainer,
TimeContainer = Self::TimeContainer,
DiffContainer = Self::DiffContainer,
>;
fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
cursor
} else {
panic!("unable to acquire complete cursor for trace; is it closed?");
}
}
fn cursor_through(
&mut self,
upper: AntichainRef<Self::Time>,
) -> Option<(Self::Cursor, Self::Storage)>;
fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Self::Time>);
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
#[inline]
fn read_upper(&mut self, target: &mut Antichain<Self::Time>) {
target.clear();
target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
self.map_batches(|batch| {
target.clone_from(batch.upper());
});
}
fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) {
self.map_batches(|batch| {
if batch.is_empty() && batch.lower() == upper {
upper.clone_from(batch.upper());
}
});
}
}
pub trait Trace: TraceReader<Batch: Batch> {
fn new(
info: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<crate::logging::Logger>,
activator: Option<timely::scheduling::activate::Activator>,
) -> Self;
fn exert(&mut self);
fn set_exert_logic(&mut self, logic: ExertionLogic);
fn insert(&mut self, batch: Self::Batch);
fn close(&mut self);
}
use crate::trace::implementations::WithLayout;
pub trait BatchReader: LayoutExt + Sized {
type Cursor: Cursor<Storage = Self>
+ WithLayout<Layout = Self::Layout>
+ for<'a> LayoutExt<
Key<'a> = Self::Key<'a>,
KeyOwn = Self::KeyOwn,
Val<'a> = Self::Val<'a>,
ValOwn = Self::ValOwn,
Time = Self::Time,
TimeGat<'a> = Self::TimeGat<'a>,
Diff = Self::Diff,
DiffGat<'a> = Self::DiffGat<'a>,
KeyContainer = Self::KeyContainer,
ValContainer = Self::ValContainer,
TimeContainer = Self::TimeContainer,
DiffContainer = Self::DiffContainer,
>;
fn cursor(&self) -> Self::Cursor;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
fn description(&self) -> &Description<Self::Time>;
fn lower(&self) -> &Antichain<Self::Time> {
self.description().lower()
}
fn upper(&self) -> &Antichain<Self::Time> {
self.description().upper()
}
}
pub trait Batch: BatchReader + Sized {
type Merger: Merger<Self>;
fn begin_merge(
&self,
other: &Self,
compaction_frontier: AntichainRef<Self::Time>,
) -> Self::Merger {
Self::Merger::new(self, other, compaction_frontier)
}
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self;
}
pub trait Batcher {
type Input;
type Output;
type Time: Timestamp;
fn new(logger: Option<Logger>, operator_id: usize) -> Self;
fn push_container(&mut self, batch: &mut Self::Input);
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
&mut self,
upper: Antichain<Self::Time>,
) -> B::Output;
fn frontier(&mut self) -> AntichainRef<'_, Self::Time>;
}
pub trait Builder: Sized {
type Input;
type Time: Timestamp;
type Output;
fn new() -> Self {
Self::with_capacity(0, 0, 0)
}
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
fn push(&mut self, chunk: &mut Self::Input);
fn done(self, description: Description<Self::Time>) -> Self::Output;
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
}
pub trait Merger<Output: Batch> {
fn new(
source1: &Output,
source2: &Output,
compaction_frontier: AntichainRef<Output::Time>,
) -> Self;
fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
fn done(self) -> Output;
}
pub mod rc_blanket_impls {
use std::rc::Rc;
use super::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use timely::progress::{frontier::AntichainRef, Antichain};
impl<B: BatchReader> WithLayout for Rc<B> {
type Layout = B::Layout;
}
impl<B: BatchReader> BatchReader for Rc<B> {
type Cursor = RcBatchCursor<B::Cursor>;
fn cursor(&self) -> Self::Cursor {
RcBatchCursor::new((**self).cursor())
}
fn len(&self) -> usize {
(**self).len()
}
fn description(&self) -> &Description<Self::Time> {
(**self).description()
}
}
pub struct RcBatchCursor<C> {
cursor: C,
}
use crate::trace::implementations::WithLayout;
impl<C: Cursor> WithLayout for RcBatchCursor<C> {
type Layout = C::Layout;
}
impl<C> RcBatchCursor<C> {
fn new(cursor: C) -> Self {
RcBatchCursor { cursor }
}
}
impl<C: Cursor> Cursor for RcBatchCursor<C> {
type Storage = Rc<C::Storage>;
#[inline]
fn key_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.key_valid(storage)
}
#[inline]
fn val_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.val_valid(storage)
}
#[inline]
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
self.cursor.key(storage)
}
#[inline]
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
self.cursor.val(storage)
}
#[inline]
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
self.cursor.get_key(storage)
}
#[inline]
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
self.cursor.get_val(storage)
}
#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
&mut self,
storage: &Self::Storage,
logic: L,
) {
self.cursor.map_times(storage, logic)
}
#[inline]
fn step_key(&mut self, storage: &Self::Storage) {
self.cursor.step_key(storage)
}
#[inline]
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.cursor.seek_key(storage, key)
}
#[inline]
fn step_val(&mut self, storage: &Self::Storage) {
self.cursor.step_val(storage)
}
#[inline]
fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
self.cursor.seek_val(storage, val)
}
#[inline]
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.cursor.rewind_keys(storage)
}
#[inline]
fn rewind_vals(&mut self, storage: &Self::Storage) {
self.cursor.rewind_vals(storage)
}
}
impl<B: Batch> Batch for Rc<B> {
type Merger = RcMerger<B>;
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
Rc::new(B::empty(lower, upper))
}
}
pub struct RcBuilder<B: Builder> {
builder: B,
}
impl<B: Builder> Builder for RcBuilder<B> {
type Input = B::Input;
type Time = B::Time;
type Output = Rc<B::Output>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
RcBuilder {
builder: B::with_capacity(keys, vals, upds),
}
}
fn push(&mut self, input: &mut Self::Input) {
self.builder.push(input)
}
fn done(self, description: Description<Self::Time>) -> Rc<B::Output> {
Rc::new(self.builder.done(description))
}
fn seal(
chain: &mut Vec<Self::Input>,
description: Description<Self::Time>,
) -> Self::Output {
Rc::new(B::seal(chain, description))
}
}
pub struct RcMerger<B: Batch> {
merger: B::Merger,
}
impl<B: Batch> Merger<Rc<B>> for RcMerger<B> {
fn new(
source1: &Rc<B>,
source2: &Rc<B>,
compaction_frontier: AntichainRef<B::Time>,
) -> Self {
RcMerger {
merger: B::begin_merge(source1, source2, compaction_frontier),
}
}
fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) {
self.merger.work(source1, source2, fuel)
}
fn done(self) -> Rc<B> {
Rc::new(self.merger.done())
}
}
}