use std::rc::Rc;
use ::difference::Monoid;
use lattice::Lattice;
use trace::layers::{Trie, TupleBuilder};
use trace::layers::Builder as TrieBuilder;
use trace::layers::Cursor as TrieCursor;
use trace::layers::ordered::{OrderedLayer, OrderedBuilder, OrderedCursor};
use trace::layers::ordered_leaf::{OrderedLeaf, OrderedLeafBuilder};
use trace::{Batch, BatchReader, Builder, Merger, Cursor};
use trace::description::Description;
use trace::layers::MergeBuilder;
use super::spine_fueled::Spine;
use super::merge_batcher::MergeBatcher;
use abomonation::abomonated::Abomonated;
/// A `Spine` over keys `K` and values `V` whose batches are `Rc<OrdValBatch<K, V, T, R>>`.
pub type OrdValSpine<K, V, T, R> = Spine<K, V, T, R, Rc<OrdValBatch<K, V, T, R>>>;
/// Like `OrdValSpine`, but each batch is an `OrdValBatch` wrapped in `Abomonated` over a `Vec<u8>`.
pub type OrdValSpineAbom<K, V, T, R> = Spine<K, V, T, R, Rc<Abomonated<OrdValBatch<K, V, T, R>, Vec<u8>>>>;
/// A `Spine` over keys `K` with the unit type `()` as value, whose batches are `Rc<OrdKeyBatch<K, T, R>>`.
pub type OrdKeySpine<K, T, R> = Spine<K, (), T, R, Rc<OrdKeyBatch<K, T, R>>>;
/// Like `OrdKeySpine`, but each batch is an `OrdKeyBatch` wrapped in `Abomonated` over a `Vec<u8>`.
pub type OrdKeySpineAbom<K, T, R> = Spine<K, (), T, R, Rc<Abomonated<OrdKeyBatch<K, T, R>, Vec<u8>>>>;
/// A batch of `(key, val, time, diff)` updates stored as nested ordered layers,
/// together with the description of the batch.
#[derive(Debug, Abomonation)]
pub struct OrdValBatch<K: Ord, V: Ord, T: Lattice, R> {
/// Nested layers: keys `K`, then values `V`, then an `OrderedLeaf<T, R>` of `(time, diff)` entries.
pub layer: OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>>>,
/// The `Description<T>` handed out by `description()`.
pub desc: Description<T>,
}
impl<K, V, T, R> BatchReader<K, V, T, R> for OrdValBatch<K, V, T, R>
where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Monoid {
type Cursor = OrdValCursor<V, T, R>;
fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor() } }
fn len(&self) -> usize { <OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>>> as Trie>::tuples(&self.layer) }
fn description(&self) -> &Description<T> { &self.desc }
}
impl<K, V, T, R> Batch<K, V, T, R> for OrdValBatch<K, V, T, R>
where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Monoid {
type Batcher = MergeBatcher<K, V, T, R, Self>;
type Builder = OrdValBuilder<K, V, T, R>;
type Merger = OrdValMerger<K, V, T, R>;
fn begin_merge(&self, other: &Self) -> Self::Merger {
OrdValMerger::new(self, other)
}
}
impl<K, V, T, R> OrdValBatch<K, V, T, R>
where K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Monoid {
/// Advances the times of the updates held in `layer` from key index `key_pos` onward
/// to `frontier`, then compacts that part of the builder in place.
///
/// The work happens in four passes over the suffix that starts at `key_pos`:
/// 1. call `advance_by(frontier)` on every time;
/// 2. per value, sort its updates by time, accumulate diffs of equal times, and drop zero diffs;
/// 3. per key, drop values that were left without updates;
/// 4. drop keys that were left without values.
///
/// Keys, values and updates located before `key_pos` are not moved.
fn advance_builder_from(layer: &mut OrderedBuilder<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>>>, frontier: &[T], key_pos: usize) {
// Translate the starting key index into the first affected value index and update index.
let key_start = key_pos;
let val_start = layer.offs[key_pos];
let time_start = layer.vals.offs[val_start];
// Pass 1: advance every affected time.
// NOTE(review): assumes `advance_by` updates the time in place; any return value is unused here — confirm.
for i in time_start .. layer.vals.vals.vals.len() {
layer.vals.vals.vals[i].0.advance_by(frontier);
}
// Pass 2: consolidate the updates of each value, compacting survivors towards `write_position`.
let mut write_position = time_start;
for i in val_start .. layer.vals.keys.len() {
// Read the old update range of this value before its start offset is overwritten.
let lower = layer.vals.offs[i];
let upper = layer.vals.offs[i+1];
layer.vals.offs[i] = write_position;
let updates = &mut layer.vals.vals.vals[..];
// Sorting by time makes equal times adjacent.
updates[lower .. upper].sort_by(|x,y| x.0.cmp(&y.0));
// For adjacent equal times, move the earlier diff into the later entry and leave zero behind.
// NOTE(review): `upper - 1` assumes every value has at least one update (`upper >= 1`) — confirm.
for index in lower .. (upper - 1) {
if updates[index].0 == updates[index+1].0 {
let prev = ::std::mem::replace(&mut updates[index].1, R::zero());
updates[index+1].1 += &prev;
}
}
// Keep only updates with a non-zero diff, swapping them down to the write position.
for index in lower .. upper {
if !updates[index].1.is_zero() {
updates.swap(write_position, index);
write_position += 1;
}
}
}
// Discard the leftover tail of updates and fix the final value offset.
layer.vals.vals.vals.truncate(write_position);
layer.vals.offs[layer.vals.keys.len()] = write_position;
// Pass 3: drop values whose update range became empty, compacting values per key.
let mut write_position = val_start;
for i in key_start .. layer.keys.len() {
// Read the old value range of this key before its start offset is overwritten.
let lower = layer.offs[i];
let upper = layer.offs[i+1];
layer.offs[i] = write_position;
for index in lower .. upper {
let val_lower = layer.vals.offs[index];
let val_upper = layer.vals.offs[index+1];
if val_lower < val_upper {
// Keep this value: move it down and record where its updates end.
layer.vals.keys.swap(write_position, index);
layer.vals.offs[write_position+1] = layer.vals.offs[index+1];
write_position += 1;
}
}
}
// Discard the leftover tail of values and fix the final key offset.
layer.vals.keys.truncate(write_position);
layer.vals.offs.truncate(write_position + 1);
layer.offs[layer.keys.len()] = write_position;
// Pass 4: drop keys whose value range became empty.
let mut write_position = key_start;
for i in key_start .. layer.keys.len() {
let lower = layer.offs[i];
let upper = layer.offs[i+1];
if lower < upper {
layer.keys.swap(write_position, i);
write_position += 1;
}
}
// An emptied key has the same offset as its successor, so removing consecutive
// duplicates leaves one offset per surviving key plus the final bound.
// NOTE(review): `dedup` scans the whole `offs` vector, not only the suffix from
// `key_start`; presumably earlier offsets are strictly increasing — confirm.
layer.offs.dedup();
layer.keys.truncate(write_position);
layer.offs.truncate(write_position+1);
}
}
/// State for a resumable merge of two `OrdValBatch`es.
pub struct OrdValMerger<K: Ord+Clone+'static, V: Ord+Clone+'static, T: Lattice+Ord+Clone+::std::fmt::Debug+'static, R: Monoid> {
// Position in the first batch's layer from which merging resumes; starts at 0.
lower1: usize,
// Initialized to `batch1.layer.keys()`; the first batch is exhausted once `lower1` reaches it.
upper1: usize,
// Position in the second batch's layer from which merging resumes; starts at 0.
lower2: usize,
// Initialized to `batch2.layer.keys()`; the second batch is exhausted once `lower2` reaches it.
upper2: usize,
// Merge builder accumulating the output layer.
result: <OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>>> as Trie>::MergeBuilder,
// Description of the batch returned by `done`.
description: Description<T>,
}
/// Fuel-bounded merging of two adjacent `OrdValBatch`es into one.
impl<K, V, T, R> Merger<K, V, T, R, OrdValBatch<K, V, T, R>> for OrdValMerger<K, V, T, R>
where
    K: Ord + Clone + 'static,
    V: Ord + Clone + 'static,
    T: Lattice + Ord + Clone + ::std::fmt::Debug + 'static,
    R: Monoid,
{
    /// Prepares a merge of `batch1` followed by `batch2`.
    ///
    /// Panics unless the upper bound of `batch1` equals the lower bound of `batch2`.
    fn new(batch1: &OrdValBatch<K, V, T, R>, batch2: &OrdValBatch<K, V, T, R>) -> Self {
        assert!(batch1.upper() == batch2.lower());

        // Use `batch2`'s since when every element of `batch1`'s since has some
        // element of `batch2`'s since that is `less_equal` to it; otherwise `batch1`'s.
        let since1 = batch1.description().since();
        let since2 = batch2.description().since();
        let covered = since1
            .iter()
            .all(|t1| since2.iter().any(|t2| t2.less_equal(t1)));
        let since = if covered { since2 } else { since1 };

        let description = Description::new(batch1.lower(), batch2.upper(), since);
        let result = <<OrderedLayer<K, OrderedLayer<V, OrderedLeaf<T, R>>> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer);

        OrdValMerger {
            lower1: 0,
            upper1: batch1.layer.keys(),
            lower2: 0,
            upper2: batch2.layer.keys(),
            result,
            description,
        }
    }

    /// Finishes the merge and returns the merged batch.
    ///
    /// Panics if either input has not been fully consumed by `work`.
    fn done(self) -> OrdValBatch<K, V, T, R> {
        assert!(self.lower1 == self.upper1);
        assert!(self.lower2 == self.upper2);

        let layer = self.result.done();
        OrdValBatch {
            layer,
            desc: self.description,
        }
    }

    /// Performs a bounded amount of merging, charging the updates produced against `fuel`.
    ///
    /// When `frontier` is `Some`, the output produced by this call is advanced to
    /// that frontier and compacted before returning.
    fn work(&mut self, source1: &OrdValBatch<K,V,T,R>, source2: &OrdValBatch<K,V,T,R>, frontier: &Option<Vec<T>>, fuel: &mut usize) {
        let starting_updates = self.result.vals.vals.vals.len();
        let initial_key_pos = self.result.keys.len();

        // Merge step by step while both inputs have remaining input and fuel lasts.
        let mut effort = 0;
        loop {
            let either_exhausted = self.lower1 >= self.upper1 || self.lower2 >= self.upper2;
            if either_exhausted || effort >= *fuel {
                break;
            }
            self.result.merge_step(
                (&source1.layer, &mut self.lower1, self.upper1),
                (&source2.layer, &mut self.lower2, self.upper2),
            );
            effort = self.result.vals.vals.vals.len() - starting_updates;
        }

        // Once one input is exhausted, copy whatever remains of the other wholesale.
        if self.lower1 == self.upper1 || self.lower2 == self.upper2 {
            if self.lower1 < self.upper1 {
                self.result.copy_range(&source1.layer, self.lower1, self.upper1);
                self.lower1 = self.upper1;
            }
            if self.lower2 < self.upper2 {
                self.result.copy_range(&source2.layer, self.lower2, self.upper2);
                self.lower2 = self.upper2;
            }
        }

        // Effort is measured before compaction, which may shrink the output.
        let effort = self.result.vals.vals.vals.len() - starting_updates;

        if let Some(ref bound) = *frontier {
            OrdValBatch::advance_builder_from(&mut self.result, bound, initial_key_pos);
        }

        // Deduct the effort spent, bottoming out at zero.
        *fuel = (*fuel).saturating_sub(effort);
    }
}
/// A cursor for navigating the keys, values, and `(time, diff)` entries of an `OrdValBatch`.
#[derive(Debug)]
pub struct OrdValCursor<V: Ord+Clone, T: Lattice+Ord+Clone, R: Monoid> {
// Cursor over the key layer; `child` addresses the value layer and `child.child` the leaf layer.
cursor: OrderedCursor<OrderedLayer<V, OrderedLeaf<T, R>>>,
}
/// Navigation over an `OrdValBatch` by delegating to the nested layer cursors.
impl<K, V, T, R> Cursor<K, V, T, R> for OrdValCursor<V, T, R>
where
    K: Ord + Clone,
    V: Ord + Clone,
    T: Lattice + Ord + Clone,
    R: Monoid,
{
    type Storage = OrdValBatch<K, V, T, R>;

    /// The key the cursor currently points at.
    fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K {
        &self.cursor.key(&storage.layer)
    }

    /// The value the cursor currently points at.
    fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V {
        &self.cursor.child.key(&storage.layer.vals)
    }

    /// Rewinds the leaf cursor and applies `logic` to each `(time, diff)` entry it visits.
    fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, mut logic: L) {
        let leaves = &storage.layer.vals.vals;
        let times = &mut self.cursor.child.child;

        times.rewind(leaves);
        while times.valid(leaves) {
            {
                let update = times.key(leaves);
                logic(&update.0, &update.1);
            }
            times.step(leaves);
        }
    }

    /// Whether the key cursor is valid for `storage`.
    fn key_valid(&self, storage: &Self::Storage) -> bool {
        self.cursor.valid(&storage.layer)
    }

    /// Whether the value cursor is valid for `storage`.
    fn val_valid(&self, storage: &Self::Storage) -> bool {
        self.cursor.child.valid(&storage.layer.vals)
    }

    /// Steps the key cursor.
    fn step_key(&mut self, storage: &Self::Storage) {
        self.cursor.step(&storage.layer);
    }

    /// Seeks the key cursor using `key`.
    fn seek_key(&mut self, storage: &Self::Storage, key: &K) {
        self.cursor.seek(&storage.layer, key);
    }

    /// Steps the value cursor.
    fn step_val(&mut self, storage: &Self::Storage) {
        self.cursor.child.step(&storage.layer.vals);
    }

    /// Seeks the value cursor using `val`.
    fn seek_val(&mut self, storage: &Self::Storage, val: &V) {
        self.cursor.child.seek(&storage.layer.vals, val);
    }

    /// Rewinds the key cursor.
    fn rewind_keys(&mut self, storage: &Self::Storage) {
        self.cursor.rewind(&storage.layer);
    }

    /// Rewinds the value cursor.
    fn rewind_vals(&mut self, storage: &Self::Storage) {
        self.cursor.child.rewind(&storage.layer.vals);
    }
}
/// A builder that assembles an `OrdValBatch` from pushed `(key, val, time, diff)` tuples.
pub struct OrdValBuilder<K: Ord, V: Ord, T: Ord+Lattice, R: Monoid> {
// Nested layer builder that receives each tuple as `(key, (val, (time, diff)))`.
builder: OrderedBuilder<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>>>,
}
/// Construction of `OrdValBatch`es from a stream of pushed tuples.
impl<K, V, T, R> Builder<K, V, T, R, OrdValBatch<K, V, T, R>> for OrdValBuilder<K, V, T, R>
where
    K: Ord + Clone + 'static,
    V: Ord + Clone + 'static,
    T: Lattice + Ord + Clone + ::std::fmt::Debug + 'static,
    R: Monoid,
{
    /// Creates a builder around a freshly constructed layer builder.
    fn new() -> Self {
        let builder = OrderedBuilder::<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>>>::new();
        OrdValBuilder { builder }
    }

    /// Creates a builder whose layer builder is constructed with capacity `cap`.
    fn with_capacity(cap: usize) -> Self {
        let builder = <OrderedBuilder<K, OrderedBuilder<V, OrderedLeafBuilder<T, R>>> as TupleBuilder>::with_capacity(cap);
        OrdValBuilder { builder }
    }

    /// Pushes one update, re-nested as `(key, (val, (time, diff)))`.
    #[inline]
    fn push(&mut self, (key, val, time, diff): (K, V, T, R)) {
        let leaf = (time, diff);
        self.builder.push_tuple((key, (val, leaf)));
    }

    /// Completes the layer and pairs it with a description built from the three bounds.
    #[inline(never)]
    fn done(self, lower: &[T], upper: &[T], since: &[T]) -> OrdValBatch<K, V, T, R> {
        let layer = self.builder.done();
        let desc = Description::new(lower, upper, since);
        OrdValBatch { layer, desc }
    }
}
/// A batch of `(key, time, diff)` updates stored as an ordered key layer over a leaf layer,
/// together with the description of the batch.
#[derive(Debug, Abomonation)]
pub struct OrdKeyBatch<K: Ord, T: Lattice, R> {
/// Nested layers: keys `K`, then an `OrderedLeaf<T, R>` of `(time, diff)` entries.
pub layer: OrderedLayer<K, OrderedLeaf<T, R>>,
/// The `Description<T>` handed out by `description()`.
pub desc: Description<T>,
}
impl<K, T, R> BatchReader<K, (), T, R> for OrdKeyBatch<K, T, R>
where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Monoid {
type Cursor = OrdKeyCursor<T, R>;
fn cursor(&self) -> Self::Cursor {
OrdKeyCursor {
empty: (),
valid: true,
cursor: self.layer.cursor(),
}
}
fn len(&self) -> usize { <OrderedLayer<K, OrderedLeaf<T, R>> as Trie>::tuples(&self.layer) }
fn description(&self) -> &Description<T> { &self.desc }
}
impl<K, T, R> Batch<K, (), T, R> for OrdKeyBatch<K, T, R>
where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Monoid {
type Batcher = MergeBatcher<K, (), T, R, Self>;
type Builder = OrdKeyBuilder<K, T, R>;
type Merger = OrdKeyMerger<K, T, R>;
fn begin_merge(&self, other: &Self) -> Self::Merger {
OrdKeyMerger::new(self, other)
}
}
impl<K, T, R> OrdKeyBatch<K, T, R>
where K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Monoid {
/// Advances the times of the updates held in `layer` from key index `key_pos` onward
/// to `frontier`, then compacts that part of the builder in place.
///
/// The work happens in three passes over the suffix that starts at `key_pos`:
/// 1. call `advance_by(frontier)` on every time;
/// 2. per key, sort its updates by time, accumulate diffs of equal times, and drop zero diffs;
/// 3. drop keys that were left without updates.
///
/// Keys and updates located before `key_pos` are not moved.
fn advance_builder_from(layer: &mut OrderedBuilder<K, OrderedLeafBuilder<T, R>>, frontier: &[T], key_pos: usize) {
// Translate the starting key index into the first affected update index.
let key_start = key_pos;
let time_start = layer.offs[key_pos];
// Pass 1: advance every affected time.
// NOTE(review): assumes `advance_by` updates the time in place; any return value is unused here — confirm.
for i in time_start .. layer.vals.vals.len() {
layer.vals.vals[i].0.advance_by(frontier);
}
// Pass 2: consolidate the updates of each key, compacting survivors towards `write_position`.
let mut write_position = time_start;
for i in key_start .. layer.keys.len() {
// Read the old update range of this key before its start offset is overwritten.
let lower = layer.offs[i];
let upper = layer.offs[i+1];
layer.offs[i] = write_position;
let updates = &mut layer.vals.vals[..];
// Sorting by time makes equal times adjacent.
updates[lower .. upper].sort_by(|x,y| x.0.cmp(&y.0));
// For adjacent equal times, move the earlier diff into the later entry and leave zero behind.
// NOTE(review): `upper - 1` assumes every key has at least one update (`upper >= 1`) — confirm.
for index in lower .. (upper - 1) {
if updates[index].0 == updates[index+1].0 {
let prev = ::std::mem::replace(&mut updates[index].1, R::zero());
updates[index + 1].1 += &prev;
}
}
// Keep only updates with a non-zero diff, swapping them down to the write position.
for index in lower .. upper {
if !updates[index].1.is_zero() {
updates.swap(write_position, index);
write_position += 1;
}
}
}
// Discard the leftover tail of updates and fix the final key offset.
layer.vals.vals.truncate(write_position);
layer.offs[layer.keys.len()] = write_position;
// Pass 3: drop keys whose update range became empty.
let mut write_position = key_start;
for i in key_start .. layer.keys.len() {
let lower = layer.offs[i];
let upper = layer.offs[i+1];
if lower < upper {
layer.keys.swap(write_position, i);
write_position += 1;
}
}
// An emptied key has the same offset as its successor, so removing consecutive
// duplicates leaves one offset per surviving key plus the final bound.
// NOTE(review): `dedup` scans the whole `offs` vector, not only the suffix from
// `key_start`; presumably earlier offsets are strictly increasing — confirm.
layer.offs.dedup();
layer.keys.truncate(write_position);
layer.offs.truncate(write_position+1);
}
}
/// State for a resumable merge of two `OrdKeyBatch`es.
pub struct OrdKeyMerger<K: Ord+Clone+'static, T: Lattice+Ord+Clone+'static, R: Monoid> {
// Position in the first batch's layer from which merging resumes; starts at 0.
lower1: usize,
// Initialized to `batch1.layer.keys()`; the first batch is exhausted once `lower1` reaches it.
upper1: usize,
// Position in the second batch's layer from which merging resumes; starts at 0.
lower2: usize,
// Initialized to `batch2.layer.keys()`; the second batch is exhausted once `lower2` reaches it.
upper2: usize,
// Merge builder accumulating the output layer.
result: <OrderedLayer<K, OrderedLeaf<T, R>> as Trie>::MergeBuilder,
// Description of the batch returned by `done`.
description: Description<T>,
}
/// Fuel-bounded merging of two adjacent `OrdKeyBatch`es into one.
impl<K, T, R> Merger<K, (), T, R, OrdKeyBatch<K, T, R>> for OrdKeyMerger<K, T, R>
where
    K: Ord + Clone + 'static,
    T: Lattice + Ord + Clone + 'static,
    R: Monoid,
{
    /// Prepares a merge of `batch1` followed by `batch2`.
    ///
    /// Panics unless the upper bound of `batch1` equals the lower bound of `batch2`.
    fn new(batch1: &OrdKeyBatch<K, T, R>, batch2: &OrdKeyBatch<K, T, R>) -> Self {
        assert!(batch1.upper() == batch2.lower());

        // Use `batch2`'s since when every element of `batch1`'s since has some
        // element of `batch2`'s since that is `less_equal` to it; otherwise `batch1`'s.
        let since1 = batch1.description().since();
        let since2 = batch2.description().since();
        let covered = since1
            .iter()
            .all(|t1| since2.iter().any(|t2| t2.less_equal(t1)));
        let since = if covered { since2 } else { since1 };

        let description = Description::new(batch1.lower(), batch2.upper(), since);
        let result = <<OrderedLayer<K, OrderedLeaf<T, R>> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer);

        OrdKeyMerger {
            lower1: 0,
            upper1: batch1.layer.keys(),
            lower2: 0,
            upper2: batch2.layer.keys(),
            result,
            description,
        }
    }

    /// Finishes the merge and returns the merged batch.
    ///
    /// Panics if either input has not been fully consumed by `work`.
    fn done(self) -> OrdKeyBatch<K, T, R> {
        assert!(self.lower1 == self.upper1);
        assert!(self.lower2 == self.upper2);

        let layer = self.result.done();
        OrdKeyBatch {
            layer,
            desc: self.description,
        }
    }

    /// Performs a bounded amount of merging, charging the updates produced against `fuel`.
    ///
    /// When `frontier` is `Some`, the output produced by this call is advanced to
    /// that frontier and compacted before returning.
    fn work(&mut self, source1: &OrdKeyBatch<K,T,R>, source2: &OrdKeyBatch<K,T,R>, frontier: &Option<Vec<T>>, fuel: &mut usize) {
        let starting_updates = self.result.vals.vals.len();
        let initial_key_pos = self.result.keys.len();

        // Merge step by step while both inputs have remaining input and fuel lasts.
        let mut effort = 0;
        loop {
            let either_exhausted = self.lower1 >= self.upper1 || self.lower2 >= self.upper2;
            if either_exhausted || effort >= *fuel {
                break;
            }
            self.result.merge_step(
                (&source1.layer, &mut self.lower1, self.upper1),
                (&source2.layer, &mut self.lower2, self.upper2),
            );
            effort = self.result.vals.vals.len() - starting_updates;
        }

        // Once one input is exhausted, copy whatever remains of the other wholesale.
        if self.lower1 == self.upper1 || self.lower2 == self.upper2 {
            if self.lower1 < self.upper1 {
                self.result.copy_range(&source1.layer, self.lower1, self.upper1);
                self.lower1 = self.upper1;
            }
            if self.lower2 < self.upper2 {
                self.result.copy_range(&source2.layer, self.lower2, self.upper2);
                self.lower2 = self.upper2;
            }
        }

        // Effort is measured before compaction, which may shrink the output.
        let effort = self.result.vals.vals.len() - starting_updates;

        if let Some(ref bound) = *frontier {
            OrdKeyBatch::advance_builder_from(&mut self.result, bound, initial_key_pos);
        }

        // Deduct the effort spent, bottoming out at zero.
        *fuel = (*fuel).saturating_sub(effort);
    }
}
/// A cursor for navigating the keys and `(time, diff)` entries of an `OrdKeyBatch`,
/// whose value type is the unit type `()`.
#[derive(Debug)]
pub struct OrdKeyCursor<T: Lattice+Ord+Clone, R: Monoid> {
// Reported by `val_valid`: cleared by `step_val`, set again by key movement and by `rewind_vals`.
valid: bool,
// The unit value that `val` returns a reference to.
empty: (),
// Cursor over the key layer; `child` addresses the leaf layer.
cursor: OrderedCursor<OrderedLeaf<T, R>>,
}
/// Navigation over an `OrdKeyBatch`, presenting each key as having the single value `()`.
impl<K: Ord+Clone, T: Lattice+Ord+Clone, R: Monoid> Cursor<K, (), T, R> for OrdKeyCursor<T, R> {
    type Storage = OrdKeyBatch<K, T, R>;

    /// The key the cursor currently points at.
    fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K {
        &self.cursor.key(&storage.layer)
    }

    /// The unit value associated with every key.
    fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () {
        // `&()` is promoted to a `'static` reference, so it satisfies any `'a`
        // without the lifetime-extending `transmute` of `&self.empty`.
        &()
    }

    /// Rewinds the leaf cursor and applies `logic` to each `(time, diff)` entry it visits.
    fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, mut logic: L) {
        self.cursor.child.rewind(&storage.layer.vals);
        while self.cursor.child.valid(&storage.layer.vals) {
            logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1);
            self.cursor.child.step(&storage.layer.vals);
        }
    }

    /// Whether the key cursor is valid for `storage`.
    fn key_valid(&self, storage: &Self::Storage) -> bool {
        self.cursor.valid(&storage.layer)
    }

    /// Whether the unit value has not yet been stepped past for the current key.
    fn val_valid(&self, _storage: &Self::Storage) -> bool {
        self.valid
    }

    /// Steps the key cursor and marks the unit value valid again.
    fn step_key(&mut self, storage: &Self::Storage) {
        self.cursor.step(&storage.layer);
        self.valid = true;
    }

    /// Seeks the key cursor using `key` and marks the unit value valid again.
    fn seek_key(&mut self, storage: &Self::Storage, key: &K) {
        self.cursor.seek(&storage.layer, key);
        self.valid = true;
    }

    /// Steps past the unit value; there is only one, so it becomes invalid.
    fn step_val(&mut self, _storage: &Self::Storage) {
        self.valid = false;
    }

    /// Does nothing: the only possible value is `()`.
    fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { }

    /// Rewinds the key cursor and marks the unit value valid again.
    fn rewind_keys(&mut self, storage: &Self::Storage) {
        self.cursor.rewind(&storage.layer);
        self.valid = true;
    }

    /// Marks the unit value valid again.
    fn rewind_vals(&mut self, _storage: &Self::Storage) {
        self.valid = true;
    }
}
/// A builder that assembles an `OrdKeyBatch` from pushed `(key, (), time, diff)` tuples.
pub struct OrdKeyBuilder<K: Ord, T: Ord+Lattice, R: Monoid> {
// Nested layer builder that receives each tuple as `(key, (time, diff))`.
builder: OrderedBuilder<K, OrderedLeafBuilder<T, R>>,
}
/// Construction of `OrdKeyBatch`es from a stream of pushed tuples.
impl<K, T, R> Builder<K, (), T, R, OrdKeyBatch<K, T, R>> for OrdKeyBuilder<K, T, R>
where
    K: Ord + Clone + 'static,
    T: Lattice + Ord + Clone + 'static,
    R: Monoid,
{
    /// Creates a builder around a freshly constructed layer builder.
    fn new() -> Self {
        let builder = OrderedBuilder::<K, OrderedLeafBuilder<T, R>>::new();
        OrdKeyBuilder { builder }
    }

    /// Creates a builder whose layer builder is constructed with capacity `cap`.
    fn with_capacity(cap: usize) -> Self {
        let builder = <OrderedBuilder<K, OrderedLeafBuilder<T, R>> as TupleBuilder>::with_capacity(cap);
        OrdKeyBuilder { builder }
    }

    /// Pushes one update as `(key, (time, diff))`; the unit value is discarded.
    #[inline]
    fn push(&mut self, (key, _, time, diff): (K, (), T, R)) {
        let leaf = (time, diff);
        self.builder.push_tuple((key, leaf));
    }

    /// Completes the layer and pairs it with a description built from the three bounds.
    #[inline(never)]
    fn done(self, lower: &[T], upper: &[T], since: &[T]) -> OrdKeyBatch<K, T, R> {
        let layer = self.builder.done();
        let desc = Description::new(lower, upper, since);
        OrdKeyBatch { layer, desc }
    }
}