Skip to main content

rmqtt/
types.rs

1//! Some commonly used type definitions
2
3use std::any::Any;
4use std::convert::From as _f;
5use std::fmt;
6use std::fmt::Display;
7use std::hash::Hash;
8use std::mem::{size_of, size_of_val};
9use std::net::SocketAddr;
10use std::num::{NonZeroU16, NonZeroU32};
11use std::ops::{Deref, DerefMut};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use crate::acl::AuthInfo;
16use crate::codec::types::MQTT_LEVEL_5;
17use crate::codec::v3::{
18    Connect as ConnectV3, ConnectAckReason as ConnectAckReasonV3, LastWill as LastWillV3,
19};
20use crate::codec::v5::{
21    Connect as ConnectV5, ConnectAckReason as ConnectAckReasonV5, DisconnectReasonCode,
22    LastWill as LastWillV5, PublishAck as PublishAckV5, PublishAck2, PublishAck2Reason, PublishAckReason,
23    PublishProperties, RetainHandling, SubscribeAckReason, SubscriptionOptions as SubscriptionOptionsV5,
24    ToReasonCode, UserProperties, UserProperty,
25};
26use crate::fitter::Fitter;
27use crate::net::MqttError;
28use crate::net::{v3, v5, Builder};
29use crate::queue::{Queue, Sender};
30use crate::utils::{self, timestamp_millis};
31use crate::{codec, Error, Result};
32use anyhow::anyhow;
33use base64::prelude::{Engine, BASE64_STANDARD};
34use bitflags::bitflags;
35use bytes::Bytes;
36use bytestring::ByteString;
37use futures::StreamExt;
38use get_size::GetSize;
39use itertools::Itertools;
40use rmqtt_codec::cert::CertInfo;
41use serde::de::{self, Deserializer};
42use serde::ser::{SerializeStruct, Serializer};
43use serde::{Deserialize, Serialize};
44use serde_json::{json, Map, Value};
45use tokio::io::{AsyncRead, AsyncWrite};
46use tokio::sync::{oneshot, RwLock};
47
48use crate::context::ServerContext;
49use crate::inflight::{OutInflight, OutInflightMessage};
50use crate::session::OfflineInfo;
51use crate::topic::Level;
52
53pub use crate::codec::types::Publish as CodecPublish;
54
55pub type Port = u16;
56pub type NodeId = utils::NodeId;
57pub type NodeName = String;
58pub type RemoteSocketAddr = SocketAddr;
59pub type LocalSocketAddr = SocketAddr;
60pub type Addr = utils::Addr;
61pub type ClientId = ByteString;
62pub type UserName = ByteString;
63pub type Superuser = bool;
64pub type Password = bytes::Bytes;
65pub type PacketId = u16;
66///topic name or topic filter
67pub type TopicName = ByteString;
68pub type Topic = crate::topic::Topic;
69///topic filter
70pub type TopicFilter = ByteString;
71pub type SharedGroup = ByteString;
72pub type LimitSubsCount = Option<usize>;
73pub type IsDisconnect = bool;
74pub type MessageExpiry = bool;
75pub type TimestampMillis = utils::TimestampMillis;
76pub type Timestamp = utils::Timestamp;
77pub type IsOnline = bool;
78pub type IsAdmin = bool;
79pub type LimiterName = u16;
80pub type CleanStart = bool;
81pub type AssignedClientId = bool;
82pub type IsPing = bool;
83
84pub type Tx = SessionTx;
85pub type Rx = futures::channel::mpsc::UnboundedReceiver<Message>;
86
87pub type DashSet<V> = dashmap::DashSet<V, ahash::RandomState>;
88pub type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
89pub type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;
90pub type QoS = rmqtt_codec::types::QoS;
91pub type PublishReceiveTime = TimestampMillis;
92pub type Subscriptions = Vec<(TopicFilter, SubscriptionOptions)>;
93pub type TopicFilters = Vec<TopicFilter>;
94pub type SubscriptionClientIds = Option<Vec<(ClientId, Option<(TopicFilter, SharedGroup)>)>>;
95pub type SubscriptionIdentifier = NonZeroU32;
96
97pub type HookSubscribeResult = Vec<Option<TopicFilter>>;
98pub type HookUnsubscribeResult = Vec<Option<TopicFilter>>;
99
100pub type MessageSender = Sender<(From, Publish)>;
101pub type MessageQueue = Queue<(From, Publish)>;
102pub type MessageQueueType = Arc<MessageQueue>;
103pub type OutInflightType = Arc<RwLock<OutInflight>>; //@TODO 考虑去掉 RwLock
104
105pub type ConnectInfoType = Arc<ConnectInfo>;
106pub type FitterType = Arc<dyn Fitter>;
107pub type ListenerConfig = Arc<Builder>;
108pub type ListenerId = u16;
109
110pub(crate) const UNDEFINED: &str = "undefined";
111
112#[derive(Clone)]
113pub struct SessionTx {
114    #[cfg(feature = "debug")]
115    scx: ServerContext,
116    tx: futures::channel::mpsc::UnboundedSender<Message>,
117}
118
119impl SessionTx {
120    pub fn new(
121        tx: futures::channel::mpsc::UnboundedSender<Message>,
122        #[cfg(feature = "debug")] scx: ServerContext,
123    ) -> Self {
124        Self {
125            tx,
126            #[cfg(feature = "debug")]
127            scx,
128        }
129    }
130
131    #[inline]
132    pub fn is_closed(&self) -> bool {
133        self.tx.is_closed()
134    }
135
136    #[inline]
137    pub fn unbounded_send(
138        &self,
139        msg: Message,
140    ) -> std::result::Result<(), futures::channel::mpsc::TrySendError<Message>> {
141        match self.tx.unbounded_send(msg) {
142            Ok(()) => {
143                #[cfg(feature = "debug")]
144                self.scx.stats.debug_session_channels.inc();
145                Ok(())
146            }
147            Err(e) => Err(e),
148        }
149    }
150}
151
152#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
153pub enum ConnectInfo {
154    V3(Id, Box<ConnectV3>),
155    V5(Id, Box<ConnectV5>),
156}
157
158impl std::convert::From<Id> for ConnectInfo {
159    fn from(id: Id) -> Self {
160        ConnectInfo::V3(id, Box::default())
161    }
162}
163
164impl ConnectInfo {
165    #[inline]
166    pub fn id(&self) -> &Id {
167        match self {
168            ConnectInfo::V3(id, _) => id,
169            ConnectInfo::V5(id, _) => id,
170        }
171    }
172
173    #[inline]
174    pub fn client_id(&self) -> &ClientId {
175        match self {
176            ConnectInfo::V3(id, _) => &id.client_id,
177            ConnectInfo::V5(id, _) => &id.client_id,
178        }
179    }
180
181    #[inline]
182    pub fn to_json(&self) -> serde_json::Value {
183        match self {
184            ConnectInfo::V3(id, c) => {
185                json!({
186                    "node": id.node(),
187                    "ipaddress": id.remote_addr,
188                    "clientid": id.client_id,
189                    "username": id.username_ref(),
190                    "keepalive": c.keep_alive,
191                    "proto_ver": c.protocol.level(),
192                    "clean_session": c.clean_session,
193                    "last_will": self.last_will().map(|lw|lw.to_json())
194                })
195            }
196            ConnectInfo::V5(id, c) => {
197                json!({
198                    "node": id.node(),
199                    "ipaddress": id.remote_addr,
200                    "clientid": id.client_id,
201                    "username": id.username_ref(),
202                    "keepalive": c.keep_alive,
203                    "proto_ver": MQTT_LEVEL_5,
204                    "clean_start": c.clean_start,
205                    "last_will": self.last_will().map(|lw|lw.to_json()),
206
207                    "session_expiry_interval_secs": c.session_expiry_interval_secs,
208                    "auth_method": c.auth_method,
209                    "auth_data": c.auth_data,
210                    "request_problem_info": c.request_problem_info,
211                    "request_response_info": c.request_response_info,
212                    "receive_max": c.receive_max,
213                    "topic_alias_max": c.topic_alias_max,
214                    "user_properties": c.user_properties,
215                    "max_packet_size": c.max_packet_size,
216                })
217            }
218        }
219    }
220
221    #[inline]
222    pub fn to_hook_body(&self, user_properties: bool) -> serde_json::Value {
223        match self {
224            ConnectInfo::V3(id, c) => {
225                json!({
226                    "node": id.node(),
227                    "ipaddress": id.remote_addr,
228                    "clientid": id.client_id,
229                    "username": id.username_ref(),
230                    "keepalive": c.keep_alive,
231                    "proto_ver": c.protocol.level(),
232                    "clean_session": c.clean_session,
233                })
234            }
235            ConnectInfo::V5(id, c) => {
236                let mut body = json!({
237                    "node": id.node(),
238                    "ipaddress": id.remote_addr,
239                    "clientid": id.client_id,
240                    "username": id.username_ref(),
241                    "keepalive": c.keep_alive,
242                    "proto_ver": MQTT_LEVEL_5,
243                    "clean_start": c.clean_start,
244                });
245                if user_properties && !c.user_properties.is_empty() {
246                    if let Some(obj) = body.as_object_mut() {
247                        obj.insert(
248                            "user_properties".into(),
249                            json!(serialize_user_properties(&c.user_properties)),
250                        );
251                    }
252                }
253                body
254            }
255        }
256    }
257
258    #[inline]
259    pub fn last_will(&self) -> Option<LastWill<'_>> {
260        match self {
261            ConnectInfo::V3(_, conn_info) => conn_info.last_will.as_ref().map(LastWill::V3),
262            ConnectInfo::V5(_, conn_info) => conn_info.last_will.as_ref().map(LastWill::V5),
263        }
264    }
265
266    #[inline]
267    pub fn keep_alive(&self) -> u16 {
268        match self {
269            ConnectInfo::V3(_, conn_info) => conn_info.keep_alive,
270            ConnectInfo::V5(_, conn_info) => conn_info.keep_alive,
271        }
272    }
273
274    #[inline]
275    pub fn username(&self) -> Option<&UserName> {
276        match self {
277            ConnectInfo::V3(_, conn_info) => conn_info.username.as_ref(),
278            ConnectInfo::V5(_, conn_info) => conn_info.username.as_ref(),
279        }
280    }
281
282    #[inline]
283    pub fn password(&self) -> Option<&Password> {
284        match self {
285            ConnectInfo::V3(_, conn_info) => conn_info.password.as_ref(),
286            ConnectInfo::V5(_, conn_info) => conn_info.password.as_ref(),
287        }
288    }
289
290    #[inline]
291    pub fn ipaddress(&self) -> Option<SocketAddr> {
292        match self {
293            ConnectInfo::V3(id, _) => id.remote_addr,
294            ConnectInfo::V5(id, _) => id.remote_addr,
295        }
296    }
297
298    #[inline]
299    pub fn clean_start(&self) -> bool {
300        match self {
301            ConnectInfo::V3(_, conn_info) => conn_info.clean_session,
302            ConnectInfo::V5(_, conn_info) => conn_info.clean_start,
303        }
304    }
305
306    #[inline]
307    pub fn proto_ver(&self) -> u8 {
308        match self {
309            ConnectInfo::V3(_, conn_info) => conn_info.protocol.level(),
310            ConnectInfo::V5(_, _) => MQTT_LEVEL_5,
311        }
312    }
313
314    ///client max packet size, S(Max Limit) -> C
315    #[inline]
316    pub fn max_packet_size(&self) -> Option<NonZeroU32> {
317        if let ConnectInfo::V5(_, connect) = self {
318            connect.max_packet_size
319        } else {
320            None
321        }
322    }
323
324    #[inline]
325    pub fn auth_method(&self) -> Option<&ByteString> {
326        if let ConnectInfo::V5(_, connect) = self {
327            connect.auth_method.as_ref()
328        } else {
329            None
330        }
331    }
332
333    #[inline]
334    pub fn cert(&self) -> Option<&CertInfo> {
335        match self {
336            ConnectInfo::V3(_, connect) => connect.cert.as_ref(),
337            ConnectInfo::V5(_, connect) => connect.cert.as_ref(),
338        }
339    }
340}
341
342#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
343pub enum Disconnect {
344    V3,
345    V5(codec::v5::Disconnect),
346    Other(ByteString),
347}
348
349impl Disconnect {
350    #[inline]
351    pub fn reason_code(&self) -> Option<DisconnectReasonCode> {
352        match self {
353            Disconnect::V3 => None,
354            Disconnect::V5(d) => Some(d.reason_code),
355            Disconnect::Other(_) => None,
356        }
357    }
358
359    #[inline]
360    pub fn reason(&self) -> Option<&ByteString> {
361        match self {
362            Disconnect::V3 => None,
363            Disconnect::V5(d) => d.reason_string.as_ref(),
364            Disconnect::Other(r) => Some(r),
365        }
366    }
367}
368
369pub type SubscribeAclResult = SubscribeReturn;
370
371#[derive(Default, Debug, Clone, Deserialize, Serialize)]
372pub struct PublishAclResult(pub PublishResult);
373
374impl PublishAclResult {
375    #[inline]
376    pub fn pub_res(self) -> PublishResult {
377        self.0
378    }
379
380    #[inline]
381    pub fn allow() -> Self {
382        PublishAclResult(PublishResult::reason_code(PublishAckReason::Success, None, false))
383    }
384
385    #[inline]
386    pub fn rejected(disconnect: IsDisconnect, reason_string: Option<ByteString>) -> Self {
387        PublishAclResult(PublishResult::reason_code(
388            PublishAckReason::NotAuthorized,
389            Some(reason_string.unwrap_or_else(|| "PublishRefused".into())),
390            disconnect,
391        ))
392    }
393
394    #[inline]
395    pub fn is_allow(&self) -> bool {
396        matches!(self.0.reason_code, PublishAckReason::Success)
397    }
398
399    #[inline]
400    pub fn is_rejected(&self) -> bool {
401        !self.is_allow()
402    }
403}
404
405#[derive(Debug, Clone)]
406pub enum AuthResult {
407    Allow(Superuser, Option<AuthInfo>),
408    ///User is not found
409    NotFound,
410    BadUsernameOrPassword,
411    NotAuthorized,
412}
413
414#[derive(Debug, Clone, PartialEq, Eq)]
415pub enum MessageExpiryCheckResult {
416    Expiry,
417    Remaining(Option<NonZeroU32>),
418}
419
420impl MessageExpiryCheckResult {
421    #[inline]
422    pub fn is_expiry(&self) -> bool {
423        matches!(self, Self::Expiry)
424    }
425
426    #[inline]
427    pub fn message_expiry_interval(&self) -> Option<NonZeroU32> {
428        match self {
429            Self::Expiry => None,
430            Self::Remaining(i) => *i,
431        }
432    }
433}
434
435//key is TopicFilter
436pub type SharedSubRelations = HashMap<TopicFilter, Vec<(SharedGroup, NodeId, ClientId, QoS, IsOnline)>>;
437//In other nodes
438pub type OtherSubRelations = HashMap<NodeId, Vec<TopicFilter>>;
439pub type ClearSubscriptions = bool;
440
441pub type SharedGroupType = (SharedGroup, IsOnline, Vec<ClientId>);
442
443pub type AllRelationsMap = DashMap<TopicFilter, HashMap<ClientId, (Id, SubscriptionOptions)>>;
444
445pub type SubRelation = (
446    TopicFilter,
447    ClientId,
448    SubscriptionOptions,
449    Option<Vec<SubscriptionIdentifier>>,
450    Option<SharedGroupType>,
451);
452pub type SubRelations = Vec<SubRelation>;
453pub type SubRelationsMap = HashMap<NodeId, SubRelations>;
454
455impl std::convert::From<SubscriptioRelationsCollector> for SubRelations {
456    #[inline]
457    fn from(collector: SubscriptioRelationsCollector) -> Self {
458        let mut subs = collector.v3_rels;
459        subs.extend(collector.v5_rels.into_iter().map(|(clientid, (topic_filter, opts, sub_ids, group))| {
460            (topic_filter, clientid, opts, sub_ids, group)
461        }));
462        subs
463    }
464}
465
466pub type SubscriptioRelationsCollectorMap = HashMap<NodeId, SubscriptioRelationsCollector>;
467
468#[allow(clippy::type_complexity)]
469#[derive(Debug, Default)]
470pub struct SubscriptioRelationsCollector {
471    v3_rels: Vec<SubRelation>,
472    v5_rels: HashMap<
473        ClientId,
474        (TopicFilter, SubscriptionOptions, Option<Vec<SubscriptionIdentifier>>, Option<SharedGroupType>),
475    >,
476}
477
478impl SubscriptioRelationsCollector {
479    #[inline]
480    pub fn add(
481        &mut self,
482        topic_filter: &TopicFilter,
483        client_id: ClientId,
484        opts: SubscriptionOptions,
485        group: Option<SharedGroupType>,
486    ) {
487        if opts.is_v3() {
488            self.v3_rels.push((topic_filter.clone(), client_id, opts, None, group));
489        } else {
490            //MQTT V5: Subscription Identifiers
491            self.v5_rels
492                .entry(client_id)
493                .and_modify(|(_, _, sub_ids, _)| {
494                    if let Some(sub_id) = opts.subscription_identifier() {
495                        if let Some(sub_ids) = sub_ids {
496                            sub_ids.push(sub_id)
497                        } else {
498                            *sub_ids = Some(vec![sub_id]);
499                        }
500                    }
501                })
502                .or_insert_with(|| {
503                    let sub_ids = opts.subscription_identifier().map(|id| vec![id]);
504                    (topic_filter.clone(), opts, sub_ids, group)
505                });
506        }
507    }
508}
509
510#[inline]
511pub fn parse_topic_filter(
512    topic_filter: &ByteString,
513    shared_subscription: bool,
514    limit_subscription: bool,
515) -> Result<(TopicFilter, Option<SharedGroup>, LimitSubsCount)> {
516    let invalid_filter = || anyhow!(format!("Illegal topic filter, {:?}", topic_filter));
517    let (topic, shared_group, limit_subs) = if shared_subscription || limit_subscription {
518        let levels = topic_filter.splitn(3, '/').collect::<Vec<_>>();
519        match (levels.first(), levels.get(1), levels.get(2)) {
520            (Some(&"$share"), group, tf) => match (shared_subscription, group, tf) {
521                (true, Some(group), Some(tf)) => {
522                    let tf = TopicFilter::from(*tf);
523                    (tf, Some(SharedGroup::from(*group)), None)
524                }
525                (true, _, _) => {
526                    return Err(invalid_filter());
527                }
528                (false, _, _) => {
529                    return Err(anyhow!(format!("Shared subscription is not enabled, {:?}", topic_filter)));
530                }
531            },
532            (Some(&"$limit"), limit, tf) => match (limit_subscription, limit, tf) {
533                (true, Some(limit), Some(tf)) => {
534                    let tf = TopicFilter::from(*tf);
535                    let limit = limit.parse::<usize>().map_err(|_| invalid_filter())?;
536                    (tf, None, Some(limit))
537                }
538                (true, _, _) => {
539                    return Err(invalid_filter());
540                }
541                (false, _, _) => {
542                    return Err(anyhow!(format!("Limit subscription is not enabled, {:?}", topic_filter)));
543                }
544            },
545            (Some(&"$exclusive"), _, _) => {
546                if limit_subscription {
547                    let tf = TopicFilter::from(topic_filter.trim_start_matches("$exclusive/"));
548                    (tf, None, Some(1))
549                } else {
550                    return Err(anyhow!(format!("Limit subscription is not enabled, {:?}", topic_filter)));
551                }
552            }
553            _ => (topic_filter.clone(), None, None),
554        }
555    } else {
556        (topic_filter.clone(), None, None)
557    };
558    if topic.is_empty() {
559        return Err(invalid_filter());
560    }
561    Ok((topic, shared_group, limit_subs))
562}
563
564#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
565pub enum SubscriptionOptions {
566    V3(SubOptionsV3),
567    V5(SubOptionsV5),
568}
569
570impl Default for SubscriptionOptions {
571    fn default() -> Self {
572        SubscriptionOptions::V3(SubOptionsV3 {
573            qos: QoS::AtMostOnce,
574            #[cfg(feature = "shared-subscription")]
575            shared_group: None,
576            #[cfg(feature = "limit-subscription")]
577            limit_subs: None,
578        })
579    }
580}
581
582impl SubscriptionOptions {
583    #[inline]
584    pub fn no_local(&self) -> Option<bool> {
585        match self {
586            SubscriptionOptions::V3(_) => None,
587            SubscriptionOptions::V5(opts) => Some(opts.no_local),
588        }
589    }
590
591    #[inline]
592    pub fn retain_as_published(&self) -> Option<bool> {
593        match self {
594            SubscriptionOptions::V3(_) => None,
595            SubscriptionOptions::V5(opts) => Some(opts.retain_as_published),
596        }
597    }
598
599    #[inline]
600    pub fn retain_handling(&self) -> Option<RetainHandling> {
601        match self {
602            SubscriptionOptions::V3(_) => None,
603            SubscriptionOptions::V5(opts) => Some(opts.retain_handling),
604        }
605    }
606
607    #[inline]
608    pub fn subscription_identifier(&self) -> Option<NonZeroU32> {
609        match self {
610            SubscriptionOptions::V3(_) => None,
611            SubscriptionOptions::V5(opts) => opts.id,
612        }
613    }
614
615    #[inline]
616    pub fn qos(&self) -> QoS {
617        match self {
618            SubscriptionOptions::V3(opts) => opts.qos,
619            SubscriptionOptions::V5(opts) => opts.qos,
620        }
621    }
622
623    #[inline]
624    pub fn qos_value(&self) -> u8 {
625        match self {
626            SubscriptionOptions::V3(opts) => opts.qos.value(),
627            SubscriptionOptions::V5(opts) => opts.qos.value(),
628        }
629    }
630
631    #[inline]
632    pub fn set_qos(&mut self, qos: QoS) {
633        match self {
634            SubscriptionOptions::V3(opts) => opts.qos = qos,
635            SubscriptionOptions::V5(opts) => opts.qos = qos,
636        }
637    }
638
639    #[inline]
640    pub fn shared_group(&self) -> Option<&SharedGroup> {
641        #[cfg(feature = "shared-subscription")]
642        match self {
643            SubscriptionOptions::V3(opts) => opts.shared_group.as_ref(),
644            SubscriptionOptions::V5(opts) => opts.shared_group.as_ref(),
645        }
646        #[cfg(not(feature = "shared-subscription"))]
647        None
648    }
649
650    #[inline]
651    #[cfg(feature = "shared-subscription")]
652    pub fn has_shared_group(&self) -> bool {
653        match self {
654            SubscriptionOptions::V3(opts) => opts.shared_group.is_some(),
655            SubscriptionOptions::V5(opts) => opts.shared_group.is_some(),
656        }
657    }
658
659    #[inline]
660    pub fn limit_subs(&self) -> Option<usize> {
661        #[cfg(feature = "limit-subscription")]
662        match self {
663            SubscriptionOptions::V3(opts) => opts.limit_subs,
664            SubscriptionOptions::V5(opts) => opts.limit_subs,
665        }
666        #[cfg(not(feature = "limit-subscription"))]
667        None
668    }
669
670    #[inline]
671    #[cfg(feature = "limit-subscription")]
672    pub fn has_limit_subs(&self) -> bool {
673        match self {
674            SubscriptionOptions::V3(opts) => opts.limit_subs.is_some(),
675            SubscriptionOptions::V5(opts) => opts.limit_subs.is_some(),
676        }
677    }
678
679    #[inline]
680    pub fn is_v3(&self) -> bool {
681        matches!(self, SubscriptionOptions::V3(_))
682    }
683
684    #[inline]
685    pub fn is_v5(&self) -> bool {
686        matches!(self, SubscriptionOptions::V5(_))
687    }
688
689    #[inline]
690    pub fn to_json(&self) -> serde_json::Value {
691        match self {
692            SubscriptionOptions::V3(opts) => opts.to_json(),
693            SubscriptionOptions::V5(opts) => opts.to_json(),
694        }
695    }
696
697    #[inline]
698    pub fn deserialize_qos<'de, D>(deserializer: D) -> std::result::Result<QoS, D::Error>
699    where
700        D: Deserializer<'de>,
701    {
702        let v = u8::deserialize(deserializer)?;
703        Ok(match v {
704            0 => QoS::AtMostOnce,
705            1 => QoS::AtLeastOnce,
706            2 => QoS::ExactlyOnce,
707            _ => return Err(de::Error::custom(format!("invalid QoS value, {v}"))),
708        })
709    }
710
711    #[inline]
712    pub fn serialize_qos<S>(qos: &QoS, s: S) -> std::result::Result<S::Ok, S::Error>
713    where
714        S: Serializer,
715    {
716        qos.value().serialize(s)
717    }
718}
719
720#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
721pub struct SubOptionsV3 {
722    #[serde(
723        serialize_with = "SubscriptionOptions::serialize_qos",
724        deserialize_with = "SubscriptionOptions::deserialize_qos"
725    )]
726    pub qos: QoS,
727    #[cfg(feature = "shared-subscription")]
728    pub shared_group: Option<SharedGroup>,
729    #[cfg(feature = "limit-subscription")]
730    pub limit_subs: LimitSubsCount,
731}
732
733impl SubOptionsV3 {
734    #[inline]
735    pub fn to_json(&self) -> serde_json::Value {
736        #[allow(unused_mut)]
737        let mut obj = json!({
738            "qos": self.qos.value(),
739        });
740        #[cfg(any(feature = "limit-subscription", feature = "shared-subscription"))]
741        if let Some(obj) = obj.as_object_mut() {
742            #[cfg(feature = "shared-subscription")]
743            if let Some(g) = &self.shared_group {
744                obj.insert("group".into(), serde_json::Value::String(g.to_string()));
745            }
746            #[cfg(feature = "limit-subscription")]
747            if let Some(limit_subs) = &self.limit_subs {
748                obj.insert("limit_subs".into(), serde_json::Value::from(*limit_subs));
749            }
750        }
751        obj
752    }
753}
754
755#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
756pub struct SubOptionsV5 {
757    #[serde(
758        serialize_with = "SubscriptionOptions::serialize_qos",
759        deserialize_with = "SubscriptionOptions::deserialize_qos"
760    )]
761    pub qos: QoS,
762    #[cfg(feature = "shared-subscription")]
763    pub shared_group: Option<SharedGroup>,
764    #[cfg(feature = "limit-subscription")]
765    pub limit_subs: LimitSubsCount,
766    pub no_local: bool,
767    pub retain_as_published: bool,
768    #[serde(
769        serialize_with = "SubOptionsV5::serialize_retain_handling",
770        deserialize_with = "SubOptionsV5::deserialize_retain_handling"
771    )]
772    pub retain_handling: RetainHandling,
773    //Subscription Identifier
774    pub id: Option<SubscriptionIdentifier>,
775}
776
777impl SubOptionsV5 {
778    #[inline]
779    pub fn retain_handling_value(&self) -> u8 {
780        match self.retain_handling {
781            RetainHandling::AtSubscribe => 0u8,
782            RetainHandling::AtSubscribeNew => 1u8,
783            RetainHandling::NoAtSubscribe => 2u8,
784        }
785    }
786
787    #[inline]
788    pub fn to_json(&self) -> serde_json::Value {
789        let mut obj = json!({
790            "qos": self.qos.value(),
791            "no_local": self.no_local,
792            "retain_as_published": self.retain_as_published,
793            "retain_handling": self.retain_handling_value(),
794        });
795        if let Some(obj) = obj.as_object_mut() {
796            #[cfg(feature = "shared-subscription")]
797            if let Some(g) = &self.shared_group {
798                obj.insert("group".into(), serde_json::Value::String(g.to_string()));
799            }
800            #[cfg(feature = "limit-subscription")]
801            if let Some(limit_subs) = &self.limit_subs {
802                obj.insert("limit_subs".into(), serde_json::Value::from(*limit_subs));
803            }
804            if let Some(id) = &self.id {
805                obj.insert("id".into(), serde_json::Value::Number(serde_json::Number::from(id.get())));
806            }
807        }
808        obj
809    }
810
811    #[inline]
812    pub fn deserialize_retain_handling<'de, D>(
813        deserializer: D,
814    ) -> std::result::Result<RetainHandling, D::Error>
815    where
816        D: Deserializer<'de>,
817    {
818        let v = u8::deserialize(deserializer)?;
819        Ok(match v {
820            0 => RetainHandling::AtSubscribe,
821            1 => RetainHandling::AtSubscribeNew,
822            2 => RetainHandling::NoAtSubscribe,
823            _ => return Err(de::Error::custom(format!("invalid RetainHandling value, {v}"))),
824        })
825    }
826
827    #[inline]
828    pub fn serialize_retain_handling<S>(rh: &RetainHandling, s: S) -> std::result::Result<S::Ok, S::Error>
829    where
830        S: Serializer,
831    {
832        let v = match rh {
833            RetainHandling::AtSubscribe => 0u8,
834            RetainHandling::AtSubscribeNew => 1u8,
835            RetainHandling::NoAtSubscribe => 2u8,
836        };
837        v.serialize(s)
838    }
839}
840
841impl std::convert::From<(QoS, Option<SharedGroup>, LimitSubsCount)> for SubscriptionOptions {
842    #[inline]
843    fn from(opts: (QoS, Option<SharedGroup>, LimitSubsCount)) -> Self {
844        SubscriptionOptions::V3(SubOptionsV3 {
845            qos: opts.0,
846            #[cfg(feature = "shared-subscription")]
847            shared_group: opts.1,
848            #[cfg(feature = "limit-subscription")]
849            limit_subs: opts.2,
850        })
851    }
852}
853
854impl std::convert::From<(&SubscriptionOptionsV5, Option<SharedGroup>, LimitSubsCount, Option<NonZeroU32>)>
855    for SubscriptionOptions
856{
857    #[inline]
858    fn from(opts: (&SubscriptionOptionsV5, Option<SharedGroup>, LimitSubsCount, Option<NonZeroU32>)) -> Self {
859        SubscriptionOptions::V5(SubOptionsV5 {
860            qos: opts.0.qos,
861            #[cfg(feature = "shared-subscription")]
862            shared_group: opts.1,
863            #[cfg(feature = "limit-subscription")]
864            limit_subs: opts.2,
865            no_local: opts.0.no_local,
866            retain_as_published: opts.0.retain_as_published,
867            retain_handling: opts.0.retain_handling,
868            id: opts.3,
869        })
870    }
871}
872
873#[derive(Clone, Debug, Serialize, Deserialize)]
874pub struct Subscribe {
875    pub topic_filter: TopicFilter,
876    pub opts: SubscriptionOptions,
877}
878
879impl Subscribe {
880    #[inline]
881    pub fn from_v3(
882        topic_filter: &ByteString,
883        qos: QoS,
884        shared_subscription: bool,
885        limit_subscription: bool,
886    ) -> Result<Self> {
887        let (topic_filter, shared_group, limit_subs) =
888            parse_topic_filter(topic_filter, shared_subscription, limit_subscription)?;
889        let opts = (qos, shared_group, limit_subs).into();
890        Ok(Subscribe { topic_filter, opts })
891    }
892
893    #[inline]
894    pub fn from_v5(
895        topic_filter: &ByteString,
896        opts: &SubscriptionOptionsV5,
897        shared_subscription: bool,
898        limit_subscription: bool,
899        sub_id: Option<NonZeroU32>,
900    ) -> Result<Self> {
901        let (topic_filter, shared_group, limit_subs) =
902            parse_topic_filter(topic_filter, shared_subscription, limit_subscription)?;
903        let opts = (opts, shared_group, limit_subs, sub_id).into();
904        Ok(Subscribe { topic_filter, opts })
905    }
906
907    #[inline]
908    #[cfg(feature = "shared-subscription")]
909    pub fn is_shared(&self) -> bool {
910        self.opts.has_shared_group()
911    }
912}
913
914#[derive(Clone, Debug)]
915pub struct SubscribeReturn {
916    pub ack_reason: SubscribeAckReason,
917    pub prev_opts: Option<SubscriptionOptions>,
918}
919
920impl SubscribeReturn {
921    #[inline]
922    pub fn new_success(qos: QoS, prev_opts: Option<SubscriptionOptions>) -> Self {
923        let ack_reason = match qos {
924            QoS::AtMostOnce => SubscribeAckReason::GrantedQos0,
925            QoS::AtLeastOnce => SubscribeAckReason::GrantedQos1,
926            QoS::ExactlyOnce => SubscribeAckReason::GrantedQos2,
927        };
928        Self { ack_reason, prev_opts }
929    }
930
931    #[inline]
932    pub fn new_failure(ack_reason: SubscribeAckReason) -> Self {
933        Self { ack_reason, prev_opts: None }
934    }
935
936    #[inline]
937    pub fn success(&self) -> Option<QoS> {
938        match self.ack_reason {
939            SubscribeAckReason::GrantedQos0 => Some(QoS::AtMostOnce),
940            SubscribeAckReason::GrantedQos1 => Some(QoS::AtLeastOnce),
941            SubscribeAckReason::GrantedQos2 => Some(QoS::ExactlyOnce),
942            _ => None,
943        }
944    }
945
946    #[inline]
947    pub fn failure(&self) -> bool {
948        !matches!(
949            self.ack_reason,
950            SubscribeAckReason::GrantedQos0
951                | SubscribeAckReason::GrantedQos1
952                | SubscribeAckReason::GrantedQos2
953        )
954    }
955
956    #[inline]
957    pub fn into_inner(self) -> SubscribeAckReason {
958        self.ack_reason
959    }
960}
961//
962// #[derive(Debug, PartialEq, Eq, Clone)]
963// pub struct SubscribedV5 {
964//     /// Packet Identifier
965//     pub packet_id: NonZeroU16,
966//     /// Subscription Identifier
967//     pub id: Option<NonZeroU32>,
968//     pub user_properties: UserProperties,
969//     /// the list of Topic Filters and QoS to which the Client wants to subscribe.
970//     pub topic_filter: (ByteString, SubscriptionOptionsV5),
971// }
972
973#[derive(Copy, Clone, Debug, PartialEq, Eq)]
974pub enum ConnectAckReason {
975    V3(ConnectAckReasonV3),
976    V5(ConnectAckReasonV5),
977}
978
979impl ConnectAckReason {
980    #[inline]
981    pub fn success(&self) -> bool {
982        matches!(
983            *self,
984            ConnectAckReason::V3(ConnectAckReasonV3::ConnectionAccepted)
985                | ConnectAckReason::V5(ConnectAckReasonV5::Success)
986        )
987    }
988
989    #[inline]
990    pub fn not_authorized(&self) -> bool {
991        matches!(
992            *self,
993            ConnectAckReason::V3(ConnectAckReasonV3::NotAuthorized)
994                | ConnectAckReason::V3(ConnectAckReasonV3::BadUserNameOrPassword)
995                | ConnectAckReason::V5(ConnectAckReasonV5::NotAuthorized)
996                | ConnectAckReason::V5(ConnectAckReasonV5::BadUserNameOrPassword)
997        )
998    }
999
1000    #[inline]
1001    pub fn success_or_auth_error(&self) -> (bool, bool) {
1002        match *self {
1003            ConnectAckReason::V3(ConnectAckReasonV3::ConnectionAccepted)
1004            | ConnectAckReason::V5(ConnectAckReasonV5::Success) => (true, false),
1005            ConnectAckReason::V3(ConnectAckReasonV3::NotAuthorized)
1006            | ConnectAckReason::V3(ConnectAckReasonV3::BadUserNameOrPassword)
1007            | ConnectAckReason::V5(ConnectAckReasonV5::NotAuthorized)
1008            | ConnectAckReason::V5(ConnectAckReasonV5::BadUserNameOrPassword) => (false, true),
1009            _ => (false, false),
1010        }
1011    }
1012
1013    #[inline]
1014    pub fn reason(&self) -> &'static str {
1015        match *self {
1016            ConnectAckReason::V3(r) => r.reason(),
1017            ConnectAckReason::V5(r) => r.reason(),
1018        }
1019    }
1020}
1021
1022#[derive(Clone, Debug)]
1023pub struct Unsubscribe {
1024    pub topic_filter: TopicFilter,
1025    pub shared_group: Option<SharedGroup>,
1026}
1027
1028impl Unsubscribe {
1029    #[inline]
1030    pub fn from(
1031        topic_filter: &ByteString,
1032        shared_subscription: bool,
1033        limit_subscription: bool,
1034    ) -> Result<Self> {
1035        let (topic_filter, shared_group, _) =
1036            parse_topic_filter(topic_filter, shared_subscription, limit_subscription)?;
1037        Ok(Unsubscribe { topic_filter, shared_group })
1038    }
1039
1040    #[inline]
1041    pub fn is_shared(&self) -> bool {
1042        self.shared_group.is_some()
1043    }
1044}
1045
1046// #[derive(Clone, Debug)]
1047// pub enum UnsubscribeAck {
1048//     V3,
1049//     V5(UnsubscribeAckV5),
1050// }
1051
1052#[derive(Clone)]
1053pub enum LastWill<'a> {
1054    V3(&'a LastWillV3),
1055    V5(&'a LastWillV5),
1056}
1057
1058impl fmt::Debug for LastWill<'_> {
1059    #[inline]
1060    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1061        match self {
1062            LastWill::V3(lw) => f
1063                .debug_struct("LastWill")
1064                .field("topic", &lw.topic)
1065                .field("retain", &lw.retain)
1066                .field("qos", &lw.qos.value())
1067                .field("message", &"<REDACTED>")
1068                .finish(),
1069            LastWill::V5(lw) => f
1070                .debug_struct("LastWill")
1071                .field("topic", &lw.topic)
1072                .field("retain", &lw.retain)
1073                .field("qos", &lw.qos.value())
1074                .field("message", &"<REDACTED>")
1075                .field("will_delay_interval_sec", &lw.will_delay_interval_sec)
1076                .field("correlation_data", &lw.correlation_data)
1077                .field("message_expiry_interval", &lw.message_expiry_interval)
1078                .field("content_type", &lw.content_type)
1079                .field("user_properties", &lw.user_properties)
1080                .field("is_utf8_payload", &lw.is_utf8_payload)
1081                .field("response_topic", &lw.response_topic)
1082                .finish(),
1083        }
1084    }
1085}
1086
1087impl LastWill<'_> {
1088    #[inline]
1089    pub fn will_delay_interval(&self) -> Option<Duration> {
1090        match self {
1091            LastWill::V3(_) => None,
1092            LastWill::V5(lw) => lw.will_delay_interval_sec.map(|i| Duration::from_secs(i as u64)),
1093        }
1094    }
1095
1096    #[inline]
1097    pub fn to_json(&self) -> serde_json::Value {
1098        match self {
1099            LastWill::V3(lw) => {
1100                json!({
1101                    "qos": lw.qos.value(),
1102                    "retain": lw.retain,
1103                    "topic": lw.topic,
1104                    "message": BASE64_STANDARD.encode(lw.message.as_ref()),
1105                })
1106            }
1107            LastWill::V5(lw) => {
1108                json!({
1109                    "qos": lw.qos.value(),
1110                    "retain": lw.retain,
1111                    "topic": lw.topic,
1112                    "message": BASE64_STANDARD.encode(lw.message.as_ref()),
1113
1114                    "will_delay_interval_sec": lw.will_delay_interval_sec,
1115                    "correlation_data": lw.correlation_data,
1116                    "message_expiry_interval": lw.message_expiry_interval,
1117                    "content_type": lw.content_type,
1118                    "user_properties": lw.user_properties,
1119                    "is_utf8_payload": lw.is_utf8_payload,
1120                    "response_topic": lw.response_topic,
1121                })
1122            }
1123        }
1124    }
1125}
1126
1127impl Serialize for LastWill<'_> {
1128    #[inline]
1129    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1130    where
1131        S: Serializer,
1132    {
1133        match self {
1134            LastWill::V3(lw) => {
1135                let mut s = serializer.serialize_struct("LastWill", 4)?;
1136                s.serialize_field("qos", &lw.qos.value())?;
1137                s.serialize_field("retain", &lw.retain)?;
1138                s.serialize_field("topic", &lw.topic)?;
1139                s.serialize_field("message", &lw.message)?;
1140                s.end()
1141            }
1142            LastWill::V5(lw) => {
1143                let mut s = serializer.serialize_struct("LastWill", 11)?;
1144                s.serialize_field("qos", &lw.qos.value())?;
1145                s.serialize_field("retain", &lw.retain)?;
1146                s.serialize_field("topic", &lw.topic)?;
1147                s.serialize_field("message", &lw.message)?;
1148
1149                s.serialize_field("will_delay_interval_sec", &lw.will_delay_interval_sec)?;
1150                s.serialize_field("correlation_data", &lw.correlation_data)?;
1151                s.serialize_field("message_expiry_interval", &lw.message_expiry_interval)?;
1152                s.serialize_field("content_type", &lw.content_type)?;
1153                s.serialize_field("user_properties", &lw.user_properties)?;
1154                s.serialize_field("is_utf8_payload", &lw.is_utf8_payload)?;
1155                s.serialize_field("response_topic", &lw.response_topic)?;
1156
1157                s.end()
1158            }
1159        }
1160    }
1161}
1162
1163#[derive(Serialize, Deserialize, Clone, Debug)]
1164/// High-level Publish structure used in the broker.
1165///
1166/// This wraps a low-level `CodecPublish` packet and adds
1167/// extra metadata such as the target client identifier.
1168pub struct Publish {
1169    /// The raw publish packet decoded from the codec layer.
1170    pub inner: Box<CodecPublish>,
1171
1172    /// Optional target client ID that this publish message
1173    /// should be delivered to.
1174    pub target_clientid: Option<ClientId>,
1175
1176    /// Delayed publish interval in seconds
1177    pub delay_interval: Option<u32>,
1178
1179    /// Message creation timestamp
1180    pub create_time: Option<i64>,
1181}
1182
1183impl Publish {
1184    #[inline]
1185    pub fn target_clientid(mut self, target_clientid: ClientId) -> Self {
1186        self.target_clientid = Some(target_clientid);
1187        self
1188    }
1189
1190    #[inline]
1191    pub fn delay_interval(mut self, delay_interval: u32) -> Self {
1192        self.delay_interval = Some(delay_interval);
1193        self
1194    }
1195
1196    #[inline]
1197    pub fn create_time(mut self, create_time: i64) -> Self {
1198        self.create_time = Some(create_time);
1199        self
1200    }
1201
1202    #[inline]
1203    pub fn new(
1204        inner: Box<CodecPublish>,
1205        target_clientid: Option<ClientId>,
1206        delay_interval: Option<u32>,
1207        create_time: Option<i64>,
1208    ) -> Self {
1209        Publish { inner, target_clientid, delay_interval, create_time }
1210    }
1211
1212    #[inline]
1213    pub fn take(self) -> Box<CodecPublish> {
1214        self.inner
1215    }
1216
1217    #[inline]
1218    pub fn take_topic(self) -> ByteString {
1219        self.inner.topic
1220    }
1221
1222    #[inline]
1223    pub fn take_payload(self) -> Bytes {
1224        self.inner.payload
1225    }
1226}
1227
1228impl Deref for Publish {
1229    type Target = CodecPublish;
1230    #[inline]
1231    fn deref(&self) -> &Self::Target {
1232        self.inner.as_ref()
1233    }
1234}
1235
1236impl DerefMut for Publish {
1237    #[inline]
1238    fn deref_mut(&mut self) -> &mut Self::Target {
1239        self.inner.as_mut()
1240    }
1241}
1242
1243impl std::convert::From<CodecPublish> for Publish {
1244    fn from(p: CodecPublish) -> Self {
1245        Publish { inner: Box::new(p), target_clientid: None, delay_interval: None, create_time: None }
1246    }
1247}
1248
1249impl std::convert::From<Box<CodecPublish>> for Publish {
1250    fn from(p: Box<CodecPublish>) -> Self {
1251        Publish { inner: p, target_clientid: None, delay_interval: None, create_time: None }
1252    }
1253}
1254
1255impl<'a> std::convert::TryFrom<LastWill<'a>> for Publish {
1256    type Error = MqttError;
1257
1258    #[inline]
1259    fn try_from(lw: LastWill<'a>) -> std::result::Result<Self, Self::Error> {
1260        let (retain, qos, topic, payload, props) = match lw {
1261            LastWill::V3(lw) => {
1262                let (topic, user_properties) = if let Some(pos) = lw.topic.find('?') {
1263                    let topic = lw.topic.clone();
1264                    let query = lw.topic.as_bytes().slice(pos + 1..lw.topic.len());
1265                    let user_props = url::form_urlencoded::parse(query.as_ref())
1266                        .into_owned()
1267                        .map(|(key, val)| (ByteString::from(key), ByteString::from(val)))
1268                        .collect::<UserProperties>();
1269                    (topic, user_props)
1270                } else {
1271                    let topic = lw.topic.clone();
1272                    (topic, UserProperties::default())
1273                };
1274                let props = PublishProperties { user_properties, ..Default::default() };
1275                (lw.retain, lw.qos, topic, lw.message.clone(), props)
1276            }
1277            LastWill::V5(lw) => {
1278                let topic = lw.topic.clone();
1279                let props = PublishProperties {
1280                    correlation_data: lw.correlation_data.clone(),
1281                    message_expiry_interval: lw.message_expiry_interval,
1282                    content_type: lw.content_type.clone(),
1283                    user_properties: lw.user_properties.clone(),
1284                    is_utf8_payload: lw.is_utf8_payload.unwrap_or_default(),
1285                    response_topic: lw.response_topic.clone(),
1286                    ..Default::default()
1287                };
1288                (lw.retain, lw.qos, topic, lw.message.clone(), props)
1289            }
1290        };
1291
1292        let p = CodecPublish {
1293            dup: false,
1294            retain,
1295            qos,
1296            topic,
1297            packet_id: None,
1298            payload,
1299            properties: Some(props),
1300        };
1301        let p = <CodecPublish as Into<Publish>>::into(p).create_time(timestamp_millis());
1302        Ok(p)
1303    }
1304}
1305
1306pub enum Sink<Io> {
1307    V3(v3::MqttStream<Io>),
1308    V5(v5::MqttStream<Io>),
1309}
1310
1311impl<Io> Sink<Io>
1312where
1313    Io: AsyncRead + AsyncWrite + Unpin,
1314{
1315    #[inline]
1316    pub(crate) fn v3_mut(&mut self) -> &mut v3::MqttStream<Io> {
1317        if let Sink::V3(s) = self {
1318            s
1319        } else {
1320            unreachable!()
1321        }
1322    }
1323
1324    #[inline]
1325    pub(crate) fn v5_mut(&mut self) -> &mut v5::MqttStream<Io> {
1326        if let Sink::V5(s) = self {
1327            s
1328        } else {
1329            unreachable!()
1330        }
1331    }
1332
1333    #[inline]
1334    pub(crate) async fn recv(&mut self) -> Result<Option<Packet>> {
1335        match self {
1336            Sink::V3(s) => match s.next().await {
1337                Some(Ok(pkt)) => Ok(Some(Packet::V3(pkt))),
1338                Some(Err(e)) => Err(e),
1339                None => Ok(None),
1340            },
1341            Sink::V5(s) => match s.next().await {
1342                Some(Ok(pkt)) => Ok(Some(Packet::V5(pkt))),
1343                Some(Err(e)) => Err(e),
1344                None => Ok(None),
1345            },
1346        }
1347    }
1348
1349    #[inline]
1350    #[allow(dead_code)]
1351    pub(crate) async fn close(&mut self) -> Result<()> {
1352        match self {
1353            Sink::V3(s) => {
1354                s.close().await?;
1355            }
1356            Sink::V5(s) => s.close().await?,
1357        }
1358        Ok(())
1359    }
1360
1361    #[inline]
1362    pub(crate) async fn publish(
1363        &mut self,
1364        mut p: Publish,
1365        message_expiry_interval: Option<NonZeroU32>,
1366        server_topic_aliases: Option<&Arc<ServerTopicAliases>>,
1367    ) -> Result<()> {
1368        match self {
1369            Sink::V3(s) => {
1370                s.send_publish(p.take()).await?;
1371            }
1372            Sink::V5(s) => {
1373                let (topic, alias) = {
1374                    if let Some(server_topic_aliases) = server_topic_aliases {
1375                        server_topic_aliases.get(p.topic.clone()).await
1376                    } else {
1377                        (Some(p.topic.clone()), None)
1378                    }
1379                };
1380
1381                p.topic = topic.unwrap_or_default();
1382
1383                if let Some(properties) = &mut p.properties {
1384                    properties.message_expiry_interval = message_expiry_interval;
1385                    properties.topic_alias = alias;
1386                }
1387                s.send_publish(p.take()).await?;
1388            }
1389        }
1390        Ok(())
1391    }
1392
1393    #[inline]
1394    pub(crate) async fn send_publish_ack(
1395        &mut self,
1396        packet_id: NonZeroU16,
1397        pubres: PublishResult,
1398    ) -> Result<()> {
1399        match self {
1400            Sink::V3(s) => {
1401                s.send_publish_ack(packet_id).await?;
1402            }
1403            Sink::V5(s) => {
1404                let ack = PublishAckV5 {
1405                    packet_id,
1406                    reason_code: pubres.reason_code,
1407                    properties: pubres.properties,
1408                    reason_string: pubres.reason_string,
1409                };
1410                s.send_publish_ack(ack).await?;
1411            }
1412        }
1413        Ok(())
1414    }
1415
1416    #[inline]
1417    pub(crate) async fn send_publish_received(
1418        &mut self,
1419        packet_id: NonZeroU16,
1420        pubres: PublishResult,
1421    ) -> Result<()> {
1422        match self {
1423            Sink::V3(s) => {
1424                s.send_publish_received(packet_id).await?;
1425            }
1426            Sink::V5(s) => {
1427                let ack = PublishAckV5 {
1428                    packet_id,
1429                    reason_code: pubres.reason_code,
1430                    properties: pubres.properties,
1431                    reason_string: pubres.reason_string,
1432                };
1433                s.send_publish_received(ack).await?;
1434            }
1435        }
1436        Ok(())
1437    }
1438
1439    #[inline]
1440    #[allow(dead_code)]
1441    pub(crate) async fn send_publish_release(&mut self, packet_id: NonZeroU16) -> Result<()> {
1442        match self {
1443            Sink::V3(s) => {
1444                s.send_publish_release(packet_id).await?;
1445            }
1446            Sink::V5(s) => {
1447                let ack2 =
1448                    PublishAck2 { packet_id, reason_code: PublishAck2Reason::Success, ..Default::default() };
1449                s.send_publish_release(ack2).await?;
1450            }
1451        }
1452        Ok(())
1453    }
1454
1455    #[inline]
1456    #[allow(dead_code)]
1457    pub(crate) async fn send(&mut self, p: Packet) -> Result<()> {
1458        match self {
1459            Sink::V3(s) => {
1460                if let Packet::V3(p) = p {
1461                    s.send(p).await?;
1462                }
1463            }
1464            Sink::V5(s) => {
1465                if let Packet::V5(p) = p {
1466                    s.send(p).await?;
1467                }
1468            }
1469        }
1470        Ok(())
1471    }
1472}
1473
1474#[derive(Debug, Clone, Deserialize, Serialize)]
1475pub struct PublishResult {
1476    pub reason_code: PublishAckReason,
1477    pub properties: UserProperties,
1478    pub reason_string: Option<ByteString>,
1479    pub disconnect: IsDisconnect,
1480}
1481
1482impl PublishResult {
1483    #[inline]
1484    pub fn success() -> PublishResult {
1485        PublishResult { reason_code: PublishAckReason::Success, ..Default::default() }
1486    }
1487
1488    #[inline]
1489    pub fn reason_code(
1490        reason_code: PublishAckReason,
1491        reason_string: Option<ByteString>,
1492        disconnect: IsDisconnect,
1493    ) -> PublishResult {
1494        PublishResult { reason_code, reason_string, disconnect, ..Default::default() }
1495    }
1496
1497    #[inline]
1498    pub fn is_success(&self) -> bool {
1499        matches!(self.reason_code, PublishAckReason::Success)
1500    }
1501}
1502
1503impl Default for PublishResult {
1504    fn default() -> Self {
1505        Self {
1506            reason_code: PublishAckReason::Success,
1507            properties: UserProperties::default(),
1508            reason_string: None,
1509            disconnect: false,
1510        }
1511    }
1512}
1513
1514#[allow(clippy::large_enum_variant)]
1515#[derive(Debug, Clone)]
1516pub enum Packet {
1517    V3(codec::v3::Packet),
1518    V5(codec::v5::Packet),
1519}
1520
1521#[derive(GetSize, Debug, Clone, Copy, Deserialize, Serialize)]
1522pub enum FromType {
1523    Custom,
1524    Admin,
1525    System,
1526    LastWill,
1527    Bridge,
1528}
1529
1530impl FromType {
1531    #[inline]
1532    pub fn as_str(&self) -> &str {
1533        match self {
1534            FromType::Custom => "custom",
1535            FromType::Admin => "admin",
1536            FromType::System => "system",
1537            FromType::LastWill => "lastwill",
1538            FromType::Bridge => "bridge",
1539        }
1540    }
1541}
1542
1543impl std::fmt::Display for FromType {
1544    #[inline]
1545    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1546        write!(f, "{}", self.as_str())
1547    }
1548}
1549
1550#[derive(GetSize, Clone, Deserialize, Serialize)]
1551pub struct From {
1552    typ: FromType,
1553    pub id: Id,
1554}
1555
1556impl From {
1557    #[inline]
1558    pub fn from_custom(id: Id) -> From {
1559        From { typ: FromType::Custom, id }
1560    }
1561
1562    #[inline]
1563    pub fn from_admin(id: Id) -> From {
1564        From { typ: FromType::Admin, id }
1565    }
1566
1567    #[inline]
1568    pub fn from_bridge(id: Id) -> From {
1569        From { typ: FromType::Bridge, id }
1570    }
1571
1572    #[inline]
1573    pub fn from_system(id: Id) -> From {
1574        From { typ: FromType::System, id }
1575    }
1576
1577    #[inline]
1578    pub fn from_lastwill(id: Id) -> From {
1579        From { typ: FromType::LastWill, id }
1580    }
1581
1582    #[inline]
1583    pub fn typ(&self) -> FromType {
1584        self.typ
1585    }
1586
1587    #[inline]
1588    pub fn is_system(&self) -> bool {
1589        matches!(self.typ, FromType::System)
1590    }
1591
1592    #[inline]
1593    pub fn is_custom(&self) -> bool {
1594        matches!(self.typ, FromType::Custom)
1595    }
1596
1597    #[inline]
1598    pub fn to_from_json(&self, json: serde_json::Value) -> serde_json::Value {
1599        let mut json = self.id.to_from_json(json);
1600        if let Some(obj) = json.as_object_mut() {
1601            obj.insert("from_type".into(), serde_json::Value::String(self.typ.to_string()));
1602        }
1603        json
1604    }
1605}
1606
1607impl Deref for From {
1608    type Target = Id;
1609    #[inline]
1610    fn deref(&self) -> &Self::Target {
1611        &self.id
1612    }
1613}
1614
1615impl std::fmt::Debug for From {
1616    #[inline]
1617    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1618        write!(f, "{}-{:?}", self.id, self.typ)
1619    }
1620}
1621
1622pub type To = Id;
1623
1624#[derive(Clone)]
1625pub struct Id(Arc<_Id>);
1626
1627impl get_size::GetSize for Id {
1628    fn get_heap_size(&self) -> usize {
1629        self.0.get_heap_size()
1630    }
1631}
1632
1633impl Id {
1634    #[inline]
1635    pub fn new(
1636        node_id: NodeId,
1637        lid: ListenerId,
1638        local_addr: Option<SocketAddr>,
1639        remote_addr: Option<SocketAddr>,
1640        client_id: ClientId,
1641        username: Option<UserName>,
1642    ) -> Self {
1643        Self(Arc::new(_Id {
1644            node_id,
1645            lid,
1646            local_addr,
1647            remote_addr,
1648            client_id,
1649            username,
1650            create_time: timestamp_millis(),
1651        }))
1652    }
1653
1654    #[inline]
1655    pub fn to_json(&self) -> serde_json::Value {
1656        json!({
1657            "node": self.node(),
1658            "ipaddress": self.remote_addr,
1659            "clientid": self.client_id,
1660            "username": self.username_ref(),
1661            "create_time": self.create_time,
1662        })
1663    }
1664
1665    #[inline]
1666    pub fn to_from_json(&self, mut json: serde_json::Value) -> serde_json::Value {
1667        if let Some(obj) = json.as_object_mut() {
1668            obj.insert("from_node".into(), serde_json::Value::Number(serde_json::Number::from(self.node())));
1669            obj.insert(
1670                "from_ipaddress".into(),
1671                self.remote_addr
1672                    .map(|a| serde_json::Value::String(a.to_string()))
1673                    .unwrap_or(serde_json::Value::Null),
1674            );
1675            obj.insert("from_clientid".into(), serde_json::Value::String(self.client_id.to_string()));
1676            obj.insert("from_username".into(), serde_json::Value::String(self.username_ref().into()));
1677        }
1678        json
1679    }
1680
1681    #[inline]
1682    pub fn to_to_json(&self, mut json: serde_json::Value) -> serde_json::Value {
1683        if let Some(obj) = json.as_object_mut() {
1684            obj.insert("node".into(), serde_json::Value::Number(serde_json::Number::from(self.node())));
1685            obj.insert(
1686                "ipaddress".into(),
1687                self.remote_addr
1688                    .map(|a| serde_json::Value::String(a.to_string()))
1689                    .unwrap_or(serde_json::Value::Null),
1690            );
1691            obj.insert("clientid".into(), serde_json::Value::String(self.client_id.to_string()));
1692            obj.insert("username".into(), serde_json::Value::String(self.username_ref().into()));
1693        }
1694        json
1695    }
1696
1697    #[inline]
1698    pub fn from(node_id: NodeId, client_id: ClientId) -> Self {
1699        Self::new(node_id, 0, None, None, client_id, None)
1700    }
1701
1702    #[inline]
1703    pub fn node(&self) -> NodeId {
1704        self.node_id
1705    }
1706
1707    #[inline]
1708    pub fn lid(&self) -> ListenerId {
1709        self.lid
1710    }
1711
1712    #[inline]
1713    pub fn username(&self) -> UserName {
1714        self.username.clone().unwrap_or_else(|| UserName::from_static(UNDEFINED))
1715    }
1716
1717    #[inline]
1718    pub fn username_ref(&self) -> &str {
1719        self.username.as_ref().map(<UserName as AsRef<str>>::as_ref).unwrap_or_else(|| UNDEFINED)
1720    }
1721}
1722
1723impl Display for Id {
1724    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1725        write!(
1726            f,
1727            "{}@{}:{}/{}/{}/{}/{}",
1728            self.node_id,
1729            self.local_addr.map(|addr| addr.ip().to_string()).unwrap_or_default(),
1730            self.lid,
1731            self.remote_addr.map(|addr| addr.to_string()).unwrap_or_default(),
1732            self.client_id,
1733            self.username_ref(),
1734            self.create_time
1735        )
1736    }
1737}
1738
1739impl std::fmt::Debug for Id {
1740    #[inline]
1741    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1742        write!(f, "{self}")
1743    }
1744}
1745
1746impl PartialEq<Id> for Id {
1747    #[inline]
1748    fn eq(&self, o: &Id) -> bool {
1749        self.node_id == o.node_id
1750            && self.lid == o.lid
1751            && self.client_id == o.client_id
1752            && self.local_addr == o.local_addr
1753            && self.remote_addr == o.remote_addr
1754            && self.username == o.username
1755            && self.create_time == o.create_time
1756    }
1757}
1758
1759impl Eq for Id {}
1760
1761impl std::hash::Hash for Id {
1762    #[inline]
1763    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1764        self.node_id.hash(state);
1765        self.lid.hash(state);
1766        self.local_addr.hash(state);
1767        self.remote_addr.hash(state);
1768        self.client_id.hash(state);
1769        self.username.hash(state);
1770        self.create_time.hash(state);
1771    }
1772}
1773
1774impl Deref for Id {
1775    type Target = _Id;
1776    #[inline]
1777    fn deref(&self) -> &Self::Target {
1778        &self.0
1779    }
1780}
1781
1782impl Serialize for Id {
1783    #[inline]
1784    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1785    where
1786        S: Serializer,
1787    {
1788        _Id::serialize(self.0.as_ref(), serializer)
1789    }
1790}
1791
1792impl<'de> Deserialize<'de> for Id {
1793    #[inline]
1794    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1795    where
1796        D: Deserializer<'de>,
1797    {
1798        Ok(Id(Arc::new(_Id::deserialize(deserializer)?)))
1799    }
1800}
1801
1802#[derive(Debug, PartialEq, Eq, Hash, Clone, GetSize, Deserialize, Serialize)]
1803pub struct _Id {
1804    pub node_id: NodeId,
1805    pub lid: ListenerId,
1806    #[get_size(size_fn = get_option_addr_size_helper)]
1807    pub local_addr: Option<SocketAddr>,
1808    #[get_size(size_fn = get_option_addr_size_helper)]
1809    pub remote_addr: Option<SocketAddr>,
1810    #[get_size(size_fn = get_bytestring_size_helper)]
1811    pub client_id: ClientId,
1812    #[get_size(size_fn = get_option_bytestring_size_helper)]
1813    pub username: Option<UserName>,
1814    pub create_time: TimestampMillis,
1815}
1816
1817fn get_bytestring_size_helper(s: &ByteString) -> usize {
1818    s.len()
1819}
1820
1821fn get_option_bytestring_size_helper(s: &Option<ByteString>) -> usize {
1822    if let Some(s) = s {
1823        s.len()
1824    } else {
1825        0
1826    }
1827}
1828
1829fn get_option_addr_size_helper(s: &Option<SocketAddr>) -> usize {
1830    if let Some(s) = s {
1831        match s {
1832            SocketAddr::V4(s) => size_of_val(s),
1833            SocketAddr::V6(s) => size_of_val(s),
1834        }
1835    } else {
1836        0
1837    }
1838}
1839
1840#[derive(Debug, Clone, Deserialize, Serialize)]
1841pub struct Retain {
1842    pub msg_id: Option<MsgID>,
1843    pub from: From,
1844    pub publish: Publish,
1845}
1846
1847pub type MsgID = usize;
1848
1849#[derive(Debug, Clone, Deserialize, Serialize, GetSize)]
1850pub struct StoredMessage {
1851    pub msg_id: MsgID,
1852    pub from: From,
1853    #[get_size(size_fn = get_publish_size_helper)]
1854    pub publish: Publish,
1855    pub expiry_time_at: TimestampMillis,
1856}
1857
1858fn get_bytes_size_helper(s: &Bytes) -> usize {
1859    s.len()
1860}
1861
1862fn get_properties_size_helper(s: &PublishProperties) -> usize {
1863    s.content_type.as_ref().map(|ct| ct.len()).unwrap_or_default()
1864        + s.correlation_data.as_ref().map(|cd| cd.len()).unwrap_or_default()
1865        + s.user_properties.len() * size_of::<UserProperty>()
1866        + s.response_topic.as_ref().map(|rt| rt.len()).unwrap_or_default()
1867        + s.subscription_ids.len() * size_of::<NonZeroU32>()
1868}
1869
1870fn get_publish_size_helper(p: &Publish) -> usize {
1871    // p.create_time.get_heap_size()
1872    p.packet_id.get_heap_size()
1873        // + p.dup.get_heap_size()
1874        + get_bytes_size_helper(&p.payload)
1875        // + p.retain.get_heap_size()
1876        + get_bytestring_size_helper(&p.topic)
1877        + size_of_val(&p.qos)
1878        + p.properties.as_ref().map(get_properties_size_helper).unwrap_or_default()
1879}
1880
1881impl StoredMessage {
1882    #[inline]
1883    pub fn is_expiry(&self) -> bool {
1884        self.expiry_time_at < timestamp_millis()
1885    }
1886
1887    #[inline]
1888    pub fn encode(&self) -> Result<Vec<u8>> {
1889        Ok(bincode::serialize(&self)?)
1890    }
1891
1892    #[inline]
1893    pub fn decode(data: &[u8]) -> Result<Self> {
1894        Ok(bincode::deserialize(data)?)
1895    }
1896}
1897
1898#[derive(Debug)]
1899pub enum Message {
1900    Forward(From, Publish),
1901    SendRerelease(OutInflightMessage),
1902    Kick(oneshot::Sender<()>, Id, CleanStart, IsAdmin),
1903    // Disconnect(Disconnect),
1904    Closed(Reason),
1905    Subscribe(Subscribe, oneshot::Sender<Result<SubscribeReturn>>),
1906    Subscribes(Vec<Subscribe>, Option<oneshot::Sender<Vec<Result<SubscribeReturn>>>>),
1907    Unsubscribe(Unsubscribe, oneshot::Sender<Result<()>>),
1908    SessionStateTransfer(OfflineInfo, CleanStart),
1909}
1910
1911#[derive(Serialize, Deserialize, Debug, Clone)]
1912pub struct SessionStatus {
1913    pub id: Id,
1914    pub online: IsOnline,
1915    pub handshaking: bool,
1916}
1917
1918#[derive(Deserialize, Serialize, Debug, Default, Clone)]
1919pub struct SubsSearchParams {
1920    #[serde(default)]
1921    pub _limit: usize,
1922    pub clientid: Option<String>,
1923    pub topic: Option<String>,
1924    //value is 0,1,2
1925    pub qos: Option<u8>,
1926    pub share: Option<SharedGroup>,
1927    pub _match_topic: Option<String>,
1928}
1929
1930#[derive(Deserialize, Serialize, Debug, Default)]
1931pub struct SubsSearchResult {
1932    pub node_id: NodeId,
1933    pub clientid: ClientId,
1934    pub client_addr: Option<SocketAddr>,
1935    pub topic: TopicFilter,
1936    pub opts: SubscriptionOptions,
1937}
1938
1939impl SubsSearchResult {
1940    #[inline]
1941    pub fn to_json(self) -> serde_json::Value {
1942        json!({
1943            "node_id": self.node_id,
1944            "clientid": self.clientid,
1945            "client_addr": self.client_addr,
1946            "topic": self.topic,
1947            "opts": self.opts.to_json(),
1948        })
1949    }
1950}
1951
1952#[derive(Deserialize, Serialize, Debug, Default, PartialEq, Eq, Hash, Clone)]
1953pub struct Route {
1954    pub node_id: NodeId,
1955    pub topic: TopicFilter,
1956}
1957
1958pub type SessionSubMap = HashMap<TopicFilter, SubscriptionOptions>;
1959#[derive(Clone)]
1960pub struct SessionSubs {
1961    subs: Arc<RwLock<SessionSubMap>>,
1962}
1963
1964impl Deref for SessionSubs {
1965    type Target = Arc<RwLock<SessionSubMap>>;
1966    #[inline]
1967    fn deref(&self) -> &Self::Target {
1968        &self.subs
1969    }
1970}
1971
1972impl Default for SessionSubs {
1973    fn default() -> Self {
1974        Self::new()
1975    }
1976}
1977
1978impl SessionSubs {
1979    #[inline]
1980    pub fn new() -> Self {
1981        Self::from(SessionSubMap::default())
1982    }
1983
1984    #[inline]
1985    #[allow(clippy::mutable_key_type)]
1986    pub fn from(subs: SessionSubMap) -> Self {
1987        Self { subs: Arc::new(RwLock::new(subs)) }
1988    }
1989
1990    #[inline]
1991    #[allow(unused_variables)]
1992    pub(crate) async fn _add(
1993        &self,
1994        scx: &ServerContext,
1995        topic_filter: TopicFilter,
1996        opts: SubscriptionOptions,
1997    ) -> Option<SubscriptionOptions> {
1998        #[cfg(feature = "shared-subscription")]
1999        let is_shared = opts.has_shared_group();
2000
2001        let prev = {
2002            let mut subs = self.subs.write().await;
2003            let prev = subs.insert(topic_filter, opts);
2004            subs.shrink_to_fit();
2005            prev
2006        };
2007
2008        if let Some(prev_opts) = &prev {
2009            #[cfg(feature = "shared-subscription")]
2010            match (prev_opts.has_shared_group(), is_shared) {
2011                (true, false) => {
2012                    #[cfg(feature = "stats")]
2013                    scx.stats.subscriptions_shared.dec();
2014                }
2015                (false, true) => {
2016                    #[cfg(feature = "stats")]
2017                    scx.stats.subscriptions_shared.inc();
2018                }
2019                (false, false) => {}
2020                (true, true) => {}
2021            }
2022        } else {
2023            #[cfg(feature = "stats")]
2024            scx.stats.subscriptions.inc();
2025            #[cfg(feature = "shared-subscription")]
2026            if is_shared {
2027                #[cfg(feature = "stats")]
2028                scx.stats.subscriptions_shared.inc();
2029            }
2030        }
2031
2032        prev
2033    }
2034
2035    #[inline]
2036    pub(crate) async fn _remove(
2037        &self,
2038        #[allow(unused_variables)] scx: &ServerContext,
2039        topic_filter: &str,
2040    ) -> Option<(TopicFilter, SubscriptionOptions)> {
2041        let removed = {
2042            let mut subs = self.subs.write().await;
2043            let removed = subs.remove_entry(topic_filter);
2044            subs.shrink_to_fit();
2045            removed
2046        };
2047
2048        #[allow(unused_variables)]
2049        if let Some((_, opts)) = &removed {
2050            #[cfg(feature = "stats")]
2051            scx.stats.subscriptions.dec();
2052            #[cfg(feature = "shared-subscription")]
2053            if opts.has_shared_group() {
2054                #[cfg(feature = "stats")]
2055                scx.stats.subscriptions_shared.dec();
2056            }
2057        }
2058
2059        removed
2060    }
2061
2062    #[inline]
2063    pub(crate) async fn _drain(&self, scx: &ServerContext) -> Subscriptions {
2064        let topic_filters = self.subs.read().await.keys().cloned().collect::<Vec<_>>();
2065        let mut subs = Vec::new();
2066        for tf in topic_filters {
2067            if let Some(sub) = self._remove(scx, &tf).await {
2068                subs.push(sub);
2069            }
2070        }
2071        subs
2072    }
2073
2074    #[inline]
2075    pub(crate) async fn _extend(&self, scx: &ServerContext, subs: Subscriptions) {
2076        for (topic_filter, opts) in subs {
2077            self._add(scx, topic_filter, opts).await;
2078        }
2079    }
2080
2081    #[inline]
2082    pub async fn clear(&self, #[allow(unused_variables)] scx: &ServerContext) {
2083        {
2084            let subs = self.subs.read().await;
2085            #[allow(unused_variables)]
2086            for (_, opts) in subs.iter() {
2087                #[cfg(feature = "stats")]
2088                scx.stats.subscriptions.dec();
2089                #[cfg(feature = "shared-subscription")]
2090                if opts.has_shared_group() {
2091                    #[cfg(feature = "stats")]
2092                    scx.stats.subscriptions_shared.dec();
2093                }
2094            }
2095        }
2096        let mut subs = self.subs.write().await;
2097        subs.clear();
2098        subs.shrink_to_fit();
2099    }
2100
2101    #[inline]
2102    pub async fn len(&self) -> usize {
2103        self.subs.read().await.len()
2104    }
2105
2106    #[inline]
2107    pub async fn shared_len(&self) -> usize {
2108        #[cfg(feature = "shared-subscription")]
2109        {
2110            self.subs.read().await.iter().filter(|(_, opts)| opts.has_shared_group()).count()
2111        }
2112        #[cfg(not(feature = "shared-subscription"))]
2113        {
2114            0
2115        }
2116    }
2117
2118    #[inline]
2119    pub async fn is_empty(&self) -> bool {
2120        self.subs.read().await.is_empty()
2121    }
2122
2123    #[inline]
2124    pub async fn to_topic_filters(&self) -> TopicFilters {
2125        self.subs.read().await.keys().cloned().collect()
2126    }
2127}
2128
2129// pub struct ExtraData<K, T> {
2130//     attrs: Arc<parking_lot::RwLock<HashMap<K, T>>>,
2131// }
2132//
2133// impl<K, T> Deref for ExtraData<K, T> {
2134//     type Target = Arc<parking_lot::RwLock<HashMap<K, T>>>;
2135//     #[inline]
2136//     fn deref(&self) -> &Self::Target {
2137//         &self.attrs
2138//     }
2139// }
2140//
2141// impl<K, T> Serialize for ExtraData<K, T>
2142// where
2143//     K: serde::Serialize,
2144//     T: serde::Serialize,
2145// {
2146//     #[inline]
2147//     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2148//     where
2149//         S: Serializer,
2150//     {
2151//         self.attrs.read().deref().serialize(serializer)
2152//     }
2153// }
2154//
2155// impl<'de, K, T> Deserialize<'de> for ExtraData<K, T>
2156// where
2157//     K: Eq + Hash,
2158//     K: serde::de::DeserializeOwned,
2159//     T: serde::de::DeserializeOwned,
2160// {
2161//     #[inline]
2162//     fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2163//     where
2164//         D: Deserializer<'de>,
2165//     {
2166//         let v = HashMap::deserialize(deserializer)?;
2167//         Ok(Self { attrs: Arc::new(parking_lot::RwLock::new(v)) })
2168//     }
2169// }
2170//
2171// impl<K, T> Default for ExtraData<K, T>
2172// where
2173//     K: Eq + Hash,
2174// {
2175//     fn default() -> Self {
2176//         Self::new()
2177//     }
2178// }
2179// impl<K, T> ExtraData<K, T>
2180// where
2181//     K: Eq + Hash,
2182// {
2183//     #[inline]
2184//     pub fn new() -> Self {
2185//         Self { attrs: Arc::new(parking_lot::RwLock::new(HashMap::default())) }
2186//     }
2187//
2188//     #[inline]
2189//     pub fn len(&self) -> usize {
2190//         self.attrs.read().len()
2191//     }
2192//
2193//     #[inline]
2194//     pub fn is_empty(&self) -> bool {
2195//         self.attrs.read().is_empty()
2196//     }
2197//
2198//     #[inline]
2199//     pub fn clear(&self) {
2200//         self.attrs.write().clear()
2201//     }
2202//
2203//     #[inline]
2204//     pub fn insert(&self, key: K, value: T) {
2205//         self.attrs.write().insert(key, value);
2206//     }
2207// }
2208//
2209pub struct ExtraAttrs {
2210    attrs: HashMap<String, Box<dyn Any + Sync + Send>>,
2211}
2212
2213impl Default for ExtraAttrs {
2214    fn default() -> Self {
2215        Self::new()
2216    }
2217}
2218
2219impl ExtraAttrs {
2220    #[inline]
2221    pub fn new() -> Self {
2222        Self { attrs: HashMap::default() }
2223    }
2224
2225    #[inline]
2226    pub fn len(&self) -> usize {
2227        self.attrs.len()
2228    }
2229
2230    #[inline]
2231    pub fn is_empty(&self) -> bool {
2232        self.attrs.is_empty()
2233    }
2234
2235    #[inline]
2236    pub fn clear(&mut self) {
2237        self.attrs.clear()
2238    }
2239
2240    #[inline]
2241    pub fn insert<T: Any + Sync + Send>(&mut self, key: String, value: T) {
2242        self.attrs.insert(key, Box::new(value));
2243    }
2244
2245    #[inline]
2246    pub fn get<T: Any + Sync + Send>(&self, key: &str) -> Option<&T> {
2247        self.attrs.get(key).and_then(|v| v.downcast_ref::<T>())
2248    }
2249
2250    #[inline]
2251    pub fn get_mut<T: Any + Sync + Send>(&mut self, key: &str) -> Option<&mut T> {
2252        self.attrs.get_mut(key).and_then(|v| v.downcast_mut::<T>())
2253    }
2254
2255    #[inline]
2256    pub fn get_default_mut<T: Any + Sync + Send, F: Fn() -> T>(
2257        &mut self,
2258        key: String,
2259        def_fn: F,
2260    ) -> Option<&mut T> {
2261        self.attrs.entry(key).or_insert_with(|| Box::new(def_fn())).downcast_mut::<T>()
2262    }
2263
2264    #[inline]
2265    pub fn serialize_key<S, T>(&self, key: &str, serializer: S) -> std::result::Result<S::Ok, S::Error>
2266    where
2267        T: Any + Sync + Send + serde::ser::Serialize,
2268        S: serde::ser::Serializer,
2269    {
2270        self.get::<T>(key).serialize(serializer)
2271    }
2272}
2273
2274#[derive(Clone, Debug)]
2275pub struct TimedValue<V>(V, Option<Instant>);
2276
2277impl<V> TimedValue<V> {
2278    pub fn new(value: V, timeout_duration: Option<Duration>) -> Self {
2279        TimedValue(value, timeout_duration.map(|t| Instant::now() + t))
2280    }
2281
2282    pub fn value(&self) -> &V {
2283        &self.0
2284    }
2285
2286    pub fn value_mut(&mut self) -> &mut V {
2287        &mut self.0
2288    }
2289
2290    pub fn into_value(self) -> V {
2291        self.0
2292    }
2293
2294    pub fn is_expired(&self) -> bool {
2295        self.1.map(|e| Instant::now() >= e).unwrap_or(false)
2296    }
2297}
2298
2299impl<V> PartialEq for TimedValue<V>
2300where
2301    V: PartialEq,
2302{
2303    fn eq(&self, other: &TimedValue<V>) -> bool {
2304        self.value() == other.value()
2305    }
2306}
2307
2308//impl<V> Eq for TimedValue<V> where V: Eq {}
2309
2310bitflags! {
2311    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
2312    pub struct StateFlags: u8 {
2313        const Kicked = 0b00000001;
2314        const ByAdminKick = 0b00000010;
2315        const DisconnectReceived = 0b00000100;
2316        const CleanStart = 0b00001000;
2317        const Ping = 0b00010000;
2318    }
2319}
2320
2321bitflags! {
2322    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
2323    pub struct SessionStateFlags: u8 {
2324        const SessionPresent = 0b00000001;
2325        const Superuser = 0b00000010;
2326        const Connected = 0b00000100;
2327    }
2328}
2329
2330impl Serialize for SessionStateFlags {
2331    #[inline]
2332    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2333    where
2334        S: Serializer,
2335    {
2336        let v: u8 = self.0 .0;
2337        v.serialize(serializer)
2338    }
2339}
2340
2341impl<'de> Deserialize<'de> for SessionStateFlags {
2342    #[inline]
2343    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2344    where
2345        D: Deserializer<'de>,
2346    {
2347        let v = u8::deserialize(deserializer)?;
2348        Ok(SessionStateFlags::from_bits_retain(v))
2349    }
2350}
2351
2352#[derive(Deserialize, Serialize, Clone, Debug, Default)]
2353pub enum Reason {
2354    ConnectDisconnect(Option<Disconnect>),
2355    ConnectReadWriteTimeout,
2356    ConnectReadWriteError,
2357    ConnectRemoteClose,
2358    ConnectKeepaliveTimeout,
2359    ConnectKicked(IsAdmin),
2360    HandshakeRateExceeded,
2361    SessionExpiration,
2362    SubscribeFailed(Option<ByteString>),
2363    UnsubscribeFailed(Option<ByteString>),
2364    PublishResult(PublishResult),
2365    SubscribeRefused,
2366    DelayedPublishRefused,
2367    MessageExpiration,
2368    MessageQueueFull,
2369    InflightWindowFull,
2370    ProtocolError(ByteString),
2371    Error(ByteString),
2372    MqttError(MqttError),
2373    Reasons(Vec<Reason>),
2374    #[default]
2375    Unknown,
2376}
2377
2378impl Reason {
2379    #[inline]
2380    pub fn from_static(r: &'static str) -> Self {
2381        Reason::Error(ByteString::from_static(r))
2382    }
2383
2384    #[inline]
2385    pub fn is_kicked(&self, admin_opt: IsAdmin) -> bool {
2386        match self {
2387            Reason::ConnectKicked(_admin_opt) => *_admin_opt == admin_opt,
2388            _ => false,
2389        }
2390    }
2391
2392    #[inline]
2393    pub fn is_kicked_by_admin(&self) -> bool {
2394        matches!(self, Reason::ConnectKicked(true))
2395    }
2396}
2397
2398impl std::convert::From<Reason> for PublishResult {
2399    fn from(reason: Reason) -> Self {
2400        use PublishAckReason::*;
2401        use Reason::*;
2402
2403        match reason {
2404            PublishResult(r) => r, // 已经是 PublishResult,直接返回
2405
2406            MessageQueueFull => Self {
2407                reason_code: QuotaExceeded,
2408                properties: UserProperties::default(),
2409                reason_string: Some("Message queue full".into()),
2410                disconnect: false,
2411            },
2412
2413            InflightWindowFull => Self {
2414                reason_code: QuotaExceeded,
2415                properties: UserProperties::default(),
2416                reason_string: Some("Inflight window full".into()),
2417                disconnect: false,
2418            },
2419
2420            SubscribeRefused | DelayedPublishRefused => Self {
2421                reason_code: ImplementationSpecificError,
2422                properties: UserProperties::default(),
2423                reason_string: Some("Publish refused".into()),
2424                disconnect: false,
2425            },
2426
2427            MessageExpiration => Self {
2428                reason_code: UnspecifiedError,
2429                properties: UserProperties::default(),
2430                reason_string: Some("Message expired".into()),
2431                disconnect: true,
2432            },
2433
2434            ProtocolError(msg) => Self {
2435                reason_code: UnspecifiedError,
2436                properties: UserProperties::default(),
2437                reason_string: Some(format!("Protocol error: {msg}").into()),
2438                disconnect: true,
2439            },
2440
2441            Error(msg) => Self {
2442                reason_code: ImplementationSpecificError,
2443                properties: UserProperties::default(),
2444                reason_string: Some(msg),
2445                disconnect: true,
2446            },
2447
2448            MqttError(_) => Self {
2449                reason_code: UnspecifiedError,
2450                properties: UserProperties::default(),
2451                reason_string: Some("MQTT internal error".into()),
2452                disconnect: true,
2453            },
2454
2455            ConnectDisconnect(_)
2456            | ConnectReadWriteTimeout
2457            | ConnectReadWriteError
2458            | ConnectRemoteClose
2459            | ConnectKeepaliveTimeout
2460            | ConnectKicked(_)
2461            | HandshakeRateExceeded
2462            | SessionExpiration
2463            | SubscribeFailed(_)
2464            | UnsubscribeFailed(_)
2465            | Reasons(_)
2466            | Unknown => Self {
2467                reason_code: UnspecifiedError,
2468                properties: UserProperties::default(),
2469                reason_string: Some("Connection or session related error".into()),
2470                disconnect: true,
2471            },
2472        }
2473    }
2474}
2475
2476impl std::convert::From<&str> for Reason {
2477    #[inline]
2478    fn from(r: &str) -> Self {
2479        Reason::Error(ByteString::from(r))
2480    }
2481}
2482
2483impl std::convert::From<String> for Reason {
2484    #[inline]
2485    fn from(r: String) -> Self {
2486        Reason::Error(ByteString::from(r))
2487    }
2488}
2489
2490impl std::convert::From<MqttError> for Reason {
2491    #[inline]
2492    fn from(e: MqttError) -> Self {
2493        Reason::MqttError(e)
2494    }
2495}
2496
2497impl std::convert::From<Error> for Reason {
2498    #[inline]
2499    fn from(e: Error) -> Self {
2500        match e.downcast::<MqttError>() {
2501            Err(e) => Reason::Error(ByteString::from(e.to_string())),
2502            Ok(e) => Reason::MqttError(e),
2503        }
2504    }
2505}
2506
2507impl ToReasonCode for Reason {
2508    fn to_reason_code(&self) -> DisconnectReasonCode {
2509        match self {
2510            Reason::ConnectDisconnect(Some(Disconnect::V5(d))) => d.reason_code,
2511            Reason::ConnectDisconnect(_) => DisconnectReasonCode::NormalDisconnection,
2512            Reason::ConnectReadWriteTimeout => DisconnectReasonCode::KeepAliveTimeout,
2513            Reason::ConnectReadWriteError => DisconnectReasonCode::UnspecifiedError,
2514            Reason::ConnectRemoteClose => DisconnectReasonCode::ServerShuttingDown,
2515            Reason::ConnectKeepaliveTimeout => DisconnectReasonCode::KeepAliveTimeout,
2516            Reason::ConnectKicked(is_admin) => {
2517                if *is_admin {
2518                    DisconnectReasonCode::AdministrativeAction
2519                } else {
2520                    DisconnectReasonCode::NotAuthorized
2521                }
2522            }
2523            Reason::HandshakeRateExceeded => DisconnectReasonCode::ConnectionRateExceeded,
2524            Reason::SessionExpiration => DisconnectReasonCode::SessionTakenOver,
2525            Reason::SubscribeFailed(_) => DisconnectReasonCode::UnspecifiedError,
2526            Reason::UnsubscribeFailed(_) => DisconnectReasonCode::UnspecifiedError,
2527            Reason::SubscribeRefused => DisconnectReasonCode::NotAuthorized,
2528            Reason::PublishResult(pubres) => pubres.reason_code.to_reason_code(),
2529            Reason::DelayedPublishRefused => DisconnectReasonCode::NotAuthorized,
2530            Reason::MessageExpiration => DisconnectReasonCode::MessageRateTooHigh,
2531            Reason::MessageQueueFull => DisconnectReasonCode::QuotaExceeded,
2532            Reason::InflightWindowFull => DisconnectReasonCode::ReceiveMaximumExceeded,
2533            Reason::ProtocolError(_) => DisconnectReasonCode::ProtocolError,
2534            Reason::Error(_) => DisconnectReasonCode::UnspecifiedError,
2535            Reason::MqttError(mqtt_error) => mqtt_error.to_reason_code(),
2536            Reason::Reasons(reasons) => {
2537                if let Some(first_reason) = reasons.first() {
2538                    first_reason.to_reason_code()
2539                } else {
2540                    DisconnectReasonCode::UnspecifiedError
2541                }
2542            }
2543            Reason::Unknown => DisconnectReasonCode::UnspecifiedError,
2544        }
2545    }
2546}
2547
2548impl Display for Reason {
2549    #[inline]
2550    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2551        let r = match self {
2552            Reason::ConnectDisconnect(r) => {
2553                //Disconnect message received
2554                match r {
2555                    Some(r) => return write!(f, "Disconnect({r:?})"),
2556                    None => "Disconnect",
2557                }
2558            }
2559            Reason::ConnectReadWriteTimeout => {
2560                "ReadWriteTimeout" //read/write timeout
2561            }
2562            Reason::ConnectReadWriteError => {
2563                "ReadWriteError" //read/write error
2564            }
2565            Reason::ConnectRemoteClose => {
2566                "RemoteClose" //"connection close by remote client"
2567            }
2568            Reason::ConnectKeepaliveTimeout => {
2569                "KeepaliveTimeout" //keepalive timeout
2570            }
2571            Reason::ConnectKicked(admin_opt) => {
2572                if *admin_opt {
2573                    "ByAdminKick" //kicked by administrator
2574                } else {
2575                    "Kicked" //kicked
2576                }
2577            }
2578            Reason::HandshakeRateExceeded => {
2579                "HandshakeRateExceeded" //handshake rate exceeded
2580            }
2581            Reason::SessionExpiration => {
2582                "SessionExpiration" //session expiration
2583            }
2584            Reason::SubscribeFailed(r) => {
2585                //subscribe failed
2586                match r {
2587                    Some(r) => return write!(f, "SubscribeFailed({r})"),
2588                    None => "SubscribeFailed",
2589                }
2590            }
2591            Reason::UnsubscribeFailed(r) => {
2592                //unsubscribe failed
2593                match r {
2594                    Some(r) => return write!(f, "UnsubscribeFailed({r})"),
2595                    None => "UnsubscribeFailed",
2596                }
2597            }
2598            Reason::SubscribeRefused => {
2599                "SubscribeRefused" //subscribe refused
2600            }
2601            Reason::PublishResult(pubres) => {
2602                if let Some(rs) = pubres.reason_string.as_ref() {
2603                    let s: &str = rs.as_ref();
2604                    s
2605                } else {
2606                    &format!("{:?}", pubres.reason_code)
2607                }
2608            }
2609            Reason::DelayedPublishRefused => {
2610                "DelayedPublishRefused" //delayed publish refused
2611            }
2612            Reason::MessageExpiration => {
2613                "MessageExpiration" //message expiration
2614            }
2615            Reason::MessageQueueFull => {
2616                "MessageQueueFull" //message deliver queue is full
2617            }
2618            Reason::InflightWindowFull => "Inflight window is full",
2619            Reason::Error(r) => r,
2620            Reason::MqttError(e) => &e.to_string(),
2621            Reason::ProtocolError(r) => return write!(f, "ProtocolError({r})"),
2622            Reason::Reasons(reasons) => match reasons.len() {
2623                0 => "",
2624                1 => return write!(f, "{}", reasons.first().map(|r| r.to_string()).unwrap_or_default()),
2625                _ => return write!(f, "{}", reasons.iter().map(|r| r.to_string()).join(",")),
2626            },
2627            Reason::Unknown => {
2628                "Unknown" //unknown
2629            }
2630        };
2631        write!(f, "{r}")
2632    }
2633}
2634
2635#[derive(Debug)]
2636pub struct ServerTopicAliases {
2637    max_topic_aliases: usize,
2638    aliases: RwLock<HashMap<TopicName, NonZeroU16>>,
2639}
2640
2641impl ServerTopicAliases {
2642    #[inline]
2643    pub fn new(max_topic_aliases: usize) -> Self {
2644        ServerTopicAliases { max_topic_aliases, aliases: RwLock::new(HashMap::default()) }
2645    }
2646
2647    #[inline]
2648    pub async fn get(&self, topic: TopicName) -> (Option<TopicName>, Option<NonZeroU16>) {
2649        if self.max_topic_aliases == 0 {
2650            return (Some(topic), None);
2651        }
2652        let alias = {
2653            let aliases = self.aliases.read().await;
2654            if let Some(alias) = aliases.get(&topic) {
2655                return (None, Some(*alias));
2656            }
2657            let len = aliases.len();
2658            if len >= self.max_topic_aliases {
2659                return (Some(topic), None);
2660            }
2661
2662            match NonZeroU16::try_from((len + 1) as u16) {
2663                Ok(alias) => alias,
2664                Err(_) => {
2665                    unreachable!()
2666                }
2667            }
2668        };
2669        self.aliases.write().await.insert(topic.clone(), alias);
2670        (Some(topic), Some(alias))
2671    }
2672}
2673
2674#[derive(Debug)]
2675pub struct ClientTopicAliases {
2676    max_topic_aliases: usize,
2677    aliases: RwLock<HashMap<NonZeroU16, TopicName>>,
2678}
2679
2680impl ClientTopicAliases {
2681    #[inline]
2682    pub fn new(max_topic_aliases: usize) -> Self {
2683        ClientTopicAliases { max_topic_aliases, aliases: RwLock::new(HashMap::default()) }
2684    }
2685
2686    #[inline]
2687    pub async fn set_and_get(&self, alias: Option<NonZeroU16>, topic: TopicName) -> Result<TopicName> {
2688        match (alias, topic.len()) {
2689            (Some(alias), 0) => {
2690                self.aliases.read().await.get(&alias).ok_or_else(|| {
2691                    MqttError::PublishAckReason(
2692                        PublishAckReason::ImplementationSpecificError,
2693                        ByteString::from(
2694                            "implementation specific error, the ‘topic‘ associated with the ‘alias‘ was not found",
2695                        ),
2696                    ).into()
2697                }).cloned()
2698            }
2699            (Some(alias), _) => {
2700                let mut aliases = self.aliases.write().await;
2701                let len = aliases.len();
2702                if let Some(topic_mut) = aliases.get_mut(&alias) {
2703                    *topic_mut = topic.clone()
2704                }else{
2705                    if len >= self.max_topic_aliases {
2706                        return Err(MqttError::PublishAckReason(
2707                            PublishAckReason::ImplementationSpecificError,
2708                            ByteString::from(
2709                                format!("implementation specific error, the number of topic aliases exceeds the limit ({})", self.max_topic_aliases),
2710                            ),
2711                        ).into())
2712                    }
2713                    aliases.insert(alias, topic.clone());
2714                }
2715                Ok(topic)
2716            }
2717            (None, 0) => Err(MqttError::PublishAckReason(
2718                PublishAckReason::ImplementationSpecificError,
2719                ByteString::from("implementation specific error, ‘alias’ and ‘topic’ are both empty"),
2720            ).into()),
2721            (None, _) => Ok(topic),
2722        }
2723    }
2724}
2725
2726#[derive(Debug, Default, Deserialize, Serialize, Clone)]
2727pub struct DisconnectInfo {
2728    pub disconnected_at: TimestampMillis,
2729    pub reasons: Vec<Reason>,
2730    pub mqtt_disconnect: Option<Disconnect>, //MQTT Disconnect
2731}
2732
2733impl DisconnectInfo {
2734    #[inline]
2735    pub fn new(disconnected_at: TimestampMillis) -> Self {
2736        Self { disconnected_at, reasons: Vec::new(), mqtt_disconnect: None }
2737    }
2738
2739    #[inline]
2740    pub fn is_disconnected(&self) -> bool {
2741        self.disconnected_at != 0
2742    }
2743}
2744
2745#[inline]
2746pub fn topic_size(topic: &Topic) -> usize {
2747    topic
2748        .iter()
2749        .map(|l| {
2750            let data_len = match l {
2751                Level::Normal(s) => s.len(),
2752                Level::Metadata(s) => s.len(),
2753                _ => 0,
2754            };
2755            size_of::<Level>() + data_len
2756        })
2757        .sum::<usize>()
2758}
2759
2760#[derive(Debug, Serialize, Deserialize)]
2761pub struct DelayedPublish {
2762    pub expired_time: TimestampMillis,
2763    pub from: From,
2764    pub publish: Publish,
2765    pub message_storage_available: bool,
2766    pub message_expiry_interval: Option<Duration>,
2767}
2768
2769impl DelayedPublish {
2770    #[inline]
2771    pub fn new(
2772        from: From,
2773        publish: Publish,
2774        message_storage_available: bool,
2775        message_expiry_interval: Option<Duration>,
2776    ) -> Self {
2777        let expired_time = publish
2778            .delay_interval
2779            .map(|di| timestamp_millis() + (di as TimestampMillis * 1000))
2780            .unwrap_or_else(timestamp_millis);
2781        Self { expired_time, from, publish, message_storage_available, message_expiry_interval }
2782    }
2783
2784    #[inline]
2785    pub fn is_expired(&self) -> bool {
2786        timestamp_millis() > self.expired_time
2787    }
2788}
2789
2790impl std::cmp::Eq for DelayedPublish {}
2791
2792impl PartialEq for DelayedPublish {
2793    fn eq(&self, other: &Self) -> bool {
2794        other.expired_time.eq(&self.expired_time)
2795    }
2796}
2797
2798impl std::cmp::Ord for DelayedPublish {
2799    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2800        other.expired_time.cmp(&self.expired_time)
2801    }
2802}
2803
2804impl PartialOrd for DelayedPublish {
2805    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2806        Some(self.cmp(other))
2807    }
2808}
2809
2810#[derive(Serialize, Deserialize, Clone, Debug)]
2811pub enum OfflineSession {
2812    Exist(Option<OfflineInfo>),
2813    NotExist,
2814}
2815
2816#[derive(Serialize, Deserialize, Clone, Debug)]
2817pub struct HealthInfo {
2818    pub running: bool,
2819    #[serde(skip_serializing_if = "Option::is_none")]
2820    pub descr: Option<String>,
2821    pub nodes: Vec<NodeHealthStatus>,
2822}
2823
2824impl Default for HealthInfo {
2825    fn default() -> Self {
2826        Self { running: true, descr: None, nodes: vec![NodeHealthStatus::default()] }
2827    }
2828}
2829
2830impl HealthInfo {
2831    pub fn to_json(&self) -> Value {
2832        let mut obj = Map::new();
2833
2834        obj.insert("running".into(), json!(self.running));
2835
2836        if let Some(descr) = &self.descr {
2837            if !descr.is_empty() {
2838                obj.insert("descr".into(), json!(descr));
2839            }
2840        }
2841
2842        let nodes_json: Vec<Value> = self.nodes.iter().map(|node| node.to_json()).collect();
2843
2844        obj.insert("nodes".into(), Value::Array(nodes_json));
2845
2846        Value::Object(obj)
2847    }
2848}
2849
2850#[derive(Serialize, Deserialize, Clone, Debug)]
2851pub struct NodeHealthStatus {
2852    pub node_id: NodeId,
2853    pub running: bool,
2854    pub leader_id: Option<NodeId>,
2855    pub descr: Option<String>,
2856}
2857
2858impl NodeHealthStatus {
2859    pub fn is_running(&self) -> bool {
2860        if let Some(leader_id) = self.leader_id {
2861            leader_id > 0 && self.running
2862        } else {
2863            self.running
2864        }
2865    }
2866
2867    pub fn to_json(&self) -> Value {
2868        let mut obj = Map::new();
2869
2870        obj.insert("node_id".into(), json!(self.node_id));
2871        obj.insert("running".into(), json!(self.is_running()));
2872
2873        if let Some(leader_id) = &self.leader_id {
2874            obj.insert("leader_id".into(), json!(leader_id));
2875        }
2876
2877        if let Some(descr) = &self.descr {
2878            if !descr.is_empty() {
2879                obj.insert("descr".into(), json!(descr));
2880            }
2881        }
2882
2883        Value::Object(obj)
2884    }
2885}
2886
2887impl Default for NodeHealthStatus {
2888    fn default() -> Self {
2889        Self { node_id: 0, running: true, leader_id: None, descr: None }
2890    }
2891}
2892
2893#[inline]
2894pub fn serialize_user_properties(props: &UserProperties) -> HashMap<&str, Value> {
2895    let mut map: HashMap<&str, Vec<&str>> = HashMap::default();
2896
2897    for (k, v) in props {
2898        map.entry(k.as_ref()).or_default().push(v.as_ref());
2899    }
2900
2901    let mut result = HashMap::default();
2902    for (key, values) in map {
2903        if values.len() == 1 {
2904            result.insert(key, serde_json::Value::String(values[0].to_string()));
2905        } else {
2906            let array: Vec<serde_json::Value> =
2907                values.iter().map(|v| serde_json::Value::String(v.to_string())).collect();
2908            result.insert(key, serde_json::Value::Array(array));
2909        }
2910    }
2911
2912    result
2913}
2914
2915#[test]
2916fn test_reason() {
2917    assert!(Reason::ConnectKicked(false).is_kicked(false));
2918    assert!(!Reason::ConnectKicked(false).is_kicked(true));
2919    assert!(Reason::ConnectKicked(true).is_kicked(true));
2920    assert!(!Reason::ConnectKicked(true).is_kicked(false));
2921    assert!(Reason::ConnectKicked(true).is_kicked_by_admin());
2922    assert!(!Reason::ConnectKicked(false).is_kicked_by_admin());
2923    assert!(!Reason::ConnectDisconnect(None).is_kicked(false));
2924    assert!(!Reason::ConnectDisconnect(None).is_kicked_by_admin());
2925
2926    let reasons = Reason::Reasons(vec![Reason::ConnectKicked(false), Reason::MessageExpiration]);
2927    assert_eq!(reasons.to_string(), "Kicked,MessageExpiration");
2928}