1#![doc = include_str!("../README.md")]
2#![cfg(not(target_arch = "wasm32"))]
3
4mod blobstore;
5mod keyvalue;
6mod messaging;
7
8use std::sync::Arc;
9
10use anyhow::{Context, Result};
11use async_nats::AuthError;
12use omnia::Backend;
13use tracing::instrument;
14
15#[derive(Debug, Clone)]
17pub struct Client {
18 inner: async_nats::Client,
19 topics: Option<Vec<String>>,
20}
21
22impl Backend for Client {
23 type ConnectOptions = ConnectOptions;
24
25 #[instrument]
26 async fn connect_with(options: Self::ConnectOptions) -> Result<Self> {
27 let mut nats_opts = async_nats::ConnectOptions::new();
28
29 if let Some(jwt) = &options.jwt
30 && let Some(seed) = &options.seed
31 {
32 let key_pair = nkeys::KeyPair::from_seed(seed).context("creating KeyPair")?;
33 let key_pair = Arc::new(key_pair);
34 nats_opts = nats_opts.jwt(jwt.clone(), move |nonce| {
35 let key_pair = Arc::clone(&key_pair);
36 async move { key_pair.sign(&nonce).map_err(AuthError::new) }
37 });
38 }
39
40 let client = nats_opts.connect(&options.address).await.context("connecting to NATS")?;
41
42 Ok(Self {
43 inner: client,
44 topics: options.topics,
45 })
46 }
47}
48
49#[allow(missing_docs)]
50mod config {
51 use fromenv::{FromEnv, ParseResult};
52
53 #[derive(Debug, Clone, FromEnv)]
55 pub struct ConnectOptions {
56 #[env(from = "NATS_ADDR", default = "demo.nats.io")]
58 pub address: String,
59 #[env(from = "NATS_TOPICS", with = split)]
61 pub topics: Option<Vec<String>>,
62 #[env(from = "NATS_JWT")]
64 pub jwt: Option<String>,
65 #[env(from = "NATS_SEED")]
67 pub seed: Option<String>,
68 }
69
70 #[allow(clippy::unnecessary_wraps)]
71 fn split(s: &str) -> ParseResult<Vec<String>> {
72 Ok(s.split(',').map(ToOwned::to_owned).collect())
73 }
74}
75pub use config::ConnectOptions;
76
77impl omnia::FromEnv for ConnectOptions {
78 fn from_env() -> Result<Self> {
79 Self::from_env().finalize().context("issue loading connection options")
80 }
81}