use futures_util::stream::StreamExt;
use tokio::time::Instant;
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect("nats://localhost:4222").await?;
tokio::task::spawn({
let client = client.clone();
async move {
let mut subscriber = client.subscribe("foo").await?;
println!("Awaiting messages on foo");
while let Some(message) = subscriber.next().await {
println!("Received message {message:?}");
}
Ok::<(), async_nats::Error>(())
}
});
tokio::task::spawn({
let client = client.clone();
async move {
let mut subscriber = client.subscribe("bar").await?;
println!("Awaiting messages on bar");
while let Some(message) = subscriber.next().await {
println!("Received message {message:?}");
}
Ok::<(), async_nats::Error>(())
}
});
let foo_pub_handle = tokio::task::spawn({
let client = client.clone();
async move {
let now = Instant::now();
for _ in 0..10_000 {
client.publish("foo", "data".into()).await?;
}
Ok::<std::time::Duration, async_nats::Error>(now.elapsed())
}
});
let bar_pub_handle = tokio::task::spawn({
let client = client.clone();
async move {
let now = Instant::now();
for _ in 0..10_000 {
client.publish("bar", "data".into()).await?;
}
Ok::<std::time::Duration, async_nats::Error>(now.elapsed())
}
});
match futures_util::try_join!(foo_pub_handle, bar_pub_handle) {
Ok((foo_duration, bar_duration)) => println!(
"finished publishing foo in {:?} and bar in {:?}",
foo_duration?, bar_duration?
),
Err(err) => println!("error while in task: {:?} ", err.to_string()),
}
Ok(())
}