1mod builder;
19
20#[cfg(test)]
21mod tests;
22
23pub use builder::{
24 BlockManagerBuilderError, BlockManagerConfigBuilder, BlockManagerResetError,
25 FrequencyTrackingCapacity, InactiveBackendConfig,
26};
27
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use parking_lot::Mutex;
32
33use crate::blocks::{BlockMetadata, CompleteBlock, ImmutableBlock, MutableBlock, UpgradeFn};
34use crate::metrics::BlockPoolMetrics;
35use crate::pools::{ActivePool, BlockDuplicationPolicy, InactivePool, ResetPool, SequenceHash};
36use crate::registry::BlockRegistry;
37
38pub struct BlockManager<T: BlockMetadata> {
46 reset_pool: ResetPool<T>,
47 active_pool: ActivePool<T>,
48 inactive_pool: InactivePool<T>,
49 block_registry: BlockRegistry,
50 duplication_policy: BlockDuplicationPolicy,
51 upgrade_fn: UpgradeFn<T>,
52 allocate_mutex: Mutex<()>,
53 total_blocks: usize,
54 block_size: usize,
55 metrics: Arc<BlockPoolMetrics>,
56}
57
58impl<T: BlockMetadata> BlockManager<T> {
59 pub fn builder() -> BlockManagerConfigBuilder<T> {
73 BlockManagerConfigBuilder::default()
74 }
75
76 pub fn allocate_blocks(&self, count: usize) -> Option<Vec<MutableBlock<T>>> {
81 self.allocate_blocks_with_evictions(count)
82 .map(|(blocks, _evicted)| blocks)
83 }
84
85 pub fn allocate_blocks_with_evictions(
92 &self,
93 count: usize,
94 ) -> Option<(Vec<MutableBlock<T>>, Vec<SequenceHash>)> {
95 let _guard = self.allocate_mutex.lock();
96 let from_reset = self.reset_pool.allocate_blocks(count);
97 let from_reset_count = from_reset.len();
98 let mut blocks = from_reset;
99
100 let remaining_needed = count - blocks.len();
101 match self.inactive_pool.allocate_blocks(remaining_needed) {
102 Some((remaining, evicted)) => {
103 let eviction_count = remaining.len() as u64;
104 blocks.extend(remaining);
105
106 self.metrics.inc_allocations(blocks.len() as u64);
107 self.metrics
108 .inc_allocations_from_reset(from_reset_count as u64);
109 self.metrics.inc_evictions(eviction_count);
110
111 Some((blocks, evicted))
112 }
113 None => None,
114 }
115 }
116
117 pub fn reset_inactive_pool(&self) -> Result<(), BlockManagerResetError> {
126 let blocks = self.inactive_pool.allocate_all_blocks();
128
129 drop(blocks);
131
132 let reset_count = self.reset_pool.len();
134 if reset_count != self.total_blocks {
135 return Err(BlockManagerResetError::BlockCountMismatch {
136 expected: self.total_blocks,
137 actual: reset_count,
138 });
139 }
140
141 Ok(())
142 }
143
144 pub fn register_blocks(&self, blocks: Vec<CompleteBlock<T>>) -> Vec<ImmutableBlock<T>> {
146 blocks
147 .into_iter()
148 .map(|block| self.register_block(block))
149 .collect()
150 }
151
152 pub fn register_block(&self, block: CompleteBlock<T>) -> ImmutableBlock<T> {
156 self.metrics.inc_registrations();
157 let handle = self
158 .block_registry
159 .register_sequence_hash(block.sequence_hash());
160 let registered_block = handle.register_block(
161 block,
162 self.duplication_policy,
163 &self.inactive_pool,
164 Some(self.metrics.as_ref()),
165 );
166 ImmutableBlock::new(
167 registered_block,
168 self.upgrade_fn.clone(),
169 Some(self.metrics.clone()),
170 )
171 }
172
173 pub fn match_blocks(&self, seq_hash: &[SequenceHash]) -> Vec<ImmutableBlock<T>> {
177 self.metrics
178 .inc_match_hashes_requested(seq_hash.len() as u64);
179
180 tracing::debug!(
181 num_hashes = seq_hash.len(),
182 inactive_pool_len = self.inactive_pool.len(),
183 "match_blocks called"
184 );
185
186 let Some((&first_hash, remaining_after_first)) = seq_hash.split_first() else {
187 tracing::debug!(total_matched = 0, "match_blocks result");
188 return Vec::new();
189 };
190
191 let mut matched: Vec<ImmutableBlock<T>>;
192 let active_matched;
193
194 if let Some(first_block) = self.active_pool.find_match(first_hash, true) {
195 matched = Vec::with_capacity(seq_hash.len());
196 matched.push(ImmutableBlock::new(
197 first_block,
198 self.upgrade_fn.clone(),
199 Some(self.metrics.clone()),
200 ));
201
202 if !remaining_after_first.is_empty() {
203 matched.extend(
204 self.active_pool
205 .find_matches(remaining_after_first, true)
206 .into_iter()
207 .map(|block| {
208 ImmutableBlock::new(
209 block,
210 self.upgrade_fn.clone(),
211 Some(self.metrics.clone()),
212 )
213 }),
214 );
215 }
216
217 active_matched = matched.len();
218 } else {
219 let inactive_found = self.inactive_pool.find_blocks(seq_hash, true);
220 let inactive_matched = inactive_found.len();
221 tracing::debug!(
222 remaining_to_check = seq_hash.len(),
223 inactive_matched,
224 "Matched from inactive pool"
225 );
226
227 if inactive_found.is_empty() {
228 self.metrics.inc_match_blocks_returned(0);
229 tracing::debug!(total_matched = 0, "match_blocks result");
230 return Vec::new();
231 }
232
233 matched = Vec::with_capacity(seq_hash.len());
234 matched.extend(inactive_found.into_iter().map(|block| {
235 ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone()))
236 }));
237 active_matched = 0;
238 }
239
240 tracing::debug!(active_matched, "Matched from active pool");
241
242 let remaining_hashes = &seq_hash[matched.len()..];
244 if !remaining_hashes.is_empty() {
245 let inactive_found: Vec<_> = self.inactive_pool.find_blocks(remaining_hashes, true);
246 let inactive_matched = inactive_found.len();
247 tracing::debug!(
248 remaining_to_check = remaining_hashes.len(),
249 inactive_matched,
250 "Matched from inactive pool"
251 );
252 matched.extend(inactive_found.into_iter().map(|block| {
253 ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone()))
254 }));
255 }
256
257 self.metrics.inc_match_blocks_returned(matched.len() as u64);
258
259 tracing::debug!(total_matched = matched.len(), "match_blocks result");
260 tracing::trace!(matched = ?matched, "matched blocks");
261 matched
262 }
263
264 pub fn scan_matches(
268 &self,
269 seq_hashes: &[SequenceHash],
270 touch: bool,
271 ) -> HashMap<SequenceHash, ImmutableBlock<T>> {
272 self.metrics
273 .inc_scan_hashes_requested(seq_hashes.len() as u64);
274
275 let mut result = HashMap::new();
276
277 let active_found = self.active_pool.scan_matches(seq_hashes);
279 for (hash, block) in active_found {
280 result.insert(
281 hash,
282 ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone())),
283 );
284 }
285
286 let remaining: Vec<SequenceHash> = seq_hashes
288 .iter()
289 .filter(|h| !result.contains_key(h))
290 .copied()
291 .collect();
292
293 if !remaining.is_empty() {
295 let inactive_found = self.inactive_pool.scan_blocks(&remaining, touch);
296 for (hash, block) in inactive_found {
297 result.insert(
298 hash,
299 ImmutableBlock::new(block, self.upgrade_fn.clone(), Some(self.metrics.clone())),
300 );
301 }
302 }
303
304 self.metrics.inc_scan_blocks_returned(result.len() as u64);
305
306 result
307 }
308
309 pub fn total_blocks(&self) -> usize {
311 self.total_blocks
312 }
313
314 pub fn available_blocks(&self) -> usize {
316 self.reset_pool.len() + self.inactive_pool.len()
317 }
318
319 pub fn block_size(&self) -> usize {
321 self.block_size
322 }
323
324 pub fn duplication_policy(&self) -> &BlockDuplicationPolicy {
326 &self.duplication_policy
327 }
328
329 pub fn block_registry(&self) -> &BlockRegistry {
331 &self.block_registry
332 }
333
334 pub fn metrics(&self) -> &Arc<BlockPoolMetrics> {
336 &self.metrics
337 }
338}