flowrlib/
coordinator.rs

1#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
2use std::marker::PhantomData;
3
4use log::{debug, error, trace};
5
6use flowcore::errors::*;
7#[cfg(feature = "metrics")]
8use flowcore::model::metrics::Metrics;
9use flowcore::model::submission::Submission;
10
11#[cfg(feature = "debugger")]
12use crate::debugger::Debugger;
13use crate::dispatcher::Dispatcher;
14use crate::job::Job;
15use crate::run_state::RunState;
16#[cfg(feature = "debugger")]
17use crate::protocols::DebuggerProtocol;
18#[cfg(feature = "submission")]
19use crate::protocols::SubmissionProtocol;
20
21/// The `Coordinator` is responsible for coordinating the dispatching of jobs (consisting
22/// of a set of Input values and an Implementation of a Function) for execution,
23/// gathering the resulting Outputs and distributing output values to other connected function's
24/// Inputs.
25///
26/// It accepts Flows to be executed in the form of a `Submission` struct that has the required
27/// information to execute the flow.
28pub struct Coordinator<'a> {
29    /// A `Server` to communicate with clients
30    #[cfg(feature = "submission")]
31    submitter: &'a mut dyn SubmissionProtocol,
32    /// Dispatcher todispatch jobs for execution
33    dispatcher: Dispatcher,
34    #[cfg(feature = "debugger")]
35    /// A `Debugger` to communicate with debug clients
36    debugger: Debugger<'a>,
37    #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
38    _data: PhantomData<&'a Dispatcher>
39}
40
41impl<'a> Coordinator<'a> {
42    /// Create a new `coordinator` with `num_threads` local executor threads
43    pub fn new(
44        dispatcher: Dispatcher,
45        #[cfg(feature = "submission")] submitter: &'a mut dyn SubmissionProtocol,
46        #[cfg(feature = "debugger")] debug_server: &'a mut dyn DebuggerProtocol
47    ) -> Result<Self> {
48        Ok(Coordinator {
49            #[cfg(feature = "submission")]
50            submitter,
51            dispatcher,
52            #[cfg(feature = "debugger")]
53            debugger: Debugger::new(debug_server),
54            #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
55            _data: PhantomData
56        })
57    }
58
59    /// Enter a loop - waiting for a submission from the client, or disconnection of the client
60    #[cfg(feature = "submission")]
61    pub fn submission_loop(
62        &mut self,
63        loop_forever: bool,
64    ) -> Result<()> {
65        while let Some(submission) = self.submitter.wait_for_submission()? {
66            let _ = self.execute_flow(submission);
67            if !loop_forever {
68                break;
69            }
70        }
71
72        self.submitter.coordinator_is_exiting(Ok(()))
73    }
74
75    //noinspection RsReassignImmutable
76    /// Execute a flow by looping while there are jobs to be processed in an inner loop.
77    /// There is an outer loop for the case when you are using the debugger, to allow entering
78    /// the debugger when the flow ends and at any point resetting all the state and starting
79    /// execution again from the initial state
80    #[allow(unused_variables, unused_assignments, unused_mut)]
81    pub fn execute_flow(&mut self,
82                        submission: Submission,) -> Result<()> {
83        self.dispatcher.set_results_timeout(submission.job_timeout)?;
84        let mut state = RunState::new(submission);
85
86        #[cfg(feature = "metrics")]
87        let mut metrics = Metrics::new(state.num_functions());
88
89        #[cfg(feature = "debugger")]
90        if state.submission.debug {
91            self.debugger.start();
92        }
93
94        let mut restart = false;
95        let mut display_next_output = false;
96
97        // This outer loop is just a way of restarting execution from scratch if the debugger requests it
98        'flow_execution:
99        loop {
100            state.init()?;
101            #[cfg(feature = "metrics")]
102            metrics.reset();
103
104            // If debugging - then prior to starting execution - enter the debugger
105            #[cfg(feature = "debugger")]
106            if state.submission.debug {
107                (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
108            }
109
110            #[cfg(feature = "submission")]
111            self.submitter.flow_execution_starting()?;
112
113            'jobs: loop {
114                trace!("{}", state);
115                #[cfg(feature = "debugger")]
116                if state.submission.debug && self.submitter.should_enter_debugger()? {
117                    (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
118                    if restart {
119                        break 'jobs;
120                    }
121                }
122
123                (display_next_output, restart) = self.dispatch_jobs(
124                    &mut state,
125                    #[cfg(feature = "metrics")]
126                    &mut metrics,
127                )?;
128
129                if restart {
130                    break 'jobs;
131                }
132
133                if state.number_jobs_running() > 0 {
134                    match self.dispatcher.get_next_result() {
135                        Ok(result) => {
136                            let job;
137
138                            (display_next_output, restart, job) = state.retire_job(
139                                #[cfg(feature = "metrics")]
140                                    &mut metrics,
141                                result,
142                                #[cfg(feature = "debugger")]
143                                    &mut self.debugger,
144                            )?;
145                            #[cfg(feature = "debugger")]
146                            if display_next_output {
147                                (display_next_output, restart) =
148                                    self.debugger.job_done(&mut state, &job)?;
149                                if restart {
150                                    break 'jobs;
151                                }
152                            }
153                        }
154
155                        Err(err) => {
156                            error!("\t{}", err.to_string());
157                            #[cfg(feature = "debugger")]
158                            if state.submission.debug {
159                                (display_next_output, restart) = self.debugger.error(
160                                    &mut state, err.to_string())?;
161                                if restart {
162                                    break 'jobs;
163                                }
164                            }
165                        }
166                    }
167                }
168
169                if state.number_jobs_running() == 0 && state.number_jobs_ready() == 0 {
170                    // execution is done - but not returning here allows us to go into debugger
171                    // at the end of execution, inspect state and possibly reset and rerun
172                    break 'jobs;
173                }
174            } // 'jobs loop end
175
176            // flow execution has ended
177            #[allow(clippy::collapsible_if)]
178            #[cfg(feature = "debugger")]
179            if !restart {
180                {
181                    // If debugging then enter the debugger for a final time before ending flow execution
182                    if state.submission.debug {
183                        (display_next_output, restart) = self.debugger.execution_ended(&mut state)?;
184                    }
185                }
186            }
187
188            // if no debugger then end execution always
189            // if a debugger - then end execution if the debugger has not requested a restart
190            if !restart {
191                break 'flow_execution;
192            }
193        }
194
195        #[cfg(feature = "metrics")]
196        metrics.set_jobs_created(state.get_number_of_jobs_created());
197        #[cfg(all(feature = "submission", feature = "metrics"))]
198        self.submitter.flow_execution_ended(&state, metrics)?;
199        #[cfg(all(feature = "submission", not(feature = "metrics")))]
200        self.submitter.flow_execution_ended(&state)?;
201
202        Ok(()) // Normal flow completion exit
203    }
204
205    // Dispatch as many jobs as possible for parallel execution.
206    // Return if the debugger is requesting (display output, restart)
207    fn dispatch_jobs(
208        &mut self,
209        state: &mut RunState,
210        #[cfg(feature = "metrics")] metrics: &mut Metrics,
211    ) -> Result<(bool, bool)> {
212        let mut display_output = false;
213        let mut restart = false;
214
215        while let Some(job) = state.get_job() {
216            match self.dispatch_a_job(
217                job.clone(),
218                state,
219                #[cfg(feature = "metrics")]
220                metrics,
221            ) {
222                Ok((display, rest)) => {
223                    display_output = display;
224                    restart = rest;
225                }
226                Err(err) => {
227                    error!("Error sending on 'job_tx': {}", err.to_string());
228                    debug!("{}", state);
229
230                    #[cfg(feature = "debugger")]
231                    return self.debugger.job_error(state, &job); // TODO avoid above clone() ?
232                }
233            }
234        }
235
236        Ok((display_output, restart))
237    }
238
239    // Dispatch a job for execution
240    fn dispatch_a_job(
241        &mut self,
242        job: Job,
243        state: &mut RunState,
244        #[cfg(feature = "metrics")] metrics: &mut Metrics,
245    ) -> Result<(bool, bool)> {
246        #[cfg(not(feature = "debugger"))]
247        let debug_options = (false, false);
248
249        #[cfg(feature = "metrics")]
250        metrics.track_max_jobs(state.number_jobs_running());
251
252        #[cfg(feature = "debugger")]
253        let debug_options = self
254            .debugger
255            .check_prior_to_job(state, &job)?;
256
257        self.dispatcher.send_job_for_execution(&job.payload)?;
258
259        state.start_job(job)?;
260
261        Ok(debug_options)
262    }
263}