use std::fs::{self, File, OpenOptions};
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use crate::error::{DbError, DbResult};
use crate::fixed::bitmap::Bitmap;
use crate::fixed::config::FixedConfig;
use crate::fixed::slot;
/// Size in bytes of the file header that precedes slot 0 (4 KiB).
const HEADER_SIZE: u64 = 4 * 1024;
/// Magic bytes at offset 0 that identify a fixed-slot data file.
const MAGIC: &[u8; 4] = &[b'F', b'I', b'X', b'D'];
/// On-disk format version, stored little-endian at header bytes 4..6.
const VERSION: u16 = 0x0001;
/// Header byte right after the shard id (byte 16). `clean_shutdown` sets it to 1 and
/// `clear_clean_shutdown` resets it to 0.
const CLEAN_SHUTDOWN_OFFSET: u64 = 16 + 1;
/// One fixed-slot shard: the open `fixed.data` file plus the in-memory state used to
/// allocate, write, grow and periodically sync its slots.
pub struct FixedShardInner {
    // ---- on-disk location ----
    /// Open handle to `<dir>/fixed.data`.
    file: File,
    /// Shard directory; the `fixed.bitmap` sidecar lives here as well.
    dir: PathBuf,

    // ---- slot layout and allocation ----
    /// In-memory occupancy map used by `alloc_slot`.
    pub(crate) bitmap: Bitmap,
    /// Bytes per slot, as recorded in the header.
    pub(crate) slot_size: u16,
    /// Number of slots the file currently holds.
    pub(crate) slot_count: u32,
    /// Number of slots added by each call to `grow`.
    grow_step: u32,

    // ---- record format ----
    /// Key length in bytes for every slot.
    key_len: u16,
    /// Value length in bytes for every slot.
    value_len: u16,
    // Written at open time but never read back, hence the allow.
    #[allow(dead_code)]
    shard_id: u8,

    // ---- sync bookkeeping ----
    /// Slot writes/deletes issued since the last `sync`.
    pending_writes: u32,
    /// `should_sync` fires once `pending_writes` reaches this.
    sync_batch_size: u32,
    /// When `sync` last completed.
    last_sync: Instant,
    /// `should_sync` fires once this much time has passed since `last_sync`.
    sync_interval: Duration,
    /// When true, every slot write/delete is followed by `sync_data`.
    enable_fsync: bool,
}
impl FixedShardInner {
    /// Header offset of the little-endian `u32` slot count.
    const SLOT_COUNT_OFFSET: u64 = 8;

    /// Opens the shard stored in `dir/fixed.data`, creating and initialising it if needed.
    ///
    /// Header layout (little-endian, first `HEADER_SIZE` bytes):
    ///
    /// | bytes  | field               |
    /// |--------|---------------------|
    /// | 0..4   | magic `FIXD`        |
    /// | 4..6   | format version      |
    /// | 6..8   | slot size           |
    /// | 8..12  | slot count          |
    /// | 12..14 | key length          |
    /// | 14..16 | value length        |
    /// | 16     | shard id            |
    /// | 17     | clean-shutdown flag |
    ///
    /// A brand-new or zero-length file is sized for `config.grow_step` slots and gets a
    /// fresh header. An existing file must agree with the requested `shard_id`, `key_len`,
    /// `value_len` and the slot size derived from them.
    ///
    /// The in-memory bitmap is built with `Bitmap::new(slot_count)`. It is not loaded from
    /// disk here; see [`Self::load_bitmap_sidecar`].
    ///
    /// # Errors
    /// - I/O errors from creating the directory or opening, reading or writing the file.
    /// - `DbError::FormatMismatch` if the slot size for `key_len`/`value_len` does not fit
    ///   in a `u16`, or if an existing header disagrees with the requested parameters.
    pub fn open(
        dir: impl Into<PathBuf>,
        shard_id: u8,
        key_len: u16,
        value_len: u16,
        config: &FixedConfig,
    ) -> DbResult<Self> {
        let dir = dir.into();
        fs::create_dir_all(&dir)?;
        let data_path = dir.join("fixed.data");

        // The header stores slot_size as u16. A silent `as u16` truncation would produce
        // a wrong stride and overlapping slots, so reject sizes that do not fit.
        let raw_slot_size = slot::slot_size(key_len as usize, value_len as usize);
        let slot_size = u16::try_from(raw_slot_size).map_err(|_| {
            DbError::FormatMismatch(format!(
                "fixed.data: slot_size {raw_slot_size} for key_len {key_len}, \
                 value_len {value_len} exceeds u16::MAX"
            ))
        })?;

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&data_path)?;

        // Decide from the opened handle rather than a prior `exists()` probe. This avoids
        // a check-then-open race, and a zero-length file (created, then crashed before
        // initialisation) is treated as new instead of failing forever on a short read.
        let slot_count = if file.metadata()?.len() == 0 {
            let initial_slots = config.grow_step;
            Self::init_file(&file, slot_size, initial_slots, key_len, value_len, shard_id)?;
            initial_slots
        } else {
            Self::read_header(&file, slot_size, key_len, value_len, shard_id)?
        };

        Ok(Self {
            file,
            dir,
            bitmap: Bitmap::new(slot_count),
            slot_size,
            slot_count,
            key_len,
            value_len,
            shard_id,
            grow_step: config.grow_step,
            pending_writes: 0,
            sync_batch_size: config.sync_batch_size,
            last_sync: Instant::now(),
            sync_interval: config.sync_interval,
            enable_fsync: config.enable_fsync,
        })
    }

    /// Sizes a new data file for `slot_count` slots, writes its header, and syncs it.
    /// The clean-shutdown byte (17) is left as 0.
    fn init_file(
        file: &File,
        slot_size: u16,
        slot_count: u32,
        key_len: u16,
        value_len: u16,
        shard_id: u8,
    ) -> DbResult<()> {
        file.set_len(HEADER_SIZE + u64::from(slot_count) * u64::from(slot_size))?;
        let mut header = [0u8; HEADER_SIZE as usize];
        header[0..4].copy_from_slice(MAGIC);
        header[4..6].copy_from_slice(&VERSION.to_le_bytes());
        header[6..8].copy_from_slice(&slot_size.to_le_bytes());
        header[8..12].copy_from_slice(&slot_count.to_le_bytes());
        header[12..14].copy_from_slice(&key_len.to_le_bytes());
        header[14..16].copy_from_slice(&value_len.to_le_bytes());
        header[16] = shard_id;
        file.write_all_at(&header, 0)?;
        file.sync_data()?;
        Ok(())
    }

    /// Reads and validates an existing header and returns its stored slot count.
    ///
    /// Checks run in a fixed order (magic, version, slot_size, key_len, value_len,
    /// shard_id); the first mismatch is reported.
    fn read_header(
        file: &File,
        slot_size: u16,
        key_len: u16,
        value_len: u16,
        shard_id: u8,
    ) -> DbResult<u32> {
        let mut header = [0u8; HEADER_SIZE as usize];
        file.read_exact_at(&mut header, 0)?;
        if &header[0..4] != MAGIC {
            return Err(DbError::FormatMismatch("fixed.data: bad magic".into()));
        }
        let le16 = |at: usize| u16::from_le_bytes([header[at], header[at + 1]]);
        Self::ensure_field("version", le16(4), VERSION)?;
        Self::ensure_field("slot_size", le16(6), slot_size)?;
        Self::ensure_field("key_len", le16(12), key_len)?;
        Self::ensure_field("value_len", le16(14), value_len)?;
        Self::ensure_field("shard_id", header[16], shard_id)?;
        Ok(u32::from_le_bytes([
            header[8], header[9], header[10], header[11],
        ]))
    }

    /// Returns `FormatMismatch("fixed.data: {field} mismatch: stored .., expected ..")`
    /// when `stored != expected`.
    fn ensure_field<T: PartialEq + std::fmt::Display>(
        field: &str,
        stored: T,
        expected: T,
    ) -> DbResult<()> {
        if stored == expected {
            Ok(())
        } else {
            Err(DbError::FormatMismatch(format!(
                "fixed.data: {field} mismatch: stored {stored}, expected {expected}"
            )))
        }
    }

    /// Byte offset of `slot_id` in the data file. Passing `slot_count` yields the
    /// total file size needed to hold that many slots.
    #[inline]
    fn slot_offset(&self, slot_id: u32) -> u64 {
        HEADER_SIZE + slot_id as u64 * self.slot_size as u64
    }

    /// Writes a fully serialised slot image at `slot_id`, counts it toward the sync
    /// batch, and calls `sync_data` immediately when `enable_fsync` is set.
    fn write_slot_bytes(&mut self, slot_id: u32, buf: &[u8]) -> DbResult<()> {
        self.file.write_all_at(buf, self.slot_offset(slot_id))?;
        // Saturate: a long run without `sync()` must not overflow (a panic in debug builds).
        self.pending_writes = self.pending_writes.saturating_add(1);
        if self.enable_fsync {
            self.file.sync_data()?;
        }
        Ok(())
    }

    /// Serialises `key`/`value` with `slot::serialize_slot` and writes it to `slot_id`.
    ///
    /// # Errors
    /// I/O errors from the write or the optional `sync_data`.
    pub fn write_slot(&mut self, slot_id: u32, key: &[u8], value: &[u8]) -> DbResult<()> {
        let mut buf = vec![0u8; self.slot_size as usize];
        slot::serialize_slot(&mut buf, key, value);
        self.write_slot_bytes(slot_id, &buf)
    }

    /// Overwrites `slot_id` with the image produced by `slot::serialize_deleted_slot`.
    ///
    /// # Errors
    /// I/O errors from the write or the optional `sync_data`.
    pub fn delete_slot(&mut self, slot_id: u32) -> DbResult<()> {
        let mut buf = vec![0u8; self.slot_size as usize];
        slot::serialize_deleted_slot(&mut buf);
        self.write_slot_bytes(slot_id, &buf)
    }

    /// Returns the raw `slot_size` bytes stored at `slot_id`, without validation.
    ///
    /// # Errors
    /// I/O errors, including a short read past the end of the file.
    pub fn read_slot(&self, slot_id: u32) -> DbResult<Vec<u8>> {
        let mut buf = vec![0u8; self.slot_size as usize];
        self.file.read_exact_at(&mut buf, self.slot_offset(slot_id))?;
        Ok(buf)
    }

    /// Extends the file by `grow_step` slots and records the new count in the header.
    ///
    /// # Errors
    /// - `DbError::Internal` if the slot count would overflow `u32`.
    /// - I/O errors from resizing, writing or syncing. In that case the in-memory
    ///   `slot_count` and bitmap are left unchanged.
    pub fn grow(&mut self) -> DbResult<()> {
        let new_count = self
            .slot_count
            .checked_add(self.grow_step)
            .ok_or(DbError::Internal("slot_count overflow on grow"))?;
        // Persist first, publish in memory last, so a failed I/O step cannot leave the
        // in-memory count ahead of the on-disk header. Extending the file before
        // rewriting the count means a crash in between only leaves unused tail space.
        self.file.set_len(self.slot_offset(new_count))?;
        self.file
            .write_all_at(&new_count.to_le_bytes(), Self::SLOT_COUNT_OFFSET)?;
        self.file.sync_data()?;
        self.bitmap.grow(new_count);
        self.slot_count = new_count;
        Ok(())
    }

    /// Allocates a free slot id. If the bitmap reports `SlotsFull`, grows once and retries.
    ///
    /// # Errors
    /// Errors from `grow`, or from the bitmap when it still has no free slot.
    pub fn alloc_slot(&mut self) -> DbResult<u32> {
        match self.bitmap.alloc() {
            Ok(id) => Ok(id),
            Err(DbError::SlotsFull) => {
                self.grow()?;
                self.bitmap.alloc()
            }
            Err(e) => Err(e),
        }
    }

    /// True once `sync_batch_size` writes are pending or `sync_interval` has elapsed
    /// since the last `sync`.
    pub fn should_sync(&self) -> bool {
        self.pending_writes >= self.sync_batch_size
            || self.last_sync.elapsed() >= self.sync_interval
    }

    /// Calls `sync_data` on the data file and resets the batch counter and timer.
    pub fn sync(&mut self) -> DbResult<()> {
        self.file.sync_data()?;
        self.pending_writes = 0;
        self.last_sync = Instant::now();
        Ok(())
    }

    /// Persists the bitmap to the `fixed.bitmap` sidecar, then sets the clean-shutdown flag.
    ///
    /// Every step before setting the flag is made durable: the data file, the sidecar
    /// contents (temp file + fsync + atomic rename), and the directory entry. A crash can
    /// therefore never leave the flag set next to a missing or partial sidecar.
    ///
    /// # Errors
    /// Any I/O error; the flag is only written after all earlier steps succeed.
    pub fn clean_shutdown(&mut self) -> DbResult<()> {
        self.file.sync_data()?;

        let bitmap_path = self.dir.join("fixed.bitmap");
        let tmp_path = self.dir.join("fixed.bitmap.tmp");
        fs::write(&tmp_path, self.bitmap.as_bytes())?;
        // `fs::write` does not fsync; reopen for writing to flush the contents.
        OpenOptions::new().write(true).open(&tmp_path)?.sync_all()?;
        fs::rename(&tmp_path, &bitmap_path)?;
        // Make the rename (and therefore the sidecar's existence) durable.
        File::open(&self.dir)?.sync_all()?;

        self.file.write_all_at(&[1u8], CLEAN_SHUTDOWN_OFFSET)?;
        self.file.sync_data()?;
        Ok(())
    }

    /// Resets the clean-shutdown flag to 0 and syncs the data file.
    pub fn clear_clean_shutdown(&mut self) -> DbResult<()> {
        self.file.write_all_at(&[0u8], CLEAN_SHUTDOWN_OFFSET)?;
        self.file.sync_data()?;
        Ok(())
    }

    /// True iff the header flag reads 1 and `fixed.bitmap` exists.
    /// A failed read of the flag byte is reported as `false`.
    pub fn has_clean_shutdown(&self) -> bool {
        let mut flag = [0u8; 1];
        if self
            .file
            .read_exact_at(&mut flag, CLEAN_SHUTDOWN_OFFSET)
            .is_err()
        {
            return false;
        }
        flag[0] == 1 && self.dir.join("fixed.bitmap").exists()
    }

    /// Replaces the in-memory bitmap with `Bitmap::from_bytes` over the `fixed.bitmap`
    /// contents, sized to the current `slot_count`.
    ///
    /// # Errors
    /// I/O errors reading the sidecar (e.g. it does not exist).
    pub fn load_bitmap_sidecar(&mut self) -> DbResult<()> {
        let data = fs::read(self.dir.join("fixed.bitmap"))?;
        self.bitmap = Bitmap::from_bytes(&data, self.slot_count);
        Ok(())
    }

    /// Number of slots currently in the file.
    pub fn slot_count(&self) -> u32 {
        self.slot_count
    }

    /// Key length in bytes.
    pub fn key_len(&self) -> u16 {
        self.key_len
    }

    /// Value length in bytes.
    pub fn value_len(&self) -> u16 {
        self.value_len
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Test config with a 64-slot growth step; other knobs come from `FixedConfig::test()`.
    fn test_config() -> FixedConfig {
        FixedConfig {
            grow_step: 64,
            ..FixedConfig::test()
        }
    }

    /// Returns a fresh temp-dir guard plus the `shard_0` path inside it.
    /// Keep the guard alive for the whole test; dropping it deletes the directory.
    fn new_shard_dir() -> (TempDir, PathBuf) {
        let dir = tempdir().unwrap();
        let shard_dir = dir.path().join("shard_0");
        (dir, shard_dir)
    }

    #[test]
    fn test_create_and_reopen() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = test_config();
        {
            let shard = FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
            assert_eq!(shard.slot_count(), cfg.grow_step);
            assert_eq!(shard.key_len(), 8);
            assert_eq!(shard.value_len(), 32);
        }
        {
            let shard = FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
            assert_eq!(shard.slot_count(), cfg.grow_step);
        }
    }

    #[test]
    fn test_reopen_mismatch_detected() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = test_config();
        FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
        let result = FixedShardInner::open(&shard_dir, 0, 16, 32, &cfg);
        assert!(result.is_err());
        let msg = result.err().unwrap().to_string();
        assert!(
            msg.contains("mismatch"),
            "expected a mismatch error, got: {msg}"
        );
    }

    #[test]
    fn test_shard_id_mismatch_detected() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = test_config();
        FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
        let result = FixedShardInner::open(&shard_dir, 1, 8, 32, &cfg);
        assert!(matches!(result, Err(DbError::FormatMismatch(_))));
    }

    #[test]
    fn test_bad_magic_rejected() {
        let (_tmp, shard_dir) = new_shard_dir();
        fs::create_dir_all(&shard_dir).unwrap();
        // A full-size but all-zero header: long enough to read, wrong magic.
        fs::write(shard_dir.join("fixed.data"), vec![0u8; HEADER_SIZE as usize]).unwrap();
        let result = FixedShardInner::open(&shard_dir, 0, 8, 32, &test_config());
        assert!(matches!(result, Err(DbError::FormatMismatch(_))));
    }

    #[test]
    fn test_write_read_slot() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = test_config();
        let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
        let slot_id = shard.alloc_slot().unwrap();
        let key = b"key_0001";
        let value = b"value___00000001";
        shard.write_slot(slot_id, key, value).unwrap();
        let buf = shard.read_slot(slot_id).unwrap();
        let (k, v) = slot::validate_slot(&buf, key.len(), value.len()).expect("CRC should match");
        assert_eq!(k, key);
        assert_eq!(v, value);
    }

    #[test]
    fn test_delete_slot() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = test_config();
        let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
        let slot_id = shard.alloc_slot().unwrap();
        shard
            .write_slot(slot_id, b"key_0001", b"value___00000001")
            .unwrap();
        shard.delete_slot(slot_id).unwrap();
        let buf = shard.read_slot(slot_id).unwrap();
        assert_eq!(slot::slot_status(&buf), slot::SLOT_DELETED);
        assert!(slot::validate_slot(&buf, 8, 16).is_none());
    }

    #[test]
    fn test_grow() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = FixedConfig {
            grow_step: 4,
            ..FixedConfig::test()
        };
        let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 8, &cfg).unwrap();
        assert_eq!(shard.slot_count(), 4);
        for _ in 0..4 {
            let id = shard.alloc_slot().unwrap();
            shard.write_slot(id, b"kkkkkkkk", b"vvvvvvvv").unwrap();
        }
        let id = shard.alloc_slot().unwrap();
        assert_eq!(id, 4);
        assert_eq!(shard.slot_count(), 8);
        shard.write_slot(id, b"kkkkkkkk", b"vvvvvvvv").unwrap();
        let buf = shard.read_slot(id).unwrap();
        assert!(slot::validate_slot(&buf, 8, 8).is_some());
    }

    #[test]
    fn test_grow_persists_across_reopen() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = FixedConfig {
            grow_step: 4,
            ..FixedConfig::test()
        };
        {
            let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 8, &cfg).unwrap();
            // Five allocations force exactly one grow (4 -> 8 slots).
            for _ in 0..5 {
                let id = shard.alloc_slot().unwrap();
                shard.write_slot(id, b"kkkkkkkk", b"vvvvvvvv").unwrap();
            }
            assert_eq!(shard.slot_count(), 8);
        }
        let shard = FixedShardInner::open(&shard_dir, 0, 8, 8, &cfg).unwrap();
        assert_eq!(shard.slot_count(), 8);
        let buf = shard.read_slot(4).unwrap();
        let (k, v) = slot::validate_slot(&buf, 8, 8)
            .expect("slot written after grow must survive reopen");
        assert_eq!(k, b"kkkkkkkk");
        assert_eq!(v, b"vvvvvvvv");
    }

    #[test]
    fn test_clean_shutdown_and_reopen() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = test_config();
        {
            let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
            let id = shard.alloc_slot().unwrap();
            shard
                .write_slot(id, b"key_0001", b"value___00000001")
                .unwrap();
            shard.clean_shutdown().unwrap();
            assert!(shard.has_clean_shutdown());
            assert!(shard_dir.join("fixed.bitmap").exists());
        }
        {
            let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
            assert!(shard.has_clean_shutdown());
            shard.load_bitmap_sidecar().unwrap();
            assert_eq!(shard.bitmap.occupied(), 1);
            assert!(shard.bitmap.is_set(0));
            shard.clear_clean_shutdown().unwrap();
            assert!(!shard.has_clean_shutdown());
        }
    }

    #[test]
    fn test_should_sync() {
        let (_tmp, shard_dir) = new_shard_dir();
        let cfg = FixedConfig {
            grow_step: 64,
            sync_batch_size: 2,
            sync_interval: Duration::from_secs(60),
            ..FixedConfig::test()
        };
        let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 8, &cfg).unwrap();
        assert!(!shard.should_sync());
        let id0 = shard.alloc_slot().unwrap();
        shard.write_slot(id0, b"kkkkkkkk", b"vvvvvvvv").unwrap();
        assert!(!shard.should_sync());
        let id1 = shard.alloc_slot().unwrap();
        shard.write_slot(id1, b"kkkkkkkk", b"vvvvvvvv").unwrap();
        assert!(shard.should_sync());
        shard.sync().unwrap();
        assert!(!shard.should_sync());
    }
}