Skip to main content

async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::sync::LazyLock;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38
39static VERSION_RE: LazyLock<Regex> =
40    LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
42/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
43/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48        PublishError::with_source(PublishErrorKind::Send, err)
49    }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54        PublishError::with_source(PublishErrorKind::Send, err)
55    }
56}
57
/// The category of a [`PublishError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the payload limit known to the client.
    MaxPayloadExceeded,
    /// The subject (or reply subject) did not pass validation.
    InvalidSubject,
    /// The message could not be handed over to the command channel.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::Send => "failed to send message",
            Self::InvalidSubject => "invalid subject",
        };
        f.write_str(description)
    }
}
74
75/// Client is a `Cloneable` handle to NATS connection.
76/// Client should not be created directly. Instead, one of two methods can be used:
77/// [crate::connect] and [crate::ConnectOptions::connect]
78#[derive(Clone, Debug)]
79pub struct Client {
80    info: tokio::sync::watch::Receiver<ServerInfo>,
81    pub(crate) state: tokio::sync::watch::Receiver<State>,
82    pub(crate) sender: mpsc::Sender<Command>,
83    poll_sender: PollSender<Command>,
84    next_subscription_id: Arc<AtomicU64>,
85    subscription_capacity: usize,
86    inbox_prefix: Arc<str>,
87    request_timeout: Option<Duration>,
88    max_payload: Arc<AtomicUsize>,
89    connection_stats: Arc<Statistics>,
90    skip_subject_validation: bool,
91}
92
93pub mod traits {
94    use std::{future::Future, time::Duration};
95
96    use bytes::Bytes;
97
98    use crate::{message, subject::ToSubject, Message};
99
100    use super::{PublishError, Request, RequestError, SubscribeError};
101
102    pub trait Publisher {
103        fn publish_with_reply<S, R>(
104            &self,
105            subject: S,
106            reply: R,
107            payload: Bytes,
108        ) -> impl Future<Output = Result<(), PublishError>>
109        where
110            S: ToSubject,
111            R: ToSubject;
112
113        fn publish_message(
114            &self,
115            msg: message::OutboundMessage,
116        ) -> impl Future<Output = Result<(), PublishError>>;
117    }
118    pub trait Subscriber {
119        fn subscribe<S>(
120            &self,
121            subject: S,
122        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>
123        where
124            S: ToSubject;
125    }
126    pub trait Requester {
127        fn send_request<S>(
128            &self,
129            subject: S,
130            request: Request,
131        ) -> impl Future<Output = Result<Message, RequestError>>
132        where
133            S: ToSubject;
134    }
135    pub trait TimeoutProvider {
136        fn timeout(&self) -> Option<Duration>;
137    }
138}
139
140impl traits::Requester for Client {
141    fn send_request<S: ToSubject>(
142        &self,
143        subject: S,
144        request: Request,
145    ) -> impl Future<Output = Result<Message, RequestError>> {
146        self.send_request(subject, request)
147    }
148}
149
150impl traits::TimeoutProvider for Client {
151    fn timeout(&self) -> Option<Duration> {
152        self.timeout()
153    }
154}
155
156impl traits::Publisher for Client {
157    fn publish_with_reply<S, R>(
158        &self,
159        subject: S,
160        reply: R,
161        payload: Bytes,
162    ) -> impl Future<Output = Result<(), PublishError>>
163    where
164        S: ToSubject,
165        R: ToSubject,
166    {
167        self.publish_with_reply(subject, reply, payload)
168    }
169
170    async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
171        if msg.subject.is_empty()
172            || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&msg.subject))
173        {
174            return Err(PublishError::with_source(
175                PublishErrorKind::InvalidSubject,
176                crate::subject::SubjectError::InvalidFormat,
177            ));
178        }
179        self.sender
180            .send(Command::Publish(msg))
181            .await
182            .map_err(Into::into)
183    }
184}
185
186impl traits::Subscriber for Client {
187    fn subscribe<S>(&self, subject: S) -> impl Future<Output = Result<Subscriber, SubscribeError>>
188    where
189        S: ToSubject,
190    {
191        self.subscribe(subject)
192    }
193}
194
195impl Sink<OutboundMessage> for Client {
196    type Error = PublishError;
197
198    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
200    }
201
202    fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
203        self.poll_sender
204            .start_send_unpin(Command::Publish(msg))
205            .map_err(Into::into)
206    }
207
208    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
210    }
211
212    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
213        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
214    }
215}
216
217impl Client {
218    #[allow(clippy::too_many_arguments)]
219    pub(crate) fn new(
220        info: tokio::sync::watch::Receiver<ServerInfo>,
221        state: tokio::sync::watch::Receiver<State>,
222        sender: mpsc::Sender<Command>,
223        capacity: usize,
224        inbox_prefix: String,
225        request_timeout: Option<Duration>,
226        max_payload: Arc<AtomicUsize>,
227        statistics: Arc<Statistics>,
228        skip_subject_validation: bool,
229    ) -> Client {
230        let poll_sender = PollSender::new(sender.clone());
231        Client {
232            info,
233            state,
234            sender,
235            poll_sender,
236            next_subscription_id: Arc::new(AtomicU64::new(1)),
237            subscription_capacity: capacity,
238            inbox_prefix: inbox_prefix.into(),
239            request_timeout,
240            max_payload,
241            connection_stats: statistics,
242            skip_subject_validation,
243        }
244    }
245
246    /// Validates a subject for publishing (protocol-framing safety only).
247    /// Checks for empty and whitespace. Does not check dot structure.
248    pub(crate) fn maybe_validate_publish_subject<S: ToSubject>(
249        &self,
250        subject: S,
251    ) -> Result<crate::Subject, crate::subject::SubjectError> {
252        let subject = subject.to_subject();
253        if subject.is_empty()
254            || (!self.skip_subject_validation && !crate::is_valid_publish_subject(&subject))
255        {
256            return Err(crate::subject::SubjectError::InvalidFormat);
257        }
258        Ok(subject)
259    }
260
261    /// Validates a subject for subscribing (protocol-framing + dot structure).
262    /// Always runs regardless of `skip_subject_validation` (matches Go/Java behavior).
263    pub(crate) fn validate_subscribe_subject<S: ToSubject>(
264        &self,
265        subject: S,
266    ) -> Result<crate::Subject, crate::subject::SubjectError> {
267        let subject = subject.to_subject();
268        if !subject.is_valid() {
269            return Err(crate::subject::SubjectError::InvalidFormat);
270        }
271        Ok(subject)
272    }
273
274    /// Returns the default timeout for requests set when creating the client.
275    ///
276    /// # Examples
277    /// ```no_run
278    /// # #[tokio::main]
279    /// # async fn main() -> Result<(), async_nats::Error> {
280    /// let client = async_nats::connect("demo.nats.io").await?;
281    /// println!("default request timeout: {:?}", client.timeout());
282    /// # Ok(())
283    /// # }
284    /// ```
285    pub fn timeout(&self) -> Option<Duration> {
286        self.request_timeout
287    }
288
289    /// Returns last received info from the server.
290    ///
291    /// # Examples
292    ///
293    /// ```no_run
294    /// # #[tokio::main]
295    /// # async fn main () -> Result<(), async_nats::Error> {
296    /// let client = async_nats::connect("demo.nats.io").await?;
297    /// println!("info: {:?}", client.server_info());
298    /// # Ok(())
299    /// # }
300    /// ```
301    pub fn server_info(&self) -> ServerInfo {
302        // We ignore notifying the watcher, as that requires mutable client reference.
303        self.info.borrow().to_owned()
304    }
305
306    /// Returns true if the server version is compatible with the version components.
307    ///
308    /// This has to be used with caution, as it is not guaranteed that the server
309    /// that client is connected to is the same version that the one that is
310    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// # #[tokio::main]
316    /// # async fn main() -> Result<(), async_nats::Error> {
317    /// let client = async_nats::connect("demo.nats.io").await?;
318    /// assert!(client.is_server_compatible(2, 8, 4));
319    /// # Ok(())
320    /// # }
321    /// ```
322    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
323        let info = self.server_info();
324
325        let server_version_captures = match VERSION_RE.captures(&info.version) {
326            Some(captures) => captures,
327            None => return false,
328        };
329
330        let server_major = server_version_captures
331            .get(1)
332            .map(|m| m.as_str().parse::<i64>().unwrap())
333            .unwrap();
334
335        let server_minor = server_version_captures
336            .get(2)
337            .map(|m| m.as_str().parse::<i64>().unwrap())
338            .unwrap();
339
340        let server_patch = server_version_captures
341            .get(3)
342            .map(|m| m.as_str().parse::<i64>().unwrap())
343            .unwrap();
344
345        if server_major < major
346            || (server_major == major && server_minor < minor)
347            || (server_major == major && server_minor == minor && server_patch < patch)
348        {
349            return false;
350        }
351        true
352    }
353
354    /// Publish a [Message] to a given subject.
355    ///
356    /// Returns `PublishErrorKind::InvalidSubject` if the subject is invalid
357    /// (empty or contains whitespace). This check can be disabled with
358    /// [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation] (empty subjects are
359    /// always rejected).
360    ///
361    /// # Examples
362    /// ```no_run
363    /// # #[tokio::main]
364    /// # async fn main() -> Result<(), async_nats::Error> {
365    /// let client = async_nats::connect("demo.nats.io").await?;
366    /// client.publish("events.data", "payload".into()).await?;
367    /// # Ok(())
368    /// # }
369    /// ```
370    pub async fn publish<S: ToSubject>(
371        &self,
372        subject: S,
373        payload: Bytes,
374    ) -> Result<(), PublishError> {
375        let subject = self
376            .maybe_validate_publish_subject(subject)
377            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
378
379        let max_payload = self.max_payload.load(Ordering::Relaxed);
380        if payload.len() > max_payload {
381            return Err(PublishError::with_source(
382                PublishErrorKind::MaxPayloadExceeded,
383                format!(
384                    "Payload size limit of {} exceeded by message size of {}",
385                    max_payload,
386                    payload.len(),
387                ),
388            ));
389        }
390
391        self.sender
392            .send(Command::Publish(OutboundMessage {
393                subject,
394                payload,
395                reply: None,
396                headers: None,
397            }))
398            .await?;
399        Ok(())
400    }
401
402    /// Publish a [Message] with headers to a given subject.
403    ///
404    /// # Examples
405    /// ```
406    /// # #[tokio::main]
407    /// # async fn main() -> Result<(), async_nats::Error> {
408    /// use std::str::FromStr;
409    /// let client = async_nats::connect("demo.nats.io").await?;
410    /// let mut headers = async_nats::HeaderMap::new();
411    /// headers.insert(
412    ///     "X-Header",
413    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
414    /// );
415    /// client
416    ///     .publish_with_headers("events.data", headers, "payload".into())
417    ///     .await?;
418    /// # Ok(())
419    /// # }
420    /// ```
421    pub async fn publish_with_headers<S: ToSubject>(
422        &self,
423        subject: S,
424        headers: HeaderMap,
425        payload: Bytes,
426    ) -> Result<(), PublishError> {
427        let subject = self
428            .maybe_validate_publish_subject(subject)
429            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
430
431        self.sender
432            .send(Command::Publish(OutboundMessage {
433                subject,
434                payload,
435                reply: None,
436                headers: Some(headers),
437            }))
438            .await?;
439        Ok(())
440    }
441
442    /// Publish a [Message] to a given subject, with specified response subject
443    /// to which the subscriber can respond.
444    /// This method does not await for the response.
445    ///
446    /// # Examples
447    ///
448    /// ```no_run
449    /// # #[tokio::main]
450    /// # async fn main() -> Result<(), async_nats::Error> {
451    /// let client = async_nats::connect("demo.nats.io").await?;
452    /// client
453    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
454    ///     .await?;
455    /// # Ok(())
456    /// # }
457    /// ```
458    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
459        &self,
460        subject: S,
461        reply: R,
462        payload: Bytes,
463    ) -> Result<(), PublishError> {
464        let subject = self
465            .maybe_validate_publish_subject(subject)
466            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
467        let reply = self
468            .maybe_validate_publish_subject(reply)
469            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
470
471        self.sender
472            .send(Command::Publish(OutboundMessage {
473                subject,
474                payload,
475                reply: Some(reply),
476                headers: None,
477            }))
478            .await?;
479        Ok(())
480    }
481
482    /// Publish a [Message] to a given subject with headers and specified response subject
483    /// to which the subscriber can respond.
484    /// This method does not await for the response.
485    ///
486    /// # Examples
487    ///
488    /// ```no_run
489    /// # #[tokio::main]
490    /// # async fn main() -> Result<(), async_nats::Error> {
491    /// use std::str::FromStr;
492    /// let client = async_nats::connect("demo.nats.io").await?;
493    /// let mut headers = async_nats::HeaderMap::new();
494    /// client
495    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
496    ///     .await?;
497    /// # Ok(())
498    /// # }
499    /// ```
500    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
501        &self,
502        subject: S,
503        reply: R,
504        headers: HeaderMap,
505        payload: Bytes,
506    ) -> Result<(), PublishError> {
507        let subject = self
508            .maybe_validate_publish_subject(subject)
509            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
510        let reply = self
511            .maybe_validate_publish_subject(reply)
512            .map_err(|e| PublishError::with_source(PublishErrorKind::InvalidSubject, e))?;
513
514        self.sender
515            .send(Command::Publish(OutboundMessage {
516                subject,
517                payload,
518                reply: Some(reply),
519                headers: Some(headers),
520            }))
521            .await?;
522        Ok(())
523    }
524
525    /// Sends the request with headers.
526    ///
527    /// # Examples
528    /// ```no_run
529    /// # #[tokio::main]
530    /// # async fn main() -> Result<(), async_nats::Error> {
531    /// let client = async_nats::connect("demo.nats.io").await?;
532    /// let response = client.request("service", "data".into()).await?;
533    /// # Ok(())
534    /// # }
535    /// ```
536    pub async fn request<S: ToSubject>(
537        &self,
538        subject: S,
539        payload: Bytes,
540    ) -> Result<Message, RequestError> {
541        let request = Request::new().payload(payload);
542        self.send_request(subject, request).await
543    }
544
545    /// Sends the request with headers.
546    ///
547    /// # Examples
548    /// ```no_run
549    /// # #[tokio::main]
550    /// # async fn main() -> Result<(), async_nats::Error> {
551    /// let client = async_nats::connect("demo.nats.io").await?;
552    /// let mut headers = async_nats::HeaderMap::new();
553    /// headers.insert("Key", "Value");
554    /// let response = client
555    ///     .request_with_headers("service", headers, "data".into())
556    ///     .await?;
557    /// # Ok(())
558    /// # }
559    /// ```
560    pub async fn request_with_headers<S: ToSubject>(
561        &self,
562        subject: S,
563        headers: HeaderMap,
564        payload: Bytes,
565    ) -> Result<Message, RequestError> {
566        let request = Request::new().headers(headers).payload(payload);
567        self.send_request(subject, request).await
568    }
569
570    /// Sends the request created by the [Request].
571    ///
572    /// Returns `RequestErrorKind::InvalidSubject` if the subject is invalid
573    /// (empty or contains whitespace).
574    ///
575    /// # Examples
576    ///
577    /// ```no_run
578    /// # #[tokio::main]
579    /// # async fn main() -> Result<(), async_nats::Error> {
580    /// let client = async_nats::connect("demo.nats.io").await?;
581    /// let request = async_nats::Request::new().payload("data".into());
582    /// let response = client.send_request("service", request).await?;
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub async fn send_request<S: ToSubject>(
587        &self,
588        subject: S,
589        request: Request,
590    ) -> Result<Message, RequestError> {
591        let subject = self
592            .maybe_validate_publish_subject(subject)
593            .map_err(|e| RequestError::with_source(RequestErrorKind::InvalidSubject, e))?;
594
595        if let Some(inbox) = request.inbox {
596            let timeout = request.timeout.unwrap_or(self.request_timeout);
597            let mut subscriber = self.subscribe(inbox.clone()).await?;
598            let payload: Bytes = request.payload.unwrap_or_default();
599            match request.headers {
600                Some(headers) => {
601                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
602                        .await?
603                }
604                None => self.publish_with_reply(subject, inbox, payload).await?,
605            }
606            let request = match timeout {
607                Some(timeout) => {
608                    tokio::time::timeout(timeout, subscriber.next())
609                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
610                        .await?
611                }
612                None => subscriber.next().await,
613            };
614            match request {
615                Some(message) => {
616                    if message.status == Some(StatusCode::NO_RESPONDERS) {
617                        return Err(RequestError::with_source(
618                            RequestErrorKind::NoResponders,
619                            "no responders",
620                        ));
621                    }
622                    Ok(message)
623                }
624                None => Err(RequestError::with_source(
625                    RequestErrorKind::Other,
626                    "broken pipe",
627                )),
628            }
629        } else {
630            let (sender, receiver) = oneshot::channel();
631
632            let payload = request.payload.unwrap_or_default();
633            let respond = self.new_inbox().into();
634            let headers = request.headers;
635
636            self.sender
637                .send(Command::Request {
638                    subject,
639                    payload,
640                    respond,
641                    headers,
642                    sender,
643                })
644                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
645                .await?;
646
647            let timeout = request.timeout.unwrap_or(self.request_timeout);
648            let request = match timeout {
649                Some(timeout) => {
650                    tokio::time::timeout(timeout, receiver)
651                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
652                        .await?
653                }
654                None => receiver.await,
655            };
656
657            match request {
658                Ok(message) => {
659                    if message.status == Some(StatusCode::NO_RESPONDERS) {
660                        return Err(RequestError::with_source(
661                            RequestErrorKind::NoResponders,
662                            "no responders",
663                        ));
664                    }
665                    Ok(message)
666                }
667                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
668            }
669        }
670    }
671
672    /// Create a new globally unique inbox which can be used for replies.
673    ///
674    /// # Examples
675    ///
676    /// ```no_run
677    /// # #[tokio::main]
678    /// # async fn main() -> Result<(), async_nats::Error> {
679    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
680    /// let reply = nc.new_inbox();
681    /// let rsub = nc.subscribe(reply).await?;
682    /// # Ok(())
683    /// # }
684    /// ```
685    pub fn new_inbox(&self) -> String {
686        format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
687    }
688
689    /// Subscribes to a subject to receive [messages][Message].
690    ///
691    /// Returns an error if the subject is invalid (empty, contains whitespace,
692    /// or has malformed dot structure). This validation always runs regardless
693    /// of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
694    ///
695    /// # Examples
696    ///
697    /// ```no_run
698    /// # #[tokio::main]
699    /// # async fn main() -> Result<(), async_nats::Error> {
700    /// use futures_util::StreamExt;
701    /// let client = async_nats::connect("demo.nats.io").await?;
702    /// let mut subscription = client.subscribe("events.>").await?;
703    /// while let Some(message) = subscription.next().await {
704    ///     println!("received message: {:?}", message);
705    /// }
706    /// # Ok(())
707    /// # }
708    /// ```
709    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
710        let subject = self
711            .validate_subscribe_subject(subject)
712            .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
713
714        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
715        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
716
717        self.sender
718            .send(Command::Subscribe {
719                sid,
720                subject,
721                queue_group: None,
722                sender,
723            })
724            .await?;
725
726        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
727    }
728
729    /// Subscribes to a subject with a queue group to receive [messages][Message].
730    ///
731    /// Returns an error if the subject is invalid (empty, contains whitespace,
732    /// or has malformed dot structure) or if the queue group name is invalid
733    /// (empty or contains whitespace). Subject and queue group validation always
734    /// runs regardless of [`ConnectOptions::skip_subject_validation`][crate::ConnectOptions::skip_subject_validation].
735    ///
736    /// # Examples
737    ///
738    /// ```no_run
739    /// # #[tokio::main]
740    /// # async fn main() -> Result<(), async_nats::Error> {
741    /// use futures_util::StreamExt;
742    /// let client = async_nats::connect("demo.nats.io").await?;
743    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
744    /// while let Some(message) = subscription.next().await {
745    ///     println!("received message: {:?}", message);
746    /// }
747    /// # Ok(())
748    /// # }
749    /// ```
750    pub async fn queue_subscribe<S: ToSubject>(
751        &self,
752        subject: S,
753        queue_group: String,
754    ) -> Result<Subscriber, SubscribeError> {
755        let subject = self
756            .validate_subscribe_subject(subject)
757            .map_err(|e| SubscribeError::with_source(SubscribeErrorKind::InvalidSubject, e))?;
758
759        if !crate::is_valid_queue_group(&queue_group) {
760            return Err(SubscribeError::new(SubscribeErrorKind::InvalidQueueName));
761        }
762
763        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
764        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
765
766        self.sender
767            .send(Command::Subscribe {
768                sid,
769                subject,
770                queue_group: Some(queue_group),
771                sender,
772            })
773            .await?;
774
775        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
776    }
777
778    /// Flushes the internal buffer ensuring that all messages are sent.
779    ///
780    /// # Examples
781    ///
782    /// ```no_run
783    /// # #[tokio::main]
784    /// # async fn main() -> Result<(), async_nats::Error> {
785    /// let client = async_nats::connect("demo.nats.io").await?;
786    /// client.flush().await?;
787    /// # Ok(())
788    /// # }
789    /// ```
790    pub async fn flush(&self) -> Result<(), FlushError> {
791        let (tx, rx) = tokio::sync::oneshot::channel();
792        self.sender
793            .send(Command::Flush { observer: tx })
794            .await
795            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
796
797        rx.await
798            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
799        Ok(())
800    }
801
802    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
803    /// messages, then closes the connection. Once completed, any streams associated with the
804    /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
805    ///
806    /// # Examples
807    ///
808    /// ```no_run
809    /// # #[tokio::main]
810    /// # async fn main() -> Result<(), async_nats::Error> {
811    /// use futures_util::StreamExt;
812    /// let client = async_nats::connect("demo.nats.io").await?;
813    /// let mut subscription = client.subscribe("events.>").await?;
814    ///
815    /// client.drain().await?;
816    ///
817    /// # // existing subscriptions are closed and further commands will fail
818    /// assert!(subscription.next().await.is_none());
819    /// client
820    ///     .subscribe("events.>")
821    ///     .await
822    ///     .expect_err("Expected further commands to fail");
823    ///
824    /// # Ok(())
825    /// # }
826    /// ```
827    pub async fn drain(&self) -> Result<(), DrainError> {
828        // Drain all subscriptions
829        self.sender.send(Command::Drain { sid: None }).await?;
830
831        // Remaining process is handled on the handler-side
832        Ok(())
833    }
834
835    /// Returns the current state of the connection.
836    ///
837    /// # Examples
838    /// ```no_run
839    /// # #[tokio::main]
840    /// # async fn main() -> Result<(), async_nats::Error> {
841    /// let client = async_nats::connect("demo.nats.io").await?;
842    /// println!("connection state: {}", client.connection_state());
843    /// # Ok(())
844    /// # }
845    /// ```
846    pub fn connection_state(&self) -> State {
847        self.state.borrow().to_owned()
848    }
849
850    /// Forces the client to reconnect.
851    /// Keep in mind that client will reconnect automatically if the connection is lost and this
852    /// method does not have to be used in normal circumstances.
853    /// However, if you want to force the client to reconnect, for example to re-trigger
854    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
855    /// This method does not wait for connection to be re-established.
856    ///
857    /// # Examples
858    /// ```no_run
859    /// # #[tokio::main]
860    /// # async fn main() -> Result<(), async_nats::Error> {
861    /// let client = async_nats::connect("demo.nats.io").await?;
862    /// client.force_reconnect().await?;
863    /// # Ok(())
864    /// # }
865    /// ```
866    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
867        self.sender
868            .send(Command::Reconnect)
869            .await
870            .map_err(Into::into)
871    }
872
873    /// Returns struct representing statistics of the whole lifecycle of the client.
874    /// This includes number of bytes sent/received, number of messages sent/received,
875    /// and number of times the connection was established.
876    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
877    /// across threads.
878    ///
879    /// # Examples
880    /// ```no_run
881    /// # #[tokio::main]
882    /// # async fn main() -> Result<(), async_nats::Error> {
883    /// let client = async_nats::connect("demo.nats.io").await?;
884    /// let statistics = client.statistics();
885    /// println!("client statistics: {:#?}", statistics);
886    /// # Ok(())
887    /// # }
888    /// ```
889    pub fn statistics(&self) -> Arc<Statistics> {
890        self.connection_stats.clone()
891    }
892}
893
894/// Used for building customized requests.
895#[derive(Default)]
896pub struct Request {
897    pub payload: Option<Bytes>,
898    pub headers: Option<HeaderMap>,
899    pub timeout: Option<Option<Duration>>,
900    pub inbox: Option<String>,
901}
902
903impl Request {
904    pub fn new() -> Request {
905        Default::default()
906    }
907
908    /// Sets the payload of the request. If not used, empty payload will be sent.
909    ///
910    /// # Examples
911    /// ```no_run
912    /// # #[tokio::main]
913    /// # async fn main() -> Result<(), async_nats::Error> {
914    /// let client = async_nats::connect("demo.nats.io").await?;
915    /// let request = async_nats::Request::new().payload("data".into());
916    /// client.send_request("service", request).await?;
917    /// # Ok(())
918    /// # }
919    /// ```
920    pub fn payload(mut self, payload: Bytes) -> Request {
921        self.payload = Some(payload);
922        self
923    }
924
925    /// Sets the headers of the requests.
926    ///
927    /// # Examples
928    /// ```no_run
929    /// # #[tokio::main]
930    /// # async fn main() -> Result<(), async_nats::Error> {
931    /// use std::str::FromStr;
932    /// let client = async_nats::connect("demo.nats.io").await?;
933    /// let mut headers = async_nats::HeaderMap::new();
934    /// headers.insert(
935    ///     "X-Example",
936    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
937    /// );
938    /// let request = async_nats::Request::new()
939    ///     .headers(headers)
940    ///     .payload("data".into());
941    /// client.send_request("service", request).await?;
942    /// # Ok(())
943    /// # }
944    /// ```
945    pub fn headers(mut self, headers: HeaderMap) -> Request {
946        self.headers = Some(headers);
947        self
948    }
949
950    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
951    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
952    /// To use default timeout, simply do not call this function.
953    ///
954    /// # Examples
955    /// ```no_run
956    /// # #[tokio::main]
957    /// # async fn main() -> Result<(), async_nats::Error> {
958    /// let client = async_nats::connect("demo.nats.io").await?;
959    /// let request = async_nats::Request::new()
960    ///     .timeout(Some(std::time::Duration::from_secs(15)))
961    ///     .payload("data".into());
962    /// client.send_request("service", request).await?;
963    /// # Ok(())
964    /// # }
965    /// ```
966    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
967        self.timeout = Some(timeout);
968        self
969    }
970
971    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
972    ///
973    /// # Examples
974    /// ```no_run
975    /// # #[tokio::main]
976    /// # async fn main() -> Result<(), async_nats::Error> {
977    /// use std::str::FromStr;
978    /// let client = async_nats::connect("demo.nats.io").await?;
979    /// let request = async_nats::Request::new()
980    ///     .inbox("custom_inbox".into())
981    ///     .payload("data".into());
982    /// client.send_request("service", request).await?;
983    /// # Ok(())
984    /// # }
985    /// ```
986    pub fn inbox(mut self, inbox: String) -> Request {
987        self.inbox = Some(inbox);
988        self
989    }
990}
991
992#[derive(Error, Debug)]
993#[error("failed to send reconnect: {0}")]
994pub struct ReconnectError(#[source] crate::Error);
995
996impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
997    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
998        ReconnectError(Box::new(err))
999    }
1000}
1001
1002/// An error returned from the [`Client::subscribe`] or [`Client::queue_subscribe`] functions.
1003pub type SubscribeError = Error<SubscribeErrorKind>;
1004
1005impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
1006    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1007        SubscribeError::with_source(SubscribeErrorKind::Other, err)
1008    }
1009}
1010
/// The kinds of failure a subscribe operation can report.
///
/// Equality on this fieldless enum is total, so it implements [`Eq`] in
/// addition to [`PartialEq`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SubscribeErrorKind {
    /// The subject is invalid (empty, contains whitespace, or has malformed dot structure).
    InvalidSubject,
    /// The queue group name is invalid (empty or contains whitespace).
    InvalidQueueName,
    /// Other errors, client/io related.
    Other,
}

impl Display for SubscribeErrorKind {
    /// Writes the fixed, human-readable message for the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::InvalidSubject => "invalid subject",
            Self::InvalidQueueName => "invalid queue name",
            Self::Other => "subscribe failed",
        };
        // Written verbatim: width/padding flags are ignored, as with `write!`
        // on a plain literal.
        f.write_str(message)
    }
}
1030
1031#[derive(Error, Debug)]
1032#[error("failed to send drain: {0}")]
1033pub struct DrainError(#[source] crate::Error);
1034
1035impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
1036    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1037        DrainError(Box::new(err))
1038    }
1039}
1040
/// The kinds of failure a request can report.
///
/// Equality on this fieldless enum is total, so it implements [`Eq`] in
/// addition to [`PartialEq`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RequestErrorKind {
    /// There are services listening on requested subject, but they didn't respond
    /// in time.
    TimedOut,
    /// No one is listening on request subject.
    NoResponders,
    /// The subject is invalid (empty or contains whitespace).
    InvalidSubject,
    /// Other errors, client/io related.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable message for the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::TimedOut => "request timed out",
            Self::NoResponders => "no responders",
            Self::InvalidSubject => "invalid subject",
            Self::Other => "request failed",
        };
        // Written verbatim: width/padding flags are ignored, as with `write!`
        // on a plain literal.
        f.write_str(message)
    }
}
1064
1065/// Error returned when a core NATS request fails.
1066/// To be enumerate over the variants, call [RequestError::kind].
1067pub type RequestError = Error<RequestErrorKind>;
1068
1069impl From<PublishError> for RequestError {
1070    fn from(e: PublishError) -> Self {
1071        RequestError::with_source(RequestErrorKind::Other, e)
1072    }
1073}
1074
1075impl From<SubscribeError> for RequestError {
1076    fn from(e: SubscribeError) -> Self {
1077        RequestError::with_source(RequestErrorKind::Other, e)
1078    }
1079}
1080
/// The kinds of failure a flush can report.
///
/// Equality on this fieldless enum is total, so it implements [`Eq`] in
/// addition to [`PartialEq`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FlushErrorKind {
    /// Sending the flush failed client side.
    SendError,
    /// Flush failed.
    /// This can happen mostly in case of connection issues
    /// that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes the fixed, human-readable message for the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::SendError => "failed to send flush request",
            Self::FlushError => "flush failed",
        };
        // Written verbatim: width/padding flags are ignored, as with `write!`
        // on a plain literal.
        f.write_str(message)
    }
}
1099
/// Error type for flush failures; its kind is a [`FlushErrorKind`].
pub type FlushError = Error<FlushErrorKind>;
1101
/// Statistics of a client instance, accumulated throughout its lifecycle.
#[derive(Debug, Default)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times the connection was established.
    /// The initial connect is counted as well, followed by all successful reconnects.
    pub connects: AtomicU64,
}