1#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
2use std::marker::PhantomData;
3
4use log::{debug, error, info, trace};
5use serde_json::Value;
6
7use flowcore::errors::Result;
8#[cfg(feature = "metrics")]
9use flowcore::model::metrics::Metrics;
10use flowcore::model::submission::Submission;
11use flowcore::RunAgain;
12
13#[cfg(feature = "debugger")]
14use crate::debugger::Debugger;
15#[cfg(feature = "debugger")]
16use crate::debugger_handler::DebuggerHandler;
17use crate::dispatcher::Dispatcher;
18use crate::job::Job;
19use crate::run_state::RunState;
20#[cfg(feature = "submission")]
21use crate::submission_handler::SubmissionHandler;
22
23pub struct Coordinator<'a> {
32 #[cfg(feature = "submission")]
34 submission_handler: &'a mut dyn SubmissionHandler,
35 dispatcher: Dispatcher,
37 #[cfg(feature = "debugger")]
38 debugger: Debugger<'a>,
40 #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
41 _data: PhantomData<&'a Dispatcher>,
42}
43
44impl<'a> Coordinator<'a> {
45 pub fn new(
47 dispatcher: Dispatcher,
48 #[cfg(feature = "submission")] submitter: &'a mut dyn SubmissionHandler,
49 #[cfg(feature = "debugger")] debug_server: &'a mut dyn DebuggerHandler,
50 ) -> Self {
51 Coordinator {
52 #[cfg(feature = "submission")]
53 submission_handler: submitter,
54 dispatcher,
55 #[cfg(feature = "debugger")]
56 debugger: Debugger::new(debug_server),
57 #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
58 _data: PhantomData,
59 }
60 }
61
62 #[cfg(feature = "submission")]
70 pub fn submission_loop(&mut self, loop_forever: bool) -> Result<()> {
71 while let Some(submission) = self.submission_handler.wait_for_submission()? {
72 let _ = self.execute_flow(submission);
73 if !loop_forever {
74 break;
75 }
76 }
77
78 self.submission_handler.coordinator_is_exiting(Ok(()))
79 }
80
81 #[allow(unused_variables, unused_assignments, unused_mut)]
92 pub fn execute_flow(&mut self, submission: Submission) -> Result<()> {
93 self.dispatcher
94 .set_results_timeout(submission.job_timeout)?;
95 let mut state = RunState::new(submission);
96
97 #[cfg(feature = "metrics")]
98 let mut metrics = Metrics::new(state.num_functions());
99
100 #[cfg(feature = "debugger")]
101 if state.submission.debug_enabled {
102 self.debugger.start();
103 }
104
105 let mut restart = false;
106 let mut display_next_output = false;
107
108 'flow_execution: loop {
110 state.init()?;
111 #[cfg(feature = "metrics")]
112 metrics.reset();
113
114 #[cfg(feature = "debugger")]
116 if state.submission.debug_enabled {
117 (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
118 }
119
120 #[cfg(feature = "submission")]
121 self.submission_handler.flow_execution_starting()?;
122
123 'jobs: loop {
124 trace!("{state}");
125
126 #[cfg(feature = "submission")]
127 if self.submission_handler.should_stop()? {
128 break 'flow_execution;
129 }
130
131 #[cfg(feature = "debugger")]
132 if state.submission.debug_enabled
133 && self.submission_handler.should_enter_debugger()?
134 {
135 (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
136 if restart {
137 break 'jobs;
138 }
139 }
140
141 (display_next_output, restart) = self.dispatch_jobs(
142 &mut state,
143 #[cfg(feature = "metrics")]
144 &mut metrics,
145 )?;
146
147 if restart {
148 break 'jobs;
149 }
150
151 (display_next_output, restart) = self.retire_jobs(
152 &mut state,
153 #[cfg(feature = "metrics")]
154 &mut metrics,
155 )?;
156
157 #[cfg(feature = "submission")]
158 self.submission_handler
159 .jobs_created(state.get_number_of_jobs_created());
160
161 if restart {
162 break 'jobs;
163 }
164
165 if state.number_jobs_running() == 0 && state.number_jobs_ready() == 0 {
166 break 'jobs;
169 }
170 } #[allow(clippy::collapsible_if)]
174 #[cfg(feature = "debugger")]
175 if !restart {
176 {
177 if state.submission.debug_enabled {
179 (display_next_output, restart) =
180 self.debugger.execution_ended(&mut state)?;
181 }
182 }
183 }
184
185 if !restart {
188 break 'flow_execution;
189 }
190 }
191
192 #[cfg(feature = "metrics")]
193 metrics.stop_timer();
194 #[cfg(feature = "metrics")]
195 metrics.set_jobs_created(state.get_number_of_jobs_created());
196 #[cfg(all(feature = "submission", feature = "metrics"))]
197 self.submission_handler
198 .flow_execution_ended(&state, metrics)?;
199 #[cfg(all(feature = "submission", not(feature = "metrics")))]
200 self.submitter.flow_execution_ended(&state)?;
201
202 Ok(()) }
204
205 #[allow(clippy::type_complexity)]
207 fn get_result(
208 &mut self,
209 state: &RunState,
210 ) -> Result<Option<(usize, Result<(Option<Value>, RunAgain)>)>> {
211 if let Ok(result) = self.dispatcher.get_next_result(false) {
212 return Ok(Some(result));
213 }
214
215 if state.number_jobs_ready() > 0 {
216 return Ok(None);
217 }
218
219 match self.dispatcher.get_next_result(true) {
220 Ok(result) => Ok(Some(result)),
221 Err(e) => Err(e),
222 }
223 }
224
225 fn retire_jobs(
228 &mut self,
229 state: &mut RunState,
230 #[cfg(feature = "metrics")] metrics: &mut Metrics,
231 ) -> Result<(bool, bool)> {
232 let mut display_next_output = false;
233 let mut restart = false;
234
235 if state.number_jobs_running() > 0 {
236 match self.get_result(state) {
237 Ok(Some(result)) => {
238 let job;
239
240 (display_next_output, restart, job) = state.retire_a_job(
241 #[cfg(feature = "metrics")]
242 metrics,
243 result,
244 #[cfg(feature = "debugger")]
245 &mut self.debugger,
246 )?;
247 #[cfg(feature = "debugger")]
248 if display_next_output {
249 (display_next_output, restart) = self.debugger.job_done(state, &job);
250 if restart {
251 return Ok((display_next_output, restart));
252 }
253 }
254 }
255
256 Ok(None) => {
257 info!(
258 "No result was immediately available, but jobs are ready to be dispatched.\
259 So coordinator avoided blocking for result. Will be received next time around"
260 );
261 }
262
263 Err(err) => {
264 error!("\t{err}");
265 #[cfg(feature = "debugger")]
266 if state.submission.debug_enabled {
267 return self.debugger.error(state, err.to_string());
268 }
269 return Ok((display_next_output, restart));
270 }
271 }
272 }
273
274 Ok((display_next_output, restart))
275 }
276
277 fn dispatch_jobs(
280 &mut self,
281 state: &mut RunState,
282 #[cfg(feature = "metrics")] metrics: &mut Metrics,
283 ) -> Result<(bool, bool)> {
284 let mut display_next_output = false;
285 let mut restart = false;
286
287 while let Some(job) = state.get_next_job() {
288 match self.dispatch_a_job(
289 job.clone(),
290 state,
291 #[cfg(feature = "metrics")]
292 metrics,
293 ) {
294 Ok((display, rest)) => {
295 display_next_output = display;
296 restart = rest;
297 }
298 Err(err) => {
299 error!("Error sending on 'job_tx': {err}");
300 debug!("{state}");
301
302 #[cfg(feature = "debugger")]
303 return self.debugger.job_error(state, &job); }
305 }
306 }
307
308 Ok((display_next_output, restart))
309 }
310
311 fn dispatch_a_job(
313 &mut self,
314 job: Job,
315 state: &mut RunState,
316 #[cfg(feature = "metrics")] metrics: &mut Metrics,
317 ) -> Result<(bool, bool)> {
318 #[cfg(not(feature = "debugger"))]
319 let debug_options = (false, false);
320
321 #[cfg(feature = "debugger")]
322 let debug_options = self.debugger.check_prior_to_job(state, &job)?;
323
324 self.dispatcher.send_job_for_execution(&job.payload)?;
325
326 state.start_job(job);
327
328 #[cfg(feature = "metrics")]
329 metrics.track_max_jobs(state.number_jobs_running());
330
331 Ok(debug_options)
332 }
333}
334
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use std::time::Duration;

    use portpicker::pick_unused_port;
    use serial_test::serial;

    use flowcore::model::flow_manifest::FlowManifest;
    use flowcore::model::input::Input;
    use flowcore::model::metadata::MetaData;
    #[cfg(feature = "metrics")]
    use flowcore::model::metrics::Metrics;
    use flowcore::model::output_connection::OutputConnection;
    use flowcore::model::runtime_function::RuntimeFunction;
    use flowcore::model::submission::Submission;

    #[cfg(feature = "submission")]
    use crate::submission_handler::SubmissionHandler;

    #[cfg(feature = "debugger")]
    use crate::block::Block;
    #[cfg(feature = "debugger")]
    use crate::debug_command::DebugCommand;
    #[cfg(feature = "debugger")]
    use crate::debugger_handler::DebuggerHandler;
    #[cfg(feature = "debugger")]
    use crate::job::Job;
    #[cfg(feature = "debugger")]
    use crate::run_state::State;

    use super::Coordinator;
    use crate::dispatcher::Dispatcher;
    use crate::run_state::RunState;

    /// Turn four port numbers into four `tcp://*:<port>` bind addresses, in the same order
    fn get_bind_addresses(ports: (u16, u16, u16, u16)) -> (String, String, String, String) {
        let bind_address = |port: u16| format!("tcp://*:{port}");
        let (first, second, third, fourth) = ports;

        (
            bind_address(first),
            bind_address(second),
            bind_address(third),
            bind_address(fourth),
        )
    }

    /// Pick four unused ports, panicking if any of them cannot be found
    fn get_four_ports() -> (u16, u16, u16, u16) {
        let free_port = || pick_unused_port().expect("No ports free");

        (free_port(), free_port(), free_port(), free_port())
    }

    /// Fixed metadata used by every test manifest
    fn test_meta_data() -> MetaData {
        MetaData {
            name: "test".into(),
            version: "0.0.0".into(),
            description: "a test".into(),
            authors: vec!["me".into()],
        }
    }

    /// Build a manifest with the test metadata and the supplied functions added in order
    fn test_manifest(functions: Vec<RuntimeFunction>) -> FlowManifest {
        let mut flow_manifest = FlowManifest::new(test_meta_data());
        for runtime_function in functions {
            flow_manifest.add_function(runtime_function);
        }

        flow_manifest
    }

    /// Build a submission for `functions` with a 100ms job timeout (and debugging off)
    fn test_submission(functions: Vec<RuntimeFunction>) -> Submission {
        let manifest = test_manifest(functions);
        let job_timeout = Some(Duration::from_millis(100));

        Submission::new(
            manifest,
            None,
            job_timeout,
            #[cfg(feature = "debugger")]
            false,
        )
    }

    /// Create a dispatcher bound to four freshly picked ports
    fn test_dispatcher() -> Dispatcher {
        let bind_addresses = get_bind_addresses(get_four_ports());

        Dispatcher::new(&bind_addresses).expect("Could not create dispatcher")
    }

    /// A debugger handler that ignores every notification and always answers `Continue`
    #[cfg(feature = "debugger")]
    struct DummyDebugServer;

    #[cfg(feature = "debugger")]
    impl DebuggerHandler for DummyDebugServer {
        fn start(&mut self) {}

        fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {
        }

        fn block_breakpoint(&mut self, _block: &Block) {}

        fn flow_unblock_breakpoint(&mut self, _flow_id: usize) {}

        fn send_breakpoint(
            &mut self,
            _: &str,
            _source_process_id: usize,
            _output_route: &str,
            _value: &serde_json::Value,
            _destination_id: usize,
            _destination_name: &str,
            _input_name: &str,
            _input_number: usize,
        ) {
        }

        fn job_error(&mut self, _job: &Job) {}

        fn job_completed(&mut self, _job: &Job) {}

        fn blocks(&mut self, _blocks: Vec<Block>) {}

        fn outputs(&mut self, _output: Vec<OutputConnection>) {}

        fn input(&mut self, _input: Input) {}

        fn function_list(&mut self, _functions: &[RuntimeFunction]) {}

        fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}

        fn run_state(&mut self, _run_state: &RunState) {}

        fn message(&mut self, _message: String) {}

        fn panic(&mut self, _state: &RunState, _error_message: String) {}

        fn debugger_exiting(&mut self) {}

        fn debugger_resetting(&mut self) {}

        fn debugger_error(&mut self, _error: String) {}

        fn execution_starting(&mut self) {}

        fn execution_ended(&mut self) {}

        fn get_command(&mut self, _state: &RunState) -> flowcore::errors::Result<DebugCommand> {
            Ok(DebugCommand::Continue)
        }
    }

    /// A submission handler that never has a submission and accepts every notification
    #[cfg(feature = "submission")]
    struct DummySubmissionHandler;

    #[cfg(feature = "submission")]
    impl SubmissionHandler for DummySubmissionHandler {
        fn flow_execution_starting(&mut self) -> flowcore::errors::Result<()> {
            Ok(())
        }

        #[cfg(feature = "debugger")]
        fn should_enter_debugger(&mut self) -> flowcore::errors::Result<bool> {
            Ok(false)
        }

        fn flow_execution_ended(
            &mut self,
            _state: &RunState,
            #[cfg(feature = "metrics")] _metrics: Metrics,
        ) -> flowcore::errors::Result<()> {
            Ok(())
        }

        fn wait_for_submission(&mut self) -> flowcore::errors::Result<Option<Submission>> {
            Ok(None)
        }

        // Hands back whatever result the coordinator reports
        fn coordinator_is_exiting(
            &mut self,
            result: flowcore::errors::Result<()>,
        ) -> flowcore::errors::Result<()> {
            result
        }
    }

    /// A coordinator can be constructed from a dispatcher and the dummy handlers
    #[test]
    #[serial]
    fn create_coordinator() {
        let job_dispatcher = test_dispatcher();
        #[cfg(feature = "submission")]
        let mut submitter = DummySubmissionHandler;
        #[cfg(feature = "debugger")]
        let mut debugger_handler = DummyDebugServer;

        let _coordinator = Coordinator::new(
            job_dispatcher,
            #[cfg(feature = "submission")]
            &mut submitter,
            #[cfg(feature = "debugger")]
            &mut debugger_handler,
        );
    }

    /// A flow with no functions executes without error
    #[test]
    #[serial]
    fn execute_empty_flow() {
        let job_dispatcher = test_dispatcher();
        #[cfg(feature = "submission")]
        let mut submitter = DummySubmissionHandler;
        #[cfg(feature = "debugger")]
        let mut debugger_handler = DummyDebugServer;

        let mut coordinator = Coordinator::new(
            job_dispatcher,
            #[cfg(feature = "submission")]
            &mut submitter,
            #[cfg(feature = "debugger")]
            &mut debugger_handler,
        );

        let outcome = coordinator.execute_flow(test_submission(Vec::new()));
        assert!(outcome.is_ok(), "Empty flow should execute successfully");
    }

    /// A flow with no functions and no job timeout executes without error
    #[test]
    #[serial]
    fn execute_empty_flow_with_no_timeout() {
        let job_dispatcher = test_dispatcher();
        #[cfg(feature = "submission")]
        let mut submitter = DummySubmissionHandler;
        #[cfg(feature = "debugger")]
        let mut debugger_handler = DummyDebugServer;

        let mut coordinator = Coordinator::new(
            job_dispatcher,
            #[cfg(feature = "submission")]
            &mut submitter,
            #[cfg(feature = "debugger")]
            &mut debugger_handler,
        );

        let untimed_submission = Submission::new(
            test_manifest(Vec::new()),
            None,
            None,
            #[cfg(feature = "debugger")]
            false,
        );
        let outcome = coordinator.execute_flow(untimed_submission);
        assert!(
            outcome.is_ok(),
            "Empty flow with no timeout should execute successfully"
        );
    }

    /// A flow with no functions and a second `Submission::new` argument of `Some(4)` executes
    /// without error
    #[test]
    #[serial]
    fn execute_empty_flow_with_max_parallel_jobs() {
        let job_dispatcher = test_dispatcher();
        #[cfg(feature = "submission")]
        let mut submitter = DummySubmissionHandler;
        #[cfg(feature = "debugger")]
        let mut debugger_handler = DummyDebugServer;

        let mut coordinator = Coordinator::new(
            job_dispatcher,
            #[cfg(feature = "submission")]
            &mut submitter,
            #[cfg(feature = "debugger")]
            &mut debugger_handler,
        );

        let limited_submission = Submission::new(
            test_manifest(Vec::new()),
            Some(4),
            Some(Duration::from_millis(100)),
            #[cfg(feature = "debugger")]
            false,
        );
        let outcome = coordinator.execute_flow(limited_submission);
        assert!(
            outcome.is_ok(),
            "Empty flow with max_parallel_jobs should execute successfully"
        );
    }

    /// The submission loop returns `Ok` when the handler has no submission to offer
    #[cfg(feature = "submission")]
    #[test]
    #[serial]
    fn submission_loop_no_submission() {
        let job_dispatcher = test_dispatcher();
        let mut submitter = DummySubmissionHandler;
        #[cfg(feature = "debugger")]
        let mut debugger_handler = DummyDebugServer;

        let mut coordinator = Coordinator::new(
            job_dispatcher,
            &mut submitter,
            #[cfg(feature = "debugger")]
            &mut debugger_handler,
        );

        let outcome = coordinator.submission_loop(false);
        assert!(
            outcome.is_ok(),
            "submission_loop should return Ok when no submission is available"
        );
    }
}