extern crate env_logger;
extern crate futures;
extern crate log;
extern crate tokio;
extern crate tokio_zmq;
extern crate zmq;
use std::sync::Arc;
use futures::{stream::iter_ok, Future, Stream};
use tokio_zmq::{prelude::*, Multipart, Req};
fn build_multipart(i: usize) -> Multipart {
let mut multipart = Multipart::new();
let msg1 = zmq::Message::from(&format!("Hewwo? {}", i));
let msg2 = zmq::Message::from(&format!("Mr Obama??? {}", i));
multipart.push_back(msg1);
multipart.push_back(msg2);
multipart
}
fn main() {
env_logger::init();
let ctx = Arc::new(zmq::Context::new());
let req_fut = Req::builder(ctx).connect("tcp://localhost:5560").build();
let runner = req_fut.and_then(|req| {
req.send(build_multipart(0)).and_then(|req| {
let (sink, stream) = req.sink_stream(25).split();
stream
.zip(iter_ok(1..10_000))
.map(|(multipart, i)| {
for msg in multipart {
if let Some(msg) = msg.as_str() {
println!("Received: {}", msg);
}
}
build_multipart(i)
})
.forward(sink)
})
});
tokio::run(runner.map(|_| ()).or_else(|e| {
println!("Error: {:?}", e);
Ok(())
}));
}