Skip to main content

flowrlib/
dispatcher.rs

1use std::time::Duration;
2
3use log::{debug, error, trace};
4use serde_json::Value;
5use zmq::DONTWAIT;
6
7use flowcore::errors::{Result, ResultExt};
8use flowcore::RunAgain;
9
10use crate::job::Payload;
11
/// zmq receive flags for a blocking call: no flag bits set, as opposed to `DONTWAIT`
const WAIT: i32 = 0;
13
14/// `Dispatcher` structure holds information required to send jobs for execution and receive results back
15#[allow(clippy::struct_field_names)]
16pub struct Dispatcher {
17    // A source of lib jobs to be executed
18    lib_job_socket: zmq::Socket,
19    // A source of jobs to be executed for context:// and provided functions
20    general_job_socket: zmq::Socket,
21    // A sink where to send jobs (with results)
22    results_socket: zmq::Socket,
23    // a socket to send control information to subscribing executors
24    control_socket: zmq::Socket,
25}
26
27/// `Dispatcher` struct takes care of ending jobs for execution and receiving results
28impl Dispatcher {
29    /// Create a new `Dispatcher` of `Job`s using three addresses of job queues
30    ///
31    /// # Errors
32    ///
33    /// Returns an error if the zmq sockets used to send messages between client and coordinator
34    /// cannot be bound.
35    ///
36    pub fn new(job_queues: &(String, String, String, String)) -> Result<Self> {
37        let context = zmq::Context::new();
38        let lib_job_socket = context
39            .socket(zmq::PUSH)
40            .map_err(|_| "Could not create job socket")?;
41        lib_job_socket
42            .bind(&job_queues.0)
43            .map_err(|_| "Could not bind to job socket")?;
44
45        let general_job_socket = context
46            .socket(zmq::PUSH)
47            .map_err(|_| "Could not create context job socket")?;
48        general_job_socket
49            .bind(&job_queues.1)
50            .map_err(|_| "Could not bind to context job socket")?;
51
52        let results_socket = context
53            .socket(zmq::PULL)
54            .map_err(|_| "Could not create results socket")?;
55        results_socket
56            .bind(&job_queues.2)
57            .map_err(|_| "Could not bind to results socket")?;
58
59        let control_socket = context
60            .socket(zmq::PUB)
61            .map_err(|_| "Could not create control socket")?;
62        control_socket
63            .bind(&job_queues.3)
64            .map_err(|_| "Could not bind to control socket")?;
65
66        Ok(Dispatcher {
67            lib_job_socket,
68            general_job_socket,
69            results_socket,
70            control_socket,
71        })
72    }
73
74    // Set the timeout to use when waiting for job results
75    // Setting to `None` will disable timeouts and block forever
76    pub(crate) fn set_results_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
77        #[allow(clippy::single_match_else)]
78        match timeout {
79            Some(time) => {
80                debug!("Setting results timeout to: {}ms", time.as_millis());
81                //assert!(time.as_millis() < i32::MAX, "Truncation");
82                self.results_socket
83                    .set_rcvtimeo(i32::try_from(time.as_millis())?)
84            }
85            None => {
86                debug!("Disabling results timeout");
87                self.results_socket.set_rcvtimeo(-1)
88            }
89        }
90        .map_err(|e| format!("Error setting results timeout: {e}").into())
91    }
92
93    // Wait for, then return the next Result returned from executors
94    #[allow(clippy::type_complexity)]
95    pub(crate) fn get_next_result(
96        &mut self,
97        block: bool,
98    ) -> Result<(usize, Result<(Option<Value>, RunAgain)>)> {
99        let flags = if block { WAIT } else { DONTWAIT };
100
101        let msg = self
102            .results_socket
103            .recv_msg(flags)
104            .map_err(|_| "Error receiving result")?;
105        let message_string = msg.as_str().ok_or("Could not get message as str")?;
106        serde_json::from_str(message_string)
107            .map_err(|_| "Could not Deserialize from zmq message string".into())
108    }
109
110    // Send a `Job` for execution to executors
111    pub(crate) fn send_job_for_execution(&mut self, payload: &Payload) -> Result<()> {
112        if payload.implementation_url.scheme() == "lib" {
113            self.lib_job_socket
114                .send(serde_json::to_string(payload)?.as_bytes(), 0)
115                .map_err(|e| format!("Could not send context Job for execution: {e}"))?;
116        } else {
117            self.general_job_socket
118                .send(serde_json::to_string(payload)?.as_bytes(), 0)
119                .map_err(|e| format!("Could not send Job for execution: {e}"))?;
120        }
121
122        trace!("Job #{}: Payload sent for execution", payload.job_id);
123
124        Ok(())
125    }
126
127    /// Send a "DONE"" message to subscribed executors on the `control_socket`
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the message bytes cannot be sent over the control socket
132    ///
133    pub fn send_done(&mut self) -> Result<()> {
134        debug!("Dispatcher announcing DONE");
135        self.control_socket
136            .send("DONE".as_bytes(), DONTWAIT)
137            .chain_err(|| "Could not send 'DONE' message")
138    }
139}
140
141impl Drop for Dispatcher {
142    fn drop(&mut self) {
143        if let Err(e) = self.send_done() {
144            error!("Error while sending DONE while dropping Dispatcher: {e}");
145        }
146    }
147}
148
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use std::time::Duration;

    use portpicker::pick_unused_port;
    use serde_json::Value;
    use serial_test::serial;
    use url::Url;

    use flowcore::errors::*;
    use flowcore::RunAgain;
    use flowcore::DONT_RUN_AGAIN;

    use crate::job::Payload;

    // Build the four addresses a `Dispatcher` binds to, one per port, on all interfaces
    fn get_bind_addresses(ports: (u16, u16, u16, u16)) -> (String, String, String, String) {
        (
            format!("tcp://*:{}", ports.0),
            format!("tcp://*:{}", ports.1),
            format!("tcp://*:{}", ports.2),
            format!("tcp://*:{}", ports.3),
        )
    }

    // Pick four currently unused ports: lib jobs, general jobs, results and control
    fn get_four_ports() -> (u16, u16, u16, u16) {
        (
            pick_unused_port().expect("No ports free"),
            pick_unused_port().expect("No ports free"),
            pick_unused_port().expect("No ports free"),
            pick_unused_port().expect("No ports free"),
        )
    }

    // Create a `Dispatcher` bound to `ports`, panicking if that is not possible
    fn new_dispatcher(ports: (u16, u16, u16, u16)) -> super::Dispatcher {
        super::Dispatcher::new(&get_bind_addresses(ports)).expect("Could not create dispatcher")
    }

    #[test]
    #[serial]
    fn test_constructor() {
        let dispatcher = super::Dispatcher::new(&get_bind_addresses(get_four_ports()));
        assert!(dispatcher.is_ok());
    }

    #[test]
    #[serial]
    fn set_timeout_to_none() {
        let mut dispatcher = new_dispatcher(get_four_ports());
        assert!(dispatcher.set_results_timeout(None).is_ok());
    }

    #[test]
    #[serial]
    fn set_timeout() {
        let mut dispatcher = new_dispatcher(get_four_ports());
        assert!(dispatcher
            .set_results_timeout(Some(Duration::from_millis(10)))
            .is_ok());
    }

    #[test]
    #[serial]
    fn send_lib_job() {
        let payload = Payload {
            job_id: 0,
            input_set: vec![],
            implementation_url: Url::parse("lib://flowstdlib/math/add")
                .expect("Could not parse Url"),
        };

        let ports = get_four_ports();
        let mut dispatcher = new_dispatcher(ports);

        // Play the role of an executor pulling lib jobs, so the send has a peer
        let context = zmq::Context::new();
        let job_source = context
            .socket(zmq::PULL)
            .expect("Could not create PULL end of job socket");
        job_source
            .connect(&format!("tcp://127.0.0.1:{}", ports.0))
            .expect("Could not connect PULL end of job socket");

        assert!(dispatcher.send_job_for_execution(&payload).is_ok());
    }

    #[test]
    #[serial]
    fn send_context_job() {
        let payload = Payload {
            job_id: 0,
            input_set: vec![],
            implementation_url: Url::parse("context://stdio/stdout").expect("Could not parse Url"),
        };

        let ports = get_four_ports();
        let mut dispatcher = new_dispatcher(ports);

        // Play the role of an executor pulling context jobs, so the send has a peer
        let context = zmq::Context::new();
        let context_job_source = context
            .socket(zmq::PULL)
            .expect("Could not create PULL end of context-job socket");
        context_job_source
            .connect(&format!("tcp://127.0.0.1:{}", ports.1))
            .expect("Could not connect PULL end of context-job socket");

        assert!(dispatcher.send_job_for_execution(&payload).is_ok());
    }

    #[test]
    #[serial]
    fn get_job() {
        let ports = get_four_ports();
        let mut dispatcher = new_dispatcher(ports);

        // Play the role of an executor pushing a result back to the dispatcher
        let context = zmq::Context::new();
        let results_sink = context
            .socket(zmq::PUSH)
            .expect("Could not create PUSH end of results socket");
        results_sink
            .connect(&format!("tcp://127.0.0.1:{}", ports.2))
            .expect("Could not connect to PULL end of results socket");
        let result: Result<(Option<Value>, RunAgain)> = Ok((None, DONT_RUN_AGAIN));
        results_sink
            .send(
                serde_json::to_string(&(0, result))
                    .expect("Could not convert to serde")
                    .as_bytes(),
                0,
            )
            .expect("Could not send result of Job");

        // Check what was received, not just that something was received
        let (job_id, job_result) = dispatcher
            .get_next_result(true)
            .expect("Could not get next result");
        assert_eq!(job_id, 0);
        assert!(job_result.is_ok());
    }
}