#include "db/write_controller.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <ratio>
#include "rocksdb/system_clock.h"
namespace ROCKSDB_NAMESPACE {
std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
++total_stopped_;
return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
}
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t write_rate) {
if (0 == total_delayed_++) {
next_refill_time_ = 0;
credit_in_bytes_ = 0;
}
set_delayed_write_rate(write_rate);
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
}
std::unique_ptr<WriteControllerToken>
WriteController::GetCompactionPressureToken() {
++total_compaction_pressure_;
return std::unique_ptr<WriteControllerToken>(
new CompactionPressureToken(this));
}
bool WriteController::IsStopped() const {
return total_stopped_.load(std::memory_order_relaxed) > 0;
}
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
return 0;
}
if (total_delayed_.load(std::memory_order_relaxed) == 0) {
return 0;
}
if (credit_in_bytes_ >= num_bytes) {
credit_in_bytes_ -= num_bytes;
return 0;
}
auto time_now = NowMicrosMonotonic(clock);
const uint64_t kMicrosPerSecond = 1000000;
const uint64_t kMicrosPerRefill = 1000;
if (next_refill_time_ == 0) {
next_refill_time_ = time_now;
}
if (next_refill_time_ <= time_now) {
uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
credit_in_bytes_ += static_cast<uint64_t>(
1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
next_refill_time_ = time_now + kMicrosPerRefill;
if (credit_in_bytes_ >= num_bytes) {
credit_in_bytes_ -= num_bytes;
return 0;
}
}
assert(num_bytes > credit_in_bytes_);
uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
uint64_t needed_delay = static_cast<uint64_t>(
1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);
credit_in_bytes_ = 0;
next_refill_time_ += needed_delay;
return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
}
uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
return clock->NowNanos() / std::milli::den;
}
StopWriteToken::~StopWriteToken() {
assert(controller_->total_stopped_ >= 1);
--controller_->total_stopped_;
}
DelayWriteToken::~DelayWriteToken() {
controller_->total_delayed_--;
assert(controller_->total_delayed_.load() >= 0);
}
CompactionPressureToken::~CompactionPressureToken() {
controller_->total_compaction_pressure_--;
assert(controller_->total_compaction_pressure_ >= 0);
}
}