use ::Diff;
use lattice::Lattice;
use trace::{Batch, BatchReader, Trace, TraceReader};
use trace::cursor::cursor_list::CursorList;
use trace::cursor::Cursor;
/// A collection of batches split into two lists: `merging`, whose batches are
/// progressively merged together, and `pending`, which holds inserted batches
/// until the distinguish frontier permits moving them into `merging`.
#[derive(Debug)]
pub struct Spine<K, V, T, R, B>
where
    T: Lattice+Ord,
    R: Diff,
    B: Batch<K, V, T, R>,
{
    /// Marker tying the otherwise unused `K`, `V` and `R` parameters to the type.
    phantom: ::std::marker::PhantomData<(K, V, R)>,
    /// Frontier passed to `advance_mut` on batches as they are merged.
    /// An empty frontier means the batches have been dropped (see `advance_by`).
    advance_frontier: Vec<T>,
    /// Frontier set by `distinguish_since`; it gates both `cursor_through`
    /// and the promotion of pending batches into `merging`.
    through_frontier: Vec<T>,
    /// Batches that have been promoted from `pending` and are subject to merging.
    merging: Vec<B>,
    /// Batches received through `insert` that have not yet been promoted.
    pending: Vec<B>,
}
impl<K, V, T, R, B> TraceReader<K, V, T, R> for Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone,
    R: Diff,
    B: Batch<K, V, T, R>+Clone+'static,
{
    type Batch = B;
    type Cursor = CursorList<K, V, T, R, <B as BatchReader<K, V, T, R>>::Cursor>;

    /// Builds a cursor over all non-empty merging batches plus those pending
    /// batches whose upper frontier is covered by `upper`.
    ///
    /// Returns `None` unless every element of `upper` is greater than or equal
    /// to some element of the distinguish frontier.
    ///
    /// # Panics
    ///
    /// Panics if the advance frontier is empty, or if `upper` covers the lower
    /// frontier of a pending batch but not its upper frontier (or vice versa)
    /// while not being equal to that batch's lower frontier.
    fn cursor_through(&mut self, upper: &[T]) -> Option<(Self::Cursor, <Self::Cursor as Cursor<K, V, T, R>>::Storage)> {
        assert!(self.advance_frontier.len() > 0);

        // Bail out early when `upper` is not at or beyond the distinguish frontier.
        let reachable = upper.iter().all(|t1| self.through_frontier.iter().any(|t2| t2.less_equal(t1)));
        if !reachable {
            return None;
        }

        let mut cursors = Vec::new();
        let mut storage = Vec::new();

        // Every merging batch with content participates; empty ones are skipped.
        for batch in self.merging.iter() {
            if batch.len() > 0 {
                cursors.push(batch.cursor());
                storage.push(batch.clone());
            }
        }

        for batch in self.pending.iter() {
            let lower_covered = upper.iter().all(|t1| batch.lower().iter().any(|t2| t2.less_equal(t1)));
            let upper_covered = upper.iter().all(|t1| batch.upper().iter().any(|t2| t2.less_equal(t1)));

            // Covering exactly one end of the batch is only acceptable when
            // `upper` coincides with the batch's lower frontier.
            let partially_covered = lower_covered != upper_covered;
            if partially_covered && upper != batch.lower() {
                panic!("`cursor_through`: `upper` straddles batch");
            }

            if upper_covered {
                cursors.push(batch.cursor());
                storage.push(batch.clone());
            }
        }

        Some((CursorList::new(cursors, &storage), storage))
    }

    /// Replaces the advance frontier with `frontier`.
    ///
    /// An empty `frontier` additionally discards every pending and merging batch.
    fn advance_by(&mut self, frontier: &[T]) {
        let closing = frontier.is_empty();
        self.advance_frontier = frontier.to_vec();
        if closing {
            self.pending.clear();
            self.merging.clear();
        }
    }

    /// The frontier most recently supplied to `advance_by` (initially the minimum time).
    fn advance_frontier(&mut self) -> &[T] {
        self.advance_frontier.as_slice()
    }

    /// Replaces the distinguish frontier with `frontier`, then promotes and
    /// merges whichever pending batches the new frontier allows.
    fn distinguish_since(&mut self, frontier: &[T]) {
        self.through_frontier = frontier.to_vec();
        self.consider_merges();
    }

    /// The frontier most recently supplied to `distinguish_since` (initially the minimum time).
    fn distinguish_frontier(&mut self) -> &[T] {
        self.through_frontier.as_slice()
    }

    /// Applies `f` to each merging batch, then to each pending batch, in stored order.
    fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
        for batch in self.merging.iter().chain(self.pending.iter()) {
            f(batch);
        }
    }
}
impl<K, V, T, R, B> Trace<K, V, T, R> for Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone,
    R: Diff,
    B: Batch<K, V, T, R>+Clone+'static,
{
    /// Creates an empty spine whose advance and distinguish frontiers both
    /// consist of the single minimum time.
    fn new() -> Self {
        let advance_frontier = vec![<T as Lattice>::minimum()];
        let through_frontier = vec![<T as Lattice>::minimum()];
        Spine {
            phantom: ::std::marker::PhantomData,
            advance_frontier: advance_frontier,
            through_frontier: through_frontier,
            merging: Vec::new(),
            pending: Vec::new(),
        }
    }

    /// Queues `batch` as pending and gives the spine a chance to merge.
    ///
    /// A batch whose lower and upper frontiers are equal is not stored.
    ///
    /// # Panics
    ///
    /// Panics if such a batch (equal lower and upper) is not empty.
    fn insert(&mut self, batch: Self::Batch) {
        // A batch spanning no interval carries nothing; only validate it.
        if batch.lower() == batch.upper() {
            assert!(batch.len() == 0);
            return;
        }

        self.pending.push(batch);
        self.consider_merges();
    }
}
impl<K, V, T, R, B> Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone,
    R: Diff,
    B: Batch<K, V, T, R>,
{
    /// Moves pending batches into `merging` for as long as the oldest pending
    /// batch's upper frontier is covered by the distinguish frontier, merging
    /// neighbouring batches according to their relative lengths as it goes.
    #[inline(never)]
    fn consider_merges(&mut self) {
        loop {
            // The front pending batch is ready when every element of the
            // distinguish frontier is at or beyond some element of its upper.
            let ready = match self.pending.first() {
                Some(head) => self.through_frontier.iter().all(|t1| head.upper().iter().any(|t2| t2.less_equal(t1))),
                None => false,
            };
            if !ready {
                break;
            }

            let incoming = self.pending.remove(0);

            // Collapse the tail while the second-to-last batch is shorter than
            // the incoming one. No advancing happens in this phase.
            while self.merging.len() >= 2 && self.merging[self.merging.len() - 2].len() < incoming.len() {
                let newer = self.merging.pop().unwrap();
                let older = self.merging.pop().unwrap();
                let merged = older.merge(&newer);
                self.merging.push(merged);
            }

            self.merging.push(incoming);

            // Keep merging the last two batches while the second-to-last is
            // less than twice as long as the last.
            while self.merging.len() >= 2 {
                let count = self.merging.len();
                if self.merging[count - 2].len() >= 2 * self.merging[count - 1].len() {
                    break;
                }

                let mut newer = self.merging.pop().unwrap();
                let mut older = self.merging.pop().unwrap();

                // When these were the only two batches, advance both inputs
                // to the advance frontier before merging them.
                if self.merging.len() == 0 {
                    newer.advance_mut(&self.advance_frontier[..]);
                    older.advance_mut(&self.advance_frontier[..]);
                }

                let merged = older.merge(&newer);
                self.merging.push(merged);
            }
        }
    }
}