Skip to main content

ruststream_nats/
publisher.rs

1//! Publishes core NATS messages over an `async-nats` client.
2
3use async_nats::Client;
4use bytes::Bytes;
5use ruststream::{OutgoingMessage, Publisher};
6use std::fmt::{Debug, Formatter};
7use std::sync::Arc;
8use tokio::sync::OnceCell;
9
10use crate::{convert::headers_to_nats, error::NatsError};
11
12/// NATS publisher built on top of [`async_nats::Client`]. Cheap to clone.
13///
14/// Holds the broker's shared connection cell, so a publisher created before the broker connects
15/// resolves the client on first use; publishing before [`Broker::connect`](ruststream::Broker::connect)
16/// returns [`NatsError::NotConnected`].
17#[derive(Clone)]
18pub struct NatsPublisher {
19    client: Arc<OnceCell<Client>>,
20}
21
22impl Debug for NatsPublisher {
23    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("NatsPublisher").finish_non_exhaustive()
25    }
26}
27
28impl NatsPublisher {
29    pub(crate) fn new(client: Arc<OnceCell<Client>>) -> Self {
30        Self { client }
31    }
32
33    pub(crate) fn client_clone(&self) -> Result<Client, NatsError> {
34        self.client.get().cloned().ok_or(NatsError::NotConnected)
35    }
36}
37
38impl Publisher for NatsPublisher {
39    type Error = NatsError;
40
41    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
42        let client = self.client_clone()?;
43        let subject = msg.name().to_owned();
44        let payload = Bytes::copy_from_slice(msg.payload());
45        let result = match headers_to_nats(msg.headers()) {
46            Some(headers) => client.publish_with_headers(subject, headers, payload).await,
47            None => client.publish(subject, payload).await,
48        };
49        result.map_err(|err| NatsError::Publish(Box::new(err)))
50    }
51}