Skip to main content

stackforge_core/layer/mqtt/
mod.rs

1//! MQTT (Message Queuing Telemetry Transport) layer implementation.
2//!
3//! Implements MQTT v3.1, v3.1.1, and v5.0 packet parsing as a zero-copy view
4//! into a packet buffer.
5//!
6//! ## Fixed Header Format
7//!
8//! ```text
9//! Byte 0:  [msg_type(4 bits)] [dup(1)] [qos(2)] [retain(1)]
10//! Byte 1+: Remaining Length (variable-length encoded, 1-4 bytes)
11//! ```
12//!
13//! ## Message Types
14//!
15//! | Value | Name        | Direction       |
16//! |-------|-------------|-----------------|
17//! | 1     | CONNECT     | Client -> Server|
18//! | 2     | CONNACK     | Server -> Client|
19//! | 3     | PUBLISH     | Both            |
20//! | 4     | PUBACK      | Both            |
21//! | 5     | PUBREC      | Both            |
22//! | 6     | PUBREL      | Both            |
23//! | 7     | PUBCOMP     | Both            |
24//! | 8     | SUBSCRIBE   | Client -> Server|
25//! | 9     | SUBACK      | Server -> Client|
26//! | 10    | UNSUBSCRIBE | Client -> Server|
27//! | 11    | UNSUBACK    | Server -> Client|
28//! | 12    | PINGREQ     | Client -> Server|
29//! | 13    | PINGRESP    | Server -> Client|
30//! | 14    | DISCONNECT  | Both (v5.0)     |
31//! | 15    | AUTH        | Both (v5.0)     |
32
33pub mod builder;
34
35pub use builder::MqttBuilder;
36
37use crate::layer::field::{FieldError, FieldValue};
38use crate::layer::{Layer, LayerIndex, LayerKind};
39
40/// Minimum MQTT header: 1 byte fixed header + at least 1 byte remaining length.
41pub const MQTT_MIN_HEADER_LEN: usize = 2;
42
43/// Default MQTT TCP port.
44pub const MQTT_PORT: u16 = 1883;
45
46// ============================================================================
47// Message type constants
48// ============================================================================
49
50pub const CONNECT: u8 = 1;
51pub const CONNACK: u8 = 2;
52pub const PUBLISH: u8 = 3;
53pub const PUBACK: u8 = 4;
54pub const PUBREC: u8 = 5;
55pub const PUBREL: u8 = 6;
56pub const PUBCOMP: u8 = 7;
57pub const SUBSCRIBE: u8 = 8;
58pub const SUBACK: u8 = 9;
59pub const UNSUBSCRIBE: u8 = 10;
60pub const UNSUBACK: u8 = 11;
61pub const PINGREQ: u8 = 12;
62pub const PINGRESP: u8 = 13;
63pub const DISCONNECT: u8 = 14;
64pub const AUTH: u8 = 15;
65
66/// Field names exported for Python/generic access.
67pub static MQTT_FIELD_NAMES: &[&str] = &[
68    "msg_type",
69    "dup",
70    "qos",
71    "retain",
72    "remaining_length",
73    "topic",
74    "topic_len",
75    "msgid",
76    "value",
77    "proto_name",
78    "proto_level",
79    "connect_flags",
80    "klive",
81    "client_id",
82    "usernameflag",
83    "passwordflag",
84    "willretainflag",
85    "willQOSflag",
86    "willflag",
87    "cleansess",
88    "sess_present_flag",
89    "retcode",
90    "retcodes",
91];
92
93// ============================================================================
94// Variable-length integer encoding/decoding (MQTT spec section 1.5.5)
95// ============================================================================
96
97/// Decode a variable-length integer from `buf` starting at `offset`.
98///
99/// Returns `(value, bytes_consumed)` on success.
100///
101/// Each byte encodes 7 bits of data in bits 6-0, with bit 7 as a continuation
102/// flag. At most 4 bytes are used, encoding values up to 268,435,455.
103pub fn decode_variable_length(buf: &[u8], offset: usize) -> Result<(u32, usize), FieldError> {
104    let mut value: u32 = 0;
105    let mut multiplier: u32 = 1;
106    let mut idx = offset;
107
108    loop {
109        if idx >= buf.len() {
110            return Err(FieldError::BufferTooShort {
111                offset: idx,
112                need: 1,
113                have: 0,
114            });
115        }
116        let encoded_byte = buf[idx];
117        value = match u32::from(encoded_byte & 0x7F)
118            .checked_mul(multiplier)
119            .and_then(|v| value.checked_add(v))
120        {
121            Some(v) => v,
122            None => {
123                return Err(FieldError::InvalidValue(
124                    "variable-length integer overflow".into(),
125                ));
126            },
127        };
128
129        if multiplier > 128 * 128 * 128 {
130            return Err(FieldError::InvalidValue(
131                "variable-length integer exceeds 4 bytes".into(),
132            ));
133        }
134
135        idx += 1;
136        if encoded_byte & 0x80 == 0 {
137            break;
138        }
139        multiplier = multiplier.saturating_mul(128);
140    }
141
142    Ok((value, idx - offset))
143}
144
145/// Encode a value as an MQTT variable-length integer.
146///
147/// Max encodable value is 268,435,455 (0x0FFFFFFF).
148#[must_use]
149pub fn encode_variable_length(value: u32) -> Vec<u8> {
150    if value == 0 {
151        return vec![0x00];
152    }
153    let mut result = Vec::with_capacity(4);
154    let mut x = value;
155    while x > 0 {
156        let mut encoded_byte = (x % 128) as u8;
157        x /= 128;
158        if x > 0 {
159            encoded_byte |= 0x80;
160        }
161        result.push(encoded_byte);
162    }
163    result
164}
165
166/// Return the string name for an MQTT message type value.
167#[must_use]
168pub fn message_type_name(msg_type: u8) -> &'static str {
169    match msg_type {
170        CONNECT => "CONNECT",
171        CONNACK => "CONNACK",
172        PUBLISH => "PUBLISH",
173        PUBACK => "PUBACK",
174        PUBREC => "PUBREC",
175        PUBREL => "PUBREL",
176        PUBCOMP => "PUBCOMP",
177        SUBSCRIBE => "SUBSCRIBE",
178        SUBACK => "SUBACK",
179        UNSUBSCRIBE => "UNSUBSCRIBE",
180        UNSUBACK => "UNSUBACK",
181        PINGREQ => "PINGREQ",
182        PINGRESP => "PINGRESP",
183        DISCONNECT => "DISCONNECT",
184        AUTH => "AUTH",
185        _ => "UNKNOWN",
186    }
187}
188
189/// Check whether a TCP payload looks like an MQTT packet.
190///
191/// Validates that the first byte contains a valid message type (1-15) in bits
192/// 7-4 and that the remaining length can be decoded.
193#[must_use]
194pub fn is_mqtt_payload(buf: &[u8]) -> bool {
195    if buf.len() < 2 {
196        return false;
197    }
198    let msg_type = (buf[0] >> 4) & 0x0F;
199    if !(1..=15).contains(&msg_type) {
200        return false;
201    }
202    // Verify we can decode the remaining length
203    decode_variable_length(buf, 1).is_ok()
204}
205
206// ============================================================================
207// MqttLayer — zero-copy view into a packet buffer
208// ============================================================================
209
210/// MQTT layer -- a zero-copy view into a packet buffer.
211#[derive(Debug, Clone)]
212pub struct MqttLayer {
213    pub index: LayerIndex,
214}
215
216impl MqttLayer {
217    /// Create a new MQTT layer from a layer index.
218    #[must_use]
219    pub fn new(index: LayerIndex) -> Self {
220        Self { index }
221    }
222
223    /// Create an MQTT layer starting at offset 0 (for standalone parsing).
224    #[must_use]
225    pub fn at_start(len: usize) -> Self {
226        Self {
227            index: LayerIndex::new(LayerKind::Mqtt, 0, len),
228        }
229    }
230
231    /// Return a reference to the slice of the buffer corresponding to this layer.
232    fn slice<'a>(&self, buf: &'a [u8]) -> &'a [u8] {
233        self.index.slice(buf)
234    }
235
236    // ========================================================================
237    // Fixed header field accessors
238    // ========================================================================
239
240    /// Get the message type (bits 7-4 of byte 0).
241    pub fn msg_type(&self, buf: &[u8]) -> Result<u8, FieldError> {
242        let s = self.slice(buf);
243        if s.is_empty() {
244            return Err(FieldError::BufferTooShort {
245                offset: self.index.start,
246                need: 1,
247                have: 0,
248            });
249        }
250        Ok((s[0] >> 4) & 0x0F)
251    }
252
253    /// Get the DUP flag (bit 3 of byte 0).
254    pub fn dup(&self, buf: &[u8]) -> Result<bool, FieldError> {
255        let s = self.slice(buf);
256        if s.is_empty() {
257            return Err(FieldError::BufferTooShort {
258                offset: self.index.start,
259                need: 1,
260                have: 0,
261            });
262        }
263        Ok((s[0] >> 3) & 0x01 == 1)
264    }
265
266    /// Get the `QoS` level (bits 2-1 of byte 0).
267    pub fn qos(&self, buf: &[u8]) -> Result<u8, FieldError> {
268        let s = self.slice(buf);
269        if s.is_empty() {
270            return Err(FieldError::BufferTooShort {
271                offset: self.index.start,
272                need: 1,
273                have: 0,
274            });
275        }
276        Ok((s[0] >> 1) & 0x03)
277    }
278
279    /// Get the RETAIN flag (bit 0 of byte 0).
280    pub fn retain(&self, buf: &[u8]) -> Result<bool, FieldError> {
281        let s = self.slice(buf);
282        if s.is_empty() {
283            return Err(FieldError::BufferTooShort {
284                offset: self.index.start,
285                need: 1,
286                have: 0,
287            });
288        }
289        Ok(s[0] & 0x01 == 1)
290    }
291
292    /// Get the remaining length (variable-length integer starting at byte 1).
293    pub fn remaining_length(&self, buf: &[u8]) -> Result<u32, FieldError> {
294        let s = self.slice(buf);
295        if s.len() < 2 {
296            return Err(FieldError::BufferTooShort {
297                offset: self.index.start + 1,
298                need: 1,
299                have: s.len().saturating_sub(1),
300            });
301        }
302        let (val, _consumed) = decode_variable_length(s, 1)?;
303        Ok(val)
304    }
305
306    /// Compute the fixed header length (1 byte type/flags + N bytes remaining length).
307    #[must_use]
308    pub fn fixed_header_len(&self, buf: &[u8]) -> usize {
309        let s = self.slice(buf);
310        if s.len() < 2 {
311            return MQTT_MIN_HEADER_LEN;
312        }
313        match decode_variable_length(s, 1) {
314            Ok((_val, consumed)) => 1 + consumed,
315            Err(_) => MQTT_MIN_HEADER_LEN,
316        }
317    }
318
319    /// Compute the variable header start offset within the full buffer.
320    fn var_header_offset(&self, buf: &[u8]) -> usize {
321        self.index.start + self.fixed_header_len(buf)
322    }
323
324    // ========================================================================
325    // PUBLISH field accessors
326    // ========================================================================
327
328    /// Get the topic length for a PUBLISH message (2-byte big-endian at variable header start).
329    pub fn topic_len(&self, buf: &[u8]) -> Result<u16, FieldError> {
330        let off = self.var_header_offset(buf);
331        if off + 2 > buf.len() {
332            return Err(FieldError::BufferTooShort {
333                offset: off,
334                need: 2,
335                have: buf.len().saturating_sub(off),
336            });
337        }
338        Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
339    }
340
341    /// Get the topic string for a PUBLISH message.
342    pub fn topic(&self, buf: &[u8]) -> Result<String, FieldError> {
343        let off = self.var_header_offset(buf);
344        if off + 2 > buf.len() {
345            return Err(FieldError::BufferTooShort {
346                offset: off,
347                need: 2,
348                have: buf.len().saturating_sub(off),
349            });
350        }
351        let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
352        let topic_start = off + 2;
353        if topic_start + tlen > buf.len() {
354            return Err(FieldError::BufferTooShort {
355                offset: topic_start,
356                need: tlen,
357                have: buf.len().saturating_sub(topic_start),
358            });
359        }
360        String::from_utf8(buf[topic_start..topic_start + tlen].to_vec())
361            .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 topic: {e}")))
362    }
363
364    /// Get the message ID for PUBLISH (`QoS` > 0), PUBACK, PUBREC, PUBREL, PUBCOMP,
365    /// SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK messages.
366    ///
367    /// For PUBLISH: message ID follows the topic (2 bytes `topic_len` + topic bytes).
368    /// For others: message ID is at the start of the variable header.
369    pub fn msgid(&self, buf: &[u8]) -> Result<u16, FieldError> {
370        let mt = self.msg_type(buf)?;
371        let off = self.var_header_offset(buf);
372
373        match mt {
374            PUBLISH => {
375                // msg_id is after topic_len(2) + topic(N)
376                if off + 2 > buf.len() {
377                    return Err(FieldError::BufferTooShort {
378                        offset: off,
379                        need: 2,
380                        have: buf.len().saturating_sub(off),
381                    });
382                }
383                let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
384                let msgid_off = off + 2 + tlen;
385                if msgid_off + 2 > buf.len() {
386                    return Err(FieldError::BufferTooShort {
387                        offset: msgid_off,
388                        need: 2,
389                        have: buf.len().saturating_sub(msgid_off),
390                    });
391                }
392                Ok(u16::from_be_bytes([buf[msgid_off], buf[msgid_off + 1]]))
393            },
394            PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE | UNSUBACK => {
395                if off + 2 > buf.len() {
396                    return Err(FieldError::BufferTooShort {
397                        offset: off,
398                        need: 2,
399                        have: buf.len().saturating_sub(off),
400                    });
401                }
402                Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
403            },
404            _ => Err(FieldError::InvalidValue(format!(
405                "message type {mt} does not have a msgid field"
406            ))),
407        }
408    }
409
410    /// Get the payload value for a PUBLISH message (bytes after topic + optional msgid).
411    pub fn value(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
412        let off = self.var_header_offset(buf);
413        let rem_len = self.remaining_length(buf)? as usize;
414        let fixed_hdr = self.fixed_header_len(buf);
415        let payload_end = self.index.start + fixed_hdr + rem_len;
416        let payload_end = payload_end.min(buf.len());
417
418        if off + 2 > buf.len() {
419            return Err(FieldError::BufferTooShort {
420                offset: off,
421                need: 2,
422                have: buf.len().saturating_sub(off),
423            });
424        }
425        let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
426        let mut value_start = off + 2 + tlen;
427
428        // If QoS > 0, skip the 2-byte message ID
429        let qos = self.qos(buf)?;
430        if qos > 0 {
431            value_start += 2;
432        }
433
434        if value_start > payload_end {
435            return Ok(Vec::new());
436        }
437        Ok(buf[value_start..payload_end].to_vec())
438    }
439
440    // ========================================================================
441    // CONNECT field accessors
442    // ========================================================================
443
444    /// Get the protocol name from a CONNECT message (e.g., "MQTT" or "`MQIsdp`").
445    pub fn proto_name(&self, buf: &[u8]) -> Result<String, FieldError> {
446        let off = self.var_header_offset(buf);
447        if off + 2 > buf.len() {
448            return Err(FieldError::BufferTooShort {
449                offset: off,
450                need: 2,
451                have: buf.len().saturating_sub(off),
452            });
453        }
454        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
455        let name_start = off + 2;
456        if name_start + name_len > buf.len() {
457            return Err(FieldError::BufferTooShort {
458                offset: name_start,
459                need: name_len,
460                have: buf.len().saturating_sub(name_start),
461            });
462        }
463        String::from_utf8(buf[name_start..name_start + name_len].to_vec())
464            .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 proto_name: {e}")))
465    }
466
467    /// Get the protocol level/version byte from a CONNECT message.
468    pub fn proto_level(&self, buf: &[u8]) -> Result<u8, FieldError> {
469        let off = self.var_header_offset(buf);
470        if off + 2 > buf.len() {
471            return Err(FieldError::BufferTooShort {
472                offset: off,
473                need: 2,
474                have: buf.len().saturating_sub(off),
475            });
476        }
477        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
478        let level_off = off + 2 + name_len;
479        if level_off >= buf.len() {
480            return Err(FieldError::BufferTooShort {
481                offset: level_off,
482                need: 1,
483                have: 0,
484            });
485        }
486        Ok(buf[level_off])
487    }
488
489    /// Get the connect flags byte from a CONNECT message.
490    pub fn connect_flags(&self, buf: &[u8]) -> Result<u8, FieldError> {
491        let off = self.var_header_offset(buf);
492        if off + 2 > buf.len() {
493            return Err(FieldError::BufferTooShort {
494                offset: off,
495                need: 2,
496                have: buf.len().saturating_sub(off),
497            });
498        }
499        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
500        let flags_off = off + 2 + name_len + 1;
501        if flags_off >= buf.len() {
502            return Err(FieldError::BufferTooShort {
503                offset: flags_off,
504                need: 1,
505                have: 0,
506            });
507        }
508        Ok(buf[flags_off])
509    }
510
511    /// Get the username flag from CONNECT flags (bit 7).
512    pub fn usernameflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
513        let flags = self.connect_flags(buf)?;
514        Ok((flags >> 7) & 0x01 == 1)
515    }
516
517    /// Get the password flag from CONNECT flags (bit 6).
518    pub fn passwordflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
519        let flags = self.connect_flags(buf)?;
520        Ok((flags >> 6) & 0x01 == 1)
521    }
522
523    /// Get the will retain flag from CONNECT flags (bit 5).
524    pub fn willretainflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
525        let flags = self.connect_flags(buf)?;
526        Ok((flags >> 5) & 0x01 == 1)
527    }
528
529    /// Get the will `QoS` from CONNECT flags (bits 4-3).
530    pub fn will_qosflag(&self, buf: &[u8]) -> Result<u8, FieldError> {
531        let flags = self.connect_flags(buf)?;
532        Ok((flags >> 3) & 0x03)
533    }
534
535    /// Get the will flag from CONNECT flags (bit 2).
536    pub fn willflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
537        let flags = self.connect_flags(buf)?;
538        Ok((flags >> 2) & 0x01 == 1)
539    }
540
541    /// Get the clean session flag from CONNECT flags (bit 1).
542    pub fn cleansess(&self, buf: &[u8]) -> Result<bool, FieldError> {
543        let flags = self.connect_flags(buf)?;
544        Ok((flags >> 1) & 0x01 == 1)
545    }
546
547    /// Get the keep alive value from a CONNECT message (2-byte big-endian).
548    pub fn klive(&self, buf: &[u8]) -> Result<u16, FieldError> {
549        let off = self.var_header_offset(buf);
550        if off + 2 > buf.len() {
551            return Err(FieldError::BufferTooShort {
552                offset: off,
553                need: 2,
554                have: buf.len().saturating_sub(off),
555            });
556        }
557        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
558        let klive_off = off + 2 + name_len + 2; // after proto_name + proto_level + connect_flags
559        if klive_off + 2 > buf.len() {
560            return Err(FieldError::BufferTooShort {
561                offset: klive_off,
562                need: 2,
563                have: buf.len().saturating_sub(klive_off),
564            });
565        }
566        Ok(u16::from_be_bytes([buf[klive_off], buf[klive_off + 1]]))
567    }
568
569    /// Compute the offset to the start of the CONNECT payload (after variable header).
570    fn connect_payload_offset(&self, buf: &[u8]) -> Result<usize, FieldError> {
571        let off = self.var_header_offset(buf);
572        if off + 2 > buf.len() {
573            return Err(FieldError::BufferTooShort {
574                offset: off,
575                need: 2,
576                have: buf.len().saturating_sub(off),
577            });
578        }
579        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
580        // variable header = proto_name_len(2) + proto_name(N) + proto_level(1) + connect_flags(1) + keep_alive(2)
581        Ok(off + 2 + name_len + 1 + 1 + 2)
582    }
583
584    /// Get the client ID from a CONNECT message.
585    pub fn client_id(&self, buf: &[u8]) -> Result<String, FieldError> {
586        let off = self.connect_payload_offset(buf)?;
587        if off + 2 > buf.len() {
588            return Err(FieldError::BufferTooShort {
589                offset: off,
590                need: 2,
591                have: buf.len().saturating_sub(off),
592            });
593        }
594        let cid_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
595        let cid_start = off + 2;
596        if cid_start + cid_len > buf.len() {
597            return Err(FieldError::BufferTooShort {
598                offset: cid_start,
599                need: cid_len,
600                have: buf.len().saturating_sub(cid_start),
601            });
602        }
603        String::from_utf8(buf[cid_start..cid_start + cid_len].to_vec())
604            .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 client_id: {e}")))
605    }
606
607    // ========================================================================
608    // CONNACK field accessors
609    // ========================================================================
610
611    /// Get the session present flag from a CONNACK message (byte 0 of variable header).
612    pub fn sess_present_flag(&self, buf: &[u8]) -> Result<u8, FieldError> {
613        let off = self.var_header_offset(buf);
614        if off >= buf.len() {
615            return Err(FieldError::BufferTooShort {
616                offset: off,
617                need: 1,
618                have: 0,
619            });
620        }
621        Ok(buf[off] & 0x01)
622    }
623
624    /// Get the return code from a CONNACK message (byte 1 of variable header).
625    pub fn retcode(&self, buf: &[u8]) -> Result<u8, FieldError> {
626        let off = self.var_header_offset(buf);
627        if off + 2 > buf.len() {
628            return Err(FieldError::BufferTooShort {
629                offset: off + 1,
630                need: 1,
631                have: buf.len().saturating_sub(off + 1),
632            });
633        }
634        Ok(buf[off + 1])
635    }
636
637    // ========================================================================
638    // SUBACK field accessors
639    // ========================================================================
640
641    /// Get the return codes from a SUBACK message (bytes after the 2-byte msgid).
642    pub fn retcodes(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
643        let off = self.var_header_offset(buf);
644        let rem_len = self.remaining_length(buf)? as usize;
645        let fixed_hdr = self.fixed_header_len(buf);
646        let payload_end = self.index.start + fixed_hdr + rem_len;
647        let payload_end = payload_end.min(buf.len());
648
649        let retcodes_start = off + 2; // after msgid
650        if retcodes_start > payload_end {
651            return Ok(Vec::new());
652        }
653        Ok(buf[retcodes_start..payload_end].to_vec())
654    }
655
656    // ========================================================================
657    // Summary / display
658    // ========================================================================
659
660    /// Generate a one-line summary of this MQTT layer.
661    #[must_use]
662    pub fn summary(&self, buf: &[u8]) -> String {
663        let mt = match self.msg_type(buf) {
664            Ok(t) => t,
665            Err(_) => return "MQTT".to_string(),
666        };
667        let type_name = message_type_name(mt);
668
669        match mt {
670            PUBLISH => {
671                let topic = self.topic(buf).unwrap_or_else(|_| "?".to_string());
672                let qos = self.qos(buf).unwrap_or(0);
673                format!("MQTT {type_name} topic={topic} QOS={qos}")
674            },
675            CONNECT => {
676                let cid = self.client_id(buf).unwrap_or_else(|_| "?".to_string());
677                format!("MQTT {type_name} clientId={cid}")
678            },
679            CONNACK => {
680                let rc = self.retcode(buf).unwrap_or(0);
681                format!("MQTT {type_name} retcode={rc}")
682            },
683            SUBSCRIBE | UNSUBSCRIBE => {
684                let mid = self.msgid(buf).unwrap_or(0);
685                format!("MQTT {type_name} msgid={mid}")
686            },
687            SUBACK => {
688                let mid = self.msgid(buf).unwrap_or(0);
689                format!("MQTT {type_name} msgid={mid}")
690            },
691            PUBACK | PUBREC | PUBREL | PUBCOMP | UNSUBACK => {
692                let mid = self.msgid(buf).unwrap_or(0);
693                format!("MQTT {type_name} msgid={mid}")
694            },
695            _ => format!("MQTT {type_name}"),
696        }
697    }
698
699    /// Compute the total MQTT message length (fixed header + remaining length).
700    fn compute_header_len(&self, buf: &[u8]) -> usize {
701        let fixed_hdr = self.fixed_header_len(buf);
702        let rem_len = self.remaining_length(buf).unwrap_or(0) as usize;
703        fixed_hdr + rem_len
704    }
705
706    // ========================================================================
707    // Field access API
708    // ========================================================================
709
710    /// Get the field names for this layer.
711    #[must_use]
712    pub fn field_names() -> &'static [&'static str] {
713        MQTT_FIELD_NAMES
714    }
715
716    /// Get a field value by name.
717    pub fn get_field(&self, buf: &[u8], name: &str) -> Option<Result<FieldValue, FieldError>> {
718        match name {
719            "msg_type" => Some(self.msg_type(buf).map(FieldValue::U8)),
720            "dup" => Some(self.dup(buf).map(FieldValue::Bool)),
721            "qos" => Some(self.qos(buf).map(FieldValue::U8)),
722            "retain" => Some(self.retain(buf).map(FieldValue::Bool)),
723            "remaining_length" => Some(self.remaining_length(buf).map(FieldValue::U32)),
724            "topic_len" => {
725                let mt = self.msg_type(buf).ok()?;
726                if mt == PUBLISH {
727                    Some(self.topic_len(buf).map(FieldValue::U16))
728                } else {
729                    None
730                }
731            },
732            "topic" => {
733                let mt = self.msg_type(buf).ok()?;
734                if mt == PUBLISH {
735                    Some(self.topic(buf).map(FieldValue::Str))
736                } else {
737                    None
738                }
739            },
740            "msgid" => {
741                let mt = self.msg_type(buf).ok()?;
742                match mt {
743                    PUBLISH => {
744                        let qos = self.qos(buf).ok()?;
745                        if qos > 0 {
746                            Some(self.msgid(buf).map(FieldValue::U16))
747                        } else {
748                            None
749                        }
750                    },
751                    PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE
752                    | UNSUBACK => Some(self.msgid(buf).map(FieldValue::U16)),
753                    _ => None,
754                }
755            },
756            "value" => {
757                let mt = self.msg_type(buf).ok()?;
758                if mt == PUBLISH {
759                    Some(self.value(buf).map(FieldValue::Bytes))
760                } else {
761                    None
762                }
763            },
764            "proto_name" => {
765                let mt = self.msg_type(buf).ok()?;
766                if mt == CONNECT {
767                    Some(self.proto_name(buf).map(FieldValue::Str))
768                } else {
769                    None
770                }
771            },
772            "proto_level" => {
773                let mt = self.msg_type(buf).ok()?;
774                if mt == CONNECT {
775                    Some(self.proto_level(buf).map(FieldValue::U8))
776                } else {
777                    None
778                }
779            },
780            "connect_flags" => {
781                let mt = self.msg_type(buf).ok()?;
782                if mt == CONNECT {
783                    Some(self.connect_flags(buf).map(FieldValue::U8))
784                } else {
785                    None
786                }
787            },
788            "klive" => {
789                let mt = self.msg_type(buf).ok()?;
790                if mt == CONNECT {
791                    Some(self.klive(buf).map(FieldValue::U16))
792                } else {
793                    None
794                }
795            },
796            "client_id" => {
797                let mt = self.msg_type(buf).ok()?;
798                if mt == CONNECT {
799                    Some(self.client_id(buf).map(FieldValue::Str))
800                } else {
801                    None
802                }
803            },
804            "usernameflag" => {
805                let mt = self.msg_type(buf).ok()?;
806                if mt == CONNECT {
807                    Some(self.usernameflag(buf).map(FieldValue::Bool))
808                } else {
809                    None
810                }
811            },
812            "passwordflag" => {
813                let mt = self.msg_type(buf).ok()?;
814                if mt == CONNECT {
815                    Some(self.passwordflag(buf).map(FieldValue::Bool))
816                } else {
817                    None
818                }
819            },
820            "willretainflag" => {
821                let mt = self.msg_type(buf).ok()?;
822                if mt == CONNECT {
823                    Some(self.willretainflag(buf).map(FieldValue::Bool))
824                } else {
825                    None
826                }
827            },
828            "willQOSflag" => {
829                let mt = self.msg_type(buf).ok()?;
830                if mt == CONNECT {
831                    Some(self.will_qosflag(buf).map(FieldValue::U8))
832                } else {
833                    None
834                }
835            },
836            "willflag" => {
837                let mt = self.msg_type(buf).ok()?;
838                if mt == CONNECT {
839                    Some(self.willflag(buf).map(FieldValue::Bool))
840                } else {
841                    None
842                }
843            },
844            "cleansess" => {
845                let mt = self.msg_type(buf).ok()?;
846                if mt == CONNECT {
847                    Some(self.cleansess(buf).map(FieldValue::Bool))
848                } else {
849                    None
850                }
851            },
852            "sess_present_flag" => {
853                let mt = self.msg_type(buf).ok()?;
854                if mt == CONNACK {
855                    Some(self.sess_present_flag(buf).map(FieldValue::U8))
856                } else {
857                    None
858                }
859            },
860            "retcode" => {
861                let mt = self.msg_type(buf).ok()?;
862                if mt == CONNACK {
863                    Some(self.retcode(buf).map(FieldValue::U8))
864                } else {
865                    None
866                }
867            },
868            "retcodes" => {
869                let mt = self.msg_type(buf).ok()?;
870                if mt == SUBACK {
871                    Some(self.retcodes(buf).map(FieldValue::Bytes))
872                } else {
873                    None
874                }
875            },
876            _ => None,
877        }
878    }
879
880    /// Set a field value by name (limited support for MQTT).
881    pub fn set_field(
882        &self,
883        _buf: &mut [u8],
884        _name: &str,
885        _value: FieldValue,
886    ) -> Option<Result<(), FieldError>> {
887        // MQTT fields are variable-length and setting them in-place is not
888        // straightforward. Use the builder for constructing new packets.
889        None
890    }
891}
892
893impl Layer for MqttLayer {
894    fn kind(&self) -> LayerKind {
895        LayerKind::Mqtt
896    }
897
898    fn summary(&self, data: &[u8]) -> String {
899        self.summary(data)
900    }
901
902    fn header_len(&self, data: &[u8]) -> usize {
903        self.compute_header_len(data)
904    }
905
906    fn field_names(&self) -> &'static [&'static str] {
907        MQTT_FIELD_NAMES
908    }
909}
910
911// ============================================================================
912// Unit tests
913// ============================================================================
914
915#[cfg(test)]
916mod tests {
917    use super::*;
918
919    // ---- Variable-length encoding/decoding tests ----
920
921    #[test]
922    fn test_decode_variable_length_single_byte() {
923        // Value 0
924        let buf = [0x00];
925        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
926        assert_eq!(val, 0);
927        assert_eq!(consumed, 1);
928
929        // Value 127
930        let buf = [0x7F];
931        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
932        assert_eq!(val, 127);
933        assert_eq!(consumed, 1);
934    }
935
936    #[test]
937    fn test_decode_variable_length_two_bytes() {
938        // Value 128 = 0x00 + 0x80 first byte, 0x01 second byte
939        let buf = [0x80, 0x01];
940        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
941        assert_eq!(val, 128);
942        assert_eq!(consumed, 2);
943
944        // Value 16383
945        let buf = [0xFF, 0x7F];
946        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
947        assert_eq!(val, 16383);
948        assert_eq!(consumed, 2);
949    }
950
951    #[test]
952    fn test_decode_variable_length_four_bytes() {
953        // Value 268,435,455 (max)
954        let buf = [0xFF, 0xFF, 0xFF, 0x7F];
955        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
956        assert_eq!(val, 268_435_455);
957        assert_eq!(consumed, 4);
958    }
959
960    #[test]
961    fn test_encode_variable_length_roundtrip() {
962        for &val in &[0u32, 1, 127, 128, 16383, 16384, 2_097_151, 268_435_455] {
963            let encoded = encode_variable_length(val);
964            let (decoded, consumed) = decode_variable_length(&encoded, 0).unwrap();
965            assert_eq!(decoded, val, "roundtrip failed for {}", val);
966            assert_eq!(consumed, encoded.len());
967        }
968    }
969
970    #[test]
971    fn test_encode_variable_length_zero() {
972        let encoded = encode_variable_length(0);
973        assert_eq!(encoded, vec![0x00]);
974    }
975
976    #[test]
977    fn test_encode_variable_length_single_byte() {
978        let encoded = encode_variable_length(10);
979        assert_eq!(encoded, vec![0x0a]);
980    }
981
982    // ---- Detection ----
983
984    #[test]
985    fn test_is_mqtt_payload_valid() {
986        // PINGREQ: 0xC0 0x00
987        assert!(is_mqtt_payload(&[0xC0, 0x00]));
988        // CONNECT
989        assert!(is_mqtt_payload(&[0x10, 0x1f]));
990        // PUBLISH QoS 0
991        assert!(is_mqtt_payload(&[0x30, 0x0a, 0x00, 0x04]));
992    }
993
994    #[test]
995    fn test_is_mqtt_payload_invalid() {
996        // Too short
997        assert!(!is_mqtt_payload(&[0x30]));
998        // Type 0 is reserved/invalid
999        assert!(!is_mqtt_payload(&[0x00, 0x00]));
1000    }
1001
1002    // ---- PUBLISH parsing ----
1003
1004    #[test]
1005    fn test_parse_publish_qos0() {
1006        // PUBLISH QoS0: \x30\x0a\x00\x04test\x00test (but wait... let me
1007        // recalculate the value: fixed header = 0x30 (type=3, dup=0, qos=0, retain=0)
1008        // remaining length = 0x0a = 10
1009        // variable header: topic_len=0x0004, topic="test" (4 bytes)
1010        // payload: remaining = 10 - (2+4) = 4 bytes => but we need 6 bytes for "test"?
1011        // No: rem_len=10 = 2(topic_len) + 4(topic) + 4(value) = 10 correct.
1012        // So value = last 4 bytes.
1013        // Full packet: 0x30 0x0a 0x00 0x04 't' 'e' 's' 't' 't' 'e' 's' 't'
1014        let data: Vec<u8> = vec![
1015            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
1016        ];
1017        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1018        let mqtt = MqttLayer::new(idx);
1019
1020        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBLISH);
1021        assert!(!mqtt.dup(&data).unwrap());
1022        assert_eq!(mqtt.qos(&data).unwrap(), 0);
1023        assert!(!mqtt.retain(&data).unwrap());
1024        assert_eq!(mqtt.remaining_length(&data).unwrap(), 10);
1025        assert_eq!(mqtt.fixed_header_len(&data), 2);
1026        assert_eq!(mqtt.topic_len(&data).unwrap(), 4);
1027        assert_eq!(mqtt.topic(&data).unwrap(), "test");
1028        assert_eq!(mqtt.value(&data).unwrap(), b"test");
1029    }
1030
1031    #[test]
1032    fn test_parse_publish_qos1() {
1033        // PUBLISH QoS1 with msg_id:
1034        // type=3, dup=0, qos=1, retain=0 => 0x32
1035        // remaining_length=12 => topic_len(2) + topic(4) + msgid(2) + value(4)
1036        let data: Vec<u8> = vec![
1037            0x32, 0x0c, 0x00, 0x04, b't', b'e', b's', b't', 0x00, 0x0a, b'd', b'a', b't', b'a',
1038        ];
1039        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1040        let mqtt = MqttLayer::new(idx);
1041
1042        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBLISH);
1043        assert_eq!(mqtt.qos(&data).unwrap(), 1);
1044        assert_eq!(mqtt.topic(&data).unwrap(), "test");
1045        assert_eq!(mqtt.msgid(&data).unwrap(), 10);
1046        assert_eq!(mqtt.value(&data).unwrap(), b"data");
1047    }
1048
1049    // ---- CONNECT parsing ----
1050
1051    #[test]
1052    fn test_parse_connect() {
1053        // CONNECT: \x10\x1f\x00\x06MQIsdp\x03\x02\x00\x3c\x00\x11mosqpub/1440-kali
1054        let data: Vec<u8> = vec![
1055            0x10, 0x1f, // fixed header: CONNECT, remaining_length=31
1056            0x00, 0x06, // proto_name length = 6
1057            b'M', b'Q', b'I', b's', b'd', b'p', // proto_name = "MQIsdp"
1058            0x03, // proto_level = 3
1059            0x02, // connect_flags = 0x02 (clean session)
1060            0x00, 0x3c, // keep_alive = 60
1061            0x00, 0x11, // client_id length = 17
1062            b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0', b'-', b'k',
1063            b'a', b'l', b'i',
1064        ];
1065        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1066        let mqtt = MqttLayer::new(idx);
1067
1068        assert_eq!(mqtt.msg_type(&data).unwrap(), CONNECT);
1069        assert_eq!(mqtt.remaining_length(&data).unwrap(), 31);
1070        assert_eq!(mqtt.proto_name(&data).unwrap(), "MQIsdp");
1071        assert_eq!(mqtt.proto_level(&data).unwrap(), 3);
1072        assert_eq!(mqtt.connect_flags(&data).unwrap(), 0x02);
1073        assert!(mqtt.cleansess(&data).unwrap());
1074        assert!(!mqtt.usernameflag(&data).unwrap());
1075        assert!(!mqtt.passwordflag(&data).unwrap());
1076        assert!(!mqtt.willflag(&data).unwrap());
1077        assert_eq!(mqtt.klive(&data).unwrap(), 60);
1078        assert_eq!(mqtt.client_id(&data).unwrap(), "mosqpub/1440-kali");
1079    }
1080
1081    // ---- CONNACK parsing ----
1082
1083    #[test]
1084    fn test_parse_connack() {
1085        // CONNACK: \x20\x02\x00\x00
1086        let data: Vec<u8> = vec![0x20, 0x02, 0x00, 0x00];
1087        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1088        let mqtt = MqttLayer::new(idx);
1089
1090        assert_eq!(mqtt.msg_type(&data).unwrap(), CONNACK);
1091        assert_eq!(mqtt.remaining_length(&data).unwrap(), 2);
1092        assert_eq!(mqtt.sess_present_flag(&data).unwrap(), 0);
1093        assert_eq!(mqtt.retcode(&data).unwrap(), 0);
1094    }
1095
1096    // ---- SUBSCRIBE parsing ----
1097
1098    #[test]
1099    fn test_parse_subscribe() {
1100        // SUBSCRIBE: \x82\x09\x00\x01\x00\x04test\x01
1101        // type=8, flags=0x02 (reserved), rem_len=9
1102        // msgid=0x0001, topic_filter_len=4, topic="test", qos=1
1103        let data: Vec<u8> = vec![
1104            0x82, 0x09, 0x00, 0x01, 0x00, 0x04, b't', b'e', b's', b't', 0x01,
1105        ];
1106        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1107        let mqtt = MqttLayer::new(idx);
1108
1109        assert_eq!(mqtt.msg_type(&data).unwrap(), SUBSCRIBE);
1110        assert_eq!(mqtt.remaining_length(&data).unwrap(), 9);
1111        assert_eq!(mqtt.msgid(&data).unwrap(), 1);
1112    }
1113
1114    // ---- SUBACK parsing ----
1115
1116    #[test]
1117    fn test_parse_suback() {
1118        // SUBACK: \x90\x03\x00\x01\x00
1119        // type=9, rem_len=3, msgid=1, retcodes=[0x00]
1120        let data: Vec<u8> = vec![0x90, 0x03, 0x00, 0x01, 0x00];
1121        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1122        let mqtt = MqttLayer::new(idx);
1123
1124        assert_eq!(mqtt.msg_type(&data).unwrap(), SUBACK);
1125        assert_eq!(mqtt.remaining_length(&data).unwrap(), 3);
1126        assert_eq!(mqtt.msgid(&data).unwrap(), 1);
1127        assert_eq!(mqtt.retcodes(&data).unwrap(), vec![0x00]);
1128    }
1129
1130    // ---- PINGREQ / PINGRESP ----
1131
1132    #[test]
1133    fn test_parse_pingreq() {
1134        let data: Vec<u8> = vec![0xC0, 0x00];
1135        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1136        let mqtt = MqttLayer::new(idx);
1137
1138        assert_eq!(mqtt.msg_type(&data).unwrap(), PINGREQ);
1139        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
1140    }
1141
1142    #[test]
1143    fn test_parse_pingresp() {
1144        let data: Vec<u8> = vec![0xD0, 0x00];
1145        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1146        let mqtt = MqttLayer::new(idx);
1147
1148        assert_eq!(mqtt.msg_type(&data).unwrap(), PINGRESP);
1149        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
1150    }
1151
1152    // ---- DISCONNECT ----
1153
1154    #[test]
1155    fn test_parse_disconnect() {
1156        let data: Vec<u8> = vec![0xE0, 0x00];
1157        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1158        let mqtt = MqttLayer::new(idx);
1159
1160        assert_eq!(mqtt.msg_type(&data).unwrap(), DISCONNECT);
1161        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
1162    }
1163
1164    // ---- Message type names ----
1165
1166    #[test]
1167    fn test_message_type_names() {
1168        assert_eq!(message_type_name(CONNECT), "CONNECT");
1169        assert_eq!(message_type_name(CONNACK), "CONNACK");
1170        assert_eq!(message_type_name(PUBLISH), "PUBLISH");
1171        assert_eq!(message_type_name(PUBACK), "PUBACK");
1172        assert_eq!(message_type_name(PUBREC), "PUBREC");
1173        assert_eq!(message_type_name(PUBREL), "PUBREL");
1174        assert_eq!(message_type_name(PUBCOMP), "PUBCOMP");
1175        assert_eq!(message_type_name(SUBSCRIBE), "SUBSCRIBE");
1176        assert_eq!(message_type_name(SUBACK), "SUBACK");
1177        assert_eq!(message_type_name(UNSUBSCRIBE), "UNSUBSCRIBE");
1178        assert_eq!(message_type_name(UNSUBACK), "UNSUBACK");
1179        assert_eq!(message_type_name(PINGREQ), "PINGREQ");
1180        assert_eq!(message_type_name(PINGRESP), "PINGRESP");
1181        assert_eq!(message_type_name(DISCONNECT), "DISCONNECT");
1182        assert_eq!(message_type_name(AUTH), "AUTH");
1183        assert_eq!(message_type_name(0), "UNKNOWN");
1184    }
1185
1186    // ---- Summary ----
1187
1188    #[test]
1189    fn test_summary_publish() {
1190        let data: Vec<u8> = vec![
1191            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
1192        ];
1193        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1194        let mqtt = MqttLayer::new(idx);
1195        let s = mqtt.summary(&data);
1196        assert!(s.contains("PUBLISH"));
1197        assert!(s.contains("topic=test"));
1198        assert!(s.contains("QOS=0"));
1199    }
1200
1201    #[test]
1202    fn test_summary_connect() {
1203        let data: Vec<u8> = vec![
1204            0x10, 0x1f, 0x00, 0x06, b'M', b'Q', b'I', b's', b'd', b'p', 0x03, 0x02, 0x00, 0x3c,
1205            0x00, 0x11, b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0',
1206            b'-', b'k', b'a', b'l', b'i',
1207        ];
1208        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1209        let mqtt = MqttLayer::new(idx);
1210        let s = mqtt.summary(&data);
1211        assert!(s.contains("CONNECT"));
1212        assert!(s.contains("clientId=mosqpub/1440-kali"));
1213    }
1214
1215    // ---- get_field ----
1216
1217    #[test]
1218    fn test_get_field_msg_type() {
1219        let data: Vec<u8> = vec![0xC0, 0x00];
1220        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1221        let mqtt = MqttLayer::new(idx);
1222        let val = mqtt.get_field(&data, "msg_type").unwrap().unwrap();
1223        assert_eq!(val, FieldValue::U8(PINGREQ));
1224    }
1225
1226    #[test]
1227    fn test_get_field_unknown() {
1228        let data: Vec<u8> = vec![0xC0, 0x00];
1229        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1230        let mqtt = MqttLayer::new(idx);
1231        assert!(mqtt.get_field(&data, "nonexistent").is_none());
1232    }
1233
1234    // ---- Layer trait ----
1235
1236    #[test]
1237    fn test_layer_trait_kind() {
1238        let idx = LayerIndex::new(LayerKind::Mqtt, 0, 2);
1239        let mqtt = MqttLayer::new(idx);
1240        assert_eq!(mqtt.kind(), LayerKind::Mqtt);
1241    }
1242
1243    #[test]
1244    fn test_layer_trait_header_len() {
1245        // PUBLISH with rem_len=10 => total = 2 (fixed header) + 10 = 12
1246        let data: Vec<u8> = vec![
1247            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
1248        ];
1249        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1250        let mqtt = MqttLayer::new(idx);
1251        assert_eq!(Layer::header_len(&mqtt, &data), 12);
1252    }
1253
1254    // ---- PUBACK ----
1255
1256    #[test]
1257    fn test_parse_puback() {
1258        // PUBACK: type=4, rem_len=2, msgid=0x000a
1259        let data: Vec<u8> = vec![0x40, 0x02, 0x00, 0x0a];
1260        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1261        let mqtt = MqttLayer::new(idx);
1262
1263        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBACK);
1264        assert_eq!(mqtt.msgid(&data).unwrap(), 10);
1265    }
1266
1267    // ---- Builder round-trip ----
1268
1269    #[test]
1270    fn test_builder_roundtrip_publish() {
1271        let built = MqttBuilder::new()
1272            .publish()
1273            .topic(b"test")
1274            .payload(b"hello")
1275            .build();
1276
1277        let idx = LayerIndex::new(LayerKind::Mqtt, 0, built.len());
1278        let mqtt = MqttLayer::new(idx);
1279
1280        assert_eq!(mqtt.msg_type(&built).unwrap(), PUBLISH);
1281        assert_eq!(mqtt.qos(&built).unwrap(), 0);
1282        assert_eq!(mqtt.topic(&built).unwrap(), "test");
1283        assert_eq!(mqtt.value(&built).unwrap(), b"hello");
1284    }
1285
1286    #[test]
1287    fn test_builder_roundtrip_connect() {
1288        let built = MqttBuilder::new()
1289            .connect()
1290            .client_id(b"myclient")
1291            .keep_alive(120)
1292            .clean_session(true)
1293            .build();
1294
1295        let idx = LayerIndex::new(LayerKind::Mqtt, 0, built.len());
1296        let mqtt = MqttLayer::new(idx);
1297
1298        assert_eq!(mqtt.msg_type(&built).unwrap(), CONNECT);
1299        assert_eq!(mqtt.proto_name(&built).unwrap(), "MQTT");
1300        assert_eq!(mqtt.proto_level(&built).unwrap(), 4);
1301        assert!(mqtt.cleansess(&built).unwrap());
1302        assert_eq!(mqtt.klive(&built).unwrap(), 120);
1303        assert_eq!(mqtt.client_id(&built).unwrap(), "myclient");
1304    }
1305}