rants/
types.rs

1mod address;
2pub mod error;
3mod parser;
4mod refs;
5mod state;
6#[cfg(test)]
7mod tests;
8#[cfg(feature = "tls")]
9pub mod tls;
10
11use log::trace;
12use serde::{Deserialize, Serialize};
13use std::{
14    convert::Infallible,
15    fmt,
16    str::FromStr,
17    sync::atomic::{AtomicU64, Ordering},
18};
19use tokio::sync::mpsc::Sender as MpscSender;
20
21use crate::util;
22
23pub use self::{
24    address::Address,
25    refs::{ClientRef, ClientRefMut, StableMutexGuard},
26    state::{ClientState, ConnectionState, StateTransition, StateTransitionResult},
27};
28
29///////////////////////////////////////////////////////////////////////////////////////////////////
30
/// The [`INFO`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#info) message sent by the server
///
/// Each field is readable through the accessor method of the same name on `Info`. Fields
/// annotated with `#[serde(default)]` take their type's default value when the field is missing
/// from the deserialized input.
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct Info {
    /// The unique identifier of the NATS server
    pub(crate) server_id: String,
    /// The version of the NATS server
    pub(crate) version: String,
    /// The version of golang the NATS server was built with
    pub(crate) go: String,
    /// The IP address used to start the NATS server
    pub(crate) host: String,
    /// The port number the NATS server is configured to listen on
    pub(crate) port: u16,
    /// Maximum payload size, in bytes, that the server will accept from the client
    pub(crate) max_payload: usize,
    /// An integer indicating the protocol version of the server
    #[serde(default)]
    pub(crate) proto: i32,
    /// The optional internal client identifier in the server
    pub(crate) client_id: Option<u64>,
    /// If set, the client should try to authenticate upon connect
    #[serde(default)]
    pub(crate) auth_required: bool,
    /// If set, the client must perform the TLS handshake
    #[serde(default)]
    pub(crate) tls_required: bool,
    /// If set, the client must provide a valid certificate during the TLS handshake
    #[serde(default)]
    pub(crate) tls_verify: bool,
    /// An optional list of server urls that a client can connect to
    #[serde(default)]
    pub(crate) connect_urls: Vec<Address>,
}
52
53impl Info {
54    /// Construct a new default `Info`
55    pub(crate) fn new() -> Self {
56        Self::default()
57    }
58
59    /// The unique identifier of the NATS server
60    pub fn server_id(&self) -> &str {
61        &self.server_id
62    }
63
64    /// The version of the NATS server
65    pub fn version(&self) -> &str {
66        &self.version
67    }
68
69    /// The version of golang the NATS server was built with
70    pub fn go(&self) -> &str {
71        &self.go
72    }
73
74    /// The IP address used to start the NATS server, by default this will be 0.0.0.0 and can be
75    /// configured with -client_advertise host:port
76    pub fn host(&self) -> &str {
77        &self.host
78    }
79
80    /// The port number the NATS server is configured to listen on
81    pub fn port(&self) -> u16 {
82        self.port
83    }
84
85    /// Maximum payload size, in bytes, that the server will accept from the client.
86    pub fn max_payload(&self) -> usize {
87        self.max_payload
88    }
89
90    /// An integer indicating the protocol version of the server. The server version 1.2.0 sets
91    /// this to 1 to indicate that it supports the "Echo" feature.
92    pub fn proto(&self) -> i32 {
93        self.proto
94    }
95
96    /// An optional unsigned integer (64 bits) representing the internal client identifier in the
97    /// server. This can be used to filter client connections in monitoring, correlate with error
98    /// logs, etc...
99    pub fn client_id(&self) -> Option<u64> {
100        self.client_id
101    }
102
103    /// If this is set, then the client should try to authenticate upon connect.
104    pub fn auth_required(&self) -> bool {
105        self.auth_required
106    }
107
108    /// If this is set, then the client must perform the TLS/1.2 handshake. Note, this used to be
109    /// ssl_required and has been updated along with the protocol from SSL to TLS.
110    pub fn tls_required(&self) -> bool {
111        self.tls_required
112    }
113
114    /// If this is set, the client must provide a valid certificate during the TLS handshake.
115    pub fn tls_verify(&self) -> bool {
116        self.tls_verify
117    }
118
119    /// An optional list of server urls that a client can connect to.
120    pub fn connect_urls(&self) -> &[Address] {
121        &self.connect_urls
122    }
123}
124
125///////////////////////////////////////////////////////////////////////////////////////////////////
126
/// The methods of client authorization set in the [`Connect`](struct.Connect.html) message
///
/// The enum is serialized untagged, so only the renamed fields of the active variant appear in
/// the output: `auth_token` for a token, or `user` and `pass` for a username and password.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(untagged)]
pub enum Authorization {
    /// Use the `auth_token` authorization method
    Token {
        /// The token, serialized under the name `auth_token`
        #[serde(rename = "auth_token")]
        token: String,
    },
    /// Use the `user` and `pass` authorization method
    UsernamePassword {
        /// The username, serialized under the name `user`
        #[serde(rename = "user")]
        username: String,
        /// The password, serialized under the name `pass`
        #[serde(rename = "pass")]
        password: String,
    },
}
144
145impl Authorization {
146    /// Create a `Authorization::token`
147    pub fn token(token: String) -> Self {
148        Authorization::Token { token }
149    }
150
151    /// Create a `Authorization::UsernamePassword`
152    pub fn username_password(username: String, password: String) -> Self {
153        Authorization::UsernamePassword { username, password }
154    }
155}
156
157impl FromStr for Authorization {
158    type Err = Infallible;
159    fn from_str(s: &str) -> Result<Self, Self::Err> {
160        match util::split_after(s, util::USERNAME_PASSWORD_SEPARATOR) {
161            (token, None) => Ok(Authorization::token(String::from(token))),
162            (username, Some(password)) => Ok(Authorization::username_password(
163                String::from(username),
164                String::from(password),
165            )),
166        }
167    }
168}
169
170impl fmt::Display for Authorization {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        match &self {
173            Authorization::Token { token } => write!(f, "{}", token)?,
174            Authorization::UsernamePassword { username, password } => {
175                write!(f, "{}:{}", username, password)?
176            }
177        }
178        Ok(())
179    }
180}
181
/// The [`CONNECT`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#connect) message sent by the client
///
/// Start from `Connect::new()` (equivalent to `Connect::default()`) and adjust the options with
/// the chainable setter methods.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Connect {
    /// Turns on +OK protocol acknowledgements
    verbose: bool,
    /// Turns on additional strict format checking
    pedantic: bool,
    /// Indicates whether the client requires an SSL connection
    tls_required: bool,
    /// The optional authorization; its fields are flattened into the serialized message
    #[serde(flatten)]
    authorization: Option<Authorization>,
    /// The optional client name
    name: Option<String>,
    /// The implementation language of the client, serialized under the name `lang`
    #[serde(rename = "lang")]
    language: String,
    /// The version of the client
    version: String,
    /// The protocol level the client supports
    protocol: i32,
    /// Whether the server should send messages from this connection to its own subscriptions
    echo: bool,
}
197
198impl Connect {
199    /// Construct a new default `Connect`
200    pub fn new() -> Self {
201        Self::default()
202    }
203
204    /// Return `true` if the connection is verbose.
205    pub fn is_verbose(&self) -> bool {
206        self.verbose
207    }
208
209    /// Turns on +OK protocol acknowledgements. [default = `false`]
210    pub fn verbose(&mut self, verbose: bool) -> &mut Self {
211        self.verbose = verbose;
212        self
213    }
214
215    /// Return `true` if the connection is pedantic.
216    pub fn is_pedantic(&self) -> bool {
217        self.pedantic
218    }
219
220    /// Turns on additional strict format checking, e.g. for properly formed subjects [default =
221    /// `false`]
222    pub fn pedantic(&mut self, pedantic: bool) -> &mut Self {
223        self.pedantic = pedantic;
224        self
225    }
226
227    /// Return `true` if the connection requires TLS.
228    pub fn is_tls_required(&self) -> bool {
229        self.tls_required
230    }
231
232    /// Indicates whether the client requires an SSL connection. [default = `false`]
233    pub fn tls_required(&mut self, tls_required: bool) -> &mut Self {
234        self.tls_required = tls_required;
235        self
236    }
237
238    /// Get the [`Authorization`](enum.Authorization.html)
239    pub fn authorization(&self) -> Option<&Authorization> {
240        self.authorization.as_ref()
241    }
242
243    /// Set the authorization to use a token
244    pub fn token(&mut self, token: String) -> &mut Self {
245        self.set_authorization(Some(Authorization::token(token)))
246    }
247
248    /// Set the authorization to use a username and password
249    pub fn username_password(&mut self, username: String, password: String) -> &mut Self {
250        self.set_authorization(Some(Authorization::username_password(username, password)))
251    }
252
253    /// Set the authorization
254    pub fn set_authorization(&mut self, authorization: Option<Authorization>) -> &mut Self {
255        self.authorization = authorization;
256        self
257    }
258
259    /// Remove the authorization
260    pub fn clear_authorization(&mut self) -> &mut Self {
261        self.set_authorization(None)
262    }
263
264    /// Get the optional name of the client.
265    pub fn get_name(&self) -> Option<&str> {
266        self.name.as_deref()
267    }
268
269    /// Set the optional client name. [default = `None`]
270    pub fn name(&mut self, name: String) -> &mut Self {
271        self.name = Some(name);
272        self
273    }
274
275    /// Remove the optional client name [default = `None`]
276    pub fn clear_name(&mut self) -> &mut Self {
277        self.name = None;
278        self
279    }
280
281    /// The implementation language of the client. [always = `"rust"`]
282    pub fn get_lang(&self) -> &str {
283        &self.language
284    }
285
286    /// The version of the client. [always = `"<the crate version>"`]
287    pub fn get_version(&self) -> &str {
288        &self.version
289    }
290
291    /// Optional int. Sending 0 (or absent) indicates client supports original protocol. Sending 1
292    /// indicates that the client supports dynamic reconfiguration of cluster topology changes by
293    /// asynchronously receiving INFO messages with known servers it can reconnect to. [always =
294    /// `1`]
295    pub fn get_protocol(&self) -> i32 {
296        self.protocol
297    }
298
299    /// Return `true` if echo is enabled on the connection.
300    pub fn is_echo(&self) -> bool {
301        self.echo
302    }
303
304    /// Optional boolean. If set to true, the server (version 1.2.0+) will send originating
305    /// messages from this connection to its own subscriptions. Clients should set this to true
306    /// only for server supporting this feature, which is when proto in the INFO protocol is set to
307    /// at least 1 [default = `false`]
308    pub fn echo(&mut self, echo: bool) -> &mut Self {
309        self.echo = echo;
310        self
311    }
312}
313
314impl Default for Connect {
315    fn default() -> Self {
316        Self {
317            verbose: false,
318            pedantic: false,
319            tls_required: false,
320            authorization: None,
321            name: None,
322            language: String::from("rust"),
323            version: String::from(util::CLIENT_VERSION),
324            protocol: 1,
325            echo: false,
326        }
327    }
328}
329
330///////////////////////////////////////////////////////////////////////////////////////////////////
331
/// The [`-ERR`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#okerr) messages sent from the server
///
/// The two permissions variants carry the [`Subject`](struct.Subject.html) the violation
/// applies to; every other variant is a plain marker.
#[derive(Clone, Debug, PartialEq)]
pub enum ProtocolError {
    /// Unknown protocol error
    UnknownProtocolOperation,
    /// Client attempted to connect to a route port instead of the client port
    AttemptedToConnectToRoutePort,
    /// Client failed to authenticate to the server with credentials specified in the CONNECT
    /// message
    AuthorizationViolation,
    /// Client took too long to authenticate to the server after establishing a connection
    /// (default 1 second)
    AuthorizationTimeout,
    /// Client specified an invalid protocol version in the CONNECT message
    InvalidClientProtocol,
    /// Message destination subject and reply subject length exceeded the maximum control line
    /// value specified by the max_control_line server option. The default is 1024 bytes.
    MaximumControlLineExceeded,
    /// Cannot parse the protocol message sent by the client
    ParserError,
    /// The server requires TLS and the client does not have TLS enabled.
    SecureConnectionTlsRequired,
    /// The server hasn't received a message from the client, including a PONG in too long.
    StaleConnection,
    /// This error is sent by the server when creating a new connection and the server has exceeded
    /// the maximum number of connections specified by the max_connections server option. The
    /// default is 64k.
    MaximumConnectionsExceeded,
    /// The server pending data size for the connection has reached the maximum size (default 10MB).
    SlowConsumer,
    /// Client attempted to publish a message with a payload size that exceeds the max_payload size
    /// configured on the server. This value is supplied to the client upon connection in the
    /// initial INFO message. The client is expected to do proper accounting of byte size to be
    /// sent to the server in order to handle this error synchronously.
    MaximumPayloadViolation,
    /// Client sent a malformed subject (e.g. sub foo. 90)
    InvalidSubject,
    /// The user specified in the CONNECT message does not have permission to subscribe to the
    /// subject.
    PermissionsViolationForSubscription(Subject),
    /// The user specified in the CONNECT message does not have permissions to publish to the
    /// subject.
    PermissionsViolationForPublish(Subject),
}
376
377impl fmt::Display for ProtocolError {
378    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
379        match self {
380            ProtocolError::UnknownProtocolOperation => {
381                write!(f, "{}", util::UNKNOWN_PROTOCOL_OPERATION)?
382            }
383            ProtocolError::AttemptedToConnectToRoutePort => {
384                write!(f, "{}", util::ATTEMPTED_TO_CONNECT_TO_ROUTE_PORT)?
385            }
386            ProtocolError::AuthorizationViolation => {
387                write!(f, "{}", util::AUTHORIZATION_VIOLATION)?
388            }
389            ProtocolError::AuthorizationTimeout => write!(f, "{}", util::AUTHORIZATION_TIMEOUT)?,
390            ProtocolError::InvalidClientProtocol => write!(f, "{}", util::INVALID_CLIENT_PROTOCOL)?,
391            ProtocolError::MaximumControlLineExceeded => {
392                write!(f, "{}", util::MAXIMUM_CONTROL_LINE_EXCEEDED)?
393            }
394            ProtocolError::ParserError => write!(f, "{}", util::PARSER_ERROR)?,
395            ProtocolError::SecureConnectionTlsRequired => {
396                write!(f, "{}", util::SECURE_CONNECTION_TLS_REQUIRED)?
397            }
398            ProtocolError::StaleConnection => write!(f, "{}", util::STALE_CONNECTION)?,
399            ProtocolError::MaximumConnectionsExceeded => {
400                write!(f, "{}", util::MAXIMUM_CONNECTIONS_EXCEEDED)?
401            }
402            ProtocolError::SlowConsumer => write!(f, "{}", util::SLOW_CONSUMER)?,
403            ProtocolError::MaximumPayloadViolation => {
404                write!(f, "{}", util::MAXIMUM_PAYLOAD_VIOLATION)?
405            }
406            ProtocolError::InvalidSubject => write!(f, "{}", util::INVALID_SUBJECT)?,
407            ProtocolError::PermissionsViolationForSubscription(subject) => write!(
408                f,
409                "{} {}",
410                util::PERMISSIONS_VIOLATION_FOR_SUBSCRIPTION,
411                subject
412            )?,
413            ProtocolError::PermissionsViolationForPublish(subject) => {
414                write!(f, "{} {}", util::PERMISSIONS_VIOLATION_FOR_PUBLISH, subject)?
415            }
416        }
417        Ok(())
418    }
419}
420
421///////////////////////////////////////////////////////////////////////////////////////////////////
422
/// A [subject](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#protocol-conventions) to publish or subscribe to
///
/// `Subject`s can be created by parsing a String or via a SubjectBuilder
///
/// When displayed, the tokens are joined with `.`; a subject with no tokens is displayed as a
/// lone `>`.
///
/// # Example
/// ```
/// use rants::{ Subject, SubjectBuilder };
///
/// let subject = "foo.bar.*.>".parse::<Subject>();
/// assert!(subject.is_ok());
///
/// let subject = SubjectBuilder::new()
///   .add("foo")
///   .add("bar")
///   .add_wildcard()
///   .build_full_wildcard();
///
/// assert_eq!(format!("{}", subject), "foo.bar.*.>");
/// ```
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Subject {
    /// The components of the subject, in order, without the separating `.`
    tokens: Vec<String>,
    /// Whether the subject ends with the full wildcard `>`
    full_wildcard: bool,
}
447
448impl fmt::Display for Subject {
449    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450        // Is the whole subject a full wildcard
451        if self.tokens.is_empty() {
452            write!(f, ">")?;
453            return Ok(());
454        }
455        write!(f, "{}", self.tokens.join("."))?;
456        if self.full_wildcard {
457            write!(f, ".>")?;
458        }
459        Ok(())
460    }
461}
462
/// A builder that assembles a [`Subject`](struct.Subject.html) one token at a time
///
/// Tokens are kept in the order they are added. Finish with `build` or `build_full_wildcard`
/// to obtain the `Subject`.
#[derive(Clone, Debug, Default)]
pub struct SubjectBuilder {
    /// The tokens added so far, in insertion order
    tokens: Vec<String>,
}
467
468impl SubjectBuilder {
469    /// Create a new `SubjectBuilder`
470    pub fn new() -> Self {
471        Self::default()
472    }
473
474    /// Add a component to the subject
475    #[allow(clippy::should_implement_trait)]
476    pub fn add(mut self, subject: impl Into<String>) -> Self {
477        // Need to add some checks here to check for illegal characters
478        self.tokens.push(subject.into());
479        self
480    }
481
482    /// Add a wildcard to the subject
483    pub fn add_wildcard(mut self) -> Self {
484        self.tokens.push("*".to_string());
485        self
486    }
487
488    /// Generate the subject
489    pub fn build(self) -> Subject {
490        let fwc = self.tokens.is_empty();
491        Subject {
492            tokens: self.tokens,
493            full_wildcard: fwc,
494        }
495    }
496
497    /// Generate the subject with a full wildcard ending
498    pub fn build_full_wildcard(self) -> Subject {
499        Subject {
500            tokens: self.tokens,
501            full_wildcard: true,
502        }
503    }
504}
505
506///////////////////////////////////////////////////////////////////////////////////////////////////
507
/// The [`MSG`](https://nats-io.github.io/docs/nats_protocol/nats-protocol.html#msg) message sent by the server
#[derive(Debug, PartialEq)]
pub struct Msg {
    /// The subject of the message
    subject: Subject,
    /// The subscription id of the message
    sid: Sid,
    /// The optional reply to subject
    reply_to: Option<Subject>,
    /// The payload bytes of the message
    payload: Vec<u8>,
}
516
517impl Msg {
518    pub(crate) fn new(
519        subject: Subject,
520        sid: Sid,
521        reply_to: Option<Subject>,
522        payload: Vec<u8>,
523    ) -> Self {
524        Self {
525            subject,
526            sid,
527            reply_to,
528            payload,
529        }
530    }
531
532    /// Get the [`Subject`](struct.Subject.html)
533    pub fn subject(&self) -> &Subject {
534        &self.subject
535    }
536
537    /// Get the subscription id
538    pub fn sid(&self) -> Sid {
539        self.sid
540    }
541
542    /// Get the optional reply to [`Subject`](struct.Subject.html)
543    pub fn reply_to(&self) -> Option<&Subject> {
544        self.reply_to.as_ref()
545    }
546
547    /// Get the payload
548    pub fn payload(&self) -> &[u8] {
549        &self.payload
550    }
551
552    /// Take ownership of the payload
553    pub fn into_payload(self) -> Vec<u8> {
554        self.payload
555    }
556}
557
558///////////////////////////////////////////////////////////////////////////////////////////////////
559
/// The type used for subscription IDs
///
/// This is a unique identifier the client uses when routing messages from the server. A
/// subscription ID can be any ASCII string, but within this client library, we always use
/// the string representation of an atomically increasing `u64` counter.
pub type Sid = u64;
/// The counter that supplies subscription IDs. It starts at zero, and `Subscription::new`
/// takes the next value from it with `fetch_add`.
static SID: AtomicU64 = AtomicU64::new(0);
567
/// A subscription to receive messages from a particular [`Subject`](struct.Subject.html)
pub struct Subscription {
    /// The subject of the subscription
    subject: Subject,
    /// The unique subscription ID, taken from the `SID` counter on construction
    sid: Sid,
    /// The optional queue group of the subscription
    queue_group: Option<String>,
    /// When `Some`, the subscription will automatically unsubscribe after receiving the
    /// indicated number of messages; `Subscription::new` initializes it to `None`
    pub(crate) unsubscribe_after: Option<u64>,
    /// Sending half of a channel of `Msg`s; presumably used to hand received messages to the
    /// subscriber (confirm against the client code)
    pub(crate) tx: MpscSender<Msg>,
}
576
577impl Subscription {
578    pub(crate) fn new(subject: Subject, queue_group: Option<String>, tx: MpscSender<Msg>) -> Self {
579        Self {
580            subject,
581            sid: SID.fetch_add(1, Ordering::Relaxed),
582            queue_group,
583            unsubscribe_after: None,
584            tx,
585        }
586    }
587
588    /// The [`Subject`](struct.Subject.html) of the subscription
589    pub fn subject(&self) -> &Subject {
590        &self.subject
591    }
592
593    /// The unique subscription ID
594    pub fn sid(&self) -> Sid {
595        self.sid
596    }
597
598    /// The optional queue group of the subscription
599    pub fn queue_group(&self) -> Option<&str> {
600        self.queue_group.as_ref().map(String::as_ref)
601    }
602
603    /// If this is of type `Some`, it means the subscription will automatically unsubscribe
604    /// after receiving the indicated number of messages
605    pub fn unsubscribe_after(&self) -> Option<u64> {
606        self.unsubscribe_after
607    }
608}
609
610///////////////////////////////////////////////////////////////////////////////////////////////////
611
/// Representation of all possible server control lines. A control line is the first line of a
/// message
#[derive(Debug, PartialEq)]
pub enum ServerControl {
    /// An info control line carrying the server's [`Info`](struct.Info.html)
    Info(Info),
    /// The control line of a message; unlike `ServerMessage::Msg` it holds no payload
    Msg {
        /// The subject named on the control line
        subject: Subject,
        /// The subscription ID named on the control line
        sid: Sid,
        /// The optional reply to subject
        reply_to: Option<Subject>,
        /// Presumably the size in bytes of the payload that follows (confirm against the
        /// parser)
        len: u64,
    },
    /// A ping control line
    Ping,
    /// A pong control line
    Pong,
    /// An ok control line
    Ok,
    /// An error control line carrying the [`ProtocolError`](enum.ProtocolError.html)
    Err(ProtocolError),
}
628
/// Representation of all possible server messages. This is similar to `ServerControl` however it
/// contains a full message type
#[derive(Debug, PartialEq)]
pub enum ServerMessage {
    /// An info message carrying the server's [`Info`](struct.Info.html)
    Info(Info),
    /// A complete [`Msg`](struct.Msg.html), payload included
    Msg(Msg),
    /// A ping message
    Ping,
    /// A pong message
    Pong,
    /// An ok message
    Ok,
    /// An error message carrying the [`ProtocolError`](enum.ProtocolError.html)
    Err(ProtocolError),
}
640
641impl From<ServerControl> for ServerMessage {
642    fn from(control: ServerControl) -> Self {
643        match control {
644            ServerControl::Info(info) => ServerMessage::Info(info),
645            // We should never try to directly convert a `ServerControl::Msg` to
646            // `ServerMessage::Msg`. The reason is the `Msg` message has a payload and therefore
647            // requires further parsing.
648            ServerControl::Msg { .. } => unreachable!(),
649            ServerControl::Ping => ServerMessage::Ping,
650            ServerControl::Pong => ServerMessage::Pong,
651            ServerControl::Ok => ServerMessage::Ok,
652            ServerControl::Err(e) => ServerMessage::Err(e),
653        }
654    }
655}
656
657///////////////////////////////////////////////////////////////////////////////////////////////////
658
/// Representation of the control lines a client can send
///
/// The `Display` implementation renders each variant as an operation name from `util`,
/// followed by the variant's space-separated fields and `util::MESSAGE_TERMINATOR`.
pub enum ClientControl<'a> {
    /// A connect line; the [`Connect`](struct.Connect.html) options are rendered as JSON
    Connect(&'a Connect),
    /// A publish line: the subject, an optional reply to subject, and a length (presumably the
    /// payload size in bytes; confirm against the caller)
    Pub(&'a Subject, Option<&'a Subject>, usize),
    /// A subscribe line for the given [`Subscription`](struct.Subscription.html)
    Sub(&'a Subscription),
    /// An unsubscribe line: the subscription ID and an optional maximum number of messages
    Unsub(Sid, Option<u64>),
    /// A ping line
    Ping,
    /// A pong line
    Pong,
}
667
668impl fmt::Display for ClientControl<'_> {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670        match self {
671            Self::Connect(connect) => write!(
672                f,
673                "{} {}{}",
674                util::CONNECT_OP_NAME,
675                serde_json::to_string(connect).expect("to serialize Connect"),
676                util::MESSAGE_TERMINATOR
677            ),
678            Self::Pub(subject, reply_to, len) => {
679                if let Some(reply_to) = reply_to {
680                    write!(
681                        f,
682                        "{} {} {} {}{}",
683                        util::PUB_OP_NAME,
684                        subject,
685                        reply_to,
686                        len,
687                        util::MESSAGE_TERMINATOR
688                    )
689                } else {
690                    write!(
691                        f,
692                        "{} {} {}{}",
693                        util::PUB_OP_NAME,
694                        subject,
695                        len,
696                        util::MESSAGE_TERMINATOR
697                    )
698                }
699            }
700            Self::Sub(subscription) => {
701                if let Some(queue_group) = &subscription.queue_group {
702                    write!(
703                        f,
704                        "{} {} {} {}{}",
705                        util::SUB_OP_NAME,
706                        subscription.subject(),
707                        queue_group,
708                        subscription.sid(),
709                        util::MESSAGE_TERMINATOR
710                    )
711                } else {
712                    write!(
713                        f,
714                        "{} {} {}{}",
715                        util::SUB_OP_NAME,
716                        subscription.subject(),
717                        subscription.sid(),
718                        util::MESSAGE_TERMINATOR
719                    )
720                }
721            }
722            Self::Unsub(sid, max_msgs) => {
723                if let Some(max_msgs) = max_msgs {
724                    write!(
725                        f,
726                        "{} {} {}{}",
727                        util::UNSUB_OP_NAME,
728                        sid,
729                        max_msgs,
730                        util::MESSAGE_TERMINATOR
731                    )
732                } else {
733                    write!(
734                        f,
735                        "{} {}{}",
736                        util::UNSUB_OP_NAME,
737                        sid,
738                        util::MESSAGE_TERMINATOR
739                    )
740                }
741            }
742            Self::Ping => write!(f, "{}{}", util::PING_OP_NAME, util::MESSAGE_TERMINATOR),
743            Self::Pong => write!(f, "{}{}", util::PONG_OP_NAME, util::MESSAGE_TERMINATOR),
744        }
745    }
746}
747
748impl ClientControl<'_> {
749    pub fn to_line(&self) -> String {
750        let s = self.to_string();
751        trace!("->> {:?}", s);
752        s
753    }
754}
755
756///////////////////////////////////////////////////////////////////////////////////////////////////