#pragma once
#include <atomic>
#include <memory>
#include <utility>
#include "memory/allocator.h"
#include "memory/arena.h"
#include "port/lang.h"
#include "port/likely.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
// ROCKSDB_FIELD_UNUSED marks a data member as intentionally unused.
// Under clang it expands to the GNU-style `__unused__` attribute; under every
// other compiler it expands to nothing. In this file it is attached to the
// padding arrays, which are never read or written.
// NOTE(review): presumably this exists to silence clang's unused-private-field
// diagnostic - confirm against the build flags.
#if defined(__clang__)
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif
namespace ROCKSDB_NAMESPACE {
class Logger;
class ConcurrentArena : public Allocator {
public:
explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
AllocTracker* tracker = nullptr,
size_t huge_page_size = 0);
char* Allocate(size_t bytes) override {
return AllocateImpl(bytes, false ,
[this, bytes]() { return arena_.Allocate(bytes); });
}
char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
Logger* logger = nullptr) override {
size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
(rounded_up % sizeof(void*)) == 0);
return AllocateImpl(rounded_up, huge_page_size != 0 ,
[this, rounded_up, huge_page_size, logger]() {
return arena_.AllocateAligned(rounded_up,
huge_page_size, logger);
});
}
size_t ApproximateMemoryUsage() const {
std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
lock.lock();
return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
}
size_t MemoryAllocatedBytes() const {
return memory_allocated_bytes_.load(std::memory_order_relaxed);
}
size_t AllocatedAndUnused() const {
return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
ShardAllocatedAndUnused();
}
size_t IrregularBlockNum() const {
return irregular_block_num_.load(std::memory_order_relaxed);
}
size_t BlockSize() const override { return arena_.BlockSize(); }
private:
struct Shard {
char padding[40] ROCKSDB_FIELD_UNUSED;
mutable SpinMutex mutex;
char* free_begin_;
std::atomic<size_t> allocated_and_unused_;
Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
};
static thread_local size_t tls_cpuid;
char padding0[56] ROCKSDB_FIELD_UNUSED;
size_t shard_block_size_;
CoreLocalArray<Shard> shards_;
Arena arena_;
mutable SpinMutex arena_mutex_;
std::atomic<size_t> arena_allocated_and_unused_;
std::atomic<size_t> memory_allocated_bytes_;
std::atomic<size_t> irregular_block_num_;
char padding1[56] ROCKSDB_FIELD_UNUSED;
Shard* Repick();
size_t ShardAllocatedAndUnused() const {
size_t total = 0;
for (size_t i = 0; i < shards_.Size(); ++i) {
total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
std::memory_order_relaxed);
}
return total;
}
template <typename Func>
char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
size_t cpu;
std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
if (bytes > shard_block_size_ / 4 || force_arena ||
((cpu = tls_cpuid) == 0 &&
!shards_.AccessAtCore(0)->allocated_and_unused_.load(
std::memory_order_relaxed) &&
arena_lock.try_lock())) {
if (!arena_lock.owns_lock()) {
arena_lock.lock();
}
auto rv = func();
Fixup();
return rv;
}
Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
if (!s->mutex.try_lock()) {
s = Repick();
s->mutex.lock();
}
std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
if (avail < bytes) {
std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
assert(exact == arena_.AllocatedAndUnused());
if (exact >= bytes && arena_.IsInInlineBlock()) {
auto rv = func();
Fixup();
return rv;
}
avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
? exact
: shard_block_size_;
s->free_begin_ = arena_.AllocateAligned(avail);
Fixup();
}
s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
char* rv;
if ((bytes % sizeof(void*)) == 0) {
rv = s->free_begin_;
s->free_begin_ += bytes;
} else {
rv = s->free_begin_ + avail - bytes;
}
return rv;
}
void Fixup() {
arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
std::memory_order_relaxed);
memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
std::memory_order_relaxed);
irregular_block_num_.store(arena_.IrregularBlockNum(),
std::memory_order_relaxed);
}
ConcurrentArena(const ConcurrentArena&) = delete;
ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};
}