1pub use crate::protocol::v5::reason_codes::ReasonCode;
2use crate::time::Duration;
3
4#[derive(Debug, Clone)]
5pub struct ConnectOptions {
6 pub client_id: String,
7 pub keep_alive: Duration,
8 pub clean_start: bool,
9 pub username: Option<String>,
10 pub password: Option<Vec<u8>>,
11 pub will: Option<WillMessage>,
12 pub properties: ConnectProperties,
13}
14
15impl Default for ConnectOptions {
16 fn default() -> Self {
17 Self {
18 client_id: String::new(),
19 keep_alive: Duration::from_secs(60),
20 clean_start: true,
21 username: None,
22 password: None,
23 will: None,
24 properties: ConnectProperties::default(),
25 }
26 }
27}
28
29impl ConnectOptions {
30 #[must_use]
31 pub fn new(client_id: impl Into<String>) -> Self {
32 Self {
33 client_id: client_id.into(),
34 keep_alive: Duration::from_secs(60),
35 clean_start: true,
36 username: None,
37 password: None,
38 will: None,
39 properties: ConnectProperties::default(),
40 }
41 }
42
43 #[must_use]
44 pub fn with_keep_alive(mut self, duration: Duration) -> Self {
45 self.keep_alive = duration;
46 self
47 }
48
49 #[must_use]
50 pub fn with_clean_start(mut self, clean: bool) -> Self {
51 self.clean_start = clean;
52 self
53 }
54
55 #[must_use]
56 pub fn with_credentials(
57 mut self,
58 username: impl Into<String>,
59 password: impl Into<Vec<u8>>,
60 ) -> Self {
61 self.username = Some(username.into());
62 self.password = Some(password.into());
63 self
64 }
65
66 #[must_use]
67 pub fn with_will(mut self, will: WillMessage) -> Self {
68 self.will = Some(will);
69 self
70 }
71
72 #[must_use]
73 pub fn with_session_expiry_interval(mut self, interval: u32) -> Self {
74 self.properties.session_expiry_interval = Some(interval);
75 self
76 }
77
78 #[must_use]
79 pub fn with_receive_maximum(mut self, receive_maximum: u16) -> Self {
80 self.properties.receive_maximum = Some(receive_maximum);
81 self
82 }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
86pub enum QoS {
87 AtMostOnce = 0,
88 AtLeastOnce = 1,
89 ExactlyOnce = 2,
90}
91
92impl From<u8> for QoS {
93 fn from(value: u8) -> Self {
94 match value {
95 1 => QoS::AtLeastOnce,
96 2 => QoS::ExactlyOnce,
97 _ => QoS::AtMostOnce,
98 }
99 }
100}
101
102impl From<QoS> for u8 {
103 fn from(qos: QoS) -> Self {
104 qos as u8
105 }
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub enum PublishResult {
110 QoS0,
111 QoS1Or2 { packet_id: u16 },
112}
113
114impl PublishResult {
115 #[must_use]
116 pub fn packet_id(&self) -> Option<u16> {
117 match self {
118 Self::QoS0 => None,
119 Self::QoS1Or2 { packet_id } => Some(*packet_id),
120 }
121 }
122}
123
124#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125pub struct ConnectResult {
126 pub session_present: bool,
127}
128
129#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
130pub struct WillMessage {
131 pub topic: String,
132 pub payload: Vec<u8>,
133 pub qos: QoS,
134 pub retain: bool,
135 pub properties: WillProperties,
136}
137
138impl WillMessage {
139 #[must_use]
140 pub fn new(topic: impl Into<String>, payload: impl Into<Vec<u8>>) -> Self {
141 Self {
142 topic: topic.into(),
143 payload: payload.into(),
144 qos: QoS::AtMostOnce,
145 retain: false,
146 properties: WillProperties::default(),
147 }
148 }
149
150 #[must_use]
151 pub fn with_qos(mut self, qos: QoS) -> Self {
152 self.qos = qos;
153 self
154 }
155
156 #[must_use]
157 pub fn with_retain(mut self, retain: bool) -> Self {
158 self.retain = retain;
159 self
160 }
161}
162
163#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
164pub struct WillProperties {
165 pub will_delay_interval: Option<u32>,
166 pub payload_format_indicator: Option<bool>,
167 pub message_expiry_interval: Option<u32>,
168 pub content_type: Option<String>,
169 pub response_topic: Option<String>,
170 pub correlation_data: Option<Vec<u8>>,
171 pub user_properties: Vec<(String, String)>,
172}
173
174impl From<WillProperties> for crate::protocol::v5::properties::Properties {
175 fn from(will_props: WillProperties) -> Self {
176 let mut properties = crate::protocol::v5::properties::Properties::default();
177
178 if let Some(delay) = will_props.will_delay_interval {
179 if properties
180 .add(
181 crate::protocol::v5::properties::PropertyId::WillDelayInterval,
182 crate::protocol::v5::properties::PropertyValue::FourByteInteger(delay),
183 )
184 .is_err()
185 {
186 tracing::warn!("Failed to add will delay interval property");
187 }
188 }
189
190 if let Some(format) = will_props.payload_format_indicator {
191 if properties
192 .add(
193 crate::protocol::v5::properties::PropertyId::PayloadFormatIndicator,
194 crate::protocol::v5::properties::PropertyValue::Byte(u8::from(format)),
195 )
196 .is_err()
197 {
198 tracing::warn!("Failed to add payload format indicator property");
199 }
200 }
201
202 if let Some(expiry) = will_props.message_expiry_interval {
203 if properties
204 .add(
205 crate::protocol::v5::properties::PropertyId::MessageExpiryInterval,
206 crate::protocol::v5::properties::PropertyValue::FourByteInteger(expiry),
207 )
208 .is_err()
209 {
210 tracing::warn!("Failed to add message expiry interval property");
211 }
212 }
213
214 if let Some(content_type) = will_props.content_type {
215 if properties
216 .add(
217 crate::protocol::v5::properties::PropertyId::ContentType,
218 crate::protocol::v5::properties::PropertyValue::Utf8String(content_type),
219 )
220 .is_err()
221 {
222 tracing::warn!("Failed to add content type property");
223 }
224 }
225
226 if let Some(response_topic) = will_props.response_topic {
227 if properties
228 .add(
229 crate::protocol::v5::properties::PropertyId::ResponseTopic,
230 crate::protocol::v5::properties::PropertyValue::Utf8String(response_topic),
231 )
232 .is_err()
233 {
234 tracing::warn!("Failed to add response topic property");
235 }
236 }
237
238 if let Some(correlation_data) = will_props.correlation_data {
239 if properties
240 .add(
241 crate::protocol::v5::properties::PropertyId::CorrelationData,
242 crate::protocol::v5::properties::PropertyValue::BinaryData(
243 correlation_data.into(),
244 ),
245 )
246 .is_err()
247 {
248 tracing::warn!("Failed to add correlation data property");
249 }
250 }
251
252 for (key, value) in will_props.user_properties {
253 if properties
254 .add(
255 crate::protocol::v5::properties::PropertyId::UserProperty,
256 crate::protocol::v5::properties::PropertyValue::Utf8StringPair(key, value),
257 )
258 .is_err()
259 {
260 tracing::warn!("Failed to add user property");
261 }
262 }
263
264 properties
265 }
266}
267
268#[derive(Debug, Clone, Default)]
269pub struct ConnectProperties {
270 pub session_expiry_interval: Option<u32>,
271 pub receive_maximum: Option<u16>,
272 pub maximum_packet_size: Option<u32>,
273 pub topic_alias_maximum: Option<u16>,
274 pub request_response_information: Option<bool>,
275 pub request_problem_information: Option<bool>,
276 pub user_properties: Vec<(String, String)>,
277 pub authentication_method: Option<String>,
278 pub authentication_data: Option<Vec<u8>>,
279}
280
281#[derive(Debug, Clone)]
282pub struct PublishOptions {
283 pub qos: QoS,
284 pub retain: bool,
285 pub properties: PublishProperties,
286}
287
288impl Default for PublishOptions {
289 fn default() -> Self {
290 Self {
291 qos: QoS::AtMostOnce,
292 retain: false,
293 properties: PublishProperties::default(),
294 }
295 }
296}
297
298#[derive(Debug, Clone, Default)]
299pub struct PublishProperties {
300 pub payload_format_indicator: Option<bool>,
301 pub message_expiry_interval: Option<u32>,
302 pub topic_alias: Option<u16>,
303 pub response_topic: Option<String>,
304 pub correlation_data: Option<Vec<u8>>,
305 pub user_properties: Vec<(String, String)>,
306 pub subscription_identifiers: Vec<u32>,
307 pub content_type: Option<String>,
308}
309
310impl From<PublishProperties> for crate::protocol::v5::properties::Properties {
311 fn from(props: PublishProperties) -> Self {
312 use crate::protocol::v5::properties::{Properties, PropertyId, PropertyValue};
313
314 let mut properties = Properties::default();
315
316 if let Some(val) = props.payload_format_indicator {
317 if properties
318 .add(
319 PropertyId::PayloadFormatIndicator,
320 PropertyValue::Byte(u8::from(val)),
321 )
322 .is_err()
323 {
324 tracing::warn!("Failed to add payload format indicator property");
325 }
326 }
327 if let Some(val) = props.message_expiry_interval {
328 if properties
329 .add(
330 PropertyId::MessageExpiryInterval,
331 PropertyValue::FourByteInteger(val),
332 )
333 .is_err()
334 {
335 tracing::warn!("Failed to add message expiry interval property");
336 }
337 }
338 if let Some(val) = props.topic_alias {
339 if properties
340 .add(PropertyId::TopicAlias, PropertyValue::TwoByteInteger(val))
341 .is_err()
342 {
343 tracing::warn!("Failed to add topic alias property");
344 }
345 }
346 if let Some(val) = props.response_topic {
347 if properties
348 .add(PropertyId::ResponseTopic, PropertyValue::Utf8String(val))
349 .is_err()
350 {
351 tracing::warn!("Failed to add response topic property");
352 }
353 }
354 if let Some(val) = props.correlation_data {
355 if properties
356 .add(
357 PropertyId::CorrelationData,
358 PropertyValue::BinaryData(val.into()),
359 )
360 .is_err()
361 {
362 tracing::warn!("Failed to add correlation data property");
363 }
364 }
365 for id in props.subscription_identifiers {
366 if properties
367 .add(
368 PropertyId::SubscriptionIdentifier,
369 PropertyValue::VariableByteInteger(id),
370 )
371 .is_err()
372 {
373 tracing::warn!("Failed to add subscription identifier property");
374 }
375 }
376 if let Some(val) = props.content_type {
377 if properties
378 .add(PropertyId::ContentType, PropertyValue::Utf8String(val))
379 .is_err()
380 {
381 tracing::warn!("Failed to add content type property");
382 }
383 }
384 for (key, value) in props.user_properties {
385 if properties
386 .add(
387 PropertyId::UserProperty,
388 PropertyValue::Utf8StringPair(key, value),
389 )
390 .is_err()
391 {
392 tracing::warn!("Failed to add user property");
393 }
394 }
395
396 properties
397 }
398}
399
400#[derive(Debug, Clone)]
401pub struct SubscribeOptions {
402 pub qos: QoS,
403 pub no_local: bool,
404 pub retain_as_published: bool,
405 pub retain_handling: RetainHandling,
406 pub subscription_identifier: Option<u32>,
407}
408
409impl Default for SubscribeOptions {
410 fn default() -> Self {
411 Self {
412 qos: QoS::AtMostOnce,
413 no_local: false,
414 retain_as_published: false,
415 retain_handling: RetainHandling::SendAtSubscribe,
416 subscription_identifier: None,
417 }
418 }
419}
420
421impl SubscribeOptions {
422 #[must_use]
423 pub fn with_subscription_identifier(mut self, id: u32) -> Self {
424 self.subscription_identifier = Some(id);
425 self
426 }
427}
428
429#[derive(Debug, Clone, Copy, PartialEq, Eq)]
430pub enum RetainHandling {
431 SendAtSubscribe = 0,
432 SendIfNew = 1,
433 DontSend = 2,
434}
435
436#[derive(Debug, Clone)]
437pub struct Message {
438 pub topic: String,
439 pub payload: Vec<u8>,
440 pub qos: QoS,
441 pub retain: bool,
442 pub properties: MessageProperties,
443}
444
445impl From<crate::packet::publish::PublishPacket> for Message {
446 fn from(packet: crate::packet::publish::PublishPacket) -> Self {
447 Self {
448 topic: packet.topic_name,
449 payload: packet.payload,
450 qos: packet.qos,
451 retain: packet.retain,
452 properties: MessageProperties::from(packet.properties),
453 }
454 }
455}
456
457#[derive(Debug, Clone, Default)]
458pub struct MessageProperties {
459 pub payload_format_indicator: Option<bool>,
460 pub message_expiry_interval: Option<u32>,
461 pub response_topic: Option<String>,
462 pub correlation_data: Option<Vec<u8>>,
463 pub user_properties: Vec<(String, String)>,
464 pub subscription_identifiers: Vec<u32>,
465 pub content_type: Option<String>,
466}
467
468impl From<crate::protocol::v5::properties::Properties> for MessageProperties {
469 fn from(props: crate::protocol::v5::properties::Properties) -> Self {
470 use crate::protocol::v5::properties::{PropertyId, PropertyValue};
471
472 let mut result = Self::default();
473
474 for (id, value) in props.iter() {
475 match (id, value) {
476 (PropertyId::PayloadFormatIndicator, PropertyValue::Byte(v)) => {
477 result.payload_format_indicator = Some(v != &0);
478 }
479 (PropertyId::MessageExpiryInterval, PropertyValue::FourByteInteger(v)) => {
480 result.message_expiry_interval = Some(*v);
481 }
482 (PropertyId::ResponseTopic, PropertyValue::Utf8String(v)) => {
483 result.response_topic = Some(v.clone());
484 }
485 (PropertyId::CorrelationData, PropertyValue::BinaryData(v)) => {
486 result.correlation_data = Some(v.to_vec());
487 }
488 (PropertyId::UserProperty, PropertyValue::Utf8StringPair(k, v)) => {
489 result.user_properties.push((k.clone(), v.clone()));
490 }
491 (PropertyId::SubscriptionIdentifier, PropertyValue::VariableByteInteger(v)) => {
492 result.subscription_identifiers.push(*v);
493 }
494 (PropertyId::ContentType, PropertyValue::Utf8String(v)) => {
495 result.content_type = Some(v.clone());
496 }
497 _ => {}
498 }
499 }
500
501 result
502 }
503}
504
505impl From<MessageProperties> for PublishProperties {
506 fn from(msg_props: MessageProperties) -> Self {
507 Self {
508 payload_format_indicator: msg_props.payload_format_indicator,
509 message_expiry_interval: msg_props.message_expiry_interval,
510 topic_alias: None,
511 response_topic: msg_props.response_topic,
512 correlation_data: msg_props.correlation_data,
513 user_properties: msg_props.user_properties,
514 subscription_identifiers: msg_props.subscription_identifiers,
515 content_type: msg_props.content_type,
516 }
517 }
518}
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523
524 #[test]
525 fn test_qos_values() {
526 assert_eq!(QoS::AtMostOnce as u8, 0);
527 assert_eq!(QoS::AtLeastOnce as u8, 1);
528 assert_eq!(QoS::ExactlyOnce as u8, 2);
529 }
530
531 #[test]
532 fn test_qos_from_u8() {
533 assert_eq!(QoS::from(0), QoS::AtMostOnce);
534 assert_eq!(QoS::from(1), QoS::AtLeastOnce);
535 assert_eq!(QoS::from(2), QoS::ExactlyOnce);
536
537 assert_eq!(QoS::from(3), QoS::AtMostOnce);
538 assert_eq!(QoS::from(255), QoS::AtMostOnce);
539 }
540
541 #[test]
542 fn test_qos_into_u8() {
543 assert_eq!(u8::from(QoS::AtMostOnce), 0);
544 assert_eq!(u8::from(QoS::AtLeastOnce), 1);
545 assert_eq!(u8::from(QoS::ExactlyOnce), 2);
546 }
547}