Skip to main content

async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38
39static VERSION_RE: LazyLock<Regex> =
40    LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
42/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
43/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48        PublishError::with_source(PublishErrorKind::Send, err)
49    }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54        PublishError::with_source(PublishErrorKind::Send, err)
55    }
56}
57
/// The kinds of failure a publish operation can report.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size.
    MaxPayloadExceeded,
    /// The subject did not pass validation.
    InvalidSubject,
    /// The message could not be handed over to the sending channel.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::InvalidSubject => "invalid subject",
            Self::Send => "failed to send message",
        };
        f.write_str(description)
    }
}
74
75/// Client is a `Cloneable` handle to NATS connection.
76/// Client should not be created directly. Instead, one of two methods can be used:
77/// [crate::connect] and [crate::ConnectOptions::connect]
78#[derive(Clone, Debug)]
79pub struct Client {
80    info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
81    pub(crate) state: tokio::sync::watch::Receiver<State>,
82    pub(crate) sender: mpsc::Sender<Command>,
83    poll_sender: PollSender<Command>,
84    next_subscription_id: Arc<AtomicU64>,
85    subscription_capacity: usize,
86    inbox_prefix: Arc<str>,
87    request_timeout: Option<Duration>,
88    max_payload: Arc<AtomicUsize>,
89    connection_stats: Arc<Statistics>,
90    skip_subject_validation: bool,
91}
92
93pub mod traits {
94    use std::{future::Future, time::Duration};
95
96    use bytes::Bytes;
97
98    use crate::{message, subject::ToSubject, Message};
99
100    use super::{PublishError, Request, RequestError, SubscribeError};
101
102    pub trait Publisher {
103        fn publish_with_reply<S, R>(
104            &self,
105            subject: S,
106            reply: R,
107            payload: Bytes,
108        ) -> impl Future<Output = Result<(), PublishError>>
109        where
110            S: ToSubject,
111            R: ToSubject;
112
113        fn publish_message(
114            &self,
115            msg: message::OutboundMessage,
116        ) -> impl Future<Output = Result<(), PublishError>>;
117    }
118    pub trait Subscriber {
119        fn subscribe<S>(
120            &self,
121            subject: S,
122        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
123        where
124            S: ToSubject;
125    }
126    pub trait Requester {
127        fn send_request<S>(
128            &self,
129            subject: S,
130            request: Request,
131        ) -> impl Future<Output = Result<Message, RequestError>>
132        where
133            S: ToSubject;
134    }
135    pub trait TimeoutProvider {
136        fn timeout(&self) -> Option<Duration>;
137    }
138}
139
140impl traits::Requester for Client {
141    fn send_request<S: ToSubject>(
142        &self,
143        subject: S,
144        request: Request,
145    ) -> impl Future<Output = Result<Message, RequestError>> {
146        self.send_request(subject, request)
147    }
148}
149
150impl traits::TimeoutProvider for Client {
151    fn timeout(&self) -> Option<Duration> {
152        self.timeout()
153    }
154}
155
156impl traits::Publisher for Client {
157    fn publish_with_reply<S, R>(
158        &self,
159        subject: S,
160        reply: R,
161        payload: Bytes,
162    ) -> impl Future<Output = Result<(), PublishError>>
163    where
164        S: ToSubject,
165        R: ToSubject,
166    {
167        self.publish_with_reply(subject, reply, payload)
168    }
169
170    async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
171        if msg.subject.is_empty()
172            || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&msg.subject))
173        {
174            return Err(PublishError::with_source(
175                PublishErrorKind::InvalidSubject,
176                crate::subject::SubjectError::InvalidFormat,
177            ));
178        }
179        self.sender
180            .send(Command::Publish(msg))
181            .await
182            .map_err(Into::into)
183    }
184}
185
186impl traits::Subscriber for Client {
187    fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
188    where
189        S: ToSubject,
190    {
191        self.subscribe(subject)
192    }
193}
194
195impl Sink<OutboundMessage> for Client {
196    type Error = PublishError;
197
198    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
200    }
201
202    fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
203        self.poll_sender
204            .start_send_unpin(Command::Publish(msg))
205            .map_err(Into::into)
206    }
207
208    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
210    }
211
212    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
213        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
214    }
215}
216
217impl Client {
218    #[allow(clippy::too_many_arguments)]
219    pub(crate) fn new(
220        info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
221        state: tokio::sync::watch::Receiver<State>,
222        sender: mpsc::Sender<Command>,
223        capacity: usize,
224        inbox_prefix: String,
225        request_timeout: Option<Duration>,
226        max_payload: Arc<AtomicUsize>,
227        statistics: Arc<Statistics>,
228        skip_subject_validation: bool,
229    ) -> Client {
230        let poll_sender = PollSender::new(sender.clone());
231        Client {
232            info,
233            state,
234            sender,
235            poll_sender,
236            next_subscription_id: Arc::new(AtomicU64::new(1)),
237            subscription_capacity: capacity,
238            inbox_prefix: inbox_prefix.into(),
239            request_timeout,
240            max_payload,
241            connection_stats: statistics,
242            skip_subject_validation,
243        }
244    }
245
246    /// Validates a subject for publishing (protocol-framing safety only).
247    /// Checks for empty and whitespace. Does not check dot structure.
248    pub(crate) fn maybe_validate_publish_subject<S: ToSubject>(
249        &self,
250        subject: S,
251    ) -> Result<crate::Subject, crate::subject::SubjectError> {
252        let subject = subject.to_subject();
253        if subject.is_empty()
254            || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&subject))
255        {
256            return Err(crate::subject::SubjectError::InvalidFormat);
257        }
258        Ok(subject)
259    }
260
261    /// Validates a subject for subscribing (protocol-framing + dot structure).
262    /// Always runs regardless of `skip_subject_validation` (matches Go/Java behavior).
263    pub(crate) fn validate_subscribe_subject<S: ToSubject>(
264        &self,
265        subject: S,
266    ) -> Result<crate::Subject, crate::subject::SubjectError> {
267        let subject = subject.to_subject();
268        if !subject.is_valid() {
269            return Err(crate::subject::SubjectError::InvalidFormat);
270        }
271        Ok(subject)
272    }
273
274    /// Returns the default timeout for requests set when creating the client.
275    ///
276    /// # Examples
277    /// ```no_run
278    /// # #[tokio::main]
279    /// # async fn main() -> Result<(), async_nats::Error> {
280    /// let client = async_nats::connect("demo.nats.io").await?;
281    /// println!("default request timeout: {:?}", client.timeout());
282    /// # Ok(())
283    /// # }
284    /// ```
285    pub fn timeout(&self) -> Option<Duration> {
286        self.request_timeout
287    }
288
289    /// Returns the last received info from the server if one has been observed.
290    ///
291    /// This is useful when [`ConnectOptions::retry_on_initial_connect`][crate::ConnectOptions::retry_on_initial_connect]
292    /// is enabled and the client is returned before the first server `INFO`
293    /// message arrives.
294    ///
295    /// # Examples
296    ///
297    /// ```no_run
298    /// # #[tokio::main]
299    /// # async fn main () -> Result<(), async_nats::Error> {
300    /// let client = async_nats::ConnectOptions::new()
301    ///     .retry_on_initial_connect()
302    ///     .connect("demo.nats.io")
303    ///     .await?;
304    /// println!("info available: {}", client.try_server_info().is_some());
305    /// # Ok(())
306    /// # }
307    /// ```
308    pub fn try_server_info(&self) -> Option<ServerInfo> {
309        // We ignore notifying the watcher, as that requires mutable client reference.
310        self.info.borrow().clone()
311    }
312
313    /// Returns last received info from the server.
314    ///
315    /// # Examples
316    ///
317    /// ```no_run
318    /// # #[tokio::main]
319    /// # async fn main () -> Result<(), async_nats::Error> {
320    /// let client = async_nats::connect("demo.nats.io").await?;
321    /// println!("info: {:?}", client.server_info());
322    /// # Ok(())
323    /// # }
324    /// ```
325    pub fn server_info(&self) -> ServerInfo {
326        self.try_server_info().unwrap_or_default()
327    }
328
329    /// Returns the maximum payload size currently used by the client.
330    ///
331    /// Before the first server `INFO` message arrives, this returns the default
332    /// server payload limit used for client-side publish validation.
333    ///
334    /// # Examples
335    ///
336    /// ```no_run
337    /// # #[tokio::main]
338    /// # async fn main () -> Result<(), async_nats::Error> {
339    /// let client = async_nats::connect("demo.nats.io").await?;
340    /// println!("max payload: {:?}", client.max_payload());
341    /// # Ok(())
342    /// # }
343    /// ```
344    pub fn max_payload(&self) -> usize {
345        self.max_payload.load(Ordering::Relaxed)
346    }
347
348    /// Returns true if the server version is compatible with the version components.
349    ///
350    /// This has to be used with caution, as it is not guaranteed that the server
351    /// that client is connected to is the same version that the one that is
352    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
353    ///
354    /// # Examples
355    ///
356    /// ```no_run
357    /// # #[tokio::main]
358    /// # async fn main() -> Result<(), async_nats::Error> {
359    /// let client = async_nats::connect("demo.nats.io").await?;
360    /// assert!(client.is_server_compatible(2, 8, 4));
361    /// # Ok(())
362    /// # }
363    /// ```
364    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
365        let info = self.server_info();
366
367        let server_version_captures = match VERSION_RE.captures(&info.version) {
368            Some(captures) => captures,
369            None => return false,
370        };
371
372        let server_major = server_version_captures
373            .get(1)
374            .map(|m| m.as_str().parse::<i64>().unwrap())
375            .unwrap();
376
377        let server_minor = server_version_captures
378            .get(2)
379            .map(|m| m.as_str().parse::<i64>().unwrap())
380            .unwrap();
381
382        let server_patch = server_version_captures
383            .get(3)
384            .map(|m| m.as_str().parse::<i64>().unwrap())
385            .unwrap();
386
387        if server_major < major
388            || (server_major == major && server_minor < minor)
389            || (server_major == major && server_minor == minor && server_patch < patch)
390        {
391            return false;
392        }
393        true
394    }
395
396    /// Publish a [Message] to a given subject.
397    ///
398    /// Returns `PublishErrorKind::InvalidSubject` if the subject is invalid
399    /// (empty or contains whitespace). This check can be disabled with
400    /// [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation] (empty subjects are
401    /// always rejected).
402    ///
403    /// # Examples
404    /// ```no_run
405    /// # #[tokio::main]
406    /// # async fn main() -> Result<(), async_nats::Error> {
407    /// let client = async_nats::connect("demo.nats.io").await?;
408    /// client.publish("events.data", "payload".into()).await?;
409    /// # Ok(())
410    /// # }
411    /// ```
412    pub async fn publish<S: ToSubject>(
413        &self,
414        subject: S,
415        payload: Bytes,
416    ) -> Result<(), PublishError> {
417        let subject = self
418            .maybe_validate_publish_subject(subject)
419            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
420
421        let max_payload = self.max_payload.load(Ordering::Relaxed);
422        if payload.len() > max_payload {
423            return Err(PublishError::with_source(
424                PublishErrorKind::MaxPayloadExceeded,
425                format!(
426                    "Payload size limit of {} exceeded by message size of {}",
427                    max_payload,
428                    payload.len(),
429                ),
430            ));
431        }
432
433        self.sender
434            .send(Command::Publish(OutboundMessage {
435                subject,
436                payload,
437                reply: None,
438                headers: None,
439            }))
440            .await?;
441        Ok(())
442    }
443
444    /// Publish a [Message] with headers to a given subject.
445    ///
446    /// # Examples
447    /// ```
448    /// # #[tokio::main]
449    /// # async fn main() -> Result<(), async_nats::Error> {
450    /// use std::str::FromStr;
451    /// let client = async_nats::connect("demo.nats.io").await?;
452    /// let mut headers = async_nats::HeaderMap::new();
453    /// headers.insert(
454    ///     "X-Header",
455    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
456    /// );
457    /// client
458    ///     .publish_with_headers("events.data", headers, "payload".into())
459    ///     .await?;
460    /// # Ok(())
461    /// # }
462    /// ```
463    pub async fn publish_with_headers<S: ToSubject>(
464        &self,
465        subject: S,
466        headers: HeaderMap,
467        payload: Bytes,
468    ) -> Result<(), PublishError> {
469        let subject = self
470            .maybe_validate_publish_subject(subject)
471            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
472
473        self.sender
474            .send(Command::Publish(OutboundMessage {
475                subject,
476                payload,
477                reply: None,
478                headers: Some(headers),
479            }))
480            .await?;
481        Ok(())
482    }
483
484    /// Publish a [Message] to a given subject, with specified response subject
485    /// to which the subscriber can respond.
486    /// This method does not await for the response.
487    ///
488    /// # Examples
489    ///
490    /// ```no_run
491    /// # #[tokio::main]
492    /// # async fn main() -> Result<(), async_nats::Error> {
493    /// let client = async_nats::connect("demo.nats.io").await?;
494    /// client
495    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
496    ///     .await?;
497    /// # Ok(())
498    /// # }
499    /// ```
500    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
501        &self,
502        subject: S,
503        reply: R,
504        payload: Bytes,
505    ) -> Result<(), PublishError> {
506        let subject = self
507            .maybe_validate_publish_subject(subject)
508            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
509        let reply = self
510            .maybe_validate_publish_subject(reply)
511            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
512
513        self.sender
514            .send(Command::Publish(OutboundMessage {
515                subject,
516                payload,
517                reply: Some(reply),
518                headers: None,
519            }))
520            .await?;
521        Ok(())
522    }
523
524    /// Publish a [Message] to a given subject with headers and specified response subject
525    /// to which the subscriber can respond.
526    /// This method does not await for the response.
527    ///
528    /// # Examples
529    ///
530    /// ```no_run
531    /// # #[tokio::main]
532    /// # async fn main() -> Result<(), async_nats::Error> {
533    /// use std::str::FromStr;
534    /// let client = async_nats::connect("demo.nats.io").await?;
535    /// let mut headers = async_nats::HeaderMap::new();
536    /// client
537    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
538    ///     .await?;
539    /// # Ok(())
540    /// # }
541    /// ```
542    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
543        &self,
544        subject: S,
545        reply: R,
546        headers: HeaderMap,
547        payload: Bytes,
548    ) -> Result<(), PublishError> {
549        let subject = self
550            .maybe_validate_publish_subject(subject)
551            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
552        let reply = self
553            .maybe_validate_publish_subject(reply)
554            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
555
556        self.sender
557            .send(Command::Publish(OutboundMessage {
558                subject,
559                payload,
560                reply: Some(reply),
561                headers: Some(headers),
562            }))
563            .await?;
564        Ok(())
565    }
566
567    /// Sends the request with headers.
568    ///
569    /// # Examples
570    /// ```no_run
571    /// # #[tokio::main]
572    /// # async fn main() -> Result<(), async_nats::Error> {
573    /// let client = async_nats::connect("demo.nats.io").await?;
574    /// let response = client.request("service", "data".into()).await?;
575    /// # Ok(())
576    /// # }
577    /// ```
578    pub async fn request<S: ToSubject>(
579        &self,
580        subject: S,
581        payload: Bytes,
582    ) -> Result<Message, RequestError> {
583        let request = Request::new().payload(payload);
584        self.send_request(subject, request).await
585    }
586
587    /// Sends the request with headers.
588    ///
589    /// # Examples
590    /// ```no_run
591    /// # #[tokio::main]
592    /// # async fn main() -> Result<(), async_nats::Error> {
593    /// let client = async_nats::connect("demo.nats.io").await?;
594    /// let mut headers = async_nats::HeaderMap::new();
595    /// headers.insert("Key", "Value");
596    /// let response = client
597    ///     .request_with_headers("service", headers, "data".into())
598    ///     .await?;
599    /// # Ok(())
600    /// # }
601    /// ```
602    pub async fn request_with_headers<S: ToSubject>(
603        &self,
604        subject: S,
605        headers: HeaderMap,
606        payload: Bytes,
607    ) -> Result<Message, RequestError> {
608        let request = Request::new().headers(headers).payload(payload);
609        self.send_request(subject, request).await
610    }
611
612    /// Sends the request created by the [Request].
613    ///
614    /// Returns `RequestErrorKind::InvalidSubject` if the subject is invalid
615    /// (empty or contains whitespace).
616    ///
617    /// # Examples
618    ///
619    /// ```no_run
620    /// # #[tokio::main]
621    /// # async fn main() -> Result<(), async_nats::Error> {
622    /// let client = async_nats::connect("demo.nats.io").await?;
623    /// let request = async_nats::Request::new().payload("data".into());
624    /// let response = client.send_request("service", request).await?;
625    /// # Ok(())
626    /// # }
627    /// ```
628    pub async fn send_request<S: ToSubject>(
629        &self,
630        subject: S,
631        request: Request,
632    ) -> Result<Message, RequestError> {
633        let subject = self
634            .maybe_validate_publish_subject(subject)
635            .map_err(|e| RequestError::with_source(RequestErrorKind::InvalidSubject, e))?;
636
637        if let Some(inbox) = request.inbox {
638            let timeout = request.timeout.unwrap_or(self.request_timeout);
639            let mut subscriber = self.subscribe(inbox.clone()).await?;
640            let payload: Bytes = request.payload.unwrap_or_default();
641            match request.headers {
642                Some(headers) => {
643                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
644                        .await?
645                }
646                None => self.publish_with_reply(subject, inbox, payload).await?,
647            }
648            let request = match timeout {
649                Some(timeout) => {
650                    tokio::time::timeout(timeout, subscriber.next())
651                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
652                        .await?
653                }
654                None => subscriber.next().await,
655            };
656            match request {
657                Some(message) => {
658                    if message.status == Some(StatusCode::NO_RESPONDERS) {
659                        return Err(RequestError::with_source(
660                            RequestErrorKind::NoResponders,
661                            "no responders",
662                        ));
663                    }
664                    Ok(message)
665                }
666                None => Err(RequestError::with_source(
667                    RequestErrorKind::Other,
668                    "broken pipe",
669                )),
670            }
671        } else {
672            let (sender, receiver) = oneshot::channel();
673
674            let payload = request.payload.unwrap_or_default();
675            let respond = self.new_inbox().into();
676            let headers = request.headers;
677
678            self.sender
679                .send(Command::Request {
680                    subject,
681                    payload,
682                    respond,
683                    headers,
684                    sender,
685                })
686                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
687                .await?;
688
689            let timeout = request.timeout.unwrap_or(self.request_timeout);
690            let request = match timeout {
691                Some(timeout) => {
692                    tokio::time::timeout(timeout, receiver)
693                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
694                        .await?
695                }
696                None => receiver.await,
697            };
698
699            match request {
700                Ok(message) => {
701                    if message.status == Some(StatusCode::NO_RESPONDERS) {
702                        return Err(RequestError::with_source(
703                            RequestErrorKind::NoResponders,
704                            "no responders",
705                        ));
706                    }
707                    Ok(message)
708                }
709                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
710            }
711        }
712    }
713
714    /// Create a new globally unique inbox which can be used for replies.
715    ///
716    /// # Examples
717    ///
718    /// ```no_run
719    /// # #[tokio::main]
720    /// # async fn main() -> Result<(), async_nats::Error> {
721    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
722    /// let reply = nc.new_inbox();
723    /// let rsub = nc.subscribe(reply).await?;
724    /// # Ok(())
725    /// # }
726    /// ```
727    pub fn new_inbox(&self) -> String {
728        format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
729    }
730
731    /// Subscribes to a subject to receive [messages][Message].
732    ///
733    /// Returns an error if the subject is invalid (empty, contains whitespace,
734    /// or has malformed dot structure). This validation always runs regardless
735    /// of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
736    ///
737    /// # Examples
738    ///
739    /// ```no_run
740    /// # #[tokio::main]
741    /// # async fn main() -> Result<(), async_nats::Error> {
742    /// use futures_util::StreamExt;
743    /// let client = async_nats::connect("demo.nats.io").await?;
744    /// let mut subscription = client.subscribe("events.>").await?;
745    /// while let Some(message) = subscription.next().await {
746    ///     println!("received message: {:?}", message);
747    /// }
748    /// # Ok(())
749    /// # }
750    /// ```
751    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
752        let subject = self
753            .validate_subscribe_subject(subject)
754            .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
755
756        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
757        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
758
759        self.sender
760            .send(Command::Subscribe {
761                sid,
762                subject,
763                queue_group: None,
764                sender,
765            })
766            .await?;
767
768        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
769    }
770
771    /// Subscribes to a subject with a queue group to receive [messages][Message].
772    ///
773    /// Returns an error if the subject is invalid (empty, contains whitespace,
774    /// or has malformed dot structure) or if the queue group name is invalid
775    /// (empty or contains whitespace). Subject and queue group validation always
776    /// runs regardless of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
777    ///
778    /// # Examples
779    ///
780    /// ```no_run
781    /// # #[tokio::main]
782    /// # async fn main() -> Result<(), async_nats::Error> {
783    /// use futures_util::StreamExt;
784    /// let client = async_nats::connect("demo.nats.io").await?;
785    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
786    /// while let Some(message) = subscription.next().await {
787    ///     println!("received message: {:?}", message);
788    /// }
789    /// # Ok(())
790    /// # }
791    /// ```
792    pub async fn queue_subscribe<S: ToSubject>(
793        &self,
794        subject: S,
795        queue_group: String,
796    ) -> Result<Subscriber, SubscribeError> {
797        let subject = self
798            .validate_subscribe_subject(subject)
799            .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
800
801        if !crate::is_valid_queue_group(&queue_group) {
802            return Err(SubscribeError::new(SubscribeErrorKind::InvalidQueueName));
803        }
804
805        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
806        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
807
808        self.sender
809            .send(Command::Subscribe {
810                sid,
811                subject,
812                queue_group: Some(queue_group),
813                sender,
814            })
815            .await?;
816
817        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
818    }
819
820    /// Flushes the internal buffer ensuring that all messages are sent.
821    ///
822    /// # Examples
823    ///
824    /// ```no_run
825    /// # #[tokio::main]
826    /// # async fn main() -> Result<(), async_nats::Error> {
827    /// let client = async_nats::connect("demo.nats.io").await?;
828    /// client.flush().await?;
829    /// # Ok(())
830    /// # }
831    /// ```
832    pub async fn flush(&self) -> Result<(), FlushError> {
833        let (tx, rx) = tokio::sync::oneshot::channel();
834        self.sender
835            .send(Command::Flush { observer: tx })
836            .await
837            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
838
839        rx.await
840            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
841        Ok(())
842    }
843
844    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
845    /// messages, then closes the connection. Once completed, any streams associated with the
846    /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
847    ///
848    /// # Examples
849    ///
850    /// ```no_run
851    /// # #[tokio::main]
852    /// # async fn main() -> Result<(), async_nats::Error> {
853    /// use futures_util::StreamExt;
854    /// let client = async_nats::connect("demo.nats.io").await?;
855    /// let mut subscription = client.subscribe("events.>").await?;
856    ///
857    /// client.drain().await?;
858    ///
859    /// # // existing subscriptions are closed and further commands will fail
860    /// assert!(subscription.next().await.is_none());
861    /// client
862    ///     .subscribe("events.>")
863    ///     .await
864    ///     .expect_err("Expected further commands to fail");
865    ///
866    /// # Ok(())
867    /// # }
868    /// ```
869    pub async fn drain(&self) -> Result<(), DrainError> {
870        // Drain all subscriptions
871        self.sender.send(Command::Drain { sid: None }).await?;
872
873        // Remaining process is handled on the handler-side
874        Ok(())
875    }
876
877    /// Returns the current state of the connection.
878    ///
879    /// # Examples
880    /// ```no_run
881    /// # #[tokio::main]
882    /// # async fn main() -> Result<(), async_nats::Error> {
883    /// let client = async_nats::connect("demo.nats.io").await?;
884    /// println!("connection state: {}", client.connection_state());
885    /// # Ok(())
886    /// # }
887    /// ```
888    pub fn connection_state(&self) -> State {
889        self.state.borrow().to_owned()
890    }
891
892    /// Forces the client to reconnect.
893    /// Keep in mind that client will reconnect automatically if the connection is lost and this
894    /// method does not have to be used in normal circumstances.
895    /// However, if you want to force the client to reconnect, for example to re-trigger
896    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
897    /// This method does not wait for connection to be re-established.
898    ///
899    /// # Examples
900    /// ```no_run
901    /// # #[tokio::main]
902    /// # async fn main() -> Result<(), async_nats::Error> {
903    /// let client = async_nats::connect("demo.nats.io").await?;
904    /// client.force_reconnect().await?;
905    /// # Ok(())
906    /// # }
907    /// ```
908    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
909        self.sender
910            .send(Command::Reconnect)
911            .await
912            .map_err(Into::into)
913    }
914
915    /// Replaces the server pool used for reconnection attempts.
916    ///
917    /// The new pool takes effect on the next reconnect attempt; it does not
918    /// trigger an immediate reconnect. To force an immediate reconnect with
919    /// the new pool, call [`Client::force_reconnect`] after this method.
920    ///
921    /// Per-server state (failed attempt count, connection history) is preserved
922    /// for servers that appear in both the old and new pools.
923    ///
924    /// This also resets the global reconnection attempt counter, so any
925    /// progress toward [`ConnectOptions::max_reconnects`](crate::ConnectOptions::max_reconnects)
926    /// is cleared.
927    ///
928    /// # Examples
929    /// ```no_run
930    /// # #[tokio::main]
931    /// # async fn main() -> Result<(), async_nats::Error> {
932    /// let client = async_nats::connect("demo.nats.io").await?;
933    /// client
934    ///     .set_server_pool(["nats://server1:4222", "nats://server2:4222"].as_slice())
935    ///     .await?;
936    /// // Optionally force reconnect to apply immediately:
937    /// client.force_reconnect().await?;
938    /// # Ok(())
939    /// # }
940    /// ```
941    pub async fn set_server_pool<A: crate::ToServerAddrs>(
942        &self,
943        addrs: A,
944    ) -> Result<(), SetServerPoolError> {
945        let servers: Vec<crate::ServerAddr> = addrs
946            .to_server_addrs()
947            .map_err(|err| {
948                SetServerPoolError::with_source(SetServerPoolErrorKind::InvalidAddress, err)
949            })?
950            .collect();
951
952        if servers.is_empty() {
953            return Err(SetServerPoolError::new(SetServerPoolErrorKind::EmptyPool));
954        }
955
956        let (tx, rx) = oneshot::channel();
957        self.sender
958            .send(Command::SetServerPool {
959                servers,
960                result: tx,
961            })
962            .await
963            .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?;
964
965        rx.await
966            .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?
967            .map_err(|err| {
968                SetServerPoolError::with_source(SetServerPoolErrorKind::MixedSchemes, err)
969            })
970    }
971
972    /// Returns a snapshot of the current server pool.
973    ///
974    /// The returned list includes both explicitly configured and discovered
975    /// servers, along with per-server metadata such as reconnect count and
976    /// connection history.
977    ///
978    /// # Examples
979    /// ```no_run
980    /// # #[tokio::main]
981    /// # async fn main() -> Result<(), async_nats::Error> {
982    /// let client = async_nats::connect("demo.nats.io").await?;
983    /// let pool = client.server_pool().await?;
984    /// for server in &pool {
985    ///     println!(
986    ///         "{:?}: {} failed attempts",
987    ///         server.addr, server.failed_attempts
988    ///     );
989    /// }
990    /// # Ok(())
991    /// # }
992    /// ```
993    pub async fn server_pool(&self) -> Result<Vec<crate::Server>, ServerPoolError> {
994        let (tx, rx) = oneshot::channel();
995        self.sender
996            .send(Command::ServerPool { result: tx })
997            .await
998            .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))?;
999
1000        rx.await
1001            .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))
1002    }
1003
1004    /// Returns struct representing statistics of the whole lifecycle of the client.
1005    /// This includes number of bytes sent/received, number of messages sent/received,
1006    /// and number of times the connection was established.
1007    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
1008    /// across threads.
1009    ///
1010    /// # Examples
1011    /// ```no_run
1012    /// # #[tokio::main]
1013    /// # async fn main() -> Result<(), async_nats::Error> {
1014    /// let client = async_nats::connect("demo.nats.io").await?;
1015    /// let statistics = client.statistics();
1016    /// println!("client statistics: {:#?}", statistics);
1017    /// # Ok(())
1018    /// # }
1019    /// ```
1020    pub fn statistics(&self) -> Arc<Statistics> {
1021        self.connection_stats.clone()
1022    }
1023}
1024
/// Used for building customized requests.
///
/// Every field is optional; fields are normally populated through the
/// builder-style methods [`Request::payload`], [`Request::headers`],
/// [`Request::timeout`] and [`Request::inbox`].
#[derive(Default)]
pub struct Request {
    /// Payload of the request. If not set, empty payload will be sent.
    pub payload: Option<Bytes>,
    /// Headers of the request.
    pub headers: Option<HeaderMap>,
    /// Custom timeout of the request. The outer `None` means the default
    /// [Client] timeout is used; `Some(None)` disables the timeout entirely.
    pub timeout: Option<Option<Duration>>,
    /// Custom inbox for this request. Overrides both customized and default
    /// [Client] Inbox.
    pub inbox: Option<String>,
}
1033
1034impl Request {
1035    pub fn new() -> Request {
1036        Default::default()
1037    }
1038
1039    /// Sets the payload of the request. If not used, empty payload will be sent.
1040    ///
1041    /// # Examples
1042    /// ```no_run
1043    /// # #[tokio::main]
1044    /// # async fn main() -> Result<(), async_nats::Error> {
1045    /// let client = async_nats::connect("demo.nats.io").await?;
1046    /// let request = async_nats::Request::new().payload("data".into());
1047    /// client.send_request("service", request).await?;
1048    /// # Ok(())
1049    /// # }
1050    /// ```
1051    pub fn payload(mut self, payload: Bytes) -> Request {
1052        self.payload = Some(payload);
1053        self
1054    }
1055
1056    /// Sets the headers of the requests.
1057    ///
1058    /// # Examples
1059    /// ```no_run
1060    /// # #[tokio::main]
1061    /// # async fn main() -> Result<(), async_nats::Error> {
1062    /// use std::str::FromStr;
1063    /// let client = async_nats::connect("demo.nats.io").await?;
1064    /// let mut headers = async_nats::HeaderMap::new();
1065    /// headers.insert(
1066    ///     "X-Example",
1067    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
1068    /// );
1069    /// let request = async_nats::Request::new()
1070    ///     .headers(headers)
1071    ///     .payload("data".into());
1072    /// client.send_request("service", request).await?;
1073    /// # Ok(())
1074    /// # }
1075    /// ```
1076    pub fn headers(mut self, headers: HeaderMap) -> Request {
1077        self.headers = Some(headers);
1078        self
1079    }
1080
1081    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
1082    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
1083    /// To use default timeout, simply do not call this function.
1084    ///
1085    /// # Examples
1086    /// ```no_run
1087    /// # #[tokio::main]
1088    /// # async fn main() -> Result<(), async_nats::Error> {
1089    /// let client = async_nats::connect("demo.nats.io").await?;
1090    /// let request = async_nats::Request::new()
1091    ///     .timeout(Some(std::time::Duration::from_secs(15)))
1092    ///     .payload("data".into());
1093    /// client.send_request("service", request).await?;
1094    /// # Ok(())
1095    /// # }
1096    /// ```
1097    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
1098        self.timeout = Some(timeout);
1099        self
1100    }
1101
1102    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
1103    ///
1104    /// # Examples
1105    /// ```no_run
1106    /// # #[tokio::main]
1107    /// # async fn main() -> Result<(), async_nats::Error> {
1108    /// use std::str::FromStr;
1109    /// let client = async_nats::connect("demo.nats.io").await?;
1110    /// let request = async_nats::Request::new()
1111    ///     .inbox("custom_inbox".into())
1112    ///     .payload("data".into());
1113    /// client.send_request("service", request).await?;
1114    /// # Ok(())
1115    /// # }
1116    /// ```
1117    pub fn inbox(mut self, inbox: String) -> Request {
1118        self.inbox = Some(inbox);
1119        self
1120    }
1121}
1122
1123#[derive(Error, Debug)]
1124#[error("failed to send reconnect: {0}")]
1125pub struct ReconnectError(#[source] crate::Error);
1126
1127impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
1128    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1129        ReconnectError(Box::new(err))
1130    }
1131}
1132
/// An error returned from [`Client::set_server_pool`].
/// The specific failure is described by [`SetServerPoolErrorKind`].
pub type SetServerPoolError = Error<SetServerPoolErrorKind>;
1135
/// The kinds of failure reported by [`SetServerPoolError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SetServerPoolErrorKind {
    /// Failed to send command to the connection handler.
    Send,
    /// One or more server addresses could not be parsed.
    InvalidAddress,
    /// The pool contains a mix of WebSocket (`ws://`, `wss://`) and
    /// non-websocket (`nats://`, `tls://`) URLs, which is not allowed.
    MixedSchemes,
    /// The server pool cannot be empty.
    EmptyPool,
}

impl Display for SetServerPoolErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Send => "failed to send set_server_pool command",
            Self::InvalidAddress => "invalid server address",
            Self::MixedSchemes => "cannot mix websocket and non-websocket URLs in server pool",
            Self::EmptyPool => "server pool cannot be empty",
        };
        f.write_str(description)
    }
}
1162
/// An error returned from [`Client::server_pool`].
/// The specific failure is described by [`ServerPoolErrorKind`].
pub type ServerPoolError = Error<ServerPoolErrorKind>;
1165
/// The kinds of failure reported by [`ServerPoolError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ServerPoolErrorKind {
    /// Failed to send command to the connection handler.
    Send,
}

impl Display for ServerPoolErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Kept as an exhaustive match so a new variant forces an update here.
        let description = match self {
            Self::Send => "failed to send server_pool command",
        };
        f.write_str(description)
    }
}
1179
1180/// An error returned from the [`Client::subscribe`] or [`Client::queue_subscribe`] functions.
1181pub type SubscribeError = Error<SubscribeErrorKind>;
1182
1183impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
1184    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1185        SubscribeError::with_source(SubscribeErrorKind::Other, err)
1186    }
1187}
1188
/// The kinds of failure reported by [`SubscribeError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SubscribeErrorKind {
    /// The subject is invalid (empty, contains whitespace, or has malformed dot structure).
    InvalidSubject,
    /// The queue group name is invalid (empty or contains whitespace).
    InvalidQueueName,
    /// Other errors, client/io related.
    Other,
}

impl Display for SubscribeErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::InvalidSubject => "invalid subject",
            Self::InvalidQueueName => "invalid queue name",
            Self::Other => "subscribe failed",
        };
        f.write_str(description)
    }
}
1208
1209#[derive(Error, Debug)]
1210#[error("failed to send drain: {0}")]
1211pub struct DrainError(#[source] crate::Error);
1212
1213impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
1214    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1215        DrainError(Box::new(err))
1216    }
1217}
1218
/// The kinds of failure reported by [`RequestError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// There are services listening on requested subject, but they didn't respond
    /// in time.
    TimedOut,
    /// No one is listening on request subject.
    NoResponders,
    /// The subject is invalid (empty or contains whitespace).
    InvalidSubject,
    /// Other errors, client/io related.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "request timed out",
            Self::NoResponders => "no responders",
            Self::InvalidSubject => "invalid subject",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
1242
1243/// Error returned when a core NATS request fails.
1244/// To be enumerate over the variants, call [RequestError::kind].
1245pub type RequestError = Error<RequestErrorKind>;
1246
1247impl From<PublishError> for RequestError {
1248    fn from(e: PublishError) -> Self {
1249        RequestError::with_source(RequestErrorKind::Other, e)
1250    }
1251}
1252
1253impl From<SubscribeError> for RequestError {
1254    fn from(e: SubscribeError) -> Self {
1255        RequestError::with_source(RequestErrorKind::Other, e)
1256    }
1257}
1258
/// The kinds of failure reported by [`FlushError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// Sending the flush failed client side.
    SendError,
    /// Flush failed.
    /// This can happen mostly in case of connection issues
    /// that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes a short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::SendError => "failed to send flush request",
            Self::FlushError => "flush failed",
        };
        f.write_str(description)
    }
}
1277
/// An error returned from [`Client::flush`].
/// The specific failure is described by [`FlushErrorKind`].
pub type FlushError = Error<FlushErrorKind>;
1279
/// Represents statistics for the instance of the client throughout its lifecycle.
///
/// All counters are atomic, so they can be read through a shared reference.
#[derive(Default, Debug)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times connection was established.
    /// Initial connect will be counted as well, then all successful reconnects.
    pub connects: AtomicU64,
}