kevy_replicate/wire_snapshot.rs
1//! Snapshot ship wire format (T1.22) — split out of [`crate::wire`]
2//! to keep that file under the 500-LOC project ceiling.
3//!
4//! See `docs/snapshot.md` for the full spec. The primary sends:
5//!
6//! `+SNAPSHOT\r\n`
7//! `$L1\r\n<L1 bytes>\r\n` (chunk 1)
8//! `$L2\r\n<L2 bytes>\r\n` (chunk 2)
9//! ...
10//! `+SNAPSHOT_END <ack_offset>\r\n`
11//!
12//! Markers are RESP simple strings, chunks are RESP bulk strings —
13//! any RESP-aware tool can peek a captured stream.
14
15use crate::wire::{WireError, find_crlf, parse_decimal, push_u64};
16
17/// Per-chunk cap: a chunk's `$L\r\n` length must not exceed this.
18/// Replica drops the link if a chunk header reports a larger size.
19/// 64 KiB matches a typical TCP segment + keeps the per-chunk
20/// allocation modest. The primary may pick any chunk size from
21/// `1` up to this.
22pub const SNAPSHOT_CHUNK_MAX: usize = 64 * 1024;
23
24/// Maximum length of a snapshot control line (`+SNAPSHOT_END N\r\n`).
25/// 256 B is generous — the longest legal line is `+SNAPSHOT_END ` +
26/// 20 digits + `\r\n` = 38 B.
27pub const SNAPSHOT_LINE_MAX: usize = 256;
28
29/// Decoded snapshot marker, returned by [`decode_snapshot_marker`].
30#[derive(Debug, PartialEq, Eq)]
31pub enum SnapshotMarker {
32 /// `+SNAPSHOT\r\n` — primary is about to stream snapshot chunks.
33 Begin,
34 /// `+SNAPSHOT_END <ack_offset>\r\n` — end of snapshot; the next
35 /// live frame's offset will equal `ack_offset`.
36 End(u64),
37}
38
39/// Encode the snapshot-begin marker. Allocates the exact 11 bytes.
40pub fn encode_snapshot_begin() -> Vec<u8> {
41 b"+SNAPSHOT\r\n".to_vec()
42}
43
44/// Encode one snapshot chunk as a RESP bulk string. Caller is
45/// responsible for chunking — typical strategy is fixed
46/// [`SNAPSHOT_CHUNK_MAX`]-sized reads from a snapshot file or
47/// in-memory serializer.
48///
49/// **Debug-asserts** `bytes.len() <= SNAPSHOT_CHUNK_MAX` so an
50/// accidental oversize chunk trips during development; release
51/// builds emit a frame the peer will reject with [`WireError::BadEnvelope`]
52/// (replica's decoder caps incoming chunk lengths).
53pub fn encode_snapshot_chunk(bytes: &[u8]) -> Vec<u8> {
54 debug_assert!(
55 bytes.len() <= SNAPSHOT_CHUNK_MAX,
56 "snapshot chunk {} > cap {}",
57 bytes.len(),
58 SNAPSHOT_CHUNK_MAX,
59 );
60 let mut out = Vec::with_capacity(16 + bytes.len());
61 out.push(b'$');
62 push_u64(&mut out, bytes.len() as u64);
63 out.extend_from_slice(b"\r\n");
64 out.extend_from_slice(bytes);
65 out.extend_from_slice(b"\r\n");
66 out
67}
68
69/// Encode the snapshot-end marker carrying the ack offset (the
70/// next live frame's offset will equal this value).
71pub fn encode_snapshot_end(ack_offset: u64) -> Vec<u8> {
72 let mut out = Vec::with_capacity(32);
73 out.extend_from_slice(b"+SNAPSHOT_END ");
74 push_u64(&mut out, ack_offset);
75 out.extend_from_slice(b"\r\n");
76 out
77}
78
79/// Peek the next line at the front of `buf` to detect a snapshot
80/// marker. Returns:
81/// - `Ok(Some((marker, used)))` — a full marker line was found;
82/// `used` bytes may be dropped.
83/// - `Ok(None)` — the buffer doesn't start with a `+` byte; caller
84/// should treat the bytes as a regular `*2\r\n` frame and feed
85/// the frame decoder instead.
86/// - `Err(WireError::Truncated)` — buffer starts with `+` but the
87/// `\r\n` terminator is not yet in the buffer.
88/// - `Err(WireError::BadEnvelope)` — buffer starts with `+` but the
89/// line is neither `+SNAPSHOT` nor `+SNAPSHOT_END <N>`, or the
90/// line exceeds [`SNAPSHOT_LINE_MAX`].
91pub fn decode_snapshot_marker(buf: &[u8]) -> Result<Option<(SnapshotMarker, usize)>, WireError> {
92 if buf.is_empty() {
93 return Err(WireError::Truncated);
94 }
95 if buf[0] != b'+' {
96 return Ok(None);
97 }
98 let Some(eol) = find_crlf(buf, 1) else {
99 return if buf.len() > SNAPSHOT_LINE_MAX {
100 Err(WireError::BadEnvelope)
101 } else {
102 Err(WireError::Truncated)
103 };
104 };
105 if eol > SNAPSHOT_LINE_MAX {
106 return Err(WireError::BadEnvelope);
107 }
108 let line = &buf[1..eol];
109 if line == b"SNAPSHOT" {
110 return Ok(Some((SnapshotMarker::Begin, eol + 2)));
111 }
112 if let Some(rest) = line.strip_prefix(b"SNAPSHOT_END ") {
113 let offset = parse_decimal(rest).ok_or(WireError::BadEnvelope)?;
114 return Ok(Some((SnapshotMarker::End(offset), eol + 2)));
115 }
116 Err(WireError::BadEnvelope)
117}
118
119/// Decode the next snapshot chunk (`$L\r\n<L bytes>\r\n`) at the
120/// front of `buf`. Returns:
121/// - `Ok((chunk_bytes, used))` — `chunk_bytes` borrows from `buf`;
122/// `used` bytes were consumed.
123/// - `Err(WireError::Truncated)` — not enough bytes for a complete
124/// chunk yet.
125/// - `Err(WireError::BadEnvelope)` — header wasn't `$L\r\n`, `L`
126/// exceeded [`SNAPSHOT_CHUNK_MAX`], `L` parsed as non-numeric, or
127/// the trailing CRLF was missing.
128pub fn decode_snapshot_chunk(buf: &[u8]) -> Result<(&[u8], usize), WireError> {
129 if buf.is_empty() {
130 return Err(WireError::Truncated);
131 }
132 if buf[0] != b'$' {
133 return Err(WireError::BadEnvelope);
134 }
135 let len_eol = find_crlf(buf, 1).ok_or(WireError::Truncated)?;
136 let len = parse_decimal(&buf[1..len_eol]).ok_or(WireError::BadEnvelope)?;
137 let len = usize::try_from(len).map_err(|_| WireError::BadEnvelope)?;
138 if len > SNAPSHOT_CHUNK_MAX {
139 return Err(WireError::BadEnvelope);
140 }
141 let data_start = len_eol + 2;
142 let data_end = data_start + len;
143 if buf.len() < data_end + 2 {
144 return Err(WireError::Truncated);
145 }
146 if &buf[data_end..data_end + 2] != b"\r\n" {
147 return Err(WireError::BadEnvelope);
148 }
149 Ok((&buf[data_start..data_end], data_end + 2))
150}