1mod address;
2pub mod error;
3mod parser;
4mod refs;
5mod state;
6#[cfg(test)]
7mod tests;
8#[cfg(feature = "tls")]
9pub mod tls;
10
11use log::trace;
12use serde::{Deserialize, Serialize};
13use std::{
14 convert::Infallible,
15 fmt,
16 str::FromStr,
17 sync::atomic::{AtomicU64, Ordering},
18};
19use tokio::sync::mpsc::Sender as MpscSender;
20
21use crate::util;
22
23pub use self::{
24 address::Address,
25 refs::{ClientRef, ClientRefMut, StableMutexGuard},
26 state::{ClientState, ConnectionState, StateTransition, StateTransitionResult},
27};
28
29#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
33pub struct Info {
34 pub(crate) server_id: String,
35 pub(crate) version: String,
36 pub(crate) go: String,
37 pub(crate) host: String,
38 pub(crate) port: u16,
39 pub(crate) max_payload: usize,
40 #[serde(default)]
41 pub(crate) proto: i32,
42 pub(crate) client_id: Option<u64>,
43 #[serde(default)]
44 pub(crate) auth_required: bool,
45 #[serde(default)]
46 pub(crate) tls_required: bool,
47 #[serde(default)]
48 pub(crate) tls_verify: bool,
49 #[serde(default)]
50 pub(crate) connect_urls: Vec<Address>,
51}
52
53impl Info {
54 pub(crate) fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn server_id(&self) -> &str {
61 &self.server_id
62 }
63
64 pub fn version(&self) -> &str {
66 &self.version
67 }
68
69 pub fn go(&self) -> &str {
71 &self.go
72 }
73
74 pub fn host(&self) -> &str {
77 &self.host
78 }
79
80 pub fn port(&self) -> u16 {
82 self.port
83 }
84
85 pub fn max_payload(&self) -> usize {
87 self.max_payload
88 }
89
90 pub fn proto(&self) -> i32 {
93 self.proto
94 }
95
96 pub fn client_id(&self) -> Option<u64> {
100 self.client_id
101 }
102
103 pub fn auth_required(&self) -> bool {
105 self.auth_required
106 }
107
108 pub fn tls_required(&self) -> bool {
111 self.tls_required
112 }
113
114 pub fn tls_verify(&self) -> bool {
116 self.tls_verify
117 }
118
119 pub fn connect_urls(&self) -> &[Address] {
121 &self.connect_urls
122 }
123}
124
125#[derive(Clone, Debug, PartialEq, Serialize)]
129#[serde(untagged)]
130pub enum Authorization {
131 Token {
133 #[serde(rename = "auth_token")]
134 token: String,
135 },
136 UsernamePassword {
138 #[serde(rename = "user")]
139 username: String,
140 #[serde(rename = "pass")]
141 password: String,
142 },
143}
144
145impl Authorization {
146 pub fn token(token: String) -> Self {
148 Authorization::Token { token }
149 }
150
151 pub fn username_password(username: String, password: String) -> Self {
153 Authorization::UsernamePassword { username, password }
154 }
155}
156
157impl FromStr for Authorization {
158 type Err = Infallible;
159 fn from_str(s: &str) -> Result<Self, Self::Err> {
160 match util::split_after(s, util::USERNAME_PASSWORD_SEPARATOR) {
161 (token, None) => Ok(Authorization::token(String::from(token))),
162 (username, Some(password)) => Ok(Authorization::username_password(
163 String::from(username),
164 String::from(password),
165 )),
166 }
167 }
168}
169
170impl fmt::Display for Authorization {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 match &self {
173 Authorization::Token { token } => write!(f, "{}", token)?,
174 Authorization::UsernamePassword { username, password } => {
175 write!(f, "{}:{}", username, password)?
176 }
177 }
178 Ok(())
179 }
180}
181
182#[derive(Clone, Debug, PartialEq, Serialize)]
184pub struct Connect {
185 verbose: bool,
186 pedantic: bool,
187 tls_required: bool,
188 #[serde(flatten)]
189 authorization: Option<Authorization>,
190 name: Option<String>,
191 #[serde(rename = "lang")]
192 language: String,
193 version: String,
194 protocol: i32,
195 echo: bool,
196}
197
198impl Connect {
199 pub fn new() -> Self {
201 Self::default()
202 }
203
204 pub fn is_verbose(&self) -> bool {
206 self.verbose
207 }
208
209 pub fn verbose(&mut self, verbose: bool) -> &mut Self {
211 self.verbose = verbose;
212 self
213 }
214
215 pub fn is_pedantic(&self) -> bool {
217 self.pedantic
218 }
219
220 pub fn pedantic(&mut self, pedantic: bool) -> &mut Self {
223 self.pedantic = pedantic;
224 self
225 }
226
227 pub fn is_tls_required(&self) -> bool {
229 self.tls_required
230 }
231
232 pub fn tls_required(&mut self, tls_required: bool) -> &mut Self {
234 self.tls_required = tls_required;
235 self
236 }
237
238 pub fn authorization(&self) -> Option<&Authorization> {
240 self.authorization.as_ref()
241 }
242
243 pub fn token(&mut self, token: String) -> &mut Self {
245 self.set_authorization(Some(Authorization::token(token)))
246 }
247
248 pub fn username_password(&mut self, username: String, password: String) -> &mut Self {
250 self.set_authorization(Some(Authorization::username_password(username, password)))
251 }
252
253 pub fn set_authorization(&mut self, authorization: Option<Authorization>) -> &mut Self {
255 self.authorization = authorization;
256 self
257 }
258
259 pub fn clear_authorization(&mut self) -> &mut Self {
261 self.set_authorization(None)
262 }
263
264 pub fn get_name(&self) -> Option<&str> {
266 self.name.as_deref()
267 }
268
269 pub fn name(&mut self, name: String) -> &mut Self {
271 self.name = Some(name);
272 self
273 }
274
275 pub fn clear_name(&mut self) -> &mut Self {
277 self.name = None;
278 self
279 }
280
281 pub fn get_lang(&self) -> &str {
283 &self.language
284 }
285
286 pub fn get_version(&self) -> &str {
288 &self.version
289 }
290
291 pub fn get_protocol(&self) -> i32 {
296 self.protocol
297 }
298
299 pub fn is_echo(&self) -> bool {
301 self.echo
302 }
303
304 pub fn echo(&mut self, echo: bool) -> &mut Self {
309 self.echo = echo;
310 self
311 }
312}
313
314impl Default for Connect {
315 fn default() -> Self {
316 Self {
317 verbose: false,
318 pedantic: false,
319 tls_required: false,
320 authorization: None,
321 name: None,
322 language: String::from("rust"),
323 version: String::from(util::CLIENT_VERSION),
324 protocol: 1,
325 echo: false,
326 }
327 }
328}
329
330#[derive(Clone, Debug, PartialEq)]
334pub enum ProtocolError {
335 UnknownProtocolOperation,
337 AttemptedToConnectToRoutePort,
339 AuthorizationViolation,
342 AuthorizationTimeout,
345 InvalidClientProtocol,
347 MaximumControlLineExceeded,
350 ParserError,
352 SecureConnectionTlsRequired,
354 StaleConnection,
356 MaximumConnectionsExceeded,
360 SlowConsumer,
362 MaximumPayloadViolation,
367 InvalidSubject,
369 PermissionsViolationForSubscription(Subject),
372 PermissionsViolationForPublish(Subject),
375}
376
377impl fmt::Display for ProtocolError {
378 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
379 match self {
380 ProtocolError::UnknownProtocolOperation => {
381 write!(f, "{}", util::UNKNOWN_PROTOCOL_OPERATION)?
382 }
383 ProtocolError::AttemptedToConnectToRoutePort => {
384 write!(f, "{}", util::ATTEMPTED_TO_CONNECT_TO_ROUTE_PORT)?
385 }
386 ProtocolError::AuthorizationViolation => {
387 write!(f, "{}", util::AUTHORIZATION_VIOLATION)?
388 }
389 ProtocolError::AuthorizationTimeout => write!(f, "{}", util::AUTHORIZATION_TIMEOUT)?,
390 ProtocolError::InvalidClientProtocol => write!(f, "{}", util::INVALID_CLIENT_PROTOCOL)?,
391 ProtocolError::MaximumControlLineExceeded => {
392 write!(f, "{}", util::MAXIMUM_CONTROL_LINE_EXCEEDED)?
393 }
394 ProtocolError::ParserError => write!(f, "{}", util::PARSER_ERROR)?,
395 ProtocolError::SecureConnectionTlsRequired => {
396 write!(f, "{}", util::SECURE_CONNECTION_TLS_REQUIRED)?
397 }
398 ProtocolError::StaleConnection => write!(f, "{}", util::STALE_CONNECTION)?,
399 ProtocolError::MaximumConnectionsExceeded => {
400 write!(f, "{}", util::MAXIMUM_CONNECTIONS_EXCEEDED)?
401 }
402 ProtocolError::SlowConsumer => write!(f, "{}", util::SLOW_CONSUMER)?,
403 ProtocolError::MaximumPayloadViolation => {
404 write!(f, "{}", util::MAXIMUM_PAYLOAD_VIOLATION)?
405 }
406 ProtocolError::InvalidSubject => write!(f, "{}", util::INVALID_SUBJECT)?,
407 ProtocolError::PermissionsViolationForSubscription(subject) => write!(
408 f,
409 "{} {}",
410 util::PERMISSIONS_VIOLATION_FOR_SUBSCRIPTION,
411 subject
412 )?,
413 ProtocolError::PermissionsViolationForPublish(subject) => {
414 write!(f, "{} {}", util::PERMISSIONS_VIOLATION_FOR_PUBLISH, subject)?
415 }
416 }
417 Ok(())
418 }
419}
420
/// A subject made of tokens that are joined with `.` when displayed.
///
/// When `full_wildcard` is set, the displayed form ends in `.>`. A subject
/// without any tokens is always displayed as a bare `>`, regardless of the
/// flag.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Subject {
    /// The individual tokens, in order.
    tokens: Vec<String>,
    /// Whether the subject ends with the `>` wildcard.
    full_wildcard: bool,
}

impl fmt::Display for Subject {
    /// Writes the tokens separated by `.`, followed by `.>` for a full
    /// wildcard subject.
    ///
    /// Tokens are streamed straight into the formatter, so no temporary joined
    /// `String` is allocated per call.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tokens = self.tokens.iter();

        // No tokens at all: the subject is just the full wildcard.
        let first = match tokens.next() {
            Some(first) => first,
            None => return f.write_str(">"),
        };

        f.write_str(first)?;
        for token in tokens {
            f.write_str(".")?;
            f.write_str(token)?;
        }

        if self.full_wildcard {
            f.write_str(".>")?;
        }
        Ok(())
    }
}
462
463#[derive(Default)]
464pub struct SubjectBuilder {
465 tokens: Vec<String>,
466}
467
468impl SubjectBuilder {
469 pub fn new() -> Self {
471 Self::default()
472 }
473
474 #[allow(clippy::should_implement_trait)]
476 pub fn add(mut self, subject: impl Into<String>) -> Self {
477 self.tokens.push(subject.into());
479 self
480 }
481
482 pub fn add_wildcard(mut self) -> Self {
484 self.tokens.push("*".to_string());
485 self
486 }
487
488 pub fn build(self) -> Subject {
490 let fwc = self.tokens.is_empty();
491 Subject {
492 tokens: self.tokens,
493 full_wildcard: fwc,
494 }
495 }
496
497 pub fn build_full_wildcard(self) -> Subject {
499 Subject {
500 tokens: self.tokens,
501 full_wildcard: true,
502 }
503 }
504}
505
506#[derive(Debug, PartialEq)]
510pub struct Msg {
511 subject: Subject,
512 sid: Sid,
513 reply_to: Option<Subject>,
514 payload: Vec<u8>,
515}
516
517impl Msg {
518 pub(crate) fn new(
519 subject: Subject,
520 sid: Sid,
521 reply_to: Option<Subject>,
522 payload: Vec<u8>,
523 ) -> Self {
524 Self {
525 subject,
526 sid,
527 reply_to,
528 payload,
529 }
530 }
531
532 pub fn subject(&self) -> &Subject {
534 &self.subject
535 }
536
537 pub fn sid(&self) -> Sid {
539 self.sid
540 }
541
542 pub fn reply_to(&self) -> Option<&Subject> {
544 self.reply_to.as_ref()
545 }
546
547 pub fn payload(&self) -> &[u8] {
549 &self.payload
550 }
551
552 pub fn into_payload(self) -> Vec<u8> {
554 self.payload
555 }
556}
557
558pub type Sid = u64;
566static SID: AtomicU64 = AtomicU64::new(0);
567
568pub struct Subscription {
570 subject: Subject,
571 sid: Sid,
572 queue_group: Option<String>,
573 pub(crate) unsubscribe_after: Option<u64>,
574 pub(crate) tx: MpscSender<Msg>,
575}
576
577impl Subscription {
578 pub(crate) fn new(subject: Subject, queue_group: Option<String>, tx: MpscSender<Msg>) -> Self {
579 Self {
580 subject,
581 sid: SID.fetch_add(1, Ordering::Relaxed),
582 queue_group,
583 unsubscribe_after: None,
584 tx,
585 }
586 }
587
588 pub fn subject(&self) -> &Subject {
590 &self.subject
591 }
592
593 pub fn sid(&self) -> Sid {
595 self.sid
596 }
597
598 pub fn queue_group(&self) -> Option<&str> {
600 self.queue_group.as_ref().map(String::as_ref)
601 }
602
603 pub fn unsubscribe_after(&self) -> Option<u64> {
606 self.unsubscribe_after
607 }
608}
609
610#[derive(Debug, PartialEq)]
615pub enum ServerControl {
616 Info(Info),
617 Msg {
618 subject: Subject,
619 sid: Sid,
620 reply_to: Option<Subject>,
621 len: u64,
622 },
623 Ping,
624 Pong,
625 Ok,
626 Err(ProtocolError),
627}
628
629#[derive(Debug, PartialEq)]
632pub enum ServerMessage {
633 Info(Info),
634 Msg(Msg),
635 Ping,
636 Pong,
637 Ok,
638 Err(ProtocolError),
639}
640
641impl From<ServerControl> for ServerMessage {
642 fn from(control: ServerControl) -> Self {
643 match control {
644 ServerControl::Info(info) => ServerMessage::Info(info),
645 ServerControl::Msg { .. } => unreachable!(),
649 ServerControl::Ping => ServerMessage::Ping,
650 ServerControl::Pong => ServerMessage::Pong,
651 ServerControl::Ok => ServerMessage::Ok,
652 ServerControl::Err(e) => ServerMessage::Err(e),
653 }
654 }
655}
656
657pub enum ClientControl<'a> {
660 Connect(&'a Connect),
661 Pub(&'a Subject, Option<&'a Subject>, usize),
662 Sub(&'a Subscription),
663 Unsub(Sid, Option<u64>),
664 Ping,
665 Pong,
666}
667
668impl fmt::Display for ClientControl<'_> {
669 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670 match self {
671 Self::Connect(connect) => write!(
672 f,
673 "{} {}{}",
674 util::CONNECT_OP_NAME,
675 serde_json::to_string(connect).expect("to serialize Connect"),
676 util::MESSAGE_TERMINATOR
677 ),
678 Self::Pub(subject, reply_to, len) => {
679 if let Some(reply_to) = reply_to {
680 write!(
681 f,
682 "{} {} {} {}{}",
683 util::PUB_OP_NAME,
684 subject,
685 reply_to,
686 len,
687 util::MESSAGE_TERMINATOR
688 )
689 } else {
690 write!(
691 f,
692 "{} {} {}{}",
693 util::PUB_OP_NAME,
694 subject,
695 len,
696 util::MESSAGE_TERMINATOR
697 )
698 }
699 }
700 Self::Sub(subscription) => {
701 if let Some(queue_group) = &subscription.queue_group {
702 write!(
703 f,
704 "{} {} {} {}{}",
705 util::SUB_OP_NAME,
706 subscription.subject(),
707 queue_group,
708 subscription.sid(),
709 util::MESSAGE_TERMINATOR
710 )
711 } else {
712 write!(
713 f,
714 "{} {} {}{}",
715 util::SUB_OP_NAME,
716 subscription.subject(),
717 subscription.sid(),
718 util::MESSAGE_TERMINATOR
719 )
720 }
721 }
722 Self::Unsub(sid, max_msgs) => {
723 if let Some(max_msgs) = max_msgs {
724 write!(
725 f,
726 "{} {} {}{}",
727 util::UNSUB_OP_NAME,
728 sid,
729 max_msgs,
730 util::MESSAGE_TERMINATOR
731 )
732 } else {
733 write!(
734 f,
735 "{} {}{}",
736 util::UNSUB_OP_NAME,
737 sid,
738 util::MESSAGE_TERMINATOR
739 )
740 }
741 }
742 Self::Ping => write!(f, "{}{}", util::PING_OP_NAME, util::MESSAGE_TERMINATOR),
743 Self::Pong => write!(f, "{}{}", util::PONG_OP_NAME, util::MESSAGE_TERMINATOR),
744 }
745 }
746}
747
748impl ClientControl<'_> {
749 pub fn to_line(&self) -> String {
750 let s = self.to_string();
751 trace!("->> {:?}", s);
752 s
753 }
754}
755
756