use std::fmt::{Debug, Formatter};
use std::sync::OnceLock;
use async_nats::jetstream::AckKind;
use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
use crate::convert::headers_from_nats;
pub enum NatsMessage {
Core(Box<CoreMessage>),
JetStream(Box<JetStreamMessage>),
}
impl Debug for NatsMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
Self::JetStream(_) => f
.debug_struct("NatsMessage::JetStream")
.finish_non_exhaustive(),
}
}
}
pub struct CoreMessage {
inner: async_nats::Message,
headers: Headers,
}
impl Debug for CoreMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CoreMessage")
.field("subject", &self.inner.subject.as_str())
.field("payload_len", &self.inner.payload.len())
.finish_non_exhaustive()
}
}
impl CoreMessage {
pub(crate) fn new(inner: async_nats::Message) -> Self {
let mut headers = headers_from_nats(inner.headers.as_ref());
if let Some(reply) = inner.reply.as_ref() {
headers.insert("reply-to", reply.as_str().to_owned());
}
Self { inner, headers }
}
}
pub struct JetStreamMessage {
inner: async_nats::jetstream::Message,
headers: Headers,
}
impl Debug for JetStreamMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JetStreamMessage")
.field("subject", &self.inner.message.subject.as_str())
.field("payload_len", &self.inner.message.payload.len())
.finish_non_exhaustive()
}
}
impl JetStreamMessage {
pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
let headers = headers_from_nats(inner.message.headers.as_ref());
Self { inner, headers }
}
}
fn empty_headers() -> &'static Headers {
static EMPTY: OnceLock<Headers> = OnceLock::new();
EMPTY.get_or_init(Headers::new)
}
impl IncomingMessage for NatsMessage {
fn payload(&self) -> &[u8] {
match self {
Self::Core(m) => &m.inner.payload,
Self::JetStream(m) => &m.inner.message.payload,
}
}
fn headers(&self) -> &Headers {
match self {
Self::Core(m) => &m.headers,
Self::JetStream(m) => &m.headers,
}
}
async fn ack(self) -> Result<(), AckError> {
match self {
Self::Core(_) => Err(AckError::Unsupported),
Self::JetStream(m) => m
.inner
.ack()
.await
.map_err(|err| AckError::Broker(format_err(err))),
}
}
async fn nack(self, requeue: bool) -> Result<(), AckError> {
match self {
Self::Core(_) => Err(AckError::Unsupported),
Self::JetStream(m) => {
let kind = if requeue {
AckKind::Nak(None)
} else {
AckKind::Term
};
m.inner
.ack_with(kind)
.await
.map_err(|err| AckError::Broker(format_err(err)))
}
}
}
}
pub const PARTITION_KEY_HEADER: &str = "nats-partition-key";
impl Partitioned for NatsMessage {
fn partition_key(&self) -> Option<&[u8]> {
self.headers().get(PARTITION_KEY_HEADER)
}
}
fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
E: std::fmt::Display + Send + Sync + 'static,
{
let msg = err.to_string();
Box::<dyn std::error::Error + Send + Sync>::from(msg)
}
#[allow(dead_code)]
fn _empty_headers_keepalive() -> &'static Headers {
empty_headers()
}
#[cfg(test)]
mod tests {
use super::*;
fn core_message(reply: Option<&str>) -> NatsMessage {
NatsMessage::Core(Box::new(CoreMessage::new(async_nats::Message {
subject: "subj".into(),
reply: reply.map(Into::into),
payload: bytes::Bytes::from_static(b"x"),
headers: None,
status: None,
description: None,
length: 1,
})))
}
#[test]
fn core_reply_inbox_surfaces_as_reply_to_header() {
let msg = core_message(Some("_INBOX.42"));
assert_eq!(msg.headers().reply_to(), Some("_INBOX.42"));
}
#[test]
fn core_message_without_reply_has_no_reply_to() {
assert_eq!(core_message(None).headers().reply_to(), None);
}
}