#include "tasks/batch_write_task.h"
#include <algorithm>
#include <cassert>
#include <limits>
#include <memory>
#include "coding.h"
#include "compression.h"
#include "storage/shard.h"
#include "tasks/task.h"
#include "utils.h"
#include "write_tree_stack.h"
namespace eloqstore
{
BatchWriteTask::BatchWriteTask()
: idx_page_builder_(Options()), data_page_builder_(Options())
{
overflow_ptrs_.reserve(Options()->overflow_pointers * 4);
}
KvError BatchWriteTask::SeekStack(std::string_view search_key)
{
const Comparator *cmp = shard->IndexManager()->GetComparator();
auto entry_contains = [](std::string_view start,
std::string_view end,
std::string_view search_key,
const Comparator *cmp)
{
return (start.empty() || cmp->Compare(search_key, start) >= 0) &&
(end.empty() || cmp->Compare(search_key, end) < 0);
};
while (stack_.size() > 1)
{
IndexPageIter &idx_iter = stack_.back()->idx_page_iter_;
if (idx_iter.HasNext())
{
idx_iter.Next();
std::string_view idx_entry_start = idx_iter.Key();
std::string idx_entry_end = RightBound(false);
if (entry_contains(idx_entry_start, idx_entry_end, search_key, cmp))
{
break;
}
else
{
auto [_, err] = Pop();
CHECK_KV_ERR(err);
}
}
else
{
auto [_, err] = Pop();
CHECK_KV_ERR(err);
}
}
return KvError::NoError;
}
std::pair<PageId, KvError> BatchWriteTask::Seek(std::string_view key)
{
if (!stack_.back()->handle_)
{
stack_.back()->is_leaf_index_ = true;
return {MaxPageId, KvError::NoError};
}
while (true)
{
IndexStackEntry *idx_entry = stack_.back().get();
IndexPageIter &idx_iter = idx_entry->idx_page_iter_;
idx_iter.Seek(key);
PageId page_id = idx_iter.GetPageId();
assert(page_id != MaxPageId);
if (idx_entry->handle_->IsPointingToLeaf())
{
break;
}
assert(!stack_.back()->is_leaf_index_);
auto [handle, err] = shard->IndexManager()->FindPage(
cow_meta_.mapper_->GetMapping(), page_id);
if (err != KvError::NoError)
{
return {MaxPageId, err};
}
stack_.emplace_back(
std::make_unique<IndexStackEntry>(std::move(handle), Options()));
}
stack_.back()->is_leaf_index_ = true;
return {stack_.back()->idx_page_iter_.GetPageId(), KvError::NoError};
}
inline DataPage *BatchWriteTask::TripleElement(uint8_t idx)
{
return leaf_triple_[idx].IsEmpty() ? nullptr : &leaf_triple_[idx];
}
KvError BatchWriteTask::LoadTripleElement(uint8_t idx, PageId page_id)
{
if (TripleElement(idx))
{
return KvError::NoError;
}
assert(page_id != MaxPageId);
auto [page, err] = LoadDataPage(page_id);
CHECK_KV_ERR(err);
leaf_triple_[idx] = std::move(page);
return KvError::NoError;
}
KvError BatchWriteTask::ShiftLeafLink()
{
if (TripleElement(0))
{
KvError err = WritePage(std::move(leaf_triple_[0]));
CHECK_KV_ERR(err);
}
leaf_triple_[0] = std::move(leaf_triple_[1]);
return KvError::NoError;
}
void BatchWriteTask::LeafLinkUpdate(DataPage &&page)
{
page.SetNextPageId(applying_page_.NextPageId());
page.SetPrevPageId(applying_page_.PrevPageId());
leaf_triple_[1] = std::move(page);
}
KvError BatchWriteTask::LeafLinkInsert(DataPage &&page)
{
assert(!TripleElement(1));
leaf_triple_[1] = std::move(page);
DataPage &new_elem = leaf_triple_[1];
DataPage *prev_page = TripleElement(0);
if (prev_page == nullptr)
{
assert(stack_.back()->idx_page_iter_.GetPageId() == MaxPageId);
new_elem.SetNextPageId(MaxPageId);
new_elem.SetPrevPageId(MaxPageId);
return KvError::NoError;
}
assert(stack_.back()->idx_page_iter_.GetPageId() == MaxPageId ||
prev_page->NextPageId() == applying_page_.NextPageId());
if (prev_page->NextPageId() != MaxPageId)
{
KvError err = LoadTripleElement(2, prev_page->NextPageId());
CHECK_KV_ERR(err);
TripleElement(2)->SetPrevPageId(new_elem.GetPageId());
}
new_elem.SetPrevPageId(prev_page->GetPageId());
new_elem.SetNextPageId(prev_page->NextPageId());
prev_page->SetNextPageId(new_elem.GetPageId());
return KvError::NoError;
}
KvError BatchWriteTask::LeafLinkDelete()
{
if (applying_page_.PrevPageId() != MaxPageId)
{
KvError err = LoadTripleElement(0, applying_page_.PrevPageId());
CHECK_KV_ERR(err);
TripleElement(0)->SetNextPageId(applying_page_.NextPageId());
}
if (applying_page_.NextPageId() != MaxPageId)
{
assert(!TripleElement(2));
KvError err = LoadTripleElement(2, applying_page_.NextPageId());
CHECK_KV_ERR(err);
TripleElement(2)->SetPrevPageId(applying_page_.PrevPageId());
}
return KvError::NoError;
}
void BatchWriteTask::Reset(const TableIdent &tbl_id)
{
WriteTask::Reset(tbl_id);
stack_.clear();
ttl_batch_.clear();
idx_page_builder_.Reset();
data_page_builder_.Reset();
overflow_ptrs_.clear();
applying_page_.Clear();
for (DataPage &page : leaf_triple_)
{
page.Clear();
}
}
bool BatchWriteTask::SetBatch(std::span<WriteDataEntry> entries)
{
#ifndef NDEBUG
const Comparator *cmp = Comp();
if (entries.size() > 1)
{
for (uint64_t i = 1; i < entries.size(); i++)
{
if (cmp->Compare(entries[i - 1].key_, entries[i].key_) >= 0)
{
assert(false);
}
}
}
uint16_t max_key_len = Options()->data_page_size >> 1;
for (WriteDataEntry &ent : entries)
{
if (ent.key_.empty() || ent.key_.size() > max_key_len)
{
assert(false);
}
}
#endif
data_batch_ = entries;
return true;
}
void BatchWriteTask::Abort()
{
WriteTask::Abort();
stack_.clear();
for (DataPage &page : leaf_triple_)
{
page.Clear();
}
ttl_batch_.clear();
}
KvError BatchWriteTask::Apply()
{
YieldToLowPQ();
KvError err = shard->IndexManager()->MakeCowRoot(tbl_ident_, cow_meta_);
cow_meta_.compression_->SampleAndBuildDictionaryIfNeeded(data_batch_);
CHECK_KV_ERR(err);
err = ApplyBatch(cow_meta_.root_id_, true);
if (err != KvError::NoError)
{
(void) WaitWrite();
return err;
}
err = ApplyTTLBatch();
if (err != KvError::NoError)
{
(void) WaitWrite();
return err;
}
err = UpdateMeta();
CHECK_KV_ERR(err);
TriggerTTL();
return KvError::NoError;
}
KvError BatchWriteTask::ApplyTTLBatch()
{
if (!ttl_batch_.empty())
{
std::sort(ttl_batch_.begin(), ttl_batch_.end());
for (size_t i = 1; i < ttl_batch_.size(); i++)
{
assert(ttl_batch_[i - 1].key_ != ttl_batch_[i].key_);
}
SetBatch(ttl_batch_);
KvError err = ApplyBatch(cow_meta_.ttl_root_id_, false);
ttl_batch_.clear();
return err;
}
else
{
return KvError::NoError;
}
}
KvError BatchWriteTask::ApplyBatch(PageId &root_id,
bool update_ttl,
uint64_t now_ts)
{
do_update_ttl_ = update_ttl;
assert(!update_ttl || ttl_batch_.empty());
if (root_id != MaxPageId)
{
auto [handle, err] = shard->IndexManager()->FindPage(
cow_meta_.old_mapping_.Get(), root_id);
CHECK_KV_ERR(err);
stack_.emplace_back(
std::make_unique<IndexStackEntry>(std::move(handle), Options()));
}
else
{
stack_.emplace_back(std::make_unique<IndexStackEntry>(
MemIndexPage::Handle(), Options()));
}
KvError err;
size_t cidx = 0;
const uint64_t now_ms =
now_ts != 0 ? now_ts : utils::UnixTs<chrono::milliseconds>();
while (cidx < data_batch_.size())
{
std::string_view batch_start_key = {data_batch_[cidx].key_.data(),
data_batch_[cidx].key_.size()};
if (stack_.size() > 1)
{
err = SeekStack(batch_start_key);
CHECK_KV_ERR(err);
}
auto [page_id, err] = Seek(batch_start_key);
CHECK_KV_ERR(err);
if (page_id != MaxPageId)
{
err = LoadApplyingPage(page_id);
CHECK_KV_ERR(err);
}
err = ApplyOnePage(cidx, now_ms);
CHECK_KV_ERR(err);
}
assert(TripleElement(2) == nullptr);
err = ShiftLeafLink();
CHECK_KV_ERR(err);
err = ShiftLeafLink();
CHECK_KV_ERR(err);
assert(!stack_.empty());
MemIndexPage::Handle new_root;
while (!stack_.empty())
{
auto [new_handle, err] = Pop();
CHECK_KV_ERR(err);
new_root = std::move(new_handle);
}
root_id = new_root ? new_root->GetPageId() : MaxPageId;
return KvError::NoError;
}
KvError BatchWriteTask::LoadApplyingPage(PageId page_id)
{
assert(page_id != MaxPageId);
if (TripleElement(1) && TripleElement(1)->GetPageId() == page_id)
{
applying_page_.Clear();
applying_page_ = std::move(*TripleElement(1));
}
else
{
auto [page, err] = LoadDataPage(page_id);
CHECK_KV_ERR(err);
applying_page_ = std::move(page);
}
assert(TypeOfPage(applying_page_.PagePtr()) == PageType::Data);
if (TripleElement(1))
{
assert(TripleElement(1)->GetPageId() != applying_page_.GetPageId());
KvError err = ShiftLeafLink();
CHECK_KV_ERR(err);
}
if (TripleElement(0) &&
TripleElement(0)->GetPageId() != applying_page_.PrevPageId())
{
KvError err = ShiftLeafLink();
CHECK_KV_ERR(err);
}
return KvError::NoError;
}
KvError BatchWriteTask::ApplyOnePage(size_t &cidx, uint64_t now_ms)
{
assert(!stack_.empty());
KvError err;
DataPage *base_page = nullptr;
std::string_view page_left_bound{};
std::string page_right_key;
std::string_view page_right_bound{};
if (stack_.back()->idx_page_iter_.GetPageId() != MaxPageId)
{
assert(stack_.back()->idx_page_iter_.GetPageId() ==
applying_page_.GetPageId());
base_page = &applying_page_;
page_left_bound = LeftBound(true);
page_right_key = RightBound(true);
page_right_bound = {page_right_key.data(), page_right_key.size()};
}
const Comparator *cmp = shard->IndexManager()->GetComparator();
compression::DictCompression *compression = cow_meta_.compression_.get();
DataPageIter base_page_iter{base_page, Options()};
bool is_base_iter_valid = false;
AdvanceDataPageIter(base_page_iter, is_base_iter_valid);
data_page_builder_.Reset();
assert(cidx < data_batch_.size());
std::string_view change_key = {data_batch_[cidx].key_.data(),
data_batch_[cidx].key_.size()};
assert(cmp->Compare(page_left_bound, change_key) <= 0);
assert(page_right_bound.empty() ||
cmp->Compare(page_left_bound, page_right_bound) < 0);
auto change_it = data_batch_.begin() + cidx;
auto change_end_it = std::lower_bound(
change_it,
data_batch_.end(),
page_right_bound,
[&](const WriteDataEntry &change_item, std::string_view key)
{
if (key.empty())
{
return true;
}
std::string_view ckey{change_item.key_.data(),
change_item.key_.size()};
return cmp->Compare(ckey, key) < 0;
});
std::string prev_key;
std::string_view page_key = stack_.back()->idx_page_iter_.Key();
std::string curr_page_key{page_key.data(), page_key.size()};
PageId page_id = MaxPageId;
if (base_page != nullptr)
{
page_id = base_page->GetPageId();
assert(page_key <= base_page_iter.Key());
}
std::string compression_scratch;
compression::CompressionType compression_type =
compression::CompressionType::None;
auto add_to_page = utils::MakeYCombinator(
[&](auto &&self,
std::string_view key,
std::string_view val,
bool is_ptr,
uint64_t ts,
uint64_t expire_ts) -> KvError
{
if (expire_ts != 0 && expire_ts <= now_ms)
{
if (is_ptr)
{
err = DelOverflowValue(val);
CHECK_KV_ERR(err);
}
UpdateTTL(expire_ts, key, WriteOp::Delete);
return KvError::NoError;
}
bool success = data_page_builder_.Add(
key, val, is_ptr, ts, expire_ts, compression_type);
if (!success)
{
if (!is_ptr && DataPageBuilder::IsOverflowKV(
key, val.size(), ts, expire_ts, Options()))
{
KvError err = WriteOverflowValue(val);
CHECK_KV_ERR(err);
return self(key, overflow_ptrs_, true, ts, expire_ts);
}
KvError err = FinishDataPage(std::move(curr_page_key), page_id);
CHECK_KV_ERR(err);
curr_page_key = cmp->FindShortestSeparator(
{prev_key.data(), prev_key.size()}, key);
assert(!prev_key.empty() && prev_key < curr_page_key);
data_page_builder_.Reset();
success = data_page_builder_.Add(
key, val, is_ptr, ts, expire_ts, compression_type);
assert(success);
page_id = MaxPageId;
}
assert(curr_page_key <= key);
prev_key = key;
return KvError::NoError;
});
while (is_base_iter_valid && change_it != change_end_it)
{
std::string_view base_key = base_page_iter.Key();
bool add_change_key = false;
std::string_view base_val = base_page_iter.Value();
uint64_t base_ts = base_page_iter.Timestamp();
bool is_overflow_ptr = false;
change_key = {change_it->key_.data(), change_it->key_.size()};
std::string_view change_val = {change_it->val_.data(),
change_it->val_.size()};
uint64_t change_ts = change_it->timestamp_;
enum struct AdvanceType
{
PageIter,
Changes,
Both
};
std::string_view new_key;
std::string_view new_val;
uint64_t new_ts;
uint64_t expire_ts;
AdvanceType adv_type;
int cmp_ret = cmp->Compare(base_key, change_key);
if (cmp_ret < 0)
{
new_key = base_key;
new_val = base_val;
new_ts = base_ts;
is_overflow_ptr = base_page_iter.IsOverflow();
expire_ts = base_page_iter.ExpireTs();
adv_type = AdvanceType::PageIter;
}
else if (cmp_ret == 0)
{
adv_type = AdvanceType::Both;
if (change_ts >= base_ts)
{
if (base_page_iter.IsOverflow())
{
err = DelOverflowValue(base_val);
CHECK_KV_ERR(err);
}
const uint64_t base_expire = base_page_iter.ExpireTs();
expire_ts = change_it->expire_ts_;
if (change_it->op_ == WriteOp::Delete)
{
new_key = std::string_view{};
UpdateTTL(base_expire, base_key, WriteOp::Delete);
assert(expire_ts == 0 || expire_ts == base_expire);
}
else if (expire_ts != 0 && expire_ts <= now_ms)
{
new_key = std::string_view{};
UpdateTTL(base_expire, base_key, WriteOp::Delete);
}
else
{
new_key = change_key;
new_val = change_val;
new_ts = change_ts;
add_change_key = true;
if (base_expire != expire_ts)
{
UpdateTTL(base_expire, base_key, WriteOp::Delete);
UpdateTTL(expire_ts, new_key, WriteOp::Upsert);
}
}
}
else
{
new_key = base_key;
new_val = base_val;
new_ts = base_ts;
is_overflow_ptr = base_page_iter.IsOverflow();
expire_ts = base_page_iter.ExpireTs();
}
}
else
{
adv_type = AdvanceType::Changes;
if (change_it->op_ == WriteOp::Delete ||
(change_it->expire_ts_ != 0 && change_it->expire_ts_ <= now_ms))
{
new_key = std::string_view{};
}
else
{
new_key = change_key;
new_val = change_val;
new_ts = change_ts;
add_change_key = true;
expire_ts = change_it->expire_ts_;
UpdateTTL(expire_ts, new_key, WriteOp::Upsert);
}
}
if (!new_key.empty())
{
if (add_change_key)
{
if (Options()->enable_compression)
{
compression::PreparedValue prepared = compression::Prepare(
new_val, compression, compression_scratch);
new_val = prepared.data;
compression_type = prepared.compression_kind;
}
else
{
compression_type = compression::CompressionType::None;
}
}
else
{
compression_type = base_page_iter.CompressionType();
}
err = add_to_page(
new_key, new_val, is_overflow_ptr, new_ts, expire_ts);
CHECK_KV_ERR(err);
}
switch (adv_type)
{
case AdvanceType::PageIter:
AdvanceDataPageIter(base_page_iter, is_base_iter_valid);
break;
case AdvanceType::Changes:
++change_it;
break;
default:
AdvanceDataPageIter(base_page_iter, is_base_iter_valid);
++change_it;
break;
}
}
while (is_base_iter_valid)
{
std::string_view key = base_page_iter.Key();
std::string_view val = base_page_iter.Value();
bool overflow = base_page_iter.IsOverflow();
uint64_t ts = base_page_iter.Timestamp();
uint64_t expire_ts = base_page_iter.ExpireTs();
compression_type = base_page_iter.CompressionType();
err = add_to_page(key, val, overflow, ts, expire_ts);
CHECK_KV_ERR(err);
AdvanceDataPageIter(base_page_iter, is_base_iter_valid);
}
while (change_it != change_end_it)
{
if (change_it->op_ == WriteOp::Upsert &&
(change_it->expire_ts_ == 0 || change_it->expire_ts_ > now_ms))
{
std::string_view key{change_it->key_.data(),
change_it->key_.size()};
std::string_view val{change_it->val_.data(),
change_it->val_.size()};
uint64_t ts = change_it->timestamp_;
uint64_t expire_ts = change_it->expire_ts_;
UpdateTTL(expire_ts, key, WriteOp::Upsert);
if (Options()->enable_compression)
{
compression::PreparedValue prepared =
compression::Prepare(val, compression, compression_scratch);
val = prepared.data;
compression_type = prepared.compression_kind;
}
else
{
compression_type = compression::CompressionType::None;
}
err = add_to_page(key, val, false, ts, expire_ts);
CHECK_KV_ERR(err);
}
else
{
}
++change_it;
}
if (data_page_builder_.IsEmpty())
{
if (base_page)
{
err = LeafLinkDelete();
CHECK_KV_ERR(err);
FreePage(applying_page_.GetPageId());
assert(stack_.back()->changes_.empty() ||
stack_.back()->changes_.back().key_ < curr_page_key);
stack_.back()->changes_.emplace_back(
std::move(curr_page_key), page_id, WriteOp::Delete);
}
}
else
{
err = FinishDataPage(std::move(curr_page_key), page_id);
CHECK_KV_ERR(err);
}
assert(!TripleElement(1));
leaf_triple_[1] = std::move(leaf_triple_[2]);
cidx = cidx + std::distance(data_batch_.begin() + cidx, change_end_it);
return KvError::NoError;
}
std::pair<MemIndexPage::Handle, KvError> BatchWriteTask::Pop()
{
if (stack_.empty())
{
return {MemIndexPage::Handle(), KvError::NoError};
}
IndexStackEntry *stack_entry = stack_.back().get();
if (stack_entry->changes_.empty())
{
MemIndexPage::Handle handle = std::move(stack_entry->handle_);
stack_.pop_back();
return {std::move(handle), KvError::NoError};
}
idx_page_builder_.Reset();
const Comparator *cmp = shard->IndexManager()->GetComparator();
std::vector<IndexOp> &changes = stack_entry->changes_;
bool is_leaf_index = stack_entry->is_leaf_index_;
IndexPageIter base_page_iter{stack_entry->handle_, Options()};
bool is_base_iter_valid = false;
AdvanceIndexPageIter(base_page_iter, is_base_iter_valid);
idx_page_builder_.Reset();
auto cit = changes.begin();
KvError err;
MemIndexPage::Handle prev_handle;
std::string prev_key;
PageId prev_page_id = MaxPageId;
std::string_view page_key =
stack_.size() == 1 ? std::string_view{}
: stack_[stack_.size() - 2]->idx_page_iter_.Key();
std::string curr_page_key{page_key};
if (stack_entry->handle_)
{
prev_page_id = stack_entry->handle_->GetPageId();
}
auto add_to_page = [&](std::string_view new_key,
uint32_t new_page_id) -> KvError
{
bool success =
idx_page_builder_.Add(new_key, new_page_id, is_leaf_index);
if (!success)
{
err = FinishIndexPage(
prev_handle, prev_key, prev_page_id, std::move(curr_page_key));
CHECK_KV_ERR(err);
curr_page_key = new_key;
idx_page_builder_.Reset();
idx_page_builder_.Add(
std::string_view{}, new_page_id, is_leaf_index);
}
return KvError::NoError;
};
while (is_base_iter_valid && cit != changes.end())
{
std::string_view base_key = base_page_iter.Key();
uint32_t base_page_id = base_page_iter.GetPageId();
std::string_view change_key{cit->key_.data(), cit->key_.size()};
uint32_t change_page = cit->page_id_;
int cmp_ret = cmp->Compare(base_key, change_key);
enum struct AdvanceType
{
PageIter,
Changes,
Both
};
std::string_view new_key;
uint32_t new_page_id;
AdvanceType adv_type;
if (cmp_ret < 0)
{
new_key = base_key;
new_page_id = base_page_id;
adv_type = AdvanceType::PageIter;
}
else if (cmp_ret == 0)
{
adv_type = AdvanceType::Both;
if (cit->op_ == WriteOp::Delete)
{
new_key = std::string_view{};
new_page_id = MaxPageId;
}
else
{
new_key = change_key;
new_page_id = change_page;
}
}
else
{
assert(cit->op_ == WriteOp::Upsert);
adv_type = AdvanceType::Changes;
new_key = change_key;
new_page_id = change_page;
}
if (!new_key.empty() || new_page_id != MaxPageId)
{
KvError err = add_to_page(new_key, new_page_id);
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
}
switch (adv_type)
{
case AdvanceType::PageIter:
AdvanceIndexPageIter(base_page_iter, is_base_iter_valid);
break;
case AdvanceType::Changes:
cit++;
break;
default:
AdvanceIndexPageIter(base_page_iter, is_base_iter_valid);
cit++;
break;
}
}
while (is_base_iter_valid)
{
std::string_view new_key = base_page_iter.Key();
uint32_t new_page_id = base_page_iter.GetPageId();
KvError err = add_to_page(new_key, new_page_id);
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
AdvanceIndexPageIter(base_page_iter, is_base_iter_valid);
}
while (cit != changes.end())
{
if (cit->op_ != WriteOp::Delete)
{
std::string_view new_key{cit->key_.data(), cit->key_.size()};
uint32_t new_page = cit->page_id_;
KvError err = add_to_page(new_key, new_page);
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
}
++cit;
}
MemIndexPage::Handle new_root;
if (idx_page_builder_.IsEmpty())
{
FreePage(stack_.back()->handle_->GetPageId());
if (stack_.size() > 1)
{
IndexStackEntry *parent = stack_[stack_.size() - 2].get();
std::string_view page_key = parent->idx_page_iter_.Key();
parent->changes_.emplace_back(
std::string(page_key), prev_page_id, WriteOp::Delete);
}
}
else
{
bool splited = static_cast<bool>(prev_handle);
err = FinishIndexPage(
prev_handle, prev_key, prev_page_id, std::move(curr_page_key));
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
err = FlushIndexPage(
prev_handle, std::move(prev_key), prev_page_id, splited);
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
if (!splited)
{
new_root = std::move(prev_handle);
}
}
stack_.pop_back();
return {std::move(new_root), KvError::NoError};
}
KvError BatchWriteTask::FinishIndexPage(MemIndexPage::Handle &prev_handle,
std::string &prev_key,
PageId &prev_page_id,
std::string cur_page_key)
{
assert(!idx_page_builder_.IsEmpty());
const uint16_t cur_page_len = idx_page_builder_.CurrentSizeEstimate();
std::string_view page_view = idx_page_builder_.Finish();
if (prev_handle)
{
const uint16_t one_quarter = Options()->data_page_size >> 2;
const uint16_t three_quarter = Options()->data_page_size - one_quarter;
if (cur_page_len < one_quarter && prev_handle &&
prev_handle->RestartNum() > 1 &&
prev_handle->ContentLength() > three_quarter)
{
page_view = Redistribute(prev_handle, page_view, cur_page_key);
}
KvError err = FlushIndexPage(
prev_handle, std::move(prev_key), prev_page_id, true);
CHECK_KV_ERR(err);
prev_handle = MemIndexPage::Handle();
prev_page_id = MaxPageId;
}
MemIndexPage *cur_page = shard->IndexManager()->AllocIndexPage();
if (cur_page == nullptr)
{
return KvError::OutOfMem;
}
memcpy(cur_page->PagePtr(), page_view.data(), page_view.size());
prev_handle = MemIndexPage::Handle(cur_page);
prev_key = std::move(cur_page_key);
return KvError::NoError;
}
KvError BatchWriteTask::FlushIndexPage(MemIndexPage::Handle &idx_page,
std::string idx_page_key,
PageId page_id,
bool split)
{
idx_page->SetPageId(page_id);
KvError err = WritePage(idx_page);
CHECK_KV_ERR(err);
if (split && (page_id == MaxPageId || stack_.size() == 1))
{
assert(stack_.size() >= 1);
if (stack_.size() == 1)
{
stack_.emplace(stack_.begin(),
std::make_unique<IndexStackEntry>(
MemIndexPage::Handle(), Options()));
}
IndexStackEntry *parent_entry = stack_[stack_.size() - 2].get();
parent_entry->changes_.emplace_back(
std::move(idx_page_key), idx_page->GetPageId(), WriteOp::Upsert);
}
return KvError::NoError;
}
KvError BatchWriteTask::FinishDataPage(std::string page_key, PageId page_id)
{
const uint16_t cur_page_len = data_page_builder_.CurrentSizeEstimate();
std::string_view page_view = data_page_builder_.Finish();
if (page_id == MaxPageId)
{
page_id = cow_meta_.mapper_->GetPage();
DataPage new_data_page(false);
new_data_page.SetPageId(page_id);
const uint16_t one_quarter = Options()->data_page_size >> 2;
const uint16_t three_quarter = Options()->data_page_size - one_quarter;
DataPage *prev_page = TripleElement(0);
if (cur_page_len < one_quarter && prev_page != nullptr &&
prev_page->RestartNum() > 1 &&
prev_page->ContentLength() > three_quarter)
{
Page page = Redistribute(*prev_page, page_view);
new_data_page.SetPage(std::move(page));
DataPageIter iter(&new_data_page, Options());
assert(iter.HasNext());
iter.Next();
page_key.assign(iter.Key());
}
else
{
new_data_page.SetPage(Page(true));
memcpy(new_data_page.PagePtr(), page_view.data(), page_view.size());
}
KvError err = LeafLinkInsert(std::move(new_data_page));
CHECK_KV_ERR(err);
assert(stack_.back()->changes_.empty() ||
stack_.back()->changes_.back().key_ < page_key);
stack_.back()->changes_.emplace_back(
std::move(page_key), page_id, WriteOp::Upsert);
}
else
{
DataPage new_page(page_id);
memcpy(new_page.PagePtr(), page_view.data(), page_view.size());
LeafLinkUpdate(std::move(new_page));
}
return ShiftLeafLink();
}
Page BatchWriteTask::Redistribute(DataPage &prev_page,
std::string_view cur_page)
{
const uint16_t prev_page_len = prev_page.ContentLength();
const uint16_t cur_page_len =
DecodeFixed16(cur_page.data() + DataPage::page_size_offset);
assert(prev_page_len > cur_page_len);
const uint16_t average_len = (prev_page_len + cur_page_len) >> 1;
assert(prev_page_len >= average_len);
PageRegionIter iter({prev_page.PagePtr(), prev_page_len});
uint16_t preserve_region_cnt = 0;
uint16_t preserve_len = DataPageBuilder::HeaderSize() + sizeof(uint16_t);
while (preserve_len < average_len)
{
assert(iter.Valid());
std::string_view region = iter.Region();
preserve_len += region.size() + sizeof(uint16_t);
preserve_region_cnt++;
iter.Next();
}
assert(preserve_len <= prev_page_len);
Page new_page(true);
if (preserve_len == prev_page_len)
{
std::memcpy(new_page.Ptr(), cur_page.data(), cur_page.size());
return new_page;
}
FastPageBuilder builder(Options());
builder.Reset(new_page.Ptr(), PageType::Data);
for (; iter.Valid(); iter.Next())
{
std::string_view region = iter.Region();
builder.AddRegion(region);
}
std::string_view content = cur_page.substr(0, cur_page_len);
for (iter.Reset(content); iter.Valid(); iter.Next())
{
std::string_view region = iter.Region();
builder.AddRegion(region);
}
builder.Finish();
iter.Reset({prev_page.PagePtr(), prev_page_len});
builder.Reset(prev_page.PagePtr(), PageType::Data);
for (uint16_t i = 0; i < preserve_region_cnt; i++)
{
std::string_view region = iter.Region();
builder.AddRegion(region);
iter.Next();
}
builder.Finish();
#ifndef NDEBUG
assert(prev_page.RestartNum() == preserve_region_cnt);
assert(prev_page.ContentLength() == preserve_len);
const uint16_t new_page_len =
DecodeFixed16(new_page.Ptr() + DataPage::page_size_offset);
assert(prev_page_len + cur_page_len == preserve_len + new_page_len);
#endif
return new_page;
}
std::string_view BatchWriteTask::Redistribute(MemIndexPage::Handle &prev_handle,
std::string_view cur_page,
std::string &cur_page_key)
{
const uint16_t prev_page_len = prev_handle->ContentLength();
const uint16_t cur_page_len =
DecodeFixed16(cur_page.data() + DataPage::page_size_offset);
cur_page = cur_page.substr(0, cur_page_len);
assert(prev_page_len > cur_page_len);
const uint16_t average_len = (prev_page_len + cur_page_len) >> 1;
assert(prev_page_len >= average_len);
PageRegionIter region_iter({prev_handle->PagePtr(), prev_page_len});
uint16_t preserve_region_cnt = 0;
uint16_t preserve_len = IndexPageBuilder::HeaderSize() + sizeof(uint16_t);
while (preserve_len < average_len)
{
assert(region_iter.Valid());
std::string_view region = region_iter.Region();
preserve_len += region.size() + sizeof(uint16_t);
preserve_region_cnt++;
region_iter.Next();
}
assert(preserve_len <= prev_page_len);
if (preserve_len == prev_page_len)
{
return cur_page;
}
bool is_leaf_idx = prev_handle->IsPointingToLeaf();
IndexPageBuilder idx_builder(Options());
IndexPageIter iter_prev(prev_handle, Options());
iter_prev.SeekToRestart(preserve_region_cnt);
iter_prev.Next();
std::string new_page_key(iter_prev.Key());
idx_builder.Add({}, iter_prev.GetPageId(), is_leaf_idx);
while (iter_prev.Next())
{
idx_builder.Add(iter_prev.Key(), iter_prev.GetPageId(), is_leaf_idx);
}
IndexPageIter iter_cur(cur_page, Options());
iter_cur.Next();
assert(iter_cur.Key().empty());
idx_builder.Add(cur_page_key, iter_cur.GetPageId(), is_leaf_idx);
while (iter_cur.Next())
{
idx_builder.Add(iter_cur.Key(), iter_cur.GetPageId(), is_leaf_idx);
}
std::string_view new_page = idx_builder.Finish();
idx_page_builder_.Swap(idx_builder);
cur_page_key = std::move(new_page_key);
FastPageBuilder builder(Options());
builder.Reset(prev_handle->PagePtr(), TypeOfPage(prev_handle->PagePtr()));
region_iter.Reset({prev_handle->PagePtr(), prev_page_len});
for (uint16_t i = 0; i < preserve_region_cnt; i++)
{
std::string_view region = region_iter.Region();
builder.AddRegion(region);
region_iter.Next();
}
builder.Finish();
#ifndef NDEBUG
assert(prev_handle->RestartNum() == preserve_region_cnt);
assert(prev_handle->ContentLength() == preserve_len);
#endif
return new_page;
}
std::string_view BatchWriteTask::LeftBound(bool is_data_page)
{
size_t level = is_data_page ? 0 : 1;
auto stack_it = stack_.crbegin() + level;
while (stack_it != stack_.crend())
{
IndexPageIter &idx_iter = (*stack_it)->idx_page_iter_;
std::string_view idx_key = idx_iter.Key();
if (!idx_key.empty())
{
return idx_key;
}
++stack_it;
}
return std::string_view{};
}
std::string BatchWriteTask::RightBound(bool is_data_page)
{
size_t level = is_data_page ? 0 : 1;
auto stack_it = stack_.crbegin() + level;
std::string next_key{};
while (stack_it != stack_.crend())
{
IndexPageIter &idx_iter = (*stack_it)->idx_page_iter_;
idx_iter.PeekNextKey(next_key);
if (!next_key.empty())
{
return next_key;
}
++stack_it;
}
return next_key;
}
KvError BatchWriteTask::DeleteTree(PageId page_id, bool update_prev)
{
auto [handle, err] = shard->IndexManager()->FindPage(
cow_meta_.mapper_->GetMapping(), page_id);
CHECK_KV_ERR(err);
IndexPageIter iter(handle, Options());
if (handle->IsPointingToLeaf())
{
while (iter.Next())
{
err = DeleteDataPage(iter.GetPageId(), update_prev);
update_prev = false;
if (err != KvError::NoError)
{
return err;
}
}
FreePage(page_id);
return KvError::NoError;
}
while (iter.Next())
{
KvError err = DeleteTree(iter.GetPageId(), update_prev);
update_prev = false;
if (err != KvError::NoError)
{
return err;
}
}
FreePage(page_id);
return KvError::NoError;
}
KvError BatchWriteTask::DeleteDataPage(PageId page_id, bool update_prev)
{
auto [page, err] = LoadDataPage(page_id);
CHECK_KV_ERR(err);
DataPageIter iter(&page, Options());
while (iter.Next())
{
if (iter.IsOverflow())
{
err = DelOverflowValue(iter.Value());
CHECK_KV_ERR(err);
}
uint64_t expire_ts = iter.ExpireTs();
UpdateTTL(expire_ts, iter.Key(), WriteOp::Delete);
}
FreePage(page_id);
if (!update_prev || page.PrevPageId() == MaxPageId)
{
return KvError::NoError;
}
auto [prev_page, err_prev] = LoadDataPage(page.PrevPageId());
if (err_prev != KvError::NoError)
{
return err_prev;
}
prev_page.SetNextPageId(MaxPageId);
return WritePage(std::move(prev_page));
}
KvError BatchWriteTask::WriteOverflowValue(std::string_view value)
{
const KvOptions *opts = Options();
const uint16_t page_cap = OverflowPage::Capacity(opts, false);
const uint16_t end_page_cap = OverflowPage::Capacity(opts, true);
overflow_ptrs_.clear();
uint32_t end_page_id = MaxPageId;
std::string_view page_val;
KvError err;
std::array<uint32_t, max_overflow_pointers> buf;
std::span<uint32_t> pointers;
while (!value.empty())
{
size_t next_group_size = (value.size() + page_cap - 1) / page_cap;
next_group_size =
std::min(next_group_size, size_t(opts->overflow_pointers));
for (uint8_t i = 0; i < next_group_size; i++)
{
buf[i] = cow_meta_.mapper_->GetPage();
}
pointers = {buf.data(), next_group_size};
if (end_page_id == MaxPageId)
{
for (uint32_t pg_id : pointers)
{
PutFixed32(&overflow_ptrs_, pg_id);
}
}
else
{
err =
WritePage(OverflowPage(end_page_id, opts, page_val, pointers));
CHECK_KV_ERR(err);
}
uint8_t i = 0;
for (uint32_t pg_id : pointers)
{
i++;
if (i == opts->overflow_pointers && value.size() > page_cap)
{
end_page_id = pg_id;
uint16_t page_val_size =
std::min(size_t(end_page_cap), value.size());
page_val = value.substr(0, page_val_size);
value = value.substr(page_val_size);
break;
}
uint16_t page_val_size = std::min(size_t(page_cap), value.size());
page_val = value.substr(0, page_val_size);
value = value.substr(page_val_size);
err = WritePage(OverflowPage(pg_id, opts, page_val));
CHECK_KV_ERR(err);
}
assert(i == pointers.size());
}
return KvError::NoError;
}
KvError BatchWriteTask::DelOverflowValue(std::string_view encoded_ptrs)
{
std::array<PageId, max_overflow_pointers> pointers;
uint8_t n_ptrs = DecodeOverflowPointers(encoded_ptrs, pointers);
for (uint8_t i = 0; i < n_ptrs;)
{
PageId page_id = pointers[i];
if (i == (Options()->overflow_pointers - 1))
{
auto [page, err] = LoadOverflowPage(page_id);
CHECK_KV_ERR(err);
encoded_ptrs = page.GetEncodedPointers(Options());
n_ptrs = DecodeOverflowPointers(encoded_ptrs, pointers);
i = 0;
}
else
{
i++;
}
FreePage(page_id);
}
return KvError::NoError;
}
void BatchWriteTask::AdvanceDataPageIter(DataPageIter &iter, bool &is_valid)
{
is_valid = iter.HasNext() ? iter.Next() : false;
}
void BatchWriteTask::AdvanceIndexPageIter(IndexPageIter &iter, bool &is_valid)
{
is_valid = iter.HasNext() ? iter.Next() : false;
}
std::pair<MemIndexPage::Handle, KvError> BatchWriteTask::TruncateIndexPage(
PageId page_id, std::string_view trunc_pos)
{
auto [handle, err] = shard->IndexManager()->FindPage(
cow_meta_.mapper_->GetMapping(), page_id);
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
const bool is_leaf_idx = handle->IsPointingToLeaf();
IndexPageBuilder builder(Options());
auto truncate_sub_node = [&](std::string_view sub_node_key,
PageId sub_node_id) -> KvError
{
if (is_leaf_idx)
{
auto [truncated, trunc_err] =
TruncateDataPage(sub_node_id, trunc_pos);
CHECK_KV_ERR(trunc_err);
if (truncated)
{
builder.Add(sub_node_key, sub_node_id, is_leaf_idx);
}
}
else
{
auto [sub_handle, trunc_err] =
TruncateIndexPage(sub_node_id, trunc_pos);
CHECK_KV_ERR(trunc_err);
if (sub_handle)
{
builder.Add(sub_node_key, sub_node_id, is_leaf_idx);
}
}
return KvError::NoError;
};
auto delete_sub_node = [this, is_leaf_idx](std::string_view sub_node_key,
PageId sub_node_id,
bool is_trunc_begin) -> KvError
{
if (is_leaf_idx)
{
return DeleteDataPage(sub_node_id, is_trunc_begin);
}
else
{
return DeleteTree(sub_node_id, is_trunc_begin);
}
};
IndexPageIter iter(handle, Options());
CHECK(iter.Next());
std::string sub_node_key = {};
PageId sub_node_id = iter.GetPageId();
bool cut_point_found = false;
while (iter.Next())
{
std::string_view next_node_key = iter.Key();
PageId next_node_id = iter.GetPageId();
if (cut_point_found)
{
err = delete_sub_node(sub_node_key, sub_node_id, false);
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
}
else if (Comp()->Compare(trunc_pos, next_node_key) >= 0)
{
builder.Add(sub_node_key, sub_node_id, is_leaf_idx);
}
else
{
cut_point_found = true;
if (Comp()->Compare(trunc_pos, sub_node_key) > 0)
{
err = truncate_sub_node(sub_node_key, sub_node_id);
}
else
{
assert(Comp()->Compare(trunc_pos, sub_node_key) == 0);
err = delete_sub_node(sub_node_key, sub_node_id, true);
}
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
}
sub_node_key = next_node_key;
sub_node_id = next_node_id;
}
if (cut_point_found)
{
err = delete_sub_node(sub_node_key, sub_node_id, false);
}
else if (Comp()->Compare(trunc_pos, sub_node_key) > 0)
{
err = truncate_sub_node(sub_node_key, sub_node_id);
}
else
{
err = delete_sub_node(sub_node_key, sub_node_id, true);
}
if (err != KvError::NoError)
{
return {MemIndexPage::Handle(), err};
}
if (builder.IsEmpty())
{
FreePage(page_id);
return {MemIndexPage::Handle(), KvError::NoError};
}
MemIndexPage *new_page = shard->IndexManager()->AllocIndexPage();
if (new_page == nullptr)
{
return {MemIndexPage::Handle(), KvError::OutOfMem};
}
MemIndexPage::Handle new_handle(new_page);
std::string_view page_view = builder.Finish();
memcpy(new_handle->PagePtr(), page_view.data(), page_view.size());
new_handle->SetPageId(page_id);
err = WritePage(new_handle);
if (err != KvError::NoError)
{
MemIndexPage *page = new_handle.Get();
new_handle.Reset();
CHECK(page->IsDetached());
CHECK(!page->IsPinned());
shard->IndexManager()->FreeIndexPage(page);
return {MemIndexPage::Handle(), err};
}
return {std::move(new_handle), KvError::NoError};
}
KvError BatchWriteTask::Truncate(std::string_view trunc_pos)
{
KvError err = shard->IndexManager()->MakeCowRoot(tbl_ident_, cow_meta_);
CHECK_KV_ERR(err);
if (cow_meta_.root_id_ == MaxPageId)
{
return KvError::NoError;
}
if (trunc_pos.empty())
{
do_update_ttl_ = false;
ttl_batch_.clear();
MappingSnapshot *mapping = cow_meta_.mapper_->GetMapping();
const auto &mapping_tbl = mapping->mapping_tbl_;
for (PageId page_id = 0; page_id < mapping_tbl.size(); ++page_id)
{
uint64_t val = mapping_tbl.Get(page_id);
auto val_type = MappingSnapshot::GetValType(val);
if (val_type == MappingSnapshot::ValType::Invalid ||
val_type == MappingSnapshot::ValType::PageId)
{
continue;
}
FreePage(page_id);
}
cow_meta_.root_id_ = MaxPageId;
cow_meta_.ttl_root_id_ = MaxPageId;
cow_meta_.next_expire_ts_ = 0;
wal_builder_.Reset();
return UpdateMeta();
}
do_update_ttl_ = true;
auto [new_root, error] = TruncateIndexPage(cow_meta_.root_id_, trunc_pos);
CHECK_KV_ERR(error);
cow_meta_.root_id_ = new_root ? new_root->GetPageId() : MaxPageId;
err = ApplyTTLBatch();
CHECK_KV_ERR(err);
return UpdateMeta();
}
std::pair<bool, KvError> BatchWriteTask::TruncateDataPage(
PageId page_id, std::string_view trunc_pos)
{
auto [page, err] = LoadDataPage(page_id);
if (err != KvError::NoError)
{
return {true, err};
}
const Comparator *cmp = shard->IndexManager()->GetComparator();
DataPageIter iter{&page, Options()};
data_page_builder_.Reset();
bool has_trunc_tail = false;
while (iter.Next())
{
if (cmp->Compare(iter.Key(), trunc_pos) >= 0)
{
has_trunc_tail = true;
break;
}
data_page_builder_.Add(iter.Key(),
iter.Value(),
iter.IsOverflow(),
iter.Timestamp(),
iter.ExpireTs(),
iter.CompressionType());
}
if (has_trunc_tail)
{
do
{
if (iter.IsOverflow())
{
err = DelOverflowValue(iter.Value());
if (err != KvError::NoError)
{
return {true, err};
}
}
uint64_t expire_ts = iter.ExpireTs();
UpdateTTL(expire_ts, iter.Key(), WriteOp::Delete);
} while (iter.Next());
}
if (data_page_builder_.IsEmpty())
{
FreePage(page.GetPageId());
uint32_t prev_page_id = page.PrevPageId();
if (prev_page_id == MaxPageId)
{
return {false, KvError::NoError};
}
auto [prev_page, err] = LoadDataPage(prev_page_id);
if (err != KvError::NoError)
{
return {false, err};
}
prev_page.SetNextPageId(MaxPageId);
err = WritePage(std::move(prev_page));
return {false, err};
}
else
{
DataPage new_page(page_id);
std::string_view page_view = data_page_builder_.Finish();
memcpy(new_page.PagePtr(), page_view.data(), page_view.size());
new_page.SetNextPageId(MaxPageId);
new_page.SetPrevPageId(page.PrevPageId());
err = WritePage(std::move(new_page));
return {true, err};
}
}
KvError BatchWriteTask::CleanExpiredKeys()
{
std::vector<DataPage> prefetched_pages;
std::vector<Page> read_pages;
ScanIterator iter(tbl_ident_, prefetched_pages, read_pages);
KvError err = iter.Seek({}, true);
if (err != KvError::NoError)
{
if (err == KvError::EndOfFile)
{
return KvError::NoError;
}
else
{
LOG(ERROR) << "failed to clean expired keys: " << ErrorString(err);
return err;
}
}
std::vector<WriteDataEntry> data_batch, ttl_batch;
data_batch.reserve(128);
ttl_batch.reserve(128);
const uint64_t now_ts_ms = utils::UnixTs<chrono::milliseconds>();
const uint64_t now_ts_us = utils::UnixTs<chrono::microseconds>();
uint64_t next_expire_ts = 0;
do
{
std::string_view ttl_key = iter.Key();
uint64_t expire_ts = BigEndianToNative(DecodeFixed64(ttl_key.data()));
if (expire_ts > now_ts_ms)
{
next_expire_ts = expire_ts;
break;
}
ttl_batch.emplace_back(std::string(ttl_key), "", 0, WriteOp::Delete);
std::string key(ttl_key.substr(8));
data_batch.emplace_back(
std::move(key), "", now_ts_us, WriteOp::Delete, expire_ts);
} while (iter.Next() == KvError::NoError);
if (ttl_batch.empty())
{
return KvError::NoError;
}
err = shard->IndexManager()->MakeCowRoot(tbl_ident_, cow_meta_);
CHECK_KV_ERR(err);
assert(cow_meta_.next_expire_ts_ != 0 &&
cow_meta_.next_expire_ts_ <= now_ts_ms);
std::sort(data_batch.begin(), data_batch.end());
SetBatch(data_batch);
err = ApplyBatch(cow_meta_.root_id_, false, now_ts_ms);
CHECK_KV_ERR(err);
assert(std::is_sorted(ttl_batch.begin(), ttl_batch.end()));
SetBatch(ttl_batch);
err = ApplyBatch(cow_meta_.ttl_root_id_, false);
CHECK_KV_ERR(err);
cow_meta_.next_expire_ts_ = next_expire_ts;
return UpdateMeta();
}
void BatchWriteTask::UpdateTTL(uint64_t expire_ts,
std::string_view key,
WriteOp op)
{
if (do_update_ttl_ && expire_ts != 0)
{
std::string ttl_key;
ttl_key.resize(sizeof(expire_ts) + key.size());
EncodeFixed64(ttl_key.data(), ToBigEndian(expire_ts));
std::memcpy(ttl_key.data() + sizeof(expire_ts), key.data(), key.size());
ttl_batch_.emplace_back(std::move(ttl_key), "", 0, op, 0);
if (op == WriteOp::Upsert)
{
uint64_t next_ts = cow_meta_.next_expire_ts_;
cow_meta_.next_expire_ts_ =
next_ts == 0 ? expire_ts : std::min(next_ts, expire_ts);
}
}
}
}