ruststream_nats/
publisher.rs1use std::sync::Arc;
4
5use bytes::Bytes;
6use ruststream::{OutgoingMessage, Publisher};
7use tokio::sync::OnceCell;
8
9use crate::{convert::headers_to_nats, error::NatsError};
10
/// Publisher handle backed by a shared `async_nats::Client` cell.
///
/// The client lives in an `Arc<OnceCell<..>>`, so cloning the publisher only
/// clones the `Arc`: every clone observes the same cell.
#[derive(Clone)]
pub struct NatsPublisher {
    // Shared cell holding the NATS client; it may still be unset when the
    // publisher is created (NOTE(review): presumably filled in on connect —
    // the code that sets it is outside this file's visible section).
    client: Arc<OnceCell<async_nats::Client>>,
}
20
21impl std::fmt::Debug for NatsPublisher {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 f.debug_struct("NatsPublisher").finish_non_exhaustive()
24 }
25}
26
27impl NatsPublisher {
28 pub(crate) fn new(client: Arc<OnceCell<async_nats::Client>>) -> Self {
29 Self { client }
30 }
31
32 pub(crate) fn client_clone(&self) -> Result<async_nats::Client, NatsError> {
33 self.client.get().cloned().ok_or(NatsError::NotConnected)
34 }
35}
36
37impl Publisher for NatsPublisher {
38 type Error = NatsError;
39
40 async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
41 let client = self.client_clone()?;
42 let subject = msg.name().to_owned();
43 let payload = Bytes::copy_from_slice(msg.payload());
44 let result = match headers_to_nats(msg.headers()) {
45 Some(headers) => client.publish_with_headers(subject, headers, payload).await,
46 None => client.publish(subject, payload).await,
47 };
48 result.map_err(|err| NatsError::Publish(Box::new(err)))
49 }
50}