Skip to main content

kvbm_logical/manager/
mod.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Block lifecycle orchestration across reset, active, and inactive pools.
5//!
6//! [`BlockManager`] is the top-level owner of the three pool tiers and the
7//! block registry. It exposes allocation, registration, matching, and scanning
8//! operations while keeping all pool transitions behind a single API surface.
9//!
10//! Construction uses a builder pattern — see [`BlockManagerConfigBuilder`].
11//!
12//! # Re-exported configuration types
13//!
14//! - [`FrequencyTrackingCapacity`] — TinyLFU tracker sizing
15//! - [`InactiveBackendConfig`] — inactive pool backend selection
16//! - [`BlockManagerBuilderError`] / [`BlockManagerResetError`] — error types
17
18mod builder;
19
20#[cfg(test)]
21mod tests;
22
23pub use builder::{
24    BlockManagerBuilderError, BlockManagerConfigBuilder, BlockManagerResetError,
25    FrequencyTrackingCapacity, InactiveBackendConfig,
26};
27
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use parking_lot::Mutex;
32
33use crate::blocks::{BlockMetadata, CompleteBlock, ImmutableBlock, MutableBlock, UpgradeFn};
34use crate::metrics::BlockPoolMetrics;
35use crate::pools::{ActivePool, BlockDuplicationPolicy, InactivePool, ResetPool, SequenceHash};
36use crate::registry::BlockRegistry;
37
/// Manages the full block lifecycle across three pool tiers:
/// reset (free), active (in-use), and inactive (cached, evictable).
///
/// Thread-safe: allocation is serialised via an internal [`Mutex`]; individual
/// pools use their own internal locking.
///
/// Construct via [`BlockManager::builder()`].
// NOTE(review): Rust drops struct fields in declaration order, so reordering
// the fields below changes the order in which the pools and registry are torn
// down. Blocks released during drop may rely on the reset pool still being
// alive — confirm before reordering.
pub struct BlockManager<T: BlockMetadata> {
    /// Free blocks; the first source drawn from when allocating.
    reset_pool: ResetPool<T>,
    /// Blocks currently in use; searched first by matching and scanning.
    active_pool: ActivePool<T>,
    /// Cached, evictable blocks; evicted to cover any shortfall of the reset
    /// pool during allocation and searched after the active pool when matching.
    inactive_pool: InactivePool<T>,
    /// Registry in which each completed block's sequence hash is registered.
    block_registry: BlockRegistry,
    /// Deduplication policy applied when registering completed blocks.
    duplication_policy: BlockDuplicationPolicy,
    /// Cloned into every [`ImmutableBlock`] handed out by this manager.
    // NOTE(review): the exact upgrade semantics live in `crate::blocks` — confirm there.
    upgrade_fn: UpgradeFn<T>,
    /// Held for the whole of `allocate_blocks_with_evictions`, so the
    /// reset-pool draw and the inactive-pool eviction happen as one step
    /// with respect to other callers of that method.
    allocate_mutex: Mutex<()>,
    /// Number of blocks managed; fixed at construction and used by
    /// `reset_inactive_pool` to verify that the drain recovered every block.
    total_blocks: usize,
    /// Tokens per block (constant after construction).
    block_size: usize,
    /// Shared metrics sink; a clone of this `Arc` is attached to each
    /// `ImmutableBlock` created by the manager.
    metrics: Arc<BlockPoolMetrics>,
}
57
58impl<T: BlockMetadata> BlockManager<T> {
59    /// Create a new builder for BlockManager.
60    ///
61    /// # Example
62    /// ```ignore
63    /// let tracker = FrequencyTrackingCapacity::Medium.create_tracker();
64    /// let registry = BlockRegistry::builder().frequency_tracker(tracker).build();
65    ///
66    /// let manager = BlockManager::builder()
67    ///     .block_count(1000)
68    ///     .registry(registry)
69    ///     .with_multi_lru_backend()
70    ///     .build()?;
71    /// ```
72    pub fn builder() -> BlockManagerConfigBuilder<T> {
73        BlockManagerConfigBuilder::default()
74    }
75
76    /// Allocate `count` mutable blocks, drawing first from the reset pool
77    /// then evicting from the inactive pool if needed.
78    ///
79    /// Returns `None` if fewer than `count` blocks are available across both pools.
80    pub fn allocate_blocks(&self, count: usize) -> Option<Vec<MutableBlock<T>>> {
81        self.allocate_blocks_with_evictions(count)
82            .map(|(blocks, _evicted)| blocks)
83    }
84
85    /// Like [`allocate_blocks`](Self::allocate_blocks) but also reports the
86    /// [`SequenceHash`] of each block evicted from the inactive pool to
87    /// satisfy the allocation. Callers maintaining a shadow view of which
88    /// registrations are alive (e.g. the mocker's router-event bridge) can
89    /// translate these hashes into cache-invalidation events directly,
90    /// avoiding an O(N) presence scan over the registry.
91    pub fn allocate_blocks_with_evictions(
92        &self,
93        count: usize,
94    ) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
95        let _guard = self.allocate_mutex.lock();
96        let from_reset = self.reset_pool.allocate_blocks(count);
97        let from_reset_count = from_reset.len();
98        let mut blocks = from_reset;
99
100        let remaining_needed = count - blocks.len();
101        match self.inactive_pool.allocate_blocks(remaining_needed) {
102            Some((remaining, evicted)) => {
103                let eviction_count = remaining.len() as u64;
104                blocks.extend(remaining);
105
106                self.metrics.inc_allocations(blocks.len() as u64);
107                self.metrics
108                    .inc_allocations_from_reset(from_reset_count as u64);
109                self.metrics.inc_evictions(eviction_count);
110
111                Some((blocks, evicted))
112            }
113            None => None,
114        }
115    }
116
117    /// Drain the inactive pool, returning all blocks to the reset pool.
118    ///
119    /// 1. Acquires the inactive pool lock and allocates all blocks.
120    /// 2. Releases the lock.
121    /// 3. Drops the allocated blocks (RAII returns them to reset).
122    /// 4. Verifies the reset pool contains the expected total.
123    ///
124    /// Returns an error under contention when blocks are in active use.
125    pub fn reset_inactive_pool(&self) -> Result<(), BlockManagerResetError> {
126        // 1. Allocate all blocks from inactive pool (acquires lock internally)
127        let blocks = self.inactive_pool.allocate_all_blocks();
128
129        // 2. Drop blocks - RAII returns them to reset pool
130        drop(blocks);
131
132        // 3. Verify block count (may fail under contention - that's OK)
133        let reset_count = self.reset_pool.len();
134        if reset_count != self.total_blocks {
135            return Err(BlockManagerResetError::BlockCountMismatch {
136                expected: self.total_blocks,
137                actual: reset_count,
138            });
139        }
140
141        Ok(())
142    }
143
144    /// Register a batch of completed blocks, returning immutable handles.
145    pub fn register_blocks(&self, blocks: Vec<CompleteBlock<T>>) -> Vec<ImmutableBlock<T>> {
146        blocks
147            .into_iter()
148            .map(|block| self.register_block(block))
149            .collect()
150    }
151
152    /// Register a single completed block and return an immutable handle.
153    ///
154    /// Deduplication is governed by the configured [`BlockDuplicationPolicy`].
155    pub fn register_block(&self, block: CompleteBlock<T>) -> ImmutableBlock<T> {
156        self.metrics.inc_registrations();
157        let handle = self
158            .block_registry
159            .register_sequence_hash(block.sequence_hash());
160        let registered_block = handle.register_block(
161            block,
162            self.duplication_policy,
163            &self.inactive_pool,
164            Some(self.metrics.as_ref()),
165        );
166        ImmutableBlock::new(
167            registered_block,
168            self.upgrade_fn.clone(),
169            Some(self.metrics.clone()),
170        )
171    }
172
173    /// Linear prefix match: walks `seq_hash` left-to-right, stopping on first miss.
174    ///
175    /// Checks the active pool first, then the inactive pool for remaining hashes.
176    pub fn match_blocks(&self, seq_hash: &[SequenceHash]) -> Vec<ImmutableBlock<T>> {
177        self.metrics
178            .inc_match_hashes_requested(seq_hash.len() as u64);
179
180        tracing::debug!(
181            num_hashes = seq_hash.len(),
182            inactive_pool_len = self.inactive_pool.len(),
183            "match_blocks called"
184        );
185
186        let Some((&first_hash, remaining_after_first)) = seq_hash.split_first() else {
187            tracing::debug!(total_matched = 0, "match_blocks result");
188            return Vec::new();
189        };
190
191        let mut matched: Vec<ImmutableBlock<T>>;
192        let active_matched;
193
194        if let Some(first_block) = self.active_pool.find_match(first_hash, true) {
195            matched = Vec::with_capacity(seq_hash.len());
196            matched.push(ImmutableBlock::new(
197                first_block,
198                self.upgrade_fn.clone(),
199                Some(self.metrics.clone()),
200            ));
201
202            if !remaining_after_first.is_empty() {
203                matched.extend(
204                    self.active_pool
205                        .find_matches(remaining_after_first, true)
206                        .into_iter()
207                        .map(|block| {
208                            ImmutableBlock::new(
209                                block,
210                                self.upgrade_fn.clone(),
211                                Some(self.metrics.clone()),
212                            )
213                        }),
214                );
215            }
216
217            active_matched = matched.len();
218        } else {
219            let inactive_found = self.inactive_pool.find_blocks(seq_hash, true);
220            let inactive_matched = inactive_found.len();
221            tracing::debug!(
222                remaining_to_check = seq_hash.len(),
223                inactive_matched,
224                "Matched from inactive pool"
225            );
226
227            if inactive_found.is_empty() {
228                self.metrics.inc_match_blocks_returned(0);
229                tracing::debug!(total_matched = 0, "match_blocks result");
230                return Vec::new();
231            }
232
233            matched = Vec::with_capacity(seq_hash.len());
234            matched.extend(inactive_found.into_iter().map(|block| {
235                ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone()))
236            }));
237            active_matched = 0;
238        }
239
240        tracing::debug!(active_matched, "Matched from active pool");
241
242        // If we didn't match all hashes, try inactive blocks for the remaining ones
243        let remaining_hashes = &seq_hash[matched.len()..];
244        if !remaining_hashes.is_empty() {
245            let inactive_found: Vec<_> = self.inactive_pool.find_blocks(remaining_hashes, true);
246            let inactive_matched = inactive_found.len();
247            tracing::debug!(
248                remaining_to_check = remaining_hashes.len(),
249                inactive_matched,
250                "Matched from inactive pool"
251            );
252            matched.extend(inactive_found.into_iter().map(|block| {
253                ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone()))
254            }));
255        }
256
257        self.metrics.inc_match_blocks_returned(matched.len() as u64);
258
259        tracing::debug!(total_matched = matched.len(), "match_blocks result");
260        tracing::trace!(matched = ?matched, "matched blocks");
261        matched
262    }
263
264    /// Scatter-gather scan: finds all blocks matching any hash, without stopping on misses.
265    ///
266    /// Returns a map of found hashes to immutable handles.
267    pub fn scan_matches(
268        &self,
269        seq_hashes: &[SequenceHash],
270        touch: bool,
271    ) -> HashMap<SequenceHash, ImmutableBlock<T>> {
272        self.metrics
273            .inc_scan_hashes_requested(seq_hashes.len() as u64);
274
275        let mut result = HashMap::new();
276
277        // 1. Check active pool for all hashes (read-only, no touch needed)
278        let active_found = self.active_pool.scan_matches(seq_hashes);
279        for (hash, block) in active_found {
280            result.insert(
281                hash,
282                ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone())),
283            );
284        }
285
286        // 2. Build remaining hashes set
287        let remaining: Vec<SequenceHash> = seq_hashes
288            .iter()
289            .filter(|h| !result.contains_key(h))
290            .copied()
291            .collect();
292
293        // 3. Scan inactive pool for remaining (acquires blocks, may touch)
294        if !remaining.is_empty() {
295            let inactive_found = self.inactive_pool.scan_blocks(&remaining, touch);
296            for (hash, block) in inactive_found {
297                result.insert(
298                    hash,
299                    ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone())),
300                );
301            }
302        }
303
304        self.metrics.inc_scan_blocks_returned(result.len() as u64);
305
306        result
307    }
308
309    /// Total number of blocks managed (constant after construction).
310    pub fn total_blocks(&self) -> usize {
311        self.total_blocks
312    }
313
314    /// Blocks available for allocation (reset + inactive pools).
315    pub fn available_blocks(&self) -> usize {
316        self.reset_pool.len() + self.inactive_pool.len()
317    }
318
319    /// Tokens per block (constant after construction).
320    pub fn block_size(&self) -> usize {
321        self.block_size
322    }
323
324    /// Current duplication policy.
325    pub fn duplication_policy(&self) -> &BlockDuplicationPolicy {
326        &self.duplication_policy
327    }
328
329    /// Reference to the shared block registry.
330    pub fn block_registry(&self) -> &BlockRegistry {
331        &self.block_registry
332    }
333
334    /// Reference to the block pool metrics.
335    pub fn metrics(&self) -> &Arc<BlockPoolMetrics> {
336        &self.metrics
337    }
338}