#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
use std::marker::PhantomData;
use log::{debug, error, trace};
use flowcore::errors::*;
#[cfg(feature = "metrics")]
use flowcore::model::metrics::Metrics;
use flowcore::model::submission::Submission;
#[cfg(feature = "debugger")]
use crate::debugger::Debugger;
use crate::dispatcher::Dispatcher;
use crate::job::Job;
use crate::run_state::RunState;
#[cfg(feature = "debugger")]
use crate::protocols::DebuggerProtocol;
#[cfg(feature = "submission")]
use crate::protocols::SubmissionProtocol;
pub struct Coordinator<'a> {
#[cfg(feature = "submission")]
submitter: &'a mut dyn SubmissionProtocol,
dispatcher: Dispatcher,
#[cfg(feature = "debugger")]
debugger: Debugger<'a>,
#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
_data: PhantomData<&'a Dispatcher>
}
impl<'a> Coordinator<'a> {
pub fn new(
dispatcher: Dispatcher,
#[cfg(feature = "submission")] submitter: &'a mut dyn SubmissionProtocol,
#[cfg(feature = "debugger")] debug_server: &'a mut dyn DebuggerProtocol
) -> Result<Self> {
Ok(Coordinator {
#[cfg(feature = "submission")]
submitter,
dispatcher,
#[cfg(feature = "debugger")]
debugger: Debugger::new(debug_server),
#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
_data: PhantomData
})
}
#[cfg(feature = "submission")]
pub fn submission_loop(
&mut self,
loop_forever: bool,
) -> Result<()> {
while let Some(submission) = self.submitter.wait_for_submission()? {
let _ = self.execute_flow(submission);
if !loop_forever {
break;
}
}
self.submitter.coordinator_is_exiting(Ok(()))
}
#[allow(unused_variables, unused_assignments, unused_mut)]
pub fn execute_flow(&mut self,
submission: Submission,) -> Result<()> {
self.dispatcher.set_results_timeout(submission.job_timeout)?;
let mut state = RunState::new(submission);
#[cfg(feature = "metrics")]
let mut metrics = Metrics::new(state.num_functions());
#[cfg(feature = "debugger")]
if state.submission.debug {
self.debugger.start();
}
let mut restart = false;
let mut display_next_output = false;
'flow_execution:
loop {
state.init()?;
#[cfg(feature = "metrics")]
metrics.reset();
#[cfg(feature = "debugger")]
if state.submission.debug {
(display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
}
#[cfg(feature = "submission")]
self.submitter.flow_execution_starting()?;
'jobs: loop {
trace!("{}", state);
#[cfg(feature = "debugger")]
if state.submission.debug && self.submitter.should_enter_debugger()? {
(display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
if restart {
break 'jobs;
}
}
(display_next_output, restart) = self.dispatch_jobs(
&mut state,
#[cfg(feature = "metrics")]
&mut metrics,
)?;
if restart {
break 'jobs;
}
if state.number_jobs_running() > 0 {
match self.dispatcher.get_next_result() {
Ok(result) => {
let job;
(display_next_output, restart, job) = state.retire_job(
#[cfg(feature = "metrics")]
&mut metrics,
result,
#[cfg(feature = "debugger")]
&mut self.debugger,
)?;
#[cfg(feature = "debugger")]
if display_next_output {
(display_next_output, restart) =
self.debugger.job_done(&mut state, &job)?;
if restart {
break 'jobs;
}
}
}
Err(err) => {
error!("\t{}", err.to_string());
#[cfg(feature = "debugger")]
if state.submission.debug {
(display_next_output, restart) = self.debugger.error(
&mut state, err.to_string())?;
if restart {
break 'jobs;
}
}
}
}
}
if state.number_jobs_running() == 0 && state.number_jobs_ready() == 0 {
break 'jobs;
}
}
#[allow(clippy::collapsible_if)]
#[cfg(feature = "debugger")]
if !restart {
{
if state.submission.debug {
(display_next_output, restart) = self.debugger.execution_ended(&mut state)?;
}
}
}
if !restart {
break 'flow_execution;
}
}
#[cfg(feature = "metrics")]
metrics.set_jobs_created(state.get_number_of_jobs_created());
#[cfg(all(feature = "submission", feature = "metrics"))]
self.submitter.flow_execution_ended(&state, metrics)?;
#[cfg(all(feature = "submission", not(feature = "metrics")))]
self.submitter.flow_execution_ended(&state)?;
Ok(()) }
fn dispatch_jobs(
&mut self,
state: &mut RunState,
#[cfg(feature = "metrics")] metrics: &mut Metrics,
) -> Result<(bool, bool)> {
let mut display_output = false;
let mut restart = false;
while let Some(job) = state.get_job() {
match self.dispatch_a_job(
job.clone(),
state,
#[cfg(feature = "metrics")]
metrics,
) {
Ok((display, rest)) => {
display_output = display;
restart = rest;
}
Err(err) => {
error!("Error sending on 'job_tx': {}", err.to_string());
debug!("{}", state);
#[cfg(feature = "debugger")]
return self.debugger.job_error(state, &job); }
}
}
Ok((display_output, restart))
}
fn dispatch_a_job(
&mut self,
job: Job,
state: &mut RunState,
#[cfg(feature = "metrics")] metrics: &mut Metrics,
) -> Result<(bool, bool)> {
#[cfg(not(feature = "debugger"))]
let debug_options = (false, false);
#[cfg(feature = "metrics")]
metrics.track_max_jobs(state.number_jobs_running());
#[cfg(feature = "debugger")]
let debug_options = self
.debugger
.check_prior_to_job(state, &job)?;
self.dispatcher.send_job_for_execution(&job.payload)?;
state.start_job(job)?;
Ok(debug_options)
}
}