use std::fmt::Debug;
use ::logging::Logger;
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Batch, BatchReader, Trace, TraceReader};
use trace::cursor::{Cursor, CursorList};
use trace::Merger;
use ::timely::dataflow::operators::generic::OperatorInfo;
/// A trace that keeps its batches in layers of geometrically increasing size
/// and merges them incrementally as "fuel" is supplied.
///
/// A batch of `n` updates is introduced at layer
/// `n.next_power_of_two().trailing_zeros()`, so layer `i` holds batches of
/// at most roughly `2^i` updates.
pub struct Spine<K, V, T: Lattice+Ord, R: Semigroup, B: Batch<K, V, T, R>> {
    /// Identifies the owning operator in emitted log events.
    operator: OperatorInfo,
    /// Optional sink for batch and merge log events.
    logger: Option<Logger>,
    /// Marks the otherwise-unused type parameters `K`, `V`, and `R`.
    phantom: ::std::marker::PhantomData<(K, V, R)>,
    /// Frontier set by `advance_by`; handed to every newly started merge.
    advance_frontier: Vec<T>,
    /// Frontier set by `distinguish_since`. A pending batch is admitted into
    /// `merging` only once every element of this frontier is at or beyond
    /// some element of the batch's upper bound.
    through_frontier: Vec<T>,
    /// The layers: each is vacant, holds one batch, or holds a pair of batches
    /// (merging or merged).
    merging: Vec<MergeState<K,V,T,R,B>>,
    /// Inserted batches not yet admitted into `merging`, in insertion order.
    pending: Vec<B>,
    /// Upper bound of the most recently inserted batch.
    upper: Vec<T>,
    /// Multiplier (never less than one) applied to the fuel granted per introduced batch.
    effort: usize,
    /// When present, activated whenever the spine is left with maintenance work outstanding.
    activator: Option<timely::scheduling::activate::Activator>,
}
impl<K, V, T, R, B> TraceReader for Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone+Debug+Default,
    R: Semigroup,
    B: Batch<K, V, T, R>+Clone+'static,
{
    type Key = K;
    type Val = V;
    type Time = T;
    type R = R;
    type Batch = B;
    type Cursor = CursorList<K, V, T, R, <B as BatchReader<K, V, T, R>>::Cursor>;

    /// Builds a cursor over every update the spine holds at times not beyond `upper`.
    ///
    /// All non-empty batches in the merge layers (largest layer first) are
    /// included, followed by each non-empty pending batch whose upper bound is
    /// covered by `upper`. The storage returned alongside the cursor owns
    /// clones of exactly those batches.
    ///
    /// # Panics
    /// * if the advance frontier is empty;
    /// * if `upper` is not at or beyond `through_frontier`;
    /// * if `upper` falls strictly inside a non-empty pending batch.
    fn cursor_through(&mut self, upper: &[T]) -> Option<(Self::Cursor, <Self::Cursor as Cursor<K, V, T, R>>::Storage)> {
        assert!(self.advance_frontier.len() > 0);
        assert!(upper.iter().all(|t1| self.through_frontier.iter().any(|t2| t2.less_equal(t1))));

        let mut cursors = Vec::new();
        let mut storage = Vec::new();

        // Merge layers: every non-empty batch is visible, regardless of merge progress.
        for layer in self.merging.iter().rev() {
            let (first, second) = match layer {
                MergeState::Double(MergeVariant::InProgress(batch1, batch2, _, _)) => (Some(batch1), Some(batch2)),
                MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => (Some(batch), None),
                MergeState::Single(Some(batch)) => (Some(batch), None),
                MergeState::Double(MergeVariant::Complete(None))
                | MergeState::Single(None)
                | MergeState::Vacant => (None, None),
            };
            for batch in first.into_iter().chain(second).filter(|batch| !batch.is_empty()) {
                cursors.push(batch.cursor());
                storage.push(batch.clone());
            }
        }

        // `bound` is covered when each element of `upper` is at or beyond some element of `bound`.
        let covered_by_upper = |bound: &[T]| upper.iter().all(|t1| bound.iter().any(|t2| t2.less_equal(t1)));

        // Pending batches: only those lying entirely at or before `upper` are visible.
        for batch in self.pending.iter().filter(|batch| !batch.is_empty()) {
            let lower_covered = covered_by_upper(batch.lower());
            let upper_covered = covered_by_upper(batch.upper());
            if lower_covered != upper_covered && upper != batch.lower() {
                panic!("`cursor_through`: `upper` straddles batch");
            }
            if upper_covered {
                cursors.push(batch.cursor());
                storage.push(batch.clone());
            }
        }

        Some((CursorList::new(cursors, &storage), storage))
    }

    /// Records `frontier` as the advance frontier. An empty frontier discards
    /// every batch the spine holds, pending and merging alike.
    fn advance_by(&mut self, frontier: &[T]) {
        self.advance_frontier = frontier.to_vec();
        if self.advance_frontier.is_empty() {
            self.pending.clear();
            self.merging.clear();
        }
    }

    /// The frontier most recently supplied to `advance_by`.
    fn advance_frontier(&mut self) -> &[T] {
        &self.advance_frontier[..]
    }

    /// Records `frontier` as the distinguish frontier, then admits any pending
    /// batches that have become eligible for merging.
    fn distinguish_since(&mut self, frontier: &[T]) {
        self.through_frontier = frontier.to_vec();
        self.consider_merges();
    }

    /// The frontier most recently supplied to `distinguish_since`.
    fn distinguish_frontier(&mut self) -> &[T] {
        &self.through_frontier[..]
    }

    /// Calls `f` on each batch held in the merge layers (largest layer first),
    /// then on each pending batch. Empty batches are included.
    fn map_batches<F: FnMut(&Self::Batch)>(&mut self, mut f: F) {
        for layer in self.merging.iter().rev() {
            match layer {
                MergeState::Double(MergeVariant::InProgress(batch1, batch2, _, _)) => {
                    f(batch1);
                    f(batch2);
                }
                MergeState::Double(MergeVariant::Complete(Some((batch, _))))
                | MergeState::Single(Some(batch)) => f(batch),
                MergeState::Double(MergeVariant::Complete(None))
                | MergeState::Single(None)
                | MergeState::Vacant => {}
            }
        }
        for batch in &self.pending {
            f(batch);
        }
    }
}
impl<K, V, T, R, B> Trace for Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone+Debug+Default,
    R: Semigroup,
    B: Batch<K, V, T, R>+Clone+'static,
{
    /// Creates an empty spine with the minimum merge effort of one.
    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        Self::with_effort(1, info, logging, activator)
    }

    /// Performs maintenance work even when no new batches arrive.
    ///
    /// If the spine is not yet reduced, this does one of two things. When
    /// merges are in progress, they receive `*effort` fuel. Otherwise an empty
    /// virtual batch sized by `*effort` is introduced, to push existing batches
    /// up through the layers. The activator (if any) is then triggered so the
    /// work continues later.
    fn exert(&mut self, effort: &mut isize) {
        self.tidy_layers();
        if !self.reduced() {
            if self.merging.iter().any(|b| b.is_double()) {
                self.apply_fuel(effort);
            }
            else {
                // Non-positive effort is treated as zero. Casting a negative
                // `isize` straight to `usize` wraps to an enormous value: that
                // overflows `next_power_of_two` (a panic in debug builds) and
                // otherwise requests a virtual batch at a nonsensical layer.
                let requested = if *effort > 0 { *effort as usize } else { 0 };
                let level = requested.next_power_of_two().trailing_zeros() as usize;
                self.introduce_batch(None, level);
            }
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    /// Appends `batch` to the spine.
    ///
    /// # Panics
    /// If `batch` has equal lower and upper bounds, or if its lower bound
    /// differs from the upper bound of the previously inserted batch.
    fn insert(&mut self, batch: Self::Batch) {
        if let Some(logger) = &self.logger {
            logger.log(::logging::BatchEvent {
                operator: self.operator.global_id,
                length: batch.len()
            });
        }
        assert!(batch.lower() != batch.upper());
        assert_eq!(batch.lower(), &self.upper[..]);
        self.upper = batch.upper().to_vec();
        self.pending.push(batch);
        self.consider_merges();
    }

    /// Inserts a final empty batch reaching the empty frontier, unless the
    /// spine's upper is already empty (i.e. it is already closed).
    fn close(&mut self) {
        if !self.upper.is_empty() {
            use trace::Builder;
            let builder = B::Builder::new();
            // Arguments: the current upper, the empty frontier, and the current
            // upper again (presumably lower / upper / since — confirm against `Builder::done`).
            let batch = builder.done(&self.upper[..], &[], &self.upper[..]);
            self.insert(batch);
        }
    }
}
impl<K, V, T, R, B> Spine<K, V, T, R, B>
where
    K: Ord+Clone,
    V: Ord+Clone,
    T: Lattice+Ord+Clone+Debug+Default,
    R: Semigroup,
    B: Batch<K, V, T, R>,
{
    /// Returns `true` when no merge is in progress and at most one layer
    /// holds any updates, i.e. when no further maintenance is needed.
    fn reduced(&self) -> bool {
        let mut non_empty = 0;
        for layer in self.merging.iter() {
            if layer.is_double() { return false; }
            if layer.len() > 0 { non_empty += 1; }
            if non_empty > 1 { return false; }
        }
        true
    }

    /// Summarizes each layer as `(number of batches, number of updates)`.
    /// This diagnostic helper is not called anywhere visible here, hence the `allow`.
    #[allow(dead_code)]
    fn describe(&self) -> Vec<(usize, usize)> {
        self.merging
            .iter()
            .map(|b| match b {
                MergeState::Vacant => (0, 0),
                x @ MergeState::Single(_) => (1, x.len()),
                x @ MergeState::Double(_) => (2, x.len()),
            })
            .collect()
    }

    /// Creates an empty spine whose per-batch merge fuel is scaled by `effort`.
    ///
    /// `effort` is raised to at least one. With zero fuel, no merge could
    /// ever complete through fueling alone.
    pub fn with_effort(
        effort: usize,
        operator: OperatorInfo,
        logger: Option<::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        Spine {
            operator,
            logger,
            phantom: ::std::marker::PhantomData,
            advance_frontier: vec![<T as Lattice>::minimum()],
            through_frontier: vec![<T as Lattice>::minimum()],
            merging: Vec::new(),
            pending: Vec::new(),
            upper: vec![Default::default()],
            effort: effort.max(1),
            activator,
        }
    }

    /// Moves pending batches into the merge layers.
    ///
    /// Pending batches are admitted in insertion order, as long as their upper
    /// bound is at or below `through_frontier`. The activator is triggered if
    /// work remains afterwards.
    #[inline(never)]
    fn consider_merges(&mut self) {
        // Length of the admissible prefix of `pending`. Neither `pending` nor
        // `through_frontier` is modified while batches are introduced below,
        // so the prefix can be determined once, up front.
        let ready = {
            let through = &self.through_frontier;
            self.pending
                .iter()
                .take_while(|batch| through.iter().all(|t1| batch.upper().iter().any(|t2| t2.less_equal(t1))))
                .count()
        };
        // Drain the prefix in one step; repeatedly calling `remove(0)` is O(n^2).
        let ready: Vec<B> = self.pending.drain(..ready).collect();

        for batch in ready {
            // An empty batch can be fused into the lowest occupied layer when
            // that layer holds a single batch with no updates. This preserves
            // the apparent record count while extending its time bounds.
            if batch.len() == 0 {
                if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
                    if self.merging[position].is_single() && self.merging[position].len() == 0 {
                        self.insert_at(Some(batch), position);
                        let merged = self.complete_at(position);
                        self.merging[position] = MergeState::Single(merged);
                        continue;
                    }
                }
            }
            let index = batch.len().next_power_of_two();
            self.introduce_batch(Some(batch), index.trailing_zeros() as usize);
        }

        if !self.reduced() {
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    /// Introduces `batch` (or, for `None`, a virtual batch with no data) at
    /// layer `batch_index`.
    ///
    /// Steps:
    /// 1. Spend fuel proportional to `2^batch_index` on in-progress merges.
    /// 2. Collapse all lower layers into this one.
    /// 3. Insert the batch.
    /// 4. Tidy the top layers.
    pub fn introduce_batch(&mut self, batch: Option<B>, batch_index: usize) {
        if batch_index > 32 { println!("Large batch index: {}", batch_index); }

        // Eight units of fuel per (virtual) record, for 2^batch_index records,
        // scaled by `self.effort`. The computation saturates at
        // `isize::max_value()`. A plain `8 << batch_index` silently drops bits
        // once `batch_index >= 61`, the multiplication can overflow, and the
        // cast to `isize` can wrap negative; any of these would leave merges
        // starved of fuel exactly when the batch is largest.
        let records = if batch_index < 8 * ::std::mem::size_of::<usize>() {
            Some(1usize << batch_index)
        } else {
            None
        };
        let mut fuel = records
            .and_then(|records| records.checked_mul(8))
            .and_then(|units| units.checked_mul(self.effort))
            .map(|units| if units > isize::max_value() as usize { isize::max_value() } else { units as isize })
            .unwrap_or(isize::max_value());

        self.apply_fuel(&mut fuel);
        self.roll_up(batch_index);
        self.insert_at(batch, batch_index);
        self.tidy_layers();
    }

    /// Ensures layer `index` exists, and merges every batch below it into it.
    ///
    /// If that produces a merge at `index`, the merge is completed and promoted
    /// to `index + 1`, so that layer `index` can accept a new batch.
    fn roll_up(&mut self, index: usize) {
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }
        if self.merging[.. index].iter().any(|m| !m.is_vacant()) {
            // Sweep upward, carrying the accumulated result into each layer.
            let mut merged = None;
            for i in 0 .. index {
                self.insert_at(merged, i);
                merged = self.complete_at(i);
            }
            self.insert_at(merged, index);
            if self.merging[index].is_double() {
                let merged = self.complete_at(index);
                self.insert_at(merged, index + 1);
            }
        }
    }

    /// Applies fuel to every in-progress merge.
    ///
    /// Each layer independently receives the full amount `*fuel`; the caller's
    /// value is read but never decremented. Any merge that completes is
    /// promoted into the next layer up.
    pub fn apply_fuel(&mut self, fuel: &mut isize) {
        for index in 0 .. self.merging.len() {
            let mut layer_fuel = *fuel;
            self.merging[index].work(&mut layer_fuel);
            if self.merging[index].is_complete() {
                let complete = self.complete_at(index);
                self.insert_at(complete, index+1);
            }
        }
    }

    /// Places `batch` at layer `index`, creating the layer if needed.
    ///
    /// A vacant layer becomes `Single`. A `Single` layer starts a merge with the
    /// new batch, using the current advance frontier.
    ///
    /// # Panics
    /// If the layer already holds a merge.
    fn insert_at(&mut self, batch: Option<B>, index: usize) {
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }
        match self.merging[index].take() {
            MergeState::Vacant => {
                self.merging[index] = MergeState::Single(batch);
            }
            MergeState::Single(old) => {
                if let Some(logger) = &self.logger {
                    logger.log(::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: old.as_ref().map(|b| b.len()).unwrap_or(0),
                        length2: batch.as_ref().map(|b| b.len()).unwrap_or(0),
                        complete: None,
                    });
                }
                let frontier = Some(self.advance_frontier.clone());
                self.merging[index] = MergeState::begin_merge(old, batch, frontier);
            }
            MergeState::Double(_) => {
                panic!("Attempted to insert batch into incomplete merge!")
            }
        };
    }

    /// Empties layer `index` and returns its batch, finishing any in-progress
    /// merge first. A completed two-input merge is logged.
    fn complete_at(&mut self, index: usize) -> Option<B> {
        let (merged, inputs) = self.merging[index].complete()?;
        if let Some((input1, input2)) = inputs {
            if let Some(logger) = &self.logger {
                logger.log(::logging::MergeEvent {
                    operator: self.operator.global_id,
                    scale: index,
                    length1: input1.len(),
                    length2: input2.len(),
                    complete: Some(merged.len()),
                });
            }
        }
        Some(merged)
    }

    /// Draws a lone batch in the top layer down toward the layer its size warrants.
    ///
    /// Vacant or data-free layers beneath it are removed. The batch may be
    /// merged into the single batch just below, but only when the (bounded)
    /// number of records in lower layers is small relative to that height.
    fn tidy_layers(&mut self) {
        if self.merging.is_empty() { return; }
        let mut length = self.merging.len();
        if !self.merging[length-1].is_single() { return; }

        let appropriate_level = self.merging[length-1].len().next_power_of_two().trailing_zeros() as usize;

        // 2^exp in `usize`, saturating rather than overflowing. These sums were
        // previously inferred as `i32`: that overflows (and `1 << length`
        // panics in debug builds) once the spine reaches 31-32 layers.
        let pow2 = |exp: usize| 1usize.checked_shl(exp as u32).unwrap_or(usize::max_value());

        while appropriate_level < length-1 {
            match self.merging[length-2].take() {
                MergeState::Vacant | MergeState::Single(None) => {
                    self.merging.remove(length-2);
                    length = self.merging.len();
                }
                MergeState::Single(Some(batch)) => {
                    // Upper bound on the records below, based on layer indices
                    // rather than actual batch lengths.
                    let mut smaller: usize = 0;
                    for (index, layer) in self.merging[..(length-2)].iter().enumerate() {
                        match layer {
                            MergeState::Vacant => { },
                            MergeState::Single(_) => { smaller = smaller.saturating_add(pow2(index)); },
                            MergeState::Double(_) => { smaller = smaller.saturating_add(pow2(index).saturating_mul(2)); },
                        }
                    }
                    if smaller <= pow2(length) / 8 {
                        self.merging.remove(length-2);
                        self.insert_at(Some(batch), length-2);
                    }
                    else {
                        self.merging[length-2] = MergeState::Single(Some(batch));
                    }
                    return;
                }
                MergeState::Double(state) => {
                    self.merging[length-2] = MergeState::Double(state);
                    return;
                }
            }
        }
    }
}
/// The contents of one layer of a `Spine`.
enum MergeState<K, V, T, R, B: Batch<K, V, T, R>> {
    /// The layer holds nothing.
    Vacant,
    /// The layer holds one batch; `None` is a placeholder carrying no data
    /// (e.g. the virtual batch introduced by `exert`).
    Single(Option<B>),
    /// The layer holds two batches, either being merged or already merged.
    Double(MergeVariant<K, V, T, R, B>),
}
impl<K, V, T: Eq, R, B: Batch<K, V, T, R>> MergeState<K, V, T, R, B> {
    /// Number of updates held at this layer. For a merge in progress this is
    /// the sum of both inputs; for a finished merge it is the result's length;
    /// vacant and data-free layers report zero.
    fn len(&self) -> usize {
        match self {
            MergeState::Single(Some(b)) => b.len(),
            MergeState::Double(MergeVariant::InProgress(b1, b2, _, _)) => b1.len() + b2.len(),
            MergeState::Double(MergeVariant::Complete(Some((b, _)))) => b.len(),
            _ => 0,
        }
    }

    /// `true` for `MergeState::Vacant`.
    fn is_vacant(&self) -> bool {
        match self { MergeState::Vacant => true, _ => false }
    }

    /// `true` for any `MergeState::Single`, including `Single(None)`.
    fn is_single(&self) -> bool {
        match self { MergeState::Single(_) => true, _ => false }
    }

    /// `true` for any `MergeState::Double`, whether in progress or complete.
    fn is_double(&self) -> bool {
        match self { MergeState::Double(_) => true, _ => false }
    }

    /// Empties the layer (leaving it `Vacant`) and returns its batch, if any.
    ///
    /// A merge in progress is driven to completion first. The second tuple
    /// element carries the two merge inputs when a real merge took place.
    fn complete(&mut self) -> Option<(B, Option<(B, B)>)> {
        match self.take() {
            MergeState::Vacant => None,
            MergeState::Single(batch) => batch.map(|b| (b, None)),
            MergeState::Double(variant) => variant.complete(),
        }
    }

    /// `true` when the layer holds a finished merge. This only inspects the
    /// state, so it borrows immutably (it previously required `&mut self`).
    fn is_complete(&self) -> bool {
        match self {
            MergeState::Double(MergeVariant::Complete(_)) => true,
            _ => false,
        }
    }

    /// Spends `fuel` on the layer's merge, if one is present; otherwise does nothing.
    fn work(&mut self, fuel: &mut isize) {
        if let MergeState::Double(layer) = self {
            layer.work(fuel)
        }
    }

    /// Moves the layer's contents out, leaving `Vacant` behind.
    fn take(&mut self) -> Self {
        std::mem::replace(self, MergeState::Vacant)
    }

    /// Starts merging two optional batches.
    ///
    /// Two real batches begin an in-progress merge under `frontier`. A single
    /// real batch becomes a completed "merge" of itself, and two `None`s
    /// become a completed merge of nothing.
    ///
    /// # Panics
    /// If both batches are present and `batch1`'s upper bound is not `batch2`'s lower bound.
    fn begin_merge(batch1: Option<B>, batch2: Option<B>, frontier: Option<Vec<T>>) -> MergeState<K, V, T, R, B> {
        let variant = match (batch1, batch2) {
            (Some(batch1), Some(batch2)) => {
                assert!(batch1.upper() == batch2.lower());
                let begin_merge = <B as Batch<K, V, T, R>>::begin_merge(&batch1, &batch2);
                MergeVariant::InProgress(batch1, batch2, frontier, begin_merge)
            }
            (None, Some(x)) | (Some(x), None) => MergeVariant::Complete(Some((x, None))),
            (None, None) => MergeVariant::Complete(None),
        };
        MergeState::Double(variant)
    }
}
/// The state of a layer that holds two batches.
enum MergeVariant<K, V, T, R, B: Batch<K, V, T, R>> {
    /// A merge under way. Fields: the two input batches, the frontier handed to
    /// the merger (the spine's advance frontier when the merge began), and the
    /// merger itself.
    InProgress(B, B, Option<Vec<T>>, <B as Batch<K,V,T,R>>::Merger),
    /// A finished merge: the resulting batch plus the inputs it came from (absent
    /// when only one batch was present), or `None` when there was nothing to merge.
    Complete(Option<(B, Option<(B, B)>)>),
}
impl<K, V, T, R, B: Batch<K, V, T, R>> MergeVariant<K, V, T, R, B> {
    /// Drives the merge to completion with effectively unlimited fuel and
    /// returns its outcome.
    ///
    /// # Panics
    /// If the merge is still unfinished after receiving `isize::max_value()` fuel.
    fn complete(mut self) -> Option<(B, Option<(B, B)>)> {
        let mut unlimited = isize::max_value();
        self.work(&mut unlimited);
        match self {
            MergeVariant::Complete(outcome) => outcome,
            MergeVariant::InProgress(..) => panic!("Failed to complete a merge!"),
        }
    }

    /// Advances an in-progress merge by `fuel`. A completed variant is left untouched.
    ///
    /// The merge is considered finished when the merger leaves `*fuel`
    /// positive; the result is then recorded together with its two inputs.
    fn work(&mut self, fuel: &mut isize) {
        match std::mem::replace(self, MergeVariant::Complete(None)) {
            MergeVariant::InProgress(b1, b2, frontier, mut merge) => {
                merge.work(&b1, &b2, &frontier, fuel);
                *self = if *fuel > 0 {
                    MergeVariant::Complete(Some((merge.done(), Some((b1, b2)))))
                } else {
                    MergeVariant::InProgress(b1, b2, frontier, merge)
                };
            }
            finished => *self = finished,
        }
    }
}