flowrlib/
job.rs

1use std::fmt;
2
3use serde_derive::{Deserialize, Serialize};
4use serde_json::Value;
5use url::Url;
6
7use flowcore::errors::*;
8use flowcore::model::output_connection::OutputConnection;
9use flowcore::RunAgain;
10
11/// Conatins the minimum amount of information required to execute a [Job] and return the result
12#[derive(Serialize, Deserialize, Clone)]
13pub struct JobPayload {
14    /// Each `Job` has a unique id that increments as jobs are executed
15    pub job_id: usize,
16    /// The set of input values to be used by the function when executing this job
17    pub input_set: Vec<Value>,
18    /// The url of the implementation to be run for this job
19    pub implementation_url: Url,
20}
21
22/// A `Job` contains the information necessary to manage the execution of a function in the
23/// flow on a set of input values, and then where to send the outputs that maybe produces.
24#[derive(Serialize, Deserialize, Clone)]
25pub struct Job {
26    /// The `id` of the function in the `RunState`'s list of functions that will execute this job
27    pub function_id: usize,
28    /// The `id` of the nested flow (from root flow on down) there the function executing the job is
29    pub flow_id: usize,
30    /// the payload required to execute the job
31    pub payload: JobPayload,
32    /// The result of the execution with the job_id, the optional output Value and if the function
33    /// should be run again in the future
34    pub result: Result<(Option<Value>, RunAgain)>,
35    /// The destinations (other function's inputs) where any output should be sent
36    pub connections: Vec<OutputConnection>,
37}
38
39unsafe impl Send for Job{}
40unsafe impl Sync for Job{}
41
42impl fmt::Display for Job {
43    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44        writeln!(f, "{}", self.payload)?;
45        writeln!(f, "Connections: {:?}", self.connections)?;
46        writeln!(f, "Function Id: {}, Flow Id: {}", self.function_id, self.flow_id)?;
47        write!(f, "Result: {:?}", self.result)
48    }
49}
50
51impl fmt::Display for JobPayload {
52    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
53        writeln!(f, "Job #: {}", self.job_id)?;
54        writeln!(f, "Implementation Url: {}", self.implementation_url)?;
55        writeln!(f, "Inputs: {:?}", self.input_set)
56    }
57}
58
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use serde_json::{json, Value};
    use url::Url;

    use flowcore::model::datatype::ARRAY_TYPE;

    use super::{Job, JobPayload};

    /// Build the `Job` fixture shared by all tests; only the optional output value varies.
    fn job_with_output(output: Option<Value>) -> Job {
        Job {
            function_id: 1,
            flow_id: 0,
            connections: vec![],
            payload: JobPayload {
                job_id: 0,
                input_set: vec![],
                implementation_url: Url::parse("lib://flowstdlib/math/add")
                    .expect("Could not parse Url"),
            },
            result: Ok((output, false)),
        }
    }

    /// Extract the output value from a job's result, panicking if the result is an error or
    /// carries no output value.
    fn output_of(job: Job) -> Value {
        job.result
            .expect("Could not get result")
            .0
            .expect("No output value when one was expected")
    }

    #[test]
    fn display_job_test() {
        let text = job_with_output(None).to_string();

        // Check the rendered text rather than just printing it, so a broken `Display` impl fails
        assert!(text.contains("Job #: 0"), "unexpected display: {text}");
        assert!(
            text.contains("Implementation Url: lib://flowstdlib/math/add"),
            "unexpected display: {text}"
        );
        assert!(text.contains("Inputs: []"), "unexpected display: {text}");
        assert!(text.contains("Connections: []"), "unexpected display: {text}");
        assert!(
            text.contains("Function Id: 1, Flow Id: 0"),
            "unexpected display: {text}"
        );
        assert!(
            text.ends_with("Result: Ok((None, false))"),
            "unexpected display: {text}"
        );
    }

    #[test]
    fn get_entire_output_value() {
        let output = output_of(job_with_output(Some(json!(42u64))));

        // The empty json pointer selects the whole value
        assert_eq!(
            &json!(42u64),
            output
                .pointer("")
                .expect("Could not get value using json pointer")
        );
    }

    #[test]
    fn get_sub_array_from_output_value() {
        let mut map = HashMap::new();
        map.insert(ARRAY_TYPE, vec![1, 2, 3]);
        let output = output_of(job_with_output(Some(json!(map))));

        assert_eq!(
            &json!(3),
            output
                .pointer("/array/2")
                .expect("Could not get value using json pointer")
        );
    }
}