Skip to main content

kvbm_logical/manager/
builder.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Builder and configuration types for [`BlockManager`](super::BlockManager).
5
6use std::num::NonZeroUsize;
7use std::sync::Arc;
8
9use parking_lot::Mutex;
10
11use crate::metrics::{BlockPoolMetrics, MetricsAggregator, short_type_name};
12use crate::{BlockId, pools::backends::LineageBackend, tinylfu::TinyLFUTracker};
13
14use crate::{
15    blocks::{Block, BlockMetadata, state::Reset},
16    pools::{
17        ActivePool, BlockDuplicationPolicy, InactivePool, InactivePoolBackend, ResetPool,
18        ReusePolicy, SequenceHash,
19        backends::{HashMapBackend, LruBackend, MultiLruBackend},
20    },
21    registry::BlockRegistry,
22};
23
24use super::BlockManager;
25
/// Sizing presets for the TinyLFU frequency tracker used by
/// [`BlockRegistry`] and the multi-level LRU backend.
///
/// Every preset is a power-of-two entry count. [`Medium`](Self::Medium) is
/// the variant returned by [`Default`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum FrequencyTrackingCapacity {
    /// 2^18 (262,144) entries.
    Small,
    /// 2^21 (2,097,152) entries. This is the default preset.
    #[default]
    Medium,
    /// 2^24 (16,777,216) entries.
    Large,
}
38
39impl FrequencyTrackingCapacity {
40    /// Get the size in number of entries.
41    pub fn size(&self) -> usize {
42        match self {
43            Self::Small => 1 << 18,
44            Self::Medium => 1 << 21,
45            Self::Large => 1 << 24,
46        }
47    }
48
49    /// Create a new [`TinyLFUTracker`] with this capacity.
50    pub fn create_tracker(&self) -> Arc<TinyLFUTracker<u128>> {
51        Arc::new(TinyLFUTracker::new(self.size()))
52    }
53}
54
55/// Configuration for the inactive pool backend.
56pub enum InactiveBackendConfig {
57    /// HashMap with configurable reuse policy
58    HashMap { reuse_policy: Box<dyn ReusePolicy> },
59    /// Simple LRU - capacity automatically set to block_count
60    Lru,
61    /// Multi-level LRU with 4 fixed levels - capacity automatically set to block_count
62    MultiLru {
63        /// Frequency thresholds: [cold->warm, warm->hot, hot->very_hot]
64        /// Default: [3, 8, 15]
65        frequency_thresholds: [u8; 3],
66    },
67    /// Lineage backend
68    Lineage,
69}
70
71/// Error types for [`BlockManager`] builder validation.
72#[derive(Debug, thiserror::Error)]
73pub enum BlockManagerBuilderError {
74    #[error("Block count must be greater than 0")]
75    InvalidBlockCount,
76    #[error("Block size mismatch: expected {expected} tokens, got {actual}")]
77    BlockSizeMismatch { expected: usize, actual: usize },
78    #[error("Invalid backend configuration: {0}")]
79    InvalidBackend(String),
80    #[error("Builder validation failed: {0}")]
81    ValidationError(String),
82}
83
84/// Error types for [`BlockManager::reset_inactive_pool`].
85#[derive(Debug, thiserror::Error)]
86pub enum BlockManagerResetError {
87    #[error("Reset pool count mismatch: expected {expected}, got {actual}")]
88    BlockCountMismatch { expected: usize, actual: usize },
89}
90
91/// Builder for [`BlockManager`] configuration.
92///
93/// Construct via [`BlockManager::builder()`] and finish with [`build()`](Self::build).
94pub struct BlockManagerConfigBuilder<T: BlockMetadata> {
95    /// Number of blocks in the pool
96    block_count: Option<usize>,
97
98    /// Size of each block in tokens (must be power of 2, 1-1024)
99    /// Default: 16
100    block_size: Option<usize>,
101
102    /// Block registry for tracking blocks and frequency
103    registry: Option<BlockRegistry>,
104
105    /// Inactive pool backend configuration
106    inactive_backend: Option<InactiveBackendConfig>,
107
108    /// Policy for handling duplicate sequence hashes
109    duplication_policy: Option<BlockDuplicationPolicy>,
110
111    /// Optional metrics aggregator for prometheus export
112    aggregator: Option<MetricsAggregator>,
113
114    /// Phantom data for type parameter
115    _phantom: std::marker::PhantomData<T>,
116}
117
118impl<T: BlockMetadata> Default for BlockManagerConfigBuilder<T> {
119    fn default() -> Self {
120        Self {
121            block_count: None,
122            block_size: Some(16), // Default to 16 tokens per block
123            registry: None,
124            inactive_backend: None,
125            duplication_policy: None,
126            aggregator: None,
127            _phantom: std::marker::PhantomData,
128        }
129    }
130}
131
132impl<T: BlockMetadata> BlockManagerConfigBuilder<T> {
133    /// Create a new builder.
134    pub fn new() -> Self {
135        Self::default()
136    }
137
138    /// Set the number of blocks in the pool.
139    pub fn block_count(mut self, count: usize) -> Self {
140        self.block_count = Some(count);
141        self
142    }
143
144    /// Set the block size (number of tokens per block).
145    ///
146    /// # Requirements
147    /// - Must be >= 1 and <= 1024
148    /// - Must be a power of 2
149    ///
150    /// # Panics
151    /// Panics if the block size doesn't meet requirements.
152    pub fn block_size(mut self, size: usize) -> Self {
153        assert!(
154            (1..=1024).contains(&size),
155            "block_size must be between 1 and 1024, got {}",
156            size
157        );
158        assert!(
159            size.is_power_of_two(),
160            "block_size must be a power of 2, got {}",
161            size
162        );
163        self.block_size = Some(size);
164        self
165    }
166
167    /// Set the duplication policy.
168    pub fn duplication_policy(mut self, policy: BlockDuplicationPolicy) -> Self {
169        self.duplication_policy = Some(policy);
170        self
171    }
172
173    /// Set the block registry.
174    pub fn registry(mut self, registry: BlockRegistry) -> Self {
175        self.registry = Some(registry);
176        self
177    }
178
179    /// Use simple LRU backend (capacity automatically set to block_count).
180    pub fn with_lru_backend(mut self) -> Self {
181        self.inactive_backend = Some(InactiveBackendConfig::Lru);
182        self
183    }
184
185    /// Use multi-level LRU backend with 4 fixed priority levels.
186    ///
187    /// Default thresholds: `[3, 8, 15]` for transitions between:
188    /// Cold (0-2 hits) -> Warm (3-7) -> Hot (8-14) -> Very Hot (15+).
189    pub fn with_multi_lru_backend(mut self) -> Self {
190        self.inactive_backend = Some(InactiveBackendConfig::MultiLru {
191            frequency_thresholds: [3, 8, 15],
192        });
193        self
194    }
195
196    /// Use multi-level LRU with custom frequency thresholds.
197    ///
198    /// # Requirements
199    /// - Thresholds must be in ascending order: cold_to_warm < warm_to_hot < hot_to_very_hot
200    /// - hot_to_very_hot must be <= 15 (4-bit counter maximum)
201    /// - cold_to_warm must be >= 1 (to distinguish from never-accessed blocks)
202    ///
203    /// # Arguments
204    /// * `cold_to_warm` - Minimum frequency to move from Cold to Warm level
205    /// * `warm_to_hot` - Minimum frequency to move from Warm to Hot level
206    /// * `hot_to_very_hot` - Minimum frequency to move from Hot to Very Hot level
207    ///
208    /// # Panics
209    /// Panics if thresholds don't meet the requirements above.
210    pub fn with_multi_lru_backend_custom_thresholds(
211        mut self,
212        cold_to_warm: u8,
213        warm_to_hot: u8,
214        hot_to_very_hot: u8,
215    ) -> Self {
216        // Validate ascending order
217        assert!(
218            cold_to_warm < warm_to_hot && warm_to_hot < hot_to_very_hot,
219            "Thresholds must be in ascending order: {} < {} < {} failed",
220            cold_to_warm,
221            warm_to_hot,
222            hot_to_very_hot
223        );
224
225        // Validate maximum value (4-bit counter limit)
226        assert!(
227            hot_to_very_hot <= 15,
228            "hot_to_very_hot threshold ({}) must be <= 15 (4-bit counter maximum)",
229            hot_to_very_hot
230        );
231
232        // Additional validation: ensure reasonable gaps between levels
233        assert!(
234            cold_to_warm >= 1,
235            "cold_to_warm threshold must be >= 1 to distinguish from zero-access blocks"
236        );
237
238        self.inactive_backend = Some(InactiveBackendConfig::MultiLru {
239            frequency_thresholds: [cold_to_warm, warm_to_hot, hot_to_very_hot],
240        });
241        self
242    }
243
244    /// Use HashMap backend with custom reuse policy.
245    pub fn with_hashmap_backend(mut self, reuse_policy: Box<dyn ReusePolicy>) -> Self {
246        self.inactive_backend = Some(InactiveBackendConfig::HashMap { reuse_policy });
247        self
248    }
249
250    /// Use lineage backend.
251    pub fn with_lineage_backend(mut self) -> Self {
252        self.inactive_backend = Some(InactiveBackendConfig::Lineage);
253        self
254    }
255
256    /// Set a metrics aggregator for prometheus export.
257    ///
258    /// The aggregator will automatically receive this manager's metrics source.
259    pub fn aggregator(mut self, aggregator: MetricsAggregator) -> Self {
260        self.aggregator = Some(aggregator);
261        self
262    }
263
264    /// Validate the configuration.
265    fn validate(&self) -> Result<(), String> {
266        let registry = self.registry.as_ref().ok_or("registry is required")?;
267
268        let block_count = self.block_count.ok_or("block_count is required")?;
269
270        if block_count == 0 {
271            return Err("block_count must be greater than 0".to_string());
272        }
273
274        // Validate block_size
275        let block_size = self.block_size.unwrap_or(16);
276        if !block_size.is_power_of_two() || !(1..=1024).contains(&block_size) {
277            return Err(format!(
278                "Invalid block_size {}: must be a power of 2 between 1 and 1024",
279                block_size
280            ));
281        }
282
283        // Additional validation for MultiLRU thresholds at build time
284        if let Some(InactiveBackendConfig::MultiLru {
285            frequency_thresholds,
286        }) = &self.inactive_backend
287        {
288            let [t1, t2, t3] = frequency_thresholds;
289            if !(*t1 < *t2 && *t2 < *t3) {
290                return Err(format!(
291                    "Invalid thresholds [{}, {}, {}]: must be in ascending order",
292                    t1, t2, t3
293                ));
294            }
295            if *t3 > 15 {
296                return Err(format!(
297                    "Invalid threshold {}: maximum frequency is 15 (4-bit counter)",
298                    t3
299                ));
300            }
301
302            // Validate MultiLRU requires frequency tracking
303            if !registry.has_frequency_tracking() {
304                return Err(
305                    "MultiLRU backend requires a registry with frequency tracking".to_string(),
306                );
307            }
308        }
309
310        Ok(())
311    }
312
313    /// Build the [`BlockManager`].
314    ///
315    /// Validates configuration and constructs all pools, the upgrade closure,
316    /// and the metrics source. Returns an error if validation fails or
317    /// backend construction fails.
318    pub fn build(mut self) -> Result<BlockManager<T>, BlockManagerBuilderError> {
319        // First validate the configuration
320        self.validate()
321            .map_err(BlockManagerBuilderError::ValidationError)?;
322
323        let block_count = self.block_count.unwrap();
324        let block_size = self.block_size.unwrap_or(16);
325
326        // Use provided registry
327        let registry = self.registry.unwrap();
328
329        // Create metrics
330        let metrics = Arc::new(BlockPoolMetrics::new(short_type_name::<T>()));
331
332        // Create reset pool
333        let blocks: Vec<Block<T, Reset>> = (0..block_count as BlockId)
334            .map(|id| Block::new(id, block_size))
335            .collect();
336        let reset_pool = ResetPool::new(blocks, block_size, Some(metrics.clone()));
337        metrics.set_reset_pool_size(block_count as i64);
338
339        // Create backend based on configuration
340        let backend: Box<dyn InactivePoolBackend<T>> = match self.inactive_backend.take() {
341            Some(InactiveBackendConfig::HashMap { reuse_policy }) => {
342                tracing::info!("Using HashMap for inactive pool");
343                Box::new(HashMapBackend::new(reuse_policy))
344            }
345            Some(InactiveBackendConfig::Lru) => {
346                // Capacity automatically set to block_count
347                let capacity = NonZeroUsize::new(block_count).expect("block_count must be > 0");
348                tracing::info!("Using LRU for inactive pool");
349                Box::new(LruBackend::new(capacity))
350            }
351            Some(InactiveBackendConfig::MultiLru {
352                frequency_thresholds,
353            }) => {
354                // Require frequency tracker for MultiLRU
355                let frequency_tracker = registry.frequency_tracker().ok_or_else(|| {
356                    BlockManagerBuilderError::InvalidBackend(
357                        "MultiLRU backend requires a registry with frequency tracking".to_string(),
358                    )
359                })?;
360
361                // Each level needs capacity for all blocks since the frequency
362                // distribution is unpredictable — all blocks could land in one level.
363                let level_capacity =
364                    NonZeroUsize::new(block_count).expect("block_count must be > 0");
365
366                tracing::info!(
367                    "Using MultiLRU inactive backend with thresholds: {:?}",
368                    frequency_thresholds
369                );
370                Box::new(
371                    MultiLruBackend::new_with_thresholds(
372                        level_capacity,
373                        &frequency_thresholds,
374                        frequency_tracker,
375                    )
376                    .map_err(|e| BlockManagerBuilderError::InvalidBackend(e.to_string()))?,
377                )
378            }
379            Some(InactiveBackendConfig::Lineage) => {
380                tracing::info!("Using Lineage inactive backend");
381                Box::new(LineageBackend::default())
382            }
383            None => {
384                tracing::info!("Using default inactive backend: Lineage");
385                Box::new(LineageBackend::default())
386            }
387        };
388
389        // Create pools
390        let inactive_pool = InactivePool::new(backend, &reset_pool, Some(metrics.clone()));
391        let active_pool = ActivePool::new(registry.clone(), inactive_pool.return_fn());
392
393        // Create upgrade function that captures the necessary components
394        let registry_clone = registry.clone();
395        let inactive_pool_clone = inactive_pool.clone();
396        let return_fn_clone = inactive_pool.return_fn();
397        let upgrade_fn = Arc::new(
398            move |seq_hash: SequenceHash| -> Option<Arc<dyn crate::blocks::RegisteredBlock<T>>> {
399                // Try active pool first with touch=false (using registry directly)
400                if let Some(handle) = registry_clone.match_sequence_hash(seq_hash, false)
401                    && let Some(block) = handle.try_get_block::<T>(return_fn_clone.clone())
402                {
403                    return Some(block);
404                }
405                // Then try inactive pool with touch=false
406                if let Some(block) = inactive_pool_clone
407                    .find_blocks(&[seq_hash], false)
408                    .into_iter()
409                    .next()
410                {
411                    return Some(block);
412                }
413                None
414            },
415        );
416
417        // Register with aggregator if provided
418        if let Some(ref aggregator) = self.aggregator {
419            aggregator.register_source(metrics.clone());
420        }
421
422        Ok(BlockManager {
423            reset_pool,
424            active_pool,
425            inactive_pool,
426            block_registry: registry,
427            duplication_policy: self
428                .duplication_policy
429                .unwrap_or(BlockDuplicationPolicy::Allow),
430            upgrade_fn,
431            allocate_mutex: Mutex::new(()),
432            total_blocks: block_count,
433            block_size,
434            metrics,
435        })
436    }
437}