1pub use crate::protocol::v5::reason_codes::ReasonCode;
2use crate::time::Duration;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
5pub enum ProtocolVersion {
6 V311,
7 #[default]
8 V5,
9}
10
11impl ProtocolVersion {
12 #[must_use]
13 pub fn as_u8(self) -> u8 {
14 match self {
15 ProtocolVersion::V311 => 4,
16 ProtocolVersion::V5 => 5,
17 }
18 }
19}
20
21impl From<ProtocolVersion> for u8 {
22 fn from(version: ProtocolVersion) -> Self {
23 version.as_u8()
24 }
25}
26
27impl TryFrom<u8> for ProtocolVersion {
28 type Error = ();
29
30 fn try_from(value: u8) -> Result<Self, Self::Error> {
31 match value {
32 4 => Ok(ProtocolVersion::V311),
33 5 => Ok(ProtocolVersion::V5),
34 _ => Err(()),
35 }
36 }
37}
38
39#[derive(Debug, Clone)]
40pub struct ConnectOptions {
41 pub client_id: String,
42 pub keep_alive: Duration,
43 pub clean_start: bool,
44 pub username: Option<String>,
45 pub password: Option<Vec<u8>>,
46 pub will: Option<WillMessage>,
47 pub properties: ConnectProperties,
48 pub protocol_version: ProtocolVersion,
49}
50
51impl Default for ConnectOptions {
52 fn default() -> Self {
53 Self {
54 client_id: String::new(),
55 keep_alive: Duration::from_secs(60),
56 clean_start: true,
57 username: None,
58 password: None,
59 will: None,
60 properties: ConnectProperties::default(),
61 protocol_version: ProtocolVersion::V5,
62 }
63 }
64}
65
66impl ConnectOptions {
67 #[must_use]
68 pub fn new(client_id: impl Into<String>) -> Self {
69 Self {
70 client_id: client_id.into(),
71 keep_alive: Duration::from_secs(60),
72 clean_start: true,
73 username: None,
74 password: None,
75 will: None,
76 properties: ConnectProperties::default(),
77 protocol_version: ProtocolVersion::V5,
78 }
79 }
80
81 #[must_use]
82 pub fn with_protocol_version(mut self, version: ProtocolVersion) -> Self {
83 self.protocol_version = version;
84 self
85 }
86
87 #[must_use]
88 pub fn with_keep_alive(mut self, duration: Duration) -> Self {
89 self.keep_alive = duration;
90 self
91 }
92
93 #[must_use]
94 pub fn with_clean_start(mut self, clean: bool) -> Self {
95 self.clean_start = clean;
96 self
97 }
98
99 #[must_use]
100 pub fn with_credentials(
101 mut self,
102 username: impl Into<String>,
103 password: impl Into<Vec<u8>>,
104 ) -> Self {
105 self.username = Some(username.into());
106 self.password = Some(password.into());
107 self
108 }
109
110 #[must_use]
111 pub fn with_will(mut self, will: WillMessage) -> Self {
112 self.will = Some(will);
113 self
114 }
115
116 #[must_use]
117 pub fn with_session_expiry_interval(mut self, interval: u32) -> Self {
118 self.properties.session_expiry_interval = Some(interval);
119 self
120 }
121
122 #[must_use]
123 pub fn with_receive_maximum(mut self, receive_maximum: u16) -> Self {
124 self.properties.receive_maximum = Some(receive_maximum);
125 self
126 }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
130pub enum QoS {
131 AtMostOnce = 0,
132 AtLeastOnce = 1,
133 ExactlyOnce = 2,
134}
135
136impl From<u8> for QoS {
137 fn from(value: u8) -> Self {
138 match value {
139 1 => QoS::AtLeastOnce,
140 2 => QoS::ExactlyOnce,
141 _ => QoS::AtMostOnce,
142 }
143 }
144}
145
146impl From<QoS> for u8 {
147 fn from(qos: QoS) -> Self {
148 qos as u8
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
153pub enum PublishResult {
154 QoS0,
155 QoS1Or2 { packet_id: u16 },
156}
157
158impl PublishResult {
159 #[must_use]
160 pub fn packet_id(&self) -> Option<u16> {
161 match self {
162 Self::QoS0 => None,
163 Self::QoS1Or2 { packet_id } => Some(*packet_id),
164 }
165 }
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq)]
169pub struct ConnectResult {
170 pub session_present: bool,
171}
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
174pub struct WillMessage {
175 pub topic: String,
176 pub payload: Vec<u8>,
177 pub qos: QoS,
178 pub retain: bool,
179 pub properties: WillProperties,
180}
181
182impl WillMessage {
183 #[must_use]
184 pub fn new(topic: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
185 Self {
186 topic: topic.into(),
187 payload: payload.into(),
188 qos: QoS::AtMostOnce,
189 retain: false,
190 properties: WillProperties::default(),
191 }
192 }
193
194 #[must_use]
195 pub fn with_qos(mut self, qos: QoS) -> Self {
196 self.qos = qos;
197 self
198 }
199
200 #[must_use]
201 pub fn with_retain(mut self, retain: bool) -> Self {
202 self.retain = retain;
203 self
204 }
205}
206
207#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
208pub struct WillProperties {
209 pub will_delay_interval: Option<u32>,
210 pub payload_format_indicator: Option<bool>,
211 pub message_expiry_interval: Option<u32>,
212 pub content_type: Option<String>,
213 pub response_topic: Option<String>,
214 pub correlation_data: Option<Vec<u8>>,
215 pub user_properties: Vec<(String, String)>,
216}
217
218impl From<WillProperties> for crate::protocol::v5::properties::Properties {
219 fn from(will_props: WillProperties) -> Self {
220 let mut properties = crate::protocol::v5::properties::Properties::default();
221
222 if let Some(delay) = will_props.will_delay_interval {
223 if properties
224 .add(
225 crate::protocol::v5::properties::PropertyId::WillDelayInterval,
226 crate::protocol::v5::properties::PropertyValue::FourByteInteger(delay),
227 )
228 .is_err()
229 {
230 tracing::warn!("Failed to add will delay interval property");
231 }
232 }
233
234 if let Some(format) = will_props.payload_format_indicator {
235 if properties
236 .add(
237 crate::protocol::v5::properties::PropertyId::PayloadFormatIndicator,
238 crate::protocol::v5::properties::PropertyValue::Byte(u8::from(format)),
239 )
240 .is_err()
241 {
242 tracing::warn!("Failed to add payload format indicator property");
243 }
244 }
245
246 if let Some(expiry) = will_props.message_expiry_interval {
247 if properties
248 .add(
249 crate::protocol::v5::properties::PropertyId::MessageExpiryInterval,
250 crate::protocol::v5::properties::PropertyValue::FourByteInteger(expiry),
251 )
252 .is_err()
253 {
254 tracing::warn!("Failed to add message expiry interval property");
255 }
256 }
257
258 if let Some(content_type) = will_props.content_type {
259 if properties
260 .add(
261 crate::protocol::v5::properties::PropertyId::ContentType,
262 crate::protocol::v5::properties::PropertyValue::Utf8String(content_type),
263 )
264 .is_err()
265 {
266 tracing::warn!("Failed to add content type property");
267 }
268 }
269
270 if let Some(response_topic) = will_props.response_topic {
271 if properties
272 .add(
273 crate::protocol::v5::properties::PropertyId::ResponseTopic,
274 crate::protocol::v5::properties::PropertyValue::Utf8String(response_topic),
275 )
276 .is_err()
277 {
278 tracing::warn!("Failed to add response topic property");
279 }
280 }
281
282 if let Some(correlation_data) = will_props.correlation_data {
283 if properties
284 .add(
285 crate::protocol::v5::properties::PropertyId::CorrelationData,
286 crate::protocol::v5::properties::PropertyValue::BinaryData(
287 correlation_data.into(),
288 ),
289 )
290 .is_err()
291 {
292 tracing::warn!("Failed to add correlation data property");
293 }
294 }
295
296 for (key, value) in will_props.user_properties {
297 if properties
298 .add(
299 crate::protocol::v5::properties::PropertyId::UserProperty,
300 crate::protocol::v5::properties::PropertyValue::Utf8StringPair(key, value),
301 )
302 .is_err()
303 {
304 tracing::warn!("Failed to add user property");
305 }
306 }
307
308 properties
309 }
310}
311
312#[derive(Debug, Clone, Default)]
313pub struct ConnectProperties {
314 pub session_expiry_interval: Option<u32>,
315 pub receive_maximum: Option<u16>,
316 pub maximum_packet_size: Option<u32>,
317 pub topic_alias_maximum: Option<u16>,
318 pub request_response_information: Option<bool>,
319 pub request_problem_information: Option<bool>,
320 pub user_properties: Vec<(String, String)>,
321 pub authentication_method: Option<String>,
322 pub authentication_data: Option<Vec<u8>>,
323}
324
325#[derive(Debug, Clone)]
326pub struct PublishOptions {
327 pub qos: QoS,
328 pub retain: bool,
329 pub properties: PublishProperties,
330}
331
332impl Default for PublishOptions {
333 fn default() -> Self {
334 Self {
335 qos: QoS::AtMostOnce,
336 retain: false,
337 properties: PublishProperties::default(),
338 }
339 }
340}
341
342#[derive(Debug, Clone, Default)]
343pub struct PublishProperties {
344 pub payload_format_indicator: Option<bool>,
345 pub message_expiry_interval: Option<u32>,
346 pub topic_alias: Option<u16>,
347 pub response_topic: Option<String>,
348 pub correlation_data: Option<Vec<u8>>,
349 pub user_properties: Vec<(String, String)>,
350 pub subscription_identifiers: Vec<u32>,
351 pub content_type: Option<String>,
352}
353
354impl From<PublishProperties> for crate::protocol::v5::properties::Properties {
355 fn from(props: PublishProperties) -> Self {
356 use crate::protocol::v5::properties::{Properties, PropertyId, PropertyValue};
357
358 let mut properties = Properties::default();
359
360 if let Some(val) = props.payload_format_indicator {
361 if properties
362 .add(
363 PropertyId::PayloadFormatIndicator,
364 PropertyValue::Byte(u8::from(val)),
365 )
366 .is_err()
367 {
368 tracing::warn!("Failed to add payload format indicator property");
369 }
370 }
371 if let Some(val) = props.message_expiry_interval {
372 if properties
373 .add(
374 PropertyId::MessageExpiryInterval,
375 PropertyValue::FourByteInteger(val),
376 )
377 .is_err()
378 {
379 tracing::warn!("Failed to add message expiry interval property");
380 }
381 }
382 if let Some(val) = props.topic_alias {
383 if properties
384 .add(PropertyId::TopicAlias, PropertyValue::TwoByteInteger(val))
385 .is_err()
386 {
387 tracing::warn!("Failed to add topic alias property");
388 }
389 }
390 if let Some(val) = props.response_topic {
391 if properties
392 .add(PropertyId::ResponseTopic, PropertyValue::Utf8String(val))
393 .is_err()
394 {
395 tracing::warn!("Failed to add response topic property");
396 }
397 }
398 if let Some(val) = props.correlation_data {
399 if properties
400 .add(
401 PropertyId::CorrelationData,
402 PropertyValue::BinaryData(val.into()),
403 )
404 .is_err()
405 {
406 tracing::warn!("Failed to add correlation data property");
407 }
408 }
409 for id in props.subscription_identifiers {
410 if properties
411 .add(
412 PropertyId::SubscriptionIdentifier,
413 PropertyValue::VariableByteInteger(id),
414 )
415 .is_err()
416 {
417 tracing::warn!("Failed to add subscription identifier property");
418 }
419 }
420 if let Some(val) = props.content_type {
421 if properties
422 .add(PropertyId::ContentType, PropertyValue::Utf8String(val))
423 .is_err()
424 {
425 tracing::warn!("Failed to add content type property");
426 }
427 }
428 for (key, value) in props.user_properties {
429 if properties
430 .add(
431 PropertyId::UserProperty,
432 PropertyValue::Utf8StringPair(key, value),
433 )
434 .is_err()
435 {
436 tracing::warn!("Failed to add user property");
437 }
438 }
439
440 properties
441 }
442}
443
444#[derive(Debug, Clone)]
445pub struct SubscribeOptions {
446 pub qos: QoS,
447 pub no_local: bool,
448 pub retain_as_published: bool,
449 pub retain_handling: RetainHandling,
450 pub subscription_identifier: Option<u32>,
451}
452
453impl Default for SubscribeOptions {
454 fn default() -> Self {
455 Self {
456 qos: QoS::AtMostOnce,
457 no_local: false,
458 retain_as_published: false,
459 retain_handling: RetainHandling::SendAtSubscribe,
460 subscription_identifier: None,
461 }
462 }
463}
464
465impl SubscribeOptions {
466 #[must_use]
467 pub fn with_subscription_identifier(mut self, id: u32) -> Self {
468 self.subscription_identifier = Some(id);
469 self
470 }
471}
472
473#[derive(Debug, Clone, Copy, PartialEq, Eq)]
474pub enum RetainHandling {
475 SendAtSubscribe = 0,
476 SendIfNew = 1,
477 DontSend = 2,
478}
479
480#[derive(Debug, Clone)]
481pub struct Message {
482 pub topic: String,
483 pub payload: Vec<u8>,
484 pub qos: QoS,
485 pub retain: bool,
486 pub properties: MessageProperties,
487}
488
489impl From<crate::packet::publish::PublishPacket> for Message {
490 fn from(packet: crate::packet::publish::PublishPacket) -> Self {
491 Self {
492 topic: packet.topic_name,
493 payload: packet.payload,
494 qos: packet.qos,
495 retain: packet.retain,
496 properties: MessageProperties::from(packet.properties),
497 }
498 }
499}
500
501#[derive(Debug, Clone, Default)]
502pub struct MessageProperties {
503 pub payload_format_indicator: Option<bool>,
504 pub message_expiry_interval: Option<u32>,
505 pub response_topic: Option<String>,
506 pub correlation_data: Option<Vec<u8>>,
507 pub user_properties: Vec<(String, String)>,
508 pub subscription_identifiers: Vec<u32>,
509 pub content_type: Option<String>,
510}
511
512impl From<crate::protocol::v5::properties::Properties> for MessageProperties {
513 fn from(props: crate::protocol::v5::properties::Properties) -> Self {
514 use crate::protocol::v5::properties::{PropertyId, PropertyValue};
515
516 let mut result = Self::default();
517
518 for (id, value) in props.iter() {
519 match (id, value) {
520 (PropertyId::PayloadFormatIndicator, PropertyValue::Byte(v)) => {
521 result.payload_format_indicator = Some(v != &0);
522 }
523 (PropertyId::MessageExpiryInterval, PropertyValue::FourByteInteger(v)) => {
524 result.message_expiry_interval = Some(*v);
525 }
526 (PropertyId::ResponseTopic, PropertyValue::Utf8String(v)) => {
527 result.response_topic = Some(v.clone());
528 }
529 (PropertyId::CorrelationData, PropertyValue::BinaryData(v)) => {
530 result.correlation_data = Some(v.to_vec());
531 }
532 (PropertyId::UserProperty, PropertyValue::Utf8StringPair(k, v)) => {
533 result.user_properties.push((k.clone(), v.clone()));
534 }
535 (PropertyId::SubscriptionIdentifier, PropertyValue::VariableByteInteger(v)) => {
536 result.subscription_identifiers.push(*v);
537 }
538 (PropertyId::ContentType, PropertyValue::Utf8String(v)) => {
539 result.content_type = Some(v.clone());
540 }
541 _ => {}
542 }
543 }
544
545 result
546 }
547}
548
549impl From<MessageProperties> for PublishProperties {
550 fn from(msg_props: MessageProperties) -> Self {
551 Self {
552 payload_format_indicator: msg_props.payload_format_indicator,
553 message_expiry_interval: msg_props.message_expiry_interval,
554 topic_alias: None,
555 response_topic: msg_props.response_topic,
556 correlation_data: msg_props.correlation_data,
557 user_properties: msg_props.user_properties,
558 subscription_identifiers: msg_props.subscription_identifiers,
559 content_type: msg_props.content_type,
560 }
561 }
562}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567
568 #[test]
569 fn test_qos_values() {
570 assert_eq!(QoS::AtMostOnce as u8, 0);
571 assert_eq!(QoS::AtLeastOnce as u8, 1);
572 assert_eq!(QoS::ExactlyOnce as u8, 2);
573 }
574
575 #[test]
576 fn test_qos_from_u8() {
577 assert_eq!(QoS::from(0), QoS::AtMostOnce);
578 assert_eq!(QoS::from(1), QoS::AtLeastOnce);
579 assert_eq!(QoS::from(2), QoS::ExactlyOnce);
580
581 assert_eq!(QoS::from(3), QoS::AtMostOnce);
582 assert_eq!(QoS::from(255), QoS::AtMostOnce);
583 }
584
585 #[test]
586 fn test_qos_into_u8() {
587 assert_eq!(u8::from(QoS::AtMostOnce), 0);
588 assert_eq!(u8::from(QoS::AtLeastOnce), 1);
589 assert_eq!(u8::from(QoS::ExactlyOnce), 2);
590 }
591}