async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::error::Error;
25use bytes::Bytes;
26use futures_util::future::TryFutureExt;
27use futures_util::{Sink, SinkExt as _, StreamExt};
28use once_cell::sync::Lazy;
29use portable_atomic::AtomicU64;
30use regex::Regex;
31use std::fmt::Display;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::sync::Arc;
34use std::time::Duration;
35use thiserror::Error;
36use tokio::sync::{mpsc, oneshot};
37use tokio_util::sync::PollSender;
38use tracing::trace;
39
40static VERSION_RE: Lazy<Regex> =
41 Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
42
43/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
44/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
45pub type PublishError = Error<PublishErrorKind>;
46
47impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
48 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
49 PublishError::with_source(PublishErrorKind::Send, err)
50 }
51}
52
53impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
54 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
55 PublishError::with_source(PublishErrorKind::Send, err)
56 }
57}
58
/// Categories of failures reported by the publish family of methods.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size known to the client.
    MaxPayloadExceeded,
    /// The subject was not accepted.
    // NOTE(review): nothing in this file constructs this variant — confirm where it is used.
    BadSubject,
    /// The message could not be handed over to the command channel.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes a short, lowercase, human readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::BadSubject => "bad subject",
            Self::Send => "failed to send message",
        };
        f.write_str(description)
    }
}
75
76/// Client is a `Cloneable` handle to NATS connection.
77/// Client should not be created directly. Instead, one of two methods can be used:
78/// [crate::connect] and [crate::ConnectOptions::connect]
79#[derive(Clone, Debug)]
80pub struct Client {
81 info: tokio::sync::watch::Receiver<ServerInfo>,
82 pub(crate) state: tokio::sync::watch::Receiver<State>,
83 pub(crate) sender: mpsc::Sender<Command>,
84 poll_sender: PollSender<Command>,
85 next_subscription_id: Arc<AtomicU64>,
86 subscription_capacity: usize,
87 inbox_prefix: Arc<str>,
88 request_timeout: Option<Duration>,
89 max_payload: Arc<AtomicUsize>,
90 connection_stats: Arc<Statistics>,
91}
92
93pub mod traits {
94 use std::{future::Future, time::Duration};
95
96 use bytes::Bytes;
97
98 use crate::{message, subject::ToSubject, Message};
99
100 use super::{PublishError, Request, RequestError, SubscribeError};
101
102 pub trait Publisher {
103 fn publish_with_reply<S: ToSubject, R: ToSubject>(
104 &self,
105 subject: S,
106 reply: R,
107 payload: Bytes,
108 ) -> impl Future<Output = Result<(), PublishError>>;
109
110 fn publish_message(
111 &self,
112 msg: message::OutboundMessage,
113 ) -> impl Future<Output = Result<(), PublishError>>;
114 }
115 pub trait Subscriber {
116 fn subscribe<S: ToSubject>(
117 &self,
118 subject: S,
119 ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
120 }
121 pub trait Requester {
122 fn send_request<S: ToSubject>(
123 &self,
124 subject: S,
125 request: Request,
126 ) -> impl Future<Output = Result<Message, RequestError>>;
127 }
128 pub trait TimeoutProvider {
129 fn timeout(&self) -> Option<Duration>;
130 }
131}
132
133impl traits::Requester for Client {
134 fn send_request<S: ToSubject>(
135 &self,
136 subject: S,
137 request: Request,
138 ) -> impl Future<Output = Result<Message, RequestError>> {
139 self.send_request(subject, request)
140 }
141}
142
143impl traits::TimeoutProvider for Client {
144 fn timeout(&self) -> Option<Duration> {
145 self.timeout()
146 }
147}
148
149impl traits::Publisher for Client {
150 fn publish_with_reply<S: ToSubject, R: ToSubject>(
151 &self,
152 subject: S,
153 reply: R,
154 payload: Bytes,
155 ) -> impl Future<Output = Result<(), PublishError>> {
156 self.publish_with_reply(subject, reply, payload)
157 }
158
159 async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
160 self.sender
161 .send(Command::Publish(msg))
162 .await
163 .map_err(Into::into)
164 }
165}
166
167impl traits::Subscriber for Client {
168 fn subscribe<S: ToSubject>(
169 &self,
170 subject: S,
171 ) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
172 self.subscribe(subject)
173 }
174}
175
176impl Sink<OutboundMessage> for Client {
177 type Error = PublishError;
178
179 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
180 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
181 }
182
183 fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
184 self.poll_sender
185 .start_send_unpin(Command::Publish(msg))
186 .map_err(Into::into)
187 }
188
189 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
190 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
191 }
192
193 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
195 }
196}
197
198impl Client {
199 #[allow(clippy::too_many_arguments)]
200 pub(crate) fn new(
201 info: tokio::sync::watch::Receiver<ServerInfo>,
202 state: tokio::sync::watch::Receiver<State>,
203 sender: mpsc::Sender<Command>,
204 capacity: usize,
205 inbox_prefix: String,
206 request_timeout: Option<Duration>,
207 max_payload: Arc<AtomicUsize>,
208 statistics: Arc<Statistics>,
209 ) -> Client {
210 let poll_sender = PollSender::new(sender.clone());
211 Client {
212 info,
213 state,
214 sender,
215 poll_sender,
216 next_subscription_id: Arc::new(AtomicU64::new(1)),
217 subscription_capacity: capacity,
218 inbox_prefix: inbox_prefix.into(),
219 request_timeout,
220 max_payload,
221 connection_stats: statistics,
222 }
223 }
224
225 /// Returns the default timeout for requests set when creating the client.
226 ///
227 /// # Examples
228 /// ```no_run
229 /// # #[tokio::main]
230 /// # async fn main() -> Result<(), async_nats::Error> {
231 /// let client = async_nats::connect("demo.nats.io").await?;
232 /// println!("default request timeout: {:?}", client.timeout());
233 /// # Ok(())
234 /// # }
235 /// ```
236 pub fn timeout(&self) -> Option<Duration> {
237 self.request_timeout
238 }
239
240 /// Returns last received info from the server.
241 ///
242 /// # Examples
243 ///
244 /// ```no_run
245 /// # #[tokio::main]
246 /// # async fn main () -> Result<(), async_nats::Error> {
247 /// let client = async_nats::connect("demo.nats.io").await?;
248 /// println!("info: {:?}", client.server_info());
249 /// # Ok(())
250 /// # }
251 /// ```
252 pub fn server_info(&self) -> ServerInfo {
253 // We ignore notifying the watcher, as that requires mutable client reference.
254 self.info.borrow().to_owned()
255 }
256
257 /// Returns true if the server version is compatible with the version components.
258 ///
259 /// This has to be used with caution, as it is not guaranteed that the server
260 /// that client is connected to is the same version that the one that is
261 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
262 ///
263 /// # Examples
264 ///
265 /// ```no_run
266 /// # #[tokio::main]
267 /// # async fn main() -> Result<(), async_nats::Error> {
268 /// let client = async_nats::connect("demo.nats.io").await?;
269 /// assert!(client.is_server_compatible(2, 8, 4));
270 /// # Ok(())
271 /// # }
272 /// ```
273 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
274 let info = self.server_info();
275
276 let server_version_captures = match VERSION_RE.captures(&info.version) {
277 Some(captures) => captures,
278 None => return false,
279 };
280
281 let server_major = server_version_captures
282 .get(1)
283 .map(|m| m.as_str().parse::<i64>().unwrap())
284 .unwrap();
285
286 let server_minor = server_version_captures
287 .get(2)
288 .map(|m| m.as_str().parse::<i64>().unwrap())
289 .unwrap();
290
291 let server_patch = server_version_captures
292 .get(3)
293 .map(|m| m.as_str().parse::<i64>().unwrap())
294 .unwrap();
295
296 if server_major < major
297 || (server_major == major && server_minor < minor)
298 || (server_major == major && server_minor == minor && server_patch < patch)
299 {
300 return false;
301 }
302 true
303 }
304
305 /// Publish a [Message] to a given subject.
306 ///
307 /// # Examples
308 /// ```no_run
309 /// # #[tokio::main]
310 /// # async fn main() -> Result<(), async_nats::Error> {
311 /// let client = async_nats::connect("demo.nats.io").await?;
312 /// client.publish("events.data", "payload".into()).await?;
313 /// # Ok(())
314 /// # }
315 /// ```
316 pub async fn publish<S: ToSubject>(
317 &self,
318 subject: S,
319 payload: Bytes,
320 ) -> Result<(), PublishError> {
321 let subject = subject.to_subject();
322 let max_payload = self.max_payload.load(Ordering::Relaxed);
323 if payload.len() > max_payload {
324 return Err(PublishError::with_source(
325 PublishErrorKind::MaxPayloadExceeded,
326 format!(
327 "Payload size limit of {} exceeded by message size of {}",
328 max_payload,
329 payload.len(),
330 ),
331 ));
332 }
333
334 self.sender
335 .send(Command::Publish(OutboundMessage {
336 subject,
337 payload,
338 reply: None,
339 headers: None,
340 }))
341 .await?;
342 Ok(())
343 }
344
345 /// Publish a [Message] with headers to a given subject.
346 ///
347 /// # Examples
348 /// ```
349 /// # #[tokio::main]
350 /// # async fn main() -> Result<(), async_nats::Error> {
351 /// use std::str::FromStr;
352 /// let client = async_nats::connect("demo.nats.io").await?;
353 /// let mut headers = async_nats::HeaderMap::new();
354 /// headers.insert(
355 /// "X-Header",
356 /// async_nats::HeaderValue::from_str("Value").unwrap(),
357 /// );
358 /// client
359 /// .publish_with_headers("events.data", headers, "payload".into())
360 /// .await?;
361 /// # Ok(())
362 /// # }
363 /// ```
364 pub async fn publish_with_headers<S: ToSubject>(
365 &self,
366 subject: S,
367 headers: HeaderMap,
368 payload: Bytes,
369 ) -> Result<(), PublishError> {
370 let subject = subject.to_subject();
371
372 self.sender
373 .send(Command::Publish(OutboundMessage {
374 subject,
375 payload,
376 reply: None,
377 headers: Some(headers),
378 }))
379 .await?;
380 Ok(())
381 }
382
383 /// Publish a [Message] to a given subject, with specified response subject
384 /// to which the subscriber can respond.
385 /// This method does not await for the response.
386 ///
387 /// # Examples
388 ///
389 /// ```no_run
390 /// # #[tokio::main]
391 /// # async fn main() -> Result<(), async_nats::Error> {
392 /// let client = async_nats::connect("demo.nats.io").await?;
393 /// client
394 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
395 /// .await?;
396 /// # Ok(())
397 /// # }
398 /// ```
399 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
400 &self,
401 subject: S,
402 reply: R,
403 payload: Bytes,
404 ) -> Result<(), PublishError> {
405 let subject = subject.to_subject();
406 let reply = reply.to_subject();
407
408 self.sender
409 .send(Command::Publish(OutboundMessage {
410 subject,
411 payload,
412 reply: Some(reply),
413 headers: None,
414 }))
415 .await?;
416 Ok(())
417 }
418
419 /// Publish a [Message] to a given subject with headers and specified response subject
420 /// to which the subscriber can respond.
421 /// This method does not await for the response.
422 ///
423 /// # Examples
424 ///
425 /// ```no_run
426 /// # #[tokio::main]
427 /// # async fn main() -> Result<(), async_nats::Error> {
428 /// use std::str::FromStr;
429 /// let client = async_nats::connect("demo.nats.io").await?;
430 /// let mut headers = async_nats::HeaderMap::new();
431 /// client
432 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
433 /// .await?;
434 /// # Ok(())
435 /// # }
436 /// ```
437 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
438 &self,
439 subject: S,
440 reply: R,
441 headers: HeaderMap,
442 payload: Bytes,
443 ) -> Result<(), PublishError> {
444 let subject = subject.to_subject();
445 let reply = reply.to_subject();
446
447 self.sender
448 .send(Command::Publish(OutboundMessage {
449 subject,
450 payload,
451 reply: Some(reply),
452 headers: Some(headers),
453 }))
454 .await?;
455 Ok(())
456 }
457
458 /// Sends the request with headers.
459 ///
460 /// # Examples
461 /// ```no_run
462 /// # #[tokio::main]
463 /// # async fn main() -> Result<(), async_nats::Error> {
464 /// let client = async_nats::connect("demo.nats.io").await?;
465 /// let response = client.request("service", "data".into()).await?;
466 /// # Ok(())
467 /// # }
468 /// ```
469 pub async fn request<S: ToSubject>(
470 &self,
471 subject: S,
472 payload: Bytes,
473 ) -> Result<Message, RequestError> {
474 let subject = subject.to_subject();
475
476 trace!(
477 "request sent to subject: {} ({})",
478 subject.as_ref(),
479 payload.len()
480 );
481 let request = Request::new().payload(payload);
482 self.send_request(subject, request).await
483 }
484
485 /// Sends the request with headers.
486 ///
487 /// # Examples
488 /// ```no_run
489 /// # #[tokio::main]
490 /// # async fn main() -> Result<(), async_nats::Error> {
491 /// let client = async_nats::connect("demo.nats.io").await?;
492 /// let mut headers = async_nats::HeaderMap::new();
493 /// headers.insert("Key", "Value");
494 /// let response = client
495 /// .request_with_headers("service", headers, "data".into())
496 /// .await?;
497 /// # Ok(())
498 /// # }
499 /// ```
500 pub async fn request_with_headers<S: ToSubject>(
501 &self,
502 subject: S,
503 headers: HeaderMap,
504 payload: Bytes,
505 ) -> Result<Message, RequestError> {
506 let subject = subject.to_subject();
507
508 let request = Request::new().headers(headers).payload(payload);
509 self.send_request(subject, request).await
510 }
511
512 /// Sends the request created by the [Request].
513 ///
514 /// # Examples
515 ///
516 /// ```no_run
517 /// # #[tokio::main]
518 /// # async fn main() -> Result<(), async_nats::Error> {
519 /// let client = async_nats::connect("demo.nats.io").await?;
520 /// let request = async_nats::Request::new().payload("data".into());
521 /// let response = client.send_request("service", request).await?;
522 /// # Ok(())
523 /// # }
524 /// ```
525 pub async fn send_request<S: ToSubject>(
526 &self,
527 subject: S,
528 request: Request,
529 ) -> Result<Message, RequestError> {
530 let subject = subject.to_subject();
531
532 if let Some(inbox) = request.inbox {
533 let timeout = request.timeout.unwrap_or(self.request_timeout);
534 let mut subscriber = self.subscribe(inbox.clone()).await?;
535 let payload: Bytes = request.payload.unwrap_or_default();
536 match request.headers {
537 Some(headers) => {
538 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
539 .await?
540 }
541 None => self.publish_with_reply(subject, inbox, payload).await?,
542 }
543 let request = match timeout {
544 Some(timeout) => {
545 tokio::time::timeout(timeout, subscriber.next())
546 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
547 .await?
548 }
549 None => subscriber.next().await,
550 };
551 match request {
552 Some(message) => {
553 if message.status == Some(StatusCode::NO_RESPONDERS) {
554 return Err(RequestError::with_source(
555 RequestErrorKind::NoResponders,
556 "no responders",
557 ));
558 }
559 Ok(message)
560 }
561 None => Err(RequestError::with_source(
562 RequestErrorKind::Other,
563 "broken pipe",
564 )),
565 }
566 } else {
567 let (sender, receiver) = oneshot::channel();
568
569 let payload = request.payload.unwrap_or_default();
570 let respond = self.new_inbox().into();
571 let headers = request.headers;
572
573 self.sender
574 .send(Command::Request {
575 subject,
576 payload,
577 respond,
578 headers,
579 sender,
580 })
581 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
582 .await?;
583
584 let timeout = request.timeout.unwrap_or(self.request_timeout);
585 let request = match timeout {
586 Some(timeout) => {
587 tokio::time::timeout(timeout, receiver)
588 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
589 .await?
590 }
591 None => receiver.await,
592 };
593
594 match request {
595 Ok(message) => {
596 if message.status == Some(StatusCode::NO_RESPONDERS) {
597 return Err(RequestError::with_source(
598 RequestErrorKind::NoResponders,
599 "no responders",
600 ));
601 }
602 Ok(message)
603 }
604 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
605 }
606 }
607 }
608
609 /// Create a new globally unique inbox which can be used for replies.
610 ///
611 /// # Examples
612 ///
613 /// ```no_run
614 /// # #[tokio::main]
615 /// # async fn main() -> Result<(), async_nats::Error> {
616 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
617 /// let reply = nc.new_inbox();
618 /// let rsub = nc.subscribe(reply).await?;
619 /// # Ok(())
620 /// # }
621 /// ```
622 pub fn new_inbox(&self) -> String {
623 format!("{}.{}", self.inbox_prefix, nuid::next())
624 }
625
626 /// Subscribes to a subject to receive [messages][Message].
627 ///
628 /// # Examples
629 ///
630 /// ```no_run
631 /// # #[tokio::main]
632 /// # async fn main() -> Result<(), async_nats::Error> {
633 /// use futures_util::StreamExt;
634 /// let client = async_nats::connect("demo.nats.io").await?;
635 /// let mut subscription = client.subscribe("events.>").await?;
636 /// while let Some(message) = subscription.next().await {
637 /// println!("received message: {:?}", message);
638 /// }
639 /// # Ok(())
640 /// # }
641 /// ```
642 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
643 let subject = subject.to_subject();
644 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
645 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
646
647 self.sender
648 .send(Command::Subscribe {
649 sid,
650 subject,
651 queue_group: None,
652 sender,
653 })
654 .await?;
655
656 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
657 }
658
659 /// Subscribes to a subject with a queue group to receive [messages][Message].
660 ///
661 /// # Examples
662 ///
663 /// ```no_run
664 /// # #[tokio::main]
665 /// # async fn main() -> Result<(), async_nats::Error> {
666 /// use futures_util::StreamExt;
667 /// let client = async_nats::connect("demo.nats.io").await?;
668 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
669 /// while let Some(message) = subscription.next().await {
670 /// println!("received message: {:?}", message);
671 /// }
672 /// # Ok(())
673 /// # }
674 /// ```
675 pub async fn queue_subscribe<S: ToSubject>(
676 &self,
677 subject: S,
678 queue_group: String,
679 ) -> Result<Subscriber, SubscribeError> {
680 let subject = subject.to_subject();
681
682 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
683 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
684
685 self.sender
686 .send(Command::Subscribe {
687 sid,
688 subject,
689 queue_group: Some(queue_group),
690 sender,
691 })
692 .await?;
693
694 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
695 }
696
697 /// Flushes the internal buffer ensuring that all messages are sent.
698 ///
699 /// # Examples
700 ///
701 /// ```no_run
702 /// # #[tokio::main]
703 /// # async fn main() -> Result<(), async_nats::Error> {
704 /// let client = async_nats::connect("demo.nats.io").await?;
705 /// client.flush().await?;
706 /// # Ok(())
707 /// # }
708 /// ```
709 pub async fn flush(&self) -> Result<(), FlushError> {
710 let (tx, rx) = tokio::sync::oneshot::channel();
711 self.sender
712 .send(Command::Flush { observer: tx })
713 .await
714 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
715
716 rx.await
717 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
718 Ok(())
719 }
720
721 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
722 /// messages, then closes the connection. Once completed, any associated streams associated with the
723 /// client will be closed, and further client commands will fail
724 ///
725 /// # Examples
726 ///
727 /// ```no_run
728 /// # #[tokio::main]
729 /// # async fn main() -> Result<(), async_nats::Error> {
730 /// use futures_util::StreamExt;
731 /// let client = async_nats::connect("demo.nats.io").await?;
732 /// let mut subscription = client.subscribe("events.>").await?;
733 ///
734 /// client.drain().await?;
735 ///
736 /// # // existing subscriptions are closed and further commands will fail
737 /// assert!(subscription.next().await.is_none());
738 /// client
739 /// .subscribe("events.>")
740 /// .await
741 /// .expect_err("Expected further commands to fail");
742 ///
743 /// # Ok(())
744 /// # }
745 /// ```
746 pub async fn drain(&self) -> Result<(), DrainError> {
747 // Drain all subscriptions
748 self.sender.send(Command::Drain { sid: None }).await?;
749
750 // Remaining process is handled on the handler-side
751 Ok(())
752 }
753
754 /// Returns the current state of the connection.
755 ///
756 /// # Examples
757 /// ```no_run
758 /// # #[tokio::main]
759 /// # async fn main() -> Result<(), async_nats::Error> {
760 /// let client = async_nats::connect("demo.nats.io").await?;
761 /// println!("connection state: {}", client.connection_state());
762 /// # Ok(())
763 /// # }
764 /// ```
765 pub fn connection_state(&self) -> State {
766 self.state.borrow().to_owned()
767 }
768
769 /// Forces the client to reconnect.
770 /// Keep in mind that client will reconnect automatically if the connection is lost and this
771 /// method does not have to be used in normal circumstances.
772 /// However, if you want to force the client to reconnect, for example to re-trigger
773 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
774 /// This method does not wait for connection to be re-established.
775 ///
776 /// # Examples
777 /// ```no_run
778 /// # #[tokio::main]
779 /// # async fn main() -> Result<(), async_nats::Error> {
780 /// let client = async_nats::connect("demo.nats.io").await?;
781 /// client.force_reconnect().await?;
782 /// # Ok(())
783 /// # }
784 /// ```
785 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
786 self.sender
787 .send(Command::Reconnect)
788 .await
789 .map_err(Into::into)
790 }
791
792 /// Returns struct representing statistics of the whole lifecycle of the client.
793 /// This includes number of bytes sent/received, number of messages sent/received,
794 /// and number of times the connection was established.
795 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
796 /// across threads.
797 ///
798 /// # Examples
799 /// ```no_run
800 /// # #[tokio::main]
801 /// # async fn main() -> Result<(), async_nats::Error> {
802 /// let client = async_nats::connect("demo.nats.io").await?;
803 /// let statistics = client.statistics();
804 /// println!("client statistics: {:#?}", statistics);
805 /// # Ok(())
806 /// # }
807 /// ```
808 pub fn statistics(&self) -> Arc<Statistics> {
809 self.connection_stats.clone()
810 }
811}
812
813/// Used for building customized requests.
814#[derive(Default)]
815pub struct Request {
816 pub payload: Option<Bytes>,
817 pub headers: Option<HeaderMap>,
818 pub timeout: Option<Option<Duration>>,
819 pub inbox: Option<String>,
820}
821
822impl Request {
823 pub fn new() -> Request {
824 Default::default()
825 }
826
827 /// Sets the payload of the request. If not used, empty payload will be sent.
828 ///
829 /// # Examples
830 /// ```no_run
831 /// # #[tokio::main]
832 /// # async fn main() -> Result<(), async_nats::Error> {
833 /// let client = async_nats::connect("demo.nats.io").await?;
834 /// let request = async_nats::Request::new().payload("data".into());
835 /// client.send_request("service", request).await?;
836 /// # Ok(())
837 /// # }
838 /// ```
839 pub fn payload(mut self, payload: Bytes) -> Request {
840 self.payload = Some(payload);
841 self
842 }
843
844 /// Sets the headers of the requests.
845 ///
846 /// # Examples
847 /// ```no_run
848 /// # #[tokio::main]
849 /// # async fn main() -> Result<(), async_nats::Error> {
850 /// use std::str::FromStr;
851 /// let client = async_nats::connect("demo.nats.io").await?;
852 /// let mut headers = async_nats::HeaderMap::new();
853 /// headers.insert(
854 /// "X-Example",
855 /// async_nats::HeaderValue::from_str("Value").unwrap(),
856 /// );
857 /// let request = async_nats::Request::new()
858 /// .headers(headers)
859 /// .payload("data".into());
860 /// client.send_request("service", request).await?;
861 /// # Ok(())
862 /// # }
863 /// ```
864 pub fn headers(mut self, headers: HeaderMap) -> Request {
865 self.headers = Some(headers);
866 self
867 }
868
869 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
870 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
871 /// To use default timeout, simply do not call this function.
872 ///
873 /// # Examples
874 /// ```no_run
875 /// # #[tokio::main]
876 /// # async fn main() -> Result<(), async_nats::Error> {
877 /// let client = async_nats::connect("demo.nats.io").await?;
878 /// let request = async_nats::Request::new()
879 /// .timeout(Some(std::time::Duration::from_secs(15)))
880 /// .payload("data".into());
881 /// client.send_request("service", request).await?;
882 /// # Ok(())
883 /// # }
884 /// ```
885 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
886 self.timeout = Some(timeout);
887 self
888 }
889
890 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
891 ///
892 /// # Examples
893 /// ```no_run
894 /// # #[tokio::main]
895 /// # async fn main() -> Result<(), async_nats::Error> {
896 /// use std::str::FromStr;
897 /// let client = async_nats::connect("demo.nats.io").await?;
898 /// let request = async_nats::Request::new()
899 /// .inbox("custom_inbox".into())
900 /// .payload("data".into());
901 /// client.send_request("service", request).await?;
902 /// # Ok(())
903 /// # }
904 /// ```
905 pub fn inbox(mut self, inbox: String) -> Request {
906 self.inbox = Some(inbox);
907 self
908 }
909}
910
911#[derive(Error, Debug)]
912#[error("failed to send reconnect: {0}")]
913pub struct ReconnectError(#[source] crate::Error);
914
915impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
916 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
917 ReconnectError(Box::new(err))
918 }
919}
920
921#[derive(Error, Debug)]
922#[error("failed to send subscribe: {0}")]
923pub struct SubscribeError(#[source] crate::Error);
924
925impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
926 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
927 SubscribeError(Box::new(err))
928 }
929}
930
931#[derive(Error, Debug)]
932#[error("failed to send drain: {0}")]
933pub struct DrainError(#[source] crate::Error);
934
935impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
936 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
937 DrainError(Box::new(err))
938 }
939}
940
/// Categories of failures reported for a request.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// There are services listening on the requested subject, but none of them
    /// responded in time.
    TimedOut,
    /// Nobody is listening on the request subject.
    NoResponders,
    /// Any other client or I/O related error.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes a short, lowercase, human readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "request timed out",
            Self::NoResponders => "no responders",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
961
962/// Error returned when a core NATS request fails.
963/// To be enumerate over the variants, call [RequestError::kind].
964pub type RequestError = Error<RequestErrorKind>;
965
966impl From<PublishError> for RequestError {
967 fn from(e: PublishError) -> Self {
968 RequestError::with_source(RequestErrorKind::Other, e)
969 }
970}
971
972impl From<SubscribeError> for RequestError {
973 fn from(e: SubscribeError) -> Self {
974 RequestError::with_source(RequestErrorKind::Other, e)
975 }
976}
977
/// Categories of failures reported by [`Client::flush`].
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// The flush request could not be sent on the client side.
    SendError,
    /// The flush itself failed.
    /// This mostly happens with connection issues that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes a short, lowercase, human readable description of the kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::SendError => "failed to send flush request",
            Self::FlushError => "flush failed",
        };
        f.write_str(description)
    }
}
996
997pub type FlushError = Error<FlushErrorKind>;
998
/// Statistics of a client instance, collected throughout its lifecycle.
///
/// All counters start at zero when created through [`Default`].
#[derive(Default, Debug)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times the connection was established.
    /// The initial connect is counted, as is every successful reconnect.
    pub connects: AtomicU64,
}
1013}