async_nats_wrpc/client.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16
17use crate::connection::State;
18use crate::subject::{Subject, ToSubject};
19use crate::ServerInfo;
20
21use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
22use crate::error::Error;
23use bytes::Bytes;
24use futures::future::TryFutureExt;
25use futures::{Sink, SinkExt as _, StreamExt};
26use once_cell::sync::Lazy;
27use portable_atomic::AtomicU64;
28use regex::Regex;
29use std::fmt::Display;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::time::Duration;
33use thiserror::Error;
34use tokio::sync::{mpsc, oneshot};
35use tokio_util::sync::PollSender;
36use tracing::trace;
37
38static VERSION_RE: Lazy<Regex> =
39 Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
40
41/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
42/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
43/// This error is also used as [`Publisher::Error`]
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48 PublishError::with_source(PublishErrorKind::Send, err)
49 }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53 fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54 PublishError::with_source(PublishErrorKind::Send, err)
55 }
56}
57
58#[derive(Copy, Clone, Debug, PartialEq)]
59pub enum PublishErrorKind {
60 MaxPayloadExceeded,
61 Send,
62}
63
64impl Display for PublishErrorKind {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 match self {
67 PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
68 PublishErrorKind::Send => write!(f, "failed to send message"),
69 }
70 }
71}
72
73/// [`Sink<Bytes>`] created by [`Client::publish_sink`], [`Client::publish_with_reply_sink`],
74/// [`Client::publish_with_headers_sink`] or [`Client::publish_with_reply_and_headers_sink`]
75#[derive(Clone, Debug)]
76pub struct Publisher {
77 subject: Subject,
78 respond: Option<Subject>,
79 headers: Option<HeaderMap>,
80 sender: PollSender<Command>,
81}
82
83impl Sink<Bytes> for Publisher {
84 type Error = PublishError;
85
86 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 self.sender.poll_ready_unpin(cx).map_err(Into::into)
88 }
89
90 fn start_send(mut self: Pin<&mut Self>, payload: Bytes) -> Result<(), Self::Error> {
91 let headers = self.headers.clone();
92 let respond = self.respond.clone();
93 let subject = self.subject.clone();
94 self.sender
95 .start_send_unpin(Command::Publish {
96 subject,
97 payload,
98 respond,
99 headers,
100 })
101 .map_err(Into::into)
102 }
103
104 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
105 self.sender.poll_flush_unpin(cx).map_err(Into::into)
106 }
107
108 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109 self.sender.poll_close_unpin(cx).map_err(Into::into)
110 }
111}
112
113/// Client is a `Cloneable` handle to NATS connection.
114/// Client should not be created directly. Instead, one of two methods can be used:
115/// [crate::connect] and [crate::ConnectOptions::connect]
116#[derive(Clone, Debug)]
117pub struct Client {
118 info: tokio::sync::watch::Receiver<ServerInfo>,
119 pub(crate) state: tokio::sync::watch::Receiver<State>,
120 pub(crate) sender: mpsc::Sender<Command>,
121 next_subscription_id: Arc<AtomicU64>,
122 subscription_capacity: usize,
123 inbox_prefix: Arc<str>,
124 request_timeout: Option<Duration>,
125 max_payload: Arc<AtomicUsize>,
126}
127
128impl Client {
129 pub(crate) fn new(
130 info: tokio::sync::watch::Receiver<ServerInfo>,
131 state: tokio::sync::watch::Receiver<State>,
132 sender: mpsc::Sender<Command>,
133 capacity: usize,
134 inbox_prefix: String,
135 request_timeout: Option<Duration>,
136 max_payload: Arc<AtomicUsize>,
137 ) -> Client {
138 Client {
139 info,
140 state,
141 sender,
142 next_subscription_id: Arc::new(AtomicU64::new(1)),
143 subscription_capacity: capacity,
144 inbox_prefix: inbox_prefix.into(),
145 request_timeout,
146 max_payload,
147 }
148 }
149
150 /// Returns last received info from the server.
151 ///
152 /// # Examples
153 ///
154 /// ```no_run
155 /// # #[tokio::main]
156 /// # async fn main () -> Result<(), async_nats::Error> {
157 /// let client = async_nats::connect("demo.nats.io").await?;
158 /// println!("info: {:?}", client.server_info());
159 /// # Ok(())
160 /// # }
161 /// ```
162 pub fn server_info(&self) -> ServerInfo {
163 // We ignore notifying the watcher, as that requires mutable client reference.
164 self.info.borrow().to_owned()
165 }
166
167 /// Returns true if the server version is compatible with the version components.
168 ///
169 /// # Examples
170 ///
171 /// ```no_run
172 /// # #[tokio::main]
173 /// # async fn main() -> Result<(), async_nats::Error> {
174 /// let client = async_nats::connect("demo.nats.io").await?;
175 /// assert!(client.is_server_compatible(2, 8, 4));
176 /// # Ok(())
177 /// # }
178 /// ```
179 pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
180 let info = self.server_info();
181
182 let server_version_captures = VERSION_RE.captures(&info.version).unwrap();
183
184 let server_major = server_version_captures
185 .get(1)
186 .map(|m| m.as_str().parse::<i64>().unwrap())
187 .unwrap();
188
189 let server_minor = server_version_captures
190 .get(2)
191 .map(|m| m.as_str().parse::<i64>().unwrap())
192 .unwrap();
193
194 let server_patch = server_version_captures
195 .get(3)
196 .map(|m| m.as_str().parse::<i64>().unwrap())
197 .unwrap();
198
199 if server_major < major
200 || (server_major == major && server_minor < minor)
201 || (server_major == major && server_minor == minor && server_patch < patch)
202 {
203 return false;
204 }
205 true
206 }
207
208 /// Publish a [Message] to a given subject.
209 ///
210 /// # Examples
211 /// ```no_run
212 /// # #[tokio::main]
213 /// # async fn main() -> Result<(), async_nats::Error> {
214 /// let client = async_nats::connect("demo.nats.io").await?;
215 /// client.publish("events.data", "payload".into()).await?;
216 /// # Ok(())
217 /// # }
218 /// ```
219 pub async fn publish<S: ToSubject>(
220 &self,
221 subject: S,
222 payload: Bytes,
223 ) -> Result<(), PublishError> {
224 let subject = subject.to_subject();
225 let max_payload = self.max_payload.load(Ordering::Relaxed);
226 if payload.len() > max_payload {
227 return Err(PublishError::with_source(
228 PublishErrorKind::MaxPayloadExceeded,
229 format!(
230 "Payload size limit of {} exceeded by message size of {}",
231 payload.len(),
232 max_payload
233 ),
234 ));
235 }
236
237 self.sender
238 .send(Command::Publish {
239 subject,
240 payload,
241 respond: None,
242 headers: None,
243 })
244 .await?;
245 Ok(())
246 }
247
248 /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject.
249 pub fn publish_sink<S: ToSubject>(&self, subject: S) -> Publisher {
250 Publisher {
251 subject: subject.to_subject(),
252 respond: None,
253 headers: None,
254 sender: PollSender::new(self.sender.clone()),
255 }
256 }
257
258 /// Publish a [Message] with headers to a given subject.
259 ///
260 /// # Examples
261 /// ```
262 /// # #[tokio::main]
263 /// # async fn main() -> Result<(), async_nats::Error> {
264 /// use std::str::FromStr;
265 /// let client = async_nats::connect("demo.nats.io").await?;
266 /// let mut headers = async_nats::HeaderMap::new();
267 /// headers.insert(
268 /// "X-Header",
269 /// async_nats::HeaderValue::from_str("Value").unwrap(),
270 /// );
271 /// client
272 /// .publish_with_headers("events.data", headers, "payload".into())
273 /// .await?;
274 /// # Ok(())
275 /// # }
276 /// ```
277 pub async fn publish_with_headers<S: ToSubject>(
278 &self,
279 subject: S,
280 headers: HeaderMap,
281 payload: Bytes,
282 ) -> Result<(), PublishError> {
283 let subject = subject.to_subject();
284
285 self.sender
286 .send(Command::Publish {
287 subject,
288 payload,
289 respond: None,
290 headers: Some(headers),
291 })
292 .await?;
293 Ok(())
294 }
295
296 /// Returns a [Publisher], which can be used to send multiple [Messages](Message) with headers to a given subject.
297 pub fn publish_with_headers_sink<S: ToSubject>(
298 &self,
299 subject: S,
300 headers: HeaderMap,
301 ) -> Publisher {
302 Publisher {
303 subject: subject.to_subject(),
304 respond: None,
305 headers: Some(headers),
306 sender: PollSender::new(self.sender.clone()),
307 }
308 }
309
310 /// Publish a [Message] to a given subject, with specified response subject
311 /// to which the subscriber can respond.
312 /// This method does not await for the response.
313 ///
314 /// # Examples
315 ///
316 /// ```no_run
317 /// # #[tokio::main]
318 /// # async fn main() -> Result<(), async_nats::Error> {
319 /// let client = async_nats::connect("demo.nats.io").await?;
320 /// client
321 /// .publish_with_reply("events.data", "reply_subject", "payload".into())
322 /// .await?;
323 /// # Ok(())
324 /// # }
325 /// ```
326 pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
327 &self,
328 subject: S,
329 reply: R,
330 payload: Bytes,
331 ) -> Result<(), PublishError> {
332 let subject = subject.to_subject();
333 let reply = reply.to_subject();
334
335 self.sender
336 .send(Command::Publish {
337 subject,
338 payload,
339 respond: Some(reply),
340 headers: None,
341 })
342 .await?;
343 Ok(())
344 }
345
346 /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject,
347 /// with specified response subject to which the subscriber can respond.
348 /// [Publisher] does not await for the response.
349 pub fn publish_with_reply_sink<S: ToSubject, R: ToSubject>(
350 &self,
351 subject: S,
352 reply: R,
353 ) -> Publisher {
354 Publisher {
355 subject: subject.to_subject(),
356 respond: Some(reply.to_subject()),
357 headers: None,
358 sender: PollSender::new(self.sender.clone()),
359 }
360 }
361
362 /// Publish a [Message] to a given subject with headers and specified response subject
363 /// to which the subscriber can respond.
364 /// This method does not await for the response.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// # #[tokio::main]
370 /// # async fn main() -> Result<(), async_nats::Error> {
371 /// use std::str::FromStr;
372 /// let client = async_nats::connect("demo.nats.io").await?;
373 /// let mut headers = async_nats::HeaderMap::new();
374 /// client
375 /// .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
376 /// .await?;
377 /// # Ok(())
378 /// # }
379 /// ```
380 pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
381 &self,
382 subject: S,
383 reply: R,
384 headers: HeaderMap,
385 payload: Bytes,
386 ) -> Result<(), PublishError> {
387 let subject = subject.to_subject();
388 let reply = reply.to_subject();
389
390 self.sender
391 .send(Command::Publish {
392 subject,
393 payload,
394 respond: Some(reply),
395 headers: Some(headers),
396 })
397 .await?;
398 Ok(())
399 }
400
401 /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject,
402 /// with headers and specified response subject to which the subscriber can respond.
403 /// [Publisher] does not await for the response.
404 pub fn publish_with_reply_and_headers_sink<S: ToSubject, R: ToSubject>(
405 &self,
406 subject: S,
407 reply: R,
408 headers: HeaderMap,
409 ) -> Publisher {
410 Publisher {
411 subject: subject.to_subject(),
412 respond: Some(reply.to_subject()),
413 headers: Some(headers),
414 sender: PollSender::new(self.sender.clone()),
415 }
416 }
417
418 /// Sends the request with headers.
419 ///
420 /// # Examples
421 /// ```no_run
422 /// # #[tokio::main]
423 /// # async fn main() -> Result<(), async_nats::Error> {
424 /// let client = async_nats::connect("demo.nats.io").await?;
425 /// let response = client.request("service", "data".into()).await?;
426 /// # Ok(())
427 /// # }
428 /// ```
429 pub async fn request<S: ToSubject>(
430 &self,
431 subject: S,
432 payload: Bytes,
433 ) -> Result<Message, RequestError> {
434 let subject = subject.to_subject();
435
436 trace!(
437 "request sent to subject: {} ({})",
438 subject.as_ref(),
439 payload.len()
440 );
441 let request = Request::new().payload(payload);
442 self.send_request(subject, request).await
443 }
444
445 /// Sends the request with headers.
446 ///
447 /// # Examples
448 /// ```no_run
449 /// # #[tokio::main]
450 /// # async fn main() -> Result<(), async_nats::Error> {
451 /// let client = async_nats::connect("demo.nats.io").await?;
452 /// let mut headers = async_nats::HeaderMap::new();
453 /// headers.insert("Key", "Value");
454 /// let response = client
455 /// .request_with_headers("service", headers, "data".into())
456 /// .await?;
457 /// # Ok(())
458 /// # }
459 /// ```
460 pub async fn request_with_headers<S: ToSubject>(
461 &self,
462 subject: S,
463 headers: HeaderMap,
464 payload: Bytes,
465 ) -> Result<Message, RequestError> {
466 let subject = subject.to_subject();
467
468 let request = Request::new().headers(headers).payload(payload);
469 self.send_request(subject, request).await
470 }
471
472 /// Sends the request created by the [Request].
473 ///
474 /// # Examples
475 ///
476 /// ```no_run
477 /// # #[tokio::main]
478 /// # async fn main() -> Result<(), async_nats::Error> {
479 /// let client = async_nats::connect("demo.nats.io").await?;
480 /// let request = async_nats::Request::new().payload("data".into());
481 /// let response = client.send_request("service", request).await?;
482 /// # Ok(())
483 /// # }
484 /// ```
485 pub async fn send_request<S: ToSubject>(
486 &self,
487 subject: S,
488 request: Request,
489 ) -> Result<Message, RequestError> {
490 let subject = subject.to_subject();
491
492 if let Some(inbox) = request.inbox {
493 let timeout = request.timeout.unwrap_or(self.request_timeout);
494 let mut subscriber = self.subscribe(inbox.clone()).await?;
495 let payload: Bytes = request.payload.unwrap_or_default();
496 match request.headers {
497 Some(headers) => {
498 self.publish_with_reply_and_headers(subject, inbox, headers, payload)
499 .await?
500 }
501 None => self.publish_with_reply(subject, inbox, payload).await?,
502 }
503 let request = match timeout {
504 Some(timeout) => {
505 tokio::time::timeout(timeout, subscriber.next())
506 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
507 .await?
508 }
509 None => subscriber.next().await,
510 };
511 match request {
512 Some(message) => {
513 if message.status == Some(StatusCode::NO_RESPONDERS) {
514 return Err(RequestError::with_source(
515 RequestErrorKind::NoResponders,
516 "no responders",
517 ));
518 }
519 Ok(message)
520 }
521 None => Err(RequestError::with_source(
522 RequestErrorKind::Other,
523 "broken pipe",
524 )),
525 }
526 } else {
527 let (sender, receiver) = oneshot::channel();
528
529 let payload = request.payload.unwrap_or_default();
530 let respond = self.new_inbox().into();
531 let headers = request.headers;
532
533 self.sender
534 .send(Command::Request {
535 subject,
536 payload,
537 respond,
538 headers,
539 sender,
540 })
541 .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
542 .await?;
543
544 let timeout = request.timeout.unwrap_or(self.request_timeout);
545 let request = match timeout {
546 Some(timeout) => {
547 tokio::time::timeout(timeout, receiver)
548 .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
549 .await?
550 }
551 None => receiver.await,
552 };
553
554 match request {
555 Ok(message) => {
556 if message.status == Some(StatusCode::NO_RESPONDERS) {
557 return Err(RequestError::with_source(
558 RequestErrorKind::NoResponders,
559 "no responders",
560 ));
561 }
562 Ok(message)
563 }
564 Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
565 }
566 }
567 }
568
569 /// Create a new globally unique inbox which can be used for replies.
570 ///
571 /// # Examples
572 ///
573 /// ```no_run
574 /// # #[tokio::main]
575 /// # async fn main() -> Result<(), async_nats::Error> {
576 /// # let mut nc = async_nats::connect("demo.nats.io").await?;
577 /// let reply = nc.new_inbox();
578 /// let rsub = nc.subscribe(reply).await?;
579 /// # Ok(())
580 /// # }
581 /// ```
582 pub fn new_inbox(&self) -> String {
583 format!("{}.{}", self.inbox_prefix, nuid::next())
584 }
585
586 /// Subscribes to a subject to receive [messages][Message].
587 ///
588 /// # Examples
589 ///
590 /// ```no_run
591 /// # #[tokio::main]
592 /// # async fn main() -> Result<(), async_nats::Error> {
593 /// use futures::StreamExt;
594 /// let client = async_nats::connect("demo.nats.io").await?;
595 /// let mut subscription = client.subscribe("events.>").await?;
596 /// while let Some(message) = subscription.next().await {
597 /// println!("received message: {:?}", message);
598 /// }
599 /// # Ok(())
600 /// # }
601 /// ```
602 pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
603 let subject = subject.to_subject();
604 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
605 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
606
607 self.sender
608 .send(Command::Subscribe {
609 sid,
610 subject,
611 queue_group: None,
612 sender,
613 })
614 .await?;
615
616 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
617 }
618
619 /// Subscribes to a subject with a queue group to receive [messages][Message].
620 ///
621 /// # Examples
622 ///
623 /// ```no_run
624 /// # #[tokio::main]
625 /// # async fn main() -> Result<(), async_nats::Error> {
626 /// use futures::StreamExt;
627 /// let client = async_nats::connect("demo.nats.io").await?;
628 /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
629 /// while let Some(message) = subscription.next().await {
630 /// println!("received message: {:?}", message);
631 /// }
632 /// # Ok(())
633 /// # }
634 /// ```
635 pub async fn queue_subscribe<S: ToSubject>(
636 &self,
637 subject: S,
638 queue_group: String,
639 ) -> Result<Subscriber, SubscribeError> {
640 let subject = subject.to_subject();
641
642 let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
643 let (sender, receiver) = mpsc::channel(self.subscription_capacity);
644
645 self.sender
646 .send(Command::Subscribe {
647 sid,
648 subject,
649 queue_group: Some(queue_group),
650 sender,
651 })
652 .await?;
653
654 Ok(Subscriber::new(sid, self.sender.clone(), receiver))
655 }
656
657 /// Flushes the internal buffer ensuring that all messages are sent.
658 ///
659 /// # Examples
660 ///
661 /// ```no_run
662 /// # #[tokio::main]
663 /// # async fn main() -> Result<(), async_nats::Error> {
664 /// let client = async_nats::connect("demo.nats.io").await?;
665 /// client.flush().await?;
666 /// # Ok(())
667 /// # }
668 /// ```
669 pub async fn flush(&self) -> Result<(), FlushError> {
670 let (tx, rx) = tokio::sync::oneshot::channel();
671 self.sender
672 .send(Command::Flush { observer: tx })
673 .await
674 .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
675
676 rx.await
677 .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
678 Ok(())
679 }
680
681 /// Returns the current state of the connection.
682 ///
683 /// # Examples
684 /// ```no_run
685 /// # #[tokio::main]
686 /// # async fn main() -> Result<(), async_nats::Error> {
687 /// let client = async_nats::connect("demo.nats.io").await?;
688 /// println!("connection state: {}", client.connection_state());
689 /// # Ok(())
690 /// # }
691 /// ```
692 pub fn connection_state(&self) -> State {
693 self.state.borrow().to_owned()
694 }
695
696 /// Forces the client to reconnect.
697 /// Keep in mind that client will reconnect automatically if the connection is lost and this
698 /// method does not have to be used in normal circumstances.
699 /// However, if you want to force the client to reconnect, for example to re-trigger
700 /// the `auth-callback`, or manually rebalance connections, this method can be useful.
701 /// This method does not wait for connection to be re-established.
702 ///
703 /// # Examples
704 /// ```no_run
705 /// # #[tokio::main]
706 /// # async fn main() -> Result<(), async_nats::Error> {
707 /// let client = async_nats::connect("demo.nats.io").await?;
708 /// client.force_reconnect().await?;
709 /// # Ok(())
710 /// # }
711 /// ```
712 pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
713 self.sender
714 .send(Command::Reconnect)
715 .await
716 .map_err(Into::into)
717 }
718}
719
720/// Used for building customized requests.
721#[derive(Default)]
722pub struct Request {
723 payload: Option<Bytes>,
724 headers: Option<HeaderMap>,
725 timeout: Option<Option<Duration>>,
726 inbox: Option<String>,
727}
728
729impl Request {
730 pub fn new() -> Request {
731 Default::default()
732 }
733
734 /// Sets the payload of the request. If not used, empty payload will be sent.
735 ///
736 /// # Examples
737 /// ```no_run
738 /// # #[tokio::main]
739 /// # async fn main() -> Result<(), async_nats::Error> {
740 /// let client = async_nats::connect("demo.nats.io").await?;
741 /// let request = async_nats::Request::new().payload("data".into());
742 /// client.send_request("service", request).await?;
743 /// # Ok(())
744 /// # }
745 /// ```
746 pub fn payload(mut self, payload: Bytes) -> Request {
747 self.payload = Some(payload);
748 self
749 }
750
751 /// Sets the headers of the requests.
752 ///
753 /// # Examples
754 /// ```no_run
755 /// # #[tokio::main]
756 /// # async fn main() -> Result<(), async_nats::Error> {
757 /// use std::str::FromStr;
758 /// let client = async_nats::connect("demo.nats.io").await?;
759 /// let mut headers = async_nats::HeaderMap::new();
760 /// headers.insert(
761 /// "X-Example",
762 /// async_nats::HeaderValue::from_str("Value").unwrap(),
763 /// );
764 /// let request = async_nats::Request::new()
765 /// .headers(headers)
766 /// .payload("data".into());
767 /// client.send_request("service", request).await?;
768 /// # Ok(())
769 /// # }
770 /// ```
771 pub fn headers(mut self, headers: HeaderMap) -> Request {
772 self.headers = Some(headers);
773 self
774 }
775
776 /// Sets the custom timeout of the request. Overrides default [Client] timeout.
777 /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
778 /// To use default timeout, simply do not call this function.
779 ///
780 /// # Examples
781 /// ```no_run
782 /// # #[tokio::main]
783 /// # async fn main() -> Result<(), async_nats::Error> {
784 /// let client = async_nats::connect("demo.nats.io").await?;
785 /// let request = async_nats::Request::new()
786 /// .timeout(Some(std::time::Duration::from_secs(15)))
787 /// .payload("data".into());
788 /// client.send_request("service", request).await?;
789 /// # Ok(())
790 /// # }
791 /// ```
792 pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
793 self.timeout = Some(timeout);
794 self
795 }
796
797 /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
798 ///
799 /// # Examples
800 /// ```no_run
801 /// # #[tokio::main]
802 /// # async fn main() -> Result<(), async_nats::Error> {
803 /// use std::str::FromStr;
804 /// let client = async_nats::connect("demo.nats.io").await?;
805 /// let request = async_nats::Request::new()
806 /// .inbox("custom_inbox".into())
807 /// .payload("data".into());
808 /// client.send_request("service", request).await?;
809 /// # Ok(())
810 /// # }
811 /// ```
812 pub fn inbox(mut self, inbox: String) -> Request {
813 self.inbox = Some(inbox);
814 self
815 }
816}
817
818#[derive(Error, Debug)]
819#[error("failed to send reconnect: {0}")]
820pub struct ReconnectError(#[source] crate::Error);
821
822impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
823 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
824 ReconnectError(Box::new(err))
825 }
826}
827
828#[derive(Error, Debug)]
829#[error("failed to send subscribe: {0}")]
830pub struct SubscribeError(#[source] crate::Error);
831
832impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
833 fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
834 SubscribeError(Box::new(err))
835 }
836}
837
838#[derive(Clone, Copy, Debug, PartialEq)]
839pub enum RequestErrorKind {
840 /// There are services listening on requested subject, but they didn't respond
841 /// in time.
842 TimedOut,
843 /// No one is listening on request subject.
844 NoResponders,
845 /// Other errors, client/io related.
846 Other,
847}
848
849impl Display for RequestErrorKind {
850 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
851 match self {
852 Self::TimedOut => write!(f, "request timed out"),
853 Self::NoResponders => write!(f, "no responders"),
854 Self::Other => write!(f, "request failed"),
855 }
856 }
857}
858
859/// Error returned when a core NATS request fails.
860/// To be enumerate over the variants, call [RequestError::kind].
861pub type RequestError = Error<RequestErrorKind>;
862
863impl From<PublishError> for RequestError {
864 fn from(e: PublishError) -> Self {
865 RequestError::with_source(RequestErrorKind::Other, e)
866 }
867}
868
869impl From<SubscribeError> for RequestError {
870 fn from(e: SubscribeError) -> Self {
871 RequestError::with_source(RequestErrorKind::Other, e)
872 }
873}
874
875#[derive(Clone, Copy, Debug, PartialEq)]
876pub enum FlushErrorKind {
877 /// Sending the flush failed client side.
878 SendError,
879 /// Flush failed.
880 /// This can happen mostly in case of connection issues
881 /// that cannot be resolved quickly.
882 FlushError,
883}
884
885impl Display for FlushErrorKind {
886 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887 match self {
888 Self::SendError => write!(f, "failed to send flush request"),
889 Self::FlushError => write!(f, "flush failed"),
890 }
891 }
892}
893
894pub type FlushError = Error<FlushErrorKind>;