#include "storage/index/in_mem_hash_index.h"
#include <cstdint>
#include <cstring>
#include "common/types/string_t.h"
#include "common/types/types.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/disk_array.h"
#include "storage/index/hash_index_header.h"
#include "storage/index/hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
#include "storage/overflow_file.h"
using namespace lbug::common;
namespace lbug {
namespace storage {
/**
 * Builds an empty in-memory hash index.
 *
 * Both the primary and the overflow slot vectors are created on top of `memoryManager`, the
 * header is value-initialised and the free-overflow-slot counter starts at zero. The index is
 * then pre-sized with LBUG_PAGE_SIZE / (aligned slot size) primary slots.
 *
 * @param memoryManager       memory manager handed to the slot vectors.
 * @param overflowFileHandle  handle stored as-is in the index (not used in this constructor).
 */
template<typename T>
InMemHashIndex<T>::InMemHashIndex(MemoryManager& memoryManager,
    OverflowFileHandle* overflowFileHandle)
    : overflowFileHandle(overflowFileHandle),
      pSlots(std::make_unique<BlockVector<InMemSlotType>>(memoryManager)),
      oSlots(std::make_unique<BlockVector<InMemSlotType>>(memoryManager)), indexHeader(),
      memoryManager(memoryManager), numFreeSlots(0) {
    const auto initialNumSlots = LBUG_PAGE_SIZE / pSlots->getAlignedElementSize();
    allocateSlots(initialNumSlots);
}
/**
 * Resets the index to the same state the constructor produces: a default header, fresh (empty)
 * primary/overflow slot vectors, no free overflow slots, and the initial
 * LBUG_PAGE_SIZE / (aligned slot size) primary slots.
 */
template<typename T>
void InMemHashIndex<T>::clear() {
    indexHeader = HashIndexHeader();
    pSlots = std::make_unique<BlockVector<InMemSlotType>>(memoryManager);
    oSlots = std::make_unique<BlockVector<InMemSlotType>>(memoryManager);
    // The overflow vector was just replaced, so no previously freed overflow slot exists any
    // more. The counter must be reset together with the header, otherwise
    // reserveOverflowSlots() would compare against a stale count and under-reserve.
    numFreeSlots = 0;
    allocateSlots(LBUG_PAGE_SIZE / pSlots->getAlignedElementSize());
}
/**
 * Grows the primary slot vector to at least `newNumSlots` slots and brings the header's
 * level / next-split position in line with that slot count.
 *
 * The level is incremented while twice the current level's slot count still fits into
 * `newNumSlots`; the remainder above the level's slot count becomes `nextSplitSlotId`.
 *
 * @param newNumSlots  target number of primary slots.
 */
template<typename T>
void InMemHashIndex<T>::allocateSlots(uint32_t newNumSlots) {
    const auto numExistingSlots = pSlots->size();
    if (numExistingSlots < newNumSlots) {
        allocatePSlots(newNumSlots - numExistingSlots);
    }

    // Number of slots addressed by the current level (2^currentLevel).
    auto levelCapacity = 1u << this->indexHeader.currentLevel;
    for (; (levelCapacity << 1) <= newNumSlots; levelCapacity <<= 1) {
        this->indexHeader.incrementLevel();
    }

    if (levelCapacity <= newNumSlots) {
        this->indexHeader.nextSplitSlotId = newNumSlots - levelCapacity;
    }
}
/**
 * Makes sure there are enough primary slots for `numEntries_` entries.
 *
 * The required entry count comes from HashIndexUtils::getNumRequiredEntries() and is rounded up
 * to whole slots of SLOT_CAPACITY entries. An index without entries is simply resized; an index
 * that already holds entries is grown one split at a time.
 *
 * @param numEntries_  number of entries the index should be able to hold.
 */
template<typename T>
void InMemHashIndex<T>::reserve(uint32_t numEntries_) {
    const slot_id_t requiredEntries = HashIndexUtils::getNumRequiredEntries(numEntries_);
    const auto requiredSlots = (requiredEntries + SLOT_CAPACITY - 1) / SLOT_CAPACITY;
    if (pSlots->size() >= requiredSlots) {
        // Already large enough.
        return;
    }

    if (indexHeader.numEntries != 0) {
        // Existing entries have to be redistributed, so grow by splitting slot after slot.
        while (pSlots->size() < requiredSlots) {
            splitSlot();
        }
        return;
    }

    // Nothing stored yet: the slots can be allocated in one go.
    allocateSlots(requiredSlots);
}
/**
 * Counts the slots of the chain that starts at `iter`.
 *
 * @param iter  iterator positioned on the first slot of the chain (taken by value, so the
 *              caller's iterator is not advanced).
 * @return 0 if `iter` denotes the invalid overflow slot, otherwise 1 plus the number of
 *         successful nextChainedSlot() steps.
 */
template<typename T>
uint64_t InMemHashIndex<T>::countSlots(SlotIterator iter) const {
    const bool pointsAtInvalidOverflowSlot =
        iter.slotInfo.slotType == SlotType::OVF &&
        iter.slotInfo.slotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID;
    if (pointsAtInvalidOverflowSlot) {
        return 0;
    }

    uint64_t numSlots = 1;
    for (; nextChainedSlot(iter); ++numSlots) {}
    return numSlots;
}
/**
 * Ensures that at least `totalSlotsRequired` overflow slots are available on the free list.
 *
 * If fewer slots are free, the overflow vector is grown by the shortfall and every newly
 * created slot is pushed onto the free list. Nothing happens when enough slots are already free.
 *
 * @param totalSlotsRequired  minimum number of free overflow slots wanted after the call.
 */
template<typename T>
void InMemHashIndex<T>::reserveOverflowSlots(uint64_t totalSlotsRequired) {
    if (totalSlotsRequired <= numFreeSlots) {
        return;
    }

    // Capture the shortfall once. addFreeOverflowSlot() increments numFreeSlots, so
    // re-evaluating `totalSlotsRequired - numFreeSlots` as the loop bound would shrink it on
    // every iteration and leave roughly half of the new slots off the free list.
    const uint64_t numSlotsToAdd = totalSlotsRequired - numFreeSlots;

    // The first new slot gets the id equal to the size before resizing.
    SlotInfo newSlot{oSlots->size(), SlotType::OVF};
    oSlots->resize(oSlots->size() + numSlotsToAdd);
    for (uint64_t i = 0; i < numSlotsToAdd; i++) {
        auto slot = getSlot(newSlot);
        addFreeOverflowSlot(*slot, newSlot);
        newSlot.slotId++;
    }
}
// Splits the slot chain rooted at primary slot indexHeader.nextSplitSlotId.
//
// A new primary slot is appended. Every entry of the chain whose hash, masked with
// indexHeader.higherLevelHashMask, no longer equals nextSplitSlotId is moved into the chain of
// the new slot. Entries that stay are moved forward into positions vacated earlier in the chain.
// Afterwards the overflow slots that are no longer needed are handed to reclaimOverflowSlots()
// and nextSplitSlotId is advanced.
template<typename T>
void InMemHashIndex<T>::splitSlot() {
// Append the primary slot that receives the relocated entries.
allocatePSlots(1);
// Read cursor over the chain that is being split.
SlotIterator originalSlot(indexHeader.nextSplitSlotId, this);
// Make sure at least as many overflow slots are free as the chain currently has slots.
// NOTE(review): presumably so that allocateAOSlot() below never has to grow oSlots while the
// iterators are live — confirm against BlockVector's invalidation rules.
reserveOverflowSlots(countSlots(originalSlot));
// Write cursor over the same chain; retained entries are moved to the free position it finds.
SlotIterator originalSlotForInsert(indexHeader.nextSplitSlotId, this);
auto entryPosToInsert = 0u;
// Cursor over the chain of the newly appended primary slot (the last slot in pSlots).
SlotIterator newSlot(pSlots->size() - 1, this);
entry_pos_t newSlotPos = 0;
// Becomes true once an entry has been moved out, i.e. earlier positions may now be free.
bool gaps = false;
do {
for (auto entryPos = 0u; entryPos < SLOT_CAPACITY; entryPos++) {
if (!originalSlot.slot->header.isEntryValid(entryPos)) {
// First invalid position in this slot ends the split. The assertion checks that the
// entry count equals the number of trailing one bits of the validity mask.
// NOTE(review): this returns without visiting later slots of the chain, so it appears
// to rely on entries being stored without holes across the chain — confirm.
DASSERT(originalSlot.slot->header.numEntries() ==
std::countr_one(originalSlot.slot->header.validityMask));
reclaimOverflowSlots(originalSlotForInsert);
indexHeader.incrementNextSplitSlotId();
return;
}
const auto& entry = originalSlot.slot->entries[entryPos];
const auto& hash = this->hashStored(originalSlot.slot->entries[entryPos].key);
const auto fingerprint = HashIndexUtils::getFingerprintForHash(hash);
// Slot id this entry maps to under the higher-level hash mask.
const auto newSlotId = hash & indexHeader.higherLevelHashMask;
if (newSlotId != indexHeader.nextSplitSlotId) {
// The entry no longer maps to the slot being split: move it to the new chain.
if (newSlotPos >= SLOT_CAPACITY) {
// Current slot of the new chain is full: link another overflow slot and continue there.
auto newOvfSlotId = allocateAOSlot();
newSlot.slot->header.nextOvfSlotId = newOvfSlotId;
[[maybe_unused]] bool hadNextSlot = nextChainedSlot(newSlot);
DASSERT(hadNextSlot);
newSlotPos = 0;
}
newSlot.slot->entries[newSlotPos] = entry;
newSlot.slot->header.setEntryValid(newSlotPos, fingerprint);
originalSlot.slot->header.setEntryInvalid(entryPos);
newSlotPos++;
gaps = true;
} else if (gaps) {
// The entry stays, but earlier positions were vacated: advance the write cursor to the
// next invalid position (following the chain when a slot is exhausted) and move it there.
while (originalSlotForInsert.slot->header.isEntryValid(entryPosToInsert)) {
entryPosToInsert++;
if (entryPosToInsert >= SLOT_CAPACITY) {
entryPosToInsert = 0;
[[maybe_unused]] bool hadNextSlot = nextChainedSlot(originalSlotForInsert);
DASSERT(hadNextSlot);
}
}
originalSlotForInsert.slot->entries[entryPosToInsert] = entry;
originalSlotForInsert.slot->header.setEntryValid(entryPosToInsert, fingerprint);
originalSlot.slot->header.setEntryInvalid(entryPos);
}
}
// After a full pass over the slot, the entry count must again equal the number of trailing
// one bits of its validity mask.
DASSERT(originalSlot.slot->header.numEntries() ==
std::countr_one(originalSlot.slot->header.validityMask));
} while (nextChainedSlot(originalSlot));
// Whole chain processed: release what is no longer needed and move on to the next slot.
reclaimOverflowSlots(originalSlotForInsert);
indexHeader.incrementNextSplitSlotId();
}
/**
 * Pushes an overflow slot onto the head of the free-overflow-slot list.
 *
 * @param overflowSlot  the slot being freed; its next-overflow link must currently be invalid.
 * @param slotInfo      id and type of `overflowSlot`; must be a valid overflow slot id.
 */
template<typename T>
void InMemHashIndex<T>::addFreeOverflowSlot(InMemSlotType& overflowSlot, SlotInfo slotInfo) {
    DASSERT(slotInfo.slotId != SlotHeader::INVALID_OVERFLOW_SLOT_ID);
    DASSERT(overflowSlot.header.nextOvfSlotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID);
    DASSERT(slotInfo.slotType == SlotType::OVF);

    // The freed slot becomes the new list head and links to the previous head.
    const auto previousHead = indexHeader.firstFreeOverflowSlotId;
    overflowSlot.header.nextOvfSlotId = previousHead;
    indexHeader.firstFreeOverflowSlotId = slotInfo.slotId;
    ++numFreeSlots;
}
/**
 * Cuts the empty tail off a slot chain and returns its overflow slots to the free list.
 *
 * Starting at `iter`, the chain is followed while slots are primary or still hold entries. The
 * last such slot becomes the end of the chain; every slot after it is unlinked and, if it is
 * an overflow slot, added to the free list.
 *
 * @param iter  iterator positioned inside the chain (taken by value).
 */
template<typename T>
void InMemHashIndex<T>::reclaimOverflowSlots(SlotIterator iter) {
    if (iter.slot->header.nextOvfSlotId == SlotHeader::INVALID_OVERFLOW_SLOT_ID) {
        // No overflow slot follows, so there is nothing to reclaim.
        return;
    }

    // Skip over the part of the chain that has to be kept.
    InMemSlotType* chainTail = iter.slot;
    while (iter.slot->header.numEntries() > 0 || iter.slotInfo.slotType == SlotType::PRIMARY) {
        chainTail = iter.slot;
        if (!nextChainedSlot(iter)) {
            // Reached the end of the chain without finding an empty overflow slot.
            iter.slotInfo = HashIndexUtils::INVALID_OVF_INFO;
            break;
        }
    }
    chainTail->header.nextOvfSlotId = SlotHeader::INVALID_OVERFLOW_SLOT_ID;

    // Everything from here on is empty: unlink each slot and put it on the free list.
    while (iter.slotInfo != HashIndexUtils::INVALID_OVF_INFO) {
        DASSERT(iter.slot->header.numEntries() == 0);
        const auto emptySlotInfo = iter.slotInfo;
        auto* emptySlot = clearNextOverflowAndAdvanceIter(iter);
        if (emptySlotInfo.slotType == SlotType::OVF) {
            addFreeOverflowSlot(*emptySlot, emptySlotInfo);
        }
    }
}
/**
 * Unlinks the slot `iter` currently points at from its successor and moves `iter` forward.
 *
 * @param iter  iterator to advance; afterwards its slotInfo names the former successor (as an
 *              overflow slot). Its slot pointer is only updated when that successor is valid.
 * @return the slot `iter` pointed at on entry, now with an invalid next-overflow link.
 */
template<typename T>
InMemHashIndex<T>::InMemSlotType* InMemHashIndex<T>::clearNextOverflowAndAdvanceIter(
    SlotIterator& iter) {
    auto* const detachedSlot = iter.slot;
    const auto successorId = detachedSlot->header.nextOvfSlotId;
    detachedSlot->header.nextOvfSlotId = SlotHeader::INVALID_OVERFLOW_SLOT_ID;

    iter.slotInfo = SlotInfo{successorId, SlotType::OVF};
    const bool hasSuccessor = successorId != SlotHeader::INVALID_OVERFLOW_SLOT_ID;
    if (hasSuccessor) {
        iter.slot = getSlot(iter.slotInfo);
    }
    return detachedSlot;
}
/**
 * Appends `numSlotsToAllocate` primary slots.
 *
 * @param numSlotsToAllocate  number of primary slots to add.
 * @return the number of primary slots before the call, i.e. the id of the first new slot.
 */
template<typename T>
uint32_t InMemHashIndex<T>::allocatePSlots(uint32_t numSlotsToAllocate) {
    const auto firstNewSlotId = pSlots->size();
    pSlots->resize(firstNewSlotId + numSlotsToAllocate);
    return firstNewSlotId;
}
/**
 * Provides one overflow slot, preferring a recycled one.
 *
 * If the free list is non-empty its head is popped (and its link reset); otherwise the overflow
 * vector is grown by one slot.
 *
 * @return id of the overflow slot that may now be used.
 */
template<typename T>
uint32_t InMemHashIndex<T>::allocateAOSlot() {
    if (indexHeader.firstFreeOverflowSlotId != SlotHeader::INVALID_OVERFLOW_SLOT_ID) {
        // Pop the head of the free list.
        const auto recycledSlotId = indexHeader.firstFreeOverflowSlotId;
        auto& recycledSlot = (*oSlots)[recycledSlotId];
        indexHeader.firstFreeOverflowSlotId = recycledSlot.header.nextOvfSlotId;
        DASSERT(recycledSlot.header.numEntries() == 0);
        recycledSlot.header.nextOvfSlotId = SlotHeader::INVALID_OVERFLOW_SLOT_ID;
        DASSERT(numFreeSlots > 0);
        numFreeSlots--;
        return recycledSlotId;
    }

    // Free list is empty: append a brand-new overflow slot.
    const auto appendedSlotId = oSlots->size();
    oSlots->resize(appendedSlotId + 1);
    return appendedSlotId;
}
/**
 * Resolves a slot descriptor to the slot it names.
 *
 * @param slotInfo  slot id plus slot type; PRIMARY selects the primary slot vector, any other
 *                  type selects the overflow slot vector.
 * @return pointer to the slot with that id in the selected vector.
 */
template<typename T>
InMemHashIndex<T>::InMemSlotType* InMemHashIndex<T>::getSlot(const SlotInfo& slotInfo) const {
    auto& slots = (slotInfo.slotType == SlotType::PRIMARY) ? *pSlots : *oSlots;
    return &slots[slotInfo.slotId];
}
/**
 * Hashes a key in the form in which it is stored in a slot.
 *
 * @param key  stored key.
 * @return the value of HashIndexUtils::hash() for `key`.
 */
template<typename T>
common::hash_t InMemHashIndex<T>::hashStored(const InMemHashIndex<T>::OwnedType& key) const {
    const common::hash_t storedKeyHash = HashIndexUtils::hash(key);
    return storedKeyHash;
}
// Explicit instantiations. The member functions above are defined in this source file, so the
// template is instantiated here for each key type listed below: signed and unsigned integers of
// 8 to 128 bits, float, double and string_t.
// NOTE(review): presumably this list must cover every key type used elsewhere — confirm.
template class InMemHashIndex<int64_t>;
template class InMemHashIndex<int32_t>;
template class InMemHashIndex<int16_t>;
template class InMemHashIndex<int8_t>;
template class InMemHashIndex<uint64_t>;
template class InMemHashIndex<uint32_t>;
template class InMemHashIndex<uint16_t>;
template class InMemHashIndex<uint8_t>;
template class InMemHashIndex<double>;
template class InMemHashIndex<float>;
template class InMemHashIndex<int128_t>;
template class InMemHashIndex<uint128_t>;
template class InMemHashIndex<string_t>;
} }