ruststream_nats/
publisher.rs1use bytes::Bytes;
4use ruststream::{OutgoingMessage, Publisher};
5
6use crate::{convert::headers_to_nats, error::NatsError};
7
8#[derive(Clone)]
10pub struct NatsPublisher {
11 client: async_nats::Client,
12}
13
14impl std::fmt::Debug for NatsPublisher {
15 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 f.debug_struct("NatsPublisher").finish_non_exhaustive()
17 }
18}
19
20impl NatsPublisher {
21 pub(crate) const fn new(client: async_nats::Client) -> Self {
22 Self { client }
23 }
24
25 pub(crate) fn client_clone(&self) -> async_nats::Client {
26 self.client.clone()
27 }
28}
29
30impl Publisher for NatsPublisher {
31 type Error = NatsError;
32
33 async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
34 let subject = msg.name().to_owned();
35 let payload = Bytes::copy_from_slice(msg.payload());
36 let result = match headers_to_nats(msg.headers()) {
37 Some(headers) => {
38 self.client
39 .publish_with_headers(subject, headers, payload)
40 .await
41 }
42 None => self.client.publish(subject, payload).await,
43 };
44 result.map_err(|err| NatsError::Publish(Box::new(err)))
45 }
46}