wrpc_cli/
nats.rs

1use anyhow::Context as _;
2use tokio::sync::mpsc;
3
/// Default NATS.io server URL: the loopback address `127.0.0.1` on port `4222`.
4pub const DEFAULT_URL: &str = "nats://127.0.0.1:4222";
5
6/// Connect to NATS.io server and ensure that the connection is fully established before
7/// returning the resulting [`async_nats::Client`]
8pub async fn connect(addrs: impl async_nats::ToServerAddrs) -> anyhow::Result<async_nats::Client> {
9    let (conn_tx, mut conn_rx) = mpsc::channel(1);
10    let client = async_nats::connect_with_options(
11        addrs,
12        async_nats::ConnectOptions::new()
13            .retry_on_initial_connect()
14            .event_callback(move |event| {
15                let conn_tx = conn_tx.clone();
16                async move {
17                    if let async_nats::Event::Connected = event {
18                        conn_tx
19                            .send(())
20                            .await
21                            .expect("failed to send NATS.io server connection notification");
22                    }
23                }
24            }),
25    )
26    .await
27    .context("failed to connect to NATS.io server")?;
28    conn_rx
29        .recv()
30        .await
31        .context("failed to await NATS.io server connection to be established")?;
32    Ok(client)
33}