lbug 0.17.0

An in-process property graph database management system built for query speed and scalability
#include "processor/operator/ddl/create_index.h"

#include "catalog/catalog.h"
#include "catalog/catalog_entry/index_catalog_entry.h"
#include "common/exception/binder.h"
#include "common/string_utils.h"
#include "processor/execution_context.h"
#include "storage/index/art_index.h"
#include "storage/index/hash_index.h"
#include "storage/storage_manager.h"
#include "storage/table/node_table.h"
#include <format>

using namespace lbug::catalog;
using namespace lbug::common;

namespace lbug {
namespace processor {

static std::string getExistingIndexName(Catalog* catalog, transaction::Transaction* transaction,
    common::table_id_t tableID, common::property_id_t propertyID) {
    for (auto* indexEntry : catalog->getIndexEntries(transaction, tableID)) {
        if (indexEntry->containsPropertyID(propertyID)) {
            return indexEntry->getIndexName();
        }
    }
    return "";
}

void CreateIndex::executeInternal(ExecutionContext* context) {
    auto clientContext = context->clientContext;
    auto catalog = Catalog::Get(*clientContext);
    auto transaction = transaction::Transaction::Get(*clientContext);
    auto memoryManager = storage::MemoryManager::Get(*clientContext);
    auto storageManager = storage::StorageManager::Get(*clientContext);
    const auto indexNameExists = catalog->containsIndex(transaction, info.tableID, info.indexName);
    const auto indexedPropertyExists =
        catalog->containsIndex(transaction, info.tableID, info.propertyID);
    if (indexNameExists || indexedPropertyExists) {
        const auto existingIndexName = indexNameExists ? info.indexName :
                                                         getExistingIndexName(catalog, transaction,
                                                             info.tableID, info.propertyID);
        switch (info.onConflict) {
        case ConflictAction::ON_CONFLICT_DO_NOTHING: {
            appendMessage(std::format("Index {} already exists.", existingIndexName),
                memoryManager);
            return;
        }
        case ConflictAction::ON_CONFLICT_THROW: {
            throw BinderException(existingIndexName + " already exists in catalog.");
        }
        default:
            UNREACHABLE_CODE;
        }
    }
    auto* table = storageManager->getTable(info.tableID)->ptrCast<storage::NodeTable>();
    const auto storageIndexNameExists = table->getIndex(info.indexName).has_value();
    auto storagePKIndexExists = table->tryGetPrimaryKeyIndex() != nullptr;
    const auto canCreatePhysicalIndex = !storagePKIndexExists && !storageIndexNameExists;
    auto indexTypeOptional = storageManager->getIndexType(info.indexType);
    if (!indexTypeOptional.has_value()) {
        throw BinderException(std::format("Index type {} does not exist.", info.indexType));
    }
    const auto& indexType = indexTypeOptional.value().get();
    if (storagePKIndexExists &&
        table->tryGetPrimaryKeyIndex()->getIndexInfo().indexType != indexType.typeName) {
        throw BinderException(std::format(
            "Cannot create {} index because the table already has a {} primary-key index.",
            indexType.typeName, table->tryGetPrimaryKeyIndex()->getIndexInfo().indexType));
    }
    if (canCreatePhysicalIndex) {
        storage::IndexInfo indexInfo{info.indexName, indexType.typeName, info.tableID,
            {info.columnID}, {info.keyDataType},
            indexType.constraintType == storage::IndexConstraintType::PRIMARY,
            indexType.definitionType == storage::IndexDefinitionType::BUILTIN};
        std::unique_ptr<storage::Index> index;
        if (StringUtils::caseInsensitiveEquals(indexType.typeName,
                storage::PrimaryKeyIndex::getIndexType().typeName)) {
            index = storage::PrimaryKeyIndex::createNewIndex(std::move(indexInfo),
                storageManager->isInMemory(), *memoryManager,
                *storageManager->getDataFH()->getPageManager(), &storageManager->getShadowFile());
        } else if (StringUtils::caseInsensitiveEquals(indexType.typeName,
                       storage::ArtPrimaryKeyIndex::getIndexType().typeName)) {
            index = storage::ArtPrimaryKeyIndex::createNewIndex(std::move(indexInfo));
        } else {
            throw BinderException(
                std::format("Index type {} is not supported by CREATE INDEX.", indexType.typeName));
        }
        table->buildIndexAndAdd(clientContext, std::move(index));
        storagePKIndexExists = true;
    }
    if (storagePKIndexExists) {
        catalog->createIndex(transaction,
            std::make_unique<IndexCatalogEntry>(indexType.typeName, info.tableID, info.indexName,
                std::vector<property_id_t>{info.propertyID},
                std::make_unique<BuiltinIndexAuxInfo>()));
        appendMessage(std::format("Index {} has been created.", info.indexName), memoryManager);
        return;
    }
    if (storageIndexNameExists) {
        switch (info.onConflict) {
        case ConflictAction::ON_CONFLICT_DO_NOTHING: {
            appendMessage(std::format("Index {} already exists.", info.indexName), memoryManager);
            return;
        }
        case ConflictAction::ON_CONFLICT_THROW: {
            throw BinderException(info.indexName + " already exists in catalog.");
        }
        default:
            UNREACHABLE_CODE;
        }
    }
}

} // namespace processor
} // namespace lbug