use error_chain::bail;
use log::error;
use flowcore::errors::Result;
use flowcore::model::runtime_function::RuntimeFunction;
use crate::block::Block;
use crate::run_state::{RunState, State};
fn runtime_error(state: &RunState, job_id: usize, message: &str, file: &str, line: u32) -> Result<()> {
let msg = format!("Job #{job_id}: Runtime error: at file: {file}, line: {line}\n
{message}\nJob #{job_id}: Error State -\n{state}");
error!("{}", msg);
bail!(msg);
}
/// Invariant checks for a function that is in the `Ready` state.
///
/// Two conditions must hold, checked in this order:
/// 1. the flow the function belongs to is among the busy flows of `state`
/// 2. the states of the function include `Ready` or `Running`
///
/// # Errors
/// Returns the error built by `runtime_error` for the first condition that does not hold.
fn ready_check(state: &RunState, job_id: usize, function: &RuntimeFunction) -> Result<()> {
    let flow_is_busy = state.get_busy_flows().contains_key(&function.get_flow_id());
    if !flow_is_busy {
        let problem = format!(
            "Function #{} is Ready, but Flow #{} is not busy",
            function.id(),
            function.get_flow_id()
        );
        return runtime_error(state, job_id, &problem, file!(), line!());
    }

    // Fetch the states once and test them for both acceptable values
    let states = state.get_function_states(function.id());
    if !states.contains(&State::Ready) && !states.contains(&State::Running) {
        let problem = format!(
            "Flow is busy but Function #{} is not Ready or Running",
            function.id(),
        );
        return runtime_error(state, job_id, &problem, file!(), line!());
    }

    Ok(())
}
/// Invariant check relating running jobs to busy flows.
///
/// If `job_id` is one of the running jobs of `state`, then the flow that `function`
/// belongs to must be among the busy flows.
///
/// NOTE(review): the condition tests whether `job_id` is running, not whether `function`
/// itself is running, while the error message talks about the function — confirm that
/// this is the intended check.
///
/// # Errors
/// Returns the error built by `runtime_error` when the invariant does not hold.
fn running_check(state: &RunState, job_id: usize, function: &RuntimeFunction) -> Result<()> {
    let job_is_running = state.get_running().contains_key(&job_id);

    // Nothing to complain about unless the job is running while the flow is idle
    if !job_is_running || state.get_busy_flows().contains_key(&function.get_flow_id()) {
        return Ok(());
    }

    let problem = format!(
        "Function #{} is Running, but Flow #{} is not busy",
        function.id(),
        function.get_flow_id()
    );
    runtime_error(state, job_id, &problem, file!(), line!())
}
/// Invariant checks for a function that is in the `Blocked` state.
///
/// Checked in this order:
/// 1. `state` must report at least one output blocker for the function
/// 2. `function.can_run()` must return true
///
/// # Errors
/// Returns the error built by `runtime_error` for the first condition that does not hold.
fn blocked_check(state: &RunState, job_id: usize, function: &RuntimeFunction) -> Result<()> {
    // Pick the description of the first violated condition, or leave early if none is
    let problem = if state.get_output_blockers(function.id()).is_empty() {
        format!(
            "Function #{} is in Blocked state, but no block exists",
            function.id()
        )
    } else if !function.can_run() {
        format!(
            "Function #{} is Blocked, but can_run() returns false",
            function.id(),
        )
    } else {
        return Ok(());
    };

    runtime_error(state, job_id, &problem, file!(), line!())
}
/// Invariant checks for a function that is in the `Waiting` state.
///
/// No invariant is checked for this state at present: all parameters are unused and
/// the result is always `Ok(())`.
fn waiting_check(_state: &RunState, _job_id: usize, _function: &RuntimeFunction) -> Result<()> {
    Ok(())
}
/// Invariant check for a function that is in the `Completed` state.
///
/// `Completed` must be the one and only entry in `function_states`.
///
/// `function_states` is taken as a slice, so callers may pass a `&Vec<State>` (it
/// coerces to a slice) or any other contiguous sequence of states.
///
/// # Errors
/// Returns the error built by `runtime_error` when `function_states` is anything other
/// than exactly `[State::Completed]`.
fn completed_check(
    state: &RunState,
    job_id: usize,
    function: &RuntimeFunction,
    function_states: &[State],
) -> Result<()> {
    // Exactly one state, and that state is `Completed`
    if matches!(function_states, [State::Completed]) {
        return Ok(());
    }

    runtime_error(
        state,
        job_id,
        &format!(
            "Function #{} has Completed, but also appears as Ready or Blocked or Running",
            function.id(),
        ),
        file!(),
        line!(),
    )
}
/// Check that `block` does not have the same function as both its blocked and its
/// blocking function.
///
/// # Errors
/// Returns the error built by `runtime_error` when `blocked_function_id` and
/// `blocking_function_id` of `block` are equal.
fn self_block_check(state: &RunState, job_id: usize, block: &Block) -> Result<()> {
    if block.blocked_function_id != block.blocking_function_id {
        return Ok(());
    }

    let problem = format!("Block {block} has same Function id as blocked and blocking");
    runtime_error(state, job_id, &problem, file!(), line!())
}
/// Check that every flow that has an entry in the flow blocks of `state` is also one of
/// the busy flows.
///
/// # Errors
/// Returns the error built by `runtime_error` for the first flow id found among the
/// flow blocks that is not among the busy flows.
fn pending_unblock_checks(state: &RunState, job_id: usize) -> Result<()> {
    // Look for the first flow with a pending unblock that is not busy
    let offender = state
        .get_flow_blocks()
        .keys()
        .find(|flow_id| !state.get_busy_flows().contains_key(*flow_id));

    match offender {
        Some(pending_unblock_flow_id) => runtime_error(
            state,
            job_id,
            &format!(
                "Pending Unblock exists for Flow #{pending_unblock_flow_id}, but it is not busy"),
            file!(),
            line!(),
        ),
        None => Ok(()),
    }
}
/// Run `self_block_check` on every block held in `state`.
///
/// # Errors
/// Stops at, and returns, the first error reported for a block.
fn block_checks(state: &RunState, job_id: usize) -> Result<()> {
    state
        .get_blocks()
        .into_iter()
        .try_for_each(|block| self_block_check(state, job_id, block))
}
/// Run the per-function invariant checks on every function in `state`.
///
/// For each function `running_check` is run first, then each of the states reported for
/// the function is dispatched to the check for that state. States without a dedicated
/// check are skipped.
///
/// # Errors
/// Stops at, and returns, the first error reported by any of the checks.
fn function_state_checks(state: &RunState, job_id: usize) -> Result<()> {
    for function in state.get_functions() {
        running_check(state, job_id, function)?;

        let states = state.get_function_states(function.id());
        for current in &states {
            match current {
                // The completed check needs the whole set of states, not just this one
                State::Completed => completed_check(state, job_id, function, &states)?,
                State::Waiting => waiting_check(state, job_id, function)?,
                State::Blocked => blocked_check(state, job_id, function)?,
                State::Ready => ready_check(state, job_id, function)?,
                _ => {}
            }
        }
    }

    Ok(())
}
/// Check the invariants of `state`, reporting any violation against `job_id`.
///
/// The groups of checks run in this order: the function state checks, the block checks
/// and finally the pending unblock checks.
///
/// # Errors
/// Returns the first error reported; later groups of checks are not run after a failure.
pub(crate) fn check_invariants(state: &RunState, job_id: usize) -> Result<()> {
    function_state_checks(state, job_id)
        .and_then(|()| block_checks(state, job_id))
        .and_then(|()| pending_unblock_checks(state, job_id))
}
/// Unit tests for the individual invariant checks.
///
/// Items needed only to build a `Debugger` are gated on the "debugger" feature, as are
/// the extra arguments some constructors take when that feature is enabled.
#[cfg(test)]
mod test {
    #[cfg(feature = "debugger")]
    use serde_json::Value;

    #[cfg(feature = "debugger")]
    use flowcore::errors::Result;
    use flowcore::model::flow_manifest::FlowManifest;
    #[cfg(feature = "debugger")]
    use flowcore::model::input::Input;
    use flowcore::model::metadata::MetaData;
    #[cfg(feature = "debugger")]
    use flowcore::model::output_connection::OutputConnection;
    use flowcore::model::runtime_function::RuntimeFunction;
    use flowcore::model::submission::Submission;

    #[cfg(feature = "debugger")]
    use crate::block::Block;
    #[cfg(feature = "debugger")]
    use crate::debug_command::DebugCommand;
    #[cfg(feature = "debugger")]
    use crate::debugger::Debugger;
    #[cfg(feature = "debugger")]
    use crate::job::Job;
    #[cfg(feature = "debugger")]
    use crate::protocols::DebuggerProtocol;
    use crate::run_state::{RunState, State};

    use super::blocked_check;
    use super::completed_check;
    use super::ready_check;
    use super::running_check;

    /// Metadata used for the manifest of every test flow.
    fn test_meta_data() -> MetaData {
        MetaData {
            name: "test".into(),
            version: "0.0.0".into(),
            description: "a test".into(),
            authors: vec!["me".into()],
        }
    }

    /// Build a manifest containing `functions`.
    fn test_manifest(functions: Vec<RuntimeFunction>) -> FlowManifest {
        let mut manifest = FlowManifest::new(test_meta_data());
        for function in functions {
            manifest.add_function(function);
        }
        manifest
    }

    /// Build a submission for a flow made up of `functions`.
    fn test_submission(functions: Vec<RuntimeFunction>) -> Submission {
        Submission::new(
            test_manifest(functions),
            None,
            None,
            #[cfg(feature = "debugger")]
            true,
        )
    }

    /// Build a fresh `RunState` for a flow made up of `functions`.
    fn test_state(functions: Vec<RuntimeFunction>) -> RunState {
        RunState::new(test_submission(functions))
    }

    /// Build a function with the given ids, no inputs and no output connections.
    fn test_function(function_id: usize, flow_id: usize) -> RuntimeFunction {
        RuntimeFunction::new(
            #[cfg(feature = "debugger")]
            "test",
            #[cfg(feature = "debugger")]
            "/test",
            "file://fake/test",
            vec![],
            function_id,
            flow_id,
            &[],
            true,
        )
    }

    /// Fetch function #0 from `state`, panicking if it does not exist.
    fn function_zero(state: &RunState) -> &RuntimeFunction {
        state.get_function(0).expect("No function")
    }

    #[test]
    fn test_ready_passes() {
        let mut state = test_state(vec![test_function(0, 0)]);
        state.make_ready_or_blocked(0, 0).expect("Couldn't get next job");
        ready_check(&state, 0, function_zero(&state)).expect("Should pass");
    }

    #[test]
    fn test_ready_fails() {
        // The function was never made ready, so the check must report an error
        let state = test_state(vec![test_function(0, 0)]);
        assert!(ready_check(&state, 0, function_zero(&state)).is_err());
    }

    #[test]
    fn test_running_passes() {
        let mut state = test_state(vec![test_function(0, 0)]);
        state.make_ready_or_blocked(0, 0).expect("Couldn't get next job");
        running_check(&state, 0, function_zero(&state)).expect("Should pass");
    }

    /// A debugger protocol implementation that ignores everything sent to it.
    #[cfg(feature = "debugger")]
    struct DummyServer;

    #[cfg(feature = "debugger")]
    impl DebuggerProtocol for DummyServer {
        fn start(&mut self) {}
        fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {}
        fn block_breakpoint(&mut self, _block: &Block) {}
        fn flow_unblock_breakpoint(&mut self, _flow_id: usize) {}
        fn send_breakpoint(&mut self, _: &str, _source_process_id: usize, _output_route: &str, _value: &Value,
                           _destination_id: usize, _destination_name:&str, _input_name: &str, _input_number: usize) {}
        fn job_error(&mut self, _job: &Job) {}
        fn job_completed(&mut self, _job: &Job) {}
        fn blocks(&mut self, _blocks: Vec<Block>) {}
        fn outputs(&mut self, _output: Vec<OutputConnection>) {}
        fn input(&mut self, _input: Input) {}
        fn function_list(&mut self, _functions: &[RuntimeFunction]) {}
        fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}
        fn run_state(&mut self, _run_state: &RunState) {}
        fn message(&mut self, _message: String) {}
        fn panic(&mut self, _state: &RunState, _error_message: String) {}
        fn debugger_exiting(&mut self) {}
        fn debugger_resetting(&mut self) {}
        fn debugger_error(&mut self, _error: String) {}
        fn execution_starting(&mut self) {}
        fn execution_ended(&mut self) {}
        fn get_command(&mut self, _state: &RunState) -> Result<DebugCommand> {
            unimplemented!();
        }
    }

    /// Create a debugger that talks to `server`.
    #[cfg(feature = "debugger")]
    fn dummy_debugger(server: &mut dyn DebuggerProtocol) -> Debugger {
        Debugger::new(server)
    }

    #[test]
    fn test_blocked_passes() {
        let mut state = test_state(vec![test_function(0, 0)]);
        #[cfg(feature = "debugger")]
        let mut server = DummyServer;
        #[cfg(feature = "debugger")]
        let mut debugger = dummy_debugger(&mut server);

        // Whatever create_block returns is deliberately ignored: the test only needs
        // the block to exist before running the check
        let _ = state.create_block(
            1,
            1,
            0,
            0,
            0,
            #[cfg(feature = "debugger")]
            &mut debugger,
        );
        blocked_check(&state, 0, function_zero(&state)).expect("Should pass");
    }

    #[test]
    fn test_blocked_fails() {
        // No block was created, so the check must report an error
        let state = test_state(vec![test_function(0, 0)]);
        assert!(blocked_check(&state, 0, function_zero(&state)).is_err());
    }

    #[test]
    fn test_completed_passes() {
        let mut state = test_state(vec![test_function(0, 0)]);
        state.mark_as_completed(0);
        let functions_states = vec![State::Completed];
        completed_check(&state, 0, function_zero(&state), &functions_states)
            .expect("Should pass");
    }

    #[test]
    fn test_completed_fails() {
        // The list of states passed does not contain Completed
        let state = test_state(vec![test_function(0, 0)]);
        let functions_states = vec![State::Ready];
        assert!(completed_check(&state, 0, function_zero(&state), &functions_states).is_err());
    }
}