use std::mem::size_of;
use std::path::PathBuf;
use std::hash::Hash;
use crate::const_map::ConstMap;
use crate::const_tree::ConstTree;
use crate::durability::{Durability, Fixed};
use crate::error::DbResult;
use crate::fixed::slot::{self, STATUS_OCCUPIED, is_newer, status_of, version_of};
use crate::hook::WriteHook;
use crate::key::Key;
/// Result of applying a single replication event to a local shard.
///
/// `Skipped` means the event's version was not strictly newer than the
/// locally recorded slot version, so local state was left untouched —
/// this is what makes replaying a stream safe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "a Skipped outcome signals the event was stale; callers should not ignore it"]
pub enum ApplyOutcome {
    /// The event was newer than local state and was written.
    Applied,
    /// The event was not newer than local state; nothing changed.
    Skipped,
}
/// Sink side of fixed-slot replication: a follower store that replays
/// slot-level events ("occupied" / "deleted") received from a leader.
///
/// Implementations gate each event on its version: events that are not
/// strictly newer than the locally recorded slot version are reported as
/// [`ApplyOutcome::Skipped`], so a stream can be replayed safely.
pub trait FixedReplicationTarget: Send + Sync {
    /// Number of shards in this store.
    fn shard_count(&self) -> usize;
    /// Serialized key length in bytes.
    fn key_len(&self) -> usize;
    /// Serialized value length in bytes.
    fn value_len(&self) -> usize;
    /// Number of key-prefix bits used to route a key to a shard.
    fn shard_prefix_bits(&self) -> u8;
    /// Count of currently occupied slots in `shard_id`.
    fn shard_occupied_count(&self, shard_id: u8) -> u32;
    /// On-disk directory backing `shard_id`.
    fn shard_dir(&self, shard_id: u8) -> PathBuf;
    /// Grow `shard_id` until it has at least `min_slot_count` slots.
    /// Implementations never shrink a shard.
    fn grow_shard_to(&self, shard_id: u8, min_slot_count: u32) -> DbResult<()>;
    /// Apply a leader "slot occupied" event: write `key`/`value` into
    /// `slot_id` of `shard_id` if `meta`'s version is newer than the local
    /// slot version.
    fn apply_occupied(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<ApplyOutcome>;
    /// Apply a leader "slot deleted" event: clear `slot_id` of `shard_id`
    /// if `meta`'s version is newer than the local slot version. `key` is
    /// used to fix up the in-memory key index.
    fn apply_deleted(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
    ) -> DbResult<ApplyOutcome>;
}
/// Replication-target implementation for the fixed-durability [`ConstTree`].
impl<K, const V: usize, H> FixedReplicationTarget for ConstTree<K, V, H, Fixed>
where
    K: Key + 'static,
    H: WriteHook<K> + 'static,
{
    fn shard_count(&self) -> usize {
        self.fixed_durability().shard_count()
    }

    fn key_len(&self) -> usize {
        size_of::<K>()
    }

    fn value_len(&self) -> usize {
        V
    }

    fn shard_prefix_bits(&self) -> u8 {
        // NOTE(review): narrowing `as u8` cast — presumably prefix bits
        // always fit in a byte (<= 8 for u8 shard ids); confirm upstream.
        self.fixed_durability().shard_prefix_bits() as u8
    }

    fn shard_occupied_count(&self, shard_id: u8) -> u32 {
        // Read the occupancy bitmap under the shard lock for a consistent count.
        let inner = self.fixed_durability().lock_shard(shard_id as usize);
        inner.bitmap.occupied()
    }

    fn shard_dir(&self, shard_id: u8) -> PathBuf {
        // Zero-padded directory name, e.g. "shard_007".
        self.fixed_durability()
            .engine
            .path()
            .join(format!("shard_{:03}", shard_id))
    }

    fn grow_shard_to(&self, shard_id: u8, min_slot_count: u32) -> DbResult<()> {
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        while inner.slot_count() < min_slot_count {
            inner.grow()?;
        }
        // A follower is never shrunk: if it already has more slots than the
        // leader reported, keep them and just log the mismatch.
        if inner.slot_count() > min_slot_count {
            tracing::warn!(
                shard_id,
                follower_count = inner.slot_count(),
                leader_count = min_slot_count,
                "follower has more slots than leader — ignoring (no truncate)"
            );
        }
        Ok(())
    }

    fn apply_occupied(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<ApplyOutcome> {
        // Wire sizes must match this instantiation's fixed key/value lengths.
        debug_assert_eq!(key.len(), size_of::<K>());
        debug_assert_eq!(value.len(), V);
        let key_typed: K = K::from_bytes(key);
        let mut value_arr = [0u8; V];
        value_arr.copy_from_slice(value);
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure the target slot exists locally before touching it.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }
        // Version gate: only apply events strictly newer than what the slot
        // already holds; stale/duplicate events are counted and skipped.
        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }
        // If this slot was previously occupied and the incoming key does not
        // already map here, the slot may have held a *different* key: remove
        // that old key's index entry (only if it still points at this slot)
        // so the index keeps no stale key -> slot mapping.
        // NOTE(review): `get_slot_id` runs while the shard lock is held;
        // assumes the index lookup never takes this shard lock — confirm.
        if status_of(old_meta) == STATUS_OCCUPIED && self.get_slot_id(&key_typed) != Some(slot_id) {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            let old_key = K::from_bytes(old_key_bytes);
            if old_key.as_bytes() != key_typed.as_bytes()
                && self.remove_key_if_slot_matches(&old_key, slot_id)
            {
                metrics::counter!(
                    "armdb.fixed.stale_cleanup_count",
                    "shard" => shard_id.to_string()
                )
                .increment(1);
            }
        }
        // Persist the slot first, then release the shard lock before updating
        // the in-memory index.
        inner.apply_foreign_slot(slot_id, meta, key, value)?;
        inner.bitmap.set(slot_id);
        drop(inner);
        self.upsert_replicated(&key_typed, value_arr, slot_id);
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "occupied"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    fn apply_deleted(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
    ) -> DbResult<ApplyOutcome> {
        debug_assert_eq!(key.len(), size_of::<K>());
        let key_typed: K = K::from_bytes(key);
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Grow so the version gate below can read a real slot entry.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }
        // Same version gate as apply_occupied: stale deletes are skipped.
        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }
        inner.apply_foreign_delete(slot_id, meta)?;
        inner.bitmap.clear(slot_id);
        drop(inner);
        // Remove the index entry only if the key still maps to this exact
        // slot — a delete aimed at an old slot must not remove a mapping
        // that has since moved to a different slot.
        let _removed = self.remove_key_if_slot_matches(&key_typed, slot_id);
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "deleted"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }
}
/// Replication-target implementation for the fixed-durability [`ConstMap`]
/// (mirrors the [`ConstTree`] implementation).
impl<K, const V: usize, H> FixedReplicationTarget for ConstMap<K, V, H, Fixed>
where
    K: Key + Send + Sync + Hash + Eq + 'static,
    H: WriteHook<K> + 'static,
{
    fn shard_count(&self) -> usize {
        self.fixed_durability().shard_count()
    }

    fn key_len(&self) -> usize {
        size_of::<K>()
    }

    fn value_len(&self) -> usize {
        V
    }

    fn shard_prefix_bits(&self) -> u8 {
        // NOTE(review): narrowing `as u8` cast — presumably prefix bits
        // always fit in a byte (<= 8 for u8 shard ids); confirm upstream.
        self.fixed_durability().shard_prefix_bits() as u8
    }

    fn shard_occupied_count(&self, shard_id: u8) -> u32 {
        // Read the occupancy bitmap under the shard lock for a consistent count.
        let inner = self.fixed_durability().lock_shard(shard_id as usize);
        inner.bitmap.occupied()
    }

    fn shard_dir(&self, shard_id: u8) -> PathBuf {
        // Zero-padded directory name, e.g. "shard_007".
        self.fixed_durability()
            .engine
            .path()
            .join(format!("shard_{:03}", shard_id))
    }

    fn grow_shard_to(&self, shard_id: u8, min_slot_count: u32) -> DbResult<()> {
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        while inner.slot_count() < min_slot_count {
            inner.grow()?;
        }
        // A follower is never shrunk: if it already has more slots than the
        // leader reported, keep them and just log the mismatch.
        if inner.slot_count() > min_slot_count {
            tracing::warn!(
                shard_id,
                follower_count = inner.slot_count(),
                leader_count = min_slot_count,
                "follower has more slots than leader — ignoring (no truncate)"
            );
        }
        Ok(())
    }

    fn apply_occupied(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<ApplyOutcome> {
        // Wire sizes must match this instantiation's fixed key/value lengths.
        debug_assert_eq!(key.len(), size_of::<K>());
        debug_assert_eq!(value.len(), V);
        let key_typed: K = K::from_bytes(key);
        let mut value_arr = [0u8; V];
        value_arr.copy_from_slice(value);
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Make sure the target slot exists locally before touching it.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }
        // Version gate: only apply events strictly newer than what the slot
        // already holds; stale/duplicate events are counted and skipped.
        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }
        // If this slot was previously occupied and the incoming key does not
        // already map here, the slot may have held a *different* key: remove
        // that old key's index entry (only if it still points at this slot)
        // so the index keeps no stale key -> slot mapping.
        // NOTE(review): `get_slot_id` runs while the shard lock is held;
        // assumes the index lookup never takes this shard lock — confirm.
        if status_of(old_meta) == STATUS_OCCUPIED && self.get_slot_id(&key_typed) != Some(slot_id) {
            let buf = inner.read_slot_header_and_key(slot_id, size_of::<K>())?;
            let old_key_bytes =
                &buf[slot::SLOT_HEADER_SIZE..slot::SLOT_HEADER_SIZE + size_of::<K>()];
            let old_key = K::from_bytes(old_key_bytes);
            if old_key.as_bytes() != key_typed.as_bytes()
                && self.remove_key_if_slot_matches(&old_key, slot_id)
            {
                metrics::counter!(
                    "armdb.fixed.stale_cleanup_count",
                    "shard" => shard_id.to_string()
                )
                .increment(1);
            }
        }
        // Persist the slot first, then release the shard lock before updating
        // the in-memory index.
        inner.apply_foreign_slot(slot_id, meta, key, value)?;
        inner.bitmap.set(slot_id);
        drop(inner);
        self.upsert_replicated(&key_typed, value_arr, slot_id);
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "occupied"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }

    fn apply_deleted(
        &self,
        shard_id: u8,
        slot_id: u32,
        meta: u32,
        key: &[u8],
    ) -> DbResult<ApplyOutcome> {
        debug_assert_eq!(key.len(), size_of::<K>());
        let key_typed: K = K::from_bytes(key);
        let mut inner = self.fixed_durability().lock_shard(shard_id as usize);
        // Grow so the version gate below can read a real slot entry.
        while inner.slot_count() <= slot_id {
            inner.grow()?;
        }
        // Same version gate as apply_occupied: stale deletes are skipped.
        let old_meta = inner.versions[slot_id as usize];
        if !is_newer(version_of(meta), version_of(old_meta)) {
            metrics::counter!(
                "armdb.fixed.skipped_events",
                "shard" => shard_id.to_string()
            )
            .increment(1);
            return Ok(ApplyOutcome::Skipped);
        }
        inner.apply_foreign_delete(slot_id, meta)?;
        inner.bitmap.clear(slot_id);
        drop(inner);
        // Remove the index entry only if the key still maps to this exact
        // slot — a delete aimed at an old slot must not remove a mapping
        // that has since moved to a different slot.
        let _removed = self.remove_key_if_slot_matches(&key_typed, slot_id);
        metrics::counter!(
            "armdb.fixed.applied_events",
            "shard" => shard_id.to_string(),
            "kind" => "deleted"
        )
        .increment(1);
        Ok(ApplyOutcome::Applied)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FixedConfig;
    use crate::fixed::FixedTree;
    use crate::fixed::slot::{STATUS_OCCUPIED, pack_meta};
    use tempfile::tempdir;

    type Tree8 = FixedTree<[u8; 8], 8>;
    type Map8 = crate::fixed::FixedMap<[u8; 8], 8>;

    /// Single-shard config with a small grow step for deterministic slot math.
    fn test_config() -> FixedConfig {
        FixedConfig {
            shard_count: 1,
            grow_step: 64,
            ..FixedConfig::test()
        }
    }

    /// Shard id for `key`, narrowed to the wire type.
    fn shard_byte(tree: &Tree8, key: &[u8; 8]) -> u8 {
        tree.shard_for(key) as u8
    }

    #[test]
    fn test_apply_occupied_insert() {
        let dir = tempdir().unwrap();
        let tree: Tree8 = FixedTree::open(dir.path(), test_config()).unwrap();
        let key = 1u64.to_be_bytes();
        let value = 42u64.to_be_bytes();
        let shard = shard_byte(&tree, &key);
        // A fresh OCCUPIED event lands in the requested slot and the index.
        assert_eq!(
            tree.apply_occupied(shard, 5, pack_meta(STATUS_OCCUPIED, 1), &key, &value)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(tree.get(&key), Some(value));
        assert_eq!(tree.get_slot_id(&key), Some(5));
    }

    #[test]
    fn test_apply_deleted_bug1_moved_key() {
        let dir = tempdir().unwrap();
        let tree: Tree8 = FixedTree::open(dir.path(), test_config()).unwrap();
        let key = 7u64.to_be_bytes();
        let value = 7u64.to_be_bytes();
        let shard = shard_byte(&tree, &key);
        // Seed the key into slot 13 at version 10.
        tree.apply_occupied(shard, 13, pack_meta(STATUS_OCCUPIED, 10), &key, &value)
            .unwrap();
        assert_eq!(tree.get_slot_id(&key), Some(13));
        // A DELETE aimed at a *different* slot (5) must leave slot 13's
        // mapping untouched even though it applies to slot 5 itself.
        let del_meta = pack_meta(crate::fixed::slot::STATUS_DELETED, 1);
        assert_eq!(
            tree.apply_deleted(shard, 5, del_meta, &key).unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(
            tree.get_slot_id(&key),
            Some(13),
            "Bug 1 regression: stale DELETE removed a live mapping"
        );
        assert_eq!(tree.get(&key), Some(value));
    }

    #[test]
    fn test_apply_occupied_bug2_stale_cleanup() {
        let dir = tempdir().unwrap();
        let tree: Tree8 = FixedTree::open(dir.path(), test_config()).unwrap();
        let shard = 0u8;
        let key_x = 100u64.to_be_bytes();
        let key_y = 200u64.to_be_bytes();
        let val_x = 11u64.to_be_bytes();
        let val_y = 22u64.to_be_bytes();
        let val_y_new = 33u64.to_be_bytes();
        // Seed: key_X -> slot 3, key_Y -> slot 7.
        tree.apply_occupied(shard, 3, pack_meta(STATUS_OCCUPIED, 2), &key_x, &val_x)
            .unwrap();
        tree.apply_occupied(shard, 7, pack_meta(STATUS_OCCUPIED, 4), &key_y, &val_y)
            .unwrap();
        assert_eq!(tree.get_slot_id(&key_x), Some(3));
        assert_eq!(tree.get_slot_id(&key_y), Some(7));
        // Reassign slot 3 to key_Y at a newer version; key_X's stale index
        // entry for slot 3 must be cleaned up as part of the apply.
        assert_eq!(
            tree.apply_occupied(shard, 3, pack_meta(STATUS_OCCUPIED, 5), &key_y, &val_y_new)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(
            tree.get_slot_id(&key_x),
            None,
            "Bug 2 regression: stale key_X mapping to slot_A not cleaned up"
        );
        assert!(!tree.contains(&key_x));
        assert_eq!(tree.get_slot_id(&key_y), Some(3));
        assert_eq!(tree.get(&key_y), Some(val_y_new));
    }

    #[test]
    fn test_apply_modular_skipped() {
        let dir = tempdir().unwrap();
        let tree: Tree8 = FixedTree::open(dir.path(), test_config()).unwrap();
        let key = 9u64.to_be_bytes();
        let value = 9u64.to_be_bytes();
        let shard = shard_byte(&tree, &key);
        // Establish slot 0 at version 100, then replay older events: both an
        // OCCUPIED and a DELETED at version 50 must be skipped.
        tree.apply_occupied(shard, 0, pack_meta(STATUS_OCCUPIED, 100), &key, &value)
            .unwrap();
        assert_eq!(
            tree.apply_occupied(shard, 0, pack_meta(STATUS_OCCUPIED, 50), &key, &value)
                .unwrap(),
            ApplyOutcome::Skipped
        );
        let stale_del = pack_meta(crate::fixed::slot::STATUS_DELETED, 50);
        assert_eq!(
            tree.apply_deleted(shard, 0, stale_del, &key).unwrap(),
            ApplyOutcome::Skipped
        );
        assert_eq!(tree.get(&key), Some(value));
    }

    #[test]
    fn test_map_apply_occupied_and_deleted() {
        let dir = tempdir().unwrap();
        let map: Map8 = Map8::open(dir.path(), test_config()).unwrap();
        let key = 42u64.to_be_bytes();
        let value = 99u64.to_be_bytes();
        let shard = map.shard_for(&key) as u8;
        // Insert at version 1, then delete at version 2; both must apply.
        assert_eq!(
            map.apply_occupied(shard, 0, pack_meta(STATUS_OCCUPIED, 1), &key, &value)
                .unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(map.get(&key), Some(value));
        assert_eq!(map.get_slot_id(&key), Some(0));
        let del_meta = pack_meta(crate::fixed::slot::STATUS_DELETED, 2);
        assert_eq!(
            map.apply_deleted(shard, 0, del_meta, &key).unwrap(),
            ApplyOutcome::Applied
        );
        assert_eq!(map.get(&key), None);
    }
}