1use std::any::Any;
4use std::convert::From as _f;
5use std::fmt;
6use std::fmt::Display;
7use std::hash::Hash;
8use std::mem::{size_of, size_of_val};
9use std::net::SocketAddr;
10use std::num::{NonZeroU16, NonZeroU32};
11use std::ops::{Deref, DerefMut};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crate::acl::AuthInfo;
16use crate::codec::types::MQTT_LEVEL_5;
17use crate::codec::v3::{
18 Connect as ConnectV3, ConnectAckReason as ConnectAckReasonV3, LastWill as LastWillV3,
19};
20use crate::codec::v5::{
21 Connect as ConnectV5, ConnectAckReason as ConnectAckReasonV5, DisconnectReasonCode,
22 LastWill as LastWillV5, PublishAck as PublishAckV5, PublishAck2, PublishAck2Reason, PublishAckReason,
23 PublishProperties, RetainHandling, SubscribeAckReason, SubscriptionOptions as SubscriptionOptionsV5,
24 ToReasonCode, UserProperties, UserProperty,
25};
26use crate::fitter::Fitter;
27use crate::net::MqttError;
28use crate::net::{v3, v5, Builder};
29use crate::queue::{Queue, Sender};
30use crate::utils::{self, timestamp_millis};
31use crate::{codec, Error, Result};
32use anyhow::anyhow;
33use base64::prelude::{Engine, BASE64_STANDARD};
34use bitflags::bitflags;
35use bytes::Bytes;
36use bytestring::ByteString;
37use futures::StreamExt;
38use get_size::GetSize;
39use itertools::Itertools;
40use rmqtt_codec::cert::CertInfo;
41use serde::de::{self, Deserializer};
42use serde::ser::{SerializeStruct, Serializer};
43use serde::{Deserialize, Serialize};
44use serde_json::{json, Map, Value};
45use tokio::io::{AsyncRead, AsyncWrite};
46use tokio::sync::{oneshot, RwLock};
47
48use crate::context::ServerContext;
49use crate::inflight::{OutInflight, OutInflightMessage};
50use crate::session::OfflineInfo;
51use crate::topic::Level;
52
53pub use crate::codec::types::Publish as CodecPublish;
54
55pub type Port = u16;
56pub type NodeId = utils::NodeId;
57pub type NodeName = String;
58pub type RemoteSocketAddr = SocketAddr;
59pub type LocalSocketAddr = SocketAddr;
60pub type Addr = utils::Addr;
61pub type ClientId = ByteString;
62pub type UserName = ByteString;
63pub type Superuser = bool;
64pub type Password = bytes::Bytes;
65pub type PacketId = u16;
66pub type TopicName = ByteString;
68pub type Topic = crate::topic::Topic;
69pub type TopicFilter = ByteString;
71pub type SharedGroup = ByteString;
72pub type LimitSubsCount = Option<usize>;
73pub type IsDisconnect = bool;
74pub type MessageExpiry = bool;
75pub type TimestampMillis = utils::TimestampMillis;
76pub type Timestamp = utils::Timestamp;
77pub type IsOnline = bool;
78pub type IsAdmin = bool;
79pub type LimiterName = u16;
80pub type CleanStart = bool;
81pub type AssignedClientId = bool;
82pub type IsPing = bool;
83
84pub type Tx = SessionTx;
85pub type Rx = futures::channel::mpsc::UnboundedReceiver<Message>;
86
87pub type DashSet<V> = dashmap::DashSet<V, ahash::RandomState>;
88pub type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
89pub type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
90pub type QoS = rmqtt_codec::types::QoS;
91pub type PublishReceiveTime = TimestampMillis;
92pub type Subscriptions = Vec<(TopicFilter, SubscriptionOptions)>;
93pub type TopicFilters = Vec<TopicFilter>;
94pub type SubscriptionClientIds = Option<Vec<(ClientId, Option<(TopicFilter, SharedGroup)>)>>;
95pub type SubscriptionIdentifier = NonZeroU32;
96
97pub type HookSubscribeResult = Vec<Option<TopicFilter>>;
98pub type HookUnsubscribeResult = Vec<Option<TopicFilter>>;
99
100pub type MessageSender = Sender<(From, Publish)>;
101pub type MessageQueue = Queue<(From, Publish)>;
102pub type MessageQueueType = Arc<MessageQueue>;
103pub type OutInflightType = Arc<RwLock<OutInflight>>; pub type ConnectInfoType = Arc<ConnectInfo>;
106pub type FitterType = Arc<dyn Fitter>;
107pub type ListenerConfig = Arc<Builder>;
108pub type ListenerId = u16;
109
110pub(crate) const UNDEFINED: &str = "undefined";
111
112#[derive(Clone)]
113pub struct SessionTx {
114 #[cfg(feature = "debug")]
115 scx: ServerContext,
116 tx: futures::channel::mpsc::UnboundedSender<Message>,
117}
118
119impl SessionTx {
120 pub fn new(
121 tx: futures::channel::mpsc::UnboundedSender<Message>,
122 #[cfg(feature = "debug")] scx: ServerContext,
123 ) -> Self {
124 Self {
125 tx,
126 #[cfg(feature = "debug")]
127 scx,
128 }
129 }
130
131 #[inline]
132 pub fn is_closed(&self) -> bool {
133 self.tx.is_closed()
134 }
135
136 #[inline]
137 pub fn unbounded_send(
138 &self,
139 msg: Message,
140 ) -> std::result::Result<(), futures::channel::mpsc::TrySendError<Message>> {
141 match self.tx.unbounded_send(msg) {
142 Ok(()) => {
143 #[cfg(feature = "debug")]
144 self.scx.stats.debug_session_channels.inc();
145 Ok(())
146 }
147 Err(e) => Err(e),
148 }
149 }
150}
151
152#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
153pub enum ConnectInfo {
154 V3(Id, Box<ConnectV3>),
155 V5(Id, Box<ConnectV5>),
156}
157
158impl std::convert::From<Id> for ConnectInfo {
159 fn from(id: Id) -> Self {
160 ConnectInfo::V3(id, Box::default())
161 }
162}
163
164impl ConnectInfo {
165 #[inline]
166 pub fn id(&self) -> &Id {
167 match self {
168 ConnectInfo::V3(id, _) => id,
169 ConnectInfo::V5(id, _) => id,
170 }
171 }
172
173 #[inline]
174 pub fn client_id(&self) -> &ClientId {
175 match self {
176 ConnectInfo::V3(id, _) => &id.client_id,
177 ConnectInfo::V5(id, _) => &id.client_id,
178 }
179 }
180
181 #[inline]
182 pub fn to_json(&self) -> serde_json::Value {
183 match self {
184 ConnectInfo::V3(id, c) => {
185 json!({
186 "node": id.node(),
187 "ipaddress": id.remote_addr,
188 "clientid": id.client_id,
189 "username": id.username_ref(),
190 "keepalive": c.keep_alive,
191 "proto_ver": c.protocol.level(),
192 "clean_session": c.clean_session,
193 "last_will": self.last_will().map(|lw|lw.to_json())
194 })
195 }
196 ConnectInfo::V5(id, c) => {
197 json!({
198 "node": id.node(),
199 "ipaddress": id.remote_addr,
200 "clientid": id.client_id,
201 "username": id.username_ref(),
202 "keepalive": c.keep_alive,
203 "proto_ver": MQTT_LEVEL_5,
204 "clean_start": c.clean_start,
205 "last_will": self.last_will().map(|lw|lw.to_json()),
206
207 "session_expiry_interval_secs": c.session_expiry_interval_secs,
208 "auth_method": c.auth_method,
209 "auth_data": c.auth_data,
210 "request_problem_info": c.request_problem_info,
211 "request_response_info": c.request_response_info,
212 "receive_max": c.receive_max,
213 "topic_alias_max": c.topic_alias_max,
214 "user_properties": c.user_properties,
215 "max_packet_size": c.max_packet_size,
216 })
217 }
218 }
219 }
220
221 #[inline]
222 pub fn to_hook_body(&self, user_properties: bool) -> serde_json::Value {
223 match self {
224 ConnectInfo::V3(id, c) => {
225 json!({
226 "node": id.node(),
227 "ipaddress": id.remote_addr,
228 "clientid": id.client_id,
229 "username": id.username_ref(),
230 "keepalive": c.keep_alive,
231 "proto_ver": c.protocol.level(),
232 "clean_session": c.clean_session,
233 })
234 }
235 ConnectInfo::V5(id, c) => {
236 let mut body = json!({
237 "node": id.node(),
238 "ipaddress": id.remote_addr,
239 "clientid": id.client_id,
240 "username": id.username_ref(),
241 "keepalive": c.keep_alive,
242 "proto_ver": MQTT_LEVEL_5,
243 "clean_start": c.clean_start,
244 });
245 if user_properties && !c.user_properties.is_empty() {
246 if let Some(obj) = body.as_object_mut() {
247 obj.insert(
248 "user_properties".into(),
249 json!(serialize_user_properties(&c.user_properties)),
250 );
251 }
252 }
253 body
254 }
255 }
256 }
257
258 #[inline]
259 pub fn last_will(&self) -> Option<LastWill<'_>> {
260 match self {
261 ConnectInfo::V3(_, conn_info) => conn_info.last_will.as_ref().map(LastWill::V3),
262 ConnectInfo::V5(_, conn_info) => conn_info.last_will.as_ref().map(LastWill::V5),
263 }
264 }
265
266 #[inline]
267 pub fn keep_alive(&self) -> u16 {
268 match self {
269 ConnectInfo::V3(_, conn_info) => conn_info.keep_alive,
270 ConnectInfo::V5(_, conn_info) => conn_info.keep_alive,
271 }
272 }
273
274 #[inline]
275 pub fn username(&self) -> Option<&UserName> {
276 match self {
277 ConnectInfo::V3(_, conn_info) => conn_info.username.as_ref(),
278 ConnectInfo::V5(_, conn_info) => conn_info.username.as_ref(),
279 }
280 }
281
282 #[inline]
283 pub fn password(&self) -> Option<&Password> {
284 match self {
285 ConnectInfo::V3(_, conn_info) => conn_info.password.as_ref(),
286 ConnectInfo::V5(_, conn_info) => conn_info.password.as_ref(),
287 }
288 }
289
290 #[inline]
291 pub fn ipaddress(&self) -> Option<SocketAddr> {
292 match self {
293 ConnectInfo::V3(id, _) => id.remote_addr,
294 ConnectInfo::V5(id, _) => id.remote_addr,
295 }
296 }
297
298 #[inline]
299 pub fn clean_start(&self) -> bool {
300 match self {
301 ConnectInfo::V3(_, conn_info) => conn_info.clean_session,
302 ConnectInfo::V5(_, conn_info) => conn_info.clean_start,
303 }
304 }
305
306 #[inline]
307 pub fn proto_ver(&self) -> u8 {
308 match self {
309 ConnectInfo::V3(_, conn_info) => conn_info.protocol.level(),
310 ConnectInfo::V5(_, _) => MQTT_LEVEL_5,
311 }
312 }
313
314 #[inline]
316 pub fn max_packet_size(&self) -> Option<NonZeroU32> {
317 if let ConnectInfo::V5(_, connect) = self {
318 connect.max_packet_size
319 } else {
320 None
321 }
322 }
323
324 #[inline]
325 pub fn auth_method(&self) -> Option<&ByteString> {
326 if let ConnectInfo::V5(_, connect) = self {
327 connect.auth_method.as_ref()
328 } else {
329 None
330 }
331 }
332
333 #[inline]
334 pub fn cert(&self) -> Option<&CertInfo> {
335 match self {
336 ConnectInfo::V3(_, connect) => connect.cert.as_ref(),
337 ConnectInfo::V5(_, connect) => connect.cert.as_ref(),
338 }
339 }
340}
341
342#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
343pub enum Disconnect {
344 V3,
345 V5(codec::v5::Disconnect),
346 Other(ByteString),
347}
348
349impl Disconnect {
350 #[inline]
351 pub fn reason_code(&self) -> Option<DisconnectReasonCode> {
352 match self {
353 Disconnect::V3 => None,
354 Disconnect::V5(d) => Some(d.reason_code),
355 Disconnect::Other(_) => None,
356 }
357 }
358
359 #[inline]
360 pub fn reason(&self) -> Option<&ByteString> {
361 match self {
362 Disconnect::V3 => None,
363 Disconnect::V5(d) => d.reason_string.as_ref(),
364 Disconnect::Other(r) => Some(r),
365 }
366 }
367}
368
369pub type SubscribeAclResult = SubscribeReturn;
370
371#[derive(Default, Debug, Clone, Deserialize, Serialize)]
372pub struct PublishAclResult(pub PublishResult);
373
374impl PublishAclResult {
375 #[inline]
376 pub fn pub_res(self) -> PublishResult {
377 self.0
378 }
379
380 #[inline]
381 pub fn allow() -> Self {
382 PublishAclResult(PublishResult::reason_code(PublishAckReason::Success, None, false))
383 }
384
385 #[inline]
386 pub fn rejected(disconnect: IsDisconnect, reason_string: Option<ByteString>) -> Self {
387 PublishAclResult(PublishResult::reason_code(
388 PublishAckReason::NotAuthorized,
389 Some(reason_string.unwrap_or_else(|| "PublishRefused".into())),
390 disconnect,
391 ))
392 }
393
394 #[inline]
395 pub fn is_allow(&self) -> bool {
396 matches!(self.0.reason_code, PublishAckReason::Success)
397 }
398
399 #[inline]
400 pub fn is_rejected(&self) -> bool {
401 !self.is_allow()
402 }
403}
404
405#[derive(Debug, Clone)]
406pub enum AuthResult {
407 Allow(Superuser, Option<AuthInfo>),
408 NotFound,
410 BadUsernameOrPassword,
411 NotAuthorized,
412}
413
414#[derive(Debug, Clone, PartialEq, Eq)]
415pub enum MessageExpiryCheckResult {
416 Expiry,
417 Remaining(Option<NonZeroU32>),
418}
419
420impl MessageExpiryCheckResult {
421 #[inline]
422 pub fn is_expiry(&self) -> bool {
423 matches!(self, Self::Expiry)
424 }
425
426 #[inline]
427 pub fn message_expiry_interval(&self) -> Option<NonZeroU32> {
428 match self {
429 Self::Expiry => None,
430 Self::Remaining(i) => *i,
431 }
432 }
433}
434
435pub type SharedSubRelations = HashMap<TopicFilter, Vec<(SharedGroup, NodeId, ClientId, QoS, IsOnline)>>;
437pub type OtherSubRelations = HashMap<NodeId, Vec<TopicFilter>>;
439pub type ClearSubscriptions = bool;
440
441pub type SharedGroupType = (SharedGroup, IsOnline, Vec<ClientId>);
442
443pub type AllRelationsMap = DashMap<TopicFilter, HashMap<ClientId, (Id, SubscriptionOptions)>>;
444
445pub type SubRelation = (
446 TopicFilter,
447 ClientId,
448 SubscriptionOptions,
449 Option<Vec<SubscriptionIdentifier>>,
450 Option<SharedGroupType>,
451);
452pub type SubRelations = Vec<SubRelation>;
453pub type SubRelationsMap = HashMap<NodeId, SubRelations>;
454
455impl std::convert::From<SubscriptioRelationsCollector> for SubRelations {
456 #[inline]
457 fn from(collector: SubscriptioRelationsCollector) -> Self {
458 let mut subs = collector.v3_rels;
459 subs.extend(collector.v5_rels.into_iter().map(|(clientid, (topic_filter, opts, sub_ids, group))| {
460 (topic_filter, clientid, opts, sub_ids, group)
461 }));
462 subs
463 }
464}
465
466pub type SubscriptioRelationsCollectorMap = HashMap<NodeId, SubscriptioRelationsCollector>;
467
468#[allow(clippy::type_complexity)]
469#[derive(Debug, Default)]
470pub struct SubscriptioRelationsCollector {
471 v3_rels: Vec<SubRelation>,
472 v5_rels: HashMap<
473 ClientId,
474 (TopicFilter, SubscriptionOptions, Option<Vec<SubscriptionIdentifier>>, Option<SharedGroupType>),
475 >,
476}
477
478impl SubscriptioRelationsCollector {
479 #[inline]
480 pub fn add(
481 &mut self,
482 topic_filter: &TopicFilter,
483 client_id: ClientId,
484 opts: SubscriptionOptions,
485 group: Option<SharedGroupType>,
486 ) {
487 if opts.is_v3() {
488 self.v3_rels.push((topic_filter.clone(), client_id, opts, None, group));
489 } else {
490 self.v5_rels
492 .entry(client_id)
493 .and_modify(|(_, _, sub_ids, _)| {
494 if let Some(sub_id) = opts.subscription_identifier() {
495 if let Some(sub_ids) = sub_ids {
496 sub_ids.push(sub_id)
497 } else {
498 *sub_ids = Some(vec![sub_id]);
499 }
500 }
501 })
502 .or_insert_with(|| {
503 let sub_ids = opts.subscription_identifier().map(|id| vec![id]);
504 (topic_filter.clone(), opts, sub_ids, group)
505 });
506 }
507 }
508}
509
510#[inline]
511pub fn parse_topic_filter(
512 topic_filter: &ByteString,
513 shared_subscription: bool,
514 limit_subscription: bool,
515) -> Result<(TopicFilter, Option<SharedGroup>, LimitSubsCount)> {
516 let invalid_filter = || anyhow!(format!("Illegal topic filter, {:?}", topic_filter));
517 let (topic, shared_group, limit_subs) = if shared_subscription || limit_subscription {
518 let levels = topic_filter.splitn(3, '/').collect::<Vec<_>>();
519 match (levels.first(), levels.get(1), levels.get(2)) {
520 (Some(&"$share"), group, tf) => match (shared_subscription, group, tf) {
521 (true, Some(group), Some(tf)) => {
522 let tf = TopicFilter::from(*tf);
523 (tf, Some(SharedGroup::from(*group)), None)
524 }
525 (true, _, _) => {
526 return Err(invalid_filter());
527 }
528 (false, _, _) => {
529 return Err(anyhow!(format!("Shared subscription is not enabled, {:?}", topic_filter)));
530 }
531 },
532 (Some(&"$limit"), limit, tf) => match (limit_subscription, limit, tf) {
533 (true, Some(limit), Some(tf)) => {
534 let tf = TopicFilter::from(*tf);
535 let limit = limit.parse::<usize>().map_err(|_| invalid_filter())?;
536 (tf, None, Some(limit))
537 }
538 (true, _, _) => {
539 return Err(invalid_filter());
540 }
541 (false, _, _) => {
542 return Err(anyhow!(format!("Limit subscription is not enabled, {:?}", topic_filter)));
543 }
544 },
545 (Some(&"$exclusive"), _, _) => {
546 if limit_subscription {
547 let tf = TopicFilter::from(topic_filter.trim_start_matches("$exclusive/"));
548 (tf, None, Some(1))
549 } else {
550 return Err(anyhow!(format!("Limit subscription is not enabled, {:?}", topic_filter)));
551 }
552 }
553 _ => (topic_filter.clone(), None, None),
554 }
555 } else {
556 (topic_filter.clone(), None, None)
557 };
558 if topic.is_empty() {
559 return Err(invalid_filter());
560 }
561 Ok((topic, shared_group, limit_subs))
562}
563
564#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
565pub enum SubscriptionOptions {
566 V3(SubOptionsV3),
567 V5(SubOptionsV5),
568}
569
570impl Default for SubscriptionOptions {
571 fn default() -> Self {
572 SubscriptionOptions::V3(SubOptionsV3 {
573 qos: QoS::AtMostOnce,
574 #[cfg(feature = "shared-subscription")]
575 shared_group: None,
576 #[cfg(feature = "limit-subscription")]
577 limit_subs: None,
578 })
579 }
580}
581
582impl SubscriptionOptions {
583 #[inline]
584 pub fn no_local(&self) -> Option<bool> {
585 match self {
586 SubscriptionOptions::V3(_) => None,
587 SubscriptionOptions::V5(opts) => Some(opts.no_local),
588 }
589 }
590
591 #[inline]
592 pub fn retain_as_published(&self) -> Option<bool> {
593 match self {
594 SubscriptionOptions::V3(_) => None,
595 SubscriptionOptions::V5(opts) => Some(opts.retain_as_published),
596 }
597 }
598
599 #[inline]
600 pub fn retain_handling(&self) -> Option<RetainHandling> {
601 match self {
602 SubscriptionOptions::V3(_) => None,
603 SubscriptionOptions::V5(opts) => Some(opts.retain_handling),
604 }
605 }
606
607 #[inline]
608 pub fn subscription_identifier(&self) -> Option<NonZeroU32> {
609 match self {
610 SubscriptionOptions::V3(_) => None,
611 SubscriptionOptions::V5(opts) => opts.id,
612 }
613 }
614
615 #[inline]
616 pub fn qos(&self) -> QoS {
617 match self {
618 SubscriptionOptions::V3(opts) => opts.qos,
619 SubscriptionOptions::V5(opts) => opts.qos,
620 }
621 }
622
623 #[inline]
624 pub fn qos_value(&self) -> u8 {
625 match self {
626 SubscriptionOptions::V3(opts) => opts.qos.value(),
627 SubscriptionOptions::V5(opts) => opts.qos.value(),
628 }
629 }
630
631 #[inline]
632 pub fn set_qos(&mut self, qos: QoS) {
633 match self {
634 SubscriptionOptions::V3(opts) => opts.qos = qos,
635 SubscriptionOptions::V5(opts) => opts.qos = qos,
636 }
637 }
638
639 #[inline]
640 pub fn shared_group(&self) -> Option<&SharedGroup> {
641 #[cfg(feature = "shared-subscription")]
642 match self {
643 SubscriptionOptions::V3(opts) => opts.shared_group.as_ref(),
644 SubscriptionOptions::V5(opts) => opts.shared_group.as_ref(),
645 }
646 #[cfg(not(feature = "shared-subscription"))]
647 None
648 }
649
650 #[inline]
651 #[cfg(feature = "shared-subscription")]
652 pub fn has_shared_group(&self) -> bool {
653 match self {
654 SubscriptionOptions::V3(opts) => opts.shared_group.is_some(),
655 SubscriptionOptions::V5(opts) => opts.shared_group.is_some(),
656 }
657 }
658
659 #[inline]
660 pub fn limit_subs(&self) -> Option<usize> {
661 #[cfg(feature = "limit-subscription")]
662 match self {
663 SubscriptionOptions::V3(opts) => opts.limit_subs,
664 SubscriptionOptions::V5(opts) => opts.limit_subs,
665 }
666 #[cfg(not(feature = "limit-subscription"))]
667 None
668 }
669
670 #[inline]
671 #[cfg(feature = "limit-subscription")]
672 pub fn has_limit_subs(&self) -> bool {
673 match self {
674 SubscriptionOptions::V3(opts) => opts.limit_subs.is_some(),
675 SubscriptionOptions::V5(opts) => opts.limit_subs.is_some(),
676 }
677 }
678
679 #[inline]
680 pub fn is_v3(&self) -> bool {
681 matches!(self, SubscriptionOptions::V3(_))
682 }
683
684 #[inline]
685 pub fn is_v5(&self) -> bool {
686 matches!(self, SubscriptionOptions::V5(_))
687 }
688
689 #[inline]
690 pub fn to_json(&self) -> serde_json::Value {
691 match self {
692 SubscriptionOptions::V3(opts) => opts.to_json(),
693 SubscriptionOptions::V5(opts) => opts.to_json(),
694 }
695 }
696
697 #[inline]
698 pub fn deserialize_qos<'de, D>(deserializer: D) -> std::result::Result<QoS, D::Error>
699 where
700 D: Deserializer<'de>,
701 {
702 let v = u8::deserialize(deserializer)?;
703 Ok(match v {
704 0 => QoS::AtMostOnce,
705 1 => QoS::AtLeastOnce,
706 2 => QoS::ExactlyOnce,
707 _ => return Err(de::Error::custom(format!("invalid QoS value, {v}"))),
708 })
709 }
710
711 #[inline]
712 pub fn serialize_qos<S>(qos: &QoS, s: S) -> std::result::Result<S::Ok, S::Error>
713 where
714 S: Serializer,
715 {
716 qos.value().serialize(s)
717 }
718}
719
720#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
721pub struct SubOptionsV3 {
722 #[serde(
723 serialize_with = "SubscriptionOptions::serialize_qos",
724 deserialize_with = "SubscriptionOptions::deserialize_qos"
725 )]
726 pub qos: QoS,
727 #[cfg(feature = "shared-subscription")]
728 pub shared_group: Option<SharedGroup>,
729 #[cfg(feature = "limit-subscription")]
730 pub limit_subs: LimitSubsCount,
731}
732
733impl SubOptionsV3 {
734 #[inline]
735 pub fn to_json(&self) -> serde_json::Value {
736 #[allow(unused_mut)]
737 let mut obj = json!({
738 "qos": self.qos.value(),
739 });
740 #[cfg(any(feature = "limit-subscription", feature = "shared-subscription"))]
741 if let Some(obj) = obj.as_object_mut() {
742 #[cfg(feature = "shared-subscription")]
743 if let Some(g) = &self.shared_group {
744 obj.insert("group".into(), serde_json::Value::String(g.to_string()));
745 }
746 #[cfg(feature = "limit-subscription")]
747 if let Some(limit_subs) = &self.limit_subs {
748 obj.insert("limit_subs".into(), serde_json::Value::from(*limit_subs));
749 }
750 }
751 obj
752 }
753}
754
755#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
756pub struct SubOptionsV5 {
757 #[serde(
758 serialize_with = "SubscriptionOptions::serialize_qos",
759 deserialize_with = "SubscriptionOptions::deserialize_qos"
760 )]
761 pub qos: QoS,
762 #[cfg(feature = "shared-subscription")]
763 pub shared_group: Option<SharedGroup>,
764 #[cfg(feature = "limit-subscription")]
765 pub limit_subs: LimitSubsCount,
766 pub no_local: bool,
767 pub retain_as_published: bool,
768 #[serde(
769 serialize_with = "SubOptionsV5::serialize_retain_handling",
770 deserialize_with = "SubOptionsV5::deserialize_retain_handling"
771 )]
772 pub retain_handling: RetainHandling,
773 pub id: Option<SubscriptionIdentifier>,
775}
776
777impl SubOptionsV5 {
778 #[inline]
779 pub fn retain_handling_value(&self) -> u8 {
780 match self.retain_handling {
781 RetainHandling::AtSubscribe => 0u8,
782 RetainHandling::AtSubscribeNew => 1u8,
783 RetainHandling::NoAtSubscribe => 2u8,
784 }
785 }
786
787 #[inline]
788 pub fn to_json(&self) -> serde_json::Value {
789 let mut obj = json!({
790 "qos": self.qos.value(),
791 "no_local": self.no_local,
792 "retain_as_published": self.retain_as_published,
793 "retain_handling": self.retain_handling_value(),
794 });
795 if let Some(obj) = obj.as_object_mut() {
796 #[cfg(feature = "shared-subscription")]
797 if let Some(g) = &self.shared_group {
798 obj.insert("group".into(), serde_json::Value::String(g.to_string()));
799 }
800 #[cfg(feature = "limit-subscription")]
801 if let Some(limit_subs) = &self.limit_subs {
802 obj.insert("limit_subs".into(), serde_json::Value::from(*limit_subs));
803 }
804 if let Some(id) = &self.id {
805 obj.insert("id".into(), serde_json::Value::Number(serde_json::Number::from(id.get())));
806 }
807 }
808 obj
809 }
810
811 #[inline]
812 pub fn deserialize_retain_handling<'de, D>(
813 deserializer: D,
814 ) -> std::result::Result<RetainHandling, D::Error>
815 where
816 D: Deserializer<'de>,
817 {
818 let v = u8::deserialize(deserializer)?;
819 Ok(match v {
820 0 => RetainHandling::AtSubscribe,
821 1 => RetainHandling::AtSubscribeNew,
822 2 => RetainHandling::NoAtSubscribe,
823 _ => return Err(de::Error::custom(format!("invalid RetainHandling value, {v}"))),
824 })
825 }
826
827 #[inline]
828 pub fn serialize_retain_handling<S>(rh: &RetainHandling, s: S) -> std::result::Result<S::Ok, S::Error>
829 where
830 S: Serializer,
831 {
832 let v = match rh {
833 RetainHandling::AtSubscribe => 0u8,
834 RetainHandling::AtSubscribeNew => 1u8,
835 RetainHandling::NoAtSubscribe => 2u8,
836 };
837 v.serialize(s)
838 }
839}
840
841impl std::convert::From<(QoS, Option<SharedGroup>, LimitSubsCount)> for SubscriptionOptions {
842 #[inline]
843 fn from(opts: (QoS, Option<SharedGroup>, LimitSubsCount)) -> Self {
844 SubscriptionOptions::V3(SubOptionsV3 {
845 qos: opts.0,
846 #[cfg(feature = "shared-subscription")]
847 shared_group: opts.1,
848 #[cfg(feature = "limit-subscription")]
849 limit_subs: opts.2,
850 })
851 }
852}
853
854impl std::convert::From<(&SubscriptionOptionsV5, Option<SharedGroup>, LimitSubsCount, Option<NonZeroU32>)>
855 for SubscriptionOptions
856{
857 #[inline]
858 fn from(opts: (&SubscriptionOptionsV5, Option<SharedGroup>, LimitSubsCount, Option<NonZeroU32>)) -> Self {
859 SubscriptionOptions::V5(SubOptionsV5 {
860 qos: opts.0.qos,
861 #[cfg(feature = "shared-subscription")]
862 shared_group: opts.1,
863 #[cfg(feature = "limit-subscription")]
864 limit_subs: opts.2,
865 no_local: opts.0.no_local,
866 retain_as_published: opts.0.retain_as_published,
867 retain_handling: opts.0.retain_handling,
868 id: opts.3,
869 })
870 }
871}
872
873#[derive(Clone, Debug, Serialize, Deserialize)]
874pub struct Subscribe {
875 pub topic_filter: TopicFilter,
876 pub opts: SubscriptionOptions,
877}
878
879impl Subscribe {
880 #[inline]
881 pub fn from_v3(
882 topic_filter: &ByteString,
883 qos: QoS,
884 shared_subscription: bool,
885 limit_subscription: bool,
886 ) -> Result<Self> {
887 let (topic_filter, shared_group, limit_subs) =
888 parse_topic_filter(topic_filter, shared_subscription, limit_subscription)?;
889 let opts = (qos, shared_group, limit_subs).into();
890 Ok(Subscribe { topic_filter, opts })
891 }
892
893 #[inline]
894 pub fn from_v5(
895 topic_filter: &ByteString,
896 opts: &SubscriptionOptionsV5,
897 shared_subscription: bool,
898 limit_subscription: bool,
899 sub_id: Option<NonZeroU32>,
900 ) -> Result<Self> {
901 let (topic_filter, shared_group, limit_subs) =
902 parse_topic_filter(topic_filter, shared_subscription, limit_subscription)?;
903 let opts = (opts, shared_group, limit_subs, sub_id).into();
904 Ok(Subscribe { topic_filter, opts })
905 }
906
907 #[inline]
908 #[cfg(feature = "shared-subscription")]
909 pub fn is_shared(&self) -> bool {
910 self.opts.has_shared_group()
911 }
912}
913
914#[derive(Clone, Debug)]
915pub struct SubscribeReturn {
916 pub ack_reason: SubscribeAckReason,
917 pub prev_opts: Option<SubscriptionOptions>,
918}
919
920impl SubscribeReturn {
921 #[inline]
922 pub fn new_success(qos: QoS, prev_opts: Option<SubscriptionOptions>) -> Self {
923 let ack_reason = match qos {
924 QoS::AtMostOnce => SubscribeAckReason::GrantedQos0,
925 QoS::AtLeastOnce => SubscribeAckReason::GrantedQos1,
926 QoS::ExactlyOnce => SubscribeAckReason::GrantedQos2,
927 };
928 Self { ack_reason, prev_opts }
929 }
930
931 #[inline]
932 pub fn new_failure(ack_reason: SubscribeAckReason) -> Self {
933 Self { ack_reason, prev_opts: None }
934 }
935
936 #[inline]
937 pub fn success(&self) -> Option<QoS> {
938 match self.ack_reason {
939 SubscribeAckReason::GrantedQos0 => Some(QoS::AtMostOnce),
940 SubscribeAckReason::GrantedQos1 => Some(QoS::AtLeastOnce),
941 SubscribeAckReason::GrantedQos2 => Some(QoS::ExactlyOnce),
942 _ => None,
943 }
944 }
945
946 #[inline]
947 pub fn failure(&self) -> bool {
948 !matches!(
949 self.ack_reason,
950 SubscribeAckReason::GrantedQos0
951 | SubscribeAckReason::GrantedQos1
952 | SubscribeAckReason::GrantedQos2
953 )
954 }
955
956 #[inline]
957 pub fn into_inner(self) -> SubscribeAckReason {
958 self.ack_reason
959 }
960}
961#[derive(Copy, Clone, Debug, PartialEq, Eq)]
974pub enum ConnectAckReason {
975 V3(ConnectAckReasonV3),
976 V5(ConnectAckReasonV5),
977}
978
979impl ConnectAckReason {
980 #[inline]
981 pub fn success(&self) -> bool {
982 matches!(
983 *self,
984 ConnectAckReason::V3(ConnectAckReasonV3::ConnectionAccepted)
985 | ConnectAckReason::V5(ConnectAckReasonV5::Success)
986 )
987 }
988
989 #[inline]
990 pub fn not_authorized(&self) -> bool {
991 matches!(
992 *self,
993 ConnectAckReason::V3(ConnectAckReasonV3::NotAuthorized)
994 | ConnectAckReason::V3(ConnectAckReasonV3::BadUserNameOrPassword)
995 | ConnectAckReason::V5(ConnectAckReasonV5::NotAuthorized)
996 | ConnectAckReason::V5(ConnectAckReasonV5::BadUserNameOrPassword)
997 )
998 }
999
1000 #[inline]
1001 pub fn success_or_auth_error(&self) -> (bool, bool) {
1002 match *self {
1003 ConnectAckReason::V3(ConnectAckReasonV3::ConnectionAccepted)
1004 | ConnectAckReason::V5(ConnectAckReasonV5::Success) => (true, false),
1005 ConnectAckReason::V3(ConnectAckReasonV3::NotAuthorized)
1006 | ConnectAckReason::V3(ConnectAckReasonV3::BadUserNameOrPassword)
1007 | ConnectAckReason::V5(ConnectAckReasonV5::NotAuthorized)
1008 | ConnectAckReason::V5(ConnectAckReasonV5::BadUserNameOrPassword) => (false, true),
1009 _ => (false, false),
1010 }
1011 }
1012
1013 #[inline]
1014 pub fn reason(&self) -> &'static str {
1015 match *self {
1016 ConnectAckReason::V3(r) => r.reason(),
1017 ConnectAckReason::V5(r) => r.reason(),
1018 }
1019 }
1020}
1021
1022#[derive(Clone, Debug)]
1023pub struct Unsubscribe {
1024 pub topic_filter: TopicFilter,
1025 pub shared_group: Option<SharedGroup>,
1026}
1027
1028impl Unsubscribe {
1029 #[inline]
1030 pub fn from(
1031 topic_filter: &ByteString,
1032 shared_subscription: bool,
1033 limit_subscription: bool,
1034 ) -> Result<Self> {
1035 let (topic_filter, shared_group, _) =
1036 parse_topic_filter(topic_filter, shared_subscription, limit_subscription)?;
1037 Ok(Unsubscribe { topic_filter, shared_group })
1038 }
1039
1040 #[inline]
1041 pub fn is_shared(&self) -> bool {
1042 self.shared_group.is_some()
1043 }
1044}
1045
1046#[derive(Clone)]
1053pub enum LastWill<'a> {
1054 V3(&'a LastWillV3),
1055 V5(&'a LastWillV5),
1056}
1057
1058impl fmt::Debug for LastWill<'_> {
1059 #[inline]
1060 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1061 match self {
1062 LastWill::V3(lw) => f
1063 .debug_struct("LastWill")
1064 .field("topic", &lw.topic)
1065 .field("retain", &lw.retain)
1066 .field("qos", &lw.qos.value())
1067 .field("message", &"<REDACTED>")
1068 .finish(),
1069 LastWill::V5(lw) => f
1070 .debug_struct("LastWill")
1071 .field("topic", &lw.topic)
1072 .field("retain", &lw.retain)
1073 .field("qos", &lw.qos.value())
1074 .field("message", &"<REDACTED>")
1075 .field("will_delay_interval_sec", &lw.will_delay_interval_sec)
1076 .field("correlation_data", &lw.correlation_data)
1077 .field("message_expiry_interval", &lw.message_expiry_interval)
1078 .field("content_type", &lw.content_type)
1079 .field("user_properties", &lw.user_properties)
1080 .field("is_utf8_payload", &lw.is_utf8_payload)
1081 .field("response_topic", &lw.response_topic)
1082 .finish(),
1083 }
1084 }
1085}
1086
1087impl LastWill<'_> {
1088 #[inline]
1089 pub fn will_delay_interval(&self) -> Option<Duration> {
1090 match self {
1091 LastWill::V3(_) => None,
1092 LastWill::V5(lw) => lw.will_delay_interval_sec.map(|i| Duration::from_secs(i as u64)),
1093 }
1094 }
1095
1096 #[inline]
1097 pub fn to_json(&self) -> serde_json::Value {
1098 match self {
1099 LastWill::V3(lw) => {
1100 json!({
1101 "qos": lw.qos.value(),
1102 "retain": lw.retain,
1103 "topic": lw.topic,
1104 "message": BASE64_STANDARD.encode(lw.message.as_ref()),
1105 })
1106 }
1107 LastWill::V5(lw) => {
1108 json!({
1109 "qos": lw.qos.value(),
1110 "retain": lw.retain,
1111 "topic": lw.topic,
1112 "message": BASE64_STANDARD.encode(lw.message.as_ref()),
1113
1114 "will_delay_interval_sec": lw.will_delay_interval_sec,
1115 "correlation_data": lw.correlation_data,
1116 "message_expiry_interval": lw.message_expiry_interval,
1117 "content_type": lw.content_type,
1118 "user_properties": lw.user_properties,
1119 "is_utf8_payload": lw.is_utf8_payload,
1120 "response_topic": lw.response_topic,
1121 })
1122 }
1123 }
1124 }
1125}
1126
1127impl Serialize for LastWill<'_> {
1128 #[inline]
1129 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1130 where
1131 S: Serializer,
1132 {
1133 match self {
1134 LastWill::V3(lw) => {
1135 let mut s = serializer.serialize_struct("LastWill", 4)?;
1136 s.serialize_field("qos", &lw.qos.value())?;
1137 s.serialize_field("retain", &lw.retain)?;
1138 s.serialize_field("topic", &lw.topic)?;
1139 s.serialize_field("message", &lw.message)?;
1140 s.end()
1141 }
1142 LastWill::V5(lw) => {
1143 let mut s = serializer.serialize_struct("LastWill", 11)?;
1144 s.serialize_field("qos", &lw.qos.value())?;
1145 s.serialize_field("retain", &lw.retain)?;
1146 s.serialize_field("topic", &lw.topic)?;
1147 s.serialize_field("message", &lw.message)?;
1148
1149 s.serialize_field("will_delay_interval_sec", &lw.will_delay_interval_sec)?;
1150 s.serialize_field("correlation_data", &lw.correlation_data)?;
1151 s.serialize_field("message_expiry_interval", &lw.message_expiry_interval)?;
1152 s.serialize_field("content_type", &lw.content_type)?;
1153 s.serialize_field("user_properties", &lw.user_properties)?;
1154 s.serialize_field("is_utf8_payload", &lw.is_utf8_payload)?;
1155 s.serialize_field("response_topic", &lw.response_topic)?;
1156
1157 s.end()
1158 }
1159 }
1160 }
1161}
1162
1163#[derive(Serialize, Deserialize, Clone, Debug)]
1164pub struct Publish {
1169 pub inner: Box<CodecPublish>,
1171
1172 pub target_clientid: Option<ClientId>,
1175
1176 pub delay_interval: Option<u32>,
1178
1179 pub create_time: Option<i64>,
1181}
1182
1183impl Publish {
1184 #[inline]
1185 pub fn target_clientid(mut self, target_clientid: ClientId) -> Self {
1186 self.target_clientid = Some(target_clientid);
1187 self
1188 }
1189
1190 #[inline]
1191 pub fn delay_interval(mut self, delay_interval: u32) -> Self {
1192 self.delay_interval = Some(delay_interval);
1193 self
1194 }
1195
1196 #[inline]
1197 pub fn create_time(mut self, create_time: i64) -> Self {
1198 self.create_time = Some(create_time);
1199 self
1200 }
1201
1202 #[inline]
1203 pub fn new(
1204 inner: Box<CodecPublish>,
1205 target_clientid: Option<ClientId>,
1206 delay_interval: Option<u32>,
1207 create_time: Option<i64>,
1208 ) -> Self {
1209 Publish { inner, target_clientid, delay_interval, create_time }
1210 }
1211
1212 #[inline]
1213 pub fn take(self) -> Box<CodecPublish> {
1214 self.inner
1215 }
1216
1217 #[inline]
1218 pub fn take_topic(self) -> ByteString {
1219 self.inner.topic
1220 }
1221
1222 #[inline]
1223 pub fn take_payload(self) -> Bytes {
1224 self.inner.payload
1225 }
1226}
1227
1228impl Deref for Publish {
1229 type Target = CodecPublish;
1230 #[inline]
1231 fn deref(&self) -> &Self::Target {
1232 self.inner.as_ref()
1233 }
1234}
1235
1236impl DerefMut for Publish {
1237 #[inline]
1238 fn deref_mut(&mut self) -> &mut Self::Target {
1239 self.inner.as_mut()
1240 }
1241}
1242
1243impl std::convert::From<CodecPublish> for Publish {
1244 fn from(p: CodecPublish) -> Self {
1245 Publish { inner: Box::new(p), target_clientid: None, delay_interval: None, create_time: None }
1246 }
1247}
1248
1249impl std::convert::From<Box<CodecPublish>> for Publish {
1250 fn from(p: Box<CodecPublish>) -> Self {
1251 Publish { inner: p, target_clientid: None, delay_interval: None, create_time: None }
1252 }
1253}
1254
1255impl<'a> std::convert::TryFrom<LastWill<'a>> for Publish {
1256 type Error = MqttError;
1257
1258 #[inline]
1259 fn try_from(lw: LastWill<'a>) -> std::result::Result<Self, Self::Error> {
1260 let (retain, qos, topic, payload, props) = match lw {
1261 LastWill::V3(lw) => {
1262 let (topic, user_properties) = if let Some(pos) = lw.topic.find('?') {
1263 let topic = lw.topic.clone();
1264 let query = lw.topic.as_bytes().slice(pos + 1..lw.topic.len());
1265 let user_props = url::form_urlencoded::parse(query.as_ref())
1266 .into_owned()
1267 .map(|(key, val)| (ByteString::from(key), ByteString::from(val)))
1268 .collect::<UserProperties>();
1269 (topic, user_props)
1270 } else {
1271 let topic = lw.topic.clone();
1272 (topic, UserProperties::default())
1273 };
1274 let props = PublishProperties { user_properties, ..Default::default() };
1275 (lw.retain, lw.qos, topic, lw.message.clone(), props)
1276 }
1277 LastWill::V5(lw) => {
1278 let topic = lw.topic.clone();
1279 let props = PublishProperties {
1280 correlation_data: lw.correlation_data.clone(),
1281 message_expiry_interval: lw.message_expiry_interval,
1282 content_type: lw.content_type.clone(),
1283 user_properties: lw.user_properties.clone(),
1284 is_utf8_payload: lw.is_utf8_payload.unwrap_or_default(),
1285 response_topic: lw.response_topic.clone(),
1286 ..Default::default()
1287 };
1288 (lw.retain, lw.qos, topic, lw.message.clone(), props)
1289 }
1290 };
1291
1292 let p = CodecPublish {
1293 dup: false,
1294 retain,
1295 qos,
1296 topic,
1297 packet_id: None,
1298 payload,
1299 properties: Some(props),
1300 };
1301 let p = <CodecPublish as Into<Publish>>::into(p).create_time(timestamp_millis());
1302 Ok(p)
1303 }
1304}
1305
1306pub enum Sink<Io> {
1307 V3(v3::MqttStream<Io>),
1308 V5(v5::MqttStream<Io>),
1309}
1310
1311impl<Io> Sink<Io>
1312where
1313 Io: AsyncRead + AsyncWrite + Unpin,
1314{
1315 #[inline]
1316 pub(crate) fn v3_mut(&mut self) -> &mut v3::MqttStream<Io> {
1317 if let Sink::V3(s) = self {
1318 s
1319 } else {
1320 unreachable!()
1321 }
1322 }
1323
1324 #[inline]
1325 pub(crate) fn v5_mut(&mut self) -> &mut v5::MqttStream<Io> {
1326 if let Sink::V5(s) = self {
1327 s
1328 } else {
1329 unreachable!()
1330 }
1331 }
1332
1333 #[inline]
1334 pub(crate) async fn recv(&mut self) -> Result<Option<Packet>> {
1335 match self {
1336 Sink::V3(s) => match s.next().await {
1337 Some(Ok(pkt)) => Ok(Some(Packet::V3(pkt))),
1338 Some(Err(e)) => Err(e),
1339 None => Ok(None),
1340 },
1341 Sink::V5(s) => match s.next().await {
1342 Some(Ok(pkt)) => Ok(Some(Packet::V5(pkt))),
1343 Some(Err(e)) => Err(e),
1344 None => Ok(None),
1345 },
1346 }
1347 }
1348
1349 #[inline]
1350 #[allow(dead_code)]
1351 pub(crate) async fn close(&mut self) -> Result<()> {
1352 match self {
1353 Sink::V3(s) => {
1354 s.close().await?;
1355 }
1356 Sink::V5(s) => s.close().await?,
1357 }
1358 Ok(())
1359 }
1360
1361 #[inline]
1362 pub(crate) async fn publish(
1363 &mut self,
1364 mut p: Publish,
1365 message_expiry_interval: Option<NonZeroU32>,
1366 server_topic_aliases: Option<&Arc<ServerTopicAliases>>,
1367 ) -> Result<()> {
1368 match self {
1369 Sink::V3(s) => {
1370 s.send_publish(p.take()).await?;
1371 }
1372 Sink::V5(s) => {
1373 let (topic, alias) = {
1374 if let Some(server_topic_aliases) = server_topic_aliases {
1375 server_topic_aliases.get(p.topic.clone()).await
1376 } else {
1377 (Some(p.topic.clone()), None)
1378 }
1379 };
1380
1381 p.topic = topic.unwrap_or_default();
1382
1383 if let Some(properties) = &mut p.properties {
1384 properties.message_expiry_interval = message_expiry_interval;
1385 properties.topic_alias = alias;
1386 }
1387 s.send_publish(p.take()).await?;
1388 }
1389 }
1390 Ok(())
1391 }
1392
1393 #[inline]
1394 pub(crate) async fn send_publish_ack(
1395 &mut self,
1396 packet_id: NonZeroU16,
1397 pubres: PublishResult,
1398 ) -> Result<()> {
1399 match self {
1400 Sink::V3(s) => {
1401 s.send_publish_ack(packet_id).await?;
1402 }
1403 Sink::V5(s) => {
1404 let ack = PublishAckV5 {
1405 packet_id,
1406 reason_code: pubres.reason_code,
1407 properties: pubres.properties,
1408 reason_string: pubres.reason_string,
1409 };
1410 s.send_publish_ack(ack).await?;
1411 }
1412 }
1413 Ok(())
1414 }
1415
1416 #[inline]
1417 pub(crate) async fn send_publish_received(
1418 &mut self,
1419 packet_id: NonZeroU16,
1420 pubres: PublishResult,
1421 ) -> Result<()> {
1422 match self {
1423 Sink::V3(s) => {
1424 s.send_publish_received(packet_id).await?;
1425 }
1426 Sink::V5(s) => {
1427 let ack = PublishAckV5 {
1428 packet_id,
1429 reason_code: pubres.reason_code,
1430 properties: pubres.properties,
1431 reason_string: pubres.reason_string,
1432 };
1433 s.send_publish_received(ack).await?;
1434 }
1435 }
1436 Ok(())
1437 }
1438
1439 #[inline]
1440 #[allow(dead_code)]
1441 pub(crate) async fn send_publish_release(&mut self, packet_id: NonZeroU16) -> Result<()> {
1442 match self {
1443 Sink::V3(s) => {
1444 s.send_publish_release(packet_id).await?;
1445 }
1446 Sink::V5(s) => {
1447 let ack2 =
1448 PublishAck2 { packet_id, reason_code: PublishAck2Reason::Success, ..Default::default() };
1449 s.send_publish_release(ack2).await?;
1450 }
1451 }
1452 Ok(())
1453 }
1454
1455 #[inline]
1456 #[allow(dead_code)]
1457 pub(crate) async fn send(&mut self, p: Packet) -> Result<()> {
1458 match self {
1459 Sink::V3(s) => {
1460 if let Packet::V3(p) = p {
1461 s.send(p).await?;
1462 }
1463 }
1464 Sink::V5(s) => {
1465 if let Packet::V5(p) = p {
1466 s.send(p).await?;
1467 }
1468 }
1469 }
1470 Ok(())
1471 }
1472}
1473
1474#[derive(Debug, Clone, Deserialize, Serialize)]
1475pub struct PublishResult {
1476 pub reason_code: PublishAckReason,
1477 pub properties: UserProperties,
1478 pub reason_string: Option<ByteString>,
1479 pub disconnect: IsDisconnect,
1480}
1481
1482impl PublishResult {
1483 #[inline]
1484 pub fn success() -> PublishResult {
1485 PublishResult { reason_code: PublishAckReason::Success, ..Default::default() }
1486 }
1487
1488 #[inline]
1489 pub fn reason_code(
1490 reason_code: PublishAckReason,
1491 reason_string: Option<ByteString>,
1492 disconnect: IsDisconnect,
1493 ) -> PublishResult {
1494 PublishResult { reason_code, reason_string, disconnect, ..Default::default() }
1495 }
1496
1497 #[inline]
1498 pub fn is_success(&self) -> bool {
1499 matches!(self.reason_code, PublishAckReason::Success)
1500 }
1501}
1502
1503impl Default for PublishResult {
1504 fn default() -> Self {
1505 Self {
1506 reason_code: PublishAckReason::Success,
1507 properties: UserProperties::default(),
1508 reason_string: None,
1509 disconnect: false,
1510 }
1511 }
1512}
1513
1514#[allow(clippy::large_enum_variant)]
1515#[derive(Debug, Clone)]
1516pub enum Packet {
1517 V3(codec::v3::Packet),
1518 V5(codec::v5::Packet),
1519}
1520
1521#[derive(GetSize, Debug, Clone, Copy, Deserialize, Serialize)]
1522pub enum FromType {
1523 Custom,
1524 Admin,
1525 System,
1526 LastWill,
1527 Bridge,
1528}
1529
1530impl FromType {
1531 #[inline]
1532 pub fn as_str(&self) -> &str {
1533 match self {
1534 FromType::Custom => "custom",
1535 FromType::Admin => "admin",
1536 FromType::System => "system",
1537 FromType::LastWill => "lastwill",
1538 FromType::Bridge => "bridge",
1539 }
1540 }
1541}
1542
1543impl std::fmt::Display for FromType {
1544 #[inline]
1545 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1546 write!(f, "{}", self.as_str())
1547 }
1548}
1549
1550#[derive(GetSize, Clone, Deserialize, Serialize)]
1551pub struct From {
1552 typ: FromType,
1553 pub id: Id,
1554}
1555
1556impl From {
1557 #[inline]
1558 pub fn from_custom(id: Id) -> From {
1559 From { typ: FromType::Custom, id }
1560 }
1561
1562 #[inline]
1563 pub fn from_admin(id: Id) -> From {
1564 From { typ: FromType::Admin, id }
1565 }
1566
1567 #[inline]
1568 pub fn from_bridge(id: Id) -> From {
1569 From { typ: FromType::Bridge, id }
1570 }
1571
1572 #[inline]
1573 pub fn from_system(id: Id) -> From {
1574 From { typ: FromType::System, id }
1575 }
1576
1577 #[inline]
1578 pub fn from_lastwill(id: Id) -> From {
1579 From { typ: FromType::LastWill, id }
1580 }
1581
1582 #[inline]
1583 pub fn typ(&self) -> FromType {
1584 self.typ
1585 }
1586
1587 #[inline]
1588 pub fn is_system(&self) -> bool {
1589 matches!(self.typ, FromType::System)
1590 }
1591
1592 #[inline]
1593 pub fn is_custom(&self) -> bool {
1594 matches!(self.typ, FromType::Custom)
1595 }
1596
1597 #[inline]
1598 pub fn to_from_json(&self, json: serde_json::Value) -> serde_json::Value {
1599 let mut json = self.id.to_from_json(json);
1600 if let Some(obj) = json.as_object_mut() {
1601 obj.insert("from_type".into(), serde_json::Value::String(self.typ.to_string()));
1602 }
1603 json
1604 }
1605}
1606
1607impl Deref for From {
1608 type Target = Id;
1609 #[inline]
1610 fn deref(&self) -> &Self::Target {
1611 &self.id
1612 }
1613}
1614
1615impl std::fmt::Debug for From {
1616 #[inline]
1617 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1618 write!(f, "{}-{:?}", self.id, self.typ)
1619 }
1620}
1621
1622pub type To = Id;
1623
1624#[derive(Clone)]
1625pub struct Id(Arc<_Id>);
1626
1627impl get_size::GetSize for Id {
1628 fn get_heap_size(&self) -> usize {
1629 self.0.get_heap_size()
1630 }
1631}
1632
1633impl Id {
1634 #[inline]
1635 pub fn new(
1636 node_id: NodeId,
1637 lid: ListenerId,
1638 local_addr: Option<SocketAddr>,
1639 remote_addr: Option<SocketAddr>,
1640 client_id: ClientId,
1641 username: Option<UserName>,
1642 ) -> Self {
1643 Self(Arc::new(_Id {
1644 node_id,
1645 lid,
1646 local_addr,
1647 remote_addr,
1648 client_id,
1649 username,
1650 create_time: timestamp_millis(),
1651 }))
1652 }
1653
1654 #[inline]
1655 pub fn to_json(&self) -> serde_json::Value {
1656 json!({
1657 "node": self.node(),
1658 "ipaddress": self.remote_addr,
1659 "clientid": self.client_id,
1660 "username": self.username_ref(),
1661 "create_time": self.create_time,
1662 })
1663 }
1664
1665 #[inline]
1666 pub fn to_from_json(&self, mut json: serde_json::Value) -> serde_json::Value {
1667 if let Some(obj) = json.as_object_mut() {
1668 obj.insert("from_node".into(), serde_json::Value::Number(serde_json::Number::from(self.node())));
1669 obj.insert(
1670 "from_ipaddress".into(),
1671 self.remote_addr
1672 .map(|a| serde_json::Value::String(a.to_string()))
1673 .unwrap_or(serde_json::Value::Null),
1674 );
1675 obj.insert("from_clientid".into(), serde_json::Value::String(self.client_id.to_string()));
1676 obj.insert("from_username".into(), serde_json::Value::String(self.username_ref().into()));
1677 }
1678 json
1679 }
1680
1681 #[inline]
1682 pub fn to_to_json(&self, mut json: serde_json::Value) -> serde_json::Value {
1683 if let Some(obj) = json.as_object_mut() {
1684 obj.insert("node".into(), serde_json::Value::Number(serde_json::Number::from(self.node())));
1685 obj.insert(
1686 "ipaddress".into(),
1687 self.remote_addr
1688 .map(|a| serde_json::Value::String(a.to_string()))
1689 .unwrap_or(serde_json::Value::Null),
1690 );
1691 obj.insert("clientid".into(), serde_json::Value::String(self.client_id.to_string()));
1692 obj.insert("username".into(), serde_json::Value::String(self.username_ref().into()));
1693 }
1694 json
1695 }
1696
1697 #[inline]
1698 pub fn from(node_id: NodeId, client_id: ClientId) -> Self {
1699 Self::new(node_id, 0, None, None, client_id, None)
1700 }
1701
1702 #[inline]
1703 pub fn node(&self) -> NodeId {
1704 self.node_id
1705 }
1706
1707 #[inline]
1708 pub fn lid(&self) -> ListenerId {
1709 self.lid
1710 }
1711
1712 #[inline]
1713 pub fn username(&self) -> UserName {
1714 self.username.clone().unwrap_or_else(|| UserName::from_static(UNDEFINED))
1715 }
1716
1717 #[inline]
1718 pub fn username_ref(&self) -> &str {
1719 self.username.as_ref().map(<UserName as AsRef<str>>::as_ref).unwrap_or_else(|| UNDEFINED)
1720 }
1721}
1722
1723impl Display for Id {
1724 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1725 write!(
1726 f,
1727 "{}@{}:{}/{}/{}/{}/{}",
1728 self.node_id,
1729 self.local_addr.map(|addr| addr.ip().to_string()).unwrap_or_default(),
1730 self.lid,
1731 self.remote_addr.map(|addr| addr.to_string()).unwrap_or_default(),
1732 self.client_id,
1733 self.username_ref(),
1734 self.create_time
1735 )
1736 }
1737}
1738
1739impl std::fmt::Debug for Id {
1740 #[inline]
1741 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1742 write!(f, "{self}")
1743 }
1744}
1745
1746impl PartialEq<Id> for Id {
1747 #[inline]
1748 fn eq(&self, o: &Id) -> bool {
1749 self.node_id == o.node_id
1750 && self.lid == o.lid
1751 && self.client_id == o.client_id
1752 && self.local_addr == o.local_addr
1753 && self.remote_addr == o.remote_addr
1754 && self.username == o.username
1755 && self.create_time == o.create_time
1756 }
1757}
1758
1759impl Eq for Id {}
1760
1761impl std::hash::Hash for Id {
1762 #[inline]
1763 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1764 self.node_id.hash(state);
1765 self.lid.hash(state);
1766 self.local_addr.hash(state);
1767 self.remote_addr.hash(state);
1768 self.client_id.hash(state);
1769 self.username.hash(state);
1770 self.create_time.hash(state);
1771 }
1772}
1773
1774impl Deref for Id {
1775 type Target = _Id;
1776 #[inline]
1777 fn deref(&self) -> &Self::Target {
1778 &self.0
1779 }
1780}
1781
1782impl Serialize for Id {
1783 #[inline]
1784 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1785 where
1786 S: Serializer,
1787 {
1788 _Id::serialize(self.0.as_ref(), serializer)
1789 }
1790}
1791
1792impl<'de> Deserialize<'de> for Id {
1793 #[inline]
1794 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1795 where
1796 D: Deserializer<'de>,
1797 {
1798 Ok(Id(Arc::new(_Id::deserialize(deserializer)?)))
1799 }
1800}
1801
1802#[derive(Debug, PartialEq, Eq, Hash, Clone, GetSize, Deserialize, Serialize)]
1803pub struct _Id {
1804 pub node_id: NodeId,
1805 pub lid: ListenerId,
1806 #[get_size(size_fn = get_option_addr_size_helper)]
1807 pub local_addr: Option<SocketAddr>,
1808 #[get_size(size_fn = get_option_addr_size_helper)]
1809 pub remote_addr: Option<SocketAddr>,
1810 #[get_size(size_fn = get_bytestring_size_helper)]
1811 pub client_id: ClientId,
1812 #[get_size(size_fn = get_option_bytestring_size_helper)]
1813 pub username: Option<UserName>,
1814 pub create_time: TimestampMillis,
1815}
1816
1817fn get_bytestring_size_helper(s: &ByteString) -> usize {
1818 s.len()
1819}
1820
1821fn get_option_bytestring_size_helper(s: &Option<ByteString>) -> usize {
1822 if let Some(s) = s {
1823 s.len()
1824 } else {
1825 0
1826 }
1827}
1828
1829fn get_option_addr_size_helper(s: &Option<SocketAddr>) -> usize {
1830 if let Some(s) = s {
1831 match s {
1832 SocketAddr::V4(s) => size_of_val(s),
1833 SocketAddr::V6(s) => size_of_val(s),
1834 }
1835 } else {
1836 0
1837 }
1838}
1839
1840#[derive(Debug, Clone, Deserialize, Serialize)]
1841pub struct Retain {
1842 pub msg_id: Option<MsgID>,
1843 pub from: From,
1844 pub publish: Publish,
1845}
1846
1847pub type MsgID = usize;
1848
1849#[derive(Debug, Clone, Deserialize, Serialize, GetSize)]
1850pub struct StoredMessage {
1851 pub msg_id: MsgID,
1852 pub from: From,
1853 #[get_size(size_fn = get_publish_size_helper)]
1854 pub publish: Publish,
1855 pub expiry_time_at: TimestampMillis,
1856}
1857
1858fn get_bytes_size_helper(s: &Bytes) -> usize {
1859 s.len()
1860}
1861
1862fn get_properties_size_helper(s: &PublishProperties) -> usize {
1863 s.content_type.as_ref().map(|ct| ct.len()).unwrap_or_default()
1864 + s.correlation_data.as_ref().map(|cd| cd.len()).unwrap_or_default()
1865 + s.user_properties.len() * size_of::<UserProperty>()
1866 + s.response_topic.as_ref().map(|rt| rt.len()).unwrap_or_default()
1867 + s.subscription_ids.len() * size_of::<NonZeroU32>()
1868}
1869
1870fn get_publish_size_helper(p: &Publish) -> usize {
1871 p.packet_id.get_heap_size()
1873 + get_bytes_size_helper(&p.payload)
1875 + get_bytestring_size_helper(&p.topic)
1877 + size_of_val(&p.qos)
1878 + p.properties.as_ref().map(get_properties_size_helper).unwrap_or_default()
1879}
1880
1881impl StoredMessage {
1882 #[inline]
1883 pub fn is_expiry(&self) -> bool {
1884 self.expiry_time_at < timestamp_millis()
1885 }
1886
1887 #[inline]
1888 pub fn encode(&self) -> Result<Vec<u8>> {
1889 Ok(bincode::serialize(&self)?)
1890 }
1891
1892 #[inline]
1893 pub fn decode(data: &[u8]) -> Result<Self> {
1894 Ok(bincode::deserialize(data)?)
1895 }
1896}
1897
1898#[derive(Debug)]
1899pub enum Message {
1900 Forward(From, Publish),
1901 SendRerelease(OutInflightMessage),
1902 Kick(oneshot::Sender<()>, Id, CleanStart, IsAdmin),
1903 Closed(Reason),
1905 Subscribe(Subscribe, oneshot::Sender<Result<SubscribeReturn>>),
1906 Subscribes(Vec<Subscribe>, Option<oneshot::Sender<Vec<Result<SubscribeReturn>>>>),
1907 Unsubscribe(Unsubscribe, oneshot::Sender<Result<()>>),
1908 SessionStateTransfer(OfflineInfo, CleanStart),
1909}
1910
1911#[derive(Serialize, Deserialize, Debug, Clone)]
1912pub struct SessionStatus {
1913 pub id: Id,
1914 pub online: IsOnline,
1915 pub handshaking: bool,
1916}
1917
1918#[derive(Deserialize, Serialize, Debug, Default, Clone)]
1919pub struct SubsSearchParams {
1920 #[serde(default)]
1921 pub _limit: usize,
1922 pub clientid: Option<String>,
1923 pub topic: Option<String>,
1924 pub qos: Option<u8>,
1926 pub share: Option<SharedGroup>,
1927 pub _match_topic: Option<String>,
1928}
1929
1930#[derive(Deserialize, Serialize, Debug, Default)]
1931pub struct SubsSearchResult {
1932 pub node_id: NodeId,
1933 pub clientid: ClientId,
1934 pub client_addr: Option<SocketAddr>,
1935 pub topic: TopicFilter,
1936 pub opts: SubscriptionOptions,
1937}
1938
1939impl SubsSearchResult {
1940 #[inline]
1941 pub fn to_json(self) -> serde_json::Value {
1942 json!({
1943 "node_id": self.node_id,
1944 "clientid": self.clientid,
1945 "client_addr": self.client_addr,
1946 "topic": self.topic,
1947 "opts": self.opts.to_json(),
1948 })
1949 }
1950}
1951
1952#[derive(Deserialize, Serialize, Debug, Default, PartialEq, Eq, Hash, Clone)]
1953pub struct Route {
1954 pub node_id: NodeId,
1955 pub topic: TopicFilter,
1956}
1957
1958pub type SessionSubMap = HashMap<TopicFilter, SubscriptionOptions>;
1959#[derive(Clone)]
1960pub struct SessionSubs {
1961 subs: Arc<RwLock<SessionSubMap>>,
1962}
1963
1964impl Deref for SessionSubs {
1965 type Target = Arc<RwLock<SessionSubMap>>;
1966 #[inline]
1967 fn deref(&self) -> &Self::Target {
1968 &self.subs
1969 }
1970}
1971
1972impl Default for SessionSubs {
1973 fn default() -> Self {
1974 Self::new()
1975 }
1976}
1977
1978impl SessionSubs {
1979 #[inline]
1980 pub fn new() -> Self {
1981 Self::from(SessionSubMap::default())
1982 }
1983
1984 #[inline]
1985 #[allow(clippy::mutable_key_type)]
1986 pub fn from(subs: SessionSubMap) -> Self {
1987 Self { subs: Arc::new(RwLock::new(subs)) }
1988 }
1989
1990 #[inline]
1991 #[allow(unused_variables)]
1992 pub(crate) async fn _add(
1993 &self,
1994 scx: &ServerContext,
1995 topic_filter: TopicFilter,
1996 opts: SubscriptionOptions,
1997 ) -> Option<SubscriptionOptions> {
1998 #[cfg(feature = "shared-subscription")]
1999 let is_shared = opts.has_shared_group();
2000
2001 let prev = {
2002 let mut subs = self.subs.write().await;
2003 let prev = subs.insert(topic_filter, opts);
2004 subs.shrink_to_fit();
2005 prev
2006 };
2007
2008 if let Some(prev_opts) = &prev {
2009 #[cfg(feature = "shared-subscription")]
2010 match (prev_opts.has_shared_group(), is_shared) {
2011 (true, false) => {
2012 #[cfg(feature = "stats")]
2013 scx.stats.subscriptions_shared.dec();
2014 }
2015 (false, true) => {
2016 #[cfg(feature = "stats")]
2017 scx.stats.subscriptions_shared.inc();
2018 }
2019 (false, false) => {}
2020 (true, true) => {}
2021 }
2022 } else {
2023 #[cfg(feature = "stats")]
2024 scx.stats.subscriptions.inc();
2025 #[cfg(feature = "shared-subscription")]
2026 if is_shared {
2027 #[cfg(feature = "stats")]
2028 scx.stats.subscriptions_shared.inc();
2029 }
2030 }
2031
2032 prev
2033 }
2034
2035 #[inline]
2036 pub(crate) async fn _remove(
2037 &self,
2038 #[allow(unused_variables)] scx: &ServerContext,
2039 topic_filter: &str,
2040 ) -> Option<(TopicFilter, SubscriptionOptions)> {
2041 let removed = {
2042 let mut subs = self.subs.write().await;
2043 let removed = subs.remove_entry(topic_filter);
2044 subs.shrink_to_fit();
2045 removed
2046 };
2047
2048 #[allow(unused_variables)]
2049 if let Some((_, opts)) = &removed {
2050 #[cfg(feature = "stats")]
2051 scx.stats.subscriptions.dec();
2052 #[cfg(feature = "shared-subscription")]
2053 if opts.has_shared_group() {
2054 #[cfg(feature = "stats")]
2055 scx.stats.subscriptions_shared.dec();
2056 }
2057 }
2058
2059 removed
2060 }
2061
2062 #[inline]
2063 pub(crate) async fn _drain(&self, scx: &ServerContext) -> Subscriptions {
2064 let topic_filters = self.subs.read().await.keys().cloned().collect::<Vec<_>>();
2065 let mut subs = Vec::new();
2066 for tf in topic_filters {
2067 if let Some(sub) = self._remove(scx, &tf).await {
2068 subs.push(sub);
2069 }
2070 }
2071 subs
2072 }
2073
2074 #[inline]
2075 pub(crate) async fn _extend(&self, scx: &ServerContext, subs: Subscriptions) {
2076 for (topic_filter, opts) in subs {
2077 self._add(scx, topic_filter, opts).await;
2078 }
2079 }
2080
2081 #[inline]
2082 pub async fn clear(&self, #[allow(unused_variables)] scx: &ServerContext) {
2083 {
2084 let subs = self.subs.read().await;
2085 #[allow(unused_variables)]
2086 for (_, opts) in subs.iter() {
2087 #[cfg(feature = "stats")]
2088 scx.stats.subscriptions.dec();
2089 #[cfg(feature = "shared-subscription")]
2090 if opts.has_shared_group() {
2091 #[cfg(feature = "stats")]
2092 scx.stats.subscriptions_shared.dec();
2093 }
2094 }
2095 }
2096 let mut subs = self.subs.write().await;
2097 subs.clear();
2098 subs.shrink_to_fit();
2099 }
2100
2101 #[inline]
2102 pub async fn len(&self) -> usize {
2103 self.subs.read().await.len()
2104 }
2105
2106 #[inline]
2107 pub async fn shared_len(&self) -> usize {
2108 #[cfg(feature = "shared-subscription")]
2109 {
2110 self.subs.read().await.iter().filter(|(_, opts)| opts.has_shared_group()).count()
2111 }
2112 #[cfg(not(feature = "shared-subscription"))]
2113 {
2114 0
2115 }
2116 }
2117
2118 #[inline]
2119 pub async fn is_empty(&self) -> bool {
2120 self.subs.read().await.is_empty()
2121 }
2122
2123 #[inline]
2124 pub async fn to_topic_filters(&self) -> TopicFilters {
2125 self.subs.read().await.keys().cloned().collect()
2126 }
2127}
2128
2129pub struct ExtraAttrs {
2210 attrs: HashMap<String, Box<dyn Any + Sync + Send>>,
2211}
2212
2213impl Default for ExtraAttrs {
2214 fn default() -> Self {
2215 Self::new()
2216 }
2217}
2218
2219impl ExtraAttrs {
2220 #[inline]
2221 pub fn new() -> Self {
2222 Self { attrs: HashMap::default() }
2223 }
2224
2225 #[inline]
2226 pub fn len(&self) -> usize {
2227 self.attrs.len()
2228 }
2229
2230 #[inline]
2231 pub fn is_empty(&self) -> bool {
2232 self.attrs.is_empty()
2233 }
2234
2235 #[inline]
2236 pub fn clear(&mut self) {
2237 self.attrs.clear()
2238 }
2239
2240 #[inline]
2241 pub fn insert<T: Any + Sync + Send>(&mut self, key: String, value: T) {
2242 self.attrs.insert(key, Box::new(value));
2243 }
2244
2245 #[inline]
2246 pub fn get<T: Any + Sync + Send>(&self, key: &str) -> Option<&T> {
2247 self.attrs.get(key).and_then(|v| v.downcast_ref::<T>())
2248 }
2249
2250 #[inline]
2251 pub fn get_mut<T: Any + Sync + Send>(&mut self, key: &str) -> Option<&mut T> {
2252 self.attrs.get_mut(key).and_then(|v| v.downcast_mut::<T>())
2253 }
2254
2255 #[inline]
2256 pub fn get_default_mut<T: Any + Sync + Send, F: Fn() -> T>(
2257 &mut self,
2258 key: String,
2259 def_fn: F,
2260 ) -> Option<&mut T> {
2261 self.attrs.entry(key).or_insert_with(|| Box::new(def_fn())).downcast_mut::<T>()
2262 }
2263
2264 #[inline]
2265 pub fn serialize_key<S, T>(&self, key: &str, serializer: S) -> std::result::Result<S::Ok, S::Error>
2266 where
2267 T: Any + Sync + Send + serde::ser::Serialize,
2268 S: serde::ser::Serializer,
2269 {
2270 self.get::<T>(key).serialize(serializer)
2271 }
2272}
2273
2274#[derive(Clone, Debug)]
2275pub struct TimedValue<V>(V, Option<Instant>);
2276
2277impl<V> TimedValue<V> {
2278 pub fn new(value: V, timeout_duration: Option<Duration>) -> Self {
2279 TimedValue(value, timeout_duration.map(|t| Instant::now() + t))
2280 }
2281
2282 pub fn value(&self) -> &V {
2283 &self.0
2284 }
2285
2286 pub fn value_mut(&mut self) -> &mut V {
2287 &mut self.0
2288 }
2289
2290 pub fn into_value(self) -> V {
2291 self.0
2292 }
2293
2294 pub fn is_expired(&self) -> bool {
2295 self.1.map(|e| Instant::now() >= e).unwrap_or(false)
2296 }
2297}
2298
2299impl<V> PartialEq for TimedValue<V>
2300where
2301 V: PartialEq,
2302{
2303 fn eq(&self, other: &TimedValue<V>) -> bool {
2304 self.value() == other.value()
2305 }
2306}
2307
2308bitflags! {
2311 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
2312 pub struct StateFlags: u8 {
2313 const Kicked = 0b00000001;
2314 const ByAdminKick = 0b00000010;
2315 const DisconnectReceived = 0b00000100;
2316 const CleanStart = 0b00001000;
2317 const Ping = 0b00010000;
2318 }
2319}
2320
2321bitflags! {
2322 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
2323 pub struct SessionStateFlags: u8 {
2324 const SessionPresent = 0b00000001;
2325 const Superuser = 0b00000010;
2326 const Connected = 0b00000100;
2327 }
2328}
2329
2330impl Serialize for SessionStateFlags {
2331 #[inline]
2332 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2333 where
2334 S: Serializer,
2335 {
2336 let v: u8 = self.0 .0;
2337 v.serialize(serializer)
2338 }
2339}
2340
2341impl<'de> Deserialize<'de> for SessionStateFlags {
2342 #[inline]
2343 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2344 where
2345 D: Deserializer<'de>,
2346 {
2347 let v = u8::deserialize(deserializer)?;
2348 Ok(SessionStateFlags::from_bits_retain(v))
2349 }
2350}
2351
2352#[derive(Deserialize, Serialize, Clone, Debug, Default)]
2353pub enum Reason {
2354 ConnectDisconnect(Option<Disconnect>),
2355 ConnectReadWriteTimeout,
2356 ConnectReadWriteError,
2357 ConnectRemoteClose,
2358 ConnectKeepaliveTimeout,
2359 ConnectKicked(IsAdmin),
2360 HandshakeRateExceeded,
2361 SessionExpiration,
2362 SubscribeFailed(Option<ByteString>),
2363 UnsubscribeFailed(Option<ByteString>),
2364 PublishResult(PublishResult),
2365 SubscribeRefused,
2366 DelayedPublishRefused,
2367 MessageExpiration,
2368 MessageQueueFull,
2369 InflightWindowFull,
2370 ProtocolError(ByteString),
2371 Error(ByteString),
2372 MqttError(MqttError),
2373 Reasons(Vec<Reason>),
2374 #[default]
2375 Unknown,
2376}
2377
2378impl Reason {
2379 #[inline]
2380 pub fn from_static(r: &'static str) -> Self {
2381 Reason::Error(ByteString::from_static(r))
2382 }
2383
2384 #[inline]
2385 pub fn is_kicked(&self, admin_opt: IsAdmin) -> bool {
2386 match self {
2387 Reason::ConnectKicked(_admin_opt) => *_admin_opt == admin_opt,
2388 _ => false,
2389 }
2390 }
2391
2392 #[inline]
2393 pub fn is_kicked_by_admin(&self) -> bool {
2394 matches!(self, Reason::ConnectKicked(true))
2395 }
2396}
2397
2398impl std::convert::From<Reason> for PublishResult {
2399 fn from(reason: Reason) -> Self {
2400 use PublishAckReason::*;
2401 use Reason::*;
2402
2403 match reason {
2404 PublishResult(r) => r, MessageQueueFull => Self {
2407 reason_code: QuotaExceeded,
2408 properties: UserProperties::default(),
2409 reason_string: Some("Message queue full".into()),
2410 disconnect: false,
2411 },
2412
2413 InflightWindowFull => Self {
2414 reason_code: QuotaExceeded,
2415 properties: UserProperties::default(),
2416 reason_string: Some("Inflight window full".into()),
2417 disconnect: false,
2418 },
2419
2420 SubscribeRefused | DelayedPublishRefused => Self {
2421 reason_code: ImplementationSpecificError,
2422 properties: UserProperties::default(),
2423 reason_string: Some("Publish refused".into()),
2424 disconnect: false,
2425 },
2426
2427 MessageExpiration => Self {
2428 reason_code: UnspecifiedError,
2429 properties: UserProperties::default(),
2430 reason_string: Some("Message expired".into()),
2431 disconnect: true,
2432 },
2433
2434 ProtocolError(msg) => Self {
2435 reason_code: UnspecifiedError,
2436 properties: UserProperties::default(),
2437 reason_string: Some(format!("Protocol error: {msg}").into()),
2438 disconnect: true,
2439 },
2440
2441 Error(msg) => Self {
2442 reason_code: ImplementationSpecificError,
2443 properties: UserProperties::default(),
2444 reason_string: Some(msg),
2445 disconnect: true,
2446 },
2447
2448 MqttError(_) => Self {
2449 reason_code: UnspecifiedError,
2450 properties: UserProperties::default(),
2451 reason_string: Some("MQTT internal error".into()),
2452 disconnect: true,
2453 },
2454
2455 ConnectDisconnect(_)
2456 | ConnectReadWriteTimeout
2457 | ConnectReadWriteError
2458 | ConnectRemoteClose
2459 | ConnectKeepaliveTimeout
2460 | ConnectKicked(_)
2461 | HandshakeRateExceeded
2462 | SessionExpiration
2463 | SubscribeFailed(_)
2464 | UnsubscribeFailed(_)
2465 | Reasons(_)
2466 | Unknown => Self {
2467 reason_code: UnspecifiedError,
2468 properties: UserProperties::default(),
2469 reason_string: Some("Connection or session related error".into()),
2470 disconnect: true,
2471 },
2472 }
2473 }
2474}
2475
2476impl std::convert::From<&str> for Reason {
2477 #[inline]
2478 fn from(r: &str) -> Self {
2479 Reason::Error(ByteString::from(r))
2480 }
2481}
2482
2483impl std::convert::From<String> for Reason {
2484 #[inline]
2485 fn from(r: String) -> Self {
2486 Reason::Error(ByteString::from(r))
2487 }
2488}
2489
2490impl std::convert::From<MqttError> for Reason {
2491 #[inline]
2492 fn from(e: MqttError) -> Self {
2493 Reason::MqttError(e)
2494 }
2495}
2496
2497impl std::convert::From<Error> for Reason {
2498 #[inline]
2499 fn from(e: Error) -> Self {
2500 match e.downcast::<MqttError>() {
2501 Err(e) => Reason::Error(ByteString::from(e.to_string())),
2502 Ok(e) => Reason::MqttError(e),
2503 }
2504 }
2505}
2506
2507impl ToReasonCode for Reason {
2508 fn to_reason_code(&self) -> DisconnectReasonCode {
2509 match self {
2510 Reason::ConnectDisconnect(Some(Disconnect::V5(d))) => d.reason_code,
2511 Reason::ConnectDisconnect(_) => DisconnectReasonCode::NormalDisconnection,
2512 Reason::ConnectReadWriteTimeout => DisconnectReasonCode::KeepAliveTimeout,
2513 Reason::ConnectReadWriteError => DisconnectReasonCode::UnspecifiedError,
2514 Reason::ConnectRemoteClose => DisconnectReasonCode::ServerShuttingDown,
2515 Reason::ConnectKeepaliveTimeout => DisconnectReasonCode::KeepAliveTimeout,
2516 Reason::ConnectKicked(is_admin) => {
2517 if *is_admin {
2518 DisconnectReasonCode::AdministrativeAction
2519 } else {
2520 DisconnectReasonCode::NotAuthorized
2521 }
2522 }
2523 Reason::HandshakeRateExceeded => DisconnectReasonCode::ConnectionRateExceeded,
2524 Reason::SessionExpiration => DisconnectReasonCode::SessionTakenOver,
2525 Reason::SubscribeFailed(_) => DisconnectReasonCode::UnspecifiedError,
2526 Reason::UnsubscribeFailed(_) => DisconnectReasonCode::UnspecifiedError,
2527 Reason::SubscribeRefused => DisconnectReasonCode::NotAuthorized,
2528 Reason::PublishResult(pubres) => pubres.reason_code.to_reason_code(),
2529 Reason::DelayedPublishRefused => DisconnectReasonCode::NotAuthorized,
2530 Reason::MessageExpiration => DisconnectReasonCode::MessageRateTooHigh,
2531 Reason::MessageQueueFull => DisconnectReasonCode::QuotaExceeded,
2532 Reason::InflightWindowFull => DisconnectReasonCode::ReceiveMaximumExceeded,
2533 Reason::ProtocolError(_) => DisconnectReasonCode::ProtocolError,
2534 Reason::Error(_) => DisconnectReasonCode::UnspecifiedError,
2535 Reason::MqttError(mqtt_error) => mqtt_error.to_reason_code(),
2536 Reason::Reasons(reasons) => {
2537 if let Some(first_reason) = reasons.first() {
2538 first_reason.to_reason_code()
2539 } else {
2540 DisconnectReasonCode::UnspecifiedError
2541 }
2542 }
2543 Reason::Unknown => DisconnectReasonCode::UnspecifiedError,
2544 }
2545 }
2546}
2547
2548impl Display for Reason {
2549 #[inline]
2550 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2551 let r = match self {
2552 Reason::ConnectDisconnect(r) => {
2553 match r {
2555 Some(r) => return write!(f, "Disconnect({r:?})"),
2556 None => "Disconnect",
2557 }
2558 }
2559 Reason::ConnectReadWriteTimeout => {
2560 "ReadWriteTimeout" }
2562 Reason::ConnectReadWriteError => {
2563 "ReadWriteError" }
2565 Reason::ConnectRemoteClose => {
2566 "RemoteClose" }
2568 Reason::ConnectKeepaliveTimeout => {
2569 "KeepaliveTimeout" }
2571 Reason::ConnectKicked(admin_opt) => {
2572 if *admin_opt {
2573 "ByAdminKick" } else {
2575 "Kicked" }
2577 }
2578 Reason::HandshakeRateExceeded => {
2579 "HandshakeRateExceeded" }
2581 Reason::SessionExpiration => {
2582 "SessionExpiration" }
2584 Reason::SubscribeFailed(r) => {
2585 match r {
2587 Some(r) => return write!(f, "SubscribeFailed({r})"),
2588 None => "SubscribeFailed",
2589 }
2590 }
2591 Reason::UnsubscribeFailed(r) => {
2592 match r {
2594 Some(r) => return write!(f, "UnsubscribeFailed({r})"),
2595 None => "UnsubscribeFailed",
2596 }
2597 }
2598 Reason::SubscribeRefused => {
2599 "SubscribeRefused" }
2601 Reason::PublishResult(pubres) => {
2602 if let Some(rs) = pubres.reason_string.as_ref() {
2603 let s: &str = rs.as_ref();
2604 s
2605 } else {
2606 &format!("{:?}", pubres.reason_code)
2607 }
2608 }
2609 Reason::DelayedPublishRefused => {
2610 "DelayedPublishRefused" }
2612 Reason::MessageExpiration => {
2613 "MessageExpiration" }
2615 Reason::MessageQueueFull => {
2616 "MessageQueueFull" }
2618 Reason::InflightWindowFull => "Inflight window is full",
2619 Reason::Error(r) => r,
2620 Reason::MqttError(e) => &e.to_string(),
2621 Reason::ProtocolError(r) => return write!(f, "ProtocolError({r})"),
2622 Reason::Reasons(reasons) => match reasons.len() {
2623 0 => "",
2624 1 => return write!(f, "{}", reasons.first().map(|r| r.to_string()).unwrap_or_default()),
2625 _ => return write!(f, "{}", reasons.iter().map(|r| r.to_string()).join(",")),
2626 },
2627 Reason::Unknown => {
2628 "Unknown" }
2630 };
2631 write!(f, "{r}")
2632 }
2633}
2634
2635#[derive(Debug)]
2636pub struct ServerTopicAliases {
2637 max_topic_aliases: usize,
2638 aliases: RwLock<HashMap<TopicName, NonZeroU16>>,
2639}
2640
2641impl ServerTopicAliases {
2642 #[inline]
2643 pub fn new(max_topic_aliases: usize) -> Self {
2644 ServerTopicAliases { max_topic_aliases, aliases: RwLock::new(HashMap::default()) }
2645 }
2646
2647 #[inline]
2648 pub async fn get(&self, topic: TopicName) -> (Option<TopicName>, Option<NonZeroU16>) {
2649 if self.max_topic_aliases == 0 {
2650 return (Some(topic), None);
2651 }
2652 let alias = {
2653 let aliases = self.aliases.read().await;
2654 if let Some(alias) = aliases.get(&topic) {
2655 return (None, Some(*alias));
2656 }
2657 let len = aliases.len();
2658 if len >= self.max_topic_aliases {
2659 return (Some(topic), None);
2660 }
2661
2662 match NonZeroU16::try_from((len + 1) as u16) {
2663 Ok(alias) => alias,
2664 Err(_) => {
2665 unreachable!()
2666 }
2667 }
2668 };
2669 self.aliases.write().await.insert(topic.clone(), alias);
2670 (Some(topic), Some(alias))
2671 }
2672}
2673
2674#[derive(Debug)]
2675pub struct ClientTopicAliases {
2676 max_topic_aliases: usize,
2677 aliases: RwLock<HashMap<NonZeroU16, TopicName>>,
2678}
2679
2680impl ClientTopicAliases {
2681 #[inline]
2682 pub fn new(max_topic_aliases: usize) -> Self {
2683 ClientTopicAliases { max_topic_aliases, aliases: RwLock::new(HashMap::default()) }
2684 }
2685
2686 #[inline]
2687 pub async fn set_and_get(&self, alias: Option<NonZeroU16>, topic: TopicName) -> Result<TopicName> {
2688 match (alias, topic.len()) {
2689 (Some(alias), 0) => {
2690 self.aliases.read().await.get(&alias).ok_or_else(|| {
2691 MqttError::PublishAckReason(
2692 PublishAckReason::ImplementationSpecificError,
2693 ByteString::from(
2694 "implementation specific error, the ‘topic‘ associated with the ‘alias‘ was not found",
2695 ),
2696 ).into()
2697 }).cloned()
2698 }
2699 (Some(alias), _) => {
2700 let mut aliases = self.aliases.write().await;
2701 let len = aliases.len();
2702 if let Some(topic_mut) = aliases.get_mut(&alias) {
2703 *topic_mut = topic.clone()
2704 }else{
2705 if len >= self.max_topic_aliases {
2706 return Err(MqttError::PublishAckReason(
2707 PublishAckReason::ImplementationSpecificError,
2708 ByteString::from(
2709 format!("implementation specific error, the number of topic aliases exceeds the limit ({})", self.max_topic_aliases),
2710 ),
2711 ).into())
2712 }
2713 aliases.insert(alias, topic.clone());
2714 }
2715 Ok(topic)
2716 }
2717 (None, 0) => Err(MqttError::PublishAckReason(
2718 PublishAckReason::ImplementationSpecificError,
2719 ByteString::from("implementation specific error, ‘alias’ and ‘topic’ are both empty"),
2720 ).into()),
2721 (None, _) => Ok(topic),
2722 }
2723 }
2724}
2725
2726#[derive(Debug, Default, Deserialize, Serialize, Clone)]
2727pub struct DisconnectInfo {
2728 pub disconnected_at: TimestampMillis,
2729 pub reasons: Vec<Reason>,
2730 pub mqtt_disconnect: Option<Disconnect>, }
2732
2733impl DisconnectInfo {
2734 #[inline]
2735 pub fn new(disconnected_at: TimestampMillis) -> Self {
2736 Self { disconnected_at, reasons: Vec::new(), mqtt_disconnect: None }
2737 }
2738
2739 #[inline]
2740 pub fn is_disconnected(&self) -> bool {
2741 self.disconnected_at != 0
2742 }
2743}
2744
2745#[inline]
2746pub fn topic_size(topic: &Topic) -> usize {
2747 topic
2748 .iter()
2749 .map(|l| {
2750 let data_len = match l {
2751 Level::Normal(s) => s.len(),
2752 Level::Metadata(s) => s.len(),
2753 _ => 0,
2754 };
2755 size_of::<Level>() + data_len
2756 })
2757 .sum::<usize>()
2758}
2759
2760#[derive(Debug, Serialize, Deserialize)]
2761pub struct DelayedPublish {
2762 pub expired_time: TimestampMillis,
2763 pub from: From,
2764 pub publish: Publish,
2765 pub message_storage_available: bool,
2766 pub message_expiry_interval: Option<Duration>,
2767}
2768
2769impl DelayedPublish {
2770 #[inline]
2771 pub fn new(
2772 from: From,
2773 publish: Publish,
2774 message_storage_available: bool,
2775 message_expiry_interval: Option<Duration>,
2776 ) -> Self {
2777 let expired_time = publish
2778 .delay_interval
2779 .map(|di| timestamp_millis() + (di as TimestampMillis * 1000))
2780 .unwrap_or_else(timestamp_millis);
2781 Self { expired_time, from, publish, message_storage_available, message_expiry_interval }
2782 }
2783
2784 #[inline]
2785 pub fn is_expired(&self) -> bool {
2786 timestamp_millis() > self.expired_time
2787 }
2788}
2789
2790impl std::cmp::Eq for DelayedPublish {}
2791
2792impl PartialEq for DelayedPublish {
2793 fn eq(&self, other: &Self) -> bool {
2794 other.expired_time.eq(&self.expired_time)
2795 }
2796}
2797
2798impl std::cmp::Ord for DelayedPublish {
2799 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2800 other.expired_time.cmp(&self.expired_time)
2801 }
2802}
2803
2804impl PartialOrd for DelayedPublish {
2805 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2806 Some(self.cmp(other))
2807 }
2808}
2809
2810#[derive(Serialize, Deserialize, Clone, Debug)]
2811pub enum OfflineSession {
2812 Exist(Option<OfflineInfo>),
2813 NotExist,
2814}
2815
2816#[derive(Serialize, Deserialize, Clone, Debug)]
2817pub struct HealthInfo {
2818 pub running: bool,
2819 #[serde(skip_serializing_if = "Option::is_none")]
2820 pub descr: Option<String>,
2821 pub nodes: Vec<NodeHealthStatus>,
2822}
2823
2824impl Default for HealthInfo {
2825 fn default() -> Self {
2826 Self { running: true, descr: None, nodes: vec![NodeHealthStatus::default()] }
2827 }
2828}
2829
2830impl HealthInfo {
2831 pub fn to_json(&self) -> Value {
2832 let mut obj = Map::new();
2833
2834 obj.insert("running".into(), json!(self.running));
2835
2836 if let Some(descr) = &self.descr {
2837 if !descr.is_empty() {
2838 obj.insert("descr".into(), json!(descr));
2839 }
2840 }
2841
2842 let nodes_json: Vec<Value> = self.nodes.iter().map(|node| node.to_json()).collect();
2843
2844 obj.insert("nodes".into(), Value::Array(nodes_json));
2845
2846 Value::Object(obj)
2847 }
2848}
2849
2850#[derive(Serialize, Deserialize, Clone, Debug)]
2851pub struct NodeHealthStatus {
2852 pub node_id: NodeId,
2853 pub running: bool,
2854 pub leader_id: Option<NodeId>,
2855 pub descr: Option<String>,
2856}
2857
2858impl NodeHealthStatus {
2859 pub fn is_running(&self) -> bool {
2860 if let Some(leader_id) = self.leader_id {
2861 leader_id > 0 && self.running
2862 } else {
2863 self.running
2864 }
2865 }
2866
2867 pub fn to_json(&self) -> Value {
2868 let mut obj = Map::new();
2869
2870 obj.insert("node_id".into(), json!(self.node_id));
2871 obj.insert("running".into(), json!(self.is_running()));
2872
2873 if let Some(leader_id) = &self.leader_id {
2874 obj.insert("leader_id".into(), json!(leader_id));
2875 }
2876
2877 if let Some(descr) = &self.descr {
2878 if !descr.is_empty() {
2879 obj.insert("descr".into(), json!(descr));
2880 }
2881 }
2882
2883 Value::Object(obj)
2884 }
2885}
2886
2887impl Default for NodeHealthStatus {
2888 fn default() -> Self {
2889 Self { node_id: 0, running: true, leader_id: None, descr: None }
2890 }
2891}
2892
2893#[inline]
2894pub fn serialize_user_properties(props: &UserProperties) -> HashMap<&str, Value> {
2895 let mut map: HashMap<&str, Vec<&str>> = HashMap::default();
2896
2897 for (k, v) in props {
2898 map.entry(k.as_ref()).or_default().push(v.as_ref());
2899 }
2900
2901 let mut result = HashMap::default();
2902 for (key, values) in map {
2903 if values.len() == 1 {
2904 result.insert(key, serde_json::Value::String(values[0].to_string()));
2905 } else {
2906 let array: Vec<serde_json::Value> =
2907 values.iter().map(|v| serde_json::Value::String(v.to_string())).collect();
2908 result.insert(key, serde_json::Value::Array(array));
2909 }
2910 }
2911
2912 result
2913}
2914
2915#[test]
2916fn test_reason() {
2917 assert!(Reason::ConnectKicked(false).is_kicked(false));
2918 assert!(!Reason::ConnectKicked(false).is_kicked(true));
2919 assert!(Reason::ConnectKicked(true).is_kicked(true));
2920 assert!(!Reason::ConnectKicked(true).is_kicked(false));
2921 assert!(Reason::ConnectKicked(true).is_kicked_by_admin());
2922 assert!(!Reason::ConnectKicked(false).is_kicked_by_admin());
2923 assert!(!Reason::ConnectDisconnect(None).is_kicked(false));
2924 assert!(!Reason::ConnectDisconnect(None).is_kicked_by_admin());
2925
2926 let reasons = Reason::Reasons(vec![Reason::ConnectKicked(false), Reason::MessageExpiration]);
2927 assert_eq!(reasons.to_string(), "Kicked,MessageExpiration");
2928}