#pragma once
#include <atomic>
#include <mutex>
#include "compute.h"
#include "density_state.h"
#include "gds_object_manager.h"
namespace lbug {
namespace processor {
struct ExecutionContext;
}
namespace function {
using iteration_t = uint16_t;
static constexpr iteration_t FRONTIER_UNVISITED = UINT16_MAX;
static constexpr iteration_t FRONTIER_INITIAL_VISITED = 0;
class LBUG_API Frontier {
public:
virtual ~Frontier() = default;
virtual void pinTableID(common::table_id_t tableID) = 0;
virtual void addNode(common::nodeID_t nodeID, iteration_t iter) = 0;
virtual void addNode(common::offset_t offset, iteration_t iter) = 0;
virtual void addNodes(const std::vector<common::nodeID_t>& nodeIDs, iteration_t iter) = 0;
virtual iteration_t getIteration(common::offset_t offset) const = 0;
template<class TARGET>
TARGET& cast() {
return common::dynamic_cast_checked<TARGET&>(*this);
}
};
class LBUG_API SparseFrontier : public Frontier {
friend class SparseFrontierReference;
friend class SPFrontierPair;
friend class DenseSparseDynamicFrontierPair;
public:
explicit SparseFrontier(const common::table_id_map_t<common::offset_t>& nodeMaxOffsetMap)
: sparseObjects{nodeMaxOffsetMap} {}
void pinTableID(common::table_id_t tableID) override;
void addNode(common::nodeID_t nodeID, iteration_t iter) override;
void addNode(common::offset_t offset, iteration_t iter) override;
void addNodes(const std::vector<common::nodeID_t>& nodeIDs, iteration_t iter) override;
iteration_t getIteration(common::offset_t offset) const override;
uint64_t size() const { return sparseObjects.size(); }
const std::unordered_map<common::offset_t, iteration_t>& getCurrentData() const {
return *curData;
}
private:
GDSSpareObjectManager<iteration_t> sparseObjects;
std::unordered_map<common::offset_t, iteration_t>* curData = nullptr;
};
class SparseFrontierReference : public Frontier {
public:
explicit SparseFrontierReference(SparseFrontier& frontier)
: sparseObjects{frontier.sparseObjects} {}
void pinTableID(common::table_id_t tableID) override;
void addNode(common::nodeID_t nodeID, iteration_t iter) override;
void addNode(common::offset_t offset, iteration_t iter) override;
void addNodes(const std::vector<common::nodeID_t>& nodeIDs, iteration_t iter) override;
iteration_t getIteration(common::offset_t offset) const override;
const std::unordered_map<common::offset_t, iteration_t>& getCurrentData() const {
return *curData;
}
private:
GDSSpareObjectManager<iteration_t>& sparseObjects;
std::unordered_map<common::offset_t, iteration_t>* curData = nullptr;
};
class LBUG_API DenseFrontier : public Frontier {
friend class SparseFrontier;
friend class DenseFrontierReference;
friend class SPFrontierPair;
friend class DenseSparseDynamicFrontierPair;
public:
explicit DenseFrontier(const common::table_id_map_t<common::offset_t>& nodeMaxOffsetMap)
: nodeMaxOffsetMap{nodeMaxOffsetMap} {}
DenseFrontier(const DenseFrontier& other) = delete;
DenseFrontier(const DenseFrontier&& other) = delete;
void init(processor::ExecutionContext* context, graph::Graph* graph, iteration_t val);
void resetValue(processor::ExecutionContext* context, graph::Graph* graph, iteration_t val);
void pinTableID(common::table_id_t tableID) override;
void addNode(common::nodeID_t nodeID, iteration_t iter) override;
void addNode(common::offset_t offset, iteration_t iter) override;
void addNodes(const std::vector<common::nodeID_t>& nodeIDs, iteration_t iter) override;
iteration_t getIteration(common::offset_t offset) const override;
static std::unique_ptr<DenseFrontier> getUninitializedFrontier(
processor::ExecutionContext* context, graph::Graph* graph);
static std::unique_ptr<DenseFrontier> getUnvisitedFrontier(processor::ExecutionContext* context,
graph::Graph* graph);
static std::unique_ptr<DenseFrontier> getVisitedFrontier(processor::ExecutionContext* context,
graph::Graph* graph);
static std::unique_ptr<DenseFrontier> getVisitedFrontier(processor::ExecutionContext* context,
graph::Graph* graph, common::NodeOffsetMaskMap* maskMap);
private:
common::table_id_map_t<common::offset_t> nodeMaxOffsetMap;
GDSDenseObjectManager<std::atomic<iteration_t>> denseObjects;
std::atomic<iteration_t>* curData = nullptr;
};
class DenseFrontierReference : public Frontier {
friend class SPFrontierPair;
public:
explicit DenseFrontierReference(const DenseFrontier& denseFrontier)
: denseObjects{denseFrontier.denseObjects} {}
void pinTableID(common::table_id_t tableID) override;
void addNode(common::nodeID_t nodeID, iteration_t iter) override;
void addNode(common::offset_t offset, iteration_t iter) override;
void addNodes(const std::vector<common::nodeID_t>& nodeIDs, iteration_t iter) override;
iteration_t getIteration(common::offset_t offset) const override;
private:
const GDSDenseObjectManager<std::atomic<iteration_t>>& denseObjects;
std::atomic<iteration_t>* curData = nullptr;
};
class LBUG_API FrontierPair {
public:
FrontierPair() { hasActiveNodesForNextIter_.store(false); }
virtual ~FrontierPair() = default;
void resetCurrentIter() { curIter = 0; }
iteration_t getCurrentIter() const { return curIter; }
void setActiveNodesForNextIter() { hasActiveNodesForNextIter_.store(true); }
bool continueNextIter(uint16_t maxIter) {
return hasActiveNodesForNextIter_.load(std::memory_order_relaxed) &&
getCurrentIter() < maxIter;
}
void beginNewIteration();
void pinCurrentFrontier(common::table_id_t tableID);
void pinNextFrontier(common::table_id_t tableID);
void beginFrontierComputeBetweenTables(common::table_id_t curTableID,
common::table_id_t nextTableID);
void addNodeToNextFrontier(common::nodeID_t nodeID);
void addNodeToNextFrontier(common::offset_t offset);
void addNodesToNextFrontier(const std::vector<common::nodeID_t>& nodeIDs);
iteration_t getNextFrontierValue(common::offset_t offset);
bool isActiveOnCurrentFrontier(common::offset_t offset);
virtual std::unordered_set<common::offset_t> getActiveNodesOnCurrentFrontier() = 0;
virtual GDSDensityState getState() const = 0;
virtual bool needSwitchToDense(uint64_t threshold) const = 0;
virtual void switchToDense(processor::ExecutionContext* context, graph::Graph* graph) = 0;
template<class TARGET>
TARGET* ptrCast() {
return common::dynamic_cast_checked<TARGET*>(this);
}
protected:
virtual void beginNewIterationInternalNoLock() = 0;
protected:
std::mutex mtx;
iteration_t curIter = 0;
std::atomic<bool> hasActiveNodesForNextIter_;
Frontier* currentFrontier = nullptr;
Frontier* nextFrontier = nullptr;
};
class SPFrontierPair : public FrontierPair {
public:
explicit SPFrontierPair(std::unique_ptr<DenseFrontier> denseFrontier);
Frontier* getFrontier();
void beginNewIterationInternalNoLock() override;
common::offset_t getNumActiveNodesInCurrentFrontier(common::NodeOffsetMaskMap& mask);
std::unordered_set<common::offset_t> getActiveNodesOnCurrentFrontier() override;
GDSDensityState getState() const override { return state; }
bool needSwitchToDense(uint64_t threshold) const override {
return state == GDSDensityState::SPARSE && sparseFrontier->size() > threshold;
}
void switchToDense(processor::ExecutionContext* context, graph::Graph* graph) override;
private:
GDSDensityState state;
std::unique_ptr<DenseFrontier> denseFrontier;
std::unique_ptr<DenseFrontierReference> curDenseFrontier = nullptr;
std::unique_ptr<DenseFrontierReference> nextDenseFrontier = nullptr;
std::unique_ptr<SparseFrontier> sparseFrontier;
std::unique_ptr<SparseFrontierReference> curSparseFrontier = nullptr;
std::unique_ptr<SparseFrontierReference> nextSparseFrontier = nullptr;
};
class LBUG_API DenseSparseDynamicFrontierPair : public FrontierPair {
public:
DenseSparseDynamicFrontierPair(std::unique_ptr<DenseFrontier> curDenseFrontier,
std::unique_ptr<DenseFrontier> nextDenseFrontier);
void beginNewIterationInternalNoLock() override;
std::unordered_set<common::offset_t> getActiveNodesOnCurrentFrontier() override;
GDSDensityState getState() const override { return state; }
bool needSwitchToDense(uint64_t threshold) const override {
return state == GDSDensityState::SPARSE && nextSparseFrontier->size() > threshold;
}
void switchToDense(processor::ExecutionContext* context, graph::Graph* graph) override;
private:
GDSDensityState state;
std::unique_ptr<DenseFrontier> curDenseFrontier = nullptr;
std::unique_ptr<DenseFrontier> nextDenseFrontier = nullptr;
std::unique_ptr<SparseFrontier> curSparseFrontier = nullptr;
std::unique_ptr<SparseFrontier> nextSparseFrontier = nullptr;
};
class LBUG_API DenseFrontierPair : public FrontierPair {
public:
DenseFrontierPair(std::unique_ptr<DenseFrontier> curDenseFrontier,
std::unique_ptr<DenseFrontier> nextDenseFrontier);
void beginNewIterationInternalNoLock() override;
std::unordered_set<common::offset_t> getActiveNodesOnCurrentFrontier() override {
UNREACHABLE_CODE;
}
void resetValue(processor::ExecutionContext* context, graph::Graph* graph, iteration_t val);
GDSDensityState getState() const override { return GDSDensityState::DENSE; }
bool needSwitchToDense(uint64_t) const override { return false; }
void switchToDense(processor::ExecutionContext*, graph::Graph*) override {
}
private:
std::shared_ptr<DenseFrontier> curDenseFrontier;
std::shared_ptr<DenseFrontier> nextDenseFrontier;
};
class SPEdgeCompute : public EdgeCompute {
public:
explicit SPEdgeCompute(SPFrontierPair* frontierPair)
: frontierPair{frontierPair}, numNodesReached{0} {}
void resetSingleThreadState() override { numNodesReached = 0; }
bool terminate(common::NodeOffsetMaskMap& maskMap) override;
protected:
SPFrontierPair* frontierPair;
common::offset_t numNodesReached;
};
} }