use super::framing;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// Descriptor of a single table inside one run of a [`ChangedLevel`].
///
/// `VersionEdit` serializes it as `id`, a one-byte XXH3 checksum tag,
/// `checksum`, and finally `global_seqno`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TableDesc {
    /// Identifier of the table.
    pub id: u64,
    /// 128-bit checksum of the table (tagged as XXH3 when encoded).
    pub checksum: u128,
    /// Global sequence number recorded alongside the table.
    pub global_seqno: u64,
}
/// One level touched by a `VersionEdit`.
///
/// `runs` holds the level's runs in order; each run is an ordered list of
/// [`TableDesc`]s. A level with no runs is representable and round-trips.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChangedLevel {
    /// Level number (encoded as a single byte).
    pub level: u8,
    /// Runs of the level, each an ordered sequence of tables.
    pub runs: Vec<Vec<TableDesc>>,
}
/// A blob file introduced by a `VersionEdit`.
///
/// Serialized as `id`, a one-byte XXH3 checksum tag, then `checksum`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AddedBlobFile {
    /// Identifier of the blob file.
    pub id: u64,
    /// 128-bit checksum of the blob file (tagged as XXH3 when encoded).
    pub checksum: u128,
}
/// A single version-edit record as stored in the edit log.
///
/// Carries the id of the version it produces together with the changed level
/// layouts, the blob files added and removed, and an optional opaque
/// `gc_stats` blob.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct VersionEdit {
    /// Id of the version this edit produces.
    pub new_version_id: u64,
    /// Levels whose layout is recorded by this edit.
    pub changed_levels: Vec<ChangedLevel>,
    /// Blob files added by this edit.
    pub added_blob_files: Vec<AddedBlobFile>,
    /// Ids of blob files removed by this edit.
    pub removed_blob_file_ids: Vec<u64>,
    /// Optional opaque bytes (presumably GC statistics — confirm with owners).
    ///
    /// Stored with a `u32` length prefix where length 0 means "absent", so
    /// `Some(vec![])` is encoded exactly like `None` and decodes as `None`.
    pub gc_stats: Option<Vec<u8>>,
}
/// On-disk type tag that precedes every 128-bit checksum in a `VersionEdit`
/// payload; it is the only tag the decoder accepts.
const CHECKSUM_TYPE_XXH3: u8 = 0x00;
impl VersionEdit {
fn encode_payload(&self, out: &mut Vec<u8>) -> crate::Result<()> {
out.write_u64::<LittleEndian>(self.new_version_id)?;
out.write_u32::<LittleEndian>(u32_len(self.changed_levels.len())?)?;
for cl in &self.changed_levels {
out.write_u8(cl.level)?;
out.write_u32::<LittleEndian>(u32_len(cl.runs.len())?)?;
for run in &cl.runs {
out.write_u32::<LittleEndian>(u32_len(run.len())?)?;
for t in run {
out.write_u64::<LittleEndian>(t.id)?;
out.write_u8(CHECKSUM_TYPE_XXH3)?;
out.write_u128::<LittleEndian>(t.checksum)?;
out.write_u64::<LittleEndian>(t.global_seqno)?;
}
}
}
out.write_u32::<LittleEndian>(u32_len(self.added_blob_files.len())?)?;
for b in &self.added_blob_files {
out.write_u64::<LittleEndian>(b.id)?;
out.write_u8(CHECKSUM_TYPE_XXH3)?;
out.write_u128::<LittleEndian>(b.checksum)?;
}
out.write_u32::<LittleEndian>(u32_len(self.removed_blob_file_ids.len())?)?;
for &id in &self.removed_blob_file_ids {
out.write_u64::<LittleEndian>(id)?;
}
match &self.gc_stats {
Some(bytes) => {
out.write_u32::<LittleEndian>(u32_len(bytes.len())?)?;
out.write_all(bytes)?;
}
None => out.write_u32::<LittleEndian>(0)?,
}
Ok(())
}
pub fn append_to<W: Write>(&self, writer: &mut W, scratch: &mut Vec<u8>) -> crate::Result<()> {
framing::write_framed_record(writer, scratch, |payload| self.encode_payload(payload))
}
pub fn decode_payload(mut bytes: &[u8]) -> crate::Result<Self> {
const ERR: crate::Error = crate::Error::InvalidHeader("VersionEdit");
let r = &mut bytes;
let new_version_id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let changed_level_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut changed_levels = Vec::with_capacity(cap(changed_level_count));
for _ in 0..changed_level_count {
let level = r.read_u8().map_err(|_| ERR)?;
let run_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut runs = Vec::with_capacity(cap(run_count));
for _ in 0..run_count {
let table_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut run = Vec::with_capacity(cap(table_count));
for _ in 0..table_count {
let id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let checksum_type = r.read_u8().map_err(|_| ERR)?;
if checksum_type != CHECKSUM_TYPE_XXH3 {
return Err(ERR);
}
let checksum = r.read_u128::<LittleEndian>().map_err(|_| ERR)?;
let global_seqno = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
run.push(TableDesc {
id,
checksum,
global_seqno,
});
}
runs.push(run);
}
changed_levels.push(ChangedLevel { level, runs });
}
let added_blob_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut added_blob_files = Vec::with_capacity(cap(added_blob_count));
for _ in 0..added_blob_count {
let id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let checksum_type = r.read_u8().map_err(|_| ERR)?;
if checksum_type != CHECKSUM_TYPE_XXH3 {
return Err(ERR);
}
let checksum = r.read_u128::<LittleEndian>().map_err(|_| ERR)?;
added_blob_files.push(AddedBlobFile { id, checksum });
}
let removed_blob_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut removed_blob_file_ids = Vec::with_capacity(cap(removed_blob_count));
for _ in 0..removed_blob_count {
removed_blob_file_ids.push(r.read_u64::<LittleEndian>().map_err(|_| ERR)?);
}
let gc_stats_len = r.read_u32::<LittleEndian>().map_err(|_| ERR)? as usize;
let gc_stats = if gc_stats_len == 0 {
None
} else {
if r.len() < gc_stats_len {
return Err(ERR);
}
let (head, tail) = r.split_at(gc_stats_len);
*r = tail;
Some(head.to_vec())
};
if !r.is_empty() {
return Err(ERR);
}
Ok(Self {
new_version_id,
changed_levels,
added_blob_files,
removed_blob_file_ids,
gc_stats,
})
}
}
/// Replays framed [`VersionEdit`] records from `reader`, in log order.
///
/// Records are read one at a time with `framing::read_framed_record`. Replay
/// ends without error at the first outcome that is not
/// `FramedRecordOutcome::Ok`, for example a torn tail or a checksum mismatch.
/// Everything decoded before that point is returned. Records after a
/// corrupted one are therefore not replayed.
///
/// # Errors
/// Propagates errors returned by `read_framed_record`. Also returns an error
/// when a record that framed cleanly fails `VersionEdit::decode_payload`.
pub fn replay_edits<R: Read>(reader: &mut R) -> crate::Result<Vec<VersionEdit>> {
    let mut replayed = Vec::new();
    let mut record = Vec::new();
    loop {
        let outcome = framing::read_framed_record(reader, u64::MAX, None, &mut record)?;
        if !matches!(outcome, framing::FramedRecordOutcome::Ok) {
            break;
        }
        replayed.push(VersionEdit::decode_payload(&record)?);
    }
    Ok(replayed)
}
fn u32_len(n: usize) -> crate::Result<u32> {
u32::try_from(n).map_err(|_| crate::Error::Unrecoverable)
}
/// Bounds a decoded element count before it is used as a `Vec` capacity hint.
///
/// Counts are read straight out of the payload by `decode_payload`. A corrupt
/// value must therefore not cause a huge up-front allocation. At most 1024
/// slots are reserved, and the vector grows normally beyond that.
fn cap(count: u32) -> usize {
    const MAX_PREALLOC: usize = 1024;
    std::cmp::min(count as usize, MAX_PREALLOC)
}
#[cfg(test)]
#[expect(clippy::expect_used, clippy::indexing_slicing, reason = "test code")]
mod tests {
    use super::*;

    /// Builds an edit that exercises every field: two changed levels (one with
    /// two single-table runs, one with a single two-table run), one added blob
    /// file, one removed blob file id and a 20-byte `gc_stats` blob.
    fn sample() -> VersionEdit {
        VersionEdit {
            new_version_id: 42,
            changed_levels: vec![
                ChangedLevel {
                    level: 0,
                    runs: vec![
                        vec![TableDesc {
                            id: 7,
                            checksum: 0x1122_3344_5566_7788_99AA_BBCC_DDEE_FF00,
                            global_seqno: 100,
                        }],
                        vec![TableDesc {
                            id: 8,
                            checksum: 1,
                            global_seqno: 101,
                        }],
                    ],
                },
                ChangedLevel {
                    level: 3,
                    runs: vec![vec![
                        TableDesc {
                            id: 10,
                            checksum: 2,
                            global_seqno: 50,
                        },
                        TableDesc {
                            id: 11,
                            checksum: 3,
                            global_seqno: 51,
                        },
                    ]],
                },
            ],
            added_blob_files: vec![AddedBlobFile {
                id: 9,
                checksum: 0xDEAD_BEEF,
            }],
            removed_blob_file_ids: vec![4],
            gc_stats: Some(vec![0xAB; 20]),
        }
    }

    #[test]
    fn framed_roundtrip_recovers_the_edit() {
        let edit = sample();
        let mut buf = Vec::new();
        let mut scratch = Vec::new();
        edit.append_to(&mut buf, &mut scratch).expect("append");
        let mut payload = Vec::new();
        let outcome =
            framing::read_framed_record(&mut &buf[..], u64::MAX, None, &mut payload).expect("read");
        assert!(
            matches!(outcome, framing::FramedRecordOutcome::Ok),
            "clean record must decode Ok, got {outcome:?}",
        );
        let decoded = VersionEdit::decode_payload(&payload).expect("decode");
        assert_eq!(decoded, edit);
    }

    #[test]
    fn empty_level_layout_roundtrips() {
        let mut edit = sample();
        edit.changed_levels.push(ChangedLevel {
            level: 2,
            runs: vec![],
        });
        let mut buf = Vec::new();
        edit.append_to(&mut buf, &mut Vec::new()).expect("append");
        let mut payload = Vec::new();
        framing::read_framed_record(&mut &buf[..], u64::MAX, None, &mut payload).expect("read");
        assert_eq!(VersionEdit::decode_payload(&payload).expect("decode"), edit);
    }

    #[test]
    fn empty_gc_stats_roundtrips_as_none() {
        let mut edit = sample();
        edit.gc_stats = None;
        let mut buf = Vec::new();
        edit.append_to(&mut buf, &mut Vec::new()).expect("append");
        let mut payload = Vec::new();
        framing::read_framed_record(&mut &buf[..], u64::MAX, None, &mut payload).expect("read");
        assert_eq!(VersionEdit::decode_payload(&payload).expect("decode"), edit);
    }

    #[test]
    fn some_empty_gc_stats_decodes_as_none() {
        // `Some(vec![])` shares the zero-length encoding with `None`, so the
        // distinction is intentionally not preserved across a round-trip.
        let mut edit = sample();
        edit.gc_stats = Some(Vec::new());
        let mut payload = Vec::new();
        edit.encode_payload(&mut payload).expect("encode");
        let decoded = VersionEdit::decode_payload(&payload).expect("decode");
        assert_eq!(decoded.gc_stats, None);
    }

    #[test]
    fn truncated_trailing_record_is_detected() {
        let edit = sample();
        let mut buf = Vec::new();
        edit.append_to(&mut buf, &mut Vec::new()).expect("append");
        // Chop off the last few bytes to simulate a record torn mid-write.
        buf.truncate(buf.len() - 5);
        let mut payload = Vec::new();
        let outcome = framing::read_framed_record(&mut &buf[..], u64::MAX, None, &mut payload)
            .expect("read does not error on truncation");
        assert!(
            !matches!(outcome, framing::FramedRecordOutcome::Ok),
            "truncated record must not be Ok, got {outcome:?}",
        );
    }

    #[test]
    fn bitflip_in_payload_fails_checksum() {
        let edit = sample();
        let mut buf = Vec::new();
        edit.append_to(&mut buf, &mut Vec::new()).expect("append");
        // Flip the final byte of the frame; whichever frame field it belongs
        // to, the record must no longer verify.
        let last = buf.len() - 1;
        buf[last] ^= 0xFF;
        let mut payload = Vec::new();
        let outcome =
            framing::read_framed_record(&mut &buf[..], u64::MAX, None, &mut payload).expect("read");
        assert!(
            matches!(
                outcome,
                framing::FramedRecordOutcome::ChecksumMismatch { .. }
            ),
            "bit-flip must surface as ChecksumMismatch, got {outcome:?}",
        );
    }

    #[test]
    fn replay_recovers_all_durable_edits_in_order() {
        let mut log = Vec::new();
        let mut scratch = Vec::new();
        let edits: Vec<VersionEdit> = (0..5)
            .map(|i| {
                let mut e = sample();
                e.new_version_id = 100 + i;
                e
            })
            .collect();
        for e in &edits {
            e.append_to(&mut log, &mut scratch).expect("append");
        }
        let replayed = replay_edits(&mut &log[..]).expect("replay");
        assert_eq!(replayed, edits, "replay must recover every edit in order");
    }

    #[test]
    fn replay_stops_at_torn_tail_keeping_clean_prefix() {
        let mut log = Vec::new();
        let mut scratch = Vec::new();
        let mut e0 = sample();
        e0.new_version_id = 1;
        let mut e1 = sample();
        e1.new_version_id = 2;
        e0.append_to(&mut log, &mut scratch).expect("append e0");
        e1.append_to(&mut log, &mut scratch).expect("append e1");
        let clean_len = log.len();
        let mut e2 = sample();
        e2.new_version_id = 3;
        e2.append_to(&mut log, &mut scratch).expect("append e2");
        // Keep only the first 6 bytes of e2's frame: a crash mid-append.
        log.truncate(clean_len + 6);
        let replayed = replay_edits(&mut &log[..]).expect("replay");
        assert_eq!(replayed, vec![e0, e1], "torn tail dropped, prefix kept");
    }

    #[test]
    fn replay_stops_at_bitflipped_record() {
        let mut log = Vec::new();
        let mut scratch = Vec::new();
        let mut e0 = sample();
        e0.new_version_id = 1;
        let mut e1 = sample();
        e1.new_version_id = 2;
        e0.append_to(&mut log, &mut scratch).expect("append e0");
        let after_e0 = log.len();
        e1.append_to(&mut log, &mut scratch).expect("append e1");
        // Corrupt a byte just past e1's frame header (assumes the payload
        // follows the `FRAME_HEADER_LEN`-byte header).
        let target = after_e0 + framing::FRAME_HEADER_LEN + 2;
        log[target] ^= 0xFF;
        let replayed = replay_edits(&mut &log[..]).expect("replay");
        assert_eq!(replayed, vec![e0], "replay stops at the corrupted record");
    }

    #[test]
    fn replay_of_empty_log_is_empty() {
        let replayed = replay_edits(&mut &[][..]).expect("replay");
        assert!(replayed.is_empty(), "empty log → no edits");
    }

    #[test]
    fn decode_rejects_unknown_table_checksum_type() {
        let edit = sample();
        let mut payload = Vec::new();
        edit.encode_payload(&mut payload).expect("encode");
        // new_version_id (8) + level count (4) + level (1) + run count (4)
        // + table count (4) + table id (8) → first table's checksum type tag.
        let cs_type_off = 8 + 4 + 1 + 4 + 4 + 8;
        payload[cs_type_off] = 0xEE;
        assert!(
            matches!(
                VersionEdit::decode_payload(&payload),
                Err(crate::Error::InvalidHeader("VersionEdit"))
            ),
            "an unknown table checksum_type tag must be rejected",
        );
    }

    #[test]
    fn decode_rejects_unknown_blob_checksum_type() {
        let mut edit = sample();
        edit.changed_levels.clear();
        let mut payload = Vec::new();
        edit.encode_payload(&mut payload).expect("encode");
        // new_version_id (8) + level count (4, zero levels) + blob count (4)
        // + blob id (8) → first added blob file's checksum type tag.
        let cs_type_off = 8 + 4 + 4 + 8;
        payload[cs_type_off] = 0xEE;
        assert!(
            matches!(
                VersionEdit::decode_payload(&payload),
                Err(crate::Error::InvalidHeader("VersionEdit"))
            ),
            "an unknown blob checksum_type tag must be rejected",
        );
    }

    #[test]
    fn decode_rejects_trailing_garbage() {
        let edit = sample();
        let mut payload = Vec::new();
        edit.encode_payload(&mut payload).expect("encode");
        payload.extend_from_slice(&[0xAB, 0xCD]);
        assert!(
            matches!(
                VersionEdit::decode_payload(&payload),
                Err(crate::Error::InvalidHeader("VersionEdit"))
            ),
            "trailing bytes after a complete edit must be rejected",
        );
    }

    #[test]
    fn decode_rejects_truncated_payload() {
        let edit = sample();
        let mut payload = Vec::new();
        edit.encode_payload(&mut payload).expect("encode");
        payload.truncate(payload.len() / 2);
        assert!(
            matches!(
                VersionEdit::decode_payload(&payload),
                Err(crate::Error::InvalidHeader("VersionEdit"))
            ),
            "a truncated payload must surface InvalidHeader",
        );
    }
}