#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_impl_secondary.h"
#include "db/db_test_util.h"
#include "options/options_helper.h"
#include "port/stack_trace.h"
#include "rocksdb/db.h"
#include "rocksdb/sst_file_writer.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
namespace ROCKSDB_NAMESPACE {
// Coordinates a single asynchronous call to DBImpl::AbortAllCompactions().
// The abort runs on a dedicated thread so the sync-point callback that
// triggers it never blocks inside the DB itself.
//
// TriggerAbort() may be called concurrently from multiple (sub)compaction
// threads, so it is serialized with trigger_mutex_: without it, two callers
// that both observe a completed previous round could both enter the reset
// path and race to join() the same thread.
class AbortSynchronizer {
 public:
  AbortSynchronizer() : abort_cv_(&abort_mutex_) {}
  ~AbortSynchronizer() {
    if (abort_thread_.joinable()) {
      abort_thread_.join();
    }
  }
  AbortSynchronizer(const AbortSynchronizer&) = delete;
  AbortSynchronizer& operator=(const AbortSynchronizer&) = delete;

  // Starts the abort thread exactly once per round. If a previous round has
  // fully completed, state is reset first so a new round can begin.
  void TriggerAbort(DBImpl* db) {
    MutexLock l(&trigger_mutex_);
    if (abort_triggered_.load() && abort_completed_.load()) {
      ResetUnlocked();
    }
    if (!abort_triggered_.exchange(true)) {
      abort_thread_ = std::thread([this, db]() {
        db->AbortAllCompactions();
        SignalAbortCompleted();
      });
    }
  }

  // Blocks until the abort thread has finished AbortAllCompactions().
  void WaitForAbortCompletion() {
    MutexLock l(&abort_mutex_);
    while (!abort_completed_.load()) {
      abort_cv_.Wait();
    }
  }

  // Joins any finished abort thread and clears the round state.
  void Reset() {
    MutexLock l(&trigger_mutex_);
    ResetUnlocked();
  }

  bool IsAbortTriggered() const { return abort_triggered_.load(); }

 private:
  // REQUIRES: trigger_mutex_ held.
  void ResetUnlocked() {
    if (abort_thread_.joinable()) {
      abort_thread_.join();
    }
    abort_triggered_.store(false);
    abort_completed_.store(false);
  }

  void SignalAbortCompleted() {
    MutexLock l(&abort_mutex_);
    abort_completed_.store(true);
    abort_cv_.SignalAll();
  }

  std::atomic<bool> abort_triggered_{false};
  std::atomic<bool> abort_completed_{false};
  // Serializes TriggerAbort()/Reset(). Deliberately distinct from
  // abort_mutex_: the abort thread takes abort_mutex_ inside
  // SignalAbortCompleted(), so joining it while holding trigger_mutex_
  // cannot deadlock.
  port::Mutex trigger_mutex_;
  port::Mutex abort_mutex_;
  port::CondVar abort_cv_;
  std::thread abort_thread_;
};
// Turn off sync-point processing and drop every registered callback.
inline void CleanupSyncPoints() {
  auto* sync = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync->DisableProcessing();
  sync->ClearAllCallBacks();
}
// Wires a sync point to an asynchronous compaction abort: when
// `trigger_point` fires, AbortAllCompactions() is launched on a background
// thread and the firing thread blocks (via the loaded dependency) until the
// abort flag has been set.
class SyncPointAbortHelper {
 public:
  explicit SyncPointAbortHelper(const std::string& trigger_point)
      : trigger_point_(trigger_point) {}

  void Setup(DBImpl* db_impl) {
    db_impl_ = db_impl;
    auto* sync = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
    // The trigger-point thread waits at kWaitPointName until the abort flag
    // has been set by the background abort.
    sync->LoadDependency({
        {"DBImpl::AbortAllCompactions:FlagSet", kWaitPointName},
    });
    sync->SetCallBack(trigger_point_, [this](void* /*arg*/) {
      abort_sync_.TriggerAbort(db_impl_);
      TEST_SYNC_POINT_CALLBACK(kWaitPointName, nullptr);
    });
    sync->EnableProcessing();
  }

  void WaitForAbortCompletion() { abort_sync_.WaitForAbortCompletion(); }

  // Disables sync points first so no further callbacks fire, then waits for
  // the in-flight abort to finish.
  void CleanupAndWait() {
    CleanupSyncPoints();
    WaitForAbortCompletion();
  }

 private:
  static constexpr const char* kWaitPointName =
      "SyncPointAbortHelper::WaitForAbort";
  std::string trigger_point_;
  DBImpl* db_impl_{nullptr};
  AbortSynchronizer abort_sync_;
};
// Shared fixture: records the expected key/value contents of the DB and
// (optionally) a Statistics object so tests can check abort-related tickers.
class DBCompactionAbortTest : public DBTestBase {
 public:
  DBCompactionAbortTest()
      : DBTestBase("db_compaction_abort_test", false) {}

 protected:
  std::unordered_map<std::string, std::string> expected_values_;
  std::shared_ptr<Statistics> stats_;

  // Returns CurrentOptions() with a fresh Statistics object attached.
  Options GetOptionsWithStats() {
    Options options = CurrentOptions();
    stats_ = CreateDBStatistics();
    options.statistics = stats_;
    return options;
  }

  // Writes `num_files` flushed files of `keys_per_file` random values each.
  // With `overlapping` the files share one key range; otherwise the key
  // ranges are disjoint. Every written pair is recorded in expected_values_.
  void PopulateData(int num_files, int keys_per_file, int value_size,
                    bool overlapping = true, int seed = 301) {
    Random rnd(seed);
    for (int file = 0; file < num_files; ++file) {
      for (int k = 0; k < keys_per_file; ++k) {
        const int key_index = overlapping ? k : (k + file * keys_per_file);
        const std::string key = Key(key_index);
        std::string value = rnd.RandomString(value_size);
        ASSERT_OK(Put(key, value));
        expected_values_[key] = value;
      }
      ASSERT_OK(Flush());
    }
  }

  // Reads keys [start_key, start_key + num_keys) and checks each against
  // expected_values_; keys not recorded there are only checked for presence.
  void VerifyDataIntegrity(int num_keys, int start_key = 0) {
    std::string val;
    const int end_key = start_key + num_keys;
    for (int k = start_key; k < end_key; ++k) {
      const std::string key = Key(k);
      ASSERT_OK(dbfull()->Get(ReadOptions(), key, &val));
      auto it = expected_values_.find(key);
      if (it != expected_values_.end()) {
        ASSERT_EQ(it->second, val) << "Value mismatch for key: " << key;
      }
    }
  }

  void ClearExpectedValues() { expected_values_.clear(); }

  // Aborts a full-range compaction at `trigger_point`, verifies the abort
  // status (plus the COMPACTION_ABORTED ticker when stats are enabled), then
  // resumes and re-runs the compaction to completion.
  void RunSyncPointAbortTest(const std::string& trigger_point,
                             CompactRangeOptions cro = CompactRangeOptions()) {
    uint64_t prior_aborts = 0;
    uint64_t prior_write_bytes = 0;
    if (stats_) {
      prior_aborts = stats_->getTickerCount(COMPACTION_ABORTED);
      prior_write_bytes = stats_->getTickerCount(COMPACT_WRITE_BYTES);
    }
    SyncPointAbortHelper helper(trigger_point);
    helper.Setup(dbfull());
    Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
    ASSERT_TRUE(s.IsIncomplete());
    ASSERT_TRUE(s.IsCompactionAborted());
    if (stats_) {
      ASSERT_GT(stats_->getTickerCount(COMPACTION_ABORTED), prior_aborts)
          << "COMPACTION_ABORTED stat should increase after abort";
    }
    helper.CleanupAndWait();
    dbfull()->ResumeAllCompactions();
    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
    if (stats_) {
      ASSERT_GT(stats_->getTickerCount(COMPACT_WRITE_BYTES), prior_write_bytes)
          << "COMPACT_WRITE_BYTES should increase after successful compaction";
    }
  }
};
// Parameterized on the max_subcompactions setting used by the test body.
class DBCompactionAbortSubcompactionTest
    : public DBCompactionAbortTest,
      public ::testing::WithParamInterface<int> {};
// Abort a manual full compaction right before subcompactions start, under a
// varying max_subcompactions setting.
TEST_P(DBCompactionAbortSubcompactionTest, AbortWithVaryingSubcompactions) {
  Options options = GetOptionsWithStats();
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = GetParam();
  options.disable_auto_compactions = true;
  Reopen(options);
  PopulateData(4, 100, 100);
  RunSyncPointAbortTest("CompactionJob::RunSubcompactions:BeforeStart");
  VerifyDataIntegrity(100);
}
INSTANTIATE_TEST_CASE_P(
    SubcompactionVariants, DBCompactionAbortSubcompactionTest,
    ::testing::Values(1, 2, 4),
    [](const ::testing::TestParamInfo<int>& info) {
      return "MaxSubcompactionCount_" + std::to_string(info.param);
    });
// Parameterized on compaction style; ConfigureOptionsForStyle applies the
// per-style option tweaks needed by the abort test.
class DBCompactionAbortStyleTest
    : public DBCompactionAbortTest,
      public ::testing::WithParamInterface<CompactionStyle> {
 protected:
  void ConfigureOptionsForStyle(Options& options, CompactionStyle style) {
    options.compaction_style = style;
    options.level0_file_num_compaction_trigger = 4;
    options.disable_auto_compactions = true;
    if (style == kCompactionStyleUniversal) {
      options.compaction_options_universal.size_ratio = 10;
    } else if (style == kCompactionStyleFIFO) {
      // FIFO is configured with compaction enabled and all files kept open.
      options.compaction_options_fifo.max_table_files_size =
          100 * 1024 * 1024;
      options.compaction_options_fifo.allow_compaction = true;
      options.max_open_files = -1;
    }
    // kCompactionStyleLevel (and any other style) needs no extra tweaks.
  }
};
// Abort before subcompactions start, once per compaction style.
TEST_P(DBCompactionAbortStyleTest, AbortCompaction) {
  Options options = GetOptionsWithStats();
  options.max_subcompactions = 1;
  ConfigureOptionsForStyle(options, GetParam());
  Reopen(options);
  PopulateData(4, 100, 100);
  RunSyncPointAbortTest("CompactionJob::RunSubcompactions:BeforeStart");
  VerifyDataIntegrity(100);
}
INSTANTIATE_TEST_CASE_P(
    CompactionStyleVariants, DBCompactionAbortStyleTest,
    ::testing::Values(kCompactionStyleLevel, kCompactionStyleUniversal,
                      kCompactionStyleFIFO),
    [](const ::testing::TestParamInfo<CompactionStyle>& info) {
      return OptionsHelper::compaction_style_to_string.at(info.param);
    });
// Abort an exclusive manual compaction while it is processing key-values.
TEST_F(DBCompactionAbortTest, AbortManualCompaction) {
  Options options = GetOptionsWithStats();
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = 10;
  Reopen(options);
  PopulateData(5, 100, 1000);
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = true;
  RunSyncPointAbortTest("CompactionJob::ProcessKeyValueCompaction:Start", cro);
  VerifyDataIntegrity(100);
}
// Abort fired from inside an automatic (background) compaction; writes,
// flushes, and compactions must still work after resume.
TEST_F(DBCompactionAbortTest, AbortAutomaticCompaction) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 2;
  options.disable_auto_compactions = false;
  Reopen(options);
  Random rnd(301);
  AbortSynchronizer abort_sync;
  // Trigger the abort from the compaction thread itself, as soon as it
  // starts processing key-values.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::ProcessKeyValueCompaction:Start",
      [&](void* /*arg*/) { abort_sync.TriggerAbort(dbfull()); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Four flushed files reach the L0 trigger and start a background
  // compaction, which the callback above then aborts.
  for (int i = 0; i < 4; ++i) {
    for (int j = 0; j < 100; ++j) {
      ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  CleanupSyncPoints();
  abort_sync.WaitForAbortCompletion();
  dbfull()->ResumeAllCompactions();
  // After resuming, another round of writes + flush must compact normally.
  for (int j = 0; j < 100; ++j) {
    ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // All keys must still be readable after the abort/resume cycle.
  std::string val;
  for (int j = 0; j < 100; ++j) {
    ASSERT_OK(dbfull()->Get(ReadOptions(), Key(j), &val));
  }
}
// After an abort no partial output may be installed: the LSM shape must be
// unchanged, and a later resumed compaction must fully succeed.
TEST_F(DBCompactionAbortTest, AbortAndVerifyNoOutputFiles) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  PopulateData(4, 100, 1000);
  const int l0_before = NumTableFilesAtLevel(0);
  const int l1_before = NumTableFilesAtLevel(1);
  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
  helper.Setup(dbfull());
  CompactRangeOptions cro;
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  CleanupSyncPoints();
  // The aborted compaction must not have changed the LSM shape.
  ASSERT_EQ(l0_before, NumTableFilesAtLevel(0));
  ASSERT_EQ(l1_before, NumTableFilesAtLevel(1));
  helper.WaitForAbortCompletion();
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  // The resumed compaction drains L0 into L1.
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(1), 0);
  VerifyDataIntegrity(100);
}
// Three consecutive abort/resume cycles, then one final successful
// compaction.
TEST_F(DBCompactionAbortTest, MultipleAbortResumeSequence) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  PopulateData(4, 100, 1000);
  CompactRangeOptions cro;
  for (int round = 0; round < 3; ++round) {
    SyncPointAbortHelper helper(
        "CompactionJob::ProcessKeyValueCompaction:Start");
    helper.Setup(dbfull());
    Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
    ASSERT_TRUE(s.IsIncomplete());
    ASSERT_TRUE(s.IsCompactionAborted());
    helper.CleanupAndWait();
    dbfull()->ResumeAllCompactions();
  }
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  VerifyDataIntegrity(100);
}
// Abort before subcompactions start with a small target file size; no L1
// output may be installed, and the resumed compaction must fill L1.
TEST_F(DBCompactionAbortTest, AbortWithOutputFilesCleanup) {
  Options options = CurrentOptions();
  options.num_levels = 2;
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 2;
  options.disable_auto_compactions = true;
  options.target_file_size_base = 50 * 1024;
  Reopen(options);
  PopulateData(4, 100, 100);
  SyncPointAbortHelper helper("CompactionJob::RunSubcompactions:BeforeStart");
  helper.Setup(dbfull());
  CompactRangeOptions cro;
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  CleanupSyncPoints();
  // No partial output may have reached L1.
  ASSERT_EQ(0, NumTableFilesAtLevel(1));
  helper.WaitForAbortCompletion();
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ(0, NumTableFilesAtLevel(0))
      << "L0 should be empty after successful compaction";
  ASSERT_GT(NumTableFilesAtLevel(1), 0)
      << "L1 should have files after successful compaction";
  VerifyDataIntegrity(100);
}
// Repeated abort/resume calls: after two aborts, the test expects a single
// resume to still leave compactions aborted; a second resume re-enables them.
TEST_F(DBCompactionAbortTest, NestedAbortResumeCalls) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  PopulateData(4, 100, 1000);
  dbfull()->AbortAllCompactions();
  dbfull()->AbortAllCompactions();
  dbfull()->ResumeAllCompactions();
  CompactRangeOptions cro;
  // Still aborted: only one of the two aborts has been resumed.
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  VerifyDataIntegrity(100);
}
// The CompactFiles() API must observe abort and succeed after resume.
TEST_F(DBCompactionAbortTest, AbortCompactFilesAPI) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 100;
  options.disable_auto_compactions = true;
  Reopen(options);
  PopulateData(4, 100, 1000);
  // Collect all L0 files as the CompactFiles input set.
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  std::vector<std::string> files_to_compact;
  for (const auto& file : cf_meta.levels[0].files) {
    files_to_compact.push_back(file.name);
  }
  ASSERT_GE(files_to_compact.size(), 2);
  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
  helper.Setup(dbfull());
  CompactionOptions compact_options;
  Status s = dbfull()->CompactFiles(compact_options, files_to_compact, 1);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  helper.CleanupAndWait();
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactFiles(compact_options, files_to_compact, 1));
  VerifyDataIntegrity(100);
}
// Aborting compactions must not interfere with flushes.
TEST_F(DBCompactionAbortTest, AbortDoesNotAffectFlush) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 100;
  options.disable_auto_compactions = true;
  Reopen(options);
  Random rnd(301);
  for (int k = 0; k < 100; ++k) {
    ASSERT_OK(Put(Key(k), rnd.RandomString(1000)));
  }
  dbfull()->AbortAllCompactions();
  // Flushes must keep working while compactions are aborted.
  ASSERT_OK(Flush());
  for (int k = 100; k < 200; ++k) {
    ASSERT_OK(Put(Key(k), rnd.RandomString(1000)));
  }
  ASSERT_OK(Flush());
  dbfull()->ResumeAllCompactions();
  VerifyDataIntegrity(200);
}
// An abort issued before any compaction exists still fails the next manual
// compaction until a resume is issued.
TEST_F(DBCompactionAbortTest, AbortBeforeCompactionStarts) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.disable_auto_compactions = true;
  Reopen(options);
  PopulateData(4, 100, 1000);
  dbfull()->AbortAllCompactions();
  CompactRangeOptions cro;
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
}
// Aborts a compaction while it is actively writing blob and SST output, then
// verifies no orphaned in-progress files remain: the set of blob/SST files on
// disk must match the set referenced by the column family metadata.
TEST_F(DBCompactionAbortTest, AbortWithInProgressFileCleanup) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  // Single subcompaction so the blob-write counter below sees one writer.
  options.max_subcompactions = 1;
  options.disable_auto_compactions = true;
  options.target_file_size_base = 32 * 1024;
  // Blob settings force value separation (min_blob_size = 0) and aggressive
  // GC, so the compaction rewrites blob files — giving the test an
  // in-progress blob file to abort.
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  options.blob_file_size = 1024 * 1024;
  options.enable_blob_garbage_collection = true;
  options.blob_garbage_collection_age_cutoff = 1.0;
  options.blob_garbage_collection_force_threshold = 0.0;
  Reopen(options);
  PopulateData(4, 2000, 500);
  // Lists ".blob" files present in the DB directory, sorted by name.
  auto GetBlobFilesOnDisk = [this]() -> std::vector<std::string> {
    std::vector<std::string> blob_files;
    std::vector<std::string> files;
    EXPECT_OK(env_->GetChildren(dbname_, &files));
    for (const auto& f : files) {
      if (f.find(".blob") != std::string::npos) {
        blob_files.push_back(f);
      }
    }
    std::sort(blob_files.begin(), blob_files.end());
    return blob_files;
  };
  // Lists blob file numbers referenced by the CF metadata, sorted.
  auto GetBlobFilesInMetadata = [this]() -> std::vector<uint64_t> {
    std::vector<uint64_t> blob_file_numbers;
    ColumnFamilyMetaData cf_meta;
    dbfull()->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta);
    for (const auto& blob_meta : cf_meta.blob_files) {
      blob_file_numbers.push_back(blob_meta.blob_file_number);
    }
    std::sort(blob_file_numbers.begin(), blob_file_numbers.end());
    return blob_file_numbers;
  };
  // Lists ".sst" files present in the DB directory, sorted by name.
  auto GetSstFilesOnDisk = [this]() -> std::vector<std::string> {
    std::vector<std::string> sst_files;
    std::vector<std::string> files;
    EXPECT_OK(env_->GetChildren(dbname_, &files));
    for (const auto& f : files) {
      if (f.find(".sst") != std::string::npos) {
        sst_files.push_back(f);
      }
    }
    std::sort(sst_files.begin(), sst_files.end());
    return sst_files;
  };
  // Lists SST file numbers referenced by the CF metadata (parsed from the
  // leading digits of each file name), sorted.
  auto GetSstFilesInMetadata = [this]() -> std::vector<uint64_t> {
    std::vector<uint64_t> sst_file_numbers;
    ColumnFamilyMetaData cf_meta;
    dbfull()->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta);
    for (const auto& level : cf_meta.levels) {
      for (const auto& file : level.files) {
        uint64_t file_num = 0;
        std::string fname = file.name;
        // Strip any directory prefix before parsing the file number.
        size_t pos = fname.rfind('/');
        if (pos != std::string::npos) {
          fname = fname.substr(pos + 1);
        }
        if (sscanf(fname.c_str(), "%" PRIu64, &file_num) == 1) {
          sst_file_numbers.push_back(file_num);
        }
      }
    }
    std::sort(sst_file_numbers.begin(), sst_file_numbers.end());
    return sst_file_numbers;
  };
  // Sanity-check the baseline: disk and metadata must agree before the abort.
  std::vector<std::string> initial_blob_files = GetBlobFilesOnDisk();
  std::vector<uint64_t> initial_meta_blobs = GetBlobFilesInMetadata();
  std::vector<std::string> initial_sst_files = GetSstFilesOnDisk();
  std::vector<uint64_t> initial_meta_ssts = GetSstFilesInMetadata();
  ASSERT_GT(initial_blob_files.size(), 0u) << "Expected initial blob files";
  ASSERT_EQ(initial_blob_files.size(), initial_meta_blobs.size())
      << "Initial blob files should match between disk and metadata";
  ASSERT_GT(initial_sst_files.size(), 0u) << "Expected initial SST files";
  ASSERT_EQ(initial_sst_files.size(), initial_meta_ssts.size())
      << "Initial SST files should match between disk and metadata";
  std::atomic<int> blob_writes{0};
  // Local once-guard in addition to AbortSynchronizer's internal guard; it
  // also keeps the count==100 check from re-firing on later writes.
  std::atomic<bool> abort_triggered{false};
  AbortSynchronizer abort_sync;
  // Block the blob-writing thread until the abort flag has been set.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::AbortAllCompactions:FlagSet",
       "DBCompactionAbortTest::InProgressBlob:WaitForAbort"},
  });
  // Trigger the abort mid-write: after the 100th blob record of the
  // compaction, so output files are genuinely in progress.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "BlobFileBuilder::WriteBlobToFile:AddRecord", [&](void* /*arg*/) {
        int count = blob_writes.fetch_add(1) + 1;
        if (count == 100 && !abort_triggered.exchange(true)) {
          abort_sync.TriggerAbort(dbfull());
          TEST_SYNC_POINT_CALLBACK(
              "DBCompactionAbortTest::InProgressBlob:WaitForAbort", nullptr);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  CompactRangeOptions cro;
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete())
      << "Expected compaction to be aborted, got: " << s.ToString();
  CleanupSyncPoints();
  abort_sync.WaitForAbortCompletion();
  // After the abort, every file on disk must still be accounted for in the
  // metadata — any excess on disk is an orphaned in-progress output.
  std::vector<std::string> post_abort_disk_blobs = GetBlobFilesOnDisk();
  std::vector<uint64_t> post_abort_meta_blobs = GetBlobFilesInMetadata();
  std::vector<std::string> post_abort_disk_ssts = GetSstFilesOnDisk();
  std::vector<uint64_t> post_abort_meta_ssts = GetSstFilesInMetadata();
  ASSERT_EQ(post_abort_disk_blobs.size(), post_abort_meta_blobs.size())
      << "Orphan blob file detected! In-progress blob file was not cleaned up "
         "after abort. Files on disk: "
      << post_abort_disk_blobs.size()
      << ", Files in metadata: " << post_abort_meta_blobs.size()
      << ". The difference indicates orphaned in-progress blob file(s).";
  ASSERT_EQ(post_abort_disk_ssts.size(), post_abort_meta_ssts.size())
      << "Orphan SST file detected! In-progress SST file was not cleaned up "
         "after abort. Files on disk: "
      << post_abort_disk_ssts.size()
      << ", Files in metadata: " << post_abort_meta_ssts.size()
      << ". The difference indicates orphaned in-progress SST file(s).";
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  VerifyDataIntegrity(2000);
}
// Abort a forced bottommost-level compaction, then resume and finish it.
TEST_F(DBCompactionAbortTest, AbortBottommostLevelCompaction) {
  Options options = CurrentOptions();
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 1024 * 10;
  options.max_bytes_for_level_multiplier = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  // Push non-overlapping files down first, then add overlapping files on top
  // so the forced bottommost compaction has work to do.
  PopulateData(6, 100, 500, false);
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  PopulateData(4, 100, 500);
  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
  helper.Setup(dbfull());
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  helper.CleanupAndWait();
  dbfull()->ResumeAllCompactions();
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  VerifyDataIntegrity(600);
}
// After an aborted compaction, an atomic range-replace ingestion must still
// work: keys 0..49 get the ingested values and, per the assertions below,
// all other keys disappear.
TEST_F(DBCompactionAbortTest, AbortThenAtomicRangeReplace) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_subcompactions = 2;
  options.disable_auto_compactions = true;
  Reopen(options);
  std::string sst_files_dir = dbname_ + "_sst_files/";
  ASSERT_OK(env_->CreateDirIfMissing(sst_files_dir));
  PopulateData(4, 100, 500);
  VerifyDataIntegrity(100);
  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
  helper.Setup(dbfull());
  CompactRangeOptions cro;
  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(s.IsCompactionAborted());
  helper.CleanupAndWait();
  // Build an external SST holding replacement values for keys 0..49.
  std::string sst_file_path = sst_files_dir + "atomic_replace_1.sst";
  SstFileWriter sst_file_writer(EnvOptions(), options);
  ASSERT_OK(sst_file_writer.Open(sst_file_path));
  Random rnd(42);
  std::unordered_map<std::string, std::string> new_values;
  for (int j = 0; j < 50; ++j) {
    std::string key = Key(j);
    std::string value = "replaced_" + rnd.RandomString(100);
    ASSERT_OK(sst_file_writer.Put(key, value));
    new_values[key] = value;
  }
  ASSERT_OK(sst_file_writer.Finish());
  // Ingest with a default-constructed RangeOpt as the atomic replace range;
  // the assertions below expect this to replace the entire CF contents.
  IngestExternalFileArg arg;
  arg.column_family = db_->DefaultColumnFamily();
  arg.external_files = {sst_file_path};
  arg.atomic_replace_range = RangeOpt();
  arg.options.snapshot_consistency = false;
  ASSERT_OK(db_->IngestExternalFiles({arg}));
  dbfull()->ResumeAllCompactions();
  // Replaced keys must return the ingested values.
  std::string val;
  for (int j = 0; j < 50; ++j) {
    std::string key = Key(j);
    ASSERT_OK(db_->Get(ReadOptions(), key, &val));
    auto it = new_values.find(key);
    ASSERT_NE(it, new_values.end());
    ASSERT_EQ(it->second, val) << "Value mismatch for replaced key: " << key;
  }
  // Keys not present in the ingested file must be gone after the replace.
  for (int j = 50; j < 100; ++j) {
    std::string key = Key(j);
    Status get_status = db_->Get(ReadOptions(), key, &val);
    ASSERT_TRUE(get_status.IsNotFound())
        << "Key " << key << " should not exist after full CF replace";
  }
  ASSERT_OK(DestroyDir(env_, sst_files_dir));
}
}
int main(int argc, char** argv) {
  // Print a stack trace on crashes to ease debugging of test failures.
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}