#include "storage/index/hash_index.h"
#include <bitset>
#include "common/assert.h"
#include "common/exception/message.h"
#include "common/serializer/deserializer.h"
#include "common/types/int128_t.h"
#include "common/types/string_t.h"
#include "common/types/types.h"
#include "common/types/uint128_t.h"
#include "main/client_context.h"
#include "storage/disk_array.h"
#include "storage/disk_array_collection.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_header.h"
#include "storage/index/hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
#include "storage/index/in_mem_hash_index.h"
#include "storage/local_storage/local_hash_index.h"
#include "storage/overflow_file.h"
#include "storage/shadow_utils.h"
#include "storage/storage_manager.h"
#include "transaction/transaction.h"
using namespace lbug::common;
using namespace lbug::transaction;
namespace lbug {
namespace storage {
// Builds one sub-index. Its primary slot array lives at position `indexPos` of the shared
// disk array collection and its overflow slot array at `NUM_HASH_INDEXES + indexPos`.
// The read/write headers are owned by the caller and only referenced here.
template<typename T>
HashIndex<T>::HashIndex(MemoryManager& memoryManager, OverflowFileHandle* overflowFileHandle,
    DiskArrayCollection& diskArrays, uint64_t indexPos, ShadowFile* shadowFile,
    const HashIndexHeader& indexHeaderForReadTrx, HashIndexHeader& indexHeaderForWriteTrx)
    : shadowFile{shadowFile}, headerPageIdx{0}, overflowFileHandle{overflowFileHandle},
      localStorage{std::make_unique<HashIndexLocalStorage<T>>(memoryManager, overflowFileHandle)},
      indexHeaderForReadTrx{indexHeaderForReadTrx}, indexHeaderForWriteTrx{indexHeaderForWriteTrx},
      memoryManager{memoryManager} {
    const auto primaryArrayPos = indexPos;
    const auto overflowArrayPos = NUM_HASH_INDEXES + indexPos;
    this->pSlots = diskArrays.getDiskArray<OnDiskSlotType>(primaryArrayPos);
    this->oSlots = diskArrays.getDiskArray<OnDiskSlotType>(overflowArrayPos);
}
// Invalidates every visible on-disk entry matching `key` in the slot chain the key hashes to,
// decrementing the write-transaction header's entry count once per invalidated entry.
// Does nothing when the write header reports an empty index.
template<typename T>
void HashIndex<T>::deleteFromPersistentIndex(const Transaction* transaction, Key key,
    visible_func isVisible) {
    auto& writeHeader = this->indexHeaderForWriteTrx;
    if (writeHeader.numEntries == 0) {
        return;
    }
    auto keyHash = HashIndexUtils::hash(key);
    auto keyFingerprint = HashIndexUtils::getFingerprintForHash(keyHash);
    const auto primarySlotId = HashIndexUtils::getPrimarySlotIdForHash(writeHeader, keyHash);
    auto chainIter = getSlotIterator(primarySlotId, transaction);
    do {
        auto matchPos =
            findMatchedEntryInSlot(transaction, chainIter.slot, key, keyFingerprint, isVisible);
        if (matchPos == SlotHeader::INVALID_ENTRY_POS) {
            // Nothing to remove in this slot; move on to the next chained slot.
            continue;
        }
        chainIter.slot.header.setEntryInvalid(matchPos);
        updateSlot(transaction, chainIter.slotInfo, chainIter.slot);
        writeHeader.numEntries--;
    } while (nextChainedSlot(transaction, chainIter));
}
// String specialization: reads the key's string through the overflow file handle
// and hashes the resulting string.
template<>
inline hash_t HashIndex<string_t>::hashStored(const Transaction* transaction,
    const string_t& key) const {
    const auto materializedKey = overflowFileHandle->readString(transaction->getType(), key);
    hash_t result = 0;
    function::Hash::operation(materializedKey, result);
    return result;
}
// Applies pending local changes to the persistent index and checkpoints both slot arrays.
// Returns true when local storage had updates to apply, false otherwise. The slot arrays are
// checkpointed in either case.
template<typename T>
bool HashIndex<T>::checkpoint(PageAllocator& pageAllocator) {
    const bool hadLocalUpdates = localStorage->hasUpdates();
    if (hadLocalUpdates) {
        auto* checkpointTrx = &DUMMY_CHECKPOINT_TRANSACTION;
        if (auto netInserts = localStorage->getNetInserts(); netInserts > 0) {
            reserve(pageAllocator, checkpointTrx, netInserts);
        }
        localStorage->applyLocalChanges(
            // Deletions are intentionally ignored here: the callback is a no-op.
            [](Key) {},
            [&](const auto& pendingInserts) {
                mergeBulkInserts(pageAllocator, checkpointTrx, pendingInserts);
            });
    }
    pSlots->checkpoint();
    oSlots->checkpoint();
    return hadLocalUpdates;
}
// Finalizes the in-memory state after a checkpoint. Returns whether local storage had updates;
// when it did not, nothing is touched.
template<typename T>
bool HashIndex<T>::checkpointInMemory() {
    const bool hadLocalUpdates = localStorage->hasUpdates();
    if (hadLocalUpdates) {
        pSlots->checkpointInMemoryIfNecessary();
        oSlots->checkpointInMemoryIfNecessary();
        localStorage->clear();
        // Only string-keyed indexes have an overflow file handle to checkpoint.
        if constexpr (std::same_as<string_t, T>) {
            overflowFileHandle->checkpointInMemory();
        }
    }
    return hadLocalUpdates;
}
// Discards uncommitted in-memory changes. Returns whether local storage had updates;
// when it did not, nothing is touched.
template<typename T>
bool HashIndex<T>::rollbackInMemory() {
    const bool hadLocalUpdates = localStorage->hasUpdates();
    if (hadLocalUpdates) {
        pSlots->rollbackInMemoryIfNecessary();
        oSlots->rollbackInMemoryIfNecessary();
        localStorage->clear();
    }
    return hadLocalUpdates;
}
// Rolls back the in-memory state of the primary and overflow slot arrays (in that order).
template<typename T>
void HashIndex<T>::rollbackCheckpoint() {
    const auto rollbackArray = [](auto& slotArray) { slotArray->rollbackInMemoryIfNecessary(); };
    rollbackArray(pSlots);
    rollbackArray(oSlots);
}
// Asks the primary and overflow slot arrays (in that order) to reclaim their storage
// through `pageAllocator`.
template<typename T>
void HashIndex<T>::reclaimStorage(PageAllocator& pageAllocator) {
    const auto reclaimArray = [&pageAllocator](auto& slotArray) {
        slotArray->reclaimStorage(pageAllocator);
    };
    reclaimArray(pSlots);
    reclaimArray(oSlots);
}
// Splits `numSlotsToSplit` primary slots, starting at `header.nextSplitSlotId`.
// For each split a new primary slot is appended, and every valid entry of the original slot
// (and of its overflow chain) whose hash under `header.higherLevelHashMask` no longer maps to
// the original slot id is moved into the new slot. `header.incrementNextSplitSlotId()` is
// called once per split slot.
template<typename T>
void HashIndex<T>::splitSlots(PageAllocator& pageAllocator, const Transaction* transaction,
HashIndexHeader& header, slot_id_t numSlotsToSplit) {
// Two write iterators over the primary slots: one seeks the slot being split,
// the other appends the new slot. A third iterates the overflow slots.
auto originalSlotIterator = pSlots->iter_mut();
auto newSlotIterator = pSlots->iter_mut();
auto overflowSlotIterator = oSlots->iter_mut();
// Overflow slots created during the split are buffered here and only appended to the
// on-disk overflow array after all splits are done (see the final loop).
std::vector<OnDiskSlotType> newOverflowSlots;
// Resolves an overflow slot id: ids at or beyond the current on-disk overflow count refer to
// buffered slots in `newOverflowSlots`; smaller ids are fetched through the overflow iterator.
auto getNextOvfSlot = [&](slot_id_t nextOvfSlotId) {
if (nextOvfSlotId >= oSlots->getNumElements()) {
return &newOverflowSlots[nextOvfSlotId - oSlots->getNumElements()];
} else {
return &*overflowSlotIterator.seek(nextOvfSlotId);
}
};
for (slot_id_t i = 0; i < numSlotsToSplit; i++) {
auto* newSlot = &*newSlotIterator.pushBack(pageAllocator, transaction, OnDiskSlotType());
entry_pos_t newEntryPos = 0;
OnDiskSlotType* originalSlot = &*originalSlotIterator.seek(header.nextSplitSlotId);
// Walk the original slot and then each slot of its overflow chain.
do {
for (entry_pos_t originalEntryPos = 0; originalEntryPos < PERSISTENT_SLOT_CAPACITY;
originalEntryPos++) {
if (!originalSlot->header.isEntryValid(originalEntryPos)) {
continue; }
// The destination slot is full: chain a fresh buffered overflow slot to it and continue
// filling that one. Note this happens before we know whether the current entry moves.
// NOTE(review): `originalSlot` may point into `newOverflowSlots` (via getNextOvfSlot);
// emplace_back can reallocate the vector, which would leave that pointer dangling —
// confirm whether a split slot's chain can ever reach a buffered slot.
if (newEntryPos >= PERSISTENT_SLOT_CAPACITY) {
newSlot->header.nextOvfSlotId =
newOverflowSlots.size() + oSlots->getNumElements();
newOverflowSlots.emplace_back();
newSlot = &newOverflowSlots.back();
newEntryPos = 0;
}
// Re-hash the stored key with the higher-level mask to decide which slot it belongs to.
const auto& key = originalSlot->entries[originalEntryPos].key;
const hash_t hash = this->hashStored(transaction, key);
const auto newSlotId = hash & header.higherLevelHashMask;
if (newSlotId != header.nextSplitSlotId) {
DASSERT(newSlotId == newSlotIterator.idx());
// Move the entry (and its fingerprint) to the new slot and free it in the original.
newSlot->entries[newEntryPos] = originalSlot->entries[originalEntryPos];
newSlot->header.setEntryValid(newEntryPos,
originalSlot->header.fingerprints[originalEntryPos]);
originalSlot->header.setEntryInvalid(originalEntryPos);
newEntryPos++;
}
}
} while (originalSlot->header.nextOvfSlotId != SlotHeader::INVALID_OVERFLOW_SLOT_ID &&
(originalSlot = getNextOvfSlot(originalSlot->header.nextOvfSlotId)));
header.incrementNextSplitSlotId();
}
// Append the buffered overflow slots; their ids were assigned assuming this append order.
for (auto&& slot : newOverflowSlots) {
overflowSlotIterator.pushBack(pageAllocator, transaction, std::move(slot));
}
}
// Collects the primary slot `pSlotId` followed by every overflow slot chained to it.
// Each element pairs the slot's location with a copy of the slot contents.
template<typename T>
std::vector<std::pair<SlotInfo, typename HashIndex<T>::OnDiskSlotType>>
HashIndex<T>::getChainedSlots(const Transaction* transaction, slot_id_t pSlotId) {
    std::vector<std::pair<SlotInfo, OnDiskSlotType>> chain;
    SlotInfo cursor{pSlotId, SlotType::PRIMARY};
    // The primary slot is always visited; afterwards follow overflow links until the
    // invalid-id terminator is reached.
    do {
        auto currentSlot = getSlot(transaction, cursor);
        chain.emplace_back(cursor, currentSlot);
        cursor.slotId = currentSlot.header.nextOvfSlotId;
        cursor.slotType = SlotType::OVF;
    } while (cursor.slotId != SlotHeader::INVALID_OVERFLOW_SLOT_ID);
    return chain;
}
// Grows the on-disk primary slot array so that it can hold the current entries plus
// `newEntries` additional ones.
//
// The required slot count is the maximum of: the slots needed for the required entries, the
// number of slots implied by the current level, and one page worth of slots.
// - If the index is empty, the primary array is resized directly and the header's level and
//   nextSplitSlotId are recomputed.
// - Otherwise the missing slots are created by splitting existing ones.
template<typename T>
void HashIndex<T>::reserve(PageAllocator& pageAllocator, const Transaction* transaction,
    uint64_t newEntries) {
    auto& header = this->indexHeaderForWriteTrx;
    slot_id_t numRequiredEntries =
        HashIndexUtils::getNumRequiredEntries(header.numEntries + newEntries);
    // Shift in slot_id_t width: `1ul`/`1u` are only 32 bits wide on some platforms, which
    // would overflow (or disagree with the 64-bit slot count) for large levels.
    auto numRequiredSlots =
        std::max((numRequiredEntries + PERSISTENT_SLOT_CAPACITY - 1) / PERSISTENT_SLOT_CAPACITY,
            static_cast<slot_id_t>(slot_id_t{1} << header.currentLevel));
    numRequiredSlots = std::max(numRequiredSlots, LBUG_PAGE_SIZE / pSlots->getAlignedElementSize());
    if (header.numEntries == 0) {
        pSlots->resize(pageAllocator, transaction, numRequiredSlots);
        slot_id_t numSlotsOfCurrentLevel = slot_id_t{1} << header.currentLevel;
        while ((numSlotsOfCurrentLevel << 1) <= numRequiredSlots) {
            header.incrementLevel();
            numSlotsOfCurrentLevel <<= 1;
        }
        if (numRequiredSlots >= numSlotsOfCurrentLevel) {
            header.nextSplitSlotId = numRequiredSlots - numSlotsOfCurrentLevel;
        }
    } else {
        // Clamp at zero: an unsigned subtraction would otherwise wrap to a huge split count
        // if the array already holds more slots than required (e.g. after the entry count
        // has been decremented by deletions).
        const auto numExistingSlots = pSlots->getNumElements(transaction->getType());
        const slot_id_t numSlotsToSplit =
            numRequiredSlots > numExistingSlots ? numRequiredSlots - numExistingSlots : 0;
        splitSlots(pageAllocator, transaction, header, numSlotsToSplit);
    }
}
// Appends a view of every entry in the local slot chain starting at `slotToMerge` to
// `entries`, tagging each with the on-disk primary slot its key hashes to, then sorts
// `entries` by that disk slot id in descending order (smallest id ends up at the back).
template<typename T>
void HashIndex<T>::sortEntries(const Transaction* transaction,
    const InMemHashIndex<T>& insertLocalStorage,
    typename InMemHashIndex<T>::SlotIterator& slotToMerge,
    std::vector<HashIndexEntryView>& entries) {
    do {
        auto& localSlot = *slotToMerge.slot;
        auto entryCount = localSlot.header.numEntries();
        for (auto pos = 0u; pos < entryCount; pos++) {
            const auto* localEntry = &localSlot.entries[pos];
            const auto keyHash = hashStored(transaction, localEntry->key);
            const auto targetSlotId =
                HashIndexUtils::getPrimarySlotIdForHash(indexHeaderForWriteTrx, keyHash);
            entries.push_back(
                HashIndexEntryView{targetSlotId, localSlot.header.fingerprints[pos], localEntry});
        }
    } while (insertLocalStorage.nextChainedSlot(slotToMerge));
    std::sort(entries.begin(), entries.end(),
        [](const auto& lhs, const auto& rhs) -> bool { return lhs.diskSlotId > rhs.diskSlotId; });
}
// Merges all entries of the in-memory index `insertLocalStorage` into the on-disk index.
// Space is reserved up front; local primary slots are then processed in batches of
// NUM_SLOTS_PER_PAGE, and within a batch the entries are written one disk page at a time.
template<typename T>
void HashIndex<T>::mergeBulkInserts(PageAllocator& pageAllocator, const Transaction* transaction,
const InMemHashIndex<T>& insertLocalStorage) {
reserve(pageAllocator, transaction, insertLocalStorage.size());
auto diskSlotIterator = pSlots->iter_mut();
auto diskOverflowSlotIterator = oSlots->iter_mut();
// Number of on-disk slots that fit in one page; also the batch size for local slots.
constexpr size_t NUM_SLOTS_PER_PAGE =
LBUG_PAGE_SIZE / DiskArray<OnDiskSlotType>::getAlignedElementSize();
// One reusable entry list per local slot of the current batch.
std::array<std::vector<HashIndexEntryView>, NUM_SLOTS_PER_PAGE> partitionedEntries;
for (uint64_t localSlotId = 0; localSlotId < insertLocalStorage.numPrimarySlots();
localSlotId += NUM_SLOTS_PER_PAGE) {
// Fill the partitions for this batch. sortEntries orders each partition by disk slot id
// descending, so the entry with the smallest disk slot id is at the back.
for (size_t i = 0;
i < NUM_SLOTS_PER_PAGE && localSlotId + i < insertLocalStorage.numPrimarySlots();
i++) {
auto localSlot =
typename InMemHashIndex<T>::SlotIterator(localSlotId + i, &insertLocalStorage);
partitionedEntries[i].clear();
sortEntries(transaction, insertLocalStorage, localSlot, partitionedEntries[i]);
}
// Repeat passes until every partition has been drained.
std::bitset<NUM_SLOTS_PER_PAGE> done;
while (!done.all()) {
// Disk page targeted by this pass: chosen from the first unfinished partition's back entry.
std::optional<page_idx_t> diskSlotPage;
for (size_t i = 0; i < NUM_SLOTS_PER_PAGE; i++) {
if (!done[i] && !partitionedEntries[i].empty()) {
auto diskSlotId = partitionedEntries[i].back().diskSlotId;
if (!diskSlotPage) {
diskSlotPage = diskSlotId / NUM_SLOTS_PER_PAGE;
}
// Only merge entries that land on the page selected for this pass.
if (diskSlotId / NUM_SLOTS_PER_PAGE == diskSlotPage) {
auto merged = mergeSlot(pageAllocator, transaction, partitionedEntries[i],
diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
DASSERT(merged <= partitionedEntries[i].size());
// mergeSlot consumes from the back, so drop the merged entries from the tail.
partitionedEntries[i].resize(partitionedEntries[i].size() - merged);
if (partitionedEntries[i].empty()) {
done[i] = true;
}
}
} else {
// Empty (or already finished) partitions are marked done.
done[i] = true;
}
}
}
}
}
// Writes the trailing run of entries in `slotToMerge` that target `diskSlotId` into that
// on-disk primary slot (and its overflow chain, extending the chain when it is full).
// `slotToMerge` is consumed from the back; processing stops at the first entry whose
// diskSlotId differs. Returns the number of entries written, and increments the write
// header's entry count once per written entry.
template<typename T>
size_t HashIndex<T>::mergeSlot(PageAllocator& pageAllocator, const Transaction* transaction,
const std::vector<HashIndexEntryView>& slotToMerge,
typename DiskArray<OnDiskSlotType>::WriteIterator& diskSlotIterator,
typename DiskArray<OnDiskSlotType>::WriteIterator& diskOverflowSlotIterator,
slot_id_t diskSlotId) {
slot_id_t diskEntryPos = 0u;
OnDiskSlotType* diskSlot = &*diskSlotIterator.seek(diskSlotId);
DASSERT(diskSlot->header.nextOvfSlotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID ||
diskOverflowSlotIterator.size() > diskSlot->header.nextOvfSlotId);
size_t merged = 0;
for (auto it = std::rbegin(slotToMerge); it != std::rend(slotToMerge); ++it) {
if (it->diskSlotId != diskSlotId) {
return merged;
}
// Advance to the next free entry position, hopping to the next overflow slot (creating one
// if the chain ends) whenever the current slot is exhausted.
// NOTE(review): isEntryValid is evaluated before the capacity check, so it can be called
// with diskEntryPos == PERSISTENT_SLOT_CAPACITY — confirm that is benign.
while (diskSlot->header.isEntryValid(diskEntryPos) ||
diskEntryPos >= PERSISTENT_SLOT_CAPACITY) {
diskEntryPos++;
if (diskEntryPos >= PERSISTENT_SLOT_CAPACITY) {
if (diskSlot->header.nextOvfSlotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID) {
// End of chain: link a new overflow slot, whose id is the current overflow array size.
diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size();
diskOverflowSlotIterator.pushBack(pageAllocator, transaction, OnDiskSlotType());
// NOTE(review): if `diskSlot` pointed into the overflow array, pushBack may have moved
// the iterator off it; this assertion still reads through `diskSlot` — confirm validity.
DASSERT(
diskSlot->header.nextOvfSlotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID ||
diskOverflowSlotIterator.size() > diskSlot->header.nextOvfSlotId);
} else {
diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId);
DASSERT(
diskSlot->header.nextOvfSlotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID ||
diskOverflowSlotIterator.size() > diskSlot->header.nextOvfSlotId);
}
// Continue in the overflow slot the iterator now points at.
diskSlot = &*diskOverflowSlotIterator;
// A slot linking to itself would make this loop spin forever.
DASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId);
diskEntryPos = 0;
}
}
DASSERT(diskEntryPos < PERSISTENT_SLOT_CAPACITY);
if constexpr (std::is_same_v<T, string_t>) {
// String keys are written through the overflow file handle; the returned string_t is
// what gets stored in the slot entry.
auto* inMemEntry = it->entry;
auto str = overflowFileHandle->writeString(&pageAllocator, inMemEntry->key);
diskSlot->entries[diskEntryPos] = SlotEntry<T>{str, inMemEntry->value};
} else {
diskSlot->entries[diskEntryPos] = *it->entry;
}
diskSlot->header.setEntryValid(diskEntryPos, it->fingerprint);
// Debug-only consistency check: the entry's fingerprint and target slot must match what
// re-hashing its key yields.
DASSERT([&]() {
const auto& key = it->entry->key;
const auto hash = hashStored(transaction, key);
const auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(indexHeaderForWriteTrx, hash);
DASSERT(it->fingerprint == HashIndexUtils::getFingerprintForHash(hash));
DASSERT(primarySlot == diskSlotId);
return true;
}());
indexHeaderForWriteTrx.numEntries++;
diskEntryPos++;
merged++;
}
return merged;
}
// Forwards a reservation for `newEntries` upcoming inserts to the local storage.
template<typename T>
void HashIndex<T>::bulkReserve(uint64_t newEntries) {
    localStorage->reserveInserts(newEntries);
}
// Destructor is defaulted out-of-line, here in the translation unit that defines the
// HashIndex<T> member functions.
template<typename T>
HashIndex<T>::~HashIndex() = default;
// String specialization of key equality between a lookup key and a stored entry key.
// After the prefix/length pre-check passes:
//  - keys no longer than PREFIX_LENGTH are equal,
//  - keys no longer than SHORT_STR_LENGTH are compared byte-wise starting at the entry's prefix,
//  - longer keys are compared through the overflow file handle.
template<>
bool HashIndex<string_t>::equals(const transaction::Transaction* transaction,
    std::string_view keyToLookup, const string_t& keyInEntry) const {
    if (!HashIndexUtils::areStringPrefixAndLenEqual(keyToLookup, keyInEntry)) {
        return false;
    }
    if (keyInEntry.len <= string_t::PREFIX_LENGTH) {
        return true;
    }
    if (keyInEntry.len > string_t::SHORT_STR_LENGTH) {
        return overflowFileHandle->equals(transaction->getType(), keyToLookup, keyInEntry);
    }
    return memcmp(keyToLookup.data(), keyInEntry.prefix, keyInEntry.len) == 0;
}
// Explicit instantiations of HashIndex for each key type listed below. The member templates
// above are defined in this translation unit, so they are instantiated here.
template class HashIndex<int64_t>;
template class HashIndex<int32_t>;
template class HashIndex<int16_t>;
template class HashIndex<int8_t>;
template class HashIndex<uint64_t>;
template class HashIndex<uint32_t>;
template class HashIndex<uint16_t>;
template class HashIndex<uint8_t>;
template class HashIndex<double>;
template class HashIndex<float>;
template class HashIndex<int128_t>;
template class HashIndex<uint128_t>;
template class HashIndex<string_t>;
std::unique_ptr<IndexStorageInfo> PrimaryKeyIndexStorageInfo::deserialize(
std::unique_ptr<BufferReader> reader) {
page_idx_t firstHeaderPage = INVALID_PAGE_IDX;
page_idx_t overflowHeaderPage = INVALID_PAGE_IDX;
Deserializer deSer(std::move(reader));
deSer.deserializeValue(firstHeaderPage);
deSer.deserializeValue(overflowHeaderPage);
return std::make_unique<PrimaryKeyIndexStorageInfo>(firstHeaderPage, overflowHeaderPage);
}
// Creates a primary key index backed by a default-constructed PrimaryKeyIndexStorageInfo.
std::unique_ptr<PrimaryKeyIndex> PrimaryKeyIndex::createNewIndex(IndexInfo indexInfo,
    bool inMemMode, MemoryManager& memoryManager, PageAllocator& pageAllocator,
    ShadowFile* shadowFile) {
    auto emptyStorageInfo = std::make_unique<PrimaryKeyIndexStorageInfo>();
    return std::make_unique<PrimaryKeyIndex>(std::move(indexInfo), std::move(emptyStorageInfo),
        inMemMode, memoryManager, pageAllocator, shadowFile);
}
// Constructs the index either fresh (no header page recorded in the storage info) or from
// the persisted header pages.
//  - Fresh: default headers are created for every sub-index and two disk arrays (primary and
//    overflow slots) are added per sub-index.
//  - Existing: the sub-index headers are read from the INDEX_HEADER_PAGES pages starting at
//    firstHeaderPage, and the disk array collection is opened from the page that follows them.
// Finally the overflow file (if needed) and the typed sub-indices are created.
PrimaryKeyIndex::PrimaryKeyIndex(IndexInfo indexInfo, std::unique_ptr<IndexStorageInfo> storageInfo,
    bool inMemMode, MemoryManager& memoryManager, PageAllocator& pageAllocator,
    ShadowFile* shadowFile)
    : Index{std::move(indexInfo), std::move(storageInfo)}, shadowFile{*shadowFile} {
    auto& hashIndexStorageInfo = this->storageInfo->cast<PrimaryKeyIndexStorageInfo>();
    if (hashIndexStorageInfo.firstHeaderPage == INVALID_PAGE_IDX) {
        DASSERT(hashIndexStorageInfo.overflowHeaderPage == INVALID_PAGE_IDX);
        hashIndexHeadersForReadTrx.resize(NUM_HASH_INDEXES);
        hashIndexHeadersForWriteTrx.resize(NUM_HASH_INDEXES);
        hashIndexDiskArrays = std::make_unique<DiskArrayCollection>(*pageAllocator.getDataFH(),
            *shadowFile, true);
        // Each sub-index owns one primary and one overflow slot array.
        for (size_t i = 0; i < NUM_HASH_INDEXES * 2; i++) {
            hashIndexDiskArrays->addDiskArray();
        }
    } else {
        // Pre-size the vector and write each header at its absolute position. Appending inside
        // the read callback is not idempotent: if the callback is executed more than once for
        // a page, headers would be duplicated and every later index shifted.
        // NOTE(review): optimistic reads presumably may re-run the callback when the page
        // changes underneath — confirm against FileHandle::optimisticReadPage.
        hashIndexHeadersForReadTrx.resize(NUM_HASH_INDEXES);
        for (size_t headerPageIdx = 0; headerPageIdx < INDEX_HEADER_PAGES; headerPageIdx++) {
            const size_t startHeaderIdx = headerPageIdx * INDEX_HEADERS_PER_PAGE;
            pageAllocator.getDataFH()->optimisticReadPage(
                hashIndexStorageInfo.firstHeaderPage + headerPageIdx, [&](auto* frame) {
                    const auto onDiskHeaders = reinterpret_cast<HashIndexHeaderOnDisk*>(frame);
                    for (size_t i = 0;
                         i < INDEX_HEADERS_PER_PAGE && startHeaderIdx + i < NUM_HASH_INDEXES;
                         i++) {
                        hashIndexHeadersForReadTrx[startHeaderIdx + i] =
                            HashIndexHeader(onDiskHeaders[i]);
                    }
                });
        }
        hashIndexHeadersForWriteTrx.assign(hashIndexHeadersForReadTrx.begin(),
            hashIndexHeadersForReadTrx.end());
        // The disk array collection's first header page follows the index header pages.
        hashIndexDiskArrays = std::make_unique<DiskArrayCollection>(*pageAllocator.getDataFH(),
            *shadowFile, hashIndexStorageInfo.firstHeaderPage + INDEX_HEADER_PAGES, true);
    }
    initOverflowAndSubIndices(inMemMode, memoryManager, pageAllocator, hashIndexStorageInfo);
}
// Creates the overflow file when the key type is STRING or JSON (in-memory or file-backed
// depending on `inMemMode`), then builds NUM_HASH_INDEXES typed sub-indices. String sub-indices
// each get their own overflow file handle; primitive-keyed ones get none.
void PrimaryKeyIndex::initOverflowAndSubIndices(bool inMemMode, MemoryManager& mm,
    PageAllocator& pageAllocator, PrimaryKeyIndexStorageInfo& storageInfo) {
    DASSERT(indexInfo.keyDataTypes.size() == 1);
    const auto& keyType = indexInfo.keyDataTypes[0];
    const bool keyNeedsOverflow =
        keyType == PhysicalTypeID::STRING || keyType == PhysicalTypeID::JSON;
    if (keyNeedsOverflow && inMemMode) {
        overflowFile = std::make_unique<InMemOverflowFile>(mm);
    } else if (keyNeedsOverflow) {
        overflowFile = std::make_unique<OverflowFile>(pageAllocator.getDataFH(), mm, &shadowFile,
            storageInfo.overflowHeaderPage);
    }
    hashIndices.reserve(NUM_HASH_INDEXES);
    TypeUtils::visit(
        keyType,
        [&](string_t) {
            for (auto subIdx = 0u; subIdx < NUM_HASH_INDEXES; subIdx++) {
                hashIndices.push_back(std::make_unique<HashIndex<string_t>>(mm,
                    overflowFile->addHandle(), *hashIndexDiskArrays, subIdx, &shadowFile,
                    hashIndexHeadersForReadTrx[subIdx], hashIndexHeadersForWriteTrx[subIdx]));
            }
        },
        [&]<HashablePrimitive T>(T) {
            for (auto subIdx = 0u; subIdx < NUM_HASH_INDEXES; subIdx++) {
                hashIndices.push_back(std::make_unique<HashIndex<T>>(mm, nullptr,
                    *hashIndexDiskArrays, subIdx, &shadowFile, hashIndexHeadersForReadTrx[subIdx],
                    hashIndexHeadersForWriteTrx[subIdx]));
            }
        },
        [&](auto) { UNREACHABLE_CODE; });
}
bool PrimaryKeyIndex::lookup(const Transaction* trx, ValueVector* keyVector, uint64_t vectorPos,
offset_t& result, visible_func isVisible) {
bool retVal = false;
DASSERT(indexInfo.keyDataTypes.size() == 1);
TypeUtils::visit(
indexInfo.keyDataTypes[0],
[&]<IndexHashable T>(T) {
T key = keyVector->getValue<T>(vectorPos);
retVal = lookup(trx, key, result, isVisible);
},
[](auto) { UNREACHABLE_CODE; });
return retVal;
}
// Inserts every selected (primary key, node offset) pair into the index.
// Throws RuntimeException when a key is null or when the insert reports a duplicate key.
void PrimaryKeyIndex::commitInsert(Transaction* transaction, const ValueVector& nodeIDVector,
    const std::vector<ValueVector*>& indexVectors, Index::InsertState& insertState) {
    DASSERT(indexVectors.size() == 1);
    const auto& keys = *indexVectors[0];
    const auto& typedInsertState = insertState.cast<InsertState>();
    for (auto selIdx = 0u; selIdx < nodeIDVector.state->getSelSize(); selIdx++) {
        const auto nodePos = nodeIDVector.state->getSelVector()[selIdx];
        const auto nodeOffset = nodeIDVector.readNodeOffset(nodePos);
        const auto keyPos = keys.state->getSelVector()[selIdx];
        if (keys.isNull(keyPos)) {
            throw RuntimeException(ExceptionMessage::nullPKException());
        }
        const bool inserted =
            insert(transaction, &keys, keyPos, nodeOffset, typedInsertState.isVisible);
        if (!inserted) {
            throw RuntimeException(
                ExceptionMessage::duplicatePKException(keys.getAsValue(keyPos)->toString()));
        }
    }
}
bool PrimaryKeyIndex::insert(const Transaction* transaction, const ValueVector* keyVector,
uint64_t vectorPos, offset_t value, visible_func isVisible) {
bool result = false;
DASSERT(indexInfo.keyDataTypes.size() == 1);
TypeUtils::visit(
indexInfo.keyDataTypes[0],
[&]<IndexHashable T>(T) {
T key = keyVector->getValue<T>(vectorPos);
result = insert(transaction, key, value, isVisible);
},
[](auto) { UNREACHABLE_CODE; });
return result;
}
// Deletes every selected, non-null key of `keyVector` from the index, dispatching on the
// index's key type to the typed delete.
void PrimaryKeyIndex::delete_(ValueVector* keyVector) {
    DASSERT(indexInfo.keyDataTypes.size() == 1);
    TypeUtils::visit(
        indexInfo.keyDataTypes[0],
        [&]<IndexHashable T>(T) {
            auto&& selection = keyVector->state->getSelVector();
            for (auto selIdx = 0u; selIdx < selection.getSelSize(); selIdx++) {
                auto pos = selection[selIdx];
                // Null keys are skipped.
                if (!keyVector->isNull(pos)) {
                    auto key = keyVector->getValue<T>(pos);
                    delete_(key);
                }
            }
        },
        [](auto) { UNREACHABLE_CODE; });
}
// Finalizes the in-memory state of every sub-index. If any of them reported a change, the
// write-transaction headers are copied over the read-transaction headers and the disk array
// collection is checkpointed in memory. The overflow file, when present, is always
// checkpointed in memory.
void PrimaryKeyIndex::checkpointInMemory() {
    bool anySubIndexChanged = false;
    for (auto subIdx = 0u; subIdx < NUM_HASH_INDEXES; subIdx++) {
        // Every sub-index must be visited, so no short-circuiting here.
        const bool changed = hashIndices[subIdx]->checkpointInMemory();
        anySubIndexChanged = anySubIndexChanged || changed;
    }
    if (anySubIndexChanged) {
        // Element-wise copy keeps the vector storage (and references into it) in place.
        for (size_t headerIdx = 0; headerIdx < NUM_HASH_INDEXES; headerIdx++) {
            hashIndexHeadersForReadTrx[headerIdx] = hashIndexHeadersForWriteTrx[headerIdx];
        }
        hashIndexDiskArrays->checkpointInMemory();
    }
    if (overflowFile) {
        overflowFile->checkpointInMemory();
    }
}
// Writes the write-transaction headers of all sub-indices to the index header pages,
// INDEX_HEADERS_PER_PAGE headers per page. When no header page has been recorded yet, a page
// range of NUM_HEADER_PAGES + 1 pages is allocated first and its start recorded in the
// storage info.
void PrimaryKeyIndex::writeHeaders(PageAllocator& pageAllocator) const {
    auto& pkStorageInfo = storageInfo->cast<PrimaryKeyIndexStorageInfo>();
    if (pkStorageInfo.firstHeaderPage == INVALID_PAGE_IDX) {
        const auto headerRange = pageAllocator.allocatePageRange(NUM_HEADER_PAGES + 1);
        pkStorageInfo.firstHeaderPage = headerRange.startPageIdx;
    }
    // Running index of the next header to serialize; shared across pages.
    size_t nextHeaderToWrite = 0;
    const auto fillPage = [&](auto* frame) {
        const auto onDiskFrame = reinterpret_cast<HashIndexHeaderOnDisk*>(frame);
        for (size_t slotInPage = 0;
             slotInPage < INDEX_HEADERS_PER_PAGE && nextHeaderToWrite < NUM_HASH_INDEXES;
             slotInPage++) {
            hashIndexHeadersForWriteTrx[nextHeaderToWrite++].write(onDiskFrame[slotInPage]);
        }
    };
    for (size_t pageOffset = 0; pageOffset < INDEX_HEADER_PAGES; pageOffset++) {
        ShadowUtils::updatePage(*pageAllocator.getDataFH(),
            pkStorageInfo.firstHeaderPage + pageOffset, true, shadowFile, fillPage);
    }
    DASSERT(nextHeaderToWrite == NUM_HASH_INDEXES);
}
void PrimaryKeyIndex::rollbackCheckpoint() {
for (idx_t i = 0; i < NUM_HASH_INDEXES; ++i) {
hashIndices[i]->rollbackCheckpoint();
}
hashIndexDiskArrays->rollbackCheckpoint();
hashIndexHeadersForWriteTrx.assign(hashIndexHeadersForReadTrx.begin(),
hashIndexHeadersForReadTrx.end());
if (overflowFile) {
overflowFile->rollbackInMemory();
}
}
// Records the overflow file's header page in the storage info, but only if no overflow header
// page has been recorded yet.
static void updateOverflowHeaderPageIfNeeded(IndexStorageInfo* storageInfo,
    OverflowFile* overflowFile) {
    auto& pkStorageInfo = storageInfo->cast<PrimaryKeyIndexStorageInfo>();
    if (pkStorageInfo.overflowHeaderPage != INVALID_PAGE_IDX) {
        return;
    }
    pkStorageInfo.overflowHeaderPage = overflowFile->getHeaderPageIdx();
}
// Checkpoints every sub-index. If any reported a change, the headers are written and the disk
// array collection is checkpointed. The overflow file (when present) is then checkpointed and
// its header page recorded, dirty pages of the data file are flushed, and finally the
// in-memory state is checkpointed.
void PrimaryKeyIndex::checkpoint(main::ClientContext*, storage::PageAllocator& pageAllocator) {
    bool anySubIndexChanged = false;
    for (auto subIdx = 0u; subIdx < NUM_HASH_INDEXES; subIdx++) {
        // Every sub-index must be checkpointed, so no short-circuiting here.
        const bool changed = hashIndices[subIdx]->checkpoint(pageAllocator);
        anySubIndexChanged = anySubIndexChanged || changed;
    }
    if (anySubIndexChanged) {
        writeHeaders(pageAllocator);
        hashIndexDiskArrays->checkpoint(getDiskArrayFirstHeaderPage(), pageAllocator);
    }
    if (overflowFile) {
        overflowFile->checkpoint(pageAllocator);
        updateOverflowHeaderPageIfNeeded(storageInfo.get(), overflowFile.get());
    }
    auto* dataFH = pageAllocator.getDataFH();
    dataFH->flushAllDirtyPagesInFrames();
    checkpointInMemory();
}
// Destructor is defaulted out-of-line in this translation unit.
PrimaryKeyIndex::~PrimaryKeyIndex() = default;
// Reconstructs a primary key index from its serialized storage info.
// `storageInfoBuffer` is deserialized into a PrimaryKeyIndexStorageInfo; the memory manager
// is taken from `context`, and the page manager, in-memory flag and shadow file from
// `storageManager`.
std::unique_ptr<Index> PrimaryKeyIndex::load(main::ClientContext* context,
    StorageManager* storageManager, IndexInfo indexInfo, std::span<uint8_t> storageInfoBuffer) {
    auto storageInfoBufferReader =
        std::make_unique<BufferReader>(storageInfoBuffer.data(), storageInfoBuffer.size());
    auto storageInfo = PrimaryKeyIndexStorageInfo::deserialize(std::move(storageInfoBufferReader));
    // `indexInfo` is a by-value sink parameter that is not used afterwards: move it into the
    // index instead of copying it (matches createNewIndex).
    return std::make_unique<PrimaryKeyIndex>(std::move(indexInfo), std::move(storageInfo),
        storageManager->isInMemory(), *MemoryManager::Get(*context),
        *storageManager->getDataFH()->getPageManager(), &storageManager->getShadowFile());
}
// Releases the storage used by the index: each sub-index, the disk array collection, the
// overflow file (when present), and finally the NUM_HEADER_PAGES index header pages if a
// first header page has been recorded.
void PrimaryKeyIndex::reclaimStorage(PageAllocator& pageAllocator) const {
    for (const auto& subIndex : hashIndices) {
        subIndex->reclaimStorage(pageAllocator);
    }
    hashIndexDiskArrays->reclaimStorage(pageAllocator, getDiskArrayFirstHeaderPage());
    if (overflowFile) {
        overflowFile->reclaimStorage(pageAllocator);
    }
    const auto headerStart = getFirstHeaderPage();
    if (headerStart == INVALID_PAGE_IDX) {
        return;
    }
    pageAllocator.freePageRange({headerStart, NUM_HEADER_PAGES});
}
// Returns the page that follows the NUM_HEADER_PAGES index header pages, or INVALID_PAGE_IDX
// when no first header page has been recorded.
page_idx_t PrimaryKeyIndex::getDiskArrayFirstHeaderPage() const {
    const auto headerStart = getFirstHeaderPage();
    if (headerStart == INVALID_PAGE_IDX) {
        return INVALID_PAGE_IDX;
    }
    return headerStart + NUM_HEADER_PAGES;
}
// Returns the first index header page recorded in the storage info.
page_idx_t PrimaryKeyIndex::getFirstHeaderPage() const {
    const auto& pkStorageInfo = storageInfo->cast<PrimaryKeyIndexStorageInfo>();
    return pkStorageInfo.firstHeaderPage;
}
} }