use super::version::{Version, VersionEdit, FileMetadata, FileType};
use crate::{Result, StorageError};
use std::fs::{self, File, OpenOptions};
use std::io::{Write, Read};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use crc32fast::Hasher;
/// One entry in the manifest log.
///
/// Records are serialized with `bincode` and written to the manifest file
/// behind a 4-byte little-endian length prefix. On recovery they are replayed
/// in file order to rebuild the last committed `Version`.
///
/// NOTE(review): bincode presumably encodes the variant by its index, so
/// reordering or inserting variants would change the on-disk format — confirm
/// before touching the variant list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManifestRecord {
/// Adds the described file to the version being built.
AddFile(FileMetadata),
/// Removes the file identified by `file_id` / `file_type` from the version
/// being built.
DeleteFile { file_id: u64, file_type: FileType },
/// Marks every record written since the previous commit as belonging to
/// version number `version`; recovery only returns state up to the last
/// such record.
VersionCommit { version: u64 },
}
/// Handle to the manifest of a data directory.
///
/// Tracks the current committed `Version` in memory and appends
/// `ManifestRecord`s to the manifest file named by the `CURRENT` file.
pub struct Manifest {
// Directory containing `CURRENT`, the `MANIFEST-*` files and the data files
// whose paths in `FileMetadata` are resolved against it.
data_dir: PathBuf,
// Last committed version; cloned out by `current_version()`.
current_version: Arc<Mutex<Version>>,
// Handle to the manifest file, opened in append mode by `open`.
manifest_file: Arc<Mutex<File>>,
// Version number that the next successful `apply_edit` will commit.
next_version: Arc<Mutex<u64>>,
// Numeric suffix of the manifest file name. Stored by `open` but not read
// anywhere in this file, hence the lint exemption.
#[allow(dead_code)]
manifest_number: u64,
}
impl Manifest {
    /// Opens the manifest stored in `data_dir`, creating the directory and a
    /// fresh `MANIFEST-000001` if no `CURRENT` file exists yet.
    ///
    /// When `CURRENT` exists, the manifest it names is replayed to rebuild the
    /// last committed version. Any bytes after the last `VersionCommit` record
    /// (a torn write or an edit that never committed) are truncated away
    /// before the file is reopened for appending, so they cannot be adopted by
    /// a later commit.
    ///
    /// # Errors
    ///
    /// * `StorageError::Corruption` if `CURRENT` does not contain a name of the
    ///   form `MANIFEST-<number>`, or if the manifest holds a record that
    ///   cannot be decoded.
    /// * An I/O error (via `?`) if any file-system operation fails.
    pub fn open(data_dir: impl AsRef<Path>) -> Result<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();
        fs::create_dir_all(&data_dir)?;
        let current_path = data_dir.join("CURRENT");
        let has_current = current_path.exists();

        let (manifest_number, manifest_path, version) = if has_current {
            let contents = fs::read_to_string(&current_path)?;
            let manifest_name = contents.trim();
            // Refuse to guess: falling back to a default number here would
            // recover from one file and then append to another.
            let manifest_number = manifest_name
                .strip_prefix("MANIFEST-")
                .and_then(|suffix| suffix.parse::<u64>().ok())
                .ok_or_else(|| {
                    StorageError::Corruption(format!(
                        "CURRENT does not name a manifest file: {:?}",
                        manifest_name
                    ))
                })?;
            let manifest_path = data_dir.join(manifest_name);
            let (version, committed_len) = Self::recover_version(&manifest_path)?;
            Self::discard_uncommitted_tail(&manifest_path, committed_len)?;
            (manifest_number, manifest_path, version)
        } else {
            let manifest_number = 1;
            let manifest_path = data_dir.join(format!("MANIFEST-{:06}", manifest_number));
            (manifest_number, manifest_path, Version::new(0))
        };

        let manifest_file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&manifest_path)?;

        // An existing CURRENT already points at the manifest we just opened;
        // rewriting it in place would only open a window in which a crash
        // leaves it empty.
        if !has_current {
            Self::install_current(&data_dir, manifest_number)?;
        }

        let next_version = version.version_number + 1;
        Ok(Self {
            data_dir,
            current_version: Arc::new(Mutex::new(version)),
            manifest_file: Arc::new(Mutex::new(manifest_file)),
            next_version: Arc::new(Mutex::new(next_version)),
            manifest_number,
        })
    }

    /// Writes `CURRENT` by creating `CURRENT.tmp`, syncing it and renaming it
    /// into place, so `CURRENT` is never observed half-written.
    fn install_current(data_dir: &Path, manifest_number: u64) -> Result<()> {
        let tmp_path = data_dir.join("CURRENT.tmp");
        {
            let mut tmp = File::create(&tmp_path)?;
            writeln!(tmp, "MANIFEST-{:06}", manifest_number)?;
            tmp.sync_all()?;
            // `tmp` is closed at the end of this scope, before the rename.
        }
        fs::rename(&tmp_path, data_dir.join("CURRENT"))?;
        Ok(())
    }

    /// Shrinks the manifest at `manifest_path` to `committed_len` bytes if it
    /// is longer, and syncs the result.
    fn discard_uncommitted_tail(manifest_path: &Path, committed_len: u64) -> Result<()> {
        // A separate write handle is used for truncation; the long-lived
        // handle is opened in append mode afterwards.
        let file = OpenOptions::new().write(true).open(manifest_path)?;
        if file.metadata()?.len() > committed_len {
            file.set_len(committed_len)?;
            file.sync_all()?;
        }
        Ok(())
    }

    /// Replays the manifest at `manifest_path`.
    ///
    /// Returns the version as of the last `VersionCommit` record together with
    /// the byte offset just past that record (0 if nothing was committed).
    ///
    /// A truncated length prefix or payload at the end of the file is treated
    /// as a torn write and ends the replay. A record with a complete payload
    /// that fails to decode is reported as `StorageError::Corruption` instead
    /// of being skipped, because later records build on earlier ones.
    fn recover_version(manifest_path: &Path) -> Result<(Version, u64)> {
        let buffer = fs::read(manifest_path)?;
        let mut pending = Version::new(0);
        let mut committed = Version::new(0);
        let mut committed_len = 0usize;
        let mut offset = 0usize;

        // Checked arithmetic keeps a huge length prefix from overflowing
        // `usize` on 32-bit targets.
        while let Some(header_end) = offset
            .checked_add(4)
            .filter(|&end| end <= buffer.len())
        {
            let mut len_bytes = [0u8; 4];
            len_bytes.copy_from_slice(&buffer[offset..header_end]);
            let len = u32::from_le_bytes(len_bytes) as usize;

            let record_end = match header_end.checked_add(len) {
                Some(end) if end <= buffer.len() => end,
                // Payload runs past end of file: torn tail.
                _ => break,
            };

            let record: ManifestRecord = bincode::deserialize(&buffer[header_end..record_end])
                .map_err(|e| {
                    StorageError::Corruption(format!(
                        "Undecodable manifest record at offset {} in {}: {}",
                        offset,
                        manifest_path.display(),
                        e
                    ))
                })?;

            match record {
                ManifestRecord::AddFile(meta) => {
                    pending.add_file(meta);
                }
                ManifestRecord::DeleteFile { file_id, file_type } => {
                    pending.delete_file(file_id, &file_type);
                }
                ManifestRecord::VersionCommit { version } => {
                    pending.version_number = version;
                    committed = pending.clone();
                    committed_len = record_end;
                }
            }
            offset = record_end;
        }

        Ok((committed, committed_len as u64))
    }

    /// Returns a clone of the last committed version.
    ///
    /// # Panics
    ///
    /// Panics if the version lock is poisoned.
    pub fn current_version(&self) -> Version {
        self.current_version
            .lock()
            .expect("Manifest current_version lock poisoned")
            .clone()
    }

    /// Validates, logs and commits `edit`, returning the committed version
    /// number.
    ///
    /// An empty edit writes nothing and returns the current version number.
    /// Otherwise every added file is checked on disk (existence, size,
    /// checksum), the add/delete records are appended and synced, and a
    /// `VersionCommit` record is then appended and synced. The in-memory
    /// version is only updated after both syncs succeed.
    ///
    /// All records are serialized before the first byte is written, and if an
    /// I/O error occurs the file is truncated back to its length before the
    /// edit (best effort), so a failed edit does not leave records behind for
    /// the next commit to adopt.
    ///
    /// # Errors
    ///
    /// * `StorageError::FileNotFound`, `Corruption` or `CorruptedFile` when an
    ///   added file is missing, has the wrong size, or the wrong checksum.
    /// * `StorageError::Lock` when one of the internal locks is poisoned.
    /// * `StorageError::Serialization` when a record cannot be encoded or is
    ///   too large for the 4-byte length prefix.
    /// * An I/O error if writing or syncing the manifest fails.
    pub fn apply_edit(&self, edit: VersionEdit) -> Result<u64> {
        if edit.is_empty() {
            let version = self
                .current_version
                .lock()
                .map_err(|_| StorageError::Lock("Version lock poisoned".into()))?;
            return Ok(version.version_number);
        }

        for meta in &edit.add_files {
            let file_path = self.data_dir.join(&meta.path);
            if !file_path.exists() {
                return Err(StorageError::FileNotFound(file_path));
            }
            let actual_size = fs::metadata(&file_path)
                .map_err(StorageError::Io)?
                .len();
            if actual_size != meta.size {
                return Err(StorageError::Corruption(format!(
                    "File size mismatch: {} (expected {}, got {})",
                    meta.path, meta.size, actual_size
                )));
            }
            let actual_checksum = Self::calculate_checksum(&file_path)?;
            if actual_checksum != meta.checksum {
                return Err(StorageError::CorruptedFile(file_path));
            }
        }

        // Encode everything up front so an encoding failure cannot leave a
        // partially written edit in the log.
        let mut records = Vec::new();
        for meta in &edit.add_files {
            Self::encode_record(&ManifestRecord::AddFile(meta.clone()), &mut records)?;
        }
        for (file_id, file_type) in &edit.delete_files {
            let record = ManifestRecord::DeleteFile {
                file_id: *file_id,
                file_type: file_type.clone(),
            };
            Self::encode_record(&record, &mut records)?;
        }

        let mut version = self
            .current_version
            .lock()
            .map_err(|_| StorageError::Lock("Version lock poisoned".into()))?;
        let mut file = self
            .manifest_file
            .lock()
            .map_err(|_| StorageError::Lock("Manifest file lock poisoned".into()))?;
        let mut next_ver = self
            .next_version
            .lock()
            .map_err(|_| StorageError::Lock("Next version lock poisoned".into()))?;

        let committed_version = *next_ver;
        let mut commit = Vec::new();
        Self::encode_record(
            &ManifestRecord::VersionCommit {
                version: committed_version,
            },
            &mut commit,
        )?;

        let start_len = file.metadata()?.len();
        if let Err(err) = Self::write_edit(&mut file, &records, &commit) {
            // Best-effort rollback; the original error is what the caller
            // needs to see, so a failure to truncate is not reported.
            let _ = file.set_len(start_len);
            return Err(err);
        }

        for meta in &edit.add_files {
            version.add_file(meta.clone());
        }
        for (file_id, file_type) in &edit.delete_files {
            version.delete_file(*file_id, file_type);
        }
        version.version_number = committed_version;
        *next_ver = committed_version + 1;
        Ok(committed_version)
    }

    /// Appends `record` to `out` as a 4-byte little-endian length followed by
    /// its bincode encoding.
    fn encode_record(record: &ManifestRecord, out: &mut Vec<u8>) -> Result<()> {
        let data = bincode::serialize(record)
            .map_err(|e| StorageError::Serialization(e.to_string()))?;
        if data.len() > u32::MAX as usize {
            return Err(StorageError::Serialization(format!(
                "Manifest record too large: {} bytes",
                data.len()
            )));
        }
        // Checked above, so this cast cannot truncate.
        out.extend_from_slice(&(data.len() as u32).to_le_bytes());
        out.extend_from_slice(&data);
        Ok(())
    }

    /// Writes and syncs the edit records, then writes and syncs the commit
    /// record, so the commit marker is only written once the records it
    /// covers have been synced.
    fn write_edit(file: &mut File, records: &[u8], commit: &[u8]) -> Result<()> {
        file.write_all(records)?;
        file.sync_all()?;
        file.write_all(commit)?;
        file.sync_all()?;
        Ok(())
    }

    /// Computes the CRC32 (via `crc32fast`) of the file at `path`, reading it
    /// in 64 KiB chunks.
    fn calculate_checksum(path: &Path) -> Result<u32> {
        let mut file = File::open(path)?;
        let mut hasher = Hasher::new();
        let mut buffer = vec![0u8; 65536];
        loop {
            let n = file.read(&mut buffer)?;
            if n == 0 {
                break;
            }
            hasher.update(&buffer[..n]);
        }
        Ok(hasher.finalize())
    }

    /// Deletes entries in the data directory that the current version does
    /// not reference and returns their names.
    ///
    /// Names starting with `MANIFEST` and the `CURRENT` file are always kept.
    /// Sub-directories are skipped, since `remove_file` cannot delete them and
    /// would otherwise abort the whole sweep. The version lock is held for the
    /// duration of the scan.
    ///
    /// # Errors
    ///
    /// `StorageError::Lock` if the version lock is poisoned, or an I/O error
    /// if listing the directory or removing a file fails.
    pub fn garbage_collect(&self) -> Result<Vec<String>> {
        let version = self
            .current_version
            .lock()
            .map_err(|_| StorageError::Lock("Version lock poisoned".into()))?;
        let active_files = version.all_file_names();
        let mut deleted_files = Vec::new();
        for entry in fs::read_dir(&self.data_dir)? {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                continue;
            }
            let file_name = entry.file_name();
            let file_name_str = file_name.to_string_lossy().to_string();
            if file_name_str.starts_with("MANIFEST") || file_name_str == "CURRENT" {
                continue;
            }
            if !active_files.contains(&file_name_str) {
                fs::remove_file(entry.path())?;
                deleted_files.push(file_name_str);
            }
        }
        Ok(deleted_files)
    }

    /// Returns the directory this manifest manages.
    pub fn data_dir(&self) -> &Path {
        &self.data_dir
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates `name` inside `dir` filled with `len` zero bytes and returns
    /// the checksum the manifest computes for it.
    fn write_zeroed(dir: &std::path::Path, name: &str, len: usize) -> u32 {
        let target = dir.join(name);
        std::fs::write(&target, vec![0u8; len]).unwrap();
        Manifest::calculate_checksum(&target).unwrap()
    }

    /// Metadata for the 1024-byte SSTable fixture shared by both tests.
    fn sstable_fixture(checksum: u32) -> FileMetadata {
        FileMetadata {
            file_id: 1,
            file_type: FileType::SSTable,
            path: "sstable_00001.sst".to_string(),
            size: 1024,
            checksum,
            min_key: Some(0),
            max_key: Some(100),
            level: Some(0),
        }
    }

    /// Metadata for an index-style fixture, which carries no key range or
    /// level.
    fn index_fixture(file_type: FileType, path: &str, size: u64, checksum: u32) -> FileMetadata {
        FileMetadata {
            file_id: 1,
            file_type,
            path: path.to_string(),
            size,
            checksum,
            min_key: None,
            max_key: None,
            level: None,
        }
    }

    /// A single edit adding three files of different types commits as
    /// version 1 and makes all three visible.
    #[test]
    fn test_manifest_atomic_commit() {
        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();
        let manifest = Manifest::open(root).unwrap();

        let sst_checksum = write_zeroed(root, "sstable_00001.sst", 1024);
        let ts_checksum = write_zeroed(root, "timestamp_idx_00001.idx", 512);
        let text_checksum = write_zeroed(root, "text_00001.lsm", 256);

        let mut edit = VersionEdit::new();
        edit.add_file(sstable_fixture(sst_checksum));
        edit.add_file(index_fixture(
            FileType::TimestampIndex,
            "timestamp_idx_00001.idx",
            512,
            ts_checksum,
        ));
        edit.add_file(index_fixture(
            FileType::TextIndexLSM,
            "text_00001.lsm",
            256,
            text_checksum,
        ));

        let v1 = manifest.apply_edit(edit).unwrap();
        assert_eq!(v1, 1);

        let version = manifest.current_version();
        assert_eq!(version.files.len(), 3);
        assert_eq!(version.files[&FileType::SSTable].len(), 1);
        assert_eq!(version.files[&FileType::TimestampIndex].len(), 1);
        assert_eq!(version.files[&FileType::TextIndexLSM].len(), 1);
    }

    /// A committed edit is still visible after the manifest is dropped and
    /// reopened from the same directory.
    #[test]
    fn test_crash_recovery() {
        let temp_dir = TempDir::new().unwrap();
        let root = temp_dir.path();
        let sst_checksum = write_zeroed(root, "sstable_00001.sst", 1024);

        // First session: commit one SSTable, then drop the manifest.
        {
            let manifest = Manifest::open(root).unwrap();
            let mut edit = VersionEdit::new();
            edit.add_file(sstable_fixture(sst_checksum));
            manifest.apply_edit(edit).unwrap();
        }

        // Second session: state must be rebuilt from disk.
        {
            let reopened = Manifest::open(root).unwrap();
            let version = reopened.current_version();
            assert_eq!(version.version_number, 1);
            assert_eq!(version.files[&FileType::SSTable].len(), 1);
        }
    }
}