use std::path::Path;
use crate::disk_loc::DiskLoc;
use crate::entry::entry_size;
use crate::error::DbResult;
use crate::fixed::config::FixedConfig;
use crate::fixed::slot;
use crate::key::Location;
use crate::sync::{self, MutexGuard};
/// Per-shard write operations of a storage backend.
///
/// Values of this type are reached through the guard returned by
/// `Durability::lock_shard`, so every method takes `self` by reference without
/// any locking of its own. `Loc` is the backend-specific handle describing
/// where a record lives (a `DiskLoc` or a `u32` slot id for the two
/// implementations in this file).
pub trait DurabilityInner: Send {
/// Backend-specific record location handle.
type Loc: Location;
/// Persists a record for `key`/`value` that has no previous location and
/// returns the location it was written to.
fn write_new(&mut self, shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<Self::Loc>;
/// Persists a new value for `key`, whose previous record lives at `old_loc`,
/// and returns the location of the record now holding the value. Depending on
/// the backend this is a fresh location or `old_loc` itself.
fn write_update(
&mut self,
shard_id: u8,
old_loc: Self::Loc,
key: &[u8],
value: &[u8],
) -> DbResult<Self::Loc>;
/// Persists the deletion of `key`, whose record lives at `old_loc`.
fn write_tombstone(&mut self, shard_id: u8, old_loc: Self::Loc, key: &[u8]) -> DbResult<()>;
/// Releases the record at `loc` without a key. A no-op for the `ShardInner`
/// implementation in this file; the fixed-slot implementation frees the slot.
// NOTE(review): presumably used to roll back a write that did not get
// published to the index — confirm against the callers.
fn write_discard(&mut self, loc: Self::Loc) -> DbResult<()>;
/// Reports whether the backend wants `sync` to be called now.
fn should_sync(&self) -> bool;
/// Performs the backend's sync step.
fn sync(&mut self) -> DbResult<()>;
}
/// A sharded storage backend.
///
/// Ties a location type to the per-shard writer (`Inner`) that produces it and
/// exposes shard locking plus whole-backend flush/close.
pub trait Durability: Send + Sync + Sized {
/// Location handle shared with `Inner`.
type Loc: Location;
/// Per-shard writer; its `Loc` must be the same as `Self::Loc`.
type Inner: DurabilityInner<Loc = Self::Loc>;
/// Number of shards in this backend.
fn shard_count(&self) -> usize;
/// Locks shard `shard_id` and returns the guard giving access to its writer.
// NOTE(review): both implementations in this file index the shard list
// directly, so an out-of-range `shard_id` panics.
fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, Self::Inner>;
/// Returns the `shard_prefix_bits` setting of the backend's configuration.
// NOTE(review): presumably the number of key-hash prefix bits used to pick a
// shard — confirm against the routing code.
fn shard_prefix_bits(&self) -> usize;
/// Flushes the backend.
fn flush(&self) -> DbResult<()>;
/// Shuts the backend down.
fn close(&self) -> DbResult<()>;
}
/// `Durability` backend built on `crate::engine::Engine`.
///
/// Its shards (`crate::shard::ShardInner`) write records by appending entries
/// and address them with `DiskLoc`.
#[allow(dead_code)]
pub struct Bitcask {
/// Underlying engine that owns the shards and the configuration.
pub(crate) engine: crate::engine::Engine,
// NOTE(review): not read anywhere in this file; presumably the dead-bytes
// ratio at which compaction is triggered — confirm where it is consumed.
pub(crate) compaction_threshold: f64,
}
/// Charges the entry stored at `old_loc` to the dead-byte counter of the file
/// that holds it.
///
/// The charged size is `entry_size(key_len, old_loc.len)`.
// NOTE(review): assumes the superseded entry was written with a key of the
// same length as the current one, which holds when both refer to the same key.
fn account_superseded_entry(
    shard: &mut crate::shard::ShardInner,
    old_loc: DiskLoc,
    key_len: usize,
) {
    let file_id = old_loc.file_id as u32;
    let dead_len = entry_size(key_len, old_loc.len);
    shard.add_dead_bytes(file_id, dead_len);
}

/// Append-only write path: every mutation appends an entry, and the entry it
/// supersedes is accounted as dead bytes.
impl DurabilityInner for crate::shard::ShardInner {
    type Loc = DiskLoc;

    /// Appends a non-tombstone entry and returns its location.
    fn write_new(&mut self, shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<DiskLoc> {
        let (written_at, _gsn) = self.append_entry(shard_id, key, value, false)?;
        Ok(written_at)
    }

    /// Appends the new value, then marks the entry at `old_loc` as dead.
    ///
    /// The dead-byte accounting only happens once the append has succeeded.
    fn write_update(
        &mut self,
        shard_id: u8,
        old_loc: DiskLoc,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<DiskLoc> {
        let (written_at, _gsn) = self.append_entry(shard_id, key, value, false)?;
        account_superseded_entry(self, old_loc, key.len());
        Ok(written_at)
    }

    /// Appends a tombstone entry (empty value, tombstone flag set), then marks
    /// the entry at `old_loc` as dead.
    fn write_tombstone(&mut self, shard_id: u8, old_loc: DiskLoc, key: &[u8]) -> DbResult<()> {
        let (_written_at, _gsn) = self.append_entry(shard_id, key, &[], true)?;
        account_superseded_entry(self, old_loc, key.len());
        Ok(())
    }

    /// Nothing to release for this backend; always succeeds.
    fn write_discard(&mut self, _loc: DiskLoc) -> DbResult<()> {
        Ok(())
    }

    /// This backend never requests a sync through this interface.
    fn should_sync(&self) -> bool {
        false
    }

    /// No-op; always succeeds.
    fn sync(&mut self) -> DbResult<()> {
        Ok(())
    }
}
/// `Durability` for the append-log backend; every method delegates to the
/// wrapped `Engine`.
impl Durability for Bitcask {
type Loc = DiskLoc;
type Inner = crate::shard::ShardInner;
/// Number of shards owned by the engine.
fn shard_count(&self) -> usize {
self.engine.shards().len()
}
/// Locks the shard at index `shard_id`.
///
/// # Panics
/// Panics if `shard_id` is out of range (direct indexing into the shard list).
fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, crate::shard::ShardInner> {
self.engine.shards()[shard_id].lock()
}
/// Returns `shard_prefix_bits` from the engine configuration.
fn shard_prefix_bits(&self) -> usize {
self.engine.config().shard_prefix_bits
}
/// Flushes the engine.
fn flush(&self) -> DbResult<()> {
self.engine.flush()
}
/// Closing this backend only flushes the engine.
// NOTE(review): unlike `Fixed::close`, which calls `engine.close()`, no
// separate engine shutdown is invoked here — confirm this is intentional.
fn close(&self) -> DbResult<()> {
self.engine.flush()
}
}
/// `Durability` backend built on `crate::fixed::engine::FixedEngine`.
///
/// Records live in numbered slots; a record's location is its `u32` slot id.
#[allow(dead_code)]
pub struct Fixed {
/// Underlying fixed-slot engine that owns the shards and the configuration.
pub(crate) engine: crate::fixed::engine::FixedEngine,
}
impl Fixed {
pub fn open(
path: impl AsRef<Path>,
config: FixedConfig,
key_len: usize,
value_len: usize,
) -> DbResult<Self> {
let engine = crate::fixed::engine::FixedEngine::open(
path,
config,
key_len as u16,
value_len as u16,
)?;
Ok(Self { engine })
}
pub fn recover_entries(
&self,
mut visitor: impl FnMut(usize, &[u8], &[u8], u32),
) -> DbResult<u32> {
let shards = self.engine.shards();
let mut total_recovered = 0u32;
for (shard_idx, shard) in shards.iter().enumerate() {
let mut inner = shard.inner.lock();
let key_len = inner.key_len() as usize;
let value_len = inner.value_len() as usize;
if inner.has_clean_shutdown() {
inner.load_bitmap_sidecar()?;
let slot_count = inner.slot_count();
for slot_id in 0..slot_count {
if !inner.bitmap.is_set(slot_id) {
continue;
}
let buf = inner.read_slot(slot_id)?;
if let Some((key_bytes, value_bytes)) =
slot::validate_slot(&buf, key_len, value_len)
{
visitor(shard_idx, key_bytes, value_bytes, slot_id);
total_recovered += 1;
} else {
inner.bitmap.clear(slot_id);
}
}
} else {
let slot_count = inner.slot_count();
for slot_id in 0..slot_count {
let buf = inner.read_slot(slot_id)?;
if slot::slot_status(&buf) != slot::SLOT_OCCUPIED {
continue;
}
if let Some((key_bytes, value_bytes)) =
slot::validate_slot(&buf, key_len, value_len)
{
inner.bitmap.set(slot_id);
visitor(shard_idx, key_bytes, value_bytes, slot_id);
total_recovered += 1;
}
}
}
inner.clear_clean_shutdown()?;
}
Ok(total_recovered)
}
}
/// Slot-based write path: a record's location is its slot id, updates
/// overwrite the slot in place, and deletions free the slot.
impl DurabilityInner for crate::fixed::shard::FixedShardInner {
    type Loc = u32;

    /// Allocates a slot, writes the record into it and returns the slot id.
    fn write_new(&mut self, _shard_id: u8, key: &[u8], value: &[u8]) -> DbResult<u32> {
        let slot_id = self.alloc_slot()?;
        self.write_slot(slot_id, key, value)?;
        Ok(slot_id)
    }

    /// Rewrites the slot at `old_loc` with the new record; the location does
    /// not change.
    fn write_update(
        &mut self,
        _shard_id: u8,
        old_loc: u32,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<u32> {
        self.write_slot(old_loc, key, value)?;
        Ok(old_loc)
    }

    /// Deletes the slot at `old_loc` and unmarks it in the bitmap. The key is
    /// not needed because the slot id alone identifies the record.
    fn write_tombstone(&mut self, _shard_id: u8, old_loc: u32, _key: &[u8]) -> DbResult<()> {
        self.delete_slot(old_loc)?;
        // Only reached when the delete succeeded.
        self.bitmap.clear(old_loc);
        Ok(())
    }

    /// Deletes the slot at `loc` and unmarks it in the bitmap.
    fn write_discard(&mut self, loc: u32) -> DbResult<()> {
        self.delete_slot(loc)?;
        // Only reached when the delete succeeded.
        self.bitmap.clear(loc);
        Ok(())
    }

    /// Forwards to the shard's own `should_sync`.
    fn should_sync(&self) -> bool {
        // Inherent associated functions take precedence over trait methods in
        // path resolution, so this calls `FixedShardInner::should_sync` rather
        // than re-entering this trait method.
        // NOTE(review): relies on that inherent method existing — confirm.
        Self::should_sync(self)
    }

    /// Forwards to the shard's own `sync`.
    fn sync(&mut self) -> DbResult<()> {
        // Same resolution rule as in `should_sync`: the inherent
        // `FixedShardInner::sync` is the one being called.
        Self::sync(self)
    }
}
/// `Durability` for the fixed-slot backend; every method delegates to the
/// wrapped `FixedEngine`.
impl Durability for Fixed {
type Loc = u32;
type Inner = crate::fixed::shard::FixedShardInner;
/// Number of shards owned by the engine.
fn shard_count(&self) -> usize {
self.engine.shards().len()
}
/// Locks the `inner` mutex of the shard at index `shard_id` via `sync::lock`.
///
/// # Panics
/// Panics if `shard_id` is out of range (direct indexing into the shard list).
fn lock_shard(&self, shard_id: usize) -> MutexGuard<'_, crate::fixed::shard::FixedShardInner> {
sync::lock(&self.engine.shards()[shard_id].inner)
}
/// Returns `shard_prefix_bits` from the engine configuration.
fn shard_prefix_bits(&self) -> usize {
self.engine.config().shard_prefix_bits
}
/// Flushes the engine.
fn flush(&self) -> DbResult<()> {
self.engine.flush()
}
/// Closes the engine.
// NOTE(review): presumably this is where the clean-shutdown marker and the
// bitmap sidecar consumed by `Fixed::recover_entries` get written — confirm
// in `FixedEngine::close`.
fn close(&self) -> DbResult<()> {
self.engine.close()
}
}