extern crate time;
use std::cell::RefCell;
use std::io::Write;
use std::fs::File;
use ::Data;
use timely_communication::Allocate;
use ::progress::timestamp::RootTimestamp;
use ::progress::nested::product::Product;
use dataflow::scopes::root::Root;
use dataflow::operators::capture::{EventWriter, Event, EventPusher};
use abomonation::Abomonation;
static mut PRECISE_TIME_NS_DELTA: Option<i64> = None;
#[inline(always)]
fn get_precise_time_ns() -> u64 {
(time::precise_time_ns() as i64 - unsafe { PRECISE_TIME_NS_DELTA.unwrap() }) as u64
}
pub fn log<T: Logger>(logger: &'static ::std::thread::LocalKey<T>, record: T::Record) {
if cfg!(feature = "logging") {
logger.with(|x| x.log(record));
}
}
pub trait Logger {
type Record;
fn log(&self, record: Self::Record);
fn flush(&self);
}
pub struct EventStreamLogger<T: Data, S: Write> {
buffer: RefCell<Vec<(T,u64)>>,
stream: RefCell<Option<EventWriter<Product<RootTimestamp, u64>, (T,u64), S>>>,
last_time: RefCell<u64>,
}
impl<T: Data, S: Write> Logger for EventStreamLogger<T, S> {
type Record = T;
#[inline]
fn log(&self, record: T) {
self.buffer.borrow_mut().push((record, get_precise_time_ns()));
}
fn flush(&self) {
if let Some(ref mut writer) = *self.stream.borrow_mut() {
let time = get_precise_time_ns();
if self.buffer.borrow().len() > 0 {
writer.push(Event::Messages(RootTimestamp::new(*self.last_time.borrow()), self.buffer.borrow().clone()));
}
writer.push(Event::Progress(vec![(RootTimestamp::new(*self.last_time.borrow()),-1), (RootTimestamp::new(time), 1)]));
*self.last_time.borrow_mut() = time;
self.buffer.borrow_mut().clear();
}
else {
panic!("logging file not initialized!!!!");
}
}
}
impl<T: Data, S: Write> EventStreamLogger<T, S> {
fn new() -> EventStreamLogger<T, S> {
EventStreamLogger {
buffer: RefCell::new(Vec::new()),
stream: RefCell::new(None),
last_time: RefCell::new(0),
}
}
fn set(&self, stream: S) {
let mut stream = EventWriter::new(stream);
stream.push(Event::Progress(vec![(RootTimestamp::new(0), 1)]));
*self.stream.borrow_mut() = Some(stream);
}
}
impl<T: Data, S: Write> Drop for EventStreamLogger<T, S> {
fn drop(&mut self) {
if let Some(ref mut writer) = *self.stream.borrow_mut() {
if self.buffer.borrow().len() > 0 {
writer.push(Event::Messages(RootTimestamp::new(*self.last_time.borrow()), self.buffer.borrow().clone()));
}
writer.push(Event::Progress(vec![(RootTimestamp::new(*self.last_time.borrow()),-1)]));
}
else {
panic!("logging file not initialized!!!!");
}
}
}
pub fn initialize<A: Allocate>(root: &mut Root<A>) {
OPERATES.with(|x| x.set(File::create(format!("logs/operates-{}.abom", root.index())).unwrap()));
CHANNELS.with(|x| x.set(File::create(format!("logs/channels-{}.abom", root.index())).unwrap()));
MESSAGES.with(|x| x.set(File::create(format!("logs/messages-{}.abom", root.index())).unwrap()));
PROGRESS.with(|x| x.set(File::create(format!("logs/progress-{}.abom", root.index())).unwrap()));
SCHEDULE.with(|x| x.set(File::create(format!("logs/schedule-{}.abom", root.index())).unwrap()));
GUARDED_MESSAGE.with(|x| x.set(File::create(format!("logs/guarded_message-{}.abom", root.index())).unwrap()));
GUARDED_PROGRESS.with(|x| x.set(File::create(format!("logs/guarded_progress-{}.abom", root.index())).unwrap()));
unsafe {
PRECISE_TIME_NS_DELTA = Some({
let wall_time = time::get_time();
let wall_time_ns = wall_time.nsec as i64 + wall_time.sec * 1000000000;
time::precise_time_ns() as i64 - wall_time_ns
});
}
}
pub fn flush_logs() {
OPERATES.with(|x| x.flush());
CHANNELS.with(|x| x.flush());
PROGRESS.with(|x| x.flush());
MESSAGES.with(|x| x.flush());
SCHEDULE.with(|x| x.flush());
GUARDED_MESSAGE.with(|x| x.flush());
GUARDED_PROGRESS.with(|x| x.flush());
}
thread_local!{
pub static OPERATES: EventStreamLogger<OperatesEvent, File> = EventStreamLogger::new();
pub static CHANNELS: EventStreamLogger<ChannelsEvent, File> = EventStreamLogger::new();
pub static PROGRESS: EventStreamLogger<ProgressEvent, File> = EventStreamLogger::new();
pub static MESSAGES: EventStreamLogger<MessagesEvent, File> = EventStreamLogger::new();
pub static SCHEDULE: EventStreamLogger<ScheduleEvent, File> = EventStreamLogger::new();
pub static GUARDED_MESSAGE: EventStreamLogger<bool, File> = EventStreamLogger::new();
pub static GUARDED_PROGRESS: EventStreamLogger<bool, File> = EventStreamLogger::new();
}
#[derive(Debug, Clone)]
pub struct OperatesEvent {
pub id: usize,
pub addr: Vec<usize>,
pub name: String,
}
unsafe_abomonate!(OperatesEvent : addr, name);
#[derive(Debug, Clone)]
pub struct ChannelsEvent {
pub id: usize,
pub scope_addr: Vec<usize>,
pub source: (usize, usize),
pub target: (usize, usize),
}
unsafe_abomonate!(ChannelsEvent : id, scope_addr, source, target);
#[derive(Debug, Clone)]
pub struct ProgressEvent {
pub is_send: bool,
pub addr: Vec<usize>,
pub messages: Vec<(usize, usize, String, i64)>,
pub internal: Vec<(usize, usize, String, i64)>,
}
unsafe_abomonate!(ProgressEvent : is_send, addr, messages, internal);
#[derive(Debug, Clone)]
pub struct MessagesEvent {
pub is_send: bool,
pub channel: usize,
pub source: usize,
pub target: usize,
pub seq_no: usize,
pub length: usize,
}
unsafe_abomonate!(MessagesEvent);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StartStop {
Start,
Stop {
activity: bool
},
}
impl Abomonation for StartStop { }
#[test]
fn start_stop_abomonation_roundtrip() {
use abomonation::{encode, decode};
fn check(data: StartStop) -> () {
let mut bytes = Vec::new();
unsafe {
encode(&data, &mut bytes);
}
if let Some((result, remaining)) = unsafe { decode::<StartStop>(&mut bytes) } {
assert!(result == &data);
assert!(remaining.len() == 0);
}
}
for data in vec![
StartStop::Stop { activity: true },
StartStop::Stop { activity: false },
StartStop::Start,
] {
check(data);
}
}
#[derive(Debug, Clone)]
pub struct ScheduleEvent {
pub id: usize,
pub start_stop: StartStop,
}
unsafe_abomonate!(ScheduleEvent : id, start_stop);