restrepo 0.5.10

A collection of components for building restful webservices with actix-web
Documentation
use std::{fmt::Debug, sync::Arc};

use async_nats::{
    Client, ServerAddr,
    jetstream::{self, Context},
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tracing::instrument;

use topmesys::{EventConsumer, EventMessage};

/// Holds NATS client configuration. Url string can hold multiple server urls, separated by comma.
/// #### Example:
/// ```
/// # use serde_json::{from_value, json};
/// use restrepo::messaging::nats::NatsConfig;
///
/// let nats_cfg: NatsConfig = from_value(json!({
///     "url": "nats://nats-1:4222,nats://nats-2:4222",
///     "stream": "my-stream",
///     "consumer": "my-consumer"
/// })).unwrap();
/// # assert_eq!(nats_cfg.stream(), "my-stream");
/// ```
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct NatsConfig {
    url: String,
    stream: String,
    consumer: String,
}

impl NatsConfig {
    pub fn new(url: String, stream: String, consumer: String) -> Self {
        Self {
            url,
            stream,
            consumer,
        }
    }

    /// Parse NATS server url string and create a connected client.
    pub async fn create_client(&self) -> anyhow::Result<async_nats::Client> {
        let addrs: Vec<ServerAddr> = self
            .url
            .split(",")
            .filter_map(|url| url.parse::<ServerAddr>().ok())
            .collect();
        Ok(async_nats::connect(addrs).await?)
    }

    /// Get configured stream name.
    pub fn stream(&self) -> &str {
        &self.stream
    }

    /// Create stream pull consumer from configured stream and consumer names.
    /// Consumer and stream must already exist on NATS server.
    pub async fn generate_consumer(
        &self,
        client: async_nats::Client,
    ) -> anyhow::Result<async_nats::jetstream::consumer::PullConsumer> {
        Ok(async_nats::jetstream::new(client)
            .get_consumer_from_stream(&self.consumer, &self.stream)
            .await?)
    }

    /// Get configured consumer name.
    pub fn consumer(&self) -> &str {
        &self.consumer
    }
}

/// Implements [EventConsumer] trait and handles [EventMessage]s passed in by the [EventBroker](::topmesys::EventBroker).
/// and publishes them to the NATS [stream](jetstream) configured in [Client]. Listens to the NATS [stream](jetstream) and submits
/// received messages as [EventMessage] to the application event bus.
#[derive(Debug)]
pub struct NatsGateway {
    stream: String,
    ctx: Context,
    broker_topic: String,
}

impl NatsGateway {
    #[instrument]
    pub fn new<S: Into<String> + Debug>(client: Client, stream: S, broker_topic: S) -> Self {
        Self {
            stream: stream.into(),
            ctx: jetstream::new(client.clone()),
            broker_topic: broker_topic.into(),
        }
    }

    /// Publish received [EventMessage]s to [jetstream]
    #[instrument(skip(self), level = "debug")]
    pub async fn publish_to_stream(&self, msg: &EventMessage) -> anyhow::Result<()> {
        self.ctx
            .publish(
                format!("{}.{}", self.stream, msg.topic().text()),
                msg.content().clone(),
            )
            .await?
            .await?;
        Ok(())
    }

    pub fn stream(&self) -> &str {
        &self.stream
    }

    pub fn ctx(&self) -> &Context {
        &self.ctx
    }
}

#[async_trait]
impl EventConsumer for NatsGateway {
    fn consumes(&self) -> &String {
        &self.broker_topic
    }

    async fn handle_event(&self, event: Arc<EventMessage>) -> anyhow::Result<()> {
        self.publish_to_stream(event.as_ref()).await
    }
}