#pragma once
#include <cstdint>
#include <cstring>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include "common/copy_constructors.h"
#include "common/types/types.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/shadow_utils.h"
#include "storage/storage_utils.h"
#include "transaction/transaction.h"
#include <bit>
#include <span>
namespace lbug {
namespace storage {
class PageAllocator;
class FileHandle;
class BufferManager;
static constexpr uint64_t NUM_PAGE_IDXS_PER_PIP =
(common::LBUG_PAGE_SIZE - sizeof(common::page_idx_t)) / sizeof(common::page_idx_t);
struct DiskArrayHeader {
DiskArrayHeader() : numElements{0}, firstPIPPageIdx{common::INVALID_PAGE_IDX} {}
bool operator==(const DiskArrayHeader& other) const = default;
uint64_t numElements;
common::page_idx_t firstPIPPageIdx;
uint32_t _padding{};
};
static_assert(std::has_unique_object_representations_v<DiskArrayHeader>);
struct PageStorageInfo {
explicit PageStorageInfo(uint64_t elementSize);
uint64_t alignedElementSize;
uint64_t numElementsPerPage;
};
struct PIP {
PIP() : nextPipPageIdx{ShadowUtils::NULL_PAGE_IDX}, pageIdxs{} {
for (auto& pageIdx : pageIdxs) {
pageIdx = ShadowUtils::NULL_PAGE_IDX;
}
}
common::page_idx_t nextPipPageIdx;
common::page_idx_t pageIdxs[NUM_PAGE_IDXS_PER_PIP];
};
static_assert(sizeof(PIP) == common::LBUG_PAGE_SIZE);
struct PIPWrapper {
PIPWrapper(const FileHandle& fileHandle, common::page_idx_t pipPageIdx);
explicit PIPWrapper(common::page_idx_t pipPageIdx) : pipPageIdx(pipPageIdx) {}
common::page_idx_t pipPageIdx;
PIP pipContents;
};
struct PIPUpdates {
std::optional<PIPWrapper> updatedLastPIP;
std::vector<PIPWrapper> newPIPs;
void clear() {
updatedLastPIP.reset();
newPIPs.clear();
}
};
class DiskArrayInternal {
public:
DiskArrayInternal(FileHandle& fileHandle, const DiskArrayHeader& headerForReadTrx,
DiskArrayHeader& headerForWriteTrx, ShadowFile* shadowFile, uint64_t elementSize,
bool bypassShadowing = false);
uint64_t getNumElements(
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY);
void get(uint64_t idx, const transaction::Transaction* transaction, std::span<std::byte> val);
void update(const transaction::Transaction* transaction, uint64_t idx,
std::span<std::byte> val);
uint64_t resize(PageAllocator& pageAllocator, const transaction::Transaction* transaction,
uint64_t newNumElements, std::span<std::byte> defaultVal);
void checkpointInMemoryIfNecessary() {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true );
}
void rollbackInMemoryIfNecessary() {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(false );
}
void checkpoint();
void reclaimStorage(PageAllocator& pageAllocator) const;
struct WriteIterator {
DiskArrayInternal& diskArray;
PageCursor apCursor;
uint32_t valueSize;
ShadowPageAndFrame shadowPageAndFrame;
static const transaction::TransactionType TRX_TYPE =
transaction::TransactionType::CHECKPOINT;
uint64_t idx;
DEFAULT_MOVE_CONSTRUCT(WriteIterator);
WriteIterator(uint32_t valueSize, DiskArrayInternal& diskArray)
: diskArray(diskArray), apCursor(), valueSize(valueSize),
shadowPageAndFrame{common::INVALID_PAGE_IDX, common::INVALID_PAGE_IDX, nullptr},
idx(0) {
diskArray.hasTransactionalUpdates = true;
}
WriteIterator& seek(size_t newIdx);
void pushBack(PageAllocator& pageAllocator, const transaction::Transaction* transaction,
std::span<std::byte> val);
inline WriteIterator& operator+=(size_t increment) { return seek(idx + increment); }
~WriteIterator() { unpin(); }
std::span<uint8_t> operator*() const {
DASSERT(idx < diskArray.headerForWriteTrx.numElements);
DASSERT(shadowPageAndFrame.originalPage != common::INVALID_PAGE_IDX);
return std::span(shadowPageAndFrame.frame + apCursor.elemPosInPage, valueSize);
}
uint64_t size() const { return diskArray.headerForWriteTrx.numElements; }
private:
void unpin();
void getPage(common::page_idx_t newPageIdx, bool isNewlyAdded);
};
WriteIterator iter_mut(uint64_t valueSize);
inline common::page_idx_t getAPIdx(uint64_t idx) const;
protected:
void updatePage(uint64_t pageIdx, bool isNewPage, std::function<void(uint8_t*)> updateOp);
void updateLastPageOnDisk();
uint64_t getNumElementsNoLock(transaction::TransactionType trxType) const {
return getDiskArrayHeader(trxType).numElements;
}
uint64_t getNumAPs(const DiskArrayHeader& header) const {
return (header.numElements + storageInfo.numElementsPerPage - 1) /
storageInfo.numElementsPerPage;
}
void setNextPIPPageIDxOfPIPNoLock(uint64_t pipIdxOfPreviousPIP,
common::page_idx_t nextPIPPageIdx);
common::page_idx_t getAPPageIdxNoLock(common::page_idx_t apIdx,
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY);
common::page_idx_t getUpdatedPageIdxOfPipNoLock(uint64_t pipIdx);
void clearWALPageVersionAndRemovePageFromFrameIfNecessary(common::page_idx_t pageIdx);
void checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint);
private:
bool checkOutOfBoundAccess(transaction::TransactionType trxType, uint64_t idx) const;
bool hasPIPUpdatesNoLock(uint64_t pipIdx) const;
const DiskArrayHeader& getDiskArrayHeader(transaction::TransactionType trxType) const {
if (trxType == transaction::TransactionType::CHECKPOINT) {
return headerForWriteTrx;
}
return header;
}
std::pair<common::page_idx_t, bool> getAPPageIdxAndAddAPToPIPIfNecessaryForWriteTrxNoLock(
PageAllocator& pageAllocator, const transaction::Transaction* transaction,
common::page_idx_t apIdx);
protected:
PageStorageInfo storageInfo;
FileHandle& fileHandle;
const DiskArrayHeader& header;
DiskArrayHeader& headerForWriteTrx;
bool hasTransactionalUpdates;
ShadowFile* shadowFile;
std::vector<PIPWrapper> pips;
PIPUpdates pipUpdates;
std::shared_mutex diskArraySharedMtx;
common::page_idx_t lastAPPageIdx;
common::page_idx_t lastPageOnDisk;
};
template<typename U>
inline std::span<std::byte> getSpan(U& val) {
return std::span(reinterpret_cast<std::byte*>(&val), sizeof(U));
}
template<typename U>
class DiskArray {
static_assert(sizeof(U) <= common::LBUG_PAGE_SIZE);
public:
DiskArray(FileHandle& fileHandle, const DiskArrayHeader& headerForReadTrx,
DiskArrayHeader& headerForWriteTrx, ShadowFile* shadowFile, bool bypassWAL = false)
: diskArray(fileHandle, headerForReadTrx, headerForWriteTrx, shadowFile, sizeof(U),
bypassWAL) {}
inline void update(const transaction::Transaction* transaction, uint64_t idx, U val) {
diskArray.update(transaction, idx, getSpan(val));
}
inline U get(uint64_t idx, const transaction::Transaction* transaction) {
U val;
diskArray.get(idx, transaction, getSpan(val));
return val;
}
inline uint64_t resize(PageAllocator& pageAllocator,
const transaction::Transaction* transaction, uint64_t newNumElements) {
U defaultVal;
return diskArray.resize(pageAllocator, transaction, newNumElements, getSpan(defaultVal));
}
inline uint64_t getNumElements(
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY) {
return diskArray.getNumElements(trxType);
}
inline void checkpointInMemoryIfNecessary() { diskArray.checkpointInMemoryIfNecessary(); }
inline void rollbackInMemoryIfNecessary() { diskArray.rollbackInMemoryIfNecessary(); }
inline void checkpoint() { diskArray.checkpoint(); }
inline void reclaimStorage(PageAllocator& pageAllocator) const {
diskArray.reclaimStorage(pageAllocator);
}
class WriteIterator {
public:
explicit WriteIterator(DiskArrayInternal::WriteIterator&& iter) : iter(std::move(iter)) {}
inline U& operator*() { return *reinterpret_cast<U*>((*iter).data()); }
DELETE_COPY_DEFAULT_MOVE(WriteIterator);
inline WriteIterator& operator+=(size_t dist) {
iter += dist;
return *this;
}
inline WriteIterator& seek(size_t idx) {
iter.seek(idx);
return *this;
}
inline uint64_t idx() const { return iter.idx; }
inline uint64_t getAPIdx() const { return iter.apCursor.pageIdx; }
inline WriteIterator& pushBack(PageAllocator& pageAllocator,
const transaction::Transaction* transaction, U val) {
iter.pushBack(pageAllocator, transaction, getSpan(val));
return *this;
}
inline uint64_t size() const { return iter.size(); }
private:
DiskArrayInternal::WriteIterator iter;
};
inline WriteIterator iter_mut() { return WriteIterator{diskArray.iter_mut(sizeof(U))}; }
inline uint64_t getAPIdx(uint64_t idx) const { return diskArray.getAPIdx(idx); }
static constexpr uint32_t getAlignedElementSize() { return std::bit_ceil(sizeof(U)); }
private:
DiskArrayInternal diskArray;
};
class BlockVectorInternal {
public:
using element_construct_func_t = std::function<void(uint8_t*)>;
explicit BlockVectorInternal(MemoryManager& memoryManager, size_t elementSize)
: storageInfo{elementSize}, numElements{0}, memoryManager{memoryManager} {}
void resize(uint64_t newNumElements, const element_construct_func_t& defaultConstructor);
inline uint64_t size() const { return numElements; }
uint8_t* operator[](uint64_t idx) const;
private:
inline uint64_t getNumArrayPagesNeededForElements(uint64_t numElements) const {
return (numElements + this->storageInfo.numElementsPerPage - 1) /
this->storageInfo.numElementsPerPage;
}
protected:
std::vector<std::unique_ptr<MemoryBuffer>> inMemArrayPages;
PageStorageInfo storageInfo;
uint64_t numElements;
MemoryManager& memoryManager;
};
template<typename U>
class BlockVector {
public:
explicit BlockVector(MemoryManager& memoryManager, uint64_t numElements = 0)
: vector(memoryManager, sizeof(U)) {
resize(numElements);
}
~BlockVector() {
for (uint64_t i = 0; i < size(); ++i) {
operator[](i).~U();
}
}
inline U& operator[](uint64_t idx) { return *(U*)vector[idx]; }
inline void resize(uint64_t newNumElements) {
static constexpr auto defaultConstructor = [](uint8_t* data) {
[[maybe_unused]] auto* p = new (data) U();
DASSERT(p);
};
vector.resize(newNumElements, defaultConstructor);
}
inline uint64_t size() const { return vector.size(); }
static constexpr uint32_t getAlignedElementSize() {
return DiskArray<U>::getAlignedElementSize();
}
private:
BlockVectorInternal vector;
};
} }