#include "storage/buffer_manager/buffer_manager.h"
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <thread>
#include "common/assert.h"
#include "common/constants.h"
#include "common/exception/buffer_manager.h"
#include "common/file_system/local_file_system.h"
#include "common/file_system/virtual_file_system.h"
#include "common/types/types.h"
#include "main/db_config.h"
#include "storage/buffer_manager/spiller.h"
#include "storage/file_handle.h"
#include "storage/table/column_chunk_data.h"
#include <span>
#if defined(_WIN32)
#include <exception>
#include <eh.h>
#include <errhandlingapi.h>
#include <format>
#include <memoryapi.h>
#include <windows.h>
#include <winnt.h>
#endif
using namespace lbug::common;
namespace lbug {
namespace storage {
bool EvictionQueue::insert(uint32_t fileIndex, page_idx_t pageIndex) {
EvictionCandidate candidate{fileIndex, pageIndex};
while (size < capacity) {
auto emptyCandidate = EMPTY;
if (data[insertCursor.fetch_add(1, std::memory_order_relaxed) % capacity]
.compare_exchange_weak(emptyCandidate, candidate)) {
size++;
return true;
}
}
return false;
}
// Returns the next BATCH_SIZE consecutive slots to scan for eviction.
//
// The shared evictionCursor is advanced by BATCH_SIZE first, and the batch starts at the
// advanced position modulo capacity.
// NOTE(review): the batch never wraps inside itself, so this assumes capacity is a multiple of
// BATCH_SIZE — confirm in the EvictionQueue constructor.
std::span<std::atomic<EvictionCandidate>, EvictionQueue::BATCH_SIZE> EvictionQueue::next() {
    const auto batchStart = (evictionCursor += BATCH_SIZE) % capacity;
    std::atomic<EvictionCandidate>* const firstSlot = data.get() + batchStart;
    return std::span<std::atomic<EvictionCandidate>, BATCH_SIZE>(firstSlot, BATCH_SIZE);
}
// Empties a slot the caller knows to be occupied and decrements the queue size.
//
// Hitting UNREACHABLE_CODE means the slot was already EMPTY, or another thread changed it between
// the load and the compare-exchange.
void EvictionQueue::clear(std::atomic<EvictionCandidate>& candidate) {
    auto occupant = candidate.load();
    if (occupant == EMPTY || !candidate.compare_exchange_strong(occupant, EMPTY)) {
        UNREACHABLE_CODE;
    } else {
        size--;
    }
}
// Empties the slot only if it still holds `expected`, decrementing the queue size on success.
// Returns whether the slot was cleared; a slot that changed in the meantime is left untouched.
bool EvictionQueue::tryClear(std::atomic<EvictionCandidate>& candidate,
    const EvictionCandidate& expected) {
    auto snapshot = expected;
    const bool cleared = candidate.compare_exchange_strong(snapshot, EMPTY);
    if (cleared) {
        size--;
    }
    return cleared;
}
// Builds a buffer manager with a pool of bufferPoolSize bytes.
//
// The eviction queue gets one slot per LBUG_PAGE_SIZE bytes of pool, and the memory used by that
// queue is counted in usedMemory from the start. Size arguments are validated by
// verifySizeParams(), which throws on invalid values. Without BM_MALLOC, two VM regions are
// created: one of maxDBSize bytes for regular pages and one of bufferPoolSize bytes for temp
// pages. A Spiller is only created for writable, on-disk databases whose spill path is served by
// a LocalFileSystem.
BufferManager::BufferManager(const std::string& databasePath, const std::string& spillToDiskPath,
    uint64_t bufferPoolSize, uint64_t maxDBSize, VirtualFileSystem* vfs, bool readOnly)
    : bufferPoolSize{bufferPoolSize}, evictionQueue{bufferPoolSize / LBUG_PAGE_SIZE},
      usedMemory{evictionQueue.getCapacity() * sizeof(EvictionCandidate)}, vfs{vfs} {
    verifySizeParams(bufferPoolSize, maxDBSize);
#if !BM_MALLOC
    vmRegions[0] = std::make_unique<VMRegion>(REGULAR_PAGE, maxDBSize);
    vmRegions[1] = std::make_unique<VMRegion>(TEMP_PAGE, bufferPoolSize);
#endif
    const bool canSpillToDisk =
        !readOnly && !main::DBConfig::isDBPathInMemory(databasePath) &&
        dynamic_cast<LocalFileSystem*>(vfs->findFileSystem(spillToDiskPath)) != nullptr;
    if (canSpillToDisk) {
        spiller = std::make_unique<Spiller>(spillToDiskPath, *this, vfs);
    }
}
// Validates the constructor's size arguments, throwing BufferManagerException when:
//   - bufferPoolSize is smaller than one page (LBUG_PAGE_SIZE bytes);
//   - maxDBSize is smaller than 2 * LBUG_PAGE_SIZE * StorageConstants::PAGE_GROUP_SIZE bytes;
//   - maxDBSize is not a power of two.
// Both size messages are built with std::to_string. This keeps the function independent of
// <format>, which this file only includes on Windows, and produces the same text as before.
void BufferManager::verifySizeParams(uint64_t bufferPoolSize, uint64_t maxDBSize) {
    if (bufferPoolSize < LBUG_PAGE_SIZE) {
        throw BufferManagerException("The given buffer pool size should be at least " +
                                     std::to_string(LBUG_PAGE_SIZE) + " bytes.");
    }
    const auto minDBSize = 2 * LBUG_PAGE_SIZE * StorageConstants::PAGE_GROUP_SIZE;
    if (maxDBSize < minDBSize) {
        throw BufferManagerException(
            "The given max db size should be at least " + std::to_string(minDBSize) + " bytes.");
    }
    // maxDBSize is non-zero here (it passed the minimum check), so clearing its lowest set bit
    // yields zero exactly when a single bit is set.
    if ((maxDBSize & (maxDBSize - 1)) != 0) {
        throw BufferManagerException("The given max db size should be a power of 2.");
    }
}
// Locks pageIdx of fileHandle and returns a pointer to its in-memory frame. The page stays
// LOCKED until the caller releases it with unpin().
//
// - EVICTED page: after locking, memory is claimed via claimAFrame() (which may evict other pages
//   and, with READ_PAGE, reads the page from disk), and the page is registered in the eviction
//   queue.
// - UNLOCKED or MARKED page: only locked.
// - LOCKED page: the call spins until the state changes.
//
// Throws BufferManagerException if no memory could be freed, or if the eviction queue rejects
// the insert. Exceptions thrown by claimAFrame() are propagated after the page has been reset to
// EVICTED, so that other threads do not spin forever on a page that will never be unlocked.
uint8_t* BufferManager::pin(FileHandle& fileHandle, page_idx_t pageIdx,
    PageReadPolicy pageReadPolicy) {
    auto pageState = fileHandle.getPageState(pageIdx);
    while (true) {
        auto currStateAndVersion = pageState->getStateAndVersion();
        switch (PageState::getState(currStateAndVersion)) {
        case PageState::EVICTED: {
            if (pageState->tryLock(currStateAndVersion)) {
                bool claimed = false;
                try {
                    claimed = claimAFrame(fileHandle, pageIdx, pageReadPolicy);
                } catch (...) {
                    // E.g. a failed disk read: undo our lock so the page can be retried.
                    pageState->resetToEvicted();
                    throw;
                }
                if (!claimed) {
                    pageState->resetToEvicted();
                    throw BufferManagerException("Unable to allocate memory! The buffer pool is "
                                                 "full and no memory could be freed!");
                }
                if (!evictionQueue.insert(fileHandle.getFileIndex(), pageIdx)) {
                    throw BufferManagerException(
                        "Eviction queue is full! This should be impossible.");
                }
#if BM_MALLOC
                DASSERT(pageState->getPage());
                return pageState->getPage();
#else
                return getFrame(fileHandle, pageIdx);
#endif
            }
        } break;
        case PageState::UNLOCKED:
        case PageState::MARKED: {
            if (pageState->tryLock(currStateAndVersion)) {
                return getFrame(fileHandle, pageIdx);
            }
        } break;
        case PageState::LOCKED: {
            // Another thread holds the page; spin until it is released.
            continue;
        }
        default: {
            UNREACHABLE_CODE;
        }
        }
    }
}
#if defined(_WIN32)
// Guarded by _WIN32 (the compiler-defined macro) to match the code that uses these helpers
// (try_func, optimisticRead, claimAFrame), which is guarded by _WIN32 as well.

// C++ exception thrown by handleAccessViolation() for a read access violation. It carries the
// faulting address so that try_func can tell whether the fault hit a buffer-manager VM region.
class AccessViolation : public std::exception {
public:
    explicit AccessViolation(const uint8_t* location) : location{location} {}
    // Address whose read faulted.
    const uint8_t* location;
};

// Installs an SEH-to-C++ exception translator for the current scope and restores the previously
// installed translator when destroyed.
class ScopedTranslator {
    const _se_translator_function old;

public:
    explicit ScopedTranslator(_se_translator_function newTranslator)
        : old{_set_se_translator(newTranslator)} {}
    ScopedTranslator(const ScopedTranslator&) = delete;
    ScopedTranslator& operator=(const ScopedTranslator&) = delete;
    ~ScopedTranslator() { _set_se_translator(old); }
};

// SEH translator. A read access violation (ExceptionInformation[0] == 0) is rethrown as
// AccessViolation with the faulting address (ExceptionInformation[1]). Any other structured
// exception is rethrown as its raw exception code.
void handleAccessViolation(unsigned int exceptionCode, PEXCEPTION_POINTERS exceptionPointers) {
    const auto* record = exceptionPointers->ExceptionRecord;
    if (exceptionCode == EXCEPTION_ACCESS_VIOLATION && record->ExceptionInformation[0] == 0)
        [[likely]] {
        throw AccessViolation(reinterpret_cast<const uint8_t*>(record->ExceptionInformation[1]));
    }
    throw exceptionCode;
}
#endif
// Runs func(frame) as one optimisticRead attempt. Returns true if func completed, and false if
// the attempt must be retried:
//   - BM_MALLOC: the frame pointer is null (the page has no allocated buffer);
//   - Windows, VM mode: func faulted while reading inside this page-size class's VM region
//     (presumably a decommitted frame). Faults elsewhere are rethrown as EXCEPTION_ACCESS_VIOLATION.
inline bool try_func(const std::function<void(uint8_t*)>& func, uint8_t* frame,
    const std::array<std::unique_ptr<VMRegion>, 2>& vmRegions [[maybe_unused]],
    PageSizeClass pageSizeClass [[maybe_unused]], [[maybe_unused]] PageState* pageState) {
#if BM_MALLOC
    if (frame == nullptr) {
        return false;
    }
    // Register as a reader so tryEvictPage() declines to evict the page while func runs. The
    // guard unregisters on every exit path, including when func throws; a leaked reader count
    // would make the page permanently non-evictable.
    // NOTE(review): the caller fetched `frame` before addReader() runs, so an evictor that passed
    // its reader-count check in between could free the buffer while func reads it. The caller's
    // version re-check discards the result, but not the read itself. Confirm that the eviction
    // ordering makes this safe.
    pageState->addReader();
    struct ReaderGuard {
        PageState* state;
        ~ReaderGuard() { state->removeReader(); }
    } readerGuard{pageState};
#endif
#if defined(_WIN32) && !BM_MALLOC
    try {
#endif
        func(frame);
#if defined(_WIN32) && !BM_MALLOC
    } catch (const AccessViolation& exc) {
        if (vmRegions[pageSizeClass]->contains(exc.location)) {
            return false;
        }
        throw EXCEPTION_ACCESS_VIOLATION;
    }
#endif
    return true;
}
// Runs func on the frame of pageIdx without taking the page lock, retrying until a run is
// validated.
//
// - UNLOCKED page: func runs on the current frame (via try_func). The run is accepted only if the
//   page's state-and-version word is unchanged afterwards; otherwise the loop retries.
// - MARKED page: the mark is cleared (tryClearMark) and the loop retries.
// - EVICTED page: the page is loaded by a pin()/unpin() pair, then the loop retries. Exceptions
//   from pin() (e.g. BufferManagerException when no memory can be freed) propagate.
// - Any other state (e.g. LOCKED): the loop spins.
//
// Caller contract: func may be invoked several times. A run whose result is discarded may have
// seen data that was being modified concurrently, so func should only read the frame and must
// tolerate inconsistent contents.
void BufferManager::optimisticRead(FileHandle& fileHandle, page_idx_t pageIdx,
    const std::function<void(uint8_t*)>& func) {
    auto pageState = fileHandle.getPageState(pageIdx);
#if defined(_WIN32)
    // For this scope only, translate structured exceptions so that a read fault inside a frame
    // becomes a C++ exception that try_func can turn into a retry.
    auto translator = ScopedTranslator(handleAccessViolation);
#endif
    while (true) {
        auto currStateAndVersion = pageState->getStateAndVersion();
        switch (PageState::getState(currStateAndVersion)) {
        case PageState::UNLOCKED: {
            if (!try_func(func, getFrame(fileHandle, pageIdx), vmRegions,
                    fileHandle.getPageSizeClass(), pageState)) {
                continue;
            }
            // Validate: if nothing changed the page while func ran, the read is consistent.
            if (pageState->getStateAndVersion() == currStateAndVersion) {
                return;
            }
        } break;
        case PageState::MARKED: {
            // Try to move the page back to UNLOCKED, then re-read the state.
            pageState->tryClearMark(currStateAndVersion);
            continue;
        }
        case PageState::EVICTED: {
            // Load the page into memory, then retry the optimistic read.
            pin(fileHandle, pageIdx, PageReadPolicy::READ_PAGE);
            unpin(fileHandle, pageIdx);
        } break;
        default: {
            // Locked by another thread: spin.
            continue;
        }
        }
    }
}
// Releases the page lock taken by pin(). The frame stays resident; unpin() does not evict.
void BufferManager::unpin(FileHandle& fileHandle, page_idx_t pageIdx) {
    fileHandle.getPageState(pageIdx)->unlock();
}
// Scans the eviction queue batch by batch and tries to evict the candidates of one batch.
//
// Scanning starts at the shared eviction cursor. It stops as soon as a batch yields at least one
// evictable candidate, or once the cursor has advanced 2 * capacity slots past its value at the
// start of this call. Candidates that are not evictable right now are handled as follows:
//   - if they qualify for a second chance (isSecondChanceEvictable), their page is marked;
//   - if they are stale entries for a page that is already evicted, the page state is briefly
//     locked, the entry is removed from the queue, and the page is reset to EVICTED.
// Each collected candidate is then passed to tryEvictPage(), which may still decline it.
//
// Returns the total number of bytes released. They are not subtracted from usedMemory here;
// reserve() does that via freeUsedMemory().
uint64_t BufferManager::evictPages() {
    std::array<std::atomic<EvictionCandidate>*, EvictionQueue::BATCH_SIZE> evictionCandidates{};
    size_t evictablePages = 0;
    uint64_t claimedMemory = 0;
    auto startCursor = evictionQueue.getEvictionCursor();
    // Bound the scan so that a queue with nothing evictable does not loop forever.
    auto failureLimit = evictionQueue.getCapacity() * 2;
    while (evictablePages == 0 && evictionQueue.getEvictionCursor() - startCursor < failureLimit) {
        for (auto& candidate : evictionQueue.next()) {
            auto evictionCandidate = candidate.load();
            if (evictionCandidate == EvictionQueue::EMPTY) {
                continue;
            }
            DASSERT(evictionCandidate.fileIdx < fileHandles.size());
            auto* pageState =
                fileHandles[evictionCandidate.fileIdx]->getPageState(evictionCandidate.pageIdx);
            auto pageStateAndVersion = pageState->getStateAndVersion();
            if (!evictionCandidate.isEvictable(pageStateAndVersion)) {
                if (evictionCandidate.isSecondChanceEvictable(pageStateAndVersion)) {
                    pageState->tryMark(pageStateAndVersion);
                } else if (evictionCandidate.isEvicted(pageStateAndVersion)) {
                    // Stale entry: hold the page lock while dropping it from the queue.
                    if (pageState->tryLock(pageStateAndVersion)) {
                        evictionQueue.tryClear(candidate, evictionCandidate);
                        pageState->resetToEvicted();
                    }
                }
                continue;
            }
            evictionCandidates[evictablePages++] = &candidate;
        }
    }
    for (size_t i = 0; i < evictablePages; i++) {
        claimedMemory += tryEvictPage(*evictionCandidates[i]);
    }
    return claimedMemory;
}
// Makes roughly one full pass over the eviction queue, measured by the shared eviction cursor,
// and drops entries whose page is already EVICTED.
//
// An entry is removed with a compare-and-clear against the candidate observed at the start of
// the check (tryClear), as evictPages() does. A plain clear() would re-load the slot and empty
// whatever it holds by then. If another thread had replaced the entry with a live candidate in
// the meantime, that live page would silently leave the queue (and never be evicted), or
// UNREACHABLE_CODE would fire.
void BufferManager::removeEvictedCandidates() {
    auto startCursor = evictionQueue.getEvictionCursor();
    while (evictionQueue.getEvictionCursor() - startCursor < evictionQueue.getCapacity()) {
        for (auto& candidate : evictionQueue.next()) {
            auto evictionCandidate = candidate.load();
            if (evictionCandidate == EvictionQueue::EMPTY) {
                continue;
            }
            DASSERT(evictionCandidate.fileIdx < fileHandles.size());
            auto* pageState =
                fileHandles[evictionCandidate.fileIdx]->getPageState(evictionCandidate.pageIdx);
            auto pageStateAndVersion = pageState->getStateAndVersion();
            if (PageState::getState(pageStateAndVersion) == PageState::EVICTED) {
                evictionQueue.tryClear(candidate, evictionCandidate);
            }
        }
    }
}
// Reserves pool memory for one page of fileHandle and loads the page into its frame.
//
// Returns false, with nothing reserved, if reserve() could not free enough memory. On Windows in
// VM mode, the frame is committed with VirtualAlloc first. cachePageIntoFrame() then prepares the
// frame and, for READ_PAGE, reads the page from disk.
//
// If committing or loading throws, the reservation is subtracted from usedMemory again before the
// exception propagates, so failed loads do not permanently shrink the usable pool. For a load
// failure, the frame is also released with releaseFrameForPage() (the same pairing
// removePageFromFrame uses).
// NOTE(review): confirm that releaseFrameForPage tolerates a frame that was only partially set up.
bool BufferManager::claimAFrame(FileHandle& fileHandle, page_idx_t pageIdx,
    PageReadPolicy pageReadPolicy) {
    page_offset_t pageSizeToClaim = fileHandle.getPageSize();
    if (!reserve(pageSizeToClaim)) {
        return false;
    }
#if _WIN32 && !BM_MALLOC
    auto result =
        VirtualAlloc(getFrame(fileHandle, pageIdx), pageSizeToClaim, MEM_COMMIT, PAGE_READWRITE);
    if (result == NULL) {
        // Capture the error once: the two format arguments are evaluated in an unspecified order,
        // and building the message could overwrite the thread's last-error value.
        const auto errorCode = GetLastError();
        freeUsedMemory(pageSizeToClaim);
        throw BufferManagerException(
            std::format("VirtualAlloc MEM_COMMIT failed with error code {}: {}.", errorCode,
                std::system_category().message(errorCode)));
    }
#endif
    try {
        cachePageIntoFrame(fileHandle, pageIdx, pageReadPolicy);
    } catch (...) {
        releaseFrameForPage(fileHandle, pageIdx);
        freeUsedMemory(pageSizeToClaim);
        throw;
    }
    return true;
}
// Reserves sizeToReserve bytes of buffer-pool memory. When the pool is over budget, it frees
// memory by evicting pages and/or asking the spiller to release memory.
//
// The reservation is added to usedMemory up front. Bytes released during this call are collected
// in totalClaimedMemory and only subtracted from usedMemory at the end, so while looping the
// effective usage is usedMemory - totalClaimedMemory.
//
// Returns true once the reservation fits; the reservation then stays counted in usedMemory.
// Returns false on the third round that frees nothing (the first two failed rounds each sleep
// 5 ms). In that case the reservation and everything claimed are subtracted from usedMemory,
// and the spiller's bookkeeping is undone.
bool BufferManager::reserve(uint64_t sizeToReserve) {
    usedMemory += sizeToReserve;
    uint64_t totalClaimedMemory = 0;
    uint64_t nonEvictableClaimedMemory = 0;
    const auto needMoreMemory = [&]() {
        // Done once this call has freed at least as much as it reserves. Otherwise keep going while
        // the effective usage (usedMemory - totalClaimedMemory) exceeds the pool. The claimed bytes
        // sit on the right-hand side so the comparison cannot wrap around on unsigned subtraction.
        return sizeToReserve > totalClaimedMemory &&
               usedMemory.load() > bufferPoolSize.load() + totalClaimedMemory;
    };
    uint8_t failedCount = 0;
    while (needMoreMemory()) {
        uint64_t memoryClaimed = 0;
        // Evict pages while the evictable share (usedMemory - nonEvictableMemory) exceeds half the
        // pool, or when there is no spiller. Otherwise ask the spiller first.
        if (!spiller || usedMemory - nonEvictableMemory > bufferPoolSize / 2) {
            memoryClaimed = evictPages();
        } else {
            auto [_memoryClaimed, nowEvictableMemory] = spiller->claimNextGroup();
            memoryClaimed = _memoryClaimed;
            nonEvictableClaimedMemory += _memoryClaimed;
            nonEvictableMemory -= nowEvictableMemory;
            // NOTE(review): when the spiller freed bytes AND reported newly evictable memory, the
            // assignment below drops _memoryClaimed from totalClaimedMemory. Confirm whether
            // `+=` was intended.
            if (memoryClaimed == 0 || nowEvictableMemory > 0) {
                memoryClaimed = evictPages();
            }
        }
        if (memoryClaimed == 0 && needMoreMemory()) {
            if (failedCount++ < 2) {
                std::this_thread::sleep_for(std::chrono::milliseconds(5));
            } else {
                // Give up: roll back the reservation and release what this call claimed.
                freeUsedMemory(sizeToReserve + totalClaimedMemory);
                nonEvictableMemory -= nonEvictableClaimedMemory;
                return false;
            }
        }
        totalClaimedMemory += memoryClaimed;
    }
    if (totalClaimedMemory > 0) {
        freeUsedMemory(totalClaimedMemory);
        nonEvictableMemory -= nonEvictableClaimedMemory;
    }
    return true;
}
uint64_t BufferManager::tryEvictPage(std::atomic<EvictionCandidate>& _candidate) {
auto candidate = _candidate.load();
if (candidate.pageIdx == INVALID_PAGE_IDX) {
return 0;
}
auto& pageState = *fileHandles[candidate.fileIdx]->getPageState(candidate.pageIdx);
auto currStateAndVersion = pageState.getStateAndVersion();
if (!candidate.isEvictable(currStateAndVersion) || !pageState.tryLock(currStateAndVersion)) {
return 0;
}
if (_candidate.load() != candidate
#if BM_MALLOC
|| pageState.getReaderCount() > 0
#endif
) {
pageState.unlockUnchanged();
return 0;
}
if (fileHandles[candidate.fileIdx]->isInMemoryMode()) {
pageState.unlockUnchanged();
return 0;
}
auto& fileHandle = *fileHandles[candidate.fileIdx];
fileHandle.flushPageIfDirtyWithoutLock(candidate.pageIdx);
auto numBytesFreed = fileHandle.getPageSize();
releaseFrameForPage(fileHandle, candidate.pageIdx);
evictionQueue.clear(_candidate);
pageState.resetToEvicted();
return numBytesFreed;
}
// Prepares the frame of pageIdx for use: clears the page's dirty flag and, with BM_MALLOC,
// allocates a buffer of the file's page size. With READ_PAGE, the page contents are then read
// from disk into the frame; otherwise the frame contents are left as they are.
void BufferManager::cachePageIntoFrame(FileHandle& fileHandle, page_idx_t pageIdx,
    PageReadPolicy pageReadPolicy) {
    auto* pageState = fileHandle.getPageState(pageIdx);
    pageState->clearDirty();
#if BM_MALLOC
    pageState->allocatePage(fileHandle.getPageSize());
#endif
    if (pageReadPolicy != PageReadPolicy::READ_PAGE) {
        return;
    }
#if BM_MALLOC
    auto* const destination = pageState->getPage();
#else
    auto* const destination = getFrame(fileHandle, pageIdx);
#endif
    fileHandle.readPageFromDisk(destination, pageIdx);
}
// Drops every page of fileHandle from the buffer pool without flushing dirty pages.
// The page count is re-read on each iteration.
void BufferManager::removeFilePagesFromFrames(FileHandle& fileHandle) {
    auto pageIdx = 0u;
    while (pageIdx < fileHandle.getNumPages()) {
        removePageFromFrame(fileHandle, pageIdx, /*shouldFlush=*/false);
        ++pageIdx;
    }
}
// Overwrites the resident frame of pageIdx in file fileIdx with LBUG_PAGE_SIZE bytes from newPage.
// Does nothing when the page has no state or is EVICTED. No page lock is taken; the caller is
// responsible for any needed synchronization.
void BufferManager::updateFrameIfPageIsInFrameWithoutLock(file_idx_t fileIdx,
    const uint8_t* newPage, page_idx_t pageIdx) {
    DASSERT(fileIdx < fileHandles.size());
    auto& handle = *fileHandles[fileIdx];
    auto* const pageState = handle.getPageState(pageIdx);
    const bool isResident =
        pageState != nullptr && pageState->getState() != PageState::EVICTED;
    if (!isResident) {
        return;
    }
    memcpy(getFrame(handle, pageIdx), newPage, LBUG_PAGE_SIZE);
}
// Locks pageIdx of file fileIdx and overwrites its resident frame with LBUG_PAGE_SIZE bytes from
// newPage, then unlocks it. Returns without copying if the page has no state or is (or becomes)
// EVICTED while trying to lock.
// NOTE(review): pin() never calls tryLock() on a LOCKED state, but this loop does. Confirm that
// PageState::tryLock rejects an already-LOCKED word.
void BufferManager::updateFrameIfPageIsInFrame(file_idx_t fileIdx, const uint8_t* newPage,
    page_idx_t pageIdx) {
    DASSERT(fileIdx < fileHandles.size());
    auto& handle = *fileHandles[fileIdx];
    auto* const pageState = handle.getPageState(pageIdx);
    if (pageState == nullptr) {
        return;
    }
    for (;;) {
        auto observed = pageState->getStateAndVersion();
        if (PageState::getState(observed) == PageState::EVICTED) {
            return;
        }
        if (pageState->tryLock(observed)) {
            break;
        }
    }
    memcpy(getFrame(handle, pageIdx), newPage, LBUG_PAGE_SIZE);
    pageState->unlock();
}
// Drops pageIdx from the buffer pool without flushing it, but only if the index is within the
// file's current page count.
void BufferManager::removePageFromFrameIfNecessary(FileHandle& fileHandle, page_idx_t pageIdx) {
    if (pageIdx < fileHandle.getNumPages()) {
        removePageFromFrame(fileHandle, pageIdx, /*shouldFlush=*/false);
    }
}
// Forcibly drops pageIdx of fileHandle from the buffer pool. This is a no-op if the page is
// already EVICTED. Otherwise the function:
//   - locks the page;
//   - flushes it if dirty, when shouldFlush is set;
//   - releases its frame;
//   - subtracts the file's page size from usedMemory;
//   - resets the page to EVICTED.
// NOTE(review): the EVICTED check and spinLock() are separate steps. If a concurrent eviction
// lands between them, this would release the frame and free the memory a second time, unless
// spinLock() refuses to lock an EVICTED page. Confirm against PageState::spinLock.
void BufferManager::removePageFromFrame(FileHandle& fileHandle, page_idx_t pageIdx,
    bool shouldFlush) {
    auto pageState = fileHandle.getPageState(pageIdx);
    if (PageState::getState(pageState->getStateAndVersion()) == PageState::EVICTED) {
        return;
    }
    pageState->spinLock(pageState->getStateAndVersion());
    if (shouldFlush) {
        fileHandle.flushPageIfDirtyWithoutLock(pageIdx);
    }
    releaseFrameForPage(fileHandle, pageIdx);
    // Unlike eviction (whose bytes are returned by reserve()), a forced removal returns the
    // page's memory to usedMemory directly.
    freeUsedMemory(fileHandle.getPageSize());
    pageState->resetToEvicted();
}
// Subtracts size bytes from usedMemory and returns the value usedMemory held before the
// subtraction. Debug builds assert that this does not go below zero.
uint64_t BufferManager::freeUsedMemory(uint64_t size) {
    DASSERT(usedMemory.load() >= size);
    const auto usageBefore = usedMemory.fetch_sub(size);
    return usageBefore;
}
void BufferManager::resetSpiller(std::string spillPath) {
if (spillPath.empty()) {
spiller = nullptr;
} else {
spiller = std::make_unique<Spiller>(spillPath, *this, vfs);
}
}
// Defaulted out of line, presumably so that members held through std::unique_ptr (e.g. spiller,
// vmRegions) are destroyed in a translation unit where their complete types are visible.
BufferManager::~BufferManager() = default;
} }