1use serde::{Deserialize, Serialize};
14
15use crate::error::{ArrayError, ArrayResult};
16use crate::sync::hlc::Hlc;
17use crate::types::coord::value::CoordValue;
18
19#[derive(
27 Clone,
28 Debug,
29 PartialEq,
30 Serialize,
31 Deserialize,
32 zerompk::ToMessagePack,
33 zerompk::FromMessagePack,
34)]
35pub struct CoordRange {
36 pub lo: Vec<CoordValue>,
38 pub hi: Vec<CoordValue>,
40}
41
42#[derive(
50 Clone,
51 Debug,
52 PartialEq,
53 Serialize,
54 Deserialize,
55 zerompk::ToMessagePack,
56 zerompk::FromMessagePack,
57)]
58pub struct TileSnapshot {
59 pub array: String,
61 pub coord_range: CoordRange,
63 pub tile_blob: Vec<u8>,
65 pub snapshot_hlc: Hlc,
68 pub schema_hlc: Hlc,
70}
71
72#[derive(
77 Clone,
78 Debug,
79 PartialEq,
80 Serialize,
81 Deserialize,
82 zerompk::ToMessagePack,
83 zerompk::FromMessagePack,
84)]
85pub struct SnapshotChunk {
86 pub array: String,
88 pub chunk_index: u32,
90 pub total_chunks: u32,
92 pub payload: Vec<u8>,
94 pub snapshot_hlc: Hlc,
96}
97
98#[derive(
102 Clone,
103 Debug,
104 PartialEq,
105 Serialize,
106 Deserialize,
107 zerompk::ToMessagePack,
108 zerompk::FromMessagePack,
109)]
110pub struct SnapshotHeader {
111 pub array: String,
113 pub coord_range: CoordRange,
115 pub total_chunks: u32,
117 pub snapshot_hlc: Hlc,
119 pub schema_hlc: Hlc,
121}
122
123pub fn encode_snapshot(s: &TileSnapshot) -> ArrayResult<Vec<u8>> {
130 zerompk::to_msgpack_vec(s).map_err(|e| ArrayError::SegmentCorruption {
131 detail: format!("snapshot encode failed: {e}"),
132 })
133}
134
135pub fn decode_snapshot(b: &[u8]) -> ArrayResult<TileSnapshot> {
139 zerompk::from_msgpack(b).map_err(|e| ArrayError::SegmentCorruption {
140 detail: format!("snapshot decode failed: {e}"),
141 })
142}
143
144pub fn split_into_chunks(
154 s: &TileSnapshot,
155 max_chunk_bytes: usize,
156) -> ArrayResult<(SnapshotHeader, Vec<SnapshotChunk>)> {
157 if max_chunk_bytes < 64 {
158 return Err(ArrayError::InvalidOp {
159 detail: "max_chunk_bytes too small".into(),
160 });
161 }
162
163 let blob = &s.tile_blob;
164 let total_chunks = if blob.is_empty() {
165 1
166 } else {
167 blob.len().div_ceil(max_chunk_bytes).max(1) as u32
168 };
169
170 let header = SnapshotHeader {
171 array: s.array.clone(),
172 coord_range: s.coord_range.clone(),
173 total_chunks,
174 snapshot_hlc: s.snapshot_hlc,
175 schema_hlc: s.schema_hlc,
176 };
177
178 let chunks: Vec<SnapshotChunk> = if blob.is_empty() {
179 vec![SnapshotChunk {
180 array: s.array.clone(),
181 chunk_index: 0,
182 total_chunks: 1,
183 payload: Vec::new(),
184 snapshot_hlc: s.snapshot_hlc,
185 }]
186 } else {
187 blob.chunks(max_chunk_bytes)
188 .enumerate()
189 .map(|(i, slice)| SnapshotChunk {
190 array: s.array.clone(),
191 chunk_index: i as u32,
192 total_chunks,
193 payload: slice.to_vec(),
194 snapshot_hlc: s.snapshot_hlc,
195 })
196 .collect()
197 };
198
199 Ok((header, chunks))
200}
201
202pub fn assemble_chunks(
210 header: &SnapshotHeader,
211 chunks: &mut [SnapshotChunk],
212) -> ArrayResult<TileSnapshot> {
213 chunks.sort_by_key(|c| c.chunk_index);
214
215 let expected = header.total_chunks as usize;
216
217 if chunks.len() != expected {
218 return Err(ArrayError::SegmentCorruption {
219 detail: format!("expected {} chunks, got {}", expected, chunks.len()),
220 });
221 }
222
223 for (i, chunk) in chunks.iter().enumerate() {
224 if chunk.chunk_index as usize != i {
225 return Err(ArrayError::SegmentCorruption {
226 detail: format!("chunk index gap: expected {i}, got {}", chunk.chunk_index),
227 });
228 }
229 }
230
231 let tile_blob: Vec<u8> = chunks
232 .iter()
233 .flat_map(|c| c.payload.iter().copied())
234 .collect();
235
236 Ok(TileSnapshot {
237 array: header.array.clone(),
238 coord_range: header.coord_range.clone(),
239 tile_blob,
240 snapshot_hlc: header.snapshot_hlc,
241 schema_hlc: header.schema_hlc,
242 })
243}
244
245pub trait SnapshotSink: Send + Sync {
254 fn write_snapshot(&self, snapshot: &TileSnapshot) -> ArrayResult<()>;
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::replica_id::ReplicaId;

    /// Builds an HLC from `ms`, a zero second component and replica id 1.
    fn hlc(ms: u64) -> Hlc {
        let replica = ReplicaId::new(1);
        Hlc::new(ms, 0, replica).unwrap()
    }

    /// One-dimensional range from 0 to 100.
    fn range() -> CoordRange {
        let lo = vec![CoordValue::Int64(0)];
        let hi = vec![CoordValue::Int64(100)];
        CoordRange { lo, hi }
    }

    /// Snapshot of array "test" whose blob is `blob_len` bytes of 0xAB.
    fn snapshot(blob_len: usize) -> TileSnapshot {
        let tile_blob = vec![0xAB; blob_len];
        TileSnapshot {
            array: "test".into(),
            coord_range: range(),
            tile_blob,
            snapshot_hlc: hlc(1000),
            schema_hlc: hlc(500),
        }
    }

    // A range survives a MessagePack encode/decode cycle unchanged.
    #[test]
    fn coord_range_roundtrip() {
        let original = range();
        let encoded = zerompk::to_msgpack_vec(&original).unwrap();
        let decoded: CoordRange = zerompk::from_msgpack(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    // A full snapshot survives encode_snapshot/decode_snapshot unchanged.
    #[test]
    fn tile_snapshot_roundtrip() {
        let original = snapshot(256);
        let encoded = encode_snapshot(&original).unwrap();
        let decoded = decode_snapshot(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    // 1000 bytes at 300 bytes per chunk gives 4 chunks that reassemble exactly.
    #[test]
    fn split_then_assemble_roundtrip() {
        let original = snapshot(1000);
        let (header, mut parts) = split_into_chunks(&original, 300).unwrap();
        assert_eq!(header.total_chunks, 4);
        assert_eq!(parts.len(), 4);
        let rebuilt = assemble_chunks(&header, parts.as_mut_slice()).unwrap();
        assert_eq!(rebuilt.tile_blob, original.tile_blob);
        assert_eq!(rebuilt.array, original.array);
        assert_eq!(rebuilt.snapshot_hlc, original.snapshot_hlc);
        assert_eq!(rebuilt.schema_hlc, original.schema_hlc);
    }

    // Dropping one chunk must be reported as corruption.
    #[test]
    fn assemble_rejects_missing_chunk() {
        let original = snapshot(1000);
        let (header, mut parts) = split_into_chunks(&original, 300).unwrap();
        parts.retain(|part| part.chunk_index != 2);
        let outcome = assemble_chunks(&header, parts.as_mut_slice());
        assert!(matches!(outcome, Err(ArrayError::SegmentCorruption { .. })));
    }

    // Supplying a chunk twice must be reported as corruption.
    #[test]
    fn assemble_rejects_duplicate_chunk() {
        let original = snapshot(600);
        let (header, mut parts) = split_into_chunks(&original, 300).unwrap();
        let repeated = parts[0].clone();
        parts.push(repeated);
        let outcome = assemble_chunks(&header, parts.as_mut_slice());
        assert!(matches!(outcome, Err(ArrayError::SegmentCorruption { .. })));
    }

    // Chunk sizes below the 64-byte minimum are refused.
    #[test]
    fn split_rejects_too_small_max() {
        let original = snapshot(100);
        let outcome = split_into_chunks(&original, 63);
        assert!(matches!(outcome, Err(ArrayError::InvalidOp { .. })));
    }

    // An empty blob still produces a single, empty chunk.
    #[test]
    fn split_empty_blob_produces_one_chunk() {
        let original = snapshot(0);
        let (header, parts) = split_into_chunks(&original, 64).unwrap();
        assert_eq!(header.total_chunks, 1);
        assert_eq!(parts.len(), 1);
        assert!(parts[0].payload.is_empty());
    }
}