1use std::collections::VecDeque;
2use std::collections::{HashMap, HashSet};
3use std::fmt;
4
5use log::{debug, error, info, trace};
6use serde_derive::{Deserialize, Serialize};
7use serde_json::Value;
8
9use flowcore::errors::Result;
10#[cfg(feature = "metrics")]
11use flowcore::model::metrics::Metrics;
12use flowcore::model::output_connection::OutputConnection;
13use flowcore::model::output_connection::Source::{Input, Output};
14use flowcore::model::runtime_function::RuntimeFunction;
15use flowcore::model::submission::Submission;
16use flowcore::RunAgain;
17
18#[cfg(debug_assertions)]
19use crate::checks;
20#[cfg(feature = "debugger")]
21use crate::debugger::Debugger;
22use crate::job::{Job, Payload};
23
24#[cfg(any(debug_assertions, feature = "debugger", test))]
26#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
27pub enum State {
28 Ready,
30 Waiting,
32 Running,
35 Completed,
38}
39
40#[derive(Deserialize, Serialize, Clone)]
158pub struct RunState {
159 pub(crate) submission: Submission,
161 ready_jobs: VecDeque<Job>,
163 running_jobs: HashMap<usize, Job>,
165 completed: HashSet<usize>,
168 number_of_jobs_created: usize,
170 busy_count: HashMap<usize, usize>,
172 functions_by_flow: HashMap<usize, Vec<usize>>,
174}
175
176impl RunState {
177 #[must_use]
180 pub fn new(submission: Submission) -> Self {
181 let mut functions_by_flow = HashMap::<usize, Vec<usize>>::new();
182 for (id, function) in submission.manifest.functions() {
183 functions_by_flow
184 .entry(function.get_parent_id())
185 .or_default()
186 .push(*id);
187 }
188
189 RunState {
190 submission,
191 ready_jobs: VecDeque::<Job>::new(),
192 running_jobs: HashMap::<usize, Job>::new(),
193 completed: HashSet::<usize>::new(),
194 number_of_jobs_created: 0,
195 busy_count: HashMap::<usize, usize>::new(),
196 functions_by_flow,
197 }
198 }
199
200 #[cfg(any(debug_assertions, feature = "debugger"))]
201 pub(crate) fn get_functions(&self) -> &HashMap<usize, RuntimeFunction> {
203 self.submission.manifest.functions()
204 }
205
206 #[cfg(feature = "debugger")]
208 fn reset(&mut self) {
209 debug!("Resetting RunState");
210 for function in self.submission.manifest.get_functions().values_mut() {
211 function.reset();
212 }
213 self.ready_jobs.clear();
214 self.running_jobs.clear();
215 self.completed.clear();
216 self.number_of_jobs_created = 0;
217 self.busy_count.clear();
218 }
219
220 pub(crate) fn init(&mut self) -> Result<()> {
229 #[cfg(feature = "debugger")]
230 self.reset();
231
232 let mut make_ready_list = vec![];
233
234 debug!("Initializing all functions");
235 for function in self.submission.manifest.get_functions().values_mut() {
236 function.init();
237 if function.can_run() {
238 make_ready_list.push((function.id(), function.get_parent_id()));
239 }
240 }
241
242 for (process_id, parent_id) in make_ready_list {
243 self.create_jobs(process_id, parent_id)?;
244 }
245
246 Ok(())
247 }
248
249 #[cfg(any(debug_assertions, feature = "debugger", test))]
251 #[must_use]
252 pub fn get_function_states(&self, function_id: usize) -> Vec<State> {
253 let mut states = vec![];
254
255 if self.completed.contains(&function_id) {
256 states.push(State::Completed);
257 }
258
259 for ready_job in &self.ready_jobs {
260 if ready_job.process_id == function_id {
261 states.push(State::Ready);
262 }
263 }
264
265 if states.is_empty() {
266 states.push(State::Waiting);
267 }
268
269 states
270 }
271
272 #[cfg(test)]
274 pub(crate) fn function_state_is_only(&self, function_id: usize, state: &State) -> bool {
275 let function_states = self.get_function_states(function_id);
276 function_states.len() == 1 && function_states.contains(state)
277 }
278
279 #[cfg(any(feature = "debugger", debug_assertions))]
281 #[must_use]
282 pub fn get_running(&self) -> &HashMap<usize, Job> {
283 &self.running_jobs
284 }
285
286 #[cfg(any(feature = "debugger", test))]
288 #[must_use]
289 pub fn get_function(&self, id: usize) -> Option<&RuntimeFunction> {
290 self.submission.manifest.functions().get(&id)
291 }
292
293 fn get_mut(&mut self, id: usize) -> Option<&mut RuntimeFunction> {
295 self.submission.manifest.get_functions().get_mut(&id)
296 }
297
298 #[cfg(debug_assertions)]
299 #[must_use]
301 pub fn get_busy_count(&self) -> &HashMap<usize, usize> {
302 &self.busy_count
303 }
304
305 pub(crate) fn get_next_job(&mut self) -> Option<Job> {
307 if let Some(limit) = self.submission.max_parallel_jobs {
308 if self.number_jobs_running() >= limit {
309 trace!("max_parallel_jobs limit of {limit} reached");
310 return None;
311 }
312 }
313
314 self.ready_jobs.remove(0)
315 }
316
317 pub(crate) fn start_job(&mut self, job: Job) {
319 self.running_jobs.insert(job.payload.job_id, job);
320 }
321
322 #[cfg(any(feature = "metrics", feature = "debugger"))]
324 #[must_use]
325 pub fn get_number_of_jobs_created(&self) -> usize {
326 self.number_of_jobs_created
327 }
328
329 #[allow(unused_variables, unused_assignments, unused_mut)]
338 pub(crate) fn retire_a_job(
339 &mut self,
340 #[cfg(feature = "metrics")] metrics: &mut Metrics,
341 result: (usize, Result<(Option<Value>, RunAgain)>),
342 #[cfg(feature = "debugger")] debugger: &mut Debugger,
343 ) -> Result<(bool, bool, Job)> {
344 let mut display_next_output = false;
345 let mut restart = false;
346
347 let mut job = self
348 .running_jobs
349 .remove(&result.0)
350 .ok_or_else(|| format!("Could not find Job#{} to retire it", result.0))?;
351
352 match &result.1 {
353 Ok((output_value, function_can_run_again)) => {
354 #[cfg(feature = "debugger")]
355 debug!(
356 "Job #{}: Function #{} '{}' {:?} -> {:?}",
357 job.payload.job_id,
358 job.process_id,
359 job.function_name,
360 job.payload.input_set,
361 output_value
362 );
363 #[cfg(not(feature = "debugger"))]
364 debug!(
365 "Job #{}: Function #{} {:?} -> {:?}",
366 job.payload.job_id, job.process_id, job.payload.input_set, output_value
367 );
368
369 for connection in &job.connections {
370 let value_to_send = match &connection.source {
371 Output(route) => match output_value {
372 Some(output_v) => output_v.pointer(route),
373 None => None,
374 },
375 Input(index) => job.payload.input_set.get(*index),
376 };
377
378 if let Some(value) = value_to_send {
379 (display_next_output, restart) = self.send_a_value(
380 job.process_id,
381 job.parent_id,
382 connection,
383 value.clone(),
384 #[cfg(feature = "metrics")]
385 metrics,
386 #[cfg(feature = "debugger")]
387 debugger,
388 )?;
389 } else {
390 trace!(
391 "Job #{}:\t\tNo value found at '{}'",
392 job.payload.job_id,
393 connection.source
394 );
395 }
396 }
397
398 if *function_can_run_again {
399 let function = self.get_mut(job.process_id).ok_or("No such function")?;
400
401 function.init_inputs(false, false);
403
404 if function.can_run() {
407 self.create_jobs(job.process_id, job.parent_id)?;
408 }
409 } else {
410 self.mark_as_completed(job.process_id);
412 }
413 }
414 Err(e) => {
415 error!("Error in Job #{}: {e}", job.payload.job_id);
416 }
417 }
418
419 (display_next_output, restart) = self.unblock_flows(
422 &job,
423 #[cfg(feature = "debugger")]
424 debugger,
425 )?;
426
427 #[cfg(debug_assertions)]
428 checks::check_invariants(self, job.payload.job_id)?;
429
430 trace!(
431 "Job #{}: Completed-----------------------",
432 job.payload.job_id
433 );
434 job.result = result.1;
435
436 Ok((display_next_output, restart, job))
437 }
438
439 fn send_a_value(
442 &mut self,
443 source_id: usize,
444 _source_parent_id: usize,
445 connection: &OutputConnection,
446 output_value: Value,
447 #[cfg(feature = "metrics")] metrics: &mut Metrics,
448 #[cfg(feature = "debugger")] debugger: &mut Debugger,
449 ) -> Result<(bool, bool)> {
450 let mut display_next_output = false;
451 let mut restart = false;
452
453 let route_str = match &connection.source {
454 Output(route) if route.is_empty() => String::new(),
455 Output(route) => format!(" from output route '{route}'"),
456 Input(index) => format!(" from Job input #{index}"),
457 };
458
459 let loopback = source_id == connection.destination_id;
460
461 if loopback {
462 info!("\t\tFunction #{source_id} loopback of value '{output_value}'{route_str} to Self:{}",
463 connection.destination_io_number);
464 } else {
465 info!(
466 "\t\tFunction #{source_id} sending '{output_value}'{route_str} to Function #{}:{}",
467 connection.destination_id, connection.destination_io_number
468 );
469 }
470
471 #[cfg(feature = "debugger")]
472 if let Output(route) = &connection.source {
473 (display_next_output, restart) = debugger.check_prior_to_send(
474 self,
475 source_id,
476 route,
477 &output_value,
478 connection.destination_id,
479 connection.destination_io_number,
480 )?;
481 }
482
483 let function = self
484 .get_mut(connection.destination_id)
485 .ok_or("Could not get function")?;
486 let job_count_before = function.input_sets_available();
487 if connection.internal {
488 function.send_internal(connection.destination_io_number, output_value)?;
489 } else {
490 function.send(connection.destination_io_number, output_value)?;
491 }
492
493 #[cfg(feature = "metrics")]
494 metrics.increment_outputs_sent(); let new_job_available = function.input_sets_available() > job_count_before;
497
498 if new_job_available && !loopback {
502 self.create_jobs(connection.destination_id, connection.destination_parent_id)?;
503 }
504
505 Ok((display_next_output, restart))
506 }
507
508 #[must_use]
510 pub fn number_jobs_running(&self) -> usize {
511 self.running_jobs.len()
512 }
513
514 #[must_use]
516 pub fn number_jobs_ready(&self) -> usize {
517 self.ready_jobs.len()
518 }
519
520 #[cfg(feature = "debugger")]
523 pub(crate) fn get_input_blockers(&self, target_id: usize) -> Result<Vec<usize>> {
524 let mut input_blockers = vec![];
525 let target_function = self.get_function(target_id).ok_or("No such function")?;
526
527 for (target_io, input) in target_function.inputs().iter().enumerate() {
529 if input.values_available() == 0 {
530 let mut senders = Vec::<usize>::new();
531
532 for sender_function in self.submission.manifest.functions().values() {
534 let mut sender_is_ready = false;
536
537 for ready_job in &self.ready_jobs {
538 if ready_job.process_id == sender_function.id() {
539 sender_is_ready = true;
540 }
541 }
542
543 if !sender_is_ready {
544 for destination in sender_function.get_output_connections() {
547 if (destination.destination_id == target_id)
548 && (destination.destination_io_number == target_io)
549 {
550 senders.push(sender_function.id());
551 }
552 }
553 }
554 }
555
556 if senders.len() == 1 {
558 input_blockers.extend(senders);
559 }
560 }
561 }
562
563 Ok(input_blockers)
564 }
565
566 pub(crate) fn create_jobs(&mut self, process_id: usize, parent_id: usize) -> Result<()> {
568 loop {
569 self.number_of_jobs_created = self
570 .number_of_jobs_created
571 .checked_add(1)
572 .ok_or("Ran out of job IDs")?;
573 let job_id = self.number_of_jobs_created;
574 let function = self.get_mut(process_id).ok_or("Could not get function")?;
575 if let Some(input_set) = function.take_input_set() {
576 let implementation_url = function.get_implementation_url().clone();
577 debug!(
578 "Job #{job_id} created for Function #{process_id}({parent_id}) with inputs: {input_set:?}"
579 );
580 let job = Job {
581 process_id,
582 parent_id,
583 #[cfg(feature = "debugger")]
584 function_name: function.name().to_string(),
585 connections: function.get_output_connections().clone(),
586 payload: Payload {
587 job_id,
588 input_set,
589 implementation_url,
590 },
591 result: Ok((None, false)),
592 };
593
594 let always_ready = function.is_always_ready();
596 self.ready_jobs.push_back(job);
597 *self.busy_count.entry(process_id).or_insert(0) += 1;
598 for ancestor in self.ancestors(parent_id) {
599 *self.busy_count.entry(ancestor).or_insert(0) += 1;
600 }
601 if always_ready {
602 return Ok(());
603 }
604 } else {
605 self.number_of_jobs_created = self
606 .number_of_jobs_created
607 .checked_sub(1)
608 .ok_or("Couldn't fix count")?;
609 return Ok(());
610 }
611 }
612 }
613
614 #[cfg(any(feature = "debugger", feature = "metrics"))]
616 #[must_use]
617 pub fn num_functions(&self) -> usize {
618 self.submission.manifest.functions().len()
619 }
620
621 fn ancestors(&self, parent_id: usize) -> Vec<usize> {
623 let mut result = vec![parent_id];
624 let mut current = parent_id;
625 while let Some(flow_info) = self.submission.manifest.flows().get(¤t) {
626 if let Some(pid) = flow_info.parent_id {
627 result.push(pid);
628 current = pid;
629 } else {
630 break; }
632 }
633 result
634 }
635
636 #[allow(unused_variables, unused_assignments, unused_mut)]
638 fn unblock_flows(
639 &mut self,
640 job: &Job,
641 #[cfg(feature = "debugger")] debugger: &mut Debugger,
642 ) -> Result<(bool, bool)> {
643 let mut display_next_output = false;
644 let mut restart = false;
645
646 self.remove_from_busy(job.process_id, job.parent_id);
647
648 for ancestor_id in self.ancestors(job.parent_id) {
650 if self.busy_count.contains_key(&ancestor_id) {
651 continue;
652 }
653
654 if self.has_runnable_on_internal(ancestor_id) {
656 let runnable: Vec<_> = self
657 .functions_by_flow
658 .get(&ancestor_id)
659 .cloned()
660 .unwrap_or_default()
661 .into_iter()
662 .filter(|id| {
663 !self.completed.contains(id)
664 && self
665 .submission
666 .manifest
667 .functions()
668 .get(id)
669 .is_some_and(RuntimeFunction::can_run)
670 })
671 .collect();
672 for func_id in runnable {
673 self.create_jobs(func_id, ancestor_id)?;
674 }
675 } else {
676 debug!(
678 "Job #{}:\tFlow #{} is now idle",
679 job.payload.job_id, ancestor_id
680 );
681
682 #[cfg(feature = "debugger")]
683 {
684 (display_next_output, restart) =
685 debugger.check_prior_to_flow_unblock(self, ancestor_id)?;
686 }
687
688 self.clear_flow_internal_inputs(ancestor_id);
689 self.run_flow_initializers(ancestor_id)?;
690 }
691 }
692
693 Ok((display_next_output, restart))
694 }
695
696 fn remove_from_busy(&mut self, process_id: usize, parent_id: usize) {
698 if let Some(count) = self.busy_count.get_mut(&process_id) {
700 *count = count.saturating_sub(1);
701 if *count == 0 {
702 self.busy_count.remove(&process_id);
703 }
704 }
705 for ancestor in self.ancestors(parent_id) {
707 if let Some(count) = self.busy_count.get_mut(&ancestor) {
708 *count = count.saturating_sub(1);
709 if *count == 0 {
710 self.busy_count.remove(&ancestor);
711 }
712 }
713 }
714 trace!("\t\t\tUpdated busy_count to: {:?}", self.busy_count);
715 }
716
717 fn has_runnable_on_internal(&self, flow_id: usize) -> bool {
718 self.functions_by_flow
719 .get(&flow_id)
720 .is_some_and(|func_ids| {
721 func_ids.iter().any(|id| {
722 !self.completed.contains(id)
723 && self
724 .submission
725 .manifest
726 .functions()
727 .get(id)
728 .is_some_and(RuntimeFunction::can_run_on_internal)
729 })
730 })
731 }
732
733 fn clear_flow_internal_inputs(&mut self, flow_id: usize) {
734 if let Some(func_ids) = self.functions_by_flow.get(&flow_id).cloned() {
735 for id in &func_ids {
736 if !self.completed.contains(id) {
737 if let Some(f) = self.submission.manifest.get_functions().get_mut(id) {
738 f.clear_internal_inputs();
739 }
740 }
741 }
742 }
743 }
744
745 fn run_flow_initializers(&mut self, flow_id: usize) -> Result<()> {
746 let mut runnable_functions = Vec::<usize>::new();
747 if let Some(func_ids) = self.functions_by_flow.get(&flow_id).cloned() {
748 for id in &func_ids {
749 if !self.completed.contains(id) {
750 if let Some(f) = self.submission.manifest.get_functions().get_mut(id) {
751 f.init_inputs(false, true);
752 if f.can_run() {
753 runnable_functions.push(*id);
754 }
755 }
756 }
757 }
758 }
759
760 for function_id in runnable_functions {
761 self.create_jobs(function_id, flow_id)?;
762 }
763
764 Ok(())
765 }
766
767 pub(crate) fn mark_as_completed(&mut self, function_id: usize) {
769 self.completed.insert(function_id);
770 }
771}
772
773impl fmt::Display for RunState {
774 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
775 writeln!(f, " Submission:\n{}", self.submission)?;
776
777 writeln!(f, "RunState:")?;
778 writeln!(f, " Jobs Created: {}", self.number_of_jobs_created)?;
779 writeln!(f, "Number of Jobs Running: {}", self.running_jobs.len())?;
780 writeln!(f, " Jobs Running: {:?}", self.running_jobs.keys())?;
781 writeln!(
782 f,
783 " Functions Ready: {:?}",
784 self.ready_jobs
785 .iter() .map(|j| j.payload.job_id)
787 .collect::<Vec<usize>>()
788 )?;
789 writeln!(f, " Functions Completed: {:?}", self.completed)?;
790 write!(f, " Busy Count: {:?}", self.busy_count)
791 }
792}
793
794#[cfg(test)]
795#[allow(clippy::unwrap_used, clippy::expect_used)]
796mod test {
797 use serde_json::{json, Value};
798 use url::Url;
799
800 use flowcore::errors::Result;
801 use flowcore::model::flow_manifest::FlowManifest;
802 use flowcore::model::input::Input;
803 use flowcore::model::input::InputInitializer::Once;
804 use flowcore::model::metadata::MetaData;
805 use flowcore::model::output_connection::{OutputConnection, Source};
806 use flowcore::model::runtime_function::RuntimeFunction;
807 use flowcore::model::submission::Submission;
808
809 #[cfg(feature = "debugger")]
810 use crate::block::Block;
811 #[cfg(feature = "debugger")]
812 use crate::debug_command::DebugCommand;
813 #[cfg(feature = "debugger")]
814 use crate::debugger::Debugger;
815 #[cfg(feature = "debugger")]
816 use crate::debugger_handler::DebuggerHandler;
817
818 use super::RunState;
819 use super::State;
820 use super::{Job, Payload};
821
822 fn test_function_a_to_b_not_init() -> RuntimeFunction {
823 let connection_to_f1 = OutputConnection::new(
824 Source::default(),
825 1,
826 0,
827 0,
828 true,
829 "/fB".to_string(),
830 #[cfg(feature = "debugger")]
831 String::default(),
832 );
833
834 RuntimeFunction::new(
835 #[cfg(feature = "debugger")]
836 "fA",
837 #[cfg(feature = "debugger")]
838 "/fA",
839 "file://fake/test",
840 vec![Input::new(
841 #[cfg(feature = "debugger")]
842 "",
843 0,
844 false,
845 None,
846 None,
847 )],
848 0,
849 0,
850 &[connection_to_f1],
851 false,
852 ) }
854
855 fn test_function_a_to_b() -> RuntimeFunction {
856 let connection_to_f1 = OutputConnection::new(
857 Source::default(),
858 1,
859 0,
860 0,
861 true,
862 "/fB".to_string(),
863 #[cfg(feature = "debugger")]
864 String::default(),
865 );
866 RuntimeFunction::new(
867 #[cfg(feature = "debugger")]
868 "fA",
869 #[cfg(feature = "debugger")]
870 "/fA",
871 "file://fake/test",
872 vec![Input::new(
873 #[cfg(feature = "debugger")]
874 "",
875 0,
876 false,
877 Some(Once(json!(1))),
878 None,
879 )],
880 0,
881 0,
882 &[connection_to_f1],
883 false,
884 ) }
886
887 fn test_function_a_init() -> RuntimeFunction {
888 RuntimeFunction::new(
889 #[cfg(feature = "debugger")]
890 "fA",
891 #[cfg(feature = "debugger")]
892 "/fA",
893 "file://fake/test",
894 vec![Input::new(
895 #[cfg(feature = "debugger")]
896 "",
897 0,
898 false,
899 Some(Once(json!(1))),
900 None,
901 )],
902 0,
903 0,
904 &[],
905 false,
906 )
907 }
908
909 fn test_function_b_not_init() -> RuntimeFunction {
910 RuntimeFunction::new(
911 #[cfg(feature = "debugger")]
912 "fB",
913 #[cfg(feature = "debugger")]
914 "/fB",
915 "file://fake/test",
916 vec![Input::new(
917 #[cfg(feature = "debugger")]
918 "",
919 0,
920 false,
921 None,
922 None,
923 )],
924 1,
925 0,
926 &[],
927 false,
928 )
929 }
930
931 fn test_job(source_process_id: usize, destination_process_id: usize) -> Job {
932 let out_conn = OutputConnection::new(
933 Source::default(),
934 destination_process_id,
935 0,
936 0,
937 true,
938 String::default(),
939 #[cfg(feature = "debugger")]
940 String::default(),
941 );
942 Job {
943 process_id: source_process_id,
944 parent_id: 0,
945 #[cfg(feature = "debugger")]
946 function_name: String::new(),
947 connections: vec![out_conn],
948 payload: Payload {
949 job_id: 1,
950 implementation_url: Url::parse("file://test").expect("Could not parse Url"),
951 input_set: vec![json!(1)],
952 },
953 result: Ok((Some(json!(1)), true)),
954 }
955 }
956
957 #[cfg(feature = "debugger")]
958 struct DummyServer;
959
960 #[cfg(feature = "debugger")]
961 impl DebuggerHandler for DummyServer {
962 fn start(&mut self) {}
963 fn job_breakpoint(&mut self, _job: &Job, _function: &RuntimeFunction, _states: Vec<State>) {
964 }
965 fn block_breakpoint(&mut self, _block: &Block) {}
966 fn flow_unblock_breakpoint(&mut self, _flow_id: usize) {}
967 fn send_breakpoint(
968 &mut self,
969 _: &str,
970 _source_process_id: usize,
971 _output_route: &str,
972 _value: &Value,
973 _destination_id: usize,
974 _destination_name: &str,
975 _input_name: &str,
976 _input_number: usize,
977 ) {
978 }
979 fn job_error(&mut self, _job: &Job) {}
980 fn job_completed(&mut self, _job: &Job) {}
981 fn blocks(&mut self, _blocks: Vec<Block>) {}
982 fn outputs(&mut self, _output: Vec<OutputConnection>) {}
983 fn input(&mut self, _input: Input) {}
984 fn function_list(&mut self, _functions: &[RuntimeFunction]) {}
985 fn function_states(&mut self, _function: RuntimeFunction, _function_states: Vec<State>) {}
986 fn run_state(&mut self, _run_state: &RunState) {}
987 fn message(&mut self, _message: String) {}
988 fn panic(&mut self, _state: &RunState, _error_message: String) {}
989 fn debugger_exiting(&mut self) {}
990 fn debugger_resetting(&mut self) {}
991 fn debugger_error(&mut self, _error: String) {}
992 fn execution_starting(&mut self) {}
993 fn execution_ended(&mut self) {}
994 fn get_command(&mut self, _state: &RunState) -> Result<DebugCommand> {
995 unimplemented!();
996 }
997 }
998
999 #[cfg(feature = "debugger")]
1000 fn dummy_debugger(server: &mut dyn DebuggerHandler) -> Debugger<'_> {
1001 Debugger::new(server)
1002 }
1003
1004 fn test_meta_data() -> MetaData {
1005 MetaData {
1006 name: "test".into(),
1007 version: "0.0.0".into(),
1008 description: "a test".into(),
1009 authors: vec!["me".into()],
1010 }
1011 }
1012
1013 fn test_manifest(functions: Vec<RuntimeFunction>) -> FlowManifest {
1014 let mut manifest = FlowManifest::new(test_meta_data());
1015 for function in functions {
1016 manifest.add_function(function);
1017 }
1018 manifest
1019 }
1020
1021 fn test_submission(functions: Vec<RuntimeFunction>) -> Submission {
1022 Submission::new(
1023 test_manifest(functions),
1024 None,
1025 None,
1026 #[cfg(feature = "debugger")]
1027 true,
1028 )
1029 }
1030
1031 mod general_run_state_tests {
1032 use super::super::RunState;
1033
1034 #[test]
1035 fn display_run_state_test() {
1036 let f_a = super::test_function_a_to_b();
1037 let f_b = super::test_function_b_not_init();
1038 let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1039 state.init().expect("Could not init state");
1040
1041 #[cfg(any(feature = "debugger", feature = "metrics"))]
1042 assert_eq!(state.num_functions(), 2);
1043
1044 println!("Run state: {state}");
1045 }
1046
1047 #[cfg(feature = "metrics")]
1048 #[test]
1049 fn jobs_created_zero_at_init() {
1050 let mut state = RunState::new(super::test_submission(vec![]));
1051 state.init().expect("Could not init state");
1052 assert_eq!(
1053 0,
1054 state.get_number_of_jobs_created(),
1055 "At init jobs() should be 0"
1056 );
1057 assert_eq!(0, state.number_jobs_ready());
1058 }
1059
1060 #[cfg(feature = "debugger")]
1061 #[test]
1062 fn zero_running_at_init() {
1063 let mut state = RunState::new(super::test_submission(vec![]));
1064 state.init().expect("Could not init state");
1065 assert!(
1066 state.get_running().is_empty(),
1067 "At init get_running() should be empty"
1068 );
1069 }
1070 }
1071
1072 mod state_transitions {
1074 use serde_json::json;
1075 use serial_test::serial;
1076 use url::Url;
1077
1078 use flowcore::model::input::Input;
1079 use flowcore::model::input::InputInitializer::Always;
1080 #[cfg(feature = "metrics")]
1081 use flowcore::model::metrics::Metrics;
1082 use flowcore::model::output_connection::{OutputConnection, Source};
1083 use flowcore::model::runtime_function::RuntimeFunction;
1084
1085 use crate::run_state::test::test_function_b_not_init;
1086
1087 use super::super::RunState;
1088 use super::super::State;
1089 use super::super::{Job, Payload};
1090
1091 #[test]
1092 fn to_ready_1_on_init() {
1093 let f_a = super::test_function_a_to_b();
1094 let f_b = test_function_b_not_init();
1095 let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1096
1097 state.init().expect("Could not init state");
1099
1100 assert!(
1102 state.function_state_is_only(0, &State::Ready),
1103 "f_a should be Ready"
1104 );
1105 assert_eq!(1, state.number_jobs_ready());
1106 assert!(
1107 state.function_state_is_only(1, &State::Waiting),
1108 "f_b should be waiting for input"
1109 );
1110 }
1111
1112 #[test]
1113 fn input_blocker() {
1114 let f_a = super::test_function_a_to_b_not_init();
1115 let f_b = test_function_b_not_init();
1116 let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1117
1118 state.init().expect("Could not init state");
1120
1121 assert!(
1123 state.function_state_is_only(0, &State::Waiting),
1124 "f_a should be waiting for input"
1125 );
1126 assert!(
1127 state.function_state_is_only(1, &State::Waiting),
1128 "f_b should be waiting for input"
1129 );
1130 #[cfg(feature = "debugger")]
1131 assert!(
1132 state
1133 .get_input_blockers(1)
1134 .expect("Could not get blockers")
1135 .contains(&0),
1136 "There should be an input blocker"
1137 );
1138 }
1139
1140 #[test]
1141 fn to_ready_2_on_init() {
1142 let f_a = super::test_function_a_to_b();
1143 let f_b = test_function_b_not_init();
1144 let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1145
1146 state.init().expect("Could not init state");
1148
1149 assert!(
1151 state.function_state_is_only(0, &State::Ready),
1152 "f_a should be Ready"
1153 );
1154 }
1155
1156 #[test]
1157 fn to_ready_3_on_init() {
1158 let f_a = super::test_function_a_init();
1159 let mut state = RunState::new(super::test_submission(vec![f_a]));
1160
1161 state.init().expect("Could not init state");
1163
1164 assert!(
1166 state.function_state_is_only(0, &State::Ready),
1167 "f_a should be Ready"
1168 );
1169 }
1170
1171 fn test_function_a_not_init() -> RuntimeFunction {
1172 RuntimeFunction::new(
1173 #[cfg(feature = "debugger")]
1174 "fA",
1175 #[cfg(feature = "debugger")]
1176 "/fA",
1177 "file://fake/test",
1178 vec![Input::new(
1179 #[cfg(feature = "debugger")]
1180 "",
1181 0,
1182 false,
1183 None,
1184 None,
1185 )],
1186 0,
1187 0,
1188 &[],
1189 false,
1190 )
1191 }
1192
1193 #[test]
1194 fn to_waiting_on_init() {
1195 let f_a = test_function_a_not_init();
1196 let mut state = RunState::new(super::test_submission(vec![f_a]));
1197
1198 state.init().expect("Could not init state");
1200
1201 assert!(
1203 state.function_state_is_only(0, &State::Waiting),
1204 "f_a should be Waiting"
1205 );
1206 }
1207
1208 #[test]
1209 fn ready_to_running_on_next() {
1210 let f_a = super::test_function_a_init();
1211 let mut state = RunState::new(super::test_submission(vec![f_a]));
1212 state.init().expect("Could not init state");
1213 assert!(
1214 state.function_state_is_only(0, &State::Ready),
1215 "f_a should be Ready"
1216 );
1217
1218 let job = state.get_next_job().expect("Couldn't get next job");
1220 state.start_job(job.clone());
1221
1222 state
1224 .running_jobs
1225 .get(&job.payload.job_id)
1226 .expect("Job should have been running");
1227 }
1228
1229 #[test]
1230 fn unready_not_to_running_on_next() {
1231 let f_a = test_function_a_not_init();
1232 let mut state = RunState::new(super::test_submission(vec![f_a]));
1233 state.init().expect("Could not init state");
1234 assert!(
1235 state.function_state_is_only(0, &State::Waiting),
1236 "f_a should be Waiting"
1237 );
1238
1239 assert!(
1241 state.get_next_job().is_none(),
1242 "next_job() should return None"
1243 );
1244
1245 assert!(
1247 state.function_state_is_only(0, &State::Waiting),
1248 "f_a should be Waiting"
1249 );
1250 }
1251
1252 fn test_job() -> Job {
1253 Job {
1254 process_id: 0,
1255 #[cfg(feature = "debugger")]
1256 function_name: String::new(),
1257 parent_id: 0,
1258 connections: vec![],
1259 payload: Payload {
1260 job_id: 1,
1261 implementation_url: Url::parse("file://test").expect("Could not parse Url"),
1262 input_set: vec![json!(1)],
1263 },
1264 result: Ok((None, true)),
1265 }
1266 }
1267
1268 #[test]
1269 #[serial]
1270 fn running_to_ready_on_done() {
1271 let f_a = RuntimeFunction::new(
1272 #[cfg(feature = "debugger")]
1273 "fA",
1274 #[cfg(feature = "debugger")]
1275 "/fA",
1276 "file://fake/test",
1277 vec![Input::new(
1278 #[cfg(feature = "debugger")]
1279 "",
1280 0,
1281 false,
1282 Some(Always(json!(1))),
1283 None,
1284 )],
1285 0,
1286 0,
1287 &[],
1288 false,
1289 );
1290
1291 let mut state = RunState::new(super::test_submission(vec![f_a]));
1292 #[cfg(feature = "metrics")]
1293 let mut metrics = Metrics::new(1);
1294 #[cfg(feature = "debugger")]
1295 let mut server = super::DummyServer {};
1296 #[cfg(feature = "debugger")]
1297 let mut debugger = super::dummy_debugger(&mut server);
1298
1299 state.init().expect("Could not init state");
1300 assert!(
1301 state.function_state_is_only(0, &State::Ready),
1302 "f_a should be Ready"
1303 );
1304 let job = state.get_next_job().expect("Couldn't get next job");
1305 assert_eq!(
1306 0, job.process_id,
1307 "get_next_job() should return process_id = 0"
1308 );
1309 state.start_job(job.clone());
1310
1311 state
1312 .running_jobs
1313 .get(&job.payload.job_id)
1314 .expect("Job with f_a should be Running");
1315
1316 let job = test_job();
1318 state
1319 .retire_a_job(
1320 #[cfg(feature = "metrics")]
1321 &mut metrics,
1322 (job.payload.job_id, job.result),
1323 #[cfg(feature = "debugger")]
1324 &mut debugger,
1325 )
1326 .expect("Problem retiring job");
1327
1328 assert!(
1330 state.function_state_is_only(0, &State::Ready),
1331 "f_a should be Ready again"
1332 );
1333 }
1334
1335 #[test]
1337 #[serial]
1338 fn running_to_waiting_on_done() {
1339 let f_a = super::test_function_a_init();
1340
1341 let mut state = RunState::new(super::test_submission(vec![f_a]));
1342 #[cfg(feature = "metrics")]
1343 let mut metrics = Metrics::new(1);
1344 #[cfg(feature = "debugger")]
1345 let mut server = super::DummyServer {};
1346 #[cfg(feature = "debugger")]
1347 let mut debugger = super::dummy_debugger(&mut server);
1348
1349 state.init().expect("Could not init state");
1350 assert!(
1351 state.function_state_is_only(0, &State::Ready),
1352 "f_a should be Ready"
1353 );
1354 let job = state.get_next_job().expect("Couldn't get next job");
1355 assert_eq!(0, job.process_id, "next() should return process_id = 0");
1356 state.start_job(job.clone());
1357
1358 state
1359 .running_jobs
1360 .get(&job.payload.job_id)
1361 .expect("Job with f_a should be Running");
1362
1363 let job = test_job();
1365 state
1366 .retire_a_job(
1367 #[cfg(feature = "metrics")]
1368 &mut metrics,
1369 (job.payload.job_id, job.result),
1370 #[cfg(feature = "debugger")]
1371 &mut debugger,
1372 )
1373 .expect("Problem retiring job");
1374
1375 assert!(
1377 state.function_state_is_only(0, &State::Waiting),
1378 "f_a should be Waiting again"
1379 );
1380 }
1381
1382 #[test]
1383 #[serial]
1384 fn waiting_to_ready_on_input() {
1385 let f_a = test_function_a_not_init();
1386 let out_conn = OutputConnection::new(
1387 Source::default(),
1388 0,
1389 0,
1390 0,
1391 true,
1392 String::default(),
1393 #[cfg(feature = "debugger")]
1394 String::default(),
1395 );
1396 let f_b = RuntimeFunction::new(
1397 #[cfg(feature = "debugger")]
1398 "fB",
1399 #[cfg(feature = "debugger")]
1400 "/fB",
1401 "file://fake/test",
1402 vec![Input::new(
1403 #[cfg(feature = "debugger")]
1404 "",
1405 0,
1406 false,
1407 None,
1408 None,
1409 )],
1410 1,
1411 0,
1412 &[out_conn],
1413 false,
1414 );
1415 let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1416 #[cfg(feature = "metrics")]
1417 let mut metrics = Metrics::new(1);
1418 #[cfg(feature = "debugger")]
1419 let mut server = super::DummyServer {};
1420 #[cfg(feature = "debugger")]
1421 let mut debugger = super::dummy_debugger(&mut server);
1422
1423 state.init().expect("Could not init state");
1424 assert!(
1425 state.function_state_is_only(0, &State::Waiting),
1426 "f_a should be Waiting"
1427 );
1428
1429 let job = super::test_job(1, 0);
1431 state.start_job(job.clone());
1432
1433 state
1434 .retire_a_job(
1435 #[cfg(feature = "metrics")]
1436 &mut metrics,
1437 (job.payload.job_id, job.result),
1438 #[cfg(feature = "debugger")]
1439 &mut debugger,
1440 )
1441 .expect("Problem retiring job");
1442
1443 assert!(
1445 state.function_state_is_only(0, &State::Ready),
1446 "f_a should be Ready"
1447 );
1448 }
1449
1450 #[test]
1455 #[serial]
1456 fn waiting_to_blocked_on_input() {
1457 let f_a = super::test_function_a_to_b_not_init();
1458 let connection_to_f0 = OutputConnection::new(
1459 Source::default(),
1460 0,
1461 0,
1462 0,
1463 true,
1464 String::default(),
1465 #[cfg(feature = "debugger")]
1466 String::default(),
1467 );
1468 let f_b = RuntimeFunction::new(
1469 #[cfg(feature = "debugger")]
1470 "fB",
1471 #[cfg(feature = "debugger")]
1472 "/fB",
1473 "file://fake/test",
1474 vec![Input::new(
1475 #[cfg(feature = "debugger")]
1476 "",
1477 0,
1478 false,
1479 Some(Always(json!(1))),
1480 None,
1481 )],
1482 1,
1483 0,
1484 &[connection_to_f0],
1485 false,
1486 );
1487 let mut state = RunState::new(super::test_submission(vec![f_a, f_b]));
1488 #[cfg(feature = "metrics")]
1489 let mut metrics = Metrics::new(1);
1490 #[cfg(feature = "debugger")]
1491 let mut server = super::DummyServer {};
1492 #[cfg(feature = "debugger")]
1493 let mut debugger = super::dummy_debugger(&mut server);
1494
1495 state.init().expect("Could not init state");
1496
1497 assert!(
1498 state.function_state_is_only(1, &State::Ready),
1499 "f_b should be Ready"
1500 );
1501 assert!(
1502 state.function_state_is_only(0, &State::Waiting),
1503 "f_a should be in Waiting"
1504 );
1505
1506 let job = super::test_job(1, 0);
1508 state.start_job(job.clone());
1509
1510 state
1511 .retire_a_job(
1512 #[cfg(feature = "metrics")]
1513 &mut metrics,
1514 (job.payload.job_id, job.result),
1515 #[cfg(feature = "debugger")]
1516 &mut debugger,
1517 )
1518 .expect("Problem retiring job");
1519
1520 assert!(
1522 state.function_state_is_only(0, &State::Ready),
1523 "f_a should be Ready"
1524 );
1525 }
1526 }
1527
1528 mod functional_tests {
1530 use serial_test::serial;
1533
1534 use flowcore::model::input::Input;
1535 #[cfg(feature = "metrics")]
1536 use flowcore::model::metrics::Metrics;
1537 use flowcore::model::output_connection::{OutputConnection, Source};
1538 use flowcore::model::runtime_function::RuntimeFunction;
1539
1540 use super::super::RunState;
1541
1542 fn test_functions() -> Vec<RuntimeFunction> {
1543 let out_conn1 = OutputConnection::new(
1544 Source::default(),
1545 1,
1546 0,
1547 0,
1548 true,
1549 String::default(),
1550 #[cfg(feature = "debugger")]
1551 String::default(),
1552 );
1553 let out_conn2 = OutputConnection::new(
1554 Source::default(),
1555 2,
1556 0,
1557 0,
1558 true,
1559 String::default(),
1560 #[cfg(feature = "debugger")]
1561 String::default(),
1562 );
1563 let p0 = RuntimeFunction::new(
1564 #[cfg(feature = "debugger")]
1565 "p0",
1566 #[cfg(feature = "debugger")]
1567 "/p0",
1568 "file://fake/test/p0",
1569 vec![], 0,
1571 0,
1572 &[out_conn1, out_conn2], false,
1574 ); let p1 = RuntimeFunction::new(
1576 #[cfg(feature = "debugger")]
1577 "p1",
1578 #[cfg(feature = "debugger")]
1579 "/p1",
1580 "file://fake/test/p1",
1581 vec![Input::new(
1582 #[cfg(feature = "debugger")]
1583 "",
1584 0,
1585 false,
1586 None,
1587 None,
1588 )], 1,
1590 0,
1591 &[],
1592 false,
1593 );
1594 let p2 = RuntimeFunction::new(
1595 #[cfg(feature = "debugger")]
1596 "p2",
1597 #[cfg(feature = "debugger")]
1598 "/p2",
1599 "file://fake/test/p2",
1600 vec![Input::new(
1601 #[cfg(feature = "debugger")]
1602 "",
1603 0,
1604 false,
1605 None,
1606 None,
1607 )], 2,
1609 0,
1610 &[],
1611 false,
1612 );
1613 vec![p0, p1, p2]
1614 }
1615
1616 #[test]
1617 fn get_works() {
1618 let state = RunState::new(super::test_submission(test_functions()));
1619 let got = state
1620 .get_function(1)
1621 .ok_or("Could not get function by id")
1622 .expect("Could not get function with that id");
1623 assert_eq!(got.id(), 1);
1624 }
1625
1626 #[test]
1627 fn no_next_if_none_ready() {
1628 let mut state = RunState::new(super::test_submission(test_functions()));
1629 assert!(state.get_next_job().is_none());
1630 }
1631
1632 #[test]
1633 fn next_works() {
1634 let mut state = RunState::new(super::test_submission(test_functions()));
1635
1636 state.create_jobs(0, 0).expect("Could not create jobs");
1638
1639 state.get_next_job().expect("Couldn't get next job");
1640 }
1641
1642 #[test]
1643 fn inputs_ready_makes_ready() {
1644 let mut state = RunState::new(super::test_submission(test_functions()));
1645
1646 state.create_jobs(0, 0).expect("Could not create jobs");
1648
1649 state.get_next_job().expect("Couldn't get next job");
1650 }
1651
1652 #[test]
1653 #[serial]
1654 fn wont_return_too_many_jobs() {
1655 let mut state = RunState::new(super::test_submission(test_functions()));
1656
1657 state.init().expect("Could not init state");
1658
1659 let _ = state.get_next_job().expect("Couldn't get next job");
1660
1661 assert!(
1662 state.get_next_job().is_none(),
1663 "Did not expect a Ready job!"
1664 );
1665 }
1666
1667 #[test]
1672 #[serial]
1673 fn pure_function_no_destinations() {
1674 let f_a = super::test_function_a_init();
1675 let _id = f_a.id();
1676
1677 let mut state = RunState::new(super::test_submission(vec![f_a]));
1678 #[cfg(feature = "metrics")]
1679 let mut metrics = Metrics::new(1);
1680 #[cfg(feature = "debugger")]
1681 let mut server = super::DummyServer {};
1682 #[cfg(feature = "debugger")]
1683 let mut debugger = super::dummy_debugger(&mut server);
1684
1685 state.init().expect("Could not init state");
1686
1687 let job = state.get_next_job().expect("Couldn't get next job");
1688 state.start_job(job.clone());
1689
1690 state
1692 .retire_a_job(
1693 #[cfg(feature = "metrics")]
1694 &mut metrics,
1695 (job.payload.job_id, job.result),
1696 #[cfg(feature = "debugger")]
1697 &mut debugger,
1698 )
1699 .expect("Failed to retire job correctly");
1700 }
1701 }
1702}