lbug 0.16.1

An in-process property graph database management system built for query speed and scalability
Documentation
#include "main/connection.h"

#include <utility>

#include "common/random_engine.h"
#include "transaction/transaction.h"
#include "transaction/transaction_manager.h"

using namespace lbug::parser;
using namespace lbug::binder;
using namespace lbug::common;
using namespace lbug::planner;
using namespace lbug::processor;
using namespace lbug::transaction;

namespace lbug {
namespace main {

Connection::Connection(Database* database) {
    DASSERT(database != nullptr);
    this->database = database;
    this->dbLifeCycleManager = database->dbLifeCycleManager;
    clientContext = std::make_unique<ClientContext>(database);
}

Connection::~Connection() {
    clientContext->waitForNoActiveQuery();
    // Roll back any active transaction so it is removed from TransactionManager. Otherwise
    // Database::~Database() checkpoint can time out waiting for transactions to leave.
    // We do this here (before destroying ClientContext) while Database and Connection are still
    // valid; ~ClientContext then skips rollback to avoid double-rollback or use-after-free.
    if (Transaction* tx = Transaction::Get(*clientContext)) {
        database->getTransactionManager()->rollback(*clientContext, tx);
    }
    clientContext->preventTransactionRollbackOnDestruction = true;
}

void Connection::setMaxNumThreadForExec(uint64_t numThreads) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    clientContext->setMaxNumThreadForExec(numThreads);
}

uint64_t Connection::getMaxNumThreadForExec() {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    return clientContext->getMaxNumThreadForExec();
}

std::unique_ptr<PreparedStatement> Connection::prepare(std::string_view query) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    return clientContext->prepareWithParams(query);
}

std::unique_ptr<PreparedStatement> Connection::prepareWithParams(std::string_view query,
    std::unordered_map<std::string, std::unique_ptr<common::Value>> inputParams) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    return clientContext->prepareWithParams(query, std::move(inputParams));
}

std::unique_ptr<QueryResult> Connection::query(std::string_view queryStatement) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    auto queryResult = clientContext->query(queryStatement);
    queryResult->setDBLifeCycleManager(dbLifeCycleManager);
    return queryResult;
}

std::unique_ptr<QueryResult> Connection::queryAsArrow(std::string_view query, int64_t chunkSize) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    auto queryResult = clientContext->query(query, std::nullopt,
        {QueryResultType::ARROW, ArrowResultConfig{chunkSize}});
    queryResult->setDBLifeCycleManager(dbLifeCycleManager);
    return queryResult;
}

std::unique_ptr<QueryResult> Connection::queryWithID(std::string_view queryStatement,
    uint64_t queryID) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    auto queryResult = clientContext->query(queryStatement, queryID);
    queryResult->setDBLifeCycleManager(dbLifeCycleManager);
    return queryResult;
}

void Connection::interrupt() {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    clientContext->interrupt();
}

void Connection::setQueryTimeOut(uint64_t timeoutInMS) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    clientContext->setQueryTimeOut(timeoutInMS);
}

std::unique_ptr<QueryResult> Connection::executeWithParams(PreparedStatement* preparedStatement,
    std::unordered_map<std::string, std::unique_ptr<Value>> inputParams) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    auto queryResult = clientContext->executeWithParams(preparedStatement, std::move(inputParams));
    queryResult->setDBLifeCycleManager(dbLifeCycleManager);
    return queryResult;
}

std::unique_ptr<QueryResult> Connection::executeWithParamsWithID(
    PreparedStatement* preparedStatement,
    std::unordered_map<std::string, std::unique_ptr<Value>> inputParams, uint64_t queryID) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    auto queryResult =
        clientContext->executeWithParams(preparedStatement, std::move(inputParams), queryID);
    queryResult->setDBLifeCycleManager(dbLifeCycleManager);
    return queryResult;
}

void Connection::addScalarFunction(std::string name, function::function_set definitions) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    clientContext->addScalarFunction(name, std::move(definitions));
}

void Connection::removeScalarFunction(std::string name) {
    dbLifeCycleManager->checkDatabaseClosedOrThrow();
    clientContext->removeScalarFunction(name);
}

} // namespace main
} // namespace lbug