use timely::progress::timestamp::Refines;
use timely::progress::{frontier::AntichainRef, Antichain};
use crate::lattice::Lattice;
use crate::trace::cursor::Cursor;
use crate::trace::{BatchReader, Description, TraceReader};
pub struct TraceEnter<Tr: TraceReader, TInner, F, G> {
trace: Tr,
stash1: Antichain<Tr::Time>,
stash2: Antichain<TInner>,
logic: F,
prior: G,
}
impl<Tr, TInner, F, G> Clone for TraceEnter<Tr, TInner, F, G>
where
Tr: TraceReader + Clone,
F: Clone,
G: Clone,
{
fn clone(&self) -> Self {
TraceEnter {
trace: self.trace.clone(),
stash1: Antichain::new(),
stash2: Antichain::new(),
logic: self.logic.clone(),
prior: self.prior.clone(),
}
}
}
impl<Tr, TInner, F, G> WithLayout for TraceEnter<Tr, TInner, F, G>
where
Tr: TraceReader<Batch: Clone>,
TInner: Refines<Tr::Time> + Lattice,
F: Clone,
G: Clone,
{
type Layout = (
<Tr::Layout as Layout>::KeyContainer,
<Tr::Layout as Layout>::ValContainer,
Vec<TInner>,
<Tr::Layout as Layout>::DiffContainer,
<Tr::Layout as Layout>::OffsetContainer,
);
}
impl<Tr, TInner, F, G> TraceReader for TraceEnter<Tr, TInner, F, G>
where
Tr: TraceReader<Batch: Clone>,
TInner: Refines<Tr::Time> + Lattice,
F: 'static,
F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>) -> TInner + Clone,
G: FnMut(&TInner) -> Tr::Time + Clone + 'static,
{
type Batch = BatchEnter<Tr::Batch, TInner, F>;
type Storage = Tr::Storage;
type Cursor = CursorEnter<Tr::Cursor, TInner, F>;
fn map_batches<F2: FnMut(&Self::Batch)>(&self, mut f: F2) {
let logic = self.logic.clone();
self.trace.map_batches(|batch| {
f(&Self::Batch::make_from(batch.clone(), logic.clone()));
})
}
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
self.stash1.clear();
for time in frontier.iter() {
self.stash1.insert((self.prior)(time));
}
self.trace.set_logical_compaction(self.stash1.borrow());
}
fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
self.stash2.clear();
for time in self.trace.get_logical_compaction().iter() {
self.stash2.insert(TInner::to_inner(time.clone()));
}
self.stash2.borrow()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
self.stash1.clear();
for time in frontier.iter() {
self.stash1.insert((self.prior)(time));
}
self.trace.set_physical_compaction(self.stash1.borrow());
}
fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
self.stash2.clear();
for time in self.trace.get_physical_compaction().iter() {
self.stash2.insert(TInner::to_inner(time.clone()));
}
self.stash2.borrow()
}
fn cursor_through(
&mut self,
upper: AntichainRef<TInner>,
) -> Option<(Self::Cursor, Self::Storage)> {
self.stash1.clear();
for time in upper.iter() {
self.stash1.insert(time.clone().to_outer());
}
self.trace
.cursor_through(self.stash1.borrow())
.map(|(x, y)| (CursorEnter::new(x, self.logic.clone()), y))
}
}
impl<Tr, TInner, F, G> TraceEnter<Tr, TInner, F, G>
where
Tr: TraceReader,
TInner: Refines<Tr::Time> + Lattice,
{
pub fn make_from(trace: Tr, logic: F, prior: G) -> Self {
TraceEnter {
trace,
stash1: Antichain::new(),
stash2: Antichain::new(),
logic,
prior,
}
}
}
#[derive(Clone)]
pub struct BatchEnter<B, TInner, F> {
batch: B,
description: Description<TInner>,
logic: F,
}
impl<B, TInner, F> WithLayout for BatchEnter<B, TInner, F>
where
B: BatchReader,
TInner: Refines<B::Time> + Lattice,
{
type Layout = (
<B::Layout as Layout>::KeyContainer,
<B::Layout as Layout>::ValContainer,
Vec<TInner>,
<B::Layout as Layout>::DiffContainer,
<B::Layout as Layout>::OffsetContainer,
);
}
use crate::trace::implementations::LayoutExt;
impl<B, TInner, F> BatchReader for BatchEnter<B, TInner, F>
where
B: BatchReader,
TInner: Refines<B::Time> + Lattice,
F: FnMut(B::Key<'_>, <B::Cursor as LayoutExt>::Val<'_>, B::TimeGat<'_>) -> TInner + Clone,
{
type Cursor = BatchCursorEnter<B::Cursor, TInner, F>;
fn cursor(&self) -> Self::Cursor {
BatchCursorEnter::new(self.batch.cursor(), self.logic.clone())
}
fn len(&self) -> usize {
self.batch.len()
}
fn description(&self) -> &Description<TInner> {
&self.description
}
}
impl<B, TInner, F> BatchEnter<B, TInner, F>
where
B: BatchReader,
TInner: Refines<B::Time> + Lattice,
{
pub fn make_from(batch: B, logic: F) -> Self {
let lower: Vec<_> = batch
.description()
.lower()
.elements()
.iter()
.map(|x| TInner::to_inner(x.clone()))
.collect();
let upper: Vec<_> = batch
.description()
.upper()
.elements()
.iter()
.map(|x| TInner::to_inner(x.clone()))
.collect();
let since: Vec<_> = batch
.description()
.since()
.elements()
.iter()
.map(|x| TInner::to_inner(x.clone()))
.collect();
BatchEnter {
batch,
description: Description::new(
Antichain::from(lower),
Antichain::from(upper),
Antichain::from(since),
),
logic,
}
}
}
pub struct CursorEnter<C, TInner, F> {
phantom: ::std::marker::PhantomData<TInner>,
cursor: C,
logic: F,
}
use crate::trace::implementations::{Layout, WithLayout};
impl<C, TInner, F> WithLayout for CursorEnter<C, TInner, F>
where
C: Cursor,
TInner: Refines<C::Time> + Lattice,
{
type Layout = (
<C::Layout as Layout>::KeyContainer,
<C::Layout as Layout>::ValContainer,
Vec<TInner>,
<C::Layout as Layout>::DiffContainer,
<C::Layout as Layout>::OffsetContainer,
);
}
impl<C, TInner, F> CursorEnter<C, TInner, F> {
fn new(cursor: C, logic: F) -> Self {
CursorEnter {
phantom: ::std::marker::PhantomData,
cursor,
logic,
}
}
}
impl<C, TInner, F> Cursor for CursorEnter<C, TInner, F>
where
C: Cursor,
TInner: Refines<C::Time> + Lattice,
F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>) -> TInner,
{
type Storage = C::Storage;
#[inline]
fn key_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.key_valid(storage)
}
#[inline]
fn val_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.val_valid(storage)
}
#[inline]
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
self.cursor.key(storage)
}
#[inline]
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
self.cursor.val(storage)
}
#[inline]
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
self.cursor.get_key(storage)
}
#[inline]
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
self.cursor.get_val(storage)
}
#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
&mut self,
storage: &Self::Storage,
mut logic: L,
) {
let key = self.key(storage);
let val = self.val(storage);
let logic2 = &mut self.logic;
self.cursor
.map_times(storage, |time, diff| logic(&logic2(key, val, time), diff))
}
#[inline]
fn step_key(&mut self, storage: &Self::Storage) {
self.cursor.step_key(storage)
}
#[inline]
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.cursor.seek_key(storage, key)
}
#[inline]
fn step_val(&mut self, storage: &Self::Storage) {
self.cursor.step_val(storage)
}
#[inline]
fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
self.cursor.seek_val(storage, val)
}
#[inline]
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.cursor.rewind_keys(storage)
}
#[inline]
fn rewind_vals(&mut self, storage: &Self::Storage) {
self.cursor.rewind_vals(storage)
}
}
pub struct BatchCursorEnter<C, TInner, F> {
phantom: ::std::marker::PhantomData<TInner>,
cursor: C,
logic: F,
}
impl<C, TInner, F> WithLayout for BatchCursorEnter<C, TInner, F>
where
C: Cursor,
TInner: Refines<C::Time> + Lattice,
{
type Layout = (
<C::Layout as Layout>::KeyContainer,
<C::Layout as Layout>::ValContainer,
Vec<TInner>,
<C::Layout as Layout>::DiffContainer,
<C::Layout as Layout>::OffsetContainer,
);
}
impl<C, TInner, F> BatchCursorEnter<C, TInner, F> {
fn new(cursor: C, logic: F) -> Self {
BatchCursorEnter {
phantom: ::std::marker::PhantomData,
cursor,
logic,
}
}
}
impl<TInner, C: Cursor, F> Cursor for BatchCursorEnter<C, TInner, F>
where
TInner: Refines<C::Time> + Lattice,
F: FnMut(C::Key<'_>, C::Val<'_>, C::TimeGat<'_>) -> TInner,
{
type Storage = BatchEnter<C::Storage, TInner, F>;
#[inline]
fn key_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.key_valid(&storage.batch)
}
#[inline]
fn val_valid(&self, storage: &Self::Storage) -> bool {
self.cursor.val_valid(&storage.batch)
}
#[inline]
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> {
self.cursor.key(&storage.batch)
}
#[inline]
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> {
self.cursor.val(&storage.batch)
}
#[inline]
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> {
self.cursor.get_key(&storage.batch)
}
#[inline]
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> {
self.cursor.get_val(&storage.batch)
}
#[inline]
fn map_times<L: FnMut(&TInner, Self::DiffGat<'_>)>(
&mut self,
storage: &Self::Storage,
mut logic: L,
) {
let key = self.key(storage);
let val = self.val(storage);
let logic2 = &mut self.logic;
self.cursor.map_times(&storage.batch, |time, diff| {
logic(&logic2(key, val, time), diff)
})
}
#[inline]
fn step_key(&mut self, storage: &Self::Storage) {
self.cursor.step_key(&storage.batch)
}
#[inline]
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.cursor.seek_key(&storage.batch, key)
}
#[inline]
fn step_val(&mut self, storage: &Self::Storage) {
self.cursor.step_val(&storage.batch)
}
#[inline]
fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) {
self.cursor.seek_val(&storage.batch, val)
}
#[inline]
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.cursor.rewind_keys(&storage.batch)
}
#[inline]
fn rewind_vals(&mut self, storage: &Self::Storage) {
self.cursor.rewind_vals(&storage.batch)
}
}