use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
use multimap::MultiMap;
use serde_json::Value;
use flow_impl::Implementation;
use crate::debugger::Debugger;
use crate::function::Function;
use crate::metrics::Metrics;
/// Scheduling state of a single function, as reported by `RunState::get_state`.
#[derive(Debug, PartialEq)]
pub enum State {
    /// The function's id is present in the `ready` queue.
    Ready,
    /// The function's id is present in the `blocked` set.
    Blocked,
    /// The function is neither ready, blocked nor running.
    Waiting,
    /// The function has at least one entry in the `running` map.
    Running,
}
/// Everything needed to run one invocation of a function.
///
/// Built by `RunState::create_job` from the function's current input set.
pub struct Job {
    /// Id of this job (the value of the `jobs_sent` counter when it was created).
    pub job_id: usize,
    /// Id of the function this job runs.
    pub function_id: usize,
    /// The function's implementation, as returned by `Function::get_implementation`.
    pub implementation: Arc<dyn Implementation>,
    /// The input set taken from the function when the job was created.
    pub input_set: Vec<Vec<Value>>,
    /// Copy of the function's output destinations:
    /// `(output_route, destination_id, io_number)`.
    pub destinations: Vec<(String, usize, usize)>,
    /// Value of `Function::is_impure` at job creation time.
    pub impure: bool,
}
/// The result reported back for a `Job`, consumed by `RunState::process_output`.
#[derive(Debug)]
pub struct Output {
    /// Id of the job that produced this output.
    pub job_id: usize,
    /// Id of the function that was run.
    pub function_id: usize,
    /// Not read by `RunState` itself; presumably the inputs the job ran with
    /// (NOTE(review): confirm against the code that builds `Output`).
    pub input_values: Vec<Vec<Value>>,
    /// `(optional output value, whether the source function can run again)`.
    pub result: (Option<Value>, bool),
    /// Where to send the value: `(output_route, destination_id, io_number)`.
    pub destinations: Vec<(String, usize, usize)>,
    /// `Some(..)` marks the job as failed; no values are forwarded in that case.
    pub error: Option<String>,
}
/// Tracks which functions are ready, blocked or running while a flow executes.
pub struct RunState {
    /// All functions of the flow; `get`/`get_mut` index into this by id.
    functions: Vec<Function>,
    /// Ids of functions whose inputs are full but that are blocked on output.
    blocked: HashSet<usize>,
    /// Active blocks as `(blocking_id, blocking_io_number, blocked_id)`.
    blocks: VecDeque<(usize, usize, usize)>,
    /// Ids of functions that can have a job created, in FIFO order.
    ready: VecDeque<usize>,
    /// Running jobs, keyed by function id with the job ids as values.
    running: MultiMap<usize, usize>,
    /// Counter bumped by `job_sent`; also used as the id of the next job.
    jobs_sent: usize,
    /// Upper bound on concurrently running jobs enforced by `next_job`.
    max_jobs: usize,
}
impl RunState {
pub fn new(functions: Vec<Function>, max_jobs: usize) -> Self {
RunState {
functions,
blocked: HashSet::<usize>::new(),
blocks: VecDeque::<(usize, usize, usize)>::new(),
ready: VecDeque::<usize>::new(),
running: MultiMap::<usize, usize>::new(),
jobs_sent: 0,
max_jobs,
}
}
#[cfg(feature = "debugger")]
fn reset(&mut self) {
for function in &mut self.functions {
function.reset()
};
self.blocked.clear();
self.blocks.clear();
self.ready.clear();
self.running.clear();
self.jobs_sent = 0;
}
/// Prepare the run state for execution.
///
/// Resets all state, initializes the inputs of every function, creates the
/// initial blocks, and finally classifies every function whose inputs were
/// already full as either ready or blocked.
pub fn init(&mut self) {
    self.reset();

    // Remember which functions are runnable straight away; they can only be
    // classified once the initial blocks are known.
    let mut full_at_start = Vec::new();
    for function in self.functions.iter_mut() {
        debug!("\tInitializing Function #{} '{}'", function.id(), function.name());
        function.init_inputs(true);
        if function.inputs_full() {
            full_at_start.push(function.id());
        }
    }

    self.create_init_blocks();

    full_at_start
        .into_iter()
        .for_each(|id| self.inputs_now_full(id));
}
/// Compute the blocks that exist before anything has run.
///
/// A source function is blocked by a destination when the destination input
/// it sends to is already full. Sources whose own inputs are full are also
/// recorded in `blocked`. A function never blocks on itself.
///
/// Replaces the contents of `self.blocks` and `self.blocked`.
fn create_init_blocks(&mut self) {
    let mut blocks = VecDeque::<(usize, usize, usize)>::new();
    let mut blocked = HashSet::<usize>::new();

    debug!("Creating any initial block entries that are needed");

    for source_function in &self.functions {
        let source_id = source_function.id();
        let source_has_inputs_full = source_function.inputs_full();

        // Everything here is an immutable borrow, so the destinations can be
        // walked in place instead of cloning the whole vector.
        for (_, destination_id, io_number) in source_function.output_destinations() {
            let (destination_id, io_number) = (*destination_id, *io_number);

            // Sending to yourself never creates a block.
            if destination_id == source_id {
                continue;
            }

            if self.get(destination_id).input_full(io_number) {
                debug!("\tAdded block between #{} <-- #{}", destination_id, source_id);
                blocks.push_back((destination_id, io_number, source_id));
                // Only a source that could otherwise run counts as blocked.
                if source_has_inputs_full {
                    blocked.insert(source_id);
                }
            }
        }
    }

    self.blocks = blocks;
    self.blocked = blocked;
}
/// Classify a function by looking it up in the bookkeeping collections.
///
/// Precedence is `Ready`, then `Blocked`, then `Running`; a function found
/// in none of them is `Waiting`.
fn get_state(&self, function_id: usize) -> State {
    if self.ready.contains(&function_id) {
        return State::Ready;
    }
    if self.blocked.contains(&function_id) {
        return State::Blocked;
    }
    if self.running.contains_key(&function_id) {
        return State::Running;
    }
    State::Waiting
}
/// Borrow the set of ids of functions currently recorded as blocked.
#[cfg(feature = "debugger")]
pub fn get_blocked(&self) -> &HashSet<usize> {
    &self.blocked
}
/// Build a textual description of a function's state.
///
/// The text contains the state itself, the running job numbers when the
/// function is `Running`, and one line for every block in which the
/// function is either the blocked or the blocking party.
#[cfg(feature = "debugger")]
pub fn display_state(&self, function_id: usize) -> String {
    let state = self.get_state(function_id);
    let mut text = format!("\tState: {:?}\n", state);

    if state == State::Running {
        // `Running` means the key is present in the map, so this cannot fail.
        let job_numbers = self.running.get_vec(&function_id).unwrap();
        text.push_str(&format!("\t\tJob Numbers Running: {:?}\n", job_numbers));
    }

    for &(blocking, blocking_io_number, blocked) in &self.blocks {
        let line = if blocked == function_id {
            format!("\t\tBlocked #{} --> Blocked by #{}:{}\n",
                    blocked, blocking, blocking_io_number)
        } else if blocking == function_id {
            format!("\t\tBlocking #{}:{} <-- Blocked #{}\n",
                    blocking, blocking_io_number, blocked)
        } else {
            continue;
        };
        text.push_str(&line);
    }

    text
}
/// Borrow the function stored at index `id`.
///
/// # Panics
/// Panics if `id` is not a valid index into the function list.
pub fn get(&self, id: usize) -> &Function {
    &self.functions[id]
}
/// Mutably borrow the function stored at index `id`.
///
/// # Panics
/// Panics if `id` is not a valid index into the function list.
pub fn get_mut(&mut self, id: usize) -> &mut Function {
    &mut self.functions[id]
}
/// Produce the next job to run, if any.
///
/// Returns `None` when the job limit has been reached or when no function
/// is ready. The function at the head of the ready queue stays there while
/// it can still supply further jobs; otherwise it is dequeued.
pub fn next_job(&mut self) -> Option<Job> {
    if self.number_jobs_running() >= self.max_jobs {
        return None;
    }

    let function_id = *self.ready.front()?;
    let (job, can_create_more_jobs) = self.create_job(function_id);

    if !can_create_more_jobs {
        self.ready.pop_front();
    }

    Some(job)
}
/// Record that one more job has been sent for execution.
pub fn job_sent(&mut self) {
    self.jobs_sent += 1;
}
/// Build a `Job` for `function_id` from its current input set.
///
/// Returns the job together with a flag telling whether the function can
/// immediately supply another job: it has inputs, they are full again after
/// re-initialization, and that was not solely because every input was
/// refilled.
fn create_job(&mut self, function_id: usize) -> (Job, bool) {
    let job_id = self.jobs_sent;
    debug!("Creating Job #{} for Function #{}", job_id, function_id);

    let function = self.get_mut(function_id);

    // Order matters: take the inputs first, then let initializers refill.
    let input_set = function.take_input_set();
    let refilled_count = function.init_inputs(false).len();
    let all_refilled = refilled_count == function.inputs().len();
    debug!("Job #{}, Function #{} '{}', Input set: {:?}", job_id, function_id, function.name(), input_set);

    let can_create_more_jobs =
        !function.inputs().is_empty() && function.inputs_full() && !all_refilled;

    let job = Job {
        job_id,
        function_id,
        implementation: function.get_implementation(),
        input_set,
        destinations: function.output_destinations().clone(),
        impure: function.is_impure(),
    };

    (job, can_create_more_jobs)
}
pub fn process_output(&mut self, metrics: &mut Metrics, output: Output, debugger: &mut Option<Debugger>) {
match output.error {
None => {
let output_value = output.result.0;
let source_can_run_again = output.result.1;
if let Some(output_v) = output_value {
debug!("\tProcessing output value '{}' from Job #{}", output_v, output.job_id);
for (ref output_route, destination_id, io_number) in &output.destinations {
let output_value = output_v.pointer(&output_route).unwrap();
self.send_value(output.function_id, output_route, *destination_id,
*io_number, output_value, metrics, debugger);
}
}
if source_can_run_again {
let (refilled, full) = self.refill_inputs(output.function_id);
if full {
self.inputs_now_full(output.function_id);
} else {
self.unblock_senders_to(output.function_id, refilled);
}
}
}
Some(_) => {
match debugger {
None => error!("Error in Job execution:\n{:#?}", output),
Some(debugger) => debugger.panic(&self, output)
}
}
}
}
/// Deliver `output_value` from function `source_id` to input `io_number`
/// of function `destination_id`.
///
/// The debugger (if any) is consulted before the value is written. After
/// the write, a block is created when that input is now full, and the
/// destination is classified as ready/blocked when all of its inputs are
/// full - unless the function was sending to itself.
///
/// `metrics` is only touched when the `metrics` feature is enabled.
fn send_value(&mut self, source_id: usize, output_route: &str, destination_id: usize, io_number: usize,
              output_value: &Value, metrics: &mut Metrics, debugger: &mut Option<Debugger>) {
    let block;
    let full;

    // `source_id` identifies the sending function, not a job.
    debug!("\t\tFunction #{} sending value '{}' via output route '{}' to Function #{}:{}",
           source_id, output_value, output_route, destination_id, io_number);

    if let Some(ref mut debugger) = debugger {
        debugger.check_prior_to_send(self, source_id, output_route,
                                     &output_value, destination_id, io_number);
    }

    {
        let destination = self.get_mut(destination_id);
        destination.write_input(io_number, output_value);

        #[cfg(feature = "metrics")]
        metrics.increment_outputs_sent();

        block = destination.input_full(io_number);
        // A function feeding its own input is handled when its job completes.
        full = destination.inputs_full() && (source_id != destination_id);
    }

    if block {
        self.create_block(destination_id, io_number, source_id, debugger);
    }

    if full {
        self.inputs_now_full(destination_id);
    }
}
/// Re-run input initialization (non-first-time) for function `id`.
///
/// Returns the list reported by `init_inputs` together with whether the
/// function's inputs are full afterwards.
fn refill_inputs(&mut self, id: usize) -> (Vec<usize>, bool) {
    let function = &mut self.functions[id];
    let refilled = function.init_inputs(false);
    let now_full = function.inputs_full();
    (refilled, now_full)
}
pub fn job_done(&mut self, output: &Output) {
self.running.retain(|&k, &v| k != output.function_id || v != output.job_id);
}
pub fn start(&mut self, job: &Job) {
self.running.insert(job.function_id, job.job_id);
}
/// `true` when at least one block lists `id` as the blocked function.
fn is_blocked(&self, id: usize) -> bool {
    self.blocks
        .iter()
        .any(|&(_, _, blocked_id)| blocked_id == id)
}
/// List `(blocking_id, blocking_io_number)` for every block in which `id`
/// is the blocked function, in the order the blocks are stored.
#[cfg(feature = "debugger")]
pub fn get_output_blockers(&self, id: usize) -> Vec<(usize, usize)> {
    self.blocks
        .iter()
        .filter(|&&(_, _, blocked_id)| blocked_id == id)
        .map(|&(blocking_id, blocking_io_number, _)| (blocking_id, blocking_io_number))
        .collect()
}
/// Total number of running jobs, summed over all functions.
pub fn number_jobs_running(&self) -> usize {
    self.running
        .iter_all()
        .map(|(_, job_ids)| job_ids.len())
        .sum()
}
/// Number of entries currently in the ready queue.
pub fn number_jobs_ready(&self) -> usize {
    self.ready.len()
}
/// For each empty input of `target_id`, find the functions that send to it
/// and are not in the ready queue.
///
/// An input contributes `(sender_id, input_number)` to the result only when
/// exactly one such sender entry exists for it.
#[cfg(feature = "debugger")]
pub fn get_input_blockers(&self, target_id: usize) -> Vec<(usize, usize)> {
    let mut input_blockers = vec![];
    let target_function = self.get(target_id);

    for (target_io, input) in target_function.inputs().iter().enumerate() {
        // Inputs that already hold data are not waiting on anybody.
        if !input.is_empty() {
            continue;
        }

        let mut senders: Vec<(usize, usize)> = Vec::new();
        for sender in &self.functions {
            if self.ready.contains(&sender.id()) {
                continue;
            }
            for (_, destination_id, io_number) in sender.output_destinations() {
                if *destination_id == target_id && *io_number == target_io {
                    senders.push((sender.id(), target_io));
                }
            }
        }

        if senders.len() == 1 {
            input_blockers.extend(senders);
        }
    }

    input_blockers
}
/// Classify a function whose inputs have just become full: it goes to the
/// ready queue unless some block names it as blocked, in which case it is
/// added to the `blocked` set.
fn inputs_now_full(&mut self, id: usize) {
    if !self.is_blocked(id) {
        debug!("\t\t\tFunction #{} not blocked on output, so added to 'Ready' list", id);
        self.mark_ready(id);
        return;
    }

    debug!("\t\t\tFunction #{} inputs are ready, but blocked on output", id);
    self.blocked.insert(id);
}
/// Append `id` to the ready queue unless it is already queued.
fn mark_ready(&mut self, id: usize) {
    if self.ready.contains(&id) {
        return;
    }
    self.ready.push_back(id);
}
/// Current value of the `jobs_sent` counter.
pub fn jobs(&self) -> usize {
    self.jobs_sent
}
/// Number of functions held by this run state.
pub fn num_functions(&self) -> usize {
    self.functions.len()
}
/// Remove the blocks caused by `blocker_id`, except those on inputs listed
/// in `refilled_inputs`.
///
/// Every function released this way that is in the `blocked` set and is no
/// longer named by any remaining block leaves that set, and is queued as
/// ready when its inputs are full.
fn unblock_senders_to(&mut self, blocker_id: usize, refilled_inputs: Vec<usize>) {
    if self.blocks.is_empty() {
        return;
    }

    // One pass: drop the matching blocks and remember who they were blocking.
    let mut unblocked_list = vec![];
    self.blocks.retain(|&(blocking_id, blocking_io_number, blocked_id)| {
        let remove = (blocking_id == blocker_id) && !refilled_inputs.contains(&blocking_io_number);
        if remove {
            debug!("\t\tBlock removed #{}:{} <-- #{}", blocking_id, blocking_io_number, blocked_id);
            unblocked_list.push(blocked_id);
        }
        !remove
    });

    for unblocked in unblocked_list {
        // Skip functions that were never marked blocked, or that are still
        // held back by another block.
        if !self.blocked.contains(&unblocked) || self.is_blocked(unblocked) {
            continue;
        }

        debug!("\t\t\tFunction #{} removed from 'blocked' list", unblocked);
        self.blocked.remove(&unblocked);

        if self.get(unblocked).inputs_full() {
            debug!("\t\t\tFunction #{} has inputs ready, so added to 'ready' list", unblocked);
            self.mark_ready(unblocked);
        }
    }
}
/// Record that `blocked_id` is blocked by input `blocking_io_number` of
/// `blocking_id`.
///
/// A function never blocks itself and an existing block is not duplicated.
/// The debugger, if present, is notified only when a new block is added.
fn create_block(&mut self, blocking_id: usize, blocking_io_number: usize,
                blocked_id: usize, debugger: &mut Option<Debugger>) {
    if blocked_id == blocking_id {
        return;
    }

    debug!("\t\t\tProcess #{}:{} <-- Process #{} blocked", &blocking_id,
           &blocking_io_number, &blocked_id);

    let block = (blocking_id, blocking_io_number, blocked_id);
    if self.blocks.contains(&block) {
        return;
    }

    self.blocks.push_back(block);
    if let Some(ref mut debugger) = debugger {
        debugger.check_on_block_creation(self, blocking_id, blocking_io_number, blocked_id);
    }
}
}
/// Multi-line summary of the run state: function count, jobs sent, and the
/// contents of the blocked, blocks, ready and running collections.
#[cfg(any(feature = "logging", feature = "debugger"))]
impl fmt::Display for RunState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `writeln!` emits the same text as `write!` with a trailing "\n".
        writeln!(f, "RunState:")?;
        writeln!(f, " Processes: {}", self.functions.len())?;
        writeln!(f, " Jobs: {}", self.jobs_sent)?;
        writeln!(f, " Blocked: {:?}", self.blocked)?;
        writeln!(f, " Blocks: {:?}", self.blocks)?;
        writeln!(f, " Ready: {:?}", self.ready)?;
        writeln!(f, " Running: {:?}", self.running)
    }
}
#[cfg(test)]
mod tests {
use crate::debug_client::{DebugClient, Event, Response};
use crate::debug_client::{Command, Param};
use crate::run_state;
/// Stateless `DebugClient` stand-in used by the tests below.
struct TestDebugClient {}
/// Minimal client: ignores everything it is sent and always answers with a
/// one-step command.
impl DebugClient for TestDebugClient {
    fn init(&self) {}

    fn send_event(&self, _event: Event) {}

    fn send_response(&self, _response: Response) {}

    fn get_command(&self, _job_number: Option<usize>) -> Command {
        let one_step = run_state::tests::Param::Numeric(1);
        Command::Step(Some(one_step))
    }
}
/// Hand out a `'static` reference to a `TestDebugClient` for building a
/// `Debugger` in tests.
fn test_debug_client() -> &'static dyn DebugClient {
    &TestDebugClient {}
}
mod state_transitions {
use crate::debugger::Debugger;
use crate::function::Function;
use crate::input::{ConstantInputInitializer, OneTimeInputInitializer};
use crate::input::Input;
use crate::input::InputInitializer::{Constant, OneTime};
use crate::metrics::Metrics;
use super::super::Output;
use super::super::RunState;
use super::super::State;
use super::test_debug_client;
// A function with no inputs at all is Ready right after init().
#[test]
fn to_ready_1_on_init() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        1,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);

    run_state.init();

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
}
// A function whose only input has a OneTime initializer is Ready after
// init() when the function it sends to has an empty input.
#[test]
fn to_ready_2_on_init() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        1,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);

    run_state.init();

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
}
// A lone function with an initialized input and no destinations is Ready
// after init().
#[test]
fn to_ready_3_on_init() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);

    run_state.init();

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
}
// Both functions get their input initialized; the one sending to an
// already-full input must come out of init() Blocked.
#[test]
fn to_blocked_on_init() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        1,
        &vec![],
    );
    // Note the order: fB first, then fA.
    let mut run_state = RunState::new(vec![fn_b, fn_a], 1);

    run_state.init();

    assert_eq!(State::Ready, run_state.get_state(1), "f_b should be Ready");
    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should be in Blocked state, by fB");
}
// A function with an uninitialized input is Waiting after init().
#[test]
fn to_waiting_on_init() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);

    run_state.init();

    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be Waiting");
}
// Taking the next job of a Ready function and starting it makes it Running.
#[test]
fn ready_to_running_on_next() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);
    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");

    let next = run_state.next_job().unwrap();
    assert_eq!(0, next.function_id, "next_job() should return function_id = 0");

    run_state.start(&next);
    assert_eq!(State::Running, run_state.get_state(0), "f_a should be Running");
}
// A Waiting function yields no job and stays Waiting.
#[test]
fn unready_not_to_running_on_next() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);
    run_state.init();
    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be Waiting");

    assert!(run_state.next_job().is_none(), "next_job() should return None");

    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be Waiting");
}
// Once the blocking function's output has been processed, the function it
// was blocking becomes Ready.
#[test]
fn blocked_to_ready_on_done() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        1,
        &vec![],
    );
    // Note the order: fB first, then fA.
    let mut run_state = RunState::new(vec![fn_b, fn_a], 1);
    let mut metrics = Metrics::new(2);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(1), "f_b should be Ready");
    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should be in Blocked state, by f_b");
    assert_eq!(1, run_state.next_job().unwrap().function_id, "next() should return function_id=1 (f_b) for running");

    let job_output = Output {
        job_id: 1,
        function_id: 1,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
}
// With a Constant initializer on its input, a function is Ready again
// after its output has been processed.
#[test]
fn running_to_ready_on_done() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(Constant(ConstantInputInitializer { constant: json!(1) })),
            false,
        )],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");

    let next = run_state.next_job().unwrap();
    assert_eq!(0, next.function_id, "next() should return function_id = 0");
    run_state.start(&next);
    assert_eq!(State::Running, run_state.get_state(0), "f_a should be Running");

    let job_output = Output {
        job_id: 1,
        function_id: 0,
        input_values: vec![vec![json!(1)]],
        result: (None, true),
        destinations: vec![],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready again");
}
// With only a OneTime initializer, a function goes back to Waiting once
// its job is done and its output processed.
#[test]
fn running_to_waiting_on_done() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");

    let next = run_state.next_job().unwrap();
    assert_eq!(0, next.function_id, "next() should return function_id = 0");
    run_state.start(&next);
    assert_eq!(State::Running, run_state.get_state(0), "f_a should be Running");

    let job_output = Output {
        job_id: 0,
        function_id: 0,
        input_values: vec![vec![json!(1)]],
        result: (None, true),
        destinations: vec![],
        error: None,
    };
    run_state.job_done(&job_output);
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be Waiting again");
}
// After sending its value to fB's only input, fA (refilled by its Constant
// initializer) ends up Blocked.
#[test]
fn running_to_blocked_on_done() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(Constant(ConstantInputInitializer { constant: json!(1) })),
            false,
        )],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        1,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");

    let next = run_state.next_job().unwrap();
    assert_eq!(0, next.function_id, "next() should return function_id=0 (f_a) for running");
    run_state.start(&next);
    assert_eq!(State::Running, run_state.get_state(0), "f_a should be Running");

    let job_output = Output {
        job_id: 1,
        function_id: 0,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![("".to_string(), 1, 0)],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should be Blocked");
}
// A Waiting function becomes Ready when a value arrives on its only input.
#[test]
fn waiting_to_ready_on_input() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        0,
        &vec![],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        1,
        &vec![("".into(), 0, 0)],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be Waiting");

    let job_output = Output {
        job_id: 1,
        function_id: 1,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![("".to_string(), 0, 0)],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
}
// fA receives its input from fB, but fA's own destination (fB's input) is
// full, so fA ends up Blocked rather than Ready.
#[test]
fn waiting_to_blocked_on_input() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(Constant(ConstantInputInitializer { constant: json!(1) })),
            false,
        )],
        1,
        &vec![("".into(), 0, 0)],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(1), "f_b should be Ready");
    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be in Waiting");

    let job_output = Output {
        job_id: 1,
        function_id: 1,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![("".to_string(), 0, 0)],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should be Blocked");
}
// fA sends both to itself and to fB. It must end up blocked on fB only,
// and become Ready again as soon as fB's output has been processed.
#[test]
fn not_block_on_self() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![
            ("".to_string(), 0, 0),
            ("".to_string(), 1, 0),
        ],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        1,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);
    let mut metrics = Metrics::new(2);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
    assert_eq!(State::Waiting, run_state.get_state(1), "f_b should be in Waiting");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return function_id=0 (f_a) for running");

    // fA's output goes to itself and to fB.
    let output_of_a = Output {
        job_id: 0,
        function_id: 0,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![("".into(), 0, 0), ("".into(), 1, 0)],
        error: None,
    };
    run_state.process_output(&mut metrics, output_of_a, &mut debugger);

    assert_eq!(State::Ready, run_state.get_state(1), "f_b should be Ready");
    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should be Blocked on f_b");
    assert_eq!(1, run_state.next_job().unwrap().function_id, "next() should return function_id=1 (f_b) for running");

    // fB produces no value.
    let output_of_b = Output {
        job_id: 1,
        function_id: 1,
        input_values: vec![vec![json!(1)]],
        result: (None, true),
        destinations: vec![],
        error: None,
    };
    run_state.process_output(&mut metrics, output_of_b, &mut debugger);

    assert_eq!(State::Ready, run_state.get_state(0), "f_a should be Ready");
}
}
mod functional_tests {
use crate::debugger::Debugger;
use crate::function::Function;
use crate::input::ConstantInputInitializer;
use crate::input::Input;
use crate::input::InputInitializer::Constant;
use crate::input::InputInitializer::OneTime;
use crate::input::OneTimeInputInitializer;
use crate::metrics::Metrics;
use super::super::Output;
use super::super::RunState;
use super::super::State;
use super::test_debug_client;
// Build the three-function fixture shared by the tests in this module:
// p0 has no inputs and sends to input 0 of both p1 and p2; p1 and p2 each
// have a single uninitialized input and no destinations.
fn test_functions() -> Vec<Function> {
    let p0 = Function::new(
        "p0".to_string(),
        "/context/p0".to_string(),
        "/test".to_string(),
        false,
        vec!(),
        0,
        &vec!(("".to_string(), 1, 0), ("".to_string(), 2, 0)),
    );
    let p1 = Function::new(
        "p1".to_string(),
        "/context/p1".to_string(),
        "/test".to_string(),
        false,
        vec!(Input::new(1, &None, false)),
        1,
        &vec!(),
    );
    let p2 = Function::new(
        "p2".to_string(),
        "/context/p2".to_string(),
        "/test".to_string(),
        false,
        vec!(Input::new(1, &None, false)),
        2,
        &vec!(),
    );
    vec!(p0, p1, p2)
}
// Creating a block makes the blocked function report as blocked.
#[test]
fn blocked_works() {
    let mut debugger = Some(Debugger::new(test_debug_client()));
    let mut run_state = RunState::new(test_functions(), 1);

    // Function 0 is blocked by input 0 of function 1.
    run_state.create_block(1, 0, 0, &mut debugger);

    assert!(run_state.is_blocked(0));
}
// get() returns the function with the requested id.
#[test]
fn get_works() {
    let run_state = RunState::new(test_functions(), 1);

    let function = run_state.get(1);

    assert_eq!(function.id(), 1)
}
// With nothing in the ready queue there is no next job.
#[test]
fn no_next_if_none_ready() {
    let mut run_state = RunState::new(test_functions(), 1);

    assert!(run_state.next_job().is_none());
}
// A function flagged as having full inputs is returned by next_job().
#[test]
fn next_works() {
    let mut run_state = RunState::new(test_functions(), 1);

    run_state.inputs_now_full(0);

    let job = run_state.next_job().unwrap();
    assert_eq!(job.function_id, 0);
}
// inputs_now_full() on an unblocked function makes it available as a job.
#[test]
fn inputs_ready_makes_ready() {
    let mut run_state = RunState::new(test_functions(), 1);

    run_state.inputs_now_full(0);

    let job = run_state.next_job().unwrap();
    assert_eq!(job.function_id, 0);
}
// A function that is blocked does not become a job even with full inputs.
#[test]
fn blocked_is_not_ready() {
    let mut debugger = Some(Debugger::new(test_debug_client()));
    let mut run_state = RunState::new(test_functions(), 1);

    // Function 0 is blocked by input 0 of function 1.
    run_state.create_block(1, 0, 0, &mut debugger);
    run_state.inputs_now_full(0);

    assert!(run_state.next_job().is_none());
}
// Removing the only block on a function with full inputs makes it runnable.
#[test]
fn unblocking_makes_ready() {
    let mut debugger = Some(Debugger::new(test_debug_client()));
    let mut run_state = RunState::new(test_functions(), 1);

    // Function 0 is blocked by input 0 of function 1.
    run_state.create_block(1, 0, 0, &mut debugger);
    run_state.inputs_now_full(0);
    assert!(run_state.next_job().is_none());

    // Function 1 releases its senders; no inputs were refilled.
    run_state.unblock_senders_to(1, vec![]);

    assert_eq!(run_state.next_job().unwrap().function_id, 0);
}
// A function blocked by two others stays unavailable when only one of the
// blocks is removed.
#[test]
fn unblocking_doubly_blocked_functions_not_ready() {
    let mut debugger = Some(Debugger::new(test_debug_client()));
    let mut run_state = RunState::new(test_functions(), 1);

    // Function 0 is blocked by function 1 and by function 2.
    run_state.create_block(1, 0, 0, &mut debugger);
    run_state.create_block(2, 0, 0, &mut debugger);
    run_state.inputs_now_full(0);
    assert!(run_state.next_job().is_none());

    // Only function 1 releases its senders.
    run_state.unblock_senders_to(1, vec![]);

    assert!(run_state.next_job().is_none());
}
// With max_jobs = 1, a second job is withheld while the first is running.
#[test]
fn wont_return_too_many_jobs() {
    let mut run_state = RunState::new(test_functions(), 1);
    run_state.inputs_now_full(0);
    run_state.inputs_now_full(1);

    let first = run_state.next_job().unwrap();
    assert_eq!(0, first.function_id);
    run_state.start(&first);

    assert!(run_state.next_job().is_none());
}
// A function with a OneTime input and no destinations is Waiting after its
// single job's output has been processed.
#[test]
fn pure_function_no_destinations() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 1);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(run_state.next_job().unwrap().function_id, 0);

    let job_output = Output {
        job_id: 0,
        function_id: 0,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Waiting, run_state.get_state(0), "f_a should be Waiting");
}
// fB's input is refilled by a Constant initializer after it runs, so fA,
// which sends to that input, must remain Blocked.
#[test]
fn constant_initializer_not_unblock() {
    let fn_a = Function::new(
        "fA".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(OneTime(OneTimeInputInitializer { once: json!(1) })),
            false,
        )],
        0,
        &vec![("".to_string(), 1, 0)],
    );
    let fn_b = Function::new(
        "fB".to_string(),
        "/context/fB".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(
            1,
            &Some(Constant(ConstantInputInitializer { constant: json!(1) })),
            false,
        )],
        1,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a, fn_b], 1);
    let mut metrics = Metrics::new(2);
    let mut debugger = Some(Debugger::new(test_debug_client()));

    run_state.init();
    assert_eq!(State::Ready, run_state.get_state(1), "f_b should be Ready initially");
    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should be in Blocked initially");
    assert_eq!(1, run_state.next_job().unwrap().function_id, "next() should return a job for function_id=1 (f_b) for running");

    let job_output = Output {
        job_id: 1,
        function_id: 1,
        input_values: vec![vec![json!(1)]],
        result: (Some(json!(1)), true),
        destinations: vec![],
        error: None,
    };
    run_state.process_output(&mut metrics, job_output, &mut debugger);

    assert_eq!(State::Ready, run_state.get_state(1), "f_b should be Ready after inputs refreshed");
    assert_eq!(State::Blocked, run_state.get_state(0), "f_a should still be Blocked on f_b");
}
// Two values sent to the same input yield two jobs for that function.
#[test]
fn can_create_multiple_jobs() {
    let fn_a = Function::new(
        "f_a".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 4);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));
    run_state.init();

    run_state.send_value(1, "/", 0, 0, &json!(1), &mut metrics, &mut debugger);
    run_state.send_value(1, "/", 0, 0, &json!(1), &mut metrics, &mut debugger);

    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a second job for function_id=0 (f_a) for running");
}
// A single four-element array sent to the input yields four jobs.
#[test]
fn can_create_multiple_jobs_from_array() {
    let fn_a = Function::new(
        "f_a".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![Input::new(1, &None, false)],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 4);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));
    run_state.init();

    run_state.send_value(1, "/", 0, 0, &json!([1, 2, 3, 4]), &mut metrics, &mut debugger);

    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a second job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a third job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a fourth job for function_id=0 (f_a) for running");
}
// Same as the array case, but the function has a second input that is
// fed by a Constant initializer; four jobs are still produced.
#[test]
fn can_create_multiple_jobs_with_initializer() {
    let fn_a = Function::new(
        "f_a".to_string(),
        "/context/fA".to_string(),
        "/test".to_string(),
        false,
        vec![
            Input::new(1, &None, false),
            Input::new(
                1,
                &Some(Constant(ConstantInputInitializer { constant: json!(1) })),
                false,
            ),
        ],
        0,
        &vec![],
    );
    let mut run_state = RunState::new(vec![fn_a], 4);
    let mut metrics = Metrics::new(1);
    let mut debugger = Some(Debugger::new(test_debug_client()));
    run_state.init();

    run_state.send_value(1, "/", 0, 0, &json!([1, 2, 3, 4]), &mut metrics, &mut debugger);

    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a second job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a third job for function_id=0 (f_a) for running");
    assert_eq!(0, run_state.next_job().unwrap().function_id, "next() should return a fourth job for function_id=0 (f_a) for running");
}
}
}