use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::fmt;
use log::{debug, error, info, trace};
use multimap::MultiMap;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use flowcore::errors::*;
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::output_connection::OutputConnection;
use flowcore::model::output_connection::Source::{Input, Output};
use flowcore::model::runtime_function::RuntimeFunction;
use flowcore::model::submission::Submission;
use flowcore::RunAgain;
use crate::block::Block;
#[cfg(debug_assertions)]
use crate::checks;
#[cfg(feature = "debugger")]
use crate::debugger::Debugger;
use crate::job::{Job, JobPayload};
/// The states a function can be reported to be in by `RunState::get_function_states`.
/// Only compiled for debug builds, tests, or when the "debugger" feature is enabled.
#[cfg(any(debug_assertions, feature = "debugger", test))]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum State {
/// A job for the function is queued in `ready_jobs` (reported once per queued job)
Ready,
/// The function's id is in the `blocked` set
Blocked,
/// None of the other reported states apply to the function
Waiting,
/// NOTE(review): `get_function_states` in this file never reports this variant
Running,
/// The function's id is in the `completed` set
Completed,
}
/// Tracks the execution of the functions of a `Submission`: which jobs are ready or running,
/// which functions are blocked or completed, and which flows are busy.
#[derive(Deserialize, Serialize, Clone)]
pub struct RunState {
/// The submission being executed; its manifest holds the functions
pub(crate) submission: Submission,
/// Ids of functions that had an input set available but had a block on them
blocked: HashSet<usize>,
/// The blocks created by `create_block` and not yet removed
blocks: HashSet<Block>,
/// Jobs created and queued, handed out in order by `get_next_job`
ready_jobs: VecDeque<Job>,
/// Jobs started with `start_job` and not yet retired, keyed by job id
running_jobs: HashMap<usize, Job>,
/// Ids of functions marked with `mark_as_completed`
completed: HashSet<usize>,
/// Count of jobs created so far; also the source of job ids
number_of_jobs_created: usize,
/// flow_id -> function_id, one entry added per job created, one removed per job retired
busy_flows: MultiMap<usize, usize>,
/// flow_id -> ids of functions in that flow recorded by `start_job`; blocks created by them
/// are removed when the flow has no more entries in `busy_flows`
flow_blocks: HashMap<usize, HashSet<usize>>,
}
impl RunState {
pub fn new(submission: Submission) -> Self {
RunState {
submission,
blocked: HashSet::<usize>::new(),
blocks: HashSet::<Block>::new(),
ready_jobs: VecDeque::<Job>::new(),
running_jobs: HashMap::<usize, Job>::new(),
completed: HashSet::<usize>::new(),
number_of_jobs_created: 0,
busy_flows: MultiMap::<usize, usize>::new(),
flow_blocks: HashMap::<usize, HashSet<usize>>::new(),
}
}
/// Borrow the list of functions held in the submission's manifest.
#[cfg(any(debug_assertions, feature = "debugger"))]
pub(crate) fn get_functions(&self) -> &Vec<RuntimeFunction> {
    let manifest = &self.submission.manifest;
    manifest.functions()
}
/// Reset every function in the manifest and clear all run-time tracking state, so that
/// the same `RunState` can be initialized and run again.
#[cfg(feature = "debugger")]
fn reset(&mut self) {
    debug!("Resetting RunState");
    self.submission
        .manifest
        .get_functions()
        .iter_mut()
        .for_each(|function| function.reset());

    // Counters and collections go back to the values `new` sets up
    self.number_of_jobs_created = 0;
    self.blocked.clear();
    self.blocks.clear();
    self.ready_jobs.clear();
    self.running_jobs.clear();
    self.completed.clear();
    self.busy_flows.clear();
    self.flow_blocks.clear();
}
/// Initialize all functions and create jobs for those that can run straight away.
///
/// With the "debugger" feature the state is reset first.
///
/// # Errors
/// Returns any error produced while creating the initial jobs.
pub(crate) fn init(&mut self) -> Result<()> {
    #[cfg(feature = "debugger")]
    self.reset();

    debug!("Initializing all functions");
    // Collect (function id, flow id) first: jobs cannot be created while the functions
    // are mutably borrowed.
    let runnable: Vec<(usize, usize)> = self
        .submission
        .manifest
        .get_functions()
        .iter_mut()
        .filter_map(|function| {
            function.init();
            if function.can_run() {
                Some((function.id(), function.get_flow_id()))
            } else {
                None
            }
        })
        .collect();

    for (function_id, flow_id) in runnable {
        self.create_jobs(function_id, flow_id)?;
    }

    Ok(())
}
/// Report the states of function `function_id`.
///
/// `Completed` comes first if the function is in the completed set, then one `Ready` per
/// queued job of the function, then `Blocked` if it is in the blocked set. If none of
/// those apply the single state `Waiting` is returned. `State::Running` is never reported.
#[cfg(any(debug_assertions, feature = "debugger", test))]
pub fn get_function_states(&self, function_id: usize) -> Vec<State> {
    let mut states = Vec::new();

    if self.completed.contains(&function_id) {
        states.push(State::Completed);
    }

    states.extend(
        self.ready_jobs
            .iter()
            .filter(|job| job.function_id == function_id)
            .map(|_| State::Ready),
    );

    if self.blocked.contains(&function_id) {
        states.push(State::Blocked);
    }

    if states.is_empty() {
        states.push(State::Waiting);
    }

    states
}
/// Test helper: true when function `function_id` is reported in exactly one state and
/// that state equals `state`.
#[cfg(test)]
pub(crate) fn function_state_is_only(&self, function_id: usize, state: State) -> bool {
    let reported = self.get_function_states(function_id);
    matches!(reported.as_slice(), [only] if *only == state)
}
/// Borrow the set of ids of the functions currently recorded as blocked.
#[cfg(feature = "debugger")]
pub fn get_blocked(&self) -> &HashSet<usize> {
    &self.blocked
}
/// Borrow the map of running jobs, keyed by job id.
#[cfg(any(feature = "debugger", debug_assertions))]
pub fn get_running(&self) -> &HashMap<usize, Job> {
    &self.running_jobs
}
/// Borrow the set of ids of the functions that have been marked as completed.
#[cfg(feature = "debugger")]
pub fn get_completed(&self) -> &HashSet<usize> {
    &self.completed
}
/// Look up a function by `id`, used as the index into the manifest's function list.
/// Returns `None` when `id` is out of range.
pub fn get_function(&self, id: usize) -> Option<&RuntimeFunction> {
    let functions = self.submission.manifest.functions();
    functions.get(id)
}
/// Mutable variant of `get_function`: look up a function by `id`, used as the index into
/// the manifest's function list. Returns `None` when `id` is out of range.
fn get_mut(&mut self, id: usize) -> Option<&mut RuntimeFunction> {
    let functions = self.submission.manifest.get_functions();
    functions.get_mut(id)
}
/// Borrow the set of blocks that currently exist between functions.
#[cfg(any(debug_assertions, feature = "debugger"))]
pub fn get_blocks(&self) -> &HashSet<Block> {
    &self.blocks
}
/// Borrow the flow_id -> function_id multimap of flows that have jobs outstanding.
#[cfg(debug_assertions)]
pub fn get_busy_flows(&self) -> &MultiMap<usize, usize> {
    &self.busy_flows
}
/// Borrow the flow_id -> function ids map of blocks pending removal when a flow goes idle.
#[cfg(debug_assertions)]
pub fn get_flow_blocks(&self) -> &HashMap<usize, HashSet<usize>> {
    &self.flow_blocks
}
/// Take the next job off the front of the ready queue.
///
/// Returns `None` when no job is ready, or when the submission sets `max_parallel_jobs`
/// and that many jobs are already running.
pub(crate) fn get_next_job(&mut self) -> Option<Job> {
    if let Some(limit) = self.submission.max_parallel_jobs {
        if self.number_jobs_running() >= limit {
            trace!("max_parallel_jobs limit of {limit} reached");
            return None;
        }
    }

    // Jobs are appended with `push_back`, so the oldest job is at the front
    self.ready_jobs.pop_front()
}
/// Record `job` as running, after registering its function in `flow_blocks` for its flow.
///
/// # Errors
/// Propagates any error from registering the flow block.
pub(crate) fn start_job(&mut self, job: Job) -> Result<()> {
    let job_id = job.payload.job_id;
    self.block_external_flow_senders(job_id, job.function_id, job.flow_id)?;
    self.running_jobs.insert(job_id, job);
    Ok(())
}
fn block_external_flow_senders(
&mut self,
job_id: usize,
blocker_function_id: usize,
blocker_flow_id: usize,
) -> Result<()> {
match self.flow_blocks.entry(blocker_flow_id) {
Entry::Occupied(mut o) => {
o.get_mut().insert(blocker_function_id);
},
Entry::Vacant(v) => {
let mut new_set = HashSet::new();
new_set.insert(blocker_function_id);
v.insert(new_set);
}
}
trace!("Job #{job_id}:\t\tAdded a flow_block -> #{blocker_function_id}({blocker_flow_id})");
Ok(())
}
/// The number of jobs created so far in this run.
#[cfg(any(feature = "metrics", feature = "debugger"))]
pub fn get_number_of_jobs_created(&self) -> usize {
    self.number_of_jobs_created
}
/// Retire the running job identified by `result.0`, using its result `result.1`.
///
/// On success, a value is sent along each of the job's output connections (taken from the
/// output value for `Output` sources, or from the job's input set for `Input` sources).
/// The function is then either prepared to run again, creating jobs if it can run, or
/// marked as completed. On failure the error is logged. In both cases the function is
/// removed from the busy flows and, if its flow is now idle, the flow is unblocked.
///
/// Returns `(display_next_output, restart, job)` where `job` carries the result. The two
/// flags are the logical OR of the flags returned by every value send and by the flow
/// unblocking, so a request made at any of those points is not lost.
///
/// # Errors
/// Fails if no running job has that id, if a function cannot be found, or if sending,
/// job creation, unblocking or (in debug builds) the invariant checks fail.
// The cfg-gated parameters and debugger-only assignments trigger these lints in some
// feature combinations.
#[allow(unused_variables, unused_assignments, unused_mut)]
pub(crate) fn retire_a_job(
    &mut self,
    #[cfg(feature = "metrics")] metrics: &mut Metrics,
    result: (usize, Result<(Option<Value>, RunAgain)>),
    #[cfg(feature = "debugger")] debugger: &mut Debugger,
) -> Result<(bool, bool, Job)> {
    let mut display_next_output = false;
    let mut restart = false;

    let mut job = self.running_jobs.remove(&result.0)
        .ok_or_else(|| format!("Could not find Job#{} to retire it", result.0))?;

    match &result.1 {
        Ok((output_value, function_can_run_again)) => {
            #[cfg(feature = "debugger")]
            debug!("Job #{}: Function #{} '{}' {:?} -> {:?}", job.payload.job_id, job.function_id,
                    self.get_function(job.function_id).ok_or("No such function")?.name(),
                    job.payload.input_set, output_value);
            #[cfg(not(feature = "debugger"))]
            debug!("Job #{}: Function #{} {:?} -> {:?}", job.payload.job_id, job.function_id,
                    job.payload.input_set, output_value);

            for connection in &job.connections {
                let value_to_send = match &connection.source {
                    Output(route) => output_value
                        .as_ref()
                        .and_then(|output_v| output_v.pointer(route)),
                    Input(index) => job.payload.input_set.get(*index),
                };

                if let Some(value) = value_to_send {
                    let (display, reset) = self.send_a_value(
                        job.function_id,
                        job.flow_id,
                        connection,
                        value.clone(),
                        #[cfg(feature = "metrics")] metrics,
                        #[cfg(feature = "debugger")] debugger,
                    )?;
                    // Accumulate: a later send must not clear an earlier request
                    display_next_output |= display;
                    restart |= reset;
                } else {
                    trace!(
                        "Job #{}:\t\tNo value found at '{}'",
                        job.payload.job_id, &connection.source
                    );
                }
            }

            if *function_can_run_again {
                let function = self.get_mut(job.function_id)
                    .ok_or("No such function")?;
                function.init_inputs(false, false);
                if function.can_run() {
                    self.create_jobs(job.function_id, job.flow_id)?;
                }
            } else {
                self.mark_as_completed(job.function_id);
            }
        },
        Err(e) => {
            error!("Error in Job #{}: {e}", job.payload.job_id)
        }
    }

    let (display, reset) = self.unblock_flows(&job,
        #[cfg(feature = "debugger")] debugger,
    )?;
    display_next_output |= display;
    restart |= reset;

    #[cfg(debug_assertions)]
    checks::check_invariants(self, job.payload.job_id)?;

    trace!("Job #{}: Completed-----------------------", job.payload.job_id);
    job.result = result.1;

    Ok((display_next_output, restart, job))
}
/// Send `output_value` from function `source_id` (in flow `source_flow_id`) to the
/// destination input described by `connection`.
///
/// After the send:
/// - if the destination input holds at least one value and the send is neither a loopback
///   nor within the same flow, a block is created on the sender;
/// - if the send made a new input set available at the destination and it was not a
///   loopback, jobs are created for the destination (or it is marked blocked).
///
/// Returns `(display_next_output, restart)`: the logical OR of the flags returned by the
/// debugger's pre-send check (only for `Output` sources, "debugger" feature) and by block
/// creation, so that neither overwrites the other.
///
/// # Errors
/// Fails if the destination function cannot be found, or if the debugger check, block
/// creation or job creation fails.
fn send_a_value(
    &mut self,
    source_id: usize,
    source_flow_id: usize,
    connection: &OutputConnection,
    output_value: Value,
    #[cfg(feature = "metrics")] metrics: &mut Metrics,
    #[cfg(feature = "debugger")] debugger: &mut Debugger,
) -> Result<(bool, bool)> {
    let mut display_next_output = false;
    let mut restart = false;

    let route_str = match &connection.source {
        Output(route) if route.is_empty() => "".into(),
        Output(route) => format!(" from output route '{route}'"),
        Input(index) => format!(" from Job value at input #{index}"),
    };

    let loopback = source_id == connection.destination_id;
    let same_flow = source_flow_id == connection.destination_flow_id;

    if loopback {
        info!("\t\tFunction #{source_id} loopback of '{output_value}'{route_str} to Self:{}",
                connection.destination_io_number);
    } else {
        info!("\t\tFunction #{source_id} sending '{output_value}'{route_str} to Function #{}:{}",
                connection.destination_id, connection.destination_io_number);
    }

    #[cfg(feature = "debugger")]
    if let Output(route) = &connection.source {
        let (display, reset) = debugger.check_prior_to_send(
            self,
            source_id,
            route,
            &output_value,
            connection.destination_id,
            connection.destination_io_number,
        )?;
        display_next_output |= display;
        restart |= reset;
    }

    let function = self.get_mut(connection.destination_id)
        .ok_or("Could not get function")?;
    let job_count_before = function.input_sets_available();
    function.send(connection.destination_io_number, output_value);

    #[cfg(feature = "metrics")]
    metrics.increment_outputs_sent();

    // Sends within a flow, or from a function to itself, never block the sender
    let block = (function.values_available(connection.destination_io_number) > 0)
        && !loopback && !same_flow;
    let new_job_available = function.input_sets_available() > job_count_before;

    if block {
        let (display, reset) = self.create_block(
            connection.destination_flow_id,
            connection.destination_id,
            connection.destination_io_number,
            source_id,
            source_flow_id,
            #[cfg(feature = "debugger")]
            debugger,
        )?;
        // Accumulate rather than overwrite what the pre-send check requested
        display_next_output |= display;
        restart |= reset;
    }

    if new_job_available && !loopback {
        self.create_jobs_or_block(connection.destination_id,
                                  connection.destination_flow_id)?;
    }

    Ok((display_next_output, restart))
}
/// List the ids of the functions that are blocking function `id`, one entry per block in
/// which `id` is the blocked function.
#[cfg(any(feature = "debugger", debug_assertions))]
pub fn get_output_blockers(&self, id: usize) -> Vec<usize> {
    self.blocks
        .iter()
        .filter(|block| block.blocked_function_id == id)
        .map(|block| block.blocking_function_id)
        .collect()
}
/// True if at least one block exists in which function `id` is the blocked function.
pub(crate) fn block_exists(&self, id: usize) -> bool {
    self.blocks
        .iter()
        .any(|block| block.blocked_function_id == id)
}
/// The number of jobs that have been started and not yet retired.
pub fn number_jobs_running(&self) -> usize {
    let running = &self.running_jobs;
    running.len()
}
/// The number of jobs queued and waiting to be handed out by `get_next_job`.
pub fn number_jobs_ready(&self) -> usize {
    let ready = &self.ready_jobs;
    ready.len()
}
/// List the functions that function `target_id` is waiting on for input.
///
/// For each input of the target that has no value available, every function without a
/// ready job that has an output connection to that input is a candidate sender (counted
/// once per matching connection). The candidate is reported only when there is exactly
/// one for that input.
///
/// # Errors
/// Fails if there is no function with id `target_id`.
#[cfg(feature = "debugger")]
pub fn get_input_blockers(&self, target_id: usize) -> Result<Vec<usize>> {
    let target_function = self.get_function(target_id)
        .ok_or("No such function")?;
    let mut input_blockers = vec![];

    for (target_io, input) in target_function.inputs().iter().enumerate() {
        if input.values_available() != 0 {
            continue;
        }

        let mut senders = Vec::<usize>::new();
        for sender_function in self.submission.manifest.functions().iter() {
            let sender_id = sender_function.id();
            let sender_is_ready = self.ready_jobs
                .iter()
                .any(|ready_job| ready_job.function_id == sender_id);
            if sender_is_ready {
                continue;
            }

            senders.extend(
                sender_function
                    .get_output_connections()
                    .iter()
                    .filter(|destination| {
                        destination.destination_id == target_id
                            && destination.destination_io_number == target_io
                    })
                    .map(|_| sender_id),
            );
        }

        // Only an unambiguous, single sender is reported as the blocker of this input
        if senders.len() == 1 {
            input_blockers.extend(senders);
        }
    }

    Ok(input_blockers)
}
/// Create jobs for function `function_id` in flow `flow_id`, unless a block exists on the
/// function, in which case it is added to the blocked set instead.
///
/// # Errors
/// Propagates any error from job creation.
pub(crate) fn create_jobs_or_block(&mut self, function_id: usize, flow_id: usize)
    -> Result<()> {
    if !self.block_exists(function_id) {
        return self.create_jobs(function_id, flow_id);
    }

    trace!( "\t\t\tFunction #{function_id} blocked on output. State set to 'Blocked'");
    self.blocked.insert(function_id);
    Ok(())
}
/// Create a job for each input set that function `function_id` can currently supply,
/// queue it in `ready_jobs` and record the function in `busy_flows` for `flow_id`.
///
/// Job ids come from `number_of_jobs_created`, which is only advanced when a job is
/// actually created. For a function reporting `is_always_ready()` only a single job is
/// created per call.
///
/// # Errors
/// Fails if there is no function with id `function_id`.
fn create_jobs(&mut self, function_id: usize, flow_id: usize) -> Result<()> {
    loop {
        // Candidate id; committed to the counter only once an input set was taken
        let job_id = self.number_of_jobs_created + 1;

        let function = self.get_mut(function_id)
            .ok_or("Could not get function")?;

        let input_set = match function.take_input_set() {
            Some(input_set) => input_set,
            None => return Ok(()),
        };

        let implementation_url = function.get_implementation_url().clone();
        debug!("Job #{job_id} created for Function #{function_id}({flow_id}) \
                in 'ready_jobs' with inputs: {:?}", input_set);
        let job = Job {
            function_id,
            flow_id,
            connections: function.get_output_connections().clone(),
            payload: JobPayload {
                job_id,
                implementation_url,
                input_set,
            },
            result: Ok((None, false)),
        };
        let always_ready = function.is_always_ready();

        self.number_of_jobs_created = job_id;
        self.ready_jobs.push_back(job);
        self.busy_flows.insert(flow_id, function_id);

        if always_ready {
            return Ok(());
        }
    }
}
/// The number of functions in the submission's manifest.
#[cfg(any(feature = "debugger", feature = "metrics"))]
pub fn num_functions(&self) -> usize {
    let functions = self.submission.manifest.functions();
    functions.len()
}
/// Remove one busy-flow entry for the function of the retired `job`. If its flow then has
/// no busy entries left, remove the blocks created by the functions recorded in
/// `flow_blocks` for that flow and re-run the flow's input initializers.
///
/// Returns `(display_next_output, restart)`, which can only be true when the "debugger"
/// feature is enabled and the flow became idle.
///
/// # Errors
/// Propagates errors from the debugger check, block removal or initializer run.
// Without the "debugger" feature the flags are never reassigned.
#[allow(unused_variables, unused_assignments, unused_mut)]
fn unblock_flows(&mut self,
                 job: &Job,
                 #[cfg(feature = "debugger")] debugger: &mut Debugger,
) -> Result<(bool, bool)> {
    let mut display_next_output = false;
    let mut restart = false;

    self.remove_from_busy(job.function_id);

    let flow_still_busy = self.busy_flows.get(&job.flow_id).is_some();
    if flow_still_busy {
        return Ok((display_next_output, restart));
    }

    debug!("Job #{}:\tFlow #{} is now idle, so removing blocks on external functions to it",
        job.payload.job_id, job.flow_id);

    #[cfg(feature = "debugger")]
    {
        (display_next_output, restart) = debugger.check_prior_to_flow_unblock(self,
                                                                  job.flow_id)?;
    }

    let idle_flow_blockers = self.flow_blocks.remove(&job.flow_id);
    if let Some(blocker_functions) = idle_flow_blockers {
        for blocker_function_id in blocker_functions {
            self.remove_blocks(blocker_function_id)?;
        }
    }

    self.run_flow_initializers(job.flow_id)?;

    Ok((display_next_output, restart))
}
/// Remove a single `busy_flows` entry whose value is `blocker_function_id`: the first one
/// visited by `retain`. Further entries for the same function are kept.
fn remove_from_busy(&mut self, blocker_function_id: usize) {
    let mut already_removed = false;
    self.busy_flows.retain(|&_flow_id, &function_id| {
        let remove_this_one = !already_removed && function_id == blocker_function_id;
        if remove_this_one {
            already_removed = true;
        }
        !remove_this_one
    });
    trace!("\t\t\tUpdated busy_flows list to: {:?}", self.busy_flows);
}
fn remove_blocks(&mut self, blocker_function_id: usize) -> Result<()>
{
let mut blocks_to_remove = vec![];
for block in &self.blocks {
if block.blocking_function_id == blocker_function_id {
blocks_to_remove.push(block.clone());
}
}
for block in blocks_to_remove {
self.blocks.remove(&block);
trace!("\t\t\tBlock removed {:?}", block);
if self.blocked.contains(&block.blocked_function_id) && !self.block_exists(block.blocked_function_id) {
trace!("\t\t\t\tFunction #{} removed from 'blocked' list", block.blocked_function_id);
self.blocked.remove(&block.blocked_function_id);
let function = self.get_function(block.blocked_function_id).ok_or("No such function")?;
if function.can_run() {
self.create_jobs_or_block(block.blocked_function_id, block.blocked_flow_id)?;
}
}
}
Ok(())
}
/// Re-initialize the inputs of every not-yet-completed function in flow `flow_id`, then
/// create jobs (or block) for each function that could not run before and now can.
///
/// # Errors
/// Propagates any error from job creation.
fn run_flow_initializers(&mut self, flow_id: usize) -> Result<()> {
    let mut newly_runnable = Vec::<usize>::new();

    for function in self.submission.manifest.get_functions().iter_mut() {
        let in_flow = function.get_flow_id() == flow_id;
        if !in_flow || self.completed.contains(&function.id()) {
            continue;
        }

        let could_run_before = function.can_run();
        function.init_inputs(false, true);
        if function.can_run() && !could_run_before {
            newly_runnable.push(function.id());
        }
    }

    for function_id in newly_runnable {
        self.create_jobs_or_block(function_id, flow_id)?;
    }

    Ok(())
}
/// Record function `function_id` as completed. Marking it again has no further effect.
pub(crate) fn mark_as_completed(&mut self, function_id: usize) {
    let completed = &mut self.completed;
    completed.insert(function_id);
}
/// Create a `Block` from the given blocking and blocked function details and add it to
/// the set of blocks.
///
/// Returns `(false, false)` without the "debugger" feature; with it, returns whatever the
/// debugger's block-creation check returns.
#[allow(clippy::too_many_arguments)]
pub(crate) fn create_block(
    &mut self,
    blocking_flow_id: usize,
    blocking_function_id: usize,
    blocking_io_number: usize,
    blocked_function_id: usize,
    blocked_flow_id: usize,
    #[cfg(feature = "debugger")] debugger: &mut Debugger,
) -> Result<(bool, bool)> {
    let block = Block::new(
        blocking_flow_id,
        blocking_function_id,
        blocking_io_number,
        blocked_function_id,
        blocked_flow_id,
    );

    trace!("\t\t\t\t\tCreating Block {:?}", block);
    // A copy goes into the set, as the debugger check below still needs the block
    self.blocks.insert(block.clone());

    #[cfg(feature = "debugger")]
    return debugger.check_on_block_creation(self, &block);

    #[cfg(not(feature = "debugger"))]
    Ok((false, false))
}
}
/// Human-readable, multi-line summary of the run state: the submission followed by the
/// job counters and each of the tracking collections.
impl fmt::Display for RunState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let running_job_ids = self.running_jobs.keys();
        let ready_job_ids: Vec<usize> = self.ready_jobs
            .iter()
            .map(|j| j.payload.job_id)
            .collect();

        writeln!(f, " Submission:\n{}", self.submission)?;
        writeln!(f, "RunState:")?;
        writeln!(f, " Jobs Created: {}", self.number_of_jobs_created)?;
        writeln!(f, "Number of Jobs Running: {}", self.running_jobs.len())?;
        writeln!(f, " Jobs Running: {:?}", running_job_ids)?;
        writeln!(f, " Functions Blocked: {:?}", self.blocked)?;
        writeln!(f, " Blocks: {:?}", self.blocks)?;
        writeln!(f, " Functions Ready: {:?}", ready_job_ids)?;
        writeln!(f, " Functions Completed: {:?}", self.completed)?;
        writeln!(f, " Flows Busy: {:?}", self.busy_flows)?;
        // Last line is written without a trailing newline
        write!(f, " Pending Unblocks: {:?}", self.flow_blocks)
    }
}
#[cfg(test)]
mod test {
use serde_json::{json, Value};
use url::Url;
use flowcore::errors::Result;
use flowcore::model::flow_manifest::FlowManifest;
use flowcore::model::input::Input;
use flowcore::model::input::InputInitializer::Once;
use flowcore::model::metadata::MetaData;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::runtime_function::RuntimeFunction;
use flowcore::model::submission::Submission;
#[cfg(feature = "debugger")]
use crate::block::Block;
#[cfg(feature = "debugger")]
use crate::debug_command::DebugCommand;
#[cfg(feature = "debugger")]
use crate::debugger::Debugger;
#[cfg(feature = "debugger")]
use crate::debugger_handler::DebuggerHandler;
#[cfg(feature = "debugger")]
use crate::run_state::State;
use super::{Job, JobPayload};
use super::RunState;
// Fixture: function #0 ("fA") with a single input that has no initializer, and one output
// connection to function #1 ("/fB").
fn test_function_a_to_b_not_init() -> RuntimeFunction {
    let to_f_b = OutputConnection::new(
        Source::default(),
        1,
        0,
        0,
        "/fB".to_string(),
        #[cfg(feature = "debugger")]
        String::default(),
    );
    let inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        None, None)];

    RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fA",
        #[cfg(feature = "debugger")]
        "/fA",
        "file://fake/test",
        inputs,
        0,
        0,
        &[to_f_b],
        false,
    )
}
// Fixture: function #0 ("fA") whose single input has a `Once` initializer of 1, with one
// output connection to function #1 ("/fB").
fn test_function_a_to_b() -> RuntimeFunction {
    let to_f_b = OutputConnection::new(
        Source::default(),
        1,
        0,
        0,
        "/fB".to_string(),
        #[cfg(feature = "debugger")]
        String::default(),
    );
    let inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        Some(Once(json!(1))), None)];

    RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fA",
        #[cfg(feature = "debugger")]
        "/fA",
        "file://fake/test",
        inputs,
        0,
        0,
        &[to_f_b],
        false,
    )
}
// Fixture: function #0 ("fA") whose single input has a `Once` initializer of 1 and which
// has no output connections.
fn test_function_a_init() -> RuntimeFunction {
    let inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        Some(Once(json!(1))), None)];

    RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fA",
        #[cfg(feature = "debugger")]
        "/fA",
        "file://fake/test",
        inputs,
        0,
        0,
        &[],
        false,
    )
}
// Fixture: function #1 ("fB") with a single input that has no initializer and no output
// connections.
fn test_function_b_not_init() -> RuntimeFunction {
    let inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        None, None)];

    RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fB",
        #[cfg(feature = "debugger")]
        "/fB",
        "file://fake/test",
        inputs,
        1,
        0,
        &[],
        false,
    )
}
// Fixture: job #1 in flow 0 for `source_function_id`, with one output connection to
// `destination_function_id`, an input set of [1] and a successful result of 1 that asks
// for the function to be run again.
fn test_job(source_function_id: usize, destination_function_id: usize) -> Job {
    let connection = OutputConnection::new(
        Source::default(),
        destination_function_id,
        0,
        0,
        String::default(),
        #[cfg(feature = "debugger")]
        String::default(),
    );
    let payload = JobPayload {
        job_id: 1,
        implementation_url: Url::parse("file://test").expect("Could not parse Url"),
        input_set: vec![json!(1)],
    };

    Job {
        function_id: source_function_id,
        flow_id: 0,
        connections: vec![connection],
        payload,
        result: Ok((Some(json!(1)), true)),
    }
}
// A `DebuggerHandler` for tests that ignores every notification. Asking it for a command
// is not supported and panics via `unimplemented!`.
#[cfg(feature = "debugger")]
struct DummyServer;

#[cfg(feature = "debugger")]
impl DebuggerHandler for DummyServer {
    fn start(&mut self) {}

    fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {}

    fn block_breakpoint(&mut self, _block: &Block) {}

    fn flow_unblock_breakpoint(&mut self, _flow_id: usize) {}

    fn send_breakpoint(&mut self, _: &str, _source_process_id: usize, _output_route: &str, _value: &Value,
                       _destination_id: usize, _destination_name:&str, _input_name: &str, _input_number: usize) {}

    fn job_error(&mut self, _job: &Job) {}

    fn job_completed(&mut self, _job: &Job) {}

    fn blocks(&mut self, _blocks: Vec<Block>) {}

    fn outputs(&mut self, _output: Vec<OutputConnection>) {}

    fn input(&mut self, _input: Input) {}

    fn function_list(&mut self, _functions: &[RuntimeFunction]) {}

    fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}

    fn run_state(&mut self, _run_state: &RunState) {}

    fn message(&mut self, _message: String) {}

    fn panic(&mut self, _state: &RunState, _error_message: String) {}

    fn debugger_exiting(&mut self) {}

    fn debugger_resetting(&mut self) {}

    fn debugger_error(&mut self, _error: String) {}

    fn execution_starting(&mut self) {}

    fn execution_ended(&mut self) {}

    fn get_command(&mut self, _state: &RunState) -> Result<DebugCommand> {
        unimplemented!();
    }
}
// Build a `Debugger` that reports to the supplied handler.
#[cfg(feature = "debugger")]
fn dummy_debugger(server: &mut dyn DebuggerHandler) -> Debugger {
    let handler = server;
    Debugger::new(handler)
}
// Fixed metadata used by every test manifest.
fn test_meta_data() -> MetaData {
    let authors = vec!["me".into()];
    MetaData {
        name: "test".into(),
        version: "0.0.0".into(),
        description: "a test".into(),
        authors,
    }
}
// Build a manifest with the test metadata, containing `functions` added in order.
fn test_manifest(functions: Vec<RuntimeFunction>) -> FlowManifest {
    let mut manifest = FlowManifest::new(test_meta_data());
    functions.into_iter().for_each(|function| {
        manifest.add_function(function);
    });
    manifest
}
// Build a submission around a test manifest of `functions`; the two optional settings
// are left as `None`, and with the "debugger" feature the extra flag is `true`.
fn test_submission(functions: Vec<RuntimeFunction>) -> Submission {
    let manifest = test_manifest(functions);
    Submission::new(
        manifest,
        None,
        None,
        #[cfg(feature = "debugger")]
        true,
    )
}
// Tests of a freshly initialized `RunState`, most of them with an empty manifest.
mod general_run_state_tests {
    #[cfg(feature = "debugger")]
    use std::collections::HashSet;

    use super::super::RunState;

    // The state of a two-function flow can be initialized and displayed
    #[test]
    fn display_run_state_test() {
        let functions = vec![super::test_function_a_to_b(), super::test_function_b_not_init()];
        let mut state = RunState::new(super::test_submission(functions));
        state.init().expect("Could not init state");

        #[cfg(any(feature = "debugger", feature = "metrics"))]
        assert_eq!(state.num_functions(), 2);

        println!("Run state: {state}");
    }

    // No jobs are created or ready when there are no functions
    #[cfg(feature = "metrics")]
    #[test]
    fn jobs_created_zero_at_init() {
        let submission = super::test_submission(vec![]);
        let mut state = RunState::new(submission);
        state.init().expect("Could not init state");
        assert_eq!(0, state.get_number_of_jobs_created(), "At init jobs() should be 0");
        assert_eq!(0, state.number_jobs_ready());
    }

    // No blocks exist when there are no functions
    #[cfg(feature = "debugger")]
    #[test]
    fn zero_blocks_at_init() {
        let submission = super::test_submission(vec![]);
        let mut state = RunState::new(submission);
        state.init().expect("Could not init state");
        assert_eq!(
            &HashSet::new(),
            state.get_blocks(),
            "At init get_blocks() should be empty"
        );
    }

    // No jobs are running when there are no functions
    #[cfg(feature = "debugger")]
    #[test]
    fn zero_running_at_init() {
        let submission = super::test_submission(vec![]);
        let mut state = RunState::new(submission);
        state.init().expect("Could not init state");
        assert!(state.get_running().is_empty(), "At init get_running() should be empty");
    }

    // No functions are blocked when there are no functions
    #[cfg(feature = "debugger")]
    #[test]
    fn zero_blocked_at_init() {
        let submission = super::test_submission(vec![]);
        let mut state = RunState::new(submission);
        state.init().expect("Could not init state");
        assert_eq!(
            &HashSet::new(),
            state.get_blocked(),
            "At init get_blocked() should be empty"
        );
    }
}
mod state_transitions {
use serde_json::json;
use serial_test::serial;
use url::Url;
use flowcore::model::input::Input;
use flowcore::model::input::InputInitializer::Always;
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::runtime_function::RuntimeFunction;
use crate::run_state::test::test_function_b_not_init;
use super::super::{Job, JobPayload};
use super::super::RunState;
use super::super::State;
// An initialized f_a gets exactly one ready job at init, while f_b waits for input
#[test]
fn to_ready_1_on_init() {
    let functions = vec![super::test_function_a_to_b(), test_function_b_not_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");

    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
    assert_eq!(1, state.number_jobs_ready());
    assert!(
        state.function_state_is_only(1, State::Waiting),
        "f_b should be waiting for input"
    );
}
// With neither function initialized both wait, and (with the debugger) f_a is reported as
// the input blocker of f_b
#[test]
fn input_blocker() {
    let functions = vec![super::test_function_a_to_b_not_init(), test_function_b_not_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");

    assert!(
        state.function_state_is_only(0, State::Waiting),
        "f_a should be waiting for input"
    );
    assert!(
        state.function_state_is_only(1, State::Waiting),
        "f_b should be waiting for input"
    );

    #[cfg(feature = "debugger")]
    assert!(
        state.get_input_blockers(1).expect("Could not get blockers").contains(&0),
        "There should be an input blocker"
    )
}
// An initialized f_a connected to f_b is Ready after init
#[test]
fn to_ready_2_on_init() {
    let functions = vec![super::test_function_a_to_b(), test_function_b_not_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");

    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
// An initialized f_a with no connections is Ready after init
#[test]
fn to_ready_3_on_init() {
    let functions = vec![super::test_function_a_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");

    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
// Fixture: function #0 ("fA") with a single input that has no initializer and no output
// connections.
fn test_function_a_not_init() -> RuntimeFunction {
    let inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        None, None)];

    RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fA",
        #[cfg(feature = "debugger")]
        "/fA",
        "file://fake/test",
        inputs,
        0,
        0,
        &[],
        false,
    )
}
// An uninitialized f_a is Waiting after init
#[test]
fn to_waiting_on_init() {
    let functions = vec![test_function_a_not_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");

    assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
}
// Taking the next job and starting it moves it into the running jobs
#[test]
fn ready_to_running_on_next() {
    let functions = vec![super::test_function_a_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");
    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");

    let job = state.get_next_job().expect("Couldn't get next job");
    let job_id = job.payload.job_id;
    state.start_job(job.clone()).expect("Could not start job");

    state.running_jobs.get(&job_id)
        .expect("Job should have been running");
}
// Asking for a job when none is ready returns None and leaves f_a Waiting
#[test]
fn unready_not_to_running_on_next() {
    let functions = vec![test_function_a_not_init()];
    let mut state = RunState::new(super::test_submission(functions));
    state.init().expect("Could not init state");
    assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");

    let next = state.get_next_job();
    assert!(next.is_none(), "next_job() should return None");
    assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
}
// Fixture: job #1 for function #0 in flow 0, with no connections, an input set of [1] and
// a successful result with no output value that asks for the function to be run again.
fn test_job() -> Job {
    let payload = JobPayload {
        job_id: 1,
        implementation_url: Url::parse("file://test").expect("Could not parse Url"),
        input_set: vec![json!(1)],
    };

    Job {
        function_id: 0,
        flow_id: 0,
        connections: vec![],
        payload,
        result: Ok((None, true)),
    }
}
// A function whose input has an `Always` initializer becomes Ready again once its running
// job is retired
#[test]
#[serial]
fn running_to_ready_on_done() {
    let always_inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        Some(Always(json!(1))), None)];
    let f_a = RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fA",
        #[cfg(feature = "debugger")]
        "/fA",
        "file://fake/test",
        always_inputs,
        0,
        0,
        &[],
        false,
    );
    let mut state = RunState::new(super::test_submission(vec![f_a]));
    #[cfg(feature = "metrics")]
    let mut metrics = Metrics::new(1);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    state.init().expect("Could not init state");
    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");

    let started = state.get_next_job().expect("Couldn't get next job");
    assert_eq!(0, started.function_id, "get_next_job() should return function_id = 0");
    state.start_job(started.clone()).expect("Could not start job");
    state.running_jobs.get(&started.payload.job_id).expect("Job with f_a should be Running");

    // Retire using a canned result for job #1
    let finished = test_job();
    state.retire_a_job(
        #[cfg(feature = "metrics")]
        &mut metrics,
        (finished.payload.job_id, finished.result),
        #[cfg(feature = "debugger")]
        &mut debugger,
    ).expect("Problem retiring job");

    assert!(
        state.function_state_is_only(0, State::Ready),
        "f_a should be Ready again"
    );
}
// A function whose input has a `Once` initializer goes back to Waiting once its running
// job is retired
#[test]
#[serial]
fn running_to_waiting_on_done() {
    let mut state = RunState::new(super::test_submission(vec![super::test_function_a_init()]));
    #[cfg(feature = "metrics")]
    let mut metrics = Metrics::new(1);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    state.init().expect("Could not init state");
    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");

    let started = state.get_next_job().expect("Couldn't get next job");
    assert_eq!(0, started.function_id, "next() should return function_id = 0");
    state.start_job(started.clone()).expect("Could not start job");
    state.running_jobs.get(&started.payload.job_id).expect("Job with f_a should be Running");

    // Retire using a canned result for job #1
    let finished = test_job();
    state.retire_a_job(
        #[cfg(feature = "metrics")]
        &mut metrics,
        (finished.payload.job_id, finished.result),
        #[cfg(feature = "debugger")]
        &mut debugger,
    ).expect("Problem retiring job");

    assert!(state.function_state_is_only(0, State::Waiting),
        "f_a should be Waiting again"
    );
}
// A waiting f_a becomes Ready when a job of f_b, which is connected to it, is retired with
// an output value
#[test]
#[serial]
fn waiting_to_ready_on_input() {
    let f_a = test_function_a_not_init();
    let to_f_a = OutputConnection::new(
        Source::default(),
        0,
        0,
        0,
        String::default(),
        #[cfg(feature = "debugger")]
        String::default(),
    );
    let f_b_inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        None, None)];
    let f_b = RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fB",
        #[cfg(feature = "debugger")]
        "/fB",
        "file://fake/test",
        f_b_inputs,
        1,
        0,
        &[to_f_a],
        false,
    );
    let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
    #[cfg(feature = "metrics")]
    let mut metrics = Metrics::new(1);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    state.init().expect("Could not init state");
    assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");

    // Run a canned job for f_b (function 1) that sends its output to f_a (function 0)
    let sender_job = super::test_job(1, 0);
    state.start_job(sender_job.clone()).expect("Could not start job");
    state.retire_a_job(
        #[cfg(feature = "metrics")]
        &mut metrics,
        (sender_job.payload.job_id, sender_job.result),
        #[cfg(feature = "debugger")]
        &mut debugger,
    ).expect("Problem retiring job");

    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
// f_a (waiting, connected to f_b) receives a value when a job of an always-initialized f_b
// is retired.
// NOTE(review): despite the test name, the final assertion checks that f_a is `Ready`.
#[test]
#[serial]
fn waiting_to_blocked_on_input() {
    let f_a = super::test_function_a_to_b_not_init();
    let to_f_a = OutputConnection::new(
        Source::default(),
        0,
        0,
        0,
        String::default(),
        #[cfg(feature = "debugger")]
        String::default(),
    );
    let f_b_inputs = vec![Input::new(
        #[cfg(feature = "debugger")] "", 0, false,
        Some(Always(json!(1))), None)];
    let f_b = RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "fB",
        #[cfg(feature = "debugger")]
        "/fB",
        "file://fake/test",
        f_b_inputs,
        1,
        0,
        &[to_f_a],
        false,
    );
    let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
    #[cfg(feature = "metrics")]
    let mut metrics = Metrics::new(1);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    state.init().expect("Could not init state");
    assert!(state.function_state_is_only(1, State::Ready), "f_b should be Ready");
    assert!(
        state.function_state_is_only(0, State::Waiting),
        "f_a should be in Waiting"
    );

    // Run a canned job for f_b (function 1) that sends its output to f_a (function 0)
    let sender_job = super::test_job(1, 0);
    state.start_job(sender_job.clone()).expect("Could not start job");
    state.retire_a_job(
        #[cfg(feature = "metrics")]
        &mut metrics,
        (sender_job.payload.job_id, sender_job.result),
        #[cfg(feature = "debugger")]
        &mut debugger,
    ).expect("Problem retiring job");

    assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
}
mod functional_tests {
use serial_test::serial;
use flowcore::model::input::Input;
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::runtime_function::RuntimeFunction;
use super::super::RunState;
// Fixture: three functions. p0 (#0) has no inputs and output connections to #1 and #2;
// p1 (#1) and p2 (#2) each have one uninitialized input and no output connections.
fn test_functions() -> Vec<RuntimeFunction> {
    let to_p1 = OutputConnection::new(
        Source::default(),
        1,
        0,
        0,
        String::default(),
        #[cfg(feature = "debugger")]
        String::default(),
    );
    let to_p2 = OutputConnection::new(
        Source::default(),
        2,
        0,
        0,
        String::default(),
        #[cfg(feature = "debugger")]
        String::default(),
    );

    let p0 = RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "p0",
        #[cfg(feature = "debugger")]
        "/p0",
        "file://fake/test/p0",
        vec![],
        0,
        0,
        &[to_p1, to_p2],
        false,
    );

    let p1 = RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "p1",
        #[cfg(feature = "debugger")]
        "/p1",
        "file://fake/test/p1",
        vec![Input::new(
            #[cfg(feature = "debugger")] "", 0, false,
            None, None)],
        1,
        0,
        &[],
        false,
    );

    let p2 = RuntimeFunction::new(
        #[cfg(feature = "debugger")]
        "p2",
        #[cfg(feature = "debugger")]
        "/p2",
        "file://fake/test/p2",
        vec![Input::new(
            #[cfg(feature = "debugger")] "", 0, false,
            None, None)],
        2,
        0,
        &[],
        false,
    );

    vec![p0, p1, p2]
}
// After creating a block by function 1 on function 0, function 0 has an output blocker
#[test]
#[serial]
fn blocked_works() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    let _ = state.create_block(
        0,
        1,
        0,
        0,
        0,
        #[cfg(feature = "debugger")]
        &mut debugger,
    );

    let blockers = state.get_output_blockers(0);
    assert!(!blockers.is_empty());
}
// Looking up function 1 returns the function whose id is 1
#[test]
fn get_works() {
    let submission = super::test_submission(test_functions());
    let state = RunState::new(submission);

    let got = state.get_function(1)
        .ok_or("Could not get function by id").expect("Could not get function with that id");

    assert_eq!(got.id(), 1)
}
// Without init, no job has been created so there is no next job
#[test]
fn no_next_if_none_ready() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);

    let next = state.get_next_job();
    assert!(next.is_none());
}
// After jobs are created for function 0, a next job is available
#[test]
fn next_works() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);

    state.create_jobs_or_block(0, 0).expect("Could not make ready or blocked");

    state.get_next_job().expect("Couldn't get next job");
}
// Function 0 has no inputs, so creating jobs for it makes a job available
#[test]
fn inputs_ready_makes_ready() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);

    state.create_jobs_or_block(0, 0).expect("Could not make ready or blocked");

    state.get_next_job().expect("Couldn't get next job");
}
// With a block on function 0, asking for its jobs marks it blocked and no job is ready
#[test]
#[serial]
fn blocked_is_not_ready() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    let _ = state.create_block(
        0,
        1,
        0,
        0,
        0,
        #[cfg(feature = "debugger")]
        &mut debugger,
    );
    state.create_jobs_or_block(0, 0).expect("Could not make ready or blocked");

    let next = state.get_next_job();
    assert!(next.is_none());
}
// Function 0 is blocked by both function 1 and function 2: no job is ready for it, before
// or after a flow block is registered for function 1
#[test]
#[serial]
fn unblocking_doubly_blocked_functions_not_ready() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    // Block by function 1 on function 0
    let _ = state.create_block(
        0,
        1,
        0,
        0,
        0,
        #[cfg(feature = "debugger")]
        &mut debugger,
    );
    // Block by function 2 on function 0
    let _ = state.create_block(
        0,
        2,
        0,
        0,
        0,
        #[cfg(feature = "debugger")]
        &mut debugger,
    );

    state.create_jobs_or_block(0, 0).expect("Could not make ready or blocked");
    assert!(state.get_next_job().is_none());

    state.block_external_flow_senders(0, 1, 0)
        .expect("Could not unblock");
    assert!(state.get_next_job().is_none());
}
// After init exactly one job is available: a second request returns None
#[test]
#[serial]
fn wont_return_too_many_jobs() {
    let submission = super::test_submission(test_functions());
    let mut state = RunState::new(submission);
    state.init().expect("Could not init state");

    let _ = state.get_next_job().expect("Couldn't get next job");

    let second = state.get_next_job();
    assert!(second.is_none(), "Did not expect a Ready job!");
}
// A job of a function with no output connections can be started and retired without error
#[test]
#[serial]
fn pure_function_no_destinations() {
    let f_a = super::test_function_a_init();
    let _id = f_a.id();
    let mut state = RunState::new(super::test_submission(vec![f_a]));
    #[cfg(feature = "metrics")]
    let mut metrics = Metrics::new(1);
    #[cfg(feature = "debugger")]
    let mut server = super::DummyServer{};
    #[cfg(feature = "debugger")]
    let mut debugger = super::dummy_debugger(&mut server);

    state.init().expect("Could not init state");
    let started = state.get_next_job().expect("Couldn't get next job");
    state.start_job(started.clone()).expect("Could not start job");

    state.retire_a_job(
        #[cfg(feature = "metrics")]
        &mut metrics,
        (started.payload.job_id, started.result),
        #[cfg(feature = "debugger")]
        &mut debugger,
    ).expect("Failed to retire job correctly");
}
}
}