use std::time::Duration;
use log::{debug, trace, error};
use zmq::DONTWAIT;
use serde_json::Value;
use flowcore::RunAgain;
use flowcore::errors::*;
use crate::job::JobPayload;
pub struct Dispatcher {
lib_job_socket: zmq::Socket,
general_job_socket: zmq::Socket,
results_socket: zmq::Socket,
control_socket: zmq::Socket,
}
impl Dispatcher {
pub fn new(job_queues: (String, String, String, String)) -> Result<Self> {
let context = zmq::Context::new();
let lib_job_socket = context.socket(zmq::PUSH)
.map_err(|_| "Could not create job socket")?;
lib_job_socket.bind(&job_queues.0)
.map_err(|_| "Could not bind to job socket")?;
let general_job_socket = context.socket(zmq::PUSH)
.map_err(|_| "Could not create context job socket")?;
general_job_socket.bind(&job_queues.1)
.map_err(|_| "Could not bind to context job socket")?;
let results_socket = context.socket(zmq::PULL)
.map_err(|_| "Could not create results socket")?;
results_socket.bind(&job_queues.2)
.map_err(|_| "Could not bind to results socket")?;
let control_socket = context.socket(zmq::PUB)
.map_err(|_| "Could not create control socket")?;
control_socket.bind(&job_queues.3)
.map_err(|_| "Could not bind to control socket")?;
Ok(Dispatcher {
lib_job_socket,
general_job_socket,
results_socket,
control_socket
})
}
pub(crate) fn set_results_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
match timeout {
Some(time) => {
debug!("Setting results timeout to: {}ms", time.as_millis());
self.results_socket.set_rcvtimeo(time.as_millis() as i32)
},
None => {
debug!("Disabling results timeout");
self.results_socket.set_rcvtimeo(-1)
},
}.map_err(|e| format!("Error setting results timeout: {e}").into())
}
#[allow(clippy::type_complexity)]
pub(crate) fn get_next_result(&mut self) -> Result<(usize, Result<(Option<Value>, RunAgain)>)> {
let msg = self.results_socket.recv_msg(0)
.map_err(|_| "Error receiving result")?;
let message_string = msg.as_str().ok_or("Could not get message as str")?;
serde_json::from_str(message_string)
.map_err(|_| "Could not Deserialize JobPayload from zmq message string".into())
}
pub(crate) fn send_job_for_execution(&mut self, payload: &JobPayload) -> Result<()> {
if payload.implementation_url.scheme() == "lib" {
self.lib_job_socket.send(serde_json::to_string(payload)?.as_bytes(), 0)
.map_err(|e| format!("Could not send context Job for execution: {e}"))?;
} else {
self.general_job_socket.send(serde_json::to_string(payload)?.as_bytes(), 0)
.map_err(|e| format!("Could not send Job for execution: {e}"))?;
}
trace!("Job #{}: Payload sent for execution", payload.job_id);
Ok(())
}
pub fn send_done(&mut self) -> Result<()> {
debug!("Dispatcher announcing DONE");
self.control_socket.send("DONE".as_bytes(), DONTWAIT)
.chain_err(|| "Could not send 'DONE' message")
}
}
impl Drop for Dispatcher {
fn drop(&mut self) {
if let Err(e) = self.send_done() {
error!("Error while sending DONE while dropping Dispatcher: {e}");
}
}
}
#[cfg(test)]
mod test {
use url::Url;
use serde_json::Value;
use flowcore::RunAgain;
use std::time::Duration;
use serial_test::serial;
use crate::job::JobPayload;
use portpicker::pick_unused_port;
use flowcore::DONT_RUN_AGAIN;
use flowcore::errors::*;
fn get_bind_addresses(ports: (u16, u16, u16, u16)) -> (String, String, String, String) {
(
format!("tcp://*:{}", ports.0),
format!("tcp://*:{}", ports.1),
format!("tcp://*:{}", ports.2),
format!("tcp://*:{}", ports.3),
)
}
fn get_four_ports() -> (u16, u16, u16, u16) {
(pick_unused_port().expect("No ports free"),
pick_unused_port().expect("No ports free"),
pick_unused_port().expect("No ports free"),
pick_unused_port().expect("No ports free"),
)
}
#[test]
#[serial]
fn test_constructor() {
let dispatcher = super::Dispatcher::new(
get_bind_addresses(get_four_ports()));
assert!(dispatcher.is_ok());
}
#[test]
#[serial]
fn set_timeout_to_none() {
let mut dispatcher = super::Dispatcher::new(
get_bind_addresses(get_four_ports())
).expect("Could not create dispatcher");
assert!(dispatcher.set_results_timeout(None).is_ok());
}
#[test]
#[serial]
fn set_timeout() {
let mut dispatcher = super::Dispatcher::new(
get_bind_addresses(get_four_ports())
).expect("Could not create dispatcher");
assert!(dispatcher.set_results_timeout(Some(Duration::from_millis(10))).is_ok());
}
#[test]
#[serial]
fn send_lib_job() {
let payload = JobPayload {
job_id: 0,
input_set: vec![],
implementation_url: Url::parse("lib://flowstdlib/math/add").expect("Could not parse Url"),
};
let ports = get_four_ports();
let mut dispatcher = super::Dispatcher::new(
get_bind_addresses(ports)
).expect("Could not create dispatcher");
let context = zmq::Context::new();
let job_source = context.socket(zmq::PULL)
.expect("Could not create PULL end of job socket");
job_source.connect(&format!("tcp://127.0.0.1:{}", ports.0))
.expect("Could not bind to PULL end of job socket");
assert!(dispatcher.send_job_for_execution(&payload).is_ok());
}
#[test]
#[serial]
fn send_context_job() {
let payload = JobPayload {
job_id: 0,
input_set: vec![],
implementation_url: Url::parse("context://stdio/stdout").expect("Could not parse Url"),
};
let ports = get_four_ports();
let mut dispatcher = super::Dispatcher::new(
get_bind_addresses(ports)
).expect("Could not create dispatcher");
let context = zmq::Context::new();
let context_job_source = context.socket(zmq::PULL)
.expect("Could not create PULL end of context-job socket");
context_job_source.connect(&format!("tcp://127.0.0.1:{}", ports.1))
.expect("Could not bind to PULL end of job-source socket");
assert!(dispatcher.send_job_for_execution(&payload).is_ok());
}
#[test]
#[serial]
fn get_job() {
let ports = get_four_ports();
let mut dispatcher = super::Dispatcher::new(
get_bind_addresses(ports)
).expect("Could not create dispatcher");
let context = zmq::Context::new();
let results_sink = context.socket(zmq::PUSH)
.expect("Could not create PUSH end of results socket");
results_sink.connect(&format!("tcp://127.0.0.1:{}", ports.2))
.expect("Could not connect to PULL end of results socket");
let result:Result<(Option<Value>, RunAgain)> = Ok((None, DONT_RUN_AGAIN));
results_sink.send(serde_json::to_string(&(0, result))
.expect("Could not convert to serde")
.as_bytes(), 0).expect("Could not send result of Job");
assert!(dispatcher.get_next_result().is_ok());
}
}