async_nats_wrpc/
client.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14use core::pin::Pin;
15use core::task::{Context, Poll};
16
17use crate::connection::State;
18use crate::subject::{Subject, ToSubject};
19use crate::ServerInfo;
20
21use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
22use crate::error::Error;
23use bytes::Bytes;
24use futures::future::TryFutureExt;
25use futures::{Sink, SinkExt as _, StreamExt};
26use once_cell::sync::Lazy;
27use portable_atomic::AtomicU64;
28use regex::Regex;
29use std::fmt::Display;
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::time::Duration;
33use thiserror::Error;
34use tokio::sync::{mpsc, oneshot};
35use tokio_util::sync::PollSender;
36use tracing::trace;
37
38static VERSION_RE: Lazy<Regex> =
39    Lazy::new(|| Regex::new(r"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?").unwrap());
40
41/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
42/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
43/// This error is also used as [`Publisher::Error`]
44pub type PublishError = Error<PublishErrorKind>;
45
46impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
47    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
48        PublishError::with_source(PublishErrorKind::Send, err)
49    }
50}
51
52impl From<tokio_util::sync::PollSendError<Command>> for PublishError {
53    fn from(err: tokio_util::sync::PollSendError<Command>) -> Self {
54        PublishError::with_source(PublishErrorKind::Send, err)
55    }
56}
57
58#[derive(Copy, Clone, Debug, PartialEq)]
59pub enum PublishErrorKind {
60    MaxPayloadExceeded,
61    Send,
62}
63
64impl Display for PublishErrorKind {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        match self {
67            PublishErrorKind::MaxPayloadExceeded => write!(f, "max payload size exceeded"),
68            PublishErrorKind::Send => write!(f, "failed to send message"),
69        }
70    }
71}
72
73/// [`Sink<Bytes>`] created by [`Client::publish_sink`], [`Client::publish_with_reply_sink`],
74/// [`Client::publish_with_headers_sink`] or [`Client::publish_with_reply_and_headers_sink`]
75#[derive(Clone, Debug)]
76pub struct Publisher {
77    subject: Subject,
78    respond: Option<Subject>,
79    headers: Option<HeaderMap>,
80    sender: PollSender<Command>,
81}
82
83impl Sink<Bytes> for Publisher {
84    type Error = PublishError;
85
86    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        self.sender.poll_ready_unpin(cx).map_err(Into::into)
88    }
89
90    fn start_send(mut self: Pin<&mut Self>, payload: Bytes) -> Result<(), Self::Error> {
91        let headers = self.headers.clone();
92        let respond = self.respond.clone();
93        let subject = self.subject.clone();
94        self.sender
95            .start_send_unpin(Command::Publish {
96                subject,
97                payload,
98                respond,
99                headers,
100            })
101            .map_err(Into::into)
102    }
103
104    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
105        self.sender.poll_flush_unpin(cx).map_err(Into::into)
106    }
107
108    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109        self.sender.poll_close_unpin(cx).map_err(Into::into)
110    }
111}
112
113/// Client is a `Cloneable` handle to NATS connection.
114/// Client should not be created directly. Instead, one of two methods can be used:
115/// [crate::connect] and [crate::ConnectOptions::connect]
116#[derive(Clone, Debug)]
117pub struct Client {
118    info: tokio::sync::watch::Receiver<ServerInfo>,
119    pub(crate) state: tokio::sync::watch::Receiver<State>,
120    pub(crate) sender: mpsc::Sender<Command>,
121    next_subscription_id: Arc<AtomicU64>,
122    subscription_capacity: usize,
123    inbox_prefix: Arc<str>,
124    request_timeout: Option<Duration>,
125    max_payload: Arc<AtomicUsize>,
126}
127
128impl Client {
129    pub(crate) fn new(
130        info: tokio::sync::watch::Receiver<ServerInfo>,
131        state: tokio::sync::watch::Receiver<State>,
132        sender: mpsc::Sender<Command>,
133        capacity: usize,
134        inbox_prefix: String,
135        request_timeout: Option<Duration>,
136        max_payload: Arc<AtomicUsize>,
137    ) -> Client {
138        Client {
139            info,
140            state,
141            sender,
142            next_subscription_id: Arc::new(AtomicU64::new(1)),
143            subscription_capacity: capacity,
144            inbox_prefix: inbox_prefix.into(),
145            request_timeout,
146            max_payload,
147        }
148    }
149
150    /// Returns last received info from the server.
151    ///
152    /// # Examples
153    ///
154    /// ```no_run
155    /// # #[tokio::main]
156    /// # async fn main () -> Result<(), async_nats::Error> {
157    /// let client = async_nats::connect("demo.nats.io").await?;
158    /// println!("info: {:?}", client.server_info());
159    /// # Ok(())
160    /// # }
161    /// ```
162    pub fn server_info(&self) -> ServerInfo {
163        // We ignore notifying the watcher, as that requires mutable client reference.
164        self.info.borrow().to_owned()
165    }
166
167    /// Returns true if the server version is compatible with the version components.
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// # #[tokio::main]
173    /// # async fn main() -> Result<(), async_nats::Error> {
174    /// let client = async_nats::connect("demo.nats.io").await?;
175    /// assert!(client.is_server_compatible(2, 8, 4));
176    /// # Ok(())
177    /// # }
178    /// ```
179    pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
180        let info = self.server_info();
181
182        let server_version_captures = VERSION_RE.captures(&info.version).unwrap();
183
184        let server_major = server_version_captures
185            .get(1)
186            .map(|m| m.as_str().parse::<i64>().unwrap())
187            .unwrap();
188
189        let server_minor = server_version_captures
190            .get(2)
191            .map(|m| m.as_str().parse::<i64>().unwrap())
192            .unwrap();
193
194        let server_patch = server_version_captures
195            .get(3)
196            .map(|m| m.as_str().parse::<i64>().unwrap())
197            .unwrap();
198
199        if server_major < major
200            || (server_major == major && server_minor < minor)
201            || (server_major == major && server_minor == minor && server_patch < patch)
202        {
203            return false;
204        }
205        true
206    }
207
208    /// Publish a [Message] to a given subject.
209    ///
210    /// # Examples
211    /// ```no_run
212    /// # #[tokio::main]
213    /// # async fn main() -> Result<(), async_nats::Error> {
214    /// let client = async_nats::connect("demo.nats.io").await?;
215    /// client.publish("events.data", "payload".into()).await?;
216    /// # Ok(())
217    /// # }
218    /// ```
219    pub async fn publish<S: ToSubject>(
220        &self,
221        subject: S,
222        payload: Bytes,
223    ) -> Result<(), PublishError> {
224        let subject = subject.to_subject();
225        let max_payload = self.max_payload.load(Ordering::Relaxed);
226        if payload.len() > max_payload {
227            return Err(PublishError::with_source(
228                PublishErrorKind::MaxPayloadExceeded,
229                format!(
230                    "Payload size limit of {} exceeded by message size of {}",
231                    payload.len(),
232                    max_payload
233                ),
234            ));
235        }
236
237        self.sender
238            .send(Command::Publish {
239                subject,
240                payload,
241                respond: None,
242                headers: None,
243            })
244            .await?;
245        Ok(())
246    }
247
248    /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject.
249    pub fn publish_sink<S: ToSubject>(&self, subject: S) -> Publisher {
250        Publisher {
251            subject: subject.to_subject(),
252            respond: None,
253            headers: None,
254            sender: PollSender::new(self.sender.clone()),
255        }
256    }
257
258    /// Publish a [Message] with headers to a given subject.
259    ///
260    /// # Examples
261    /// ```
262    /// # #[tokio::main]
263    /// # async fn main() -> Result<(), async_nats::Error> {
264    /// use std::str::FromStr;
265    /// let client = async_nats::connect("demo.nats.io").await?;
266    /// let mut headers = async_nats::HeaderMap::new();
267    /// headers.insert(
268    ///     "X-Header",
269    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
270    /// );
271    /// client
272    ///     .publish_with_headers("events.data", headers, "payload".into())
273    ///     .await?;
274    /// # Ok(())
275    /// # }
276    /// ```
277    pub async fn publish_with_headers<S: ToSubject>(
278        &self,
279        subject: S,
280        headers: HeaderMap,
281        payload: Bytes,
282    ) -> Result<(), PublishError> {
283        let subject = subject.to_subject();
284
285        self.sender
286            .send(Command::Publish {
287                subject,
288                payload,
289                respond: None,
290                headers: Some(headers),
291            })
292            .await?;
293        Ok(())
294    }
295
296    /// Returns a [Publisher], which can be used to send multiple [Messages](Message) with headers to a given subject.
297    pub fn publish_with_headers_sink<S: ToSubject>(
298        &self,
299        subject: S,
300        headers: HeaderMap,
301    ) -> Publisher {
302        Publisher {
303            subject: subject.to_subject(),
304            respond: None,
305            headers: Some(headers),
306            sender: PollSender::new(self.sender.clone()),
307        }
308    }
309
310    /// Publish a [Message] to a given subject, with specified response subject
311    /// to which the subscriber can respond.
312    /// This method does not await for the response.
313    ///
314    /// # Examples
315    ///
316    /// ```no_run
317    /// # #[tokio::main]
318    /// # async fn main() -> Result<(), async_nats::Error> {
319    /// let client = async_nats::connect("demo.nats.io").await?;
320    /// client
321    ///     .publish_with_reply("events.data", "reply_subject", "payload".into())
322    ///     .await?;
323    /// # Ok(())
324    /// # }
325    /// ```
326    pub async fn publish_with_reply<S: ToSubject, R: ToSubject>(
327        &self,
328        subject: S,
329        reply: R,
330        payload: Bytes,
331    ) -> Result<(), PublishError> {
332        let subject = subject.to_subject();
333        let reply = reply.to_subject();
334
335        self.sender
336            .send(Command::Publish {
337                subject,
338                payload,
339                respond: Some(reply),
340                headers: None,
341            })
342            .await?;
343        Ok(())
344    }
345
346    /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject,
347    /// with specified response subject to which the subscriber can respond.
348    /// [Publisher] does not await for the response.
349    pub fn publish_with_reply_sink<S: ToSubject, R: ToSubject>(
350        &self,
351        subject: S,
352        reply: R,
353    ) -> Publisher {
354        Publisher {
355            subject: subject.to_subject(),
356            respond: Some(reply.to_subject()),
357            headers: None,
358            sender: PollSender::new(self.sender.clone()),
359        }
360    }
361
362    /// Publish a [Message] to a given subject with headers and specified response subject
363    /// to which the subscriber can respond.
364    /// This method does not await for the response.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// # #[tokio::main]
370    /// # async fn main() -> Result<(), async_nats::Error> {
371    /// use std::str::FromStr;
372    /// let client = async_nats::connect("demo.nats.io").await?;
373    /// let mut headers = async_nats::HeaderMap::new();
374    /// client
375    ///     .publish_with_reply_and_headers("events.data", "reply_subject", headers, "payload".into())
376    ///     .await?;
377    /// # Ok(())
378    /// # }
379    /// ```
380    pub async fn publish_with_reply_and_headers<S: ToSubject, R: ToSubject>(
381        &self,
382        subject: S,
383        reply: R,
384        headers: HeaderMap,
385        payload: Bytes,
386    ) -> Result<(), PublishError> {
387        let subject = subject.to_subject();
388        let reply = reply.to_subject();
389
390        self.sender
391            .send(Command::Publish {
392                subject,
393                payload,
394                respond: Some(reply),
395                headers: Some(headers),
396            })
397            .await?;
398        Ok(())
399    }
400
401    /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject,
402    /// with headers and specified response subject to which the subscriber can respond.
403    /// [Publisher] does not await for the response.
404    pub fn publish_with_reply_and_headers_sink<S: ToSubject, R: ToSubject>(
405        &self,
406        subject: S,
407        reply: R,
408        headers: HeaderMap,
409    ) -> Publisher {
410        Publisher {
411            subject: subject.to_subject(),
412            respond: Some(reply.to_subject()),
413            headers: Some(headers),
414            sender: PollSender::new(self.sender.clone()),
415        }
416    }
417
418    /// Sends the request with headers.
419    ///
420    /// # Examples
421    /// ```no_run
422    /// # #[tokio::main]
423    /// # async fn main() -> Result<(), async_nats::Error> {
424    /// let client = async_nats::connect("demo.nats.io").await?;
425    /// let response = client.request("service", "data".into()).await?;
426    /// # Ok(())
427    /// # }
428    /// ```
429    pub async fn request<S: ToSubject>(
430        &self,
431        subject: S,
432        payload: Bytes,
433    ) -> Result<Message, RequestError> {
434        let subject = subject.to_subject();
435
436        trace!(
437            "request sent to subject: {} ({})",
438            subject.as_ref(),
439            payload.len()
440        );
441        let request = Request::new().payload(payload);
442        self.send_request(subject, request).await
443    }
444
445    /// Sends the request with headers.
446    ///
447    /// # Examples
448    /// ```no_run
449    /// # #[tokio::main]
450    /// # async fn main() -> Result<(), async_nats::Error> {
451    /// let client = async_nats::connect("demo.nats.io").await?;
452    /// let mut headers = async_nats::HeaderMap::new();
453    /// headers.insert("Key", "Value");
454    /// let response = client
455    ///     .request_with_headers("service", headers, "data".into())
456    ///     .await?;
457    /// # Ok(())
458    /// # }
459    /// ```
460    pub async fn request_with_headers<S: ToSubject>(
461        &self,
462        subject: S,
463        headers: HeaderMap,
464        payload: Bytes,
465    ) -> Result<Message, RequestError> {
466        let subject = subject.to_subject();
467
468        let request = Request::new().headers(headers).payload(payload);
469        self.send_request(subject, request).await
470    }
471
472    /// Sends the request created by the [Request].
473    ///
474    /// # Examples
475    ///
476    /// ```no_run
477    /// # #[tokio::main]
478    /// # async fn main() -> Result<(), async_nats::Error> {
479    /// let client = async_nats::connect("demo.nats.io").await?;
480    /// let request = async_nats::Request::new().payload("data".into());
481    /// let response = client.send_request("service", request).await?;
482    /// # Ok(())
483    /// # }
484    /// ```
485    pub async fn send_request<S: ToSubject>(
486        &self,
487        subject: S,
488        request: Request,
489    ) -> Result<Message, RequestError> {
490        let subject = subject.to_subject();
491
492        if let Some(inbox) = request.inbox {
493            let timeout = request.timeout.unwrap_or(self.request_timeout);
494            let mut subscriber = self.subscribe(inbox.clone()).await?;
495            let payload: Bytes = request.payload.unwrap_or_default();
496            match request.headers {
497                Some(headers) => {
498                    self.publish_with_reply_and_headers(subject, inbox, headers, payload)
499                        .await?
500                }
501                None => self.publish_with_reply(subject, inbox, payload).await?,
502            }
503            let request = match timeout {
504                Some(timeout) => {
505                    tokio::time::timeout(timeout, subscriber.next())
506                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
507                        .await?
508                }
509                None => subscriber.next().await,
510            };
511            match request {
512                Some(message) => {
513                    if message.status == Some(StatusCode::NO_RESPONDERS) {
514                        return Err(RequestError::with_source(
515                            RequestErrorKind::NoResponders,
516                            "no responders",
517                        ));
518                    }
519                    Ok(message)
520                }
521                None => Err(RequestError::with_source(
522                    RequestErrorKind::Other,
523                    "broken pipe",
524                )),
525            }
526        } else {
527            let (sender, receiver) = oneshot::channel();
528
529            let payload = request.payload.unwrap_or_default();
530            let respond = self.new_inbox().into();
531            let headers = request.headers;
532
533            self.sender
534                .send(Command::Request {
535                    subject,
536                    payload,
537                    respond,
538                    headers,
539                    sender,
540                })
541                .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
542                .await?;
543
544            let timeout = request.timeout.unwrap_or(self.request_timeout);
545            let request = match timeout {
546                Some(timeout) => {
547                    tokio::time::timeout(timeout, receiver)
548                        .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
549                        .await?
550                }
551                None => receiver.await,
552            };
553
554            match request {
555                Ok(message) => {
556                    if message.status == Some(StatusCode::NO_RESPONDERS) {
557                        return Err(RequestError::with_source(
558                            RequestErrorKind::NoResponders,
559                            "no responders",
560                        ));
561                    }
562                    Ok(message)
563                }
564                Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
565            }
566        }
567    }
568
569    /// Create a new globally unique inbox which can be used for replies.
570    ///
571    /// # Examples
572    ///
573    /// ```no_run
574    /// # #[tokio::main]
575    /// # async fn main() -> Result<(), async_nats::Error> {
576    /// # let mut nc = async_nats::connect("demo.nats.io").await?;
577    /// let reply = nc.new_inbox();
578    /// let rsub = nc.subscribe(reply).await?;
579    /// # Ok(())
580    /// # }
581    /// ```
582    pub fn new_inbox(&self) -> String {
583        format!("{}.{}", self.inbox_prefix, nuid::next())
584    }
585
586    /// Subscribes to a subject to receive [messages][Message].
587    ///
588    /// # Examples
589    ///
590    /// ```no_run
591    /// # #[tokio::main]
592    /// # async fn main() -> Result<(), async_nats::Error> {
593    /// use futures::StreamExt;
594    /// let client = async_nats::connect("demo.nats.io").await?;
595    /// let mut subscription = client.subscribe("events.>").await?;
596    /// while let Some(message) = subscription.next().await {
597    ///     println!("received message: {:?}", message);
598    /// }
599    /// # Ok(())
600    /// # }
601    /// ```
602    pub async fn subscribe<S: ToSubject>(&self, subject: S) -> Result<Subscriber, SubscribeError> {
603        let subject = subject.to_subject();
604        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
605        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
606
607        self.sender
608            .send(Command::Subscribe {
609                sid,
610                subject,
611                queue_group: None,
612                sender,
613            })
614            .await?;
615
616        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
617    }
618
619    /// Subscribes to a subject with a queue group to receive [messages][Message].
620    ///
621    /// # Examples
622    ///
623    /// ```no_run
624    /// # #[tokio::main]
625    /// # async fn main() -> Result<(), async_nats::Error> {
626    /// use futures::StreamExt;
627    /// let client = async_nats::connect("demo.nats.io").await?;
628    /// let mut subscription = client.queue_subscribe("events.>", "queue".into()).await?;
629    /// while let Some(message) = subscription.next().await {
630    ///     println!("received message: {:?}", message);
631    /// }
632    /// # Ok(())
633    /// # }
634    /// ```
635    pub async fn queue_subscribe<S: ToSubject>(
636        &self,
637        subject: S,
638        queue_group: String,
639    ) -> Result<Subscriber, SubscribeError> {
640        let subject = subject.to_subject();
641
642        let sid = self.next_subscription_id.fetch_add(1, Ordering::Relaxed);
643        let (sender, receiver) = mpsc::channel(self.subscription_capacity);
644
645        self.sender
646            .send(Command::Subscribe {
647                sid,
648                subject,
649                queue_group: Some(queue_group),
650                sender,
651            })
652            .await?;
653
654        Ok(Subscriber::new(sid, self.sender.clone(), receiver))
655    }
656
657    /// Flushes the internal buffer ensuring that all messages are sent.
658    ///
659    /// # Examples
660    ///
661    /// ```no_run
662    /// # #[tokio::main]
663    /// # async fn main() -> Result<(), async_nats::Error> {
664    /// let client = async_nats::connect("demo.nats.io").await?;
665    /// client.flush().await?;
666    /// # Ok(())
667    /// # }
668    /// ```
669    pub async fn flush(&self) -> Result<(), FlushError> {
670        let (tx, rx) = tokio::sync::oneshot::channel();
671        self.sender
672            .send(Command::Flush { observer: tx })
673            .await
674            .map_err(|err| FlushError::with_source(FlushErrorKind::SendError, err))?;
675
676        rx.await
677            .map_err(|err| FlushError::with_source(FlushErrorKind::FlushError, err))?;
678        Ok(())
679    }
680
681    /// Returns the current state of the connection.
682    ///
683    /// # Examples
684    /// ```no_run
685    /// # #[tokio::main]
686    /// # async fn main() -> Result<(), async_nats::Error> {
687    /// let client = async_nats::connect("demo.nats.io").await?;
688    /// println!("connection state: {}", client.connection_state());
689    /// # Ok(())
690    /// # }
691    /// ```
692    pub fn connection_state(&self) -> State {
693        self.state.borrow().to_owned()
694    }
695
696    /// Forces the client to reconnect.
697    /// Keep in mind that client will reconnect automatically if the connection is lost and this
698    /// method does not have to be used in normal circumstances.
699    /// However, if you want to force the client to reconnect, for example to re-trigger
700    /// the `auth-callback`, or manually rebalance connections, this method can be useful.
701    /// This method does not wait for connection to be re-established.
702    ///
703    /// # Examples
704    /// ```no_run
705    /// # #[tokio::main]
706    /// # async fn main() -> Result<(), async_nats::Error> {
707    /// let client = async_nats::connect("demo.nats.io").await?;
708    /// client.force_reconnect().await?;
709    /// # Ok(())
710    /// # }
711    /// ```
712    pub async fn force_reconnect(&self) -> Result<(), ReconnectError> {
713        self.sender
714            .send(Command::Reconnect)
715            .await
716            .map_err(Into::into)
717    }
718}
719
720/// Used for building customized requests.
721#[derive(Default)]
722pub struct Request {
723    payload: Option<Bytes>,
724    headers: Option<HeaderMap>,
725    timeout: Option<Option<Duration>>,
726    inbox: Option<String>,
727}
728
729impl Request {
730    pub fn new() -> Request {
731        Default::default()
732    }
733
734    /// Sets the payload of the request. If not used, empty payload will be sent.
735    ///
736    /// # Examples
737    /// ```no_run
738    /// # #[tokio::main]
739    /// # async fn main() -> Result<(), async_nats::Error> {
740    /// let client = async_nats::connect("demo.nats.io").await?;
741    /// let request = async_nats::Request::new().payload("data".into());
742    /// client.send_request("service", request).await?;
743    /// # Ok(())
744    /// # }
745    /// ```
746    pub fn payload(mut self, payload: Bytes) -> Request {
747        self.payload = Some(payload);
748        self
749    }
750
751    /// Sets the headers of the requests.
752    ///
753    /// # Examples
754    /// ```no_run
755    /// # #[tokio::main]
756    /// # async fn main() -> Result<(), async_nats::Error> {
757    /// use std::str::FromStr;
758    /// let client = async_nats::connect("demo.nats.io").await?;
759    /// let mut headers = async_nats::HeaderMap::new();
760    /// headers.insert(
761    ///     "X-Example",
762    ///     async_nats::HeaderValue::from_str("Value").unwrap(),
763    /// );
764    /// let request = async_nats::Request::new()
765    ///     .headers(headers)
766    ///     .payload("data".into());
767    /// client.send_request("service", request).await?;
768    /// # Ok(())
769    /// # }
770    /// ```
771    pub fn headers(mut self, headers: HeaderMap) -> Request {
772        self.headers = Some(headers);
773        self
774    }
775
776    /// Sets the custom timeout of the request. Overrides default [Client] timeout.
777    /// Setting it to [Option::None] disables the timeout entirely which might result in deadlock.
778    /// To use default timeout, simply do not call this function.
779    ///
780    /// # Examples
781    /// ```no_run
782    /// # #[tokio::main]
783    /// # async fn main() -> Result<(), async_nats::Error> {
784    /// let client = async_nats::connect("demo.nats.io").await?;
785    /// let request = async_nats::Request::new()
786    ///     .timeout(Some(std::time::Duration::from_secs(15)))
787    ///     .payload("data".into());
788    /// client.send_request("service", request).await?;
789    /// # Ok(())
790    /// # }
791    /// ```
792    pub fn timeout(mut self, timeout: Option<Duration>) -> Request {
793        self.timeout = Some(timeout);
794        self
795    }
796
797    /// Sets custom inbox for this request. Overrides both customized and default [Client] Inbox.
798    ///
799    /// # Examples
800    /// ```no_run
801    /// # #[tokio::main]
802    /// # async fn main() -> Result<(), async_nats::Error> {
803    /// use std::str::FromStr;
804    /// let client = async_nats::connect("demo.nats.io").await?;
805    /// let request = async_nats::Request::new()
806    ///     .inbox("custom_inbox".into())
807    ///     .payload("data".into());
808    /// client.send_request("service", request).await?;
809    /// # Ok(())
810    /// # }
811    /// ```
812    pub fn inbox(mut self, inbox: String) -> Request {
813        self.inbox = Some(inbox);
814        self
815    }
816}
817
818#[derive(Error, Debug)]
819#[error("failed to send reconnect: {0}")]
820pub struct ReconnectError(#[source] crate::Error);
821
822impl From<tokio::sync::mpsc::error::SendError<Command>> for ReconnectError {
823    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
824        ReconnectError(Box::new(err))
825    }
826}
827
828#[derive(Error, Debug)]
829#[error("failed to send subscribe: {0}")]
830pub struct SubscribeError(#[source] crate::Error);
831
832impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
833    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
834        SubscribeError(Box::new(err))
835    }
836}
837
838#[derive(Clone, Copy, Debug, PartialEq)]
839pub enum RequestErrorKind {
840    /// There are services listening on requested subject, but they didn't respond
841    /// in time.
842    TimedOut,
843    /// No one is listening on request subject.
844    NoResponders,
845    /// Other errors, client/io related.
846    Other,
847}
848
849impl Display for RequestErrorKind {
850    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
851        match self {
852            Self::TimedOut => write!(f, "request timed out"),
853            Self::NoResponders => write!(f, "no responders"),
854            Self::Other => write!(f, "request failed"),
855        }
856    }
857}
858
859/// Error returned when a core NATS request fails.
860/// To be enumerate over the variants, call [RequestError::kind].
861pub type RequestError = Error<RequestErrorKind>;
862
863impl From<PublishError> for RequestError {
864    fn from(e: PublishError) -> Self {
865        RequestError::with_source(RequestErrorKind::Other, e)
866    }
867}
868
869impl From<SubscribeError> for RequestError {
870    fn from(e: SubscribeError) -> Self {
871        RequestError::with_source(RequestErrorKind::Other, e)
872    }
873}
874
875#[derive(Clone, Copy, Debug, PartialEq)]
876pub enum FlushErrorKind {
877    /// Sending the flush failed client side.
878    SendError,
879    /// Flush failed.
880    /// This can happen mostly in case of connection issues
881    /// that cannot be resolved quickly.
882    FlushError,
883}
884
885impl Display for FlushErrorKind {
886    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887        match self {
888            Self::SendError => write!(f, "failed to send flush request"),
889            Self::FlushError => write!(f, "flush failed"),
890        }
891    }
892}
893
894pub type FlushError = Error<FlushErrorKind>;