Skip to main content

ruststream_nats/
publisher.rs

1//! Publishes core NATS messages over an `async-nats` client.
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use ruststream::{OutgoingMessage, Publisher};
7use tokio::sync::OnceCell;
8
9use crate::{convert::headers_to_nats, error::NatsError};
10
11/// NATS publisher built on top of [`async_nats::Client`]. Cheap to clone.
12///
13/// Holds the broker's shared connection cell, so a publisher created before the broker connects
14/// resolves the client on first use; publishing before [`Broker::connect`](ruststream::Broker::connect)
15/// returns [`NatsError::NotConnected`].
16#[derive(Clone)]
17pub struct NatsPublisher {
18    client: Arc<OnceCell<async_nats::Client>>,
19}
20
21impl std::fmt::Debug for NatsPublisher {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        f.debug_struct("NatsPublisher").finish_non_exhaustive()
24    }
25}
26
27impl NatsPublisher {
28    pub(crate) fn new(client: Arc<OnceCell<async_nats::Client>>) -> Self {
29        Self { client }
30    }
31
32    pub(crate) fn client_clone(&self) -> Result<async_nats::Client, NatsError> {
33        self.client.get().cloned().ok_or(NatsError::NotConnected)
34    }
35}
36
37impl Publisher for NatsPublisher {
38    type Error = NatsError;
39
40    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
41        let client = self.client_clone()?;
42        let subject = msg.name().to_owned();
43        let payload = Bytes::copy_from_slice(msg.payload());
44        let result = match headers_to_nats(msg.headers()) {
45            Some(headers) => client.publish_with_headers(subject, headers, payload).await,
46            None => client.publish(subject, payload).await,
47        };
48        result.map_err(|err| NatsError::Publish(Box::new(err)))
49    }
50}