1use std::{
16 fmt::Debug,
17 future::Future,
18 marker::PhantomData,
19 sync::{
20 atomic::{AtomicBool, AtomicUsize, Ordering},
21 Arc,
22 },
23 time::Instant,
24};
25
26#[cfg(feature = "tracing")]
27use fastrace::prelude::*;
28use foyer_common::{
29 bits,
30 code::{StorageKey, StorageValue},
31 error::{Error, ErrorKind, Result},
32 metrics::Metrics,
33 properties::{Age, Properties},
34 spawn::Spawner,
35};
36use futures_core::future::BoxFuture;
37use futures_util::{
38 future::{join_all, try_join_all},
39 FutureExt,
40};
41use itertools::Itertools;
42use mea::mpsc::UnboundedReceiver;
43
44use super::{
45 flusher::{Flusher, InvalidStats, Submission},
46 indexer::Indexer,
47 recover::RecoverRunner,
48};
49#[cfg(any(test, feature = "test_utils"))]
50use crate::test_utils::*;
51use crate::{
52 compress::Compression,
53 engine::{
54 block::{
55 eviction::{EvictionPicker, FifoPicker, InvalidRatioPicker},
56 manager::{BlockId, BlockManager},
57 reclaimer::{BlockCleaner, Reclaimer, ReclaimerTrait},
58 serde::{AtomicSequence, EntryHeader},
59 tombstone::{Tombstone, TombstoneLog},
60 },
61 Engine, EngineBuildContext, EngineConfig, Populated,
62 },
63 filter::conditions::IoThrottle,
64 io::{bytes::IoSliceMut, PAGE},
65 keeper::PieceRef,
66 serde::EntryDeserializer,
67 Device, Load, RejectAll, StorageFilter, StorageFilterResult,
68};
69
/// Configuration (builder) for the block-based disk cache engine.
///
/// Construct with [`BlockEngineConfig::new`], tune via the `with_*` methods,
/// then call `build` to produce a running [`BlockEngine`].
pub struct BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Backing device the engine stores its blocks on.
    device: Arc<dyn Device>,
    /// Size of each block in bytes (aligned up to `PAGE` by `with_block_size`).
    block_size: usize,
    /// Compression applied to serialized entries.
    compression: Compression,
    /// Shard count of the in-memory indexer.
    indexer_shards: usize,
    /// Concurrency used when recovering blocks at startup.
    recover_concurrency: usize,
    /// Number of flusher tasks writing entries to disk.
    flushers: usize,
    /// Number of reclaimer tasks evicting blocks.
    reclaimers: usize,
    /// Total write buffer pool size; split evenly across flushers in `build`.
    buffer_pool_size: usize,
    /// Size of a blob index region in bytes (aligned up to `PAGE`).
    blob_index_size: usize,
    /// Backpressure bound: enqueue is dropped when the submit queue exceeds this.
    submit_queue_size_threshold: usize,
    /// Number of clean blocks to keep ready for flushers.
    clean_block_threshold: usize,
    /// Pickers deciding which block to evict next.
    eviction_pickers: Vec<Box<dyn EvictionPicker>>,
    /// Filter applied on enqueue (admission).
    admission_filter: StorageFilter,
    /// Filter applied when reclaiming decides whether to re-insert an entry.
    reinsertion_filter: StorageFilter,
    /// Whether to persist deletions in a tombstone log for recovery.
    enable_tombstone_log: bool,
    // Test-only hook to pause/resume flushing.
    #[cfg(any(test, feature = "test_utils"))]
    flush_switch: Switch,
    // Test-only hook to delay loads.
    #[cfg(any(test, feature = "test_utils"))]
    load_holder: Holder,
    // Carries the K/V/P type parameters without storing values of those types.
    marker: PhantomData<(K, V, P)>,
}
105
impl<K, V, P> Debug for BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Manual impl: `marker` and the test-only `flush_switch`/`load_holder`
    // fields are intentionally omitted from the debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlockEngineConfig")
            .field("device", &self.device)
            .field("block_size", &self.block_size)
            .field("compression", &self.compression)
            .field("indexer_shards", &self.indexer_shards)
            .field("recover_concurrency", &self.recover_concurrency)
            .field("flushers", &self.flushers)
            .field("reclaimers", &self.reclaimers)
            .field("buffer_pool_size", &self.buffer_pool_size)
            .field("blob_index_size", &self.blob_index_size)
            .field("submit_queue_size_threshold", &self.submit_queue_size_threshold)
            .field("clean_block_threshold", &self.clean_block_threshold)
            .field("eviction_pickers", &self.eviction_pickers)
            .field("admission_filter", &self.admission_filter)
            .field("reinsertion_filter", &self.reinsertion_filter)
            .field("enable_tombstone_log", &self.enable_tombstone_log)
            .finish()
    }
}
132
133impl<K, V, P> BlockEngineConfig<K, V, P>
134where
135 K: StorageKey,
136 V: StorageValue,
137 P: Properties,
138{
139 pub fn new(device: Arc<dyn Device>) -> Self {
141 Self {
142 device,
143 block_size: 16 * 1024 * 1024, compression: Compression::default(),
145 indexer_shards: 64,
146 recover_concurrency: 8,
147 flushers: 1,
148 reclaimers: 1,
149 buffer_pool_size: 16 * 1024 * 1024, blob_index_size: 4 * 1024, submit_queue_size_threshold: 16 * 1024 * 1024, clean_block_threshold: 1,
153 eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::<FifoPicker>::default()],
154 admission_filter: StorageFilter::new(),
155 reinsertion_filter: StorageFilter::new().with_condition(RejectAll),
156 enable_tombstone_log: false,
157 #[cfg(any(test, feature = "test_utils"))]
158 flush_switch: Switch::default(),
159 #[cfg(any(test, feature = "test_utils"))]
160 load_holder: Holder::default(),
161 marker: PhantomData,
162 }
163 }
164
165 pub fn with_block_size(mut self, block_size: usize) -> Self {
174 self.block_size = bits::align_up(PAGE, block_size);
175 self
176 }
177
178 pub fn with_indexer_shards(mut self, indexer_shards: usize) -> Self {
182 self.indexer_shards = indexer_shards;
183 self
184 }
185
186 pub fn with_recover_concurrency(mut self, recover_concurrency: usize) -> Self {
190 self.recover_concurrency = recover_concurrency;
191 self
192 }
193
194 pub fn with_flushers(mut self, flushers: usize) -> Self {
200 self.flushers = flushers;
201 self
202 }
203
204 pub fn with_admission_filter(mut self, filter: StorageFilter) -> Self {
210 self.admission_filter = filter;
211 self
212 }
213
214 pub fn with_reclaimers(mut self, reclaimers: usize) -> Self {
220 self.reclaimers = reclaimers;
221 self
222 }
223
224 pub fn with_buffer_pool_size(mut self, buffer_pool_size: usize) -> Self {
232 self.buffer_pool_size = buffer_pool_size;
233 self
234 }
235
236 pub fn with_blob_index_size(mut self, blob_index_size: usize) -> Self {
245 let blob_index_size = bits::align_up(PAGE, blob_index_size);
246 self.blob_index_size = blob_index_size;
247 self
248 }
249
250 pub fn with_submit_queue_size_threshold(mut self, submit_queue_size_threshold: usize) -> Self {
257 self.submit_queue_size_threshold = submit_queue_size_threshold;
258 self
259 }
260
261 pub fn with_clean_block_threshold(mut self, clean_block_threshold: usize) -> Self {
267 self.clean_block_threshold = clean_block_threshold;
268 self
269 }
270
271 pub fn with_eviction_pickers(mut self, eviction_pickers: Vec<Box<dyn EvictionPicker>>) -> Self {
282 self.eviction_pickers = eviction_pickers;
283 self
284 }
285
286 pub fn with_reinsertion_filter(mut self, filter: StorageFilter) -> Self {
296 self.reinsertion_filter = filter;
297 self
298 }
299
300 pub fn with_tombstone_log(mut self, enable: bool) -> Self {
305 self.enable_tombstone_log = enable;
306 self
307 }
308
309 #[cfg(any(test, feature = "test_utils"))]
311 pub fn with_flush_switch(mut self, flush_switch: Switch) -> Self {
312 self.flush_switch = flush_switch;
313 self
314 }
315
316 #[cfg(any(test, feature = "test_utils"))]
318 pub fn with_load_holder(mut self, load_holder: Holder) -> Self {
319 self.load_holder = load_holder;
320 self
321 }
322
323 pub async fn build(
325 self: Box<Self>,
326 EngineBuildContext {
327 io_engine,
328 metrics,
329 spawner: runtime,
330 recover_mode,
331 }: EngineBuildContext,
332 ) -> Result<Arc<BlockEngine<K, V, P>>> {
333 let device = self.device;
334 let block_size = self.block_size;
335
336 let mut tombstones = vec![];
337
338 let tombstone_log = if self.enable_tombstone_log {
339 let mut partitions = vec![];
341
342 let max_entries = device.capacity() / PAGE;
343 let pages = max_entries / TombstoneLog::SLOTS_PER_PAGE
344 + if max_entries % TombstoneLog::SLOTS_PER_PAGE > 0 {
345 1
346 } else {
347 0
348 };
349 let partition = device.create_partition(pages * PAGE)?;
350 partitions.push(partition);
351
352 let tombstone_log = TombstoneLog::open(partitions, io_engine.clone(), &mut tombstones).await?;
353 Some(tombstone_log)
354 } else {
355 None
356 };
357
358 let indexer = Indexer::new(self.indexer_shards);
359 let submit_queue_size = Arc::<AtomicUsize>::default();
360
361 #[expect(clippy::type_complexity)]
362 let (flushers, rxs): (Vec<Flusher<K, V, P>>, Vec<UnboundedReceiver<Submission<K, V, P>>>) = (0..self.flushers)
363 .map(|id| Flusher::<K, V, P>::new(id, submit_queue_size.clone(), metrics.clone()))
364 .unzip();
365
366 let reclaimer = Reclaimer::new(
367 indexer.clone(),
368 flushers.clone(),
369 Arc::new(self.reinsertion_filter),
370 self.blob_index_size,
371 device.statistics().clone(),
372 runtime.clone(),
373 );
374 let reclaimer: Arc<dyn ReclaimerTrait> = Arc::new(reclaimer);
375
376 let block_manager = BlockManager::open(
377 device.clone(),
378 io_engine,
379 block_size,
380 self.eviction_pickers,
381 reclaimer,
382 self.reclaimers,
383 self.clean_block_threshold,
384 metrics.clone(),
385 runtime.clone(),
386 )?;
387 let blocks = block_manager.blocks();
388
389 if self.flushers + self.clean_block_threshold > blocks / 2 {
390 tracing::warn!("[block engine]: block-based object disk cache stable blocks count is too small, flusher [{flushers}] + clean block threshold [{clean_block_threshold}] (default = reclaimers) is supposed to be much larger than the block count [{blocks}]",
391 flushers = self.flushers,
392 clean_block_threshold = self.clean_block_threshold,
393 );
394 }
395
396 let sequence = AtomicSequence::default();
397
398 RecoverRunner::run(
399 self.recover_concurrency,
400 recover_mode,
401 self.blob_index_size,
402 (0..blocks as BlockId).collect_vec(),
403 &sequence,
404 &indexer,
405 &block_manager,
406 &tombstones,
407 runtime.clone(),
408 metrics.clone(),
409 )
410 .await?;
411
412 let io_buffer_size = self.buffer_pool_size / self.flushers;
413 for (flusher, rx) in flushers.iter().zip(rxs.into_iter()) {
414 flusher.run(
415 rx,
416 block_size,
417 io_buffer_size,
418 self.blob_index_size,
419 self.compression,
420 indexer.clone(),
421 block_manager.clone(),
422 tombstone_log.clone(),
423 metrics.clone(),
424 &runtime,
425 #[cfg(any(test, feature = "test_utils"))]
426 self.flush_switch.clone(),
427 )?;
428 }
429
430 let admission_filter = self.admission_filter.with_condition(IoThrottle);
431
432 let inner = BlockEngineInner {
433 admission_filter,
434 device,
435 indexer,
436 block_manager,
437 flushers,
438 submit_queue_size,
439 submit_queue_size_threshold: self.submit_queue_size_threshold,
440 sequence,
441 _spawner: runtime,
442 active: AtomicBool::new(true),
443 metrics,
444 #[cfg(any(test, feature = "test_utils"))]
445 flush_switch: self.flush_switch,
446 #[cfg(any(test, feature = "test_utils"))]
447 load_holder: self.load_holder,
448 };
449 let inner = Arc::new(inner);
450 let engine = BlockEngine { inner };
451 let engine = Arc::new(engine);
452 Ok(engine)
453 }
454}
455
456impl<K, V, P> EngineConfig<K, V, P> for BlockEngineConfig<K, V, P>
457where
458 K: StorageKey,
459 V: StorageValue,
460 P: Properties,
461{
462 fn build(self: Box<Self>, ctx: EngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn Engine<K, V, P>>>> {
463 async move { self.build(ctx).await.map(|e| e as Arc<dyn Engine<K, V, P>>) }.boxed()
464 }
465}
466
impl<K, V, P> From<BlockEngineConfig<K, V, P>> for Box<dyn EngineConfig<K, V, P>>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Convert the config into a type-erased, boxed engine config.
    // NOTE(review): `boxed()` is presumably a helper on `EngineConfig` that
    // wraps `self` in a `Box` — confirm against the trait definition.
    fn from(builder: BlockEngineConfig<K, V, P>) -> Self {
        builder.boxed()
    }
}
477
/// Block-based disk cache engine.
///
/// A cheap-to-clone handle; all state lives in the shared [`BlockEngineInner`].
pub struct BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Shared state; cloning the engine only bumps this refcount.
    inner: Arc<BlockEngineInner<K, V, P>>,
}
487
488impl<K, V, P> Debug for BlockEngine<K, V, P>
489where
490 K: StorageKey,
491 V: StorageValue,
492 P: Properties,
493{
494 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
495 f.debug_struct("GenericStore").finish()
496 }
497}
498
/// Shared state of a [`BlockEngine`], held behind an `Arc` by every handle.
struct BlockEngineInner<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Filter applied on enqueue (always includes IO throttling, see `build`).
    admission_filter: StorageFilter,

    /// Backing device.
    device: Arc<dyn Device>,

    /// Hash → disk address index.
    indexer: Indexer,
    /// Manages block allocation, reads, and reclamation.
    block_manager: BlockManager,

    /// Flusher handles; submissions are sharded over them by hash.
    flushers: Vec<Flusher<K, V, P>>,

    /// Current queued bytes, shared with the flushers.
    submit_queue_size: Arc<AtomicUsize>,
    /// Enqueue is dropped once `submit_queue_size` exceeds this.
    submit_queue_size_threshold: usize,

    /// Monotonic sequence for entries and tombstones.
    sequence: AtomicSequence,

    // Kept alive for spawned tasks; not used directly after build.
    _spawner: Spawner,

    /// Cleared on `close`; guards enqueue/delete/destroy afterwards.
    active: AtomicBool,

    metrics: Arc<Metrics>,

    // Test-only hook to pause/resume flushing.
    #[cfg(any(test, feature = "test_utils"))]
    flush_switch: Switch,

    // Test-only hook to delay loads.
    #[cfg(any(test, feature = "test_utils"))]
    load_holder: Holder,
}
531
532impl<K, V, P> Clone for BlockEngine<K, V, P>
533where
534 K: StorageKey,
535 V: StorageValue,
536 P: Properties,
537{
538 fn clone(&self) -> Self {
539 Self {
540 inner: self.inner.clone(),
541 }
542 }
543}
544
545impl<K, V, P> BlockEngine<K, V, P>
546where
547 K: StorageKey,
548 V: StorageValue,
549 P: Properties,
550{
551 fn wait(&self) -> impl Future<Output = ()> + Send + 'static {
552 let flushers = self.inner.flushers.clone();
553 let block_manager = self.inner.block_manager.clone();
554 async move {
555 join_all(flushers.iter().map(|flusher| flusher.wait())).await;
556 block_manager.wait_reclaim().await;
557 }
558 }
559
560 fn close(&self) -> BoxFuture<'static, Result<()>> {
561 let this = self.clone();
562 async move {
563 this.inner.active.store(false, Ordering::Relaxed);
564 this.wait().await;
565 Ok(())
566 }
567 .boxed()
568 }
569
570 #[cfg_attr(feature = "tracing", trace(name = "foyer::storage::engine::block::generic::enqueue"))]
571 fn enqueue(&self, piece: PieceRef<K, V, P>, estimated_size: usize) {
572 if !self.inner.active.load(Ordering::Relaxed) {
573 tracing::warn!("cannot enqueue new entry after closed");
574 return;
575 }
576
577 tracing::trace!(
578 hash = piece.hash(),
579 age = ?piece.properties().age().unwrap_or_default(),
580 "[block engine]: enqueue"
581 );
582 match piece.properties().age().unwrap_or_default() {
583 Age::Fresh | Age::Old => {}
584 Age::Young => {
585 self.inner.metrics.storage_block_engine_enqueue_skip.increase(1);
587 return;
588 }
589 }
590
591 if self.inner.submit_queue_size.load(Ordering::Relaxed) > self.inner.submit_queue_size_threshold {
592 self.inner.metrics.storage_queue_channel_overflow.increase(1);
593 return;
594 }
595
596 let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
597
598 self.inner.flushers[piece.hash() as usize % self.inner.flushers.len()].submit(Submission::CacheEntry {
599 piece,
600 estimated_size,
601 sequence,
602 });
603 }
604
605 fn load(&self, hash: u64) -> impl Future<Output = Result<Load<K, V, P>>> + Send + 'static {
606 tracing::trace!(hash, "[block engine]: load");
607
608 #[cfg(any(test, feature = "test_utils"))]
609 let load_holer = self.inner.load_holder.wait();
610
611 let indexer = self.inner.indexer.clone();
612 let metrics = self.inner.metrics.clone();
613 let block_manager = self.inner.block_manager.clone();
614
615 let load = async move {
616 #[cfg(any(test, feature = "test_utils"))]
617 load_holer.await;
618
619 let addr = match indexer.get(hash) {
620 Some(addr) => addr,
621 None => {
622 return Ok(Load::Miss);
623 }
624 };
625
626 tracing::trace!(hash, ?addr, "[block engine]: load");
627
628 let block = block_manager.block(addr.block);
629 if block.partition().statistics().is_read_throttled() {
630 return Ok(Load::Throttled);
631 }
632
633 let buf = IoSliceMut::new(bits::align_up(PAGE, addr.len as _));
634 let (buf, res) = block.read(Box::new(buf), addr.offset as _).await;
635 match res {
636 Ok(_) => {}
637 Err(e) => {
638 tracing::error!(hash, ?addr, ?e, "[block engine load]: load error");
639 return Err(e);
640 }
641 }
642
643 let header = match EntryHeader::read(&buf[..EntryHeader::serialized_len()]) {
644 Ok(header) => header,
645 Err(e) => {
646 return match e.kind() {
647 ErrorKind::Parse
648 | ErrorKind::MagicMismatch
649 | ErrorKind::ChecksumMismatch
650 | ErrorKind::OutOfRange => {
651 tracing::warn!(
652 hash,
653 ?addr,
654 ?e,
655 "[block engine load]: deserialize read buffer raise error, remove this entry and skip"
656 );
657 indexer.remove(hash);
658 Ok(Load::Miss)
659 }
660 _ => {
661 tracing::error!(hash, ?addr, ?e, "[block engine load]: load error");
662 Err(e)
663 }
664 }
665 }
666 };
667
668 let (key, value) = {
669 let now = Instant::now();
670 let res = match EntryDeserializer::deserialize::<K, V>(
671 &buf[EntryHeader::serialized_len()..],
672 header.key_len as _,
673 header.value_len as _,
674 header.compression,
675 Some(header.checksum),
676 ) {
677 Ok(res) => res,
678 Err(e) => {
679 return match e.kind() {
680 ErrorKind::MagicMismatch | ErrorKind::ChecksumMismatch | ErrorKind::OutOfRange => {
681 tracing::warn!(
682 hash,
683 ?addr,
684 ?header,
685 ?e,
686 "[block engine load]: deserialize read buffer raise error, remove this entry and skip"
687 );
688 indexer.remove(hash);
689 Ok(Load::Miss)
690 }
691 _ => {
692 tracing::error!(hash, ?addr, ?header, ?e, "[block engine load]: load error");
693 Err(e)
694 }
695 }
696 }
697 };
698 metrics
699 .storage_entry_deserialize_duration
700 .record(now.elapsed().as_secs_f64());
701 res
702 };
703
704 let age = match block.statistics().probation.load(Ordering::Relaxed) {
705 true => Age::Old,
706 false => Age::Young,
707 };
708
709 Ok(Load::Entry {
710 key,
711 value,
712 populated: Populated { age },
713 })
714 };
715 #[cfg(feature = "tracing")]
716 let load = load.in_span(Span::enter_with_local_parent(
717 "foyer::storage::engine::block::generic::load",
718 ));
719 load
720 }
721
722 fn delete(&self, hash: u64) {
723 if !self.inner.active.load(Ordering::Relaxed) {
724 tracing::warn!("cannot delete entry after closed");
725 return;
726 }
727
728 let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
729 let stats = self
730 .inner
731 .indexer
732 .insert_tombstone(hash, sequence)
733 .map(|addr| InvalidStats {
734 block: addr.block,
735 size: bits::align_up(PAGE, addr.len as usize),
736 });
737
738 let this = self.clone();
739
740 this.inner.flushers[hash as usize % this.inner.flushers.len()].submit(Submission::Tombstone {
741 tombstone: Tombstone { hash, sequence },
742 stats,
743 });
744 }
745
746 fn may_contains(&self, hash: u64) -> bool {
747 self.inner.indexer.get(hash).is_some()
748 }
749
750 fn destroy(&self) -> BoxFuture<'static, Result<()>> {
751 let this = self.clone();
752 async move {
753 if !this.inner.active.load(Ordering::Relaxed) {
754 return Err(Error::new(ErrorKind::Closed, "cannot delete entry after closed"));
755 }
756
757 let sequence = this.inner.sequence.fetch_add(1, Ordering::Relaxed);
759
760 this.inner.flushers[0].submit(Submission::Tombstone {
761 tombstone: Tombstone { hash: 0, sequence },
762 stats: None,
763 });
764 this.wait().await;
765
766 this.inner.indexer.clear();
771
772 try_join_all((0..this.inner.block_manager.blocks() as BlockId).map(|id| {
774 let block = this.inner.block_manager.block(id).clone();
775 async move {
776 let res = BlockCleaner::clean(&block).await;
777 block.statistics().reset();
778 res
779 }
780 }))
781 .await?;
782
783 Ok(())
784 }
785 .boxed()
786 }
787
788 #[cfg(any(test, feature = "test_utils"))]
789 pub fn hold_flush(&self) {
790 self.inner.flush_switch.on();
791 }
792
793 #[cfg(any(test, feature = "test_utils"))]
794 pub fn unhold_flush(&self) {
795 self.inner.flush_switch.off();
796 }
797}
798
// Thin trait adapter: every method delegates to the inherent implementation
// above, boxing futures where the trait requires `BoxFuture`.
impl<K, V, P> Engine<K, V, P> for BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    fn device(&self) -> &Arc<dyn Device> {
        &self.inner.device
    }

    fn filter(&self, hash: u64, estimated_size: usize) -> StorageFilterResult {
        self.inner
            .admission_filter
            .filter(self.inner.device.statistics(), hash, estimated_size)
    }

    fn enqueue(&self, piece: PieceRef<K, V, P>, estimated_size: usize) {
        self.enqueue(piece, estimated_size);
    }

    fn load(&self, hash: u64) -> BoxFuture<'static, Result<Load<K, V, P>>> {
        self.load(hash).boxed()
    }

    fn delete(&self, hash: u64) {
        self.delete(hash);
    }

    fn may_contains(&self, hash: u64) -> bool {
        self.may_contains(hash)
    }

    fn destroy(&self) -> BoxFuture<'static, Result<()>> {
        self.destroy()
    }

    fn wait(&self) -> BoxFuture<'static, ()> {
        self.wait().boxed()
    }

    fn close(&self) -> BoxFuture<'static, Result<()>> {
        self.close()
    }
}
845
// Integration tests exercising enqueue/load/delete/destroy, recovery, the
// tombstone log, reinsertion on reclaim, corruption handling, and combined
// devices. The fixtures deliberately use tiny capacities so eviction happens
// after only a few entries.
#[cfg(test)]
mod tests {

    use std::{fs::File, path::Path};

    use bytesize::ByteSize;
    use foyer_common::hasher::ModHasher;
    use foyer_memory::{Cache, CacheBuilder, CacheEntry, FifoConfig, TestProperties};
    use itertools::Itertools;

    use super::*;
    use crate::{
        engine::RecoverMode,
        io::{
            device::{combined::CombinedDeviceBuilder, fs::FsDeviceBuilder, DeviceBuilder},
            engine::{IoEngine, IoEngineBuildContext, IoEngineConfig},
        },
        serde::EntrySerializer,
        test_utils::Biased,
        PsyncIoEngineConfig, RejectAll,
    };

    const KB: usize = 1024;

    /// Tiny single-shard in-memory cache used to mint entries for the store.
    fn cache_for_test() -> Cache<u64, Vec<u8>, ModHasher, TestProperties> {
        CacheBuilder::new(10)
            .with_shards(1)
            .with_eviction_config(FifoConfig::default())
            .with_hash_builder(ModHasher::default())
            .build()
    }

    /// Psync-backed IO engine for tests.
    async fn io_engine_for_test(spawner: Spawner) -> Arc<dyn IoEngine> {
        PsyncIoEngineConfig::new()
            .boxed()
            .build(IoEngineBuildContext { spawner })
            .await
            .unwrap()
    }

    /// Default test engine: reinsertion disabled (reject all).
    async fn engine_for_test(dir: impl AsRef<Path>) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
        store_for_test_with_reinsertion_filter(dir, StorageFilter::new().with_condition(RejectAll)).await
    }

    /// 64 KiB fs-backed engine with 16 KiB blocks and a custom reinsertion filter.
    async fn store_for_test_with_reinsertion_filter(
        dir: impl AsRef<Path>,
        reinsertion_filter: StorageFilter,
    ) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
        let device = FsDeviceBuilder::new(dir)
            .with_capacity(ByteSize::kib(64).as_u64() as _)
            .build()
            .unwrap();
        let spawner = Spawner::current();
        let io_engine = io_engine_for_test(spawner.clone()).await;
        let metrics = Arc::new(Metrics::noop());
        let builder = BlockEngineConfig {
            device,
            block_size: 16 * 1024,
            compression: Compression::None,
            indexer_shards: 4,
            recover_concurrency: 2,
            flushers: 1,
            reclaimers: 1,
            clean_block_threshold: 1,
            admission_filter: StorageFilter::new(),
            eviction_pickers: vec![Box::<FifoPicker>::default()],
            reinsertion_filter,
            enable_tombstone_log: false,
            buffer_pool_size: 16 * 1024 * 1024,
            blob_index_size: 4 * 1024,
            submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
            flush_switch: Switch::default(),
            load_holder: Holder::default(),
            marker: PhantomData,
        };

        let builder = Box::new(builder);
        builder
            .build(EngineBuildContext {
                io_engine,
                metrics,
                spawner,
                recover_mode: RecoverMode::Strict,
            })
            .await
            .unwrap()
    }

    /// Like the fixture above, but with the tombstone log enabled; capacity is
    /// bumped by 4 KiB to leave room for the log partition.
    async fn store_for_test_with_tombstone_log(
        dir: impl AsRef<Path>,
    ) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
        let device = FsDeviceBuilder::new(dir)
            .with_capacity(ByteSize::kib(64).as_u64() as usize + ByteSize::kib(4).as_u64() as usize)
            .build()
            .unwrap();
        let spawner = Spawner::current();
        let io_engine = io_engine_for_test(spawner.clone()).await;
        let metrics = Arc::new(Metrics::noop());
        let builder = BlockEngineConfig {
            device,
            block_size: 16 * 1024,
            compression: Compression::None,
            indexer_shards: 4,
            recover_concurrency: 2,
            flushers: 1,
            reclaimers: 1,
            clean_block_threshold: 1,
            eviction_pickers: vec![Box::<FifoPicker>::default()],
            admission_filter: StorageFilter::new(),
            reinsertion_filter: StorageFilter::new().with_condition(RejectAll),
            enable_tombstone_log: true,
            buffer_pool_size: 16 * 1024 * 1024,
            blob_index_size: 4 * 1024,
            submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
            flush_switch: Switch::default(),
            load_holder: Holder::default(),
            marker: PhantomData,
        };
        let builder = Box::new(builder);
        builder
            .build(EngineBuildContext {
                io_engine,
                metrics,
                spawner,
                recover_mode: RecoverMode::Strict,
            })
            .await
            .unwrap()
    }

    /// Helper: enqueue a memory-cache entry into the store with its estimated size.
    fn enqueue(
        store: &BlockEngine<u64, Vec<u8>, TestProperties>,
        entry: CacheEntry<u64, Vec<u8>, ModHasher, TestProperties>,
    ) {
        let estimated_size = EntrySerializer::estimated_size(entry.key(), entry.value());
        store.enqueue(entry.piece().into(), estimated_size);
    }

    /// Write batches of entries, verify loads, observe FIFO eviction of the
    /// oldest block, and check nothing is accepted after close.
    #[test_log::test(tokio::test)]
    async fn test_store_enqueue_lookup_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = engine_for_test(dir.path()).await;

        // Batch 1: hold flush so both entries land together.
        store.hold_flush();
        let e1 = memory.insert(1, vec![1; 7 * KB]);
        let e2 = memory.insert(2, vec![2; 3 * KB]);
        enqueue(&store, e1.clone());
        enqueue(&store, e2);
        store.unhold_flush();
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));

        // Batch 2.
        store.hold_flush();
        let e3 = memory.insert(3, vec![3; 7 * KB]);
        let e4 = memory.insert(4, vec![4; 2 * KB]);
        enqueue(&store, e3);
        enqueue(&store, e4);
        store.unhold_flush();
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4, (4, vec![4; 2 * KB]));

        // A single large entry.
        let e5 = memory.insert(5, vec![5; 11 * KB]);
        enqueue(&store, e5);
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4, (4, vec![4; 2 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));

        // Batch 4 overwrites key 4 and should evict the oldest block (1 & 2).
        store.hold_flush();
        let e6 = memory.insert(6, vec![6; 7 * KB]);
        let e4v2 = memory.insert(4, vec![!4; 3 * KB]);
        enqueue(&store, e6);
        enqueue(&store, e4v2);
        store.unhold_flush();
        store.wait().await;

        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
        assert!(store.load(memory.hash(&2)).await.unwrap().kv().is_none());
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4v2 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4v2, (4, vec![!4; 3 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));
        let r6 = store.load(memory.hash(&6)).await.unwrap().kv().unwrap();
        assert_eq!(r6, (6, vec![6; 7 * KB]));

        // Enqueue after close must be ignored.
        store.close().await.unwrap();
        enqueue(&store, e1);
        store.wait().await;

        drop(store);

        // Reopen: everything before close should be recovered, nothing after.
        let store = engine_for_test(dir.path()).await;

        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
        assert!(store.load(memory.hash(&2)).await.unwrap().kv().is_none());
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4v2 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4v2, (4, vec![!4; 3 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));
        let r6 = store.load(memory.hash(&6)).await.unwrap().kv().unwrap();
        assert_eq!(r6, (6, vec![6; 7 * KB]));
    }

    /// Deletions must survive restart via the tombstone log, and a later
    /// re-insert of the deleted key must also survive restart.
    #[test_log::test(tokio::test)]
    async fn test_store_delete_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        let es = (0..10).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();

        for e in es.iter().take(9) {
            enqueue(&store, e.clone());
        }
        store.wait().await;

        for i in 0..9 {
            assert_eq!(
                store.load(memory.hash(&i)).await.unwrap().kv(),
                Some((i, vec![i as u8; 3 * KB]))
            );
        }

        store.delete(memory.hash(&3));
        store.wait().await;
        assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);

        store.close().await.unwrap();
        drop(store);

        // After restart the tombstone must still hide key 3.
        let store = store_for_test_with_tombstone_log(dir.path()).await;
        for i in 0..9 {
            if i != 3 {
                assert_eq!(
                    store.load(memory.hash(&i)).await.unwrap().kv(),
                    Some((i, vec![i as u8; 3 * KB]))
                );
            } else {
                assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);
            }
        }

        // Re-insert the deleted key; it must supersede the tombstone.
        enqueue(&store, es[3].clone());
        store.wait().await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );

        store.close().await.unwrap();
        drop(store);

        let store = store_for_test_with_tombstone_log(dir.path()).await;

        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
    }

    /// `destroy` must wipe all data so nothing is recovered after restart,
    /// while later writes still work and persist.
    #[test_log::test(tokio::test)]
    async fn test_store_destroy_recovery() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        let es = (0..10).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();

        store.hold_flush();
        for e in es.iter().take(9) {
            enqueue(&store, e.clone());
        }
        store.unhold_flush();
        store.wait().await;

        for i in 0..9 {
            assert_eq!(
                store.load(memory.hash(&i)).await.unwrap().kv(),
                Some((i, vec![i as u8; 3 * KB]))
            );
        }

        store.delete(memory.hash(&3));
        store.wait().await;
        assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);

        store.destroy().await.unwrap();

        store.close().await.unwrap();
        drop(store);

        // Everything must be gone after destroy + restart.
        let store = store_for_test_with_tombstone_log(dir.path()).await;
        for i in 0..9 {
            assert_eq!(store.load(memory.hash(&i)).await.unwrap().kv(), None);
        }

        enqueue(&store, es[3].clone());
        store.wait().await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );

        store.close().await.unwrap();
        drop(store);

        let store = store_for_test_with_tombstone_log(dir.path()).await;

        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
    }

    /// With a biased reinsertion filter (keeps odd keys), reclaimed blocks
    /// should re-insert only the allowed survivors; deletes must stick.
    #[test_log::test(tokio::test)]
    async fn test_store_reinsertion() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = store_for_test_with_reinsertion_filter(
            dir.path(),
            StorageFilter::new().with_condition(Biased::new(vec![1, 3, 5, 7, 9, 11, 13, 15, 17, 19])),
        )
        .await;

        let es = (0..15).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();

        // Write the first 9 entries one by one.
        for e in es.iter().take(9).cloned() {
            enqueue(&store, e);
            store.wait().await;
        }

        for i in 0..9 {
            let r = store.load(memory.hash(&i)).await.unwrap().kv().unwrap();
            assert_eq!(r, (i, vec![i as u8; 3 * KB]));
        }

        // Trigger reclaim: even keys 0 and 2 from the reclaimed block are dropped.
        enqueue(&store, es[9].clone());
        enqueue(&store, es[10].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..11 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                Some((4, vec![4; 3 * KB])),
                Some((5, vec![5; 3 * KB])),
                Some((6, vec![6; 3 * KB])),
                Some((7, vec![7; 3 * KB])),
                Some((8, vec![8; 3 * KB])),
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
            ]
        );

        // Another reclaim drops even key 4.
        enqueue(&store, es[11].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..12 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                None,
                Some((5, vec![5; 3 * KB])),
                Some((6, vec![6; 3 * KB])),
                Some((7, vec![7; 3 * KB])),
                Some((8, vec![8; 3 * KB])),
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
                Some((11, vec![11; 3 * KB])),
            ]
        );

        // A deleted odd key (7) must not be re-inserted by reclaim.
        store.delete(memory.hash(&7));
        store.wait().await;
        enqueue(&store, es[12].clone());
        store.wait().await;
        enqueue(&store, es[13].clone());
        store.wait().await;
        enqueue(&store, es[14].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..15 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                None,
                Some((5, vec![5; 3 * KB])),
                None,
                None,
                None,
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
                Some((11, vec![11; 3 * KB])),
                Some((12, vec![12; 3 * KB])),
                Some((13, vec![13; 3 * KB])),
                Some((14, vec![14; 3 * KB])),
            ]
        );
    }

    /// Corrupting on-disk bytes must surface as a miss (entry dropped), not a
    /// panic or hard error.
    #[test_log::test(tokio::test)]
    async fn test_store_magic_checksum_mismatch() {
        let dir = tempfile::tempdir().unwrap();

        let memory = cache_for_test();
        let store = engine_for_test(dir.path()).await;

        let e1 = memory.insert(1, vec![1; 7 * KB]);
        enqueue(&store, e1);
        store.wait().await;

        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));

        // Scribble over the stored bytes in every cache file.
        for entry in std::fs::read_dir(dir.path()).unwrap() {
            let entry = entry.unwrap();
            if !entry.metadata().unwrap().is_file() {
                continue;
            }

            let file = File::options().write(true).open(entry.path()).unwrap();
            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::FileExt;
                file.write_all_at(&[b'x'; 42], 5 * 1024).unwrap();
            }
            #[cfg(target_family = "windows")]
            {
                use std::os::windows::fs::FileExt;
                file.seek_write(&[b'x'; 42], 5 * 1024).unwrap();
            }
        }

        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
    }

    /// A combined device spanning several fs devices must expose the sum of
    /// their capacities as blocks.
    #[test_log::test(tokio::test)]
    async fn test_aggregated_device() {
        let dir = tempfile::tempdir().unwrap();

        const KB: usize = 1024;
        const MB: usize = 1024 * 1024;

        let spawner = Spawner::current();
        let io_engine = io_engine_for_test(spawner.clone()).await;

        let d1 = FsDeviceBuilder::new(dir.path().join("dev1"))
            .with_capacity(MB)
            .build()
            .unwrap();
        let d2 = FsDeviceBuilder::new(dir.path().join("dev2"))
            .with_capacity(2 * MB)
            .build()
            .unwrap();
        let d3 = FsDeviceBuilder::new(dir.path().join("dev3"))
            .with_capacity(4 * MB)
            .build()
            .unwrap();
        let device = CombinedDeviceBuilder::new()
            .with_device(d1)
            .with_device(d2)
            .with_device(d3)
            .build()
            .unwrap();
        let engine = BlockEngineConfig::<u64, Vec<u8>, TestProperties>::new(device)
            .with_block_size(64 * KB)
            .boxed()
            .build(EngineBuildContext {
                io_engine,
                metrics: Arc::new(Metrics::noop()),
                spawner,
                recover_mode: RecoverMode::None,
            })
            .await
            .unwrap();
        assert_eq!(engine.inner.block_manager.blocks(), (1 + 2 + 4) * MB / (64 * KB));
    }
}