use memmap2::Mmap;
use std::collections::{HashMap, HashSet};
use std::convert::From;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Result, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
mod simd_copy;
use simd_copy::simd_copy;
mod digest;
use digest::{compute_checksum, compute_hash, Xxh3BuildHasher};
use log::{debug, info, warn};
// On-disk trailer layout, expressed field by field so each range starts where the
// previous one ends: key hash (8 bytes) | previous offset (8 bytes) | checksum (4 bytes).

/// Byte range of the little-endian `u64` key hash inside a trailer.
const KEY_HASH_RANGE: std::ops::Range<usize> = 0..8;
/// Byte range of the little-endian `u64` offset where the entry's payload starts.
const PREV_OFFSET_RANGE: std::ops::Range<usize> = KEY_HASH_RANGE.end..KEY_HASH_RANGE.end + 8;
/// Byte range of the 4-byte payload checksum inside a trailer.
const CHECKSUM_RANGE: std::ops::Range<usize> =
    PREV_OFFSET_RANGE.end..PREV_OFFSET_RANGE.end + 4;
/// Total trailer size: the checksum is the last field, so the trailer ends where it ends.
const METADATA_SIZE: usize = CHECKSUM_RANGE.end;
/// Payload written by deletions (a tombstone): a single zero byte.
const NULL_BYTE: [u8; 1] = [0u8];
/// Width of the checksum field.
const CHECKSUM_LEN: usize = CHECKSUM_RANGE.end - CHECKSUM_RANGE.start;
/// Fixed-size trailer written immediately after every payload.
///
/// Fields (serialized by `EntryMetadata::serialize` into `METADATA_SIZE` bytes):
/// - `key_hash`: hash of the entry's key, stored little-endian at `KEY_HASH_RANGE`.
/// - `prev_offset`: file offset where this entry's payload begins, which is also the end
///   of the previous entry; stored little-endian at `PREV_OFFSET_RANGE`.
/// - `checksum`: checksum of the payload bytes, stored verbatim at `CHECKSUM_RANGE`.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct EntryMetadata {
key_hash: u64, prev_offset: u64, checksum: [u8; 4], }
impl EntryMetadata {
    /// Encodes this trailer into its `METADATA_SIZE`-byte on-disk form.
    ///
    /// Integers are written little-endian; the checksum bytes are copied verbatim.
    #[inline]
    fn serialize(&self) -> [u8; METADATA_SIZE] {
        let mut bytes = [0u8; METADATA_SIZE];
        // Temporaries in the `for` head live for the whole loop, so borrowing the
        // `to_le_bytes()` arrays here is fine.
        for (range, field) in [
            (KEY_HASH_RANGE, &self.key_hash.to_le_bytes()[..]),
            (PREV_OFFSET_RANGE, &self.prev_offset.to_le_bytes()[..]),
            (CHECKSUM_RANGE, &self.checksum[..]),
        ] {
            bytes[range].copy_from_slice(field);
        }
        bytes
    }

    /// Decodes a trailer from the first `METADATA_SIZE` bytes of `data`.
    ///
    /// # Panics
    /// Panics if `data` is shorter than `METADATA_SIZE` bytes.
    #[inline]
    fn deserialize(data: &[u8]) -> Self {
        let le_u64 = |range: std::ops::Range<usize>| {
            let mut word = [0u8; 8];
            word.copy_from_slice(&data[range]);
            u64::from_le_bytes(word)
        };
        let key_hash = le_u64(KEY_HASH_RANGE);
        let prev_offset = le_u64(PREV_OFFSET_RANGE);
        let mut checksum = [0u8; CHECKSUM_LEN];
        checksum.copy_from_slice(&data[CHECKSUM_RANGE]);
        Self {
            key_hash,
            prev_offset,
            checksum,
        }
    }
}
/// Iterator over the newest live version of each key, walking the log backwards from
/// its end towards offset 0.
///
/// Older versions of a key and tombstones (`NULL_BYTE` payloads) are not yielded.
pub struct EntryIterator<'a> {
/// The mapped log being read.
mmap: &'a Mmap,
/// End offset of the next trailer to decode; moves to the trailer's `prev_offset` after each step.
cursor: u64,
/// Key hashes already encountered, used to skip older versions of the same key.
seen_keys: HashSet<u64, Xxh3BuildHasher>,
}
impl<'a> EntryIterator<'a> {
pub fn new(mmap: &'a Mmap, last_offset: u64) -> Self {
Self {
mmap,
cursor: last_offset,
seen_keys: HashSet::with_hasher(Xxh3BuildHasher),
}
}
}
impl<'a> Iterator for EntryIterator<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<Self::Item> {
if self.cursor < METADATA_SIZE as u64 || self.mmap.len() == 0 {
return None;
}
let metadata_offset = (self.cursor - METADATA_SIZE as u64) as usize;
let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset as usize;
let entry_end = metadata_offset;
if entry_start >= entry_end || entry_end > self.mmap.len() {
return None;
}
self.cursor = metadata.prev_offset;
if !self.seen_keys.insert(metadata.key_hash) {
return self.next(); }
let entry_data = &self.mmap[entry_start..entry_end];
if entry_data == NULL_BYTE {
return self.next();
}
Some(entry_data)
}
}
/// Append-only, memory-mapped key/value log.
///
/// Each entry is its payload followed by a `METADATA_SIZE`-byte trailer holding the key
/// hash, the offset where the payload starts, and a payload checksum. Newer entries for
/// a key shadow older ones; deleting a key appends a single-zero-byte tombstone. Keys are
/// stored only as hashes, so two keys with the same hash refer to the same entry.
pub struct AppendStorage {
/// Buffered handle used for appends; positioned at end of file when opened.
file: BufWriter<File>,
/// Read-only map of the file; re-created after every write so it covers the new data.
mmap: Arc<Mmap>,
/// End offset of the last committed entry (the logical length of the log).
last_offset: u64,
// key_index: key hash -> offset of the newest trailer written for that key.
// lock: taken for writing by `batch_write`/`compact` and for reading by `read_last_entry`.
key_index: HashMap<u64, u64, Xxh3BuildHasher>, lock: Arc<RwLock<()>>,
/// Path the storage was opened from; used by compaction and size queries.
path: PathBuf,
}
impl<'a> IntoIterator for &'a AppendStorage {
type Item = &'a [u8];
type IntoIter = EntryIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter_entries()
}
}
impl From<PathBuf> for AppendStorage {
fn from(path: PathBuf) -> Self {
AppendStorage::open(&path).expect("Failed to open storage file")
}
}
impl AppendStorage {
pub fn open(path: &Path) -> Result<Self> {
let file = Self::open_file_in_append_mode(path)?;
let file_len = file.get_ref().metadata()?.len();
let mmap = Self::init_mmap(&file)?;
let final_len = Self::recover_valid_chain(&mmap, file_len)?;
if final_len < file_len {
warn!(
"Truncating corrupted data in {} from offset {} to {}.",
path.display(),
final_len,
file_len
);
drop(mmap);
drop(file);
let file = OpenOptions::new().read(true).write(true).open(path)?;
file.set_len(final_len)?;
file.sync_all()?;
return Self::open(path);
}
let key_index = Self::build_key_index(&mmap, final_len);
Ok(Self {
file,
mmap: Arc::new(mmap),
last_offset: final_len,
key_index,
lock: Arc::new(RwLock::new(())),
path: path.to_path_buf(),
})
}
fn init_mmap(file: &BufWriter<File>) -> Result<Mmap> {
unsafe { memmap2::MmapOptions::new().map(file.get_ref()) }
}
fn open_file_in_append_mode(path: &Path) -> Result<BufWriter<File>> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
file.seek(SeekFrom::End(0))?;
Ok(BufWriter::new(file))
}
pub fn iter_entries(&self) -> EntryIterator {
EntryIterator::new(&self.mmap, self.last_offset)
}
fn build_key_index(mmap: &Mmap, last_offset: u64) -> HashMap<u64, u64, Xxh3BuildHasher> {
let mut index = HashMap::with_hasher(Xxh3BuildHasher);
let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
let mut cursor = last_offset;
while cursor >= METADATA_SIZE as u64 {
let metadata_offset = cursor as usize - METADATA_SIZE;
let metadata_bytes = &mmap[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
if seen_keys.contains(&metadata.key_hash) {
cursor = metadata.prev_offset;
continue;
}
seen_keys.insert(metadata.key_hash);
index.insert(metadata.key_hash, metadata_offset as u64);
if metadata.prev_offset == 0 {
break;
}
cursor = metadata.prev_offset;
}
index
}
fn recover_valid_chain(mmap: &Mmap, file_len: u64) -> Result<u64> {
if file_len < METADATA_SIZE as u64 {
return Ok(0);
}
let mut cursor = file_len;
let mut best_valid_offset = None;
while cursor >= METADATA_SIZE as u64 {
let metadata_offset = cursor - METADATA_SIZE as u64;
let metadata_bytes =
&mmap[metadata_offset as usize..(metadata_offset as usize + METADATA_SIZE)];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset;
if entry_start >= metadata_offset {
cursor -= 1;
continue;
}
let mut chain_valid = true;
let mut back_cursor = entry_start;
let mut total_size = (metadata_offset - entry_start) + METADATA_SIZE as u64;
let mut temp_chain = vec![metadata_offset];
while back_cursor != 0 {
if back_cursor < METADATA_SIZE as u64 {
chain_valid = false;
break;
}
let prev_metadata_offset = back_cursor - METADATA_SIZE as u64;
let prev_metadata_bytes = &mmap[prev_metadata_offset as usize
..(prev_metadata_offset as usize + METADATA_SIZE)];
let prev_metadata = EntryMetadata::deserialize(prev_metadata_bytes);
let entry_size = prev_metadata_offset - prev_metadata.prev_offset;
total_size += entry_size + METADATA_SIZE as u64;
if prev_metadata.prev_offset >= prev_metadata_offset {
chain_valid = false;
break;
}
temp_chain.push(prev_metadata_offset);
back_cursor = prev_metadata.prev_offset;
}
if chain_valid && back_cursor == 0 && total_size <= file_len {
debug!(
"Found valid chain of {} entries. Ending at offset {}.",
temp_chain.len(),
metadata_offset + METADATA_SIZE as u64
);
best_valid_offset = Some(metadata_offset + METADATA_SIZE as u64);
break; }
cursor -= 1;
}
let final_len = best_valid_offset.unwrap_or(0);
Ok(final_len)
}
fn remap_file(&mut self) -> Result<()> {
let mmap = Self::init_mmap(&self.file)?;
self.mmap = Arc::new(mmap);
Ok(())
}
pub fn append_entry(&mut self, key: &[u8], payload: &[u8]) -> Result<u64> {
let key_hash = compute_hash(key);
self.append_entry_with_key_hash(key_hash, payload)
}
pub fn delete_entry(&mut self, key: &[u8]) -> Result<u64> {
self.append_entry(key, &NULL_BYTE)
}
pub fn append_entry_with_key_hash(&mut self, key_hash: u64, payload: &[u8]) -> Result<u64> {
self.batch_write(vec![(key_hash, payload)])
}
pub fn append_entries(&mut self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
let hashed_entries: Vec<(u64, &[u8])> = entries
.iter()
.map(|(key, payload)| (compute_hash(key), *payload))
.collect();
self.batch_write(hashed_entries)
}
pub fn append_entries_with_key_hashes(&mut self, entries: &[(u64, &[u8])]) -> Result<u64> {
self.batch_write(entries.to_vec())
}
fn batch_write(&mut self, entries: Vec<(u64, &[u8])>) -> Result<u64> {
{
let _write_lock = self.lock.write().map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire write lock")
})?;
let mut buffer = Vec::new(); let mut last_offset = self.last_offset;
for (key_hash, payload) in entries {
if payload.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Payload cannot be empty.",
));
}
let prev_offset = last_offset;
let checksum = compute_checksum(payload);
let metadata = EntryMetadata {
key_hash,
prev_offset,
checksum,
};
let mut entry = vec![0u8; payload.len() + METADATA_SIZE];
simd_copy(&mut entry[..payload.len()], payload);
entry[payload.len()..].copy_from_slice(&metadata.serialize());
buffer.extend_from_slice(&entry);
last_offset += entry.len() as u64;
self.key_index
.insert(key_hash, last_offset - METADATA_SIZE as u64);
}
self.file.write_all(&buffer)?;
self.file.flush()?;
self.last_offset = last_offset;
}
self.remap_file()?;
Ok(self.last_offset)
}
pub fn read_last_entry(&self) -> Option<&[u8]> {
let _read_lock = self.lock.read().ok()?;
if self.last_offset < METADATA_SIZE as u64 || self.mmap.len() == 0 {
return None;
}
let metadata_offset = (self.last_offset - METADATA_SIZE as u64) as usize;
let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset as usize;
let entry_end = metadata_offset;
if entry_start >= entry_end || entry_end > self.mmap.len() {
return None;
}
Some(&self.mmap[entry_start..entry_end]) }
pub fn get_entry_by_key(&self, key: &[u8]) -> Option<&[u8]> {
let key_hash = compute_hash(key);
if let Some(&offset) = self.key_index.get(&key_hash) {
let metadata_bytes = &self.mmap[offset as usize..offset as usize + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset as usize;
let entry = &self.mmap[entry_start..offset as usize];
if entry == NULL_BYTE {
return None;
}
return Some(entry);
}
None
}
pub fn compact(&mut self) -> Result<()> {
let _write_lock = self.lock.write().map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire write lock")
})?;
let compacted_path = self.path.with_extension("bk");
debug!("Starting compaction. Writing to: {:?}", compacted_path);
let mut compacted_storage = AppendStorage::open(&compacted_path)?;
for entry in self.iter_entries() {
let entry_start_offset = entry.as_ptr() as usize - self.mmap.as_ptr() as usize;
let metadata_offset = entry_start_offset + entry.len();
if metadata_offset + METADATA_SIZE > self.mmap.len() {
warn!("Skipping corrupted entry at offset {}", entry_start_offset);
continue;
}
let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
compacted_storage.append_entry_with_key_hash(metadata.key_hash, entry)?;
debug!(
"Writing key_hash: {} | entry_size: {}",
metadata.key_hash,
entry.len()
);
}
compacted_storage.file.flush()?;
drop(compacted_storage);
debug!("Reduced backup completed. Swapping files...");
std::fs::rename(&compacted_path, &self.path)?;
info!("Compaction successful.");
Ok(())
}
pub fn count(&self) -> usize {
self.iter_entries().count()
}
pub fn estimate_compaction_savings(&self) -> u64 {
let total_size = self.get_storage_size().unwrap_or(0);
let mut unique_entry_size: u64 = 0;
let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
for entry in self.iter_entries() {
let entry_start_offset = entry.as_ptr() as usize - self.mmap.as_ptr() as usize;
let metadata_offset = entry_start_offset + entry.len();
if metadata_offset + METADATA_SIZE > self.mmap.len() {
warn!("Skipping corrupted entry at offset {}", entry_start_offset);
continue;
}
let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
if seen_keys.insert(metadata.key_hash) {
unique_entry_size += entry.len() as u64 + METADATA_SIZE as u64;
}
}
total_size.saturating_sub(unique_entry_size)
}
pub fn get_storage_size(&self) -> Result<u64> {
std::fs::metadata(&self.path).map(|meta| meta.len())
}
}