use futures::{future, SinkExt, StreamExt};
use rand::Rng;
use std::{error::Error, rc::Rc, time::Duration};
use tmq::{dealer, router, Context, Multipart};
use tokio::time::sleep;
/// Simulated client: connects a DEALER socket to `frontend` and issues an
/// endless series of numbered requests, checking each reply before sleeping
/// for a random 200..1000 ms interval.
///
/// Every request is a three-frame message: the client id (as text), the
/// request number (as text) and the literal `request`. The reply is expected
/// to carry the same first two frames followed by the literal `response`.
///
/// # Errors
/// Returns an error if connecting, sending or receiving on the socket fails.
///
/// # Panics
/// Panics if the socket stream ends or if a reply does not match the
/// expected frames.
async fn client(ctx: Rc<Context>, client_id: u64, frontend: String) -> tmq::Result<()> {
    let mut socket = dealer(&ctx).connect(&frontend)?;
    let mut rng = rand::thread_rng();
    let id_text = client_id.to_string();
    let mut sequence = 0;

    loop {
        println!("Client {} sending request {}", id_text, sequence);
        let sequence_text = sequence.to_string();

        let outgoing = vec![id_text.as_bytes(), sequence_text.as_bytes(), b"request"];
        socket.send(outgoing).await?;

        // The reply must echo the id and sequence frames of the request just sent.
        let reply = socket.next().await.unwrap()?;
        let wanted: Multipart =
            vec![id_text.as_bytes(), sequence_text.as_bytes(), b"response"].into();
        assert_eq!(wanted, reply);

        // Random pause between consecutive requests.
        let pause_ms = rng.gen_range(200..1000);
        sleep(Duration::from_millis(pause_ms)).await;

        sequence += 1;
    }
}
/// Simulated worker: connects a DEALER socket to `backend`, then loops forever
/// receiving a request, sleeping 100..3000 ms to simulate work, and sending
/// back a response.
///
/// Each incoming message is expected to hold at least four frames, in order:
/// a routing identity, the client id, the request id and the request body.
/// (The identity frame is presumably the one prepended by the proxy's ROUTER
/// socket — confirm against the ZeroMQ ROUTER documentation.) The reply
/// reuses the first three frames and replaces the body with `response`.
///
/// # Errors
/// Returns an error if the socket cannot be connected, if receiving or
/// sending fails, if the socket stream ends, if a request has fewer than four
/// frames, or if one of the logged frames is not valid UTF-8. Malformed
/// network input is reported as an error instead of panicking.
async fn worker(ctx: Rc<Context>, worker_id: u64, backend: String) -> Result<(), Box<dyn Error>> {
    let mut sock = dealer(&ctx).connect(&backend)?;
    let mut rng = rand::thread_rng();
    loop {
        // First `?` handles the stream ending, second `?` handles a receive error.
        let mut request = sock
            .next()
            .await
            .ok_or("backend socket stream ended unexpectedly")??;

        let identity = request
            .pop_front()
            .ok_or("request is missing the identity frame")?;
        let client_id = request
            .pop_front()
            .ok_or("request is missing the client id frame")?;
        let request_id = request
            .pop_front()
            .ok_or("request is missing the request id frame")?;
        let request_body = request
            .pop_front()
            .ok_or("request is missing the body frame")?;

        let request_id_text = request_id
            .as_str()
            .ok_or("request id frame is not valid UTF-8")?;
        let request_body_text = request_body
            .as_str()
            .ok_or("request body frame is not valid UTF-8")?;
        let client_id_text = client_id
            .as_str()
            .ok_or("client id frame is not valid UTF-8")?;
        println!(
            "Worker {} handling request(id={} body={}) from client {}",
            worker_id, request_id_text, request_body_text, client_id_text
        );

        // Simulate a variable amount of work before answering.
        let sleep_time = rng.gen_range(100..3000);
        sleep(Duration::from_millis(sleep_time)).await;

        // Echo the identity, client id and request id frames back with the reply body.
        let response = vec![identity, client_id, request_id, "response".into()];
        sock.send(response).await?;
    }
}
/// Message broker: binds a ROUTER socket on `frontend` and a DEALER socket on
/// `backend`, then forwards every message arriving on one side to the other.
///
/// Both sockets are split into a sending and a receiving half. One pending
/// receive future per side is kept alive across loop iterations; whichever
/// completes first is forwarded and replaced, while the still-pending future
/// of the other side is carried over unchanged.
///
/// # Errors
/// Returns an error if binding, receiving or sending fails. The loop never
/// finishes successfully on its own.
///
/// # Panics
/// Panics if either receiving stream ends.
async fn proxy(ctx: Rc<Context>, frontend: String, backend: String) -> tmq::Result<()> {
    let (mut to_clients, mut from_clients) = router(&ctx).bind(&frontend)?.split();
    let (mut to_workers, mut from_workers) = dealer(&ctx).bind(&backend)?.split();

    let mut pending_client = from_clients.next();
    let mut pending_worker = from_workers.next();

    loop {
        let ready = future::select(pending_client, pending_worker).await;
        match ready {
            // A client message arrived first: pass it on to the workers.
            future::Either::Left((incoming, unfinished_worker)) => {
                let request = incoming.unwrap()?;
                to_workers.send(request).await?;
                pending_client = from_clients.next();
                pending_worker = unfinished_worker;
            }
            // A worker message arrived first: pass it back to the clients.
            future::Either::Right((incoming, unfinished_client)) => {
                let reply = incoming.unwrap()?;
                to_clients.send(reply).await?;
                pending_worker = from_workers.next();
                pending_client = unfinished_client;
            }
        }
    }
}
/// Entry point: runs two workers, three clients (ids 10..13) and the proxy on
/// a single-threaded tokio runtime.
///
/// Workers and clients are spawned as local tasks on a `LocalSet`; the proxy
/// future is the one handed to `block_on`. All of them share one ZeroMQ
/// `Context` through an `Rc`.
///
/// # Errors
/// Returns an error if the tokio runtime cannot be built.
///
/// # Panics
/// Any worker, client or proxy failure panics through `expect`. Because
/// `proxy` only returns on error, the program ends either by such a panic or
/// by being terminated externally.
fn main() -> tmq::Result<()> {
    let frontend = "tcp://127.0.0.1:5555".to_string();
    let backend = "tcp://127.0.0.1:5556".to_string();
    let ctx = Rc::new(Context::new());

    // `LocalSet::block_on` only needs a shared reference, so no `mut` is required.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    let tasks = tokio::task::LocalSet::new();

    for worker_id in 0..2 {
        let ctx = Rc::clone(&ctx);
        let backend = backend.clone();
        tasks.spawn_local(async move {
            worker(ctx, worker_id, backend)
                .await
                .expect("Worker failed");
        });
    }

    for client_id in 10..13 {
        let ctx = Rc::clone(&ctx);
        let frontend = frontend.clone();
        tasks.spawn_local(async move {
            client(ctx, client_id, frontend)
                .await
                .expect("Client failed");
        });
    }

    // The proxy takes ownership of the last `ctx` handle; no extra clone is needed.
    tasks.block_on(&runtime, async move {
        proxy(ctx, frontend, backend).await.expect("Proxy failed");
    });
    Ok(())
}