#![deny(missing_docs)]
#[macro_use]
extern crate abomonation_derive;
use std::cmp::Ordering;
#[derive(Abomonation, PartialEq, Debug, Clone, Copy, Hash, Eq, PartialOrd, Ord)]
pub enum ActivityType {
Scheduling = 0,
Processing = 2,
Spinning = 1,
Serialization = 3,
Deserialization = 4,
ControlMessage = 5,
DataMessage = 6,
Waiting = 8,
Busy = 9,
}
#[derive(Abomonation, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
pub enum EventType {
Start = 1,
End = 2,
Sent = 3,
Received = 4,
}
pub type Worker = u64;
pub type Timestamp = std::time::Duration;
pub type CorrelatorId = u64;
pub type OperatorId = u64;
pub type ChannelId = u64;
#[derive(Abomonation, PartialEq, Eq, Hash, Clone, Debug)]
pub struct LogRecord {
pub seq_no: u64,
pub epoch: u64,
pub timestamp: Timestamp,
pub local_worker: Worker,
pub activity_type: ActivityType,
pub event_type: EventType,
pub remote_worker: Option<Worker>,
pub operator_id: Option<OperatorId>,
pub channel_id: Option<ChannelId>,
pub correlator_id: Option<u64>,
pub length: Option<usize>,
}
impl Ord for LogRecord {
fn cmp(&self, other: &LogRecord) -> Ordering {
self.timestamp.cmp(&other.timestamp)
}
}
impl PartialOrd for LogRecord {
fn partial_cmp(&self, other: &LogRecord) -> Option<Ordering> {
Some(self.cmp(other))
}
}
pub mod pair {
use differential_dataflow::lattice::Lattice;
use timely::{
order::{PartialOrder, TotalOrder},
progress::{timestamp::Refines, PathSummary, Timestamp}
};
use std::fmt::{Formatter, Error, Debug};
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation)]
pub struct Pair<S, T> {
pub first: S,
pub second: T,
}
impl<S, T> Pair<S, T> {
pub fn new(first: S, second: T) -> Self {
Pair { first, second }
}
}
impl<S: PartialOrder, T: PartialOrder> PartialOrder for Pair<S, T> {
fn less_equal(&self, other: &Self) -> bool {
self.first.less_than(&other.first) ||
self.first.less_equal(&other.first) && self.second.less_equal(&other.second)
}
}
impl<S: TotalOrder, T: TotalOrder> TotalOrder for Pair<S, T> {}
#[test]
fn compare_pairs() {
assert!(Pair::new(0, 0).less_equal(&Pair::new(0,0)));
assert!(Pair::new(0, 0).less_equal(&Pair::new(0,1)));
assert!(Pair::new(0, 0).less_equal(&Pair::new(1,0)));
assert!(Pair::new(0, 1).less_equal(&Pair::new(1,0)));
assert!(Pair::new(1, 0).less_equal(&Pair::new(1,0)));
assert!(Pair::new(1, 0).less_equal(&Pair::new(1,1)));
assert!(!Pair::new(1, 0).less_equal(&Pair::new(0,1)));
assert!(!Pair::new(1, 1).less_equal(&Pair::new(0,1000)));
assert!(Pair::new(0, 1000).less_equal(&Pair::new(1, 1)));
}
impl<S: Timestamp, T: Timestamp> Refines<()> for Pair<S, T> {
fn to_inner(_outer: ()) -> Self { Default::default() }
fn to_outer(self) -> () { () }
fn summarize(_summary: <Self>::Summary) -> () { () }
}
impl<S: Timestamp, T: Timestamp> PathSummary<Pair<S,T>> for () {
fn results_in(&self, timestamp: &Pair<S, T>) -> Option<Pair<S,T>> {
Some(timestamp.clone())
}
fn followed_by(&self, other: &Self) -> Option<Self> {
Some(other.clone())
}
}
impl<S: Timestamp, T: Timestamp> Timestamp for Pair<S, T> {
type Summary = ();
}
impl<S: Lattice, T: Lattice> Lattice for Pair<S, T> {
fn minimum() -> Self { Pair { first: S::minimum(), second: T::minimum() }}
fn join(&self, other: &Self) -> Self {
Pair {
first: self.first.join(&other.first),
second: self.second.join(&other.second),
}
}
fn meet(&self, other: &Self) -> Self {
Pair {
first: self.first.meet(&other.first),
second: self.second.meet(&other.second),
}
}
}
impl<TOuter: Debug, TInner: Debug> Debug for Pair<TOuter, TInner> {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
f.write_str(&format!("({:?}, {:?})", self.first, self.second))
}
}
}