#include "db/seqno_to_time_mapping.h"
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <queue>
#include <vector>
#include "db/version_edit.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
// Binary search over the (enforced, i.e. sorted) pairs: returns the first
// entry whose time compares greater than `time` under SeqnoTimePair::TimeLess,
// or cend() if there is none. The probe's seqno is only a placeholder.
SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterTime(
    uint64_t time) const {
  assert(enforced_);
  const SeqnoTimePair probe{0, time};
  return std::upper_bound(pairs_.cbegin(), pairs_.cend(), probe,
                          SeqnoTimePair::TimeLess);
}
// Binary search over the (enforced, i.e. sorted) pairs: returns the first
// entry that is not less than `seqno` under SeqnoTimePair::SeqnoLess, or
// cend() if there is none. The probe's time is only a placeholder.
SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterEqSeqno(
    SequenceNumber seqno) const {
  assert(enforced_);
  const SeqnoTimePair probe{seqno, 0};
  return std::lower_bound(pairs_.cbegin(), pairs_.cend(), probe,
                          SeqnoTimePair::SeqnoLess);
}
// Binary search over the (enforced, i.e. sorted) pairs: returns the first
// entry that compares greater than `seqno` under SeqnoTimePair::SeqnoLess, or
// cend() if there is none. The probe's time is only a placeholder.
SeqnoToTimeMapping::pair_const_iterator SeqnoToTimeMapping::FindGreaterSeqno(
    SequenceNumber seqno) const {
  assert(enforced_);
  const SeqnoTimePair probe{seqno, 0};
  return std::upper_bound(pairs_.cbegin(), pairs_.cend(), probe,
                          SeqnoTimePair::SeqnoLess);
}
// Returns the time recorded for the last entry whose seqno is strictly less
// than `seqno`, or kUnknownTimeBeforeAll if every entry is at or after it.
uint64_t SeqnoToTimeMapping::GetProximalTimeBeforeSeqno(
    SequenceNumber seqno) const {
  assert(enforced_);
  // First entry with seqno >= the query; its predecessor (if any) is the
  // answer.
  const auto boundary = FindGreaterEqSeqno(seqno);
  if (boundary != pairs_.cbegin()) {
    return (boundary - 1)->time;
  }
  return kUnknownTimeBeforeAll;
}
// Returns the seqno of the last entry whose time is at or before `time`, or
// kUnknownSeqnoBeforeAll if every entry is later than `time`.
SequenceNumber SeqnoToTimeMapping::GetProximalSeqnoBeforeTime(
    uint64_t time) const {
  assert(enforced_);
  // First entry with time > the query; its predecessor (if any) is the
  // answer.
  const auto boundary = FindGreaterTime(time);
  if (boundary != pairs_.cbegin()) {
    return (boundary - 1)->seqno;
  }
  return kUnknownSeqnoBeforeAll;
}
// Computes seqno cutoffs for time-based tiering as of `current_time`.
//
// *preserve_time_min_seqno (if non-null) receives the smallest seqno that may
// have been written within the last max(preserve_internal_time_seconds,
// preclude_last_level_data_seconds) seconds. *preclude_last_level_min_seqno
// (if non-null and preclude_last_level_data_seconds > 0) receives the same for
// the preclude window. Outputs are left untouched when both durations are 0.
//
// GetProximalSeqnoBeforeTime() yields the last seqno recorded at or before a
// time, so "+ 1" is the first seqno that might have been written after it.
void SeqnoToTimeMapping::GetCurrentTieringCutoffSeqnos(
    uint64_t current_time, uint64_t preserve_internal_time_seconds,
    uint64_t preclude_last_level_data_seconds,
    SequenceNumber* preserve_time_min_seqno,
    SequenceNumber* preclude_last_level_min_seqno) const {
  // current_time - seconds, clamped at 0 rather than wrapping around.
  auto time_minus = [current_time](uint64_t seconds) -> uint64_t {
    return current_time > seconds ? current_time - seconds : 0;
  };

  const uint64_t preserve_time_duration = std::max(
      preserve_internal_time_seconds, preclude_last_level_data_seconds);
  if (preserve_time_duration == 0) {
    return;
  }

  if (preserve_time_min_seqno != nullptr) {
    *preserve_time_min_seqno =
        GetProximalSeqnoBeforeTime(time_minus(preserve_time_duration)) + 1;
  }

  if (preclude_last_level_min_seqno != nullptr &&
      preclude_last_level_data_seconds > 0) {
    *preclude_last_level_min_seqno =
        GetProximalSeqnoBeforeTime(
            time_minus(preclude_last_level_data_seconds)) +
        1;
  }
}
void SeqnoToTimeMapping::EnforceMaxTimeSpan(uint64_t now) {
assert(enforced_); uint64_t cutoff_time;
if (pairs_.size() <= 1) {
return;
}
if (now > 0) {
if (now < max_time_span_) {
return;
}
cutoff_time = now - max_time_span_;
} else {
const auto& last = pairs_.back();
if (last.time < max_time_span_) {
return;
}
cutoff_time = last.time - max_time_span_;
}
while (pairs_.size() >= 2 && pairs_[0].time <= cutoff_time &&
pairs_[1].time <= cutoff_time) {
pairs_.pop_front();
}
}
// Shrinks the mapping so it holds no more than the configured capacity.
//
// - capacity_ == 0 clears the mapping entirely.
// - capacity_ == 1 is treated as 2 (the algorithm below never removes the
//   first or last entry).
// - strict == false tolerates up to strict_cap / 8 extra entries before doing
//   any work; once pruning is triggered it still prunes all the way down to
//   strict_cap (presumably to amortize pruning cost across appends).
//
// Pruning is greedy. Only interior entries are candidates. Each step removes
// the candidate whose removal leaves the smallest time gap between its
// surviving neighbors. When gaps are equal, the entry with the smaller seqno
// is removed first.
//
// Removed entries are tombstoned by setting their time to
// kUnknownTimeBeforeAll, then compacted away at the end. Live entries are
// asserted never to carry that time.
// NOTE(review): gap computation uses unsigned subtraction and so assumes entry
// times are non-decreasing in seqno order; confirm callers guarantee this.
void SeqnoToTimeMapping::EnforceCapacity(bool strict) {
  assert(enforced_);  // at least sorted
  uint64_t strict_cap = capacity_;
  if (strict_cap == 0) {
    pairs_.clear();
    return;
  }
  // Need room for first + last entries, which are never removed.
  if (strict_cap == 1) {
    strict_cap = 2;
  }
  // Non-strict mode allows a slack of 1/8 of the capacity.
  uint64_t effective_cap = strict_cap + (strict ? 0 : strict_cap / 8);
  if (effective_cap < strict_cap) {
    // The addition wrapped around; saturate instead.
    effective_cap = UINT64_MAX;
  }
  if (pairs_.size() <= effective_cap) {
    return;
  }
  // effective_cap >= 2 and size > effective_cap, so there is at least one
  // interior removal candidate.
  assert(pairs_.size() >= 3);
  // Always prune down to the strict capacity, even in non-strict mode.
  size_t to_remove_count = pairs_.size() - strict_cap;

  // A candidate for removal: the entry `it`, and the time gap between its
  // neighbors that removing it would leave (as of when it was queued).
  struct RemovalCandidate {
    uint64_t new_time_gap;
    std::deque<SeqnoTimePair>::iterator it;
    RemovalCandidate(uint64_t _new_time_gap,
                     std::deque<SeqnoTimePair>::iterator _it)
        : new_time_gap(_new_time_gap), it(_it) {}
    // Ordering for the min-heap below: smaller gap first; on equal gaps the
    // larger seqno compares "greater", so the older entry is popped first.
    bool operator>(const RemovalCandidate& other) const {
      if (new_time_gap == other.new_time_gap) {
        return it->seqno > other.it->seqno;
      }
      return new_time_gap > other.new_time_gap;
    }
  };
  using RC = RemovalCandidate;
  // std::greater turns priority_queue into a min-heap: top() is the best
  // (smallest-gap) candidate.
  using PQ = std::priority_queue<RC, std::vector<RC>, std::greater<RC>>;
  PQ pq;
  // Seed the queue with every interior entry (second through second-to-last),
  // each keyed by the gap between its immediate neighbors.
  {
    auto it = pairs_.begin();
    assert(it->time != kUnknownTimeBeforeAll);
    uint64_t prev_prev_time = it->time;
    ++it;
    assert(it->time != kUnknownTimeBeforeAll);
    auto prev_it = it;
    ++it;
    while (it != pairs_.end()) {
      assert(it->time != kUnknownTimeBeforeAll);
      uint64_t gap = it->time - prev_prev_time;
      pq.emplace(gap, prev_it);
      prev_prev_time = prev_it->time;
      prev_it = it;
      ++it;
    }
  }
  // Greedily remove candidates. priority_queue cannot update entries in
  // place, so a queued gap may be stale if a neighbor was removed after it was
  // queued. Recompute the real gap by skipping tombstoned neighbors. If it
  // changed, re-queue with the current gap instead of removing now.
  while (to_remove_count > 0) {
    assert(!pq.empty());
    auto rc = pq.top();
    pq.pop();
    // Nearest surviving successor. The last entry is never tombstoned, so
    // this scan terminates.
    auto it = rc.it + 1;
    uint64_t after_time = it->time;
    while (after_time == kUnknownTimeBeforeAll) {
      assert(it != pairs_.end());
      ++it;
      after_time = it->time;
    }
    // Nearest surviving predecessor. The first entry is never tombstoned,
    // so this scan terminates.
    it = rc.it - 1;
    uint64_t before_time = it->time;
    while (before_time == kUnknownTimeBeforeAll) {
      assert(it != pairs_.begin());
      --it;
      before_time = it->time;
    }
    if (rc.new_time_gap == after_time - before_time) {
      // Queued gap is current: tombstone this entry.
      rc.it->time = kUnknownTimeBeforeAll;
      --to_remove_count;
    } else {
      // Stale: re-queue with the up-to-date gap.
      pq.emplace(after_time - before_time, rc.it);
    }
  }
  // Compact: slide surviving entries forward over tombstones, then trim the
  // now-unused tail.
  auto from_it = pairs_.begin();
  auto to_it = from_it;
  for (; from_it != pairs_.end(); ++from_it) {
    if (from_it->time != kUnknownTimeBeforeAll) {
      if (from_it != to_it) {
        *to_it = *from_it;
      }
      ++to_it;
    }
  }
  pairs_.erase(to_it, pairs_.end());
  assert(pairs_.size() == strict_cap);
}
// Tries to fold `other` into this entry. `other` must not have a smaller
// seqno. Returns true if `other` was absorbed (this entry may be modified or
// replaced), or false if the two should remain separate entries (unchanged).
bool SeqnoToTimeMapping::SeqnoTimePair::Merge(const SeqnoTimePair& other) {
  assert(seqno <= other.seqno);

  if (seqno == other.seqno) {
    // Same seqno: keep the earlier of the two times.
    time = std::min(time, other.time);
    return true;
  }
  if (time == other.time) {
    // Same time: keep the later seqno.
    seqno = std::max(seqno, other.seqno);
    return true;
  }
  if (time < other.time) {
    // Both seqno and time advance: nothing to merge.
    return false;
  }
  // A later seqno carrying an earlier time: take `other` wholesale.
  assert(seqno < other.seqno);
  *this = other;
  return true;
}
// Sorts the pairs, then collapses adjacent mergeable entries in place using
// SeqnoTimePair::Merge. Afterwards the mapping is marked enforced (at least
// sorted); any time-span/capacity limits are applied separately.
// NOTE(review): a replacing merge is only checked against the immediately
// preceding kept entry, so an earlier kept entry may still carry a later time.
// Confirm this is acceptable for callers that binary-search by time.
void SeqnoToTimeMapping::SortAndMerge() {
  assert(!enforced_);
  if (!pairs_.empty()) {
    std::sort(pairs_.begin(), pairs_.end());

    // `write_pos` is the last kept entry; `read_pos` scans the rest.
    auto write_pos = pairs_.begin();
    for (auto read_pos = write_pos + 1; read_pos != pairs_.end(); ++read_pos) {
      if (!write_pos->Merge(*read_pos)) {
        ++write_pos;
        *write_pos = *read_pos;
      }
    }
    pairs_.erase(write_pos + 1, pairs_.end());
  }
  enforced_ = true;
}
// Sets the maximum time span to retain. An already-enforced mapping is pruned
// immediately (relative to its newest entry). An unenforced one is pruned
// when it is next enforced. Returns *this for chaining.
SeqnoToTimeMapping& SeqnoToTimeMapping::SetMaxTimeSpan(uint64_t max_time_span) {
  max_time_span_ = max_time_span;
  if (!enforced_) {
    return *this;
  }
  EnforceMaxTimeSpan();
  return *this;
}
// Sets the maximum number of entries to retain. An already-enforced mapping
// is strictly trimmed immediately. An unenforced one is trimmed when it is
// next enforced. Returns *this for chaining.
SeqnoToTimeMapping& SeqnoToTimeMapping::SetCapacity(uint64_t capacity) {
  capacity_ = capacity;
  if (!enforced_) {
    return *this;
  }
  EnforceCapacity(/*strict=*/true);
  return *this;
}
// Brings the mapping into its enforced state.
//
// If it was unenforced, it is sorted/merged and then trimmed to the max time
// span. If it was already enforced, the time span is re-applied only when a
// non-zero `now` is given. Capacity is always enforced strictly. Returns
// *this for chaining.
SeqnoToTimeMapping& SeqnoToTimeMapping::Enforce(uint64_t now) {
  const bool was_enforced = enforced_;
  if (!was_enforced) {
    SortAndMerge();
    assert(enforced_);
  }
  if (!was_enforced || now > 0) {
    EnforceMaxTimeSpan(now);
  }
  EnforceCapacity(/*strict=*/true);
  return *this;
}
// Records (seqno, time) without sorting, merging or pruning. The mapping is
// marked unenforced so a later Enforce() can clean it up. Seqno 0 is ignored.
void SeqnoToTimeMapping::AddUnenforced(SequenceNumber seqno, uint64_t time) {
  if (seqno != 0) {
    enforced_ = false;
    pairs_.emplace_back(seqno, time);
  }
}
void SeqnoToTimeMapping::EncodeTo(std::string& dest) const {
assert(enforced_);
if (pairs_.empty()) {
return;
}
PutVarint64(&dest, pairs_.size());
SeqnoTimePair base;
for (auto& cur : pairs_) {
assert(base < cur);
SeqnoTimePair val = cur.ComputeDelta(base);
base = cur;
val.Encode(dest);
}
}
namespace {
// Parses the layout written by SeqnoToTimeMapping::EncodeTo: a varint64 count
// followed by that many delta-encoded pairs. Each decoded pair is appended to
// `pairs`. Empty input is OK and appends nothing. On error, any pairs already
// appended are left in place; the caller is responsible for rolling back.
Status DecodeImpl(Slice& input,
                  std::deque<SeqnoToTimeMapping::SeqnoTimePair>& pairs) {
  if (input.empty()) {
    return Status::OK();
  }

  uint64_t count;
  if (!GetVarint64(&input, &count)) {
    return Status::Corruption("Invalid sequence number time size");
  }

  SeqnoToTimeMapping::SeqnoTimePair prev;
  for (uint64_t remaining = count; remaining > 0; --remaining) {
    SeqnoToTimeMapping::SeqnoTimePair entry;
    Status s = entry.Decode(input);
    if (!s.ok()) {
      return s;
    }
    entry.ApplyDelta(prev);
    pairs.emplace_back(entry);
    prev = entry;
  }

  // All bytes must be consumed by exactly `count` entries.
  return input.empty()
             ? Status::OK()
             : Status::Corruption(
                   "Extra bytes at end of sequence number time mapping");
}
}  // namespace
// Decodes pairs serialized by EncodeTo and appends them to this mapping.
//
// On failure the mapping is restored to its previous size and the error is
// returned. On success the mapping is marked unenforced if it already had
// entries or if a time-span/capacity limit is configured, so a later
// Enforce() can re-sort/prune it.
Status SeqnoToTimeMapping::DecodeFrom(const std::string& pairs_str) {
  const size_t size_before = pairs_.size();

  Slice input(pairs_str);
  Status s = DecodeImpl(input, pairs_);
  if (!s.ok()) {
    // Discard any partially decoded entries.
    pairs_.resize(size_before);
    return s;
  }

  const bool has_limits =
      max_time_span_ < UINT64_MAX || capacity_ < UINT64_MAX;
  if (size_before > 0 || has_limits) {
    enforced_ = false;
  }
  return s;
}
void SeqnoToTimeMapping::SeqnoTimePair::Encode(std::string& dest) const {
PutVarint64Varint64(&dest, seqno, time);
}
// Reads a seqno varint64 followed by a time varint64 from the front of
// `input`, consuming them. Returns Corruption naming whichever field could
// not be parsed.
Status SeqnoToTimeMapping::SeqnoTimePair::Decode(Slice& input) {
  if (GetVarint64(&input, &seqno)) {
    if (GetVarint64(&input, &time)) {
      return Status::OK();
    }
    return Status::Corruption("Invalid time");
  }
  return Status::Corruption("Invalid sequence number");
}
// Appends the entries of `src` covering seqnos [from_seqno, to_seqno]. It
// also includes the entry immediately before that range (if any), so that
// GetProximalTimeBeforeSeqno(from_seqno) can still be answered from the copy.
// An inverted range (to_seqno < from_seqno) copies only that preceding entry.
//
// `src` must be enforced, because its binary searches assert this. This
// mapping is marked unenforced when it already had entries or has
// time-span/capacity limits configured, so a later Enforce() can re-sort and
// prune.
void SeqnoToTimeMapping::CopyFromSeqnoRange(const SeqnoToTimeMapping& src,
                                            SequenceNumber from_seqno,
                                            SequenceNumber to_seqno) {
  bool orig_empty = Empty();
  auto src_it = src.FindGreaterEqSeqno(from_seqno);
  // Tolerate nonsensical ranges such as [1000, 0] by making the range empty.
  auto src_it_end =
      to_seqno < from_seqno ? src_it : src.FindGreaterSeqno(to_seqno);
  // Step back one entry so the copy includes a seqno before from_seqno.
  if (src_it != src.pairs_.begin()) {
    --src_it;
  }
  assert(src_it <= src_it_end);
  // Range insert appends in one call and avoids std::back_inserter, whose
  // <iterator> header this file does not include.
  pairs_.insert(pairs_.end(), src_it, src_it_end);
  if (!orig_empty || max_time_span_ < UINT64_MAX || capacity_ < UINT64_MAX) {
    enforced_ = false;
  }
}
// Appends (seqno, time), merging it into the newest entry when possible.
// Afterwards the mapping is re-enforced if needed, then the max time span and
// the non-strict capacity limit are applied.
//
// Returns true only if a new entry was added. It returns false when capacity
// is 0, when seqno is 0 (never recorded), or when the pair merged into the
// last entry. An out-of-order append to an enforced mapping trips an
// assertion in debug builds; otherwise it is not added.
bool SeqnoToTimeMapping::Append(SequenceNumber seqno, uint64_t time) {
  if (capacity_ == 0) {
    return false;
  }

  const bool recordable = seqno != 0;
  if (recordable && pairs_.empty()) {
    // A lone entry is trivially enforced; skip the pruning below.
    enforced_ = true;
    pairs_.emplace_back(seqno, time);
    return true;
  }

  bool added = false;
  if (recordable) {
    SeqnoTimePair& newest = pairs_.back();
    if (seqno < newest.seqno) {
      if (enforced_) {
        // Out-of-order append attempted.
        assert(false);
      } else {
        // Sorted out by SortAndMerge() below.
        pairs_.emplace_back(seqno, time);
        added = true;
      }
    } else if (!newest.Merge({seqno, time})) {
      if (!enforced_ || (seqno > newest.seqno && time > newest.time)) {
        pairs_.emplace_back(seqno, time);
        added = true;
      } else {
        // Out-of-order append (e.g. a clock reset).
        assert(false);
      }
    }
  }

  if (!enforced_) {
    SortAndMerge();
    assert(enforced_);
  }
  EnforceMaxTimeSpan();
  EnforceCapacity(/*strict=*/false);
  return added;
}
// Fills an empty mapping with one entry per seqno in [from_seqno, to_seqno].
// Times are linearly interpolated from from_time (at from_seqno) to to_time
// (at to_seqno), rounding down.
//
// The offset floor(time_span * k / seqno_span) is computed as
//   q * k + floor(r * k / seqno_span), where time_span = q * seqno_span + r.
// This is mathematically identical, but it avoids the uint64_t overflow that
// the direct product time_span * k can hit with a large time span and many
// seqnos. q * k never exceeds time_span. r * k < seqno_span^2 can overflow
// only once seqno_span >= 2^32, i.e. billions of entries.
void SeqnoToTimeMapping::PrePopulate(SequenceNumber from_seqno,
                                     SequenceNumber to_seqno,
                                     uint64_t from_time, uint64_t to_time) {
  assert(Empty());
  assert(from_seqno > 0);
  assert(to_seqno > from_seqno);
  assert(from_time > kUnknownTimeBeforeAll);
  assert(to_time >= from_time);

  const uint64_t time_span = to_time - from_time;
  const uint64_t seqno_span = to_seqno - from_seqno;
  const uint64_t whole_per_seqno = time_span / seqno_span;
  const uint64_t remainder = time_span % seqno_span;
  for (auto i = from_seqno; i <= to_seqno; i++) {
    const uint64_t k = i - from_seqno;
    // Stays within [from_time, to_time], so the sum cannot overflow either.
    uint64_t t = from_time + whole_per_seqno * k + remainder * k / seqno_span;
    pairs_.emplace_back(i, t);
  }
}
std::string SeqnoToTimeMapping::ToHumanString() const {
std::string ret;
for (const auto& seq_time : pairs_) {
AppendNumberTo(&ret, seq_time.seqno);
ret.append("->");
AppendNumberTo(&ret, seq_time.time);
ret.append(",");
}
return ret;
}
// Overwrites *buf with `value` followed by `unix_write_time` as a fixed
// 64-bit suffix. Returns a Slice over *buf, which is valid only while *buf is
// unchanged.
Slice PackValueAndWriteTime(const Slice& value, uint64_t unix_write_time,
                            std::string* buf) {
  std::string& packed = *buf;
  packed.assign(value.data(), value.size());
  PutFixed64(&packed, unix_write_time);
  return Slice(packed);
}
// Overwrites *buf with `value` followed by `seqno` as a fixed 64-bit suffix.
// Returns a Slice over *buf, which is valid only while *buf is unchanged.
Slice PackValueAndSeqno(const Slice& value, SequenceNumber seqno,
                        std::string* buf) {
  std::string& packed = *buf;
  packed.assign(value.data(), value.size());
  PutFixed64(&packed, seqno);
  return Slice(packed);
}
uint64_t ParsePackedValueForWriteTime(const Slice& value) {
assert(value.size() >= sizeof(uint64_t));
Slice write_time_slice(value.data() + value.size() - sizeof(uint64_t),
sizeof(uint64_t));
uint64_t write_time;
[[maybe_unused]] auto res = GetFixed64(&write_time_slice, &write_time);
assert(res);
return write_time;
}
std::tuple<Slice, uint64_t> ParsePackedValueWithWriteTime(const Slice& value) {
return std::make_tuple(Slice(value.data(), value.size() - sizeof(uint64_t)),
ParsePackedValueForWriteTime(value));
}
// Extracts the seqno from the trailing 8 bytes of a value produced by
// PackValueAndSeqno. `value` must be at least sizeof(SequenceNumber) bytes.
SequenceNumber ParsePackedValueForSeqno(const Slice& value) {
  assert(value.size() >= sizeof(SequenceNumber));
  constexpr size_t kSuffixSize = sizeof(uint64_t);
  Slice suffix(value.data() + (value.size() - kSuffixSize), kSuffixSize);
  SequenceNumber seqno;
  [[maybe_unused]] const auto decoded = GetFixed64(&suffix, &seqno);
  assert(decoded);
  return seqno;
}
std::tuple<Slice, SequenceNumber> ParsePackedValueWithSeqno(
const Slice& value) {
return std::make_tuple(
Slice(value.data(), value.size() - sizeof(SequenceNumber)),
ParsePackedValueForSeqno(value));
}
// Returns `value` without its trailing 8-byte packed suffix (the write time
// or seqno added by the PackValueAnd* helpers). `value` must be at least 8
// bytes long.
Slice ParsePackedValueForValue(const Slice& value) {
  constexpr size_t kSuffixSize = sizeof(uint64_t);
  assert(value.size() >= kSuffixSize);
  const size_t original_size = value.size() - kSuffixSize;
  return Slice(value.data(), original_size);
}
}