mod builder;
#[cfg(test)]
mod tests;
pub use builder::{
BlockManagerBuilderError, BlockManagerConfigBuilder, BlockManagerResetError,
FrequencyTrackingCapacity, InactiveBackendConfig, LineageEviction,
};
use std::collections::HashMap;
use std::sync::Arc;
use crate::blocks::{BlockMetadata, CompleteBlock, ImmutableBlock, MutableBlock};
use crate::metrics::BlockPoolMetrics;
use crate::pools::{BlockDuplicationPolicy, BlockStore, SequenceHash};
use crate::registry::BlockRegistry;
/// Front-end over a shared [`BlockStore`] and a [`BlockRegistry`].
///
/// The manager hands out mutable blocks, registers completed blocks under
/// their sequence hash, and looks blocks up again by hash. All heavy lifting
/// is delegated to the store and the registry; this type mostly wires them
/// together and records metrics.
pub struct BlockManager<T>
where
    T: BlockMetadata,
{
    /// Shared block storage used for allocation, draining, and lookups.
    pub(crate) store: Arc<BlockStore<T>>,
    /// Registry consulted for sequence-hash registration and matching.
    pub(crate) block_registry: BlockRegistry,
    /// Policy forwarded to the registry handle whenever a block is registered.
    pub(crate) duplication_policy: BlockDuplicationPolicy,
    /// Number of blocks the manager was configured with; used as the expected
    /// count when validating a reset.
    pub(crate) total_blocks: usize,
    /// Configured block size, exposed through `block_size()`.
    /// NOTE(review): the unit (tokens, bytes, ...) is not visible here — confirm.
    pub(crate) block_size: usize,
    /// Counters updated by the registration, match, and scan entry points.
    pub(crate) metrics: Arc<BlockPoolMetrics>,
}
/// Allocation, registration, and lookup operations of [`BlockManager`].
impl<T> BlockManager<T>
where
    T: BlockMetadata + Sync,
{
    /// Returns a default-initialized [`BlockManagerConfigBuilder`] for this
    /// metadata type.
    pub fn builder() -> BlockManagerConfigBuilder<T> {
        BlockManagerConfigBuilder::<T>::default()
    }

    /// Returns the identifier reported by the underlying store.
    pub fn id(&self) -> crate::ManagerId {
        self.store.id()
    }

    /// Allocates `count` mutable blocks, discarding the eviction list.
    ///
    /// Returns `None` whenever [`Self::allocate_blocks_with_evictions`] does.
    pub fn allocate_blocks(&self, count: usize) -> Option<Vec<MutableBlock<T>>> {
        let (blocks, _evicted_hashes) = self.allocate_blocks_with_evictions(count)?;
        Some(blocks)
    }

    /// Allocates `count` mutable blocks via the store's `allocate_atomic`.
    ///
    /// On success the result carries the allocated blocks together with a
    /// list of sequence hashes.
    /// NOTE(review): presumably those are the hashes evicted to satisfy the
    /// request, and `None` means the request could not be met as a whole —
    /// confirm against `BlockStore::allocate_atomic`.
    pub fn allocate_blocks_with_evictions(
        &self,
        count: usize,
    ) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
        self.store.allocate_atomic(count)
    }

    /// Drains the store's inactive blocks and verifies the resulting count.
    ///
    /// # Errors
    ///
    /// Returns [`BlockManagerResetError::BlockCountMismatch`] when the store's
    /// `reset_len()` after draining differs from `total_blocks`.
    pub fn reset_inactive_pool(&self) -> Result<(), BlockManagerResetError> {
        // The drained blocks are released immediately, before the count is read.
        // NOTE(review): presumably dropping them returns them to the reset
        // pool — confirm against `MutableBlock`'s `Drop`.
        drop(self.store.drain_inactive_to_mutable());

        let actual = self.store.reset_len();
        let expected = self.total_blocks;
        if actual == expected {
            Ok(())
        } else {
            Err(BlockManagerResetError::BlockCountMismatch { expected, actual })
        }
    }

    /// Registers every block in `blocks`, in order, via [`Self::register_block`].
    pub fn register_blocks(&self, blocks: Vec<CompleteBlock<T>>) -> Vec<ImmutableBlock<T>> {
        let mut registered = Vec::with_capacity(blocks.len());
        for block in blocks {
            registered.push(self.register_block(block));
        }
        registered
    }

    /// Registers a single completed block and returns its immutable handle.
    ///
    /// The registration counter is bumped first, then a registry handle is
    /// obtained for the block's sequence hash and the block is registered
    /// through it using the configured duplication policy.
    pub fn register_block(&self, block: CompleteBlock<T>) -> ImmutableBlock<T> {
        self.metrics.inc_registrations();

        let sequence_hash = block.sequence_hash();
        let handle = self.block_registry.register_sequence_hash(sequence_hash);
        ImmutableBlock::from_inner(handle.register_block(
            block,
            self.duplication_policy,
            &self.store,
        ))
    }

    /// Looks up `seq_hash` through the store's `match_prefix_locked_batch`.
    ///
    /// Records the number of hashes requested and blocks returned. When the
    /// registry has frequency tracking enabled, every matched hash is touched.
    /// An empty input returns an empty vector without consulting the store.
    /// NOTE(review): the store method name suggests only a leading run of
    /// matching hashes is returned — confirm against `BlockStore`.
    pub fn match_blocks(&self, seq_hash: &[SequenceHash]) -> Vec<ImmutableBlock<T>> {
        let requested = seq_hash.len();
        self.metrics.inc_match_hashes_requested(requested as u64);

        if requested == 0 {
            self.metrics.inc_match_blocks_returned(0);
            return Vec::new();
        }

        let inners = self.store.match_prefix_locked_batch(seq_hash);

        let track_frequency = self.block_registry.has_frequency_tracking();
        if track_frequency {
            for inner in &inners {
                self.block_registry.touch(inner.sequence_hash());
            }
        }

        let matched = inners
            .into_iter()
            .map(ImmutableBlock::from_inner)
            .collect::<Vec<ImmutableBlock<T>>>();
        let returned = matched.len();
        self.metrics.inc_match_blocks_returned(returned as u64);

        tracing::debug!(
            num_hashes = requested,
            total_matched = returned,
            "match_blocks result"
        );
        tracing::trace!(matched = ?matched, "matched blocks");

        matched
    }

    /// Looks up each hash individually and returns the ones that were found.
    ///
    /// Hashes are first resolved through the registry (see
    /// `scan_active_matches`); whatever is still missing is then looked up
    /// with the store's `scan_inactive_primaries`. The `touch` flag is
    /// forwarded unchanged to both lookups. Request and result counts are
    /// recorded in the metrics.
    pub fn scan_matches(
        &self,
        seq_hashes: &[SequenceHash],
        touch: bool,
    ) -> HashMap<SequenceHash, ImmutableBlock<T>> {
        self.metrics
            .inc_scan_hashes_requested(seq_hashes.len() as u64);

        let mut result = HashMap::new();
        result.extend(
            self.scan_active_matches(seq_hashes, touch)
                .into_iter()
                .map(|(hash, inner)| (hash, ImmutableBlock::from_inner(inner))),
        );

        // Only hashes not resolved by the first pass go to the second lookup.
        let remaining: Vec<SequenceHash> = seq_hashes
            .iter()
            .copied()
            .filter(|hash| !result.contains_key(hash))
            .collect();

        if !remaining.is_empty() {
            result.extend(
                self.store
                    .scan_inactive_primaries(&remaining, touch)
                    .into_iter()
                    .map(|(hash, inner)| (hash, ImmutableBlock::from_inner(inner))),
            );
        }

        self.metrics.inc_scan_blocks_returned(result.len() as u64);
        result
    }

    /// Resolves `hashes` through the registry, keeping only those for which
    /// both the registry lookup and `try_get_inner` succeed.
    ///
    /// Results preserve the order of `hashes`.
    fn scan_active_matches(
        &self,
        hashes: &[SequenceHash],
        touch: bool,
    ) -> Vec<(SequenceHash, Arc<crate::blocks::ImmutableBlockInner<T>>)> {
        let mut found = Vec::new();
        for &hash in hashes {
            // The registry handle is released as soon as the inner block has
            // been fetched (or found missing).
            let inner = self
                .block_registry
                .match_sequence_hash(hash, touch)
                .and_then(|handle| handle.try_get_inner::<T>(&self.store, touch));
            if let Some(inner) = inner {
                found.push((hash, inner));
            }
        }
        found
    }

    /// Returns the configured total number of blocks.
    pub fn total_blocks(&self) -> usize {
        self.total_blocks
    }

    /// Returns the store's current `available_len()`.
    pub fn available_blocks(&self) -> usize {
        self.store.available_len()
    }

    /// Returns the configured block size.
    pub fn block_size(&self) -> usize {
        self.block_size
    }

    /// Returns the duplication policy applied when registering blocks.
    pub fn duplication_policy(&self) -> &BlockDuplicationPolicy {
        &self.duplication_policy
    }

    /// Returns the registry used for sequence-hash registration and matching.
    pub fn block_registry(&self) -> &BlockRegistry {
        &self.block_registry
    }

    /// Returns the shared metrics handle.
    pub fn metrics(&self) -> &Arc<BlockPoolMetrics> {
        &self.metrics
    }

    /// Test-only access to the underlying store.
    #[cfg(test)]
    pub(crate) fn store_for_test(&self) -> &Arc<BlockStore<T>> {
        &self.store
    }
}