Skip to main content

flowrlib/
run_state.rs

1use std::collections::VecDeque;
2use std::collections::{HashMap, HashSet};
3use std::fmt;
4
5use log::{debug, error, info, trace};
6use serde_derive::{Deserialize, Serialize};
7use serde_json::Value;
8
9use flowcore::errors::Result;
10#[cfg(feature = "metrics")]
11use flowcore::model::metrics::Metrics;
12use flowcore::model::output_connection::OutputConnection;
13use flowcore::model::output_connection::Source::{Input, Output};
14use flowcore::model::runtime_function::RuntimeFunction;
15use flowcore::model::submission::Submission;
16use flowcore::RunAgain;
17
18#[cfg(debug_assertions)]
19use crate::checks;
20#[cfg(feature = "debugger")]
21use crate::debugger::Debugger;
22use crate::job::{Job, Payload};
23
/// `State` enumerates the states a function can be reported to be in.
///
/// The type is only compiled for debug builds, when the `debugger` feature is enabled, or when
/// running tests.
#[cfg(any(debug_assertions, feature = "debugger", test))]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum State {
    /// Ready     - Function will be in Ready state when all of its inputs are full
    Ready,
    /// Waiting   - Function is in the Waiting state when at least one of its inputs is not full
    Waiting,
    /// Running   - Function is in Running state when it has been picked from the Ready list for
    /// execution using the `next` function
    Running,
    /// Completed - Function has indicated that it no longer wants to be run, so its execution
    ///           has completed.
    Completed,
}
39
40/// `RunState` is a structure that maintains the state of all the functions in the currently
41/// executing flow.
42///
43/// The Semantics of a Flow's `RunState`
44/// ==================================
/// The semantics of the state of each function in a flow and the flow overall are described here
46/// and the tests of the struct attempt to reproduce and confirm as many of them as is possible
47///
48/// Terminology
49/// ===========
50/// * function        - an entry in the manifest and the flow graph that may take inputs, will
51///   execute an implementation on a Job and may produce an Output
/// * input           - a function may have 0 or more inputs that accept values required for its
53///   execution
54/// * implementation  - the code that is run, accepting 0 or more input values performing some
55///   calculations and possibly producing an output value. One implementation can
56///   be used by multiple functions in a flow
57/// * destinations    - a set of other functions and their specific inputs that a function is
58///   connected to and hence where the output value is sent when execution is
59///   completed
60/// * job             - a job is the bundle of information necessary to execute. It consists of the
61///   function's id, the input values, the implementation to run, and the
62///   destinations to send the output value to
63/// * execution       - the act of running an implementation on the input values to produce an
64///   output
65/// * output          - a function when ran produces an output. The output contains the id of the
66///   function that was ran, the input values (for debugging), the result
67///   (optional value plus an indicator if the function wishes to be ran again
68///   when ready), the destinations to send any value to and an optional error
69///   string.
70///
71/// Start-up
72/// ==============
/// At start-up all functions are initialized. For each of the function's inputs their
74/// `init_inputs` function will be called, meaning that some inputs may be initialized (filled).
75/// If all inputs are full then the Function will be ready to run.
76///
77/// One-time Execution or Stopping repetitive execution
78/// ===================================================
79/// A function may need to only run once, or to stop being executed repeatedly at some point.
80/// So each implementation when ran returns a "run again" flag to indicate this.
81/// An example of functions that may decide to stop running are:
82/// - args: produces arguments from the command line execution of a flow once at start-up
83/// - readline: read a line of input from standard input, until End-of-file (EOF) is detected.
84///   If this was not done, then the flow would never stop running as the readline function would
85///   always be re-run and waiting for more input, but none would ever be received after EOF.
86///
87/// Unused Functions
88/// ================
89/// If a pure function has an output but it is not used (not connected to any input) then the
/// function should have no effect on the execution of the flow and the optimizer may remove it and
91/// all connections to its input. That in turn may affect other functions which can be removed,
92/// until there are no more left to remove.
/// Thus at run-time, a pure function with its output unused is not expected and no special
/// handling of that case is taken. If a manifest is read where a pure function has no destinations,
/// then it will be run (when it receives inputs) and its output discarded.
96/// That is sub-optimal execution but no errors should result. Hence the role of the optimizer at
97/// compile time.
98/// Tests: `pure_function_no_destinations`
99///
100/// Unconnected inputs
101/// ==================
/// If a function's output is used but one or more of its inputs is unconnected, then the compiler
/// should throw an error. If for some reason an alternative compiler did not reject this and
/// generated a manifest with no other function sending to that input, then at run-time that
/// function's inputs will never be full and the function will never run. This could produce some
106/// form of deadlock or incomplete execution, but it should not produce any run-time error.
107///
/// A run-time is within its rights to discard this function, and then potentially other functions
/// connected to its output and other inputs until no more can be removed.
110/// This run-time does not do that, in order to keep things simple and start-up time to a minimum.
111/// It relies on the compiler having done that previously.
112///
113/// Initializers
114/// ============
115/// There are two types of initializers on inputs:
116/// * "Once" - the input is filled with the specified value once at start-up.
/// * "Constant" - after the function runs the input is refilled with the same value.
118///
119/// Runtime Rules
120/// =============
121/// * A function won't be run until all of its inputs are ready
/// * An input may be initialized at start-up once by a "Once" input initializer
/// * An input may be initialized after each run by a "Constant" input initializer that ensures that
124///   the same value always re-fills the input
125///
126/// State Transitions
127/// =================
128///
129/// From    To State  Event causing transition and additional conditions          Test
130/// ----    --------  --------------------------------------------------          ----
131/// Init    Ready     No inputs                                                   `to_ready_1_on_init`
132///                   All inputs initialized                                      `to_ready_2_on_init`
133///                   All inputs initialized and no destinations                  `to_ready_3_on_init`
134/// Init    Waiting   At least one input is not full                              `to_waiting_on_init`
135///
136/// Ready   Running   `NextJob`: called to fetch the `function_id` for execution      `ready_to_running_on_next`
137///
138/// Waiting Ready     `Output`: last empty input on a function is filled            `waiting_to_ready_on_input`
139///
/// Running Ready     `Output`: its inputs are all full, so it can run again        `running_to_ready_on_done`
/// Running Waiting   `Output`: it has one input or more empty, so it can't run     `running_to_waiting_on_done`
142///
143/// Iteration and Recursion
144/// =======================
145/// A function may send values to itself using a loop-back connector, in order to perform something
146/// similar to iteration or recursion, in procedural programming.
147///
148/// Parallel Execution of Jobs
149/// ==========================
150/// Multiple functions (jobs) may execute in parallel, providing there is no data dependency
151/// preventing it. Example dependencies:
152///   * a function lacks an input and needs to get it from another function that has not completed
153///
154/// Respecting this rule, a `RunTime` can dispatch as many Jobs in parallel as it desires. This one
155/// takes the parameter `max_jobs` on `RunState::new()` to specify the maximum number of jobs that
156/// are launched in parallel. The minimum value for this is 1
#[derive(Deserialize, Serialize, Clone)]
pub struct RunState {
    /// The `Submission` that led to this `RunState` object being created. It owns the manifest
    /// whose functions and flows are consulted and mutated while the flow runs.
    pub(crate) submission: Submission,
    /// `ready_jobs`: A queue of [Jobs][crate::job::Job] ready to run. New jobs are pushed on the
    /// back and the next job to run is taken from the front.
    ready_jobs: VecDeque<Job>,
    /// `running_jobs`: [Jobs][crate::job::Job] that are running, keyed by their job id
    running_jobs: HashMap<usize, Job>,
    /// `completed`: ids of [`RuntimeFunction`][flowcore::model::runtime_function::RuntimeFunction]
    /// that have run to completion and won't run again
    completed: HashSet<usize>,
    /// number of jobs created to date; the most recently created job's id equals this value
    number_of_jobs_created: usize,
    /// Track how many busy entries exist per `process_id` (functions and ancestor flows).
    /// An entry is incremented for each job created and decremented when a job is retired;
    /// entries that reach zero are removed from the map.
    busy_count: HashMap<usize, usize>,
    /// Index: parent flow ID → function IDs in that flow (avoids full-manifest scans)
    functions_by_flow: HashMap<usize, Vec<usize>>,
}
175
176impl RunState {
177    /// Create a new `RunState` struct from the list of functions provided and the `Submission`
178    /// that was sent to be executed
179    #[must_use]
180    pub fn new(submission: Submission) -> Self {
181        let mut functions_by_flow = HashMap::<usize, Vec<usize>>::new();
182        for (id, function) in submission.manifest.functions() {
183            functions_by_flow
184                .entry(function.get_parent_id())
185                .or_default()
186                .push(*id);
187        }
188
189        RunState {
190            submission,
191            ready_jobs: VecDeque::<Job>::new(),
192            running_jobs: HashMap::<usize, Job>::new(),
193            completed: HashSet::<usize>::new(),
194            number_of_jobs_created: 0,
195            busy_count: HashMap::<usize, usize>::new(),
196            functions_by_flow,
197        }
198    }
199
200    #[cfg(any(debug_assertions, feature = "debugger"))]
201    /// Get a reference to the map of all functions
202    pub(crate) fn get_functions(&self) -> &HashMap<usize, RuntimeFunction> {
203        self.submission.manifest.functions()
204    }
205
206    // Reset all values back to initial ones to enable debugging to restart from the initial state
207    #[cfg(feature = "debugger")]
208    fn reset(&mut self) {
209        debug!("Resetting RunState");
210        for function in self.submission.manifest.get_functions().values_mut() {
211            function.reset();
212        }
213        self.ready_jobs.clear();
214        self.running_jobs.clear();
215        self.completed.clear();
216        self.number_of_jobs_created = 0;
217        self.busy_count.clear();
218    }
219
220    /// The `ìnit()` function is responsible for initializing all functions, and it returns a
221    /// boolean to indicate that it's inputs are fulfilled - and this information is added to the
222    /// `RunList` to control the readiness of the Function to be executed.
223    ///
224    /// After `init` Functions will either be:
225    ///    - Ready:   an entry will be added to the `ready` list with this function's id
226    ///    - Waiting: function has at least one empty input, so it cannot run. It will not be added to
227    ///      the `ready` list, so by omission it is in the `Waiting` state.
228    pub(crate) fn init(&mut self) -> Result<()> {
229        #[cfg(feature = "debugger")]
230        self.reset();
231
232        let mut make_ready_list = vec![];
233
234        debug!("Initializing all functions");
235        for function in self.submission.manifest.get_functions().values_mut() {
236            function.init();
237            if function.can_run() {
238                make_ready_list.push((function.id(), function.get_parent_id()));
239            }
240        }
241
242        for (process_id, parent_id) in make_ready_list {
243            self.create_jobs(process_id, parent_id)?;
244        }
245
246        Ok(())
247    }
248
249    /// Return the states a function is in
250    #[cfg(any(debug_assertions, feature = "debugger", test))]
251    #[must_use]
252    pub fn get_function_states(&self, function_id: usize) -> Vec<State> {
253        let mut states = vec![];
254
255        if self.completed.contains(&function_id) {
256            states.push(State::Completed);
257        }
258
259        for ready_job in &self.ready_jobs {
260            if ready_job.process_id == function_id {
261                states.push(State::Ready);
262            }
263        }
264
265        if states.is_empty() {
266            states.push(State::Waiting);
267        }
268
269        states
270    }
271
272    // See if the function is in only the specified state
273    #[cfg(test)]
274    pub(crate) fn function_state_is_only(&self, function_id: usize, state: &State) -> bool {
275        let function_states = self.get_function_states(function_id);
276        function_states.len() == 1 && function_states.contains(state)
277    }
278
279    /// Get a Set (`job_id`) of the currently running jobs
280    #[cfg(any(feature = "debugger", debug_assertions))]
281    #[must_use]
282    pub fn get_running(&self) -> &HashMap<usize, Job> {
283        &self.running_jobs
284    }
285
286    /// Get a reference to the function with `id`
287    #[cfg(any(feature = "debugger", test))]
288    #[must_use]
289    pub fn get_function(&self, id: usize) -> Option<&RuntimeFunction> {
290        self.submission.manifest.functions().get(&id)
291    }
292
293    // Get a mutable reference to the function with `id`
294    fn get_mut(&mut self, id: usize) -> Option<&mut RuntimeFunction> {
295        self.submission.manifest.get_functions().get_mut(&id)
296    }
297
298    #[cfg(debug_assertions)]
299    /// Return the busy count map (`process_id` -> count of busy entries)
300    #[must_use]
301    pub fn get_busy_count(&self) -> &HashMap<usize, usize> {
302        &self.busy_count
303    }
304
305    // Return a new job to run if there is one and there are not too many jobs already running
306    pub(crate) fn get_next_job(&mut self) -> Option<Job> {
307        if let Some(limit) = self.submission.max_parallel_jobs {
308            if self.number_jobs_running() >= limit {
309                trace!("max_parallel_jobs limit of {limit} reached");
310                return None;
311            }
312        }
313
314        self.ready_jobs.remove(0)
315    }
316
317    // Update the run_state to reflect that the job is now running
318    pub(crate) fn start_job(&mut self, job: Job) {
319        self.running_jobs.insert(job.payload.job_id, job);
320    }
321
322    /// get the number of jobs created to date in the flow's execution
323    #[cfg(any(feature = "metrics", feature = "debugger"))]
324    #[must_use]
325    pub fn get_number_of_jobs_created(&self) -> usize {
326        self.number_of_jobs_created
327    }
328
329    // Complete a Job by taking its output and updating the run-list accordingly.
330    //
331    // If other functions were blocked trying to send to this one - we can now unblock them
332    // as it has consumed its inputs, and they are free to be sent to again.
333    //
334    // Then, take the output and send it to all destination IOs on different function it should be
335    // sent to, marking the source function as blocked because those others must consume the output
336    // if those other functions have all their inputs, then mark them accordingly.
337    #[allow(unused_variables, unused_assignments, unused_mut)]
338    pub(crate) fn retire_a_job(
339        &mut self,
340        #[cfg(feature = "metrics")] metrics: &mut Metrics,
341        result: (usize, Result<(Option<Value>, RunAgain)>),
342        #[cfg(feature = "debugger")] debugger: &mut Debugger,
343    ) -> Result<(bool, bool, Job)> {
344        let mut display_next_output = false;
345        let mut restart = false;
346
347        let mut job = self
348            .running_jobs
349            .remove(&result.0)
350            .ok_or_else(|| format!("Could not find Job#{} to retire it", result.0))?;
351
352        match &result.1 {
353            Ok((output_value, function_can_run_again)) => {
354                #[cfg(feature = "debugger")]
355                debug!(
356                    "Job #{}: Function #{} '{}' {:?} -> {:?}",
357                    job.payload.job_id,
358                    job.process_id,
359                    job.function_name,
360                    job.payload.input_set,
361                    output_value
362                );
363                #[cfg(not(feature = "debugger"))]
364                debug!(
365                    "Job #{}: Function #{} {:?} -> {:?}",
366                    job.payload.job_id, job.process_id, job.payload.input_set, output_value
367                );
368
369                for connection in &job.connections {
370                    let value_to_send = match &connection.source {
371                        Output(route) => match output_value {
372                            Some(output_v) => output_v.pointer(route),
373                            None => None,
374                        },
375                        Input(index) => job.payload.input_set.get(*index),
376                    };
377
378                    if let Some(value) = value_to_send {
379                        (display_next_output, restart) = self.send_a_value(
380                            job.process_id,
381                            job.parent_id,
382                            connection,
383                            value.clone(),
384                            #[cfg(feature = "metrics")]
385                            metrics,
386                            #[cfg(feature = "debugger")]
387                            debugger,
388                        )?;
389                    } else {
390                        trace!(
391                            "Job #{}:\t\tNo value found at '{}'",
392                            job.payload.job_id,
393                            connection.source
394                        );
395                    }
396                }
397
398                if *function_can_run_again {
399                    let function = self.get_mut(job.process_id).ok_or("No such function")?;
400
401                    // Refill any inputs with function initializers
402                    function.init_inputs(false, false);
403
404                    // NOTE: The function we are retiring may have new input sets due to sending
405                    // to itself via a loopback
406                    if function.can_run() {
407                        self.create_jobs(job.process_id, job.parent_id)?;
408                    }
409                } else {
410                    // otherwise mark it as completed as it will never run again
411                    self.mark_as_completed(job.process_id);
412                }
413            }
414            Err(e) => {
415                error!("Error in Job #{}: {e}", job.payload.job_id);
416            }
417        }
418
419        // unblock any senders from other flows that can now run due to this function completing
420        // causing the flow to be idle now
421        (display_next_output, restart) = self.unblock_flows(
422            &job,
423            #[cfg(feature = "debugger")]
424            debugger,
425        )?;
426
427        #[cfg(debug_assertions)]
428        checks::check_invariants(self, job.payload.job_id)?;
429
430        trace!(
431            "Job #{}: Completed-----------------------",
432            job.payload.job_id
433        );
434        job.result = result.1;
435
436        Ok((display_next_output, restart, job))
437    }
438
439    // Send a value produced as part of an output of running a job to a destination function on
440    // a specific input, update the metrics and potentially enter the debugger
441    fn send_a_value(
442        &mut self,
443        source_id: usize,
444        _source_parent_id: usize,
445        connection: &OutputConnection,
446        output_value: Value,
447        #[cfg(feature = "metrics")] metrics: &mut Metrics,
448        #[cfg(feature = "debugger")] debugger: &mut Debugger,
449    ) -> Result<(bool, bool)> {
450        let mut display_next_output = false;
451        let mut restart = false;
452
453        let route_str = match &connection.source {
454            Output(route) if route.is_empty() => String::new(),
455            Output(route) => format!(" from output route '{route}'"),
456            Input(index) => format!(" from Job input #{index}"),
457        };
458
459        let loopback = source_id == connection.destination_id;
460
461        if loopback {
462            info!("\t\tFunction #{source_id} loopback of value '{output_value}'{route_str} to Self:{}",
463                    connection.destination_io_number);
464        } else {
465            info!(
466                "\t\tFunction #{source_id} sending '{output_value}'{route_str} to Function #{}:{}",
467                connection.destination_id, connection.destination_io_number
468            );
469        }
470
471        #[cfg(feature = "debugger")]
472        if let Output(route) = &connection.source {
473            (display_next_output, restart) = debugger.check_prior_to_send(
474                self,
475                source_id,
476                route,
477                &output_value,
478                connection.destination_id,
479                connection.destination_io_number,
480            )?;
481        }
482
483        let function = self
484            .get_mut(connection.destination_id)
485            .ok_or("Could not get function")?;
486        let job_count_before = function.input_sets_available();
487        if connection.internal {
488            function.send_internal(connection.destination_io_number, output_value)?;
489        } else {
490            function.send(connection.destination_io_number, output_value)?;
491        }
492
493        #[cfg(feature = "metrics")]
494        metrics.increment_outputs_sent(); // not distinguishing array serialization / wrapping etc.
495
496        let new_job_available = function.input_sets_available() > job_count_before;
497
498        // postpone the decision about making the sending function Ready when we have a loopback
499        // connection that sends a value to itself, as it may also send to other functions.
500        // But for all other receivers of values, make them Ready
501        if new_job_available && !loopback {
502            self.create_jobs(connection.destination_id, connection.destination_parent_id)?;
503        }
504
505        Ok((display_next_output, restart))
506    }
507
508    /// Return how many jobs are currently running
509    #[must_use]
510    pub fn number_jobs_running(&self) -> usize {
511        self.running_jobs.len()
512    }
513
514    /// Return how many jobs are ready to be run, but not running yet
515    #[must_use]
516    pub fn number_jobs_ready(&self) -> usize {
517        self.ready_jobs.len()
518    }
519
520    /// An input blocker is another function that is the only function connected to an empty input
521    /// of the target function, and which is not ready to run, hence the target function cannot run.
522    #[cfg(feature = "debugger")]
523    pub(crate) fn get_input_blockers(&self, target_id: usize) -> Result<Vec<usize>> {
524        let mut input_blockers = vec![];
525        let target_function = self.get_function(target_id).ok_or("No such function")?;
526
527        // for each empty input of the target function
528        for (target_io, input) in target_function.inputs().iter().enumerate() {
529            if input.values_available() == 0 {
530                let mut senders = Vec::<usize>::new();
531
532                // go through all functions to see if sends to the target function on this input
533                for sender_function in self.submission.manifest.functions().values() {
534                    // if the sender function is not ready to run
535                    let mut sender_is_ready = false;
536
537                    for ready_job in &self.ready_jobs {
538                        if ready_job.process_id == sender_function.id() {
539                            sender_is_ready = true;
540                        }
541                    }
542
543                    if !sender_is_ready {
544                        // for each output route of the sending function, see if the target is
545                        // the target function and input
546                        for destination in sender_function.get_output_connections() {
547                            if (destination.destination_id == target_id)
548                                && (destination.destination_io_number == target_io)
549                            {
550                                senders.push(sender_function.id());
551                            }
552                        }
553                    }
554                }
555
556                // If unique sender to this Input, then the target function is waiting for that value
557                if senders.len() == 1 {
558                    input_blockers.extend(senders);
559                }
560            }
561        }
562
563        Ok(input_blockers)
564    }
565
566    // Create one or more new jobs for the function and mark it and ancestor flows as busy
567    pub(crate) fn create_jobs(&mut self, process_id: usize, parent_id: usize) -> Result<()> {
568        loop {
569            self.number_of_jobs_created = self
570                .number_of_jobs_created
571                .checked_add(1)
572                .ok_or("Ran out of job IDs")?;
573            let job_id = self.number_of_jobs_created;
574            let function = self.get_mut(process_id).ok_or("Could not get function")?;
575            if let Some(input_set) = function.take_input_set() {
576                let implementation_url = function.get_implementation_url().clone();
577                debug!(
578                    "Job #{job_id} created for Function #{process_id}({parent_id}) with inputs: {input_set:?}"
579                );
580                let job = Job {
581                    process_id,
582                    parent_id,
583                    #[cfg(feature = "debugger")]
584                    function_name: function.name().to_string(),
585                    connections: function.get_output_connections().clone(),
586                    payload: Payload {
587                        job_id,
588                        input_set,
589                        implementation_url,
590                    },
591                    result: Ok((None, false)),
592                };
593
594                // avoid getting stuck in a loop generating jobs for a function - generate just one
595                let always_ready = function.is_always_ready();
596                self.ready_jobs.push_back(job);
597                *self.busy_count.entry(process_id).or_insert(0) += 1;
598                for ancestor in self.ancestors(parent_id) {
599                    *self.busy_count.entry(ancestor).or_insert(0) += 1;
600                }
601                if always_ready {
602                    return Ok(());
603                }
604            } else {
605                self.number_of_jobs_created = self
606                    .number_of_jobs_created
607                    .checked_sub(1)
608                    .ok_or("Couldn't fix count")?;
609                return Ok(());
610            }
611        }
612    }
613
614    /// Return how many functions exist in this flow being executed
615    #[cfg(any(feature = "debugger", feature = "metrics"))]
616    #[must_use]
617    pub fn num_functions(&self) -> usize {
618        self.submission.manifest.functions().len()
619    }
620
621    /// Return the ancestor flow ids starting from `parent_id` up to the root
622    fn ancestors(&self, parent_id: usize) -> Vec<usize> {
623        let mut result = vec![parent_id];
624        let mut current = parent_id;
625        while let Some(flow_info) = self.submission.manifest.flows().get(&current) {
626            if let Some(pid) = flow_info.parent_id {
627                result.push(pid);
628                current = pid;
629            } else {
630                break; // reached root
631            }
632        }
633        result
634    }
635
636    // Check if ancestor flows have gone idle and run flow initializers if so
637    #[allow(unused_variables, unused_assignments, unused_mut)]
638    fn unblock_flows(
639        &mut self,
640        job: &Job,
641        #[cfg(feature = "debugger")] debugger: &mut Debugger,
642    ) -> Result<(bool, bool)> {
643        let mut display_next_output = false;
644        let mut restart = false;
645
646        self.remove_from_busy(job.process_id, job.parent_id);
647
648        // Check each ancestor flow, from innermost to root
649        for ancestor_id in self.ancestors(job.parent_id) {
650            if self.busy_count.contains_key(&ancestor_id) {
651                continue;
652            }
653
654            // No functions busy — check if any function can still run on internal data
655            if self.has_runnable_on_internal(ancestor_id) {
656                let runnable: Vec<_> = self
657                    .functions_by_flow
658                    .get(&ancestor_id)
659                    .cloned()
660                    .unwrap_or_default()
661                    .into_iter()
662                    .filter(|id| {
663                        !self.completed.contains(id)
664                            && self
665                                .submission
666                                .manifest
667                                .functions()
668                                .get(id)
669                                .is_some_and(RuntimeFunction::can_run)
670                    })
671                    .collect();
672                for func_id in runnable {
673                    self.create_jobs(func_id, ancestor_id)?;
674                }
675            } else {
676                // Flow has truly run to completion
677                debug!(
678                    "Job #{}:\tFlow #{} is now idle",
679                    job.payload.job_id, ancestor_id
680                );
681
682                #[cfg(feature = "debugger")]
683                {
684                    (display_next_output, restart) =
685                        debugger.check_prior_to_flow_unblock(self, ancestor_id)?;
686                }
687
688                self.clear_flow_internal_inputs(ancestor_id);
689                self.run_flow_initializers(ancestor_id)?;
690            }
691        }
692
693        Ok((display_next_output, restart))
694    }
695
696    // Decrement busy_count for the function and all its ancestor flows
697    fn remove_from_busy(&mut self, process_id: usize, parent_id: usize) {
698        // Decrement function's own count
699        if let Some(count) = self.busy_count.get_mut(&process_id) {
700            *count = count.saturating_sub(1);
701            if *count == 0 {
702                self.busy_count.remove(&process_id);
703            }
704        }
705        // Decrement ancestor flow counts
706        for ancestor in self.ancestors(parent_id) {
707            if let Some(count) = self.busy_count.get_mut(&ancestor) {
708                *count = count.saturating_sub(1);
709                if *count == 0 {
710                    self.busy_count.remove(&ancestor);
711                }
712            }
713        }
714        trace!("\t\t\tUpdated busy_count to: {:?}", self.busy_count);
715    }
716
717    fn has_runnable_on_internal(&self, flow_id: usize) -> bool {
718        self.functions_by_flow
719            .get(&flow_id)
720            .is_some_and(|func_ids| {
721                func_ids.iter().any(|id| {
722                    !self.completed.contains(id)
723                        && self
724                            .submission
725                            .manifest
726                            .functions()
727                            .get(id)
728                            .is_some_and(RuntimeFunction::can_run_on_internal)
729                })
730            })
731    }
732
733    fn clear_flow_internal_inputs(&mut self, flow_id: usize) {
734        if let Some(func_ids) = self.functions_by_flow.get(&flow_id).cloned() {
735            for id in &func_ids {
736                if !self.completed.contains(id) {
737                    if let Some(f) = self.submission.manifest.get_functions().get_mut(id) {
738                        f.clear_internal_inputs();
739                    }
740                }
741            }
742        }
743    }
744
745    fn run_flow_initializers(&mut self, flow_id: usize) -> Result<()> {
746        let mut runnable_functions = Vec::<usize>::new();
747        if let Some(func_ids) = self.functions_by_flow.get(&flow_id).cloned() {
748            for id in &func_ids {
749                if !self.completed.contains(id) {
750                    if let Some(f) = self.submission.manifest.get_functions().get_mut(id) {
751                        f.init_inputs(false, true);
752                        if f.can_run() {
753                            runnable_functions.push(*id);
754                        }
755                    }
756                }
757            }
758        }
759
760        for function_id in runnable_functions {
761            self.create_jobs(function_id, flow_id)?;
762        }
763
764        Ok(())
765    }
766
    /// Mark a function (via its ID) as having run to completion, by inserting `function_id`
    /// into `completed`.
    pub(crate) fn mark_as_completed(&mut self, function_id: usize) {
        self.completed.insert(function_id);
    }
771}
772
773impl fmt::Display for RunState {
774    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
775        writeln!(f, "         Submission:\n{}", self.submission)?;
776
777        writeln!(f, "RunState:")?;
778        writeln!(f, "          Jobs Created: {}", self.number_of_jobs_created)?;
779        writeln!(f, "Number of Jobs Running: {}", self.running_jobs.len())?;
780        writeln!(f, "          Jobs Running: {:?}", self.running_jobs.keys())?;
781        writeln!(
782            f,
783            "       Functions Ready: {:?}",
784            self.ready_jobs
785                .iter() // jonesy:allow(capacity)
786                .map(|j| j.payload.job_id)
787                .collect::<Vec<usize>>()
788        )?;
789        writeln!(f, "   Functions Completed: {:?}", self.completed)?;
790        write!(f, "            Busy Count: {:?}", self.busy_count)
791    }
792}
793
794#[cfg(test)]
795#[allow(clippy::unwrap_used, clippy::expect_used)]
796mod test {
797    use serde_json::{json, Value};
798    use url::Url;
799
800    use flowcore::errors::Result;
801    use flowcore::model::flow_manifest::FlowManifest;
802    use flowcore::model::input::Input;
803    use flowcore::model::input::InputInitializer::Once;
804    use flowcore::model::metadata::MetaData;
805    use flowcore::model::output_connection::{OutputConnection, Source};
806    use flowcore::model::runtime_function::RuntimeFunction;
807    use flowcore::model::submission::Submission;
808
809    #[cfg(feature = "debugger")]
810    use crate::block::Block;
811    #[cfg(feature = "debugger")]
812    use crate::debug_command::DebugCommand;
813    #[cfg(feature = "debugger")]
814    use crate::debugger::Debugger;
815    #[cfg(feature = "debugger")]
816    use crate::debugger_handler::DebuggerHandler;
817
818    use super::RunState;
819    use super::State;
820    use super::{Job, Payload};
821
822    fn test_function_a_to_b_not_init() -> RuntimeFunction {
823        let connection_to_f1 = OutputConnection::new(
824            Source::default(),
825            1,
826            0,
827            0,
828            true,
829            "/fB".to_string(),
830            #[cfg(feature = "debugger")]
831            String::default(),
832        );
833
834        RuntimeFunction::new(
835            #[cfg(feature = "debugger")]
836            "fA",
837            #[cfg(feature = "debugger")]
838            "/fA",
839            "file://fake/test",
840            vec![Input::new(
841                #[cfg(feature = "debugger")]
842                "",
843                0,
844                false,
845                None,
846                None,
847            )],
848            0,
849            0,
850            &[connection_to_f1],
851            false,
852        ) // outputs to fB:0
853    }
854
855    fn test_function_a_to_b() -> RuntimeFunction {
856        let connection_to_f1 = OutputConnection::new(
857            Source::default(),
858            1,
859            0,
860            0,
861            true,
862            "/fB".to_string(),
863            #[cfg(feature = "debugger")]
864            String::default(),
865        );
866        RuntimeFunction::new(
867            #[cfg(feature = "debugger")]
868            "fA",
869            #[cfg(feature = "debugger")]
870            "/fA",
871            "file://fake/test",
872            vec![Input::new(
873                #[cfg(feature = "debugger")]
874                "",
875                0,
876                false,
877                Some(Once(json!(1))),
878                None,
879            )],
880            0,
881            0,
882            &[connection_to_f1],
883            false,
884        ) // outputs to fB:0
885    }
886
887    fn test_function_a_init() -> RuntimeFunction {
888        RuntimeFunction::new(
889            #[cfg(feature = "debugger")]
890            "fA",
891            #[cfg(feature = "debugger")]
892            "/fA",
893            "file://fake/test",
894            vec![Input::new(
895                #[cfg(feature = "debugger")]
896                "",
897                0,
898                false,
899                Some(Once(json!(1))),
900                None,
901            )],
902            0,
903            0,
904            &[],
905            false,
906        )
907    }
908
909    fn test_function_b_not_init() -> RuntimeFunction {
910        RuntimeFunction::new(
911            #[cfg(feature = "debugger")]
912            "fB",
913            #[cfg(feature = "debugger")]
914            "/fB",
915            "file://fake/test",
916            vec![Input::new(
917                #[cfg(feature = "debugger")]
918                "",
919                0,
920                false,
921                None,
922                None,
923            )],
924            1,
925            0,
926            &[],
927            false,
928        )
929    }
930
931    fn test_job(source_process_id: usize, destination_process_id: usize) -> Job {
932        let out_conn = OutputConnection::new(
933            Source::default(),
934            destination_process_id,
935            0,
936            0,
937            true,
938            String::default(),
939            #[cfg(feature = "debugger")]
940            String::default(),
941        );
942        Job {
943            process_id: source_process_id,
944            parent_id: 0,
945            #[cfg(feature = "debugger")]
946            function_name: String::new(),
947            connections: vec![out_conn],
948            payload: Payload {
949                job_id: 1,
950                implementation_url: Url::parse("file://test").expect("Could not parse Url"),
951                input_set: vec![json!(1)],
952            },
953            result: Ok((Some(json!(1)), true)),
954        }
955    }
956
957    #[cfg(feature = "debugger")]
958    struct DummyServer;
959
960    #[cfg(feature = "debugger")]
961    impl DebuggerHandler for DummyServer {
962        fn start(&mut self) {}
963        fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {
964        }
965        fn block_breakpoint(&mut self, _block: &Block) {}
966        fn flow_unblock_breakpoint(&mut self, _flow_id: usize) {}
967        fn send_breakpoint(
968            &mut self,
969            _: &str,
970            _source_process_id: usize,
971            _output_route: &str,
972            _value: &Value,
973            _destination_id: usize,
974            _destination_name: &str,
975            _input_name: &str,
976            _input_number: usize,
977        ) {
978        }
979        fn job_error(&mut self, _job: &Job) {}
980        fn job_completed(&mut self, _job: &Job) {}
981        fn blocks(&mut self, _blocks: Vec<Block>) {}
982        fn outputs(&mut self, _output: Vec<OutputConnection>) {}
983        fn input(&mut self, _input: Input) {}
984        fn function_list(&mut self, _functions: &[RuntimeFunction]) {}
985        fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}
986        fn run_state(&mut self, _run_state: &RunState) {}
987        fn message(&mut self, _message: String) {}
988        fn panic(&mut self, _state: &RunState, _error_message: String) {}
989        fn debugger_exiting(&mut self) {}
990        fn debugger_resetting(&mut self) {}
991        fn debugger_error(&mut self, _error: String) {}
992        fn execution_starting(&mut self) {}
993        fn execution_ended(&mut self) {}
994        fn get_command(&mut self, _state: &RunState) -> Result<DebugCommand> {
995            unimplemented!();
996        }
997    }
998
    /// Create a `Debugger` that uses `server` as its `DebuggerHandler`
    #[cfg(feature = "debugger")]
    fn dummy_debugger(server: &mut dyn DebuggerHandler) -> Debugger<'_> {
        Debugger::new(server)
    }
1003
1004    fn test_meta_data() -> MetaData {
1005        MetaData {
1006            name: "test".into(),
1007            version: "0.0.0".into(),
1008            description: "a test".into(),
1009            authors: vec!["me".into()],
1010        }
1011    }
1012
1013    fn test_manifest(functions: Vec<RuntimeFunction>) -> FlowManifest {
1014        let mut manifest = FlowManifest::new(test_meta_data());
1015        for function in functions {
1016            manifest.add_function(function);
1017        }
1018        manifest
1019    }
1020
1021    fn test_submission(functions: Vec<RuntimeFunction>) -> Submission {
1022        Submission::new(
1023            test_manifest(functions),
1024            None,
1025            None,
1026            #[cfg(feature = "debugger")]
1027            true,
1028        )
1029    }
1030
1031    mod general_run_state_tests {
1032        use super::super::RunState;
1033
1034        #[test]
1035        fn display_run_state_test() {
1036            let f_a = super::test_function_a_to_b();
1037            let f_b = super::test_function_b_not_init();
1038            let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1039            state.init().expect("Could not init state");
1040
1041            #[cfg(any(feature = "debugger", feature = "metrics"))]
1042            assert_eq!(state.num_functions(), 2);
1043
1044            println!("Run state: {state}");
1045        }
1046
1047        #[cfg(feature = "metrics")]
1048        #[test]
1049        fn jobs_created_zero_at_init() {
1050            let mut state = RunState::new(super::test_submission(vec![]));
1051            state.init().expect("Could not init state");
1052            assert_eq!(
1053                0,
1054                state.get_number_of_jobs_created(),
1055                "At init jobs() should be 0"
1056            );
1057            assert_eq!(0, state.number_jobs_ready());
1058        }
1059
1060        #[cfg(feature = "debugger")]
1061        #[test]
1062        fn zero_running_at_init() {
1063            let mut state = RunState::new(super::test_submission(vec![]));
1064            state.init().expect("Could not init state");
1065            assert!(
1066                state.get_running().is_empty(),
1067                "At init get_running() should be empty"
1068            );
1069        }
1070    }
1071
1072    /********************************* State Transition Tests *********************************/
1073    mod state_transitions {
1074        use serde_json::json;
1075        use serial_test::serial;
1076        use url::Url;
1077
1078        use flowcore::model::input::Input;
1079        use flowcore::model::input::InputInitializer::Always;
1080        #[cfg(feature = "metrics")]
1081        use flowcore::model::metrics::Metrics;
1082        use flowcore::model::output_connection::{OutputConnection, Source};
1083        use flowcore::model::runtime_function::RuntimeFunction;
1084
1085        use crate::run_state::test::test_function_b_not_init;
1086
1087        use super::super::RunState;
1088        use super::super::State;
1089        use super::super::{Job, Payload};
1090
1091        #[test]
1092        fn to_ready_1_on_init() {
1093            let f_a = super::test_function_a_to_b();
1094            let f_b = test_function_b_not_init();
1095            let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1096
1097            // Event
1098            state.init().expect("Could not init state");
1099
1100            // Test
1101            assert!(
1102                state.function_state_is_only(0, &State::Ready),
1103                "f_a should be Ready"
1104            );
1105            assert_eq!(1, state.number_jobs_ready());
1106            assert!(
1107                state.function_state_is_only(1, &State::Waiting),
1108                "f_b should be waiting for input"
1109            );
1110        }
1111
1112        #[test]
1113        fn input_blocker() {
1114            let f_a = super::test_function_a_to_b_not_init();
1115            let f_b = test_function_b_not_init();
1116            let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1117
1118            // Event
1119            state.init().expect("Could not init state");
1120
1121            // Test
1122            assert!(
1123                state.function_state_is_only(0, &State::Waiting),
1124                "f_a should be waiting for input"
1125            );
1126            assert!(
1127                state.function_state_is_only(1, &State::Waiting),
1128                "f_b should be waiting for input"
1129            );
1130            #[cfg(feature = "debugger")]
1131            assert!(
1132                state
1133                    .get_input_blockers(1)
1134                    .expect("Could not get blockers")
1135                    .contains(&0),
1136                "There should be an input blocker"
1137            );
1138        }
1139
1140        #[test]
1141        fn to_ready_2_on_init() {
1142            let f_a = super::test_function_a_to_b();
1143            let f_b = test_function_b_not_init();
1144            let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1145
1146            // Event
1147            state.init().expect("Could not init state");
1148
1149            // Test
1150            assert!(
1151                state.function_state_is_only(0, &State::Ready),
1152                "f_a should be Ready"
1153            );
1154        }
1155
1156        #[test]
1157        fn to_ready_3_on_init() {
1158            let f_a = super::test_function_a_init();
1159            let mut state = RunState::new(super::test_submission(vec![f_a]));
1160
1161            // Event
1162            state.init().expect("Could not init state");
1163
1164            // Test
1165            assert!(
1166                state.function_state_is_only(0, &State::Ready),
1167                "f_a should be Ready"
1168            );
1169        }
1170
1171        fn test_function_a_not_init() -> RuntimeFunction {
1172            RuntimeFunction::new(
1173                #[cfg(feature = "debugger")]
1174                "fA",
1175                #[cfg(feature = "debugger")]
1176                "/fA",
1177                "file://fake/test",
1178                vec![Input::new(
1179                    #[cfg(feature = "debugger")]
1180                    "",
1181                    0,
1182                    false,
1183                    None,
1184                    None,
1185                )],
1186                0,
1187                0,
1188                &[],
1189                false,
1190            )
1191        }
1192
1193        #[test]
1194        fn to_waiting_on_init() {
1195            let f_a = test_function_a_not_init();
1196            let mut state = RunState::new(super::test_submission(vec![f_a]));
1197
1198            // Event
1199            state.init().expect("Could not init state");
1200
1201            // Test
1202            assert!(
1203                state.function_state_is_only(0, &State::Waiting),
1204                "f_a should be Waiting"
1205            );
1206        }
1207
1208        #[test]
1209        fn ready_to_running_on_next() {
1210            let f_a = super::test_function_a_init();
1211            let mut state = RunState::new(super::test_submission(vec![f_a]));
1212            state.init().expect("Could not init state");
1213            assert!(
1214                state.function_state_is_only(0, &State::Ready),
1215                "f_a should be Ready"
1216            );
1217
1218            // Event
1219            let job = state.get_next_job().expect("Couldn't get next job");
1220            state.start_job(job.clone());
1221
1222            // Test
1223            state
1224                .running_jobs
1225                .get(&job.payload.job_id)
1226                .expect("Job should have been running");
1227        }
1228
1229        #[test]
1230        fn unready_not_to_running_on_next() {
1231            let f_a = test_function_a_not_init();
1232            let mut state = RunState::new(super::test_submission(vec![f_a]));
1233            state.init().expect("Could not init state");
1234            assert!(
1235                state.function_state_is_only(0, &State::Waiting),
1236                "f_a should be Waiting"
1237            );
1238
1239            // Event
1240            assert!(
1241                state.get_next_job().is_none(),
1242                "next_job() should return None"
1243            );
1244
1245            // Test
1246            assert!(
1247                state.function_state_is_only(0, &State::Waiting),
1248                "f_a should be Waiting"
1249            );
1250        }
1251
1252        fn test_job() -> Job {
1253            Job {
1254                process_id: 0,
1255                #[cfg(feature = "debugger")]
1256                function_name: String::new(),
1257                parent_id: 0,
1258                connections: vec![],
1259                payload: Payload {
1260                    job_id: 1,
1261                    implementation_url: Url::parse("file://test").expect("Could not parse Url"),
1262                    input_set: vec![json!(1)],
1263                },
1264                result: Ok((None, true)),
1265            }
1266        }
1267
1268        #[test]
1269        #[serial]
1270        fn running_to_ready_on_done() {
1271            let f_a = RuntimeFunction::new(
1272                #[cfg(feature = "debugger")]
1273                "fA",
1274                #[cfg(feature = "debugger")]
1275                "/fA",
1276                "file://fake/test",
1277                vec![Input::new(
1278                    #[cfg(feature = "debugger")]
1279                    "",
1280                    0,
1281                    false,
1282                    Some(Always(json!(1))),
1283                    None,
1284                )],
1285                0,
1286                0,
1287                &[],
1288                false,
1289            );
1290
1291            let mut state = RunState::new(super::test_submission(vec![f_a]));
1292            #[cfg(feature = "metrics")]
1293            let mut metrics = Metrics::new(1);
1294            #[cfg(feature = "debugger")]
1295            let mut server = super::DummyServer {};
1296            #[cfg(feature = "debugger")]
1297            let mut debugger = super::dummy_debugger(&mut server);
1298
1299            state.init().expect("Could not init state");
1300            assert!(
1301                state.function_state_is_only(0, &State::Ready),
1302                "f_a should be Ready"
1303            );
1304            let job = state.get_next_job().expect("Couldn't get next job");
1305            assert_eq!(
1306                0, job.process_id,
1307                "get_next_job() should return process_id = 0"
1308            );
1309            state.start_job(job.clone());
1310
1311            state
1312                .running_jobs
1313                .get(&job.payload.job_id)
1314                .expect("Job with f_a should be Running");
1315
1316            // Event
1317            let job = test_job();
1318            state
1319                .retire_a_job(
1320                    #[cfg(feature = "metrics")]
1321                    &mut metrics,
1322                    (job.payload.job_id, job.result),
1323                    #[cfg(feature = "debugger")]
1324                    &mut debugger,
1325                )
1326                .expect("Problem retiring job");
1327
1328            // Test
1329            assert!(
1330                state.function_state_is_only(0, &State::Ready),
1331                "f_a should be Ready again"
1332            );
1333        }
1334
1335        // Done: it has one input or more empty, to it can't run
1336        #[test]
1337        #[serial]
1338        fn running_to_waiting_on_done() {
1339            let f_a = super::test_function_a_init();
1340
1341            let mut state = RunState::new(super::test_submission(vec![f_a]));
1342            #[cfg(feature = "metrics")]
1343            let mut metrics = Metrics::new(1);
1344            #[cfg(feature = "debugger")]
1345            let mut server = super::DummyServer {};
1346            #[cfg(feature = "debugger")]
1347            let mut debugger = super::dummy_debugger(&mut server);
1348
1349            state.init().expect("Could not init state");
1350            assert!(
1351                state.function_state_is_only(0, &State::Ready),
1352                "f_a should be Ready"
1353            );
1354            let job = state.get_next_job().expect("Couldn't get next job");
1355            assert_eq!(0, job.process_id, "next() should return process_id = 0");
1356            state.start_job(job.clone());
1357
1358            state
1359                .running_jobs
1360                .get(&job.payload.job_id)
1361                .expect("Job with f_a should be Running");
1362
1363            // Event
1364            let job = test_job();
1365            state
1366                .retire_a_job(
1367                    #[cfg(feature = "metrics")]
1368                    &mut metrics,
1369                    (job.payload.job_id, job.result),
1370                    #[cfg(feature = "debugger")]
1371                    &mut debugger,
1372                )
1373                .expect("Problem retiring job");
1374
1375            // Test
1376            assert!(
1377                state.function_state_is_only(0, &State::Waiting),
1378                "f_a should be Waiting again"
1379            );
1380        }
1381
1382        #[test]
1383        #[serial]
1384        fn waiting_to_ready_on_input() {
1385            let f_a = test_function_a_not_init();
1386            let out_conn = OutputConnection::new(
1387                Source::default(),
1388                0,
1389                0,
1390                0,
1391                true,
1392                String::default(),
1393                #[cfg(feature = "debugger")]
1394                String::default(),
1395            );
1396            let f_b = RuntimeFunction::new(
1397                #[cfg(feature = "debugger")]
1398                "fB",
1399                #[cfg(feature = "debugger")]
1400                "/fB",
1401                "file://fake/test",
1402                vec![Input::new(
1403                    #[cfg(feature = "debugger")]
1404                    "",
1405                    0,
1406                    false,
1407                    None,
1408                    None,
1409                )],
1410                1,
1411                0,
1412                &[out_conn],
1413                false,
1414            );
1415            let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1416            #[cfg(feature = "metrics")]
1417            let mut metrics = Metrics::new(1);
1418            #[cfg(feature = "debugger")]
1419            let mut server = super::DummyServer {};
1420            #[cfg(feature = "debugger")]
1421            let mut debugger = super::dummy_debugger(&mut server);
1422
1423            state.init().expect("Could not init state");
1424            assert!(
1425                state.function_state_is_only(0, &State::Waiting),
1426                "f_a should be Waiting"
1427            );
1428
1429            // Event run f_b which will send to f_a
1430            let job = super::test_job(1, 0);
1431            state.start_job(job.clone());
1432
1433            state
1434                .retire_a_job(
1435                    #[cfg(feature = "metrics")]
1436                    &mut metrics,
1437                    (job.payload.job_id, job.result),
1438                    #[cfg(feature = "debugger")]
1439                    &mut debugger,
1440                )
1441                .expect("Problem retiring job");
1442
1443            // Test
1444            assert!(
1445                state.function_state_is_only(0, &State::Ready),
1446                "f_a should be Ready"
1447            );
1448        }
1449
1450        /*
1451            fA (#0) has an input but not initialized, outputs to #1 (fB)
1452            fB (#1) has an input with a ConstantInitializer, outputs back to #0 (fA)
1453        */
1454        #[test]
1455        #[serial]
1456        fn waiting_to_blocked_on_input() {
1457            let f_a = super::test_function_a_to_b_not_init();
1458            let connection_to_f0 = OutputConnection::new(
1459                Source::default(),
1460                0,
1461                0,
1462                0,
1463                true,
1464                String::default(),
1465                #[cfg(feature = "debugger")]
1466                String::default(),
1467            );
1468            let f_b = RuntimeFunction::new(
1469                #[cfg(feature = "debugger")]
1470                "fB",
1471                #[cfg(feature = "debugger")]
1472                "/fB",
1473                "file://fake/test",
1474                vec![Input::new(
1475                    #[cfg(feature = "debugger")]
1476                    "",
1477                    0,
1478                    false,
1479                    Some(Always(json!(1))),
1480                    None,
1481                )],
1482                1,
1483                0,
1484                &[connection_to_f0],
1485                false,
1486            );
1487            let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1488            #[cfg(feature = "metrics")]
1489            let mut metrics = Metrics::new(1);
1490            #[cfg(feature = "debugger")]
1491            let mut server = super::DummyServer {};
1492            #[cfg(feature = "debugger")]
1493            let mut debugger = super::dummy_debugger(&mut server);
1494
1495            state.init().expect("Could not init state");
1496
1497            assert!(
1498                state.function_state_is_only(1, &State::Ready),
1499                "f_b should be Ready"
1500            );
1501            assert!(
1502                state.function_state_is_only(0, &State::Waiting),
1503                "f_a should be in Waiting"
1504            );
1505
1506            // create output from f_b as if it had run - will send to f_a
1507            let job = super::test_job(1, 0);
1508            state.start_job(job.clone());
1509
1510            state
1511                .retire_a_job(
1512                    #[cfg(feature = "metrics")]
1513                    &mut metrics,
1514                    (job.payload.job_id, job.result),
1515                    #[cfg(feature = "debugger")]
1516                    &mut debugger,
1517                )
1518                .expect("Problem retiring job");
1519
1520            // Test
1521            assert!(
1522                state.function_state_is_only(0, &State::Ready),
1523                "f_a should be Ready"
1524            );
1525        }
1526    }
1527
1528    /****************************** Miscellaneous tests **************************/
1529    mod functional_tests {
        // Tests using Debugger (and hence Client/Server connection) need to be executed serially
        // to avoid multiple tests trying to bind to the same socket at the same time
1532        use serial_test::serial;
1533
1534        use flowcore::model::input::Input;
1535        #[cfg(feature = "metrics")]
1536        use flowcore::model::metrics::Metrics;
1537        use flowcore::model::output_connection::{OutputConnection, Source};
1538        use flowcore::model::runtime_function::RuntimeFunction;
1539
1540        use super::super::RunState;
1541
1542        fn test_functions() -> Vec<RuntimeFunction> {
1543            let out_conn1 = OutputConnection::new(
1544                Source::default(),
1545                1,
1546                0,
1547                0,
1548                true,
1549                String::default(),
1550                #[cfg(feature = "debugger")]
1551                String::default(),
1552            );
1553            let out_conn2 = OutputConnection::new(
1554                Source::default(),
1555                2,
1556                0,
1557                0,
1558                true,
1559                String::default(),
1560                #[cfg(feature = "debugger")]
1561                String::default(),
1562            );
1563            let p0 = RuntimeFunction::new(
1564                #[cfg(feature = "debugger")]
1565                "p0",
1566                #[cfg(feature = "debugger")]
1567                "/p0",
1568                "file://fake/test/p0",
1569                vec![], // input array
1570                0,
1571                0,
1572                &[out_conn1, out_conn2], // destinations
1573                false,
1574            ); // implementation
1575            let p1 = RuntimeFunction::new(
1576                #[cfg(feature = "debugger")]
1577                "p1",
1578                #[cfg(feature = "debugger")]
1579                "/p1",
1580                "file://fake/test/p1",
1581                vec![Input::new(
1582                    #[cfg(feature = "debugger")]
1583                    "",
1584                    0,
1585                    false,
1586                    None,
1587                    None,
1588                )], // inputs array
1589                1,
1590                0,
1591                &[],
1592                false,
1593            );
1594            let p2 = RuntimeFunction::new(
1595                #[cfg(feature = "debugger")]
1596                "p2",
1597                #[cfg(feature = "debugger")]
1598                "/p2",
1599                "file://fake/test/p2",
1600                vec![Input::new(
1601                    #[cfg(feature = "debugger")]
1602                    "",
1603                    0,
1604                    false,
1605                    None,
1606                    None,
1607                )], // inputs array
1608                2,
1609                0,
1610                &[],
1611                false,
1612            );
1613            vec![p0, p1, p2]
1614        }
1615
1616        #[test]
1617        fn get_works() {
1618            let state = RunState::new(super::test_submission(test_functions()));
1619            let got = state
1620                .get_function(1)
1621                .ok_or("Could not get function by id")
1622                .expect("Could not get function with that id");
1623            assert_eq!(got.id(), 1);
1624        }
1625
1626        #[test]
1627        fn no_next_if_none_ready() {
1628            let mut state = RunState::new(super::test_submission(test_functions()));
1629            assert!(state.get_next_job().is_none());
1630        }
1631
1632        #[test]
1633        fn next_works() {
1634            let mut state = RunState::new(super::test_submission(test_functions()));
1635
1636            // Put 0 on the ready list
1637            state.create_jobs(0, 0).expect("Could not create jobs");
1638
1639            state.get_next_job().expect("Couldn't get next job");
1640        }
1641
1642        #[test]
1643        fn inputs_ready_makes_ready() {
1644            let mut state = RunState::new(super::test_submission(test_functions()));
1645
1646            // Put 0 on the ready list
1647            state.create_jobs(0, 0).expect("Could not create jobs");
1648
1649            state.get_next_job().expect("Couldn't get next job");
1650        }
1651
1652        #[test]
1653        #[serial]
1654        fn wont_return_too_many_jobs() {
1655            let mut state = RunState::new(super::test_submission(test_functions()));
1656
1657            state.init().expect("Could not init state");
1658
1659            let _ = state.get_next_job().expect("Couldn't get next job");
1660
1661            assert!(
1662                state.get_next_job().is_none(),
1663                "Did not expect a Ready job!"
1664            );
1665        }
1666
1667        /*
1668            This test checks that a function with no output destinations (even if pure and produces
1669            some output) can be executed and nothing crashes
1670        */
1671        #[test]
1672        #[serial]
1673        fn pure_function_no_destinations() {
1674            let f_a = super::test_function_a_init();
1675            let _id = f_a.id();
1676
1677            let mut state = RunState::new(super::test_submission(vec![f_a]));
1678            #[cfg(feature = "metrics")]
1679            let mut metrics = Metrics::new(1);
1680            #[cfg(feature = "debugger")]
1681            let mut server = super::DummyServer {};
1682            #[cfg(feature = "debugger")]
1683            let mut debugger = super::dummy_debugger(&mut server);
1684
1685            state.init().expect("Could not init state");
1686
1687            let job = state.get_next_job().expect("Couldn't get next job");
1688            state.start_job(job.clone());
1689
1690            // Test there is no problem producing an Output when no destinations to send it to
1691            state
1692                .retire_a_job(
1693                    #[cfg(feature = "metrics")]
1694                    &mut metrics,
1695                    (job.payload.job_id, job.result),
1696                    #[cfg(feature = "debugger")]
1697                    &mut debugger,
1698                )
1699                .expect("Failed to retire job correctly");
1700        }
1701    }
1702}