zmq2 0.5.0

Maintained high-level bindings to the zeromq library
Documentation
#![crate_name = "lbbroker"]

//! load balancing broker
//! clients and workers here are shown in process

use zmq2::SNDMORE;
//use std::time::;
use std::thread;

//  Basic request-reply client using REQ socket
//  Because s_send and s_recv can't handle 0MQ binary identities, we
//  set a printable text identity to allow routing.
fn client_task(client_nbr: i32) {
    //create context and client socket
    let context = zmq2::Context::new();
    let client = context.socket(zmq2::REQ).unwrap();

    //set random indentity string and connect
    let identity = format!("Client{}", client_nbr);
    client.set_identity(identity.as_bytes()).unwrap();
    client
        .connect("ipc://frontend.ipc")
        .expect("failed connecting client");

    //send request, get reply
    client
        .send("HELLO", 0)
        .expect("client failed sending request");
    let reply = client
        .recv_string(0)
        .expect("client failed receiving reply")
        .unwrap();
    println!("Client: {}", reply);
}

fn worker_task(worker_nbr: i32) {
    let context = zmq2::Context::new();
    let worker = context.socket(zmq2::REQ).unwrap();
    let identity = format!("Worker{}", worker_nbr);
    worker.set_identity(identity.as_bytes()).unwrap();
    assert!(worker.connect("ipc://backend.ipc").is_ok());

    // Tell the broker we're ready for work
    worker.send("READY", 0).unwrap();

    loop {
        //Read and save all frames until we get an empty frame
        //In this example there is only 1 but there could be more
        let identity = worker
            .recv_string(0)
            .expect("worker failed receiving id")
            .unwrap();
        let empty = worker
            .recv_string(0)
            .expect("worker failed receving empty")
            .unwrap();
        assert!(empty.is_empty());
        // Get workload from broker, until finished
        let request = worker.recv_string(0).unwrap().unwrap();
        println!("Worker: {}", request);
        worker
            .send(&identity, SNDMORE)
            .expect("worker failed sending identity");
        worker
            .send("", SNDMORE)
            .expect("worker failed sending empty frame");
        worker.send("OK", 0).expect("worker failed sending OK");
    }
}

fn main() {
    let worker_pool_size = 3;
    let client_pool_size = 10;
    let context = zmq2::Context::new();
    let frontend = context.socket(zmq2::ROUTER).unwrap();
    let backend = context.socket(zmq2::ROUTER).unwrap();
    frontend
        .bind("ipc://frontend.ipc")
        .expect("failed binding frontend");
    backend
        .bind("ipc://backend.ipc")
        .expect("failed binding backend");
    // While this example runs in a single process, that is just to make
    // it easier to start and stop the example. Each thread has its own
    // context and conceptually acts as a separate process.
    let mut client_thread_pool = Vec::new();
    for client_nbr in 0..client_pool_size {
        let child = thread::spawn(move || {
            client_task(client_nbr);
        });
        client_thread_pool.push(child);
    }

    let mut worker_thread_pool = Vec::new();
    for worker_nbr in 0..worker_pool_size {
        let child = thread::spawn(move || {
            worker_task(worker_nbr);
        });
        worker_thread_pool.push(child);
    }
    //  Here is the main loop for the least-recently-used queue. It has two
    //  sockets; a frontend for clients and a backend for workers. It polls
    //  the backend in all cases, and polls the frontend only when there are
    //  one or more workers ready. This is a neat way to use 0MQ's own queues
    //  to hold messages we're not ready to process yet. When we get a client
    //  reply, we pop the next available worker and send the request to it,
    //  including the originating client identity. When a worker replies,
    //  we requeue that worker and forward the reply to the original
    //  client using the reply envelope.
    let mut client_nbr = client_pool_size;
    let mut worker_queue = Vec::new();
    loop {
        let mut items = [
            backend.as_poll_item(zmq2::POLLIN),
            frontend.as_poll_item(zmq2::POLLIN),
        ];
        let rc = zmq2::poll(
            &mut items[0..if worker_queue.is_empty() { 1 } else { 2 }],
            -1,
        )
        .unwrap();

        if rc == -1 {
            break;
        }

        if items[0].is_readable() {
            let worker_id = backend
                .recv_string(0)
                .expect("backend failed receiving worker id")
                .unwrap();
            assert!(
                backend
                    .recv_string(0)
                    .expect("backend failed receiving empty")
                    .unwrap()
                    == ""
            );
            assert!(worker_queue.len() < (worker_pool_size as usize));
            worker_queue.push(worker_id);
            let client_id = backend
                .recv_string(0)
                .expect("backend failed receiving client id")
                .unwrap();

            //if client reply send rest to front end
            if client_id != "READY" {
                assert!(
                    backend
                        .recv_string(0)
                        .expect("backend failed receiving second empty")
                        .unwrap()
                        == ""
                );
                let reply = backend
                    .recv_string(0)
                    .expect("backend failed receiving client reply")
                    .unwrap();
                frontend
                    .send(&client_id, SNDMORE)
                    .expect("frontend failed sending client id");
                frontend
                    .send("", SNDMORE)
                    .expect("frontend failed sending empty");
                frontend
                    .send(&reply, 0)
                    .expect("frontend failed sending reply");
                client_nbr -= 1;
                if client_nbr == 0 {
                    break;
                }
            }
        }

        if items[1].is_readable() {
            //  Now get next client request, route to last-used worker
            //  Client request is [identity][empty][request]
            let client_id = frontend
                .recv_string(0)
                .expect("frontend failed receiving client id")
                .unwrap();
            assert!(
                frontend
                    .recv_string(0)
                    .expect("frontend failed receiving empty")
                    .unwrap()
                    == ""
            );
            let request = frontend
                .recv_string(0)
                .expect("frontend failed receiving request")
                .unwrap();

            let worker = worker_queue.pop().unwrap();
            backend
                .send(&worker, SNDMORE)
                .expect("backend failed sending worker_id");
            backend
                .send("", SNDMORE)
                .expect("backend failed sending empty");
            backend
                .send(client_id.as_bytes(), SNDMORE)
                .expect("backend failed sending client_id");
            backend
                .send("", SNDMORE)
                .expect("backend faield sending second empty");
            backend
                .send(&request, 0)
                .expect("backend failed sending request");
        }
    }
}