#include "db/db_test_util.h"
#include "db/periodic_task_scheduler.h"
#include "db/seqno_to_time_mapping.h"
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/mock_time_env.h"
namespace ROCKSDB_NAMESPACE {
class SeqnoTimeTest : public DBTestBase {
public:
SeqnoTimeTest() : DBTestBase("seqno_time_test", false) {
mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
mock_clock_->SetCurrentTime(kMockStartTime);
mock_env_ = std::make_unique<CompositeEnvWrapper>(env_, mock_clock_);
}
protected:
std::unique_ptr<Env> mock_env_;
std::shared_ptr<MockSystemClock> mock_clock_;
static constexpr uint32_t kMockStartTime = 10000000;
void SetUp() override {
mock_clock_->InstallTimedWaitFixCallback();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartPeriodicTaskScheduler:Init",
[mock_clock = mock_clock_](void* arg) {
auto periodic_task_scheduler_ptr =
static_cast<PeriodicTaskScheduler*>(arg);
periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock.get());
});
mock_clock_->SetCurrentTime(kMockStartTime);
}
void AssertKeyTemperature(int key_id, Temperature expected_temperature) {
get_iostats_context()->Reset();
IOStatsContext* iostats = get_iostats_context();
std::string result = Get(Key(key_id));
ASSERT_FALSE(result.empty());
ASSERT_GT(iostats->bytes_read, 0);
switch (expected_temperature) {
case Temperature::kUnknown:
ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count,
0);
ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read,
0);
break;
case Temperature::kCold:
ASSERT_GT(iostats->file_io_stats_by_temperature.cold_file_read_count,
0);
ASSERT_GT(iostats->file_io_stats_by_temperature.cold_file_bytes_read,
0);
break;
default:
FAIL();
}
}
};
TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
const int kKeyPerSec = 10;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.preclude_last_level_data_seconds = 10000;
options.env = mock_env_.get();
options.last_level_temperature = Temperature::kCold;
options.num_levels = kNumLevels;
DestroyAndReopen(options);
int sst_num = 0;
for (; sst_num < kNumTrigger; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
});
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
AssertKeyTemperature(20, Temperature::kUnknown);
for (; sst_num < kNumTrigger * 2; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
});
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
}
for (; sst_num < kNumTrigger * 3; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
});
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
uint64_t hot_data_size = GetSstSizeHelper(Temperature::kUnknown);
uint64_t cold_data_size = GetSstSizeHelper(Temperature::kCold);
ASSERT_GT(hot_data_size, 0);
ASSERT_GT(cold_data_size, 0);
AssertKeyTemperature(20, Temperature::kCold);
for (int i = 0; i < 30; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(20 * kKeyPerSec));
});
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
AssertKeyTemperature(i * 20 + 250, Temperature::kUnknown);
AssertKeyTemperature(i * 20 + 200, Temperature::kCold);
}
ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), cold_data_size);
for (int i = 0; i < 5; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
}
AssertKeyTemperature(1000, Temperature::kCold);
Close();
}
TEST_F(SeqnoTimeTest, TemperatureBasicLevel) {
const int kNumLevels = 7;
const int kNumKeys = 100;
Options options = CurrentOptions();
options.preclude_last_level_data_seconds = 10000;
options.env = mock_env_.get();
options.last_level_temperature = Temperature::kCold;
options.num_levels = kNumLevels;
options.level_compaction_dynamic_level_bytes = true;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
int sst_num = 0;
for (; sst_num < 4; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
}
ASSERT_OK(Flush());
}
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
AssertKeyTemperature(20, Temperature::kUnknown);
for (; sst_num < 14; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
}
ASSERT_OK(Flush());
}
MoveFilesToLevel(5);
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
MoveFilesToLevel(6);
uint64_t hot_data_size = GetSstSizeHelper(Temperature::kUnknown);
uint64_t cold_data_size = GetSstSizeHelper(Temperature::kCold);
ASSERT_GT(hot_data_size, 0);
ASSERT_GT(cold_data_size, 0);
AssertKeyTemperature(20, Temperature::kCold);
for (int i = 0; i < 30; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(200)); });
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
uint64_t pre_hot = hot_data_size;
uint64_t pre_cold = cold_data_size;
hot_data_size = GetSstSizeHelper(Temperature::kUnknown);
cold_data_size = GetSstSizeHelper(Temperature::kCold);
ASSERT_LT(hot_data_size, pre_hot);
ASSERT_GT(cold_data_size, pre_cold);
AssertKeyTemperature(i * 20 + 450, Temperature::kUnknown);
AssertKeyTemperature(i * 20 + 400, Temperature::kCold);
}
for (int i = 0; i < 5; i++) {
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1000)); });
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
}
AssertKeyTemperature(1000, Temperature::kCold);
Close();
}
enum class SeqnoTimeTestType : char {
kTrackInternalTimeSeconds = 0,
kPrecludeLastLevel = 1,
kBothSetTrackSmaller = 2,
};
class SeqnoTimeTablePropTest
: public SeqnoTimeTest,
public ::testing::WithParamInterface<SeqnoTimeTestType> {
public:
SeqnoTimeTablePropTest() : SeqnoTimeTest() {}
void SetTrackTimeDurationOptions(uint64_t track_time_duration,
Options& options) const {
switch (GetParam()) {
case SeqnoTimeTestType::kTrackInternalTimeSeconds:
options.preclude_last_level_data_seconds = 0;
options.preserve_internal_time_seconds = track_time_duration;
break;
case SeqnoTimeTestType::kPrecludeLastLevel:
options.preclude_last_level_data_seconds = track_time_duration;
options.preserve_internal_time_seconds = 0;
break;
case SeqnoTimeTestType::kBothSetTrackSmaller:
options.preclude_last_level_data_seconds = track_time_duration;
options.preserve_internal_time_seconds = track_time_duration / 10;
break;
}
}
};
INSTANTIATE_TEST_CASE_P(
SeqnoTimeTablePropTest, SeqnoTimeTablePropTest,
::testing::Values(SeqnoTimeTestType::kTrackInternalTimeSeconds,
SeqnoTimeTestType::kPrecludeLastLevel,
SeqnoTimeTestType::kBothSetTrackSmaller));
TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) {
Options options = CurrentOptions();
SetTrackTimeDurationOptions(10000, options);
options.env = mock_env_.get();
options.disable_auto_compactions = true;
DestroyAndReopen(options);
std::set<uint64_t> checked_file_nums;
SequenceNumber start_seq = dbfull()->GetLatestSequenceNumber() + 1;
uint64_t start_time = mock_clock_->NowSeconds();
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
}
ASSERT_OK(Flush());
TablePropertiesCollection tables_props;
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 1);
auto it = tables_props.begin();
SeqnoToTimeMapping tp_mapping;
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
ASSERT_FALSE(tp_mapping.Empty());
auto seqs = tp_mapping.TEST_GetInternalMapping();
EXPECT_GE(seqs.size(), 20 / 2);
EXPECT_LE(seqs.size(), 22);
auto ValidateProximalSeqnos = [&](const char* name, double fuzz_ratio) {
SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber() + 1;
uint64_t end_time = mock_clock_->NowSeconds();
uint64_t seqno_fuzz =
static_cast<uint64_t>((seq_end - start_seq) * fuzz_ratio + 0.999999);
for (unsigned time_pct = 0; time_pct <= 100; time_pct++) {
SCOPED_TRACE("name=" + std::string(name) +
" time_pct=" + std::to_string(time_pct));
uint64_t t = start_time + time_pct * (end_time - start_time) / 100;
auto seqno_reported = tp_mapping.GetProximalSeqnoBeforeTime(t);
auto seqno_expected = start_seq + time_pct * (seq_end - start_seq) / 100;
EXPECT_LE(seqno_reported, seqno_expected);
if (end_time - t < 10000) {
EXPECT_LE(seqno_expected, seqno_reported + seqno_fuzz);
}
}
start_seq = seq_end;
start_time = end_time;
};
ValidateProximalSeqnos("a", 0.1);
checked_file_nums.insert(it->second->orig_file_number);
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(Key(i + 190), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(1)); });
}
ASSERT_OK(Flush());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 2);
it = tables_props.begin();
while (it != tables_props.end()) {
if (!checked_file_nums.count(it->second->orig_file_number)) {
break;
}
it++;
}
ASSERT_TRUE(it != tables_props.end());
tp_mapping.Clear();
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 1);
ASSERT_LE(seqs.size(), 3);
ValidateProximalSeqnos("b", 0.5);
checked_file_nums.insert(it->second->orig_file_number);
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(Key(i + 380), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(200)); });
}
ASSERT_OK(Flush());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 3);
it = tables_props.begin();
while (it != tables_props.end()) {
if (!checked_file_nums.count(it->second->orig_file_number)) {
break;
}
it++;
}
ASSERT_TRUE(it != tables_props.end());
tp_mapping.Clear();
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 50);
ASSERT_LE(seqs.size(), 51);
ValidateProximalSeqnos("c", 0.04);
checked_file_nums.insert(it->second->orig_file_number);
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(Key(i + 570), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
ASSERT_OK(Flush());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 4);
it = tables_props.begin();
while (it != tables_props.end()) {
if (!checked_file_nums.count(it->second->orig_file_number)) {
break;
}
it++;
}
ASSERT_TRUE(it != tables_props.end());
tp_mapping.Clear();
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 99);
ASSERT_LE(seqs.size(), 101);
checked_file_nums.insert(it->second->orig_file_number);
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "false"},
}));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_GE(tables_props.size(), 1);
it = tables_props.begin();
while (it != tables_props.end()) {
if (!checked_file_nums.count(it->second->orig_file_number)) {
break;
}
it++;
}
ASSERT_TRUE(it != tables_props.end());
tp_mapping.Clear();
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 99);
ASSERT_LE(seqs.size(), 101);
ValidateProximalSeqnos("d", 0.02);
ASSERT_OK(db_->Close());
}
TEST_P(SeqnoTimeTablePropTest, MultiCFs) {
Options options = CurrentOptions();
options.preclude_last_level_data_seconds = 0;
options.preserve_internal_time_seconds = 0;
options.env = mock_env_.get();
options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0;
ReopenWithColumnFamilies({"default"}, options);
const PeriodicTaskScheduler& scheduler =
dbfull()->TEST_GetPeriodicTaskScheduler();
ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime));
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
ASSERT_OK(Flush());
TablePropertiesCollection tables_props;
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 1);
auto it = tables_props.begin();
ASSERT_TRUE(it->second->seqno_to_time_mapping.empty());
ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty());
Options options_1 = options;
SetTrackTimeDurationOptions(10000, options_1);
CreateColumnFamilies({"one"}, options_1);
ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime));
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
ASSERT_OK(Flush());
for (int i = 0; i < 20; i++) {
ASSERT_OK(Put(1, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
}
ASSERT_OK(Flush(1));
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[1], &tables_props));
ASSERT_EQ(tables_props.size(), 1);
it = tables_props.begin();
SeqnoToTimeMapping tp_mapping;
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
ASSERT_FALSE(tp_mapping.Empty());
auto seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 1);
ASSERT_LE(seqs.size(), 4);
Options options_2 = options;
SetTrackTimeDurationOptions(1000000, options_2); CreateColumnFamilies({"two"}, options_2);
for (int i = 0; i < 2000; i++) {
ASSERT_OK(Put(2, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 1000 - 1);
ASSERT_LE(seqs.size(), 1000 * 9 / 8);
ASSERT_OK(Flush(2));
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[2], &tables_props));
ASSERT_EQ(tables_props.size(), 1);
it = tables_props.begin();
tp_mapping.Clear();
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 100 - 1);
ASSERT_LE(seqs.size(), 100 + 1);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(0, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping();
ASSERT_OK(Flush(0));
for (int j = 0; j < 3; j++) {
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(2, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
ASSERT_OK(Flush(2));
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[2], &tables_props));
ASSERT_EQ(tables_props.size(), 1);
it = tables_props.begin();
tp_mapping.Clear();
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 99);
ASSERT_LE(seqs.size(), 101);
for (int i = 0; i < 200; i++) {
ASSERT_OK(Put(0, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
ASSERT_OK(Flush(0));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[0], &tables_props));
ASSERT_EQ(tables_props.size(), 1);
it = tables_props.begin();
ASSERT_TRUE(it->second->seqno_to_time_mapping.empty());
for (int i = 0; i < 1000; i++) {
ASSERT_OK(Put(2, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
ASSERT_GE(
dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(),
500);
ASSERT_OK(db_->DropColumnFamily(handles_[1]));
ASSERT_LE(
dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(),
100 + 5);
ASSERT_OK(db_->DropColumnFamily(handles_[2]));
ASSERT_EQ(
dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(),
0);
ASSERT_FALSE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime));
Close();
}
TEST_P(SeqnoTimeTablePropTest, MultiInstancesBasic) {
const int kInstanceNum = 2;
Options options = CurrentOptions();
SetTrackTimeDurationOptions(10000, options);
options.env = mock_env_.get();
options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0;
auto dbs = std::vector<std::unique_ptr<DB>>(kInstanceNum);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(
DB::Open(options, test::PerThreadDBPath(std::to_string(i)), &(dbs[i])));
}
auto dbi = static_cast_with_check<DBImpl>(dbs[1].get());
WriteOptions wo;
for (int i = 0; i < 200; i++) {
ASSERT_OK(dbi->Put(wo, Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(100)); });
}
SeqnoToTimeMapping seqno_to_time_mapping = dbi->TEST_GetSeqnoToTimeMapping();
ASSERT_GT(seqno_to_time_mapping.Size(), 10);
for (int i = 0; i < kInstanceNum; i++) {
ASSERT_OK(dbs[i]->Close());
dbs[i].reset();
}
}
TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
Options options = CurrentOptions();
SetTrackTimeDurationOptions(10000, options);
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = kNumLevels;
options.env = mock_env_.get();
DestroyAndReopen(options);
std::atomic_uint64_t num_seqno_zeroing{0};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput:ZeroingSeq",
[&](void* ) { num_seqno_zeroing++; });
SyncPoint::GetInstance()->EnableProcessing();
int sst_num = 0;
for (; sst_num < kNumTrigger - 1; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
}
ASSERT_OK(Flush());
}
TablePropertiesCollection tables_props;
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 3);
for (const auto& props : tables_props) {
ASSERT_FALSE(props.second->seqno_to_time_mapping.empty());
SeqnoToTimeMapping tp_mapping;
ASSERT_OK(tp_mapping.DecodeFrom(props.second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
ASSERT_FALSE(tp_mapping.Empty());
auto seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 10);
ASSERT_LE(seqs.size(), 10 + 2);
}
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
}
sst_num++;
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
tables_props.clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 1);
auto it = tables_props.begin();
SeqnoToTimeMapping tp_mapping;
ASSERT_FALSE(it->second->seqno_to_time_mapping.empty());
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
if (options.preclude_last_level_data_seconds > 0) {
ASSERT_GT(NumTableFilesAtLevel(5), 0);
ASSERT_EQ(NumTableFilesAtLevel(6), 0);
} else {
ASSERT_EQ(NumTableFilesAtLevel(5), 0);
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
tables_props.clear();
tp_mapping.Clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 1);
ASSERT_EQ(num_seqno_zeroing, 0);
it = tables_props.begin();
ASSERT_FALSE(it->second->seqno_to_time_mapping.empty());
ASSERT_OK(tp_mapping.DecodeFrom(it->second->seqno_to_time_mapping));
ASSERT_TRUE(tp_mapping.TEST_IsEnforced());
mock_clock_->MockSleepForSeconds(static_cast<int>(8000));
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
tables_props.clear();
tp_mapping.Clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
if (options.preclude_last_level_data_seconds > 0) {
ASSERT_EQ(tables_props.size(), 2);
} else {
ASSERT_EQ(tables_props.size(), 1);
}
ASSERT_GT(num_seqno_zeroing, 0);
std::vector<KeyVersion> key_versions;
ASSERT_OK(GetAllKeyVersions(
db_.get(), {}, {}, std::numeric_limits<size_t>::max(), &key_versions));
ASSERT_GT(key_versions.size(), 300);
for (int i = 0; i < 100; i++) {
ASSERT_EQ(key_versions[i].sequence, 0);
}
auto rit = key_versions.rbegin();
for (int i = 0; i < 100; i++) {
ASSERT_GT(rit->sequence, 0);
rit++;
}
mock_clock_->MockSleepForSeconds(static_cast<int>(20000));
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_GT(num_seqno_zeroing, 0);
ASSERT_GT(NumTableFilesAtLevel(6), 0);
Close();
}
TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
Options base_options = CurrentOptions();
base_options.env = mock_env_.get();
base_options.disable_auto_compactions = true;
base_options.create_missing_column_families = true;
Options track_options = base_options;
constexpr uint32_t kPreserveSecs = 1234567;
SetTrackTimeDurationOptions(kPreserveSecs, track_options);
SeqnoToTimeMapping sttm;
SequenceNumber latest_seqno;
uint64_t start_time, end_time;
for (bool with_write : {false, true}) {
SCOPED_TRACE("with_write=" + std::to_string(with_write));
DestroyAndReopen(base_options);
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_TRUE(sttm.Empty());
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
if (with_write) {
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
} else {
ASSERT_OK(Delete("blah"));
}
CreateColumnFamilies({"one"}, track_options);
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
latest_seqno = db_->GetLatestSequenceNumber();
start_time = mock_clock_->NowSeconds();
ASSERT_EQ(sttm.Size(), 1);
ASSERT_EQ(latest_seqno, 1U);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time), 1U);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - 1),
kUnknownSeqnoBeforeAll);
for (int i = 0; i < 20; i++) {
ASSERT_OK(Put(Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kPreserveSecs / 99));
});
}
ASSERT_OK(Flush());
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
latest_seqno = db_->GetLatestSequenceNumber();
end_time = mock_clock_->NowSeconds();
ASSERT_EQ(sttm.Size(), 21);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(end_time), latest_seqno);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time), 1U);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - 1),
kUnknownSeqnoBeforeAll);
}
for (bool with_write : {false, true}) {
SCOPED_TRACE("with_write=" + std::to_string(with_write));
DestroyAndReopen(base_options);
if (with_write) {
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
}
ASSERT_OK(ReadOnlyReopen(base_options));
if (with_write) {
ASSERT_EQ(Get("foo"), "bar");
}
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
if (!with_write) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
}
ASSERT_OK(ReadOnlyReopen(track_options));
if (with_write) {
ASSERT_EQ(Get("foo"), "bar");
}
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
if (!with_write) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
Reopen(track_options);
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 1);
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
}
}
DestroyAndReopen(track_options);
constexpr auto kPrePopPairs = kMaxSeqnoTimePairsPerSST;
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
latest_seqno = db_->GetLatestSequenceNumber();
start_time = mock_clock_->NowSeconds();
ASSERT_EQ(sttm.Size(), kPrePopPairs);
ASSERT_EQ(latest_seqno, kPrePopPairs);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time), latest_seqno);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs), 1);
for (auto ratio : {0.0, 0.433, 0.678, 0.987, 1.0}) {
uint64_t t = start_time - kPreserveSecs +
static_cast<uint64_t>(ratio * kPreserveSecs + 0.9999999);
SequenceNumber s =
static_cast<SequenceNumber>(ratio * (latest_seqno - 1)) + 1;
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(t), s);
}
for (int i = 0; i < 20; i++) {
ASSERT_OK(Put(Key(i), "value"));
dbfull()->TEST_WaitForPeriodicTaskRun([&] {
mock_clock_->MockSleepForSeconds(static_cast<int>(kPreserveSecs / 99));
});
}
ASSERT_OK(Flush());
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
latest_seqno = db_->GetLatestSequenceNumber();
end_time = mock_clock_->NowSeconds();
ASSERT_GE(sttm.Size(), kPrePopPairs);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(end_time), latest_seqno);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs / 2),
kPrePopPairs / 2);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs),
kUnknownSeqnoBeforeAll);
ASSERT_OK(ReadOnlyReopen(track_options));
ASSERT_EQ(Get(Key(0)), "value");
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
Destroy(track_options);
ReopenWithColumnFamilies({"default", "one"},
List({base_options, track_options}));
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
latest_seqno = db_->GetLatestSequenceNumber();
start_time = mock_clock_->NowSeconds();
ASSERT_EQ(sttm.Size(), kPrePopPairs);
ASSERT_EQ(latest_seqno, kPrePopPairs);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time), latest_seqno);
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs), 1);
ReopenWithColumnFamilies({"default", "one"},
List({base_options, base_options}));
ASSERT_EQ(latest_seqno, db_->GetLatestSequenceNumber());
Close();
}
TEST_F(SeqnoTimeTest, MappingAppend) {
using P = SeqnoToTimeMapping::SeqnoTimePair;
SeqnoToTimeMapping test;
test.SetMaxTimeSpan(100).SetCapacity(10);
ASSERT_FALSE(test.Append(0, 100));
ASSERT_TRUE(test.Append(3, 200));
auto size = test.Size();
ASSERT_TRUE(test.Append(10, 300));
size++;
ASSERT_EQ(size, test.Size());
ASSERT_FALSE(test.Append(10, 301));
ASSERT_EQ(size, test.Size());
ASSERT_EQ(test.TEST_GetLastEntry(), P({10, 300}));
ASSERT_FALSE(test.Append(10, 299));
ASSERT_EQ(size, test.Size());
ASSERT_EQ(test.TEST_GetLastEntry(), P({10, 299}));
ASSERT_FALSE(test.Append(11, 299));
ASSERT_EQ(size, test.Size());
ASSERT_EQ(test.TEST_GetLastEntry(), P({11, 299}));
ASSERT_FALSE(test.Append(11, 250));
ASSERT_EQ(size, test.Size());
ASSERT_EQ(test.TEST_GetLastEntry(), P({11, 250}));
}
TEST_F(SeqnoTimeTest, CapacityLimits) {
using P = SeqnoToTimeMapping::SeqnoTimePair;
SeqnoToTimeMapping test;
test.SetCapacity(3);
EXPECT_TRUE(test.Append(10, 300));
EXPECT_TRUE(test.Append(20, 400));
EXPECT_TRUE(test.Append(30, 500));
EXPECT_TRUE(test.Append(40, 600));
EXPECT_EQ(3U, test.Size());
EXPECT_EQ(test.TEST_GetLastEntry(), P({40, 600}));
test.SetCapacity(2);
EXPECT_EQ(2U, test.Size());
EXPECT_EQ(test.TEST_GetLastEntry(), P({40, 600}));
EXPECT_TRUE(test.Append(50, 700));
EXPECT_EQ(2U, test.Size());
EXPECT_EQ(test.TEST_GetLastEntry(), P({50, 700}));
test.SetCapacity(1);
EXPECT_EQ(2U, test.Size());
EXPECT_EQ(test.TEST_GetLastEntry(), P({50, 700}));
EXPECT_TRUE(test.Append(60, 800));
EXPECT_EQ(2U, test.Size());
EXPECT_EQ(test.TEST_GetLastEntry(), P({60, 800}));
test.SetCapacity(0);
EXPECT_EQ(0U, test.Size());
EXPECT_FALSE(test.Append(70, 900));
EXPECT_EQ(0U, test.Size());
test.SetCapacity(UINT64_MAX);
for (unsigned i = 1; i <= 10101U; i++) {
EXPECT_TRUE(test.Append(i, 11U * i));
}
EXPECT_EQ(10101U, test.Size());
}
TEST_F(SeqnoTimeTest, TimeSpanLimits) {
SeqnoToTimeMapping test;
for (unsigned i = 1; i <= 63U; i++) {
EXPECT_TRUE(test.Append(1000 + i, uint64_t{1} << i));
}
EXPECT_EQ(63U, test.Size());
test.Clear();
test.SetMaxTimeSpan(UINT64_MAX);
for (unsigned i = 1; i <= 63U; i++) {
EXPECT_TRUE(test.Append(1000 + i, uint64_t{1} << i));
}
EXPECT_EQ(63U, test.Size());
test.SetMaxTimeSpan(10);
EXPECT_EQ(2U, test.Size());
test.SetMaxTimeSpan(1);
EXPECT_EQ(2U, test.Size());
test.SetMaxTimeSpan(0);
EXPECT_EQ(1U, test.Size());
EXPECT_TRUE(test.Append(2000, (uint64_t{1} << 63) + 42U));
EXPECT_EQ(1U, test.Size());
test.Clear();
test.SetMaxTimeSpan(100);
EXPECT_TRUE(test.Append(1001, 123));
EXPECT_TRUE(test.Append(1002, 134));
EXPECT_TRUE(test.Append(1003, 150));
EXPECT_TRUE(test.Append(1004, 189));
EXPECT_TRUE(test.Append(1005, 220));
EXPECT_EQ(5U, test.Size());
EXPECT_TRUE(test.Append(1006, 233));
EXPECT_EQ(6U, test.Size());
EXPECT_TRUE(test.Append(1007, 234));
EXPECT_EQ(6U, test.Size());
EXPECT_TRUE(test.Append(1008, 235));
EXPECT_EQ(7U, test.Size());
EXPECT_TRUE(test.Append(1009, 300));
EXPECT_EQ(6U, test.Size());
EXPECT_TRUE(test.Append(1010, 350));
EXPECT_EQ(3U, test.Size());
EXPECT_TRUE(test.Append(1011, 470));
EXPECT_EQ(2U, test.Size());
}
TEST_F(SeqnoTimeTest, ProximalFunctions) {
SeqnoToTimeMapping test;
test.SetCapacity(10);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(1), kUnknownTimeBeforeAll);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(1000000000000U),
kUnknownTimeBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(1), kUnknownSeqnoBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(1000000000000U),
kUnknownSeqnoBeforeAll);
EXPECT_TRUE(test.Append(10, 500));
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(9), kUnknownTimeBeforeAll);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(10), kUnknownTimeBeforeAll);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(11), 500U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(1000000000000U), 500U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(499), kUnknownSeqnoBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(500), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(501), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(1000000000000U), 10U);
EXPECT_TRUE(test.Append(20, 600));
EXPECT_TRUE(test.Append(30, 700));
EXPECT_EQ(test.Size(), 3U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(10), kUnknownTimeBeforeAll);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(11), 500U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(20), 500U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(21), 600U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(30), 600U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(31), 700U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(1000000000000U), 700U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(499), kUnknownSeqnoBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(500), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(501), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(599), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(600), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(601), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(699), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(700), 30U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(701), 30U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(1000000000000U), 30U);
EXPECT_EQ(test.Size(), 3U);
EXPECT_FALSE(test.Append(30, 700));
EXPECT_EQ(test.Size(), 3U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(30), 600U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(31), 700U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(699), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(700), 30U);
EXPECT_FALSE(test.Append(30, 800));
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(30), 600U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(31), 700U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(699), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(700), 30U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(800), 30U);
EXPECT_TRUE(test.Append(40, 900));
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(30), 600U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(41), 900U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(899), 30U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(900), 40U);
EXPECT_FALSE(test.Append(50, 900));
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(49), 700U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(51), 900U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(899), 30U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(900), 50U);
}
TEST_F(SeqnoTimeTest, PrePopulate) {
SeqnoToTimeMapping test;
test.SetMaxTimeSpan(100).SetCapacity(10);
EXPECT_EQ(test.Size(), 0U);
test.PrePopulate(10, 11, 500, 600);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(10), kUnknownTimeBeforeAll);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(11), 500U);
EXPECT_EQ(test.GetProximalTimeBeforeSeqno(12), 600U);
test.Clear();
uint64_t kTimeIncrement = 1234567;
test.PrePopulate(1, 12, kTimeIncrement, kTimeIncrement * 2);
for (uint64_t i = 0; i <= 12; ++i) {
uint64_t t = kTimeIncrement + (i * kTimeIncrement) / 11 - 1;
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(t), i);
}
test.Clear();
test.PrePopulate(1, 34567, kTimeIncrement, kTimeIncrement * 2);
for (auto ratio : {0.0, 0.433, 0.678, 0.987, 1.0}) {
uint64_t t = kTimeIncrement +
static_cast<uint64_t>(ratio * kTimeIncrement + 0.9999999);
SequenceNumber s = static_cast<SequenceNumber>(ratio * (34567 - 1)) + 1;
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(t), s);
}
}
TEST_F(SeqnoTimeTest, CopyFromSeqnoRange) {
SeqnoToTimeMapping test_from;
SeqnoToTimeMapping test_to;
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 1000000);
EXPECT_EQ(test_to.Size(), 0U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 100, 100);
EXPECT_EQ(test_to.Size(), 0U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, kMaxSequenceNumber, 0);
EXPECT_EQ(test_to.Size(), 0U);
EXPECT_TRUE(test_from.Append(10, 500));
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 1000000);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 100, 100);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, kMaxSequenceNumber, 0);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 9);
EXPECT_EQ(test_to.Size(), 0U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 10);
EXPECT_EQ(test_to.Size(), 1U);
EXPECT_TRUE(test_from.Append(20, 600));
EXPECT_TRUE(test_from.Append(30, 700));
EXPECT_TRUE(test_from.Append(40, 800));
EXPECT_TRUE(test_from.Append(50, 900));
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 1000000);
EXPECT_EQ(test_to.Size(), 5U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 100, 100);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 19, 19);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, kMaxSequenceNumber, 0);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 9);
EXPECT_EQ(test_to.Size(), 0U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 0, 10);
EXPECT_EQ(test_to.Size(), 1U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 20, 20);
EXPECT_EQ(test_to.Size(), 2U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 20, 29);
EXPECT_EQ(test_to.Size(), 2U);
test_to.Clear();
test_to.CopyFromSeqnoRange(test_from, 20, 30);
EXPECT_EQ(test_to.Size(), 3U);
}
TEST_F(SeqnoTimeTest, EnforceWithNow) {
constexpr uint64_t kMaxTimeSpan = 420;
SeqnoToTimeMapping test;
test.SetMaxTimeSpan(kMaxTimeSpan).SetCapacity(10);
EXPECT_EQ(test.Size(), 0U);
test.Enforce(500);
EXPECT_EQ(test.Size(), 0U);
EXPECT_TRUE(test.Append(10, 500));
EXPECT_TRUE(test.Append(20, 600));
EXPECT_TRUE(test.Append(30, 700));
EXPECT_TRUE(test.Append(40, 800));
EXPECT_TRUE(test.Append(50, 900));
EXPECT_EQ(test.Size(), 5U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(500), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(599), 10U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(600), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(699), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(700), 30U);
test.Enforce(500 + kMaxTimeSpan);
EXPECT_EQ(test.Size(), 5U);
test.Enforce(599 + kMaxTimeSpan);
EXPECT_EQ(test.Size(), 5U);
test.Enforce(600 + kMaxTimeSpan);
EXPECT_EQ(test.Size(), 4U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(500), kUnknownSeqnoBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(599), kUnknownSeqnoBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(600), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(699), 20U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(700), 30U);
test.Enforce(600 + kMaxTimeSpan);
EXPECT_EQ(test.Size(), 4U);
test.Enforce(699 + kMaxTimeSpan);
EXPECT_EQ(test.Size(), 4U);
test.Enforce(899 + kMaxTimeSpan);
EXPECT_EQ(test.Size(), 2U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(799), kUnknownSeqnoBeforeAll);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(899), 40U);
test.Enforce(10000000);
EXPECT_EQ(test.Size(), 1U);
EXPECT_EQ(test.GetProximalSeqnoBeforeTime(10000000), 50U);
}
TEST_F(SeqnoTimeTest, Sort) {
SeqnoToTimeMapping test;
test.AddUnenforced(10, 11);
test.Enforce();
ASSERT_EQ(test.Size(), 1);
test.AddUnenforced(10, 11);
test.Enforce();
ASSERT_EQ(test.Size(), 1);
test.AddUnenforced(10, 10);
test.AddUnenforced(10, 12);
test.Enforce();
auto seqs = test.TEST_GetInternalMapping();
std::deque<SeqnoToTimeMapping::SeqnoTimePair> expected;
expected.emplace_back(10, 10);
ASSERT_EQ(expected, seqs);
test.AddUnenforced(9, 11);
test.Enforce();
seqs = test.TEST_GetInternalMapping();
ASSERT_EQ(expected, seqs);
test.AddUnenforced(11, 9);
test.Enforce();
seqs = test.TEST_GetInternalMapping();
expected.clear();
expected.emplace_back(11, 9);
ASSERT_EQ(expected, seqs);
test.AddUnenforced(1, 5);
test.AddUnenforced(100, 100);
test.Enforce();
seqs = test.TEST_GetInternalMapping();
expected.clear();
expected.emplace_back(1, 5);
expected.emplace_back(11, 9);
expected.emplace_back(100, 100);
ASSERT_EQ(expected, seqs);
}
TEST_F(SeqnoTimeTest, EncodeDecodeBasic) {
constexpr uint32_t kOriginalSamples = 1000;
SeqnoToTimeMapping test;
test.SetCapacity(kOriginalSamples);
std::string output;
test.EncodeTo(output);
ASSERT_TRUE(output.empty());
ASSERT_OK(test.DecodeFrom(output));
ASSERT_EQ(test.Size(), 0U);
Random rnd(123);
for (uint32_t i = 1; i <= kOriginalSamples; i++) {
ASSERT_TRUE(test.Append(i, i * 10 + rnd.Uniform(10)));
}
output.clear();
test.EncodeTo(output);
ASSERT_FALSE(output.empty());
SeqnoToTimeMapping decoded;
ASSERT_OK(decoded.DecodeFrom(output));
ASSERT_TRUE(decoded.TEST_IsEnforced());
ASSERT_EQ(test.Size(), decoded.Size());
ASSERT_EQ(test.TEST_GetInternalMapping(), decoded.TEST_GetInternalMapping());
constexpr uint32_t kReducedSize = 51U;
output.clear();
SeqnoToTimeMapping(test).SetCapacity(kReducedSize).EncodeTo(output);
decoded.Clear();
ASSERT_OK(decoded.DecodeFrom(output));
ASSERT_TRUE(decoded.TEST_IsEnforced());
ASSERT_EQ(decoded.Size(), kReducedSize);
for (uint64_t t = 1; t <= kOriginalSamples * 11; t += 1 + t / 100) {
SCOPED_TRACE("t=" + std::to_string(t));
auto orig_s = test.GetProximalSeqnoBeforeTime(t);
auto approx_s = decoded.GetProximalSeqnoBeforeTime(t);
ASSERT_EQ(orig_s == kUnknownSeqnoBeforeAll,
approx_s == kUnknownSeqnoBeforeAll);
ASSERT_EQ(orig_s == kOriginalSamples, approx_s == kOriginalSamples);
constexpr uint32_t kSeqnoFuzz = kOriginalSamples * 3 / 2 / kReducedSize;
EXPECT_GE(approx_s + kSeqnoFuzz, orig_s);
EXPECT_GE(orig_s, approx_s);
}
}
TEST_F(SeqnoTimeTest, EncodeDecodeMinimizeTimeGaps) {
SeqnoToTimeMapping test;
test.SetCapacity(10);
test.Append(1, 10);
test.Append(5, 17);
test.Append(6, 25);
test.Append(8, 30);
std::string output;
SeqnoToTimeMapping(test).SetCapacity(3).EncodeTo(output);
SeqnoToTimeMapping decoded;
ASSERT_OK(decoded.DecodeFrom(output));
ASSERT_TRUE(decoded.TEST_IsEnforced());
ASSERT_EQ(decoded.Size(), 3);
auto seqs = decoded.TEST_GetInternalMapping();
std::deque<SeqnoToTimeMapping::SeqnoTimePair> expected;
expected.emplace_back(1, 10);
expected.emplace_back(5, 17);
expected.emplace_back(8, 30);
ASSERT_EQ(expected, seqs);
test.Append(10, 100);
test.Append(13, 200);
test.Append(40, 250);
test.Append(70, 300);
output.clear();
SeqnoToTimeMapping(test).SetCapacity(4).EncodeTo(output);
decoded.Clear();
ASSERT_OK(decoded.DecodeFrom(output));
ASSERT_TRUE(decoded.TEST_IsEnforced());
ASSERT_EQ(decoded.Size(), 4);
expected.clear();
expected.emplace_back(1, 10);
expected.emplace_back(10, 100);
expected.emplace_back(13, 200);
expected.emplace_back(70, 300);
seqs = decoded.TEST_GetInternalMapping();
ASSERT_EQ(expected, seqs);
}
TEST(PackValueAndSeqnoTest, Basic) {
std::string packed_value_buf;
Slice packed_value_slice =
PackValueAndWriteTime("foo", 30u, &packed_value_buf);
auto [unpacked_value, write_time] =
ParsePackedValueWithWriteTime(packed_value_slice);
ASSERT_EQ(unpacked_value, "foo");
ASSERT_EQ(write_time, 30u);
ASSERT_EQ(ParsePackedValueForValue(packed_value_slice), "foo");
}
TEST(PackValueAndWriteTimeTest, Basic) {
std::string packed_value_buf;
Slice packed_value_slice = PackValueAndSeqno("foo", 30u, &packed_value_buf);
auto [unpacked_value, write_time] =
ParsePackedValueWithSeqno(packed_value_slice);
ASSERT_EQ(unpacked_value, "foo");
ASSERT_EQ(write_time, 30u);
ASSERT_EQ(ParsePackedValueForValue(packed_value_slice), "foo");
}
}
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}