mod async_helpers;
use futures::{select, FutureExt};
use std::io::Write;
use std::{error::Error, time::Duration};
use zeromq::{Socket, SocketRecv, SocketSend};
#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut receiver = zeromq::PullSocket::new();
receiver.connect("tcp://127.0.0.1:5557").await?;
let mut sender = zeromq::PushSocket::new();
sender.connect("tcp://127.0.0.1:5558").await?;
let mut controller = zeromq::SubSocket::new();
controller.connect("tcp://127.0.0.1:5559").await?;
controller.subscribe("").await?;
loop {
select! {
message = receiver.recv().fuse() => {
let message = message.unwrap();
let workload = String::from_utf8(message.get(0).unwrap().to_vec())?
.parse()
.expect("Couldn't parse u64 from data");
async_helpers::sleep(Duration::from_millis(workload)).await;
sender.send(message).await?;
print!(".");
std::io::stdout().flush()?;
},
_kill = controller.recv().fuse() => {
break
}
};
}
println!("Done");
receiver.close().await;
sender.close().await;
controller.close().await;
Ok(())
}