#![crate_name = "rtreq"]
use rand::Rng;
use std::thread;
use std::time::{Duration, Instant};
use zmq2::SNDMORE;
/// Renders `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn hex(bytes: &[u8]) -> String {
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // `{:02x}` zero-pads so every byte contributes exactly two characters.
        encoded.push_str(&format!("{:02x}", byte));
    }
    encoded
}
/// Runs a single worker: a REQ socket that repeatedly asks the broker for
/// work until it is told `"Fired!"`.
///
/// Each worker creates its own context and socket, assigns itself a random
/// 10-byte identity, and connects to `tcp://localhost:5671`. Every reply other
/// than `"Fired!"` counts as one task and is followed by a random sleep of
/// 1..500 ms that simulates the work. On `"Fired!"` the worker prints how many
/// tasks it completed and returns.
///
/// # Panics
///
/// Panics if any socket operation fails or if a reply is not valid UTF-8.
fn worker_task() {
    let context = zmq2::Context::new();
    let worker = context.socket(zmq2::REQ).unwrap();
    let mut rng = rand::thread_rng();

    // ZeroMQ reserves identities that start with a zero byte for its own use,
    // so the first byte is drawn from 1..=255; the remaining nine are free.
    let mut identity: Vec<u8> = Vec::with_capacity(10);
    identity.push(rng.gen_range(1..=u8::MAX));
    identity.extend((1..10).map(|_| rng.gen_range(u8::MIN..=u8::MAX)));
    worker.set_identity(&identity).unwrap();

    worker
        .connect("tcp://localhost:5671")
        .expect("worker failed to connect to tcp://localhost:5671");

    let mut total = 0;
    loop {
        // Tell the broker we are ready for work.
        worker.send("Hi boss!", 0).unwrap();

        // Get the workload from the broker; "Fired!" ends the loop.
        let workload = worker.recv_string(0).unwrap().unwrap();
        if workload == "Fired!" {
            println!("Worker {} completed {} tasks", hex(&identity), total);
            break;
        }
        total += 1;

        // Simulate doing some work.
        thread::sleep(Duration::from_millis(rng.gen_range(1..500)));
    }
}
fn main() {
let worker_pool_size = 10;
let allowed_duration = Duration::new(5, 0);
let context = zmq2::Context::new();
let broker = context.socket(zmq2::ROUTER).unwrap();
assert!(broker.bind("tcp://*:5671").is_ok());
let mut thread_pool = Vec::new();
for _ in 0..worker_pool_size {
let child = thread::spawn(move || {
worker_task();
});
thread_pool.push(child);
}
let start_time = Instant::now();
let mut workers_fired = 0;
loop {
let identity = broker.recv_bytes(0).unwrap();
broker.send(&identity, SNDMORE).unwrap();
broker.recv_bytes(0).unwrap(); broker.recv_bytes(0).unwrap(); broker.send("", SNDMORE).unwrap();
if start_time.elapsed() < allowed_duration {
broker.send("Work harder", 0).unwrap();
} else {
broker.send("Fired!", 0).unwrap();
workers_fired += 1;
if workers_fired >= worker_pool_size {
break;
}
}
}
}