1use std::num::NonZeroUsize;
7use std::sync::Arc;
8
9use parking_lot::Mutex;
10
11use crate::metrics::{BlockPoolMetrics, MetricsAggregator, short_type_name};
12use crate::{BlockId, pools::backends::LineageBackend, tinylfu::TinyLFUTracker};
13
14use crate::{
15 blocks::{Block, BlockMetadata, state::Reset},
16 pools::{
17 ActivePool, BlockDuplicationPolicy, InactivePool, InactivePoolBackend, ResetPool,
18 ReusePolicy, SequenceHash,
19 backends::{HashMapBackend, LruBackend, MultiLruBackend},
20 },
21 registry::BlockRegistry,
22};
23
24use super::BlockManager;
25
/// Preset capacities for the frequency tracker created by
/// [`FrequencyTrackingCapacity::create_tracker`].
///
/// Each preset maps to a power-of-two size (see [`FrequencyTrackingCapacity::size`]).
/// `Medium` is the value returned by `Default::default()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FrequencyTrackingCapacity {
    /// Smallest preset: `1 << 18` (262,144).
    Small,
    /// Default preset: `1 << 21` (2,097,152).
    #[default]
    Medium,
    /// Largest preset: `1 << 24` (16,777,216).
    Large,
}
38
39impl FrequencyTrackingCapacity {
40 pub fn size(&self) -> usize {
42 match self {
43 Self::Small => 1 << 18,
44 Self::Medium => 1 << 21,
45 Self::Large => 1 << 24,
46 }
47 }
48
49 pub fn create_tracker(&self) -> Arc<TinyLFUTracker<u128>> {
51 Arc::new(TinyLFUTracker::new(self.size()))
52 }
53}
54
/// Selects which backend implementation `BlockManagerConfigBuilder::build` installs
/// behind the inactive pool.
pub enum InactiveBackendConfig {
    /// Use a `HashMapBackend`, constructed with the supplied reuse policy.
    HashMap { reuse_policy: Box<dyn ReusePolicy> },
    /// Use an `LruBackend`; `build` sizes it with the configured block count.
    Lru,
    /// Use a `MultiLruBackend`. `build` requires the registry to provide a
    /// frequency tracker for this variant.
    MultiLru {
        /// Thresholds in the order `[cold_to_warm, warm_to_hot, hot_to_very_hot]`.
        /// The builder requires them to be strictly ascending with a maximum of 15.
        frequency_thresholds: [u8; 3],
    },
    /// Use a `LineageBackend`. This is also what `build` falls back to when no
    /// backend has been selected.
    Lineage,
}
70
/// Errors reported while configuring or building a [`BlockManager`].
///
/// NOTE(review): the builder code visible in this file only constructs
/// `ValidationError` and `InvalidBackend`; `InvalidBlockCount` and
/// `BlockSizeMismatch` are presumably produced elsewhere — confirm before
/// removing or repurposing them.
#[derive(Debug, thiserror::Error)]
pub enum BlockManagerBuilderError {
    /// The requested block count was zero.
    #[error("Block count must be greater than 0")]
    InvalidBlockCount,
    /// A block size did not match the expected one (the message reports both values
    /// in tokens).
    #[error("Block size mismatch: expected {expected} tokens, got {actual}")]
    BlockSizeMismatch { expected: usize, actual: usize },
    /// The selected inactive-pool backend could not be constructed; the payload is
    /// a human-readable reason.
    #[error("Invalid backend configuration: {0}")]
    InvalidBackend(String),
    /// The builder's validation step rejected the configuration; the payload is the
    /// validation message.
    #[error("Builder validation failed: {0}")]
    ValidationError(String),
}
83
/// Errors reported when resetting a block manager.
///
/// NOTE(review): this type is not constructed anywhere in the visible portion of
/// the file; the description below is taken from the error message only.
#[derive(Debug, thiserror::Error)]
pub enum BlockManagerResetError {
    /// The reset pool held `actual` blocks where `expected` were required.
    #[error("Reset pool count mismatch: expected {expected}, got {actual}")]
    BlockCountMismatch { expected: usize, actual: usize },
}
90
/// Builder that collects the settings needed to construct a [`BlockManager`].
///
/// Every setting is optional while the builder is being filled in; `build()`
/// checks that the required ones are present and valid.
pub struct BlockManagerConfigBuilder<T: BlockMetadata> {
    /// Number of blocks to create. Required by `build()`, which also rejects zero.
    block_count: Option<usize>,

    /// Size of each block (presumably in tokens — confirm). Defaults to 16 and must
    /// be a power of two between 1 and 1024.
    block_size: Option<usize>,

    /// Registry the manager will use. Required by `build()`.
    registry: Option<BlockRegistry>,

    /// Inactive-pool backend selection. When `None`, `build()` uses the lineage
    /// backend.
    inactive_backend: Option<InactiveBackendConfig>,

    /// Duplication policy. When `None`, `build()` uses
    /// `BlockDuplicationPolicy::Allow`.
    duplication_policy: Option<BlockDuplicationPolicy>,

    /// Optional aggregator; when set, `build()` registers the pool metrics with it.
    aggregator: Option<MetricsAggregator>,

    /// Ties the builder to the metadata type `T` without storing a value of it.
    _phantom: std::marker::PhantomData<T>,
}
117
118impl<T: BlockMetadata> Default for BlockManagerConfigBuilder<T> {
119 fn default() -> Self {
120 Self {
121 block_count: None,
122 block_size: Some(16), registry: None,
124 inactive_backend: None,
125 duplication_policy: None,
126 aggregator: None,
127 _phantom: std::marker::PhantomData,
128 }
129 }
130}
131
132impl<T: BlockMetadata> BlockManagerConfigBuilder<T> {
133 pub fn new() -> Self {
135 Self::default()
136 }
137
138 pub fn block_count(mut self, count: usize) -> Self {
140 self.block_count = Some(count);
141 self
142 }
143
144 pub fn block_size(mut self, size: usize) -> Self {
153 assert!(
154 (1..=1024).contains(&size),
155 "block_size must be between 1 and 1024, got {}",
156 size
157 );
158 assert!(
159 size.is_power_of_two(),
160 "block_size must be a power of 2, got {}",
161 size
162 );
163 self.block_size = Some(size);
164 self
165 }
166
167 pub fn duplication_policy(mut self, policy: BlockDuplicationPolicy) -> Self {
169 self.duplication_policy = Some(policy);
170 self
171 }
172
173 pub fn registry(mut self, registry: BlockRegistry) -> Self {
175 self.registry = Some(registry);
176 self
177 }
178
179 pub fn with_lru_backend(mut self) -> Self {
181 self.inactive_backend = Some(InactiveBackendConfig::Lru);
182 self
183 }
184
185 pub fn with_multi_lru_backend(mut self) -> Self {
190 self.inactive_backend = Some(InactiveBackendConfig::MultiLru {
191 frequency_thresholds: [3, 8, 15],
192 });
193 self
194 }
195
196 pub fn with_multi_lru_backend_custom_thresholds(
211 mut self,
212 cold_to_warm: u8,
213 warm_to_hot: u8,
214 hot_to_very_hot: u8,
215 ) -> Self {
216 assert!(
218 cold_to_warm < warm_to_hot && warm_to_hot < hot_to_very_hot,
219 "Thresholds must be in ascending order: {} < {} < {} failed",
220 cold_to_warm,
221 warm_to_hot,
222 hot_to_very_hot
223 );
224
225 assert!(
227 hot_to_very_hot <= 15,
228 "hot_to_very_hot threshold ({}) must be <= 15 (4-bit counter maximum)",
229 hot_to_very_hot
230 );
231
232 assert!(
234 cold_to_warm >= 1,
235 "cold_to_warm threshold must be >= 1 to distinguish from zero-access blocks"
236 );
237
238 self.inactive_backend = Some(InactiveBackendConfig::MultiLru {
239 frequency_thresholds: [cold_to_warm, warm_to_hot, hot_to_very_hot],
240 });
241 self
242 }
243
244 pub fn with_hashmap_backend(mut self, reuse_policy: Box<dyn ReusePolicy>) -> Self {
246 self.inactive_backend = Some(InactiveBackendConfig::HashMap { reuse_policy });
247 self
248 }
249
250 pub fn with_lineage_backend(mut self) -> Self {
252 self.inactive_backend = Some(InactiveBackendConfig::Lineage);
253 self
254 }
255
256 pub fn aggregator(mut self, aggregator: MetricsAggregator) -> Self {
260 self.aggregator = Some(aggregator);
261 self
262 }
263
264 fn validate(&self) -> Result<(), String> {
266 let registry = self.registry.as_ref().ok_or("registry is required")?;
267
268 let block_count = self.block_count.ok_or("block_count is required")?;
269
270 if block_count == 0 {
271 return Err("block_count must be greater than 0".to_string());
272 }
273
274 let block_size = self.block_size.unwrap_or(16);
276 if !block_size.is_power_of_two() || !(1..=1024).contains(&block_size) {
277 return Err(format!(
278 "Invalid block_size {}: must be a power of 2 between 1 and 1024",
279 block_size
280 ));
281 }
282
283 if let Some(InactiveBackendConfig::MultiLru {
285 frequency_thresholds,
286 }) = &self.inactive_backend
287 {
288 let [t1, t2, t3] = frequency_thresholds;
289 if !(*t1 < *t2 && *t2 < *t3) {
290 return Err(format!(
291 "Invalid thresholds [{}, {}, {}]: must be in ascending order",
292 t1, t2, t3
293 ));
294 }
295 if *t3 > 15 {
296 return Err(format!(
297 "Invalid threshold {}: maximum frequency is 15 (4-bit counter)",
298 t3
299 ));
300 }
301
302 if !registry.has_frequency_tracking() {
304 return Err(
305 "MultiLRU backend requires a registry with frequency tracking".to_string(),
306 );
307 }
308 }
309
310 Ok(())
311 }
312
    /// Consumes the builder and constructs a [`BlockManager`].
    ///
    /// Steps, in order: validate the configuration, create `block_count` blocks and
    /// place them in the reset pool, construct the selected inactive-pool backend
    /// (Lineage when none was selected), wire up the inactive and active pools,
    /// build the upgrade closure, and register the metrics with the aggregator if
    /// one was supplied.
    ///
    /// # Errors
    ///
    /// - [`BlockManagerBuilderError::ValidationError`] when `validate()` rejects the
    ///   configuration.
    /// - [`BlockManagerBuilderError::InvalidBackend`] when the MultiLRU backend is
    ///   selected and the registry yields no frequency tracker, or when
    ///   `MultiLruBackend::new_with_thresholds` fails.
    pub fn build(mut self) -> Result<BlockManager<T>, BlockManagerBuilderError> {
        // All validation failures are surfaced as `ValidationError(String)`.
        self.validate()
            .map_err(BlockManagerBuilderError::ValidationError)?;

        // `validate()` has confirmed that `block_count` is present and non-zero.
        let block_count = self.block_count.unwrap();
        let block_size = self.block_size.unwrap_or(16);

        // `validate()` has confirmed that a registry is present.
        let registry = self.registry.unwrap();

        // Metrics are labelled with the short type name of the metadata type `T`.
        let metrics = Arc::new(BlockPoolMetrics::new(short_type_name::<T>()));

        // Create one block per id in `0..block_count`; all of them start in the
        // reset pool.
        let blocks: Vec<Block<T, Reset>> = (0..block_count as BlockId)
            .map(|id| Block::new(id, block_size))
            .collect();
        let reset_pool = ResetPool::new(blocks, block_size, Some(metrics.clone()));
        metrics.set_reset_pool_size(block_count as i64);

        // `take()` moves the backend choice out of the builder; `None` means the
        // caller never selected one.
        let backend: Box<dyn InactivePoolBackend<T>> = match self.inactive_backend.take() {
            Some(InactiveBackendConfig::HashMap { reuse_policy }) => {
                tracing::info!("Using HashMap for inactive pool");
                Box::new(HashMapBackend::new(reuse_policy))
            }
            Some(InactiveBackendConfig::Lru) => {
                // The LRU capacity is the total block count; non-zero per `validate()`.
                let capacity = NonZeroUsize::new(block_count).expect("block_count must be > 0");
                tracing::info!("Using LRU for inactive pool");
                Box::new(LruBackend::new(capacity))
            }
            Some(InactiveBackendConfig::MultiLru {
                frequency_thresholds,
            }) => {
                // `validate()` already checked `has_frequency_tracking()`; this guards
                // the actual tracker lookup as well.
                let frequency_tracker = registry.frequency_tracker().ok_or_else(|| {
                    BlockManagerBuilderError::InvalidBackend(
                        "MultiLRU backend requires a registry with frequency tracking".to_string(),
                    )
                })?;

                // The full block count is passed as the backend's level capacity
                // (presumably applied per level — confirm against `MultiLruBackend`).
                let level_capacity =
                    NonZeroUsize::new(block_count).expect("block_count must be > 0");

                tracing::info!(
                    "Using MultiLRU inactive backend with thresholds: {:?}",
                    frequency_thresholds
                );
                Box::new(
                    MultiLruBackend::new_with_thresholds(
                        level_capacity,
                        &frequency_thresholds,
                        frequency_tracker,
                    )
                    .map_err(|e| BlockManagerBuilderError::InvalidBackend(e.to_string()))?,
                )
            }
            Some(InactiveBackendConfig::Lineage) => {
                tracing::info!("Using Lineage inactive backend");
                Box::new(LineageBackend::default())
            }
            None => {
                // No explicit selection: fall back to the Lineage backend.
                tracing::info!("Using default inactive backend: Lineage");
                Box::new(LineageBackend::default())
            }
        };

        let inactive_pool = InactivePool::new(backend, &reset_pool, Some(metrics.clone()));
        // The active pool is given the inactive pool's return function.
        let active_pool = ActivePool::new(registry.clone(), inactive_pool.return_fn());

        // Clones that are moved into the upgrade closure below.
        let registry_clone = registry.clone();
        let inactive_pool_clone = inactive_pool.clone();
        let return_fn_clone = inactive_pool.return_fn();
        // Resolves a sequence hash to a registered block: the registry is consulted
        // first, then the inactive pool; `None` when neither yields a block.
        // NOTE(review): the meaning of the `false` arguments is not visible here —
        // confirm against `match_sequence_hash` / `find_blocks`.
        let upgrade_fn = Arc::new(
            move |seq_hash: SequenceHash| -> Option<Arc<dyn crate::blocks::RegisteredBlock<T>>> {
                if let Some(handle) = registry_clone.match_sequence_hash(seq_hash, false)
                    && let Some(block) = handle.try_get_block::<T>(return_fn_clone.clone())
                {
                    return Some(block);
                }
                if let Some(block) = inactive_pool_clone
                    .find_blocks(&[seq_hash], false)
                    .into_iter()
                    .next()
                {
                    return Some(block);
                }
                None
            },
        );

        // Metrics are only registered when the caller supplied an aggregator.
        if let Some(ref aggregator) = self.aggregator {
            aggregator.register_source(metrics.clone());
        }

        Ok(BlockManager {
            reset_pool,
            active_pool,
            inactive_pool,
            block_registry: registry,
            // An unset duplication policy defaults to `Allow`.
            duplication_policy: self
                .duplication_policy
                .unwrap_or(BlockDuplicationPolicy::Allow),
            upgrade_fn,
            allocate_mutex: Mutex::new(()),
            total_blocks: block_count,
            block_size,
            metrics,
        })
    }
437}