restrepo 0.5.12

A collection of components for building RESTful web services with actix-web
Documentation
use std::{fmt::Debug, sync::Arc};

use async_nats::{
    Client,
    jetstream::{self, Context},
};
use async_trait::async_trait;
use tracing::instrument;

use topmesys::{EventConsumer, EventMessage};

/// Gateway between the application event bus and a NATS JetStream stream.
///
/// Implements the [EventConsumer] trait: [EventMessage]s passed in by the
/// [EventBroker](::topmesys::EventBroker) are published to the NATS [stream](jetstream)
/// through a JetStream [Context] built from the [Client] given at construction time.
///
/// NOTE(review): the original documentation also states that the gateway listens to the
/// NATS [stream](jetstream) and submits received messages as [EventMessage] to the
/// application event bus; that inbound path is not visible in this part of the file — confirm.
#[derive(Debug)]
pub struct NatsGateway {
    /// Stream name; used as the prefix of every subject a message is published to.
    stream: String,
    /// JetStream context used to publish messages.
    ctx: Context,
    /// Topic reported to the event broker via `EventConsumer::consumes`.
    broker_topic: String,
}

impl NatsGateway {
    /// Creates a gateway that publishes through a JetStream [Context] built on `client`.
    ///
    /// * `client` - NATS client; it is consumed and moved into the JetStream context.
    /// * `stream` - name used as the prefix of every subject published to.
    /// * `broker_topic` - topic this gateway reports via `EventConsumer::consumes`.
    ///
    /// All arguments are recorded on the tracing span created by `#[instrument]`.
    #[instrument]
    pub fn new<S: Into<String> + Debug>(client: Client, stream: S, broker_topic: S) -> Self {
        Self {
            stream: stream.into(),
            // `client` is owned by this function, so it is moved rather than cloned.
            ctx: jetstream::new(client),
            broker_topic: broker_topic.into(),
        }
    }

    /// Publishes a received [EventMessage] to [jetstream].
    ///
    /// The subject is `<stream>.<topic text>`, where `<stream>` is the name given to
    /// [`NatsGateway::new`] and `<topic text>` comes from `msg.topic().text()`. The payload
    /// is a clone of `msg.content()`.
    ///
    /// # Errors
    ///
    /// Returns an error if either of the two awaited steps fails: handing the message to
    /// the JetStream context, or resolving the value that step returns (per the async-nats
    /// documentation this is the server's publish acknowledgement).
    #[instrument(skip(self), level = "debug")]
    pub async fn publish_to_stream(&self, msg: &EventMessage) -> anyhow::Result<()> {
        let subject = format!("{}.{}", self.stream, msg.topic().text());
        let payload = msg.content().clone();

        // The first await yields a second future; awaiting that one as well ensures its
        // outcome is not silently dropped.
        let pending_ack = self.ctx.publish(subject, payload).await?;
        pending_ack.await?;
        Ok(())
    }

    /// Returns the stream name this gateway publishes to.
    pub fn stream(&self) -> &str {
        &self.stream
    }

    /// Returns the JetStream context used by this gateway.
    pub fn ctx(&self) -> &Context {
        &self.ctx
    }
}

#[async_trait]
impl EventConsumer for NatsGateway {
    /// Returns the broker topic this gateway was configured with.
    fn consumes(&self) -> &String {
        let topic = &self.broker_topic;
        topic
    }

    /// Forwards the event to [`NatsGateway::publish_to_stream`] and returns its result.
    async fn handle_event(&self, event: Arc<EventMessage>) -> anyhow::Result<()> {
        // Borrow the message out of the `Arc`; no clone of the message is made here.
        let message: &EventMessage = &event;
        self.publish_to_stream(message).await
    }
}