use std::{
fs::File,
io::Read,
path::Path,
};
use bytes::Bytes;
use crate::{
errs::ManifestError,
levels::VersionSet,
manifest::{
EditEntry,
ManifestHeader,
},
version::VersionEdit,
};
/// Reader for the `MANIFEST` file stored inside a database directory.
///
/// Construct one with [`ManifestReader::open`] and decode its contents with
/// [`ManifestReader::read_all_edits`], or call [`ManifestReader::recover_version_set`]
/// to rebuild a [`VersionSet`] in a single step.
#[derive(Debug)]
pub struct ManifestReader {
    /// Handle to the opened `MANIFEST` file.
    file: File,
}
impl ManifestReader {
    /// Length in bytes of the fixed-size header decoded by [`ManifestHeader::decode`].
    const HEADER_LEN: usize = 48;

    /// Opens `<base_path>/MANIFEST` for reading.
    ///
    /// # Errors
    ///
    /// Returns [`ManifestError::Io`] if the file cannot be opened, including when it
    /// does not exist.
    pub fn open(base_path: &Path) -> Result<Self, ManifestError> {
        let file = File::open(base_path.join("MANIFEST")).map_err(ManifestError::Io)?;
        Ok(Self { file })
    }

    /// Reads every decodable [`VersionEdit`] recorded in the manifest, in file order.
    ///
    /// The file is read from the beginning on every call, so repeated calls return the
    /// same edits. The first 48 bytes are decoded as a [`ManifestHeader`], and the rest
    /// as a sequence of [`EditEntry`] records.
    ///
    /// Decoding stops, logging a `tracing` warning, at the first record that:
    /// - fails to decode,
    /// - reports consuming zero bytes, or
    /// - carries a payload that is not a valid `VersionEdit`.
    ///
    /// The edits decoded before that point are returned.
    ///
    /// # Errors
    ///
    /// - [`ManifestError::Io`] if rewinding or reading the file fails.
    /// - [`ManifestError::CorruptedHeader`] if the file is shorter than the header, or
    ///   if the decoded header reports that CRC is not enabled.
    /// - Any error returned by [`ManifestHeader::decode`].
    pub fn read_all_edits(&mut self) -> Result<Vec<VersionEdit>, ManifestError> {
        // Rewind so a second call re-reads the whole file instead of starting at EOF.
        std::io::Seek::rewind(&mut self.file).map_err(ManifestError::Io)?;
        let mut buf = Vec::new();
        self.file.read_to_end(&mut buf).map_err(ManifestError::Io)?;
        if buf.len() < Self::HEADER_LEN {
            return Err(ManifestError::CorruptedHeader);
        }
        // Move the buffer into `Bytes` once. After that, `slice` is a cheap
        // reference-counted view rather than a fresh copy of the remaining tail for
        // every entry, which would make the loop quadratic in the file size.
        let data = Bytes::from(buf);
        let header = ManifestHeader::decode(data.slice(..Self::HEADER_LEN))?;
        if !header.crc_enabled() {
            return Err(ManifestError::CorruptedHeader);
        }
        let mut edits = Vec::new();
        let mut offset = Self::HEADER_LEN;
        while offset < data.len() {
            let (entry, bytes_read) = match EditEntry::decode(data.slice(offset..)) {
                | Ok(decoded) => decoded,
                | Err(e) => {
                    tracing::warn!("Corruption detected at offset {}: {:?}", offset, e);
                    break;
                },
            };
            if bytes_read == 0 {
                // A zero-length record would leave `offset` unchanged and loop forever.
                tracing::warn!("Zero-length manifest entry at offset {}; stopping", offset);
                break;
            }
            match VersionEdit::decode(entry.payload) {
                | Ok(edit) => {
                    edits.push(edit);
                    offset += bytes_read;
                },
                | Err(e) => {
                    tracing::warn!(
                        "Failed to decode VersionEdit at offset {}: {:?}",
                        offset,
                        e
                    );
                    break;
                },
            }
        }
        Ok(edits)
    }

    /// Rebuilds a [`VersionSet`] by replaying every edit stored in `<base_path>/MANIFEST`.
    ///
    /// Returns `Ok(None)` when no manifest file exists. Otherwise it starts from
    /// `VersionSet::new(0, num_levels)`, applies each decoded edit in order, and returns
    /// the result. An edit that fails to apply is logged with `tracing::warn!` and
    /// skipped, and replay continues with the next edit.
    ///
    /// # Errors
    ///
    /// Propagates any error from opening the manifest (other than "not found") or from
    /// [`Self::read_all_edits`].
    pub fn recover_version_set(
        base_path: &Path,
        num_levels: usize,
    ) -> Result<Option<VersionSet>, ManifestError> {
        // Open directly instead of checking `Path::exists` first: `exists` returns
        // `false` on permission or other metadata errors, which would silently be
        // treated as "no manifest". Only a genuine NotFound means nothing to recover.
        let mut reader = match Self::open(base_path) {
            | Ok(r) => r,
            | Err(ManifestError::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                return Ok(None);
            },
            | Err(e) => return Err(e),
        };
        let edits = reader.read_all_edits()?;
        // With zero edits the loop is a no-op and the fresh set is returned as-is.
        let mut version = VersionSet::new(0, num_levels);
        for edit in edits {
            if let Err(e) = edit.apply(&mut version, base_path) {
                // Best-effort replay: a single bad edit does not abort recovery.
                tracing::warn!("Failed to apply edit during recovery: {:?}", e);
            }
        }
        Ok(Some(version))
    }
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use super::*;
    use crate::{
        manifest_writer::ManifestWriter,
        version::VersionEdit,
    };

    // Each test works in its own throwaway directory, so MANIFEST files never collide.

    #[test]
    fn test_read_empty_manifest() {
        let dir = TempDir::new().unwrap();
        // The writer is intentionally still alive while the reader opens the file.
        let _writer = ManifestWriter::create(dir.path().to_path_buf(), 0).unwrap();
        let mut manifest = ManifestReader::open(dir.path()).unwrap();
        let decoded = manifest.read_all_edits().unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn test_read_single_edit() {
        let dir = TempDir::new().unwrap();
        {
            let mut writer = ManifestWriter::create(dir.path().to_path_buf(), 0).unwrap();
            writer
                .append_edit(&VersionEdit::UpdateSequence { sequence: 42 })
                .unwrap();
            writer.sync().unwrap();
        }
        let mut manifest = ManifestReader::open(dir.path()).unwrap();
        let decoded = manifest.read_all_edits().unwrap();
        assert_eq!(decoded.len(), 1);
        if let VersionEdit::UpdateSequence { sequence } = &decoded[0] {
            assert_eq!(*sequence, 42);
        } else {
            panic!("wrong edit type");
        }
    }

    #[test]
    fn test_read_multiple_edits() {
        let dir = TempDir::new().unwrap();
        {
            let mut writer = ManifestWriter::create(dir.path().to_path_buf(), 0).unwrap();
            let batch = [
                VersionEdit::AddL0Segment {
                    segment_id: 1,
                    key_range: (b"a".to_vec(), b"z".to_vec()),
                    size: 1000,
                },
                VersionEdit::AddSegment {
                    level: 1,
                    segment_id: 2,
                    key_range: (b"a".to_vec(), b"m".to_vec()),
                    size: 2000,
                },
                VersionEdit::RemoveSegment {
                    level: 1,
                    segment_id: 2,
                },
            ];
            // Append in array order; the reader must return all of them.
            for edit in &batch {
                writer.append_edit(edit).unwrap();
            }
            writer.sync().unwrap();
        }
        let mut manifest = ManifestReader::open(dir.path()).unwrap();
        let decoded = manifest.read_all_edits().unwrap();
        assert_eq!(decoded.len(), 3);
    }

    #[test]
    fn test_recover_version_set_no_manifest() {
        let dir = TempDir::new().unwrap();
        let recovered = ManifestReader::recover_version_set(dir.path(), 7).unwrap();
        assert!(recovered.is_none());
    }

    #[test]
    fn test_recover_version_set_empty() {
        let dir = TempDir::new().unwrap();
        // Create the manifest and drop the writer right away, appending no edits.
        drop(ManifestWriter::create(dir.path().to_path_buf(), 0).unwrap());
        let recovered = ManifestReader::recover_version_set(dir.path(), 7).unwrap();
        assert!(recovered.is_some());
        assert_eq!(recovered.unwrap().sequence, 0);
    }

    #[test]
    fn test_recover_version_set_with_sequence() {
        let dir = TempDir::new().unwrap();
        {
            let mut writer = ManifestWriter::create(dir.path().to_path_buf(), 0).unwrap();
            writer
                .append_edit(&VersionEdit::UpdateSequence { sequence: 123 })
                .unwrap();
            writer.sync().unwrap();
        }
        let recovered = ManifestReader::recover_version_set(dir.path(), 7).unwrap();
        assert_eq!(recovered.unwrap().sequence, 123);
    }

    #[test]
    fn test_corrupted_entry_stops_parsing() {
        let dir = TempDir::new().unwrap();
        let manifest_path = dir.path().join("MANIFEST");
        {
            let mut writer = ManifestWriter::create(dir.path().to_path_buf(), 0).unwrap();
            for seq in 1..=2 {
                writer
                    .append_edit(&VersionEdit::UpdateSequence { sequence: seq })
                    .unwrap();
            }
            writer.sync().unwrap();
        }
        {
            // Tack 100 bytes of 0xff onto the end; the reader is expected to stop there
            // and still return the two valid edits written above.
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&manifest_path)
                .unwrap();
            std::io::Write::write_all(&mut file, &[0xffu8; 100]).unwrap();
        }
        let mut manifest = ManifestReader::open(dir.path()).unwrap();
        let decoded = manifest.read_all_edits().unwrap();
        assert_eq!(decoded.len(), 2);
    }
}