use std::mem::size_of;
use std::path::PathBuf;
use std::hash::Hash;
use crate::const_map::ConstMap;
use crate::const_tree::ConstTree;
use crate::durability::{Durability, Fixed};
use crate::error::{DbError, DbResult};
use crate::fixed::slot::{self, STATUS_FREE, STATUS_OCCUPIED, is_newer, status_of, version_of};
use crate::hook::WriteHook;
use crate::key::Key;
/// Result of offering one replicated slot event to a follower.
///
/// Returned (inside a `DbResult`) by the `apply_*` methods of
/// `FixedReplicationTarget`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyOutcome {
    /// The event was accepted and the follower's slot state was updated.
    Applied,
    /// The event was ignored because the follower's slot was already at
    /// least as new as the event.
    Skipped,
}
/// Checks that a replicated byte field has exactly the expected length.
///
/// `kind` names the field (for example `"key"` or `"value"`) and is only used
/// in the error message.
///
/// # Errors
///
/// Returns `DbError::Replication` when `actual` differs from `expected`.
fn validate_len(kind: &str, actual: usize, expected: usize) -> DbResult<()> {
    if actual == expected {
        return Ok(());
    }
    let message = format!(
        "fixed replication {kind} length mismatch: got {actual}, expected {expected}"
    );
    Err(DbError::Replication(message))
}
/// Follower-side interface for applying replicated fixed-slot events.
///
/// Implemented in this file for `ConstTree` and `ConstMap` with `Fixed`
/// durability. Implementors must be `Send + Sync`.
pub trait FixedReplicationTarget: Send + Sync {
/// Number of shards in the target's fixed store.
fn shard_count(&self) -> usize;
/// Length in bytes of an encoded key (`size_of::<K>()` in the
/// implementations in this file).
fn key_len(&self) -> usize;
/// Length in bytes of a value (the const parameter `V` in the
/// implementations in this file).
fn value_len(&self) -> usize;
/// Shard prefix width in bits, as reported by the durability layer.
fn shard_prefix_bits(&self) -> u8;
/// Number of occupied slots currently recorded in the bitmap of `shard_id`.
fn shard_occupied_count(&self, shard_id: u8) -> u32;
/// Filesystem path for `shard_id`; the implementations in this file return
/// `<engine path>/shard_NNN` with the id zero-padded to three digits.
fn shard_dir(&self, shard_id: u8) -> PathBuf;
/// Grows `shard_id` until it has at least `min_slot_count` slots.
///
/// The implementations in this file never shrink a shard: when the follower
/// already has more slots they only log a warning.
fn grow_shard_to(&self, shard_id: u8, min_slot_count: u32) -> DbResult<()>;
/// Applies an event that stores `key`/`value` in `slot_id` with header word
/// `meta`.
///
/// Returns `ApplyOutcome::Skipped` when the version carried by `meta` is not
/// newer than the version already recorded for the slot. The implementations
/// in this file return an error when `key` or `value` has the wrong length or
/// when the underlying storage call fails.
fn apply_occupied(
&self,
shard_id: u8,
slot_id: u32,
meta: u32,
key: &[u8],
value: &[u8],
) -> DbResult<ApplyOutcome>;
/// Applies an event that marks `slot_id` as deleted for `key`.
///
/// Returns `ApplyOutcome::Skipped` when the version carried by `meta` is not
/// newer than the version already recorded for the slot. The implementations
/// in this file return an error when `key` has the wrong length or when the
/// underlying storage call fails.
fn apply_deleted(
&self,
shard_id: u8,
slot_id: u32,
meta: u32,
key: &[u8],
) -> DbResult<ApplyOutcome>;
/// Applies an event that resets `slot_id` to the free state.
///
/// The implementations in this file reject a `meta` whose status is not
/// `STATUS_FREE` with `DbError::Replication`.
fn apply_reset(&self, shard_id: u8, slot_id: u32, meta: u32) -> DbResult<ApplyOutcome>;
/// Finishes a batch of applied events for `shard_id`.
///
/// The implementations in this file delegate to `sync_replication_batch` on
/// the shard. NOTE(review): presumably this flushes the writes made by the
/// `*_deferred` calls of the `apply_*` methods — confirm against the shard
/// implementation.
fn sync_applied_batch(&self, shard_id: u8) -> DbResult<()>;
}
/// Applies replicated fixed-slot events to a `ConstTree` that uses `Fixed`
/// durability.
///
/// Every shard-addressed method obtains the shard through `lock_shard` and
/// keeps the returned handle until the method returns.
impl<K, const V: usize, H> FixedReplicationTarget for ConstTree<K, V, H, Fixed>
where
    K: Key + 'static,
    H: WriteHook<K> + 'static,
{
    /// Number of shards reported by the durability layer.
    fn shard_count(&self) -> usize {
        self.fixed_durability().shard_count()
    }

    /// Key length in bytes: the in-memory size of `K`.
    fn key_len(&self) -> usize {
        size_of::<K>()
    }

    /// Value length in bytes: the const parameter `V`.
    fn value_len(&self) -> usize {
        V
    }

    /// Shard prefix width in bits, narrowed to `u8` with an `as` cast.
    fn shard_prefix_bits(&self) -> u8 {
        self.fixed_durability().shard_prefix_bits() as u8
    }

    /// Occupied-slot count taken from the shard's bitmap.
    fn shard_occupied_count(&self, shard_id: u8) -> u32 {
        let inner = self.fixed_durability().lock_shard(shard_id as usize);
        inner.bitmap.occupied()
    }

    /// `<engine path>/shard_NNN`, with `shard_id` zero-padded to three digits.
    fn shard_dir(&self, shard_id: u8) -> PathBuf {
        self.fixed_durability()
            .engine
            .path()
            .join(format!("shard_{:03}", shard_id))
    }

    /// Grows the shard until it holds at least `min_slot_count` slots.
    ///
    /// A shard that already holds more slots is left untouched; only a warning
    /// is logged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the shard's `grow`.
    fn grow_shard_to(&self, shard_id: u8, min_slot_count: u32) -> DbResult<()> {
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        while inner.slot_count() < min_slot_count {
            inner.grow()?;
        }
        if inner.slot_count() > min_slot_count {
            tracing::warn!(
                shard_id,
                follower_count = inner.slot_count(),
                leader_count = min_slot_count,
                "follower has more slots than leader — ignoring (no truncate)"
            );
        }
        Ok(())
    }

    /// Stores `key`/`value` in `slot_id` and points the index entry for `key`
    /// at that slot.
    ///
    /// Returns `ApplyOutcome::Skipped` (and counts a skipped event) when the
    /// version in `meta` is not newer than the slot's recorded version.
    ///
    /// # Errors
    ///
    /// `DbError::Replication` when `key` or `value` has the wrong length;
    /// otherwise any error from growing, reading or writing the shard.
    fn apply_occupied(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<ApplyOutcome> {
        // Lengths are validated first: `copy_from_slice` below panics on a
        // length mismatch.
        validate_len("key", key.len(), size_of::<K>())?;
        validate_len("value", value.len(), V)?;
        let key_typed: K = K::from_bytes(key);
        let mut value_arr = [0u8; V];
        value_arr.copy_from_slice(value);

        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure `slot_id` is addressable before indexing `versions`.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }

        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }

        // The slot is occupied but the index does not map the incoming key to
        // it: the slot may still hold a different key whose mapping would go
        // stale once the slot is overwritten.
        if status_of(old_meta) == STATUS_OCCUPIED && self.get_slot_id(&key_typed) != Some(slot_id) {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            let old_key = K::from_bytes(old_key_bytes);
            if old_key.as_bytes() != key_typed.as_bytes()
                && self.remove_key_if_slot_matches(&old_key, slot_id)
            {
                metrics::counter!(
                    "armdb.fixed.stale_cleanup_count",
                    "shard" => shard_id.to_string()
                )
                .increment(1);
            }
        }

        inner.apply_foreign_slot_deferred(slot_id, meta, key, value)?;
        inner.bitmap.set(slot_id);
        self.upsert_replicated(&key_typed, value_arr, slot_id);
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "occupied"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    /// Marks `slot_id` as deleted and removes index entries that still point
    /// at it.
    ///
    /// The entry for `key` is removed only if it maps to `slot_id`, so a key
    /// that has since moved to another slot keeps its mapping. If the slot was
    /// still occupied by a *different* key (intermediate events were not seen
    /// by this follower), that key's mapping to `slot_id` is removed as well,
    /// mirroring the cleanup done by `apply_occupied` and `apply_reset`.
    ///
    /// Returns `ApplyOutcome::Skipped` (and counts a skipped event) when the
    /// version in `meta` is not newer than the slot's recorded version.
    ///
    /// # Errors
    ///
    /// `DbError::Replication` when `key` has the wrong length; otherwise any
    /// error from growing, reading or writing the shard.
    fn apply_deleted(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
    ) -> DbResult<ApplyOutcome> {
        validate_len("key", key.len(), size_of::<K>())?;
        let key_typed: K = K::from_bytes(key);

        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure `slot_id` is addressable before indexing `versions`.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }

        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }

        // Only read the slot when the index does not already tie the deleted
        // key to it; in the common case the occupant is `key` itself and no
        // extra read is needed.
        let stale_key = if status_of(old_meta) == STATUS_OCCUPIED
            && self.get_slot_id(&key_typed) != Some(slot_id)
        {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            let old_key = K::from_bytes(old_key_bytes);
            if old_key.as_bytes() != key_typed.as_bytes() {
                Some(old_key)
            } else {
                None
            }
        } else {
            None
        };

        inner.apply_foreign_delete_deferred(slot_id, meta)?;
        inner.bitmap.clear(slot_id);
        let _removed = self.remove_key_if_slot_matches(&key_typed, slot_id);
        if let Some(stale_key) = stale_key {
            if self.remove_key_if_slot_matches(&stale_key, slot_id) {
                metrics::counter!(
                    "armdb.fixed.stale_cleanup_count",
                    "shard" => shard_id.to_string()
                )
                .increment(1);
            }
        }
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "deleted"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    /// Resets `slot_id` to the free state and drops the index entry of the key
    /// that occupied it, if that entry still points at this slot.
    ///
    /// The event is applied when its version is newer than the slot's, or when
    /// the versions are equal and the slot is not already free; otherwise it is
    /// counted as skipped.
    ///
    /// # Errors
    ///
    /// `DbError::Replication` when `meta` does not carry `STATUS_FREE`;
    /// otherwise any error from growing, reading or writing the shard.
    fn apply_reset(&self, shard_id: u8, slot_id: u32, meta: u32) -> DbResult<ApplyOutcome> {
        if status_of(meta) != STATUS_FREE {
            return Err(DbError::Replication(format!(
                "fixed replication reset must carry FREE status, got {}",
                status_of(meta)
            )));
        }

        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure `slot_id` is addressable before indexing `versions`.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }

        let old_meta = inner.versions[slot_id as usize];
        let new_version = version_of(meta);
        let old_version = version_of(old_meta);
        let should_apply = is_newer(new_version, old_version)
            || (new_version == old_version && status_of(old_meta) != STATUS_FREE);
        if !should_apply {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }

        // The reset event carries no key, so the occupant is read from the
        // slot before the slot is overwritten.
        let old_key = if status_of(old_meta) == STATUS_OCCUPIED {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            Some(K::from_bytes(old_key_bytes))
        } else {
            None
        };

        // NOTE(review): unlike `apply_deleted`, this path never calls
        // `inner.bitmap.clear(slot_id)`. Confirm that
        // `apply_foreign_reset_deferred` updates the occupancy bitmap itself.
        inner.apply_foreign_reset_deferred(slot_id, meta)?;
        if let Some(old_key) = old_key {
            let _removed = self.remove_key_if_slot_matches(&old_key, slot_id);
        }
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "reset"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    /// Delegates to the shard's `sync_replication_batch`.
    fn sync_applied_batch(&self, shard_id: u8) -> DbResult<()> {
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        inner.sync_replication_batch()
    }
}
/// Applies replicated fixed-slot events to a `ConstMap` that uses `Fixed`
/// durability.
///
/// Every shard-addressed method obtains the shard through `lock_shard` and
/// keeps the returned handle until the method returns.
impl<K, const V: usize, H> FixedReplicationTarget for ConstMap<K, V, H, Fixed>
where
    K: Key + Send + Sync + Hash + Eq + 'static,
    H: WriteHook<K> + 'static,
{
    /// Number of shards reported by the durability layer.
    fn shard_count(&self) -> usize {
        self.fixed_durability().shard_count()
    }

    /// Key length in bytes: the in-memory size of `K`.
    fn key_len(&self) -> usize {
        size_of::<K>()
    }

    /// Value length in bytes: the const parameter `V`.
    fn value_len(&self) -> usize {
        V
    }

    /// Shard prefix width in bits, narrowed to `u8` with an `as` cast.
    fn shard_prefix_bits(&self) -> u8 {
        self.fixed_durability().shard_prefix_bits() as u8
    }

    /// Occupied-slot count taken from the shard's bitmap.
    fn shard_occupied_count(&self, shard_id: u8) -> u32 {
        let inner = self.fixed_durability().lock_shard(shard_id as usize);
        inner.bitmap.occupied()
    }

    /// `<engine path>/shard_NNN`, with `shard_id` zero-padded to three digits.
    fn shard_dir(&self, shard_id: u8) -> PathBuf {
        self.fixed_durability()
            .engine
            .path()
            .join(format!("shard_{:03}", shard_id))
    }

    /// Grows the shard until it holds at least `min_slot_count` slots.
    ///
    /// A shard that already holds more slots is left untouched; only a warning
    /// is logged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the shard's `grow`.
    fn grow_shard_to(&self, shard_id: u8, min_slot_count: u32) -> DbResult<()> {
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        while inner.slot_count() < min_slot_count {
            inner.grow()?;
        }
        if inner.slot_count() > min_slot_count {
            tracing::warn!(
                shard_id,
                follower_count = inner.slot_count(),
                leader_count = min_slot_count,
                "follower has more slots than leader — ignoring (no truncate)"
            );
        }
        Ok(())
    }

    /// Stores `key`/`value` in `slot_id` and points the index entry for `key`
    /// at that slot.
    ///
    /// Returns `ApplyOutcome::Skipped` (and counts a skipped event) when the
    /// version in `meta` is not newer than the slot's recorded version.
    ///
    /// # Errors
    ///
    /// `DbError::Replication` when `key` or `value` has the wrong length;
    /// otherwise any error from growing, reading or writing the shard.
    fn apply_occupied(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<ApplyOutcome> {
        // Lengths are validated first: `copy_from_slice` below panics on a
        // length mismatch.
        validate_len("key", key.len(), size_of::<K>())?;
        validate_len("value", value.len(), V)?;
        let key_typed: K = K::from_bytes(key);
        let mut value_arr = [0u8; V];
        value_arr.copy_from_slice(value);

        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure `slot_id` is addressable before indexing `versions`.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }

        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }

        // The slot is occupied but the index does not map the incoming key to
        // it: the slot may still hold a different key whose mapping would go
        // stale once the slot is overwritten.
        if status_of(old_meta) == STATUS_OCCUPIED && self.get_slot_id(&key_typed) != Some(slot_id) {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            let old_key = K::from_bytes(old_key_bytes);
            if old_key.as_bytes() != key_typed.as_bytes()
                && self.remove_key_if_slot_matches(&old_key, slot_id)
            {
                metrics::counter!(
                    "armdb.fixed.stale_cleanup_count",
                    "shard" => shard_id.to_string()
                )
                .increment(1);
            }
        }

        inner.apply_foreign_slot_deferred(slot_id, meta, key, value)?;
        inner.bitmap.set(slot_id);
        self.upsert_replicated(&key_typed, value_arr, slot_id);
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "occupied"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    /// Marks `slot_id` as deleted and removes index entries that still point
    /// at it.
    ///
    /// The entry for `key` is removed only if it maps to `slot_id`, so a key
    /// that has since moved to another slot keeps its mapping. If the slot was
    /// still occupied by a *different* key (intermediate events were not seen
    /// by this follower), that key's mapping to `slot_id` is removed as well,
    /// mirroring the cleanup done by `apply_occupied` and `apply_reset`.
    ///
    /// Returns `ApplyOutcome::Skipped` (and counts a skipped event) when the
    /// version in `meta` is not newer than the slot's recorded version.
    ///
    /// # Errors
    ///
    /// `DbError::Replication` when `key` has the wrong length; otherwise any
    /// error from growing, reading or writing the shard.
    fn apply_deleted(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
    ) -> DbResult<ApplyOutcome> {
        validate_len("key", key.len(), size_of::<K>())?;
        let key_typed: K = K::from_bytes(key);

        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure `slot_id` is addressable before indexing `versions`.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }

        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }

        // Only read the slot when the index does not already tie the deleted
        // key to it; in the common case the occupant is `key` itself and no
        // extra read is needed.
        let stale_key = if status_of(old_meta) == STATUS_OCCUPIED
            && self.get_slot_id(&key_typed) != Some(slot_id)
        {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            let old_key = K::from_bytes(old_key_bytes);
            if old_key.as_bytes() != key_typed.as_bytes() {
                Some(old_key)
            } else {
                None
            }
        } else {
            None
        };

        inner.apply_foreign_delete_deferred(slot_id, meta)?;
        inner.bitmap.clear(slot_id);
        let _removed = self.remove_key_if_slot_matches(&key_typed, slot_id);
        if let Some(stale_key) = stale_key {
            if self.remove_key_if_slot_matches(&stale_key, slot_id) {
                metrics::counter!(
                    "armdb.fixed.stale_cleanup_count",
                    "shard" => shard_id.to_string()
                )
                .increment(1);
            }
        }
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "deleted"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    /// Resets `slot_id` to the free state and drops the index entry of the key
    /// that occupied it, if that entry still points at this slot.
    ///
    /// The event is applied when its version is newer than the slot's, or when
    /// the versions are equal and the slot is not already free; otherwise it is
    /// counted as skipped.
    ///
    /// # Errors
    ///
    /// `DbError::Replication` when `meta` does not carry `STATUS_FREE`;
    /// otherwise any error from growing, reading or writing the shard.
    fn apply_reset(&self, shard_id: u8, slot_id: u32, meta: u32) -> DbResult<ApplyOutcome> {
        if status_of(meta) != STATUS_FREE {
            return Err(DbError::Replication(format!(
                "fixed replication reset must carry FREE status, got {}",
                status_of(meta)
            )));
        }

        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure `slot_id` is addressable before indexing `versions`.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }

        let old_meta = inner.versions[slot_id as usize];
        let new_version = version_of(meta);
        let old_version = version_of(old_meta);
        let should_apply = is_newer(new_version, old_version)
            || (new_version == old_version && status_of(old_meta) != STATUS_FREE);
        if !should_apply {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }

        // The reset event carries no key, so the occupant is read from the
        // slot before the slot is overwritten.
        let old_key = if status_of(old_meta) == STATUS_OCCUPIED {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            Some(K::from_bytes(old_key_bytes))
        } else {
            None
        };

        // NOTE(review): unlike `apply_deleted`, this path never calls
        // `inner.bitmap.clear(slot_id)`. Confirm that
        // `apply_foreign_reset_deferred` updates the occupancy bitmap itself.
        inner.apply_foreign_reset_deferred(slot_id, meta)?;
        if let Some(old_key) = old_key {
            let _removed = self.remove_key_if_slot_matches(&old_key, slot_id);
        }
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "reset"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    /// Delegates to the shard's `sync_replication_batch`.
    fn sync_applied_batch(&self, shard_id: u8) -> DbResult<()> {
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        inner.sync_replication_batch()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FixedConfig;
    use crate::fixed::FixedTree;
    use crate::fixed::slot::{STATUS_DELETED, STATUS_FREE, STATUS_OCCUPIED, pack_meta};
    use tempfile::{TempDir, tempdir};

    /// Tree under test: 8-byte keys, 8-byte values.
    type T = FixedTree<[u8; 8], 8>;
    /// Map under test: 8-byte keys, 8-byte values.
    type M = crate::fixed::FixedMap<[u8; 8], 8>;

    /// Single-shard test configuration with a grow step of 64.
    fn cfg() -> FixedConfig {
        FixedConfig {
            shard_count: 1,
            grow_step: 64,
            ..FixedConfig::test()
        }
    }

    /// Opens a fresh tree in a temporary directory. The directory handle is
    /// returned so it outlives the tree.
    fn open_tree() -> (TempDir, T) {
        let dir = tempdir().unwrap();
        let tree: T = FixedTree::open(dir.path(), cfg()).unwrap();
        (dir, tree)
    }

    /// Opens a fresh map in a temporary directory. The directory handle is
    /// returned so it outlives the map.
    fn open_map() -> (TempDir, M) {
        let dir = tempdir().unwrap();
        let map: M = M::open(dir.path(), cfg()).unwrap();
        (dir, map)
    }

    /// An occupied event on an empty tree inserts the key at the given slot.
    #[test]
    fn test_apply_occupied_insert() {
        let (_dir, tree) = open_tree();
        let key = 1u64.to_be_bytes();
        let value = 42u64.to_be_bytes();
        let shard = tree.shard_for(&key) as u8;

        let outcome = tree
            .apply_occupied(shard, 5, pack_meta(STATUS_OCCUPIED, 1), &key, &value)
            .unwrap();

        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(tree.get(&key), Some(value));
        assert_eq!(tree.get_slot_id(&key), Some(5));
    }

    /// A delete event for a slot the key no longer lives in must not remove
    /// the key's current mapping.
    #[test]
    fn test_apply_deleted_bug1_moved_key() {
        let (_dir, tree) = open_tree();
        let key = 7u64.to_be_bytes();
        let value = 7u64.to_be_bytes();
        let shard = tree.shard_for(&key) as u8;

        tree.apply_occupied(shard, 13, pack_meta(STATUS_OCCUPIED, 10), &key, &value)
            .unwrap();
        assert_eq!(tree.get_slot_id(&key), Some(13));

        let outcome = tree
            .apply_deleted(shard, 5, pack_meta(STATUS_DELETED, 1), &key)
            .unwrap();

        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(
            tree.get_slot_id(&key),
            Some(13),
            "Bug 1 regression: stale DELETE removed a live mapping"
        );
        assert_eq!(tree.get(&key), Some(value));
    }

    /// Re-assigning an occupied slot to another key removes the previous
    /// occupant's index mapping.
    #[test]
    fn test_apply_occupied_bug2_stale_cleanup() {
        let (_dir, tree) = open_tree();
        let shard = 0u8;
        let (key_x, val_x) = (100u64.to_be_bytes(), 11u64.to_be_bytes());
        let (key_y, val_y) = (200u64.to_be_bytes(), 22u64.to_be_bytes());
        let val_y_new = 33u64.to_be_bytes();

        tree.apply_occupied(shard, 3, pack_meta(STATUS_OCCUPIED, 2), &key_x, &val_x)
            .unwrap();
        tree.apply_occupied(shard, 7, pack_meta(STATUS_OCCUPIED, 4), &key_y, &val_y)
            .unwrap();
        assert_eq!(tree.get_slot_id(&key_x), Some(3));
        assert_eq!(tree.get_slot_id(&key_y), Some(7));

        let outcome = tree
            .apply_occupied(shard, 3, pack_meta(STATUS_OCCUPIED, 5), &key_y, &val_y_new)
            .unwrap();

        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(
            tree.get_slot_id(&key_x),
            None,
            "Bug 2 regression: stale key_X mapping to slot_A not cleaned up"
        );
        assert!(!tree.contains(&key_x));
        assert_eq!(tree.get_slot_id(&key_y), Some(3));
        assert_eq!(tree.get(&key_y), Some(val_y_new));
    }

    /// Events carrying version 50 are skipped once the slot is at version 100,
    /// for both occupied and deleted events.
    #[test]
    fn test_apply_modular_skipped() {
        let (_dir, tree) = open_tree();
        let key = 9u64.to_be_bytes();
        let value = 9u64.to_be_bytes();
        let shard = tree.shard_for(&key) as u8;

        tree.apply_occupied(shard, 0, pack_meta(STATUS_OCCUPIED, 100), &key, &value)
            .unwrap();

        let occupied_outcome = tree
            .apply_occupied(shard, 0, pack_meta(STATUS_OCCUPIED, 50), &key, &value)
            .unwrap();
        assert_eq!(occupied_outcome, ApplyOutcome::Skipped);

        let deleted_outcome = tree
            .apply_deleted(shard, 0, pack_meta(STATUS_DELETED, 50), &key)
            .unwrap();
        assert_eq!(deleted_outcome, ApplyOutcome::Skipped);

        assert_eq!(tree.get(&key), Some(value));
    }

    /// The map implementation inserts on an occupied event and removes on a
    /// newer deleted event.
    #[test]
    fn test_map_apply_occupied_and_deleted() {
        let (_dir, map) = open_map();
        let key = 42u64.to_be_bytes();
        let value = 99u64.to_be_bytes();
        let shard = map.shard_for(&key) as u8;

        let inserted = map
            .apply_occupied(shard, 0, pack_meta(STATUS_OCCUPIED, 1), &key, &value)
            .unwrap();
        assert_eq!(inserted, ApplyOutcome::Applied);
        assert_eq!(map.get(&key), Some(value));
        assert_eq!(map.get_slot_id(&key), Some(0));

        let deleted = map
            .apply_deleted(shard, 0, pack_meta(STATUS_DELETED, 2), &key)
            .unwrap();
        assert_eq!(deleted, ApplyOutcome::Applied);
        assert_eq!(map.get(&key), None);
    }

    /// Wrong-length keys and values are rejected with a replication error.
    #[test]
    fn test_apply_occupied_rejects_wrong_key_and_value_lengths() {
        let (_dir, tree) = open_tree();
        let meta = pack_meta(STATUS_OCCUPIED, 1);

        let bad_key = tree
            .apply_occupied(0, 0, meta, b"short", &[0u8; 8])
            .unwrap_err();
        assert!(matches!(bad_key, crate::error::DbError::Replication(_)));

        let key = 1u64.to_be_bytes();
        let bad_value = tree.apply_occupied(0, 0, meta, &key, b"short").unwrap_err();
        assert!(matches!(bad_value, crate::error::DbError::Replication(_)));
    }

    /// A wrong-length key in a delete event is rejected with a replication
    /// error.
    #[test]
    fn test_apply_deleted_rejects_wrong_key_length() {
        let (_dir, tree) = open_tree();

        let err = tree
            .apply_deleted(0, 0, pack_meta(STATUS_DELETED, 1), b"short")
            .unwrap_err();

        assert!(matches!(err, crate::error::DbError::Replication(_)));
    }

    /// Resetting an occupied slot removes the occupant's index mapping.
    #[test]
    fn test_apply_reset_removes_stale_index_mapping() {
        let (_dir, tree) = open_tree();
        let key = 10u64.to_be_bytes();
        let value = 20u64.to_be_bytes();

        tree.apply_occupied(0, 3, pack_meta(STATUS_OCCUPIED, 2), &key, &value)
            .unwrap();
        assert_eq!(tree.get_slot_id(&key), Some(3));

        let outcome = tree.apply_reset(0, 3, pack_meta(STATUS_FREE, 3)).unwrap();

        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(tree.get_slot_id(&key), None);
        assert_eq!(tree.get(&key), None);
    }

    /// Resetting the old slot of a key that has moved keeps the key's current
    /// mapping and value.
    #[test]
    fn test_apply_reset_preserves_mapping_when_key_moved() {
        let (_dir, tree) = open_tree();
        let key = 10u64.to_be_bytes();
        let old_value = 20u64.to_be_bytes();
        let new_value = 30u64.to_be_bytes();

        tree.apply_occupied(0, 3, pack_meta(STATUS_OCCUPIED, 2), &key, &old_value)
            .unwrap();
        tree.apply_occupied(0, 9, pack_meta(STATUS_OCCUPIED, 4), &key, &new_value)
            .unwrap();

        let outcome = tree.apply_reset(0, 3, pack_meta(STATUS_FREE, 3)).unwrap();

        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(tree.get_slot_id(&key), Some(9));
        assert_eq!(tree.get(&key), Some(new_value));
    }

    /// `sync_applied_batch` succeeds on a freshly opened tree.
    #[test]
    fn test_sync_applied_batch_is_callable() {
        let (_dir, tree) = open_tree();
        tree.sync_applied_batch(0).unwrap();
    }

    /// Resetting an occupied slot removes the occupant's index mapping in the
    /// map implementation as well.
    #[test]
    fn test_map_apply_reset_removes_stale_index_mapping() {
        let (_dir, map) = open_map();
        let key = 10u64.to_be_bytes();
        let value = 20u64.to_be_bytes();

        map.apply_occupied(0, 3, pack_meta(STATUS_OCCUPIED, 2), &key, &value)
            .unwrap();
        assert_eq!(map.get_slot_id(&key), Some(3));

        let outcome = map.apply_reset(0, 3, pack_meta(STATUS_FREE, 3)).unwrap();

        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(map.get_slot_id(&key), None);
    }
}