mod builder;
#[cfg(test)]
mod tests;
pub use builder::{
BlockManagerBuilderError, BlockManagerConfigBuilder, BlockManagerResetError,
FrequencyTrackingCapacity, InactiveBackendConfig,
};
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use crate::blocks::{BlockMetadata, CompleteBlock, ImmutableBlock, MutableBlock, UpgradeFn};
use crate::metrics::BlockPoolMetrics;
use crate::pools::{ActivePool, BlockDuplicationPolicy, InactivePool, ResetPool, SequenceHash};
use crate::registry::BlockRegistry;
/// Front end over the reset, active, and inactive block pools plus the block
/// registry: it allocates mutable blocks, registers completed blocks, and
/// looks up already-registered blocks by sequence hash.
pub struct BlockManager<T: BlockMetadata> {
/// Pool that allocation requests are served from first.
reset_pool: ResetPool<T>,
/// Pool consulted first by `match_blocks` and `scan_matches`.
active_pool: ActivePool<T>,
/// Pool used as the allocation fallback (it also reports evicted hashes)
/// and as the second tier for lookups.
inactive_pool: InactivePool<T>,
/// Registry that hands out a registration handle per sequence hash.
block_registry: BlockRegistry,
/// Policy handed to the registry handle on every `register_block` call.
duplication_policy: BlockDuplicationPolicy,
/// Cloned into every `ImmutableBlock` this manager constructs.
upgrade_fn: UpgradeFn<T>,
/// Held for the whole of `allocate_blocks_with_evictions` so that the
/// two-pool allocation runs as one critical section.
allocate_mutex: Mutex<()>,
/// Block count the reset pool is expected to hold after
/// `reset_inactive_pool`; also returned by `total_blocks()`.
total_blocks: usize,
/// Value returned by `block_size()`.
/// NOTE(review): the unit (tokens? bytes?) is not visible here; confirm in the builder.
block_size: usize,
/// Shared metrics handle updated by allocation, registration, and lookup,
/// and cloned into every `ImmutableBlock` this manager constructs.
metrics: Arc<BlockPoolMetrics>,
}
impl<T: BlockMetadata> BlockManager<T> {
/// Returns a fresh configuration builder for a `BlockManager<T>`.
///
/// The builder starts from `BlockManagerConfigBuilder`'s `default()` state.
pub fn builder() -> BlockManagerConfigBuilder<T> {
    BlockManagerConfigBuilder::<T>::default()
}
/// Allocates `count` mutable blocks, discarding the list of evicted hashes.
///
/// This is a thin wrapper over [`Self::allocate_blocks_with_evictions`];
/// it returns `None` exactly when that method does.
pub fn allocate_blocks(&self, count: usize) -> Option<Vec<MutableBlock<T>>> {
    let (blocks, _evicted) = self.allocate_blocks_with_evictions(count)?;
    Some(blocks)
}
/// Allocates `count` mutable blocks and reports which sequence hashes were
/// evicted to make room.
///
/// The reset pool is asked first; whatever it could not supply is requested
/// from the inactive pool. The whole operation runs under `allocate_mutex`.
///
/// # Returns
///
/// * `Some((blocks, evicted))` - the allocated blocks (reset-pool blocks
///   first, then inactive-pool blocks) and the hashes reported as evicted by
///   the inactive pool.
/// * `None` - the inactive pool could not supply the remainder. Blocks
///   already taken from the reset pool are dropped and no metrics are
///   recorded.
pub fn allocate_blocks_with_evictions(
    &self,
    count: usize,
) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
    // Declared first so it is released last, after any local blocks are dropped.
    let _guard = self.allocate_mutex.lock();

    let mut blocks = self.reset_pool.allocate_blocks(count);
    let from_reset_count = blocks.len();

    // Whatever the reset pool could not cover must come from the inactive pool.
    let shortfall = count - from_reset_count;
    let (from_inactive, evicted) = self.inactive_pool.allocate_blocks(shortfall)?;

    // Every block obtained from the inactive pool is counted as an eviction.
    let eviction_count = from_inactive.len() as u64;
    blocks.extend(from_inactive);

    self.metrics.inc_allocations(blocks.len() as u64);
    self.metrics
        .inc_allocations_from_reset(from_reset_count as u64);
    self.metrics.inc_evictions(eviction_count);

    Some((blocks, evicted))
}
/// Empties the inactive pool and verifies that the reset pool then holds
/// every block.
///
/// All blocks are taken out of the inactive pool and dropped immediately.
/// NOTE(review): presumably dropping them hands them back to the reset pool
/// (the count check below relies on that); confirm in the block types.
///
/// # Errors
///
/// Returns `BlockManagerResetError::BlockCountMismatch` when the reset pool
/// length differs from `total_blocks` afterwards.
pub fn reset_inactive_pool(&self) -> Result<(), BlockManagerResetError> {
    drop(self.inactive_pool.allocate_all_blocks());

    let actual = self.reset_pool.len();
    let expected = self.total_blocks;
    if actual == expected {
        Ok(())
    } else {
        Err(BlockManagerResetError::BlockCountMismatch { expected, actual })
    }
}
/// Registers each completed block in order and returns the resulting
/// immutable blocks in the same order.
///
/// Each element is processed by [`Self::register_block`].
pub fn register_blocks(&self, blocks: Vec<CompleteBlock<T>>) -> Vec<ImmutableBlock<T>> {
    let mut registered = Vec::with_capacity(blocks.len());
    for block in blocks {
        registered.push(self.register_block(block));
    }
    registered
}
pub fn register_block(&self, block: CompleteBlock<T>) -> ImmutableBlock<T> {
self.metrics.inc_registrations();
let handle = self
.block_registry
.register_sequence_hash(block.sequence_hash());
let registered_block = handle.register_block(
block,
self.duplication_policy,
&self.inactive_pool,
Some(self.metrics.as_ref()),
);
ImmutableBlock::new(
registered_block,
self.upgrade_fn.clone(),
Some(self.metrics.clone()),
)
}
pub fn match_blocks(&self, seq_hash: &[SequenceHash]) -> Vec<ImmutableBlock<T>> {
self.metrics
.inc_match_hashes_requested(seq_hash.len() as u64);
tracing::debug!(
num_hashes = seq_hash.len(),
inactive_pool_len = self.inactive_pool.len(),
"match_blocks called"
);
let Some((&first_hash, remaining_after_first)) = seq_hash.split_first() else {
tracing::debug!(total_matched = 0, "match_blocks result");
return Vec::new();
};
let mut matched: Vec<ImmutableBlock<T>>;
let active_matched;
if let Some(first_block) = self.active_pool.find_match(first_hash, true) {
matched = Vec::with_capacity(seq_hash.len());
matched.push(ImmutableBlock::new(
first_block,
self.upgrade_fn.clone(),
Some(self.metrics.clone()),
));
if !remaining_after_first.is_empty() {
matched.extend(
self.active_pool
.find_matches(remaining_after_first, true)
.into_iter()
.map(|block| {
ImmutableBlock::new(
block,
self.upgrade_fn.clone(),
Some(self.metrics.clone()),
)
}),
);
}
active_matched = matched.len();
} else {
let inactive_found = self.inactive_pool.find_blocks(seq_hash, true);
let inactive_matched = inactive_found.len();
tracing::debug!(
remaining_to_check = seq_hash.len(),
inactive_matched,
"Matched from inactive pool"
);
if inactive_found.is_empty() {
self.metrics.inc_match_blocks_returned(0);
tracing::debug!(total_matched = 0, "match_blocks result");
return Vec::new();
}
matched = Vec::with_capacity(seq_hash.len());
matched.extend(inactive_found.into_iter().map(|block| {
ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone()))
}));
active_matched = 0;
}
tracing::debug!(active_matched, "Matched from active pool");
let remaining_hashes = &seq_hash[matched.len()..];
if !remaining_hashes.is_empty() {
let inactive_found: Vec<_> = self.inactive_pool.find_blocks(remaining_hashes, true);
let inactive_matched = inactive_found.len();
tracing::debug!(
remaining_to_check = remaining_hashes.len(),
inactive_matched,
"Matched from inactive pool"
);
matched.extend(inactive_found.into_iter().map(|block| {
ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone()))
}));
}
self.metrics.inc_match_blocks_returned(matched.len() as u64);
tracing::debug!(total_matched = matched.len(), "match_blocks result");
tracing::trace!(matched = ?matched, "matched blocks");
matched
}
pub fn scan_matches(
&self,
seq_hashes: &[SequenceHash],
touch: bool,
) -> HashMap<SequenceHash, ImmutableBlock<T>> {
self.metrics
.inc_scan_hashes_requested(seq_hashes.len() as u64);
let mut result = HashMap::new();
let active_found = self.active_pool.scan_matches(seq_hashes);
for (hash, block) in active_found {
result.insert(
hash,
ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone())),
);
}
let remaining: Vec<SequenceHash> = seq_hashes
.iter()
.filter(|h| !result.contains_key(h))
.copied()
.collect();
if !remaining.is_empty() {
let inactive_found = self.inactive_pool.scan_blocks(&remaining, touch);
for (hash, block) in inactive_found {
result.insert(
hash,
ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone())),
);
}
}
self.metrics.inc_scan_blocks_returned(result.len() as u64);
result
}
/// Returns the `total_blocks` value stored on this manager, the same count
/// `reset_inactive_pool` checks the reset pool against.
pub fn total_blocks(&self) -> usize {
self.total_blocks
}
/// Returns the number of blocks currently held by the reset pool plus the
/// number held by the inactive pool.
///
/// The two lengths are read one after the other without holding
/// `allocate_mutex`.
pub fn available_blocks(&self) -> usize {
    let in_reset = self.reset_pool.len();
    let in_inactive = self.inactive_pool.len();
    in_reset + in_inactive
}
/// Returns the `block_size` value stored on this manager.
pub fn block_size(&self) -> usize {
self.block_size
}
/// Returns a reference to the duplication policy that `register_block`
/// passes along on every registration.
pub fn duplication_policy(&self) -> &BlockDuplicationPolicy {
&self.duplication_policy
}
/// Returns a reference to the block registry used by this manager.
pub fn block_registry(&self) -> &BlockRegistry {
&self.block_registry
}
/// Returns a reference to the shared metrics handle; clone the `Arc` to
/// keep it beyond the borrow of `self`.
pub fn metrics(&self) -> &Arc<BlockPoolMetrics> {
&self.metrics
}
}