foyer_storage/engine/block/
manager.rs1use std::{
16 collections::{HashSet, VecDeque},
17 fmt::Debug,
18 ops::{Deref, DerefMut},
19 sync::{
20 atomic::{AtomicBool, AtomicUsize, Ordering},
21 Arc, RwLock, RwLockWriteGuard,
22 },
23};
24
25use foyer_common::{
26 error::{ErrorKind, Result},
27 metrics::Metrics,
28 spawn::Spawner,
29};
30use futures_core::future::BoxFuture;
31use futures_util::{
32 future::{ready, Shared},
33 FutureExt,
34};
35use itertools::Itertools;
36use mea::oneshot;
37use rand::seq::IteratorRandom;
38
39use crate::{
40 engine::block::{
41 eviction::{EvictionInfo, EvictionPicker},
42 reclaimer::ReclaimerTrait,
43 },
44 io::{
45 bytes::{IoB, IoBuf, IoBufMut},
46 device::Partition,
47 engine::IoEngine,
48 },
49 Device,
50};
51
/// Identifier of a block.
///
/// `BlockManager` assigns ids sequentially while opening the device and uses them as indices into
/// its block list.
pub type BlockId = u32;
53
/// Per-block counters and flags.
///
/// Every field is an atomic, so the statistics can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct BlockStatistics {
    /// Invalid counter of the block (presumably invalidated bytes — confirm against the writers).
    pub invalid: AtomicUsize,
    /// Access counter of the block (presumably bumped on lookups — confirm against the readers).
    pub access: AtomicUsize,
    /// Probation flag of the block (presumably set by an eviction picker — confirm).
    pub probation: AtomicBool,
}

impl BlockStatistics {
    /// Puts every statistic back to its default value (`0` for the counters, `false` for the flag).
    ///
    /// The three stores are independent `Relaxed` stores; they are not applied as one atomic unit.
    pub(crate) fn reset(&self) {
        // Destructure so that a newly added field cannot be silently left out of the reset.
        let Self {
            invalid,
            access,
            probation,
        } = self;
        invalid.store(0, Ordering::Relaxed);
        access.store(0, Ordering::Relaxed);
        probation.store(false, Ordering::Relaxed);
    }
}
73
/// Shared data behind a `Block` handle.
#[derive(Debug)]
struct BlockInner {
    /// Identifier of the block.
    id: BlockId,
    /// Device partition that the block's reads and writes are issued against.
    partition: Arc<dyn Partition>,
    /// I/O engine used to perform the block's reads and writes.
    io_engine: Arc<dyn IoEngine>,
    /// Statistics attached to the block.
    statistics: Arc<BlockStatistics>,
}
81
/// Handle to a block.
///
/// Cloning only clones the inner `Arc`, so all clones refer to the same block data and statistics.
#[derive(Debug, Clone)]
pub struct Block {
    /// Reference-counted block data shared by all clones of this handle.
    inner: Arc<BlockInner>,
}
87
88impl Block {
89 pub fn id(&self) -> BlockId {
91 self.inner.id
92 }
93
94 pub fn statistics(&self) -> &Arc<BlockStatistics> {
96 &self.inner.statistics
97 }
98
99 pub fn size(&self) -> usize {
101 self.inner.partition.size()
102 }
103
104 pub(crate) async fn write(&self, buf: Box<dyn IoBuf>, offset: u64) -> (Box<dyn IoB>, Result<()>) {
105 let (buf, res) = self
106 .inner
107 .io_engine
108 .write(buf, self.inner.partition.as_ref(), offset)
109 .await;
110 (buf, res)
111 }
112
113 pub(crate) async fn read(&self, buf: Box<dyn IoBufMut>, offset: u64) -> (Box<dyn IoB>, Result<()>) {
114 let (buf, res) = self
115 .inner
116 .io_engine
117 .read(buf, self.inner.partition.as_ref(), offset)
118 .await;
119 (buf, res)
120 }
121
122 pub(crate) fn partition(&self) -> &Arc<dyn Partition> {
123 &self.inner.partition
124 }
125}
126
#[cfg(test)]
impl Block {
    /// Test-only constructor: builds a block from its parts with default (zeroed) statistics.
    pub(crate) fn new_for_test(id: BlockId, partition: Arc<dyn Partition>, io_engine: Arc<dyn IoEngine>) -> Self {
        Self {
            inner: Arc::new(BlockInner {
                id,
                partition,
                io_engine,
                statistics: Arc::default(),
            }),
        }
    }
}
140
/// Cloneable handle to a pending clean-block request.
///
/// It is a shared, boxed future that resolves to a `Block`; every clone of the handle resolves to
/// the same block.
pub type GetCleanBlockHandle = Shared<BoxFuture<'static, Block>>;
142
/// Mutable bookkeeping of the block manager, guarded by the lock in `Inner::state`.
#[derive(Debug)]
struct State {
    /// Ids of clean blocks. Blocks are handed out from the front and returned to the back.
    clean_blocks: VecDeque<BlockId>,
    /// Ids of blocks that eviction may pick from.
    evictable_blocks: HashSet<BlockId>,
    /// Ids of blocks that were handed out for writing and have not finished writing yet.
    writing_blocks: HashSet<BlockId>,
    /// Ids of blocks that were evicted and are currently being reclaimed.
    reclaiming_blocks: HashSet<BlockId>,

    /// Senders for clean-block requests that found no clean block and are waiting for one.
    clean_block_waiters: Vec<oneshot::Sender<Block>>,

    /// Eviction pickers, consulted in order when a block has to be evicted.
    ///
    /// The vector is temporarily taken out of the state while the pickers are being invoked.
    eviction_pickers: Vec<Box<dyn EvictionPicker>>,

    /// Senders notified once no block is being reclaimed anymore.
    reclaim_waiters: Vec<oneshot::Sender<()>>,
}
156
/// Shared internals of a `BlockManager`.
#[derive(Debug)]
struct Inner {
    /// All blocks of the device; a block's id is its index in this vector.
    blocks: Vec<Block>,
    /// Mutable bookkeeping (clean / evictable / writing / reclaiming sets, waiters, pickers).
    state: RwLock<State>,
    /// Reclaimer that evicted blocks are handed to.
    reclaimer: Arc<dyn ReclaimerTrait>,
    /// A new reclaim is only started while fewer than this many blocks are being reclaimed.
    reclaim_concurrency: usize,
    /// A new reclaim is only started while fewer than this many blocks are clean.
    clean_block_threshold: usize,
    /// Metrics sink for the block-state gauges.
    metrics: Arc<Metrics>,
    /// Spawner used to run reclaim futures.
    spawner: Spawner,
}
167
/// Tracks the blocks of a device and moves them between the clean, writing, evictable and
/// reclaiming states.
///
/// Cloning only clones the inner `Arc`; all clones operate on the same state.
#[derive(Debug, Clone)]
pub struct BlockManager {
    /// Shared internals.
    inner: Arc<Inner>,
}
172
173impl BlockManager {
174 #[expect(clippy::too_many_arguments)]
175 pub fn open(
176 device: Arc<dyn Device>,
177 io_engine: Arc<dyn IoEngine>,
178 block_size: usize,
179 mut eviction_pickers: Vec<Box<dyn EvictionPicker>>,
180 reclaimer: Arc<dyn ReclaimerTrait>,
181 reclaim_concurrency: usize,
182 clean_block_threshold: usize,
183 metrics: Arc<Metrics>,
184 spawner: Spawner,
185 ) -> Result<Self> {
186 let mut blocks = vec![];
187
188 while device.free() >= block_size {
189 let partition = match device.create_partition(block_size) {
190 Ok(partition) => partition,
191 Err(e) if e.kind() == ErrorKind::NoSpace => break,
192 Err(e) => return Err(e),
193 };
194 let id = blocks.len() as BlockId;
195 let block = Block {
196 inner: Arc::new(BlockInner {
197 id,
198 partition,
199 io_engine: io_engine.clone(),
200 statistics: Arc::<BlockStatistics>::default(),
201 }),
202 };
203 blocks.push(block);
204 }
205
206 let rs = blocks.iter().map(|r| r.id()).collect_vec();
207 for pickers in eviction_pickers.iter_mut() {
208 pickers.init(&rs, block_size);
209 }
210
211 metrics.storage_block_engine_block_size_bytes.absolute(block_size as _);
212
213 let state = State {
214 clean_blocks: VecDeque::new(),
215 evictable_blocks: HashSet::new(),
216 writing_blocks: HashSet::new(),
217 reclaiming_blocks: HashSet::new(),
218 clean_block_waiters: Vec::new(),
219 eviction_pickers,
220 reclaim_waiters: Vec::new(),
221 };
222 let inner = Inner {
223 blocks,
224 state: RwLock::new(state),
225 reclaimer,
226 reclaim_concurrency,
227 clean_block_threshold,
228 metrics,
229 spawner,
230 };
231 let inner = Arc::new(inner);
232 let this = Self { inner };
233 Ok(this)
234 }
235
236 pub fn init(&self, clean_blocks: &[BlockId]) {
237 let mut state = self.inner.state.write().unwrap();
238 let mut evictable_blocks: HashSet<BlockId> = self.inner.blocks.iter().map(|r| r.id()).collect();
239 state.clean_blocks = clean_blocks
240 .iter()
241 .inspect(|id| {
242 evictable_blocks.remove(id);
243 })
244 .copied()
245 .collect();
246
247 let mut pickers = std::mem::take(&mut state.eviction_pickers);
249
250 for block in evictable_blocks {
252 state.evictable_blocks.insert(block);
253 for picker in pickers.iter_mut() {
254 picker.on_block_evictable(
255 EvictionInfo {
256 blocks: &self.inner.blocks,
257 evictable: &state.evictable_blocks,
258 clean: state.clean_blocks.len(),
259 },
260 block,
261 );
262 }
263 }
264
265 std::mem::swap(&mut state.eviction_pickers, &mut pickers);
268 assert!(pickers.is_empty());
269
270 let metrics = &self.inner.metrics;
271 metrics
272 .storage_block_engine_block_clean
273 .absolute(state.clean_blocks.len() as _);
274 metrics
275 .storage_block_engine_block_evictable
276 .absolute(state.evictable_blocks.len() as _);
277 metrics
278 .storage_block_engine_block_writing
279 .absolute(state.writing_blocks.len() as _);
280 metrics
281 .storage_block_engine_block_reclaiming
282 .absolute(state.reclaiming_blocks.len() as _);
283 }
284
285 pub fn blocks(&self) -> usize {
286 self.inner.blocks.len()
287 }
288
289 pub fn block(&self, id: BlockId) -> &Block {
290 &self.inner.blocks[id as usize]
291 }
292
293 pub fn get_clean_block(&self) -> GetCleanBlockHandle {
294 let this = self.clone();
295 async move {
296 let rx = {
298 let mut state = this.inner.state.write().unwrap();
299 if let Some(id) = state.clean_blocks.pop_front() {
300 let block = this.inner.blocks[id as usize].clone();
301 state.writing_blocks.insert(id);
302 this.inner.metrics.storage_block_engine_block_clean.decrease(1);
303 this.inner.metrics.storage_block_engine_block_writing.increase(1);
304 this.reclaim_if_needed(&mut state);
305 return block;
306 } else {
307 let (tx, rx) = oneshot::channel();
308 state.clean_block_waiters.push(tx);
309 drop(state);
310 rx
311 }
312 };
313 rx.await.unwrap()
314 }
315 .boxed()
316 .shared()
317 }
318
319 pub fn on_writing_finish(&self, block: Block) {
320 let mut state = self.inner.state.write().unwrap();
321 state.writing_blocks.remove(&block.id());
322 self.inner.metrics.storage_block_engine_block_writing.decrease(1);
323 let inserted = state.evictable_blocks.insert(block.id());
324 self.inner.metrics.storage_block_engine_block_evictable.increase(1);
325
326 assert!(inserted);
327
328 let mut pickers = std::mem::take(&mut state.eviction_pickers);
330
331 for picker in pickers.iter_mut() {
333 picker.on_block_evictable(
334 EvictionInfo {
335 blocks: &self.inner.blocks,
336 evictable: &state.evictable_blocks,
337 clean: state.clean_blocks.len(),
338 },
339 block.id(),
340 );
341 }
342
343 std::mem::swap(&mut state.eviction_pickers, &mut pickers);
346 assert!(pickers.is_empty());
347
348 tracing::debug!(
349 id = block.id(),
350 "[block manager]: Block state transfers from writing to evictable."
351 );
352
353 self.reclaim_if_needed(&mut state);
354 }
355
356 fn on_reclaim_finish(&self, block: Block) {
357 let mut state = self.inner.state.write().unwrap();
358 state.reclaiming_blocks.remove(&block.id());
359 self.inner.metrics.storage_block_engine_block_reclaiming.decrease(1);
360 if let Some(waiter) = state.clean_block_waiters.pop() {
361 self.inner.metrics.storage_block_engine_block_writing.increase(1);
362 let _ = waiter.send(block);
363 } else {
364 self.inner.metrics.storage_block_engine_block_clean.increase(1);
365 state.clean_blocks.push_back(block.id());
366 }
367 self.reclaim_if_needed(&mut state);
368 if state.reclaiming_blocks.is_empty() {
369 for tx in std::mem::take(&mut state.reclaim_waiters) {
370 let _ = tx.send(());
371 }
372 }
373 }
374
375 fn reclaim_if_needed<'a>(&self, state: &mut RwLockWriteGuard<'a, State>) {
376 if state.clean_blocks.len() < self.inner.clean_block_threshold
377 && state.reclaiming_blocks.len() < self.inner.reclaim_concurrency
378 {
379 if let Some(block) = self.evict(state) {
380 state.reclaiming_blocks.insert(block.id());
381 self.inner.metrics.storage_block_engine_block_reclaiming.increase(1);
382 let block = ReclaimingBlock {
383 block_manager: self.clone(),
384 block,
385 };
386 let future = self.inner.reclaimer.reclaim(block);
387 self.inner.spawner.spawn(future);
388 }
389 }
390 }
391
392 fn evict<'a>(&self, state: &mut RwLockWriteGuard<'a, State>) -> Option<Block> {
393 let mut picked = None;
394
395 if state.evictable_blocks.is_empty() {
396 return None;
397 }
398
399 let mut pickers = std::mem::take(&mut state.eviction_pickers);
401
402 for picker in pickers.iter_mut() {
404 if let Some(block) = picker.pick(EvictionInfo {
405 blocks: &self.inner.blocks,
406 evictable: &state.evictable_blocks,
407 clean: state.clean_blocks.len(),
408 }) {
409 picked = Some(block);
410 break;
411 }
412 }
413
414 let picked = picked.unwrap_or_else(|| state.evictable_blocks.iter().choose(&mut rand::rng()).copied().unwrap());
416
417 let removed = state.evictable_blocks.remove(&picked);
419 self.inner.metrics.storage_block_engine_block_evictable.decrease(1);
420 assert!(removed);
421
422 for picker in pickers.iter_mut() {
424 picker.on_block_evict(
425 EvictionInfo {
426 blocks: &self.inner.blocks,
427 evictable: &state.evictable_blocks,
428 clean: state.clean_blocks.len(),
429 },
430 picked,
431 );
432 }
433
434 std::mem::swap(&mut state.eviction_pickers, &mut pickers);
436 assert!(pickers.is_empty());
437
438 let block = self.inner.blocks[picked as usize].clone();
439 tracing::debug!("[block manager]: Block {picked} is evicted.");
440
441 Some(block)
442 }
443
444 pub fn wait_reclaim(&self) -> BoxFuture<'static, ()> {
445 let mut state = self.inner.state.write().unwrap();
446 if state.reclaiming_blocks.is_empty() {
447 return ready(()).boxed();
448 }
449 let (tx, rx) = oneshot::channel();
450 state.reclaim_waiters.push(tx);
451 async move {
452 let _ = rx.await;
453 }
454 .boxed()
455 }
456}
457
458pub struct ReclaimingBlock {
459 block_manager: BlockManager,
460 block: Block,
461}
462
463impl Deref for ReclaimingBlock {
464 type Target = Block;
465
466 fn deref(&self) -> &Self::Target {
467 &self.block
468 }
469}
470
471impl DerefMut for ReclaimingBlock {
472 fn deref_mut(&mut self) -> &mut Self::Target {
473 &mut self.block
474 }
475}
476
477impl Drop for ReclaimingBlock {
478 fn drop(&mut self) {
479 self.block_manager.on_reclaim_finish(self.block.clone());
480 }
481}