async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38
39static VERSION_RE: LazyLock<Regex> =
40 LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
42/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
43/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48 PublishError::with_source(PublishErrorKind::Send, err)
49 }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54 PublishError::with_source(PublishErrorKind::Send, err)
55 }
56}
57
58#[derive(Copy, Clone, Debug, PartialEq)]
59pub enum PublishErrorKind {
60 MaxPayloadExceeded,
61 InvalidSubject,
62 Send,
63}
64
65impl Display for PublishErrorKind {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match self {
68 PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
69 PublishErrorKind::Send => write!(f, "failed to send message"),
70 PublishErrorKind::InvalidSubject => write!(f, "invalid subject"),
71 }
72 }
73}
74
75/// Shared error text for the max-payload errors, from `(max_payload, size)`.
76pub(crate) fn max_payload_message((max_payload, size): (usize, usize)) -> String {
77 format!("Payload size limit of {max_payload} exceeded by message size of {size}")
78}
79
80/// `map_err` helper turning a size violation into a [`PublishError`].
81fn max_payload_error(sizes: (usize, usize)) -> PublishError {
82 PublishError::with_source(
83 PublishErrorKind::MaxPayloadExceeded,
84 max_payload_message(sizes),
85 )
86}
87
88/// Client is a `Cloneable` handle to NATS connection.
89/// Client should not be created directly. Instead, one of two methods can be used:
90/// [crate::connect] and [crate::ConnectOptions::connect]
91#[derive(Clone, Debug)]
92pub struct Client {
93 info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
94 pub(crate) state: tokio::sync::watch::Receiver<State>,
95 pub(crate) sender: mpsc::Sender<Command>,
96 poll_sender: PollSender<Command>,
97 next_subscription_id: Arc<AtomicU64>,
98 subscription_capacity: usize,
99 inbox_prefix: Arc<str>,
100 request_timeout: Option<Duration>,
101 max_payload: Arc<AtomicUsize>,
102 connection_stats: Arc<Statistics>,
103 skip_subject_validation: bool,
104}
105
106pub mod traits {
107 use std::{future::Future, time::Duration};
108
109 use bytes::Bytes;
110
111 use crate::{message, subject::ToSubject, Message};
112
113 use super::{PublishError, Request, RequestError, SubscribeError};
114
115 pub trait Publisher {
116 fn publish_with_reply<S, R>(
117 &self,
118 subject: S,
119 reply: R,
120 payload: Bytes,
121 ) -> impl Future<Output = Result<(), PublishError>>
122 where
123 S: ToSubject,
124 R: ToSubject;
125
126 fn publish_message(
127 &self,
128 msg: message::OutboundMessage,
129 ) -> impl Future<Output = Result<(), PublishError>>;
130 }
131 pub trait Subscriber {
132 fn subscribe<S>(
133 &self,
134 subject: S,
135 ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
136 where
137 S: ToSubject;
138 }
139 pub trait Requester {
140 fn send_request<S>(
141 &self,
142 subject: S,
143 request: Request,
144 ) -> impl Future<Output = Result<Message, RequestError>>
145 where
146 S: ToSubject;
147 }
148 pub trait TimeoutProvider {
149 fn timeout(&self) -> Option<Duration>;
150 }
151}
152
153impl traits::Requester for Client {
154 fn send_request<S: ToSubject>(
155 &self,
156 subject: S,
157 request: Request,
158 ) -> impl Future<Output = Result<Message, RequestError>> {
159 self.send_request(subject, request)
160 }
161}
162
163impl traits::TimeoutProvider for Client {
164 fn timeout(&self) -> Option<Duration> {
165 self.timeout()
166 }
167}
168
169impl traits::Publisher for Client {
170 fn publish_with_reply<S, R>(
171 &self,
172 subject: S,
173 reply: R,
174 payload: Bytes,
175 ) -> impl Future<Output = Result<(), PublishError>>
176 where
177 S: ToSubject,
178 R: ToSubject,
179 {
180 self.publish_with_reply(subject, reply, payload)
181 }
182
183 async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
184 if msg.subject.is_empty()
185 || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&msg.subject))
186 {
187 return Err(PublishError::with_source(
188 PublishErrorKind::InvalidSubject,
189 crate::subject::SubjectError::InvalidFormat,
190 ));
191 }
192 self.check_payload_size(msg.headers.as_ref(), msg.payload.len())
193 .map_err(max_payload_error)?;
194 self.sender
195 .send(Command::Publish(msg))
196 .await
197 .map_err(Into::into)
198 }
199}
200
201impl traits::Subscriber for Client {
202 fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
203 where
204 S: ToSubject,
205 {
206 self.subscribe(subject)
207 }
208}
209
210impl Sink<OutboundMessage> for Client {
211 type Error = PublishError;
212
213 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
215 }
216
217 fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
218 self.check_payload_size(msg.headers.as_ref(), msg.payload.len())
219 .map_err(max_payload_error)?;
220 self.poll_sender
221 .start_send_unpin(Command::Publish(msg))
222 .map_err(Into::into)
223 }
224
225 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
226 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
227 }
228
229 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
230 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
231 }
232}
233
234impl Client {
235 #[allow(clippy::too_many_arguments)]
236 pub(crate) fn new(
237 info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
238 state: tokio::sync::watch::Receiver<State>,
239 sender: mpsc::Sender<Command>,
240 capacity: usize,
241 inbox_prefix: String,
242 request_timeout: Option<Duration>,
243 max_payload: Arc<AtomicUsize>,
244 statistics: Arc<Statistics>,
245 skip_subject_validation: bool,
246 ) -> Client {
247 let poll_sender = PollSender::new(sender.clone());
248 Client {
249 info,
250 state,
251 sender,
252 poll_sender,
253 next_subscription_id: Arc::new(AtomicU64::new(1)),
254 subscription_capacity: capacity,
255 inbox_prefix: inbox_prefix.into(),
256 request_timeout,
257 max_payload,
258 connection_stats: statistics,
259 skip_subject_validation,
260 }
261 }
262
263 /// Validates a subject for publishing (protocol-framing safety only).
264 /// Checks for empty and whitespace. Does not check dot structure.
265 pub(crate) fn maybe_validate_publish_subject<S: ToSubject>(
266 &self,
267 subject: S,
268 ) -> Result<crate::Subject, crate::subject::SubjectError> {
269 let subject = subject.to_subject();
270 if subject.is_empty()
271 || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&subject))
272 {
273 return Err(crate::subject::SubjectError::InvalidFormat);
274 }
275 Ok(subject)
276 }
277
278 /// Validates a subject for subscribing (protocol-framing + dot structure).
279 /// Always runs regardless of `skip_subject_validation` (matches Go/Java behavior).
280 pub(crate) fn validate_subscribe_subject<S: ToSubject>(
281 &self,
282 subject: S,
283 ) -> Result<crate::Subject, crate::subject::SubjectError> {
284 let subject = subject.to_subject();
285 if !subject.is_valid() {
286 return Err(crate::subject::SubjectError::InvalidFormat);
287 }
288 Ok(subject)
289 }
290
291 /// Returns the default timeout for requests set when creating the client.
292 ///
293 /// # Examples
294 /// ```no_run
295 /// # #[tokio::main]
296 /// # async fn main() -> Result<(), async_nats::Error> {
297 /// let client = async_nats::connect("demo.nats.io").await?;
298 /// println!("default request timeout: {:?}", client.timeout());
299 /// # Ok(())
300 /// # }
301 /// ```
302 pub fn timeout(&self) -> Option<Duration> {
303 self.request_timeout
304 }
305
306 /// Returns the last received info from the server if one has been observed.
307 ///
308 /// This is useful when [`ConnectOptions::retry_on_initial_connect`][crate::ConnectOptions::retry_on_initial_connect]
309 /// is enabled and the client is returned before the first server `INFO`
310 /// message arrives.
311 ///
312 /// # Examples
313 ///
314 /// ```no_run
315 /// # #[tokio::main]
316 /// # async fn main () -> Result<(), async_nats::Error> {
317 /// let client = async_nats::ConnectOptions::new()
318 /// .retry_on_initial_connect()
319 /// .connect("demo.nats.io")
320 /// .await?;
321 /// println!("info available: {}", client.try_server_info().is_some());
322 /// # Ok(())
323 /// # }
324 /// ```
325 pub fn try_server_info(&self) -> Option<ServerInfo> {
326 // We ignore notifying the watcher, as that requires mutable client reference.
327 self.info.borrow().clone()
328 }
329
330 /// Returns last received info from the server.
331 ///
332 /// # Examples
333 ///
334 /// ```no_run
335 /// # #[tokio::main]
336 /// # async fn main () -> Result<(), async_nats::Error> {
337 /// let client = async_nats::connect("demo.nats.io").await?;
338 /// println!("info: {:?}", client.server_info());
339 /// # Ok(())
340 /// # }
341 /// ```
342 pub fn server_info(&self) -> ServerInfo {
343 self.try_server_info().unwrap_or_default()
344 }
345
346 /// Returns the maximum payload size currently used by the client.
347 ///
348 /// Before the first server `INFO` message arrives, this returns the default
349 /// server payload limit used for client-side publish validation.
350 ///
351 /// # Examples
352 ///
353 /// ```no_run
354 /// # #[tokio::main]
355 /// # async fn main () -> Result<(), async_nats::Error> {
356 /// let client = async_nats::connect("demo.nats.io").await?;
357 /// println!("max payload: {:?}", client.max_payload());
358 /// # Ok(())
359 /// # }
360 /// ```
361 pub fn max_payload(&self) -> usize {
362 self.max_payload.load(Ordering::Relaxed)
363 }
364
365 /// Checks the wire size (serialized headers + payload, matching the server's
366 /// `HPUB` total) against the negotiated `max_payload`. On violation returns
367 /// `(max_payload, size)` for the caller to wrap in its own error.
368 pub(crate) fn check_payload_size(
369 &self,
370 headers: Option<&HeaderMap>,
371 payload_len: usize,
372 ) -> Result<(), (usize, usize)> {
373 let max_payload = self.max_payload.load(Ordering::Relaxed);
374 let size = match headers {
375 Some(headers) if !headers.is_empty() => headers.wire_len() + payload_len,
376 _ => payload_len,
377 };
378 if size > max_payload {
379 Err((max_payload, size))
380 } else {
381 Ok(())
382 }
383 }
384
385 /// Returns true if the server version is compatible with the version components.
386 ///
387 /// This has to be used with caution, as it is not guaranteed that the server
388 /// that client is connected to is the same version that the one that is
389 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
390 ///
391 /// # Examples
392 ///
393 /// ```no_run
394 /// # #[tokio::main]
395 /// # async fn main() -> Result<(), async_nats::Error> {
396 /// let client = async_nats::connect("demo.nats.io").await?;
397 /// assert!(client.is_server_compatible(2, 8, 4));
398 /// # Ok(())
399 /// # }
400 /// ```
401 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
402 let info = self.server_info();
403
404 let server_version_captures = match VERSION_RE.captures(&info.version) {
405 Some(captures) => captures,
406 None => return false,
407 };
408
409 let server_major = server_version_captures
410 .get(1)
411 .map(|m| m.as_str().parse::<i64>().unwrap())
412 .unwrap();
413
414 let server_minor = server_version_captures
415 .get(2)
416 .map(|m| m.as_str().parse::<i64>().unwrap())
417 .unwrap();
418
419 let server_patch = server_version_captures
420 .get(3)
421 .map(|m| m.as_str().parse::<i64>().unwrap())
422 .unwrap();
423
424 if server_major < major
425 || (server_major == major && server_minor < minor)
426 || (server_major == major && server_minor == minor && server_patch < patch)
427 {
428 return false;
429 }
430 true
431 }
432
433 /// Publish a [Message] to a given subject.
434 ///
435 /// Returns `PublishErrorKind::InvalidSubject` if the subject is invalid
436 /// (empty or contains whitespace). This check can be disabled with
437 /// [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation] (empty subjects are
438 /// always rejected).
439 ///
440 /// # Examples
441 /// ```no_run
442 /// # #[tokio::main]
443 /// # async fn main() -> Result<(), async_nats::Error> {
444 /// let client = async_nats::connect("demo.nats.io").await?;
445 /// client.publish("events.data", "payload".into()).await?;
446 /// # Ok(())
447 /// # }
448 /// ```
449 pub async fn publish<S: ToSubject>(
450 &self,
451 subject: S,
452 payload: Bytes,
453 ) -> Result<(), PublishError> {
454 let subject = self
455 .maybe_validate_publish_subject(subject)
456 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
457
458 self.check_payload_size(None, payload.len())
459 .map_err(max_payload_error)?;
460
461 self.sender
462 .send(Command::Publish(OutboundMessage {
463 subject,
464 payload,
465 reply: None,
466 headers: None,
467 }))
468 .await?;
469 Ok(())
470 }
471
472 /// Publish a [Message] with headers to a given subject.
473 ///
474 /// # Examples
475 /// ```
476 /// # #[tokio::main]
477 /// # async fn main() -> Result<(), async_nats::Error> {
478 /// use std::str::FromStr;
479 /// let client = async_nats::connect("demo.nats.io").await?;
480 /// let mut headers = async_nats::HeaderMap::new();
481 /// headers.insert(
482 /// "X-Header",
483 /// async_nats::HeaderValue::from_str("Value").unwrap(),
484 /// );
485 /// client
486 /// .publish_with_headers("events.data", headers, "payload".into())
487 /// .await?;
488 /// # Ok(())
489 /// # }
490 /// ```
491 pub async fn publish_with_headers<S: ToSubject>(
492 &self,
493 subject: S,
494 headers: HeaderMap,
495 payload: Bytes,
496 ) -> Result<(), PublishError> {
497 let subject = self
498 .maybe_validate_publish_subject(subject)
499 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
500
501 self.check_payload_size(Some(&headers), payload.len())
502 .map_err(max_payload_error)?;
503
504 self.sender
505 .send(Command::Publish(OutboundMessage {
506 subject,
507 payload,
508 reply: None,
509 headers: Some(headers),
510 }))
511 .await?;
512 Ok(())
513 }
514
515 /// Publish a [Message] to a given subject, with specified response subject
516 /// to which the subscriber can respond.
517 /// This method does not await for the response.
518 ///
519 /// # Examples
520 ///
521 /// ```no_run
522 /// # #[tokio::main]
523 /// # async fn main() -> Result<(), async_nats::Error> {
524 /// let client = async_nats::connect("demo.nats.io").await?;
525 /// client
526 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
527 /// .await?;
528 /// # Ok(())
529 /// # }
530 /// ```
531 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
532 &self,
533 subject: S,
534 reply: R,
535 payload: Bytes,
536 ) -> Result<(), PublishError> {
537 let subject = self
538 .maybe_validate_publish_subject(subject)
539 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
540 let reply = self
541 .maybe_validate_publish_subject(reply)
542 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
543
544 self.check_payload_size(None, payload.len())
545 .map_err(max_payload_error)?;
546
547 self.sender
548 .send(Command::Publish(OutboundMessage {
549 subject,
550 payload,
551 reply: Some(reply),
552 headers: None,
553 }))
554 .await?;
555 Ok(())
556 }
557
558 /// Publish a [Message] to a given subject with headers and specified response subject
559 /// to which the subscriber can respond.
560 /// This method does not await for the response.
561 ///
562 /// # Examples
563 ///
564 /// ```no_run
565 /// # #[tokio::main]
566 /// # async fn main() -> Result<(), async_nats::Error> {
567 /// use std::str::FromStr;
568 /// let client = async_nats::connect("demo.nats.io").await?;
569 /// let mut headers = async_nats::HeaderMap::new();
570 /// client
571 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
572 /// .await?;
573 /// # Ok(())
574 /// # }
575 /// ```
576 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
577 &self,
578 subject: S,
579 reply: R,
580 headers: HeaderMap,
581 payload: Bytes,
582 ) -> Result<(), PublishError> {
583 let subject = self
584 .maybe_validate_publish_subject(subject)
585 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
586 let reply = self
587 .maybe_validate_publish_subject(reply)
588 .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
589
590 self.check_payload_size(Some(&headers), payload.len())
591 .map_err(max_payload_error)?;
592
593 self.sender
594 .send(Command::Publish(OutboundMessage {
595 subject,
596 payload,
597 reply: Some(reply),
598 headers: Some(headers),
599 }))
600 .await?;
601 Ok(())
602 }
603
604 /// Sends the request with headers.
605 ///
606 /// # Examples
607 /// ```no_run
608 /// # #[tokio::main]
609 /// # async fn main() -> Result<(), async_nats::Error> {
610 /// let client = async_nats::connect("demo.nats.io").await?;
611 /// let response = client.request("service", "data".into()).await?;
612 /// # Ok(())
613 /// # }
614 /// ```
615 pub async fn request<S: ToSubject>(
616 &self,
617 subject: S,
618 payload: Bytes,
619 ) -> Result<Message, RequestError> {
620 let request = Request::new().payload(payload);
621 self.send_request(subject, request).await
622 }
623
624 /// Sends the request with headers.
625 ///
626 /// # Examples
627 /// ```no_run
628 /// # #[tokio::main]
629 /// # async fn main() -> Result<(), async_nats::Error> {
630 /// let client = async_nats::connect("demo.nats.io").await?;
631 /// let mut headers = async_nats::HeaderMap::new();
632 /// headers.insert("Key", "Value");
633 /// let response = client
634 /// .request_with_headers("service", headers, "data".into())
635 /// .await?;
636 /// # Ok(())
637 /// # }
638 /// ```
639 pub async fn request_with_headers<S: ToSubject>(
640 &self,
641 subject: S,
642 headers: HeaderMap,
643 payload: Bytes,
644 ) -> Result<Message, RequestError> {
645 let request = Request::new().headers(headers).payload(payload);
646 self.send_request(subject, request).await
647 }
648
649 /// Sends the request created by the [Request].
650 ///
651 /// Returns `RequestErrorKind::InvalidSubject` if the subject is invalid
652 /// (empty or contains whitespace).
653 ///
654 /// # Examples
655 ///
656 /// ```no_run
657 /// # #[tokio::main]
658 /// # async fn main() -> Result<(), async_nats::Error> {
659 /// let client = async_nats::connect("demo.nats.io").await?;
660 /// let request = async_nats::Request::new().payload("data".into());
661 /// let response = client.send_request("service", request).await?;
662 /// # Ok(())
663 /// # }
664 /// ```
665 pub async fn send_request<S: ToSubject>(
666 &self,
667 subject: S,
668 request: Request,
669 ) -> Result<Message, RequestError> {
670 let subject = self
671 .maybe_validate_publish_subject(subject)
672 .map_err(|e| RequestError::with_source(RequestErrorKind::InvalidSubject, e))?;
673
674 // Reject oversized requests up front, else they'd be sent and time out.
675 self.check_payload_size(
676 request.headers.as_ref(),
677 request.payload.as_ref().map_or(0, Bytes::len),
678 )
679 .map_err(|sizes| {
680 RequestError::with_source(
681 RequestErrorKind::MaxPayloadExceeded,
682 max_payload_message(sizes),
683 )
684 })?;
685
686 if let Some(inbox) = request.inbox {
687 let timeout = request.timeout.unwrap_or(self.request_timeout);
688 let mut subscriber = self.subscribe(inbox.clone()).await?;
689 let payload: Bytes = request.payload.unwrap_or_default();
690 match request.headers {
691 Some(headers) => {
692 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
693 .await?
694 }
695 None => self.publish_with_reply(subject, inbox, payload).await?,
696 }
697 let request = match timeout {
698 Some(timeout) => {
699 tokio::time::timeout(timeout, subscriber.next())
700 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
701 .await?
702 }
703 None => subscriber.next().await,
704 };
705 match request {
706 Some(message) => {
707 if message.status == Some(StatusCode::NO_RESPONDERS) {
708 return Err(RequestError::with_source(
709 RequestErrorKind::NoResponders,
710 "no responders",
711 ));
712 }
713 Ok(message)
714 }
715 None => Err(RequestError::with_source(
716 RequestErrorKind::Other,
717 "broken pipe",
718 )),
719 }
720 } else {
721 let (sender, receiver) = oneshot::channel();
722
723 let payload = request.payload.unwrap_or_default();
724 let respond = self.new_inbox().into();
725 let headers = request.headers;
726
727 self.sender
728 .send(Command::Request {
729 subject,
730 payload,
731 respond,
732 headers,
733 sender,
734 })
735 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
736 .await?;
737
738 let timeout = request.timeout.unwrap_or(self.request_timeout);
739 let request = match timeout {
740 Some(timeout) => {
741 tokio::time::timeout(timeout, receiver)
742 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
743 .await?
744 }
745 None => receiver.await,
746 };
747
748 match request {
749 Ok(message) => {
750 if message.status == Some(StatusCode::NO_RESPONDERS) {
751 return Err(RequestError::with_source(
752 RequestErrorKind::NoResponders,
753 "no responders",
754 ));
755 }
756 Ok(message)
757 }
758 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
759 }
760 }
761 }
762
763 /// Create a new globally unique inbox which can be used for replies.
764 ///
765 /// # Examples
766 ///
767 /// ```no_run
768 /// # #[tokio::main]
769 /// # async fn main() -> Result<(), async_nats::Error> {
770 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
771 /// let reply = nc.new_inbox();
772 /// let rsub = nc.subscribe(reply).await?;
773 /// # Ok(())
774 /// # }
775 /// ```
776 pub fn new_inbox(&self) -> String {
777 format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
778 }
779
780 /// Subscribes to a subject to receive [messages][Message].
781 ///
782 /// Returns an error if the subject is invalid (empty, contains whitespace,
783 /// or has malformed dot structure). This validation always runs regardless
784 /// of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
785 ///
786 /// # Examples
787 ///
788 /// ```no_run
789 /// # #[tokio::main]
790 /// # async fn main() -> Result<(), async_nats::Error> {
791 /// use futures_util::StreamExt;
792 /// let client = async_nats::connect("demo.nats.io").await?;
793 /// let mut subscription = client.subscribe("events.>").await?;
794 /// while let Some(message) = subscription.next().await {
795 /// println!("received message: {:?}", message);
796 /// }
797 /// # Ok(())
798 /// # }
799 /// ```
800 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
801 let subject = self
802 .validate_subscribe_subject(subject)
803 .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
804
805 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
806 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
807
808 self.sender
809 .send(Command::Subscribe {
810 sid,
811 subject,
812 queue_group: None,
813 sender,
814 })
815 .await?;
816
817 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
818 }
819
820 /// Subscribes to a subject with a queue group to receive [messages][Message].
821 ///
822 /// Returns an error if the subject is invalid (empty, contains whitespace,
823 /// or has malformed dot structure) or if the queue group name is invalid
824 /// (empty or contains whitespace). Subject and queue group validation always
825 /// runs regardless of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
826 ///
827 /// # Examples
828 ///
829 /// ```no_run
830 /// # #[tokio::main]
831 /// # async fn main() -> Result<(), async_nats::Error> {
832 /// use futures_util::StreamExt;
833 /// let client = async_nats::connect("demo.nats.io").await?;
834 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
835 /// while let Some(message) = subscription.next().await {
836 /// println!("received message: {:?}", message);
837 /// }
838 /// # Ok(())
839 /// # }
840 /// ```
841 pub async fn queue_subscribe<S: ToSubject>(
842 &self,
843 subject: S,
844 queue_group: String,
845 ) -> Result<Subscriber, SubscribeError> {
846 let subject = self
847 .validate_subscribe_subject(subject)
848 .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
849
850 if !crate::is_valid_queue_group(&queue_group) {
851 return Err(SubscribeError::new(SubscribeErrorKind::InvalidQueueName));
852 }
853
854 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
855 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
856
857 self.sender
858 .send(Command::Subscribe {
859 sid,
860 subject,
861 queue_group: Some(queue_group),
862 sender,
863 })
864 .await?;
865
866 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
867 }
868
869 /// Flushes the internal buffer ensuring that all messages are sent.
870 ///
871 /// # Examples
872 ///
873 /// ```no_run
874 /// # #[tokio::main]
875 /// # async fn main() -> Result<(), async_nats::Error> {
876 /// let client = async_nats::connect("demo.nats.io").await?;
877 /// client.flush().await?;
878 /// # Ok(())
879 /// # }
880 /// ```
881 pub async fn flush(&self) -> Result<(), FlushError> {
882 let (tx, rx) = tokio::sync::oneshot::channel();
883 self.sender
884 .send(Command::Flush { observer: tx })
885 .await
886 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
887
888 rx.await
889 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
890 Ok(())
891 }
892
893 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
894 /// messages, then closes the connection. Once completed, any streams associated with the
895 /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
896 ///
897 /// # Examples
898 ///
899 /// ```no_run
900 /// # #[tokio::main]
901 /// # async fn main() -> Result<(), async_nats::Error> {
902 /// use futures_util::StreamExt;
903 /// let client = async_nats::connect("demo.nats.io").await?;
904 /// let mut subscription = client.subscribe("events.>").await?;
905 ///
906 /// client.drain().await?;
907 ///
908 /// # // existing subscriptions are closed and further commands will fail
909 /// assert!(subscription.next().await.is_none());
910 /// client
911 /// .subscribe("events.>")
912 /// .await
913 /// .expect_err("Expected further commands to fail");
914 ///
915 /// # Ok(())
916 /// # }
917 /// ```
918 pub async fn drain(&self) -> Result<(), DrainError> {
919 // Drain all subscriptions
920 self.sender.send(Command::Drain { sid: None }).await?;
921
922 // Remaining process is handled on the handler-side
923 Ok(())
924 }
925
926 /// Returns the current state of the connection.
927 ///
928 /// # Examples
929 /// ```no_run
930 /// # #[tokio::main]
931 /// # async fn main() -> Result<(), async_nats::Error> {
932 /// let client = async_nats::connect("demo.nats.io").await?;
933 /// println!("connection state: {}", client.connection_state());
934 /// # Ok(())
935 /// # }
936 /// ```
937 pub fn connection_state(&self) -> State {
938 self.state.borrow().to_owned()
939 }
940
941 /// Forces the client to reconnect.
942 /// Keep in mind that client will reconnect automatically if the connection is lost and this
943 /// method does not have to be used in normal circumstances.
944 /// However, if you want to force the client to reconnect, for example to re-trigger
945 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
946 /// This method does not wait for connection to be re-established.
947 ///
948 /// # Examples
949 /// ```no_run
950 /// # #[tokio::main]
951 /// # async fn main() -> Result<(), async_nats::Error> {
952 /// let client = async_nats::connect("demo.nats.io").await?;
953 /// client.force_reconnect().await?;
954 /// # Ok(())
955 /// # }
956 /// ```
957 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
958 self.sender
959 .send(Command::Reconnect)
960 .await
961 .map_err(Into::into)
962 }
963
964 /// Replaces the server pool used for reconnection attempts.
965 ///
966 /// The new pool takes effect on the next reconnect attempt; it does not
967 /// trigger an immediate reconnect. To force an immediate reconnect with
968 /// the new pool, call [`Client::force_reconnect`] after this method.
969 ///
970 /// Per-server state (failed attempt count, connection history) is preserved
971 /// for servers that appear in both the old and new pools.
972 ///
973 /// This also resets the global reconnection attempt counter, so any
974 /// progress toward [`ConnectOptions::max_reconnects`](crate::ConnectOptions::max_reconnects)
975 /// is cleared.
976 ///
977 /// # Examples
978 /// ```no_run
979 /// # #[tokio::main]
980 /// # async fn main() -> Result<(), async_nats::Error> {
981 /// let client = async_nats::connect("demo.nats.io").await?;
982 /// client
983 /// .set_server_pool(["nats://server1:4222", "nats://server2:4222"].as_slice())
984 /// .await?;
985 /// // Optionally force reconnect to apply immediately:
986 /// client.force_reconnect().await?;
987 /// # Ok(())
988 /// # }
989 /// ```
990 pub async fn set_server_pool<A: crate::ToServerAddrs>(
991 &self,
992 addrs: A,
993 ) -> Result<(), SetServerPoolError> {
994 let servers: Vec<crate::ServerAddr> = addrs
995 .to_server_addrs()
996 .map_err(|err| {
997 SetServerPoolError::with_source(SetServerPoolErrorKind::InvalidAddress, err)
998 })?
999 .collect();
1000
1001 if servers.is_empty() {
1002 return Err(SetServerPoolError::new(SetServerPoolErrorKind::EmptyPool));
1003 }
1004
1005 let (tx, rx) = oneshot::channel();
1006 self.sender
1007 .send(Command::SetServerPool {
1008 servers,
1009 result: tx,
1010 })
1011 .await
1012 .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?;
1013
1014 rx.await
1015 .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?
1016 .map_err(|err| {
1017 SetServerPoolError::with_source(SetServerPoolErrorKind::MixedSchemes, err)
1018 })
1019 }
1020
1021 /// Returns a snapshot of the current server pool.
1022 ///
1023 /// The returned list includes both explicitly configured and discovered
1024 /// servers, along with per-server metadata such as reconnect count and
1025 /// connection history.
1026 ///
1027 /// # Examples
1028 /// ```no_run
1029 /// # #[tokio::main]
1030 /// # async fn main() -> Result<(), async_nats::Error> {
1031 /// let client = async_nats::connect("demo.nats.io").await?;
1032 /// let pool = client.server_pool().await?;
1033 /// for server in &pool {
1034 /// println!(
1035 /// "{:?}: {} failed attempts",
1036 /// server.addr, server.failed_attempts
1037 /// );
1038 /// }
1039 /// # Ok(())
1040 /// # }
1041 /// ```
1042 pub async fn server_pool(&self) -> Result<Vec<crate::Server>, ServerPoolError> {
1043 let (tx, rx) = oneshot::channel();
1044 self.sender
1045 .send(Command::ServerPool { result: tx })
1046 .await
1047 .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))?;
1048
1049 rx.await
1050 .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))
1051 }
1052
1053 /// Returns struct representing statistics of the whole lifecycle of the client.
1054 /// This includes number of bytes sent/received, number of messages sent/received,
1055 /// and number of times the connection was established.
1056 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
1057 /// across threads.
1058 ///
1059 /// # Examples
1060 /// ```no_run
1061 /// # #[tokio::main]
1062 /// # async fn main() -> Result<(), async_nats::Error> {
1063 /// let client = async_nats::connect("demo.nats.io").await?;
1064 /// let statistics = client.statistics();
1065 /// println!("client statistics: {:#?}", statistics);
1066 /// # Ok(())
1067 /// # }
1068 /// ```
1069 pub fn statistics(&self) -> Arc<Statistics> {
1070 self.connection_stats.clone()
1071 }
1072}
1073
1074/// Used for building customized requests.
1075#[derive(Default)]
1076pub struct Request {
1077 pub payload: Option<Bytes>,
1078 pub headers: Option<HeaderMap>,
1079 pub timeout: Option<Option<Duration>>,
1080 pub inbox: Option<String>,
1081}
1082
1083impl Request {
1084 pub fn new() -> Request {
1085 Default::default()
1086 }
1087
1088 /// Sets the payload of the request. If not used, empty payload will be sent.
1089 ///
1090 /// # Examples
1091 /// ```no_run
1092 /// # #[tokio::main]
1093 /// # async fn main() -> Result<(), async_nats::Error> {
1094 /// let client = async_nats::connect("demo.nats.io").await?;
1095 /// let request = async_nats::Request::new().payload("data".into());
1096 /// client.send_request("service", request).await?;
1097 /// # Ok(())
1098 /// # }
1099 /// ```
1100 pub fn payload(mut self, payload: Bytes) -> Request {
1101 self.payload = Some(payload);
1102 self
1103 }
1104
1105 /// Sets the headers of the requests.
1106 ///
1107 /// # Examples
1108 /// ```no_run
1109 /// # #[tokio::main]
1110 /// # async fn main() -> Result<(), async_nats::Error> {
1111 /// use std::str::FromStr;
1112 /// let client = async_nats::connect("demo.nats.io").await?;
1113 /// let mut headers = async_nats::HeaderMap::new();
1114 /// headers.insert(
1115 /// "X-Example",
1116 /// async_nats::HeaderValue::from_str("Value").unwrap(),
1117 /// );
1118 /// let request = async_nats::Request::new()
1119 /// .headers(headers)
1120 /// .payload("data".into());
1121 /// client.send_request("service", request).await?;
1122 /// # Ok(())
1123 /// # }
1124 /// ```
1125 pub fn headers(mut self, headers: HeaderMap) -> Request {
1126 self.headers = Some(headers);
1127 self
1128 }
1129
1130 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
1131 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
1132 /// To use default timeout, simply do not call this function.
1133 ///
1134 /// # Examples
1135 /// ```no_run
1136 /// # #[tokio::main]
1137 /// # async fn main() -> Result<(), async_nats::Error> {
1138 /// let client = async_nats::connect("demo.nats.io").await?;
1139 /// let request = async_nats::Request::new()
1140 /// .timeout(Some(std::time::Duration::from_secs(15)))
1141 /// .payload("data".into());
1142 /// client.send_request("service", request).await?;
1143 /// # Ok(())
1144 /// # }
1145 /// ```
1146 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
1147 self.timeout = Some(timeout);
1148 self
1149 }
1150
1151 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
1152 ///
1153 /// # Examples
1154 /// ```no_run
1155 /// # #[tokio::main]
1156 /// # async fn main() -> Result<(), async_nats::Error> {
1157 /// use std::str::FromStr;
1158 /// let client = async_nats::connect("demo.nats.io").await?;
1159 /// let request = async_nats::Request::new()
1160 /// .inbox("custom_inbox".into())
1161 /// .payload("data".into());
1162 /// client.send_request("service", request).await?;
1163 /// # Ok(())
1164 /// # }
1165 /// ```
1166 pub fn inbox(mut self, inbox: String) -> Request {
1167 self.inbox = Some(inbox);
1168 self
1169 }
1170}
1171
1172#[derive(Error, Debug)]
1173#[error("failed to send reconnect: {0}")]
1174pub struct ReconnectError(#[source] crate::Error);
1175
1176impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
1177 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1178 ReconnectError(Box::new(err))
1179 }
1180}
1181
1182/// An error returned from [`Client::set_server_pool`].
1183pub type SetServerPoolError = Error<SetServerPoolErrorKind>;
1184
1185#[derive(Copy, Clone, Debug, PartialEq)]
1186pub enum SetServerPoolErrorKind {
1187 /// Failed to send command to the connection handler.
1188 Send,
1189 /// One or more server addresses could not be parsed.
1190 InvalidAddress,
1191 /// The pool contains a mix of WebSocket (`ws://`, `wss://`) and
1192 /// non-websocket (`nats://`, `tls://`) URLs, which is not allowed.
1193 MixedSchemes,
1194 /// The server pool cannot be empty.
1195 EmptyPool,
1196}
1197
1198impl Display for SetServerPoolErrorKind {
1199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1200 match self {
1201 Self::Send => write!(f, "failed to send set_server_pool command"),
1202 Self::InvalidAddress => write!(f, "invalid server address"),
1203 Self::EmptyPool => write!(f, "server pool cannot be empty"),
1204 Self::MixedSchemes => write!(
1205 f,
1206 "cannot mix websocket and non-websocket URLs in server pool"
1207 ),
1208 }
1209 }
1210}
1211
1212/// An error returned from [`Client::server_pool`].
1213pub type ServerPoolError = Error<ServerPoolErrorKind>;
1214
1215#[derive(Copy, Clone, Debug, PartialEq)]
1216pub enum ServerPoolErrorKind {
1217 /// Failed to send command to the connection handler.
1218 Send,
1219}
1220
1221impl Display for ServerPoolErrorKind {
1222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1223 match self {
1224 Self::Send => write!(f, "failed to send server_pool command"),
1225 }
1226 }
1227}
1228
1229/// An error returned from the [`Client::subscribe`] or [`Client::queue_subscribe`] functions.
1230pub type SubscribeError = Error<SubscribeErrorKind>;
1231
1232impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
1233 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1234 SubscribeError::with_source(SubscribeErrorKind::Other, err)
1235 }
1236}
1237
1238#[derive(Copy, Clone, Debug, PartialEq)]
1239pub enum SubscribeErrorKind {
1240 /// The subject is invalid (empty, contains whitespace, or has malformed dot structure).
1241 InvalidSubject,
1242 /// The queue group name is invalid (empty or contains whitespace).
1243 InvalidQueueName,
1244 /// Other errors, client/io related.
1245 Other,
1246}
1247
1248impl Display for SubscribeErrorKind {
1249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1250 match self {
1251 Self::InvalidSubject => write!(f, "invalid subject"),
1252 Self::InvalidQueueName => write!(f, "invalid queue name"),
1253 Self::Other => write!(f, "subscribe failed"),
1254 }
1255 }
1256}
1257
1258#[derive(Error, Debug)]
1259#[error("failed to send drain: {0}")]
1260pub struct DrainError(#[source] crate::Error);
1261
1262impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
1263 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1264 DrainError(Box::new(err))
1265 }
1266}
1267
1268#[derive(Clone, Copy, Debug, PartialEq)]
1269pub enum RequestErrorKind {
1270 /// There are services listening on requested subject, but they didn't respond
1271 /// in time.
1272 TimedOut,
1273 /// No one is listening on request subject.
1274 NoResponders,
1275 /// The subject is invalid (empty or contains whitespace).
1276 InvalidSubject,
1277 /// The payload (plus headers) exceeds the server's `max_payload`.
1278 MaxPayloadExceeded,
1279 /// Other errors, client/io related.
1280 Other,
1281}
1282
1283impl Display for RequestErrorKind {
1284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1285 match self {
1286 Self::TimedOut => write!(f, "request timed out"),
1287 Self::NoResponders => write!(f, "no responders"),
1288 Self::InvalidSubject => write!(f, "invalid subject"),
1289 Self::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
1290 Self::Other => write!(f, "request failed"),
1291 }
1292 }
1293}
1294
1295/// Error returned when a core NATS request fails.
1296/// To be enumerate over the variants, call [RequestError::kind].
1297pub type RequestError = Error<RequestErrorKind>;
1298
1299impl From<PublishError> for RequestError {
1300 fn from(e: PublishError) -> Self {
1301 RequestError::with_source(RequestErrorKind::Other, e)
1302 }
1303}
1304
1305impl From<SubscribeError> for RequestError {
1306 fn from(e: SubscribeError) -> Self {
1307 RequestError::with_source(RequestErrorKind::Other, e)
1308 }
1309}
1310
1311#[derive(Clone, Copy, Debug, PartialEq)]
1312pub enum FlushErrorKind {
1313 /// Sending the flush failed client side.
1314 SendError,
1315 /// Flush failed.
1316 /// This can happen mostly in case of connection issues
1317 /// that cannot be resolved quickly.
1318 FlushError,
1319}
1320
1321impl Display for FlushErrorKind {
1322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1323 match self {
1324 Self::SendError => write!(f, "failed to send flush request"),
1325 Self::FlushError => write!(f, "flush failed"),
1326 }
1327 }
1328}
1329
1330pub type FlushError = Error<FlushErrorKind>;
1331
1332/// Represents statistics for the instance of the client throughout its lifecycle.
1333#[derive(Default, Debug)]
1334pub struct Statistics {
1335 /// Number of bytes received. This does not include the protocol overhead.
1336 pub in_bytes: AtomicU64,
1337 /// Number of bytes sent. This doe not include the protocol overhead.
1338 pub out_bytes: AtomicU64,
1339 /// Number of messages received.
1340 pub in_messages: AtomicU64,
1341 /// Number of messages sent.
1342 pub out_messages: AtomicU64,
1343 /// Number of times connection was established.
1344 /// Initial connect will be counted as well, then all successful reconnects.
1345 pub connects: AtomicU64,
1346}