#![crate_name = "lbbroker"]
use zmq2::SNDMORE;
use std::thread;
/// Runs a single request-reply client.
///
/// The client creates its own context and a `REQ` socket, names itself
/// `Client<client_nbr>`, connects to `ipc://frontend.ipc`, sends one `HELLO`
/// request and prints the reply it gets back.
///
/// # Panics
///
/// Panics if socket creation, setting the identity, connecting, sending or
/// receiving fails, or if the reply is not valid UTF-8.
fn client_task(client_nbr: i32) {
    let ctx = zmq2::Context::new();
    let requester = ctx.socket(zmq2::REQ).unwrap();

    // Use a printable identity so the broker's routing frames are readable.
    let name = format!("Client{}", client_nbr);
    requester.set_identity(name.as_bytes()).unwrap();

    let endpoint = "ipc://frontend.ipc";
    requester
        .connect(endpoint)
        .expect("failed connecting client");

    // Send the request, then block until the reply arrives.
    requester
        .send("HELLO", 0)
        .expect("client failed sending request");
    let received = requester
        .recv_string(0)
        .expect("client failed receiving reply");
    let text = received.unwrap();
    println!("Client: {}", text);
}
/// Runs a single worker that serves requests handed out by the broker.
///
/// The worker creates its own context and a `REQ` socket, names itself
/// `Worker<worker_nbr>`, connects to `ipc://backend.ipc` and announces itself
/// with a `READY` message. It then loops forever: each iteration receives a
/// three-frame message (client identity, empty delimiter, request), prints the
/// request and answers with (client identity, empty delimiter, `OK`).
///
/// This function never returns normally.
///
/// # Panics
///
/// Panics if any socket operation fails, if a received frame is not valid
/// UTF-8, or if the delimiter frame is not empty.
fn worker_task(worker_nbr: i32) {
    let context = zmq2::Context::new();
    let worker = context.socket(zmq2::REQ).unwrap();
    let identity = format!("Worker{}", worker_nbr);
    worker.set_identity(identity.as_bytes()).unwrap();
    // `expect` (rather than `assert!(.. .is_ok())`) keeps the underlying error
    // in the panic message.
    worker
        .connect("ipc://backend.ipc")
        .expect("failed connecting worker");

    // Tell the broker this worker is ready for work.
    worker
        .send("READY", 0)
        .expect("worker failed sending READY");

    loop {
        // Frame 1: identity of the client the reply must be routed back to.
        let client_id = worker
            .recv_string(0)
            .expect("worker failed receiving id")
            .unwrap();
        // Frame 2: empty delimiter.
        let empty = worker
            .recv_string(0)
            .expect("worker failed receiving empty")
            .unwrap();
        assert!(empty.is_empty());
        // Frame 3: the request payload.
        let request = worker
            .recv_string(0)
            .expect("worker failed receiving request")
            .unwrap();
        println!("Worker: {}", request);

        // Reply with the same envelope so the broker can route it to the client.
        worker
            .send(&client_id, SNDMORE)
            .expect("worker failed sending identity");
        worker
            .send("", SNDMORE)
            .expect("worker failed sending empty frame");
        worker.send("OK", 0).expect("worker failed sending OK");
    }
}
/// Load-balancing broker.
///
/// Binds a `ROUTER` frontend on `ipc://frontend.ipc` for clients and a
/// `ROUTER` backend on `ipc://backend.ipc` for workers, spawns the client and
/// worker threads, and then routes messages between them:
///
/// * A message on the backend is either a `READY` announcement or a reply
///   addressed to a client. Either way the sending worker is appended to the
///   queue of available workers; a reply is forwarded to the frontend.
/// * A request on the frontend is forwarded to the worker that has been
///   waiting the longest (FIFO), so work is spread across all workers.
///
/// The frontend is only polled while at least one worker is available. The
/// loop ends once one reply per spawned client has been forwarded.
///
/// # Panics
///
/// Panics if any socket operation fails, if a frame is not valid UTF-8, if an
/// expected delimiter frame is not empty, or if a client thread panicked.
fn main() {
    let worker_pool_size = 3;
    let client_pool_size = 10;
    let context = zmq2::Context::new();
    let frontend = context.socket(zmq2::ROUTER).unwrap();
    let backend = context.socket(zmq2::ROUTER).unwrap();
    frontend
        .bind("ipc://frontend.ipc")
        .expect("failed binding frontend");
    backend
        .bind("ipc://backend.ipc")
        .expect("failed binding backend");

    let mut client_thread_pool = Vec::new();
    for client_nbr in 0..client_pool_size {
        let child = thread::spawn(move || {
            client_task(client_nbr);
        });
        client_thread_pool.push(child);
    }

    // Worker threads loop forever, so their handles are never joined; they are
    // torn down when the process exits.
    let mut worker_thread_pool = Vec::new();
    for worker_nbr in 0..worker_pool_size {
        let child = thread::spawn(move || {
            worker_task(worker_nbr);
        });
        worker_thread_pool.push(child);
    }

    // Number of client replies still to be forwarded.
    let mut client_nbr = client_pool_size;
    // Identities of idle workers, oldest first. A FIFO queue hands the next
    // request to the least recently used worker instead of reusing the most
    // recently finished one.
    let mut worker_queue = std::collections::VecDeque::new();

    loop {
        let mut items = [
            backend.as_poll_item(zmq2::POLLIN),
            frontend.as_poll_item(zmq2::POLLIN),
        ];
        // Only poll the frontend when there is a worker to take the request.
        let polled = if worker_queue.is_empty() { 1 } else { 2 };
        let rc = zmq2::poll(&mut items[..polled], -1).unwrap();
        // NOTE(review): poll failures presumably surface as `Err` and panic in
        // `unwrap()` above, which would make this check unreachable — confirm.
        if rc == -1 {
            break;
        }

        // Handle worker activity on the backend.
        if items[0].is_readable() {
            let worker_id = backend
                .recv_string(0)
                .expect("backend failed receiving worker id")
                .unwrap();
            assert!(backend
                .recv_string(0)
                .expect("backend failed receiving empty")
                .unwrap()
                .is_empty());
            assert!(worker_queue.len() < (worker_pool_size as usize));
            worker_queue.push_back(worker_id);

            // Third frame is either "READY" or the identity of the client the
            // reply belongs to.
            let client_id = backend
                .recv_string(0)
                .expect("backend failed receiving client id")
                .unwrap();
            if client_id != "READY" {
                assert!(backend
                    .recv_string(0)
                    .expect("backend failed receiving second empty")
                    .unwrap()
                    .is_empty());
                let reply = backend
                    .recv_string(0)
                    .expect("backend failed receiving client reply")
                    .unwrap();
                frontend
                    .send(&client_id, SNDMORE)
                    .expect("frontend failed sending client id");
                frontend
                    .send("", SNDMORE)
                    .expect("frontend failed sending empty");
                frontend
                    .send(&reply, 0)
                    .expect("frontend failed sending reply");
                client_nbr -= 1;
                if client_nbr == 0 {
                    // Every client has been answered.
                    break;
                }
            }
        }

        // Handle a client request on the frontend.
        if items[1].is_readable() {
            let client_id = frontend
                .recv_string(0)
                .expect("frontend failed receiving client id")
                .unwrap();
            assert!(frontend
                .recv_string(0)
                .expect("frontend failed receiving empty")
                .unwrap()
                .is_empty());
            let request = frontend
                .recv_string(0)
                .expect("frontend failed receiving request")
                .unwrap();

            // The frontend is readable only if it was polled, which requires a
            // non-empty queue; the backend branch above only adds workers.
            let worker = worker_queue
                .pop_front()
                .expect("frontend was polled without an available worker");
            backend
                .send(&worker, SNDMORE)
                .expect("backend failed sending worker_id");
            backend
                .send("", SNDMORE)
                .expect("backend failed sending empty");
            backend
                .send(&client_id, SNDMORE)
                .expect("backend failed sending client_id");
            backend
                .send("", SNDMORE)
                .expect("backend failed sending second empty");
            backend
                .send(&request, 0)
                .expect("backend failed sending request");
        }
    }

    // Once every reply has been forwarded, wait for the clients to receive and
    // print them; otherwise the process may exit before the last client has
    // printed its reply. Skipped if the loop ended early, since clients would
    // then block forever waiting for a reply.
    if client_nbr == 0 {
        for child in client_thread_pool {
            child.join().expect("client thread panicked");
        }
    }
}