use std::{
fmt::Debug,
future::Future,
marker::PhantomData,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::Instant,
};
#[cfg(feature = "tracing")]
use fastrace::prelude::*;
use foyer_common::{
bits,
code::{StorageKey, StorageValue},
error::{Error, ErrorKind, Result},
metrics::Metrics,
properties::{Age, Properties},
spawn::Spawner,
};
use futures_core::future::BoxFuture;
use futures_util::{
future::{join_all, try_join_all},
FutureExt,
};
use itertools::Itertools;
use mea::mpsc::UnboundedReceiver;
use super::{
flusher::{Flusher, InvalidStats, Submission},
indexer::Indexer,
recover::RecoverRunner,
};
#[cfg(any(test, feature = "test_utils"))]
use crate::test_utils::*;
use crate::{
compress::Compression,
engine::{
block::{
eviction::{EvictionPicker, FifoPicker, InvalidRatioPicker},
manager::{BlockId, BlockManager},
reclaimer::{BlockCleaner, Reclaimer, ReclaimerTrait},
serde::{AtomicSequence, EntryHeader},
tombstone::{Tombstone, TombstoneLog},
},
Engine, EngineBuildContext, EngineConfig, Populated,
},
filter::conditions::IoThrottle,
io::{bytes::IoSliceMut, PAGE},
keeper::PieceRef,
serde::EntryDeserializer,
Device, Load, RejectAll, StorageFilter, StorageFilterResult,
};
/// Configuration/builder for [`BlockEngine`], the block-based disk cache engine.
///
/// Construct with [`BlockEngineConfig::new`], tune with the `with_*` builder
/// methods, then call `build` to create the engine.
pub struct BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Backing device that blocks are stored on.
    device: Arc<dyn Device>,
    /// Size of each block in bytes (`with_block_size` aligns it up to `PAGE`).
    block_size: usize,
    /// Compression applied when serializing entries.
    compression: Compression,
    /// Shard count of the in-memory indexer.
    indexer_shards: usize,
    /// Concurrency used when recovering the index from disk at startup.
    recover_concurrency: usize,
    /// Number of flusher tasks draining submission queues.
    flushers: usize,
    /// Number of reclaimer tasks.
    reclaimers: usize,
    /// Total write-buffer pool size, split evenly across flushers at build time.
    buffer_pool_size: usize,
    /// Bytes reserved for the blob index (`with_blob_index_size` aligns it up to `PAGE`).
    blob_index_size: usize,
    /// Soft cap on queued submission bytes; submissions past it are dropped.
    submit_queue_size_threshold: usize,
    /// Threshold of clean blocks used by the block manager.
    clean_block_threshold: usize,
    /// Pickers deciding which block to evict/reclaim next.
    eviction_pickers: Vec<Box<dyn EvictionPicker>>,
    /// Filter deciding whether a new entry is admitted to disk.
    admission_filter: StorageFilter,
    /// Filter deciding whether an entry is reinserted during reclamation.
    reinsertion_filter: StorageFilter,
    /// Whether to persist tombstones so deletions survive restarts.
    enable_tombstone_log: bool,
    // Test-only hook to pause/resume flushing.
    #[cfg(any(test, feature = "test_utils"))]
    flush_switch: Switch,
    // Test-only hook to delay loads.
    #[cfg(any(test, feature = "test_utils"))]
    load_holder: Holder,
    marker: PhantomData<(K, V, P)>,
}
impl<K, V, P> Debug for BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Manual impl: `derive(Debug)` would add `Debug` bounds on K/V/P, and the
    // `marker` and test-only fields are intentionally omitted from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlockEngineConfig")
            .field("device", &self.device)
            .field("block_size", &self.block_size)
            .field("compression", &self.compression)
            .field("indexer_shards", &self.indexer_shards)
            .field("recover_concurrency", &self.recover_concurrency)
            .field("flushers", &self.flushers)
            .field("reclaimers", &self.reclaimers)
            .field("buffer_pool_size", &self.buffer_pool_size)
            .field("blob_index_size", &self.blob_index_size)
            .field("submit_queue_size_threshold", &self.submit_queue_size_threshold)
            .field("clean_block_threshold", &self.clean_block_threshold)
            .field("eviction_pickers", &self.eviction_pickers)
            .field("admission_filter", &self.admission_filter)
            .field("reinsertion_filter", &self.reinsertion_filter)
            .field("enable_tombstone_log", &self.enable_tombstone_log)
            .finish()
    }
}
impl<K, V, P> BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Creates a config for `device` with default tuning parameters
    /// (16 MiB blocks, 64 indexer shards, 1 flusher/reclaimer, tombstone log off).
    pub fn new(device: Arc<dyn Device>) -> Self {
        Self {
            device,
            block_size: 16 * 1024 * 1024,
            compression: Compression::default(),
            indexer_shards: 64,
            recover_concurrency: 8,
            flushers: 1,
            reclaimers: 1,
            buffer_pool_size: 16 * 1024 * 1024,
            blob_index_size: 4 * 1024,
            submit_queue_size_threshold: 16 * 1024 * 1024,
            clean_block_threshold: 1,
            // Defaults: prefer blocks that are >= 80% invalid, then FIFO order.
            eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::<FifoPicker>::default()],
            admission_filter: StorageFilter::new(),
            // Reinsertion is disabled by default.
            reinsertion_filter: StorageFilter::new().with_condition(RejectAll),
            enable_tombstone_log: false,
            #[cfg(any(test, feature = "test_utils"))]
            flush_switch: Switch::default(),
            #[cfg(any(test, feature = "test_utils"))]
            load_holder: Holder::default(),
            marker: PhantomData,
        }
    }

    /// Sets the block size; the value is aligned up to `PAGE`.
    pub fn with_block_size(mut self, block_size: usize) -> Self {
        self.block_size = bits::align_up(PAGE, block_size);
        self
    }

    /// Sets the shard count of the indexer.
    pub fn with_indexer_shards(mut self, indexer_shards: usize) -> Self {
        self.indexer_shards = indexer_shards;
        self
    }

    /// Sets the recovery concurrency.
    pub fn with_recover_concurrency(mut self, recover_concurrency: usize) -> Self {
        self.recover_concurrency = recover_concurrency;
        self
    }

    /// Sets the number of flusher tasks.
    pub fn with_flushers(mut self, flushers: usize) -> Self {
        self.flushers = flushers;
        self
    }

    /// Sets the admission filter.
    pub fn with_admission_filter(mut self, filter: StorageFilter) -> Self {
        self.admission_filter = filter;
        self
    }

    /// Sets the number of reclaimer tasks.
    pub fn with_reclaimers(mut self, reclaimers: usize) -> Self {
        self.reclaimers = reclaimers;
        self
    }

    /// Sets the total write-buffer pool size shared by all flushers.
    pub fn with_buffer_pool_size(mut self, buffer_pool_size: usize) -> Self {
        self.buffer_pool_size = buffer_pool_size;
        self
    }

    /// Sets the blob index size; the value is aligned up to `PAGE`.
    pub fn with_blob_index_size(mut self, blob_index_size: usize) -> Self {
        let blob_index_size = bits::align_up(PAGE, blob_index_size);
        self.blob_index_size = blob_index_size;
        self
    }

    /// Sets the submit queue size threshold (bytes).
    pub fn with_submit_queue_size_threshold(mut self, submit_queue_size_threshold: usize) -> Self {
        self.submit_queue_size_threshold = submit_queue_size_threshold;
        self
    }

    /// Sets the clean block threshold.
    pub fn with_clean_block_threshold(mut self, clean_block_threshold: usize) -> Self {
        self.clean_block_threshold = clean_block_threshold;
        self
    }

    /// Replaces the eviction pickers.
    pub fn with_eviction_pickers(mut self, eviction_pickers: Vec<Box<dyn EvictionPicker>>) -> Self {
        self.eviction_pickers = eviction_pickers;
        self
    }

    /// Sets the reinsertion filter used during reclamation.
    pub fn with_reinsertion_filter(mut self, filter: StorageFilter) -> Self {
        self.reinsertion_filter = filter;
        self
    }

    /// Enables/disables the persistent tombstone log.
    pub fn with_tombstone_log(mut self, enable: bool) -> Self {
        self.enable_tombstone_log = enable;
        self
    }

    /// Test-only: installs a switch that can pause flushing.
    #[cfg(any(test, feature = "test_utils"))]
    pub fn with_flush_switch(mut self, flush_switch: Switch) -> Self {
        self.flush_switch = flush_switch;
        self
    }

    /// Test-only: installs a holder that can delay loads.
    #[cfg(any(test, feature = "test_utils"))]
    pub fn with_load_holder(mut self, load_holder: Holder) -> Self {
        self.load_holder = load_holder;
        self
    }

    /// Builds the [`BlockEngine`]: opens the optional tombstone log, creates the
    /// flushers and block manager, recovers the index from disk, starts the
    /// flushers, and assembles the shared inner state.
    pub async fn build(
        self: Box<Self>,
        EngineBuildContext {
            io_engine,
            metrics,
            spawner: runtime,
            recover_mode,
        }: EngineBuildContext,
    ) -> Result<Arc<BlockEngine<K, V, P>>> {
        let device = self.device;
        let block_size = self.block_size;

        // Open the tombstone log if enabled, collecting persisted tombstones so
        // recovery can replay deletions.
        let mut tombstones = vec![];
        let tombstone_log = if self.enable_tombstone_log {
            let mut partitions = vec![];
            // Size the log for one slot per page of device capacity, rounding
            // up to whole pages.
            let max_entries = device.capacity() / PAGE;
            let pages = max_entries / TombstoneLog::SLOTS_PER_PAGE
                + if max_entries % TombstoneLog::SLOTS_PER_PAGE > 0 {
                    1
                } else {
                    0
                };
            let partition = device.create_partition(pages * PAGE)?;
            partitions.push(partition);
            let tombstone_log = TombstoneLog::open(partitions, io_engine.clone(), &mut tombstones).await?;
            Some(tombstone_log)
        } else {
            None
        };

        let indexer = Indexer::new(self.indexer_shards);
        let submit_queue_size = Arc::<AtomicUsize>::default();
        // Create one flusher (and its submission channel) per configured flusher.
        #[expect(clippy::type_complexity)]
        let (flushers, rxs): (Vec<Flusher<K, V, P>>, Vec<UnboundedReceiver<Submission<K, V, P>>>) = (0..self.flushers)
            .map(|id| Flusher::<K, V, P>::new(id, submit_queue_size.clone(), metrics.clone()))
            .unzip();

        let reclaimer = Reclaimer::new(
            indexer.clone(),
            flushers.clone(),
            Arc::new(self.reinsertion_filter),
            self.blob_index_size,
            device.statistics().clone(),
            runtime.clone(),
        );
        let reclaimer: Arc<dyn ReclaimerTrait> = Arc::new(reclaimer);

        let block_manager = BlockManager::open(
            device.clone(),
            io_engine,
            block_size,
            self.eviction_pickers,
            reclaimer,
            self.reclaimers,
            self.clean_block_threshold,
            metrics.clone(),
            runtime.clone(),
        )?;
        let blocks = block_manager.blocks();
        // Warn when the configuration leaves too few stable blocks relative to
        // the blocks pinned by flushers and the clean-block threshold.
        if self.flushers + self.clean_block_threshold > blocks / 2 {
            tracing::warn!("[block engine]: block-based object disk cache stable blocks count is too small, flusher [{flushers}] + clean block threshold [{clean_block_threshold}] (default = reclaimers) is supposed to be much larger than the block count [{blocks}]",
                flushers = self.flushers,
                clean_block_threshold = self.clean_block_threshold,
            );
        }

        // Recover the in-memory index and the write sequence from on-disk blocks.
        let sequence = AtomicSequence::default();
        RecoverRunner::run(
            self.recover_concurrency,
            recover_mode,
            self.blob_index_size,
            (0..blocks as BlockId).collect_vec(),
            &sequence,
            &indexer,
            &block_manager,
            &tombstones,
            runtime.clone(),
            metrics.clone(),
        )
        .await?;

        // Split the buffer pool evenly across flushers and start them.
        let io_buffer_size = self.buffer_pool_size / self.flushers;
        for (flusher, rx) in flushers.iter().zip(rxs.into_iter()) {
            flusher.run(
                rx,
                block_size,
                io_buffer_size,
                self.blob_index_size,
                self.compression,
                indexer.clone(),
                block_manager.clone(),
                tombstone_log.clone(),
                metrics.clone(),
                &runtime,
                #[cfg(any(test, feature = "test_utils"))]
                self.flush_switch.clone(),
            )?;
        }

        // The IO throttle condition is always appended to the user-provided
        // admission filter.
        let admission_filter = self.admission_filter.with_condition(IoThrottle);

        let inner = BlockEngineInner {
            admission_filter,
            device,
            indexer,
            block_manager,
            flushers,
            submit_queue_size,
            submit_queue_size_threshold: self.submit_queue_size_threshold,
            sequence,
            _spawner: runtime,
            active: AtomicBool::new(true),
            metrics,
            #[cfg(any(test, feature = "test_utils"))]
            flush_switch: self.flush_switch,
            #[cfg(any(test, feature = "test_utils"))]
            load_holder: self.load_holder,
        };
        let inner = Arc::new(inner);
        let engine = BlockEngine { inner };
        let engine = Arc::new(engine);
        Ok(engine)
    }
}
impl<K, V, P> EngineConfig<K, V, P> for BlockEngineConfig<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Builds the block engine and erases it to `Arc<dyn Engine<K, V, P>>`.
    fn build(self: Box<Self>, ctx: EngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn Engine<K, V, P>>>> {
        Box::pin(async move {
            // Resolves to the inherent async `build`, then unsizes the result.
            let engine = self.build(ctx).await?;
            Ok(engine as Arc<dyn Engine<K, V, P>>)
        })
    }
}
impl<K, V, P> From<BlockEngineConfig<K, V, P>> for Box<dyn EngineConfig<K, V, P>>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Boxes the config and unsizes it into a `dyn EngineConfig` trait object.
    fn from(builder: BlockEngineConfig<K, V, P>) -> Self {
        Box::new(builder)
    }
}
/// Block-based disk cache engine.
///
/// Cheap to clone: all state lives in the shared [`BlockEngineInner`].
pub struct BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    inner: Arc<BlockEngineInner<K, V, P>>,
}
impl<K, V, P> Debug for BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Formats an opaque marker; inner state is intentionally not exposed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fix: previously printed the stale name "GenericStore"; report the
        // actual type name.
        f.debug_struct("BlockEngine").finish()
    }
}
/// Shared state behind [`BlockEngine`].
struct BlockEngineInner<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    /// Admission filter (the IO throttle condition is appended at build time).
    admission_filter: StorageFilter,
    /// Backing device.
    device: Arc<dyn Device>,
    /// Maps entry hash to its on-disk address.
    indexer: Indexer,
    block_manager: BlockManager,
    /// Flusher handles; submissions are routed by `hash % flushers.len()`.
    flushers: Vec<Flusher<K, V, P>>,
    /// Bytes currently queued for flushing across all flushers.
    submit_queue_size: Arc<AtomicUsize>,
    /// Submissions are dropped once `submit_queue_size` exceeds this.
    submit_queue_size_threshold: usize,
    /// Monotonic sequence assigned to writes and tombstones.
    sequence: AtomicSequence,
    _spawner: Spawner,
    /// Set to `false` on close; guards enqueue/delete/destroy afterwards.
    active: AtomicBool,
    metrics: Arc<Metrics>,
    // Test-only hook to pause/resume flushing.
    #[cfg(any(test, feature = "test_utils"))]
    flush_switch: Switch,
    // Test-only hook to delay loads.
    #[cfg(any(test, feature = "test_utils"))]
    load_holder: Holder,
}
impl<K, V, P> Clone for BlockEngine<K, V, P>
where
K: StorageKey,
V: StorageValue,
P: Properties,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<K, V, P> BlockEngine<K, V, P>
where
K: StorageKey,
V: StorageValue,
P: Properties,
{
fn wait(&self) -> impl Future<Output = ()> + Send + 'static {
let flushers = self.inner.flushers.clone();
let block_manager = self.inner.block_manager.clone();
async move {
join_all(flushers.iter().map(|flusher| flusher.wait())).await;
block_manager.wait_reclaim().await;
}
}
fn close(&self) -> BoxFuture<'static, Result<()>> {
let this = self.clone();
async move {
this.inner.active.store(false, Ordering::Relaxed);
this.wait().await;
Ok(())
}
.boxed()
}
#[cfg_attr(feature = "tracing", trace(name = "foyer::storage::engine::block::generic::enqueue"))]
fn enqueue(&self, piece: PieceRef<K, V, P>, estimated_size: usize) {
if !self.inner.active.load(Ordering::Relaxed) {
tracing::warn!("cannot enqueue new entry after closed");
return;
}
tracing::trace!(
hash = piece.hash(),
age = ?piece.properties().age().unwrap_or_default(),
"[block engine]: enqueue"
);
match piece.properties().age().unwrap_or_default() {
Age::Fresh | Age::Old => {}
Age::Young => {
self.inner.metrics.storage_block_engine_enqueue_skip.increase(1);
return;
}
}
if self.inner.submit_queue_size.load(Ordering::Relaxed) > self.inner.submit_queue_size_threshold {
self.inner.metrics.storage_queue_channel_overflow.increase(1);
return;
}
let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
self.inner.flushers[piece.hash() as usize % self.inner.flushers.len()].submit(Submission::CacheEntry {
piece,
estimated_size,
sequence,
});
}
fn load(&self, hash: u64) -> impl Future<Output = Result<Load<K, V, P>>> + Send + 'static {
tracing::trace!(hash, "[block engine]: load");
#[cfg(any(test, feature = "test_utils"))]
let load_holer = self.inner.load_holder.wait();
let indexer = self.inner.indexer.clone();
let metrics = self.inner.metrics.clone();
let block_manager = self.inner.block_manager.clone();
let load = async move {
#[cfg(any(test, feature = "test_utils"))]
load_holer.await;
let addr = match indexer.get(hash) {
Some(addr) => addr,
None => {
return Ok(Load::Miss);
}
};
tracing::trace!(hash, ?addr, "[block engine]: load");
let block = block_manager.block(addr.block);
if block.partition().statistics().is_read_throttled() {
return Ok(Load::Throttled);
}
let buf = IoSliceMut::new(bits::align_up(PAGE, addr.len as _));
let (buf, res) = block.read(Box::new(buf), addr.offset as _).await;
match res {
Ok(_) => {}
Err(e) => {
tracing::error!(hash, ?addr, ?e, "[block engine load]: load error");
return Err(e);
}
}
let header = match EntryHeader::read(&buf[..EntryHeader::serialized_len()]) {
Ok(header) => header,
Err(e) => {
return match e.kind() {
ErrorKind::Parse
| ErrorKind::MagicMismatch
| ErrorKind::ChecksumMismatch
| ErrorKind::OutOfRange => {
tracing::warn!(
hash,
?addr,
?e,
"[block engine load]: deserialize read buffer raise error, remove this entry and skip"
);
indexer.remove(hash);
Ok(Load::Miss)
}
_ => {
tracing::error!(hash, ?addr, ?e, "[block engine load]: load error");
Err(e)
}
}
}
};
let (key, value) = {
let now = Instant::now();
let res = match EntryDeserializer::deserialize::<K, V>(
&buf[EntryHeader::serialized_len()..],
header.key_len as _,
header.value_len as _,
header.compression,
Some(header.checksum),
) {
Ok(res) => res,
Err(e) => {
return match e.kind() {
ErrorKind::MagicMismatch | ErrorKind::ChecksumMismatch | ErrorKind::OutOfRange => {
tracing::warn!(
hash,
?addr,
?header,
?e,
"[block engine load]: deserialize read buffer raise error, remove this entry and skip"
);
indexer.remove(hash);
Ok(Load::Miss)
}
_ => {
tracing::error!(hash, ?addr, ?header, ?e, "[block engine load]: load error");
Err(e)
}
}
}
};
metrics
.storage_entry_deserialize_duration
.record(now.elapsed().as_secs_f64());
res
};
let age = match block.statistics().probation.load(Ordering::Relaxed) {
true => Age::Old,
false => Age::Young,
};
Ok(Load::Entry {
key,
value,
populated: Populated { age },
})
};
#[cfg(feature = "tracing")]
let load = load.in_span(Span::enter_with_local_parent(
"foyer::storage::engine::block::generic::load",
));
load
}
fn delete(&self, hash: u64) {
if !self.inner.active.load(Ordering::Relaxed) {
tracing::warn!("cannot delete entry after closed");
return;
}
let sequence = self.inner.sequence.fetch_add(1, Ordering::Relaxed);
let stats = self
.inner
.indexer
.insert_tombstone(hash, sequence)
.map(|addr| InvalidStats {
block: addr.block,
size: bits::align_up(PAGE, addr.len as usize),
});
let this = self.clone();
this.inner.flushers[hash as usize % this.inner.flushers.len()].submit(Submission::Tombstone {
tombstone: Tombstone { hash, sequence },
stats,
});
}
fn may_contains(&self, hash: u64) -> bool {
self.inner.indexer.get(hash).is_some()
}
fn destroy(&self) -> BoxFuture<'static, Result<()>> {
let this = self.clone();
async move {
if !this.inner.active.load(Ordering::Relaxed) {
return Err(Error::new(ErrorKind::Closed, "cannot delete entry after closed"));
}
let sequence = this.inner.sequence.fetch_add(1, Ordering::Relaxed);
this.inner.flushers[0].submit(Submission::Tombstone {
tombstone: Tombstone { hash: 0, sequence },
stats: None,
});
this.wait().await;
this.inner.indexer.clear();
try_join_all((0..this.inner.block_manager.blocks() as BlockId).map(|id| {
let block = this.inner.block_manager.block(id).clone();
async move {
let res = BlockCleaner::clean(&block).await;
block.statistics().reset();
res
}
}))
.await?;
Ok(())
}
.boxed()
}
#[cfg(any(test, feature = "test_utils"))]
pub fn hold_flush(&self) {
self.inner.flush_switch.on();
}
#[cfg(any(test, feature = "test_utils"))]
pub fn unhold_flush(&self) {
self.inner.flush_switch.off();
}
}
impl<K, V, P> Engine<K, V, P> for BlockEngine<K, V, P>
where
    K: StorageKey,
    V: StorageValue,
    P: Properties,
{
    // Thin dispatch layer: each method forwards to the inherent method of the
    // same name (inherent methods take precedence in method resolution, so
    // these calls do not recurse).
    fn device(&self) -> &Arc<dyn Device> {
        &self.inner.device
    }

    // Runs the admission filter against the device statistics.
    fn filter(&self, hash: u64, estimated_size: usize) -> StorageFilterResult {
        self.inner
            .admission_filter
            .filter(self.inner.device.statistics(), hash, estimated_size)
    }

    fn enqueue(&self, piece: PieceRef<K, V, P>, estimated_size: usize) {
        self.enqueue(piece, estimated_size);
    }

    // Boxes the inherent `load` future to satisfy the trait's `BoxFuture` return.
    fn load(&self, hash: u64) -> BoxFuture<'static, Result<Load<K, V, P>>> {
        self.load(hash).boxed()
    }

    fn delete(&self, hash: u64) {
        self.delete(hash);
    }

    fn may_contains(&self, hash: u64) -> bool {
        self.may_contains(hash)
    }

    fn destroy(&self) -> BoxFuture<'static, Result<()>> {
        self.destroy()
    }

    fn wait(&self) -> BoxFuture<'static, ()> {
        self.wait().boxed()
    }

    fn close(&self) -> BoxFuture<'static, Result<()>> {
        self.close()
    }
}
#[cfg(test)]
mod tests {
    use std::{fs::File, path::Path};

    use bytesize::ByteSize;
    use foyer_common::hasher::ModHasher;
    use foyer_memory::{Cache, CacheBuilder, CacheEntry, FifoConfig, TestProperties};
    use itertools::Itertools;

    use super::*;
    use crate::{
        engine::RecoverMode,
        io::{
            device::{combined::CombinedDeviceBuilder, fs::FsDeviceBuilder, DeviceBuilder},
            engine::{IoEngine, IoEngineBuildContext, IoEngineConfig},
        },
        serde::EntrySerializer,
        test_utils::Biased,
        PsyncIoEngineConfig, RejectAll,
    };

    const KB: usize = 1024;

    // Small single-shard FIFO in-memory cache; `ModHasher` keeps hash values
    // predictable for the tests (presumably hash(k) == k — confirm in
    // foyer_common::hasher).
    fn cache_for_test() -> Cache<u64, Vec<u8>, ModHasher, TestProperties> {
        CacheBuilder::new(10)
            .with_shards(1)
            .with_eviction_config(FifoConfig::default())
            .with_hash_builder(ModHasher::default())
            .build()
    }

    // Builds a psync-based IO engine on the given spawner.
    async fn io_engine_for_test(spawner: Spawner) -> Arc<dyn IoEngine> {
        PsyncIoEngineConfig::new()
            .boxed()
            .build(IoEngineBuildContext { spawner })
            .await
            .unwrap()
    }

    // Default test engine: reinsertion disabled (`RejectAll`).
    async fn engine_for_test(dir: impl AsRef<Path>) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
        store_for_test_with_reinsertion_filter(dir, StorageFilter::new().with_condition(RejectAll)).await
    }

    // Test engine on a 64 KiB fs device with 16 KiB blocks (4 blocks total)
    // and a configurable reinsertion filter. Tombstone log disabled.
    async fn store_for_test_with_reinsertion_filter(
        dir: impl AsRef<Path>,
        reinsertion_filter: StorageFilter,
    ) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
        let device = FsDeviceBuilder::new(dir)
            .with_capacity(ByteSize::kib(64).as_u64() as _)
            .build()
            .unwrap();
        let spawner = Spawner::current();
        let io_engine = io_engine_for_test(spawner.clone()).await;
        let metrics = Arc::new(Metrics::noop());
        let builder = BlockEngineConfig {
            device,
            block_size: 16 * 1024,
            compression: Compression::None,
            indexer_shards: 4,
            recover_concurrency: 2,
            flushers: 1,
            reclaimers: 1,
            clean_block_threshold: 1,
            admission_filter: StorageFilter::new(),
            eviction_pickers: vec![Box::<FifoPicker>::default()],
            reinsertion_filter,
            enable_tombstone_log: false,
            buffer_pool_size: 16 * 1024 * 1024,
            blob_index_size: 4 * 1024,
            submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
            flush_switch: Switch::default(),
            load_holder: Holder::default(),
            marker: PhantomData,
        };
        let builder = Box::new(builder);
        builder
            .build(EngineBuildContext {
                io_engine,
                metrics,
                spawner,
                recover_mode: RecoverMode::Strict,
            })
            .await
            .unwrap()
    }

    // Like the above, but with the tombstone log enabled; the extra 4 KiB of
    // capacity accommodates the tombstone-log partition.
    async fn store_for_test_with_tombstone_log(
        dir: impl AsRef<Path>,
    ) -> Arc<BlockEngine<u64, Vec<u8>, TestProperties>> {
        let device = FsDeviceBuilder::new(dir)
            .with_capacity(ByteSize::kib(64).as_u64() as usize + ByteSize::kib(4).as_u64() as usize)
            .build()
            .unwrap();
        let spawner = Spawner::current();
        let io_engine = io_engine_for_test(spawner.clone()).await;
        let metrics = Arc::new(Metrics::noop());
        let builder = BlockEngineConfig {
            device,
            block_size: 16 * 1024,
            compression: Compression::None,
            indexer_shards: 4,
            recover_concurrency: 2,
            flushers: 1,
            reclaimers: 1,
            clean_block_threshold: 1,
            eviction_pickers: vec![Box::<FifoPicker>::default()],
            admission_filter: StorageFilter::new(),
            reinsertion_filter: StorageFilter::new().with_condition(RejectAll),
            enable_tombstone_log: true,
            buffer_pool_size: 16 * 1024 * 1024,
            blob_index_size: 4 * 1024,
            submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
            flush_switch: Switch::default(),
            load_holder: Holder::default(),
            marker: PhantomData,
        };
        let builder = Box::new(builder);
        builder
            .build(EngineBuildContext {
                io_engine,
                metrics,
                spawner,
                recover_mode: RecoverMode::Strict,
            })
            .await
            .unwrap()
    }

    // Enqueues a memory-cache entry into the store with its estimated
    // serialized size.
    fn enqueue(
        store: &BlockEngine<u64, Vec<u8>, TestProperties>,
        entry: CacheEntry<u64, Vec<u8>, ModHasher, TestProperties>,
    ) {
        let estimated_size = EntrySerializer::estimated_size(entry.key(), entry.value());
        store.enqueue(entry.piece().into(), estimated_size);
    }

    // Fills the store beyond capacity and checks FIFO eviction, overwrite
    // behavior, that enqueue after close is ignored, and that the surviving
    // entries are recovered after reopening.
    #[test_log::test(tokio::test)]
    async fn test_store_enqueue_lookup_recovery() {
        let dir = tempfile::tempdir().unwrap();
        let memory = cache_for_test();
        let store = engine_for_test(dir.path()).await;

        // Batch 1: e1 + e2 fill one block together.
        store.hold_flush();
        let e1 = memory.insert(1, vec![1; 7 * KB]);
        let e2 = memory.insert(2, vec![2; 3 * KB]);
        enqueue(&store, e1.clone());
        enqueue(&store, e2);
        store.unhold_flush();
        store.wait().await;
        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));

        // Batch 2: e3 + e4.
        store.hold_flush();
        let e3 = memory.insert(3, vec![3; 7 * KB]);
        let e4 = memory.insert(4, vec![4; 2 * KB]);
        enqueue(&store, e3);
        enqueue(&store, e4);
        store.unhold_flush();
        store.wait().await;
        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4, (4, vec![4; 2 * KB]));

        // e5 occupies a block of its own; everything still fits.
        let e5 = memory.insert(5, vec![5; 11 * KB]);
        enqueue(&store, e5);
        store.wait().await;
        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));
        let r2 = store.load(memory.hash(&2)).await.unwrap().kv().unwrap();
        assert_eq!(r2, (2, vec![2; 3 * KB]));
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4, (4, vec![4; 2 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));

        // Batch 4 forces eviction of the oldest block (e1 + e2) and overwrites
        // e4 with new data.
        store.hold_flush();
        let e6 = memory.insert(6, vec![6; 7 * KB]);
        let e4v2 = memory.insert(4, vec![!4; 3 * KB]);
        enqueue(&store, e6);
        enqueue(&store, e4v2);
        store.unhold_flush();
        store.wait().await;
        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
        assert!(store.load(memory.hash(&2)).await.unwrap().kv().is_none());
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4v2 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4v2, (4, vec![!4; 3 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));
        let r6 = store.load(memory.hash(&6)).await.unwrap().kv().unwrap();
        assert_eq!(r6, (6, vec![6; 7 * KB]));

        // After close, enqueue must be a no-op.
        store.close().await.unwrap();
        enqueue(&store, e1);
        store.wait().await;
        drop(store);

        // Reopen: the surviving entries are recovered; e1 stays gone.
        let store = engine_for_test(dir.path()).await;
        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
        assert!(store.load(memory.hash(&2)).await.unwrap().kv().is_none());
        let r3 = store.load(memory.hash(&3)).await.unwrap().kv().unwrap();
        assert_eq!(r3, (3, vec![3; 7 * KB]));
        let r4v2 = store.load(memory.hash(&4)).await.unwrap().kv().unwrap();
        assert_eq!(r4v2, (4, vec![!4; 3 * KB]));
        let r5 = store.load(memory.hash(&5)).await.unwrap().kv().unwrap();
        assert_eq!(r5, (5, vec![5; 11 * KB]));
        let r6 = store.load(memory.hash(&6)).await.unwrap().kv().unwrap();
        assert_eq!(r6, (6, vec![6; 7 * KB]));
    }

    // Deletions persisted via the tombstone log must survive restarts, and a
    // re-enqueued key must be loadable again after further restarts.
    #[test_log::test(tokio::test)]
    async fn test_store_delete_recovery() {
        let dir = tempfile::tempdir().unwrap();
        let memory = cache_for_test();
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        let es = (0..10).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();
        for e in es.iter().take(9) {
            enqueue(&store, e.clone());
        }
        store.wait().await;
        for i in 0..9 {
            assert_eq!(
                store.load(memory.hash(&i)).await.unwrap().kv(),
                Some((i, vec![i as u8; 3 * KB]))
            );
        }

        store.delete(memory.hash(&3));
        store.wait().await;
        assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);
        store.close().await.unwrap();
        drop(store);

        // The tombstone for key 3 must be honored after recovery.
        let store = store_for_test_with_tombstone_log(dir.path()).await;
        for i in 0..9 {
            if i != 3 {
                assert_eq!(
                    store.load(memory.hash(&i)).await.unwrap().kv(),
                    Some((i, vec![i as u8; 3 * KB]))
                );
            } else {
                assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);
            }
        }

        // Re-inserting key 3 supersedes the tombstone, across another restart.
        enqueue(&store, es[3].clone());
        store.wait().await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
        store.close().await.unwrap();
        drop(store);

        let store = store_for_test_with_tombstone_log(dir.path()).await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
    }

    // `destroy` wipes every entry; nothing is recovered afterwards, while new
    // writes still work and persist across restarts.
    #[test_log::test(tokio::test)]
    async fn test_store_destroy_recovery() {
        let dir = tempfile::tempdir().unwrap();
        let memory = cache_for_test();
        let store = store_for_test_with_tombstone_log(dir.path()).await;

        let es = (0..10).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();
        store.hold_flush();
        for e in es.iter().take(9) {
            enqueue(&store, e.clone());
        }
        store.unhold_flush();
        store.wait().await;
        for i in 0..9 {
            assert_eq!(
                store.load(memory.hash(&i)).await.unwrap().kv(),
                Some((i, vec![i as u8; 3 * KB]))
            );
        }

        store.delete(memory.hash(&3));
        store.wait().await;
        assert_eq!(store.load(memory.hash(&3)).await.unwrap().kv(), None);
        store.destroy().await.unwrap();
        store.close().await.unwrap();
        drop(store);

        // Nothing survives a destroy.
        let store = store_for_test_with_tombstone_log(dir.path()).await;
        for i in 0..9 {
            assert_eq!(store.load(memory.hash(&i)).await.unwrap().kv(), None);
        }

        // Subsequent writes persist normally.
        enqueue(&store, es[3].clone());
        store.wait().await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
        store.close().await.unwrap();
        drop(store);

        let store = store_for_test_with_tombstone_log(dir.path()).await;
        assert_eq!(
            store.load(memory.hash(&3)).await.unwrap().kv(),
            Some((3, vec![3; 3 * KB]))
        );
    }

    // With a biased reinsertion filter (odd keys pass), reclaimed entries with
    // passing keys are written back; deleted entries are never reinserted.
    #[test_log::test(tokio::test)]
    async fn test_store_reinsertion() {
        let dir = tempfile::tempdir().unwrap();
        let memory = cache_for_test();
        let store = store_for_test_with_reinsertion_filter(
            dir.path(),
            StorageFilter::new().with_condition(Biased::new(vec![1, 3, 5, 7, 9, 11, 13, 15, 17, 19])),
        )
        .await;

        let es = (0..15).map(|i| memory.insert(i, vec![i as u8; 3 * KB])).collect_vec();
        for e in es.iter().take(9).cloned() {
            enqueue(&store, e);
            store.wait().await;
        }
        for i in 0..9 {
            let r = store.load(memory.hash(&i)).await.unwrap().kv().unwrap();
            assert_eq!(r, (i, vec![i as u8; 3 * KB]));
        }

        // First reclaim: only keys 1 and 3 of the evicted block are reinserted.
        enqueue(&store, es[9].clone());
        enqueue(&store, es[10].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..11 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                Some((4, vec![4; 3 * KB])),
                Some((5, vec![5; 3 * KB])),
                Some((6, vec![6; 3 * KB])),
                Some((7, vec![7; 3 * KB])),
                Some((8, vec![8; 3 * KB])),
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
            ]
        );

        // Next reclaim drops key 4 (even) and keeps key 5.
        enqueue(&store, es[11].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..12 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                None,
                Some((5, vec![5; 3 * KB])),
                Some((6, vec![6; 3 * KB])),
                Some((7, vec![7; 3 * KB])),
                Some((8, vec![8; 3 * KB])),
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
                Some((11, vec![11; 3 * KB])),
            ]
        );

        // A deleted key (7) must not be reinserted even though it passes the
        // filter.
        store.delete(memory.hash(&7));
        store.wait().await;
        enqueue(&store, es[12].clone());
        store.wait().await;
        enqueue(&store, es[13].clone());
        store.wait().await;
        enqueue(&store, es[14].clone());
        store.wait().await;
        let mut res = vec![];
        for i in 0..15 {
            res.push(store.load(memory.hash(&i)).await.unwrap().kv());
        }
        assert_eq!(
            res,
            vec![
                None,
                Some((1, vec![1; 3 * KB])),
                None,
                Some((3, vec![3; 3 * KB])),
                None,
                Some((5, vec![5; 3 * KB])),
                None,
                None,
                None,
                Some((9, vec![9; 3 * KB])),
                Some((10, vec![10; 3 * KB])),
                Some((11, vec![11; 3 * KB])),
                Some((12, vec![12; 3 * KB])),
                Some((13, vec![13; 3 * KB])),
                Some((14, vec![14; 3 * KB])),
            ]
        );
    }

    // Corrupting on-disk bytes must surface as a miss (the stale index entry
    // is dropped), not an error.
    #[test_log::test(tokio::test)]
    async fn test_store_magic_checksum_mismatch() {
        let dir = tempfile::tempdir().unwrap();
        let memory = cache_for_test();
        let store = engine_for_test(dir.path()).await;

        let e1 = memory.insert(1, vec![1; 7 * KB]);
        enqueue(&store, e1);
        store.wait().await;
        let r1 = store.load(memory.hash(&1)).await.unwrap().kv().unwrap();
        assert_eq!(r1, (1, vec![1; 7 * KB]));

        // Overwrite bytes inside the entry payload in every device file.
        for entry in std::fs::read_dir(dir.path()).unwrap() {
            let entry = entry.unwrap();
            if !entry.metadata().unwrap().is_file() {
                continue;
            }
            let file = File::options().write(true).open(entry.path()).unwrap();
            #[cfg(target_family = "unix")]
            {
                use std::os::unix::fs::FileExt;
                file.write_all_at(&[b'x'; 42], 5 * 1024).unwrap();
            }
            #[cfg(target_family = "windows")]
            {
                use std::os::windows::fs::FileExt;
                file.seek_write(&[b'x'; 42], 5 * 1024).unwrap();
            }
        }
        assert!(store.load(memory.hash(&1)).await.unwrap().kv().is_none());
    }

    // A combined device aggregates the capacities of its member devices; the
    // block count must reflect the total.
    #[test_log::test(tokio::test)]
    async fn test_aggregated_device() {
        let dir = tempfile::tempdir().unwrap();

        const KB: usize = 1024;
        const MB: usize = 1024 * 1024;

        let spawner = Spawner::current();
        let io_engine = io_engine_for_test(spawner.clone()).await;
        let d1 = FsDeviceBuilder::new(dir.path().join("dev1"))
            .with_capacity(MB)
            .build()
            .unwrap();
        let d2 = FsDeviceBuilder::new(dir.path().join("dev2"))
            .with_capacity(2 * MB)
            .build()
            .unwrap();
        let d3 = FsDeviceBuilder::new(dir.path().join("dev3"))
            .with_capacity(4 * MB)
            .build()
            .unwrap();
        let device = CombinedDeviceBuilder::new()
            .with_device(d1)
            .with_device(d2)
            .with_device(d3)
            .build()
            .unwrap();
        let engine = BlockEngineConfig::<u64, Vec<u8>, TestProperties>::new(device)
            .with_block_size(64 * KB)
            .boxed()
            .build(EngineBuildContext {
                io_engine,
                metrics: Arc::new(Metrics::noop()),
                spawner,
                recover_mode: RecoverMode::None,
            })
            .await
            .unwrap();
        assert_eq!(engine.inner.block_manager.blocks(), (1 + 2 + 4) * MB / (64 * KB));
    }
}