use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::core::crypto::crc32_checksum;
use crate::core::error::{Error, Result};
const MANIFEST_MAGIC: &[u8; 8] = b"HNSHMNFT";
const MANIFEST_VERSION: u32 = 1;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
pub version: u64,
pub wal_checkpoint: u64,
pub checkpoint_hash: Option<String>,
pub sstables: Vec<SSTableManifestEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SSTableManifestEntry {
pub id: u64,
pub level: u32,
pub path: PathBuf,
pub size: u64,
pub entry_count: u64,
pub min_key: Vec<u8>,
pub max_key: Vec<u8>,
pub min_sequence: u64,
pub max_sequence: u64,
pub creation_time: u64,
}
impl Manifest {
pub fn new() -> Self {
Self {
version: 0,
wal_checkpoint: 0,
checkpoint_hash: None,
sstables: Vec::new(),
}
}
pub fn load_or_create(data_dir: &Path) -> Result<Self> {
let manifest_path = data_dir.join("MANIFEST");
if manifest_path.exists() {
Self::load(&manifest_path)
} else {
info!("No manifest found, creating new database");
Ok(Self::new())
}
}
pub fn load(path: &Path) -> Result<Self> {
let file = File::open(path).map_err(|e| Error::Io {
message: format!("Failed to open manifest: {:?}", path),
source: e,
})?;
let mut reader = BufReader::new(file);
let mut file_data = Vec::new();
reader.read_to_end(&mut file_data)?;
if file_data.len() < 4 {
return Err(Error::Internal {
message: "Manifest file too small".to_string(),
});
}
let (payload, checksum_bytes) = file_data.split_at(file_data.len() - 4);
let stored_checksum = u32::from_le_bytes(
checksum_bytes
.try_into()
.expect("checksum slice is exactly 4 bytes"),
);
let computed_checksum = crc32_checksum(payload);
if stored_checksum != computed_checksum {
return Err(Error::Internal {
message: format!(
"Manifest checksum mismatch: stored={:#010x}, computed={:#010x}",
stored_checksum, computed_checksum
),
});
}
let mut reader = std::io::Cursor::new(payload);
let mut magic = [0u8; 8];
reader.read_exact(&mut magic)?;
if &magic != MANIFEST_MAGIC {
return Err(Error::Internal {
message: "Invalid manifest magic number".to_string(),
});
}
let version = reader.read_u32::<LittleEndian>()?;
if version != MANIFEST_VERSION {
return Err(Error::Internal {
message: format!("Unsupported manifest version: {}", version),
});
}
let wal_checkpoint = reader.read_u64::<LittleEndian>()?;
let hash_len = reader.read_u32::<LittleEndian>()? as usize;
let checkpoint_hash = if hash_len > 0 {
let mut hash_bytes = vec![0u8; hash_len];
reader.read_exact(&mut hash_bytes)?;
Some(String::from_utf8_lossy(&hash_bytes).to_string())
} else {
None
};
let sstable_count = reader.read_u32::<LittleEndian>()? as usize;
let mut sstables = Vec::with_capacity(sstable_count);
for _ in 0..sstable_count {
let entry = Self::read_sstable_entry(&mut reader)?;
sstables.push(entry);
}
info!(
"Loaded manifest: version={}, wal_checkpoint={}, sstables={}",
version,
wal_checkpoint,
sstables.len()
);
Ok(Self {
version: version as u64,
wal_checkpoint,
checkpoint_hash,
sstables,
})
}
pub fn save(&self, data_dir: &Path) -> Result<()> {
let manifest_path = data_dir.join("MANIFEST");
let temp_path = data_dir.join("MANIFEST.tmp");
{
let mut buf: Vec<u8> = Vec::new();
buf.write_all(MANIFEST_MAGIC)?;
buf.write_u32::<LittleEndian>(MANIFEST_VERSION)?;
buf.write_u64::<LittleEndian>(self.wal_checkpoint)?;
if let Some(ref hash) = self.checkpoint_hash {
let hash_bytes = hash.as_bytes();
buf.write_u32::<LittleEndian>(hash_bytes.len() as u32)?;
buf.write_all(hash_bytes)?;
} else {
buf.write_u32::<LittleEndian>(0)?;
}
buf.write_u32::<LittleEndian>(self.sstables.len() as u32)?;
for entry in &self.sstables {
Self::write_sstable_entry(&mut buf, entry)?;
}
let checksum = crc32_checksum(&buf);
buf.write_u32::<LittleEndian>(checksum)?;
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&temp_path)?;
let mut writer = BufWriter::new(file);
writer.write_all(&buf)?;
writer.flush()?;
let file = writer.into_inner().map_err(|e| e.into_error())?;
file.sync_all()?;
}
std::fs::rename(&temp_path, &manifest_path)?;
if let Ok(dir) = std::fs::File::open(data_dir) {
let _ = dir.sync_all();
}
info!(
"Saved manifest: wal_checkpoint={}, sstables={}",
self.wal_checkpoint,
self.sstables.len()
);
Ok(())
}
pub fn update_checkpoint(&mut self, sequence: u64, hash: Option<String>) {
self.wal_checkpoint = sequence;
self.checkpoint_hash = hash;
self.version += 1;
}
pub fn add_sstable(&mut self, entry: SSTableManifestEntry) {
self.sstables.push(entry);
self.version += 1;
}
pub fn remove_sstables(&mut self, ids: &[u64]) {
self.sstables.retain(|e| !ids.contains(&e.id));
self.version += 1;
}
fn read_sstable_entry(reader: &mut impl Read) -> Result<SSTableManifestEntry> {
let id = reader.read_u64::<LittleEndian>()?;
let level = reader.read_u32::<LittleEndian>()?;
let path_len = reader.read_u32::<LittleEndian>()? as usize;
let mut path_bytes = vec![0u8; path_len];
reader.read_exact(&mut path_bytes)?;
let path = PathBuf::from(String::from_utf8_lossy(&path_bytes).to_string());
let size = reader.read_u64::<LittleEndian>()?;
let entry_count = reader.read_u64::<LittleEndian>()?;
let min_key_len = reader.read_u32::<LittleEndian>()? as usize;
let mut min_key = vec![0u8; min_key_len];
reader.read_exact(&mut min_key)?;
let max_key_len = reader.read_u32::<LittleEndian>()? as usize;
let mut max_key = vec![0u8; max_key_len];
reader.read_exact(&mut max_key)?;
let min_sequence = reader.read_u64::<LittleEndian>()?;
let max_sequence = reader.read_u64::<LittleEndian>()?;
let creation_time = reader.read_u64::<LittleEndian>()?;
Ok(SSTableManifestEntry {
id,
level,
path,
size,
entry_count,
min_key,
max_key,
min_sequence,
max_sequence,
creation_time,
})
}
fn write_sstable_entry(writer: &mut impl Write, entry: &SSTableManifestEntry) -> Result<()> {
writer.write_u64::<LittleEndian>(entry.id)?;
writer.write_u32::<LittleEndian>(entry.level)?;
let path_str = entry.path.to_string_lossy();
writer.write_u32::<LittleEndian>(path_str.len() as u32)?;
writer.write_all(path_str.as_bytes())?;
writer.write_u64::<LittleEndian>(entry.size)?;
writer.write_u64::<LittleEndian>(entry.entry_count)?;
writer.write_u32::<LittleEndian>(entry.min_key.len() as u32)?;
writer.write_all(&entry.min_key)?;
writer.write_u32::<LittleEndian>(entry.max_key.len() as u32)?;
writer.write_all(&entry.max_key)?;
writer.write_u64::<LittleEndian>(entry.min_sequence)?;
writer.write_u64::<LittleEndian>(entry.max_sequence)?;
writer.write_u64::<LittleEndian>(entry.creation_time)?;
Ok(())
}
}
impl Default for Manifest {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_manifest_save_load() {
let temp_dir = TempDir::new().unwrap();
let mut manifest = Manifest::new();
manifest.wal_checkpoint = 12345;
manifest.add_sstable(SSTableManifestEntry {
id: 1,
level: 0,
path: PathBuf::from("/data/sstables/1.sst"),
size: 1024,
entry_count: 100,
min_key: vec![0, 1, 2],
max_key: vec![9, 9, 9],
min_sequence: 0,
max_sequence: 99,
creation_time: 1234567890,
});
manifest.save(temp_dir.path()).unwrap();
let loaded = Manifest::load_or_create(temp_dir.path()).unwrap();
assert_eq!(loaded.wal_checkpoint, 12345);
assert_eq!(loaded.sstables.len(), 1);
assert_eq!(loaded.sstables[0].id, 1);
assert_eq!(loaded.sstables[0].entry_count, 100);
}
#[test]
fn test_manifest_new_database() {
let temp_dir = TempDir::new().unwrap();
let manifest = Manifest::load_or_create(temp_dir.path()).unwrap();
assert_eq!(manifest.wal_checkpoint, 0);
assert!(manifest.sstables.is_empty());
}
}