use memmap2::Mmap;
use std::collections::{HashMap, HashSet};
use std::convert::From;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Result, Seek, SeekFrom, Write};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
mod simd_copy;
use simd_copy::simd_copy;
mod digest;
pub use digest::{compute_checksum, compute_hash, Xxh3BuildHasher};
use log::{debug, info, warn};
use std::sync::atomic::{AtomicU64, Ordering};
/// Allows an `EntryHandle` to be used wherever a `&[u8]` payload is expected.
impl std::ops::Deref for EntryHandle {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        // Borrow the payload bytes straight out of the shared memory map.
        &self.mmap_arc[self.range.clone()]
    }
}
impl PartialEq<[u8]> for EntryHandle {
fn eq(&self, other: &[u8]) -> bool {
self.as_slice() == other
}
}
/// Convenience comparison against `&[u8]` (e.g. byte-string literals).
impl PartialEq<&[u8]> for EntryHandle {
    fn eq(&self, other: &&[u8]) -> bool {
        self.as_slice().eq(*other)
    }
}
/// Convenience comparison against an owned byte vector.
impl PartialEq<Vec<u8>> for EntryHandle {
    fn eq(&self, other: &Vec<u8>) -> bool {
        self.as_slice() == &other[..]
    }
}
// --- On-disk entry layout ---------------------------------------------------
// Every entry is stored as `payload ++ metadata`, where the metadata trailer
// occupies a fixed METADATA_SIZE bytes laid out little-endian as:
//   bytes  0..8   key hash
//   bytes  8..16  offset of the previous entry's end (backward chain pointer)
//   bytes 16..20  payload checksum
const METADATA_SIZE: usize = 20;
const KEY_HASH_RANGE: std::ops::Range<usize> = 0..8;
const PREV_OFFSET_RANGE: std::ops::Range<usize> = 8..16;
const CHECKSUM_RANGE: std::ops::Range<usize> = 16..20;
// Single-byte payload that marks a key as deleted (tombstone).
const NULL_BYTE: [u8; 1] = [0];
const CHECKSUM_LEN: usize = CHECKSUM_RANGE.end - CHECKSUM_RANGE.start;

/// Parsed form of the fixed-size metadata trailer stored after each payload.
#[repr(C)]
#[derive(Debug)]
pub struct EntryMetadata {
    key_hash: u64,
    prev_offset: u64,
    checksum: [u8; 4],
}

impl EntryMetadata {
    /// Encode this trailer into its on-disk little-endian byte layout.
    #[inline]
    fn serialize(&self) -> [u8; METADATA_SIZE] {
        let mut out = [0u8; METADATA_SIZE];
        out[KEY_HASH_RANGE].copy_from_slice(&self.key_hash.to_le_bytes());
        out[PREV_OFFSET_RANGE].copy_from_slice(&self.prev_offset.to_le_bytes());
        out[CHECKSUM_RANGE].copy_from_slice(&self.checksum);
        out
    }

    /// Decode a trailer from `data`, which must contain at least
    /// METADATA_SIZE bytes (otherwise the slice indexing panics, exactly
    /// like the original indexing-based implementation).
    #[inline]
    fn deserialize(data: &[u8]) -> Self {
        let checksum: [u8; CHECKSUM_LEN] = data[CHECKSUM_RANGE].try_into().unwrap();
        Self {
            key_hash: u64::from_le_bytes(data[KEY_HASH_RANGE].try_into().unwrap()),
            prev_offset: u64::from_le_bytes(data[PREV_OFFSET_RANGE].try_into().unwrap()),
            checksum,
        }
    }
}
/// Iterator that walks the backward entry chain from newest to oldest,
/// yielding only the most recent, non-deleted version of each key.
pub struct EntryIterator {
    mmap: Arc<Mmap>, // shared snapshot of the mapped file contents
    cursor: u64,     // absolute offset just past the next entry's metadata trailer
    seen_keys: HashSet<u64, Xxh3BuildHasher>, // key hashes already yielded (or skipped)
}
impl EntryIterator {
pub fn new(mmap: Arc<Mmap>, last_offset: u64) -> Self {
Self {
mmap,
cursor: last_offset,
seen_keys: HashSet::with_hasher(Xxh3BuildHasher),
}
}
}
impl Iterator for EntryIterator {
    type Item = EntryHandle;

    /// Yield the next live entry while walking the chain backwards.
    ///
    /// FIX: skipping superseded (duplicate-key) entries and tombstones is
    /// done with a loop instead of recursive `self.next()` calls — a long
    /// run of overwritten or deleted entries previously grew the stack by
    /// one frame per skipped entry and could overflow it.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Not enough bytes left for a metadata trailer: chain exhausted.
            if self.cursor < METADATA_SIZE as u64 || self.mmap.len() == 0 {
                return None;
            }
            let metadata_offset = (self.cursor - METADATA_SIZE as u64) as usize;
            let metadata_bytes = &self.mmap[metadata_offset..metadata_offset + METADATA_SIZE];
            let metadata = EntryMetadata::deserialize(metadata_bytes);
            let entry_start = metadata.prev_offset as usize;
            let entry_end = metadata_offset;
            // A valid entry's payload lies strictly before its trailer and
            // inside the mapping; anything else ends the iteration.
            if entry_start >= entry_end || entry_end > self.mmap.len() {
                return None;
            }
            self.cursor = metadata.prev_offset;
            if !self.seen_keys.insert(metadata.key_hash) {
                // Older version of a key we already returned — skip it.
                continue;
            }
            let entry_data = &self.mmap[entry_start..entry_end];
            if entry_data == NULL_BYTE {
                // Tombstone: the key was deleted; skip it.
                continue;
            }
            return Some(EntryHandle {
                mmap_arc: Arc::clone(&self.mmap),
                range: entry_start..entry_end,
                metadata,
            });
        }
    }
}
/// Zero-copy view of one stored entry's payload.
///
/// Holds its own `Arc` to the memory map, so the bytes remain valid for the
/// lifetime of the handle even if the storage remaps the file afterwards.
#[derive(Debug)]
pub struct EntryHandle {
    mmap_arc: Arc<Mmap>,
    range: Range<usize>, // payload byte range within `mmap_arc` (metadata excluded)
    metadata: EntryMetadata,
}
impl EntryHandle {
    /// Borrow the entry's payload bytes from the underlying memory map.
    pub fn as_slice(&self) -> &[u8] {
        &self.mmap_arc[self.range.start..self.range.end]
    }

    /// The entry's parsed metadata trailer.
    pub fn metadata(&self) -> &EntryMetadata {
        &self.metadata
    }

    /// Payload size in bytes (metadata excluded).
    pub fn size(&self) -> usize {
        self.range.end - self.range.start
    }

    /// Payload size plus the fixed-size metadata trailer.
    pub fn size_with_metadata(&self) -> usize {
        self.size() + METADATA_SIZE
    }

    /// Hash of the key this entry was stored under.
    pub fn key_hash(&self) -> u64 {
        self.metadata.key_hash
    }

    /// Stored payload checksum, decoded as a little-endian `u32`.
    pub fn checksum(&self) -> u32 {
        u32::from_le_bytes(self.metadata.checksum)
    }

    /// Stored payload checksum as raw bytes.
    pub fn raw_checksum(&self) -> [u8; 4] {
        self.metadata.checksum
    }

    /// Recompute the payload checksum and compare it to the stored one.
    pub fn is_valid_checksum(&self) -> bool {
        compute_checksum(self.as_slice()) == self.metadata.checksum
    }

    /// Byte offset where the payload begins within the file.
    pub fn start_offset(&self) -> usize {
        self.range.start
    }

    /// Byte offset just past the payload (start of the metadata trailer).
    pub fn end_offset(&self) -> usize {
        self.range.end
    }

    /// Payload byte range within the file.
    pub fn offset_range(&self) -> Range<usize> {
        self.range.start..self.range.end
    }

    /// Virtual-address range the payload currently occupies in memory.
    pub fn address_range(&self) -> std::ops::Range<*const u8> {
        let bytes = self.as_slice();
        let base = bytes.as_ptr();
        // SAFETY: offsetting by `bytes.len()` yields the one-past-the-end
        // pointer of the same allocation, which `pointer::add` permits.
        let past_end = unsafe { base.add(bytes.len()) };
        base..past_end
    }
}
/// Append-only, memory-mapped key/value storage.
///
/// Writes go through the buffered file handle; reads are served from an
/// `Arc<Mmap>` snapshot that is swapped atomically after each remap.
pub struct AppendStorage {
    file: Arc<RwLock<BufWriter<File>>>, // buffered write handle to the backing file
    mmap: Arc<Mutex<Arc<Mmap>>>, // current read snapshot; replaced on remap
    last_offset: AtomicU64, // logical end of the valid entry chain
    key_index: Arc<RwLock<HashMap<u64, u64, Xxh3BuildHasher>>>, // key hash -> metadata offset of newest version
    path: PathBuf, // location of the backing file
}
impl IntoIterator for AppendStorage {
    type IntoIter = EntryIterator;
    type Item = EntryHandle;

    /// Consume the storage, iterating the newest non-deleted version of
    /// every key (newest first).
    fn into_iter(self) -> EntryIterator {
        self.iter_entries()
    }
}
impl From<PathBuf> for AppendStorage {
fn from(path: PathBuf) -> Self {
AppendStorage::open(&path).expect("Failed to open storage file")
}
}
impl AppendStorage {
pub fn open(path: &Path) -> Result<Self> {
let file = Self::open_file_in_append_mode(path)?;
let file_len = file.get_ref().metadata()?.len();
let mmap = Self::init_mmap(&file)?;
let final_len = Self::recover_valid_chain(&mmap, file_len)?;
if final_len < file_len {
warn!(
"Truncating corrupted data in {} from offset {} to {}.",
path.display(),
final_len,
file_len
);
drop(mmap);
drop(file);
let file = OpenOptions::new().read(true).write(true).open(path)?;
file.set_len(final_len)?;
file.sync_all()?;
return Self::open(path);
}
let key_index = Self::build_key_index(&mmap, final_len);
Ok(Self {
file: Arc::new(RwLock::new(file)), mmap: Arc::new(Mutex::new(Arc::new(mmap))),
last_offset: final_len.into(),
key_index: Arc::new(RwLock::new(key_index)), path: path.to_path_buf(),
})
}
pub fn get_path(&self) -> PathBuf {
self.path.clone()
}
fn init_mmap(file: &BufWriter<File>) -> Result<Mmap> {
unsafe { memmap2::MmapOptions::new().map(file.get_ref()) }
}
fn open_file_in_append_mode(path: &Path) -> Result<BufWriter<File>> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
file.seek(SeekFrom::End(0))?;
Ok(BufWriter::new(file))
}
pub fn iter_entries(&self) -> EntryIterator {
let guard = self.mmap.lock().unwrap();
let mmap_clone = guard.clone();
drop(guard);
let last_offset = self.last_offset.load(Ordering::Acquire);
EntryIterator::new(mmap_clone, last_offset)
}
fn build_key_index(mmap: &Mmap, last_offset: u64) -> HashMap<u64, u64, Xxh3BuildHasher> {
let mut index = HashMap::with_hasher(Xxh3BuildHasher);
let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
let mut cursor = last_offset;
while cursor >= METADATA_SIZE as u64 {
let metadata_offset = cursor as usize - METADATA_SIZE;
let metadata_bytes = &mmap[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
if seen_keys.contains(&metadata.key_hash) {
cursor = metadata.prev_offset;
continue;
}
seen_keys.insert(metadata.key_hash);
index.insert(metadata.key_hash, metadata_offset as u64);
if metadata.prev_offset == 0 {
break;
}
cursor = metadata.prev_offset;
}
index
}
fn recover_valid_chain(mmap: &Mmap, file_len: u64) -> Result<u64> {
if file_len < METADATA_SIZE as u64 {
return Ok(0);
}
let mut cursor = file_len;
let mut best_valid_offset = None;
while cursor >= METADATA_SIZE as u64 {
let metadata_offset = cursor - METADATA_SIZE as u64;
let metadata_bytes =
&mmap[metadata_offset as usize..(metadata_offset as usize + METADATA_SIZE)];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset;
if entry_start >= metadata_offset {
cursor -= 1;
continue;
}
let mut chain_valid = true;
let mut back_cursor = entry_start;
let mut total_size = (metadata_offset - entry_start) + METADATA_SIZE as u64;
let mut temp_chain = vec![metadata_offset];
while back_cursor != 0 {
if back_cursor < METADATA_SIZE as u64 {
chain_valid = false;
break;
}
let prev_metadata_offset = back_cursor - METADATA_SIZE as u64;
let prev_metadata_bytes = &mmap[prev_metadata_offset as usize
..(prev_metadata_offset as usize + METADATA_SIZE)];
let prev_metadata = EntryMetadata::deserialize(prev_metadata_bytes);
let entry_size = prev_metadata_offset.saturating_sub(prev_metadata.prev_offset);
total_size += entry_size + METADATA_SIZE as u64;
if prev_metadata.prev_offset >= prev_metadata_offset {
chain_valid = false;
break;
}
temp_chain.push(prev_metadata_offset);
back_cursor = prev_metadata.prev_offset;
}
if chain_valid && back_cursor == 0 && total_size <= file_len {
debug!(
"Found valid chain of {} entries. Ending at offset {}.",
temp_chain.len(),
metadata_offset + METADATA_SIZE as u64
);
best_valid_offset = Some(metadata_offset + METADATA_SIZE as u64);
break; }
cursor -= 1;
}
let final_len = best_valid_offset.unwrap_or(0);
Ok(final_len)
}
fn remap_file(&self) -> std::io::Result<()> {
let file_guard = self.file.read().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to acquire file read lock",
)
})?;
let new_mmap = unsafe { memmap2::MmapOptions::new().map(file_guard.get_ref())? };
{
let mut guard = self.mmap.lock().unwrap();
*guard = Arc::new(new_mmap);
}
let new_offset = file_guard.get_ref().metadata()?.len();
self.last_offset
.store(new_offset, std::sync::atomic::Ordering::Release);
Ok(())
}
pub fn append_entry(&mut self, key: &[u8], payload: &[u8]) -> Result<u64> {
let key_hash = compute_hash(key);
self.append_entry_with_key_hash(key_hash, payload)
}
pub fn append_entry_with_key_hash(&mut self, key_hash: u64, payload: &[u8]) -> Result<u64> {
self.batch_write(vec![(key_hash, payload)])
}
pub fn append_entries(&mut self, entries: &[(&[u8], &[u8])]) -> Result<u64> {
let hashed_entries: Vec<(u64, &[u8])> = entries
.iter()
.map(|(key, payload)| (compute_hash(key), *payload))
.collect();
self.batch_write(hashed_entries)
}
pub fn append_entries_with_key_hashes(&mut self, entries: &[(u64, &[u8])]) -> Result<u64> {
self.batch_write(entries.to_vec())
}
fn batch_write(&mut self, entries: Vec<(u64, &[u8])>) -> Result<u64> {
{
let mut file = self.file.write().map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "Failed to acquire file lock")
})?;
let mut buffer = Vec::new();
let mut last_offset = self.last_offset.load(Ordering::Acquire);
for (key_hash, payload) in entries {
if payload.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Payload cannot be empty.",
));
}
let prev_offset = last_offset;
let checksum = compute_checksum(payload);
let metadata = EntryMetadata {
key_hash,
prev_offset,
checksum,
};
let mut entry = vec![0u8; payload.len() + METADATA_SIZE];
simd_copy(&mut entry[..payload.len()], payload);
entry[payload.len()..].copy_from_slice(&metadata.serialize());
buffer.extend_from_slice(&entry);
last_offset += entry.len() as u64;
{
let mut key_index = self.key_index.write().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to acquire index lock",
)
})?;
key_index.insert(key_hash, last_offset - METADATA_SIZE as u64);
} }
file.write_all(&buffer)?;
file.flush()?;
self.last_offset.store(last_offset, Ordering::Release);
}
self.remap_file()?;
Ok(self.last_offset.load(Ordering::Acquire))
}
pub fn read_last_entry(&self) -> Option<EntryHandle> {
let guard = self.mmap.lock().unwrap();
let mmap_arc = Arc::clone(&*guard);
drop(guard);
let last_offset = self.last_offset.load(std::sync::atomic::Ordering::Acquire);
if last_offset < METADATA_SIZE as u64 || mmap_arc.len() == 0 {
return None;
}
let metadata_offset = (last_offset - METADATA_SIZE as u64) as usize;
if metadata_offset + METADATA_SIZE > mmap_arc.len() {
return None;
}
let metadata_bytes = &mmap_arc[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset as usize;
let entry_end = metadata_offset;
if entry_start >= entry_end || entry_end > mmap_arc.len() {
return None;
}
Some(EntryHandle {
mmap_arc,
range: entry_start..entry_end,
metadata,
})
}
pub fn get_entry_by_key(&self, key: &[u8]) -> Option<EntryHandle> {
let key_hash = compute_hash(key);
let guard = self.mmap.lock().unwrap();
let mmap_arc = Arc::clone(&*guard); drop(guard);
let last_offset = self.last_offset.load(std::sync::atomic::Ordering::Acquire);
if last_offset < METADATA_SIZE as u64 || mmap_arc.len() == 0 {
return None;
}
let offset = *self.key_index.read().ok()?.get(&key_hash)?;
if offset as usize + METADATA_SIZE > mmap_arc.len() {
return None;
}
let metadata_bytes = &mmap_arc[offset as usize..offset as usize + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
let entry_start = metadata.prev_offset as usize;
let entry_end = offset as usize;
if entry_start >= entry_end || entry_end > mmap_arc.len() {
return None;
}
if entry_end - entry_start == 1 && &mmap_arc[entry_start..entry_end] == NULL_BYTE {
return None;
}
Some(EntryHandle {
mmap_arc,
range: entry_start..entry_end,
metadata,
})
}
pub fn copy_entry(&self, key: &[u8], target: &mut AppendStorage) -> Result<u64> {
let entry_handle = self.get_entry_by_key(key).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Key not found: {:?}", key),
)
})?;
self.copy_entry_handle(&entry_handle, target)
}
fn copy_entry_handle(&self, entry: &EntryHandle, target: &mut AppendStorage) -> Result<u64> {
let metadata = entry.metadata();
let result = target.append_entry_with_key_hash(metadata.key_hash, &entry)?;
Ok(result)
}
pub fn move_entry(&mut self, key: &[u8], target: &mut AppendStorage) -> Result<u64> {
self.copy_entry(key, target)?;
self.delete_entry(&key)
}
pub fn delete_entry(&mut self, key: &[u8]) -> Result<u64> {
self.append_entry(key, &NULL_BYTE)
}
pub fn compact(&mut self) -> std::io::Result<()> {
let compacted_path = self.path.with_extension("bk");
debug!("Starting compaction. Writing to: {:?}", compacted_path);
let mut compacted_storage = AppendStorage::open(&compacted_path)?;
for entry in self.iter_entries() {
self.copy_entry_handle(&entry, &mut compacted_storage)?;
}
{
let mut file_guard = compacted_storage.file.write().map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, format!("Lock poisoned: {}", e))
})?;
file_guard.flush()?;
}
drop(compacted_storage);
debug!("Reduced backup completed. Swapping files...");
std::fs::rename(&compacted_path, &self.path)?;
info!("Compaction successful.");
Ok(())
}
pub fn count(&self) -> usize {
self.iter_entries().count()
}
pub fn estimate_compaction_savings(&self) -> u64 {
let total_size = self.get_storage_size().unwrap_or(0);
let mut unique_entry_size: u64 = 0;
let mut seen_keys = HashSet::with_hasher(Xxh3BuildHasher);
let guard = self.mmap.lock().unwrap();
let mmap_arc = Arc::clone(&*guard);
drop(guard);
for entry in self.iter_entries() {
let entry_start_offset = entry.as_ptr() as usize - mmap_arc.as_ptr() as usize;
let metadata_offset = entry_start_offset + entry.len();
if metadata_offset + METADATA_SIZE > mmap_arc.len() {
warn!("Skipping corrupted entry at offset {}", entry_start_offset);
continue;
}
let metadata_bytes = &mmap_arc[metadata_offset..metadata_offset + METADATA_SIZE];
let metadata = EntryMetadata::deserialize(metadata_bytes);
if seen_keys.insert(metadata.key_hash) {
unique_entry_size += entry.len() as u64 + METADATA_SIZE as u64;
}
}
total_size.saturating_sub(unique_entry_size)
}
pub fn get_storage_size(&self) -> Result<u64> {
std::fs::metadata(&self.path).map(|meta| meta.len())
}
}