1use std::time::Duration;
2
3use log::{debug, error, trace};
4use serde_json::Value;
5use zmq::DONTWAIT;
6
7use flowcore::errors::{Result, ResultExt};
8use flowcore::RunAgain;
9
10use crate::job::Payload;
11
12const WAIT: i32 = 0;
13
14#[allow(clippy::struct_field_names)]
16pub struct Dispatcher {
17 lib_job_socket: zmq::Socket,
19 general_job_socket: zmq::Socket,
21 results_socket: zmq::Socket,
23 control_socket: zmq::Socket,
25}
26
27impl Dispatcher {
29 pub fn new(job_queues: &(String, String, String, String)) -> Result<Self> {
37 let context = zmq::Context::new();
38 let lib_job_socket = context
39 .socket(zmq::PUSH)
40 .map_err(|_| "Could not create job socket")?;
41 lib_job_socket
42 .bind(&job_queues.0)
43 .map_err(|_| "Could not bind to job socket")?;
44
45 let general_job_socket = context
46 .socket(zmq::PUSH)
47 .map_err(|_| "Could not create context job socket")?;
48 general_job_socket
49 .bind(&job_queues.1)
50 .map_err(|_| "Could not bind to context job socket")?;
51
52 let results_socket = context
53 .socket(zmq::PULL)
54 .map_err(|_| "Could not create results socket")?;
55 results_socket
56 .bind(&job_queues.2)
57 .map_err(|_| "Could not bind to results socket")?;
58
59 let control_socket = context
60 .socket(zmq::PUB)
61 .map_err(|_| "Could not create control socket")?;
62 control_socket
63 .bind(&job_queues.3)
64 .map_err(|_| "Could not bind to control socket")?;
65
66 Ok(Dispatcher {
67 lib_job_socket,
68 general_job_socket,
69 results_socket,
70 control_socket,
71 })
72 }
73
74 pub(crate) fn set_results_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
77 #[allow(clippy::single_match_else)]
78 match timeout {
79 Some(time) => {
80 debug!("Setting results timeout to: {}ms", time.as_millis());
81 self.results_socket
83 .set_rcvtimeo(i32::try_from(time.as_millis())?)
84 }
85 None => {
86 debug!("Disabling results timeout");
87 self.results_socket.set_rcvtimeo(-1)
88 }
89 }
90 .map_err(|e| format!("Error setting results timeout: {e}").into())
91 }
92
93 #[allow(clippy::type_complexity)]
95 pub(crate) fn get_next_result(
96 &mut self,
97 block: bool,
98 ) -> Result<(usize, Result<(Option<Value>, RunAgain)>)> {
99 let flags = if block { WAIT } else { DONTWAIT };
100
101 let msg = self
102 .results_socket
103 .recv_msg(flags)
104 .map_err(|_| "Error receiving result")?;
105 let message_string = msg.as_str().ok_or("Could not get message as str")?;
106 serde_json::from_str(message_string)
107 .map_err(|_| "Could not Deserialize from zmq message string".into())
108 }
109
110 pub(crate) fn send_job_for_execution(&mut self, payload: &Payload) -> Result<()> {
112 if payload.implementation_url.scheme() == "lib" {
113 self.lib_job_socket
114 .send(serde_json::to_string(payload)?.as_bytes(), 0)
115 .map_err(|e| format!("Could not send context Job for execution: {e}"))?;
116 } else {
117 self.general_job_socket
118 .send(serde_json::to_string(payload)?.as_bytes(), 0)
119 .map_err(|e| format!("Could not send Job for execution: {e}"))?;
120 }
121
122 trace!("Job #{}: Payload sent for execution", payload.job_id);
123
124 Ok(())
125 }
126
127 pub fn send_done(&mut self) -> Result<()> {
134 debug!("Dispatcher announcing DONE");
135 self.control_socket
136 .send("DONE".as_bytes(), DONTWAIT)
137 .chain_err(|| "Could not send 'DONE' message")
138 }
139}
140
141impl Drop for Dispatcher {
142 fn drop(&mut self) {
143 if let Err(e) = self.send_done() {
144 error!("Error while sending DONE while dropping Dispatcher: {e}");
145 }
146 }
147}
148
149#[cfg(test)]
150#[allow(clippy::unwrap_used, clippy::expect_used)]
151mod test {
152 use std::time::Duration;
153
154 use portpicker::pick_unused_port;
155 use serde_json::Value;
156 use serial_test::serial;
157 use url::Url;
158
159 use flowcore::errors::*;
160 use flowcore::RunAgain;
161 use flowcore::DONT_RUN_AGAIN;
162
163 use crate::job::Payload;
164
165 fn get_bind_addresses(ports: (u16, u16, u16, u16)) -> (String, String, String, String) {
166 (
167 format!("tcp://*:{}", ports.0),
168 format!("tcp://*:{}", ports.1),
169 format!("tcp://*:{}", ports.2),
170 format!("tcp://*:{}", ports.3),
171 )
172 }
173
174 fn get_four_ports() -> (u16, u16, u16, u16) {
175 (
176 pick_unused_port().expect("No ports free"),
177 pick_unused_port().expect("No ports free"),
178 pick_unused_port().expect("No ports free"),
179 pick_unused_port().expect("No ports free"),
180 )
181 }
182
183 #[test]
184 #[serial]
185 fn test_constructor() {
186 let dispatcher = super::Dispatcher::new(&get_bind_addresses(get_four_ports()));
187 assert!(dispatcher.is_ok());
188 }
189
190 #[test]
191 #[serial]
192 fn set_timeout_to_none() {
193 let mut dispatcher = super::Dispatcher::new(&get_bind_addresses(get_four_ports()))
194 .expect("Could not create dispatcher");
195 assert!(dispatcher.set_results_timeout(None).is_ok());
196 }
197
198 #[test]
199 #[serial]
200 fn set_timeout() {
201 let mut dispatcher = super::Dispatcher::new(&get_bind_addresses(get_four_ports()))
202 .expect("Could not create dispatcher");
203 assert!(dispatcher
204 .set_results_timeout(Some(Duration::from_millis(10)))
205 .is_ok());
206 }
207
208 #[test]
209 #[serial]
210 fn send_lib_job() {
211 let payload = Payload {
212 job_id: 0,
213 input_set: vec![],
214 implementation_url: Url::parse("lib://flowstdlib/math/add")
215 .expect("Could not parse Url"),
216 };
217
218 let ports = get_four_ports();
219 let mut dispatcher = super::Dispatcher::new(&get_bind_addresses(ports))
220 .expect("Could not create dispatcher");
221
222 let context = zmq::Context::new();
223 let job_source = context
224 .socket(zmq::PULL)
225 .expect("Could not create PULL end of job socket");
226 job_source
227 .connect(&format!("tcp://127.0.0.1:{}", ports.0))
228 .expect("Could not bind to PULL end of job socket");
229
230 assert!(dispatcher.send_job_for_execution(&payload).is_ok());
231 }
232
233 #[test]
234 #[serial]
235 fn send_context_job() {
236 let payload = Payload {
237 job_id: 0,
238 input_set: vec![],
239 implementation_url: Url::parse("context://stdio/stdout").expect("Could not parse Url"),
240 };
241
242 let ports = get_four_ports();
243 let mut dispatcher = super::Dispatcher::new(&get_bind_addresses(ports))
244 .expect("Could not create dispatcher");
245
246 let context = zmq::Context::new();
247 let context_job_source = context
248 .socket(zmq::PULL)
249 .expect("Could not create PULL end of context-job socket");
250 context_job_source
251 .connect(&format!("tcp://127.0.0.1:{}", ports.1))
252 .expect("Could not bind to PULL end of job-source socket");
253
254 assert!(dispatcher.send_job_for_execution(&payload).is_ok());
255 }
256
257 #[test]
258 #[serial]
259 fn get_job() {
260 let ports = get_four_ports();
261 let mut dispatcher = super::Dispatcher::new(&get_bind_addresses(ports))
262 .expect("Could not create dispatcher");
263
264 let context = zmq::Context::new();
265 let results_sink = context
266 .socket(zmq::PUSH)
267 .expect("Could not create PUSH end of results socket");
268 results_sink
269 .connect(&format!("tcp://127.0.0.1:{}", ports.2))
270 .expect("Could not connect to PULL end of results socket");
271 let result: Result<(Option<Value>, RunAgain)> = Ok((None, DONT_RUN_AGAIN));
272 results_sink
273 .send(
274 serde_json::to_string(&(0, result))
275 .expect("Could not convert to serde")
276 .as_bytes(),
277 0,
278 )
279 .expect("Could not send result of Job");
280
281 assert!(dispatcher.get_next_result(true).is_ok());
282 }
283}