1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use log::error;
use crate::run_state::{RunState, State};
#[cfg(debug_assertions)]
fn runtime_error(state: &RunState, job_id: usize, message: &str, file: &str, line: u32) {
error!(
"Job #{}: Runtime error: at file: {}, line: {}\n\t\t{}",
job_id, file, line, message
);
error!("Job #{}: Error State - {}", job_id, state);
}
/// Check a number of "invariants" i.e. unbreakable rules about the state.
/// If one is found to be broken, report a runtime error explaining it, which may
/// trigger entry into the debugger.
#[cfg(debug_assertions)]
pub(crate) fn check_invariants(state: &RunState, job_id: usize) {
let functions = state.get_functions();
for function in functions {
let function_states = &state.get_function_states(function.id());
for function_state in function_states {
match function_state {
State::Ready => {
if !state.get_busy_flows().contains_key(&function.get_flow_id()) {
return runtime_error(
state,
job_id,
&format!(
"Function #{} is Ready, but Flow #{} is not busy",
function.id(),
function.get_flow_id()
),
file!(),
line!(),
);
}
}
State::Running => {
if !state.get_busy_flows().contains_key(&function.get_flow_id()) {
return runtime_error(
state,
job_id,
&format!(
"Function #{} is Running, but Flow #{} is not busy",
function.id(),
function.get_flow_id()
),
file!(),
line!(),
);
}
}
State::Blocked => {
if !state.blocked_sending(function.id()) {
return runtime_error(
state,
job_id,
&format!(
"Function #{} is in Blocked state, but no block exists",
function.id()
),
file!(),
line!(),
);
}
}
State::Waiting => {},
State::Completed => {
// If completed, should not be in any of the other states
if function_states.len() > 1
{
return runtime_error(
state,
job_id,
&format!(
"Function #{} has Completed, but also appears as Ready or Blocked or Running",
function.id(),
),
file!(),
line!(),
);
}
},
}
}
// State::Running is because functions with initializers auto-refill
// So they will show as inputs full, but not Ready or Blocked
if (!function.inputs().is_empty())
&& function.can_produce_output()
&& !(function_states.contains(&State::Ready)
|| function_states.contains(&State::Blocked)
|| function_states.contains(&State::Running))
{
return runtime_error(
state,
job_id,
&format!(
"Function #{} inputs have data, but it is not Ready or Blocked or Running",
function.id()),
file!(),
line!(),
);
}
}
// Check block invariants
for block in state.get_blocks() {
// function should not be blocked on itself
if block.blocked_function_id == block.blocking_function_id {
return runtime_error(
state,
job_id,
&format!(
"Block {} has same Function id as blocked and blocking",
block),
file!(),
line!(),
);
}
// For each block on a destination function, then either that input should be full or
// the function should be running in parallel with the one that just completed
// or it's flow should be busy and there should be a pending unblock on it
if let Some(function) = functions.get(block.blocking_function_id) {
if !(function.input_count(block.blocking_io_number) > 0
|| (state.get_busy_flows().contains_key(&block.blocking_flow_id)
&& state.get_pending_unblocks().contains_key(&block.blocking_flow_id)))
{
return runtime_error(
state,
job_id,
&format!("Block {} exists for function #{}, but Function #{}:{} input is not full",
block, block.blocking_function_id, block.blocking_function_id, block.blocking_io_number),
file!(), line!());
}
}
}
// Check pending unblock invariants
for pending_unblock_flow_id in state.get_pending_unblocks().keys() {
// flow it's in must be busy
if !state.get_busy_flows().contains_key(pending_unblock_flow_id) {
return runtime_error(
state,
job_id,
&format!(
"Pending Unblock exists for Flow #{}, but it is not busy",
pending_unblock_flow_id),
file!(),
line!(),
);
}
}
// Check busy flow invariants
for (flow_id, function_id) in state.get_busy_flows().iter() {
if !state.function_states_includes(*function_id, State::Ready) &&
!state.function_states_includes(*function_id, State::Running) {
return runtime_error(
state,
job_id,
&format!("Busy flow entry exists for Function #{} in Flow #{} but it's not Ready nor Running",
function_id, flow_id),
file!(), line!());
}
}
}