Skip to main content

async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38
/// Matches the leading numeric components of a server version string.
///
/// The pattern is anchored at the start of the input only (`\A`), accepts an
/// optional `v` prefix and captures a mandatory major number (group 1)
/// followed by optional minor (group 2) and patch (group 3) numbers. The
/// separating dots are optional as well, and anything after the matched
/// numbers is ignored.
static VERSION_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
///
/// It is also the error type of the [`traits::Publisher`] methods and of the
/// `Sink<OutboundMessage>` implementation for [`Client`]. The category of the
/// failure is described by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48        PublishError::with_source(PublishErrorKind::Send, err)
49    }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54        PublishError::with_source(PublishErrorKind::Send, err)
55    }
56}
57
/// Categories of failure reported through [`PublishError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The message is larger than the maximum payload size.
    MaxPayloadExceeded,
    /// The subject failed validation.
    InvalidSubject,
    /// The message could not be sent on the command channel.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the short, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::InvalidSubject => "invalid subject",
            Self::Send => "failed to send message",
        };
        f.write_str(description)
    }
}
74
/// Renders the text shared by all max-payload errors.
///
/// `sizes` is the `(max_payload, size)` pair: the limit that applies and the
/// size of the message that exceeded it.
pub(crate) fn max_payload_message(sizes: (usize, usize)) -> String {
    let (max_payload, size) = sizes;
    format!("Payload size limit of {max_payload} exceeded by message size of {size}")
}
79
80/// `map_err` helper turning a size violation into a [`PublishError`].
81fn max_payload_error(sizes: (usize, usize)) -> PublishError {
82    PublishError::with_source(
83        PublishErrorKind::MaxPayloadExceeded,
84        max_payload_message(sizes),
85    )
86}
87
/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Client {
    /// Watch receiver holding the last server `INFO`, or `None` when none has
    /// been observed yet.
    info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
    /// Watch receiver for the connection [`State`].
    pub(crate) state: tokio::sync::watch::Receiver<State>,
    /// Channel on which every [`Command`] (publish, subscribe, request, flush)
    /// issued by this client is sent.
    pub(crate) sender: mpsc::Sender<Command>,
    /// Poll-based wrapper around a clone of `sender`; backs the
    /// `Sink<OutboundMessage>` implementation.
    poll_sender: PollSender<Command>,
    /// Source of subscription ids. Starts at 1 and is incremented for every
    /// subscription; the counter is shared between clones of the client.
    next_subscription_id: Arc<AtomicU64>,
    /// Capacity of the message channel created for each subscription.
    subscription_capacity: usize,
    /// Prefix placed in front of the generated id by `new_inbox`.
    inbox_prefix: Arc<str>,
    /// Default request timeout, used when a request does not set its own.
    /// `None` means requests wait for a response without a time limit.
    request_timeout: Option<Duration>,
    /// Payload size limit enforced before publishing.
    // NOTE(review): presumably updated by the connection once the server
    // reports its limit — confirm against the code that owns the other handle.
    max_payload: Arc<AtomicUsize>,
    /// Shared connection statistics.
    connection_stats: Arc<Statistics>,
    /// When `true`, publish paths skip the subject format check (empty
    /// subjects are still rejected). Subscribe validation is not affected.
    skip_subject_validation: bool,
}
105
/// Traits describing the publish, subscribe and request capabilities of a
/// client. [`Client`](super::Client) implements all of them by delegating to
/// its inherent methods.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;

    use crate::{message, subject::ToSubject, Message};

    use super::{PublishError, Request, RequestError, SubscribeError};

    /// Something that can publish messages.
    pub trait Publisher {
        /// Publishes `payload` to `subject`, attaching `reply` as the subject
        /// a receiver can respond to.
        fn publish_with_reply<S, R>(
            &self,
            subject: S,
            reply: R,
            payload: Bytes,
        ) -> impl Future<Output = Result<(), PublishError>>
        where
            S: ToSubject,
            R: ToSubject;

        /// Publishes an already assembled [`message::OutboundMessage`].
        fn publish_message(
            &self,
            msg: message::OutboundMessage,
        ) -> impl Future<Output = Result<(), PublishError>>;
    }
    /// Something that can create subscriptions.
    pub trait Subscriber {
        /// Subscribes to `subject`, resolving to a [`crate::Subscriber`] on
        /// success.
        fn subscribe<S>(
            &self,
            subject: S,
        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
        where
            S: ToSubject;
    }
    /// Something that can perform request/response exchanges.
    pub trait Requester {
        /// Sends `request` to `subject` and resolves to the response message.
        fn send_request<S>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<Message, RequestError>>
        where
            S: ToSubject;
    }
    /// Something that exposes a default request timeout.
    pub trait TimeoutProvider {
        /// Returns the default timeout, or `None` when there is none.
        fn timeout(&self) -> Option<Duration>;
    }
}
152
153impl traits::Requester for Client {
154    fn send_request<S: ToSubject>(
155        &self,
156        subject: S,
157        request: Request,
158    ) -> impl Future<Output = Result<Message, RequestError>> {
159        self.send_request(subject, request)
160    }
161}
162
163impl traits::TimeoutProvider for Client {
164    fn timeout(&self) -> Option<Duration> {
165        self.timeout()
166    }
167}
168
169impl traits::Publisher for Client {
170    fn publish_with_reply<S, R>(
171        &self,
172        subject: S,
173        reply: R,
174        payload: Bytes,
175    ) -> impl Future<Output = Result<(), PublishError>>
176    where
177        S: ToSubject,
178        R: ToSubject,
179    {
180        self.publish_with_reply(subject, reply, payload)
181    }
182
183    async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
184        if msg.subject.is_empty()
185            || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&msg.subject))
186        {
187            return Err(PublishError::with_source(
188                PublishErrorKind::InvalidSubject,
189                crate::subject::SubjectError::InvalidFormat,
190            ));
191        }
192        self.check_payload_size(msg.headers.as_ref(), msg.payload.len())
193            .map_err(max_payload_error)?;
194        self.sender
195            .send(Command::Publish(msg))
196            .await
197            .map_err(Into::into)
198    }
199}
200
201impl traits::Subscriber for Client {
202    fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
203    where
204        S: ToSubject,
205    {
206        self.subscribe(subject)
207    }
208}
209
210impl Sink<OutboundMessage> for Client {
211    type Error = PublishError;
212
213    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
215    }
216
217    fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
218        self.check_payload_size(msg.headers.as_ref(), msg.payload.len())
219            .map_err(max_payload_error)?;
220        self.poll_sender
221            .start_send_unpin(Command::Publish(msg))
222            .map_err(Into::into)
223    }
224
225    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
226        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
227    }
228
229    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
230        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
231    }
232}
233
234impl Client {
235    #[allow(clippy::too_many_arguments)]
236    pub(crate) fn new(
237        info: tokio::sync::watch::Receiver<Option<ServerInfo>>,
238        state: tokio::sync::watch::Receiver<State>,
239        sender: mpsc::Sender<Command>,
240        capacity: usize,
241        inbox_prefix: String,
242        request_timeout: Option<Duration>,
243        max_payload: Arc<AtomicUsize>,
244        statistics: Arc<Statistics>,
245        skip_subject_validation: bool,
246    ) -> Client {
247        let poll_sender = PollSender::new(sender.clone());
248        Client {
249            info,
250            state,
251            sender,
252            poll_sender,
253            next_subscription_id: Arc::new(AtomicU64::new(1)),
254            subscription_capacity: capacity,
255            inbox_prefix: inbox_prefix.into(),
256            request_timeout,
257            max_payload,
258            connection_stats: statistics,
259            skip_subject_validation,
260        }
261    }
262
263    /// Validates a subject for publishing (protocol-framing safety only).
264    /// Checks for empty and whitespace. Does not check dot structure.
265    pub(crate) fn maybe_validate_publish_subject<S: ToSubject>(
266        &self,
267        subject: S,
268    ) -> Result<crate::Subject, crate::subject::SubjectError> {
269        let subject = subject.to_subject();
270        if subject.is_empty()
271            || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&subject))
272        {
273            return Err(crate::subject::SubjectError::InvalidFormat);
274        }
275        Ok(subject)
276    }
277
278    /// Validates a subject for subscribing (protocol-framing + dot structure).
279    /// Always runs regardless of `skip_subject_validation` (matches Go/Java behavior).
280    pub(crate) fn validate_subscribe_subject<S: ToSubject>(
281        &self,
282        subject: S,
283    ) -> Result<crate::Subject, crate::subject::SubjectError> {
284        let subject = subject.to_subject();
285        if !subject.is_valid() {
286            return Err(crate::subject::SubjectError::InvalidFormat);
287        }
288        Ok(subject)
289    }
290
291    /// Returns the default timeout for requests set when creating the client.
292    ///
293    /// # Examples
294    /// ```no_run
295    /// # #[tokio::main]
296    /// # async fn main() -> Result<(), async_nats::Error> {
297    /// let client = async_nats::connect("demo.nats.io").await?;
298    /// println!("default request timeout: {:?}", client.timeout());
299    /// # Ok(())
300    /// # }
301    /// ```
302    pub fn timeout(&self) -> Option<Duration> {
303        self.request_timeout
304    }
305
306    /// Returns the last received info from the server if one has been observed.
307    ///
308    /// This is useful when [`ConnectOptions::retry_on_initial_connect`][crate::ConnectOptions::retry_on_initial_connect]
309    /// is enabled and the client is returned before the first server `INFO`
310    /// message arrives.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// # #[tokio::main]
316    /// # async fn main () -> Result<(), async_nats::Error> {
317    /// let client = async_nats::ConnectOptions::new()
318    ///     .retry_on_initial_connect()
319    ///     .connect("demo.nats.io")
320    ///     .await?;
321    /// println!("info available: {}", client.try_server_info().is_some());
322    /// # Ok(())
323    /// # }
324    /// ```
325    pub fn try_server_info(&self) -> Option<ServerInfo> {
326        // We ignore notifying the watcher, as that requires mutable client reference.
327        self.info.borrow().clone()
328    }
329
330    /// Returns last received info from the server.
331    ///
332    /// # Examples
333    ///
334    /// ```no_run
335    /// # #[tokio::main]
336    /// # async fn main () -> Result<(), async_nats::Error> {
337    /// let client = async_nats::connect("demo.nats.io").await?;
338    /// println!("info: {:?}", client.server_info());
339    /// # Ok(())
340    /// # }
341    /// ```
342    pub fn server_info(&self) -> ServerInfo {
343        self.try_server_info().unwrap_or_default()
344    }
345
346    /// Returns the maximum payload size currently used by the client.
347    ///
348    /// Before the first server `INFO` message arrives, this returns the default
349    /// server payload limit used for client-side publish validation.
350    ///
351    /// # Examples
352    ///
353    /// ```no_run
354    /// # #[tokio::main]
355    /// # async fn main () -> Result<(), async_nats::Error> {
356    /// let client = async_nats::connect("demo.nats.io").await?;
357    /// println!("max payload: {:?}", client.max_payload());
358    /// # Ok(())
359    /// # }
360    /// ```
361    pub fn max_payload(&self) -> usize {
362        self.max_payload.load(Ordering::Relaxed)
363    }
364
365    /// Checks the wire size (serialized headers + payload, matching the server's
366    /// `HPUB` total) against the negotiated `max_payload`. On violation returns
367    /// `(max_payload, size)` for the caller to wrap in its own error.
368    pub(crate) fn check_payload_size(
369        &self,
370        headers: Option<&HeaderMap>,
371        payload_len: usize,
372    ) -> Result<(), (usize, usize)> {
373        let max_payload = self.max_payload.load(Ordering::Relaxed);
374        let size = match headers {
375            Some(headers) if !headers.is_empty() => headers.wire_len() + payload_len,
376            _ => payload_len,
377        };
378        if size > max_payload {
379            Err((max_payload, size))
380        } else {
381            Ok(())
382        }
383    }
384
385    /// Returns true if the server version is compatible with the version components.
386    ///
387    /// This has to be used with caution, as it is not guaranteed that the server
388    /// that client is connected to is the same version that the one that is
389    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
390    ///
391    /// # Examples
392    ///
393    /// ```no_run
394    /// # #[tokio::main]
395    /// # async fn main() -> Result<(), async_nats::Error> {
396    /// let client = async_nats::connect("demo.nats.io").await?;
397    /// assert!(client.is_server_compatible(2, 8, 4));
398    /// # Ok(())
399    /// # }
400    /// ```
401    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
402        let info = self.server_info();
403
404        let server_version_captures = match VERSION_RE.captures(&info.version) {
405            Some(captures) => captures,
406            None => return false,
407        };
408
409        let server_major = server_version_captures
410            .get(1)
411            .map(|m| m.as_str().parse::<i64>().unwrap())
412            .unwrap();
413
414        let server_minor = server_version_captures
415            .get(2)
416            .map(|m| m.as_str().parse::<i64>().unwrap())
417            .unwrap();
418
419        let server_patch = server_version_captures
420            .get(3)
421            .map(|m| m.as_str().parse::<i64>().unwrap())
422            .unwrap();
423
424        if server_major < major
425            || (server_major == major && server_minor < minor)
426            || (server_major == major && server_minor == minor && server_patch < patch)
427        {
428            return false;
429        }
430        true
431    }
432
433    /// Publish a [Message] to a given subject.
434    ///
435    /// Returns `PublishErrorKind::InvalidSubject` if the subject is invalid
436    /// (empty or contains whitespace). This check can be disabled with
437    /// [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation] (empty subjects are
438    /// always rejected).
439    ///
440    /// # Examples
441    /// ```no_run
442    /// # #[tokio::main]
443    /// # async fn main() -> Result<(), async_nats::Error> {
444    /// let client = async_nats::connect("demo.nats.io").await?;
445    /// client.publish("events.data", "payload".into()).await?;
446    /// # Ok(())
447    /// # }
448    /// ```
449    pub async fn publish<S: ToSubject>(
450        &self,
451        subject: S,
452        payload: Bytes,
453    ) -> Result<(), PublishError> {
454        let subject = self
455            .maybe_validate_publish_subject(subject)
456            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
457
458        self.check_payload_size(None, payload.len())
459            .map_err(max_payload_error)?;
460
461        self.sender
462            .send(Command::Publish(OutboundMessage {
463                subject,
464                payload,
465                reply: None,
466                headers: None,
467            }))
468            .await?;
469        Ok(())
470    }
471
472    /// Publish a [Message] with headers to a given subject.
473    ///
474    /// # Examples
475    /// ```
476    /// # #[tokio::main]
477    /// # async fn main() -> Result<(), async_nats::Error> {
478    /// use std::str::FromStr;
479    /// let client = async_nats::connect("demo.nats.io").await?;
480    /// let mut headers = async_nats::HeaderMap::new();
481    /// headers.insert(
482    ///     "X-Header",
483    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
484    /// );
485    /// client
486    ///     .publish_with_headers("events.data", headers, "payload".into())
487    ///     .await?;
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub async fn publish_with_headers<S: ToSubject>(
492        &self,
493        subject: S,
494        headers: HeaderMap,
495        payload: Bytes,
496    ) -> Result<(), PublishError> {
497        let subject = self
498            .maybe_validate_publish_subject(subject)
499            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
500
501        self.check_payload_size(Some(&headers), payload.len())
502            .map_err(max_payload_error)?;
503
504        self.sender
505            .send(Command::Publish(OutboundMessage {
506                subject,
507                payload,
508                reply: None,
509                headers: Some(headers),
510            }))
511            .await?;
512        Ok(())
513    }
514
515    /// Publish a [Message] to a given subject, with specified response subject
516    /// to which the subscriber can respond.
517    /// This method does not await for the response.
518    ///
519    /// # Examples
520    ///
521    /// ```no_run
522    /// # #[tokio::main]
523    /// # async fn main() -> Result<(), async_nats::Error> {
524    /// let client = async_nats::connect("demo.nats.io").await?;
525    /// client
526    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
527    ///     .await?;
528    /// # Ok(())
529    /// # }
530    /// ```
531    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
532        &self,
533        subject: S,
534        reply: R,
535        payload: Bytes,
536    ) -> Result<(), PublishError> {
537        let subject = self
538            .maybe_validate_publish_subject(subject)
539            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
540        let reply = self
541            .maybe_validate_publish_subject(reply)
542            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
543
544        self.check_payload_size(None, payload.len())
545            .map_err(max_payload_error)?;
546
547        self.sender
548            .send(Command::Publish(OutboundMessage {
549                subject,
550                payload,
551                reply: Some(reply),
552                headers: None,
553            }))
554            .await?;
555        Ok(())
556    }
557
558    /// Publish a [Message] to a given subject with headers and specified response subject
559    /// to which the subscriber can respond.
560    /// This method does not await for the response.
561    ///
562    /// # Examples
563    ///
564    /// ```no_run
565    /// # #[tokio::main]
566    /// # async fn main() -> Result<(), async_nats::Error> {
567    /// use std::str::FromStr;
568    /// let client = async_nats::connect("demo.nats.io").await?;
569    /// let mut headers = async_nats::HeaderMap::new();
570    /// client
571    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
572    ///     .await?;
573    /// # Ok(())
574    /// # }
575    /// ```
576    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
577        &self,
578        subject: S,
579        reply: R,
580        headers: HeaderMap,
581        payload: Bytes,
582    ) -> Result<(), PublishError> {
583        let subject = self
584            .maybe_validate_publish_subject(subject)
585            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
586        let reply = self
587            .maybe_validate_publish_subject(reply)
588            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
589
590        self.check_payload_size(Some(&headers), payload.len())
591            .map_err(max_payload_error)?;
592
593        self.sender
594            .send(Command::Publish(OutboundMessage {
595                subject,
596                payload,
597                reply: Some(reply),
598                headers: Some(headers),
599            }))
600            .await?;
601        Ok(())
602    }
603
604    /// Sends the request with headers.
605    ///
606    /// # Examples
607    /// ```no_run
608    /// # #[tokio::main]
609    /// # async fn main() -> Result<(), async_nats::Error> {
610    /// let client = async_nats::connect("demo.nats.io").await?;
611    /// let response = client.request("service", "data".into()).await?;
612    /// # Ok(())
613    /// # }
614    /// ```
615    pub async fn request<S: ToSubject>(
616        &self,
617        subject: S,
618        payload: Bytes,
619    ) -> Result<Message, RequestError> {
620        let request = Request::new().payload(payload);
621        self.send_request(subject, request).await
622    }
623
624    /// Sends the request with headers.
625    ///
626    /// # Examples
627    /// ```no_run
628    /// # #[tokio::main]
629    /// # async fn main() -> Result<(), async_nats::Error> {
630    /// let client = async_nats::connect("demo.nats.io").await?;
631    /// let mut headers = async_nats::HeaderMap::new();
632    /// headers.insert("Key", "Value");
633    /// let response = client
634    ///     .request_with_headers("service", headers, "data".into())
635    ///     .await?;
636    /// # Ok(())
637    /// # }
638    /// ```
639    pub async fn request_with_headers<S: ToSubject>(
640        &self,
641        subject: S,
642        headers: HeaderMap,
643        payload: Bytes,
644    ) -> Result<Message, RequestError> {
645        let request = Request::new().headers(headers).payload(payload);
646        self.send_request(subject, request).await
647    }
648
649    /// Sends the request created by the [Request].
650    ///
651    /// Returns `RequestErrorKind::InvalidSubject` if the subject is invalid
652    /// (empty or contains whitespace).
653    ///
654    /// # Examples
655    ///
656    /// ```no_run
657    /// # #[tokio::main]
658    /// # async fn main() -> Result<(), async_nats::Error> {
659    /// let client = async_nats::connect("demo.nats.io").await?;
660    /// let request = async_nats::Request::new().payload("data".into());
661    /// let response = client.send_request("service", request).await?;
662    /// # Ok(())
663    /// # }
664    /// ```
665    pub async fn send_request<S: ToSubject>(
666        &self,
667        subject: S,
668        request: Request,
669    ) -> Result<Message, RequestError> {
670        let subject = self
671            .maybe_validate_publish_subject(subject)
672            .map_err(|e| RequestError::with_source(RequestErrorKind::InvalidSubject, e))?;
673
674        // Reject oversized requests up front, else they'd be sent and time out.
675        self.check_payload_size(
676            request.headers.as_ref(),
677            request.payload.as_ref().map_or(0, Bytes::len),
678        )
679        .map_err(|sizes| {
680            RequestError::with_source(
681                RequestErrorKind::MaxPayloadExceeded,
682                max_payload_message(sizes),
683            )
684        })?;
685
686        if let Some(inbox) = request.inbox {
687            let timeout = request.timeout.unwrap_or(self.request_timeout);
688            let mut subscriber = self.subscribe(inbox.clone()).await?;
689            let payload: Bytes = request.payload.unwrap_or_default();
690            match request.headers {
691                Some(headers) => {
692                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
693                        .await?
694                }
695                None => self.publish_with_reply(subject, inbox, payload).await?,
696            }
697            let request = match timeout {
698                Some(timeout) => {
699                    tokio::time::timeout(timeout, subscriber.next())
700                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
701                        .await?
702                }
703                None => subscriber.next().await,
704            };
705            match request {
706                Some(message) => {
707                    if message.status == Some(StatusCode::NO_RESPONDERS) {
708                        return Err(RequestError::with_source(
709                            RequestErrorKind::NoResponders,
710                            "no responders",
711                        ));
712                    }
713                    Ok(message)
714                }
715                None => Err(RequestError::with_source(
716                    RequestErrorKind::Other,
717                    "broken pipe",
718                )),
719            }
720        } else {
721            let (sender, receiver) = oneshot::channel();
722
723            let payload = request.payload.unwrap_or_default();
724            let respond = self.new_inbox().into();
725            let headers = request.headers;
726
727            self.sender
728                .send(Command::Request {
729                    subject,
730                    payload,
731                    respond,
732                    headers,
733                    sender,
734                })
735                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
736                .await?;
737
738            let timeout = request.timeout.unwrap_or(self.request_timeout);
739            let request = match timeout {
740                Some(timeout) => {
741                    tokio::time::timeout(timeout, receiver)
742                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
743                        .await?
744                }
745                None => receiver.await,
746            };
747
748            match request {
749                Ok(message) => {
750                    if message.status == Some(StatusCode::NO_RESPONDERS) {
751                        return Err(RequestError::with_source(
752                            RequestErrorKind::NoResponders,
753                            "no responders",
754                        ));
755                    }
756                    Ok(message)
757                }
758                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
759            }
760        }
761    }
762
763    /// Create a new globally unique inbox which can be used for replies.
764    ///
765    /// # Examples
766    ///
767    /// ```no_run
768    /// # #[tokio::main]
769    /// # async fn main() -> Result<(), async_nats::Error> {
770    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
771    /// let reply = nc.new_inbox();
772    /// let rsub = nc.subscribe(reply).await?;
773    /// # Ok(())
774    /// # }
775    /// ```
776    pub fn new_inbox(&self) -> String {
777        format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
778    }
779
780    /// Subscribes to a subject to receive [messages][Message].
781    ///
782    /// Returns an error if the subject is invalid (empty, contains whitespace,
783    /// or has malformed dot structure). This validation always runs regardless
784    /// of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
785    ///
786    /// # Examples
787    ///
788    /// ```no_run
789    /// # #[tokio::main]
790    /// # async fn main() -> Result<(), async_nats::Error> {
791    /// use futures_util::StreamExt;
792    /// let client = async_nats::connect("demo.nats.io").await?;
793    /// let mut subscription = client.subscribe("events.>").await?;
794    /// while let Some(message) = subscription.next().await {
795    ///     println!("received message: {:?}", message);
796    /// }
797    /// # Ok(())
798    /// # }
799    /// ```
800    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
801        let subject = self
802            .validate_subscribe_subject(subject)
803            .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
804
805        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
806        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
807
808        self.sender
809            .send(Command::Subscribe {
810                sid,
811                subject,
812                queue_group: None,
813                sender,
814            })
815            .await?;
816
817        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
818    }
819
820    /// Subscribes to a subject with a queue group to receive [messages][Message].
821    ///
822    /// Returns an error if the subject is invalid (empty, contains whitespace,
823    /// or has malformed dot structure) or if the queue group name is invalid
824    /// (empty or contains whitespace). Subject and queue group validation always
825    /// runs regardless of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
826    ///
827    /// # Examples
828    ///
829    /// ```no_run
830    /// # #[tokio::main]
831    /// # async fn main() -> Result<(), async_nats::Error> {
832    /// use futures_util::StreamExt;
833    /// let client = async_nats::connect("demo.nats.io").await?;
834    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
835    /// while let Some(message) = subscription.next().await {
836    ///     println!("received message: {:?}", message);
837    /// }
838    /// # Ok(())
839    /// # }
840    /// ```
841    pub async fn queue_subscribe<S: ToSubject>(
842        &self,
843        subject: S,
844        queue_group: String,
845    ) -> Result<Subscriber, SubscribeError> {
846        let subject = self
847            .validate_subscribe_subject(subject)
848            .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
849
850        if !crate::is_valid_queue_group(&queue_group) {
851            return Err(SubscribeError::new(SubscribeErrorKind::InvalidQueueName));
852        }
853
854        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
855        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
856
857        self.sender
858            .send(Command::Subscribe {
859                sid,
860                subject,
861                queue_group: Some(queue_group),
862                sender,
863            })
864            .await?;
865
866        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
867    }
868
869    /// Flushes the internal buffer ensuring that all messages are sent.
870    ///
871    /// # Examples
872    ///
873    /// ```no_run
874    /// # #[tokio::main]
875    /// # async fn main() -> Result<(), async_nats::Error> {
876    /// let client = async_nats::connect("demo.nats.io").await?;
877    /// client.flush().await?;
878    /// # Ok(())
879    /// # }
880    /// ```
881    pub async fn flush(&self) -> Result<(), FlushError> {
882        let (tx, rx) = tokio::sync::oneshot::channel();
883        self.sender
884            .send(Command::Flush { observer: tx })
885            .await
886            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
887
888        rx.await
889            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
890        Ok(())
891    }
892
893    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
894    /// messages, then closes the connection. Once completed, any streams associated with the
895    /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
896    ///
897    /// # Examples
898    ///
899    /// ```no_run
900    /// # #[tokio::main]
901    /// # async fn main() -> Result<(), async_nats::Error> {
902    /// use futures_util::StreamExt;
903    /// let client = async_nats::connect("demo.nats.io").await?;
904    /// let mut subscription = client.subscribe("events.>").await?;
905    ///
906    /// client.drain().await?;
907    ///
908    /// # // existing subscriptions are closed and further commands will fail
909    /// assert!(subscription.next().await.is_none());
910    /// client
911    ///     .subscribe("events.>")
912    ///     .await
913    ///     .expect_err("Expected further commands to fail");
914    ///
915    /// # Ok(())
916    /// # }
917    /// ```
918    pub async fn drain(&self) -> Result<(), DrainError> {
919        // Drain all subscriptions
920        self.sender.send(Command::Drain { sid: None }).await?;
921
922        // Remaining process is handled on the handler-side
923        Ok(())
924    }
925
926    /// Returns the current state of the connection.
927    ///
928    /// # Examples
929    /// ```no_run
930    /// # #[tokio::main]
931    /// # async fn main() -> Result<(), async_nats::Error> {
932    /// let client = async_nats::connect("demo.nats.io").await?;
933    /// println!("connection state: {}", client.connection_state());
934    /// # Ok(())
935    /// # }
936    /// ```
937    pub fn connection_state(&self) -> State {
938        self.state.borrow().to_owned()
939    }
940
941    /// Forces the client to reconnect.
942    /// Keep in mind that client will reconnect automatically if the connection is lost and this
943    /// method does not have to be used in normal circumstances.
944    /// However, if you want to force the client to reconnect, for example to re-trigger
945    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
946    /// This method does not wait for connection to be re-established.
947    ///
948    /// # Examples
949    /// ```no_run
950    /// # #[tokio::main]
951    /// # async fn main() -> Result<(), async_nats::Error> {
952    /// let client = async_nats::connect("demo.nats.io").await?;
953    /// client.force_reconnect().await?;
954    /// # Ok(())
955    /// # }
956    /// ```
957    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
958        self.sender
959            .send(Command::Reconnect)
960            .await
961            .map_err(Into::into)
962    }
963
964    /// Replaces the server pool used for reconnection attempts.
965    ///
966    /// The new pool takes effect on the next reconnect attempt; it does not
967    /// trigger an immediate reconnect. To force an immediate reconnect with
968    /// the new pool, call [`Client::force_reconnect`] after this method.
969    ///
970    /// Per-server state (failed attempt count, connection history) is preserved
971    /// for servers that appear in both the old and new pools.
972    ///
973    /// This also resets the global reconnection attempt counter, so any
974    /// progress toward [`ConnectOptions::max_reconnects`](crate::ConnectOptions::max_reconnects)
975    /// is cleared.
976    ///
977    /// # Examples
978    /// ```no_run
979    /// # #[tokio::main]
980    /// # async fn main() -> Result<(), async_nats::Error> {
981    /// let client = async_nats::connect("demo.nats.io").await?;
982    /// client
983    ///     .set_server_pool(["nats://server1:4222", "nats://server2:4222"].as_slice())
984    ///     .await?;
985    /// // Optionally force reconnect to apply immediately:
986    /// client.force_reconnect().await?;
987    /// # Ok(())
988    /// # }
989    /// ```
990    pub async fn set_server_pool<A: crate::ToServerAddrs>(
991        &self,
992        addrs: A,
993    ) -> Result<(), SetServerPoolError> {
994        let servers: Vec<crate::ServerAddr> = addrs
995            .to_server_addrs()
996            .map_err(|err| {
997                SetServerPoolError::with_source(SetServerPoolErrorKind::InvalidAddress, err)
998            })?
999            .collect();
1000
1001        if servers.is_empty() {
1002            return Err(SetServerPoolError::new(SetServerPoolErrorKind::EmptyPool));
1003        }
1004
1005        let (tx, rx) = oneshot::channel();
1006        self.sender
1007            .send(Command::SetServerPool {
1008                servers,
1009                result: tx,
1010            })
1011            .await
1012            .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?;
1013
1014        rx.await
1015            .map_err(|err| SetServerPoolError::with_source(SetServerPoolErrorKind::Send, err))?
1016            .map_err(|err| {
1017                SetServerPoolError::with_source(SetServerPoolErrorKind::MixedSchemes, err)
1018            })
1019    }
1020
1021    /// Returns a snapshot of the current server pool.
1022    ///
1023    /// The returned list includes both explicitly configured and discovered
1024    /// servers, along with per-server metadata such as reconnect count and
1025    /// connection history.
1026    ///
1027    /// # Examples
1028    /// ```no_run
1029    /// # #[tokio::main]
1030    /// # async fn main() -> Result<(), async_nats::Error> {
1031    /// let client = async_nats::connect("demo.nats.io").await?;
1032    /// let pool = client.server_pool().await?;
1033    /// for server in &pool {
1034    ///     println!(
1035    ///         "{:?}: {} failed attempts",
1036    ///         server.addr, server.failed_attempts
1037    ///     );
1038    /// }
1039    /// # Ok(())
1040    /// # }
1041    /// ```
1042    pub async fn server_pool(&self) -> Result<Vec<crate::Server>, ServerPoolError> {
1043        let (tx, rx) = oneshot::channel();
1044        self.sender
1045            .send(Command::ServerPool { result: tx })
1046            .await
1047            .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))?;
1048
1049        rx.await
1050            .map_err(|err| ServerPoolError::with_source(ServerPoolErrorKind::Send, err))
1051    }
1052
1053    /// Returns struct representing statistics of the whole lifecycle of the client.
1054    /// This includes number of bytes sent/received, number of messages sent/received,
1055    /// and number of times the connection was established.
1056    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
1057    /// across threads.
1058    ///
1059    /// # Examples
1060    /// ```no_run
1061    /// # #[tokio::main]
1062    /// # async fn main() -> Result<(), async_nats::Error> {
1063    /// let client = async_nats::connect("demo.nats.io").await?;
1064    /// let statistics = client.statistics();
1065    /// println!("client statistics: {:#?}", statistics);
1066    /// # Ok(())
1067    /// # }
1068    /// ```
1069    pub fn statistics(&self) -> Arc<Statistics> {
1070        self.connection_stats.clone()
1071    }
1072}
1073
/// Used for building customized requests.
///
/// Every field starts out as `None` (see [`Request::new`]) and is filled in through
/// the builder methods of the same name.
#[derive(Default)]
pub struct Request {
    /// Payload of the request, set by [`Request::payload`].
    pub payload: Option<Bytes>,
    /// Headers of the request, set by [`Request::headers`].
    pub headers: Option<HeaderMap>,
    /// Timeout override, set by [`Request::timeout`]. The outer `Option` records
    /// whether an override was given at all; the inner one is the value passed in.
    pub timeout: Option<Option<Duration>>,
    /// Custom inbox for this request, set by [`Request::inbox`].
    pub inbox: Option<String>,
}
1082
1083impl Request {
1084    pub fn new() -> Request {
1085        Default::default()
1086    }
1087
1088    /// Sets the payload of the request. If not used, empty payload will be sent.
1089    ///
1090    /// # Examples
1091    /// ```no_run
1092    /// # #[tokio::main]
1093    /// # async fn main() -> Result<(), async_nats::Error> {
1094    /// let client = async_nats::connect("demo.nats.io").await?;
1095    /// let request = async_nats::Request::new().payload("data".into());
1096    /// client.send_request("service", request).await?;
1097    /// # Ok(())
1098    /// # }
1099    /// ```
1100    pub fn payload(mut self, payload: Bytes) -> Request {
1101        self.payload = Some(payload);
1102        self
1103    }
1104
1105    /// Sets the headers of the requests.
1106    ///
1107    /// # Examples
1108    /// ```no_run
1109    /// # #[tokio::main]
1110    /// # async fn main() -> Result<(), async_nats::Error> {
1111    /// use std::str::FromStr;
1112    /// let client = async_nats::connect("demo.nats.io").await?;
1113    /// let mut headers = async_nats::HeaderMap::new();
1114    /// headers.insert(
1115    ///     "X-Example",
1116    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
1117    /// );
1118    /// let request = async_nats::Request::new()
1119    ///     .headers(headers)
1120    ///     .payload("data".into());
1121    /// client.send_request("service", request).await?;
1122    /// # Ok(())
1123    /// # }
1124    /// ```
1125    pub fn headers(mut self, headers: HeaderMap) -> Request {
1126        self.headers = Some(headers);
1127        self
1128    }
1129
1130    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
1131    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
1132    /// To use default timeout, simply do not call this function.
1133    ///
1134    /// # Examples
1135    /// ```no_run
1136    /// # #[tokio::main]
1137    /// # async fn main() -> Result<(), async_nats::Error> {
1138    /// let client = async_nats::connect("demo.nats.io").await?;
1139    /// let request = async_nats::Request::new()
1140    ///     .timeout(Some(std::time::Duration::from_secs(15)))
1141    ///     .payload("data".into());
1142    /// client.send_request("service", request).await?;
1143    /// # Ok(())
1144    /// # }
1145    /// ```
1146    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
1147        self.timeout = Some(timeout);
1148        self
1149    }
1150
1151    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
1152    ///
1153    /// # Examples
1154    /// ```no_run
1155    /// # #[tokio::main]
1156    /// # async fn main() -> Result<(), async_nats::Error> {
1157    /// use std::str::FromStr;
1158    /// let client = async_nats::connect("demo.nats.io").await?;
1159    /// let request = async_nats::Request::new()
1160    ///     .inbox("custom_inbox".into())
1161    ///     .payload("data".into());
1162    /// client.send_request("service", request).await?;
1163    /// # Ok(())
1164    /// # }
1165    /// ```
1166    pub fn inbox(mut self, inbox: String) -> Request {
1167        self.inbox = Some(inbox);
1168        self
1169    }
1170}
1171
1172#[derive(Error, Debug)]
1173#[error("failed to send reconnect: {0}")]
1174pub struct ReconnectError(#[source] crate::Error);
1175
1176impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
1177    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1178        ReconnectError(Box::new(err))
1179    }
1180}
1181
/// An error returned from [`Client::set_server_pool`].
///
/// The category of the failure is one of the [`SetServerPoolErrorKind`] variants.
pub type SetServerPoolError = Error<SetServerPoolErrorKind>;
1184
/// Categories of failure reported by [`SetServerPoolError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SetServerPoolErrorKind {
    /// Failed to send command to the connection handler.
    Send,
    /// One or more server addresses could not be parsed.
    InvalidAddress,
    /// The pool contains a mix of WebSocket (`ws://`, `wss://`) and
    /// non-websocket (`nats://`, `tls://`) URLs, which is not allowed.
    MixedSchemes,
    /// The server pool cannot be empty.
    EmptyPool,
}

impl Display for SetServerPoolErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::Send => "failed to send set_server_pool command",
            Self::InvalidAddress => "invalid server address",
            Self::MixedSchemes => "cannot mix websocket and non-websocket URLs in server pool",
            Self::EmptyPool => "server pool cannot be empty",
        };
        f.write_str(message)
    }
}
1211
/// An error returned from [`Client::server_pool`].
///
/// The category of the failure is one of the [`ServerPoolErrorKind`] variants.
pub type ServerPoolError = Error<ServerPoolErrorKind>;
1214
/// Categories of failure reported by [`ServerPoolError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ServerPoolErrorKind {
    /// Failed to send command to the connection handler.
    Send,
}

impl Display for ServerPoolErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::Send => "failed to send server_pool command",
        };
        f.write_str(message)
    }
}
1228
1229/// An error returned from the [`Client::subscribe`] or [`Client::queue_subscribe`] functions.
1230pub type SubscribeError = Error<SubscribeErrorKind>;
1231
1232impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
1233    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1234        SubscribeError::with_source(SubscribeErrorKind::Other, err)
1235    }
1236}
1237
/// Categories of failure reported by [`SubscribeError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SubscribeErrorKind {
    /// The subject is invalid (empty, contains whitespace, or has malformed dot structure).
    InvalidSubject,
    /// The queue group name is invalid (empty or contains whitespace).
    InvalidQueueName,
    /// Other errors, client/io related.
    Other,
}

impl Display for SubscribeErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::InvalidSubject => "invalid subject",
            Self::InvalidQueueName => "invalid queue name",
            Self::Other => "subscribe failed",
        };
        f.write_str(message)
    }
}
1257
1258#[derive(Error, Debug)]
1259#[error("failed to send drain: {0}")]
1260pub struct DrainError(#[source] crate::Error);
1261
1262impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
1263    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1264        DrainError(Box::new(err))
1265    }
1266}
1267
/// Categories of failure reported by [`RequestError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// There are services listening on requested subject, but they didn't respond
    /// in time.
    TimedOut,
    /// No one is listening on request subject.
    NoResponders,
    /// The subject is invalid (empty or contains whitespace).
    InvalidSubject,
    /// The payload (plus headers) exceeds the server's `max_payload`.
    MaxPayloadExceeded,
    /// Other errors, client/io related.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::TimedOut => "request timed out",
            Self::NoResponders => "no responders",
            Self::InvalidSubject => "invalid subject",
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::Other => "request failed",
        };
        f.write_str(message)
    }
}
1294
1295/// Error returned when a core NATS request fails.
1296/// To be enumerate over the variants, call [RequestError::kind].
1297pub type RequestError = Error<RequestErrorKind>;
1298
1299impl From<PublishError> for RequestError {
1300    fn from(e: PublishError) -> Self {
1301        RequestError::with_source(RequestErrorKind::Other, e)
1302    }
1303}
1304
1305impl From<SubscribeError> for RequestError {
1306    fn from(e: SubscribeError) -> Self {
1307        RequestError::with_source(RequestErrorKind::Other, e)
1308    }
1309}
1310
/// Categories of failure reported by [`FlushError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// Sending the flush failed client side.
    SendError,
    /// Flush failed.
    /// This can happen mostly in case of connection issues
    /// that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match *self {
            Self::SendError => "failed to send flush request",
            Self::FlushError => "flush failed",
        };
        f.write_str(message)
    }
}
1329
/// An error returned from [`Client::flush`].
///
/// The category of the failure is one of the [`FlushErrorKind`] variants.
pub type FlushError = Error<FlushErrorKind>;
1331
/// Represents statistics for the instance of the client throughout its lifecycle.
#[derive(Default, Debug)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times connection was established.
    /// Initial connect will be counted as well, then all successful reconnects.
    pub connects: AtomicU64,
}