#include "storage/overflow_file.h"
#include <memory>
#include "common/type_utils.h"
#include "common/types/types.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/file_handle.h"
#include "storage/shadow_utils.h"
#include "storage/storage_utils.h"
#include "transaction/transaction.h"
using namespace lbug::transaction;
using namespace lbug::common;
namespace lbug {
namespace storage {
std::string OverflowFileHandle::readString(TransactionType trxType, const string_t& str) const {
if (string_t::isShortString(str.len)) {
return str.getAsShortString();
}
PageCursor cursor;
TypeUtils::decodeOverflowPtr(str.overflowPtr, cursor.pageIdx, cursor.elemPosInPage);
std::string retVal;
retVal.reserve(str.len);
int32_t remainingLength = str.len;
while (remainingLength > 0) {
auto numBytesToReadInPage =
std::min(static_cast<uint32_t>(remainingLength), END_OF_PAGE - cursor.elemPosInPage);
auto startPosInSrc = retVal.size();
read(trxType, cursor.pageIdx, [&](uint8_t* frame) {
retVal.replace(startPosInSrc, numBytesToReadInPage,
std::string_view(reinterpret_cast<const char*>(frame) + cursor.elemPosInPage,
numBytesToReadInPage));
cursor.pageIdx = *reinterpret_cast<page_idx_t*>(frame + END_OF_PAGE);
});
remainingLength -= numBytesToReadInPage;
cursor.elemPosInPage = 0;
}
return retVal;
}
bool OverflowFileHandle::equals(TransactionType trxType, std::string_view keyToLookup,
const string_t& keyInEntry) const {
PageCursor cursor;
TypeUtils::decodeOverflowPtr(keyInEntry.overflowPtr, cursor.pageIdx, cursor.elemPosInPage);
auto lengthRead = 0u;
while (lengthRead < keyInEntry.len) {
auto numBytesToCheckInPage = std::min(static_cast<page_idx_t>(keyInEntry.len) - lengthRead,
END_OF_PAGE - cursor.elemPosInPage);
bool equal = true;
read(trxType, cursor.pageIdx, [&](auto* frame) {
equal = memcmp(keyToLookup.data() + lengthRead, frame + cursor.elemPosInPage,
numBytesToCheckInPage) == 0;
cursor.pageIdx = *reinterpret_cast<page_idx_t*>(frame + END_OF_PAGE);
});
if (!equal) {
return false;
}
cursor.elemPosInPage = 0;
lengthRead += numBytesToCheckInPage;
}
return true;
}
uint8_t* OverflowFileHandle::addANewPage(PageAllocator* pageAllocator) {
page_idx_t newPageIdx = overflowFile.getNewPageIdx(pageAllocator);
if (pageWriteCache.size() > 0) {
memcpy(pageWriteCache[nextPosToWriteTo.pageIdx].buffer->getData() + END_OF_PAGE,
&newPageIdx, sizeof(page_idx_t));
}
if (startPageIdx == INVALID_PAGE_IDX) {
startPageIdx = newPageIdx;
}
pageWriteCache.emplace(newPageIdx,
CachedPage{.buffer = overflowFile.memoryManager.allocateBuffer(true ,
LBUG_PAGE_SIZE),
.newPage = true});
nextPosToWriteTo.elemPosInPage = 0;
nextPosToWriteTo.pageIdx = newPageIdx;
return pageWriteCache[newPageIdx].buffer->getData();
}
void OverflowFileHandle::setStringOverflow(PageAllocator* pageAllocator, const char* srcRawString,
uint64_t len, string_t& diskDstString) {
if (len <= string_t::SHORT_STR_LENGTH) {
return;
}
overflowFile.headerChanged = true;
uint8_t* pageToWrite = nullptr;
if (nextPosToWriteTo.pageIdx == INVALID_PAGE_IDX) {
pageToWrite = addANewPage(pageAllocator);
} else {
auto cached = pageWriteCache.find(nextPosToWriteTo.pageIdx);
if (cached != pageWriteCache.end()) {
pageToWrite = cached->second.buffer->getData();
} else {
overflowFile.readFromDisk(TransactionType::CHECKPOINT, nextPosToWriteTo.pageIdx,
[&](auto* frame) {
auto page = overflowFile.memoryManager.allocateBuffer(
false , LBUG_PAGE_SIZE);
memcpy(page->getData(), frame, LBUG_PAGE_SIZE);
pageToWrite = page->getData();
pageWriteCache.emplace(nextPosToWriteTo.pageIdx,
CachedPage{.buffer = std::move(page), .newPage = false});
});
}
}
int32_t remainingLength = len;
TypeUtils::encodeOverflowPtr(diskDstString.overflowPtr, nextPosToWriteTo.pageIdx,
nextPosToWriteTo.elemPosInPage);
while (remainingLength > 0) {
auto bytesWritten = len - remainingLength;
auto numBytesToWriteInPage = std::min(static_cast<uint32_t>(remainingLength),
END_OF_PAGE - nextPosToWriteTo.elemPosInPage);
memcpy(pageToWrite + nextPosToWriteTo.elemPosInPage, srcRawString + bytesWritten,
numBytesToWriteInPage);
remainingLength -= numBytesToWriteInPage;
nextPosToWriteTo.elemPosInPage += numBytesToWriteInPage;
if (nextPosToWriteTo.elemPosInPage >= END_OF_PAGE) {
pageToWrite = addANewPage(pageAllocator);
}
}
}
string_t OverflowFileHandle::writeString(PageAllocator* pageAllocator, std::string_view rawString) {
string_t result;
result.len = rawString.length();
auto shortStrLen = string_t::SHORT_STR_LENGTH;
auto inlineLen = std::min(shortStrLen, static_cast<uint64_t>(result.len));
memcpy(result.prefix, rawString.data(), inlineLen);
setStringOverflow(pageAllocator, rawString.data(), rawString.length(), result);
return result;
}
void OverflowFileHandle::checkpoint() {
for (auto& [pageIndex, page] : pageWriteCache) {
overflowFile.writePageToDisk(pageIndex, page.buffer->getData(), page.newPage);
}
}
void OverflowFileHandle::reclaimStorage(PageAllocator& pageAllocator) {
if (startPageIdx == INVALID_PAGE_IDX) {
return;
}
auto pageIdx = startPageIdx;
while (true) {
if (pageIdx == 0 || pageIdx == INVALID_PAGE_IDX) [[unlikely]] {
throw RuntimeException(
"The overflow file has been corrupted, this should never happen.");
}
pageAllocator.freePage(pageIdx);
if (pageIdx == nextPosToWriteTo.pageIdx) {
break;
}
DASSERT(!pageWriteCache.contains(pageIdx));
overflowFile.readFromDisk(TransactionType::CHECKPOINT, pageIdx, [&pageIdx](auto* frame) {
pageIdx = *reinterpret_cast<page_idx_t*>(frame + END_OF_PAGE);
});
}
}
void OverflowFileHandle::read(TransactionType trxType, page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func) const {
auto cachedPage = pageWriteCache.find(pageIdx);
if (cachedPage != pageWriteCache.end()) {
return func(cachedPage->second.buffer->getData());
}
overflowFile.readFromDisk(trxType, pageIdx, func);
}
OverflowFile::OverflowFile(FileHandle* fileHandle, MemoryManager& memoryManager,
ShadowFile* shadowFile, page_idx_t headerPageIdx)
: fileHandle{fileHandle}, shadowFile{shadowFile}, memoryManager{memoryManager},
headerChanged{false}, headerPageIdx{headerPageIdx} {
DASSERT(shadowFile);
if (headerPageIdx != INVALID_PAGE_IDX) {
readFromDisk(TransactionType::READ_ONLY, headerPageIdx,
[&](auto* frame) { memcpy(&header, frame, sizeof(header)); });
} else {
header = StringOverflowFileHeader();
}
}
OverflowFile::OverflowFile(MemoryManager& memoryManager)
: fileHandle{nullptr}, shadowFile{nullptr}, memoryManager{memoryManager}, headerChanged{false},
headerPageIdx{INVALID_PAGE_IDX} {
this->headerPageIdx = getNewPageIdx(nullptr);
header = StringOverflowFileHeader();
}
common::page_idx_t OverflowFile::getNewPageIdx(PageAllocator* pageAllocator) {
if (pageAllocator) {
return pageAllocator->allocatePage();
} else {
return pageCounter.fetch_add(1);
}
}
void OverflowFile::readFromDisk(TransactionType trxType, page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func) const {
DASSERT(shadowFile);
auto [fileHandleToPin, pageIdxToPin] = ShadowUtils::getFileHandleAndPhysicalPageIdxToPin(
*fileHandle, pageIdx, *shadowFile, trxType);
fileHandleToPin->optimisticReadPage(pageIdxToPin, func);
}
void OverflowFile::writePageToDisk(page_idx_t pageIdx, uint8_t* data, bool newPage) const {
if (newPage) {
DASSERT(fileHandle);
DASSERT(!fileHandle->isInMemoryMode());
fileHandle->writePageToFile(data, pageIdx);
} else {
DASSERT(shadowFile);
ShadowUtils::updatePage(*fileHandle, pageIdx, true ,
*shadowFile, [&](auto* frame) { memcpy(frame, data, LBUG_PAGE_SIZE); });
}
}
void OverflowFile::checkpoint(PageAllocator& pageAllocator) {
DASSERT(fileHandle);
if (headerPageIdx == INVALID_PAGE_IDX) {
this->headerPageIdx = getNewPageIdx(&pageAllocator);
headerChanged = true;
}
for (auto& handle : handles) {
handle->checkpoint();
}
if (headerChanged) {
uint8_t page[LBUG_PAGE_SIZE];
memcpy(page, &header, sizeof(header));
std::fill(page + sizeof(header), page + LBUG_PAGE_SIZE, 0);
writePageToDisk(headerPageIdx + HEADER_PAGE_IDX, page, false );
}
}
void OverflowFile::checkpointInMemory() {
headerChanged = false;
}
void OverflowFile::rollbackInMemory() {
DASSERT(getFileHandle()->getNumPages() <= INVALID_PAGE_IDX);
if (getFileHandle()->getNumPages() > headerPageIdx) {
readFromDisk(TransactionType::READ_ONLY, headerPageIdx,
[&](auto* frame) { memcpy(&header, frame, sizeof(header)); });
}
for (auto i = 0u; i < handles.size(); i++) {
auto& handle = handles[i];
handle->rollbackInMemory(header.entries[i].cursor);
}
}
void OverflowFile::reclaimStorage(PageAllocator& pageAllocator) const {
for (auto& handle : handles) {
handle->reclaimStorage(pageAllocator);
}
if (headerPageIdx != INVALID_PAGE_IDX) {
pageAllocator.freePage(headerPageIdx);
}
}
} }