use crate::logging::Logger;
use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use crate::trace::cursor::CursorList;
use crate::trace::Merger;
use ::timely::dataflow::operators::generic::OperatorInfo;
use ::timely::progress::{Antichain, frontier::AntichainRef};
use ::timely::order::PartialOrder;
/// An append-only collection of update batches, organised as layers that are
/// merged incrementally as fuel is applied.
///
/// Inserted batches wait in `pending` until the physical compaction frontier
/// has reached their upper bound; they are then moved into `merging`, at a
/// layer chosen from the power of two that bounds their length.
pub struct Spine<B: Batch> {
    /// Description of the hosting operator; its `global_id` tags every logged event.
    operator: OperatorInfo,
    /// Optional sink for batch, merge, and drop events.
    logger: Option<Logger>,
    /// Logical compaction frontier, handed to each merge when it begins.
    logical_frontier: Antichain<B::Time>,
    /// Physical compaction frontier; pending batches whose upper bound is less
    /// or equal to it are eligible to enter `merging`.
    physical_frontier: Antichain<B::Time>,
    /// Merge layers, from the smallest size class (index zero) upwards.
    merging: Vec<MergeState<B>>,
    /// Inserted batches that have not yet been moved into `merging`.
    pending: Vec<B>,
    /// Upper frontier of the most recently inserted batch.
    upper: Antichain<B::Time>,
    /// Multiplier applied to the fuel budget of each introduced batch.
    effort: usize,
    /// Handle used to request rescheduling when more merge effort is wanted.
    activator: Option<timely::scheduling::activate::Activator>,
    /// Scratch buffer of `(index, count, length)` tuples passed to `exert_logic`.
    exert_logic_param: Vec<(usize, usize, usize)>,
    /// Optional policy deciding whether, and how much, unprompted effort to apply.
    exert_logic: Option<ExertionLogic>,
}
use crate::trace::WithLayout;
/// A spine uses the same layout as the batches it stores.
impl<B: Batch> WithLayout for Spine<B> {
// Forward the layout of the batch type unchanged.
type Layout = B::Layout;
}
impl<B: Batch+Clone+'static> TraceReader for Spine<B> {
type Batch = B;
type Storage = Vec<B>;
type Cursor = CursorList<<B as BatchReader>::Cursor>;
fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)> {
if upper.less_equal(&<Self::Time as timely::progress::Timestamp>::minimum()) {
let cursors = Vec::new();
let storage = Vec::new();
return Some((CursorList::new(cursors, &storage), storage));
}
assert!(self.logical_frontier.borrow().len() > 0);
assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper));
let mut cursors = Vec::new();
let mut storage = Vec::new();
for merge_state in self.merging.iter().rev() {
match merge_state {
MergeState::Double(variant) => {
match variant {
MergeVariant::InProgress(batch1, batch2, _) => {
if !batch1.is_empty() {
cursors.push(batch1.cursor());
storage.push(batch1.clone());
}
if !batch2.is_empty() {
cursors.push(batch2.cursor());
storage.push(batch2.clone());
}
},
MergeVariant::Complete(Some((batch, _))) => {
if !batch.is_empty() {
cursors.push(batch.cursor());
storage.push(batch.clone());
}
}
MergeVariant::Complete(None) => { },
}
},
MergeState::Single(Some(batch)) => {
if !batch.is_empty() {
cursors.push(batch.cursor());
storage.push(batch.clone());
}
},
MergeState::Single(None) => { },
MergeState::Vacant => { },
}
}
for batch in self.pending.iter() {
if !batch.is_empty() {
let include_lower = PartialOrder::less_equal(&batch.lower().borrow(), &upper);
let include_upper = PartialOrder::less_equal(&batch.upper().borrow(), &upper);
if include_lower != include_upper && upper != batch.lower().borrow() {
panic!("`cursor_through`: `upper` straddles batch");
}
if include_upper {
cursors.push(batch.cursor());
storage.push(batch.clone());
}
}
}
Some((CursorList::new(cursors, &storage), storage))
}
#[inline]
fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
self.logical_frontier.clear();
self.logical_frontier.extend(frontier.iter().cloned());
}
#[inline]
fn get_logical_compaction(&mut self) -> AntichainRef<'_, B::Time> { self.logical_frontier.borrow() }
#[inline]
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, B::Time>) {
debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
self.physical_frontier.clear();
self.physical_frontier.extend(frontier.iter().cloned());
self.consider_merges();
}
#[inline]
fn get_physical_compaction(&mut self) -> AntichainRef<'_, B::Time> { self.physical_frontier.borrow() }
#[inline]
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
for batch in self.merging.iter().rev() {
match batch {
MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { f(batch1); f(batch2); },
MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { f(batch) },
MergeState::Single(Some(batch)) => { f(batch) },
_ => { },
}
}
for batch in self.pending.iter() {
f(batch);
}
}
}
impl<B: Batch+Clone+'static> Trace for Spine<B> {
    /// Creates an empty spine with an effort multiplier of one.
    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        Self::with_effort(1, info, logging, activator)
    }

    /// Applies merge effort that is not driven by newly inserted batches.
    ///
    /// After tidying the layers, the exert logic (if any) is consulted. When
    /// it asks for effort, that effort is spent as fuel on the merges in
    /// progress, or, if there are none, by introducing an empty placeholder
    /// at the level matching the effort. The activator is then triggered so
    /// the question is asked again later.
    fn exert(&mut self) {
        self.tidy_layers();

        let Some(effort) = self.exert_effort() else {
            return;
        };

        let merge_in_progress = self.merging.iter().any(|layer| layer.is_double());
        if merge_in_progress {
            let mut fuel = effort as isize;
            self.apply_fuel(&mut fuel);
        } else {
            let level = effort.next_power_of_two().trailing_zeros() as usize;
            self.introduce_batch(None, level);
        }

        if let Some(activator) = self.activator.as_ref() {
            activator.activate();
        }
    }

    /// Installs the policy consulted by `exert` and `consider_merges`.
    fn set_exert_logic(&mut self, logic: ExertionLogic) {
        self.exert_logic = Some(logic);
    }

    /// Appends `batch` to the spine and advances the spine's upper frontier.
    ///
    /// # Panics
    ///
    /// Panics if the batch's lower and upper frontiers are equal, or if its
    /// lower frontier differs from the spine's current upper frontier.
    fn insert(&mut self, batch: Self::Batch) {
        if let Some(logger) = &self.logger {
            logger.log(crate::logging::BatchEvent {
                operator: self.operator.global_id,
                length: batch.len()
            });
        }

        assert!(batch.lower() != batch.upper());
        assert_eq!(batch.lower(), &self.upper);

        self.upper.clone_from(batch.upper());
        self.pending.push(batch);
        self.consider_merges();
    }

    /// Seals the spine by inserting an empty batch that runs from the current
    /// upper frontier to the empty frontier; does nothing if the upper
    /// frontier is already empty.
    fn close(&mut self) {
        if self.upper.borrow().is_empty() {
            return;
        }
        let lower = self.upper.clone();
        self.insert(B::empty(lower, Antichain::new()));
    }
}
/// Dropping a spine reports its remaining batches through `drop_batches`.
impl<B: Batch> Drop for Spine<B> {
fn drop(&mut self) {
// Emits drop events for held batches when a logger is installed.
self.drop_batches();
}
}
impl<B: Batch> Spine<B> {
    /// Logs a `DropEvent` for every batch still held, draining both the merge
    /// layers and the pending list in the process.
    ///
    /// Without a logger nothing is drained; the batches stay where they are.
    fn drop_batches(&mut self) {
        let Some(logger) = &self.logger else {
            return;
        };
        let operator = self.operator.global_id;
        let log_drop = |length: usize| logger.log(crate::logging::DropEvent { operator, length });

        for layer in self.merging.drain(..) {
            match layer {
                MergeState::Single(Some(batch))
                | MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => {
                    log_drop(batch.len());
                }
                // Both inputs of an unfinished merge are reported separately.
                MergeState::Double(MergeVariant::InProgress(first, second, _)) => {
                    log_drop(first.len());
                    log_drop(second.len());
                }
                MergeState::Vacant
                | MergeState::Single(None)
                | MergeState::Double(MergeVariant::Complete(None)) => {}
            }
        }

        for batch in self.pending.drain(..) {
            log_drop(batch.len());
        }
    }
}
impl<B: Batch> Spine<B> {
/// Asks the exert logic, if one is installed, how much effort to apply.
///
/// The logic receives one `(index, count, length)` tuple per merge layer,
/// ordered from the largest layer index to the smallest, where `count` is the
/// number of batch slots in use (0, 1, or 2) and `length` is the layer's
/// record count. Returns `None` when no logic is installed or when the logic
/// itself returns `None`.
fn exert_effort(&mut self) -> Option<usize> {
    let logic = self.exert_logic.as_ref()?;

    self.exert_logic_param.clear();
    for (index, layer) in self.merging.iter().enumerate().rev() {
        let (count, length) = match layer {
            MergeState::Vacant => (0, 0),
            MergeState::Single(_) => (1, layer.len()),
            MergeState::Double(_) => (2, layer.len()),
        };
        self.exert_logic_param.push((index, count, length));
    }

    logic(&self.exert_logic_param[..])
}
/// Summarises the merge layers as `(count, length)` pairs, smallest layer
/// first, where `count` is the number of batch slots in use (0, 1, or 2).
// Kept for debugging; not called within this file.
#[allow(dead_code)]
fn describe(&self) -> Vec<(usize, usize)> {
    let mut summary = Vec::with_capacity(self.merging.len());
    for layer in &self.merging {
        let entry = match layer {
            MergeState::Vacant => (0, 0),
            MergeState::Single(_) => (1, layer.len()),
            MergeState::Double(_) => (2, layer.len()),
        };
        summary.push(entry);
    }
    summary
}
/// Creates an empty spine whose fuel budgets are scaled by `effort`.
///
/// An `effort` of zero is treated as one. All three frontiers (logical,
/// physical, and upper) start at the minimum timestamp, and no exert logic
/// is installed.
pub fn with_effort(
    mut effort: usize,
    operator: OperatorInfo,
    logger: Option<crate::logging::Logger>,
    activator: Option<timely::scheduling::activate::Activator>,
) -> Self {
    effort = effort.max(1);

    // Each frontier begins as the singleton antichain of the minimum time.
    let minimum = || Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum());

    Spine {
        operator,
        logger,
        logical_frontier: minimum(),
        physical_frontier: minimum(),
        merging: Vec::new(),
        pending: Vec::new(),
        upper: minimum(),
        effort,
        activator,
        exert_logic_param: Vec::new(),
        exert_logic: None,
    }
}
#[inline(never)]
fn consider_merges(&mut self) {
while !self.pending.is_empty() && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier)
{
let mut batch = Some(self.pending.remove(0));
if batch.as_ref().unwrap().len() == 0 {
if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
if self.merging[position].is_single() && self.merging[position].len() == 0 {
self.insert_at(batch.take(), position);
let merged = self.complete_at(position);
self.merging[position] = MergeState::Single(merged);
}
}
}
if let Some(batch) = batch {
let index = batch.len().next_power_of_two();
self.introduce_batch(Some(batch), index.trailing_zeros() as usize);
}
}
if self.exert_effort().is_some() {
if let Some(activator) = &self.activator {
activator.activate();
}
}
}
/// Introduces `batch` (or an empty placeholder when `None`) at layer
/// `batch_index`.
///
/// The steps are: apply fuel to all merges in progress, roll up the layers
/// below `batch_index`, place the batch at `batch_index`, and tidy the
/// largest layers.
///
/// The fuel budget is `8 * 2^batch_index * effort`, saturating at
/// `isize::MAX` rather than wrapping to zero or a negative value when the
/// index or the effort is very large.
///
/// A `batch_index` above 32 prints a diagnostic line to standard output.
pub fn introduce_batch(&mut self, batch: Option<B>, batch_index: usize) {
    if batch_index > 32 { println!("Large batch index: {}", batch_index); }

    let max_fuel = isize::MAX as usize;
    // `8 << batch_index` keeps all of its bits only while `batch_index + 3`
    // is below the width of `usize`.
    let fuel = if batch_index < (usize::BITS - 3) as usize {
        (8usize << batch_index)
            .saturating_mul(self.effort)
            .min(max_fuel)
    } else {
        max_fuel
    };
    // Lossless: `fuel` was clamped to `isize::MAX` above.
    let mut fuel = fuel as isize;

    self.apply_fuel(&mut fuel);
    self.roll_up(batch_index);
    self.insert_at(batch, batch_index);
    self.tidy_layers();
}
/// Forces all layers below `index` to complete and merges their contents
/// into layer `index`.
///
/// The layer vector is first extended with vacant layers so that `index`
/// exists. If every layer below `index` is vacant nothing else happens.
/// Otherwise the layers are completed from the bottom up, each result being
/// carried into the next layer, and the final result is inserted at `index`.
/// Should that start a merge at `index`, the merge is completed at once and
/// its result inserted at `index + 1`.
fn roll_up(&mut self, index: usize) {
    while self.merging.len() <= index {
        self.merging.push(MergeState::Vacant);
    }

    let occupied_below = self.merging[..index].iter().any(|layer| !layer.is_vacant());
    if !occupied_below {
        return;
    }

    let mut carried = None;
    for level in 0..index {
        self.insert_at(carried, level);
        carried = self.complete_at(level);
    }
    self.insert_at(carried, index);

    if self.merging[index].is_double() {
        let promoted = self.complete_at(index);
        self.insert_at(promoted, index + 1);
    }
}
/// Gives every merge layer the amount of fuel in `*fuel` to work with.
///
/// Each layer receives its own copy of the budget; `*fuel` itself is never
/// written, so the caller's value is unchanged on return. A layer whose merge
/// is complete after working has its result moved into the next layer up.
/// Only the layers that exist when the call starts are visited.
pub fn apply_fuel(&mut self, fuel: &mut isize) {
    let budget = *fuel;
    for index in 0..self.merging.len() {
        let mut layer_fuel = budget;
        self.merging[index].work(&mut layer_fuel);
        if self.merging[index].is_complete() {
            let finished = self.complete_at(index);
            self.insert_at(finished, index + 1);
        }
    }
}
/// Places `batch` into layer `index`, extending the layer vector with vacant
/// layers if it is too short.
///
/// A vacant layer simply becomes a single-batch layer. A layer already
/// holding a single batch logs a `MergeEvent` (with `complete: None`) and
/// begins a merge of the existing batch with the new one, using the current
/// logical compaction frontier.
///
/// # Panics
///
/// Panics if the layer already holds a merge.
fn insert_at(&mut self, batch: Option<B>, index: usize) {
    while self.merging.len() <= index {
        self.merging.push(MergeState::Vacant);
    }

    let replacement = match self.merging[index].take() {
        MergeState::Vacant => MergeState::Single(batch),
        MergeState::Single(old) => {
            if let Some(logger) = &self.logger {
                logger.log(crate::logging::MergeEvent {
                    operator: self.operator.global_id,
                    scale: index,
                    length1: old.as_ref().map_or(0, |b| b.len()),
                    length2: batch.as_ref().map_or(0, |b| b.len()),
                    complete: None,
                });
            }
            MergeState::begin_merge(old, batch, self.logical_frontier.borrow())
        }
        MergeState::Double(_) => {
            panic!("Attempted to insert batch into incomplete merge!")
        }
    };
    self.merging[index] = replacement;
}
/// Completes whatever layer `index` holds, leaving the layer vacant, and
/// returns the resulting batch if there is one.
///
/// When the result came from an actual merge of two input batches, a
/// `MergeEvent` carrying the input lengths and the merged length is logged.
fn complete_at(&mut self, index: usize) -> Option<B> {
    let (merged, inputs) = self.merging[index].complete()?;

    if let (Some(logger), Some((input1, input2))) = (&self.logger, &inputs) {
        logger.log(crate::logging::MergeEvent {
            operator: self.operator.global_id,
            scale: index,
            length1: input1.len(),
            length2: input2.len(),
            complete: Some(merged.len()),
        });
    }

    Some(merged)
}
/// Tries to move the batch in the topmost layer down to a smaller layer.
///
/// This applies only when the topmost layer holds a single batch whose
/// length would fit a lower level. Vacant layers, and layers holding an empty
/// placeholder, directly beneath the top are removed one at a time. On
/// reaching a layer with an actual batch, the top batch is merged with it if
/// the capacity occupied by smaller layers is at most `2^length / 8`, where
/// `length` is the current number of layers; either way the method then
/// returns. A merge in progress beneath the top also ends the method.
///
/// The capacity arithmetic is done in `u128`. Unsuffixed literals here would
/// default to `i32` and overflow once the spine reaches roughly 31 layers.
fn tidy_layers(&mut self) {
    let mut length = self.merging.len();
    if length == 0 || !self.merging[length - 1].is_single() {
        return;
    }

    let appropriate_level =
        self.merging[length - 1].len().next_power_of_two().trailing_zeros() as usize;

    while appropriate_level < length - 1 {
        match self.merging[length - 2].take() {
            MergeState::Vacant | MergeState::Single(None) => {
                self.merging.remove(length - 2);
                length = self.merging.len();
            }
            MergeState::Single(Some(batch)) => {
                // Upper bound, derived from layer indices rather than actual
                // lengths, on the records held in the layers further down.
                let smaller: u128 = self.merging[..(length - 2)]
                    .iter()
                    .enumerate()
                    .map(|(index, layer)| match layer {
                        MergeState::Vacant => 0,
                        MergeState::Single(_) => 1u128 << index,
                        MergeState::Double(_) => 2u128 << index,
                    })
                    .sum();

                if smaller <= (1u128 << length) / 8 {
                    // Drop the now-vacant slot so the top batch slides down,
                    // then insert the taken batch next to it to start a merge.
                    self.merging.remove(length - 2);
                    self.insert_at(Some(batch), length - 2);
                } else {
                    self.merging[length - 2] = MergeState::Single(Some(batch));
                }
                return;
            }
            MergeState::Double(state) => {
                // Put the in-progress merge back untouched.
                self.merging[length - 2] = MergeState::Double(state);
                return;
            }
        }
    }
}
}
/// The contents of one merge layer of a `Spine`.
enum MergeState<B: Batch> {
/// The layer holds nothing.
Vacant,
/// The layer holds one batch slot; `None` is a placeholder without a batch.
Single(Option<B>),
/// The layer holds a merge, either still in progress or already complete.
Double(MergeVariant<B>),
}
impl<B: Batch<Time: Eq>> MergeState<B> {
    /// Number of records held by the layer: the batch length for a single or
    /// completed layer, the sum of both inputs for a merge in progress, and
    /// zero when there is no batch.
    fn len(&self) -> usize {
        match self {
            MergeState::Single(Some(batch))
            | MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => batch.len(),
            MergeState::Double(MergeVariant::InProgress(first, second, _)) => {
                first.len() + second.len()
            }
            MergeState::Vacant
            | MergeState::Single(None)
            | MergeState::Double(MergeVariant::Complete(None)) => 0,
        }
    }

    /// True if the layer is `Vacant`.
    fn is_vacant(&self) -> bool {
        matches!(self, MergeState::Vacant)
    }

    /// True if the layer is `Single`, with or without a batch.
    fn is_single(&self) -> bool {
        matches!(self, MergeState::Single(_))
    }

    /// True if the layer is `Double`, whether in progress or complete.
    fn is_double(&self) -> bool {
        matches!(self, MergeState::Double(_))
    }

    /// Empties the layer and returns its finished contents.
    ///
    /// A merge in progress is driven to completion first. The result pairs
    /// the resulting batch with the two merge inputs when a merge actually
    /// took place; it is `None` when the layer held no batch.
    fn complete(&mut self) -> Option<(B, Option<(B, B)>)> {
        match self.take() {
            MergeState::Vacant => None,
            MergeState::Single(batch) => batch.map(|only| (only, None)),
            MergeState::Double(variant) => variant.complete(),
        }
    }

    /// True if the layer holds a merge that has already finished.
    fn is_complete(&mut self) -> bool {
        matches!(self, MergeState::Double(MergeVariant::Complete(_)))
    }

    /// Spends `fuel` on the layer's merge, if it holds one; otherwise a no-op.
    fn work(&mut self, fuel: &mut isize) {
        if let MergeState::Double(variant) = self {
            variant.work(fuel);
        }
    }

    /// Moves the layer's state out, leaving `Vacant` behind.
    fn take(&mut self) -> Self {
        std::mem::replace(self, MergeState::Vacant)
    }

    /// Builds the `Double` state for merging two optional batches.
    ///
    /// With two batches a merge is started using `compaction_frontier`. With
    /// one batch, or none, the state is immediately complete and carries that
    /// batch (or nothing).
    ///
    /// # Panics
    ///
    /// Panics if both batches are present and the upper frontier of `batch1`
    /// differs from the lower frontier of `batch2`.
    fn begin_merge(batch1: Option<B>, batch2: Option<B>, compaction_frontier: AntichainRef<B::Time>) -> MergeState<B> {
        let variant = match (batch1, batch2) {
            (Some(first), Some(second)) => {
                assert!(first.upper() == second.lower());
                let merger = <B as Batch>::begin_merge(&first, &second, compaction_frontier);
                MergeVariant::InProgress(first, second, merger)
            }
            (Some(only), None) | (None, Some(only)) => MergeVariant::Complete(Some((only, None))),
            (None, None) => MergeVariant::Complete(None),
        };
        MergeState::Double(variant)
    }
}
/// The two phases of a merge held in a `MergeState::Double` layer.
enum MergeVariant<B: Batch> {
/// Two input batches and the merger that is combining them.
InProgress(B, B, <B as Batch>::Merger),
/// A finished merge: the resulting batch, if any, plus the two input
/// batches when the result was produced by actually merging them.
Complete(Option<(B, Option<(B, B)>)>),
}
impl<B: Batch> MergeVariant<B> {
    /// Drives the merge to completion with the largest possible fuel budget
    /// and returns its result.
    ///
    /// # Panics
    ///
    /// Panics if the merge is still in progress afterwards.
    fn complete(mut self) -> Option<(B, Option<(B, B)>)> {
        let mut fuel = isize::MAX;
        self.work(&mut fuel);
        match self {
            MergeVariant::Complete(result) => result,
            MergeVariant::InProgress(..) => panic!("Failed to complete a merge!"),
        }
    }

    /// Spends `fuel` on an in-progress merge; a completed merge is untouched.
    ///
    /// The merge is treated as finished when fuel remains after the merger
    /// has worked. The variant then becomes `Complete`, holding the merged
    /// batch and both inputs.
    fn work(&mut self, fuel: &mut isize) {
        // Move the state out, leaving a placeholder until the new state is stored.
        let current = std::mem::replace(self, MergeVariant::Complete(None));
        *self = match current {
            MergeVariant::InProgress(first, second, mut merger) => {
                merger.work(&first, &second, fuel);
                if *fuel > 0 {
                    MergeVariant::Complete(Some((merger.done(), Some((first, second)))))
                } else {
                    MergeVariant::InProgress(first, second, merger)
                }
            }
            finished @ MergeVariant::Complete(_) => finished,
        };
    }
}