wasmcloud-core 0.20.0

wasmCloud core functionality shared throughout the ecosystem
Documentation
//! Common configuration settings for both the NATS messaging provider and the builtin messaging
//! provider for the host. This module requires the `messaging` feature to be enabled
use std::collections::HashMap;

use anyhow::{bail, Context as _, Result};
use serde::{Deserialize, Serialize};

pub const DEFAULT_NATS_URI: &str = "0.0.0.0:4222";
pub const CONFIG_NATS_SUBSCRIPTION: &str = "subscriptions";
pub const CONFIG_NATS_CONSUMERS: &str = "consumers";
pub const CONFIG_NATS_URI: &str = "cluster_uris";
pub const CONFIG_NATS_CLIENT_JWT: &str = "client_jwt";
pub const CONFIG_NATS_CLIENT_SEED: &str = "client_seed";
pub const CONFIG_NATS_TLS_CA: &str = "tls_ca";
pub const CONFIG_NATS_CUSTOM_INBOX_PREFIX: &str = "custom_inbox_prefix";

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ConsumerConfig {
    pub stream: Box<str>,
    pub consumer: Box<str>,
    pub max_messages: Option<usize>,
    pub max_bytes: Option<usize>,
}

/// Configuration for connecting a nats client.
/// More options are available if you use the json than variables in the values string map.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ConnectionConfig {
    /// List of topics to subscribe to
    #[serde(default)]
    pub subscriptions: Box<[async_nats::Subject]>,

    /// List of JetStream consumers
    #[serde(default)]
    pub consumers: Box<[ConsumerConfig]>,

    /// Cluster(s) to make a subscription on and connect to
    #[serde(default)]
    pub cluster_uris: Box<[Box<str>]>,

    /// Auth JWT to use (if necessary)
    #[serde(default)]
    pub auth_jwt: Option<Box<str>>,

    /// Auth seed to use (if necessary)
    #[serde(default)]
    pub auth_seed: Option<Box<str>>,

    /// TLS Certificate Authority, encoded as a string
    #[serde(default)]
    pub tls_ca: Option<Box<str>>,

    /// TLS Certificate Authority, as a path on disk
    #[serde(default)]
    pub tls_ca_file: Option<Box<str>>,

    /// Ping interval in seconds
    #[serde(default)]
    pub ping_interval_sec: Option<u16>,

    /// Inbox prefix to use (by default
    #[serde(default)]
    pub custom_inbox_prefix: Option<Box<str>>,
}

impl ConnectionConfig {
    /// Merge a given [`ConnectionConfig`] with another, coalescing fields and overriding
    /// where necessary
    pub fn merge(&self, extra: &ConnectionConfig) -> ConnectionConfig {
        let mut out = self.clone();
        if !extra.subscriptions.is_empty() {
            out.subscriptions.clone_from(&extra.subscriptions);
        }
        if !extra.consumers.is_empty() {
            out.consumers.clone_from(&extra.consumers);
        }
        // If the default configuration has a URL in it, and then the link definition
        // also provides a URL, the assumption is to replace/override rather than combine
        // the two into a potentially incompatible set of URIs
        if !extra.cluster_uris.is_empty() {
            out.cluster_uris.clone_from(&extra.cluster_uris);
        }
        if extra.auth_jwt.is_some() {
            out.auth_jwt.clone_from(&extra.auth_jwt);
        }
        if extra.auth_seed.is_some() {
            out.auth_seed.clone_from(&extra.auth_seed);
        }
        if extra.tls_ca.is_some() {
            out.tls_ca.clone_from(&extra.tls_ca);
        }
        if extra.tls_ca_file.is_some() {
            out.tls_ca_file.clone_from(&extra.tls_ca_file);
        }
        if extra.ping_interval_sec.is_some() {
            out.ping_interval_sec = extra.ping_interval_sec;
        }
        if extra.custom_inbox_prefix.is_some() {
            out.custom_inbox_prefix
                .clone_from(&extra.custom_inbox_prefix);
        }
        out
    }
}

impl Default for ConnectionConfig {
    fn default() -> ConnectionConfig {
        ConnectionConfig {
            subscriptions: Box::default(),
            consumers: Box::default(),
            cluster_uris: Box::from([DEFAULT_NATS_URI.into()]),
            auth_jwt: None,
            auth_seed: None,
            tls_ca: None,
            tls_ca_file: None,
            ping_interval_sec: None,
            custom_inbox_prefix: None,
        }
    }
}

impl ConnectionConfig {
    /// Construct configuration from the passed hostdata config
    pub fn from_map(values: &HashMap<String, String>) -> Result<ConnectionConfig> {
        let mut config = ConnectionConfig::default();

        if let Some(sub) = values.get(CONFIG_NATS_SUBSCRIPTION) {
            config.subscriptions = sub.split(',').map(async_nats::Subject::from).collect();
        }
        if let Some(cons) = values.get(CONFIG_NATS_CONSUMERS) {
            config.consumers = serde_json::from_str(cons).context("failed to parse `consumers`")?;
        }
        if let Some(url) = values.get(CONFIG_NATS_URI) {
            config.cluster_uris = url.split(',').map(Box::from).collect();
        }
        if let Some(custom_inbox_prefix) = values.get(CONFIG_NATS_CUSTOM_INBOX_PREFIX) {
            config.custom_inbox_prefix = Some(custom_inbox_prefix.as_str().into());
        }
        if let Some(jwt) = values.get(CONFIG_NATS_CLIENT_JWT) {
            config.auth_jwt = Some(jwt.as_str().into());
        }
        if let Some(seed) = values.get(CONFIG_NATS_CLIENT_SEED) {
            config.auth_seed = Some(seed.as_str().into());
        }
        if let Some(tls_ca) = values.get(CONFIG_NATS_TLS_CA) {
            config.tls_ca = Some(tls_ca.as_str().into());
        }
        if config.auth_jwt.is_some() && config.auth_seed.is_none() {
            bail!("if you specify jwt, you must also specify a seed");
        }

        Ok(config)
    }
}

/// Adds the given CA cert to the provided [`async_nats::ConnectOptions`].
///
/// This follows the builder pattern in that it returns the same `ConnectOptions` with TLS
/// configured
pub fn add_tls_ca(
    tls_ca: &str,
    opts: async_nats::ConnectOptions,
) -> anyhow::Result<async_nats::ConnectOptions> {
    let ca = rustls_pemfile::read_one(&mut tls_ca.as_bytes()).context("failed to read CA")?;
    let mut roots = async_nats::rustls::RootCertStore::empty();
    if let Some(rustls_pemfile::Item::X509Certificate(ca)) = ca {
        roots.add_parsable_certificates([ca]);
    } else {
        bail!("tls ca: invalid certificate type, must be a DER encoded PEM file")
    };
    let tls_client = async_nats::rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    Ok(opts.tls_client_config(tls_client).require_tls(true))
}