amqpsy 0.1.0

Extremely opinionated AMQP PubSub library
Documentation
use anyhow::anyhow;
use grpc_build_core::NamedMessage;
use lapin::{
    options::{BasicPublishOptions, ConfirmSelectOptions},
    publisher_confirm::Confirmation,
    BasicProperties,
};
use opentelemetry::{
    trace::{FutureExt, TraceContextExt},
    KeyValue,
};

use crate::{ext::utc_to_timestamp, protogen::amqpsy::message::AmqpMessageWrapper, telemetry};

use super::{new_ampq_channel, AmqpConfig};

/// AMQP publisher with opinionated defaults, bound to a single message type
/// and a single exchange.
///
/// Instances are obtained through one of the `new_for_*` constructors.
/// The crate documentation has more information.
#[derive(Clone)]
pub struct AmqpPublisher<Msg>
where
    Msg: NamedMessage + prost::Message,
{
    // Ties the publisher to `Msg` without storing a value of that type.
    _msg: std::marker::PhantomData<Msg>,
    // Exchange every message is published to.
    exchange_name: String,
    // Publish options (notably the `mandatory` flag) chosen by the constructor.
    option: BasicPublishOptions,
    // Base AMQP properties; the constructors set the `type` to `Msg::NAME`.
    props: BasicProperties,
    // Channel used for publishing.
    channel: lapin::Channel,
    // Fault-injection hooks, only compiled in with the `chaos` feature.
    #[cfg(feature = "chaos")]
    chaos: crate::chaos::Chaos,
}

/// Publisher handle for commands.
///
/// Created by [`AmqpPublisher::new_for_command`].
#[derive(Clone)]
pub struct AmqpCommandPublisher<Msg>(AmqpPublisher<Msg>)
where
    Msg: NamedMessage + prost::Message;

/// Publisher handle for events.
///
/// Created by [`AmqpPublisher::new_for_event`].
#[derive(Clone)]
pub struct AmqpEventPublisher<Msg>(AmqpPublisher<Msg>)
where
    Msg: NamedMessage + prost::Message;

impl<Msg: NamedMessage + prost::Message> AmqpPublisher<Msg> {
    /// Create opinionated publisher for commands.
    ///
    /// * Publisher confirm is enabled - to ensure message is delivered to the broker.
    /// * Mandatory routing is enabled - to ensure there is a queue to receive the message.
    pub async fn new_for_command(
        config: &AmqpConfig,
        exchange_name: &str,
    ) -> anyhow::Result<AmqpCommandPublisher<Msg>> {
        let channel = new_ampq_channel(config).await?;

        // Publisher confirm
        channel
            .confirm_select(ConfirmSelectOptions::default())
            .await?;

        let props = BasicProperties::default().with_type(Msg::NAME.into());

        Ok(AmqpCommandPublisher(AmqpPublisher {
            _msg: Default::default(),
            exchange_name: exchange_name.to_string(),
            option: BasicPublishOptions {
                mandatory: true,
                ..<_>::default()
            },
            #[cfg(feature = "chaos")]
            chaos: crate::chaos::Chaos::new(),
            props,
            channel,
        }))
    }

    /// Create opinionated publisher for events.
    ///
    /// * Publisher confirm is enabled - to ensure message is delivered to the broker.
    /// * Mandatory routing is **disabled**.
    pub async fn new_for_event(
        config: &AmqpConfig,
        exchange_name: &str,
    ) -> anyhow::Result<AmqpEventPublisher<Msg>> {
        let channel = new_ampq_channel(config).await?;

        // Publisher confirm
        channel
            .confirm_select(ConfirmSelectOptions::default())
            .await?;

        let props = BasicProperties::default().with_type(Msg::NAME.into());

        Ok(AmqpEventPublisher(AmqpPublisher {
            _msg: Default::default(),
            exchange_name: exchange_name.to_string(),
            option: BasicPublishOptions {
                mandatory: false,
                ..<_>::default()
            },
            #[cfg(feature = "chaos")]
            chaos: crate::chaos::Chaos::new(),
            props,
            channel,
        }))
    }

    /// Create opinionated publisher for events when publisher confirmation is not needed.
    ///
    /// * Publisher confirm is *disabled*.
    /// * Mandatory routing is **disabled**.
    pub async fn new_for_event_without_publisher_confirmation(
        config: &AmqpConfig,
        exchange_name: &str,
    ) -> anyhow::Result<AmqpEventPublisher<Msg>> {
        let channel = new_ampq_channel(config).await?;

        // No Publisher confirm

        let props = BasicProperties::default().with_type(Msg::NAME.into());

        Ok(AmqpEventPublisher(AmqpPublisher {
            _msg: Default::default(),
            exchange_name: exchange_name.to_string(),
            option: BasicPublishOptions {
                mandatory: false,
                ..<_>::default()
            },
            #[cfg(feature = "chaos")]
            chaos: crate::chaos::Chaos::new(),
            props,
            channel,
        }))
    }

    async fn publish(&self, message: Msg) -> anyhow::Result<Confirmation> {
        tracing::debug!("Publishing {} into {}", Msg::NAME, self.exchange_name);

        let now = chrono::Utc::now();
        let props = self
            .props
            .clone()
            .with_content_type("application/x-protobuf".into())
            .with_timestamp(epoch_now());

        // FIXME: lapin can't include arbitrary AMQP header,
        // So, send header in the body of the message

        let message_id = format!("msg_{}", ulid::Ulid::new().to_string().to_lowercase());
        let mut wrapped_message = AmqpMessageWrapper {
            id: message_id.clone(),
            exchange: self.exchange_name.clone(),
            routing_key: Msg::NAME.to_string(),
            headers: <_>::default(), // to be populated by `put_trace_context` below
            created_at: Some(utc_to_timestamp(now)),
            payload: proto_to_vec(message)?,
        };

        let otel_context = telemetry::put_trace_context(
            &mut wrapped_message,
            &format!("publish_{}", Msg::NAME),
            vec![
                KeyValue::new("amqpsy.amqp.message.id", message_id),
                KeyValue::new("amqpsy.amqp.message.routing_key", Msg::NAME.to_string()),
                KeyValue::new("amqpsy.amqp.message.exchange", self.exchange_name.clone()),
                KeyValue::new("amqpsy.amqp.message.published_at", now.to_rfc3339()),
            ],
            opentelemetry::Context::current(),
        );
        let wrapped_message_buff = proto_to_vec(wrapped_message)?;

        #[cfg(feature = "chaos")]
        if self.chaos.should_publish_duplicate() {
            otel_context
                .span()
                .add_event("Chaos: Publishing duplicate message", vec![]);

            _ = self
                .publish_(props.clone(), otel_context.clone(), &wrapped_message_buff)
                .await;
        } else if self.chaos.should_publish_fail() {
            otel_context
                .span()
                .add_event("Chaos: Publishing failed", vec![]);
            return Err(anyhow::anyhow!("chaos: Publish failed"));
        }

        let confirm = self
            .publish_(props, otel_context, &wrapped_message_buff)
            .await?;

        Ok(confirm)
    }

    async fn publish_(
        &self,
        props: lapin::protocol::basic::AMQPProperties,
        otel_context: opentelemetry::Context,
        wrapped_message_buff: &[u8],
    ) -> Result<Confirmation, anyhow::Error> {
        Ok(self
            .channel
            .basic_publish(
                &self.exchange_name,
                Msg::NAME,
                self.option,
                wrapped_message_buff,
                props,
            )
            .await
            .inspect_err(|e| tracing::warn!("Error publishing: {}", e))?
            .with_context(otel_context)
            .await?)
    }
}

fn proto_to_vec<Msg: NamedMessage + prost::Message>(
    message: Msg,
) -> Result<Vec<u8>, anyhow::Error> {
    let mut buffer = Vec::new();
    message.encode(&mut buffer)?;
    Ok(buffer)
}

/// Error type returned by [`AmqpCommandPublisher::publish`].
#[derive(Debug, thiserror::Error)]
pub enum AmqpPublisherError {
    /// A `lapin` error attributed to the publisher-confirm step.
    // NOTE(review): nothing in this file constructs this variant - confirm
    // whether it is produced elsewhere or is still to be wired up.
    #[error("Publisher confirm error: {0}")]
    PublisherConfirmError(lapin::Error),
    /// Any other failure; displayed exactly as the wrapped `anyhow::Error`.
    /// `#[from]` lets `?` convert an `anyhow::Error` into this variant.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl<Msg: NamedMessage + prost::Message> AmqpCommandPublisher<Msg> {
    #[tracing::instrument(name = "Publish Command", skip_all, fields(
        amqpsy.amqp.exchange = %self.0.exchange_name,
        amqpsy.amqp.routing_key = %Msg::NAME
    ))]
    pub async fn publish(&self, message: Msg) -> Result<(), AmqpPublisherError> {
        self.0
            .publish(message)
            .await
            .map(|_| ())
            .map_err(|e| AmqpPublisherError::Other(anyhow!("{}", e)))
    }
}

impl<Msg: NamedMessage + prost::Message> AmqpEventPublisher<Msg> {
    #[tracing::instrument(name = "Publish Event", skip_all, fields(
        amqpsy.amqp.exchange = %self.0.exchange_name,
        amqpsy.amqp.routing_key = %Msg::NAME
    ))]
    pub async fn publish(&self, message: Msg) -> anyhow::Result<()> {
        self.0.publish(message).await.map(|_| ())
    }
}

/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of panicking, so a misconfigured clock cannot abort a publish.
fn epoch_now() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs())
}