1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures_util::stream::StreamExt;
use rand::RngExt;
use std::{env, str::from_utf8, time::Duration};
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
// Use the NATS_URL env variable if defined, otherwise fallback
// to the default.
let nats_url = env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let client = async_nats::connect(nats_url).await?;
// `Subscriber` implements Rust iterator, so we can leverage
// combinators like `take()` to limit the messages intended
// to be consumed for this interaction.
let subscription = client.subscribe("greet.*").await?.take(50);
// Publish set of messages, each with order identifier.
for i in 0..50 {
client
.publish("greet.joe", format!("hello {i}").into())
.await?;
}
// Flush the internal buffer and ensure that all messages are sent.
client.flush().await?;
// Iterate over messages concurrently.
// for_each_concurrent allows us to not wait for time-consuming operation and receive next
// message immediately.
subscription
.for_each_concurrent(25, |message| async move {
let num = rand::rng().random_range(0..500);
tokio::time::sleep(Duration::from_millis(num)).await;
println!(
"received message: {:?}",
from_utf8(&message.payload).unwrap()
)
})
.await;
Ok(())
}