#pragma once
#include <atomic>
#include "storage/stats/table_stats.h"
#include "storage/table/group_collection.h"
#include "storage/table/node_group.h"
namespace lbug {
namespace transaction {
class Transaction;
}
namespace storage {
class MemoryManager;
class NodeGroupCollection {
public:
NodeGroupCollection(MemoryManager& mm, const std::vector<common::LogicalType>& types,
bool enableCompression, ResidencyState residency = ResidencyState::IN_MEMORY,
const VersionRecordHandler* versionRecordHandler = nullptr);
void append(const transaction::Transaction* transaction,
const std::vector<common::ValueVector*>& vectors);
void append(const transaction::Transaction* transaction,
const std::vector<common::column_id_t>& columnIDs, const NodeGroupCollection& other);
void append(const transaction::Transaction* transaction,
const std::vector<common::column_id_t>& columnIDs, const NodeGroup& nodeGroup);
std::pair<common::offset_t, common::offset_t> appendToLastNodeGroupAndFlushWhenFull(
transaction::Transaction* transaction, const std::vector<common::column_id_t>& columnIDs,
InMemChunkedNodeGroup& chunkedGroup, PageAllocator& pageAllocator);
common::row_idx_t getNumTotalRows() const;
common::node_group_idx_t getNumNodeGroups() const {
const auto lock = nodeGroups.lock();
return nodeGroups.getNumGroups(lock);
}
common::node_group_idx_t getNumNodeGroupsNoLock() const {
return nodeGroups.getNumGroupsNoLock();
}
NodeGroup* getNodeGroupNoLock(const common::node_group_idx_t groupIdx) const {
DASSERT(nodeGroups.getGroupNoLock(groupIdx)->getNodeGroupIdx() == groupIdx);
return nodeGroups.getGroupNoLock(groupIdx);
}
NodeGroup* getNodeGroup(const common::node_group_idx_t groupIdx,
bool mayOutOfBound = false) const {
const auto lock = nodeGroups.lock();
if (mayOutOfBound && groupIdx >= nodeGroups.getNumGroups(lock)) {
return nullptr;
}
DASSERT(nodeGroups.getGroupNoLock(groupIdx)->getNodeGroupIdx() == groupIdx);
return nodeGroups.getGroup(lock, groupIdx);
}
NodeGroup* getOrCreateNodeGroup(const transaction::Transaction* transaction,
common::node_group_idx_t groupIdx, NodeGroupDataFormat format);
void setNodeGroup(const common::node_group_idx_t nodeGroupIdx,
std::unique_ptr<NodeGroup> group) {
const auto lock = nodeGroups.lock();
nodeGroups.replaceGroup(lock, nodeGroupIdx, std::move(group));
}
void rollbackInsert(common::row_idx_t numRows_, bool updateNumRows = true);
void clear() {
const auto lock = nodeGroups.lock();
nodeGroups.clear(lock);
}
common::column_id_t getNumColumns() const { return types.size(); }
void addColumn(TableAddColumnState& addColumnState, PageAllocator* pageAllocator = nullptr);
uint64_t getEstimatedMemoryUsage() const;
void checkpoint(MemoryManager& memoryManager, NodeGroupCheckpointState& state);
void reclaimStorage(PageAllocator& pageAllocator) const;
TableStats getStats() const {
auto lock = nodeGroups.lock();
return stats.copy();
}
TableStats getStats(const common::UniqLock& lock) const {
DASSERT(lock.isLocked());
UNUSED(lock);
return stats.copy();
}
void mergeStats(const TableStats& stats) {
auto lock = nodeGroups.lock();
this->stats.merge(stats);
}
void mergeStats(const std::vector<common::column_id_t>& columnIDs, const TableStats& stats) {
auto lock = nodeGroups.lock();
this->stats.merge(columnIDs, stats);
}
void serialize(common::Serializer& ser);
void deserialize(common::Deserializer& deSer, MemoryManager& memoryManager);
void pushInsertInfo(const transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::row_idx_t startRow,
common::row_idx_t numRows, const VersionRecordHandler* versionRecordHandler,
bool incrementNumRows);
private:
void pushInsertInfo(const transaction::Transaction* transaction, const NodeGroup* nodeGroup,
common::row_idx_t numRows);
private:
MemoryManager& mm;
bool enableCompression;
std::atomic<common::row_idx_t> numTotalRows;
std::vector<common::LogicalType> types;
GroupCollection<NodeGroup> nodeGroups;
ResidencyState residency;
TableStats stats;
const VersionRecordHandler* versionRecordHandler;
};
} }