async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38
39static VERSION_RE: LazyLock<Regex> =
40 LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
42/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
43/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48 PublishError::with_source(PublishErrorKind::Send, err)
49 }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54 PublishError::with_source(PublishErrorKind::Send, err)
55 }
56}
57
/// Classifies the reason a publish operation failed.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size.
    MaxPayloadExceeded,
    /// The subject was rejected as invalid.
    InvalidSubject,
    /// The message could not be handed over for sending.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::InvalidSubject => "invalid subject",
            Self::Send => "failed to send message",
        };
        f.write_str(description)
    }
}
74
75/// Client is a `Cloneable` handle to NATS connection.
76/// Client should not be created directly. Instead, one of two methods can be used:
77/// [crate::connect] and [crate::ConnectOptions::connect]
78#[derive(Clone, Debug)]
79pub struct Client {
 /// Watch channel holding the last received [`ServerInfo`]; read by `server_info`.
80 info: tokio::sync::watch::Receiver<ServerInfo>,
 /// Watch channel holding the connection [`State`]; read by `connection_state`.
81 pub(crate) state: tokio::sync::watch::Receiver<State>,
 /// Channel on which every [`Command`] (publish, subscribe, flush, ...) is sent.
82 pub(crate) sender: mpsc::Sender<Command>,
 /// Poll-based wrapper around a clone of `sender`, used by the `Sink` implementation.
83 poll_sender: PollSender<Command>,
 /// Counter used to allocate subscription ids; starts at 1 and is shared between clones.
84 next_subscription_id: Arc<AtomicU64>,
 /// Capacity of the message channel created for each new subscription.
85 subscription_capacity: usize,
 /// Prefix placed in front of the generated id by `new_inbox`.
86 inbox_prefix: Arc<str>,
 /// Default request timeout, used when a request does not set its own.
87 request_timeout: Option<Duration>,
 /// Payload size limit checked before publishing.
 // NOTE(review): presumably kept in sync with the server's advertised limit — confirm where it is written.
88 max_payload: Arc<AtomicUsize>,
 /// Shared statistics handed out by `statistics`.
89 connection_stats: Arc<Statistics>,
 /// When `true`, publish subjects are only checked for emptiness, not for format.
90 skip_subject_validation: bool,
91}
92
93pub mod traits {
94 use std::{future::Future, time::Duration};
95
96 use bytes::Bytes;
97
98 use crate::{message, subject::ToSubject, Message};
99
100 use super::{PublishError, Request, RequestError, SubscribeError};
101
102 pub trait Publisher {
103 fn publish_with_reply<S, R>(
104 &self,
105 subject: S,
106 reply: R,
107 payload: Bytes,
108 ) -> impl Future<Output = Result<(), PublishError>>
109 where
110 S: ToSubject,
111 R: ToSubject;
112
113 fn publish_message(
114 &self,
115 msg: message::OutboundMessage,
116 ) -> impl Future<Output = Result<(), PublishError>>;
117 }
118 pub trait Subscriber {
119 fn subscribe<S>(
120 &self,
121 subject: S,
122 ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
123 where
124 S: ToSubject;
125 }
126 pub trait Requester {
127 fn send_request<S>(
128 &self,
129 subject: S,
130 request: Request,
131 ) -> impl Future<Output = Result<Message, RequestError>>
132 where
133 S: ToSubject;
134 }
135 pub trait TimeoutProvider {
136 fn timeout(&self) -> Option<Duration>;
137 }
138}
139
140impl traits::Requester for Client {
141 fn send_request<S: ToSubject>(
142 &self,
143 subject: S,
144 request: Request,
145 ) -> impl Future<Output = Result<Message, RequestError>> {
146 self.send_request(subject, request)
147 }
148}
149
150impl traits::TimeoutProvider for Client {
151 fn timeout(&self) -> Option<Duration> {
152 self.timeout()
153 }
154}
155
156impl traits::Publisher for Client {
157 fn publish_with_reply<S, R>(
158 &self,
159 subject: S,
160 reply: R,
161 payload: Bytes,
162 ) -> impl Future<Output = Result<(), PublishError>>
163 where
164 S: ToSubject,
165 R: ToSubject,
166 {
167 self.publish_with_reply(subject, reply, payload)
168 }
169
170 async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
171 if msg.subject.is_empty()
172 || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&msg.subject))
173 {
174 return Err(PublishError::with_source(
175 PublishErrorKind::InvalidSubject,
176 crate::subject::SubjectError::InvalidFormat,
177 ));
178 }
179 self.sender
180 .send(Command::Publish(msg))
181 .await
182 .map_err(Into::into)
183 }
184}
185
186impl traits::Subscriber for Client {
187 fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
188 where
189 S: ToSubject,
190 {
191 self.subscribe(subject)
192 }
193}
194
195impl Sink<OutboundMessage> for Client {
196 type Error = PublishError;
197
198 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
200 }
201
202 fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
203 self.poll_sender
204 .start_send_unpin(Command::Publish(msg))
205 .map_err(Into::into)
206 }
207
208 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
210 }
211
212 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
213 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
214 }
215}
216
217impl Client {
218 #[allow(clippy::too_many_arguments)]
219 pub(crate) fn new(
220 info: tokio::sync::watch::Receiver<ServerInfo>,
221 state: tokio::sync::watch::Receiver<State>,
222 sender: mpsc::Sender<Command>,
223 capacity: usize,
224 inbox_prefix: String,
225 request_timeout: Option<Duration>,
226 max_payload: Arc<AtomicUsize>,
227 statistics: Arc<Statistics>,
228 skip_subject_validation: bool,
229 ) -> Client {
230 let poll_sender = PollSender::new(sender.clone());
231 Client {
232 info,
233 state,
234 sender,
235 poll_sender,
236 next_subscription_id: Arc::new(AtomicU64::new(1)),
237 subscription_capacity: capacity,
238 inbox_prefix: inbox_prefix.into(),
239 request_timeout,
240 max_payload,
241 connection_stats: statistics,
242 skip_subject_validation,
243 }
244 }
245
246 /// Validates a subject for publishing (protocol-framing safety only).
247 /// Checks for empty and whitespace. Does not check dot structure.
248 pub(crate) fn maybe_validate_publish_subject<S: ToSubject>(
249 &self,
250 subject: S,
251 ) -> Result<crate::Subject, crate::subject::SubjectError> {
252 let subject = subject.to_subject();
253 if subject.is_empty()
254 || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&subject))
255 {
256 return Err(crate::subject::SubjectError::InvalidFormat);
257 }
258 Ok(subject)
259 }
260
261 /// Validates a subject for subscribing (protocol-framing + dot structure).
262 /// Always runs regardless of `skip_subject_validation` (matches Go/Java behavior).
263 pub(crate) fn validate_subscribe_subject<S: ToSubject>(
264 &self,
265 subject: S,
266 ) -> Result<crate::Subject, crate::subject::SubjectError> {
267 let subject = subject.to_subject();
268 if !subject.is_valid() {
269 return Err(crate::subject::SubjectError::InvalidFormat);
270 }
271 Ok(subject)
272 }
273
274 /// Returns the default timeout for requests set when creating the client.
275 ///
276 /// # Examples
277 /// ```no_run
278 /// # #[tokio::main]
279 /// # async fn main() -> Result<(), async_nats::Error> {
280 /// let client = async_nats::connect("demo.nats.io").await?;
281 /// println!("default request timeout: {:?}", client.timeout());
282 /// # Ok(())
283 /// # }
284 /// ```
285 pub fn timeout(&self) -> Option<Duration> {
286 self.request_timeout
287 }
288
289 /// Returns last received info from the server.
290 ///
291 /// # Examples
292 ///
293 /// ```no_run
294 /// # #[tokio::main]
295 /// # async fn main () -> Result<(), async_nats::Error> {
296 /// let client = async_nats::connect("demo.nats.io").await?;
297 /// println!("info: {:?}", client.server_info());
298 /// # Ok(())
299 /// # }
300 /// ```
301 pub fn server_info(&self) -> ServerInfo {
302 // We ignore notifying the watcher, as that requires mutable client reference.
303 self.info.borrow().to_owned()
304 }
305
306 /// Returns true if the server version is compatible with the version components.
307 ///
308 /// This has to be used with caution, as it is not guaranteed that the server
309 /// that client is connected to is the same version that the one that is
310 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
311 ///
312 /// # Examples
313 ///
314 /// ```no_run
315 /// # #[tokio::main]
316 /// # async fn main() -> Result<(), async_nats::Error> {
317 /// let client = async_nats::connect("demo.nats.io").await?;
318 /// assert!(client.is_server_compatible(2, 8, 4));
319 /// # Ok(())
320 /// # }
321 /// ```
322 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
323 let info = self.server_info();
324
325 let server_version_captures = match VERSION_RE.captures(&info.version) {
326 Some(captures) => captures,
327 None => return false,
328 };
329
330 let server_major = server_version_captures
331 .get(1)
332 .map(|m| m.as_str().parse::<i64>().unwrap())
333 .unwrap();
334
335 let server_minor = server_version_captures
336 .get(2)
337 .map(|m| m.as_str().parse::<i64>().unwrap())
338 .unwrap();
339
340 let server_patch = server_version_captures
341 .get(3)
342 .map(|m| m.as_str().parse::<i64>().unwrap())
343 .unwrap();
344
345 if server_major < major
346 || (server_major == major && server_minor < minor)
347 || (server_major == major && server_minor == minor && server_patch < patch)
348 {
349 return false;
350 }
351 true
352 }
353
354 /// Publish a [Message] to a given subject.
355 ///
356 /// Returns `PublishErrorKind::InvalidSubject` if the subject is invalid
357 /// (empty or contains whitespace). This check can be disabled with
358 /// [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation] (empty subjects are
359 /// always rejected).
360 ///
361 /// # Examples
362 /// ```no_run
363 /// # #[tokio::main]
364 /// # async fn main() -> Result<(), async_nats::Error> {
365 /// let client = async_nats::connect("demo.nats.io").await?;
366 /// client.publish("events.data", "payload".into()).await?;
367 /// # Ok(())
368 /// # }
369 /// ```
370 pub async fn publish<S: ToSubject>(
371 &self,
372 subject: S,
373 payload: Bytes,
374 ) -> Result<(), PublishError> {
375 let subject = self
376 .maybe_validate_publish_subject(subject)
377 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
378
379 let max_payload = self.max_payload.load(Ordering::Relaxed);
380 if payload.len() > max_payload {
381 return Err(PublishError::with_source(
382 PublishErrorKind::MaxPayloadExceeded,
383 format!(
384 "Payload size limit of {} exceeded by message size of {}",
385 max_payload,
386 payload.len(),
387 ),
388 ));
389 }
390
391 self.sender
392 .send(Command::Publish(OutboundMessage {
393 subject,
394 payload,
395 reply: None,
396 headers: None,
397 }))
398 .await?;
399 Ok(())
400 }
401
402 /// Publish a [Message] with headers to a given subject.
403 ///
404 /// # Examples
405 /// ```
406 /// # #[tokio::main]
407 /// # async fn main() -> Result<(), async_nats::Error> {
408 /// use std::str::FromStr;
409 /// let client = async_nats::connect("demo.nats.io").await?;
410 /// let mut headers = async_nats::HeaderMap::new();
411 /// headers.insert(
412 /// "X-Header",
413 /// async_nats::HeaderValue::from_str("Value").unwrap(),
414 /// );
415 /// client
416 /// .publish_with_headers("events.data", headers, "payload".into())
417 /// .await?;
418 /// # Ok(())
419 /// # }
420 /// ```
421 pub async fn publish_with_headers<S: ToSubject>(
422 &self,
423 subject: S,
424 headers: HeaderMap,
425 payload: Bytes,
426 ) -> Result<(), PublishError> {
427 let subject = self
428 .maybe_validate_publish_subject(subject)
429 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
430
431 self.sender
432 .send(Command::Publish(OutboundMessage {
433 subject,
434 payload,
435 reply: None,
436 headers: Some(headers),
437 }))
438 .await?;
439 Ok(())
440 }
441
442 /// Publish a [Message] to a given subject, with specified response subject
443 /// to which the subscriber can respond.
444 /// This method does not await for the response.
445 ///
446 /// # Examples
447 ///
448 /// ```no_run
449 /// # #[tokio::main]
450 /// # async fn main() -> Result<(), async_nats::Error> {
451 /// let client = async_nats::connect("demo.nats.io").await?;
452 /// client
453 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
454 /// .await?;
455 /// # Ok(())
456 /// # }
457 /// ```
458 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
459 &self,
460 subject: S,
461 reply: R,
462 payload: Bytes,
463 ) -> Result<(), PublishError> {
464 let subject = self
465 .maybe_validate_publish_subject(subject)
466 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
467 let reply = self
468 .maybe_validate_publish_subject(reply)
469 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
470
471 self.sender
472 .send(Command::Publish(OutboundMessage {
473 subject,
474 payload,
475 reply: Some(reply),
476 headers: None,
477 }))
478 .await?;
479 Ok(())
480 }
481
482 /// Publish a [Message] to a given subject with headers and specified response subject
483 /// to which the subscriber can respond.
484 /// This method does not await for the response.
485 ///
486 /// # Examples
487 ///
488 /// ```no_run
489 /// # #[tokio::main]
490 /// # async fn main() -> Result<(), async_nats::Error> {
491 /// use std::str::FromStr;
492 /// let client = async_nats::connect("demo.nats.io").await?;
493 /// let mut headers = async_nats::HeaderMap::new();
494 /// client
495 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
496 /// .await?;
497 /// # Ok(())
498 /// # }
499 /// ```
500 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
501 &self,
502 subject: S,
503 reply: R,
504 headers: HeaderMap,
505 payload: Bytes,
506 ) -> Result<(), PublishError> {
507 let subject = self
508 .maybe_validate_publish_subject(subject)
509 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
510 let reply = self
511 .maybe_validate_publish_subject(reply)
512 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
513
514 self.sender
515 .send(Command::Publish(OutboundMessage {
516 subject,
517 payload,
518 reply: Some(reply),
519 headers: Some(headers),
520 }))
521 .await?;
522 Ok(())
523 }
524
525 /// Sends the request with headers.
526 ///
527 /// # Examples
528 /// ```no_run
529 /// # #[tokio::main]
530 /// # async fn main() -> Result<(), async_nats::Error> {
531 /// let client = async_nats::connect("demo.nats.io").await?;
532 /// let response = client.request("service", "data".into()).await?;
533 /// # Ok(())
534 /// # }
535 /// ```
536 pub async fn request<S: ToSubject>(
537 &self,
538 subject: S,
539 payload: Bytes,
540 ) -> Result<Message, RequestError> {
541 let request = Request::new().payload(payload);
542 self.send_request(subject, request).await
543 }
544
545 /// Sends the request with headers.
546 ///
547 /// # Examples
548 /// ```no_run
549 /// # #[tokio::main]
550 /// # async fn main() -> Result<(), async_nats::Error> {
551 /// let client = async_nats::connect("demo.nats.io").await?;
552 /// let mut headers = async_nats::HeaderMap::new();
553 /// headers.insert("Key", "Value");
554 /// let response = client
555 /// .request_with_headers("service", headers, "data".into())
556 /// .await?;
557 /// # Ok(())
558 /// # }
559 /// ```
560 pub async fn request_with_headers<S: ToSubject>(
561 &self,
562 subject: S,
563 headers: HeaderMap,
564 payload: Bytes,
565 ) -> Result<Message, RequestError> {
566 let request = Request::new().headers(headers).payload(payload);
567 self.send_request(subject, request).await
568 }
569
570 /// Sends the request created by the [Request].
571 ///
572 /// Returns `RequestErrorKind::InvalidSubject` if the subject is invalid
573 /// (empty or contains whitespace).
574 ///
575 /// # Examples
576 ///
577 /// ```no_run
578 /// # #[tokio::main]
579 /// # async fn main() -> Result<(), async_nats::Error> {
580 /// let client = async_nats::connect("demo.nats.io").await?;
581 /// let request = async_nats::Request::new().payload("data".into());
582 /// let response = client.send_request("service", request).await?;
583 /// # Ok(())
584 /// # }
585 /// ```
586 pub async fn send_request<S: ToSubject>(
587 &self,
588 subject: S,
589 request: Request,
590 ) -> Result<Message, RequestError> {
591 let subject = self
592 .maybe_validate_publish_subject(subject)
593 .map_err(|e| RequestError::with_source(RequestErrorKind::InvalidSubject, e))?;
594
595 if let Some(inbox) = request.inbox {
596 let timeout = request.timeout.unwrap_or(self.request_timeout);
597 let mut subscriber = self.subscribe(inbox.clone()).await?;
598 let payload: Bytes = request.payload.unwrap_or_default();
599 match request.headers {
600 Some(headers) => {
601 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
602 .await?
603 }
604 None => self.publish_with_reply(subject, inbox, payload).await?,
605 }
606 let request = match timeout {
607 Some(timeout) => {
608 tokio::time::timeout(timeout, subscriber.next())
609 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
610 .await?
611 }
612 None => subscriber.next().await,
613 };
614 match request {
615 Some(message) => {
616 if message.status == Some(StatusCode::NO_RESPONDERS) {
617 return Err(RequestError::with_source(
618 RequestErrorKind::NoResponders,
619 "no responders",
620 ));
621 }
622 Ok(message)
623 }
624 None => Err(RequestError::with_source(
625 RequestErrorKind::Other,
626 "broken pipe",
627 )),
628 }
629 } else {
630 let (sender, receiver) = oneshot::channel();
631
632 let payload = request.payload.unwrap_or_default();
633 let respond = self.new_inbox().into();
634 let headers = request.headers;
635
636 self.sender
637 .send(Command::Request {
638 subject,
639 payload,
640 respond,
641 headers,
642 sender,
643 })
644 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
645 .await?;
646
647 let timeout = request.timeout.unwrap_or(self.request_timeout);
648 let request = match timeout {
649 Some(timeout) => {
650 tokio::time::timeout(timeout, receiver)
651 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
652 .await?
653 }
654 None => receiver.await,
655 };
656
657 match request {
658 Ok(message) => {
659 if message.status == Some(StatusCode::NO_RESPONDERS) {
660 return Err(RequestError::with_source(
661 RequestErrorKind::NoResponders,
662 "no responders",
663 ));
664 }
665 Ok(message)
666 }
667 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
668 }
669 }
670 }
671
672 /// Create a new globally unique inbox which can be used for replies.
673 ///
674 /// # Examples
675 ///
676 /// ```no_run
677 /// # #[tokio::main]
678 /// # async fn main() -> Result<(), async_nats::Error> {
679 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
680 /// let reply = nc.new_inbox();
681 /// let rsub = nc.subscribe(reply).await?;
682 /// # Ok(())
683 /// # }
684 /// ```
685 pub fn new_inbox(&self) -> String {
686 format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
687 }
688
689 /// Subscribes to a subject to receive [messages][Message].
690 ///
691 /// Returns an error if the subject is invalid (empty, contains whitespace,
692 /// or has malformed dot structure). This validation always runs regardless
693 /// of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
694 ///
695 /// # Examples
696 ///
697 /// ```no_run
698 /// # #[tokio::main]
699 /// # async fn main() -> Result<(), async_nats::Error> {
700 /// use futures_util::StreamExt;
701 /// let client = async_nats::connect("demo.nats.io").await?;
702 /// let mut subscription = client.subscribe("events.>").await?;
703 /// while let Some(message) = subscription.next().await {
704 /// println!("received message: {:?}", message);
705 /// }
706 /// # Ok(())
707 /// # }
708 /// ```
709 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
710 let subject = self
711 .validate_subscribe_subject(subject)
712 .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
713
714 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
715 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
716
717 self.sender
718 .send(Command::Subscribe {
719 sid,
720 subject,
721 queue_group: None,
722 sender,
723 })
724 .await?;
725
726 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
727 }
728
729 /// Subscribes to a subject with a queue group to receive [messages][Message].
730 ///
731 /// Returns an error if the subject is invalid (empty, contains whitespace,
732 /// or has malformed dot structure) or if the queue group name is invalid
733 /// (empty or contains whitespace). Subject and queue group validation always
734 /// runs regardless of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
735 ///
736 /// # Examples
737 ///
738 /// ```no_run
739 /// # #[tokio::main]
740 /// # async fn main() -> Result<(), async_nats::Error> {
741 /// use futures_util::StreamExt;
742 /// let client = async_nats::connect("demo.nats.io").await?;
743 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
744 /// while let Some(message) = subscription.next().await {
745 /// println!("received message: {:?}", message);
746 /// }
747 /// # Ok(())
748 /// # }
749 /// ```
750 pub async fn queue_subscribe<S: ToSubject>(
751 &self,
752 subject: S,
753 queue_group: String,
754 ) -> Result<Subscriber, SubscribeError> {
755 let subject = self
756 .validate_subscribe_subject(subject)
757 .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
758
759 if !crate::is_valid_queue_group(&queue_group) {
760 return Err(SubscribeError::new(SubscribeErrorKind::InvalidQueueName));
761 }
762
763 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
764 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
765
766 self.sender
767 .send(Command::Subscribe {
768 sid,
769 subject,
770 queue_group: Some(queue_group),
771 sender,
772 })
773 .await?;
774
775 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
776 }
777
778 /// Flushes the internal buffer ensuring that all messages are sent.
779 ///
780 /// # Examples
781 ///
782 /// ```no_run
783 /// # #[tokio::main]
784 /// # async fn main() -> Result<(), async_nats::Error> {
785 /// let client = async_nats::connect("demo.nats.io").await?;
786 /// client.flush().await?;
787 /// # Ok(())
788 /// # }
789 /// ```
790 pub async fn flush(&self) -> Result<(), FlushError> {
791 let (tx, rx) = tokio::sync::oneshot::channel();
792 self.sender
793 .send(Command::Flush { observer: tx })
794 .await
795 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
796
797 rx.await
798 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
799 Ok(())
800 }
801
802 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
803 /// messages, then closes the connection. Once completed, any streams associated with the
804 /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
805 ///
806 /// # Examples
807 ///
808 /// ```no_run
809 /// # #[tokio::main]
810 /// # async fn main() -> Result<(), async_nats::Error> {
811 /// use futures_util::StreamExt;
812 /// let client = async_nats::connect("demo.nats.io").await?;
813 /// let mut subscription = client.subscribe("events.>").await?;
814 ///
815 /// client.drain().await?;
816 ///
817 /// # // existing subscriptions are closed and further commands will fail
818 /// assert!(subscription.next().await.is_none());
819 /// client
820 /// .subscribe("events.>")
821 /// .await
822 /// .expect_err("Expected further commands to fail");
823 ///
824 /// # Ok(())
825 /// # }
826 /// ```
827 pub async fn drain(&self) -> Result<(), DrainError> {
828 // Drain all subscriptions
829 self.sender.send(Command::Drain { sid: None }).await?;
830
831 // Remaining process is handled on the handler-side
832 Ok(())
833 }
834
835 /// Returns the current state of the connection.
836 ///
837 /// # Examples
838 /// ```no_run
839 /// # #[tokio::main]
840 /// # async fn main() -> Result<(), async_nats::Error> {
841 /// let client = async_nats::connect("demo.nats.io").await?;
842 /// println!("connection state: {}", client.connection_state());
843 /// # Ok(())
844 /// # }
845 /// ```
846 pub fn connection_state(&self) -> State {
847 self.state.borrow().to_owned()
848 }
849
850 /// Forces the client to reconnect.
851 /// Keep in mind that client will reconnect automatically if the connection is lost and this
852 /// method does not have to be used in normal circumstances.
853 /// However, if you want to force the client to reconnect, for example to re-trigger
854 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
855 /// This method does not wait for connection to be re-established.
856 ///
857 /// # Examples
858 /// ```no_run
859 /// # #[tokio::main]
860 /// # async fn main() -> Result<(), async_nats::Error> {
861 /// let client = async_nats::connect("demo.nats.io").await?;
862 /// client.force_reconnect().await?;
863 /// # Ok(())
864 /// # }
865 /// ```
866 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
867 self.sender
868 .send(Command::Reconnect)
869 .await
870 .map_err(Into::into)
871 }
872
873 /// Returns struct representing statistics of the whole lifecycle of the client.
874 /// This includes number of bytes sent/received, number of messages sent/received,
875 /// and number of times the connection was established.
876 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
877 /// across threads.
878 ///
879 /// # Examples
880 /// ```no_run
881 /// # #[tokio::main]
882 /// # async fn main() -> Result<(), async_nats::Error> {
883 /// let client = async_nats::connect("demo.nats.io").await?;
884 /// let statistics = client.statistics();
885 /// println!("client statistics: {:#?}", statistics);
886 /// # Ok(())
887 /// # }
888 /// ```
889 pub fn statistics(&self) -> Arc<Statistics> {
890 self.connection_stats.clone()
891 }
892}
893
894/// Used for building customized requests.
///
/// Every field is optional; a default-constructed request has an empty
/// payload, no headers, the client's default timeout and a generated inbox.
895#[derive(Default)]
896pub struct Request {
 /// Payload to send. When `None`, an empty payload is sent.
897 pub payload: Option<Bytes>,
 /// Headers to send along with the payload, if any.
898 pub headers: Option<HeaderMap>,
 /// Timeout override. The outer `None` means "use the client default";
 /// `Some(None)` disables the timeout; `Some(Some(d))` waits at most `d`.
899 pub timeout: Option<Option<Duration>>,
 /// Custom reply inbox. When `None`, a new inbox is generated for the request.
900 pub inbox: Option<String>,
901}
902
903impl Request {
904 pub fn new() -> Request {
905 Default::default()
906 }
907
908 /// Sets the payload of the request. If not used, empty payload will be sent.
909 ///
910 /// # Examples
911 /// ```no_run
912 /// # #[tokio::main]
913 /// # async fn main() -> Result<(), async_nats::Error> {
914 /// let client = async_nats::connect("demo.nats.io").await?;
915 /// let request = async_nats::Request::new().payload("data".into());
916 /// client.send_request("service", request).await?;
917 /// # Ok(())
918 /// # }
919 /// ```
920 pub fn payload(mut self, payload: Bytes) -> Request {
921 self.payload = Some(payload);
922 self
923 }
924
925 /// Sets the headers of the requests.
926 ///
927 /// # Examples
928 /// ```no_run
929 /// # #[tokio::main]
930 /// # async fn main() -> Result<(), async_nats::Error> {
931 /// use std::str::FromStr;
932 /// let client = async_nats::connect("demo.nats.io").await?;
933 /// let mut headers = async_nats::HeaderMap::new();
934 /// headers.insert(
935 /// "X-Example",
936 /// async_nats::HeaderValue::from_str("Value").unwrap(),
937 /// );
938 /// let request = async_nats::Request::new()
939 /// .headers(headers)
940 /// .payload("data".into());
941 /// client.send_request("service", request).await?;
942 /// # Ok(())
943 /// # }
944 /// ```
945 pub fn headers(mut self, headers: HeaderMap) -> Request {
946 self.headers = Some(headers);
947 self
948 }
949
950 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
951 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
952 /// To use default timeout, simply do not call this function.
953 ///
954 /// # Examples
955 /// ```no_run
956 /// # #[tokio::main]
957 /// # async fn main() -> Result<(), async_nats::Error> {
958 /// let client = async_nats::connect("demo.nats.io").await?;
959 /// let request = async_nats::Request::new()
960 /// .timeout(Some(std::time::Duration::from_secs(15)))
961 /// .payload("data".into());
962 /// client.send_request("service", request).await?;
963 /// # Ok(())
964 /// # }
965 /// ```
966 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
967 self.timeout = Some(timeout);
968 self
969 }
970
971 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
972 ///
973 /// # Examples
974 /// ```no_run
975 /// # #[tokio::main]
976 /// # async fn main() -> Result<(), async_nats::Error> {
977 /// use std::str::FromStr;
978 /// let client = async_nats::connect("demo.nats.io").await?;
979 /// let request = async_nats::Request::new()
980 /// .inbox("custom_inbox".into())
981 /// .payload("data".into());
982 /// client.send_request("service", request).await?;
983 /// # Ok(())
984 /// # }
985 /// ```
986 pub fn inbox(mut self, inbox: String) -> Request {
987 self.inbox = Some(inbox);
988 self
989 }
990}
991
992#[derive(Error, Debug)]
993#[error("failed to send reconnect: {0}")]
994pub struct ReconnectError(#[source] crate::Error);
995
996impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
997 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
998 ReconnectError(Box::new(err))
999 }
1000}
1001
1002/// An error returned from the [`Client::subscribe`] or [`Client::queue_subscribe`] functions.
1003pub type SubscribeError = Error<SubscribeErrorKind>;
1004
1005impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
1006 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1007 SubscribeError::with_source(SubscribeErrorKind::Other, err)
1008 }
1009}
1010
/// Classifies the reason a subscribe operation failed.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SubscribeErrorKind {
    /// The subject is invalid (empty, contains whitespace, or has malformed dot structure).
    InvalidSubject,
    /// The queue group name is invalid (empty or contains whitespace).
    InvalidQueueName,
    /// Other errors, client/io related.
    Other,
}

impl Display for SubscribeErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            SubscribeErrorKind::InvalidSubject => "invalid subject",
            SubscribeErrorKind::InvalidQueueName => "invalid queue name",
            SubscribeErrorKind::Other => "subscribe failed",
        };
        f.write_str(description)
    }
}
1030
1031#[derive(Error, Debug)]
1032#[error("failed to send drain: {0}")]
1033pub struct DrainError(#[source] crate::Error);
1034
1035impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
1036 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1037 DrainError(Box::new(err))
1038 }
1039}
1040
/// Classifies the reason a request failed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// There are services listening on requested subject, but they didn't respond
    /// in time.
    TimedOut,
    /// No one is listening on request subject.
    NoResponders,
    /// The subject is invalid (empty or contains whitespace).
    InvalidSubject,
    /// Other errors, client/io related.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            RequestErrorKind::TimedOut => "request timed out",
            RequestErrorKind::NoResponders => "no responders",
            RequestErrorKind::InvalidSubject => "invalid subject",
            RequestErrorKind::Other => "request failed",
        };
        f.write_str(description)
    }
}
1064
1065/// Error returned when a core NATS request fails.
1066/// To be enumerate over the variants, call [RequestError::kind].
1067pub type RequestError = Error<RequestErrorKind>;
1068
1069impl From<PublishError> for RequestError {
1070 fn from(e: PublishError) -> Self {
1071 RequestError::with_source(RequestErrorKind::Other, e)
1072 }
1073}
1074
1075impl From<SubscribeError> for RequestError {
1076 fn from(e: SubscribeError) -> Self {
1077 RequestError::with_source(RequestErrorKind::Other, e)
1078 }
1079}
1080
/// Classifies the reason a flush failed.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// Sending the flush failed client side.
    SendError,
    /// Flush failed.
    /// This can happen mostly in case of connection issues
    /// that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            FlushErrorKind::SendError => "failed to send flush request",
            FlushErrorKind::FlushError => "flush failed",
        };
        f.write_str(description)
    }
}
1099
/// Error returned by [`Client::flush`]; its kind is a [`FlushErrorKind`].
1100pub type FlushError = Error<FlushErrorKind>;
1101
1102/// Represents statistics for the instance of the client throughout its lifecycle.
1103#[derive(Default, Debug)]
1104pub struct Statistics {
1105 /// Number of bytes received. This does not include the protocol overhead.
1106 pub in_bytes: AtomicU64,
1107 /// Number of bytes sent. This does not include the protocol overhead.
1108 pub out_bytes: AtomicU64,
1109 /// Number of messages received.
1110 pub in_messages: AtomicU64,
1111 /// Number of messages sent.
1112 pub out_messages: AtomicU64,
1113 /// Number of times connection was established.
1114 /// Initial connect will be counted as well, then all successful reconnects.
1115 pub connects: AtomicU64,
1116}
1116}