# FixedStore Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Implement FixedTree and FixedMap — fixed-slot durability backend for armdb with pwrite + batched fdatasync, replacing Bitcask append for fixed-size frequently-updated values.
**Architecture:** New `fixed/` submodule in armdb. FixedTree/FixedMap share existing SkipList/HashMap indexes with ConstNode made generic over location type (DiskLoc vs u32). FixedEngine manages per-shard slot files with in-memory bitmap. No compaction.
**Tech Stack:** Rust, rustix (pwrite/pread/fdatasync/ftruncate), crc32fast, seize (epoch GC), parking_lot (Mutex)
**Spec:** `armdb/docs/fixed_store_design.md`
---
## File Structure
```
armdb/src/
fixed/
mod.rs — module declarations + re-exports
config.rs — FixedConfig struct + validate + Default
bitmap.rs — In-memory Bitmap for slot allocation
slot.rs — Slot header format, serialize/deserialize, CRC
shard.rs — FixedShard, FixedShardInner (file I/O, fdatasync)
engine.rs — FixedEngine (shard management, db.meta)
recovery.rs — Recovery: scan slots → rebuild index + bitmap
tree.rs — FixedTree (SkipList-based, ordered)
map.rs — FixedMap (HashMap-based, O(1))
skiplist/node.rs — MODIFY: ConstNode<K, V, L> generic over Location
key.rs — MODIFY: add Location trait
lib.rs — MODIFY: add mod fixed + re-exports
armdb/tests/
fixed_tree_tests.rs — Integration tests for FixedTree
fixed_map_tests.rs — Integration tests for FixedMap
```
## Dependency Graph
```
Task 1 (Location + ConstNode) ──────────────────────────────┐
│
Task 2 (FixedConfig) ──┐ │
Task 3 (Bitmap) ───────┤ │
Task 4 (Slot format) ──┼──→ Task 5 (FixedShard) ──→ Task 6 (FixedEngine)
│ │
│ Task 7 (Recovery) ←─────────┤
│ │ │
│ ▼ │
└──→ Task 8 (FixedTree) ←─────────────┘
│
▼
Task 9 (FixedMap)
│
▼
Task 10 (Integration tests)
```
Tasks 2, 3, 4 are independent and can run in parallel.
Task 1 can run in parallel with tasks 2-4.
---
### Task 1: Location trait + ConstNode generic
Make ConstNode generic over location type so FixedTree can use `u32` (slot_id) instead of `DiskLoc` (12 bytes), saving 8 bytes per entry.
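The backward-compatibility trick here is a default generic type parameter. A standalone sketch (not armdb code; `WideLoc` is a hypothetical stand-in for `DiskLoc`, with an illustrative field layout) shows both the unchanged call sites and the size saving:

```rust
// Sketch of the default-type-parameter pattern used in this task.
// WideLoc stands in for DiskLoc; its fields are illustrative only.
#[derive(Clone, Copy)]
struct WideLoc {
    _file_id: u32,
    _offset: u64,
}

trait Location: Copy + Send + Sync + 'static {}
impl Location for WideLoc {}
impl Location for u32 {}

// Existing code that writes `Node` keeps compiling: L defaults to WideLoc.
struct Node<L: Location = WideLoc> {
    _loc: L,
}

fn main() {
    // Padded WideLoc occupies 16 bytes; the u32 slot_id occupies 4.
    assert_eq!(std::mem::size_of::<Node>(), 16);
    assert_eq!(std::mem::size_of::<Node<u32>>(), 4);
}
```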
**Files:**
- Modify: `armdb/src/key.rs`
- Modify: `armdb/src/skiplist/node.rs`
- [ ] **Step 1: Add Location trait to key.rs**
In `armdb/src/key.rs`, add after the `Key` trait definition:
```rust
/// Marker trait for disk location types stored in SkipList nodes.
/// DiskLoc (12 bytes) for Bitcask, u32 (4 bytes) for FixedStore slot_id.
pub trait Location: Copy + Send + Sync + 'static {}
impl Location for crate::disk_loc::DiskLoc {}
impl Location for u32 {}
```
- [ ] **Step 2: Make ConstNode generic over L: Location**
In `armdb/src/skiplist/node.rs`, change the import and struct:
Replace:
```rust
use crate::disk_loc::DiskLoc;
```
With:
```rust
use crate::disk_loc::DiskLoc;
use crate::key::Location;
```
Replace the struct definition (lines 76-86):
```rust
pub struct ConstNode<K: Key, const V: usize> {
pub key: K,
seq: AtomicU64,
disk: UnsafeCell<DiskLoc>,
value: UnsafeCell<[u8; V]>,
height: u8,
tower_ptr: *const AtomicPtr<Self>,
}
```
With:
```rust
pub struct ConstNode<K: Key, const V: usize, L: Location = DiskLoc> {
pub key: K,
seq: AtomicU64,
loc: UnsafeCell<L>,
value: UnsafeCell<[u8; V]>,
height: u8,
tower_ptr: *const AtomicPtr<Self>,
}
```
- [ ] **Step 3: Update impl block and all methods**
Change the impl block (line 88):
```rust
impl<K: Key, const V: usize, L: Location> ConstNode<K, V, L> {
```
Update `alloc()` (line 104):
```rust
pub fn alloc(key: K, value: [u8; V], loc: L, height: u8) -> *mut Self {
let layout = Self::layout_for(height);
unsafe {
let ptr = std::alloc::alloc(layout) as *mut Self;
assert!(!ptr.is_null(), "allocation failed");
let tower = (ptr as *mut u8).add(Self::base_size()) as *mut AtomicPtr<Self>;
std::ptr::write(&raw mut (*ptr).key, key);
std::ptr::write(&raw mut (*ptr).seq, AtomicU64::new(0));
std::ptr::write(&raw mut (*ptr).loc, UnsafeCell::new(loc));
std::ptr::write(&raw mut (*ptr).value, UnsafeCell::new(value));
std::ptr::write(&raw mut (*ptr).height, height);
std::ptr::write(&raw mut (*ptr).tower_ptr, tower as *const AtomicPtr<Self>);
for i in 0..height as usize {
std::ptr::write(tower.add(i), AtomicPtr::new(ptr::null_mut()));
}
ptr
}
}
```
Update `read_data()` so the location half of the returned tuple is typed `L`:

```rust
pub fn read_data(&self) -> (L, [u8; V]) {
loop {
let s1 = self.seq.load(Ordering::Acquire);
if s1 & 1 != 0 { spin_or_yield(); continue; }
let loc = unsafe { read_cell(&self.loc) };
let value = unsafe { read_cell(&self.value) };
fence(Ordering::Acquire);
let s2 = self.seq.load(Ordering::Relaxed);
if s1 == s2 { return (loc, value); }
spin_or_yield();
}
}
```
Update `read_disk()` → rename to `read_loc()`:
```rust
pub fn read_loc(&self) -> L {
loop {
let s1 = self.seq.load(Ordering::Acquire);
if s1 & 1 != 0 { spin_or_yield(); continue; }
let loc = unsafe { read_cell(&self.loc) };
fence(Ordering::Acquire);
let s2 = self.seq.load(Ordering::Relaxed);
if s1 == s2 { return loc; }
spin_or_yield();
}
}
```
Update `write_data()`:
```rust
pub fn write_data(&self, loc: L, value: &[u8; V]) {
self.seq.fetch_add(1, Ordering::Relaxed);
fence(Ordering::Release);
unsafe {
write_cell(&self.loc, loc);
write_cell(&self.value, *value);
}
self.seq.fetch_add(1, Ordering::Release);
}
```
Update `write_disk()` → rename to `write_loc()`:
```rust
pub fn write_loc(&self, loc: L) {
self.seq.fetch_add(1, Ordering::Relaxed);
fence(Ordering::Release);
unsafe {
write_cell(&self.loc, loc);
}
self.seq.fetch_add(1, Ordering::Release);
}
```
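The `read_*`/`write_*` pairs above follow a seqlock protocol: the counter is odd while a write is in flight, and a reader retries if the counter changed between its two loads. A minimal standalone sketch of the same protocol (single-threaded here, so the retry loop exits on the first pass; a real concurrent use needs an external single-writer guarantee, which the shard mutex provides in armdb):

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{fence, AtomicU64, Ordering};

struct Seqlock {
    seq: AtomicU64,
    data: UnsafeCell<u64>,
}

// Needed to share across threads; reads are made safe by the retry loop,
// writes must be serialized externally (e.g. by a mutex).
unsafe impl Sync for Seqlock {}

impl Seqlock {
    fn write(&self, v: u64) {
        self.seq.fetch_add(1, Ordering::Relaxed); // seq becomes odd: write in flight
        fence(Ordering::Release);
        unsafe { *self.data.get() = v; }
        self.seq.fetch_add(1, Ordering::Release); // seq even again: write published
    }

    fn read(&self) -> u64 {
        loop {
            let s1 = self.seq.load(Ordering::Acquire);
            if s1 & 1 != 0 {
                continue; // writer active, retry
            }
            let v = unsafe { *self.data.get() };
            fence(Ordering::Acquire);
            if self.seq.load(Ordering::Relaxed) == s1 {
                return v; // no write raced with our read
            }
        }
    }
}

fn main() {
    let l = Seqlock { seq: AtomicU64::new(0), data: UnsafeCell::new(0) };
    l.write(7);
    assert_eq!(l.read(), 7);
}
```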
- [ ] **Step 4: Update SkipNode impl**
Change (line 216):
```rust
impl<K: Key, const V: usize, L: Location> SkipNode for ConstNode<K, V, L> {
```
Update `alloc_head()` to use `L`, which requires a zeroed placeholder value. Rather than adding a `Default` bound or using `MaybeUninit`, extend the `Location` trait in key.rs with a `zeroed()` constructor (this supersedes the Step 1 definition):
```rust
pub trait Location: Copy + Send + Sync + 'static {
fn zeroed() -> Self;
}
impl Location for crate::disk_loc::DiskLoc {
fn zeroed() -> Self { Self::new(0, 0, 0, 0) }
}
impl Location for u32 {
fn zeroed() -> Self { 0 }
}
```
Then `alloc_head()`:
```rust
fn alloc_head() -> *mut Self {
Self::alloc(K::zeroed(), [0u8; V], L::zeroed(), MAX_HEIGHT as u8)
}
```
Update Send/Sync impls (lines 273-275):
```rust
unsafe impl<K: Key, const V: usize, L: Location> Send for ConstNode<K, V, L> {}
unsafe impl<K: Key, const V: usize, L: Location> Sync for ConstNode<K, V, L> {}
```
- [ ] **Step 5: Fix all call sites — rename read_disk/write_disk → read_loc/write_loc**
Search for `read_disk` and `write_disk` in all armdb source files and update:
- `armdb/src/const_tree.rs`: `node.read_disk()` → `node.read_loc()`
- `armdb/src/const_map.rs`: similar
- `armdb/src/compaction.rs`: `node.read_disk()` → `node.read_loc()`, `node.write_disk()` → `node.write_loc()`
- `armdb/src/recovery.rs`: update similarly if it uses these methods
All existing code uses `ConstNode<K, V>` which now means `ConstNode<K, V, DiskLoc>` — types unchanged.
- [ ] **Step 6: Verify compilation**
Run: `cargo check -p armdb`
Expected: compiles with no errors.
- [ ] **Step 7: Run existing tests**
Run: `cargo nextest run -p armdb`
Expected: all existing tests pass (no behavior change).
- [ ] **Step 8: Commit**
```bash
git add armdb/src/key.rs armdb/src/skiplist/node.rs armdb/src/const_tree.rs armdb/src/const_map.rs armdb/src/compaction.rs armdb/src/recovery.rs
git commit -m "refactor: make ConstNode generic over Location type (DiskLoc vs u32)"
```
---
### Task 2: FixedConfig
**Files:**
- Create: `armdb/src/fixed/config.rs`
- Create: `armdb/src/fixed/mod.rs` (initial)
- [ ] **Step 1: Create fixed/mod.rs with config module**
```rust
pub mod config;
pub use config::FixedConfig;
```
- [ ] **Step 2: Write test for FixedConfig**
In `armdb/src/fixed/config.rs`:
```rust
use std::time::Duration;
use crate::error::{DbError, DbResult};
#[derive(Debug, Clone)]
pub struct FixedConfig {
/// Number of shards. Immutable after creation.
pub shard_count: usize,
/// Bits of key prefix used for shard routing. Immutable.
pub shard_prefix_bits: usize,
/// Number of new slots allocated per grow. Tunable.
pub grow_step: u32,
/// fdatasync interval. Tunable.
pub sync_interval: Duration,
/// fdatasync every N writes. Tunable.
pub sync_batch_size: u32,
/// fdatasync on every write. Tunable.
pub enable_fsync: bool,
}
impl Default for FixedConfig {
fn default() -> Self {
let shard_count = std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(4);
Self {
shard_count,
shard_prefix_bits: 0,
grow_step: 1_000_000,
sync_interval: Duration::from_millis(50),
sync_batch_size: 1000,
enable_fsync: false,
}
}
}
impl FixedConfig {
/// Reduced config for tests: fewer shards, smaller grow_step.
pub fn test() -> Self {
Self {
shard_count: 3,
grow_step: 1_000,
sync_interval: Duration::from_millis(10),
sync_batch_size: 100,
..Default::default()
}
}
pub fn validate(&self) -> DbResult<()> {
if self.shard_count == 0 || self.shard_count > 255 {
return Err(DbError::InvalidConfig("shard_count must be 1..=255"));
}
if self.grow_step == 0 {
return Err(DbError::InvalidConfig("grow_step must be > 0"));
}
if self.sync_batch_size == 0 {
return Err(DbError::InvalidConfig("sync_batch_size must be > 0"));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config_is_valid() {
FixedConfig::default().validate().unwrap();
}
#[test]
fn test_test_config_is_valid() {
FixedConfig::test().validate().unwrap();
}
#[test]
fn test_invalid_shard_count() {
let mut c = FixedConfig::test();
c.shard_count = 0;
assert!(c.validate().is_err());
c.shard_count = 256;
assert!(c.validate().is_err());
}
#[test]
fn test_invalid_grow_step() {
let mut c = FixedConfig::test();
c.grow_step = 0;
assert!(c.validate().is_err());
}
}
```
- [ ] **Step 3: Add mod fixed to lib.rs**
In `armdb/src/lib.rs`, add the module declaration (near other mod declarations):
```rust
mod fixed;
```
And add export:
```rust
pub use fixed::FixedConfig;
```
- [ ] **Step 4: Verify**
Run: `cargo check -p armdb && cargo nextest run -p armdb -E 'test(fixed)'`
Expected: compiles, config tests pass.
- [ ] **Step 5: Commit**
```bash
git add armdb/src/fixed/
git commit -m "feat(armdb): add FixedConfig for fixed-slot store"
```
---
### Task 3: Bitmap
In-memory bitmap for slot allocation/deallocation.
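The free-slot scan in `alloc()` below relies on a standard bit trick: inverting a word turns the first free (0) bit into the first set bit, which `trailing_zeros` then locates. A quick standalone illustration:

```rust
fn main() {
    // Bits 0, 1, and 3 occupied; bit 2 is the first free slot.
    let word: u64 = 0b1011;
    let first_free = (!word).trailing_zeros();
    assert_eq!(first_free, 2);

    // A full word has no free bit: trailing_zeros of !u64::MAX (= 0) is 64,
    // which is why alloc() skips words equal to u64::MAX.
    assert_eq!((!u64::MAX).trailing_zeros(), 64);
}
```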
**Files:**
- Create: `armdb/src/fixed/bitmap.rs`
- Modify: `armdb/src/fixed/mod.rs`
- [ ] **Step 1: Write Bitmap with tests**
```rust
use crate::error::{DbError, DbResult};
/// In-memory bitmap for tracking occupied/free slots.
/// Source of truth is the status byte of each slot on disk.
/// Bitmap is rebuilt on recovery.
pub struct Bitmap {
/// Each bit = 1 slot. 1 = occupied, 0 = free.
words: Vec<u64>,
/// Total number of slots tracked.
count: u32,
/// Number of occupied slots.
occupied: u32,
/// Hint: first word that might contain a free bit.
first_free_hint: usize,
}
impl Bitmap {
pub fn new(count: u32) -> Self {
let num_words = ((count as usize) + 63) / 64;
Self {
words: vec![0u64; num_words],
count,
occupied: 0,
first_free_hint: 0,
}
}
/// Allocate the first free slot. Returns slot_id.
pub fn alloc(&mut self) -> DbResult<u32> {
for i in self.first_free_hint..self.words.len() {
let word = self.words[i];
if word != u64::MAX {
let bit = (!word).trailing_zeros(); // first 0-bit
let slot_id = i as u32 * 64 + bit;
if slot_id >= self.count {
return Err(DbError::SlotsFull);
}
self.words[i] |= 1u64 << bit;
self.occupied += 1;
self.first_free_hint = i;
return Ok(slot_id);
}
}
Err(DbError::SlotsFull)
}
/// Mark slot as occupied.
pub fn set(&mut self, slot_id: u32) {
let (word, bit) = (slot_id as usize / 64, slot_id % 64);
if self.words[word] & (1u64 << bit) == 0 {
self.occupied += 1;
}
self.words[word] |= 1u64 << bit;
}
/// Mark slot as free.
pub fn clear(&mut self, slot_id: u32) {
let (word, bit) = (slot_id as usize / 64, slot_id % 64);
if self.words[word] & (1u64 << bit) != 0 {
self.occupied -= 1;
}
self.words[word] &= !(1u64 << bit);
if word < self.first_free_hint {
self.first_free_hint = word;
}
}
/// Check if slot is occupied.
pub fn is_set(&self, slot_id: u32) -> bool {
let (word, bit) = (slot_id as usize / 64, slot_id % 64);
self.words[word] & (1u64 << bit) != 0
}
pub fn occupied(&self) -> u32 { self.occupied }
pub fn count(&self) -> u32 { self.count }
pub fn is_full(&self) -> bool { self.occupied >= self.count }
/// Grow bitmap to accommodate more slots.
pub fn grow(&mut self, new_count: u32) {
assert!(new_count > self.count);
let new_words = ((new_count as usize) + 63) / 64;
self.words.resize(new_words, 0);
self.count = new_count;
}
/// Serialize bitmap for sidecar file.
pub fn as_bytes(&self) -> &[u8] {
unsafe {
std::slice::from_raw_parts(
self.words.as_ptr() as *const u8,
self.words.len() * 8,
)
}
}
/// Deserialize bitmap from sidecar file.
pub fn from_bytes(data: &[u8], count: u32) -> Self {
let num_words = ((count as usize) + 63) / 64;
let mut words = vec![0u64; num_words];
let copy_len = data.len().min(num_words * 8);
unsafe {
std::ptr::copy_nonoverlapping(
data.as_ptr(),
words.as_mut_ptr() as *mut u8,
copy_len,
);
}
let mut bm = Self { words, count, occupied: 0, first_free_hint: 0 };
// Count occupied bits
bm.occupied = bm.words.iter().map(|w| w.count_ones()).sum();
// Find first free hint
bm.first_free_hint = bm.words.iter()
.position(|&w| w != u64::MAX)
.unwrap_or(bm.words.len());
bm
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_alloc_sequential() {
let mut bm = Bitmap::new(128);
for i in 0..128u32 {
assert_eq!(bm.alloc().unwrap(), i);
}
assert!(bm.alloc().is_err());
assert!(bm.is_full());
}
#[test]
fn test_free_and_realloc() {
let mut bm = Bitmap::new(4);
let s0 = bm.alloc().unwrap();
let s1 = bm.alloc().unwrap();
let _s2 = bm.alloc().unwrap();
bm.clear(s1);
assert_eq!(bm.occupied(), 2);
let reused = bm.alloc().unwrap();
assert_eq!(reused, s1); // reuses freed slot
assert_eq!(bm.occupied(), 3);
assert!(bm.is_set(s0)); // s0 still occupied
bm.clear(s0);
assert!(!bm.is_set(s0));
}
#[test]
fn test_grow() {
let mut bm = Bitmap::new(64);
for _ in 0..64 { bm.alloc().unwrap(); }
assert!(bm.is_full());
bm.grow(128);
assert!(!bm.is_full());
let slot = bm.alloc().unwrap();
assert_eq!(slot, 64);
}
#[test]
fn test_serde_roundtrip() {
let mut bm = Bitmap::new(256);
bm.set(0);
bm.set(42);
bm.set(255);
let bytes = bm.as_bytes().to_vec();
let bm2 = Bitmap::from_bytes(&bytes, 256);
assert!(bm2.is_set(0));
assert!(bm2.is_set(42));
assert!(bm2.is_set(255));
assert!(!bm2.is_set(1));
assert_eq!(bm2.occupied(), 3);
}
}
```
- [ ] **Step 2: Add SlotsFull error variant**
In `armdb/src/error.rs`, add to the `DbError` enum:
```rust
SlotsFull,
```
And in the Display/Error impl, add the arm:
```rust
DbError::SlotsFull => write!(f, "all slots are occupied"),
```
- [ ] **Step 3: Add bitmap module to fixed/mod.rs**
```rust
pub mod bitmap;
pub mod config;
pub use config::FixedConfig;
```
- [ ] **Step 4: Verify**
Run: `cargo nextest run -p armdb -E 'test(bitmap)'`
Expected: all bitmap tests pass.
- [ ] **Step 5: Commit**
```bash
git add armdb/src/fixed/bitmap.rs armdb/src/fixed/mod.rs armdb/src/error.rs
git commit -m "feat(armdb): add in-memory Bitmap for FixedStore slot allocation"
```
---
### Task 4: Slot format
Slot serialization, deserialization, CRC computation.
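`slot_size` below rounds the raw size up to an 8-byte boundary with the `(n + 7) & !7` idiom (add 7, then clear the low 3 bits). A quick check of the edge cases:

```rust
// Round n up to the next multiple of 8.
const fn round_up8(n: usize) -> usize {
    (n + 7) & !7
}

fn main() {
    assert_eq!(round_up8(15), 16); // 8 header + 3 key + 4 value
    assert_eq!(round_up8(16), 16); // already aligned: unchanged
    assert_eq!(round_up8(48), 48); // 8 header + 8 key + 32 value
    assert_eq!(round_up8(1), 8);
}
```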
**Files:**
- Create: `armdb/src/fixed/slot.rs`
- Modify: `armdb/src/fixed/mod.rs`
- [ ] **Step 1: Write slot module with tests**
```rust
use crc32fast::Hasher;
pub const SLOT_FREE: u8 = 0x00;
pub const SLOT_OCCUPIED: u8 = 0x01;
pub const SLOT_DELETED: u8 = 0x02;
/// Slot header: [status: 1][_pad: 3][crc32: 4] = 8 bytes.
pub const SLOT_HEADER_SIZE: usize = 8;
/// Compute total slot size (8-byte aligned).
pub const fn slot_size(key_len: usize, value_len: usize) -> usize {
let raw = SLOT_HEADER_SIZE + key_len + value_len;
(raw + 7) & !7
}
/// Compute CRC32 over key + value.
pub fn compute_crc(key: &[u8], value: &[u8]) -> u32 {
let mut h = Hasher::new();
h.update(key);
h.update(value);
h.finalize()
}
/// Serialize a slot into `buf`. `buf.len()` must equal `slot_size(key, value)`.
pub fn serialize_slot(buf: &mut [u8], key: &[u8], value: &[u8]) {
debug_assert_eq!(buf.len(), slot_size(key.len(), value.len()));
buf[0] = SLOT_OCCUPIED;
buf[1] = 0; buf[2] = 0; buf[3] = 0; // pad
let crc = compute_crc(key, value);
buf[4..8].copy_from_slice(&crc.to_le_bytes());
buf[8..8 + key.len()].copy_from_slice(key);
buf[8 + key.len()..8 + key.len() + value.len()].copy_from_slice(value);
// Zero tail padding
buf[8 + key.len() + value.len()..].fill(0);
}
/// Serialize a deleted/free slot marker.
pub fn serialize_deleted_slot(buf: &mut [u8]) {
buf[0] = SLOT_DELETED;
buf[1..].fill(0);
}
/// Parse status byte from a slot buffer.
pub fn slot_status(buf: &[u8]) -> u8 {
buf[0]
}
/// Validate CRC of a slot buffer. Returns (key_slice, value_slice) if valid.
pub fn validate_slot<'a>(
buf: &'a [u8],
key_len: usize,
value_len: usize,
) -> Option<(&'a [u8], &'a [u8])> {
if buf[0] != SLOT_OCCUPIED {
return None;
}
let stored_crc = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]);
let key = &buf[8..8 + key_len];
let value = &buf[8 + key_len..8 + key_len + value_len];
let computed_crc = compute_crc(key, value);
if stored_crc != computed_crc {
return None;
}
Some((key, value))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_slot_size_alignment() {
// 8 header + 8 key + 32 value = 48 (already aligned)
assert_eq!(slot_size(8, 32), 48);
// 8 header + 8 key + 64 value = 80 (already aligned)
assert_eq!(slot_size(8, 64), 80);
// 8 header + 16 key + 32 value = 56 (already aligned)
assert_eq!(slot_size(16, 32), 56);
// 8 header + 3 key + 5 value = 16 (already aligned)
assert_eq!(slot_size(3, 5), 16);
// 8 header + 3 key + 4 value = 15 → 16
assert_eq!(slot_size(3, 4), 16);
}
#[test]
fn test_serialize_validate_roundtrip() {
let key = [1u8, 2, 3, 4, 5, 6, 7, 8];
let value = [10u8; 32];
let ss = slot_size(8, 32);
let mut buf = vec![0u8; ss];
serialize_slot(&mut buf, &key, &value);
assert_eq!(slot_status(&buf), SLOT_OCCUPIED);
let (k, v) = validate_slot(&buf, 8, 32).expect("valid slot");
assert_eq!(k, &key);
assert_eq!(v, &value);
}
#[test]
fn test_corrupted_slot_fails_validation() {
let key = [1u8; 8];
let value = [2u8; 32];
let ss = slot_size(8, 32);
let mut buf = vec![0u8; ss];
serialize_slot(&mut buf, &key, &value);
// Corrupt one byte of the key region (buf[8..16])
buf[10] ^= 0xFF;
assert!(validate_slot(&buf, 8, 32).is_none());
}
#[test]
fn test_free_slot_not_valid() {
let buf = vec![0u8; 48];
assert!(validate_slot(&buf, 8, 32).is_none());
}
#[test]
fn test_deleted_slot() {
let mut buf = vec![0u8; 48];
serialize_deleted_slot(&mut buf);
assert_eq!(slot_status(&buf), SLOT_DELETED);
assert!(validate_slot(&buf, 8, 32).is_none());
}
}
```
- [ ] **Step 2: Add slot module to fixed/mod.rs**
```rust
pub mod bitmap;
pub mod config;
pub mod slot;
pub use config::FixedConfig;
```
- [ ] **Step 3: Verify**
Run: `cargo nextest run -p armdb -E 'test(slot)'`
Expected: all slot tests pass.
- [ ] **Step 4: Commit**
```bash
git add armdb/src/fixed/slot.rs armdb/src/fixed/mod.rs
git commit -m "feat(armdb): add slot format serialization for FixedStore"
```
---
### Task 5: FixedShard
Per-shard file I/O: open/create slot file, read/write slots, grow file, fdatasync batching.
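The shard's I/O below is positional I/O via `std::os::unix::fs::FileExt` plus `File::sync_data` (which maps to fdatasync on Linux). A minimal standalone round trip, assuming a Unix target (the temp-file name is arbitrary):

```rust
use std::fs::OpenOptions;
use std::os::unix::fs::FileExt; // write_all_at / read_exact_at wrap pwrite / pread

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("fixed_io_sketch.bin");
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;
    file.set_len(4096 + 2 * 48)?; // 4 KiB header region + two 48-byte slots
    file.write_all_at(&[0xAB; 48], 4096)?; // write slot 0 at its offset, no seek
    file.sync_data()?; // fdatasync(2): flush file data, skip most metadata
    let mut buf = [0u8; 48];
    file.read_exact_at(&mut buf, 4096)?; // read slot 0 back
    assert_eq!(buf, [0xAB; 48]);
    std::fs::remove_file(&path)
}
```

Unlike `write`/`read`, the `*_at` calls take an explicit offset and never touch the shared file cursor, which is what makes concurrent per-shard I/O safe without seeking.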
**Files:**
- Create: `armdb/src/fixed/shard.rs`
- Modify: `armdb/src/fixed/mod.rs`
- [ ] **Step 1: Write FixedShard**
```rust
use std::fs::{self, File, OpenOptions};
use std::path::{Path, PathBuf};
use std::time::Instant;
use crate::error::{DbError, DbResult};
use super::bitmap::Bitmap;
use super::config::FixedConfig;
use super::slot::{self, SLOT_HEADER_SIZE};
const HEADER_SIZE: u64 = 4096;
const MAGIC: &[u8; 4] = b"FIXD";
const VERSION: u16 = 1;
/// File header layout (first 4096 bytes). Documents the on-disk layout only:
/// write_header/read_and_validate_header access the bytes field-by-field, so
/// this struct is never instantiated.
#[allow(dead_code)]
#[repr(C)]
struct FileHeader {
magic: [u8; 4],
version: u16,
slot_size: u16,
slot_count: u32,
key_len: u16,
value_len: u16,
shard_id: u8,
clean_shutdown: u8,
_reserved: [u8; 4078],
}
pub(crate) struct FixedShardInner {
file: File,
dir: PathBuf,
pub(crate) bitmap: Bitmap,
pub(crate) slot_size: u16,
pub(crate) slot_count: u32,
key_len: u16,
value_len: u16,
shard_id: u8,
grow_step: u32,
// fdatasync batching
pending_writes: u32,
sync_batch_size: u32,
last_sync: Instant,
sync_interval: std::time::Duration,
enable_fsync: bool,
}
impl FixedShardInner {
/// Open or create a shard's fixed.data file.
pub fn open(
dir: &Path,
shard_id: u8,
key_len: usize,
value_len: usize,
config: &FixedConfig,
) -> DbResult<Self> {
fs::create_dir_all(dir)?;
let data_path = dir.join("fixed.data");
let ss = slot::slot_size(key_len, value_len) as u16;
let (file, slot_count) = if data_path.exists() {
let mut file = OpenOptions::new().read(true).write(true).open(&data_path)?;
let slot_count = Self::read_and_validate_header(&mut file, ss, key_len as u16, value_len as u16, shard_id)?;
(file, slot_count)
} else {
let mut file = OpenOptions::new()
.read(true).write(true).create(true)
.open(&data_path)?;
let initial_slots = config.grow_step;
Self::write_header(&mut file, ss, initial_slots, key_len as u16, value_len as u16, shard_id)?;
let total_size = HEADER_SIZE + initial_slots as u64 * ss as u64;
file.set_len(total_size)?;
(file, initial_slots)
};
let bitmap = Bitmap::new(slot_count);
Ok(Self {
file,
dir: dir.to_path_buf(),
bitmap,
slot_size: ss,
slot_count,
key_len: key_len as u16,
value_len: value_len as u16,
shard_id,
grow_step: config.grow_step,
pending_writes: 0,
sync_batch_size: config.sync_batch_size,
last_sync: Instant::now(),
sync_interval: config.sync_interval,
enable_fsync: config.enable_fsync,
})
}
fn slot_offset(&self, slot_id: u32) -> u64 {
HEADER_SIZE + slot_id as u64 * self.slot_size as u64
}
/// Write a slot to disk via pwrite.
pub fn write_slot(&mut self, slot_id: u32, key: &[u8], value: &[u8]) -> DbResult<()> {
let mut buf = vec![0u8; self.slot_size as usize];
slot::serialize_slot(&mut buf, key, value);
let offset = self.slot_offset(slot_id);
self.pwrite(&buf, offset)?;
self.pending_writes += 1;
if self.enable_fsync {
self.fdatasync()?;
}
Ok(())
}
/// Write a deleted marker to a slot.
pub fn delete_slot(&mut self, slot_id: u32) -> DbResult<()> {
let mut buf = vec![0u8; self.slot_size as usize];
slot::serialize_deleted_slot(&mut buf);
let offset = self.slot_offset(slot_id);
self.pwrite(&buf, offset)?;
self.pending_writes += 1;
if self.enable_fsync {
self.fdatasync()?;
}
Ok(())
}
/// Read a slot from disk.
pub fn read_slot(&self, slot_id: u32) -> DbResult<Vec<u8>> {
let mut buf = vec![0u8; self.slot_size as usize];
let offset = self.slot_offset(slot_id);
self.pread(&mut buf, offset)?;
Ok(buf)
}
/// Grow the file by grow_step slots.
pub fn grow(&mut self) -> DbResult<()> {
let new_count = self.slot_count + self.grow_step;
let new_size = HEADER_SIZE + new_count as u64 * self.slot_size as u64;
self.file.set_len(new_size)?;
self.bitmap.grow(new_count);
self.slot_count = new_count;
// Update slot_count in header
let count_bytes = new_count.to_le_bytes();
self.pwrite(&count_bytes, 8)?; // offset of slot_count in header
self.fdatasync()?;
Ok(())
}
/// Allocate a slot. Grows file if needed.
pub fn alloc_slot(&mut self) -> DbResult<u32> {
match self.bitmap.alloc() {
Ok(id) => Ok(id),
Err(DbError::SlotsFull) => {
self.grow()?;
self.bitmap.alloc()
}
Err(e) => Err(e),
}
}
/// Check if fdatasync should be triggered. Call after releasing shard mutex.
pub fn should_sync(&self) -> bool {
self.pending_writes >= self.sync_batch_size
|| self.last_sync.elapsed() >= self.sync_interval
}
/// Perform fdatasync and reset counters.
pub fn sync(&mut self) -> DbResult<()> {
if self.pending_writes > 0 {
self.fdatasync()?;
self.pending_writes = 0;
self.last_sync = Instant::now();
}
Ok(())
}
/// Write bitmap sidecar and set clean_shutdown flag.
pub fn clean_shutdown(&mut self) -> DbResult<()> {
self.fdatasync()?;
// Write bitmap sidecar
let bitmap_path = self.dir.join("fixed.bitmap");
fs::write(&bitmap_path, self.bitmap.as_bytes())?;
// Set clean_shutdown = 1 in header
self.pwrite(&[1u8], 17)?; // clean_shutdown byte (offset 16 holds shard_id)
self.fdatasync()?;
Ok(())
}
/// Clear clean_shutdown flag (called at open before recovery).
pub fn clear_clean_shutdown(&mut self) -> DbResult<()> {
self.pwrite(&[0u8], 17)?; // clean_shutdown byte
self.fdatasync()?;
Ok(())
}
/// Check the clean_shutdown flag in the header and that the bitmap sidecar exists.
pub fn has_clean_shutdown(&self) -> bool {
// Read clean_shutdown byte from header (offset 17; offset 16 holds shard_id)
let mut buf = [0u8; 1];
if self.pread(&mut buf, 17).is_ok() && buf[0] == 1 {
return self.dir.join("fixed.bitmap").exists();
}
false
}
/// Load bitmap from sidecar file.
pub fn load_bitmap_sidecar(&mut self) -> DbResult<()> {
let bitmap_path = self.dir.join("fixed.bitmap");
let data = fs::read(&bitmap_path)?;
self.bitmap = Bitmap::from_bytes(&data, self.slot_count);
Ok(())
}
pub fn slot_count(&self) -> u32 { self.slot_count }
pub fn key_len(&self) -> usize { self.key_len as usize }
pub fn value_len(&self) -> usize { self.value_len as usize }
// -- Private I/O helpers --
fn pwrite(&self, buf: &[u8], offset: u64) -> DbResult<()> {
use std::os::unix::fs::FileExt;
self.file.write_at(buf, offset)?;
Ok(())
}
fn pread(&self, buf: &mut [u8], offset: u64) -> DbResult<()> {
use std::os::unix::fs::FileExt;
self.file.read_at(buf, offset)?;
Ok(())
}
fn fdatasync(&self) -> DbResult<()> {
self.file.sync_data()?;
Ok(())
}
fn write_header(
file: &mut File,
slot_size: u16,
slot_count: u32,
key_len: u16,
value_len: u16,
shard_id: u8,
) -> DbResult<()> {
let mut header = [0u8; HEADER_SIZE as usize];
header[0..4].copy_from_slice(MAGIC);
header[4..6].copy_from_slice(&VERSION.to_le_bytes());
header[6..8].copy_from_slice(&slot_size.to_le_bytes());
header[8..12].copy_from_slice(&slot_count.to_le_bytes());
header[12..14].copy_from_slice(&key_len.to_le_bytes());
header[14..16].copy_from_slice(&value_len.to_le_bytes());
header[16] = shard_id;
header[17] = 0; // clean_shutdown
use std::os::unix::fs::FileExt;
file.write_at(&header, 0)?;
Ok(())
}
fn read_and_validate_header(
file: &mut File,
expected_slot_size: u16,
expected_key_len: u16,
expected_value_len: u16,
expected_shard_id: u8,
) -> DbResult<u32> {
let mut header = [0u8; HEADER_SIZE as usize];
use std::os::unix::fs::FileExt;
file.read_at(&mut header, 0)?;
if &header[0..4] != MAGIC {
return Err(DbError::Corruption("invalid magic in fixed.data header"));
}
let version = u16::from_le_bytes([header[4], header[5]]);
if version != VERSION {
return Err(DbError::Corruption("unsupported fixed.data version"));
}
let slot_size = u16::from_le_bytes([header[6], header[7]]);
if slot_size != expected_slot_size {
return Err(DbError::Corruption("slot_size mismatch in fixed.data"));
}
let slot_count = u32::from_le_bytes([header[8], header[9], header[10], header[11]]);
let key_len = u16::from_le_bytes([header[12], header[13]]);
let value_len = u16::from_le_bytes([header[14], header[15]]);
if key_len != expected_key_len || value_len != expected_value_len {
return Err(DbError::Corruption("key_len/value_len mismatch"));
}
let shard_id = header[16];
if shard_id != expected_shard_id {
return Err(DbError::Corruption("shard_id mismatch"));
}
Ok(slot_count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_create_and_reopen() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_000");
let config = FixedConfig::test();
// Create
{
let inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
assert_eq!(inner.slot_count(), config.grow_step);
assert_eq!(inner.key_len(), 8);
assert_eq!(inner.value_len(), 32);
}
// Reopen
{
let inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
assert_eq!(inner.slot_count(), config.grow_step);
}
}
#[test]
fn test_write_read_slot() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_000");
let config = FixedConfig::test();
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
let slot_id = inner.alloc_slot().unwrap();
let key = [1u8, 2, 3, 4, 5, 6, 7, 8];
let value = [42u8; 32];
inner.write_slot(slot_id, &key, &value).unwrap();
let buf = inner.read_slot(slot_id).unwrap();
let (k, v) = slot::validate_slot(&buf, 8, 32).expect("valid");
assert_eq!(k, &key);
assert_eq!(v, &value);
}
#[test]
fn test_grow() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_000");
let config = FixedConfig { grow_step: 4, ..FixedConfig::test() };
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 8, &config).unwrap();
// Fill all 4 slots
for _ in 0..4 { inner.alloc_slot().unwrap(); }
assert!(inner.bitmap.is_full());
// alloc_slot triggers grow
let slot = inner.alloc_slot().unwrap();
assert_eq!(slot, 4);
assert_eq!(inner.slot_count(), 8);
}
#[test]
fn test_clean_shutdown_and_reopen() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_000");
let config = FixedConfig::test();
{
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
let s = inner.alloc_slot().unwrap();
inner.write_slot(s, &[1u8; 8], &[2u8; 32]).unwrap();
inner.clean_shutdown().unwrap();
}
{
let inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
assert!(inner.has_clean_shutdown());
}
}
}
```
**Important:** Header byte offsets: magic[0..4], version[4..6], slot_size[6..8], slot_count[8..12], key_len[12..14], value_len[14..16], shard_id[16], clean_shutdown[17]. The pwrite/pread offset for the `clean_shutdown` byte in `clean_shutdown()`, `clear_clean_shutdown()`, and `has_clean_shutdown()` must be **17**, since offset 16 holds `shard_id`.
**Missing from v1 (add later):** `cas()` method (compare-and-swap) and `iter()` (ordered iteration for FixedTree). The spec lists both in the public API. Basic put/get/delete are sufficient for first implementation.
- [ ] **Step 2: Add shard module to fixed/mod.rs**
```rust
pub mod bitmap;
pub mod config;
pub mod slot;
pub(crate) mod shard;
pub use config::FixedConfig;
```
- [ ] **Step 3: Verify**
Run: `cargo nextest run -p armdb -E 'test(fixed)'`
Expected: all tests pass.
- [ ] **Step 4: Commit**
```bash
git add armdb/src/fixed/shard.rs armdb/src/fixed/mod.rs
git commit -m "feat(armdb): add FixedShard with slot file I/O and fdatasync batching"
```
---
### Task 6: FixedEngine
Manages multiple shards, db.meta, open/close lifecycle.
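The 16-byte `db.meta` record written and checked below can be sketched standalone (field positions and constants taken from `write_meta`/`validate_meta` in this task):

```rust
// db.meta layout: [magic "ARMD":4][version:1][backend:1][shard_count:1]
//                 [shard_prefix_bits:1][flags:1][reserved:7] = 16 bytes
fn write_meta(shard_count: u8, prefix_bits: u8) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[0..4].copy_from_slice(b"ARMD");
    buf[4] = 2; // META_VERSION
    buf[5] = 1; // META_BACKEND_FIXED
    buf[6] = shard_count;
    buf[7] = prefix_bits;
    buf // flags byte and reserved tail stay zero
}

fn validate_meta(buf: &[u8; 16], expected_shards: u8) -> Result<(), &'static str> {
    if &buf[0..4] != b"ARMD" {
        return Err("invalid magic");
    }
    if buf[4] != 2 {
        return Err("version mismatch");
    }
    if buf[5] != 1 {
        return Err("backend is not FixedStore");
    }
    if buf[6] != expected_shards {
        return Err("shard_count mismatch");
    }
    Ok(())
}

fn main() {
    let meta = write_meta(8, 0);
    assert!(validate_meta(&meta, 8).is_ok());
    assert!(validate_meta(&meta, 4).is_err()); // reopening with a different shard_count fails
}
```

Rejecting a shard_count mismatch at open time is what makes `shard_count` effectively immutable after creation, as `FixedConfig` documents.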
**Files:**
- Create: `armdb/src/fixed/engine.rs`
- Modify: `armdb/src/fixed/mod.rs`
- [ ] **Step 1: Write FixedEngine**
```rust
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use parking_lot::Mutex;
use crate::error::{DbError, DbResult};
use super::config::FixedConfig;
use super::shard::FixedShardInner;
const META_MAGIC: &[u8; 4] = b"ARMD";
const META_VERSION: u8 = 2;
const META_BACKEND_FIXED: u8 = 1;
const META_SIZE: usize = 16;
pub(crate) struct FixedShard {
pub id: u8,
pub inner: Mutex<FixedShardInner>,
}
pub(crate) struct FixedEngine {
path: PathBuf,
shards: Arc<Vec<FixedShard>>,
config: FixedConfig,
}
impl FixedEngine {
pub fn open(
path: impl AsRef<Path>,
config: FixedConfig,
key_len: usize,
value_len: usize,
) -> DbResult<Self> {
config.validate()?;
let path = path.as_ref().to_path_buf();
fs::create_dir_all(&path)?;
let meta_path = path.join("db.meta");
if meta_path.exists() {
Self::validate_meta(&meta_path, &config)?;
} else {
Self::write_meta(&meta_path, &config)?;
}
let mut shards = Vec::with_capacity(config.shard_count);
for i in 0..config.shard_count {
let shard_dir = path.join(format!("shard_{i:03}"));
let inner = FixedShardInner::open(&shard_dir, i as u8, key_len, value_len, &config)?;
shards.push(FixedShard {
id: i as u8,
inner: Mutex::new(inner),
});
}
Ok(Self {
path,
shards: Arc::new(shards),
config,
})
}
pub fn shards(&self) -> &Arc<Vec<FixedShard>> { &self.shards }
pub fn config(&self) -> &FixedConfig { &self.config }
pub fn path(&self) -> &Path { &self.path }
pub fn shard_dirs(&self) -> Vec<PathBuf> {
(0..self.config.shard_count)
.map(|i| self.path.join(format!("shard_{i:03}")))
.collect()
}
/// Flush all pending writes and fdatasync all shards.
pub fn flush(&self) -> DbResult<()> {
for shard in self.shards.iter() {
shard.inner.lock().sync()?;
}
Ok(())
}
/// Clean shutdown: flush + write bitmap sidecar + set clean_shutdown flag.
pub fn close(&self) -> DbResult<()> {
for shard in self.shards.iter() {
shard.inner.lock().clean_shutdown()?;
}
Ok(())
}
fn write_meta(meta_path: &Path, config: &FixedConfig) -> DbResult<()> {
let mut buf = [0u8; META_SIZE];
buf[0..4].copy_from_slice(META_MAGIC);
buf[4] = META_VERSION;
buf[5] = META_BACKEND_FIXED;
buf[6] = config.shard_count as u8;
buf[7] = config.shard_prefix_bits as u8;
buf[8] = 0; // flags (no encryption)
// bytes 9-15 reserved
fs::write(meta_path, &buf)?;
Ok(())
}
fn validate_meta(meta_path: &Path, config: &FixedConfig) -> DbResult<()> {
let buf = fs::read(meta_path)?;
if buf.len() < META_SIZE {
return Err(DbError::Corruption("db.meta too short"));
}
if &buf[0..4] != META_MAGIC {
return Err(DbError::Corruption("db.meta invalid magic"));
}
if buf[4] != META_VERSION {
return Err(DbError::Corruption("db.meta version mismatch"));
}
if buf[5] != META_BACKEND_FIXED {
return Err(DbError::Corruption("db.meta backend is not FixedStore"));
}
let stored_shards = buf[6] as usize;
if stored_shards != config.shard_count {
return Err(DbError::Corruption("shard_count mismatch with db.meta"));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_engine_open_creates_shards() {
let dir = tempdir().unwrap();
let config = FixedConfig { shard_count: 3, ..FixedConfig::test() };
let engine = FixedEngine::open(dir.path(), config, 8, 32).unwrap();
assert_eq!(engine.shards().len(), 3);
assert!(dir.path().join("db.meta").exists());
assert!(dir.path().join("shard_000/fixed.data").exists());
assert!(dir.path().join("shard_001/fixed.data").exists());
assert!(dir.path().join("shard_002/fixed.data").exists());
}
#[test]
fn test_engine_reopen() {
let dir = tempdir().unwrap();
let config = FixedConfig { shard_count: 2, ..FixedConfig::test() };
FixedEngine::open(dir.path(), config.clone(), 8, 32).unwrap();
// Reopen
let engine = FixedEngine::open(dir.path(), config, 8, 32).unwrap();
assert_eq!(engine.shards().len(), 2);
}
#[test]
fn test_engine_shard_count_mismatch() {
let dir = tempdir().unwrap();
let config = FixedConfig { shard_count: 2, ..FixedConfig::test() };
FixedEngine::open(dir.path(), config, 8, 32).unwrap();
let bad_config = FixedConfig { shard_count: 4, ..FixedConfig::test() };
assert!(FixedEngine::open(dir.path(), bad_config, 8, 32).is_err());
}
}
```
- [ ] **Step 2: Add engine module to fixed/mod.rs**
```rust
pub mod bitmap;
pub mod config;
pub(crate) mod engine;
pub mod slot;
pub(crate) mod shard;
pub use config::FixedConfig;
```
- [ ] **Step 3: Verify**
Run: `cargo nextest run -p armdb -E 'test(fixed)'`
Expected: all tests pass.
- [ ] **Step 4: Commit**
```bash
git add armdb/src/fixed/engine.rs armdb/src/fixed/mod.rs
git commit -m "feat(armdb): add FixedEngine with shard management and db.meta"
```
---
### Task 7: Recovery
Scan slots on disk to rebuild the in-memory index and bitmap, with a clean-shutdown fast path that uses the bitmap sidecar to skip free slots.
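Recovery has two paths: a full scan after a dirty shutdown, and a sidecar-driven fast path after a clean one. A toy in-memory model of both (a sketch only; `Slot`, `OCCUPIED`, and `crc_ok` are stand-ins for the real slot format from Task 4):

```rust
const OCCUPIED: u8 = 1;

/// Stand-in for an on-disk slot: status byte plus a precomputed CRC verdict.
struct Slot {
    status: u8,
    crc_ok: bool,
}

/// Slow path (dirty shutdown): scan every slot; index only those that are
/// occupied AND pass CRC. Everything else stays free in the bitmap.
fn scan_recover(slots: &[Slot]) -> (u32, Vec<bool>) {
    let mut bitmap = vec![false; slots.len()];
    let mut recovered = 0u32;
    for (i, s) in slots.iter().enumerate() {
        if s.status == OCCUPIED && s.crc_ok {
            bitmap[i] = true;
            recovered += 1;
        }
        // torn write (bad CRC) or free slot: bitmap stays false, slot reusable
    }
    (recovered, bitmap)
}

/// Fast path (clean shutdown): trust the sidecar bitmap to skip free slots,
/// but still verify CRC on slots it claims are occupied, clearing any that fail.
fn clean_recover(slots: &[Slot], sidecar: &mut Vec<bool>) -> u32 {
    let mut recovered = 0u32;
    for i in 0..slots.len() {
        if !sidecar[i] {
            continue; // free slot: no read needed
        }
        if slots[i].crc_ok {
            recovered += 1;
        } else {
            sidecar[i] = false; // corrupt despite sidecar: reclaim the slot
        }
    }
    recovered
}
```

The real module below does the same thing against `FixedShardInner` and the SkipList index.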
**Files:**
- Create: `armdb/src/fixed/recovery.rs`
- Modify: `armdb/src/fixed/mod.rs`
- [ ] **Step 1: Write recovery module**
```rust
use std::mem::size_of;
use crate::error::DbResult;
use crate::key::Key;
use crate::skiplist::node::ConstNode;
use crate::skiplist::SkipList;
use super::shard::FixedShardInner;
use super::slot;
/// Recover a single shard: scan slots, insert valid entries into index, rebuild bitmap.
pub(crate) fn recover_shard<K: Key, const V: usize>(
inner: &mut FixedShardInner,
index: &SkipList<ConstNode<K, V, u32>>,
) -> DbResult<u32> {
let key_len = inner.key_len();
let value_len = inner.value_len();
let slot_count = inner.slot_count();
let mut recovered = 0u32;
if inner.has_clean_shutdown() {
// Fast path: load bitmap from sidecar, verify occupied slots
inner.load_bitmap_sidecar()?;
for slot_id in 0..slot_count {
if !inner.bitmap.is_set(slot_id) {
continue;
}
let buf = inner.read_slot(slot_id)?;
if let Some((key_bytes, value_bytes)) = slot::validate_slot(&buf, key_len, value_len) {
insert_into_index::<K, V>(index, key_bytes, value_bytes, slot_id);
recovered += 1;
} else {
// CRC mismatch despite bitmap saying occupied — clear bitmap
inner.bitmap.clear(slot_id);
}
}
} else {
// Slow path: sequential scan of all slots
for slot_id in 0..slot_count {
let buf = inner.read_slot(slot_id)?;
let status = slot::slot_status(&buf);
if status != slot::SLOT_OCCUPIED {
continue;
}
if let Some((key_bytes, value_bytes)) = slot::validate_slot(&buf, key_len, value_len) {
inner.bitmap.set(slot_id);
insert_into_index::<K, V>(index, key_bytes, value_bytes, slot_id);
recovered += 1;
}
// Invalid CRC → skip (bitmap stays 0 = free)
}
}
// Mark dirty so a crash before the next clean close forces a full scan.
inner.clear_clean_shutdown()?;
Ok(recovered)
}
fn insert_into_index<K: Key, const V: usize>(
index: &SkipList<ConstNode<K, V, u32>>,
key_bytes: &[u8],
value_bytes: &[u8],
slot_id: u32,
) {
debug_assert_eq!(key_bytes.len(), size_of::<K>());
debug_assert_eq!(value_bytes.len(), V);
let key = K::from_bytes(key_bytes);
let mut value = [0u8; V];
value.copy_from_slice(value_bytes);
let height = crate::skiplist::random_height();
let node = ConstNode::<K, V, u32>::alloc(key, value, slot_id, height);
let guard = index.collector().enter();
match index.insert(node, &guard) {
crate::skiplist::InsertResult::Inserted => {}
crate::skiplist::InsertResult::Exists(existing) => {
// Duplicate key in the slot file; shouldn't happen in normal operation.
// Later slot wins: point the existing node at this slot and free the
// unused allocation.
existing.write_data(slot_id, &value);
unsafe { ConstNode::<K, V, u32>::dealloc_node(node); }
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::config::FixedConfig;
use tempfile::tempdir;
#[test]
fn test_dirty_recovery() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_000");
let config = FixedConfig::test();
// Write some slots
{
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
for i in 0u64..5 {
let slot_id = inner.alloc_slot().unwrap();
inner.write_slot(slot_id, &i.to_be_bytes(), &[i as u8; 32]).unwrap();
}
inner.sync().unwrap();
// No clean shutdown — dirty recovery
}
// Recovery
{
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
let index = SkipList::<ConstNode<[u8; 8], 32, u32>>::new(false);
let recovered = recover_shard::<[u8; 8], 32>(&mut inner, &index).unwrap();
assert_eq!(recovered, 5);
// Verify all keys accessible
let guard = index.collector().enter();
for i in 0u64..5 {
let node = index.get(&i.to_be_bytes(), &guard).expect("key must exist");
assert_eq!(node.read_value(), [i as u8; 32]);
}
}
}
#[test]
fn test_clean_recovery() {
let dir = tempdir().unwrap();
let shard_dir = dir.path().join("shard_000");
let config = FixedConfig::test();
{
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
for i in 0u64..3 {
let slot_id = inner.alloc_slot().unwrap();
inner.write_slot(slot_id, &i.to_be_bytes(), &[i as u8; 32]).unwrap();
inner.bitmap.set(slot_id);
}
inner.clean_shutdown().unwrap();
}
{
let mut inner = FixedShardInner::open(&shard_dir, 0, 8, 32, &config).unwrap();
assert!(inner.has_clean_shutdown());
let index = SkipList::<ConstNode<[u8; 8], 32, u32>>::new(false);
let recovered = recover_shard::<[u8; 8], 32>(&mut inner, &index).unwrap();
assert_eq!(recovered, 3);
}
}
}
```
- [ ] **Step 2: Add recovery module to fixed/mod.rs**
```rust
pub mod bitmap;
pub mod config;
pub(crate) mod engine;
pub(crate) mod recovery;
pub mod slot;
pub(crate) mod shard;
pub use config::FixedConfig;
```
- [ ] **Step 3: Verify**
Run: `cargo nextest run -p armdb -E 'test(recovery) & test(fixed)'`
Expected: recovery tests pass.
- [ ] **Step 4: Commit**
```bash
git add armdb/src/fixed/recovery.rs armdb/src/fixed/mod.rs
git commit -m "feat(armdb): add FixedStore recovery with clean shutdown optimization"
```
---
### Task 8: FixedTree
Main public API — ordered SkipList-based collection with FixedStore durability.
**Files:**
- Create: `armdb/src/fixed/tree.rs`
- Modify: `armdb/src/fixed/mod.rs`
- Modify: `armdb/src/lib.rs`
- Create: `armdb/tests/fixed_tree_tests.rs`
- [ ] **Step 1: Write FixedTree struct and open()**
```rust
use std::mem::size_of;
use std::path::Path;
use crate::error::DbResult;
use crate::hook::{NoHook, WriteHook};
use crate::key::Key;
use crate::skiplist::node::ConstNode;
use crate::skiplist::SkipList;
use super::config::FixedConfig;
use super::engine::FixedEngine;
use super::recovery;
pub struct FixedTree<K: Key, const V: usize, H: WriteHook<K> = NoHook> {
index: SkipList<ConstNode<K, V, u32>>,
engine: FixedEngine,
shard_prefix_bits: usize,
reversed: bool,
hook: H,
}
impl<K: Key, const V: usize> FixedTree<K, V> {
pub fn open(path: impl AsRef<Path>, config: FixedConfig) -> DbResult<Self> {
Self::open_with_hook(path, config, NoHook)
}
}
impl<K: Key, const V: usize, H: WriteHook<K>> FixedTree<K, V, H> {
pub fn open_with_hook(path: impl AsRef<Path>, config: FixedConfig, hook: H) -> DbResult<Self> {
let reversed = false; // FixedTree default: natural order
let shard_prefix_bits = config.shard_prefix_bits;
let engine = FixedEngine::open(&path, config, size_of::<K>(), V)?;
let index = SkipList::<ConstNode<K, V, u32>>::new(reversed);
// Recovery: rebuild index from disk
for shard in engine.shards().iter() {
let mut inner = shard.inner.lock();
recovery::recover_shard::<K, V>(&mut inner, &index)?;
}
Ok(Self { index, engine, shard_prefix_bits, reversed, hook })
}
fn shard_for(&self, key: &K) -> usize {
let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
(hash as usize) % self.engine.shards().len()
}
pub fn get(&self, key: &K) -> Option<[u8; V]> {
let guard = self.index.collector().enter();
self.index.get(key.as_bytes(), &guard).map(|node| node.read_value())
}
pub fn contains(&self, key: &K) -> bool {
let guard = self.index.collector().enter();
self.index.get(key.as_bytes(), &guard).is_some()
}
pub fn put(&self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
let shard_id = self.shard_for(key);
let shards = self.engine.shards();
let mut inner = shards[shard_id].inner.lock();
let guard = self.index.collector().enter();
// Check if key exists
if let Some(existing) = self.index.get(key.as_bytes(), &guard) {
let old_value = existing.read_value();
let (slot_id, _) = existing.read_data();
// Overwrite slot on disk
inner.write_slot(slot_id, key.as_bytes(), value)?;
// Update index inline (SeqLock)
existing.write_data(slot_id, value);
let need_sync = inner.should_sync();
drop(inner);
if need_sync {
shards[shard_id].inner.lock().sync()?;
}
self.hook.on_write(key, Some(&old_value[..]), Some(&value[..]));
return Ok(Some(old_value));
}
// New key: allocate slot
let slot_id = inner.alloc_slot()?;
inner.write_slot(slot_id, key.as_bytes(), value)?;
// Insert into index
let height = crate::skiplist::random_height();
let node = ConstNode::<K, V, u32>::alloc(*key, *value, slot_id, height);
match self.index.insert(node, &guard) {
crate::skiplist::InsertResult::Inserted => {}
crate::skiplist::InsertResult::Exists(existing) => {
// Defensive: the shard mutex serializes same-key puts, so this path
// shouldn't be reachable. Handle it anyway: update the existing node,
// free its now-orphaned old slot, and drop the unused allocation.
let old_value = existing.read_value();
let (old_slot, _) = existing.read_data();
existing.write_data(slot_id, value);
inner.delete_slot(old_slot)?;
inner.bitmap.clear(old_slot);
unsafe { ConstNode::<K, V, u32>::dealloc_node(node); }
let need_sync = inner.should_sync();
drop(inner);
if need_sync {
shards[shard_id].inner.lock().sync()?;
}
self.hook.on_write(key, Some(&old_value[..]), Some(&value[..]));
return Ok(Some(old_value));
}
}
let need_sync = inner.should_sync();
drop(inner);
if need_sync {
shards[shard_id].inner.lock().sync()?;
}
self.hook.on_write(key, None, Some(&value[..]));
Ok(None)
}
pub fn delete(&self, key: &K) -> DbResult<Option<[u8; V]>> {
let shard_id = self.shard_for(key);
let shards = self.engine.shards();
let mut inner = shards[shard_id].inner.lock();
let guard = self.index.collector().enter();
if let Some(node) = self.index.get(key.as_bytes(), &guard) {
let old_value = node.read_value();
let (slot_id, _) = node.read_data();
// Mark slot as deleted on disk
inner.delete_slot(slot_id)?;
inner.bitmap.clear(slot_id);
// Remove from index; the guard we already hold keeps the node pinned.
self.index.remove(key.as_bytes(), &guard);
let need_sync = inner.should_sync();
drop(inner);
if need_sync {
shards[shard_id].inner.lock().sync()?;
}
self.hook.on_write(key, Some(&old_value[..]), None);
return Ok(Some(old_value));
}
Ok(None)
}
/// Flush all pending writes to disk.
pub fn flush(&self) -> DbResult<()> {
self.engine.flush()
}
/// Clean shutdown with bitmap sidecar.
pub fn close(&self) -> DbResult<()> {
self.engine.close()
}
pub fn len(&self) -> usize {
self.index.len()
}
pub fn is_empty(&self) -> bool {
self.index.is_empty()
}
}
```
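The `put`/`delete` paths above all share one durability pattern: check `should_sync()` under the shard lock, drop the lock, then re-lock only to `fdatasync`, so other writers can queue work in between and be covered by the same sync. A minimal single-file sketch of that pattern (the `pending`/`sync_every`/`syncs` fields are hypothetical stand-ins for `FixedShardInner`'s real batching state):

```rust
use std::sync::Mutex;

struct ShardInner {
    pending: usize,    // writes since the last sync (hypothetical field)
    sync_every: usize, // batch threshold (hypothetical; real config differs)
    syncs: usize,      // counts sync() calls, standing in for fdatasync
}

impl ShardInner {
    fn write(&mut self) {
        self.pending += 1;
    }
    fn should_sync(&self) -> bool {
        self.pending >= self.sync_every
    }
    fn sync(&mut self) {
        if self.pending > 0 {
            self.syncs += 1; // one fdatasync covers every pending write
            self.pending = 0;
        }
    }
}

fn put(shard: &Mutex<ShardInner>) {
    let mut inner = shard.lock().unwrap();
    inner.write();
    let need_sync = inner.should_sync();
    // Drop the lock before syncing so other writers can slip in; any
    // writes they queue are covered by the sync below.
    drop(inner);
    if need_sync {
        shard.lock().unwrap().sync();
    }
}
```

Ten writes with `sync_every: 4` trigger two syncs (after writes 4 and 8) and leave two writes pending until the next `flush()`.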
- [ ] **Step 2: Add tree module to fixed/mod.rs and lib.rs**
In `armdb/src/fixed/mod.rs`:
```rust
pub mod bitmap;
pub mod config;
pub(crate) mod engine;
pub(crate) mod recovery;
pub mod slot;
pub(crate) mod shard;
pub mod tree;
pub use config::FixedConfig;
pub use tree::FixedTree;
```
In `armdb/src/lib.rs`, add to exports:
```rust
pub use fixed::{FixedConfig, FixedTree};
```
- [ ] **Step 3: Write integration tests**
Create `armdb/tests/fixed_tree_tests.rs`:
```rust
use armdb::{FixedConfig, FixedTree};
use tempfile::tempdir;
#[test]
fn test_fixed_put_get() {
let dir = tempdir().unwrap();
let tree = FixedTree::<[u8; 8], 32>::open(dir.path(), FixedConfig::test()).unwrap();
for i in 0..100u64 {
let key = i.to_be_bytes();
let value = [i as u8; 32];
tree.put(&key, &value).unwrap();
}
for i in 0..100u64 {
let key = i.to_be_bytes();
let got = tree.get(&key).expect("key should exist");
assert_eq!(got, [i as u8; 32]);
}
assert_eq!(tree.len(), 100);
}
#[test]
fn test_fixed_overwrite() {
let dir = tempdir().unwrap();
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
let key = 1u64.to_be_bytes();
tree.put(&key, &100u64.to_be_bytes()).unwrap();
assert_eq!(tree.get(&key).unwrap(), 100u64.to_be_bytes());
let old = tree.put(&key, &200u64.to_be_bytes()).unwrap();
assert_eq!(old, Some(100u64.to_be_bytes()));
assert_eq!(tree.get(&key).unwrap(), 200u64.to_be_bytes());
assert_eq!(tree.len(), 1); // still 1 entry
}
#[test]
fn test_fixed_delete() {
let dir = tempdir().unwrap();
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
let key = 42u64.to_be_bytes();
tree.put(&key, &[1u8; 8]).unwrap();
assert!(tree.contains(&key));
let deleted = tree.delete(&key).unwrap();
assert_eq!(deleted, Some([1u8; 8]));
assert!(!tree.contains(&key));
assert!(tree.get(&key).is_none());
assert_eq!(tree.len(), 0);
}
#[test]
fn test_fixed_recovery() {
let dir = tempdir().unwrap();
// Write data, close without clean shutdown
{
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
for i in 0..50u64 {
tree.put(&i.to_be_bytes(), &(i * 10).to_be_bytes()).unwrap();
}
tree.flush().unwrap(); // ensure on disk, but no clean shutdown
}
// Reopen — should recover
{
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
assert_eq!(tree.len(), 50);
for i in 0..50u64 {
let got = tree.get(&i.to_be_bytes()).expect("key must survive recovery");
assert_eq!(got, (i * 10).to_be_bytes());
}
}
}
#[test]
fn test_fixed_clean_shutdown_recovery() {
let dir = tempdir().unwrap();
{
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
for i in 0..20u64 {
tree.put(&i.to_be_bytes(), &[i as u8; 8]).unwrap();
}
tree.close().unwrap(); // clean shutdown with bitmap sidecar
}
{
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
assert_eq!(tree.len(), 20);
}
}
#[test]
fn test_fixed_delete_and_reuse_slot() {
let dir = tempdir().unwrap();
let config = FixedConfig { grow_step: 10, ..FixedConfig::test() };
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), config).unwrap();
// Fill 10 slots
for i in 0..10u64 {
tree.put(&i.to_be_bytes(), &[i as u8; 8]).unwrap();
}
// Delete one
tree.delete(&5u64.to_be_bytes()).unwrap();
// Insert new key — should reuse freed slot (no grow needed)
tree.put(&99u64.to_be_bytes(), &[99u8; 8]).unwrap();
assert_eq!(tree.len(), 10);
assert_eq!(tree.get(&99u64.to_be_bytes()).unwrap(), [99u8; 8]);
}
```
- [ ] **Step 4: Verify**
Run: `cargo nextest run -p armdb -E 'test(fixed)'`
Expected: all FixedTree tests pass.
- [ ] **Step 5: Commit**
```bash
git add armdb/src/fixed/tree.rs armdb/src/fixed/mod.rs armdb/src/lib.rs armdb/tests/fixed_tree_tests.rs
git commit -m "feat(armdb): add FixedTree with put/get/delete and recovery"
```
---
### Task 9: FixedMap
HashMap-based collection for O(1) lookups (unordered).
**Files:**
- Create: `armdb/src/fixed/map.rs`
- Modify: `armdb/src/fixed/mod.rs`
- Modify: `armdb/src/lib.rs`
- Create: `armdb/tests/fixed_map_tests.rs`
- [ ] **Step 1: Write FixedMap**
```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::mem::size_of;
use std::path::Path;
use parking_lot::Mutex;
use crate::error::DbResult;
use crate::hook::{NoHook, WriteHook};
use crate::key::Key;
use super::config::FixedConfig;
use super::engine::FixedEngine;
use super::slot;
pub(crate) struct FixedMapEntry<const V: usize> {
pub slot_id: u32,
pub value: [u8; V],
}
pub struct FixedMap<K: Key + Hash + Eq, const V: usize, H: WriteHook<K> = NoHook> {
indexes: Vec<Mutex<HashMap<K, FixedMapEntry<V>>>>,
engine: FixedEngine,
shard_prefix_bits: usize,
hook: H,
}
impl<K: Key + Hash + Eq, const V: usize> FixedMap<K, V> {
pub fn open(path: impl AsRef<Path>, config: FixedConfig) -> DbResult<Self> {
Self::open_with_hook(path, config, NoHook)
}
}
impl<K: Key + Hash + Eq, const V: usize, H: WriteHook<K>> FixedMap<K, V, H> {
pub fn open_with_hook(path: impl AsRef<Path>, config: FixedConfig, hook: H) -> DbResult<Self> {
let shard_prefix_bits = config.shard_prefix_bits;
let shard_count = config.shard_count;
let engine = FixedEngine::open(&path, config, size_of::<K>(), V)?;
let mut indexes: Vec<Mutex<HashMap<K, FixedMapEntry<V>>>> = Vec::with_capacity(shard_count);
for _ in 0..shard_count {
indexes.push(Mutex::new(HashMap::new()));
}
// Recovery: mirrors recovery::recover_shard, but into a HashMap index.
for shard in engine.shards().iter() {
let mut inner = shard.inner.lock();
let shard_id = shard.id as usize;
let key_len = inner.key_len();
let value_len = inner.value_len();
let slot_count = inner.slot_count();
let has_clean = inner.has_clean_shutdown();
if has_clean {
inner.load_bitmap_sidecar()?;
}
let mut index = indexes[shard_id].lock();
for slot_id in 0..slot_count {
if has_clean && !inner.bitmap.is_set(slot_id) {
continue;
}
let buf = inner.read_slot(slot_id)?;
let status = slot::slot_status(&buf);
if status != slot::SLOT_OCCUPIED {
continue;
}
if let Some((key_bytes, value_bytes)) = slot::validate_slot(&buf, key_len, value_len) {
let key = K::from_bytes(key_bytes);
let mut value = [0u8; V];
value.copy_from_slice(value_bytes);
if !has_clean {
inner.bitmap.set(slot_id);
}
index.insert(key, FixedMapEntry { slot_id, value });
} else if has_clean {
inner.bitmap.clear(slot_id);
}
}
inner.clear_clean_shutdown()?;
}
Ok(Self { indexes, engine, shard_prefix_bits, hook })
}
fn shard_for(&self, key: &K) -> usize {
let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
(hash as usize) % self.engine.shards().len()
}
pub fn get(&self, key: &K) -> Option<[u8; V]> {
let shard_id = self.shard_for(key);
let index = self.indexes[shard_id].lock();
index.get(key).map(|e| e.value)
}
pub fn contains(&self, key: &K) -> bool {
let shard_id = self.shard_for(key);
let index = self.indexes[shard_id].lock();
index.contains_key(key)
}
pub fn put(&self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
let shard_id = self.shard_for(key);
let shards = self.engine.shards();
let mut inner = shards[shard_id].inner.lock();
let mut index = self.indexes[shard_id].lock();
if let Some(entry) = index.get_mut(key) {
let old_value = entry.value;
inner.write_slot(entry.slot_id, key.as_bytes(), value)?;
entry.value = *value;
let need_sync = inner.should_sync();
drop(inner);
drop(index);
if need_sync { shards[shard_id].inner.lock().sync()?; }
self.hook.on_write(key, Some(&old_value[..]), Some(&value[..]));
return Ok(Some(old_value));
}
let slot_id = inner.alloc_slot()?;
inner.write_slot(slot_id, key.as_bytes(), value)?;
index.insert(*key, FixedMapEntry { slot_id, value: *value });
let need_sync = inner.should_sync();
drop(inner);
drop(index);
if need_sync { shards[shard_id].inner.lock().sync()?; }
self.hook.on_write(key, None, Some(&value[..]));
Ok(None)
}
pub fn delete(&self, key: &K) -> DbResult<Option<[u8; V]>> {
let shard_id = self.shard_for(key);
let shards = self.engine.shards();
let mut inner = shards[shard_id].inner.lock();
let mut index = self.indexes[shard_id].lock();
if let Some(entry) = index.remove(key) {
inner.delete_slot(entry.slot_id)?;
inner.bitmap.clear(entry.slot_id);
let need_sync = inner.should_sync();
drop(inner);
drop(index);
if need_sync { shards[shard_id].inner.lock().sync()?; }
self.hook.on_write(key, Some(&entry.value[..]), None);
return Ok(Some(entry.value));
}
Ok(None)
}
pub fn flush(&self) -> DbResult<()> { self.engine.flush() }
pub fn close(&self) -> DbResult<()> { self.engine.close() }
pub fn len(&self) -> usize {
self.indexes.iter().map(|m| m.lock().len()).sum()
}
pub fn is_empty(&self) -> bool {
self.indexes.iter().all(|m| m.lock().is_empty())
}
}
```
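Both `FixedTree` and `FixedMap` route a key to a shard by hashing it and taking the result modulo the shard count. A toy sketch of that routing, substituting std's `DefaultHasher` for the `xxh3_64` the real code uses:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash-modulo shard routing (DefaultHasher is a stand-in for xxh3_64).
fn shard_for(key: &[u8], shard_count: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % shard_count
}
```

The essential property is that routing is deterministic, so the shard whose mutex guards a key's slot file is always the same one, and every result is in `0..shard_count`.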
- [ ] **Step 2: Add map module to fixed/mod.rs and lib.rs**
In `armdb/src/fixed/mod.rs`:
```rust
pub mod bitmap;
pub mod config;
pub(crate) mod engine;
pub mod map;
pub(crate) mod recovery;
pub mod slot;
pub(crate) mod shard;
pub mod tree;
pub use config::FixedConfig;
pub use map::FixedMap;
pub use tree::FixedTree;
```
In `armdb/src/lib.rs`, update exports:
```rust
pub use fixed::{FixedConfig, FixedMap, FixedTree};
```
- [ ] **Step 3: Write integration tests**
Create `armdb/tests/fixed_map_tests.rs`:
```rust
use armdb::{FixedConfig, FixedMap};
use tempfile::tempdir;
#[test]
fn test_fixed_map_put_get() {
let dir = tempdir().unwrap();
let map = FixedMap::<[u8; 8], 32>::open(dir.path(), FixedConfig::test()).unwrap();
for i in 0..100u64 {
let key = i.to_be_bytes();
let value = [i as u8; 32];
map.put(&key, &value).unwrap();
}
for i in 0..100u64 {
let key = i.to_be_bytes();
let got = map.get(&key).expect("key should exist");
assert_eq!(got, [i as u8; 32]);
}
assert_eq!(map.len(), 100);
}
#[test]
fn test_fixed_map_overwrite() {
let dir = tempdir().unwrap();
let map = FixedMap::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
let key = 1u64.to_be_bytes();
map.put(&key, &[10u8; 8]).unwrap();
let old = map.put(&key, &[20u8; 8]).unwrap();
assert_eq!(old, Some([10u8; 8]));
assert_eq!(map.get(&key).unwrap(), [20u8; 8]);
assert_eq!(map.len(), 1);
}
#[test]
fn test_fixed_map_delete() {
let dir = tempdir().unwrap();
let map = FixedMap::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
let key = 42u64.to_be_bytes();
map.put(&key, &[1u8; 8]).unwrap();
let deleted = map.delete(&key).unwrap();
assert_eq!(deleted, Some([1u8; 8]));
assert!(map.get(&key).is_none());
assert_eq!(map.len(), 0);
}
#[test]
fn test_fixed_map_recovery() {
let dir = tempdir().unwrap();
{
let map = FixedMap::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
for i in 0..30u64 {
map.put(&i.to_be_bytes(), &(i * 10).to_be_bytes()).unwrap();
}
map.flush().unwrap();
}
{
let map = FixedMap::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap();
assert_eq!(map.len(), 30);
for i in 0..30u64 {
assert_eq!(map.get(&i.to_be_bytes()).unwrap(), (i * 10).to_be_bytes());
}
}
}
```
- [ ] **Step 4: Verify**
Run: `cargo nextest run -p armdb -E 'test(fixed_map)'`
Expected: all FixedMap tests pass.
- [ ] **Step 5: Commit**
```bash
git add armdb/src/fixed/map.rs armdb/src/fixed/mod.rs armdb/src/lib.rs armdb/tests/fixed_map_tests.rs
git commit -m "feat(armdb): add FixedMap with O(1) lookup and FixedStore durability"
```
---
### Task 10: Concurrent and stress tests
**Files:**
- Create: `armdb/tests/fixed_concurrent_tests.rs`
- [ ] **Step 1: Write concurrent tests**
```rust
use std::sync::Arc;
use std::thread;
use armdb::{FixedConfig, FixedTree, FixedMap};
use tempfile::tempdir;
#[test]
fn test_fixed_tree_concurrent_writes() {
let dir = tempdir().unwrap();
let tree = Arc::new(
FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap()
);
let handles: Vec<_> = (0..8u64).map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
for i in 0..100u64 {
let key = (t * 1000 + i).to_be_bytes();
let value = (t * 1000 + i).to_be_bytes();
tree.put(&key, &value).unwrap();
}
})
}).collect();
for h in handles { h.join().unwrap(); }
assert_eq!(tree.len(), 800);
// Verify all keys
for t in 0..8u64 {
for i in 0..100u64 {
let key = (t * 1000 + i).to_be_bytes();
let expected = (t * 1000 + i).to_be_bytes();
assert_eq!(tree.get(&key).unwrap(), expected);
}
}
}
#[test]
fn test_fixed_tree_concurrent_read_write() {
let dir = tempdir().unwrap();
let tree = Arc::new(
FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap()
);
// Pre-populate
for i in 0..100u64 {
tree.put(&i.to_be_bytes(), &i.to_be_bytes()).unwrap();
}
let handles: Vec<_> = (0..4).map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
for i in 0..100u64 {
if t % 2 == 0 {
// Writer: update existing keys
let key = i.to_be_bytes();
let value = (i + t as u64 * 1000).to_be_bytes();
tree.put(&key, &value).unwrap();
} else {
// Reader
let key = i.to_be_bytes();
let _ = tree.get(&key); // may or may not see latest write
}
}
})
}).collect();
for h in handles { h.join().unwrap(); }
assert_eq!(tree.len(), 100);
}
#[test]
fn test_fixed_map_concurrent_writes() {
let dir = tempdir().unwrap();
let map = Arc::new(
FixedMap::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap()
);
let handles: Vec<_> = (0..8u64).map(|t| {
let map = Arc::clone(&map);
thread::spawn(move || {
for i in 0..100u64 {
let key = (t * 1000 + i).to_be_bytes();
let value = (t * 1000 + i).to_be_bytes();
map.put(&key, &value).unwrap();
}
})
}).collect();
for h in handles { h.join().unwrap(); }
assert_eq!(map.len(), 800);
}
#[test]
fn test_fixed_tree_stress_overwrite() {
let dir = tempdir().unwrap();
let tree = Arc::new(
FixedTree::<[u8; 8], 8>::open(dir.path(), FixedConfig::test()).unwrap()
);
// Many threads overwriting the same key
let key = 1u64.to_be_bytes();
tree.put(&key, &0u64.to_be_bytes()).unwrap();
let handles: Vec<_> = (0..8u64).map(|t| {
let tree = Arc::clone(&tree);
thread::spawn(move || {
for i in 0..1000u64 {
let value = (t * 10000 + i).to_be_bytes();
tree.put(&key, &value).unwrap();
}
})
}).collect();
for h in handles { h.join().unwrap(); }
// Key still exists with some valid value
assert_eq!(tree.len(), 1);
let _ = tree.get(&key).expect("key must exist after stress");
}
```
- [ ] **Step 2: Verify**
Run: `cargo nextest run -p armdb -E 'test(fixed)'`
Expected: all tests pass, including concurrent tests.
- [ ] **Step 3: Run full test suite to check no regressions**
Run: `cargo nextest run -p armdb`
Expected: all existing tests + new tests pass.
- [ ] **Step 4: Commit**
```bash
git add armdb/tests/fixed_concurrent_tests.rs
git commit -m "test(armdb): add concurrent and stress tests for FixedTree and FixedMap"
```