#include "table/block_based/block_based_table_builder.h"
#include <atomic>
#include <cassert>
#include <cstdio>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include "block_cache.h"
#include "cache/cache_entry_roles.h"
#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "logging/logging.h"
#include "memory/memory_allocator_impl.h"
#include "options/options_helper.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "rocksdb/types.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/block_based/user_defined_index_wrapper.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_builder.h"
#include "util/bit_fields.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/defer.h"
#include "util/semaphore.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;
namespace {
constexpr size_t kBlockTrailerSize = BlockBasedTable::kBlockTrailerSize;
// Creates the filter block builder for a new table file: a partitioned
// filter builder when table_opt.partition_filters is set, a full filter
// builder otherwise, or nullptr when the policy produces no bits builder
// for this context.
FilterBlockBuilder* CreateFilterBlockBuilder(
    const ImmutableCFOptions& /*ioptions*/, const MutableCFOptions& mopt,
    const FilterBuildingContext& context,
    const bool use_delta_encoding_for_index_values,
    PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
    const bool persist_user_defined_timestamps) {
  const BlockBasedTableOptions& table_opt = context.table_options;
  assert(table_opt.filter_policy);
  FilterBitsBuilder* bits_builder =
      BloomFilterPolicy::GetBuilderFromContext(context);
  if (bits_builder == nullptr) {
    // The policy declined to build filters in this context.
    return nullptr;
  }
  if (!table_opt.partition_filters) {
    return new FullFilterBlockBuilder(
        mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
        bits_builder);
  }
  assert(p_index_builder != nullptr);
  assert(table_opt.block_size_deviation <= 100);
  // Scale the partition target down by block_size_deviation percent,
  // rounding up, and never allow a zero-sized target.
  uint32_t partition_size = static_cast<uint32_t>(
      ((table_opt.metadata_block_size *
        (100 - table_opt.block_size_deviation)) +
       99) /
      100);
  partition_size = std::max(partition_size, uint32_t{1});
  return new PartitionedFilterBlockBuilder(
      mopt.prefix_extractor.get(), table_opt.whole_key_filtering, bits_builder,
      table_opt.index_block_restart_interval,
      use_delta_encoding_for_index_values, p_index_builder, partition_size,
      ts_sz, persist_user_defined_timestamps,
      table_opt.decouple_partitioned_filters);
}
// Returns a compressor specialized for `block_type` when the underlying
// compressor offers one (the caller takes ownership of the released clone),
// otherwise returns `compressor` itself unchanged.
Compressor* MaybeCloneSpecialized(
    Compressor* compressor, CacheEntryRole block_type,
    Compressor::DictConfigArgs&& dict_config = Compressor::DictDisabled{}) {
  auto clone =
      compressor->MaybeCloneSpecialized(block_type, std::move(dict_config));
  return clone ? clone.release() : compressor;
}
}
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// Records block-based-table configuration (index type, filter settings)
// as user-collected table properties. It only emits fixed configuration in
// Finish(); the per-key and per-block callbacks intentionally do nothing.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
    : public InternalTblPropColl {
 public:
  explicit BlockBasedTablePropertiesCollector(
      BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
      bool prefix_filtering, bool decoupled_partitioned_filters)
      : index_type_(index_type),
        whole_key_filtering_(whole_key_filtering),
        prefix_filtering_(prefix_filtering),
        decoupled_partitioned_filters_(decoupled_partitioned_filters) {}

  // No per-entry statistics are gathered; always succeeds.
  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                     uint64_t /*file_size*/) override {
    return Status::OK();
  }

  // No per-block statistics are gathered.
  void BlockAdd(uint64_t /*block_uncomp_bytes*/,
                uint64_t /*block_compressed_bytes_fast*/,
                uint64_t /*block_compressed_bytes_slow*/) override {}

  // Writes the recorded configuration into `properties`.
  Status Finish(UserCollectedProperties* properties) override {
    std::string encoded_index_type;
    PutFixed32(&encoded_index_type, static_cast<uint32_t>(index_type_));
    properties->emplace(BlockBasedTablePropertyNames::kIndexType,
                        encoded_index_type);
    properties->emplace(BlockBasedTablePropertyNames::kWholeKeyFiltering,
                        whole_key_filtering_ ? kPropTrue : kPropFalse);
    properties->emplace(BlockBasedTablePropertyNames::kPrefixFiltering,
                        prefix_filtering_ ? kPropTrue : kPropFalse);
    // Written only when enabled; absence implies coupled partitioned
    // filters (the legacy behavior).
    if (decoupled_partitioned_filters_) {
      properties->emplace(
          BlockBasedTablePropertyNames::kDecoupledPartitionedFilters,
          kPropTrue);
    }
    return Status::OK();
  }

  const char* Name() const override {
    return "BlockBasedTablePropertiesCollector";
  }

  // These properties are encoded for internal consumption; nothing
  // human-readable to report.
  UserCollectedProperties GetReadableProperties() const override {
    return UserCollectedProperties();
  }

 private:
  BlockBasedTableOptions::IndexType index_type_;
  bool whole_key_filtering_;
  bool prefix_filtering_;
  bool decoupled_partitioned_filters_;
};
// Pairs a compression working area with the matching verification
// (decompression) working area so the two are managed together, e.g. one
// pair per compression thread or per block role.
struct BlockBasedTableBuilder::WorkingAreaPair {
  Compressor::ManagedWorkingArea compress;
  Decompressor::ManagedWorkingArea verify;
};
// Coordination state for parallel block compression. A single "emitter"
// thread deposits uncompressed blocks into a small ring buffer while worker
// threads compress slots and write them to the file strictly in ring order.
// All cross-thread bookkeeping lives in one 64-bit atomic word
// (atomic_state) updated by CAS loops in StateTransition(); the two
// semaphores exist only to park and wake idle threads.
struct BlockBasedTableBuilder::ParallelCompressionRep {
  // Role of the thread driving a state transition.
  enum class ThreadKind {
    kEmitter,
    kWorker,
  };
  // The action a thread has just completed / is assigned next.
  enum class ThreadState {
    kEmitting,               // emitter: fill ring slot `slot` with a block
    kIdle,                   // park on a semaphore until woken
    kCompressing,            // compress ring slot `slot`
    kEnd,                    // leave the processing loop
    kCompressingAndWriting,  // worker: compress slot `slot`, then write it
    kWriting,                // worker: write out an already-compressed slot
  };
  // One ring-buffer slot; cache-line aligned to avoid false sharing between
  // threads operating on adjacent slots.
  struct ALIGN_AS(CACHE_LINE_SIZE) BlockRep {
    std::string uncompressed;
    GrowableBuffer compressed;
    CompressionType compression_type = kNoCompression;
    std::unique_ptr<IndexBuilder::PreparedIndexEntry> prepared_index_entry;
  };
  // Ring of ring_buffer_mask + 1 slots, indexed by the low ring_buffer_nbits
  // bits of the 8-bit sequence counters below.
  std::unique_ptr<BlockRep[]> ring_buffer;
  const int ring_buffer_nbits;
  const uint32_t ring_buffer_mask;
  const uint32_t num_worker_threads;
  // Estimated bytes of emitted-but-not-yet-written blocks (maintained by the
  // builder; asserted zero at clean shutdown below).
  RelaxedAtomic<uint64_t> estimated_inflight_size{0};
  std::vector<port::Thread> worker_threads;
  // Working areas handed out to the threads (see WorkingAreaPair).
  std::vector<WorkingAreaPair> working_areas;
  // Parking lots. A Release() is only ever issued by the thread that clears
  // the corresponding idle count/flag in atomic_state, keeping semaphore
  // tokens consistent with the packed state.
  CountingSemaphore idle_worker_sem{0};
  BinarySemaphore idle_emit_sem{0};
  // Packed shared state word; see field layout in the using-declarations
  // below (NeedsWriter in the low 32 bits up through NextToEmit at the top;
  // static_assert confirms the fields fill exactly 64 bits).
  struct State : public BitFields<uint64_t, State> {};
  ALIGN_AS(CACHE_LINE_SIZE) BitFieldsAtomic<State> atomic_state;
  // Bit per ring slot: compressed and waiting for the in-order writer.
  using NeedsWriter = UnsignedBitField<State, 32, NoPrevBitField>;
  using IdleWorkerCount = UnsignedBitField<State, 5, NeedsWriter>;
  using IdleEmitFlag = BoolBitField<State, IdleWorkerCount>;
  using NoMoreToEmitFlag = BoolBitField<State, IdleEmitFlag>;
  using AbortFlag = BoolBitField<State, NoMoreToEmitFlag>;
  // Free-running 8-bit sequence counters (write <= compress <= emit);
  // differences are taken as uint8_t so wrap-around is harmless.
  using NextToWrite = UnsignedBitField<State, 8, AbortFlag>;
  using NextToCompress = UnsignedBitField<State, 8, NextToWrite>;
  using NextToEmit = UnsignedBitField<State, 8, NextToCompress>;
  static_assert(NextToEmit::kEndBit == 64);
  // Emitter-private fields on their own cache line.
  ALIGN_AS(CACHE_LINE_SIZE)
  ThreadState emit_thread_state = ThreadState::kEmitting;
  uint32_t emit_slot = 0;
  // Wake-up throttle: an idle worker is woken only when
  // emit_counter_toward_wake_up reaches emit_counter_for_wake_up, and the
  // threshold grows with each wake (capped at num_worker_threads +
  // kMaxWakeupInterval) so workers are not repeatedly woken for tiny
  // backlogs.
  int32_t emit_counter_toward_wake_up = 0;
  int32_t emit_counter_for_wake_up = 0;
  static constexpr int32_t kMaxWakeupInterval = 8;
  // Debug-only watchdog that aborts the process on apparent deadlock.
  // Disabled under TSAN (its relaxed-atomic polling would be reported).
#if !defined(NDEBUG) && !(defined(__GNUC__) && defined(__SANITIZE_THREAD__))
#define BBTB_PC_WATCHDOG 1
#endif
#ifdef BBTB_PC_WATCHDOG
  std::thread watchdog_thread;
  std::mutex watchdog_mutex;
  std::condition_variable watchdog_cv;
  bool shutdown_watchdog = false;
  RelaxedAtomic<uint32_t> live_workers{0};
  RelaxedAtomic<uint32_t> idling_workers{0};
  RelaxedAtomic<bool> live_emit{0};
  RelaxedAtomic<bool> idling_emit{0};
#endif
  // Ring size heuristic: more threads get a (log2) larger ring, with a
  // minimum of 4 slots; only called with parallel_threads > 1.
  int ComputeRingBufferNbits(uint32_t parallel_threads) {
    if (parallel_threads >= 9) {
      return 5;
    } else if (parallel_threads >= 5) {
      return 4;
    } else if (parallel_threads >= 3) {
      return 3;
    } else {
      assert(parallel_threads > 1);
      return 2;
    }
  }
  explicit ParallelCompressionRep(uint32_t parallel_threads)
      : ring_buffer_nbits(ComputeRingBufferNbits(parallel_threads)),
        ring_buffer_mask((uint32_t{1} << ring_buffer_nbits) - 1),
        // No point in more workers than ring slots minus one.
        num_worker_threads(std::min(parallel_threads, ring_buffer_mask)) {
    assert(num_worker_threads <= IdleWorkerCount::kMask);
    ring_buffer = std::make_unique<BlockRep[]>(ring_buffer_mask + 1);
    // Start negative so the first wake-ups happen eagerly while ramping up.
    emit_counter_for_wake_up = -static_cast<int32_t>(num_worker_threads);
  }
  ~ParallelCompressionRep() {
#ifndef NDEBUG
    auto state = atomic_state.Load();
    if (state.Get<AbortFlag>() == false) {
      // Clean shutdown: everything emitted was compressed and written.
      assert(state.Get<NeedsWriter>() == 0);
      assert(state.Get<NextToWrite>() == state.Get<NextToCompress>());
      assert(state.Get<NextToCompress>() == state.Get<NextToEmit>());
      assert(estimated_inflight_size.LoadRelaxed() == 0);
    }
    // No thread may still be parked, and no stray semaphore tokens remain.
    assert(state.Get<IdleWorkerCount>() == 0);
    assert(state.Get<IdleEmitFlag>() == false);
    assert(!idle_emit_sem.TryAcquire());
    assert(!idle_worker_sem.TryAcquire());
#endif
  }
  // Core scheduler, shared by emitter and workers. Records completion of the
  // thread's previous action (thread_state / slot), then atomically picks
  // its next action via a CAS loop on atomic_state, parking the thread when
  // there is nothing to do. On return, thread_state/slot describe the next
  // action (kEnd means exit the loop).
  template <ThreadKind thread_kind>
  void StateTransition(ThreadState& thread_state, uint32_t& slot) {
    assert(slot <= ring_buffer_mask);
    State seen_state = atomic_state.Load();
    for (;;) {
      if (seen_state.Get<AbortFlag>()) {
        thread_state = ThreadState::kEnd;
        return;
      }
      // Sequence counters advance write -> compress -> emit and never drift
      // more than one full ring apart.
      assert(static_cast<uint8_t>(seen_state.Get<NextToEmit>() -
                                  seen_state.Get<NextToCompress>()) <=
             ring_buffer_mask + 1);
      assert(static_cast<uint8_t>(seen_state.Get<NextToCompress>() -
                                  seen_state.Get<NextToWrite>()) <=
             ring_buffer_mask + 1);
      assert(static_cast<uint8_t>(seen_state.Get<NextToEmit>() -
                                  seen_state.Get<NextToWrite>()) <=
             ring_buffer_mask + 1);
      State next_state = seen_state;
      bool wake_idle = false;
      // Step 1: fold the just-completed action into next_state.
      switch (thread_state) {
        case ThreadState::kEmitting: {
          assert(thread_kind == ThreadKind::kEmitter);
          assert(slot == (next_state.Get<NextToEmit>() & ring_buffer_mask));
          next_state.Ref<NextToEmit>() += 1;
          // Consider waking an idle worker when the uncompressed backlog is
          // large relative to ring size and active workers, subject to the
          // ramping wake-up throttle (see counters above).
          if (next_state.Get<IdleWorkerCount>() > 0 &&
              static_cast<uint8_t>(next_state.Get<NextToEmit>() -
                                   next_state.Get<NextToCompress>()) >=
                  (ring_buffer_mask + 1) / 4 +
                      (num_worker_threads -
                       next_state.Get<IdleWorkerCount>())) {
            if (emit_counter_toward_wake_up >= emit_counter_for_wake_up) {
              wake_idle = true;
              next_state.Ref<IdleWorkerCount>() -= 1;
              emit_counter_toward_wake_up = 0;
              emit_counter_for_wake_up =
                  std::min(emit_counter_for_wake_up + 1,
                           static_cast<int32_t>(num_worker_threads +
                                                kMaxWakeupInterval));
            } else {
              emit_counter_toward_wake_up++;
            }
          }
          break;
        }
        case ThreadState::kIdle:
          // Nothing to record; we were just woken.
          break;
        case ThreadState::kCompressing:
          // Mark the slot as ready for the in-order writer.
          next_state.Ref<NeedsWriter>() |= uint32_t{1} << slot;
          if constexpr (thread_kind == ThreadKind::kEmitter) {
            // The emitter compressed a block itself; if every worker is
            // idle, wake one so the compressed block gets written.
            if (next_state.Get<IdleWorkerCount>() == num_worker_threads) {
              wake_idle = true;
              next_state.Ref<IdleWorkerCount>() -= 1;
            }
          }
          break;
        case ThreadState::kEnd:
          assert(thread_state != ThreadState::kEnd);
          return;
        case ThreadState::kCompressingAndWriting:
        case ThreadState::kWriting:
          // Finished writing the slot at the front of the write queue.
          assert(thread_kind == ThreadKind::kWorker);
          assert((next_state.Get<NextToWrite>() & ring_buffer_mask) == slot);
          assert(next_state.Get<NextToCompress>() !=
                 next_state.Get<NextToWrite>());
          assert(next_state.Get<NextToEmit>() != next_state.Get<NextToWrite>());
          assert((next_state.Get<NeedsWriter>() & (uint32_t{1} << slot)) == 0);
          next_state.Ref<NextToWrite>() += 1;
          // Writing freed a ring slot; wake the emitter if it was parked.
          if (next_state.Get<IdleEmitFlag>()) {
            wake_idle = true;
            next_state.Set<IdleEmitFlag>(false);
          }
          break;
      }
      // Step 2: choose the next action for this thread.
      ThreadState next_thread_state = ThreadState::kEnd;
      uint32_t next_slot = 0;
      if constexpr (thread_kind == ThreadKind::kEmitter) {
        // Emitter's first preference: emit, if a ring slot is free.
        if (static_cast<uint8_t>(next_state.Get<NextToEmit>() -
                                 next_state.Get<NextToWrite>()) <=
            ring_buffer_mask) {
          next_thread_state = ThreadState::kEmitting;
          next_slot = next_state.Get<NextToEmit>() & ring_buffer_mask;
        }
      }
      if constexpr (thread_kind == ThreadKind::kWorker) {
        // Worker's first preference: write the next in-order slot, if it is
        // already compressed (claim it by clearing its NeedsWriter bit).
        uint32_t next_to_write_slot =
            next_state.Get<NextToWrite>() & ring_buffer_mask;
        uint32_t needs_writer_bit = uint32_t{1} << next_to_write_slot;
        if (next_state.Get<NeedsWriter>() & needs_writer_bit) {
          next_state.Ref<NeedsWriter>() &= ~needs_writer_bit;
          next_thread_state = ThreadState::kWriting;
          next_slot = next_to_write_slot;
        }
      }
      if (next_thread_state == ThreadState::kEnd) {
        // Second preference: compress the next emitted-but-uncompressed
        // slot; a worker that takes the slot at the write front will also
        // write it immediately after compressing.
        if (next_state.Get<NextToCompress>() != next_state.Get<NextToEmit>()) {
          if (thread_kind == ThreadKind::kWorker &&
              next_state.Get<NextToCompress>() ==
                  next_state.Get<NextToWrite>()) {
            next_thread_state = ThreadState::kCompressingAndWriting;
          } else {
            next_thread_state = ThreadState::kCompressing;
          }
          next_slot = next_state.Get<NextToCompress>() & ring_buffer_mask;
          next_state.Ref<NextToCompress>() += 1;
        } else if constexpr (thread_kind == ThreadKind::kEmitter) {
          // Ring full and nothing to compress: emitter parks.
          next_thread_state = ThreadState::kIdle;
          assert(next_state.Get<IdleEmitFlag>() == false);
          assert(next_state.Get<NoMoreToEmitFlag>() == false);
          next_state.Set<IdleEmitFlag>(true);
        } else if (next_state.Get<NoMoreToEmitFlag>()) {
          // No pending work and no more input coming: worker exits.
          next_thread_state = ThreadState::kEnd;
        } else {
          // Nothing to do right now: worker parks.
          next_thread_state = ThreadState::kIdle;
          assert(next_state.Get<IdleWorkerCount>() < IdleWorkerCount::kMask);
          next_state.Ref<IdleWorkerCount>() += 1;
        }
      }
      assert(thread_state != ThreadState::kEnd);
      // Step 3: publish. On CAS failure seen_state was refreshed; retry.
      if (atomic_state.CasWeak(seen_state, next_state)) {
        thread_state = next_thread_state;
        slot = next_slot;
        seen_state = next_state;
        if (wake_idle) {
          if constexpr (thread_kind == ThreadKind::kEmitter) {
            idle_worker_sem.Release();
          } else {
            idle_emit_sem.Release();
          }
        }
        if (thread_state != ThreadState::kIdle) {
          return;
        }
        // We committed to idling; park on our semaphore and then re-run the
        // loop against fresh state.
        if constexpr (thread_kind == ThreadKind::kEmitter) {
#ifdef BBTB_PC_WATCHDOG
          idling_emit.StoreRelaxed(true);
          Defer decr{[this]() { idling_emit.StoreRelaxed(false); }};
#endif
          idle_emit_sem.Acquire();
        } else {
#ifdef BBTB_PC_WATCHDOG
          idling_workers.FetchAddRelaxed(1);
          Defer decr{[this]() { idling_workers.FetchSubRelaxed(1); }};
#endif
          idle_worker_sem.Acquire();
        }
        seen_state = atomic_state.Load();
      }
    }
  }
  // Convenience wrappers over the templated StateTransition().
  void EmitterStateTransition(ThreadState& thread_state, uint32_t& slot) {
    StateTransition<ThreadKind::kEmitter>(thread_state, slot);
  }
  void WorkerStateTransition(ThreadState& thread_state, uint32_t& slot) {
    StateTransition<ThreadKind::kWorker>(thread_state, slot);
  }
  // Atomically clears all idle markers and releases exactly as many
  // semaphore tokens as there were parked threads. Used on end-of-input and
  // abort so every thread re-examines the state.
  void WakeAllIdle() {
    State old_state, new_state;
    auto transform =
        IdleEmitFlag::ClearTransform() + IdleWorkerCount::ClearTransform();
    atomic_state.Apply(transform, &old_state, &new_state);
    assert(new_state.Get<IdleEmitFlag>() == false);
    assert(new_state.Get<IdleWorkerCount>() == 0);
    if (old_state.Get<IdleEmitFlag>()) {
      idle_emit_sem.Release();
    }
    idle_worker_sem.Release(old_state.Get<IdleWorkerCount>());
  }
  // Called by the emitter when input is exhausted: publishes
  // NoMoreToEmitFlag, ends the emitter's loop, and wakes everyone so idle
  // workers can observe the flag and drain remaining work.
  void SetNoMoreToEmit(ThreadState& thread_state, uint32_t& slot) {
    (void)slot;
    State old_state;
    atomic_state.Apply(NoMoreToEmitFlag::SetTransform(), &old_state);
    assert(old_state.Get<NoMoreToEmitFlag>() == false);
    assert(slot == BitwiseAnd(old_state.Get<NextToEmit>(), ring_buffer_mask));
    assert(thread_state == ThreadState::kEmitting);
    thread_state = ThreadState::kEnd;
    WakeAllIdle();
  }
  // Signals abort (e.g. on I/O error). Only the first caller wakes the idle
  // threads; all threads then observe AbortFlag and exit.
  void SetAbort(ThreadState& thread_state) {
    State old_state;
    atomic_state.Apply(AbortFlag::SetTransform(), &old_state);
    if (old_state.Get<AbortFlag>() == false) {
      WakeAllIdle();
    }
    thread_state = ThreadState::kEnd;
  }
#ifdef BBTB_PC_WATCHDOG
  // Debug-only watchdog loop: polls roughly once per second; if every live
  // thread has been idle for ~70 consecutive polls, prints diagnostics and
  // terminates the process (apparent deadlock). Exits once all workers and
  // the emitter have finished.
  void BGWatchdog() {
    int count_toward_deadlock_judgment = 0;
    for (;;) {
      if (live_workers.LoadRelaxed() == 0 && live_emit.LoadRelaxed() == false) {
        return;
      }
      if (idling_workers.LoadRelaxed() < live_workers.LoadRelaxed() ||
          (live_emit.LoadRelaxed() && !idling_emit.LoadRelaxed())) {
        // Someone is making progress; reset the countdown.
        count_toward_deadlock_judgment = 0;
      } else {
        count_toward_deadlock_judgment++;
        if (count_toward_deadlock_judgment >= 70) {
          fprintf(stderr,
                  "Error: apparent deadlock in parallel compression. "
                  "Aborting. %u / %u, %d / %d, %llx\n",
                  (unsigned)idling_workers.LoadRelaxed(),
                  (unsigned)live_workers.LoadRelaxed(),
                  (int)idling_emit.LoadRelaxed(), (int)live_emit.LoadRelaxed(),
                  (long long)atomic_state.Load().underlying);
          std::terminate();
        }
      }
      std::unique_lock<std::mutex> lock(watchdog_mutex);
      if (!shutdown_watchdog) {
        watchdog_cv.wait_for(lock, std::chrono::seconds{1});
      }
    }
  }
#endif
};
struct BlockBasedTableBuilder::Rep {
const ImmutableOptions ioptions;
std::shared_ptr<const SliceTransform> prefix_extractor;
const WriteOptions write_options;
const BlockBasedTableOptions table_options;
const InternalKeyComparator& internal_comparator;
size_t ts_sz;
bool persist_user_defined_timestamps;
WritableFileWriter* file;
RelaxedAtomic<uint64_t> offset{0};
size_t alignment;
BlockBuilder data_block;
std::vector<std::string> data_block_buffers;
BlockBuilder range_del_block;
InternalKeySliceTransform internal_prefix_transform;
std::unique_ptr<IndexBuilder> index_builder;
std::string index_separator_scratch;
PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_ikey; bool warm_cache = false;
bool uses_explicit_compression_manager = false;
uint64_t sample_for_compression;
RelaxedAtomic<uint64_t> compressible_input_data_bytes{0};
RelaxedAtomic<uint64_t> uncompressible_input_data_bytes{0};
RelaxedAtomic<uint64_t> sampled_input_data_bytes{0};
RelaxedAtomic<uint64_t> sampled_output_slow_data_bytes{0};
RelaxedAtomic<uint64_t> sampled_output_fast_data_bytes{0};
uint32_t compression_parallel_threads;
int max_compressed_bytes_per_kb;
Compressor::DictConfig data_block_dict_guidance;
std::unique_ptr<Compressor> basic_compressor;
std::unique_ptr<Compressor> fast_sample_compressor;
std::unique_ptr<Compressor> slow_sample_compressor;
UnownedPtr<Compressor> data_block_compressor = nullptr;
UnownedPtr<Compressor> index_block_compressor = nullptr;
std::shared_ptr<Decompressor> basic_decompressor;
std::unique_ptr<Decompressor> verify_decompressor_with_dict;
UnownedPtr<Decompressor> verify_decompressor;
UnownedPtr<Decompressor> data_block_verify_decompressor;
SmallEnumSet<CompressionType, kDisableCompressionOption>
compression_types_used;
WorkingAreaPair index_block_working_area;
WorkingAreaPair data_block_working_area;
size_t data_begin_offset = 0;
TableProperties props;
enum class State {
kBuffered,
kUnbuffered,
kClosed,
};
State state = State::kUnbuffered;
uint64_t buffer_limit = 0;
std::shared_ptr<CacheReservationManager>
compression_dict_buffer_cache_res_mgr;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
OffsetableCacheKey base_cache_key;
const TableFileCreationReason reason;
const bool target_file_size_is_upper_bound;
BlockHandle pending_handle;
GrowableBuffer single_threaded_compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
std::vector<std::unique_ptr<InternalTblPropColl>> table_properties_collectors;
std::unique_ptr<ParallelCompressionRep> pc_rep;
RelaxedAtomic<uint64_t> worker_cpu_micros{0};
BlockCreateContext create_context;
uint64_t tail_size;
uint64_t pre_compression_size = 0;
uint32_t base_context_checksum;
uint64_t get_offset() { return offset.LoadRelaxed(); }
void set_offset(uint64_t o) { offset.StoreRelaxed(o); }
bool IsParallelCompressionActive() const { return pc_rep != nullptr; }
Status GetStatus() { return GetIOStatus(); }
bool StatusOk() {
bool ok = io_status_ok.LoadRelaxed();
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (ok) {
std::lock_guard<std::mutex> lock(io_status_mutex);
if (io_status_ok.LoadRelaxed()) {
io_status.PermitUncheckedError();
assert(io_status.ok());
} else {
ok = false;
}
}
#endif return ok;
}
IOStatus GetIOStatus() {
if (LIKELY(io_status_ok.LoadRelaxed())) {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
std::lock_guard<std::mutex> lock(io_status_mutex);
if (io_status_ok.LoadRelaxed()) {
io_status.PermitUncheckedError();
assert(io_status.ok());
} else {
return io_status;
}
#endif return IOStatus::OK();
} else {
std::lock_guard<std::mutex> lock(io_status_mutex);
return io_status;
}
}
void SetStatus(Status&& s) {
if (UNLIKELY(!s.ok()) && io_status_ok.LoadRelaxed()) {
SetFailedIOStatus(status_to_io_status(std::move(s)));
}
}
void SetStatus(const Status& s) {
if (UNLIKELY(!s.ok()) && io_status_ok.LoadRelaxed()) {
SetFailedIOStatus(status_to_io_status(Status(s)));
}
}
void SetIOStatus(IOStatus&& ios) {
if (UNLIKELY(!ios.ok()) && io_status_ok.LoadRelaxed()) {
SetFailedIOStatus(std::move(ios));
}
}
void SetIOStatus(const IOStatus& ios) {
if (UNLIKELY(!ios.ok()) && io_status_ok.LoadRelaxed()) {
SetFailedIOStatus(IOStatus(ios));
}
}
void SetFailedIOStatus(IOStatus&& ios) {
assert(!ios.ok());
std::lock_guard<std::mutex> lock(io_status_mutex);
if (io_status.ok()) {
io_status = std::move(ios);
io_status_ok.StoreRelaxed(false);
}
}
Rep(const BlockBasedTableOptions& table_opt, const TableBuilderOptions& tbo,
WritableFileWriter* f)
: ioptions(tbo.ioptions),
prefix_extractor(tbo.moptions.prefix_extractor),
write_options(tbo.write_options),
table_options(table_opt),
internal_comparator(tbo.internal_comparator),
ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
persist_user_defined_timestamps(
tbo.ioptions.persist_user_defined_timestamps),
file(f),
alignment(table_options.block_align
? std::min(static_cast<size_t>(table_options.block_size),
kDefaultPageSize)
: 0),
data_block(table_options.block_restart_interval,
table_options.use_delta_encoding,
false ,
tbo.internal_comparator.user_comparator()
->CanKeysWithDifferentByteContentsBeEqual()
? BlockBasedTableOptions::kDataBlockBinarySearch
: table_options.data_block_index_type,
table_options.data_block_hash_table_util_ratio, ts_sz,
persist_user_defined_timestamps, false ,
table_options.separate_key_value_in_data_block),
range_del_block(
1 , true ,
false ,
BlockBasedTableOptions::kDataBlockBinarySearch ,
0.75 , ts_sz,
persist_user_defined_timestamps, false ,
false ),
internal_prefix_transform(prefix_extractor.get()),
sample_for_compression(tbo.moptions.sample_for_compression),
compression_parallel_threads(
((table_opt.partition_filters &&
!table_opt.decouple_partitioned_filters) ||
table_options.user_defined_index_factory)
? uint32_t{1}
: tbo.compression_opts.parallel_threads),
max_compressed_bytes_per_kb(
tbo.compression_opts.max_compressed_bytes_per_kb),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
reason(tbo.reason),
target_file_size_is_upper_bound(
tbo.moptions.target_file_size_is_upper_bound),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
table_options, data_block)),
create_context(&table_options, &ioptions, ioptions.stats,
nullptr,
tbo.moptions.block_protection_bytes_per_key,
tbo.internal_comparator.user_comparator(),
!use_delta_encoding_for_index_values,
table_opt.index_type ==
BlockBasedTableOptions::kBinarySearchWithFirstKey,
table_options.block_restart_interval,
table_options.index_block_restart_interval),
tail_size(0) {
FilterBuildingContext filter_context(table_options);
filter_context.info_log = ioptions.logger;
filter_context.column_family_name = tbo.column_family_name;
filter_context.reason = reason;
if (reason != TableFileCreationReason::kMisc) {
filter_context.compaction_style = ioptions.compaction_style;
filter_context.num_levels = ioptions.num_levels;
filter_context.level_at_creation = tbo.level_at_creation;
filter_context.is_bottommost = tbo.is_bottommost;
assert(filter_context.level_at_creation < filter_context.num_levels);
}
props.compression_options =
CompressionOptionsToString(tbo.compression_opts);
auto* mgr = tbo.moptions.compression_manager.get();
if (mgr == nullptr) {
uses_explicit_compression_manager = false;
mgr = GetBuiltinV2CompressionManager().get();
} else {
uses_explicit_compression_manager = true;
props.compression_options.append("_compression_manager=");
props.compression_options.append(mgr->GetId());
props.compression_options.append("; ");
}
max_compressed_bytes_per_kb =
std::min(int{1023}, tbo.compression_opts.max_compressed_bytes_per_kb);
basic_compressor = mgr->GetCompressorForSST(
filter_context, tbo.compression_opts, tbo.compression_type);
if (basic_compressor) {
if (table_options.enable_index_compression) {
index_block_compressor = MaybeCloneSpecialized(
basic_compressor.get(), CacheEntryRole::kIndexBlock);
index_block_working_area.compress =
index_block_compressor->ObtainWorkingArea();
}
data_block_dict_guidance =
basic_compressor->GetDictGuidance(CacheEntryRole::kDataBlock);
if (auto* sampling =
std::get_if<Compressor::DictSampling>(&data_block_dict_guidance);
sampling != nullptr && sampling->max_sample_bytes > 0) {
state = State::kBuffered;
if (tbo.target_file_size == 0) {
buffer_limit = tbo.compression_opts.max_dict_buffer_bytes;
} else if (tbo.compression_opts.max_dict_buffer_bytes == 0) {
buffer_limit = tbo.target_file_size;
} else {
buffer_limit = std::min(tbo.target_file_size,
tbo.compression_opts.max_dict_buffer_bytes);
}
} else if (auto* predef = std::get_if<Compressor::DictPreDefined>(
&data_block_dict_guidance);
predef != nullptr && !predef->dict_data.empty()) {
data_block_compressor = MaybeCloneSpecialized(
basic_compressor.get(), CacheEntryRole::kDataBlock,
Compressor::DictPreDefined{std::string{predef->dict_data}});
data_block_working_area.compress =
data_block_compressor->ObtainWorkingArea();
} else {
assert(std::holds_alternative<Compressor::DictSampling>(
data_block_dict_guidance) ||
std::holds_alternative<Compressor::DictPreDefined>(
data_block_dict_guidance) ||
std::holds_alternative<Compressor::DictDisabled>(
data_block_dict_guidance));
data_block_compressor = MaybeCloneSpecialized(
basic_compressor.get(), CacheEntryRole::kDataBlock);
data_block_working_area.compress =
data_block_compressor->ObtainWorkingArea();
}
basic_decompressor = basic_compressor->GetOptimizedDecompressor();
if (basic_decompressor == nullptr) {
basic_decompressor = mgr->GetDecompressor();
}
create_context.decompressor = basic_decompressor.get();
if (table_options.verify_compression) {
verify_decompressor = basic_decompressor.get();
if (table_options.enable_index_compression) {
index_block_working_area.verify =
verify_decompressor->ObtainWorkingArea(
index_block_compressor->GetPreferredCompressionType());
}
if (state == State::kUnbuffered) {
assert(data_block_compressor);
data_block_verify_decompressor = verify_decompressor.get();
data_block_working_area.verify =
data_block_verify_decompressor->ObtainWorkingArea(
data_block_compressor->GetPreferredCompressionType());
}
}
}
if (sample_for_compression > 0) {
auto builtin = GetBuiltinV2CompressionManager();
if (builtin->SupportsCompressionType(kLZ4Compression)) {
fast_sample_compressor = builtin->GetCompressor({}, kLZ4Compression);
} else if (builtin->SupportsCompressionType(kSnappyCompression)) {
fast_sample_compressor = builtin->GetCompressor({}, kSnappyCompression);
}
if (builtin->SupportsCompressionType(kZSTD)) {
slow_sample_compressor = builtin->GetCompressor({}, kZSTD);
} else if (builtin->SupportsCompressionType(kZlibCompression)) {
slow_sample_compressor = builtin->GetCompressor({}, kZlibCompression);
}
}
switch (table_options.prepopulate_block_cache) {
case BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly:
warm_cache = (reason == TableFileCreationReason::kFlush);
break;
case BlockBasedTableOptions::PrepopulateBlockCache::kDisable:
warm_cache = false;
break;
default:
assert(false);
warm_cache = false;
}
const auto compress_dict_build_buffer_charged =
table_options.cache_usage_options.options_overrides
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
.charged;
if (table_options.block_cache &&
(compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kEnabled ||
compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kFallback)) {
compression_dict_buffer_cache_res_mgr =
std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
table_options.block_cache);
} else {
compression_dict_buffer_cache_res_mgr = nullptr;
}
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
&internal_comparator, use_delta_encoding_for_index_values,
table_options, ts_sz, persist_user_defined_timestamps);
index_builder.reset(p_index_builder_);
} else {
index_builder.reset(IndexBuilder::CreateIndexBuilder(
table_options.index_type, &internal_comparator,
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
table_options, ts_sz, persist_user_defined_timestamps));
}
if (table_options.user_defined_index_factory != nullptr) {
if (tbo.moptions.compression_opts.parallel_threads > 1 ||
tbo.moptions.bottommost_compression_opts.parallel_threads > 1) {
SetStatus(
Status::InvalidArgument("user_defined_index_factory not supported "
"with parallel compression"));
} else {
std::unique_ptr<UserDefinedIndexBuilder> user_defined_index_builder;
UserDefinedIndexOption udi_options;
udi_options.comparator = internal_comparator.user_comparator();
auto s = table_options.user_defined_index_factory->NewBuilder(
udi_options, user_defined_index_builder);
if (!s.ok()) {
SetStatus(s);
} else {
if (user_defined_index_builder != nullptr) {
index_builder = std::make_unique<UserDefinedIndexBuilderWrapper>(
std::string(table_options.user_defined_index_factory->Name()),
std::move(index_builder), std::move(user_defined_index_builder),
&internal_comparator, ts_sz, persist_user_defined_timestamps);
}
}
}
}
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
filter_builder.reset();
} else if (!table_options.filter_policy) {
filter_builder.reset();
} else {
filter_builder.reset(CreateFilterBlockBuilder(
ioptions, tbo.moptions, filter_context,
use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
persist_user_defined_timestamps));
}
assert(tbo.internal_tbl_prop_coll_factories);
for (auto& factory : *tbo.internal_tbl_prop_coll_factories) {
assert(factory);
std::unique_ptr<InternalTblPropColl> collector{
factory->CreateInternalTblPropColl(
tbo.column_family_id, tbo.level_at_creation,
tbo.ioptions.num_levels,
tbo.last_level_inclusive_max_seqno_threshold)};
if (collector) {
table_properties_collectors.emplace_back(std::move(collector));
}
}
table_properties_collectors.emplace_back(
std::make_unique<BlockBasedTablePropertiesCollector>(
table_options.index_type, table_options.whole_key_filtering,
prefix_extractor != nullptr,
table_options.decouple_partitioned_filters));
if (ts_sz > 0 && persist_user_defined_timestamps) {
table_properties_collectors.emplace_back(
std::make_unique<TimestampTablePropertiesCollector>(
tbo.internal_comparator.user_comparator()));
}
props.column_family_id = tbo.column_family_id;
props.column_family_name = tbo.column_family_name;
props.oldest_key_time = tbo.oldest_key_time;
props.newest_key_time = tbo.newest_key_time;
props.file_creation_time = tbo.file_creation_time;
props.orig_file_number = tbo.cur_file_num;
props.db_id = tbo.db_id;
props.db_session_id = tbo.db_session_id;
props.db_host_id = ioptions.db_host_id;
props.format_version = table_options.format_version;
props.data_block_restart_interval = table_options.block_restart_interval;
props.index_block_restart_interval =
table_options.index_block_restart_interval;
props.separate_key_value_in_data_block =
table_options.separate_key_value_in_data_block ? 1 : 0;
if (!ReifyDbHostIdProperty(ioptions.env, &props.db_host_id).ok()) {
ROCKS_LOG_INFO(ioptions.logger, "db_host_id property will not be set");
}
props.key_largest_seqno = 0;
props.key_smallest_seqno = UINT64_MAX;
PrePopulateCompressionProperties(mgr);
if (FormatVersionUsesContextChecksum(table_options.format_version)) {
do {
base_context_checksum = Random::GetTLSInstance()->Next();
} while (UNLIKELY(base_context_checksum == 0));
} else {
base_context_checksum = 0;
}
if (alignment > 0 && basic_compressor) {
SetStatus(Status::InvalidArgument(
"Enable block_align, but compression enabled"));
}
}
~Rep() {
index_block_working_area = {};
data_block_working_area = {};
assert(pc_rep == nullptr);
if (data_block_compressor.get() != basic_compressor.get()) {
delete data_block_compressor.get();
}
if (index_block_compressor.get() != basic_compressor.get()) {
delete index_block_compressor.get();
}
}
Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;
void PrePopulateCompressionProperties(UnownedPtr<CompressionManager> mgr) {
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
assert(mgr);
props.compression_name.reserve(32);
if (basic_compressor) {
props.compression_name.append(mgr->CompatibilityName());
}
props.compression_name.push_back(';');
} else {
assert(
Slice(mgr->CompatibilityName())
.compare(GetBuiltinV2CompressionManager()->CompatibilityName()) ==
0);
}
}
void PostPopulateCompressionProperties() {
compression_types_used.Remove(kNoCompression);
size_t ctype_count = compression_types_used.count();
if (uses_explicit_compression_manager) {
std::string& compression_options = props.compression_options;
compression_options.append("_compressor=");
compression_options.append(data_block_compressor
? data_block_compressor->GetId()
: std::string{});
compression_options.append("; ");
} else {
assert(compression_types_used.count() <= 1);
}
std::string& compression_name = props.compression_name;
if (FormatVersionUsesCompressionManagerName(table_options.format_version)) {
assert(*compression_name.rbegin() == ';');
size_t pos = compression_name.size();
compression_name.append(ctype_count * 2, '\0');
char* ptr = compression_name.data() + pos;
for (CompressionType t : compression_types_used) {
PutBaseChars<16>(&ptr, 2, static_cast<unsigned char>(t),
true);
}
assert(ptr == compression_name.data() + pos + ctype_count * 2);
compression_name.push_back(';');
} else {
assert(compression_name.empty());
if (ctype_count == 0) {
if (data_block_compressor) {
compression_name = CompressionTypeToString(
data_block_compressor->GetPreferredCompressionType());
} else {
assert(basic_compressor == nullptr);
compression_name = CompressionTypeToString(kNoCompression);
}
} else if (compression_types_used.Contains(kZSTD)) {
compression_name = CompressionTypeToString(kZSTD);
} else {
compression_name =
CompressionTypeToString(*compression_types_used.begin());
}
}
}
private:
// Guards io_status; io_status_ok allows a lock-free fast-path check of
// whether any I/O error has been recorded.
std::mutex io_status_mutex;
RelaxedAtomic<bool> io_status_ok{true};
IOStatus io_status;
};
// Construct the builder: create the Rep, set up the base cache key used to
// warm the block cache for this file, and optionally start parallel
// compression.
BlockBasedTableBuilder::BlockBasedTableBuilder(
const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo,
WritableFileWriter* file) {
BlockBasedTableOptions sanitized_table_options(table_options);
auto ucmp = tbo.internal_comparator.user_comparator();
assert(ucmp);
(void)ucmp; rep_ = std::make_unique<Rep>(sanitized_table_options, tbo, file);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::BlockBasedTableBuilder:PreSetupBaseCacheKey",
const_cast<TableProperties*>(&rep_->props));
// The base cache key ties cache entries to this file (db session id +
// file number).
BlockBasedTable::SetupBaseCacheKey(&rep_->props, tbo.db_session_id,
tbo.cur_file_num, &rep_->base_cache_key);
MaybeStartParallelCompression();
if (!rep_->IsParallelCompressionActive() && rep_->basic_compressor) {
// Single-threaded compression reuses one output buffer sized for a block.
rep_->single_threaded_compressed_output.ResetForSize(
table_options.block_size);
}
}
// The builder must have been brought to the closed state (via finish/abandon
// paths that set Rep::State::kClosed) before destruction.
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
assert(rep_->state == Rep::State::kClosed);
}
// Add one entry to the table. Point entries (puts/deletes/merges) must
// arrive in strictly increasing internal-key order (asserted in debug
// builds); range deletions go to a dedicated meta block. Any other value
// type records an InvalidArgument status.
void BlockBasedTableBuilder::Add(const Slice& ikey, const Slice& value) {
Rep* r = rep_.get();
assert(rep_->state != Rep::State::kClosed);
if (UNLIKELY(!ok())) {
return;
}
ValueType value_type;
SequenceNumber seq;
UnPackSequenceAndType(ExtractInternalKeyFooter(ikey), &seq, &value_type);
// Track the min/max sequence number across all added keys (properties).
r->props.key_largest_seqno = std::max(r->props.key_largest_seqno, seq);
r->props.key_smallest_seqno = std::min(r->props.key_smallest_seqno, seq);
if (IsValueType(value_type)) {
#ifndef NDEBUG
// Ordering check only applies once a prior point entry exists
// (num_entries counts range deletions too, hence the subtraction).
if (r->props.num_entries > r->props.num_range_deletions) {
assert(r->internal_comparator.Compare(ikey, Slice(r->last_ikey)) > 0);
}
bool skip = false;
TEST_SYNC_POINT_CALLBACK("BlockBasedTableBuilder::Add::skip", (void*)&skip);
if (skip) {
return;
}
#endif
// Flush the current data block first if the policy deems it full.
auto should_flush = r->flush_block_policy->Update(ikey, value);
if (should_flush) {
assert(!r->data_block.empty());
Flush(&ikey);
}
if (r->state == Rep::State::kUnbuffered) {
// The filter builder receives each key together with the previous key
// (empty on the first entry).
if (r->filter_builder != nullptr) {
r->filter_builder->AddWithPrevKey(
ExtractUserKeyAndStripTimestamp(ikey, r->ts_sz),
r->last_ikey.empty()
? Slice{}
: ExtractUserKeyAndStripTimestamp(r->last_ikey, r->ts_sz));
}
}
r->data_block.AddWithLastKey(ikey, value, r->last_ikey);
r->last_ikey.assign(ikey.data(), ikey.size());
assert(!r->last_ikey.empty());
if (r->state == Rep::State::kBuffered) {
// Buffered mode: blocks are accumulated in memory; index entries are
// created later, after the transition to unbuffered.
} else {
r->index_builder->OnKeyAdded(ikey, value);
}
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.logger);
} else if (value_type == kTypeRangeDeletion) {
// Strip the timestamp from the range end key when timestamps are not
// persisted, matching what is written for other keys.
Slice persisted_end = value;
if (r->ts_sz > 0 && !r->persist_user_defined_timestamps) {
persisted_end = StripTimestampFromUserKey(value, r->ts_sz);
}
r->range_del_block.Add(ikey, persisted_end);
NotifyCollectTableCollectorsOnAdd(ikey, value, r->get_offset(),
r->table_properties_collectors,
r->ioptions.logger);
} else {
assert(false);
r->SetStatus(Status::InvalidArgument(
"BlockBasedBuilder::Add() received a key with invalid value type " +
std::to_string(static_cast<unsigned int>(value_type))));
return;
}
// Aggregate properties shared by point entries and range deletions.
r->props.num_entries++;
r->props.raw_key_size += ikey.size();
if (!r->persist_user_defined_timestamps) {
// Timestamps are stripped on write, so exclude their bytes.
r->props.raw_key_size -= r->ts_sz;
}
r->props.raw_value_size += value.size();
if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
value_type == kTypeDeletionWithTimestamp) {
r->props.num_deletions++;
} else if (value_type == kTypeRangeDeletion) {
r->props.num_deletions++;
r->props.num_range_deletions++;
} else if (value_type == kTypeMerge) {
r->props.num_merge_operands++;
}
}
// Finish the in-progress data block and hand it off: in kBuffered state the
// raw bytes are stashed for later emission (e.g. while gathering data for a
// compression dictionary); otherwise the block is emitted to the file,
// serially or through the parallel compression pipeline. A fraction of
// blocks may also be trial-compressed with fast/slow sample compressors to
// estimate compressibility. first_key_in_next_block is null for the final
// block of the table.
void BlockBasedTableBuilder::Flush(const Slice* first_key_in_next_block) {
Rep* r = rep_.get();
assert(rep_->state != Rep::State::kClosed);
if (UNLIKELY(!ok())) {
return;
}
if (r->data_block.empty()) {
return;
}
Slice uncompressed_block_data = r->data_block.Finish();
if (r->sample_for_compression > 0 &&
Random::GetTLSInstance()->OneIn(
static_cast<int>(r->sample_for_compression))) {
// Sampled: measure compressed size under both sample compressors.
GrowableBuffer sampled_output;
sampled_output.ResetForSize(uncompressed_block_data.size());
size_t fast_size = uncompressed_block_data.size();
size_t slow_size = uncompressed_block_data.size();
if (r->fast_sample_compressor) {
CompressionType result_type = kNoCompression;
Status s = r->fast_sample_compressor->CompressBlock(
uncompressed_block_data, sampled_output.data(), &fast_size,
&result_type, nullptr);
if (!s.ok() || result_type == kNoCompression) {
// Failure or rejection counts as incompressible (full size).
fast_size = uncompressed_block_data.size();
}
}
if (r->slow_sample_compressor) {
CompressionType result_type = kNoCompression;
Status s = r->slow_sample_compressor->CompressBlock(
uncompressed_block_data, sampled_output.data(), &slow_size,
&result_type, nullptr);
if (!s.ok() || result_type == kNoCompression) {
slow_size = uncompressed_block_data.size();
}
}
r->sampled_input_data_bytes.FetchAddRelaxed(uncompressed_block_data.size());
r->sampled_output_slow_data_bytes.FetchAddRelaxed(slow_size);
r->sampled_output_fast_data_bytes.FetchAddRelaxed(fast_size);
NotifyCollectTableCollectorsOnBlockAdd(r->table_properties_collectors,
uncompressed_block_data.size(),
slow_size, fast_size);
} else {
// Not sampled: report zero sample sizes to the collectors.
NotifyCollectTableCollectorsOnBlockAdd(
r->table_properties_collectors, uncompressed_block_data.size(),
0 , 0 );
}
if (rep_->state == Rep::State::kBuffered) {
// Buffered: move the raw block bytes into data_block_buffers; this may
// also trigger the transition to unbuffered mode.
std::string uncompressed_block_holder;
uncompressed_block_holder.reserve(rep_->table_options.block_size);
r->data_block.SwapAndReset(uncompressed_block_holder);
assert(uncompressed_block_data.size() == uncompressed_block_holder.size());
rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_holder));
rep_->data_begin_offset += uncompressed_block_data.size();
MaybeEnterUnbuffered(first_key_in_next_block);
} else {
++r->props.num_data_blocks;
if (r->filter_builder) {
r->filter_builder->OnDataBlockFinalized(r->props.num_data_blocks);
}
if (r->IsParallelCompressionActive()) {
EmitBlockForParallel(r->data_block.MutableBuffer(), r->last_ikey,
first_key_in_next_block);
} else {
EmitBlock(r->data_block.MutableBuffer(), r->last_ikey,
first_key_in_next_block);
}
r->data_block.Reset();
}
}
// Hand a finished data block to the parallel compression pipeline: swap the
// uncompressed bytes into the current ring-buffer slot, prepare its index
// entry, then run the emitter's state transition loop — which may assign
// compression work (kCompressing) to this thread while workers drain slots.
void BlockBasedTableBuilder::EmitBlockForParallel(
std::string& uncompressed, const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block) {
Rep* r = rep_.get();
assert(r->state == Rep::State::kUnbuffered);
assert(uncompressed.size() > 0);
auto& pc_rep = *r->pc_rep;
assert(pc_rep.emit_thread_state ==
ParallelCompressionRep::ThreadState::kEmitting);
auto* block_rep = &pc_rep.ring_buffer[pc_rep.emit_slot];
// Count block + trailer toward the in-flight memory estimate.
pc_rep.estimated_inflight_size.FetchAddRelaxed(uncompressed.size() +
kBlockTrailerSize);
// Transfer the block bytes into the slot without copying.
std::swap(uncompressed, block_rep->uncompressed);
r->index_builder->PrepareIndexEntry(last_key_in_current_block,
first_key_in_next_block,
block_rep->prepared_index_entry.get());
block_rep->compressed.Reset();
block_rep->compression_type = kNoCompression;
for (;;) {
pc_rep.EmitterStateTransition(pc_rep.emit_thread_state, pc_rep.emit_slot);
if (pc_rep.emit_thread_state ==
ParallelCompressionRep::ThreadState::kCompressing) {
// The emitter was handed a compression task; perform it inline.
block_rep = &pc_rep.ring_buffer[pc_rep.emit_slot];
Status s = CompressAndVerifyBlock(
block_rep->uncompressed, true,
r->data_block_working_area, &block_rep->compressed,
&block_rep->compression_type);
if (UNLIKELY(!s.ok())) {
r->SetStatus(s);
pc_rep.SetAbort(pc_rep.emit_thread_state);
break;
}
} else {
// The emitter never writes blocks and never goes idle.
assert(pc_rep.emit_thread_state !=
ParallelCompressionRep::ThreadState::kCompressingAndWriting);
assert(pc_rep.emit_thread_state !=
ParallelCompressionRep::ThreadState::kWriting);
assert(pc_rep.emit_thread_state !=
ParallelCompressionRep::ThreadState::kIdle);
if (first_key_in_next_block == nullptr &&
pc_rep.emit_thread_state ==
ParallelCompressionRep::ThreadState::kEmitting) {
// Final block of the table: tell workers nothing more is coming.
pc_rep.SetNoMoreToEmit(pc_rep.emit_thread_state, pc_rep.emit_slot);
}
break;
}
}
}
void BlockBasedTableBuilder::EmitBlock(std::string& uncompressed,
const Slice& last_key_in_current_block,
const Slice* first_key_in_next_block) {
Rep* r = rep_.get();
assert(r->state == Rep::State::kUnbuffered);
assert(!r->IsParallelCompressionActive());
assert(uncompressed.size() > 0);
bool skip_delta_encoding = false;
WriteBlock(uncompressed, &r->pending_handle, BlockType::kData,
&skip_delta_encoding);
if (LIKELY(ok())) {
r->index_builder->AddIndexEntry(
last_key_in_current_block, first_key_in_next_block, r->pending_handle,
&r->index_separator_scratch, skip_delta_encoding);
}
}
// Compress (and optionally verify) a data or index block on the
// single-threaded path, then write it with its trailer. *handle receives the
// written block's location; *skip_delta_encoding is set by the write step.
void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
BlockHandle* handle,
BlockType block_type,
bool* skip_delta_encoding) {
Rep* r = rep_.get();
assert(r->state == Rep::State::kUnbuffered);
assert(!r->IsParallelCompressionActive());
CompressionType type;
bool is_data_block = block_type == BlockType::kData;
assert(is_data_block || block_type == BlockType::kIndex);
// Data and index blocks use distinct compressors and working areas.
Status compress_status = CompressAndVerifyBlock(
uncompressed_block_data, is_data_block,
is_data_block ? r->data_block_working_area : r->index_block_working_area,
&r->single_threaded_compressed_output, &type);
r->SetStatus(compress_status);
if (UNLIKELY(!ok())) {
return;
}
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteBlock:TamperWithCompressedData",
&r->single_threaded_compressed_output);
// Write the compressed bytes if compression was applied, else the raw
// block; the raw block is always passed along for cache warming.
WriteMaybeCompressedBlock(
type == kNoCompression ? uncompressed_block_data
: Slice(r->single_threaded_compressed_output),
type, handle, block_type, &uncompressed_block_data, skip_delta_encoding);
r->single_threaded_compressed_output.Reset();
if (is_data_block) {
r->props.data_size = r->get_offset();
r->props.uncompressed_data_size += uncompressed_block_data.size();
}
}
// Total CPU time (in microseconds) accumulated by parallel-compression
// worker threads; see BGWorker().
uint64_t BlockBasedTableBuilder::GetWorkerCPUMicros() const {
return rep_->worker_cpu_micros.LoadRelaxed();
}
// Body of a parallel-compression worker thread. Repeatedly transitions
// through the state machine in pc_rep, compressing and/or writing ring
// buffer slots, until reaching kEnd or an I/O error aborts the pipeline.
void BlockBasedTableBuilder::BGWorker(WorkingAreaPair& working_area) {
  const uint64_t start_cpu_micros =
      rep_->ioptions.env->GetSystemClock()->CPUMicros();
  // Attribute this thread's CPU time to the builder on every exit path.
  Defer log_cpu{[this, start_cpu_micros]() {
    rep_->worker_cpu_micros.FetchAddRelaxed(
        rep_->ioptions.env->GetSystemClock()->CPUMicros() - start_cpu_micros);
  }};
  auto& pc_rep = *rep_->pc_rep;
#ifdef BBTB_PC_WATCHDOG
  pc_rep.live_workers.FetchAddRelaxed(1);
  Defer decr{[&pc_rep]() { pc_rep.live_workers.FetchSubRelaxed(1); }};
#endif
  // FIX: this declaration was previously fused onto the `#endif` line above.
  // Tokens after `#endif` are ill-formed (compilers diagnose and discard
  // them), which silently loses the declaration.
  ParallelCompressionRep::ThreadState thread_state =
      ParallelCompressionRep::ThreadState::kIdle;
  uint32_t slot = 0;
  IOStatus ios;
  do {
    pc_rep.WorkerStateTransition(thread_state, slot);
    ParallelCompressionRep::BlockRep* block_rep = &pc_rep.ring_buffer[slot];
    // Compress (and maybe verify) the slot's uncompressed block.
    auto compress_fn = [this, block_rep, &ios, &working_area]() {
      ios = status_to_io_status(CompressAndVerifyBlock(
          block_rep->uncompressed, /*is_data_block=*/true, working_area,
          &block_rep->compressed, &block_rep->compression_type));
    };
    // Write the slot's (possibly compressed) block, update data-size
    // properties, and finish the slot's prepared index entry.
    auto write_fn = [this, block_rep, &ios]() {
      Slice compressed = block_rep->compressed;
      Slice uncompressed = block_rep->uncompressed;
      bool skip_delta_encoding = false;
      ios = WriteMaybeCompressedBlockImpl(
          block_rep->compression_type == kNoCompression ? uncompressed
                                                        : compressed,
          block_rep->compression_type, &rep_->pending_handle, BlockType::kData,
          &uncompressed, &skip_delta_encoding);
      if (LIKELY(ios.ok())) {
        rep_->props.data_size = rep_->get_offset();
        rep_->props.uncompressed_data_size += block_rep->uncompressed.size();
        rep_->index_builder->FinishIndexEntry(
            rep_->pending_handle, block_rep->prepared_index_entry.get(),
            skip_delta_encoding);
      }
    };
    switch (thread_state) {
      case ParallelCompressionRep::ThreadState::kEnd:
        assert(ios.ok());
        return;
      case ParallelCompressionRep::ThreadState::kCompressing:
        compress_fn();
        break;
      case ParallelCompressionRep::ThreadState::kCompressingAndWriting:
        compress_fn();
        if (LIKELY(ios.ok())) {
          write_fn();
        }
        break;
      case ParallelCompressionRep::ThreadState::kWriting:
        write_fn();
        break;
      case ParallelCompressionRep::ThreadState::kEmitting:
        // Only the emitter thread may be in kEmitting.
        assert(thread_state != ParallelCompressionRep::ThreadState::kEmitting);
        break;
      case ParallelCompressionRep::ThreadState::kIdle:
        // WorkerStateTransition() never hands back kIdle.
        assert(thread_state != ParallelCompressionRep::ThreadState::kIdle);
        break;
      default:
        assert(false);
        break;
    }
  } while (LIKELY(ios.ok()));
  // An I/O error occurred: record it and abort the whole pipeline.
  rep_->SetIOStatus(ios);
  pc_rep.SetAbort(thread_state);
}
// Compress uncompressed_block_data into *compressed_output using the data-
// or index-block compressor, optionally verifying the result by
// round-tripping it through the verify decompressor. On return,
// *result_compression_type is kNoCompression whenever the block should be
// written uncompressed (no compressor, bypassed, rejected, too large, or
// verification failed — the latter also yields a Corruption status).
Status BlockBasedTableBuilder::CompressAndVerifyBlock(
const Slice& uncompressed_block_data, bool is_data_block,
WorkingAreaPair& working_area, GrowableBuffer* compressed_output,
CompressionType* result_compression_type) {
Rep* r = rep_.get();
Status status;
UnownedPtr<Compressor> compressor = nullptr;
Decompressor* verify_decomp = nullptr;
if (is_data_block) {
compressor = r->data_block_compressor;
verify_decomp = r->data_block_verify_decompressor.get();
} else {
compressor = r->index_block_compressor;
verify_decomp = r->verify_decompressor.get();
}
compressed_output->Reset();
CompressionType type = kNoCompression;
if (LIKELY(uncompressed_block_data.size() < kCompressionSizeLimit)) {
if (compressor) {
StopWatchNano timer(
r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
// Cap the output buffer so insufficiently-compressible blocks are
// rejected (max_compressed_bytes_per_kb is a ratio scaled by 1/1024).
size_t max_compressed_size = static_cast<size_t>(
(static_cast<uint64_t>(r->max_compressed_bytes_per_kb) *
uncompressed_block_data.size()) >>
10);
compressed_output->ResetForSize(max_compressed_size);
status = compressor->CompressBlock(
uncompressed_block_data, compressed_output->data(),
&compressed_output->MutableSize(), &type, &working_area.compress);
// A non-OK status must come with kNoCompression.
assert(type == kNoCompression || status.ok());
assert(type == kNoCompression ||
r->table_options.verify_compression == (verify_decomp != nullptr));
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::CompressAndVerifyBlock:TamperWithResultType",
&type);
if (verify_decomp && type != kNoCompression) {
// Round-trip check: decompress and compare against the original.
BlockContents contents;
Status uncompress_status = DecompressBlockData(
compressed_output->data(), compressed_output->size(), type,
*verify_decomp, &contents, r->ioptions,
nullptr, &working_area.verify);
if (LIKELY(uncompress_status.ok())) {
bool data_match = contents.data.compare(uncompressed_block_data) == 0;
if (!data_match) {
const char* const msg =
"Decompressed block did not match pre-compression block";
ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
status = Status::Corruption(msg);
type = kNoCompression;
}
} else {
status = Status::Corruption(std::string("Could not decompress: ") +
uncompress_status.getState());
type = kNoCompression;
}
}
if (timer.IsStarted()) {
RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
}
}
if (is_data_block) {
// Under the size limit, block bytes count as compressible input; the
// trailer is never compressed.
r->compressible_input_data_bytes.FetchAddRelaxed(
uncompressed_block_data.size());
r->uncompressible_input_data_bytes.FetchAddRelaxed(kBlockTrailerSize);
}
} else {
if (is_data_block) {
// Oversized blocks are not compressed at all.
r->uncompressible_input_data_bytes.FetchAddRelaxed(
uncompressed_block_data.size() + kBlockTrailerSize);
}
}
if (type == kNoCompression) {
// Distinguish "never attempted" (bypassed) from "attempted but
// rejected" via whether anything landed in the output buffer.
bool compression_attempted = !compressed_output->empty();
RecordTick(r->ioptions.stats, compression_attempted
? NUMBER_BLOCK_COMPRESSION_REJECTED
: NUMBER_BLOCK_COMPRESSION_BYPASSED);
RecordTick(r->ioptions.stats,
compression_attempted ? BYTES_COMPRESSION_REJECTED
: BYTES_COMPRESSION_BYPASSED,
uncompressed_block_data.size());
} else {
RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM,
uncompressed_block_data.size());
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO,
compressed_output->size());
if (r->IsParallelCompressionActive() && is_data_block) {
// Compression shrank the block: shrink the in-flight estimate too.
r->pc_rep->estimated_inflight_size.FetchSubRelaxed(
uncompressed_block_data.size() - compressed_output->size());
}
}
*result_compression_type = type;
return status;
}
void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
BlockType block_type, const Slice* uncompressed_block_data,
bool* skip_delta_encoding) {
assert(status().ok());
assert(io_status().ok());
rep_->SetIOStatus(WriteMaybeCompressedBlockImpl(
block_contents, comp_type, handle, block_type, uncompressed_block_data,
skip_delta_encoding));
}
// Append a block (raw or already compressed) plus its trailer (1 byte
// compression type + 4 byte checksum) to the file. Handles super-block
// alignment padding, optional filter post-verification, block-cache warming,
// and block_align padding. *handle receives the block's offset/size; returns
// the first I/O error encountered.
IOStatus BlockBasedTableBuilder::WriteMaybeCompressedBlockImpl(
const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
BlockType block_type, const Slice* uncompressed_block_data,
bool* skip_delta_encoding) {
Rep* r = rep_.get();
bool is_data_block = block_type == BlockType::kData;
if (is_data_block) {
assert(skip_delta_encoding != nullptr);
}
if (skip_delta_encoding != nullptr) {
*skip_delta_encoding = false;
}
IOOptions io_options;
IOStatus io_s =
WritableFileWriter::PrepareIOOptions(r->write_options, io_options);
if (UNLIKELY(!io_s.ok())) {
return io_s;
}
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
auto offset = r->get_offset();
if ((r->table_options.super_block_alignment_size != 0) && is_data_block) {
// If this data block would straddle a super-block boundary, pad up to
// the next boundary (bounded by the space-overhead budget) so the block
// starts aligned. Padding disables index delta encoding for this entry.
auto super_block_alignment_mask =
r->table_options.super_block_alignment_size - 1;
if ((r->table_options.super_block_alignment_space_overhead_ratio != 0) &&
(offset & (~super_block_alignment_mask)) !=
((offset + block_contents.size()) &
(~super_block_alignment_mask))) {
auto allowed_max_padding_size =
r->table_options.super_block_alignment_size /
r->table_options.super_block_alignment_space_overhead_ratio;
auto pad_bytes = r->table_options.super_block_alignment_size -
(offset & super_block_alignment_mask);
if (pad_bytes < allowed_max_padding_size) {
io_s = r->file->Pad(io_options, pad_bytes, allowed_max_padding_size);
if (UNLIKELY(!io_s.ok())) {
r->SetIOStatus(io_s);
return io_s;
}
r->pre_compression_size += pad_bytes;
offset += pad_bytes;
r->set_offset(offset);
if (skip_delta_encoding != nullptr) {
*skip_delta_encoding = true;
}
TEST_SYNC_POINT(
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
"SuperBlockAlignment");
} else {
// Padding would exceed the allowed overhead; skip alignment.
TEST_SYNC_POINT(
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:"
"SuperBlockAlignmentPaddingBytesExceedLimit");
}
}
}
handle->set_offset(offset);
handle->set_size(block_contents.size());
if (uncompressed_block_data == nullptr) {
// No separate raw copy was provided: the contents must be uncompressed,
// and serve as the cache-warming source below.
uncompressed_block_data = &block_contents;
assert(comp_type == kNoCompression);
}
{
io_s = r->file->Append(io_options, block_contents);
if (UNLIKELY(!io_s.ok())) {
return io_s;
}
}
r->compression_types_used.Add(comp_type);
// Trailer: compression type byte, then a checksum covering contents + type
// byte, mixed with the per-file context checksum keyed by offset.
std::array<char, kBlockTrailerSize> trailer;
trailer[0] = comp_type;
uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
r->table_options.checksum, block_contents.data(), block_contents.size(),
comp_type);
checksum += ChecksumModifierForContext(r->base_context_checksum, offset);
if (block_type == BlockType::kFilter) {
io_s = status_to_io_status(
r->filter_builder->MaybePostVerifyFilter(block_contents));
if (UNLIKELY(!io_s.ok())) {
return io_s;
}
}
EncodeFixed32(trailer.data() + 1, checksum);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
trailer.data());
{
io_s = r->file->Append(io_options, Slice(trailer.data(), trailer.size()));
if UNLIKELY (!io_s.ok()) {
return io_s;
}
}
if (r->warm_cache) {
// Pre-populate the block cache with the uncompressed block contents.
io_s = status_to_io_status(
InsertBlockInCacheHelper(*uncompressed_block_data, handle, block_type));
if (UNLIKELY(!io_s.ok())) {
return io_s;
}
}
r->pre_compression_size +=
uncompressed_block_data->size() + kBlockTrailerSize;
r->set_offset(r->get_offset() + block_contents.size() + kBlockTrailerSize);
if (r->table_options.block_align && is_data_block) {
// Pad so the next data block begins on an alignment boundary.
size_t pad_bytes =
(r->alignment -
((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) &
(r->alignment - 1);
io_s = r->file->Pad(io_options, pad_bytes, kDefaultPageSize);
if (LIKELY(io_s.ok())) {
r->pre_compression_size += pad_bytes;
r->set_offset(r->get_offset() + pad_bytes);
} else {
return io_s;
}
}
if (r->IsParallelCompressionActive() && is_data_block) {
// Block is on disk now; remove it from the in-flight memory estimate.
r->pc_rep->estimated_inflight_size.FetchSubRelaxed(block_contents.size() +
kBlockTrailerSize);
}
return io_s;
}
// Start the parallel compression pipeline if configured with more than one
// compression thread and a data block compressor is present; otherwise the
// builder stays on the single-threaded path.
void BlockBasedTableBuilder::MaybeStartParallelCompression() {
  if (rep_->compression_parallel_threads <= 1) {
    return;
  }
  if (!rep_->data_block_compressor) {
    return;
  }
  rep_->pc_rep = std::make_unique<ParallelCompressionRep>(
      rep_->compression_parallel_threads);
  auto& pc_rep = *rep_->pc_rep;
  // Pre-create a prepared index entry for every ring buffer slot
  // (ring_buffer_mask is slot count - 1).
  for (uint32_t i = 0; i <= pc_rep.ring_buffer_mask; i++) {
    pc_rep.ring_buffer[i].prepared_index_entry =
        rep_->index_builder->CreatePreparedIndexEntry();
  }
  // One compression/verification working area per worker thread.
  pc_rep.worker_threads.reserve(pc_rep.num_worker_threads);
  pc_rep.working_areas.resize(pc_rep.num_worker_threads);
  for (uint32_t i = 0; i < pc_rep.num_worker_threads; i++) {
    auto& wa = pc_rep.working_areas[i];
    if (rep_->data_block_compressor) {
      wa.compress = rep_->data_block_compressor->ObtainWorkingArea();
    }
    if (rep_->data_block_verify_decompressor) {
      wa.verify = rep_->data_block_verify_decompressor->ObtainWorkingArea(
          rep_->data_block_compressor->GetPreferredCompressionType());
    }
    pc_rep.worker_threads.emplace_back([this, &wa] { BGWorker(wa); });
  }
#ifdef BBTB_PC_WATCHDOG
  pc_rep.watchdog_thread = std::thread([&pc_rep] { pc_rep.BGWatchdog(); });
  pc_rep.live_emit.StoreRelaxed(true);
#endif
  // FIX: the closing brace was previously fused onto the `#endif` line,
  // which is ill-formed and leaves the function body unbalanced.
}
// Shut down the parallel compression pipeline: signal abort or
// no-more-to-emit, join the worker (and watchdog) threads, and release
// pc_rep. ~Rep() asserts pc_rep == nullptr, so the final reset is required.
void BlockBasedTableBuilder::StopParallelCompression(bool abort) {
  auto& pc_rep = *rep_->pc_rep;
  if (abort) {
    pc_rep.SetAbort(pc_rep.emit_thread_state);
  } else if (pc_rep.emit_thread_state !=
             ParallelCompressionRep::ThreadState::kEnd) {
    // Only expected when no data blocks were emitted or an error occurred.
    assert(rep_->props.num_data_blocks == 0 || !ok());
    pc_rep.SetNoMoreToEmit(pc_rep.emit_thread_state, pc_rep.emit_slot);
  }
#ifdef BBTB_PC_WATCHDOG
  pc_rep.live_emit.StoreRelaxed(false);
#endif
  // FIX: this assert and the final pc_rep.reset() below were previously
  // fused onto `#endif` lines; tokens after `#endif` are ill-formed and
  // discarded, so the reset would be lost and ~Rep()'s
  // assert(pc_rep == nullptr) would fire.
  assert(pc_rep.emit_thread_state == ParallelCompressionRep::ThreadState::kEnd);
  for (auto& thread : pc_rep.worker_threads) {
    thread.join();
  }
#ifdef BBTB_PC_WATCHDOG
  {
    std::unique_lock<std::mutex> lock(pc_rep.watchdog_mutex);
    pc_rep.shutdown_watchdog = true;
    pc_rep.watchdog_cv.notify_all();
  }
  pc_rep.watchdog_thread.join();
#endif
  rep_->pc_rep.reset();
}
// Accessors for the builder's accumulated status / I/O status.
Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
IOStatus BlockBasedTableBuilder::io_status() const {
return rep_->GetIOStatus();
}
// Fast-path check that no error has been recorded so far.
bool BlockBasedTableBuilder::ok() const { return rep_->StatusOk(); }
Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
    const Slice& block_contents, const BlockHandle* handle,
    BlockType block_type) {
  // Warm the block cache with this (uncompressed) block, keyed by the
  // file's base cache key plus the block handle. No-op (OK) when there is
  // no cache or no parsing helper for this block type.
  Rep* const r = rep_.get();
  Cache* const block_cache = r->table_options.block_cache.get();
  const auto* helper =
      GetCacheItemHelper(block_type, r->ioptions.lowest_used_cache_tier);
  if (block_cache == nullptr || helper == nullptr ||
      helper->create_cb == nullptr) {
    return Status::OK();
  }
  const CacheKey key = BlockBasedTable::GetCacheKey(r->base_cache_key, *handle);
  size_t charge;
  Status s = WarmInCache(block_cache, key.AsSlice(), block_contents,
                         &r->create_context, helper, Cache::Priority::LOW,
                         &charge);
  if (LIKELY(s.ok())) {
    BlockBasedTable::UpdateCacheInsertionMetrics(block_type, nullptr, charge,
                                                 s.IsOkOverwritten(),
                                                 r->ioptions.stats);
  } else {
    RecordTick(r->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
  }
  return s;
}
// Write the filter block(s) — a single full filter, or multiple partitions
// plus their partition index — and register the final handle in the
// metaindex. No-op when no filter was configured or nothing was added.
void BlockBasedTableBuilder::WriteFilterBlock(
MetaIndexBuilder* meta_index_builder) {
if (rep_->filter_builder == nullptr || rep_->filter_builder->IsEmpty()) {
return;
}
if (!rep_->last_ikey.empty()) {
// Supply the final previous key before finishing the filter.
rep_->filter_builder->PrevKeyBeforeFinish(
ExtractUserKeyAndStripTimestamp(rep_->last_ikey, rep_->ts_sz));
}
BlockHandle filter_block_handle;
bool is_partitioned_filter = rep_->table_options.partition_filters;
if (LIKELY(ok())) {
rep_->props.num_filter_entries +=
rep_->filter_builder->EstimateEntriesAdded();
Status s = Status::Incomplete();
// Partitioned filters yield multiple blocks: Finish() returns Incomplete
// until the last block has been produced.
while (LIKELY(ok()) && s.IsIncomplete()) {
std::unique_ptr<const char[]> filter_owner;
Slice filter_content;
s = rep_->filter_builder->Finish(filter_block_handle, &filter_content,
&filter_owner);
assert(s.ok() || s.IsIncomplete() || s.IsCorruption());
if (s.IsCorruption()) {
rep_->SetStatus(s);
break;
}
rep_->props.filter_size += filter_content.size();
// The final block of a partitioned filter is its partition index.
BlockType btype = is_partitioned_filter && s.ok()
? BlockType::kFilterPartitionIndex
: BlockType::kFilter;
WriteMaybeCompressedBlock(filter_content, kNoCompression,
&filter_block_handle, btype);
}
rep_->filter_builder->ResetFilterBitsBuilder();
}
if (LIKELY(ok())) {
// Metaindex key: filter-kind prefix + policy compatibility name.
std::string key;
key = is_partitioned_filter ? BlockBasedTable::kPartitionedFilterBlockPrefix
: BlockBasedTable::kFullFilterBlockPrefix;
key.append(rep_->table_options.filter_policy->CompatibilityName());
meta_index_builder->Add(key, filter_block_handle);
}
}
// Write the index block — and, for partitioned indexes, every partition plus
// any index-related meta blocks — leaving the handle of the (top-level)
// index in *index_block_handle. Registers blocks in the metaindex; older
// format versions also store the index handle there.
void BlockBasedTableBuilder::WriteIndexBlock(
MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
if (UNLIKELY(!ok())) {
return;
}
IndexBuilder::IndexBlocks index_blocks;
auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
// Incomplete means "more partitions to come", not a failure.
if (LIKELY(ok()) && !index_builder_status.ok() &&
!index_builder_status.IsIncomplete()) {
rep_->SetStatus(index_builder_status);
}
if (LIKELY(ok())) {
// Write auxiliary meta blocks produced by the index builder.
for (const auto& item : index_blocks.meta_blocks) {
BlockHandle block_handle;
if (item.second.first == BlockType::kIndex) {
WriteBlock(item.second.second, &block_handle, item.second.first);
} else {
assert(item.second.first == BlockType::kUserDefinedIndex);
WriteMaybeCompressedBlock(item.second.second, kNoCompression,
&block_handle, item.second.first);
}
if (UNLIKELY(!ok())) {
break;
}
meta_index_builder->Add(item.first, block_handle);
}
}
if (LIKELY(ok())) {
// Index compression is optional, independent of data block compression.
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle,
BlockType::kIndex);
} else {
WriteMaybeCompressedBlock(index_blocks.index_block_contents,
kNoCompression, index_block_handle,
BlockType::kIndex);
}
}
if (index_builder_status.IsIncomplete()) {
// Partitioned index: keep finishing partitions (each Finish() consumes
// the previous partition's handle) until the top-level index is built.
bool index_building_finished = false;
while (LIKELY(ok()) && !index_building_finished) {
Status s =
rep_->index_builder->Finish(&index_blocks, *index_block_handle);
if (s.ok()) {
index_building_finished = true;
} else if (s.IsIncomplete()) {
assert(!index_building_finished);
} else {
rep_->SetStatus(s);
return;
}
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle,
BlockType::kIndex);
} else {
WriteMaybeCompressedBlock(index_blocks.index_block_contents,
kNoCompression, index_block_handle,
BlockType::kIndex);
}
}
}
// Newer format versions carry the index handle in the footer instead.
if (LIKELY(ok()) && !FormatVersionUsesIndexHandleInFooter(
rep_->table_options.format_version)) {
meta_index_builder->Add(kIndexBlockName, *index_block_handle);
}
}
// Populate the remaining table properties (names, sizes, index stats,
// compression estimates), run the user property collectors, and write the
// properties block, registering it in the metaindex.
void BlockBasedTableBuilder::WritePropertiesBlock(
MetaIndexBuilder* meta_index_builder) {
BlockHandle properties_block_handle;
if (LIKELY(ok())) {
PropertyBlockBuilder property_block_builder;
rep_->props.filter_policy_name =
rep_->table_options.filter_policy != nullptr
? rep_->table_options.filter_policy->Name()
: "";
rep_->props.index_size =
rep_->index_builder->IndexSize() + kBlockTrailerSize;
rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
? rep_->ioptions.user_comparator->Name()
: "nullptr";
rep_->props.merge_operator_name =
rep_->ioptions.merge_operator != nullptr
? rep_->ioptions.merge_operator->Name()
: "nullptr";
rep_->props.prefix_extractor_name =
rep_->prefix_extractor ? rep_->prefix_extractor->AsString() : "nullptr";
// Comma-separated list of collector factory names, e.g. "[a,b]".
std::string property_collectors_names = "[";
for (size_t i = 0;
i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
if (i != 0) {
property_collectors_names += ",";
}
property_collectors_names +=
rep_->ioptions.table_properties_collector_factories[i]->Name();
}
property_collectors_names += "]";
rep_->props.property_collectors_names = property_collectors_names;
rep_->PostPopulateCompressionProperties();
if (rep_->table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
assert(rep_->p_index_builder_ != nullptr);
rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
rep_->props.top_level_index_size =
rep_->p_index_builder_->TopLevelIndexSize(rep_->offset.LoadRelaxed());
}
rep_->props.index_key_is_user_key =
!rep_->index_builder->separator_is_key_plus_seq();
rep_->props.index_value_is_delta_encoded =
rep_->use_delta_encoding_for_index_values;
if (rep_->sampled_input_data_bytes.LoadRelaxed() > 0) {
// Extrapolate per-table compressed size estimates from the sampled
// blocks' compression ratios (+0.5 rounds to nearest).
rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
static_cast<double>(
rep_->sampled_output_slow_data_bytes.LoadRelaxed()) /
rep_->sampled_input_data_bytes.LoadRelaxed() *
rep_->compressible_input_data_bytes.LoadRelaxed() +
rep_->uncompressible_input_data_bytes.LoadRelaxed() + 0.5);
rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
static_cast<double>(
rep_->sampled_output_fast_data_bytes.LoadRelaxed()) /
rep_->sampled_input_data_bytes.LoadRelaxed() *
rep_->compressible_input_data_bytes.LoadRelaxed() +
rep_->uncompressible_input_data_bytes.LoadRelaxed() + 0.5);
} else if (rep_->sample_for_compression > 0) {
// Sampling was enabled but hit no blocks: fall back to raw sizes.
rep_->props.slow_compression_estimated_data_size =
rep_->compressible_input_data_bytes.LoadRelaxed() +
rep_->uncompressible_input_data_bytes.LoadRelaxed();
rep_->props.fast_compression_estimated_data_size =
rep_->compressible_input_data_bytes.LoadRelaxed() +
rep_->uncompressible_input_data_bytes.LoadRelaxed();
}
rep_->props.user_defined_timestamps_persisted =
rep_->persist_user_defined_timestamps;
assert(IsEmpty() || rep_->props.key_largest_seqno != UINT64_MAX);
property_block_builder.AddTableProperty(rep_->props);
// Let user collectors contribute their properties.
NotifyCollectTableCollectorsOnFinish(
rep_->table_properties_collectors, rep_->ioptions.logger,
&property_block_builder, rep_->props.user_collected_properties,
rep_->props.readable_properties);
Slice block_data = property_block_builder.Finish();
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
// Properties block is always stored uncompressed.
WriteMaybeCompressedBlock(block_data, kNoCompression,
&properties_block_handle, BlockType::kProperties);
}
if (LIKELY(ok())) {
#ifndef NDEBUG
{
uint64_t props_block_offset = properties_block_handle.offset();
uint64_t props_block_size = properties_block_handle.size();
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
&props_block_offset);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
&props_block_size);
}
#endif
const std::string* properties_block_meta = &kPropertiesBlockName;
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:Meta",
&properties_block_meta);
meta_index_builder->Add(*properties_block_meta, properties_block_handle);
}
}
void BlockBasedTableBuilder::WriteCompressionDictBlock(
    MetaIndexBuilder* meta_index_builder) {
  // Persist the serialized compression dictionary, if any, and register it
  // in the metaindex so readers can find it. No-op when no dictionary.
  Slice dict;
  if (rep_->data_block_compressor) {
    dict = rep_->data_block_compressor->GetSerializedDict();
  }
  if (dict.empty()) {
    return;
  }
  BlockHandle dict_block_handle;
  if (LIKELY(ok())) {
    // The dictionary block itself is never compressed.
    WriteMaybeCompressedBlock(dict, kNoCompression, &dict_block_handle,
                              BlockType::kCompressionDictionary);
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict", &dict);
  }
  if (LIKELY(ok())) {
    meta_index_builder->Add(kCompressionDictBlockName, dict_block_handle);
  }
}
// Writes the range-deletion tombstone block (uncompressed) and registers it
// in the metaindex. Skipped when there are no tombstones or a prior error.
void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (!ok() || rep_->range_del_block.empty()) {
    return;
  }
  BlockHandle handle;
  WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
                            &handle, BlockType::kRangeDeletion);
  meta_index_builder->Add(kRangeDelBlockName, handle);
}
// Serializes and appends the fixed-format footer referencing the metaindex
// and index blocks. On any failure the error is recorded in Rep and the
// offset/pre-compression counters are left untouched.
void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  assert(LIKELY(ok()));
  Rep* r = rep_.get();
  FooterBuilder footer;
  Status build_status = footer.Build(
      kBlockBasedTableMagicNumber, r->table_options.format_version,
      r->get_offset(), r->table_options.checksum, metaindex_block_handle,
      index_block_handle, r->base_context_checksum);
  if (!build_status.ok()) {
    r->SetStatus(build_status);
    return;
  }
  IOOptions opts;
  IOStatus io_s = WritableFileWriter::PrepareIOOptions(r->write_options, opts);
  if (io_s.ok()) {
    io_s = r->file->Append(opts, footer.GetSlice());
  }
  if (!io_s.ok()) {
    r->SetIOStatus(io_s);
    return;
  }
  // Footer bytes count toward both the raw and on-disk totals.
  const size_t footer_len = footer.GetSlice().size();
  r->pre_compression_size += footer_len;
  r->set_offset(r->get_offset() + footer_len);
}
// Decides whether to leave the kBuffered state and, if so, performs the
// transition to kUnbuffered: trains a compression dictionary from samples of
// the buffered (uncompressed) data blocks, then replays each buffered block
// through the filter/index builders and emits it via the normal write path.
//
// `first_key_in_next_block` is the first key of the block that will follow
// the buffered ones (used as the index-separator hint for the last buffered
// block). nullptr means no further blocks are coming (e.g. called from
// Finish()), which forces the transition unconditionally.
void BlockBasedTableBuilder::MaybeEnterUnbuffered(
    const Slice* first_key_in_next_block) {
  Rep* r = rep_.get();
  assert(r->state == Rep::State::kBuffered);
  if (first_key_in_next_block != nullptr) {
    // Not forced: keep buffering unless a limit has been exceeded.
    bool exceeds_buffer_limit =
        (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
    if (!exceeds_buffer_limit) {
      bool exceeds_global_block_cache_limit = false;
      // Charge the currently buffered bytes against the block cache; a
      // MemoryLimit status means the global reservation budget is exhausted.
      if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
        Status s =
            r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
                r->data_begin_offset);
        exceeds_global_block_cache_limit = s.IsMemoryLimit();
      }
      if (!exceeds_global_block_cache_limit) {
        // Neither limit hit: stay in kBuffered.
        return;
      }
    }
  }
  r->state = Rep::State::kUnbuffered;
  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
  if (kNumBlocksBuffered == 0) {
    // Nothing was buffered, so no dictionary can be trained; use the basic
    // (dictionary-less) compressor for data blocks from here on.
    assert(!r->data_block_compressor);
    r->data_block_compressor = r->basic_compressor.get();
    return;
  }
  // Visit buffered blocks in a pseudo-random order for sampling: stepping by
  // (large prime mod N) starting from the middle cycles through indices
  // without obvious locality bias.
  const uint64_t kPrimeGenerator = 545055921143ull;
  const size_t kPrimeGeneratorRemainder = static_cast<size_t>(
      kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
  const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
  Compressor::DictSamples samples;
  size_t buffer_idx = kInitSampleIdx;
  // Sampling guidance must be configured when we get here with buffers.
  auto* sampling =
      std::get_if<Compressor::DictSampling>(&r->data_block_dict_guidance);
  assert(sampling != nullptr);
  size_t max_sample_bytes = sampling->max_sample_bytes;
  // Collect up to max_sample_bytes of training data, taking a prefix of each
  // visited block and recording each sample's length.
  for (size_t i = 0;
       i < kNumBlocksBuffered && samples.sample_data.size() < max_sample_bytes;
       ++i) {
    size_t copy_len = std::min(max_sample_bytes - samples.sample_data.size(),
                               r->data_block_buffers[buffer_idx].size());
    samples.sample_data.append(r->data_block_buffers[buffer_idx], 0, copy_len);
    samples.sample_lens.emplace_back(copy_len);
    // Advance by the prime stride, wrapping modulo the block count.
    buffer_idx += kPrimeGeneratorRemainder;
    if (buffer_idx >= kNumBlocksBuffered) {
      buffer_idx -= kNumBlocksBuffered;
    }
  }
  assert(samples.sample_data.size() > 0);
  // Obtain a data-block compressor specialized with a dictionary trained from
  // the samples.
  r->data_block_compressor =
      MaybeCloneSpecialized(r->basic_compressor.get(),
                            CacheEntryRole::kDataBlock, std::move(samples));
  Slice serialized_dict = r->data_block_compressor->GetSerializedDict();
  if (r->verify_decompressor) {
    // Compression verification needs a decompressor matching the dictionary.
    if (serialized_dict.empty()) {
      r->data_block_verify_decompressor = r->verify_decompressor.get();
    } else {
      Status s = r->verify_decompressor->MaybeCloneForDict(
          serialized_dict, &r->verify_decompressor_with_dict);
      assert(r->verify_decompressor_with_dict);
      if (r->verify_decompressor_with_dict) {
        r->data_block_verify_decompressor =
            r->verify_decompressor_with_dict.get();
        assert(s.ok());
      } else {
        assert(!s.ok());
        r->SetStatus(s);
      }
    }
  }
  // Helper: build an iterator over buffered block i, positioned at its first
  // entry. The Block does not own the buffer contents.
  auto get_iterator_for_block = [&r](size_t i) {
    auto& data_block = r->data_block_buffers[i];
    assert(!data_block.empty());
    Block reader{
        BlockContents{data_block}, 0 /* presumably read-amp bytes per bit;
        TODO confirm against Block ctor */,
        nullptr /* no statistics */,
        static_cast<uint32_t>(r->table_options.block_restart_interval)};
    DataBlockIter* iter = reader.NewDataIterator(
        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber,
        nullptr, nullptr,
        false, r->persist_user_defined_timestamps);
    iter->SeekToFirst();
    assert(iter->Valid());
    return std::unique_ptr<DataBlockIter>(iter);
  };
  // Replay every buffered block: feed its keys to the filter and index
  // builders, then emit the block through the (possibly parallel) write path.
  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
    if (iter == nullptr) {
      // First pass (or previous pass had no successor iterator to reuse).
      iter = get_iterator_for_block(i);
      assert(iter != nullptr);
    };
    for (; iter->Valid(); iter->Next()) {
      Slice key = iter->key();
      if (r->filter_builder != nullptr) {
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
      }
      r->index_builder->OnKeyAdded(key, iter->value());
    }
    // Determine the first key of the block after this one (next buffered
    // block, or the caller-supplied upcoming key) for the index separator.
    Slice first_key_in_loop_next_block;
    const Slice* first_key_in_loop_next_block_ptr;
    if (i + 1 < r->data_block_buffers.size()) {
      next_block_iter = get_iterator_for_block(i + 1);
      first_key_in_loop_next_block = next_block_iter->key();
      first_key_in_loop_next_block_ptr = &first_key_in_loop_next_block;
    } else {
      first_key_in_loop_next_block_ptr = first_key_in_next_block;
    }
    auto& data_block = r->data_block_buffers[i];
    // Reposition to the last entry: its key is passed as the block's last key
    // when emitting.
    iter->SeekToLast();
    assert(iter->Valid());
    if (r->IsParallelCompressionActive()) {
      EmitBlockForParallel(data_block, iter->key(),
                           first_key_in_loop_next_block_ptr);
    } else {
      EmitBlock(data_block, iter->key(), first_key_in_loop_next_block_ptr);
    }
    // Reuse the successor's iterator (already at its first key) next pass.
    std::swap(iter, next_block_iter);
  }
  r->data_block_buffers.clear();
  r->data_begin_offset = 0;
  // Buffers are freed: shrink the block-cache reservation back to zero.
  if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
    Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
        r->data_begin_offset);
    s.PermitUncheckedError();
  }
}
// Finalizes the table file: flushes the last pending data block, exits
// buffered mode if necessary, then writes the meta blocks (filter, index,
// compression dictionary, range-del, properties), the metaindex, and the
// footer. Transitions the builder to kClosed and returns its final status.
Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_.get();
  assert(r->state != Rep::State::kClosed);
#ifndef NDEBUG
  {
    // Let tests observe/override the IO status accumulated by parallel
    // compression before finishing.
    IOStatus s = r->GetIOStatus();
    TEST_SYNC_POINT_CALLBACK("BlockBasedTableBuilder::Finish:ParallelIOStatus",
                             &s);
    if (!s.ok()) {
      r->SetIOStatus(s);
    }
  }
#endif  // !NDEBUG
  // Flush any pending data block. BUG FIX: this call was fused onto the
  // `#endif` line; the preprocessor discards tokens trailing `#endif`, so the
  // flush was silently never compiled. It must be a standalone statement.
  Flush(nullptr);
  if (r->state == Rep::State::kBuffered) {
    // Force the transition; nullptr signals no more blocks are coming.
    MaybeEnterUnbuffered(nullptr);
  }
  assert(r->state == Rep::State::kUnbuffered);
  if (r->IsParallelCompressionActive()) {
    StopParallelCompression(false);
  }
  // Everything written from here on constitutes the file "tail".
  r->props.tail_start_offset = r->offset.LoadRelaxed();
  uint64_t last_estimated_tail_size = EstimatedTailSize();
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  // Meta blocks must be written before the metaindex that references them.
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (LIKELY(ok())) {
    WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
                              &metaindex_block_handle, BlockType::kMetaIndex);
  }
  if (LIKELY(ok())) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  r->tail_size = r->offset.LoadRelaxed() - r->props.tail_start_offset;
  // Sanity-check that EstimatedTailSize() was an upper bound of the actual
  // tail for compactions where that estimate influenced file sizing.
  if (r->target_file_size_is_upper_bound &&
      r->reason == TableFileCreationReason::kCompaction &&
      r->table_options.index_type != BlockBasedTableOptions::kHashSearch) {
    ROCKS_LOG_WARN(r->ioptions.info_log,
                   "File number: %" PRIu64 ", Estimated tail size = %" PRIu64
                   " bytes, Actual tail size = %" PRIu64 " bytes",
                   r->props.orig_file_number, last_estimated_tail_size,
                   r->tail_size);
    assert(r->tail_size <= last_estimated_tail_size);
  }
  return r->GetStatus();
}
void BlockBasedTableBuilder::Abandon() {
assert(rep_->state != Rep::State::kClosed);
if (rep_->IsParallelCompressionActive()) {
StopParallelCompression(true);
}
rep_->state = Rep::State::kClosed;
rep_->GetIOStatus().PermitUncheckedError();
}
// Number of point entries added to the table so far (from table properties).
uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}
// True when the table contains neither point entries nor range deletions.
bool BlockBasedTableBuilder::IsEmpty() const {
  const auto& props = rep_->props;
  return props.num_entries == 0 && props.num_range_deletions == 0;
}
// Total bytes of block payload before compression (accumulated in
// Rep::pre_compression_size as blocks and the footer are written).
uint64_t BlockBasedTableBuilder::PreCompressionSize() const {
  return rep_->pre_compression_size;
}
// Current write offset of the output file, i.e. bytes written to disk so far.
uint64_t BlockBasedTableBuilder::FileSize() const {
  return rep_->offset.LoadRelaxed();
}
// Bytes written so far, plus — when parallel compression is active — an
// estimate of the bytes still in flight through the compression pipeline.
uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
  uint64_t size = FileSize();
  if (rep_->IsParallelCompressionActive()) {
    size += rep_->pc_rep->estimated_inflight_size.LoadRelaxed();
  }
  return size;
}
uint64_t BlockBasedTableBuilder::EstimatedTailSize() const {
uint64_t estimated_tail_size = 0;
if (rep_->table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
assert(rep_->p_index_builder_);
estimated_tail_size += rep_->p_index_builder_->CurrentIndexSizeEstimate();
} else {
assert(rep_->index_builder);
estimated_tail_size += rep_->index_builder->CurrentIndexSizeEstimate();
}
if (rep_->filter_builder) {
estimated_tail_size += rep_->filter_builder->CurrentFilterSizeEstimate();
}
if (rep_->data_block_compressor) {
Slice dict = rep_->data_block_compressor->GetSerializedDict();
if (!dict.empty()) {
estimated_tail_size += dict.size();
}
}
if (!rep_->range_del_block.empty()) {
estimated_tail_size += rep_->range_del_block.CurrentSizeEstimate();
}
estimated_tail_size += 2048;
estimated_tail_size += 1024;
estimated_tail_size += Footer::kMaxEncodedLength;
return estimated_tail_size;
}
// Actual size of the file tail (bytes written after tail_start_offset by
// Finish()); meaningful only after Finish() has run.
uint64_t BlockBasedTableBuilder::GetTailSize() const { return rep_->tail_size; }
// True if any registered table-properties collector marks this file as
// needing compaction.
bool BlockBasedTableBuilder::NeedCompact() const {
  for (const auto& collector : rep_->table_properties_collectors) {
    if (!collector->NeedCompact()) {
      continue;
    }
    return true;
  }
  return false;
}
// Returns a copy of the table properties accumulated so far.
TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  return rep_->props;
}
// Checksum of the file as computed by the file writer, or
// kUnknownFileChecksum when no file is attached.
std::string BlockBasedTableBuilder::GetFileChecksum() const {
  if (rep_->file == nullptr) {
    return kUnknownFileChecksum;
  }
  return rep_->file->GetFileChecksum();
}
// Name of the checksum function used by the file writer, or
// kUnknownFileChecksumFuncName when no file is attached.
const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  if (rep_->file == nullptr) {
    return kUnknownFileChecksumFuncName;
  }
  return rep_->file->GetFileChecksumFuncName();
}
void BlockBasedTableBuilder::SetSeqnoTimeTableProperties(
const SeqnoToTimeMapping& relevant_mapping, uint64_t oldest_ancestor_time) {
assert(rep_->props.seqno_to_time_mapping.empty());
relevant_mapping.EncodeTo(rep_->props.seqno_to_time_mapping);
rep_->props.creation_time = oldest_ancestor_time;
}
const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
"partitionedfilter.";
}