async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::subject::ToSubject;
20use crate::{PublishMessage, ServerInfo};
21
22use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
23use crate::error::Error;
24use bytes::Bytes;
25use futures::future::TryFutureExt;
26use futures::{Sink, SinkExt as _, StreamExt};
27use once_cell::sync::Lazy;
28use portable_atomic::AtomicU64;
29use regex::Regex;
30use std::fmt::Display;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::sync::Arc;
33use std::time::Duration;
34use thiserror::Error;
35use tokio::sync::{mpsc, oneshot};
36use tokio_util::sync::PollSender;
37use tracing::trace;
38
39static VERSION_RE: Lazy<Regex> =
40 Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
41
42/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
43/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48 PublishError::with_source(PublishErrorKind::Send, err)
49 }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54 PublishError::with_source(PublishErrorKind::Send, err)
55 }
56}
57
58#[derive(Copy, Clone, Debug, PartialEq)]
59pub enum PublishErrorKind {
60 MaxPayloadExceeded,
61 BadSubject,
62 Send,
63}
64
65impl Display for PublishErrorKind {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match self {
68 PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
69 PublishErrorKind::Send => write!(f, "failed to send message"),
70 PublishErrorKind::BadSubject => write!(f, "bad subject"),
71 }
72 }
73}
74
75/// Client is a `Cloneable` handle to NATS connection.
76/// Client should not be created directly. Instead, one of two methods can be used:
77/// [crate::connect] and [crate::ConnectOptions::connect]
78#[derive(Clone, Debug)]
79pub struct Client {
80 info: tokio::sync::watch::Receiver<ServerInfo>,
81 pub(crate) state: tokio::sync::watch::Receiver<State>,
82 pub(crate) sender: mpsc::Sender<Command>,
83 poll_sender: PollSender<Command>,
84 next_subscription_id: Arc<AtomicU64>,
85 subscription_capacity: usize,
86 inbox_prefix: Arc<str>,
87 request_timeout: Option<Duration>,
88 max_payload: Arc<AtomicUsize>,
89 connection_stats: Arc<Statistics>,
90}
91
92pub mod traits {
93 use std::{future::Future, time::Duration};
94
95 use bytes::Bytes;
96
97 use crate::{subject::ToSubject, Message};
98
99 use super::{PublishError, Request, RequestError, SubscribeError};
100
101 pub trait Publisher {
102 fn publish_with_reply<S: ToSubject, R: ToSubject>(
103 &self,
104 subject: S,
105 reply: R,
106 payload: Bytes,
107 ) -> impl Future<Output = Result<(), PublishError>>;
108 }
109 pub trait Subscriber {
110 fn subscribe<S: ToSubject>(
111 &self,
112 subject: S,
113 ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
114 }
115 pub trait Requester {
116 fn send_request<S: ToSubject>(
117 &self,
118 subject: S,
119 request: Request,
120 ) -> impl Future<Output = Result<Message, RequestError>>;
121 }
122 pub trait TimeoutProvider {
123 fn timeout(&self) -> Option<Duration>;
124 }
125}
126
127impl traits::Requester for Client {
128 fn send_request<S: ToSubject>(
129 &self,
130 subject: S,
131 request: Request,
132 ) -> impl Future<Output = Result<Message, RequestError>> {
133 self.send_request(subject, request)
134 }
135}
136
137impl traits::TimeoutProvider for Client {
138 fn timeout(&self) -> Option<Duration> {
139 self.timeout()
140 }
141}
142
143impl traits::Publisher for Client {
144 fn publish_with_reply<S: ToSubject, R: ToSubject>(
145 &self,
146 subject: S,
147 reply: R,
148 payload: Bytes,
149 ) -> impl Future<Output = Result<(), PublishError>> {
150 self.publish_with_reply(subject, reply, payload)
151 }
152}
153
154impl traits::Subscriber for Client {
155 fn subscribe<S: ToSubject>(
156 &self,
157 subject: S,
158 ) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
159 self.subscribe(subject)
160 }
161}
162
163impl Sink<PublishMessage> for Client {
164 type Error = PublishError;
165
166 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
168 }
169
170 fn start_send(mut self: Pin<&mut Self>, msg: PublishMessage) -> Result<(), Self::Error> {
171 self.poll_sender
172 .start_send_unpin(Command::Publish(msg))
173 .map_err(Into::into)
174 }
175
176 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
177 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
178 }
179
180 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
182 }
183}
184
185impl Client {
186 #[allow(clippy::too_many_arguments)]
187 pub(crate) fn new(
188 info: tokio::sync::watch::Receiver<ServerInfo>,
189 state: tokio::sync::watch::Receiver<State>,
190 sender: mpsc::Sender<Command>,
191 capacity: usize,
192 inbox_prefix: String,
193 request_timeout: Option<Duration>,
194 max_payload: Arc<AtomicUsize>,
195 statistics: Arc<Statistics>,
196 ) -> Client {
197 let poll_sender = PollSender::new(sender.clone());
198 Client {
199 info,
200 state,
201 sender,
202 poll_sender,
203 next_subscription_id: Arc::new(AtomicU64::new(1)),
204 subscription_capacity: capacity,
205 inbox_prefix: inbox_prefix.into(),
206 request_timeout,
207 max_payload,
208 connection_stats: statistics,
209 }
210 }
211
212 /// Returns the default timeout for requests set when creating the client.
213 ///
214 /// # Examples
215 /// ```no_run
216 /// # #[tokio::main]
217 /// # async fn main() -> Result<(), async_nats::Error> {
218 /// let client = async_nats::connect("demo.nats.io").await?;
219 /// println!("default request timeout: {:?}", client.timeout());
220 /// # Ok(())
221 /// # }
222 /// ```
223 pub fn timeout(&self) -> Option<Duration> {
224 self.request_timeout
225 }
226
227 /// Returns last received info from the server.
228 ///
229 /// # Examples
230 ///
231 /// ```no_run
232 /// # #[tokio::main]
233 /// # async fn main () -> Result<(), async_nats::Error> {
234 /// let client = async_nats::connect("demo.nats.io").await?;
235 /// println!("info: {:?}", client.server_info());
236 /// # Ok(())
237 /// # }
238 /// ```
239 pub fn server_info(&self) -> ServerInfo {
240 // We ignore notifying the watcher, as that requires mutable client reference.
241 self.info.borrow().to_owned()
242 }
243
244 /// Returns true if the server version is compatible with the version components.
245 ///
246 /// This has to be used with caution, as it is not guaranteed that the server
247 /// that client is connected to is the same version that the one that is
248 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
249 ///
250 /// # Examples
251 ///
252 /// ```no_run
253 /// # #[tokio::main]
254 /// # async fn main() -> Result<(), async_nats::Error> {
255 /// let client = async_nats::connect("demo.nats.io").await?;
256 /// assert!(client.is_server_compatible(2, 8, 4));
257 /// # Ok(())
258 /// # }
259 /// ```
260 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
261 let info = self.server_info();
262
263 let server_version_captures = match VERSION_RE.captures(&info.version) {
264 Some(captures) => captures,
265 None => return false,
266 };
267
268 let server_major = server_version_captures
269 .get(1)
270 .map(|m| m.as_str().parse::<i64>().unwrap())
271 .unwrap();
272
273 let server_minor = server_version_captures
274 .get(2)
275 .map(|m| m.as_str().parse::<i64>().unwrap())
276 .unwrap();
277
278 let server_patch = server_version_captures
279 .get(3)
280 .map(|m| m.as_str().parse::<i64>().unwrap())
281 .unwrap();
282
283 if server_major < major
284 || (server_major == major && server_minor < minor)
285 || (server_major == major && server_minor == minor && server_patch < patch)
286 {
287 return false;
288 }
289 true
290 }
291
292 /// Publish a [Message] to a given subject.
293 ///
294 /// # Examples
295 /// ```no_run
296 /// # #[tokio::main]
297 /// # async fn main() -> Result<(), async_nats::Error> {
298 /// let client = async_nats::connect("demo.nats.io").await?;
299 /// client.publish("events.data", "payload".into()).await?;
300 /// # Ok(())
301 /// # }
302 /// ```
303 pub async fn publish<S: ToSubject>(
304 &self,
305 subject: S,
306 payload: Bytes,
307 ) -> Result<(), PublishError> {
308 let subject = subject.to_subject();
309 let max_payload = self.max_payload.load(Ordering::Relaxed);
310 if payload.len() > max_payload {
311 return Err(PublishError::with_source(
312 PublishErrorKind::MaxPayloadExceeded,
313 format!(
314 "Payload size limit of {} exceeded by message size of {}",
315 max_payload,
316 payload.len(),
317 ),
318 ));
319 }
320
321 self.sender
322 .send(Command::Publish(PublishMessage {
323 subject,
324 payload,
325 reply: None,
326 headers: None,
327 }))
328 .await?;
329 Ok(())
330 }
331
332 /// Publish a [Message] with headers to a given subject.
333 ///
334 /// # Examples
335 /// ```
336 /// # #[tokio::main]
337 /// # async fn main() -> Result<(), async_nats::Error> {
338 /// use std::str::FromStr;
339 /// let client = async_nats::connect("demo.nats.io").await?;
340 /// let mut headers = async_nats::HeaderMap::new();
341 /// headers.insert(
342 /// "X-Header",
343 /// async_nats::HeaderValue::from_str("Value").unwrap(),
344 /// );
345 /// client
346 /// .publish_with_headers("events.data", headers, "payload".into())
347 /// .await?;
348 /// # Ok(())
349 /// # }
350 /// ```
351 pub async fn publish_with_headers<S: ToSubject>(
352 &self,
353 subject: S,
354 headers: HeaderMap,
355 payload: Bytes,
356 ) -> Result<(), PublishError> {
357 let subject = subject.to_subject();
358
359 self.sender
360 .send(Command::Publish(PublishMessage {
361 subject,
362 payload,
363 reply: None,
364 headers: Some(headers),
365 }))
366 .await?;
367 Ok(())
368 }
369
370 /// Publish a [Message] to a given subject, with specified response subject
371 /// to which the subscriber can respond.
372 /// This method does not await for the response.
373 ///
374 /// # Examples
375 ///
376 /// ```no_run
377 /// # #[tokio::main]
378 /// # async fn main() -> Result<(), async_nats::Error> {
379 /// let client = async_nats::connect("demo.nats.io").await?;
380 /// client
381 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
382 /// .await?;
383 /// # Ok(())
384 /// # }
385 /// ```
386 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
387 &self,
388 subject: S,
389 reply: R,
390 payload: Bytes,
391 ) -> Result<(), PublishError> {
392 let subject = subject.to_subject();
393 let reply = reply.to_subject();
394
395 self.sender
396 .send(Command::Publish(PublishMessage {
397 subject,
398 payload,
399 reply: Some(reply),
400 headers: None,
401 }))
402 .await?;
403 Ok(())
404 }
405
406 /// Publish a [Message] to a given subject with headers and specified response subject
407 /// to which the subscriber can respond.
408 /// This method does not await for the response.
409 ///
410 /// # Examples
411 ///
412 /// ```no_run
413 /// # #[tokio::main]
414 /// # async fn main() -> Result<(), async_nats::Error> {
415 /// use std::str::FromStr;
416 /// let client = async_nats::connect("demo.nats.io").await?;
417 /// let mut headers = async_nats::HeaderMap::new();
418 /// client
419 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
420 /// .await?;
421 /// # Ok(())
422 /// # }
423 /// ```
424 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
425 &self,
426 subject: S,
427 reply: R,
428 headers: HeaderMap,
429 payload: Bytes,
430 ) -> Result<(), PublishError> {
431 let subject = subject.to_subject();
432 let reply = reply.to_subject();
433
434 self.sender
435 .send(Command::Publish(PublishMessage {
436 subject,
437 payload,
438 reply: Some(reply),
439 headers: Some(headers),
440 }))
441 .await?;
442 Ok(())
443 }
444
445 /// Sends the request with headers.
446 ///
447 /// # Examples
448 /// ```no_run
449 /// # #[tokio::main]
450 /// # async fn main() -> Result<(), async_nats::Error> {
451 /// let client = async_nats::connect("demo.nats.io").await?;
452 /// let response = client.request("service", "data".into()).await?;
453 /// # Ok(())
454 /// # }
455 /// ```
456 pub async fn request<S: ToSubject>(
457 &self,
458 subject: S,
459 payload: Bytes,
460 ) -> Result<Message, RequestError> {
461 let subject = subject.to_subject();
462
463 trace!(
464 "request sent to subject: {} ({})",
465 subject.as_ref(),
466 payload.len()
467 );
468 let request = Request::new().payload(payload);
469 self.send_request(subject, request).await
470 }
471
472 /// Sends the request with headers.
473 ///
474 /// # Examples
475 /// ```no_run
476 /// # #[tokio::main]
477 /// # async fn main() -> Result<(), async_nats::Error> {
478 /// let client = async_nats::connect("demo.nats.io").await?;
479 /// let mut headers = async_nats::HeaderMap::new();
480 /// headers.insert("Key", "Value");
481 /// let response = client
482 /// .request_with_headers("service", headers, "data".into())
483 /// .await?;
484 /// # Ok(())
485 /// # }
486 /// ```
487 pub async fn request_with_headers<S: ToSubject>(
488 &self,
489 subject: S,
490 headers: HeaderMap,
491 payload: Bytes,
492 ) -> Result<Message, RequestError> {
493 let subject = subject.to_subject();
494
495 let request = Request::new().headers(headers).payload(payload);
496 self.send_request(subject, request).await
497 }
498
499 /// Sends the request created by the [Request].
500 ///
501 /// # Examples
502 ///
503 /// ```no_run
504 /// # #[tokio::main]
505 /// # async fn main() -> Result<(), async_nats::Error> {
506 /// let client = async_nats::connect("demo.nats.io").await?;
507 /// let request = async_nats::Request::new().payload("data".into());
508 /// let response = client.send_request("service", request).await?;
509 /// # Ok(())
510 /// # }
511 /// ```
512 pub async fn send_request<S: ToSubject>(
513 &self,
514 subject: S,
515 request: Request,
516 ) -> Result<Message, RequestError> {
517 let subject = subject.to_subject();
518
519 if let Some(inbox) = request.inbox {
520 let timeout = request.timeout.unwrap_or(self.request_timeout);
521 let mut subscriber = self.subscribe(inbox.clone()).await?;
522 let payload: Bytes = request.payload.unwrap_or_default();
523 match request.headers {
524 Some(headers) => {
525 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
526 .await?
527 }
528 None => self.publish_with_reply(subject, inbox, payload).await?,
529 }
530 let request = match timeout {
531 Some(timeout) => {
532 tokio::time::timeout(timeout, subscriber.next())
533 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
534 .await?
535 }
536 None => subscriber.next().await,
537 };
538 match request {
539 Some(message) => {
540 if message.status == Some(StatusCode::NO_RESPONDERS) {
541 return Err(RequestError::with_source(
542 RequestErrorKind::NoResponders,
543 "no responders",
544 ));
545 }
546 Ok(message)
547 }
548 None => Err(RequestError::with_source(
549 RequestErrorKind::Other,
550 "broken pipe",
551 )),
552 }
553 } else {
554 let (sender, receiver) = oneshot::channel();
555
556 let payload = request.payload.unwrap_or_default();
557 let respond = self.new_inbox().into();
558 let headers = request.headers;
559
560 self.sender
561 .send(Command::Request {
562 subject,
563 payload,
564 respond,
565 headers,
566 sender,
567 })
568 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
569 .await?;
570
571 let timeout = request.timeout.unwrap_or(self.request_timeout);
572 let request = match timeout {
573 Some(timeout) => {
574 tokio::time::timeout(timeout, receiver)
575 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
576 .await?
577 }
578 None => receiver.await,
579 };
580
581 match request {
582 Ok(message) => {
583 if message.status == Some(StatusCode::NO_RESPONDERS) {
584 return Err(RequestError::with_source(
585 RequestErrorKind::NoResponders,
586 "no responders",
587 ));
588 }
589 Ok(message)
590 }
591 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
592 }
593 }
594 }
595
596 /// Create a new globally unique inbox which can be used for replies.
597 ///
598 /// # Examples
599 ///
600 /// ```no_run
601 /// # #[tokio::main]
602 /// # async fn main() -> Result<(), async_nats::Error> {
603 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
604 /// let reply = nc.new_inbox();
605 /// let rsub = nc.subscribe(reply).await?;
606 /// # Ok(())
607 /// # }
608 /// ```
609 pub fn new_inbox(&self) -> String {
610 format!("{}.{}", self.inbox_prefix, nuid::next())
611 }
612
613 /// Subscribes to a subject to receive [messages][Message].
614 ///
615 /// # Examples
616 ///
617 /// ```no_run
618 /// # #[tokio::main]
619 /// # async fn main() -> Result<(), async_nats::Error> {
620 /// use futures::StreamExt;
621 /// let client = async_nats::connect("demo.nats.io").await?;
622 /// let mut subscription = client.subscribe("events.>").await?;
623 /// while let Some(message) = subscription.next().await {
624 /// println!("received message: {:?}", message);
625 /// }
626 /// # Ok(())
627 /// # }
628 /// ```
629 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
630 let subject = subject.to_subject();
631 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
632 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
633
634 self.sender
635 .send(Command::Subscribe {
636 sid,
637 subject,
638 queue_group: None,
639 sender,
640 })
641 .await?;
642
643 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
644 }
645
646 /// Subscribes to a subject with a queue group to receive [messages][Message].
647 ///
648 /// # Examples
649 ///
650 /// ```no_run
651 /// # #[tokio::main]
652 /// # async fn main() -> Result<(), async_nats::Error> {
653 /// use futures::StreamExt;
654 /// let client = async_nats::connect("demo.nats.io").await?;
655 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
656 /// while let Some(message) = subscription.next().await {
657 /// println!("received message: {:?}", message);
658 /// }
659 /// # Ok(())
660 /// # }
661 /// ```
662 pub async fn queue_subscribe<S: ToSubject>(
663 &self,
664 subject: S,
665 queue_group: String,
666 ) -> Result<Subscriber, SubscribeError> {
667 let subject = subject.to_subject();
668
669 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
670 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
671
672 self.sender
673 .send(Command::Subscribe {
674 sid,
675 subject,
676 queue_group: Some(queue_group),
677 sender,
678 })
679 .await?;
680
681 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
682 }
683
684 /// Flushes the internal buffer ensuring that all messages are sent.
685 ///
686 /// # Examples
687 ///
688 /// ```no_run
689 /// # #[tokio::main]
690 /// # async fn main() -> Result<(), async_nats::Error> {
691 /// let client = async_nats::connect("demo.nats.io").await?;
692 /// client.flush().await?;
693 /// # Ok(())
694 /// # }
695 /// ```
696 pub async fn flush(&self) -> Result<(), FlushError> {
697 let (tx, rx) = tokio::sync::oneshot::channel();
698 self.sender
699 .send(Command::Flush { observer: tx })
700 .await
701 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
702
703 rx.await
704 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
705 Ok(())
706 }
707
708 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
709 /// messages, then closes the connection. Once completed, any associated streams associated with the
710 /// client will be closed, and further client commands will fail
711 ///
712 /// # Examples
713 ///
714 /// ```no_run
715 /// # #[tokio::main]
716 /// # async fn main() -> Result<(), async_nats::Error> {
717 /// use futures::StreamExt;
718 /// let client = async_nats::connect("demo.nats.io").await?;
719 /// let mut subscription = client.subscribe("events.>").await?;
720 ///
721 /// client.drain().await?;
722 ///
723 /// # // existing subscriptions are closed and further commands will fail
724 /// assert!(subscription.next().await.is_none());
725 /// client
726 /// .subscribe("events.>")
727 /// .await
728 /// .expect_err("Expected further commands to fail");
729 ///
730 /// # Ok(())
731 /// # }
732 /// ```
733 pub async fn drain(&self) -> Result<(), DrainError> {
734 // Drain all subscriptions
735 self.sender.send(Command::Drain { sid: None }).await?;
736
737 // Remaining process is handled on the handler-side
738 Ok(())
739 }
740
741 /// Returns the current state of the connection.
742 ///
743 /// # Examples
744 /// ```no_run
745 /// # #[tokio::main]
746 /// # async fn main() -> Result<(), async_nats::Error> {
747 /// let client = async_nats::connect("demo.nats.io").await?;
748 /// println!("connection state: {}", client.connection_state());
749 /// # Ok(())
750 /// # }
751 /// ```
752 pub fn connection_state(&self) -> State {
753 self.state.borrow().to_owned()
754 }
755
756 /// Forces the client to reconnect.
757 /// Keep in mind that client will reconnect automatically if the connection is lost and this
758 /// method does not have to be used in normal circumstances.
759 /// However, if you want to force the client to reconnect, for example to re-trigger
760 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
761 /// This method does not wait for connection to be re-established.
762 ///
763 /// # Examples
764 /// ```no_run
765 /// # #[tokio::main]
766 /// # async fn main() -> Result<(), async_nats::Error> {
767 /// let client = async_nats::connect("demo.nats.io").await?;
768 /// client.force_reconnect().await?;
769 /// # Ok(())
770 /// # }
771 /// ```
772 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
773 self.sender
774 .send(Command::Reconnect)
775 .await
776 .map_err(Into::into)
777 }
778
779 /// Returns struct representing statistics of the whole lifecycle of the client.
780 /// This includes number of bytes sent/received, number of messages sent/received,
781 /// and number of times the connection was established.
782 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
783 /// across threads.
784 ///
785 /// # Examples
786 /// ```no_run
787 /// # #[tokio::main]
788 /// # async fn main() -> Result<(), async_nats::Error> {
789 /// let client = async_nats::connect("demo.nats.io").await?;
790 /// let statistics = client.statistics();
791 /// println!("client statistics: {:#?}", statistics);
792 /// # Ok(())
793 /// # }
794 /// ```
795 pub fn statistics(&self) -> Arc<Statistics> {
796 self.connection_stats.clone()
797 }
798}
799
800/// Used for building customized requests.
801#[derive(Default)]
802pub struct Request {
803 payload: Option<Bytes>,
804 headers: Option<HeaderMap>,
805 timeout: Option<Option<Duration>>,
806 inbox: Option<String>,
807}
808
809impl Request {
810 pub fn new() -> Request {
811 Default::default()
812 }
813
814 /// Sets the payload of the request. If not used, empty payload will be sent.
815 ///
816 /// # Examples
817 /// ```no_run
818 /// # #[tokio::main]
819 /// # async fn main() -> Result<(), async_nats::Error> {
820 /// let client = async_nats::connect("demo.nats.io").await?;
821 /// let request = async_nats::Request::new().payload("data".into());
822 /// client.send_request("service", request).await?;
823 /// # Ok(())
824 /// # }
825 /// ```
826 pub fn payload(mut self, payload: Bytes) -> Request {
827 self.payload = Some(payload);
828 self
829 }
830
831 /// Sets the headers of the requests.
832 ///
833 /// # Examples
834 /// ```no_run
835 /// # #[tokio::main]
836 /// # async fn main() -> Result<(), async_nats::Error> {
837 /// use std::str::FromStr;
838 /// let client = async_nats::connect("demo.nats.io").await?;
839 /// let mut headers = async_nats::HeaderMap::new();
840 /// headers.insert(
841 /// "X-Example",
842 /// async_nats::HeaderValue::from_str("Value").unwrap(),
843 /// );
844 /// let request = async_nats::Request::new()
845 /// .headers(headers)
846 /// .payload("data".into());
847 /// client.send_request("service", request).await?;
848 /// # Ok(())
849 /// # }
850 /// ```
851 pub fn headers(mut self, headers: HeaderMap) -> Request {
852 self.headers = Some(headers);
853 self
854 }
855
856 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
857 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
858 /// To use default timeout, simply do not call this function.
859 ///
860 /// # Examples
861 /// ```no_run
862 /// # #[tokio::main]
863 /// # async fn main() -> Result<(), async_nats::Error> {
864 /// let client = async_nats::connect("demo.nats.io").await?;
865 /// let request = async_nats::Request::new()
866 /// .timeout(Some(std::time::Duration::from_secs(15)))
867 /// .payload("data".into());
868 /// client.send_request("service", request).await?;
869 /// # Ok(())
870 /// # }
871 /// ```
872 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
873 self.timeout = Some(timeout);
874 self
875 }
876
877 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
878 ///
879 /// # Examples
880 /// ```no_run
881 /// # #[tokio::main]
882 /// # async fn main() -> Result<(), async_nats::Error> {
883 /// use std::str::FromStr;
884 /// let client = async_nats::connect("demo.nats.io").await?;
885 /// let request = async_nats::Request::new()
886 /// .inbox("custom_inbox".into())
887 /// .payload("data".into());
888 /// client.send_request("service", request).await?;
889 /// # Ok(())
890 /// # }
891 /// ```
892 pub fn inbox(mut self, inbox: String) -> Request {
893 self.inbox = Some(inbox);
894 self
895 }
896}
897
898#[derive(Error, Debug)]
899#[error("failed to send reconnect: {0}")]
900pub struct ReconnectError(#[source] crate::Error);
901
902impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
903 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
904 ReconnectError(Box::new(err))
905 }
906}
907
908#[derive(Error, Debug)]
909#[error("failed to send subscribe: {0}")]
910pub struct SubscribeError(#[source] crate::Error);
911
912impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
913 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
914 SubscribeError(Box::new(err))
915 }
916}
917
918#[derive(Error, Debug)]
919#[error("failed to send drain: {0}")]
920pub struct DrainError(#[source] crate::Error);
921
922impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
923 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
924 DrainError(Box::new(err))
925 }
926}
927
928#[derive(Clone, Copy, Debug, PartialEq)]
929pub enum RequestErrorKind {
930 /// There are services listening on requested subject, but they didn't respond
931 /// in time.
932 TimedOut,
933 /// No one is listening on request subject.
934 NoResponders,
935 /// Other errors, client/io related.
936 Other,
937}
938
939impl Display for RequestErrorKind {
940 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
941 match self {
942 Self::TimedOut => write!(f, "request timed out"),
943 Self::NoResponders => write!(f, "no responders"),
944 Self::Other => write!(f, "request failed"),
945 }
946 }
947}
948
949/// Error returned when a core NATS request fails.
950/// To be enumerate over the variants, call [RequestError::kind].
951pub type RequestError = Error<RequestErrorKind>;
952
953impl From<PublishError> for RequestError {
954 fn from(e: PublishError) -> Self {
955 RequestError::with_source(RequestErrorKind::Other, e)
956 }
957}
958
959impl From<SubscribeError> for RequestError {
960 fn from(e: SubscribeError) -> Self {
961 RequestError::with_source(RequestErrorKind::Other, e)
962 }
963}
964
965#[derive(Clone, Copy, Debug, PartialEq)]
966pub enum FlushErrorKind {
967 /// Sending the flush failed client side.
968 SendError,
969 /// Flush failed.
970 /// This can happen mostly in case of connection issues
971 /// that cannot be resolved quickly.
972 FlushError,
973}
974
975impl Display for FlushErrorKind {
976 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
977 match self {
978 Self::SendError => write!(f, "failed to send flush request"),
979 Self::FlushError => write!(f, "flush failed"),
980 }
981 }
982}
983
984pub type FlushError = Error<FlushErrorKind>;
985
986/// Represents statistics for the instance of the client throughout its lifecycle.
987#[derive(Default, Debug)]
988pub struct Statistics {
989 /// Number of bytes received. This does not include the protocol overhead.
990 pub in_bytes: AtomicU64,
991 /// Number of bytes sent. This doe not include the protocol overhead.
992 pub out_bytes: AtomicU64,
993 /// Number of messages received.
994 pub in_messages: AtomicU64,
995 /// Number of messages sent.
996 pub out_messages: AtomicU64,
997 /// Number of times connection was established.
998 /// Initial connect will be counted as well, then all successful reconnects.
999 pub connects: AtomicU64,
1000}