use std::rc::Rc;
use crate::trace::implementations::chunker::ContainerChunker;
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::MergeBatcher;
use crate::trace::implementations::merge_batcher::container::VecInternalMerger;
use crate::trace::rc_blanket_impls::RcBuilder;
use super::{Layout, Vector};
pub use self::val_batch::{OrdValBatch, OrdValBuilder};
pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
/// A trace implementation for `(key, val)` update triples, as a spine of reference-counted batches.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for `(key, val)` updates that chunks and merges `Vec<((K,V),T,R)>` containers.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ContainerChunker<Vec<((K,V),T,R)>>, VecInternalMerger<(K, V), T, R>>;
/// A builder producing reference-counted [`OrdValBatch`]es from vector input.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
/// A key-only trace implementation; the value type is fixed to the unit type `()`.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for key-only updates, with the value fixed to `()`.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ContainerChunker<Vec<((K,()),T,R)>>, VecInternalMerger<(K, ()), T, R>>;
/// A builder producing reference-counted [`OrdKeyBatch`]es from vector input.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
pub use layers::{Vals, Upds};
pub mod layers {
use serde::{Deserialize, Serialize};
use crate::trace::implementations::BatchContainer;
/// A "list of lists" of values: list `i` occupies positions `offs[i] .. offs[i+1]` of `vals`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Vals<O, V> {
/// Offsets into `vals`; constructors seed a leading `0`, so there are `len() + 1` entries.
pub offs: O,
/// All values, concatenated in list order.
pub vals: V,
}
impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
    /// An empty value layer: no lists, no values, just the leading offset sentinel.
    fn default() -> Self {
        Self::with_capacity(0, 0)
    }
}
/// Construction and access helpers for the offset-encoded lists of values.
impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
    /// Allocates room for `o_size` offsets and `v_size` values, seeding the offset sentinel `0`.
    pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
        let mut offsets = <O as BatchContainer>::with_capacity(o_size);
        offsets.push_ref(0);
        let values = <V as BatchContainer>::with_capacity(v_size);
        Self { offs: offsets, vals: values }
    }
    /// Allocates with capacity sufficient for the merge of `this` and `that`.
    pub fn merge_capacity(this: &Self, that: &Self) -> Self {
        let mut offsets = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
        offsets.push_ref(0);
        let values = <V as BatchContainer>::merge_capacity(&this.vals, &that.vals);
        Self { offs: offsets, vals: values }
    }
    /// The number of value lists.
    pub fn len(&self) -> usize { self.offs.len() - 1 }
    /// Half-open `[lower, upper)` bounds of list `index` within `vals`.
    #[inline(always)] pub fn bounds(&self, index: usize) -> (usize, usize) {
        let lower = self.offs.index(index);
        let upper = self.offs.index(index + 1);
        (lower, upper)
    }
    /// The value at absolute position `index` within `vals`.
    pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
        self.vals.index(index)
    }
    /// The `item_idx`-th value of list `list_idx`.
    pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
        let (lower, _upper) = self.bounds(list_idx);
        self.get_abs(lower + item_idx)
    }
}
/// Lists of `(time, diff)` updates: list `i` occupies positions `offs[i] .. offs[i+1]`
/// of `times` and `diffs`, which advance in lockstep.
#[derive(Debug, Serialize, Deserialize)]
pub struct Upds<O, T, D> {
/// Offsets into `times`/`diffs`; an empty range is a compression marker (see `bounds`).
pub offs: O,
/// Update times, one per update.
pub times: T,
/// Update diffs, parallel to `times`.
pub diffs: D,
}
impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
    /// An empty update layer: no lists, no updates, just the leading offset sentinel.
    fn default() -> Self {
        Self::with_capacity(0, 0)
    }
}
/// Construction and access helpers for per-entry update lists.
impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
/// Half-open `[lower, upper)` bounds of the updates for entry `index`.
///
/// A stored empty range (`lower == upper`) is a compression marker meaning
/// "repeat the single immediately preceding update" (written by
/// `UpdsBuilder::seal`), and is decoded here as the one-element range
/// `[upper - 1, upper)`.
pub fn bounds(&self, index: usize) -> (usize, usize) {
let mut lower = self.offs.index(index);
let upper = self.offs.index(index+1);
if lower == upper {
// The marker is only valid if there is a preceding update to repeat.
assert!(lower > 0);
lower -= 1;
}
(lower, upper)
}
/// The `item_idx`-th update of entry `list_idx`.
pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
self.get_abs(self.bounds(list_idx).0 + item_idx)
}
/// The number of update lists.
pub fn len(&self) -> usize { self.offs.len() - 1 }
/// The `(time, diff)` pair at absolute position `index`.
pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
(self.times.index(index), self.diffs.index(index))
}
/// Allocates room for `o_size` offsets and `u_size` updates, seeding the offset sentinel `0`.
pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
let mut offs = <O as BatchContainer>::with_capacity(o_size);
offs.push_ref(0);
Self {
offs,
times: <T as BatchContainer>::with_capacity(u_size),
diffs: <D as BatchContainer>::with_capacity(u_size),
}
}
/// Allocates with capacity sufficient for the merge of `this` and `that`.
pub fn merge_capacity(this: &Self, that: &Self) -> Self {
let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
offs.push_ref(0);
Self {
offs,
times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
}
}
}
/// Staging area that consolidates `(time, diff)` updates before committing them to an [`Upds`] layer.
pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
// Buffered updates for the entry currently being built, pending consolidation.
stash: Vec<(T::Owned, D::Owned)>,
// Number of updates committed across all `seal` calls (repeats included).
total: usize,
// Scratch single-element containers, used by `seal` to compare an owned
// `(time, diff)` against the containers' borrowed `ReadItem` representation.
time_con: T,
diff_con: D,
}
impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
    /// A fresh builder with no staged updates and single-slot scratch containers.
    fn default() -> Self {
        Self {
            stash: Vec::default(),
            total: 0,
            time_con: BatchContainer::with_capacity(1),
            diff_con: BatchContainer::with_capacity(1),
        }
    }
}
impl<T, D> UpdsBuilder<T, D>
where
T: BatchContainer<Owned: Ord>,
D: BatchContainer<Owned: crate::difference::Semigroup>,
{
/// Buffers one `(time, diff)` update for the entry currently being built.
pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
self.stash.push((time, diff));
}
/// Consolidates the buffered updates and commits them to `upds`, returning
/// `true` iff any updates survive (i.e. the corresponding entry should be kept).
pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
use crate::consolidation;
// Sorts by time and sums diffs, dropping updates whose diffs sum to zero.
consolidation::consolidate(&mut self.stash);
if self.stash.is_empty() { return false; }
if self.stash.len() == 1 {
// Singleton compression: if the lone update equals the most recently
// written `(time, diff)`, write only an offset equal to `times.len()`.
// The resulting empty range is decoded by `Upds::bounds` as a repeat
// of the preceding update, saving the duplicate time/diff storage.
let (time, diff) = self.stash.last().unwrap();
// Round-trip the owned values through the scratch containers so they
// can be compared as the containers' `ReadItem` type.
self.time_con.clear(); self.time_con.push_own(time);
self.diff_con.clear(); self.diff_con.push_own(diff);
if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
self.total += 1;
self.stash.clear();
upds.offs.push_ref(upds.times.len());
return true;
}
}
// General path: append every surviving update and close the range.
self.total += self.stash.len();
for (time, diff) in self.stash.drain(..) {
upds.times.push_own(&time);
upds.diffs.push_own(&diff);
}
upds.offs.push_ref(upds.times.len());
true
}
/// The number of updates committed across all `seal` calls (repeats included).
pub fn total(&self) -> usize { self.total }
}
}
pub mod val_batch {
use std::marker::PhantomData;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};
use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::implementations::layout;
use super::{Layout, Vals, Upds, layers::UpdsBuilder};
/// Storage backing an [`OrdValBatch`]: sorted `keys`, each with a list of values
/// in `vals`, and each value with a list of `(time, diff)` updates in `upds`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound = "
L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
L::ValContainer: Serialize + for<'a> Deserialize<'a>,
L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdValStorage<L: Layout> {
/// The distinct keys, in sorted order; key `i` owns value list `i` of `vals`.
pub keys: L::KeyContainer,
/// Per-key value lists; value at absolute index `j` owns update list `j` of `upds`.
pub vals: Vals<L::OffsetContainer, L::ValContainer>,
/// Per-value update lists.
pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
}
/// An immutable collection of `(key, val, time, diff)` updates, ordered by key then value.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
L::ValContainer: Serialize + for<'a> Deserialize<'a>,
L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdValBatch<L: Layout> {
/// The layered storage of keys, values, and updates.
pub storage: OrdValStorage<L>,
/// The time bounds (`lower`, `upper`) and compaction frontier (`since`) of this batch.
pub description: Description<layout::Time<L>>,
/// The number of updates in the batch, as reported by `BatchReader::len`.
pub updates: usize,
}
// Exposes the batch's layout to layout-generic code.
impl<L: Layout> WithLayout for OrdValBatch<L> {
type Layout = L;
}
impl<L: Layout> BatchReader for OrdValBatch<L> {
type Cursor = OrdValCursor<L>;
/// A cursor positioned at the first key and value of the batch.
fn cursor(&self) -> Self::Cursor {
OrdValCursor {
key_cursor: 0,
val_cursor: 0,
phantom: PhantomData,
}
}
/// The number of updates in the batch (maintained at construction/merge time).
fn len(&self) -> usize {
self.updates
}
fn description(&self) -> &Description<layout::Time<L>> { &self.description }
}
impl<L: Layout> Batch for OrdValBatch<L> {
type Merger = OrdValMerger<L>;
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
/// An empty batch covering `[lower, upper)`, with `since` at the minimum timestamp.
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
use timely::progress::Timestamp;
Self {
storage: OrdValStorage {
keys: L::KeyContainer::with_capacity(0),
vals: Default::default(),
upds: Default::default(),
},
description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
updates: 0,
}
}
}
/// In-progress merge of two [`OrdValBatch`]es, resumable under a fuel budget.
pub struct OrdValMerger<L: Layout> {
// Index of the next unprocessed key in the first input batch.
key_cursor1: usize,
// Index of the next unprocessed key in the second input batch.
key_cursor2: usize,
// Storage accumulated for the output batch.
result: OrdValStorage<L>,
// Description of the output batch (bounds spanning both inputs, joined `since`).
description: Description<layout::Time<L>>,
// Staging area that consolidates updates (advanced by `since`) per value.
staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
}
impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
where
OrdValBatch<L>: Batch<Time=layout::Time<L>>,
{
/// Begins a merge of two batches that must abut: `batch1.upper() == batch2.lower()`.
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
assert!(batch1.upper() == batch2.lower());
use crate::lattice::Lattice;
// The output's `since` is the join of both inputs' `since` frontiers and
// the supplied compaction frontier; times are advanced to it during merge.
let mut since = batch1.description().since().join(batch2.description().since());
since = since.join(&compaction_frontier.to_owned());
let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
let batch1 = &batch1.storage;
let batch2 = &batch2.storage;
OrdValMerger {
key_cursor1: 0,
key_cursor2: 0,
result: OrdValStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
},
description,
staging: UpdsBuilder::default(),
}
}
/// Finalizes the merge, wrapping accumulated storage as an output batch.
fn done(self) -> OrdValBatch<L> {
OrdValBatch {
updates: self.staging.total(),
storage: self.result,
description: self.description,
}
}
/// Performs merge work until sources are exhausted or `fuel` is spent.
/// Effort is measured as the number of updates committed during this call.
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
let starting_updates = self.staging.total();
let mut effort = 0isize;
// Merge keys while both inputs have keys remaining.
while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.merge_key(&source1.storage, &source2.storage);
effort = (self.staging.total() - starting_updates) as isize;
}
// Drain whichever input still has keys.
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
self.copy_key(&source1.storage, self.key_cursor1);
self.key_cursor1 += 1;
effort = (self.staging.total() - starting_updates) as isize;
}
while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.copy_key(&source2.storage, self.key_cursor2);
self.key_cursor2 += 1;
effort = (self.staging.total() - starting_updates) as isize;
}
*fuel -= effort;
}
}
impl<L: Layout> OrdValMerger<L> {
/// Copies the key at `cursor` from `source` into the result, retaining only
/// values whose updates survive consolidation; drops the key entirely if no
/// value survives.
fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
let init_vals = self.result.vals.vals.len();
let (mut lower, upper) = source.vals.bounds(cursor);
while lower < upper {
self.stash_updates_for_val(source, lower);
// `seal` returns false when the consolidated updates cancel out,
// in which case the value is not copied.
if self.staging.seal(&mut self.result.upds) {
self.result.vals.vals.push_ref(source.vals.get_abs(lower));
}
lower += 1;
}
// Record the key (and close its value range) only if some value survived.
if self.result.vals.vals.len() > init_vals {
self.result.keys.push_ref(source.keys.index(cursor));
self.result.vals.offs.push_ref(self.result.vals.vals.len());
}
}
/// Processes one step of the two key cursors: copy the lesser key, or merge
/// value lists when the keys are equal.
fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
use ::std::cmp::Ordering;
match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
Ordering::Less => {
self.copy_key(source1, self.key_cursor1);
self.key_cursor1 += 1;
},
Ordering::Equal => {
let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
// `merge_vals` returns the new value-offset upper bound only if
// some value survived; otherwise the key is dropped.
if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
self.result.vals.offs.push_ref(off);
}
self.key_cursor1 += 1;
self.key_cursor2 += 1;
},
Ordering::Greater => {
self.copy_key(source2, self.key_cursor2);
self.key_cursor2 += 1;
},
}
}
/// Merges the value ranges of a key present in both sources. Returns
/// `Some(upper)` — the result's new value count — if any value survived.
fn merge_vals(
&mut self,
(source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
(source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
) -> Option<usize> {
let init_vals = self.result.vals.vals.len();
// Standard ordered merge over the two value ranges: equal values have
// their update lists combined; others are copied (if updates survive).
while lower1 < upper1 && lower2 < upper2 {
use ::std::cmp::Ordering;
match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
Ordering::Less => {
self.stash_updates_for_val(source1, lower1);
if self.staging.seal(&mut self.result.upds) {
self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
}
lower1 += 1;
},
Ordering::Equal => {
self.stash_updates_for_val(source1, lower1);
self.stash_updates_for_val(source2, lower2);
if self.staging.seal(&mut self.result.upds) {
self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
}
lower1 += 1;
lower2 += 1;
},
Ordering::Greater => {
self.stash_updates_for_val(source2, lower2);
if self.staging.seal(&mut self.result.upds) {
self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
}
lower2 += 1;
},
}
}
// Drain whichever range still has values.
while lower1 < upper1 {
self.stash_updates_for_val(source1, lower1);
if self.staging.seal(&mut self.result.upds) {
self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
}
lower1 += 1;
}
while lower2 < upper2 {
self.stash_updates_for_val(source2, lower2);
if self.staging.seal(&mut self.result.upds) {
self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
}
lower2 += 1;
}
if self.result.vals.vals.len() > init_vals {
Some(self.result.vals.vals.len())
} else {
None
}
}
/// Loads the updates of the value at absolute `index` into staging,
/// advancing each time by the output's `since` frontier.
fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
let (lower, upper) = source.upds.bounds(index);
for i in lower .. upper {
let (time, diff) = source.upds.get_abs(i);
use crate::lattice::Lattice;
let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
new_time.advance_by(self.description.since().borrow());
self.staging.push(new_time, L::DiffContainer::into_owned(diff));
}
}
}
/// A cursor over an [`OrdValBatch`], tracking absolute positions into its storage.
pub struct OrdValCursor<L: Layout> {
// Index of the current key within `storage.keys`.
key_cursor: usize,
// Absolute index of the current value within `storage.vals.vals`.
val_cursor: usize,
// The cursor borrows nothing; the batch is supplied to each method.
phantom: PhantomData<L>,
}
use crate::trace::implementations::WithLayout;
impl<L: Layout> WithLayout for OrdValCursor<L> {
type Layout = L;
}
impl<L: Layout> Cursor for OrdValCursor<L> {
type Storage = OrdValBatch<L>;
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
/// Applies `logic` to each `(time, diff)` update of the current value.
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
for index in lower .. upper {
let (time, diff) = storage.storage.upds.get_abs(index);
logic(time, diff);
}
}
fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
// The value cursor is valid while strictly below the current key's value upper bound.
fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
fn step_key(&mut self, storage: &OrdValBatch<L>){
self.key_cursor += 1;
if self.key_valid(storage) {
self.rewind_vals(storage);
}
else {
// Clamp at one past the end so the cursor stays invalid.
self.key_cursor = storage.storage.keys.len();
}
}
/// Advances the key cursor to the first key not less than `key`.
fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
if self.key_valid(storage) {
self.rewind_vals(storage);
}
}
fn step_val(&mut self, storage: &OrdValBatch<L>) {
self.val_cursor += 1;
if !self.val_valid(storage) {
// Clamp at the key's value upper bound so the cursor stays invalid.
self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
}
}
/// Advances the value cursor (within the current key) to the first value not less than `val`.
fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
}
fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
self.key_cursor = 0;
if self.key_valid(storage) {
self.rewind_vals(storage)
}
}
// Positions the value cursor at the current key's first value.
fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
}
}
/// Incrementally builds an [`OrdValBatch`] from sorted chunks of `(key, val, time, diff)` input.
pub struct OrdValBuilder<L: Layout, CI> {
/// Storage under construction.
pub result: OrdValStorage<L>,
// Updates buffered for the most recently pushed `(key, val)` pair.
staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
// `CI` is only the input container type; nothing of it is stored.
_marker: PhantomData<CI>,
}
impl<L, CI> Builder for OrdValBuilder<L, CI>
where
L: for<'a> Layout<
KeyContainer: PushInto<CI::Key<'a>>,
ValContainer: PushInto<CI::Val<'a>>,
>,
CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
{
type Input = CI;
type Time = layout::Time<L>;
type Output = OrdValBatch<L>;
/// Allocates storage sized for the stated key, value, and update counts.
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
Self {
result: OrdValStorage {
keys: L::KeyContainer::with_capacity(keys),
// One extra slot in each offset container for the leading sentinel.
vals: Vals::with_capacity(keys + 1, vals),
upds: Upds::with_capacity(vals + 1, upds),
},
staging: UpdsBuilder::default(),
_marker: PhantomData,
}
}
/// Absorbs a chunk of updates, which must arrive in `(key, val)` order.
/// Updates for the current `(key, val)` are staged and sealed when either changes.
#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
for item in chunk.drain() {
let (key, val, time, diff) = CI::into_parts(item);
if self.result.keys.is_empty() {
// First item: record the key and value; updates are sealed later.
self.result.vals.vals.push_into(val);
self.result.keys.push_into(key);
self.staging.push(time, diff);
}
else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
// Same key and value: just accumulate the update.
self.staging.push(time, diff);
} else {
// Same key, new value: close the previous value's updates.
// NOTE(review): `seal`'s return value is ignored here, which assumes
// each `(key, val)` group's updates do not fully cancel — presumably
// guaranteed by upstream consolidation in the batcher; confirm.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.vals.vals.push_into(val);
}
} else {
// New key: close the previous value's updates and the previous key's value range.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.vals.offs.push_ref(self.result.vals.vals.len());
self.result.vals.vals.push_into(val);
self.result.keys.push_into(key);
}
}
}
/// Finalizes the batch: seals trailing updates, closes the last key's value range.
#[inline(never)]
fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
self.staging.seal(&mut self.result.upds);
self.result.vals.offs.push_ref(self.result.vals.vals.len());
OrdValBatch {
updates: self.staging.total(),
storage: self.result,
description,
}
}
/// Builds a batch from a chain of input chunks, sized from the chain's counts.
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}
builder.done(description)
}
}
}
pub mod key_batch {
use std::marker::PhantomData;
use serde::{Deserialize, Serialize};
use timely::container::PushInto;
use timely::progress::{Antichain, frontier::AntichainRef};
use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use crate::trace::implementations::{BatchContainer, BuilderInput};
use crate::trace::implementations::layout;
use super::{Layout, Upds, layers::UpdsBuilder};
/// Storage backing an [`OrdKeyBatch`]: sorted `keys`, each with a list of
/// `(time, diff)` updates in `upds`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound = "
L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdKeyStorage<L: Layout> {
/// The distinct keys, in sorted order; key `i` owns update list `i` of `upds`.
pub keys: L::KeyContainer,
/// Per-key update lists.
pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
}
/// An immutable collection of key-only updates, ordered by key.
#[derive(Serialize, Deserialize)]
#[serde(bound = "
L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
L::ValContainer: Serialize + for<'a> Deserialize<'a>,
L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
")]
pub struct OrdKeyBatch<L: Layout> {
/// The layered storage of keys and updates.
pub storage: OrdKeyStorage<L>,
/// The time bounds (`lower`, `upper`) and compaction frontier (`since`) of this batch.
pub description: Description<layout::Time<L>>,
/// The number of updates in the batch, as reported by `BatchReader::len`.
pub updates: usize,
/// A single-element container holding the default value, which the cursor
/// serves as the value for every key (see `OrdKeyCursor::val`).
pub value: L::ValContainer,
}
impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
    /// Builds the one-element value container shared by all keys: a single default value.
    pub fn create_value() -> L::ValContainer {
        let mut container = L::ValContainer::with_capacity(1);
        let default_value = Default::default();
        container.push_own(&default_value);
        container
    }
}
// Exposes the batch's layout to layout-generic code.
impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
type Layout = L;
}
impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {
type Cursor = OrdKeyCursor<L>;
/// A cursor positioned at the first key, with its synthetic value not yet stepped.
fn cursor(&self) -> Self::Cursor {
OrdKeyCursor {
key_cursor: 0,
val_stepped: false,
phantom: std::marker::PhantomData,
}
}
/// The number of updates in the batch (maintained at construction/merge time).
fn len(&self) -> usize {
self.updates
}
fn description(&self) -> &Description<layout::Time<L>> { &self.description }
}
impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
type Merger = OrdKeyMerger<L>;
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
/// An empty batch covering `[lower, upper)`, with `since` at the minimum timestamp.
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
use timely::progress::Timestamp;
Self {
storage: OrdKeyStorage {
keys: L::KeyContainer::with_capacity(0),
upds: Upds::default(),
},
description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
updates: 0,
value: Self::create_value(),
}
}
}
/// In-progress merge of two [`OrdKeyBatch`]es, resumable under a fuel budget.
pub struct OrdKeyMerger<L: Layout> {
// Index of the next unprocessed key in the first input batch.
key_cursor1: usize,
// Index of the next unprocessed key in the second input batch.
key_cursor2: usize,
// Storage accumulated for the output batch.
result: OrdKeyStorage<L>,
// Description of the output batch (bounds spanning both inputs, joined `since`).
description: Description<layout::Time<L>>,
// Staging area that consolidates updates (advanced by `since`) per key.
staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
}
impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
where
OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
{
/// Begins a merge of two batches that must abut: `batch1.upper() == batch2.lower()`.
fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {
assert!(batch1.upper() == batch2.lower());
use crate::lattice::Lattice;
// The output's `since` is the join of both inputs' `since` frontiers and
// the supplied compaction frontier; times are advanced to it during merge.
let mut since = batch1.description().since().join(batch2.description().since());
since = since.join(&compaction_frontier.to_owned());
let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);
let batch1 = &batch1.storage;
let batch2 = &batch2.storage;
OrdKeyMerger {
key_cursor1: 0,
key_cursor2: 0,
result: OrdKeyStorage {
keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
},
description,
staging: UpdsBuilder::default(),
}
}
/// Finalizes the merge, wrapping accumulated storage as an output batch.
fn done(self) -> OrdKeyBatch<L> {
OrdKeyBatch {
updates: self.staging.total(),
storage: self.result,
description: self.description,
value: OrdKeyBatch::<L>::create_value(),
}
}
/// Performs merge work until sources are exhausted or `fuel` is spent.
/// Effort is measured as the number of updates committed during this call.
fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
let starting_updates = self.staging.total();
let mut effort = 0isize;
// Merge keys while both inputs have keys remaining.
while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.merge_key(&source1.storage, &source2.storage);
effort = (self.staging.total() - starting_updates) as isize;
}
// Drain whichever input still has keys.
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
self.copy_key(&source1.storage, self.key_cursor1);
self.key_cursor1 += 1;
effort = (self.staging.total() - starting_updates) as isize;
}
while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
self.copy_key(&source2.storage, self.key_cursor2);
self.key_cursor2 += 1;
effort = (self.staging.total() - starting_updates) as isize;
}
*fuel -= effort;
}
}
impl<L: Layout> OrdKeyMerger<L> {
/// Copies the key at `cursor` from `source` into the result, but only if its
/// consolidated updates survive compaction.
fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
self.stash_updates_for_key(source, cursor);
if self.staging.seal(&mut self.result.upds) {
self.result.keys.push_ref(source.keys.index(cursor));
}
}
/// Processes one step of the two key cursors: copy the lesser key, or combine
/// update lists when the keys are equal.
fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
use ::std::cmp::Ordering;
match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
Ordering::Less => {
self.copy_key(source1, self.key_cursor1);
self.key_cursor1 += 1;
},
Ordering::Equal => {
// Stage updates from both inputs; the key survives only if the
// consolidated result is nonempty.
self.stash_updates_for_key(source1, self.key_cursor1);
self.stash_updates_for_key(source2, self.key_cursor2);
if self.staging.seal(&mut self.result.upds) {
self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
}
self.key_cursor1 += 1;
self.key_cursor2 += 1;
},
Ordering::Greater => {
self.copy_key(source2, self.key_cursor2);
self.key_cursor2 += 1;
},
}
}
/// Loads the updates of the key at `index` into staging, advancing each time
/// by the output's `since` frontier.
fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
let (lower, upper) = source.upds.bounds(index);
for i in lower .. upper {
let (time, diff) = source.upds.get_abs(i);
use crate::lattice::Lattice;
let mut new_time = L::TimeContainer::into_owned(time);
new_time.advance_by(self.description.since().borrow());
self.staging.push(new_time, L::DiffContainer::into_owned(diff));
}
}
}
/// A cursor over an [`OrdKeyBatch`]; every key has exactly one synthetic value.
pub struct OrdKeyCursor<L: Layout> {
// Index of the current key within `storage.keys`.
key_cursor: usize,
// Whether the single synthetic value has been stepped past for the current key.
val_stepped: bool,
// The cursor borrows nothing; the batch is supplied to each method.
phantom: PhantomData<L>,
}
use crate::trace::implementations::WithLayout;
impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
type Layout = L;
}
impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {
type Storage = OrdKeyBatch<L>;
fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
// Every key shares the batch's single stored default value.
fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
/// Applies `logic` to each `(time, diff)` update of the current key.
fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
for index in lower .. upper {
let (time, diff) = storage.storage.upds.get_abs(index);
logic(time, diff);
}
}
fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
// The synthetic value is valid until stepped past.
fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
fn step_key(&mut self, storage: &Self::Storage){
self.key_cursor += 1;
if self.key_valid(storage) {
self.rewind_vals(storage);
}
else {
// Clamp at one past the end so the cursor stays invalid.
self.key_cursor = storage.storage.keys.len();
}
}
/// Advances the key cursor to the first key not less than `key`.
fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
if self.key_valid(storage) {
self.rewind_vals(storage);
}
}
fn step_val(&mut self, _storage: &Self::Storage) {
self.val_stepped = true;
}
// With a single synthetic value, seeking is a no-op.
fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
fn rewind_keys(&mut self, storage: &Self::Storage) {
self.key_cursor = 0;
if self.key_valid(storage) {
self.rewind_vals(storage)
}
}
// Makes the synthetic value visible again for the current key.
fn rewind_vals(&mut self, _storage: &Self::Storage) {
self.val_stepped = false;
}
}
/// Incrementally builds an [`OrdKeyBatch`] from sorted chunks of input; values are ignored.
pub struct OrdKeyBuilder<L: Layout, CI> {
/// Storage under construction.
pub result: OrdKeyStorage<L>,
// Updates buffered for the most recently pushed key.
staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
// `CI` is only the input container type; nothing of it is stored.
_marker: PhantomData<CI>,
}
impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
where
L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
L: Layout<ValContainer: BatchContainer<Owned: Default>>,
CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
{
type Input = CI;
type Time = layout::Time<L>;
type Output = OrdKeyBatch<L>;
/// Allocates storage sized for the stated key and update counts; values are unused.
fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
Self {
result: OrdKeyStorage {
keys: L::KeyContainer::with_capacity(keys),
// One extra offset slot for the leading sentinel.
upds: Upds::with_capacity(keys+1, upds),
},
staging: UpdsBuilder::default(),
_marker: PhantomData,
}
}
/// Absorbs a chunk of updates, which must arrive in key order. Updates for
/// the current key are staged and sealed when the key changes.
#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
for item in chunk.drain() {
let (key, _val, time, diff) = CI::into_parts(item);
if self.result.keys.is_empty() {
// First item: record the key; its updates are sealed later.
self.result.keys.push_into(key);
self.staging.push(time, diff);
}
else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
// Same key: just accumulate the update.
self.staging.push(time, diff);
} else {
// New key: close the previous key's updates, then record the new key.
// NOTE(review): `seal`'s return value is ignored here, which assumes
// each key's updates do not fully cancel — presumably guaranteed by
// upstream consolidation in the batcher; confirm.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.keys.push_into(key);
}
}
}
/// Finalizes the batch, sealing any trailing updates.
#[inline(never)]
fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
self.staging.seal(&mut self.result.upds);
OrdKeyBatch {
updates: self.staging.total(),
storage: self.result,
description,
value: OrdKeyBatch::<L>::create_value(),
}
}
/// Builds a batch from a chain of input chunks, sized from the chain's counts.
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}
builder.done(description)
}
}
}