use serde::{Deserialize, Serialize};
use crate::error::{ArrayError, ArrayResult};
use crate::sync::hlc::Hlc;
use crate::types::coord::value::CoordValue;
#[derive(
Clone,
Debug,
PartialEq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct CoordRange {
pub lo: Vec<CoordValue>,
pub hi: Vec<CoordValue>,
}
#[derive(
Clone,
Debug,
PartialEq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct TileSnapshot {
pub array: String,
pub coord_range: CoordRange,
pub tile_blob: Vec<u8>,
pub snapshot_hlc: Hlc,
pub schema_hlc: Hlc,
}
#[derive(
Clone,
Debug,
PartialEq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct SnapshotChunk {
pub array: String,
pub chunk_index: u32,
pub total_chunks: u32,
pub payload: Vec<u8>,
pub snapshot_hlc: Hlc,
}
#[derive(
Clone,
Debug,
PartialEq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct SnapshotHeader {
pub array: String,
pub coord_range: CoordRange,
pub total_chunks: u32,
pub snapshot_hlc: Hlc,
pub schema_hlc: Hlc,
}
pub fn encode_snapshot(s: &TileSnapshot) -> ArrayResult<Vec<u8>> {
zerompk::to_msgpack_vec(s).map_err(|e| ArrayError::SegmentCorruption {
detail: format!("snapshot encode failed: {e}"),
})
}
pub fn decode_snapshot(b: &[u8]) -> ArrayResult<TileSnapshot> {
zerompk::from_msgpack(b).map_err(|e| ArrayError::SegmentCorruption {
detail: format!("snapshot decode failed: {e}"),
})
}
pub fn split_into_chunks(
s: &TileSnapshot,
max_chunk_bytes: usize,
) -> ArrayResult<(SnapshotHeader, Vec<SnapshotChunk>)> {
if max_chunk_bytes < 64 {
return Err(ArrayError::InvalidOp {
detail: "max_chunk_bytes too small".into(),
});
}
let blob = &s.tile_blob;
let total_chunks = if blob.is_empty() {
1
} else {
blob.len().div_ceil(max_chunk_bytes).max(1) as u32
};
let header = SnapshotHeader {
array: s.array.clone(),
coord_range: s.coord_range.clone(),
total_chunks,
snapshot_hlc: s.snapshot_hlc,
schema_hlc: s.schema_hlc,
};
let chunks: Vec<SnapshotChunk> = if blob.is_empty() {
vec![SnapshotChunk {
array: s.array.clone(),
chunk_index: 0,
total_chunks: 1,
payload: Vec::new(),
snapshot_hlc: s.snapshot_hlc,
}]
} else {
blob.chunks(max_chunk_bytes)
.enumerate()
.map(|(i, slice)| SnapshotChunk {
array: s.array.clone(),
chunk_index: i as u32,
total_chunks,
payload: slice.to_vec(),
snapshot_hlc: s.snapshot_hlc,
})
.collect()
};
Ok((header, chunks))
}
pub fn assemble_chunks(
header: &SnapshotHeader,
chunks: &mut [SnapshotChunk],
) -> ArrayResult<TileSnapshot> {
chunks.sort_by_key(|c| c.chunk_index);
let expected = header.total_chunks as usize;
if chunks.len() != expected {
return Err(ArrayError::SegmentCorruption {
detail: format!("expected {} chunks, got {}", expected, chunks.len()),
});
}
for (i, chunk) in chunks.iter().enumerate() {
if chunk.chunk_index as usize != i {
return Err(ArrayError::SegmentCorruption {
detail: format!("chunk index gap: expected {i}, got {}", chunk.chunk_index),
});
}
}
let tile_blob: Vec<u8> = chunks
.iter()
.flat_map(|c| c.payload.iter().copied())
.collect();
Ok(TileSnapshot {
array: header.array.clone(),
coord_range: header.coord_range.clone(),
tile_blob,
snapshot_hlc: header.snapshot_hlc,
schema_hlc: header.schema_hlc,
})
}
pub trait SnapshotSink: Send + Sync {
fn write_snapshot(&self, snapshot: &TileSnapshot) -> ArrayResult<()>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sync::hlc::Hlc;
use crate::sync::replica_id::ReplicaId;
fn hlc(ms: u64) -> Hlc {
Hlc::new(ms, 0, ReplicaId::new(1)).unwrap()
}
fn range() -> CoordRange {
CoordRange {
lo: vec![CoordValue::Int64(0)],
hi: vec![CoordValue::Int64(100)],
}
}
fn snapshot(blob_len: usize) -> TileSnapshot {
TileSnapshot {
array: "test".into(),
coord_range: range(),
tile_blob: vec![0xAB; blob_len],
snapshot_hlc: hlc(1000),
schema_hlc: hlc(500),
}
}
#[test]
fn coord_range_roundtrip() {
let r = range();
let bytes = zerompk::to_msgpack_vec(&r).unwrap();
let back: CoordRange = zerompk::from_msgpack(&bytes).unwrap();
assert_eq!(r, back);
}
#[test]
fn tile_snapshot_roundtrip() {
let s = snapshot(256);
let bytes = encode_snapshot(&s).unwrap();
let back = decode_snapshot(&bytes).unwrap();
assert_eq!(s, back);
}
#[test]
fn split_then_assemble_roundtrip() {
let s = snapshot(1000);
let (header, mut chunks) = split_into_chunks(&s, 300).unwrap();
assert_eq!(header.total_chunks, 4);
assert_eq!(chunks.len(), 4);
let reassembled = assemble_chunks(&header, &mut chunks).unwrap();
assert_eq!(reassembled.tile_blob, s.tile_blob);
assert_eq!(reassembled.array, s.array);
assert_eq!(reassembled.snapshot_hlc, s.snapshot_hlc);
assert_eq!(reassembled.schema_hlc, s.schema_hlc);
}
#[test]
fn assemble_rejects_missing_chunk() {
let s = snapshot(1000);
let (header, mut chunks) = split_into_chunks(&s, 300).unwrap();
chunks.retain(|c| c.chunk_index != 2);
let result = assemble_chunks(&header, &mut chunks);
assert!(matches!(result, Err(ArrayError::SegmentCorruption { .. })));
}
#[test]
fn assemble_rejects_duplicate_chunk() {
let s = snapshot(600);
let (header, mut chunks) = split_into_chunks(&s, 300).unwrap();
let dup = chunks[0].clone();
chunks.push(dup);
let result = assemble_chunks(&header, &mut chunks);
assert!(matches!(result, Err(ArrayError::SegmentCorruption { .. })));
}
#[test]
fn split_rejects_too_small_max() {
let s = snapshot(100);
let result = split_into_chunks(&s, 63);
assert!(matches!(result, Err(ArrayError::InvalidOp { .. })));
}
#[test]
fn split_empty_blob_produces_one_chunk() {
let s = snapshot(0);
let (header, chunks) = split_into_chunks(&s, 64).unwrap();
assert_eq!(header.total_chunks, 1);
assert_eq!(chunks.len(), 1);
assert!(chunks[0].payload.is_empty());
}
}