Skip to main content

dynomite/proto/
dnode.rs

1//! DNODE wire codec.
2//!
3//! The DNODE protocol frames every Dynomite peer-to-peer message
4//! with a small ASCII header followed by an opaque payload. The
5//! header carries the message id, type tag, encryption/compression
6//! flags, protocol version, same-datacenter bit, an inline data
7//! field (either a one-byte placeholder or an RSA-wrapped AES key),
8//! and the byte length of the payload that follows after `\r\n`.
9//!
10//! The reference engine implements the parser as a single state
11//! machine driven byte-by-byte. This module ports the same machine
12//! into safe Rust and exposes:
13//!
14//! * [`DynParseState`] - the parser's state alphabet.
15//! * [`DmsgType`] - the full set of message-type discriminators.
16//! * [`Dmsg`] - the in-memory header.
17//! * [`DnodeParser`] - the state machine, advanced by feeding bytes
18//!   through [`DnodeParser::step`].
19//! * [`dmsg_write`] / [`dmsg_write_mbuf`] - the canonical encoders.
20//! * [`parse_req`] / [`parse_rsp`] - thin sync wrappers around the
21//!   parser that operate on a [`crate::msg::Msg`]'s mbuf chain.
22//! * [`dmsg_process`] - dispatcher that classifies a parsed
23//!   [`Dmsg`] by type for the cluster layer to act on.
24//!
25//! The encoder accepts an optional `aes_key_payload`: when present,
26//! the caller provides the bytes the inline data field should hold
27//! (the RSA-wrapped AES key produced by [`crate::crypto::Crypto`]).
28//! When absent, the encoder writes the single-byte `'d'` placeholder
29//! the reference engine emits after the first handshake message.
30
// The parser truncates accumulated decimals into the same fixed
// bit widths the reference engine uses on the wire (`u8` for the
// type, flags and version, `u32` for the data and payload
// lengths). The allowance keeps the Rust port faithful to the C
// `(uint8_t)num` and `(uint32_t)num` casts.
// NOTE(review): the state machine in this file does not range-check
// a numeral before casting it, so oversized values wrap silently;
// only the already-truncated type tag is validated (through
// `DmsgType::from_u8`). Confirm whether callers reject such frames.
37#![allow(clippy::cast_possible_truncation)]
38#![allow(clippy::needless_continue)]
39
40use std::net::SocketAddr;
41
42use crate::core::types::MsgId;
43use crate::io::mbuf::{Mbuf, MbufQueue};
44use crate::msg::message::Msg;
45use crate::msg::message::MsgParseResult;
46
/// Magic literal that opens every DNODE header.
///
/// The parser matches it one byte at a time in
/// [`DynParseState::Start`], after skipping any leading spaces.
pub const MAGIC: &[u8] = b"$2014$";

/// Default protocol version emitted by [`dmsg_write`]. Mirrors
/// `VERSION_10` in the reference engine.
///
/// Also the value [`Dmsg::new`] stores in [`Dmsg::version`].
pub const VERSION_10: u8 = 1;

/// CRLF delimiter that separates the DNODE header from its payload.
pub const CRLF: &[u8] = b"\r\n";

/// Single-byte placeholder used by [`dmsg_write`] when no AES key
/// payload accompanies the header.
pub const HANDSHAKE_PLACEHOLDER_DATA: u8 = b'd';

/// Single-byte placeholder used by [`dmsg_write_mbuf`] when no AES
/// key payload accompanies the header. The gossip path emits `'a'`
/// instead of `'d'` to disambiguate the two encoder flavours.
pub const GOSSIP_PLACEHOLDER_DATA: u8 = b'a';
65
/// Parser state transitions.
///
/// Each variant matches a state in the reference engine's
/// `dyn_parse_state_t`. The enum declares no explicit discriminants,
/// so the variants are numbered from zero in declaration order.
/// NOTE(review): the original comment states these values equal the
/// C enum's numeric values; confirm against the reference header
/// before relying on that for parity tooling.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum DynParseState {
    /// Initial state; consumes leading whitespace until the magic
    /// literal is observed.
    #[default]
    Start,
    /// `$2014$` was matched; awaiting the trailing space.
    MagicString,
    /// Reading the decimal message id.
    MsgId,
    /// Reading the decimal message type.
    TypeId,
    /// Reading the decimal flags bit field.
    BitField,
    /// Reading the decimal protocol version.
    Version,
    /// Reading the same-datacenter digit.
    SameDc,
    /// Awaiting the leading `*` before the data length.
    ///
    /// NOTE(review): [`DnodeParser::step`] moves from `SameDc`
    /// straight to `DataLen` and accepts the `*` there, so the
    /// parser in this file never enters this state.
    Star,
    /// Reading the decimal data length.
    DataLen,
    /// Consuming the inline data of `mlen` bytes.
    Data,
    /// Skipping spaces before the payload-length marker.
    SpacesBeforePayloadLen,
    /// Reading the decimal payload length.
    PayloadLen,
    /// Awaiting the LF that terminates the header.
    CrlfBeforeDone,
    /// Header complete; payload position recorded.
    Done,
    /// Header complete and post-handshake decryption applied.
    ///
    /// Not assigned by [`DnodeParser::step`]; when the parser is
    /// already in this state, `step` treats it the same as `Done`.
    PostDone,
    /// Recovery state after the parser hit a malformed byte.
    ///
    /// Not assigned by [`DnodeParser::step`], which reports
    /// malformed bytes through [`ParseStep::Error`] instead.
    Unknown,
}
108
/// DNODE message type identifier.
///
/// The discriminator travels on the wire as a decimal number, so
/// every variant carries the explicit value used by `dmsg_type_t`
/// in the reference engine.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[repr(u8)]
pub enum DmsgType {
    /// Unset / unknown type.
    #[default]
    Unknown = 0,
    /// Diagnostic frame (unused on the live wire; kept for parity).
    Debug = 1,
    /// Parse-error frame (unused on the live wire; kept for parity).
    ParseError = 2,
    /// Datastore request bound for the local DC.
    Req = 3,
    /// Datastore request to be forwarded across DCs.
    ReqForward = 4,
    /// Datastore response.
    Res = 5,
    /// AES key handshake.
    CryptoHandshake = 6,
    /// Gossip SYN.
    GossipSyn = 7,
    /// Gossip SYN reply.
    GossipSynReply = 8,
    /// Gossip ACK.
    GossipAck = 9,
    /// Gossip digest SYN.
    GossipDigestSyn = 10,
    /// Gossip digest ACK.
    GossipDigestAck = 11,
    /// Gossip digest ACK round 2.
    GossipDigestAck2 = 12,
    /// Gossip shutdown notice.
    GossipShutdown = 13,
    /// Explicit handoff chunk frame.
    ///
    /// Carries one chunk of a token-range handoff stream from the
    /// previous owner of the range to the new owner. Distinct from
    /// the AAE exchange variants so the receiver can route handoff
    /// frames to the dedicated handoff coordinator without parsing
    /// the payload first.
    HandoffChunk = 14,
    /// Cluster-wide RediSearch FT.SEARCH request frame.
    ///
    /// Sent by the FT.SEARCH coordinator on the node that
    /// received the client request to every primary peer
    /// covering the index's key range. The payload encodes a
    /// broadcast request (table name, serialised query body,
    /// top-K) - see the `dynomite-search` crate's
    /// `query_fsm::BroadcastRequest`. Routed by the dispatcher
    /// to the dedicated FT.SEARCH coordinator FSM instead of
    /// the data-plane stack so the per-peer query runs against
    /// the local registry rather than being re-forwarded.
    FtSearchReq = 15,
    /// Cluster-wide RediSearch FT.SEARCH reply frame.
    ///
    /// Returned by every peer that received a [`Self::FtSearchReq`]
    /// once its local search completed (or the per-peer
    /// deadline elapsed). The payload encodes the per-peer
    /// top-K hit list plus a `timed_out` flag the coordinator
    /// uses to mark partial results.
    FtSearchRep = 16,
}

impl DmsgType {
    /// Look up the type that corresponds to an on-the-wire integer.
    ///
    /// Returns `None` for any value that no variant declares.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::proto::dnode::DmsgType;
    /// assert_eq!(DmsgType::from_u8(3), Some(DmsgType::Req));
    /// assert_eq!(DmsgType::from_u8(99), None);
    /// ```
    #[must_use]
    pub fn from_u8(v: u8) -> Option<Self> {
        // The discriminants form the dense range 0..=16, so a table
        // indexed by the wire value doubles as the range check.
        const BY_WIRE_VALUE: [DmsgType; 17] = [
            DmsgType::Unknown,
            DmsgType::Debug,
            DmsgType::ParseError,
            DmsgType::Req,
            DmsgType::ReqForward,
            DmsgType::Res,
            DmsgType::CryptoHandshake,
            DmsgType::GossipSyn,
            DmsgType::GossipSynReply,
            DmsgType::GossipAck,
            DmsgType::GossipDigestSyn,
            DmsgType::GossipDigestAck,
            DmsgType::GossipDigestAck2,
            DmsgType::GossipShutdown,
            DmsgType::HandoffChunk,
            DmsgType::FtSearchReq,
            DmsgType::FtSearchRep,
        ];
        BY_WIRE_VALUE.get(usize::from(v)).copied()
    }

    /// The integer this type is written as on the wire.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::proto::dnode::DmsgType;
    /// assert_eq!(DmsgType::CryptoHandshake.as_u8(), 6);
    /// ```
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }
}
222
/// Encryption bit in [`Dmsg::flags`]; tested by
/// [`Dmsg::is_encrypted`].
pub const DMSG_FLAG_ENCRYPTED: u8 = 0x1;

/// Compression bit in [`Dmsg::flags`]; tested by
/// [`Dmsg::is_compressed`].
pub const DMSG_FLAG_COMPRESSED: u8 = 0x2;
228
229/// Parsed DNODE header.
230///
231/// `data` and `payload` hold copies of the on-the-wire bytes. The
232/// encoder side fills both before emitting; the parser fills them as
233/// it advances through the state machine.
234#[derive(Clone, Debug, Default, Eq, PartialEq)]
235pub struct Dmsg {
236    /// Message id.
237    pub id: MsgId,
238    /// Message type.
239    pub ty: DmsgType,
240    /// Flag bit field; encryption is bit 0, compression is bit 1.
241    pub flags: u8,
242    /// Protocol version.
243    pub version: u8,
244    /// True when sender and receiver share a datacenter.
245    pub same_dc: bool,
246    /// Source address recorded by the recv path. Stage 7 leaves it
247    /// `None`; Stage 9 stamps it from the connection state.
248    pub source_address: Option<SocketAddr>,
249    /// Length (in bytes) of the inline data field.
250    pub mlen: u32,
251    /// Inline data: either the single-byte placeholder or the
252    /// RSA-wrapped AES key during the crypto handshake.
253    pub data: Vec<u8>,
254    /// Length (in bytes) of the trailing payload framed by the
255    /// header.
256    pub plen: u32,
257    /// Payload bytes, if collected by the parser.
258    pub payload: Vec<u8>,
259}
260
261impl Dmsg {
262    /// Construct an empty `Dmsg` defaulted the same way the
263    /// reference engine's `dmsg_get` initialises a fresh slot.
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use dynomite::proto::dnode::{Dmsg, DmsgType, VERSION_10};
269    /// let d = Dmsg::new();
270    /// assert_eq!(d.ty, DmsgType::Unknown);
271    /// assert_eq!(d.version, VERSION_10);
272    /// assert!(d.same_dc);
273    /// ```
274    #[must_use]
275    pub fn new() -> Self {
276        Self {
277            id: 0,
278            ty: DmsgType::Unknown,
279            flags: 0,
280            version: VERSION_10,
281            same_dc: true,
282            source_address: None,
283            mlen: 0,
284            data: Vec::new(),
285            plen: 0,
286            payload: Vec::new(),
287        }
288    }
289
290    /// True when the encryption flag is set.
291    ///
292    /// # Examples
293    ///
294    /// ```
295    /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_ENCRYPTED};
296    /// let mut d = Dmsg::new();
297    /// d.flags = DMSG_FLAG_ENCRYPTED;
298    /// assert!(d.is_encrypted());
299    /// ```
300    #[must_use]
301    pub fn is_encrypted(&self) -> bool {
302        self.flags & DMSG_FLAG_ENCRYPTED != 0
303    }
304
305    /// True when the compression flag is set.
306    ///
307    /// # Examples
308    ///
309    /// ```
310    /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_COMPRESSED};
311    /// let mut d = Dmsg::new();
312    /// d.flags = DMSG_FLAG_COMPRESSED;
313    /// assert!(d.is_compressed());
314    /// ```
315    #[must_use]
316    pub fn is_compressed(&self) -> bool {
317        self.flags & DMSG_FLAG_COMPRESSED != 0
318    }
319}
320
/// Result of a single [`DnodeParser::step`] invocation.
///
/// Every variant carries `consumed`, an offset into the slice that
/// was passed to that `step` call (not into the overall stream).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ParseStep {
    /// More bytes are required to advance the state machine. The
    /// `consumed` field records how many of the input bytes the
    /// parser already absorbed.
    NeedMore {
        /// Number of input bytes the parser absorbed before it
        /// stopped waiting for more.
        consumed: usize,
    },
    /// The header (up to and including the trailing LF) has been
    /// parsed. The `consumed` field records the offset just past
    /// the LF, so the caller can read the payload starting at that
    /// index.
    HeaderDone {
        /// Offset just past the trailing LF.
        consumed: usize,
    },
    /// The parser hit an unrecoverable bad byte. The caller should
    /// drop the buffer (or split it at `consumed`) and reset.
    Error {
        /// Offset of the byte that triggered the error.
        consumed: usize,
    },
}
347
/// Errors that can be raised when encoding or parsing a DNODE
/// header without going through the streaming state machine.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep
/// a wildcard arm when matching on it.
/// NOTE(review): in the portion of the file reviewed, only
/// [`DnodeError::OutOfSpace`] is constructed (by the encoders);
/// confirm where the remaining variants are produced.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DnodeError {
    /// Buffer too small to encode the header.
    OutOfSpace,
    /// Header does not begin with the magic literal.
    BadMagic,
    /// Numeric field could not be parsed.
    BadNumber,
    /// Trailing CRLF missing.
    MissingCrlf,
    /// Type discriminator out of range.
    BadType,
    /// Inline data shorter than the declared `mlen`.
    TruncatedData,
}
366
/// Streaming DNODE header parser.
///
/// Holds everything [`DnodeParser::step`] needs to resume parsing
/// when a header arrives split across several input slices.
#[derive(Debug)]
pub struct DnodeParser {
    /// State the next input byte will be interpreted in.
    state: DynParseState,
    /// Accumulator for the decimal field currently being read.
    num: u64,
    /// Header being filled in as fields complete.
    dmsg: Dmsg,
    /// Inline-data bytes still to be copied into `dmsg.data`.
    data_remaining: u32,
    /// Number of [`MAGIC`] bytes matched so far in the start state.
    magic_progress: u8,
    /// Whether the previous byte was an ASCII digit. The header
    /// state machine only transitions out of the numeric header
    /// fields (MSG_ID, TYPE_ID, BIT_FIELD, VERSION, SAME_DC) when
    /// the byte immediately preceding the field-terminating space
    /// was a digit; the parser reproduces this guard so extra
    /// whitespace (or any other non-digit byte) is rejected with
    /// the wire protocol's strictness.
    prev_was_digit: bool,
}
384
385impl DnodeParser {
386    /// Build a fresh parser positioned at [`DynParseState::Start`].
387    ///
388    /// # Examples
389    ///
390    /// ```
391    /// use dynomite::proto::dnode::{DnodeParser, DynParseState};
392    /// let p = DnodeParser::new();
393    /// assert_eq!(p.state(), DynParseState::Start);
394    /// ```
395    #[must_use]
396    pub fn new() -> Self {
397        Self {
398            state: DynParseState::Start,
399            num: 0,
400            dmsg: Dmsg::new(),
401            data_remaining: 0,
402            magic_progress: 0,
403            prev_was_digit: false,
404        }
405    }
406
407    /// Reset the parser to [`DynParseState::Start`] with a fresh
408    /// accumulator [`Dmsg`].
409    pub fn reset(&mut self) {
410        *self = Self::new();
411    }
412
413    /// Current state.
414    #[must_use]
415    pub fn state(&self) -> DynParseState {
416        self.state
417    }
418
419    /// Borrow the partial [`Dmsg`].
420    #[must_use]
421    pub fn dmsg(&self) -> &Dmsg {
422        &self.dmsg
423    }
424
425    /// Move the parsed [`Dmsg`] out of the parser. Only meaningful
426    /// after a [`ParseStep::HeaderDone`] step.
427    pub fn take_dmsg(&mut self) -> Dmsg {
428        let mut out = Dmsg::new();
429        std::mem::swap(&mut out, &mut self.dmsg);
430        self.state = DynParseState::Start;
431        self.num = 0;
432        self.data_remaining = 0;
433        self.magic_progress = 0;
434        self.prev_was_digit = false;
435        out
436    }
437
438    /// Feed `input` to the parser. The parser advances as far as it
439    /// can and returns one of the three [`ParseStep`] variants.
440    ///
441    /// The state machine is byte-driven and can be reentered with a
442    /// fresh slice when [`ParseStep::NeedMore`] indicates the input
443    /// was truncated mid-header.
444    ///
445    /// # Examples
446    ///
447    /// ```
448    /// use dynomite::proto::dnode::{DnodeParser, ParseStep};
449    /// let mut p = DnodeParser::new();
450    /// let bytes = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
451    /// match p.step(bytes) {
452    ///     ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
453    ///     other => panic!("unexpected: {other:?}"),
454    /// }
455    /// ```
456    /// The state machine intentionally stays in one function to
457    /// match the reference engine's single-block parser; splitting
458    /// the per-state arms across helpers would obscure the parity.
459    #[allow(clippy::too_many_lines)]
460    pub fn step(&mut self, input: &[u8]) -> ParseStep {
461        let mut idx = 0usize;
462        while idx < input.len() {
463            let ch = input[idx];
464            match self.state {
465                DynParseState::Start => {
466                    // Phase 1: skip leading whitespace, identical
467                    // to the C engine's `while (ch == ' ')` arm.
468                    if self.magic_progress == 0 {
469                        if ch == b' ' {
470                            idx += 1;
471                            continue;
472                        }
473                        if ch != b'$' {
474                            return ParseStep::Error { consumed: idx };
475                        }
476                    }
477                    // Phase 2: byte-incrementally match the magic
478                    // literal so split inputs are tolerated.
479                    let want = MAGIC[usize::from(self.magic_progress)];
480                    if ch != want {
481                        return ParseStep::Error { consumed: idx };
482                    }
483                    self.magic_progress += 1;
484                    idx += 1;
485                    if usize::from(self.magic_progress) == MAGIC.len() {
486                        self.state = DynParseState::MagicString;
487                        self.magic_progress = 0;
488                    }
489                    continue;
490                }
491                DynParseState::MagicString => {
492                    if ch == b' ' {
493                        self.state = DynParseState::MsgId;
494                        self.num = 0;
495                        idx += 1;
496                        continue;
497                    }
498                    return ParseStep::Error { consumed: idx };
499                }
500                DynParseState::MsgId => {
501                    // DYN_MSG_ID state: digits accumulate, a single
502                    // space terminates the field but only when the
503                    // byte immediately
504                    // before it was a digit. Anything else is
505                    // rejected (the C engine resets to DYN_START
506                    // and lets the recovery path retry; the Rust
507                    // streaming parser surfaces an error so the
508                    // caller can drop the buffer).
509                    if ch.is_ascii_digit() {
510                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
511                        self.prev_was_digit = true;
512                        idx += 1;
513                        continue;
514                    }
515                    if ch == b' ' && self.prev_was_digit {
516                        self.dmsg.id = self.num;
517                        self.state = DynParseState::TypeId;
518                        self.num = 0;
519                        self.prev_was_digit = false;
520                        idx += 1;
521                        continue;
522                    }
523                    return ParseStep::Error { consumed: idx };
524                }
525                DynParseState::TypeId => {
526                    if ch.is_ascii_digit() {
527                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
528                        self.prev_was_digit = true;
529                        idx += 1;
530                        continue;
531                    }
532                    if ch == b' ' && self.prev_was_digit {
533                        self.dmsg.ty = match DmsgType::from_u8(self.num as u8) {
534                            Some(t) => t,
535                            None => return ParseStep::Error { consumed: idx },
536                        };
537                        self.state = DynParseState::BitField;
538                        self.num = 0;
539                        self.prev_was_digit = false;
540                        idx += 1;
541                        continue;
542                    }
543                    return ParseStep::Error { consumed: idx };
544                }
545                DynParseState::BitField => {
546                    if ch.is_ascii_digit() {
547                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
548                        self.prev_was_digit = true;
549                        idx += 1;
550                        continue;
551                    }
552                    if ch == b' ' && self.prev_was_digit {
553                        self.dmsg.flags = (self.num as u8) & 0xF;
554                        self.state = DynParseState::Version;
555                        self.num = 0;
556                        self.prev_was_digit = false;
557                        idx += 1;
558                        continue;
559                    }
560                    return ParseStep::Error { consumed: idx };
561                }
562                DynParseState::Version => {
563                    if ch.is_ascii_digit() {
564                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
565                        self.prev_was_digit = true;
566                        idx += 1;
567                        continue;
568                    }
569                    if ch == b' ' && self.prev_was_digit {
570                        self.dmsg.version = self.num as u8;
571                        self.state = DynParseState::SameDc;
572                        self.num = 0;
573                        self.prev_was_digit = false;
574                        idx += 1;
575                        continue;
576                    }
577                    return ParseStep::Error { consumed: idx };
578                }
579                DynParseState::SameDc => {
580                    if ch.is_ascii_digit() {
581                        self.dmsg.same_dc = ch != b'0';
582                        self.prev_was_digit = true;
583                        idx += 1;
584                        continue;
585                    }
586                    if ch == b' ' && self.prev_was_digit {
587                        self.state = DynParseState::DataLen;
588                        self.num = 0;
589                        self.prev_was_digit = false;
590                        idx += 1;
591                        continue;
592                    }
593                    return ParseStep::Error { consumed: idx };
594                }
595                DynParseState::Star | DynParseState::DataLen => {
596                    if ch == b'*' {
597                        idx += 1;
598                        continue;
599                    }
600                    if ch.is_ascii_digit() {
601                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
602                        idx += 1;
603                        continue;
604                    }
605                    if ch == b' ' && self.state == DynParseState::DataLen {
606                        self.dmsg.mlen = self.num as u32;
607                        self.data_remaining = self.dmsg.mlen;
608                        self.dmsg.data.clear();
609                        self.dmsg.data.reserve(self.data_remaining as usize);
610                        self.state = DynParseState::Data;
611                        self.num = 0;
612                        idx += 1;
613                        continue;
614                    }
615                    return ParseStep::Error { consumed: idx };
616                }
617                DynParseState::Data => {
618                    if self.data_remaining == 0 {
619                        self.state = DynParseState::SpacesBeforePayloadLen;
620                        continue;
621                    }
622                    let take = std::cmp::min(self.data_remaining as usize, input.len() - idx);
623                    self.dmsg.data.extend_from_slice(&input[idx..idx + take]);
624                    self.data_remaining -= take as u32;
625                    idx += take;
626                    if self.data_remaining == 0 {
627                        self.state = DynParseState::SpacesBeforePayloadLen;
628                    }
629                    continue;
630                }
631                DynParseState::SpacesBeforePayloadLen => {
632                    if ch == b' ' {
633                        idx += 1;
634                        continue;
635                    }
636                    if ch == b'*' {
637                        self.state = DynParseState::PayloadLen;
638                        self.num = 0;
639                        idx += 1;
640                        continue;
641                    }
642                    return ParseStep::Error { consumed: idx };
643                }
644                DynParseState::PayloadLen => {
645                    if ch.is_ascii_digit() {
646                        self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
647                        idx += 1;
648                        continue;
649                    }
650                    if ch == b'\r' {
651                        self.dmsg.plen = self.num as u32;
652                        self.state = DynParseState::CrlfBeforeDone;
653                        self.num = 0;
654                        idx += 1;
655                        continue;
656                    }
657                    return ParseStep::Error { consumed: idx };
658                }
659                DynParseState::CrlfBeforeDone => {
660                    if ch == b'\n' {
661                        self.state = DynParseState::Done;
662                        idx += 1;
663                        return ParseStep::HeaderDone { consumed: idx };
664                    }
665                    return ParseStep::Error { consumed: idx };
666                }
667                DynParseState::Done | DynParseState::PostDone | DynParseState::Unknown => {
668                    return ParseStep::HeaderDone { consumed: idx };
669                }
670            }
671        }
672        ParseStep::NeedMore { consumed: idx }
673    }
674}
675
676impl Default for DnodeParser {
677    fn default() -> Self {
678        Self::new()
679    }
680}
681
682/// Encode a DNODE header into the writable region of `mbuf`.
683///
684/// `aes_key_payload`, when `Some`, is written as the inline data
685/// field; this is how the crypto handshake transports the
686/// RSA-wrapped AES key. When `None`, a single-byte `'d'` placeholder
687/// is emitted.
688///
689/// `flags` is taken verbatim (the encryption bit must be set by the
690/// caller, alongside any compression bit).
691///
692/// The encoder writes the entire header as a single contiguous
693/// region; if `mbuf` lacks the necessary capacity,
694/// [`DnodeError::OutOfSpace`] is returned.
695///
696/// # Examples
697///
698/// ```
699/// use dynomite::io::mbuf::MbufPool;
700/// use dynomite::proto::dnode::{dmsg_write, DmsgType};
701///
702/// let pool = MbufPool::default();
703/// let mut buf = pool.get();
704/// dmsg_write(
705///     &mut buf,
706///     /* msg_id */ 1,
707///     DmsgType::Req,
708///     /* flags */ 0,
709///     /* same_dc */ true,
710///     /* aes_key_payload */ None,
711///     /* plen */ 0,
712/// )
713/// .unwrap();
714/// assert!(buf.readable().starts_with(b"   $2014$ 1 3 0"));
715/// ```
716pub fn dmsg_write(
717    mbuf: &mut Mbuf,
718    msg_id: MsgId,
719    ty: DmsgType,
720    flags: u8,
721    same_dc: bool,
722    aes_key_payload: Option<&[u8]>,
723    plen: u32,
724) -> Result<(), DnodeError> {
725    let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, false);
726    write_chain(mbuf, &header)
727}
728
729/// Encode a gossip-flavored DNODE header.
730///
731/// Mirrors the reference engine's `dmsg_write_mbuf`, which differs
732/// from [`dmsg_write`] only in the placeholder byte emitted when no
733/// AES key payload accompanies the header (`'a'` instead of `'d'`).
734///
735/// # Examples
736///
737/// ```
738/// use dynomite::io::mbuf::MbufPool;
739/// use dynomite::proto::dnode::{dmsg_write_mbuf, DmsgType};
740///
741/// let pool = MbufPool::default();
742/// let mut buf = pool.get();
743/// dmsg_write_mbuf(
744///     &mut buf,
745///     /* msg_id */ 5,
746///     DmsgType::GossipSyn,
747///     /* flags */ 0,
748///     /* same_dc */ true,
749///     /* aes_key_payload */ None,
750///     /* plen */ 64,
751/// )
752/// .unwrap();
753/// assert!(buf.readable().contains(&b'a'));
754/// ```
755pub fn dmsg_write_mbuf(
756    mbuf: &mut Mbuf,
757    msg_id: MsgId,
758    ty: DmsgType,
759    flags: u8,
760    same_dc: bool,
761    aes_key_payload: Option<&[u8]>,
762    plen: u32,
763) -> Result<(), DnodeError> {
764    let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, true);
765    write_chain(mbuf, &header)
766}
767
768fn build_header(
769    msg_id: MsgId,
770    ty: DmsgType,
771    flags: u8,
772    same_dc: bool,
773    aes_key_payload: Option<&[u8]>,
774    plen: u32,
775    gossip_placeholder: bool,
776) -> Vec<u8> {
777    use std::io::Write as _;
778    let mut buf: Vec<u8> = Vec::with_capacity(64);
779    // Three leading spaces are part of the magic literal as written
780    // on the wire; the parser tolerates and skips them in DYN_START.
781    buf.extend_from_slice(b"   $2014$ ");
782    let _ = write!(buf, "{msg_id}");
783    buf.push(b' ');
784    let _ = write!(buf, "{}", ty.as_u8());
785    buf.push(b' ');
786    let _ = write!(buf, "{}", flags & 0xF);
787    buf.push(b' ');
788    let _ = write!(buf, "{VERSION_10}");
789    buf.push(b' ');
790    buf.push(if same_dc { b'1' } else { b'0' });
791    buf.push(b' ');
792    buf.push(b'*');
793    if let Some(payload) = aes_key_payload {
794        let _ = write!(buf, "{}", payload.len());
795        buf.push(b' ');
796        buf.extend_from_slice(payload);
797    } else {
798        buf.extend_from_slice(b"1 ");
799        buf.push(if gossip_placeholder {
800            GOSSIP_PLACEHOLDER_DATA
801        } else {
802            HANDSHAKE_PLACEHOLDER_DATA
803        });
804    }
805    buf.push(b' ');
806    buf.push(b'*');
807    let _ = write!(buf, "{plen}");
808    buf.extend_from_slice(CRLF);
809    buf
810}
811
812fn write_chain(mbuf: &mut Mbuf, payload: &[u8]) -> Result<(), DnodeError> {
813    if mbuf.remaining() < payload.len() {
814        return Err(DnodeError::OutOfSpace);
815    }
816    let n = mbuf.recv(payload);
817    debug_assert_eq!(n, payload.len());
818    Ok(())
819}
820
/// Sync byte parser that drives a request message's DNODE header
/// state machine.
///
/// The parser walks the contiguous bytes spanning the message's
/// mbuf chain and updates the [`Msg`] in place. On a fully parsed
/// header, the function attaches the [`Dmsg`] to the message, moves
/// its parse state to [`DynParseState::Done`], and returns
/// `MsgParseResult::Ok`. On truncated input the parser stores the
/// state it stopped in and returns `MsgParseResult::Again`. On
/// invalid bytes the parser resets the state to
/// [`DynParseState::Unknown`], records `MsgParseResult::Error`, and
/// surfaces the same value.
///
/// In every case the returned value is also recorded on `msg` as its
/// parse result.
///
/// The async wrapping (per-connection task scheduling, decryption
/// hand-off when the encryption bit is set) ships in Stage 9.
///
/// # Examples
///
/// ```
/// use dynomite::io::mbuf::MbufPool;
/// use dynomite::msg::{Msg, MsgType};
/// use dynomite::proto::dnode::{parse_req, DmsgType, DynParseState};
///
/// let pool = MbufPool::default();
/// let mut msg = Msg::new(0, MsgType::Unknown, true);
/// let mut mb = pool.get();
/// mb.recv(b"$2014$ 1 3 0 1 1 *1 d *0\r\n");
/// msg.mbufs_mut().push_back(mb);
/// msg.recompute_mlen();
/// let result = parse_req(&mut msg);
/// assert_eq!(msg.dyn_parse_state(), DynParseState::Done);
/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Req);
/// drop(result);
/// ```
pub fn parse_req(msg: &mut Msg) -> MsgParseResult {
    // `false` marks the request flavour of the shared driver.
    parse_msg(msg, false)
}
855
/// Sync byte parser counterpart to [`parse_req`] for response
/// messages.
///
/// Delegates to the same driver as [`parse_req`], so the outcomes
/// are identical: `MsgParseResult::Ok` with the [`Dmsg`] attached on
/// a complete header, `MsgParseResult::Again` on truncated input,
/// and `MsgParseResult::Error` on invalid bytes. The driver does not
/// currently consult the request/response flag, so the header
/// grammar accepted here is the same as for requests.
///
/// # Examples
///
/// ```
/// use dynomite::io::mbuf::MbufPool;
/// use dynomite::msg::{Msg, MsgType};
/// use dynomite::proto::dnode::{parse_rsp, DmsgType};
///
/// let pool = MbufPool::default();
/// let mut msg = Msg::new(0, MsgType::Unknown, false);
/// let mut mb = pool.get();
/// mb.recv(b"$2014$ 9 5 0 1 1 *1 d *0\r\n");
/// msg.mbufs_mut().push_back(mb);
/// msg.recompute_mlen();
/// let _ = parse_rsp(&mut msg);
/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Res);
/// ```
pub fn parse_rsp(msg: &mut Msg) -> MsgParseResult {
    // `true` marks the response flavour of the shared driver.
    parse_msg(msg, true)
}
878
879fn parse_msg(msg: &mut Msg, _is_response: bool) -> MsgParseResult {
880    // Flatten the chain into a single buffer for parsing. The
881    // reference engine walks the chain byte by byte and tolerates
882    // splits at arbitrary boundaries; the Rust port drives the same
883    // state machine over a contiguous slice. Stage 9 will replace
884    // this with a streaming feed when the connection FSM lands.
885    let mut bytes: Vec<u8> = Vec::with_capacity(msg.mbufs().total_len());
886    for mbuf in msg.mbufs() {
887        bytes.extend_from_slice(mbuf.readable());
888    }
889
890    let mut parser = DnodeParser::new();
891    parser.state = msg.dyn_parse_state();
892    match parser.step(&bytes) {
893        ParseStep::HeaderDone { .. } => {
894            let dmsg = parser.take_dmsg();
895            msg.set_dyn_parse_state(DynParseState::Done);
896            msg.set_dmsg(dmsg);
897            msg.set_parse_result(MsgParseResult::Ok);
898            MsgParseResult::Ok
899        }
900        ParseStep::NeedMore { .. } => {
901            msg.set_dyn_parse_state(parser.state);
902            msg.set_parse_result(MsgParseResult::Again);
903            MsgParseResult::Again
904        }
905        ParseStep::Error { .. } => {
906            msg.set_dyn_parse_state(DynParseState::Unknown);
907            msg.set_parse_result(MsgParseResult::Error);
908            MsgParseResult::Error
909        }
910    }
911}
912
/// Outcome of [`dmsg_process`].
///
/// `Bypass` means the header has been recognised as control traffic
/// and the cluster layer should not pass the message further down
/// the protocol stack. `Forward` is the complement: it is returned
/// for every message type that [`dmsg_process`] does not single out
/// for bypass.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum DmsgDispatch {
    /// Frame consumed by a control-plane handler.
    Bypass,
    /// Frame should continue through the data-plane stack.
    Forward,
}
925
926/// Stage 7 dispatcher: classify a parsed [`Dmsg`] as
927/// control-plane traffic the cluster layer should consume directly,
928/// or data-plane traffic that should continue through the protocol
929/// stack.
930///
931/// Stage 10 will extend this with the gossip-message decoders. For
932/// now, only the message-shape side of the dispatch is in place.
933///
934/// # Examples
935///
936/// ```
937/// use dynomite::proto::dnode::{dmsg_process, Dmsg, DmsgDispatch, DmsgType};
938///
939/// let mut d = Dmsg::new();
940/// d.ty = DmsgType::CryptoHandshake;
941/// assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
942///
943/// // Gossip variants other than SYN / SYN_REPLY fall through.
944/// d.ty = DmsgType::GossipShutdown;
945/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
946///
947/// d.ty = DmsgType::Req;
948/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
949/// ```
950#[must_use]
951pub fn dmsg_process(dmsg: &Dmsg) -> DmsgDispatch {
952    // Dmsg dispatch table: only CRYPTO_HANDSHAKE,
953    // GOSSIP_SYN, and GOSSIP_SYN_REPLY short-circuit; the other
954    // gossip variants (ACK, DIGEST_SYN, DIGEST_ACK, DIGEST_ACK2,
955    // SHUTDOWN) fall through to the default branch and are
956    // forwarded to the cluster handlers (which Stage 10 will wire
957    // up). HANDOFF_CHUNK frames are control-plane traffic for
958    // the explicit handoff coordinator and bypass the data-plane
959    // stack alongside the crypto / gossip handshake variants.
960    match dmsg.ty {
961        DmsgType::CryptoHandshake
962        | DmsgType::GossipSyn
963        | DmsgType::GossipSynReply
964        | DmsgType::HandoffChunk
965        | DmsgType::FtSearchReq
966        | DmsgType::FtSearchRep => DmsgDispatch::Bypass,
967        _ => DmsgDispatch::Forward,
968    }
969}
970
971/// Drain `chain` into a contiguous `Vec<u8>` recycling each chunk
972/// back to `pool`. Useful for tests and for the Stage 9 path that
973/// needs a flat buffer of decrypted payload bytes.
974pub fn flatten_chain(chain: &mut MbufQueue) -> Vec<u8> {
975    let mut out = Vec::with_capacity(chain.total_len());
976    while let Some(buf) = chain.pop_front() {
977        out.extend_from_slice(buf.readable());
978    }
979    out
980}
981
/// Peer-handshake control payload exchanged on top of a
/// [`DmsgType::GossipSyn`] frame.
///
/// Today the handshake carries the cluster-wide capability
/// advertisement (see [`crate::cluster::capability`]). Future
/// fields will be appended as new typed records; older peers
/// ignore unknown trailing bytes.
///
/// # Wire format
///
/// ```text
/// magic(4) = "DHS1"
/// flags(2) = 0
/// CapabilityAd (length-prefixed, see
///                `CapabilityAd::encode` for the exact layout)
/// ```
///
/// All multi-byte integers are little-endian. The encoding uses
/// only the standard library; no external codec is pulled in.
/// The flags field is reserved: [`Handshake::encode`] always writes
/// zero and [`Handshake::decode`] rejects any other value.
///
/// # Examples
///
/// ```
/// use dynomite::cluster::capability::{CapabilityAd, CapabilityAdEntry};
/// use dynomite::proto::dnode::Handshake;
/// let ad = CapabilityAd::from_entries(vec![
///     CapabilityAdEntry::new("framing".into(), vec![vec![1, 0, 0, 0]]),
/// ]);
/// let hs = Handshake::new(ad.clone());
/// let bytes = hs.encode();
/// let back = Handshake::decode(&bytes).unwrap();
/// assert_eq!(back.capabilities(), &ad);
/// ```
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Handshake {
    /// Capability advertisement carried by this handshake.
    capabilities: crate::cluster::capability::CapabilityAd,
}
1019
1020impl Handshake {
1021    /// Magic literal that opens every handshake payload.
1022    pub const MAGIC: [u8; 4] = *b"DHS1";
1023
1024    /// Build a handshake carrying `capabilities`.
1025    #[must_use]
1026    pub fn new(capabilities: crate::cluster::capability::CapabilityAd) -> Self {
1027        Self { capabilities }
1028    }
1029
1030    /// Borrow the embedded capability advertisement.
1031    #[must_use]
1032    pub fn capabilities(&self) -> &crate::cluster::capability::CapabilityAd {
1033        &self.capabilities
1034    }
1035
1036    /// Consume the handshake and return the embedded
1037    /// advertisement.
1038    #[must_use]
1039    pub fn into_capabilities(self) -> crate::cluster::capability::CapabilityAd {
1040        self.capabilities
1041    }
1042
1043    /// Serialise the handshake to a length-prefixed byte
1044    /// stream.
1045    #[must_use]
1046    pub fn encode(&self) -> Vec<u8> {
1047        let cap_bytes = self.capabilities.encode();
1048        let mut out = Vec::with_capacity(Self::MAGIC.len() + 2 + cap_bytes.len());
1049        out.extend_from_slice(&Self::MAGIC);
1050        out.extend_from_slice(&0u16.to_le_bytes()); // flags
1051        out.extend_from_slice(&cap_bytes);
1052        out
1053    }
1054
1055    /// Inverse of [`Handshake::encode`]. Surfaces a typed error
1056    /// when the magic / version is wrong or the embedded
1057    /// advertisement is malformed.
1058    pub fn decode(bytes: &[u8]) -> Result<Self, crate::cluster::capability::CapabilityCodecError> {
1059        use crate::cluster::capability::CapabilityCodecError;
1060        if bytes.len() < Self::MAGIC.len() + 2 {
1061            return Err(CapabilityCodecError::Truncated);
1062        }
1063        if bytes[..Self::MAGIC.len()] != Self::MAGIC {
1064            return Err(CapabilityCodecError::BadMagic);
1065        }
1066        // Flags are reserved; the only currently legal value is
1067        // zero. Any non-zero value is reserved for future use
1068        // and rejected here so older builds fail closed.
1069        let flags_off = Self::MAGIC.len();
1070        let flags = u16::from_le_bytes([bytes[flags_off], bytes[flags_off + 1]]);
1071        if flags != 0 {
1072            return Err(CapabilityCodecError::BadMagic);
1073        }
1074        let cap_bytes = &bytes[flags_off + 2..];
1075        let capabilities = crate::cluster::capability::CapabilityAd::decode(cap_bytes)?;
1076        Ok(Self { capabilities })
1077    }
1078
1079    /// Number of bytes the handshake's fixed-size prefix
1080    /// occupies before the embedded advertisement. Useful in
1081    /// tests that assert the on-the-wire delta.
1082    #[must_use]
1083    pub const fn header_len() -> usize {
1084        Self::MAGIC.len() + 2
1085    }
1086}
1087
// Unit tests for the header parser, the header writer, and the
// dispatch classifier.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::mbuf::MbufPool;

    // A complete header with the one-byte placeholder and zero
    // payload length is consumed in full and every field is decoded.
    #[test]
    fn parse_simple_req() {
        let mut p = DnodeParser::new();
        let bytes = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
        match p.step(bytes) {
            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
            other => panic!("unexpected: {other:?}"),
        }
        let d = p.take_dmsg();
        assert_eq!(d.id, 1);
        assert_eq!(d.ty, DmsgType::Req);
        assert_eq!(d.flags, 0);
        assert_eq!(d.version, 1);
        assert!(d.same_dc);
        assert_eq!(d.mlen, 1);
        assert_eq!(d.data, b"d");
        assert_eq!(d.plen, 0);
    }

    // A multi-digit payload length is accumulated correctly.
    #[test]
    fn parse_payload_len() {
        let mut p = DnodeParser::new();
        let bytes = b"$2014$ 2 3 0 1 1 *1 d *413\r\n";
        match p.step(bytes) {
            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
            other => panic!("unexpected: {other:?}"),
        }
        assert_eq!(p.dmsg().plen, 413);
    }

    // Three headers in one buffer, separated by foreign bytes and a
    // payload, are each found when the caller skips the bytes in
    // between and resets the parser after every header.
    #[test]
    fn parse_three_back_to_back() {
        let mut input: Vec<u8> = Vec::new();
        input.extend_from_slice(b"$2014$ 1 3 0 1 1 *1 d *0\r\n");
        input.extend_from_slice(b"some redis bytes here ignored");
        input.extend_from_slice(b"$2014$ 2 3 0 1 1 *1 d *3\r\nABC");
        input.extend_from_slice(b"$2014$ 3 3 0 1 1 *1 d *0\r\n");
        let mut p = DnodeParser::new();
        let mut idx = 0;
        let mut count = 0;
        while idx < input.len() {
            match p.step(&input[idx..]) {
                ParseStep::HeaderDone { consumed } => {
                    let d = p.take_dmsg();
                    count += 1;
                    let after_header = idx + consumed;
                    if count == 1 {
                        assert_eq!(d.id, 1);
                        // skip past the redis bytes by scanning for the next '$'
                        idx = input[after_header..]
                            .iter()
                            .position(|&b| b == b'$')
                            .map_or(input.len(), |n| after_header + n);
                    } else if count == 2 {
                        assert_eq!(d.id, 2);
                        assert_eq!(d.plen, 3);
                        // Step over the payload announced by the header.
                        idx = after_header + d.plen as usize;
                    } else {
                        assert_eq!(d.id, 3);
                        idx = after_header;
                    }
                    p.reset();
                }
                ParseStep::NeedMore { .. } => {
                    break;
                }
                ParseStep::Error { consumed } => {
                    // Always advance by at least one byte so the loop
                    // cannot stall on an error that consumed nothing.
                    idx += consumed.max(1);
                    p.reset();
                }
            }
        }
        assert_eq!(count, 3);
    }

    // A header split across two `step` calls is completed by feeding
    // only the remaining bytes to the same parser.
    #[test]
    fn need_more_when_truncated() {
        let mut p = DnodeParser::new();
        let prefix = b"$2014$ 1 3 0 1 1 *1 d *";
        match p.step(prefix) {
            ParseStep::NeedMore { consumed } => assert_eq!(consumed, prefix.len()),
            other => panic!("unexpected: {other:?}"),
        }
        let suffix = b"42\r\n";
        match p.step(suffix) {
            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, suffix.len()),
            other => panic!("unexpected: {other:?}"),
        }
        assert_eq!(p.take_dmsg().plen, 42);
    }

    // Input that does not start a header is rejected without
    // consuming any bytes.
    #[test]
    fn parse_error_on_garbage_prefix() {
        let mut p = DnodeParser::new();
        match p.step(b"!nope") {
            ParseStep::Error { consumed } => assert_eq!(consumed, 0),
            other => panic!("unexpected: {other:?}"),
        }
    }

    // A header written without an AES key parses back to the same
    // fields, with the one-byte `d` placeholder as inline data.
    #[test]
    fn writer_round_trip_unencrypted() {
        let pool = MbufPool::default();
        let mut buf = pool.get();
        dmsg_write(&mut buf, 42, DmsgType::Req, 0, true, None, 0).unwrap();
        let bytes = buf.readable().to_vec();
        let mut p = DnodeParser::new();
        let step = p.step(&bytes);
        assert!(matches!(step, ParseStep::HeaderDone { .. }));
        let d = p.take_dmsg();
        assert_eq!(d.id, 42);
        assert_eq!(d.ty, DmsgType::Req);
        assert_eq!(d.flags, 0);
        assert!(d.same_dc);
        assert_eq!(d.mlen, 1);
        assert_eq!(d.data, b"d");
        assert_eq!(d.plen, 0);
    }

    // A header written with a 128-byte inline key and the encrypted
    // flag parses back with the key bytes and payload length intact.
    #[test]
    fn writer_round_trip_with_aes_payload() {
        let pool = MbufPool::default();
        let mut buf = pool.get();
        let payload = vec![0xAB; 128];
        dmsg_write(
            &mut buf,
            7,
            DmsgType::CryptoHandshake,
            DMSG_FLAG_ENCRYPTED,
            false,
            Some(&payload),
            512,
        )
        .unwrap();
        let bytes = buf.readable().to_vec();
        let mut p = DnodeParser::new();
        match p.step(&bytes) {
            ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
            other => panic!("unexpected: {other:?}"),
        }
        let d = p.take_dmsg();
        assert_eq!(d.id, 7);
        assert_eq!(d.ty, DmsgType::CryptoHandshake);
        assert!(d.is_encrypted());
        assert!(!d.same_dc);
        assert_eq!(d.data, payload);
        assert_eq!(d.plen, 512);
    }

    // Pins the full bypass / forward split of `dmsg_process`.
    #[test]
    fn dispatcher_classifies_control_plane() {
        let mut d = Dmsg::new();
        // Pin the exact three variants the C `dmsg_process`
        // bypasses.
        for ty in [
            DmsgType::CryptoHandshake,
            DmsgType::GossipSyn,
            DmsgType::GossipSynReply,
        ] {
            d.ty = ty;
            assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
        }
        // Every other gossip variant falls through to the default
        // branch (forward), matching the C switch.
        for ty in [
            DmsgType::GossipAck,
            DmsgType::GossipDigestSyn,
            DmsgType::GossipDigestAck,
            DmsgType::GossipDigestAck2,
            DmsgType::GossipShutdown,
            DmsgType::Req,
            DmsgType::ReqForward,
            DmsgType::Res,
        ] {
            d.ty = ty;
            assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
        }
        // HandoffChunk routes to the explicit handoff coordinator
        // and is therefore bypassed alongside the handshake
        // variants.
        d.ty = DmsgType::HandoffChunk;
        assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
        // FT.SEARCH coordinator messages are routed to the
        // dedicated query-fsm coordinator via the same
        // bypass path used by the handoff coordinator.
        for ty in [DmsgType::FtSearchReq, DmsgType::FtSearchRep] {
            d.ty = ty;
            assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
        }
    }
}