extern crate env_logger;
extern crate futures;
extern crate log;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_zmq;
extern crate zmq;
use std::{env, sync::Arc, thread};
use futures::{stream::iter_ok, Future, Stream};
use tokio_zmq::{prelude::*, Dealer, Multipart, Pub, Rep, Req, Router, Sub};
const CLIENT_REQUESTS: usize = 1000;
pub struct Stop;
impl ControlHandler for Stop {
fn should_stop(&mut self, _: Multipart) -> bool {
println!("Got stop signal");
true
}
}
fn client() {
let ctx = Arc::new(zmq::Context::new());
let req_fut = Req::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5559")
.build();
let zpub_fut = Pub::builder(Arc::clone(&ctx)).bind("tcp://*:5561").build();
println!("Sending 'Hewwo?' for 0");
let runner = req_fut
.and_then(|req| {
req.send(zmq::Message::from("Hewwo?").into())
.and_then(|req| {
let (sink, stream) = req.sink_stream(25).split();
stream
.zip(iter_ok(1..CLIENT_REQUESTS))
.map(|(multipart, request_nbr)| {
for msg in multipart {
if let Some(msg) = msg.as_str() {
println!("Received reply {} {}", request_nbr, msg);
}
}
println!("Sending 'Hewwo?' for {}", request_nbr);
zmq::Message::from("Hewwo?").into()
})
.forward(sink)
})
})
.join(zpub_fut)
.and_then(move |((stream, _sink), zpub)| {
stream
.into_future()
.map_err(|(e, _)| e)
.and_then(|(maybe_last_item, _stream)| {
if let Some(multipart) = maybe_last_item {
for msg in multipart {
if let Some(msg) = msg.as_str() {
println!("Received last reply {}", msg);
}
}
}
let msg = zmq::Message::from("");
zpub.send(msg.into())
})
});
tokio::run(runner.map(|_| ()).or_else(|e| {
println!("Error in client: {:?}", e);
Ok(())
}));
}
fn worker() {
let ctx = Arc::new(zmq::Context::new());
let rep_fut = Rep::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5560")
.build();
let cmd_fut = Sub::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5561")
.filter(b"")
.build();
let runner = rep_fut.join(cmd_fut).and_then(|(rep, cmd)| {
let (rep_sink, rep_stream) = rep.sink_stream(25).split();
rep_stream
.controlled(cmd.stream(), Stop)
.map(|multipart| {
for msg in multipart {
if let Some(msg) = msg.as_str() {
println!("Received request: {}", msg);
} else {
println!("Received unparsable request: {:?}", msg);
}
}
let msg = zmq::Message::from("Mr Obama???");
msg.into()
})
.forward(rep_sink)
});
tokio::run(runner.map(|_| ()).or_else(|e| {
println!("Error in worker: {:?}", e);
Ok(())
}));
}
fn broker() {
let ctx = Arc::new(zmq::Context::new());
let router_fut = Router::builder(Arc::clone(&ctx))
.bind("tcp://*:5559")
.build();
let dealer_fut = Dealer::builder(Arc::clone(&ctx))
.bind("tcp://*:5560")
.build();
let cmd1_fut = Sub::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5561")
.filter(b"")
.build();
let cmd2_fut = Sub::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5561")
.filter(b"")
.build();
let runner = dealer_fut
.join(router_fut)
.join(cmd1_fut.join(cmd2_fut))
.and_then(|((dealer, router), (cmd1, cmd2))| {
let (dealer_sink, dealer_stream) = dealer.sink_stream(25).split();
let (router_sink, router_stream) = router.sink_stream(25).split();
let d2r = dealer_stream
.controlled(cmd1.stream(), Stop)
.map(|multipart| {
for msg in &multipart {
if let Some(msg) = msg.as_str() {
println!("Relaying message '{}' to router", msg);
} else {
println!("Relaying unknown message to router");
}
}
multipart
})
.forward(router_sink);
let r2d = router_stream
.controlled(cmd2.stream(), Stop)
.map(|multipart| {
for msg in &multipart {
if let Some(msg) = msg.as_str() {
println!("Relaying message '{}' to dealer", msg);
} else {
println!("Relaying unknown message to dealer");
}
}
multipart
})
.forward(dealer_sink);
d2r.join(r2d)
});
tokio::run(runner.map(|_| ()).or_else(|e| {
println!("broker bailed: {:?}", e);
Ok(())
}));
}
#[derive(Debug, PartialEq)]
enum Selection {
All,
Broker,
Worker,
Client,
}
impl Selection {
fn broker(&self) -> bool {
*self == Selection::All || *self == Selection::Broker
}
fn worker(&self) -> bool {
*self == Selection::All || *self == Selection::Worker
}
fn client(&self) -> bool {
*self == Selection::All || *self == Selection::Client
}
}
fn main() {
env_logger::init();
let selection = env::var("SELECTION").unwrap_or_else(|_| "all".into());
let selection = match selection.as_ref() {
"broker" => Selection::Broker,
"worker" => Selection::Worker,
"client" => Selection::Client,
_ => Selection::All,
};
println!("SELECTION: {:?}", selection);
let mut broker_thread = None;
let mut worker_thread = None;
let mut client_thread = None;
if selection.broker() {
broker_thread = Some(thread::spawn(broker));
}
if selection.worker() {
worker_thread = Some(thread::spawn(worker));
}
if selection.client() {
client_thread = Some(thread::spawn(client));
}
if let Some(broker_thread) = broker_thread {
broker_thread.join().unwrap();
}
if let Some(worker_thread) = worker_thread {
worker_thread.join().unwrap();
}
if let Some(client_thread) = client_thread {
client_thread.join().unwrap();
}
}