gears/runtime/
xfrunner.rs1use crate::structure::xflow::*;
2use super::xfstate::*;
3use crate::runtime::dispatcher::*;
4
/// Lifecycle status of an `XFlowRunner`.
///
/// The enum carries no data, so it is `Copy`; callers can read the status out
/// of a runner and compare or store it without borrowing the runner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum XFlowStatus {
    /// Not assigned by the runner code visible in this file.
    Uninitialized,
    /// The runner has been constructed and has not executed a step yet.
    Initialized,
    /// At least one node has been dispatched and the run has not ended.
    Running,
    /// The run reached its end; the only status `is_completed_ok` accepts.
    Finished,
    /// Terminal status counted as completed (but not OK) by `is_completed`.
    Aborted,
    /// Terminal status counted as completed (but not OK) by `is_completed`.
    TimedOut,
    /// Terminal status recorded when the runner cannot determine the next
    /// node to visit.
    InvalidState,
}
15
16pub struct XFlowRunner<'a> {
17 pub status: XFlowStatus,
18 xflow: &'a XFlowDocument,
19 dispatcher: &'a Dispatcher<'a>,
20 state: XFState,
21 current_node: Option<&'a XFlowNode>,
22 pub output: Option<Vec<XFlowValue>>,
23}
24
25impl<'a> XFlowRunner<'a> {
26 pub fn new(
27 xflow: &'a XFlowDocument,
28 dispatcher: &'a Dispatcher<'a>,
29 input: &'a XFState,
30 ) -> Result<XFlowRunner<'a>, String> {
31
32 let mut state = XFState::default();
33
34 for xvardef in &xflow.body.variables.input {
35 match input.get(&xvardef.name) {
36 Some(xvar) => state.add(xvar),
37 None => {
38 let err = format!(
39 "Missing required xvar in input parameters : {}",
40 xvardef.name
41 );
42 error!("{}", err);
43 return Err(err);
44 }
45 }
46 }
47
48 for xvar in &xflow.body.variables.local {
49 state.add(xvar);
50 }
51
52 match xflow.body.get_entry_node() {
53 Ok(node) => {
54 Ok(XFlowRunner {
55 status: XFlowStatus::Initialized,
56 xflow: xflow,
57 dispatcher: dispatcher,
58 state: state,
59 current_node: Some(node),
60 output: None,
61 })
62 }
63 _ => Err("Unable to init XFlowRunner".to_owned()),
64 }
65 }
66
67 pub fn can_run(&self) -> bool {
68 match self.status {
69 XFlowStatus::Initialized | XFlowStatus::Running => true,
70 _ => false,
71 }
72 }
73
74 pub fn is_initialized(&self) -> bool {
75 self.status == XFlowStatus::Initialized
76 }
77
78 pub fn is_completed(&self) -> bool {
79 self.status == XFlowStatus::Finished || self.status == XFlowStatus::Aborted ||
80 self.status == XFlowStatus::TimedOut || self.status == XFlowStatus::InvalidState
81 }
82
83 pub fn is_completed_ok(&self) -> bool {
84 self.status == XFlowStatus::Finished
85 }
86
87 pub fn run(&mut self) -> Result<(), String> {
88 while self.can_run() {
90 match self.step() {
91 Ok(()) => (),
92 Err(err) => {
93 error!("{}", err);
94 }
95 }
96 }
97 if self.is_completed_ok() {
98 Ok(())
99 } else {
100 let msg = "Unhandled error has occurred while running flow".to_owned();
101 Err(msg)
102 }
103 }
104
105 pub fn step(&mut self) -> Result<(), String> {
106 self.next_node();
107 self.run_node()
108 }
109
110 fn run_node(&mut self) -> Result<(), String> {
111 let st = &mut self.state;
112 if let Some(node) = self.current_node {
113 self.status = XFlowStatus::Running;
114 match self.dispatcher.dispatch(node, st) {
115 Ok(_) => Ok(()),
116 Err(err) => Err(err),
117 }
118 } else {
119 self.status = XFlowStatus::Finished;
120 Ok(())
121 }
122 }
123
124 fn next_node(&mut self) -> () {
125 if let Some(current_node) = self.current_node {
126 let edges = self.xflow.body.get_out_edges(current_node);
127 match edges.len() {
128 0 => {
129 self.status = XFlowStatus::InvalidState;
130 self.current_node = None;
131 }
132 1 => {
133 if let Some(edge) = edges.first() {
134 self.current_node = self.xflow.body.get_node_id(edge.1);
135 } else {
136 self.status = XFlowStatus::InvalidState;
137 self.current_node = None;
138 }
139 }
140 _ => {
141
142 let branches: Vec<&XFlowBranch> = self.xflow
147 .body
148 .get_out_branches(current_node.id)
149 .iter()
150 .filter({
151 |branch| {
152 let xv = self.state.get(&branch.xvar.name);
153 if let Some(xvar) = xv {
154 *xvar == branch.xvar
155 } else {
156 false
157 }
158 }
159 })
160 .cloned()
161 .collect();
162 match branches.len() {
163 1 => {
164 if let Some(branch) = branches.first() {
165 self.current_node = self.xflow.body.get_node_id(branch.edge.1);
166 } else {
167 self.status = XFlowStatus::InvalidState;
168 self.current_node = None;
169 }
170 }
171 _ => {
172 self.status = XFlowStatus::InvalidState;
173 self.current_node = None;
174 }
175 }
176 }
177 }
178 } else {
179 self.status = XFlowStatus::InvalidState;
180 self.current_node = None;
181 }
182 }
183
184 pub fn get_output(self) -> Result<XFState, String> {
185 if self.status == XFlowStatus::Finished {
186 let mut state = XFState::default();
187 for xvar_out in &self.xflow.body.variables.output {
188 if let Some(xvar_local) = self.state.get(&xvar_out.name) {
189 if xvar_local.vtype == xvar_out.vtype {
190 state.add(xvar_local);
191 } else {
192 let msg = format!(
193 "Output var '{}' has a different type than its local \
194 one",
195 &xvar_out.name
196 );
197
198 error!("{}", msg);
199 return Err(msg);
200 }
201 } else {
202 let msg = format!("Required var '{:?}' not found in state!", &xvar_out.name);
203 error!("{}", msg);
204 return Err(msg);
205 }
206 }
207 Ok(state)
208 } else {
209 let msg = "Called before xflow has finished!".to_owned();
210 error!("{}", msg);
211 Err(msg)
212 }
213 }
214}