lbug 0.16.1

An in-process property graph database management system built for query speed and scalability
Documentation
#include "common/exception/runtime.h"
#include "common/file_system/virtual_file_system.h"
#include "extension/mapper_extension.h"
#include "main/client_context.h"
#include "planner/operator/simple/logical_attach_database.h"
#include "planner/operator/simple/logical_detach_database.h"
#include "planner/operator/simple/logical_export_db.h"
#include "planner/operator/simple/logical_extension.h"
#include "planner/operator/simple/logical_graph.h"
#include "planner/operator/simple/logical_import_db.h"
#include "planner/operator/simple/logical_use_database.h"
#include "processor/operator/persistent/copy_to.h"
#include "processor/operator/simple/attach_database.h"
#include "processor/operator/simple/detach_database.h"
#include "processor/operator/simple/export_db.h"
#include "processor/operator/simple/import_db.h"
#include "processor/operator/simple/install_extension.h"
#include "processor/operator/simple/load_extension.h"
#include "processor/operator/simple/uninstall_extension.h"
#include "processor/operator/simple/use_database.h"
#include "processor/operator/simple/use_graph.h"
#include "processor/plan_mapper.h"
#include "processor/result/factorized_table_util.h"
#include "storage/buffer_manager/memory_manager.h"
#include <format>

namespace lbug {
namespace processor {

using namespace lbug::planner;
using namespace lbug::common;
using namespace lbug::storage;
using namespace lbug::extension;

std::unique_ptr<PhysicalOperator> PlanMapper::mapUseDatabase(
    const LogicalOperator* logicalOperator) {
    auto useDatabase = logicalOperator->constPtrCast<LogicalUseDatabase>();
    auto printInfo = std::make_unique<UseDatabasePrintInfo>(useDatabase->getDBName());
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    return std::make_unique<UseDatabase>(useDatabase->getDBName(), std::move(messageTable),
        getOperatorID(), std::move(printInfo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapAttachDatabase(
    const LogicalOperator* logicalOperator) {
    auto attachDatabase = logicalOperator->constPtrCast<LogicalAttachDatabase>();
    auto info = attachDatabase->getAttachInfo();
    auto printInfo = std::make_unique<AttachDatabasePrintInfo>(info.dbAlias, info.dbPath);
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    return std::make_unique<AttachDatabase>(std::move(info), std::move(messageTable),
        getOperatorID(), std::move(printInfo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapDetachDatabase(
    const LogicalOperator* logicalOperator) {
    auto detachDatabase = logicalOperator->constPtrCast<LogicalDetachDatabase>();
    auto printInfo = std::make_unique<OPPrintInfo>();
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    return std::make_unique<DetachDatabase>(detachDatabase->getDBName(), std::move(messageTable),
        getOperatorID(), std::move(printInfo));
}

static void exportDatabaseCollectParallelFlags(const std::unique_ptr<DummySimpleSink>& sink) {
    auto exportDB = sink->getChild(0)->ptrCast<ExportDB>();
    for (auto i = 1u; i < sink->getNumChildren(); ++i) {
        const auto& tableFuncCall = sink->getChild(i);
        ASSERT(tableFuncCall->getChild(0)->getOperatorType() == PhysicalOperatorType::COPY_TO);
        const auto& [file, parallelFlag] =
            tableFuncCall->getChild(0)->ptrCast<CopyTo>()->getParallelFlag();
        exportDB->addToParallelReaderMap(file, parallelFlag);
    }
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapExportDatabase(
    const LogicalOperator* logicalOperator) {
    auto exportDatabase = logicalOperator->constPtrCast<LogicalExportDatabase>();
    auto fs = VirtualFileSystem::GetUnsafe(*clientContext);
    auto boundFileInfo = exportDatabase->getBoundFileInfo();
    DASSERT(boundFileInfo->filePaths.size() == 1);
    auto filePath = boundFileInfo->filePaths[0];
    if (fs->fileOrPathExists(filePath, clientContext)) {
        throw RuntimeException(std::format("Directory {} already exists.", filePath));
    }
    fs->createDir(filePath);
    auto printInfo = std::make_unique<ExportDBPrintInfo>(filePath, boundFileInfo->options);
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    auto exportDB = std::make_unique<ExportDB>(boundFileInfo->copy(),
        exportDatabase->isSchemaOnly(), messageTable, getOperatorID(), std::move(printInfo));
    auto sink = std::make_unique<DummySimpleSink>(messageTable, getOperatorID());
    sink->addChild(std::move(exportDB));
    for (auto child : exportDatabase->getChildren()) {
        sink->addChild(mapOperator(child.get()));
    }
    exportDatabaseCollectParallelFlags(sink);
    return sink;
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapImportDatabase(
    const LogicalOperator* logicalOperator) {
    auto importDatabase = logicalOperator->constPtrCast<LogicalImportDatabase>();
    auto printInfo = std::make_unique<OPPrintInfo>();
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    return std::make_unique<ImportDB>(importDatabase->getQuery(), importDatabase->getIndexQuery(),
        std::move(messageTable), getOperatorID(), std::move(printInfo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapExtension(const LogicalOperator* logicalOperator) {
    auto logicalExtension = logicalOperator->constPtrCast<LogicalExtension>();
    auto& auxInfo = logicalExtension->getAuxInfo();
    auto path = auxInfo.path;
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    switch (auxInfo.action) {
    case ExtensionAction::INSTALL: {
        auto installAuxInfo = auxInfo.contCast<InstallExtensionAuxInfo>();
        InstallExtensionInfo info{path, installAuxInfo.extensionRepo, installAuxInfo.forceInstall};
        auto printInfo = std::make_unique<InstallExtensionPrintInfo>(path);
        return std::make_unique<InstallExtension>(std::move(info), std::move(messageTable),
            getOperatorID(), std::move(printInfo));
    }
    case ExtensionAction::UNINSTALL: {
        auto printInfo = std::make_unique<UninstallExtensionPrintInfo>(path);
        return std::make_unique<UninstallExtension>(path, std::move(messageTable), getOperatorID(),
            std::move(printInfo));
    }
    case ExtensionAction::LOAD: {
        auto printInfo = std::make_unique<LoadExtensionPrintInfo>(path);
        return std::make_unique<LoadExtension>(path, std::move(messageTable), getOperatorID(),
            std::move(printInfo));
    }
    default:
        UNREACHABLE_CODE;
    }
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapExtensionClause(
    const LogicalOperator* logicalOperator) {
    for (auto& mapperExtension : mapperExtensions) {
        auto physicalOP = mapperExtension->map(logicalOperator, clientContext, getOperatorID());
        if (physicalOP) {
            return physicalOP;
        }
    }
    UNREACHABLE_CODE;
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapCreateGraph(
    const LogicalOperator* logicalOperator) {
    auto createGraph = logicalOperator->constPtrCast<LogicalCreateGraph>();
    auto printInfo = std::make_unique<OPPrintInfo>();
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    return std::make_unique<CreateGraph>(createGraph->getGraphName(), createGraph->isAnyGraph(),
        std::move(messageTable), getOperatorID(), std::move(printInfo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapUseGraph(const LogicalOperator* logicalOperator) {
    auto useGraph = logicalOperator->constPtrCast<LogicalUseGraph>();
    auto printInfo = std::make_unique<UseGraphPrintInfo>(useGraph->getGraphName());
    auto messageTable =
        FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
    return std::make_unique<UseGraph>(useGraph->getGraphName(), std::move(messageTable),
        getOperatorID(), std::move(printInfo));
}

} // namespace processor
} // namespace lbug