Skip to main content

flowrlib/
job.rs

1use std::fmt;
2
3use serde_derive::{Deserialize, Serialize};
4use serde_json::Value;
5use url::Url;
6
7use flowcore::errors::Result;
8use flowcore::model::output_connection::OutputConnection;
9use flowcore::RunAgain;
10
11/// Conatins the minimum amount of information required to execute a [Job] and return the result
12#[derive(Serialize, Deserialize, Clone)]
13pub struct Payload {
14    /// Each `Job` has a unique id that increments as jobs are executed
15    pub job_id: usize,
16    /// The set of input values to be used by the function when executing this job
17    pub input_set: Vec<Value>,
18    /// The url of the implementation to be run for this job
19    pub implementation_url: Url,
20}
21
22/// A `Job` contains the information necessary to manage the execution of a function in the
23/// flow on a set of input values, and then where to send the outputs that maybe produces.
24#[derive(Serialize, Deserialize, Clone)]
25pub struct Job {
26    /// The `process_id` of the function in the `RunState`'s list of functions that will execute this job
27    pub process_id: usize,
28    /// The `parent_id` of the flow containing the function executing the job
29    pub parent_id: usize,
30    /// The function name (for logging without looking up the function)
31    #[cfg(feature = "debugger")]
32    pub function_name: String,
33    /// the payload required to execute the job
34    pub payload: Payload,
35    /// The result of the execution with the `job_id`, the optional output Value and if the function
36    /// should be run again in the future
37    pub result: Result<(Option<Value>, RunAgain)>,
38    /// The destinations (other function's inputs) where any output should be sent
39    pub connections: Vec<OutputConnection>,
40}
41
42impl fmt::Display for Job {
43    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44        writeln!(f, "{}", self.payload)?;
45        writeln!(f, "Connections: {:?}", self.connections)?;
46        writeln!(
47            f,
48            "Process Id: {}, Parent Id: {}",
49            self.process_id, self.parent_id
50        )?;
51        write!(f, "Result: {:?}", self.result)
52    }
53}
54
55impl fmt::Display for Payload {
56    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57        writeln!(f, "Job #: {}", self.job_id)?;
58        writeln!(f, "Implementation Url: {}", self.implementation_url)?;
59        writeln!(f, "Inputs: {:?}", self.input_set)
60    }
61}
62
#[cfg(test)]
// Tests are allowed to panic via `expect` when a fixture cannot be built or read back.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use std::collections::HashMap;

    use serde_json::{json, Value};
    use url::Url;

    use flowcore::model::datatype::ARRAY_TYPE;

    use super::{Job, Payload};

    /// Build a minimal `Job` whose result is a successful execution producing `output`
    /// and that should not be run again.
    fn job_with_output(output: Option<Value>) -> Job {
        Job {
            process_id: 1,
            #[cfg(feature = "debugger")]
            function_name: String::new(),
            parent_id: 0,
            connections: vec![],
            payload: Payload {
                job_id: 0,
                input_set: vec![],
                implementation_url: Url::parse("lib://flowstdlib/math/add")
                    .expect("Could not parse Url"),
            },
            result: Ok((output, false)),
        }
    }

    /// Take the output value out of a `Job`, panicking if the job failed or produced no output.
    fn output_of(job: Job) -> Value {
        job.result
            .expect("Could not get result")
            .0
            .expect("No output value when one was expected")
    }

    #[test]
    fn display_job_test() {
        let displayed = job_with_output(None).to_string();

        assert!(displayed.contains("Job #: 0"));
        assert!(displayed.contains("Implementation Url: lib://flowstdlib/math/add"));
        assert!(displayed.contains("Inputs: []"));
        assert!(displayed.contains("Connections: []"));
        assert!(displayed.contains("Process Id: 1, Parent Id: 0"));
        assert!(displayed.ends_with("Result: Ok((None, false))"));
    }

    #[test]
    fn get_entire_output_value() {
        let output = output_of(job_with_output(Some(json!(42u64))));

        // An empty json pointer selects the whole value.
        assert_eq!(
            &json!(42u64),
            output
                .pointer("")
                .expect("Could not get value using json pointer")
        );
    }

    #[test]
    fn get_sub_array_from_output_value() {
        let mut map = HashMap::new();
        map.insert(ARRAY_TYPE, vec![1, 2, 3]);
        let output = output_of(job_with_output(Some(json!(map))));

        // NOTE(review): the pointer assumes `ARRAY_TYPE` is the string "array" - confirm.
        assert_eq!(
            &json!(3),
            output
                .pointer("/array/2")
                .expect("Could not get value using json pointer")
        );
    }
}