use timely::progress::{frontier::AntichainRef, Antichain};
use crate::lattice::Lattice;
use crate::trace::cursor::Cursor;
use crate::trace::{BatchReader, Description, TraceReader};
pub struct TraceFrontier<Tr: TraceReader> {
trace: Tr,
since: Antichain<Tr::Time>,
until: Antichain<Tr::Time>,
}
impl<Tr: TraceReader + Clone> Clone for TraceFrontier<Tr> {
fn clone(&self) -> Self {
TraceFrontier {
trace: self.trace.clone(),
since: self.since.clone(),
until: self.until.clone(),
}
}
}
impl<Tr: TraceReader> WithLayout for TraceFrontier<Tr> {
type Layout = (
<Tr::Layout as Layout>::KeyContainer,
<Tr::Layout as Layout>::ValContainer,
Vec<Tr::Time>,
<Tr::Layout as Layout>::DiffContainer,
<Tr::Layout as Layout>::OffsetContainer,
);
}
impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
type Batch = BatchFrontier<Tr::Batch>;
type Storage = Tr::Storage;
type Cursor = CursorFrontier<Tr::Cursor, Tr::Time>;
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
let since = self.since.borrow();
let until = self.until.borrow();
self.trace
.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
}
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
self.trace.set_logical_compaction(frontier)
}
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
self.trace.get_logical_compaction()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
self.trace.set_physical_compaction(frontier)
}
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
self.trace.get_physical_compaction()
}
fn cursor_through(
&mut self,
upper: AntichainRef<'_, Tr::Time>,
) -> Option<(Self::Cursor, Self::Storage)> {
let since = self.since.borrow();
let until = self.until.borrow();
self.trace
.cursor_through(upper)
.map(|(x, y)| (CursorFrontier::new(x, since, until), y))
}
}
impl<Tr: TraceReader> TraceFrontier<Tr> {
pub fn make_from(
trace: Tr,
since: AntichainRef<'_, Tr::Time>,
until: AntichainRef<'_, Tr::Time>,
) -> Self {
TraceFrontier {
trace,
since: since.to_owned(),
until: until.to_owned(),
}
}
}
#[derive(Clone)]
pub struct BatchFrontier<B: BatchReader> {
batch: B,
since: Antichain<B::Time>,
until: Antichain<B::Time>,
}
impl<B: BatchReader> WithLayout for BatchFrontier<B> {
type Layout = (
<B::Layout as Layout>::KeyContainer,
<B::Layout as Layout>::ValContainer,
Vec<B::Time>,
<B::Layout as Layout>::DiffContainer,
<B::Layout as Layout>::OffsetContainer,
);
}
impl<B: BatchReader> BatchReader for BatchFrontier<B> {
type Cursor = BatchCursorFrontier<B::Cursor>;
fn cursor(&self) -> Self::Cursor {
BatchCursorFrontier::new(
self.batch.cursor(),
self.since.borrow(),
self.until.borrow(),
)
}
fn len(&self) -> usize {
self.batch.len()
}
fn description(&self) -> &Description<B::Time> {
self.batch.description()
}
}
impl<B: BatchReader> BatchFrontier<B> {
pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
BatchFrontier {
batch,
since: since.to_owned(),
until: until.to_owned(),
}
}
}
pub struct CursorFrontier<C, T> {
cursor: C,
since: Antichain<T>,
until: Antichain<T>,
}
use crate::trace::implementations::{Layout, WithLayout};
impl<C: Cursor> WithLayout for CursorFrontier<C, C::Time> {
type Layout = (
<C::Layout as Layout>::KeyContainer,
<C::Layout as Layout>::ValContainer,
Vec<C::Time>,
<C::Layout as Layout>::DiffContainer,
<C::Layout as Layout>::OffsetContainer,
);
}
impl<C, T: Clone> CursorFrontier<C, T> {
fn new(cursor: C, since: AntichainRef<T>, until: AntichainRef<T>) -> Self {
CursorFrontier {
cursor,
since: since.to_owned(),
until: until.to_owned(),
}
}
}
impl<C: Cursor> Cursor for CursorFrontier<C, C::Time> {
type Storage = C::Storage;
#[inline]
fn key_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.key_valid(storage)
}
#[inline]
fn val_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.val_valid(storage)
}
#[inline]
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
self.cursor.key(storage)
}
#[inline]
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
self.cursor.val(storage)
}
#[inline]
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
self.cursor.get_key(storage)
}
#[inline]
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
self.cursor.get_val(storage)
}
#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
&mut self,
storage: &Self::Storage,
mut logic: L,
) {
let since = self.since.borrow();
let until = self.until.borrow();
let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
self.cursor.map_times(storage, |time, diff| {
C::clone_time_onto(time, &mut temp);
temp.advance_by(since);
if !until.less_equal(&temp) {
logic(&temp, diff);
}
})
}
#[inline]
fn step_key(&mut self, storage: &Self::Storage) {
self.cursor.step_key(storage)
}
#[inline]
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.cursor.seek_key(storage, key)
}
#[inline]
fn step_val(&mut self, storage: &Self::Storage) {
self.cursor.step_val(storage)
}
#[inline]
fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
self.cursor.seek_val(storage, val)
}
#[inline]
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.cursor.rewind_keys(storage)
}
#[inline]
fn rewind_vals(&mut self, storage: &Self::Storage) {
self.cursor.rewind_vals(storage)
}
}
pub struct BatchCursorFrontier<C: Cursor> {
cursor: C,
since: Antichain<C::Time>,
until: Antichain<C::Time>,
}
impl<C: Cursor> WithLayout for BatchCursorFrontier<C> {
type Layout = (
<C::Layout as Layout>::KeyContainer,
<C::Layout as Layout>::ValContainer,
Vec<C::Time>,
<C::Layout as Layout>::DiffContainer,
<C::Layout as Layout>::OffsetContainer,
);
}
impl<C: Cursor> BatchCursorFrontier<C> {
fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
BatchCursorFrontier {
cursor,
since: since.to_owned(),
until: until.to_owned(),
}
}
}
impl<C: Cursor<Storage: BatchReader>> Cursor for BatchCursorFrontier<C> {
type Storage = BatchFrontier<C::Storage>;
#[inline]
fn key_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.key_valid(&storage.batch)
}
#[inline]
fn val_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.val_valid(&storage.batch)
}
#[inline]
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
self.cursor.key(&storage.batch)
}
#[inline]
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
self.cursor.val(&storage.batch)
}
#[inline]
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
self.cursor.get_key(&storage.batch)
}
#[inline]
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
self.cursor.get_val(&storage.batch)
}
#[inline]
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(
&mut self,
storage: &Self::Storage,
mut logic: L,
) {
let since = self.since.borrow();
let until = self.until.borrow();
let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
self.cursor.map_times(&storage.batch, |time, diff| {
C::clone_time_onto(time, &mut temp);
temp.advance_by(since);
if !until.less_equal(&temp) {
logic(&temp, diff);
}
})
}
#[inline]
fn step_key(&mut self, storage: &Self::Storage) {
self.cursor.step_key(&storage.batch)
}
#[inline]
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.cursor.seek_key(&storage.batch, key)
}
#[inline]
fn step_val(&mut self, storage: &Self::Storage) {
self.cursor.step_val(&storage.batch)
}
#[inline]
fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
self.cursor.seek_val(&storage.batch, val)
}
#[inline]
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.cursor.rewind_keys(&storage.batch)
}
#[inline]
fn rewind_vals(&mut self, storage: &Self::Storage) {
self.cursor.rewind_vals(&storage.batch)
}
}