use crate::canonical_message::tracing_support::LazyMessageIds;
use crate::models::NatsConfig;
use crate::traits::{
BatchCommitFunc, BoxFuture, ConsumerError, EndpointStatus, MessageConsumer, MessageDisposition,
MessagePublisher, PublisherError, ReceivedBatch, Sent, SentBatch,
};
use crate::CanonicalMessage;
use crate::APP_NAME;
use anyhow::{anyhow, Context};
use async_nats::connection::State;
use async_nats::jetstream::consumer::pull;
use async_nats::{header::HeaderMap, jetstream, jetstream::stream, ConnectOptions};
use async_trait::async_trait;
use futures::{FutureExt, StreamExt, TryStreamExt};
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, UnixTime};
use rustls::{ClientConfig, DigitallySignedStruct, Error as RustlsError, SignatureScheme};
use std::io::BufReader;
use std::sync::Arc;
use tracing::{info, trace, warn};
use uuid::Uuid;
/// Publishing backend chosen by `NatsPublisher::new` from the `no_jetstream` config flag.
enum NatsClient {
/// Plain NATS client, used when `no_jetstream` is set.
Core(async_nats::Client),
/// JetStream context, used when `no_jetstream` is not set.
JetStream(jetstream::Context),
}
/// Publisher that sends canonical messages to a single NATS subject.
pub struct NatsPublisher {
// Backend used by `send` for regular (non request/reply) publishes.
client: NatsClient,
// Clone of the underlying connection; used for request/reply, `flush` and `status`.
core_client: async_nats::Client,
// Subject every message is published to; also reported as the status target.
subject: String,
// When true, `send` does not wait for the JetStream ack after publishing.
delayed_ack: bool,
// When true, `send` issues a request and returns the response instead of publishing.
request_reply: bool,
// Upper bound on waiting for a request/reply response (`new` defaults it to 30 000 ms).
request_timeout: std::time::Duration,
}
impl NatsPublisher {
    /// Connects to NATS and builds a publisher for the configured subject.
    ///
    /// In JetStream mode (the default) the configured stream is fetched or created
    /// before the publisher is returned. In Core mode no stream is touched and the
    /// `delayed_ack` flag has no effect.
    ///
    /// # Errors
    /// Fails when the subject is missing, when JetStream is enabled without a stream
    /// name, or when building the options, connecting, or ensuring the stream fails.
    pub async fn new(config: &NatsConfig) -> anyhow::Result<Self> {
        let subject = match config.subject.as_deref() {
            Some(subject) => subject,
            None => return Err(anyhow!("Subject is required for NATS publisher")),
        };

        let use_jetstream = !config.no_jetstream;
        // The stream name is mandatory only for JetStream; Core mode falls back to "".
        let stream_name = match config.stream.as_deref() {
            Some(name) => name,
            None if use_jetstream => {
                return Err(anyhow!("stream must be provided when JetStream is enabled"));
            }
            None => "",
        };

        let connect_options = build_nats_options(config).await?;
        let nats_client = connect_options.connect(&config.url).await?;
        // Kept separately so request/reply, flush and status work in both modes.
        let core_client = nats_client.clone();

        let client = if use_jetstream {
            let context = jetstream::new(nats_client);
            info!(stream = %stream_name, "Ensuring NATS JetStream stream exists");

            // A wildcard subject is bound as-is; otherwise the stream captures
            // everything below `<stream_name>.`.
            let has_wildcard = subject.contains('>') || subject.contains('*');
            let bound_subject = if has_wildcard {
                subject.to_string()
            } else {
                format!("{}.>", stream_name)
            };

            let stream_config = stream::Config {
                name: stream_name.to_string(),
                subjects: vec![bound_subject],
                max_messages: config.stream_max_messages.unwrap_or(1_000_000),
                max_bytes: config.stream_max_bytes.unwrap_or(1024 * 1024 * 1024),
                ..Default::default()
            };
            context.get_or_create_stream(stream_config).await?;
            NatsClient::JetStream(context)
        } else {
            info!("NATS publisher is in Core mode (non-persistent).");
            if config.delayed_ack {
                tracing::debug!("'delayed_ack' is true but NATS is in Core mode, which always performs fire and forget. The flag will be ignored.");
            }
            NatsClient::Core(nats_client)
        };

        let request_timeout =
            std::time::Duration::from_millis(config.request_timeout_ms.unwrap_or(30_000));

        Ok(Self {
            client,
            core_client,
            subject: subject.to_string(),
            delayed_ack: config.delayed_ack,
            request_reply: config.request_reply,
            request_timeout,
        })
    }
}
#[async_trait]
impl MessagePublisher for NatsPublisher {
    /// Publishes one message to the configured subject.
    ///
    /// Every metadata entry is copied into a NATS header, and the message id is added
    /// as the `mq_bridge.message_id` header (32 lowercase hex digits).
    ///
    /// * Request/reply mode: the message is sent as a request on the core client and
    ///   the response is returned as `Sent::Response`.
    /// * JetStream mode: the publish is awaited and, unless `delayed_ack` is set, the
    ///   server ack is awaited for up to five seconds.
    /// * Core mode: the message is published without waiting for any acknowledgement.
    ///
    /// # Errors
    /// Request timeouts/failures and ack timeouts/failures are returned as
    /// `PublisherError::Retryable`. Publish failures are propagated with `?`.
    async fn send(&self, message: CanonicalMessage) -> Result<Sent, PublisherError> {
        // How long to wait for the JetStream server ack when `delayed_ack` is off.
        const JETSTREAM_ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

        // Formatted once and shared by the trace event and the header.
        let message_id_hex = format!("{:032x}", message.message_id);
        trace!(
            subject = %self.subject,
            message_id = %message_id_hex,
            payload_size = message.payload.len(),
            "Publishing NATS message"
        );

        let mut headers = HeaderMap::new();
        for (key, value) in &message.metadata {
            headers.insert(key.as_str(), value.as_str());
        }
        // Inserted after the metadata so this value is the one that ends up in the map.
        headers.insert("mq_bridge.message_id", message_id_hex.as_str());

        if self.request_reply {
            let response = tokio::time::timeout(
                self.request_timeout,
                self.core_client.request_with_headers(
                    self.subject.clone(),
                    headers,
                    message.payload,
                ),
            )
            .await
            .map_err(|_| PublisherError::Retryable(anyhow!("NATS request timed out")))?
            .map_err(|e| PublisherError::Retryable(anyhow!("NATS request failed: {}", e)))?;
            let response_msg = create_nats_canonical_message(&response, None, false);
            return Ok(Sent::Response(response_msg));
        }

        match &self.client {
            NatsClient::JetStream(jetstream) => {
                trace!("Publishing to NATS JetStream subject: {}", self.subject);
                let ack_future = jetstream
                    .publish_with_headers(self.subject.clone(), headers, message.payload)
                    .await
                    .context("Failed to publish to NATS JetStream")?;
                trace!("Published to NATS JetStream, waiting for ack");
                // With `delayed_ack` the ack future is dropped without being awaited.
                if !self.delayed_ack {
                    match tokio::time::timeout(JETSTREAM_ACK_TIMEOUT, ack_future).await {
                        Ok(Ok(_)) => trace!("Ack received"),
                        Ok(Err(e)) => {
                            return Err(PublisherError::Retryable(anyhow!(
                                "NATS Ack failed: {}",
                                e
                            )))
                        }
                        Err(_) => {
                            return Err(PublisherError::Retryable(anyhow!("NATS Ack timed out")))
                        }
                    }
                }
            }
            NatsClient::Core(client) => {
                client
                    .publish_with_headers(self.subject.clone(), headers, message.payload)
                    .await
                    .context("Failed to publish to NATS Core")?;
            }
        }
        Ok(Sent::Ack)
    }

    /// Publishes a batch by sending each message through [`Self::send`].
    ///
    /// Request/reply, JetStream and Core modes all delegate to
    /// `crate::traits::send_batch_helper` in the same way, so no mode dispatch is needed.
    async fn send_batch(
        &self,
        messages: Vec<CanonicalMessage>,
    ) -> Result<SentBatch, PublisherError> {
        trace!(
            subject = %self.subject,
            count = messages.len(),
            message_ids = ?LazyMessageIds(&messages),
            "Publishing batch of NATS messages"
        );
        crate::traits::send_batch_helper(self, messages, |p, m| Box::pin(p.send(m))).await
    }

    /// Exposes the publisher as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Flushes the underlying NATS connection.
    ///
    /// # Errors
    /// Returns an error when the client reports a flush failure.
    async fn flush(&self) -> anyhow::Result<()> {
        self.core_client
            .flush()
            .await
            .map_err(|e| anyhow!("NATS flush failed: {}", e))
    }

    /// Reports connection health for this publisher.
    ///
    /// The connection state is read once so `healthy` and `error` always agree.
    async fn status(&self) -> EndpointStatus {
        let connected = self.core_client.connection_state() == State::Connected;
        EndpointStatus {
            healthy: connected,
            target: self.subject.clone(),
            pending: None,
            capacity: None,
            error: if connected {
                None
            } else {
                Some("Disconnected".to_string())
            },
            ..Default::default()
        }
    }
}
/// Receiving backend of a `NatsConsumer`, built by `NatsCore::connect`.
enum NatsCore {
/// Core NATS subscription (used when `no_jetstream` is set).
Ephemeral(async_nats::Subscriber),
/// JetStream pull consumer together with its message stream.
JetStream {
// Consumer handle; `NatsConsumer::status` queries it for the pending count.
consumer: Box<jetstream::consumer::Consumer<pull::Config>>,
// Message stream that `receive_batch` reads from.
stream: Box<jetstream::consumer::pull::Stream>,
},
}
/// Consumer that receives batches of messages from a NATS subject.
pub struct NatsConsumer {
// Subscription state (Core subscriber or JetStream consumer + stream).
core: NatsCore,
// Connection handle; used for status checks and handed to `receive_batch` for replies.
client: async_nats::Client,
// Subject this consumer was created for; also reported as the status target.
subject: String,
}
use std::any::Any;
impl NatsConsumer {
pub async fn new(config: &NatsConfig) -> anyhow::Result<Self> {
let subject = config
.subject
.as_deref()
.ok_or_else(|| anyhow!("Subject is required for NATS consumer"))?;
let stream_name = config
.stream
.as_deref()
.ok_or_else(|| anyhow!("Stream name is required for NATS consumer"))?;
let deliver_policy = match config.deliver_policy {
Some(crate::models::NatsDeliverPolicy::All) | None => {
jetstream::consumer::DeliverPolicy::All
}
Some(crate::models::NatsDeliverPolicy::Last) => {
jetstream::consumer::DeliverPolicy::Last
}
Some(crate::models::NatsDeliverPolicy::New) => jetstream::consumer::DeliverPolicy::New,
Some(crate::models::NatsDeliverPolicy::LastPerSubject) => {
jetstream::consumer::DeliverPolicy::LastPerSubject
}
};
let (durable_name, queue_group) = if config.subscriber_mode {
(None, None)
} else {
let durable = format!("{}-{}-{}", APP_NAME, stream_name, subject.replace('.', "-"));
let queue = format!("{}-{}", APP_NAME, stream_name.replace('.', "-"));
(Some(durable), Some(queue))
};
let (core, client) = NatsCore::connect(
config,
stream_name,
subject,
durable_name,
deliver_policy,
queue_group,
)
.await?;
Ok(Self {
core,
client,
subject: subject.to_string(),
})
}
}
#[async_trait]
impl MessageConsumer for NatsConsumer {
    /// Receives up to `max_messages` messages by delegating to the subscription backend.
    async fn receive_batch(&mut self, max_messages: usize) -> Result<ReceivedBatch, ConsumerError> {
        let batch = self
            .core
            .receive_batch(max_messages, &self.subject, &self.client)
            .await;
        batch
    }

    /// Reports connection health and, for JetStream, the number of pending messages.
    ///
    /// The endpoint is unhealthy when the client is not connected or when the JetStream
    /// consumer info cannot be fetched. In both cases `error` carries the reason.
    async fn status(&self) -> EndpointStatus {
        let connected = self.client.connection_state() == State::Connected;

        let (healthy, pending, error) = if !connected {
            let reason = format!("Disconnected: {:?}", self.client.connection_state());
            (false, None, Some(reason))
        } else {
            match &self.core {
                // Core subscriptions report no pending count.
                NatsCore::Ephemeral(_) => (true, None, None),
                NatsCore::JetStream { consumer, .. } => match consumer.get_info().await {
                    Ok(consumer_info) => {
                        // Saturate if the count does not fit into `usize`.
                        let count = consumer_info
                            .num_pending
                            .try_into()
                            .unwrap_or(usize::MAX);
                        (true, Some(count), None)
                    }
                    Err(e) => (
                        false,
                        None,
                        Some(format!("Failed to get consumer info: {}", e)),
                    ),
                },
            }
        };

        EndpointStatus {
            healthy,
            target: self.subject.clone(),
            pending,
            error,
            ..Default::default()
        }
    }

    /// Exposes the consumer as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Builds the NATS connect options (authentication and TLS) from `config`.
///
/// Authentication uses the token when one is set, otherwise username and password when
/// both are set, otherwise none. When `config.tls.required` is false the options are
/// returned without any TLS configuration.
///
/// With TLS enabled:
/// * certificates from `tls.ca_file` (if set) become the root store;
/// * when mTLS is configured, the client certificate chain and the first PKCS#8 key
///   from `tls.key_file` are loaded, and the protocol is restricted to TLS 1.3;
/// * when `tls.accept_invalid_certs` is set, server certificate verification is
///   replaced by a verifier that accepts everything.
///
/// All PEM files are read with `tokio::fs` so the async executor is not blocked.
///
/// # Errors
/// Fails when a PEM file cannot be read or parsed, when mTLS is configured without a
/// certificate path or without a usable PKCS#8 key, or when `rustls` rejects the setup.
async fn build_nats_options(config: &NatsConfig) -> anyhow::Result<ConnectOptions> {
    let options = if let Some(token) = &config.token {
        ConnectOptions::with_token(token.clone())
    } else if let (Some(user), Some(pass)) = (&config.username, &config.password) {
        ConnectOptions::with_user_and_password(user.clone(), pass.clone())
    } else {
        ConnectOptions::new()
    };
    if !config.tls.required {
        return Ok(options);
    }

    // NOTE(review): the store stays empty when no `ca_file` is configured; confirm that
    // is intended for deployments relying on publicly trusted certificates.
    let mut root_store = rustls::RootCertStore::empty();
    if let Some(ca_file) = &config.tls.ca_file {
        let ca_pem = tokio::fs::read(ca_file)
            .await
            .with_context(|| format!("Failed to read NATS CA file {:?}", ca_file))?;
        for cert in rustls_pemfile::certs(&mut BufReader::new(ca_pem.as_slice())) {
            root_store.add(cert?)?;
        }
    }

    let mut tls_config = if config.tls.is_mtls_client_configured() {
        // Return an error instead of panicking if the path is unexpectedly absent.
        let cert_file = config.tls.cert_file.as_ref().ok_or_else(|| {
            anyhow!("Client certificate file is required for mTLS but not configured")
        })?;
        let cert_pem = tokio::fs::read(cert_file).await.with_context(|| {
            format!("Failed to read NATS client certificate file {:?}", cert_file)
        })?;
        let client_auth_certs = rustls_pemfile::certs(&mut BufReader::new(cert_pem.as_slice()))
            .collect::<Result<Vec<_>, _>>()?;

        // Only PKCS#8 keys are recognised; the first one in the file is used.
        let client_auth_key = match config.tls.key_file.as_ref() {
            Some(key_file) => {
                let key_pem = tokio::fs::read(key_file)
                    .await
                    .with_context(|| format!("Failed to read NATS client key file {:?}", key_file))?;
                let keys: Vec<_> = rustls_pemfile::pkcs8_private_keys(&mut key_pem.as_slice())
                    .collect::<Result<_, _>>()?;
                keys.into_iter().next().map(PrivateKeyDer::Pkcs8)
            }
            None => None,
        };

        ClientConfig::builder_with_provider(crate::endpoints::get_crypto_provider()?)
            .with_protocol_versions(&[&rustls::version::TLS13])?
            .with_root_certificates(root_store)
            .with_client_auth_cert(
                client_auth_certs,
                client_auth_key
                    .ok_or_else(|| anyhow!("Client key is required but not found or invalid"))?,
            )?
    } else {
        ClientConfig::builder_with_provider(crate::endpoints::get_crypto_provider()?)
            .with_safe_default_protocol_versions()?
            .with_root_certificates(root_store)
            .with_no_client_auth()
    };

    if config.tls.accept_invalid_certs {
        /// Verifier that accepts every server certificate and handshake signature.
        ///
        /// SECURITY: this disables server authentication entirely. It is installed
        /// only when `accept_invalid_certs` is explicitly enabled.
        #[derive(Debug)]
        struct NoopServerCertVerifier {
            supported_schemes: Vec<SignatureScheme>,
        }
        impl ServerCertVerifier for NoopServerCertVerifier {
            fn verify_server_cert(
                &self,
                _end_entity: &CertificateDer<'_>,
                _intermediates: &[CertificateDer<'_>],
                _server_name: &rustls::pki_types::ServerName,
                _ocsp_response: &[u8],
                _now: UnixTime,
            ) -> Result<ServerCertVerified, RustlsError> {
                Ok(ServerCertVerified::assertion())
            }
            fn verify_tls12_signature(
                &self,
                _message: &[u8],
                _cert: &CertificateDer<'_>,
                _dss: &DigitallySignedStruct,
            ) -> Result<HandshakeSignatureValid, RustlsError> {
                Ok(HandshakeSignatureValid::assertion())
            }
            fn verify_tls13_signature(
                &self,
                _message: &[u8],
                _cert: &CertificateDer<'_>,
                _dss: &DigitallySignedStruct,
            ) -> Result<HandshakeSignatureValid, RustlsError> {
                Ok(HandshakeSignatureValid::assertion())
            }
            fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
                self.supported_schemes.clone()
            }
        }

        let supported_schemes = crate::endpoints::get_crypto_provider()?
            .signature_verification_algorithms
            .supported_schemes();
        tls_config
            .dangerous()
            .set_certificate_verifier(Arc::new(NoopServerCertVerifier { supported_schemes }));
    }

    // NOTE(review): `require_tls` is never called on the options here; confirm whether
    // TLS is still enforced when the server does not advertise it.
    Ok(options.tls_client_config(tls_config))
}
impl NatsCore {
async fn connect(
config: &NatsConfig,
stream_name: &str,
subject: &str,
durable_name: Option<String>,
deliver_policy: jetstream::consumer::DeliverPolicy,
queue_group: Option<String>,
) -> anyhow::Result<(Self, async_nats::Client)> {
let options = build_nats_options(config).await?;
let client = options.connect(&config.url).await?;
let client_clone = client.clone();
if !config.no_jetstream {
let jetstream = jetstream::new(client);
info!(stream = %stream_name, subject = %subject, "NATS endpoint is in JetStream mode.");
jetstream
.get_or_create_stream(stream::Config {
name: stream_name.to_string(),
subjects: vec![subject.to_string()],
max_messages: config.stream_max_messages.unwrap_or(1_000_000),
max_bytes: config.stream_max_bytes.unwrap_or(1024 * 1024 * 1024), ..Default::default()
})
.await?;
let stream = jetstream.get_stream(stream_name).await?;
let max_ack_pending = config.prefetch_count.unwrap_or(10000) as i64;
let consumer = stream
.create_consumer(jetstream::consumer::pull::Config {
durable_name,
filter_subject: subject.to_string(),
deliver_policy,
max_ack_pending,
..Default::default()
})
.await?;
let stream = consumer.messages().await?;
info!(stream = %stream_name, subject = %subject, "NATS JetStream subscribed");
Ok((
NatsCore::JetStream {
consumer: Box::new(consumer),
stream: Box::new(stream),
},
client_clone,
))
} else {
info!(subject = %subject, "NATS endpoint is in Core mode.");
let sub = if let Some(qg) = queue_group {
info!(queue_group = %qg, "Using queue subscription");
client.queue_subscribe(subject.to_string(), qg).await?
} else {
client.subscribe(subject.to_string()).await?
};
client.flush().await?;
info!(subject = %subject, "NATS Core subscribed");
Ok((NatsCore::Ephemeral(sub), client_clone))
}
}
async fn receive_batch(
&mut self,
max_messages: usize,
subject: &str,
client: &async_nats::Client,
) -> Result<ReceivedBatch, ConsumerError> {
if max_messages == 0 {
return Ok(ReceivedBatch {
messages: Vec::new(),
commit: Box::new(|_| Box::pin(async { Ok(()) })),
});
}
match self {
NatsCore::JetStream { stream, .. } => {
let mut canonical_messages = Vec::with_capacity(max_messages);
let mut jetstream_messages = Vec::with_capacity(max_messages);
tracing::trace!("Waiting for next NATS JetStream message");
let message_stream = stream.next().await;
tracing::trace!("Received NATS JetStream message");
match message_stream {
Some(Ok(first_message)) => {
let sequence = first_message.info().ok().map(|meta| meta.stream_sequence);
canonical_messages.push(create_nats_canonical_message(
&first_message,
sequence,
false,
));
jetstream_messages.push(first_message);
}
Some(Err(e)) => return Err(ConsumerError::Connection(anyhow::anyhow!(e))),
None => {
return Err(ConsumerError::Connection(anyhow::anyhow!(
"NATS JetStream ended"
)))
}
}
while canonical_messages.len() < max_messages {
match stream.try_next().now_or_never() {
Some(Ok(Some(message))) => {
let sequence = message.info().ok().map(|meta| meta.stream_sequence);
canonical_messages
.push(create_nats_canonical_message(&message, sequence, false));
jetstream_messages.push(message);
}
_ => break, }
}
trace!(count = canonical_messages.len(), subject = %subject, message_ids = ?LazyMessageIds(&canonical_messages), "Received batch of NATS JetStream messages");
let client = client.clone();
let commit_closure: BatchCommitFunc = Box::new(move |dispositions| {
Box::pin(async move {
if dispositions.len() != jetstream_messages.len() {
tracing::warn!(
"NATS JetStream batch reply count mismatch: received {} messages but got {} responses. Pairing up to the shorter length.",
jetstream_messages.len(),
dispositions.len()
);
}
handle_jetstream_replies(&client, &jetstream_messages, &dispositions).await;
handle_jetstream_acks(jetstream_messages, dispositions).await?;
Ok(())
}) as BoxFuture<'static, anyhow::Result<()>>
});
Ok(ReceivedBatch {
messages: canonical_messages,
commit: commit_closure,
})
}
NatsCore::Ephemeral(sub) => {
let mut messages = Vec::with_capacity(max_messages);
let mut reply_subjects = Vec::with_capacity(max_messages);
if let Some(message) = sub.next().await {
reply_subjects.push(message.reply.clone());
messages.push(create_nats_canonical_message(&message, None, true));
while messages.len() < max_messages {
match sub.next().now_or_never() {
Some(Some(message)) => {
reply_subjects.push(message.reply.clone());
messages.push(create_nats_canonical_message(&message, None, true))
}
_ => break,
}
}
} else {
return Err(ConsumerError::Connection(anyhow::anyhow!(
"NATS Core subscription ended"
)));
}
let client = client.clone();
let commit_closure: BatchCommitFunc = Box::new(move |dispositions| {
Box::pin(async move {
let mut sent_reply = false;
if dispositions.len() != reply_subjects.len() {
tracing::warn!(
"NATS Core batch reply count mismatch: received {} messages but got {} responses. Pairing up to the shorter length.",
reply_subjects.len(),
dispositions.len()
);
}
for (reply_opt, disposition) in reply_subjects.iter().zip(dispositions) {
if let (Some(reply), MessageDisposition::Reply(resp)) =
(reply_opt, disposition)
{
let publish_result = tokio::time::timeout(
std::time::Duration::from_secs(60),
client.publish(reply.clone(), resp.payload),
)
.await;
match publish_result {
Err(_) => {
tracing::error!(
subject = %reply,
"Failed to publish NATS reply (timeout)"
);
}
Ok(Err(e)) => {
tracing::error!(
subject = %reply,
error = %e,
"Failed to publish NATS reply"
);
}
Ok(Ok(_)) => {
sent_reply = true;
}
}
}
}
if sent_reply {
client.flush().await.map_err(|e| {
anyhow::anyhow!("Failed to flush NATS replies: {}", e)
})?;
}
Ok(())
}) as BoxFuture<'static, anyhow::Result<()>>
});
trace!(count = messages.len(), subject = %subject, message_ids = ?LazyMessageIds(&messages), "Received batch of NATS Core messages");
Ok(ReceivedBatch {
messages,
commit: commit_closure,
})
}
}
}
}
/// Converts a NATS message into a `CanonicalMessage`.
///
/// The message id is taken from the first source that yields one:
/// 1. the `mq_bridge.message_id` header, parsed as hexadecimal;
/// 2. `sequence`, when provided;
/// 3. the `Nats-Msg-Id` header, parsed as a UUID or else as a decimal `u128`
///    (a warning is logged when neither parse succeeds).
///
/// Headers are copied into the metadata; multiple values of one header are joined with
/// `,` and headers whose joined value is empty are skipped. When
/// `include_native_reply_to` is true and the message has a reply subject, it is stored
/// under `reply_to` unless that key is already present.
fn create_nats_canonical_message(
    message: &async_nats::Message,
    sequence: Option<u64>,
    include_native_reply_to: bool,
) -> CanonicalMessage {
    let headers = message.headers.as_ref();

    let message_id: Option<u128> = headers
        .and_then(|h| h.get("mq_bridge.message_id"))
        .and_then(|raw| u128::from_str_radix(raw.as_str(), 16).ok())
        .or_else(|| sequence.map(u128::from))
        .or_else(|| {
            let id_str = headers?.get("Nats-Msg-Id")?.as_str();
            match Uuid::parse_str(id_str) {
                Ok(uuid) => Some(uuid.as_u128()),
                Err(_) => match id_str.parse::<u128>() {
                    Ok(n) => Some(n),
                    Err(_) => {
                        warn!(header_value = %id_str, "Could not parse 'Nats-Msg-Id' header as a UUID or u128");
                        None
                    }
                },
            }
        });

    let mut canonical_message = CanonicalMessage::new(message.payload.to_vec(), message_id);

    // The metadata is replaced only when at least one header is present.
    if let Some(present) = headers.filter(|h| !h.is_empty()) {
        canonical_message.metadata = present
            .iter()
            .filter_map(|(key, values)| {
                let joined_value = values
                    .iter()
                    .map(|v| v.to_string())
                    .collect::<Vec<_>>()
                    .join(",");
                if joined_value.is_empty() {
                    None
                } else {
                    Some((key.to_string(), joined_value))
                }
            })
            .collect::<std::collections::HashMap<_, _>>();
    }

    if include_native_reply_to {
        if let Some(reply) = message.reply.as_ref() {
            // An explicit `reply_to` header wins over the native reply subject.
            canonical_message
                .metadata
                .entry("reply_to".to_string())
                .or_insert_with(|| reply.to_string());
        }
    }

    canonical_message
}
/// Publishes reply payloads for JetStream messages whose disposition is `Reply`.
///
/// Messages and dispositions are paired positionally, up to the shorter slice. Each
/// publish is limited to 60 seconds. Publish and flush failures are logged and never
/// returned. The connection is flushed only if at least one reply was published.
async fn handle_jetstream_replies(
    client: &async_nats::Client,
    messages: &[async_nats::jetstream::Message],
    dispositions: &[MessageDisposition],
) {
    let mut any_reply_published = false;

    for (msg, disposition) in messages.iter().zip(dispositions) {
        // Only messages with a reply subject and a `Reply` disposition need work.
        let (reply, resp) = match (msg.reply.as_ref(), disposition) {
            (Some(reply), MessageDisposition::Reply(resp)) => (reply, resp),
            _ => continue,
        };

        let outcome = tokio::time::timeout(
            std::time::Duration::from_secs(60),
            client.publish(reply.clone(), resp.payload.clone()),
        )
        .await;

        match outcome {
            Ok(Ok(_)) => any_reply_published = true,
            Ok(Err(e)) => {
                tracing::error!(subject = %reply, error = %e, "Failed to publish NATS reply");
            }
            Err(_) => {
                tracing::error!(subject = %reply, "Failed to publish NATS reply (timeout)");
            }
        }
    }

    if !any_reply_published {
        return;
    }
    if let Err(error) = client.flush().await {
        tracing::error!(error = %error, "Failed to flush NATS JetStream replies");
    }
}
/// Acknowledges JetStream messages according to their dispositions.
///
/// `Ack` and `Reply` send an ack; `Nack` sends a NAK without a delay. Messages and
/// dispositions are paired positionally, up to the shorter list, and up to 100
/// acknowledgements run concurrently.
///
/// # Errors
/// All acknowledgements are attempted. The first failure in the collected results is
/// then logged and returned.
async fn handle_jetstream_acks(
    messages: Vec<async_nats::jetstream::Message>,
    dispositions: Vec<MessageDisposition>,
) -> anyhow::Result<()> {
    /// Sends the acknowledgement that matches `disposition` for one message.
    async fn settle(
        message: async_nats::jetstream::Message,
        disposition: MessageDisposition,
    ) -> anyhow::Result<()> {
        match disposition {
            MessageDisposition::Nack => message
                .ack_with(async_nats::jetstream::AckKind::Nak(None))
                .await
                .map_err(|e| anyhow!("Failed to NAK NATS message: {}", e)),
            MessageDisposition::Ack | MessageDisposition::Reply(_) => message
                .ack()
                .await
                .map_err(|e| anyhow!("Failed to ACK NATS message: {}", e)),
        }
    }

    let pending = messages
        .into_iter()
        .zip(dispositions)
        .map(|(message, disposition)| settle(message, disposition));

    let outcomes: Vec<anyhow::Result<()>> = futures::stream::iter(pending)
        .buffer_unordered(100)
        .collect()
        .await;

    match outcomes.into_iter().find_map(|outcome| outcome.err()) {
        Some(e) => {
            tracing::error!(error = %e, "NATS JetStream ack failed");
            Err(e)
        }
        None => Ok(()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// Reply subject shaped like a JetStream ack subject.
    const JETSTREAM_ACK_SUBJECT: &str = "$JS.ACK.test-stream.consumer.1.1";
    /// Application-level reply subject carried in the `reply_to` header.
    const EXPLICIT_REPLY_SUBJECT: &str = "app.reply.subject";

    /// Builds a message on `test.subject` with a fixed payload.
    fn nats_message(reply: Option<&str>, headers: Option<HeaderMap>) -> async_nats::Message {
        let payload = Bytes::from_static(b"payload");
        let length = payload.len();
        async_nats::Message {
            subject: async_nats::Subject::from_static("test.subject"),
            reply: reply.map(|subject| async_nats::Subject::from(subject.to_string())),
            payload,
            headers,
            status: None,
            description: None,
            length,
        }
    }

    /// Headers containing only an explicit `reply_to` entry.
    fn explicit_reply_headers() -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert("reply_to", EXPLICIT_REPLY_SUBJECT);
        headers
    }

    /// Reads the `reply_to` metadata entry, if any.
    fn reply_to(message: &CanonicalMessage) -> Option<&str> {
        message.metadata.get("reply_to").map(String::as_str)
    }

    #[test]
    fn core_native_reply_is_mapped_to_reply_to_metadata() {
        let canonical = create_nats_canonical_message(
            &nats_message(Some("_INBOX.reply"), None),
            None,
            true,
        );
        assert_eq!(reply_to(&canonical), Some("_INBOX.reply"));
    }

    #[test]
    fn jetstream_ack_reply_is_not_mapped_to_reply_to_metadata() {
        let canonical = create_nats_canonical_message(
            &nats_message(Some(JETSTREAM_ACK_SUBJECT), None),
            Some(1),
            false,
        );
        assert!(reply_to(&canonical).is_none());
    }

    #[test]
    fn jetstream_preserves_explicit_reply_to_header() {
        let message = nats_message(Some(JETSTREAM_ACK_SUBJECT), Some(explicit_reply_headers()));
        let canonical = create_nats_canonical_message(&message, Some(1), false);
        assert_eq!(reply_to(&canonical), Some(EXPLICIT_REPLY_SUBJECT));
    }

    #[test]
    fn core_native_reply_does_not_overwrite_explicit_reply_to_header() {
        let message = nats_message(Some("_INBOX.native"), Some(explicit_reply_headers()));
        let canonical = create_nats_canonical_message(&message, None, true);
        assert_eq!(reply_to(&canonical), Some(EXPLICIT_REPLY_SUBJECT));
    }

    #[test]
    fn core_native_reply_true_with_no_reply_is_absent() {
        let canonical = create_nats_canonical_message(&nats_message(None, None), None, true);
        assert!(reply_to(&canonical).is_none());
    }
}