1pub mod builder;
34
35pub use builder::MqttBuilder;
36
37use crate::layer::field::{FieldError, FieldValue};
38use crate::layer::{Layer, LayerIndex, LayerKind};
39
/// Smallest fixed-header size in bytes: one type/flags byte followed by at
/// least one remaining-length byte. Also used as the fallback value when the
/// remaining-length field cannot be decoded.
pub const MQTT_MIN_HEADER_LEN: usize = 2;
42
/// Port number for MQTT traffic (1883 is the IANA-registered MQTT port).
pub const MQTT_PORT: u16 = 1883;
45
// MQTT control packet type codes. The code is carried in the upper four bits
// of the first fixed-header byte and is what `MqttLayer::msg_type` returns.

/// `CONNECT` packet type code.
pub const CONNECT: u8 = 1;
/// `CONNACK` packet type code.
pub const CONNACK: u8 = 2;
/// `PUBLISH` packet type code.
pub const PUBLISH: u8 = 3;
/// `PUBACK` packet type code.
pub const PUBACK: u8 = 4;
/// `PUBREC` packet type code.
pub const PUBREC: u8 = 5;
/// `PUBREL` packet type code.
pub const PUBREL: u8 = 6;
/// `PUBCOMP` packet type code.
pub const PUBCOMP: u8 = 7;
/// `SUBSCRIBE` packet type code.
pub const SUBSCRIBE: u8 = 8;
/// `SUBACK` packet type code.
pub const SUBACK: u8 = 9;
/// `UNSUBSCRIBE` packet type code.
pub const UNSUBSCRIBE: u8 = 10;
/// `UNSUBACK` packet type code.
pub const UNSUBACK: u8 = 11;
/// `PINGREQ` packet type code.
pub const PINGREQ: u8 = 12;
/// `PINGRESP` packet type code.
pub const PINGRESP: u8 = 13;
/// `DISCONNECT` packet type code.
pub const DISCONNECT: u8 = 14;
/// `AUTH` packet type code.
pub const AUTH: u8 = 15;
65
/// Names of every field exposed by the MQTT layer.
///
/// Each entry is a name that `MqttLayer::get_field` matches on; whether a
/// field is actually present depends on the packet's message type.
pub static MQTT_FIELD_NAMES: &[&str] = &[
    // Fixed header.
    "msg_type",
    "dup",
    "qos",
    "retain",
    "remaining_length",
    // PUBLISH and packet-identifier fields.
    "topic",
    "topic_len",
    "msgid",
    "value",
    // CONNECT fields.
    "proto_name",
    "proto_level",
    "connect_flags",
    "klive",
    "client_id",
    "usernameflag",
    "passwordflag",
    "willretainflag",
    "willQOSflag",
    "willflag",
    "cleansess",
    // CONNACK fields.
    "sess_present_flag",
    "retcode",
    // SUBACK field.
    "retcodes",
];
92
93pub fn decode_variable_length(buf: &[u8], offset: usize) -> Result<(u32, usize), FieldError> {
104 let mut value: u32 = 0;
105 let mut multiplier: u32 = 1;
106 let mut idx = offset;
107
108 loop {
109 if idx >= buf.len() {
110 return Err(FieldError::BufferTooShort {
111 offset: idx,
112 need: 1,
113 have: 0,
114 });
115 }
116 let encoded_byte = buf[idx];
117 value += u32::from(encoded_byte & 0x7F) * multiplier;
118
119 if multiplier > 128 * 128 * 128 {
120 return Err(FieldError::InvalidValue(
121 "variable-length integer exceeds 4 bytes".into(),
122 ));
123 }
124
125 idx += 1;
126 if encoded_byte & 0x80 == 0 {
127 break;
128 }
129 multiplier *= 128;
130 }
131
132 Ok((value, idx - offset))
133}
134
/// Encodes `value` as an MQTT variable-length integer.
///
/// The value is emitted in 7-bit groups, least-significant group first, with
/// the high bit set on every byte except the last. Zero encodes as a single
/// `0x00` byte. No upper bound is enforced: values above 268 435 455 produce
/// a fifth byte.
#[must_use]
pub fn encode_variable_length(value: u32) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(4);
    let mut remaining = value;
    loop {
        let group = (remaining & 0x7F) as u8;
        remaining >>= 7;
        if remaining == 0 {
            // Last group: continuation bit stays clear.
            bytes.push(group);
            return bytes;
        }
        bytes.push(group | 0x80);
    }
}
155
156#[must_use]
158pub fn message_type_name(msg_type: u8) -> &'static str {
159 match msg_type {
160 CONNECT => "CONNECT",
161 CONNACK => "CONNACK",
162 PUBLISH => "PUBLISH",
163 PUBACK => "PUBACK",
164 PUBREC => "PUBREC",
165 PUBREL => "PUBREL",
166 PUBCOMP => "PUBCOMP",
167 SUBSCRIBE => "SUBSCRIBE",
168 SUBACK => "SUBACK",
169 UNSUBSCRIBE => "UNSUBSCRIBE",
170 UNSUBACK => "UNSUBACK",
171 PINGREQ => "PINGREQ",
172 PINGRESP => "PINGRESP",
173 DISCONNECT => "DISCONNECT",
174 AUTH => "AUTH",
175 _ => "UNKNOWN",
176 }
177}
178
179#[must_use]
184pub fn is_mqtt_payload(buf: &[u8]) -> bool {
185 if buf.len() < 2 {
186 return false;
187 }
188 let msg_type = (buf[0] >> 4) & 0x0F;
189 if !(1..=15).contains(&msg_type) {
190 return false;
191 }
192 decode_variable_length(buf, 1).is_ok()
194}
195
/// Accessor for an MQTT packet located inside a larger byte buffer.
///
/// The struct stores only the layer's position; every accessor takes the
/// buffer as an argument and reads the fields on demand.
#[derive(Debug, Clone)]
pub struct MqttLayer {
    /// Position of this layer within the buffer passed to the accessors.
    pub index: LayerIndex,
}
205
206impl MqttLayer {
207 #[must_use]
209 pub fn new(index: LayerIndex) -> Self {
210 Self { index }
211 }
212
213 #[must_use]
215 pub fn at_start(len: usize) -> Self {
216 Self {
217 index: LayerIndex::new(LayerKind::Mqtt, 0, len),
218 }
219 }
220
    /// Returns the part of `buf` selected by this layer's index.
    ///
    /// NOTE(review): the exact bounds are decided by `LayerIndex::slice`,
    /// which is defined elsewhere.
    fn slice<'a>(&self, buf: &'a [u8]) -> &'a [u8] {
        self.index.slice(buf)
    }
225
226 pub fn msg_type(&self, buf: &[u8]) -> Result<u8, FieldError> {
232 let s = self.slice(buf);
233 if s.is_empty() {
234 return Err(FieldError::BufferTooShort {
235 offset: self.index.start,
236 need: 1,
237 have: 0,
238 });
239 }
240 Ok((s[0] >> 4) & 0x0F)
241 }
242
243 pub fn dup(&self, buf: &[u8]) -> Result<bool, FieldError> {
245 let s = self.slice(buf);
246 if s.is_empty() {
247 return Err(FieldError::BufferTooShort {
248 offset: self.index.start,
249 need: 1,
250 have: 0,
251 });
252 }
253 Ok((s[0] >> 3) & 0x01 == 1)
254 }
255
256 pub fn qos(&self, buf: &[u8]) -> Result<u8, FieldError> {
258 let s = self.slice(buf);
259 if s.is_empty() {
260 return Err(FieldError::BufferTooShort {
261 offset: self.index.start,
262 need: 1,
263 have: 0,
264 });
265 }
266 Ok((s[0] >> 1) & 0x03)
267 }
268
269 pub fn retain(&self, buf: &[u8]) -> Result<bool, FieldError> {
271 let s = self.slice(buf);
272 if s.is_empty() {
273 return Err(FieldError::BufferTooShort {
274 offset: self.index.start,
275 need: 1,
276 have: 0,
277 });
278 }
279 Ok(s[0] & 0x01 == 1)
280 }
281
282 pub fn remaining_length(&self, buf: &[u8]) -> Result<u32, FieldError> {
284 let s = self.slice(buf);
285 if s.len() < 2 {
286 return Err(FieldError::BufferTooShort {
287 offset: self.index.start + 1,
288 need: 1,
289 have: s.len().saturating_sub(1),
290 });
291 }
292 let (val, _consumed) = decode_variable_length(s, 1)?;
293 Ok(val)
294 }
295
296 #[must_use]
298 pub fn fixed_header_len(&self, buf: &[u8]) -> usize {
299 let s = self.slice(buf);
300 if s.len() < 2 {
301 return MQTT_MIN_HEADER_LEN;
302 }
303 match decode_variable_length(s, 1) {
304 Ok((_val, consumed)) => 1 + consumed,
305 Err(_) => MQTT_MIN_HEADER_LEN,
306 }
307 }
308
309 fn var_header_offset(&self, buf: &[u8]) -> usize {
311 self.index.start + self.fixed_header_len(buf)
312 }
313
314 pub fn topic_len(&self, buf: &[u8]) -> Result<u16, FieldError> {
320 let off = self.var_header_offset(buf);
321 if off + 2 > buf.len() {
322 return Err(FieldError::BufferTooShort {
323 offset: off,
324 need: 2,
325 have: buf.len().saturating_sub(off),
326 });
327 }
328 Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
329 }
330
331 pub fn topic(&self, buf: &[u8]) -> Result<String, FieldError> {
333 let off = self.var_header_offset(buf);
334 if off + 2 > buf.len() {
335 return Err(FieldError::BufferTooShort {
336 offset: off,
337 need: 2,
338 have: buf.len().saturating_sub(off),
339 });
340 }
341 let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
342 let topic_start = off + 2;
343 if topic_start + tlen > buf.len() {
344 return Err(FieldError::BufferTooShort {
345 offset: topic_start,
346 need: tlen,
347 have: buf.len().saturating_sub(topic_start),
348 });
349 }
350 String::from_utf8(buf[topic_start..topic_start + tlen].to_vec())
351 .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 topic: {e}")))
352 }
353
354 pub fn msgid(&self, buf: &[u8]) -> Result<u16, FieldError> {
360 let mt = self.msg_type(buf)?;
361 let off = self.var_header_offset(buf);
362
363 match mt {
364 PUBLISH => {
365 if off + 2 > buf.len() {
367 return Err(FieldError::BufferTooShort {
368 offset: off,
369 need: 2,
370 have: buf.len().saturating_sub(off),
371 });
372 }
373 let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
374 let msgid_off = off + 2 + tlen;
375 if msgid_off + 2 > buf.len() {
376 return Err(FieldError::BufferTooShort {
377 offset: msgid_off,
378 need: 2,
379 have: buf.len().saturating_sub(msgid_off),
380 });
381 }
382 Ok(u16::from_be_bytes([buf[msgid_off], buf[msgid_off + 1]]))
383 },
384 PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE | UNSUBACK => {
385 if off + 2 > buf.len() {
386 return Err(FieldError::BufferTooShort {
387 offset: off,
388 need: 2,
389 have: buf.len().saturating_sub(off),
390 });
391 }
392 Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
393 },
394 _ => Err(FieldError::InvalidValue(format!(
395 "message type {mt} does not have a msgid field"
396 ))),
397 }
398 }
399
400 pub fn value(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
402 let off = self.var_header_offset(buf);
403 let rem_len = self.remaining_length(buf)? as usize;
404 let fixed_hdr = self.fixed_header_len(buf);
405 let payload_end = self.index.start + fixed_hdr + rem_len;
406 let payload_end = payload_end.min(buf.len());
407
408 if off + 2 > buf.len() {
409 return Err(FieldError::BufferTooShort {
410 offset: off,
411 need: 2,
412 have: buf.len().saturating_sub(off),
413 });
414 }
415 let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
416 let mut value_start = off + 2 + tlen;
417
418 let qos = self.qos(buf)?;
420 if qos > 0 {
421 value_start += 2;
422 }
423
424 if value_start > payload_end {
425 return Ok(Vec::new());
426 }
427 Ok(buf[value_start..payload_end].to_vec())
428 }
429
430 pub fn proto_name(&self, buf: &[u8]) -> Result<String, FieldError> {
436 let off = self.var_header_offset(buf);
437 if off + 2 > buf.len() {
438 return Err(FieldError::BufferTooShort {
439 offset: off,
440 need: 2,
441 have: buf.len().saturating_sub(off),
442 });
443 }
444 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
445 let name_start = off + 2;
446 if name_start + name_len > buf.len() {
447 return Err(FieldError::BufferTooShort {
448 offset: name_start,
449 need: name_len,
450 have: buf.len().saturating_sub(name_start),
451 });
452 }
453 String::from_utf8(buf[name_start..name_start + name_len].to_vec())
454 .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 proto_name: {e}")))
455 }
456
457 pub fn proto_level(&self, buf: &[u8]) -> Result<u8, FieldError> {
459 let off = self.var_header_offset(buf);
460 if off + 2 > buf.len() {
461 return Err(FieldError::BufferTooShort {
462 offset: off,
463 need: 2,
464 have: buf.len().saturating_sub(off),
465 });
466 }
467 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
468 let level_off = off + 2 + name_len;
469 if level_off >= buf.len() {
470 return Err(FieldError::BufferTooShort {
471 offset: level_off,
472 need: 1,
473 have: 0,
474 });
475 }
476 Ok(buf[level_off])
477 }
478
479 pub fn connect_flags(&self, buf: &[u8]) -> Result<u8, FieldError> {
481 let off = self.var_header_offset(buf);
482 if off + 2 > buf.len() {
483 return Err(FieldError::BufferTooShort {
484 offset: off,
485 need: 2,
486 have: buf.len().saturating_sub(off),
487 });
488 }
489 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
490 let flags_off = off + 2 + name_len + 1;
491 if flags_off >= buf.len() {
492 return Err(FieldError::BufferTooShort {
493 offset: flags_off,
494 need: 1,
495 have: 0,
496 });
497 }
498 Ok(buf[flags_off])
499 }
500
501 pub fn usernameflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
503 let flags = self.connect_flags(buf)?;
504 Ok((flags >> 7) & 0x01 == 1)
505 }
506
507 pub fn passwordflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
509 let flags = self.connect_flags(buf)?;
510 Ok((flags >> 6) & 0x01 == 1)
511 }
512
513 pub fn willretainflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
515 let flags = self.connect_flags(buf)?;
516 Ok((flags >> 5) & 0x01 == 1)
517 }
518
519 pub fn will_qosflag(&self, buf: &[u8]) -> Result<u8, FieldError> {
521 let flags = self.connect_flags(buf)?;
522 Ok((flags >> 3) & 0x03)
523 }
524
525 pub fn willflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
527 let flags = self.connect_flags(buf)?;
528 Ok((flags >> 2) & 0x01 == 1)
529 }
530
531 pub fn cleansess(&self, buf: &[u8]) -> Result<bool, FieldError> {
533 let flags = self.connect_flags(buf)?;
534 Ok((flags >> 1) & 0x01 == 1)
535 }
536
537 pub fn klive(&self, buf: &[u8]) -> Result<u16, FieldError> {
539 let off = self.var_header_offset(buf);
540 if off + 2 > buf.len() {
541 return Err(FieldError::BufferTooShort {
542 offset: off,
543 need: 2,
544 have: buf.len().saturating_sub(off),
545 });
546 }
547 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
548 let klive_off = off + 2 + name_len + 2; if klive_off + 2 > buf.len() {
550 return Err(FieldError::BufferTooShort {
551 offset: klive_off,
552 need: 2,
553 have: buf.len().saturating_sub(klive_off),
554 });
555 }
556 Ok(u16::from_be_bytes([buf[klive_off], buf[klive_off + 1]]))
557 }
558
559 fn connect_payload_offset(&self, buf: &[u8]) -> Result<usize, FieldError> {
561 let off = self.var_header_offset(buf);
562 if off + 2 > buf.len() {
563 return Err(FieldError::BufferTooShort {
564 offset: off,
565 need: 2,
566 have: buf.len().saturating_sub(off),
567 });
568 }
569 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
570 Ok(off + 2 + name_len + 1 + 1 + 2)
572 }
573
574 pub fn client_id(&self, buf: &[u8]) -> Result<String, FieldError> {
576 let off = self.connect_payload_offset(buf)?;
577 if off + 2 > buf.len() {
578 return Err(FieldError::BufferTooShort {
579 offset: off,
580 need: 2,
581 have: buf.len().saturating_sub(off),
582 });
583 }
584 let cid_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
585 let cid_start = off + 2;
586 if cid_start + cid_len > buf.len() {
587 return Err(FieldError::BufferTooShort {
588 offset: cid_start,
589 need: cid_len,
590 have: buf.len().saturating_sub(cid_start),
591 });
592 }
593 String::from_utf8(buf[cid_start..cid_start + cid_len].to_vec())
594 .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 client_id: {e}")))
595 }
596
597 pub fn sess_present_flag(&self, buf: &[u8]) -> Result<u8, FieldError> {
603 let off = self.var_header_offset(buf);
604 if off >= buf.len() {
605 return Err(FieldError::BufferTooShort {
606 offset: off,
607 need: 1,
608 have: 0,
609 });
610 }
611 Ok(buf[off] & 0x01)
612 }
613
614 pub fn retcode(&self, buf: &[u8]) -> Result<u8, FieldError> {
616 let off = self.var_header_offset(buf);
617 if off + 2 > buf.len() {
618 return Err(FieldError::BufferTooShort {
619 offset: off + 1,
620 need: 1,
621 have: buf.len().saturating_sub(off + 1),
622 });
623 }
624 Ok(buf[off + 1])
625 }
626
627 pub fn retcodes(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
633 let off = self.var_header_offset(buf);
634 let rem_len = self.remaining_length(buf)? as usize;
635 let fixed_hdr = self.fixed_header_len(buf);
636 let payload_end = self.index.start + fixed_hdr + rem_len;
637 let payload_end = payload_end.min(buf.len());
638
639 let retcodes_start = off + 2; if retcodes_start > payload_end {
641 return Ok(Vec::new());
642 }
643 Ok(buf[retcodes_start..payload_end].to_vec())
644 }
645
646 #[must_use]
652 pub fn summary(&self, buf: &[u8]) -> String {
653 let mt = match self.msg_type(buf) {
654 Ok(t) => t,
655 Err(_) => return "MQTT".to_string(),
656 };
657 let type_name = message_type_name(mt);
658
659 match mt {
660 PUBLISH => {
661 let topic = self.topic(buf).unwrap_or_else(|_| "?".to_string());
662 let qos = self.qos(buf).unwrap_or(0);
663 format!("MQTT {type_name} topic={topic} QOS={qos}")
664 },
665 CONNECT => {
666 let cid = self.client_id(buf).unwrap_or_else(|_| "?".to_string());
667 format!("MQTT {type_name} clientId={cid}")
668 },
669 CONNACK => {
670 let rc = self.retcode(buf).unwrap_or(0);
671 format!("MQTT {type_name} retcode={rc}")
672 },
673 SUBSCRIBE | UNSUBSCRIBE => {
674 let mid = self.msgid(buf).unwrap_or(0);
675 format!("MQTT {type_name} msgid={mid}")
676 },
677 SUBACK => {
678 let mid = self.msgid(buf).unwrap_or(0);
679 format!("MQTT {type_name} msgid={mid}")
680 },
681 PUBACK | PUBREC | PUBREL | PUBCOMP | UNSUBACK => {
682 let mid = self.msgid(buf).unwrap_or(0);
683 format!("MQTT {type_name} msgid={mid}")
684 },
685 _ => format!("MQTT {type_name}"),
686 }
687 }
688
689 fn compute_header_len(&self, buf: &[u8]) -> usize {
691 let fixed_hdr = self.fixed_header_len(buf);
692 let rem_len = self.remaining_length(buf).unwrap_or(0) as usize;
693 fixed_hdr + rem_len
694 }
695
    /// Returns the names of all fields this layer can expose
    /// (the contents of [`MQTT_FIELD_NAMES`]).
    #[must_use]
    pub fn field_names() -> &'static [&'static str] {
        MQTT_FIELD_NAMES
    }
705
706 pub fn get_field(&self, buf: &[u8], name: &str) -> Option<Result<FieldValue, FieldError>> {
708 match name {
709 "msg_type" => Some(self.msg_type(buf).map(FieldValue::U8)),
710 "dup" => Some(self.dup(buf).map(FieldValue::Bool)),
711 "qos" => Some(self.qos(buf).map(FieldValue::U8)),
712 "retain" => Some(self.retain(buf).map(FieldValue::Bool)),
713 "remaining_length" => Some(self.remaining_length(buf).map(FieldValue::U32)),
714 "topic_len" => {
715 let mt = self.msg_type(buf).ok()?;
716 if mt == PUBLISH {
717 Some(self.topic_len(buf).map(FieldValue::U16))
718 } else {
719 None
720 }
721 },
722 "topic" => {
723 let mt = self.msg_type(buf).ok()?;
724 if mt == PUBLISH {
725 Some(self.topic(buf).map(FieldValue::Str))
726 } else {
727 None
728 }
729 },
730 "msgid" => {
731 let mt = self.msg_type(buf).ok()?;
732 match mt {
733 PUBLISH => {
734 let qos = self.qos(buf).ok()?;
735 if qos > 0 {
736 Some(self.msgid(buf).map(FieldValue::U16))
737 } else {
738 None
739 }
740 },
741 PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE
742 | UNSUBACK => Some(self.msgid(buf).map(FieldValue::U16)),
743 _ => None,
744 }
745 },
746 "value" => {
747 let mt = self.msg_type(buf).ok()?;
748 if mt == PUBLISH {
749 Some(self.value(buf).map(FieldValue::Bytes))
750 } else {
751 None
752 }
753 },
754 "proto_name" => {
755 let mt = self.msg_type(buf).ok()?;
756 if mt == CONNECT {
757 Some(self.proto_name(buf).map(FieldValue::Str))
758 } else {
759 None
760 }
761 },
762 "proto_level" => {
763 let mt = self.msg_type(buf).ok()?;
764 if mt == CONNECT {
765 Some(self.proto_level(buf).map(FieldValue::U8))
766 } else {
767 None
768 }
769 },
770 "connect_flags" => {
771 let mt = self.msg_type(buf).ok()?;
772 if mt == CONNECT {
773 Some(self.connect_flags(buf).map(FieldValue::U8))
774 } else {
775 None
776 }
777 },
778 "klive" => {
779 let mt = self.msg_type(buf).ok()?;
780 if mt == CONNECT {
781 Some(self.klive(buf).map(FieldValue::U16))
782 } else {
783 None
784 }
785 },
786 "client_id" => {
787 let mt = self.msg_type(buf).ok()?;
788 if mt == CONNECT {
789 Some(self.client_id(buf).map(FieldValue::Str))
790 } else {
791 None
792 }
793 },
794 "usernameflag" => {
795 let mt = self.msg_type(buf).ok()?;
796 if mt == CONNECT {
797 Some(self.usernameflag(buf).map(FieldValue::Bool))
798 } else {
799 None
800 }
801 },
802 "passwordflag" => {
803 let mt = self.msg_type(buf).ok()?;
804 if mt == CONNECT {
805 Some(self.passwordflag(buf).map(FieldValue::Bool))
806 } else {
807 None
808 }
809 },
810 "willretainflag" => {
811 let mt = self.msg_type(buf).ok()?;
812 if mt == CONNECT {
813 Some(self.willretainflag(buf).map(FieldValue::Bool))
814 } else {
815 None
816 }
817 },
818 "willQOSflag" => {
819 let mt = self.msg_type(buf).ok()?;
820 if mt == CONNECT {
821 Some(self.will_qosflag(buf).map(FieldValue::U8))
822 } else {
823 None
824 }
825 },
826 "willflag" => {
827 let mt = self.msg_type(buf).ok()?;
828 if mt == CONNECT {
829 Some(self.willflag(buf).map(FieldValue::Bool))
830 } else {
831 None
832 }
833 },
834 "cleansess" => {
835 let mt = self.msg_type(buf).ok()?;
836 if mt == CONNECT {
837 Some(self.cleansess(buf).map(FieldValue::Bool))
838 } else {
839 None
840 }
841 },
842 "sess_present_flag" => {
843 let mt = self.msg_type(buf).ok()?;
844 if mt == CONNACK {
845 Some(self.sess_present_flag(buf).map(FieldValue::U8))
846 } else {
847 None
848 }
849 },
850 "retcode" => {
851 let mt = self.msg_type(buf).ok()?;
852 if mt == CONNACK {
853 Some(self.retcode(buf).map(FieldValue::U8))
854 } else {
855 None
856 }
857 },
858 "retcodes" => {
859 let mt = self.msg_type(buf).ok()?;
860 if mt == SUBACK {
861 Some(self.retcodes(buf).map(FieldValue::Bytes))
862 } else {
863 None
864 }
865 },
866 _ => None,
867 }
868 }
869
    /// Placeholder for writing a field in place.
    ///
    /// All arguments are ignored and `None` is returned unconditionally, so
    /// no field can currently be modified through this method.
    pub fn set_field(
        &self,
        _buf: &mut [u8],
        _name: &str,
        _value: FieldValue,
    ) -> Option<Result<(), FieldError>> {
        None
    }
881}
882
883impl Layer for MqttLayer {
884 fn kind(&self) -> LayerKind {
885 LayerKind::Mqtt
886 }
887
888 fn summary(&self, data: &[u8]) -> String {
889 self.summary(data)
890 }
891
892 fn header_len(&self, data: &[u8]) -> usize {
893 self.compute_header_len(data)
894 }
895
896 fn field_names(&self) -> &'static [&'static str] {
897 MQTT_FIELD_NAMES
898 }
899}
900
// Unit tests for the variable-length codec, the payload heuristic, the field
// accessors of `MqttLayer`, and round-trips through `MqttBuilder`.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- variable-length integer codec ----

    #[test]
    fn test_decode_variable_length_single_byte() {
        // Smallest single-byte value.
        let buf = [0x00];
        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
        assert_eq!(val, 0);
        assert_eq!(consumed, 1);

        // Largest single-byte value.
        let buf = [0x7F];
        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
        assert_eq!(val, 127);
        assert_eq!(consumed, 1);
    }

    #[test]
    fn test_decode_variable_length_two_bytes() {
        // Smallest two-byte value.
        let buf = [0x80, 0x01];
        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
        assert_eq!(val, 128);
        assert_eq!(consumed, 2);

        // Largest two-byte value.
        let buf = [0xFF, 0x7F];
        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
        assert_eq!(val, 16383);
        assert_eq!(consumed, 2);
    }

    #[test]
    fn test_decode_variable_length_four_bytes() {
        // Largest four-byte value.
        let buf = [0xFF, 0xFF, 0xFF, 0x7F];
        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
        assert_eq!(val, 268_435_455);
        assert_eq!(consumed, 4);
    }

    #[test]
    fn test_encode_variable_length_roundtrip() {
        // Values on both sides of each byte-count boundary.
        for &val in &[0u32, 1, 127, 128, 16383, 16384, 2_097_151, 268_435_455] {
            let encoded = encode_variable_length(val);
            let (decoded, consumed) = decode_variable_length(&encoded, 0).unwrap();
            assert_eq!(decoded, val, "roundtrip failed for {}", val);
            assert_eq!(consumed, encoded.len());
        }
    }

    #[test]
    fn test_encode_variable_length_zero() {
        let encoded = encode_variable_length(0);
        assert_eq!(encoded, vec![0x00]);
    }

    #[test]
    fn test_encode_variable_length_single_byte() {
        let encoded = encode_variable_length(10);
        assert_eq!(encoded, vec![0x0a]);
    }

    // ---- payload heuristic ----

    #[test]
    fn test_is_mqtt_payload_valid() {
        // PINGREQ with remaining length 0.
        assert!(is_mqtt_payload(&[0xC0, 0x00]));
        // CONNECT header.
        assert!(is_mqtt_payload(&[0x10, 0x1f]));
        // PUBLISH header followed by the start of a topic.
        assert!(is_mqtt_payload(&[0x30, 0x0a, 0x00, 0x04]));
    }

    #[test]
    fn test_is_mqtt_payload_invalid() {
        // Only one byte: too short for a fixed header.
        assert!(!is_mqtt_payload(&[0x30]));
        // Message type 0 is rejected.
        assert!(!is_mqtt_payload(&[0x00, 0x00]));
    }

    // ---- packet parsing ----

    #[test]
    fn test_parse_publish_qos0() {
        // PUBLISH, QoS 0, remaining length 10, topic "test", payload "test".
        let data: Vec<u8> = vec![
            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBLISH);
        assert!(!mqtt.dup(&data).unwrap());
        assert_eq!(mqtt.qos(&data).unwrap(), 0);
        assert!(!mqtt.retain(&data).unwrap());
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 10);
        assert_eq!(mqtt.fixed_header_len(&data), 2);
        assert_eq!(mqtt.topic_len(&data).unwrap(), 4);
        assert_eq!(mqtt.topic(&data).unwrap(), "test");
        assert_eq!(mqtt.value(&data).unwrap(), b"test");
    }

    #[test]
    fn test_parse_publish_qos1() {
        // PUBLISH, QoS 1, remaining length 12, topic "test", msgid 10,
        // payload "data".
        let data: Vec<u8> = vec![
            0x32, 0x0c, 0x00, 0x04, b't', b'e', b's', b't', 0x00, 0x0a, b'd', b'a', b't', b'a',
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBLISH);
        assert_eq!(mqtt.qos(&data).unwrap(), 1);
        assert_eq!(mqtt.topic(&data).unwrap(), "test");
        assert_eq!(mqtt.msgid(&data).unwrap(), 10);
        assert_eq!(mqtt.value(&data).unwrap(), b"data");
    }

    #[test]
    fn test_parse_connect() {
        // CONNECT, remaining length 31: protocol name "MQIsdp", level 3,
        // flags 0x02, keep-alive 60, client id "mosqpub/1440-kali".
        let data: Vec<u8> = vec![
            0x10, 0x1f, 0x00, 0x06, b'M', b'Q', b'I', b's', b'd', b'p', 0x03, 0x02, 0x00, 0x3c, 0x00, 0x11, b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0', b'-', b'k',
            b'a', b'l', b'i',
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), CONNECT);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 31);
        assert_eq!(mqtt.proto_name(&data).unwrap(), "MQIsdp");
        assert_eq!(mqtt.proto_level(&data).unwrap(), 3);
        assert_eq!(mqtt.connect_flags(&data).unwrap(), 0x02);
        assert!(mqtt.cleansess(&data).unwrap());
        assert!(!mqtt.usernameflag(&data).unwrap());
        assert!(!mqtt.passwordflag(&data).unwrap());
        assert!(!mqtt.willflag(&data).unwrap());
        assert_eq!(mqtt.klive(&data).unwrap(), 60);
        assert_eq!(mqtt.client_id(&data).unwrap(), "mosqpub/1440-kali");
    }

    #[test]
    fn test_parse_connack() {
        // CONNACK, session-present 0, return code 0.
        let data: Vec<u8> = vec![0x20, 0x02, 0x00, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), CONNACK);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 2);
        assert_eq!(mqtt.sess_present_flag(&data).unwrap(), 0);
        assert_eq!(mqtt.retcode(&data).unwrap(), 0);
    }

    #[test]
    fn test_parse_subscribe() {
        // SUBSCRIBE, remaining length 9, msgid 1, topic "test", trailing
        // byte 0x01.
        let data: Vec<u8> = vec![
            0x82, 0x09, 0x00, 0x01, 0x00, 0x04, b't', b'e', b's', b't', 0x01,
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), SUBSCRIBE);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 9);
        assert_eq!(mqtt.msgid(&data).unwrap(), 1);
    }

    #[test]
    fn test_parse_suback() {
        // SUBACK, remaining length 3, msgid 1, one return code 0x00.
        let data: Vec<u8> = vec![0x90, 0x03, 0x00, 0x01, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), SUBACK);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 3);
        assert_eq!(mqtt.msgid(&data).unwrap(), 1);
        assert_eq!(mqtt.retcodes(&data).unwrap(), vec![0x00]);
    }

    #[test]
    fn test_parse_pingreq() {
        let data: Vec<u8> = vec![0xC0, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), PINGREQ);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
    }

    #[test]
    fn test_parse_pingresp() {
        let data: Vec<u8> = vec![0xD0, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), PINGRESP);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
    }

    #[test]
    fn test_parse_disconnect() {
        let data: Vec<u8> = vec![0xE0, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), DISCONNECT);
        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
    }

    // ---- names and summaries ----

    #[test]
    fn test_message_type_names() {
        assert_eq!(message_type_name(CONNECT), "CONNECT");
        assert_eq!(message_type_name(CONNACK), "CONNACK");
        assert_eq!(message_type_name(PUBLISH), "PUBLISH");
        assert_eq!(message_type_name(PUBACK), "PUBACK");
        assert_eq!(message_type_name(PUBREC), "PUBREC");
        assert_eq!(message_type_name(PUBREL), "PUBREL");
        assert_eq!(message_type_name(PUBCOMP), "PUBCOMP");
        assert_eq!(message_type_name(SUBSCRIBE), "SUBSCRIBE");
        assert_eq!(message_type_name(SUBACK), "SUBACK");
        assert_eq!(message_type_name(UNSUBSCRIBE), "UNSUBSCRIBE");
        assert_eq!(message_type_name(UNSUBACK), "UNSUBACK");
        assert_eq!(message_type_name(PINGREQ), "PINGREQ");
        assert_eq!(message_type_name(PINGRESP), "PINGRESP");
        assert_eq!(message_type_name(DISCONNECT), "DISCONNECT");
        assert_eq!(message_type_name(AUTH), "AUTH");
        // Code 0 is not a defined message type.
        assert_eq!(message_type_name(0), "UNKNOWN");
    }

    #[test]
    fn test_summary_publish() {
        let data: Vec<u8> = vec![
            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);
        let s = mqtt.summary(&data);
        assert!(s.contains("PUBLISH"));
        assert!(s.contains("topic=test"));
        assert!(s.contains("QOS=0"));
    }

    #[test]
    fn test_summary_connect() {
        let data: Vec<u8> = vec![
            0x10, 0x1f, 0x00, 0x06, b'M', b'Q', b'I', b's', b'd', b'p', 0x03, 0x02, 0x00, 0x3c,
            0x00, 0x11, b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0',
            b'-', b'k', b'a', b'l', b'i',
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);
        let s = mqtt.summary(&data);
        assert!(s.contains("CONNECT"));
        assert!(s.contains("clientId=mosqpub/1440-kali"));
    }

    // ---- generic field access ----

    #[test]
    fn test_get_field_msg_type() {
        let data: Vec<u8> = vec![0xC0, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);
        let val = mqtt.get_field(&data, "msg_type").unwrap().unwrap();
        assert_eq!(val, FieldValue::U8(PINGREQ));
    }

    #[test]
    fn test_get_field_unknown() {
        let data: Vec<u8> = vec![0xC0, 0x00];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);
        assert!(mqtt.get_field(&data, "nonexistent").is_none());
    }

    // ---- `Layer` trait ----

    #[test]
    fn test_layer_trait_kind() {
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, 2);
        let mqtt = MqttLayer::new(idx);
        assert_eq!(mqtt.kind(), LayerKind::Mqtt);
    }

    #[test]
    fn test_layer_trait_header_len() {
        // Fixed header (2) + remaining length (10) = 12.
        let data: Vec<u8> = vec![
            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
        ];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);
        assert_eq!(Layer::header_len(&mqtt, &data), 12);
    }

    #[test]
    fn test_parse_puback() {
        // PUBACK with msgid 10.
        let data: Vec<u8> = vec![0x40, 0x02, 0x00, 0x0a];
        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBACK);
        assert_eq!(mqtt.msgid(&data).unwrap(), 10);
    }

    // ---- builder round-trips ----

    #[test]
    fn test_builder_roundtrip_publish() {
        let built = MqttBuilder::new()
            .publish()
            .topic(b"test")
            .payload(b"hello")
            .build();

        let idx = LayerIndex::new(LayerKind::Mqtt, 0, built.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&built).unwrap(), PUBLISH);
        assert_eq!(mqtt.qos(&built).unwrap(), 0);
        assert_eq!(mqtt.topic(&built).unwrap(), "test");
        assert_eq!(mqtt.value(&built).unwrap(), b"hello");
    }

    #[test]
    fn test_builder_roundtrip_connect() {
        let built = MqttBuilder::new()
            .connect()
            .client_id(b"myclient")
            .keep_alive(120)
            .clean_session(true)
            .build();

        let idx = LayerIndex::new(LayerKind::Mqtt, 0, built.len());
        let mqtt = MqttLayer::new(idx);

        assert_eq!(mqtt.msg_type(&built).unwrap(), CONNECT);
        assert_eq!(mqtt.proto_name(&built).unwrap(), "MQTT");
        assert_eq!(mqtt.proto_level(&built).unwrap(), 4);
        assert!(mqtt.cleansess(&built).unwrap());
        assert_eq!(mqtt.klive(&built).unwrap(), 120);
        assert_eq!(mqtt.client_id(&built).unwrap(), "myclient");
    }
}