Skip to main content

flowrlib/
coordinator.rs

1#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
2use std::marker::PhantomData;
3
4use log::{debug, error, info, trace};
5use serde_json::Value;
6
7use flowcore::errors::Result;
8#[cfg(feature = "metrics")]
9use flowcore::model::metrics::Metrics;
10use flowcore::model::submission::Submission;
11use flowcore::RunAgain;
12
13#[cfg(feature = "debugger")]
14use crate::debugger::Debugger;
15#[cfg(feature = "debugger")]
16use crate::debugger_handler::DebuggerHandler;
17use crate::dispatcher::Dispatcher;
18use crate::job::Job;
19use crate::run_state::RunState;
20#[cfg(feature = "submission")]
21use crate::submission_handler::SubmissionHandler;
22
23/// The `Coordinator` coordinates the dispatching of jobs for flow execution.
24///
25/// A Job consists of a set of Input values and an Implementation of a Function for execution,
26/// gathering the resulting Outputs and distributing output values to other connected function's
27/// Inputs.
28///
29/// It accepts Flows to be executed in the form of a `Submission` struct that has the required
30/// information to execute the flow.
31pub struct Coordinator<'a> {
32    /// A `Server` to communicate with clients
33    #[cfg(feature = "submission")]
34    submission_handler: &'a mut dyn SubmissionHandler,
35    /// Dispatcher to dispatch jobs for execution
36    dispatcher: Dispatcher,
37    #[cfg(feature = "debugger")]
38    /// A `Debugger` to communicate with debug clients
39    debugger: Debugger<'a>,
40    #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
41    _data: PhantomData<&'a Dispatcher>,
42}
43
44impl<'a> Coordinator<'a> {
45    /// Create a new `coordinator` with `num_threads` local executor threads
46    pub fn new(
47        dispatcher: Dispatcher,
48        #[cfg(feature = "submission")] submitter: &'a mut dyn SubmissionHandler,
49        #[cfg(feature = "debugger")] debug_server: &'a mut dyn DebuggerHandler,
50    ) -> Self {
51        Coordinator {
52            #[cfg(feature = "submission")]
53            submission_handler: submitter,
54            dispatcher,
55            #[cfg(feature = "debugger")]
56            debugger: Debugger::new(debug_server),
57            #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
58            _data: PhantomData,
59        }
60    }
61
62    /// Enter a loop - waiting for a submission from the client, or disconnection of the client
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if there was some issue while waiting for a submission to be sent, usually
67    /// related to some networking issue, busy ports etc.
68    ///
69    #[cfg(feature = "submission")]
70    pub fn submission_loop(&mut self, loop_forever: bool) -> Result<()> {
71        while let Some(submission) = self.submission_handler.wait_for_submission()? {
72            let _ = self.execute_flow(submission);
73            if !loop_forever {
74                break;
75            }
76        }
77
78        self.submission_handler.coordinator_is_exiting(Ok(()))
79    }
80
81    //noinspection RsReassignImmutable
82    /// Execute a flow by looping while there are jobs to be processed in an inner loop.
83    /// There is an outer loop for the case when you are using the debugger, to allow entering
84    /// the debugger when the flow ends and at any point resetting all the state and starting
85    /// execution again from the initial state
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the execution of the flow did not complete normally.
90    ///
91    #[allow(unused_variables, unused_assignments, unused_mut)]
92    pub fn execute_flow(&mut self, submission: Submission) -> Result<()> {
93        self.dispatcher
94            .set_results_timeout(submission.job_timeout)?;
95        let mut state = RunState::new(submission);
96
97        #[cfg(feature = "metrics")]
98        let mut metrics = Metrics::new(state.num_functions());
99
100        #[cfg(feature = "debugger")]
101        if state.submission.debug_enabled {
102            self.debugger.start();
103        }
104
105        let mut restart = false;
106        let mut display_next_output = false;
107
108        // This outer loop is just a way of restarting execution from scratch if the debugger requests it
109        'flow_execution: loop {
110            state.init()?;
111            #[cfg(feature = "metrics")]
112            metrics.reset();
113
114            // If debugging - then prior to starting execution - enter the debugger
115            #[cfg(feature = "debugger")]
116            if state.submission.debug_enabled {
117                (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
118            }
119
120            #[cfg(feature = "submission")]
121            self.submission_handler.flow_execution_starting()?;
122
123            'jobs: loop {
124                trace!("{state}");
125
126                #[cfg(feature = "submission")]
127                if self.submission_handler.should_stop()? {
128                    break 'flow_execution;
129                }
130
131                #[cfg(feature = "debugger")]
132                if state.submission.debug_enabled
133                    && self.submission_handler.should_enter_debugger()?
134                {
135                    (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
136                    if restart {
137                        break 'jobs;
138                    }
139                }
140
141                (display_next_output, restart) = self.dispatch_jobs(
142                    &mut state,
143                    #[cfg(feature = "metrics")]
144                    &mut metrics,
145                )?;
146
147                if restart {
148                    break 'jobs;
149                }
150
151                (display_next_output, restart) = self.retire_jobs(
152                    &mut state,
153                    #[cfg(feature = "metrics")]
154                    &mut metrics,
155                )?;
156
157                #[cfg(feature = "submission")]
158                self.submission_handler
159                    .jobs_created(state.get_number_of_jobs_created());
160
161                if restart {
162                    break 'jobs;
163                }
164
165                if state.number_jobs_running() == 0 && state.number_jobs_ready() == 0 {
166                    // execution is done - but not returning here allows us to go into debugger
167                    // at the end of execution, inspect state and possibly reset and rerun
168                    break 'jobs;
169                }
170            } // jobs loop end
171
172            // flow execution has ended
173            #[allow(clippy::collapsible_if)]
174            #[cfg(feature = "debugger")]
175            if !restart {
176                {
177                    // If debugging then enter the debugger for a final time before ending flow execution
178                    if state.submission.debug_enabled {
179                        (display_next_output, restart) =
180                            self.debugger.execution_ended(&mut state)?;
181                    }
182                }
183            }
184
185            // if no debugger then end execution always
186            // if a debugger - then end execution if the debugger has not requested a restart
187            if !restart {
188                break 'flow_execution;
189            }
190        }
191
192        #[cfg(feature = "metrics")]
193        metrics.stop_timer();
194        #[cfg(feature = "metrics")]
195        metrics.set_jobs_created(state.get_number_of_jobs_created());
196        #[cfg(all(feature = "submission", feature = "metrics"))]
197        self.submission_handler
198            .flow_execution_ended(&state, metrics)?;
199        #[cfg(all(feature = "submission", not(feature = "metrics")))]
200        self.submitter.flow_execution_ended(&state)?;
201
202        Ok(()) // Normal flow completion exit
203    }
204
205    // Get a result back from an executor
206    #[allow(clippy::type_complexity)]
207    fn get_result(
208        &mut self,
209        state: &RunState,
210    ) -> Result<Option<(usize, Result<(Option<Value>, RunAgain)>)>> {
211        if let Ok(result) = self.dispatcher.get_next_result(false) {
212            return Ok(Some(result));
213        }
214
215        if state.number_jobs_ready() > 0 {
216            return Ok(None);
217        }
218
219        match self.dispatcher.get_next_result(true) {
220            Ok(result) => Ok(Some(result)),
221            Err(e) => Err(e),
222        }
223    }
224
225    // Retire as many jobs as possible, based on returned results.
226    // NOTE: This will block waiting for the last pending result
227    fn retire_jobs(
228        &mut self,
229        state: &mut RunState,
230        #[cfg(feature = "metrics")] metrics: &mut Metrics,
231    ) -> Result<(bool, bool)> {
232        let mut display_next_output = false;
233        let mut restart = false;
234
235        if state.number_jobs_running() > 0 {
236            match self.get_result(state) {
237                Ok(Some(result)) => {
238                    let job;
239
240                    (display_next_output, restart, job) = state.retire_a_job(
241                        #[cfg(feature = "metrics")]
242                        metrics,
243                        result,
244                        #[cfg(feature = "debugger")]
245                        &mut self.debugger,
246                    )?;
247                    #[cfg(feature = "debugger")]
248                    if display_next_output {
249                        (display_next_output, restart) = self.debugger.job_done(state, &job);
250                        if restart {
251                            return Ok((display_next_output, restart));
252                        }
253                    }
254                }
255
256                Ok(None) => {
257                    info!(
258                        "No result was immediately available, but jobs are ready to be dispatched.\
259                     So coordinator avoided blocking for result. Will be received next time around"
260                    );
261                }
262
263                Err(err) => {
264                    error!("\t{err}");
265                    #[cfg(feature = "debugger")]
266                    if state.submission.debug_enabled {
267                        return self.debugger.error(state, err.to_string());
268                    }
269                    return Ok((display_next_output, restart));
270                }
271            }
272        }
273
274        Ok((display_next_output, restart))
275    }
276
277    // Dispatch as many jobs as possible for parallel execution.
278    // Return if the debugger is requesting (display output, restart)
279    fn dispatch_jobs(
280        &mut self,
281        state: &mut RunState,
282        #[cfg(feature = "metrics")] metrics: &mut Metrics,
283    ) -> Result<(bool, bool)> {
284        let mut display_next_output = false;
285        let mut restart = false;
286
287        while let Some(job) = state.get_next_job() {
288            match self.dispatch_a_job(
289                job.clone(),
290                state,
291                #[cfg(feature = "metrics")]
292                metrics,
293            ) {
294                Ok((display, rest)) => {
295                    display_next_output = display;
296                    restart = rest;
297                }
298                Err(err) => {
299                    error!("Error sending on 'job_tx': {err}");
300                    debug!("{state}");
301
302                    #[cfg(feature = "debugger")]
303                    return self.debugger.job_error(state, &job); // TODO avoid above clone() ?
304                }
305            }
306        }
307
308        Ok((display_next_output, restart))
309    }
310
311    // Dispatch a job for execution
312    fn dispatch_a_job(
313        &mut self,
314        job: Job,
315        state: &mut RunState,
316        #[cfg(feature = "metrics")] metrics: &mut Metrics,
317    ) -> Result<(bool, bool)> {
318        #[cfg(not(feature = "debugger"))]
319        let debug_options = (false, false);
320
321        #[cfg(feature = "debugger")]
322        let debug_options = self.debugger.check_prior_to_job(state, &job)?;
323
324        self.dispatcher.send_job_for_execution(&job.payload)?;
325
326        state.start_job(job);
327
328        #[cfg(feature = "metrics")]
329        metrics.track_max_jobs(state.number_jobs_running());
330
331        Ok(debug_options)
332    }
333}
334
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use std::time::Duration;

    use portpicker::pick_unused_port;
    use serial_test::serial;

    use flowcore::model::flow_manifest::FlowManifest;
    use flowcore::model::input::Input;
    use flowcore::model::metadata::MetaData;
    #[cfg(feature = "metrics")]
    use flowcore::model::metrics::Metrics;
    use flowcore::model::output_connection::OutputConnection;
    use flowcore::model::runtime_function::RuntimeFunction;
    use flowcore::model::submission::Submission;

    #[cfg(feature = "submission")]
    use crate::submission_handler::SubmissionHandler;

    #[cfg(feature = "debugger")]
    use crate::block::Block;
    #[cfg(feature = "debugger")]
    use crate::debug_command::DebugCommand;
    #[cfg(feature = "debugger")]
    use crate::debugger_handler::DebuggerHandler;
    #[cfg(feature = "debugger")]
    use crate::job::Job;
    #[cfg(feature = "debugger")]
    use crate::run_state::State;

    use super::Coordinator;
    use crate::dispatcher::Dispatcher;
    use crate::run_state::RunState;

    /// Turn four port numbers into `tcp://*:<port>` bind addresses, keeping their order
    fn get_bind_addresses(ports: (u16, u16, u16, u16)) -> (String, String, String, String) {
        let bind_address = |port: u16| format!("tcp://*:{}", port);
        (
            bind_address(ports.0),
            bind_address(ports.1),
            bind_address(ports.2),
            bind_address(ports.3),
        )
    }

    /// Pick four currently unused ports, panicking if none are available
    fn get_four_ports() -> (u16, u16, u16, u16) {
        let free_port = || pick_unused_port().expect("No ports free");
        (free_port(), free_port(), free_port(), free_port())
    }

    /// Metadata used for every manifest built by these tests
    fn test_meta_data() -> MetaData {
        MetaData {
            name: "test".into(),
            version: "0.0.0".into(),
            description: "a test".into(),
            authors: vec!["me".into()],
        }
    }

    /// Build a manifest holding `functions`, added in the order given
    fn test_manifest(functions: Vec<RuntimeFunction>) -> FlowManifest {
        let mut manifest = FlowManifest::new(test_meta_data());
        functions.into_iter().for_each(|function| {
            manifest.add_function(function);
        });
        manifest
    }

    /// A submission of `functions` with no max parallel jobs value and a 100ms job timeout
    /// (and, with the `debugger` feature, its debug flag argument set to `false`)
    fn test_submission(functions: Vec<RuntimeFunction>) -> Submission {
        let job_timeout = Some(Duration::from_millis(100));
        Submission::new(
            test_manifest(functions),
            None,
            job_timeout,
            #[cfg(feature = "debugger")]
            false,
        )
    }

    /// A `Dispatcher` bound to four freshly picked ports
    fn test_dispatcher() -> Dispatcher {
        let addresses = get_bind_addresses(get_four_ports());
        Dispatcher::new(&addresses).expect("Could not create dispatcher")
    }

    /// Build a `Coordinator` backed by the dummy handlers for the enabled features and
    /// hand it to `test_body`. The handlers live on this stack frame for the duration.
    fn with_coordinator(test_body: impl FnOnce(&mut Coordinator<'_>)) {
        let dispatcher = test_dispatcher();
        #[cfg(feature = "submission")]
        let mut submission_handler = DummySubmissionHandler;
        #[cfg(feature = "debugger")]
        let mut debug_server = DummyDebugServer;

        let mut coordinator = Coordinator::new(
            dispatcher,
            #[cfg(feature = "submission")]
            &mut submission_handler,
            #[cfg(feature = "debugger")]
            &mut debug_server,
        );

        test_body(&mut coordinator);
    }

    /// Debug server that ignores every notification and always answers `Continue`
    #[cfg(feature = "debugger")]
    struct DummyDebugServer;

    #[cfg(feature = "debugger")]
    impl DebuggerHandler for DummyDebugServer {
        fn start(&mut self) {}
        fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {
        }
        fn block_breakpoint(&mut self, _block: &Block) {}
        fn flow_unblock_breakpoint(&mut self, _flow_id: usize) {}
        fn send_breakpoint(
            &mut self,
            _: &str,
            _source_process_id: usize,
            _output_route: &str,
            _value: &serde_json::Value,
            _destination_id: usize,
            _destination_name: &str,
            _input_name: &str,
            _input_number: usize,
        ) {
        }
        fn job_error(&mut self, _job: &Job) {}
        fn job_completed(&mut self, _job: &Job) {}
        fn blocks(&mut self, _blocks: Vec<Block>) {}
        fn outputs(&mut self, _output: Vec<OutputConnection>) {}
        fn input(&mut self, _input: Input) {}
        fn function_list(&mut self, _functions: &[RuntimeFunction]) {}
        fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}
        fn run_state(&mut self, _run_state: &RunState) {}
        fn message(&mut self, _message: String) {}
        fn panic(&mut self, _state: &RunState, _error_message: String) {}
        fn debugger_exiting(&mut self) {}
        fn debugger_resetting(&mut self) {}
        fn debugger_error(&mut self, _error: String) {}
        fn execution_starting(&mut self) {}
        fn execution_ended(&mut self) {}
        fn get_command(&mut self, _state: &RunState) -> flowcore::errors::Result<DebugCommand> {
            Ok(DebugCommand::Continue)
        }
    }

    /// Submission handler that never provides a submission and accepts every notification
    #[cfg(feature = "submission")]
    struct DummySubmissionHandler;

    #[cfg(feature = "submission")]
    impl SubmissionHandler for DummySubmissionHandler {
        fn flow_execution_starting(&mut self) -> flowcore::errors::Result<()> {
            Ok(())
        }

        #[cfg(feature = "debugger")]
        fn should_enter_debugger(&mut self) -> flowcore::errors::Result<bool> {
            Ok(false)
        }

        fn flow_execution_ended(
            &mut self,
            _state: &RunState,
            #[cfg(feature = "metrics")] _metrics: Metrics,
        ) -> flowcore::errors::Result<()> {
            Ok(())
        }

        fn wait_for_submission(&mut self) -> flowcore::errors::Result<Option<Submission>> {
            Ok(None)
        }

        fn coordinator_is_exiting(
            &mut self,
            result: flowcore::errors::Result<()>,
        ) -> flowcore::errors::Result<()> {
            result
        }
    }

    #[test]
    #[serial]
    fn create_coordinator() {
        with_coordinator(|_coordinator| {});
    }

    #[test]
    #[serial]
    fn execute_empty_flow() {
        with_coordinator(|coordinator| {
            let outcome = coordinator.execute_flow(test_submission(vec![]));
            assert!(outcome.is_ok(), "Empty flow should execute successfully");
        });
    }

    #[test]
    #[serial]
    fn execute_empty_flow_with_no_timeout() {
        with_coordinator(|coordinator| {
            let untimed = Submission::new(
                test_manifest(vec![]),
                None,
                None,
                #[cfg(feature = "debugger")]
                false,
            );
            let outcome = coordinator.execute_flow(untimed);
            assert!(
                outcome.is_ok(),
                "Empty flow with no timeout should execute successfully"
            );
        });
    }

    #[test]
    #[serial]
    fn execute_empty_flow_with_max_parallel_jobs() {
        with_coordinator(|coordinator| {
            let limited = Submission::new(
                test_manifest(vec![]),
                Some(4),
                Some(Duration::from_millis(100)),
                #[cfg(feature = "debugger")]
                false,
            );
            let outcome = coordinator.execute_flow(limited);
            assert!(
                outcome.is_ok(),
                "Empty flow with max_parallel_jobs should execute successfully"
            );
        });
    }

    #[cfg(feature = "submission")]
    #[test]
    #[serial]
    fn submission_loop_no_submission() {
        with_coordinator(|coordinator| {
            let outcome = coordinator.submission_loop(false);
            assert!(
                outcome.is_ok(),
                "submission_loop should return Ok when no submission is available"
            );
        });
    }
}