async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use once_cell::sync::Lazy;
29use portable_atomic::AtomicU64;
30use regex::Regex;
31use std::fmt::Display;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::sync::Arc;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38use tracing::trace;
39
/// Matches the leading `major[.minor[.patch]]` part of a server version string.
///
/// The pattern is anchored at the start of the input (`\A`), accepts an optional
/// `v` prefix and captures the numeric components in groups 1 to 3. Only the
/// first (major) group is mandatory; groups 2 and 3 may be absent from a
/// successful match, so callers must not assume they are present.
static VERSION_RE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
42
/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
///
/// The failure category is described by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
46
47impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
48    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
49        PublishError::with_source(PublishErrorKind::Send, err)
50    }
51}
52
53impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
54    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
55        PublishError::with_source(PublishErrorKind::Send, err)
56    }
57}
58
/// Categories of failure reported by [`PublishError`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size known to the client.
    MaxPayloadExceeded,
    /// The subject was not acceptable.
    // NOTE(review): no code in this file constructs this variant.
    BadSubject,
    /// The message could not be handed over to the command channel.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::BadSubject => "bad subject",
            Self::Send => "failed to send message",
        };
        f.write_str(description)
    }
}
75
/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Client {
    // Watch channel from which `server_info` reads the latest `ServerInfo`.
    info: tokio::sync::watch::Receiver<ServerInfo>,
    // Watch channel from which `connection_state` reads the current `State`.
    pub(crate) state: tokio::sync::watch::Receiver<State>,
    // Channel on which every `Command` (publish, subscribe, flush, ...) is sent.
    pub(crate) sender: mpsc::Sender<Command>,
    // Wrapper around a clone of `sender`, used by the `Sink` implementation.
    poll_sender: PollSender<Command>,
    // Counter that hands out subscription ids; starts at 1 and is shared
    // between clones of the client.
    next_subscription_id: Arc<AtomicU64>,
    // Capacity of the message channel created for each new subscription.
    subscription_capacity: usize,
    // Prefix of the subjects generated by `new_inbox`.
    inbox_prefix: Arc<str>,
    // Default request timeout; `None` means requests wait without a timeout.
    request_timeout: Option<Duration>,
    // Payload size limit checked by `publish` before a message is sent.
    max_payload: Arc<AtomicUsize>,
    // Shared counters returned by `statistics`.
    connection_stats: Arc<Statistics>,
}
92
/// Traits describing individual client capabilities: publishing, subscribing,
/// sending requests and reporting a default timeout.
///
/// All of them are implemented for [`Client`](super::Client) in this file.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;

    use crate::{message, subject::ToSubject, Message};

    use super::{PublishError, Request, RequestError, SubscribeError};

    /// Something that can publish messages.
    pub trait Publisher {
        /// Publishes `payload` to `subject`, attaching `reply` as the subject
        /// a receiver can respond to.
        fn publish_with_reply<S: ToSubject, R: ToSubject>(
            &self,
            subject: S,
            reply: R,
            payload: Bytes,
        ) -> impl Future<Output = Result<(), PublishError>>;

        /// Publishes an already assembled [`message::OutboundMessage`].
        fn publish_message(
            &self,
            msg: message::OutboundMessage,
        ) -> impl Future<Output = Result<(), PublishError>>;
    }
    /// Something that can create subscriptions.
    pub trait Subscriber {
        /// Subscribes to `subject` and resolves to the resulting
        /// [`crate::Subscriber`].
        fn subscribe<S: ToSubject>(
            &self,
            subject: S,
        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
    }
    /// Something that can perform request/response exchanges.
    pub trait Requester {
        /// Sends `request` to `subject` and resolves to the response message.
        fn send_request<S: ToSubject>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<Message, RequestError>>;
    }
    /// Something that knows a default timeout.
    pub trait TimeoutProvider {
        /// Returns the default timeout, or `None` when there is none.
        fn timeout(&self) -> Option<Duration>;
    }
}
132
133impl traits::Requester for Client {
134    fn send_request<S: ToSubject>(
135        &self,
136        subject: S,
137        request: Request,
138    ) -> impl Future<Output = Result<Message, RequestError>> {
139        self.send_request(subject, request)
140    }
141}
142
143impl traits::TimeoutProvider for Client {
144    fn timeout(&self) -> Option<Duration> {
145        self.timeout()
146    }
147}
148
149impl traits::Publisher for Client {
150    fn publish_with_reply<S: ToSubject, R: ToSubject>(
151        &self,
152        subject: S,
153        reply: R,
154        payload: Bytes,
155    ) -> impl Future<Output = Result<(), PublishError>> {
156        self.publish_with_reply(subject, reply, payload)
157    }
158
159    async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
160        self.sender
161            .send(Command::Publish(msg))
162            .await
163            .map_err(Into::into)
164    }
165}
166
167impl traits::Subscriber for Client {
168    fn subscribe<S: ToSubject>(
169        &self,
170        subject: S,
171    ) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
172        self.subscribe(subject)
173    }
174}
175
176impl Sink<OutboundMessage> for Client {
177    type Error = PublishError;
178
179    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
180        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
181    }
182
183    fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
184        self.poll_sender
185            .start_send_unpin(Command::Publish(msg))
186            .map_err(Into::into)
187    }
188
189    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
190        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
191    }
192
193    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
195    }
196}
197
198impl Client {
199    #[allow(clippy::too_many_arguments)]
200    pub(crate) fn new(
201        info: tokio::sync::watch::Receiver<ServerInfo>,
202        state: tokio::sync::watch::Receiver<State>,
203        sender: mpsc::Sender<Command>,
204        capacity: usize,
205        inbox_prefix: String,
206        request_timeout: Option<Duration>,
207        max_payload: Arc<AtomicUsize>,
208        statistics: Arc<Statistics>,
209    ) -> Client {
210        let poll_sender = PollSender::new(sender.clone());
211        Client {
212            info,
213            state,
214            sender,
215            poll_sender,
216            next_subscription_id: Arc::new(AtomicU64::new(1)),
217            subscription_capacity: capacity,
218            inbox_prefix: inbox_prefix.into(),
219            request_timeout,
220            max_payload,
221            connection_stats: statistics,
222        }
223    }
224
225    /// Returns the default timeout for requests set when creating the client.
226    ///
227    /// # Examples
228    /// ```no_run
229    /// # #[tokio::main]
230    /// # async fn main() -> Result<(), async_nats::Error> {
231    /// let client = async_nats::connect("demo.nats.io").await?;
232    /// println!("default request timeout: {:?}", client.timeout());
233    /// # Ok(())
234    /// # }
235    /// ```
236    pub fn timeout(&self) -> Option<Duration> {
237        self.request_timeout
238    }
239
240    /// Returns last received info from the server.
241    ///
242    /// # Examples
243    ///
244    /// ```no_run
245    /// # #[tokio::main]
246    /// # async fn main () -> Result<(), async_nats::Error> {
247    /// let client = async_nats::connect("demo.nats.io").await?;
248    /// println!("info: {:?}", client.server_info());
249    /// # Ok(())
250    /// # }
251    /// ```
252    pub fn server_info(&self) -> ServerInfo {
253        // We ignore notifying the watcher, as that requires mutable client reference.
254        self.info.borrow().to_owned()
255    }
256
257    /// Returns true if the server version is compatible with the version components.
258    ///
259    /// This has to be used with caution, as it is not guaranteed that the server
260    /// that client is connected to is the same version that the one that is
261    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
262    ///
263    /// # Examples
264    ///
265    /// ```no_run
266    /// # #[tokio::main]
267    /// # async fn main() -> Result<(), async_nats::Error> {
268    /// let client = async_nats::connect("demo.nats.io").await?;
269    /// assert!(client.is_server_compatible(2, 8, 4));
270    /// # Ok(())
271    /// # }
272    /// ```
273    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
274        let info = self.server_info();
275
276        let server_version_captures = match VERSION_RE.captures(&info.version) {
277            Some(captures) => captures,
278            None => return false,
279        };
280
281        let server_major = server_version_captures
282            .get(1)
283            .map(|m| m.as_str().parse::<i64>().unwrap())
284            .unwrap();
285
286        let server_minor = server_version_captures
287            .get(2)
288            .map(|m| m.as_str().parse::<i64>().unwrap())
289            .unwrap();
290
291        let server_patch = server_version_captures
292            .get(3)
293            .map(|m| m.as_str().parse::<i64>().unwrap())
294            .unwrap();
295
296        if server_major < major
297            || (server_major == major && server_minor < minor)
298            || (server_major == major && server_minor == minor && server_patch < patch)
299        {
300            return false;
301        }
302        true
303    }
304
305    /// Publish a [Message] to a given subject.
306    ///
307    /// # Examples
308    /// ```no_run
309    /// # #[tokio::main]
310    /// # async fn main() -> Result<(), async_nats::Error> {
311    /// let client = async_nats::connect("demo.nats.io").await?;
312    /// client.publish("events.data", "payload".into()).await?;
313    /// # Ok(())
314    /// # }
315    /// ```
316    pub async fn publish<S: ToSubject>(
317        &self,
318        subject: S,
319        payload: Bytes,
320    ) -> Result<(), PublishError> {
321        let subject = subject.to_subject();
322        let max_payload = self.max_payload.load(Ordering::Relaxed);
323        if payload.len() > max_payload {
324            return Err(PublishError::with_source(
325                PublishErrorKind::MaxPayloadExceeded,
326                format!(
327                    "Payload size limit of {} exceeded by message size of {}",
328                    max_payload,
329                    payload.len(),
330                ),
331            ));
332        }
333
334        self.sender
335            .send(Command::Publish(OutboundMessage {
336                subject,
337                payload,
338                reply: None,
339                headers: None,
340            }))
341            .await?;
342        Ok(())
343    }
344
345    /// Publish a [Message] with headers to a given subject.
346    ///
347    /// # Examples
348    /// ```
349    /// # #[tokio::main]
350    /// # async fn main() -> Result<(), async_nats::Error> {
351    /// use std::str::FromStr;
352    /// let client = async_nats::connect("demo.nats.io").await?;
353    /// let mut headers = async_nats::HeaderMap::new();
354    /// headers.insert(
355    ///     "X-Header",
356    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
357    /// );
358    /// client
359    ///     .publish_with_headers("events.data", headers, "payload".into())
360    ///     .await?;
361    /// # Ok(())
362    /// # }
363    /// ```
364    pub async fn publish_with_headers<S: ToSubject>(
365        &self,
366        subject: S,
367        headers: HeaderMap,
368        payload: Bytes,
369    ) -> Result<(), PublishError> {
370        let subject = subject.to_subject();
371
372        self.sender
373            .send(Command::Publish(OutboundMessage {
374                subject,
375                payload,
376                reply: None,
377                headers: Some(headers),
378            }))
379            .await?;
380        Ok(())
381    }
382
383    /// Publish a [Message] to a given subject, with specified response subject
384    /// to which the subscriber can respond.
385    /// This method does not await for the response.
386    ///
387    /// # Examples
388    ///
389    /// ```no_run
390    /// # #[tokio::main]
391    /// # async fn main() -> Result<(), async_nats::Error> {
392    /// let client = async_nats::connect("demo.nats.io").await?;
393    /// client
394    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
395    ///     .await?;
396    /// # Ok(())
397    /// # }
398    /// ```
399    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
400        &self,
401        subject: S,
402        reply: R,
403        payload: Bytes,
404    ) -> Result<(), PublishError> {
405        let subject = subject.to_subject();
406        let reply = reply.to_subject();
407
408        self.sender
409            .send(Command::Publish(OutboundMessage {
410                subject,
411                payload,
412                reply: Some(reply),
413                headers: None,
414            }))
415            .await?;
416        Ok(())
417    }
418
419    /// Publish a [Message] to a given subject with headers and specified response subject
420    /// to which the subscriber can respond.
421    /// This method does not await for the response.
422    ///
423    /// # Examples
424    ///
425    /// ```no_run
426    /// # #[tokio::main]
427    /// # async fn main() -> Result<(), async_nats::Error> {
428    /// use std::str::FromStr;
429    /// let client = async_nats::connect("demo.nats.io").await?;
430    /// let mut headers = async_nats::HeaderMap::new();
431    /// client
432    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
433    ///     .await?;
434    /// # Ok(())
435    /// # }
436    /// ```
437    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
438        &self,
439        subject: S,
440        reply: R,
441        headers: HeaderMap,
442        payload: Bytes,
443    ) -> Result<(), PublishError> {
444        let subject = subject.to_subject();
445        let reply = reply.to_subject();
446
447        self.sender
448            .send(Command::Publish(OutboundMessage {
449                subject,
450                payload,
451                reply: Some(reply),
452                headers: Some(headers),
453            }))
454            .await?;
455        Ok(())
456    }
457
458    /// Sends the request with headers.
459    ///
460    /// # Examples
461    /// ```no_run
462    /// # #[tokio::main]
463    /// # async fn main() -> Result<(), async_nats::Error> {
464    /// let client = async_nats::connect("demo.nats.io").await?;
465    /// let response = client.request("service", "data".into()).await?;
466    /// # Ok(())
467    /// # }
468    /// ```
469    pub async fn request<S: ToSubject>(
470        &self,
471        subject: S,
472        payload: Bytes,
473    ) -> Result<Message, RequestError> {
474        let subject = subject.to_subject();
475
476        trace!(
477            "request sent to subject: {} ({})",
478            subject.as_ref(),
479            payload.len()
480        );
481        let request = Request::new().payload(payload);
482        self.send_request(subject, request).await
483    }
484
485    /// Sends the request with headers.
486    ///
487    /// # Examples
488    /// ```no_run
489    /// # #[tokio::main]
490    /// # async fn main() -> Result<(), async_nats::Error> {
491    /// let client = async_nats::connect("demo.nats.io").await?;
492    /// let mut headers = async_nats::HeaderMap::new();
493    /// headers.insert("Key", "Value");
494    /// let response = client
495    ///     .request_with_headers("service", headers, "data".into())
496    ///     .await?;
497    /// # Ok(())
498    /// # }
499    /// ```
500    pub async fn request_with_headers<S: ToSubject>(
501        &self,
502        subject: S,
503        headers: HeaderMap,
504        payload: Bytes,
505    ) -> Result<Message, RequestError> {
506        let subject = subject.to_subject();
507
508        let request = Request::new().headers(headers).payload(payload);
509        self.send_request(subject, request).await
510    }
511
    /// Sends the request created by the [Request].
    ///
    /// Two paths exist, selected by whether the request carries a custom inbox:
    ///
    /// * with a custom inbox, the client subscribes to that inbox, publishes
    ///   the request with the inbox as reply subject and waits for the first
    ///   message on the subscription;
    /// * otherwise a `Command::Request` with a freshly generated inbox is sent
    ///   on the command channel and the response arrives on a oneshot channel.
    ///
    /// The timeout set on the request takes precedence over the client's
    /// default one; a timeout of `None` means waiting without a time limit.
    ///
    /// # Errors
    ///
    /// * [`RequestErrorKind::TimedOut`] when no response arrives within the timeout.
    /// * [`RequestErrorKind::NoResponders`] when the response carries the
    ///   "no responders" status.
    /// * [`RequestErrorKind::Other`] for subscribe, publish and channel failures.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let request = async_nats::Request::new().payload("data".into());
    /// let response = client.send_request("service", request).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn send_request<S: ToSubject>(
        &self,
        subject: S,
        request: Request,
    ) -> Result<Message, RequestError> {
        let subject = subject.to_subject();

        if let Some(inbox) = request.inbox {
            // `request.timeout` is `Option<Option<Duration>>`: the outer `None`
            // means "not customized", in which case the client default is used.
            let timeout = request.timeout.unwrap_or(self.request_timeout);
            // Subscribe before publishing so the subscription already exists
            // when the response is sent to the inbox.
            let mut subscriber = self.subscribe(inbox.clone()).await?;
            let payload: Bytes = request.payload.unwrap_or_default();
            match request.headers {
                Some(headers) => {
                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
                        .await?
                }
                None => self.publish_with_reply(subject, inbox, payload).await?,
            }
            // Wait for the first message on the inbox, bounded by the timeout if any.
            let request = match timeout {
                Some(timeout) => {
                    tokio::time::timeout(timeout, subscriber.next())
                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
                        .await?
                }
                None => subscriber.next().await,
            };
            match request {
                Some(message) => {
                    if message.status == Some(StatusCode::NO_RESPONDERS) {
                        return Err(RequestError::with_source(
                            RequestErrorKind::NoResponders,
                            "no responders",
                        ));
                    }
                    Ok(message)
                }
                // The subscription stream ended without delivering a message.
                None => Err(RequestError::with_source(
                    RequestErrorKind::Other,
                    "broken pipe",
                )),
            }
        } else {
            // The response is delivered through this oneshot channel.
            let (sender, receiver) = oneshot::channel();

            let payload = request.payload.unwrap_or_default();
            let respond = self.new_inbox().into();
            let headers = request.headers;

            self.sender
                .send(Command::Request {
                    subject,
                    payload,
                    respond,
                    headers,
                    sender,
                })
                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
                .await?;

            // Same timeout precedence as in the custom inbox path above.
            let timeout = request.timeout.unwrap_or(self.request_timeout);
            let request = match timeout {
                Some(timeout) => {
                    tokio::time::timeout(timeout, receiver)
                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
                        .await?
                }
                None => receiver.await,
            };

            match request {
                Ok(message) => {
                    if message.status == Some(StatusCode::NO_RESPONDERS) {
                        return Err(RequestError::with_source(
                            RequestErrorKind::NoResponders,
                            "no responders",
                        ));
                    }
                    Ok(message)
                }
                // The sending half of the oneshot channel was dropped without a response.
                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
            }
        }
    }
608
609    /// Create a new globally unique inbox which can be used for replies.
610    ///
611    /// # Examples
612    ///
613    /// ```no_run
614    /// # #[tokio::main]
615    /// # async fn main() -> Result<(), async_nats::Error> {
616    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
617    /// let reply = nc.new_inbox();
618    /// let rsub = nc.subscribe(reply).await?;
619    /// # Ok(())
620    /// # }
621    /// ```
622    pub fn new_inbox(&self) -> String {
623        format!("{}.{}", self.inbox_prefix, nuid::next())
624    }
625
626    /// Subscribes to a subject to receive [messages][Message].
627    ///
628    /// # Examples
629    ///
630    /// ```no_run
631    /// # #[tokio::main]
632    /// # async fn main() -> Result<(), async_nats::Error> {
633    /// use futures_util::StreamExt;
634    /// let client = async_nats::connect("demo.nats.io").await?;
635    /// let mut subscription = client.subscribe("events.>").await?;
636    /// while let Some(message) = subscription.next().await {
637    ///     println!("received message: {:?}", message);
638    /// }
639    /// # Ok(())
640    /// # }
641    /// ```
642    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
643        let subject = subject.to_subject();
644        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
645        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
646
647        self.sender
648            .send(Command::Subscribe {
649                sid,
650                subject,
651                queue_group: None,
652                sender,
653            })
654            .await?;
655
656        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
657    }
658
659    /// Subscribes to a subject with a queue group to receive [messages][Message].
660    ///
661    /// # Examples
662    ///
663    /// ```no_run
664    /// # #[tokio::main]
665    /// # async fn main() -> Result<(), async_nats::Error> {
666    /// use futures_util::StreamExt;
667    /// let client = async_nats::connect("demo.nats.io").await?;
668    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
669    /// while let Some(message) = subscription.next().await {
670    ///     println!("received message: {:?}", message);
671    /// }
672    /// # Ok(())
673    /// # }
674    /// ```
675    pub async fn queue_subscribe<S: ToSubject>(
676        &self,
677        subject: S,
678        queue_group: String,
679    ) -> Result<Subscriber, SubscribeError> {
680        let subject = subject.to_subject();
681
682        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
683        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
684
685        self.sender
686            .send(Command::Subscribe {
687                sid,
688                subject,
689                queue_group: Some(queue_group),
690                sender,
691            })
692            .await?;
693
694        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
695    }
696
697    /// Flushes the internal buffer ensuring that all messages are sent.
698    ///
699    /// # Examples
700    ///
701    /// ```no_run
702    /// # #[tokio::main]
703    /// # async fn main() -> Result<(), async_nats::Error> {
704    /// let client = async_nats::connect("demo.nats.io").await?;
705    /// client.flush().await?;
706    /// # Ok(())
707    /// # }
708    /// ```
709    pub async fn flush(&self) -> Result<(), FlushError> {
710        let (tx, rx) = tokio::sync::oneshot::channel();
711        self.sender
712            .send(Command::Flush { observer: tx })
713            .await
714            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
715
716        rx.await
717            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
718        Ok(())
719    }
720
721    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
722    /// messages, then closes the connection. Once completed, any associated streams associated with the
723    /// client will be closed, and further client commands will fail
724    ///
725    /// # Examples
726    ///
727    /// ```no_run
728    /// # #[tokio::main]
729    /// # async fn main() -> Result<(), async_nats::Error> {
730    /// use futures_util::StreamExt;
731    /// let client = async_nats::connect("demo.nats.io").await?;
732    /// let mut subscription = client.subscribe("events.>").await?;
733    ///
734    /// client.drain().await?;
735    ///
736    /// # // existing subscriptions are closed and further commands will fail
737    /// assert!(subscription.next().await.is_none());
738    /// client
739    ///     .subscribe("events.>")
740    ///     .await
741    ///     .expect_err("Expected further commands to fail");
742    ///
743    /// # Ok(())
744    /// # }
745    /// ```
746    pub async fn drain(&self) -> Result<(), DrainError> {
747        // Drain all subscriptions
748        self.sender.send(Command::Drain { sid: None }).await?;
749
750        // Remaining process is handled on the handler-side
751        Ok(())
752    }
753
754    /// Returns the current state of the connection.
755    ///
756    /// # Examples
757    /// ```no_run
758    /// # #[tokio::main]
759    /// # async fn main() -> Result<(), async_nats::Error> {
760    /// let client = async_nats::connect("demo.nats.io").await?;
761    /// println!("connection state: {}", client.connection_state());
762    /// # Ok(())
763    /// # }
764    /// ```
765    pub fn connection_state(&self) -> State {
766        self.state.borrow().to_owned()
767    }
768
769    /// Forces the client to reconnect.
770    /// Keep in mind that client will reconnect automatically if the connection is lost and this
771    /// method does not have to be used in normal circumstances.
772    /// However, if you want to force the client to reconnect, for example to re-trigger
773    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
774    /// This method does not wait for connection to be re-established.
775    ///
776    /// # Examples
777    /// ```no_run
778    /// # #[tokio::main]
779    /// # async fn main() -> Result<(), async_nats::Error> {
780    /// let client = async_nats::connect("demo.nats.io").await?;
781    /// client.force_reconnect().await?;
782    /// # Ok(())
783    /// # }
784    /// ```
785    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
786        self.sender
787            .send(Command::Reconnect)
788            .await
789            .map_err(Into::into)
790    }
791
792    /// Returns struct representing statistics of the whole lifecycle of the client.
793    /// This includes number of bytes sent/received, number of messages sent/received,
794    /// and number of times the connection was established.
795    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
796    /// across threads.
797    ///
798    /// # Examples
799    /// ```no_run
800    /// # #[tokio::main]
801    /// # async fn main() -> Result<(), async_nats::Error> {
802    /// let client = async_nats::connect("demo.nats.io").await?;
803    /// let statistics = client.statistics();
804    /// println!("client statistics: {:#?}", statistics);
805    /// # Ok(())
806    /// # }
807    /// ```
808    pub fn statistics(&self) -> Arc<Statistics> {
809        self.connection_stats.clone()
810    }
811}
812
/// Used for building customized requests.
///
/// Every field is optional; fields left as `None` fall back to the behavior
/// described on each field when the request is sent with
/// [`Client::send_request`].
#[derive(Default)]
pub struct Request {
    /// Payload of the request; an empty payload is sent when `None`.
    pub payload: Option<Bytes>,
    /// Headers of the request; no headers are sent when `None`.
    pub headers: Option<HeaderMap>,
    /// Timeout override. `None` uses the client's default timeout,
    /// `Some(None)` disables the timeout and `Some(Some(d))` waits up to `d`.
    pub timeout: Option<Option<Duration>>,
    /// Custom inbox to receive the response on; an inbox is generated when `None`.
    pub inbox: Option<String>,
}
821
822impl Request {
823    pub fn new() -> Request {
824        Default::default()
825    }
826
827    /// Sets the payload of the request. If not used, empty payload will be sent.
828    ///
829    /// # Examples
830    /// ```no_run
831    /// # #[tokio::main]
832    /// # async fn main() -> Result<(), async_nats::Error> {
833    /// let client = async_nats::connect("demo.nats.io").await?;
834    /// let request = async_nats::Request::new().payload("data".into());
835    /// client.send_request("service", request).await?;
836    /// # Ok(())
837    /// # }
838    /// ```
839    pub fn payload(mut self, payload: Bytes) -> Request {
840        self.payload = Some(payload);
841        self
842    }
843
844    /// Sets the headers of the requests.
845    ///
846    /// # Examples
847    /// ```no_run
848    /// # #[tokio::main]
849    /// # async fn main() -> Result<(), async_nats::Error> {
850    /// use std::str::FromStr;
851    /// let client = async_nats::connect("demo.nats.io").await?;
852    /// let mut headers = async_nats::HeaderMap::new();
853    /// headers.insert(
854    ///     "X-Example",
855    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
856    /// );
857    /// let request = async_nats::Request::new()
858    ///     .headers(headers)
859    ///     .payload("data".into());
860    /// client.send_request("service", request).await?;
861    /// # Ok(())
862    /// # }
863    /// ```
864    pub fn headers(mut self, headers: HeaderMap) -> Request {
865        self.headers = Some(headers);
866        self
867    }
868
869    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
870    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
871    /// To use default timeout, simply do not call this function.
872    ///
873    /// # Examples
874    /// ```no_run
875    /// # #[tokio::main]
876    /// # async fn main() -> Result<(), async_nats::Error> {
877    /// let client = async_nats::connect("demo.nats.io").await?;
878    /// let request = async_nats::Request::new()
879    ///     .timeout(Some(std::time::Duration::from_secs(15)))
880    ///     .payload("data".into());
881    /// client.send_request("service", request).await?;
882    /// # Ok(())
883    /// # }
884    /// ```
885    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
886        self.timeout = Some(timeout);
887        self
888    }
889
890    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
891    ///
892    /// # Examples
893    /// ```no_run
894    /// # #[tokio::main]
895    /// # async fn main() -> Result<(), async_nats::Error> {
896    /// use std::str::FromStr;
897    /// let client = async_nats::connect("demo.nats.io").await?;
898    /// let request = async_nats::Request::new()
899    ///     .inbox("custom_inbox".into())
900    ///     .payload("data".into());
901    /// client.send_request("service", request).await?;
902    /// # Ok(())
903    /// # }
904    /// ```
905    pub fn inbox(mut self, inbox: String) -> Request {
906        self.inbox = Some(inbox);
907        self
908    }
909}
910
911#[derive(Error, Debug)]
912#[error("failed to send reconnect: {0}")]
913pub struct ReconnectError(#[source] crate::Error);
914
915impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
916    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
917        ReconnectError(Box::new(err))
918    }
919}
920
921#[derive(Error, Debug)]
922#[error("failed to send subscribe: {0}")]
923pub struct SubscribeError(#[source] crate::Error);
924
925impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
926    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
927        SubscribeError(Box::new(err))
928    }
929}
930
931#[derive(Error, Debug)]
932#[error("failed to send drain: {0}")]
933pub struct DrainError(#[source] crate::Error);
934
935impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
936    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
937        DrainError(Box::new(err))
938    }
939}
940
/// Categories of failure reported by [`RequestError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// There are services listening on requested subject, but they didn't respond
    /// in time.
    TimedOut,
    /// No one is listening on request subject.
    NoResponders,
    /// Other errors, client/io related.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            RequestErrorKind::TimedOut => "request timed out",
            RequestErrorKind::NoResponders => "no responders",
            RequestErrorKind::Other => "request failed",
        };
        f.write_str(description)
    }
}
961
/// Error returned when a core NATS request fails.
/// To enumerate over the variants, call [RequestError::kind].
pub type RequestError = Error<RequestErrorKind>;
965
966impl From<PublishError> for RequestError {
967    fn from(e: PublishError) -> Self {
968        RequestError::with_source(RequestErrorKind::Other, e)
969    }
970}
971
972impl From<SubscribeError> for RequestError {
973    fn from(e: SubscribeError) -> Self {
974        RequestError::with_source(RequestErrorKind::Other, e)
975    }
976}
977
/// Categories of failure reported by [`FlushError`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// Sending the flush failed client side.
    SendError,
    /// Flush failed.
    /// This can happen mostly in case of connection issues
    /// that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            FlushErrorKind::SendError => "failed to send flush request",
            FlushErrorKind::FlushError => "flush failed",
        };
        f.write_str(description)
    }
}
996
/// Error returned by [`Client::flush`]; its category is a [`FlushErrorKind`].
pub type FlushError = Error<FlushErrorKind>;
998
/// Represents statistics for the instance of the client throughout its lifecycle.
#[derive(Default, Debug)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times connection was established.
    /// Initial connect will be counted as well, then all successful reconnects.
    pub connects: AtomicU64,
}