use std::{sync::Arc, time::Duration};
use anyhow::{bail, Context as _};
use async_nats::jetstream::kv::Store;
use nkeys::KeyPair;
use tracing::{info, instrument};
use crate::workload_identity::{
setup_workload_identity_nats_connect_options, WorkloadIdentityConfig,
};
pub mod builder;
pub mod ctl;
pub mod event;
pub mod policy;
pub mod provider;
pub mod secrets;
pub mod store;
pub async fn connect_nats(
addr: impl async_nats::ToServerAddrs,
jwt: Option<&String>,
key: Option<Arc<KeyPair>>,
require_tls: bool,
request_timeout: Option<Duration>,
workload_identity_config: Option<WorkloadIdentityConfig>,
) -> anyhow::Result<async_nats::Client> {
let opts = match (jwt, key, workload_identity_config) {
(Some(jwt), Some(key), None) => {
async_nats::ConnectOptions::with_jwt(jwt.to_string(), move |nonce| {
let key = key.clone();
async move { key.sign(&nonce).map_err(async_nats::AuthError::new) }
})
.name("wasmbus")
}
(Some(_), None, _) | (None, Some(_), _) => {
bail!("cannot authenticate if only one of jwt or seed is specified")
}
(jwt, key, Some(wid_cfg)) => {
setup_workload_identity_nats_connect_options(jwt, key, wid_cfg).await?
}
_ => {
let mut opts = async_nats::ConnectOptions::new().name("wasmbus");
if let Ok(mut addrs) = addr.to_server_addrs() {
if let Some(addr) = addrs.next() {
if addr.has_user_pass() {
if let (Some(user), Some(pass)) = (addr.username(), addr.password()) {
opts = opts.user_and_password(user.to_string(), pass.to_string());
}
}
}
}
opts
}
};
let opts = if let Some(timeout) = request_timeout {
opts.request_timeout(Some(timeout))
} else {
opts
};
let opts = opts.require_tls(require_tls);
opts.connect(addr)
.await
.context("failed to connect to NATS")
}
#[instrument(level = "debug", skip_all)]
pub(crate) async fn create_bucket(
jetstream: &async_nats::jetstream::Context,
bucket: &str,
) -> anyhow::Result<Store> {
if let Ok(store) = jetstream.get_key_value(bucket).await {
info!(%bucket, "bucket already exists. Skipping creation.");
return Ok(store);
}
match jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: bucket.to_string(),
..Default::default()
})
.await
{
Ok(store) => {
info!(%bucket, "created bucket with 1 replica");
Ok(store)
}
Err(err) => {
Err(anyhow::anyhow!(err).context(format!("failed to create bucket '{bucket}'")))
}
}
}