use std::num::NonZeroUsize;
use std::sync::Arc;
use parking_lot::Mutex;
use crate::metrics::{BlockPoolMetrics, MetricsAggregator, short_type_name};
use crate::{BlockId, pools::backends::LineageBackend, tinylfu::TinyLFUTracker};
use crate::{
blocks::{Block, BlockMetadata, state::Reset},
pools::{
ActivePool, BlockDuplicationPolicy, InactivePool, InactivePoolBackend, ResetPool,
ReusePolicy, SequenceHash,
backends::{HashMapBackend, LruBackend, MultiLruBackend},
},
registry::BlockRegistry,
};
use super::BlockManager;
/// Preset capacities for frequency tracking.
///
/// Each preset is turned into a concrete number by `FrequencyTrackingCapacity::size`
/// (`1 << 18`, `1 << 21` and `1 << 24` respectively), which
/// `FrequencyTrackingCapacity::create_tracker` passes to the tracker constructor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FrequencyTrackingCapacity {
/// Smallest preset: `size()` returns `1 << 18`.
Small,
/// Middle preset and the `Default` value: `size()` returns `1 << 21`.
#[default]
Medium,
/// Largest preset: `size()` returns `1 << 24`.
Large,
}
impl FrequencyTrackingCapacity {
    /// Returns the numeric capacity this preset stands for.
    ///
    /// Every preset is a power of two: `Small` is `1 << 18`, `Medium` is
    /// `1 << 21` and `Large` is `1 << 24`.
    pub fn size(&self) -> usize {
        // Pick the exponent first, then shift once.
        let exponent: u32 = match *self {
            Self::Small => 18,
            Self::Medium => 21,
            Self::Large => 24,
        };
        1usize << exponent
    }

    /// Creates a shared [`TinyLFUTracker`] keyed by `u128`, constructed with
    /// the value returned by [`Self::size`].
    pub fn create_tracker(&self) -> Arc<TinyLFUTracker<u128>> {
        let capacity = self.size();
        let tracker = TinyLFUTracker::new(capacity);
        Arc::new(tracker)
    }
}
/// Selects which backend `BlockManagerConfigBuilder::build` constructs for the inactive pool.
///
/// When the builder holds no selection at all, `build` falls back to the
/// lineage backend.
pub enum InactiveBackendConfig {
/// Build a `HashMapBackend` driven by the supplied reuse policy.
HashMap { reuse_policy: Box<dyn ReusePolicy> },
/// Build an `LruBackend`; `build` sizes it with the configured block count.
Lru,
/// Build a `MultiLruBackend`.
///
/// The builder stores the thresholds as
/// `[cold_to_warm, warm_to_hot, hot_to_very_hot]`. Builder validation requires
/// them to be strictly ascending, at most 15, and requires a registry with
/// frequency tracking.
MultiLru {
frequency_thresholds: [u8; 3],
},
/// Build a `LineageBackend` via its `Default` implementation.
Lineage,
}
/// Errors reported while building a `BlockManager` from a `BlockManagerConfigBuilder`.
#[derive(Debug, thiserror::Error)]
pub enum BlockManagerBuilderError {
/// The configured block count was zero.
// NOTE(review): not constructed in the visible part of this file; a zero block
// count currently surfaces as `ValidationError` from `build`.
#[error("Block count must be greater than 0")]
InvalidBlockCount,
/// A block size differed from the expected one (both measured in tokens, per the message).
// NOTE(review): not constructed in the visible part of this file.
#[error("Block size mismatch: expected {expected} tokens, got {actual}")]
BlockSizeMismatch { expected: usize, actual: usize },
/// The selected inactive backend could not be constructed; the payload is the reason.
///
/// `build` returns this for the MultiLRU backend when the registry has no
/// frequency tracker or when the backend constructor itself fails.
#[error("Invalid backend configuration: {0}")]
InvalidBackend(String),
/// The builder's validation step rejected the configuration; the payload is
/// the validation message.
#[error("Builder validation failed: {0}")]
ValidationError(String),
}
/// Errors reported when resetting a block manager.
// NOTE(review): no code in the visible part of this file constructs this error;
// confirm the producing call site before relying on this description.
#[derive(Debug, thiserror::Error)]
pub enum BlockManagerResetError {
/// The reset pool count (`actual`) did not match the count that was expected.
#[error("Reset pool count mismatch: expected {expected}, got {actual}")]
BlockCountMismatch { expected: usize, actual: usize },
}
/// Consuming builder that collects the settings needed to construct a
/// `BlockManager<T>`.
///
/// Each setter takes the builder by value and returns it, so calls can be
/// chained; `build` validates the collected settings and assembles the manager.
pub struct BlockManagerConfigBuilder<T: BlockMetadata> {
// Number of blocks to create. Required, and must be greater than zero.
block_count: Option<usize>,
// Size given to every block. `Default` pre-populates this with 16.
block_size: Option<usize>,
// Block registry shared with the pools. Required.
registry: Option<BlockRegistry>,
// Inactive pool backend selection; `build` uses the lineage backend when unset.
inactive_backend: Option<InactiveBackendConfig>,
// Duplication policy; `build` uses `BlockDuplicationPolicy::Allow` when unset.
duplication_policy: Option<BlockDuplicationPolicy>,
// Optional aggregator with which `build` registers the pool metrics.
aggregator: Option<MetricsAggregator>,
// Ties the builder to the metadata type `T` without storing a value of it.
_phantom: std::marker::PhantomData<T>,
}
impl<T: BlockMetadata> Default for BlockManagerConfigBuilder<T> {
    /// Produces a builder whose block size is 16 and whose every other
    /// setting is still unset.
    fn default() -> Self {
        Self {
            // The only setting that starts out populated.
            block_size: Some(16),
            // Everything below must be supplied (or defaulted) later.
            block_count: None,
            registry: None,
            inactive_backend: None,
            duplication_policy: None,
            aggregator: None,
            _phantom: std::marker::PhantomData,
        }
    }
}
impl<T: BlockMetadata> BlockManagerConfigBuilder<T> {
pub fn new() -> Self {
Self::default()
}
pub fn block_count(mut self, count: usize) -> Self {
self.block_count = Some(count);
self
}
pub fn block_size(mut self, size: usize) -> Self {
assert!(
(1..=1024).contains(&size),
"block_size must be between 1 and 1024, got {}",
size
);
assert!(
size.is_power_of_two(),
"block_size must be a power of 2, got {}",
size
);
self.block_size = Some(size);
self
}
pub fn duplication_policy(mut self, policy: BlockDuplicationPolicy) -> Self {
self.duplication_policy = Some(policy);
self
}
pub fn registry(mut self, registry: BlockRegistry) -> Self {
self.registry = Some(registry);
self
}
pub fn with_lru_backend(mut self) -> Self {
self.inactive_backend = Some(InactiveBackendConfig::Lru);
self
}
pub fn with_multi_lru_backend(mut self) -> Self {
self.inactive_backend = Some(InactiveBackendConfig::MultiLru {
frequency_thresholds: [3, 8, 15],
});
self
}
pub fn with_multi_lru_backend_custom_thresholds(
mut self,
cold_to_warm: u8,
warm_to_hot: u8,
hot_to_very_hot: u8,
) -> Self {
assert!(
cold_to_warm < warm_to_hot && warm_to_hot < hot_to_very_hot,
"Thresholds must be in ascending order: {} < {} < {} failed",
cold_to_warm,
warm_to_hot,
hot_to_very_hot
);
assert!(
hot_to_very_hot <= 15,
"hot_to_very_hot threshold ({}) must be <= 15 (4-bit counter maximum)",
hot_to_very_hot
);
assert!(
cold_to_warm >= 1,
"cold_to_warm threshold must be >= 1 to distinguish from zero-access blocks"
);
self.inactive_backend = Some(InactiveBackendConfig::MultiLru {
frequency_thresholds: [cold_to_warm, warm_to_hot, hot_to_very_hot],
});
self
}
pub fn with_hashmap_backend(mut self, reuse_policy: Box<dyn ReusePolicy>) -> Self {
self.inactive_backend = Some(InactiveBackendConfig::HashMap { reuse_policy });
self
}
pub fn with_lineage_backend(mut self) -> Self {
self.inactive_backend = Some(InactiveBackendConfig::Lineage);
self
}
pub fn aggregator(mut self, aggregator: MetricsAggregator) -> Self {
self.aggregator = Some(aggregator);
self
}
/// Checks that the collected settings are complete and mutually consistent.
///
/// Checks, in order:
/// 1. a registry has been supplied;
/// 2. a block count has been supplied and is greater than zero;
/// 3. the block size (16 when unset) is a power of two within `1..=1024`;
/// 4. when the MultiLRU backend is selected: the thresholds are strictly
///    ascending, the highest is at most 15, the lowest is at least 1, and the
///    registry has frequency tracking.
///
/// The MultiLRU threshold rules mirror the assertions made by
/// `with_multi_lru_backend_custom_thresholds`, so both paths agree on what a
/// valid threshold triple is.
///
/// # Errors
///
/// Returns a human-readable description of the first failed check.
fn validate(&self) -> Result<(), String> {
    let registry = self.registry.as_ref().ok_or("registry is required")?;

    let block_count = self.block_count.ok_or("block_count is required")?;
    if block_count == 0 {
        return Err("block_count must be greater than 0".to_string());
    }

    let block_size = self.block_size.unwrap_or(16);
    if !block_size.is_power_of_two() || !(1..=1024).contains(&block_size) {
        return Err(format!(
            "Invalid block_size {}: must be a power of 2 between 1 and 1024",
            block_size
        ));
    }

    if let Some(InactiveBackendConfig::MultiLru {
        frequency_thresholds,
    }) = &self.inactive_backend
    {
        let [t1, t2, t3] = *frequency_thresholds;
        if !(t1 < t2 && t2 < t3) {
            return Err(format!(
                "Invalid thresholds [{}, {}, {}]: must be in ascending order",
                t1, t2, t3
            ));
        }
        if t3 > 15 {
            return Err(format!(
                "Invalid threshold {}: maximum frequency is 15 (4-bit counter)",
                t3
            ));
        }
        // Same lower bound the threshold setter asserts: a zero threshold
        // cannot be told apart from a block that was never accessed.
        if t1 == 0 {
            return Err(format!(
                "Invalid threshold {}: cold_to_warm must be >= 1 to distinguish from zero-access blocks",
                t1
            ));
        }
        if !registry.has_frequency_tracking() {
            return Err(
                "MultiLRU backend requires a registry with frequency tracking".to_string(),
            );
        }
    }

    Ok(())
}
/// Validates the collected settings and assembles the `BlockManager`.
///
/// Construction proceeds in this order:
/// 1. run `validate`;
/// 2. create the pool metrics, named after `T`;
/// 3. create `block_count` blocks with ids `0..block_count` and hand them to the reset pool;
/// 4. construct the selected inactive backend (lineage when none was selected);
/// 5. create the inactive and active pools;
/// 6. create the `upgrade_fn` closure that resolves a sequence hash to a registered block;
/// 7. register the metrics with the aggregator, if one was supplied.
///
/// # Errors
///
/// - `BlockManagerBuilderError::ValidationError` when `validate` rejects the settings.
/// - `BlockManagerBuilderError::InvalidBackend` when the MultiLRU backend is selected
///   and the registry yields no frequency tracker, or the backend constructor fails.
pub fn build(mut self) -> Result<BlockManager<T>, BlockManagerBuilderError> {
// Reject incomplete or inconsistent settings before constructing anything.
self.validate()
.map_err(BlockManagerBuilderError::ValidationError)?;
// `validate` has already confirmed that `block_count` and `registry` are present,
// so these unwraps cannot fail.
let block_count = self.block_count.unwrap();
let block_size = self.block_size.unwrap_or(16);
let registry = self.registry.unwrap();
// One metrics object is shared by the reset pool, the inactive pool and the manager.
let metrics = Arc::new(BlockPoolMetrics::new(short_type_name::<T>()));
// Every block starts out in the `Reset` state with a sequential id.
let blocks: Vec<Block<T, Reset>> = (0..block_count as BlockId)
.map(|id| Block::new(id, block_size))
.collect();
let reset_pool = ResetPool::new(blocks, block_size, Some(metrics.clone()));
metrics.set_reset_pool_size(block_count as i64);
// `take` moves the backend selection out of the builder, leaving `None` behind.
let backend: Box<dyn InactivePoolBackend<T>> = match self.inactive_backend.take() {
Some(InactiveBackendConfig::HashMap { reuse_policy }) => {
tracing::info!("Using HashMap for inactive pool");
Box::new(HashMapBackend::new(reuse_policy))
}
Some(InactiveBackendConfig::Lru) => {
// Cannot panic: `validate` rejects a block count of zero.
let capacity = NonZeroUsize::new(block_count).expect("block_count must be > 0");
tracing::info!("Using LRU for inactive pool");
Box::new(LruBackend::new(capacity))
}
Some(InactiveBackendConfig::MultiLru {
frequency_thresholds,
}) => {
// `validate` already checked `has_frequency_tracking`; this guards the
// actual tracker lookup and reports a failure as `InvalidBackend`.
let frequency_tracker = registry.frequency_tracker().ok_or_else(|| {
BlockManagerBuilderError::InvalidBackend(
"MultiLRU backend requires a registry with frequency tracking".to_string(),
)
})?;
// Cannot panic: `validate` rejects a block count of zero.
let level_capacity =
NonZeroUsize::new(block_count).expect("block_count must be > 0");
tracing::info!(
"Using MultiLRU inactive backend with thresholds: {:?}",
frequency_thresholds
);
Box::new(
MultiLruBackend::new_with_thresholds(
level_capacity,
&frequency_thresholds,
frequency_tracker,
)
.map_err(|e| BlockManagerBuilderError::InvalidBackend(e.to_string()))?,
)
}
Some(InactiveBackendConfig::Lineage) => {
tracing::info!("Using Lineage inactive backend");
Box::new(LineageBackend::default())
}
// No explicit selection: same backend as `Lineage`, but logged as the default.
None => {
tracing::info!("Using default inactive backend: Lineage");
Box::new(LineageBackend::default())
}
};
let inactive_pool = InactivePool::new(backend, &reset_pool, Some(metrics.clone()));
// The active pool is given the inactive pool's return function.
let active_pool = ActivePool::new(registry.clone(), inactive_pool.return_fn());
// Clones owned by the `move` closure below.
let registry_clone = registry.clone();
let inactive_pool_clone = inactive_pool.clone();
let return_fn_clone = inactive_pool.return_fn();
// Resolves a sequence hash to a registered block: the registry is asked first,
// and the inactive pool is searched only when the registry yields no block.
let upgrade_fn = Arc::new(
move |seq_hash: SequenceHash| -> Option<Arc<dyn crate::blocks::RegisteredBlock<T>>> {
if let Some(handle) = registry_clone.match_sequence_hash(seq_hash, false)
&& let Some(block) = handle.try_get_block::<T>(return_fn_clone.clone())
{
return Some(block);
}
if let Some(block) = inactive_pool_clone
.find_blocks(&[seq_hash], false)
.into_iter()
.next()
{
return Some(block);
}
None
},
);
// Metrics are registered with the aggregator only when one was supplied.
if let Some(ref aggregator) = self.aggregator {
aggregator.register_source(metrics.clone());
}
Ok(BlockManager {
reset_pool,
active_pool,
inactive_pool,
block_registry: registry,
// Duplication is allowed unless a policy was set explicitly.
duplication_policy: self
.duplication_policy
.unwrap_or(BlockDuplicationPolicy::Allow),
upgrade_fn,
allocate_mutex: Mutex::new(()),
total_blocks: block_count,
block_size,
metrics,
})
}
}