dynomite/proto/dnode.rs
1//! DNODE wire codec.
2//!
3//! The DNODE protocol frames every Dynomite peer-to-peer message
4//! with a small ASCII header followed by an opaque payload. The
5//! header carries the message id, type tag, encryption/compression
6//! flags, protocol version, same-datacenter bit, an inline data
7//! field (either a one-byte placeholder or an RSA-wrapped AES key),
8//! and the byte length of the payload that follows after `\r\n`.
9//!
10//! The reference engine implements the parser as a single state
11//! machine driven byte-by-byte. This module ports the same machine
12//! into safe Rust and exposes:
13//!
14//! * [`DynParseState`] - the parser's state alphabet.
15//! * [`DmsgType`] - the full set of message-type discriminators.
16//! * [`Dmsg`] - the in-memory header.
17//! * [`DnodeParser`] - the state machine, advanced by feeding bytes
18//! through [`DnodeParser::step`].
19//! * [`dmsg_write`] / [`dmsg_write_mbuf`] - the canonical encoders.
20//! * [`parse_req`] / [`parse_rsp`] - thin sync wrappers around the
21//! parser that operate on a [`crate::msg::Msg`]'s mbuf chain.
22//! * [`dmsg_process`] - dispatcher that classifies a parsed
23//! [`Dmsg`] by type for the cluster layer to act on.
24//!
25//! The encoder accepts an optional `aes_key_payload`: when present,
26//! the caller provides the bytes the inline data field should hold
27//! (the RSA-wrapped AES key produced by [`crate::crypto::Crypto`]).
28//! When absent, the encoder writes the single-byte `'d'` placeholder
29//! the reference engine emits after the first handshake message.
30
// The parser truncates accumulated decimals into the same fixed
// bit widths the reference engine uses on the wire (`u8` for the
// type, flags and version, `u32` for the data and payload lengths).
// The allowance keeps the Rust port faithful to the C
// `(uint8_t)num` and `(uint32_t)num` casts. Only the two length
// fields are range-checked (against `MAX_DATA_LEN`) before the
// cast; the `u8` header fields keep the low eight bits of the
// accumulated numeral, exactly like the C cast.
37#![allow(clippy::cast_possible_truncation)]
38#![allow(clippy::needless_continue)]
39
40use std::net::SocketAddr;
41
42use crate::core::types::MsgId;
43use crate::io::mbuf::{Mbuf, MbufQueue};
44use crate::msg::message::Msg;
45use crate::msg::message::MsgParseResult;
46
/// Six-byte magic literal that every DNODE header starts with
/// (after any leading spaces).
pub const MAGIC: &[u8] = b"$2014$";

/// Protocol version written by [`dmsg_write`]; the counterpart of
/// `VERSION_10` in the reference engine.
pub const VERSION_10: u8 = 1;

/// Header terminator: the payload starts right after this CRLF.
pub const CRLF: &[u8] = b"\r\n";

/// One-byte inline-data placeholder emitted by [`dmsg_write`] when
/// the caller supplies no AES key payload.
pub const HANDSHAKE_PLACEHOLDER_DATA: u8 = b'd';

/// One-byte inline-data placeholder emitted by [`dmsg_write_mbuf`]
/// when the caller supplies no AES key payload. The gossip encoder
/// uses `'a'` where the plain encoder uses `'d'`, which keeps the
/// two encoder flavours distinguishable on the wire.
pub const GOSSIP_PLACEHOLDER_DATA: u8 = b'a';

/// Largest value the parser accepts for a length field of a single
/// frame.
///
/// `mlen` and `plen` travel as ASCII decimal numerals. The streaming
/// parser accumulates them in a `u64` and only afterwards narrows
/// the result to the wire's `u32`. If the accumulator were left
/// unbounded, a long run of digits could push it beyond `u32::MAX`,
/// and the truncated value would then be handed to
/// [`Vec::reserve`] as a multi-gigabyte allocation request
/// (libfuzzer 1h soak finding 2026-06-02, captured at
/// `crates/fuzz/seeds/dnode_parse/regression-oom-2026-06-02`).
///
/// 256 MiB sits far above any legitimate DNODE frame seen so far
/// (the largest observed production payloads are a few hundred KiB)
/// and still below an allocation size that would cause a real OOM
/// under typical RSS budgets. The parser returns
/// [`ParseStep::Error`] as soon as a DataLen or PayloadLen
/// accumulator grows past this bound.
pub const MAX_DATA_LEN: u64 = 256 << 20;
84
/// States of the DNODE header parser.
///
/// One variant exists per state of the reference engine's
/// `dyn_parse_state_t`. The variants carry no explicit
/// discriminants, so they are numbered 0, 1, 2, ... in declaration
/// order; do not reorder them.
///
/// NOTE(review): parity tooling is said to rely on these numbers
/// matching the C enum - confirm the C values really are the
/// implicit 0..=15 sequence.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DynParseState {
    /// Entry state: leading spaces are skipped, then the magic
    /// literal is matched.
    #[default]
    Start,
    /// The magic literal `$2014$` has been seen; a single space
    /// must follow.
    MagicString,
    /// Accumulating the decimal message id.
    MsgId,
    /// Accumulating the decimal message type.
    TypeId,
    /// Accumulating the decimal flags bit field.
    BitField,
    /// Accumulating the decimal protocol version.
    Version,
    /// Reading the same-datacenter digit.
    SameDc,
    /// Waiting for the `*` that introduces the data length.
    Star,
    /// Accumulating the decimal data length.
    DataLen,
    /// Copying the `mlen` bytes of inline data.
    Data,
    /// Skipping spaces ahead of the payload-length marker.
    SpacesBeforePayloadLen,
    /// Accumulating the decimal payload length.
    PayloadLen,
    /// `\r` seen; waiting for the `\n` that ends the header.
    CrlfBeforeDone,
    /// Header fully parsed; the payload offset is known.
    Done,
    /// Header fully parsed and post-handshake decryption applied.
    PostDone,
    /// Recovery state entered after a malformed byte.
    Unknown,
}
127
/// Message-type discriminator carried in a DNODE header.
///
/// The type travels on the wire as a decimal numeral, so every
/// variant pins the numeric value used by `dmsg_type_t` in the
/// reference engine.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum DmsgType {
    /// No type set / type not recognised.
    #[default]
    Unknown = 0,
    /// Diagnostic frame (unused on the live wire; kept for parity).
    Debug = 1,
    /// Parse-error frame (unused on the live wire; kept for parity).
    ParseError = 2,
    /// Datastore request destined for the local DC.
    Req = 3,
    /// Datastore request that must be forwarded across DCs.
    ReqForward = 4,
    /// Datastore response.
    Res = 5,
    /// AES key handshake.
    CryptoHandshake = 6,
    /// Gossip SYN.
    GossipSyn = 7,
    /// Reply to a gossip SYN.
    GossipSynReply = 8,
    /// Gossip ACK.
    GossipAck = 9,
    /// Gossip digest SYN.
    GossipDigestSyn = 10,
    /// Gossip digest ACK.
    GossipDigestAck = 11,
    /// Second-round gossip digest ACK.
    GossipDigestAck2 = 12,
    /// Gossip shutdown notice.
    GossipShutdown = 13,
    /// One chunk of an explicit handoff stream.
    ///
    /// A token-range handoff is streamed chunk by chunk from the
    /// range's previous owner to its new owner. The frame has its
    /// own type (separate from the AAE exchange variants) so that
    /// the receiver can hand it to the handoff coordinator without
    /// looking inside the payload.
    HandoffChunk = 14,
    /// Cluster-wide RediSearch FT.SEARCH request.
    ///
    /// The FT.SEARCH coordinator on the node that took the client
    /// request sends this frame to every primary peer covering the
    /// index's key range. The payload is a broadcast request (table
    /// name, serialised query body, top-K) - see
    /// `query_fsm::BroadcastRequest` in the `dynomite-search` crate.
    /// The dispatcher routes it to the FT.SEARCH coordinator FSM
    /// rather than the data-plane stack, so each peer runs the
    /// query against its local registry instead of forwarding it
    /// again.
    FtSearchReq = 15,
    /// Cluster-wide RediSearch FT.SEARCH reply.
    ///
    /// Sent back by each peer that got a [`Self::FtSearchReq`],
    /// either when its local search finished or when the per-peer
    /// deadline ran out. The payload holds that peer's top-K hit
    /// list together with a `timed_out` flag, which the coordinator
    /// uses to mark partial results.
    FtSearchRep = 16,
}

impl DmsgType {
    /// Map an on-the-wire integer to its message type, or `None`
    /// when the value names no known type.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::proto::dnode::DmsgType;
    /// assert_eq!(DmsgType::from_u8(3), Some(DmsgType::Req));
    /// assert_eq!(DmsgType::from_u8(99), None);
    /// ```
    #[must_use]
    pub fn from_u8(v: u8) -> Option<Self> {
        // Lookup table indexed by wire value. The wire values are
        // the contiguous range 0..=16, so position `i` must hold
        // the variant whose discriminant is `i`.
        const BY_WIRE_VALUE: [DmsgType; 17] = [
            DmsgType::Unknown,
            DmsgType::Debug,
            DmsgType::ParseError,
            DmsgType::Req,
            DmsgType::ReqForward,
            DmsgType::Res,
            DmsgType::CryptoHandshake,
            DmsgType::GossipSyn,
            DmsgType::GossipSynReply,
            DmsgType::GossipAck,
            DmsgType::GossipDigestSyn,
            DmsgType::GossipDigestAck,
            DmsgType::GossipDigestAck2,
            DmsgType::GossipShutdown,
            DmsgType::HandoffChunk,
            DmsgType::FtSearchReq,
            DmsgType::FtSearchRep,
        ];
        BY_WIRE_VALUE.get(usize::from(v)).copied()
    }

    /// The integer this type is encoded as on the wire.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::proto::dnode::DmsgType;
    /// assert_eq!(DmsgType::CryptoHandshake.as_u8(), 6);
    /// ```
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        // `#[repr(u8)]` makes this cast the declared discriminant.
        self as u8
    }
}
241
/// Bit 0 of [`Dmsg::flags`]: the frame is encrypted.
pub const DMSG_FLAG_ENCRYPTED: u8 = 1 << 0;

/// Bit 1 of [`Dmsg::flags`]: the frame is compressed.
pub const DMSG_FLAG_COMPRESSED: u8 = 1 << 1;
247
248/// Parsed DNODE header.
249///
250/// `data` and `payload` hold copies of the on-the-wire bytes. The
251/// encoder side fills both before emitting; the parser fills them as
252/// it advances through the state machine.
253#[derive(Clone, Debug, Default, Eq, PartialEq)]
254pub struct Dmsg {
255 /// Message id.
256 pub id: MsgId,
257 /// Message type.
258 pub ty: DmsgType,
259 /// Flag bit field; encryption is bit 0, compression is bit 1.
260 pub flags: u8,
261 /// Protocol version.
262 pub version: u8,
263 /// True when sender and receiver share a datacenter.
264 pub same_dc: bool,
265 /// Source address recorded by the recv path. Stage 7 leaves it
266 /// `None`; Stage 9 stamps it from the connection state.
267 pub source_address: Option<SocketAddr>,
268 /// Length (in bytes) of the inline data field.
269 pub mlen: u32,
270 /// Inline data: either the single-byte placeholder or the
271 /// RSA-wrapped AES key during the crypto handshake.
272 pub data: Vec<u8>,
273 /// Length (in bytes) of the trailing payload framed by the
274 /// header.
275 pub plen: u32,
276 /// Payload bytes, if collected by the parser.
277 pub payload: Vec<u8>,
278}
279
280impl Dmsg {
281 /// Construct an empty `Dmsg` defaulted the same way the
282 /// reference engine's `dmsg_get` initialises a fresh slot.
283 ///
284 /// # Examples
285 ///
286 /// ```
287 /// use dynomite::proto::dnode::{Dmsg, DmsgType, VERSION_10};
288 /// let d = Dmsg::new();
289 /// assert_eq!(d.ty, DmsgType::Unknown);
290 /// assert_eq!(d.version, VERSION_10);
291 /// assert!(d.same_dc);
292 /// ```
293 #[must_use]
294 pub fn new() -> Self {
295 Self {
296 id: 0,
297 ty: DmsgType::Unknown,
298 flags: 0,
299 version: VERSION_10,
300 same_dc: true,
301 source_address: None,
302 mlen: 0,
303 data: Vec::new(),
304 plen: 0,
305 payload: Vec::new(),
306 }
307 }
308
309 /// True when the encryption flag is set.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_ENCRYPTED};
315 /// let mut d = Dmsg::new();
316 /// d.flags = DMSG_FLAG_ENCRYPTED;
317 /// assert!(d.is_encrypted());
318 /// ```
319 #[must_use]
320 pub fn is_encrypted(&self) -> bool {
321 self.flags & DMSG_FLAG_ENCRYPTED != 0
322 }
323
324 /// True when the compression flag is set.
325 ///
326 /// # Examples
327 ///
328 /// ```
329 /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_COMPRESSED};
330 /// let mut d = Dmsg::new();
331 /// d.flags = DMSG_FLAG_COMPRESSED;
332 /// assert!(d.is_compressed());
333 /// ```
334 #[must_use]
335 pub fn is_compressed(&self) -> bool {
336 self.flags & DMSG_FLAG_COMPRESSED != 0
337 }
338}
339
/// Outcome of one call to [`DnodeParser::step`].
///
/// Every variant reports `consumed`, an offset into the slice that
/// was passed to that call.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ParseStep {
    /// The input ran out before the header was complete; feed more
    /// bytes to continue.
    NeedMore {
        /// How many bytes of the input the parser absorbed before
        /// it had to stop and wait.
        consumed: usize,
    },
    /// The header has been parsed through its terminating LF. The
    /// payload begins at `consumed`.
    HeaderDone {
        /// Offset of the first byte after the terminating LF.
        consumed: usize,
    },
    /// A byte that cannot be part of a valid header was found. The
    /// caller should discard the buffer (or split it at `consumed`)
    /// and reset the parser.
    Error {
        /// Offset of the offending byte.
        consumed: usize,
    },
}
366
/// Failures reported by the non-streaming DNODE helpers (header
/// encoding and one-shot parsing), as opposed to the streaming
/// state machine, which reports [`ParseStep::Error`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum DnodeError {
    /// The destination buffer cannot hold the encoded header.
    OutOfSpace,
    /// The header does not start with the magic literal.
    BadMagic,
    /// A numeric field could not be parsed.
    BadNumber,
    /// The terminating CRLF is missing.
    MissingCrlf,
    /// The type discriminator is outside the known range.
    BadType,
    /// Fewer inline data bytes are present than `mlen` declares.
    TruncatedData,
}
385
386/// Streaming DNODE header parser.
387#[derive(Debug)]
388pub struct DnodeParser {
389 state: DynParseState,
390 num: u64,
391 dmsg: Dmsg,
392 data_remaining: u32,
393 magic_progress: u8,
394 /// Whether the previous byte was an ASCII digit. The header
395 /// state machine only transitions out of the numeric header
396 /// fields (MSG_ID, TYPE_ID, BIT_FIELD, VERSION, SAME_DC) when
397 /// the byte immediately preceding the field-terminating space
398 /// was a digit; the parser reproduces this guard so extra
399 /// whitespace (or any other non-digit byte) is rejected with
400 /// the wire protocol's strictness.
401 prev_was_digit: bool,
402}
403
404impl DnodeParser {
405 /// Build a fresh parser positioned at [`DynParseState::Start`].
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use dynomite::proto::dnode::{DnodeParser, DynParseState};
411 /// let p = DnodeParser::new();
412 /// assert_eq!(p.state(), DynParseState::Start);
413 /// ```
414 #[must_use]
415 pub fn new() -> Self {
416 Self {
417 state: DynParseState::Start,
418 num: 0,
419 dmsg: Dmsg::new(),
420 data_remaining: 0,
421 magic_progress: 0,
422 prev_was_digit: false,
423 }
424 }
425
426 /// Reset the parser to [`DynParseState::Start`] with a fresh
427 /// accumulator [`Dmsg`].
428 pub fn reset(&mut self) {
429 *self = Self::new();
430 }
431
432 /// Current state.
433 #[must_use]
434 pub fn state(&self) -> DynParseState {
435 self.state
436 }
437
438 /// Borrow the partial [`Dmsg`].
439 #[must_use]
440 pub fn dmsg(&self) -> &Dmsg {
441 &self.dmsg
442 }
443
444 /// Move the parsed [`Dmsg`] out of the parser. Only meaningful
445 /// after a [`ParseStep::HeaderDone`] step.
446 pub fn take_dmsg(&mut self) -> Dmsg {
447 let mut out = Dmsg::new();
448 std::mem::swap(&mut out, &mut self.dmsg);
449 self.state = DynParseState::Start;
450 self.num = 0;
451 self.data_remaining = 0;
452 self.magic_progress = 0;
453 self.prev_was_digit = false;
454 out
455 }
456
457 /// Feed `input` to the parser. The parser advances as far as it
458 /// can and returns one of the three [`ParseStep`] variants.
459 ///
460 /// The state machine is byte-driven and can be reentered with a
461 /// fresh slice when [`ParseStep::NeedMore`] indicates the input
462 /// was truncated mid-header.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// use dynomite::proto::dnode::{DnodeParser, ParseStep};
468 /// let mut p = DnodeParser::new();
469 /// let bytes = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
470 /// match p.step(bytes) {
471 /// ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
472 /// other => panic!("unexpected: {other:?}"),
473 /// }
474 /// ```
475 /// The state machine intentionally stays in one function to
476 /// match the reference engine's single-block parser; splitting
477 /// the per-state arms across helpers would obscure the parity.
478 #[allow(clippy::too_many_lines)]
479 pub fn step(&mut self, input: &[u8]) -> ParseStep {
480 let mut idx = 0usize;
481 while idx < input.len() {
482 let ch = input[idx];
483 match self.state {
484 DynParseState::Start => {
485 // Phase 1: skip leading whitespace, identical
486 // to the C engine's `while (ch == ' ')` arm.
487 if self.magic_progress == 0 {
488 if ch == b' ' {
489 idx += 1;
490 continue;
491 }
492 if ch != b'$' {
493 return ParseStep::Error { consumed: idx };
494 }
495 }
496 // Phase 2: byte-incrementally match the magic
497 // literal so split inputs are tolerated.
498 let want = MAGIC[usize::from(self.magic_progress)];
499 if ch != want {
500 return ParseStep::Error { consumed: idx };
501 }
502 self.magic_progress += 1;
503 idx += 1;
504 if usize::from(self.magic_progress) == MAGIC.len() {
505 self.state = DynParseState::MagicString;
506 self.magic_progress = 0;
507 }
508 continue;
509 }
510 DynParseState::MagicString => {
511 if ch == b' ' {
512 self.state = DynParseState::MsgId;
513 self.num = 0;
514 idx += 1;
515 continue;
516 }
517 return ParseStep::Error { consumed: idx };
518 }
519 DynParseState::MsgId => {
520 // DYN_MSG_ID state: digits accumulate, a single
521 // space terminates the field but only when the
522 // byte immediately
523 // before it was a digit. Anything else is
524 // rejected (the C engine resets to DYN_START
525 // and lets the recovery path retry; the Rust
526 // streaming parser surfaces an error so the
527 // caller can drop the buffer).
528 if ch.is_ascii_digit() {
529 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
530 self.prev_was_digit = true;
531 idx += 1;
532 continue;
533 }
534 if ch == b' ' && self.prev_was_digit {
535 self.dmsg.id = self.num;
536 self.state = DynParseState::TypeId;
537 self.num = 0;
538 self.prev_was_digit = false;
539 idx += 1;
540 continue;
541 }
542 return ParseStep::Error { consumed: idx };
543 }
544 DynParseState::TypeId => {
545 if ch.is_ascii_digit() {
546 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
547 self.prev_was_digit = true;
548 idx += 1;
549 continue;
550 }
551 if ch == b' ' && self.prev_was_digit {
552 self.dmsg.ty = match DmsgType::from_u8(self.num as u8) {
553 Some(t) => t,
554 None => return ParseStep::Error { consumed: idx },
555 };
556 self.state = DynParseState::BitField;
557 self.num = 0;
558 self.prev_was_digit = false;
559 idx += 1;
560 continue;
561 }
562 return ParseStep::Error { consumed: idx };
563 }
564 DynParseState::BitField => {
565 if ch.is_ascii_digit() {
566 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
567 self.prev_was_digit = true;
568 idx += 1;
569 continue;
570 }
571 if ch == b' ' && self.prev_was_digit {
572 self.dmsg.flags = (self.num as u8) & 0xF;
573 self.state = DynParseState::Version;
574 self.num = 0;
575 self.prev_was_digit = false;
576 idx += 1;
577 continue;
578 }
579 return ParseStep::Error { consumed: idx };
580 }
581 DynParseState::Version => {
582 if ch.is_ascii_digit() {
583 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
584 self.prev_was_digit = true;
585 idx += 1;
586 continue;
587 }
588 if ch == b' ' && self.prev_was_digit {
589 self.dmsg.version = self.num as u8;
590 self.state = DynParseState::SameDc;
591 self.num = 0;
592 self.prev_was_digit = false;
593 idx += 1;
594 continue;
595 }
596 return ParseStep::Error { consumed: idx };
597 }
598 DynParseState::SameDc => {
599 if ch.is_ascii_digit() {
600 self.dmsg.same_dc = ch != b'0';
601 self.prev_was_digit = true;
602 idx += 1;
603 continue;
604 }
605 if ch == b' ' && self.prev_was_digit {
606 self.state = DynParseState::DataLen;
607 self.num = 0;
608 self.prev_was_digit = false;
609 idx += 1;
610 continue;
611 }
612 return ParseStep::Error { consumed: idx };
613 }
614 DynParseState::Star | DynParseState::DataLen => {
615 if ch == b'*' {
616 idx += 1;
617 continue;
618 }
619 if ch.is_ascii_digit() {
620 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
621 // Reject pathological-size length fields
622 // before the cast to u32 wraps and a
623 // downstream Vec::reserve allocates the
624 // wrapped value as bytes. See MAX_DATA_LEN.
625 if self.num > MAX_DATA_LEN {
626 return ParseStep::Error { consumed: idx };
627 }
628 idx += 1;
629 continue;
630 }
631 if ch == b' ' && self.state == DynParseState::DataLen {
632 self.dmsg.mlen = self.num as u32;
633 self.data_remaining = self.dmsg.mlen;
634 self.dmsg.data.clear();
635 self.dmsg.data.reserve(self.data_remaining as usize);
636 self.state = DynParseState::Data;
637 self.num = 0;
638 idx += 1;
639 continue;
640 }
641 return ParseStep::Error { consumed: idx };
642 }
643 DynParseState::Data => {
644 if self.data_remaining == 0 {
645 self.state = DynParseState::SpacesBeforePayloadLen;
646 continue;
647 }
648 let take = std::cmp::min(self.data_remaining as usize, input.len() - idx);
649 self.dmsg.data.extend_from_slice(&input[idx..idx + take]);
650 self.data_remaining -= take as u32;
651 idx += take;
652 if self.data_remaining == 0 {
653 self.state = DynParseState::SpacesBeforePayloadLen;
654 }
655 continue;
656 }
657 DynParseState::SpacesBeforePayloadLen => {
658 if ch == b' ' {
659 idx += 1;
660 continue;
661 }
662 if ch == b'*' {
663 self.state = DynParseState::PayloadLen;
664 self.num = 0;
665 idx += 1;
666 continue;
667 }
668 return ParseStep::Error { consumed: idx };
669 }
670 DynParseState::PayloadLen => {
671 if ch.is_ascii_digit() {
672 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
673 if self.num > MAX_DATA_LEN {
674 return ParseStep::Error { consumed: idx };
675 }
676 idx += 1;
677 continue;
678 }
679 if ch == b'\r' {
680 self.dmsg.plen = self.num as u32;
681 self.state = DynParseState::CrlfBeforeDone;
682 self.num = 0;
683 idx += 1;
684 continue;
685 }
686 return ParseStep::Error { consumed: idx };
687 }
688 DynParseState::CrlfBeforeDone => {
689 if ch == b'\n' {
690 self.state = DynParseState::Done;
691 idx += 1;
692 return ParseStep::HeaderDone { consumed: idx };
693 }
694 return ParseStep::Error { consumed: idx };
695 }
696 DynParseState::Done | DynParseState::PostDone | DynParseState::Unknown => {
697 return ParseStep::HeaderDone { consumed: idx };
698 }
699 }
700 }
701 ParseStep::NeedMore { consumed: idx }
702 }
703}
704
705impl Default for DnodeParser {
706 fn default() -> Self {
707 Self::new()
708 }
709}
710
711/// Encode a DNODE header into the writable region of `mbuf`.
712///
713/// `aes_key_payload`, when `Some`, is written as the inline data
714/// field; this is how the crypto handshake transports the
715/// RSA-wrapped AES key. When `None`, a single-byte `'d'` placeholder
716/// is emitted.
717///
718/// `flags` is taken verbatim (the encryption bit must be set by the
719/// caller, alongside any compression bit).
720///
721/// The encoder writes the entire header as a single contiguous
722/// region; if `mbuf` lacks the necessary capacity,
723/// [`DnodeError::OutOfSpace`] is returned.
724///
725/// # Examples
726///
727/// ```
728/// use dynomite::io::mbuf::MbufPool;
729/// use dynomite::proto::dnode::{dmsg_write, DmsgType};
730///
731/// let pool = MbufPool::default();
732/// let mut buf = pool.get();
733/// dmsg_write(
734/// &mut buf,
735/// /* msg_id */ 1,
736/// DmsgType::Req,
737/// /* flags */ 0,
738/// /* same_dc */ true,
739/// /* aes_key_payload */ None,
740/// /* plen */ 0,
741/// )
742/// .unwrap();
743/// assert!(buf.readable().starts_with(b" $2014$ 1 3 0"));
744/// ```
745pub fn dmsg_write(
746 mbuf: &mut Mbuf,
747 msg_id: MsgId,
748 ty: DmsgType,
749 flags: u8,
750 same_dc: bool,
751 aes_key_payload: Option<&[u8]>,
752 plen: u32,
753) -> Result<(), DnodeError> {
754 let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, false);
755 write_chain(mbuf, &header)
756}
757
758/// Encode a gossip-flavored DNODE header.
759///
760/// Mirrors the reference engine's `dmsg_write_mbuf`, which differs
761/// from [`dmsg_write`] only in the placeholder byte emitted when no
762/// AES key payload accompanies the header (`'a'` instead of `'d'`).
763///
764/// # Examples
765///
766/// ```
767/// use dynomite::io::mbuf::MbufPool;
768/// use dynomite::proto::dnode::{dmsg_write_mbuf, DmsgType};
769///
770/// let pool = MbufPool::default();
771/// let mut buf = pool.get();
772/// dmsg_write_mbuf(
773/// &mut buf,
774/// /* msg_id */ 5,
775/// DmsgType::GossipSyn,
776/// /* flags */ 0,
777/// /* same_dc */ true,
778/// /* aes_key_payload */ None,
779/// /* plen */ 64,
780/// )
781/// .unwrap();
782/// assert!(buf.readable().contains(&b'a'));
783/// ```
784pub fn dmsg_write_mbuf(
785 mbuf: &mut Mbuf,
786 msg_id: MsgId,
787 ty: DmsgType,
788 flags: u8,
789 same_dc: bool,
790 aes_key_payload: Option<&[u8]>,
791 plen: u32,
792) -> Result<(), DnodeError> {
793 let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, true);
794 write_chain(mbuf, &header)
795}
796
797fn build_header(
798 msg_id: MsgId,
799 ty: DmsgType,
800 flags: u8,
801 same_dc: bool,
802 aes_key_payload: Option<&[u8]>,
803 plen: u32,
804 gossip_placeholder: bool,
805) -> Vec<u8> {
806 use std::io::Write as _;
807 let mut buf: Vec<u8> = Vec::with_capacity(64);
808 // Three leading spaces are part of the magic literal as written
809 // on the wire; the parser tolerates and skips them in DYN_START.
810 buf.extend_from_slice(b" $2014$ ");
811 let _ = write!(buf, "{msg_id}");
812 buf.push(b' ');
813 let _ = write!(buf, "{}", ty.as_u8());
814 buf.push(b' ');
815 let _ = write!(buf, "{}", flags & 0xF);
816 buf.push(b' ');
817 let _ = write!(buf, "{VERSION_10}");
818 buf.push(b' ');
819 buf.push(if same_dc { b'1' } else { b'0' });
820 buf.push(b' ');
821 buf.push(b'*');
822 if let Some(payload) = aes_key_payload {
823 let _ = write!(buf, "{}", payload.len());
824 buf.push(b' ');
825 buf.extend_from_slice(payload);
826 } else {
827 buf.extend_from_slice(b"1 ");
828 buf.push(if gossip_placeholder {
829 GOSSIP_PLACEHOLDER_DATA
830 } else {
831 HANDSHAKE_PLACEHOLDER_DATA
832 });
833 }
834 buf.push(b' ');
835 buf.push(b'*');
836 let _ = write!(buf, "{plen}");
837 buf.extend_from_slice(CRLF);
838 buf
839}
840
841fn write_chain(mbuf: &mut Mbuf, payload: &[u8]) -> Result<(), DnodeError> {
842 if mbuf.remaining() < payload.len() {
843 return Err(DnodeError::OutOfSpace);
844 }
845 let n = mbuf.recv(payload);
846 debug_assert_eq!(n, payload.len());
847 Ok(())
848}
849
850/// Sync byte parser that drives a request message's DNODE header
851/// state machine.
852///
853/// The parser walks the contiguous bytes spanning the message's
854/// mbuf chain and updates the [`Msg`] in place. On a fully parsed
855/// header, the function attaches the [`Dmsg`] to the message and
856/// returns `MsgParseResult::Ok`. On truncated input the parser
857/// returns `MsgParseResult::Again`. On invalid bytes the parser
858/// records `MsgParseResult::Error` and surfaces the same value.
859///
860/// The async wrapping (per-connection task scheduling, decryption
861/// hand-off when the encryption bit is set) ships in Stage 9.
862///
863/// # Examples
864///
865/// ```
866/// use dynomite::io::mbuf::MbufPool;
867/// use dynomite::msg::{Msg, MsgType};
868/// use dynomite::proto::dnode::{parse_req, DmsgType, DynParseState};
869///
870/// let pool = MbufPool::default();
871/// let mut msg = Msg::new(0, MsgType::Unknown, true);
872/// let mut mb = pool.get();
873/// mb.recv(b"$2014$ 1 3 0 1 1 *1 d *0\r\n");
874/// msg.mbufs_mut().push_back(mb);
875/// msg.recompute_mlen();
876/// let result = parse_req(&mut msg);
877/// assert_eq!(msg.dyn_parse_state(), DynParseState::Done);
878/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Req);
879/// drop(result);
880/// ```
881pub fn parse_req(msg: &mut Msg) -> MsgParseResult {
882 parse_msg(msg, false)
883}
884
885/// Sync byte parser counterpart to [`parse_req`] for response
886/// messages.
887///
888/// # Examples
889///
890/// ```
891/// use dynomite::io::mbuf::MbufPool;
892/// use dynomite::msg::{Msg, MsgType};
893/// use dynomite::proto::dnode::{parse_rsp, DmsgType};
894///
895/// let pool = MbufPool::default();
896/// let mut msg = Msg::new(0, MsgType::Unknown, false);
897/// let mut mb = pool.get();
898/// mb.recv(b"$2014$ 9 5 0 1 1 *1 d *0\r\n");
899/// msg.mbufs_mut().push_back(mb);
900/// msg.recompute_mlen();
901/// let _ = parse_rsp(&mut msg);
902/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Res);
903/// ```
904pub fn parse_rsp(msg: &mut Msg) -> MsgParseResult {
905 parse_msg(msg, true)
906}
907
908fn parse_msg(msg: &mut Msg, _is_response: bool) -> MsgParseResult {
909 // Flatten the chain into a single buffer for parsing. The
910 // reference engine walks the chain byte by byte and tolerates
911 // splits at arbitrary boundaries; the Rust port drives the same
912 // state machine over a contiguous slice. Stage 9 will replace
913 // this with a streaming feed when the connection FSM lands.
914 let mut bytes: Vec<u8> = Vec::with_capacity(msg.mbufs().total_len());
915 for mbuf in msg.mbufs() {
916 bytes.extend_from_slice(mbuf.readable());
917 }
918
919 let mut parser = DnodeParser::new();
920 parser.state = msg.dyn_parse_state();
921 match parser.step(&bytes) {
922 ParseStep::HeaderDone { .. } => {
923 let dmsg = parser.take_dmsg();
924 msg.set_dyn_parse_state(DynParseState::Done);
925 msg.set_dmsg(dmsg);
926 msg.set_parse_result(MsgParseResult::Ok);
927 MsgParseResult::Ok
928 }
929 ParseStep::NeedMore { .. } => {
930 msg.set_dyn_parse_state(parser.state);
931 msg.set_parse_result(MsgParseResult::Again);
932 MsgParseResult::Again
933 }
934 ParseStep::Error { .. } => {
935 msg.set_dyn_parse_state(DynParseState::Unknown);
936 msg.set_parse_result(MsgParseResult::Error);
937 MsgParseResult::Error
938 }
939 }
940}
941
/// Routing decision returned by [`dmsg_process`].
///
/// `Bypass` marks a header recognised as control traffic: the
/// cluster layer handles it and must not hand the message further
/// down the protocol stack.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DmsgDispatch {
    /// A control-plane handler consumes the frame.
    Bypass,
    /// The frame carries on through the data-plane stack.
    Forward,
}
954
955/// Stage 7 dispatcher: classify a parsed [`Dmsg`] as
956/// control-plane traffic the cluster layer should consume directly,
957/// or data-plane traffic that should continue through the protocol
958/// stack.
959///
960/// Stage 10 will extend this with the gossip-message decoders. For
961/// now, only the message-shape side of the dispatch is in place.
962///
963/// # Examples
964///
965/// ```
966/// use dynomite::proto::dnode::{dmsg_process, Dmsg, DmsgDispatch, DmsgType};
967///
968/// let mut d = Dmsg::new();
969/// d.ty = DmsgType::CryptoHandshake;
970/// assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
971///
972/// // Gossip variants other than SYN / SYN_REPLY fall through.
973/// d.ty = DmsgType::GossipShutdown;
974/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
975///
976/// d.ty = DmsgType::Req;
977/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
978/// ```
979#[must_use]
980pub fn dmsg_process(dmsg: &Dmsg) -> DmsgDispatch {
981 // Dmsg dispatch table: only CRYPTO_HANDSHAKE,
982 // GOSSIP_SYN, and GOSSIP_SYN_REPLY short-circuit; the other
983 // gossip variants (ACK, DIGEST_SYN, DIGEST_ACK, DIGEST_ACK2,
984 // SHUTDOWN) fall through to the default branch and are
985 // forwarded to the cluster handlers (which Stage 10 will wire
986 // up). HANDOFF_CHUNK frames are control-plane traffic for
987 // the explicit handoff coordinator and bypass the data-plane
988 // stack alongside the crypto / gossip handshake variants.
989 match dmsg.ty {
990 DmsgType::CryptoHandshake
991 | DmsgType::GossipSyn
992 | DmsgType::GossipSynReply
993 | DmsgType::HandoffChunk
994 | DmsgType::FtSearchReq
995 | DmsgType::FtSearchRep => DmsgDispatch::Bypass,
996 _ => DmsgDispatch::Forward,
997 }
998}
999
1000/// Drain `chain` into a contiguous `Vec<u8>` recycling each chunk
1001/// back to `pool`. Useful for tests and for the Stage 9 path that
1002/// needs a flat buffer of decrypted payload bytes.
1003pub fn flatten_chain(chain: &mut MbufQueue) -> Vec<u8> {
1004 let mut out = Vec::with_capacity(chain.total_len());
1005 while let Some(buf) = chain.pop_front() {
1006 out.extend_from_slice(buf.readable());
1007 }
1008 out
1009}
1010
1011/// Peer-handshake control payload exchanged on top of a
1012/// [`DmsgType::GossipSyn`] frame.
1013///
1014/// Today the handshake carries the cluster-wide capability
1015/// advertisement (see [`crate::cluster::capability`]). Future
1016/// fields will be appended as new typed records; older peers
1017/// ignore unknown trailing bytes.
1018///
1019/// # Wire format
1020///
1021/// ```text
1022/// magic(4) = "DHS1"
1023/// flags(2) = 0
1024/// CapabilityAd (length-prefixed, see
1025/// `CapabilityAd::encode` for the exact layout)
1026/// ```
1027///
1028/// All multi-byte integers are little-endian. The encoding uses
1029/// only the standard library; no external codec is pulled in.
1030///
1031/// # Examples
1032///
1033/// ```
1034/// use dynomite::cluster::capability::{CapabilityAd, CapabilityAdEntry};
1035/// use dynomite::proto::dnode::Handshake;
1036/// let ad = CapabilityAd::from_entries(vec![
1037/// CapabilityAdEntry::new("framing".into(), vec![vec![1, 0, 0, 0]]),
1038/// ]);
1039/// let hs = Handshake::new(ad.clone());
1040/// let bytes = hs.encode();
1041/// let back = Handshake::decode(&bytes).unwrap();
1042/// assert_eq!(back.capabilities(), &ad);
1043/// ```
1044#[derive(Clone, Debug, Default, Eq, PartialEq)]
1045pub struct Handshake {
1046 capabilities: crate::cluster::capability::CapabilityAd,
1047}
1048
1049impl Handshake {
1050 /// Magic literal that opens every handshake payload.
1051 pub const MAGIC: [u8; 4] = *b"DHS1";
1052
1053 /// Build a handshake carrying `capabilities`.
1054 #[must_use]
1055 pub fn new(capabilities: crate::cluster::capability::CapabilityAd) -> Self {
1056 Self { capabilities }
1057 }
1058
1059 /// Borrow the embedded capability advertisement.
1060 #[must_use]
1061 pub fn capabilities(&self) -> &crate::cluster::capability::CapabilityAd {
1062 &self.capabilities
1063 }
1064
1065 /// Consume the handshake and return the embedded
1066 /// advertisement.
1067 #[must_use]
1068 pub fn into_capabilities(self) -> crate::cluster::capability::CapabilityAd {
1069 self.capabilities
1070 }
1071
1072 /// Serialise the handshake to a length-prefixed byte
1073 /// stream.
1074 #[must_use]
1075 pub fn encode(&self) -> Vec<u8> {
1076 let cap_bytes = self.capabilities.encode();
1077 let mut out = Vec::with_capacity(Self::MAGIC.len() + 2 + cap_bytes.len());
1078 out.extend_from_slice(&Self::MAGIC);
1079 out.extend_from_slice(&0u16.to_le_bytes()); // flags
1080 out.extend_from_slice(&cap_bytes);
1081 out
1082 }
1083
1084 /// Inverse of [`Handshake::encode`]. Surfaces a typed error
1085 /// when the magic / version is wrong or the embedded
1086 /// advertisement is malformed.
1087 pub fn decode(bytes: &[u8]) -> Result<Self, crate::cluster::capability::CapabilityCodecError> {
1088 use crate::cluster::capability::CapabilityCodecError;
1089 if bytes.len() < Self::MAGIC.len() + 2 {
1090 return Err(CapabilityCodecError::Truncated);
1091 }
1092 if bytes[..Self::MAGIC.len()] != Self::MAGIC {
1093 return Err(CapabilityCodecError::BadMagic);
1094 }
1095 // Flags are reserved; the only currently legal value is
1096 // zero. Any non-zero value is reserved for future use
1097 // and rejected here so older builds fail closed.
1098 let flags_off = Self::MAGIC.len();
1099 let flags = u16::from_le_bytes([bytes[flags_off], bytes[flags_off + 1]]);
1100 if flags != 0 {
1101 return Err(CapabilityCodecError::BadMagic);
1102 }
1103 let cap_bytes = &bytes[flags_off + 2..];
1104 let capabilities = crate::cluster::capability::CapabilityAd::decode(cap_bytes)?;
1105 Ok(Self { capabilities })
1106 }
1107
1108 /// Number of bytes the handshake's fixed-size prefix
1109 /// occupies before the embedded advertisement. Useful in
1110 /// tests that assert the on-the-wire delta.
1111 #[must_use]
1112 pub const fn header_len() -> usize {
1113 Self::MAGIC.len() + 2
1114 }
1115}
1116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::mbuf::MbufPool;

    /// Feed `bytes` to `parser` and return how many bytes the step
    /// consumed; panics unless the step completes a header.
    fn step_header_done(parser: &mut DnodeParser, bytes: &[u8]) -> usize {
        match parser.step(bytes) {
            ParseStep::HeaderDone { consumed } => consumed,
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn parse_simple_req() {
        let wire = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
        let mut parser = DnodeParser::new();
        assert_eq!(step_header_done(&mut parser, wire), wire.len());

        let hdr = parser.take_dmsg();
        assert_eq!(hdr.id, 1);
        assert_eq!(hdr.ty, DmsgType::Req);
        assert_eq!(hdr.flags, 0);
        assert_eq!(hdr.version, 1);
        assert!(hdr.same_dc);
        assert_eq!(hdr.mlen, 1);
        assert_eq!(hdr.data, b"d");
        assert_eq!(hdr.plen, 0);
    }

    #[test]
    fn parse_payload_len() {
        let wire = b"$2014$ 2 3 0 1 1 *1 d *413\r\n";
        let mut parser = DnodeParser::new();
        assert_eq!(step_header_done(&mut parser, wire), wire.len());
        assert_eq!(parser.dmsg().plen, 413);
    }

    #[test]
    fn parse_three_back_to_back() {
        // Three headers; the first is followed by foreign bytes and
        // the second by a three-byte payload.
        let parts: [&[u8]; 4] = [
            b"$2014$ 1 3 0 1 1 *1 d *0\r\n",
            b"some redis bytes here ignored",
            b"$2014$ 2 3 0 1 1 *1 d *3\r\nABC",
            b"$2014$ 3 3 0 1 1 *1 d *0\r\n",
        ];
        let stream: Vec<u8> = parts.concat();

        let mut parser = DnodeParser::new();
        let mut cursor = 0;
        let mut headers_seen = 0;
        while cursor < stream.len() {
            match parser.step(&stream[cursor..]) {
                ParseStep::NeedMore { .. } => break,
                ParseStep::Error { consumed } => {
                    // Always make progress, even on a zero-byte error.
                    cursor += consumed.max(1);
                    parser.reset();
                }
                ParseStep::HeaderDone { consumed } => {
                    let hdr = parser.take_dmsg();
                    headers_seen += 1;
                    let payload_start = cursor + consumed;
                    cursor = match headers_seen {
                        1 => {
                            assert_eq!(hdr.id, 1);
                            // Skip the redis bytes: resume at the
                            // next '$', or at the end if none.
                            stream[payload_start..]
                                .iter()
                                .position(|&byte| byte == b'$')
                                .map_or(stream.len(), |offset| payload_start + offset)
                        }
                        2 => {
                            assert_eq!(hdr.id, 2);
                            assert_eq!(hdr.plen, 3);
                            payload_start + hdr.plen as usize
                        }
                        _ => {
                            assert_eq!(hdr.id, 3);
                            payload_start
                        }
                    };
                    parser.reset();
                }
            }
        }
        assert_eq!(headers_seen, 3);
    }

    #[test]
    fn need_more_when_truncated() {
        let mut parser = DnodeParser::new();

        // The first half stops right after the payload-length marker.
        let head = b"$2014$ 1 3 0 1 1 *1 d *";
        match parser.step(head) {
            ParseStep::NeedMore { consumed } => assert_eq!(consumed, head.len()),
            other => panic!("unexpected: {other:?}"),
        }

        // The second half supplies the digits and the terminator.
        let tail = b"42\r\n";
        assert_eq!(step_header_done(&mut parser, tail), tail.len());
        assert_eq!(parser.take_dmsg().plen, 42);
    }

    #[test]
    fn parse_error_on_garbage_prefix() {
        let mut parser = DnodeParser::new();
        let outcome = parser.step(b"!nope");
        if let ParseStep::Error { consumed } = outcome {
            assert_eq!(consumed, 0);
        } else {
            panic!("unexpected: {outcome:?}");
        }
    }

    /// Regression for the libfuzzer 1h soak finding 2026-06-02:
    /// a numeric DataLen field that exceeds [`MAX_DATA_LEN`]
    /// must be rejected with `ParseStep::Error` BEFORE the
    /// downstream `Vec::reserve` would convert the wrapped u32
    /// into a multi-gigabyte malloc.
    #[test]
    fn parse_rejects_oversized_data_len() {
        // Eleven ones accumulate to 11_111_111_111, which wraps to
        // 2_521_176_519 (~2.4 GiB) when cast to u32. Before the fix
        // the parser accepted that value and called
        // Vec::reserve(2_521_176_519).
        let wire = b"$2014$ 1 3 0 1 1 *11111111111 ";
        let mut parser = DnodeParser::new();
        let outcome = parser.step(wire);
        assert!(
            matches!(outcome, ParseStep::Error { .. }),
            "expected Error, got {outcome:?}"
        );
    }

    /// Regression for the libfuzzer 1h soak finding 2026-06-02:
    /// the captured 112-byte OOM artifact must drive `step()`
    /// to a clean Error rather than allocating gigabytes.
    #[test]
    fn parse_oom_artifact_2026_06_02() {
        let artifact = include_bytes!("../../../fuzz/seeds/dnode_parse/regression-oom-2026-06-02");
        let mut parser = DnodeParser::new();
        let outcome = parser.step(artifact);
        // Error and HeaderDone are both acceptable; only NeedMore is not.
        assert!(
            !matches!(outcome, ParseStep::NeedMore { .. }),
            "unexpected NeedMore"
        );
    }

    #[test]
    fn writer_round_trip_unencrypted() {
        let pool = MbufPool::default();
        let mut buf = pool.get();
        dmsg_write(&mut buf, 42, DmsgType::Req, 0, true, None, 0).unwrap();
        let wire = buf.readable().to_vec();

        let mut parser = DnodeParser::new();
        assert!(matches!(
            parser.step(&wire),
            ParseStep::HeaderDone { .. }
        ));

        let hdr = parser.take_dmsg();
        assert_eq!(hdr.id, 42);
        assert_eq!(hdr.ty, DmsgType::Req);
        assert_eq!(hdr.flags, 0);
        assert!(hdr.same_dc);
        assert_eq!(hdr.mlen, 1);
        assert_eq!(hdr.data, b"d");
        assert_eq!(hdr.plen, 0);
    }

    #[test]
    fn writer_round_trip_with_aes_payload() {
        let pool = MbufPool::default();
        let mut buf = pool.get();
        let wrapped_key = vec![0xAB; 128];
        dmsg_write(
            &mut buf,
            7,
            DmsgType::CryptoHandshake,
            DMSG_FLAG_ENCRYPTED,
            false,
            Some(&wrapped_key),
            512,
        )
        .unwrap();
        let wire = buf.readable().to_vec();

        let mut parser = DnodeParser::new();
        assert_eq!(step_header_done(&mut parser, &wire), wire.len());

        let hdr = parser.take_dmsg();
        assert_eq!(hdr.id, 7);
        assert_eq!(hdr.ty, DmsgType::CryptoHandshake);
        assert!(hdr.is_encrypted());
        assert!(!hdr.same_dc);
        assert_eq!(hdr.data, wrapped_key);
        assert_eq!(hdr.plen, 512);
    }

    #[test]
    fn dispatcher_classifies_control_plane() {
        // Classify a fresh header whose only non-default field is `ty`.
        let classify = |ty: DmsgType| {
            let mut hdr = Dmsg::new();
            hdr.ty = ty;
            dmsg_process(&hdr)
        };

        // Pin the exact three variants the C `dmsg_process`
        // bypasses.
        for ty in [
            DmsgType::CryptoHandshake,
            DmsgType::GossipSyn,
            DmsgType::GossipSynReply,
        ] {
            assert_eq!(classify(ty), DmsgDispatch::Bypass);
        }

        // Every other gossip variant falls through to the default
        // branch (forward), matching the C switch.
        for ty in [
            DmsgType::GossipAck,
            DmsgType::GossipDigestSyn,
            DmsgType::GossipDigestAck,
            DmsgType::GossipDigestAck2,
            DmsgType::GossipShutdown,
            DmsgType::Req,
            DmsgType::ReqForward,
            DmsgType::Res,
        ] {
            assert_eq!(classify(ty), DmsgDispatch::Forward);
        }

        // HandoffChunk routes to the explicit handoff coordinator
        // and is therefore bypassed alongside the handshake
        // variants.
        assert_eq!(classify(DmsgType::HandoffChunk), DmsgDispatch::Bypass);

        // FT.SEARCH coordinator messages are routed to the
        // dedicated query-fsm coordinator via the same
        // bypass path used by the handoff coordinator.
        for ty in [DmsgType::FtSearchReq, DmsgType::FtSearchRep] {
            assert_eq!(classify(ty), DmsgDispatch::Bypass);
        }
    }
}