1#[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
2use std::marker::PhantomData;
3
4use log::{debug, error, trace};
5
6use flowcore::errors::*;
7#[cfg(feature = "metrics")]
8use flowcore::model::metrics::Metrics;
9use flowcore::model::submission::Submission;
10
11#[cfg(feature = "debugger")]
12use crate::debugger::Debugger;
13use crate::dispatcher::Dispatcher;
14use crate::job::Job;
15use crate::run_state::RunState;
16#[cfg(feature = "debugger")]
17use crate::protocols::DebuggerProtocol;
18#[cfg(feature = "submission")]
19use crate::protocols::SubmissionProtocol;
20
21pub struct Coordinator<'a> {
29 #[cfg(feature = "submission")]
31 submitter: &'a mut dyn SubmissionProtocol,
32 dispatcher: Dispatcher,
34 #[cfg(feature = "debugger")]
35 debugger: Debugger<'a>,
37 #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
38 _data: PhantomData<&'a Dispatcher>
39}
40
41impl<'a> Coordinator<'a> {
42 pub fn new(
44 dispatcher: Dispatcher,
45 #[cfg(feature = "submission")] submitter: &'a mut dyn SubmissionProtocol,
46 #[cfg(feature = "debugger")] debug_server: &'a mut dyn DebuggerProtocol
47 ) -> Result<Self> {
48 Ok(Coordinator {
49 #[cfg(feature = "submission")]
50 submitter,
51 dispatcher,
52 #[cfg(feature = "debugger")]
53 debugger: Debugger::new(debug_server),
54 #[cfg(all(not(feature = "debugger"), not(feature = "submission")))]
55 _data: PhantomData
56 })
57 }
58
59 #[cfg(feature = "submission")]
61 pub fn submission_loop(
62 &mut self,
63 loop_forever: bool,
64 ) -> Result<()> {
65 while let Some(submission) = self.submitter.wait_for_submission()? {
66 let _ = self.execute_flow(submission);
67 if !loop_forever {
68 break;
69 }
70 }
71
72 self.submitter.coordinator_is_exiting(Ok(()))
73 }
74
75 #[allow(unused_variables, unused_assignments, unused_mut)]
81 pub fn execute_flow(&mut self,
82 submission: Submission,) -> Result<()> {
83 self.dispatcher.set_results_timeout(submission.job_timeout)?;
84 let mut state = RunState::new(submission);
85
86 #[cfg(feature = "metrics")]
87 let mut metrics = Metrics::new(state.num_functions());
88
89 #[cfg(feature = "debugger")]
90 if state.submission.debug {
91 self.debugger.start();
92 }
93
94 let mut restart = false;
95 let mut display_next_output = false;
96
97 'flow_execution:
99 loop {
100 state.init()?;
101 #[cfg(feature = "metrics")]
102 metrics.reset();
103
104 #[cfg(feature = "debugger")]
106 if state.submission.debug {
107 (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
108 }
109
110 #[cfg(feature = "submission")]
111 self.submitter.flow_execution_starting()?;
112
113 'jobs: loop {
114 trace!("{}", state);
115 #[cfg(feature = "debugger")]
116 if state.submission.debug && self.submitter.should_enter_debugger()? {
117 (display_next_output, restart) = self.debugger.wait_for_command(&mut state)?;
118 if restart {
119 break 'jobs;
120 }
121 }
122
123 (display_next_output, restart) = self.dispatch_jobs(
124 &mut state,
125 #[cfg(feature = "metrics")]
126 &mut metrics,
127 )?;
128
129 if restart {
130 break 'jobs;
131 }
132
133 if state.number_jobs_running() > 0 {
134 match self.dispatcher.get_next_result() {
135 Ok(result) => {
136 let job;
137
138 (display_next_output, restart, job) = state.retire_job(
139 #[cfg(feature = "metrics")]
140 &mut metrics,
141 result,
142 #[cfg(feature = "debugger")]
143 &mut self.debugger,
144 )?;
145 #[cfg(feature = "debugger")]
146 if display_next_output {
147 (display_next_output, restart) =
148 self.debugger.job_done(&mut state, &job)?;
149 if restart {
150 break 'jobs;
151 }
152 }
153 }
154
155 Err(err) => {
156 error!("\t{}", err.to_string());
157 #[cfg(feature = "debugger")]
158 if state.submission.debug {
159 (display_next_output, restart) = self.debugger.error(
160 &mut state, err.to_string())?;
161 if restart {
162 break 'jobs;
163 }
164 }
165 }
166 }
167 }
168
169 if state.number_jobs_running() == 0 && state.number_jobs_ready() == 0 {
170 break 'jobs;
173 }
174 } #[allow(clippy::collapsible_if)]
178 #[cfg(feature = "debugger")]
179 if !restart {
180 {
181 if state.submission.debug {
183 (display_next_output, restart) = self.debugger.execution_ended(&mut state)?;
184 }
185 }
186 }
187
188 if !restart {
191 break 'flow_execution;
192 }
193 }
194
195 #[cfg(feature = "metrics")]
196 metrics.set_jobs_created(state.get_number_of_jobs_created());
197 #[cfg(all(feature = "submission", feature = "metrics"))]
198 self.submitter.flow_execution_ended(&state, metrics)?;
199 #[cfg(all(feature = "submission", not(feature = "metrics")))]
200 self.submitter.flow_execution_ended(&state)?;
201
202 Ok(()) }
204
205 fn dispatch_jobs(
208 &mut self,
209 state: &mut RunState,
210 #[cfg(feature = "metrics")] metrics: &mut Metrics,
211 ) -> Result<(bool, bool)> {
212 let mut display_output = false;
213 let mut restart = false;
214
215 while let Some(job) = state.get_job() {
216 match self.dispatch_a_job(
217 job.clone(),
218 state,
219 #[cfg(feature = "metrics")]
220 metrics,
221 ) {
222 Ok((display, rest)) => {
223 display_output = display;
224 restart = rest;
225 }
226 Err(err) => {
227 error!("Error sending on 'job_tx': {}", err.to_string());
228 debug!("{}", state);
229
230 #[cfg(feature = "debugger")]
231 return self.debugger.job_error(state, &job); }
233 }
234 }
235
236 Ok((display_output, restart))
237 }
238
239 fn dispatch_a_job(
241 &mut self,
242 job: Job,
243 state: &mut RunState,
244 #[cfg(feature = "metrics")] metrics: &mut Metrics,
245 ) -> Result<(bool, bool)> {
246 #[cfg(not(feature = "debugger"))]
247 let debug_options = (false, false);
248
249 #[cfg(feature = "metrics")]
250 metrics.track_max_jobs(state.number_jobs_running());
251
252 #[cfg(feature = "debugger")]
253 let debug_options = self
254 .debugger
255 .check_prior_to_job(state, &job)?;
256
257 self.dispatcher.send_job_for_execution(&job.payload)?;
258
259 state.start_job(job)?;
260
261 Ok(debug_options)
262 }
263}