net/adapter/net/redex/replication.rs
1//! RedEX Distributed wire protocol — `SUBPROTOCOL_REDEX` and the four
2//! `DISPATCH_REPLICA_SYNC` codes that ride on top of the existing
3//! reliable-stream `Mesh::publish` machinery.
4//!
5//! Phase A scaffold of `docs/plans/REDEX_DISTRIBUTED_PLAN.md`. Implements
6//! the byte layouts pinned in §2 of that plan:
7//!
//! - `SyncRequest` (`0x20`, replica → leader) — 55 bytes (v0.2), or 56 bytes with the v0.3 trailing class byte.
9//! - `SyncResponse` (`0x21`, leader → replica) — variable; bounded by
10//! the matching request's `chunk_max`.
11//! - `SyncHeartbeat` (`0x22`, bidirectional) — 52 bytes, fixed.
12//! - `SyncNack` (`0x23`, leader → replica) — variable; carries
13//! an optional UTF-8 diagnostic.
14//!
15//! Encoding conventions (LOCKED, mirroring §2 of the plan):
16//!
17//! - Multi-byte integers are **little-endian, fixed-width** — no varints.
18//! - The standard subprotocol header (`subprotocol_id: u16 LE` +
19//! `dispatch_code: u8`) prefixes every message — 3 bytes.
20//! - `ChannelId` is the 32-byte BLAKE2s hash of the channel name.
21//! - Length-prefixed strings: `(u16 LE len, [len] utf-8 bytes)`.
22//! - Range encoding (used by future reserved variants): `(u64 LE start,
23//! u64 LE end)`, half-open `[start, end)`.
24//!
25//! Election is wire-free — `StandbyGroup` invokes RedEX's deterministic
26//! `elect()` selection function from local state, so no `LEADER_ELECTION`
27//! dispatch code exists. Reserved range `0x24..=0x2F` (12 codes) is held
28//! for future variants (range-bounded sync, parallel-stream sync, etc.).
29//!
30//! Codec layer only — daemon, heartbeat loop, election integration, and
31//! `ReplicationCoordinator` itself land in Phases C / D / E.
32
33use blake2::{
34 digest::{generic_array::typenum::U32, FixedOutput, KeyInit, Mac},
35 Blake2sMac,
36};
37use bytes::{Buf, BufMut};
38
39use super::super::channel::ChannelName;
40
/// Subprotocol identifier for RedEX Distributed replication, registered
/// as `0x0E00` in `SUBPROTOCOLS.md`. The high byte (`0x0E`) is the next
/// free family after capability (`0x0C`) and reflex (`0x0D`).
pub const SUBPROTOCOL_REDEX: u16 = 0x0E00;

/// Dispatch code for [`SyncRequest`] (replica → leader): pull events
/// starting at `since_seq`, bounded by `chunk_max`.
pub const DISPATCH_SYNC_REQUEST: u8 = 0x20;
/// Dispatch code for [`SyncResponse`] (leader → replica): a bounded
/// chunk of events.
pub const DISPATCH_SYNC_RESPONSE: u8 = 0x21;
/// Dispatch code for [`SyncHeartbeat`] (either direction): liveness
/// plus the sender's tail sequence.
pub const DISPATCH_SYNC_HEARTBEAT: u8 = 0x22;
/// Dispatch code for [`SyncNack`] (leader → replica): structured
/// rejection carrying a typed `error_code`.
pub const DISPATCH_SYNC_NACK: u8 = 0x23;

/// Exclusive upper bound of the dispatch-code range set aside for
/// `DISPATCH_REPLICA_SYNC` variants. Codes `0x24..=0x2F` are held for
/// future variants (range-bounded sync, parallel-stream sync, etc.);
/// document each new code in `SUBPROTOCOLS.md` as it lands.
pub const DISPATCH_REPLICA_SYNC_RESERVED_END: u8 = 0x30;
60
/// Encoded size of a legacy (v0.2) [`SyncRequest`], counting the
/// 3-byte subprotocol header: header + channel id + `since_seq` +
/// `chunk_max` + `request_id`. v0.3 Phase D appends one class byte
/// (see [`SYNC_REQUEST_SIZE_V2_CLASS`]). The decoder accepts either
/// size; a frame without the trailing byte decodes as
/// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground).
pub const SYNC_REQUEST_SIZE: usize = 3 + 32 + 8 + 4 + 8; // 55

/// Encoded size of a v0.3 Phase D [`SyncRequest`]: the legacy layout
/// plus one byte carrying the wire-encoded
/// [`BandwidthClass`](super::bandwidth::BandwidthClass). New senders
/// always emit this size; new readers accept both sizes (legacy
/// 55-byte frames degrade to `Foreground`).
pub const SYNC_REQUEST_SIZE_V2_CLASS: usize = SYNC_REQUEST_SIZE + 1; // 56

/// Encoded size of a [`SyncHeartbeat`], counting the 3-byte
/// subprotocol header: header + channel id + `tail_seq` + role byte +
/// `wall_clock_ms`.
pub const SYNC_HEARTBEAT_SIZE: usize = 3 + 32 + 8 + 1 + 8; // 52
78
79/// Domain-separation label for the BLAKE2s hash that turns a channel
80/// name into a 32-byte `ChannelId`. Picked once, frozen — changing it
81/// would invalidate every `ChannelId` on the wire.
82const CHANNEL_ID_LABEL: &[u8] = b"redex-channel-id-v1";
83
84/// 32-byte channel identifier — BLAKE2s of the channel name with a
85/// domain-separation label. Distinct from `ChannelName::hash() -> u16`
86/// (the routing hint), which has routine collisions at mesh scale.
87/// The replication protocol needs an identifier with negligible
88/// collision probability so two channels can't accidentally observe
89/// each other's heartbeats.
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
91pub struct ChannelId([u8; 32]);
92
93impl ChannelId {
94 /// Compute the `ChannelId` for a channel name.
95 pub fn from_name(name: &ChannelName) -> Self {
96 Self::from_str_internal(name.as_str())
97 }
98
99 /// Internal helper so tests can hash literal strings without
100 /// constructing a [`ChannelName`].
101 #[expect(
102 clippy::expect_used,
103 reason = "Blake2sMac::new_from_slice rejects only keys longer than 32 bytes; CHANNEL_ID_LABEL is a short compile-time-constant label"
104 )]
105 fn from_str_internal(s: &str) -> Self {
106 let mut mac = <Blake2sMac<U32> as KeyInit>::new_from_slice(CHANNEL_ID_LABEL)
107 .expect("BLAKE2s accepts variable-length keys");
108 Mac::update(&mut mac, s.as_bytes());
109 let bytes = mac.finalize_fixed();
110 let mut out = [0u8; 32];
111 out.copy_from_slice(&bytes);
112 Self(out)
113 }
114
115 /// Construct from raw bytes — used by the decode path.
116 pub const fn from_bytes(bytes: [u8; 32]) -> Self {
117 Self(bytes)
118 }
119
120 /// Borrow the 32-byte representation.
121 pub fn as_bytes(&self) -> &[u8; 32] {
122 &self.0
123 }
124}
125
/// Wire view of `ReplicaState`, carried as a single byte inside
/// [`SyncHeartbeat`] messages. The four-state model itself is pinned
/// in §3 of the plan; this enum only fixes how those states encode.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplicaRole {
    /// The channel's sole appender.
    Leader = 0,
    /// Catching up, or in steady state lagging by ≤ 1 heartbeat.
    Replica = 1,
    /// Short-lived transient: leader loss was detected and the
    /// deterministic election winner is being computed. Lasts
    /// microseconds; it is not a broadcast wait.
    Candidate = 2,
    /// Holds the channel's storage but has no replica role.
    Idle = 3,
}

impl ReplicaRole {
    /// Decode a role byte. Returns `None` for anything outside `0..=3`.
    fn from_wire(byte: u8) -> Option<Self> {
        // Indexed by discriminant, so position `i` must hold the
        // variant whose wire value is `i`.
        const BY_DISCRIMINANT: [ReplicaRole; 4] = [
            ReplicaRole::Leader,
            ReplicaRole::Replica,
            ReplicaRole::Candidate,
            ReplicaRole::Idle,
        ];
        BY_DISCRIMINANT.get(usize::from(byte)).copied()
    }

    /// Encode the role as its wire byte (the enum discriminant).
    fn to_wire(self) -> u8 {
        match self {
            Self::Leader => 0,
            Self::Replica => 1,
            Self::Candidate => 2,
            Self::Idle => 3,
        }
    }
}
157
/// Typed rejection reason carried in a [`SyncNack`]. Replicas choose
/// their retry policy from the variant and must never treat a NACK as
/// a transport-level failure (the reliable-stream layer reports those
/// separately).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncNackError {
    /// The receiver is not this channel's leader. The replica should
    /// re-resolve leadership via `Mesh::find_chain_holders`.
    NotLeader = 1,
    /// `since_seq` falls outside the leader's retained range. The
    /// replica should trim its local tail and retry from the leader's
    /// first available seq.
    BadRange = 2,
    /// The leader is saturated right now. The replica should back off
    /// exponentially and retry the same request.
    Backpressure = 3,
    /// The channel was closed. The replica withdraws its role and
    /// emits a metric.
    ChannelClosed = 4,
}

impl SyncNackError {
    /// Decode an `error_code` byte. Returns `None` for anything
    /// outside `1..=4`.
    fn from_wire(byte: u8) -> Option<Self> {
        [
            Self::NotLeader,
            Self::BadRange,
            Self::Backpressure,
            Self::ChannelClosed,
        ]
        .into_iter()
        .find(|code| code.to_wire() == byte)
    }

    /// Encode the reason as its wire byte (the enum discriminant).
    fn to_wire(self) -> u8 {
        match self {
            Self::NotLeader => 1,
            Self::BadRange => 2,
            Self::Backpressure => 3,
            Self::ChannelClosed => 4,
        }
    }
}
194
/// A single event record within a [`SyncResponse`] chunk. Within one
/// chunk `event_seq` is strictly increasing and gap-free; gaps are
/// expressed as an explicit skip-ahead in Phase D instead.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SyncEvent {
    /// Monotonic sequence number, assigned by the channel's leader.
    pub event_seq: u64,
    /// The layer-7 payload: event body bytes, opaque to this codec.
    pub payload: Vec<u8>,
}
205
206/// Replica → leader: pull request for events
207/// `[since_seq, since_seq + chunk_max)`. 55 bytes (v0.2 legacy)
208/// or 56 bytes (v0.3 Phase D with trailing class byte).
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct SyncRequest {
211 /// 32-byte BLAKE2s hash of the channel name.
212 pub channel_id: ChannelId,
213 /// First sequence number the replica wants from the leader's
214 /// retained range. Inclusive.
215 pub since_seq: u64,
216 /// Maximum payload bytes the leader may send in the matching
217 /// [`SyncResponse`].
218 pub chunk_max: u32,
219 /// Replica-minted correlation token. The leader echoes it
220 /// verbatim on the matching `SyncResponse` / `SyncNack`. The
221 /// replica's `OutstandingRequests` set holds the minted token
222 /// until the response lands; responses or NACKs whose token
223 /// is not in the set are dropped. Random 64-bit value drawn
224 /// from `getrandom` per request so a stale NACK from a prior
225 /// epoch (or any peer that observed wire traffic) cannot match
226 /// an in-flight request the replica is currently waiting on.
227 pub request_id: u64,
228 /// v0.3 Phase D per-request bandwidth-class hint. Receiver-
229 /// stamped; the leader's admission gate honors this in
230 /// preference to the channel's
231 /// [`ReplicationConfig::default_bandwidth_class`](super::replication_config::ReplicationConfig).
232 /// Encoded as a 1-byte trailing field on the wire; legacy
233 /// 55-byte frames omit it and decode as
234 /// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground).
235 pub class: super::bandwidth::BandwidthClass,
236}
237
238/// Leader → replica: bounded chunk of events answering the matching
239/// [`SyncRequest`]. Variable size; bounded by `chunk_max` from the
240/// request side.
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct SyncResponse {
243 /// 32-byte BLAKE2s hash of the channel name.
244 pub channel_id: ChannelId,
245 /// Sequence number of `events[0]` in this chunk. Replicas use
246 /// this to detect server-side trimming (`first_seq` greater than
247 /// the request's `since_seq` means the leader no longer retains
248 /// the requested range).
249 pub first_seq: u64,
250 /// **R-5 disambiguation:** leader's first retained seq at the
251 /// time of this response. Lets the replica tell a legitimate
252 /// retention trim (`first_seq == leader_first_retained_seq`) from
253 /// a divergent-log split-brain (`first_seq >
254 /// leader_first_retained_seq` AND replica's local tail had data
255 /// in `[leader_first_retained_seq, first_seq)`). The replica
256 /// still does the skip-ahead in both cases (safety) but
257 /// observability-wise the divergence case is flagged with a
258 /// distinct metric for operator review.
259 pub leader_first_retained_seq: u64,
260 /// Echo of the matching `SyncRequest::request_id`. Replicas
261 /// drop responses whose token is not in their in-flight set
262 /// (per-leader pending state). Without this, a stale response
263 /// from a prior request the replica had already timed out and
264 /// re-issued would land on the current request's apply path.
265 pub request_id: u64,
266 /// In-order event records. `event_seq` increases monotonically
267 /// across the slice; no gaps within a chunk.
268 pub events: Vec<SyncEvent>,
269}
270
271/// Bidirectional liveness heartbeat. Leader emits these to all
272/// replicas at `heartbeat_ms` cadence; replicas emit their own
273/// `tail_seq` back to the leader so the leader can observe lag.
274#[derive(Debug, Clone, PartialEq, Eq)]
275pub struct SyncHeartbeat {
276 /// 32-byte BLAKE2s hash of the channel name.
277 pub channel_id: ChannelId,
278 /// Sender's current tail sequence number.
279 pub tail_seq: u64,
280 /// Sender's `ReplicaState` — operator-facing observability only;
281 /// receivers don't make routing decisions on this field (those
282 /// route through the capability layer's `causal:` tags).
283 pub role: ReplicaRole,
284 /// Sender's monotonic-clock milliseconds. Used **only** for drift
285 /// detection (operator-facing); never consumed for ordering or
286 /// liveness logic — those route through `tail_seq` + reliable-
287 /// stream ack accounting.
288 pub wall_clock_ms: u64,
289}
290
291/// Leader → replica: structured rejection. The leader MUST emit this
292/// (rather than silently closing the stream) on every rejection
293/// reason that isn't a transport-level failure — silent close is
294/// reserved for the latter.
295#[derive(Debug, Clone, PartialEq, Eq)]
296pub struct SyncNack {
297 /// 32-byte BLAKE2s hash of the channel name.
298 pub channel_id: ChannelId,
299 /// Echoes the rejected request's `since_seq` so the replica can
300 /// correlate the NACK with the in-flight request that triggered
301 /// it.
302 pub since_seq: u64,
303 /// Typed rejection reason. Replicas key their retry policy here.
304 pub error_code: SyncNackError,
305 /// Leader's first-retained sequence at the time of the reject.
306 /// Populated by the leader on `BadRange` so the replica can
307 /// `skip_to(leader_first_retained_seq)` in one round trip
308 /// instead of advancing by one per `BadRange` cycle. Set to
309 /// `0` on other error codes; receivers ignore it outside the
310 /// `BadRange` arm.
311 pub leader_first_retained_seq: u64,
312 /// Echo of the matching `SyncRequest::request_id`. Replicas
313 /// drop NACKs whose token is not in their in-flight set.
314 /// Without this, a stale `BadRange` NACK from a prior request
315 /// the replica had timed out and re-issued would still drive
316 /// `skip_to` on the local file — destructive on retry.
317 pub request_id: u64,
318 /// Optional human-readable diagnostic. UTF-8 encoded; may be
319 /// empty. The replica's retry policy keys off `error_code` only —
320 /// `detail` is for operator logs.
321 pub detail: String,
322}
323
324/// Errors surfacing from the decode path. Mirrors the typed-error
325/// shape the rest of the substrate uses for fallible decoders.
326#[derive(Debug, thiserror::Error, PartialEq, Eq)]
327pub enum WireError {
328 /// Buffer is shorter than the encoded message demands.
329 #[error("redex wire truncated: need {need} bytes, have {have}")]
330 Truncated {
331 /// Minimum bytes the decoder needed to make progress.
332 need: usize,
333 /// Bytes actually available in the input.
334 have: usize,
335 },
336 /// Subprotocol header doesn't match `SUBPROTOCOL_REDEX`.
337 #[error("redex wire subprotocol mismatch: got {got:#06x}, expected {SUBPROTOCOL_REDEX:#06x}")]
338 SubprotocolMismatch {
339 /// Subprotocol id observed in the header.
340 got: u16,
341 },
342 /// Dispatch code doesn't match the decoder being invoked, or
343 /// falls outside the reserved `0x20..=0x2F` range entirely.
344 #[error("redex wire dispatch code {got:#04x} does not match expected {expected:#04x}")]
345 DispatchMismatch {
346 /// Dispatch byte observed in the header.
347 got: u8,
348 /// Dispatch byte the decoder being invoked is keyed on.
349 expected: u8,
350 },
351 /// `role` byte in a [`SyncHeartbeat`] is outside the `0..=3`
352 /// range the four-state model pins.
353 #[error("redex wire role byte {0} is not a valid ReplicaRole (0..=3)")]
354 BadRole(u8),
355 /// `error_code` byte in a [`SyncNack`] is outside the `1..=4`
356 /// range the typed-error variants pin.
357 #[error("redex wire error_code {0} is not a valid SyncNackError (1..=4)")]
358 BadErrorCode(u8),
359 /// `detail` bytes in a [`SyncNack`] are not valid UTF-8.
360 #[error("redex wire NACK detail is not valid UTF-8")]
361 InvalidUtf8,
362}
363
364/// Write the standard 3-byte subprotocol header
365/// (`SUBPROTOCOL_REDEX` + `dispatch_code`) to `buf`.
366fn put_header(buf: &mut Vec<u8>, dispatch: u8) {
367 buf.put_u16_le(SUBPROTOCOL_REDEX);
368 buf.put_u8(dispatch);
369}
370
371/// Validate the standard 3-byte subprotocol header on `data` and
372/// return the remaining payload slice. Errors on truncation,
373/// subprotocol mismatch, or dispatch-code mismatch.
374fn check_header(data: &[u8], expected_dispatch: u8) -> Result<&[u8], WireError> {
375 if data.len() < 3 {
376 return Err(WireError::Truncated {
377 need: 3,
378 have: data.len(),
379 });
380 }
381 let mut cursor = &data[..3];
382 let subprotocol = cursor.get_u16_le();
383 let dispatch = cursor.get_u8();
384 if subprotocol != SUBPROTOCOL_REDEX {
385 return Err(WireError::SubprotocolMismatch { got: subprotocol });
386 }
387 if dispatch != expected_dispatch {
388 return Err(WireError::DispatchMismatch {
389 got: dispatch,
390 expected: expected_dispatch,
391 });
392 }
393 Ok(&data[3..])
394}
395
396/// Read a `ChannelId` from `cursor`. Caller is responsible for
397/// ensuring `cursor.remaining() >= 32`.
398fn get_channel_id(cursor: &mut &[u8]) -> ChannelId {
399 let mut id = [0u8; 32];
400 id.copy_from_slice(&cursor[..32]);
401 cursor.advance(32);
402 ChannelId::from_bytes(id)
403}
404
405// ============================================================================
406// SyncRequest — 0x20, replica → leader
407// ============================================================================
408
409impl SyncRequest {
410 /// Serialize to bytes. v0.3 Phase D senders always emit
411 /// [`SYNC_REQUEST_SIZE_V2_CLASS`] (56 bytes) — the trailing
412 /// class byte is the only delta from the v0.2 layout.
413 pub fn to_bytes(&self) -> Vec<u8> {
414 let mut buf = Vec::with_capacity(SYNC_REQUEST_SIZE_V2_CLASS);
415 put_header(&mut buf, DISPATCH_SYNC_REQUEST);
416 buf.put_slice(self.channel_id.as_bytes());
417 buf.put_u64_le(self.since_seq);
418 buf.put_u32_le(self.chunk_max);
419 buf.put_u64_le(self.request_id);
420 buf.put_u8(self.class.as_u8());
421 debug_assert_eq!(buf.len(), SYNC_REQUEST_SIZE_V2_CLASS);
422 buf
423 }
424
425 /// Deserialize from bytes. Accepts both v0.2 (55 bytes) and
426 /// v0.3 Phase D (56 bytes) frames — a v0.2 frame missing
427 /// the trailing class byte decodes as
428 /// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground)
429 /// for backward compat. An unknown class discriminant value
430 /// (forward-compat) also decodes as `Foreground` — see
431 /// [`BandwidthClass::from_wire_or_default`](super::bandwidth::BandwidthClass::from_wire_or_default).
432 pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
433 let payload = check_header(data, DISPATCH_SYNC_REQUEST)?;
434 if payload.len() < SYNC_REQUEST_SIZE - 3 {
435 return Err(WireError::Truncated {
436 need: SYNC_REQUEST_SIZE,
437 have: data.len(),
438 });
439 }
440 let mut cursor = payload;
441 let channel_id = get_channel_id(&mut cursor);
442 let since_seq = cursor.get_u64_le();
443 let chunk_max = cursor.get_u32_le();
444 let request_id = cursor.get_u64_le();
445 // Trailing class byte (v0.3+ Phase D). Absent on legacy
446 // 55-byte frames; defaults to Foreground.
447 let class = if cursor.has_remaining() {
448 super::bandwidth::BandwidthClass::from_wire_or_default(cursor.get_u8())
449 } else {
450 super::bandwidth::BandwidthClass::default()
451 };
452 Ok(Self {
453 channel_id,
454 since_seq,
455 chunk_max,
456 request_id,
457 class,
458 })
459 }
460}
461
462// ============================================================================
463// SyncResponse — 0x21, leader → replica
464// ============================================================================
465
466impl SyncResponse {
467 /// Serialize to bytes. Variable size: header + 32 + 8 + 8 + 8 +
468 /// 4 + Σ(8 + 4 + payload.len()) over events.
469 /// (R-23: added 8 bytes for `request_id` echo.)
470 pub fn to_bytes(&self) -> Vec<u8> {
471 // Cap the pre-allocation against `u32::MAX` — `event_count`
472 // is the wire-format width, so we can't honestly encode
473 // more than `u32::MAX` events anyway, and on 32-bit hosts
474 // the multiplication below would overflow `usize` otherwise.
475 let events_size: usize = self.events.iter().map(|e| 8 + 4 + e.payload.len()).sum();
476 let mut buf = Vec::with_capacity(3 + 32 + 8 + 8 + 8 + 4 + events_size);
477 put_header(&mut buf, DISPATCH_SYNC_RESPONSE);
478 buf.put_slice(self.channel_id.as_bytes());
479 buf.put_u64_le(self.first_seq);
480 buf.put_u64_le(self.leader_first_retained_seq);
481 buf.put_u64_le(self.request_id);
482 // `events.len()` wider than u32::MAX is impossible to
483 // represent on the wire — clamp via saturating cast. In
484 // practice callers honor `chunk_max` (bounded u32) so the
485 // saturation is dead code, but stay safe.
486 debug_assert!(
487 self.events.len() <= u32::MAX as usize,
488 "events.len() {} exceeds u32::MAX",
489 self.events.len()
490 );
491 let event_count = u32::try_from(self.events.len()).unwrap_or(u32::MAX);
492 buf.put_u32_le(event_count);
493 for event in &self.events {
494 buf.put_u64_le(event.event_seq);
495 debug_assert!(event.payload.len() <= u32::MAX as usize);
496 let payload_len = u32::try_from(event.payload.len()).unwrap_or(u32::MAX);
497 buf.put_u32_le(payload_len);
498 buf.put_slice(&event.payload);
499 }
500 buf
501 }
502
503 /// Deserialize from bytes. Errors on truncation or header
504 /// mismatch. Validates each event-record's length prefix
505 /// against the remaining buffer so a malformed `payload_len`
506 /// can't trigger a panic.
507 pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
508 let payload = check_header(data, DISPATCH_SYNC_RESPONSE)?;
509 let prefix_needed = 32 + 8 + 8 + 8 + 4;
510 if payload.len() < prefix_needed {
511 return Err(WireError::Truncated {
512 need: 3 + prefix_needed,
513 have: data.len(),
514 });
515 }
516 let mut cursor = payload;
517 let channel_id = get_channel_id(&mut cursor);
518 let first_seq = cursor.get_u64_le();
519 let leader_first_retained_seq = cursor.get_u64_le();
520 let request_id = cursor.get_u64_le();
521 let event_count = cursor.get_u32_le() as usize;
522 // R-36: cap the pre-allocation at 4096 events to bound a
523 // hostile `event_count` (e.g. peer sending a maximum-u32
524 // count without the matching payload bytes). Legitimate
525 // chunks above 4096 events incur progressive grow-and-
526 // copy, but the byte budget (`chunk_max` ≤ 64 MiB)
527 // means an over-4096-event chunk averages payload <
528 // 16 KiB / event, which is comfortably small.
529 let mut events = Vec::with_capacity(event_count.min(4096));
530 for _ in 0..event_count {
531 if cursor.remaining() < 8 + 4 {
532 // Report total bytes needed correctly —
533 // consumed-so-far + still-needed.
534 let consumed = data.len() - cursor.remaining();
535 return Err(WireError::Truncated {
536 need: consumed + (8 + 4),
537 have: data.len(),
538 });
539 }
540 let event_seq = cursor.get_u64_le();
541 let payload_len = cursor.get_u32_le() as usize;
542 if cursor.remaining() < payload_len {
543 let consumed = data.len() - cursor.remaining();
544 // checked_add — on 32-bit targets a u32 payload_len
545 // close to u32::MAX would overflow the plain `+`
546 // expression (consumed ≥ payload_len lower-bound
547 // doesn't shrink the danger window). Same fix shape
548 // as the umbrella's L-9 for BufferedEvents.
549 let need = consumed
550 .checked_add(payload_len)
551 .ok_or(WireError::Truncated {
552 need: usize::MAX,
553 have: data.len(),
554 })?;
555 return Err(WireError::Truncated {
556 need,
557 have: data.len(),
558 });
559 }
560 let event_payload = cursor[..payload_len].to_vec();
561 cursor.advance(payload_len);
562 events.push(SyncEvent {
563 event_seq,
564 payload: event_payload,
565 });
566 }
567 Ok(Self {
568 channel_id,
569 first_seq,
570 leader_first_retained_seq,
571 request_id,
572 events,
573 })
574 }
575}
576
577// ============================================================================
578// SyncHeartbeat — 0x22, bidirectional
579// ============================================================================
580
581impl SyncHeartbeat {
582 /// Serialize to bytes. Fixed [`SYNC_HEARTBEAT_SIZE`] (52) bytes.
583 pub fn to_bytes(&self) -> Vec<u8> {
584 let mut buf = Vec::with_capacity(SYNC_HEARTBEAT_SIZE);
585 put_header(&mut buf, DISPATCH_SYNC_HEARTBEAT);
586 buf.put_slice(self.channel_id.as_bytes());
587 buf.put_u64_le(self.tail_seq);
588 buf.put_u8(self.role.to_wire());
589 buf.put_u64_le(self.wall_clock_ms);
590 debug_assert_eq!(buf.len(), SYNC_HEARTBEAT_SIZE);
591 buf
592 }
593
594 /// Deserialize from bytes. Errors on truncation, header
595 /// mismatch, or `role` byte outside `0..=3`.
596 pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
597 let payload = check_header(data, DISPATCH_SYNC_HEARTBEAT)?;
598 if payload.len() < SYNC_HEARTBEAT_SIZE - 3 {
599 return Err(WireError::Truncated {
600 need: SYNC_HEARTBEAT_SIZE,
601 have: data.len(),
602 });
603 }
604 let mut cursor = payload;
605 let channel_id = get_channel_id(&mut cursor);
606 let tail_seq = cursor.get_u64_le();
607 let role_byte = cursor.get_u8();
608 let role = ReplicaRole::from_wire(role_byte).ok_or(WireError::BadRole(role_byte))?;
609 let wall_clock_ms = cursor.get_u64_le();
610 Ok(Self {
611 channel_id,
612 tail_seq,
613 role,
614 wall_clock_ms,
615 })
616 }
617}
618
619// ============================================================================
620// SyncNack — 0x23, leader → replica
621// ============================================================================
622
/// Longest [`SyncNack::detail`] the wire can carry, in bytes. The
/// `detail_len` prefix is a little-endian `u16`, which makes
/// `u16::MAX` the hard ceiling; the constant is exposed so callers
/// can trim diagnostic text themselves instead of failing the encode.
pub const SYNC_NACK_DETAIL_MAX: usize = u16::MAX as usize;
629
630impl SyncNack {
631 /// Serialize to bytes. Variable size: header + 32 + 8 + 1 + 8 +
632 /// 8 + 2 + detail.len(). Truncates `detail` to
633 /// [`SYNC_NACK_DETAIL_MAX`] if longer — the protocol can't
634 /// represent a longer string and silently truncating the
635 /// diagnostic is preferable to losing the structured error
636 /// code entirely.
637 pub fn to_bytes(&self) -> Vec<u8> {
638 // Truncate `detail` at a UTF-8 char boundary ≤ the wire cap.
639 // A byte-aligned cut can split a multi-byte codepoint,
640 // producing invalid UTF-8 that the decoder's `from_utf8`
641 // check then rejects — losing the structured error code.
642 let detail_str = if self.detail.len() <= SYNC_NACK_DETAIL_MAX {
643 self.detail.as_str()
644 } else {
645 let mut cut = SYNC_NACK_DETAIL_MAX;
646 while cut > 0 && !self.detail.is_char_boundary(cut) {
647 cut -= 1;
648 }
649 &self.detail[..cut]
650 };
651 let detail_bytes = detail_str.as_bytes();
652 let detail_len = detail_bytes.len();
653 let mut buf = Vec::with_capacity(3 + 32 + 8 + 1 + 8 + 8 + 2 + detail_len);
654 put_header(&mut buf, DISPATCH_SYNC_NACK);
655 buf.put_slice(self.channel_id.as_bytes());
656 buf.put_u64_le(self.since_seq);
657 buf.put_u8(self.error_code.to_wire());
658 buf.put_u64_le(self.leader_first_retained_seq);
659 buf.put_u64_le(self.request_id);
660 buf.put_u16_le(detail_len as u16);
661 buf.put_slice(detail_bytes);
662 buf
663 }
664
665 /// Deserialize from bytes. Errors on truncation, header
666 /// mismatch, `error_code` outside `1..=4`, or non-UTF-8 detail.
667 pub fn from_bytes(data: &[u8]) -> Result<Self, WireError> {
668 let payload = check_header(data, DISPATCH_SYNC_NACK)?;
669 let prefix_needed = 32 + 8 + 1 + 8 + 8 + 2;
670 if payload.len() < prefix_needed {
671 return Err(WireError::Truncated {
672 need: 3 + prefix_needed,
673 have: data.len(),
674 });
675 }
676 let mut cursor = payload;
677 let channel_id = get_channel_id(&mut cursor);
678 let since_seq = cursor.get_u64_le();
679 let code_byte = cursor.get_u8();
680 let error_code =
681 SyncNackError::from_wire(code_byte).ok_or(WireError::BadErrorCode(code_byte))?;
682 let leader_first_retained_seq = cursor.get_u64_le();
683 let request_id = cursor.get_u64_le();
684 let detail_len = cursor.get_u16_le() as usize;
685 if cursor.remaining() < detail_len {
686 // Report total bytes needed correctly: consumed-so-far +
687 // still-needed. The previous formula double-counted the
688 // consumed prefix.
689 let consumed = data.len() - cursor.remaining();
690 return Err(WireError::Truncated {
691 need: consumed + detail_len,
692 have: data.len(),
693 });
694 }
695 let detail_bytes = &cursor[..detail_len];
696 let detail = std::str::from_utf8(detail_bytes)
697 .map_err(|_| WireError::InvalidUtf8)?
698 .to_string();
699 Ok(Self {
700 channel_id,
701 since_seq,
702 error_code,
703 leader_first_retained_seq,
704 request_id,
705 detail,
706 })
707 }
708}
709
710#[cfg(test)]
711mod tests {
712 use super::*;
713
714 fn sample_channel_id() -> ChannelId {
715 ChannelId::from_str_internal("net/redex/example")
716 }
717
718 // ----------------------------------------------------------------
719 // ChannelId
720 // ----------------------------------------------------------------
721
722 #[test]
723 fn channel_id_is_deterministic() {
724 let a = ChannelId::from_str_internal("payments/settlements");
725 let b = ChannelId::from_str_internal("payments/settlements");
726 assert_eq!(a, b);
727 }
728
729 #[test]
730 fn channel_id_is_unique_per_name() {
731 let a = ChannelId::from_str_internal("payments/settlements");
732 let b = ChannelId::from_str_internal("payments/refunds");
733 assert_ne!(a, b);
734 }
735
736 // ----------------------------------------------------------------
737 // SyncRequest round-trip
738 // ----------------------------------------------------------------
739
740 #[test]
741 fn sync_request_round_trip() {
742 let original = SyncRequest {
743 channel_id: sample_channel_id(),
744 since_seq: 0xDEAD_BEEF_CAFE_BABE,
745 chunk_max: 1_048_576,
746 request_id: 0xAA55_AA55_AA55_AA55,
747 class: super::super::bandwidth::BandwidthClass::Background,
748 };
749 let bytes = original.to_bytes();
750 // v0.3 Phase D2: new senders emit the v2 56-byte frame
751 // (trailing class byte). Legacy 55-byte frames decode
752 // cleanly too — covered by
753 // `sync_request_decodes_legacy_55_byte_frame_as_foreground`
754 // below.
755 assert_eq!(bytes.len(), SYNC_REQUEST_SIZE_V2_CLASS);
756 let decoded = SyncRequest::from_bytes(&bytes).expect("decode");
757 assert_eq!(decoded, original);
758 }
759
760 /// v0.3 Phase D2 backward-compat: a legacy 55-byte
761 /// SyncRequest (no trailing class byte) decodes cleanly with
762 /// `class = BandwidthClass::Foreground` defaulted in.
763 #[test]
764 fn sync_request_decodes_legacy_55_byte_frame_as_foreground() {
765 let original = SyncRequest {
766 channel_id: sample_channel_id(),
767 since_seq: 7,
768 chunk_max: 1024,
769 request_id: 0xBEEF,
770 class: super::super::bandwidth::BandwidthClass::Realtime,
771 };
772 // Encode v2 then truncate the trailing class byte to
773 // simulate a legacy 55-byte frame.
774 let mut bytes = original.to_bytes();
775 bytes.pop(); // drop the class byte
776 assert_eq!(bytes.len(), SYNC_REQUEST_SIZE);
777 let decoded = SyncRequest::from_bytes(&bytes).expect("legacy decode");
778 // Everything else round-trips; class defaults to Foreground.
779 assert_eq!(decoded.channel_id, original.channel_id);
780 assert_eq!(decoded.since_seq, original.since_seq);
781 assert_eq!(decoded.chunk_max, original.chunk_max);
782 assert_eq!(decoded.request_id, original.request_id);
783 assert_eq!(
784 decoded.class,
785 super::super::bandwidth::BandwidthClass::Foreground
786 );
787 }
788
/// Pins the `SyncRequest` wire layout byte-for-byte.
///
/// Drift here is a wire-protocol break — every fielded peer would
/// fail to decode — so each fixed-offset field is compared against
/// hand-written little-endian bytes and the total frame length is
/// compared against `SYNC_REQUEST_SIZE_V2_CLASS`.
#[test]
fn sync_request_byte_layout_pinned() {
    let req = SyncRequest {
        channel_id: ChannelId::from_bytes([0xAB; 32]),
        since_seq: 0x0102_0304_0506_0708,
        chunk_max: 0x1122_3344,
        // Non-zero with eight distinct bytes, so a misplaced or
        // byte-swapped `request_id` cannot pass by accident (an
        // all-zero value would match any zero-filled region).
        request_id: 0x5152_5354_5556_5758,
        class: Default::default(),
    };
    let bytes = req.to_bytes();

    // New senders emit the frame that ends with the class byte.
    assert_eq!(bytes.len(), SYNC_REQUEST_SIZE_V2_CLASS);

    // Subprotocol header is u16 LE = 0x0E00 → bytes [0x00, 0x0E];
    // followed by dispatch_code 0x20.
    assert_eq!(&bytes[..3], &[0x00, 0x0E, 0x20]);
    // channel_id: 32 raw bytes.
    assert_eq!(&bytes[3..35], &[0xAB; 32]);
    // since_seq: u64 LE.
    assert_eq!(
        &bytes[35..43],
        &[0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01]
    );
    // chunk_max: u32 LE.
    assert_eq!(&bytes[43..47], &[0x44, 0x33, 0x22, 0x11]);
    // request_id: the eight bytes between chunk_max and the trailing
    // class byte, little-endian like every other integer field.
    assert_eq!(
        &bytes[47..55],
        &[0x58, 0x57, 0x56, 0x55, 0x54, 0x53, 0x52, 0x51]
    );
    // The class byte's encoded value is intentionally not pinned
    // here; only its presence is, via the length check above.
}
811
/// A well-formed `SyncRequest` frame whose dispatch byte has been
/// overwritten with a different code must be refused with
/// `WireError::DispatchMismatch`.
#[test]
fn sync_request_rejects_wrong_dispatch() {
    let request = SyncRequest {
        channel_id: sample_channel_id(),
        since_seq: 0,
        chunk_max: 1,
        request_id: 0,
        class: Default::default(),
    };
    let mut frame = request.to_bytes();

    // Byte 2 comes right after the two-byte subprotocol id and holds
    // the dispatch code; swap in the response code.
    frame[2] = DISPATCH_SYNC_RESPONSE;

    let outcome = SyncRequest::from_bytes(&frame).expect_err("must reject");
    assert!(matches!(outcome, WireError::DispatchMismatch { .. }));
}
826
/// A frame whose leading subprotocol id is not the RedEX one must be
/// refused with `WireError::SubprotocolMismatch`, reporting the id
/// that was actually read.
#[test]
fn sync_request_rejects_wrong_subprotocol() {
    let request = SyncRequest {
        channel_id: sample_channel_id(),
        since_seq: 0,
        chunk_max: 1,
        request_id: 0,
        class: Default::default(),
    };
    let mut frame = request.to_bytes();

    // Overwrite the little-endian subprotocol id with 0x0500
    // (SUBPROTOCOL_MIGRATION), i.e. bytes [0x00, 0x05].
    frame[..2].copy_from_slice(&0x0500_u16.to_le_bytes());

    let outcome = SyncRequest::from_bytes(&frame).expect_err("must reject");
    assert!(matches!(
        outcome,
        WireError::SubprotocolMismatch { got: 0x0500 }
    ));
}
845
/// Every strict prefix of a `SyncRequest` frame that is shorter than
/// the legacy size must be reported as `WireError::Truncated`, while
/// a prefix of exactly the legacy size is a valid frame.
#[test]
fn sync_request_rejects_truncation() {
    let frame = SyncRequest {
        channel_id: sample_channel_id(),
        since_seq: 0,
        chunk_max: 1,
        request_id: 0,
        class: Default::default(),
    }
    .to_bytes();

    // v0.3 Phase D2 backward-compat: the decoder accepts two
    // lengths — the legacy SYNC_REQUEST_SIZE (55 bytes) and the
    // v0.3 56-byte form that appends a class byte legacy peers do
    // not emit. A cut is therefore only a genuine truncation when
    // it falls below the legacy size.
    for prefix_len in 0..SYNC_REQUEST_SIZE {
        let prefix = &frame[..prefix_len];
        let outcome = SyncRequest::from_bytes(prefix).expect_err("must reject");
        assert!(matches!(outcome, WireError::Truncated { .. }));
    }

    // A cut exactly at SYNC_REQUEST_SIZE is the legacy form and
    // decodes without error.
    let legacy_prefix = &frame[..SYNC_REQUEST_SIZE];
    let _ = SyncRequest::from_bytes(legacy_prefix).expect("legacy length decodes");
}
870
871 // ----------------------------------------------------------------
872 // SyncResponse round-trip
873 // ----------------------------------------------------------------
874
/// Encodes and decodes a `SyncResponse` that carries no events and
/// checks the decoded value equals the one that was encoded.
#[test]
fn sync_response_round_trip_empty_chunk() {
    let sent = SyncResponse {
        channel_id: sample_channel_id(),
        first_seq: 42,
        leader_first_retained_seq: 42,
        events: Vec::new(),
        request_id: 7,
    };

    let wire = sent.to_bytes();
    let received = SyncResponse::from_bytes(&wire).expect("decode");

    assert_eq!(received, sent);
}
888
/// Encodes and decodes a `SyncResponse` carrying three events —
/// including one with a zero-length payload — and checks the decoded
/// value equals the one that was encoded.
#[test]
fn sync_response_round_trip_with_events() {
    // (sequence number, payload text); the last entry has an empty
    // payload, which must be explicitly representable on the wire.
    let events = [(100, "hello"), (101, "world"), (102, "")]
        .iter()
        .map(|&(event_seq, text)| SyncEvent {
            event_seq,
            payload: text.as_bytes().to_vec(),
        })
        .collect();

    let sent = SyncResponse {
        channel_id: sample_channel_id(),
        first_seq: 100,
        leader_first_retained_seq: 50,
        events,
        request_id: 123,
    };

    let wire = sent.to_bytes();
    let received = SyncResponse::from_bytes(&wire).expect("decode");

    assert_eq!(received, sent);
}
915
/// R-5 codec pin: `leader_first_retained_seq` is encoded as a u64 in
/// little-endian form, directly after the subprotocol header
/// (3 bytes), the channel id (32 bytes) and `first_seq` (8 bytes) —
/// that is, at byte offset 43.
#[test]
fn sync_response_leader_first_retained_seq_byte_offset() {
    let response = SyncResponse {
        channel_id: sample_channel_id(),
        first_seq: 0x0102_0304_0506_0708,
        leader_first_retained_seq: 0x1112_1314_1516_1718,
        events: vec![],
        request_id: 0,
    };
    let wire = response.to_bytes();

    // 0..3 header, 3..35 channel_id, 35..43 first_seq, so the field
    // under test occupies 43..51.
    let expected = 0x1112_1314_1516_1718_u64.to_le_bytes();
    let field = &wire[43..51];
    assert_eq!(
        field, &expected,
        "leader_first_retained_seq must be at offset 43..51 in LE form"
    );
}
937
/// A `SyncResponse` frame that ends early must be reported as
/// `WireError::Truncated` rather than causing a panic in the decoder.
#[test]
fn sync_response_rejects_truncated_event_record() {
    // Start from a valid single-event encoding.
    let response = SyncResponse {
        channel_id: sample_channel_id(),
        first_seq: 1,
        leader_first_retained_seq: 0,
        events: vec![SyncEvent {
            event_seq: 1,
            payload: b"truncated".to_vec(),
        }],
        request_id: 0,
    };
    let wire = response.to_bytes();

    // Drop the final three bytes of the frame. The intent is to cut
    // into the event payload, so the encoded `payload_len` promises
    // more data than actually remains.
    let short = &wire[..wire.len() - 3];

    let outcome = SyncResponse::from_bytes(short).expect_err("must reject");
    assert!(matches!(outcome, WireError::Truncated { .. }));
}
958
959 // ----------------------------------------------------------------
960 // SyncHeartbeat round-trip
961 // ----------------------------------------------------------------
962
/// Round-trips a `SyncHeartbeat` once per `ReplicaRole` listed
/// below, checking both the fixed encoded size and value equality
/// after decoding.
#[test]
fn sync_heartbeat_round_trip_each_role() {
    let roles = [
        ReplicaRole::Leader,
        ReplicaRole::Replica,
        ReplicaRole::Candidate,
        ReplicaRole::Idle,
    ];

    for role in roles {
        let sent = SyncHeartbeat {
            channel_id: sample_channel_id(),
            tail_seq: 0xCAFE,
            role,
            wall_clock_ms: 1_700_000_000_000,
        };

        let wire = sent.to_bytes();
        // Heartbeats are fixed-size regardless of role.
        assert_eq!(wire.len(), SYNC_HEARTBEAT_SIZE);

        let received = SyncHeartbeat::from_bytes(&wire).expect("decode");
        assert_eq!(received, sent);
    }
}
983
/// A heartbeat whose role byte holds a value with no matching
/// `ReplicaRole` must be refused with `WireError::BadRole`, carrying
/// the offending byte.
#[test]
fn sync_heartbeat_rejects_unknown_role() {
    // header (3) + channel_id (32) + tail_seq (8) precede the role.
    const ROLE_OFFSET: usize = 3 + 32 + 8;

    let heartbeat = SyncHeartbeat {
        channel_id: sample_channel_id(),
        tail_seq: 0,
        role: ReplicaRole::Leader,
        wall_clock_ms: 0,
    };
    let mut wire = heartbeat.to_bytes();
    wire[ROLE_OFFSET] = 99;

    let outcome = SyncHeartbeat::from_bytes(&wire).expect_err("must reject");
    assert!(matches!(outcome, WireError::BadRole(99)));
}
998
999 // ----------------------------------------------------------------
1000 // SyncNack round-trip
1001 // ----------------------------------------------------------------
1002
/// Round-trips a `SyncNack` once per `SyncNackError` listed below,
/// each with a non-empty detail string naming the error.
#[test]
fn sync_nack_round_trip_each_error() {
    let codes = [
        SyncNackError::NotLeader,
        SyncNackError::BadRange,
        SyncNackError::Backpressure,
        SyncNackError::ChannelClosed,
    ];

    for error_code in codes {
        let detail = format!("test detail for {error_code:?}");
        let sent = SyncNack {
            channel_id: sample_channel_id(),
            since_seq: 12345,
            error_code,
            leader_first_retained_seq: 9999,
            detail,
            request_id: 0,
        };

        let wire = sent.to_bytes();
        let received = SyncNack::from_bytes(&wire).expect("decode");
        assert_eq!(received, sent);
    }
}
1024
/// A `SyncNack` with an empty detail string must survive an
/// encode/decode cycle unchanged.
#[test]
fn sync_nack_empty_detail_round_trips() {
    let sent = SyncNack {
        channel_id: sample_channel_id(),
        since_seq: 0,
        error_code: SyncNackError::NotLeader,
        leader_first_retained_seq: 0,
        detail: String::new(),
        request_id: 0,
    };

    let wire = sent.to_bytes();
    let received = SyncNack::from_bytes(&wire).expect("decode");

    assert_eq!(received, sent);
}
1039
/// A detail string longer than `SYNC_NACK_DETAIL_MAX` is silently
/// shortened by the encoder instead of failing the encode: the
/// structured error code is the load-bearing part, while the detail
/// is operator-facing logging.
#[test]
fn sync_nack_truncates_oversized_detail() {
    let sent = SyncNack {
        channel_id: sample_channel_id(),
        since_seq: 0,
        error_code: SyncNackError::Backpressure,
        leader_first_retained_seq: 0,
        // 1000 bytes over the cap, all single-byte ASCII.
        detail: "x".repeat(SYNC_NACK_DETAIL_MAX + 1000),
        request_id: 0,
    };

    let wire = sent.to_bytes();
    let received = SyncNack::from_bytes(&wire).expect("decode");

    // Exactly the cap survives, and what survives is the leading
    // part of the input.
    assert_eq!(received.detail.len(), SYNC_NACK_DETAIL_MAX);
    assert!(sent.detail.starts_with(&received.detail));
}
1059
/// A nack whose error-code byte holds a value with no matching
/// `SyncNackError` must be refused with `WireError::BadErrorCode`,
/// carrying the offending byte.
#[test]
fn sync_nack_rejects_unknown_error_code() {
    // header (3) + channel_id (32) + since_seq (8) precede the code.
    const ERROR_CODE_OFFSET: usize = 3 + 32 + 8;

    let nack = SyncNack {
        channel_id: sample_channel_id(),
        since_seq: 0,
        error_code: SyncNackError::NotLeader,
        leader_first_retained_seq: 0,
        detail: String::new(),
        request_id: 0,
    };
    let mut wire = nack.to_bytes();
    wire[ERROR_CODE_OFFSET] = 0;

    let outcome = SyncNack::from_bytes(&wire).expect_err("must reject");
    assert!(matches!(outcome, WireError::BadErrorCode(0)));
}
1076
/// When a `SyncNack` frame is cut short inside the detail body, the
/// `Truncated` error must report `need` as the full encoded length
/// (bytes already consumed plus bytes still required) — not a
/// double-counted figure that overstates the requirement — and
/// `have` as the length that was actually supplied.
#[test]
fn sync_nack_truncated_payload_reports_correct_need() {
    let nack = SyncNack {
        channel_id: sample_channel_id(),
        since_seq: 0x4242_4242_4242_4242,
        error_code: SyncNackError::BadRange,
        leader_first_retained_seq: 100,
        detail: "hello".to_string(),
        request_id: 0,
    };
    let full = nack.to_bytes();

    // Remove a single trailing byte so the detail body is one byte
    // short of what its length prefix announces.
    let short = &full[..full.len() - 1];

    let other = SyncNack::from_bytes(short).expect_err("must error");
    if let WireError::Truncated { need, have } = &other {
        assert_eq!(*need, full.len());
        assert_eq!(*have, short.len());
    } else {
        panic!("unexpected error: {other:?}");
    }
}
1103
/// When the `SYNC_NACK_DETAIL_MAX` byte cap would fall in the middle
/// of a multi-byte codepoint, the encoder must back off to the
/// previous UTF-8 boundary so the decoder still accepts the frame.
#[test]
fn sync_nack_truncates_oversized_detail_at_char_boundary() {
    // "é" is 2 bytes (0xC3 0xA9). Padding with `cap - 1` ASCII bytes
    // places its first byte just inside the cap and its second byte
    // just past it, so a naive byte cut would split the codepoint.
    let pad_len = SYNC_NACK_DETAIL_MAX - 1;
    let mut detail = "a".repeat(pad_len);
    detail.push('é');
    // A hard `assert!` rather than `debug_assert!`: the latter is
    // compiled out when debug assertions are disabled (e.g. a
    // release-profile test run), which would leave this precondition
    // unchecked.
    assert!(
        detail.len() > SYNC_NACK_DETAIL_MAX,
        "test input must exceed the detail cap"
    );

    let original = SyncNack {
        channel_id: sample_channel_id(),
        since_seq: 0,
        error_code: SyncNackError::Backpressure,
        leader_first_retained_seq: 0,
        detail,
        request_id: 0,
    };
    let bytes = original.to_bytes();
    let decoded = SyncNack::from_bytes(&bytes).expect("decode after truncate");

    // The whole "é" is dropped, leaving only the ASCII padding...
    assert_eq!(decoded.detail.len(), pad_len);
    // ...and what remains is the untouched leading part of the input,
    // not merely something of the right length.
    assert!(original.detail.starts_with(&decoded.detail));
}
1128
/// A nack whose detail bytes are not valid UTF-8 must be refused
/// with `WireError::InvalidUtf8`.
#[test]
fn sync_nack_rejects_invalid_utf8() {
    // Offset of the first detail byte: header (3) + channel_id (32)
    // + since_seq (8) + error_code (1) + leader_first_retained_seq (8)
    // + request_id echo (8) + detail length prefix (2) = 62.
    const DETAIL_START: usize = 3 + 32 + 8 + 1 + 8 + 8 + 2;

    let nack = SyncNack {
        channel_id: sample_channel_id(),
        since_seq: 0,
        error_code: SyncNackError::BadRange,
        leader_first_retained_seq: 0,
        detail: "ascii".to_string(),
        request_id: 0,
    };
    let mut wire = nack.to_bytes();

    // Overwrite every detail byte in place, keeping the length the
    // same, with 0xC0 — a byte that is never a valid UTF-8 lead.
    for byte in &mut wire[DETAIL_START..] {
        *byte = 0xC0;
    }

    let outcome = SyncNack::from_bytes(&wire).expect_err("must reject");
    assert!(matches!(outcome, WireError::InvalidUtf8));
}
1152
1153 // ----------------------------------------------------------------
1154 // Dispatch-code reservations — pin the constants so a renumbering
1155 // surface change in a future slice is loud.
1156 // ----------------------------------------------------------------
1157
/// Pins the numeric values of the replica-sync dispatch codes and
/// the end marker of their reserved range.
#[test]
fn dispatch_codes_pinned() {
    // The four assigned codes are consecutive, starting at 0x20.
    let assigned = [
        DISPATCH_SYNC_REQUEST,
        DISPATCH_SYNC_RESPONSE,
        DISPATCH_SYNC_HEARTBEAT,
        DISPATCH_SYNC_NACK,
    ];
    assert_eq!(assigned, [0x20, 0x21, 0x22, 0x23]);

    // End marker of the reserved block.
    assert_eq!(DISPATCH_REPLICA_SYNC_RESERVED_END, 0x30);
}
1166
/// Pins the numeric value of the RedEX subprotocol id.
#[test]
fn subprotocol_id_pinned() {
    let expected: u16 = 0x0E00;
    assert_eq!(SUBPROTOCOL_REDEX, expected);
}
1171}