use super::framing;
use crate::UserKey;
use crate::io::{LittleEndian, ReadBytesExt, WriteBytesExt};
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
#[cfg(not(feature = "std"))]
use crate::io::{Read, Write};
#[cfg(feature = "std")]
use std::io::{Read, Write};
/// Descriptor of one table as recorded inside a [`VersionEdit`].
///
/// Descriptors are grouped into runs in [`ChangedLevel::runs`]. On the wire
/// each one is written as `id`, a one-byte checksum-type tag, `checksum` and
/// `global_seqno`, all integers little-endian.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableDesc {
/// Table identifier; serialized as a little-endian `u64`.
pub id: u64,
/// 128-bit checksum; serialized right after the checksum-type tag byte.
pub checksum: u128,
/// Serialized as a little-endian `u64`.
/// NOTE(review): presumably a sequence number applying to the whole table — confirm.
pub global_seqno: u64,
}
/// One level entry of a [`VersionEdit`]: a level number plus its runs of tables.
///
/// NOTE(review): presumably `runs` is the complete new content of the level
/// rather than a delta — confirm against the code that applies edits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChangedLevel {
/// Level number; serialized as a single byte.
pub level: u8,
/// Runs of the level; each run is a list of table descriptors and is
/// serialized with its own `u32` length prefix.
pub runs: Vec<Vec<TableDesc>>,
}
/// Blob file listed in [`VersionEdit::added_blob_files`].
///
/// Serialized as `id`, a one-byte checksum-type tag and `checksum`, with the
/// integers little-endian.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AddedBlobFile {
/// Blob file identifier; serialized as a little-endian `u64`.
pub id: u64,
/// 128-bit checksum; serialized right after the checksum-type tag byte.
pub checksum: u128,
}
/// One manifest edit record: the value that `encode_payload` serializes and
/// `decode_payload` parses back.
///
/// `Default` is derived, so `VersionEdit::default()` yields an edit with a
/// zero version id, empty lists and no GC stats.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VersionEdit {
/// First field of the payload; serialized as a little-endian `u64`.
/// NOTE(review): presumably the id of the version this edit produces — confirm.
pub new_version_id: u64,
/// Level entries carried by this edit, each with its runs of tables.
pub changed_levels: Vec<ChangedLevel>,
/// Blob files listed as added by this edit.
pub added_blob_files: Vec<AddedBlobFile>,
/// Ids of blob files listed as removed by this edit.
pub removed_blob_file_ids: Vec<u64>,
/// Opaque bytes, serialized with a `u32` length prefix.
///
/// `None` is written as length zero and a zero length is read back as
/// `None`, so `Some(vec![])` does not survive an encode/decode round trip:
/// it comes back as `None`.
pub gc_stats: Option<Vec<u8>>,
/// `(id, key)` pairs; each is serialized as a `u64` id followed by a
/// length-prefixed key.
/// NOTE(review): what the id refers to and what the key restricts is not
/// visible in this file — confirm with the consumer of this field.
pub restrictions: Vec<(u64, UserKey)>,
}
/// Checksum-type tag byte written in front of every serialized checksum.
///
/// It is the only tag the decoder accepts; any other value makes decoding fail.
/// NOTE(review): presumably identifies a 128-bit XXH3 checksum — confirm
/// against the code that computes the checksums.
const CHECKSUM_TYPE_XXH3: u8 = 0;
impl VersionEdit {
fn encode_payload(&self, out: &mut Vec<u8>) -> crate::Result<()> {
out.write_u64::<LittleEndian>(self.new_version_id)?;
out.write_u32::<LittleEndian>(u32_len(self.changed_levels.len())?)?;
for cl in &self.changed_levels {
out.write_u8(cl.level)?;
out.write_u32::<LittleEndian>(u32_len(cl.runs.len())?)?;
for run in &cl.runs {
out.write_u32::<LittleEndian>(u32_len(run.len())?)?;
for t in run {
out.write_u64::<LittleEndian>(t.id)?;
out.write_u8(CHECKSUM_TYPE_XXH3)?;
out.write_u128::<LittleEndian>(t.checksum)?;
out.write_u64::<LittleEndian>(t.global_seqno)?;
}
}
}
out.write_u32::<LittleEndian>(u32_len(self.added_blob_files.len())?)?;
for b in &self.added_blob_files {
out.write_u64::<LittleEndian>(b.id)?;
out.write_u8(CHECKSUM_TYPE_XXH3)?;
out.write_u128::<LittleEndian>(b.checksum)?;
}
out.write_u32::<LittleEndian>(u32_len(self.removed_blob_file_ids.len())?)?;
for &id in &self.removed_blob_file_ids {
out.write_u64::<LittleEndian>(id)?;
}
match &self.gc_stats {
Some(bytes) => {
out.write_u32::<LittleEndian>(u32_len(bytes.len())?)?;
out.write_all(bytes)?;
}
None => out.write_u32::<LittleEndian>(0)?,
}
out.write_u32::<LittleEndian>(u32_len(self.restrictions.len())?)?;
for (id, key) in &self.restrictions {
out.write_u64::<LittleEndian>(*id)?;
out.write_u32::<LittleEndian>(u32_len(key.len())?)?;
out.write_all(key)?;
}
Ok(())
}
pub fn append_to<W: Write>(&self, writer: &mut W, scratch: &mut Vec<u8>) -> crate::Result<()> {
framing::write_framed_record(writer, scratch, |payload| self.encode_payload(payload))
}
pub fn decode_payload(mut bytes: &[u8]) -> crate::Result<Self> {
const ERR: crate::Error = crate::Error::InvalidHeader("VersionEdit");
let r = &mut bytes;
let new_version_id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let changed_level_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut changed_levels = Vec::with_capacity(cap(changed_level_count));
for _ in 0..changed_level_count {
let level = r.read_u8().map_err(|_| ERR)?;
let run_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut runs = Vec::with_capacity(cap(run_count));
for _ in 0..run_count {
let table_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut run = Vec::with_capacity(cap(table_count));
for _ in 0..table_count {
let id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let checksum_type = r.read_u8().map_err(|_| ERR)?;
if checksum_type != CHECKSUM_TYPE_XXH3 {
return Err(ERR);
}
let checksum = r.read_u128::<LittleEndian>().map_err(|_| ERR)?;
let global_seqno = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
run.push(TableDesc {
id,
checksum,
global_seqno,
});
}
runs.push(run);
}
changed_levels.push(ChangedLevel { level, runs });
}
let added_blob_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut added_blob_files = Vec::with_capacity(cap(added_blob_count));
for _ in 0..added_blob_count {
let id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let checksum_type = r.read_u8().map_err(|_| ERR)?;
if checksum_type != CHECKSUM_TYPE_XXH3 {
return Err(ERR);
}
let checksum = r.read_u128::<LittleEndian>().map_err(|_| ERR)?;
added_blob_files.push(AddedBlobFile { id, checksum });
}
let removed_blob_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut removed_blob_file_ids = Vec::with_capacity(cap(removed_blob_count));
for _ in 0..removed_blob_count {
removed_blob_file_ids.push(r.read_u64::<LittleEndian>().map_err(|_| ERR)?);
}
let gc_stats_len = r.read_u32::<LittleEndian>().map_err(|_| ERR)? as usize;
let gc_stats = if gc_stats_len == 0 {
None
} else {
if r.len() < gc_stats_len {
return Err(ERR);
}
let (head, tail) = r.split_at(gc_stats_len);
*r = tail;
Some(head.to_vec())
};
let restriction_count = r.read_u32::<LittleEndian>().map_err(|_| ERR)?;
let mut restrictions = Vec::with_capacity(cap(restriction_count));
for _ in 0..restriction_count {
let id = r.read_u64::<LittleEndian>().map_err(|_| ERR)?;
let key_len = r.read_u32::<LittleEndian>().map_err(|_| ERR)? as usize;
if r.len() < key_len {
return Err(ERR);
}
let (head, tail) = r.split_at(key_len);
*r = tail;
restrictions.push((id, UserKey::from(head)));
}
if !r.is_empty() {
return Err(ERR);
}
Ok(Self {
new_version_id,
changed_levels,
added_blob_files,
removed_blob_file_ids,
gc_stats,
restrictions,
})
}
}
/// Maps a framed-record outcome to the short static label that is reported
/// as the `kind` of a `TornManifestEditLog` error.
fn tail_defect_kind(outcome: &framing::FramedRecordOutcome) -> &'static str {
    use framing::FramedRecordOutcome as Outcome;

    match outcome {
        Outcome::Ok => "ok",
        Outcome::BadHeader => "bad-header",
        Outcome::TailTruncation => "truncated",
        Outcome::LenMismatch { .. } => "len-mismatch",
        Outcome::ChecksumMismatch { .. } => "checksum-mismatch",
    }
}
pub fn replay_edits<R: Read>(
reader: &mut R,
mode: crate::config::ManifestRecoveryMode,
) -> crate::Result<Vec<VersionEdit>> {
use crate::config::ManifestRecoveryMode;
#[cfg(not(feature = "std"))]
use crate::io::BufRead;
use framing::FramedRecordOutcome;
#[cfg(feature = "std")]
use std::io::BufRead;
let abort_on_truncation = matches!(mode, ManifestRecoveryMode::AbsoluteConsistency);
let tolerate_corruption = matches!(
mode,
ManifestRecoveryMode::PointInTimeRecovery | ManifestRecoveryMode::SkipAnyCorruptedRecords
);
let mut reader = crate::io::BufReader::new(reader);
let mut edits = Vec::new();
let mut scratch = Vec::new();
loop {
if reader.fill_buf().map_err(crate::Error::from)?.is_empty() {
break;
}
let outcome = framing::read_framed_record(&mut reader, u64::MAX, None, &mut scratch)?;
match outcome {
FramedRecordOutcome::Ok => edits.push(VersionEdit::decode_payload(&scratch)?),
FramedRecordOutcome::TailTruncation => {
if abort_on_truncation {
return Err(crate::Error::TornManifestEditLog { kind: "truncated" });
}
break;
}
FramedRecordOutcome::ChecksumMismatch { .. }
| FramedRecordOutcome::BadHeader
| FramedRecordOutcome::LenMismatch { .. } => {
if tolerate_corruption {
break;
}
return Err(crate::Error::TornManifestEditLog {
kind: tail_defect_kind(&outcome),
});
}
}
}
Ok(edits)
}
fn u32_len(n: usize) -> crate::Result<u32> {
u32::try_from(n).map_err(|_| crate::Error::Unrecoverable)
}
/// Clamps an element count to at most 1024 for use as a `Vec` pre-allocation
/// size, so that a count read from the input cannot by itself force a large
/// up-front allocation.
fn cap(count: u32) -> usize {
    const MAX_PREALLOCATED_ITEMS: usize = 1024;

    let requested = count as usize;
    if requested > MAX_PREALLOCATED_ITEMS {
        MAX_PREALLOCATED_ITEMS
    } else {
        requested
    }
}
#[cfg(test)]
#[expect(clippy::expect_used, clippy::indexing_slicing, reason = "test code")]
mod tests;