use std::collections::VecDeque;
use std::sync::{Arc, Weak};
#[cfg(not(test))]
use parking_lot::Mutex;
#[cfg(test)]
use tracing_mutex::parkinglot::Mutex;
use crate::BlockId;
use crate::blocks::{
BlockDuplicationPolicy, BlockMetadata, CompleteBlock, ImmutableBlockInner, MutableBlock,
SequenceHash,
};
use crate::metrics::BlockPoolMetrics;
use crate::registry::BlockRegistrationHandle;
use super::SeqHashMap;
/// Index over the blocks that are currently inactive, keyed by sequence hash.
///
/// `BlockStoreInner` owns one boxed implementation and only calls it while the
/// store lock is held. Ordering / eviction policy is entirely up to the
/// implementor and cannot be inferred from this trait.
pub(crate) trait InactiveIndex: Send + Sync {
    /// Looks up `hashes` and returns the `(hash, block id)` pairs that matched.
    ///
    /// NOTE(review): callers in this file treat every returned block as no
    /// longer inactive, so implementations presumably remove matched entries;
    /// the precise meaning of `touch` is defined by the implementor — confirm.
    fn find_matches(
        &mut self,
        hashes: &[SequenceHash],
        touch: bool,
    ) -> Vec<(SequenceHash, BlockId)>;

    /// Single-hash convenience wrapper around [`InactiveIndex::find_matches`]:
    /// returns the first pair produced for `hash`, if any.
    fn find_match(&mut self, hash: SequenceHash, touch: bool) -> Option<(SequenceHash, BlockId)> {
        let single = [hash];
        self.find_matches(&single, touch).into_iter().next()
    }

    /// Variant of [`InactiveIndex::find_matches`] used by the store's scan path.
    ///
    /// NOTE(review): how "scan" differs from "find" is implementor-defined —
    /// confirm against the concrete index.
    fn scan_matches(
        &mut self,
        hashes: &[SequenceHash],
        touch: bool,
    ) -> Vec<(SequenceHash, BlockId)>;

    /// Hands out entries for reuse, at most `count` being requested.
    ///
    /// The store checks the returned length against `count`, so callers must
    /// not assume exactly `count` pairs come back.
    fn allocate(&mut self, count: usize) -> Vec<(SequenceHash, BlockId)>;

    /// Records `block_id` as inactive under `seq_hash`.
    fn insert(&mut self, seq_hash: SequenceHash, block_id: BlockId);

    /// Number of entries currently held by the index.
    fn len(&self) -> usize;

    /// `true` when [`InactiveIndex::len`] reports zero entries.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        let held = self.len();
        held == 0
    }

    /// Whether an entry for `seq_hash` is present.
    fn has(&self, seq_hash: SequenceHash) -> bool;

    /// Takes a specific `(seq_hash, block_id)` entry.
    ///
    /// NOTE(review): the boolean presumably reports whether the entry was
    /// present and taken — confirm with the implementation.
    #[allow(dead_code)]
    fn take(&mut self, seq_hash: SequenceHash, block_id: BlockId) -> bool;

    /// Allocates every entry the index currently reports via `len()`.
    fn allocate_all(&mut self) -> Vec<(SequenceHash, BlockId)> {
        self.allocate(self.len())
    }
}
/// Lifecycle state of a single block slot inside `BlockStore`.
///
/// Transitions are performed by `BlockStore` methods while the store lock is
/// held.
#[allow(dead_code)]
pub(crate) enum SlotState<T: BlockMetadata> {
/// Unused slot; the store sets this state when it pushes the id onto its
/// free queue.
Reset,
/// Slot has been handed out as a `MutableBlock`.
Mutable,
/// Slot has been assigned a sequence hash via `transition_to_staged` but is
/// not registered yet.
Staged { seq_hash: SequenceHash },
/// Registered block that `active_by_hash` points at for `seq_hash`.
Primary {
seq_hash: SequenceHash,
handle: BlockRegistrationHandle,
// Weak so the slot does not keep the immutable block alive by itself.
inner: Weak<ImmutableBlockInner<T>>,
},
/// Registered block created under `BlockDuplicationPolicy::Allow` while a
/// primary for the same hash already existed.
Duplicate {
seq_hash: SequenceHash,
handle: BlockRegistrationHandle,
inner: Weak<ImmutableBlockInner<T>>,
},
/// Block tracked by the inactive index; it keeps its registration handle.
Inactive {
seq_hash: SequenceHash,
handle: BlockRegistrationHandle,
},
}
/// Compact `Debug` output: the variant name plus its `seq_hash` when the
/// variant has one. The registration handle and weak pointer are never printed.
impl<T: BlockMetadata> std::fmt::Debug for SlotState<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Hash-less variants are written as a bare name; every other variant
        // shares one struct-style rendering with a single `seq_hash` field.
        let (variant, seq_hash) = match self {
            SlotState::Reset => return f.write_str("Reset"),
            SlotState::Mutable => return f.write_str("Mutable"),
            SlotState::Staged { seq_hash } => ("Staged", seq_hash),
            SlotState::Primary { seq_hash, .. } => ("Primary", seq_hash),
            SlotState::Duplicate { seq_hash, .. } => ("Duplicate", seq_hash),
            SlotState::Inactive { seq_hash, .. } => ("Inactive", seq_hash),
        };
        f.debug_struct(variant).field("seq_hash", seq_hash).finish()
    }
}
/// One entry of the store's slot table: a block's size and its current state.
#[derive(Debug)]
pub(crate) struct BlockSlot<T: BlockMetadata> {
// Copied from the store's `block_size` when the slot is created.
pub(crate) block_size: usize,
// Current lifecycle state; only mutated under the store lock.
pub(crate) state: SlotState<T>,
}
/// Mutable state of a `BlockStore`, kept behind the store's mutex.
pub(crate) struct BlockStoreInner<T: BlockMetadata> {
// Slot table, indexed by `BlockId`.
slots: Vec<BlockSlot<T>>,
// Ids of `Reset` slots; allocation pops from the front, release pushes to the back.
free: VecDeque<BlockId>,
// Index of the slots that are in the `Inactive` state.
inactive: Box<dyn InactiveIndex>,
// Sequence hash -> block id of the slot currently registered as `Primary`.
active_by_hash: SeqHashMap<BlockId>,
// Per-slot flag (indexed by `BlockId`) read by `release_primary` to choose
// between the free queue and the inactive index.
reset_on_release: Vec<bool>,
}
/// Fixed-size pool of block slots with a mutex-guarded state machine.
pub(crate) struct BlockStore<T: BlockMetadata> {
// Obtained from `crate::ManagerId::next()` at construction.
id: crate::ManagerId,
// All mutable state; every state transition locks this.
inner: Mutex<BlockStoreInner<T>>,
// Block size given to `new`; also copied into every slot.
block_size: usize,
// Number of slots created by `new`.
total_blocks: usize,
metrics: Arc<BlockPoolMetrics>,
// Value each slot's reset-on-release flag is (re)initialised to on allocation.
default_reset_on_release: bool,
// Test-only: `release_primary` locks this on entry, so a test holding the
// guard from `pause_release_primary` stalls releases.
#[cfg(test)]
release_primary_gate: Mutex<()>,
// Test-only: incremented each time `release_primary` is entered.
#[cfg(test)]
release_primary_arrivals: std::sync::atomic::AtomicU64,
}
#[allow(dead_code)]
impl<T: BlockMetadata + Sync> BlockStore<T> {
/// Builds a store with `total_blocks` slots, all in [`SlotState::Reset`] and
/// queued on the free list in ascending id order.
///
/// Every slot receives the same `block_size`, and each per-slot
/// reset-on-release flag starts out as `default_reset_on_release`. The store
/// is returned inside an `Arc` because the blocks it hands out later hold a
/// clone of that `Arc`.
pub(crate) fn new(
    total_blocks: usize,
    block_size: usize,
    inactive: Box<dyn InactiveIndex>,
    metrics: Arc<BlockPoolMetrics>,
    default_reset_on_release: bool,
) -> Arc<Self> {
    let mut slots = Vec::with_capacity(total_blocks);
    slots.extend((0..total_blocks).map(|_| BlockSlot {
        block_size,
        state: SlotState::Reset,
    }));

    let mut free = VecDeque::with_capacity(total_blocks);
    free.extend(0..total_blocks);

    let state = BlockStoreInner {
        slots,
        free,
        inactive,
        active_by_hash: SeqHashMap::default(),
        reset_on_release: vec![default_reset_on_release; total_blocks],
    };

    Arc::new(Self {
        id: crate::ManagerId::next(),
        inner: Mutex::new(state),
        block_size,
        total_blocks,
        metrics,
        default_reset_on_release,
        #[cfg(test)]
        release_primary_gate: Mutex::new(()),
        #[cfg(test)]
        release_primary_arrivals: std::sync::atomic::AtomicU64::new(0),
    })
}
/// Returns the reset-on-release default this store was constructed with.
pub(crate) fn default_reset_on_release(&self) -> bool {
self.default_reset_on_release
}
/// Overrides the reset-on-release flag of slot `block_id`.
///
/// # Panics
/// Panics if `block_id` is outside the slot table.
pub(crate) fn store_reset_on_release(&self, block_id: BlockId, value: bool) {
    let mut guard = self.inner.lock();
    guard.reset_on_release[block_id] = value;
}
/// Returns the `ManagerId` assigned to this store at construction.
pub(crate) fn id(&self) -> crate::ManagerId {
self.id
}
/// Test-only: locks the gate that `release_primary` acquires on entry.
///
/// While the returned guard is alive, calls to `release_primary` block at the
/// gate (after bumping the arrivals counter).
#[cfg(test)]
pub(crate) fn pause_release_primary(&self) -> tracing_mutex::parkinglot::MutexGuard<'_, ()> {
self.release_primary_gate.lock()
}
/// Test-only: number of times `release_primary` has been entered so far.
#[cfg(test)]
pub(crate) fn release_primary_arrivals(&self) -> u64 {
    use std::sync::atomic::Ordering;

    let arrivals = &self.release_primary_arrivals;
    arrivals.load(Ordering::Acquire)
}
/// Returns the block size this store was constructed with.
pub(crate) fn block_size(&self) -> usize {
self.block_size
}
/// Returns the number of slots this store was constructed with.
pub(crate) fn total_blocks(&self) -> usize {
self.total_blocks
}
/// Borrows the metrics sink shared with this store.
pub(crate) fn metrics(&self) -> &Arc<BlockPoolMetrics> {
&self.metrics
}
/// Number of slots currently sitting on the free queue.
pub(crate) fn reset_len(&self) -> usize {
    let guard = self.inner.lock();
    guard.free.len()
}
/// Number of entries the inactive index currently reports.
pub(crate) fn inactive_len(&self) -> usize {
    let guard = self.inner.lock();
    guard.inactive.len()
}
pub(crate) fn available_len(&self) -> usize {
let inner = self.inner.lock();
inner.free.len() + inner.inactive.len()
}
/// Whether the inactive index holds an entry for `seq_hash`.
pub(crate) fn has_inactive(&self, seq_hash: SequenceHash) -> bool {
    let guard = self.inner.lock();
    guard.inactive.has(seq_hash)
}
/// Block size recorded on slot `block_id`.
///
/// # Panics
/// Panics if `block_id` is outside the slot table.
pub(crate) fn slot_block_size(&self, block_id: BlockId) -> usize {
    let guard = self.inner.lock();
    guard.slots[block_id].block_size
}
pub(crate) fn allocate_reset_blocks(self: &Arc<Self>, count: usize) -> Vec<MutableBlock<T>> {
let mut inner = self.inner.lock();
let take = std::cmp::min(count, inner.free.len());
let mut out = Vec::with_capacity(take);
for _ in 0..take {
let id = inner.free.pop_front().unwrap();
inner.slots[id].state = SlotState::Mutable;
inner.reset_on_release[id] = self.default_reset_on_release;
let block_size = inner.slots[id].block_size;
out.push(MutableBlock::from_store(self.clone(), id, block_size));
}
self.metrics.dec_reset_pool_size_by(take as i64);
self.metrics.inc_inflight_mutable_by(take as i64);
self.metrics.inc_allocations(take as u64);
self.metrics.inc_allocations_from_reset(take as u64);
out
}
/// Allocates exactly `count` mutable blocks, or none at all.
///
/// Blocks come from the free queue first; any shortfall is covered by entries
/// handed out by the inactive index. On success returns the blocks together
/// with the sequence hashes of the inactive entries that were evicted.
///
/// Returns `None` when fewer than `count` blocks are available, or when the
/// inactive index hands out a different number of entries than requested; in
/// the latter case everything already taken is put back and the rollback
/// metric is bumped. `count == 0` succeeds with two empty vectors without
/// taking the lock.
///
/// # Panics
/// Panics if the inactive index returns a block whose slot is not `Inactive`.
pub(crate) fn allocate_atomic(
self: &Arc<Self>,
count: usize,
) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
if count == 0 {
return Some((Vec::new(), Vec::new()));
}
let mut inner = self.inner.lock();
if inner.free.len() + inner.inactive.len() < count {
return None;
}
// Prefer reset blocks; only the remainder is evicted from the inactive index.
let from_reset = std::cmp::min(count, inner.free.len());
let from_inactive = count - from_reset;
let mut reset_ids: Vec<BlockId> = Vec::with_capacity(from_reset);
for _ in 0..from_reset {
reset_ids.push(inner.free.pop_front().unwrap());
}
let evicted_pairs = if from_inactive > 0 {
inner.inactive.allocate(from_inactive)
} else {
Vec::new()
};
// The index's reported length and what it actually handed out disagree:
// undo both halves so the call has all-or-nothing semantics.
if evicted_pairs.len() != from_inactive {
for (h, id) in evicted_pairs {
inner.inactive.insert(h, id);
}
// Reverse + push_front restores the free queue's original front order.
for id in reset_ids.into_iter().rev() {
inner.free.push_front(id);
}
self.metrics.inc_allocate_atomic_rollback();
return None;
}
let mut blocks = Vec::with_capacity(count);
for id in reset_ids {
inner.slots[id].state = SlotState::Mutable;
inner.reset_on_release[id] = self.default_reset_on_release;
let block_size = inner.slots[id].block_size;
blocks.push(MutableBlock::from_store(self.clone(), id, block_size));
}
let mut evicted = Vec::with_capacity(from_inactive);
let mut handles = Vec::with_capacity(from_inactive);
for (seq_hash, block_id) in evicted_pairs {
// Keep the registration handle so it can be marked absent once the lock is gone.
let handle = take_inactive_handle(&mut inner.slots[block_id], block_id);
inner.slots[block_id].state = SlotState::Mutable;
inner.reset_on_release[block_id] = self.default_reset_on_release;
let block_size = inner.slots[block_id].block_size;
blocks.push(MutableBlock::from_store(self.clone(), block_id, block_size));
evicted.push(seq_hash);
handles.push(handle);
}
self.metrics.dec_reset_pool_size_by(from_reset as i64);
self.metrics.dec_inactive_pool_size_by(from_inactive as i64);
self.metrics.inc_inflight_mutable_by(count as i64);
self.metrics.inc_evictions(from_inactive as u64);
self.metrics.inc_allocations(count as u64);
self.metrics.inc_allocations_from_reset(from_reset as u64);
// `mark_absent` deliberately runs after the store lock is released.
// NOTE(review): presumably so registry work never runs under the store
// lock — confirm against `BlockRegistrationHandle`.
drop(inner);
for h in handles {
h.mark_absent::<T>();
}
Some((blocks, evicted))
}
/// Converts every entry the inactive index hands out into a mutable block.
///
/// Each drained slot becomes `Mutable` with its reset-on-release flag restored
/// to the store default. The registration handles of the drained blocks are
/// marked absent only after the store lock has been released.
///
/// # Panics
/// Panics if the inactive index returns a block whose slot is not `Inactive`.
pub(crate) fn drain_inactive_to_mutable(self: &Arc<Self>) -> Vec<MutableBlock<T>> {
    let mut guard = self.inner.lock();
    let state = &mut *guard;

    let drained = state.inactive.allocate_all();
    let total = drained.len();
    let mut retired = Vec::with_capacity(total);
    let mut blocks = Vec::with_capacity(total);

    for (_, block_id) in drained {
        let slot = &mut state.slots[block_id];
        let handle = take_inactive_handle(slot, block_id);
        slot.state = SlotState::Mutable;
        let block_size = slot.block_size;
        state.reset_on_release[block_id] = self.default_reset_on_release;
        retired.push(handle);
        blocks.push(MutableBlock::from_store(Arc::clone(self), block_id, block_size));
    }

    self.metrics.dec_inactive_pool_size_by(total as i64);
    self.metrics.inc_inflight_mutable_by(total as i64);

    // Release the lock before touching the registration handles.
    drop(guard);
    for handle in retired {
        handle.mark_absent::<T>();
    }
    blocks
}
/// Promotes the inactive blocks matched by the index's `scan_matches` for
/// `hashes` back to primaries and returns them paired with their hashes.
///
/// Thin wrapper over `promote_inactive` with `scan = true`.
pub(crate) fn scan_inactive_primaries(
self: &Arc<Self>,
hashes: &[SequenceHash],
touch: bool,
) -> Vec<(SequenceHash, Arc<ImmutableBlockInner<T>>)> {
self.promote_inactive(hashes, touch, true)
}
pub(crate) fn acquire_for_hash(
self: &Arc<Self>,
seq_hash: SequenceHash,
touch: bool,
) -> Option<Arc<ImmutableBlockInner<T>>> {
let mut inner = self.inner.lock();
self.acquire_for_hash_locked(&mut inner, seq_hash, touch)
}
/// Lock-held lookup of the primary block for `seq_hash`.
///
/// Resolution order:
/// 1. If `active_by_hash` has an entry whose `Weak` still upgrades, that live
///    primary is returned.
/// 2. If the entry exists but the `Weak` is dead, the slot is first moved to
///    the inactive index, then handled by step 3.
/// 3. A hit in the inactive index is turned back into a `Primary` slot backed
///    by a freshly created `ImmutableBlockInner`.
///
/// Returns `None` when the hash is found in neither place.
///
/// # Panics
/// Panics if `active_by_hash` points at a slot that is not `Primary`, or if
/// the inactive index returns a slot that is not `Inactive`.
fn acquire_for_hash_locked(
self: &Arc<Self>,
inner: &mut BlockStoreInner<T>,
seq_hash: SequenceHash,
touch: bool,
) -> Option<Arc<ImmutableBlockInner<T>>> {
if let Some(&block_id) = inner.active_by_hash.get(&seq_hash) {
let live: Option<Arc<ImmutableBlockInner<T>>> = match &inner.slots[block_id].state {
SlotState::Primary { inner: weak, .. } => weak.upgrade(),
other => panic!("active_by_hash[{seq_hash:?}] = {block_id} but slot is {other:?}"),
};
if let Some(arc) = live {
return Some(arc);
}
// The last strong reference is gone but the slot has not been released
// yet: park it in the inactive index so the lookup below can revive it.
self.eager_primary_to_inactive_locked(inner, seq_hash, block_id);
}
let block_id = inner.inactive.find_match(seq_hash, touch)?.1;
self.metrics.dec_inactive_pool_size();
let handle = take_inactive_handle(&mut inner.slots[block_id], block_id);
let inner_arc =
ImmutableBlockInner::new_primary(self.clone(), block_id, seq_hash, handle.clone());
// The slot only keeps a weak reference; the caller owns the strong one.
inner.slots[block_id].state = SlotState::Primary {
seq_hash,
handle,
inner: Arc::downgrade(&inner_arc),
};
inner.active_by_hash.insert(seq_hash, block_id);
Some(inner_arc)
}
/// Acquires primaries for the longest leading run of `hashes` that resolves.
///
/// All lookups happen under one lock acquisition, with `touch = false`. The
/// walk stops at the first hash that cannot be acquired, so the result is
/// always a prefix of `hashes` (possibly empty).
pub(crate) fn match_prefix_locked_batch(
    self: &Arc<Self>,
    hashes: &[SequenceHash],
) -> Vec<Arc<ImmutableBlockInner<T>>> {
    let mut guard = self.inner.lock();
    let mut matched = Vec::with_capacity(hashes.len());
    matched.extend(
        hashes
            .iter()
            .map_while(|&hash| self.acquire_for_hash_locked(&mut guard, hash, false)),
    );
    matched
}
/// Registers a completed block under its sequence hash and returns the
/// immutable block the caller should use from now on.
///
/// Outcomes, decided under the store lock:
/// * no primary exists for the hash: the block's slot becomes `Primary` and
///   is recorded in `active_by_hash`;
/// * a primary exists and `policy` is `Allow`: the slot becomes `Duplicate`,
///   built around the existing primary;
/// * a primary exists and `policy` is `Reject`: the existing primary is
///   returned and the incoming block is re-armed and dropped once the lock
///   has been released.
///
/// `handle.mark_present` is called (after the lock is released) only when a
/// new `Primary` or `Duplicate` was created.
///
/// # Panics
/// Panics if the existing primary for the hash occupies the same `block_id`
/// as the incoming block.
pub(crate) fn register_completed_block(
self: &Arc<Self>,
block: CompleteBlock<T>,
handle: BlockRegistrationHandle,
policy: BlockDuplicationPolicy,
) -> Arc<ImmutableBlockInner<T>> {
let block_id = block.block_id();
let seq_hash = block.sequence_hash();
debug_assert_eq!(seq_hash, handle.seq_hash());
let mut block = block;
// NOTE(review): presumably `disarm` suppresses the block's drop-time
// release while registration is in flight and `rearm` restores it for the
// Reject path — confirm against `CompleteBlock`.
block.disarm();
let mut inner = self.inner.lock();
let existing = self.acquire_for_hash_locked(&mut inner, seq_hash, false);
let mut presence_added = false;
let result = if let Some(existing_primary) = existing {
assert_ne!(
existing_primary.block_id(),
block_id,
"register_completed_block: collision with same block_id {block_id}"
);
match policy {
BlockDuplicationPolicy::Allow => {
debug_assert!(matches!(
inner.slots[block_id].state,
SlotState::Staged { .. }
));
let inner_arc = ImmutableBlockInner::new_duplicate(
self.clone(),
block_id,
seq_hash,
handle.clone(),
existing_primary,
);
inner.slots[block_id].state = SlotState::Duplicate {
seq_hash,
handle: handle.clone(),
inner: Arc::downgrade(&inner_arc),
};
self.metrics.inc_duplicate_blocks();
presence_added = true;
inner_arc
}
BlockDuplicationPolicy::Reject => {
self.metrics.inc_registration_dedup();
block.rearm();
existing_primary
}
}
} else {
debug_assert!(matches!(
inner.slots[block_id].state,
SlotState::Staged { .. }
));
let inner_arc =
ImmutableBlockInner::new_primary(self.clone(), block_id, seq_hash, handle.clone());
inner.slots[block_id].state = SlotState::Primary {
seq_hash,
handle: handle.clone(),
inner: Arc::downgrade(&inner_arc),
};
inner.active_by_hash.insert(seq_hash, block_id);
presence_added = true;
inner_arc
};
// Everything below runs without the store lock: the presence update and
// the drop of the incoming block.
drop(inner);
if presence_added {
handle.mark_present::<T>();
}
drop(block);
result
}
/// Moves a `Primary` slot into the inactive index while the store lock is held.
///
/// Used by the lookup path when `active_by_hash` still references a primary
/// whose weak pointer no longer upgrades. The slot's per-slot reset-on-release
/// flag is not consulted here.
///
/// # Panics
/// Panics if slot `block_id` is not in the `Primary` state.
fn eager_primary_to_inactive_locked(
    &self,
    inner: &mut BlockStoreInner<T>,
    seq_hash: SequenceHash,
    block_id: BlockId,
) {
    let slot = &mut inner.slots[block_id];
    let SlotState::Primary { handle, .. } = &slot.state else {
        let other = &slot.state;
        panic!("eager_primary_to_inactive: slot {block_id} was {other:?}");
    };
    let handle = handle.clone();

    slot.state = SlotState::Inactive { seq_hash, handle };
    inner.inactive.insert(seq_hash, block_id);
    inner.active_by_hash.remove(&seq_hash);

    self.metrics.inc_inactive_pool_size();
    self.metrics.inc_eager_primary_to_inactive();
    tracing::trace!(
        ?seq_hash,
        block_id,
        "Eager Primary → Inactive (lookup-driven)"
    );
}
/// Turns matching inactive blocks back into primaries.
///
/// With `scan == true` the index's `scan_matches` decides which of `hashes`
/// match. Otherwise the first hash must match via `find_match` (an empty
/// result is returned if it does not, or if `hashes` is empty) and the
/// remaining hashes are resolved with `find_matches`.
///
/// Every matched slot becomes `Primary`, is recorded in `active_by_hash`, and
/// is returned paired with its hash, in the order the index reported it.
///
/// # Panics
/// Panics if the index returns a block whose slot is not `Inactive`.
fn promote_inactive(
    self: &Arc<Self>,
    hashes: &[SequenceHash],
    touch: bool,
    scan: bool,
) -> Vec<(SequenceHash, Arc<ImmutableBlockInner<T>>)> {
    let mut guard = self.inner.lock();
    let state = &mut *guard;

    let hits: Vec<(SequenceHash, BlockId)> = if scan {
        state.inactive.scan_matches(hashes, touch)
    } else {
        match hashes.split_first() {
            None => return Vec::new(),
            Some((&head, tail)) => match state.inactive.find_match(head, touch) {
                None => return Vec::new(),
                Some(first_hit) => {
                    let mut hits = Vec::with_capacity(hashes.len());
                    hits.push(first_hit);
                    if !tail.is_empty() {
                        hits.extend(state.inactive.find_matches(tail, touch));
                    }
                    hits
                }
            },
        }
    };

    self.metrics.dec_inactive_pool_size_by(hits.len() as i64);

    let mut promoted = Vec::with_capacity(hits.len());
    for (seq_hash, block_id) in hits {
        let slot = &mut state.slots[block_id];
        let handle = take_inactive_handle(slot, block_id);
        let primary = ImmutableBlockInner::new_primary(
            Arc::clone(self),
            block_id,
            seq_hash,
            handle.clone(),
        );
        // The slot keeps only a weak reference to the new primary.
        slot.state = SlotState::Primary {
            seq_hash,
            handle,
            inner: Arc::downgrade(&primary),
        };
        state.active_by_hash.insert(seq_hash, block_id);
        promoted.push((seq_hash, primary));
    }
    promoted
}
/// Returns a `Mutable` slot to the free queue.
///
/// In debug builds, asserts that the slot really is `Mutable`.
pub(crate) fn release_mutable(&self, block_id: BlockId) {
    let mut inner = self.inner.lock();
    debug_assert!(matches!(inner.slots[block_id].state, SlotState::Mutable));
    // Marks the slot Reset, queues it, and bumps the reset-pool gauge.
    self.reset_slot_locked(&mut inner, block_id);
    self.metrics.dec_inflight_mutable();
}
/// Moves a `Mutable` slot to `Staged`, recording the sequence hash it was
/// completed with.
///
/// In debug builds, asserts that the slot really is `Mutable`.
pub(crate) fn transition_to_staged(&self, block_id: BlockId, seq_hash: SequenceHash) {
    let mut inner = self.inner.lock();
    debug_assert!(matches!(inner.slots[block_id].state, SlotState::Mutable));
    let slot = &mut inner.slots[block_id];
    slot.state = SlotState::Staged { seq_hash };
    self.metrics.dec_inflight_mutable();
    self.metrics.inc_stagings();
}
/// Reverts a `Staged` slot to `Mutable` and restores its reset-on-release
/// flag to the store default.
///
/// In debug builds, asserts that the slot really is `Staged`.
pub(crate) fn transition_back_to_mutable(&self, block_id: BlockId) {
    let mut inner = self.inner.lock();
    debug_assert!(matches!(
        inner.slots[block_id].state,
        SlotState::Staged { .. }
    ));
    let state = &mut *inner;
    state.slots[block_id].state = SlotState::Mutable;
    state.reset_on_release[block_id] = self.default_reset_on_release;
    self.metrics.inc_inflight_mutable();
}
/// Returns a `Staged` slot to the free queue without registering it.
///
/// In debug builds, asserts that the slot really is `Staged`.
pub(crate) fn release_staged(&self, block_id: BlockId) {
    let mut inner = self.inner.lock();
    debug_assert!(matches!(
        inner.slots[block_id].state,
        SlotState::Staged { .. }
    ));
    // Marks the slot Reset, queues it, and bumps the reset-pool gauge.
    self.reset_slot_locked(&mut inner, block_id);
}
/// Releases the primary block occupying `block_id`.
///
/// `self_ptr` identifies the `ImmutableBlockInner` performing the release.
/// The call is a counted no-op unless the slot is `Primary` and its `Weak`
/// points at that same allocation, so a slot that has since been handed to a
/// different primary is left untouched.
///
/// When the slot's reset-on-release flag is set, the slot goes back to the
/// free queue and its handle is marked absent after the lock is dropped.
/// Otherwise the slot becomes `Inactive` and is inserted into the inactive
/// index. In both cases the hash is removed from `active_by_hash`.
pub(crate) fn release_primary(&self, block_id: BlockId, self_ptr: *const ()) {
// Test hooks: count the arrival first, then wait on the gate so a test
// holding `pause_release_primary` can observe a release parked here.
#[cfg(test)]
self.release_primary_arrivals
.fetch_add(1, std::sync::atomic::Ordering::Release);
#[cfg(test)]
let _gate = self.release_primary_gate.lock();
let handle_to_mark_absent = {
let mut inner = self.inner.lock();
let (seq_hash, handle) = match &inner.slots[block_id].state {
SlotState::Primary {
seq_hash,
handle,
inner: weak,
// Pointer identity check: only the block the slot currently tracks may release it.
} if weak.as_ptr() as *const () == self_ptr => (*seq_hash, handle.clone()),
_ => {
self.metrics.inc_release_primary_noop();
return;
}
};
let reset_on_release = inner.reset_on_release[block_id];
if reset_on_release {
self.reset_slot_locked(&mut inner, block_id);
inner.active_by_hash.remove(&seq_hash);
tracing::trace!(?seq_hash, block_id, "Primary released to reset pool");
Some(handle)
} else {
inner.slots[block_id].state = SlotState::Inactive { seq_hash, handle };
inner.inactive.insert(seq_hash, block_id);
inner.active_by_hash.remove(&seq_hash);
self.metrics.inc_inactive_pool_size();
tracing::trace!(?seq_hash, block_id, "Block stored in inactive pool");
None
}
};
// The store lock is released at the end of the block above; the handle is
// only marked absent on the reset path.
if let Some(handle) = handle_to_mark_absent {
handle.mark_absent::<T>();
}
}
/// Releases the duplicate block occupying `block_id` back to the free queue.
///
/// Nothing happens (apart from bumping the no-op metric) unless the slot is
/// `Duplicate` and its `Weak` points at `self_ptr`. On a real release the
/// registration handle is marked absent after the store lock has been dropped.
pub(crate) fn release_duplicate(&self, block_id: BlockId, self_ptr: *const ()) {
    let mut guard = self.inner.lock();

    let owned_handle = match &guard.slots[block_id].state {
        SlotState::Duplicate {
            handle,
            inner: weak,
            ..
        } if weak.as_ptr() as *const () == self_ptr => Some(handle.clone()),
        _ => None,
    };
    let Some(handle) = owned_handle else {
        self.metrics.inc_release_duplicate_noop();
        return;
    };

    self.reset_slot_locked(&mut guard, block_id);
    drop(guard);
    handle.mark_absent::<T>();
}
fn reset_slot_locked(&self, inner: &mut BlockStoreInner<T>, block_id: BlockId) {
inner.slots[block_id].state = SlotState::Reset;
inner.free.push_back(block_id);
self.metrics.inc_reset_pool_size();
}
}
/// `Debug` output limited to the immutable sizing fields; the lock is not
/// taken and no slot state is printed.
impl<T: BlockMetadata> std::fmt::Debug for BlockStore<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("BlockStore");
        out.field("block_size", &self.block_size);
        out.field("total_blocks", &self.total_blocks);
        out.finish()
    }
}
/// Returns the registration handle stored in an `Inactive` slot.
///
/// Despite the name, the handle is cloned and the slot's state is left
/// untouched; callers overwrite the state themselves right afterwards.
///
/// # Panics
/// Panics if `slot` is not in the `Inactive` state.
fn take_inactive_handle<T: BlockMetadata>(
    slot: &mut BlockSlot<T>,
    block_id: BlockId,
) -> BlockRegistrationHandle {
    if let SlotState::Inactive { handle, .. } = &slot.state {
        return handle.clone();
    }
    let other = &slot.state;
    panic!("expected Inactive state for slot {block_id}, got {other:?}")
}
/// Resolves a registration handle to its immutable block in `store`.
///
/// Only the handle's sequence hash is used; the lookup itself (live primary
/// first, inactive index second) is `BlockStore::acquire_for_hash`.
pub(crate) fn upgrade_or_resurrect<T: BlockMetadata + Sync>(
    handle: &BlockRegistrationHandle,
    store: &Arc<BlockStore<T>>,
    touch: bool,
) -> Option<Arc<ImmutableBlockInner<T>>> {
    let seq_hash = handle.seq_hash();
    store.acquire_for_hash(seq_hash, touch)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pools::IdBuildHasher;

    /// Distinct keys shared by the tests below. The first two keys share
    /// their first argument and differ only in the remaining ones.
    fn sample_keys() -> Vec<SequenceHash> {
        vec![
            SequenceHash::new(0x1234, None, 0),
            SequenceHash::new(0x1234, Some(0x1234), 1),
            SequenceHash::new(0x5678, Some(0x1234), 2),
            SequenceHash::new(0xdead_beef, Some(0x5678), 3),
            SequenceHash::new(0xffff_ffff_ffff_ffff, Some(0xdead_beef), 255),
        ]
    }

    /// Insert, lookup, overwrite and remove all behave like a regular map.
    #[test]
    fn seq_hash_map_round_trips_keys() {
        let keys = sample_keys();
        let mut map: SeqHashMap<u32> = SeqHashMap::default();
        for (i, &k) in keys.iter().enumerate() {
            map.insert(k, i as u32);
        }
        assert_eq!(map.len(), keys.len(), "no key collisions / overwrites");
        for (i, &k) in keys.iter().enumerate() {
            assert_eq!(map.get(&k).copied(), Some(i as u32), "round-trip key {i}");
        }
        map.insert(keys[0], 999);
        assert_eq!(map.get(&keys[0]).copied(), Some(999));
        assert_eq!(map.remove(&keys[1]), Some(1));
        assert!(!map.contains_key(&keys[1]));
    }

    /// Every sample key must hash to its own digest.
    #[test]
    fn id_hasher_distinguishes_distinct_keys() {
        use std::collections::HashSet;
        use std::hash::BuildHasher;

        let keys = sample_keys();
        let digests: HashSet<u64> = keys.iter().map(|k| IdBuildHasher.hash_one(k)).collect();
        // Compare against the sample size itself so the expectation cannot
        // drift when keys are added to or removed from `sample_keys`.
        assert_eq!(
            digests.len(),
            keys.len(),
            "distinct keys must produce distinct digests"
        );
    }
}