async_nats_wrpc/
lib.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A Rust asynchronous client for the NATS.io ecosystem.
15//!
16//! To access the repository, you can clone it by running:
17//!
18//! ```bash
19//! git clone https://github.com/nats-io/nats.rs
20//! ```
21//! NATS.io is a simple, secure, and high-performance open-source messaging
22//! system designed for cloud-native applications, IoT messaging, and microservices
23//! architectures.
24//!
25//! **Note**: The synchronous NATS API is deprecated and no longer actively maintained. If you need to use the deprecated synchronous API, you can refer to:
26//! <https://crates.io/crates/nats>
27//!
28//! For more information on NATS.io visit: <https://nats.io>
29//!
30//! ## Examples
31//!
32//! Below, you can find some basic examples on how to use this library.
33//!
34//! For more details, please refer to the specific methods and structures documentation.
35//!
36//! ### Complete example
37//!
38//! Connect to the NATS server, publish messages and subscribe to receive messages.
39//!
40//! ```no_run
41//! use bytes::Bytes;
42//! use futures::StreamExt;
43//!
44//! #[tokio::main]
45//! async fn main() -> Result<(), async_nats::Error> {
46//!     // Connect to the NATS server
47//!     let client = async_nats::connect("demo.nats.io").await?;
48//!
49//!     // Subscribe to the "messages" subject
50//!     let mut subscriber = client.subscribe("messages").await?;
51//!
52//!     // Publish messages to the "messages" subject
53//!     for _ in 0..10 {
54//!         client.publish("messages", "data".into()).await?;
55//!     }
56//!
57//!     // Receive and process messages
58//!     while let Some(message) = subscriber.next().await {
59//!         println!("Received message {:?}", message);
60//!     }
61//!
62//!     Ok(())
63//! }
64//! ```
65//!
66//! ### Publish
67//!
68//! Connect to the NATS server and publish messages to a subject.
69//!
70//! ```
71//! # use bytes::Bytes;
72//! # use std::error::Error;
73//! # use std::time::Instant;
74//! # #[tokio::main]
75//! # async fn main() -> Result<(), async_nats::Error> {
76//! // Connect to the NATS server
77//! let client = async_nats::connect("demo.nats.io").await?;
78//!
79//! // Prepare the subject and data
80//! let subject = "foo";
81//! let data = Bytes::from("bar");
82//!
83//! // Publish messages to the NATS server
84//! for _ in 0..10 {
85//!     client.publish(subject, data.clone()).await?;
86//! }
87//!
88//! // Flush internal buffer before exiting to make sure all messages are sent
89//! client.flush().await?;
90//!
91//! #    Ok(())
92//! # }
93//! ```
94//!
95//! ### Subscribe
96//!
97//! Connect to the NATS server, subscribe to a subject and receive messages.
98//!
99//! ```no_run
100//! # use bytes::Bytes;
101//! # use futures::StreamExt;
102//! # use std::error::Error;
103//! # use std::time::Instant;
104//! # #[tokio::main]
105//! # async fn main() -> Result<(), async_nats::Error> {
106//! // Connect to the NATS server
107//! let client = async_nats::connect("demo.nats.io").await?;
108//!
109//! // Subscribe to the "foo" subject
110//! let mut subscriber = client.subscribe("foo").await.unwrap();
111//!
112//! // Receive and process messages
113//! while let Some(message) = subscriber.next().await {
114//!     println!("Received message {:?}", message);
115//! }
116//! #     Ok(())
117//! # }
118//! ```
119//!
120//! ### JetStream
121//!
122//! To access JetStream API, create a JetStream [jetstream::Context].
123//!
124//! ```no_run
125//! # #[tokio::main]
126//! # async fn main() -> Result<(), async_nats::Error> {
127//! // Connect to the NATS server
128//! let client = async_nats::connect("demo.nats.io").await?;
129//! // Create a JetStream context.
130//! let jetstream = async_nats::jetstream::new(client);
131//!
132//! // Publish JetStream messages, manage streams, consumers, etc.
133//! jetstream.publish("foo", "bar".into()).await?;
134//! # Ok(())
135//! # }
136//! ```
137//!
138//! ### Key-value Store
139//!
140//! Key-value [Store][jetstream::kv::Store] is accessed through [jetstream::Context].
141//!
142//! ```no_run
143//! # #[tokio::main]
144//! # async fn main() -> Result<(), async_nats::Error> {
145//! // Connect to the NATS server
146//! let client = async_nats::connect("demo.nats.io").await?;
147//! // Create a JetStream context.
148//! let jetstream = async_nats::jetstream::new(client);
149//! // Access an existing key-value.
150//! let kv = jetstream.get_key_value("store").await?;
151//! # Ok(())
152//! # }
153//! ```
154//! ### Object Store
155//!
156//! Object [Store][jetstream::object_store::ObjectStore] is accessed through [jetstream::Context].
157//!
158//! ```no_run
159//! # #[tokio::main]
160//! # async fn main() -> Result<(), async_nats::Error> {
161//! // Connect to the NATS server
162//! let client = async_nats::connect("demo.nats.io").await?;
163//! // Create a JetStream context.
164//! let jetstream = async_nats::jetstream::new(client);
165//! // Access an existing object store.
166//! let kv = jetstream.get_object_store("store").await?;
167//! # Ok(())
168//! # }
169//! ```
170//! ### Service API
171//!
172//! [Service API][service::Service] is accessible through [Client] after importing its trait.
173//!
174//! ```no_run
175//! # #[tokio::main]
176//! # async fn main() -> Result<(), async_nats::Error> {
177//! use async_nats::service::ServiceExt;
178//! // Connect to the NATS server
179//! let client = async_nats::connect("demo.nats.io").await?;
180//! let mut service = client
181//!     .service_builder()
182//!     .description("some service")
183//!     .stats_handler(|endpoint, stats| serde_json::json!({ "endpoint": endpoint }))
184//!     .start("products", "1.0.0")
185//!     .await?;
186//! # Ok(())
187//! # }
188//! ```
189
190#![deny(unreachable_pub)]
191#![deny(rustdoc::broken_intra_doc_links)]
192#![deny(rustdoc::private_intra_doc_links)]
193#![deny(rustdoc::invalid_codeblock_attributes)]
194#![deny(rustdoc::invalid_rust_codeblocks)]
195#![cfg_attr(docsrs, feature(doc_auto_cfg))]
196
197use thiserror::Error;
198
199use futures::stream::Stream;
200use tokio::io::AsyncWriteExt;
201use tokio::sync::oneshot;
202use tracing::{debug, error};
203
204use core::fmt;
205use std::collections::HashMap;
206use std::fmt::Display;
207use std::future::Future;
208use std::iter;
209use std::mem;
210use std::net::SocketAddr;
211use std::option;
212use std::pin::Pin;
213use std::slice;
214use std::str::{self, FromStr};
215use std::sync::atomic::AtomicUsize;
216use std::sync::Arc;
217use std::task::{Context, Poll};
218use tokio::io::ErrorKind;
219use tokio::time::{interval, Duration, Interval, MissedTickBehavior};
220use url::{Host, Url};
221
222use bytes::Bytes;
223use serde::{Deserialize, Serialize};
224use serde_repr::{Deserialize_repr, Serialize_repr};
225use tokio::io;
226use tokio::sync::mpsc;
227use tokio::task;
228
229pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
230
231const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Implementation language identifier of this client.
// NOTE(review): presumably reported to the server during the handshake — confirm at the use site.
const LANG: &str = "rust";
/// Number of PINGs that may be awaiting a PONG; once the pending count
/// exceeds this value the connection is treated as disconnected.
const MAX_PENDING_PINGS: usize = 2;
/// Subscription id reserved for the request/response multiplexer subscription.
const MULTIPLEXER_SID: u64 = 0;
235
236/// A re-export of the `rustls` crate used in this crate,
237/// for use in cases where manual client configurations
238/// must be provided using `Options::tls_client_config`.
239pub use tokio_rustls::rustls;
240
241use connection::{Connection, State};
242use connector::{Connector, ConnectorOptions};
243pub use header::{HeaderMap, HeaderName, HeaderValue};
244pub use subject::Subject;
245
246mod auth;
247pub(crate) mod auth_utils;
248pub mod client;
249pub mod connection;
250mod connector;
251mod options;
252
253pub use auth::Auth;
254pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError};
255pub use options::{AuthError, ConnectOptions};
256
257mod crypto;
258pub mod error;
259pub mod header;
260pub mod jetstream;
261pub mod message;
262#[cfg(feature = "service")]
263pub mod service;
264pub mod status;
265pub mod subject;
266mod tls;
267
268pub use message::Message;
269pub use status::StatusCode;
270
271/// Information sent by the server back to this client
272/// during initial connection, and possibly again later.
273#[derive(Debug, Deserialize, Default, Clone, Eq, PartialEq)]
274pub struct ServerInfo {
275    /// The unique identifier of the NATS server.
276    #[serde(default)]
277    pub server_id: String,
278    /// Generated Server Name.
279    #[serde(default)]
280    pub server_name: String,
281    /// The host specified in the cluster parameter/options.
282    #[serde(default)]
283    pub host: String,
284    /// The port number specified in the cluster parameter/options.
285    #[serde(default)]
286    pub port: u16,
287    /// The version of the NATS server.
288    #[serde(default)]
289    pub version: String,
290    /// If this is set, then the server should try to authenticate upon
291    /// connect.
292    #[serde(default)]
293    pub auth_required: bool,
294    /// If this is set, then the server must authenticate using TLS.
295    #[serde(default)]
296    pub tls_required: bool,
297    /// Maximum payload size that the server will accept.
298    #[serde(default)]
299    pub max_payload: usize,
300    /// The protocol version in use.
301    #[serde(default)]
302    pub proto: i8,
303    /// The server-assigned client ID. This may change during reconnection.
304    #[serde(default)]
305    pub client_id: u64,
306    /// The version of golang the NATS server was built with.
307    #[serde(default)]
308    pub go: String,
309    /// The nonce used for nkeys.
310    #[serde(default)]
311    pub nonce: String,
312    /// A list of server urls that a client can connect to.
313    #[serde(default)]
314    pub connect_urls: Vec<String>,
315    /// The client IP as known by the server.
316    #[serde(default)]
317    pub client_ip: String,
318    /// Whether the server supports headers.
319    #[serde(default)]
320    pub headers: bool,
321    /// Whether server goes into lame duck mode.
322    #[serde(default, rename = "ldm")]
323    pub lame_duck_mode: bool,
324}
325
326#[derive(Clone, Debug, Eq, PartialEq)]
327pub(crate) enum ServerOp {
328    Ok,
329    Info(Box<ServerInfo>),
330    Ping,
331    Pong,
332    Error(ServerError),
333    Message {
334        sid: u64,
335        subject: Subject,
336        reply: Option<Subject>,
337        payload: Bytes,
338        headers: Option<HeaderMap>,
339        status: Option<StatusCode>,
340        description: Option<String>,
341        length: usize,
342    },
343}
344
345/// `Command` represents all commands that a [`Client`] can handle
346#[derive(Debug)]
347pub(crate) enum Command {
348    Publish {
349        subject: Subject,
350        payload: Bytes,
351        respond: Option<Subject>,
352        headers: Option<HeaderMap>,
353    },
354    Request {
355        subject: Subject,
356        payload: Bytes,
357        respond: Subject,
358        headers: Option<HeaderMap>,
359        sender: oneshot::Sender<Message>,
360    },
361    Subscribe {
362        sid: u64,
363        subject: Subject,
364        queue_group: Option<String>,
365        sender: mpsc::Sender<Message>,
366    },
367    Unsubscribe {
368        sid: u64,
369        max: Option<u64>,
370    },
371    Flush {
372        observer: oneshot::Sender<()>,
373    },
374    Reconnect,
375}
376
377/// `ClientOp` represents all actions of `Client`.
378#[derive(Debug)]
379pub(crate) enum ClientOp {
380    Publish {
381        subject: Subject,
382        payload: Bytes,
383        respond: Option<Subject>,
384        headers: Option<HeaderMap>,
385    },
386    Subscribe {
387        sid: u64,
388        subject: Subject,
389        queue_group: Option<String>,
390    },
391    Unsubscribe {
392        sid: u64,
393        max: Option<u64>,
394    },
395    Ping,
396    Pong,
397    Connect(ConnectInfo),
398}
399
400#[derive(Debug)]
401struct Subscription {
402    subject: Subject,
403    sender: mpsc::Sender<Message>,
404    queue_group: Option<String>,
405    delivered: u64,
406    max: Option<u64>,
407}
408
409#[derive(Debug)]
410struct Multiplexer {
411    subject: Subject,
412    prefix: Subject,
413    senders: HashMap<String, oneshot::Sender<Message>>,
414}
415
416/// A connection handler which facilitates communication from channels to a single shared connection.
417pub(crate) struct ConnectionHandler {
418    connection: Connection,
419    connector: Connector,
420    subscriptions: HashMap<u64, Subscription>,
421    multiplexer: Option<Multiplexer>,
422    pending_pings: usize,
423    info_sender: tokio::sync::watch::Sender<ServerInfo>,
424    ping_interval: Interval,
425    should_reconnect: bool,
426    flush_observers: Vec<oneshot::Sender<()>>,
427}
428
429impl ConnectionHandler {
430    pub(crate) fn new(
431        connection: Connection,
432        connector: Connector,
433        info_sender: tokio::sync::watch::Sender<ServerInfo>,
434        ping_period: Duration,
435    ) -> ConnectionHandler {
436        let mut ping_interval = interval(ping_period);
437        ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
438
439        ConnectionHandler {
440            connection,
441            connector,
442            subscriptions: HashMap::new(),
443            multiplexer: None,
444            pending_pings: 0,
445            info_sender,
446            ping_interval,
447            should_reconnect: false,
448            flush_observers: Vec::new(),
449        }
450    }
451
452    pub(crate) async fn process<'a>(&'a mut self, receiver: &'a mut mpsc::Receiver<Command>) {
453        struct ProcessFut<'a> {
454            handler: &'a mut ConnectionHandler,
455            receiver: &'a mut mpsc::Receiver<Command>,
456            recv_buf: &'a mut Vec<Command>,
457        }
458
459        enum ExitReason {
460            Disconnected(Option<io::Error>),
461            ReconnectRequested,
462            Closed,
463        }
464
465        impl<'a> ProcessFut<'a> {
466            const RECV_CHUNK_SIZE: usize = 16;
467
468            #[cold]
469            fn ping(&mut self) -> Poll<ExitReason> {
470                self.handler.pending_pings += 1;
471
472                if self.handler.pending_pings > MAX_PENDING_PINGS {
473                    debug!(
474                        "pending pings {}, max pings {}. disconnecting",
475                        self.handler.pending_pings, MAX_PENDING_PINGS
476                    );
477
478                    Poll::Ready(ExitReason::Disconnected(None))
479                } else {
480                    self.handler.connection.enqueue_write_op(&ClientOp::Ping);
481
482                    Poll::Pending
483                }
484            }
485        }
486
487        impl<'a> Future for ProcessFut<'a> {
488            type Output = ExitReason;
489
490            /// Drives the connection forward.
491            ///
492            /// Returns one of the following:
493            ///
494            /// * `Poll::Pending` means that the connection
495            ///   is blocked on all fronts or there are
496            ///   no commands to send or receive
497            /// * `Poll::Ready(ExitReason::Disconnected(_))` means
498            ///   that an I/O operation failed and the connection
499            ///   is considered dead.
500            /// * `Poll::Ready(ExitReason::Closed)` means that
501            ///   [`Self::receiver`] was closed, so there's nothing
502            ///   more for us to do than to exit the client.
503            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
504                // We need to be sure the waker is registered, therefore we need to poll until we
505                // get a `Poll::Pending`. With a sane interval delay, this means that the loop
506                // breaks at the second iteration.
507                while self.handler.ping_interval.poll_tick(cx).is_ready() {
508                    if let Poll::Ready(exit) = self.ping() {
509                        return Poll::Ready(exit);
510                    }
511                }
512
513                loop {
514                    match self.handler.connection.poll_read_op(cx) {
515                        Poll::Pending => break,
516                        Poll::Ready(Ok(Some(server_op))) => {
517                            self.handler.handle_server_op(server_op);
518                        }
519                        Poll::Ready(Ok(None)) => {
520                            return Poll::Ready(ExitReason::Disconnected(None))
521                        }
522                        Poll::Ready(Err(err)) => {
523                            return Poll::Ready(ExitReason::Disconnected(Some(err)))
524                        }
525                    }
526                }
527
528                // WARNING: after the following loop `handle_command`,
529                // or other functions which call `enqueue_write_op`,
530                // cannot be called anymore. Runtime wakeups won't
531                // trigger a call to `poll_write`
532
533                let mut made_progress = true;
534                loop {
535                    while !self.handler.connection.is_write_buf_full() {
536                        debug_assert!(self.recv_buf.is_empty());
537
538                        let Self {
539                            recv_buf,
540                            handler,
541                            receiver,
542                        } = &mut *self;
543                        match receiver.poll_recv_many(cx, recv_buf, Self::RECV_CHUNK_SIZE) {
544                            Poll::Pending => break,
545                            Poll::Ready(1..) => {
546                                made_progress = true;
547
548                                for cmd in recv_buf.drain(..) {
549                                    handler.handle_command(cmd);
550                                }
551                            }
552                            // TODO: replace `_` with `0` after bumping MSRV to 1.75
553                            Poll::Ready(_) => return Poll::Ready(ExitReason::Closed),
554                        }
555                    }
556
557                    // The first round will poll both from
558                    // the `receiver` and the writer, giving
559                    // them both a chance to make progress
560                    // and register `Waker`s.
561                    //
562                    // If writing is `Poll::Pending` we exit.
563                    //
564                    // If writing is completed we can repeat the entire
565                    // cycle as long as the `receiver` doesn't end-up
566                    // `Poll::Pending` immediately.
567                    if !mem::take(&mut made_progress) {
568                        break;
569                    }
570
571                    match self.handler.connection.poll_write(cx) {
572                        Poll::Pending => {
573                            // Write buffer couldn't be fully emptied
574                            break;
575                        }
576                        Poll::Ready(Ok(())) => {
577                            // Write buffer is empty
578                            continue;
579                        }
580                        Poll::Ready(Err(err)) => {
581                            return Poll::Ready(ExitReason::Disconnected(Some(err)))
582                        }
583                    }
584                }
585
586                if let (ShouldFlush::Yes, _) | (ShouldFlush::No, false) = (
587                    self.handler.connection.should_flush(),
588                    self.handler.flush_observers.is_empty(),
589                ) {
590                    match self.handler.connection.poll_flush(cx) {
591                        Poll::Pending => {}
592                        Poll::Ready(Ok(())) => {
593                            for observer in self.handler.flush_observers.drain(..) {
594                                let _ = observer.send(());
595                            }
596                        }
597                        Poll::Ready(Err(err)) => {
598                            return Poll::Ready(ExitReason::Disconnected(Some(err)))
599                        }
600                    }
601                }
602
603                if mem::take(&mut self.handler.should_reconnect) {
604                    return Poll::Ready(ExitReason::ReconnectRequested);
605                }
606
607                Poll::Pending
608            }
609        }
610
611        let mut recv_buf = Vec::with_capacity(ProcessFut::RECV_CHUNK_SIZE);
612        loop {
613            let process = ProcessFut {
614                handler: self,
615                receiver,
616                recv_buf: &mut recv_buf,
617            };
618            match process.await {
619                ExitReason::Disconnected(err) => {
620                    debug!(?err, "disconnected");
621                    if self.handle_disconnect().await.is_err() {
622                        break;
623                    };
624                    debug!("reconnected");
625                }
626                ExitReason::Closed => break,
627                ExitReason::ReconnectRequested => {
628                    debug!("reconnect requested");
629                    // Should be ok to ingore error, as that means we are not in connected state.
630                    self.connection.stream.shutdown().await.ok();
631                    if self.handle_disconnect().await.is_err() {
632                        break;
633                    };
634                }
635            }
636        }
637    }
638
639    fn handle_server_op(&mut self, server_op: ServerOp) {
640        self.ping_interval.reset();
641
642        match server_op {
643            ServerOp::Ping => {
644                self.connection.enqueue_write_op(&ClientOp::Pong);
645            }
646            ServerOp::Pong => {
647                debug!("received PONG");
648                self.pending_pings = self.pending_pings.saturating_sub(1);
649            }
650            ServerOp::Error(error) => {
651                self.connector
652                    .events_tx
653                    .try_send(Event::ServerError(error))
654                    .ok();
655            }
656            ServerOp::Message {
657                sid,
658                subject,
659                reply,
660                payload,
661                headers,
662                status,
663                description,
664                length,
665            } => {
666                if let Some(subscription) = self.subscriptions.get_mut(&sid) {
667                    let message: Message = Message {
668                        subject,
669                        reply,
670                        payload,
671                        headers,
672                        status,
673                        description,
674                        length,
675                    };
676
677                    // if the channel for subscription was dropped, remove the
678                    // subscription from the map and unsubscribe.
679                    match subscription.sender.try_send(message) {
680                        Ok(_) => {
681                            subscription.delivered += 1;
682                            // if this `Subscription` has set `max` value, check if it
683                            // was reached. If yes, remove the `Subscription` and in
684                            // the result, `drop` the `sender` channel.
685                            if let Some(max) = subscription.max {
686                                if subscription.delivered.ge(&max) {
687                                    self.subscriptions.remove(&sid);
688                                }
689                            }
690                        }
691                        Err(mpsc::error::TrySendError::Full(_)) => {
692                            self.connector
693                                .events_tx
694                                .try_send(Event::SlowConsumer(sid))
695                                .ok();
696                        }
697                        Err(mpsc::error::TrySendError::Closed(_)) => {
698                            self.subscriptions.remove(&sid);
699                            self.connection
700                                .enqueue_write_op(&ClientOp::Unsubscribe { sid, max: None });
701                        }
702                    }
703                } else if sid == MULTIPLEXER_SID {
704                    if let Some(multiplexer) = self.multiplexer.as_mut() {
705                        let maybe_token =
706                            subject.strip_prefix(multiplexer.prefix.as_ref()).to_owned();
707
708                        if let Some(token) = maybe_token {
709                            if let Some(sender) = multiplexer.senders.remove(token) {
710                                let message = Message {
711                                    subject,
712                                    reply,
713                                    payload,
714                                    headers,
715                                    status,
716                                    description,
717                                    length,
718                                };
719
720                                let _ = sender.send(message);
721                            }
722                        }
723                    }
724                }
725            }
726            // TODO: we should probably update advertised server list here too.
727            ServerOp::Info(info) => {
728                if info.lame_duck_mode {
729                    self.connector.events_tx.try_send(Event::LameDuckMode).ok();
730                }
731            }
732
733            _ => {
734                // TODO: don't ignore.
735            }
736        }
737    }
738
739    fn handle_command(&mut self, command: Command) {
740        self.ping_interval.reset();
741
742        match command {
743            Command::Unsubscribe { sid, max } => {
744                if let Some(subscription) = self.subscriptions.get_mut(&sid) {
745                    subscription.max = max;
746                    match subscription.max {
747                        Some(n) => {
748                            if subscription.delivered >= n {
749                                self.subscriptions.remove(&sid);
750                            }
751                        }
752                        None => {
753                            self.subscriptions.remove(&sid);
754                        }
755                    }
756
757                    self.connection
758                        .enqueue_write_op(&ClientOp::Unsubscribe { sid, max });
759                }
760            }
761            Command::Flush { observer } => {
762                self.flush_observers.push(observer);
763            }
764            Command::Subscribe {
765                sid,
766                subject,
767                queue_group,
768                sender,
769            } => {
770                let subscription = Subscription {
771                    sender,
772                    delivered: 0,
773                    max: None,
774                    subject: subject.to_owned(),
775                    queue_group: queue_group.to_owned(),
776                };
777
778                self.subscriptions.insert(sid, subscription);
779
780                self.connection.enqueue_write_op(&ClientOp::Subscribe {
781                    sid,
782                    subject,
783                    queue_group,
784                });
785            }
786            Command::Request {
787                subject,
788                payload,
789                respond,
790                headers,
791                sender,
792            } => {
793                let (prefix, token) = respond.rsplit_once('.').expect("malformed request subject");
794
795                let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
796                    multiplexer
797                } else {
798                    let prefix = Subject::from(format!("{}.{}.", prefix, nuid::next()));
799                    let subject = Subject::from(format!("{}*", prefix));
800
801                    self.connection.enqueue_write_op(&ClientOp::Subscribe {
802                        sid: MULTIPLEXER_SID,
803                        subject: subject.clone(),
804                        queue_group: None,
805                    });
806
807                    self.multiplexer.insert(Multiplexer {
808                        subject,
809                        prefix,
810                        senders: HashMap::new(),
811                    })
812                };
813
814                multiplexer.senders.insert(token.to_owned(), sender);
815
816                let pub_op = ClientOp::Publish {
817                    subject,
818                    payload,
819                    respond: Some(format!("{}{}", multiplexer.prefix, token).into()),
820                    headers,
821                };
822
823                self.connection.enqueue_write_op(&pub_op);
824            }
825
826            Command::Publish {
827                subject,
828                payload,
829                respond,
830                headers,
831            } => {
832                self.connection.enqueue_write_op(&ClientOp::Publish {
833                    subject,
834                    payload,
835                    respond,
836                    headers,
837                });
838            }
839
840            Command::Reconnect => {
841                self.should_reconnect = true;
842            }
843        }
844    }
845
846    async fn handle_disconnect(&mut self) -> Result<(), ConnectError> {
847        self.pending_pings = 0;
848        self.connector.events_tx.try_send(Event::Disconnected).ok();
849        self.connector.state_tx.send(State::Disconnected).ok();
850
851        self.handle_reconnect().await
852    }
853
854    async fn handle_reconnect(&mut self) -> Result<(), ConnectError> {
855        let (info, connection) = self.connector.connect().await?;
856        self.connection = connection;
857        let _ = self.info_sender.send(info);
858
859        self.subscriptions
860            .retain(|_, subscription| !subscription.sender.is_closed());
861
862        for (sid, subscription) in &self.subscriptions {
863            self.connection.enqueue_write_op(&ClientOp::Subscribe {
864                sid: *sid,
865                subject: subscription.subject.to_owned(),
866                queue_group: subscription.queue_group.to_owned(),
867            });
868        }
869
870        if let Some(multiplexer) = &self.multiplexer {
871            self.connection.enqueue_write_op(&ClientOp::Subscribe {
872                sid: MULTIPLEXER_SID,
873                subject: multiplexer.subject.to_owned(),
874                queue_group: None,
875            });
876        }
877        Ok(())
878    }
879}
880
881/// Connects to NATS with specified options.
882///
883/// It is generally advised to use [ConnectOptions] instead, as it provides a builder for whole
884/// configuration.
885///
886/// # Examples
887/// ```
888/// # #[tokio::main]
889/// # async fn main() ->  Result<(), async_nats::Error> {
890/// let mut nc =
891///     async_nats::connect_with_options("demo.nats.io", async_nats::ConnectOptions::new()).await?;
892/// nc.publish("test", "data".into()).await?;
893/// # Ok(())
894/// # }
895/// ```
896pub async fn connect_with_options<A: ToServerAddrs>(
897    addrs: A,
898    options: ConnectOptions,
899) -> Result<Client, ConnectError> {
900    let ping_period = options.ping_interval;
901
902    let (events_tx, mut events_rx) = mpsc::channel(128);
903    let (state_tx, state_rx) = tokio::sync::watch::channel(State::Pending);
904    // We're setting it to the default server payload size.
905    let max_payload = Arc::new(AtomicUsize::new(1024 * 1024));
906
907    let mut connector = Connector::new(
908        addrs,
909        ConnectorOptions {
910            tls_required: options.tls_required,
911            certificates: options.certificates,
912            client_key: options.client_key,
913            client_cert: options.client_cert,
914            tls_client_config: options.tls_client_config,
915            tls_first: options.tls_first,
916            auth: options.auth,
917            no_echo: options.no_echo,
918            connection_timeout: options.connection_timeout,
919            name: options.name,
920            ignore_discovered_servers: options.ignore_discovered_servers,
921            retain_servers_order: options.retain_servers_order,
922            read_buffer_capacity: options.read_buffer_capacity,
923            reconnect_delay_callback: options.reconnect_delay_callback,
924            auth_callback: options.auth_callback,
925            max_reconnects: options.max_reconnects,
926        },
927        events_tx,
928        state_tx,
929        max_payload.clone(),
930    )
931    .map_err(|err| ConnectError::with_source(ConnectErrorKind::ServerParse, err))?;
932
933    let mut info: ServerInfo = Default::default();
934    let mut connection = None;
935    if !options.retry_on_initial_connect {
936        debug!("retry on initial connect failure is disabled");
937        let (info_ok, connection_ok) = connector.try_connect().await?;
938        connection = Some(connection_ok);
939        info = info_ok;
940    }
941
942    let (info_sender, info_watcher) = tokio::sync::watch::channel(info.clone());
943    let (sender, mut receiver) = mpsc::channel(options.sender_capacity);
944
945    let client = Client::new(
946        info_watcher,
947        state_rx,
948        sender,
949        options.subscription_capacity,
950        options.inbox_prefix,
951        options.request_timeout,
952        max_payload,
953    );
954
955    task::spawn(async move {
956        while let Some(event) = events_rx.recv().await {
957            tracing::info!("event: {}", event);
958            if let Some(event_callback) = &options.event_callback {
959                event_callback.call(event).await;
960            }
961        }
962    });
963
964    task::spawn(async move {
965        if connection.is_none() && options.retry_on_initial_connect {
966            let (info, connection_ok) = match connector.connect().await {
967                Ok((info, connection)) => (info, connection),
968                Err(err) => {
969                    error!("connection closed: {}", err);
970                    return;
971                }
972            };
973            info_sender.send(info).ok();
974            connection = Some(connection_ok);
975        }
976        let connection = connection.unwrap();
977        let mut connection_handler =
978            ConnectionHandler::new(connection, connector, info_sender, ping_period);
979        connection_handler.process(&mut receiver).await
980    });
981
982    Ok(client)
983}
984
985#[derive(Debug, Clone, PartialEq, Eq)]
986pub enum Event {
987    Connected,
988    Disconnected,
989    LameDuckMode,
990    SlowConsumer(u64),
991    ServerError(ServerError),
992    ClientError(ClientError),
993}
994
995impl fmt::Display for Event {
996    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
997        match self {
998            Event::Connected => write!(f, "connected"),
999            Event::Disconnected => write!(f, "disconnected"),
1000            Event::LameDuckMode => write!(f, "lame duck mode detected"),
1001            Event::SlowConsumer(sid) => write!(f, "slow consumers for subscription {sid}"),
1002            Event::ServerError(err) => write!(f, "server error: {err}"),
1003            Event::ClientError(err) => write!(f, "client error: {err}"),
1004        }
1005    }
1006}
1007
1008/// Connects to NATS with default config.
1009///
1010/// Returns cloneable [Client].
1011///
1012/// To have customized NATS connection, check [ConnectOptions].
1013///
1014/// # Examples
1015///
1016/// ## Single URL
1017/// ```
1018/// # #[tokio::main]
1019/// # async fn main() ->  Result<(), async_nats::Error> {
1020/// let mut nc = async_nats::connect("demo.nats.io").await?;
1021/// nc.publish("test", "data".into()).await?;
1022/// # Ok(())
1023/// # }
1024/// ```
1025///
1026/// ## Connect with [Vec] of [ServerAddr].
1027/// ```no_run
1028/// #[tokio::main]
1029/// # async fn main() -> Result<(), async_nats::Error> {
1030/// use async_nats::ServerAddr;
1031/// let client = async_nats::connect(vec![
1032///     "demo.nats.io".parse::<ServerAddr>()?,
1033///     "other.nats.io".parse::<ServerAddr>()?,
1034/// ])
1035/// .await
1036/// .unwrap();
1037/// # Ok(())
1038/// # }
1039/// ```
1040///
1041/// ## with [Vec], but parse URLs inside [crate::connect()]
1042/// ```no_run
1043/// #[tokio::main]
1044/// # async fn main() -> Result<(), async_nats::Error> {
1045/// use async_nats::ServerAddr;
1046/// let servers = vec!["demo.nats.io", "other.nats.io"];
1047/// let client = async_nats::connect(
1048///     servers
1049///         .iter()
1050///         .map(|url| url.parse())
1051///         .collect::<Result<Vec<ServerAddr>, _>>()?,
1052/// )
1053/// .await?;
1054/// # Ok(())
1055/// # }
1056/// ```
1057///
1058///
1059/// ## with slice.
1060/// ```no_run
1061/// #[tokio::main]
1062/// # async fn main() -> Result<(), async_nats::Error> {
1063/// use async_nats::ServerAddr;
1064/// let client = async_nats::connect(
1065///    [
1066///        "demo.nats.io".parse::<ServerAddr>()?,
1067///        "other.nats.io".parse::<ServerAddr>()?,
1068///    ]
1069///    .as_slice(),
1070/// )
1071/// .await?;
1072/// # Ok(())
1073/// # }
1074pub async fn connect<A: ToServerAddrs>(addrs: A) -> Result<Client, ConnectError> {
1075    connect_with_options(addrs, ConnectOptions::default()).await
1076}
1077
/// The category of a failure that occurred while connecting.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConnectErrorKind {
    /// Parsing the passed server address failed.
    ServerParse,
    /// DNS related issues.
    Dns,
    /// Failed authentication process, signing nonce, etc.
    Authentication,
    /// Server returned authorization violation error.
    AuthorizationViolation,
    /// Connect timed out.
    TimedOut,
    /// Erroneous TLS setup.
    Tls,
    /// Other IO error.
    Io,
    /// Reached the maximum number of reconnects.
    MaxReconnects,
}

impl Display for ConnectErrorKind {
    /// Writes the fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::ServerParse => "failed to parse server or server list",
            Self::Dns => "DNS error",
            Self::Authentication => "failed signing nonce",
            Self::AuthorizationViolation => "authorization violation",
            Self::TimedOut => "timed out",
            Self::Tls => "TLS error",
            Self::Io => "IO error",
            Self::MaxReconnects => "reached maximum number of reconnects",
        };
        f.write_str(description)
    }
}
1112
1113/// Returned when initial connection fails.
1114/// To be enumerate over the variants, call [ConnectError::kind].
1115pub type ConnectError = error::Error<ConnectErrorKind>;
1116
1117impl From<io::Error> for ConnectError {
1118    fn from(err: io::Error) -> Self {
1119        ConnectError::with_source(ConnectErrorKind::Io, err)
1120    }
1121}
1122
/// Retrieves messages from given `subscription` created by [Client::subscribe].
///
/// Implements [futures::stream::Stream] for ergonomic async message processing.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() ->  Result<(), async_nats::Error> {
/// let mut nc = async_nats::connect("demo.nats.io").await?;
/// # nc.publish("test", "data".into()).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Subscriber {
    // Subscription id; included in every `Command::Unsubscribe` this subscriber sends.
    sid: u64,
    // Channel the subscription's messages are polled from.
    receiver: mpsc::Receiver<Message>,
    // Channel used to send commands (such as unsubscribe) for this subscription.
    sender: mpsc::Sender<Command>,
}
1142
1143impl Subscriber {
1144    fn new(
1145        sid: u64,
1146        sender: mpsc::Sender<Command>,
1147        receiver: mpsc::Receiver<Message>,
1148    ) -> Subscriber {
1149        Subscriber {
1150            sid,
1151            sender,
1152            receiver,
1153        }
1154    }
1155
1156    /// Unsubscribes from subscription, draining all remaining messages.
1157    ///
1158    /// # Examples
1159    /// ```
1160    /// # #[tokio::main]
1161    /// # async fn main() -> Result<(), async_nats::Error> {
1162    /// let client = async_nats::connect("demo.nats.io").await?;
1163    ///
1164    /// let mut subscriber = client.subscribe("foo").await?;
1165    ///
1166    /// subscriber.unsubscribe().await?;
1167    /// # Ok(())
1168    /// # }
1169    /// ```
1170    pub async fn unsubscribe(&mut self) -> Result<(), UnsubscribeError> {
1171        self.sender
1172            .send(Command::Unsubscribe {
1173                sid: self.sid,
1174                max: None,
1175            })
1176            .await?;
1177        self.receiver.close();
1178        Ok(())
1179    }
1180
1181    /// Unsubscribes from subscription after reaching given number of messages.
1182    /// This is the total number of messages received by this subscription in it's whole
1183    /// lifespan. If it already reached or surpassed the passed value, it will immediately stop.
1184    ///
1185    /// # Examples
1186    /// ```
1187    /// # use futures::StreamExt;
1188    /// # #[tokio::main]
1189    /// # async fn main() -> Result<(), async_nats::Error> {
1190    /// let client = async_nats::connect("demo.nats.io").await?;
1191    ///
1192    /// let mut subscriber = client.subscribe("test").await?;
1193    /// subscriber.unsubscribe_after(3).await?;
1194    ///
1195    /// for _ in 0..3 {
1196    ///     client.publish("test", "data".into()).await?;
1197    /// }
1198    ///
1199    /// while let Some(message) = subscriber.next().await {
1200    ///     println!("message received: {:?}", message);
1201    /// }
1202    /// println!("no more messages, unsubscribed");
1203    /// # Ok(())
1204    /// # }
1205    /// ```
1206    pub async fn unsubscribe_after(&mut self, unsub_after: u64) -> Result<(), UnsubscribeError> {
1207        self.sender
1208            .send(Command::Unsubscribe {
1209                sid: self.sid,
1210                max: Some(unsub_after),
1211            })
1212            .await?;
1213        Ok(())
1214    }
1215}
1216
1217#[derive(Error, Debug, PartialEq)]
1218#[error("failed to send unsubscribe")]
1219pub struct UnsubscribeError(String);
1220
1221impl From<tokio::sync::mpsc::error::SendError<Command>> for UnsubscribeError {
1222    fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
1223        UnsubscribeError(err.to_string())
1224    }
1225}
1226
1227impl Drop for Subscriber {
1228    fn drop(&mut self) {
1229        self.receiver.close();
1230        tokio::spawn({
1231            let sender = self.sender.clone();
1232            let sid = self.sid;
1233            async move {
1234                sender
1235                    .send(Command::Unsubscribe { sid, max: None })
1236                    .await
1237                    .ok();
1238            }
1239        });
1240    }
1241}
1242
1243impl Stream for Subscriber {
1244    type Item = Message;
1245
1246    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1247        self.receiver.poll_recv(cx)
1248    }
1249}
1250
1251#[derive(Clone, Debug, Eq, PartialEq)]
1252pub enum CallbackError {
1253    Client(ClientError),
1254    Server(ServerError),
1255}
1256impl std::fmt::Display for CallbackError {
1257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1258        match self {
1259            Self::Client(error) => write!(f, "{error}"),
1260            Self::Server(error) => write!(f, "{error}"),
1261        }
1262    }
1263}
1264
1265impl From<ServerError> for CallbackError {
1266    fn from(server_error: ServerError) -> Self {
1267        CallbackError::Server(server_error)
1268    }
1269}
1270
1271impl From<ClientError> for CallbackError {
1272    fn from(client_error: ClientError) -> Self {
1273        CallbackError::Client(client_error)
1274    }
1275}
1276
/// An error reported by the server.
#[derive(Clone, Debug, Eq, PartialEq, Error)]
pub enum ServerError {
    /// The error message was "authorization violation" (compared case-insensitively).
    AuthorizationViolation,
    /// The subscription with the given id is a slow consumer.
    SlowConsumer(u64),
    /// Any other error, carrying the message with its original casing.
    Other(String),
}
1283
/// An error raised by the client itself.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ClientError {
    /// A free-form error message.
    Other(String),
    /// The maximum number of reconnects was reached.
    MaxReconnects,
}

impl std::fmt::Display for ClientError {
    /// Writes the error prefixed with `nats: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MaxReconnects => f.write_str("nats: max reconnects reached"),
            Self::Other(error) => write!(f, "nats: {}", error),
        }
    }
}
1297
1298impl ServerError {
1299    fn new(error: String) -> ServerError {
1300        match error.to_lowercase().as_str() {
1301            "authorization violation" => ServerError::AuthorizationViolation,
1302            // error messages can contain case-sensitive values which should be preserved
1303            _ => ServerError::Other(error),
1304        }
1305    }
1306}
1307
1308impl std::fmt::Display for ServerError {
1309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1310        match self {
1311            Self::AuthorizationViolation => write!(f, "nats: authorization violation"),
1312            Self::SlowConsumer(sid) => write!(f, "nats: subscription {sid} is a slow consumer"),
1313            Self::Other(error) => write!(f, "nats: {error}"),
1314        }
1315    }
1316}
1317
/// Info to construct a CONNECT message.
///
/// Field names (after the `serde` renames below) are the keys of the
/// serialized CONNECT payload.
#[derive(Clone, Debug, Serialize)]
pub struct ConnectInfo {
    /// Turns on +OK protocol acknowledgments.
    pub verbose: bool,

    /// Turns on additional strict format checking, e.g. for properly formed
    /// subjects.
    pub pedantic: bool,

    /// User's JWT.
    #[serde(rename = "jwt")]
    pub user_jwt: Option<String>,

    /// Public nkey.
    pub nkey: Option<String>,

    /// Signed nonce, encoded to Base64URL.
    #[serde(rename = "sig")]
    pub signature: Option<String>,

    /// Optional client name.
    pub name: Option<String>,

    /// If set to `false`, the server (version 1.2.0+) will not send originating
    /// messages from this connection to its own subscriptions. Clients should
    /// set this to `false` only for server supporting this feature, which is
    /// when proto in the INFO protocol is set to at least 1.
    pub echo: bool,

    /// The implementation language of the client.
    pub lang: String,

    /// The version of the client.
    pub version: String,

    /// Sending 0 (or absent) indicates client supports original protocol.
    /// Sending 1 indicates that the client supports dynamic reconfiguration
    /// of cluster topology changes by asynchronously receiving INFO messages
    /// with known servers it can reconnect to.
    pub protocol: Protocol,

    /// Indicates whether the client requires an SSL connection.
    pub tls_required: bool,

    /// Connection username (if `auth_required` is set).
    pub user: Option<String>,

    /// Connection password (if `auth_required` is set).
    pub pass: Option<String>,

    /// Client authorization token (if `auth_required` is set).
    pub auth_token: Option<String>,

    /// Whether the client supports the usage of headers.
    pub headers: bool,

    /// Whether the client supports no_responders.
    pub no_responders: bool,
}
1378
/// Protocol version used by the client.
///
/// The `Serialize_repr`/`Deserialize_repr` derives (de)serialize a variant as
/// its `u8` discriminant rather than by name.
#[derive(Serialize_repr, Deserialize_repr, PartialEq, Eq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum Protocol {
    /// Original protocol.
    Original = 0,
    /// Protocol with dynamic reconfiguration of cluster and lame duck mode functionality.
    Dynamic = 1,
}
1388
/// Address of a NATS server.
///
/// A thin wrapper around a [`Url`]. Values created with
/// [`ServerAddr::from_url`] or parsed from a string always carry the `nats`
/// or `tls` scheme.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ServerAddr(Url);
1392
1393impl FromStr for ServerAddr {
1394    type Err = io::Error;
1395
1396    /// Parse an address of a NATS server.
1397    ///
1398    /// If not stated explicitly the `nats://` schema and port `4222` is assumed.
1399    fn from_str(input: &str) -> Result<Self, Self::Err> {
1400        let url: Url = if input.contains("://") {
1401            input.parse()
1402        } else {
1403            format!("nats://{input}").parse()
1404        }
1405        .map_err(|e| {
1406            io::Error::new(
1407                ErrorKind::InvalidInput,
1408                format!("NATS server URL is invalid: {e}"),
1409            )
1410        })?;
1411
1412        Self::from_url(url)
1413    }
1414}
1415
1416impl ServerAddr {
1417    /// Check if the URL is a valid NATS server address.
1418    pub fn from_url(url: Url) -> io::Result<Self> {
1419        if url.scheme() != "nats" && url.scheme() != "tls" {
1420            return Err(std::io::Error::new(
1421                ErrorKind::InvalidInput,
1422                format!("invalid scheme for NATS server URL: {}", url.scheme()),
1423            ));
1424        }
1425
1426        Ok(Self(url))
1427    }
1428
1429    /// Turn the server address into a standard URL.
1430    pub fn into_inner(self) -> Url {
1431        self.0
1432    }
1433
1434    /// Returns if tls is required by the client for this server.
1435    pub fn tls_required(&self) -> bool {
1436        self.0.scheme() == "tls"
1437    }
1438
1439    /// Returns if the server url had embedded username and password.
1440    pub fn has_user_pass(&self) -> bool {
1441        self.0.username() != ""
1442    }
1443
1444    /// Returns the host.
1445    pub fn host(&self) -> &str {
1446        match self.0.host() {
1447            Some(Host::Domain(_)) | Some(Host::Ipv4 { .. }) => self.0.host_str().unwrap(),
1448            // `host_str()` for Ipv6 includes the []s
1449            Some(Host::Ipv6 { .. }) => {
1450                let host = self.0.host_str().unwrap();
1451                &host[1..host.len() - 1]
1452            }
1453            None => "",
1454        }
1455    }
1456
1457    /// Returns the port.
1458    pub fn port(&self) -> u16 {
1459        self.0.port().unwrap_or(4222)
1460    }
1461
1462    /// Returns the optional username in the url.
1463    pub fn username(&self) -> Option<&str> {
1464        let user = self.0.username();
1465        if user.is_empty() {
1466            None
1467        } else {
1468            Some(user)
1469        }
1470    }
1471
1472    /// Returns the optional password in the url.
1473    pub fn password(&self) -> Option<&str> {
1474        self.0.password()
1475    }
1476
1477    /// Return the sockets from resolving the server address.
1478    pub async fn socket_addrs(&self) -> io::Result<impl Iterator<Item = SocketAddr> + '_> {
1479        tokio::net::lookup_host((self.host(), self.port())).await
1480    }
1481}
1482
/// Capability to convert into a list of NATS server addresses.
///
/// There are several implementations ensuring the easy passing of one or more server addresses to
/// functions like [`crate::connect()`].
pub trait ToServerAddrs {
    /// Returned iterator over server addresses which this type may correspond
    /// to.
    type Iter: Iterator<Item = ServerAddr>;

    /// Converts this value into an iterator of [`ServerAddr`]s.
    ///
    /// Returns an [`io::Error`] when the conversion fails, e.g. when a string
    /// cannot be parsed as a server address.
    fn to_server_addrs(&self) -> io::Result<Self::Iter>;
}
1494
1495impl ToServerAddrs for ServerAddr {
1496    type Iter = option::IntoIter<ServerAddr>;
1497    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1498        Ok(Some(self.clone()).into_iter())
1499    }
1500}
1501
1502impl ToServerAddrs for str {
1503    type Iter = option::IntoIter<ServerAddr>;
1504    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1505        self.parse::<ServerAddr>()
1506            .map(|addr| Some(addr).into_iter())
1507    }
1508}
1509
1510impl ToServerAddrs for String {
1511    type Iter = option::IntoIter<ServerAddr>;
1512    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1513        (**self).to_server_addrs()
1514    }
1515}
1516
1517impl<T: AsRef<str>> ToServerAddrs for [T] {
1518    type Iter = std::vec::IntoIter<ServerAddr>;
1519    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1520        self.iter()
1521            .map(AsRef::as_ref)
1522            .map(str::parse)
1523            .collect::<io::Result<_>>()
1524            .map(Vec::into_iter)
1525    }
1526}
1527
1528impl<T: AsRef<str>> ToServerAddrs for Vec<T> {
1529    type Iter = std::vec::IntoIter<ServerAddr>;
1530    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1531        self.as_slice().to_server_addrs()
1532    }
1533}
1534
1535impl<'a> ToServerAddrs for &'a [ServerAddr] {
1536    type Iter = iter::Cloned<slice::Iter<'a, ServerAddr>>;
1537
1538    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1539        Ok(self.iter().cloned())
1540    }
1541}
1542
1543impl ToServerAddrs for Vec<ServerAddr> {
1544    type Iter = std::vec::IntoIter<ServerAddr>;
1545
1546    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1547        Ok(self.clone().into_iter())
1548    }
1549}
1550
1551impl<T: ToServerAddrs + ?Sized> ToServerAddrs for &T {
1552    type Iter = T::Iter;
1553    fn to_server_addrs(&self) -> io::Result<Self::Iter> {
1554        (**self).to_server_addrs()
1555    }
1556}
1557
/// Returns `true` when `subject` contains no space, `.`, carriage return or
/// line feed. An empty string is accepted.
// NOTE(review): `.` is rejected as well, so dot-separated subjects fail this
// check — confirm callers only pass single tokens.
pub(crate) fn is_valid_subject<T: AsRef<str>>(subject: T) -> bool {
    subject
        .as_ref()
        .chars()
        .all(|c| !matches!(c, ' ' | '.' | '\r' | '\n'))
}
1561
// Generates `impl From<$origin> for $t`.
//
// Arguments:
// - `$t`: the target error type; must offer `new(kind)` and `with_source(kind, source)`.
// - `$k`: the kind type of `$t`; must have `TimedOut` and `Other` variants.
// - `$origin`: the source error type; must offer `kind()`.
// - `$origin_kind`: the kind type of `$origin`; must have a `TimedOut` variant.
//
// A `TimedOut` origin becomes a bare `TimedOut` target (the origin is not kept
// as source); every other origin kind becomes `Other` with the origin attached
// as its source.
macro_rules! from_with_timeout {
    ($t:ty, $k:ty, $origin: ty, $origin_kind: ty) => {
        impl From<$origin> for $t {
            fn from(err: $origin) -> Self {
                match err.kind() {
                    <$origin_kind>::TimedOut => Self::new(<$k>::TimedOut),
                    _ => Self::with_source(<$k>::Other, err),
                }
            }
        }
    };
}
1574pub(crate) use from_with_timeout;
1575
1576use crate::connection::ShouldFlush;
1577
#[cfg(test)]
mod tests {
    use super::*;

    /// Hosts of the two addresses used by the `to_server_addrs_*` fixtures.
    const EXPECTED_HOSTS: [&str; 2] = ["127.0.0.1", "::"];

    /// Asserts that `addrs` yields exactly the fixture hosts, in order.
    fn assert_expected_hosts(addrs: impl Iterator<Item = ServerAddr>) {
        let hosts: Vec<String> = addrs.map(|addr| addr.host().to_owned()).collect();
        assert_eq!(hosts, EXPECTED_HOSTS);
    }

    /// Parses `input` and returns the host of the resulting address.
    fn parsed_host(input: &str) -> String {
        let address = ServerAddr::from_str(input).unwrap();
        address.host().to_owned()
    }

    #[test]
    fn server_address_ipv6() {
        assert_eq!(parsed_host("nats://[::]"), "::")
    }

    #[test]
    fn server_address_ipv4() {
        assert_eq!(parsed_host("nats://127.0.0.1"), "127.0.0.1")
    }

    #[test]
    fn server_address_domain() {
        assert_eq!(parsed_host("nats://example.com"), "example.com")
    }

    #[test]
    fn to_server_addrs_vec_str() {
        let servers = vec!["nats://127.0.0.1", "nats://[::]"];
        assert_expected_hosts(servers.to_server_addrs().unwrap());
    }

    #[test]
    fn to_server_addrs_arr_str() {
        let servers = ["nats://127.0.0.1", "nats://[::]"];
        assert_expected_hosts(servers.to_server_addrs().unwrap());
    }

    #[test]
    fn to_server_addrs_vec_string() {
        let servers = vec!["nats://127.0.0.1".to_string(), "nats://[::]".to_string()];
        assert_expected_hosts(servers.to_server_addrs().unwrap());
    }

    #[test]
    fn to_server_addrs_arr_string() {
        let servers = ["nats://127.0.0.1".to_string(), "nats://[::]".to_string()];
        assert_expected_hosts(servers.to_server_addrs().unwrap());
    }
}