ruststream_nats/
publisher.rs1use async_nats::Client;
4use bytes::Bytes;
5use ruststream::{OutgoingMessage, Publisher};
6use std::fmt::{Debug, Formatter};
7use std::sync::Arc;
8use tokio::sync::OnceCell;
9
10use crate::{convert::headers_to_nats, error::NatsError};
11
12#[derive(Clone)]
18pub struct NatsPublisher {
19 client: Arc<OnceCell<Client>>,
20}
21
22impl Debug for NatsPublisher {
23 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
24 f.debug_struct("NatsPublisher").finish_non_exhaustive()
25 }
26}
27
28impl NatsPublisher {
29 pub(crate) fn new(client: Arc<OnceCell<Client>>) -> Self {
30 Self { client }
31 }
32
33 pub(crate) fn client_clone(&self) -> Result<Client, NatsError> {
34 self.client.get().cloned().ok_or(NatsError::NotConnected)
35 }
36}
37
38impl Publisher for NatsPublisher {
39 type Error = NatsError;
40
41 async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
42 let client = self.client_clone()?;
43 let subject = msg.name().to_owned();
44 let payload = Bytes::copy_from_slice(msg.payload());
45 let result = match headers_to_nats(msg.headers()) {
46 Some(headers) => client.publish_with_headers(subject, headers, payload).await,
47 None => client.publish(subject, payload).await,
48 };
49 result.map_err(|err| NatsError::Publish(Box::new(err)))
50 }
51}