Skip to main content

net/adapter/net/redex/
replication.rs

1//! RedEX Distributed wire protocol — `SUBPROTOCOL_REDEX` and the four
2//! `DISPATCH_REPLICA_SYNC` codes that ride on top of the existing
3//! reliable-stream `Mesh::publish` machinery.
4//!
5//! Phase A scaffold of `docs/plans/REDEX_DISTRIBUTED_PLAN.md`. Implements
6//! the byte layouts pinned in §2 of that plan:
7//!
8//! - `SyncRequest`   (`0x20`, replica → leader)  — 47 bytes, fixed.
9//! - `SyncResponse`  (`0x21`, leader → replica) — variable; bounded by
10//!   the matching request's `chunk_max`.
11//! - `SyncHeartbeat` (`0x22`, bidirectional)    — 52 bytes, fixed.
12//! - `SyncNack`      (`0x23`, leader → replica) — variable; carries
13//!   an optional UTF-8 diagnostic.
14//!
15//! Encoding conventions (LOCKED, mirroring §2 of the plan):
16//!
17//! - Multi-byte integers are **little-endian, fixed-width** — no varints.
18//! - The standard subprotocol header (`subprotocol_id: u16 LE` +
19//!   `dispatch_code: u8`) prefixes every message — 3 bytes.
20//! - `ChannelId` is the 32-byte BLAKE2s hash of the channel name.
21//! - Length-prefixed strings: `(u16 LE len, [len] utf-8 bytes)`.
22//! - Range encoding (used by future reserved variants): `(u64 LE start,
23//!   u64 LE end)`, half-open `[start, end)`.
24//!
25//! Election is wire-free — `StandbyGroup` invokes RedEX's deterministic
26//! `elect()` selection function from local state, so no `LEADER_ELECTION`
27//! dispatch code exists. Reserved range `0x24..=0x2F` (12 codes) is held
28//! for future variants (range-bounded sync, parallel-stream sync, etc.).
29//!
30//! Codec layer only — daemon, heartbeat loop, election integration, and
31//! `ReplicationCoordinator` itself land in Phases C / D / E.
32
33use blake2::{
34    digest::{generic_array::typenum::U32, FixedOutput, KeyInit, Mac},
35    Blake2sMac,
36};
37use bytes::{Buf, BufMut};
38
39use super::super::channel::ChannelName;
40
41/// Subprotocol ID for RedEX Distributed replication. Claims `0x0E00`
42/// in the `SUBPROTOCOLS.md` registry; the high byte (`0x0E`) is the
43/// next free family above capability (`0x0C`) and reflex (`0x0D`).
44pub const SUBPROTOCOL_REDEX: u16 = 0x0E00;
45
46/// Replica → leader: ask for events `[since_seq, since_seq + chunk_max)`.
47pub const DISPATCH_SYNC_REQUEST: u8 = 0x20;
48/// Leader → replica: bounded chunk of events.
49pub const DISPATCH_SYNC_RESPONSE: u8 = 0x21;
50/// Bidirectional liveness + tail-seq heartbeat.
51pub const DISPATCH_SYNC_HEARTBEAT: u8 = 0x22;
52/// Leader → replica: structured rejection (typed `error_code`).
53pub const DISPATCH_SYNC_NACK: u8 = 0x23;
54
55/// Reserved range upper bound (exclusive) for future
56/// `DISPATCH_REPLICA_SYNC` variants. `0x24..0x2F` is reserved for
57/// range-bounded sync, parallel-stream sync, etc.; document each new
58/// code in `SUBPROTOCOLS.md` as it lands.
59pub const DISPATCH_REPLICA_SYNC_RESERVED_END: u8 = 0x30;
60
61/// Fixed encoded size of a v0.2 [`SyncRequest`] message including
62/// the 3-byte subprotocol header. Legacy peers serialize at this
63/// size; v0.3 Phase D adds one trailing class byte (see
64/// [`SYNC_REQUEST_SIZE_V2_CLASS`]). The decoder accepts both
65/// sizes — frames missing the trailing byte decode as
66/// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground).
67pub const SYNC_REQUEST_SIZE: usize = 3 + 32 + 8 + 4 + 8; // 55
68
69/// v0.3 Phase D encoded size — `SYNC_REQUEST_SIZE` plus one byte
70/// for the wire-encoded [`BandwidthClass`](super::bandwidth::BandwidthClass).
71/// New senders always emit this size; new readers accept both
72/// sizes (legacy 55-byte frames degrade to `Foreground`).
73pub const SYNC_REQUEST_SIZE_V2_CLASS: usize = SYNC_REQUEST_SIZE + 1; // 56
74
75/// Fixed encoded size of a [`SyncHeartbeat`] message including the
76/// 3-byte subprotocol header.
77pub const SYNC_HEARTBEAT_SIZE: usize = 3 + 32 + 8 + 1 + 8; // 52
78
79/// Domain-separation label for the BLAKE2s hash that turns a channel
80/// name into a 32-byte `ChannelId`. Picked once, frozen — changing it
81/// would invalidate every `ChannelId` on the wire.
82const CHANNEL_ID_LABEL: &[u8] = b"redex-channel-id-v1";
83
84/// 32-byte channel identifier — BLAKE2s of the channel name with a
85/// domain-separation label. Distinct from `ChannelName::hash() -> u16`
86/// (the routing hint), which has routine collisions at mesh scale.
87/// The replication protocol needs an identifier with negligible
88/// collision probability so two channels can't accidentally observe
89/// each other's heartbeats.
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
91pub struct ChannelId([u8; 32]);
92
93impl ChannelId {
94    /// Compute the `ChannelId` for a channel name.
95    pub fn from_name(name: &ChannelName) -> Self {
96        Self::from_str_internal(name.as_str())
97    }
98
99    /// Internal helper so tests can hash literal strings without
100    /// constructing a [`ChannelName`].
101    #[expect(
102        clippy::expect_used,
103        reason = "Blake2sMac::new_from_slice rejects only keys longer than 32 bytes; CHANNEL_ID_LABEL is a short compile-time-constant label"
104    )]
105    fn from_str_internal(s: &str) -> Self {
106        let mut mac = <Blake2sMac<U32> as KeyInit>::new_from_slice(CHANNEL_ID_LABEL)
107            .expect("BLAKE2s accepts variable-length keys");
108        Mac::update(&mut mac, s.as_bytes());
109        let bytes = mac.finalize_fixed();
110        let mut out = [0u8; 32];
111        out.copy_from_slice(&bytes);
112        Self(out)
113    }
114
115    /// Construct from raw bytes — used by the decode path.
116    pub const fn from_bytes(bytes: [u8; 32]) -> Self {
117        Self(bytes)
118    }
119
120    /// Borrow the 32-byte representation.
121    pub fn as_bytes(&self) -> &[u8; 32] {
122        &self.0
123    }
124}
125
126/// `ReplicaState` discriminator carried on the wire in
127/// [`SyncHeartbeat`] messages. The four-state model is pinned at §3 of
128/// the plan; this enum is the encoding view of those states.
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub enum ReplicaRole {
131    /// Sole appender for the channel.
132    Leader = 0,
133    /// Catching up or steady-state lagging by ≤ 1 heartbeat.
134    Replica = 1,
135    /// Brief transient: leader-loss detected, computing the
136    /// deterministic election winner. Microseconds; not a broadcast wait.
137    Candidate = 2,
138    /// Holds the channel's storage but has no replica role.
139    Idle = 3,
140}
141
142impl ReplicaRole {
143    fn from_wire(byte: u8) -> Option<Self> {
144        match byte {
145            0 => Some(Self::Leader),
146            1 => Some(Self::Replica),
147            2 => Some(Self::Candidate),
148            3 => Some(Self::Idle),
149            _ => None,
150        }
151    }
152
153    fn to_wire(self) -> u8 {
154        self as u8
155    }
156}
157
158/// Typed rejection error in [`SyncNack`]. Replicas key their retry
159/// policy on the variant — never silently treat as transport-level
160/// failure (the reliable-stream layer surfaces transport errors
161/// separately).
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163pub enum SyncNackError {
164    /// Receiver is not the leader for this channel. Replica should
165    /// re-resolve leadership via `Mesh::find_chain_holders`.
166    NotLeader = 1,
167    /// `since_seq` lies outside the leader's retained range. Replica
168    /// should trim its local tail and retry from the leader's first
169    /// available seq.
170    BadRange = 2,
171    /// Leader is currently saturated. Replica should exponentially
172    /// back off and retry the same request.
173    Backpressure = 3,
174    /// Channel was closed. Replica withdraws its role and emits a
175    /// metric.
176    ChannelClosed = 4,
177}
178
179impl SyncNackError {
180    fn from_wire(byte: u8) -> Option<Self> {
181        match byte {
182            1 => Some(Self::NotLeader),
183            2 => Some(Self::BadRange),
184            3 => Some(Self::Backpressure),
185            4 => Some(Self::ChannelClosed),
186            _ => None,
187        }
188    }
189
190    fn to_wire(self) -> u8 {
191        self as u8
192    }
193}
194
195/// One event record inside a [`SyncResponse`] chunk. `event_seq`
196/// values are strictly increasing across a chunk; gaps within a chunk
197/// are not permitted (gaps come as explicit skip-ahead in Phase D).
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub struct SyncEvent {
200    /// Monotonic sequence number assigned by the channel's leader.
201    pub event_seq: u64,
202    /// Opaque event body bytes — the layer-7 payload.
203    pub payload: Vec<u8>,
204}
205
206/// Replica → leader: pull request for events
207/// `[since_seq, since_seq + chunk_max)`. 55 bytes (v0.2 legacy)
208/// or 56 bytes (v0.3 Phase D with trailing class byte).
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct SyncRequest {
211    /// 32-byte BLAKE2s hash of the channel name.
212    pub channel_id: ChannelId,
213    /// First sequence number the replica wants from the leader's
214    /// retained range. Inclusive.
215    pub since_seq: u64,
216    /// Maximum payload bytes the leader may send in the matching
217    /// [`SyncResponse`].
218    pub chunk_max: u32,
219    /// Replica-minted correlation token. The leader echoes it
220    /// verbatim on the matching `SyncResponse` / `SyncNack`. The
221    /// replica's `OutstandingRequests` set holds the minted token
222    /// until the response lands; responses or NACKs whose token
223    /// is not in the set are dropped. Random 64-bit value drawn
224    /// from `getrandom` per request so a stale NACK from a prior
225    /// epoch (or any peer that observed wire traffic) cannot match
226    /// an in-flight request the replica is currently waiting on.
227    pub request_id: u64,
228    /// v0.3 Phase D per-request bandwidth-class hint. Receiver-
229    /// stamped; the leader's admission gate honors this in
230    /// preference to the channel's
231    /// [`ReplicationConfig::default_bandwidth_class`](super::replication_config::ReplicationConfig).
232    /// Encoded as a 1-byte trailing field on the wire; legacy
233    /// 55-byte frames omit it and decode as
234    /// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground).
235    pub class: super::bandwidth::BandwidthClass,
236}
237
238/// Leader → replica: bounded chunk of events answering the matching
239/// [`SyncRequest`]. Variable size; bounded by `chunk_max` from the
240/// request side.
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct SyncResponse {
243    /// 32-byte BLAKE2s hash of the channel name.
244    pub channel_id: ChannelId,
245    /// Sequence number of `events[0]` in this chunk. Replicas use
246    /// this to detect server-side trimming (`first_seq` greater than
247    /// the request's `since_seq` means the leader no longer retains
248    /// the requested range).
249    pub first_seq: u64,
250    /// **R-5 disambiguation:** leader's first retained seq at the
251    /// time of this response. Lets the replica tell a legitimate
252    /// retention trim (`first_seq == leader_first_retained_seq`) from
253    /// a divergent-log split-brain (`first_seq >
254    /// leader_first_retained_seq` AND replica's local tail had data
255    /// in `[leader_first_retained_seq, first_seq)`). The replica
256    /// still does the skip-ahead in both cases (safety) but
257    /// observability-wise the divergence case is flagged with a
258    /// distinct metric for operator review.
259    pub leader_first_retained_seq: u64,
260    /// Echo of the matching `SyncRequest::request_id`. Replicas
261    /// drop responses whose token is not in their in-flight set
262    /// (per-leader pending state). Without this, a stale response
263    /// from a prior request the replica had already timed out and
264    /// re-issued would land on the current request's apply path.
265    pub request_id: u64,
266    /// In-order event records. `event_seq` increases monotonically
267    /// across the slice; no gaps within a chunk.
268    pub events: Vec<SyncEvent>,
269}
270
271/// Bidirectional liveness heartbeat. Leader emits these to all
272/// replicas at `heartbeat_ms` cadence; replicas emit their own
273/// `tail_seq` back to the leader so the leader can observe lag.
274#[derive(Debug, Clone, PartialEq, Eq)]
275pub struct SyncHeartbeat {
276    /// 32-byte BLAKE2s hash of the channel name.
277    pub channel_id: ChannelId,
278    /// Sender's current tail sequence number.
279    pub tail_seq: u64,
280    /// Sender's `ReplicaState` — operator-facing observability only;
281    /// receivers don't make routing decisions on this field (those
282    /// route through the capability layer's `causal:` tags).
283    pub role: ReplicaRole,
284    /// Sender's monotonic-clock milliseconds. Used **only** for drift
285    /// detection (operator-facing); never consumed for ordering or
286    /// liveness logic — those route through `tail_seq` + reliable-
287    /// stream ack accounting.
288    pub wall_clock_ms: u64,
289}
290
291/// Leader → replica: structured rejection. The leader MUST emit this
292/// (rather than silently closing the stream) on every rejection
293/// reason that isn't a transport-level failure — silent close is
294/// reserved for the latter.
295#[derive(Debug, Clone, PartialEq, Eq)]
296pub struct SyncNack {
297    /// 32-byte BLAKE2s hash of the channel name.
298    pub channel_id: ChannelId,
299    /// Echoes the rejected request's `since_seq` so the replica can
300    /// correlate the NACK with the in-flight request that triggered
301    /// it.
302    pub since_seq: u64,
303    /// Typed rejection reason. Replicas key their retry policy here.
304    pub error_code: SyncNackError,
305    /// Leader's first-retained sequence at the time of the reject.
306    /// Populated by the leader on `BadRange` so the replica can
307    /// `skip_to(leader_first_retained_seq)` in one round trip
308    /// instead of advancing by one per `BadRange` cycle. Set to
309    /// `0` on other error codes; receivers ignore it outside the
310    /// `BadRange` arm.
311    pub leader_first_retained_seq: u64,
312    /// Echo of the matching `SyncRequest::request_id`. Replicas
313    /// drop NACKs whose token is not in their in-flight set.
314    /// Without this, a stale `BadRange` NACK from a prior request
315    /// the replica had timed out and re-issued would still drive
316    /// `skip_to` on the local file — destructive on retry.
317    pub request_id: u64,
318    /// Optional human-readable diagnostic. UTF-8 encoded; may be
319    /// empty. The replica's retry policy keys off `error_code` only —
320    /// `detail` is for operator logs.
321    pub detail: String,
322}
323
324/// Errors surfacing from the decode path. Mirrors the typed-error
325/// shape the rest of the substrate uses for fallible decoders.
326#[derive(Debug, thiserror::Error, PartialEq, Eq)]
327pub enum WireError {
328    /// Buffer is shorter than the encoded message demands.
329    #[error("redex wire truncated: need {need} bytes, have {have}")]
330    Truncated {
331        /// Minimum bytes the decoder needed to make progress.
332        need: usize,
333        /// Bytes actually available in the input.
334        have: usize,
335    },
336    /// Subprotocol header doesn't match `SUBPROTOCOL_REDEX`.
337    #[error("redex wire subprotocol mismatch: got {got:#06x}, expected {SUBPROTOCOL_REDEX:#06x}")]
338    SubprotocolMismatch {
339        /// Subprotocol id observed in the header.
340        got: u16,
341    },
342    /// Dispatch code doesn't match the decoder being invoked, or
343    /// falls outside the reserved `0x20..=0x2F` range entirely.
344    #[error("redex wire dispatch code {got:#04x} does not match expected {expected:#04x}")]
345    DispatchMismatch {
346        /// Dispatch byte observed in the header.
347        got: u8,
348        /// Dispatch byte the decoder being invoked is keyed on.
349        expected: u8,
350    },
351    /// `role` byte in a [`SyncHeartbeat`] is outside the `0..=3`
352    /// range the four-state model pins.
353    #[error("redex wire role byte {0} is not a valid ReplicaRole (0..=3)")]
354    BadRole(u8),
355    /// `error_code` byte in a [`SyncNack`] is outside the `1..=4`
356    /// range the typed-error variants pin.
357    #[error("redex wire error_code {0} is not a valid SyncNackError (1..=4)")]
358    BadErrorCode(u8),
359    /// `detail` bytes in a [`SyncNack`] are not valid UTF-8.
360    #[error("redex wire NACK detail is not valid UTF-8")]
361    InvalidUtf8,
362}
363
364/// Write the standard 3-byte subprotocol header
365/// (`SUBPROTOCOL_REDEX` + `dispatch_code`) to `buf`.
366fn put_header(buf: &mut Vec<u8>, dispatch: u8) {
367    buf.put_u16_le(SUBPROTOCOL_REDEX);
368    buf.put_u8(dispatch);
369}
370
371/// Validate the standard 3-byte subprotocol header on `data` and
372/// return the remaining payload slice. Errors on truncation,
373/// subprotocol mismatch, or dispatch-code mismatch.
374fn check_header(data: &[u8], expected_dispatch: u8) -> Result<&[u8], WireError> {
375    if data.len() < 3 {
376        return Err(WireError::Truncated {
377            need: 3,
378            have: data.len(),
379        });
380    }
381    let mut cursor = &data[..3];
382    let subprotocol = cursor.get_u16_le();
383    let dispatch = cursor.get_u8();
384    if subprotocol != SUBPROTOCOL_REDEX {
385        return Err(WireError::SubprotocolMismatch { got: subprotocol });
386    }
387    if dispatch != expected_dispatch {
388        return Err(WireError::DispatchMismatch {
389            got: dispatch,
390            expected: expected_dispatch,
391        });
392    }
393    Ok(&data[3..])
394}
395
396/// Read a `ChannelId` from `cursor`. Caller is responsible for
397/// ensuring `cursor.remaining() >= 32`.
398fn get_channel_id(cursor: &mut &[u8]) -> ChannelId {
399    let mut id = [0u8; 32];
400    id.copy_from_slice(&cursor[..32]);
401    cursor.advance(32);
402    ChannelId::from_bytes(id)
403}
404
405// ============================================================================
406// SyncRequest — 0x20, replica → leader
407// ============================================================================
408
409impl SyncRequest {
410    /// Serialize to bytes. v0.3 Phase D senders always emit
411    /// [`SYNC_REQUEST_SIZE_V2_CLASS`] (56 bytes) — the trailing
412    /// class byte is the only delta from the v0.2 layout.
413    pub fn to_bytes(&self) -> Vec<u8> {
414        let mut buf = Vec::with_capacity(SYNC_REQUEST_SIZE_V2_CLASS);
415        put_header(&mut buf, DISPATCH_SYNC_REQUEST);
416        buf.put_slice(self.channel_id.as_bytes());
417        buf.put_u64_le(self.since_seq);
418        buf.put_u32_le(self.chunk_max);
419        buf.put_u64_le(self.request_id);
420        buf.put_u8(self.class.as_u8());
421        debug_assert_eq!(buf.len(), SYNC_REQUEST_SIZE_V2_CLASS);
422        buf
423    }
424
425    /// Deserialize from bytes. Accepts both v0.2 (55 bytes) and
426    /// v0.3 Phase D (56 bytes) frames — a v0.2 frame missing
427    /// the trailing class byte decodes as
428    /// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground)
429    /// for backward compat. An unknown class discriminant value
430    /// (forward-compat) also decodes as `Foreground` — see
431    /// [`BandwidthClass::from_wire_or_default`](super::bandwidth::BandwidthClass::from_wire_or_default).
432    pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
433        let payload = check_header(data, DISPATCH_SYNC_REQUEST)?;
434        if payload.len() < SYNC_REQUEST_SIZE - 3 {
435            return Err(WireError::Truncated {
436                need: SYNC_REQUEST_SIZE,
437                have: data.len(),
438            });
439        }
440        let mut cursor = payload;
441        let channel_id = get_channel_id(&mut cursor);
442        let since_seq = cursor.get_u64_le();
443        let chunk_max = cursor.get_u32_le();
444        let request_id = cursor.get_u64_le();
445        // Trailing class byte (v0.3+ Phase D). Absent on legacy
446        // 55-byte frames; defaults to Foreground.
447        let class = if cursor.has_remaining() {
448            super::bandwidth::BandwidthClass::from_wire_or_default(cursor.get_u8())
449        } else {
450            super::bandwidth::BandwidthClass::default()
451        };
452        Ok(Self {
453            channel_id,
454            since_seq,
455            chunk_max,
456            request_id,
457            class,
458        })
459    }
460}
461
462// ============================================================================
463// SyncResponse — 0x21, leader → replica
464// ============================================================================
465
466impl SyncResponse {
467    /// Serialize to bytes. Variable size: header + 32 + 8 + 8 + 8 +
468    /// 4 + Σ(8 + 4 + payload.len()) over events.
469    /// (R-23: added 8 bytes for `request_id` echo.)
470    pub fn to_bytes(&self) -> Vec<u8> {
471        // Cap the pre-allocation against `u32::MAX` — `event_count`
472        // is the wire-format width, so we can't honestly encode
473        // more than `u32::MAX` events anyway, and on 32-bit hosts
474        // the multiplication below would overflow `usize` otherwise.
475        let events_size: usize = self.events.iter().map(|e| 8 + 4 + e.payload.len()).sum();
476        let mut buf = Vec::with_capacity(3 + 32 + 8 + 8 + 8 + 4 + events_size);
477        put_header(&mut buf, DISPATCH_SYNC_RESPONSE);
478        buf.put_slice(self.channel_id.as_bytes());
479        buf.put_u64_le(self.first_seq);
480        buf.put_u64_le(self.leader_first_retained_seq);
481        buf.put_u64_le(self.request_id);
482        // `events.len()` wider than u32::MAX is impossible to
483        // represent on the wire — clamp via saturating cast. In
484        // practice callers honor `chunk_max` (bounded u32) so the
485        // saturation is dead code, but stay safe.
486        debug_assert!(
487            self.events.len() <= u32::MAX as usize,
488            "events.len() {} exceeds u32::MAX",
489            self.events.len()
490        );
491        let event_count = u32::try_from(self.events.len()).unwrap_or(u32::MAX);
492        buf.put_u32_le(event_count);
493        for event in &self.events {
494            buf.put_u64_le(event.event_seq);
495            debug_assert!(event.payload.len() <= u32::MAX as usize);
496            let payload_len = u32::try_from(event.payload.len()).unwrap_or(u32::MAX);
497            buf.put_u32_le(payload_len);
498            buf.put_slice(&event.payload);
499        }
500        buf
501    }
502
503    /// Deserialize from bytes. Errors on truncation or header
504    /// mismatch. Validates each event-record's length prefix
505    /// against the remaining buffer so a malformed `payload_len`
506    /// can't trigger a panic.
507    pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
508        let payload = check_header(data, DISPATCH_SYNC_RESPONSE)?;
509        let prefix_needed = 32 + 8 + 8 + 8 + 4;
510        if payload.len() < prefix_needed {
511            return Err(WireError::Truncated {
512                need: 3 + prefix_needed,
513                have: data.len(),
514            });
515        }
516        let mut cursor = payload;
517        let channel_id = get_channel_id(&mut cursor);
518        let first_seq = cursor.get_u64_le();
519        let leader_first_retained_seq = cursor.get_u64_le();
520        let request_id = cursor.get_u64_le();
521        let event_count = cursor.get_u32_le() as usize;
522        // R-36: cap the pre-allocation at 4096 events to bound a
523        // hostile `event_count` (e.g. peer sending a maximum-u32
524        // count without the matching payload bytes). Legitimate
525        // chunks above 4096 events incur progressive grow-and-
526        // copy, but the byte budget (`chunk_max` ≤ 64 MiB)
527        // means an over-4096-event chunk averages payload <
528        // 16 KiB / event, which is comfortably small.
529        let mut events = Vec::with_capacity(event_count.min(4096));
530        for _ in 0..event_count {
531            if cursor.remaining() < 8 + 4 {
532                // Report total bytes needed correctly —
533                // consumed-so-far + still-needed.
534                let consumed = data.len() - cursor.remaining();
535                return Err(WireError::Truncated {
536                    need: consumed + (8 + 4),
537                    have: data.len(),
538                });
539            }
540            let event_seq = cursor.get_u64_le();
541            let payload_len = cursor.get_u32_le() as usize;
542            if cursor.remaining() < payload_len {
543                let consumed = data.len() - cursor.remaining();
544                // checked_add — on 32-bit targets a u32 payload_len
545                // close to u32::MAX would overflow the plain `+`
546                // expression (consumed ≥ payload_len lower-bound
547                // doesn't shrink the danger window). Same fix shape
548                // as the umbrella's L-9 for BufferedEvents.
549                let need = consumed
550                    .checked_add(payload_len)
551                    .ok_or(WireError::Truncated {
552                        need: usize::MAX,
553                        have: data.len(),
554                    })?;
555                return Err(WireError::Truncated {
556                    need,
557                    have: data.len(),
558                });
559            }
560            let event_payload = cursor[..payload_len].to_vec();
561            cursor.advance(payload_len);
562            events.push(SyncEvent {
563                event_seq,
564                payload: event_payload,
565            });
566        }
567        Ok(Self {
568            channel_id,
569            first_seq,
570            leader_first_retained_seq,
571            request_id,
572            events,
573        })
574    }
575}
576
577// ============================================================================
578// SyncHeartbeat — 0x22, bidirectional
579// ============================================================================
580
581impl SyncHeartbeat {
582    /// Serialize to bytes. Fixed [`SYNC_HEARTBEAT_SIZE`] (52) bytes.
583    pub fn to_bytes(&self) -> Vec<u8> {
584        let mut buf = Vec::with_capacity(SYNC_HEARTBEAT_SIZE);
585        put_header(&mut buf, DISPATCH_SYNC_HEARTBEAT);
586        buf.put_slice(self.channel_id.as_bytes());
587        buf.put_u64_le(self.tail_seq);
588        buf.put_u8(self.role.to_wire());
589        buf.put_u64_le(self.wall_clock_ms);
590        debug_assert_eq!(buf.len(), SYNC_HEARTBEAT_SIZE);
591        buf
592    }
593
594    /// Deserialize from bytes. Errors on truncation, header
595    /// mismatch, or `role` byte outside `0..=3`.
596    pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
597        let payload = check_header(data, DISPATCH_SYNC_HEARTBEAT)?;
598        if payload.len() < SYNC_HEARTBEAT_SIZE - 3 {
599            return Err(WireError::Truncated {
600                need: SYNC_HEARTBEAT_SIZE,
601                have: data.len(),
602            });
603        }
604        let mut cursor = payload;
605        let channel_id = get_channel_id(&mut cursor);
606        let tail_seq = cursor.get_u64_le();
607        let role_byte = cursor.get_u8();
608        let role = ReplicaRole::from_wire(role_byte).ok_or(WireError::BadRole(role_byte))?;
609        let wall_clock_ms = cursor.get_u64_le();
610        Ok(Self {
611            channel_id,
612            tail_seq,
613            role,
614            wall_clock_ms,
615        })
616    }
617}
618
619// ============================================================================
620// SyncNack — 0x23, leader → replica
621// ============================================================================
622
623/// Maximum permitted length of a [`SyncNack::detail`] string on the
624/// wire. The `detail_len` field is u16 LE, so the absolute ceiling
625/// is `u16::MAX`; this constant matches that and lives here so
626/// callers can opt to truncate diagnostic text rather than failing
627/// the encode.
628pub const SYNC_NACK_DETAIL_MAX: usize = u16::MAX as usize;
629
630impl SyncNack {
631    /// Serialize to bytes. Variable size: header + 32 + 8 + 1 + 8 +
632    /// 8 + 2 + detail.len(). Truncates `detail` to
633    /// [`SYNC_NACK_DETAIL_MAX`] if longer — the protocol can't
634    /// represent a longer string and silently truncating the
635    /// diagnostic is preferable to losing the structured error
636    /// code entirely.
637    pub fn to_bytes(&self) -> Vec<u8> {
638        // Truncate `detail` at a UTF-8 char boundary ≤ the wire cap.
639        // A byte-aligned cut can split a multi-byte codepoint,
640        // producing invalid UTF-8 that the decoder's `from_utf8`
641        // check then rejects — losing the structured error code.
642        let detail_str = if self.detail.len() <= SYNC_NACK_DETAIL_MAX {
643            self.detail.as_str()
644        } else {
645            let mut cut = SYNC_NACK_DETAIL_MAX;
646            while cut > 0 && !self.detail.is_char_boundary(cut) {
647                cut -= 1;
648            }
649            &self.detail[..cut]
650        };
651        let detail_bytes = detail_str.as_bytes();
652        let detail_len = detail_bytes.len();
653        let mut buf = Vec::with_capacity(3 + 32 + 8 + 1 + 8 + 8 + 2 + detail_len);
654        put_header(&mut buf, DISPATCH_SYNC_NACK);
655        buf.put_slice(self.channel_id.as_bytes());
656        buf.put_u64_le(self.since_seq);
657        buf.put_u8(self.error_code.to_wire());
658        buf.put_u64_le(self.leader_first_retained_seq);
659        buf.put_u64_le(self.request_id);
660        buf.put_u16_le(detail_len as u16);
661        buf.put_slice(detail_bytes);
662        buf
663    }
664
665    /// Deserialize from bytes. Errors on truncation, header
666    /// mismatch, `error_code` outside `1..=4`, or non-UTF-8 detail.
667    pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
668        let payload = check_header(data, DISPATCH_SYNC_NACK)?;
669        let prefix_needed = 32 + 8 + 1 + 8 + 8 + 2;
670        if payload.len() < prefix_needed {
671            return Err(WireError::Truncated {
672                need: 3 + prefix_needed,
673                have: data.len(),
674            });
675        }
676        let mut cursor = payload;
677        let channel_id = get_channel_id(&mut cursor);
678        let since_seq = cursor.get_u64_le();
679        let code_byte = cursor.get_u8();
680        let error_code =
681            SyncNackError::from_wire(code_byte).ok_or(WireError::BadErrorCode(code_byte))?;
682        let leader_first_retained_seq = cursor.get_u64_le();
683        let request_id = cursor.get_u64_le();
684        let detail_len = cursor.get_u16_le() as usize;
685        if cursor.remaining() < detail_len {
686            // Report total bytes needed correctly: consumed-so-far +
687            // still-needed. The previous formula double-counted the
688            // consumed prefix.
689            let consumed = data.len() - cursor.remaining();
690            return Err(WireError::Truncated {
691                need: consumed + detail_len,
692                have: data.len(),
693            });
694        }
695        let detail_bytes = &cursor[..detail_len];
696        let detail = std::str::from_utf8(detail_bytes)
697            .map_err(|_| WireError::InvalidUtf8)?
698            .to_string();
699        Ok(Self {
700            channel_id,
701            since_seq,
702            error_code,
703            leader_first_retained_seq,
704            request_id,
705            detail,
706        })
707    }
708}
709
710#[cfg(test)]
711mod tests {
712    use super::*;
713
714    fn sample_channel_id() -> ChannelId {
715        ChannelId::from_str_internal("net/redex/example")
716    }
717
718    // ----------------------------------------------------------------
719    // ChannelId
720    // ----------------------------------------------------------------
721
722    #[test]
723    fn channel_id_is_deterministic() {
724        let a = ChannelId::from_str_internal("payments/settlements");
725        let b = ChannelId::from_str_internal("payments/settlements");
726        assert_eq!(a, b);
727    }
728
729    #[test]
730    fn channel_id_is_unique_per_name() {
731        let a = ChannelId::from_str_internal("payments/settlements");
732        let b = ChannelId::from_str_internal("payments/refunds");
733        assert_ne!(a, b);
734    }
735
736    // ----------------------------------------------------------------
737    // SyncRequest round-trip
738    // ----------------------------------------------------------------
739
740    #[test]
741    fn sync_request_round_trip() {
742        let original = SyncRequest {
743            channel_id: sample_channel_id(),
744            since_seq: 0xDEAD_BEEF_CAFE_BABE,
745            chunk_max: 1_048_576,
746            request_id: 0xAA55_AA55_AA55_AA55,
747            class: super::super::bandwidth::BandwidthClass::Background,
748        };
749        let bytes = original.to_bytes();
750        // v0.3 Phase D2: new senders emit the v2 56-byte frame
751        // (trailing class byte). Legacy 55-byte frames decode
752        // cleanly too — covered by
753        // `sync_request_decodes_legacy_55_byte_frame_as_foreground`
754        // below.
755        assert_eq!(bytes.len(), SYNC_REQUEST_SIZE_V2_CLASS);
756        let decoded = SyncRequest::from_bytes(&bytes).expect("decode");
757        assert_eq!(decoded, original);
758    }
759
760    /// v0.3 Phase D2 backward-compat: a legacy 55-byte
761    /// SyncRequest (no trailing class byte) decodes cleanly with
762    /// `class = BandwidthClass::Foreground` defaulted in.
763    #[test]
764    fn sync_request_decodes_legacy_55_byte_frame_as_foreground() {
765        let original = SyncRequest {
766            channel_id: sample_channel_id(),
767            since_seq: 7,
768            chunk_max: 1024,
769            request_id: 0xBEEF,
770            class: super::super::bandwidth::BandwidthClass::Realtime,
771        };
772        // Encode v2 then truncate the trailing class byte to
773        // simulate a legacy 55-byte frame.
774        let mut bytes = original.to_bytes();
775        bytes.pop(); // drop the class byte
776        assert_eq!(bytes.len(), SYNC_REQUEST_SIZE);
777        let decoded = SyncRequest::from_bytes(&bytes).expect("legacy decode");
778        // Everything else round-trips; class defaults to Foreground.
779        assert_eq!(decoded.channel_id, original.channel_id);
780        assert_eq!(decoded.since_seq, original.since_seq);
781        assert_eq!(decoded.chunk_max, original.chunk_max);
782        assert_eq!(decoded.request_id, original.request_id);
783        assert_eq!(
784            decoded.class,
785            super::super::bandwidth::BandwidthClass::Foreground
786        );
787    }
788
789    #[test]
790    fn sync_request_byte_layout_pinned() {
791        // Pin the byte layout exactly. Drift here is a wire-protocol
792        // break — every fielded peer would fail to decode.
793        let req = SyncRequest {
794            channel_id: ChannelId::from_bytes([0xAB; 32]),
795            since_seq: 0x0102_0304_0506_0708,
796            chunk_max: 0x1122_3344,
797            request_id: 0,
798            class: Default::default(),
799        };
800        let bytes = req.to_bytes();
801        // Subprotocol header is u16 LE = 0x0E00 → bytes [0x00, 0x0E];
802        // followed by dispatch_code 0x20.
803        assert_eq!(&bytes[..3], &[0x00, 0x0E, 0x20]);
804        assert_eq!(&bytes[3..35], &[0xAB; 32]);
805        assert_eq!(
806            &bytes[35..43],
807            &[0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01]
808        );
809        assert_eq!(&bytes[43..47], &[0x44, 0x33, 0x22, 0x11]);
810    }
811
812    #[test]
813    fn sync_request_rejects_wrong_dispatch() {
814        let mut bytes = SyncRequest {
815            channel_id: sample_channel_id(),
816            since_seq: 0,
817            chunk_max: 1,
818            request_id: 0,
819            class: Default::default(),
820        }
821        .to_bytes();
822        bytes[2] = DISPATCH_SYNC_RESPONSE; // wrong code
823        let err = SyncRequest::from_bytes(&bytes).expect_err("must reject");
824        assert!(matches!(err, WireError::DispatchMismatch { .. }));
825    }
826
827    #[test]
828    fn sync_request_rejects_wrong_subprotocol() {
829        let mut bytes = SyncRequest {
830            channel_id: sample_channel_id(),
831            since_seq: 0,
832            chunk_max: 1,
833            request_id: 0,
834            class: Default::default(),
835        }
836        .to_bytes();
837        bytes[0] = 0x00;
838        bytes[1] = 0x05; // SUBPROTOCOL_MIGRATION
839        let err = SyncRequest::from_bytes(&bytes).expect_err("must reject");
840        assert!(matches!(
841            err,
842            WireError::SubprotocolMismatch { got: 0x0500 }
843        ));
844    }
845
846    #[test]
847    fn sync_request_rejects_truncation() {
848        let bytes = SyncRequest {
849            channel_id: sample_channel_id(),
850            since_seq: 0,
851            chunk_max: 1,
852            request_id: 0,
853            class: Default::default(),
854        }
855        .to_bytes();
856        // v0.3 Phase D2 backward-compat: cuts at or above the
857        // legacy SYNC_REQUEST_SIZE (55 bytes) are valid frames
858        // — the v0.3 56-byte size includes a trailing class
859        // byte that legacy peers don't emit, so the decoder
860        // accepts both lengths. Only cuts below the legacy
861        // size are true truncations.
862        for cut in 0..SYNC_REQUEST_SIZE {
863            let err = SyncRequest::from_bytes(&bytes[..cut]).expect_err("must reject");
864            assert!(matches!(err, WireError::Truncated { .. }));
865        }
866        // Cut at SYNC_REQUEST_SIZE (legacy form) decodes cleanly.
867        let _legacy =
868            SyncRequest::from_bytes(&bytes[..SYNC_REQUEST_SIZE]).expect("legacy length decodes");
869    }
870
871    // ----------------------------------------------------------------
872    // SyncResponse round-trip
873    // ----------------------------------------------------------------
874
875    #[test]
876    fn sync_response_round_trip_empty_chunk() {
877        let original = SyncResponse {
878            channel_id: sample_channel_id(),
879            first_seq: 42,
880            leader_first_retained_seq: 42,
881            events: vec![],
882            request_id: 7,
883        };
884        let bytes = original.to_bytes();
885        let decoded = SyncResponse::from_bytes(&bytes).expect("decode");
886        assert_eq!(decoded, original);
887    }
888
889    #[test]
890    fn sync_response_round_trip_with_events() {
891        let original = SyncResponse {
892            channel_id: sample_channel_id(),
893            first_seq: 100,
894            leader_first_retained_seq: 50,
895            events: vec![
896                SyncEvent {
897                    event_seq: 100,
898                    payload: b"hello".to_vec(),
899                },
900                SyncEvent {
901                    event_seq: 101,
902                    payload: b"world".to_vec(),
903                },
904                SyncEvent {
905                    event_seq: 102,
906                    payload: vec![], // empty payload — explicitly representable
907                },
908            ],
909            request_id: 123,
910        };
911        let bytes = original.to_bytes();
912        let decoded = SyncResponse::from_bytes(&bytes).expect("decode");
913        assert_eq!(decoded, original);
914    }
915
916    /// R-5 codec pin: the new `leader_first_retained_seq` field
917    /// sits at offset 3 + 32 + 8 = 43 (after subprotocol header,
918    /// channel id, first_seq) and is u64 LE.
919    #[test]
920    fn sync_response_leader_first_retained_seq_byte_offset() {
921        let original = SyncResponse {
922            channel_id: sample_channel_id(),
923            first_seq: 0x0102_0304_0506_0708,
924            leader_first_retained_seq: 0x1112_1314_1516_1718,
925            events: vec![],
926            request_id: 0,
927        };
928        let bytes = original.to_bytes();
929        // Header (3) + channel_id (32) = 35; first_seq (8) at 35..43;
930        // leader_first_retained_seq (8) at 43..51.
931        assert_eq!(
932            &bytes[43..51],
933            &0x1112_1314_1516_1718_u64.to_le_bytes(),
934            "leader_first_retained_seq must be at offset 43..51 in LE form"
935        );
936    }
937
938    #[test]
939    fn sync_response_rejects_truncated_event_record() {
940        // Build a valid bytes buffer, then truncate inside the
941        // last event's payload to make sure the decoder doesn't
942        // panic on a malformed `payload_len`.
943        let bytes = SyncResponse {
944            channel_id: sample_channel_id(),
945            first_seq: 1,
946            leader_first_retained_seq: 0,
947            events: vec![SyncEvent {
948                event_seq: 1,
949                payload: b"truncated".to_vec(),
950            }],
951            request_id: 0,
952        }
953        .to_bytes();
954        // Cut off the last 3 bytes of the payload.
955        let err = SyncResponse::from_bytes(&bytes[..bytes.len() - 3]).expect_err("must reject");
956        assert!(matches!(err, WireError::Truncated { .. }));
957    }
958
959    // ----------------------------------------------------------------
960    // SyncHeartbeat round-trip
961    // ----------------------------------------------------------------
962
963    #[test]
964    fn sync_heartbeat_round_trip_each_role() {
965        for role in [
966            ReplicaRole::Leader,
967            ReplicaRole::Replica,
968            ReplicaRole::Candidate,
969            ReplicaRole::Idle,
970        ] {
971            let original = SyncHeartbeat {
972                channel_id: sample_channel_id(),
973                tail_seq: 0xCAFE,
974                role,
975                wall_clock_ms: 1_700_000_000_000,
976            };
977            let bytes = original.to_bytes();
978            assert_eq!(bytes.len(), SYNC_HEARTBEAT_SIZE);
979            let decoded = SyncHeartbeat::from_bytes(&bytes).expect("decode");
980            assert_eq!(decoded, original);
981        }
982    }
983
984    #[test]
985    fn sync_heartbeat_rejects_unknown_role() {
986        let mut bytes = SyncHeartbeat {
987            channel_id: sample_channel_id(),
988            tail_seq: 0,
989            role: ReplicaRole::Leader,
990            wall_clock_ms: 0,
991        }
992        .to_bytes();
993        // role byte is at offset 3 + 32 + 8 = 43
994        bytes[43] = 99;
995        let err = SyncHeartbeat::from_bytes(&bytes).expect_err("must reject");
996        assert!(matches!(err, WireError::BadRole(99)));
997    }
998
999    // ----------------------------------------------------------------
1000    // SyncNack round-trip
1001    // ----------------------------------------------------------------
1002
1003    #[test]
1004    fn sync_nack_round_trip_each_error() {
1005        for error_code in [
1006            SyncNackError::NotLeader,
1007            SyncNackError::BadRange,
1008            SyncNackError::Backpressure,
1009            SyncNackError::ChannelClosed,
1010        ] {
1011            let original = SyncNack {
1012                channel_id: sample_channel_id(),
1013                since_seq: 12345,
1014                error_code,
1015                leader_first_retained_seq: 9999,
1016                detail: format!("test detail for {:?}", error_code),
1017                request_id: 0,
1018            };
1019            let bytes = original.to_bytes();
1020            let decoded = SyncNack::from_bytes(&bytes).expect("decode");
1021            assert_eq!(decoded, original);
1022        }
1023    }
1024
1025    #[test]
1026    fn sync_nack_empty_detail_round_trips() {
1027        let original = SyncNack {
1028            channel_id: sample_channel_id(),
1029            since_seq: 0,
1030            error_code: SyncNackError::NotLeader,
1031            leader_first_retained_seq: 0,
1032            detail: String::new(),
1033            request_id: 0,
1034        };
1035        let bytes = original.to_bytes();
1036        let decoded = SyncNack::from_bytes(&bytes).expect("decode");
1037        assert_eq!(decoded, original);
1038    }
1039
1040    #[test]
1041    fn sync_nack_truncates_oversized_detail() {
1042        // detail longer than u16::MAX gets silently truncated rather
1043        // than failing the encode — the structured error code is the
1044        // load-bearing part; detail is operator-facing logging.
1045        let huge = "x".repeat(SYNC_NACK_DETAIL_MAX + 1000);
1046        let original = SyncNack {
1047            channel_id: sample_channel_id(),
1048            since_seq: 0,
1049            error_code: SyncNackError::Backpressure,
1050            leader_first_retained_seq: 0,
1051            detail: huge.clone(),
1052            request_id: 0,
1053        };
1054        let bytes = original.to_bytes();
1055        let decoded = SyncNack::from_bytes(&bytes).expect("decode");
1056        assert_eq!(decoded.detail.len(), SYNC_NACK_DETAIL_MAX);
1057        assert!(huge.starts_with(&decoded.detail));
1058    }
1059
1060    #[test]
1061    fn sync_nack_rejects_unknown_error_code() {
1062        let mut bytes = SyncNack {
1063            channel_id: sample_channel_id(),
1064            since_seq: 0,
1065            error_code: SyncNackError::NotLeader,
1066            leader_first_retained_seq: 0,
1067            detail: String::new(),
1068            request_id: 0,
1069        }
1070        .to_bytes();
1071        // error_code byte is at offset 3 + 32 + 8 = 43
1072        bytes[43] = 0;
1073        let err = SyncNack::from_bytes(&bytes).expect_err("must reject");
1074        assert!(matches!(err, WireError::BadErrorCode(0)));
1075    }
1076
1077    #[test]
1078    fn sync_nack_truncated_payload_reports_correct_need() {
1079        // Build a valid encoding then chop bytes off the tail. The
1080        // reported `need` must equal the full encoded length — i.e.
1081        // consumed-so-far + still-needed — not a double-counted value
1082        // that overstates the requirement.
1083        let original = SyncNack {
1084            channel_id: sample_channel_id(),
1085            since_seq: 0x4242_4242_4242_4242,
1086            error_code: SyncNackError::BadRange,
1087            leader_first_retained_seq: 100,
1088            detail: "hello".to_string(),
1089            request_id: 0,
1090        };
1091        let full = original.to_bytes();
1092        // Lop one byte off so the detail body is short. We expect
1093        // `need == full.len()` and `have == truncated.len()`.
1094        let truncated = &full[..full.len() - 1];
1095        match SyncNack::from_bytes(truncated).expect_err("must error") {
1096            WireError::Truncated { need, have } => {
1097                assert_eq!(need, full.len());
1098                assert_eq!(have, truncated.len());
1099            }
1100            other => panic!("unexpected error: {other:?}"),
1101        }
1102    }
1103
1104    #[test]
1105    fn sync_nack_truncates_oversized_detail_at_char_boundary() {
1106        // Build a detail string whose byte cut at SYNC_NACK_DETAIL_MAX
1107        // would split a multi-byte codepoint. The encoder must back
1108        // off to a valid UTF-8 boundary so the decoder accepts the
1109        // frame.
1110        // "é" is 2 bytes (0xC3 0xA9). Pad with ASCII so the offset
1111        // exactly straddles the cut.
1112        let pad_len = SYNC_NACK_DETAIL_MAX - 1;
1113        let mut detail = "a".repeat(pad_len);
1114        detail.push('é'); // 2 bytes; the second byte falls past the cap
1115        debug_assert!(detail.len() > SYNC_NACK_DETAIL_MAX);
1116        let original = SyncNack {
1117            channel_id: sample_channel_id(),
1118            since_seq: 0,
1119            error_code: SyncNackError::Backpressure,
1120            leader_first_retained_seq: 0,
1121            detail,
1122            request_id: 0,
1123        };
1124        let bytes = original.to_bytes();
1125        let decoded = SyncNack::from_bytes(&bytes).expect("decode after truncate");
1126        assert_eq!(decoded.detail.len(), pad_len);
1127    }
1128
1129    #[test]
1130    fn sync_nack_rejects_invalid_utf8() {
1131        let mut bytes = SyncNack {
1132            channel_id: sample_channel_id(),
1133            since_seq: 0,
1134            error_code: SyncNackError::BadRange,
1135            leader_first_retained_seq: 0,
1136            detail: "ascii".to_string(),
1137            request_id: 0,
1138        }
1139        .to_bytes();
1140        // detail starts at offset 3 + 32 + 8 + 1 + 8 + 8 + 2 = 62
1141        // (the extra 8 is the request_id echo);
1142        // replace with an invalid UTF-8 byte sequence of the same
1143        // length.
1144        let detail_start = 62;
1145        let detail_len = bytes.len() - detail_start;
1146        for i in 0..detail_len {
1147            bytes[detail_start + i] = 0xC0; // invalid lead byte
1148        }
1149        let err = SyncNack::from_bytes(&bytes).expect_err("must reject");
1150        assert!(matches!(err, WireError::InvalidUtf8));
1151    }
1152
1153    // ----------------------------------------------------------------
1154    // Dispatch-code reservations — pin the constants so a renumbering
1155    // surface change in a future slice is loud.
1156    // ----------------------------------------------------------------
1157
1158    #[test]
1159    fn dispatch_codes_pinned() {
1160        assert_eq!(DISPATCH_SYNC_REQUEST, 0x20);
1161        assert_eq!(DISPATCH_SYNC_RESPONSE, 0x21);
1162        assert_eq!(DISPATCH_SYNC_HEARTBEAT, 0x22);
1163        assert_eq!(DISPATCH_SYNC_NACK, 0x23);
1164        assert_eq!(DISPATCH_REPLICA_SYNC_RESERVED_END, 0x30);
1165    }
1166
1167    #[test]
1168    fn subprotocol_id_pinned() {
1169        assert_eq!(SUBPROTOCOL_REDEX, 0x0E00);
1170    }
1171}