mqtt5_protocol/
types.rs

1pub use crate::protocol::v5::reason_codes::ReasonCode;
2use crate::time::Duration;
3
/// Protocol revision selector.
///
/// `V5` is the default variant. Each variant corresponds to a numeric level:
/// `V311` is `4` and `V5` is `5` (see [`ProtocolVersion::as_u8`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ProtocolVersion {
    /// Encoded as level `4`.
    V311,
    /// Encoded as level `5`; this is the default.
    #[default]
    V5,
}

impl ProtocolVersion {
    /// Returns the numeric level for this version (`4` or `5`).
    #[must_use]
    pub fn as_u8(self) -> u8 {
        u8::from(self)
    }
}

impl From<ProtocolVersion> for u8 {
    /// Converts a version into its numeric level.
    fn from(version: ProtocolVersion) -> Self {
        match version {
            ProtocolVersion::V5 => 5,
            ProtocolVersion::V311 => 4,
        }
    }
}

impl TryFrom<u8> for ProtocolVersion {
    type Error = ();

    /// Parses a numeric level back into a version.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for any byte other than `4` or `5`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Search the known variants for the one whose level matches.
        [Self::V311, Self::V5]
            .iter()
            .copied()
            .find(|candidate| candidate.as_u8() == value)
            .ok_or(())
    }
}
38
39#[derive(Debug, Clone)]
40pub struct ConnectOptions {
41    pub client_id: String,
42    pub keep_alive: Duration,
43    pub clean_start: bool,
44    pub username: Option<String>,
45    pub password: Option<Vec<u8>>,
46    pub will: Option<WillMessage>,
47    pub properties: ConnectProperties,
48    pub protocol_version: ProtocolVersion,
49}
50
51impl Default for ConnectOptions {
52    fn default() -> Self {
53        Self {
54            client_id: String::new(),
55            keep_alive: Duration::from_secs(60),
56            clean_start: true,
57            username: None,
58            password: None,
59            will: None,
60            properties: ConnectProperties::default(),
61            protocol_version: ProtocolVersion::V5,
62        }
63    }
64}
65
66impl ConnectOptions {
67    #[must_use]
68    pub fn new(client_id: impl Into<String>) -> Self {
69        Self {
70            client_id: client_id.into(),
71            keep_alive: Duration::from_secs(60),
72            clean_start: true,
73            username: None,
74            password: None,
75            will: None,
76            properties: ConnectProperties::default(),
77            protocol_version: ProtocolVersion::V5,
78        }
79    }
80
81    #[must_use]
82    pub fn with_protocol_version(mut self, version: ProtocolVersion) -> Self {
83        self.protocol_version = version;
84        self
85    }
86
87    #[must_use]
88    pub fn with_keep_alive(mut self, duration: Duration) -> Self {
89        self.keep_alive = duration;
90        self
91    }
92
93    #[must_use]
94    pub fn with_clean_start(mut self, clean: bool) -> Self {
95        self.clean_start = clean;
96        self
97    }
98
99    #[must_use]
100    pub fn with_credentials(
101        mut self,
102        username: impl Into<String>,
103        password: impl AsRef<[u8]>,
104    ) -> Self {
105        self.username = Some(username.into());
106        self.password = Some(password.as_ref().to_vec());
107        self
108    }
109
110    #[must_use]
111    pub fn with_will(mut self, will: WillMessage) -> Self {
112        self.will = Some(will);
113        self
114    }
115
116    #[must_use]
117    pub fn with_session_expiry_interval(mut self, interval: u32) -> Self {
118        self.properties.session_expiry_interval = Some(interval);
119        self
120    }
121
122    #[must_use]
123    pub fn with_receive_maximum(mut self, receive_maximum: u16) -> Self {
124        self.properties.receive_maximum = Some(receive_maximum);
125        self
126    }
127
128    #[must_use]
129    pub fn with_authentication_method(mut self, method: impl Into<String>) -> Self {
130        self.properties.authentication_method = Some(method.into());
131        self
132    }
133
134    #[must_use]
135    pub fn with_authentication_data(mut self, data: impl AsRef<[u8]>) -> Self {
136        self.properties.authentication_data = Some(data.as_ref().to_vec());
137        self
138    }
139}
140
/// Quality-of-service level with explicit discriminants `0`, `1`, and `2`.
///
/// The discriminants are what `qos as u8` and `u8::from(qos)` produce.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum QoS {
    /// Level `0`.
    AtMostOnce = 0,
    /// Level `1`.
    AtLeastOnce = 1,
    /// Level `2`.
    ExactlyOnce = 2,
}
147
148impl From<u8> for QoS {
149    fn from(value: u8) -> Self {
150        match value {
151            1 => QoS::AtLeastOnce,
152            2 => QoS::ExactlyOnce,
153            _ => QoS::AtMostOnce,
154        }
155    }
156}
157
158impl From<QoS> for u8 {
159    fn from(qos: QoS) -> Self {
160        qos as u8
161    }
162}
163
/// Result of a publish call: either no packet identifier (`QoS0`) or the
/// identifier carried by the `QoS1Or2` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishResult {
    /// No packet identifier is attached.
    QoS0,
    /// Carries the packet identifier of the publish.
    QoS1Or2 { packet_id: u16 },
}

impl PublishResult {
    /// Returns the packet identifier for `QoS1Or2`, or `None` for `QoS0`.
    #[must_use]
    pub fn packet_id(&self) -> Option<u16> {
        if let Self::QoS1Or2 { packet_id } = *self {
            Some(packet_id)
        } else {
            None
        }
    }
}
179
/// Outcome of a connect call, exposing a single `session_present` flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnectResult {
    /// Session-present flag reported for the connection.
    pub session_present: bool,
}
184
/// Will message description: topic, payload, delivery flags, and properties.
///
/// Serializable and deserializable through the serde derives.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WillMessage {
    /// Topic of the will message.
    pub topic: String,
    /// Payload bytes of the will message.
    pub payload: Vec<u8>,
    /// QoS level of the will message.
    pub qos: QoS,
    /// Retain flag of the will message.
    pub retain: bool,
    /// Additional will properties.
    pub properties: WillProperties,
}
193
194impl WillMessage {
195    #[must_use]
196    pub fn new(topic: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
197        Self {
198            topic: topic.into(),
199            payload: payload.into(),
200            qos: QoS::AtMostOnce,
201            retain: false,
202            properties: WillProperties::default(),
203        }
204    }
205
206    #[must_use]
207    pub fn with_qos(mut self, qos: QoS) -> Self {
208        self.qos = qos;
209        self
210    }
211
212    #[must_use]
213    pub fn with_retain(mut self, retain: bool) -> Self {
214        self.retain = retain;
215        self
216    }
217}
218
/// Optional properties attached to a will message.
///
/// All fields default to `None` or an empty list. The `From<WillProperties>`
/// conversion in this file turns each present field into a protocol-level
/// property.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct WillProperties {
    /// Converted to a `WillDelayInterval` four-byte integer property.
    pub will_delay_interval: Option<u32>,
    /// Converted to a `PayloadFormatIndicator` byte property (`true` becomes `1`).
    pub payload_format_indicator: Option<bool>,
    /// Converted to a `MessageExpiryInterval` four-byte integer property.
    pub message_expiry_interval: Option<u32>,
    /// Converted to a `ContentType` UTF-8 string property.
    pub content_type: Option<String>,
    /// Converted to a `ResponseTopic` UTF-8 string property.
    pub response_topic: Option<String>,
    /// Converted to a `CorrelationData` binary property.
    pub correlation_data: Option<Vec<u8>>,
    /// Each pair is converted to one `UserProperty` string-pair property.
    pub user_properties: Vec<(String, String)>,
}
229
230impl From<WillProperties> for crate::protocol::v5::properties::Properties {
231    fn from(will_props: WillProperties) -> Self {
232        let mut properties = crate::protocol::v5::properties::Properties::default();
233
234        if let Some(delay) = will_props.will_delay_interval {
235            if properties
236                .add(
237                    crate::protocol::v5::properties::PropertyId::WillDelayInterval,
238                    crate::protocol::v5::properties::PropertyValue::FourByteInteger(delay),
239                )
240                .is_err()
241            {
242                tracing::warn!("Failed to add will delay interval property");
243            }
244        }
245
246        if let Some(format) = will_props.payload_format_indicator {
247            if properties
248                .add(
249                    crate::protocol::v5::properties::PropertyId::PayloadFormatIndicator,
250                    crate::protocol::v5::properties::PropertyValue::Byte(u8::from(format)),
251                )
252                .is_err()
253            {
254                tracing::warn!("Failed to add payload format indicator property");
255            }
256        }
257
258        if let Some(expiry) = will_props.message_expiry_interval {
259            if properties
260                .add(
261                    crate::protocol::v5::properties::PropertyId::MessageExpiryInterval,
262                    crate::protocol::v5::properties::PropertyValue::FourByteInteger(expiry),
263                )
264                .is_err()
265            {
266                tracing::warn!("Failed to add message expiry interval property");
267            }
268        }
269
270        if let Some(content_type) = will_props.content_type {
271            if properties
272                .add(
273                    crate::protocol::v5::properties::PropertyId::ContentType,
274                    crate::protocol::v5::properties::PropertyValue::Utf8String(content_type),
275                )
276                .is_err()
277            {
278                tracing::warn!("Failed to add content type property");
279            }
280        }
281
282        if let Some(response_topic) = will_props.response_topic {
283            if properties
284                .add(
285                    crate::protocol::v5::properties::PropertyId::ResponseTopic,
286                    crate::protocol::v5::properties::PropertyValue::Utf8String(response_topic),
287                )
288                .is_err()
289            {
290                tracing::warn!("Failed to add response topic property");
291            }
292        }
293
294        if let Some(correlation_data) = will_props.correlation_data {
295            if properties
296                .add(
297                    crate::protocol::v5::properties::PropertyId::CorrelationData,
298                    crate::protocol::v5::properties::PropertyValue::BinaryData(
299                        correlation_data.into(),
300                    ),
301                )
302                .is_err()
303            {
304                tracing::warn!("Failed to add correlation data property");
305            }
306        }
307
308        for (key, value) in will_props.user_properties {
309            if properties
310                .add(
311                    crate::protocol::v5::properties::PropertyId::UserProperty,
312                    crate::protocol::v5::properties::PropertyValue::Utf8StringPair(key, value),
313                )
314                .is_err()
315            {
316                tracing::warn!("Failed to add user property");
317            }
318        }
319
320        properties
321    }
322}
323
/// Optional properties sent along with a connect request.
///
/// All fields default to `None` or an empty list.
///
/// `Debug` is implemented by hand instead of being derived so that the raw
/// `authentication_data` bytes never end up in logs or panic messages: the
/// field is rendered as `Some("<redacted>")` when set and `None` otherwise.
/// Every other field is printed as usual.
#[derive(Clone, Default)]
pub struct ConnectProperties {
    pub session_expiry_interval: Option<u32>,
    pub receive_maximum: Option<u16>,
    pub maximum_packet_size: Option<u32>,
    pub topic_alias_maximum: Option<u16>,
    pub request_response_information: Option<bool>,
    pub request_problem_information: Option<bool>,
    pub user_properties: Vec<(String, String)>,
    pub authentication_method: Option<String>,
    /// Authentication payload; redacted in `Debug` output.
    pub authentication_data: Option<Vec<u8>>,
}

impl std::fmt::Debug for ConnectProperties {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the presence of authentication data visible while hiding its bytes.
        let authentication_data = self.authentication_data.as_ref().map(|_| "<redacted>");

        f.debug_struct("ConnectProperties")
            .field("session_expiry_interval", &self.session_expiry_interval)
            .field("receive_maximum", &self.receive_maximum)
            .field("maximum_packet_size", &self.maximum_packet_size)
            .field("topic_alias_maximum", &self.topic_alias_maximum)
            .field(
                "request_response_information",
                &self.request_response_information,
            )
            .field(
                "request_problem_information",
                &self.request_problem_information,
            )
            .field("user_properties", &self.user_properties)
            .field("authentication_method", &self.authentication_method)
            .field("authentication_data", &authentication_data)
            .finish()
    }
}
336
/// Options applied to a publish: QoS level, retain flag, and properties.
///
/// The `Default` impl in this file yields `QoS::AtMostOnce`, `retain = false`,
/// and default properties.
#[derive(Debug, Clone)]
pub struct PublishOptions {
    /// QoS level for the publish.
    pub qos: QoS,
    /// Retain flag for the publish.
    pub retain: bool,
    /// Additional publish properties.
    pub properties: PublishProperties,
}
343
344impl Default for PublishOptions {
345    fn default() -> Self {
346        Self {
347            qos: QoS::AtMostOnce,
348            retain: false,
349            properties: PublishProperties::default(),
350        }
351    }
352}
353
/// Optional properties attached to an outgoing publish.
///
/// All fields default to `None` or an empty list. The `From<PublishProperties>`
/// conversion in this file turns each present field into a protocol-level
/// property.
#[derive(Debug, Clone, Default)]
pub struct PublishProperties {
    /// Converted to a `PayloadFormatIndicator` byte property (`true` becomes `1`).
    pub payload_format_indicator: Option<bool>,
    /// Converted to a `MessageExpiryInterval` four-byte integer property.
    pub message_expiry_interval: Option<u32>,
    /// Converted to a `TopicAlias` two-byte integer property.
    pub topic_alias: Option<u16>,
    /// Converted to a `ResponseTopic` UTF-8 string property.
    pub response_topic: Option<String>,
    /// Converted to a `CorrelationData` binary property.
    pub correlation_data: Option<Vec<u8>>,
    /// Each pair is converted to one `UserProperty` string-pair property.
    pub user_properties: Vec<(String, String)>,
    /// Each id is converted to one `SubscriptionIdentifier` property.
    pub subscription_identifiers: Vec<u32>,
    /// Converted to a `ContentType` UTF-8 string property.
    pub content_type: Option<String>,
}
365
366impl From<PublishProperties> for crate::protocol::v5::properties::Properties {
367    fn from(props: PublishProperties) -> Self {
368        use crate::protocol::v5::properties::{Properties, PropertyId, PropertyValue};
369
370        let mut properties = Properties::default();
371
372        if let Some(val) = props.payload_format_indicator {
373            if properties
374                .add(
375                    PropertyId::PayloadFormatIndicator,
376                    PropertyValue::Byte(u8::from(val)),
377                )
378                .is_err()
379            {
380                tracing::warn!("Failed to add payload format indicator property");
381            }
382        }
383        if let Some(val) = props.message_expiry_interval {
384            if properties
385                .add(
386                    PropertyId::MessageExpiryInterval,
387                    PropertyValue::FourByteInteger(val),
388                )
389                .is_err()
390            {
391                tracing::warn!("Failed to add message expiry interval property");
392            }
393        }
394        if let Some(val) = props.topic_alias {
395            if properties
396                .add(PropertyId::TopicAlias, PropertyValue::TwoByteInteger(val))
397                .is_err()
398            {
399                tracing::warn!("Failed to add topic alias property");
400            }
401        }
402        if let Some(val) = props.response_topic {
403            if properties
404                .add(PropertyId::ResponseTopic, PropertyValue::Utf8String(val))
405                .is_err()
406            {
407                tracing::warn!("Failed to add response topic property");
408            }
409        }
410        if let Some(val) = props.correlation_data {
411            if properties
412                .add(
413                    PropertyId::CorrelationData,
414                    PropertyValue::BinaryData(val.into()),
415                )
416                .is_err()
417            {
418                tracing::warn!("Failed to add correlation data property");
419            }
420        }
421        for id in props.subscription_identifiers {
422            if properties
423                .add(
424                    PropertyId::SubscriptionIdentifier,
425                    PropertyValue::VariableByteInteger(id),
426                )
427                .is_err()
428            {
429                tracing::warn!("Failed to add subscription identifier property");
430            }
431        }
432        if let Some(val) = props.content_type {
433            if properties
434                .add(PropertyId::ContentType, PropertyValue::Utf8String(val))
435                .is_err()
436            {
437                tracing::warn!("Failed to add content type property");
438            }
439        }
440        for (key, value) in props.user_properties {
441            if properties
442                .add(
443                    PropertyId::UserProperty,
444                    PropertyValue::Utf8StringPair(key, value),
445                )
446                .is_err()
447            {
448                tracing::warn!("Failed to add user property");
449            }
450        }
451
452        properties
453    }
454}
455
/// Options applied to a subscription.
///
/// The `Default` impl in this file yields `QoS::AtMostOnce`, both boolean
/// flags disabled, `RetainHandling::SendAtSubscribe`, and no subscription
/// identifier.
#[derive(Debug, Clone)]
pub struct SubscribeOptions {
    /// QoS level for the subscription.
    pub qos: QoS,
    /// No-local flag.
    pub no_local: bool,
    /// Retain-as-published flag.
    pub retain_as_published: bool,
    /// Retain handling mode.
    pub retain_handling: RetainHandling,
    /// Optional subscription identifier.
    pub subscription_identifier: Option<u32>,
}
464
465impl Default for SubscribeOptions {
466    fn default() -> Self {
467        Self {
468            qos: QoS::AtMostOnce,
469            no_local: false,
470            retain_as_published: false,
471            retain_handling: RetainHandling::SendAtSubscribe,
472            subscription_identifier: None,
473        }
474    }
475}
476
477impl SubscribeOptions {
478    #[must_use]
479    pub fn with_subscription_identifier(mut self, id: u32) -> Self {
480        self.subscription_identifier = Some(id);
481        self
482    }
483}
484
/// Retain handling mode of a subscription, with explicit discriminants
/// `0`, `1`, and `2`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetainHandling {
    /// Discriminant `0`; used by `SubscribeOptions::default()`.
    SendAtSubscribe = 0,
    /// Discriminant `1`.
    SendIfNew = 1,
    /// Discriminant `2`.
    DontSend = 2,
}
491
/// Application-level view of a publish: topic, payload, delivery flags, and
/// properties.
///
/// Can be built from a `PublishPacket` through the `From` impl in this file.
#[derive(Debug, Clone)]
pub struct Message {
    /// Topic of the message.
    pub topic: String,
    /// Payload bytes of the message.
    pub payload: Vec<u8>,
    /// QoS level of the message.
    pub qos: QoS,
    /// Retain flag of the message.
    pub retain: bool,
    /// Properties attached to the message.
    pub properties: MessageProperties,
}
500
501impl From<crate::packet::publish::PublishPacket> for Message {
502    fn from(packet: crate::packet::publish::PublishPacket) -> Self {
503        Self {
504            topic: packet.topic_name,
505            payload: packet.payload,
506            qos: packet.qos,
507            retain: packet.retain,
508            properties: MessageProperties::from(packet.properties),
509        }
510    }
511}
512
/// Properties carried by a received message.
///
/// All fields default to `None` or an empty list. Values are filled in from a
/// protocol-level property set by the `From` impl in this file. Unlike
/// `PublishProperties`, there is no `topic_alias` field.
#[derive(Debug, Clone, Default)]
pub struct MessageProperties {
    /// `true` when the `PayloadFormatIndicator` byte is non-zero.
    pub payload_format_indicator: Option<bool>,
    /// Value of the `MessageExpiryInterval` property.
    pub message_expiry_interval: Option<u32>,
    /// Value of the `ResponseTopic` property.
    pub response_topic: Option<String>,
    /// Bytes of the `CorrelationData` property.
    pub correlation_data: Option<Vec<u8>>,
    /// One entry per `UserProperty` pair.
    pub user_properties: Vec<(String, String)>,
    /// One entry per `SubscriptionIdentifier` property.
    pub subscription_identifiers: Vec<u32>,
    /// Value of the `ContentType` property.
    pub content_type: Option<String>,
}
523
524impl From<crate::protocol::v5::properties::Properties> for MessageProperties {
525    fn from(props: crate::protocol::v5::properties::Properties) -> Self {
526        use crate::protocol::v5::properties::{PropertyId, PropertyValue};
527
528        let mut result = Self::default();
529
530        for (id, value) in props.iter() {
531            match (id, value) {
532                (PropertyId::PayloadFormatIndicator, PropertyValue::Byte(v)) => {
533                    result.payload_format_indicator = Some(v != &0);
534                }
535                (PropertyId::MessageExpiryInterval, PropertyValue::FourByteInteger(v)) => {
536                    result.message_expiry_interval = Some(*v);
537                }
538                (PropertyId::ResponseTopic, PropertyValue::Utf8String(v)) => {
539                    result.response_topic = Some(v.clone());
540                }
541                (PropertyId::CorrelationData, PropertyValue::BinaryData(v)) => {
542                    result.correlation_data = Some(v.to_vec());
543                }
544                (PropertyId::UserProperty, PropertyValue::Utf8StringPair(k, v)) => {
545                    result.user_properties.push((k.clone(), v.clone()));
546                }
547                (PropertyId::SubscriptionIdentifier, PropertyValue::VariableByteInteger(v)) => {
548                    result.subscription_identifiers.push(*v);
549                }
550                (PropertyId::ContentType, PropertyValue::Utf8String(v)) => {
551                    result.content_type = Some(v.clone());
552                }
553                _ => {}
554            }
555        }
556
557        result
558    }
559}
560
561impl From<MessageProperties> for PublishProperties {
562    fn from(msg_props: MessageProperties) -> Self {
563        Self {
564            payload_format_indicator: msg_props.payload_format_indicator,
565            message_expiry_interval: msg_props.message_expiry_interval,
566            topic_alias: None,
567            response_topic: msg_props.response_topic,
568            correlation_data: msg_props.correlation_data,
569            user_properties: msg_props.user_properties,
570            subscription_identifiers: msg_props.subscription_identifiers,
571            content_type: msg_props.content_type,
572        }
573    }
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `QoS` level paired with the byte it is expected to map to.
    const LEVELS: [(QoS, u8); 3] = [
        (QoS::AtMostOnce, 0),
        (QoS::AtLeastOnce, 1),
        (QoS::ExactlyOnce, 2),
    ];

    #[test]
    fn test_qos_values() {
        for (level, byte) in LEVELS {
            assert_eq!(level as u8, byte);
        }
    }

    #[test]
    fn test_qos_from_u8() {
        for (level, byte) in LEVELS {
            assert_eq!(QoS::from(byte), level);
        }

        // Out-of-range bytes fall back to the lowest level.
        for byte in [3u8, 255] {
            assert_eq!(QoS::from(byte), QoS::AtMostOnce);
        }
    }

    #[test]
    fn test_qos_into_u8() {
        for (level, byte) in LEVELS {
            assert_eq!(u8::from(level), byte);
        }
    }
}
603}