use anyhow::anyhow;
use grpc_build_core::NamedMessage;
use lapin::{
options::{BasicPublishOptions, ConfirmSelectOptions},
publisher_confirm::Confirmation,
BasicProperties,
};
use opentelemetry::{
trace::{FutureExt, TraceContextExt},
KeyValue,
};
use crate::{ext::utc_to_timestamp, protogen::amqpsy::message::AmqpMessageWrapper, telemetry};
use super::{new_ampq_channel, AmqpConfig};
#[derive(Clone)]
pub struct AmqpPublisher<Msg: NamedMessage + prost::Message> {
_msg: std::marker::PhantomData<Msg>,
exchange_name: String,
option: BasicPublishOptions,
props: BasicProperties,
channel: lapin::Channel,
#[cfg(feature = "chaos")]
chaos: crate::chaos::Chaos,
}
#[derive(Clone)]
pub struct AmqpCommandPublisher<Msg: NamedMessage + prost::Message>(AmqpPublisher<Msg>);
#[derive(Clone)]
pub struct AmqpEventPublisher<Msg: NamedMessage + prost::Message>(AmqpPublisher<Msg>);
impl<Msg: NamedMessage + prost::Message> AmqpPublisher<Msg> {
pub async fn new_for_command(
config: &AmqpConfig,
exchange_name: &str,
) -> anyhow::Result<AmqpCommandPublisher<Msg>> {
let channel = new_ampq_channel(config).await?;
channel
.confirm_select(ConfirmSelectOptions::default())
.await?;
let props = BasicProperties::default().with_type(Msg::NAME.into());
Ok(AmqpCommandPublisher(AmqpPublisher {
_msg: Default::default(),
exchange_name: exchange_name.to_string(),
option: BasicPublishOptions {
mandatory: true,
..<_>::default()
},
#[cfg(feature = "chaos")]
chaos: crate::chaos::Chaos::new(),
props,
channel,
}))
}
pub async fn new_for_event(
config: &AmqpConfig,
exchange_name: &str,
) -> anyhow::Result<AmqpEventPublisher<Msg>> {
let channel = new_ampq_channel(config).await?;
channel
.confirm_select(ConfirmSelectOptions::default())
.await?;
let props = BasicProperties::default().with_type(Msg::NAME.into());
Ok(AmqpEventPublisher(AmqpPublisher {
_msg: Default::default(),
exchange_name: exchange_name.to_string(),
option: BasicPublishOptions {
mandatory: false,
..<_>::default()
},
#[cfg(feature = "chaos")]
chaos: crate::chaos::Chaos::new(),
props,
channel,
}))
}
pub async fn new_for_event_without_publisher_confirmation(
config: &AmqpConfig,
exchange_name: &str,
) -> anyhow::Result<AmqpEventPublisher<Msg>> {
let channel = new_ampq_channel(config).await?;
let props = BasicProperties::default().with_type(Msg::NAME.into());
Ok(AmqpEventPublisher(AmqpPublisher {
_msg: Default::default(),
exchange_name: exchange_name.to_string(),
option: BasicPublishOptions {
mandatory: false,
..<_>::default()
},
#[cfg(feature = "chaos")]
chaos: crate::chaos::Chaos::new(),
props,
channel,
}))
}
async fn publish(&self, message: Msg) -> anyhow::Result<Confirmation> {
tracing::debug!("Publishing {} into {}", Msg::NAME, self.exchange_name);
let now = chrono::Utc::now();
let props = self
.props
.clone()
.with_content_type("application/x-protobuf".into())
.with_timestamp(epoch_now());
let message_id = format!("msg_{}", ulid::Ulid::new().to_string().to_lowercase());
let mut wrapped_message = AmqpMessageWrapper {
id: message_id.clone(),
exchange: self.exchange_name.clone(),
routing_key: Msg::NAME.to_string(),
headers: <_>::default(), created_at: Some(utc_to_timestamp(now)),
payload: proto_to_vec(message)?,
};
let otel_context = telemetry::put_trace_context(
&mut wrapped_message,
&format!("publish_{}", Msg::NAME),
vec![
KeyValue::new("amqpsy.amqp.message.id", message_id),
KeyValue::new("amqpsy.amqp.message.routing_key", Msg::NAME.to_string()),
KeyValue::new("amqpsy.amqp.message.exchange", self.exchange_name.clone()),
KeyValue::new("amqpsy.amqp.message.published_at", now.to_rfc3339()),
],
opentelemetry::Context::current(),
);
let wrapped_message_buff = proto_to_vec(wrapped_message)?;
#[cfg(feature = "chaos")]
if self.chaos.should_publish_duplicate() {
otel_context
.span()
.add_event("Chaos: Publishing duplicate message", vec![]);
_ = self
.publish_(props.clone(), otel_context.clone(), &wrapped_message_buff)
.await;
} else if self.chaos.should_publish_fail() {
otel_context
.span()
.add_event("Chaos: Publishing failed", vec![]);
return Err(anyhow::anyhow!("chaos: Publish failed"));
}
let confirm = self
.publish_(props, otel_context, &wrapped_message_buff)
.await?;
Ok(confirm)
}
async fn publish_(
&self,
props: lapin::protocol::basic::AMQPProperties,
otel_context: opentelemetry::Context,
wrapped_message_buff: &[u8],
) -> Result<Confirmation, anyhow::Error> {
Ok(self
.channel
.basic_publish(
&self.exchange_name,
Msg::NAME,
self.option,
wrapped_message_buff,
props,
)
.await
.inspect_err(|e| tracing::warn!("Error publishing: {}", e))?
.with_context(otel_context)
.await?)
}
}
fn proto_to_vec<Msg: NamedMessage + prost::Message>(
message: Msg,
) -> Result<Vec<u8>, anyhow::Error> {
let mut buffer = Vec::new();
message.encode(&mut buffer)?;
Ok(buffer)
}
#[derive(Debug, thiserror::Error)]
pub enum AmqpPublisherError {
#[error("Publisher confirm error: {0}")]
PublisherConfirmError(lapin::Error),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl<Msg: NamedMessage + prost::Message> AmqpCommandPublisher<Msg> {
#[tracing::instrument(name = "Publish Command", skip_all, fields(
amqpsy.amqp.exchange = %self.0.exchange_name,
amqpsy.amqp.routing_key = %Msg::NAME
))]
pub async fn publish(&self, message: Msg) -> Result<(), AmqpPublisherError> {
self.0
.publish(message)
.await
.map(|_| ())
.map_err(|e| AmqpPublisherError::Other(anyhow!("{}", e)))
}
}
impl<Msg: NamedMessage + prost::Message> AmqpEventPublisher<Msg> {
#[tracing::instrument(name = "Publish Event", skip_all, fields(
amqpsy.amqp.exchange = %self.0.exchange_name,
amqpsy.amqp.routing_key = %Msg::NAME
))]
pub async fn publish(&self, message: Msg) -> anyhow::Result<()> {
self.0.publish(message).await.map(|_| ())
}
}
fn epoch_now() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as _
}