Skip to main content

stackforge_core/layer/mqtt/
mod.rs

1//! MQTT (Message Queuing Telemetry Transport) layer implementation.
2//!
3//! Implements MQTT v3.1, v3.1.1, and v5.0 packet parsing as a zero-copy view
4//! into a packet buffer.
5//!
6//! ## Fixed Header Format
7//!
8//! ```text
9//! Byte 0:  [msg_type(4 bits)] [dup(1)] [qos(2)] [retain(1)]
10//! Byte 1+: Remaining Length (variable-length encoded, 1-4 bytes)
11//! ```
12//!
13//! ## Message Types
14//!
15//! | Value | Name        | Direction       |
16//! |-------|-------------|-----------------|
17//! | 1     | CONNECT     | Client -> Server|
18//! | 2     | CONNACK     | Server -> Client|
19//! | 3     | PUBLISH     | Both            |
20//! | 4     | PUBACK      | Both            |
21//! | 5     | PUBREC      | Both            |
22//! | 6     | PUBREL      | Both            |
23//! | 7     | PUBCOMP     | Both            |
24//! | 8     | SUBSCRIBE   | Client -> Server|
25//! | 9     | SUBACK      | Server -> Client|
26//! | 10    | UNSUBSCRIBE | Client -> Server|
27//! | 11    | UNSUBACK    | Server -> Client|
28//! | 12    | PINGREQ     | Client -> Server|
29//! | 13    | PINGRESP    | Server -> Client|
30//! | 14    | DISCONNECT  | Both (v5.0)     |
31//! | 15    | AUTH        | Both (v5.0)     |
32
33pub mod builder;
34
35pub use builder::MqttBuilder;
36
37use crate::layer::field::{FieldError, FieldValue};
38use crate::layer::{Layer, LayerIndex, LayerKind};
39
40/// Minimum MQTT header: 1 byte fixed header + at least 1 byte remaining length.
41pub const MQTT_MIN_HEADER_LEN: usize = 2;
42
43/// Default MQTT TCP port.
44pub const MQTT_PORT: u16 = 1883;
45
46// ============================================================================
47// Message type constants
48// ============================================================================
49
50pub const CONNECT: u8 = 1;
51pub const CONNACK: u8 = 2;
52pub const PUBLISH: u8 = 3;
53pub const PUBACK: u8 = 4;
54pub const PUBREC: u8 = 5;
55pub const PUBREL: u8 = 6;
56pub const PUBCOMP: u8 = 7;
57pub const SUBSCRIBE: u8 = 8;
58pub const SUBACK: u8 = 9;
59pub const UNSUBSCRIBE: u8 = 10;
60pub const UNSUBACK: u8 = 11;
61pub const PINGREQ: u8 = 12;
62pub const PINGRESP: u8 = 13;
63pub const DISCONNECT: u8 = 14;
64pub const AUTH: u8 = 15;
65
66/// Field names exported for Python/generic access.
67pub static MQTT_FIELD_NAMES: &[&str] = &[
68    "msg_type",
69    "dup",
70    "qos",
71    "retain",
72    "remaining_length",
73    "topic",
74    "topic_len",
75    "msgid",
76    "value",
77    "proto_name",
78    "proto_level",
79    "connect_flags",
80    "klive",
81    "client_id",
82    "usernameflag",
83    "passwordflag",
84    "willretainflag",
85    "willQOSflag",
86    "willflag",
87    "cleansess",
88    "sess_present_flag",
89    "retcode",
90    "retcodes",
91];
92
93// ============================================================================
94// Variable-length integer encoding/decoding (MQTT spec section 1.5.5)
95// ============================================================================
96
97/// Decode a variable-length integer from `buf` starting at `offset`.
98///
99/// Returns `(value, bytes_consumed)` on success.
100///
101/// Each byte encodes 7 bits of data in bits 6-0, with bit 7 as a continuation
102/// flag. At most 4 bytes are used, encoding values up to 268,435,455.
103pub fn decode_variable_length(buf: &[u8], offset: usize) -> Result<(u32, usize), FieldError> {
104    let mut value: u32 = 0;
105    let mut multiplier: u32 = 1;
106    let mut idx = offset;
107
108    loop {
109        if idx >= buf.len() {
110            return Err(FieldError::BufferTooShort {
111                offset: idx,
112                need: 1,
113                have: 0,
114            });
115        }
116        let encoded_byte = buf[idx];
117        value += u32::from(encoded_byte & 0x7F) * multiplier;
118
119        if multiplier > 128 * 128 * 128 {
120            return Err(FieldError::InvalidValue(
121                "variable-length integer exceeds 4 bytes".into(),
122            ));
123        }
124
125        idx += 1;
126        if encoded_byte & 0x80 == 0 {
127            break;
128        }
129        multiplier *= 128;
130    }
131
132    Ok((value, idx - offset))
133}
134
135/// Encode a value as an MQTT variable-length integer.
136///
137/// Max encodable value is 268,435,455 (0x0FFFFFFF).
138#[must_use]
139pub fn encode_variable_length(value: u32) -> Vec<u8> {
140    if value == 0 {
141        return vec![0x00];
142    }
143    let mut result = Vec::with_capacity(4);
144    let mut x = value;
145    while x > 0 {
146        let mut encoded_byte = (x % 128) as u8;
147        x /= 128;
148        if x > 0 {
149            encoded_byte |= 0x80;
150        }
151        result.push(encoded_byte);
152    }
153    result
154}
155
156/// Return the string name for an MQTT message type value.
157#[must_use]
158pub fn message_type_name(msg_type: u8) -> &'static str {
159    match msg_type {
160        CONNECT => "CONNECT",
161        CONNACK => "CONNACK",
162        PUBLISH => "PUBLISH",
163        PUBACK => "PUBACK",
164        PUBREC => "PUBREC",
165        PUBREL => "PUBREL",
166        PUBCOMP => "PUBCOMP",
167        SUBSCRIBE => "SUBSCRIBE",
168        SUBACK => "SUBACK",
169        UNSUBSCRIBE => "UNSUBSCRIBE",
170        UNSUBACK => "UNSUBACK",
171        PINGREQ => "PINGREQ",
172        PINGRESP => "PINGRESP",
173        DISCONNECT => "DISCONNECT",
174        AUTH => "AUTH",
175        _ => "UNKNOWN",
176    }
177}
178
179/// Check whether a TCP payload looks like an MQTT packet.
180///
181/// Validates that the first byte contains a valid message type (1-15) in bits
182/// 7-4 and that the remaining length can be decoded.
183#[must_use]
184pub fn is_mqtt_payload(buf: &[u8]) -> bool {
185    if buf.len() < 2 {
186        return false;
187    }
188    let msg_type = (buf[0] >> 4) & 0x0F;
189    if !(1..=15).contains(&msg_type) {
190        return false;
191    }
192    // Verify we can decode the remaining length
193    decode_variable_length(buf, 1).is_ok()
194}
195
196// ============================================================================
197// MqttLayer — zero-copy view into a packet buffer
198// ============================================================================
199
200/// MQTT layer -- a zero-copy view into a packet buffer.
201#[derive(Debug, Clone)]
202pub struct MqttLayer {
203    pub index: LayerIndex,
204}
205
206impl MqttLayer {
207    /// Create a new MQTT layer from a layer index.
208    #[must_use]
209    pub fn new(index: LayerIndex) -> Self {
210        Self { index }
211    }
212
213    /// Create an MQTT layer starting at offset 0 (for standalone parsing).
214    #[must_use]
215    pub fn at_start(len: usize) -> Self {
216        Self {
217            index: LayerIndex::new(LayerKind::Mqtt, 0, len),
218        }
219    }
220
221    /// Return a reference to the slice of the buffer corresponding to this layer.
222    fn slice<'a>(&self, buf: &'a [u8]) -> &'a [u8] {
223        self.index.slice(buf)
224    }
225
226    // ========================================================================
227    // Fixed header field accessors
228    // ========================================================================
229
230    /// Get the message type (bits 7-4 of byte 0).
231    pub fn msg_type(&self, buf: &[u8]) -> Result<u8, FieldError> {
232        let s = self.slice(buf);
233        if s.is_empty() {
234            return Err(FieldError::BufferTooShort {
235                offset: self.index.start,
236                need: 1,
237                have: 0,
238            });
239        }
240        Ok((s[0] >> 4) & 0x0F)
241    }
242
243    /// Get the DUP flag (bit 3 of byte 0).
244    pub fn dup(&self, buf: &[u8]) -> Result<bool, FieldError> {
245        let s = self.slice(buf);
246        if s.is_empty() {
247            return Err(FieldError::BufferTooShort {
248                offset: self.index.start,
249                need: 1,
250                have: 0,
251            });
252        }
253        Ok((s[0] >> 3) & 0x01 == 1)
254    }
255
256    /// Get the `QoS` level (bits 2-1 of byte 0).
257    pub fn qos(&self, buf: &[u8]) -> Result<u8, FieldError> {
258        let s = self.slice(buf);
259        if s.is_empty() {
260            return Err(FieldError::BufferTooShort {
261                offset: self.index.start,
262                need: 1,
263                have: 0,
264            });
265        }
266        Ok((s[0] >> 1) & 0x03)
267    }
268
269    /// Get the RETAIN flag (bit 0 of byte 0).
270    pub fn retain(&self, buf: &[u8]) -> Result<bool, FieldError> {
271        let s = self.slice(buf);
272        if s.is_empty() {
273            return Err(FieldError::BufferTooShort {
274                offset: self.index.start,
275                need: 1,
276                have: 0,
277            });
278        }
279        Ok(s[0] & 0x01 == 1)
280    }
281
282    /// Get the remaining length (variable-length integer starting at byte 1).
283    pub fn remaining_length(&self, buf: &[u8]) -> Result<u32, FieldError> {
284        let s = self.slice(buf);
285        if s.len() < 2 {
286            return Err(FieldError::BufferTooShort {
287                offset: self.index.start + 1,
288                need: 1,
289                have: s.len().saturating_sub(1),
290            });
291        }
292        let (val, _consumed) = decode_variable_length(s, 1)?;
293        Ok(val)
294    }
295
296    /// Compute the fixed header length (1 byte type/flags + N bytes remaining length).
297    #[must_use]
298    pub fn fixed_header_len(&self, buf: &[u8]) -> usize {
299        let s = self.slice(buf);
300        if s.len() < 2 {
301            return MQTT_MIN_HEADER_LEN;
302        }
303        match decode_variable_length(s, 1) {
304            Ok((_val, consumed)) => 1 + consumed,
305            Err(_) => MQTT_MIN_HEADER_LEN,
306        }
307    }
308
309    /// Compute the variable header start offset within the full buffer.
310    fn var_header_offset(&self, buf: &[u8]) -> usize {
311        self.index.start + self.fixed_header_len(buf)
312    }
313
314    // ========================================================================
315    // PUBLISH field accessors
316    // ========================================================================
317
318    /// Get the topic length for a PUBLISH message (2-byte big-endian at variable header start).
319    pub fn topic_len(&self, buf: &[u8]) -> Result<u16, FieldError> {
320        let off = self.var_header_offset(buf);
321        if off + 2 > buf.len() {
322            return Err(FieldError::BufferTooShort {
323                offset: off,
324                need: 2,
325                have: buf.len().saturating_sub(off),
326            });
327        }
328        Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
329    }
330
331    /// Get the topic string for a PUBLISH message.
332    pub fn topic(&self, buf: &[u8]) -> Result<String, FieldError> {
333        let off = self.var_header_offset(buf);
334        if off + 2 > buf.len() {
335            return Err(FieldError::BufferTooShort {
336                offset: off,
337                need: 2,
338                have: buf.len().saturating_sub(off),
339            });
340        }
341        let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
342        let topic_start = off + 2;
343        if topic_start + tlen > buf.len() {
344            return Err(FieldError::BufferTooShort {
345                offset: topic_start,
346                need: tlen,
347                have: buf.len().saturating_sub(topic_start),
348            });
349        }
350        String::from_utf8(buf[topic_start..topic_start + tlen].to_vec())
351            .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 topic: {e}")))
352    }
353
354    /// Get the message ID for PUBLISH (`QoS` > 0), PUBACK, PUBREC, PUBREL, PUBCOMP,
355    /// SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK messages.
356    ///
357    /// For PUBLISH: message ID follows the topic (2 bytes `topic_len` + topic bytes).
358    /// For others: message ID is at the start of the variable header.
359    pub fn msgid(&self, buf: &[u8]) -> Result<u16, FieldError> {
360        let mt = self.msg_type(buf)?;
361        let off = self.var_header_offset(buf);
362
363        match mt {
364            PUBLISH => {
365                // msg_id is after topic_len(2) + topic(N)
366                if off + 2 > buf.len() {
367                    return Err(FieldError::BufferTooShort {
368                        offset: off,
369                        need: 2,
370                        have: buf.len().saturating_sub(off),
371                    });
372                }
373                let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
374                let msgid_off = off + 2 + tlen;
375                if msgid_off + 2 > buf.len() {
376                    return Err(FieldError::BufferTooShort {
377                        offset: msgid_off,
378                        need: 2,
379                        have: buf.len().saturating_sub(msgid_off),
380                    });
381                }
382                Ok(u16::from_be_bytes([buf[msgid_off], buf[msgid_off + 1]]))
383            },
384            PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE | UNSUBACK => {
385                if off + 2 > buf.len() {
386                    return Err(FieldError::BufferTooShort {
387                        offset: off,
388                        need: 2,
389                        have: buf.len().saturating_sub(off),
390                    });
391                }
392                Ok(u16::from_be_bytes([buf[off], buf[off + 1]]))
393            },
394            _ => Err(FieldError::InvalidValue(format!(
395                "message type {mt} does not have a msgid field"
396            ))),
397        }
398    }
399
400    /// Get the payload value for a PUBLISH message (bytes after topic + optional msgid).
401    pub fn value(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
402        let off = self.var_header_offset(buf);
403        let rem_len = self.remaining_length(buf)? as usize;
404        let fixed_hdr = self.fixed_header_len(buf);
405        let payload_end = self.index.start + fixed_hdr + rem_len;
406        let payload_end = payload_end.min(buf.len());
407
408        if off + 2 > buf.len() {
409            return Err(FieldError::BufferTooShort {
410                offset: off,
411                need: 2,
412                have: buf.len().saturating_sub(off),
413            });
414        }
415        let tlen = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
416        let mut value_start = off + 2 + tlen;
417
418        // If QoS > 0, skip the 2-byte message ID
419        let qos = self.qos(buf)?;
420        if qos > 0 {
421            value_start += 2;
422        }
423
424        if value_start > payload_end {
425            return Ok(Vec::new());
426        }
427        Ok(buf[value_start..payload_end].to_vec())
428    }
429
430    // ========================================================================
431    // CONNECT field accessors
432    // ========================================================================
433
434    /// Get the protocol name from a CONNECT message (e.g., "MQTT" or "`MQIsdp`").
435    pub fn proto_name(&self, buf: &[u8]) -> Result<String, FieldError> {
436        let off = self.var_header_offset(buf);
437        if off + 2 > buf.len() {
438            return Err(FieldError::BufferTooShort {
439                offset: off,
440                need: 2,
441                have: buf.len().saturating_sub(off),
442            });
443        }
444        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
445        let name_start = off + 2;
446        if name_start + name_len > buf.len() {
447            return Err(FieldError::BufferTooShort {
448                offset: name_start,
449                need: name_len,
450                have: buf.len().saturating_sub(name_start),
451            });
452        }
453        String::from_utf8(buf[name_start..name_start + name_len].to_vec())
454            .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 proto_name: {e}")))
455    }
456
457    /// Get the protocol level/version byte from a CONNECT message.
458    pub fn proto_level(&self, buf: &[u8]) -> Result<u8, FieldError> {
459        let off = self.var_header_offset(buf);
460        if off + 2 > buf.len() {
461            return Err(FieldError::BufferTooShort {
462                offset: off,
463                need: 2,
464                have: buf.len().saturating_sub(off),
465            });
466        }
467        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
468        let level_off = off + 2 + name_len;
469        if level_off >= buf.len() {
470            return Err(FieldError::BufferTooShort {
471                offset: level_off,
472                need: 1,
473                have: 0,
474            });
475        }
476        Ok(buf[level_off])
477    }
478
479    /// Get the connect flags byte from a CONNECT message.
480    pub fn connect_flags(&self, buf: &[u8]) -> Result<u8, FieldError> {
481        let off = self.var_header_offset(buf);
482        if off + 2 > buf.len() {
483            return Err(FieldError::BufferTooShort {
484                offset: off,
485                need: 2,
486                have: buf.len().saturating_sub(off),
487            });
488        }
489        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
490        let flags_off = off + 2 + name_len + 1;
491        if flags_off >= buf.len() {
492            return Err(FieldError::BufferTooShort {
493                offset: flags_off,
494                need: 1,
495                have: 0,
496            });
497        }
498        Ok(buf[flags_off])
499    }
500
501    /// Get the username flag from CONNECT flags (bit 7).
502    pub fn usernameflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
503        let flags = self.connect_flags(buf)?;
504        Ok((flags >> 7) & 0x01 == 1)
505    }
506
507    /// Get the password flag from CONNECT flags (bit 6).
508    pub fn passwordflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
509        let flags = self.connect_flags(buf)?;
510        Ok((flags >> 6) & 0x01 == 1)
511    }
512
513    /// Get the will retain flag from CONNECT flags (bit 5).
514    pub fn willretainflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
515        let flags = self.connect_flags(buf)?;
516        Ok((flags >> 5) & 0x01 == 1)
517    }
518
519    /// Get the will `QoS` from CONNECT flags (bits 4-3).
520    pub fn will_qosflag(&self, buf: &[u8]) -> Result<u8, FieldError> {
521        let flags = self.connect_flags(buf)?;
522        Ok((flags >> 3) & 0x03)
523    }
524
525    /// Get the will flag from CONNECT flags (bit 2).
526    pub fn willflag(&self, buf: &[u8]) -> Result<bool, FieldError> {
527        let flags = self.connect_flags(buf)?;
528        Ok((flags >> 2) & 0x01 == 1)
529    }
530
531    /// Get the clean session flag from CONNECT flags (bit 1).
532    pub fn cleansess(&self, buf: &[u8]) -> Result<bool, FieldError> {
533        let flags = self.connect_flags(buf)?;
534        Ok((flags >> 1) & 0x01 == 1)
535    }
536
537    /// Get the keep alive value from a CONNECT message (2-byte big-endian).
538    pub fn klive(&self, buf: &[u8]) -> Result<u16, FieldError> {
539        let off = self.var_header_offset(buf);
540        if off + 2 > buf.len() {
541            return Err(FieldError::BufferTooShort {
542                offset: off,
543                need: 2,
544                have: buf.len().saturating_sub(off),
545            });
546        }
547        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
548        let klive_off = off + 2 + name_len + 2; // after proto_name + proto_level + connect_flags
549        if klive_off + 2 > buf.len() {
550            return Err(FieldError::BufferTooShort {
551                offset: klive_off,
552                need: 2,
553                have: buf.len().saturating_sub(klive_off),
554            });
555        }
556        Ok(u16::from_be_bytes([buf[klive_off], buf[klive_off + 1]]))
557    }
558
559    /// Compute the offset to the start of the CONNECT payload (after variable header).
560    fn connect_payload_offset(&self, buf: &[u8]) -> Result<usize, FieldError> {
561        let off = self.var_header_offset(buf);
562        if off + 2 > buf.len() {
563            return Err(FieldError::BufferTooShort {
564                offset: off,
565                need: 2,
566                have: buf.len().saturating_sub(off),
567            });
568        }
569        let name_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
570        // variable header = proto_name_len(2) + proto_name(N) + proto_level(1) + connect_flags(1) + keep_alive(2)
571        Ok(off + 2 + name_len + 1 + 1 + 2)
572    }
573
574    /// Get the client ID from a CONNECT message.
575    pub fn client_id(&self, buf: &[u8]) -> Result<String, FieldError> {
576        let off = self.connect_payload_offset(buf)?;
577        if off + 2 > buf.len() {
578            return Err(FieldError::BufferTooShort {
579                offset: off,
580                need: 2,
581                have: buf.len().saturating_sub(off),
582            });
583        }
584        let cid_len = u16::from_be_bytes([buf[off], buf[off + 1]]) as usize;
585        let cid_start = off + 2;
586        if cid_start + cid_len > buf.len() {
587            return Err(FieldError::BufferTooShort {
588                offset: cid_start,
589                need: cid_len,
590                have: buf.len().saturating_sub(cid_start),
591            });
592        }
593        String::from_utf8(buf[cid_start..cid_start + cid_len].to_vec())
594            .map_err(|e| FieldError::InvalidValue(format!("invalid UTF-8 client_id: {e}")))
595    }
596
597    // ========================================================================
598    // CONNACK field accessors
599    // ========================================================================
600
601    /// Get the session present flag from a CONNACK message (byte 0 of variable header).
602    pub fn sess_present_flag(&self, buf: &[u8]) -> Result<u8, FieldError> {
603        let off = self.var_header_offset(buf);
604        if off >= buf.len() {
605            return Err(FieldError::BufferTooShort {
606                offset: off,
607                need: 1,
608                have: 0,
609            });
610        }
611        Ok(buf[off] & 0x01)
612    }
613
614    /// Get the return code from a CONNACK message (byte 1 of variable header).
615    pub fn retcode(&self, buf: &[u8]) -> Result<u8, FieldError> {
616        let off = self.var_header_offset(buf);
617        if off + 2 > buf.len() {
618            return Err(FieldError::BufferTooShort {
619                offset: off + 1,
620                need: 1,
621                have: buf.len().saturating_sub(off + 1),
622            });
623        }
624        Ok(buf[off + 1])
625    }
626
627    // ========================================================================
628    // SUBACK field accessors
629    // ========================================================================
630
631    /// Get the return codes from a SUBACK message (bytes after the 2-byte msgid).
632    pub fn retcodes(&self, buf: &[u8]) -> Result<Vec<u8>, FieldError> {
633        let off = self.var_header_offset(buf);
634        let rem_len = self.remaining_length(buf)? as usize;
635        let fixed_hdr = self.fixed_header_len(buf);
636        let payload_end = self.index.start + fixed_hdr + rem_len;
637        let payload_end = payload_end.min(buf.len());
638
639        let retcodes_start = off + 2; // after msgid
640        if retcodes_start > payload_end {
641            return Ok(Vec::new());
642        }
643        Ok(buf[retcodes_start..payload_end].to_vec())
644    }
645
646    // ========================================================================
647    // Summary / display
648    // ========================================================================
649
650    /// Generate a one-line summary of this MQTT layer.
651    #[must_use]
652    pub fn summary(&self, buf: &[u8]) -> String {
653        let mt = match self.msg_type(buf) {
654            Ok(t) => t,
655            Err(_) => return "MQTT".to_string(),
656        };
657        let type_name = message_type_name(mt);
658
659        match mt {
660            PUBLISH => {
661                let topic = self.topic(buf).unwrap_or_else(|_| "?".to_string());
662                let qos = self.qos(buf).unwrap_or(0);
663                format!("MQTT {type_name} topic={topic} QOS={qos}")
664            },
665            CONNECT => {
666                let cid = self.client_id(buf).unwrap_or_else(|_| "?".to_string());
667                format!("MQTT {type_name} clientId={cid}")
668            },
669            CONNACK => {
670                let rc = self.retcode(buf).unwrap_or(0);
671                format!("MQTT {type_name} retcode={rc}")
672            },
673            SUBSCRIBE | UNSUBSCRIBE => {
674                let mid = self.msgid(buf).unwrap_or(0);
675                format!("MQTT {type_name} msgid={mid}")
676            },
677            SUBACK => {
678                let mid = self.msgid(buf).unwrap_or(0);
679                format!("MQTT {type_name} msgid={mid}")
680            },
681            PUBACK | PUBREC | PUBREL | PUBCOMP | UNSUBACK => {
682                let mid = self.msgid(buf).unwrap_or(0);
683                format!("MQTT {type_name} msgid={mid}")
684            },
685            _ => format!("MQTT {type_name}"),
686        }
687    }
688
689    /// Compute the total MQTT message length (fixed header + remaining length).
690    fn compute_header_len(&self, buf: &[u8]) -> usize {
691        let fixed_hdr = self.fixed_header_len(buf);
692        let rem_len = self.remaining_length(buf).unwrap_or(0) as usize;
693        fixed_hdr + rem_len
694    }
695
696    // ========================================================================
697    // Field access API
698    // ========================================================================
699
700    /// Get the field names for this layer.
701    #[must_use]
702    pub fn field_names() -> &'static [&'static str] {
703        MQTT_FIELD_NAMES
704    }
705
706    /// Get a field value by name.
707    pub fn get_field(&self, buf: &[u8], name: &str) -> Option<Result<FieldValue, FieldError>> {
708        match name {
709            "msg_type" => Some(self.msg_type(buf).map(FieldValue::U8)),
710            "dup" => Some(self.dup(buf).map(FieldValue::Bool)),
711            "qos" => Some(self.qos(buf).map(FieldValue::U8)),
712            "retain" => Some(self.retain(buf).map(FieldValue::Bool)),
713            "remaining_length" => Some(self.remaining_length(buf).map(FieldValue::U32)),
714            "topic_len" => {
715                let mt = self.msg_type(buf).ok()?;
716                if mt == PUBLISH {
717                    Some(self.topic_len(buf).map(FieldValue::U16))
718                } else {
719                    None
720                }
721            },
722            "topic" => {
723                let mt = self.msg_type(buf).ok()?;
724                if mt == PUBLISH {
725                    Some(self.topic(buf).map(FieldValue::Str))
726                } else {
727                    None
728                }
729            },
730            "msgid" => {
731                let mt = self.msg_type(buf).ok()?;
732                match mt {
733                    PUBLISH => {
734                        let qos = self.qos(buf).ok()?;
735                        if qos > 0 {
736                            Some(self.msgid(buf).map(FieldValue::U16))
737                        } else {
738                            None
739                        }
740                    },
741                    PUBACK | PUBREC | PUBREL | PUBCOMP | SUBSCRIBE | SUBACK | UNSUBSCRIBE
742                    | UNSUBACK => Some(self.msgid(buf).map(FieldValue::U16)),
743                    _ => None,
744                }
745            },
746            "value" => {
747                let mt = self.msg_type(buf).ok()?;
748                if mt == PUBLISH {
749                    Some(self.value(buf).map(FieldValue::Bytes))
750                } else {
751                    None
752                }
753            },
754            "proto_name" => {
755                let mt = self.msg_type(buf).ok()?;
756                if mt == CONNECT {
757                    Some(self.proto_name(buf).map(FieldValue::Str))
758                } else {
759                    None
760                }
761            },
762            "proto_level" => {
763                let mt = self.msg_type(buf).ok()?;
764                if mt == CONNECT {
765                    Some(self.proto_level(buf).map(FieldValue::U8))
766                } else {
767                    None
768                }
769            },
770            "connect_flags" => {
771                let mt = self.msg_type(buf).ok()?;
772                if mt == CONNECT {
773                    Some(self.connect_flags(buf).map(FieldValue::U8))
774                } else {
775                    None
776                }
777            },
778            "klive" => {
779                let mt = self.msg_type(buf).ok()?;
780                if mt == CONNECT {
781                    Some(self.klive(buf).map(FieldValue::U16))
782                } else {
783                    None
784                }
785            },
786            "client_id" => {
787                let mt = self.msg_type(buf).ok()?;
788                if mt == CONNECT {
789                    Some(self.client_id(buf).map(FieldValue::Str))
790                } else {
791                    None
792                }
793            },
794            "usernameflag" => {
795                let mt = self.msg_type(buf).ok()?;
796                if mt == CONNECT {
797                    Some(self.usernameflag(buf).map(FieldValue::Bool))
798                } else {
799                    None
800                }
801            },
802            "passwordflag" => {
803                let mt = self.msg_type(buf).ok()?;
804                if mt == CONNECT {
805                    Some(self.passwordflag(buf).map(FieldValue::Bool))
806                } else {
807                    None
808                }
809            },
810            "willretainflag" => {
811                let mt = self.msg_type(buf).ok()?;
812                if mt == CONNECT {
813                    Some(self.willretainflag(buf).map(FieldValue::Bool))
814                } else {
815                    None
816                }
817            },
818            "willQOSflag" => {
819                let mt = self.msg_type(buf).ok()?;
820                if mt == CONNECT {
821                    Some(self.will_qosflag(buf).map(FieldValue::U8))
822                } else {
823                    None
824                }
825            },
826            "willflag" => {
827                let mt = self.msg_type(buf).ok()?;
828                if mt == CONNECT {
829                    Some(self.willflag(buf).map(FieldValue::Bool))
830                } else {
831                    None
832                }
833            },
834            "cleansess" => {
835                let mt = self.msg_type(buf).ok()?;
836                if mt == CONNECT {
837                    Some(self.cleansess(buf).map(FieldValue::Bool))
838                } else {
839                    None
840                }
841            },
842            "sess_present_flag" => {
843                let mt = self.msg_type(buf).ok()?;
844                if mt == CONNACK {
845                    Some(self.sess_present_flag(buf).map(FieldValue::U8))
846                } else {
847                    None
848                }
849            },
850            "retcode" => {
851                let mt = self.msg_type(buf).ok()?;
852                if mt == CONNACK {
853                    Some(self.retcode(buf).map(FieldValue::U8))
854                } else {
855                    None
856                }
857            },
858            "retcodes" => {
859                let mt = self.msg_type(buf).ok()?;
860                if mt == SUBACK {
861                    Some(self.retcodes(buf).map(FieldValue::Bytes))
862                } else {
863                    None
864                }
865            },
866            _ => None,
867        }
868    }
869
870    /// Set a field value by name (limited support for MQTT).
871    pub fn set_field(
872        &self,
873        _buf: &mut [u8],
874        _name: &str,
875        _value: FieldValue,
876    ) -> Option<Result<(), FieldError>> {
877        // MQTT fields are variable-length and setting them in-place is not
878        // straightforward. Use the builder for constructing new packets.
879        None
880    }
881}
882
883impl Layer for MqttLayer {
884    fn kind(&self) -> LayerKind {
885        LayerKind::Mqtt
886    }
887
888    fn summary(&self, data: &[u8]) -> String {
889        self.summary(data)
890    }
891
892    fn header_len(&self, data: &[u8]) -> usize {
893        self.compute_header_len(data)
894    }
895
896    fn field_names(&self) -> &'static [&'static str] {
897        MQTT_FIELD_NAMES
898    }
899}
900
901// ============================================================================
902// Unit tests
903// ============================================================================
904
905#[cfg(test)]
906mod tests {
907    use super::*;
908
909    // ---- Variable-length encoding/decoding tests ----
910
911    #[test]
912    fn test_decode_variable_length_single_byte() {
913        // Value 0
914        let buf = [0x00];
915        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
916        assert_eq!(val, 0);
917        assert_eq!(consumed, 1);
918
919        // Value 127
920        let buf = [0x7F];
921        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
922        assert_eq!(val, 127);
923        assert_eq!(consumed, 1);
924    }
925
926    #[test]
927    fn test_decode_variable_length_two_bytes() {
928        // Value 128 = 0x00 + 0x80 first byte, 0x01 second byte
929        let buf = [0x80, 0x01];
930        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
931        assert_eq!(val, 128);
932        assert_eq!(consumed, 2);
933
934        // Value 16383
935        let buf = [0xFF, 0x7F];
936        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
937        assert_eq!(val, 16383);
938        assert_eq!(consumed, 2);
939    }
940
941    #[test]
942    fn test_decode_variable_length_four_bytes() {
943        // Value 268,435,455 (max)
944        let buf = [0xFF, 0xFF, 0xFF, 0x7F];
945        let (val, consumed) = decode_variable_length(&buf, 0).unwrap();
946        assert_eq!(val, 268_435_455);
947        assert_eq!(consumed, 4);
948    }
949
950    #[test]
951    fn test_encode_variable_length_roundtrip() {
952        for &val in &[0u32, 1, 127, 128, 16383, 16384, 2_097_151, 268_435_455] {
953            let encoded = encode_variable_length(val);
954            let (decoded, consumed) = decode_variable_length(&encoded, 0).unwrap();
955            assert_eq!(decoded, val, "roundtrip failed for {}", val);
956            assert_eq!(consumed, encoded.len());
957        }
958    }
959
960    #[test]
961    fn test_encode_variable_length_zero() {
962        let encoded = encode_variable_length(0);
963        assert_eq!(encoded, vec![0x00]);
964    }
965
966    #[test]
967    fn test_encode_variable_length_single_byte() {
968        let encoded = encode_variable_length(10);
969        assert_eq!(encoded, vec![0x0a]);
970    }
971
972    // ---- Detection ----
973
974    #[test]
975    fn test_is_mqtt_payload_valid() {
976        // PINGREQ: 0xC0 0x00
977        assert!(is_mqtt_payload(&[0xC0, 0x00]));
978        // CONNECT
979        assert!(is_mqtt_payload(&[0x10, 0x1f]));
980        // PUBLISH QoS 0
981        assert!(is_mqtt_payload(&[0x30, 0x0a, 0x00, 0x04]));
982    }
983
984    #[test]
985    fn test_is_mqtt_payload_invalid() {
986        // Too short
987        assert!(!is_mqtt_payload(&[0x30]));
988        // Type 0 is reserved/invalid
989        assert!(!is_mqtt_payload(&[0x00, 0x00]));
990    }
991
992    // ---- PUBLISH parsing ----
993
994    #[test]
995    fn test_parse_publish_qos0() {
996        // PUBLISH QoS0: \x30\x0a\x00\x04test\x00test (but wait... let me
997        // recalculate the value: fixed header = 0x30 (type=3, dup=0, qos=0, retain=0)
998        // remaining length = 0x0a = 10
999        // variable header: topic_len=0x0004, topic="test" (4 bytes)
1000        // payload: remaining = 10 - (2+4) = 4 bytes => but we need 6 bytes for "test"?
1001        // No: rem_len=10 = 2(topic_len) + 4(topic) + 4(value) = 10 correct.
1002        // So value = last 4 bytes.
1003        // Full packet: 0x30 0x0a 0x00 0x04 't' 'e' 's' 't' 't' 'e' 's' 't'
1004        let data: Vec<u8> = vec![
1005            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
1006        ];
1007        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1008        let mqtt = MqttLayer::new(idx);
1009
1010        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBLISH);
1011        assert!(!mqtt.dup(&data).unwrap());
1012        assert_eq!(mqtt.qos(&data).unwrap(), 0);
1013        assert!(!mqtt.retain(&data).unwrap());
1014        assert_eq!(mqtt.remaining_length(&data).unwrap(), 10);
1015        assert_eq!(mqtt.fixed_header_len(&data), 2);
1016        assert_eq!(mqtt.topic_len(&data).unwrap(), 4);
1017        assert_eq!(mqtt.topic(&data).unwrap(), "test");
1018        assert_eq!(mqtt.value(&data).unwrap(), b"test");
1019    }
1020
1021    #[test]
1022    fn test_parse_publish_qos1() {
1023        // PUBLISH QoS1 with msg_id:
1024        // type=3, dup=0, qos=1, retain=0 => 0x32
1025        // remaining_length=12 => topic_len(2) + topic(4) + msgid(2) + value(4)
1026        let data: Vec<u8> = vec![
1027            0x32, 0x0c, 0x00, 0x04, b't', b'e', b's', b't', 0x00, 0x0a, b'd', b'a', b't', b'a',
1028        ];
1029        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1030        let mqtt = MqttLayer::new(idx);
1031
1032        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBLISH);
1033        assert_eq!(mqtt.qos(&data).unwrap(), 1);
1034        assert_eq!(mqtt.topic(&data).unwrap(), "test");
1035        assert_eq!(mqtt.msgid(&data).unwrap(), 10);
1036        assert_eq!(mqtt.value(&data).unwrap(), b"data");
1037    }
1038
1039    // ---- CONNECT parsing ----
1040
1041    #[test]
1042    fn test_parse_connect() {
1043        // CONNECT: \x10\x1f\x00\x06MQIsdp\x03\x02\x00\x3c\x00\x11mosqpub/1440-kali
1044        let data: Vec<u8> = vec![
1045            0x10, 0x1f, // fixed header: CONNECT, remaining_length=31
1046            0x00, 0x06, // proto_name length = 6
1047            b'M', b'Q', b'I', b's', b'd', b'p', // proto_name = "MQIsdp"
1048            0x03, // proto_level = 3
1049            0x02, // connect_flags = 0x02 (clean session)
1050            0x00, 0x3c, // keep_alive = 60
1051            0x00, 0x11, // client_id length = 17
1052            b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0', b'-', b'k',
1053            b'a', b'l', b'i',
1054        ];
1055        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1056        let mqtt = MqttLayer::new(idx);
1057
1058        assert_eq!(mqtt.msg_type(&data).unwrap(), CONNECT);
1059        assert_eq!(mqtt.remaining_length(&data).unwrap(), 31);
1060        assert_eq!(mqtt.proto_name(&data).unwrap(), "MQIsdp");
1061        assert_eq!(mqtt.proto_level(&data).unwrap(), 3);
1062        assert_eq!(mqtt.connect_flags(&data).unwrap(), 0x02);
1063        assert!(mqtt.cleansess(&data).unwrap());
1064        assert!(!mqtt.usernameflag(&data).unwrap());
1065        assert!(!mqtt.passwordflag(&data).unwrap());
1066        assert!(!mqtt.willflag(&data).unwrap());
1067        assert_eq!(mqtt.klive(&data).unwrap(), 60);
1068        assert_eq!(mqtt.client_id(&data).unwrap(), "mosqpub/1440-kali");
1069    }
1070
1071    // ---- CONNACK parsing ----
1072
1073    #[test]
1074    fn test_parse_connack() {
1075        // CONNACK: \x20\x02\x00\x00
1076        let data: Vec<u8> = vec![0x20, 0x02, 0x00, 0x00];
1077        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1078        let mqtt = MqttLayer::new(idx);
1079
1080        assert_eq!(mqtt.msg_type(&data).unwrap(), CONNACK);
1081        assert_eq!(mqtt.remaining_length(&data).unwrap(), 2);
1082        assert_eq!(mqtt.sess_present_flag(&data).unwrap(), 0);
1083        assert_eq!(mqtt.retcode(&data).unwrap(), 0);
1084    }
1085
1086    // ---- SUBSCRIBE parsing ----
1087
1088    #[test]
1089    fn test_parse_subscribe() {
1090        // SUBSCRIBE: \x82\x09\x00\x01\x00\x04test\x01
1091        // type=8, flags=0x02 (reserved), rem_len=9
1092        // msgid=0x0001, topic_filter_len=4, topic="test", qos=1
1093        let data: Vec<u8> = vec![
1094            0x82, 0x09, 0x00, 0x01, 0x00, 0x04, b't', b'e', b's', b't', 0x01,
1095        ];
1096        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1097        let mqtt = MqttLayer::new(idx);
1098
1099        assert_eq!(mqtt.msg_type(&data).unwrap(), SUBSCRIBE);
1100        assert_eq!(mqtt.remaining_length(&data).unwrap(), 9);
1101        assert_eq!(mqtt.msgid(&data).unwrap(), 1);
1102    }
1103
1104    // ---- SUBACK parsing ----
1105
1106    #[test]
1107    fn test_parse_suback() {
1108        // SUBACK: \x90\x03\x00\x01\x00
1109        // type=9, rem_len=3, msgid=1, retcodes=[0x00]
1110        let data: Vec<u8> = vec![0x90, 0x03, 0x00, 0x01, 0x00];
1111        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1112        let mqtt = MqttLayer::new(idx);
1113
1114        assert_eq!(mqtt.msg_type(&data).unwrap(), SUBACK);
1115        assert_eq!(mqtt.remaining_length(&data).unwrap(), 3);
1116        assert_eq!(mqtt.msgid(&data).unwrap(), 1);
1117        assert_eq!(mqtt.retcodes(&data).unwrap(), vec![0x00]);
1118    }
1119
1120    // ---- PINGREQ / PINGRESP ----
1121
1122    #[test]
1123    fn test_parse_pingreq() {
1124        let data: Vec<u8> = vec![0xC0, 0x00];
1125        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1126        let mqtt = MqttLayer::new(idx);
1127
1128        assert_eq!(mqtt.msg_type(&data).unwrap(), PINGREQ);
1129        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
1130    }
1131
1132    #[test]
1133    fn test_parse_pingresp() {
1134        let data: Vec<u8> = vec![0xD0, 0x00];
1135        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1136        let mqtt = MqttLayer::new(idx);
1137
1138        assert_eq!(mqtt.msg_type(&data).unwrap(), PINGRESP);
1139        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
1140    }
1141
1142    // ---- DISCONNECT ----
1143
1144    #[test]
1145    fn test_parse_disconnect() {
1146        let data: Vec<u8> = vec![0xE0, 0x00];
1147        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1148        let mqtt = MqttLayer::new(idx);
1149
1150        assert_eq!(mqtt.msg_type(&data).unwrap(), DISCONNECT);
1151        assert_eq!(mqtt.remaining_length(&data).unwrap(), 0);
1152    }
1153
1154    // ---- Message type names ----
1155
1156    #[test]
1157    fn test_message_type_names() {
1158        assert_eq!(message_type_name(CONNECT), "CONNECT");
1159        assert_eq!(message_type_name(CONNACK), "CONNACK");
1160        assert_eq!(message_type_name(PUBLISH), "PUBLISH");
1161        assert_eq!(message_type_name(PUBACK), "PUBACK");
1162        assert_eq!(message_type_name(PUBREC), "PUBREC");
1163        assert_eq!(message_type_name(PUBREL), "PUBREL");
1164        assert_eq!(message_type_name(PUBCOMP), "PUBCOMP");
1165        assert_eq!(message_type_name(SUBSCRIBE), "SUBSCRIBE");
1166        assert_eq!(message_type_name(SUBACK), "SUBACK");
1167        assert_eq!(message_type_name(UNSUBSCRIBE), "UNSUBSCRIBE");
1168        assert_eq!(message_type_name(UNSUBACK), "UNSUBACK");
1169        assert_eq!(message_type_name(PINGREQ), "PINGREQ");
1170        assert_eq!(message_type_name(PINGRESP), "PINGRESP");
1171        assert_eq!(message_type_name(DISCONNECT), "DISCONNECT");
1172        assert_eq!(message_type_name(AUTH), "AUTH");
1173        assert_eq!(message_type_name(0), "UNKNOWN");
1174    }
1175
1176    // ---- Summary ----
1177
1178    #[test]
1179    fn test_summary_publish() {
1180        let data: Vec<u8> = vec![
1181            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
1182        ];
1183        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1184        let mqtt = MqttLayer::new(idx);
1185        let s = mqtt.summary(&data);
1186        assert!(s.contains("PUBLISH"));
1187        assert!(s.contains("topic=test"));
1188        assert!(s.contains("QOS=0"));
1189    }
1190
1191    #[test]
1192    fn test_summary_connect() {
1193        let data: Vec<u8> = vec![
1194            0x10, 0x1f, 0x00, 0x06, b'M', b'Q', b'I', b's', b'd', b'p', 0x03, 0x02, 0x00, 0x3c,
1195            0x00, 0x11, b'm', b'o', b's', b'q', b'p', b'u', b'b', b'/', b'1', b'4', b'4', b'0',
1196            b'-', b'k', b'a', b'l', b'i',
1197        ];
1198        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1199        let mqtt = MqttLayer::new(idx);
1200        let s = mqtt.summary(&data);
1201        assert!(s.contains("CONNECT"));
1202        assert!(s.contains("clientId=mosqpub/1440-kali"));
1203    }
1204
1205    // ---- get_field ----
1206
1207    #[test]
1208    fn test_get_field_msg_type() {
1209        let data: Vec<u8> = vec![0xC0, 0x00];
1210        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1211        let mqtt = MqttLayer::new(idx);
1212        let val = mqtt.get_field(&data, "msg_type").unwrap().unwrap();
1213        assert_eq!(val, FieldValue::U8(PINGREQ));
1214    }
1215
1216    #[test]
1217    fn test_get_field_unknown() {
1218        let data: Vec<u8> = vec![0xC0, 0x00];
1219        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1220        let mqtt = MqttLayer::new(idx);
1221        assert!(mqtt.get_field(&data, "nonexistent").is_none());
1222    }
1223
1224    // ---- Layer trait ----
1225
1226    #[test]
1227    fn test_layer_trait_kind() {
1228        let idx = LayerIndex::new(LayerKind::Mqtt, 0, 2);
1229        let mqtt = MqttLayer::new(idx);
1230        assert_eq!(mqtt.kind(), LayerKind::Mqtt);
1231    }
1232
1233    #[test]
1234    fn test_layer_trait_header_len() {
1235        // PUBLISH with rem_len=10 => total = 2 (fixed header) + 10 = 12
1236        let data: Vec<u8> = vec![
1237            0x30, 0x0a, 0x00, 0x04, b't', b'e', b's', b't', b't', b'e', b's', b't',
1238        ];
1239        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1240        let mqtt = MqttLayer::new(idx);
1241        assert_eq!(Layer::header_len(&mqtt, &data), 12);
1242    }
1243
1244    // ---- PUBACK ----
1245
1246    #[test]
1247    fn test_parse_puback() {
1248        // PUBACK: type=4, rem_len=2, msgid=0x000a
1249        let data: Vec<u8> = vec![0x40, 0x02, 0x00, 0x0a];
1250        let idx = LayerIndex::new(LayerKind::Mqtt, 0, data.len());
1251        let mqtt = MqttLayer::new(idx);
1252
1253        assert_eq!(mqtt.msg_type(&data).unwrap(), PUBACK);
1254        assert_eq!(mqtt.msgid(&data).unwrap(), 10);
1255    }
1256
1257    // ---- Builder round-trip ----
1258
1259    #[test]
1260    fn test_builder_roundtrip_publish() {
1261        let built = MqttBuilder::new()
1262            .publish()
1263            .topic(b"test")
1264            .payload(b"hello")
1265            .build();
1266
1267        let idx = LayerIndex::new(LayerKind::Mqtt, 0, built.len());
1268        let mqtt = MqttLayer::new(idx);
1269
1270        assert_eq!(mqtt.msg_type(&built).unwrap(), PUBLISH);
1271        assert_eq!(mqtt.qos(&built).unwrap(), 0);
1272        assert_eq!(mqtt.topic(&built).unwrap(), "test");
1273        assert_eq!(mqtt.value(&built).unwrap(), b"hello");
1274    }
1275
1276    #[test]
1277    fn test_builder_roundtrip_connect() {
1278        let built = MqttBuilder::new()
1279            .connect()
1280            .client_id(b"myclient")
1281            .keep_alive(120)
1282            .clean_session(true)
1283            .build();
1284
1285        let idx = LayerIndex::new(LayerKind::Mqtt, 0, built.len());
1286        let mqtt = MqttLayer::new(idx);
1287
1288        assert_eq!(mqtt.msg_type(&built).unwrap(), CONNECT);
1289        assert_eq!(mqtt.proto_name(&built).unwrap(), "MQTT");
1290        assert_eq!(mqtt.proto_level(&built).unwrap(), 4);
1291        assert!(mqtt.cleansess(&built).unwrap());
1292        assert_eq!(mqtt.klive(&built).unwrap(), 120);
1293        assert_eq!(mqtt.client_id(&built).unwrap(), "myclient");
1294    }
1295}