#pragma once
#include <memory>
#include "common/cast.h"
#include "common/copy_constructors.h"
#include "common/in_mem_overflow_buffer.h"
#include "processor/operator/aggregate/aggregate_hash_table.h"
#include "processor/operator/aggregate/base_aggregate.h"
namespace lbug {
namespace processor {
// State object derived from BaseAggregateSharedState. It owns a list of global aggregate
// states, an in-memory overflow buffer, and a list of partitions whose entries each carry a
// hash table, a queue and an aggregate state (see Partition::DistinctData).
// SimpleAggregate is declared a friend, so it can reach the private members directly.
// Copying and moving are disabled (per the name of DELETE_COPY_AND_MOVE).
class SimpleAggregateSharedState final : public BaseAggregateSharedState {
    friend class SimpleAggregate;

public:
    // Defined out of line.
    explicit SimpleAggregateSharedState(main::ClientContext* clientContext,
        const std::vector<function::AggregateFunction>& aggregateFunctions,
        const std::vector<AggregateInfo>& aggInfos);

    DELETE_COPY_AND_MOVE(SimpleAggregateSharedState);

    // Takes a caller's local aggregate states together with its local overflow buffer, which
    // is handed over by rvalue reference. Defined out of line.
    // NOTE(review): presumably folds the local states into the global ones - confirm in the
    // implementation file.
    void combineAggregateStates(
        const std::vector<std::unique_ptr<function::AggregateState>>& localAggregateStates,
        common::InMemOverflowBuffer&& localOverflowBuffer);

    // Defined out of line.
    void finalizeAggregateStates();

    // Override of the base-class hook; yields a pair of uint64_t values.
    // NOTE(review): presumably the bounds of the next range to read - confirm against
    // BaseAggregateSharedState.
    std::pair<uint64_t, uint64_t> getNextRangeToRead() override;

    // Returns a non-owning pointer to the global aggregate state stored at position idx.
    // The index is not bounds-checked here.
    function::AggregateState* getAggregateState(uint64_t idx) {
        const auto& slot = globalAggregateStates[idx];
        return slot.get();
    }

    // Defined out of line.
    void finalizePartitions(storage::MemoryManager* memoryManager,
        const std::vector<AggregateInfo>& aggInfos);

    // Exposes readyForFinalization, which is not declared in this class (presumably it is
    // inherited from the base class).
    bool isReadyForFinalization() const {
        return readyForFinalization;
    }

protected:
    // One partition of the shared state.
    struct Partition {
        // Bundle of a hash table, a queue and an aggregate state.
        // NOTE(review): judging by the names, there is one entry per aggregate function and it
        // is used for DISTINCT handling - confirm in the implementation file.
        struct DistinctData {
            std::unique_ptr<AggregateHashTable> hashTable;
            std::unique_ptr<HashTableQueue> queue;
            std::unique_ptr<function::AggregateState> state;
        };

        // NOTE(review): presumably guards this partition's data - confirm where it is locked.
        std::mutex mtx;
        std::vector<DistinctData> distinctTables;
        // Starts out false.
        std::atomic<bool> finalized{false};
    };

    // AggregatePartitioningData implementation that remembers the owning shared state and the
    // index of one function. All three append hooks are defined out of line.
    class SimpleAggregatePartitioningData : public AggregatePartitioningData {
    public:
        SimpleAggregatePartitioningData(SimpleAggregateSharedState* sharedState, size_t functionIdx)
            : sharedState(sharedState), functionIdx(functionIdx) {}

        void appendTuples(const FactorizedTable& factorizedTable,
            ft_col_offset_t hashOffset) override;
        void appendDistinctTuple(size_t, std::span<uint8_t>, common::hash_t) override;
        void appendOverflow(common::InMemOverflowBuffer&& overflowBuffer) override;

    private:
        // Non-owning back-pointer to the shared state passed to the constructor.
        SimpleAggregateSharedState* sharedState;
        size_t functionIdx;
    };

private:
    // No in-class initializer; expected to be set by the out-of-line constructor.
    bool hasDistinct;
    std::vector<Partition> globalPartitions;
    std::vector<SimpleAggregatePartitioningData> partitioningData;
    common::InMemOverflowBuffer aggregateOverflowBuffer;
    // Backing storage for getAggregateState().
    std::vector<std::unique_ptr<function::AggregateState>> globalAggregateStates;
};
// OPPrintInfo implementation that carries the list of aggregate expressions.
struct SimpleAggregatePrintInfo final : OPPrintInfo {
    binder::expression_vector aggregates;

    // Takes the expression list by value and moves it into the member.
    explicit SimpleAggregatePrintInfo(binder::expression_vector aggregates)
        : aggregates{std::move(aggregates)} {}

    // Defined out of line.
    std::string toString() const override;

    // Returns a heap-allocated copy of this object, owned through the base-class pointer type.
    std::unique_ptr<OPPrintInfo> copy() const override {
        // The copy constructor is private, so std::make_unique cannot call it; allocate
        // directly from inside the class and hand ownership to the smart pointer at once.
        auto* duplicate = new SimpleAggregatePrintInfo(*this);
        return std::unique_ptr<OPPrintInfo>(duplicate);
    }

private:
    // Copy constructor, reachable only from within the class (used by copy()).
    SimpleAggregatePrintInfo(const SimpleAggregatePrintInfo& other)
        : OPPrintInfo{other}, aggregates{other.aggregates} {}
};
// Aggregate operator built on BaseAggregate. It keeps its own list of local aggregate states
// and a list of partitioning hash tables, and reaches the SimpleAggregateSharedState through
// the shared-state handle it forwards to the base class.
class SimpleAggregate final : public BaseAggregate {
public:
    // Forwards every argument to BaseAggregate; all owning arguments are moved through.
    SimpleAggregate(std::shared_ptr<BaseAggregateSharedState> sharedState,
        std::vector<function::AggregateFunction> aggregateFunctions,
        std::vector<AggregateInfo> aggInfos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
        std::unique_ptr<OPPrintInfo> printInfo)
        : BaseAggregate{std::move(sharedState), std::move(aggregateFunctions), std::move(aggInfos),
              std::move(child), id, std::move(printInfo)} {}

    // Both defined out of line.
    void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
    void executeInternal(ExecutionContext* context) override;

    // Builds a new operator that receives the same shared-state handle, copies of the
    // aggregate functions and aggregate infos, a copy of the first child, the same id and a
    // copy of the print info.
    std::unique_ptr<PhysicalOperator> copy() override {
        // Qualified with std:: so the call does not depend on argument-dependent lookup with
        // explicit template arguments, and matches SimpleAggregateFinalize::copy().
        return std::make_unique<SimpleAggregate>(sharedState, copyVector(aggregateFunctions),
            copyVector(aggInfos), children[0]->copy(), id, printInfo->copy());
    }

private:
    // Defined out of line.
    void computeAggregate(function::AggregateFunction* function, AggregateInput* input,
        function::AggregateState* state, common::InMemOverflowBuffer& overflowBuffer);

    // Returns the shared state downcast to its concrete type via the project's checked cast.
    SimpleAggregateSharedState& getSharedState() {
        return common::dynamic_cast_checked<SimpleAggregateSharedState&>(*sharedState);
    }

    std::vector<std::unique_ptr<function::AggregateState>> localAggregateStates;
    // NOTE(review): judging by the name, these back DISTINCT aggregates - confirm in the
    // implementation file.
    std::vector<std::unique_ptr<PartitioningAggregateHashTable>> distinctHashTables;
};
// Sink operator tagged PhysicalOperatorType::AGGREGATE_FINALIZE. It holds a handle to the
// SimpleAggregateSharedState plus its own copy of the aggregate infos, and also reports
// itself as a source.
class SimpleAggregateFinalize final : public Sink {
    static constexpr PhysicalOperatorType type_ = PhysicalOperatorType::AGGREGATE_FINALIZE;

public:
    // Passes the operator type, id and print info to Sink; keeps the shared-state handle and
    // the aggregate infos as members.
    SimpleAggregateFinalize(std::shared_ptr<SimpleAggregateSharedState> sharedState,
        std::vector<AggregateInfo> aggInfos, physical_op_id id,
        std::unique_ptr<OPPrintInfo> printInfo)
        : Sink{type_, id, std::move(printInfo)}, sharedState{std::move(sharedState)},
          aggInfos{std::move(aggInfos)} {}

    // Always true for this operator.
    bool isSource() const override {
        return true;
    }

    // Both defined out of line.
    void executeInternal(ExecutionContext* context) override;
    void finalizeInternal(ExecutionContext* context) override;

    // Builds a new operator that receives the same shared-state handle, a copy of the
    // aggregate infos, the same id and a copy of the print info.
    std::unique_ptr<PhysicalOperator> copy() override {
        auto clonedAggInfos = copyVector(aggInfos);
        auto clonedPrintInfo = printInfo->copy();
        return std::make_unique<SimpleAggregateFinalize>(sharedState, std::move(clonedAggInfos),
            id, std::move(clonedPrintInfo));
    }

private:
    std::shared_ptr<SimpleAggregateSharedState> sharedState;
    std::vector<AggregateInfo> aggInfos;
    // NOTE(review): nothing in this header reads or writes this member - confirm that the
    // implementation file uses it.
    std::vector<std::unique_ptr<function::AggregateState>> globalAggregateStates;
};
} }