1use crate::prelude::{String, Vec};
2pub use crate::protocol::v5::reason_codes::ReasonCode;
3use crate::time::Duration;
4
/// Protocol revision selector.
///
/// `V5` is the variant produced by `Default`. Each variant has a one-byte numeric code
/// (see [`ProtocolVersion::as_u8`]) and can be recovered from that code with `TryFrom<u8>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ProtocolVersion {
    /// Numeric code `4`.
    V311,
    /// Numeric code `5`; the default variant.
    #[default]
    V5,
}

impl ProtocolVersion {
    /// Returns the numeric code of this version: `4` for `V311`, `5` for `V5`.
    #[must_use]
    pub fn as_u8(self) -> u8 {
        match self {
            Self::V311 => 4,
            Self::V5 => 5,
        }
    }
}

impl From<ProtocolVersion> for u8 {
    /// Same mapping as [`ProtocolVersion::as_u8`].
    fn from(version: ProtocolVersion) -> Self {
        ProtocolVersion::as_u8(version)
    }
}

impl TryFrom<u8> for ProtocolVersion {
    type Error = ();

    /// Inverse of [`ProtocolVersion::as_u8`].
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for any byte that no variant encodes to.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Every variant, so the lookup is driven by `as_u8` instead of repeating the codes.
        const KNOWN: [ProtocolVersion; 2] = [ProtocolVersion::V311, ProtocolVersion::V5];

        KNOWN
            .iter()
            .copied()
            .find(|candidate| candidate.as_u8() == value)
            .ok_or(())
    }
}
39
/// Options describing a connection: identity, keep-alive, credentials, will and properties.
///
/// Create with [`ConnectOptions::new`] or `Default`, then adjust through the `with_*` builders
/// or by assigning the public fields directly.
#[derive(Debug, Clone)]
pub struct ConnectOptions {
    /// Client identifier; empty when built through `Default`.
    pub client_id: String,
    /// Keep-alive interval; 60 seconds unless overridden.
    pub keep_alive: Duration,
    /// Clean-start flag; `true` unless overridden.
    pub clean_start: bool,
    /// Optional user name, set together with `password` by `with_credentials`.
    pub username: Option<String>,
    /// Optional password bytes, set together with `username` by `with_credentials`.
    pub password: Option<Vec<u8>>,
    /// Optional will message, set by `with_will`.
    pub will: Option<WillMessage>,
    /// Additional connect properties; all unset by default.
    pub properties: ConnectProperties,
    /// Protocol revision to use; `ProtocolVersion::V5` unless overridden.
    pub protocol_version: ProtocolVersion,
}
51
52impl Default for ConnectOptions {
53 fn default() -> Self {
54 Self {
55 client_id: String::new(),
56 keep_alive: Duration::from_secs(60),
57 clean_start: true,
58 username: None,
59 password: None,
60 will: None,
61 properties: ConnectProperties::default(),
62 protocol_version: ProtocolVersion::V5,
63 }
64 }
65}
66
67impl ConnectOptions {
68 #[must_use]
69 pub fn new(client_id: impl Into<String>) -> Self {
70 Self {
71 client_id: client_id.into(),
72 keep_alive: Duration::from_secs(60),
73 clean_start: true,
74 username: None,
75 password: None,
76 will: None,
77 properties: ConnectProperties::default(),
78 protocol_version: ProtocolVersion::V5,
79 }
80 }
81
82 #[must_use]
83 pub fn with_protocol_version(mut self, version: ProtocolVersion) -> Self {
84 self.protocol_version = version;
85 self
86 }
87
88 #[must_use]
89 pub fn with_keep_alive(mut self, duration: Duration) -> Self {
90 self.keep_alive = duration;
91 self
92 }
93
94 #[must_use]
95 pub fn with_clean_start(mut self, clean: bool) -> Self {
96 self.clean_start = clean;
97 self
98 }
99
100 #[must_use]
101 pub fn with_credentials(
102 mut self,
103 username: impl Into<String>,
104 password: impl AsRef<[u8]>,
105 ) -> Self {
106 self.username = Some(username.into());
107 self.password = Some(password.as_ref().to_vec());
108 self
109 }
110
111 #[must_use]
112 pub fn with_will(mut self, will: WillMessage) -> Self {
113 self.will = Some(will);
114 self
115 }
116
117 #[must_use]
118 pub fn with_session_expiry_interval(mut self, interval: u32) -> Self {
119 self.properties.session_expiry_interval = Some(interval);
120 self
121 }
122
123 #[must_use]
124 pub fn with_receive_maximum(mut self, receive_maximum: u16) -> Self {
125 self.properties.receive_maximum = Some(receive_maximum);
126 self
127 }
128
129 #[must_use]
130 pub fn with_authentication_method(mut self, method: impl Into<String>) -> Self {
131 self.properties.authentication_method = Some(method.into());
132 self
133 }
134
135 #[must_use]
136 pub fn with_authentication_data(mut self, data: impl AsRef<[u8]>) -> Self {
137 self.properties.authentication_data = Some(data.as_ref().to_vec());
138 self
139 }
140}
141
/// Quality-of-service level with explicit discriminants `0`, `1` and `2`.
///
/// Converts to `u8` through its discriminant and from `u8` through a lossy `From` impl that
/// maps every value other than `1` and `2` to `AtMostOnce`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum QoS {
    /// Discriminant `0`.
    AtMostOnce = 0,
    /// Discriminant `1`.
    AtLeastOnce = 1,
    /// Discriminant `2`.
    ExactlyOnce = 2,
}
148
149impl From<u8> for QoS {
150 fn from(value: u8) -> Self {
151 match value {
152 1 => QoS::AtLeastOnce,
153 2 => QoS::ExactlyOnce,
154 _ => QoS::AtMostOnce,
155 }
156 }
157}
158
159impl From<QoS> for u8 {
160 fn from(qos: QoS) -> Self {
161 qos as u8
162 }
163}
164
/// Outcome of a publish call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishResult {
    /// No packet identifier is associated with the publish.
    QoS0,
    /// The publish carries the given packet identifier.
    QoS1Or2 { packet_id: u16 },
}

impl PublishResult {
    /// Returns the packet identifier for `QoS1Or2`, or `None` for `QoS0`.
    #[must_use]
    pub fn packet_id(&self) -> Option<u16> {
        if let Self::QoS1Or2 { packet_id } = *self {
            Some(packet_id)
        } else {
            None
        }
    }
}
180
/// Data returned from a connect operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConnectResult {
    /// Session-present flag.
    // NOTE(review): presumably mirrors the flag reported by the broker — confirm at the call site.
    pub session_present: bool,
}
185
/// Will message that can be attached to `ConnectOptions` via `with_will`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WillMessage {
    /// Topic of the will.
    pub topic: String,
    /// Payload bytes of the will.
    pub payload: Vec<u8>,
    /// Quality-of-service level; `WillMessage::new` starts with `QoS::AtMostOnce`.
    pub qos: QoS,
    /// Retain flag; `WillMessage::new` starts with `false`.
    pub retain: bool,
    /// Additional will properties; `WillMessage::new` starts with the defaults.
    pub properties: WillProperties,
}
194
195impl WillMessage {
196 #[must_use]
197 pub fn new(topic: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
198 Self {
199 topic: topic.into(),
200 payload: payload.into(),
201 qos: QoS::AtMostOnce,
202 retain: false,
203 properties: WillProperties::default(),
204 }
205 }
206
207 #[must_use]
208 pub fn with_qos(mut self, qos: QoS) -> Self {
209 self.qos = qos;
210 self
211 }
212
213 #[must_use]
214 pub fn with_retain(mut self, retain: bool) -> Self {
215 self.retain = retain;
216 self
217 }
218}
219
/// Optional properties of a will message; everything is unset or empty by default.
///
/// Converted into a protocol-level `Properties` list by the `From<WillProperties>` impl.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct WillProperties {
    /// Emitted as `PropertyId::WillDelayInterval` (four-byte integer).
    pub will_delay_interval: Option<u32>,
    /// Emitted as `PropertyId::PayloadFormatIndicator` (byte `1` for `true`, `0` for `false`).
    pub payload_format_indicator: Option<bool>,
    /// Emitted as `PropertyId::MessageExpiryInterval` (four-byte integer).
    pub message_expiry_interval: Option<u32>,
    /// Emitted as `PropertyId::ContentType` (UTF-8 string).
    pub content_type: Option<String>,
    /// Emitted as `PropertyId::ResponseTopic` (UTF-8 string).
    pub response_topic: Option<String>,
    /// Emitted as `PropertyId::CorrelationData` (binary data).
    pub correlation_data: Option<Vec<u8>>,
    /// Each pair is emitted as its own `PropertyId::UserProperty` entry.
    pub user_properties: Vec<(String, String)>,
}
230
231impl From<WillProperties> for crate::protocol::v5::properties::Properties {
232 fn from(will_props: WillProperties) -> Self {
233 let mut properties = crate::protocol::v5::properties::Properties::default();
234
235 if let Some(delay) = will_props.will_delay_interval {
236 if properties
237 .add(
238 crate::protocol::v5::properties::PropertyId::WillDelayInterval,
239 crate::protocol::v5::properties::PropertyValue::FourByteInteger(delay),
240 )
241 .is_err()
242 {
243 crate::prelude::warn_log!("Failed to add will delay interval property");
244 }
245 }
246
247 if let Some(format) = will_props.payload_format_indicator {
248 if properties
249 .add(
250 crate::protocol::v5::properties::PropertyId::PayloadFormatIndicator,
251 crate::protocol::v5::properties::PropertyValue::Byte(u8::from(format)),
252 )
253 .is_err()
254 {
255 crate::prelude::warn_log!("Failed to add payload format indicator property");
256 }
257 }
258
259 if let Some(expiry) = will_props.message_expiry_interval {
260 if properties
261 .add(
262 crate::protocol::v5::properties::PropertyId::MessageExpiryInterval,
263 crate::protocol::v5::properties::PropertyValue::FourByteInteger(expiry),
264 )
265 .is_err()
266 {
267 crate::prelude::warn_log!("Failed to add message expiry interval property");
268 }
269 }
270
271 if let Some(content_type) = will_props.content_type {
272 if properties
273 .add(
274 crate::protocol::v5::properties::PropertyId::ContentType,
275 crate::protocol::v5::properties::PropertyValue::Utf8String(content_type),
276 )
277 .is_err()
278 {
279 crate::prelude::warn_log!("Failed to add content type property");
280 }
281 }
282
283 if let Some(response_topic) = will_props.response_topic {
284 if properties
285 .add(
286 crate::protocol::v5::properties::PropertyId::ResponseTopic,
287 crate::protocol::v5::properties::PropertyValue::Utf8String(response_topic),
288 )
289 .is_err()
290 {
291 crate::prelude::warn_log!("Failed to add response topic property");
292 }
293 }
294
295 if let Some(correlation_data) = will_props.correlation_data {
296 if properties
297 .add(
298 crate::protocol::v5::properties::PropertyId::CorrelationData,
299 crate::protocol::v5::properties::PropertyValue::BinaryData(
300 correlation_data.into(),
301 ),
302 )
303 .is_err()
304 {
305 crate::prelude::warn_log!("Failed to add correlation data property");
306 }
307 }
308
309 for (key, value) in will_props.user_properties {
310 if properties
311 .add(
312 crate::protocol::v5::properties::PropertyId::UserProperty,
313 crate::protocol::v5::properties::PropertyValue::Utf8StringPair(key, value),
314 )
315 .is_err()
316 {
317 crate::prelude::warn_log!("Failed to add user property");
318 }
319 }
320
321 properties
322 }
323}
324
/// Optional properties carried in `ConnectOptions::properties`; everything is unset or empty
/// by default.
#[derive(Debug, Clone, Default)]
pub struct ConnectProperties {
    /// Set by `ConnectOptions::with_session_expiry_interval`.
    pub session_expiry_interval: Option<u32>,
    /// Set by `ConnectOptions::with_receive_maximum`.
    pub receive_maximum: Option<u16>,
    /// Optional maximum packet size.
    pub maximum_packet_size: Option<u32>,
    /// Optional topic alias maximum.
    pub topic_alias_maximum: Option<u16>,
    /// Optional request-response-information flag.
    pub request_response_information: Option<bool>,
    /// Optional request-problem-information flag.
    pub request_problem_information: Option<bool>,
    /// Free-form key/value pairs.
    pub user_properties: Vec<(String, String)>,
    /// Set by `ConnectOptions::with_authentication_method`.
    pub authentication_method: Option<String>,
    /// Set by `ConnectOptions::with_authentication_data`.
    pub authentication_data: Option<Vec<u8>>,
}
337
/// Options for a publish: QoS level, retain flag and publish properties.
#[derive(Debug, Clone)]
pub struct PublishOptions {
    /// Quality-of-service level; `QoS::AtMostOnce` by default.
    pub qos: QoS,
    /// Retain flag; `false` by default.
    pub retain: bool,
    /// Additional publish properties; all unset by default.
    pub properties: PublishProperties,
}
344
345impl Default for PublishOptions {
346 fn default() -> Self {
347 Self {
348 qos: QoS::AtMostOnce,
349 retain: false,
350 properties: PublishProperties::default(),
351 }
352 }
353}
354
/// Optional properties of an outgoing publish; everything is unset or empty by default.
///
/// Converted into a protocol-level `Properties` list by the `From<PublishProperties>` impl.
#[derive(Debug, Clone, Default)]
pub struct PublishProperties {
    /// Emitted as `PropertyId::PayloadFormatIndicator` (byte `1` for `true`, `0` for `false`).
    pub payload_format_indicator: Option<bool>,
    /// Emitted as `PropertyId::MessageExpiryInterval` (four-byte integer).
    pub message_expiry_interval: Option<u32>,
    /// Emitted as `PropertyId::TopicAlias` (two-byte integer).
    pub topic_alias: Option<u16>,
    /// Emitted as `PropertyId::ResponseTopic` (UTF-8 string).
    pub response_topic: Option<String>,
    /// Emitted as `PropertyId::CorrelationData` (binary data).
    pub correlation_data: Option<Vec<u8>>,
    /// Each pair is emitted as its own `PropertyId::UserProperty` entry.
    pub user_properties: Vec<(String, String)>,
    /// Each id is emitted as its own `PropertyId::SubscriptionIdentifier` entry.
    pub subscription_identifiers: Vec<u32>,
    /// Emitted as `PropertyId::ContentType` (UTF-8 string).
    pub content_type: Option<String>,
}
366
367impl From<PublishProperties> for crate::protocol::v5::properties::Properties {
368 fn from(props: PublishProperties) -> Self {
369 use crate::protocol::v5::properties::{Properties, PropertyId, PropertyValue};
370
371 let mut properties = Properties::default();
372
373 if let Some(val) = props.payload_format_indicator {
374 if properties
375 .add(
376 PropertyId::PayloadFormatIndicator,
377 PropertyValue::Byte(u8::from(val)),
378 )
379 .is_err()
380 {
381 crate::prelude::warn_log!("Failed to add payload format indicator property");
382 }
383 }
384 if let Some(val) = props.message_expiry_interval {
385 if properties
386 .add(
387 PropertyId::MessageExpiryInterval,
388 PropertyValue::FourByteInteger(val),
389 )
390 .is_err()
391 {
392 crate::prelude::warn_log!("Failed to add message expiry interval property");
393 }
394 }
395 if let Some(val) = props.topic_alias {
396 if properties
397 .add(PropertyId::TopicAlias, PropertyValue::TwoByteInteger(val))
398 .is_err()
399 {
400 crate::prelude::warn_log!("Failed to add topic alias property");
401 }
402 }
403 if let Some(val) = props.response_topic {
404 if properties
405 .add(PropertyId::ResponseTopic, PropertyValue::Utf8String(val))
406 .is_err()
407 {
408 crate::prelude::warn_log!("Failed to add response topic property");
409 }
410 }
411 if let Some(val) = props.correlation_data {
412 if properties
413 .add(
414 PropertyId::CorrelationData,
415 PropertyValue::BinaryData(val.into()),
416 )
417 .is_err()
418 {
419 crate::prelude::warn_log!("Failed to add correlation data property");
420 }
421 }
422 for id in props.subscription_identifiers {
423 if properties
424 .add(
425 PropertyId::SubscriptionIdentifier,
426 PropertyValue::VariableByteInteger(id),
427 )
428 .is_err()
429 {
430 crate::prelude::warn_log!("Failed to add subscription identifier property");
431 }
432 }
433 if let Some(val) = props.content_type {
434 if properties
435 .add(PropertyId::ContentType, PropertyValue::Utf8String(val))
436 .is_err()
437 {
438 crate::prelude::warn_log!("Failed to add content type property");
439 }
440 }
441 for (key, value) in props.user_properties {
442 if properties
443 .add(
444 PropertyId::UserProperty,
445 PropertyValue::Utf8StringPair(key, value),
446 )
447 .is_err()
448 {
449 crate::prelude::warn_log!("Failed to add user property");
450 }
451 }
452
453 properties
454 }
455}
456
/// Options for a subscription.
#[derive(Debug, Clone)]
pub struct SubscribeOptions {
    /// Quality-of-service level; `QoS::AtMostOnce` by default.
    pub qos: QoS,
    /// No-local flag; `false` by default.
    pub no_local: bool,
    /// Retain-as-published flag; `false` by default.
    pub retain_as_published: bool,
    /// Retain handling mode; `RetainHandling::SendAtSubscribe` by default.
    pub retain_handling: RetainHandling,
    /// Optional subscription identifier, set by `with_subscription_identifier`.
    pub subscription_identifier: Option<u32>,
}
465
466impl Default for SubscribeOptions {
467 fn default() -> Self {
468 Self {
469 qos: QoS::AtMostOnce,
470 no_local: false,
471 retain_as_published: false,
472 retain_handling: RetainHandling::SendAtSubscribe,
473 subscription_identifier: None,
474 }
475 }
476}
477
478impl SubscribeOptions {
479 #[must_use]
480 pub fn with_subscription_identifier(mut self, id: u32) -> Self {
481 self.subscription_identifier = Some(id);
482 self
483 }
484}
485
/// Retain handling mode of a subscription, with explicit discriminants `0`, `1` and `2`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetainHandling {
    /// Discriminant `0`; the value used by `SubscribeOptions::default`.
    SendAtSubscribe = 0,
    /// Discriminant `1`.
    SendIfNew = 1,
    /// Discriminant `2`.
    DontSend = 2,
}
492
/// Application-level view of a publish packet (see the `From<PublishPacket>` impl).
#[derive(Debug, Clone)]
pub struct Message {
    /// Topic, taken from the packet's `topic_name`.
    pub topic: String,
    /// Owned copy of the packet payload.
    pub payload: Vec<u8>,
    /// Quality-of-service level of the packet.
    pub qos: QoS,
    /// Retain flag of the packet.
    pub retain: bool,
    /// Typed properties extracted from the packet's property list.
    pub properties: MessageProperties,
}
501
502impl From<crate::packet::publish::PublishPacket> for Message {
503 fn from(packet: crate::packet::publish::PublishPacket) -> Self {
504 Self {
505 topic: packet.topic_name,
506 payload: packet.payload.to_vec(),
507 qos: packet.qos,
508 retain: packet.retain,
509 properties: MessageProperties::from(packet.properties),
510 }
511 }
512}
513
/// Typed view of the properties of a received message; everything is unset or empty by
/// default and filled in by the `From<Properties>` impl.
#[derive(Debug, Clone, Default)]
pub struct MessageProperties {
    /// From `PropertyId::PayloadFormatIndicator`; `true` when the byte is non-zero.
    pub payload_format_indicator: Option<bool>,
    /// From `PropertyId::MessageExpiryInterval`.
    pub message_expiry_interval: Option<u32>,
    /// From `PropertyId::ResponseTopic`.
    pub response_topic: Option<String>,
    /// From `PropertyId::CorrelationData`.
    pub correlation_data: Option<Vec<u8>>,
    /// One entry per `PropertyId::UserProperty`, in iteration order.
    pub user_properties: Vec<(String, String)>,
    /// One entry per `PropertyId::SubscriptionIdentifier`, in iteration order.
    pub subscription_identifiers: Vec<u32>,
    /// From `PropertyId::ContentType`.
    pub content_type: Option<String>,
}
524
525impl From<crate::protocol::v5::properties::Properties> for MessageProperties {
526 fn from(props: crate::protocol::v5::properties::Properties) -> Self {
527 use crate::protocol::v5::properties::{PropertyId, PropertyValue};
528
529 let mut result = Self::default();
530
531 for (id, value) in props.iter() {
532 match (id, value) {
533 (PropertyId::PayloadFormatIndicator, PropertyValue::Byte(v)) => {
534 result.payload_format_indicator = Some(v != &0);
535 }
536 (PropertyId::MessageExpiryInterval, PropertyValue::FourByteInteger(v)) => {
537 result.message_expiry_interval = Some(*v);
538 }
539 (PropertyId::ResponseTopic, PropertyValue::Utf8String(v)) => {
540 result.response_topic = Some(v.clone());
541 }
542 (PropertyId::CorrelationData, PropertyValue::BinaryData(v)) => {
543 result.correlation_data = Some(v.to_vec());
544 }
545 (PropertyId::UserProperty, PropertyValue::Utf8StringPair(k, v)) => {
546 result.user_properties.push((k.clone(), v.clone()));
547 }
548 (PropertyId::SubscriptionIdentifier, PropertyValue::VariableByteInteger(v)) => {
549 result.subscription_identifiers.push(*v);
550 }
551 (PropertyId::ContentType, PropertyValue::Utf8String(v)) => {
552 result.content_type = Some(v.clone());
553 }
554 _ => {}
555 }
556 }
557
558 result
559 }
560}
561
562impl From<MessageProperties> for PublishProperties {
563 fn from(msg_props: MessageProperties) -> Self {
564 Self {
565 payload_format_indicator: msg_props.payload_format_indicator,
566 message_expiry_interval: msg_props.message_expiry_interval,
567 topic_alias: None,
568 response_topic: msg_props.response_topic,
569 correlation_data: msg_props.correlation_data,
570 user_properties: msg_props.user_properties,
571 subscription_identifiers: msg_props.subscription_identifiers,
572 content_type: msg_props.content_type,
573 }
574 }
575}
576
#[cfg(test)]
mod tests {
    use super::*;

    /// Every QoS level paired with its numeric code.
    const LEVELS: [(QoS, u8); 3] = [
        (QoS::AtMostOnce, 0),
        (QoS::AtLeastOnce, 1),
        (QoS::ExactlyOnce, 2),
    ];

    #[test]
    fn test_qos_values() {
        for &(level, code) in &LEVELS {
            assert_eq!(level as u8, code);
        }
    }

    #[test]
    fn test_qos_from_u8() {
        for &(level, code) in &LEVELS {
            assert_eq!(QoS::from(code), level);
        }

        // Out-of-range codes fall back to the lowest level.
        for &unknown in &[3u8, 255] {
            assert_eq!(QoS::from(unknown), QoS::AtMostOnce);
        }
    }

    #[test]
    fn test_qos_into_u8() {
        for &(level, code) in &LEVELS {
            assert_eq!(u8::from(level), code);
        }
    }
}