#include "storage/disk_array_collection.h"
#include <memory>
#include "common/exception/runtime.h"
#include "common/system_config.h"
#include "common/types/types.h"
#include "storage/file_handle.h"
#include "storage/shadow_utils.h"
using namespace lbug::common;
namespace lbug {
namespace storage {
DiskArrayCollection::DiskArrayCollection(FileHandle& fileHandle, ShadowFile& shadowFile,
bool bypassShadowing)
: fileHandle(fileHandle), shadowFile{shadowFile}, bypassShadowing{bypassShadowing},
numHeaders{0} {
headersForReadTrx.push_back(std::make_unique<HeaderPage>());
headersForWriteTrx.push_back(std::make_unique<HeaderPage>());
headerPagesOnDisk = 0;
}
DiskArrayCollection::DiskArrayCollection(FileHandle& fileHandle, ShadowFile& shadowFile,
page_idx_t firstHeaderPage, bool bypassShadowing)
: fileHandle(fileHandle), shadowFile{shadowFile}, bypassShadowing{bypassShadowing},
numHeaders{0} {
page_idx_t headerPageIdx = firstHeaderPage;
do {
std::unique_ptr<HeaderPage> headerPage;
page_idx_t nextHeaderPageIdx = INVALID_PAGE_IDX;
fileHandle.optimisticReadPage(headerPageIdx, [&](auto* frame) {
const auto page = reinterpret_cast<HeaderPage*>(frame);
headerPage = std::make_unique<HeaderPage>(*page);
nextHeaderPageIdx = page->nextHeaderPage;
});
if (!headerPage) {
throw RuntimeException("Failed to read header page from disk.");
}
numHeaders += headerPage->numHeaders;
headersForReadTrx.push_back(std::make_unique<HeaderPage>(*headerPage));
headersForWriteTrx.push_back(std::move(headerPage));
headerPageIdx = nextHeaderPageIdx;
} while (headerPageIdx != INVALID_PAGE_IDX);
headerPagesOnDisk = headersForReadTrx.size();
}
void DiskArrayCollection::checkpoint(page_idx_t firstHeaderPage, PageAllocator& pageAllocator) {
page_idx_t headerPage = firstHeaderPage;
for (page_idx_t indexInMemory = 0; indexInMemory < headersForWriteTrx.size(); indexInMemory++) {
if (headersForWriteTrx[indexInMemory]->nextHeaderPage == INVALID_PAGE_IDX &&
indexInMemory < headersForWriteTrx.size() - 1) {
populateNextHeaderPage(pageAllocator, indexInMemory);
}
if (indexInMemory >= headerPagesOnDisk ||
*headersForWriteTrx[indexInMemory] != *headersForReadTrx[indexInMemory]) {
ShadowUtils::updatePage(*pageAllocator.getDataFH(), headerPage,
true , shadowFile, [&](auto* frame) {
memcpy(frame, headersForWriteTrx[indexInMemory].get(), sizeof(HeaderPage));
if constexpr (sizeof(HeaderPage) < LBUG_PAGE_SIZE) {
std::fill(frame + sizeof(HeaderPage), frame + LBUG_PAGE_SIZE, 0);
}
});
}
headerPage = headersForWriteTrx[indexInMemory]->nextHeaderPage;
}
headerPagesOnDisk = headersForWriteTrx.size();
}
void DiskArrayCollection::populateNextHeaderPage(PageAllocator& pageAllocator,
common::page_idx_t indexInMemory) {
auto nextHeaderPage = pageAllocator.allocatePage();
headersForWriteTrx[indexInMemory]->nextHeaderPage = nextHeaderPage;
headersForReadTrx[indexInMemory]->nextHeaderPage = nextHeaderPage;
}
size_t DiskArrayCollection::addDiskArray() {
auto oldSize = numHeaders++;
auto pageIdx = numHeaders % HeaderPage::NUM_HEADERS_PER_PAGE;
if (pageIdx >= headersForWriteTrx.size()) {
headersForWriteTrx.emplace_back(std::make_unique<HeaderPage>());
headersForReadTrx.emplace_back(std::make_unique<HeaderPage>());
}
auto& headerPage = *headersForWriteTrx[pageIdx];
DASSERT(headerPage.numHeaders < HeaderPage::NUM_HEADERS_PER_PAGE);
auto indexInPage = headerPage.numHeaders;
headerPage.headers[indexInPage] = DiskArrayHeader();
headerPage.numHeaders++;
headersForReadTrx[pageIdx]->numHeaders++;
return oldSize;
}
void DiskArrayCollection::reclaimStorage(PageAllocator& pageAllocator,
common::page_idx_t firstHeaderPage) const {
auto headerPage = firstHeaderPage;
for (page_idx_t indexInMemory = 0; indexInMemory < headersForReadTrx.size(); indexInMemory++) {
if (headerPage == INVALID_PAGE_IDX) {
break;
}
pageAllocator.freePage(headerPage);
headerPage = headersForReadTrx[indexInMemory]->nextHeaderPage;
}
}
} }