1pub use crate::protocol::v5::reason_codes::ReasonCode;
2use crate::time::Duration;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
5pub enum ProtocolVersion {
6 V311,
7 #[default]
8 V5,
9}
10
11impl ProtocolVersion {
12 #[must_use]
13 pub fn as_u8(self) -> u8 {
14 match self {
15 ProtocolVersion::V311 => 4,
16 ProtocolVersion::V5 => 5,
17 }
18 }
19}
20
21impl From<ProtocolVersion> for u8 {
22 fn from(version: ProtocolVersion) -> Self {
23 version.as_u8()
24 }
25}
26
27impl TryFrom<u8> for ProtocolVersion {
28 type Error = ();
29
30 fn try_from(value: u8) -> Result<Self, Self::Error> {
31 match value {
32 4 => Ok(ProtocolVersion::V311),
33 5 => Ok(ProtocolVersion::V5),
34 _ => Err(()),
35 }
36 }
37}
38
39#[derive(Debug, Clone)]
40pub struct ConnectOptions {
41 pub client_id: String,
42 pub keep_alive: Duration,
43 pub clean_start: bool,
44 pub username: Option<String>,
45 pub password: Option<Vec<u8>>,
46 pub will: Option<WillMessage>,
47 pub properties: ConnectProperties,
48 pub protocol_version: ProtocolVersion,
49}
50
51impl Default for ConnectOptions {
52 fn default() -> Self {
53 Self {
54 client_id: String::new(),
55 keep_alive: Duration::from_secs(60),
56 clean_start: true,
57 username: None,
58 password: None,
59 will: None,
60 properties: ConnectProperties::default(),
61 protocol_version: ProtocolVersion::V5,
62 }
63 }
64}
65
66impl ConnectOptions {
67 #[must_use]
68 pub fn new(client_id: impl Into<String>) -> Self {
69 Self {
70 client_id: client_id.into(),
71 keep_alive: Duration::from_secs(60),
72 clean_start: true,
73 username: None,
74 password: None,
75 will: None,
76 properties: ConnectProperties::default(),
77 protocol_version: ProtocolVersion::V5,
78 }
79 }
80
81 #[must_use]
82 pub fn with_protocol_version(mut self, version: ProtocolVersion) -> Self {
83 self.protocol_version = version;
84 self
85 }
86
87 #[must_use]
88 pub fn with_keep_alive(mut self, duration: Duration) -> Self {
89 self.keep_alive = duration;
90 self
91 }
92
93 #[must_use]
94 pub fn with_clean_start(mut self, clean: bool) -> Self {
95 self.clean_start = clean;
96 self
97 }
98
99 #[must_use]
100 pub fn with_credentials(
101 mut self,
102 username: impl Into<String>,
103 password: impl AsRef<[u8]>,
104 ) -> Self {
105 self.username = Some(username.into());
106 self.password = Some(password.as_ref().to_vec());
107 self
108 }
109
110 #[must_use]
111 pub fn with_will(mut self, will: WillMessage) -> Self {
112 self.will = Some(will);
113 self
114 }
115
116 #[must_use]
117 pub fn with_session_expiry_interval(mut self, interval: u32) -> Self {
118 self.properties.session_expiry_interval = Some(interval);
119 self
120 }
121
122 #[must_use]
123 pub fn with_receive_maximum(mut self, receive_maximum: u16) -> Self {
124 self.properties.receive_maximum = Some(receive_maximum);
125 self
126 }
127
128 #[must_use]
129 pub fn with_authentication_method(mut self, method: impl Into<String>) -> Self {
130 self.properties.authentication_method = Some(method.into());
131 self
132 }
133
134 #[must_use]
135 pub fn with_authentication_data(mut self, data: impl AsRef<[u8]>) -> Self {
136 self.properties.authentication_data = Some(data.as_ref().to_vec());
137 self
138 }
139}
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
142pub enum QoS {
143 AtMostOnce = 0,
144 AtLeastOnce = 1,
145 ExactlyOnce = 2,
146}
147
148impl From<u8> for QoS {
149 fn from(value: u8) -> Self {
150 match value {
151 1 => QoS::AtLeastOnce,
152 2 => QoS::ExactlyOnce,
153 _ => QoS::AtMostOnce,
154 }
155 }
156}
157
158impl From<QoS> for u8 {
159 fn from(qos: QoS) -> Self {
160 qos as u8
161 }
162}
163
164#[derive(Debug, Clone, Copy, PartialEq, Eq)]
165pub enum PublishResult {
166 QoS0,
167 QoS1Or2 { packet_id: u16 },
168}
169
170impl PublishResult {
171 #[must_use]
172 pub fn packet_id(&self) -> Option<u16> {
173 match self {
174 Self::QoS0 => None,
175 Self::QoS1Or2 { packet_id } => Some(*packet_id),
176 }
177 }
178}
179
180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
181pub struct ConnectResult {
182 pub session_present: bool,
183}
184
185#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
186pub struct WillMessage {
187 pub topic: String,
188 pub payload: Vec<u8>,
189 pub qos: QoS,
190 pub retain: bool,
191 pub properties: WillProperties,
192}
193
194impl WillMessage {
195 #[must_use]
196 pub fn new(topic: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
197 Self {
198 topic: topic.into(),
199 payload: payload.into(),
200 qos: QoS::AtMostOnce,
201 retain: false,
202 properties: WillProperties::default(),
203 }
204 }
205
206 #[must_use]
207 pub fn with_qos(mut self, qos: QoS) -> Self {
208 self.qos = qos;
209 self
210 }
211
212 #[must_use]
213 pub fn with_retain(mut self, retain: bool) -> Self {
214 self.retain = retain;
215 self
216 }
217}
218
219#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
220pub struct WillProperties {
221 pub will_delay_interval: Option<u32>,
222 pub payload_format_indicator: Option<bool>,
223 pub message_expiry_interval: Option<u32>,
224 pub content_type: Option<String>,
225 pub response_topic: Option<String>,
226 pub correlation_data: Option<Vec<u8>>,
227 pub user_properties: Vec<(String, String)>,
228}
229
230impl From<WillProperties> for crate::protocol::v5::properties::Properties {
231 fn from(will_props: WillProperties) -> Self {
232 let mut properties = crate::protocol::v5::properties::Properties::default();
233
234 if let Some(delay) = will_props.will_delay_interval {
235 if properties
236 .add(
237 crate::protocol::v5::properties::PropertyId::WillDelayInterval,
238 crate::protocol::v5::properties::PropertyValue::FourByteInteger(delay),
239 )
240 .is_err()
241 {
242 tracing::warn!("Failed to add will delay interval property");
243 }
244 }
245
246 if let Some(format) = will_props.payload_format_indicator {
247 if properties
248 .add(
249 crate::protocol::v5::properties::PropertyId::PayloadFormatIndicator,
250 crate::protocol::v5::properties::PropertyValue::Byte(u8::from(format)),
251 )
252 .is_err()
253 {
254 tracing::warn!("Failed to add payload format indicator property");
255 }
256 }
257
258 if let Some(expiry) = will_props.message_expiry_interval {
259 if properties
260 .add(
261 crate::protocol::v5::properties::PropertyId::MessageExpiryInterval,
262 crate::protocol::v5::properties::PropertyValue::FourByteInteger(expiry),
263 )
264 .is_err()
265 {
266 tracing::warn!("Failed to add message expiry interval property");
267 }
268 }
269
270 if let Some(content_type) = will_props.content_type {
271 if properties
272 .add(
273 crate::protocol::v5::properties::PropertyId::ContentType,
274 crate::protocol::v5::properties::PropertyValue::Utf8String(content_type),
275 )
276 .is_err()
277 {
278 tracing::warn!("Failed to add content type property");
279 }
280 }
281
282 if let Some(response_topic) = will_props.response_topic {
283 if properties
284 .add(
285 crate::protocol::v5::properties::PropertyId::ResponseTopic,
286 crate::protocol::v5::properties::PropertyValue::Utf8String(response_topic),
287 )
288 .is_err()
289 {
290 tracing::warn!("Failed to add response topic property");
291 }
292 }
293
294 if let Some(correlation_data) = will_props.correlation_data {
295 if properties
296 .add(
297 crate::protocol::v5::properties::PropertyId::CorrelationData,
298 crate::protocol::v5::properties::PropertyValue::BinaryData(
299 correlation_data.into(),
300 ),
301 )
302 .is_err()
303 {
304 tracing::warn!("Failed to add correlation data property");
305 }
306 }
307
308 for (key, value) in will_props.user_properties {
309 if properties
310 .add(
311 crate::protocol::v5::properties::PropertyId::UserProperty,
312 crate::protocol::v5::properties::PropertyValue::Utf8StringPair(key, value),
313 )
314 .is_err()
315 {
316 tracing::warn!("Failed to add user property");
317 }
318 }
319
320 properties
321 }
322}
323
324#[derive(Debug, Clone, Default)]
325pub struct ConnectProperties {
326 pub session_expiry_interval: Option<u32>,
327 pub receive_maximum: Option<u16>,
328 pub maximum_packet_size: Option<u32>,
329 pub topic_alias_maximum: Option<u16>,
330 pub request_response_information: Option<bool>,
331 pub request_problem_information: Option<bool>,
332 pub user_properties: Vec<(String, String)>,
333 pub authentication_method: Option<String>,
334 pub authentication_data: Option<Vec<u8>>,
335}
336
337#[derive(Debug, Clone)]
338pub struct PublishOptions {
339 pub qos: QoS,
340 pub retain: bool,
341 pub properties: PublishProperties,
342}
343
344impl Default for PublishOptions {
345 fn default() -> Self {
346 Self {
347 qos: QoS::AtMostOnce,
348 retain: false,
349 properties: PublishProperties::default(),
350 }
351 }
352}
353
354#[derive(Debug, Clone, Default)]
355pub struct PublishProperties {
356 pub payload_format_indicator: Option<bool>,
357 pub message_expiry_interval: Option<u32>,
358 pub topic_alias: Option<u16>,
359 pub response_topic: Option<String>,
360 pub correlation_data: Option<Vec<u8>>,
361 pub user_properties: Vec<(String, String)>,
362 pub subscription_identifiers: Vec<u32>,
363 pub content_type: Option<String>,
364}
365
366impl From<PublishProperties> for crate::protocol::v5::properties::Properties {
367 fn from(props: PublishProperties) -> Self {
368 use crate::protocol::v5::properties::{Properties, PropertyId, PropertyValue};
369
370 let mut properties = Properties::default();
371
372 if let Some(val) = props.payload_format_indicator {
373 if properties
374 .add(
375 PropertyId::PayloadFormatIndicator,
376 PropertyValue::Byte(u8::from(val)),
377 )
378 .is_err()
379 {
380 tracing::warn!("Failed to add payload format indicator property");
381 }
382 }
383 if let Some(val) = props.message_expiry_interval {
384 if properties
385 .add(
386 PropertyId::MessageExpiryInterval,
387 PropertyValue::FourByteInteger(val),
388 )
389 .is_err()
390 {
391 tracing::warn!("Failed to add message expiry interval property");
392 }
393 }
394 if let Some(val) = props.topic_alias {
395 if properties
396 .add(PropertyId::TopicAlias, PropertyValue::TwoByteInteger(val))
397 .is_err()
398 {
399 tracing::warn!("Failed to add topic alias property");
400 }
401 }
402 if let Some(val) = props.response_topic {
403 if properties
404 .add(PropertyId::ResponseTopic, PropertyValue::Utf8String(val))
405 .is_err()
406 {
407 tracing::warn!("Failed to add response topic property");
408 }
409 }
410 if let Some(val) = props.correlation_data {
411 if properties
412 .add(
413 PropertyId::CorrelationData,
414 PropertyValue::BinaryData(val.into()),
415 )
416 .is_err()
417 {
418 tracing::warn!("Failed to add correlation data property");
419 }
420 }
421 for id in props.subscription_identifiers {
422 if properties
423 .add(
424 PropertyId::SubscriptionIdentifier,
425 PropertyValue::VariableByteInteger(id),
426 )
427 .is_err()
428 {
429 tracing::warn!("Failed to add subscription identifier property");
430 }
431 }
432 if let Some(val) = props.content_type {
433 if properties
434 .add(PropertyId::ContentType, PropertyValue::Utf8String(val))
435 .is_err()
436 {
437 tracing::warn!("Failed to add content type property");
438 }
439 }
440 for (key, value) in props.user_properties {
441 if properties
442 .add(
443 PropertyId::UserProperty,
444 PropertyValue::Utf8StringPair(key, value),
445 )
446 .is_err()
447 {
448 tracing::warn!("Failed to add user property");
449 }
450 }
451
452 properties
453 }
454}
455
456#[derive(Debug, Clone)]
457pub struct SubscribeOptions {
458 pub qos: QoS,
459 pub no_local: bool,
460 pub retain_as_published: bool,
461 pub retain_handling: RetainHandling,
462 pub subscription_identifier: Option<u32>,
463}
464
465impl Default for SubscribeOptions {
466 fn default() -> Self {
467 Self {
468 qos: QoS::AtMostOnce,
469 no_local: false,
470 retain_as_published: false,
471 retain_handling: RetainHandling::SendAtSubscribe,
472 subscription_identifier: None,
473 }
474 }
475}
476
477impl SubscribeOptions {
478 #[must_use]
479 pub fn with_subscription_identifier(mut self, id: u32) -> Self {
480 self.subscription_identifier = Some(id);
481 self
482 }
483}
484
485#[derive(Debug, Clone, Copy, PartialEq, Eq)]
486pub enum RetainHandling {
487 SendAtSubscribe = 0,
488 SendIfNew = 1,
489 DontSend = 2,
490}
491
492#[derive(Debug, Clone)]
493pub struct Message {
494 pub topic: String,
495 pub payload: Vec<u8>,
496 pub qos: QoS,
497 pub retain: bool,
498 pub properties: MessageProperties,
499}
500
501impl From<crate::packet::publish::PublishPacket> for Message {
502 fn from(packet: crate::packet::publish::PublishPacket) -> Self {
503 Self {
504 topic: packet.topic_name,
505 payload: packet.payload,
506 qos: packet.qos,
507 retain: packet.retain,
508 properties: MessageProperties::from(packet.properties),
509 }
510 }
511}
512
513#[derive(Debug, Clone, Default)]
514pub struct MessageProperties {
515 pub payload_format_indicator: Option<bool>,
516 pub message_expiry_interval: Option<u32>,
517 pub response_topic: Option<String>,
518 pub correlation_data: Option<Vec<u8>>,
519 pub user_properties: Vec<(String, String)>,
520 pub subscription_identifiers: Vec<u32>,
521 pub content_type: Option<String>,
522}
523
524impl From<crate::protocol::v5::properties::Properties> for MessageProperties {
525 fn from(props: crate::protocol::v5::properties::Properties) -> Self {
526 use crate::protocol::v5::properties::{PropertyId, PropertyValue};
527
528 let mut result = Self::default();
529
530 for (id, value) in props.iter() {
531 match (id, value) {
532 (PropertyId::PayloadFormatIndicator, PropertyValue::Byte(v)) => {
533 result.payload_format_indicator = Some(v != &0);
534 }
535 (PropertyId::MessageExpiryInterval, PropertyValue::FourByteInteger(v)) => {
536 result.message_expiry_interval = Some(*v);
537 }
538 (PropertyId::ResponseTopic, PropertyValue::Utf8String(v)) => {
539 result.response_topic = Some(v.clone());
540 }
541 (PropertyId::CorrelationData, PropertyValue::BinaryData(v)) => {
542 result.correlation_data = Some(v.to_vec());
543 }
544 (PropertyId::UserProperty, PropertyValue::Utf8StringPair(k, v)) => {
545 result.user_properties.push((k.clone(), v.clone()));
546 }
547 (PropertyId::SubscriptionIdentifier, PropertyValue::VariableByteInteger(v)) => {
548 result.subscription_identifiers.push(*v);
549 }
550 (PropertyId::ContentType, PropertyValue::Utf8String(v)) => {
551 result.content_type = Some(v.clone());
552 }
553 _ => {}
554 }
555 }
556
557 result
558 }
559}
560
561impl From<MessageProperties> for PublishProperties {
562 fn from(msg_props: MessageProperties) -> Self {
563 Self {
564 payload_format_indicator: msg_props.payload_format_indicator,
565 message_expiry_interval: msg_props.message_expiry_interval,
566 topic_alias: None,
567 response_topic: msg_props.response_topic,
568 correlation_data: msg_props.correlation_data,
569 user_properties: msg_props.user_properties,
570 subscription_identifiers: msg_props.subscription_identifiers,
571 content_type: msg_props.content_type,
572 }
573 }
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579
580 #[test]
581 fn test_qos_values() {
582 assert_eq!(QoS::AtMostOnce as u8, 0);
583 assert_eq!(QoS::AtLeastOnce as u8, 1);
584 assert_eq!(QoS::ExactlyOnce as u8, 2);
585 }
586
587 #[test]
588 fn test_qos_from_u8() {
589 assert_eq!(QoS::from(0), QoS::AtMostOnce);
590 assert_eq!(QoS::from(1), QoS::AtLeastOnce);
591 assert_eq!(QoS::from(2), QoS::ExactlyOnce);
592
593 assert_eq!(QoS::from(3), QoS::AtMostOnce);
594 assert_eq!(QoS::from(255), QoS::AtMostOnce);
595 }
596
597 #[test]
598 fn test_qos_into_u8() {
599 assert_eq!(u8::from(QoS::AtMostOnce), 0);
600 assert_eq!(u8::from(QoS::AtLeastOnce), 1);
601 assert_eq!(u8::from(QoS::ExactlyOnce), 2);
602 }
603}