#include "rocksdb/write_buffer_manager.h"
#include <memory>
#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/status.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
std::shared_ptr<Cache> cache,
bool allow_stall)
: buffer_size_(_buffer_size),
mutable_limit_(buffer_size_ * 7 / 8),
memory_used_(0),
memory_active_(0),
cache_res_mgr_(nullptr),
allow_stall_(allow_stall),
stall_active_(false) {
if (cache) {
cache_res_mgr_ = std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>>(
cache, true );
}
}
WriteBufferManager::~WriteBufferManager() {
#ifndef NDEBUG
std::unique_lock<std::mutex> lock(mu_);
assert(queue_.empty());
#endif
}
std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
if (cache_res_mgr_ != nullptr) {
return cache_res_mgr_->GetTotalReservedCacheSize();
} else {
return 0;
}
}
void WriteBufferManager::ReserveMem(size_t mem) {
if (cache_res_mgr_ != nullptr) {
ReserveMemWithCache(mem);
} else if (enabled()) {
memory_used_.fetch_add(mem, std::memory_order_relaxed);
}
if (enabled()) {
memory_active_.fetch_add(mem, std::memory_order_relaxed);
}
}
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
assert(cache_res_mgr_ != nullptr);
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
s.PermitUncheckedError();
}
void WriteBufferManager::ScheduleFreeMem(size_t mem) {
if (enabled()) {
memory_active_.fetch_sub(mem, std::memory_order_relaxed);
}
}
void WriteBufferManager::FreeMem(size_t mem) {
if (cache_res_mgr_ != nullptr) {
FreeMemWithCache(mem);
} else if (enabled()) {
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
}
MaybeEndWriteStall();
}
void WriteBufferManager::FreeMemWithCache(size_t mem) {
assert(cache_res_mgr_ != nullptr);
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
s.PermitUncheckedError();
}
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr);
std::list<StallInterface*> new_node = {wbm_stall};
{
std::unique_lock<std::mutex> lock(mu_);
if (ShouldStall()) {
stall_active_.store(true, std::memory_order_relaxed);
queue_.splice(queue_.end(), std::move(new_node));
}
}
if (!new_node.empty()) {
new_node.front()->Signal();
}
}
void WriteBufferManager::MaybeEndWriteStall() {
if (allow_stall_.load(std::memory_order_relaxed) &&
IsStallThresholdExceeded()) {
return;
}
std::list<StallInterface*> cleanup;
std::unique_lock<std::mutex> lock(mu_);
if (!stall_active_.load(std::memory_order_relaxed)) {
return; }
stall_active_.store(false, std::memory_order_relaxed);
for (StallInterface* wbm_stall : queue_) {
wbm_stall->Signal();
}
cleanup = std::move(queue_);
}
void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr);
std::list<StallInterface*> cleanup;
if (enabled() && allow_stall_.load(std::memory_order_relaxed)) {
std::unique_lock<std::mutex> lock(mu_);
for (auto it = queue_.begin(); it != queue_.end();) {
auto next = std::next(it);
if (*it == wbm_stall) {
cleanup.splice(cleanup.end(), queue_, std::move(it));
}
it = next;
}
}
wbm_stall->Signal();
}
}