#pragma once
#include <atomic>
#include <vector>
#include "storage/buffer_manager/memory_manager.h"
#include "storage/buffer_manager/mm_allocator.h"
namespace lbug {
namespace function {
// A memory buffer viewed as a sequence of T slots that are handed out one at a
// time through an atomic cursor.
//
// The capacity is sizeInBytes / sizeof(T) (integer division). reserveNext()
// does not check the capacity itself; hasSpace() is the only capacity query
// offered here, so callers are presumably expected to consult it (and use
// revertLast() to undo an over-reservation) -- confirm against call sites.
template<typename T>
class ObjectBlock {
public:
    // Takes ownership of `block` and derives the slot capacity from `sizeInBytes`.
    ObjectBlock(std::unique_ptr<storage::MemoryBuffer> block, uint64_t sizeInBytes)
        : block{std::move(block)}, maxElements{sizeInBytes / (sizeof(T))}, nextPosToWrite{0} {}

    // Advances the cursor by one and returns the address of the slot the cursor
    // pointed at before the increment. No bounds check is performed.
    T* reserveNext() {
        const auto slot = nextPosToWrite.fetch_add(1, std::memory_order_relaxed);
        return getData() + slot;
    }

    // Moves the cursor back by one slot.
    void revertLast() { nextPosToWrite.fetch_sub(1, std::memory_order_relaxed); }

    // True while the cursor is strictly below the slot capacity.
    bool hasSpace() const {
        const auto used = nextPosToWrite.load(std::memory_order_relaxed);
        const auto capacity = maxElements.load(std::memory_order_relaxed);
        return used < capacity;
    }

private:
    // Start of the underlying buffer, reinterpreted as T*.
    T* getData() const { return reinterpret_cast<T*>(block->getData()); }

    std::unique_ptr<storage::MemoryBuffer> block;
    std::atomic<uint64_t> maxElements;
    std::atomic<uint64_t> nextPosToWrite;
};
// Fixed-length array of T whose storage is a buffer obtained from a
// storage::MemoryManager. Elements are accessed through a std::span over the
// buffer's bytes reinterpreted as T; no T constructors are run by this class.
template<typename T>
class ObjectArray {
public:
    // Creates an empty array with no backing allocation.
    ObjectArray() : size{0} {}

    // Creates an array of `size` elements backed by a buffer from `mm`.
    // `initializeToZero` is forwarded to the memory manager's allocateBuffer.
    ObjectArray(const common::offset_t size, storage::MemoryManager* mm,
        bool initializeToZero = false)
        : size{size}, mm{mm} {
        allocate(size, mm, initializeToZero);
    }

    // Requests a new buffer of size * sizeof(T) bytes from `mm`, replaces the
    // current allocation with it and re-points the element view at it. Nothing
    // is copied over from the previous buffer.
    void allocate(const common::offset_t size, storage::MemoryManager* mm, bool initializeToZero) {
        allocation = mm->allocateBuffer(initializeToZero, size * sizeof(T));
        auto* const base = reinterpret_cast<T*>(allocation->getData());
        data = std::span<T>(base, size);
        this->size = size;
    }

    // Number of elements in the current allocation.
    common::offset_t getSize() const { return size; }

    // Grows the array to `newSize` elements when that is larger than the current
    // size; otherwise does nothing. Growing goes through allocate() without
    // zero-initialization, so existing contents are NOT carried over.
    void reallocate(const common::offset_t newSize, storage::MemoryManager* mm) {
        if (newSize <= size) {
            return;
        }
        allocate(newSize, mm, false /* initializeToZero */);
    }

    // Stores `value` at index `pos`; the index is checked with ASSERT.
    void set(const common::offset_t pos, const T value) {
        ASSERT(pos < size);
        data[pos] = value;
    }

    // Read-only access to the element at `pos`; the index is checked with ASSERT.
    const T& get(const common::offset_t pos) const {
        ASSERT(pos < size);
        return data[pos];
    }

    // Mutable access to the element at `pos`; the index is checked with ASSERT.
    T& getUnsafe(const common::offset_t pos) {
        ASSERT(pos < size);
        return data[pos];
    }

private:
    // AtomicObjectArray reads `size` and `data` directly.
    template<typename U>
    friend class AtomicObjectArray;

    common::offset_t size;
    std::span<T> data;
    std::unique_ptr<storage::MemoryBuffer> allocation;
    // Set by the sizing constructor only; not read anywhere in this class.
    storage::MemoryManager* mm = nullptr;
};
// Array of std::atomic<T> layered on ObjectArray<std::atomic<T>>.
//
// All element accessors take an optional std::memory_order that defaults to
// sequentially consistent. Indices are checked with ASSERT.
//
// NOTE(review): the backing ObjectArray reinterprets raw buffer bytes as
// std::atomic<T> without running constructors -- presumably this relies on T
// being a trivially-copyable type with a lock-free atomic; confirm.
template<typename T>
class AtomicObjectArray {
public:
    AtomicObjectArray() = default;

    // Creates `size` atomic elements backed by a buffer from `mm`;
    // `initializeToZero` is forwarded to the underlying ObjectArray.
    AtomicObjectArray(const common::offset_t size, storage::MemoryManager* mm,
        bool initializeToZero = false)
        : array{ObjectArray<std::atomic<T>>(size, mm, initializeToZero)} {}

    // Number of elements.
    common::offset_t getSize() const { return array.size; }

    // Forwards to ObjectArray::reallocate on the underlying array.
    void reallocate(const common::offset_t newSize, storage::MemoryManager* mm) {
        array.reallocate(newSize, mm);
    }

    // Atomically stores `value` at `pos`.
    void set(common::offset_t pos, const T& value,
        std::memory_order order = std::memory_order_seq_cst) {
        ASSERT(pos < array.size);
        array.data[pos].store(value, order);
    }

    // Atomically loads the value at `pos`. Const because an atomic load does not
    // modify the array, which lets callers read through a const reference.
    T get(const common::offset_t pos,
        std::memory_order order = std::memory_order_seq_cst) const {
        ASSERT(pos < array.size);
        return array.data[pos].load(order);
    }

    // Atomically adds `value` to the element at `pos`; the previous value is
    // discarded.
    void fetchAdd(common::offset_t pos, const T& value,
        std::memory_order order = std::memory_order_seq_cst) {
        ASSERT(pos < array.size);
        array.data[pos].fetch_add(value, order);
    }

    // Raises the element at `dest` to the value read from `src` if the element at
    // `dest` is smaller.
    //
    // Returns true when this call installed the new value, and false when the
    // element at `dest` was already >= the source value (initially, or because a
    // concurrent writer raised it in the meantime). The source is read once.
    //
    // `order` applies to the two initial loads only; the compare-exchange itself
    // uses the default (sequentially consistent) ordering.
    bool compareExchangeMax(const common::offset_t src, const common::offset_t dest,
        std::memory_order order = std::memory_order_seq_cst) {
        auto srcValue = get(src, order);
        auto dstValue = get(dest, order);
        while (dstValue < srcValue) {
            // On failure compare_exchange_weak refreshes dstValue with the current
            // element value, so the loop condition is re-evaluated against it.
            if (array.data[dest].compare_exchange_weak(dstValue, srcValue)) {
                return true;
            }
        }
        return false;
    }

private:
    ObjectArray<std::atomic<T>> array;
};
template<typename T>
class vector_t {
public:
explicit vector_t(storage::MemoryManager* mm) : vec(storage::MmAllocator<T>(mm)) {}
vector_t(storage::MemoryManager* mm, std::size_t size)
: vec(size, storage::MmAllocator<T>(mm)) {}
void reserve(std::size_t size) { vec.reserve(size); }
void resize(std::size_t size) { vec.resize(size); }
void push_back(const T& value) { vec.push_back(value); }
void push_back(T&& value) { vec.push_back(std::move(value)); }
bool empty() { return vec.empty(); }
auto begin() { return vec.begin(); }
auto end() { return vec.end(); }
auto begin() const { return vec.begin(); }
auto end() const { return vec.end(); }
template<typename... Args>
void emplace_back(Args&&... args) {
vec.emplace_back(std::forward<Args>(args)...);
}
void pop_back() { vec.pop_back(); }
void clear() { vec.clear(); }
std::size_t size() const { return vec.size(); }
T& operator[](std::size_t index) { return vec[index]; }
const T& operator[](std::size_t index) const { return vec[index]; }
T& at(std::size_t index) { return vec.at(index); }
const T& at(std::size_t index) const { return vec.at(index); }
private:
std::vector<T, storage::MmAllocator<T>> vec;
};
// Keeps one memory-manager buffer per table id and exposes each buffer as a
// raw T array.
template<typename T>
class GDSDenseObjectManager {
public:
    // Requests a buffer of maxOffset * sizeof(T) bytes from `mm` (not
    // zero-initialized) and registers it under `tableID`. Because insert() is
    // used, an already-registered table keeps its existing buffer and the newly
    // allocated one is released.
    void allocate(common::table_id_t tableID, common::offset_t maxOffset,
        storage::MemoryManager* mm) {
        const auto numBytes = maxOffset * sizeof(T);
        bufferPerTable.insert({tableID, mm->allocateBuffer(false /* initializeToZero */, numBytes)});
    }

    // Returns the start of the buffer registered for `tableID`, viewed as T*.
    // The table must have been registered via allocate() (checked with DASSERT).
    T* getData(common::table_id_t tableID) const {
        DASSERT(bufferPerTable.contains(tableID));
        const auto& buffer = bufferPerTable.at(tableID);
        return reinterpret_cast<T*>(buffer->getData());
    }

private:
    common::table_id_map_t<std::unique_ptr<storage::MemoryBuffer>> bufferPerTable;
};
// Keeps one offset -> T hash map per table id.
template<typename T>
class GDSSpareObjectManager {
public:
    // Registers an empty map for every table id that appears as a key in
    // `nodeMaxOffsetMap`; the mapped max-offset values are not used.
    explicit GDSSpareObjectManager(
        const common::table_id_map_t<common::offset_t>& nodeMaxOffsetMap) {
        for (auto& [tableID, _] : nodeMaxOffsetMap) {
            allocate(tableID);
        }
    }

    // Registers an empty map for `tableID`, which must not be registered yet
    // (checked with DASSERT).
    void allocate(common::table_id_t tableID) {
        DASSERT(!mapPerTable.contains(tableID));
        mapPerTable.insert({tableID, {}});
    }

    // Read-only view of all per-table maps.
    const common::table_id_map_t<std::unordered_map<common::offset_t, T>>& getData() {
        return mapPerTable;
    }

    // Returns the map for `tableID`, which must already be registered (checked
    // with DASSERT).
    std::unordered_map<common::offset_t, T>* getMap(common::table_id_t tableID) {
        DASSERT(mapPerTable.contains(tableID));
        return &mapPerTable.at(tableID);
    }

    // Returns the map for `tableID`, registering an empty one first when the
    // table is not known yet.
    std::unordered_map<common::offset_t, T>* getData(common::table_id_t tableID) {
        if (!mapPerTable.contains(tableID)) {
            mapPerTable.insert({tableID, {}});
        }
        DASSERT(mapPerTable.contains(tableID));
        return &mapPerTable.at(tableID);
    }

    // Total number of entries summed over all per-table maps.
    uint64_t size() const {
        uint64_t result = 0;
        // Bind by const reference: binding by value would deep-copy every
        // per-table map just to read its size.
        for (const auto& [_, map] : mapPerTable) {
            result += map.size();
        }
        return result;
    }

private:
    common::table_id_map_t<std::unordered_map<common::offset_t, T>> mapPerTable;
};
} }