dynomite/proto/dnode.rs
1//! DNODE wire codec.
2//!
3//! The DNODE protocol frames every Dynomite peer-to-peer message
4//! with a small ASCII header followed by an opaque payload. The
5//! header carries the message id, type tag, encryption/compression
6//! flags, protocol version, same-datacenter bit, an inline data
7//! field (either a one-byte placeholder or an RSA-wrapped AES key),
8//! and the byte length of the payload that follows after `\r\n`.
9//!
10//! The reference engine implements the parser as a single state
11//! machine driven byte-by-byte. This module ports the same machine
12//! into safe Rust and exposes:
13//!
14//! * [`DynParseState`] - the parser's state alphabet.
15//! * [`DmsgType`] - the full set of message-type discriminators.
16//! * [`Dmsg`] - the in-memory header.
17//! * [`DnodeParser`] - the state machine, advanced by feeding bytes
18//! through [`DnodeParser::step`].
19//! * [`dmsg_write`] / [`dmsg_write_mbuf`] - the canonical encoders.
20//! * [`parse_req`] / [`parse_rsp`] - thin sync wrappers around the
21//! parser that operate on a [`crate::msg::Msg`]'s mbuf chain.
22//! * [`dmsg_process`] - dispatcher that classifies a parsed
23//! [`Dmsg`] by type for the cluster layer to act on.
24//!
25//! The encoder accepts an optional `aes_key_payload`: when present,
26//! the caller provides the bytes the inline data field should hold
27//! (the RSA-wrapped AES key produced by [`crate::crypto::Crypto`]).
28//! When absent, the encoder writes the single-byte `'d'` placeholder
29//! the reference engine emits after the first handshake message.
30
// The parser truncates accumulated decimals into the same fixed
// bit widths the reference engine uses on the wire (`u8` for the
// type, flags and version, `u32` for the data and payload lengths).
// The allowance keeps the Rust port faithful to the C `(uint8_t)num`
// and `(uint32_t)num` casts. The casts wrap silently: the only
// numeric check in the state machine is that the truncated type
// byte maps to a known `DmsgType`; every other over-wide numeral
// is accepted modulo its field width.
37#![allow(clippy::cast_possible_truncation)]
38#![allow(clippy::needless_continue)]
39
40use std::net::SocketAddr;
41
42use crate::core::types::MsgId;
43use crate::io::mbuf::{Mbuf, MbufQueue};
44use crate::msg::message::Msg;
45use crate::msg::message::MsgParseResult;
46
/// Magic literal that opens every DNODE header.
///
/// The parser matches it one byte at a time while in
/// [`DynParseState::Start`], so the literal may be split across
/// input slices.
pub const MAGIC: &[u8] = b"$2014$";

/// Default protocol version emitted by [`dmsg_write`]. Mirrors
/// `VERSION_10` in the reference engine.
pub const VERSION_10: u8 = 1;

/// CRLF delimiter that separates the DNODE header from its payload.
pub const CRLF: &[u8] = b"\r\n";

/// Single-byte placeholder used by [`dmsg_write`] when no AES key
/// payload accompanies the header.
pub const HANDSHAKE_PLACEHOLDER_DATA: u8 = b'd';

/// Single-byte placeholder used by [`dmsg_write_mbuf`] when no AES
/// key payload accompanies the header. The gossip path emits `'a'`
/// instead of `'d'` to disambiguate the two encoder flavours.
pub const GOSSIP_PLACEHOLDER_DATA: u8 = b'a';
65
/// Parser state alphabet.
///
/// Each variant matches a state in the reference engine's
/// `dyn_parse_state_t`. No explicit discriminants are declared, so
/// the numeric values are the implicit declaration-order values
/// (`Start` = 0 through `Unknown` = 15).
/// NOTE(review): parity with the C enum's numbering relies on this
/// order matching the reference header - confirm before reordering.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum DynParseState {
    /// Initial state; consumes leading whitespace until the magic
    /// literal is observed.
    #[default]
    Start,
    /// `$2014$` was matched; awaiting the trailing space.
    MagicString,
    /// Reading the decimal message id.
    MsgId,
    /// Reading the decimal message type.
    TypeId,
    /// Reading the decimal flags bit field.
    BitField,
    /// Reading the decimal protocol version.
    Version,
    /// Reading the same-datacenter digit.
    SameDc,
    /// Awaiting the leading `*` before the data length.
    ///
    /// [`DnodeParser::step`] moves from `SameDc` straight to
    /// `DataLen`, whose arm also accepts the `*`, so the parser in
    /// this module never enters `Star` on its own.
    Star,
    /// Reading the decimal data length.
    DataLen,
    /// Consuming the inline data of `mlen` bytes.
    Data,
    /// Skipping spaces before the payload-length marker.
    SpacesBeforePayloadLen,
    /// Reading the decimal payload length.
    PayloadLen,
    /// The CR was consumed; awaiting the LF that terminates the
    /// header.
    CrlfBeforeDone,
    /// Header complete; payload position recorded.
    Done,
    /// Header complete and post-handshake decryption applied.
    /// Not assigned by [`DnodeParser::step`].
    PostDone,
    /// Recovery state after the parser hit a malformed byte.
    /// [`DnodeParser::step`] reports malformed input through
    /// [`ParseStep::Error`] and does not assign this state itself.
    Unknown,
}
108
/// DNODE message type identifier.
///
/// The numeric values match `dmsg_type_t` from the reference engine
/// because the discriminator travels on the wire as a decimal. The
/// enum is `#[repr(u8)]`, so [`DmsgType::as_u8`] is a plain cast and
/// [`DmsgType::from_u8`] is its checked inverse.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[repr(u8)]
pub enum DmsgType {
    /// Unset / unknown type.
    #[default]
    Unknown = 0,
    /// Diagnostic frame (unused on the live wire; kept for parity).
    Debug = 1,
    /// Parse-error frame (unused on the live wire; kept for parity).
    ParseError = 2,
    /// Datastore request bound for the local DC.
    Req = 3,
    /// Datastore request to be forwarded across DCs.
    ReqForward = 4,
    /// Datastore response.
    Res = 5,
    /// AES key handshake.
    CryptoHandshake = 6,
    /// Gossip SYN.
    GossipSyn = 7,
    /// Gossip SYN reply.
    GossipSynReply = 8,
    /// Gossip ACK.
    GossipAck = 9,
    /// Gossip digest SYN.
    GossipDigestSyn = 10,
    /// Gossip digest ACK.
    GossipDigestAck = 11,
    /// Gossip digest ACK round 2.
    GossipDigestAck2 = 12,
    /// Gossip shutdown notice.
    GossipShutdown = 13,
    /// Explicit handoff chunk frame.
    ///
    /// Carries one chunk of a token-range handoff stream from the
    /// previous owner of the range to the new owner. Distinct from
    /// the AAE exchange variants so the receiver can route handoff
    /// frames to the dedicated handoff coordinator without parsing
    /// the payload first.
    HandoffChunk = 14,
    /// Cluster-wide RediSearch FT.SEARCH request frame.
    ///
    /// Sent by the FT.SEARCH coordinator on the node that
    /// received the client request to every primary peer
    /// covering the index's key range. The payload encodes a
    /// broadcast request (table name, serialised query body,
    /// top-K) - see the `dynomite-search` crate's
    /// `query_fsm::BroadcastRequest`. Routed by the dispatcher
    /// to the dedicated FT.SEARCH coordinator FSM instead of
    /// the data-plane stack so the per-peer query runs against
    /// the local registry rather than being re-forwarded.
    FtSearchReq = 15,
    /// Cluster-wide RediSearch FT.SEARCH reply frame.
    ///
    /// Returned by every peer that received a [`Self::FtSearchReq`]
    /// once its local search completed (or the per-peer
    /// deadline elapsed). The payload encodes the per-peer
    /// top-K hit list plus a `timed_out` flag the coordinator
    /// uses to mark partial results.
    FtSearchRep = 16,
}
174
175impl DmsgType {
176 /// Build a type from its on-the-wire integer value.
177 ///
178 /// # Examples
179 ///
180 /// ```
181 /// use dynomite::proto::dnode::DmsgType;
182 /// assert_eq!(DmsgType::from_u8(3), Some(DmsgType::Req));
183 /// assert_eq!(DmsgType::from_u8(99), None);
184 /// ```
185 #[must_use]
186 pub fn from_u8(v: u8) -> Option<Self> {
187 Some(match v {
188 0 => DmsgType::Unknown,
189 1 => DmsgType::Debug,
190 2 => DmsgType::ParseError,
191 3 => DmsgType::Req,
192 4 => DmsgType::ReqForward,
193 5 => DmsgType::Res,
194 6 => DmsgType::CryptoHandshake,
195 7 => DmsgType::GossipSyn,
196 8 => DmsgType::GossipSynReply,
197 9 => DmsgType::GossipAck,
198 10 => DmsgType::GossipDigestSyn,
199 11 => DmsgType::GossipDigestAck,
200 12 => DmsgType::GossipDigestAck2,
201 13 => DmsgType::GossipShutdown,
202 14 => DmsgType::HandoffChunk,
203 15 => DmsgType::FtSearchReq,
204 16 => DmsgType::FtSearchRep,
205 _ => return None,
206 })
207 }
208
209 /// Numeric on-the-wire value.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use dynomite::proto::dnode::DmsgType;
215 /// assert_eq!(DmsgType::CryptoHandshake.as_u8(), 6);
216 /// ```
217 #[must_use]
218 pub const fn as_u8(self) -> u8 {
219 self as u8
220 }
221}
222
/// Encryption bit (bit 0) in [`Dmsg::flags`]; tested by
/// [`Dmsg::is_encrypted`].
pub const DMSG_FLAG_ENCRYPTED: u8 = 0x1;

/// Compression bit (bit 1) in [`Dmsg::flags`]; tested by
/// [`Dmsg::is_compressed`].
pub const DMSG_FLAG_COMPRESSED: u8 = 0x2;
228
/// Parsed DNODE header.
///
/// `data` and `payload` hold copies of the on-the-wire bytes. The
/// encoder side fills both before emitting. On the parsing side,
/// [`DnodeParser::step`] fills `data` while it consumes the inline
/// field; it stops at the end of the header and leaves `payload`
/// empty.
///
/// Note that the derived [`Default`] yields `version == 0` and
/// `same_dc == false`, whereas [`Dmsg::new`] yields
/// `version == VERSION_10` and `same_dc == true`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Dmsg {
    /// Message id.
    pub id: MsgId,
    /// Message type.
    pub ty: DmsgType,
    /// Flag bit field; encryption is bit 0, compression is bit 1.
    pub flags: u8,
    /// Protocol version.
    pub version: u8,
    /// True when sender and receiver share a datacenter.
    pub same_dc: bool,
    /// Source address recorded by the recv path. Stage 7 leaves it
    /// `None`; Stage 9 stamps it from the connection state.
    pub source_address: Option<SocketAddr>,
    /// Length (in bytes) of the inline data field.
    pub mlen: u32,
    /// Inline data: either the single-byte placeholder or the
    /// RSA-wrapped AES key during the crypto handshake.
    pub data: Vec<u8>,
    /// Length (in bytes) of the trailing payload framed by the
    /// header.
    pub plen: u32,
    /// Payload bytes, if collected by the parser.
    pub payload: Vec<u8>,
}
260
261impl Dmsg {
262 /// Construct an empty `Dmsg` defaulted the same way the
263 /// reference engine's `dmsg_get` initialises a fresh slot.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use dynomite::proto::dnode::{Dmsg, DmsgType, VERSION_10};
269 /// let d = Dmsg::new();
270 /// assert_eq!(d.ty, DmsgType::Unknown);
271 /// assert_eq!(d.version, VERSION_10);
272 /// assert!(d.same_dc);
273 /// ```
274 #[must_use]
275 pub fn new() -> Self {
276 Self {
277 id: 0,
278 ty: DmsgType::Unknown,
279 flags: 0,
280 version: VERSION_10,
281 same_dc: true,
282 source_address: None,
283 mlen: 0,
284 data: Vec::new(),
285 plen: 0,
286 payload: Vec::new(),
287 }
288 }
289
290 /// True when the encryption flag is set.
291 ///
292 /// # Examples
293 ///
294 /// ```
295 /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_ENCRYPTED};
296 /// let mut d = Dmsg::new();
297 /// d.flags = DMSG_FLAG_ENCRYPTED;
298 /// assert!(d.is_encrypted());
299 /// ```
300 #[must_use]
301 pub fn is_encrypted(&self) -> bool {
302 self.flags & DMSG_FLAG_ENCRYPTED != 0
303 }
304
305 /// True when the compression flag is set.
306 ///
307 /// # Examples
308 ///
309 /// ```
310 /// use dynomite::proto::dnode::{Dmsg, DMSG_FLAG_COMPRESSED};
311 /// let mut d = Dmsg::new();
312 /// d.flags = DMSG_FLAG_COMPRESSED;
313 /// assert!(d.is_compressed());
314 /// ```
315 #[must_use]
316 pub fn is_compressed(&self) -> bool {
317 self.flags & DMSG_FLAG_COMPRESSED != 0
318 }
319}
320
/// Result of a single [`DnodeParser::step`] invocation.
///
/// Every variant carries `consumed`, an offset into the slice that
/// was passed to that particular `step` call.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ParseStep {
    /// More bytes are required to advance the state machine. The
    /// `consumed` field records how many of the input bytes the
    /// parser already absorbed; `step` only returns this variant
    /// after walking the whole slice, so it equals the input length.
    NeedMore {
        /// Number of input bytes the parser absorbed before it
        /// stopped waiting for more.
        consumed: usize,
    },
    /// The header (up to and including the trailing LF) has been
    /// parsed. The `consumed` field records the offset just past
    /// the LF, so the caller can read the payload starting at that
    /// index.
    ///
    /// `step` also returns this variant without consuming anything
    /// when it is called with non-empty input while the parser is
    /// already in a terminal state.
    HeaderDone {
        /// Offset just past the trailing LF.
        consumed: usize,
    },
    /// The parser hit an unrecoverable bad byte. The caller should
    /// drop the buffer (or split it at `consumed`) and reset.
    Error {
        /// Offset of the byte that triggered the error.
        consumed: usize,
    },
}
347
/// Errors that can be raised when encoding or parsing a DNODE
/// header without going through the streaming state machine.
///
/// The streaming parser reports failures through
/// [`ParseStep::Error`] instead. The enum is `#[non_exhaustive]`,
/// so callers outside this crate need a wildcard arm when matching.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DnodeError {
    /// Buffer too small to encode the header.
    OutOfSpace,
    /// Header does not begin with the magic literal.
    BadMagic,
    /// Numeric field could not be parsed.
    BadNumber,
    /// Trailing CRLF missing.
    MissingCrlf,
    /// Type discriminator out of range.
    BadType,
    /// Inline data shorter than the declared `mlen`.
    TruncatedData,
}
366
/// Streaming DNODE header parser.
///
/// Holds the current state plus the per-field accumulators, so a
/// header split across several input slices can be fed through
/// [`DnodeParser::step`] piece by piece.
#[derive(Debug)]
pub struct DnodeParser {
    /// Current position in the header grammar.
    state: DynParseState,
    /// Decimal accumulator for the numeric field being read; reset
    /// to zero whenever a field is committed.
    num: u64,
    /// Header being assembled from the fields parsed so far.
    dmsg: Dmsg,
    /// Inline-data bytes still to be copied while in
    /// [`DynParseState::Data`].
    data_remaining: u32,
    /// Number of [`MAGIC`] bytes matched so far while in
    /// [`DynParseState::Start`].
    magic_progress: u8,
    /// Whether the previous byte was an ASCII digit. The header
    /// state machine only transitions out of the numeric header
    /// fields (MSG_ID, TYPE_ID, BIT_FIELD, VERSION, SAME_DC) when
    /// the byte immediately preceding the field-terminating space
    /// was a digit; the parser reproduces this guard so extra
    /// whitespace (or any other non-digit byte) is rejected with
    /// the wire protocol's strictness.
    prev_was_digit: bool,
}
384
385impl DnodeParser {
386 /// Build a fresh parser positioned at [`DynParseState::Start`].
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// use dynomite::proto::dnode::{DnodeParser, DynParseState};
392 /// let p = DnodeParser::new();
393 /// assert_eq!(p.state(), DynParseState::Start);
394 /// ```
395 #[must_use]
396 pub fn new() -> Self {
397 Self {
398 state: DynParseState::Start,
399 num: 0,
400 dmsg: Dmsg::new(),
401 data_remaining: 0,
402 magic_progress: 0,
403 prev_was_digit: false,
404 }
405 }
406
407 /// Reset the parser to [`DynParseState::Start`] with a fresh
408 /// accumulator [`Dmsg`].
409 pub fn reset(&mut self) {
410 *self = Self::new();
411 }
412
413 /// Current state.
414 #[must_use]
415 pub fn state(&self) -> DynParseState {
416 self.state
417 }
418
419 /// Borrow the partial [`Dmsg`].
420 #[must_use]
421 pub fn dmsg(&self) -> &Dmsg {
422 &self.dmsg
423 }
424
425 /// Move the parsed [`Dmsg`] out of the parser. Only meaningful
426 /// after a [`ParseStep::HeaderDone`] step.
427 pub fn take_dmsg(&mut self) -> Dmsg {
428 let mut out = Dmsg::new();
429 std::mem::swap(&mut out, &mut self.dmsg);
430 self.state = DynParseState::Start;
431 self.num = 0;
432 self.data_remaining = 0;
433 self.magic_progress = 0;
434 self.prev_was_digit = false;
435 out
436 }
437
438 /// Feed `input` to the parser. The parser advances as far as it
439 /// can and returns one of the three [`ParseStep`] variants.
440 ///
441 /// The state machine is byte-driven and can be reentered with a
442 /// fresh slice when [`ParseStep::NeedMore`] indicates the input
443 /// was truncated mid-header.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// use dynomite::proto::dnode::{DnodeParser, ParseStep};
449 /// let mut p = DnodeParser::new();
450 /// let bytes = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
451 /// match p.step(bytes) {
452 /// ParseStep::HeaderDone { consumed } => assert_eq!(consumed, bytes.len()),
453 /// other => panic!("unexpected: {other:?}"),
454 /// }
455 /// ```
456 /// The state machine intentionally stays in one function to
457 /// match the reference engine's single-block parser; splitting
458 /// the per-state arms across helpers would obscure the parity.
459 #[allow(clippy::too_many_lines)]
460 pub fn step(&mut self, input: &[u8]) -> ParseStep {
461 let mut idx = 0usize;
462 while idx < input.len() {
463 let ch = input[idx];
464 match self.state {
465 DynParseState::Start => {
466 // Phase 1: skip leading whitespace, identical
467 // to the C engine's `while (ch == ' ')` arm.
468 if self.magic_progress == 0 {
469 if ch == b' ' {
470 idx += 1;
471 continue;
472 }
473 if ch != b'$' {
474 return ParseStep::Error { consumed: idx };
475 }
476 }
477 // Phase 2: byte-incrementally match the magic
478 // literal so split inputs are tolerated.
479 let want = MAGIC[usize::from(self.magic_progress)];
480 if ch != want {
481 return ParseStep::Error { consumed: idx };
482 }
483 self.magic_progress += 1;
484 idx += 1;
485 if usize::from(self.magic_progress) == MAGIC.len() {
486 self.state = DynParseState::MagicString;
487 self.magic_progress = 0;
488 }
489 continue;
490 }
491 DynParseState::MagicString => {
492 if ch == b' ' {
493 self.state = DynParseState::MsgId;
494 self.num = 0;
495 idx += 1;
496 continue;
497 }
498 return ParseStep::Error { consumed: idx };
499 }
500 DynParseState::MsgId => {
501 // DYN_MSG_ID state: digits accumulate, a single
502 // space terminates the field but only when the
503 // byte immediately
504 // before it was a digit. Anything else is
505 // rejected (the C engine resets to DYN_START
506 // and lets the recovery path retry; the Rust
507 // streaming parser surfaces an error so the
508 // caller can drop the buffer).
509 if ch.is_ascii_digit() {
510 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
511 self.prev_was_digit = true;
512 idx += 1;
513 continue;
514 }
515 if ch == b' ' && self.prev_was_digit {
516 self.dmsg.id = self.num;
517 self.state = DynParseState::TypeId;
518 self.num = 0;
519 self.prev_was_digit = false;
520 idx += 1;
521 continue;
522 }
523 return ParseStep::Error { consumed: idx };
524 }
525 DynParseState::TypeId => {
526 if ch.is_ascii_digit() {
527 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
528 self.prev_was_digit = true;
529 idx += 1;
530 continue;
531 }
532 if ch == b' ' && self.prev_was_digit {
533 self.dmsg.ty = match DmsgType::from_u8(self.num as u8) {
534 Some(t) => t,
535 None => return ParseStep::Error { consumed: idx },
536 };
537 self.state = DynParseState::BitField;
538 self.num = 0;
539 self.prev_was_digit = false;
540 idx += 1;
541 continue;
542 }
543 return ParseStep::Error { consumed: idx };
544 }
545 DynParseState::BitField => {
546 if ch.is_ascii_digit() {
547 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
548 self.prev_was_digit = true;
549 idx += 1;
550 continue;
551 }
552 if ch == b' ' && self.prev_was_digit {
553 self.dmsg.flags = (self.num as u8) & 0xF;
554 self.state = DynParseState::Version;
555 self.num = 0;
556 self.prev_was_digit = false;
557 idx += 1;
558 continue;
559 }
560 return ParseStep::Error { consumed: idx };
561 }
562 DynParseState::Version => {
563 if ch.is_ascii_digit() {
564 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
565 self.prev_was_digit = true;
566 idx += 1;
567 continue;
568 }
569 if ch == b' ' && self.prev_was_digit {
570 self.dmsg.version = self.num as u8;
571 self.state = DynParseState::SameDc;
572 self.num = 0;
573 self.prev_was_digit = false;
574 idx += 1;
575 continue;
576 }
577 return ParseStep::Error { consumed: idx };
578 }
579 DynParseState::SameDc => {
580 if ch.is_ascii_digit() {
581 self.dmsg.same_dc = ch != b'0';
582 self.prev_was_digit = true;
583 idx += 1;
584 continue;
585 }
586 if ch == b' ' && self.prev_was_digit {
587 self.state = DynParseState::DataLen;
588 self.num = 0;
589 self.prev_was_digit = false;
590 idx += 1;
591 continue;
592 }
593 return ParseStep::Error { consumed: idx };
594 }
595 DynParseState::Star | DynParseState::DataLen => {
596 if ch == b'*' {
597 idx += 1;
598 continue;
599 }
600 if ch.is_ascii_digit() {
601 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
602 idx += 1;
603 continue;
604 }
605 if ch == b' ' && self.state == DynParseState::DataLen {
606 self.dmsg.mlen = self.num as u32;
607 self.data_remaining = self.dmsg.mlen;
608 self.dmsg.data.clear();
609 self.dmsg.data.reserve(self.data_remaining as usize);
610 self.state = DynParseState::Data;
611 self.num = 0;
612 idx += 1;
613 continue;
614 }
615 return ParseStep::Error { consumed: idx };
616 }
617 DynParseState::Data => {
618 if self.data_remaining == 0 {
619 self.state = DynParseState::SpacesBeforePayloadLen;
620 continue;
621 }
622 let take = std::cmp::min(self.data_remaining as usize, input.len() - idx);
623 self.dmsg.data.extend_from_slice(&input[idx..idx + take]);
624 self.data_remaining -= take as u32;
625 idx += take;
626 if self.data_remaining == 0 {
627 self.state = DynParseState::SpacesBeforePayloadLen;
628 }
629 continue;
630 }
631 DynParseState::SpacesBeforePayloadLen => {
632 if ch == b' ' {
633 idx += 1;
634 continue;
635 }
636 if ch == b'*' {
637 self.state = DynParseState::PayloadLen;
638 self.num = 0;
639 idx += 1;
640 continue;
641 }
642 return ParseStep::Error { consumed: idx };
643 }
644 DynParseState::PayloadLen => {
645 if ch.is_ascii_digit() {
646 self.num = self.num.wrapping_mul(10) + u64::from(ch - b'0');
647 idx += 1;
648 continue;
649 }
650 if ch == b'\r' {
651 self.dmsg.plen = self.num as u32;
652 self.state = DynParseState::CrlfBeforeDone;
653 self.num = 0;
654 idx += 1;
655 continue;
656 }
657 return ParseStep::Error { consumed: idx };
658 }
659 DynParseState::CrlfBeforeDone => {
660 if ch == b'\n' {
661 self.state = DynParseState::Done;
662 idx += 1;
663 return ParseStep::HeaderDone { consumed: idx };
664 }
665 return ParseStep::Error { consumed: idx };
666 }
667 DynParseState::Done | DynParseState::PostDone | DynParseState::Unknown => {
668 return ParseStep::HeaderDone { consumed: idx };
669 }
670 }
671 }
672 ParseStep::NeedMore { consumed: idx }
673 }
674}
675
/// `Default` is the same as [`DnodeParser::new`]: a parser at
/// [`DynParseState::Start`] with empty accumulators.
impl Default for DnodeParser {
    fn default() -> Self {
        Self::new()
    }
}
681
682/// Encode a DNODE header into the writable region of `mbuf`.
683///
684/// `aes_key_payload`, when `Some`, is written as the inline data
685/// field; this is how the crypto handshake transports the
686/// RSA-wrapped AES key. When `None`, a single-byte `'d'` placeholder
687/// is emitted.
688///
689/// `flags` is taken verbatim (the encryption bit must be set by the
690/// caller, alongside any compression bit).
691///
692/// The encoder writes the entire header as a single contiguous
693/// region; if `mbuf` lacks the necessary capacity,
694/// [`DnodeError::OutOfSpace`] is returned.
695///
696/// # Examples
697///
698/// ```
699/// use dynomite::io::mbuf::MbufPool;
700/// use dynomite::proto::dnode::{dmsg_write, DmsgType};
701///
702/// let pool = MbufPool::default();
703/// let mut buf = pool.get();
704/// dmsg_write(
705/// &mut buf,
706/// /* msg_id */ 1,
707/// DmsgType::Req,
708/// /* flags */ 0,
709/// /* same_dc */ true,
710/// /* aes_key_payload */ None,
711/// /* plen */ 0,
712/// )
713/// .unwrap();
714/// assert!(buf.readable().starts_with(b" $2014$ 1 3 0"));
715/// ```
716pub fn dmsg_write(
717 mbuf: &mut Mbuf,
718 msg_id: MsgId,
719 ty: DmsgType,
720 flags: u8,
721 same_dc: bool,
722 aes_key_payload: Option<&[u8]>,
723 plen: u32,
724) -> Result<(), DnodeError> {
725 let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, false);
726 write_chain(mbuf, &header)
727}
728
729/// Encode a gossip-flavored DNODE header.
730///
731/// Mirrors the reference engine's `dmsg_write_mbuf`, which differs
732/// from [`dmsg_write`] only in the placeholder byte emitted when no
733/// AES key payload accompanies the header (`'a'` instead of `'d'`).
734///
735/// # Examples
736///
737/// ```
738/// use dynomite::io::mbuf::MbufPool;
739/// use dynomite::proto::dnode::{dmsg_write_mbuf, DmsgType};
740///
741/// let pool = MbufPool::default();
742/// let mut buf = pool.get();
743/// dmsg_write_mbuf(
744/// &mut buf,
745/// /* msg_id */ 5,
746/// DmsgType::GossipSyn,
747/// /* flags */ 0,
748/// /* same_dc */ true,
749/// /* aes_key_payload */ None,
750/// /* plen */ 64,
751/// )
752/// .unwrap();
753/// assert!(buf.readable().contains(&b'a'));
754/// ```
755pub fn dmsg_write_mbuf(
756 mbuf: &mut Mbuf,
757 msg_id: MsgId,
758 ty: DmsgType,
759 flags: u8,
760 same_dc: bool,
761 aes_key_payload: Option<&[u8]>,
762 plen: u32,
763) -> Result<(), DnodeError> {
764 let header = build_header(msg_id, ty, flags, same_dc, aes_key_payload, plen, true);
765 write_chain(mbuf, &header)
766}
767
768fn build_header(
769 msg_id: MsgId,
770 ty: DmsgType,
771 flags: u8,
772 same_dc: bool,
773 aes_key_payload: Option<&[u8]>,
774 plen: u32,
775 gossip_placeholder: bool,
776) -> Vec<u8> {
777 use std::io::Write as _;
778 let mut buf: Vec<u8> = Vec::with_capacity(64);
779 // Three leading spaces are part of the magic literal as written
780 // on the wire; the parser tolerates and skips them in DYN_START.
781 buf.extend_from_slice(b" $2014$ ");
782 let _ = write!(buf, "{msg_id}");
783 buf.push(b' ');
784 let _ = write!(buf, "{}", ty.as_u8());
785 buf.push(b' ');
786 let _ = write!(buf, "{}", flags & 0xF);
787 buf.push(b' ');
788 let _ = write!(buf, "{VERSION_10}");
789 buf.push(b' ');
790 buf.push(if same_dc { b'1' } else { b'0' });
791 buf.push(b' ');
792 buf.push(b'*');
793 if let Some(payload) = aes_key_payload {
794 let _ = write!(buf, "{}", payload.len());
795 buf.push(b' ');
796 buf.extend_from_slice(payload);
797 } else {
798 buf.extend_from_slice(b"1 ");
799 buf.push(if gossip_placeholder {
800 GOSSIP_PLACEHOLDER_DATA
801 } else {
802 HANDSHAKE_PLACEHOLDER_DATA
803 });
804 }
805 buf.push(b' ');
806 buf.push(b'*');
807 let _ = write!(buf, "{plen}");
808 buf.extend_from_slice(CRLF);
809 buf
810}
811
812fn write_chain(mbuf: &mut Mbuf, payload: &[u8]) -> Result<(), DnodeError> {
813 if mbuf.remaining() < payload.len() {
814 return Err(DnodeError::OutOfSpace);
815 }
816 let n = mbuf.recv(payload);
817 debug_assert_eq!(n, payload.len());
818 Ok(())
819}
820
821/// Sync byte parser that drives a request message's DNODE header
822/// state machine.
823///
824/// The parser walks the contiguous bytes spanning the message's
825/// mbuf chain and updates the [`Msg`] in place. On a fully parsed
826/// header, the function attaches the [`Dmsg`] to the message and
827/// returns `MsgParseResult::Ok`. On truncated input the parser
828/// returns `MsgParseResult::Again`. On invalid bytes the parser
829/// records `MsgParseResult::Error` and surfaces the same value.
830///
831/// The async wrapping (per-connection task scheduling, decryption
832/// hand-off when the encryption bit is set) ships in Stage 9.
833///
834/// # Examples
835///
836/// ```
837/// use dynomite::io::mbuf::MbufPool;
838/// use dynomite::msg::{Msg, MsgType};
839/// use dynomite::proto::dnode::{parse_req, DmsgType, DynParseState};
840///
841/// let pool = MbufPool::default();
842/// let mut msg = Msg::new(0, MsgType::Unknown, true);
843/// let mut mb = pool.get();
844/// mb.recv(b"$2014$ 1 3 0 1 1 *1 d *0\r\n");
845/// msg.mbufs_mut().push_back(mb);
846/// msg.recompute_mlen();
847/// let result = parse_req(&mut msg);
848/// assert_eq!(msg.dyn_parse_state(), DynParseState::Done);
849/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Req);
850/// drop(result);
851/// ```
852pub fn parse_req(msg: &mut Msg) -> MsgParseResult {
853 parse_msg(msg, false)
854}
855
856/// Sync byte parser counterpart to [`parse_req`] for response
857/// messages.
858///
859/// # Examples
860///
861/// ```
862/// use dynomite::io::mbuf::MbufPool;
863/// use dynomite::msg::{Msg, MsgType};
864/// use dynomite::proto::dnode::{parse_rsp, DmsgType};
865///
866/// let pool = MbufPool::default();
867/// let mut msg = Msg::new(0, MsgType::Unknown, false);
868/// let mut mb = pool.get();
869/// mb.recv(b"$2014$ 9 5 0 1 1 *1 d *0\r\n");
870/// msg.mbufs_mut().push_back(mb);
871/// msg.recompute_mlen();
872/// let _ = parse_rsp(&mut msg);
873/// assert_eq!(msg.dmsg().unwrap().ty, DmsgType::Res);
874/// ```
875pub fn parse_rsp(msg: &mut Msg) -> MsgParseResult {
876 parse_msg(msg, true)
877}
878
879fn parse_msg(msg: &mut Msg, _is_response: bool) -> MsgParseResult {
880 // Flatten the chain into a single buffer for parsing. The
881 // reference engine walks the chain byte by byte and tolerates
882 // splits at arbitrary boundaries; the Rust port drives the same
883 // state machine over a contiguous slice. Stage 9 will replace
884 // this with a streaming feed when the connection FSM lands.
885 let mut bytes: Vec<u8> = Vec::with_capacity(msg.mbufs().total_len());
886 for mbuf in msg.mbufs() {
887 bytes.extend_from_slice(mbuf.readable());
888 }
889
890 let mut parser = DnodeParser::new();
891 parser.state = msg.dyn_parse_state();
892 match parser.step(&bytes) {
893 ParseStep::HeaderDone { .. } => {
894 let dmsg = parser.take_dmsg();
895 msg.set_dyn_parse_state(DynParseState::Done);
896 msg.set_dmsg(dmsg);
897 msg.set_parse_result(MsgParseResult::Ok);
898 MsgParseResult::Ok
899 }
900 ParseStep::NeedMore { .. } => {
901 msg.set_dyn_parse_state(parser.state);
902 msg.set_parse_result(MsgParseResult::Again);
903 MsgParseResult::Again
904 }
905 ParseStep::Error { .. } => {
906 msg.set_dyn_parse_state(DynParseState::Unknown);
907 msg.set_parse_result(MsgParseResult::Error);
908 MsgParseResult::Error
909 }
910 }
911}
912
/// Outcome of [`dmsg_process`].
///
/// `Bypass` means the header has been recognised as control traffic
/// and the cluster layer should not pass the message further down
/// the protocol stack. `Forward` is returned for every other
/// message type.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum DmsgDispatch {
    /// Frame consumed by a control-plane handler.
    Bypass,
    /// Frame should continue through the data-plane stack.
    Forward,
}
925
926/// Stage 7 dispatcher: classify a parsed [`Dmsg`] as
927/// control-plane traffic the cluster layer should consume directly,
928/// or data-plane traffic that should continue through the protocol
929/// stack.
930///
931/// Stage 10 will extend this with the gossip-message decoders. For
932/// now, only the message-shape side of the dispatch is in place.
933///
934/// # Examples
935///
936/// ```
937/// use dynomite::proto::dnode::{dmsg_process, Dmsg, DmsgDispatch, DmsgType};
938///
939/// let mut d = Dmsg::new();
940/// d.ty = DmsgType::CryptoHandshake;
941/// assert_eq!(dmsg_process(&d), DmsgDispatch::Bypass);
942///
943/// // Gossip variants other than SYN / SYN_REPLY fall through.
944/// d.ty = DmsgType::GossipShutdown;
945/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
946///
947/// d.ty = DmsgType::Req;
948/// assert_eq!(dmsg_process(&d), DmsgDispatch::Forward);
949/// ```
950#[must_use]
951pub fn dmsg_process(dmsg: &Dmsg) -> DmsgDispatch {
952 // Dmsg dispatch table: only CRYPTO_HANDSHAKE,
953 // GOSSIP_SYN, and GOSSIP_SYN_REPLY short-circuit; the other
954 // gossip variants (ACK, DIGEST_SYN, DIGEST_ACK, DIGEST_ACK2,
955 // SHUTDOWN) fall through to the default branch and are
956 // forwarded to the cluster handlers (which Stage 10 will wire
957 // up). HANDOFF_CHUNK frames are control-plane traffic for
958 // the explicit handoff coordinator and bypass the data-plane
959 // stack alongside the crypto / gossip handshake variants.
960 match dmsg.ty {
961 DmsgType::CryptoHandshake
962 | DmsgType::GossipSyn
963 | DmsgType::GossipSynReply
964 | DmsgType::HandoffChunk
965 | DmsgType::FtSearchReq
966 | DmsgType::FtSearchRep => DmsgDispatch::Bypass,
967 _ => DmsgDispatch::Forward,
968 }
969}
970
971/// Drain `chain` into a contiguous `Vec<u8>` recycling each chunk
972/// back to `pool`. Useful for tests and for the Stage 9 path that
973/// needs a flat buffer of decrypted payload bytes.
974pub fn flatten_chain(chain: &mut MbufQueue) -> Vec<u8> {
975 let mut out = Vec::with_capacity(chain.total_len());
976 while let Some(buf) = chain.pop_front() {
977 out.extend_from_slice(buf.readable());
978 }
979 out
980}
981
/// Peer-handshake control payload exchanged on top of a
/// [`DmsgType::GossipSyn`] frame.
///
/// Today the handshake carries the cluster-wide capability
/// advertisement (see [`crate::cluster::capability`]). Future
/// fields will be appended as new typed records; older peers
/// ignore unknown trailing bytes.
///
/// # Wire format
///
/// ```text
/// magic(4) = "DHS1"
/// flags(2) = 0
/// CapabilityAd (length-prefixed, see
/// `CapabilityAd::encode` for the exact layout)
/// ```
///
/// All multi-byte integers are little-endian. The encoding uses
/// only the standard library; no external codec is pulled in.
///
/// # Examples
///
/// ```
/// use dynomite::cluster::capability::{CapabilityAd, CapabilityAdEntry};
/// use dynomite::proto::dnode::Handshake;
/// let ad = CapabilityAd::from_entries(vec![
///     CapabilityAdEntry::new("framing".into(), vec![vec![1, 0, 0, 0]]),
/// ]);
/// let hs = Handshake::new(ad.clone());
/// let bytes = hs.encode();
/// let back = Handshake::decode(&bytes).unwrap();
/// assert_eq!(back.capabilities(), &ad);
/// ```
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Handshake {
    /// Capability advertisement carried by this handshake; it is the
    /// only field and follows the magic and flags on the wire.
    capabilities: crate::cluster::capability::CapabilityAd,
}
1019
1020impl Handshake {
1021 /// Magic literal that opens every handshake payload.
1022 pub const MAGIC: [u8; 4] = *b"DHS1";
1023
1024 /// Build a handshake carrying `capabilities`.
1025 #[must_use]
1026 pub fn new(capabilities: crate::cluster::capability::CapabilityAd) -> Self {
1027 Self { capabilities }
1028 }
1029
1030 /// Borrow the embedded capability advertisement.
1031 #[must_use]
1032 pub fn capabilities(&self) -> &crate::cluster::capability::CapabilityAd {
1033 &self.capabilities
1034 }
1035
1036 /// Consume the handshake and return the embedded
1037 /// advertisement.
1038 #[must_use]
1039 pub fn into_capabilities(self) -> crate::cluster::capability::CapabilityAd {
1040 self.capabilities
1041 }
1042
1043 /// Serialise the handshake to a length-prefixed byte
1044 /// stream.
1045 #[must_use]
1046 pub fn encode(&self) -> Vec<u8> {
1047 let cap_bytes = self.capabilities.encode();
1048 let mut out = Vec::with_capacity(Self::MAGIC.len() + 2 + cap_bytes.len());
1049 out.extend_from_slice(&Self::MAGIC);
1050 out.extend_from_slice(&0u16.to_le_bytes()); // flags
1051 out.extend_from_slice(&cap_bytes);
1052 out
1053 }
1054
1055 /// Inverse of [`Handshake::encode`]. Surfaces a typed error
1056 /// when the magic / version is wrong or the embedded
1057 /// advertisement is malformed.
1058 pub fn decode(bytes: &[u8]) -> Result<Self, crate::cluster::capability::CapabilityCodecError> {
1059 use crate::cluster::capability::CapabilityCodecError;
1060 if bytes.len() < Self::MAGIC.len() + 2 {
1061 return Err(CapabilityCodecError::Truncated);
1062 }
1063 if bytes[..Self::MAGIC.len()] != Self::MAGIC {
1064 return Err(CapabilityCodecError::BadMagic);
1065 }
1066 // Flags are reserved; the only currently legal value is
1067 // zero. Any non-zero value is reserved for future use
1068 // and rejected here so older builds fail closed.
1069 let flags_off = Self::MAGIC.len();
1070 let flags = u16::from_le_bytes([bytes[flags_off], bytes[flags_off + 1]]);
1071 if flags != 0 {
1072 return Err(CapabilityCodecError::BadMagic);
1073 }
1074 let cap_bytes = &bytes[flags_off + 2..];
1075 let capabilities = crate::cluster::capability::CapabilityAd::decode(cap_bytes)?;
1076 Ok(Self { capabilities })
1077 }
1078
1079 /// Number of bytes the handshake's fixed-size prefix
1080 /// occupies before the embedded advertisement. Useful in
1081 /// tests that assert the on-the-wire delta.
1082 #[must_use]
1083 pub const fn header_len() -> usize {
1084 Self::MAGIC.len() + 2
1085 }
1086}
1087
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::mbuf::MbufPool;

    /// Feed `bytes` to `parser` and return the consumed byte count,
    /// panicking unless the step reports a completed header.
    fn expect_header_done(parser: &mut DnodeParser, bytes: &[u8]) -> usize {
        match parser.step(bytes) {
            ParseStep::HeaderDone { consumed } => consumed,
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// A complete header with an empty payload parses in one step
    /// and populates every header field.
    #[test]
    fn parse_simple_req() {
        let wire = b"$2014$ 1 3 0 1 1 *1 d *0\r\n";
        let mut parser = DnodeParser::new();
        assert_eq!(expect_header_done(&mut parser, wire), wire.len());
        let hdr = parser.take_dmsg();
        assert_eq!(hdr.id, 1);
        assert_eq!(hdr.ty, DmsgType::Req);
        assert_eq!(hdr.flags, 0);
        assert_eq!(hdr.version, 1);
        assert!(hdr.same_dc);
        assert_eq!(hdr.mlen, 1);
        assert_eq!(hdr.data, b"d");
        assert_eq!(hdr.plen, 0);
    }

    /// A multi-digit payload length is accumulated correctly.
    #[test]
    fn parse_payload_len() {
        let wire = b"$2014$ 2 3 0 1 1 *1 d *413\r\n";
        let mut parser = DnodeParser::new();
        assert_eq!(expect_header_done(&mut parser, wire), wire.len());
        assert_eq!(parser.dmsg().plen, 413);
    }

    /// Three headers in one buffer, separated by foreign bytes and
    /// a payload, are all recovered when the caller advances past
    /// each body and resets the parser.
    #[test]
    fn parse_three_back_to_back() {
        let input: Vec<u8> = [
            &b"$2014$ 1 3 0 1 1 *1 d *0\r\n"[..],
            &b"some redis bytes here ignored"[..],
            &b"$2014$ 2 3 0 1 1 *1 d *3\r\nABC"[..],
            &b"$2014$ 3 3 0 1 1 *1 d *0\r\n"[..],
        ]
        .concat();
        let mut parser = DnodeParser::new();
        let mut cursor = 0;
        let mut headers_seen = 0;
        while cursor < input.len() {
            match parser.step(&input[cursor..]) {
                ParseStep::HeaderDone { consumed } => {
                    let hdr = parser.take_dmsg();
                    headers_seen += 1;
                    let body_start = cursor + consumed;
                    cursor = match headers_seen {
                        1 => {
                            assert_eq!(hdr.id, 1);
                            // Skip the redis bytes by scanning for
                            // the next '$'.
                            input[body_start..]
                                .iter()
                                .position(|&b| b == b'$')
                                .map_or(input.len(), |offset| body_start + offset)
                        }
                        2 => {
                            assert_eq!(hdr.id, 2);
                            assert_eq!(hdr.plen, 3);
                            body_start + hdr.plen as usize
                        }
                        _ => {
                            assert_eq!(hdr.id, 3);
                            body_start
                        }
                    };
                    parser.reset();
                }
                ParseStep::NeedMore { .. } => break,
                ParseStep::Error { consumed } => {
                    // Always make progress, even when the parser
                    // rejected the very first byte.
                    cursor += consumed.max(1);
                    parser.reset();
                }
            }
        }
        assert_eq!(headers_seen, 3);
    }

    /// A header split across two `step` calls resumes where the
    /// first call stopped.
    #[test]
    fn need_more_when_truncated() {
        let head = b"$2014$ 1 3 0 1 1 *1 d *";
        let tail = b"42\r\n";
        let mut parser = DnodeParser::new();
        match parser.step(head) {
            ParseStep::NeedMore { consumed } => assert_eq!(consumed, head.len()),
            other => panic!("unexpected: {other:?}"),
        }
        assert_eq!(expect_header_done(&mut parser, tail), tail.len());
        assert_eq!(parser.take_dmsg().plen, 42);
    }

    /// Input that does not start with the header sigil is rejected
    /// without consuming anything.
    #[test]
    fn parse_error_on_garbage_prefix() {
        let mut parser = DnodeParser::new();
        match parser.step(b"!nope") {
            ParseStep::Error { consumed } => assert_eq!(consumed, 0),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// A header written without an AES payload parses back to the
    /// same fields, with the one-byte `d` placeholder as data.
    #[test]
    fn writer_round_trip_unencrypted() {
        let pool = MbufPool::default();
        let mut mbuf = pool.get();
        dmsg_write(&mut mbuf, 42, DmsgType::Req, 0, true, None, 0).unwrap();
        let encoded = mbuf.readable().to_vec();
        let mut parser = DnodeParser::new();
        let outcome = parser.step(&encoded);
        assert!(matches!(outcome, ParseStep::HeaderDone { .. }));
        let hdr = parser.take_dmsg();
        assert_eq!(hdr.id, 42);
        assert_eq!(hdr.ty, DmsgType::Req);
        assert_eq!(hdr.flags, 0);
        assert!(hdr.same_dc);
        assert_eq!(hdr.mlen, 1);
        assert_eq!(hdr.data, b"d");
        assert_eq!(hdr.plen, 0);
    }

    /// A header written with an inline key payload parses back
    /// with that payload intact.
    #[test]
    fn writer_round_trip_with_aes_payload() {
        let pool = MbufPool::default();
        let mut mbuf = pool.get();
        let key_blob = vec![0xAB; 128];
        dmsg_write(
            &mut mbuf,
            7,
            DmsgType::CryptoHandshake,
            DMSG_FLAG_ENCRYPTED,
            false,
            Some(&key_blob),
            512,
        )
        .unwrap();
        let encoded = mbuf.readable().to_vec();
        let mut parser = DnodeParser::new();
        assert_eq!(expect_header_done(&mut parser, &encoded), encoded.len());
        let hdr = parser.take_dmsg();
        assert_eq!(hdr.id, 7);
        assert_eq!(hdr.ty, DmsgType::CryptoHandshake);
        assert!(hdr.is_encrypted());
        assert!(!hdr.same_dc);
        assert_eq!(hdr.data, key_blob);
        assert_eq!(hdr.plen, 512);
    }

    /// `dmsg_process` bypasses the handshake, handoff and FT.SEARCH
    /// message types and forwards everything else.
    #[test]
    fn dispatcher_classifies_control_plane() {
        let mut frame = Dmsg::new();
        let mut classify = |ty: DmsgType| {
            frame.ty = ty;
            dmsg_process(&frame)
        };

        // Pin the exact three variants the C `dmsg_process`
        // bypasses.
        let c_bypassed = [
            DmsgType::CryptoHandshake,
            DmsgType::GossipSyn,
            DmsgType::GossipSynReply,
        ];
        for ty in c_bypassed {
            assert_eq!(classify(ty), DmsgDispatch::Bypass);
        }

        // Every other gossip variant falls through to the default
        // branch (forward), matching the C switch.
        let forwarded = [
            DmsgType::GossipAck,
            DmsgType::GossipDigestSyn,
            DmsgType::GossipDigestAck,
            DmsgType::GossipDigestAck2,
            DmsgType::GossipShutdown,
            DmsgType::Req,
            DmsgType::ReqForward,
            DmsgType::Res,
        ];
        for ty in forwarded {
            assert_eq!(classify(ty), DmsgDispatch::Forward);
        }

        // HandoffChunk routes to the explicit handoff coordinator
        // and is therefore bypassed alongside the handshake
        // variants.
        assert_eq!(classify(DmsgType::HandoffChunk), DmsgDispatch::Bypass);

        // FT.SEARCH coordinator messages are routed to the
        // dedicated query-fsm coordinator via the same bypass path
        // used by the handoff coordinator.
        for ty in [DmsgType::FtSearchReq, DmsgType::FtSearchRep] {
            assert_eq!(classify(ty), DmsgDispatch::Bypass);
        }
    }
}