1pub mod builder;
34
35pub use builder::MqttBuilder;
36
37use crate::layer::field::{FieldError, FieldValue};
38use crate::layer::{Layer, LayerIndex, LayerKind};
39
/// Smallest possible fixed header: one type/flags byte followed by at least
/// one remaining-length byte.
pub const MQTT_MIN_HEADER_LEN: usize = 2;

/// TCP port conventionally used for unencrypted MQTT traffic.
pub const MQTT_PORT: u16 = 1883;

// Control packet type codes, as found in the upper four bits of the first
// byte of a packet (see `MqttLayer::msg_type`).

/// Packet type code for CONNECT.
pub const CONNECT: u8 = 1;
/// Packet type code for CONNACK.
pub const CONNACK: u8 = 2;
/// Packet type code for PUBLISH.
pub const PUBLISH: u8 = 3;
/// Packet type code for PUBACK.
pub const PUBACK: u8 = 4;
/// Packet type code for PUBREC.
pub const PUBREC: u8 = 5;
/// Packet type code for PUBREL.
pub const PUBREL: u8 = 6;
/// Packet type code for PUBCOMP.
pub const PUBCOMP: u8 = 7;
/// Packet type code for SUBSCRIBE.
pub const SUBSCRIBE: u8 = 8;
/// Packet type code for SUBACK.
pub const SUBACK: u8 = 9;
/// Packet type code for UNSUBSCRIBE.
pub const UNSUBSCRIBE: u8 = 10;
/// Packet type code for UNSUBACK.
pub const UNSUBACK: u8 = 11;
/// Packet type code for PINGREQ.
pub const PINGREQ: u8 = 12;
/// Packet type code for PINGRESP.
pub const PINGRESP: u8 = 13;
/// Packet type code for DISCONNECT.
pub const DISCONNECT: u8 = 14;
/// Packet type code for AUTH.
pub const AUTH: u8 = 15;
65
/// Names of every field exposed by the MQTT layer.
///
/// Each entry is a name recognised by `MqttLayer::get_field`. Whether a given
/// field is actually present depends on the packet type (and, for `msgid` on
/// PUBLISH, on the QoS level), so `get_field` may still return `None` for a
/// listed name.
pub static MQTT_FIELD_NAMES: &[&str] = &[
    // Fixed header (all packet types).
    "msg_type",
    "dup",
    "qos",
    "retain",
    "remaining_length",
    // PUBLISH / packets carrying a message identifier.
    "topic",
    "topic_len",
    "msgid",
    "value",
    // CONNECT.
    "proto_name",
    "proto_level",
    "connect_flags",
    "klive",
    "client_id",
    "usernameflag",
    "passwordflag",
    "willretainflag",
    "willQOSflag",
    "willflag",
    "cleansess",
    // CONNACK.
    "sess_present_flag",
    "retcode",
    // SUBACK.
    "retcodes",
];
92
93pub fn decode_variable_length(buf: &[u8], offset: usize) -> Result<(u32, usize), FieldError> {
104 let mut value: u32 = 0;
105 let mut multiplier: u32 = 1;
106 let mut idx = offset;
107
108 loop {
109 if idx >= buf.len() {
110 return Err(FieldError::BufferTooShort {
111 offset: idx,
112 need: 1,
113 have: 0,
114 });
115 }
116 let encoded_byte = buf[idx];
117 value = match u32::from(encoded_byte & 0x7F)
118 .checked_mul(multiplier)
119 .and_then(|v| value.checked_add(v))
120 {
121 Some(v) => v,
122 None => {
123 return Err(FieldError::InvalidValue(
124 "variable-length integer overflow".into(),
125 ));
126 },
127 };
128
129 if multiplier > 128 * 128 * 128 {
130 return Err(FieldError::InvalidValue(
131 "variable-length integer exceeds 4 bytes".into(),
132 ));
133 }
134
135 idx += 1;
136 if encoded_byte & 0x80 == 0 {
137 break;
138 }
139 multiplier = multiplier.saturating_mul(128);
140 }
141
142 Ok((value, idx - offset))
143}
144
/// Encodes `value` as an MQTT variable-length integer.
///
/// The value is emitted in 7-bit groups, least-significant group first; every
/// byte except the last has its high bit (`0x80`) set. Zero encodes as the
/// single byte `0x00`.
///
/// Values above 268 435 455 do not fit in four bytes and produce a five-byte
/// sequence, which `decode_variable_length` will not accept.
#[must_use]
pub fn encode_variable_length(value: u32) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(4);
    let mut remaining = value;
    loop {
        let group = (remaining & 0x7F) as u8;
        remaining >>= 7;
        if remaining == 0 {
            // Final group: no continuation bit.
            encoded.push(group);
            break;
        }
        encoded.push(group | 0x80);
    }
    encoded
}
165
/// Returns the upper-case name of an MQTT control packet type.
///
/// `msg_type` is the 4-bit packet type code. Code `0` and any value above
/// `15` yield `"UNKNOWN"`.
#[must_use]
pub fn message_type_name(msg_type: u8) -> &'static str {
    // Indexed by wire code; the order mirrors the packet type constants
    // (CONNECT = 1 through AUTH = 15). Slot 0 has no packet type assigned.
    const NAMES: [&str; 16] = [
        "UNKNOWN",
        "CONNECT",
        "CONNACK",
        "PUBLISH",
        "PUBACK",
        "PUBREC",
        "PUBREL",
        "PUBCOMP",
        "SUBSCRIBE",
        "SUBACK",
        "UNSUBSCRIBE",
        "UNSUBACK",
        "PINGREQ",
        "PINGRESP",
        "DISCONNECT",
        "AUTH",
    ];

    NAMES
        .get(usize::from(msg_type))
        .copied()
        .unwrap_or("UNKNOWN")
}
188
189#[must_use]
194pub fn is_mqtt_payload(buf: &[u8]) -> bool {
195 if buf.len() < 2 {
196 return false;
197 }
198 let msg_type = (buf[0] >> 4) & 0x0F;
199 if !(1..=15).contains(&msg_type) {
200 return false;
201 }
202 decode_variable_length(buf, 1).is_ok()
204}
205
/// View over an MQTT packet inside a larger packet buffer.
///
/// The layer holds no packet bytes itself: every accessor takes the buffer as
/// an argument and uses `index` to locate the MQTT data within it.
#[derive(Debug, Clone)]
pub struct MqttLayer {
    /// Location of the MQTT data within the buffer. Accessors use
    /// `index.slice(buf)` for the fixed header and `index.start` as the base
    /// for absolute offsets.
    pub index: LayerIndex,
}
215
216impl MqttLayer {
217 #[must_use]
219 pub fn new(index: LayerIndex) -> Self {
220 Self { index }
221 }
222
223 #[must_use]
225 pub fn at_start(len: usize) -> Self {
226 Self {
227 index: LayerIndex::new(LayerKind::Mqtt, 0, len),
228 }
229 }
230
231 fn slice<'a>(&self, buf: &'a [u8]) -> &'a [u8] {
233 self.index.slice(buf)
234 }
235
236 pub fn msg_type(&self, buf: &[u8]) -> Result<u8, FieldError> {
242 let s = self.slice(buf);
243 if s.is_empty() {
244 return Err(FieldError::BufferTooShort {
245 offset: self.index.start,
246 need: 1,
247 have: 0,
248 });
249 }
250 Ok((s[0] >> 4) & 0x0F)
251 }
252
253 pub fn dup(&self, buf: &[u8]) -> Result<bool, FieldError> {
255 let s = self.slice(buf);
256 if s.is_empty() {
257 return Err(FieldError::BufferTooShort {
258 offset: self.index.start,
259 need: 1,
260 have: 0,
261 });
262 }
263 Ok((s[0] >> 3) & 0x01 == 1)
264 }
265
266 pub fn qos(&self, buf: &[u8]) -> Result<u8, FieldError> {
268 let s = self.slice(buf);
269 if s.is_empty() {
270 return Err(FieldError::BufferTooShort {
271 offset: self.index.start,
272 need: 1,
273 have: 0,
274 });
275 }
276 Ok((s[0] >> 1) & 0x03)
277 }
278
279 pub fn retain(&self, buf: &[u8]) -> Result<bool, FieldError> {
281 let s = self.slice(buf);
282 if s.is_empty() {
283 return Err(FieldError::BufferTooShort {
284 offset: self.index.start,
285 need: 1,
286 have: 0,
287 });
288 }
289 Ok(s[0] & 0x01 == 1)
290 }
291
292 pub fn remaining_length(&self, buf: &[u8]) -> Result<u32, FieldError> {
294 let s = self.slice(buf);
295 if s.len() < 2 {
296 return Err(FieldError::BufferTooShort {
297 offset: self.index.start + 1,
298 need: 1,
299 have: s.len().saturating_sub(1),
300 });
301 }
302 let (val, _consumed) = decode_variable_length(s, 1)?;
303 Ok(val)
304 }
305
306 #[must_use]
308 pub fn fixed_header_len(&self, buf: &[u8]) -> usize {
309 let s = self.slice(buf);
310 if s.len() < 2 {
311 return MQTT_MIN_HEADER_LEN;
312 }
313 match decode_variable_length(s, 1) {
314 Ok((_val, consumed)) => 1 + consumed,
315 Err(_) => MQTT_MIN_HEADER_LEN,
316 }
317 }
318
319 fn var_header_offset(&self, buf: &[u8]) -> usize {
321 self.index.start + self.fixed_header_len(buf)
322 }
323
324 pub fn topic_len(&self, buf: &[u8]) -> Result<u16, FieldError> {
330 let off = self.var_header_offset(buf);
331 if off + 2 > buf.len() {
332 return Err(FieldError::BufferTooShort {
333 offset: off,
334 need: 2,
335 have: buf.len().saturating_sub(off),
336 });
337 }
338 Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
339 }
340
341 pub fn topic(&self, buf: &[u8]) -> Result<String, FieldError> {
343 let off = self.var_header_offset(buf);
344 if off + 2 > buf.len() {
345 return Err(FieldError::BufferTooShort {
346 offset: off,
347 need: 2,
348 have: buf.len().saturating_sub(off),
349 });
350 }
351 let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
352 let topic_start = off + 2;
353 if topic_start + tlen > buf.len() {
354 return Err(FieldError::BufferTooShort {
355 offset: topic_start,
356 need: tlen,
357 have: buf.len().saturating_sub(topic_start),
358 });
359 }
360 String::from_utf8(buf[topic_start..topic_start + tlen].to_vec())
361 .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 topic: {e}")))
362 }
363
364 pub fn msgid(&self, buf: &[u8]) -> Result<u16, FieldError> {
370 let mt = self.msg_type(buf)?;
371 let off = self.var_header_offset(buf);
372
373 match mt {
374 PUBLISH => {
375 if off + 2 > buf.len() {
377 return Err(FieldError::BufferTooShort {
378 offset: off,
379 need: 2,
380 have: buf.len().saturating_sub(off),
381 });
382 }
383 let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
384 let msgid_off = off + 2 + tlen;
385 if msgid_off + 2 > buf.len() {
386 return Err(FieldError::BufferTooShort {
387 offset: msgid_off,
388 need: 2,
389 have: buf.len().saturating_sub(msgid_off),
390 });
391 }
392 Ok(u16::from_be_bytes([buf[msgid_off], buf[msgid_off + 1]]))
393 },
394 PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE | UNSUBACK => {
395 if off + 2 > buf.len() {
396 return Err(FieldError::BufferTooShort {
397 offset: off,
398 need: 2,
399 have: buf.len().saturating_sub(off),
400 });
401 }
402 Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
403 },
404 _ => Err(FieldError::InvalidValue(format!(
405 "message type {mt} does not have a msgid field"
406 ))),
407 }
408 }
409
410 pub fn value(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
412 let off = self.var_header_offset(buf);
413 let rem_len = self.remaining_length(buf)? as usize;
414 let fixed_hdr = self.fixed_header_len(buf);
415 let payload_end = self.index.start + fixed_hdr + rem_len;
416 let payload_end = payload_end.min(buf.len());
417
418 if off + 2 > buf.len() {
419 return Err(FieldError::BufferTooShort {
420 offset: off,
421 need: 2,
422 have: buf.len().saturating_sub(off),
423 });
424 }
425 let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
426 let mut value_start = off + 2 + tlen;
427
428 let qos = self.qos(buf)?;
430 if qos > 0 {
431 value_start += 2;
432 }
433
434 if value_start > payload_end {
435 return Ok(Vec::new());
436 }
437 Ok(buf[value_start..payload_end].to_vec())
438 }
439
440 pub fn proto_name(&self, buf: &[u8]) -> Result<String, FieldError> {
446 let off = self.var_header_offset(buf);
447 if off + 2 > buf.len() {
448 return Err(FieldError::BufferTooShort {
449 offset: off,
450 need: 2,
451 have: buf.len().saturating_sub(off),
452 });
453 }
454 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
455 let name_start = off + 2;
456 if name_start + name_len > buf.len() {
457 return Err(FieldError::BufferTooShort {
458 offset: name_start,
459 need: name_len,
460 have: buf.len().saturating_sub(name_start),
461 });
462 }
463 String::from_utf8(buf[name_start..name_start + name_len].to_vec())
464 .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 proto_name: {e}")))
465 }
466
467 pub fn proto_level(&self, buf: &[u8]) -> Result<u8, FieldError> {
469 let off = self.var_header_offset(buf);
470 if off + 2 > buf.len() {
471 return Err(FieldError::BufferTooShort {
472 offset: off,
473 need: 2,
474 have: buf.len().saturating_sub(off),
475 });
476 }
477 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
478 let level_off = off + 2 + name_len;
479 if level_off >= buf.len() {
480 return Err(FieldError::BufferTooShort {
481 offset: level_off,
482 need: 1,
483 have: 0,
484 });
485 }
486 Ok(buf[level_off])
487 }
488
489 pub fn connect_flags(&self, buf: &[u8]) -> Result<u8, FieldError> {
491 let off = self.var_header_offset(buf);
492 if off + 2 > buf.len() {
493 return Err(FieldError::BufferTooShort {
494 offset: off,
495 need: 2,
496 have: buf.len().saturating_sub(off),
497 });
498 }
499 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
500 let flags_off = off + 2 + name_len + 1;
501 if flags_off >= buf.len() {
502 return Err(FieldError::BufferTooShort {
503 offset: flags_off,
504 need: 1,
505 have: 0,
506 });
507 }
508 Ok(buf[flags_off])
509 }
510
511 pub fn usernameflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
513 let flags = self.connect_flags(buf)?;
514 Ok((flags >> 7) & 0x01 == 1)
515 }
516
517 pub fn passwordflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
519 let flags = self.connect_flags(buf)?;
520 Ok((flags >> 6) & 0x01 == 1)
521 }
522
523 pub fn willretainflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
525 let flags = self.connect_flags(buf)?;
526 Ok((flags >> 5) & 0x01 == 1)
527 }
528
529 pub fn will_qosflag(&self, buf: &[u8]) -> Result<u8, FieldError> {
531 let flags = self.connect_flags(buf)?;
532 Ok((flags >> 3) & 0x03)
533 }
534
535 pub fn willflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
537 let flags = self.connect_flags(buf)?;
538 Ok((flags >> 2) & 0x01 == 1)
539 }
540
541 pub fn cleansess(&self, buf: &[u8]) -> Result<bool, FieldError> {
543 let flags = self.connect_flags(buf)?;
544 Ok((flags >> 1) & 0x01 == 1)
545 }
546
547 pub fn klive(&self, buf: &[u8]) -> Result<u16, FieldError> {
549 let off = self.var_header_offset(buf);
550 if off + 2 > buf.len() {
551 return Err(FieldError::BufferTooShort {
552 offset: off,
553 need: 2,
554 have: buf.len().saturating_sub(off),
555 });
556 }
557 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
558 let klive_off = off + 2 + name_len + 2; if klive_off + 2 > buf.len() {
560 return Err(FieldError::BufferTooShort {
561 offset: klive_off,
562 need: 2,
563 have: buf.len().saturating_sub(klive_off),
564 });
565 }
566 Ok(u16::from_be_bytes([buf[klive_off], buf[klive_off + 1]]))
567 }
568
569 fn connect_payload_offset(&self, buf: &[u8]) -> Result<usize, FieldError> {
571 let off = self.var_header_offset(buf);
572 if off + 2 > buf.len() {
573 return Err(FieldError::BufferTooShort {
574 offset: off,
575 need: 2,
576 have: buf.len().saturating_sub(off),
577 });
578 }
579 let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
580 Ok(off + 2 + name_len + 1 + 1 + 2)
582 }
583
584 pub fn client_id(&self, buf: &[u8]) -> Result<String, FieldError> {
586 let off = self.connect_payload_offset(buf)?;
587 if off + 2 > buf.len() {
588 return Err(FieldError::BufferTooShort {
589 offset: off,
590 need: 2,
591 have: buf.len().saturating_sub(off),
592 });
593 }
594 let cid_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
595 let cid_start = off + 2;
596 if cid_start + cid_len > buf.len() {
597 return Err(FieldError::BufferTooShort {
598 offset: cid_start,
599 need: cid_len,
600 have: buf.len().saturating_sub(cid_start),
601 });
602 }
603 String::from_utf8(buf[cid_start..cid_start + cid_len].to_vec())
604 .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 client_id: {e}")))
605 }
606
607 pub fn sess_present_flag(&self, buf: &[u8]) -> Result<u8, FieldError> {
613 let off = self.var_header_offset(buf);
614 if off >= buf.len() {
615 return Err(FieldError::BufferTooShort {
616 offset: off,
617 need: 1,
618 have: 0,
619 });
620 }
621 Ok(buf[off] & 0x01)
622 }
623
624 pub fn retcode(&self, buf: &[u8]) -> Result<u8, FieldError> {
626 let off = self.var_header_offset(buf);
627 if off + 2 > buf.len() {
628 return Err(FieldError::BufferTooShort {
629 offset: off + 1,
630 need: 1,
631 have: buf.len().saturating_sub(off + 1),
632 });
633 }
634 Ok(buf[off + 1])
635 }
636
637 pub fn retcodes(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
643 let off = self.var_header_offset(buf);
644 let rem_len = self.remaining_length(buf)? as usize;
645 let fixed_hdr = self.fixed_header_len(buf);
646 let payload_end = self.index.start + fixed_hdr + rem_len;
647 let payload_end = payload_end.min(buf.len());
648
649 let retcodes_start = off + 2; if retcodes_start > payload_end {
651 return Ok(Vec::new());
652 }
653 Ok(buf[retcodes_start..payload_end].to_vec())
654 }
655
656 #[must_use]
662 pub fn summary(&self, buf: &[u8]) -> String {
663 let mt = match self.msg_type(buf) {
664 Ok(t) => t,
665 Err(_) => return "MQTT".to_string(),
666 };
667 let type_name = message_type_name(mt);
668
669 match mt {
670 PUBLISH => {
671 let topic = self.topic(buf).unwrap_or_else(|_| "?".to_string());
672 let qos = self.qos(buf).unwrap_or(0);
673 format!("MQTT {type_name} topic={topic} QOS={qos}")
674 },
675 CONNECT => {
676 let cid = self.client_id(buf).unwrap_or_else(|_| "?".to_string());
677 format!("MQTT {type_name} clientId={cid}")
678 },
679 CONNACK => {
680 let rc = self.retcode(buf).unwrap_or(0);
681 format!("MQTT {type_name} retcode={rc}")
682 },
683 SUBSCRIBE | UNSUBSCRIBE => {
684 let mid = self.msgid(buf).unwrap_or(0);
685 format!("MQTT {type_name} msgid={mid}")
686 },
687 SUBACK => {
688 let mid = self.msgid(buf).unwrap_or(0);
689 format!("MQTT {type_name} msgid={mid}")
690 },
691 PUBACK | PUBREC | PUBREL | PUBCOMP | UNSUBACK => {
692 let mid = self.msgid(buf).unwrap_or(0);
693 format!("MQTT {type_name} msgid={mid}")
694 },
695 _ => format!("MQTT {type_name}"),
696 }
697 }
698
699 fn compute_header_len(&self, buf: &[u8]) -> usize {
701 let fixed_hdr = self.fixed_header_len(buf);
702 let rem_len = self.remaining_length(buf).unwrap_or(0) as usize;
703 fixed_hdr + rem_len
704 }
705
706 #[must_use]
712 pub fn field_names() -> &'static [&'static str] {
713 MQTT_FIELD_NAMES
714 }
715
716 pub fn get_field(&self, buf: &[u8], name: &str) -> Option<Result<FieldValue, FieldError>> {
718 match name {
719 "msg_type" => Some(self.msg_type(buf).map(FieldValue::U8)),
720 "dup" => Some(self.dup(buf).map(FieldValue::Bool)),
721 "qos" => Some(self.qos(buf).map(FieldValue::U8)),
722 "retain" => Some(self.retain(buf).map(FieldValue::Bool)),
723 "remaining_length" => Some(self.remaining_length(buf).map(FieldValue::U32)),
724 "topic_len" => {
725 let mt = self.msg_type(buf).ok()?;
726 if mt == PUBLISH {
727 Some(self.topic_len(buf).map(FieldValue::U16))
728 } else {
729 None
730 }
731 },
732 "topic" => {
733 let mt = self.msg_type(buf).ok()?;
734 if mt == PUBLISH {
735 Some(self.topic(buf).map(FieldValue::Str))
736 } else {
737 None
738 }
739 },
740 "msgid" => {
741 let mt = self.msg_type(buf).ok()?;
742 match mt {
743 PUBLISH => {
744 let qos = self.qos(buf).ok()?;
745 if qos > 0 {
746 Some(self.msgid(buf).map(FieldValue::U16))
747 } else {
748 None
749 }
750 },
751 PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE
752 | UNSUBACK => Some(self.msgid(buf).map(FieldValue::U16)),
753 _ => None,
754 }
755 },
756 "value" => {
757 let mt = self.msg_type(buf).ok()?;
758 if mt == PUBLISH {
759 Some(self.value(buf).map(FieldValue::Bytes))
760 } else {
761 None
762 }
763 },
764 "proto_name" => {
765 let mt = self.msg_type(buf).ok()?;
766 if mt == CONNECT {
767 Some(self.proto_name(buf).map(FieldValue::Str))
768 } else {
769 None
770 }
771 },
772 "proto_level" => {
773 let mt = self.msg_type(buf).ok()?;
774 if mt == CONNECT {
775 Some(self.proto_level(buf).map(FieldValue::U8))
776 } else {
777 None
778 }
779 },
780 "connect_flags" => {
781 let mt = self.msg_type(buf).ok()?;
782 if mt == CONNECT {
783 Some(self.connect_flags(buf).map(FieldValue::U8))
784 } else {
785 None
786 }
787 },
788 "klive" => {
789 let mt = self.msg_type(buf).ok()?;
790 if mt == CONNECT {
791 Some(self.klive(buf).map(FieldValue::U16))
792 } else {
793 None
794 }
795 },
796 "client_id" => {
797 let mt = self.msg_type(buf).ok()?;
798 if mt == CONNECT {
799 Some(self.client_id(buf).map(FieldValue::Str))
800 } else {
801 None
802 }
803 },
804 "usernameflag" => {
805 let mt = self.msg_type(buf).ok()?;
806 if mt == CONNECT {
807 Some(self.usernameflag(buf).map(FieldValue::Bool))
808 } else {
809 None
810 }
811 },
812 "passwordflag" => {
813 let mt = self.msg_type(buf).ok()?;
814 if mt == CONNECT {
815 Some(self.passwordflag(buf).map(FieldValue::Bool))
816 } else {
817 None
818 }
819 },
820 "willretainflag" => {
821 let mt = self.msg_type(buf).ok()?;
822 if mt == CONNECT {
823 Some(self.willretainflag(buf).map(FieldValue::Bool))
824 } else {
825 None
826 }
827 },
828 "willQOSflag" => {
829 let mt = self.msg_type(buf).ok()?;
830 if mt == CONNECT {
831 Some(self.will_qosflag(buf).map(FieldValue::U8))
832 } else {
833 None
834 }
835 },
836 "willflag" => {
837 let mt = self.msg_type(buf).ok()?;
838 if mt == CONNECT {
839 Some(self.willflag(buf).map(FieldValue::Bool))
840 } else {
841 None
842 }
843 },
844 "cleansess" => {
845 let mt = self.msg_type(buf).ok()?;
846 if mt == CONNECT {
847 Some(self.cleansess(buf).map(FieldValue::Bool))
848 } else {
849 None
850 }
851 },
852 "sess_present_flag" => {
853 let mt = self.msg_type(buf).ok()?;
854 if mt == CONNACK {
855 Some(self.sess_present_flag(buf).map(FieldValue::U8))
856 } else {
857 None
858 }
859 },
860 "retcode" => {
861 let mt = self.msg_type(buf).ok()?;
862 if mt == CONNACK {
863 Some(self.retcode(buf).map(FieldValue::U8))
864 } else {
865 None
866 }
867 },
868 "retcodes" => {
869 let mt = self.msg_type(buf).ok()?;
870 if mt == SUBACK {
871 Some(self.retcodes(buf).map(FieldValue::Bytes))
872 } else {
873 None
874 }
875 },
876 _ => None,
877 }
878 }
879
880 pub fn set_field(
882 &self,
883 _buf: &mut [u8],
884 _name: &str,
885 _value: FieldValue,
886 ) -> Option<Result<(), FieldError>> {
887 None
890 }
891}
892
893impl Layer for MqttLayer {
894 fn kind(&self) -> LayerKind {
895 LayerKind::Mqtt
896 }
897
898 fn summary(&self, data: &[u8]) -> String {
899 self.summary(data)
900 }
901
902 fn header_len(&self, data: &[u8]) -> usize {
903 self.compute_header_len(data)
904 }
905
906 fn field_names(&self) -> &'static [&'static str] {
907 MQTT_FIELD_NAMES
908 }
909}
910
#[cfg(test)]
mod tests {
    use super::*;

    /// PUBLISH, QoS 0, topic "test", payload "test".
    const PUBLISH_QOS0: &[u8] = &[
        0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
    ];

    /// CONNECT with protocol name "MQIsdp", level 3, clean session,
    /// keep-alive 60 and client id "mosqpub/1440-kali".
    const CONNECT_PACKET: &[u8] = &[
        0x10, 0x1f, 0x00, 0x06, b'M', b'Q', b'I', b's', b'd', b'p', 0x03, 0x02, 0x00, 0x3c,
        0x00, 0x11, b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0',
        b'-', b'k', b'a', b'l', b'i',
    ];

    /// Builds a layer that spans the whole of `packet`.
    fn layer_for(packet: &[u8]) -> MqttLayer {
        MqttLayer::new(LayerIndex::new(LayerKind::Mqtt, 0, packet.len()))
    }

    /// Asserts that `encoded` decodes to `expected` and is consumed entirely.
    fn assert_decodes(encoded: &[u8], expected: u32) {
        let (val, consumed) = decode_variable_length(encoded, 0).unwrap();
        assert_eq!(val, expected);
        assert_eq!(consumed, encoded.len());
    }

    // ---- variable-length integer codec ----

    #[test]
    fn test_decode_variable_length_single_byte() {
        assert_decodes(&[0x00], 0);
        assert_decodes(&[0x7F], 127);
    }

    #[test]
    fn test_decode_variable_length_two_bytes() {
        assert_decodes(&[0x80, 0x01], 128);
        assert_decodes(&[0xFF, 0x7F], 16383);
    }

    #[test]
    fn test_decode_variable_length_four_bytes() {
        assert_decodes(&[0xFF, 0xFF, 0xFF, 0x7F], 268_435_455);
    }

    #[test]
    fn test_encode_variable_length_roundtrip() {
        let samples = [0u32, 1, 127, 128, 16383, 16384, 2_097_151, 268_435_455];
        for val in samples {
            let encoded = encode_variable_length(val);
            let (decoded, consumed) = decode_variable_length(&encoded, 0).unwrap();
            assert_eq!(decoded, val, "roundtrip failed for {}", val);
            assert_eq!(consumed, encoded.len());
        }
    }

    #[test]
    fn test_encode_variable_length_zero() {
        assert_eq!(encode_variable_length(0), vec![0x00]);
    }

    #[test]
    fn test_encode_variable_length_single_byte() {
        assert_eq!(encode_variable_length(10), vec![0x0a]);
    }

    // ---- payload detection ----

    #[test]
    fn test_is_mqtt_payload_valid() {
        let candidates: [&[u8]; 3] = [&[0xC0, 0x00], &[0x10, 0x1f], &[0x30, 0x0a, 0x00, 0x04]];
        for bytes in candidates {
            assert!(is_mqtt_payload(bytes));
        }
    }

    #[test]
    fn test_is_mqtt_payload_invalid() {
        // Too short, then reserved packet type 0.
        assert!(!is_mqtt_payload(&[0x30]));
        assert!(!is_mqtt_payload(&[0x00, 0x00]));
    }

    // ---- per-packet-type parsing ----

    #[test]
    fn test_parse_publish_qos0() {
        let mqtt = layer_for(PUBLISH_QOS0);

        assert_eq!(mqtt.msg_type(PUBLISH_QOS0).unwrap(), PUBLISH);
        assert!(!mqtt.dup(PUBLISH_QOS0).unwrap());
        assert_eq!(mqtt.qos(PUBLISH_QOS0).unwrap(), 0);
        assert!(!mqtt.retain(PUBLISH_QOS0).unwrap());
        assert_eq!(mqtt.remaining_length(PUBLISH_QOS0).unwrap(), 10);
        assert_eq!(mqtt.fixed_header_len(PUBLISH_QOS0), 2);
        assert_eq!(mqtt.topic_len(PUBLISH_QOS0).unwrap(), 4);
        assert_eq!(mqtt.topic(PUBLISH_QOS0).unwrap(), "test");
        assert_eq!(mqtt.value(PUBLISH_QOS0).unwrap(), b"test");
    }

    #[test]
    fn test_parse_publish_qos1() {
        let packet: &[u8] = &[
            0x32, 0x0c, 0x00, 0x04, b't', b'e', b's', b't', 0x00, 0x0a, b'd', b'a', b't', b'a',
        ];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), PUBLISH);
        assert_eq!(mqtt.qos(packet).unwrap(), 1);
        assert_eq!(mqtt.topic(packet).unwrap(), "test");
        assert_eq!(mqtt.msgid(packet).unwrap(), 10);
        assert_eq!(mqtt.value(packet).unwrap(), b"data");
    }

    #[test]
    fn test_parse_connect() {
        let mqtt = layer_for(CONNECT_PACKET);

        assert_eq!(mqtt.msg_type(CONNECT_PACKET).unwrap(), CONNECT);
        assert_eq!(mqtt.remaining_length(CONNECT_PACKET).unwrap(), 31);
        assert_eq!(mqtt.proto_name(CONNECT_PACKET).unwrap(), "MQIsdp");
        assert_eq!(mqtt.proto_level(CONNECT_PACKET).unwrap(), 3);
        assert_eq!(mqtt.connect_flags(CONNECT_PACKET).unwrap(), 0x02);
        assert!(mqtt.cleansess(CONNECT_PACKET).unwrap());
        assert!(!mqtt.usernameflag(CONNECT_PACKET).unwrap());
        assert!(!mqtt.passwordflag(CONNECT_PACKET).unwrap());
        assert!(!mqtt.willflag(CONNECT_PACKET).unwrap());
        assert_eq!(mqtt.klive(CONNECT_PACKET).unwrap(), 60);
        assert_eq!(mqtt.client_id(CONNECT_PACKET).unwrap(), "mosqpub/1440-kali");
    }

    #[test]
    fn test_parse_connack() {
        let packet: &[u8] = &[0x20, 0x02, 0x00, 0x00];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), CONNACK);
        assert_eq!(mqtt.remaining_length(packet).unwrap(), 2);
        assert_eq!(mqtt.sess_present_flag(packet).unwrap(), 0);
        assert_eq!(mqtt.retcode(packet).unwrap(), 0);
    }

    #[test]
    fn test_parse_subscribe() {
        let packet: &[u8] = &[
            0x82, 0x09, 0x00, 0x01, 0x00, 0x04, b't', b'e', b's', b't', 0x01,
        ];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), SUBSCRIBE);
        assert_eq!(mqtt.remaining_length(packet).unwrap(), 9);
        assert_eq!(mqtt.msgid(packet).unwrap(), 1);
    }

    #[test]
    fn test_parse_suback() {
        let packet: &[u8] = &[0x90, 0x03, 0x00, 0x01, 0x00];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), SUBACK);
        assert_eq!(mqtt.remaining_length(packet).unwrap(), 3);
        assert_eq!(mqtt.msgid(packet).unwrap(), 1);
        assert_eq!(mqtt.retcodes(packet).unwrap(), vec![0x00]);
    }

    #[test]
    fn test_parse_pingreq() {
        let packet: &[u8] = &[0xC0, 0x00];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), PINGREQ);
        assert_eq!(mqtt.remaining_length(packet).unwrap(), 0);
    }

    #[test]
    fn test_parse_pingresp() {
        let packet: &[u8] = &[0xD0, 0x00];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), PINGRESP);
        assert_eq!(mqtt.remaining_length(packet).unwrap(), 0);
    }

    #[test]
    fn test_parse_disconnect() {
        let packet: &[u8] = &[0xE0, 0x00];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), DISCONNECT);
        assert_eq!(mqtt.remaining_length(packet).unwrap(), 0);
    }

    #[test]
    fn test_parse_puback() {
        let packet: &[u8] = &[0x40, 0x02, 0x00, 0x0a];
        let mqtt = layer_for(packet);

        assert_eq!(mqtt.msg_type(packet).unwrap(), PUBACK);
        assert_eq!(mqtt.msgid(packet).unwrap(), 10);
    }

    // ---- names and summaries ----

    #[test]
    fn test_message_type_names() {
        let expected = [
            (CONNECT, "CONNECT"),
            (CONNACK, "CONNACK"),
            (PUBLISH, "PUBLISH"),
            (PUBACK, "PUBACK"),
            (PUBREC, "PUBREC"),
            (PUBREL, "PUBREL"),
            (PUBCOMP, "PUBCOMP"),
            (SUBSCRIBE, "SUBSCRIBE"),
            (SUBACK, "SUBACK"),
            (UNSUBSCRIBE, "UNSUBSCRIBE"),
            (UNSUBACK, "UNSUBACK"),
            (PINGREQ, "PINGREQ"),
            (PINGRESP, "PINGRESP"),
            (DISCONNECT, "DISCONNECT"),
            (AUTH, "AUTH"),
            (0, "UNKNOWN"),
        ];
        for (code, name) in expected {
            assert_eq!(message_type_name(code), name);
        }
    }

    #[test]
    fn test_summary_publish() {
        let s = layer_for(PUBLISH_QOS0).summary(PUBLISH_QOS0);
        assert!(s.contains("PUBLISH"));
        assert!(s.contains("topic=test"));
        assert!(s.contains("QOS=0"));
    }

    #[test]
    fn test_summary_connect() {
        let s = layer_for(CONNECT_PACKET).summary(CONNECT_PACKET);
        assert!(s.contains("CONNECT"));
        assert!(s.contains("clientId=mosqpub/1440-kali"));
    }

    // ---- generic field access ----

    #[test]
    fn test_get_field_msg_type() {
        let packet: &[u8] = &[0xC0, 0x00];
        let val = layer_for(packet)
            .get_field(packet, "msg_type")
            .unwrap()
            .unwrap();
        assert_eq!(val, FieldValue::U8(PINGREQ));
    }

    #[test]
    fn test_get_field_unknown() {
        let packet: &[u8] = &[0xC0, 0x00];
        assert!(layer_for(packet).get_field(packet, "nonexistent").is_none());
    }

    // ---- Layer trait ----

    #[test]
    fn test_layer_trait_kind() {
        let mqtt = MqttLayer::new(LayerIndex::new(LayerKind::Mqtt, 0, 2));
        assert_eq!(mqtt.kind(), LayerKind::Mqtt);
    }

    #[test]
    fn test_layer_trait_header_len() {
        let mqtt = layer_for(PUBLISH_QOS0);
        assert_eq!(Layer::header_len(&mqtt, PUBLISH_QOS0), 12);
    }

    // ---- builder round trips ----

    #[test]
    fn test_builder_roundtrip_publish() {
        let built = MqttBuilder::new()
            .publish()
            .topic(b"test")
            .payload(b"hello")
            .build();
        let mqtt = MqttLayer::new(LayerIndex::new(LayerKind::Mqtt, 0, built.len()));

        assert_eq!(mqtt.msg_type(&built).unwrap(), PUBLISH);
        assert_eq!(mqtt.qos(&built).unwrap(), 0);
        assert_eq!(mqtt.topic(&built).unwrap(), "test");
        assert_eq!(mqtt.value(&built).unwrap(), b"hello");
    }

    #[test]
    fn test_builder_roundtrip_connect() {
        let built = MqttBuilder::new()
            .connect()
            .client_id(b"myclient")
            .keep_alive(120)
            .clean_session(true)
            .build();
        let mqtt = MqttLayer::new(LayerIndex::new(LayerKind::Mqtt, 0, built.len()));

        assert_eq!(mqtt.msg_type(&built).unwrap(), CONNECT);
        assert_eq!(mqtt.proto_name(&built).unwrap(), "MQTT");
        assert_eq!(mqtt.proto_level(&built).unwrap(), 4);
        assert!(mqtt.cleansess(&built).unwrap());
        assert_eq!(mqtt.klive(&built).unwrap(), 120);
        assert_eq!(mqtt.client_id(&built).unwrap(), "myclient");
    }
}