rivet_async_nats/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16use std::future::Future;
17
18use crate::connection::State;
19use crate::message::OutboundMessage;
20use crate::subject::ToSubject;
21use crate::ServerInfo;
22
23use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
24use crate::SubscriberStatistics;
25use crate::error::Error;
26use bytes::Bytes;
27use futures_util::future::TryFutureExt;
28use futures_util::{Sink, SinkExt as _, StreamExt};
29use portable_atomic::AtomicU64;
30use regex::Regex;
31use std::fmt::Display;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::sync::Arc;
34use std::sync::LazyLock;
35use std::time::Duration;
36use thiserror::Error;
37use tokio::sync::{mpsc, oneshot};
38use tokio_util::sync::PollSender;
39use tracing::trace;
40
/// Lazily compiled pattern that extracts up to three numeric components from the start
/// of a server version string, tolerating an optional leading `v`.
///
/// Only the first (major) group is mandatory; the second (minor) and third (patch)
/// groups, as well as the separating dots, are optional, so those captures may be absent.
static VERSION_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
43
/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
///
/// The category of the failure is described by [`PublishErrorKind`].
pub type PublishError = Error<PublishErrorKind>;
47
48impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
49 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
50 PublishError::with_source(PublishErrorKind::Send, err)
51 }
52}
53
54impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
55 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
56 PublishError::with_source(PublishErrorKind::Send, err)
57 }
58}
59
/// Categories of failure that a publish operation can report.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum PublishErrorKind {
    /// The payload is larger than the maximum payload size known to the client.
    MaxPayloadExceeded,
    /// The subject was rejected as invalid.
    BadSubject,
    /// The message could not be handed over to the connection's command channel.
    Send,
}

impl Display for PublishErrorKind {
    /// Writes the fixed human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::MaxPayloadExceeded => "max payload size exceeded",
            Self::Send => "failed to send message",
            Self::BadSubject => "bad subject",
        };
        f.write_str(description)
    }
}
76
/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Client {
    // Watch channel from which `server_info` reads the latest `ServerInfo`.
    info: tokio::sync::watch::Receiver<ServerInfo>,
    // Watch channel from which `connection_state` reads the current `State`.
    pub(crate) state: tokio::sync::watch::Receiver<State>,
    // Channel on which every `Command` (publish, subscribe, flush, ...) is sent.
    pub(crate) sender: mpsc::Sender<Command>,
    // Pollable wrapper around a clone of `sender`; backs the `Sink` implementation.
    poll_sender: PollSender<Command>,
    // Counter handing out subscription ids; shared between clones of the client.
    next_subscription_id: Arc<AtomicU64>,
    // Capacity of the message channel created for each new subscription.
    subscription_capacity: usize,
    // Prefix prepended to every inbox produced by `new_inbox`.
    inbox_prefix: Arc<str>,
    // Default request timeout; used when a `Request` does not set its own.
    request_timeout: Option<Duration>,
    // Payload size limit checked by `publish` before a message is queued.
    max_payload: Arc<AtomicUsize>,
    // Connection-wide statistics handed out by `statistics`.
    connection_stats: Arc<Statistics>,
}
93
/// Trait abstractions over the publish, subscribe and request capabilities.
///
/// [`Client`](super::Client) implements all of them by delegating to its inherent methods.
pub mod traits {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;

    use crate::{message, subject::ToSubject, Message};

    use super::{PublishError, Request, RequestError, SubscribeError};

    /// Something that can publish messages.
    pub trait Publisher {
        /// Publishes `payload` to `subject`, attaching `reply` as the reply subject.
        fn publish_with_reply<S: ToSubject, R: ToSubject>(
            &self,
            subject: S,
            reply: R,
            payload: Bytes,
        ) -> impl Future<Output = Result<(), PublishError>>;

        /// Publishes an already assembled outbound message.
        fn publish_message(
            &self,
            msg: message::OutboundMessage,
        ) -> impl Future<Output = Result<(), PublishError>>;
    }
    /// Something that can create subscriptions.
    pub trait Subscriber {
        /// Subscribes to `subject`, resolving to the resulting [`crate::Subscriber`].
        fn subscribe<S: ToSubject>(
            &self,
            subject: S,
        ) -> impl Future<Output = Result<crate::Subscriber, SubscribeError>>;
    }
    /// Something that can perform request/reply exchanges.
    pub trait Requester {
        /// Sends `request` to `subject`, resolving to the reply message.
        fn send_request<S: ToSubject>(
            &self,
            subject: S,
            request: Request,
        ) -> impl Future<Output = Result<Message, RequestError>>;
    }
    /// Something that exposes a default request timeout.
    pub trait TimeoutProvider {
        /// Returns the default timeout, or `None` when no timeout is configured.
        fn timeout(&self) -> Option<Duration>;
    }
}
133
134impl traits::Requester for Client {
135 fn send_request<S: ToSubject>(
136 &self,
137 subject: S,
138 request: Request,
139 ) -> impl Future<Output = Result<Message, RequestError>> {
140 self.send_request(subject, request)
141 }
142}
143
144impl traits::TimeoutProvider for Client {
145 fn timeout(&self) -> Option<Duration> {
146 self.timeout()
147 }
148}
149
150impl traits::Publisher for Client {
151 fn publish_with_reply<S: ToSubject, R: ToSubject>(
152 &self,
153 subject: S,
154 reply: R,
155 payload: Bytes,
156 ) -> impl Future<Output = Result<(), PublishError>> {
157 self.publish_with_reply(subject, reply, payload)
158 }
159
160 async fn publish_message(&self, msg: OutboundMessage) -> Result<(), PublishError> {
161 self.sender
162 .send(Command::Publish(msg))
163 .await
164 .map_err(Into::into)
165 }
166}
167
168impl traits::Subscriber for Client {
169 fn subscribe<S: ToSubject>(
170 &self,
171 subject: S,
172 ) -> impl Future<Output = Result<Subscriber, SubscribeError>> {
173 self.subscribe(subject)
174 }
175}
176
177impl Sink<OutboundMessage> for Client {
178 type Error = PublishError;
179
180 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181 self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
182 }
183
184 fn start_send(mut self: Pin<&mut Self>, msg: OutboundMessage) -> Result<(), Self::Error> {
185 self.poll_sender
186 .start_send_unpin(Command::Publish(msg))
187 .map_err(Into::into)
188 }
189
190 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
191 self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
192 }
193
194 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
196 }
197}
198
199impl Client {
200 #[allow(clippy::too_many_arguments)]
201 pub(crate) fn new(
202 info: tokio::sync::watch::Receiver<ServerInfo>,
203 state: tokio::sync::watch::Receiver<State>,
204 sender: mpsc::Sender<Command>,
205 capacity: usize,
206 inbox_prefix: String,
207 request_timeout: Option<Duration>,
208 max_payload: Arc<AtomicUsize>,
209 statistics: Arc<Statistics>,
210 ) -> Client {
211 let poll_sender = PollSender::new(sender.clone());
212 Client {
213 info,
214 state,
215 sender,
216 poll_sender,
217 next_subscription_id: Arc::new(AtomicU64::new(1)),
218 subscription_capacity: capacity,
219 inbox_prefix: inbox_prefix.into(),
220 request_timeout,
221 max_payload,
222 connection_stats: statistics,
223 }
224 }
225
    /// Returns the default timeout for requests set when creating the client.
    ///
    /// Requests that do not set their own timeout through [`Request::timeout`] use this
    /// value. `None` means such requests wait for a reply without a time limit.
    ///
    /// # Examples
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// println!("default request timeout: {:?}", client.timeout());
    /// # Ok(())
    /// # }
    /// ```
    pub fn timeout(&self) -> Option<Duration> {
        self.request_timeout
    }
240
241 /// Returns last received info from the server.
242 ///
243 /// # Examples
244 ///
245 /// ```no_run
246 /// # #[tokio::main]
247 /// # async fn main () -> Result<(), async_nats::Error> {
248 /// let client = async_nats::connect("demo.nats.io").await?;
249 /// println!("info: {:?}", client.server_info());
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub fn server_info(&self) -> ServerInfo {
254 // We ignore notifying the watcher, as that requires mutable client reference.
255 self.info.borrow().to_owned()
256 }
257
258 /// Returns true if the server version is compatible with the version components.
259 ///
260 /// This has to be used with caution, as it is not guaranteed that the server
261 /// that client is connected to is the same version that the one that is
262 /// a JetStream meta/stream/consumer leader, especially across leafnodes.
263 ///
264 /// # Examples
265 ///
266 /// ```no_run
267 /// # #[tokio::main]
268 /// # async fn main() -> Result<(), async_nats::Error> {
269 /// let client = async_nats::connect("demo.nats.io").await?;
270 /// assert!(client.is_server_compatible(2, 8, 4));
271 /// # Ok(())
272 /// # }
273 /// ```
274 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
275 let info = self.server_info();
276
277 let server_version_captures = match VERSION_RE.captures(&info.version) {
278 Some(captures) => captures,
279 None => return false,
280 };
281
282 let server_major = server_version_captures
283 .get(1)
284 .map(|m| m.as_str().parse::<i64>().unwrap())
285 .unwrap();
286
287 let server_minor = server_version_captures
288 .get(2)
289 .map(|m| m.as_str().parse::<i64>().unwrap())
290 .unwrap();
291
292 let server_patch = server_version_captures
293 .get(3)
294 .map(|m| m.as_str().parse::<i64>().unwrap())
295 .unwrap();
296
297 if server_major < major
298 || (server_major == major && server_minor < minor)
299 || (server_major == major && server_minor == minor && server_patch < patch)
300 {
301 return false;
302 }
303 true
304 }
305
306 /// Publish a [Message] to a given subject.
307 ///
308 /// # Examples
309 /// ```no_run
310 /// # #[tokio::main]
311 /// # async fn main() -> Result<(), async_nats::Error> {
312 /// let client = async_nats::connect("demo.nats.io").await?;
313 /// client.publish("events.data", "payload".into()).await?;
314 /// # Ok(())
315 /// # }
316 /// ```
317 pub async fn publish<S: ToSubject>(
318 &self,
319 subject: S,
320 payload: Bytes,
321 ) -> Result<(), PublishError> {
322 let subject = subject.to_subject();
323 let max_payload = self.max_payload.load(Ordering::Relaxed);
324 if payload.len() > max_payload {
325 return Err(PublishError::with_source(
326 PublishErrorKind::MaxPayloadExceeded,
327 format!(
328 "Payload size limit of {} exceeded by message size of {}",
329 max_payload,
330 payload.len(),
331 ),
332 ));
333 }
334
335 self.sender
336 .send(Command::Publish(OutboundMessage {
337 subject,
338 payload,
339 reply: None,
340 headers: None,
341 }))
342 .await?;
343 Ok(())
344 }
345
346 /// Publish a [Message] with headers to a given subject.
347 ///
348 /// # Examples
349 /// ```
350 /// # #[tokio::main]
351 /// # async fn main() -> Result<(), async_nats::Error> {
352 /// use std::str::FromStr;
353 /// let client = async_nats::connect("demo.nats.io").await?;
354 /// let mut headers = async_nats::HeaderMap::new();
355 /// headers.insert(
356 /// "X-Header",
357 /// async_nats::HeaderValue::from_str("Value").unwrap(),
358 /// );
359 /// client
360 /// .publish_with_headers("events.data", headers, "payload".into())
361 /// .await?;
362 /// # Ok(())
363 /// # }
364 /// ```
365 pub async fn publish_with_headers<S: ToSubject>(
366 &self,
367 subject: S,
368 headers: HeaderMap,
369 payload: Bytes,
370 ) -> Result<(), PublishError> {
371 let subject = subject.to_subject();
372
373 self.sender
374 .send(Command::Publish(OutboundMessage {
375 subject,
376 payload,
377 reply: None,
378 headers: Some(headers),
379 }))
380 .await?;
381 Ok(())
382 }
383
384 /// Publish a [Message] to a given subject, with specified response subject
385 /// to which the subscriber can respond.
386 /// This method does not await for the response.
387 ///
388 /// # Examples
389 ///
390 /// ```no_run
391 /// # #[tokio::main]
392 /// # async fn main() -> Result<(), async_nats::Error> {
393 /// let client = async_nats::connect("demo.nats.io").await?;
394 /// client
395 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
396 /// .await?;
397 /// # Ok(())
398 /// # }
399 /// ```
400 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
401 &self,
402 subject: S,
403 reply: R,
404 payload: Bytes,
405 ) -> Result<(), PublishError> {
406 let subject = subject.to_subject();
407 let reply = reply.to_subject();
408
409 self.sender
410 .send(Command::Publish(OutboundMessage {
411 subject,
412 payload,
413 reply: Some(reply),
414 headers: None,
415 }))
416 .await?;
417 Ok(())
418 }
419
420 /// Publish a [Message] to a given subject with headers and specified response subject
421 /// to which the subscriber can respond.
422 /// This method does not await for the response.
423 ///
424 /// # Examples
425 ///
426 /// ```no_run
427 /// # #[tokio::main]
428 /// # async fn main() -> Result<(), async_nats::Error> {
429 /// use std::str::FromStr;
430 /// let client = async_nats::connect("demo.nats.io").await?;
431 /// let mut headers = async_nats::HeaderMap::new();
432 /// client
433 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
434 /// .await?;
435 /// # Ok(())
436 /// # }
437 /// ```
438 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
439 &self,
440 subject: S,
441 reply: R,
442 headers: HeaderMap,
443 payload: Bytes,
444 ) -> Result<(), PublishError> {
445 let subject = subject.to_subject();
446 let reply = reply.to_subject();
447
448 self.sender
449 .send(Command::Publish(OutboundMessage {
450 subject,
451 payload,
452 reply: Some(reply),
453 headers: Some(headers),
454 }))
455 .await?;
456 Ok(())
457 }
458
459 /// Sends the request with headers.
460 ///
461 /// # Examples
462 /// ```no_run
463 /// # #[tokio::main]
464 /// # async fn main() -> Result<(), async_nats::Error> {
465 /// let client = async_nats::connect("demo.nats.io").await?;
466 /// let response = client.request("service", "data".into()).await?;
467 /// # Ok(())
468 /// # }
469 /// ```
470 pub async fn request<S: ToSubject>(
471 &self,
472 subject: S,
473 payload: Bytes,
474 ) -> Result<Message, RequestError> {
475 let subject = subject.to_subject();
476
477 trace!(
478 "request sent to subject: {} ({})",
479 subject.as_ref(),
480 payload.len()
481 );
482 let request = Request::new().payload(payload);
483 self.send_request(subject, request).await
484 }
485
486 /// Sends the request with headers.
487 ///
488 /// # Examples
489 /// ```no_run
490 /// # #[tokio::main]
491 /// # async fn main() -> Result<(), async_nats::Error> {
492 /// let client = async_nats::connect("demo.nats.io").await?;
493 /// let mut headers = async_nats::HeaderMap::new();
494 /// headers.insert("Key", "Value");
495 /// let response = client
496 /// .request_with_headers("service", headers, "data".into())
497 /// .await?;
498 /// # Ok(())
499 /// # }
500 /// ```
501 pub async fn request_with_headers<S: ToSubject>(
502 &self,
503 subject: S,
504 headers: HeaderMap,
505 payload: Bytes,
506 ) -> Result<Message, RequestError> {
507 let subject = subject.to_subject();
508
509 let request = Request::new().headers(headers).payload(payload);
510 self.send_request(subject, request).await
511 }
512
513 /// Sends the request created by the [Request].
514 ///
515 /// # Examples
516 ///
517 /// ```no_run
518 /// # #[tokio::main]
519 /// # async fn main() -> Result<(), async_nats::Error> {
520 /// let client = async_nats::connect("demo.nats.io").await?;
521 /// let request = async_nats::Request::new().payload("data".into());
522 /// let response = client.send_request("service", request).await?;
523 /// # Ok(())
524 /// # }
525 /// ```
526 pub async fn send_request<S: ToSubject>(
527 &self,
528 subject: S,
529 request: Request,
530 ) -> Result<Message, RequestError> {
531 let subject = subject.to_subject();
532
533 if let Some(inbox) = request.inbox {
534 let timeout = request.timeout.unwrap_or(self.request_timeout);
535 let mut subscriber = self.subscribe(inbox.clone()).await?;
536 let payload: Bytes = request.payload.unwrap_or_default();
537 match request.headers {
538 Some(headers) => {
539 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
540 .await?
541 }
542 None => self.publish_with_reply(subject, inbox, payload).await?,
543 }
544 let request = match timeout {
545 Some(timeout) => {
546 tokio::time::timeout(timeout, subscriber.next())
547 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
548 .await?
549 }
550 None => subscriber.next().await,
551 };
552 match request {
553 Some(message) => {
554 if message.status == Some(StatusCode::NO_RESPONDERS) {
555 return Err(RequestError::with_source(
556 RequestErrorKind::NoResponders,
557 "no responders",
558 ));
559 }
560 Ok(message)
561 }
562 None => Err(RequestError::with_source(
563 RequestErrorKind::Other,
564 "broken pipe",
565 )),
566 }
567 } else {
568 let (sender, receiver) = oneshot::channel();
569
570 let payload = request.payload.unwrap_or_default();
571 let respond = self.new_inbox().into();
572 let headers = request.headers;
573
574 self.sender
575 .send(Command::Request {
576 subject,
577 payload,
578 respond,
579 headers,
580 sender,
581 })
582 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
583 .await?;
584
585 let timeout = request.timeout.unwrap_or(self.request_timeout);
586 let request = match timeout {
587 Some(timeout) => {
588 tokio::time::timeout(timeout, receiver)
589 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
590 .await?
591 }
592 None => receiver.await,
593 };
594
595 match request {
596 Ok(message) => {
597 if message.status == Some(StatusCode::NO_RESPONDERS) {
598 return Err(RequestError::with_source(
599 RequestErrorKind::NoResponders,
600 "no responders",
601 ));
602 }
603 Ok(message)
604 }
605 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
606 }
607 }
608 }
609
610 /// Create a new globally unique inbox which can be used for replies.
611 ///
612 /// # Examples
613 ///
614 /// ```no_run
615 /// # #[tokio::main]
616 /// # async fn main() -> Result<(), async_nats::Error> {
617 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
618 /// let reply = nc.new_inbox();
619 /// let rsub = nc.subscribe(reply).await?;
620 /// # Ok(())
621 /// # }
622 /// ```
623 pub fn new_inbox(&self) -> String {
624 format!("{}.{}", self.inbox_prefix, crate::id_generator::next())
625 }
626
627 /// Subscribes to a subject to receive [messages][Message].
628 ///
629 /// # Examples
630 ///
631 /// ```no_run
632 /// # #[tokio::main]
633 /// # async fn main() -> Result<(), async_nats::Error> {
634 /// use futures_util::StreamExt;
635 /// let client = async_nats::connect("demo.nats.io").await?;
636 /// let mut subscription = client.subscribe("events.>").await?;
637 /// while let Some(message) = subscription.next().await {
638 /// println!("received message: {:?}", message);
639 /// }
640 /// # Ok(())
641 /// # }
642 /// ```
643 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
644 let subject = subject.to_subject();
645 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
646 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
647 let subscription_statistics = Arc::new(SubscriberStatistics::default());
648
649 self.sender
650 .send(Command::Subscribe {
651 sid,
652 subject,
653 queue_group: None,
654 sender,
655 statistics: subscription_statistics.clone(),
656 })
657 .await?;
658
659 Ok(Subscriber::new(
660 sid,
661 self.sender.clone(),
662 receiver,
663 subscription_statistics,
664 self.connection_stats.clone(),
665 ))
666 }
667
668 /// Subscribes to a subject with a queue group to receive [messages][Message].
669 ///
670 /// # Examples
671 ///
672 /// ```no_run
673 /// # #[tokio::main]
674 /// # async fn main() -> Result<(), async_nats::Error> {
675 /// use futures_util::StreamExt;
676 /// let client = async_nats::connect("demo.nats.io").await?;
677 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
678 /// while let Some(message) = subscription.next().await {
679 /// println!("received message: {:?}", message);
680 /// }
681 /// # Ok(())
682 /// # }
683 /// ```
684 pub async fn queue_subscribe<S: ToSubject>(
685 &self,
686 subject: S,
687 queue_group: String,
688 ) -> Result<Subscriber, SubscribeError> {
689 let subject = subject.to_subject();
690
691 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
692 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
693 let subscription_statistics = Arc::new(SubscriberStatistics::default());
694
695 self.sender
696 .send(Command::Subscribe {
697 sid,
698 subject,
699 queue_group: Some(queue_group),
700 sender,
701 statistics: subscription_statistics.clone(),
702 })
703 .await?;
704
705 Ok(Subscriber::new(
706 sid,
707 self.sender.clone(),
708 receiver,
709 subscription_statistics,
710 self.connection_stats.clone(),
711 ))
712 }
713
714 /// Flushes the internal buffer ensuring that all messages are sent.
715 ///
716 /// # Examples
717 ///
718 /// ```no_run
719 /// # #[tokio::main]
720 /// # async fn main() -> Result<(), async_nats::Error> {
721 /// let client = async_nats::connect("demo.nats.io").await?;
722 /// client.flush().await?;
723 /// # Ok(())
724 /// # }
725 /// ```
726 pub async fn flush(&self) -> Result<(), FlushError> {
727 let (tx, rx) = tokio::sync::oneshot::channel();
728 self.sender
729 .send(Command::Flush { observer: tx })
730 .await
731 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
732
733 rx.await
734 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
735 Ok(())
736 }
737
738 /// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
739 /// messages, then closes the connection. Once completed, any streams associated with the
740 /// connection and its [Clients](crate::Client) will be closed, and further [crate::Client] commands will fail.
741 ///
742 /// # Examples
743 ///
744 /// ```no_run
745 /// # #[tokio::main]
746 /// # async fn main() -> Result<(), async_nats::Error> {
747 /// use futures_util::StreamExt;
748 /// let client = async_nats::connect("demo.nats.io").await?;
749 /// let mut subscription = client.subscribe("events.>").await?;
750 ///
751 /// client.drain().await?;
752 ///
753 /// # // existing subscriptions are closed and further commands will fail
754 /// assert!(subscription.next().await.is_none());
755 /// client
756 /// .subscribe("events.>")
757 /// .await
758 /// .expect_err("Expected further commands to fail");
759 ///
760 /// # Ok(())
761 /// # }
762 /// ```
763 pub async fn drain(&self) -> Result<(), DrainError> {
764 // Drain all subscriptions
765 self.sender.send(Command::Drain { sid: None }).await?;
766
767 // Remaining process is handled on the handler-side
768 Ok(())
769 }
770
771 /// Returns the current state of the connection.
772 ///
773 /// # Examples
774 /// ```no_run
775 /// # #[tokio::main]
776 /// # async fn main() -> Result<(), async_nats::Error> {
777 /// let client = async_nats::connect("demo.nats.io").await?;
778 /// println!("connection state: {}", client.connection_state());
779 /// # Ok(())
780 /// # }
781 /// ```
782 pub fn connection_state(&self) -> State {
783 self.state.borrow().to_owned()
784 }
785
786 /// Forces the client to reconnect.
787 /// Keep in mind that client will reconnect automatically if the connection is lost and this
788 /// method does not have to be used in normal circumstances.
789 /// However, if you want to force the client to reconnect, for example to re-trigger
790 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
791 /// This method does not wait for connection to be re-established.
792 ///
793 /// # Examples
794 /// ```no_run
795 /// # #[tokio::main]
796 /// # async fn main() -> Result<(), async_nats::Error> {
797 /// let client = async_nats::connect("demo.nats.io").await?;
798 /// client.force_reconnect().await?;
799 /// # Ok(())
800 /// # }
801 /// ```
802 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
803 self.sender
804 .send(Command::Reconnect)
805 .await
806 .map_err(Into::into)
807 }
808
809 /// Returns struct representing statistics of the whole lifecycle of the client.
810 /// This includes number of bytes sent/received, number of messages sent/received,
811 /// and number of times the connection was established.
812 /// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
813 /// across threads.
814 ///
815 /// # Examples
816 /// ```no_run
817 /// # #[tokio::main]
818 /// # async fn main() -> Result<(), async_nats::Error> {
819 /// let client = async_nats::connect("demo.nats.io").await?;
820 /// let statistics = client.statistics();
821 /// println!("client statistics: {:#?}", statistics);
822 /// # Ok(())
823 /// # }
824 /// ```
825 pub fn statistics(&self) -> Arc<Statistics> {
826 self.connection_stats.clone()
827 }
828}
829
/// Used for building customized requests.
#[derive(Default)]
pub struct Request {
    /// Payload to send; an empty payload is sent when this is `None`.
    pub payload: Option<Bytes>,
    /// Optional headers to send along with the payload.
    pub headers: Option<HeaderMap>,
    /// Timeout override: `None` uses the client's default timeout, `Some(None)`
    /// disables the timeout, `Some(Some(duration))` uses `duration`.
    pub timeout: Option<Option<Duration>>,
    /// Custom inbox used as the reply subject; a generated inbox is used when `None`.
    pub inbox: Option<String>,
}
838
839impl Request {
840 pub fn new() -> Request {
841 Default::default()
842 }
843
844 /// Sets the payload of the request. If not used, empty payload will be sent.
845 ///
846 /// # Examples
847 /// ```no_run
848 /// # #[tokio::main]
849 /// # async fn main() -> Result<(), async_nats::Error> {
850 /// let client = async_nats::connect("demo.nats.io").await?;
851 /// let request = async_nats::Request::new().payload("data".into());
852 /// client.send_request("service", request).await?;
853 /// # Ok(())
854 /// # }
855 /// ```
856 pub fn payload(mut self, payload: Bytes) -> Request {
857 self.payload = Some(payload);
858 self
859 }
860
861 /// Sets the headers of the requests.
862 ///
863 /// # Examples
864 /// ```no_run
865 /// # #[tokio::main]
866 /// # async fn main() -> Result<(), async_nats::Error> {
867 /// use std::str::FromStr;
868 /// let client = async_nats::connect("demo.nats.io").await?;
869 /// let mut headers = async_nats::HeaderMap::new();
870 /// headers.insert(
871 /// "X-Example",
872 /// async_nats::HeaderValue::from_str("Value").unwrap(),
873 /// );
874 /// let request = async_nats::Request::new()
875 /// .headers(headers)
876 /// .payload("data".into());
877 /// client.send_request("service", request).await?;
878 /// # Ok(())
879 /// # }
880 /// ```
881 pub fn headers(mut self, headers: HeaderMap) -> Request {
882 self.headers = Some(headers);
883 self
884 }
885
886 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
887 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
888 /// To use default timeout, simply do not call this function.
889 ///
890 /// # Examples
891 /// ```no_run
892 /// # #[tokio::main]
893 /// # async fn main() -> Result<(), async_nats::Error> {
894 /// let client = async_nats::connect("demo.nats.io").await?;
895 /// let request = async_nats::Request::new()
896 /// .timeout(Some(std::time::Duration::from_secs(15)))
897 /// .payload("data".into());
898 /// client.send_request("service", request).await?;
899 /// # Ok(())
900 /// # }
901 /// ```
902 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
903 self.timeout = Some(timeout);
904 self
905 }
906
907 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
908 ///
909 /// # Examples
910 /// ```no_run
911 /// # #[tokio::main]
912 /// # async fn main() -> Result<(), async_nats::Error> {
913 /// use std::str::FromStr;
914 /// let client = async_nats::connect("demo.nats.io").await?;
915 /// let request = async_nats::Request::new()
916 /// .inbox("custom_inbox".into())
917 /// .payload("data".into());
918 /// client.send_request("service", request).await?;
919 /// # Ok(())
920 /// # }
921 /// ```
922 pub fn inbox(mut self, inbox: String) -> Request {
923 self.inbox = Some(inbox);
924 self
925 }
926}
927
928#[derive(Error, Debug)]
929#[error("failed to send reconnect: {0}")]
930pub struct ReconnectError(#[source] crate::Error);
931
932impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
933 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
934 ReconnectError(Box::new(err))
935 }
936}
937
938#[derive(Error, Debug)]
939#[error("failed to send subscribe: {0}")]
940pub struct SubscribeError(#[source] crate::Error);
941
942impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
943 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
944 SubscribeError(Box::new(err))
945 }
946}
947
948#[derive(Error, Debug)]
949#[error("failed to send drain: {0}")]
950pub struct DrainError(#[source] crate::Error);
951
952impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
953 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
954 DrainError(Box::new(err))
955 }
956}
957
/// Categories of failure that a core NATS request can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
    /// There are services listening on requested subject, but they didn't respond
    /// in time.
    TimedOut,
    /// No one is listening on request subject.
    NoResponders,
    /// Other errors, client/io related.
    Other,
}

impl Display for RequestErrorKind {
    /// Writes the fixed human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TimedOut => "request timed out",
            Self::NoResponders => "no responders",
            Self::Other => "request failed",
        };
        f.write_str(description)
    }
}
978
/// Error returned when a core NATS request fails.
/// To find out which [RequestErrorKind] variant occurred, call [RequestError::kind].
pub type RequestError = Error<RequestErrorKind>;
982
983impl From<PublishError> for RequestError {
984 fn from(e: PublishError) -> Self {
985 RequestError::with_source(RequestErrorKind::Other, e)
986 }
987}
988
989impl From<SubscribeError> for RequestError {
990 fn from(e: SubscribeError) -> Self {
991 RequestError::with_source(RequestErrorKind::Other, e)
992 }
993}
994
/// Categories of failure that a flush can report.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushErrorKind {
    /// Sending the flush failed client side.
    SendError,
    /// Flush failed.
    /// This can happen mostly in case of connection issues
    /// that cannot be resolved quickly.
    FlushError,
}

impl Display for FlushErrorKind {
    /// Writes the fixed human-readable description of the error kind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::SendError => "failed to send flush request",
            Self::FlushError => "flush failed",
        };
        f.write_str(description)
    }
}
1013
/// Error returned from [`Client::flush`]; its category is described by [`FlushErrorKind`].
pub type FlushError = Error<FlushErrorKind>;
1015
/// Represents statistics for the instance of the client throughout its lifecycle.
#[derive(Default, Debug)]
pub struct Statistics {
    /// Number of bytes received. This does not include the protocol overhead.
    pub in_bytes: AtomicU64,
    /// Number of bytes sent. This does not include the protocol overhead.
    pub out_bytes: AtomicU64,
    /// Number of messages received.
    pub in_messages: AtomicU64,
    /// Number of messages sent.
    pub out_messages: AtomicU64,
    /// Number of times connection was established.
    /// Initial connect will be counted as well, then all successful reconnects.
    pub connects: AtomicU64,
    /// Number of messages currently buffered in subscription channels.
    pub subscription_pending_messages: AtomicU64,
    /// Number of bytes currently buffered in subscription channels.
    pub subscription_pending_bytes: AtomicU64,
    /// Number of messages dropped because subscription channels were full.
    pub subscription_dropped_messages: AtomicU64,
    /// Number of bytes dropped because subscription channels were full.
    pub subscription_dropped_bytes: AtomicU64,
    /// Number of currently active subscription handles.
    pub active_subscriptions: AtomicU64,
    /// Total configured capacity of currently active subscription channels.
    pub active_subscription_capacity: AtomicU64,
}