1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <cinttypes>
#include <list>
#include <string>
#include <unordered_map>
#include "db/dbformat.h"
#include "rocksdb/comparator.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/flush_block_policy_impl.h"
#include "table/format.h"
#include "util/atomic.h"
namespace ROCKSDB_NAMESPACE {
// The interface for building index.
// Instruction for adding a new concrete IndexBuilder:
// 1. Create a subclass instantiated from IndexBuilder.
// 2. Add a new entry associated with that subclass in TableOptions::IndexType.
// 3. Add a create function for the new subclass in CreateIndexBuilder.
// Note: we can devise more advanced design to simplify the process for adding
// new subclass, which will, on the other hand, increase the code complexity and
// catch unwanted attention from readers. Given that we won't add/change
// indexes frequently, it makes sense to just embrace a more straightforward
// design that just works.
class IndexBuilder {
public:
static IndexBuilder* CreateIndexBuilder(
BlockBasedTableOptions::IndexType index_type,
const InternalKeyComparator* comparator,
const InternalKeySliceTransform* int_key_slice_transform,
bool use_value_delta_encoding, const BlockBasedTableOptions& table_opt,
size_t ts_sz, bool persist_user_defined_timestamps);
// Index builder will construct a set of blocks which contain:
// 1. One primary index block.
// 2. (Optional) a set of metablocks that contains the metadata of the
// primary index.
struct IndexBlocks {
Slice index_block_contents;
std::unordered_map<std::string, std::pair<BlockType, Slice>> meta_blocks;
};
IndexBuilder(const InternalKeyComparator* comparator, size_t ts_sz,
bool persist_user_defined_timestamps)
: comparator_(comparator),
ts_sz_(ts_sz),
persist_user_defined_timestamps_(persist_user_defined_timestamps) {}
virtual ~IndexBuilder() = default;
// Add a new index entry to index block.
//
// To allow further optimization, we provide `last_key_in_current_block` and
// `first_key_in_next_block`, based on which the specific implementation can
// determine the best index key to be used for the index block.
// Called before the OnKeyAdded() call for first_key_in_next_block.
// @last_key_in_current_block: TODO lifetime details
// @first_key_in_next_block: it will be nullptr if the entry being added is
// the last one in the table
// @separator_scratch: a scratch buffer to back a computed separator between
// those, as needed. May be modified on each call.
// @skip_delta_encoding: whether to skip delta encoding for this index entry
// for cases of violating the assumption that this
// block_handle starts where the last one ended.
// @return: the key or separator stored in the index, which could be
// last_key_in_current_block or a computed separator backed by
// separator_scratch or last_key_in_current_block.
// REQUIRES: Finish() has not yet been called.
virtual Slice AddIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle,
std::string* separator_scratch,
bool skip_delta_encoding) = 0;
// An abstract (extensible) holder for passing data from PrepareIndexEntry to
// FinishIndexEntry (see below).
struct PreparedIndexEntry {
virtual ~PreparedIndexEntry() = default;
};
// Parallel compression/construction alternative to AddIndexEntry, 1/3
//
// This function creates a holder for data that needs to be passed from
// PrepareIndexEntry to FinishIndexEntry, depending on the implementation
// of those. Few of these are created and reused, so construction/destruction
// performance is not critical.
virtual std::unique_ptr<PreparedIndexEntry> CreatePreparedIndexEntry() = 0;
// Parallel compression/construction alternative to AddIndexEntry, 2/3
//
// One thread calls this function for successive index entries to compute and
// record in `out` what is needed to build the index entry EXCEPT for the
// BlockHandle, which will only be known later. That thread is generally the
// same thread as calls every other function such as OnKeyAdded EXCEPT
// FinishIndexEntry (see below). This function should be considered "mostly
// stateless" but might modify state distinct from what is modified by
// FinishIndexEntry. Ideally synchronization within the IndexBuilder can be
// avoided.
//
// The passed-in PreparedIndexEntry object is likely reused so might be
// passed-in in any state.
virtual void PrepareIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
PreparedIndexEntry* out) = 0;
// Parallel compression/construction alternative to AddIndexEntry, 3/3
//
// This function is called by a different thread than PrepareIndexEntry, but
// is called on entries in the same order as PrepareIndexEntry, passed in the
// PreparedIndexEntry objects populated by PrepareIndexEntry. This function
// finishes the same effect of AddIndexEntry but split across a few functions.
//
// External synchronization ensures Finish is only called after all the
// FinishIndexEntry calls have completed.
virtual void FinishIndexEntry(const BlockHandle& block_handle,
PreparedIndexEntry* entry,
bool skip_delta_encoding) = 0;
// This method will be called whenever a key is added. The subclasses may
// override OnKeyAdded() if they need to collect additional information.
virtual void OnKeyAdded(const Slice& /*key*/,
const std::optional<Slice>& /*value*/) {}
// Inform the index builder that all entries has been written. Block builder
// may therefore perform any operation required for block finalization.
//
// REQUIRES: Finish() has not yet been called.
inline Status Finish(IndexBlocks* index_blocks) {
// Throw away the changes to last_partition_block_handle. It has no effect
// on the first call to Finish anyway.
BlockHandle last_partition_block_handle;
return Finish(index_blocks, last_partition_block_handle);
}
// This override of Finish can be utilized to build the 2nd level index in
// PartitionIndexBuilder.
//
// index_blocks will be filled with the resulting index data. If the return
// value is Status::InComplete() then it means that the index is partitioned
// and the callee should keep calling Finish until Status::OK() is returned.
// In that case, last_partition_block_handle is pointer to the block written
// with the result of the last call to Finish. This can be utilized to build
// the second level index pointing to each block of partitioned indexes. The
// last call to Finish() that returns Status::OK() populates index_blocks with
// the 2nd level index content.
virtual Status Finish(IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) = 0;
// Get the size for index block. Must be called after ::Finish.
virtual size_t IndexSize() const = 0;
// Returns an estimate of the current index size based on the builder's state.
// Implementations should cache the estimate and update it via
// UpdateIndexSizeEstimate() to avoid recalculating on every key add,
// which is critical for performance in the compaction hot path.
//
// This function is only called by the SST "emit thread" but must be
// thread safe with concurrent calls to UpdateIndexSizeEstimate() from another
// thread (such as during parallel compression).
virtual uint64_t CurrentIndexSizeEstimate() const = 0;
virtual bool separator_is_key_plus_seq() { return true; }
protected:
// Given the last key in current block and the first key in the next block,
// return true if internal key should be used as separator, false if user key
// can be used as separator.
inline bool ShouldUseKeyPlusSeqAsSeparator(
const Slice& last_key_in_current_block,
const Slice& first_key_in_next_block) const {
Slice l_user_key = ExtractUserKey(last_key_in_current_block);
Slice r_user_key = ExtractUserKey(first_key_in_next_block);
// If user defined timestamps are not persisted. All the user keys will
// act like they have minimal timestamp. Only having user key is not
// sufficient, even if they are different user keys for now, they have to be
// different user keys without the timestamp part.
return persist_user_defined_timestamps_
? comparator_->user_comparator()->Compare(l_user_key,
r_user_key) == 0
: comparator_->user_comparator()->CompareWithoutTimestamp(
l_user_key, r_user_key) == 0;
}
// Updates the cached index size estimate used by CurrentIndexSizeEstimate().
//
// This function can be called from the SST "write thread" (via
// FinishIndexEntry()), and needs to be thread safe with
// CurrentIndexSizeEstimate() called from the SST "emit thread".
virtual void UpdateIndexSizeEstimate() {}
const InternalKeyComparator* comparator_;
// Size of user-defined timestamp in bytes.
size_t ts_sz_;
// Whether user-defined timestamp in the user key should be persisted when
// creating index block. If this flag is false, user-defined timestamp will
// be stripped from user key for each index entry, and the
// `first_internal_key` in `IndexValue` if it's included.
bool persist_user_defined_timestamps_;
// Set after ::Finish is called
size_t index_size_ = 0;
};
// This index builder builds space-efficient index block.
//
// Optimizations:
// 1. Made block's `block_restart_interval` to be 1, which will avoid linear
// search when doing index lookup (can be disabled by setting
// index_block_restart_interval).
// 2. Shorten the key length for index block. Other than honestly using the
// last key in the data block as the index key, we instead find a shortest
// substitute key that serves the same function.
class ShortenedIndexBuilder : public IndexBuilder {
public:
ShortenedIndexBuilder(
const InternalKeyComparator* comparator,
const int index_block_restart_interval, const uint32_t format_version,
const bool use_value_delta_encoding,
BlockBasedTableOptions::IndexShorteningMode shortening_mode,
bool include_first_key, size_t ts_sz,
const bool persist_user_defined_timestamps)
: IndexBuilder(comparator, ts_sz, persist_user_defined_timestamps),
index_block_builder_(
index_block_restart_interval, true /*use_delta_encoding*/,
use_value_delta_encoding,
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
persist_user_defined_timestamps, false /* is_user_key */,
/*use_separated_kv_storage=*/false),
index_block_builder_without_seq_(
index_block_restart_interval, true /*use_delta_encoding*/,
use_value_delta_encoding,
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
persist_user_defined_timestamps, true /* is_user_key */,
/*use_separated_kv_storage=*/false),
use_value_delta_encoding_(use_value_delta_encoding),
include_first_key_(include_first_key),
shortening_mode_(shortening_mode) {
// Making the default true will disable the feature for old versions
must_use_separator_with_seq_.StoreRelaxed(format_version <= 2);
}
void OnKeyAdded(const Slice& key,
const std::optional<Slice>& /*value*/) override {
if (include_first_key_ && current_block_first_internal_key_.empty()) {
current_block_first_internal_key_.assign(key.data(), key.size());
}
}
Slice GetSeparatorWithSeq(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
std::string* separator_scratch) {
Slice separator_with_seq;
if (first_key_in_next_block != nullptr) {
if (shortening_mode_ !=
BlockBasedTableOptions::IndexShorteningMode::kNoShortening) {
separator_with_seq = FindShortestInternalKeySeparator(
*comparator_->user_comparator(), last_key_in_current_block,
*first_key_in_next_block, separator_scratch);
} else {
separator_with_seq = last_key_in_current_block;
}
if (!must_use_separator_with_seq_.LoadRelaxed() &&
ShouldUseKeyPlusSeqAsSeparator(last_key_in_current_block,
*first_key_in_next_block)) {
must_use_separator_with_seq_.StoreRelaxed(true);
}
} else {
if (shortening_mode_ == BlockBasedTableOptions::IndexShorteningMode::
kShortenSeparatorsAndSuccessor) {
separator_with_seq = FindShortInternalKeySuccessor(
*comparator_->user_comparator(), last_key_in_current_block,
separator_scratch);
} else {
separator_with_seq = last_key_in_current_block;
}
}
return separator_with_seq;
}
Slice GetFirstInternalKey(std::string* first_internal_key_buf) const {
if (!include_first_key_) {
return Slice();
}
assert(!current_block_first_internal_key_.empty());
// When UDT should not be persisted, the index block builders take care of
// stripping UDT from the key, for the first internal key contained in the
// IndexValue, we need to explicitly do the stripping here before passing
// it to the block builders.
Slice first_internal_key = current_block_first_internal_key_;
if (!current_block_first_internal_key_.empty() && ts_sz_ > 0 &&
!persist_user_defined_timestamps_) {
first_internal_key_buf->clear();
StripTimestampFromInternalKey(first_internal_key_buf,
current_block_first_internal_key_, ts_sz_);
first_internal_key = *first_internal_key_buf;
}
return first_internal_key;
}
void AddIndexEntryImpl(const Slice& separator_with_seq,
const Slice& first_internal_key,
const BlockHandle& block_handle,
bool must_use_separator_with_seq,
bool skip_delta_encoding) {
IndexValue entry(block_handle, first_internal_key);
std::string encoded_entry;
std::string delta_encoded_entry;
entry.EncodeTo(&encoded_entry, include_first_key_, nullptr);
if (use_value_delta_encoding_ && !last_encoded_handle_.IsNull() &&
!skip_delta_encoding) {
entry.EncodeTo(&delta_encoded_entry, include_first_key_,
&last_encoded_handle_);
} else {
// If it's the first block, or delta encoding is disabled,
// BlockBuilder::Add() below won't use delta-encoded slice.
}
last_encoded_handle_ = block_handle;
const Slice delta_encoded_entry_slice(delta_encoded_entry);
// TODO(yuzhangyu): fix this when "FindShortInternalKeySuccessor"
// optimization is available.
// Timestamp aware comparator currently doesn't provide override for
// "FindShortInternalKeySuccessor" optimization. So the actual
// last key in current block is used as the key for indexing the current
// block. As a result, when UDTs should not be persisted, it's safe to strip
// away the UDT from key in index block as data block does the same thing.
// What are the implications if a "FindShortInternalKeySuccessor"
// optimization is provided.
index_block_builder_.Add(separator_with_seq, encoded_entry,
&delta_encoded_entry_slice, skip_delta_encoding);
if (!must_use_separator_with_seq) {
index_block_builder_without_seq_.Add(
ExtractUserKey(separator_with_seq), encoded_entry,
&delta_encoded_entry_slice, skip_delta_encoding);
}
++num_index_entries_;
UpdateIndexSizeEstimate();
}
Slice AddIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle,
std::string* separator_scratch,
bool skip_delta_encoding) override {
Slice separator_with_seq = GetSeparatorWithSeq(
last_key_in_current_block, first_key_in_next_block, separator_scratch);
std::string first_internal_key_buf;
Slice first_internal_key = GetFirstInternalKey(&first_internal_key_buf);
AddIndexEntryImpl(separator_with_seq, first_internal_key, block_handle,
must_use_separator_with_seq_.LoadRelaxed(),
skip_delta_encoding);
current_block_first_internal_key_.clear();
return separator_with_seq;
}
struct ShortenedPreparedIndexEntry : public PreparedIndexEntry {
std::string separator_with_seq;
std::string first_internal_key;
bool must_use_separator_with_seq = false;
void SaveFrom(const Slice& from_separator,
const Slice& from_first_internal_key,
bool from_must_use_separator_with_seq) {
assert(from_separator.size() >= kNumInternalBytes);
if (from_separator.data() == separator_with_seq.data()) {
// No need to copy
assert(from_separator.size() == separator_with_seq.size());
} else {
// Copy the separator
separator_with_seq.assign(from_separator.data(), from_separator.size());
}
// first_internal_key is optional, so it may be empty.
assert(from_first_internal_key.empty() ||
from_first_internal_key.size() >= kNumInternalBytes);
if (from_first_internal_key.data() == first_internal_key.data()) {
// No need to copy
assert(from_first_internal_key.size() == first_internal_key.size());
} else {
// Copy the first internal key
first_internal_key.assign(from_first_internal_key.data(),
from_first_internal_key.size());
}
must_use_separator_with_seq = from_must_use_separator_with_seq;
}
};
std::unique_ptr<PreparedIndexEntry> CreatePreparedIndexEntry() override {
return std::make_unique<ShortenedPreparedIndexEntry>();
}
void PrepareIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
PreparedIndexEntry* out) override {
ShortenedPreparedIndexEntry* entry =
static_cast<ShortenedPreparedIndexEntry*>(out);
Slice separator =
GetSeparatorWithSeq(last_key_in_current_block, first_key_in_next_block,
&entry->separator_with_seq);
Slice first_internal_key = GetFirstInternalKey(&entry->first_internal_key);
entry->SaveFrom(separator, first_internal_key,
must_use_separator_with_seq_.LoadRelaxed());
current_block_first_internal_key_.clear();
}
void FinishIndexEntry(const BlockHandle& block_handle,
PreparedIndexEntry* base_entry,
bool skip_delta_encoding) override {
ShortenedPreparedIndexEntry* entry =
static_cast<ShortenedPreparedIndexEntry*>(base_entry);
AddIndexEntryImpl(entry->separator_with_seq, entry->first_internal_key,
block_handle, entry->must_use_separator_with_seq,
skip_delta_encoding);
}
using IndexBuilder::Finish;
Status Finish(IndexBlocks* index_blocks,
const BlockHandle& /*last_partition_block_handle*/) override {
if (must_use_separator_with_seq_.LoadRelaxed()) {
index_blocks->index_block_contents = index_block_builder_.Finish();
} else {
index_blocks->index_block_contents =
index_block_builder_without_seq_.Finish();
}
index_size_ = index_blocks->index_block_contents.size();
return Status::OK();
}
size_t IndexSize() const override { return index_size_; }
uint64_t CurrentIndexSizeEstimate() const override {
return estimated_index_size_.LoadRelaxed();
}
// Updates the cached size estimate to minimize CPU usage in hot path
void UpdateIndexSizeEstimate() override;
bool separator_is_key_plus_seq() override {
return must_use_separator_with_seq_.LoadRelaxed();
}
// Changes *key to a short string >= *key.
//
static Slice FindShortestInternalKeySeparator(const Comparator& comparator,
const Slice& start,
const Slice& limit,
std::string* scratch);
static Slice FindShortInternalKeySuccessor(const Comparator& comparator,
const Slice& key,
std::string* scratch);
friend class PartitionedIndexBuilder;
private:
BlockBuilder index_block_builder_;
// TODO: consider optimizing to only one builder. When discovering that
// sequence numbers are needed, read existing entries without seq and rewrite
// them with seq (which should be trivial to populate since seq wasn't needed
// before).
BlockBuilder index_block_builder_without_seq_;
const bool use_value_delta_encoding_;
RelaxedAtomic<bool> must_use_separator_with_seq_;
const bool include_first_key_;
BlockBasedTableOptions::IndexShorteningMode shortening_mode_;
BlockHandle last_encoded_handle_ = BlockHandle::NullBlockHandle();
std::string current_block_first_internal_key_;
uint64_t num_index_entries_ = 0;
// Cache for index size estimate to avoid recalculating in hot path
RelaxedAtomic<uint64_t> estimated_index_size_{0};
};
// HashIndexBuilder contains a binary-searchable primary index and the
// metadata for secondary hash index construction.
// The metadata for hash index consists two parts:
// - a metablock that compactly contains a sequence of prefixes. All prefixes
// are stored consectively without any metadata (like, prefix sizes) being
// stored, which is kept in the other metablock.
// - a metablock contains the metadata of the prefixes, including prefix size,
// restart index and number of block it spans. The format looks like:
//
// +-----------------+---------------------------+---------------------+
// <=prefix 1
// | length: 4 bytes | restart interval: 4 bytes | num-blocks: 4 bytes |
// +-----------------+---------------------------+---------------------+
// <=prefix 2
// | length: 4 bytes | restart interval: 4 bytes | num-blocks: 4 bytes |
// +-----------------+---------------------------+---------------------+
// | |
// | .... |
// | |
// +-----------------+---------------------------+---------------------+
// <=prefix n
// | length: 4 bytes | restart interval: 4 bytes | num-blocks: 4 bytes |
// +-----------------+---------------------------+---------------------+
//
// The reason of separating these two metablocks is to enable the efficiently
// reuse the first metablock during hash index construction without unnecessary
// data copy or small heap allocations for prefixes.
class HashIndexBuilder : public IndexBuilder {
public:
HashIndexBuilder(const InternalKeyComparator* comparator,
const SliceTransform* hash_key_extractor,
int index_block_restart_interval, int format_version,
bool use_value_delta_encoding,
BlockBasedTableOptions::IndexShorteningMode shortening_mode,
size_t ts_sz, const bool persist_user_defined_timestamps)
: IndexBuilder(comparator, ts_sz, persist_user_defined_timestamps),
primary_index_builder_(comparator, index_block_restart_interval,
format_version, use_value_delta_encoding,
shortening_mode, /* include_first_key */ false,
ts_sz, persist_user_defined_timestamps),
hash_key_extractor_(hash_key_extractor) {}
Slice AddIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle,
std::string* separator_scratch,
bool skip_delta_encoding) override {
++current_restart_index_;
return primary_index_builder_.AddIndexEntry(
last_key_in_current_block, first_key_in_next_block, block_handle,
separator_scratch, skip_delta_encoding);
}
std::unique_ptr<PreparedIndexEntry> CreatePreparedIndexEntry() override {
return primary_index_builder_.CreatePreparedIndexEntry();
}
void PrepareIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
PreparedIndexEntry* out) override {
++current_restart_index_;
primary_index_builder_.PrepareIndexEntry(last_key_in_current_block,
first_key_in_next_block, out);
}
void FinishIndexEntry(const BlockHandle& block_handle,
PreparedIndexEntry* entry,
bool skip_delta_encoding) override {
primary_index_builder_.FinishIndexEntry(block_handle, entry,
skip_delta_encoding);
}
void OnKeyAdded(const Slice& key,
const std::optional<Slice>& /*value*/) override {
auto key_prefix = hash_key_extractor_->Transform(key);
bool is_first_entry = pending_block_num_ == 0;
// Keys may share the prefix
if (is_first_entry || pending_entry_prefix_ != key_prefix) {
if (!is_first_entry) {
FlushPendingPrefix();
}
// need a hard copy otherwise the underlying data changes all the time.
// TODO(kailiu) std::to_string() is expensive. We may speed up can avoid
// data copy.
pending_entry_prefix_ = key_prefix.ToString();
pending_block_num_ = 1;
pending_entry_index_ = static_cast<uint32_t>(current_restart_index_);
} else {
// entry number increments when keys share the prefix reside in
// different data blocks.
auto last_restart_index = pending_entry_index_ + pending_block_num_ - 1;
assert(last_restart_index <= current_restart_index_);
if (last_restart_index != current_restart_index_) {
++pending_block_num_;
}
}
}
Status Finish(IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) override {
if (pending_block_num_ != 0) {
FlushPendingPrefix();
}
Status s = primary_index_builder_.Finish(index_blocks,
last_partition_block_handle);
index_blocks->meta_blocks.insert(
{kHashIndexPrefixesBlock.c_str(), {BlockType::kIndex, prefix_block_}});
index_blocks->meta_blocks.insert({kHashIndexPrefixesMetadataBlock.c_str(),
{BlockType::kIndex, prefix_meta_block_}});
return s;
}
size_t IndexSize() const override {
return primary_index_builder_.IndexSize() + prefix_block_.size() +
prefix_meta_block_.size();
}
uint64_t CurrentIndexSizeEstimate() const override { return 0; }
bool separator_is_key_plus_seq() override {
return primary_index_builder_.separator_is_key_plus_seq();
}
private:
void FlushPendingPrefix() {
prefix_block_.append(pending_entry_prefix_.data(),
pending_entry_prefix_.size());
PutVarint32(&prefix_meta_block_,
static_cast<uint32_t>(pending_entry_prefix_.size()),
pending_entry_index_, pending_block_num_);
}
ShortenedIndexBuilder primary_index_builder_;
const SliceTransform* hash_key_extractor_;
// stores a sequence of prefixes
std::string prefix_block_;
// stores the metadata of prefixes
std::string prefix_meta_block_;
// The following 3 variables keeps unflushed prefix and its metadata.
// The details of block_num and entry_index can be found in
// "block_hash_index.{h,cc}"
uint32_t pending_block_num_ = 0;
uint32_t pending_entry_index_ = 0;
std::string pending_entry_prefix_;
uint64_t current_restart_index_ = 0;
};
/**
* IndexBuilder for two-level indexing. Internally it creates a new index for
* each partition and Finish then in order when Finish is called on it
* continiously until Status::OK() is returned.
*
* The format on the disk would be I I I I I I IP where I is block containing a
* partition of indexes built using ShortenedIndexBuilder and IP is a block
* containing a secondary index on the partitions, built using
* ShortenedIndexBuilder.
*/
class PartitionedIndexBuilder : public IndexBuilder {
public:
static PartitionedIndexBuilder* CreateIndexBuilder(
const InternalKeyComparator* comparator, bool use_value_delta_encoding,
const BlockBasedTableOptions& table_opt, size_t ts_sz,
bool persist_user_defined_timestamps);
PartitionedIndexBuilder(const InternalKeyComparator* comparator,
const BlockBasedTableOptions& table_opt,
bool use_value_delta_encoding, size_t ts_sz,
bool persist_user_defined_timestamps);
Slice AddIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
const BlockHandle& block_handle,
std::string* separator_scratch,
bool skip_delta_encoding) override;
std::unique_ptr<PreparedIndexEntry> CreatePreparedIndexEntry() override;
void PrepareIndexEntry(const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block,
PreparedIndexEntry* out) override;
void FinishIndexEntry(const BlockHandle& block_handle,
PreparedIndexEntry* entry,
bool skip_delta_encoding) override;
void MaybeFlush(const Slice& index_key, const BlockHandle& index_value);
Status Finish(IndexBlocks* index_blocks,
const BlockHandle& last_partition_block_handle) override;
size_t IndexSize() const override { return index_size_; }
size_t TopLevelIndexSize(uint64_t) const { return top_level_index_size_; }
size_t NumPartitions() const;
// Returns a cached estimate of the current index size. This
// estimate is updated when data blocks are added.
uint64_t CurrentIndexSizeEstimate() const override {
return estimated_index_size_.LoadRelaxed();
}
inline bool ShouldCutFilterBlock() {
// Current policy is to align the partitions of index and filters
if (cut_filter_block) {
cut_filter_block = false;
return true;
}
return false;
}
const std::string& GetPartitionKey() {
static const std::string kEmptyKey;
return entries_.empty() ? kEmptyKey : entries_.back().key;
}
// Called when an external entity (such as filter partition builder) request
// cutting the next partition
void RequestPartitionCut();
// This function must be thread safe because multiple worker threads might
// update the index builder state during parallel compression.
bool separator_is_key_plus_seq() override {
return must_use_separator_with_seq_.LoadRelaxed();
}
bool get_use_value_delta_encoding() const {
return use_value_delta_encoding_;
}
private:
// Set after ::Finish is called
size_t top_level_index_size_ = 0;
// Set after ::Finish is called
size_t partition_cnt_ = 0;
void MakeNewSubIndexBuilder();
void UpdateIndexSizeEstimate() override;
struct Entry {
std::string key;
std::unique_ptr<ShortenedIndexBuilder> value;
};
// List of partitioned indexes and their keys. Note that when
// sub_index_builder_ is not null (during construction), there
// will be a placeholder entry at the back of this list tracking
// the possible key for that next entry.
std::list<Entry> entries_;
BlockBuilder index_block_builder_; // top-level index builder
BlockBuilder index_block_builder_without_seq_; // same for user keys
// the active partition index builder (owned by an Entry in entries_)
ShortenedIndexBuilder* sub_index_builder_;
// the last key in the active partition index builder
std::unique_ptr<RetargetableFlushBlockPolicy> flush_policy_;
// true if Finish is called once but not complete yet.
bool finishing_indexes_ = false;
const BlockBasedTableOptions& table_opt_;
RelaxedAtomic<bool> must_use_separator_with_seq_;
bool use_value_delta_encoding_;
// true if an external entity (such as filter partition builder) request
// cutting the next partition
bool partition_cut_requested_ = true;
// true if it should cut the next filter partition block
bool cut_filter_block = false;
BlockHandle last_encoded_handle_;
// Cached estimate of current index size, updated when data blocks are added
RelaxedAtomic<uint64_t> estimated_index_size_{0};
// Running estimate of completed partitions total size
RelaxedAtomic<uint64_t> estimated_completed_partitions_size_{0};
};
} // namespace ROCKSDB_NAMESPACE