async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38
// Extracts the numeric components from the start of a server version string:
// an optional leading `v`, a required major number, and then optional minor and
// patch numbers. Capture groups 2 (minor) and 3 (patch) may therefore be absent.
static VERSION_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
///
/// The category of the failure is described by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48 PublishError::with_source(PublishErrorKind::Send, err)
49 }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54 PublishError::with_source(PublishErrorKind::Send, err)
55 }
56}
57
/// The category of failure carried by a publish error.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size known to the client.
    MaxPayloadExceeded,
    /// The subject was rejected by client-side validation.
    InvalidSubject,
    /// Sending the command over the client's internal channel failed.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::InvalidSubject => "invalid subject",
            Self::Send => "failed to send message",
        };
        f.write_str(description)
    }
}
74
/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Client {
    // Watch channel holding the last observed server `INFO`, if any.
    info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
    // Watch channel holding the current connection state.
    pub(crate) state: tokio::sync::watch::Receiver<State>,
    // Channel on which every client operation is sent as a `Command`.
    pub(crate) sender: mpsc::Sender<Command>,
    // Pollable wrapper around a clone of `sender`, used by the `Sink` implementation.
    poll_sender: PollSender<Command>,
    // Source of subscription ids; behind an `Arc`, so clones of the client share it.
    next_subscription_id: Arc<AtomicU64>,
    // Capacity of the per-subscription message channel created on subscribe.
    subscription_capacity: usize,
    // Prefix used when generating inbox subjects in `new_inbox`.
    inbox_prefix: Arc<str>,
    // Default request timeout; `None` means requests wait without a timeout by default.
    request_timeout: Option<Duration>,
    // Maximum payload size used for the client-side check in `publish`.
    max_payload: Arc<AtomicUsize>,
    // Statistics handle returned by `statistics`.
    connection_stats: Arc<Statistics>,
    // When set, the publish-subject format check is skipped (empty subjects are still rejected).
    skip_subject_validation: bool,
}
92
93pub mod traits {
94 use std::{future::Future, time::Duration};
95
96 use bytes::Bytes;
97
98 use crate::{message, subject::ToSubject, Message};
99
100 use super::{PublishError, Request, RequestError, SubscribeError};
101
102 pub trait Publisher {
103 fn publish_with_reply<S, R>(
104 &self,
105 subject: S,
106 reply: R,
107 payload: Bytes,
108 ) -> impl Future<Output = Result<(), PublishError>>
109 where
110 S: ToSubject,
111 R: ToSubject;
112
113 fn publish_message(
114 &self,
115 msg: message::OutboundMessage,
116 ) -> impl Future<Output = Result<(), PublishError>>;
117 }
118 pub trait Subscriber {
119 fn subscribe<S>(
120 &self,
121 subject: S,
122 ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
123 where
124 S: ToSubject;
125 }
126 pub trait Requester {
127 fn send_request<S>(
128 &self,
129 subject: S,
130 request: Request,
131 ) -> impl Future<Output = Result<Message, RequestError>>
132 where
133 S: ToSubject;
134 }
135 pub trait TimeoutProvider {
136 fn timeout(&self) -> Option<Duration>;
137 }
138}
139
140impl traits::Requester for Client {
141 fn send_request<S: ToSubject>(
142 &self,
143 subject: S,
144 request: Request,
145 ) -> impl Future<Output = Result<Message, RequestError>> {
146 self.send_request(subject, request)
147 }
148}
149
150impl traits::TimeoutProvider for Client {
151 fn timeout(&self) -> Option<Duration> {
152 self.timeout()
153 }
154}
155
156impl traits::Publisher for Client {
157 fn publish_with_reply<S, R>(
158 &self,
159 subject: S,
160 reply: R,
161 payload: Bytes,
162 ) -> impl Future<Output = Result<(), PublishError>>
163 where
164 S: ToSubject,
165 R: ToSubject,
166 {
167 self.publish_with_reply(subject, reply, payload)
168 }
169
170 async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
171 if msg.subject.is_empty()
172 || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&msg.subject))
173 {
174 return Err(PublishError::with_source(
175 PublishErrorKind::InvalidSubject,
176 crate::subject::SubjectError::InvalidFormat,
177 ));
178 }
179 self.sender
180 .send(Command::Publish(msg))
181 .await
182 .map_err(Into::into)
183 }
184}
185
186impl traits::Subscriber for Client {
187 fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
188 where
189 S: ToSubject,
190 {
191 self.subscribe(subject)
192 }
193}
194
195impl Sink<OutboundMessage> for Client {
196 type Error = PublishError;
197
198 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
200 }
201
202 fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
203 self.poll_sender
204 .start_send_unpin(Command::Publish(msg))
205 .map_err(Into::into)
206 }
207
208 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
210 }
211
212 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
213 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
214 }
215}
216
217impl Client {
218 #[allow(clippy::too_many_arguments)]
219 pub(crate) fn new(
220 info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
221 state: tokio::sync::watch::Receiver<State>,
222 sender: mpsc::Sender<Command>,
223 capacity: usize,
224 inbox_prefix: String,
225 request_timeout: Option<Duration>,
226 max_payload: Arc<AtomicUsize>,
227 statistics: Arc<Statistics>,
228 skip_subject_validation: bool,
229 ) -> Client {
230 let poll_sender = PollSender::new(sender.clone());
231 Client {
232 info,
233 state,
234 sender,
235 poll_sender,
236 next_subscription_id: Arc::new(AtomicU64::new(1)),
237 subscription_capacity: capacity,
238 inbox_prefix: inbox_prefix.into(),
239 request_timeout,
240 max_payload,
241 connection_stats: statistics,
242 skip_subject_validation,
243 }
244 }
245
246 /// Validates a subject for publishing (protocol-framing safety only).
247 /// Checks for empty and whitespace. Does not check dot structure.
248 pub(crate) fn maybe_validate_publish_subject<S: ToSubject>(
249 &self,
250 subject: S,
251 ) -> Result<crate::Subject, crate::subject::SubjectError> {
252 let subject = subject.to_subject();
253 if subject.is_empty()
254 || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&subject))
255 {
256 return Err(crate::subject::SubjectError::InvalidFormat);
257 }
258 Ok(subject)
259 }
260
261 /// Validates a subject for subscribing (protocol-framing + dot structure).
262 /// Always runs regardless of `skip_subject_validation` (matches Go/Java behavior).
263 pub(crate) fn validate_subscribe_subject<S: ToSubject>(
264 &self,
265 subject: S,
266 ) -> Result<crate::Subject, crate::subject::SubjectError> {
267 let subject = subject.to_subject();
268 if !subject.is_valid() {
269 return Err(crate::subject::SubjectError::InvalidFormat);
270 }
271 Ok(subject)
272 }
273
274 /// Returns the default timeout for requests set when creating the client.
275 ///
276 /// # Examples
277 /// ```no_run
278 /// # #[tokio::main]
279 /// # async fn main() -> Result<(), async_nats::Error> {
280 /// let client = async_nats::connect("demo.nats.io").await?;
281 /// println!("default request timeout: {:?}", client.timeout());
282 /// # Ok(())
283 /// # }
284 /// ```
 pub fn timeout(&self) -> Option<Duration> {
     // Copy of the value stored at construction; `None` means requests have no default timeout.
     self.request_timeout
 }
288
289 /// Returns the last received info from the server if one has been observed.
290 ///
291 /// This is useful when [`ConnectOptions::retry_on_initial_connect`][crate::ConnectOptions::retry_on_initial_connect]
292 /// is enabled and the client is returned before the first server `INFO`
293 /// message arrives.
294 ///
295 /// # Examples
296 ///
297 /// ```no_run
298 /// # #[tokio::main]
299 /// # async fn main () -> Result<(), async_nats::Error> {
300 /// let client = async_nats::ConnectOptions::new()
301 /// .retry_on_initial_connect()
302 /// .connect("demo.nats.io")
303 /// .await?;
304 /// println!("info available: {}", client.try_server_info().is_some());
305 /// # Ok(())
306 /// # }
307 /// ```
308 pub fn try_server_info(&self) -> Option<ServerInfo> {
309 // We ignore notifying the watcher, as that requires mutable client reference.
310 self.info.borrow().clone()
311 }
312
313 /// Returns last received info from the server.
314 ///
315 /// # Examples
316 ///
317 /// ```no_run
318 /// # #[tokio::main]
319 /// # async fn main () -> Result<(), async_nats::Error> {
320 /// let client = async_nats::connect("demo.nats.io").await?;
321 /// println!("info: {:?}", client.server_info());
322 /// # Ok(())
323 /// # }
324 /// ```
325 pub fn server_info(&self) -> ServerInfo {
326 self.try_server_info().unwrap_or_default()
327 }
328
329 /// Returns the maximum payload size currently used by the client.
330 ///
331 /// Before the first server `INFO` message arrives, this returns the default
332 /// server payload limit used for client-side publish validation.
333 ///
334 /// # Examples
335 ///
336 /// ```no_run
337 /// # #[tokio::main]
338 /// # async fn main () -> Result<(), async_nats::Error> {
339 /// let client = async_nats::connect("demo.nats.io").await?;
340 /// println!("max payload: {:?}", client.max_payload());
341 /// # Ok(())
342 /// # }
343 /// ```
 pub fn max_payload(&self) -> usize {
     // Reads the current value of the shared atomic; no other memory is synchronized by this load.
     self.max_payload.load(Ordering::Relaxed)
 }
347
348 /// Returns true if the server version is compatible with the version components.
349 ///
350 /// This has to be used with caution, as it is not guaranteed that the server
351 /// that client is connected to is the same version that the one that is
352 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
353 ///
354 /// # Examples
355 ///
356 /// ```no_run
357 /// # #[tokio::main]
358 /// # async fn main() -> Result<(), async_nats::Error> {
359 /// let client = async_nats::connect("demo.nats.io").await?;
360 /// assert!(client.is_server_compatible(2, 8, 4));
361 /// # Ok(())
362 /// # }
363 /// ```
364 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
365 let info = self.server_info();
366
367 let server_version_captures = match VERSION_RE.captures(&info.version) {
368 Some(captures) => captures,
369 None => return false,
370 };
371
372 let server_major = server_version_captures
373 .get(1)
374 .map(|m| m.as_str().parse::<i64>().unwrap())
375 .unwrap();
376
377 let server_minor = server_version_captures
378 .get(2)
379 .map(|m| m.as_str().parse::<i64>().unwrap())
380 .unwrap();
381
382 let server_patch = server_version_captures
383 .get(3)
384 .map(|m| m.as_str().parse::<i64>().unwrap())
385 .unwrap();
386
387 if server_major < major
388 || (server_major == major && server_minor < minor)
389 || (server_major == major && server_minor == minor && server_patch < patch)
390 {
391 return false;
392 }
393 true
394 }
395
396 /// Publish a [Message] to a given subject.
397 ///
398 /// Returns `PublishErrorKind::InvalidSubject` if the subject is invalid
399 /// (empty or contains whitespace). This check can be disabled with
400 /// [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation] (empty subjects are
401 /// always rejected).
402 ///
403 /// # Examples
404 /// ```no_run
405 /// # #[tokio::main]
406 /// # async fn main() -> Result<(), async_nats::Error> {
407 /// let client = async_nats::connect("demo.nats.io").await?;
408 /// client.publish("events.data", "payload".into()).await?;
409 /// # Ok(())
410 /// # }
411 /// ```
412 pub async fn publish<S: ToSubject>(
413 &self,
414 subject: S,
415 payload: Bytes,
416 ) -> Result<(), PublishError> {
417 let subject = self
418 .maybe_validate_publish_subject(subject)
419 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
420
421 let max_payload = self.max_payload.load(Ordering::Relaxed);
422 if payload.len() > max_payload {
423 return Err(PublishError::with_source(
424 PublishErrorKind::MaxPayloadExceeded,
425 format!(
426 "Payload size limit of {} exceeded by message size of {}",
427 max_payload,
428 payload.len(),
429 ),
430 ));
431 }
432
433 self.sender
434 .send(Command::Publish(OutboundMessage {
435 subject,
436 payload,
437 reply: None,
438 headers: None,
439 }))
440 .await?;
441 Ok(())
442 }
443
444 /// Publish a [Message] with headers to a given subject.
445 ///
446 /// # Examples
447 /// ```
448 /// # #[tokio::main]
449 /// # async fn main() -> Result<(), async_nats::Error> {
450 /// use std::str::FromStr;
451 /// let client = async_nats::connect("demo.nats.io").await?;
452 /// let mut headers = async_nats::HeaderMap::new();
453 /// headers.insert(
454 /// "X-Header",
455 /// async_nats::HeaderValue::from_str("Value").unwrap(),
456 /// );
457 /// client
458 /// .publish_with_headers("events.data", headers, "payload".into())
459 /// .await?;
460 /// # Ok(())
461 /// # }
462 /// ```
463 pub async fn publish_with_headers<S: ToSubject>(
464 &self,
465 subject: S,
466 headers: HeaderMap,
467 payload: Bytes,
468 ) -> Result<(), PublishError> {
469 let subject = self
470 .maybe_validate_publish_subject(subject)
471 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
472
473 self.sender
474 .send(Command::Publish(OutboundMessage {
475 subject,
476 payload,
477 reply: None,
478 headers: Some(headers),
479 }))
480 .await?;
481 Ok(())
482 }
483
484 /// Publish a [Message] to a given subject, with specified response subject
485 /// to which the subscriber can respond.
486 /// This method does not await for the response.
487 ///
488 /// # Examples
489 ///
490 /// ```no_run
491 /// # #[tokio::main]
492 /// # async fn main() -> Result<(), async_nats::Error> {
493 /// let client = async_nats::connect("demo.nats.io").await?;
494 /// client
495 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
496 /// .await?;
497 /// # Ok(())
498 /// # }
499 /// ```
500 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
501 &self,
502 subject: S,
503 reply: R,
504 payload: Bytes,
505 ) -> Result<(), PublishError> {
506 let subject = self
507 .maybe_validate_publish_subject(subject)
508 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
509 let reply = self
510 .maybe_validate_publish_subject(reply)
511 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
512
513 self.sender
514 .send(Command::Publish(OutboundMessage {
515 subject,
516 payload,
517 reply: Some(reply),
518 headers: None,
519 }))
520 .await?;
521 Ok(())
522 }
523
524 /// Publish a [Message] to a given subject with headers and specified response subject
525 /// to which the subscriber can respond.
526 /// This method does not await for the response.
527 ///
528 /// # Examples
529 ///
530 /// ```no_run
531 /// # #[tokio::main]
532 /// # async fn main() -> Result<(), async_nats::Error> {
533 /// use std::str::FromStr;
534 /// let client = async_nats::connect("demo.nats.io").await?;
535 /// let mut headers = async_nats::HeaderMap::new();
536 /// client
537 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
538 /// .await?;
539 /// # Ok(())
540 /// # }
541 /// ```
542 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
543 &self,
544 subject: S,
545 reply: R,
546 headers: HeaderMap,
547 payload: Bytes,
548 ) -> Result<(), PublishError> {
549 let subject = self
550 .maybe_validate_publish_subject(subject)
551 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
552 let reply = self
553 .maybe_validate_publish_subject(reply)
554 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
555
556 self.sender
557 .send(Command::Publish(OutboundMessage {
558 subject,
559 payload,
560 reply: Some(reply),
561 headers: Some(headers),
562 }))
563 .await?;
564 Ok(())
565 }
566
567 /// Sends the request with headers.
568 ///
569 /// # Examples
570 /// ```no_run
571 /// # #[tokio::main]
572 /// # async fn main() -> Result<(), async_nats::Error> {
573 /// let client = async_nats::connect("demo.nats.io").await?;
574 /// let response = client.request("service", "data".into()).await?;
575 /// # Ok(())
576 /// # }
577 /// ```
578 pub async fn request<S: ToSubject>(
579 &self,
580 subject: S,
581 payload: Bytes,
582 ) -> Result<Message, RequestError> {
583 let request = Request::new().payload(payload);
584 self.send_request(subject, request).await
585 }
586
587 /// Sends the request with headers.
588 ///
589 /// # Examples
590 /// ```no_run
591 /// # #[tokio::main]
592 /// # async fn main() -> Result<(), async_nats::Error> {
593 /// let client = async_nats::connect("demo.nats.io").await?;
594 /// let mut headers = async_nats::HeaderMap::new();
595 /// headers.insert("Key", "Value");
596 /// let response = client
597 /// .request_with_headers("service", headers, "data".into())
598 /// .await?;
599 /// # Ok(())
600 /// # }
601 /// ```
602 pub async fn request_with_headers<S: ToSubject>(
603 &self,
604 subject: S,
605 headers: HeaderMap,
606 payload: Bytes,
607 ) -> Result<Message, RequestError> {
608 let request = Request::new().headers(headers).payload(payload);
609 self.send_request(subject, request).await
610 }
611
612 /// Sends the request created by the [Request].
613 ///
614 /// Returns `RequestErrorKind::InvalidSubject` if the subject is invalid
615 /// (empty or contains whitespace).
616 ///
617 /// # Examples
618 ///
619 /// ```no_run
620 /// # #[tokio::main]
621 /// # async fn main() -> Result<(), async_nats::Error> {
622 /// let client = async_nats::connect("demo.nats.io").await?;
623 /// let request = async_nats::Request::new().payload("data".into());
624 /// let response = client.send_request("service", request).await?;
625 /// # Ok(())
626 /// # }
627 /// ```
628 pub async fn send_request<S: ToSubject>(
629 &self,
630 subject: S,
631 request: Request,
632 ) -> Result<Message, RequestError> {
633 let subject = self
634 .maybe_validate_publish_subject(subject)
635 .map_err(|e| RequestError::with_source(RequestErrorKind::InvalidSubject, e))?;
636
637 if let Some(inbox) = request.inbox {
638 let timeout = request.timeout.unwrap_or(self.request_timeout);
639 let mut subscriber = self.subscribe(inbox.clone()).await?;
640 let payload: Bytes = request.payload.unwrap_or_default();
641 match request.headers {
642 Some(headers) => {
643 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
644 .await?
645 }
646 None => self.publish_with_reply(subject, inbox, payload).await?,
647 }
648 let request = match timeout {
649 Some(timeout) => {
650 tokio::time::timeout(timeout, subscriber.next())
651 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
652 .await?
653 }
654 None => subscriber.next().await,
655 };
656 match request {
657 Some(message) => {
658 if message.status == Some(StatusCode::NO_RESPONDERS) {
659 return Err(RequestError::with_source(
660 RequestErrorKind::NoResponders,
661 "no responders",
662 ));
663 }
664 Ok(message)
665 }
666 None => Err(RequestError::with_source(
667 RequestErrorKind::Other,
668 "broken pipe",
669 )),
670 }
671 } else {
672 let (sender, receiver) = oneshot::channel();
673
674 let payload = request.payload.unwrap_or_default();
675 let respond = self.new_inbox().into();
676 let headers = request.headers;
677
678 self.sender
679 .send(Command::Request {
680 subject,
681 payload,
682 respond,
683 headers,
684 sender,
685 })
686 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
687 .await?;
688
689 let timeout = request.timeout.unwrap_or(self.request_timeout);
690 let request = match timeout {
691 Some(timeout) => {
692 tokio::time::timeout(timeout, receiver)
693 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
694 .await?
695 }
696 None => receiver.await,
697 };
698
699 match request {
700 Ok(message) => {
701 if message.status == Some(StatusCode::NO_RESPONDERS) {
702 return Err(RequestError::with_source(
703 RequestErrorKind::NoResponders,
704 "no responders",
705 ));
706 }
707 Ok(message)
708 }
709 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
710 }
711 }
712 }
713
714 /// Create a new globally unique inbox which can be used for replies.
715 ///
716 /// # Examples
717 ///
718 /// ```no_run
719 /// # #[tokio::main]
720 /// # async fn main() -> Result<(), async_nats::Error> {
721 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
722 /// let reply = nc.new_inbox();
723 /// let rsub = nc.subscribe(reply).await?;
724 /// # Ok(())
725 /// # }
726 /// ```
727 pub fn new_inbox(&self) -> String {
728 format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
729 }
730
731 /// Subscribes to a subject to receive [messages][Message].
732 ///
733 /// Returns an error if the subject is invalid (empty, contains whitespace,
734 /// or has malformed dot structure). This validation always runs regardless
735 /// of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
736 ///
737 /// # Examples
738 ///
739 /// ```no_run
740 /// # #[tokio::main]
741 /// # async fn main() -> Result<(), async_nats::Error> {
742 /// use futures_util::StreamExt;
743 /// let client = async_nats::connect("demo.nats.io").await?;
744 /// let mut subscription = client.subscribe("events.>").await?;
745 /// while let Some(message) = subscription.next().await {
746 /// println!("received message: {:?}", message);
747 /// }
748 /// # Ok(())
749 /// # }
750 /// ```
751 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
752 let subject = self
753 .validate_subscribe_subject(subject)
754 .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
755
756 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
757 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
758
759 self.sender
760 .send(Command::Subscribe {
761 sid,
762 subject,
763 queue_group: None,
764 sender,
765 })
766 .await?;
767
768 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
769 }
770
771 /// Subscribes to a subject with a queue group to receive [messages][Message].
772 ///
773 /// Returns an error if the subject is invalid (empty, contains whitespace,
774 /// or has malformed dot structure) or if the queue group name is invalid
775 /// (empty or contains whitespace). Subject and queue group validation always
776 /// runs regardless of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
777 ///
778 /// # Examples
779 ///
780 /// ```no_run
781 /// # #[tokio::main]
782 /// # async fn main() -> Result<(), async_nats::Error> {
783 /// use futures_util::StreamExt;
784 /// let client = async_nats::connect("demo.nats.io").await?;
785 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
786 /// while let Some(message) = subscription.next().await {
787 /// println!("received message: {:?}", message);
788 /// }
789 /// # Ok(())
790 /// # }
791 /// ```
792 pub async fn queue_subscribe<S: ToSubject>(
793 &self,
794 subject: S,
795 queue_group: String,
796 ) -> Result<Subscriber, SubscribeError> {
797 let subject = self
798 .validate_subscribe_subject(subject)
799 .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
800
801 if !crate::is_valid_queue_group(&queue_group) {
802 return Err(SubscribeError::new(SubscribeErrorKind::InvalidQueueName));
803 }
804
805 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
806 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
807
808 self.sender
809 .send(Command::Subscribe {
810 sid,
811 subject,
812 queue_group: Some(queue_group),
813 sender,
814 })
815 .await?;
816
817 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
818 }
819
820 /// Flushes the internal buffer ensuring that all messages are sent.
821 ///
822 /// # Examples
823 ///
824 /// ```no_run
825 /// # #[tokio::main]
826 /// # async fn main() -> Result<(), async_nats::Error> {
827 /// let client = async_nats::connect("demo.nats.io").await?;
828 /// client.flush().await?;
829 /// # Ok(())
830 /// # }
831 /// ```
832 pub async fn flush(&self) -> Result<(), FlushError> {
833 let (tx, rx) = tokio::sync::oneshot::channel();
834 self.sender
835 .send(Command::Flush { observer: tx })
836 .await
837 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
838
839 rx.await
840 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
841 Ok(())
842 }
843
844 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
845 /// messages, then closes the connection. Once completed, any streams associated with the
846 /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
847 ///
848 /// # Examples
849 ///
850 /// ```no_run
851 /// # #[tokio::main]
852 /// # async fn main() -> Result<(), async_nats::Error> {
853 /// use futures_util::StreamExt;
854 /// let client = async_nats::connect("demo.nats.io").await?;
855 /// let mut subscription = client.subscribe("events.>").await?;
856 ///
857 /// client.drain().await?;
858 ///
859 /// # // existing subscriptions are closed and further commands will fail
860 /// assert!(subscription.next().await.is_none());
861 /// client
862 /// .subscribe("events.>")
863 /// .await
864 /// .expect_err("Expected further commands to fail");
865 ///
866 /// # Ok(())
867 /// # }
868 /// ```
869 pub async fn drain(&self) -> Result<(), DrainError> {
870 // Drain all subscriptions
871 self.sender.send(Command::Drain { sid: None }).await?;
872
873 // Remaining process is handled on the handler-side
874 Ok(())
875 }
876
877 /// Returns the current state of the connection.
878 ///
879 /// # Examples
880 /// ```no_run
881 /// # #[tokio::main]
882 /// # async fn main() -> Result<(), async_nats::Error> {
883 /// let client = async_nats::connect("demo.nats.io").await?;
884 /// println!("connection state: {}", client.connection_state());
885 /// # Ok(())
886 /// # }
887 /// ```
888 pub fn connection_state(&self) -> State {
889 self.state.borrow().to_owned()
890 }
891
892 /// Forces the client to reconnect.
893 /// Keep in mind that client will reconnect automatically if the connection is lost and this
894 /// method does not have to be used in normal circumstances.
895 /// However, if you want to force the client to reconnect, for example to re-trigger
896 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
897 /// This method does not wait for connection to be re-established.
898 ///
899 /// # Examples
900 /// ```no_run
901 /// # #[tokio::main]
902 /// # async fn main() -> Result<(), async_nats::Error> {
903 /// let client = async_nats::connect("demo.nats.io").await?;
904 /// client.force_reconnect().await?;
905 /// # Ok(())
906 /// # }
907 /// ```
908 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
909 self.sender
910 .send(Command::Reconnect)
911 .await
912 .map_err(Into::into)
913 }
914
915 /// Replaces the server pool used for reconnection attempts.
916 ///
917 /// The new pool takes effect on the next reconnect attempt; it does not
918 /// trigger an immediate reconnect. To force an immediate reconnect with
919 /// the new pool, call [`Client::force_reconnect`] after this method.
920 ///
921 /// Per-server state (failed attempt count, connection history) is preserved
922 /// for servers that appear in both the old and new pools.
923 ///
924 /// This also resets the global reconnection attempt counter, so any
925 /// progress toward [`ConnectOptions::max_reconnects`](crate::ConnectOptions::max_reconnects)
926 /// is cleared.
927 ///
928 /// # Examples
929 /// ```no_run
930 /// # #[tokio::main]
931 /// # async fn main() -> Result<(), async_nats::Error> {
932 /// let client = async_nats::connect("demo.nats.io").await?;
933 /// client
934 /// .set_server_pool(["nats://server1:4222", "nats://server2:4222"].as_slice())
935 /// .await?;
936 /// // Optionally force reconnect to apply immediately:
937 /// client.force_reconnect().await?;
938 /// # Ok(())
939 /// # }
940 /// ```
941 pub async fn set_server_pool<A: crate::ToServerAddrs>(
942 &self,
943 addrs: A,
944 ) -> Result<(), SetServerPoolError> {
945 let servers: Vec<crate::ServerAddr> = addrs
946 .to_server_addrs()
947 .map_err(|err| {
948 SetServerPoolError::with_source(SetServerPoolErrorKind::InvalidAddress, err)
949 })?
950 .collect();
951
952 if servers.is_empty() {
953 return Err(SetServerPoolError::new(SetServerPoolErrorKind::EmptyPool));
954 }
955
956 let (tx, rx) = oneshot::channel();
957 self.sender
958 .send(Command::SetServerPool {
959 servers,
960 result: tx,
961 })
962 .await
963 .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?;
964
965 rx.await
966 .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?
967 .map_err(|err| {
968 SetServerPoolError::with_source(SetServerPoolErrorKind::MixedSchemes, err)
969 })
970 }
971
972 /// Returns a snapshot of the current server pool.
973 ///
974 /// The returned list includes both explicitly configured and discovered
975 /// servers, along with per-server metadata such as reconnect count and
976 /// connection history.
977 ///
978 /// # Examples
979 /// ```no_run
980 /// # #[tokio::main]
981 /// # async fn main() -> Result<(), async_nats::Error> {
982 /// let client = async_nats::connect("demo.nats.io").await?;
983 /// let pool = client.server_pool().await?;
984 /// for server in &pool {
985 /// println!(
986 /// "{:?}: {} failed attempts",
987 /// server.addr, server.failed_attempts
988 /// );
989 /// }
990 /// # Ok(())
991 /// # }
992 /// ```
993 pub async fn server_pool(&self) -> Result<Vec<crate::Server>, ServerPoolError> {
994 let (tx, rx) = oneshot::channel();
995 self.sender
996 .send(Command::ServerPool { result: tx })
997 .await
998 .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))?;
999
1000 rx.await
1001 .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))
1002 }
1003
1004 /// Returns struct representing statistics of the whole lifecycle of the client.
1005 /// This includes number of bytes sent/received, number of messages sent/received,
1006 /// and number of times the connection was established.
1007 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
1008 /// across threads.
1009 ///
1010 /// # Examples
1011 /// ```no_run
1012 /// # #[tokio::main]
1013 /// # async fn main() -> Result<(), async_nats::Error> {
1014 /// let client = async_nats::connect("demo.nats.io").await?;
1015 /// let statistics = client.statistics();
1016 /// println!("client statistics: {:#?}", statistics);
1017 /// # Ok(())
1018 /// # }
1019 /// ```
1020 pub fn statistics(&self) -> Arc<Statistics> {
1021 self.connection_stats.clone()
1022 }
1023}
1024
/// Used for building customized requests.
///
/// Every field is optional. The derived [`Default`] leaves all of them unset,
/// and the builder-style methods on this type fill them in one at a time.
#[derive(Default)]
pub struct Request {
    /// Payload of the request. If left unset, an empty payload will be sent.
    pub payload: Option<Bytes>,
    /// Headers sent along with the request, if any.
    pub headers: Option<HeaderMap>,
    /// Timeout override for this request.
    ///
    /// `None` keeps the default [Client] timeout, `Some(None)` disables the
    /// timeout entirely, and `Some(Some(duration))` uses a custom timeout.
    pub timeout: Option<Option<Duration>>,
    /// Custom inbox for this request, overriding the [Client] inbox when set.
    pub inbox: Option<String>,
}
1033
1034impl Request {
1035 pub fn new() -> Request {
1036 Default::default()
1037 }
1038
1039 /// Sets the payload of the request. If not used, empty payload will be sent.
1040 ///
1041 /// # Examples
1042 /// ```no_run
1043 /// # #[tokio::main]
1044 /// # async fn main() -> Result<(), async_nats::Error> {
1045 /// let client = async_nats::connect("demo.nats.io").await?;
1046 /// let request = async_nats::Request::new().payload("data".into());
1047 /// client.send_request("service", request).await?;
1048 /// # Ok(())
1049 /// # }
1050 /// ```
1051 pub fn payload(mut self, payload: Bytes) -> Request {
1052 self.payload = Some(payload);
1053 self
1054 }
1055
1056 /// Sets the headers of the requests.
1057 ///
1058 /// # Examples
1059 /// ```no_run
1060 /// # #[tokio::main]
1061 /// # async fn main() -> Result<(), async_nats::Error> {
1062 /// use std::str::FromStr;
1063 /// let client = async_nats::connect("demo.nats.io").await?;
1064 /// let mut headers = async_nats::HeaderMap::new();
1065 /// headers.insert(
1066 /// "X-Example",
1067 /// async_nats::HeaderValue::from_str("Value").unwrap(),
1068 /// );
1069 /// let request = async_nats::Request::new()
1070 /// .headers(headers)
1071 /// .payload("data".into());
1072 /// client.send_request("service", request).await?;
1073 /// # Ok(())
1074 /// # }
1075 /// ```
1076 pub fn headers(mut self, headers: HeaderMap) -> Request {
1077 self.headers = Some(headers);
1078 self
1079 }
1080
1081 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
1082 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
1083 /// To use default timeout, simply do not call this function.
1084 ///
1085 /// # Examples
1086 /// ```no_run
1087 /// # #[tokio::main]
1088 /// # async fn main() -> Result<(), async_nats::Error> {
1089 /// let client = async_nats::connect("demo.nats.io").await?;
1090 /// let request = async_nats::Request::new()
1091 /// .timeout(Some(std::time::Duration::from_secs(15)))
1092 /// .payload("data".into());
1093 /// client.send_request("service", request).await?;
1094 /// # Ok(())
1095 /// # }
1096 /// ```
1097 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
1098 self.timeout = Some(timeout);
1099 self
1100 }
1101
1102 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
1103 ///
1104 /// # Examples
1105 /// ```no_run
1106 /// # #[tokio::main]
1107 /// # async fn main() -> Result<(), async_nats::Error> {
1108 /// use std::str::FromStr;
1109 /// let client = async_nats::connect("demo.nats.io").await?;
1110 /// let request = async_nats::Request::new()
1111 /// .inbox("custom_inbox".into())
1112 /// .payload("data".into());
1113 /// client.send_request("service", request).await?;
1114 /// # Ok(())
1115 /// # }
1116 /// ```
1117 pub fn inbox(mut self, inbox: String) -> Request {
1118 self.inbox = Some(inbox);
1119 self
1120 }
1121}
1122
1123#[derive(Error, Debug)]
1124#[error("failed to send reconnect: {0}")]
1125pub struct ReconnectError(#[source] crate::Error);
1126
1127impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
1128 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1129 ReconnectError(Box::new(err))
1130 }
1131}
1132
/// An error returned from [`Client::set_server_pool`].
///
/// The accompanying [`SetServerPoolErrorKind`] tells the failure causes apart.
pub type SetServerPoolError = Error<SetServerPoolErrorKind>;
1135
/// The reasons why replacing the server pool can fail.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SetServerPoolErrorKind {
    /// The command could not be delivered to the connection handler.
    Send,
    /// At least one of the server addresses failed to parse.
    InvalidAddress,
    /// WebSocket (`ws://`, `wss://`) and non-websocket (`nats://`, `tls://`)
    /// URLs were combined in one pool, which is not allowed.
    MixedSchemes,
    /// An empty server pool was supplied.
    EmptyPool,
}

impl Display for SetServerPoolErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Send => "failed to send set_server_pool command",
            Self::InvalidAddress => "invalid server address",
            Self::EmptyPool => "server pool cannot be empty",
            Self::MixedSchemes => "cannot mix websocket and non-websocket URLs in server pool",
        };
        f.write_str(description)
    }
}
1162
/// An error returned from [`Client::server_pool`].
///
/// The accompanying [`ServerPoolErrorKind`] describes the failure cause.
pub type ServerPoolError = Error<ServerPoolErrorKind>;
1165
/// The reasons why querying the server pool can fail.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ServerPoolErrorKind {
    /// The command could not be delivered to the connection handler.
    Send,
}

impl Display for ServerPoolErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Kept as a `match` so adding a variant forces this impl to be revisited.
        f.write_str(match self {
            Self::Send => "failed to send server_pool command",
        })
    }
}
1179
1180/// An error returned from the [`Client::subscribe`] or [`Client::queue_subscribe`] functions.
1181pub type SubscribeError = Error<SubscribeErrorKind>;
1182
1183impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
1184 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1185 SubscribeError::with_source(SubscribeErrorKind::Other, err)
1186 }
1187}
1188
/// The reasons why a subscription can fail.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SubscribeErrorKind {
    /// The subject is not valid: it is empty, contains whitespace, or has a malformed
    /// dot structure.
    InvalidSubject,
    /// The queue group name is not valid: it is empty or contains whitespace.
    InvalidQueueName,
    /// Any other client/io related error.
    Other,
}

impl Display for SubscribeErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidSubject => "invalid subject",
            Self::InvalidQueueName => "invalid queue name",
            Self::Other => "subscribe failed",
        };
        f.write_str(description)
    }
}
1208
1209#[derive(Error, Debug)]
1210#[error("failed to send drain: {0}")]
1211pub struct DrainError(#[source] crate::Error);
1212
1213impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
1214 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1215 DrainError(Box::new(err))
1216 }
1217}
1218
/// The reasons why a core NATS request can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// Services are listening on the requested subject, but none of them
    /// responded in time.
    TimedOut,
    /// Nobody is listening on the request subject.
    NoResponders,
    /// The subject is not valid: it is empty or contains whitespace.
    InvalidSubject,
    /// Any other client/io related error.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "request timed out",
            Self::NoResponders => "no responders",
            Self::InvalidSubject => "invalid subject",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1242
1243/// Error returned when a core NATS request fails.
1244/// To be enumerate over the variants, call [RequestError::kind].
1245pub type RequestError = Error<RequestErrorKind>;
1246
1247impl From<PublishError> for RequestError {
1248 fn from(e: PublishError) -> Self {
1249 RequestError::with_source(RequestErrorKind::Other, e)
1250 }
1251}
1252
1253impl From<SubscribeError> for RequestError {
1254 fn from(e: SubscribeError) -> Self {
1255 RequestError::with_source(RequestErrorKind::Other, e)
1256 }
1257}
1258
/// The reasons why a flush can fail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// The flush request could not be sent on the client side.
    SendError,
    /// The flush itself failed.
    /// Mostly seen with connection issues that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::SendError => "failed to send flush request",
            Self::FlushError => "flush failed",
        };
        f.write_str(description)
    }
}
1277
/// An error returned when a flush fails; see [`FlushErrorKind`] for the possible causes.
pub type FlushError = Error<FlushErrorKind>;
1279
/// Represents statistics for the instance of the client throughout its lifecycle.
///
/// Every counter is an [`AtomicU64`], so values can be read and updated
/// through a shared reference.
#[derive(Default, Debug)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times connection was established.
    /// Initial connect will be counted as well, then all successful reconnects.
    pub connects: AtomicU64,
}