1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#pragma once
#include <utility>
#include "storage/data_page_builder.h"
#include "storage/index_page_builder.h"
#include "tasks/write_task.h"
#include "write_tree_stack.h"
namespace eloqstore
{
class BatchWriteTask : public WriteTask
{
public:
BatchWriteTask();
TaskType Type() const override
{
return TaskType::BatchWrite;
}
void Reset(const TableIdent &tbl_id) override;
void Abort() override;
bool SetBatch(std::span<WriteDataEntry> entries);
KvError Apply();
KvError Truncate(std::string_view trunc_pos);
KvError CleanExpiredKeys();
private:
KvError ApplyBatch(PageId &root_id, bool update_ttl, uint64_t now_ts = 0);
KvError ApplyTTLBatch();
KvError ApplyOnePage(size_t &cidx, uint64_t now_ms);
KvError LoadApplyingPage(PageId page_id);
std::pair<MemIndexPage::Handle, KvError> Pop();
KvError FinishIndexPage(MemIndexPage::Handle &prev_handle,
std::string &prev_key,
PageId &prev_page_id,
std::string cur_page_key);
KvError FlushIndexPage(MemIndexPage::Handle &new_page,
std::string idx_page_key,
PageId page_id,
bool split);
KvError FinishDataPage(std::string page_key, PageId page_id);
/**
* @brief Balance two adjacent data pages by transferring data regions from
* the previous page to the current page.
* @param prev_page The previous page is expected to be larger than the
* current page, and will be modified in-place.
* @param cur_page The current page before redistributing.
* @return The current page after redistributing.
*/
Page Redistribute(DataPage &prev_page, std::string_view cur_page);
std::string_view Redistribute(MemIndexPage::Handle &prev_handle,
std::string_view cur_page,
std::string &cur_page_key);
/**
* @brief Pops up the index stack such that the top index entry contains the
* search key.
*
* @param search_key
*/
KvError SeekStack(std::string_view search_key);
std::pair<uint32_t, KvError> Seek(std::string_view key);
/**
* @brief Calculates the left boundary of the data page or the top index
* page in the stack.
*
* @param is_data_page
* @return std::string_view
*/
std::string_view LeftBound(bool is_data_page);
/**
* @brief Calculates the right boundary of the data page or the top index
* page in the stack.
*
* @param is_data_page
* @return std::string
*/
std::string RightBound(bool is_data_page);
std::span<WriteDataEntry> data_batch_;
DataPage applying_page_;
/**
* @brief Batch of updates that need to be applied on the TTL tree.
*/
std::vector<WriteDataEntry> ttl_batch_;
bool do_update_ttl_{};
inline void UpdateTTL(uint64_t expire_ts, std::string_view key, WriteOp op);
std::vector<std::unique_ptr<IndexStackEntry>> stack_;
IndexPageBuilder idx_page_builder_;
DataPageBuilder data_page_builder_;
std::string overflow_ptrs_;
/**
* @brief To maintain the double link list between bottom data pages, we
* keep at most three adjacent data pages in memory. [1] is the page
* ApplyOnePage currently writing to, [0] is the previous page of [1], and
* [2] is the next page of [1].
*/
DataPage leaf_triple_[3];
/**
* @brief Get triple element at position idx.
*
* @param idx The element position, can be 0, 1 or 2.
*/
DataPage *TripleElement(uint8_t idx);
/**
* @brief Load leaf data page from disk to leaf_triple_[idx].
*
* @param idx The element position, can be 0 or 2.
*/
KvError LoadTripleElement(uint8_t idx, PageId page_id);
/**
* @brief Update element in leaf link list. Replace the old page at
* applying_page_ with new page. The new page is the first page generated by
* ApplyOnePage.
*
* @param page The new page.
* @param old_fp File page id of the old page.
*/
void LeafLinkUpdate(DataPage &&page);
/**
* @brief Insert new page into leaf link list.
*
* @param page The new page.
*/
KvError LeafLinkInsert(DataPage &&page);
/**
* @brief Delete the old data page at applying_page_ from leaf link list.
*/
KvError LeafLinkDelete();
KvError ShiftLeafLink();
KvError DeleteTree(PageId page_id, bool update_prev);
KvError DeleteDataPage(PageId page_id, bool update_prev);
/**
* @brief Split and write overflow value into multiple pages.
* @return overflow_ptrs_ store the encoded overflow pointers.
*/
KvError WriteOverflowValue(std::string_view value);
/**
* @brief Delete overflow value.
* @param encoded_ptrs The encoded overflow pointers.
*/
KvError DelOverflowValue(std::string_view encoded_ptrs);
/**
* @brief Truncate the data page at page_id from the trunc_pos.
* @return true if the page is empty after truncation.
*/
std::pair<bool, KvError> TruncateDataPage(PageId page_id,
std::string_view trunc_pos);
std::pair<MemIndexPage::Handle, KvError> TruncateIndexPage(
PageId page_id, std::string_view trunc_pos);
static void AdvanceDataPageIter(DataPageIter &iter, bool &is_valid);
static void AdvanceIndexPageIter(IndexPageIter &iter, bool &is_valid);
};
} // namespace eloqstore