use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
use log::{debug, error, trace};
use multimap::MultiMap;
use serde_derive::{Deserialize, Serialize};
use serde_json::{json, Value};
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::output_connection::OutputConnection;
use flowcore::model::output_connection::Source::{Input, Output};
use flowcore::model::runtime_function::RuntimeFunction;
use flowcore::model::submission::Submission;
use crate::block::Block;
#[cfg(debug_assertions)]
use crate::checks;
#[cfg(feature = "debugger")]
use crate::debugger::Debugger;
use crate::job::Job;
#[cfg(any(debug_assertions, feature = "debugger", test))]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum State {
Ready,
Blocked,
Waiting,
Running,
Completed,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct RunState {
functions: Vec<RuntimeFunction>,
blocked: HashSet<usize>,
blocks: HashSet<Block>,
ready: VecDeque<usize>,
running: MultiMap<usize, usize>,
completed: HashSet<usize>,
number_of_jobs_created: usize,
max_pending_jobs: usize,
#[cfg(feature = "debugger")]
pub debug: bool,
pub job_timeout: Duration,
busy_flows: MultiMap<usize, usize>,
pending_unblocks: HashMap<usize, HashSet<usize>>,
}
impl RunState {
pub fn new(functions: &[RuntimeFunction], submission: Submission) -> Self {
RunState {
functions: functions.to_vec(),
blocked: HashSet::<usize>::new(),
blocks: HashSet::<Block>::new(),
ready: VecDeque::<usize>::new(),
running: MultiMap::<usize, usize>::new(),
completed: HashSet::<usize>::new(),
number_of_jobs_created: 0,
max_pending_jobs: submission.max_parallel_jobs,
#[cfg(feature = "debugger")]
debug: submission.debug,
job_timeout: submission.job_timeout,
busy_flows: MultiMap::<usize, usize>::new(),
pending_unblocks: HashMap::<usize, HashSet<usize>>::new(),
}
}
pub fn get_functions(&self) -> &Vec<RuntimeFunction> {
&self.functions
}
#[cfg(feature = "debugger")]
fn reset(&mut self) {
debug!("Resetting RunState");
for function in &mut self.functions {
function.reset()
}
self.blocked.clear();
self.blocks.clear();
self.ready.clear();
self.running.clear();
self.completed.clear();
self.number_of_jobs_created = 0;
self.busy_flows.clear();
self.pending_unblocks.clear();
}
pub fn init(&mut self) {
#[cfg(feature = "debugger")]
self.reset();
let mut inputs_ready_list = Vec::<(usize, usize)>::new();
debug!("Initializing inputs with initializers");
for function in &mut self.functions {
function.init_inputs(true);
if function.can_produce_output() {
inputs_ready_list.push((function.id(), function.get_flow_id()));
}
}
self.create_init_blocks();
debug!("Readying initial functions: inputs full and not blocked on output");
for (id, flow_id) in inputs_ready_list {
self.make_ready_or_blocked(id, flow_id);
}
}
fn create_init_blocks(&mut self) {
let mut blocks = HashSet::<Block>::new();
let mut blocked = HashSet::<usize>::new();
debug!("Creating any initial block entries that are needed");
for source_function in &self.functions {
let source_id = source_function.id();
let source_flow_id = source_function.get_flow_id();
let destinations = source_function.get_output_connections();
let source_has_inputs_full = source_function.can_produce_output();
for destination in destinations {
if destination.function_id != source_id {
let destination_function = self.get_function(destination.function_id);
if destination_function.input_count(destination.io_number) > 0 {
trace!(
"\tAdded block #{} -> #{}:{}",
source_id,
destination.function_id,
destination.io_number
);
blocks.insert(Block::new(
destination.flow_id,
destination.function_id,
destination.io_number,
source_id,
source_flow_id,
0
));
if source_has_inputs_full {
blocked.insert(source_id);
}
}
}
}
}
self.blocks = blocks;
self.blocked = blocked;
}
#[cfg(any(debug_assertions, feature = "debugger", test))]
pub fn get_function_states(&self, function_id: usize) -> Vec<State> {
let mut states = vec![];
if self.completed.contains(&function_id) {
states.push(State::Completed);
}
if self.ready.contains(&function_id) {
states.push(State::Ready);
}
if self.blocked.contains(&function_id) {
states.push(State::Blocked);
}
if self.running.contains_key(&function_id) {
states.push(State::Running);
}
if states.is_empty() {
states.push(State::Waiting);
}
states
}
#[cfg(any(debug_assertions, feature = "debugger", test))]
pub fn function_state_is_only(&self, function_id: usize, state: State) -> bool {
let function_states = self.get_function_states(function_id);
function_states.len() == 1 && function_states.contains(&state)
}
#[cfg(any(debug_assertions, feature = "debugger", test))]
pub fn function_states_includes(&self, function_id: usize, state: State) -> bool {
match state {
State::Ready => self.ready.contains(&function_id),
State::Blocked => self.blocked.contains(&function_id),
State::Running => self.running.contains_key(&function_id),
State::Completed => self.completed.contains(&function_id),
State::Waiting => {
!self.ready.contains(&function_id) &&
!self.blocked.contains(&function_id) &&
!self.running.contains_key(&function_id) &&
!self.completed.contains(&function_id)
}
}
}
#[cfg(feature = "debugger")]
pub fn get_blocked(&self) -> &HashSet<usize> {
&self.blocked
}
#[cfg(feature = "debugger")]
pub fn get_running(&self) -> &MultiMap<usize, usize> {
&self.running
}
#[cfg(feature = "debugger")]
pub fn get_completed(&self) -> &HashSet<usize> {
&self.completed
}
pub fn get_function(&self, id: usize) -> &RuntimeFunction {
&self.functions[id]
}
pub fn get_mut(&mut self, id: usize) -> &mut RuntimeFunction {
&mut self.functions[id]
}
#[cfg(debug_assertions)]
pub fn get_blocks(&self) -> &HashSet<Block> {
&self.blocks
}
#[cfg(debug_assertions)]
pub(crate) fn get_busy_flows(&self) -> &MultiMap<usize, usize> {
&self.busy_flows
}
#[cfg(debug_assertions)]
pub(crate) fn get_pending_unblocks(&self) -> &HashMap<usize, HashSet<usize>> {
&self.pending_unblocks
}
pub fn next_job(&mut self) -> Option<Job> {
if self.number_jobs_running() >= self.max_pending_jobs {
trace!("Max Pending Job count of {} reached, skipping new jobs", self.max_pending_jobs);
return None;
}
match self.ready.remove(0) {
Some(function_id) => {
let job = self.create_job(function_id);
if let Some(ref j) = job {
self.unblock_internal_flow_senders(j.job_id, j.function_id, j.flow_id);
}
job
}
None => None,
}
}
fn unblock_internal_flow_senders(
&mut self,
job_id: usize,
blocker_function_id: usize,
blocker_flow_id: usize,
) {
let internal_senders_filter = |block: &Block|
(block.blocking_flow_id == block.blocked_flow_id) &
(block.blocking_function_id == blocker_function_id);
self.unblock_senders_to_function(internal_senders_filter);
match self.pending_unblocks.entry(blocker_flow_id) {
Entry::Occupied(mut o) => {
o.get_mut().insert(blocker_function_id);
},
Entry::Vacant(v) => {
let mut new_set = HashSet::new();
new_set.insert(blocker_function_id);
v.insert(new_set);
}
}
trace!("Job #{job_id}:\t\tAdded a pending_unblock -> #{blocker_function_id}({blocker_flow_id})");
}
#[cfg(any(feature = "metrics", feature = "debugger"))]
pub fn get_number_of_jobs_created(&self) -> usize {
self.number_of_jobs_created
}
fn create_job(&mut self, function_id: usize) -> Option<Job> {
self.number_of_jobs_created += 1;
let job_id = self.number_of_jobs_created;
let function = self.get_mut(function_id);
match function.take_input_set() {
Ok(input_set) => {
let flow_id = function.get_flow_id();
let implementation = function.get_implementation();
let connections = function.get_output_connections().clone();
trace!("Job #{job_id}: NEW Job Created for Function #{function_id}({flow_id})");
Some(Job {
job_id,
function_id,
flow_id,
implementation,
input_set,
connections,
result: Ok((None, false)),
})
}
Err(e) => {
error!(
"Job #{}: Error '{}' while creating job for Function #{}",
job_id, e, function_id
);
None
}
}
}
pub fn complete_job(
&mut self,
#[cfg(feature = "metrics")] metrics: &mut Metrics,
job: &Job,
#[cfg(feature = "debugger")] debugger: &mut Debugger,
) {
self.running.retain(|&_, &job_id| job_id != job.job_id);
#[cfg(debug_assertions)]
let job_id = job.job_id;
match &job.result {
Ok(result) => {
let output_value = &result.0;
let function_can_run_again = result.1;
#[cfg(feature = "debugger")]
debug!("Job #{}: Function #{} '{}' {:?} -> {:?}", job.job_id, job.function_id,
self.get_function(job.function_id).name(), job.input_set, output_value);
#[cfg(not(feature = "debugger"))]
debug!("Job #{}: Function #{} {:?} -> {:?}", job.job_id, job.function_id,
job.input_set, output_value);
if let Some(output_v) = output_value {
for destination in &job.connections {
let value_to_send = match &destination.source {
Output(route) => output_v.pointer(route),
Input(index) => job.input_set.get(*index),
};
if let Some(value) = value_to_send {
self.send_a_value(
job.function_id,
job.flow_id,
destination,
value,
#[cfg(feature = "metrics")]
metrics,
#[cfg(feature = "debugger")]
debugger,
);
} else {
trace!(
"Job #{}:\t\tNo value found at '{}'",
job.job_id, &destination.source
);
}
}
}
if function_can_run_again {
self.init_inputs(job.function_id);
let function = self.get_function(job.function_id);
if function.can_produce_output() {
self.make_ready_or_blocked(
job.function_id,
job.flow_id,
);
}
} else {
self.mark_as_completed(job.function_id);
}
},
Err(e) => error!("Error in Job#{}: {}", job.job_id, e)
}
self.remove_from_busy(job.function_id);
self.unblock_flows(job.flow_id, job.job_id);
#[cfg(debug_assertions)]
checks::check_invariants(self, job_id);
trace!(
"Job #{}: Completed-----------------------",
job.job_id,
);
}
fn send_a_value(
&mut self,
source_id: usize,
source_flow_id: usize,
connection: &OutputConnection,
output_value: &Value,
#[cfg(feature = "metrics")] metrics: &mut Metrics,
#[cfg(feature = "debugger")] debugger: &mut Debugger,
) {
let route_str = match &connection.source {
Output(route) if route.is_empty() => "".into(),
Output(route) => format!(" from output route '{}'", route),
Input(index) => format!(" from Job value at input #{}", index),
};
let loopback = source_id == connection.function_id;
if loopback {
trace!("\t\tFunction #{source_id} loopback of '{}'{} to Self:{}",
output_value, route_str, connection.io_number);
} else {
trace!("\t\tFunction #{source_id} sending '{}'{} to Function #{}:{}",
output_value, route_str, connection.function_id, connection.io_number);
};
#[cfg(feature = "debugger")]
if let Output(route) = &connection.source {
debugger.check_prior_to_send(
self,
source_id,
route,
output_value,
connection.function_id,
connection.io_number,
);
}
let function = self.get_mut(connection.function_id);
let count_before = function.input_set_count();
Self::type_convert_and_send(function, connection, output_value);
#[cfg(feature = "metrics")]
metrics.increment_outputs_sent();
let block = function.input_count(connection.io_number) > 0;
let new_input_set_available = function.input_set_count() > count_before;
if block && !loopback {
self.create_block(
connection.flow_id,
connection.function_id,
connection.io_number,
source_id,
source_flow_id,
connection.get_priority(),
#[cfg(feature = "debugger")]
debugger,
);
}
if new_input_set_available && !loopback {
self.make_ready_or_blocked(connection.function_id, connection.flow_id);
}
}
fn init_inputs(&mut self, function_id: usize) {
self.get_mut(function_id).init_inputs(false);
}
fn array_order(value: &Value) -> i32 {
match value {
Value::Array(array) if !array.is_empty() => 1 + Self::array_order(&array[0]),
Value::Array(array) if array.is_empty() => 1,
_ => 0,
}
}
fn type_convert_and_send(
function: &mut RuntimeFunction,
connection: &OutputConnection,
value: &Value,
) -> bool {
if connection.is_generic() {
function.send(connection.io_number, value);
} else {
match (
(Self::array_order(value) - connection.destination_array_order),
value,
) {
(0, _) => function.send(connection.io_number, value),
(1, Value::Array(array)) => function.send_iter(connection.io_number,
array),
(2, Value::Array(array_2)) => {
for array in array_2.iter() {
if let Value::Array(sub_array) = array {
function.send_iter(connection.io_number, sub_array)
}
}
}
(-1, _) => function.send(connection.io_number, &json!([value])),
(-2, _) => function.send(connection.io_number, &json!([[value]])),
_ => {
error!("Unable to handle difference in array order");
return false;
},
}
}
true }
pub fn start(&mut self, job: &Job) {
self.running.insert(job.function_id, job.job_id);
}
#[cfg(feature = "debugger")]
pub fn get_output_blockers(&self, id: usize) -> Vec<(usize, usize)> {
let mut blockers = vec![];
for block in &self.blocks {
if block.blocked_function_id == id {
blockers.push((block.blocking_function_id, block.blocking_io_number));
}
}
blockers
}
pub fn number_jobs_running(&self) -> usize {
let mut num_running_jobs = 0;
for (_, vector) in self.running.iter_all() {
num_running_jobs += vector.len()
}
num_running_jobs
}
pub fn number_jobs_ready(&self) -> usize {
self.ready.len()
}
#[cfg(feature = "debugger")]
pub fn get_input_blockers(&self, target_id: usize) -> Vec<(usize, usize)> {
let mut input_blockers = vec![];
let target_function = self.get_function(target_id);
for (target_io, input) in target_function.inputs().iter().enumerate() {
if input.count() == 0 {
let mut senders = Vec::<(usize, usize)>::new();
for sender_function in &self.functions {
if !self.ready.contains(&sender_function.id()) {
for destination in sender_function.get_output_connections() {
if (destination.function_id == target_id)
&& (destination.io_number == target_io)
{
senders.push((sender_function.id(), target_io));
}
}
}
}
if senders.len() == 1 {
input_blockers.extend(senders);
}
}
}
input_blockers
}
fn make_ready_or_blocked(&mut self, id: usize, flow_id: usize) {
if self.blocked_sending(id) {
trace!( "\t\t\tFunction #{} blocked on output. State set to 'Blocked'", id);
self.blocked.insert(id);
} else {
trace!("\t\t\tFunction #{} not blocked on output. State set to 'Ready'", id);
self.mark_ready(id, flow_id);
}
}
fn mark_ready(&mut self, function_id: usize, flow_id: usize) {
self.ready.push_back(function_id);
self.busy_flows.insert(flow_id, function_id);
}
pub(crate) fn blocked_sending(&self, id: usize) -> bool {
for block in &self.blocks {
if block.blocked_function_id == id {
return true;
}
}
false
}
#[cfg(any(feature = "debugger", feature = "metrics"))]
pub fn num_functions(&self) -> usize {
self.functions.len()
}
fn unblock_flows(&mut self, blocker_flow_id: usize, job_id: usize) {
if self.busy_flows.get(&blocker_flow_id).is_none() {
trace!("Job #{job_id}:\tFlow #{blocker_flow_id} is now idle, \
so removing pending_unblocks for flow #{blocker_flow_id}");
if let Some(pending_unblocks) = self.pending_unblocks.remove(&blocker_flow_id) {
trace!("Job #{job_id}:\tRemoving pending unblocks to functions in \
Flow #{blocker_flow_id} from other flows");
for unblock_function_id in pending_unblocks {
let all = |block: &Block| block.blocking_function_id == unblock_function_id;
self.unblock_senders_to_function(all);
}
}
}
}
fn mark_as_completed(&mut self, function_id: usize) {
self.completed.insert(function_id);
}
fn remove_from_busy(&mut self, blocker_function_id: usize) {
let mut count = 0;
self.busy_flows.retain(|&_flow_id, &function_id| {
if function_id == blocker_function_id && count == 0 {
count += 1;
false } else {
true }
});
trace!("\t\t\tUpdated busy_flows list to: {:?}", self.busy_flows);
}
fn unblock_senders_to_function<F>(&mut self, block_filter: F)
where
F: Fn(&Block) -> bool,
{
let mut unblock_set = vec![];
for block in &self.blocks {
if block_filter(block) {
unblock_set.push(block.clone());
}
}
unblock_set.sort_by(|a, b| b.priority.cmp(&a.priority));
unblock_set.reverse();
for block in unblock_set {
self.blocks.remove(&block);
trace!("\t\t\tBlock removed {:?}", block);
if self.blocked.contains(&block.blocked_function_id) && !self.blocked_sending(block.blocked_function_id) {
trace!("\t\t\t\tFunction #{} removed from 'blocked' list", block.blocked_function_id);
self.blocked.remove(&block.blocked_function_id);
if self.get_function(block.blocked_function_id).can_produce_output() {
trace!("\t\t\t\tFunction #{} has inputs ready, so added to 'ready' list",
block.blocked_function_id);
self.mark_ready(block.blocked_function_id, block.blocked_flow_id);
}
}
}
}
#[allow(clippy::too_many_arguments)]
fn create_block(
&mut self,
blocking_flow_id: usize,
blocking_function_id: usize,
blocking_io_number: usize,
blocked_function_id: usize,
blocked_flow_id: usize,
priority: usize,
#[cfg(feature = "debugger")] debugger: &mut Debugger,
) {
let block = Block::new(
blocking_flow_id,
blocking_function_id,
blocking_io_number,
blocked_function_id,
blocked_flow_id,
priority,
);
trace!("\t\t\t\t\tCreating Block {:?}", block);
#[cfg(feature = "debugger")]
debugger.check_on_block_creation(self, &block);
self.blocks.insert(block);
}
}
impl fmt::Display for RunState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "RunState:")?;
writeln!(f, " Jobs Created: {}", self.number_of_jobs_created)?;
writeln!(f, ". Functions Blocked: {:?}", self.blocked)?;
writeln!(f, " Blocks: {:?}", self.blocks)?;
writeln!(f, " Functions Ready: {:?}", self.ready)?;
writeln!(f, " Functions Running: {:?}", self.running)?;
writeln!(f, "Functions Completed: {:?}", self.completed)?;
writeln!(f, " Flows Busy: {:?}", self.busy_flows)?;
write!(f, " Pending Unblocks: {:?}", self.pending_unblocks)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use serde_json::json;
use serde_json::Value;
use flowcore::{Implementation, RunAgain};
use flowcore::errors::Result;
use flowcore::model::input::Input;
use flowcore::model::input::InputInitializer::Once;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::runtime_function::RuntimeFunction;
#[cfg(feature = "debugger")]
use crate::block::Block;
#[cfg(feature = "debugger")]
use crate::debug_command::DebugCommand;
#[cfg(feature = "debugger")]
use crate::debugger::Debugger;
#[cfg(feature = "debugger")]
use crate::run_state::{RunState, State};
#[cfg(feature = "debugger")]
use crate::server::DebugServer;
use super::Job;
#[derive(Debug)]
struct TestImpl {}
impl Implementation for TestImpl {
fn run(&self, _inputs: &[Value]) -> Result<(Option<Value>, RunAgain)> {
unimplemented!()
}
}
fn test_impl() -> Arc<dyn Implementation> {
Arc::new(TestImpl {})
}
fn test_function_a_to_b_not_init() -> RuntimeFunction {
let connection_to_f1 = OutputConnection::new(
Source::default(),
1,
0,
0,
0,
false,
"/fB".to_string(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)],
0,
0,
&[connection_to_f1],
false,
) }
fn test_function_a_to_b() -> RuntimeFunction {
let connection_to_f1 = OutputConnection::new(
Source::default(),
1,
0,
0,
0,
false,
"/fB".to_string(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Once(json!(1))))],
0,
0,
&[connection_to_f1],
false,
) }
fn test_function_a_init() -> RuntimeFunction {
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Once(json!(1))))],
0,
0,
&[],
false,
)
}
fn test_function_b_not_init() -> RuntimeFunction {
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fB",
#[cfg(feature = "debugger")]
"/fB",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)],
1,
0,
&[],
false,
)
}
fn test_function_b_init() -> RuntimeFunction {
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fB",
#[cfg(feature = "debugger")]
"/fB",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Once(json!(1))))],
1,
0,
&[],
false,
)
}
fn test_output(source_function_id: usize, destination_function_id: usize) -> Job {
let out_conn = OutputConnection::new(
Source::default(),
destination_function_id,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
Job {
job_id: 1,
function_id: source_function_id,
flow_id: 0,
implementation: test_impl(),
input_set: vec![json!(1)],
result: Ok((Some(json!(1)), true)),
connections: vec![out_conn],
}
}
#[cfg(feature = "debugger")]
struct DummyServer;
#[cfg(feature = "debugger")]
impl DebugServer for DummyServer {
fn start(&mut self) {}
fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {}
fn block_breakpoint(&mut self, _block: &Block) {}
fn send_breakpoint(&mut self, _: &str, _source_process_id: usize, _output_route: &str, _value: &Value,
_destination_id: usize, _destination_name:&str, _input_name: &str, _input_number: usize) {}
fn job_error(&mut self, _job: &Job) {}
fn job_completed(&mut self, _job: &Job) {}
fn blocks(&mut self, _blocks: Vec<Block>) {}
fn outputs(&mut self, _output: Vec<OutputConnection>) {}
fn input(&mut self, _input: Input) {}
fn function_list(&mut self, _functions: &[RuntimeFunction]) {}
fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}
fn run_state(&mut self, _run_state: &RunState) {}
fn message(&mut self, _message: String) {}
fn panic(&mut self, _state: &RunState, _error_message: String) {}
fn debugger_exiting(&mut self) {}
fn debugger_resetting(&mut self) {}
fn debugger_error(&mut self, _error: String) {}
fn execution_starting(&mut self) {}
fn execution_ended(&mut self) {}
fn get_command(&mut self, _state: &RunState) -> Result<DebugCommand> {
unimplemented!();
}
}
#[cfg(feature = "debugger")]
fn dummy_debugger(server: &mut dyn DebugServer) -> Debugger {
Debugger::new(server)
}
mod general_run_state_tests {
#[cfg(feature = "debugger")]
use std::collections::HashSet;
#[cfg(feature = "debugger")]
use multimap::MultiMap;
use url::Url;
use flowcore::model::submission::Submission;
use super::super::RunState;
#[test]
fn display_run_state_test() {
let f_a = super::test_function_a_to_b();
let f_b = super::test_function_b_not_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
#[cfg(any(feature = "debugger", feature = "metrics"))]
assert_eq!(state.num_functions(), 2);
println!("Run state: {}", state);
}
#[cfg(feature = "metrics")]
#[test]
fn jobs_created_zero_at_init() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&[], submission);
state.init();
assert_eq!(0, state.get_number_of_jobs_created(), "At init jobs() should be 0");
assert_eq!(0, state.number_jobs_ready());
}
#[cfg(feature = "debugger")]
#[test]
fn zero_blocks_at_init() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&[], submission);
state.init();
assert_eq!(
&HashSet::new(),
state.get_blocks(),
"At init get_blocks() should be empty"
);
}
#[cfg(feature = "debugger")]
#[test]
fn zero_running_at_init() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&[], submission);
state.init();
assert_eq!(
&MultiMap::new(),
state.get_running(),
"At init get_running() should be empty"
);
}
#[cfg(feature = "debugger")]
#[test]
fn zero_blocked_at_init() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&[], submission);
state.init();
assert_eq!(
&HashSet::new(),
state.get_blocked(),
"At init get_blocked() should be empty"
);
}
}
mod state_transitions {
use serde_json::json;
use serial_test::serial;
use url::Url;
use flowcore::model::input::Input;
use flowcore::model::input::InputInitializer::{Always, Once};
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::output_connection::Source::Output;
use flowcore::model::runtime_function::RuntimeFunction;
use flowcore::model::submission::Submission;
use crate::run_state::test::test_function_b_not_init;
use super::super::Job;
use super::super::RunState;
use super::super::State;
#[test]
fn to_ready_1_on_init() {
let f_a = super::test_function_a_to_b();
let f_b = test_function_b_not_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
assert_eq!(1, state.number_jobs_ready());
assert!(
state.function_state_is_only(1, State::Waiting),
"f_b should be waiting for input"
);
}
#[test]
fn input_blocker() {
let f_a = super::test_function_a_to_b_not_init();
let f_b = test_function_b_not_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(
state.function_state_is_only(0, State::Waiting),
"f_a should be waiting for input"
);
assert!(
state.function_state_is_only(1, State::Waiting),
"f_b should be waiting for input"
);
#[cfg(feature = "debugger")]
assert!(
state.get_input_blockers(1).contains(&(0, 0)),
"f_b should be waiting for input from f_a"
)
}
#[test]
fn to_ready_2_on_init() {
let f_a = super::test_function_a_to_b();
let f_b = test_function_b_not_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
#[test]
fn to_ready_3_on_init() {
let f_a = super::test_function_a_init();
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
#[test]
fn to_blocked_on_init() {
let f_a = super::test_function_a_to_b();
let f_b = super::test_function_b_init();
let functions = vec![f_b, f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(1, State::Ready), "f_b should be Ready");
assert!(
state.function_state_is_only(0, State::Blocked),
"f_a should be in Blocked state"
);
#[cfg(feature = "debugger")]
assert!(
state.get_output_blockers(0).contains(&(1, 0)),
"f_a should be blocked by f_b, input 0"
);
}
fn test_function_a_not_init() -> RuntimeFunction {
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)],
0,
0,
&[],
false,
)
}
#[test]
fn to_waiting_on_init() {
let f_a = test_function_a_not_init();
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
}
#[test]
fn ready_to_running_on_next() {
let f_a = super::test_function_a_init();
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(
0, job.function_id,
"next_job() should return function_id = 0"
);
state.start(&job);
assert!(state.function_state_is_only(0, State::Running), "f_a should be Running");
}
#[test]
fn unready_not_to_running_on_next() {
let f_a = test_function_a_not_init();
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
state.init();
assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
assert!(state.next_job().is_none(), "next_job() should return None");
assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
}
#[serial]
#[test]
fn blocked_to_ready_on_done() {
let f_a = super::test_function_a_to_b();
let f_b = super::test_function_b_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(2);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(1, State::Ready), "f_b should be Ready");
assert!(
state.function_state_is_only(0, State::Blocked),
"f_a should be in Blocked state, by f_b"
);
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(
1, job.function_id,
"next() should return function_id=1 (f_b) for running"
);
state.start(&job);
assert!(state.function_state_is_only(1, State::Running), "f_b should be Running");
let output = super::test_output(1, 0);
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&output,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
#[test]
#[serial]
fn output_not_found() {
let f_a = super::test_function_a_to_b();
let f_b = super::test_function_b_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(2);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(1, State::Ready), "f_b should be Ready");
assert!(state.function_state_is_only(0, State::Blocked),
"f_a should be in Blocked state, by f_b"
);
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(
1, job.function_id,
"next() should return function_id=1 (f_b) for running"
);
state.start(&job);
assert!(state.function_state_is_only(1, State::Running), "f_b should be Running");
let mut output = super::test_output(1, 0);
let no_such_out_conn = OutputConnection::new(
Output("/fake".into()),
0,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
output.connections = vec![no_such_out_conn];
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&output,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
fn test_job() -> Job {
Job {
job_id: 1,
function_id: 0,
flow_id: 0,
implementation: super::test_impl(),
input_set: vec![json!(1)],
result: Ok((None, true)),
connections: vec![],
}
}
#[test]
#[serial]
fn running_to_ready_on_done() {
let f_a = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Always(json!(1))))],
0,
0,
&[],
false,
);
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(1);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(0, job.function_id, "next() should return function_id = 0");
state.start(&job);
assert!(state.function_state_is_only(0, State::Running), "f_a should be Running");
let job = test_job();
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&job,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(
state.function_state_is_only(0, State::Ready),
"f_a should be Ready again"
);
}
#[test]
#[serial]
fn running_to_waiting_on_done() {
let f_a = super::test_function_a_init();
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(1);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(0, job.function_id, "next() should return function_id = 0");
state.start(&job);
assert!(state.function_state_is_only(0, State::Running), "f_a should be Running");
let job = test_job();
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&job,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Waiting),
"f_a should be Waiting again"
);
}
#[test]
#[serial]
fn running_to_blocked_on_done() {
let out_conn = OutputConnection::new(
Source::default(),
1,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let f_a = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Always(json!(1))))],
0,
0,
&[out_conn],
false,
); let f_b = test_function_b_not_init();
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(1);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(
0, job.function_id,
"next() should return function_id=0 (f_a) for running"
);
state.start(&job);
assert!(state.function_state_is_only(0, State::Running), "f_a should be Running");
let output = super::test_output(0, 1);
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&output,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Blocked), "f_a should be Blocked");
}
#[test]
#[serial]
fn waiting_to_ready_on_input() {
let f_a = test_function_a_not_init();
let out_conn = OutputConnection::new(
Source::default(),
0,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let f_b = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fB",
#[cfg(feature = "debugger")]
"/fB",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)],
1,
0,
&[out_conn],
false,
);
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(1);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
let output = super::test_output(1, 0);
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&output,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
#[test]
#[serial]
fn waiting_to_blocked_on_input() {
let f_a = super::test_function_a_to_b_not_init();
let connection_to_f0 = OutputConnection::new(
Source::default(),
0,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let f_b = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fB",
#[cfg(feature = "debugger")]
"/fB",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Always(json!(1))))],
1,
0,
&[connection_to_f0],
false,
);
let functions = vec![f_a, f_b];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(1);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(1, State::Ready), "f_b should be Ready");
assert!(
state.function_state_is_only(0, State::Waiting),
"f_a should be in Waiting"
);
assert_eq!(
state.next_job().expect("Couldn't get next job").function_id,
1,
"next() should return function_id=1 (f_b) for running"
);
let output = super::test_output(1, 0);
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&output,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
}
#[test]
#[serial]
fn not_block_on_self() {
let connection_to_0 = OutputConnection::new(
Source::default(),
0,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let connection_to_1 = OutputConnection::new(
Source::default(),
1,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let f_a = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"fA",
#[cfg(feature = "debugger")]
"/fA",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&Some(Once(json!(1))))],
0,
0,
&[
connection_to_0, connection_to_1, ],
false,
);
let f_b = test_function_b_not_init();
let functions = vec![f_a, f_b]; let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(2);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert!(state.function_state_is_only(0, State::Ready), "f_a should be Ready");
assert!(state.function_state_is_only(1, State::Waiting),
"f_b should be in Waiting"
);
let mut job = state.next_job().expect("Couldn't get next job");
assert_eq!(job.function_id, 0, "Expected job with function_id=0");
job.result = Ok((Some(json!(1)), true));
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&job,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(1, State::Ready), "f_b should be Ready");
assert!(state.function_state_is_only(0, State::Blocked),
"f_a should be Blocked on f_b"
);
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(
job.function_id, 1,
"next() should return function_id=1 (f_b) for running"
);
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&job,
#[cfg(feature = "debugger")]
&mut debugger,
);
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(
job.function_id, 0,
"next() should return function_id=0 (f_a) for running"
);
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&job,
#[cfg(feature = "debugger")]
&mut debugger,
);
}
}
mod functional_tests {
use serde_json::json;
use serial_test::serial;
use url::Url;
use flowcore::model::input::Input;
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::runtime_function::RuntimeFunction;
use flowcore::model::submission::Submission;
use super::super::Job;
use super::super::RunState;
use super::super::State;
fn test_functions() -> Vec<RuntimeFunction> {
let out_conn1 = OutputConnection::new(
Source::default(),
1,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let out_conn2 = OutputConnection::new(
Source::default(),
2,
0,
0,
0,
false,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
let p0 = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"p0",
#[cfg(feature = "debugger")]
"/p0",
"file://fake/test/p0",
vec![], 0,
0,
&[out_conn1, out_conn2], false,
); let p1 = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"p1",
#[cfg(feature = "debugger")]
"/p1",
"file://fake/test/p1",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)], 1,
0,
&[],
false,
);
let p2 = RuntimeFunction::new(
#[cfg(feature = "debugger")]
"p2",
#[cfg(feature = "debugger")]
"/p2",
"file://fake/test/p2",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)], 2,
0,
&[],
false,
);
vec![p0, p1, p2]
}
#[test]
#[serial]
fn blocked_works() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.create_block(
0,
1,
0,
0,
0,
0,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.blocked_sending(0));
}
#[test]
fn get_works() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let state = RunState::new(&test_functions(), submission);
let got = state.get_function(1);
assert_eq!(got.id(), 1)
}
#[test]
fn no_next_if_none_ready() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
assert!(state.next_job().is_none());
}
#[test]
fn next_works() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
state.make_ready_or_blocked(0, 0);
assert_eq!(
state.next_job().expect("Couldn't get next job").function_id,
0
);
}
#[test]
fn inputs_ready_makes_ready() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
state.make_ready_or_blocked(0, 0);
assert_eq!(
state.next_job().expect("Couldn't get next job").function_id,
0
);
}
#[test]
#[serial]
fn blocked_is_not_ready() {
let submission = Submission::new(
&Url::parse("file://temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.create_block(
0,
1,
0,
0,
0,
0,
#[cfg(feature = "debugger")]
&mut debugger,
);
state.make_ready_or_blocked(0, 0);
assert!(state.next_job().is_none());
}
#[test]
#[serial]
fn unblocking_makes_ready() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.create_block(
0,
1,
0,
0,
0,
0,
#[cfg(feature = "debugger")]
&mut debugger,
);
state.make_ready_or_blocked(0, 0);
assert!(state.next_job().is_none());
state.unblock_internal_flow_senders(0, 1, 0);
assert_eq!(
state.next_job().expect("Couldn't get next job").function_id,
0
);
}
#[test]
#[serial]
fn unblocking_doubly_blocked_functions_not_ready() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.create_block(
0,
1,
0,
0,
0,
0,
#[cfg(feature = "debugger")]
&mut debugger,
);
state.create_block(
0,
2,
0,
0,
0,
0,
#[cfg(feature = "debugger")]
&mut debugger,
);
state.make_ready_or_blocked(0, 0);
assert!(state.next_job().is_none());
state.unblock_internal_flow_senders(0, 1, 0);
assert!(state.next_job().is_none());
}
#[test]
fn wont_return_too_many_jobs() {
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&test_functions(), submission);
state.make_ready_or_blocked(0, 0);
state.make_ready_or_blocked(1, 0);
let job = state.next_job().expect("Couldn't get next job");
assert_eq!(0, job.function_id);
state.start(&job);
assert!(state.next_job().is_none());
}
#[test]
#[serial]
fn pure_function_no_destinations() {
let f_a = super::test_function_a_init();
let functions = vec![f_a];
let submission = Submission::new(
&Url::parse("file:///temp/fake.toml").expect("Could not create Url"),
1,
#[cfg(feature = "debugger")]
true,
);
let mut state = RunState::new(&functions, submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(1);
#[cfg(feature = "debugger")]
let mut server = super::DummyServer{};
#[cfg(feature = "debugger")]
let mut debugger = super::dummy_debugger(&mut server);
state.init();
assert_eq!(
state.next_job().expect("Couldn't get next job").function_id,
0
);
let job = Job {
job_id: 0,
function_id: 0,
flow_id: 0,
implementation: super::test_impl(),
input_set: vec![json!(1)],
result: Ok((Some(json!(1)), true)),
connections: vec![],
};
state.complete_job(
#[cfg(feature = "metrics")]
&mut metrics,
&job,
#[cfg(feature = "debugger")]
&mut debugger,
);
assert!(state.function_state_is_only(0, State::Waiting), "f_a should be Waiting");
}
}
mod misc {
use serde_json::{json, Value};
use flowcore::model::input::Input;
use flowcore::model::output_connection::{OutputConnection, Source};
use flowcore::model::runtime_function::RuntimeFunction;
use super::super::RunState;
#[test]
fn test_array_order_0() {
let value = json!(1);
assert_eq!(RunState::array_order(&value), 0);
}
#[test]
fn test_array_order_1_empty_array() {
let value = json!([]);
assert_eq!(RunState::array_order(&value), 1);
}
#[test]
fn test_array_order_1() {
let value = json!([1, 2, 3]);
assert_eq!(RunState::array_order(&value), 1);
}
#[test]
fn test_array_order_2() {
let value = json!([[1, 2, 3], [2, 3, 4]]);
assert_eq!(RunState::array_order(&value), 2);
}
fn test_function() -> RuntimeFunction {
RuntimeFunction::new(
#[cfg(feature = "debugger")]
"test",
#[cfg(feature = "debugger")]
"/test",
"file://fake/test",
vec![Input::new(
#[cfg(feature = "debugger")] "",
&None)],
0,
0,
&[],
false,
)
}
#[test]
fn test_sending() {
#[derive(Debug)]
struct TestCase {
value: Value,
destination_is_generic: bool,
destination_array_order: i32,
value_expected: Value,
}
let test_cases = vec![
TestCase {
value: json!(1),
destination_is_generic: true,
destination_array_order: 0,
value_expected: json!(1),
},
TestCase {
value: json!([1]),
destination_is_generic: true,
destination_array_order: 0,
value_expected: json!([1]),
},
TestCase {
value: json!([[1, 2], [3, 4]]),
destination_is_generic: true,
destination_array_order: 0,
value_expected: json!([[1, 2], [3, 4]]),
},
TestCase {
value: json!(1),
destination_is_generic: false,
destination_array_order: 0,
value_expected: json!(1),
},
TestCase {
value: json!([1, 2]),
destination_is_generic: false,
destination_array_order: 0,
value_expected: json!(1),
},
TestCase {
value: json!([[1, 2], [3, 4]]),
destination_is_generic: false,
destination_array_order: 0,
value_expected: json!(1),
},
TestCase {
value: json!(1),
destination_is_generic: false,
destination_array_order: 1,
value_expected: json!([1]),
},
TestCase {
value: json!([1, 2]),
destination_is_generic: false,
destination_array_order: 1,
value_expected: json!([1, 2]),
},
TestCase {
value: json!([[1, 2], [3, 4]]),
destination_is_generic: false,
destination_array_order: 1,
value_expected: json!([1, 2]),
},
TestCase {
value: json!(1),
destination_is_generic: false,
destination_array_order: 2,
value_expected: json!([[1]]),
},
TestCase {
value: json!([1, 2]),
destination_is_generic: false,
destination_array_order: 2,
value_expected: json!([[1, 2]]),
},
TestCase {
value: json!([[1, 2], [3, 4]]),
destination_is_generic: false,
destination_array_order: 2,
value_expected: json!([[1, 2], [3, 4]]),
},
];
for test_case in test_cases {
let mut function = test_function();
let destination = OutputConnection::new(
Source::default(),
0,
0,
0,
test_case.destination_array_order,
test_case.destination_is_generic,
String::default(),
#[cfg(feature = "debugger")]
String::default(),
0,
);
assert!(RunState::type_convert_and_send(&mut function,
&destination, &test_case.value));
assert_eq!(
test_case.value_expected,
function
.take_input_set()
.expect("Couldn't get input set")
.remove(0)
);
}
}
}
}