foyer_storage/engine/block/
engine.rs

1// Copyright 2026 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    fmt::Debug,
17    future::Future,
18    marker::PhantomData,
19    sync::{
20        atomic::{AtomicBool, AtomicUsize, Ordering},
21        Arc,
22    },
23    time::Instant,
24};
25
26#[cfg(feature = "tracing")]
27use fastrace::prelude::*;
28use foyer_common::{
29    bits,
30    code::{StorageKey, StorageValue},
31    error::{Error, ErrorKind, Result},
32    metrics::Metrics,
33    properties::{Age, Properties},
34    spawn::Spawner,
35};
36use futures_core::future::BoxFuture;
37use futures_util::{
38    future::{join_all, try_join_all},
39    FutureExt,
40};
41use itertools::Itertools;
42use mea::mpsc::UnboundedReceiver;
43
44use super::{
45    flusher::{Flusher, InvalidStats, Submission},
46    indexer::Indexer,
47    recover::RecoverRunner,
48};
49#[cfg(any(test, feature = "test_utils"))]
50use crate::test_utils::*;
51use crate::{
52    compress::Compression,
53    engine::{
54        block::{
55            eviction::{EvictionPicker, FifoPicker, InvalidRatioPicker},
56            manager::{BlockId, BlockManager},
57            reclaimer::{BlockCleaner, Reclaimer, ReclaimerTrait},
58            serde::{AtomicSequence, EntryHeader},
59            tombstone::{Tombstone, TombstoneLog},
60        },
61        Engine, EngineBuildContext, EngineConfig, Populated,
62    },
63    filter::conditions::IoThrottle,
64    io::{bytes::IoSliceMut, PAGE},
65    keeper::PieceRef,
66    serde::EntryDeserializer,
67    Device, Load, RejectAll, StorageFilter, StorageFilterResult,
68};
69
/// Config for the block-based disk cache engine.
///
/// The block-based disk cache engine is suitable for general cache entries with size from 2K to hundreds of MiBs.
///
/// Each cache entry will be aligned to a multiplier of 4K on disk, hence too small cache entries will lead to heavy
/// internal fragmentation.
///
/// The disk cache evicts cache entries in block unit.
pub struct BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Backing device the engine creates its partitions on.
    device: Arc<dyn Device>,
    // Size of one block (the eviction unit) in bytes; 4K-aligned.
    block_size: usize,
    // Compression algorithm applied to entries on write.
    compression: Compression,
    // Shard count of the indexer; each shard has its own lock.
    indexer_shards: usize,
    // Max concurrent block recoveries on open.
    recover_concurrency: usize,
    // Number of flushers, i.e. max concurrently written blocks.
    flushers: usize,
    // Number of reclaimers, i.e. max concurrently reclaimed blocks.
    reclaimers: usize,
    // Total flush buffer pool size, shared evenly among flushers.
    buffer_pool_size: usize,
    // Per-blob index size in bytes; 4K-aligned.
    blob_index_size: usize,
    // Entries are dropped once the submit queue estimated size exceeds this.
    submit_queue_size_threshold: usize,
    // Reclaimers run only while the clean block count is at or below this.
    clean_block_threshold: usize,
    // Pickers tried in order to choose the block to reclaim.
    eviction_pickers: Vec<Box<dyn EvictionPicker>>,
    // Filter deciding which entries may be inserted.
    admission_filter: StorageFilter,
    // Filter deciding which entries may be reinserted during reclaim.
    reinsertion_filter: StorageFilter,
    // Whether to persist deletions in a tombstone log.
    enable_tombstone_log: bool,
    // Test hook: pauses/resumes flushing.
    #[cfg(any(test, feature = "test_utils"))]
    flush_switch: Switch,
    // Test hook: delays loads until released.
    #[cfg(any(test, feature = "test_utils"))]
    load_holder: Holder,
    marker: PhantomData<(K, V, P)>,
}
105
106impl<K, V, P> Debug for BlockEngineConfig<K, V, P>
107where
108    K: StorageKey,
109    V: StorageValue,
110    P: Properties,
111{
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("BlockEngineConfig")
114            .field("device", &self.device)
115            .field("block_size", &self.block_size)
116            .field("compression", &self.compression)
117            .field("indexer_shards", &self.indexer_shards)
118            .field("recover_concurrency", &self.recover_concurrency)
119            .field("flushers", &self.flushers)
120            .field("reclaimers", &self.reclaimers)
121            .field("buffer_pool_size", &self.buffer_pool_size)
122            .field("blob_index_size", &self.blob_index_size)
123            .field("submit_queue_size_threshold", &self.submit_queue_size_threshold)
124            .field("clean_block_threshold", &self.clean_block_threshold)
125            .field("eviction_pickers", &self.eviction_pickers)
126            .field("admission_filter", &self.admission_filter)
127            .field("reinsertion_filter", &self.reinsertion_filter)
128            .field("enable_tombstone_log", &self.enable_tombstone_log)
129            .finish()
130    }
131}
132
133impl<K, V, P> BlockEngineConfig<K, V, P>
134where
135    K: StorageKey,
136    V: StorageValue,
137    P: Properties,
138{
139    /// Create a new block-based disk cache engine builder with default configurations.
140    pub fn new(device: Arc<dyn Device>) -> Self {
141        Self {
142            device,
143            block_size: 16 * 1024 * 1024, // 16 MiB
144            compression: Compression::default(),
145            indexer_shards: 64,
146            recover_concurrency: 8,
147            flushers: 1,
148            reclaimers: 1,
149            buffer_pool_size: 16 * 1024 * 1024,            // 16 MiB
150            blob_index_size: 4 * 1024,                     // 4 KiB
151            submit_queue_size_threshold: 16 * 1024 * 1024, // 16 MiB
152            clean_block_threshold: 1,
153            eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::<FifoPicker>::default()],
154            admission_filter: StorageFilter::new(),
155            reinsertion_filter: StorageFilter::new().with_condition(RejectAll),
156            enable_tombstone_log: false,
157            #[cfg(any(test, feature = "test_utils"))]
158            flush_switch: Switch::default(),
159            #[cfg(any(test, feature = "test_utils"))]
160            load_holder: Holder::default(),
161            marker: PhantomData,
162        }
163    }
164
165    /// Set the block size for the block-based disk cache engine.
166    ///
167    /// Block is the minimal cache eviction unit for the block-based disk cache,
168    /// its size also limits the max cacheable entry size.
169    ///
170    /// The block size must be 4K-aligned. the given value is not 4K-aligned, it will be automatically aligned up.
171    ///
172    /// Default: `16 MiB`.
173    pub fn with_block_size(mut self, block_size: usize) -> Self {
174        self.block_size = bits::align_up(PAGE, block_size);
175        self
176    }
177
178    /// Set the shard num of the indexer. Each shard has its own lock.
179    ///
180    /// Default: `64`.
181    pub fn with_indexer_shards(mut self, indexer_shards: usize) -> Self {
182        self.indexer_shards = indexer_shards;
183        self
184    }
185
186    /// Set the recover concurrency for the disk cache store.
187    ///
188    /// Default: `8`.
189    pub fn with_recover_concurrency(mut self, recover_concurrency: usize) -> Self {
190        self.recover_concurrency = recover_concurrency;
191        self
192    }
193
194    /// Set the flusher count for the disk cache store.
195    ///
196    /// The flusher count limits how many blocks can be concurrently written.
197    ///
198    /// Default: `1`.
199    pub fn with_flushers(mut self, flushers: usize) -> Self {
200        self.flushers = flushers;
201        self
202    }
203
204    /// Set the admission filter for th disk cache store.
205    ///
206    /// The admission filter is used to pick the entries that can be inserted into the disk cache store.
207    ///
208    /// Default: Admit all.
209    pub fn with_admission_filter(mut self, filter: StorageFilter) -> Self {
210        self.admission_filter = filter;
211        self
212    }
213
214    /// Set the reclaimer count for the disk cache store.
215    ///
216    /// The reclaimer count limits how many blocks can be concurrently reclaimed.
217    ///
218    /// Default: `1`.
219    pub fn with_reclaimers(mut self, reclaimers: usize) -> Self {
220        self.reclaimers = reclaimers;
221        self
222    }
223
224    /// Set the total flush buffer pool size.
225    ///
226    /// Each flusher shares a volume at `threshold / flushers`.
227    ///
228    /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored.
229    ///
230    /// Default: 16 MiB.
231    pub fn with_buffer_pool_size(mut self, buffer_pool_size: usize) -> Self {
232        self.buffer_pool_size = buffer_pool_size;
233        self
234    }
235
236    /// Set the blob index size for each blob.
237    ///
238    /// A larger blob index size can hold more blob entries, but it will also increase the io size of each blob part
239    /// write.
240    ///
241    /// NOTE: The size will be aligned up to a multiplier of 4K.
242    ///
243    /// Default: 4 KiB
244    pub fn with_blob_index_size(mut self, blob_index_size: usize) -> Self {
245        let blob_index_size = bits::align_up(PAGE, blob_index_size);
246        self.blob_index_size = blob_index_size;
247        self
248    }
249
250    /// Set the submit queue size threshold.
251    ///
252    /// If the total entry estimated size in the submit queue exceeds the threshold, the further entries will be
253    /// ignored.
254    ///
255    /// Default: `buffer_pool_size` * 2.
256    pub fn with_submit_queue_size_threshold(mut self, submit_queue_size_threshold: usize) -> Self {
257        self.submit_queue_size_threshold = submit_queue_size_threshold;
258        self
259    }
260
261    /// Set the clean block threshold for the disk cache store.
262    ///
263    /// The reclaimers only work when the clean block count is equal to or lower than the clean block threshold.
264    ///
265    /// Default: the same value as the `reclaimers`.
266    pub fn with_clean_block_threshold(mut self, clean_block_threshold: usize) -> Self {
267        self.clean_block_threshold = clean_block_threshold;
268        self
269    }
270
271    /// Set the eviction pickers for th disk cache store.
272    ///
273    /// The eviction picker is used to pick the block to reclaim.
274    ///
275    /// The eviction pickers are applied in order. If the previous eviction picker doesn't pick any block, the next one
276    /// will be applied.
277    ///
278    /// If no eviction picker picks a block, a block will be picked randomly.
279    ///
280    /// Default: [ invalid ratio picker { threshold = 0.8 }, fifo picker ]
281    pub fn with_eviction_pickers(mut self, eviction_pickers: Vec<Box<dyn EvictionPicker>>) -> Self {
282        self.eviction_pickers = eviction_pickers;
283        self
284    }
285
286    /// Set the reinsertion filter for th disk cache store.
287    ///
288    /// The reinsertion filter is used to pick the entries that can be reinsertion into the disk cache store while
289    /// reclaiming.
290    ///
291    /// Note: Only extremely important entries should be picked. If too many entries are picked, both insertion and
292    /// reinsertion will be stuck.
293    ///
294    /// Default: Reject all.
295    pub fn with_reinsertion_filter(mut self, filter: StorageFilter) -> Self {
296        self.reinsertion_filter = filter;
297        self
298    }
299
300    /// Enable the tombstone log.
301    ///
302    /// For updatable cache, either the tombstone log or [`crate::engine::RecoverMode::None`] must be enabled to prevent
303    /// from the phantom entries after reopen.
304    pub fn with_tombstone_log(mut self, enable: bool) -> Self {
305        self.enable_tombstone_log = enable;
306        self
307    }
308
309    /// Pass the flush holder for test.
310    #[cfg(any(test, feature = "test_utils"))]
311    pub fn with_flush_switch(mut self, flush_switch: Switch) -> Self {
312        self.flush_switch = flush_switch;
313        self
314    }
315
316    /// Pass the load holder for test.
317    #[cfg(any(test, feature = "test_utils"))]
318    pub fn with_load_holder(mut self, load_holder: Holder) -> Self {
319        self.load_holder = load_holder;
320        self
321    }
322
323    /// Build the block-based disk cache engine with the given configurations.
324    pub async fn build(
325        self: Box<Self>,
326        EngineBuildContext {
327            io_engine,
328            metrics,
329            spawner: runtime,
330            recover_mode,
331        }: EngineBuildContext,
332    ) -> Result<Arc<BlockEngine<K, V, P>>> {
333        let device = self.device;
334        let block_size = self.block_size;
335
336        let mut tombstones = vec![];
337
338        let tombstone_log = if self.enable_tombstone_log {
339            // TODO(MrCroxx): The tombstone log support multiples partitions for multiple device support.
340            let mut partitions = vec![];
341
342            let max_entries = device.capacity() / PAGE;
343            let pages = max_entries / TombstoneLog::SLOTS_PER_PAGE
344                + if max_entries % TombstoneLog::SLOTS_PER_PAGE > 0 {
345                    1
346                } else {
347                    0
348                };
349            let partition = device.create_partition(pages * PAGE)?;
350            partitions.push(partition);
351
352            let tombstone_log = TombstoneLog::open(partitions, io_engine.clone(), &mut tombstones).await?;
353            Some(tombstone_log)
354        } else {
355            None
356        };
357
358        let indexer = Indexer::new(self.indexer_shards);
359        let submit_queue_size = Arc::<AtomicUsize>::default();
360
361        #[expect(clippy::type_complexity)]
362        let (flushers, rxs): (Vec<Flusher<K, V, P>>, Vec<UnboundedReceiver<Submission<K, V, P>>>) = (0..self.flushers)
363            .map(|id| Flusher::<K, V, P>::new(id, submit_queue_size.clone(), metrics.clone()))
364            .unzip();
365
366        let reclaimer = Reclaimer::new(
367            indexer.clone(),
368            flushers.clone(),
369            Arc::new(self.reinsertion_filter),
370            self.blob_index_size,
371            device.statistics().clone(),
372            runtime.clone(),
373        );
374        let reclaimer: Arc<dyn ReclaimerTrait> = Arc::new(reclaimer);
375
376        let block_manager = BlockManager::open(
377            device.clone(),
378            io_engine,
379            block_size,
380            self.eviction_pickers,
381            reclaimer,
382            self.reclaimers,
383            self.clean_block_threshold,
384            metrics.clone(),
385            runtime.clone(),
386        )?;
387        let blocks = block_manager.blocks();
388
389        if self.flushers + self.clean_block_threshold > blocks / 2 {
390            tracing::warn!("[block engine]: block-based object disk cache stable blocks count is too small, flusher [{flushers}] + clean block threshold [{clean_block_threshold}] (default = reclaimers) is supposed to be much larger than the block count [{blocks}]",
391                    flushers = self.flushers,
392                    clean_block_threshold = self.clean_block_threshold,
393                );
394        }
395
396        let sequence = AtomicSequence::default();
397
398        RecoverRunner::run(
399            self.recover_concurrency,
400            recover_mode,
401            self.blob_index_size,
402            (0..blocks as BlockId).collect_vec(),
403            &sequence,
404            &indexer,
405            &block_manager,
406            &tombstones,
407            runtime.clone(),
408            metrics.clone(),
409        )
410        .await?;
411
412        let io_buffer_size = self.buffer_pool_size / self.flushers;
413        for (flusher, rx) in flushers.iter().zip(rxs.into_iter()) {
414            flusher.run(
415                rx,
416                block_size,
417                io_buffer_size,
418                self.blob_index_size,
419                self.compression,
420                indexer.clone(),
421                block_manager.clone(),
422                tombstone_log.clone(),
423                metrics.clone(),
424                &runtime,
425                #[cfg(any(test, feature = "test_utils"))]
426                self.flush_switch.clone(),
427            )?;
428        }
429
430        let admission_filter = self.admission_filter.with_condition(IoThrottle);
431
432        let inner = BlockEngineInner {
433            admission_filter,
434            device,
435            indexer,
436            block_manager,
437            flushers,
438            submit_queue_size,
439            submit_queue_size_threshold: self.submit_queue_size_threshold,
440            sequence,
441            _spawner: runtime,
442            active: AtomicBool::new(true),
443            metrics,
444            #[cfg(any(test, feature = "test_utils"))]
445            flush_switch: self.flush_switch,
446            #[cfg(any(test, feature = "test_utils"))]
447            load_holder: self.load_holder,
448        };
449        let inner = Arc::new(inner);
450        let engine = BlockEngine { inner };
451        let engine = Arc::new(engine);
452        Ok(engine)
453    }
454}
455
456impl<K, V, P> EngineConfig<K, V, P> for BlockEngineConfig<K, V, P>
457where
458    K: StorageKey,
459    V: StorageValue,
460    P: Properties,
461{
462    fn build(self: Box<Self>, ctx: EngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn Engine<K, V, P>>>> {
463        async move { self.build(ctx).await.map(|e| e as Arc<dyn Engine<K, V, P>>) }.boxed()
464    }
465}
466
467impl<K, V, P> From<BlockEngineConfig<K, V, P>> for Box<dyn EngineConfig<K, V, P>>
468where
469    K: StorageKey,
470    V: StorageValue,
471    P: Properties,
472{
473    fn from(builder: BlockEngineConfig<K, V, P>) -> Self {
474        builder.boxed()
475    }
476}
477
/// Block-based disk cache engine.
pub struct BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Shared state; `BlockEngine` clones are cheap handles to the same inner.
    inner: Arc<BlockEngineInner<K, V, P>>,
}
487
488impl<K, V, P> Debug for BlockEngine<K, V, P>
489where
490    K: StorageKey,
491    V: StorageValue,
492    P: Properties,
493{
494    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495        f.debug_struct("GenericStore").finish()
496    }
497}
498
// Shared state behind `BlockEngine`'s `Arc` handle.
struct BlockEngineInner<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Admission filter (user filter + io throttle condition appended at build time).
    admission_filter: StorageFilter,

    // Backing device; also the source of io statistics for filtering.
    device: Arc<dyn Device>,

    // Hash -> on-disk address index.
    indexer: Indexer,
    block_manager: BlockManager,

    // Entries are routed to a flusher by `hash % flushers.len()`.
    flushers: Vec<Flusher<K, V, P>>,

    // Estimated bytes queued across all flushers; enqueues are dropped above the threshold.
    submit_queue_size: Arc<AtomicUsize>,
    submit_queue_size_threshold: usize,

    // Monotonic sequence for entries and tombstones.
    sequence: AtomicSequence,

    _spawner: Spawner,

    // Cleared on close; guards enqueue/delete/destroy.
    active: AtomicBool,

    metrics: Arc<Metrics>,

    // Test hook: pauses/resumes flushing.
    #[cfg(any(test, feature = "test_utils"))]
    flush_switch: Switch,

    // Test hook: delays loads until released.
    #[cfg(any(test, feature = "test_utils"))]
    load_holder: Holder,
}
531
532impl<K, V, P> Clone for BlockEngine<K, V, P>
533where
534    K: StorageKey,
535    V: StorageValue,
536    P: Properties,
537{
538    fn clone(&self) -> Self {
539        Self {
540            inner: self.inner.clone(),
541        }
542    }
543}
544
545impl<K, V, P> BlockEngine<K, V, P>
546where
547    K: StorageKey,
548    V: StorageValue,
549    P: Properties,
550{
551    fn wait(&self) -> impl Future<Output = ()> + Send + 'static {
552        let flushers = self.inner.flushers.clone();
553        let block_manager = self.inner.block_manager.clone();
554        async move {
555            join_all(flushers.iter().map(|flusher| flusher.wait())).await;
556            block_manager.wait_reclaim().await;
557        }
558    }
559
560    fn close(&self) -> BoxFuture<'static, Result<()>> {
561        let this = self.clone();
562        async move {
563            this.inner.active.store(false, Ordering::Relaxed);
564            this.wait().await;
565            Ok(())
566        }
567        .boxed()
568    }
569
570    #[cfg_attr(feature = "tracing", trace(name = "foyer::storage::engine::block::generic::enqueue"))]
571    fn enqueue(&self, piece: PieceRef<K, V, P>, estimated_size: usize) {
572        if !self.inner.active.load(Ordering::Relaxed) {
573            tracing::warn!("cannot enqueue new entry after closed");
574            return;
575        }
576
577        tracing::trace!(
578            hash = piece.hash(),
579            age = ?piece.properties().age().unwrap_or_default(),
580            "[block engine]: enqueue"
581        );
582        match piece.properties().age().unwrap_or_default() {
583            Age::Fresh | Age::Old => {}
584            Age::Young => {
585                // skip write block engine if the entry is still young
586                self.inner.metrics.storage_block_engine_enqueue_skip.increase(1);
587                return;
588            }
589        }
590
591        if self.inner.submit_queue_size.load(Ordering::Relaxed) > self.inner.submit_queue_size_threshold {
592            self.inner.metrics.storage_queue_channel_overflow.increase(1);
593            return;
594        }
595
596        let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
597
598        self.inner.flushers[piece.hash() as usize % self.inner.flushers.len()].submit(Submission::CacheEntry {
599            piece,
600            estimated_size,
601            sequence,
602        });
603    }
604
605    fn load(&self, hash: u64) -> impl Future<Output = Result<Load<K, V, P>>> + Send + 'static {
606        tracing::trace!(hash, "[block engine]: load");
607
608        #[cfg(any(test, feature = "test_utils"))]
609        let load_holer = self.inner.load_holder.wait();
610
611        let indexer = self.inner.indexer.clone();
612        let metrics = self.inner.metrics.clone();
613        let block_manager = self.inner.block_manager.clone();
614
615        let load = async move {
616            #[cfg(any(test, feature = "test_utils"))]
617            load_holer.await;
618
619            let addr = match indexer.get(hash) {
620                Some(addr) => addr,
621                None => {
622                    return Ok(Load::Miss);
623                }
624            };
625
626            tracing::trace!(hash, ?addr, "[block engine]: load");
627
628            let block = block_manager.block(addr.block);
629            if block.partition().statistics().is_read_throttled() {
630                return Ok(Load::Throttled);
631            }
632
633            let buf = IoSliceMut::new(bits::align_up(PAGE, addr.len as _));
634            let (buf, res) = block.read(Box::new(buf), addr.offset as _).await;
635            match res {
636                Ok(_) => {}
637                Err(e) => {
638                    tracing::error!(hash, ?addr, ?e, "[block engine load]: load error");
639                    return Err(e);
640                }
641            }
642
643            let header = match EntryHeader::read(&buf[..EntryHeader::serialized_len()]) {
644                Ok(header) => header,
645                Err(e) => {
646                    return match e.kind() {
647                        ErrorKind::Parse
648                        | ErrorKind::MagicMismatch
649                        | ErrorKind::ChecksumMismatch
650                        | ErrorKind::OutOfRange => {
651                            tracing::warn!(
652                                hash,
653                                ?addr,
654                                ?e,
655                                "[block engine load]: deserialize read buffer raise error, remove this entry and skip"
656                            );
657                            indexer.remove(hash);
658                            Ok(Load::Miss)
659                        }
660                        _ => {
661                            tracing::error!(hash, ?addr, ?e, "[block engine load]: load error");
662                            Err(e)
663                        }
664                    }
665                }
666            };
667
668            let (key, value) = {
669                let now = Instant::now();
670                let res = match EntryDeserializer::deserialize::<K, V>(
671                    &buf[EntryHeader::serialized_len()..],
672                    header.key_len as _,
673                    header.value_len as _,
674                    header.compression,
675                    Some(header.checksum),
676                ) {
677                    Ok(res) => res,
678                    Err(e) => {
679                        return match e.kind() {
680                            ErrorKind::MagicMismatch | ErrorKind::ChecksumMismatch | ErrorKind::OutOfRange => {
681                                tracing::warn!(
682                                hash,
683                                ?addr,
684                                ?header,
685                                ?e,
686                                "[block engine load]: deserialize read buffer raise error, remove this entry and skip"
687                            );
688                                indexer.remove(hash);
689                                Ok(Load::Miss)
690                            }
691                            _ => {
692                                tracing::error!(hash, ?addr, ?header, ?e, "[block engine load]: load error");
693                                Err(e)
694                            }
695                        }
696                    }
697                };
698                metrics
699                    .storage_entry_deserialize_duration
700                    .record(now.elapsed().as_secs_f64());
701                res
702            };
703
704            let age = match block.statistics().probation.load(Ordering::Relaxed) {
705                true => Age::Old,
706                false => Age::Young,
707            };
708
709            Ok(Load::Entry {
710                key,
711                value,
712                populated: Populated { age },
713            })
714        };
715        #[cfg(feature = "tracing")]
716        let load = load.in_span(Span::enter_with_local_parent(
717            "foyer::storage::engine::block::generic::load",
718        ));
719        load
720    }
721
722    fn delete(&self, hash: u64) {
723        if !self.inner.active.load(Ordering::Relaxed) {
724            tracing::warn!("cannot delete entry after closed");
725            return;
726        }
727
728        let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
729        let stats = self
730            .inner
731            .indexer
732            .insert_tombstone(hash, sequence)
733            .map(|addr| InvalidStats {
734                block: addr.block,
735                size: bits::align_up(PAGE, addr.len as usize),
736            });
737
738        let this = self.clone();
739
740        this.inner.flushers[hash as usize % this.inner.flushers.len()].submit(Submission::Tombstone {
741            tombstone: Tombstone { hash, sequence },
742            stats,
743        });
744    }
745
746    fn may_contains(&self, hash: u64) -> bool {
747        self.inner.indexer.get(hash).is_some()
748    }
749
750    fn destroy(&self) -> BoxFuture<'static, Result<()>> {
751        let this = self.clone();
752        async move {
753            if !this.inner.active.load(Ordering::Relaxed) {
754                return Err(Error::new(ErrorKind::Closed, "cannot delete entry after closed"));
755            }
756
757            // Write a tombstone to clear tombstone log by increase the max sequence.
758            let sequence = this.inner.sequence.fetch_add(1, Ordering::Relaxed);
759
760            this.inner.flushers[0].submit(Submission::Tombstone {
761                tombstone: Tombstone { hash: 0, sequence },
762                stats: None,
763            });
764            this.wait().await;
765
766            // Clear indices.
767            //
768            // This step must perform after the latest writer finished,
769            // otherwise the indices of the latest batch cannot be cleared.
770            this.inner.indexer.clear();
771
772            // Clean blocks.
773            try_join_all((0..this.inner.block_manager.blocks() as BlockId).map(|id| {
774                let block = this.inner.block_manager.block(id).clone();
775                async move {
776                    let res = BlockCleaner::clean(&block).await;
777                    block.statistics().reset();
778                    res
779                }
780            }))
781            .await?;
782
783            Ok(())
784        }
785        .boxed()
786    }
787
788    #[cfg(any(test, feature = "test_utils"))]
789    pub fn hold_flush(&self) {
790        self.inner.flush_switch.on();
791    }
792
793    #[cfg(any(test, feature = "test_utils"))]
794    pub fn unhold_flush(&self) {
795        self.inner.flush_switch.off();
796    }
797}
798
impl<K, V, P> Engine<K, V, P> for BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Thin delegation layer: each trait method forwards to the inherent method of the
    // same name (inherent methods take precedence over trait methods in resolution,
    // so these calls do not recurse).
    fn device(&self) -> &Arc<dyn Device> {
        &self.inner.device
    }

    // Apply the admission filter against the device's io statistics.
    fn filter(&self, hash: u64, estimated_size: usize) -> StorageFilterResult {
        self.inner
            .admission_filter
            .filter(self.inner.device.statistics(), hash, estimated_size)
    }

    fn enqueue(&self, piece: PieceRef<K, V, P>, estimated_size: usize) {
        self.enqueue(piece, estimated_size);
    }

    fn load(&self, hash: u64) -> BoxFuture<'static, Result<Load<K, V, P>>> {
        // TODO(MrCroxx): refactor this.
        self.load(hash).boxed()
    }

    fn delete(&self, hash: u64) {
        self.delete(hash);
    }

    fn may_contains(&self, hash: u64) -> bool {
        self.may_contains(hash)
    }

    fn destroy(&self) -> BoxFuture<'static, Result<()>> {
        self.destroy()
    }

    fn wait(&self) -> BoxFuture<'static, ()> {
        // TODO(MrCroxx): refactor this.
        self.wait().boxed()
    }

    fn close(&self) -> BoxFuture<'static, Result<()>> {
        self.close()
    }
}
845
846#[cfg(test)]
847mod tests {
848
849    use std::{fs::File, path::Path};
850
851    use bytesize::ByteSize;
852    use foyer_common::hasher::ModHasher;
853    use foyer_memory::{Cache, CacheBuilder, CacheEntry, FifoConfig, TestProperties};
854    use itertools::Itertools;
855
856    use super::*;
857    use crate::{
858        engine::RecoverMode,
859        io::{
860            device::{combined::CombinedDeviceBuilder, fs::FsDeviceBuilder, DeviceBuilder},
861            engine::{IoEngine, IoEngineBuildContext, IoEngineConfig},
862        },
863        serde::EntrySerializer,
864        test_utils::Biased,
865        PsyncIoEngineConfig, RejectAll,
866    };
867
868    const KB: usize = 1024;
869
870    fn cache_for_test() -> Cache<u64, Vec<u8>, ModHasher, TestProperties> {
871        CacheBuilder::new(10)
872            .with_shards(1)
873            .with_eviction_config(FifoConfig::default())
874            .with_hash_builder(ModHasher::default())
875            .build()
876    }
877
878    async fn io_engine_for_test(spawner: Spawner) -> Arc<dyn IoEngine> {
879        // TODO(MrCroxx): Test with other io engines.
880        PsyncIoEngineConfig::new()
881            .boxed()
882            .build(IoEngineBuildContext { spawner })
883            .await
884            .unwrap()
885    }
886
887    /// 4 files, fifo eviction, 16 KiB block, 64 KiB capacity.
888    async fn engine_for_test(dir: impl AsRef<Path>) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
889        store_for_test_with_reinsertion_filter(dir, StorageFilter::new().with_condition(RejectAll)).await
890    }
891
892    async fn store_for_test_with_reinsertion_filter(
893        dir: impl AsRef<Path>,
894        reinsertion_filter: StorageFilter,
895    ) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
896        let device = FsDeviceBuilder::new(dir)
897            .with_capacity(ByteSize::kib(64).as_u64() as _)
898            .build()
899            .unwrap();
900        let spawner = Spawner::current();
901        let io_engine = io_engine_for_test(spawner.clone()).await;
902        let metrics = Arc::new(Metrics::noop());
903        let builder = BlockEngineConfig {
904            device,
905            block_size: 16 * 1024,
906            compression: Compression::None,
907            indexer_shards: 4,
908            recover_concurrency: 2,
909            flushers: 1,
910            reclaimers: 1,
911            clean_block_threshold: 1,
912            admission_filter: StorageFilter::new(),
913            eviction_pickers: vec![Box::<FifoPicker>::default()],
914            reinsertion_filter,
915            enable_tombstone_log: false,
916            buffer_pool_size: 16 * 1024 * 1024,
917            blob_index_size: 4 * 1024,
918            submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
919            flush_switch: Switch::default(),
920            load_holder: Holder::default(),
921            marker: PhantomData,
922        };
923
924        let builder = Box::new(builder);
925        builder
926            .build(EngineBuildContext {
927                io_engine,
928                metrics,
929                spawner,
930                recover_mode: RecoverMode::Strict,
931            })
932            .await
933            .unwrap()
934    }
935
936    async fn store_for_test_with_tombstone_log(
937        dir: impl AsRef<Path>,
938    ) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
939        let device = FsDeviceBuilder::new(dir)
940            .with_capacity(ByteSize::kib(64).as_u64() as usize + ByteSize::kib(4).as_u64() as usize)
941            .build()
942            .unwrap();
943        let spawner = Spawner::current();
944        let io_engine = io_engine_for_test(spawner.clone()).await;
945        let metrics = Arc::new(Metrics::noop());
946        let builder = BlockEngineConfig {
947            device,
948            block_size: 16 * 1024,
949            compression: Compression::None,
950            indexer_shards: 4,
951            recover_concurrency: 2,
952            flushers: 1,
953            reclaimers: 1,
954            clean_block_threshold: 1,
955            eviction_pickers: vec![Box::<FifoPicker>::default()],
956            admission_filter: StorageFilter::new(),
957            reinsertion_filter: StorageFilter::new().with_condition(RejectAll),
958            enable_tombstone_log: true,
959            buffer_pool_size: 16 * 1024 * 1024,
960            blob_index_size: 4 * 1024,
961            submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
962            flush_switch: Switch::default(),
963            load_holder: Holder::default(),
964            marker: PhantomData,
965        };
966        let builder = Box::new(builder);
967        builder
968            .build(EngineBuildContext {
969                io_engine,
970                metrics,
971                spawner,
972                recover_mode: RecoverMode::Strict,
973            })
974            .await
975            .unwrap()
976    }
977
978    fn enqueue(
979        store: &BlockEngine<u64, Vec<u8>, TestProperties>,
980        entry: CacheEntry<u64, Vec<u8>, ModHasher, TestProperties>,
981    ) {
982        let estimated_size = EntrySerializer::estimated_size(entry.key(), entry.value());
983        store.enqueue(entry.piece().into(), estimated_size);
984    }
985
    /// End-to-end smoke test: fill the 4 x 16 KiB blocks batch by batch,
    /// verifying loads after each flush, observe the oldest block being evicted
    /// once the device is full, and finally reopen the store to check recovery.
    /// The `[ [..], [..], [..], [..] ]` comments show the expected entry layout
    /// across the four blocks after each step.
    #[test_log::test(tokio::test)]
    async fn test_store_enqueue_lookup_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = engine_for_test(dir.path()).await;

        // Hold flushing so e1 (7 KiB) and e2 (3 KiB) land in the same block.
        // [ [e1, e2], [], [], [] ]
        store.hold_flush();
        let e1 = memory.insert(1, vec![1; 7 * KB]);
        let e2 = memory.insert(2, vec![2; 3 * KB]);
        enqueue(&store, e1.clone());
        enqueue(&store, e2);
        store.unhold_flush();
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));

        // Second batch fills the next block; earlier entries must stay readable.
        // [ [e1, e2], [e3, e4], [], [] ]
        store.hold_flush();
        let e3 = memory.insert(3, vec![3; 7 * KB]);
        let e4 = memory.insert(4, vec![4; 2 * KB]);
        enqueue(&store, e3);
        enqueue(&store, e4);
        store.unhold_flush();
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4, (4, vec![4; 2 * KB]));

        // e5 (11 KiB) occupies the third block on its own.
        // [ [e1, e2], [e3, e4], [e5], [] ]
        let e5 = memory.insert(5, vec![5; 11 * KB]);
        enqueue(&store, e5);
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4, (4, vec![4; 2 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));

        // Filling the last block evicts the first one (FIFO picker), dropping e1
        // and e2. Key 4 is overwritten with a different payload (`!4` bytes) so
        // the update is observable below.
        // [ [], [e3, e4], [e5], [e6, e4*] ]
        store.hold_flush();
        let e6 = memory.insert(6, vec![6; 7 * KB]);
        let e4v2 = memory.insert(4, vec![!4; 3 * KB]);
        enqueue(&store, e6);
        enqueue(&store, e4v2);
        store.unhold_flush();
        store.wait().await;

        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
        assert!(store.load(memory.hash(&2)).await.unwrap().kv().is_none());
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4v2 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4v2, (4, vec![!4; 3 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));
        let r6 = store.load(memory.hash(&6)).await.unwrap().kv().unwrap();
        assert_eq!(r6, (6, vec![6; 7 * KB]));

        // Enqueue after close must not take effect: the recovery asserts below
        // expect e1 to remain absent.
        store.close().await.unwrap();
        enqueue(&store, e1);
        store.wait().await;

        drop(store);

        // Reopen from the same directory: recovery must restore exactly the
        // entries that survived eviction.
        let store = engine_for_test(dir.path()).await;

        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
        assert!(store.load(memory.hash(&2)).await.unwrap().kv().is_none());
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4v2 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4v2, (4, vec![!4; 3 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));
        let r6 = store.load(memory.hash(&6)).await.unwrap().kv().unwrap();
        assert_eq!(r6, (6, vec![6; 7 * KB]));
    }
1080
    /// Deletes must be persisted via the tombstone log: after close + reopen the
    /// deleted key stays gone, and re-inserting that key supersedes the
    /// tombstone and survives another restart.
    #[test_log::test(tokio::test)]
    async fn test_store_delete_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        let es = (0..10).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();

        // Fill the first three blocks with entries 0..9 (3 KiB each, 3 per block).
        // [[0, 1, 2], [3, 4, 5], [6, 7, 8], []]
        for e in es.iter().take(9) {
            enqueue(&store, e.clone());
        }
        store.wait().await;

        for i in 0..9 {
            assert_eq!(
                store.load(memory.hash(&i)).await.unwrap().kv(),
                Some((i, vec![i as u8; 3 * KB]))
            );
        }

        // Delete key 3 and check the delete takes effect immediately.
        store.delete(memory.hash(&3));
        store.wait().await;
        assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);

        store.close().await.unwrap();
        drop(store);

        // Reopen: everything but the deleted key must be recovered.
        let store = store_for_test_with_tombstone_log(dir.path()).await;
        for i in 0..9 {
            if i != 3 {
                assert_eq!(
                    store.load(memory.hash(&i)).await.unwrap().kv(),
                    Some((i, vec![i as u8; 3 * KB]))
                );
            } else {
                assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);
            }
        }

        // Re-insert the deleted key; it must be loadable again ...
        enqueue(&store, es[3].clone());
        store.wait().await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );

        store.close().await.unwrap();
        drop(store);

        // ... and must still be loadable after one more restart.
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
    }
1139
    /// `destroy` must wipe all persisted entries: after close + reopen nothing
    /// loads, while the store itself stays usable — a fresh insert afterwards
    /// persists across yet another restart.
    #[test_log::test(tokio::test)]
    async fn test_store_destroy_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        let es = (0..10).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();

        // Fill the first three blocks with entries 0..9 in one held batch.
        // [[0, 1, 2], [3, 4, 5], [6, 7, 8], []]
        store.hold_flush();
        for e in es.iter().take(9) {
            enqueue(&store, e.clone());
        }
        store.unhold_flush();
        store.wait().await;

        for i in 0..9 {
            assert_eq!(
                store.load(memory.hash(&i)).await.unwrap().kv(),
                Some((i, vec![i as u8; 3 * KB]))
            );
        }

        // Delete one key first so destroy is exercised with a tombstone present.
        store.delete(memory.hash(&3));
        store.wait().await;
        assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);

        store.destroy().await.unwrap();

        store.close().await.unwrap();
        drop(store);

        // Reopen: destroy must have removed every entry.
        let store = store_for_test_with_tombstone_log(dir.path()).await;
        for i in 0..9 {
            assert_eq!(store.load(memory.hash(&i)).await.unwrap().kv(), None);
        }

        // The destroyed store must accept new inserts ...
        enqueue(&store, es[3].clone());
        store.wait().await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );

        store.close().await.unwrap();
        drop(store);

        // ... and the new entry must survive another restart.
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
    }
1195
1196    // FIXME(MrCroxx): Move the admission test to store level.
1197    // #[test_log::test(tokio::test)]
1198    // async fn test_store_admission() {
1199    //     let dir = tempfile::tempdir().unwrap();
1200
1201    //     let memory = cache_for_test();
1202    //     let store = store_for_test_with_admission_picker(&memory, dir.path(),
1203    // Arc::new(BiasedPicker::new([1]))).await;
1204
1205    //     let e1 = memory.insert(1, vec![1; 7 * KB]);
1206    //     let e2 = memory.insert(2, vec![2; 7 * KB]);
1207
1208    //     assert!(enqueue(&store, e1.clone(),).await.unwrap());
1209    //     assert!(!enqueue(&store, e2,).await.unwrap());
1210
1211    //     let r1 = store.load(&1).await.unwrap().unwrap();
1212    //     assert_eq!(r1, (1, vec![1; 7 * KB]));
1213    //     assert!(store.load(&2).await.unwrap().is_none());
1214    // }
1215
    /// When a block is reclaimed, entries admitted by the reinsertion filter are
    /// written back instead of dropped. The `Biased` condition here presumably
    /// admits only the listed values (the odd keys 1..=19 — verify against
    /// `test_utils::Biased`); the asserts below show odd keys surviving
    /// reclamation while even keys are evicted. The `[[..], ..]` comments show
    /// the expected block layout after each step.
    #[test_log::test(tokio::test)]
    async fn test_store_reinsertion() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test_with_reinsertion_filter(
            dir.path(),
            StorageFilter::new().with_condition(Biased::new(vec![1, 3, 5, 7, 9, 11, 13, 15, 17, 19])),
        )
        .await;

        let es = (0..15).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();

        // Fill the first three blocks, flushing after each entry.
        // [[(0), (1), (2)], [(3), (4), (5)], [(6), (7), (8)], []]
        for e in es.iter().take(9).cloned() {
            enqueue(&store, e);
            store.wait().await;
        }

        for i in 0..9 {
            let r = store.load(memory.hash(&i)).await.unwrap().kv().unwrap();
            assert_eq!(r, (i, vec![i as u8; 3 * KB]));
        }

        // Filling the last block reclaims the first one: only odd key 1 is
        // reinserted (into the tail block); 0 and 2 are dropped.
        // [[], [(3), (4), (5)], [(6), (7), (8)], [(9), (10), (1)]]
        enqueue(&store, es[9].clone());
        enqueue(&store, es[10].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..11 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                Some((4, vec![4; 3 * KB])),
                Some((5, vec![5; 3 * KB])),
                Some((6, vec![6; 3 * KB])),
                Some((7, vec![7; 3 * KB])),
                Some((8, vec![8; 3 * KB])),
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
            ]
        );

        // Reclaiming the second block keeps 3 and 5 (odd) alongside the new 11;
        // 4 (even) is dropped.
        // [[(11), (3), (5)], [], [(6), (7), (8)], [(9), (10), (1)]]
        enqueue(&store, es[11].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..12 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                None,
                Some((5, vec![5; 3 * KB])),
                Some((6, vec![6; 3 * KB])),
                Some((7, vec![7; 3 * KB])),
                Some((8, vec![8; 3 * KB])),
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
                Some((11, vec![11; 3 * KB])),
            ]
        );

        // Delete 7 first so it cannot be reinserted when its block is reclaimed;
        // 6 and 8 (even) are dropped as well, leaving the third block empty.
        // [[(11), (3), (5)], [(12), (13), (14)], [], [(9), (10), (1)]]
        store.delete(memory.hash(&7));
        store.wait().await;
        enqueue(&store, es[12].clone());
        store.wait().await;
        enqueue(&store, es[13].clone());
        store.wait().await;
        enqueue(&store, es[14].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..15 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                None,
                Some((5, vec![5; 3 * KB])),
                None,
                None,
                None,
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
                Some((11, vec![11; 3 * KB])),
                Some((12, vec![12; 3 * KB])),
                Some((13, vec![13; 3 * KB])),
                Some((14, vec![14; 3 * KB])),
            ]
        );
    }
1324
    /// Corrupting the on-disk bytes of a stored entry must make `load` return
    /// no value (the corruption is detected) rather than surface garbage data.
    #[test_log::test(tokio::test)]
    async fn test_store_magic_checksum_mismatch() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = engine_for_test(dir.path()).await;

        // write entry 1
        let e1 = memory.insert(1, vec![1; 7 * KB]);
        enqueue(&store, e1);
        store.wait().await;

        // check entry 1
        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));

        // corrupt entry and header: overwrite 42 bytes at offset 5 KiB in every
        // regular file of the cache directory, which hits the 7 KiB entry in the
        // first block regardless of which file it landed in.
        for entry in std::fs::read_dir(dir.path()).unwrap() {
            let entry = entry.unwrap();
            if !entry.metadata().unwrap().is_file() {
                continue;
            }

            let file = File::options().write(true).open(entry.path()).unwrap();
            // Positional writes are platform-specific: `write_all_at` on unix,
            // `seek_write` on windows.
            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::FileExt;
                file.write_all_at(&[b'x'; 42], 5 * 1024).unwrap();
            }
            #[cfg(target_family = "windows")]
            {
                use std::os::windows::fs::FileExt;
                file.seek_write(&[b'x'; 42], 5 * 1024).unwrap();
            }
        }

        // The corrupted entry must load as absent, not as bad data.
        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
    }
1363
1364    #[test_log::test(tokio::test)]
1365    async fn test_aggregated_device() {
1366        let dir = tempfile::tempdir().unwrap();
1367
1368        const KB: usize = 1024;
1369        const MB: usize = 1024 * 1024;
1370
1371        let spawner = Spawner::current();
1372        let io_engine = io_engine_for_test(spawner.clone()).await;
1373
1374        let d1 = FsDeviceBuilder::new(dir.path().join("dev1"))
1375            .with_capacity(MB)
1376            .build()
1377            .unwrap();
1378        let d2 = FsDeviceBuilder::new(dir.path().join("dev2"))
1379            .with_capacity(2 * MB)
1380            .build()
1381            .unwrap();
1382        let d3 = FsDeviceBuilder::new(dir.path().join("dev3"))
1383            .with_capacity(4 * MB)
1384            .build()
1385            .unwrap();
1386        let device = CombinedDeviceBuilder::new()
1387            .with_device(d1)
1388            .with_device(d2)
1389            .with_device(d3)
1390            .build()
1391            .unwrap();
1392        let engine = BlockEngineConfig::<u64, Vec<u8>, TestProperties>::new(device)
1393            .with_block_size(64 * KB)
1394            .boxed()
1395            .build(EngineBuildContext {
1396                io_engine,
1397                metrics: Arc::new(Metrics::noop()),
1398                spawner,
1399                recover_mode: RecoverMode::None,
1400            })
1401            .await
1402            .unwrap();
1403        assert_eq!(engine.inner.block_manager.blocks(), (1 + 2 + 4) * MB / (64 * KB));
1404    }
1405}