use std::fs::{self, File, OpenOptions};
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use crate::error::{DbError, DbResult};
use crate::fixed::bitmap::Bitmap;
use crate::fixed::config::FixedConfig;
use crate::fixed::slot;
// Bytes reserved at the start of fixed.data for the shard header.
// Slot 0 begins immediately after this region.
pub(crate) const HEADER_SIZE: u64 = 4096;
// Magic bytes identifying a FixedStore data file.
const MAGIC: &[u8; 4] = b"FIXD";
// On-disk format version; reopening rejects any other stored version.
const VERSION: u16 = 2;
// Byte offset of the clean-shutdown flag within the header, directly
// after the shard_id byte at offset 16.
const CLEAN_SHUTDOWN_OFFSET: u64 = 17;
/// One shard of the fixed-size key/value store.
///
/// Backing file `fixed.data` is a 4096-byte header followed by
/// `slot_count` records of `slot_size` bytes. Occupancy and per-slot
/// metadata (status + version word) are tracked in memory and rebuilt
/// on recovery, either from the `fixed.versions` sidecar (clean
/// shutdown) or by scanning (handled by callers).
pub struct FixedShardInner {
// Handle to fixed.data; all I/O uses positioned reads/writes (FileExt).
file: File,
// Directory holding fixed.data and the fixed.versions sidecar.
dir: PathBuf,
// In-memory occupancy bitmap; not persisted directly.
pub(crate) bitmap: Bitmap,
// Per-slot packed meta (status + version) mirror of the on-disk state.
pub(crate) versions: Vec<u32>,
// Full on-disk record size (header + key + value + checksum).
pub(crate) slot_size: u16,
// Current capacity in slots; grows by `grow_step`.
pub(crate) slot_count: u32,
// Fixed key length in bytes for every record in this shard.
key_len: u16,
// Fixed value length in bytes for every record in this shard.
value_len: u16,
pub(crate) shard_id: u8,
// How many slots to add per grow() call.
grow_step: u32,
// Writes accumulated since the last explicit sync().
pending_writes: u32,
// pending_writes threshold that makes should_sync() return true.
sync_batch_size: u32,
last_sync: Instant,
// Elapsed-time threshold that makes should_sync() return true.
sync_interval: Duration,
// When set, every mutation fsyncs immediately instead of batching.
enable_fsync: bool,
#[cfg(feature = "replication")]
pub(crate) replication_tx:
Option<rtrb::Producer<crate::fixed_replication::FixedReplicationEvent>>,
}
impl FixedShardInner {
pub fn open(
dir: impl Into<PathBuf>,
shard_id: u8,
key_len: u16,
value_len: u16,
config: &FixedConfig,
) -> DbResult<Self> {
let dir = dir.into();
fs::create_dir_all(&dir)?;
let data_path = dir.join("fixed.data");
let slot_size = slot::slot_size(key_len as usize, value_len as usize) as u16;
let exists = data_path.exists();
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(&data_path)?;
if exists {
let mut header = [0u8; HEADER_SIZE as usize];
file.read_exact_at(&mut header, 0)?;
if &header[0..4] != MAGIC {
return Err(DbError::FormatMismatch("fixed.data: bad magic".into()));
}
let stored_version = u16::from_le_bytes([header[4], header[5]]);
if stored_version != VERSION {
return Err(DbError::FormatMismatch(format!(
"fixed.data: version mismatch: stored {stored_version}, expected {VERSION}"
)));
}
let stored_slot_size = u16::from_le_bytes([header[6], header[7]]);
if stored_slot_size != slot_size {
return Err(DbError::FormatMismatch(format!(
"fixed.data: slot_size mismatch: stored {stored_slot_size}, expected {slot_size}"
)));
}
let stored_key_len = u16::from_le_bytes([header[12], header[13]]);
if stored_key_len != key_len {
return Err(DbError::FormatMismatch(format!(
"fixed.data: key_len mismatch: stored {stored_key_len}, expected {key_len}"
)));
}
let stored_value_len = u16::from_le_bytes([header[14], header[15]]);
if stored_value_len != value_len {
return Err(DbError::FormatMismatch(format!(
"fixed.data: value_len mismatch: stored {stored_value_len}, expected {value_len}"
)));
}
let stored_shard_id = header[16];
if stored_shard_id != shard_id {
return Err(DbError::FormatMismatch(format!(
"fixed.data: shard_id mismatch: stored {stored_shard_id}, expected {shard_id}"
)));
}
let stored_slot_count =
u32::from_le_bytes([header[8], header[9], header[10], header[11]]);
let bitmap = Bitmap::new(stored_slot_count);
let versions = vec![0u32; stored_slot_count as usize];
Ok(Self {
file,
dir,
bitmap,
versions,
slot_size,
slot_count: stored_slot_count,
key_len,
value_len,
shard_id,
grow_step: config.grow_step,
pending_writes: 0,
sync_batch_size: config.sync_batch_size,
last_sync: Instant::now(),
sync_interval: config.sync_interval,
enable_fsync: config.enable_fsync,
#[cfg(feature = "replication")]
replication_tx: None,
})
} else {
let initial_slots = config.grow_step;
let total_size = HEADER_SIZE + initial_slots as u64 * slot_size as u64;
file.set_len(total_size)?;
let mut header = [0u8; HEADER_SIZE as usize];
header[0..4].copy_from_slice(MAGIC);
header[4..6].copy_from_slice(&VERSION.to_le_bytes());
header[6..8].copy_from_slice(&slot_size.to_le_bytes());
header[8..12].copy_from_slice(&initial_slots.to_le_bytes());
header[12..14].copy_from_slice(&key_len.to_le_bytes());
header[14..16].copy_from_slice(&value_len.to_le_bytes());
header[16] = shard_id;
file.write_all_at(&header, 0)?;
file.sync_data()?;
let bitmap = Bitmap::new(initial_slots);
let versions = vec![0u32; initial_slots as usize];
Ok(Self {
file,
dir,
bitmap,
versions,
slot_size,
slot_count: initial_slots,
key_len,
value_len,
shard_id,
grow_step: config.grow_step,
pending_writes: 0,
sync_batch_size: config.sync_batch_size,
last_sync: Instant::now(),
sync_interval: config.sync_interval,
enable_fsync: config.enable_fsync,
#[cfg(feature = "replication")]
replication_tx: None,
})
}
}
/// Absolute byte offset of `slot_id`'s record within fixed.data.
#[inline]
fn slot_offset(&self, slot_id: u32) -> u64 {
HEADER_SIZE + slot_id as u64 * self.slot_size as u64
}
/// Writes `key`/`value` into `slot_id`, bumping the slot's version and
/// marking it OCCUPIED. Returns the new packed meta word.
///
/// Durability: the write is batched (see `should_sync`/`sync`) unless
/// `enable_fsync` is set, in which case it is synced immediately.
pub fn write_slot(&mut self, slot_id: u32, key: &[u8], value: &[u8]) -> DbResult<u32> {
let old_meta = self.versions[slot_id as usize];
// New meta = previous version + 1, status OCCUPIED.
let new_meta = slot::with_status(slot::bump_version(old_meta), slot::STATUS_OCCUPIED);
let size = self.slot_size as usize;
let mut buf = vec![0u8; size];
slot::serialize_slot(&mut buf, new_meta, key, value);
let offset = self.slot_offset(slot_id);
self.file.write_all_at(&buf, offset)?;
self.versions[slot_id as usize] = new_meta;
// Count/log when the version counter approaches its wrap threshold.
self.maybe_warn_wrap(slot_id, slot::version_of(new_meta));
self.pending_writes += 1;
if self.enable_fsync {
self.file.sync_data()?;
}
// Best-effort replication: if the ring buffer is full the event is
// dropped and counted — the write path never blocks on a consumer.
#[cfg(feature = "replication")]
if let Some(tx) = &mut self.replication_tx
&& tx
.push(crate::fixed_replication::FixedReplicationEvent::Write {
slot_id,
payload: buf,
})
.is_err()
{
metrics::counter!(
"armdb.fixed.events_dropped",
"shard" => self.shard_id.to_string()
)
.increment(1);
}
Ok(new_meta)
}
/// Marks `slot_id` DELETED by overwriting only the 4-byte meta word at
/// the start of the record; key/value bytes (and the now-stale CRC)
/// stay on disk, which is what makes subsequent full-slot reads fail
/// their checksum. Returns the new packed meta word.
///
/// `key` is only consumed by the replication event, hence the
/// unused-variables allowance when that feature is off.
#[cfg_attr(not(feature = "replication"), allow(unused_variables))]
pub fn delete_slot(&mut self, slot_id: u32, key: &[u8]) -> DbResult<u32> {
let old_meta = self.versions[slot_id as usize];
// Version keeps counting across deletes; only the status changes.
let new_meta = slot::with_status(slot::bump_version(old_meta), slot::STATUS_DELETED);
let offset = self.slot_offset(slot_id);
self.file.write_all_at(&new_meta.to_le_bytes(), offset)?;
self.versions[slot_id as usize] = new_meta;
self.pending_writes += 1;
if self.enable_fsync {
self.file.sync_data()?;
}
// Best-effort replication: overflow drops the event and bumps a
// counter rather than blocking the delete path.
#[cfg(feature = "replication")]
if let Some(tx) = &mut self.replication_tx
&& tx
.push(crate::fixed_replication::FixedReplicationEvent::Delete {
slot_id,
meta: new_meta,
key: key.to_vec(),
})
.is_err()
{
metrics::counter!(
"armdb.fixed.events_dropped",
"shard" => self.shard_id.to_string()
)
.increment(1);
}
Ok(new_meta)
}
/// Emits a metric and a warning log once a slot's version counter
/// reaches the configured near-wrap threshold.
#[inline]
fn maybe_warn_wrap(&self, slot_id: u32, version: u32) {
    if version < slot::VERSION_WARN_THRESHOLD {
        return;
    }
    metrics::counter!(
        "armdb.fixed.version_near_wrap",
        "shard" => self.shard_id.to_string()
    )
    .increment(1);
    tracing::warn!(
        shard_id = self.shard_id,
        slot_id,
        version,
        "FixedStore slot version approaching 30-bit wrap"
    );
}
/// Installs a replicated slot image verbatim: serializes the foreign
/// `meta`/`key`/`value` into the slot and records `meta` as-is, without
/// bumping the local version.
pub fn apply_foreign_slot(
    &mut self,
    slot_id: u32,
    meta: u32,
    key: &[u8],
    value: &[u8],
) -> DbResult<()> {
    let mut payload = vec![0u8; self.slot_size as usize];
    slot::serialize_slot(&mut payload, meta, key, value);
    self.file
        .write_all_at(&payload, self.slot_offset(slot_id))?;
    self.versions[slot_id as usize] = meta;
    self.pending_writes += 1;
    if self.enable_fsync {
        self.file.sync_data()?;
    }
    Ok(())
}
/// Applies a replicated delete: stamps the foreign meta word over the
/// start of the slot, leaving key/value bytes (and a stale CRC) behind.
pub fn apply_foreign_delete(&mut self, slot_id: u32, meta: u32) -> DbResult<()> {
    let meta_bytes = meta.to_le_bytes();
    self.file
        .write_all_at(&meta_bytes, self.slot_offset(slot_id))?;
    self.versions[slot_id as usize] = meta;
    self.pending_writes += 1;
    if self.enable_fsync {
        self.file.sync_data()?;
    }
    Ok(())
}
/// Reads only the slot header plus the key bytes of `slot_id` — enough
/// for key comparison without transferring the value.
pub fn read_slot_header_and_key(&self, slot_id: u32, key_len: usize) -> DbResult<Vec<u8>> {
    let mut prefix = vec![0u8; slot::SLOT_HEADER_SIZE + key_len];
    let offset = self.slot_offset(slot_id);
    self.file.read_exact_at(&mut prefix, offset)?;
    Ok(prefix)
}
/// Reads the full raw record of `slot_id` (`slot_size` bytes).
pub fn read_slot(&self, slot_id: u32) -> DbResult<Vec<u8>> {
    let mut raw = vec![0u8; self.slot_size as usize];
    self.file
        .read_exact_at(&mut raw, self.slot_offset(slot_id))?;
    Ok(raw)
}
/// Extends the shard by `grow_step` slots: lengthens the file, grows
/// the bitmap/versions mirrors, and persists the new slot count in the
/// header.
pub fn grow(&mut self) -> DbResult<()> {
let new_count = self
.slot_count
.checked_add(self.grow_step)
.ok_or(DbError::Internal("slot_count overflow on grow"))?;
let new_size = HEADER_SIZE + new_count as u64 * self.slot_size as u64;
self.file.set_len(new_size)?;
self.bitmap.grow(new_count);
self.versions.resize(new_count as usize, 0);
self.slot_count = new_count;
// slot_count lives at header offset 8 (see open()); sync so the
// header cannot lag behind the already-extended file on crash.
self.file.write_all_at(&new_count.to_le_bytes(), 8)?;
self.file.sync_data()?;
Ok(())
}
/// Allocates a free slot id, growing the shard once if every slot is
/// taken. Non-capacity errors from the bitmap propagate unchanged.
pub fn alloc_slot(&mut self) -> DbResult<u32> {
    match self.bitmap.alloc() {
        Err(DbError::SlotsFull) => {
            // Capacity exhausted: extend the file, then retry once.
            self.grow()?;
            self.bitmap.alloc()
        }
        other => other,
    }
}
/// True when enough writes or enough elapsed time has accumulated that
/// the caller should invoke `sync()`.
pub fn should_sync(&self) -> bool {
    let batch_full = self.pending_writes >= self.sync_batch_size;
    let interval_due = self.last_sync.elapsed() >= self.sync_interval;
    batch_full || interval_due
}
/// Syncs data to disk and resets the write/time batching state used by
/// `should_sync`.
pub fn sync(&mut self) -> DbResult<()> {
self.file.sync_data()?;
self.pending_writes = 0;
self.last_sync = Instant::now();
Ok(())
}
// File name of the versions sidecar written on clean shutdown.
const SIDECAR_FILE: &str = "fixed.versions";
/// Clean-shutdown protocol, order matters: (1) sync outstanding data,
/// (2) persist the versions sidecar, (3) set the clean-shutdown flag in
/// the header, (4) sync the flag. The flag is only trustworthy because
/// it is written after the sidecar exists.
pub fn clean_shutdown(&mut self) -> DbResult<()> {
self.file.sync_data()?;
self.write_versions_sidecar()?;
self.file.write_all_at(&[1u8], CLEAN_SHUTDOWN_OFFSET)?;
self.file.sync_data()?;
Ok(())
}
fn write_versions_sidecar(&self) -> DbResult<()> {
let path = self.dir.join(Self::SIDECAR_FILE);
let expected_len = (self.slot_count as usize) * 4 + 8;
let mut buf: Vec<u8> = Vec::with_capacity(expected_len);
for &m in &self.versions {
buf.extend_from_slice(&m.to_le_bytes());
}
buf.extend_from_slice(&self.slot_count.to_le_bytes());
let mut h = crc32fast::Hasher::new();
h.update(&buf);
let crc = h.finalize();
buf.extend_from_slice(&crc.to_le_bytes());
std::fs::write(&path, &buf)?;
Ok(())
}
/// Returns true only when the on-disk clean-shutdown flag reads as 1
/// AND the versions sidecar file exists; any read error counts as a
/// dirty shutdown.
pub fn has_clean_shutdown(&self) -> bool {
    let mut flag = [0u8; 1];
    match self.file.read_exact_at(&mut flag, CLEAN_SHUTDOWN_OFFSET) {
        Ok(()) => flag[0] == 1 && self.dir.join(Self::SIDECAR_FILE).exists(),
        Err(_) => false,
    }
}
/// Clears the clean-shutdown flag and syncs, so recovery state is
/// consumed at most once.
pub fn clear_clean_shutdown(&mut self) -> DbResult<()> {
    let cleared = [0u8];
    self.file.write_all_at(&cleared, CLEAN_SHUTDOWN_OFFSET)?;
    self.file.sync_data()?;
    Ok(())
}
/// Restores per-slot metadata from the `fixed.versions` sidecar.
///
/// The sidecar must hold exactly `slot_count` version words followed by
/// an 8-byte trailer (slot count + CRC32 over everything before the
/// CRC). Size, count, or CRC disagreement yields `FormatMismatch`. On
/// success the versions vector and occupancy bitmap are rebuilt.
pub fn load_versions_sidecar(&mut self) -> DbResult<()> {
    let path = self.dir.join(Self::SIDECAR_FILE);
    let data = std::fs::read(&path)?;
    let body_len = self.slot_count as usize * 4;
    let expected_len = body_len + 8;
    if data.len() != expected_len {
        return Err(DbError::FormatMismatch(format!(
            "fixed.versions size mismatch: expected {expected_len}, got {}",
            data.len()
        )));
    }
    let trailer = &data[body_len..];
    let stored_slot_count = u32::from_le_bytes(trailer[0..4].try_into().expect("4 bytes"));
    let stored_crc = u32::from_le_bytes(trailer[4..8].try_into().expect("4 bytes"));
    if stored_slot_count != self.slot_count {
        return Err(DbError::FormatMismatch(format!(
            "fixed.versions slot_count mismatch: stored {stored_slot_count}, header {}",
            self.slot_count
        )));
    }
    // CRC covers the version words plus the slot-count word.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&data[..body_len + 4]);
    let actual_crc = hasher.finalize();
    if actual_crc != stored_crc {
        return Err(DbError::FormatMismatch(format!(
            "fixed.versions CRC mismatch: expected {stored_crc:#x}, got {actual_crc:#x}"
        )));
    }
    self.versions = data[..body_len]
        .chunks_exact(4)
        .map(|w| u32::from_le_bytes(w.try_into().expect("4 bytes")))
        .collect();
    self.bitmap = crate::fixed::bitmap::Bitmap::from_versions(&self.versions);
    Ok(())
}
/// Current capacity in slots.
pub fn slot_count(&self) -> u32 {
self.slot_count
}
/// Fixed key length in bytes for this shard.
pub fn key_len(&self) -> u16 {
self.key_len
}
/// Fixed value length in bytes for this shard.
pub fn value_len(&self) -> u16 {
self.value_len
}
/// Directory containing this shard's files.
pub fn dir(&self) -> &std::path::Path {
&self.dir
}
/// Reads exactly `buf.len()` bytes at an absolute file offset (header
/// included — offset 0 is the start of the header, not of slot 0).
pub fn read_chunk_at(&self, offset: u64, buf: &mut [u8]) -> DbResult<()> {
self.file.read_exact_at(buf, offset)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Shared config for most tests: test defaults with a 64-slot grow step.
fn test_config() -> FixedConfig {
FixedConfig {
grow_step: 64,
..FixedConfig::test()
}
}
// A new shard is sized to grow_step slots; reopening with identical
// parameters succeeds and preserves the slot count.
#[test]
fn test_create_and_reopen() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
{
let shard = FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
assert_eq!(shard.slot_count(), cfg.grow_step);
assert_eq!(shard.key_len(), 8);
assert_eq!(shard.value_len(), 32);
}
{
let shard = FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
assert_eq!(shard.slot_count(), cfg.grow_step);
}
}
// Reopening with a different key_len must fail with a "mismatch" error.
#[test]
fn test_reopen_mismatch_detected() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
FixedShardInner::open(&shard_dir, 0, 8, 32, &cfg).unwrap();
let result = FixedShardInner::open(&shard_dir, 0, 16, 32, &cfg);
assert!(result.is_err());
let msg = result.err().unwrap().to_string();
assert!(
msg.contains("mismatch"),
"expected a mismatch error, got: {msg}"
);
}
// A written slot reads back with a valid CRC and the same key/value.
#[test]
fn test_write_read_slot() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let slot_id = shard.alloc_slot().unwrap();
let key = b"key_0001";
let value = b"value___00000001";
shard.write_slot(slot_id, key, value).unwrap();
let buf = shard.read_slot(slot_id).unwrap();
let (_m, k, v) = slot::read_slot(&buf, key.len(), value.len()).expect("CRC should match");
assert_eq!(k, key);
assert_eq!(v, value);
}
// Delete flips the status to DELETED and invalidates the slot CRC,
// so a full read_slot parse returns None.
#[test]
fn test_delete_slot() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
shard.delete_slot(id, b"key_0001").unwrap();
let buf = shard.read_slot(id).unwrap();
assert_eq!(slot::status_of(slot::meta_of(&buf)), slot::STATUS_DELETED);
assert!(slot::read_slot(&buf, 8, 16).is_none());
}
// Exhausting all slots triggers a grow: the next alloc lands at the
// old capacity and slot_count doubles (4 -> 8 with grow_step = 4).
#[test]
fn test_grow() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = FixedConfig {
grow_step: 4,
..FixedConfig::test()
};
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 8, &cfg).unwrap();
assert_eq!(shard.slot_count(), 4);
for _ in 0..4 {
let id = shard.alloc_slot().unwrap();
shard.write_slot(id, b"kkkkkkkk", b"vvvvvvvv").unwrap();
}
let id = shard.alloc_slot().unwrap();
assert_eq!(id, 4);
assert_eq!(shard.slot_count(), 8);
shard.write_slot(id, b"kkkkkkkk", b"vvvvvvvv").unwrap();
let buf = shard.read_slot(id).unwrap();
assert!(slot::read_slot(&buf, 8, 8).is_some());
}
// Clean shutdown persists the flag + sidecar; after reopen the sidecar
// rebuilds occupancy, and clearing the flag makes it report dirty.
#[test]
fn test_clean_shutdown_and_reopen() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
{
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
shard.clean_shutdown().unwrap();
assert!(shard.has_clean_shutdown());
}
{
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
assert!(shard.has_clean_shutdown());
shard.load_versions_sidecar().unwrap();
assert_eq!(shard.bitmap.occupied(), 1);
assert!(shard.bitmap.is_set(0));
shard.clear_clean_shutdown().unwrap();
assert!(!shard.has_clean_shutdown());
}
}
// Sidecar round-trip preserves per-slot status (OCCUPIED vs DELETED)
// and rebuilds a bitmap that reflects only occupied slots.
#[test]
fn test_versions_sidecar_roundtrip() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
{
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id0 = shard.alloc_slot().unwrap();
let id1 = shard.alloc_slot().unwrap();
shard
.write_slot(id0, b"key_0001", b"value___00000001")
.unwrap();
shard
.write_slot(id1, b"key_0002", b"value___00000002")
.unwrap();
shard.delete_slot(id1, b"key_0002").unwrap();
shard.clean_shutdown().unwrap();
}
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
assert!(shard.has_clean_shutdown());
shard.load_versions_sidecar().unwrap();
assert_eq!(shard.versions.len(), shard.slot_count() as usize);
assert_eq!(slot::status_of(shard.versions[0]), slot::STATUS_OCCUPIED);
assert_eq!(slot::status_of(shard.versions[1]), slot::STATUS_DELETED);
let b = crate::fixed::bitmap::Bitmap::from_versions(&shard.versions);
assert!(b.is_set(0));
assert!(!b.is_set(1));
}
// A sidecar truncated by 4 bytes must be rejected (size check fires
// before any CRC validation).
#[test]
fn test_versions_sidecar_trailer_validation_fails_on_truncation() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
{
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
shard.clean_shutdown().unwrap();
}
let sidecar = shard_dir.join("fixed.versions");
let data = std::fs::read(&sidecar).unwrap();
std::fs::write(&sidecar, &data[..data.len() - 4]).unwrap();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let err = shard.load_versions_sidecar().unwrap_err();
match err {
DbError::FormatMismatch(msg) => {
assert!(
msg.contains("fixed.versions") || msg.contains("size mismatch"),
"got: {msg}"
);
}
other => panic!("expected FormatMismatch, got {other:?}"),
}
}
// Flipping a byte in the sidecar body must fail the CRC check on load.
#[test]
fn test_versions_sidecar_corrupted_crc_fails() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
{
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
shard.clean_shutdown().unwrap();
}
let sidecar = shard_dir.join("fixed.versions");
let mut data = std::fs::read(&sidecar).unwrap();
data[0] ^= 0xFF;
std::fs::write(&sidecar, &data).unwrap();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let err = shard.load_versions_sidecar().unwrap_err();
match err {
DbError::FormatMismatch(msg) => {
assert!(msg.contains("CRC") || msg.contains("crc"), "got: {msg}");
}
other => panic!("expected FormatMismatch, got {other:?}"),
}
}
// With sync_batch_size = 2 and a long interval, should_sync turns true
// exactly on the second pending write and resets after sync().
#[test]
fn test_should_sync() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = FixedConfig {
grow_step: 64,
sync_batch_size: 2,
sync_interval: Duration::from_secs(60),
..FixedConfig::test()
};
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 8, &cfg).unwrap();
assert!(!shard.should_sync());
let id0 = shard.alloc_slot().unwrap();
shard.write_slot(id0, b"kkkkkkkk", b"vvvvvvvv").unwrap();
assert!(!shard.should_sync());
let id1 = shard.alloc_slot().unwrap();
shard.write_slot(id1, b"kkkkkkkk", b"vvvvvvvv").unwrap();
assert!(shard.should_sync());
shard.sync().unwrap();
assert!(!shard.should_sync());
}
// Each write_slot bumps the version: 1 on first write, 2 on rewrite,
// and the in-memory versions mirror matches the returned meta.
#[test]
fn test_write_slot_bumps_version() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
let m1 = shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
assert_eq!(slot::status_of(m1), slot::STATUS_OCCUPIED);
assert_eq!(slot::version_of(m1), 1, "first write → version 1");
let m2 = shard
.write_slot(id, b"key_0001", b"value___00000002")
.unwrap();
assert_eq!(slot::version_of(m2), 2, "second write → version 2");
assert_eq!(shard.versions[id as usize], m2);
}
// delete_slot rewrites only the 4-byte meta word: key bytes remain on
// disk, meta matches, and the stale CRC makes read_slot parse as None.
#[test]
fn test_delete_slot_4byte_partial_write() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
let m = shard.delete_slot(id, b"key_0001").unwrap();
assert_eq!(slot::status_of(m), slot::STATUS_DELETED);
assert_eq!(slot::version_of(m), 2);
let buf = shard.read_slot(id).unwrap();
assert_eq!(
&buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + 8],
b"key_0001"
);
assert_eq!(slot::meta_of(&buf), m);
assert!(slot::read_slot(&buf, 8, 16).is_none());
}
// Version numbering is monotonic across a delete: write(1), delete(2),
// write(3).
#[test]
fn test_delete_then_write_continues_version() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"val1_00_0000_000")
.unwrap();
shard.delete_slot(id, b"key_0001").unwrap();
let m3 = shard
.write_slot(id, b"key_0002", b"val2_00_0000_000")
.unwrap();
assert_eq!(slot::version_of(m3), 3, "version continues across DELETE");
}
// apply_foreign_slot installs the replicated meta verbatim (no local
// version bump), both in memory and on disk.
#[test]
fn test_apply_foreign_slot_overwrites_version() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let foreign_meta = slot::pack_meta(slot::STATUS_OCCUPIED, 500);
shard
.apply_foreign_slot(0, foreign_meta, b"key_0001", b"value___00000001")
.unwrap();
assert_eq!(shard.versions[0], foreign_meta);
let buf = shard.read_slot(0).unwrap();
assert_eq!(slot::meta_of(&buf), foreign_meta);
}
// apply_foreign_delete stamps the replicated meta over the slot header
// while the previously written key bytes remain on disk.
#[test]
fn test_apply_foreign_delete_sets_meta() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let occ = slot::pack_meta(slot::STATUS_OCCUPIED, 100);
shard
.apply_foreign_slot(0, occ, b"keyabcde", b"0123456701234567")
.unwrap();
let del = slot::pack_meta(slot::STATUS_DELETED, 101);
shard.apply_foreign_delete(0, del).unwrap();
assert_eq!(shard.versions[0], del);
let buf = shard.read_slot(0).unwrap();
assert_eq!(slot::meta_of(&buf), del);
assert_eq!(
&buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + 8],
b"keyabcde"
);
}
// read_slot_header_and_key returns exactly header + key_len bytes with
// the key in the tail.
#[test]
fn test_read_slot_header_and_key() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
let buf = shard.read_slot_header_and_key(id, 8).unwrap();
assert_eq!(buf.len(), slot::SLOT_HEADER_SIZE + 8);
assert_eq!(&buf[slot::SLOT_HEADER_SIZE..], b"key_0001");
}
// With a 2-slot ring buffer, 3 writes overflow it; writes must still
// succeed (events are dropped, the write path never blocks).
#[cfg(feature = "replication")]
#[test]
fn test_replication_hook_overflow_does_not_block() {
use crate::fixed_replication::FixedReplicationEvent;
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let (producer, _consumer) = rtrb::RingBuffer::<FixedReplicationEvent>::new(2);
shard.replication_tx = Some(producer);
let id = shard.alloc_slot().unwrap();
for _ in 0..3 {
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
}
assert!(slot::version_of(shard.versions[id as usize]) >= 3);
}
// Seeding a slot just below VERSION_WARN_THRESHOLD and writing once
// pushes the version into the warn range without panicking.
#[test]
fn test_warn_wrap_threshold_triggers() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let id = shard.alloc_slot().unwrap();
shard.versions[id as usize] =
slot::pack_meta(slot::STATUS_OCCUPIED, slot::VERSION_WARN_THRESHOLD - 1);
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
assert!(slot::version_of(shard.versions[id as usize]) >= slot::VERSION_WARN_THRESHOLD);
}
// write_slot emits a Write event (full slot payload) and delete_slot a
// Delete event (meta + key), in order, on the replication ring buffer.
#[cfg(feature = "replication")]
#[test]
fn test_replication_hook_pushes_events() {
use crate::fixed_replication::FixedReplicationEvent;
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_0");
let cfg = test_config();
let mut shard = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg).unwrap();
let (producer, mut consumer) = rtrb::RingBuffer::new(8);
shard.replication_tx = Some(producer);
let id = shard.alloc_slot().unwrap();
shard
.write_slot(id, b"key_0001", b"value___00000001")
.unwrap();
shard.delete_slot(id, b"key_0001").unwrap();
match consumer.pop().unwrap() {
FixedReplicationEvent::Write { slot_id, payload } => {
assert_eq!(slot_id, id);
assert_eq!(payload.len(), shard.slot_size as usize);
assert_eq!(
slot::status_of(slot::meta_of(&payload)),
slot::STATUS_OCCUPIED
);
}
_ => panic!("expected Write"),
}
match consumer.pop().unwrap() {
FixedReplicationEvent::Delete { slot_id, meta, key } => {
assert_eq!(slot_id, id);
assert_eq!(slot::status_of(meta), slot::STATUS_DELETED);
assert_eq!(key, b"key_0001");
}
_ => panic!("expected Delete"),
}
}
// Hand-craft a version-1 header on disk and verify that open() refuses
// it with a version-mismatch FormatMismatch error.
//
// Fix: the original had two statements fused onto one line in two
// places (after the version write and after the shard_id write); split
// for readability. `.err().expect(...)` is kept (rather than
// `unwrap_err`/`expect_err`) because those require the Ok type to be
// Debug, which FixedShardInner is not.
#[test]
fn test_reject_old_version() {
    let dir = tempdir().unwrap();
    let shard_dir = dir.path().join("shard_0");
    std::fs::create_dir_all(&shard_dir).unwrap();
    let slot_size = slot::slot_size(8, 16) as u16;
    let mut header = [0u8; 4096];
    header[0..4].copy_from_slice(b"FIXD");
    // Stale format version 1 — current code expects VERSION = 2.
    header[4..6].copy_from_slice(&1u16.to_le_bytes());
    header[6..8].copy_from_slice(&slot_size.to_le_bytes());
    header[8..12].copy_from_slice(&10u32.to_le_bytes());
    header[12..14].copy_from_slice(&8u16.to_le_bytes());
    header[14..16].copy_from_slice(&16u16.to_le_bytes());
    header[16] = 0;
    let data_path = shard_dir.join("fixed.data");
    std::fs::write(&data_path, header).unwrap();
    let f = std::fs::OpenOptions::new()
        .write(true)
        .open(&data_path)
        .unwrap();
    f.set_len(4096 + 10 * slot_size as u64).unwrap();
    let cfg = test_config();
    let err = FixedShardInner::open(&shard_dir, 0, 8, 16, &cfg)
        .err()
        .expect("expected an error opening v1 file");
    match err {
        DbError::FormatMismatch(msg) => {
            assert!(msg.contains("version"), "got: {msg}");
        }
        other => panic!("expected FormatMismatch, got {other:?}"),
    }
}
}