use std::fmt::Debug;
use ::difference::Monoid;
use lattice::Lattice;
use trace::{Batch, BatchReader, Trace, TraceReader};
use trace::cursor::{Cursor, CursorList};
use trace::Merger;
use ::timely::dataflow::operators::generic::OperatorInfo;
/// The contents of one level of the spine: either two batches being merged, or one batch.
enum MergeState<K, V, T, R, B: Batch<K, V, T, R>> {
    /// Two batches being merged, plus the optional frontier handed to every `work` call of the
    /// merger and the in-progress merger itself. `begin_merge` asserts that the first batch's
    /// `upper()` equals the second batch's `lower()`.
    Merging(B, B, Option<Vec<T>>, <B as Batch<K,V,T,R>>::Merger),
    /// A single batch with no merge outstanding.
    Complete(B),
}
impl<K, V, T: Eq, R, B: Batch<K, V, T, R>> MergeState<K, V, T, R, B> {
    /// Turns this state into its batch without performing any further merge work.
    ///
    /// For `Merging`, the merger's `done()` result is returned and a `MergeEvent` recording both
    /// input lengths and the output length is logged. For `Complete`, the batch is returned as-is
    /// and nothing is logged.
    fn finish(self, logger: &Option<::logging::Logger>, operator: usize, scale: usize) -> B {
        match self {
            MergeState::Complete(batch) => batch,
            MergeState::Merging(first, second, _, merger) => {
                let merged = merger.done();
                if let Some(ref sink) = *logger {
                    sink.log(::logging::MergeEvent {
                        operator,
                        scale,
                        length1: first.len(),
                        length2: second.len(),
                        complete: Some(merged.len()),
                    });
                }
                merged
            }
        }
    }

    /// Drives any in-progress merge to completion with unbounded fuel and returns the result.
    ///
    /// # Panics
    /// Panics if the merger exhausts `usize::max_value()` units of fuel.
    fn complete(mut self, logger: &mut Option<::logging::Logger>, operator: usize, scale: usize) -> B {
        if let MergeState::Merging(ref source1, ref source2, ref frontier, ref mut in_progress) = self {
            let mut budget = usize::max_value();
            in_progress.work(source1, source2, frontier, &mut budget);
            assert!(budget > 0);
        }
        self.finish(logger, operator, scale)
    }

    /// Returns `true` when no merge is outstanding at this level.
    fn is_complete(&self) -> bool {
        if let MergeState::Complete(_) = *self { true } else { false }
    }

    /// Starts merging `batch1` followed by `batch2`, with `frontier` passed to the merger's work.
    ///
    /// # Panics
    /// Panics unless `batch1.upper() == batch2.lower()`.
    fn begin_merge(batch1: B, batch2: B, frontier: Option<Vec<T>>) -> Self {
        assert!(batch1.upper() == batch2.lower());
        let merger = <B as Batch<K, V, T, R>>::begin_merge(&batch1, &batch2);
        MergeState::Merging(batch1, batch2, frontier, merger)
    }

    /// Spends up to `*fuel` units of merge work.
    ///
    /// If any fuel is left afterwards, the merge is taken to be finished and the returned state
    /// is `Complete`. Otherwise the state is returned unchanged, still merging.
    fn work(mut self, fuel: &mut usize, logger: &mut Option<::logging::Logger>, operator: usize, scale: usize) -> Self {
        if let MergeState::Merging(ref source1, ref source2, ref frontier, ref mut in_progress) = self {
            in_progress.work(source1, source2, frontier, fuel);
        }
        // Running out of fuel means the merger may still have work to do.
        if *fuel == 0 {
            return self;
        }
        MergeState::Complete(self.finish(logger, operator, scale))
    }

    /// Total number of updates held at this level (the sum of both inputs while merging).
    fn len(&self) -> usize {
        match *self {
            MergeState::Complete(ref batch) => batch.len(),
            MergeState::Merging(ref first, ref second, _, _) => first.len() + second.len(),
        }
    }
}
/// A trace stored as a sequence of batches arranged into merge levels of roughly geometric
/// size, with merges advanced incrementally as new batches arrive.
pub struct Spine<K, V, T: Lattice+Ord, R: Monoid, B: Batch<K, V, T, R>> {
    /// Operator identity; its `global_id` is reported in log events.
    operator: OperatorInfo,
    /// Optional destination for batch and merge log events.
    logger: Option<::logging::Logger>,
    /// Carries the type parameters `K`, `V`, `R`, which no field stores directly.
    phantom: ::std::marker::PhantomData<(K, V, R)>,
    /// Frontier set by `advance_by`; it is handed to merges that land in the top level, and an
    /// empty value causes all held batches to be discarded.
    advance_frontier: Vec<T>,
    /// Frontier set by `distinguish_since`; a pending batch is moved into `merging` only once
    /// this frontier is at or beyond the batch's upper bound.
    through_frontier: Vec<T>,
    /// Merge levels, where level `i` holds batches whose length is roughly on the order of `2^i`;
    /// `None` marks an empty level.
    merging: Vec<Option<MergeState<K,V,T,R,B>>>,
    /// Inserted batches, in insertion order, that have not yet been moved into `merging`.
    pending: Vec<B>,
    /// Upper bound of the most recently inserted batch; the next batch must start here.
    upper: Vec<T>,
    /// Multiplier on the merge fuel granted each time a pending batch is admitted (at least 1).
    effort: usize,
}
impl<K, V, T, R, B> TraceReader for Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone+Debug+Default,
    R: Monoid,
    B: Batch<K, V, T, R>+Clone+'static,
{
    type Key = K;
    type Val = V;
    type Time = T;
    type R = R;
    type Batch = B;
    type Cursor = CursorList<K, V, T, R, <B as BatchReader<K, V, T, R>>::Cursor>;

    /// Builds a cursor over every non-empty batch whose contents lie before `upper`.
    ///
    /// All batches in the merge levels are included, from the highest level down. A pending
    /// batch is included only when `upper` is at or beyond that batch's upper bound.
    ///
    /// # Panics
    /// Panics if the advance frontier is empty, if `upper` is not at or beyond the distinguish
    /// frontier, or if `upper` falls strictly inside a pending batch.
    fn cursor_through(&mut self, upper: &[T]) -> Option<(Self::Cursor, <Self::Cursor as Cursor<K, V, T, R>>::Storage)> {
        assert!(self.advance_frontier.len() > 0);
        assert!(upper.iter().all(|t1| self.through_frontier.iter().any(|t2| t2.less_equal(t1))));

        // True when every element of `upper` is at or beyond some element of `bound`.
        let reaches = |bound: &[T]| upper.iter().all(|t1| bound.iter().any(|t2| t2.less_equal(t1)));

        let mut cursors = Vec::new();
        let mut storage = Vec::new();
        {
            // Records a cursor and a storage handle for a batch, skipping empty batches.
            let mut admit = |batch: &B| {
                if !batch.is_empty() {
                    cursors.push(batch.cursor());
                    storage.push(batch.clone());
                }
            };

            for level in self.merging.iter().rev() {
                if let Some(ref state) = *level {
                    match *state {
                        MergeState::Merging(ref first, ref second, _, _) => {
                            admit(first);
                            admit(second);
                        }
                        MergeState::Complete(ref batch) => admit(batch),
                    }
                }
            }

            for batch in self.pending.iter().filter(|batch| !batch.is_empty()) {
                let past_lower = reaches(batch.lower());
                let past_upper = reaches(batch.upper());
                if past_lower != past_upper && upper != batch.lower() {
                    panic!("`cursor_through`: `upper` straddles batch");
                }
                if past_upper {
                    admit(batch);
                }
            }
        }

        Some((CursorList::new(cursors, &storage), storage))
    }

    /// Replaces the advance frontier; an empty frontier discards every held batch.
    fn advance_by(&mut self, frontier: &[T]) {
        self.advance_frontier = frontier.to_vec();
        if self.advance_frontier.is_empty() {
            self.pending.clear();
            self.merging.clear();
        }
    }

    /// The current advance frontier.
    fn advance_frontier(&mut self) -> &[T] {
        &self.advance_frontier
    }

    /// Replaces the distinguish frontier and then admits any pending batches it now covers.
    fn distinguish_since(&mut self, frontier: &[T]) {
        self.through_frontier = frontier.to_vec();
        self.consider_merges();
    }

    /// The current distinguish frontier.
    fn distinguish_frontier(&mut self) -> &[T] {
        &self.through_frontier
    }

    /// Calls `f` on every held batch: merge levels from the highest down, then pending batches.
    fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
        for state in self.merging.iter().rev().filter_map(|level| level.as_ref()) {
            match *state {
                MergeState::Merging(ref first, ref second, _, _) => {
                    f(first);
                    f(second);
                }
                MergeState::Complete(ref batch) => f(batch),
            }
        }
        for batch in &self.pending {
            f(batch);
        }
    }
}
impl<K, V, T, R, B> Trace for Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone+Debug+Default,
    R: Monoid,
    B: Batch<K, V, T, R>+Clone+'static,
{
    /// Creates a spine with the default merge effort of 4.
    fn new(info: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> Self {
        Self::with_effort(4, info, logging)
    }

    /// Appends `batch`, which must begin exactly where the previous batch ended, then merges.
    ///
    /// # Panics
    /// Panics if the batch has equal lower and upper bounds, or if its lower bound differs from
    /// the spine's current upper bound.
    fn insert(&mut self, batch: Self::Batch) {
        if let Some(ref logger) = self.logger {
            logger.log(::logging::BatchEvent {
                operator: self.operator.global_id,
                length: batch.len()
            });
        }
        assert!(batch.lower() != batch.upper());
        assert_eq!(batch.lower(), &self.upper[..]);
        self.upper = batch.upper().to_vec();
        self.pending.push(batch);
        self.consider_merges();
    }

    /// Seals the trace by inserting an empty batch from the current upper bound to the empty
    /// frontier; does nothing if the trace is already sealed.
    fn close(&mut self) {
        if self.upper.is_empty() {
            return;
        }
        use trace::Builder;
        let sealing = B::Builder::new().done(&self.upper[..], &[], &self.upper[..]);
        self.insert(sealing);
    }
}
impl<K, V, T, R, B> Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone+Debug+Default,
    R: Monoid,
    B: Batch<K, V, T, R>,
{
    /// Creates an empty spine.
    ///
    /// `effort` scales the merge fuel granted each time a pending batch is admitted. A value of
    /// zero is raised to one, since zero effort would grant no fuel to incremental merges.
    pub fn with_effort(mut effort: usize, operator: OperatorInfo, logger: Option<::logging::Logger>) -> Self {
        if effort == 0 { effort = 1; }
        Spine {
            operator,
            logger,
            phantom: ::std::marker::PhantomData,
            advance_frontier: vec![<T as Lattice>::minimum()],
            through_frontier: vec![<T as Lattice>::minimum()],
            merging: Vec::new(),
            pending: Vec::new(),
            upper: vec![Default::default()],
            effort,
        }
    }

    /// Moves pending batches into the merge levels and advances in-progress merges.
    ///
    /// Pending batches are admitted in insertion order, each only once `through_frontier` is at
    /// or beyond its upper bound. A batch of length `n` is placed at level
    /// `log2(n.next_power_of_two())`. Every occupied level below that one is first forced to
    /// complete and cascaded upward, and the batch is then merged with that level's occupant.
    /// Merge work is then spent from the top level down, followed by demoting undersized
    /// batches and trimming empty top levels.
    #[inline(never)]
    fn consider_merges(&mut self) {
        while self.pending.len() > 0 &&
            self.through_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1)))
        {
            let batch = self.pending.remove(0);
            let batch_size = batch.len().next_power_of_two();
            let batch_index = batch_size.trailing_zeros() as usize;
            while self.merging.len() <= batch_index { self.merging.push(None); }
            if self.merging.len() > 32 { eprintln!("large progressive merge; len: {:?}", self.merging.len()); }

            // Empty every level below `batch_index`: each occupant is completed and either merged
            // with, or moved into, the level above it (the carry cascades upward).
            for position in 0 .. batch_index {
                if let Some(batch) = self.merging[position].take() {
                    let batch = batch.complete(&mut self.logger, self.operator.global_id, position);
                    if let Some(batch2) = self.merging[position+1].take() {
                        let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, position);
                        self.logger.as_ref().map(|l| l.log(
                            ::logging::MergeEvent {
                                operator: self.operator.global_id,
                                scale: position,
                                length1: batch.len(),
                                length2: batch2.len(),
                                complete: None,
                            }
                        ));
                        self.merging[position+1] = Some(MergeState::begin_merge(batch2, batch, None));
                    }
                    else {
                        self.merging[position+1] = Some(MergeState::Complete(batch));
                    };
                }
            }

            // Place the new batch at its level, merging it after any existing occupant. Only a
            // merge in the top level receives the advance frontier.
            if let Some(batch2) = self.merging[batch_index].take() {
                let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, batch_index);
                let frontier = if batch_index == self.merging.len()-1 { Some(self.advance_frontier.clone()) } else { None };
                self.logger.as_ref().map(|l| l.log(
                    ::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: batch_index,
                        length1: batch.len(),
                        length2: batch2.len(),
                        complete: None,
                    }
                ));
                self.merging[batch_index] = Some(MergeState::begin_merge(batch2, batch, frontier));
            }
            else {
                self.merging[batch_index] = Some(MergeState::Complete(batch));
            }

            // Spend merge fuel from the top level down to `batch_index`. Each level adds
            // `2 * batch_size * effort` to the budget, and fuel left unspent at one level carries
            // into the next. All arithmetic saturates: with a large `effort` (for example
            // `usize::max_value()` to merge eagerly), a plain `+=` would overflow on the second
            // level once leftover fuel is carried over.
            let mut fuel: usize = 0;
            for position in (batch_index .. self.merging.len()).rev() {
                fuel = fuel.saturating_add(batch_size.saturating_mul(2).saturating_mul(self.effort));
                let mut new_position = position;
                while self.merging[new_position].as_ref().map(|x| !x.is_complete()).unwrap_or(false) && fuel > 0 {
                    // The loop condition guarantees this level is occupied.
                    if let Some(mut batch) = self.merging[new_position].take() {
                        // NOTE(review): `scale` here and below is `position`, although the state
                        // being worked lives at `new_position` (MergeShortfall uses
                        // `new_position`); confirm which log consumers expect.
                        batch = batch.work(&mut fuel, &mut self.logger, self.operator.global_id, position);
                        if batch.is_complete() && batch.len() >= (1 << new_position) {
                            // The finished batch has outgrown its level: promote it one level up.
                            new_position += 1;
                            if self.merging.len() <= new_position { self.merging.push(None); }
                            if let Some(mut batch2) = self.merging[new_position].take() {
                                if !batch2.is_complete() {
                                    // The occupant must finish before it can absorb the promoted
                                    // batch; record how much work that took beyond the budget.
                                    let mut temp_fuel = usize::max_value();
                                    batch2 = batch2.work(&mut temp_fuel, &mut self.logger, self.operator.global_id, position);
                                    self.logger.as_ref().map(|l| l.log(
                                        ::logging::MergeShortfall {
                                            operator: self.operator.global_id,
                                            scale: new_position,
                                            shortfall: usize::max_value() - temp_fuel,
                                        }
                                    ));
                                }
                                let batch1 = batch.complete(&mut self.logger, self.operator.global_id, position);
                                let batch2 = batch2.complete(&mut self.logger, self.operator.global_id, position);
                                let frontier = if new_position+1 == self.merging.len() { Some(self.advance_frontier.clone()) } else { None };
                                self.logger.as_ref().map(|l| l.log(
                                    ::logging::MergeEvent {
                                        operator: self.operator.global_id,
                                        scale: position,
                                        length1: batch1.len(),
                                        length2: batch2.len(),
                                        complete: None,
                                    }
                                ));
                                self.merging[new_position] = Some(MergeState::begin_merge(batch2, batch1, frontier));
                            }
                            else {
                                self.merging[new_position] = Some(batch);
                            }
                        }
                        else {
                            self.merging[new_position] = Some(batch);
                        }
                    }
                }
            }

            // Demote a completed batch that is small for its level (shorter than
            // `1 << (index - 1)`) into the level below, when that level is empty.
            for index in (1 .. self.merging.len()).rev() {
                if self.merging[index].as_ref().map(|x| x.is_complete() && x.len() < (1 << (index-1))).unwrap_or(false) {
                    if self.merging[index-1].is_none() {
                        self.merging[index-1] = self.merging[index].take();
                    }
                }
            }
            // Drop empty levels from the top.
            while self.merging.last().map(|x| x.is_none()) == Some(true) { self.merging.pop(); }
        }
    }
}