Skip to main content

omnia_nats/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg(not(target_arch = "wasm32"))]
3
4mod blobstore;
5mod keyvalue;
6mod messaging;
7
8use std::sync::Arc;
9
10use anyhow::{Context, Result};
11use async_nats::AuthError;
12use omnia::Backend;
13use tracing::instrument;
14
15/// NATS backend client for messaging, key-value, and blobstore.
16#[derive(Debug, Clone)]
17pub struct Client {
18    inner: async_nats::Client,
19    topics: Option<Vec<String>>,
20}
21
22impl Backend for Client {
23    type ConnectOptions = ConnectOptions;
24
25    #[instrument]
26    async fn connect_with(options: Self::ConnectOptions) -> Result<Self> {
27        let mut nats_opts = async_nats::ConnectOptions::new();
28
29        if let Some(jwt) = &options.jwt
30            && let Some(seed) = &options.seed
31        {
32            let key_pair = nkeys::KeyPair::from_seed(seed).context("creating KeyPair")?;
33            let key_pair = Arc::new(key_pair);
34            nats_opts = nats_opts.jwt(jwt.clone(), move |nonce| {
35                let key_pair = Arc::clone(&key_pair);
36                async move { key_pair.sign(&nonce).map_err(AuthError::new) }
37            });
38        }
39
40        let client = nats_opts.connect(&options.address).await.context("connecting to NATS")?;
41
42        Ok(Self {
43            inner: client,
44            topics: options.topics,
45        })
46    }
47}
48
49#[allow(missing_docs)]
50mod config {
51    use fromenv::{FromEnv, ParseResult};
52
53    /// Connection options for the NATS backend.
54    #[derive(Debug, Clone, FromEnv)]
55    pub struct ConnectOptions {
56        /// NATS server address.
57        #[env(from = "NATS_ADDR", default = "demo.nats.io")]
58        pub address: String,
59        /// Optional topics for subscription mode.
60        #[env(from = "NATS_TOPICS", with = split)]
61        pub topics: Option<Vec<String>>,
62        /// Optional JWT used for NATS authentication.
63        #[env(from = "NATS_JWT")]
64        pub jwt: Option<String>,
65        /// Optional `NKey` seed used to sign server nonce challenges.
66        #[env(from = "NATS_SEED")]
67        pub seed: Option<String>,
68    }
69
70    #[allow(clippy::unnecessary_wraps)]
71    fn split(s: &str) -> ParseResult<Vec<String>> {
72        Ok(s.split(',').map(ToOwned::to_owned).collect())
73    }
74}
75pub use config::ConnectOptions;
76
77impl omnia::FromEnv for ConnectOptions {
78    fn from_env() -> Result<Self> {
79        Self::from_env().finalize().context("issue loading connection options")
80    }
81}