Skip to main content

kevy_replicate/
wire_snapshot.rs

1//! Snapshot ship wire format (T1.22) — split out of [`crate::wire`]
2//! to keep that file under the 500-LOC project ceiling.
3//!
4//! See `docs/snapshot.md` for the full spec. The primary sends:
5//!
6//!   `+SNAPSHOT\r\n`
7//!   `$L1\r\n<L1 bytes>\r\n`  (chunk 1)
8//!   `$L2\r\n<L2 bytes>\r\n`  (chunk 2)
9//!   ...
10//!   `+SNAPSHOT_END <ack_offset>\r\n`
11//!
12//! Markers are RESP simple strings, chunks are RESP bulk strings —
13//! any RESP-aware tool can peek a captured stream.
14
15use crate::wire::{WireError, find_crlf, parse_decimal, push_u64};
16
17/// Per-chunk cap: a chunk's `$L\r\n` length must not exceed this.
18/// Replica drops the link if a chunk header reports a larger size.
19/// 64 KiB matches a typical TCP segment + keeps the per-chunk
20/// allocation modest. The primary may pick any chunk size from
21/// `1` up to this.
22pub const SNAPSHOT_CHUNK_MAX: usize = 64 * 1024;
23
24/// Maximum length of a snapshot control line (`+SNAPSHOT_END N\r\n`).
25/// 256 B is generous — the longest legal line is `+SNAPSHOT_END ` +
26/// 20 digits + `\r\n` = 38 B.
27pub const SNAPSHOT_LINE_MAX: usize = 256;
28
29/// Decoded snapshot marker, returned by [`decode_snapshot_marker`].
30#[derive(Debug, PartialEq, Eq)]
31pub enum SnapshotMarker {
32    /// `+SNAPSHOT\r\n` — primary is about to stream snapshot chunks.
33    Begin,
34    /// `+SNAPSHOT_END <ack_offset>\r\n` — end of snapshot; the next
35    /// live frame's offset will equal `ack_offset`.
36    End(u64),
37}
38
39/// Encode the snapshot-begin marker. Allocates the exact 11 bytes.
40pub fn encode_snapshot_begin() -> Vec<u8> {
41    b"+SNAPSHOT\r\n".to_vec()
42}
43
44/// Encode one snapshot chunk as a RESP bulk string. Caller is
45/// responsible for chunking — typical strategy is fixed
46/// [`SNAPSHOT_CHUNK_MAX`]-sized reads from a snapshot file or
47/// in-memory serializer.
48///
49/// **Debug-asserts** `bytes.len() <= SNAPSHOT_CHUNK_MAX` so an
50/// accidental oversize chunk trips during development; release
51/// builds emit a frame the peer will reject with [`WireError::BadEnvelope`]
52/// (replica's decoder caps incoming chunk lengths).
53pub fn encode_snapshot_chunk(bytes: &[u8]) -> Vec<u8> {
54    debug_assert!(
55        bytes.len() <= SNAPSHOT_CHUNK_MAX,
56        "snapshot chunk {} > cap {}",
57        bytes.len(),
58        SNAPSHOT_CHUNK_MAX,
59    );
60    let mut out = Vec::with_capacity(16 + bytes.len());
61    out.push(b'$');
62    push_u64(&mut out, bytes.len() as u64);
63    out.extend_from_slice(b"\r\n");
64    out.extend_from_slice(bytes);
65    out.extend_from_slice(b"\r\n");
66    out
67}
68
69/// Encode the snapshot-end marker carrying the ack offset (the
70/// next live frame's offset will equal this value).
71pub fn encode_snapshot_end(ack_offset: u64) -> Vec<u8> {
72    let mut out = Vec::with_capacity(32);
73    out.extend_from_slice(b"+SNAPSHOT_END ");
74    push_u64(&mut out, ack_offset);
75    out.extend_from_slice(b"\r\n");
76    out
77}
78
79/// Peek the next line at the front of `buf` to detect a snapshot
80/// marker. Returns:
81/// - `Ok(Some((marker, used)))` — a full marker line was found;
82///   `used` bytes may be dropped.
83/// - `Ok(None)` — the buffer doesn't start with a `+` byte; caller
84///   should treat the bytes as a regular `*2\r\n` frame and feed
85///   the frame decoder instead.
86/// - `Err(WireError::Truncated)` — buffer starts with `+` but the
87///   `\r\n` terminator is not yet in the buffer.
88/// - `Err(WireError::BadEnvelope)` — buffer starts with `+` but the
89///   line is neither `+SNAPSHOT` nor `+SNAPSHOT_END <N>`, or the
90///   line exceeds [`SNAPSHOT_LINE_MAX`].
91pub fn decode_snapshot_marker(buf: &[u8]) -> Result<Option<(SnapshotMarker, usize)>, WireError> {
92    if buf.is_empty() {
93        return Err(WireError::Truncated);
94    }
95    if buf[0] != b'+' {
96        return Ok(None);
97    }
98    let Some(eol) = find_crlf(buf, 1) else {
99        return if buf.len() > SNAPSHOT_LINE_MAX {
100            Err(WireError::BadEnvelope)
101        } else {
102            Err(WireError::Truncated)
103        };
104    };
105    if eol > SNAPSHOT_LINE_MAX {
106        return Err(WireError::BadEnvelope);
107    }
108    let line = &buf[1..eol];
109    if line == b"SNAPSHOT" {
110        return Ok(Some((SnapshotMarker::Begin, eol + 2)));
111    }
112    if let Some(rest) = line.strip_prefix(b"SNAPSHOT_END ") {
113        let offset = parse_decimal(rest).ok_or(WireError::BadEnvelope)?;
114        return Ok(Some((SnapshotMarker::End(offset), eol + 2)));
115    }
116    Err(WireError::BadEnvelope)
117}
118
119/// Decode the next snapshot chunk (`$L\r\n<L bytes>\r\n`) at the
120/// front of `buf`. Returns:
121/// - `Ok((chunk_bytes, used))` — `chunk_bytes` borrows from `buf`;
122///   `used` bytes were consumed.
123/// - `Err(WireError::Truncated)` — not enough bytes for a complete
124///   chunk yet.
125/// - `Err(WireError::BadEnvelope)` — header wasn't `$L\r\n`, `L`
126///   exceeded [`SNAPSHOT_CHUNK_MAX`], `L` parsed as non-numeric, or
127///   the trailing CRLF was missing.
128pub fn decode_snapshot_chunk(buf: &[u8]) -> Result<(&[u8], usize), WireError> {
129    if buf.is_empty() {
130        return Err(WireError::Truncated);
131    }
132    if buf[0] != b'$' {
133        return Err(WireError::BadEnvelope);
134    }
135    let len_eol = find_crlf(buf, 1).ok_or(WireError::Truncated)?;
136    let len = parse_decimal(&buf[1..len_eol]).ok_or(WireError::BadEnvelope)?;
137    let len = usize::try_from(len).map_err(|_| WireError::BadEnvelope)?;
138    if len > SNAPSHOT_CHUNK_MAX {
139        return Err(WireError::BadEnvelope);
140    }
141    let data_start = len_eol + 2;
142    let data_end = data_start + len;
143    if buf.len() < data_end + 2 {
144        return Err(WireError::Truncated);
145    }
146    if &buf[data_end..data_end + 2] != b"\r\n" {
147        return Err(WireError::BadEnvelope);
148    }
149    Ok((&buf[data_start..data_end], data_end + 2))
150}