// gears/runtime/xfrunner.rs

1use crate::structure::xflow::*;
2use super::xfstate::*;
3use crate::runtime::dispatcher::*;
4
/// Lifecycle status of an [`XFlowRunner`].
///
/// The enum is a plain tag without payload, so it is cheap to copy and can be
/// compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum XFlowStatus {
    /// The runner has not been set up yet.
    Uninitialized,
    /// The runner has been constructed and is ready to take its first step.
    Initialized,
    /// At least one node has been handed to the dispatcher and the flow has
    /// not reached a terminal status yet.
    Running,
    /// The flow ran to its end.
    Finished,
    /// The flow was stopped before it could finish.
    Aborted,
    /// The flow exceeded its allowed run time.
    TimedOut,
    /// The runner could not determine which node to execute next.
    InvalidState,
}
15
16pub struct XFlowRunner<'a> {
17    pub status: XFlowStatus,
18    xflow: &'a XFlowDocument,
19    dispatcher: &'a Dispatcher<'a>,
20    state: XFState,
21    current_node: Option<&'a XFlowNode>,
22    pub output: Option<Vec<XFlowValue>>,
23}
24
25impl<'a> XFlowRunner<'a> {
26    pub fn new(
27        xflow: &'a XFlowDocument,
28        dispatcher: &'a Dispatcher<'a>,
29        input: &'a XFState,
30    ) -> Result<XFlowRunner<'a>, String> {
31
32        let mut state = XFState::default();
33
34        for xvardef in &xflow.body.variables.input {
35            match input.get(&xvardef.name) {
36                Some(xvar) => state.add(xvar),
37                None => {
38                    let err = format!(
39                        "Missing required xvar in input parameters : {}",
40                        xvardef.name
41                    );
42                    error!("{}", err);
43                    return Err(err);
44                }
45            }
46        }
47
48        for xvar in &xflow.body.variables.local {
49            state.add(xvar);
50        }
51
52        match xflow.body.get_entry_node() {
53            Ok(node) => {
54                Ok(XFlowRunner {
55                    status: XFlowStatus::Initialized,
56                    xflow: xflow,
57                    dispatcher: dispatcher,
58                    state: state,
59                    current_node: Some(node),
60                    output: None,
61                })
62            }
63            _ => Err("Unable to init XFlowRunner".to_owned()),
64        }
65    }
66
67    pub fn can_run(&self) -> bool {
68        match self.status {
69            XFlowStatus::Initialized | XFlowStatus::Running => true,
70            _ => false,
71        }
72    }
73
74    pub fn is_initialized(&self) -> bool {
75        self.status == XFlowStatus::Initialized
76    }
77
78    pub fn is_completed(&self) -> bool {
79        self.status == XFlowStatus::Finished || self.status == XFlowStatus::Aborted ||
80            self.status == XFlowStatus::TimedOut || self.status == XFlowStatus::InvalidState
81    }
82
83    pub fn is_completed_ok(&self) -> bool {
84        self.status == XFlowStatus::Finished
85    }
86
87    pub fn run(&mut self) -> Result<(), String> {
88        // XXX: Clean up error handling
89        while self.can_run() {
90            match self.step() {
91                Ok(()) => (),
92                Err(err) => {
93                    error!("{}", err);
94                }
95            }
96        }
97        if self.is_completed_ok() {
98            Ok(())
99        } else {
100            let msg = "Unhandled error has occurred while running flow".to_owned();
101            Err(msg)
102        }
103    }
104
105    pub fn step(&mut self) -> Result<(), String> {
106        self.next_node();
107        self.run_node()
108    }
109
110    fn run_node(&mut self) -> Result<(), String> {
111        let st = &mut self.state;
112        if let Some(node) = self.current_node {
113            self.status = XFlowStatus::Running;
114            match self.dispatcher.dispatch(node, st) {
115                Ok(_) => Ok(()),
116                Err(err) => Err(err),
117            }
118        } else {
119            self.status = XFlowStatus::Finished;
120            Ok(())
121        }
122    }
123
124    fn next_node(&mut self) -> () {
125        if let Some(current_node) = self.current_node {
126            let edges = self.xflow.body.get_out_edges(current_node);
127            match edges.len() {
128                0 => {
129                    self.status = XFlowStatus::InvalidState;
130                    self.current_node = None;
131                }
132                1 => {
133                    if let Some(edge) = edges.first() {
134                        self.current_node = self.xflow.body.get_node_id(edge.1);
135                    } else {
136                        self.status = XFlowStatus::InvalidState;
137                        self.current_node = None;
138                    }
139                }
140                _ => {
141
142                    // XXX This branch matching is sloppy - it should happen on the node.parameters
143                    // settings
144                    //
145
146                    let branches: Vec<&XFlowBranch> = self.xflow
147                        .body
148                        .get_out_branches(current_node.id)
149                        .iter()
150                        .filter({
151                            |branch| {
152                                let xv = self.state.get(&branch.xvar.name);
153                                if let Some(xvar) = xv {
154                                    *xvar == branch.xvar
155                                } else {
156                                    false
157                                }
158                            }
159                        })
160                        .cloned()
161                        .collect();
162                    match branches.len() {
163                        1 => {
164                            if let Some(branch) = branches.first() {
165                                self.current_node = self.xflow.body.get_node_id(branch.edge.1);
166                            } else {
167                                self.status = XFlowStatus::InvalidState;
168                                self.current_node = None;
169                            }
170                        }
171                        _ => {
172                            self.status = XFlowStatus::InvalidState;
173                            self.current_node = None;
174                        }
175                    }
176                }
177            }
178        } else {
179            self.status = XFlowStatus::InvalidState;
180            self.current_node = None;
181        }
182    }
183
184    pub fn get_output(self) -> Result<XFState, String> {
185        if self.status == XFlowStatus::Finished {
186            let mut state = XFState::default();
187            for xvar_out in &self.xflow.body.variables.output {
188                if let Some(xvar_local) = self.state.get(&xvar_out.name) {
189                    if xvar_local.vtype == xvar_out.vtype {
190                        state.add(xvar_local);
191                    } else {
192                        let msg = format!(
193                            "Output var '{}' has a different type than its local \
194                                           one",
195                            &xvar_out.name
196                        );
197
198                        error!("{}", msg);
199                        return Err(msg);
200                    }
201                } else {
202                    let msg = format!("Required var '{:?}' not found in state!", &xvar_out.name);
203                    error!("{}", msg);
204                    return Err(msg);
205                }
206            }
207            Ok(state)
208        } else {
209            let msg = "Called before xflow has finished!".to_owned();
210            error!("{}", msg);
211            Err(msg)
212        }
213    }
214}