#include "cache/compressed_secondary_cache.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include "memory/memory_allocator_impl.h"
#include "monitoring/perf_context_imp.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
namespace {

// Two-byte tag stored in front of every cached payload:
// byte 0 = source cache tier, byte 1 = compression type (see InsertInternal
// for the writer and Lookup for the reader).
constexpr uint32_t kTagSize = 2;

// Bytes of metadata that precede the payload proper. When custom split/merge
// is enabled the chunks carry their own sizes, so no varint length prefix is
// needed; otherwise the header also includes a varint of the tagged size.
uint32_t GetHeaderSize(size_t data_size, bool enable_split_merge) {
  if (enable_split_merge) {
    return kTagSize;
  }
  return VarintLength(kTagSize + data_size) + kTagSize;
}

}  // namespace
// Builds the underlying LRU cache from the LRUCacheOptions base of `opts`,
// attaches a concurrent cache-reservation manager to it (used by
// Inflate/Deflate), and resolves the compressor/decompressor for the
// configured compression type. A capacity of 0 starts the cache disabled.
CompressedSecondaryCache::CompressedSecondaryCache(
    const CompressedSecondaryCacheOptions& opts)
    : cache_(opts.LRUCacheOptions::MakeSharedCache()),
      cache_options_(opts),
      cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
          std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
              cache_))),
      disable_cache_(opts.capacity == 0) {
  auto mgr = GetBuiltinV2CompressionManager();
  compressor_ = mgr->GetCompressor(cache_options_.compression_opts,
                                   cache_options_.compression_type);
  decompressor_ =
      mgr->GetDecompressorOptimizeFor(cache_options_.compression_type);
}

CompressedSecondaryCache::~CompressedSecondaryCache() = default;
// Looks up `key` in the underlying LRU cache and, on a real hit, rebuilds the
// cached object through helper->create_cb. Returns nullptr on miss, on a
// dummy-entry hit, or on any decompression/creation failure.
// `kept_in_sec_cache` reports whether the entry remains in this cache after
// the lookup (true only when `advise_erase` is false and creation succeeds).
std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
    const Slice& key, const Cache::CacheItemHelper* helper,
    Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase,
    Statistics* stats, bool& kept_in_sec_cache) {
  assert(helper);
  // Fast path out when capacity has been set to 0 (cache disabled).
  if (disable_cache_.LoadRelaxed()) {
    return nullptr;
  }

  std::unique_ptr<SecondaryCacheResultHandle> handle;
  kept_in_sec_cache = false;
  Cache::Handle* lru_handle = cache_->Lookup(key);
  if (lru_handle == nullptr) {
    return nullptr;
  }

  // A null value is a "dummy" placeholder (see MaybeInsertDummy): counts as
  // a hit for admission purposes but carries no data.
  void* handle_value = cache_->Value(lru_handle);
  if (handle_value == nullptr) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
    RecordTick(stats, COMPRESSED_SECONDARY_CACHE_DUMMY_HITS);
    return nullptr;
  }

  std::string merged_value;
  Slice tagged_data;
  if (cache_options_.enable_custom_split_merge) {
    // Value is stored as a linked list of chunks; reassemble into one buffer.
    CacheValueChunk* value_chunk_ptr =
        static_cast<CacheValueChunk*>(handle_value);
    merged_value = MergeChunksIntoValue(value_chunk_ptr);
    tagged_data = Slice(merged_value);
  } else {
    // Value is stored with a varint length prefix (written by InsertInternal).
    tagged_data = GetLengthPrefixedSlice(static_cast<char*>(handle_value));
  }

  // First two bytes are the tag: [0] = source cache tier, [1] = compression
  // type (see InsertInternal).
  auto source = lossless_cast<CacheTier>(tagged_data[0]);
  auto type = lossless_cast<CompressionType>(tagged_data[1]);
  std::unique_ptr<char[]> uncompressed;
  Slice saved(tagged_data.data() + kTagSize, tagged_data.size() - kTagSize);
  if (source == CacheTier::kVolatileCompressedTier) {
    if (type != kNoCompression) {
      // Decompress here so create_cb always sees uncompressed bytes.
      Decompressor::Args args;
      args.compressed_data = saved;
      args.compression_type = type;
      Status s = decompressor_->ExtractUncompressedSize(args);
      assert(s.ok());
      if (s.ok()) {
        uncompressed = std::make_unique<char[]>(args.uncompressed_size);
        s = decompressor_->DecompressBlock(args, uncompressed.get());
        assert(s.ok());
      }
      if (!s.ok()) {
        // Entry could not be decoded: release with erase so it is not
        // served again.
        cache_->Release(lru_handle, true);
        return nullptr;
      }
      saved = Slice(uncompressed.get(), args.uncompressed_size);
      type = kNoCompression;
      merged_value = std::string();
    }
    // Present the (now uncompressed) data as coming from the volatile tier.
    source = CacheTier::kVolatileTier;
  }

  Cache::ObjectPtr result_value = nullptr;
  size_t result_charge = 0;
  // Materialize the object for the primary cache.
  Status s = helper->create_cb(saved, type, source, create_context,
                               cache_options_.memory_allocator.get(),
                               &result_value, &result_charge);
  if (!s.ok()) {
    cache_->Release(lru_handle, true);
    return nullptr;
  }

  if (advise_erase) {
    // Caller will retain the object elsewhere: erase the real entry and
    // insert a null placeholder so a future insertion of this key is treated
    // as a repeat access (see MaybeInsertDummy / the dummy-hit path above).
    cache_->Release(lru_handle, true);
    cache_
        ->Insert(key, nullptr,
                 GetHelper(cache_options_.enable_custom_split_merge),
                 /*charge=*/0)
        .PermitUncheckedError();
  } else {
    kept_in_sec_cache = true;
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  }
  handle.reset(
      new CompressedSecondaryCacheResultHandle(result_value, result_charge));
  RecordTick(stats, COMPRESSED_SECONDARY_CACHE_HITS);
  return handle;
}
bool CompressedSecondaryCache::MaybeInsertDummy(const Slice& key) {
auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
cache_->Insert(key, nullptr, internal_helper, 0)
.PermitUncheckedError();
return true;
} else {
cache_->Release(lru_handle, false);
}
return false;
}
// Serializes `value` via `helper`, optionally compresses it, prefixes it with
// the two-byte [source, type] tag (and, without split/merge, a varint length),
// and stores it in the underlying cache — either as one flat allocation or as
// a chunk list when custom split/merge is enabled.
//
// `from_type` is the compression type the data already has; compression is
// only attempted on uncompressed data whose role is not excluded by
// do_not_compress_roles. Returns the status of serialization, compression,
// or the final cache insert.
Status CompressedSecondaryCache::InsertInternal(
    const Slice& key, Cache::ObjectPtr value,
    const Cache::CacheItemHelper* helper, CompressionType from_type,
    CacheTier source) {
  const bool enable_split_merge = cache_options_.enable_custom_split_merge;
  const Cache::CacheItemHelper* internal_helper = GetHelper(enable_split_merge);
  const size_t data_size_original = (*helper->size_cb)(value);
  // Allocate header + payload in one block and serialize directly after the
  // header so the tag bytes can be written in place later.
  size_t header_size = GetHeaderSize(data_size_original, enable_split_merge);
  CacheAllocationPtr allocation = AllocateBlock(
      header_size + data_size_original, cache_options_.memory_allocator.get());
  char* data_ptr = allocation.get() + header_size;
  Slice tagged_data(data_ptr - kTagSize, data_size_original + kTagSize);
  assert(tagged_data.data() >= allocation.get());
  Status s = (*helper->saveto_cb)(value, 0, data_size_original, data_ptr);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<char[]> tagged_compressed_data;
  CompressionType to_type = kNoCompression;
  // Guard on data_size_original > 0: the "- 1" below would otherwise wrap
  // (size_t underflow) for an empty payload.
  if (compressor_ && data_size_original > 0 && from_type == kNoCompression &&
      !cache_options_.do_not_compress_roles.Contains(helper->role)) {
    assert(source == CacheTier::kVolatileCompressedTier);
    // Budget of original size minus one: compression is only accepted when it
    // saves at least one byte.
    size_t data_size_compressed = data_size_original - 1;
    tagged_compressed_data =
        std::make_unique<char[]>(data_size_compressed + kTagSize);
    s = compressor_->CompressBlock(Slice(data_ptr, data_size_original),
                                   tagged_compressed_data.get() + kTagSize,
                                   &data_size_compressed, &to_type,
                                   nullptr);
    if (!s.ok()) {
      return s;
    }
    PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes,
                     data_size_original);
    if (to_type == kNoCompression) {
      // Compression did not help; drop the scratch buffer and keep the
      // original serialized block. (Previous code also redundantly
      // re-assigned to_type = kNoCompression here.)
      tagged_compressed_data.reset();
      PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes,
                       data_size_original);
    } else {
      PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes,
                       data_size_compressed);
      if (enable_split_merge) {
        // Chunking copies from tagged_data; point it at the compressed
        // buffer and free the original allocation.
        tagged_data = Slice(tagged_compressed_data.get(),
                            data_size_compressed + kTagSize);
        allocation.reset();
      } else {
        // Re-allocate a right-sized block (smaller varint header) and copy
        // the compressed bytes into it.
        header_size = GetHeaderSize(data_size_compressed, enable_split_merge);
        allocation = AllocateBlock(header_size + data_size_compressed,
                                   cache_options_.memory_allocator.get());
        data_ptr = allocation.get() + header_size;
        std::memcpy(data_ptr, tagged_compressed_data.get() + kTagSize,
                    data_size_compressed);
        tagged_data =
            Slice(data_ptr - kTagSize, data_size_compressed + kTagSize);
        assert(tagged_data.data() >= allocation.get());
      }
    }
  }

  PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
  // Write the two-byte tag in front of the payload.
  const_cast<char*>(tagged_data.data())[0] = lossless_cast<char>(source);
  const_cast<char*>(tagged_data.data())[1] = lossless_cast<char>(
      source == CacheTier::kVolatileCompressedTier ? to_type : from_type);

  if (enable_split_merge) {
    size_t split_charge{0};
    CacheValueChunk* value_chunks_head =
        SplitValueIntoChunks(tagged_data, split_charge);
    s = cache_->Insert(key, value_chunks_head, internal_helper, split_charge);
    assert(s.ok());
  } else {
    // Varint length prefix must land exactly at the start of tagged_data.
    char* ptr = allocation.get();
    ptr = EncodeVarint64(ptr, tagged_data.size());
    assert(ptr == tagged_data.data());
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
    size_t charge = malloc_usable_size(allocation.get());
#else
    size_t charge = tagged_data.size();
#endif
    s = cache_->Insert(key, allocation.release(), internal_helper, charge);
    assert(s.ok());
  }
  // Propagate the insert status (previously this returned Status::OK()
  // unconditionally, silently swallowing insert failures in release builds).
  return s;
}
// Public insertion entry point. Null values are rejected. Unless
// `force_insert` is set, the first insertion of a key records only a dummy
// placeholder (MaybeInsertDummy); a real, possibly compressed copy is stored
// when the key is inserted again.
Status CompressedSecondaryCache::Insert(const Slice& key,
                                        Cache::ObjectPtr value,
                                        const Cache::CacheItemHelper* helper,
                                        bool force_insert) {
  if (value == nullptr) {
    return Status::InvalidArgument();
  }
  if (force_insert || !MaybeInsertDummy(key)) {
    return InsertInternal(key, value, helper, kNoCompression,
                          CacheTier::kVolatileCompressedTier);
  }
  return Status::OK();
}
// Inserts an already-serialized (and possibly already-compressed) `saved`
// payload, e.g. loaded from the block-based tier. Silently succeeds without
// storing anything when: the source claims the compressed tier (must go
// through Insert() instead), the data is uncompressed, custom split/merge is
// enabled, or this is the key's first sighting (dummy inserted instead).
//
// Fix: default arguments removed from this out-of-line definition — defaults
// belong on the declaration, and repeating them here is an ill-formed
// redefinition when the declaration already provides them ([dcl.fct.default]).
Status CompressedSecondaryCache::InsertSaved(
    const Slice& key, const Slice& saved, CompressionType type,
    CacheTier source) {
  if (source == CacheTier::kVolatileCompressedTier) {
    assert(source != CacheTier::kVolatileCompressedTier);
    return Status::OK();
  }
  if (type == kNoCompression) {
    // Uncompressed saved data is not stored here.
    return Status::OK();
  }
  if (cache_options_.enable_custom_split_merge) {
    // Pre-compressed payloads are not supported on the split/merge path.
    return Status::OK();
  }
  if (MaybeInsertDummy(key)) {
    return Status::OK();
  }
  return InsertInternal(
      key, static_cast<Cache::ObjectPtr>(const_cast<Slice*>(&saved)),
      &kSliceCacheItemHelper, type, source);
}
// Removes `key` from the underlying cache, if present.
void CompressedSecondaryCache::Erase(const Slice& key) {
  cache_->Erase(key);
}
// Updates the recorded option and the underlying cache's capacity under
// capacity_mutex_; a capacity of 0 flips the disabled flag so Lookup bails
// out early.
Status CompressedSecondaryCache::SetCapacity(size_t capacity) {
  MutexLock lock(&capacity_mutex_);
  cache_options_.capacity = capacity;
  cache_->SetCapacity(capacity);
  disable_cache_.StoreRelaxed(capacity == 0);
  return Status::OK();
}
// Reports the most recently configured capacity (reads the cached option
// under capacity_mutex_ rather than querying the underlying cache).
Status CompressedSecondaryCache::GetCapacity(size_t& capacity) {
  MutexLock lock(&capacity_mutex_);
  capacity = cache_options_.capacity;
  return Status::OK();
}
// Renders the underlying cache's printable options followed by this cache's
// compression settings as human-readable text.
std::string CompressedSecondaryCache::GetPrintableOptions() const {
  constexpr int kBufferSize{200};
  char line[kBufferSize];
  std::string result;
  result.reserve(20000);
  result.append(cache_->GetPrintableOptions());
  snprintf(line, kBufferSize, " compression_type : %s\n",
           CompressionTypeToString(cache_options_.compression_type).c_str());
  result.append(line);
  // const_cast: CompressionOptionsToString takes a non-const reference.
  snprintf(line, kBufferSize, " compression_opts : %s\n",
           CompressionOptionsToString(
               const_cast<CompressionOptions&>(cache_options_.compression_opts))
               .c_str());
  result.append(line);
  return result;
}
// Splits `value` into a singly linked list of heap-allocated CacheValueChunk
// nodes and returns the head; `charge` is incremented by the total bytes
// allocated across all chunks. Chunk allocation sizes are snapped to entries
// of malloc_bin_sizes_ (presumably common allocator bin sizes — confirm
// against the member's definition) to reduce allocator waste.
CompressedSecondaryCache::CacheValueChunk*
CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value,
                                               size_t& charge) {
  assert(!value.empty());
  const char* src_ptr = value.data();
  size_t src_size{value.size()};

  // Sentinel head so the loop body never special-cases the first chunk.
  CacheValueChunk dummy_head = CacheValueChunk();
  CacheValueChunk* current_chunk = &dummy_head;
  size_t predicted_chunk_size{0};
  size_t actual_chunk_size{0};
  size_t tmp_size{0};
  while (src_size > 0) {
    // Allocation needed to hold ALL remaining data in one chunk. The "- 1"
    // accounts for the 1-byte data[] member already counted in
    // sizeof(CacheValueChunk).
    predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size;
    auto upper =
        std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(),
                         predicted_chunk_size);
    // Use the exact predicted size when it falls outside the bin table, or
    // when the next-larger bin would overshoot by less than the smallest bin
    // size (i.e. not worth splitting further). Otherwise round DOWN to the
    // largest bin <= predicted, leaving the remainder for later chunks.
    if (upper == malloc_bin_sizes_.begin() ||
        upper == malloc_bin_sizes_.end() ||
        *upper - predicted_chunk_size < malloc_bin_sizes_.front()) {
      tmp_size = predicted_chunk_size;
    } else {
      tmp_size = *(--upper);
    }

    // Raw char[] allocation reinterpreted as a chunk; freed via
    // CacheValueChunk::Free() in the GetHelper deleter.
    CacheValueChunk* new_chunk =
        static_cast<CacheValueChunk*>(static_cast<void*>(new char[tmp_size]));
    current_chunk->next = new_chunk;
    current_chunk = current_chunk->next;
    // Payload capacity of this chunk: allocation minus header, plus the one
    // data[] byte included in sizeof.
    actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1;
    memcpy(current_chunk->data, src_ptr, actual_chunk_size);
    current_chunk->size = actual_chunk_size;
    src_ptr += actual_chunk_size;
    src_size -= actual_chunk_size;
    charge += tmp_size;
  }
  current_chunk->next = nullptr;

  return dummy_head.next;
}
std::string CompressedSecondaryCache::MergeChunksIntoValue(
const CacheValueChunk* head) {
const CacheValueChunk* current_chunk = head;
size_t total_size = 0;
while (current_chunk != nullptr) {
total_size += current_chunk->size;
current_chunk = current_chunk->next;
}
std::string result;
result.reserve(total_size);
current_chunk = head;
while (current_chunk != nullptr) {
result.append(current_chunk->data, current_chunk->size);
current_chunk = current_chunk->next;
}
assert(result.size() == total_size);
return result;
}
// Returns the CacheItemHelper (deleter) used for entries stored in the
// underlying cache. The split/merge variant walks and frees a CacheValueChunk
// list; the flat variant frees the single allocation via the configured
// allocator. Both helpers are function-local statics, so the returned
// pointers stay valid for the life of the process.
const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper(
    bool enable_custom_split_merge) const {
  if (enable_custom_split_merge) {
    static const Cache::CacheItemHelper kHelper{
        CacheEntryRole::kMisc,
        [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) {
          // Free every chunk in the list (allocated with new char[] in
          // SplitValueIntoChunks).
          CacheValueChunk* chunks_head = static_cast<CacheValueChunk*>(obj);
          while (chunks_head != nullptr) {
            CacheValueChunk* tmp_chunk = chunks_head;
            chunks_head = chunks_head->next;
            tmp_chunk->Free();
          }
        }};
    return &kHelper;
  } else {
    static const Cache::CacheItemHelper kHelper{
        CacheEntryRole::kMisc,
        [](Cache::ObjectPtr obj, MemoryAllocator* alloc) {
          // A null obj is a dummy placeholder entry; nothing to free.
          if (obj != nullptr) {
            CacheAllocationDeleter{alloc}(static_cast<char*>(obj));
          }
        }};
    return &kHelper;
  }
}
size_t CompressedSecondaryCache::TEST_GetCharge(const Slice& key) {
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
return 0;
}
size_t charge = cache_->GetCharge(lru_handle);
cache_->Release(lru_handle, false);
return charge;
}
// Factory: constructs a CompressedSecondaryCache configured by these options.
std::shared_ptr<SecondaryCache>
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
  return std::make_shared<CompressedSecondaryCache>(*this);
}
// Reduces the cache's usable space by `decrease` bytes via the reservation
// manager. NOTE(review): the boolean argument's meaning is inferred from the
// mirror call in Inflate — confirm against
// ConcurrentCacheReservationManager::UpdateCacheReservation.
Status CompressedSecondaryCache::Deflate(size_t decrease) {
  return cache_res_mgr_->UpdateCacheReservation(decrease, true);
}
// Restores `increase` bytes of usable space by releasing part of the
// reservation taken by Deflate. NOTE(review): boolean argument meaning
// inferred — confirm against UpdateCacheReservation's signature.
Status CompressedSecondaryCache::Inflate(size_t increase) {
  return cache_res_mgr_->UpdateCacheReservation(increase, false);
}
}