#![feature(get_mut_unchecked)]
#![feature(specialization)]
extern crate abomonation;
#[macro_use]
extern crate abomonation_derive;
extern crate bincode;
extern crate clap;
#[macro_use]
extern crate slog;
extern crate slog_term;
use clap::{App, Arg};
use rand::{Rng, SeedableRng, StdRng};
use serde::{Deserialize, Serialize};
use std::{cell::RefCell, fmt};
use uuid;
pub mod communication;
pub mod configuration;
pub mod dataflow;
pub mod node;
#[cfg(feature = "python")]
pub mod python;
pub mod scheduler;
pub use crate::configuration::Configuration;
#[macro_export]
macro_rules! make_operator_runner {
($t:ty, $config:expr, ($($rs:ident),+), ($($ws:ident),+)) => {
{
$(
let $rs = ($rs.get_id());
)+
$(
let $ws = ($ws.get_id());
)+
move |channel_manager: Arc<Mutex<ChannelManager>>| {
let mut op_ex_streams: Vec<Box<dyn OperatorExecutorStreamT>> = Vec::new();
$(
let $rs = {
let recv_endpoint = channel_manager.lock().unwrap().take_recv_endpoint($rs).unwrap();
let read_stream = ReadStream::from(InternalReadStream::from_endpoint(recv_endpoint, $rs));
op_ex_streams.push(
Box::new(OperatorExecutorStream::from(&read_stream))
);
read_stream
};
)+
$(
let $ws = {
let send_endpoints = channel_manager.lock().unwrap().get_send_endpoints($ws).unwrap();
WriteStream::from_endpoints(send_endpoints, $ws)
};
)+
let config = $config.clone();
let flow_watermarks = config.flow_watermarks;
let mut op = <$t>::new($config.clone(), $($rs.clone()),+, $($ws.clone()),+);
if flow_watermarks {
$crate::add_watermark_callback!(($($rs.add_state(())),+), ($($ws),+), (|timestamp, $($rs),+, $($ws),+| {
$(
match $ws.send(Message::new_watermark(timestamp.clone())) {
Ok(_) => (),
Err(_) => eprintln!("Error passing on watermark"),
}
)+
}));
}
thread::sleep(Duration::from_millis(500));
op.run();
let mut op_executor = OperatorExecutor::new(op_ex_streams, $crate::get_terminal_logger());
op_executor
}
}
};
($t:ty, $config:expr, ($($rs:ident),+), ()) => {
{
$(
let $rs = $rs.get_id();
)+
move |channel_manager: Arc<Mutex<ChannelManager>>| {
let mut op_ex_streams: Vec<Box<dyn OperatorExecutorStreamT>> = Vec::new();
$(
let $rs = {
let recv_endpoint = channel_manager.lock().unwrap().take_recv_endpoint($rs).unwrap();
let read_stream = ReadStream::from(InternalReadStream::from_endpoint(recv_endpoint, $rs));
op_ex_streams.push(
Box::new(OperatorExecutorStream::from(&read_stream))
);
read_stream
};
)+
let mut op = <$t>::new($config.clone(), $($rs),+);
thread::sleep(Duration::from_millis(500));
op.run();
let mut op_executor = OperatorExecutor::new(op_ex_streams, $crate::get_terminal_logger());
op_executor
}
}
};
($t:ty, $config:expr, (), ($($ws:ident),+)) => {
{
$(
let $ws = ($ws.get_id());
)+
move |channel_manager: Arc<Mutex<ChannelManager>>| {
$(
let $ws = {
let send_endpoints = channel_manager.lock().unwrap().get_send_endpoints($ws).unwrap();
WriteStream::from_endpoints(send_endpoints, $ws)
};
)+
let mut op_ex_streams: Vec<Box<dyn OperatorExecutorStreamT>> = Vec::new();
let mut op = <$t>::new($config.clone(), $($ws),+);
thread::sleep(Duration::from_millis(500));
op.run();
let mut op_executor = OperatorExecutor::new(op_ex_streams, $crate::get_terminal_logger());
op_executor
}
}
};
($t:ty, $config:expr, (), ()) => {
move |channel_manager: Arc<Mutex<ChannelManager>>| {
let op_ex_streams: Vec<Box<dyn OperatorExecutorStreamT>> = Vec::new();
let mut op = <$t>::new($config.clone());
thread::sleep(Duration::from_millis(500));
op.run();
let mut op_executor = OperatorExecutor::new(op_ex_streams, $crate::get_terminal_logger());
op_executor
}
};
}
#[macro_export]
macro_rules! imports {
() => {
use std::{
cell::RefCell,
rc::Rc,
sync::{mpsc, Arc, Mutex},
thread,
time::Duration,
};
use $crate::{
self,
dataflow::graph::default_graph,
dataflow::stream::{InternalReadStream, WriteStreamT},
dataflow::{Message, OperatorConfig, ReadStream, ReadStreamT, WriteStream},
node::operator_executor::{
OperatorExecutor, OperatorExecutorStream, OperatorExecutorStreamT,
},
scheduler::channel_manager::ChannelManager,
OperatorId,
};
};
}
#[macro_export]
macro_rules! register {
($t:ty, $config:expr, ($($rs:ident),*), ($($ws:ident),*)) => {
{
$crate::imports!();
let mut config = OperatorConfig::from($config);
config.id = OperatorId::new_deterministic();
let config_copy = config.clone();
let read_stream_ids = vec![$($rs.get_id()),*];
let write_stream_ids = vec![$($ws.get_id()),*];
let op_runner = $crate::make_operator_runner!($t, config_copy, ($($rs),*), ($($ws),*));
default_graph::add_operator(config.id, config.node_id, read_stream_ids, write_stream_ids, op_runner);
$(
default_graph::add_operator_stream(config.id, &$ws);
)*
($(ReadStream::from(&$ws)),*)
}
};
}
#[macro_export]
macro_rules! connect_0_write {
($t:ty, $config:expr) => {
{
<$t>::connect();
$crate::register!($t, $config, (), ())
}
};
($t:ty, $config:expr, $($s:ident),+) => {
{
$(
let $s = (&$s).into();
)+
<$t>::connect($(&$s),+);
$crate::register!($t, $config, ($($s),+), ())
}
};
}
#[macro_export]
macro_rules! connect_1_write {
($t:ty, $config:expr) => {
{
let ws = <$t>::connect();
$crate::register!($t, $config, (), (ws))
}
};
($t:ty, $config:expr, $($s:ident),+) => {
{
$(
let $s = (&$s).into();
)+
let ws = <$t>::connect($(&$s),+);
$crate::register!($t, $config, ($($s),+), (ws))
}
};
}
#[macro_export]
macro_rules! connect_2_write {
($t:ty, $config:expr) => {
{
let ws1, ws2 = <$t>::connect();
$crate::register!($t, $config, (), (ws1, ws2))
}
};
($t:ty, $config:expr, $($s:ident),+) => {
{
$(
let $s = (&$s).into();
)+
let ws1, ws2 = <$t>::connect();
$crate::register!($t, $config, ($($s),+), (ws1, ws2))
}
};
}
#[macro_export]
macro_rules! connect_3_write {
($t:ty, $config:expr) => {
{
let ws1, ws2, ws3 = <$t>::connect();
$crate::register!($t, (), (ws1, ws2, ws3))
}
};
($t:ty, $config:expr, $($s:ident),*) => {
{
$(
let $s = (&$s).into();
)+
let ws1, ws2, ws3 = <$t>::connect($(&$s),*);
$crate::register!($t, $config, ($($s),*), (ws1, ws2, ws3))
}
};
}
#[macro_export]
macro_rules! make_callback_builder {
(($rs_head:expr), (), $state:expr) => {
{
use std::{cell::RefCell, rc::Rc};
Rc::new(RefCell::new($rs_head.add_state($state)))
}
};
(($rs_head:expr), ($($ws:expr),*)) => {
{
use std::{cell::RefCell, rc::Rc};
use $crate::dataflow::callback_builder::MultiStreamEventMaker;
let cb_builder = Rc::new(RefCell::new($rs_head));
$(
let cb_builder = cb_builder.borrow_mut().add_write_stream(&$ws);
)*
cb_builder
}
};
(($($rs:expr),+), ($($ws:expr),*), $state:expr) => {
{
use $crate::dataflow::callback_builder::MultiStreamEventMaker;
make_callback_builder!(($($rs),+), ($($ws),*)).borrow_mut().add_state($state)
}
};
(($rs_head:expr, $($rs:expr),*), ($($ws:expr),*)) => {
{
use std::{cell::RefCell, rc::Rc};
let cb_builder = Rc::new(RefCell::new($rs_head));
$(
let cb_builder = cb_builder.borrow_mut().add_read_stream(&$rs);
)*
$(
let cb_builder = cb_builder.borrow_mut().add_write_stream(&$ws);
)*
cb_builder
}
};
}
#[macro_export]
macro_rules! add_watermark_callback {
(($($rs:expr),+), ($($ws:expr),*), ($($cb:expr),+), $state:expr) => (
let cb_builder = $crate::make_callback_builder!(($($rs),+), ($($ws),*), $state);
$(
cb_builder.borrow_mut().add_watermark_callback($cb);
)+
);
(($($rs:expr),+), ($($ws:expr),*), ($($cb:expr),+)) => (
let cb_builder = $crate::make_callback_builder!(($($rs),+), ($($ws),*));
$(
cb_builder.borrow_mut().add_watermark_callback($cb);
)+
);
}
pub type OperatorId = Uuid;
thread_local!(static RNG: RefCell<StdRng>= RefCell::new(StdRng::from_seed(&[1913, 03, 26])));
pub fn generate_id() -> Uuid {
RNG.with(|rng| {
let mut bytes = [0u8; 16];
rng.borrow_mut().fill_bytes(&mut bytes);
Uuid(bytes)
})
}
#[derive(Abomonation, Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct Uuid(uuid::Bytes);
impl Uuid {
pub fn new_v4() -> Self {
Self(*uuid::Uuid::new_v4().as_bytes())
}
pub fn new_deterministic() -> Self {
generate_id()
}
pub fn nil() -> Uuid {
Uuid([0; 16])
}
}
impl fmt::Debug for Uuid {
fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
let &Uuid(bytes) = self;
let id = uuid::Uuid::from_bytes(bytes.clone());
fmt::Display::fmt(&id, f)
}
}
impl fmt::Display for Uuid {
fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result {
let &Uuid(bytes) = self;
let id = uuid::Uuid::from_bytes(bytes.clone());
fmt::Display::fmt(&id, f)
}
}
pub fn get_terminal_logger() -> slog::Logger {
use slog::Drain;
use slog::Logger;
use slog_term::term_full;
use std::sync::Mutex;
Logger::root(Mutex::new(term_full()).fuse(), o!())
}
pub fn new_app(name: &str) -> clap::App {
App::new(name)
.arg(
Arg::with_name("threads")
.short("t")
.long("threads")
.default_value("4")
.help("Number of worker threads per process"),
)
.arg(
Arg::with_name("addresses")
.short("a")
.long("addresses")
.default_value("127.0.0.1:9000")
.help("Comma separated list of socket addresses of all nodes"),
)
.arg(
Arg::with_name("index")
.short("i")
.long("index")
.default_value("0")
.help("Current node index"),
)
}