extern crate futures;
extern crate tokio;
extern crate tokio_timer;
extern crate tokio_zmq;
extern crate zmq;
use std::{
io,
sync::Arc,
time::{Duration, Instant},
};
use futures::{Future, Stream};
use tokio_timer::{Error as TimerError, Interval};
use tokio_zmq::prelude::*;
use tokio_zmq::{Error as ZmqFutError, Pub};
/// Unified error type for this program.
///
/// Each variant wraps one of the error types this file imports, so that the
/// different failures can be carried through a single error type. Only
/// `Debug` is derived.
#[derive(Debug)]
enum Error {
/// An error from the `tokio_zmq` crate (imported here as `ZmqFutError`).
ZmqFut(ZmqFutError),
/// An error from the `zmq` crate.
Zmq(zmq::Error),
/// A standard-library I/O error.
Io(io::Error),
/// An error from the `tokio_timer` crate (imported here as `TimerError`).
Timer(TimerError),
}
impl From<ZmqFutError> for Error {
fn from(e: ZmqFutError) -> Self {
Error::ZmqFut(e)
}
}
impl From<zmq::Error> for Error {
fn from(e: zmq::Error) -> Self {
Error::Zmq(e)
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
impl From<TimerError> for Error {
fn from(e: TimerError) -> Self {
Error::Timer(e)
}
}
fn main() {
let ctx = Arc::new(zmq::Context::new());
let zpub_fut = Pub::builder(ctx).bind("tcp://*:5556").build();
let producer = zpub_fut.from_err().and_then(|zpub| {
Interval::new(Instant::now(), Duration::from_secs(1))
.map_err(Error::from)
.and_then(|_| {
println!("Sending 'Hello'");
Ok(zmq::Message::from("Hello").into())
})
.forward(zpub.sink(25))
});
tokio::run(producer.map(|_| ()).or_else(|e| {
println!("Error in producer: {:?}", e);
Ok(())
}));
}