#include "storage/table/csr_node_group.h"
#include "common/constants.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_utils.h"
#include "storage/table/column_chunk_data.h"
#include "storage/table/csr_chunked_node_group.h"
#include "storage/table/lazy_segment_scanner.h"
#include "storage/table/rel_table.h"
#include "transaction/transaction.h"
using namespace lbug::common;
using namespace lbug::transaction;
namespace lbug {
namespace storage {
/**
 * Tries to serve the current bound node's CSR rows from the batch of persistent rows most
 * recently scanned into the output vectors. That cached window covers rows
 * [nextRowToScan - numCachedRows, nextRowToScan).
 *
 * On success the output selection vector is narrowed to the bound node's rows inside the
 * window (re-applying cachedScannedVectorsSelBitset when the original scan filtered rows).
 * The node ID vector is flattened to that bound node, and the function returns true. When the
 * node's list is fully consumed, currBoundNodeIdx moves to the next bound node.
 *
 * Returns false when nothing is cached, when all bound nodes are consumed, or when the node's
 * next row lies outside the cached window.
 */
bool CSRNodeGroupScanState::tryScanCachedTuples(RelTableScanState& tableScanState) {
    if (numCachedRows == 0 ||
        tableScanState.currBoundNodeIdx >= tableScanState.cachedBoundNodeSelVector.getSelSize()) {
        return false;
    }
    const auto boundNodeOffset = tableScanState.nodeIDVector->readNodeOffset(
        tableScanState.cachedBoundNodeSelVector[tableScanState.currBoundNodeIdx]);
    const auto boundNodeOffsetInGroup = boundNodeOffset % StorageConfig::NODE_GROUP_SIZE;
    const auto startCSROffset = header->getStartCSROffset(boundNodeOffsetInGroup);
    const auto csrLength = header->getCSRLength(boundNodeOffsetInGroup);
    // Never resume before the start of this node's list.
    nextCachedRowToScan = std::max(nextCachedRowToScan, startCSROffset);
    // The row we need must fall inside the cached window.
    if (nextCachedRowToScan >= nextRowToScan ||
        nextCachedRowToScan < nextRowToScan - numCachedRows) {
        return false;
    }
    DASSERT(nextCachedRowToScan >= nextRowToScan - numCachedRows);
    const auto numRowsToScan =
        std::min(nextRowToScan, startCSROffset + csrLength) - nextCachedRowToScan;
    // Position of nextCachedRowToScan inside the cached output vectors.
    const auto startCachedRow = nextCachedRowToScan - (nextRowToScan - numCachedRows);
    auto& outSelVector = tableScanState.outState->getSelVectorUnsafe();
    if (cachedScannedVectorsSelBitset.has_value()) {
        // Bind by reference: copying the bitset would cost DEFAULT_VECTOR_CAPACITY bits per call.
        const auto& selBitset = *cachedScannedVectorsSelBitset;
        auto numSelected = 0u;
        outSelVector.setToFiltered();
        for (auto i = 0u; i < numRowsToScan; i++) {
            const auto rowIdx = startCachedRow + i;
            // Branch-free compaction: always write the candidate, and only keep it (by
            // advancing numSelected) when it survived the filter of the original scan.
            outSelVector[numSelected] = rowIdx;
            numSelected += selBitset[rowIdx];
        }
        outSelVector.setSelSize(numSelected);
    } else {
        outSelVector.setRange(startCachedRow, numRowsToScan);
    }
    tableScanState.setNodeIDVectorToFlat(
        tableScanState.cachedBoundNodeSelVector[tableScanState.currBoundNodeIdx]);
    nextCachedRowToScan += numRowsToScan;
    // This node's list is exhausted; move on to the next bound node.
    if ((startCSROffset + csrLength) == nextCachedRowToScan) {
        tableScanState.currBoundNodeIdx++;
        nextCachedRowToScan = 0;
    }
    return true;
}
void CSRNodeGroup::initializeScanState(const Transaction* transaction,
TableScanState& state) const {
auto& relScanState = state.cast<RelTableScanState>();
DASSERT(relScanState.nodeGroupScanState);
auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
if (relScanState.nodeGroupIdx != nodeGroupIdx || relScanState.randomLookup) {
relScanState.nodeGroupIdx = nodeGroupIdx;
if (persistentChunkGroup) {
initScanForCommittedPersistent(transaction, relScanState, nodeGroupScanState);
}
}
if (persistentChunkGroup) {
nodeGroupScanState.nextRowToScan = 0;
nodeGroupScanState.numCachedRows = 0;
nodeGroupScanState.nextCachedRowToScan = 0;
nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
} else if (csrIndex) {
initScanForCommittedInMem(relScanState, nodeGroupScanState);
} else {
nodeGroupScanState.source = CSRNodeGroupScanSource::NONE;
nodeGroupScanState.nextRowToScan = 0;
}
}
/**
 * Initializes the scan state for reading committed data from the persistent (checkpointed)
 * chunked group. It sets up a chunk scan state for every requested property column and loads
 * CSR header values (offsets and lengths) into nodeGroupScanState.header.
 *
 * For a random lookup, only the header entries needed for the bound node at selection
 * position 0 of nodeIDVector are read. Otherwise the whole header is read and numTotalRows is
 * set from it.
 */
void CSRNodeGroup::initScanForCommittedPersistent(const Transaction* transaction,
    RelTableScanState& relScanState, CSRNodeGroupScanState& nodeGroupScanState) const {
    ChunkState offsetState, lengthState;
    auto& csrChunkGroup = persistentChunkGroup->cast<ChunkedCSRNodeGroup>();
    const auto& csrHeader = csrChunkGroup.getCSRHeader();
    csrHeader.offset->initializeScanState(offsetState, relScanState.csrOffsetColumn);
    csrHeader.length->initializeScanState(lengthState, relScanState.csrLengthColumn);
    // Discard header values left over from a previous scan before reading new ones.
    nodeGroupScanState.header->offset->setNumValues(0);
    nodeGroupScanState.header->length->setNumValues(0);
    for (auto i = 0u; i < relScanState.columnIDs.size(); i++) {
        // Invalid and row-index column IDs have no backing column chunk to scan.
        if (relScanState.columnIDs[i] == INVALID_COLUMN_ID ||
            relScanState.columnIDs[i] == ROW_IDX_COLUMN_ID) {
            continue;
        }
        auto& chunk = persistentChunkGroup->getColumnChunk(relScanState.columnIDs[i]);
        chunk.initializeScanState(nodeGroupScanState.chunkStates[i], relScanState.columns[i]);
    }
    DASSERT(csrHeader.offset->getNumValues() == csrHeader.length->getNumValues());
    if (relScanState.randomLookup) {
        auto pos = relScanState.nodeIDVector->state->getSelVector()[0];
        auto nodeOffset = relScanState.nodeIDVector->readNodeOffset(pos);
        auto offsetInGroup = nodeOffset % StorageConfig::NODE_GROUP_SIZE;
        // NOTE(review): reads a single offset entry at offsetInGroup - 1 (entry 0 for the first
        // node) and a single length entry at offsetInGroup. Presumably the header derives the
        // node's start offset from the previous node's entry when header->randomLookup is set —
        // confirm against InMemChunkedCSRHeader.
        auto offsetToScanFrom = offsetInGroup == 0 ? 0 : offsetInGroup - 1;
        csrHeader.offset->scanCommitted<ResidencyState::ON_DISK>(transaction, offsetState,
            *nodeGroupScanState.header->offset, offsetToScanFrom, 1);
        csrHeader.length->scanCommitted<ResidencyState::ON_DISK>(transaction, lengthState,
            *nodeGroupScanState.header->length, offsetInGroup, 1);
    } else {
        auto numBoundNodes = csrHeader.offset->getNumValues();
        csrHeader.offset->scanCommitted<ResidencyState::ON_DISK>(transaction, offsetState,
            *nodeGroupScanState.header->offset);
        csrHeader.length->scanCommitted<ResidencyState::ON_DISK>(transaction, lengthState,
            *nodeGroupScanState.header->length);
        // The start offset one past the last bound node bounds the persistent rows that the
        // cached scan may read (see scanCommittedPersistentWithCache).
        nodeGroupScanState.numTotalRows =
            nodeGroupScanState.header->getStartCSROffset(numBoundNodes);
    }
    nodeGroupScanState.header->randomLookup = relScanState.randomLookup;
}
// Switches the scan to committed in-memory data: restarts at the first bound node and clears
// the per-list position, the row cache counter and the currently loaded in-memory CSR list.
void CSRNodeGroup::initScanForCommittedInMem(RelTableScanState& relScanState,
    CSRNodeGroupScanState& nodeGroupScanState) {
    nodeGroupScanState.inMemCSRList.clear();
    nodeGroupScanState.numCachedRows = 0;
    nodeGroupScanState.nextRowToScan = 0;
    nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
    relScanState.currBoundNodeIdx = 0;
}
/**
 * Produces the next batch of rel rows for the bound nodes in `state`.
 *
 * Committed persistent data is scanned first. Once it yields nothing and a CSR index exists,
 * the scan switches to committed in-memory data. When there is nothing left, returns
 * NODE_GROUP_SCAN_EMPTY_RESULT (for the in-memory and NONE sources, the output selection size
 * is also set to 0).
 */
NodeGroupScanResult CSRNodeGroup::scan(const Transaction* transaction,
    TableScanState& state) const {
    auto& relScanState = state.cast<RelTableScanState>();
    auto& csrScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
    for (;;) {
        switch (csrScanState.source) {
        case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: {
            const auto persistentResult =
                scanCommittedPersistent(transaction, relScanState, csrScanState);
            const bool persistentExhausted = persistentResult == NODE_GROUP_SCAN_EMPTY_RESULT;
            if (!persistentExhausted || !csrIndex) {
                return persistentResult;
            }
            // Persistent rows are exhausted but in-memory CSR data exists: switch and retry.
            initScanForCommittedInMem(relScanState, csrScanState);
            continue;
        }
        case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: {
            relScanState.resetOutVectors();
            const auto inMemResult = scanCommittedInMem(transaction, relScanState, csrScanState);
            if (!(inMemResult == NODE_GROUP_SCAN_EMPTY_RESULT)) {
                return inMemResult;
            }
            relScanState.outState->getSelVectorUnsafe().setSelSize(0);
            return NODE_GROUP_SCAN_EMPTY_RESULT;
        }
        case CSRNodeGroupScanSource::NONE: {
            relScanState.outState->getSelVectorUnsafe().setSelSize(0);
            return NODE_GROUP_SCAN_EMPTY_RESULT;
        }
        default: {
            UNREACHABLE_CODE;
        }
        }
    }
}
// Persistent-scan dispatcher. With exactly one bound node, its CSR list is read directly.
// Otherwise batches are read once and reused across consecutive bound nodes via the row cache.
NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* transaction,
    RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const {
    const bool singleBoundNode = tableState.cachedBoundNodeSelVector.getSelSize() == 1;
    return singleBoundNode ?
               scanCommittedPersistentWithoutCache(transaction, tableState, nodeGroupScanState) :
               scanCommittedPersistentWithCache(transaction, tableState, nodeGroupScanState);
}
/**
 * Persistent scan for several bound nodes.
 *
 * Rows are read from the persistent chunked group in batches of up to DEFAULT_VECTOR_CAPACITY,
 * starting no earlier than the current bound node's start offset. Each batch is kept as a
 * cache window so that tryScanCachedTuples can serve every bound node whose rows fall inside it
 * without rescanning.
 */
NodeGroupScanResult CSRNodeGroup::scanCommittedPersistentWithCache(const Transaction* transaction,
    RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const {
    while (true) {
        // Serve bound nodes from the current cache window. Nodes whose cached rows are all
        // filtered out (empty selection) are skipped.
        while (nodeGroupScanState.tryScanCachedTuples(tableState)) {
            if (tableState.outState->getSelVector().getSelSize() > 0) {
                // NOTE(review): the reported start row is nextRowToScan, i.e. the end of the
                // cache window rather than the first returned row — confirm callers expect this.
                return NodeGroupScanResult{nodeGroupScanState.nextRowToScan,
                    tableState.outState->getSelVector().getSelSize()};
            }
        }
        // Done once every persistent row has been read or every bound node has been served.
        if (nodeGroupScanState.nextRowToScan == nodeGroupScanState.numTotalRows ||
            tableState.currBoundNodeIdx >= tableState.cachedBoundNodeSelVector.getSelSize()) {
            return NODE_GROUP_SCAN_EMPTY_RESULT;
        }
        const auto currNodeOffset = tableState.nodeIDVector->readNodeOffset(
            tableState.cachedBoundNodeSelVector[tableState.currBoundNodeIdx]);
        const auto offsetInGroup = currNodeOffset % StorageConfig::NODE_GROUP_SIZE;
        const auto startCSROffset = nodeGroupScanState.header->getStartCSROffset(offsetInGroup);
        // Skip forward to the current bound node's list; earlier rows are not scanned.
        if (startCSROffset > nodeGroupScanState.nextRowToScan) {
            nodeGroupScanState.nextRowToScan = startCSROffset;
        }
        DASSERT(nodeGroupScanState.nextRowToScan <= nodeGroupScanState.numTotalRows);
        const auto numToScan =
            std::min(nodeGroupScanState.numTotalRows - nodeGroupScanState.nextRowToScan,
                DEFAULT_VECTOR_CAPACITY);
        persistentChunkGroup->scan(transaction, tableState, nodeGroupScanState,
            nodeGroupScanState.nextRowToScan, numToScan);
        // The new cache window is [nextRowToScan - numCachedRows, nextRowToScan).
        nodeGroupScanState.numCachedRows = numToScan;
        nodeGroupScanState.nextRowToScan += numToScan;
        // If the scan filtered rows, remember which positions survived so that
        // tryScanCachedTuples can re-apply the same filter to each bound node's slice.
        if (tableState.outState->getSelVector().isUnfiltered()) {
            nodeGroupScanState.cachedScannedVectorsSelBitset.reset();
        } else {
            nodeGroupScanState.cachedScannedVectorsSelBitset =
                std::bitset<DEFAULT_VECTOR_CAPACITY>();
            for (auto i = 0u; i < tableState.outState->getSelVector().getSelSize(); i++) {
                nodeGroupScanState.cachedScannedVectorsSelBitset->set(
                    tableState.outState->getSelVector()[i], true);
            }
        }
    }
}
/**
 * Scans the next batch (at most DEFAULT_VECTOR_CAPACITY rows) of the single bound node's
 * persistent CSR list. Here nextRowToScan is the position inside that list, and the scan is
 * exhausted once it equals the list length.
 */
NodeGroupScanResult CSRNodeGroup::scanCommittedPersistentWithoutCache(
    const Transaction* transaction, RelTableScanState& tableState,
    CSRNodeGroupScanState& nodeGroupScanState) const {
    const auto boundNodePos = tableState.cachedBoundNodeSelVector[tableState.currBoundNodeIdx];
    const auto nodeOffsetInGroup =
        tableState.nodeIDVector->readNodeOffset(boundNodePos) % StorageConfig::NODE_GROUP_SIZE;
    const auto listLength = nodeGroupScanState.header->getCSRLength(nodeOffsetInGroup);
    const auto posInList = nodeGroupScanState.nextRowToScan;
    if (posInList == listLength) {
        return NODE_GROUP_SCAN_EMPTY_RESULT;
    }
    const auto listStart = nodeGroupScanState.header->getStartCSROffset(nodeOffsetInGroup);
    const auto startRow = listStart + posInList;
    const auto numToScan = std::min(listLength - posInList, DEFAULT_VECTOR_CAPACITY);
    persistentChunkGroup->scan(transaction, tableState, nodeGroupScanState, startRow, numToScan);
    nodeGroupScanState.nextRowToScan += numToScan;
    tableState.setNodeIDVectorToFlat(
        tableState.cachedBoundNodeSelVector[tableState.currBoundNodeIdx]);
    return NodeGroupScanResult{startRow, numToScan};
}
/**
 * Scans committed in-memory CSR lists, one bound node at a time.
 *
 * The current bound node's list is copied from csrIndex on demand. When a list yields nothing
 * more, the scan advances to the next bound node. Returns NODE_GROUP_SCAN_EMPTY_RESULT once
 * all bound nodes are consumed.
 */
NodeGroupScanResult CSRNodeGroup::scanCommittedInMem(const Transaction* transaction,
    RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const {
    auto& csrList = nodeGroupScanState.inMemCSRList;
    while (tableState.currBoundNodeIdx < tableState.cachedBoundNodeSelVector.getSelSize()) {
        const auto boundNodePos =
            tableState.cachedBoundNodeSelVector[tableState.currBoundNodeIdx];
        if (csrList.rowIndices.empty()) {
            const auto offsetInGroup = tableState.nodeIDVector->readNodeOffset(boundNodePos) %
                                       StorageConfig::NODE_GROUP_SIZE;
            csrList = csrIndex->indices[offsetInGroup];
        }
        // Explicit (non-sequential) lists are expected to be kept sorted.
        DASSERT(csrList.isSequential ||
                std::is_sorted(csrList.rowIndices.begin(), csrList.rowIndices.end()));
        const auto result =
            csrList.isSequential ?
                scanCommittedInMemSequential(transaction, tableState, nodeGroupScanState) :
                scanCommittedInMemRandom(transaction, tableState, nodeGroupScanState);
        const bool listExhausted = result == NODE_GROUP_SCAN_EMPTY_RESULT;
        if (!listExhausted) {
            tableState.setNodeIDVectorToFlat(boundNodePos);
            return result;
        }
        tableState.currBoundNodeIdx++;
        nodeGroupScanState.nextRowToScan = 0;
        csrList.clear();
    }
    return NODE_GROUP_SCAN_EMPTY_RESULT;
}
/**
 * Scans the next batch of a sequential in-memory CSR list, stored as
 * rowIndices == {firstRow, length}. A batch is capped at DEFAULT_VECTOR_CAPACITY rows and
 * never crosses a chunked-group boundary. Returns NODE_GROUP_SCAN_EMPTY_RESULT when no rows
 * remain.
 */
NodeGroupScanResult CSRNodeGroup::scanCommittedInMemSequential(const Transaction* transaction,
    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const {
    const auto& rowIndices = nodeGroupScanState.inMemCSRList.rowIndices;
    const auto listStart = rowIndices[0];
    const auto listLength = rowIndices[1];
    const auto startRow = listStart + nodeGroupScanState.nextRowToScan;
    const auto [chunkIdx, startRowInChunk] =
        StorageUtils::getQuotientRemainder(startRow, StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
    const auto rowsLeftInList =
        std::min(listLength - nodeGroupScanState.nextRowToScan, DEFAULT_VECTOR_CAPACITY);
    const auto rowsLeftInChunk = StorageConfig::CHUNKED_NODE_GROUP_CAPACITY - startRowInChunk;
    const auto numRows = std::min(rowsLeftInList, rowsLeftInChunk);
    if (numRows == 0) {
        return NODE_GROUP_SCAN_EMPTY_RESULT;
    }
    // Only the group lookup needs the lock; the scan itself runs after it is released.
    const ChunkedNodeGroup* chunkedGroup = nullptr;
    {
        const auto lock = chunkedGroups.lock();
        chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
    }
    chunkedGroup->scan(transaction, tableState, nodeGroupScanState, startRowInChunk, numRows);
    nodeGroupScanState.nextRowToScan += numRows;
    return NodeGroupScanResult{startRow, numRows};
}
/**
 * Reads the next batch (at most DEFAULT_VECTOR_CAPACITY entries) of a non-sequential in-memory
 * CSR list, looking rows up one at a time. Each lookup writes at output position numSelected
 * and returns how many rows it selected. The chunked group is re-fetched under the
 * chunked-groups lock only when consecutive rows live in different chunked groups.
 */
NodeGroupScanResult CSRNodeGroup::scanCommittedInMemRandom(const Transaction* transaction,
    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const {
    const auto remaining =
        nodeGroupScanState.inMemCSRList.rowIndices.size() - nodeGroupScanState.nextRowToScan;
    const auto numRows = std::min(remaining, DEFAULT_VECTOR_CAPACITY);
    if (numRows == 0) {
        return NODE_GROUP_SCAN_EMPTY_RESULT;
    }
    ChunkedNodeGroup* chunkedGroup = nullptr;
    node_group_idx_t loadedChunkIdx = INVALID_NODE_GROUP_IDX;
    sel_t numSelected = 0;
    for (row_idx_t entry = 0; entry < numRows; ++entry) {
        const auto rowIdx = nodeGroupScanState.inMemCSRList
                                .rowIndices[entry + nodeGroupScanState.nextRowToScan];
        const auto [chunkIdx, rowInChunk] =
            StorageUtils::getQuotientRemainder(rowIdx, StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
        if (chunkIdx != loadedChunkIdx) {
            loadedChunkIdx = chunkIdx;
            const auto lock = chunkedGroups.lock();
            chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
        }
        DASSERT(chunkedGroup);
        numSelected += chunkedGroup->lookup(transaction, tableState, nodeGroupScanState, rowInChunk,
            numSelected);
    }
    nodeGroupScanState.nextRowToScan += numRows;
    tableState.outState->getSelVectorUnsafe().setSelSize(numSelected);
    return NodeGroupScanResult{0, numRows};
}
void CSRNodeGroup::appendChunkedCSRGroup(const Transaction* transaction,
const std::vector<column_id_t>& columnIDs, InMemChunkedCSRNodeGroup& chunkedGroup) {
const auto& csrHeader = chunkedGroup.getCSRHeader();
std::vector<const ColumnChunkData*> chunkedGroupForProperties(chunkedGroup.getNumColumns());
for (auto i = 0u; i < chunkedGroup.getNumColumns(); i++) {
chunkedGroupForProperties[i] = &chunkedGroup.getColumnChunk(i);
}
auto startRow = NodeGroup::append(transaction, columnIDs, chunkedGroupForProperties, 0,
chunkedGroup.getNumRows());
if (!csrIndex) {
csrIndex = std::make_unique<CSRIndex>();
}
for (auto i = 0u; i < csrHeader.offset->getNumValues(); i++) {
const auto length = csrHeader.length->getValue<length_t>(i);
updateCSRIndex(i, startRow, length);
startRow += length;
}
}
void CSRNodeGroup::append(const Transaction* transaction, const std::vector<column_id_t>& columnIDs,
offset_t boundOffsetInGroup, std::span<const ColumnChunk*> chunks, row_idx_t startRowInChunks,
row_idx_t numRows) {
const auto startRow =
NodeGroup::append(transaction, columnIDs, chunks, startRowInChunks, numRows);
if (!csrIndex) {
csrIndex = std::make_unique<CSRIndex>();
}
updateCSRIndex(boundOffsetInGroup, startRow, 1 );
}
/**
 * Records that rows [startRow, startRow + length) of the in-memory chunked groups belong to
 * the CSR list of bound node `boundNodeOffsetInGroup`.
 *
 * A list uses one of two representations:
 *  - sequential: rowIndices == {firstRow, numRows}, used while the rows stay contiguous;
 *  - non-sequential: rowIndices holds every row index explicitly and is kept sorted.
 * Appending rows that are not contiguous with a sequential list first expands it into the
 * explicit form.
 */
void CSRNodeGroup::updateCSRIndex(offset_t boundNodeOffsetInGroup, row_idx_t startRow,
    length_t length) const {
    auto& nodeCSRIndex = csrIndex->indices[boundNodeOffsetInGroup];
    const auto isEmptyCSR = nodeCSRIndex.rowIndices.empty();
    // The new rows directly follow the existing contiguous range.
    const auto appendToEndOfCSR =
        !isEmptyCSR && nodeCSRIndex.isSequential &&
        (nodeCSRIndex.rowIndices[0] + nodeCSRIndex.rowIndices[1] == startRow);
    const bool sequential = isEmptyCSR || appendToEndOfCSR;
    if (nodeCSRIndex.isSequential && !sequential) {
        // Expand {firstRow, numRows} into explicit row indices before appending.
        const auto csrListStartRow = nodeCSRIndex.rowIndices[0];
        const auto csrListLength = nodeCSRIndex.rowIndices[1];
        nodeCSRIndex.rowIndices.clear();
        nodeCSRIndex.rowIndices.reserve(csrListLength + length);
        for (auto j = 0u; j < csrListLength; j++) {
            nodeCSRIndex.rowIndices.push_back(csrListStartRow + j);
        }
    }
    if (sequential) {
        nodeCSRIndex.isSequential = true;
        if (!nodeCSRIndex.rowIndices.empty()) {
            // Extend the existing contiguous range.
            DASSERT(appendToEndOfCSR);
            nodeCSRIndex.rowIndices[1] += length;
        } else {
            nodeCSRIndex.rowIndices.resize(2);
            nodeCSRIndex.rowIndices[0] = startRow;
            nodeCSRIndex.rowIndices[1] = length;
        }
    } else {
        nodeCSRIndex.isSequential = false;
        for (auto j = 0u; j < length; j++) {
            nodeCSRIndex.rowIndices.push_back(startRow + j);
        }
        // Keep explicit lists sorted (scanCommittedInMem asserts this).
        std::sort(nodeCSRIndex.rowIndices.begin(), nodeCSRIndex.rowIndices.end());
    }
}
/**
 * Updates column `columnID` of row `rowIdxInGroup` with the value in `propertyVector`.
 *
 * `source` says which storage the row index refers to: the persistent chunked group, or the
 * in-memory chunked groups, where the index is split into a chunked-group index and a row
 * within that group. Any other source is unreachable.
 */
void CSRNodeGroup::update(const Transaction* transaction, CSRNodeGroupScanSource source,
    row_idx_t rowIdxInGroup, column_id_t columnID, const ValueVector& propertyVector) {
    if (source == CSRNodeGroupScanSource::COMMITTED_PERSISTENT) {
        DASSERT(persistentChunkGroup);
        persistentChunkGroup->update(transaction, rowIdxInGroup, columnID, propertyVector);
        return;
    }
    if (source == CSRNodeGroupScanSource::COMMITTED_IN_MEMORY) {
        DASSERT(csrIndex);
        const auto [chunkIdx, rowInChunk] = StorageUtils::getQuotientRemainder(rowIdxInGroup,
            StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
        // The chunked-groups lock stays held for the duration of the update.
        const auto lock = chunkedGroups.lock();
        chunkedGroups.getGroup(lock, chunkIdx)
            ->update(transaction, rowInChunk, columnID, propertyVector);
        return;
    }
    UNREACHABLE_CODE;
}
/**
 * Deletes row `rowIdxInGroup` from the storage selected by `source`, and returns the result of
 * the underlying chunked group's delete_. Returns false for any source other than the committed
 * persistent or committed in-memory data.
 */
bool CSRNodeGroup::delete_(const Transaction* transaction, CSRNodeGroupScanSource source,
    row_idx_t rowIdxInGroup) {
    if (source == CSRNodeGroupScanSource::COMMITTED_PERSISTENT) {
        DASSERT(persistentChunkGroup);
        return persistentChunkGroup->delete_(transaction, rowIdxInGroup);
    }
    if (source != CSRNodeGroupScanSource::COMMITTED_IN_MEMORY) {
        return false;
    }
    DASSERT(csrIndex);
    const auto [chunkIdx, rowInChunk] = StorageUtils::getQuotientRemainder(rowIdxInGroup,
        StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
    // The chunked-groups lock stays held while the row is deleted.
    const auto lock = chunkedGroups.lock();
    return chunkedGroups.getGroup(lock, chunkIdx)->delete_(transaction, rowInChunk);
}
// Adds a new column: first to the persistent chunked group (when one exists), then through
// NodeGroup::addColumn for the rest of the node group.
void CSRNodeGroup::addColumn(TableAddColumnState& addColumnState, PageAllocator* pageAllocator,
    ColumnStats* newColumnStats) {
    if (persistentChunkGroup != nullptr) {
        persistentChunkGroup->addColumn(mm, addColumnState, enableCompression, pageAllocator,
            newColumnStats);
    }
    NodeGroup::addColumn(addColumnState, pageAllocator, newColumnStats);
}
/**
 * Serializes the node group's metadata (index, compression flag, data format) followed by the
 * persistent (checkpointed) chunked group, if any. Only the persistent chunked group's data is
 * written here; in-memory chunked groups and the CSR index are not part of this output.
 * The write order defines the serialized layout, so it presumably must match the
 * corresponding deserialization code.
 */
void CSRNodeGroup::serialize(Serializer& serializer) {
    serializer.writeDebuggingInfo("node_group_idx");
    serializer.write<node_group_idx_t>(nodeGroupIdx);
    serializer.writeDebuggingInfo("enable_compression");
    serializer.write<bool>(enableCompression);
    serializer.writeDebuggingInfo("format");
    serializer.write<NodeGroupDataFormat>(format);
    // Presence flag, so the reader knows whether checkpointed data follows.
    serializer.writeDebuggingInfo("has_checkpointed_data");
    serializer.write<bool>(persistentChunkGroup != nullptr);
    if (persistentChunkGroup) {
        serializer.writeDebuggingInfo("checkpointed_data");
        persistentChunkGroup->serialize(serializer);
    }
}
// Checkpoints the node group while holding the chunked-groups lock. The strategy depends on
// whether persistent data already exists; data types are checkpointed afterwards in both cases.
void CSRNodeGroup::checkpoint(MemoryManager&, NodeGroupCheckpointState& state) {
    const auto lock = chunkedGroups.lock();
    if (persistentChunkGroup) {
        checkpointInMemAndOnDisk(lock, state);
    } else {
        checkpointInMemOnly(lock, state);
    }
    checkpointDataTypesNoLock(state);
}
// Releases this node group's pages: first whatever NodeGroup::reclaimStorage handles, then the
// persistent chunked group's pages, if a persistent chunked group exists.
void CSRNodeGroup::reclaimStorage(PageAllocator& pageAllocator, const UniqLock& lock) const {
    NodeGroup::reclaimStorage(pageAllocator, lock);
    if (persistentChunkGroup == nullptr) {
        return;
    }
    persistentChunkGroup->reclaimStorage(pageAllocator);
}
// Builds a replacement persistent chunked group from `oldPersistentChunkGroup` for the columns
// in csrState.columnIDs, then reclaims the old group's storage through csrState.pageAllocator.
// The replacement is constructed before the old storage is reclaimed.
static std::unique_ptr<ChunkedCSRNodeGroup> createNewPersistentChunkGroup(
    ChunkedCSRNodeGroup& oldPersistentChunkGroup, CSRNodeGroupCheckpointState& csrState) {
    auto replacement =
        std::make_unique<ChunkedCSRNodeGroup>(oldPersistentChunkGroup, csrState.columnIDs);
    oldPersistentChunkGroup.reclaimStorage(csrState.pageAllocator);
    return replacement;
}
/**
 * Checkpoints a node group that already has persistent data, rewriting the on-disk CSR.
 *
 * Steps:
 *  1. Read the old CSR header and start the new header as a copy of it.
 *  2. Collect per-leaf-region changes and CSR lengths (collectLeafRegionsAndCSRLength).
 *  3. Merge the leaf regions into the regions that must be rewritten.
 *  4. Recompute CSR offsets: for the whole group (redistributeCSRRegions) when a single merged
 *     region exceeds the calibrator tree height, otherwise per region.
 *  5. Rewrite each column's regions and the CSR header columns, then replace the persistent
 *     chunked group.
 * If no region needs rewriting, only version/update info is reset. If no rows remain after
 * the checkpoint, the persistent storage is reclaimed and dropped.
 */
void CSRNodeGroup::checkpointInMemAndOnDisk(const UniqLock& lock, NodeGroupCheckpointState& state) {
    auto& csrState = state.cast<CSRNodeGroupCheckpointState>();
    persistentChunkGroup->cast<ChunkedCSRNodeGroup>().scanCSRHeader(*state.mm, csrState);
    csrState.newHeader =
        std::make_unique<InMemChunkedCSRHeader>(*state.mm, false, StorageConfig::NODE_GROUP_SIZE);
    csrState.newHeader->setNumValues(StorageConfig::NODE_GROUP_SIZE);
    csrState.newHeader->copyFrom(*csrState.oldHeader);
    auto leafRegions = collectLeafRegionsAndCSRLength(lock, csrState);
    DASSERT(std::is_sorted(leafRegions.begin(), leafRegions.end(),
        [](const auto& a, const auto& b) { return a.regionIdx < b.regionIdx; }));
    const auto regionsToCheckpoint = mergeRegionsToCheckpoint(csrState, leafRegions);
    if (regionsToCheckpoint.empty()) {
        // Nothing to rewrite. Clear version/update info, and rebuild the persistent group only
        // when the checkpointed column set differs from its current columns.
        // NOTE(review): this path returns without finalizeCheckpoint(lock); presumably there
        // are no in-memory chunked groups to finalize when no region changed — confirm.
        persistentChunkGroup->resetVersionAndUpdateInfo();
        if (csrState.columnIDs.size() != persistentChunkGroup->getNumColumns()) {
            persistentChunkGroup = createNewPersistentChunkGroup(
                persistentChunkGroup->cast<ChunkedCSRNodeGroup>(), csrState);
        }
        return;
    }
    if (regionsToCheckpoint.size() == 1 &&
        regionsToCheckpoint[0].level > DEFAULT_PACKED_CSR_INFO.calibratorTreeHeight) {
        // The merged region climbed past the calibrator tree: recompute offsets for the
        // whole node group from the lengths.
        redistributeCSRRegions(csrState, leafRegions);
    } else {
        for (auto& region : regionsToCheckpoint) {
            csrState.newHeader->populateRegionCSROffsets(region, *csrState.oldHeader);
            // A region's left boundary must not move, so regions outside it stay valid.
            DASSERT(csrState.oldHeader->getStartCSROffset(region.leftNodeOffset) ==
                    csrState.newHeader->getStartCSROffset(region.leftNodeOffset));
        }
    }
    // Sum the new list lengths to detect a node group that became empty.
    uint64_t numTuplesAfterCheckpoint = 0;
    const auto numNodeOffsets = csrState.newHeader->length->getNumValues();
    for (auto i = 0u; i < numNodeOffsets; ++i) {
        numTuplesAfterCheckpoint += csrState.newHeader->getCSRLength(i);
    }
    if (numTuplesAfterCheckpoint == 0) {
        reclaimStorage(csrState.pageAllocator, lock);
        persistentChunkGroup = nullptr;
    } else {
        DASSERT(csrState.newHeader->sanityCheck());
        for (const auto columnID : csrState.columnIDs) {
            checkpointColumn(lock, columnID, csrState, regionsToCheckpoint);
        }
        checkpointCSRHeaderColumns(csrState);
        persistentChunkGroup = createNewPersistentChunkGroup(
            persistentChunkGroup->cast<ChunkedCSRNodeGroup>(), csrState);
    }
    finalizeCheckpoint(lock);
}
std::vector<CSRRegion> CSRNodeGroup::collectLeafRegionsAndCSRLength(const UniqLock& lock,
const CSRNodeGroupCheckpointState& csrState) const {
std::vector<CSRRegion> leafRegions;
constexpr auto numLeafRegions =
StorageConfig::NODE_GROUP_SIZE / StorageConfig::CSR_LEAF_REGION_SIZE;
leafRegions.reserve(numLeafRegions);
for (auto leafRegionIdx = 0u; leafRegionIdx < numLeafRegions; leafRegionIdx++) {
CSRRegion region(leafRegionIdx, 0 );
collectRegionChangesAndUpdateHeaderLength(lock, region, csrState);
leafRegions.push_back(std::move(region));
}
return leafRegions;
}
/**
 * Recomputes the CSR offsets in csrState.newHeader from its list lengths, presumably for every
 * node in the group (used when the checkpoint rebalances the whole group). `leafRegions` is
 * inspected only by debug assertions.
 */
void CSRNodeGroup::redistributeCSRRegions(const CSRNodeGroupCheckpointState& csrState,
    const std::vector<CSRRegion>& leafRegions) {
    DASSERT(std::is_sorted(leafRegions.begin(), leafRegions.end(),
        [](const CSRRegion& lhs, const CSRRegion& rhs) { return lhs.regionIdx < rhs.regionIdx; }));
    DASSERT(std::none_of(leafRegions.begin(), leafRegions.end(),
        [](const CSRRegion& leaf) { return leaf.level != 0; }));
    UNUSED(leafRegions);
    auto& newHeader = *csrState.newHeader;
    const auto regionEndOffsets = newHeader.populateStartCSROffsetsFromLength(true);
    newHeader.populateEndCSROffsetFromStartAndLength();
    newHeader.finalizeCSRRegionEndOffsets(regionEndOffsets);
}
void CSRNodeGroup::checkpointColumn(const UniqLock& lock, column_id_t columnID,
const CSRNodeGroupCheckpointState& csrState, const std::vector<CSRRegion>& regions) const {
std::vector<ChunkCheckpointState> chunkCheckpointStates;
chunkCheckpointStates.reserve(regions.size());
for (auto& region : regions) {
if (!region.needCheckpointColumn(columnID)) {
continue;
}
auto regionCheckpointStates = checkpointColumnInRegion(lock, columnID, csrState, region);
for (auto& regionCheckpointState : regionCheckpointStates) {
chunkCheckpointStates.push_back(std::move(regionCheckpointState));
}
}
persistentChunkGroup->getColumnChunk(columnID).checkpoint(*csrState.columns[columnID],
std::move(chunkCheckpointStates), csrState.pageAllocator);
}
// Position inside the segments of a LazySegmentScanner, paired with the CSR offset that the
// position corresponds to. The iterator and the offset always move together.
struct SegmentCursor {
    SegmentCursor(LazySegmentScanner& scanner, offset_t leftCSROffset)
        : scanner(scanner), it(scanner.begin()), curCSROffset(leftCSROffset) {}

    // Moves the cursor forward by `n` rows.
    void advance(offset_t n) {
        it.advance(n);
        curCSROffset += n;
    }
    void operator++() { advance(1); }

    LazySegmentScanner& scanner;
    LazySegmentScanner::Iterator it;
    offset_t curCSROffset;
};
// Read side of a CSR checkpoint. It walks the old column data of a region and asks the lazy
// scanner to scan a segment only when that segment's data is actually requested.
struct CheckpointReadCursor {
    CheckpointReadCursor(LazySegmentScanner& scanner, offset_t leftCSROffset)
        : cursor(scanner, leftCSROffset) {}

    void advance(offset_t n) { cursor.advance(n); }
    void operator++() { ++cursor; }
    offset_t getCSROffset() const { return cursor.curCSROffset; }

    // Returns the segment data and offset for the current position, scanning the segment
    // first if it has not been loaded yet.
    std::pair<ColumnChunkData*, offset_t> getDataToRead() {
        const bool segmentLoaded = cursor.it->segmentData != nullptr;
        if (!segmentLoaded) {
            cursor.scanner.scanSegmentIfNeeded(cursor.it.segmentIdx);
        }
        return {cursor.it->segmentData.get(), cursor.it.offsetInSegment};
    }
    // True while the current segment has not been loaded.
    bool canSkipRead() const { return cursor.it->segmentData == nullptr; }

    // Invokes `func` for each segment piece covering the next `length` rows.
    template<typename Func>
    void rangeSegments(common::length_t length, Func func) const {
        cursor.scanner.rangeSegments(cursor.it, length, std::move(func));
    }

    SegmentCursor cursor;
};
/**
 * Write side of a CSR checkpoint.
 *
 * The cursor tracks the CSR offset being written (curCSROffset) and accumulates contiguously
 * written values in an in-memory segment. If the cursor has advanced past the end of the current
 * segment without writing (a skipped range), the segment is closed and a new one starts at
 * curCSROffset. Each emitted ChunkCheckpointState therefore describes one contiguous run
 * [segmentStartOffset, segmentStartOffset + numValues).
 *
 * Emitted segments are appended to `outputSegments`. Call finalize() after the last write to
 * flush the pending segment. It may release the current segment, so the cursor must not be
 * written to afterwards.
 */
class CheckpointWriteCursor {
public:
    CheckpointWriteCursor(offset_t leftCSROffset, MemoryManager& memoryManager,
        LogicalType& columnType, std::vector<ChunkCheckpointState>& outputSegments)
        : segmentStartOffset(leftCSROffset), curCSROffset(leftCSROffset),
          memoryManager(memoryManager), columnType(columnType), outputSegments(outputSegments) {
        resetOutputChunk();
    }
    // Moves the write position forward. Callers advance separately from appending data.
    void advance(offset_t n) { curCSROffset += n; }
    void operator++() { advance(1); }
    offset_t getCSROffset() const { return curCSROffset; }
    // Emits the pending segment if it holds any values.
    void finalize() {
        if (currentOutputSegment->getNumValues() > 0) {
            appendCurrentSegmentToOutput();
        }
    }
    // Returns the segment to write `numValuesToWrite` values into at the current offset. A new
    // segment is started when the cursor skipped past the end of the current one, and the
    // capacity is grown (to the next power of two) when needed.
    ColumnChunkData& getCurrentSegmentForWrite(offset_t numValuesToWrite) {
        if (segmentStartOffset + currentOutputSegment->getNumValues() < curCSROffset) {
            startNewSegment();
        }
        if (currentOutputSegment->getNumValues() + numValuesToWrite >
            currentOutputSegment->getCapacity()) {
            currentOutputSegment->resize(
                std::bit_ceil(currentOutputSegment->getNumValues() + numValuesToWrite));
        }
        return *currentOutputSegment;
    }
    // Appends values [srcOffset, srcOffset + numValuesToAppend) of `data` to the current
    // segment. Does not advance the cursor.
    void appendToCurrentSegment(ColumnChunkData* data, offset_t srcOffset,
        offset_t numValuesToAppend) {
        getCurrentSegmentForWrite(numValuesToAppend).append(data, srcOffset, numValuesToAppend);
    }

private:
    offset_t getInitChunkCapacity() const { return DEFAULT_VECTOR_CAPACITY; }
    // Allocates a fresh, empty in-memory segment for the column type.
    void resetOutputChunk() {
        currentOutputSegment = ColumnChunkFactory::createColumnChunkData(memoryManager,
            columnType.copy(), false, getInitChunkCapacity(), ResidencyState::IN_MEMORY);
    }
    // Hands ownership of the current segment to outputSegments, leaving currentOutputSegment
    // empty. The size is read first, so correctness does not depend on when the unique_ptr is
    // actually moved from.
    void appendCurrentSegmentToOutput() {
        const auto numValuesInSegment = currentOutputSegment->getNumValues();
        outputSegments.emplace_back(std::move(currentOutputSegment), segmentStartOffset,
            numValuesInSegment);
    }
    // Closes the current segment (if it holds values) and starts the next one at curCSROffset.
    // An empty segment is simply reused.
    void startNewSegment() {
        if (currentOutputSegment->getNumValues() > 0) {
            appendCurrentSegmentToOutput();
            resetOutputChunk();
        }
        segmentStartOffset = curCSROffset;
    }

    offset_t segmentStartOffset;
    std::unique_ptr<ColumnChunkData> currentOutputSegment;
    offset_t curCSROffset;
    MemoryManager& memoryManager;
    LogicalType& columnType;
    std::vector<ChunkCheckpointState>& outputSegments;
};
// A write can be skipped when both cursors are at the same CSR offset and the old segment at
// that position was never loaded by the lazy scanner. Presumably such data is already correct
// in place on disk — confirm against LazySegmentScanner.
static bool canSkipWrite(CheckpointReadCursor& readCursor, CheckpointWriteCursor& writeCursor) {
    if (readCursor.getCSROffset() != writeCursor.getCSROffset()) {
        return false;
    }
    return readCursor.canSkipRead();
}
// Initializes a scan state for `persistentChunk` and scans `numRowsToScan` committed rows,
// starting at `startCSROffset`, into `scanner`. The scan state is returned for reuse.
static ChunkState scanCommittedUpdates(const Transaction* transaction, ColumnChunk& persistentChunk,
    Column* column, LazySegmentScanner& scanner, offset_t startCSROffset, offset_t numRowsToScan) {
    ChunkState scanState;
    persistentChunk.initializeScanState(scanState, column);
    persistentChunk.scanCommitted<ResidencyState::ON_DISK>(transaction, scanState, scanner,
        startCSROffset, numRowsToScan);
    return scanState;
}
// Copies an old CSR list of `oldCSRLength` rows that has no persistent deletions, one segment
// piece at a time. Pieces that can be left in place are skipped; both cursors always advance.
static void writeCSRListNoPersistentDeletions(CheckpointReadCursor& readCursor,
    CheckpointWriteCursor& writeCursor, offset_t oldCSRLength) {
    auto copyPiece = [&](auto& segment, auto startInSegment, auto numInSegment, auto) {
        if (!canSkipWrite(readCursor, writeCursor)) {
            // getDataToRead() may trigger the lazy scan; it must point at the same data
            // that rangeSegments handed us.
            [[maybe_unused]] auto [dataToRead, offsetToRead] = readCursor.getDataToRead();
            DASSERT(dataToRead == segment.segmentData.get() && offsetToRead == startInSegment);
            writeCursor.appendToCurrentSegment(segment.segmentData.get(), startInSegment,
                numInSegment);
        }
        readCursor.advance(numInSegment);
        writeCursor.advance(numInSegment);
    };
    readCursor.rangeSegments(oldCSRLength, copyPiece);
}
// Copies an old CSR list of `oldCSRLength` rows row by row, dropping rows that are deleted in
// the persistent chunked group. The read cursor visits every old row; the write cursor only
// advances for surviving rows.
static void writeCSRListWithPersistentDeletions(const Transaction* transaction,
    CheckpointReadCursor& readCursor, CheckpointWriteCursor& writeCursor, offset_t oldCSRLength,
    const ChunkedNodeGroup& persistentChunkGroup) {
    for (auto i = 0u; i < oldCSRLength; ++i, ++readCursor) {
        if (persistentChunkGroup.isDeleted(transaction, readCursor.getCSROffset())) {
            continue;
        }
        if (!canSkipWrite(readCursor, writeCursor)) {
            const auto [sourceData, sourceOffset] = readCursor.getDataToRead();
            writeCursor.appendToCurrentSegment(sourceData, sourceOffset, 1);
        }
        ++writeCursor;
    }
}
// Writes one in-memory inserted row (`rowInChunk` of `chunkedGroup`, column `columnID`) at the
// write cursor's position, then advances the cursor by one.
static void writeInMemoryCSRInsertion(const Transaction* transaction,
    CheckpointWriteCursor& writeCursor, const ChunkedNodeGroup& chunkedGroup, row_idx_t rowInChunk,
    column_id_t columnID, ChunkState& chunkState) {
    DASSERT(!chunkedGroup.isDeleted(transaction, rowInChunk));
    auto& sourceChunk = chunkedGroup.getColumnChunk(columnID);
    auto& targetSegment = writeCursor.getCurrentSegmentForWrite(1);
    sourceChunk.scanCommitted<ResidencyState::IN_MEMORY>(transaction, chunkState, targetSegment,
        rowInChunk, 1);
    ++writeCursor;
}
/**
 * Moves both cursors past the gap that follows a bound node's CSR list.
 *
 * @param dummyChunkForNulls all-null chunk used as the source for gap slots that must be
 *        written; expected to hold at least DEFAULT_VECTOR_CAPACITY values.
 * @param numOldGaps gap size after this node in the old layout (read side).
 * @param numGaps    gap size after this node in the new layout (write side).
 *
 * While the read cursor lags behind the write cursor, old gap slots are consumed to catch up.
 * Once the cursors are aligned, new gap slots that overlap old gap slots are skipped without
 * writing. Remaining new gap slots are written as nulls in batches of at most
 * DEFAULT_VECTOR_CAPACITY. Finally, the read cursor skips whatever is left of the old gap.
 */
static void fillCSRGaps(CheckpointReadCursor& readCursor, CheckpointWriteCursor& writeCursor,
    ColumnChunkData* dummyChunkForNulls, length_t numOldGaps, length_t numGaps) {
    auto numOldGapsRemaining = numOldGaps;
    auto numGapsRemaining = numGaps;
    // Let the read cursor catch up using old gap slots (never more than the old gap).
    if (readCursor.getCSROffset() < writeCursor.getCSROffset()) {
        const auto numGapsToAdvance =
            std::min(numOldGapsRemaining, writeCursor.getCSROffset() - readCursor.getCSROffset());
        readCursor.advance(numGapsToAdvance);
        numOldGapsRemaining -= numGapsToAdvance;
    }
    // Aligned: new gap slots that overlap old gap slots need no write. Only the write cursor
    // moves here; the read cursor skips the old gap at the end.
    if (readCursor.getCSROffset() == writeCursor.getCSROffset()) {
        auto numSkippableGaps = std::min(numGapsRemaining, numOldGapsRemaining);
        numGapsRemaining -= numSkippableGaps;
        writeCursor.advance(numSkippableGaps);
    }
    // Write the remaining new gap slots as nulls.
    while (numGapsRemaining > 0) {
        const auto numGapsToFill =
            std::min(numGapsRemaining, static_cast<length_t>(DEFAULT_VECTOR_CAPACITY));
        dummyChunkForNulls->setNumValues(numGapsToFill);
        writeCursor.appendToCurrentSegment(dummyChunkForNulls, 0, numGapsToFill);
        writeCursor.advance(numGapsToFill);
        numGapsRemaining -= numGapsToFill;
    }
    readCursor.advance(numOldGapsRemaining);
}
/**
 * Produces the rewritten segments of column `columnID` for one CSR region.
 *
 * The old rows of the region [oldStart(leftNodeOffset), oldEnd(rightNodeOffset)) are scanned
 * lazily through scanCommittedUpdates. For each bound node in the region, the new list is
 * written as:
 *  1. the old rows (dropping persistently deleted rows when the region has deletions);
 *  2. the node's in-memory inserted rows from csrIndex;
 *  3. the node's new gap, written as nulls where old data cannot be reused.
 * Ranges where unloaded old data already sits at the same CSR offset are skipped, so the
 * returned ChunkCheckpointStates cover only the parts that actually have to be written.
 */
std::vector<ChunkCheckpointState> CSRNodeGroup::checkpointColumnInRegion(const UniqLock& lock,
    column_id_t columnID, const CSRNodeGroupCheckpointState& csrState,
    const CSRRegion& region) const {
    // Fall back to the dummy checkpoint transaction when the state carries none.
    const auto* txn = csrState.transaction ? csrState.transaction : &DUMMY_CHECKPOINT_TRANSACTION;
    const auto leftCSROffset = csrState.oldHeader->getStartCSROffset(region.leftNodeOffset);
    // The region's left boundary is the same in the old and new layouts.
    DASSERT(leftCSROffset == csrState.newHeader->getStartCSROffset(region.leftNodeOffset));
    const auto rightCSROffset = csrState.oldHeader->getEndCSROffset(region.rightNodeOffset);
    const auto numOldRowsInRegion = rightCSROffset - leftCSROffset;
    Column* column = csrState.columns[columnID];
    LazySegmentScanner oldChunkScanner{*csrState.mm, column->getDataType().copy(),
        enableCompression};
    auto chunkState = scanCommittedUpdates(txn, persistentChunkGroup->getColumnChunk(columnID),
        column, oldChunkScanner, leftCSROffset, numOldRowsInRegion);
    // All-null source used to fill new gap slots.
    const auto dummyChunkForNulls = ColumnChunkFactory::createColumnChunkData(*csrState.mm,
        dataTypes[columnID].copy(), false, DEFAULT_VECTOR_CAPACITY, ResidencyState::IN_MEMORY);
    dummyChunkForNulls->resetToAllNull();
    std::vector<ChunkCheckpointState> ret;
    CheckpointReadCursor readCursor{oldChunkScanner, leftCSROffset};
    CheckpointWriteCursor writeCursor{leftCSROffset, *csrState.mm, column->getDataType(), ret};
    // rightNodeOffset is inclusive.
    for (auto nodeOffset = region.leftNodeOffset; nodeOffset <= region.rightNodeOffset;
        nodeOffset++) {
        const auto oldCSRLength = csrState.oldHeader->getCSRLength(nodeOffset);
        DASSERT(csrState.newHeader->getStartCSROffset(nodeOffset) == writeCursor.getCSROffset());
        DASSERT(csrState.oldHeader->getStartCSROffset(nodeOffset) == readCursor.getCSROffset());
        // 1. Old rows of this node.
        if (!region.hasPersistentDeletions) {
            writeCSRListNoPersistentDeletions(readCursor, writeCursor, oldCSRLength);
        } else {
            writeCSRListWithPersistentDeletions(txn, readCursor, writeCursor, oldCSRLength,
                *persistentChunkGroup);
        }
        // 2. In-memory inserted rows of this node; INVALID_ROW_IDX entries are skipped.
        if (csrIndex) {
            auto rows = csrIndex->indices[nodeOffset].getRows();
            for (const auto row : rows) {
                if (row == INVALID_ROW_IDX) {
                    continue;
                }
                auto [chunkIdx, rowInChunk] = StorageUtils::getQuotientRemainder(row,
                    StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
                const auto chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
                writeInMemoryCSRInsertion(txn, writeCursor, *chunkedGroup, rowInChunk, columnID,
                    chunkState);
            }
        }
        // 3. Gap after this node; new gaps are only expected at the end of a leaf region.
        const length_t numGaps = csrState.newHeader->getGapSize(nodeOffset);
        const length_t numOldGaps = csrState.oldHeader->getGapSize(nodeOffset);
        DASSERT(numGaps == 0 || (nodeOffset == region.rightNodeOffset - 1) ||
                (nodeOffset + 1) % StorageConfig::CSR_LEAF_REGION_SIZE == 0);
        fillCSRGaps(readCursor, writeCursor, dummyChunkForNulls.get(), numOldGaps, numGaps);
    }
    writeCursor.finalize();
    DASSERT(readCursor.getCSROffset() - leftCSROffset == numOldRowsInRegion);
    DASSERT(
        writeCursor.getCSROffset() == csrState.newHeader->getEndCSROffset(region.rightNodeOffset));
    DASSERT(readCursor.getCSROffset() == writeCursor.getCSROffset() || ret.empty() ||
            ret.back().startRow + ret.back().numRows == writeCursor.getCSROffset());
    return ret;
}
void CSRNodeGroup::checkpointCSRHeaderColumns(const CSRNodeGroupCheckpointState& csrState) const {
std::vector<ChunkCheckpointState> csrOffsetChunkCheckpointStates;
const auto numNodes = csrState.newHeader->offset->getNumValues();
DASSERT(numNodes == csrState.newHeader->length->getNumValues());
csrOffsetChunkCheckpointStates.push_back(
ChunkCheckpointState{std::move(csrState.newHeader->offset), 0, numNodes});
persistentChunkGroup->cast<ChunkedCSRNodeGroup>().getCSRHeader().offset->checkpoint(
*csrState.csrOffsetColumn, std::move(csrOffsetChunkCheckpointStates),
csrState.pageAllocator);
std::vector<ChunkCheckpointState> csrLengthChunkCheckpointStates;
csrLengthChunkCheckpointStates.push_back(
ChunkCheckpointState{std::move(csrState.newHeader->length), 0, numNodes});
persistentChunkGroup->cast<ChunkedCSRNodeGroup>().getCSRHeader().length->checkpoint(
*csrState.csrLengthColumn, std::move(csrLengthChunkCheckpointStates),
csrState.pageAllocator);
}
/**
 * Collects in-memory and on-disk changes for `region` and records the resulting per-node CSR
 * lengths in csrState.newHeader.
 *
 * The order of the two calls matters: the in-memory pass sets newHeader lengths to
 * (old length + surviving in-memory insertions) when a csrIndex exists, and the on-disk pass
 * then subtracts persistent deletions from whatever length newHeader currently holds.
 */
void CSRNodeGroup::collectRegionChangesAndUpdateHeaderLength(const UniqLock& lock,
CSRRegion& region, const CSRNodeGroupCheckpointState& csrState) const {
collectInMemRegionChangesAndUpdateHeaderLength(lock, region, csrState);
collectOnDiskRegionChangesAndUpdateHeaderLength(lock, region, csrState);
}
void CSRNodeGroup::collectInMemRegionChangesAndUpdateHeaderLength(const UniqLock& lock,
CSRRegion& region, const CSRNodeGroupCheckpointState& csrState) const {
const auto* txn = csrState.transaction ? csrState.transaction : &DUMMY_CHECKPOINT_TRANSACTION;
row_idx_t numInsertionsInRegion = 0u;
if (csrIndex) {
for (auto nodeOffset = region.leftNodeOffset; nodeOffset <= region.rightNodeOffset;
nodeOffset++) {
auto rows = csrIndex->indices[nodeOffset].getRows();
row_idx_t numInsertedRows = rows.size();
row_idx_t numInMemDeletionsInCSR = 0;
for (auto i = 0u; i < rows.size(); i++) {
const auto row = rows[i];
auto [chunkIdx, rowInChunk] = StorageUtils::getQuotientRemainder(row,
StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
const auto chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
if (chunkedGroup->isDeleted(txn, rowInChunk)) {
csrIndex->indices[nodeOffset].turnToNonSequential();
csrIndex->indices[nodeOffset].setInvalid(i);
numInMemDeletionsInCSR++;
}
}
DASSERT(numInMemDeletionsInCSR <= numInsertedRows);
numInsertedRows -= numInMemDeletionsInCSR;
const auto oldLength = csrState.oldHeader->getCSRLength(nodeOffset);
const auto newLength = oldLength + numInsertedRows;
csrState.newHeader->length->setValue<length_t>(newLength, nodeOffset);
numInsertionsInRegion += numInsertedRows;
}
}
region.hasInsertions = numInsertionsInRegion > 0;
region.sizeChange += static_cast<int64_t>(numInsertionsInRegion);
}
/**
 * On-disk half of region change collection.
 *
 * Records per-column persistent updates for the region (collectPersistentUpdatesInRegion).
 * When persistent data exists, it then subtracts each node's persistent deletions from the
 * length currently stored in csrState.newHeader. The region's persistent-deletion flag and size
 * change are updated from the total number of deletions.
 */
void CSRNodeGroup::collectOnDiskRegionChangesAndUpdateHeaderLength(const UniqLock&,
    CSRRegion& region, const CSRNodeGroupCheckpointState& csrState) const {
    collectPersistentUpdatesInRegion(region, csrState);
    int64_t totalPersistentDeletions = 0;
    if (persistentChunkGroup) {
        for (auto nodeOffset = region.leftNodeOffset; nodeOffset <= region.rightNodeOffset;
             nodeOffset++) {
            const auto deletedForNode =
                getNumDeletionsForNodeInPersistentData(nodeOffset, csrState);
            if (deletedForNode > 0) {
                const auto lengthBefore = csrState.newHeader->getCSRLength(nodeOffset);
                DASSERT(lengthBefore >= deletedForNode);
                csrState.newHeader->length->setValue<length_t>(lengthBefore - deletedForNode,
                    nodeOffset);
                totalPersistentDeletions += deletedForNode;
            }
        }
    }
    region.hasPersistentDeletions = totalPersistentDeletions > 0;
    region.sizeChange -= totalPersistentDeletions;
}
/**
 * Records, for each checkpointed column, whether the region's persistent rows have any updates
 * visible to the checkpoint transaction.
 *
 * The region's old rows occupy [getStartCSROffset(leftNodeOffset),
 * getEndCSROffset(rightNodeOffset)) in the old header. The end offset is exclusive, which is
 * consistent with checkpointColumnInRegion and isWithinDensityBound computing row counts as
 * (end - start). Results are stored in region.hasUpdates, indexed like csrState.columnIDs.
 * Without a persistent chunk group there can be no persistent updates, so every flag stays false.
 */
void CSRNodeGroup::collectPersistentUpdatesInRegion(CSRRegion& region,
    const CSRNodeGroupCheckpointState& csrState) const {
    region.hasUpdates.resize(csrState.columnIDs.size(), false);
    if (!persistentChunkGroup) {
        return;
    }
    const auto* txn = csrState.transaction ? csrState.transaction : &DUMMY_CHECKPOINT_TRANSACTION;
    const auto leftCSROffset = csrState.oldHeader->getStartCSROffset(region.leftNodeOffset);
    const auto rightCSROffset = csrState.oldHeader->getEndCSROffset(region.rightNodeOffset);
    // Exclusive end: no "+ 1", otherwise the first row of the next region would be inspected
    // and could flag this region as updated.
    const auto numRowsInRegion = rightCSROffset - leftCSROffset;
    for (auto i = 0u; i < csrState.columnIDs.size(); i++) {
        const auto columnID = csrState.columnIDs[i];
        if (persistentChunkGroup->hasAnyUpdates(txn, columnID, leftCSROffset, numRowsInRegion)) {
            region.hasUpdates[i] = true;
        }
    }
}
/**
 * Returns how many of the node's old persistent rows, located via the old CSR header
 * (start offset + length), are deleted as seen by the checkpoint transaction.
 */
row_idx_t CSRNodeGroup::getNumDeletionsForNodeInPersistentData(offset_t nodeOffset,
    const CSRNodeGroupCheckpointState& csrState) const {
    const auto& oldHeader = *csrState.oldHeader;
    const auto* checkpointTxn =
        csrState.transaction ? csrState.transaction : &DUMMY_CHECKPOINT_TRANSACTION;
    const auto firstRow = oldHeader.getStartCSROffset(nodeOffset);
    const auto numRowsOfNode = oldHeader.getCSRLength(nodeOffset);
    return persistentChunkGroup->getNumDeletions(checkpointTxn, firstRow, numRowsOfNode);
}
// Builds a DataChunk holding one freshly allocated ValueVector per checkpointed column (in
// csrState.columnIDs order, typed from dataTypes[columnID]), all sharing one new DataChunkState.
static DataChunk initScanDataChunk(const CSRNodeGroupCheckpointState& csrState,
    const std::vector<LogicalType>& dataTypes) {
    const auto& columnIDs = csrState.columnIDs;
    DataChunk dataChunk(columnIDs.size(), std::make_shared<DataChunkState>());
    for (auto pos = 0u; pos < columnIDs.size(); pos++) {
        DASSERT(columnIDs[pos] < dataTypes.size());
        dataChunk.insert(pos,
            std::make_shared<ValueVector>(dataTypes[columnIDs[pos]].copy(), csrState.mm));
    }
    return dataChunk;
}
void CSRNodeGroup::checkpointInMemOnly(const UniqLock& lock, NodeGroupCheckpointState& state) {
const auto* txn = state.transaction ? state.transaction : &DUMMY_CHECKPOINT_TRANSACTION;
auto numRels = 0u;
for (auto& chunkedGroup : chunkedGroups.getAllGroups(lock)) {
numRels += chunkedGroup->getNumRows();
}
if (numRels == 0) {
return;
}
auto& csrState = state.cast<CSRNodeGroupCheckpointState>();
csrState.newHeader = std::make_unique<InMemChunkedCSRHeader>(*state.mm,
false , StorageConfig::NODE_GROUP_SIZE);
const auto numNodes = csrIndex->getMaxOffsetWithRels() + 1;
csrState.newHeader->setNumValues(numNodes);
populateCSRLengthInMemOnly(lock, numNodes, csrState);
const auto rightCSROffsetsOfRegions =
csrState.newHeader->populateStartCSROffsetsFromLength(true );
csrState.newHeader->populateEndCSROffsetFromStartAndLength();
csrState.newHeader->finalizeCSRRegionEndOffsets(rightCSROffsetsOfRegions);
const auto numColumnsToCheckpoint = csrState.columnIDs.size();
auto scanChunk = initScanDataChunk(csrState, dataTypes);
std::vector<const Column*> columns(numColumnsToCheckpoint);
for (auto i = 0u; i < numColumnsToCheckpoint; i++) {
columns[i] = csrState.columns[i];
}
std::vector<ValueVector*> outVectors;
for (auto i = 0u; i < numColumnsToCheckpoint; i++) {
outVectors.push_back(scanChunk.valueVectors[i].get());
}
auto scanState = std::make_unique<TableScanState>(nullptr, outVectors, scanChunk.state);
scanState->columnIDs = csrState.columnIDs;
scanState->columns = columns;
scanState->nodeGroupScanState =
std::make_unique<CSRNodeGroupScanState>(csrState.columnIDs.size());
auto dummyChunk = initScanDataChunk(csrState, dataTypes);
for (auto i = 0u; i < dummyChunk.getNumValueVectors(); i++) {
dummyChunk.getValueVectorMutable(i).setAllNull();
}
auto chunkCapacity = rightCSROffsetsOfRegions.back() + 1;
std::vector<std::unique_ptr<ColumnChunk>> dataChunksToFlush(numColumnsToCheckpoint);
for (auto i = 0u; i < numColumnsToCheckpoint; i++) {
const auto columnID = csrState.columnIDs[i];
DASSERT(columnID < dataTypes.size());
dataChunksToFlush[i] = std::make_unique<ColumnChunk>(*state.mm, dataTypes[columnID].copy(),
chunkCapacity, enableCompression, ResidencyState::IN_MEMORY);
}
for (auto offset = 0u; offset < numNodes; offset++) {
const auto numRows = csrIndex->getNumRows(offset);
auto rows = csrIndex->indices[offset].getRows();
auto numRowsTryAppended = 0u;
while (numRowsTryAppended < numRows) {
const auto maxNumRowsToAppend =
std::min(numRows - numRowsTryAppended, DEFAULT_VECTOR_CAPACITY);
auto numRowsToAppend = 0u;
for (auto i = 0u; i < maxNumRowsToAppend; i++) {
const auto row = rows[numRowsTryAppended + i];
if (row == INVALID_ROW_IDX) {
continue;
}
scanState->rowIdxVector->setValue<row_idx_t>(numRowsToAppend++, row);
}
scanChunk.state->getSelVectorUnsafe().setSelSize(numRowsToAppend);
if (numRowsToAppend > 0) {
[[maybe_unused]] auto res = lookupMultiple(lock, txn, *scanState);
for (auto idx = 0u; idx < numColumnsToCheckpoint; idx++) {
dataChunksToFlush[idx]->append(scanChunk.valueVectors[idx].get(),
scanChunk.state->getSelVector());
}
}
numRowsTryAppended += maxNumRowsToAppend;
}
auto gapSize = csrState.newHeader->getGapSize(offset);
while (gapSize > 0) {
DASSERT((offset == numNodes - 1) ||
(offset + 1) % StorageConfig::CSR_LEAF_REGION_SIZE == 0);
const auto numGapsToAppend = std::min(gapSize, DEFAULT_VECTOR_CAPACITY);
DASSERT(dummyChunk.state->getSelVector().isUnfiltered());
dummyChunk.state->getSelVectorUnsafe().setSelSize(numGapsToAppend);
for (auto columnID = 0u; columnID < numColumnsToCheckpoint; columnID++) {
dataChunksToFlush[columnID]->append(dummyChunk.valueVectors[columnID].get(),
dummyChunk.state->getSelVector());
}
gapSize -= numGapsToAppend;
}
}
for (const auto& chunk : dataChunksToFlush) {
chunk->flush(csrState.pageAllocator);
}
csrState.newHeader->offset->flush(csrState.pageAllocator);
csrState.newHeader->length->flush(csrState.pageAllocator);
persistentChunkGroup = std::make_unique<ChunkedCSRNodeGroup>(
ChunkedCSRHeader(false , std::move(*csrState.newHeader)),
std::move(dataChunksToFlush), 0);
chunkedGroups.clear(lock);
numRows = 0;
csrIndex.reset();
}
void CSRNodeGroup::populateCSRLengthInMemOnly(const UniqLock& lock, offset_t numNodes,
const CSRNodeGroupCheckpointState& csrState) {
const auto* txn = csrState.transaction ? csrState.transaction : &DUMMY_CHECKPOINT_TRANSACTION;
for (auto offset = 0u; offset < numNodes; offset++) {
auto rows = csrIndex->indices[offset].getRows();
const length_t length = rows.size();
auto lengthAfterDelete = length;
for (auto i = 0u; i < rows.size(); i++) {
const auto row = rows[i];
auto [chunkIdx, rowInChunk] =
StorageUtils::getQuotientRemainder(row, StorageConfig::CHUNKED_NODE_GROUP_CAPACITY);
const auto chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
const auto isDeleted = chunkedGroup->isDeleted(txn, rowInChunk);
if (isDeleted) {
csrIndex->indices[offset].turnToNonSequential();
csrIndex->indices[offset].setInvalid(i);
lengthAfterDelete--;
}
}
DASSERT(lengthAfterDelete <= length);
csrState.newHeader->length->setValue<length_t>(lengthAfterDelete, offset);
}
}
std::vector<CSRRegion> CSRNodeGroup::mergeRegionsToCheckpoint(
const CSRNodeGroupCheckpointState& csrState, const std::vector<CSRRegion>& leafRegions) {
DASSERT(std::all_of(leafRegions.begin(), leafRegions.end(),
[](const CSRRegion& region) { return region.level == 0; }));
DASSERT(std::is_sorted(leafRegions.begin(), leafRegions.end(),
[](const CSRRegion& a, const CSRRegion& b) { return a.regionIdx < b.regionIdx; }));
constexpr auto numLeafRegions =
StorageConfig::NODE_GROUP_SIZE / StorageConfig::CSR_LEAF_REGION_SIZE;
DASSERT(leafRegions.size() == numLeafRegions);
std::vector<CSRRegion> mergedRegions;
idx_t leafRegionIdx = 0u;
while (leafRegionIdx < numLeafRegions) {
auto region = leafRegions[leafRegionIdx];
if (!region.needCheckpoint()) {
leafRegionIdx++;
continue;
}
while (!isWithinDensityBound(*csrState.oldHeader, leafRegions, region)) {
region = CSRRegion::upgradeLevel(leafRegions, region);
if (region.level > DEFAULT_PACKED_CSR_INFO.calibratorTreeHeight) {
return {region};
}
}
leafRegionIdx = region.getRightLeafRegionIdx() + 1;
std::erase_if(mergedRegions, [&](const CSRRegion& r) { return r.isWithin(region); });
mergedRegions.push_back(region);
}
std::sort(mergedRegions.begin(), mergedRegions.end(),
[](const CSRRegion& a, const CSRRegion& b) {
return a.getLeftLeafRegionIdx() < b.getLeftLeafRegionIdx();
});
return mergedRegions;
}
// Upper density bound for a region at `level`. Leaves (level 0) use LEAF_HIGH_CSR_DENSITY.
// Higher levels use PACKED_CSR_DENSITY plus one highDensityStep for every level between
// `level` and the top of the calibrator tree.
static double getHighDensity(uint64_t level) {
    const auto& packedCSRInfo = CSRNodeGroup::DEFAULT_PACKED_CSR_INFO;
    DASSERT(level <= packedCSRInfo.calibratorTreeHeight);
    if (level != 0) {
        const auto levelsBelowTop =
            static_cast<double>(packedCSRInfo.calibratorTreeHeight - level);
        return StorageConstants::PACKED_CSR_DENSITY +
               packedCSRInfo.highDensityStep * levelsBelowTop;
    }
    return StorageConstants::LEAF_HIGH_CSR_DENSITY;
}
/**
 * Checks whether `region` would stay within the high-density bound for its level after the
 * pending changes are applied.
 *
 * oldSize is the sum of the region's node lengths in `header`. The size change is the sum of
 * sizeChange over the leaf regions the region covers. Capacity is the region's CSR span
 * (end offset of the right node minus start offset of the left node), which includes gaps.
 *
 * @param header Old CSR header.
 * @param leafRegions All leaf regions, indexed by leaf region index.
 * @param region Region to evaluate (any level).
 * @return true when newSize / capacity <= getHighDensity(region.level). A zero capacity makes
 *         the division yield NaN or +inf, so the comparison is false and the region counts as
 *         over-dense.
 */
bool CSRNodeGroup::isWithinDensityBound(const InMemChunkedCSRHeader& header,
    const std::vector<CSRRegion>& leafRegions, const CSRRegion& region) {
    int64_t oldSize = 0;
    for (auto offset = region.leftNodeOffset; offset <= region.rightNodeOffset; offset++) {
        oldSize += header.getCSRLength(offset);
    }
    int64_t sizeChange = 0;
    const idx_t leftRegionIdx = region.getLeftLeafRegionIdx();
    const idx_t rightRegionIdx = region.getRightLeafRegionIdx();
    for (auto regionIdx = leftRegionIdx; regionIdx <= rightRegionIdx; regionIdx++) {
        sizeChange += leafRegions[regionIdx].sizeChange;
    }
    const auto newSize = oldSize + sizeChange;
    // Deletions can never remove more rows than the region holds. The previous check
    // `sizeChange >= 0 || sizeChange < oldSize` was vacuous: it held for every negative change.
    DASSERT(newSize >= 0);
    const auto capacity = header.getEndCSROffset(region.rightNodeOffset) -
                          header.getStartCSROffset(region.leftNodeOffset);
    const double ratio = static_cast<double>(newSize) / static_cast<double>(capacity);
    return ratio <= getHighDensity(region.level);
}
void CSRNodeGroup::finalizeCheckpoint(const UniqLock& lock) {
if (persistentChunkGroup) {
persistentChunkGroup->resetNumRowsFromChunks();
persistentChunkGroup->resetVersionAndUpdateInfo();
}
chunkedGroups.clear(lock);
numRows = 0;
csrIndex.reset();
}
} }