use std::rc::Rc;
use crate::trace::implementations::ord_neu::OrdValBatch;
use crate::trace::rc_blanket_impls::RcBuilder;
use crate::trace::implementations::spine_fueled::Spine;
use super::layout::ColumnarLayout;
pub mod trie_merger;
pub type ValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<ColumnarLayout<(K,V,T,R)>>>>;
pub type ValBatcher<K, V, T, R> = super::batcher::MergeBatcher<(K,V,T,R)>;
pub type ValChunker<U> = TrieChunker<U>;
pub type ValBuilder<K, V, T, R> = RcBuilder<builder::ValMirror<(K,V,T,R)>>;
pub use batch_container::Coltainer;
pub mod batch_container {
use columnar::{Borrow, Columnar, Container, Clear, Push, Index, Len};
use crate::trace::implementations::BatchContainer;
pub struct Coltainer<C: Columnar> {
pub container: C::Container,
}
impl<C: Columnar> Default for Coltainer<C> {
fn default() -> Self { Self { container: Default::default() } }
}
impl<C: Columnar + Ord + Clone> BatchContainer for Coltainer<C> where for<'a> columnar::Ref<'a, C> : Ord {
type ReadItem<'a> = columnar::Ref<'a, C>;
type Owned = C;
#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) }
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.copy_from(item) }
#[inline(always)] fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.container.push(item) }
#[inline(always)] fn push_own(&mut self, item: &Self::Owned) { self.container.push(item) }
fn clear(&mut self) { self.container.clear() }
fn with_capacity(_size: usize) -> Self { Self::default() }
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self {
container: <C as Columnar>::Container::with_capacity_for([cont1.container.borrow(), cont2.container.borrow()].into_iter()),
}
}
#[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { columnar::ContainerOf::<C>::reborrow_ref(item) }
#[inline(always)] fn index(&self, index: usize) -> Self::ReadItem<'_> { self.container.borrow().get(index) }
#[inline(always)] fn len(&self) -> usize { self.container.len() }
fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
let borrow = self.container.borrow();
let small_limit = 8;
if end > start + small_limit && function(borrow.get(start + small_limit)) {
let mut index = small_limit + 1;
if start + index < end && function(borrow.get(start + index)) {
let mut step = 1;
while start + index + step < end && function(borrow.get(start + index + step)) {
index += step;
step <<= 1;
}
step >>= 1;
while step > 0 {
if start + index + step < end && function(borrow.get(start + index + step)) {
index += step;
}
step >>= 1;
}
index += 1;
}
index
}
else {
let limit = std::cmp::min(end, start + small_limit);
(start .. limit).filter(|x| function(borrow.get(*x))).count()
}
}
}
}
use super::updates::UpdatesTyped;
use super::RecordedUpdates;
pub struct TrieChunker<U: super::layout::ColumnarUpdate> {
blobs: Vec<(UpdatesTyped<U>, bool)>,
blob_records: usize,
ready: std::collections::VecDeque<UpdatesTyped<U>>,
stage: Option<UpdatesTyped<U>>,
}
impl<U: super::layout::ColumnarUpdate> Default for TrieChunker<U> {
fn default() -> Self {
Self {
blobs: Default::default(),
blob_records: 0,
ready: Default::default(),
stage: None,
}
}
}
impl<U: super::layout::ColumnarUpdate> TrieChunker<U> {
fn consolidate_blobs(&mut self) -> UpdatesTyped<U> {
if self.blobs.len() == 1 && self.blobs[0].1 {
let (result, _) = self.blobs.pop().unwrap();
self.blob_records = 0;
return result;
}
let result = UpdatesTyped::<U>::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter()));
self.blobs.clear();
self.blob_records = 0;
result
}
fn absorb(&mut self, updates: UpdatesTyped<U>, consolidated: bool) {
self.blob_records += updates.len();
self.blobs.push((updates, consolidated));
}
}
impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates<U>> for TrieChunker<U> {
fn push_into(&mut self, container: &'a mut RecordedUpdates<U>) {
if container.updates.len() == 0 { return; }
let updates = std::mem::take(&mut container.updates).into_typed();
let consolidated = container.consolidated;
let len = updates.len();
if consolidated && len >= crate::columnar::LINK_TARGET { self.ready.push_back(updates); }
else if self.blob_records + len < 2 * crate::columnar::LINK_TARGET { self.absorb(updates, consolidated); }
else {
let input_residual = if len >= crate::columnar::LINK_TARGET {
let cons = if consolidated { updates } else { updates.consolidate() };
if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
else if cons.len() > 0 { Some((cons, true)) }
else { None }
}
else { Some((updates, consolidated)) };
let blobs_residual = if self.blob_records >= crate::columnar::LINK_TARGET {
let cons = self.consolidate_blobs();
if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
else if cons.len() > 0 { Some((cons, true)) }
else { None }
}
else { None };
if let Some((r, c)) = input_residual { self.absorb(r, c); }
if let Some((r, c)) = blobs_residual { self.absorb(r, c); }
}
}
}
impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for TrieChunker<U> {
type Container = UpdatesTyped<U>;
fn extract(&mut self) -> Option<&mut Self::Container> {
self.stage = self.ready.pop_front();
self.stage.as_mut()
}
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.blobs.is_empty() {
let cons = self.consolidate_blobs();
if cons.len() > 0 { self.ready.push_back(cons); }
}
self.extract()
}
}
pub mod builder {
use crate::trace::implementations::ord_neu::{Vals, Upds};
use crate::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage};
use crate::trace::Description;
use super::super::updates::UpdatesTyped;
use super::super::layout::ColumnarUpdate as Update;
use super::super::layout::ColumnarLayout as Layout;
use super::Coltainer;
use columnar::{Borrow, IndexAs};
use columnar::primitive::offsets::Strides;
use crate::trace::implementations::OffsetList;
fn strides_to_offset_list(bounds: &Strides, count: usize) -> OffsetList {
let mut output = OffsetList::with_capacity(count);
output.push(0);
let bounds_b = bounds.borrow();
for i in 0..count {
output.push(bounds_b.index_as(i) as usize);
}
output
}
pub struct ValMirror<U: Update> {
chunks: Vec<UpdatesTyped<U>>,
}
impl<U: Update> crate::trace::Builder for ValMirror<U> {
type Time = U::Time;
type Input = UpdatesTyped<U>;
type Output = OrdValBatch<Layout<U>>;
fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
Self { chunks: Vec::new() }
}
fn push(&mut self, chunk: &mut Self::Input) {
if chunk.len() > 0 {
self.chunks.push(std::mem::take(chunk));
}
}
fn done(self, description: Description<Self::Time>) -> Self::Output {
let mut chain = self.chunks;
Self::seal(&mut chain, description)
}
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
use columnar::Len;
use columnar::Container;
let mut updates = UpdatesTyped::<U>::default();
updates.keys.reserve_for(chain.iter().map(|c| c.view().keys));
updates.vals.reserve_for(chain.iter().map(|c| c.view().vals));
updates.times.reserve_for(chain.iter().map(|c| c.view().times));
updates.diffs.reserve_for(chain.iter().map(|c| c.view().diffs));
let mut builder = super::super::updates::UpdatesBuilder::new_from(updates);
for chunk in chain.iter() {
builder.meld(chunk);
}
let merged = builder.done();
chain.clear();
let updates = Len::len(&merged.diffs.values);
if updates == 0 {
let storage = OrdValStorage {
keys: Default::default(),
vals: Default::default(),
upds: Default::default(),
};
OrdValBatch { storage, description, updates: 0 }
} else {
let val_offs = strides_to_offset_list(&merged.vals.bounds, Len::len(&merged.keys.values));
let time_offs = strides_to_offset_list(&merged.times.bounds, Len::len(&merged.vals.values));
let storage = OrdValStorage {
keys: Coltainer { container: merged.keys.values },
vals: Vals {
offs: val_offs,
vals: Coltainer { container: merged.vals.values },
},
upds: Upds {
offs: time_offs,
times: Coltainer { container: merged.times.values },
diffs: Coltainer { container: merged.diffs.values },
},
};
OrdValBatch { storage, description, updates }
}
}
}
}