1pub mod error;
4#[cfg(feature = "jetstream")]
5pub mod jetstream;
6pub use async_nats::ConnectOptions;
7pub use async_nats::Request;
8use async_nats::{Client, HeaderMap, Subscriber};
9use async_trait::async_trait;
10use bytes::Bytes;
11pub use error::Error;
12use futures::{stream::SelectAll, StreamExt};
13use serde::{Deserialize, Serialize};
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, instrument, trace};
16
17#[derive(Debug)]
19pub struct Handle {
20 cancel: CancellationToken,
21 handle: Option<tokio::task::JoinHandle<()>>,
22}
23
24impl Drop for Handle {
25 fn drop(&mut self) {
26 self.cancel.cancel();
27 }
28}
29#[derive(Clone, Debug)]
31pub struct NatsClient {
32 client: Client,
33}
34
35impl NatsClient {
36 #[instrument(skip_all)]
38 pub async fn new(bind: &[&str]) -> Result<Self, Error> {
39 info!("Connecting to NATS server at {:?}", bind);
40 let client = ConnectOptions::new().connect(bind).await?;
41 info!("Successfully connected to NATS server");
42 Ok(Self { client })
43 }
44 #[instrument(skip_all)]
45 #[cfg(feature = "jetstream")]
47 pub fn jetstream(&self) -> jetstream::JetStream {
48 jetstream::JetStream::new(self.clone())
49 }
50
51 #[instrument(skip_all)]
53 pub async fn with_options(bind: &[&str], options: ConnectOptions) -> Result<Self, Error> {
54 info!("Connecting to NATS server at {:?}", bind);
55
56 let client = options.connect(bind).await?;
57
58 info!("Successfully connected to NATS server");
59 Ok(Self { client })
60 }
61 #[instrument(skip_all)]
63 pub async fn subscribe(&self, subject: impl AsRef<str>) -> Result<Subscriber, Error> {
64 let subject = subject.as_ref().to_owned();
65 info!("Subscribing to subject: {}", subject);
66 trace!("Calling client.subscribe with subject: {}", subject);
67 let subscription = self.client.subscribe(subject.to_owned()).await?;
68 debug!("Successfully subscribed to {}", subject);
69 Ok(subscription)
70 }
71 #[instrument(skip_all)]
73 pub async fn publish(&self, subject: impl AsRef<str>, payload: Bytes) -> Result<(), Error> {
74 let subject = subject.as_ref().to_owned();
75 debug!("Publishing message to subject: {}", subject);
76 trace!("Payload size: {}", payload.len());
77 self.client.publish(subject.to_owned(), payload).await?;
78 debug!("Successfully published to {}", subject);
79 Ok(())
80 }
81 #[instrument(skip_all)]
83 pub async fn request(
84 &self,
85 subject: impl AsRef<str>,
86 payload: Bytes,
87 ) -> Result<Message, Error> {
88 let subject = subject.as_ref().to_owned();
89 debug!("Sending request to subject: {}", subject);
90 trace!("Payload size: {}", payload.len());
91 let response = self.client.request(subject.to_owned(), payload).await?;
92 debug!("Received response from {}", subject);
93 trace!("Response payload size: {}", response.payload.len());
94 Ok(Message(response))
95 }
96 #[instrument(skip_all)]
98 pub async fn request_with_headers(
99 &self,
100 subject: impl AsRef<str>,
101 payload: Bytes,
102 headers: HeaderMap,
103 ) -> Result<Message, Error> {
104 let subject = subject.as_ref().to_owned();
105 debug!("Sending request to subject: {}", subject);
106 trace!("Payload size: {}", payload.len());
107 let response = self
108 .client
109 .request_with_headers(subject.clone(), headers, payload)
110 .await?;
111 debug!("Received response from {}", subject);
112 trace!("Response payload size: {}", response.payload.len());
113 Ok(Message(response))
114 }
115 #[instrument(skip_all)]
117 pub async fn send_request(
118 &self,
119 subject: impl AsRef<str>,
120 req: Request,
121 ) -> Result<Message, Error> {
122 let subject = subject.as_ref().to_owned();
123 debug!("Sending request to subject: {}", subject);
124
125 let response = self.client.send_request(subject.clone(), req).await?;
126
127 debug!("Received response from {}", subject);
128 Ok(Message(response))
129 }
130
131 #[instrument(skip_all)]
133 pub async fn handle<R: MessageProcessor + 'static>(
134 &self,
135 subject: impl AsRef<str>,
136 processor: R,
137 ) -> Result<Handle, Error> {
138 let subject = subject.as_ref().to_owned();
139 info!("Setting up handler for subject: {}", subject);
140 let subject = subject.to_string();
141 let mut subscriber = self.subscribe(subject.clone()).await?;
142
143 let moved_subject = subject.clone();
144 let client_clone = self.clone();
145 let cancel_token = CancellationToken::new();
146 let cancel_token_child = cancel_token.clone();
147
148 let handle = tokio::spawn(async move {
149 info!("Started message processing loop for {}", moved_subject);
150 let stop_signal = cancel_token_child.cancelled();
151 tokio::select! {
152 _ = async {
153 while let Some(message) = subscriber.next().await {
154 debug!("Processing message from subject: {}", message.subject);
155 trace!("Message payload size: {}", message.payload.len());
156 match processor.process(Message(message.clone())).await {
157 Ok(reply) => {
158 debug!("Successfully processed message");
159 if let Some(reply) = reply {
160 debug!("Sending reply: {:?}", reply);
161 if let Err(e) = client_clone.reply(reply).await {
162 error!("Failed to reply to message: {}", e);
163 }
164 } else {
165 debug!("No reply needed");
166 }
167
168 }
169 Err(e) => {
170 error!("Failed to process message: {}", e);
171 if let Err(e) = client_clone
172 .reply_err(ReplyErrorMessage(Box::new(e)), Message(message.clone()))
173 .await
174 {
175 error!("Failed to reply to message: {}", e);
176 }
177 }
178 }
179 }
180 } => {},
181 _ = stop_signal => {
182 if let Err(e) = subscriber.unsubscribe().await {
183 error!("Failed to unsubscribe from {}: {}", moved_subject, e);
184 } else {
185 info!("Successfully unsubscribed from {}", moved_subject);
186 }
187 }
188 }
189 });
190
191 Ok(Handle {
192 cancel: cancel_token,
193 handle: Some(handle),
194 })
195 }
196
197 #[instrument(skip_all)]
199 pub async fn reply(&self, reply: ReplyMessage) -> Result<(), Error> {
200 debug!("Sending reply to: {}", reply.subject);
201 trace!("Reply payload size: {}", reply.payload.len());
202
203 match (reply.headers, reply.reply) {
204 (Some(headers), Some(reply_subject)) => {
205 self.client
206 .publish_with_reply_and_headers(
207 reply.subject.clone(),
208 reply_subject,
209 headers,
210 reply.payload.clone(),
211 )
212 .await?;
213 }
214 (Some(headers), None) => {
215 self.client
216 .publish_with_headers(reply.subject.clone(), headers, reply.payload.clone())
217 .await?;
218 }
219 (None, Some(reply_subject)) => {
220 self.client
221 .publish_with_reply(reply.subject.clone(), reply_subject, reply.payload.clone())
222 .await?;
223 }
224 (None, None) => {
225 self.client
226 .publish(reply.subject.clone(), reply.payload.clone())
227 .await?;
228 }
229 }
230
231 debug!("Successfully sent reply to {}", reply.subject);
232 Ok(())
233 }
234 #[instrument(skip_all)]
236 async fn reply_err(&self, err: ReplyErrorMessage, msg_source: Message) -> Result<(), Error> {
237 trace!("Creating error reply message");
238 let reply = ReplyMessage {
239 subject: msg_source
240 .reply
241 .clone()
242 .unwrap_or_else(|| {
243 eprint!("No reply subject");
244 "".to_string().into()
245 })
246 .to_string(),
247 payload: err.0.to_string().into(),
248 headers: None,
249 reply: None,
250 };
251 self.reply(reply).await
252 }
253 #[instrument(skip_all)]
255 pub async fn handle_multiple<R: MessageProcessor + 'static>(
256 &self,
257 subjects: impl IntoIterator<Item = String>,
258 processor: R,
259 ) -> Result<MultipleHandle, Error> {
260 let mut merged = SelectAll::new();
261 for sub in subjects {
262 merged.push(self.subscribe(sub).await?);
263 }
264
265 let client_clone = self.clone();
266 let cancel_token = CancellationToken::new();
267 let cancel_token_child = cancel_token.clone();
268
269 let handle = tokio::spawn(async move {
270 info!("Started message processing loop for multiple subjects");
271 let stop_signal = cancel_token_child.cancelled();
272 tokio::select! {
273 _ = async {
274 while let Some(message) = merged.next().await {
275 debug!("Processing message from subject: {}", message.subject);
276 trace!("Message payload size: {}", message.payload.len());
277 match processor.process(Message(message.clone())).await {
278 Ok(reply) => {
279 debug!("Successfully processed message");
280 if let Some(reply) = reply {
281 debug!("Sending reply: {:?}", reply);
282 if let Err(e) = client_clone.reply(reply).await {
283 error!("Failed to reply to message: {}", e);
284 }
285 } else {
286 debug!("No reply needed");
287 }
288 }
289 Err(e) => {
290 error!("Failed to process message: {}", e);
291 if let Err(e) = client_clone
292 .reply_err(ReplyErrorMessage(Box::new(e)), Message(message.clone()))
293 .await
294 {
295 error!("Failed to reply to message: {}", e);
296 }
297 }
298 }
299 }
300 } => {},
301 _ = stop_signal => {
302 info!("Cancellation requested for multiple subject handler");
303 for mut sub in merged {
304 if let Err(e) = sub.unsubscribe().await {
305 error!("Failed to unsubscribe from subject: {}", e);
306 } else {
307 info!("Successfully unsubscribed from subject");
308 }
309 }
310 info!("All subscriptions have been unsubscribed.");
311 }
312 }
313 });
314
315 Ok(MultipleHandle {
316 handle: Handle {
317 cancel: cancel_token,
318 handle: Some(handle),
319 },
320 })
321 }
322 #[instrument(skip_all)]
324 pub fn timeout(&self) -> Option<tokio::time::Duration> {
325 self.client.timeout()
326 }
327
328 #[instrument(skip_all)]
330 pub fn server_info(&self) -> async_nats::ServerInfo {
331 self.client.server_info()
332 }
333
334 #[instrument(skip_all)]
336 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
337 self.client.is_server_compatible(major, minor, patch)
338 }
339
340 #[instrument(skip_all)]
342 pub async fn flush(&self) -> Result<(), Error> {
343 Ok(self.client.flush().await?)
344 }
345
346 #[instrument(skip_all)]
348 pub async fn drain(&self) -> Result<(), Error> {
349 self.client.drain().await.map_err(Into::into)
350 }
351
352 #[instrument(skip_all)]
354 pub fn connection_state(&self) -> async_nats::connection::State {
355 self.client.connection_state()
356 }
357
358 #[instrument(skip_all)]
360 pub async fn force_reconnect(&self) -> Result<(), Error> {
361 self.client.force_reconnect().await.map_err(Into::into)
362 }
363
364 #[instrument(skip_all)]
366 pub async fn queue_subscribe(
367 &self,
368 subject: impl AsRef<str>,
369 queue_group: impl AsRef<str>,
370 ) -> Result<Subscriber, Error> {
371 let subject = subject.as_ref().to_owned();
372 let queue_group = queue_group.as_ref().to_owned();
373 info!(
374 "Subscribing to subject: {} with queue group: {}",
375 subject, queue_group
376 );
377
378 trace!(
379 "Calling client.queue_subscribe with subject: {} and queue group: {}",
380 subject,
381 queue_group
382 );
383 let subscription = self
384 .client
385 .queue_subscribe(subject.clone(), queue_group.clone())
386 .await?;
387 debug!(
388 "Successfully subscribed to {} with queue group: {}",
389 subject, queue_group
390 );
391
392 Ok(subscription)
393 }
394 #[instrument(skip_all)]
396 pub fn statistics(&self) -> std::sync::Arc<async_nats::Statistics> {
397 self.client.statistics()
398 }
399
400 #[instrument(skip_all)]
402 pub fn new_inbox(&self) -> String {
403 self.client.new_inbox()
404 }
405}
406
407#[derive(Debug)]
409pub struct MultipleHandle {
410 handle: Handle,
411}
412
413impl MultipleHandle {
414 #[instrument(skip_all)]
416 pub async fn shutdown(self) -> Result<(), Error> {
417 self.handle.shutdown().await;
418
419 Ok(())
420 }
421 pub async fn abort(&mut self) {
422 self.handle.abort().await;
423 }
424}
425
426impl Handle {
427 #[instrument(skip_all)]
429 pub async fn shutdown(mut self) {
430 info!("Initiating shutdown for handle");
431 self.cancel.cancel();
432 if let Some(handle) = self.handle.take() {
433 if let Err(e) = handle.await {
434 error!("handle join error: {:?}", e);
435 }
436 }
437 }
438 pub async fn abort(&mut self) {
439 info!("Aborting handle");
440 self.cancel.cancel();
441 if let Some(handle) = self.handle.take() {
442 handle.abort();
443 }
444 }
445}
446
447#[derive(Clone, Debug, Serialize, Deserialize)]
448pub struct Message(async_nats::Message);
449
450impl std::ops::Deref for Message {
451 type Target = async_nats::Message;
452
453 fn deref(&self) -> &Self::Target {
454 &self.0
455 }
456}
457impl std::ops::DerefMut for Message {
458 fn deref_mut(&mut self) -> &mut Self::Target {
459 &mut self.0
460 }
461}
462impl Message {
463 pub fn reply(&self, payload: Bytes) -> ReplyMessage {
464 ReplyMessage {
465 subject: self.reply.clone().unwrap_or_else(|| "".into()).to_string(),
466 payload,
467 headers: None,
468 reply: None,
469 }
470 }
471}
472
473#[async_trait]
475pub trait MessageProcessor: Send + Sync {
476 type Error: std::error::Error + Send + Sync + 'static;
477 async fn process(&self, message: Message) -> Result<Option<ReplyMessage>, Self::Error>;
478}
479
480#[derive(Clone, Debug)]
482pub struct ReplyMessage {
483 pub reply: Option<String>,
484 pub subject: String,
485 pub payload: Bytes,
486 pub headers: Option<HeaderMap>,
487}
488impl ReplyMessage {
489 pub fn new(subject: String, payload: Bytes) -> Self {
490 Self {
491 reply: None,
492 subject,
493 payload,
494 headers: None,
495 }
496 }
497}
498struct ReplyErrorMessage(pub Box<dyn std::error::Error + Send + Sync>);
512
513pub fn reply(msg: &Message, payload: Bytes) -> ReplyMessage {
515 ReplyMessage {
516 subject: msg.reply.clone().unwrap_or_else(|| "".into()).to_string(),
517 payload,
518 headers: None,
519 reply: None,
520 }
521}