use columnar::Columnar;
use serde::{Deserialize, Serialize};
pub type DifferentialEventBuilder =
timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
pub fn enable<A, W>(
worker: &mut timely::worker::Worker<A>,
writer: W,
) -> Option<Box<dyn std::any::Any + 'static>>
where
A: timely::communication::Allocate,
W: std::io::Write + 'static,
{
worker.log_register().and_then(|mut log_register| {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
let mut logger = ::timely::logging::BatchLogger::new(writer);
log_register
.insert::<DifferentialEventBuilder, _>("differential/arrange", move |time, data| {
logger.publish_batch(time, data)
})
})
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub enum DifferentialEvent {
Batch(BatchEvent),
Merge(MergeEvent),
Drop(DropEvent),
MergeShortfall(MergeShortfall),
TraceShare(TraceShare),
Batcher(BatcherEvent),
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct BatchEvent {
pub operator: usize,
pub length: usize,
}
impl From<BatchEvent> for DifferentialEvent {
fn from(e: BatchEvent) -> Self {
DifferentialEvent::Batch(e)
}
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct BatcherEvent {
pub operator: usize,
pub records_diff: isize,
pub size_diff: isize,
pub capacity_diff: isize,
pub allocations_diff: isize,
}
impl From<BatcherEvent> for DifferentialEvent {
fn from(e: BatcherEvent) -> Self {
DifferentialEvent::Batcher(e)
}
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct DropEvent {
pub operator: usize,
pub length: usize,
}
impl From<DropEvent> for DifferentialEvent {
fn from(e: DropEvent) -> Self {
DifferentialEvent::Drop(e)
}
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct MergeEvent {
pub operator: usize,
pub scale: usize,
pub length1: usize,
pub length2: usize,
pub complete: Option<usize>,
}
impl From<MergeEvent> for DifferentialEvent {
fn from(e: MergeEvent) -> Self {
DifferentialEvent::Merge(e)
}
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct MergeShortfall {
pub operator: usize,
pub scale: usize,
pub shortfall: usize,
}
impl From<MergeShortfall> for DifferentialEvent {
fn from(e: MergeShortfall) -> Self {
DifferentialEvent::MergeShortfall(e)
}
}
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct TraceShare {
pub operator: usize,
pub diff: isize,
}
impl From<TraceShare> for DifferentialEvent {
fn from(e: TraceShare) -> Self {
DifferentialEvent::TraceShare(e)
}
}