Skip to main content

ruststream_nats/
publisher.rs

1//! Publishes core NATS messages over an `async-nats` client.
2
3use bytes::Bytes;
4use ruststream::{OutgoingMessage, Publisher};
5
6use crate::{convert::headers_to_nats, error::NatsError};
7
8/// NATS publisher built on top of [`async_nats::Client`]. Cheap to clone.
9#[derive(Clone)]
10pub struct NatsPublisher {
11    client: async_nats::Client,
12}
13
14impl std::fmt::Debug for NatsPublisher {
15    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16        f.debug_struct("NatsPublisher").finish_non_exhaustive()
17    }
18}
19
20impl NatsPublisher {
21    pub(crate) const fn new(client: async_nats::Client) -> Self {
22        Self { client }
23    }
24
25    pub(crate) fn client_clone(&self) -> async_nats::Client {
26        self.client.clone()
27    }
28}
29
30impl Publisher for NatsPublisher {
31    type Error = NatsError;
32
33    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
34        let subject = msg.name().to_owned();
35        let payload = Bytes::copy_from_slice(msg.payload());
36        let result = match headers_to_nats(msg.headers()) {
37            Some(headers) => {
38                self.client
39                    .publish_with_headers(subject, headers, payload)
40                    .await
41            }
42            None => self.client.publish(subject, payload).await,
43        };
44        result.map_err(|err| NatsError::Publish(Box::new(err)))
45    }
46}