Skip to main content

dynomite/proto/
dnode.rs

1//! DNODE wire codec.
2//!
3//! The DNODE protocol frames every Dynomite peer-to-peer message
4//! with a small ASCII header followed by an opaque payload. The
5//! header carries the message id, type tag, encryption/compression
6//! flags, protocol version, same-datacenter bit, an inline data
7//! field (either a one-byte placeholder or an RSA-wrapped AES key),
8//! and the byte length of the payload that follows after `\r\n`.
9//!
10//! The reference engine implements the parser as a single state
11//! machine driven byte-by-byte. This module ports the same machine
12//! into safe Rust and exposes:
13//!
14//! * [`DynParseState`] - the parser's state alphabet.
15//! * [`DmsgType`] - the full set of message-type discriminators.
16//! * [`Dmsg`] - the in-memory header.
17//! * [`DnodeParser`] - the state machine, advanced by feeding bytes
18//!   through [`DnodeParser::step`].
19//! * [`dmsg_write`] / [`dmsg_write_mbuf`] - the canonical encoders.
20//! * [`parse_req`] / [`parse_rsp`] - thin sync wrappers around the
21//!   parser that operate on a [`crate::msg::Msg`]'s mbuf chain.
22//! * [`dmsg_process`] - dispatcher that classifies a parsed
23//!   [`Dmsg`] by type for the cluster layer to act on.
24//!
25//! The encoder accepts an optional `aes_key_payload`: when present,
26//! the caller provides the bytes the inline data field should hold
27//! (the RSA-wrapped AES key produced by [`crate::crypto::Crypto`]).
28//! When absent, the encoder writes the single-byte `'d'` placeholder
29//! the reference engine emits after the first handshake message.
30
31// The parser truncates accumulated decimals into the same fixed
32// bit widths the reference engine uses on the wire (`u8` for the
33// type and flags, `u32` for the data and payload lengths). The
34// allowance keeps the Rust port faithful to the C `(uint8_t)num`
35// and `(uint32_t)num` casts; out-of-range numerals are surfaced as
36// parse errors elsewhere in the state machine.
37#![allow(clippy::cast_possible_truncation)]
38#![allow(clippy::needless_continue)]
39
40use std::net::SocketAddr;
41
42use crate::core::types::MsgId;
43use crate::io::mbuf::{Mbuf, MbufQueue};
44use crate::msg::message::Msg;
45use crate::msg::message::MsgParseResult;
46
47/// Magic literal that opens every DNODE header.
48pub const MAGIC: &[u8] = b"$2014$";
49
50/// Default protocol version emitted by [`dmsg_write`]. Mirrors
51/// `VERSION_10` in the reference engine.
52pub const VERSION_10: u8 = 1;
53
54/// CRLF delimiter that separates the DNODE header from its payload.
55pub const CRLF: &[u8] = b"\r\n";
56
57/// Single-byte placeholder used by [`dmsg_write`] when no AES key
58/// payload accompanies the header.
59pub const HANDSHAKE_PLACEHOLDER_DATA: u8 = b'd';
60
61/// Single-byte placeholder used by [`dmsg_write_mbuf`] when no AES
62/// key payload accompanies the header. The gossip path emits `'a'`
63/// instead of `'d'` to disambiguate the two encoder flavours.
64pub const GOSSIP_PLACEHOLDER_DATA: u8 = b'a';
65
66/// Per-frame upper bound on a parser-accepted length field.
67///
68/// The on-the-wire DNODE header carries `mlen` and `plen` as ASCII
69/// decimal numerals that the streaming parser accumulates into a
70/// `u64` before casting to the wire's `u32`. Without an explicit
71/// cap on the accumulator, a single byte run of `1`s inflates
72/// `self.num` past `u32::MAX`; the silent truncation then drives
73/// [`Vec::reserve`] into a multi-gigabyte malloc (libfuzzer 1h soak
74/// finding 2026-06-02, captured at
75/// `crates/fuzz/seeds/dnode_parse/regression-oom-2026-06-02`).
76///
77/// 256 MiB is well above any legitimate DNODE frame on the wire
78/// today (the largest production payloads we have observed are a
79/// few hundred KiB) while staying well below an allocation that
80/// would produce a real OOM under typical RSS budgets. The parser
81/// surfaces [`ParseStep::Error`] the moment any DataLen or
82/// PayloadLen accumulator exceeds this bound.
83pub const MAX_DATA_LEN: u64 = 256 * 1024 * 1024;
84
85/// Parser state transitions.
86///
87/// Each variant matches a state in the reference engine's
88/// `dyn_parse_state_t`. The numeric values match the C enum's
89/// numeric values to keep external parity tooling honest.
90#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
91pub enum DynParseState {
92    /// Initial state; consumes leading whitespace until the magic
93    /// literal is observed.
94    #[default]
95    Start,
96    /// `$2014$` was matched; awaiting the trailing space.
97    MagicString,
98    /// Reading the decimal message id.
99    MsgId,
100    /// Reading the decimal message type.
101    TypeId,
102    /// Reading the decimal flags bit field.
103    BitField,
104    /// Reading the decimal protocol version.
105    Version,
106    /// Reading the same-datacenter digit.
107    SameDc,
108    /// Awaiting the leading `*` before the data length.
109    Star,
110    /// Reading the decimal data length.
111    DataLen,
112    /// Consuming the inline data of `mlen` bytes.
113    Data,
114    /// Skipping spaces before the payload-length marker.
115    SpacesBeforePayloadLen,
116    /// Reading the decimal payload length.
117    PayloadLen,
118    /// Awaiting the LF that terminates the header.
119    CrlfBeforeDone,
120    /// Header complete; payload position recorded.
121    Done,
122    /// Header complete and post-handshake decryption applied.
123    PostDone,
124    /// Recovery state after the parser hit a malformed byte.
125    Unknown,
126}
127
128/// DNODE message type identifier.
129///
130/// The numeric values match `dmsg_type_t` from the reference engine
131/// because the discriminator travels on the wire as a decimal.
132#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
133#[repr(u8)]
134pub enum DmsgType {
135    /// Unset / unknown type.
136    #[default]
137    Unknown = 0,
138    /// Diagnostic frame (unused on the live wire; kept for parity).
139    Debug = 1,
140    /// Parse-error frame (unused on the live wire; kept for parity).
141    ParseError = 2,
142    /// Datastore request bound for the local DC.
143    Req = 3,
144    /// Datastore request to be forwarded across DCs.
145    ReqForward = 4,
146    /// Datastore response.
147    Res = 5,
148    /// AES key handshake.
149    CryptoHandshake = 6,
150    /// Gossip SYN.
151    GossipSyn = 7,
152    /// Gossip SYN reply.
153    GossipSynReply = 8,
154    /// Gossip ACK.
155    GossipAck = 9,
156    /// Gossip digest SYN.
157    GossipDigestSyn = 10,
158    /// Gossip digest ACK.
159    GossipDigestAck = 11,
160    /// Gossip digest ACK round 2.
161    GossipDigestAck2 = 12,
162    /// Gossip shutdown notice.
163    GossipShutdown = 13,
164    /// Explicit handoff chunk frame.
165    ///
166    /// Carries one chunk of a token-range handoff stream from the
167    /// previous owner of the range to the new owner. Distinct from
168    /// the AAE exchange variants so the receiver can route handoff
169    /// frames to the dedicated handoff coordinator without parsing
170    /// the payload first.
171    HandoffChunk = 14,
172    /// Cluster-wide RediSearch FT.SEARCH request frame.
173    ///
174    /// Sent by the FT.SEARCH coordinator on the node that
175    /// received the client request to every primary peer
176    /// covering the index's key range. The payload encodes a
177    /// broadcast request (table name, serialised query body,
178    /// top-K) - see the `dynomite-search` crate's
179    /// `query_fsm::BroadcastRequest`. Routed by the dispatcher
180    /// to the dedicated FT.SEARCH coordinator FSM instead of
181    /// the data-plane stack so the per-peer query runs against
182    /// the local registry rather than being re-forwarded.
183    FtSearchReq = 15,
184    /// Cluster-wide RediSearch FT.SEARCH reply frame.
185    ///
186    /// Returned by every peer that received a [`Self::FtSearchReq`]
187    /// once its local search completed (or the per-peer
188    /// deadline elapsed). The payload encodes the per-peer
189    /// top-K hit list plus a `timed_out` flag the coordinator
190    /// uses to mark partial results.
191    FtSearchRep = 16,
192}
193
194impl DmsgType {
195    /// Build a type from its on-the-wire integer value.
196    ///
197    /// # Examples
198    ///
199    /// ```
200    /// use dynomite::proto::dnode::DmsgType;
201    /// assert_eq!(DmsgType::from_u8(3), Some(DmsgType::Req));
202    /// assert_eq!(DmsgType::from_u8(99), None);
203    /// ```
204    #[must_use]
205    pub fn from_u8(v: u8) -> Option<Self> {
206        Some(match v {
207            0 => DmsgType::Unknown,
208            1 => DmsgType::Debug,
209            2 => DmsgType::ParseError,
210            3 => DmsgType::Req,
211            4 => DmsgType::ReqForward,
212            5 => DmsgType::Res,
213            6 => DmsgType::CryptoHandshake,
214            7 => DmsgType::GossipSyn,
215            8 => DmsgType::GossipSynReply,
216            9 => DmsgType::GossipAck,
217            10 => DmsgType::GossipDigestSyn,
218            11 => DmsgType::GossipDigestAck,
219            12 => DmsgType::GossipDigestAck2,
220            13 => DmsgType::GossipShutdown,
221            14 => DmsgType::HandoffChunk,
222            15 => DmsgType::FtSearchReq,
223            16 => DmsgType::FtSearchRep,
224            _ => return None,
225        })
226    }
227
228    /// Numeric on-the-wire value.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use dynomite::proto::dnode::DmsgType;
234    /// assert_eq!(DmsgType::CryptoHandshake.as_u8(), 6);
235    /// ```
236    #[must_use]
237    pub const fn as_u8(self) -> u8 {
238        self as u8
239    }
240}
241
242/// Encryption bit in [`Dmsg::flags`].
243pub const DMSG_FLAG_ENCRYPTED: u8 = 0x1;
244
245/// Compression bit in [`Dmsg::flags`].
246pub const DMSG_FLAG_COMPRESSED: u8 = 0x2;
247
248/// Parsed DNODE header.
249///
250/// `data` and `payload` hold copies of the on-the-wire bytes. The
251/// encoder side fills both before emitting; the parser fills them as
252/// it advances through the state machine.
253#[derive(Clone, Debug, Default, Eq, PartialEq)]
254pub struct Dmsg {
255    /// Message id.
256    pub id: MsgId,
257    /// Message type.
258    pub ty: DmsgType,
259    /// Flag bit field; encryption is bit 0, compression is bit 1.
260    pub flags: u8,
261    /// Protocol version.
262    pub version: u8,
263    /// True when sender and receiver share a datacenter.
264    pub same_dc: bool,
265    /// Source address recorded by the recv path. Stage 7 leaves it
266    /// `None`; Stage 9 stamps it from the connection state.
267    pub source_address: Option<SocketAddr>,
268    /// Length (in bytes) of the inline data field.
269    pub mlen: u32,
270    /// Inline data: either the single-byte placeholder or the
271    /// RSA-wrapped AES key during the crypto handshake.
272    pub data: Vec<u8>,
273    /// Length (in bytes) of the trailing payload framed by the
274    /// header.
275    pub plen: u32,
276    /// Payload bytes, if collected by the parser.
277    pub payload: Vec<u8>,
278}
279
280impl Dmsg {
281    /// Construct an empty `Dmsg` defaulted the same way the
282    /// reference engine's `dmsg_get` initialises a fresh slot.
283    ///
284    /// # Examples
285    ///
286    /// ```
287    /// use dynomite::proto::dnode::{Dmsg, DmsgType, VERSION_10};
288    /// let d = Dmsg::new();
289    /// assert_eq!(d.ty, DmsgType::Unknown);
290    /// assert_eq!(d.version, VERSION_10);
291    /// assert!(d.same_dc);
292    /// ```
293    #[must_use]
294    pub fn new() -> Self {
295        Self {
296            id: 0,
297            ty: DmsgType::Unknown,
298            flags: 0,
299            version: VERSION_10,
300            same_dc: true,
301            source_address: None,
302            mlen: 0,
303            data: Vec::new(),
304            plen: 0,
305            payload: Vec::new(),
306        }
307    }
308
309    /// True when the encryption flag is set.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_ENCRYPTED};
315    /// let mut d = Dmsg::new();
316    /// d.flags = DMSG_FLAG_ENCRYPTED;
317    /// assert!(d.is_encrypted());
318    /// ```
319    #[must_use]
320    pub fn is_encrypted(&self) -> bool {
321        self.flags & DMSG_FLAG_ENCRYPTED != 0
322    }
323
324    /// True when the compression flag is set.
325    ///
326    /// # Examples
327    ///
328    /// ```
329    /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_COMPRESSED};
330    /// let mut d = Dmsg::new();
331    /// d.flags = DMSG_FLAG_COMPRESSED;
332    /// assert!(d.is_compressed());
333    /// ```
334    #[must_use]
335    pub fn is_compressed(&self) -> bool {
336        self.flags & DMSG_FLAG_COMPRESSED != 0
337    }
338}
339
340/// Result of a single [`DnodeParser::step`] invocation.
341#[derive(Copy, Clone, Debug, Eq, PartialEq)]
342pub enum ParseStep {
343    /// More bytes are required to advance the state machine. The
344    /// `consumed` field records how many of the input bytes the
345    /// parser already absorbed.
346    NeedMore {
347        /// Number of input bytes the parser absorbed before it
348        /// stopped waiting for more.
349        consumed: usize,
350    },
351    /// The header (up to and including the trailing LF) has been
352    /// parsed. The `consumed` field records the offset just past
353    /// the LF, so the caller can read the payload starting at that
354    /// index.
355    HeaderDone {
356        /// Offset just past the trailing LF.
357        consumed: usize,
358    },
359    /// The parser hit an unrecoverable bad byte. The caller should
360    /// drop the buffer (or split it at `consumed`) and reset.
361    Error {
362        /// Offset of the byte that triggered the error.
363        consumed: usize,
364    },
365}
366
367/// Errors that can be raised when encoding or parsing a DNODE
368/// header without going through the streaming state machine.
369#[derive(Copy, Clone, Debug, Eq, PartialEq)]
370#[non_exhaustive]
371pub enum DnodeError {
372    /// Buffer too small to encode the header.
373    OutOfSpace,
374    /// Header does not begin with the magic literal.
375    BadMagic,
376    /// Numeric field could not be parsed.
377    BadNumber,
378    /// Trailing CRLF missing.
379    MissingCrlf,
380    /// Type discriminator out of range.
381    BadType,
382    /// Inline data shorter than the declared `mlen`.
383    TruncatedData,
384}
385
386/// Streaming DNODE header parser.
387#[derive(Debug)]
388pub struct DnodeParser {
389    state: DynParseState,
390    num: u64,
391    dmsg: Dmsg,
392    data_remaining: u32,
393    magic_progress: u8,
394    /// Whether the previous byte was an ASCII digit. The header
395    /// state machine only transitions out of the numeric header
396    /// fields (MSG_ID, TYPE_ID, BIT_FIELD, VERSION, SAME_DC) when
397    /// the byte immediately preceding the field-terminating space
398    /// was a digit; the parser reproduces this guard so extra
399    /// whitespace (or any other non-digit byte) is rejected with
400    /// the wire protocol's strictness.
401    prev_was_digit: bool,
402}
403
404impl DnodeParser {
405    /// Build a fresh parser positioned at [`DynParseState::Start`].
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// use dynomite::proto::dnode::{DnodeParser, DynParseState};
411    /// let p = DnodeParser::new();
412    /// assert_eq!(p.state(), DynParseState::Start);
413    /// ```
414    #[must_use]
415    pub fn new() -> Self {
416        Self {
417            state: DynParseState::Start,
418            num: 0,
419            dmsg: Dmsg::new(),
420            data_remaining: 0,
421            magic_progress: 0,
422            prev_was_digit: false,
423        }
424    }
425
426    /// Reset the parser to [`DynParseState::Start`] with a fresh
427    /// accumulator [`Dmsg`].
428    pub fn reset(&mut self) {
429        *self = Self::new();
430    }
431
432    /// Current state.
433    #[must_use]
434    pub fn state(&self) -> DynParseState {
435        self.state
436    }
437
438    /// Borrow the partial [`Dmsg`].
439    #[must_use]
440    pub fn dmsg(&self) -> &Dmsg {
441        &self.dmsg
442    }
443
444    /// Move the parsed [`Dmsg`] out of the parser. Only meaningful
445    /// after a [`ParseStep::HeaderDone`] step.
446    pub fn take_dmsg(&mut self) -> Dmsg {
447        let mut out = Dmsg::new();
448        std::mem::swap(&mut out, &mut self.dmsg);
449        self.state = DynParseState::Start;
450        self.num = 0;
451        self.data_remaining = 0;
452        self.magic_progress = 0;
453        self.prev_was_digit = false;
454        out
455    }
456
457    /// Feed `input` to the parser. The parser advances as far as it
458    /// can and returns one of the three [`ParseStep`] variants.
459    ///
460    /// The state machine is byte-driven and can be reentered with a
461    /// fresh slice when [`ParseStep::NeedMore`] indicates the input
462    /// was truncated mid-header.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// use dynomite::proto::dnode::{DnodeParser, ParseStep};
468    /// let mut p = DnodeParser::new();
469    /// let bytes = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
470    /// match p.step(bytes) {
471    ///     ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
472    ///     other => panic!("unexpected: {other:?}"),
473    /// }
474    /// ```
475    /// The state machine intentionally stays in one function to
476    /// match the reference engine's single-block parser; splitting
477    /// the per-state arms across helpers would obscure the parity.
478    #[allow(clippy::too_many_lines)]
479    pub fn step(&mut self, input: &[u8]) -> ParseStep {
480        let mut idx = 0usize;
481        while idx < input.len() {
482            let ch = input[idx];
483            match self.state {
484                DynParseState::Start => {
485                    // Phase 1: skip leading whitespace, identical
486                    // to the C engine's `while (ch == ' ')` arm.
487                    if self.magic_progress == 0 {
488                        if ch == b' ' {
489                            idx += 1;
490                            continue;
491                        }
492                        if ch != b'$' {
493                            return ParseStep::Error { consumed: idx };
494                        }
495                    }
496                    // Phase 2: byte-incrementally match the magic
497                    // literal so split inputs are tolerated.
498                    let want = MAGIC[usize::from(self.magic_progress)];
499                    if ch != want {
500                        return ParseStep::Error { consumed: idx };
501                    }
502                    self.magic_progress += 1;
503                    idx += 1;
504                    if usize::from(self.magic_progress) == MAGIC.len() {
505                        self.state = DynParseState::MagicString;
506                        self.magic_progress = 0;
507                    }
508                    continue;
509                }
510                DynParseState::MagicString => {
511                    if ch == b' ' {
512                        self.state = DynParseState::MsgId;
513                        self.num = 0;
514                        idx += 1;
515                        continue;
516                    }
517                    return ParseStep::Error { consumed: idx };
518                }
519                DynParseState::MsgId => {
520                    // DYN_MSG_ID state: digits accumulate, a single
521                    // space terminates the field but only when the
522                    // byte immediately
523                    // before it was a digit. Anything else is
524                    // rejected (the C engine resets to DYN_START
525                    // and lets the recovery path retry; the Rust
526                    // streaming parser surfaces an error so the
527                    // caller can drop the buffer).
528                    if ch.is_ascii_digit() {
529                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
530                        self.prev_was_digit = true;
531                        idx += 1;
532                        continue;
533                    }
534                    if ch == b' ' && self.prev_was_digit {
535                        self.dmsg.id = self.num;
536                        self.state = DynParseState::TypeId;
537                        self.num = 0;
538                        self.prev_was_digit = false;
539                        idx += 1;
540                        continue;
541                    }
542                    return ParseStep::Error { consumed: idx };
543                }
544                DynParseState::TypeId => {
545                    if ch.is_ascii_digit() {
546                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
547                        self.prev_was_digit = true;
548                        idx += 1;
549                        continue;
550                    }
551                    if ch == b' ' && self.prev_was_digit {
552                        self.dmsg.ty = match DmsgType::from_u8(self.num as u8) {
553                            Some(t) => t,
554                            None => return ParseStep::Error { consumed: idx },
555                        };
556                        self.state = DynParseState::BitField;
557                        self.num = 0;
558                        self.prev_was_digit = false;
559                        idx += 1;
560                        continue;
561                    }
562                    return ParseStep::Error { consumed: idx };
563                }
564                DynParseState::BitField => {
565                    if ch.is_ascii_digit() {
566                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
567                        self.prev_was_digit = true;
568                        idx += 1;
569                        continue;
570                    }
571                    if ch == b' ' && self.prev_was_digit {
572                        self.dmsg.flags = (self.num as u8) & 0xF;
573                        self.state = DynParseState::Version;
574                        self.num = 0;
575                        self.prev_was_digit = false;
576                        idx += 1;
577                        continue;
578                    }
579                    return ParseStep::Error { consumed: idx };
580                }
581                DynParseState::Version => {
582                    if ch.is_ascii_digit() {
583                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
584                        self.prev_was_digit = true;
585                        idx += 1;
586                        continue;
587                    }
588                    if ch == b' ' && self.prev_was_digit {
589                        self.dmsg.version = self.num as u8;
590                        self.state = DynParseState::SameDc;
591                        self.num = 0;
592                        self.prev_was_digit = false;
593                        idx += 1;
594                        continue;
595                    }
596                    return ParseStep::Error { consumed: idx };
597                }
598                DynParseState::SameDc => {
599                    if ch.is_ascii_digit() {
600                        self.dmsg.same_dc = ch != b'0';
601                        self.prev_was_digit = true;
602                        idx += 1;
603                        continue;
604                    }
605                    if ch == b' ' && self.prev_was_digit {
606                        self.state = DynParseState::DataLen;
607                        self.num = 0;
608                        self.prev_was_digit = false;
609                        idx += 1;
610                        continue;
611                    }
612                    return ParseStep::Error { consumed: idx };
613                }
614                DynParseState::Star | DynParseState::DataLen => {
615                    if ch == b'*' {
616                        idx += 1;
617                        continue;
618                    }
619                    if ch.is_ascii_digit() {
620                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
621                        // Reject pathological-size length fields
622                        // before the cast to u32 wraps and a
623                        // downstream Vec::reserve allocates the
624                        // wrapped value as bytes. See MAX_DATA_LEN.
625                        if self.num > MAX_DATA_LEN {
626                            return ParseStep::Error { consumed: idx };
627                        }
628                        idx += 1;
629                        continue;
630                    }
631                    if ch == b' ' && self.state == DynParseState::DataLen {
632                        self.dmsg.mlen = self.num as u32;
633                        self.data_remaining = self.dmsg.mlen;
634                        self.dmsg.data.clear();
635                        self.dmsg.data.reserve(self.data_remaining as usize);
636                        self.state = DynParseState::Data;
637                        self.num = 0;
638                        idx += 1;
639                        continue;
640                    }
641                    return ParseStep::Error { consumed: idx };
642                }
643                DynParseState::Data => {
644                    if self.data_remaining == 0 {
645                        self.state = DynParseState::SpacesBeforePayloadLen;
646                        continue;
647                    }
648                    let take = std::cmp::min(self.data_remaining as usize, input.len() - idx);
649                    self.dmsg.data.extend_from_slice(&input[idx..idx + take]);
650                    self.data_remaining -= take as u32;
651                    idx += take;
652                    if self.data_remaining == 0 {
653                        self.state = DynParseState::SpacesBeforePayloadLen;
654                    }
655                    continue;
656                }
657                DynParseState::SpacesBeforePayloadLen => {
658                    if ch == b' ' {
659                        idx += 1;
660                        continue;
661                    }
662                    if ch == b'*' {
663                        self.state = DynParseState::PayloadLen;
664                        self.num = 0;
665                        idx += 1;
666                        continue;
667                    }
668                    return ParseStep::Error { consumed: idx };
669                }
670                DynParseState::PayloadLen => {
671                    if ch.is_ascii_digit() {
672                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
673                        if self.num > MAX_DATA_LEN {
674                            return ParseStep::Error { consumed: idx };
675                        }
676                        idx += 1;
677                        continue;
678                    }
679                    if ch == b'\r' {
680                        self.dmsg.plen = self.num as u32;
681                        self.state = DynParseState::CrlfBeforeDone;
682                        self.num = 0;
683                        idx += 1;
684                        continue;
685                    }
686                    return ParseStep::Error { consumed: idx };
687                }
688                DynParseState::CrlfBeforeDone => {
689                    if ch == b'\n' {
690                        self.state = DynParseState::Done;
691                        idx += 1;
692                        return ParseStep::HeaderDone { consumed: idx };
693                    }
694                    return ParseStep::Error { consumed: idx };
695                }
696                DynParseState::Done | DynParseState::PostDone | DynParseState::Unknown => {
697                    return ParseStep::HeaderDone { consumed: idx };
698                }
699            }
700        }
701        ParseStep::NeedMore { consumed: idx }
702    }
703}
704
705impl Default for DnodeParser {
706    fn default() -> Self {
707        Self::new()
708    }
709}
710
711/// Encode a DNODE header into the writable region of `mbuf`.
712///
713/// `aes_key_payload`, when `Some`, is written as the inline data
714/// field; this is how the crypto handshake transports the
715/// RSA-wrapped AES key. When `None`, a single-byte `'d'` placeholder
716/// is emitted.
717///
718/// `flags` is taken verbatim (the encryption bit must be set by the
719/// caller, alongside any compression bit).
720///
721/// The encoder writes the entire header as a single contiguous
722/// region; if `mbuf` lacks the necessary capacity,
723/// [`DnodeError::OutOfSpace`] is returned.
724///
725/// # Examples
726///
727/// ```
728/// use dynomite::io::mbuf::MbufPool;
729/// use dynomite::proto::dnode::{dmsg_write, DmsgType};
730///
731/// let pool = MbufPool::default();
732/// let mut buf = pool.get();
733/// dmsg_write(
734///     &mut buf,
735///     /* msg_id */ 1,
736///     DmsgType::Req,
737///     /* flags */ 0,
738///     /* same_dc */ true,
739///     /* aes_key_payload */ None,
740///     /* plen */ 0,
741/// )
742/// .unwrap();
743/// assert!(buf.readable().starts_with(b"   $2014$ 1 3 0"));
744/// ```
745pub fn dmsg_write(
746    mbuf: &mut Mbuf,
747    msg_id: MsgId,
748    ty: DmsgType,
749    flags: u8,
750    same_dc: bool,
751    aes_key_payload: Option<&[u8]>,
752    plen: u32,
753) -> Result<(), DnodeError> {
754    let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, false);
755    write_chain(mbuf, &header)
756}
757
758/// Encode a gossip-flavored DNODE header.
759///
760/// Mirrors the reference engine's `dmsg_write_mbuf`, which differs
761/// from [`dmsg_write`] only in the placeholder byte emitted when no
762/// AES key payload accompanies the header (`'a'` instead of `'d'`).
763///
764/// # Examples
765///
766/// ```
767/// use dynomite::io::mbuf::MbufPool;
768/// use dynomite::proto::dnode::{dmsg_write_mbuf, DmsgType};
769///
770/// let pool = MbufPool::default();
771/// let mut buf = pool.get();
772/// dmsg_write_mbuf(
773///     &mut buf,
774///     /* msg_id */ 5,
775///     DmsgType::GossipSyn,
776///     /* flags */ 0,
777///     /* same_dc */ true,
778///     /* aes_key_payload */ None,
779///     /* plen */ 64,
780/// )
781/// .unwrap();
782/// assert!(buf.readable().contains(&b'a'));
783/// ```
784pub fn dmsg_write_mbuf(
785    mbuf: &mut Mbuf,
786    msg_id: MsgId,
787    ty: DmsgType,
788    flags: u8,
789    same_dc: bool,
790    aes_key_payload: Option<&[u8]>,
791    plen: u32,
792) -> Result<(), DnodeError> {
793    let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, true);
794    write_chain(mbuf, &header)
795}
796
797fn build_header(
798    msg_id: MsgId,
799    ty: DmsgType,
800    flags: u8,
801    same_dc: bool,
802    aes_key_payload: Option<&[u8]>,
803    plen: u32,
804    gossip_placeholder: bool,
805) -> Vec<u8> {
806    use std::io::Write as _;
807    let mut buf: Vec<u8> = Vec::with_capacity(64);
808    // Three leading spaces are part of the magic literal as written
809    // on the wire; the parser tolerates and skips them in DYN_START.
810    buf.extend_from_slice(b"   $2014$ ");
811    let _ = write!(buf, "{msg_id}");
812    buf.push(b' ');
813    let _ = write!(buf, "{}", ty.as_u8());
814    buf.push(b' ');
815    let _ = write!(buf, "{}", flags & 0xF);
816    buf.push(b' ');
817    let _ = write!(buf, "{VERSION_10}");
818    buf.push(b' ');
819    buf.push(if same_dc { b'1' } else { b'0' });
820    buf.push(b' ');
821    buf.push(b'*');
822    if let Some(payload) = aes_key_payload {
823        let _ = write!(buf, "{}", payload.len());
824        buf.push(b' ');
825        buf.extend_from_slice(payload);
826    } else {
827        buf.extend_from_slice(b"1 ");
828        buf.push(if gossip_placeholder {
829            GOSSIP_PLACEHOLDER_DATA
830        } else {
831            HANDSHAKE_PLACEHOLDER_DATA
832        });
833    }
834    buf.push(b' ');
835    buf.push(b'*');
836    let _ = write!(buf, "{plen}");
837    buf.extend_from_slice(CRLF);
838    buf
839}
840
841fn write_chain(mbuf: &mut Mbuf, payload: &[u8]) -> Result<(), DnodeError> {
842    if mbuf.remaining() < payload.len() {
843        return Err(DnodeError::OutOfSpace);
844    }
845    let n = mbuf.recv(payload);
846    debug_assert_eq!(n, payload.len());
847    Ok(())
848}
849
850/// Sync byte parser that drives a request message's DNODE header
851/// state machine.
852///
853/// The parser walks the contiguous bytes spanning the message's
854/// mbuf chain and updates the [`Msg`] in place. On a fully parsed
855/// header, the function attaches the [`Dmsg`] to the message and
856/// returns `MsgParseResult::Ok`. On truncated input the parser
857/// returns `MsgParseResult::Again`. On invalid bytes the parser
858/// records `MsgParseResult::Error` and surfaces the same value.
859///
860/// The async wrapping (per-connection task scheduling, decryption
861/// hand-off when the encryption bit is set) ships in Stage 9.
862///
863/// # Examples
864///
865/// ```
866/// use dynomite::io::mbuf::MbufPool;
867/// use dynomite::msg::{Msg, MsgType};
868/// use dynomite::proto::dnode::{parse_req, DmsgType, DynParseState};
869///
870/// let pool = MbufPool::default();
871/// let mut msg = Msg::new(0, MsgType::Unknown, true);
872/// let mut mb = pool.get();
873/// mb.recv(b"$2014$ 1 3 0 1 1 *1 d *0\r\n");
874/// msg.mbufs_mut().push_back(mb);
875/// msg.recompute_mlen();
876/// let result = parse_req(&mut msg);
877/// assert_eq!(msg.dyn_parse_state(), DynParseState::Done);
878/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Req);
879/// drop(result);
880/// ```
881pub fn parse_req(msg: &mut Msg) -> MsgParseResult {
882    parse_msg(msg, false)
883}
884
885/// Sync byte parser counterpart to [`parse_req`] for response
886/// messages.
887///
888/// # Examples
889///
890/// ```
891/// use dynomite::io::mbuf::MbufPool;
892/// use dynomite::msg::{Msg, MsgType};
893/// use dynomite::proto::dnode::{parse_rsp, DmsgType};
894///
895/// let pool = MbufPool::default();
896/// let mut msg = Msg::new(0, MsgType::Unknown, false);
897/// let mut mb = pool.get();
898/// mb.recv(b"$2014$ 9 5 0 1 1 *1 d *0\r\n");
899/// msg.mbufs_mut().push_back(mb);
900/// msg.recompute_mlen();
901/// let _ = parse_rsp(&mut msg);
902/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Res);
903/// ```
904pub fn parse_rsp(msg: &mut Msg) -> MsgParseResult {
905    parse_msg(msg, true)
906}
907
908fn parse_msg(msg: &mut Msg, _is_response: bool) -> MsgParseResult {
909    // Flatten the chain into a single buffer for parsing. The
910    // reference engine walks the chain byte by byte and tolerates
911    // splits at arbitrary boundaries; the Rust port drives the same
912    // state machine over a contiguous slice. Stage 9 will replace
913    // this with a streaming feed when the connection FSM lands.
914    let mut bytes: Vec<u8> = Vec::with_capacity(msg.mbufs().total_len());
915    for mbuf in msg.mbufs() {
916        bytes.extend_from_slice(mbuf.readable());
917    }
918
919    let mut parser = DnodeParser::new();
920    parser.state = msg.dyn_parse_state();
921    match parser.step(&bytes) {
922        ParseStep::HeaderDone { .. } => {
923            let dmsg = parser.take_dmsg();
924            msg.set_dyn_parse_state(DynParseState::Done);
925            msg.set_dmsg(dmsg);
926            msg.set_parse_result(MsgParseResult::Ok);
927            MsgParseResult::Ok
928        }
929        ParseStep::NeedMore { .. } => {
930            msg.set_dyn_parse_state(parser.state);
931            msg.set_parse_result(MsgParseResult::Again);
932            MsgParseResult::Again
933        }
934        ParseStep::Error { .. } => {
935            msg.set_dyn_parse_state(DynParseState::Unknown);
936            msg.set_parse_result(MsgParseResult::Error);
937            MsgParseResult::Error
938        }
939    }
940}
941
942/// Outcome of [`dmsg_process`].
943///
944/// `Bypass` means the header has been recognised as control traffic
945/// and the cluster layer should not pass the message further down
946/// the protocol stack.
947#[derive(Copy, Clone, Debug, Eq, PartialEq)]
948pub enum DmsgDispatch {
949    /// Frame consumed by a control-plane handler.
950    Bypass,
951    /// Frame should continue through the data-plane stack.
952    Forward,
953}
954
955/// Stage 7 dispatcher: classify a parsed [`Dmsg`] as
956/// control-plane traffic the cluster layer should consume directly,
957/// or data-plane traffic that should continue through the protocol
958/// stack.
959///
960/// Stage 10 will extend this with the gossip-message decoders. For
961/// now, only the message-shape side of the dispatch is in place.
962///
963/// # Examples
964///
965/// ```
966/// use dynomite::proto::dnode::{dmsg_process, Dmsg, DmsgDispatch, DmsgType};
967///
968/// let mut d = Dmsg::new();
969/// d.ty = DmsgType::CryptoHandshake;
970/// assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
971///
972/// // Gossip variants other than SYN / SYN_REPLY fall through.
973/// d.ty = DmsgType::GossipShutdown;
974/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
975///
976/// d.ty = DmsgType::Req;
977/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
978/// ```
979#[must_use]
980pub fn dmsg_process(dmsg: &Dmsg) -> DmsgDispatch {
981    // Dmsg dispatch table: only CRYPTO_HANDSHAKE,
982    // GOSSIP_SYN, and GOSSIP_SYN_REPLY short-circuit; the other
983    // gossip variants (ACK, DIGEST_SYN, DIGEST_ACK, DIGEST_ACK2,
984    // SHUTDOWN) fall through to the default branch and are
985    // forwarded to the cluster handlers (which Stage 10 will wire
986    // up). HANDOFF_CHUNK frames are control-plane traffic for
987    // the explicit handoff coordinator and bypass the data-plane
988    // stack alongside the crypto / gossip handshake variants.
989    match dmsg.ty {
990        DmsgType::CryptoHandshake
991        | DmsgType::GossipSyn
992        | DmsgType::GossipSynReply
993        | DmsgType::HandoffChunk
994        | DmsgType::FtSearchReq
995        | DmsgType::FtSearchRep => DmsgDispatch::Bypass,
996        _ => DmsgDispatch::Forward,
997    }
998}
999
1000/// Drain `chain` into a contiguous `Vec<u8>` recycling each chunk
1001/// back to `pool`. Useful for tests and for the Stage 9 path that
1002/// needs a flat buffer of decrypted payload bytes.
1003pub fn flatten_chain(chain: &mut MbufQueue) -> Vec<u8> {
1004    let mut out = Vec::with_capacity(chain.total_len());
1005    while let Some(buf) = chain.pop_front() {
1006        out.extend_from_slice(buf.readable());
1007    }
1008    out
1009}
1010
1011/// Peer-handshake control payload exchanged on top of a
1012/// [`DmsgType::GossipSyn`] frame.
1013///
1014/// Today the handshake carries the cluster-wide capability
1015/// advertisement (see [`crate::cluster::capability`]). Future
1016/// fields will be appended as new typed records; older peers
1017/// ignore unknown trailing bytes.
1018///
1019/// # Wire format
1020///
1021/// ```text
1022/// magic(4) = "DHS1"
1023/// flags(2) = 0
1024/// CapabilityAd (length-prefixed, see
1025///                `CapabilityAd::encode` for the exact layout)
1026/// ```
1027///
1028/// All multi-byte integers are little-endian. The encoding uses
1029/// only the standard library; no external codec is pulled in.
1030///
1031/// # Examples
1032///
1033/// ```
1034/// use dynomite::cluster::capability::{CapabilityAd, CapabilityAdEntry};
1035/// use dynomite::proto::dnode::Handshake;
1036/// let ad = CapabilityAd::from_entries(vec![
1037///     CapabilityAdEntry::new("framing".into(), vec![vec![1, 0, 0, 0]]),
1038/// ]);
1039/// let hs = Handshake::new(ad.clone());
1040/// let bytes = hs.encode();
1041/// let back = Handshake::decode(&bytes).unwrap();
1042/// assert_eq!(back.capabilities(), &ad);
1043/// ```
1044#[derive(Clone, Debug, Default, Eq, PartialEq)]
1045pub struct Handshake {
1046    capabilities: crate::cluster::capability::CapabilityAd,
1047}
1048
1049impl Handshake {
1050    /// Magic literal that opens every handshake payload.
1051    pub const MAGIC: [u8; 4] = *b"DHS1";
1052
1053    /// Build a handshake carrying `capabilities`.
1054    #[must_use]
1055    pub fn new(capabilities: crate::cluster::capability::CapabilityAd) -> Self {
1056        Self { capabilities }
1057    }
1058
1059    /// Borrow the embedded capability advertisement.
1060    #[must_use]
1061    pub fn capabilities(&self) -> &crate::cluster::capability::CapabilityAd {
1062        &self.capabilities
1063    }
1064
1065    /// Consume the handshake and return the embedded
1066    /// advertisement.
1067    #[must_use]
1068    pub fn into_capabilities(self) -> crate::cluster::capability::CapabilityAd {
1069        self.capabilities
1070    }
1071
1072    /// Serialise the handshake to a length-prefixed byte
1073    /// stream.
1074    #[must_use]
1075    pub fn encode(&self) -> Vec<u8> {
1076        let cap_bytes = self.capabilities.encode();
1077        let mut out = Vec::with_capacity(Self::MAGIC.len() + 2 + cap_bytes.len());
1078        out.extend_from_slice(&Self::MAGIC);
1079        out.extend_from_slice(&0u16.to_le_bytes()); // flags
1080        out.extend_from_slice(&cap_bytes);
1081        out
1082    }
1083
1084    /// Inverse of [`Handshake::encode`]. Surfaces a typed error
1085    /// when the magic / version is wrong or the embedded
1086    /// advertisement is malformed.
1087    pub fn decode(bytes: &[u8]) -> Result<Self, crate::cluster::capability::CapabilityCodecError> {
1088        use crate::cluster::capability::CapabilityCodecError;
1089        if bytes.len() < Self::MAGIC.len() + 2 {
1090            return Err(CapabilityCodecError::Truncated);
1091        }
1092        if bytes[..Self::MAGIC.len()] != Self::MAGIC {
1093            return Err(CapabilityCodecError::BadMagic);
1094        }
1095        // Flags are reserved; the only currently legal value is
1096        // zero. Any non-zero value is reserved for future use
1097        // and rejected here so older builds fail closed.
1098        let flags_off = Self::MAGIC.len();
1099        let flags = u16::from_le_bytes([bytes[flags_off], bytes[flags_off + 1]]);
1100        if flags != 0 {
1101            return Err(CapabilityCodecError::BadMagic);
1102        }
1103        let cap_bytes = &bytes[flags_off + 2..];
1104        let capabilities = crate::cluster::capability::CapabilityAd::decode(cap_bytes)?;
1105        Ok(Self { capabilities })
1106    }
1107
1108    /// Number of bytes the handshake's fixed-size prefix
1109    /// occupies before the embedded advertisement. Useful in
1110    /// tests that assert the on-the-wire delta.
1111    #[must_use]
1112    pub const fn header_len() -> usize {
1113        Self::MAGIC.len() + 2
1114    }
1115}
1116
1117#[cfg(test)]
1118mod tests {
1119    use super::*;
1120    use crate::io::mbuf::MbufPool;
1121
1122    #[test]
1123    fn parse_simple_req() {
1124        let mut p = DnodeParser::new();
1125        let bytes = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
1126        match p.step(bytes) {
1127            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
1128            other => panic!("unexpected: {other:?}"),
1129        }
1130        let d = p.take_dmsg();
1131        assert_eq!(d.id, 1);
1132        assert_eq!(d.ty, DmsgType::Req);
1133        assert_eq!(d.flags, 0);
1134        assert_eq!(d.version, 1);
1135        assert!(d.same_dc);
1136        assert_eq!(d.mlen, 1);
1137        assert_eq!(d.data, b"d");
1138        assert_eq!(d.plen, 0);
1139    }
1140
1141    #[test]
1142    fn parse_payload_len() {
1143        let mut p = DnodeParser::new();
1144        let bytes = b"$2014$ 2 3 0 1 1 *1 d *413\r\n";
1145        match p.step(bytes) {
1146            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
1147            other => panic!("unexpected: {other:?}"),
1148        }
1149        assert_eq!(p.dmsg().plen, 413);
1150    }
1151
1152    #[test]
1153    fn parse_three_back_to_back() {
1154        let mut input: Vec<u8> = Vec::new();
1155        input.extend_from_slice(b"$2014$ 1 3 0 1 1 *1 d *0\r\n");
1156        input.extend_from_slice(b"some redis bytes here ignored");
1157        input.extend_from_slice(b"$2014$ 2 3 0 1 1 *1 d *3\r\nABC");
1158        input.extend_from_slice(b"$2014$ 3 3 0 1 1 *1 d *0\r\n");
1159        let mut p = DnodeParser::new();
1160        let mut idx = 0;
1161        let mut count = 0;
1162        while idx < input.len() {
1163            match p.step(&input[idx..]) {
1164                ParseStep::HeaderDone { consumed } => {
1165                    let d = p.take_dmsg();
1166                    count += 1;
1167                    let after_header = idx + consumed;
1168                    if count == 1 {
1169                        assert_eq!(d.id, 1);
1170                        // skip past the redis bytes by scanning for the next '$'
1171                        idx = input[after_header..]
1172                            .iter()
1173                            .position(|&b| b == b'$')
1174                            .map_or(input.len(), |n| after_header + n);
1175                    } else if count == 2 {
1176                        assert_eq!(d.id, 2);
1177                        assert_eq!(d.plen, 3);
1178                        idx = after_header + d.plen as usize;
1179                    } else {
1180                        assert_eq!(d.id, 3);
1181                        idx = after_header;
1182                    }
1183                    p.reset();
1184                }
1185                ParseStep::NeedMore { .. } => {
1186                    break;
1187                }
1188                ParseStep::Error { consumed } => {
1189                    idx += consumed.max(1);
1190                    p.reset();
1191                }
1192            }
1193        }
1194        assert_eq!(count, 3);
1195    }
1196
1197    #[test]
1198    fn need_more_when_truncated() {
1199        let mut p = DnodeParser::new();
1200        let prefix = b"$2014$ 1 3 0 1 1 *1 d *";
1201        match p.step(prefix) {
1202            ParseStep::NeedMore { consumed } => assert_eq!(consumed, prefix.len()),
1203            other => panic!("unexpected: {other:?}"),
1204        }
1205        let suffix = b"42\r\n";
1206        match p.step(suffix) {
1207            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, suffix.len()),
1208            other => panic!("unexpected: {other:?}"),
1209        }
1210        assert_eq!(p.take_dmsg().plen, 42);
1211    }
1212
1213    #[test]
1214    fn parse_error_on_garbage_prefix() {
1215        let mut p = DnodeParser::new();
1216        match p.step(b"!nope") {
1217            ParseStep::Error { consumed } => assert_eq!(consumed, 0),
1218            other => panic!("unexpected: {other:?}"),
1219        }
1220    }
1221
1222    /// Regression for the libfuzzer 1h soak finding 2026-06-02:
1223    /// a numeric DataLen field that exceeds [`MAX_DATA_LEN`]
1224    /// must be rejected with `ParseStep::Error` BEFORE the
1225    /// downstream `Vec::reserve` would convert the wrapped u32
1226    /// into a multi-gigabyte malloc.
1227    #[test]
1228    fn parse_rejects_oversized_data_len() {
1229        let mut p = DnodeParser::new();
1230        // 11 ones drives self.num to 11_111_111_111, which casts
1231        // to u32 as 2_521_176_519 (~2.4 GiB). Pre-fix the parser
1232        // accepted this and called Vec::reserve(2_521_176_519).
1233        let bytes = b"$2014$ 1 3 0 1 1 *11111111111 ";
1234        match p.step(bytes) {
1235            ParseStep::Error { consumed: _ } => (),
1236            other => panic!("expected Error, got {other:?}"),
1237        }
1238    }
1239
1240    /// Regression for the libfuzzer 1h soak finding 2026-06-02:
1241    /// the captured 112-byte OOM artifact must drive `step()`
1242    /// to a clean Error rather than allocating gigabytes.
1243    #[test]
1244    fn parse_oom_artifact_2026_06_02() {
1245        let bytes = include_bytes!("../../../fuzz/seeds/dnode_parse/regression-oom-2026-06-02");
1246        let mut p = DnodeParser::new();
1247        match p.step(bytes) {
1248            ParseStep::Error { .. } | ParseStep::HeaderDone { .. } => (),
1249            ParseStep::NeedMore { .. } => panic!("unexpected NeedMore"),
1250        }
1251    }
1252
1253    #[test]
1254    fn writer_round_trip_unencrypted() {
1255        let pool = MbufPool::default();
1256        let mut buf = pool.get();
1257        dmsg_write(&mut buf, 42, DmsgType::Req, 0, true, None, 0).unwrap();
1258        let bytes = buf.readable().to_vec();
1259        let mut p = DnodeParser::new();
1260        let step = p.step(&bytes);
1261        assert!(matches!(step, ParseStep::HeaderDone { .. }));
1262        let d = p.take_dmsg();
1263        assert_eq!(d.id, 42);
1264        assert_eq!(d.ty, DmsgType::Req);
1265        assert_eq!(d.flags, 0);
1266        assert!(d.same_dc);
1267        assert_eq!(d.mlen, 1);
1268        assert_eq!(d.data, b"d");
1269        assert_eq!(d.plen, 0);
1270    }
1271
1272    #[test]
1273    fn writer_round_trip_with_aes_payload() {
1274        let pool = MbufPool::default();
1275        let mut buf = pool.get();
1276        let payload = vec![0xAB; 128];
1277        dmsg_write(
1278            &mut buf,
1279            7,
1280            DmsgType::CryptoHandshake,
1281            DMSG_FLAG_ENCRYPTED,
1282            false,
1283            Some(&payload),
1284            512,
1285        )
1286        .unwrap();
1287        let bytes = buf.readable().to_vec();
1288        let mut p = DnodeParser::new();
1289        match p.step(&bytes) {
1290            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
1291            other => panic!("unexpected: {other:?}"),
1292        }
1293        let d = p.take_dmsg();
1294        assert_eq!(d.id, 7);
1295        assert_eq!(d.ty, DmsgType::CryptoHandshake);
1296        assert!(d.is_encrypted());
1297        assert!(!d.same_dc);
1298        assert_eq!(d.data, payload);
1299        assert_eq!(d.plen, 512);
1300    }
1301
1302    #[test]
1303    fn dispatcher_classifies_control_plane() {
1304        let mut d = Dmsg::new();
1305        // Pin the exact three variants the C `dmsg_process`
1306        // bypasses.
1307        for ty in [
1308            DmsgType::CryptoHandshake,
1309            DmsgType::GossipSyn,
1310            DmsgType::GossipSynReply,
1311        ] {
1312            d.ty = ty;
1313            assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
1314        }
1315        // Every other gossip variant falls through to the default
1316        // branch (forward), matching the C switch.
1317        for ty in [
1318            DmsgType::GossipAck,
1319            DmsgType::GossipDigestSyn,
1320            DmsgType::GossipDigestAck,
1321            DmsgType::GossipDigestAck2,
1322            DmsgType::GossipShutdown,
1323            DmsgType::Req,
1324            DmsgType::ReqForward,
1325            DmsgType::Res,
1326        ] {
1327            d.ty = ty;
1328            assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
1329        }
1330        // HandoffChunk routes to the explicit handoff coordinator
1331        // and is therefore bypassed alongside the handshake
1332        // variants.
1333        d.ty = DmsgType::HandoffChunk;
1334        assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
1335        // FT.SEARCH coordinator messages are routed to the
1336        // dedicated query-fsm coordinator via the same
1337        // bypass path used by the handoff coordinator.
1338        for ty in [DmsgType::FtSearchReq, DmsgType::FtSearchRep] {
1339            d.ty = ty;
1340            assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
1341        }
1342    }
1343}