#include "rocksdb/write_buffer_manager.h"
#include <mutex>
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
namespace {
const size_t kSizeDummyEntry = 256 * 1024;
const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
}
struct WriteBufferManager::CacheRep {
std::shared_ptr<Cache> cache_;
std::mutex cache_mutex_;
std::atomic<size_t> cache_allocated_size_;
char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
uint64_t next_cache_key_id_ = 0;
std::vector<Cache::Handle*> dummy_handles_;
explicit CacheRep(std::shared_ptr<Cache> cache)
: cache_(cache), cache_allocated_size_(0) {
memset(cache_key_, 0, kCacheKeyPrefix);
size_t pointer_size = sizeof(const void*);
assert(pointer_size <= kCacheKeyPrefix);
memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
}
Slice GetNextCacheKey() {
memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
char* end =
EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
}
};
#else
struct WriteBufferManager::CacheRep {};
#endif
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
std::shared_ptr<Cache> cache)
: buffer_size_(_buffer_size),
mutable_limit_(buffer_size_ * 7 / 8),
memory_used_(0),
memory_active_(0),
dummy_size_(0),
cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
if (cache) {
cache_rep_.reset(new CacheRep(cache));
}
#else
(void)cache;
#endif }
WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
if (cache_rep_) {
for (auto* handle : cache_rep_->dummy_handles_) {
if (handle != nullptr) {
cache_rep_->cache_->Release(handle, true);
}
}
}
#endif }
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
assert(cache_rep_ != nullptr);
std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
while (new_mem_used > cache_rep_->cache_allocated_size_) {
Cache::Handle* handle = nullptr;
Status s =
cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
kSizeDummyEntry, nullptr, &handle);
s.PermitUncheckedError(); cache_rep_->dummy_handles_.push_back(handle);
cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
dummy_size_.fetch_add(kSizeDummyEntry, std::memory_order_relaxed);
}
#else
(void)mem;
#endif }
void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
assert(cache_rep_ != nullptr);
std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
assert(!cache_rep_->dummy_handles_.empty());
auto* handle = cache_rep_->dummy_handles_.back();
if (handle != nullptr) {
cache_rep_->cache_->Release(handle, true);
}
cache_rep_->dummy_handles_.pop_back();
cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
dummy_size_.fetch_sub(kSizeDummyEntry, std::memory_order_relaxed);
}
#else
(void)mem;
#endif }
}