#include "db/db_test_util.h"
#include "file/file_prefetch_buffer.h"
#include "file/file_util.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
#ifdef GFLAGS
#include "tools/io_tracer_parser_tool.h"
#endif
#include "rocksdb/flush_block_policy.h"
#include "util/random.h"
namespace {
static bool enable_io_uring = true;
extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
}
namespace ROCKSDB_NAMESPACE {
class MockFS;
class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
bool support_prefetch, std::atomic_int& prefetch_count,
bool small_buffer_alignment = false)
: FSRandomAccessFileOwnerWrapper(std::move(file)),
support_prefetch_(support_prefetch),
prefetch_count_(prefetch_count),
small_buffer_alignment_(small_buffer_alignment) {}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
if (support_prefetch_) {
prefetch_count_.fetch_add(1);
return target()->Prefetch(offset, n, options, dbg);
} else {
return IOStatus::NotSupported("Prefetch not supported");
}
}
size_t GetRequiredBufferAlignment() const override {
return small_buffer_alignment_
? 1
: FSRandomAccessFileOwnerWrapper::GetRequiredBufferAlignment();
}
private:
const bool support_prefetch_;
std::atomic_int& prefetch_count_;
const bool small_buffer_alignment_;
};
class MockFS : public FileSystemWrapper {
public:
explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
bool support_prefetch, bool small_buffer_alignment = false)
: FileSystemWrapper(wrapped),
support_prefetch_(support_prefetch),
small_buffer_alignment_(small_buffer_alignment) {}
static const char* kClassName() { return "MockFS"; }
const char* Name() const override { return kClassName(); }
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(new MockRandomAccessFile(
file, support_prefetch_, prefetch_count_, small_buffer_alignment_));
return s;
}
void ClearPrefetchCount() { prefetch_count_ = 0; }
bool IsPrefetchCalled() { return prefetch_count_ > 0; }
int GetPrefetchCount() {
return prefetch_count_.load(std::memory_order_relaxed);
}
private:
const bool support_prefetch_;
const bool small_buffer_alignment_;
std::atomic_int prefetch_count_{0};
};
class PrefetchTest
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
PrefetchTest() : DBTestBase("prefetch_test", true) {}
virtual void SetGenericOptions(Env* env, bool use_direct_io,
Options& options) {
anon::OptionsOverride options_override;
options_override.full_block_cache = true;
options = CurrentOptions(options_override);
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env;
options.disable_auto_compactions = true;
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
}
void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
void VerifyScan(ReadOptions& iter_ro, ReadOptions& cmp_iter_ro,
const Slice* seek_key, const Slice* iterate_upper_bound,
bool prefix_same_as_start) const {
assert(!(seek_key == nullptr));
iter_ro.iterate_upper_bound = cmp_iter_ro.iterate_upper_bound =
iterate_upper_bound;
iter_ro.prefix_same_as_start = cmp_iter_ro.prefix_same_as_start =
prefix_same_as_start;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(iter_ro));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_iter_ro));
iter->Seek(*seek_key);
cmp_iter->Seek(*seek_key);
while (iter->Valid() && cmp_iter->Valid()) {
if (iter->key() != cmp_iter->key()) {
ASSERT_TRUE(false);
}
iter->Next();
cmp_iter->Next();
}
ASSERT_TRUE(!cmp_iter->Valid() && !iter->Valid());
ASSERT_TRUE(cmp_iter->status().ok() && iter->status().ok());
}
void VerifySeekPrevSeek(ReadOptions& iter_ro, ReadOptions& cmp_iter_ro,
const Slice* seek_key,
const Slice* iterate_upper_bound,
bool prefix_same_as_start) {
assert(!(seek_key == nullptr));
iter_ro.iterate_upper_bound = cmp_iter_ro.iterate_upper_bound =
iterate_upper_bound;
iter_ro.prefix_same_as_start = cmp_iter_ro.prefix_same_as_start =
prefix_same_as_start;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(iter_ro));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_iter_ro));
cmp_iter->Seek(*seek_key);
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Seek(*seek_key);
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
cmp_iter->Prev();
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
cmp_iter->Seek(*seek_key);
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Seek(*seek_key);
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
std::string BuildKey(int num, std::string postfix = "") {
return "my_key_" + std::to_string(num) + postfix;
}
TEST_P(PrefetchTest, Basic) {
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
bool use_direct_io = std::get<1>(GetParam());
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
const int kNumKeys = 1100;
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), "v1"));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(db_->Flush(FlushOptions()));
batch.Clear();
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i, "key2"), "v2"));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(db_->Flush(FlushOptions()));
batch.Clear();
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Delete(BuildKey(i, "key2")));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(db_->Flush(FlushOptions()));
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
const size_t num_file = metadata.size();
if (support_prefetch && !use_direct_io) {
ASSERT_TRUE(fs->IsPrefetchCalled());
ASSERT_EQ(num_file, fs->GetPrefetchCount());
ASSERT_EQ(0, buff_prefetch_count);
fs->ClearPrefetchCount();
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_EQ(buff_prefetch_count, num_file);
buff_prefetch_count = 0;
}
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
HistogramData prev_table_open_prefetch_tail_read;
options.statistics->histogramData(TABLE_OPEN_PREFETCH_TAIL_READ_BYTES,
&prev_table_open_prefetch_tail_read);
const uint64_t prev_table_open_prefetch_tail_miss =
options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_MISS);
const uint64_t prev_table_open_prefetch_tail_hit =
options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_HIT);
HistogramData pre_compaction_prefetch_bytes;
options.statistics->histogramData(COMPACTION_PREFETCH_BYTES,
&pre_compaction_prefetch_bytes);
ASSERT_EQ(pre_compaction_prefetch_bytes.count, 0);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
HistogramData post_compaction_prefetch_bytes;
options.statistics->histogramData(COMPACTION_PREFETCH_BYTES,
&post_compaction_prefetch_bytes);
HistogramData cur_table_open_prefetch_tail_read;
options.statistics->histogramData(TABLE_OPEN_PREFETCH_TAIL_READ_BYTES,
&cur_table_open_prefetch_tail_read);
const uint64_t cur_table_open_prefetch_tail_miss =
options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_MISS);
const uint64_t cur_table_open_prefetch_tail_hit =
options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_HIT);
if (support_prefetch && !use_direct_io) {
ASSERT_TRUE(fs->IsPrefetchCalled());
ASSERT_GT(fs->GetPrefetchCount(), 1);
ASSERT_EQ(0, buff_prefetch_count);
fs->ClearPrefetchCount();
ASSERT_EQ(post_compaction_prefetch_bytes.count, 0);
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_GT(buff_prefetch_count, 1);
buff_prefetch_count = 0;
ASSERT_GT(cur_table_open_prefetch_tail_read.count,
prev_table_open_prefetch_tail_read.count);
ASSERT_GT(cur_table_open_prefetch_tail_hit,
prev_table_open_prefetch_tail_hit);
ASSERT_GE(cur_table_open_prefetch_tail_miss,
prev_table_open_prefetch_tail_miss);
ASSERT_GT(post_compaction_prefetch_bytes.count, 0);
auto expected_compaction_readahead_size =
Options().compaction_readahead_size;
ASSERT_LE(post_compaction_prefetch_bytes.max,
expected_compaction_readahead_size * 1.1);
ASSERT_GE(post_compaction_prefetch_bytes.max,
expected_compaction_readahead_size * 0.9);
ASSERT_LE(post_compaction_prefetch_bytes.average,
expected_compaction_readahead_size * 1.1);
ASSERT_GE(post_compaction_prefetch_bytes.average,
expected_compaction_readahead_size * 0.9);
}
for (bool disable_io : {false, true}) {
SCOPED_TRACE("disable_io: " + std::to_string(disable_io));
ReadOptions ro;
if (disable_io) {
ro.read_tier = ReadTier::kBlockCacheTier;
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, kNumKeys);
}
if (disable_io) {
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_EQ(0, buff_prefetch_count);
} else if (support_prefetch && !use_direct_io) {
ASSERT_TRUE(fs->IsPrefetchCalled());
fs->ClearPrefetchCount();
ASSERT_EQ(0, buff_prefetch_count);
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
Close();
}
class PrefetchTailTest : public PrefetchTest {
public:
bool SupportPrefetch() const {
return std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
}
bool UseDirectIO() const { return std::get<1>(GetParam()); }
bool UseFilePrefetchBuffer() const {
return !SupportPrefetch() || UseDirectIO();
}
Env* GetEnv(bool small_buffer_alignment = false) const {
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
env_->GetFileSystem(), SupportPrefetch(), small_buffer_alignment);
return new CompositeEnvWrapper(env_, fs);
}
void SetGenericOptions(Env* env, bool use_direct_io,
Options& options) override {
PrefetchTest::SetGenericOptions(env, use_direct_io, options);
options.statistics = CreateDBStatistics();
}
void SetBlockBasedTableOptions(
BlockBasedTableOptions& table_options, bool partition_filters = true,
uint64_t metadata_block_size =
BlockBasedTableOptions().metadata_block_size,
bool use_small_cache = false) {
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
table_options.partition_filters = partition_filters;
if (table_options.partition_filters) {
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
}
table_options.metadata_block_size = metadata_block_size;
if (use_small_cache) {
LRUCacheOptions co;
co.capacity = 1;
std::shared_ptr<Cache> cache = NewLRUCache(co);
table_options.block_cache = cache;
}
}
int64_t GetNumIndexPartition() const {
int64_t index_partition_counts = 0;
TablePropertiesCollection all_table_props;
assert(db_->GetPropertiesOfAllTables(&all_table_props).ok());
for (const auto& name_and_table_props : all_table_props) {
const auto& table_props = name_and_table_props.second;
index_partition_counts += table_props->index_partitions;
}
return index_partition_counts;
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTailTest, PrefetchTailTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
TEST_P(PrefetchTailTest, Basic) {
std::unique_ptr<Env> env(GetEnv());
Options options;
SetGenericOptions(env.get(), UseDirectIO(), options);
BlockBasedTableOptions bbto;
SetBlockBasedTableOptions(bbto);
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Status s = TryReopen(options);
if (UseDirectIO() && (s.IsNotSupported() || s.IsInvalidArgument())) {
ROCKSDB_GTEST_BYPASS("Direct IO is not supported");
return;
} else {
ASSERT_OK(s);
}
ASSERT_OK(Put("k1", "v1"));
HistogramData pre_flush_file_read;
options.statistics->histogramData(FILE_READ_FLUSH_MICROS,
&pre_flush_file_read);
ASSERT_OK(Flush());
HistogramData post_flush_file_read;
options.statistics->histogramData(FILE_READ_FLUSH_MICROS,
&post_flush_file_read);
if (UseFilePrefetchBuffer()) {
ASSERT_EQ(post_flush_file_read.count - pre_flush_file_read.count, 1);
} else {
ASSERT_GT(post_flush_file_read.count - pre_flush_file_read.count, 1);
}
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush());
CompactRangeOptions cro;
HistogramData pre_compaction_file_read;
options.statistics->histogramData(FILE_READ_COMPACTION_MICROS,
&pre_compaction_file_read);
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
HistogramData post_compaction_file_read;
options.statistics->histogramData(FILE_READ_COMPACTION_MICROS,
&post_compaction_file_read);
if (UseFilePrefetchBuffer()) {
ASSERT_EQ(post_compaction_file_read.count - pre_compaction_file_read.count,
3);
} else {
ASSERT_GT(post_compaction_file_read.count - pre_compaction_file_read.count,
3);
}
Close();
}
TEST_P(PrefetchTailTest, UpgradeToTailSizeInManifest) {
if (!UseFilePrefetchBuffer()) {
ROCKSDB_GTEST_BYPASS(
"Upgrade to tail size in manifest is only relevant when RocksDB file "
"prefetch buffer is used.");
}
if (UseDirectIO()) {
ROCKSDB_GTEST_BYPASS(
"To simplify testing logics with setting file's buffer alignment to "
"be "
"1, direct IO is required to be disabled.");
}
std::unique_ptr<Env> env(GetEnv(true ));
Options options;
SetGenericOptions(env.get(), false , options);
options.max_open_files = -1;
options.write_buffer_size = 1024 * 1024;
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options, false ,
1 ,
true );
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
SyncPoint::GetInstance()->EnableProcessing();
SyncPoint::GetInstance()->SetCallBack(
"FileMetaData::FileMetaData", [&](void* arg) {
FileMetaData* meta = static_cast<FileMetaData*>(arg);
meta->tail_size = 0;
});
ASSERT_OK(TryReopen(options));
for (int i = 0; i < 10000; ++i) {
ASSERT_OK(Put("k" + std::to_string(i), "v"));
}
ASSERT_OK(Flush());
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) {
std::pair<size_t*, size_t*>* prefetch_off_len_pair =
static_cast<std::pair<size_t*, size_t*>*>(arg);
size_t* prefetch_off = prefetch_off_len_pair->first;
size_t* tail_size = prefetch_off_len_pair->second;
const size_t file_size = *prefetch_off + *tail_size;
*tail_size = 1;
*prefetch_off = file_size - (*tail_size);
});
ASSERT_OK(TryReopen(options));
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
HistogramData db_open_file_read;
options.statistics->histogramData(FILE_READ_DB_OPEN_MICROS,
&db_open_file_read);
int64_t num_index_partition = GetNumIndexPartition();
ASSERT_LT(db_open_file_read.count, num_index_partition);
Close();
}
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
bool use_direct_io = std::get<1>(GetParam());
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
table_options.max_auto_readahead_size = 0;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = (int*)arg;
*max_open_files = 11;
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
Random rnd(309);
int key_count = 0;
const int num_keys_per_level = 100;
for (int level = 2; level >= 0; level--) {
key_count = level * num_keys_per_level;
for (int i = 0; i < num_keys_per_level; ++i) {
ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(level);
}
Close();
std::vector<int> buff_prefetch_level_count = {0, 0, 0};
ASSERT_OK(TryReopen(options));
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
for (int level = 2; level >= 0; level--) {
key_count = level * num_keys_per_level;
switch (level) {
case 0:
ASSERT_OK(db_->SetOptions(
{{"block_based_table_factory", "{max_auto_readahead_size=0;}"}}));
break;
case 1:
ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
"{max_auto_readahead_size=4096;}"}}));
break;
case 2:
ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
"{max_auto_readahead_size=65536;}"}}));
break;
default:
assert(false);
}
ASSERT_OK(iter->status());
ASSERT_OK(iter->Refresh());
for (int i = 0; i < num_keys_per_level; ++i) {
iter->Seek(Key(key_count++));
iter->Next();
}
buff_prefetch_level_count[level] = buff_prefetch_count;
if (support_prefetch && !use_direct_io) {
if (level == 0) {
ASSERT_FALSE(fs->IsPrefetchCalled());
} else {
ASSERT_TRUE(fs->IsPrefetchCalled());
}
fs->ClearPrefetchCount();
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
if (level == 0) {
ASSERT_EQ(buff_prefetch_count, 0);
} else {
ASSERT_GT(buff_prefetch_count, 0);
}
buff_prefetch_count = 0;
}
}
}
if (!support_prefetch) {
ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
bool use_direct_io = std::get<1>(GetParam());
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
table_options.initial_auto_readahead_size = 0;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = (int*)arg;
*max_open_files = 11;
});
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
Random rnd(309);
int key_count = 0;
const int num_keys_per_level = 100;
for (int level = 2; level >= 0; level--) {
key_count = level * num_keys_per_level;
for (int i = 0; i < num_keys_per_level; ++i) {
ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
}
ASSERT_OK(Flush());
MoveFilesToLevel(level);
}
Close();
ASSERT_OK(TryReopen(options));
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
std::vector<int> buff_prefetch_level_count = {0, 0, 0};
for (int level = 2; level >= 0; level--) {
key_count = level * num_keys_per_level;
switch (level) {
case 0:
ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
"{initial_auto_readahead_size=0;}"}}));
break;
case 1:
ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
"{initial_auto_readahead_size=4096;max_"
"auto_readahead_size=4096;}"}}));
break;
case 2:
ASSERT_OK(
db_->SetOptions({{"block_based_table_factory",
"{initial_auto_readahead_size=65536;}"}}));
break;
default:
assert(false);
}
ASSERT_OK(iter->status());
ASSERT_OK(iter->Refresh());
for (int i = 0; i < num_keys_per_level; ++i) {
iter->Seek(Key(key_count++));
iter->Next();
}
ASSERT_OK(iter->status());
buff_prefetch_level_count[level] = buff_prefetch_count;
if (support_prefetch && !use_direct_io) {
if (level == 0) {
ASSERT_FALSE(fs->IsPrefetchCalled());
} else {
ASSERT_TRUE(fs->IsPrefetchCalled());
}
fs->ClearPrefetchCount();
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
if (level == 0) {
ASSERT_EQ(buff_prefetch_count, 0);
} else {
ASSERT_GT(buff_prefetch_count, 0);
}
buff_prefetch_count = 0;
}
}
if (!support_prefetch) {
ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
table_options.num_file_reads_for_auto_readahead = 0;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
Close();
ASSERT_OK(TryReopen(options));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1000)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1004)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1008)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1011)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(103)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1033)); ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 4);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 4);
buff_prefetch_count = 0;
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, PrefetchWhenReseek) {
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1000));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1004)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1008));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(103)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1033)); ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 3);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 3);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1008));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1033));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1048));
ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 0);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(10));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(100));
ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 0);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1000));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1004)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1008));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(996)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(992));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(989));
ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 1);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 1);
buff_prefetch_count = 0;
}
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1022));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1026));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(103)); ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 2);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 2);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1167));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1334)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1499));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1667));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1847));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1999));
ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 1);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 1);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
int i = 0;
int j = 1000;
do {
iter->Seek(BuildKey(i));
if (!iter->Valid()) {
ASSERT_OK(iter->status());
break;
}
i = i + 100;
iter->Seek(BuildKey(j));
j = j + 100;
} while (i < 1000 && j < kNumKeys && iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 0);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
}
ASSERT_OK(iter->status());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 13);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 13);
buff_prefetch_count = 0;
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); table_options.block_cache = cache;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019));
ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 1);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 1);
buff_prefetch_count = 0;
}
}
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1000));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1004)); ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1022));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1026));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(103));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1033));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1037));
ASSERT_TRUE(iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 3);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 2);
buff_prefetch_count = 0;
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), false, options);
options.statistics = CreateDBStatistics();
const std::string prefix = "my_key_";
options.prefix_extractor.reset(NewFixedPrefixTransform(prefix.size()));
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
ASSERT_OK(s);
Random rnd(309);
WriteBatch batch;
for (int i = 0; i < 26; i++) {
std::string key = prefix;
for (int j = 0; j < 10; j++) {
key += char('a' + i);
ASSERT_OK(batch.Put(key, rnd.RandomString(1000)));
}
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = prefix + "a";
std::string end_key = prefix;
for (int j = 0; j < 10; j++) {
end_key += char('a' + 25);
}
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
for (size_t i = 0; i < 3; i++) {
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024, 2);
table_options.block_cache = cache;
table_options.no_block_cache = false;
table_options.num_file_reads_for_auto_readahead = i;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
s = TryReopen(options);
ASSERT_OK(s);
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(prefix + "bbb");
ASSERT_TRUE(iter->Valid());
iter->Seek(prefix + "ccccccccc");
ASSERT_TRUE(iter->Valid());
iter->Seek(prefix + "ddd");
ASSERT_TRUE(iter->Valid());
iter->Seek(prefix + "ddddddd");
ASSERT_TRUE(iter->Valid());
iter->Seek(prefix + "e");
ASSERT_TRUE(iter->Valid());
iter->Seek(prefix + "eeeee");
ASSERT_TRUE(iter->Valid());
iter->Seek(prefix + "eeeeeeeee");
ASSERT_TRUE(iter->Valid());
}
ReadOptions ropts;
ReadOptions cmp_ro;
if (std::get<0>(GetParam())) {
ropts.readahead_size = cmp_ro.readahead_size = 32768;
}
if (std::get<1>(GetParam())) {
ropts.async_io = true;
}
ropts.auto_readahead_size = true;
cmp_ro.auto_readahead_size = false;
ASSERT_OK(options.statistics->Reset());
const std::string seek_key_str = prefix + "aaa";
const Slice seek_key(seek_key_str);
const std::string ub_str = prefix + "uuu";
const Slice ub(ub_str);
VerifyScan(ropts , cmp_ro ,
&seek_key , &ub ,
false );
const std::string seek_key_new_str = prefix + "v";
const Slice seek_key_new(seek_key_new_str);
const std::string ub_new_str = prefix + "y";
const Slice ub_new(ub_new_str);
VerifyScan(ropts , cmp_ro ,
&seek_key_new , &ub_new ,
false );
VerifyScan(ropts , cmp_ro ,
&seek_key , nullptr ,
true );
Close();
}
}
TEST_F(PrefetchTest, PrefetchWithBlockLookupAutoTuneWithPrev) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), false, options);
options.statistics = CreateDBStatistics();
const std::string prefix = "my_key_";
options.prefix_extractor.reset(NewFixedPrefixTransform(prefix.size()));
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024, 2);
table_options.block_cache = cache;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
ASSERT_OK(s);
Random rnd(309);
WriteBatch batch;
for (int i = 0; i < 26; i++) {
std::string key = prefix;
for (int j = 0; j < 10; j++) {
key += char('a' + i);
ASSERT_OK(batch.Put(key, rnd.RandomString(1000)));
}
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = prefix + "a";
std::string end_key = prefix;
for (int j = 0; j < 10; j++) {
end_key += char('a' + 25);
}
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
ReadOptions ropts;
ropts.auto_readahead_size = true;
ReadOptions cmp_readopts = ropts;
cmp_readopts.auto_readahead_size = false;
const std::string seek_key_str = prefix + "bbb";
const Slice seek_key(seek_key_str);
const std::string ub_key = prefix + "uuu";
const Slice ub(ub_key);
VerifySeekPrevSeek(ropts , cmp_readopts ,
&seek_key , &ub ,
false );
VerifySeekPrevSeek(ropts , cmp_readopts ,
&seek_key ,
nullptr ,
true );
Close();
}
class PrefetchTrimReadaheadTestParam
: public DBTestBase,
public ::testing::WithParamInterface<
std::tuple<BlockBasedTableOptions::IndexShorteningMode, bool>> {
public:
const std::string kPrefix = "a_prefix_";
Random rnd = Random(309);
PrefetchTrimReadaheadTestParam()
: DBTestBase("prefetch_trim_readahead_test_param", true) {}
virtual void SetGenericOptions(Env* env, Options& options) {
options = CurrentOptions();
options.env = env;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
options.write_buffer_size = 1024 * 1024 * 1024;
options.prefix_extractor.reset(NewFixedPrefixTransform(kPrefix.size()));
}
void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
table_options.no_block_cache = false;
table_options.index_shortening = std::get<0>(GetParam());
table_options.block_size = 1;
table_options.flush_block_policy_factory.reset(
new FlushBlockBySizePolicyFactory());
}
};
INSTANTIATE_TEST_CASE_P(
PrefetchTrimReadaheadTestParam, PrefetchTrimReadaheadTestParam,
::testing::Combine(
::testing::Values(
BlockBasedTableOptions::IndexShorteningMode::kNoShortening,
BlockBasedTableOptions::IndexShorteningMode::kShortenSeparators,
BlockBasedTableOptions::IndexShorteningMode::
kShortenSeparatorsAndSuccessor),
::testing::Bool()));
TEST_P(PrefetchTrimReadaheadTestParam, PrefixSameAsStart) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const bool auto_readahead_size = std::get<1>(GetParam());
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false ,
true );
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), options);
BlockBasedTableOptions table_optoins;
SetBlockBasedTableOptions(table_optoins);
options.table_factory.reset(NewBlockBasedTableFactory(table_optoins));
Status s = TryReopen(options);
ASSERT_OK(s);
WriteBatch prefix_batch;
for (int i = 0; i < 10; i++) {
std::string key = kPrefix + std::to_string(i);
ASSERT_OK(prefix_batch.Put(key, rnd.RandomString(100)));
}
ASSERT_OK(db_->Write(WriteOptions(), &prefix_batch));
WriteBatch diff_prefix_batch;
for (int i = 0; i < 10; i++) {
std::string diff_prefix = std::string(1, char('c' + i)) + kPrefix.substr(1);
std::string key = diff_prefix + std::to_string(i);
ASSERT_OK(diff_prefix_batch.Put(key, rnd.RandomString(100)));
}
ASSERT_OK(db_->Write(WriteOptions(), &diff_prefix_batch));
ASSERT_OK(db_->Flush(FlushOptions()));
ReadOptions ro;
ro.prefix_same_as_start = true;
ro.auto_readahead_size = auto_readahead_size;
ro.readahead_size = 1024 * 1024 * 1024;
ASSERT_OK(options.statistics->Reset());
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
for (iter->Seek(kPrefix); iter->status().ok() && iter->Valid();
iter->Next()) {
}
}
auto readahead_trimmed =
options.statistics->getTickerCount(READAHEAD_TRIMMED);
if (auto_readahead_size) {
ASSERT_GT(readahead_trimmed, 0);
} else {
ASSERT_EQ(readahead_trimmed, 0);
}
Close();
}
TEST_P(PrefetchTest, DBIterLevelReadAhead) {
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
int total_keys = 0;
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
size_t current_readahead_size = 0;
{
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState", [&](void* arg) {
readahead_carry_over_count++;
size_t readahead_size = *static_cast<size_t*>(arg);
if (readahead_carry_over_count) {
ASSERT_GT(readahead_size, 8 * 1024);
}
});
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *static_cast<size_t*>(arg);
ASSERT_GT(current_readahead_size, 0);
});
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, total_keys);
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_EQ(readahead_carry_over_count, 0);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
Close();
}
TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
int total_keys = 0;
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
size_t current_readahead_size = 0;
bool read_async_called = false;
{
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState", [&](void* arg) {
readahead_carry_over_count++;
size_t readahead_size = *static_cast<size_t*>(arg);
if (readahead_carry_over_count) {
ASSERT_GT(readahead_size, 8 * 1024);
}
});
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *static_cast<size_t*>(arg);
ASSERT_GT(current_readahead_size, 0);
});
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, total_keys);
if (is_adaptive_readahead) {
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
} else {
ASSERT_EQ(readahead_carry_over_count, 0);
}
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_EQ(async_read_bytes.count, 0);
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
Close();
}
TEST_P(PrefetchTest, AvoidBlockCacheLookupTwice) {
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
bool async_io = std::get<1>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); table_options.block_cache = cache;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
{
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
}
ReadOptions ro;
ro.async_io = async_io;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
ASSERT_OK(options.statistics->Reset());
iter->Seek(BuildKey(99)); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOCK_CACHE_DATA_MISS),
1);
}
Close();
}
TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options;
SetGenericOptions(Env::Default(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
enable_io_uring = false;
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
enable_io_uring = true;
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
int total_keys = 0;
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
{
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ro.async_io = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, total_keys);
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(options.statistics->getTickerCount(READ_ASYNC_MICROS), 0);
}
}
{
ReadOptions ro;
if (is_adaptive_readahead) {
ro.adaptive_readahead = true;
}
ro.async_io = true;
ro.tailing = true;
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, total_keys);
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(options.statistics->getTickerCount(READ_ASYNC_MICROS), 0);
}
}
Close();
enable_io_uring = true;
}
class PrefetchTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
virtual void SetGenericOptions(Env* env, bool use_direct_io,
Options& options) {
options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env;
options.disable_auto_compactions = true;
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
}
void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());
TEST_P(PrefetchTest1, SeekWithExtraPrefetchAsyncIO) {
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), GetParam(), options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
Close();
int buff_prefetch_count = 0, extra_prefetch_buff_cnt = 0;
for (size_t i = 0; i < 3; i++) {
table_options.num_file_reads_for_auto_readahead = i;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
s = TryReopen(options);
ASSERT_OK(s);
buff_prefetch_count = 0;
extra_prefetch_buff_cnt = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PrefetchAsync:ExtraPrefetching",
[&](void*) { extra_prefetch_buff_cnt++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.async_io = true;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(
0)); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(extra_prefetch_buff_cnt, (i == 0 ? 1 : 0));
ASSERT_EQ(buff_prefetch_count, i == 0 ? 2 : 1);
extra_prefetch_buff_cnt = 0;
buff_prefetch_count = 0;
iter->Seek(
BuildKey(22)); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(extra_prefetch_buff_cnt, (i == 0 ? 1 : 0));
ASSERT_EQ(buff_prefetch_count, 1);
extra_prefetch_buff_cnt = 0;
buff_prefetch_count = 0;
iter->Seek(
BuildKey(33)); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(extra_prefetch_buff_cnt, (i == 0 ? 1 : 0));
ASSERT_EQ(buff_prefetch_count, 1);
}
Close();
}
}
TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), GetParam(), options);
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int set_readahead = 0;
size_t readahead_size = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState",
[&](void* ) { set_readahead++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache",
[&](void* arg) { readahead_size = *static_cast<size_t*>(arg); });
SyncPoint::GetInstance()->EnableProcessing();
{
ReadOptions ro;
ro.adaptive_readahead = true;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
while (iter->Valid() && buff_prefetch_count == 0) {
iter->Next();
}
ASSERT_EQ(readahead_size, 8 * 1024);
ASSERT_EQ(buff_prefetch_count, 1);
ASSERT_EQ(set_readahead, 0);
buff_prefetch_count = 0;
iter->Seek(BuildKey(4004));
ASSERT_TRUE(iter->Valid());
while (iter->Valid() && buff_prefetch_count == 0) {
iter->Next();
}
ASSERT_EQ(readahead_size, 8 * 1024);
ASSERT_EQ(set_readahead, 0);
ASSERT_EQ(buff_prefetch_count, 1);
}
Close();
}
TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), GetParam(), options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); table_options.block_cache = cache;
table_options.no_block_cache = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
int buff_prefetch_count = 0;
size_t current_readahead_size = 0;
size_t expected_current_readahead_size = 8 * 1024;
size_t decrease_readahead_size = 8 * 1024;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache",
[&](void* arg) { current_readahead_size = *static_cast<size_t*>(arg); });
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019));
ASSERT_TRUE(iter->Valid());
buff_prefetch_count = 0;
}
{
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(
BuildKey(0)); ASSERT_TRUE(iter->Valid());
expected_current_readahead_size = std::max(
decrease_readahead_size,
(expected_current_readahead_size >= decrease_readahead_size
? (expected_current_readahead_size - decrease_readahead_size)
: 0));
iter->Seek(BuildKey(1000)); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
iter->Seek(BuildKey(1004)); ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
expected_current_readahead_size *= 2;
iter->Seek(BuildKey(1011));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1015));
ASSERT_TRUE(iter->Valid());
expected_current_readahead_size = std::max(
decrease_readahead_size,
(expected_current_readahead_size >= decrease_readahead_size
? (expected_current_readahead_size - decrease_readahead_size)
: 0));
iter->Seek(BuildKey(1016));
ASSERT_TRUE(iter->Valid());
iter->Seek(BuildKey(1019));
ASSERT_TRUE(iter->Valid());
expected_current_readahead_size = std::max(
decrease_readahead_size,
(expected_current_readahead_size >= decrease_readahead_size
? (expected_current_readahead_size - decrease_readahead_size)
: 0));
iter->Seek(BuildKey(1022));
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
ASSERT_EQ(buff_prefetch_count, 2);
buff_prefetch_count = 0;
}
Close();
}
TEST_P(PrefetchTest1, SeekParallelizationTest) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), GetParam(), options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
bool read_async_called = false;
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
{
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0)); ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_EQ(buff_prefetch_count, 2);
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
} else {
ASSERT_EQ(buff_prefetch_count, 1);
}
}
Close();
}
TEST_P(PrefetchTest1, PollErrorRecoveryDuringIteration) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 500;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = GetParam();
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
ROCKSDB_GTEST_SKIP("Direct IO not supported");
return;
}
ASSERT_OK(s);
std::map<std::string, std::string> expected_data;
{
WriteBatch batch;
for (int i = 0; i < kNumKeys; i++) {
std::string key = BuildKey(i);
std::string value = "value_" + std::to_string(i) + "_" +
std::string(100, 'x'); ASSERT_OK(batch.Put(key, value));
expected_data[key] = value;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
std::atomic<int> poll_call_count{0};
std::atomic<int> poll_error_injected_count{0};
bool read_async_called = false;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PollIfNeeded:IOStatus", [&](void* arg) {
poll_call_count++;
int current_count = poll_call_count.load();
if (current_count == 3) {
IOStatus* io_s = static_cast<IOStatus*>(arg);
*io_s = IOStatus::IOError("Injected Poll error for e2e testing");
poll_error_injected_count++;
std::cout << "PollErrorRecoveryDuringIteration: Injected error on "
"Poll call #"
<< current_count << std::endl;
}
});
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.async_io = true;
ro.adaptive_readahead = true;
int keys_read = 0;
int data_mismatches = 0;
Status iter_status;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string key = iter->key().ToString();
std::string value = iter->value().ToString();
auto it = expected_data.find(key);
if (it == expected_data.end()) {
std::cout << "PollErrorRecoveryDuringIteration: Unexpected key: " << key
<< std::endl;
data_mismatches++;
} else if (it->second != value) {
std::cout << "PollErrorRecoveryDuringIteration: Value mismatch for key "
<< key << std::endl;
data_mismatches++;
}
keys_read++;
}
iter_status = iter->status();
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
std::cout << "PollErrorRecoveryDuringIteration: " << "read_async_called="
<< read_async_called << ", poll_calls=" << poll_call_count.load()
<< ", poll_errors_injected=" << poll_error_injected_count.load()
<< ", keys_read=" << keys_read << ", expected_keys=" << kNumKeys
<< ", data_mismatches=" << data_mismatches
<< ", iter_status=" << iter_status.ToString() << std::endl;
ASSERT_EQ(data_mismatches, 0)
<< "Found " << data_mismatches << " data mismatches";
if (read_async_called) {
ASSERT_EQ(poll_call_count.load(), 3)
<< "Expected exactly 3 Poll calls when error injected on 3rd call";
ASSERT_EQ(poll_error_injected_count.load(), 1)
<< "Expected exactly 1 Poll error to be injected";
ASSERT_TRUE(iter_status.IsIOError())
<< "Expected iterator to report IOError after Poll failure, got: "
<< iter_status.ToString();
std::cout << "PollErrorRecoveryDuringIteration: Successfully verified "
"Poll error was injected and propagated to iterator"
<< std::endl;
} else {
ASSERT_OK(iter_status);
ASSERT_EQ(keys_read, kNumKeys);
std::cout << "PollErrorRecoveryDuringIteration: Async IO (io_uring) not "
"supported on this platform, verified data correctness"
<< std::endl;
}
{
int retry_keys_read = 0;
int retry_data_mismatches = 0;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
std::string key = iter->key().ToString();
std::string value = iter->value().ToString();
auto it = expected_data.find(key);
if (it == expected_data.end()) {
retry_data_mismatches++;
} else if (it->second != value) {
retry_data_mismatches++;
}
retry_keys_read++;
}
ASSERT_OK(iter->status())
<< "Retry iteration failed: " << iter->status().ToString();
ASSERT_EQ(retry_keys_read, kNumKeys)
<< "Retry should read all " << kNumKeys << " keys";
ASSERT_EQ(retry_data_mismatches, 0)
<< "Retry found " << retry_data_mismatches << " data mismatches";
std::cout << "PollErrorRecoveryDuringIteration: Retry succeeded, read all "
<< retry_keys_read << " keys correctly" << std::endl;
}
Close();
}
namespace {
#ifdef GFLAGS
const int kMaxArgCount = 100;
const size_t kArgBufferSize = 100000;
void RunIOTracerParserTool(std::string trace_file) {
std::vector<std::string> params = {"./io_tracer_parser",
"-io_trace_file=" + trace_file};
char arg_buffer[kArgBufferSize];
char* argv[kMaxArgCount];
int argc = 0;
int cursor = 0;
for (const auto& arg : params) {
ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
ASSERT_LE(argc + 1, kMaxArgCount);
snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());
argv[argc++] = arg_buffer + cursor;
cursor += static_cast<int>(arg.size()) + 1;
}
ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv));
}
#endif }
TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
int total_keys = 0;
{
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
}
int buff_prefetch_count = 0;
bool read_async_called = false;
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
if (std::get<1>(GetParam())) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_OK(options.statistics->Reset());
get_perf_context()->Reset();
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_OK(iter->status());
if (read_async_called) {
ASSERT_EQ(num_keys, total_keys);
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
HistogramData prefetched_bytes_discarded;
options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
&prefetched_bytes_discarded);
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(prefetched_bytes_discarded.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
} else {
ASSERT_EQ(num_keys, total_keys);
}
ASSERT_GT(buff_prefetch_count, 0);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
int total_keys = 0;
{
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
}
(void)total_keys;
int num_keys_first_batch = 0;
int num_keys_second_batch = 0;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(450));
while (iter->Valid() && num_keys_first_batch < 100) {
ASSERT_OK(iter->status());
num_keys_first_batch++;
iter->Next();
}
ASSERT_OK(iter->status());
iter->Seek(BuildKey(942));
while (iter->Valid()) {
ASSERT_OK(iter->status());
num_keys_second_batch++;
iter->Next();
}
ASSERT_OK(iter->status());
}
int buff_prefetch_count = 0;
bool read_async_called = false;
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
if (std::get<1>(GetParam())) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_OK(options.statistics->Reset());
get_perf_context()->Reset();
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
{
iter->Seek(BuildKey(450));
while (iter->Valid() && num_keys < 100) {
ASSERT_OK(iter->status());
num_keys++;
iter->Next();
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, num_keys_first_batch);
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
}
{
num_keys = 0;
ASSERT_OK(options.statistics->Reset());
get_perf_context()->Reset();
iter->Seek(BuildKey(942));
while (iter->Valid()) {
ASSERT_OK(iter->status());
num_keys++;
iter->Next();
}
ASSERT_OK(iter->status());
ASSERT_EQ(num_keys, num_keys_second_batch);
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
HistogramData prefetched_bytes_discarded;
options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
&prefetched_bytes_discarded);
ASSERT_GT(prefetched_bytes_discarded.count, 0);
if (read_async_called) {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, SeekParallelizationTestWithPosix) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
bool read_async_called = false;
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
if (std::get<1>(GetParam())) {
ro.readahead_size = 16 * 1024;
}
{
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0)); ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
iter->Next();
ASSERT_TRUE(iter->Valid());
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
if (std::get<1>(GetParam())) {
ASSERT_EQ(buff_prefetch_count, 1);
} else {
ASSERT_EQ(buff_prefetch_count, 2);
}
} else {
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
}
Close();
}
#ifdef GFLAGS
TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const int kNumKeys = 1000;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
Options options;
SetGenericOptions(env.get(), use_direct_io, options);
options.statistics = CreateDBStatistics();
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
return;
} else {
ASSERT_OK(s);
}
int total_keys = 0;
{
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
total_keys++;
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
}
int buff_prefetch_count = 0;
bool read_async_called = false;
ReadOptions ro;
ro.adaptive_readahead = true;
ro.async_io = true;
if (std::get<1>(GetParam())) {
ro.readahead_size = 16 * 1024;
}
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result",
[&](void* ) { read_async_called = true; });
SyncPoint::GetInstance()->EnableProcessing();
{
WriteOptions write_opt;
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
std::string trace_file_path = dbname_ + "/io_trace_file";
ASSERT_OK(
NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer));
ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer)));
ASSERT_OK(options.statistics->Reset());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_OK(iter->status());
ASSERT_OK(db_->EndIOTrace());
ASSERT_OK(env_->FileExists(trace_file_path));
ASSERT_EQ(num_keys, total_keys);
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (read_async_called) {
ASSERT_GT(buff_prefetch_count, 0);
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
}
RunIOTracerParserTool(trace_file_path);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
#endif
class FilePrefetchBufferTest : public testing::Test {
public:
void SetUp() override {
SetupSyncPointsToMockDirectIO();
env_ = Env::Default();
fs_ = FileSystem::Default();
test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test");
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
stats_ = CreateDBStatistics();
}
void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
void Write(const std::string& fname, const std::string& content) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr));
ASSERT_OK(f->Append(content, IOOptions(), nullptr));
ASSERT_OK(f->Close(IOOptions(), nullptr));
}
void Read(const std::string& fname, const FileOptions& opts,
std::unique_ptr<RandomAccessFileReader>* reader) {
std::string fpath = Path(fname);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
reader->reset(new RandomAccessFileReader(
std::move(f), fpath, env_->GetSystemClock().get(),
nullptr, stats_.get()));
}
void AssertResult(const std::string& content,
const std::vector<FSReadRequest>& reqs) {
for (const auto& r : reqs) {
ASSERT_OK(r.status);
ASSERT_EQ(r.len, r.result.size());
ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString());
}
}
FileSystem* fs() { return fs_.get(); }
Statistics* stats() { return stats_.get(); }
private:
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::string test_dir_;
std::shared_ptr<Statistics> stats_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
};
TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
std::string fname = "seek-with-block-cache-hit";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
FilePrefetchBuffer fpb(readahead_params, true, false, fs());
Slice result;
Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 0, 4096, &result);
if (s.IsNotSupported()) {
return;
}
ASSERT_TRUE(s.IsTryAgain());
fpb.UpdateReadPattern(0, 4096, false);
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), 8192, 8192, &result, &s));
}
TEST_F(FilePrefetchBufferTest, SeekWithoutAlignment) {
std::string fname = "seek-without-alignment";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
size_t alignment = r->file()->GetRequiredBufferAlignment();
size_t n = alignment / 2;
int read_async_called = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::ReadAsync",
[&](void* ) { read_async_called++; });
SyncPoint::GetInstance()->EnableProcessing();
{
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.implicit_auto_readahead = true;
readahead_params.num_file_reads_for_auto_readahead = 2;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, true,
false, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);
Slice result;
Status s = fpb.PrefetchAsync(IOOptions(), r.get(), n, n, &result);
if (s.IsNotSupported()) {
return;
}
ASSERT_TRUE(s.IsTryAgain());
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), n, n, &result, &s));
if (read_async_called) {
ASSERT_EQ(fpb.GetPrefetchOffset(), n);
}
}
{
read_async_called = false;
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
readahead_params.num_file_reads_for_auto_readahead = 2;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, true,
false, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);
Slice result;
Status s = fpb.PrefetchAsync(IOOptions(), r.get(), n, n, &result);
if (s.IsNotSupported()) {
return;
}
ASSERT_TRUE(s.IsTryAgain());
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), n, n, &result, &s));
if (read_async_called) {
ASSERT_EQ(fpb.GetPrefetchOffset(), 0);
}
}
}
TEST_F(FilePrefetchBufferTest, NoSyncWithAsyncIO) {
std::string fname = "seek-with-block-cache-hit";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 16384;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, true,
false, fs(), nullptr, nullptr,
nullptr, FilePrefetchBufferUsage::kUnknown);
int read_async_called = 0;
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::ReadAsync",
[&](void* ) { read_async_called++; });
SyncPoint::GetInstance()->EnableProcessing();
Slice async_result;
Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 3000, 4000, &async_result);
if (s.IsNotSupported()) {
return;
}
ASSERT_TRUE(s.IsTryAgain());
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
ASSERT_TRUE(fpb.TryReadFromCache(io_opts, r.get(), 3000,
4000, &async_result, &s));
HistogramData sst_read_micros;
stats()->histogramData(SST_READ_MICROS, &sst_read_micros);
ASSERT_EQ(sst_read_micros.count, 0);
ASSERT_EQ(read_async_called, 2);
ASSERT_EQ(async_result.size(), 4000);
Slice result(&content[3000], 4000);
ASSERT_EQ(result.size(), 4000);
ASSERT_EQ(result, async_result);
}
TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) {
std::string fname = "seek-with-block-cache-hit";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 8192;
FilePrefetchBuffer fpb(
readahead_params, true, false, fs(), nullptr, stats.get(),
nullptr , FilePrefetchBufferUsage::kUserScanPrefetch );
Slice result;
Status s;
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(stats->getTickerCount(PREFETCH_HITS), 0);
ASSERT_EQ(stats->getTickerCount(PREFETCH_BYTES_USEFUL), 0);
fpb.UpdateReadPattern(4096, 4096, false);
ASSERT_TRUE(
fpb.TryReadFromCache(IOOptions(), r.get(), 8192, 8192, &result, &s));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(stats->getTickerCount(PREFETCH_HITS), 0);
ASSERT_EQ(stats->getTickerCount(PREFETCH_BYTES_USEFUL), 4096);
ASSERT_TRUE(
fpb.TryReadFromCache(IOOptions(), r.get(), 12288, 4096, &result, &s));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 1);
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), 8192);
ASSERT_TRUE(
fpb.TryReadFromCache(IOOptions(), r.get(), 16000, 10000, &result, &s));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
ASSERT_EQ(
stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
8576);
}
TEST_F(FilePrefetchBufferTest, ForCompaction) {
std::string fname = "fs-prefetch-buffer-for-compaction";
Random rand(0);
std::string content = rand.RandomString(64 * 1024);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 8192;
readahead_params.num_buffers = 1;
FilePrefetchBuffer fpb(readahead_params, true ,
false , fs(), nullptr,
stats.get());
Slice result;
Status s;
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 0 ,
3000 , &result, &s, true));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(result.size(), 3000);
ASSERT_EQ(strncmp(result.data(), content.substr(0, 3000).c_str(), 3000), 0);
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 3000 ,
10000 , &result, &s, true));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(result.size(), 10000);
ASSERT_EQ(strncmp(result.data(), content.substr(3000, 10000).c_str(), 10000),
0);
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 15000 ,
4096 , &result, &s, true));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(result.size(), 4096);
ASSERT_EQ(strncmp(result.data(), content.substr(15000, 4096).c_str(), 4096),
0);
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 40000 ,
20000 , &result, &s, true));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(result.size(), 20000);
ASSERT_EQ(strncmp(result.data(), content.substr(40000, 20000).c_str(), 20000),
0);
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 60000 ,
10000 , &result, &s, true));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(result.size(), 64 * 1024 - 60000);
ASSERT_EQ(
strncmp(result.data(), content.substr(60000, 64 * 1024 - 60000).c_str(),
64 * 1024 - 60000),
0);
}
TEST_F(FilePrefetchBufferTest, PollErrorPropagation) {
std::string fname = "poll-error-test";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
FilePrefetchBuffer fpb(readahead_params, true,
false, fs());
Slice result;
Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 0, 4096, &result);
if (s.IsNotSupported()) {
ROCKSDB_GTEST_SKIP("Async IO not supported on this platform");
return;
}
ASSERT_TRUE(s.IsTryAgain());
{
std::vector<std::tuple<uint64_t, size_t, bool>> buf_info(1);
fpb.TEST_GetBufferOffsetandSize(buf_info);
bool async_read_in_progress = std::get<2>(buf_info[0]);
if (!async_read_in_progress) {
ROCKSDB_GTEST_SKIP("Async IO not available (sync fallback used)");
return;
}
}
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::PollIfNeeded:IOStatus", [&](void* arg) {
IOStatus* io_s = static_cast<IOStatus*>(arg);
*io_s = IOStatus::IOError("Injected Poll error for testing");
});
SyncPoint::GetInstance()->EnableProcessing();
IOOptions io_opts;
io_opts.rate_limiter_priority = Env::IOPriority::IO_LOW;
Status read_status;
bool found =
fpb.TryReadFromCache(io_opts, r.get(), 0, 4096, &result, &read_status);
ASSERT_FALSE(found) << "Expected TryReadFromCache to return false on Poll "
"error, but it returned true";
ASSERT_TRUE(read_status.IsIOError())
<< "Expected IOError status, got: " << read_status.ToString();
ASSERT_TRUE(read_status.ToString().find("Injected Poll error") !=
std::string::npos)
<< "Expected error message to contain 'Injected Poll error', got: "
<< read_status.ToString();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(FilePrefetchBufferTest, ReadAsyncSyncFallbackOnNotSupported) {
std::string fname = "read-async-sync-fallback";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
SyncPoint::GetInstance()->SetCallBack(
"RandomAccessFileReader::ReadAsync:InjectStatus", [](void* arg) {
*static_cast<IOStatus*>(arg) = IOStatus::NotSupported();
});
SyncPoint::GetInstance()->EnableProcessing();
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 16384;
readahead_params.max_readahead_size = 16384;
readahead_params.num_buffers = 2;
FilePrefetchBuffer fpb(readahead_params, true,
false, fs());
Slice result;
Status s;
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 0, 4096, &result, &s));
ASSERT_OK(s);
ASSERT_EQ(result.size(), 4096);
ASSERT_EQ(memcmp(result.data(), content.data(), 4096), 0);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class FSBufferPrefetchTest
: public testing::Test,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
class BufferReuseFS : public FileSystemWrapper {
public:
explicit BufferReuseFS(const std::shared_ptr<FileSystem>& _target)
: FileSystemWrapper(_target) {}
~BufferReuseFS() override {}
const char* Name() const override { return "BufferReuseFS"; }
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
explicit WrappedRandomAccessFile(
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileOwnerWrapper(std::move(file)) {}
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) override {
for (size_t i = 0; i < num_reqs; ++i) {
FSReadRequest& req = reqs[i];
char* internalData = new char[req.len];
req.status = Read(req.offset, req.len, options, &req.result,
internalData, dbg);
Slice* internalSlice = new Slice(internalData, req.len);
FSAllocationPtr internalPtr(internalSlice, [](void* ptr) {
delete[] static_cast<const char*>(
static_cast<Slice*>(ptr)->data_);
delete static_cast<Slice*>(ptr);
});
req.fs_scratch = std::move(internalPtr);
}
return IOStatus::OK();
}
};
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
EXPECT_OK(s);
result->reset(new WrappedRandomAccessFile(file));
return s;
}
void SupportedOps(int64_t& supported_ops) override {
supported_ops = 1 << FSSupportedOps::kAsyncIO;
supported_ops |= 1 << FSSupportedOps::kFSBuffer;
}
};
void SetUp() override {
SetupSyncPointsToMockDirectIO();
env_ = Env::Default();
bool use_async_prefetch = std::get<0>(GetParam());
if (use_async_prefetch) {
fs_ = FileSystem::Default();
} else {
fs_ = std::make_shared<BufferReuseFS>(FileSystem::Default());
}
test_dir_ = test::PerThreadDBPath("fs_buffer_prefetch_test");
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
stats_ = CreateDBStatistics();
}
void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
void Write(const std::string& fname, const std::string& content) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr));
ASSERT_OK(f->Append(content, IOOptions(), nullptr));
ASSERT_OK(f->Close(IOOptions(), nullptr));
}
void Read(const std::string& fname, const FileOptions& opts,
std::unique_ptr<RandomAccessFileReader>* reader) {
std::string fpath = Path(fname);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
reader->reset(new RandomAccessFileReader(
std::move(f), fpath, env_->GetSystemClock().get(),
nullptr, stats_.get()));
}
FileSystem* fs() { return fs_.get(); }
Statistics* stats() { return stats_.get(); }
SystemClock* clock() { return env_->GetSystemClock().get(); }
private:
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::string test_dir_;
std::shared_ptr<Statistics> stats_;
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
};
INSTANTIATE_TEST_CASE_P(FSBufferPrefetchTest, FSBufferPrefetchTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
std::string fname = "fs-buffer-prefetch-stats-internals";
Random rand(0);
std::string content = rand.RandomString(32768);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 8192;
bool use_async_prefetch = std::get<0>(GetParam());
bool for_compaction = std::get<1>(GetParam());
if (use_async_prefetch && for_compaction) {
return;
}
size_t num_buffers = use_async_prefetch ? 2 : 1;
readahead_params.num_buffers = num_buffers;
FilePrefetchBuffer fpb(
readahead_params, true , false , fs(),
clock(), stats.get(), nullptr ,
FilePrefetchBufferUsage::kUserScanPrefetch );
int overlap_buffer_write_ct = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete",
[&](void* ) { overlap_buffer_write_ct++; });
SyncPoint::GetInstance()->EnableProcessing();
Slice result;
Status s;
std::vector<std::tuple<uint64_t, size_t, bool>> buffer_info(num_buffers);
std::pair<uint64_t, size_t> overlap_buffer_info;
bool could_read_from_cache =
fpb.TryReadFromCache(IOOptions(), r.get(), 0 , 4096 ,
&result, &s, for_compaction);
if (use_async_prefetch && s.IsNotSupported()) {
return;
}
ASSERT_TRUE(could_read_from_cache);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL), 0);
ASSERT_EQ(strncmp(result.data(), content.substr(0, 4096).c_str(), 4096), 0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
bool async_read_in_progress = std::get<2>(buffer_info[1]);
if (!async_read_in_progress) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
return;
}
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 0);
ASSERT_EQ(std::get<1>(buffer_info[0]), 4096 + 8192 / 2);
ASSERT_EQ(std::get<0>(buffer_info[1]), 4096 + 8192 / 2);
ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2);
} else {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 0);
ASSERT_EQ(std::get<1>(buffer_info[0]), 4096 + 8192);
}
fpb.UpdateReadPattern(4096, 4096, false);
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 8192 ,
8192 , &result, &s, for_compaction));
ASSERT_EQ(s, Status::OK());
if (!for_compaction) {
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
4096); }
ASSERT_EQ(strncmp(result.data(), content.substr(8192, 8192).c_str(), 8192),
0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 8192);
ASSERT_EQ(std::get<1>(buffer_info[0]), 8192 + 8192 / 2);
ASSERT_EQ(std::get<0>(buffer_info[1]), 8192 + (8192 + 8192 / 2));
ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2);
} else {
ASSERT_EQ(overlap_buffer_info.first, 8192);
ASSERT_EQ(overlap_buffer_info.second, 8192);
ASSERT_EQ(overlap_buffer_write_ct, 2);
ASSERT_EQ(std::get<0>(buffer_info[0]), 12288);
ASSERT_EQ(std::get<1>(buffer_info[0]), 12288);
}
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 12288 ,
4096 , &result, &s, for_compaction));
ASSERT_EQ(s, Status::OK());
if (!for_compaction) {
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 1);
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
4096); }
ASSERT_EQ(strncmp(result.data(), content.substr(12288, 4096).c_str(), 4096),
0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 8192);
ASSERT_EQ(std::get<1>(buffer_info[0]), 8192 + 8192 / 2);
ASSERT_EQ(std::get<0>(buffer_info[1]), 8192 + (8192 + 8192 / 2));
ASSERT_EQ(std::get<1>(buffer_info[1]), 8192 / 2);
} else {
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
ASSERT_EQ(overlap_buffer_info.first, 8192);
ASSERT_EQ(overlap_buffer_info.second, 8192);
ASSERT_EQ(overlap_buffer_write_ct, 2);
ASSERT_EQ(std::get<0>(buffer_info[0]), 12288);
ASSERT_EQ(std::get<1>(buffer_info[0]), 12288);
}
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16000 ,
10000 , &result, &s, for_compaction));
ASSERT_EQ(s, Status::OK());
if (!for_compaction) {
ASSERT_EQ(stats->getAndResetTickerCount(PREFETCH_HITS), 0);
ASSERT_EQ(
stats->getAndResetTickerCount(PREFETCH_BYTES_USEFUL),
8576);
}
ASSERT_EQ(strncmp(result.data(), content.substr(16000, 10000).c_str(), 10000),
0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
ASSERT_EQ(overlap_buffer_info.first, 16000);
ASSERT_EQ(overlap_buffer_info.second, 10000);
ASSERT_EQ(overlap_buffer_write_ct, 3);
ASSERT_EQ(std::get<0>(buffer_info[0]), 24576);
ASSERT_EQ(std::get<1>(buffer_info[0]), 32768 - 24576);
ASSERT_EQ(std::get<0>(buffer_info[1]), 32768);
ASSERT_EQ(std::get<1>(buffer_info[1]), 4096);
ASSERT_TRUE(std::get<2>(
buffer_info[1])); } else {
ASSERT_EQ(overlap_buffer_info.first, 16000);
ASSERT_EQ(overlap_buffer_info.second, 10000);
ASSERT_EQ(overlap_buffer_write_ct, 4);
ASSERT_EQ(std::get<0>(buffer_info[0]), 12288 + 12288);
ASSERT_EQ(std::get<1>(buffer_info[0]), 8192);
}
}
TEST_P(FSBufferPrefetchTest, FSBufferPrefetchUnalignedReads) {
std::string fname = "fs-buffer-prefetch-unaligned-reads";
Random rand(0);
std::string content = rand.RandomString(1000);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 5;
readahead_params.max_readahead_size = 100;
bool use_async_prefetch = std::get<0>(GetParam());
bool for_compaction = std::get<1>(GetParam());
if (use_async_prefetch && for_compaction) {
return;
}
size_t num_buffers = use_async_prefetch ? 2 : 1;
readahead_params.num_buffers = num_buffers;
FilePrefetchBuffer fpb(readahead_params, true ,
false , fs(), clock(),
stats.get());
int overlap_buffer_write_ct = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::CopyDataToOverlapBuffer:Complete",
[&](void* ) { overlap_buffer_write_ct++; });
SyncPoint::GetInstance()->EnableProcessing();
Slice result;
Status s;
std::vector<std::tuple<uint64_t, size_t, bool>> buffer_info(num_buffers);
std::pair<uint64_t, size_t> overlap_buffer_info;
bool could_read_from_cache =
fpb.TryReadFromCache(IOOptions(), r.get(), 5 , 3 ,
&result, &s, for_compaction);
if (use_async_prefetch && s.IsNotSupported()) {
return;
}
ASSERT_TRUE(could_read_from_cache);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(strncmp(result.data(), content.substr(5, 3).c_str(), 3), 0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
bool async_read_in_progress = std::get<2>(buffer_info[1]);
if (!async_read_in_progress) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
return;
}
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 0);
ASSERT_EQ(std::get<1>(buffer_info[0]), 1000);
ASSERT_EQ(std::get<0>(buffer_info[1]), 4096);
ASSERT_EQ(std::get<1>(buffer_info[1]), 4096);
ASSERT_TRUE(std::get<2>(buffer_info[1])); } else {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 5);
ASSERT_EQ(std::get<1>(buffer_info[0]), 3 + 5);
}
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 16 ,
7 , &result, &s, for_compaction));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(strncmp(result.data(), content.substr(16, 7).c_str(), 7), 0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 0);
ASSERT_EQ(std::get<1>(buffer_info[0]), 1000);
} else {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 16);
ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10);
}
if (use_async_prefetch) {
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 10 ,
8 , &result, &s, for_compaction));
} else {
ASSERT_FALSE(fpb.TryReadFromCache(IOOptions(), r.get(), 10 ,
8 , &result, &s, for_compaction));
}
ASSERT_EQ(s, Status::OK());
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 27 ,
6 , &result, &s, for_compaction));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(strncmp(result.data(), content.substr(27, 6).c_str(), 6), 0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 0);
ASSERT_EQ(std::get<1>(buffer_info[0]), 1000);
} else {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 16);
ASSERT_EQ(std::get<1>(buffer_info[0]), 7 + 10);
}
ASSERT_TRUE(fpb.TryReadFromCache(IOOptions(), r.get(), 30 ,
20 , &result, &s, for_compaction));
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(strncmp(result.data(), content.substr(30, 20).c_str(), 20), 0);
fpb.TEST_GetOverlapBufferOffsetandSize(overlap_buffer_info);
fpb.TEST_GetBufferOffsetandSize(buffer_info);
if (use_async_prefetch) {
ASSERT_EQ(overlap_buffer_info.first, 0);
ASSERT_EQ(overlap_buffer_info.second, 0);
ASSERT_EQ(std::get<0>(buffer_info[0]), 0);
ASSERT_EQ(std::get<1>(buffer_info[0]), 1000);
} else {
ASSERT_EQ(overlap_buffer_info.first, 30);
ASSERT_EQ(overlap_buffer_info.second, 20);
ASSERT_EQ(overlap_buffer_write_ct, 2);
ASSERT_EQ(std::get<0>(buffer_info[0]), 33);
ASSERT_EQ(std::get<1>(buffer_info[0]), (50 - 33) + 20);
}
}
TEST_P(FSBufferPrefetchTest, FSBufferPrefetchRandomized) {
std::string fname = "fs-buffer-prefetch-randomized";
Random rand(0);
std::string content = rand.RandomString(16 * 1024 * 1024);
Write(fname, content);
FileOptions opts;
std::unique_ptr<RandomAccessFileReader> r;
Read(fname, opts, &r);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 512;
readahead_params.max_readahead_size = 2048;
bool use_async_prefetch = std::get<0>(GetParam());
bool for_compaction = std::get<1>(GetParam());
if (use_async_prefetch && for_compaction) {
return;
}
size_t num_buffers = use_async_prefetch ? 2 : 1;
readahead_params.num_buffers = num_buffers;
FilePrefetchBuffer fpb(readahead_params, true ,
false , fs(), clock(),
stats.get());
Slice result;
Status s;
uint64_t offset = 0;
Random rnd(987654);
for (int i = 0; i < 1000; i++) {
size_t len = rnd.Uniform(16 * 1024);
if (offset >= content.size()) {
std::cout << "Stopped early after " << i << " iterations" << std::endl;
break;
}
bool could_read_from_cache = fpb.TryReadFromCache(
IOOptions(), r.get(), offset, len, &result, &s, for_compaction);
if (use_async_prefetch && s.IsNotSupported()) {
return;
}
ASSERT_TRUE(could_read_from_cache);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(result.size(),
std::min(len, content.size() - static_cast<size_t>(offset)));
ASSERT_EQ(strncmp(result.data(),
content.substr(offset, offset + len).c_str(), len),
0);
if (i % 4 == 0) {
offset += len + 2 * readahead_params.max_readahead_size;
} else if (i % 4 == 1) {
offset += len + readahead_params.max_readahead_size / 2;
} else {
offset += len;
}
}
}
}
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}