#include "processor/operator/physical_operator.h"
#include "common/exception/interrupt.h"
#include "common/exception/runtime.h"
#include "common/task_system/progress_bar.h"
#include "main/client_context.h"
#include "processor/execution_context.h"
using namespace lbug::common;
namespace lbug {
namespace processor {
std::string PhysicalOperatorUtils::operatorTypeToString(PhysicalOperatorType operatorType) {
switch (operatorType) {
case PhysicalOperatorType::ALTER:
return "ALTER";
case PhysicalOperatorType::AGGREGATE:
return "AGGREGATE";
case PhysicalOperatorType::AGGREGATE_FINALIZE:
return "AGGREGATE_FINALIZE";
case PhysicalOperatorType::AGGREGATE_SCAN:
return "AGGREGATE_SCAN";
case PhysicalOperatorType::ATTACH_DATABASE:
return "ATTACH_DATABASE";
case PhysicalOperatorType::BATCH_INSERT:
return "BATCH_INSERT";
case PhysicalOperatorType::COPY_TO:
return "COPY_TO";
case PhysicalOperatorType::COUNT_REL_TABLE:
return "COUNT_REL_TABLE";
case PhysicalOperatorType::CREATE_MACRO:
return "CREATE_MACRO";
case PhysicalOperatorType::CREATE_SEQUENCE:
return "CREATE_SEQUENCE";
case PhysicalOperatorType::CREATE_TABLE:
return "CREATE_TABLE";
case PhysicalOperatorType::CREATE_TYPE:
return "CREATE_TYPE";
case PhysicalOperatorType::CROSS_PRODUCT:
return "CROSS_PRODUCT";
case PhysicalOperatorType::DETACH_DATABASE:
return "DETACH_DATABASE";
case PhysicalOperatorType::DELETE_:
return "DELETE";
case PhysicalOperatorType::DROP:
return "DROP";
case PhysicalOperatorType::DUMMY_SINK:
return "DUMMY_SINK";
case PhysicalOperatorType::DUMMY_SIMPLE_SINK:
return "DUMMY_SIMPLE_SINK";
case PhysicalOperatorType::EMPTY_RESULT:
return "EMPTY_RESULT";
case PhysicalOperatorType::EXPORT_DATABASE:
return "EXPORT_DATABASE";
case PhysicalOperatorType::EXTENSION_CLAUSE:
return "EXTENSION_CLAUSE";
case PhysicalOperatorType::FILTER:
return "FILTER";
case PhysicalOperatorType::FLATTEN:
return "FLATTEN";
case PhysicalOperatorType::HASH_JOIN_BUILD:
return "HASH_JOIN_BUILD";
case PhysicalOperatorType::HASH_JOIN_PROBE:
return "HASH_JOIN_PROBE";
case PhysicalOperatorType::IMPORT_DATABASE:
return "IMPORT_DATABASE";
case PhysicalOperatorType::INDEX_LOOKUP:
return "INDEX_LOOKUP";
case PhysicalOperatorType::INSERT:
return "INSERT";
case PhysicalOperatorType::INTERSECT_BUILD:
return "INTERSECT_BUILD";
case PhysicalOperatorType::INTERSECT:
return "INTERSECT";
case PhysicalOperatorType::INSTALL_EXTENSION:
return "INSTALL_EXTENSION";
case PhysicalOperatorType::LIMIT:
return "LIMIT";
case PhysicalOperatorType::LOAD_EXTENSION:
return "LOAD_EXTENSION";
case PhysicalOperatorType::MERGE:
return "MERGE";
case PhysicalOperatorType::MULTIPLICITY_REDUCER:
return "MULTIPLICITY_REDUCER";
case PhysicalOperatorType::PARTITIONER:
return "PARTITIONER";
case PhysicalOperatorType::PATH_PROPERTY_PROBE:
return "PATH_PROPERTY_PROBE";
case PhysicalOperatorType::PRIMARY_KEY_SCAN_NODE_TABLE:
return "PRIMARY_KEY_SCAN_NODE_TABLE";
case PhysicalOperatorType::PROJECTION:
return "PROJECTION";
case PhysicalOperatorType::PROFILE:
return "PROFILE";
case PhysicalOperatorType::RECURSIVE_EXTEND:
return "RECURSIVE_EXTEND";
case PhysicalOperatorType::RESULT_COLLECTOR:
return "RESULT_COLLECTOR";
case PhysicalOperatorType::SCAN_NODE_TABLE:
return "SCAN_NODE_TABLE";
case PhysicalOperatorType::SCAN_REL_TABLE:
return "SCAN_REL_TABLE";
case PhysicalOperatorType::SEMI_MASKER:
return "SEMI_MASKER";
case PhysicalOperatorType::SET_PROPERTY:
return "SET_PROPERTY";
case PhysicalOperatorType::SKIP:
return "SKIP";
case PhysicalOperatorType::STANDALONE_CALL:
return "STANDALONE_CALL";
case PhysicalOperatorType::TABLE_FUNCTION_CALL:
return "TABLE_FUNCTION_CALL";
case PhysicalOperatorType::TOP_K:
return "TOP_K";
case PhysicalOperatorType::TOP_K_SCAN:
return "TOP_K_SCAN";
case PhysicalOperatorType::TRANSACTION:
return "TRANSACTION";
case PhysicalOperatorType::ORDER_BY:
return "ORDER_BY";
case PhysicalOperatorType::ORDER_BY_MERGE:
return "ORDER_BY_MERGE";
case PhysicalOperatorType::ORDER_BY_SCAN:
return "ORDER_BY_SCAN";
case PhysicalOperatorType::UNION_ALL_SCAN:
return "UNION_ALL_SCAN";
case PhysicalOperatorType::UNWIND:
return "UNWIND";
case PhysicalOperatorType::UNWIND_DEDUP:
return "UNWIND_DEDUP";
case PhysicalOperatorType::USE_DATABASE:
return "USE_DATABASE";
case PhysicalOperatorType::CREATE_GRAPH:
return "CREATE_GRAPH";
case PhysicalOperatorType::USE_GRAPH:
return "USE_GRAPH";
case PhysicalOperatorType::UNINSTALL_EXTENSION:
return "UNINSTALL_EXTENSION";
default:
throw RuntimeException("Unknown physical operator type.");
}
}
std::string PhysicalOperatorUtils::operatorToString(const PhysicalOperator* physicalOp) {
return operatorTypeToString(physicalOp->getOperatorType()) + "[" +
std::to_string(physicalOp->getOperatorID()) + "]";
}
PhysicalOperator::PhysicalOperator(PhysicalOperatorType operatorType,
std::unique_ptr<PhysicalOperator> child, uint32_t id, std::unique_ptr<OPPrintInfo> printInfo)
: PhysicalOperator{operatorType, id, std::move(printInfo)} {
children.push_back(std::move(child));
}
PhysicalOperator::PhysicalOperator(PhysicalOperatorType operatorType,
std::unique_ptr<PhysicalOperator> left, std::unique_ptr<PhysicalOperator> right, uint32_t id,
std::unique_ptr<OPPrintInfo> printInfo)
: PhysicalOperator{operatorType, id, std::move(printInfo)} {
children.push_back(std::move(left));
children.push_back(std::move(right));
}
PhysicalOperator::PhysicalOperator(PhysicalOperatorType operatorType, physical_op_vector_t children,
uint32_t id, std::unique_ptr<OPPrintInfo> printInfo)
: PhysicalOperator{operatorType, id, std::move(printInfo)} {
for (auto& child : children) {
this->children.push_back(std::move(child));
}
}
std::unique_ptr<PhysicalOperator> PhysicalOperator::moveUnaryChild() {
DASSERT(children.size() == 1);
auto result = std::move(children[0]);
children.clear();
return result;
}
void PhysicalOperator::initGlobalState(ExecutionContext* context) {
if (!isSource()) {
children[0]->initGlobalState(context);
}
initGlobalStateInternal(context);
}
void PhysicalOperator::initLocalState(ResultSet* resultSet_, ExecutionContext* context) {
if (!isSource()) {
children[0]->initLocalState(resultSet_, context);
}
resultSet = resultSet_;
registerProfilingMetrics(context->profiler);
initLocalStateInternal(resultSet_, context);
}
bool PhysicalOperator::getNextTuple(ExecutionContext* context) {
if (context->clientContext->interrupted()) {
throw InterruptException{};
}
#ifdef __SINGLE_THREADED__
if (context->clientContext->hasTimeout()) {
if (context->clientContext->getTimeoutRemainingInMS() == 0) {
throw InterruptException{};
}
}
#endif
metrics->executionTime.start();
auto result = getNextTuplesInternal(context);
ProgressBar::Get(*context->clientContext)
->updateProgress(context->queryID, getProgress(context));
metrics->executionTime.stop();
return result;
}
void PhysicalOperator::finalize(ExecutionContext* context) {
if (!isSource()) {
children[0]->finalize(context);
}
finalizeInternal(context);
}
void PhysicalOperator::registerProfilingMetrics(Profiler* profiler) {
auto executionTime = profiler->registerTimeMetric(getTimeMetricKey());
auto numOutputTuple = profiler->registerNumericMetric(getNumTupleMetricKey());
metrics = std::make_unique<OperatorMetrics>(*executionTime, *numOutputTuple);
}
double PhysicalOperator::getExecutionTime(Profiler& profiler) const {
auto executionTime = profiler.sumAllTimeMetricsWithKey(getTimeMetricKey());
if (!isSource()) {
executionTime -= profiler.sumAllTimeMetricsWithKey(children[0]->getTimeMetricKey());
}
return executionTime;
}
uint64_t PhysicalOperator::getNumOutputTuples(Profiler& profiler) const {
return profiler.sumAllNumericMetricsWithKey(getNumTupleMetricKey());
}
std::unordered_map<std::string, std::string> PhysicalOperator::getProfilerKeyValAttributes(
Profiler& profiler) const {
std::unordered_map<std::string, std::string> result;
result.insert({"ExecutionTime", std::to_string(getExecutionTime(profiler))});
result.insert({"NumOutputTuples", std::to_string(getNumOutputTuples(profiler))});
return result;
}
std::vector<std::string> PhysicalOperator::getProfilerAttributes(Profiler& profiler) const {
std::vector<std::string> result;
for (auto& [key, val] : getProfilerKeyValAttributes(profiler)) {
result.emplace_back(key + ": " + std::move(val));
}
return result;
}
double PhysicalOperator::getProgress(ExecutionContext* ) const {
return 0;
}
} }