#include "storage/table/ice_disk_rel_table.h"
#include <filesystem>
#include <queue>
#include "catalog/catalog_entry/rel_group_catalog_entry.h"
#include "common/data_chunk/sel_vector.h"
#include "common/exception/runtime.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_utils.h"
#include "main/client_context.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"
#include "storage/storage_manager.h"
#include "storage/table/ice_disk_utils.h"
#include "transaction/transaction.h"
using namespace lbug::catalog;
using namespace lbug::common;
using namespace lbug::processor;
using namespace lbug::transaction;
namespace lbug {
namespace storage {
void IceDiskRelTableScanState::setToTable(const Transaction* transaction, Table* table_,
std::vector<column_id_t> columnIDs_, std::vector<ColumnPredicateSet> columnPredicateSets_,
RelDataDirection direction_) {
TableScanState::setToTable(transaction, table_, std::move(columnIDs_),
std::move(columnPredicateSets_));
columns.resize(columnIDs.size());
direction = direction_;
for (size_t i = 0; i < columnIDs.size(); ++i) {
auto columnID = columnIDs[i];
if (columnID == INVALID_COLUMN_ID || columnID == ROW_IDX_COLUMN_ID) {
columns[i] = nullptr;
} else {
columns[i] = table->cast<RelTable>().getColumn(columnID, direction);
}
}
csrOffsetColumn = table->cast<RelTable>().getCSROffsetColumn(direction);
csrLengthColumn = table->cast<RelTable>().getCSRLengthColumn(direction);
nodeGroupIdx = INVALID_NODE_GROUP_IDX;
}
void IceDiskRelTableScanState::reloadCachedBatchData(Transaction* transaction) {
auto context = transaction->getClientContext();
auto numIndicesColumns = indicesReader->getNumColumns();
cachedBatchData = std::make_unique<DataChunk>(numIndicesColumns);
auto memoryManager = MemoryManager::Get(*context);
for (uint32_t colIdx = 0; colIdx < numIndicesColumns; ++colIdx) {
const auto& columnTypeRef = indicesReader->getColumnType(colIdx);
auto columnType = columnTypeRef.copy();
auto vector = std::make_shared<ValueVector>(std::move(columnType), memoryManager);
cachedBatchData->insert(colIdx, vector);
}
indicesReader->scan(*parquetScanState, *cachedBatchData);
}
IceDiskRelTable::IceDiskRelTable(RelGroupCatalogEntry* relGroupEntry, table_id_t fromTableID,
table_id_t toTableID, const StorageManager* storageManager, MemoryManager* memoryManager,
main::ClientContext* context)
: ColumnarRelTableBase{relGroupEntry, fromTableID, toTableID, storageManager, memoryManager},
layout{IceDiskRelTableLayout::CSR} {
const auto& storage = relGroupEntry->getStorage();
if (common::StringUtils::getLower(storage).ends_with("parquet")) {
layout = IceDiskRelTableLayout::FLAT;
auto resolvedFlatPath = VirtualFileSystem::resolvePath(context, storage);
IceDiskUtils::checkVersionCompatibility(context, resolvedFlatPath);
indicesFilePath = resolvedFlatPath;
return;
}
auto paths = IceDiskUtils::constructCSRPaths(storage, relGroupEntry->getName(), ".parquet");
auto resolvedIndicesPath = VirtualFileSystem::resolvePath(context, paths.indices);
IceDiskUtils::checkVersionCompatibility(context, resolvedIndicesPath);
auto resolvedIndptrPath = VirtualFileSystem::resolvePath(context, paths.indptr);
IceDiskUtils::checkVersionCompatibility(context, resolvedIndptrPath);
indicesFilePath = resolvedIndicesPath;
indptrFilePath = resolvedIndptrPath;
}
void IceDiskRelTable::initScanState(Transaction* transaction, TableScanState& scanState,
bool resetCachedBoundNodeSelVec) const {
auto& relScanState = scanState.cast<RelTableScanState>();
relScanState.source = TableScanSource::COMMITTED;
relScanState.nodeGroup = nullptr;
relScanState.nodeGroupIdx = INVALID_NODE_GROUP_IDX;
if (resetCachedBoundNodeSelVec) {
if (relScanState.nodeIDVector->state->getSelVector().isUnfiltered()) {
relScanState.cachedBoundNodeSelVector.setToUnfiltered();
} else {
relScanState.cachedBoundNodeSelVector.setToFiltered();
memcpy(relScanState.cachedBoundNodeSelVector.getMutableBuffer().data(),
relScanState.nodeIDVector->state->getSelVector().getMutableBuffer().data(),
relScanState.nodeIDVector->state->getSelVector().getSelSize() * sizeof(sel_t));
}
relScanState.cachedBoundNodeSelVector.setSelSize(
relScanState.nodeIDVector->state->getSelVector().getSelSize());
}
auto context = transaction->getClientContext();
auto vfs = VirtualFileSystem::GetUnsafe(*context);
auto& iceDiskScanState = static_cast<IceDiskRelTableScanState&>(relScanState);
if (!iceDiskScanState.indicesReader) {
iceDiskScanState.indicesReader =
std::make_unique<ParquetReader>(indicesFilePath, std::vector<bool>{}, context);
}
if (layout == IceDiskRelTableLayout::CSR && !iceDiskScanState.indptrReader) {
iceDiskScanState.indptrReader =
std::make_unique<ParquetReader>(indptrFilePath, std::vector<bool>{}, context);
}
if (layout == IceDiskRelTableLayout::CSR) {
loadIndptrData(transaction);
}
auto numRowGroups = iceDiskScanState.indicesReader->getNumRowGroups();
std::vector<uint64_t> rowGroupsToProcess;
for (uint64_t i = 0; i < numRowGroups; ++i) {
rowGroupsToProcess.push_back(i);
}
std::unordered_map<common::offset_t, common::sel_t> boundNodeOffsets;
for (size_t i = 0; i < iceDiskScanState.cachedBoundNodeSelVector.getSelSize(); ++i) {
common::sel_t boundNodeIdx = iceDiskScanState.cachedBoundNodeSelVector[i];
const auto boundNodeID = iceDiskScanState.nodeIDVector->getValue<nodeID_t>(boundNodeIdx);
boundNodeOffsets.insert({boundNodeID.offset, boundNodeIdx});
}
iceDiskScanState.reset(std::move(boundNodeOffsets));
iceDiskScanState.indicesReader->initializeScan(*iceDiskScanState.parquetScanState,
rowGroupsToProcess, vfs);
}
void IceDiskRelTable::initializeParquetReaders(Transaction* transaction) const {
if (!indicesReader) {
std::lock_guard lock(parquetReaderMutex);
if (!indicesReader) {
std::vector<bool> columnSkips; auto context = transaction->getClientContext();
indicesReader = std::make_unique<ParquetReader>(indicesFilePath, columnSkips, context);
}
}
}
void IceDiskRelTable::initializeIndptrReader(Transaction* transaction) const {
if (!indptrFilePath.empty() && !indptrReader) {
std::lock_guard lock(parquetReaderMutex);
if (!indptrReader) {
std::vector<bool> columnSkips; auto context = transaction->getClientContext();
indptrReader = std::make_unique<ParquetReader>(indptrFilePath, columnSkips, context);
}
}
}
void IceDiskRelTable::loadIndptrData(Transaction* transaction) const {
if (indptrData.empty() && !indptrFilePath.empty()) {
std::lock_guard lock(indptrDataMutex);
if (indptrData.empty()) {
initializeIndptrReader(transaction);
if (!indptrReader)
return;
auto context = transaction->getClientContext();
auto vfs = VirtualFileSystem::GetUnsafe(*context);
std::vector<uint64_t> groupsToRead;
for (uint64_t i = 0; i < indptrReader->getNumRowGroups(); ++i) {
groupsToRead.push_back(i);
}
ParquetReaderScanState scanState;
indptrReader->initializeScan(scanState, groupsToRead, vfs);
auto numColumns = indptrReader->getNumColumns();
if (numColumns == 0) {
throw RuntimeException("Indptr parquet file has no columns");
}
const auto& indptrType = indptrReader->getColumnType(0);
if (!LogicalTypeUtils::isIntegral(indptrType.getLogicalTypeID())) {
throw RuntimeException(
"Indptr parquet file column must be integer type (column 0)");
}
DataChunk dataChunk(1);
const auto& columnTypeRef = indptrReader->getColumnType(0);
auto columnType = columnTypeRef.copy();
auto vector = std::make_shared<ValueVector>(std::move(columnType));
dataChunk.insert(0, vector);
while (indptrReader->scanInternal(scanState, dataChunk)) {
auto selSize = dataChunk.state->getSelVector().getSelSize();
for (size_t i = 0; i < selSize; ++i) {
auto value = dataChunk.getValueVector(0).getValue<common::offset_t>(i);
indptrData.push_back(value);
}
}
}
}
}
bool IceDiskRelTable::scanInternal(Transaction* transaction, TableScanState& scanState) {
auto& iceDiskScanState = static_cast<IceDiskRelTableScanState&>(scanState);
if (layout == IceDiskRelTableLayout::FLAT) {
return scanFlat(transaction, iceDiskScanState);
}
return scanCSR(transaction, iceDiskScanState);
}
bool IceDiskRelTable::scanCSR(Transaction* transaction,
IceDiskRelTableScanState& iceDiskScanState) {
iceDiskScanState.resetOutVectors();
if (iceDiskScanState.boundNodeOffsets.empty()) {
iceDiskScanState.outState->getSelVectorUnsafe().setToFiltered(0);
return false;
}
loadIndptrData(transaction);
const auto isFwd = iceDiskScanState.direction != RelDataDirection::BWD;
uint64_t totalRowsCollected = 0;
const uint64_t maxRowsPerCall = DEFAULT_VECTOR_CAPACITY;
auto activeBoundSelPos = INVALID_SEL;
auto activeBoundOffset = INVALID_OFFSET;
auto hasActiveBound = false;
auto differentBoundNodeEncountered = false;
while (totalRowsCollected < maxRowsPerCall) {
if (!iceDiskScanState.cachedBatchData ||
iceDiskScanState.currentLocalRowIdx ==
iceDiskScanState.cachedBatchData->state->getSelVector().getSelSize()) {
iceDiskScanState.currentBatchStartOffset += iceDiskScanState.currentLocalRowIdx;
iceDiskScanState.currentLocalRowIdx = 0;
iceDiskScanState.reloadCachedBatchData(transaction);
}
auto selSize = iceDiskScanState.cachedBatchData->state->getSelVector().getSelSize();
if (selSize == 0) {
break; }
for (; iceDiskScanState.currentLocalRowIdx < selSize && totalRowsCollected < maxRowsPerCall;
++iceDiskScanState.currentLocalRowIdx) {
const auto currentGlobalRowIdx =
iceDiskScanState.currentBatchStartOffset + iceDiskScanState.currentLocalRowIdx;
const auto sourceNodeOffset = findSourceNodeForRow(currentGlobalRowIdx);
if (sourceNodeOffset == common::INVALID_OFFSET) {
continue; }
const auto dstOffset =
iceDiskScanState.cachedBatchData->getValueVector(0).getValue<common::offset_t>(
iceDiskScanState.currentLocalRowIdx);
const auto boundOffset = isFwd ? sourceNodeOffset : dstOffset;
if (iceDiskScanState.boundNodeOffsets.find(boundOffset) ==
iceDiskScanState.boundNodeOffsets.end()) {
continue; }
if (!hasActiveBound) {
hasActiveBound = true;
activeBoundOffset = boundOffset;
activeBoundSelPos = iceDiskScanState.boundNodeOffsets.at(boundOffset);
} else if (boundOffset != activeBoundOffset) {
differentBoundNodeEncountered = true;
break;
}
const auto nbrOffset = isFwd ? dstOffset : sourceNodeOffset;
const auto nbrTableID = isFwd ? getToNodeTableID() : getFromNodeTableID();
auto nbrNodeID = internalID_t(nbrOffset, nbrTableID);
if (!iceDiskScanState.outputVectors.empty()) {
iceDiskScanState.outputVectors[0]->setValue(totalRowsCollected, nbrNodeID);
}
for (uint64_t outCol = 1; outCol < iceDiskScanState.outputVectors.size(); ++outCol) {
if (outCol >= iceDiskScanState.columnIDs.size()) {
continue;
}
const auto colID = iceDiskScanState.columnIDs[outCol];
if (colID == INVALID_COLUMN_ID || colID == ROW_IDX_COLUMN_ID ||
colID == NBR_ID_COLUMN_ID) {
continue;
}
if (colID == REL_ID_COLUMN_ID) {
iceDiskScanState.outputVectors[outCol]->setValue<internalID_t>(
totalRowsCollected, internalID_t{currentGlobalRowIdx, getTableID()});
continue;
}
if (colID == 0 ||
colID - 1 >= iceDiskScanState.cachedBatchData->getNumValueVectors()) {
continue;
}
iceDiskScanState.outputVectors[outCol]->copyFromVectorData(totalRowsCollected,
&iceDiskScanState.cachedBatchData->getValueVector(colID - 1),
iceDiskScanState.currentLocalRowIdx);
}
totalRowsCollected++;
}
if (differentBoundNodeEncountered) {
break;
}
}
if (totalRowsCollected > 0) {
auto& selVector = iceDiskScanState.outState->getSelVectorUnsafe();
selVector.setToUnfiltered(totalRowsCollected);
iceDiskScanState.setNodeIDVectorToFlat(activeBoundSelPos);
return true;
} else {
iceDiskScanState.outState->getSelVectorUnsafe().setToFiltered(0);
return false;
}
}
bool IceDiskRelTable::scanFlat(Transaction* transaction,
IceDiskRelTableScanState& iceDiskScanState) {
iceDiskScanState.resetOutVectors();
if (iceDiskScanState.boundNodeOffsets.empty()) {
iceDiskScanState.outState->getSelVectorUnsafe().setToFiltered(0);
return false;
}
const auto isFwd = iceDiskScanState.direction != RelDataDirection::BWD;
uint64_t totalRowsCollected = 0;
const uint64_t maxRowsPerCall = DEFAULT_VECTOR_CAPACITY;
auto activeBoundSelPos = INVALID_SEL;
auto activeBoundOffset = INVALID_OFFSET;
auto hasActiveBound = false;
auto differentBoundNodeEncountered = false;
while (totalRowsCollected < maxRowsPerCall) {
if (!iceDiskScanState.cachedBatchData ||
iceDiskScanState.currentLocalRowIdx ==
iceDiskScanState.cachedBatchData->state->getSelVector().getSelSize()) {
iceDiskScanState.currentBatchStartOffset += iceDiskScanState.currentLocalRowIdx;
iceDiskScanState.currentLocalRowIdx = 0;
iceDiskScanState.reloadCachedBatchData(transaction);
}
auto selSize = iceDiskScanState.cachedBatchData->state->getSelVector().getSelSize();
if (selSize == 0) {
break;
}
for (; iceDiskScanState.currentLocalRowIdx < selSize && totalRowsCollected < maxRowsPerCall;
++iceDiskScanState.currentLocalRowIdx) {
if (iceDiskScanState.cachedBatchData->getNumValueVectors() < 2) {
throw RuntimeException("Flat icebug-disk relationship parquet file requires source "
"and target offset columns");
}
const auto currentGlobalRowIdx =
iceDiskScanState.currentBatchStartOffset + iceDiskScanState.currentLocalRowIdx;
const auto srcOffset =
iceDiskScanState.cachedBatchData->getValueVector(0).getValue<common::offset_t>(
iceDiskScanState.currentLocalRowIdx);
const auto dstOffset =
iceDiskScanState.cachedBatchData->getValueVector(1).getValue<common::offset_t>(
iceDiskScanState.currentLocalRowIdx);
const auto boundOffset = isFwd ? srcOffset : dstOffset;
auto boundIt = iceDiskScanState.boundNodeOffsets.find(boundOffset);
if (boundIt == iceDiskScanState.boundNodeOffsets.end()) {
continue;
}
if (!hasActiveBound) {
hasActiveBound = true;
activeBoundOffset = boundOffset;
activeBoundSelPos = boundIt->second;
} else if (boundOffset != activeBoundOffset) {
differentBoundNodeEncountered = true;
break;
}
const auto nbrOffset = isFwd ? dstOffset : srcOffset;
const auto nbrTableID = isFwd ? getToNodeTableID() : getFromNodeTableID();
if (!iceDiskScanState.outputVectors.empty()) {
iceDiskScanState.outputVectors[0]->setValue(totalRowsCollected,
internalID_t(nbrOffset, nbrTableID));
}
for (uint64_t outCol = 1; outCol < iceDiskScanState.outputVectors.size(); ++outCol) {
if (outCol >= iceDiskScanState.columnIDs.size()) {
continue;
}
const auto colID = iceDiskScanState.columnIDs[outCol];
if (colID == INVALID_COLUMN_ID || colID == ROW_IDX_COLUMN_ID ||
colID == NBR_ID_COLUMN_ID) {
continue;
}
if (colID == REL_ID_COLUMN_ID) {
iceDiskScanState.outputVectors[outCol]->setValue<internalID_t>(
totalRowsCollected, internalID_t{currentGlobalRowIdx, getTableID()});
continue;
}
if (colID >= iceDiskScanState.cachedBatchData->getNumValueVectors()) {
continue;
}
iceDiskScanState.outputVectors[outCol]->copyFromVectorData(totalRowsCollected,
&iceDiskScanState.cachedBatchData->getValueVector(colID),
iceDiskScanState.currentLocalRowIdx);
}
totalRowsCollected++;
}
if (differentBoundNodeEncountered) {
break;
}
}
if (totalRowsCollected > 0) {
auto& selVector = iceDiskScanState.outState->getSelVectorUnsafe();
selVector.setToUnfiltered(totalRowsCollected);
iceDiskScanState.setNodeIDVectorToFlat(activeBoundSelPos);
return true;
}
iceDiskScanState.outState->getSelVectorUnsafe().setToFiltered(0);
return false;
}
common::offset_t IceDiskRelTable::findSourceNodeForRow(common::offset_t globalRowIdx) const {
return findSourceNodeForRowInternal(globalRowIdx, indptrData);
}
row_idx_t IceDiskRelTable::getTotalRowCount(const Transaction* transaction) const {
initializeParquetReaders(const_cast<Transaction*>(transaction));
if (!indicesReader) {
return 0;
}
auto metadata = indicesReader->getMetadata();
return metadata ? metadata->num_rows : 0;
}
row_idx_t IceDiskRelTable::getActiveBoundNodeCount(const Transaction* transaction,
RelDataDirection direction) const {
if (layout != IceDiskRelTableLayout::CSR || direction == RelDataDirection::BWD) {
return 0;
}
loadIndptrData(const_cast<Transaction*>(transaction));
row_idx_t result = 0;
for (offset_t i = 0; i + 1 < indptrData.size(); ++i) {
result += indptrData[i + 1] > indptrData[i];
}
return result;
}
std::vector<std::pair<offset_t, row_idx_t>> IceDiskRelTable::getAllDegreeEntries(
const Transaction* transaction, RelDataDirection direction) const {
if (layout != IceDiskRelTableLayout::CSR || direction == RelDataDirection::BWD) {
return {};
}
loadIndptrData(const_cast<Transaction*>(transaction));
std::vector<std::pair<offset_t, row_idx_t>> result;
result.reserve(indptrData.size());
for (offset_t i = 0; i + 1 < indptrData.size(); ++i) {
if (indptrData[i + 1] <= indptrData[i]) {
continue;
}
result.emplace_back(i, indptrData[i + 1] - indptrData[i]);
}
return result;
}
std::vector<std::pair<offset_t, row_idx_t>> IceDiskRelTable::getTopKDegreeEntries(
const Transaction* transaction, RelDataDirection direction, idx_t k) const {
if (layout != IceDiskRelTableLayout::CSR || direction == RelDataDirection::BWD || k == 0) {
return {};
}
loadIndptrData(const_cast<Transaction*>(transaction));
using degree_entry_t = std::pair<offset_t, row_idx_t>;
auto better = [](const degree_entry_t& a, const degree_entry_t& b) {
return a.second > b.second || (a.second == b.second && a.first < b.first);
};
auto worseForHeap = [better](const degree_entry_t& a, const degree_entry_t& b) {
return better(a, b);
};
std::priority_queue<degree_entry_t, std::vector<degree_entry_t>, decltype(worseForHeap)> heap{
worseForHeap};
for (offset_t i = 0; i + 1 < indptrData.size(); ++i) {
if (indptrData[i + 1] <= indptrData[i]) {
continue;
}
const auto degree = indptrData[i + 1] - indptrData[i];
degree_entry_t entry{i, degree};
if (heap.size() < k) {
heap.push(entry);
} else if (better(entry, heap.top())) {
heap.pop();
heap.push(entry);
}
}
std::vector<degree_entry_t> result;
while (!heap.empty()) {
result.push_back(heap.top());
heap.pop();
}
std::sort(result.begin(), result.end(), better);
return result;
}
} }