Skip to main content

nodedb_array/sync/
snapshot.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Snapshot wire types and chunk codec for array CRDT sync.
4//!
5//! A [`TileSnapshot`] captures the full state of a tile coord-range at a
6//! specific HLC. Snapshots are used for catch-up replication (first connect,
7//! long disconnect, after log GC) so receivers can resume the op-stream from
8//! [`TileSnapshot::snapshot_hlc`] without replaying the full op history.
9//!
10//! The actual snapshot *builder* (which walks engine state) lands in Phase F
11//! (Origin). This file ships only the wire shape and the streaming chunk codec.
12
13use serde::{Deserialize, Serialize};
14
15use crate::error::{ArrayError, ArrayResult};
16use crate::sync::hlc::Hlc;
17use crate::types::coord::value::CoordValue;
18
19// ─── Coordinate range ────────────────────────────────────────────────────────
20
21/// A half-open coordinate range `[lo, hi)`.
22///
23/// Convention (matches the engine): `lo` is inclusive, `hi` is exclusive.
24/// Both bounds have one element per array dimension, in the same order as
25/// [`crate::schema::ArraySchema::dims`].
26#[derive(
27    Clone,
28    Debug,
29    PartialEq,
30    Serialize,
31    Deserialize,
32    zerompk::ToMessagePack,
33    zerompk::FromMessagePack,
34)]
35pub struct CoordRange {
36    /// Inclusive lower bound (one value per dimension).
37    pub lo: Vec<CoordValue>,
38    /// Exclusive upper bound (one value per dimension).
39    pub hi: Vec<CoordValue>,
40}
41
42// ─── Snapshot types ───────────────────────────────────────────────────────────
43
44/// Full state snapshot for a tile coord-range at a specific HLC.
45///
46/// `tile_blob` is an opaque compressed tile payload produced by the origin
47/// engine's structural codec. Receivers write it directly into their tile
48/// store without re-encoding.
49#[derive(
50    Clone,
51    Debug,
52    PartialEq,
53    Serialize,
54    Deserialize,
55    zerompk::ToMessagePack,
56    zerompk::FromMessagePack,
57)]
58pub struct TileSnapshot {
59    /// Name of the target array collection.
60    pub array: String,
61    /// Coord range covered by this snapshot (half-open: lo inclusive, hi exclusive).
62    pub coord_range: CoordRange,
63    /// Compressed tile payload produced by the structural codec.
64    pub tile_blob: Vec<u8>,
65    /// HLC at which this snapshot was taken; ops with `hlc >= snapshot_hlc`
66    /// should be applied on top after the snapshot is loaded.
67    pub snapshot_hlc: Hlc,
68    /// HLC of the array schema that was in effect when this snapshot was taken.
69    pub schema_hlc: Hlc,
70}
71
72/// One chunk of a streaming [`TileSnapshot`].
73///
74/// Large tile blobs are split into fixed-size chunks for transmission.
75/// Reassemble with [`assemble_chunks`].
76#[derive(
77    Clone,
78    Debug,
79    PartialEq,
80    Serialize,
81    Deserialize,
82    zerompk::ToMessagePack,
83    zerompk::FromMessagePack,
84)]
85pub struct SnapshotChunk {
86    /// Name of the target array collection.
87    pub array: String,
88    /// Zero-based index of this chunk within the stream.
89    pub chunk_index: u32,
90    /// Total number of chunks in the stream.
91    pub total_chunks: u32,
92    /// Raw bytes of this chunk (a slice of the encoded `tile_blob`).
93    pub payload: Vec<u8>,
94    /// HLC at which the snapshot was taken; echoed on every chunk for routing.
95    pub snapshot_hlc: Hlc,
96}
97
98/// Metadata header sent before the chunk stream begins.
99///
100/// Receivers use this to pre-allocate storage and validate completeness.
101#[derive(
102    Clone,
103    Debug,
104    PartialEq,
105    Serialize,
106    Deserialize,
107    zerompk::ToMessagePack,
108    zerompk::FromMessagePack,
109)]
110pub struct SnapshotHeader {
111    /// Name of the target array collection.
112    pub array: String,
113    /// Coord range covered by this snapshot.
114    pub coord_range: CoordRange,
115    /// Number of [`SnapshotChunk`]s that will follow.
116    pub total_chunks: u32,
117    /// HLC at which the snapshot was taken.
118    pub snapshot_hlc: Hlc,
119    /// HLC of the schema in effect when this snapshot was taken.
120    pub schema_hlc: Hlc,
121}
122
123// ─── Codec ───────────────────────────────────────────────────────────────────
124
125/// Encode a [`TileSnapshot`] to MessagePack bytes.
126///
127/// Use [`decode_snapshot`] to reverse. Errors map to
128/// [`ArrayError::SegmentCorruption`].
129pub fn encode_snapshot(s: &TileSnapshot) -> ArrayResult<Vec<u8>> {
130    zerompk::to_msgpack_vec(s).map_err(|e| ArrayError::SegmentCorruption {
131        detail: format!("snapshot encode failed: {e}"),
132    })
133}
134
135/// Decode a [`TileSnapshot`] from MessagePack bytes produced by [`encode_snapshot`].
136///
137/// Errors map to [`ArrayError::SegmentCorruption`].
138pub fn decode_snapshot(b: &[u8]) -> ArrayResult<TileSnapshot> {
139    zerompk::from_msgpack(b).map_err(|e| ArrayError::SegmentCorruption {
140        detail: format!("snapshot decode failed: {e}"),
141    })
142}
143
144// ─── Chunking ────────────────────────────────────────────────────────────────
145
146/// Split a [`TileSnapshot`] into a header plus a sequence of fixed-size chunks.
147///
148/// `max_chunk_bytes` must be at least 64. Returns
149/// [`ArrayError::InvalidOp`] if it is smaller.
150///
151/// `total_chunks = ceil(blob.len() / max_chunk_bytes).max(1)` — there is
152/// always at least one chunk, even for an empty blob.
153pub fn split_into_chunks(
154    s: &TileSnapshot,
155    max_chunk_bytes: usize,
156) -> ArrayResult<(SnapshotHeader, Vec<SnapshotChunk>)> {
157    if max_chunk_bytes < 64 {
158        return Err(ArrayError::InvalidOp {
159            detail: "max_chunk_bytes too small".into(),
160        });
161    }
162
163    let blob = &s.tile_blob;
164    let total_chunks = if blob.is_empty() {
165        1
166    } else {
167        blob.len().div_ceil(max_chunk_bytes).max(1) as u32
168    };
169
170    let header = SnapshotHeader {
171        array: s.array.clone(),
172        coord_range: s.coord_range.clone(),
173        total_chunks,
174        snapshot_hlc: s.snapshot_hlc,
175        schema_hlc: s.schema_hlc,
176    };
177
178    let chunks: Vec<SnapshotChunk> = if blob.is_empty() {
179        vec![SnapshotChunk {
180            array: s.array.clone(),
181            chunk_index: 0,
182            total_chunks: 1,
183            payload: Vec::new(),
184            snapshot_hlc: s.snapshot_hlc,
185        }]
186    } else {
187        blob.chunks(max_chunk_bytes)
188            .enumerate()
189            .map(|(i, slice)| SnapshotChunk {
190                array: s.array.clone(),
191                chunk_index: i as u32,
192                total_chunks,
193                payload: slice.to_vec(),
194                snapshot_hlc: s.snapshot_hlc,
195            })
196            .collect()
197    };
198
199    Ok((header, chunks))
200}
201
202/// Reassemble a [`TileSnapshot`] from a header and a slice of chunks.
203///
204/// Sorts chunks by `chunk_index`, validates that exactly `0..total_chunks`
205/// are present (no gaps, no duplicates), concatenates their payloads, and
206/// reconstructs the snapshot.
207///
208/// Validation failures → [`ArrayError::SegmentCorruption`].
209pub fn assemble_chunks(
210    header: &SnapshotHeader,
211    chunks: &mut [SnapshotChunk],
212) -> ArrayResult<TileSnapshot> {
213    chunks.sort_by_key(|c| c.chunk_index);
214
215    let expected = header.total_chunks as usize;
216
217    if chunks.len() != expected {
218        return Err(ArrayError::SegmentCorruption {
219            detail: format!("expected {} chunks, got {}", expected, chunks.len()),
220        });
221    }
222
223    for (i, chunk) in chunks.iter().enumerate() {
224        if chunk.chunk_index as usize != i {
225            return Err(ArrayError::SegmentCorruption {
226                detail: format!("chunk index gap: expected {i}, got {}", chunk.chunk_index),
227            });
228        }
229    }
230
231    let tile_blob: Vec<u8> = chunks
232        .iter()
233        .flat_map(|c| c.payload.iter().copied())
234        .collect();
235
236    Ok(TileSnapshot {
237        array: header.array.clone(),
238        coord_range: header.coord_range.clone(),
239        tile_blob,
240        snapshot_hlc: header.snapshot_hlc,
241        schema_hlc: header.schema_hlc,
242    })
243}
244
245// ─── Sink trait ──────────────────────────────────────────────────────────────
246
247/// Receiver for completed snapshots during GC compaction.
248///
249/// Implementations live in `nodedb-lite` (redb-backed) and `nodedb`
250/// (WAL-backed) and are wired in later phases. This trait gives the pure
251/// GC logic in [`crate::sync::gc`] a place to write snapshots before
252/// dropping the underlying ops from the log.
253pub trait SnapshotSink: Send + Sync {
254    /// Persist a completed snapshot.
255    fn write_snapshot(&self, snapshot: &TileSnapshot) -> ArrayResult<()>;
256}
257
258// ─── Tests ───────────────────────────────────────────────────────────────────
259
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::replica_id::ReplicaId;

    // HLC at `ms` physical milliseconds, logical counter 0, replica 1.
    fn hlc_at(ms: u64) -> Hlc {
        Hlc::new(ms, 0, ReplicaId::new(1)).unwrap()
    }

    // One-dimensional range [0, 100).
    fn unit_range() -> CoordRange {
        let lo = vec![CoordValue::Int64(0)];
        let hi = vec![CoordValue::Int64(100)];
        CoordRange { lo, hi }
    }

    // Snapshot of array "test" whose blob is `blob_len` bytes of 0xAB.
    fn snapshot_with_blob(blob_len: usize) -> TileSnapshot {
        TileSnapshot {
            array: "test".into(),
            coord_range: unit_range(),
            tile_blob: vec![0xAB; blob_len],
            snapshot_hlc: hlc_at(1000),
            schema_hlc: hlc_at(500),
        }
    }

    #[test]
    fn coord_range_roundtrip() {
        let original = unit_range();
        let encoded = zerompk::to_msgpack_vec(&original).unwrap();
        let decoded: CoordRange = zerompk::from_msgpack(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn tile_snapshot_roundtrip() {
        let original = snapshot_with_blob(256);
        let encoded = encode_snapshot(&original).unwrap();
        let decoded = decode_snapshot(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn split_then_assemble_roundtrip() {
        let original = snapshot_with_blob(1000);
        let (header, mut chunks) = split_into_chunks(&original, 300).unwrap();

        // 1000 bytes in 300-byte chunks: ceil(1000 / 300) = 4.
        assert_eq!(header.total_chunks, 4);
        assert_eq!(chunks.len(), 4);

        let rebuilt = assemble_chunks(&header, &mut chunks).unwrap();
        assert_eq!(rebuilt.tile_blob, original.tile_blob);
        assert_eq!(rebuilt.array, original.array);
        assert_eq!(rebuilt.snapshot_hlc, original.snapshot_hlc);
        assert_eq!(rebuilt.schema_hlc, original.schema_hlc);
    }

    #[test]
    fn assemble_rejects_missing_chunk() {
        let original = snapshot_with_blob(1000);
        let (header, mut chunks) = split_into_chunks(&original, 300).unwrap();

        // Drop index 2 so the stream has a hole in it.
        chunks.retain(|c| c.chunk_index != 2);

        let outcome = assemble_chunks(&header, &mut chunks);
        assert!(matches!(outcome, Err(ArrayError::SegmentCorruption { .. })));
    }

    #[test]
    fn assemble_rejects_duplicate_chunk() {
        let original = snapshot_with_blob(600);
        let (header, mut chunks) = split_into_chunks(&original, 300).unwrap();

        // Repeat chunk 0: three chunks offered for a two-chunk stream.
        let repeated = chunks[0].clone();
        chunks.push(repeated);

        let outcome = assemble_chunks(&header, &mut chunks);
        assert!(matches!(outcome, Err(ArrayError::SegmentCorruption { .. })));
    }

    #[test]
    fn split_rejects_too_small_max() {
        let original = snapshot_with_blob(100);
        let outcome = split_into_chunks(&original, 63);
        assert!(matches!(outcome, Err(ArrayError::InvalidOp { .. })));
    }

    #[test]
    fn split_empty_blob_produces_one_chunk() {
        let original = snapshot_with_blob(0);
        let (header, chunks) = split_into_chunks(&original, 64).unwrap();
        assert_eq!(header.total_chunks, 1);
        assert_eq!(chunks.len(), 1);
        assert!(chunks[0].payload.is_empty());
    }
}