foyer_storage/engine/block/
manager.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{HashSet, VecDeque},
17    fmt::Debug,
18    ops::{Deref, DerefMut},
19    sync::{
20        atomic::{AtomicBool, AtomicUsize, Ordering},
21        Arc, RwLock, RwLockWriteGuard,
22    },
23};
24
25use foyer_common::{
26    error::{ErrorKind, Result},
27    metrics::Metrics,
28    spawn::Spawner,
29};
30use futures_core::future::BoxFuture;
31use futures_util::{
32    future::{ready, Shared},
33    FutureExt,
34};
35use itertools::Itertools;
36use mea::oneshot;
37use rand::seq::IteratorRandom;
38
39use crate::{
40    engine::block::{
41        eviction::{EvictionInfo, EvictionPicker},
42        reclaimer::ReclaimerTrait,
43    },
44    io::{
45        bytes::{IoB, IoBuf, IoBufMut},
46        device::Partition,
47        engine::IoEngine,
48    },
49    Device,
50};
51
/// Identifier of a block managed by the block manager.
pub type BlockId = u32;

/// Per-block counters, reachable through `Block::statistics`.
///
/// All fields are atomics so they can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct BlockStatistics {
    /// Estimated number of invalid bytes in the block.
    /// FIXME(MrCroxx): This value is way too coarse. Need fix.
    pub invalid: AtomicUsize,
    /// How many times the block has been accessed.
    pub access: AtomicUsize,
    /// Set to `true` when some eviction picker intends to evict this block soon.
    pub probation: AtomicBool,
}

impl BlockStatistics {
    /// Resets every counter and flag to its `Default` value (zero / `false`).
    pub(crate) fn reset(&self) {
        let Self {
            invalid,
            access,
            probation,
        } = self;
        invalid.store(0, Ordering::Relaxed);
        access.store(0, Ordering::Relaxed);
        probation.store(false, Ordering::Relaxed);
    }
}
73
74#[derive(Debug)]
75struct BlockInner {
76    id: BlockId,
77    partition: Arc<dyn Partition>,
78    io_engine: Arc<dyn IoEngine>,
79    statistics: Arc<BlockStatistics>,
80}
81
82/// A block is a logical partition of a device. It is used to manage the device's storage space.
83#[derive(Debug, Clone)]
84pub struct Block {
85    inner: Arc<BlockInner>,
86}
87
88impl Block {
89    /// Get block id.
90    pub fn id(&self) -> BlockId {
91        self.inner.id
92    }
93
94    /// Get block Statistics.
95    pub fn statistics(&self) -> &Arc<BlockStatistics> {
96        &self.inner.statistics
97    }
98
99    /// Get block size.
100    pub fn size(&self) -> usize {
101        self.inner.partition.size()
102    }
103
104    pub(crate) async fn write(&self, buf: Box<dyn IoBuf>, offset: u64) -> (Box<dyn IoB>, Result<()>) {
105        let (buf, res) = self
106            .inner
107            .io_engine
108            .write(buf, self.inner.partition.as_ref(), offset)
109            .await;
110        (buf, res)
111    }
112
113    pub(crate) async fn read(&self, buf: Box<dyn IoBufMut>, offset: u64) -> (Box<dyn IoB>, Result<()>) {
114        let (buf, res) = self
115            .inner
116            .io_engine
117            .read(buf, self.inner.partition.as_ref(), offset)
118            .await;
119        (buf, res)
120    }
121
122    pub(crate) fn partition(&self) -> &Arc<dyn Partition> {
123        &self.inner.partition
124    }
125}
126
#[cfg(test)]
impl Block {
    /// Builds a standalone block with fresh, zeroed statistics. Only for unit tests.
    pub(crate) fn new_for_test(id: BlockId, partition: Arc<dyn Partition>, io_engine: Arc<dyn IoEngine>) -> Self {
        Self {
            inner: Arc::new(BlockInner {
                id,
                partition,
                io_engine,
                statistics: Arc::new(BlockStatistics::default()),
            }),
        }
    }
}
140
141pub type GetCleanBlockHandle = Shared<BoxFuture<'static, Block>>;
142
143#[derive(Debug)]
144struct State {
145    clean_blocks: VecDeque<BlockId>,
146    evictable_blocks: HashSet<BlockId>,
147    writing_blocks: HashSet<BlockId>,
148    reclaiming_blocks: HashSet<BlockId>,
149
150    clean_block_waiters: Vec<oneshot::Sender<Block>>,
151
152    eviction_pickers: Vec<Box<dyn EvictionPicker>>,
153
154    reclaim_waiters: Vec<oneshot::Sender<()>>,
155}
156
157#[derive(Debug)]
158struct Inner {
159    blocks: Vec<Block>,
160    state: RwLock<State>,
161    reclaimer: Arc<dyn ReclaimerTrait>,
162    reclaim_concurrency: usize,
163    clean_block_threshold: usize,
164    metrics: Arc<Metrics>,
165    spawner: Spawner,
166}
167
168#[derive(Debug, Clone)]
169pub struct BlockManager {
170    inner: Arc<Inner>,
171}
172
173impl BlockManager {
174    #[expect(clippy::too_many_arguments)]
175    pub fn open(
176        device: Arc<dyn Device>,
177        io_engine: Arc<dyn IoEngine>,
178        block_size: usize,
179        mut eviction_pickers: Vec<Box<dyn EvictionPicker>>,
180        reclaimer: Arc<dyn ReclaimerTrait>,
181        reclaim_concurrency: usize,
182        clean_block_threshold: usize,
183        metrics: Arc<Metrics>,
184        spawner: Spawner,
185    ) -> Result<Self> {
186        let mut blocks = vec![];
187
188        while device.free() >= block_size {
189            let partition = match device.create_partition(block_size) {
190                Ok(partition) => partition,
191                Err(e) if e.kind() == ErrorKind::NoSpace => break,
192                Err(e) => return Err(e),
193            };
194            let id = blocks.len() as BlockId;
195            let block = Block {
196                inner: Arc::new(BlockInner {
197                    id,
198                    partition,
199                    io_engine: io_engine.clone(),
200                    statistics: Arc::<BlockStatistics>::default(),
201                }),
202            };
203            blocks.push(block);
204        }
205
206        let rs = blocks.iter().map(|r| r.id()).collect_vec();
207        for pickers in eviction_pickers.iter_mut() {
208            pickers.init(&rs, block_size);
209        }
210
211        metrics.storage_block_engine_block_size_bytes.absolute(block_size as _);
212
213        let state = State {
214            clean_blocks: VecDeque::new(),
215            evictable_blocks: HashSet::new(),
216            writing_blocks: HashSet::new(),
217            reclaiming_blocks: HashSet::new(),
218            clean_block_waiters: Vec::new(),
219            eviction_pickers,
220            reclaim_waiters: Vec::new(),
221        };
222        let inner = Inner {
223            blocks,
224            state: RwLock::new(state),
225            reclaimer,
226            reclaim_concurrency,
227            clean_block_threshold,
228            metrics,
229            spawner,
230        };
231        let inner = Arc::new(inner);
232        let this = Self { inner };
233        Ok(this)
234    }
235
236    pub fn init(&self, clean_blocks: &[BlockId]) {
237        let mut state = self.inner.state.write().unwrap();
238        let mut evictable_blocks: HashSet<BlockId> = self.inner.blocks.iter().map(|r| r.id()).collect();
239        state.clean_blocks = clean_blocks
240            .iter()
241            .inspect(|id| {
242                evictable_blocks.remove(id);
243            })
244            .copied()
245            .collect();
246
247        // Temporarily take pickers to make borrow checker happy.
248        let mut pickers = std::mem::take(&mut state.eviction_pickers);
249
250        // Notify pickers.
251        for block in evictable_blocks {
252            state.evictable_blocks.insert(block);
253            for picker in pickers.iter_mut() {
254                picker.on_block_evictable(
255                    EvictionInfo {
256                        blocks: &self.inner.blocks,
257                        evictable: &state.evictable_blocks,
258                        clean: state.clean_blocks.len(),
259                    },
260                    block,
261                );
262            }
263        }
264
265        // Restore taken pickers after operations.
266
267        std::mem::swap(&mut state.eviction_pickers, &mut pickers);
268        assert!(pickers.is_empty());
269
270        let metrics = &self.inner.metrics;
271        metrics
272            .storage_block_engine_block_clean
273            .absolute(state.clean_blocks.len() as _);
274        metrics
275            .storage_block_engine_block_evictable
276            .absolute(state.evictable_blocks.len() as _);
277        metrics
278            .storage_block_engine_block_writing
279            .absolute(state.writing_blocks.len() as _);
280        metrics
281            .storage_block_engine_block_reclaiming
282            .absolute(state.reclaiming_blocks.len() as _);
283    }
284
285    pub fn blocks(&self) -> usize {
286        self.inner.blocks.len()
287    }
288
289    pub fn block(&self, id: BlockId) -> &Block {
290        &self.inner.blocks[id as usize]
291    }
292
293    pub fn get_clean_block(&self) -> GetCleanBlockHandle {
294        let this = self.clone();
295        async move {
296            // Wrap state lock guard to make borrow checker happy.
297            let rx = {
298                let mut state = this.inner.state.write().unwrap();
299                if let Some(id) = state.clean_blocks.pop_front() {
300                    let block = this.inner.blocks[id as usize].clone();
301                    state.writing_blocks.insert(id);
302                    this.inner.metrics.storage_block_engine_block_clean.decrease(1);
303                    this.inner.metrics.storage_block_engine_block_writing.increase(1);
304                    this.reclaim_if_needed(&mut state);
305                    return block;
306                } else {
307                    let (tx, rx) = oneshot::channel();
308                    state.clean_block_waiters.push(tx);
309                    drop(state);
310                    rx
311                }
312            };
313            rx.await.unwrap()
314        }
315        .boxed()
316        .shared()
317    }
318
319    pub fn on_writing_finish(&self, block: Block) {
320        let mut state = self.inner.state.write().unwrap();
321        state.writing_blocks.remove(&block.id());
322        self.inner.metrics.storage_block_engine_block_writing.decrease(1);
323        let inserted = state.evictable_blocks.insert(block.id());
324        self.inner.metrics.storage_block_engine_block_evictable.increase(1);
325
326        assert!(inserted);
327
328        // Temporarily take pickers to make borrow checker happy.
329        let mut pickers = std::mem::take(&mut state.eviction_pickers);
330
331        // Notify pickers.
332        for picker in pickers.iter_mut() {
333            picker.on_block_evictable(
334                EvictionInfo {
335                    blocks: &self.inner.blocks,
336                    evictable: &state.evictable_blocks,
337                    clean: state.clean_blocks.len(),
338                },
339                block.id(),
340            );
341        }
342
343        // Restore taken pickers after operations.
344
345        std::mem::swap(&mut state.eviction_pickers, &mut pickers);
346        assert!(pickers.is_empty());
347
348        tracing::debug!(
349            id = block.id(),
350            "[block manager]: Block state transfers from writing to evictable."
351        );
352
353        self.reclaim_if_needed(&mut state);
354    }
355
356    fn on_reclaim_finish(&self, block: Block) {
357        let mut state = self.inner.state.write().unwrap();
358        state.reclaiming_blocks.remove(&block.id());
359        self.inner.metrics.storage_block_engine_block_reclaiming.decrease(1);
360        if let Some(waiter) = state.clean_block_waiters.pop() {
361            self.inner.metrics.storage_block_engine_block_writing.increase(1);
362            let _ = waiter.send(block);
363        } else {
364            self.inner.metrics.storage_block_engine_block_clean.increase(1);
365            state.clean_blocks.push_back(block.id());
366        }
367        self.reclaim_if_needed(&mut state);
368        if state.reclaiming_blocks.is_empty() {
369            for tx in std::mem::take(&mut state.reclaim_waiters) {
370                let _ = tx.send(());
371            }
372        }
373    }
374
375    fn reclaim_if_needed<'a>(&self, state: &mut RwLockWriteGuard<'a, State>) {
376        if state.clean_blocks.len() < self.inner.clean_block_threshold
377            && state.reclaiming_blocks.len() < self.inner.reclaim_concurrency
378        {
379            if let Some(block) = self.evict(state) {
380                state.reclaiming_blocks.insert(block.id());
381                self.inner.metrics.storage_block_engine_block_reclaiming.increase(1);
382                let block = ReclaimingBlock {
383                    block_manager: self.clone(),
384                    block,
385                };
386                let future = self.inner.reclaimer.reclaim(block);
387                self.inner.spawner.spawn(future);
388            }
389        }
390    }
391
392    fn evict<'a>(&self, state: &mut RwLockWriteGuard<'a, State>) -> Option<Block> {
393        let mut picked = None;
394
395        if state.evictable_blocks.is_empty() {
396            return None;
397        }
398
399        // Temporarily take pickers to make borrow checker happy.
400        let mut pickers = std::mem::take(&mut state.eviction_pickers);
401
402        // Pick a block to evict with pickers.
403        for picker in pickers.iter_mut() {
404            if let Some(block) = picker.pick(EvictionInfo {
405                blocks: &self.inner.blocks,
406                evictable: &state.evictable_blocks,
407                clean: state.clean_blocks.len(),
408            }) {
409                picked = Some(block);
410                break;
411            }
412        }
413
414        // If no block is selected, just randomly pick one.
415        let picked = picked.unwrap_or_else(|| state.evictable_blocks.iter().choose(&mut rand::rng()).copied().unwrap());
416
417        // Update evictable map.
418        let removed = state.evictable_blocks.remove(&picked);
419        self.inner.metrics.storage_block_engine_block_evictable.decrease(1);
420        assert!(removed);
421
422        // Notify pickers.
423        for picker in pickers.iter_mut() {
424            picker.on_block_evict(
425                EvictionInfo {
426                    blocks: &self.inner.blocks,
427                    evictable: &state.evictable_blocks,
428                    clean: state.clean_blocks.len(),
429                },
430                picked,
431            );
432        }
433
434        // Restore taken pickers after operations.
435        std::mem::swap(&mut state.eviction_pickers, &mut pickers);
436        assert!(pickers.is_empty());
437
438        let block = self.inner.blocks[picked as usize].clone();
439        tracing::debug!("[block manager]: Block {picked} is evicted.");
440
441        Some(block)
442    }
443
444    pub fn wait_reclaim(&self) -> BoxFuture<'static, ()> {
445        let mut state = self.inner.state.write().unwrap();
446        if state.reclaiming_blocks.is_empty() {
447            return ready(()).boxed();
448        }
449        let (tx, rx) = oneshot::channel();
450        state.reclaim_waiters.push(tx);
451        async move {
452            let _ = rx.await;
453        }
454        .boxed()
455    }
456}
457
458pub struct ReclaimingBlock {
459    block_manager: BlockManager,
460    block: Block,
461}
462
463impl Deref for ReclaimingBlock {
464    type Target = Block;
465
466    fn deref(&self) -> &Self::Target {
467        &self.block
468    }
469}
470
471impl DerefMut for ReclaimingBlock {
472    fn deref_mut(&mut self) -> &mut Self::Target {
473        &mut self.block
474    }
475}
476
477impl Drop for ReclaimingBlock {
478    fn drop(&mut self) {
479        self.block_manager.on_reclaim_finish(self.block.clone());
480    }
481}