#include "db/db_test_util.h"
#include "env/mock_env.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/table.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
class DBSSTTest : public DBTestBase {
public:
DBSSTTest(const std::string& test_name = "db_sst_test")
: DBTestBase(test_name, true) {}
};
class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() = default;
~FlushedFileCollector() override = default;
void OnFlushCompleted(DB* , const FlushJobInfo& info) override {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
std::vector<std::string> GetFlushedFiles() {
std::lock_guard<std::mutex> lock(mutex_);
std::vector<std::string> result;
for (const auto& fname : flushed_files_) {
result.push_back(fname);
}
return result;
}
void ClearFlushedFiles() {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.clear();
}
private:
std::vector<std::string> flushed_files_;
std::mutex mutex_;
};
TEST_F(DBSSTTest, DontDeletePendingOutputs) {
Options options;
options.env = env_;
options.create_if_missing = true;
DestroyAndReopen(options);
std::function<void()> purge_obsolete_files_function = [&]() {
JobContext job_context(0);
dbfull()->TEST_LockMutex();
dbfull()->FindObsoleteFiles(&job_context, true );
dbfull()->TEST_UnlockMutex();
dbfull()->PurgeObsoleteFiles(job_context);
job_context.Clean();
};
env_->table_write_callback_ = &purge_obsolete_files_function;
for (int i = 0; i < 2; ++i) {
ASSERT_OK(Put("a", "begin"));
ASSERT_OK(Put("z", "end"));
ASSERT_OK(Flush());
}
Compact("a", "b");
}
TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
Options options = CurrentOptions();
options.write_buffer_size = 110 << 10; options.num_levels = 4;
DestroyAndReopen(options);
Random rnd(301);
int key_id = 0;
for (int i = 0; i < 10; ++i) {
GenerateNewFile(&rnd, &key_id, false);
}
ASSERT_OK(Flush());
Close();
int const num_files = GetSstFileCount(dbname_);
ASSERT_GT(num_files, 0);
Reopen(options);
std::vector<std::string> values;
values.reserve(key_id);
for (int k = 0; k < key_id; ++k) {
values.push_back(Get(Key(k)));
}
Close();
std::vector<std::string> filenames;
GetSstFiles(env_, dbname_, &filenames);
int num_ldb_files = 0;
for (size_t i = 0; i < filenames.size(); ++i) {
if (i & 1) {
continue;
}
std::string const rdb_name = dbname_ + "/" + filenames[i];
std::string const ldb_name = Rocks2LevelTableFileName(rdb_name);
ASSERT_TRUE(env_->RenameFile(rdb_name, ldb_name).ok());
++num_ldb_files;
}
ASSERT_GT(num_ldb_files, 0);
ASSERT_EQ(num_files, GetSstFileCount(dbname_));
Reopen(options);
for (int k = 0; k < key_id; ++k) {
ASSERT_EQ(values[k], Get(Key(k)));
}
Destroy(options);
}
TEST_F(DBSSTTest, DontDeleteMovedFile) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.max_bytes_for_level_base = 1024 * 1024; options.level0_file_num_compaction_trigger =
2; DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 2; ++i) {
for (int j = 0; j < 100; ++j) {
ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1", FilesPerLevel(0));
Reopen(options);
}
TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 2 * 1024 * 1024; options.max_bytes_for_level_base = 1024 * 1024; options.level0_file_num_compaction_trigger =
2; options.max_background_flushes = 2;
options.max_background_compactions = 2;
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
Reopen(options);
Random rnd(301);
for (int i = 0; i < 2; ++i) {
for (int j = 0; j < 100; ++j) {
ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1", FilesPerLevel(0));
test::SleepingBackgroundTask blocking_thread;
port::Mutex mutex_;
bool already_blocked(false);
std::function<void()> block_first_time = [&]() {
bool blocking = false;
{
MutexLock l(&mutex_);
if (!already_blocked) {
blocking = true;
already_blocked = true;
}
}
if (blocking) {
blocking_thread.DoSleep();
}
};
env_->table_write_callback_ = &block_first_time;
for (int j = 0; j < 256; ++j) {
ASSERT_OK(Put(Key(j), rnd.RandomString(10 * 1024)));
}
blocking_thread.WaitUntilSleeping();
ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));
ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 1U);
auto file_on_L2 = metadata[0].name;
listener->SetExpectedFileName(dbname_ + file_on_L2);
ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr,
true ));
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
blocking_thread.WakeUp();
blocking_thread.WaitUntilDone();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1,0,1", FilesPerLevel(0));
metadata.clear();
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 2U);
ASSERT_EQ(Status::NotFound(), env_->FileExists(dbname_ + file_on_L2));
listener->VerifyMatchedCount(1);
}
TEST_F(DBSSTTest, DeleteFileNotCalledForNotCreatedSSTFile) {
Options options = CurrentOptions();
options.env = env_;
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
Reopen(options);
ASSERT_OK(Flush());
ASSERT_EQ("", FilesPerLevel(0));
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 0U);
listener->VerifyMatchedCount(0);
}
TEST_F(DBSSTTest, DeleteFileNotCalledForCreatedSSTFile) {
Options options = CurrentOptions();
options.env = env_;
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
Reopen(options);
ASSERT_OK(Put("pika", "choo"));
ASSERT_OK(Flush());
ASSERT_EQ("1", FilesPerLevel(0));
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 1U);
listener->VerifyMatchedCount(0);
}
TEST_F(DBSSTTest, DBWithSstFileManager) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* ) { files_added++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile",
[&](void* ) { files_deleted++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* ) { files_moved++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 25; i++) {
GenerateNewRandomFile(&rnd);
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
uint64_t total_files_size = 0;
for (auto& file_to_size : files_in_db) {
total_files_size += file_to_size.second;
}
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
ASSERT_GE(files_added, 25);
ASSERT_GT(files_deleted, 0);
ASSERT_EQ(files_moved, 0);
Close();
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
Close();
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
sst_file_manager.reset(NewSstFileManager(env_));
options.sst_file_manager = sst_file_manager;
sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
int files_scheduled_to_delete = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_added++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_deleted++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* ) { files_moved++; });
int64_t untracked_files = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnUntrackFile",
[&](void* ) { ++untracked_files; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.enable_blob_files = true;
options.blob_file_size = 32; DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("Key_" + std::to_string(i), "Value_" + std::to_string(i)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
std::vector<uint64_t> blob_files = GetBlobFileNumbers();
ASSERT_EQ(files_added, blob_files.size());
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
ASSERT_EQ(files_moved, 0);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
uint64_t total_files_size = 0;
for (auto& file_to_size : files_in_db) {
total_files_size += file_to_size.second;
}
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
Close();
ASSERT_EQ(untracked_files, files_in_db.size());
untracked_files = 0;
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
Close();
ASSERT_EQ(untracked_files, files_in_db.size());
untracked_files = 0;
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
sst_file_manager.reset(NewSstFileManager(env_));
options.sst_file_manager = sst_file_manager;
sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
Close();
ASSERT_EQ(untracked_files, files_in_db.size());
untracked_files = 0;
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_EQ(files_deleted, blob_files.size());
ASSERT_EQ(files_scheduled_to_delete, blob_files.size());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.enable_blob_files = true;
options.blob_file_size = 32; options.disable_auto_compactions = true;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 0.5;
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
int files_scheduled_to_delete = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_added++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_deleted++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* ) { files_moved++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
Random rnd(301);
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
constexpr char fourth_key[] = "fourth_key";
constexpr char fourth_value[] = "fourth_value";
constexpr char fifth_key[] = "fifth_key";
constexpr char fifth_value[] = "fifth_value";
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Put(fourth_key, fourth_value));
ASSERT_OK(Put(fifth_key, fifth_value));
ASSERT_OK(Flush());
const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
ASSERT_EQ(original_blob_files.size(), 5);
ASSERT_EQ(files_added, 5);
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
ASSERT_EQ(files_moved, 0);
{
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
const size_t cutoff_index = static_cast<size_t>(
options.blob_garbage_collection_age_cutoff * original_blob_files.size());
size_t expected_number_of_files = original_blob_files.size();
ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
expected_number_of_files -= cutoff_index;
files_added = 0;
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
sfm->WaitForEmptyTrash();
ASSERT_EQ(Get(first_key), first_value);
ASSERT_EQ(Get(second_key), second_value);
ASSERT_EQ(Get(third_key), third_value);
ASSERT_EQ(Get(fourth_key), fourth_value);
ASSERT_EQ(Get(fifth_key), fifth_value);
const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
ASSERT_EQ(files_added, 0);
ASSERT_EQ(files_deleted, cutoff_index);
ASSERT_EQ(files_scheduled_to_delete, cutoff_index);
ASSERT_EQ(files_moved, 0);
for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
}
{
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
Close();
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
ASSERT_EQ(files_deleted, 5);
ASSERT_EQ(files_scheduled_to_delete, 5);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
class DBSSTTestRateLimit : public DBSSTTest,
public ::testing::WithParamInterface<bool> {
public:
DBSSTTestRateLimit() : DBSSTTest() {}
~DBSSTTestRateLimit() override = default;
};
TEST_P(DBSSTTestRateLimit, RateLimitedDelete) {
Destroy(last_options_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"DBSSTTest::RateLimitedDelete:1",
"DeleteScheduler::BackgroundEmptyTrash"},
});
std::vector<uint64_t> penalties;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
uint64_t cur_time = env_->NowMicros();
if (*abs_time_us > cur_time) {
env_->MockSleepForMicroseconds(*abs_time_us - cur_time);
}
env_->MockSleepForMicroseconds(Random::GetTLSInstance()->Uniform(10));
*abs_time_us = Env::Default()->NowMicros();
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartPeriodicTaskScheduler:DisableScheduler", [&](void* arg) {
bool* disable_scheduler = static_cast<bool*>(arg);
*disable_scheduler = true;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
bool different_wal_dir = GetParam();
Options options = CurrentOptions();
SetTimeElapseOnlySleepOnReopen(&options);
options.disable_auto_compactions = true;
options.env = env_;
options.statistics = CreateDBStatistics();
if (different_wal_dir) {
options.wal_dir = alternative_wal_dir_;
}
int64_t rate_bytes_per_sec = 1024 * 10; Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
WriteOptions wo;
if (!different_wal_dir) {
wo.disableWAL = true;
}
Reopen(options);
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
ASSERT_OK(Put("Key3", DummyString(1024, v), wo));
ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
ASSERT_OK(Put("Key1", DummyString(1024, v), wo));
ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
ASSERT_OK(Flush());
}
ASSERT_EQ("4", FilesPerLevel(0));
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,1", FilesPerLevel(0));
uint64_t delete_start_time = env_->NowMicros();
TEST_SYNC_POINT("DBSSTTest::RateLimitedDelete:1");
sfm->WaitForEmptyTrash();
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
uint64_t total_files_size = 0;
uint64_t expected_penlty = 0;
ASSERT_EQ(penalties.size(), metadata.size());
for (size_t i = 0; i < metadata.size(); i++) {
total_files_size += metadata[i].size;
expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec);
ASSERT_EQ(expected_penlty, penalties[i]);
}
ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
ASSERT_LT(time_spent_deleting, expected_penlty * 1.1);
ASSERT_EQ(4, options.statistics->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(
0, options.statistics->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(RateLimitedDelete, DBSSTTestRateLimit,
::testing::Bool());
TEST_F(DBSSTTest, RateLimitedWALDelete) {
Destroy(last_options_);
std::vector<uint64_t> penalties;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
options.env = env_;
int64_t rate_bytes_per_sec = 1024 * 10; Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
SetTimeElapseOnlySleepOnReopen(&options);
ASSERT_OK(TryReopen(options));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
ASSERT_EQ("4", FilesPerLevel(0));
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,1", FilesPerLevel(0));
sfm->WaitForEmptyTrash();
ASSERT_EQ(penalties.size(), 8);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
class DBWALTestWithParam
: public DBTestBase,
public testing::WithParamInterface<std::tuple<std::string, bool>> {
public:
explicit DBWALTestWithParam()
: DBTestBase("db_wal_test_with_params", true) {
wal_dir_ = std::get<0>(GetParam());
wal_dir_same_as_dbname_ = std::get<1>(GetParam());
}
std::string wal_dir_;
bool wal_dir_same_as_dbname_;
};
TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
const char* Name() const override { return "MyEnv"; }
Status DeleteFile(const std::string& fname) override {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
return target()->DeleteFile(fname);
}
void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
private:
bool fake_log_delete;
};
std::unique_ptr<MyEnv> env(new MyEnv(env_));
Destroy(last_options_);
env->set_fake_log_delete(true);
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
options.env = env.get();
options.wal_dir = dbname_ + wal_dir_;
int64_t rate_bytes_per_sec = 1024 * 10; Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
Reopen(options);
for (char v = 'a'; v <= 'd'; v++) {
if (v == 'c') {
options.sst_file_manager->SetDeleteRateBytesPerSecond(1);
}
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
ASSERT_EQ("4", FilesPerLevel(0));
Close();
options.sst_file_manager.reset();
std::vector<std::string> filenames;
int trash_log_count = 0;
if (!wal_dir_same_as_dbname_) {
std::unique_ptr<WritableFile> result;
ASSERT_OK(env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
EnvOptions()));
result.reset();
}
ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_GE(trash_log_count, 1);
env->set_fake_log_delete(false);
Reopen(options);
filenames.clear();
trash_log_count = 0;
ASSERT_OK(env->GetChildren(options.wal_dir, &filenames));
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_EQ(trash_log_count, 0);
Close();
}
INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
::testing::Values(std::make_tuple("", true),
std::make_tuple("_wal_dir", false)));
class DBObsoleteFileDeletionOnOpenTest
: public DBSSTTest,
public ::testing::WithParamInterface<double> {
public:
explicit DBObsoleteFileDeletionOnOpenTest()
: DBSSTTest("db_sst_deletion_on_open_test") {}
};
TEST_P(DBObsoleteFileDeletionOnOpenTest, Basic) {
Options options = CurrentOptions();
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 1024 * 1024 ));
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
double max_trash_db_ratio = GetParam();
sfm->delete_scheduler()->SetMaxTrashDBRatio(max_trash_db_ratio);
int bg_delete_file = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* ) { bg_delete_file++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Destroy(last_options_);
ASSERT_OK(env_->CreateDirIfMissing(dbname_));
ASSERT_OK(
WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash", false));
ASSERT_OK(
WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash", false));
ASSERT_OK(
WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash", false));
uint64_t sst_file_number = 100;
const std::string kObsoleteSstFileOne =
MakeTableFileName(dbname_, sst_file_number);
ASSERT_OK(WriteStringToFile(env_, "abc", kObsoleteSstFileOne, false));
const std::string kObsoleteSstFileTwo =
MakeTableFileName(dbname_, sst_file_number - 1);
ASSERT_OK(WriteStringToFile(env_, "abc", kObsoleteSstFileTwo, false));
Reopen(options);
sfm->WaitForEmptyTrash();
ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
ASSERT_NOK(env_->FileExists(kObsoleteSstFileOne));
ASSERT_NOK(env_->FileExists(kObsoleteSstFileTwo));
if (max_trash_db_ratio < 1) {
ASSERT_LE(bg_delete_file, 5);
} else {
ASSERT_EQ(bg_delete_file, 5);
}
ASSERT_EQ(sfm->GetTotalSize(), 0);
ASSERT_EQ(sfm->delete_scheduler()->GetTotalTrashSize(), 0);
}
INSTANTIATE_TEST_CASE_P(DBObsoleteFileDeletionOnOpenTest,
DBObsoleteFileDeletionOnOpenTest,
::testing::Values(0, 0.5, 1, 1.2));
TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
std::atomic<int> bg_delete_file(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* ) { bg_delete_file++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile", [&](void* ) { bg_delete_file++; });
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.db_paths.emplace_back(dbname_, 1024 * 100);
options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
options.env = env_;
int64_t rate_bytes_per_sec = 1024 * 1024; Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", rate_bytes_per_sec, false, &s,
1.1));
ASSERT_OK(s);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
DestroyAndReopen(options);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.disableWAL = true;
for (int i = 0; i < 4; i++) {
ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A'), wo));
ASSERT_OK(Flush());
}
ASSERT_EQ("4", FilesPerLevel(0));
CompactRangeOptions compact_options;
compact_options.target_path_id = 1;
Slice begin("Key0");
Slice end("Key3");
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
ASSERT_EQ("0,1", FilesPerLevel(0));
for (int i = 4; i < 8; i++) {
ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'B'), wo));
ASSERT_OK(Flush());
}
ASSERT_EQ("4,1", FilesPerLevel(0));
begin = "Key4";
end = "Key7";
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
ASSERT_EQ("0,2", FilesPerLevel(0));
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 8);
compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 10);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
int bg_delete_file = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* ) { bg_delete_file++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Status s;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.env = env_;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
DestroyAndReopen(options);
for (int i = 0; i < 4; i++) {
ASSERT_OK(Put("Key" + std::to_string(i), DummyString(1024, 'A')));
ASSERT_OK(Flush());
}
ASSERT_EQ("4", FilesPerLevel(0));
Close();
int num_sst_files = 0;
int num_wal_files = 0;
std::vector<std::string> db_files;
ASSERT_OK(env_->GetChildren(dbname_, &db_files));
for (const std::string& f : db_files) {
if (f.substr(f.find_last_of('.') + 1) == "sst") {
num_sst_files++;
} else if (f.substr(f.find_last_of('.') + 1) == "log") {
num_wal_files++;
}
}
ASSERT_GT(num_sst_files, 0);
ASSERT_GT(num_wal_files, 0);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
sfm->delete_scheduler()->SetMaxTrashDBRatio(1000.0);
ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t first_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &first_file_size));
ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
sfm->SetMaxAllowedSpaceUsage(first_file_size + 1);
ASSERT_OK(Put("key1", "val1"));
ASSERT_NOK(Flush());
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedWithBlobFiles) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t files_size = 0;
uint64_t total_files_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db, &files_size));
ASSERT_GT(files_size, 0);
total_files_size = files_size;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &files_size));
total_files_size += files_size;
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
sfm->SetMaxAllowedSpaceUsage(total_files_size + 1);
bool max_allowed_space_reached = false;
bool delete_blob_file = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached",
[&](void* ) { max_allowed_space_reached = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BuildTable::AfterDeleteFile",
[&](void* ) { delete_blob_file = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{
"BuildTable::AfterDeleteFile",
"DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1",
},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key1", "val1"));
ASSERT_NOK(Flush());
ASSERT_TRUE(max_allowed_space_reached);
TEST_SYNC_POINT("DBSSTTest::DBWithMaxSpaceAllowedWithBlobFiles:1");
ASSERT_TRUE(delete_blob_file);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, CancellingCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.level0_file_num_compaction_trigger = 2;
options.statistics = CreateDBStatistics();
DestroyAndReopen(options);
int completed_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* ) {
sfm->SetMaxAllowedSpaceUsage(0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
[&](void* ) { completed_compactions++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(completed_compactions, 0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
0);
ASSERT_EQ(0,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_MARKED_TRASH));
ASSERT_EQ(4,
dbfull()->immutable_db_options().statistics.get()->getTickerCount(
FILES_DELETED_IMMEDIATELY));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.statistics = CreateDBStatistics();
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
ASSERT_TRUE(dbfull()
->CompactRange(CompactRangeOptions(), nullptr, nullptr)
.IsCompactionTooLarge());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
1);
auto l0_files = collector->GetFlushedFiles();
ASSERT_TRUE(
dbfull()
->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0)
.IsCompactionTooLarge());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
2);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
sfm->SetMaxAllowedSpaceUsage(0);
int completed_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactFilesImpl:End", [&](void* ) { completed_compactions++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(),
l0_files, 0));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
ASSERT_GT(completed_compactions, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
std::vector<int> max_space_limits_mbs = {1, 10};
std::atomic<bool> bg_error_set(false);
std::atomic<int> reached_max_space_on_flush(0);
std::atomic<int> reached_max_space_on_compaction(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
[&](void* arg) {
Status* bg_error = static_cast<Status*>(arg);
bg_error_set = true;
reached_max_space_on_flush++;
*bg_error = Status::OK();
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
bool* enough_room = static_cast<bool*>(arg);
*enough_room = true;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
[&](void* ) {
bg_error_set = true;
reached_max_space_on_compaction++;
});
for (auto limit_mb : max_space_limits_mbs) {
bg_error_set = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.write_buffer_size = 1024 * 512; DestroyAndReopen(options);
Random rnd(301);
sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);
while (true) {
auto s = Put(rnd.RandomString(10), rnd.RandomString(50));
if (!s.ok()) {
break;
}
}
ASSERT_TRUE(bg_error_set);
uint64_t total_sst_files_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_sst_files_size));
ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
ASSERT_GT(reached_max_space_on_flush, 0);
ASSERT_GT(reached_max_space_on_compaction, 0);
}
TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
for (int iter = 0; iter < 2; iter++) {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.disable_auto_compactions = true;
options.max_open_files = -1;
if (iter == 0) {
options.max_file_opening_threads = 1;
} else {
options.max_file_opening_threads = 5;
}
options = CurrentOptions(options);
DestroyAndReopen(options);
for (int i = 0; i < 12; i++) {
std::string k = "L2_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
for (int i = 0; i < 12; i++) {
std::string k = "L0_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
Close();
Reopen(options);
ASSERT_EQ("12,0,12", FilesPerLevel(0));
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
for (const auto& level : files) {
for (const auto& file : level) {
ASSERT_TRUE(file.table_reader_handle != nullptr);
}
}
for (int i = 0; i < 12; i++) {
ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a'));
ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a'));
}
}
}
TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
for (CacheEntryRoleOptions::Decision charge_table_reader :
{CacheEntryRoleOptions::Decision::kEnabled,
CacheEntryRoleOptions::Decision::kDisabled}) {
for (int iter = 0; iter < 2; iter++) {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.disable_auto_compactions = true;
options.max_open_files = -1;
BlockBasedTableOptions table_options;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (iter == 0) {
options.max_file_opening_threads = 1;
} else {
options.max_file_opening_threads = 5;
}
DestroyAndReopen(options);
for (int i = 0; i < 5; i++) {
std::string k = "L2_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush()) << i;
}
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
for (int i = 0; i < 5; i++) {
std::string k = "L0_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
Close();
table_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlockBasedTableReader,
{ charge_table_reader}});
table_options.block_cache =
NewLRUCache(1024 , 0 ,
true );
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (charge_table_reader == CacheEntryRoleOptions::Decision::kEnabled) {
EXPECT_TRUE(s.IsMemoryLimit());
EXPECT_TRUE(s.ToString().find(
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
CacheEntryRole::kBlockBasedTableReader)]) !=
std::string::npos);
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
std::string::npos);
} else {
EXPECT_TRUE(s.ok());
ASSERT_EQ("5,0,5", FilesPerLevel(0));
}
}
}
}
TEST_F(DBSSTTest, GetTotalSstFilesSize) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:oldest_ancester_time", [&](void* arg) {
uint64_t* current_time = static_cast<uint64_t*>(arg);
*current_time = 0;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
DestroyAndReopen(options);
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 10; j++) {
std::string val = "val_file_" + std::to_string(i);
ASSERT_OK(Put(Key(j), val));
}
ASSERT_OK(Flush());
}
ASSERT_EQ("5", FilesPerLevel(0));
std::vector<LiveFileMetaData> live_files_meta;
dbfull()->GetLiveFilesMetaData(&live_files_meta);
ASSERT_EQ(live_files_meta.size(), 5);
uint64_t single_file_size = live_files_meta[0].size;
uint64_t live_sst_files_size = 0;
uint64_t total_sst_files_size = 0;
for (const auto& file_meta : live_files_meta) {
live_sst_files_size += file_meta.size;
}
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
ASSERT_OK(iter1->status());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
live_files_meta.clear();
dbfull()->GetLiveFilesMetaData(&live_files_meta);
ASSERT_EQ(live_files_meta.size(), 1);
live_sst_files_size = 0;
total_sst_files_size = 0;
for (const auto& file_meta : live_files_meta) {
live_sst_files_size += file_meta.size;
}
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(live_sst_files_size, 1 * single_file_size);
ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
ASSERT_OK(iter2->status());
for (int i = 0; i < 10; i++) {
ASSERT_OK(Delete(Key(i)));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel(0));
live_files_meta.clear();
dbfull()->GetLiveFilesMetaData(&live_files_meta);
ASSERT_EQ(live_files_meta.size(), 0);
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
ASSERT_OK(iter1->status());
iter1.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(total_sst_files_size, 1 * single_file_size);
ASSERT_OK(iter2->status());
iter2.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(total_sst_files_size, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
DestroyAndReopen(options);
for (int i = 0; i < 5; i++) {
ASSERT_OK(Put(Key(i), "val"));
ASSERT_OK(Flush());
}
ASSERT_EQ("5", FilesPerLevel(0));
std::vector<LiveFileMetaData> live_files_meta;
dbfull()->GetLiveFilesMetaData(&live_files_meta);
ASSERT_EQ(live_files_meta.size(), 5);
uint64_t single_file_size = live_files_meta[0].size;
uint64_t live_sst_files_size = 0;
uint64_t total_sst_files_size = 0;
for (const auto& file_meta : live_files_meta) {
live_sst_files_size += file_meta.size;
}
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
ASSERT_OK(iter1->status());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,5", FilesPerLevel(0));
live_files_meta.clear();
dbfull()->GetLiveFilesMetaData(&live_files_meta);
ASSERT_EQ(live_files_meta.size(), 5);
live_sst_files_size = 0;
total_sst_files_size = 0;
for (const auto& file_meta : live_files_meta) {
live_sst_files_size += file_meta.size;
}
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
ASSERT_OK(iter2->status());
for (int i = 0; i < 5; i++) {
ASSERT_OK(Delete(Key(i)));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel(0));
live_files_meta.clear();
dbfull()->GetLiveFilesMetaData(&live_files_meta);
ASSERT_EQ(live_files_meta.size(), 0);
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(total_sst_files_size, 5 * single_file_size);
ASSERT_OK(iter1->status());
iter1.reset();
ASSERT_OK(iter2->status());
iter2.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
ASSERT_EQ(total_sst_files_size, 0);
}
TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.enable_blob_files = true;
options.min_blob_size = 0;
options.disable_auto_compactions = true;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 0.5;
options.atomic_flush = true;
int files_added = 0;
int files_deleted = 0;
int files_scheduled_to_delete = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_added++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
Random rnd(301);
ASSERT_OK(Put("key_1", "value_1"));
ASSERT_OK(Put("key_2", "value_2"));
ASSERT_OK(Put("key_3", "value_3"));
ASSERT_OK(Put("key_4", "value_4"));
ASSERT_OK(Flush());
ASSERT_OK(Put("key_3", "new_value_3"));
ASSERT_OK(Put("key_4", "new_value_4"));
ASSERT_OK(Flush());
ASSERT_OK(Put("Key5", "blob_value5"));
ASSERT_OK(Put("Key6", "blob_value6"));
ASSERT_OK(Flush());
ASSERT_EQ(files_added, 3);
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
files_added = 0;
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(files_added, 1);
ASSERT_EQ(files_scheduled_to_delete, 1);
sfm->WaitForEmptyTrash();
ASSERT_EQ(files_deleted, 1);
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_EQ(files_scheduled_to_delete, 4);
sfm->WaitForEmptyTrash();
ASSERT_EQ(files_deleted, 4);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(DBSSTTest, SstGetFileSizeFails) {
ASSERT_OK(Put("x", "zaphod"));
ASSERT_OK(Flush());
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(1U, metadata.size());
std::string filename = dbname_ + metadata[0].name;
std::shared_ptr<FaultInjectionTestFS> fault_fs =
std::make_shared<FaultInjectionTestFS>(
CurrentOptions().env->GetFileSystem());
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
Options options = CurrentOptions();
options.env = fault_fs_env.get();
options.paranoid_checks = false;
for (int i = 0; i < 4; i++) {
SCOPED_TRACE("Iteration = " + std::to_string(i));
fault_fs->SetFailRandomAccessGetFileSizeSst(false);
fault_fs->SetFailFilesystemGetFileSizeSst(false);
Close();
if (i == 1) {
fault_fs->SetFailRandomAccessGetFileSizeSst(true);
} else if (i == 2) {
fault_fs->SetFailFilesystemGetFileSizeSst(true);
} else if (i == 3) {
fault_fs->SetFailRandomAccessGetFileSizeSst(true);
fault_fs->SetFailFilesystemGetFileSizeSst(true);
}
ASSERT_OK(TryReopen(options));
std::string value;
Status get_status = db_->Get({}, "x", &value);
if (i < 2) {
ASSERT_OK(get_status);
} else if (i == 2) {
if (encrypted_env_) {
ASSERT_EQ(get_status.code(), Status::Code::kIOError);
ASSERT_STREQ(get_status.getState(), "FileSystem::GetFileSize failed");
} else {
ASSERT_OK(get_status);
}
} else {
ASSERT_EQ(i, 3);
ASSERT_EQ(get_status.code(), Status::Code::kIOError);
ASSERT_STREQ(get_status.getState(), "FileSystem::GetFileSize failed");
}
}
Close();
}
}
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}