use std::path::Path;
use std::sync::Arc;
use crate::disk_loc::DiskLoc;
use crate::entry::entry_size;
use crate::error::DbResult;
use crate::fixed::config::FixedConfig;
use crate::fixed::slot;
use crate::key::Location;
use crate::sync::{self, MutexGuard};
/// Single-shard write interface, invoked while that shard's lock is held.
///
/// Implementations perform the actual durable writes for one shard; shard
/// selection and locking are handled by the enclosing [`Durability`] backend.
pub trait DurabilityInner: Send {
/// Handle identifying where a record was stored (e.g. a disk location or a
/// slot id).
type Loc: Location;
/// Durably writes a brand-new key/value pair and returns its location.
fn write_new(&mut self, shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<Self::Loc>;
/// Durably writes a replacement value for an existing record and returns the
/// (possibly new) location; `old_loc` lets the backend account for or reuse
/// the superseded storage.
fn write_update(
&mut self,
shard_id: u8,
old_loc: Self::Loc,
key: &[u8],
value: &[u8],
) -> DbResult<Self::Loc>;
/// Durably records deletion of the record previously stored at `old_loc`.
fn write_tombstone(&mut self, shard_id: u8, old_loc: Self::Loc, key: &[u8]) -> DbResult<()>;
/// Releases storage for a location whose write was abandoned by the caller.
fn write_discard(&mut self, loc: Self::Loc) -> DbResult<()>;
/// Returns true when the backend wants `sync` called after the current write.
fn should_sync(&self) -> bool;
/// Forces this shard's buffered writes to stable storage.
fn sync(&mut self) -> DbResult<()>;
}
/// Sharded durability backend: owns a set of [`DurabilityInner`] shards and
/// hands out exclusive, mutex-guarded access to each.
pub trait Durability: Send + Sync + Sized {
/// Location type produced by this backend's shards.
type Loc: Location;
/// Per-shard writer type guarded by each shard's mutex.
type Inner: DurabilityInner<Loc = Self::Loc>;
/// Number of shards managed by this backend.
fn shard_count(&self) -> usize;
/// Acquires exclusive access to shard `shard_id`.
/// NOTE(review): implementations here index a slice, so an out-of-range id
/// panics — confirm callers always pass `shard_id < shard_count()`.
fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, Self::Inner>;
/// Number of key-prefix bits used to route a key to a shard.
fn shard_prefix_bits(&self) -> usize;
/// Flushes all shards' buffered writes to stable storage.
fn flush(&self) -> DbResult<()>;
/// Shuts the backend down, flushing outstanding writes.
fn close(&self) -> DbResult<()>;
}
/// Append-log ("bitcask"-style) durability backend wrapping the log engine.
#[allow(dead_code)]
pub struct Bitcask {
// Underlying sharded append-log engine that performs all I/O.
pub(crate) engine: crate::engine::Engine,
// Dead-bytes ratio above which a log file becomes a compaction candidate —
// presumably a fraction in [0, 1]; not read within this file, TODO confirm.
pub(crate) compaction_threshold: f64,
}
/// Append-log write path: every mutation appends a log entry, and superseded
/// entries are accounted as dead bytes so compaction can reclaim them later.
impl DurabilityInner for crate::shard::ShardInner {
    type Loc = DiskLoc;

    /// Appends a fresh entry; the append's sequence number is not needed here.
    fn write_new(&mut self, shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<DiskLoc> {
        self.append_entry(shard_id, key, value, false)
            .map(|(loc, _gsn)| loc)
    }

    /// Appends the replacement entry first, then charges the superseded entry
    /// as dead bytes against its containing log file.
    fn write_update(
        &mut self,
        shard_id: u8,
        old_loc: DiskLoc,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<DiskLoc> {
        let (fresh_loc, _gsn) = self.append_entry(shard_id, key, value, false)?;
        let reclaimed = entry_size(key.len(), old_loc.len);
        self.add_dead_bytes(old_loc.file_id as u32, reclaimed);
        Ok(fresh_loc)
    }

    /// A tombstone is an appended entry with an empty value and the tombstone
    /// flag set; the old record becomes dead bytes.
    fn write_tombstone(&mut self, shard_id: u8, old_loc: DiskLoc, key: &[u8]) -> DbResult<()> {
        self.append_entry(shard_id, key, &[], true)?;
        let reclaimed = entry_size(key.len(), old_loc.len);
        self.add_dead_bytes(old_loc.file_id as u32, reclaimed);
        Ok(())
    }

    /// Log entries are immutable once appended, so a discarded location needs
    /// no cleanup here.
    fn write_discard(&mut self, _loc: DiskLoc) -> DbResult<()> {
        Ok(())
    }

    /// The append-log backend never requests a per-write sync; durability is
    /// driven by the engine's flush policy instead.
    fn should_sync(&self) -> bool {
        false
    }

    /// No-op: see `should_sync`.
    fn sync(&mut self) -> DbResult<()> {
        Ok(())
    }
}
/// Sharded front-end for the append-log engine: thin delegation to `engine`.
impl Durability for Bitcask {
    type Loc = DiskLoc;
    type Inner = crate::shard::ShardInner;

    /// Number of log shards backing this store.
    fn shard_count(&self) -> usize {
        let shards = self.engine.shards();
        shards.len()
    }

    /// Locks shard `shard_id` and returns its guard (panics if out of range).
    fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, crate::shard::ShardInner> {
        let shard = &self.engine.shards()[shard_id];
        shard.lock()
    }

    /// Key-prefix bits used for shard routing, taken from the engine config.
    fn shard_prefix_bits(&self) -> usize {
        self.engine.config().shard_prefix_bits
    }

    /// Flushes all shards via the engine.
    fn flush(&self) -> DbResult<()> {
        self.engine.flush()
    }

    /// NOTE(review): close is currently just a flush — confirm the log engine
    /// needs no additional teardown on close.
    fn close(&self) -> DbResult<()> {
        self.engine.flush()
    }
}
/// Fixed-slot durability backend: records live in fixed-size slots managed by
/// [`crate::fixed::engine::FixedEngine`].
#[allow(dead_code)]
pub struct Fixed {
// Shared handle to the slot-file engine; `Arc` so the handle can be shared
// across threads.
pub(crate) engine: Arc<crate::fixed::engine::FixedEngine>,
}
impl Fixed {
/// Opens (or creates) a fixed-slot store rooted at `path`, where every key
/// is exactly `key_len` bytes and every value exactly `value_len` bytes.
///
/// NOTE(review): both lengths are narrowed with `as u16` and silently wrap
/// above 65_535 — confirm callers validate the range first.
pub fn open(
path: impl AsRef<Path>,
config: FixedConfig,
key_len: usize,
value_len: usize,
) -> DbResult<Self> {
let engine = crate::fixed::engine::FixedEngine::open(
path,
config,
key_len as u16,
value_len as u16,
)?;
Ok(Self {
engine: Arc::new(engine),
})
}
/// Scans every shard and calls `visitor(shard_idx, key, value, slot_id)` for
/// each live (OCCUPIED) slot, returning the number of entries recovered.
///
/// Per shard, one of two paths runs:
/// * sidecar path — after a clean shutdown with a valid `fixed.versions`
///   sidecar, only slots the sidecar marks OCCUPIED are read back;
/// * full scan — otherwise every slot is read and its version metadata is
///   rebuilt from the slot bytes themselves.
/// In both paths, a slot that fails `slot::read_slot` validation (torn write)
/// is demoted to FREE while its version counter is preserved, keeping future
/// version bumps monotonic.
pub fn recover_entries(
&self,
mut visitor: impl FnMut(usize, &[u8], &[u8], u32),
) -> DbResult<u32> {
let shards = self.engine.shards();
let mut total_recovered = 0u32;
for (shard_idx, shard) in shards.iter().enumerate() {
let mut inner = shard.inner.lock();
let key_len = inner.key_len() as usize;
let value_len = inner.value_len() as usize;
let slot_count = inner.slot_count();
// Owned copy of the shard directory so the sidecar file can be removed
// below while `inner` remains mutably borrowed.
let dir = inner.dir().to_path_buf();
// The sidecar is only trustworthy after a clean shutdown; an invalid one
// is deleted and we fall back to the full scan.
let used_sidecar = if inner.has_clean_shutdown() {
match inner.load_versions_sidecar() {
Ok(()) => true,
Err(e) => {
tracing::warn!(
shard = shard_idx, error = %e,
"fixed.versions sidecar invalid; falling back to full scan"
);
// Best-effort removal; failure just means a later open retries.
let _ = std::fs::remove_file(dir.join("fixed.versions"));
false
}
}
} else {
false
};
if used_sidecar {
// Fast path: trust the sidecar's statuses; read only OCCUPIED slots.
for slot_id in 0..slot_count {
if slot::status_of(inner.versions[slot_id as usize]) != slot::STATUS_OCCUPIED {
continue;
}
let buf = inner.read_slot(slot_id)?;
match slot::read_slot(&buf, key_len, value_len) {
Some((_m, k, v)) => {
inner.bitmap.set(slot_id);
visitor(shard_idx, k, v, slot_id);
total_recovered += 1;
}
None => {
// Sidecar said OCCUPIED but the slot bytes fail validation
// (torn write): free the slot, preserve its version counter.
let meta = inner.versions[slot_id as usize];
inner.versions[slot_id as usize] =
slot::pack_meta(slot::STATUS_FREE, slot::version_of(meta));
inner.bitmap.clear(slot_id);
}
}
}
} else {
// Slow path: rebuild all version metadata by reading every slot.
for slot_id in 0..slot_count {
let buf = inner.read_slot(slot_id)?;
let meta = slot::meta_of(&buf);
let status = slot::status_of(meta);
inner.versions[slot_id as usize] = meta;
if status == slot::STATUS_OCCUPIED {
if let Some((_m, k, v)) = slot::read_slot(&buf, key_len, value_len) {
inner.bitmap.set(slot_id);
visitor(shard_idx, k, v, slot_id);
total_recovered += 1;
} else {
// Torn slot: demote to FREE, keep the version for monotonicity.
inner.versions[slot_id as usize] =
slot::pack_meta(slot::STATUS_FREE, slot::version_of(meta));
}
}
}
}
// Clear the clean-shutdown marker so a crash before the next clean close
// forces a full rescan instead of trusting a stale sidecar.
inner.clear_clean_shutdown()?;
}
Ok(total_recovered)
}
}
/// Fixed-slot write path: updates rewrite the slot in place, so the location
/// (a `u32` slot id) stays stable across updates.
impl DurabilityInner for crate::fixed::shard::FixedShardInner {
type Loc = u32;
/// Allocates a fresh slot and writes the record into it.
fn write_new(&mut self, _shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<u32> {
let slot = self.alloc_slot()?;
// write_slot's success value is intentionally ignored; only errors matter.
let _ = self.write_slot(slot, key, value)?;
Ok(slot)
}
/// Overwrites the existing slot in place; the location does not change.
fn write_update(
&mut self,
_shard_id: u8,
old_loc: u32,
key: &[u8],
value: &[u8],
) -> DbResult<u32> {
let _ = self.write_slot(old_loc, key, value)?;
Ok(old_loc)
}
/// Marks the slot deleted on disk and clears it from the in-memory bitmap.
fn write_tombstone(&mut self, _shard_id: u8, old_loc: u32, key: &[u8]) -> DbResult<()> {
self.delete_slot(old_loc, key)?;
self.bitmap.clear(old_loc);
Ok(())
}
/// Frees an abandoned slot (deleted with an empty key) and clears its bit.
fn write_discard(&mut self, loc: u32) -> DbResult<()> {
self.delete_slot(loc, &[])?;
self.bitmap.clear(loc);
Ok(())
}
/// Delegates to the inherent `should_sync` on `FixedShardInner`: inherent
/// methods take precedence over trait methods in resolution, so this is not
/// recursive — assumes such an inherent method exists; TODO confirm.
fn should_sync(&self) -> bool {
self.should_sync()
}
/// Delegates to the inherent `sync` (same resolution note as `should_sync`).
fn sync(&mut self) -> DbResult<()> {
self.sync()
}
}
/// Sharded front-end for the fixed-slot engine: thin delegation to `engine`.
impl Durability for Fixed {
    type Loc = u32;
    type Inner = crate::fixed::shard::FixedShardInner;

    /// Number of fixed-slot shards backing this store.
    fn shard_count(&self) -> usize {
        let shards = self.engine.shards();
        shards.len()
    }

    /// Locks shard `shard_id` and returns its guard (panics if out of range).
    fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, crate::fixed::shard::FixedShardInner> {
        let shard = &self.engine.shards()[shard_id];
        sync::lock(&shard.inner)
    }

    /// Key-prefix bits used for shard routing, taken from the engine config.
    fn shard_prefix_bits(&self) -> usize {
        self.engine.config().shard_prefix_bits
    }

    /// Flushes all shards via the engine.
    fn flush(&self) -> DbResult<()> {
        self.engine.flush()
    }

    /// Closes the underlying fixed-slot engine.
    fn close(&self) -> DbResult<()> {
        self.engine.close()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fixed::config::FixedConfig;
use crate::fixed::slot::{STATUS_DELETED, STATUS_FREE, STATUS_OCCUPIED, status_of, version_of};
use tempfile::tempdir;
// Single-shard config with a small grow step; keys are 8 bytes and values
// 16 bytes in all tests below, matching `Fixed::open(.., 8, 16)`.
fn test_fixed_config() -> FixedConfig {
FixedConfig {
shard_count: 1,
grow_step: 16,
..FixedConfig::test()
}
}
// Dirty shutdown (dropped without `close`): recovery must take the full-scan
// path and rebuild per-slot version metadata from the data file, reporting
// only OCCUPIED slots and preserving DELETED statuses.
#[test]
fn test_recover_populates_versions_dirty() {
let dir = tempdir().unwrap();
let path = dir.path().join("db");
{
let fixed = Fixed::open(&path, test_fixed_config(), 8, 16).unwrap();
{
let mut inner = fixed.engine.shards()[0].inner.lock();
let id0 = inner.alloc_slot().unwrap();
inner
.write_slot(id0, b"keyaaaaa", b"val_0000_0000_00")
.unwrap();
let id1 = inner.alloc_slot().unwrap();
inner
.write_slot(id1, b"keybbbbb", b"val_0001_0000_00")
.unwrap();
// Slot 1 is written then deleted, so only slot 0 should be recovered.
inner.delete_slot(id1, b"keybbbbb").unwrap();
inner.sync().unwrap();
}
// Dropping without `close()` leaves no clean-shutdown marker.
drop(fixed);
}
let fixed = Fixed::open(&path, test_fixed_config(), 8, 16).unwrap();
let mut entries = Vec::new();
fixed
.recover_entries(|_shard, k, v, slot_id| {
entries.push((k.to_vec(), v.to_vec(), slot_id));
})
.unwrap();
assert_eq!(entries.len(), 1, "only one OCCUPIED entry");
assert_eq!(entries[0].0, b"keyaaaaa");
let inner = fixed.engine.shards()[0].inner.lock();
assert_eq!(status_of(inner.versions[0]), STATUS_OCCUPIED);
assert_eq!(status_of(inner.versions[1]), STATUS_DELETED);
}
// Clean shutdown via `engine.close()`: recovery should restore status and
// version metadata (sidecar path) for the single occupied slot.
#[test]
fn test_recover_populates_versions_clean() {
let dir = tempdir().unwrap();
let path = dir.path().join("db");
{
let fixed = Fixed::open(&path, test_fixed_config(), 8, 16).unwrap();
{
let mut inner = fixed.engine.shards()[0].inner.lock();
let id = inner.alloc_slot().unwrap();
inner
.write_slot(id, b"keyaaaaa", b"val_0000_0000_00")
.unwrap();
}
// `close` records the clean shutdown, enabling the sidecar fast path.
fixed.engine.close().unwrap();
}
let fixed = Fixed::open(&path, test_fixed_config(), 8, 16).unwrap();
let mut entries = Vec::new();
fixed
.recover_entries(|_shard, k, _v, _id| {
entries.push(k.to_vec());
})
.unwrap();
assert_eq!(entries.len(), 1);
let inner = fixed.engine.shards()[0].inner.lock();
assert_eq!(status_of(inner.versions[0]), STATUS_OCCUPIED);
// First write bumps the slot version to 1.
assert_eq!(version_of(inner.versions[0]), 1);
}
// Simulates a torn write by scribbling over slot 0's bytes in the data file
// after a dirty shutdown: recovery must skip the slot but keep its version
// counter so the next version bump stays monotonic.
#[test]
fn test_torn_slot_preserves_version() {
let dir = tempdir().unwrap();
let path = dir.path().join("db");
{
let fixed = Fixed::open(&path, test_fixed_config(), 8, 16).unwrap();
{
let mut inner = fixed.engine.shards()[0].inner.lock();
let id = inner.alloc_slot().unwrap();
inner
.write_slot(id, b"keyaaaaa", b"val_0000_0000_00")
.unwrap();
inner.sync().unwrap();
}
drop(fixed);
}
let shard_data = path.join("shard_000").join("fixed.data");
use std::os::unix::fs::FileExt;
let f = std::fs::OpenOptions::new()
.write(true)
.open(&shard_data)
.unwrap();
// Overwrite 16 bytes at offset 4112 — presumably inside slot 0 past a
// 4 KiB file header, so the slot's checksum/validation fails on recovery.
// TODO confirm against the fixed.data layout.
f.write_all_at(&[0xFFu8; 16], 4112).unwrap();
f.sync_data().unwrap();
drop(f);
let fixed = Fixed::open(&path, test_fixed_config(), 8, 16).unwrap();
let mut entries = Vec::new();
fixed
.recover_entries(|_shard, k, _v, _id| {
entries.push(k.to_vec());
})
.unwrap();
assert!(entries.is_empty(), "torn slot must be skipped");
let inner = fixed.engine.shards()[0].inner.lock();
assert_eq!(status_of(inner.versions[0]), STATUS_FREE);
assert_eq!(
version_of(inner.versions[0]),
1,
"version must be preserved so next bump continues monotonically"
);
}
}