Skip to main content

rivet_async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::SubscriberStatistics;
25use crate::error::Error;
26use bytes::Bytes;
27use futures_util::future::TryFutureExt;
28use futures_util::{Sink, SinkExt as _, StreamExt};
29use portable_atomic::AtomicU64;
30use regex::Regex;
31use std::fmt::Display;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::sync::Arc;
34use std::sync::LazyLock;
35use std::time::Duration;
36use thiserror::Error;
37use tokio::sync::{mpsc, oneshot};
38use tokio_util::sync::PollSender;
39use tracing::trace;
40
/// Matches the leading `major[.minor[.patch]]` part of a server version string.
///
/// The pattern is anchored at the start of the input and accepts an optional
/// `v` prefix. The numeric components are captured in groups 1 to 3. Only the
/// major component is mandatory: the minor and patch groups (and the dots
/// separating them) are optional, so those captures may be absent.
static VERSION_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
43
/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
///
/// The category of the failure is described by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
47
48impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
49    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
50        PublishError::with_source(PublishErrorKind::Send, err)
51    }
52}
53
54impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
55    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
56        PublishError::with_source(PublishErrorKind::Send, err)
57    }
58}
59
/// The category of a [`PublishError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size known to the client.
    MaxPayloadExceeded,
    /// The subject was rejected.
    // NOTE(review): nothing in this part of the file produces this kind;
    // confirm where subjects are validated.
    BadSubject,
    /// The message could not be handed over to the command channel.
    Send,
}
66
67impl Display for PublishErrorKind {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        match self {
70            PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
71            PublishErrorKind::Send => write!(f, "failed to send message"),
72            PublishErrorKind::BadSubject => write!(f, "bad subject"),
73        }
74    }
75}
76
/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Client {
    /// Watch channel holding the last server info; read by [`Client::server_info`].
    info: tokio::sync::watch::Receiver<ServerInfo>,
    /// Watch channel holding the connection state; read by [`Client::connection_state`].
    pub(crate) state: tokio::sync::watch::Receiver<State>,
    /// Channel on which every [`Command`] issued by this client is sent.
    pub(crate) sender: mpsc::Sender<Command>,
    /// Poll-based wrapper around a clone of `sender`, used by the `Sink` implementation.
    poll_sender: PollSender<Command>,
    /// Counter from which subscription ids are taken; starts at 1 and is shared by clones.
    next_subscription_id: Arc<AtomicU64>,
    /// Capacity of the message channel created for each new subscription.
    subscription_capacity: usize,
    /// Prefix of every inbox subject produced by [`Client::new_inbox`].
    inbox_prefix: Arc<str>,
    /// Default timeout applied to requests that do not set their own.
    request_timeout: Option<Duration>,
    /// Payload size limit checked by [`Client::publish`].
    max_payload: Arc<AtomicUsize>,
    /// Connection statistics, exposed through [`Client::statistics`].
    connection_stats: Arc<Statistics>,
}
93
/// Traits describing the individual capabilities of a client.
///
/// The parent module implements each of them for `Client`.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;

    use crate::{message, subject::ToSubject, Message};

    use super::{PublishError, Request, RequestError, SubscribeError};

    /// Something that can publish messages.
    pub trait Publisher {
        /// Publishes `payload` to `subject`, attaching `reply` as the reply subject.
        fn publish_with_reply<S: ToSubject, R: ToSubject>(
            &self,
            subject: S,
            reply: R,
            payload: Bytes,
        ) -> impl Future<Output = Result<(), PublishError>>;

        /// Publishes an already assembled outbound message.
        fn publish_message(
            &self,
            msg: message::OutboundMessage,
        ) -> impl Future<Output = Result<(), PublishError>>;
    }
    /// Something that can create subscriptions.
    pub trait Subscriber {
        /// Subscribes to `subject` and resolves to the resulting subscriber.
        fn subscribe<S: ToSubject>(
            &self,
            subject: S,
        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
    }
    /// Something that can perform request/response exchanges.
    pub trait Requester {
        /// Sends `request` to `subject` and resolves to the response message.
        fn send_request<S: ToSubject>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<Message, RequestError>>;
    }
    /// Something that exposes a default request timeout.
    pub trait TimeoutProvider {
        /// Returns the default timeout, or `None` when no timeout is configured.
        fn timeout(&self) -> Option<Duration>;
    }
}
133
134impl traits::Requester for Client {
135    fn send_request<S: ToSubject>(
136        &self,
137        subject: S,
138        request: Request,
139    ) -> impl Future<Output = Result<Message, RequestError>> {
140        self.send_request(subject, request)
141    }
142}
143
144impl traits::TimeoutProvider for Client {
145    fn timeout(&self) -> Option<Duration> {
146        self.timeout()
147    }
148}
149
150impl traits::Publisher for Client {
151    fn publish_with_reply<S: ToSubject, R: ToSubject>(
152        &self,
153        subject: S,
154        reply: R,
155        payload: Bytes,
156    ) -> impl Future<Output = Result<(), PublishError>> {
157        self.publish_with_reply(subject, reply, payload)
158    }
159
160    async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
161        self.sender
162            .send(Command::Publish(msg))
163            .await
164            .map_err(Into::into)
165    }
166}
167
168impl traits::Subscriber for Client {
169    fn subscribe<S: ToSubject>(
170        &self,
171        subject: S,
172    ) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
173        self.subscribe(subject)
174    }
175}
176
177impl Sink<OutboundMessage> for Client {
178    type Error = PublishError;
179
180    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
182    }
183
184    fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
185        self.poll_sender
186            .start_send_unpin(Command::Publish(msg))
187            .map_err(Into::into)
188    }
189
190    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
191        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
192    }
193
194    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
196    }
197}
198
199impl Client {
200    #[allow(clippy::too_many_arguments)]
201    pub(crate) fn new(
202        info: tokio::sync::watch::Receiver<ServerInfo>,
203        state: tokio::sync::watch::Receiver<State>,
204        sender: mpsc::Sender<Command>,
205        capacity: usize,
206        inbox_prefix: String,
207        request_timeout: Option<Duration>,
208        max_payload: Arc<AtomicUsize>,
209        statistics: Arc<Statistics>,
210    ) -> Client {
211        let poll_sender = PollSender::new(sender.clone());
212        Client {
213            info,
214            state,
215            sender,
216            poll_sender,
217            next_subscription_id: Arc::new(AtomicU64::new(1)),
218            subscription_capacity: capacity,
219            inbox_prefix: inbox_prefix.into(),
220            request_timeout,
221            max_payload,
222            connection_stats: statistics,
223        }
224    }
225
    /// Returns the default timeout for requests set when creating the client.
    ///
    /// `None` means that no default timeout is configured. A request can still
    /// carry its own timeout, set with [`Request::timeout`].
    ///
    /// # Examples
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// println!("default request timeout: {:?}", client.timeout());
    /// # Ok(())
    /// # }
    /// ```
    pub fn timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
240
241    /// Returns last received info from the server.
242    ///
243    /// # Examples
244    ///
245    /// ```no_run
246    /// # #[tokio::main]
247    /// # async fn main () -> Result<(), async_nats::Error> {
248    /// let client = async_nats::connect("demo.nats.io").await?;
249    /// println!("info: {:?}", client.server_info());
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub fn server_info(&self) -> ServerInfo {
254        // We ignore notifying the watcher, as that requires mutable client reference.
255        self.info.borrow().to_owned()
256    }
257
258    /// Returns true if the server version is compatible with the version components.
259    ///
260    /// This has to be used with caution, as it is not guaranteed that the server
261    /// that client is connected to is the same version that the one that is
262    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
263    ///
264    /// # Examples
265    ///
266    /// ```no_run
267    /// # #[tokio::main]
268    /// # async fn main() -> Result<(), async_nats::Error> {
269    /// let client = async_nats::connect("demo.nats.io").await?;
270    /// assert!(client.is_server_compatible(2, 8, 4));
271    /// # Ok(())
272    /// # }
273    /// ```
274    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
275        let info = self.server_info();
276
277        let server_version_captures = match VERSION_RE.captures(&info.version) {
278            Some(captures) => captures,
279            None => return false,
280        };
281
282        let server_major = server_version_captures
283            .get(1)
284            .map(|m| m.as_str().parse::<i64>().unwrap())
285            .unwrap();
286
287        let server_minor = server_version_captures
288            .get(2)
289            .map(|m| m.as_str().parse::<i64>().unwrap())
290            .unwrap();
291
292        let server_patch = server_version_captures
293            .get(3)
294            .map(|m| m.as_str().parse::<i64>().unwrap())
295            .unwrap();
296
297        if server_major < major
298            || (server_major == major && server_minor < minor)
299            || (server_major == major && server_minor == minor && server_patch < patch)
300        {
301            return false;
302        }
303        true
304    }
305
306    /// Publish a [Message] to a given subject.
307    ///
308    /// # Examples
309    /// ```no_run
310    /// # #[tokio::main]
311    /// # async fn main() -> Result<(), async_nats::Error> {
312    /// let client = async_nats::connect("demo.nats.io").await?;
313    /// client.publish("events.data", "payload".into()).await?;
314    /// # Ok(())
315    /// # }
316    /// ```
317    pub async fn publish<S: ToSubject>(
318        &self,
319        subject: S,
320        payload: Bytes,
321    ) -> Result<(), PublishError> {
322        let subject = subject.to_subject();
323        let max_payload = self.max_payload.load(Ordering::Relaxed);
324        if payload.len() > max_payload {
325            return Err(PublishError::with_source(
326                PublishErrorKind::MaxPayloadExceeded,
327                format!(
328                    "Payload size limit of {} exceeded by message size of {}",
329                    max_payload,
330                    payload.len(),
331                ),
332            ));
333        }
334
335        self.sender
336            .send(Command::Publish(OutboundMessage {
337                subject,
338                payload,
339                reply: None,
340                headers: None,
341            }))
342            .await?;
343        Ok(())
344    }
345
346    /// Publish a [Message] with headers to a given subject.
347    ///
348    /// # Examples
349    /// ```
350    /// # #[tokio::main]
351    /// # async fn main() -> Result<(), async_nats::Error> {
352    /// use std::str::FromStr;
353    /// let client = async_nats::connect("demo.nats.io").await?;
354    /// let mut headers = async_nats::HeaderMap::new();
355    /// headers.insert(
356    ///     "X-Header",
357    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
358    /// );
359    /// client
360    ///     .publish_with_headers("events.data", headers, "payload".into())
361    ///     .await?;
362    /// # Ok(())
363    /// # }
364    /// ```
365    pub async fn publish_with_headers<S: ToSubject>(
366        &self,
367        subject: S,
368        headers: HeaderMap,
369        payload: Bytes,
370    ) -> Result<(), PublishError> {
371        let subject = subject.to_subject();
372
373        self.sender
374            .send(Command::Publish(OutboundMessage {
375                subject,
376                payload,
377                reply: None,
378                headers: Some(headers),
379            }))
380            .await?;
381        Ok(())
382    }
383
384    /// Publish a [Message] to a given subject, with specified response subject
385    /// to which the subscriber can respond.
386    /// This method does not await for the response.
387    ///
388    /// # Examples
389    ///
390    /// ```no_run
391    /// # #[tokio::main]
392    /// # async fn main() -> Result<(), async_nats::Error> {
393    /// let client = async_nats::connect("demo.nats.io").await?;
394    /// client
395    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
396    ///     .await?;
397    /// # Ok(())
398    /// # }
399    /// ```
400    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
401        &self,
402        subject: S,
403        reply: R,
404        payload: Bytes,
405    ) -> Result<(), PublishError> {
406        let subject = subject.to_subject();
407        let reply = reply.to_subject();
408
409        self.sender
410            .send(Command::Publish(OutboundMessage {
411                subject,
412                payload,
413                reply: Some(reply),
414                headers: None,
415            }))
416            .await?;
417        Ok(())
418    }
419
420    /// Publish a [Message] to a given subject with headers and specified response subject
421    /// to which the subscriber can respond.
422    /// This method does not await for the response.
423    ///
424    /// # Examples
425    ///
426    /// ```no_run
427    /// # #[tokio::main]
428    /// # async fn main() -> Result<(), async_nats::Error> {
429    /// use std::str::FromStr;
430    /// let client = async_nats::connect("demo.nats.io").await?;
431    /// let mut headers = async_nats::HeaderMap::new();
432    /// client
433    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
434    ///     .await?;
435    /// # Ok(())
436    /// # }
437    /// ```
438    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
439        &self,
440        subject: S,
441        reply: R,
442        headers: HeaderMap,
443        payload: Bytes,
444    ) -> Result<(), PublishError> {
445        let subject = subject.to_subject();
446        let reply = reply.to_subject();
447
448        self.sender
449            .send(Command::Publish(OutboundMessage {
450                subject,
451                payload,
452                reply: Some(reply),
453                headers: Some(headers),
454            }))
455            .await?;
456        Ok(())
457    }
458
459    /// Sends the request with headers.
460    ///
461    /// # Examples
462    /// ```no_run
463    /// # #[tokio::main]
464    /// # async fn main() -> Result<(), async_nats::Error> {
465    /// let client = async_nats::connect("demo.nats.io").await?;
466    /// let response = client.request("service", "data".into()).await?;
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub async fn request<S: ToSubject>(
471        &self,
472        subject: S,
473        payload: Bytes,
474    ) -> Result<Message, RequestError> {
475        let subject = subject.to_subject();
476
477        trace!(
478            "request sent to subject: {} ({})",
479            subject.as_ref(),
480            payload.len()
481        );
482        let request = Request::new().payload(payload);
483        self.send_request(subject, request).await
484    }
485
486    /// Sends the request with headers.
487    ///
488    /// # Examples
489    /// ```no_run
490    /// # #[tokio::main]
491    /// # async fn main() -> Result<(), async_nats::Error> {
492    /// let client = async_nats::connect("demo.nats.io").await?;
493    /// let mut headers = async_nats::HeaderMap::new();
494    /// headers.insert("Key", "Value");
495    /// let response = client
496    ///     .request_with_headers("service", headers, "data".into())
497    ///     .await?;
498    /// # Ok(())
499    /// # }
500    /// ```
501    pub async fn request_with_headers<S: ToSubject>(
502        &self,
503        subject: S,
504        headers: HeaderMap,
505        payload: Bytes,
506    ) -> Result<Message, RequestError> {
507        let subject = subject.to_subject();
508
509        let request = Request::new().headers(headers).payload(payload);
510        self.send_request(subject, request).await
511    }
512
    /// Sends the request created by the [Request].
    ///
    /// Two paths exist:
    ///
    /// * When the request carries a custom inbox, the client subscribes to that
    ///   inbox, publishes the message with the inbox as reply subject, and waits
    ///   for the first message on the subscription.
    /// * Otherwise a `Command::Request` is sent with a freshly generated inbox
    ///   as the respond subject, and the response is awaited on a oneshot channel.
    ///
    /// The timeout is the one set on the request, falling back to the client
    /// default; `None` waits indefinitely. It only bounds the wait for the
    /// response, not the preceding subscribe/publish/send steps.
    ///
    /// # Errors
    ///
    /// * `RequestErrorKind::TimedOut` when no response arrives within the timeout.
    /// * `RequestErrorKind::NoResponders` when the response carries the
    ///   no-responders status.
    /// * `RequestErrorKind::Other` when the command cannot be sent or the
    ///   response channel ends without a message.
    ///
    /// Failures of the subscribe and publish steps of the custom inbox path are
    /// propagated with `?`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let request = async_nats::Request::new().payload("data".into());
    /// let response = client.send_request("service", request).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: Request,
    ) -> Result<Message, RequestError> {
        let subject = subject.to_subject();

        if let Some(inbox) = request.inbox {
            // Custom inbox: the caller chose the reply subject, so this call
            // manages its own subscription on it.
            let timeout = request.timeout.unwrap_or(self.request_timeout);
            // The subscribe command is queued before the publish command below.
            let mut subscriber = self.subscribe(inbox.clone()).await?;
            let payload: Bytes = request.payload.unwrap_or_default();
            match request.headers {
                Some(headers) => {
                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
                        .await?
                }
                None => self.publish_with_reply(subject, inbox, payload).await?,
            }
            // Wait for the first message on the inbox, bounded by the timeout if any.
            let request = match timeout {
                Some(timeout) => {
                    tokio::time::timeout(timeout, subscriber.next())
                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
                        .await?
                }
                None => subscriber.next().await,
            };
            match request {
                Some(message) => {
                    if message.status == Some(StatusCode::NO_RESPONDERS) {
                        return Err(RequestError::with_source(
                            RequestErrorKind::NoResponders,
                            "no responders",
                        ));
                    }
                    Ok(message)
                }
                // The subscription ended without delivering a message.
                None => Err(RequestError::with_source(
                    RequestErrorKind::Other,
                    "broken pipe",
                )),
            }
        } else {
            // Default path: the response comes back through a oneshot channel
            // whose sending half travels inside the command.
            let (sender, receiver) = oneshot::channel();

            let payload = request.payload.unwrap_or_default();
            let respond = self.new_inbox().into();
            let headers = request.headers;

            self.sender
                .send(Command::Request {
                    subject,
                    payload,
                    respond,
                    headers,
                    sender,
                })
                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
                .await?;

            // The per-request timeout wins over the client default.
            let timeout = request.timeout.unwrap_or(self.request_timeout);
            let request = match timeout {
                Some(timeout) => {
                    tokio::time::timeout(timeout, receiver)
                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
                        .await?
                }
                None => receiver.await,
            };

            match request {
                Ok(message) => {
                    if message.status == Some(StatusCode::NO_RESPONDERS) {
                        return Err(RequestError::with_source(
                            RequestErrorKind::NoResponders,
                            "no responders",
                        ));
                    }
                    Ok(message)
                }
                // The sending half was dropped without producing a response.
                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
            }
        }
    }
609
610    /// Create a new globally unique inbox which can be used for replies.
611    ///
612    /// # Examples
613    ///
614    /// ```no_run
615    /// # #[tokio::main]
616    /// # async fn main() -> Result<(), async_nats::Error> {
617    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
618    /// let reply = nc.new_inbox();
619    /// let rsub = nc.subscribe(reply).await?;
620    /// # Ok(())
621    /// # }
622    /// ```
623    pub fn new_inbox(&self) -> String {
624        format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
625    }
626
627    /// Subscribes to a subject to receive [messages][Message].
628    ///
629    /// # Examples
630    ///
631    /// ```no_run
632    /// # #[tokio::main]
633    /// # async fn main() -> Result<(), async_nats::Error> {
634    /// use futures_util::StreamExt;
635    /// let client = async_nats::connect("demo.nats.io").await?;
636    /// let mut subscription = client.subscribe("events.>").await?;
637    /// while let Some(message) = subscription.next().await {
638    ///     println!("received message: {:?}", message);
639    /// }
640    /// # Ok(())
641    /// # }
642    /// ```
643    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
644        let subject = subject.to_subject();
645        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
646        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
647        let subscription_statistics = Arc::new(SubscriberStatistics::default());
648
649        self.sender
650            .send(Command::Subscribe {
651                sid,
652                subject,
653                queue_group: None,
654                sender,
655                statistics: subscription_statistics.clone(),
656            })
657            .await?;
658
659        Ok(Subscriber::new(
660            sid,
661            self.sender.clone(),
662            receiver,
663            subscription_statistics,
664            self.connection_stats.clone(),
665        ))
666    }
667
668    /// Subscribes to a subject with a queue group to receive [messages][Message].
669    ///
670    /// # Examples
671    ///
672    /// ```no_run
673    /// # #[tokio::main]
674    /// # async fn main() -> Result<(), async_nats::Error> {
675    /// use futures_util::StreamExt;
676    /// let client = async_nats::connect("demo.nats.io").await?;
677    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
678    /// while let Some(message) = subscription.next().await {
679    ///     println!("received message: {:?}", message);
680    /// }
681    /// # Ok(())
682    /// # }
683    /// ```
684    pub async fn queue_subscribe<S: ToSubject>(
685        &self,
686        subject: S,
687        queue_group: String,
688    ) -> Result<Subscriber, SubscribeError> {
689        let subject = subject.to_subject();
690
691        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
692        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
693        let subscription_statistics = Arc::new(SubscriberStatistics::default());
694
695        self.sender
696            .send(Command::Subscribe {
697                sid,
698                subject,
699                queue_group: Some(queue_group),
700                sender,
701                statistics: subscription_statistics.clone(),
702            })
703            .await?;
704
705        Ok(Subscriber::new(
706            sid,
707            self.sender.clone(),
708            receiver,
709            subscription_statistics,
710            self.connection_stats.clone(),
711        ))
712    }
713
714    /// Flushes the internal buffer ensuring that all messages are sent.
715    ///
716    /// # Examples
717    ///
718    /// ```no_run
719    /// # #[tokio::main]
720    /// # async fn main() -> Result<(), async_nats::Error> {
721    /// let client = async_nats::connect("demo.nats.io").await?;
722    /// client.flush().await?;
723    /// # Ok(())
724    /// # }
725    /// ```
726    pub async fn flush(&self) -> Result<(), FlushError> {
727        let (tx, rx) = tokio::sync::oneshot::channel();
728        self.sender
729            .send(Command::Flush { observer: tx })
730            .await
731            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
732
733        rx.await
734            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
735        Ok(())
736    }
737
738    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
739    /// messages, then closes the connection. Once completed, any streams associated with the
740    /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
741    ///
742    /// # Examples
743    ///
744    /// ```no_run
745    /// # #[tokio::main]
746    /// # async fn main() -> Result<(), async_nats::Error> {
747    /// use futures_util::StreamExt;
748    /// let client = async_nats::connect("demo.nats.io").await?;
749    /// let mut subscription = client.subscribe("events.>").await?;
750    ///
751    /// client.drain().await?;
752    ///
753    /// # // existing subscriptions are closed and further commands will fail
754    /// assert!(subscription.next().await.is_none());
755    /// client
756    ///     .subscribe("events.>")
757    ///     .await
758    ///     .expect_err("Expected further commands to fail");
759    ///
760    /// # Ok(())
761    /// # }
762    /// ```
763    pub async fn drain(&self) -> Result<(), DrainError> {
764        // Drain all subscriptions
765        self.sender.send(Command::Drain { sid: None }).await?;
766
767        // Remaining process is handled on the handler-side
768        Ok(())
769    }
770
771    /// Returns the current state of the connection.
772    ///
773    /// # Examples
774    /// ```no_run
775    /// # #[tokio::main]
776    /// # async fn main() -> Result<(), async_nats::Error> {
777    /// let client = async_nats::connect("demo.nats.io").await?;
778    /// println!("connection state: {}", client.connection_state());
779    /// # Ok(())
780    /// # }
781    /// ```
782    pub fn connection_state(&self) -> State {
783        self.state.borrow().to_owned()
784    }
785
786    /// Forces the client to reconnect.
787    /// Keep in mind that client will reconnect automatically if the connection is lost and this
788    /// method does not have to be used in normal circumstances.
789    /// However, if you want to force the client to reconnect, for example to re-trigger
790    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
791    /// This method does not wait for connection to be re-established.
792    ///
793    /// # Examples
794    /// ```no_run
795    /// # #[tokio::main]
796    /// # async fn main() -> Result<(), async_nats::Error> {
797    /// let client = async_nats::connect("demo.nats.io").await?;
798    /// client.force_reconnect().await?;
799    /// # Ok(())
800    /// # }
801    /// ```
802    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
803        self.sender
804            .send(Command::Reconnect)
805            .await
806            .map_err(Into::into)
807    }
808
809    /// Returns struct representing statistics of the whole lifecycle of the client.
810    /// This includes number of bytes sent/received, number of messages sent/received,
811    /// and number of times the connection was established.
812    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
813    /// across threads.
814    ///
815    /// # Examples
816    /// ```no_run
817    /// # #[tokio::main]
818    /// # async fn main() -> Result<(), async_nats::Error> {
819    /// let client = async_nats::connect("demo.nats.io").await?;
820    /// let statistics = client.statistics();
821    /// println!("client statistics: {:#?}", statistics);
822    /// # Ok(())
823    /// # }
824    /// ```
825    pub fn statistics(&self) -> Arc<Statistics> {
826        self.connection_stats.clone()
827    }
828}
829
/// Used for building customized requests.
///
/// Every field is optional; an unset field makes [`Client::send_request`] fall
/// back to its default for that aspect.
#[derive(Default)]
pub struct Request {
    /// Payload of the request; an empty payload is sent when unset.
    pub payload: Option<Bytes>,
    /// Headers sent with the request, if any.
    pub headers: Option<HeaderMap>,
    /// Timeout override: unset (`None`) uses the client default, `Some(None)`
    /// disables the timeout, `Some(Some(d))` waits for at most `d`.
    pub timeout: Option<Option<Duration>>,
    /// Custom inbox to use as the reply subject; a new inbox is generated when unset.
    pub inbox: Option<String>,
}
838
839impl Request {
840    pub fn new() -> Request {
841        Default::default()
842    }
843
844    /// Sets the payload of the request. If not used, empty payload will be sent.
845    ///
846    /// # Examples
847    /// ```no_run
848    /// # #[tokio::main]
849    /// # async fn main() -> Result<(), async_nats::Error> {
850    /// let client = async_nats::connect("demo.nats.io").await?;
851    /// let request = async_nats::Request::new().payload("data".into());
852    /// client.send_request("service", request).await?;
853    /// # Ok(())
854    /// # }
855    /// ```
856    pub fn payload(mut self, payload: Bytes) -> Request {
857        self.payload = Some(payload);
858        self
859    }
860
861    /// Sets the headers of the requests.
862    ///
863    /// # Examples
864    /// ```no_run
865    /// # #[tokio::main]
866    /// # async fn main() -> Result<(), async_nats::Error> {
867    /// use std::str::FromStr;
868    /// let client = async_nats::connect("demo.nats.io").await?;
869    /// let mut headers = async_nats::HeaderMap::new();
870    /// headers.insert(
871    ///     "X-Example",
872    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
873    /// );
874    /// let request = async_nats::Request::new()
875    ///     .headers(headers)
876    ///     .payload("data".into());
877    /// client.send_request("service", request).await?;
878    /// # Ok(())
879    /// # }
880    /// ```
881    pub fn headers(mut self, headers: HeaderMap) -> Request {
882        self.headers = Some(headers);
883        self
884    }
885
886    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
887    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
888    /// To use default timeout, simply do not call this function.
889    ///
890    /// # Examples
891    /// ```no_run
892    /// # #[tokio::main]
893    /// # async fn main() -> Result<(), async_nats::Error> {
894    /// let client = async_nats::connect("demo.nats.io").await?;
895    /// let request = async_nats::Request::new()
896    ///     .timeout(Some(std::time::Duration::from_secs(15)))
897    ///     .payload("data".into());
898    /// client.send_request("service", request).await?;
899    /// # Ok(())
900    /// # }
901    /// ```
902    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
903        self.timeout = Some(timeout);
904        self
905    }
906
907    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
908    ///
909    /// # Examples
910    /// ```no_run
911    /// # #[tokio::main]
912    /// # async fn main() -> Result<(), async_nats::Error> {
913    /// use std::str::FromStr;
914    /// let client = async_nats::connect("demo.nats.io").await?;
915    /// let request = async_nats::Request::new()
916    ///     .inbox("custom_inbox".into())
917    ///     .payload("data".into());
918    /// client.send_request("service", request).await?;
919    /// # Ok(())
920    /// # }
921    /// ```
922    pub fn inbox(mut self, inbox: String) -> Request {
923        self.inbox = Some(inbox);
924        self
925    }
926}
927
928#[derive(Error, Debug)]
929#[error("failed to send reconnect: {0}")]
930pub struct ReconnectError(#[source] crate::Error);
931
932impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
933    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
934        ReconnectError(Box::new(err))
935    }
936}
937
938#[derive(Error, Debug)]
939#[error("failed to send subscribe: {0}")]
940pub struct SubscribeError(#[source] crate::Error);
941
942impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
943    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
944        SubscribeError(Box::new(err))
945    }
946}
947
948#[derive(Error, Debug)]
949#[error("failed to send drain: {0}")]
950pub struct DrainError(#[source] crate::Error);
951
952impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
953    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
954        DrainError(Box::new(err))
955    }
956}
957
/// Classifies the reason a core NATS request failed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RequestErrorKind {
    /// Services are listening on the requested subject, but none of them
    /// responded in time.
    TimedOut,
    /// Nobody is listening on the requested subject.
    NoResponders,
    /// Any other failure, client or I/O related.
    Other,
}
968
969impl Display for RequestErrorKind {
970    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
971        match self {
972            Self::TimedOut => write!(f, "request timed out"),
973            Self::NoResponders => write!(f, "no responders"),
974            Self::Other => write!(f, "request failed"),
975        }
976    }
977}
978
979/// Error returned when a core NATS request fails.
980/// To be enumerate over the variants, call [RequestError::kind].
981pub type RequestError = Error<RequestErrorKind>;
982
983impl From<PublishError> for RequestError {
984    fn from(e: PublishError) -> Self {
985        RequestError::with_source(RequestErrorKind::Other, e)
986    }
987}
988
989impl From<SubscribeError> for RequestError {
990    fn from(e: SubscribeError) -> Self {
991        RequestError::with_source(RequestErrorKind::Other, e)
992    }
993}
994
/// Classifies the reason a flush failed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FlushErrorKind {
    /// The flush request could not be sent on the client side.
    SendError,
    /// The flush itself failed.
    /// This mostly happens when there are connection issues
    /// that cannot be resolved quickly.
    FlushError,
}
1004
1005impl Display for FlushErrorKind {
1006    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1007        match self {
1008            Self::SendError => write!(f, "failed to send flush request"),
1009            Self::FlushError => write!(f, "flush failed"),
1010        }
1011    }
1012}
1013
1014pub type FlushError = Error<FlushErrorKind>;
1015
/// Statistics for one client instance, accumulated throughout its lifecycle.
///
/// Every field is an atomic counter, and [`Default`] starts them all at zero.
#[derive(Debug, Default)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times the connection was established.
    /// The initial connect is counted, as is every successful reconnect.
    pub connects: AtomicU64,
    /// Number of messages currently buffered in subscription channels.
    pub subscription_pending_messages: AtomicU64,
    /// Number of bytes currently buffered in subscription channels.
    pub subscription_pending_bytes: AtomicU64,
    /// Number of messages dropped because subscription channels were full.
    pub subscription_dropped_messages: AtomicU64,
    /// Number of bytes dropped because subscription channels were full.
    pub subscription_dropped_bytes: AtomicU64,
    /// Number of currently active subscription handles.
    pub active_subscriptions: AtomicU64,
    /// Total configured capacity of currently active subscription channels.
    pub active_subscription_capacity: AtomicU64,
}