use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::time::SystemTime;
/// Versioned description of the files that make up one database state.
///
/// The manifest is persisted as JSON through serde. The file names it holds
/// are joined onto the database directory by the methods that consume them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
/// Manifest version; rendered as `MANIFEST-{:06}` in on-disk file names.
pub version: u64,
/// File name of the LSM data file, if one has been recorded.
pub lsm_file: Option<String>,
/// Vector index files. Values are file names; keys are presumably index
/// names (the tests use "embeddings") — TODO confirm against callers.
pub vector_indexes: HashMap<String, String>,
/// Spatial index files (values are file names).
pub spatial_indexes: HashMap<String, String>,
/// Text index files (values are file names).
pub text_indexes: HashMap<String, String>,
/// Timestamp index files (values are file names).
pub timestamp_indexes: HashMap<String, String>,
/// Column index files (values are file names).
pub column_indexes: HashMap<String, String>,
/// Checksum over the referenced file names; 0 until `calculate_checksum` runs.
pub checksum: u64,
/// Seconds since the Unix epoch, captured when the manifest was constructed.
pub timestamp: u64,
}
impl Manifest {
/// Creates an empty manifest for `version`.
///
/// No LSM file is recorded, every index map starts empty and the checksum
/// is 0. `timestamp` is the current wall-clock time in seconds since the
/// Unix epoch, or 0 when the system clock reports a time before the epoch.
pub fn new(version: u64) -> Self {
    // `duration_since` fails when the clock is set before 1970; a constructor
    // should not panic over a misconfigured clock, so fall back to 0.
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());

    Self {
        version,
        lsm_file: None,
        vector_indexes: HashMap::new(),
        spatial_indexes: HashMap::new(),
        text_indexes: HashMap::new(),
        timestamp_indexes: HashMap::new(),
        column_indexes: HashMap::new(),
        checksum: 0,
        timestamp,
    }
}
/// Recomputes the checksum, stores it in `self.checksum` and returns it.
///
/// The checksum is the wrapping sum of `hash_string` over the LSM file name
/// (when present) and every file name held in the index maps. Addition is
/// commutative, so the result does not depend on map iteration order. Map
/// keys, `version` and `timestamp` do not contribute.
pub fn calculate_checksum(&mut self) -> Result<u64> {
    let digest = self
        .lsm_file
        .iter()
        .chain(self.vector_indexes.values())
        .chain(self.spatial_indexes.values())
        .chain(self.text_indexes.values())
        .chain(self.timestamp_indexes.values())
        .chain(self.column_indexes.values())
        .fold(0u64, |sum, name| sum.wrapping_add(Self::hash_string(name)));

    self.checksum = digest;
    Ok(digest)
}
/// Hashes the bytes of `s` with a base-31 polynomial, wrapping on overflow.
///
/// The empty string hashes to 0.
fn hash_string(s: &str) -> u64 {
    s.bytes()
        .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(u64::from(b)))
}
/// Writes this manifest to a staging file inside `db_path`.
///
/// The manifest is serialized as pretty-printed JSON into
/// `MANIFEST-{version:06}.tmp` (created, or truncated if it already exists).
/// The buffer is flushed and the file is synced before the path is returned.
///
/// # Errors
/// Fails if the file cannot be opened, serialization fails, or any write,
/// flush or sync step fails.
pub fn write_temp(&self, db_path: &Path) -> Result<PathBuf> {
    let staging_path = db_path.join(format!("MANIFEST-{:06}.tmp", self.version));

    let mut options = OpenOptions::new();
    options.write(true).create(true).truncate(true);
    let mut out = BufWriter::new(options.open(&staging_path)?);

    let payload = serde_json::to_string_pretty(self)?;
    out.write_all(payload.as_bytes())?;
    // Drain the buffer first so the sync below covers every byte.
    out.flush()?;
    out.get_ref().sync_all()?;

    Ok(staging_path)
}
pub fn commit_atomic(temp_path: &Path, db_path: &Path) -> Result<()> {
let current_path = db_path.join("MANIFEST-CURRENT");
std::fs::rename(temp_path, ¤t_path)?;
#[cfg(unix)]
{
use std::os::unix::io::AsRawFd;
let dir = File::open(db_path)?;
unsafe {
libc::fsync(dir.as_raw_fd());
}
}
#[cfg(not(unix))]
{
File::open(¤t_path)?.sync_all()?;
}
Ok(())
}
/// Loads `MANIFEST-CURRENT` from `db_path`.
///
/// When the file does not exist, an empty manifest with version 0 is
/// returned instead of an error.
///
/// # Errors
/// Fails if the file exists but cannot be opened (for example because of
/// permissions) or if its contents are not a valid JSON manifest.
pub fn read_current(db_path: &Path) -> Result<Self> {
    let current_path = db_path.join("MANIFEST-CURRENT");

    // Open directly rather than probing with `exists()` first: the probe is
    // racy and reports `false` for permission errors, which would silently
    // hand back an empty manifest for a database that does have one.
    let file = match File::open(&current_path) {
        Ok(file) => file,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return Ok(Self::new(0));
        }
        Err(err) => return Err(err.into()),
    };

    let manifest: Manifest = serde_json::from_reader(BufReader::new(file))?;
    Ok(manifest)
}
/// Persists this manifest as the current one for `db_path`.
///
/// Stages the manifest with `write_temp`, then installs the staged file
/// with `commit_atomic`. An error from either step is returned unchanged.
pub fn write_atomic(&self, db_path: &Path) -> Result<()> {
    let staged = self.write_temp(db_path)?;
    Self::commit_atomic(staged.as_path(), db_path)
}
/// Reports whether every file named by this manifest exists under `db_path`.
///
/// Checks the LSM file first, then the vector, spatial, text, timestamp and
/// column index files, stopping at the first one that is missing. A manifest
/// that references no files yields `Ok(true)`.
pub fn verify_files(&self, db_path: &Path) -> Result<bool> {
    let all_present = self
        .lsm_file
        .iter()
        .chain(self.vector_indexes.values())
        .chain(self.spatial_indexes.values())
        .chain(self.text_indexes.values())
        .chain(self.timestamp_indexes.values())
        .chain(self.column_indexes.values())
        .all(|name| db_path.join(name).exists());

    Ok(all_present)
}
pub fn find_orphans(&self, db_path: &Path) -> Result<Vec<PathBuf>> {
let mut orphans = Vec::new();
let mut active_files = std::collections::HashSet::new();
if let Some(ref file) = self.lsm_file {
active_files.insert(file.clone());
}
for (_, file) in &self.vector_indexes {
active_files.insert(file.clone());
}
for (_, file) in &self.spatial_indexes {
active_files.insert(file.clone());
}
for (_, file) in &self.text_indexes {
active_files.insert(file.clone());
}
for (_, file) in &self.timestamp_indexes {
active_files.insert(file.clone());
}
for (_, file) in &self.column_indexes {
active_files.insert(file.clone());
}
for entry in std::fs::read_dir(db_path)? {
let entry = entry?;
let path = entry.path();
if path.is_file() {
let filename = path.file_name()
.and_then(|n| n.to_str())
.unwrap_or("");
if filename.ends_with(".sst") ||
filename.ends_with(".mmap") ||
filename.ends_with(".idx") ||
filename.ends_with(".bin") {
if !active_files.contains(filename) {
orphans.push(path);
}
}
}
}
Ok(orphans)
}
/// Returns the versions of all numbered manifest files in `db_path`, ascending.
///
/// A regular file counts when its name is `MANIFEST-` followed by text that
/// parses as a `u64`. `MANIFEST-CURRENT`, staging files such as
/// `MANIFEST-000001.tmp`, and directories are skipped.
///
/// # Errors
/// Fails if the directory cannot be read or an entry cannot be inspected.
pub fn list_all_versions(db_path: &Path) -> Result<Vec<u64>> {
    let mut found = Vec::new();

    for dir_entry in std::fs::read_dir(db_path)? {
        let candidate = dir_entry?.path();
        if !candidate.is_file() {
            continue;
        }

        let parsed = candidate
            .file_name()
            .and_then(|raw| raw.to_str())
            .filter(|name| *name != "MANIFEST-CURRENT")
            .and_then(|name| name.strip_prefix("MANIFEST-"))
            .and_then(|suffix| suffix.parse::<u64>().ok());
        if let Some(version) = parsed {
            found.push(version);
        }
    }

    found.sort_unstable();
    Ok(found)
}
/// Loads the manifest stored as `MANIFEST-{version:06}` inside `db_path`.
///
/// # Errors
/// Returns `StorageError::FileNotFound` carrying the expected path when the
/// file does not exist; otherwise fails if the file cannot be opened or does
/// not contain a valid JSON manifest.
pub fn read_version(db_path: &Path, version: u64) -> Result<Self> {
    let manifest_path = db_path.join(format!("MANIFEST-{:06}", version));

    if manifest_path.exists() {
        let source = BufReader::new(File::open(&manifest_path)?);
        let parsed: Manifest = serde_json::from_reader(source)?;
        Ok(parsed)
    } else {
        Err(crate::StorageError::FileNotFound(manifest_path))
    }
}
/// Removes manifests older than the newest `keep_versions`, together with
/// the data files only they reference.
///
/// A data file is left in place when it is referenced by `MANIFEST-CURRENT`
/// or by any of the retained numbered manifests. Expired manifests that
/// cannot be read are skipped entirely (neither they nor their files are
/// touched). Individual removals are best-effort: a file that cannot be
/// deleted is ignored.
///
/// Returns the number of data files actually removed; manifest files are
/// not counted. Returns `Ok(0)` without touching anything when
/// `keep_versions` is 0 or no more than `keep_versions` manifests exist.
///
/// # Errors
/// Fails if the current manifest cannot be read or the directory cannot be
/// listed.
pub fn cleanup_old_versions(db_path: &Path, keep_versions: usize) -> Result<usize> {
    if keep_versions == 0 {
        return Ok(0);
    }

    let current = Self::read_current(db_path)?;
    let mut protected = std::collections::HashSet::new();
    Self::collect_files(&current, &mut protected);

    // Newest first, so the head of the list is what we keep.
    let mut versions = Self::list_all_versions(db_path)?;
    versions.sort_unstable_by(|a, b| b.cmp(a));
    if versions.len() <= keep_versions {
        return Ok(0);
    }
    let (retained, expired) = versions.split_at(keep_versions);

    // Retained manifests must stay usable, so their files are protected too,
    // not only the files of the current manifest.
    for &version in retained {
        if let Ok(kept) = Self::read_version(db_path, version) {
            Self::collect_files(&kept, &mut protected);
        }
    }

    let mut deleted_count = 0;
    for &version in expired {
        if let Ok(old_manifest) = Self::read_version(db_path, version) {
            let mut candidates = std::collections::HashSet::new();
            Self::collect_files(&old_manifest, &mut candidates);

            for file in candidates.difference(&protected) {
                // Count only real deletions; a file shared by several expired
                // manifests is removed (and counted) once.
                if std::fs::remove_file(db_path.join(file)).is_ok() {
                    deleted_count += 1;
                }
            }

            let _ = std::fs::remove_file(db_path.join(format!("MANIFEST-{:06}", version)));
        }
    }

    Ok(deleted_count)
}
/// Compares the file sets of two stored manifest versions.
///
/// Returns `(added, removed)`: files referenced by `to_version` but not by
/// `from_version`, and files referenced by `from_version` but not by
/// `to_version`. The order within each list is unspecified.
///
/// # Errors
/// Fails if either manifest cannot be read.
pub fn diff_versions(
    db_path: &Path,
    from_version: u64,
    to_version: u64,
) -> Result<(Vec<String>, Vec<String>)> {
    let older = Self::read_version(db_path, from_version)?;
    let newer = Self::read_version(db_path, to_version)?;

    let before: std::collections::HashSet<String> =
        Self::get_all_files(&older).into_iter().collect();
    let after: std::collections::HashSet<String> =
        Self::get_all_files(&newer).into_iter().collect();

    let added: Vec<String> = after.difference(&before).cloned().collect();
    let removed: Vec<String> = before.difference(&after).cloned().collect();

    Ok((added, removed))
}
/// Copies the files introduced between two manifest versions into `backup_path`.
///
/// The files added from `from_version` to `to_version` (per `diff_versions`)
/// are copied from `db_path` to the same relative name under `backup_path`.
/// Files missing from `db_path` are skipped. The `to_version` manifest is
/// copied as well when it exists.
///
/// Returns the number of data files copied; the manifest is not counted.
///
/// # Errors
/// Fails if the diff cannot be computed, a directory cannot be created, or a
/// copy fails.
pub fn backup_incremental(
    db_path: &Path,
    from_version: u64,
    to_version: u64,
    backup_path: &Path,
) -> Result<usize> {
    let (new_files, _) = Self::diff_versions(db_path, from_version, to_version)?;
    std::fs::create_dir_all(backup_path)?;

    let mut copied_count = 0;
    for name in &new_files {
        let source = db_path.join(name);
        if !source.exists() {
            continue;
        }
        let target = backup_path.join(name);
        if let Some(dir) = target.parent() {
            std::fs::create_dir_all(dir)?;
        }
        std::fs::copy(&source, &target)?;
        copied_count += 1;
    }

    let manifest_name = format!("MANIFEST-{:06}", to_version);
    let manifest_src = db_path.join(&manifest_name);
    if manifest_src.exists() {
        std::fs::copy(&manifest_src, backup_path.join(&manifest_name))?;
    }

    Ok(copied_count)
}
/// Restores the files of a backed-up manifest version into `target_path`.
///
/// Reads `MANIFEST-{version:06}` from `backup_path` and copies every file it
/// references that is present in the backup to the same relative name under
/// `target_path`. The manifest itself is then copied to
/// `target_path/MANIFEST-CURRENT`.
///
/// Returns the number of data files copied; the manifest is not counted.
///
/// # Errors
/// Fails if the manifest cannot be read, a directory cannot be created, or a
/// copy fails.
pub fn restore_incremental(
    backup_path: &Path,
    version: u64,
    target_path: &Path,
) -> Result<usize> {
    std::fs::create_dir_all(target_path)?;
    let manifest = Self::read_version(backup_path, version)?;

    // Files absent from the backup are skipped rather than treated as errors.
    let available = Self::get_all_files(&manifest)
        .into_iter()
        .filter(|name| backup_path.join(name).exists());

    let mut restored_count = 0;
    for name in available {
        let destination = target_path.join(&name);
        if let Some(dir) = destination.parent() {
            std::fs::create_dir_all(dir)?;
        }
        std::fs::copy(backup_path.join(&name), &destination)?;
        restored_count += 1;
    }

    let manifest_src = backup_path.join(format!("MANIFEST-{:06}", version));
    if manifest_src.exists() {
        std::fs::copy(&manifest_src, target_path.join("MANIFEST-CURRENT"))?;
    }

    Ok(restored_count)
}
fn collect_files(manifest: &Manifest, files: &mut std::collections::HashSet<String>) {
if let Some(ref file) = manifest.lsm_file {
files.insert(file.clone());
}
for (_, file) in &manifest.vector_indexes {
files.insert(file.clone());
}
for (_, file) in &manifest.spatial_indexes {
files.insert(file.clone());
}
for (_, file) in &manifest.text_indexes {
files.insert(file.clone());
}
for (_, file) in &manifest.timestamp_indexes {
files.insert(file.clone());
}
for (_, file) in &manifest.column_indexes {
files.insert(file.clone());
}
}
fn get_all_files(manifest: &Manifest) -> Vec<String> {
let mut files = Vec::new();
if let Some(ref file) = manifest.lsm_file {
files.push(file.clone());
}
for (_, file) in &manifest.vector_indexes {
files.push(file.clone());
}
for (_, file) in &manifest.spatial_indexes {
files.push(file.clone());
}
for (_, file) in &manifest.text_indexes {
files.push(file.clone());
}
for (_, file) in &manifest.timestamp_indexes {
files.push(file.clone());
}
for (_, file) in &manifest.column_indexes {
files.push(file.clone());
}
files
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A manifest written with `write_atomic` reads back with the same
    /// version, LSM file and checksum.
    #[test]
    fn test_manifest_write_read() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        let mut written = Manifest::new(1);
        written.lsm_file = Some(String::from("data_000001.sst"));
        written
            .vector_indexes
            .insert(String::from("embeddings"), String::from("vector_000001.sst"));
        written.calculate_checksum().unwrap();
        written.write_atomic(root).unwrap();

        let loaded = Manifest::read_current(root).unwrap();
        assert_eq!(loaded.version, 1);
        assert_eq!(loaded.lsm_file, Some(String::from("data_000001.sst")));
        assert_eq!(loaded.checksum, written.checksum);
    }

    /// A later `write_atomic` replaces the current manifest.
    #[test]
    fn test_atomic_commit() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        for version in 1..=2u64 {
            Manifest::new(version).write_atomic(root).unwrap();
        }

        let latest = Manifest::read_current(root).unwrap();
        assert_eq!(latest.version, 2);
    }

    /// Data files not named by the manifest are reported as orphans.
    #[test]
    fn test_orphan_detection() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();

        let fixtures: [(&str, &[u8]); 3] = [
            ("data_000001.sst", b"data"),
            ("data_000002.sst", b"data"),
            ("vector_000001.sst", b"vector"),
        ];
        for (name, contents) in fixtures.iter() {
            std::fs::write(root.join(name), contents).unwrap();
        }

        let mut manifest = Manifest::new(1);
        manifest.lsm_file = Some(String::from("data_000002.sst"));
        manifest.write_atomic(root).unwrap();

        // Only data_000002.sst is referenced, so the other two are orphans.
        let orphans = manifest.find_orphans(root).unwrap();
        assert_eq!(orphans.len(), 2);
    }
}