#include "storage/table/parquet_node_table.h"
#include <mutex>
#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "common/data_chunk/sel_vector.h"
#include "common/exception/runtime.h"
#include "common/file_system/virtual_file_system.h"
#include "common/types/value/value.h"
#include "main/client_context.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/table/column.h"
#include "transaction/transaction.h"
using namespace lbug::catalog;
using namespace lbug::common;
using namespace lbug::processor;
using namespace lbug::transaction;
namespace lbug {
namespace storage {
// Builds a node table whose rows live in a parquet file.
//
// The base class receives a fresh ParquetNodeTableScanSharedState. The parquet file path is
// derived from the storage prefix stored on the catalog entry, via
// constructStoragePath(prefix, ".parquet").
//
// Throws RuntimeException when the catalog entry carries an empty storage prefix.
ParquetNodeTable::ParquetNodeTable(const StorageManager* storageManager,
    const NodeTableCatalogEntry* nodeTableEntry, MemoryManager* memoryManager)
    : ColumnarNodeTableBase{storageManager, nodeTableEntry, memoryManager,
          std::make_unique<ParquetNodeTableScanSharedState>()} {
    const std::string storagePrefix = nodeTableEntry->getStorage();
    if (storagePrefix.empty()) {
        throw RuntimeException("Parquet file prefix is empty for parquet-backed node table");
    }
    parquetFilePath = constructStoragePath(storagePrefix, ".parquet");
}
// Resets the table-wide shared scan state with the current number of scan batches
// (as reported by getNumBatches).
void ParquetNodeTable::initializeScanCoordination(const transaction::Transaction* transaction) {
    const auto batchCount = getNumBatches(transaction);
    // The constructor always installs a ParquetNodeTableScanSharedState.
    static_cast<ParquetNodeTableScanSharedState*>(tableScanSharedState.get())->reset(batchCount);
}
// Prepares a scan state to read committed rows from the parquet file.
//
// Every buffer and cursor left over from a previous scan is discarded. On first use the
// scan state gets its own ParquetReader over the resolved file path; later calls reuse it.
// The scan state is then detached from any row group and initParquetScanForRowGroup is
// asked to assign one.
//
// Throws RuntimeException when the reader still has to be created and either the
// transaction has no client context or the reader construction fails.
void ParquetNodeTable::initScanState(Transaction* transaction, TableScanState& scanState,
    [[maybe_unused]] bool resetCachedBoundNodeSelVec) const {
    auto& nodeScanState = scanState.cast<NodeTableScanState>();
    nodeScanState.source = TableScanSource::COMMITTED;
    auto& parquetState = static_cast<ParquetNodeTableScanState&>(nodeScanState);

    // Forget anything buffered by a previous scan.
    parquetState.dataRead = false;
    parquetState.scanCompleted = false;
    parquetState.allData.clear();
    parquetState.totalRows = 0;
    parquetState.nextRowToDistribute = 0;

    if (!parquetState.initialized) {
        const auto context = transaction->getClientContext();
        if (context == nullptr) {
            throw RuntimeException("Invalid client context for parquet scan state initialization");
        }
        try {
            // Passed empty to the reader; presumably meaning "skip no columns" — confirm.
            std::vector<bool> columnSkips;
            auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath);
            parquetState.parquetReader =
                std::make_unique<ParquetReader>(resolvedPath, columnSkips, context);
            parquetState.initialized = true;
        } catch (const std::exception& e) {
            throw RuntimeException("Failed to initialize parquet reader for file '" +
                                   parquetFilePath + "': " + e.what());
        }
    }

    // No row group yet: initParquetScanForRowGroup takes the next one from the shared state.
    parquetState.nodeGroupIdx = INVALID_NODE_GROUP_IDX;
    initParquetScanForRowGroup(transaction, parquetState);
}
// Returns the number of scan batches for the file, which is its number of parquet row groups.
//
// A temporary ParquetReader is opened only to read that count. Falls back to 1 when the
// transaction has no client context or the reader cannot be created.
common::node_group_idx_t ParquetNodeTable::getNumBatches(const Transaction* transaction) const {
    const auto context = transaction->getClientContext();
    if (context == nullptr) {
        return 1;
    }
    try {
        std::vector<bool> columnSkips;
        const auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath);
        const auto rowGroupCounter =
            std::make_unique<ParquetReader>(resolvedPath, columnSkips, context);
        return rowGroupCounter->getNumRowsGroups();
    } catch (const std::exception&) {
        // NOTE(review): failures are deliberately swallowed here; presumably the single
        // fallback batch lets initScanState surface the underlying error — confirm.
        return 1;
    }
}
// Binds `scanState` to one parquet row group and (re)initializes its reader scan over it.
//
// If the scan state has no row group yet (nodeGroupIdx == INVALID_NODE_GROUP_IDX), the next
// row group is requested from the table's shared scan state. When none is left, the scan
// state is marked completed and the reader is initialized with an empty group list.
//
// Does nothing when the client context, the virtual file system, the parquet reader or the
// reader scan state is missing.
// Throws RuntimeException if the table's shared scan state is not a
// ParquetNodeTableScanSharedState. Previously this was an unchecked dynamic_cast dereference.
void ParquetNodeTable::initParquetScanForRowGroup(Transaction* transaction,
    ParquetNodeTableScanState& scanState) const {
    auto context = transaction->getClientContext();
    if (!context) {
        return;
    }
    auto vfs = VirtualFileSystem::GetUnsafe(*context);
    if (!vfs || !scanState.parquetReader || !scanState.parquetScanState) {
        return;
    }

    std::vector<uint64_t> groupsToRead;
    if (scanState.nodeGroupIdx == INVALID_NODE_GROUP_IDX) {
        // The constructor installs a ParquetNodeTableScanSharedState. Check the cast anyway
        // rather than dereferencing a possibly-null dynamic_cast result.
        auto* sharedState =
            dynamic_cast<ParquetNodeTableScanSharedState*>(tableScanSharedState.get());
        if (sharedState == nullptr) {
            throw RuntimeException("Invalid shared scan state for parquet-backed node table");
        }
        common::node_group_idx_t assignedRowGroup = INVALID_NODE_GROUP_IDX;
        if (sharedState->getNextBatch(assignedRowGroup)) {
            scanState.nodeGroupIdx = assignedRowGroup;
            groupsToRead.push_back(assignedRowGroup);
        } else {
            // No row group left for this scan state. The reader is still initialized below,
            // with an empty group list (presumably leaving it with nothing to scan).
            scanState.scanCompleted = true;
        }
    } else {
        groupsToRead.push_back(scanState.nodeGroupIdx);
    }
    scanState.parquetReader->initializeScan(*scanState.parquetScanState, groupsToRead, vfs);
}
// Emits the next buffered row of this scan state's parquet row group.
//
// First call after initScanState: the whole assigned row group is materialized. Chunks are
// read from the parquet reader until it yields an empty chunk. Previously only the first
// chunk was read, silently dropping the rest of the row group. Every value of a parquet
// column that matches a projected table column is copied into parquetScanState.allData,
// one entry per row, laid out in output-vector order.
//
// Each call then writes exactly one buffered row to position 0 of the output vectors,
// stamps its node ID and sets the output selection size to 1.
//
// Node offsets are file-global: the row's index inside its row group plus the row counts
// of all preceding row groups (taken from the parquet footer). A row-group-local index
// alone would repeat across scan states reading different row groups.
//
// Returns false when the reader was never initialized, when no row group was assigned, or
// once every buffered row has been emitted.
// Throws RuntimeException if the parquet file has no columns.
bool ParquetNodeTable::scanInternal(Transaction* transaction, TableScanState& scanState) {
    auto& parquetScanState = static_cast<ParquetNodeTableScanState&>(scanState);
    if (parquetScanState.scanCompleted) {
        return false;
    }
    scanState.resetOutVectors();

    if (!parquetScanState.dataRead) {
        if (!parquetScanState.initialized) {
            return false;
        }
        auto& reader = *parquetScanState.parquetReader;
        auto numColumns = reader.getNumColumns();
        if (numColumns == 0) {
            throw RuntimeException("Parquet file '" + parquetFilePath + "' has no columns");
        }

        // One vector per parquet column. They share scanState.outState with the chunk.
        auto* memoryManager = MemoryManager::Get(*transaction->getClientContext());
        DataChunk parquetDataChunk(numColumns, scanState.outState);
        for (uint32_t i = 0; i < numColumns; ++i) {
            auto columnType = reader.getColumnType(i).copy();
            auto vector = std::make_shared<ValueVector>(std::move(columnType), memoryManager,
                scanState.outState);
            parquetDataChunk.insert(i, vector);
        }

        // The parquet-column -> output-column mapping depends only on the file schema and
        // the projected column IDs. Compute it once instead of once per row.
        const size_t noOutputColumn = INVALID_COLUMN_ID;
        const size_t numOutputColumns = scanState.outputVectors.size();
        const size_t numParquetColumns = std::min(static_cast<size_t>(numColumns),
            static_cast<size_t>(parquetDataChunk.getNumValueVectors()));
        auto nodeTableEntry = this->nodeTableCatalogEntry;
        std::vector<size_t> outputColumnOf(numParquetColumns, noOutputColumn);
        for (size_t parquetCol = 0; parquetCol < numParquetColumns; ++parquetCol) {
            const std::string parquetColumnName = reader.getColumnName(parquetCol);
            if (!nodeTableEntry->containsProperty(parquetColumnName)) {
                continue;
            }
            const column_id_t parquetColumnID = nodeTableEntry->getColumnID(parquetColumnName);
            for (size_t outCol = 0; outCol < scanState.columnIDs.size(); ++outCol) {
                if (scanState.columnIDs[outCol] == parquetColumnID) {
                    outputColumnOf[parquetCol] = outCol;
                    break;
                }
            }
        }

        parquetScanState.allData.clear();
        while (true) {
            // Reset the selection size so an exhausted reader shows up as an empty chunk
            // rather than as a stale size left over from a previous chunk or scan.
            parquetDataChunk.state->getSelVectorUnsafe().setSelSize(0);
            reader.scan(*parquetScanState.parquetScanState, parquetDataChunk);
            const auto chunkSize = parquetDataChunk.state->getSelVector().getSelSize();
            if (chunkSize == 0) {
                break;
            }
            for (size_t row = 0; row < chunkSize; ++row) {
                parquetScanState.allData.emplace_back();
                auto& bufferedRow = parquetScanState.allData.back();
                bufferedRow.resize(numOutputColumns);
                for (size_t parquetCol = 0; parquetCol < numParquetColumns; ++parquetCol) {
                    const size_t outputCol = outputColumnOf[parquetCol];
                    if (outputCol == noOutputColumn || outputCol >= numOutputColumns) {
                        continue; // column is not part of this scan's projection
                    }
                    auto& srcVector = parquetDataChunk.getValueVectorMutable(parquetCol);
                    if (srcVector.isNull(row)) {
                        bufferedRow[outputCol] = std::make_unique<Value>(Value::createNullValue());
                    } else {
                        bufferedRow[outputCol] =
                            std::make_unique<Value>(*srcVector.getAsValue(row));
                    }
                }
            }
        }
        parquetScanState.totalRows = parquetScanState.allData.size();
        parquetScanState.dataRead = true;
    }

    if (parquetScanState.nextRowToDistribute >= parquetScanState.totalRows ||
        parquetScanState.nextRowToDistribute >= parquetScanState.allData.size()) {
        parquetScanState.scanCompleted = true;
        return false;
    }
    const size_t rowIndex = parquetScanState.nextRowToDistribute++;
    const auto& bufferedRow = parquetScanState.allData[rowIndex];

    const size_t numColumnsToCopy = std::min(scanState.outputVectors.size(), bufferedRow.size());
    for (size_t col = 0; col < numColumnsToCopy; ++col) {
        if (!scanState.outputVectors[col]) {
            continue;
        }
        auto& dstVector = *scanState.outputVectors[col];
        const auto& value = bufferedRow[col];
        if (!value || value->isNull()) {
            dstVector.setNull(0, true);
        } else {
            dstVector.copyFromValue(0, *value);
        }
    }

    // Shift the row-group-local index by the rows of all preceding row groups so node
    // offsets stay unique across the whole file.
    uint64_t rowGroupStartOffset = 0;
    if (const auto metadata = parquetScanState.parquetReader->getMetadata()) {
        const size_t precedingGroups =
            std::min<size_t>(parquetScanState.nodeGroupIdx, metadata->row_groups.size());
        for (size_t group = 0; group < precedingGroups; ++group) {
            rowGroupStartOffset += static_cast<uint64_t>(metadata->row_groups[group].num_rows);
        }
    }

    auto& nodeID = scanState.nodeIDVector->getValue<nodeID_t>(0);
    nodeID.tableID = this->getTableID();
    nodeID.offset = rowGroupStartOffset + rowIndex;
    scanState.outState->getSelVectorUnsafe().setSelSize(1);
    return true;
}
// Returns the total number of rows recorded in the parquet file's metadata (num_rows).
//
// Best effort: returns 0 in any of these cases:
//   - the transaction has no client context;
//   - the reader cannot be created;
//   - the metadata is missing;
//   - the metadata reports a negative row count, which would otherwise wrap to a huge
//     unsigned value.
row_idx_t ParquetNodeTable::getTotalRowCount(const Transaction* transaction) const {
    auto context = transaction->getClientContext();
    if (!context) {
        return 0;
    }
    try {
        std::vector<bool> columnSkips;
        auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath);
        // std::make_unique either yields a valid pointer or throws; no null check needed.
        auto tempReader = std::make_unique<ParquetReader>(resolvedPath, columnSkips, context);
        const auto metadata = tempReader->getMetadata();
        if (!metadata || metadata->num_rows < 0) {
            return 0;
        }
        return static_cast<row_idx_t>(metadata->num_rows);
    } catch (const std::exception&) {
        // Deliberately swallowed: an unreadable file is reported as having no rows.
        return 0;
    }
}
} }