async_nats/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::subject::ToSubject;
20use crate::{PublishMessage, ServerInfo};
21
22use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
23use crate::error::Error;
24use bytes::Bytes;
25use futures::future::TryFutureExt;
26use futures::{Sink, SinkExt as _, StreamExt};
27use once_cell::sync::Lazy;
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::time::Duration;
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot};
36use tokio_util::sync::PollSender;
37use tracing::trace;
38
39static VERSION_RE: Lazy<Regex> =
40    Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
42/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
43/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48        PublishError::with_source(PublishErrorKind::Send, err)
49    }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54        PublishError::with_source(PublishErrorKind::Send, err)
55    }
56}
57
58#[derive(Copy, Clone, Debug, PartialEq)]
59pub enum PublishErrorKind {
60    MaxPayloadExceeded,
61    BadSubject,
62    Send,
63}
64
65impl Display for PublishErrorKind {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        match self {
68            PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
69            PublishErrorKind::Send => write!(f, "failed to send message"),
70            PublishErrorKind::BadSubject => write!(f, "bad subject"),
71        }
72    }
73}
74
75/// Client is a `Cloneable` handle to NATS connection.
76/// Client should not be created directly. Instead, one of two methods can be used:
77/// [crate::connect] and [crate::ConnectOptions::connect]
78#[derive(Clone, Debug)]
79pub struct Client {
80    info: tokio::sync::watch::Receiver<ServerInfo>,
81    pub(crate) state: tokio::sync::watch::Receiver<State>,
82    pub(crate) sender: mpsc::Sender<Command>,
83    poll_sender: PollSender<Command>,
84    next_subscription_id: Arc<AtomicU64>,
85    subscription_capacity: usize,
86    inbox_prefix: Arc<str>,
87    request_timeout: Option<Duration>,
88    max_payload: Arc<AtomicUsize>,
89    connection_stats: Arc<Statistics>,
90}
91
92pub mod traits {
93    use std::{future::Future, time::Duration};
94
95    use bytes::Bytes;
96
97    use crate::{subject::ToSubject, Message};
98
99    use super::{PublishError, Request, RequestError, SubscribeError};
100
101    pub trait Publisher {
102        fn publish_with_reply<S: ToSubject, R: ToSubject>(
103            &self,
104            subject: S,
105            reply: R,
106            payload: Bytes,
107        ) -> impl Future<Output = Result<(), PublishError>>;
108    }
109    pub trait Subscriber {
110        fn subscribe<S: ToSubject>(
111            &self,
112            subject: S,
113        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
114    }
115    pub trait Requester {
116        fn send_request<S: ToSubject>(
117            &self,
118            subject: S,
119            request: Request,
120        ) -> impl Future<Output = Result<Message, RequestError>>;
121    }
122    pub trait TimeoutProvider {
123        fn timeout(&self) -> Option<Duration>;
124    }
125}
126
127impl traits::Requester for Client {
128    fn send_request<S: ToSubject>(
129        &self,
130        subject: S,
131        request: Request,
132    ) -> impl Future<Output = Result<Message, RequestError>> {
133        self.send_request(subject, request)
134    }
135}
136
137impl traits::TimeoutProvider for Client {
138    fn timeout(&self) -> Option<Duration> {
139        self.timeout()
140    }
141}
142
143impl traits::Publisher for Client {
144    fn publish_with_reply<S: ToSubject, R: ToSubject>(
145        &self,
146        subject: S,
147        reply: R,
148        payload: Bytes,
149    ) -> impl Future<Output = Result<(), PublishError>> {
150        self.publish_with_reply(subject, reply, payload)
151    }
152}
153
154impl traits::Subscriber for Client {
155    fn subscribe<S: ToSubject>(
156        &self,
157        subject: S,
158    ) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
159        self.subscribe(subject)
160    }
161}
162
163impl Sink<PublishMessage> for Client {
164    type Error = PublishError;
165
166    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167        self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
168    }
169
170    fn start_send(mut self: Pin<&mut Self>, msg: PublishMessage) -> Result<(), Self::Error> {
171        self.poll_sender
172            .start_send_unpin(Command::Publish(msg))
173            .map_err(Into::into)
174    }
175
176    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
177        self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
178    }
179
180    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181        self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
182    }
183}
184
185impl Client {
186    #[allow(clippy::too_many_arguments)]
187    pub(crate) fn new(
188        info: tokio::sync::watch::Receiver<ServerInfo>,
189        state: tokio::sync::watch::Receiver<State>,
190        sender: mpsc::Sender<Command>,
191        capacity: usize,
192        inbox_prefix: String,
193        request_timeout: Option<Duration>,
194        max_payload: Arc<AtomicUsize>,
195        statistics: Arc<Statistics>,
196    ) -> Client {
197        let poll_sender = PollSender::new(sender.clone());
198        Client {
199            info,
200            state,
201            sender,
202            poll_sender,
203            next_subscription_id: Arc::new(AtomicU64::new(1)),
204            subscription_capacity: capacity,
205            inbox_prefix: inbox_prefix.into(),
206            request_timeout,
207            max_payload,
208            connection_stats: statistics,
209        }
210    }
211
212    /// Returns the default timeout for requests set when creating the client.
213    ///
214    /// # Examples
215    /// ```no_run
216    /// # #[tokio::main]
217    /// # async fn main() -> Result<(), async_nats::Error> {
218    /// let client = async_nats::connect("demo.nats.io").await?;
219    /// println!("default request timeout: {:?}", client.timeout());
220    /// # Ok(())
221    /// # }
222    /// ```
223    pub fn timeout(&self) -> Option<Duration> {
224        self.request_timeout
225    }
226
227    /// Returns last received info from the server.
228    ///
229    /// # Examples
230    ///
231    /// ```no_run
232    /// # #[tokio::main]
233    /// # async fn main () -> Result<(), async_nats::Error> {
234    /// let client = async_nats::connect("demo.nats.io").await?;
235    /// println!("info: {:?}", client.server_info());
236    /// # Ok(())
237    /// # }
238    /// ```
239    pub fn server_info(&self) -> ServerInfo {
240        // We ignore notifying the watcher, as that requires mutable client reference.
241        self.info.borrow().to_owned()
242    }
243
244    /// Returns true if the server version is compatible with the version components.
245    ///
246    /// This has to be used with caution, as it is not guaranteed that the server
247    /// that client is connected to is the same version that the one that is
248    /// a JetStream meta/stream/consumer leader, especially across leafnodes.
249    ///
250    /// # Examples
251    ///
252    /// ```no_run
253    /// # #[tokio::main]
254    /// # async fn main() -> Result<(), async_nats::Error> {
255    /// let client = async_nats::connect("demo.nats.io").await?;
256    /// assert!(client.is_server_compatible(2, 8, 4));
257    /// # Ok(())
258    /// # }
259    /// ```
260    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
261        let info = self.server_info();
262
263        let server_version_captures = match VERSION_RE.captures(&info.version) {
264            Some(captures) => captures,
265            None => return false,
266        };
267
268        let server_major = server_version_captures
269            .get(1)
270            .map(|m| m.as_str().parse::<i64>().unwrap())
271            .unwrap();
272
273        let server_minor = server_version_captures
274            .get(2)
275            .map(|m| m.as_str().parse::<i64>().unwrap())
276            .unwrap();
277
278        let server_patch = server_version_captures
279            .get(3)
280            .map(|m| m.as_str().parse::<i64>().unwrap())
281            .unwrap();
282
283        if server_major < major
284            || (server_major == major && server_minor < minor)
285            || (server_major == major && server_minor == minor && server_patch < patch)
286        {
287            return false;
288        }
289        true
290    }
291
292    /// Publish a [Message] to a given subject.
293    ///
294    /// # Examples
295    /// ```no_run
296    /// # #[tokio::main]
297    /// # async fn main() -> Result<(), async_nats::Error> {
298    /// let client = async_nats::connect("demo.nats.io").await?;
299    /// client.publish("events.data", "payload".into()).await?;
300    /// # Ok(())
301    /// # }
302    /// ```
303    pub async fn publish<S: ToSubject>(
304        &self,
305        subject: S,
306        payload: Bytes,
307    ) -> Result<(), PublishError> {
308        let subject = subject.to_subject();
309        let max_payload = self.max_payload.load(Ordering::Relaxed);
310        if payload.len() > max_payload {
311            return Err(PublishError::with_source(
312                PublishErrorKind::MaxPayloadExceeded,
313                format!(
314                    "Payload size limit of {} exceeded by message size of {}",
315                    max_payload,
316                    payload.len(),
317                ),
318            ));
319        }
320
321        self.sender
322            .send(Command::Publish(PublishMessage {
323                subject,
324                payload,
325                reply: None,
326                headers: None,
327            }))
328            .await?;
329        Ok(())
330    }
331
332    /// Publish a [Message] with headers to a given subject.
333    ///
334    /// # Examples
335    /// ```
336    /// # #[tokio::main]
337    /// # async fn main() -> Result<(), async_nats::Error> {
338    /// use std::str::FromStr;
339    /// let client = async_nats::connect("demo.nats.io").await?;
340    /// let mut headers = async_nats::HeaderMap::new();
341    /// headers.insert(
342    ///     "X-Header",
343    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
344    /// );
345    /// client
346    ///     .publish_with_headers("events.data", headers, "payload".into())
347    ///     .await?;
348    /// # Ok(())
349    /// # }
350    /// ```
351    pub async fn publish_with_headers<S: ToSubject>(
352        &self,
353        subject: S,
354        headers: HeaderMap,
355        payload: Bytes,
356    ) -> Result<(), PublishError> {
357        let subject = subject.to_subject();
358
359        self.sender
360            .send(Command::Publish(PublishMessage {
361                subject,
362                payload,
363                reply: None,
364                headers: Some(headers),
365            }))
366            .await?;
367        Ok(())
368    }
369
370    /// Publish a [Message] to a given subject, with specified response subject
371    /// to which the subscriber can respond.
372    /// This method does not await for the response.
373    ///
374    /// # Examples
375    ///
376    /// ```no_run
377    /// # #[tokio::main]
378    /// # async fn main() -> Result<(), async_nats::Error> {
379    /// let client = async_nats::connect("demo.nats.io").await?;
380    /// client
381    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
382    ///     .await?;
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
387        &self,
388        subject: S,
389        reply: R,
390        payload: Bytes,
391    ) -> Result<(), PublishError> {
392        let subject = subject.to_subject();
393        let reply = reply.to_subject();
394
395        self.sender
396            .send(Command::Publish(PublishMessage {
397                subject,
398                payload,
399                reply: Some(reply),
400                headers: None,
401            }))
402            .await?;
403        Ok(())
404    }
405
406    /// Publish a [Message] to a given subject with headers and specified response subject
407    /// to which the subscriber can respond.
408    /// This method does not await for the response.
409    ///
410    /// # Examples
411    ///
412    /// ```no_run
413    /// # #[tokio::main]
414    /// # async fn main() -> Result<(), async_nats::Error> {
415    /// use std::str::FromStr;
416    /// let client = async_nats::connect("demo.nats.io").await?;
417    /// let mut headers = async_nats::HeaderMap::new();
418    /// client
419    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
420    ///     .await?;
421    /// # Ok(())
422    /// # }
423    /// ```
424    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
425        &self,
426        subject: S,
427        reply: R,
428        headers: HeaderMap,
429        payload: Bytes,
430    ) -> Result<(), PublishError> {
431        let subject = subject.to_subject();
432        let reply = reply.to_subject();
433
434        self.sender
435            .send(Command::Publish(PublishMessage {
436                subject,
437                payload,
438                reply: Some(reply),
439                headers: Some(headers),
440            }))
441            .await?;
442        Ok(())
443    }
444
445    /// Sends the request with headers.
446    ///
447    /// # Examples
448    /// ```no_run
449    /// # #[tokio::main]
450    /// # async fn main() -> Result<(), async_nats::Error> {
451    /// let client = async_nats::connect("demo.nats.io").await?;
452    /// let response = client.request("service", "data".into()).await?;
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub async fn request<S: ToSubject>(
457        &self,
458        subject: S,
459        payload: Bytes,
460    ) -> Result<Message, RequestError> {
461        let subject = subject.to_subject();
462
463        trace!(
464            "request sent to subject: {} ({})",
465            subject.as_ref(),
466            payload.len()
467        );
468        let request = Request::new().payload(payload);
469        self.send_request(subject, request).await
470    }
471
472    /// Sends the request with headers.
473    ///
474    /// # Examples
475    /// ```no_run
476    /// # #[tokio::main]
477    /// # async fn main() -> Result<(), async_nats::Error> {
478    /// let client = async_nats::connect("demo.nats.io").await?;
479    /// let mut headers = async_nats::HeaderMap::new();
480    /// headers.insert("Key", "Value");
481    /// let response = client
482    ///     .request_with_headers("service", headers, "data".into())
483    ///     .await?;
484    /// # Ok(())
485    /// # }
486    /// ```
487    pub async fn request_with_headers<S: ToSubject>(
488        &self,
489        subject: S,
490        headers: HeaderMap,
491        payload: Bytes,
492    ) -> Result<Message, RequestError> {
493        let subject = subject.to_subject();
494
495        let request = Request::new().headers(headers).payload(payload);
496        self.send_request(subject, request).await
497    }
498
499    /// Sends the request created by the [Request].
500    ///
501    /// # Examples
502    ///
503    /// ```no_run
504    /// # #[tokio::main]
505    /// # async fn main() -> Result<(), async_nats::Error> {
506    /// let client = async_nats::connect("demo.nats.io").await?;
507    /// let request = async_nats::Request::new().payload("data".into());
508    /// let response = client.send_request("service", request).await?;
509    /// # Ok(())
510    /// # }
511    /// ```
512    pub async fn send_request<S: ToSubject>(
513        &self,
514        subject: S,
515        request: Request,
516    ) -> Result<Message, RequestError> {
517        let subject = subject.to_subject();
518
519        if let Some(inbox) = request.inbox {
520            let timeout = request.timeout.unwrap_or(self.request_timeout);
521            let mut subscriber = self.subscribe(inbox.clone()).await?;
522            let payload: Bytes = request.payload.unwrap_or_default();
523            match request.headers {
524                Some(headers) => {
525                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
526                        .await?
527                }
528                None => self.publish_with_reply(subject, inbox, payload).await?,
529            }
530            let request = match timeout {
531                Some(timeout) => {
532                    tokio::time::timeout(timeout, subscriber.next())
533                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
534                        .await?
535                }
536                None => subscriber.next().await,
537            };
538            match request {
539                Some(message) => {
540                    if message.status == Some(StatusCode::NO_RESPONDERS) {
541                        return Err(RequestError::with_source(
542                            RequestErrorKind::NoResponders,
543                            "no responders",
544                        ));
545                    }
546                    Ok(message)
547                }
548                None => Err(RequestError::with_source(
549                    RequestErrorKind::Other,
550                    "broken pipe",
551                )),
552            }
553        } else {
554            let (sender, receiver) = oneshot::channel();
555
556            let payload = request.payload.unwrap_or_default();
557            let respond = self.new_inbox().into();
558            let headers = request.headers;
559
560            self.sender
561                .send(Command::Request {
562                    subject,
563                    payload,
564                    respond,
565                    headers,
566                    sender,
567                })
568                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
569                .await?;
570
571            let timeout = request.timeout.unwrap_or(self.request_timeout);
572            let request = match timeout {
573                Some(timeout) => {
574                    tokio::time::timeout(timeout, receiver)
575                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
576                        .await?
577                }
578                None => receiver.await,
579            };
580
581            match request {
582                Ok(message) => {
583                    if message.status == Some(StatusCode::NO_RESPONDERS) {
584                        return Err(RequestError::with_source(
585                            RequestErrorKind::NoResponders,
586                            "no responders",
587                        ));
588                    }
589                    Ok(message)
590                }
591                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
592            }
593        }
594    }
595
596    /// Create a new globally unique inbox which can be used for replies.
597    ///
598    /// # Examples
599    ///
600    /// ```no_run
601    /// # #[tokio::main]
602    /// # async fn main() -> Result<(), async_nats::Error> {
603    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
604    /// let reply = nc.new_inbox();
605    /// let rsub = nc.subscribe(reply).await?;
606    /// # Ok(())
607    /// # }
608    /// ```
609    pub fn new_inbox(&self) -> String {
610        format!("{}.{}", self.inbox_prefix, nuid::next())
611    }
612
613    /// Subscribes to a subject to receive [messages][Message].
614    ///
615    /// # Examples
616    ///
617    /// ```no_run
618    /// # #[tokio::main]
619    /// # async fn main() -> Result<(), async_nats::Error> {
620    /// use futures::StreamExt;
621    /// let client = async_nats::connect("demo.nats.io").await?;
622    /// let mut subscription = client.subscribe("events.>").await?;
623    /// while let Some(message) = subscription.next().await {
624    ///     println!("received message: {:?}", message);
625    /// }
626    /// # Ok(())
627    /// # }
628    /// ```
629    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
630        let subject = subject.to_subject();
631        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
632        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
633
634        self.sender
635            .send(Command::Subscribe {
636                sid,
637                subject,
638                queue_group: None,
639                sender,
640            })
641            .await?;
642
643        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
644    }
645
646    /// Subscribes to a subject with a queue group to receive [messages][Message].
647    ///
648    /// # Examples
649    ///
650    /// ```no_run
651    /// # #[tokio::main]
652    /// # async fn main() -> Result<(), async_nats::Error> {
653    /// use futures::StreamExt;
654    /// let client = async_nats::connect("demo.nats.io").await?;
655    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
656    /// while let Some(message) = subscription.next().await {
657    ///     println!("received message: {:?}", message);
658    /// }
659    /// # Ok(())
660    /// # }
661    /// ```
662    pub async fn queue_subscribe<S: ToSubject>(
663        &self,
664        subject: S,
665        queue_group: String,
666    ) -> Result<Subscriber, SubscribeError> {
667        let subject = subject.to_subject();
668
669        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
670        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
671
672        self.sender
673            .send(Command::Subscribe {
674                sid,
675                subject,
676                queue_group: Some(queue_group),
677                sender,
678            })
679            .await?;
680
681        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
682    }
683
684    /// Flushes the internal buffer ensuring that all messages are sent.
685    ///
686    /// # Examples
687    ///
688    /// ```no_run
689    /// # #[tokio::main]
690    /// # async fn main() -> Result<(), async_nats::Error> {
691    /// let client = async_nats::connect("demo.nats.io").await?;
692    /// client.flush().await?;
693    /// # Ok(())
694    /// # }
695    /// ```
696    pub async fn flush(&self) -> Result<(), FlushError> {
697        let (tx, rx) = tokio::sync::oneshot::channel();
698        self.sender
699            .send(Command::Flush { observer: tx })
700            .await
701            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
702
703        rx.await
704            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
705        Ok(())
706    }
707
708    /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
709    /// messages, then closes the connection. Once completed, any associated streams associated with the
710    /// client will be closed, and further client commands will fail
711    ///
712    /// # Examples
713    ///
714    /// ```no_run
715    /// # #[tokio::main]
716    /// # async fn main() -> Result<(), async_nats::Error> {
717    /// use futures::StreamExt;
718    /// let client = async_nats::connect("demo.nats.io").await?;
719    /// let mut subscription = client.subscribe("events.>").await?;
720    ///
721    /// client.drain().await?;
722    ///
723    /// # // existing subscriptions are closed and further commands will fail
724    /// assert!(subscription.next().await.is_none());
725    /// client
726    ///     .subscribe("events.>")
727    ///     .await
728    ///     .expect_err("Expected further commands to fail");
729    ///
730    /// # Ok(())
731    /// # }
732    /// ```
733    pub async fn drain(&self) -> Result<(), DrainError> {
734        // Drain all subscriptions
735        self.sender.send(Command::Drain { sid: None }).await?;
736
737        // Remaining process is handled on the handler-side
738        Ok(())
739    }
740
741    /// Returns the current state of the connection.
742    ///
743    /// # Examples
744    /// ```no_run
745    /// # #[tokio::main]
746    /// # async fn main() -> Result<(), async_nats::Error> {
747    /// let client = async_nats::connect("demo.nats.io").await?;
748    /// println!("connection state: {}", client.connection_state());
749    /// # Ok(())
750    /// # }
751    /// ```
752    pub fn connection_state(&self) -> State {
753        self.state.borrow().to_owned()
754    }
755
756    /// Forces the client to reconnect.
757    /// Keep in mind that client will reconnect automatically if the connection is lost and this
758    /// method does not have to be used in normal circumstances.
759    /// However, if you want to force the client to reconnect, for example to re-trigger
760    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
761    /// This method does not wait for connection to be re-established.
762    ///
763    /// # Examples
764    /// ```no_run
765    /// # #[tokio::main]
766    /// # async fn main() -> Result<(), async_nats::Error> {
767    /// let client = async_nats::connect("demo.nats.io").await?;
768    /// client.force_reconnect().await?;
769    /// # Ok(())
770    /// # }
771    /// ```
772    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
773        self.sender
774            .send(Command::Reconnect)
775            .await
776            .map_err(Into::into)
777    }
778
779    /// Returns struct representing statistics of the whole lifecycle of the client.
780    /// This includes number of bytes sent/received, number of messages sent/received,
781    /// and number of times the connection was established.
782    /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
783    /// across threads.
784    ///
785    /// # Examples
786    /// ```no_run
787    /// # #[tokio::main]
788    /// # async fn main() -> Result<(), async_nats::Error> {
789    /// let client = async_nats::connect("demo.nats.io").await?;
790    /// let statistics = client.statistics();
791    /// println!("client statistics: {:#?}", statistics);
792    /// # Ok(())
793    /// # }
794    /// ```
795    pub fn statistics(&self) -> Arc<Statistics> {
796        self.connection_stats.clone()
797    }
798}
799
800/// Used for building customized requests.
801#[derive(Default)]
802pub struct Request {
803    payload: Option<Bytes>,
804    headers: Option<HeaderMap>,
805    timeout: Option<Option<Duration>>,
806    inbox: Option<String>,
807}
808
809impl Request {
810    pub fn new() -> Request {
811        Default::default()
812    }
813
814    /// Sets the payload of the request. If not used, empty payload will be sent.
815    ///
816    /// # Examples
817    /// ```no_run
818    /// # #[tokio::main]
819    /// # async fn main() -> Result<(), async_nats::Error> {
820    /// let client = async_nats::connect("demo.nats.io").await?;
821    /// let request = async_nats::Request::new().payload("data".into());
822    /// client.send_request("service", request).await?;
823    /// # Ok(())
824    /// # }
825    /// ```
826    pub fn payload(mut self, payload: Bytes) -> Request {
827        self.payload = Some(payload);
828        self
829    }
830
831    /// Sets the headers of the requests.
832    ///
833    /// # Examples
834    /// ```no_run
835    /// # #[tokio::main]
836    /// # async fn main() -> Result<(), async_nats::Error> {
837    /// use std::str::FromStr;
838    /// let client = async_nats::connect("demo.nats.io").await?;
839    /// let mut headers = async_nats::HeaderMap::new();
840    /// headers.insert(
841    ///     "X-Example",
842    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
843    /// );
844    /// let request = async_nats::Request::new()
845    ///     .headers(headers)
846    ///     .payload("data".into());
847    /// client.send_request("service", request).await?;
848    /// # Ok(())
849    /// # }
850    /// ```
851    pub fn headers(mut self, headers: HeaderMap) -> Request {
852        self.headers = Some(headers);
853        self
854    }
855
856    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
857    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
858    /// To use default timeout, simply do not call this function.
859    ///
860    /// # Examples
861    /// ```no_run
862    /// # #[tokio::main]
863    /// # async fn main() -> Result<(), async_nats::Error> {
864    /// let client = async_nats::connect("demo.nats.io").await?;
865    /// let request = async_nats::Request::new()
866    ///     .timeout(Some(std::time::Duration::from_secs(15)))
867    ///     .payload("data".into());
868    /// client.send_request("service", request).await?;
869    /// # Ok(())
870    /// # }
871    /// ```
872    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
873        self.timeout = Some(timeout);
874        self
875    }
876
877    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
878    ///
879    /// # Examples
880    /// ```no_run
881    /// # #[tokio::main]
882    /// # async fn main() -> Result<(), async_nats::Error> {
883    /// use std::str::FromStr;
884    /// let client = async_nats::connect("demo.nats.io").await?;
885    /// let request = async_nats::Request::new()
886    ///     .inbox("custom_inbox".into())
887    ///     .payload("data".into());
888    /// client.send_request("service", request).await?;
889    /// # Ok(())
890    /// # }
891    /// ```
892    pub fn inbox(mut self, inbox: String) -> Request {
893        self.inbox = Some(inbox);
894        self
895    }
896}
897
898#[derive(Error, Debug)]
899#[error("failed to send reconnect: {0}")]
900pub struct ReconnectError(#[source] crate::Error);
901
902impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
903    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
904        ReconnectError(Box::new(err))
905    }
906}
907
908#[derive(Error, Debug)]
909#[error("failed to send subscribe: {0}")]
910pub struct SubscribeError(#[source] crate::Error);
911
912impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
913    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
914        SubscribeError(Box::new(err))
915    }
916}
917
918#[derive(Error, Debug)]
919#[error("failed to send drain: {0}")]
920pub struct DrainError(#[source] crate::Error);
921
922impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
923    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
924        DrainError(Box::new(err))
925    }
926}
927
928#[derive(Clone, Copy, Debug, PartialEq)]
929pub enum RequestErrorKind {
930    /// There are services listening on requested subject, but they didn't respond
931    /// in time.
932    TimedOut,
933    /// No one is listening on request subject.
934    NoResponders,
935    /// Other errors, client/io related.
936    Other,
937}
938
939impl Display for RequestErrorKind {
940    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
941        match self {
942            Self::TimedOut => write!(f, "request timed out"),
943            Self::NoResponders => write!(f, "no responders"),
944            Self::Other => write!(f, "request failed"),
945        }
946    }
947}
948
949/// Error returned when a core NATS request fails.
950/// To be enumerate over the variants, call [RequestError::kind].
951pub type RequestError = Error<RequestErrorKind>;
952
953impl From<PublishError> for RequestError {
954    fn from(e: PublishError) -> Self {
955        RequestError::with_source(RequestErrorKind::Other, e)
956    }
957}
958
959impl From<SubscribeError> for RequestError {
960    fn from(e: SubscribeError) -> Self {
961        RequestError::with_source(RequestErrorKind::Other, e)
962    }
963}
964
965#[derive(Clone, Copy, Debug, PartialEq)]
966pub enum FlushErrorKind {
967    /// Sending the flush failed client side.
968    SendError,
969    /// Flush failed.
970    /// This can happen mostly in case of connection issues
971    /// that cannot be resolved quickly.
972    FlushError,
973}
974
975impl Display for FlushErrorKind {
976    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
977        match self {
978            Self::SendError => write!(f, "failed to send flush request"),
979            Self::FlushError => write!(f, "flush failed"),
980        }
981    }
982}
983
984pub type FlushError = Error<FlushErrorKind>;
985
986/// Represents statistics for the instance of the client throughout its lifecycle.
987#[derive(Default, Debug)]
988pub struct Statistics {
989    /// Number of bytes received. This does not include the protocol overhead.
990    pub in_bytes: AtomicU64,
991    /// Number of bytes sent. This doe not include the protocol overhead.
992    pub out_bytes: AtomicU64,
993    /// Number of messages received.
994    pub in_messages: AtomicU64,
995    /// Number of messages sent.
996    pub out_messages: AtomicU64,
997    /// Number of times connection was established.
998    /// Initial connect will be counted as well, then all successful reconnects.
999    pub connects: AtomicU64,
1000}