mqtt5_protocol/
types.rs

1pub use crate::protocol::v5::reason_codes::ReasonCode;
2use crate::time::Duration;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
5pub enum ProtocolVersion {
6    V311,
7    #[default]
8    V5,
9}
10
11impl ProtocolVersion {
12    #[must_use]
13    pub fn as_u8(self) -> u8 {
14        match self {
15            ProtocolVersion::V311 => 4,
16            ProtocolVersion::V5 => 5,
17        }
18    }
19}
20
21impl From<ProtocolVersion> for u8 {
22    fn from(version: ProtocolVersion) -> Self {
23        version.as_u8()
24    }
25}
26
27impl TryFrom<u8> for ProtocolVersion {
28    type Error = ();
29
30    fn try_from(value: u8) -> Result<Self, Self::Error> {
31        match value {
32            4 => Ok(ProtocolVersion::V311),
33            5 => Ok(ProtocolVersion::V5),
34            _ => Err(()),
35        }
36    }
37}
38
39#[derive(Debug, Clone)]
40pub struct ConnectOptions {
41    pub client_id: String,
42    pub keep_alive: Duration,
43    pub clean_start: bool,
44    pub username: Option<String>,
45    pub password: Option<Vec<u8>>,
46    pub will: Option<WillMessage>,
47    pub properties: ConnectProperties,
48    pub protocol_version: ProtocolVersion,
49}
50
51impl Default for ConnectOptions {
52    fn default() -> Self {
53        Self {
54            client_id: String::new(),
55            keep_alive: Duration::from_secs(60),
56            clean_start: true,
57            username: None,
58            password: None,
59            will: None,
60            properties: ConnectProperties::default(),
61            protocol_version: ProtocolVersion::V5,
62        }
63    }
64}
65
66impl ConnectOptions {
67    #[must_use]
68    pub fn new(client_id: impl Into<String>) -> Self {
69        Self {
70            client_id: client_id.into(),
71            keep_alive: Duration::from_secs(60),
72            clean_start: true,
73            username: None,
74            password: None,
75            will: None,
76            properties: ConnectProperties::default(),
77            protocol_version: ProtocolVersion::V5,
78        }
79    }
80
81    #[must_use]
82    pub fn with_protocol_version(mut self, version: ProtocolVersion) -> Self {
83        self.protocol_version = version;
84        self
85    }
86
87    #[must_use]
88    pub fn with_keep_alive(mut self, duration: Duration) -> Self {
89        self.keep_alive = duration;
90        self
91    }
92
93    #[must_use]
94    pub fn with_clean_start(mut self, clean: bool) -> Self {
95        self.clean_start = clean;
96        self
97    }
98
99    #[must_use]
100    pub fn with_credentials(
101        mut self,
102        username: impl Into<String>,
103        password: impl Into<Vec<u8>>,
104    ) -> Self {
105        self.username = Some(username.into());
106        self.password = Some(password.into());
107        self
108    }
109
110    #[must_use]
111    pub fn with_will(mut self, will: WillMessage) -> Self {
112        self.will = Some(will);
113        self
114    }
115
116    #[must_use]
117    pub fn with_session_expiry_interval(mut self, interval: u32) -> Self {
118        self.properties.session_expiry_interval = Some(interval);
119        self
120    }
121
122    #[must_use]
123    pub fn with_receive_maximum(mut self, receive_maximum: u16) -> Self {
124        self.properties.receive_maximum = Some(receive_maximum);
125        self
126    }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
130pub enum QoS {
131    AtMostOnce = 0,
132    AtLeastOnce = 1,
133    ExactlyOnce = 2,
134}
135
136impl From<u8> for QoS {
137    fn from(value: u8) -> Self {
138        match value {
139            1 => QoS::AtLeastOnce,
140            2 => QoS::ExactlyOnce,
141            _ => QoS::AtMostOnce,
142        }
143    }
144}
145
146impl From<QoS> for u8 {
147    fn from(qos: QoS) -> Self {
148        qos as u8
149    }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153pub enum PublishResult {
154    QoS0,
155    QoS1Or2 { packet_id: u16 },
156}
157
158impl PublishResult {
159    #[must_use]
160    pub fn packet_id(&self) -> Option<u16> {
161        match self {
162            Self::QoS0 => None,
163            Self::QoS1Or2 { packet_id } => Some(*packet_id),
164        }
165    }
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq)]
169pub struct ConnectResult {
170    pub session_present: bool,
171}
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174pub struct WillMessage {
175    pub topic: String,
176    pub payload: Vec<u8>,
177    pub qos: QoS,
178    pub retain: bool,
179    pub properties: WillProperties,
180}
181
182impl WillMessage {
183    #[must_use]
184    pub fn new(topic: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
185        Self {
186            topic: topic.into(),
187            payload: payload.into(),
188            qos: QoS::AtMostOnce,
189            retain: false,
190            properties: WillProperties::default(),
191        }
192    }
193
194    #[must_use]
195    pub fn with_qos(mut self, qos: QoS) -> Self {
196        self.qos = qos;
197        self
198    }
199
200    #[must_use]
201    pub fn with_retain(mut self, retain: bool) -> Self {
202        self.retain = retain;
203        self
204    }
205}
206
207#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
208pub struct WillProperties {
209    pub will_delay_interval: Option<u32>,
210    pub payload_format_indicator: Option<bool>,
211    pub message_expiry_interval: Option<u32>,
212    pub content_type: Option<String>,
213    pub response_topic: Option<String>,
214    pub correlation_data: Option<Vec<u8>>,
215    pub user_properties: Vec<(String, String)>,
216}
217
218impl From<WillProperties> for crate::protocol::v5::properties::Properties {
219    fn from(will_props: WillProperties) -> Self {
220        let mut properties = crate::protocol::v5::properties::Properties::default();
221
222        if let Some(delay) = will_props.will_delay_interval {
223            if properties
224                .add(
225                    crate::protocol::v5::properties::PropertyId::WillDelayInterval,
226                    crate::protocol::v5::properties::PropertyValue::FourByteInteger(delay),
227                )
228                .is_err()
229            {
230                tracing::warn!("Failed to add will delay interval property");
231            }
232        }
233
234        if let Some(format) = will_props.payload_format_indicator {
235            if properties
236                .add(
237                    crate::protocol::v5::properties::PropertyId::PayloadFormatIndicator,
238                    crate::protocol::v5::properties::PropertyValue::Byte(u8::from(format)),
239                )
240                .is_err()
241            {
242                tracing::warn!("Failed to add payload format indicator property");
243            }
244        }
245
246        if let Some(expiry) = will_props.message_expiry_interval {
247            if properties
248                .add(
249                    crate::protocol::v5::properties::PropertyId::MessageExpiryInterval,
250                    crate::protocol::v5::properties::PropertyValue::FourByteInteger(expiry),
251                )
252                .is_err()
253            {
254                tracing::warn!("Failed to add message expiry interval property");
255            }
256        }
257
258        if let Some(content_type) = will_props.content_type {
259            if properties
260                .add(
261                    crate::protocol::v5::properties::PropertyId::ContentType,
262                    crate::protocol::v5::properties::PropertyValue::Utf8String(content_type),
263                )
264                .is_err()
265            {
266                tracing::warn!("Failed to add content type property");
267            }
268        }
269
270        if let Some(response_topic) = will_props.response_topic {
271            if properties
272                .add(
273                    crate::protocol::v5::properties::PropertyId::ResponseTopic,
274                    crate::protocol::v5::properties::PropertyValue::Utf8String(response_topic),
275                )
276                .is_err()
277            {
278                tracing::warn!("Failed to add response topic property");
279            }
280        }
281
282        if let Some(correlation_data) = will_props.correlation_data {
283            if properties
284                .add(
285                    crate::protocol::v5::properties::PropertyId::CorrelationData,
286                    crate::protocol::v5::properties::PropertyValue::BinaryData(
287                        correlation_data.into(),
288                    ),
289                )
290                .is_err()
291            {
292                tracing::warn!("Failed to add correlation data property");
293            }
294        }
295
296        for (key, value) in will_props.user_properties {
297            if properties
298                .add(
299                    crate::protocol::v5::properties::PropertyId::UserProperty,
300                    crate::protocol::v5::properties::PropertyValue::Utf8StringPair(key, value),
301                )
302                .is_err()
303            {
304                tracing::warn!("Failed to add user property");
305            }
306        }
307
308        properties
309    }
310}
311
312#[derive(Debug, Clone, Default)]
313pub struct ConnectProperties {
314    pub session_expiry_interval: Option<u32>,
315    pub receive_maximum: Option<u16>,
316    pub maximum_packet_size: Option<u32>,
317    pub topic_alias_maximum: Option<u16>,
318    pub request_response_information: Option<bool>,
319    pub request_problem_information: Option<bool>,
320    pub user_properties: Vec<(String, String)>,
321    pub authentication_method: Option<String>,
322    pub authentication_data: Option<Vec<u8>>,
323}
324
325#[derive(Debug, Clone)]
326pub struct PublishOptions {
327    pub qos: QoS,
328    pub retain: bool,
329    pub properties: PublishProperties,
330}
331
332impl Default for PublishOptions {
333    fn default() -> Self {
334        Self {
335            qos: QoS::AtMostOnce,
336            retain: false,
337            properties: PublishProperties::default(),
338        }
339    }
340}
341
342#[derive(Debug, Clone, Default)]
343pub struct PublishProperties {
344    pub payload_format_indicator: Option<bool>,
345    pub message_expiry_interval: Option<u32>,
346    pub topic_alias: Option<u16>,
347    pub response_topic: Option<String>,
348    pub correlation_data: Option<Vec<u8>>,
349    pub user_properties: Vec<(String, String)>,
350    pub subscription_identifiers: Vec<u32>,
351    pub content_type: Option<String>,
352}
353
354impl From<PublishProperties> for crate::protocol::v5::properties::Properties {
355    fn from(props: PublishProperties) -> Self {
356        use crate::protocol::v5::properties::{Properties, PropertyId, PropertyValue};
357
358        let mut properties = Properties::default();
359
360        if let Some(val) = props.payload_format_indicator {
361            if properties
362                .add(
363                    PropertyId::PayloadFormatIndicator,
364                    PropertyValue::Byte(u8::from(val)),
365                )
366                .is_err()
367            {
368                tracing::warn!("Failed to add payload format indicator property");
369            }
370        }
371        if let Some(val) = props.message_expiry_interval {
372            if properties
373                .add(
374                    PropertyId::MessageExpiryInterval,
375                    PropertyValue::FourByteInteger(val),
376                )
377                .is_err()
378            {
379                tracing::warn!("Failed to add message expiry interval property");
380            }
381        }
382        if let Some(val) = props.topic_alias {
383            if properties
384                .add(PropertyId::TopicAlias, PropertyValue::TwoByteInteger(val))
385                .is_err()
386            {
387                tracing::warn!("Failed to add topic alias property");
388            }
389        }
390        if let Some(val) = props.response_topic {
391            if properties
392                .add(PropertyId::ResponseTopic, PropertyValue::Utf8String(val))
393                .is_err()
394            {
395                tracing::warn!("Failed to add response topic property");
396            }
397        }
398        if let Some(val) = props.correlation_data {
399            if properties
400                .add(
401                    PropertyId::CorrelationData,
402                    PropertyValue::BinaryData(val.into()),
403                )
404                .is_err()
405            {
406                tracing::warn!("Failed to add correlation data property");
407            }
408        }
409        for id in props.subscription_identifiers {
410            if properties
411                .add(
412                    PropertyId::SubscriptionIdentifier,
413                    PropertyValue::VariableByteInteger(id),
414                )
415                .is_err()
416            {
417                tracing::warn!("Failed to add subscription identifier property");
418            }
419        }
420        if let Some(val) = props.content_type {
421            if properties
422                .add(PropertyId::ContentType, PropertyValue::Utf8String(val))
423                .is_err()
424            {
425                tracing::warn!("Failed to add content type property");
426            }
427        }
428        for (key, value) in props.user_properties {
429            if properties
430                .add(
431                    PropertyId::UserProperty,
432                    PropertyValue::Utf8StringPair(key, value),
433                )
434                .is_err()
435            {
436                tracing::warn!("Failed to add user property");
437            }
438        }
439
440        properties
441    }
442}
443
444#[derive(Debug, Clone)]
445pub struct SubscribeOptions {
446    pub qos: QoS,
447    pub no_local: bool,
448    pub retain_as_published: bool,
449    pub retain_handling: RetainHandling,
450    pub subscription_identifier: Option<u32>,
451}
452
453impl Default for SubscribeOptions {
454    fn default() -> Self {
455        Self {
456            qos: QoS::AtMostOnce,
457            no_local: false,
458            retain_as_published: false,
459            retain_handling: RetainHandling::SendAtSubscribe,
460            subscription_identifier: None,
461        }
462    }
463}
464
465impl SubscribeOptions {
466    #[must_use]
467    pub fn with_subscription_identifier(mut self, id: u32) -> Self {
468        self.subscription_identifier = Some(id);
469        self
470    }
471}
472
473#[derive(Debug, Clone, Copy, PartialEq, Eq)]
474pub enum RetainHandling {
475    SendAtSubscribe = 0,
476    SendIfNew = 1,
477    DontSend = 2,
478}
479
480#[derive(Debug, Clone)]
481pub struct Message {
482    pub topic: String,
483    pub payload: Vec<u8>,
484    pub qos: QoS,
485    pub retain: bool,
486    pub properties: MessageProperties,
487}
488
489impl From<crate::packet::publish::PublishPacket> for Message {
490    fn from(packet: crate::packet::publish::PublishPacket) -> Self {
491        Self {
492            topic: packet.topic_name,
493            payload: packet.payload,
494            qos: packet.qos,
495            retain: packet.retain,
496            properties: MessageProperties::from(packet.properties),
497        }
498    }
499}
500
501#[derive(Debug, Clone, Default)]
502pub struct MessageProperties {
503    pub payload_format_indicator: Option<bool>,
504    pub message_expiry_interval: Option<u32>,
505    pub response_topic: Option<String>,
506    pub correlation_data: Option<Vec<u8>>,
507    pub user_properties: Vec<(String, String)>,
508    pub subscription_identifiers: Vec<u32>,
509    pub content_type: Option<String>,
510}
511
512impl From<crate::protocol::v5::properties::Properties> for MessageProperties {
513    fn from(props: crate::protocol::v5::properties::Properties) -> Self {
514        use crate::protocol::v5::properties::{PropertyId, PropertyValue};
515
516        let mut result = Self::default();
517
518        for (id, value) in props.iter() {
519            match (id, value) {
520                (PropertyId::PayloadFormatIndicator, PropertyValue::Byte(v)) => {
521                    result.payload_format_indicator = Some(v != &0);
522                }
523                (PropertyId::MessageExpiryInterval, PropertyValue::FourByteInteger(v)) => {
524                    result.message_expiry_interval = Some(*v);
525                }
526                (PropertyId::ResponseTopic, PropertyValue::Utf8String(v)) => {
527                    result.response_topic = Some(v.clone());
528                }
529                (PropertyId::CorrelationData, PropertyValue::BinaryData(v)) => {
530                    result.correlation_data = Some(v.to_vec());
531                }
532                (PropertyId::UserProperty, PropertyValue::Utf8StringPair(k, v)) => {
533                    result.user_properties.push((k.clone(), v.clone()));
534                }
535                (PropertyId::SubscriptionIdentifier, PropertyValue::VariableByteInteger(v)) => {
536                    result.subscription_identifiers.push(*v);
537                }
538                (PropertyId::ContentType, PropertyValue::Utf8String(v)) => {
539                    result.content_type = Some(v.clone());
540                }
541                _ => {}
542            }
543        }
544
545        result
546    }
547}
548
549impl From<MessageProperties> for PublishProperties {
550    fn from(msg_props: MessageProperties) -> Self {
551        Self {
552            payload_format_indicator: msg_props.payload_format_indicator,
553            message_expiry_interval: msg_props.message_expiry_interval,
554            topic_alias: None,
555            response_topic: msg_props.response_topic,
556            correlation_data: msg_props.correlation_data,
557            user_properties: msg_props.user_properties,
558            subscription_identifiers: msg_props.subscription_identifiers,
559            content_type: msg_props.content_type,
560        }
561    }
562}
563
564#[cfg(test)]
565mod tests {
566    use super::*;
567
568    #[test]
569    fn test_qos_values() {
570        assert_eq!(QoS::AtMostOnce as u8, 0);
571        assert_eq!(QoS::AtLeastOnce as u8, 1);
572        assert_eq!(QoS::ExactlyOnce as u8, 2);
573    }
574
575    #[test]
576    fn test_qos_from_u8() {
577        assert_eq!(QoS::from(0), QoS::AtMostOnce);
578        assert_eq!(QoS::from(1), QoS::AtLeastOnce);
579        assert_eq!(QoS::from(2), QoS::ExactlyOnce);
580
581        assert_eq!(QoS::from(3), QoS::AtMostOnce);
582        assert_eq!(QoS::from(255), QoS::AtMostOnce);
583    }
584
585    #[test]
586    fn test_qos_into_u8() {
587        assert_eq!(u8::from(QoS::AtMostOnce), 0);
588        assert_eq!(u8::from(QoS::AtLeastOnce), 1);
589        assert_eq!(u8::from(QoS::ExactlyOnce), 2);
590    }
591}