1pub mod error;
4#[cfg(feature = "jetstream")]
5pub mod jetstream;
6pub use async_nats::ConnectOptions;
7pub use async_nats::Request;
8use async_nats::{Client, HeaderMap, Subscriber};
9use async_trait::async_trait;
10use bytes::Bytes;
11pub use error::Error;
12use futures::{stream::SelectAll, StreamExt};
13use serde::{Deserialize, Serialize};
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, instrument, trace};
16
17#[derive(Debug)]
19pub struct Handle {
20    cancel: CancellationToken,
21    handle: Option<tokio::task::JoinHandle<()>>,
22}
23
24impl Drop for Handle {
25    fn drop(&mut self) {
26        self.cancel.cancel();
27    }
28}
29#[derive(Clone, Debug)]
31pub struct NatsClient {
32    client: Client,
33}
34
35impl NatsClient {
36    #[instrument(skip_all)]
38    pub async fn new(bind: &[&str]) -> Result<Self, Error> {
39        info!("Connecting to NATS server at {:?}", bind);
40        let client = ConnectOptions::new().connect(bind).await?;
41        info!("Successfully connected to NATS server");
42        Ok(Self { client })
43    }
44    #[instrument(skip_all)]
45    #[cfg(feature = "jetstream")]
47    pub fn jetstream(&self) -> jetstream::JetStream {
48        jetstream::JetStream::new(self.clone())
49    }
50
51    #[instrument(skip_all)]
53    pub async fn with_options(bind: &[&str], options: ConnectOptions) -> Result<Self, Error> {
54        info!("Connecting to NATS server at {:?}", bind);
55
56        let client = options.connect(bind).await?;
57
58        info!("Successfully connected to NATS server");
59        Ok(Self { client })
60    }
61    #[instrument(skip_all)]
63    pub async fn subscribe(&self, subject: impl AsRef<str>) -> Result<Subscriber, Error> {
64        let subject = subject.as_ref().to_owned();
65        info!("Subscribing to subject: {}", subject);
66        trace!("Calling client.subscribe with subject: {}", subject);
67        let subscription = self.client.subscribe(subject.to_owned()).await?;
68        debug!("Successfully subscribed to {}", subject);
69        Ok(subscription)
70    }
71    #[instrument(skip_all)]
73    pub async fn publish(&self, subject: impl AsRef<str>, payload: Bytes) -> Result<(), Error> {
74        let subject = subject.as_ref().to_owned();
75        debug!("Publishing message to subject: {}", subject);
76        trace!("Payload size: {}", payload.len());
77        self.client.publish(subject.to_owned(), payload).await?;
78        debug!("Successfully published to {}", subject);
79        Ok(())
80    }
81    #[instrument(skip_all)]
83    pub async fn request(
84        &self,
85        subject: impl AsRef<str>,
86        payload: Bytes,
87    ) -> Result<Message, Error> {
88        let subject = subject.as_ref().to_owned();
89        debug!("Sending request to subject: {}", subject);
90        trace!("Payload size: {}", payload.len());
91        let response = self.client.request(subject.to_owned(), payload).await?;
92        debug!("Received response from {}", subject);
93        trace!("Response payload size: {}", response.payload.len());
94        Ok(Message(response))
95    }
96    #[instrument(skip_all)]
98    pub async fn request_with_headers(
99        &self,
100        subject: impl AsRef<str>,
101        payload: Bytes,
102        headers: HeaderMap,
103    ) -> Result<Message, Error> {
104        let subject = subject.as_ref().to_owned();
105        debug!("Sending request to subject: {}", subject);
106        trace!("Payload size: {}", payload.len());
107        let response = self
108            .client
109            .request_with_headers(subject.clone(), headers, payload)
110            .await?;
111        debug!("Received response from {}", subject);
112        trace!("Response payload size: {}", response.payload.len());
113        Ok(Message(response))
114    }
115    #[instrument(skip_all)]
117    pub async fn send_request(
118        &self,
119        subject: impl AsRef<str>,
120        req: Request,
121    ) -> Result<Message, Error> {
122        let subject = subject.as_ref().to_owned();
123        debug!("Sending request to subject: {}", subject);
124
125        let response = self.client.send_request(subject.clone(), req).await?;
126
127        debug!("Received response from {}", subject);
128        Ok(Message(response))
129    }
130
131    #[instrument(skip_all)]
133    pub async fn handle<R: MessageProcessor + 'static>(
134        &self,
135        subject: impl AsRef<str>,
136        processor: R,
137    ) -> Result<Handle, Error> {
138        let subject = subject.as_ref().to_owned();
139        info!("Setting up handler for subject: {}", subject);
140        let subject = subject.to_string();
141        let mut subscriber = self.subscribe(subject.clone()).await?;
142
143        let moved_subject = subject.clone();
144        let client_clone = self.clone();
145        let cancel_token = CancellationToken::new();
146        let cancel_token_child = cancel_token.clone();
147
148        let handle = tokio::spawn(async move {
149            info!("Started message processing loop for {}", moved_subject);
150            let stop_signal = cancel_token_child.cancelled();
151            tokio::select! {
152            _ = async {
153                while let Some(message) = subscriber.next().await {
154                debug!("Processing message from subject: {}", message.subject);
155                trace!("Message payload size: {}", message.payload.len());
156                match processor.process(Message(message.clone())).await {
157                    Ok(reply) => {
158                    debug!("Successfully processed message");
159                    if let Some(reply) = reply {
160                        debug!("Sending reply: {:?}", reply);
161                        if let Err(e) = client_clone.reply(reply).await {
162                            error!("Failed to reply to message: {}", e);
163                        }
164                    } else {
165                        debug!("No reply needed");
166                    }
167
168                    }
169                    Err(e) => {
170                    error!("Failed to process message: {}", e);
171                    if let Err(e) = client_clone
172                        .reply_err(ReplyErrorMessage(Box::new(e)), Message(message.clone()))
173                        .await
174                    {
175                        error!("Failed to reply to message: {}", e);
176                    }
177                    }
178                }
179                }
180            } => {},
181            _ = stop_signal => {
182                if let Err(e) = subscriber.unsubscribe().await {
183                error!("Failed to unsubscribe from {}: {}", moved_subject, e);
184                } else {
185                info!("Successfully unsubscribed from {}", moved_subject);
186                }
187            }
188            }
189        });
190
191        Ok(Handle {
192            cancel: cancel_token,
193            handle: Some(handle),
194        })
195    }
196
197    #[instrument(skip_all)]
199    pub async fn reply(&self, reply: ReplyMessage) -> Result<(), Error> {
200        debug!("Sending reply to: {}", reply.subject);
201        trace!("Reply payload size: {}", reply.payload.len());
202
203        match (reply.headers, reply.reply) {
204            (Some(headers), Some(reply_subject)) => {
205                self.client
206                    .publish_with_reply_and_headers(
207                        reply.subject.clone(),
208                        reply_subject,
209                        headers,
210                        reply.payload.clone(),
211                    )
212                    .await?;
213            }
214            (Some(headers), None) => {
215                self.client
216                    .publish_with_headers(reply.subject.clone(), headers, reply.payload.clone())
217                    .await?;
218            }
219            (None, Some(reply_subject)) => {
220                self.client
221                    .publish_with_reply(reply.subject.clone(), reply_subject, reply.payload.clone())
222                    .await?;
223            }
224            (None, None) => {
225                self.client
226                    .publish(reply.subject.clone(), reply.payload.clone())
227                    .await?;
228            }
229        }
230
231        debug!("Successfully sent reply to {}", reply.subject);
232        Ok(())
233    }
234    #[instrument(skip_all)]
236    async fn reply_err(&self, err: ReplyErrorMessage, msg_source: Message) -> Result<(), Error> {
237        trace!("Creating error reply message");
238        let reply = ReplyMessage {
239            subject: msg_source
240                .reply
241                .clone()
242                .unwrap_or_else(|| {
243                    eprint!("No reply subject");
244                    "".to_string().into()
245                })
246                .to_string(),
247            payload: err.0.to_string().into(),
248            headers: None,
249            reply: None,
250        };
251        self.reply(reply).await
252    }
253    #[instrument(skip_all)]
255    pub async fn handle_multiple<R: MessageProcessor + 'static>(
256        &self,
257        subjects: impl IntoIterator<Item = String>,
258        processor: R,
259    ) -> Result<MultipleHandle, Error> {
260        let mut merged = SelectAll::new();
261        for sub in subjects {
262            merged.push(self.subscribe(sub).await?);
263        }
264
265        let client_clone = self.clone();
266        let cancel_token = CancellationToken::new();
267        let cancel_token_child = cancel_token.clone();
268
269        let handle = tokio::spawn(async move {
270            info!("Started message processing loop for multiple subjects");
271            let stop_signal = cancel_token_child.cancelled();
272            tokio::select! {
273                _ = async {
274                    while let Some(message) = merged.next().await {
275                        debug!("Processing message from subject: {}", message.subject);
276                        trace!("Message payload size: {}", message.payload.len());
277                        match processor.process(Message(message.clone())).await {
278                            Ok(reply) => {
279                                debug!("Successfully processed message");
280                                if let Some(reply) = reply {
281                                    debug!("Sending reply: {:?}", reply);
282                                    if let Err(e) = client_clone.reply(reply).await {
283                                        error!("Failed to reply to message: {}", e);
284                                    }
285                                } else {
286                                    debug!("No reply needed");
287                                }
288                            }
289                            Err(e) => {
290                                error!("Failed to process message: {}", e);
291                                if let Err(e) = client_clone
292                                    .reply_err(ReplyErrorMessage(Box::new(e)), Message(message.clone()))
293                                    .await
294                                {
295                                    error!("Failed to reply to message: {}", e);
296                                }
297                            }
298                        }
299                    }
300                } => {},
301                _ = stop_signal => {
302                    info!("Cancellation requested for multiple subject handler");
303                   for mut sub in merged {
304                        if let Err(e) = sub.unsubscribe().await {
305                            error!("Failed to unsubscribe from subject: {}", e);
306                        } else {
307                            info!("Successfully unsubscribed from subject");
308                        }
309                    }
310                    info!("All subscriptions have been unsubscribed.");
311                }
312            }
313        });
314
315        Ok(MultipleHandle {
316            handle: Handle {
317                cancel: cancel_token,
318                handle: Some(handle),
319            },
320        })
321    }
322    #[instrument(skip_all)]
324    pub fn timeout(&self) -> Option<tokio::time::Duration> {
325        self.client.timeout()
326    }
327
328    #[instrument(skip_all)]
330    pub fn server_info(&self) -> async_nats::ServerInfo {
331        self.client.server_info()
332    }
333
334    #[instrument(skip_all)]
336    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
337        self.client.is_server_compatible(major, minor, patch)
338    }
339
340    #[instrument(skip_all)]
342    pub async fn flush(&self) -> Result<(), Error> {
343        Ok(self.client.flush().await?)
344    }
345
346    #[instrument(skip_all)]
348    pub async fn drain(&self) -> Result<(), Error> {
349        self.client.drain().await.map_err(Into::into)
350    }
351
352    #[instrument(skip_all)]
354    pub fn connection_state(&self) -> async_nats::connection::State {
355        self.client.connection_state()
356    }
357
358    #[instrument(skip_all)]
360    pub async fn force_reconnect(&self) -> Result<(), Error> {
361        self.client.force_reconnect().await.map_err(Into::into)
362    }
363
364    #[instrument(skip_all)]
366    pub async fn queue_subscribe(
367        &self,
368        subject: impl AsRef<str>,
369        queue_group: impl AsRef<str>,
370    ) -> Result<Subscriber, Error> {
371        let subject = subject.as_ref().to_owned();
372        let queue_group = queue_group.as_ref().to_owned();
373        info!(
374            "Subscribing to subject: {} with queue group: {}",
375            subject, queue_group
376        );
377
378        trace!(
379            "Calling client.queue_subscribe with subject: {} and queue group: {}",
380            subject,
381            queue_group
382        );
383        let subscription = self
384            .client
385            .queue_subscribe(subject.clone(), queue_group.clone())
386            .await?;
387        debug!(
388            "Successfully subscribed to {} with queue group: {}",
389            subject, queue_group
390        );
391
392        Ok(subscription)
393    }
394    #[instrument(skip_all)]
396    pub fn statistics(&self) -> std::sync::Arc<async_nats::Statistics> {
397        self.client.statistics()
398    }
399
400    #[instrument(skip_all)]
402    pub fn new_inbox(&self) -> String {
403        self.client.new_inbox()
404    }
405}
406
407#[derive(Debug)]
409pub struct MultipleHandle {
410    handle: Handle,
411}
412
413impl MultipleHandle {
414    #[instrument(skip_all)]
416    pub async fn shutdown(self) -> Result<(), Error> {
417        self.handle.shutdown().await;
418
419        Ok(())
420    }
421    pub async fn abort(&mut self) {
422        self.handle.abort().await;
423    }
424}
425
426impl Handle {
427    #[instrument(skip_all)]
429    pub async fn shutdown(mut self) {
430        info!("Initiating shutdown for handle");
431        self.cancel.cancel();
432        if let Some(handle) = self.handle.take() {
433            if let Err(e) = handle.await {
434                error!("handle join error: {:?}", e);
435            }
436        }
437    }
438    pub async fn abort(&mut self) {
439        info!("Aborting handle");
440        self.cancel.cancel();
441        if let Some(handle) = self.handle.take() {
442            handle.abort();
443        }
444    }
445}
446
447#[derive(Clone, Debug, Serialize, Deserialize)]
448pub struct Message(async_nats::Message);
449
450impl std::ops::Deref for Message {
451    type Target = async_nats::Message;
452
453    fn deref(&self) -> &Self::Target {
454        &self.0
455    }
456}
457impl std::ops::DerefMut for Message {
458    fn deref_mut(&mut self) -> &mut Self::Target {
459        &mut self.0
460    }
461}
462impl Message {
463    pub fn reply(&self, payload: Bytes) -> ReplyMessage {
464        ReplyMessage {
465            subject: self.reply.clone().unwrap_or_else(|| "".into()).to_string(),
466            payload,
467            headers: None,
468            reply: None,
469        }
470    }
471}
472
473#[async_trait]
475pub trait MessageProcessor: Send + Sync {
476    type Error: std::error::Error + Send + Sync + 'static;
477    async fn process(&self, message: Message) -> Result<Option<ReplyMessage>, Self::Error>;
478}
479
480#[derive(Clone, Debug)]
482pub struct ReplyMessage {
483    pub reply: Option<String>,
484    pub subject: String,
485    pub payload: Bytes,
486    pub headers: Option<HeaderMap>,
487}
488impl ReplyMessage {
489    pub fn new(subject: String, payload: Bytes) -> Self {
490        Self {
491            reply: None,
492            subject,
493            payload,
494            headers: None,
495        }
496    }
497}
498struct ReplyErrorMessage(pub Box<dyn std::error::Error + Send + Sync>);
512
513pub fn reply(msg: &Message, payload: Bytes) -> ReplyMessage {
515    ReplyMessage {
516        subject: msg.reply.clone().unwrap_or_else(|| "".into()).to_string(),
517        payload,
518        headers: None,
519        reply: None,
520    }
521}