net-mesh 0.21.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
//! Channel membership subprotocol — Subscribe / Unsubscribe / Ack.
//!
//! Ships over `SUBPROTOCOL_CHANNEL_MEMBERSHIP` on existing encrypted
//! sessions. Carries the channel name (not just the u16 hash) so that
//! the publisher-side `ChannelConfig::can_subscribe` check can look up
//! the authoritative config by name — hash collisions must never cause
//! a subscribe to land on the wrong channel's ACL.

use bytes::{Buf, BufMut};

use super::name::{ChannelError, ChannelName};

/// Subprotocol ID for channel membership (subscribe / unsubscribe / ack).
pub const SUBPROTOCOL_CHANNEL_MEMBERSHIP: u16 = 0x0A00;

// Message-type tags. The tag is the first byte of every encoded
// membership message and selects which layout follows.

/// Tag byte for `MembershipMsg::Subscribe`.
const MSG_SUBSCRIBE: u8 = 0;
/// Tag byte for `MembershipMsg::Unsubscribe`.
const MSG_UNSUBSCRIBE: u8 = 1;
/// Tag byte for `MembershipMsg::Ack`.
const MSG_ACK: u8 = 2;

// Reason codes: the final byte of an encoded `Ack`.

/// Wire form of `reason: None` — no rejection reason attached.
const ACK_REASON_OK: u8 = 0;
/// Wire form of `AckReason::Unauthorized`.
const ACK_REASON_UNAUTHORIZED: u8 = 1;
/// Wire form of `AckReason::UnknownChannel`.
const ACK_REASON_UNKNOWN_CHANNEL: u8 = 2;
/// Wire form of `AckReason::RateLimited`.
const ACK_REASON_RATE_LIMITED: u8 = 3;
/// Wire form of `AckReason::TooManyChannels`.
const ACK_REASON_TOO_MANY_CHANNELS: u8 = 4;

/// Why a `Subscribe` or `Unsubscribe` was rejected.
///
/// Each variant travels as a single reason-code byte at the end of an
/// `Ack`; code `0` is reserved for "no reason" and has no variant here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckReason {
    /// Capability or token check failed. Wire code `1`.
    Unauthorized,
    /// Channel not registered on the publisher side. Wire code `2`.
    UnknownChannel,
    /// Membership churn throttled. Wire code `3`.
    RateLimited,
    /// Per-peer channel cap exceeded. Wire code `4`.
    TooManyChannels,
}

/// Channel membership wire message.
///
/// Converted to and from bytes by [`encode`] and [`decode`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MembershipMsg {
    /// Ask the publisher to add this node to `channel`'s subscriber set.
    Subscribe {
        /// Channel the sender wants to subscribe to.
        channel: ChannelName,
        /// Request correlation nonce — echoed back in `Ack`.
        nonce: u64,
        /// Serialized [`super::super::identity::PermissionToken`]
        /// presented alongside the subscribe request. `None` / empty
        /// when the sender has no token to offer — the publisher's
        /// `authorize_subscribe` decides whether a token is required.
        ///
        /// The codec treats the bytes as opaque. On the wire `None`
        /// and `Some(vec![])` are both a zero length prefix, and the
        /// decoder maps a zero length back to `None`, so an empty
        /// `Some` does not round-trip.
        token: Option<Vec<u8>>,
        /// Subscription mode. `None` → `Broadcast` (every published
        /// event delivered to this subscriber, the historic
        /// pub/sub semantic). `Some(name)` → `QueueGroup(name)`
        /// (work-distribution: every published event delivered to
        /// exactly ONE subscriber in the named group). The
        /// publisher's `authorize_subscribe` is mode-agnostic — the
        /// capability tokens that gate the channel apply equally.
        ///
        /// Wire-compat: encoded as a `u8` length prefix + UTF-8
        /// bytes after the token. The decoder reads both a length of
        /// `0` and a payload that simply ends after the token (the
        /// shape pre-queue-group senders produce) as `Broadcast`, so
        /// an empty group name decodes as `None`.
        queue_group: Option<String>,
    },
    /// Ask the publisher to remove this node from `channel`'s subscriber set.
    Unsubscribe {
        /// Channel the sender wants to unsubscribe from.
        channel: ChannelName,
        /// Request correlation nonce — echoed back in `Ack`.
        nonce: u64,
    },
    /// Acknowledgement for a prior Subscribe / Unsubscribe.
    ///
    /// The codec parses `accepted` and `reason` independently; it does
    /// not reject contradictory combinations such as `accepted: true`
    /// with a rejection reason.
    Ack {
        /// Nonce of the request being acknowledged.
        nonce: u64,
        /// Whether the request was accepted.
        accepted: bool,
        /// If rejected, why. `None` is sent as reason code `0`.
        reason: Option<AckReason>,
    },
}

/// Error returned by the membership codec.
///
/// Only [`decode`] produces these; [`encode`] is infallible.
#[derive(Debug, thiserror::Error)]
pub enum MembershipCodecError {
    /// Unknown or reserved message-type byte.
    ///
    /// The decoder also reports an `Ack` whose `accepted` byte is
    /// neither `0` nor `1`, or whose reason code is unrecognised,
    /// through this variant; the payload is the offending byte.
    #[error("unknown membership message type: {0}")]
    UnknownType(u8),
    /// Buffer ended mid-field.
    ///
    /// The decoder reuses this variant for other structural faults as
    /// well: a zero-length channel name, a non-UTF-8 channel or
    /// queue-group name, and unconsumed bytes after a complete
    /// message. The static string says which.
    #[error("truncated membership message: {0}")]
    Truncated(&'static str),
    /// Channel name failed validation.
    #[error("channel name: {0}")]
    Name(#[from] ChannelError),
    /// Length prefix exceeds the remaining buffer.
    ///
    /// Fields are `(claimed length, bytes actually remaining)`.
    #[error("length {0} exceeds remaining {1}")]
    Overflow(usize, usize),
    /// Length prefix exceeds the declared max.
    ///
    /// Fields are `(claimed length, limit)`. The decoder uses this for
    /// both the channel-name and the queue-group-name limit, even
    /// though the message text only mentions channel names.
    #[error("channel name length {0} exceeds limit {1}")]
    NameTooLong(usize, usize),
}

/// Maximum channel-name length accepted by the decoder, in bytes.
/// Matches `name::MAX_NAME_LEN`; duplicated here to keep the wire check local.
///
/// The name length travels as a single `u8`, so at the current value
/// of 255 the decoder's "too long" check cannot fire; it only becomes
/// live if this limit is lowered.
const MAX_CHANNEL_NAME_LEN: usize = 255;

/// Maximum queue-group name length, in bytes. Bounded by the u8
/// length prefix in the wire format. Long names work but bloat
/// every Subscribe; recommend keeping group names short and
/// human-readable.
///
/// As with the channel-name limit, the decoder's comparison against
/// this value is unreachable while the limit equals `u8::MAX`.
const MAX_QUEUE_GROUP_NAME_LEN: usize = 255;

/// Encode a membership message to bytes.
pub fn encode(msg: &MembershipMsg) -> Vec<u8> {
    let mut buf = Vec::with_capacity(64);
    match msg {
        MembershipMsg::Subscribe {
            channel,
            nonce,
            token,
            queue_group,
        } => {
            buf.put_u8(MSG_SUBSCRIBE);
            buf.put_u64_le(*nonce);
            let name = channel.as_str().as_bytes();
            buf.put_u8(name.len() as u8);
            buf.extend_from_slice(name);
            // Token payload: u16_le length + bytes. Zero length when
            // unset — decoder treats absent trailing bytes as no
            // token, for forward-compat with a potential pre-E-1
            // sender (none exist in practice but the cost is ~nil).
            let token_bytes: &[u8] = token.as_deref().unwrap_or(&[]);
            buf.put_u16_le(token_bytes.len() as u16);
            buf.extend_from_slice(token_bytes);
            // Queue-group payload: u8 length + UTF-8 bytes. We
            // OMIT the field entirely when `None` — the decoder
            // treats zero remaining bytes after the token as
            // Broadcast (queue_group = None), so `None` round-
            // trips byte-equivalent to the pre-queue-group wire
            // shape. This is load-bearing for backward
            // compatibility: pre-queue-group nodes' strict-trailer
            // guard rejects a SUBSCRIBE that has trailing bytes
            // beyond the token, so always-emitting the `qg_len=0`
            // byte would break new→old SUBSCRIBE delivery.
            //
            // Names are bound by `MAX_QUEUE_GROUP_NAME_LEN` to
            // keep the prefix u8 — overlength names panic in debug
            // (programmer error) and saturate in release. Encoders
            // should pre-validate group names; the panic here is a
            // last-resort defense, not a routine path.
            if let Some(qg) = queue_group {
                let qg_bytes = qg.as_bytes();
                debug_assert!(
                    qg_bytes.len() <= MAX_QUEUE_GROUP_NAME_LEN,
                    "queue-group name {} exceeds MAX_QUEUE_GROUP_NAME_LEN ({})",
                    qg_bytes.len(),
                    MAX_QUEUE_GROUP_NAME_LEN,
                );
                let len = qg_bytes.len().min(MAX_QUEUE_GROUP_NAME_LEN) as u8;
                buf.put_u8(len);
                buf.extend_from_slice(&qg_bytes[..len as usize]);
            }
        }
        MembershipMsg::Unsubscribe { channel, nonce } => {
            buf.put_u8(MSG_UNSUBSCRIBE);
            buf.put_u64_le(*nonce);
            let name = channel.as_str().as_bytes();
            buf.put_u8(name.len() as u8);
            buf.extend_from_slice(name);
        }
        MembershipMsg::Ack {
            nonce,
            accepted,
            reason,
        } => {
            buf.put_u8(MSG_ACK);
            buf.put_u64_le(*nonce);
            buf.put_u8(u8::from(*accepted));
            buf.put_u8(match reason {
                None => ACK_REASON_OK,
                Some(AckReason::Unauthorized) => ACK_REASON_UNAUTHORIZED,
                Some(AckReason::UnknownChannel) => ACK_REASON_UNKNOWN_CHANNEL,
                Some(AckReason::RateLimited) => ACK_REASON_RATE_LIMITED,
                Some(AckReason::TooManyChannels) => ACK_REASON_TOO_MANY_CHANNELS,
            });
        }
    }
    buf
}

/// Decode a membership message from bytes.
pub fn decode(data: &[u8]) -> Result<MembershipMsg, MembershipCodecError> {
    if data.is_empty() {
        return Err(MembershipCodecError::Truncated("empty"));
    }
    let mut cur = std::io::Cursor::new(data);
    let tag = cur.get_u8();
    match tag {
        MSG_SUBSCRIBE | MSG_UNSUBSCRIBE => {
            if cur.remaining() < 9 {
                return Err(MembershipCodecError::Truncated("subscribe header"));
            }
            let nonce = cur.get_u64_le();
            let name_len = cur.get_u8() as usize;
            if name_len == 0 {
                return Err(MembershipCodecError::Truncated("empty channel name"));
            }
            if name_len > MAX_CHANNEL_NAME_LEN {
                return Err(MembershipCodecError::NameTooLong(
                    name_len,
                    MAX_CHANNEL_NAME_LEN,
                ));
            }
            if cur.remaining() < name_len {
                return Err(MembershipCodecError::Overflow(name_len, cur.remaining()));
            }
            let start = cur.position() as usize;
            let end = start + name_len;
            let name_bytes = &data[start..end];
            let name_str = std::str::from_utf8(name_bytes)
                .map_err(|_| MembershipCodecError::Truncated("non-utf8 channel name"))?;
            let channel = ChannelName::new(name_str)?;
            if tag == MSG_SUBSCRIBE {
                // Advance past the name we just read.
                cur.set_position(end as u64);
                // Token: u16_le length + bytes. Zero length ⇒ absent.
                // Legacy pre-E-1 payloads that stop exactly after the
                // name (zero trailing bytes) are treated as "no token"
                // for forward-compat. Exactly one trailing byte is
                // neither — it means a malformed sender wrote half
                // the length prefix, and the older
                // `cur.remaining() < 2` check silently accepted it as
                // "no token," hiding the bug from callers. Reject so
                // truncation surfaces as an error.
                let token = match cur.remaining() {
                    0 => None,
                    1 => {
                        return Err(MembershipCodecError::Truncated(
                            "subscribe token length prefix",
                        ));
                    }
                    _ => {
                        let token_len = cur.get_u16_le() as usize;
                        if token_len == 0 {
                            None
                        } else if cur.remaining() < token_len {
                            return Err(MembershipCodecError::Overflow(token_len, cur.remaining()));
                        } else {
                            let tstart = cur.position() as usize;
                            let tend = tstart + token_len;
                            // Advance cur past the token bytes so
                            // the trailing-byte check below operates
                            // against the actual unconsumed remainder
                            // rather than seeing the token as
                            // "trailing".
                            cur.set_position(tend as u64);
                            Some(data[tstart..tend].to_vec())
                        }
                    }
                };
                // Queue-group: u8 length + UTF-8 bytes. Forward-
                // compat with pre-queue-group senders: zero
                // remaining bytes after the token decodes as
                // `Broadcast` (queue_group = None). A non-zero
                // length but a malformed (non-UTF-8) name surfaces
                // as a decode error rather than a silent acceptance.
                let queue_group = match cur.remaining() {
                    0 => None,
                    _ => {
                        let qg_len = cur.get_u8() as usize;
                        if qg_len == 0 {
                            None
                        } else if qg_len > MAX_QUEUE_GROUP_NAME_LEN {
                            return Err(MembershipCodecError::NameTooLong(
                                qg_len,
                                MAX_QUEUE_GROUP_NAME_LEN,
                            ));
                        } else if cur.remaining() < qg_len {
                            return Err(MembershipCodecError::Overflow(qg_len, cur.remaining()));
                        } else {
                            let qstart = cur.position() as usize;
                            let qend = qstart + qg_len;
                            cur.set_position(qend as u64);
                            let s = std::str::from_utf8(&data[qstart..qend]).map_err(|_| {
                                MembershipCodecError::Truncated(
                                    "non-utf8 subscribe queue-group name",
                                )
                            })?;
                            Some(s.to_string())
                        }
                    }
                };
                // Strict-trailer rejection after the queue-group
                // bytes (the new outermost optional field). Pre-
                // queue-group, this guarded against arbitrary
                // garbage after the token; the guard moves outward
                // by one field.
                if cur.remaining() != 0 {
                    return Err(MembershipCodecError::Truncated(
                        "trailing bytes after subscribe queue-group",
                    ));
                }
                Ok(MembershipMsg::Subscribe {
                    channel,
                    nonce,
                    token,
                    queue_group,
                })
            } else {
                // Advance cur past the channel name we read by
                // direct slice (the SUBSCRIBE branch above does
                // this via `cur.set_position(end as u64)`; we
                // mirror that here so the trailing-byte check
                // below is meaningful).
                cur.set_position(end as u64);
                // Pre-fix UNSUBSCRIBE returned Ok without
                // checking that the buffer was fully consumed. A
                // malformed peer could append arbitrary bytes
                // after a valid Unsubscribe and the decoder
                // accepted it, hiding upstream framer bugs.
                if cur.remaining() != 0 {
                    return Err(MembershipCodecError::Truncated(
                        "trailing bytes after unsubscribe",
                    ));
                }
                Ok(MembershipMsg::Unsubscribe { channel, nonce })
            }
        }
        MSG_ACK => {
            if cur.remaining() < 10 {
                return Err(MembershipCodecError::Truncated("ack"));
            }
            let nonce = cur.get_u64_le();
            // Strict boolean: reject any byte other than 0 or 1 instead of
            // treating every non-zero value as "accepted". Prevents a
            // malformed sender from making an otherwise-unknown reason
            // code silently imply acceptance.
            let accepted = match cur.get_u8() {
                0 => false,
                1 => true,
                other => return Err(MembershipCodecError::UnknownType(other)),
            };
            let reason_byte = cur.get_u8();
            let reason = match reason_byte {
                ACK_REASON_OK => None,
                ACK_REASON_UNAUTHORIZED => Some(AckReason::Unauthorized),
                ACK_REASON_UNKNOWN_CHANNEL => Some(AckReason::UnknownChannel),
                ACK_REASON_RATE_LIMITED => Some(AckReason::RateLimited),
                ACK_REASON_TOO_MANY_CHANNELS => Some(AckReason::TooManyChannels),
                other => return Err(MembershipCodecError::UnknownType(other)),
            };
            // Same strict-trailer rejection on the ACK
            // path.
            if cur.remaining() != 0 {
                return Err(MembershipCodecError::Truncated("trailing bytes after ack"));
            }
            Ok(MembershipMsg::Ack {
                nonce,
                accepted,
                reason,
            })
        }
        other => Err(MembershipCodecError::UnknownType(other)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;

    /// Shorthand for a validated channel name.
    fn ch(name: &str) -> ChannelName {
        ChannelName::new(name).unwrap()
    }

    /// Encode `msg`, decode the result, and require the original back.
    fn assert_roundtrip(msg: MembershipMsg) {
        let bytes = encode(&msg);
        let decoded = decode(&bytes).unwrap();
        assert_eq!(decoded, msg);
    }

    /// Hand-build the fixed SUBSCRIBE prefix — tag, nonce, `u8` name
    /// length, name bytes — without going through `encode`, so tests
    /// can forge legacy or malformed payloads.
    fn forged_subscribe_prefix(nonce: u64, name: &[u8]) -> Vec<u8> {
        let mut buf = Vec::new();
        buf.put_u8(MSG_SUBSCRIBE);
        buf.put_u64_le(nonce);
        buf.put_u8(name.len() as u8);
        buf.extend_from_slice(name);
        buf
    }

    /// Hand-build an ACK from raw `accepted` and reason bytes.
    fn forged_ack(nonce: u64, accepted: u8, reason: u8) -> Vec<u8> {
        let mut buf = Vec::new();
        buf.put_u8(MSG_ACK);
        buf.put_u64_le(nonce);
        buf.put_u8(accepted);
        buf.put_u8(reason);
        buf
    }

    // ====================================================================
    // Round-trips and basic decode failures.
    // ====================================================================

    #[test]
    fn test_roundtrip_subscribe_no_token() {
        assert_roundtrip(MembershipMsg::Subscribe {
            channel: ch("sensors/lidar"),
            nonce: 0xDEAD_BEEF_CAFE_F00D,
            token: None,
            queue_group: None,
        });
    }

    #[test]
    fn test_roundtrip_subscribe_with_token() {
        // The codec treats the token as opaque bytes; checking its
        // structure is `PermissionToken`'s job.
        assert_roundtrip(MembershipMsg::Subscribe {
            channel: ch("sensors/lidar"),
            nonce: 0xCAFE,
            token: Some(vec![0xABu8; 64]),
            queue_group: None,
        });
    }

    #[test]
    fn test_legacy_subscribe_no_trailing_token_len_decodes_as_none() {
        // A pre-E-1 payload stops right after the name: there is no
        // u16 token_len field at all.
        let buf = forged_subscribe_prefix(42, b"lab/x");
        let expected = MembershipMsg::Subscribe {
            channel: ch("lab/x"),
            nonce: 42,
            token: None,
            queue_group: None,
        };
        assert_eq!(decode(&buf).unwrap(), expected);
    }

    #[test]
    fn test_regression_subscribe_one_byte_token_len_rejected() {
        // Regression: exactly one byte after the name is half of a
        // `u16_le` length prefix. An earlier `remaining() < 2` guard
        // accepted it as "no token"; it is a truncation and must fail.
        let mut buf = forged_subscribe_prefix(42, b"lab/x");
        buf.push(0x05);
        let err = decode(&buf).unwrap_err();
        assert!(
            matches!(err, MembershipCodecError::Truncated(_)),
            "expected Truncated, got {err:?}",
        );
    }

    #[test]
    fn test_roundtrip_unsubscribe() {
        assert_roundtrip(MembershipMsg::Unsubscribe {
            channel: ch("control/estop"),
            nonce: 42,
        });
    }

    #[test]
    fn test_roundtrip_ack_accepted() {
        assert_roundtrip(MembershipMsg::Ack {
            nonce: 7,
            accepted: true,
            reason: None,
        });
    }

    #[test]
    fn test_roundtrip_ack_rejected() {
        for reason in [
            AckReason::Unauthorized,
            AckReason::UnknownChannel,
            AckReason::RateLimited,
            AckReason::TooManyChannels,
        ] {
            assert_roundtrip(MembershipMsg::Ack {
                nonce: 99,
                accepted: false,
                reason: Some(reason),
            });
        }
    }

    #[test]
    fn test_decode_empty_fails() {
        let result = decode(&[]);
        assert!(matches!(result, Err(MembershipCodecError::Truncated(_))));
    }

    #[test]
    fn test_decode_unknown_tag() {
        let result = decode(&[0xFF]);
        assert!(matches!(
            result,
            Err(MembershipCodecError::UnknownType(0xFF))
        ));
    }

    #[test]
    fn test_decode_truncated_subscribe() {
        // Tag plus only part of the nonce.
        let result = decode(&[MSG_SUBSCRIBE, 0, 0, 0]);
        assert!(matches!(result, Err(MembershipCodecError::Truncated(_))));
    }

    #[test]
    fn test_decode_zero_name_len_rejected() {
        // An empty name yields name_len = 0 and no name bytes.
        let buf = forged_subscribe_prefix(0, b"");
        assert!(matches!(
            decode(&buf),
            Err(MembershipCodecError::Truncated(_))
        ));
    }

    #[test]
    fn test_decode_overflow_name_len() {
        // The prefix claims 10 name bytes; only 3 follow.
        let mut buf = Vec::new();
        buf.put_u8(MSG_SUBSCRIBE);
        buf.put_u64_le(0);
        buf.put_u8(10);
        buf.extend_from_slice(b"abc");
        assert!(matches!(
            decode(&buf),
            Err(MembershipCodecError::Overflow(10, 3))
        ));
    }

    #[test]
    fn test_decode_ack_strict_boolean_rejects_non_01() {
        // Sanity check: accepted = 0x01 with reason OK decodes.
        let valid = forged_ack(7, 1, ACK_REASON_OK);
        assert!(decode(&valid).is_ok());

        // Same frame with accepted = 0xFF must be rejected rather than
        // read as `true`.
        let invalid = forged_ack(7, 0xFF, ACK_REASON_OK);
        assert!(matches!(
            decode(&invalid),
            Err(MembershipCodecError::UnknownType(0xFF))
        ));
    }

    #[test]
    fn test_decode_invalid_channel_name() {
        // "a//b" contains '//' and fails channel-name validation.
        let buf = forged_subscribe_prefix(0, b"a//b");
        assert!(matches!(decode(&buf), Err(MembershipCodecError::Name(_))));
    }

    // ====================================================================
    // Strict-trailer rejection: nothing may follow a complete message.
    // ====================================================================

    /// Bytes appended to a valid UNSUBSCRIBE must make the decode
    /// fail; the decoder used to return `Ok` without checking that
    /// the buffer was fully consumed.
    #[test]
    fn unsubscribe_with_trailing_bytes_is_rejected() {
        let mut bytes = encode(&MembershipMsg::Unsubscribe {
            channel: ch("control/estop"),
            nonce: 42,
        });
        bytes.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
        let err = decode(&bytes).unwrap_err();
        assert!(
            matches!(err, MembershipCodecError::Truncated(s) if s.contains("unsubscribe")),
            "expected trailing-after-unsubscribe error, got {:?}",
            err
        );
    }

    /// The ACK path applies the same strict-trailer rule.
    #[test]
    fn ack_with_trailing_bytes_is_rejected() {
        let mut bytes = encode(&MembershipMsg::Ack {
            nonce: 42,
            accepted: true,
            reason: None,
        });
        bytes.push(0xAA);
        let err = decode(&bytes).unwrap_err();
        assert!(
            matches!(err, MembershipCodecError::Truncated(s) if s.contains("ack")),
            "expected trailing-after-ack error, got {:?}",
            err
        );
    }

    /// Garbage after a SUBSCRIBE's token must not be accepted.
    ///
    /// Since the queue-group field was added, bytes after the token
    /// are parsed as that field: `0xDE` becomes `qg_len = 222`, and
    /// the single byte that follows cannot satisfy it, so the decoder
    /// reports `Overflow`. Before the field existed the same input
    /// produced `Truncated`. Either outcome proves the property under
    /// test — the garbage is rejected.
    #[test]
    fn subscribe_with_token_then_trailing_bytes_is_rejected() {
        let mut bytes = encode(&MembershipMsg::Subscribe {
            channel: ch("sensors/lidar"),
            nonce: 0xCAFE,
            token: Some(vec![0xAB; 32]),
            queue_group: None,
        });
        bytes.extend_from_slice(&[0xDE, 0xAD]);
        let err = decode(&bytes).unwrap_err();
        let rejected = matches!(
            err,
            MembershipCodecError::Truncated(_) | MembershipCodecError::Overflow(_, _)
        );
        assert!(
            rejected,
            "expected trailing-after-subscribe rejection (Truncated or Overflow), got {:?}",
            err
        );
    }

    // ====================================================================
    // SUBSCRIBE queue-group field — wire-format extension.
    //
    // Compatibility contract: a payload with nothing after the token
    // (what pre-queue-group senders emit) decodes as Broadcast, i.e.
    // `queue_group = None`; payloads that carry the field round-trip.
    // ====================================================================

    /// A SUBSCRIBE carrying a queue group survives encode/decode.
    #[test]
    fn subscribe_queue_group_roundtrip() {
        assert_roundtrip(MembershipMsg::Subscribe {
            channel: ch("svc/req"),
            nonce: 7,
            token: None,
            queue_group: Some("workers".to_string()),
        });
    }

    /// A SUBSCRIBE carrying both a token and a queue group
    /// round-trips. This pins the field order (token, then queue
    /// group) so that a reordering fails here.
    #[test]
    fn subscribe_token_and_queue_group_roundtrip() {
        assert_roundtrip(MembershipMsg::Subscribe {
            channel: ch("svc/req"),
            nonce: 11,
            token: Some(vec![0xAB; 80]),
            queue_group: Some("workers-pool-a".to_string()),
        });
    }

    /// Backward-compat regression: `queue_group: None` must encode to
    /// exactly the bytes a pre-queue-group sender would produce, with
    /// nothing after the token block. Old decoders reject any byte
    /// past the token, so an always-emitted `qg_len = 0` byte would
    /// make them refuse every SUBSCRIBE from a new sender and force a
    /// lockstep roll-out.
    #[test]
    fn subscribe_none_queue_group_omits_qg_byte_for_wire_compat() {
        let new_bytes = encode(&MembershipMsg::Subscribe {
            channel: ch("sensors/lidar"),
            nonce: 99,
            token: None,
            queue_group: None,
        });

        // What a pre-queue-group sender emits: the prefix, a zero
        // token length, and no queue-group byte.
        let mut legacy_bytes = forged_subscribe_prefix(99, b"sensors/lidar");
        legacy_bytes.put_u16_le(0);

        assert_eq!(
            new_bytes, legacy_bytes,
            "encode(queue_group: None) must be byte-equivalent to the \
             pre-queue-group wire shape so old peers' strict-trailer \
             guard accepts the SUBSCRIBE",
        );
    }

    /// A payload that stops right after a zero-length token — the
    /// shape a pre-queue-group sender produces — decodes with
    /// `queue_group: None`, i.e. as a Broadcast subscription.
    #[test]
    fn subscribe_pre_queue_group_payload_decodes_as_broadcast() {
        let mut buf = forged_subscribe_prefix(99, b"sensors/lidar");
        // Zero-length token, then no queue-group byte at all.
        buf.put_u16_le(0);

        let expected = MembershipMsg::Subscribe {
            channel: ch("sensors/lidar"),
            nonce: 99,
            token: None,
            queue_group: None,
        };
        assert_eq!(
            decode(&buf).unwrap(),
            expected,
            "pre-queue-group payload (no trailing bytes after token) \
             must decode as Broadcast",
        );
    }

    /// The `u8` prefix cannot express a length above
    /// `MAX_QUEUE_GROUP_NAME_LEN`, but it can still claim more bytes
    /// than the buffer holds. That must surface as `Overflow` so that
    /// short reads are never silently accepted.
    #[test]
    fn subscribe_queue_group_overflow_is_rejected() {
        let mut buf = forged_subscribe_prefix(1, b"svc/req");
        buf.put_u16_le(0); // no token
        buf.put_u8(200); // claims 200 name bytes...
        buf.extend_from_slice(b"short"); // ...but only 5 follow
        let err = decode(&buf).unwrap_err();
        assert!(
            matches!(err, MembershipCodecError::Overflow(200, 5)),
            "expected Overflow(200, 5), got {:?}",
            err,
        );
    }

    /// Non-UTF-8 bytes in the queue-group payload are rejected.
    #[test]
    fn subscribe_queue_group_non_utf8_is_rejected() {
        let mut buf = forged_subscribe_prefix(1, b"svc/req");
        buf.put_u16_le(0);
        buf.put_u8(2);
        buf.extend_from_slice(&[0xFF, 0xFE]); // not valid UTF-8
        let err = decode(&buf).unwrap_err();
        assert!(
            matches!(err, MembershipCodecError::Truncated(s) if s.contains("non-utf8")),
            "expected non-utf8 rejection, got {:?}",
            err,
        );
    }
}