#pragma once
#include <algorithm>
#include <numeric>
#include "common/in_mem_overflow_buffer.h"
#include "common/types/value/value.h"
#include "common/vector/value_vector.h"
#include "factorized_table_schema.h"
#include "flat_tuple.h"
namespace lbug {
namespace storage {
class MemoryManager;
}
namespace processor {
struct BlockAppendingInfo {
BlockAppendingInfo(uint8_t* data, uint64_t numTuplesToAppend)
: data{data}, numTuplesToAppend{numTuplesToAppend} {}
uint8_t* data;
uint64_t numTuplesToAppend;
};
class DataBlock {
public:
DataBlock(storage::MemoryManager* mm, uint64_t size);
~DataBlock();
uint8_t* getData() const;
std::span<uint8_t> getSizedData() const;
uint8_t* getWritableData() const;
void resetNumTuplesAndFreeSize();
void resetToZero();
void preventDestruction();
static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy,
uint32_t numBytesPerTuple);
public:
uint32_t numTuples;
uint64_t freeSize;
private:
std::unique_ptr<storage::MemoryBuffer> block;
};
class DataBlockCollection {
public:
DataBlockCollection() : numBytesPerTuple{UINT32_MAX}, numTuplesPerBlock{UINT32_MAX} {}
DataBlockCollection(uint32_t numBytesPerTuple, uint32_t numTuplesPerBlock)
: numBytesPerTuple{numBytesPerTuple}, numTuplesPerBlock{numTuplesPerBlock} {}
void append(std::unique_ptr<DataBlock> otherBlock) { blocks.push_back(std::move(otherBlock)); }
void append(std::vector<std::unique_ptr<DataBlock>> otherBlocks) {
std::move(begin(otherBlocks), end(otherBlocks), back_inserter(blocks));
}
void append(std::unique_ptr<DataBlockCollection> other) { append(std::move(other->blocks)); }
bool needAllocation(uint64_t size) const { return isEmpty() || blocks.back()->freeSize < size; }
bool isEmpty() const { return blocks.empty(); }
const std::vector<std::unique_ptr<DataBlock>>& getBlocks() const { return blocks; }
DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
DataBlock* getLastBlock() { return blocks.back().get(); }
void merge(DataBlockCollection& other);
void preventDestruction() const {
for (auto& block : blocks) {
block->preventDestruction();
}
}
private:
uint32_t numBytesPerTuple;
uint32_t numTuplesPerBlock;
std::vector<std::unique_ptr<DataBlock>> blocks;
};
class FlatTupleIterator;
class LBUG_API FactorizedTable {
friend FlatTupleIterator;
friend class JoinHashTable;
friend class PathPropertyProbe;
public:
FactorizedTable(storage::MemoryManager* memoryManager, FactorizedTableSchema tableSchema);
~FactorizedTable();
void append(const std::vector<common::ValueVector*>& vectors);
uint8_t* appendEmptyTuple();
void scan(std::span<common::ValueVector*> vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan) const {
std::vector<uint32_t> colIdxes(tableSchema.getNumColumns());
iota(colIdxes.begin(), colIdxes.end(), 0);
scan(vectors, tupleIdx, numTuplesToScan, colIdxes);
}
bool isEmpty() const { return getNumTuples() == 0; }
void scan(std::span<common::ValueVector*> vectors, ft_tuple_idx_t tupleIdx,
uint64_t numTuplesToScan, std::span<uint32_t> colIdxToScan) const;
void lookup(std::span<common::ValueVector*> vectors, std::span<uint32_t> colIdxesToScan,
uint8_t** tuplesToRead, uint64_t startPos, uint64_t numTuplesToRead) const;
void lookup(std::vector<common::ValueVector*>& vectors,
const common::SelectionVector* selVector, std::vector<uint32_t>& colIdxesToScan,
uint8_t* tupleToRead) const;
void lookup(std::vector<common::ValueVector*>& vectors, std::vector<uint32_t>& colIdxesToScan,
std::vector<ft_tuple_idx_t>& tupleIdxesToRead, uint64_t startPos,
uint64_t numTuplesToRead) const;
void mergeMayContainNulls(FactorizedTable& other);
void merge(FactorizedTable& other);
common::InMemOverflowBuffer* getInMemOverflowBuffer() const {
return inMemOverflowBuffer.get();
}
bool hasUnflatCol() const;
bool hasUnflatCol(std::vector<ft_col_idx_t>& colIdxes) const {
return std::any_of(colIdxes.begin(), colIdxes.end(),
[this](ft_col_idx_t colIdx) { return !tableSchema.getColumn(colIdx)->isFlat(); });
}
uint64_t getNumTuples() const { return numTuples; }
uint64_t getTotalNumFlatTuples() const;
uint64_t getNumFlatTuples(ft_tuple_idx_t tupleIdx) const;
const std::vector<std::unique_ptr<DataBlock>>& getTupleDataBlocks() {
return flatTupleBlockCollection->getBlocks();
}
const FactorizedTableSchema* getTableSchema() const { return &tableSchema; }
template<typename TYPE>
TYPE getData(ft_block_idx_t blockIdx, ft_block_offset_t blockOffset,
ft_col_offset_t colOffset) const {
return *((TYPE*)getCell(blockIdx, blockOffset, colOffset));
}
uint8_t* getTuple(ft_tuple_idx_t tupleIdx) const;
void updateFlatCell(uint8_t* tuplePtr, ft_col_idx_t colIdx, common::ValueVector* valueVector,
uint32_t pos);
void updateFlatCellNoNull(uint8_t* ftTuplePtr, ft_col_idx_t colIdx, void* dataBuf) {
memcpy(ftTuplePtr + tableSchema.getColOffset(colIdx), dataBuf,
tableSchema.getColumn(colIdx)->getNumBytes());
}
uint64_t getNumTuplesPerBlock() const { return numFlatTuplesPerBlock; }
bool hasNoNullGuarantee(ft_col_idx_t colIdx) const {
return tableSchema.getColumn(colIdx)->hasNoNullGuarantee();
}
bool isOverflowColNull(const uint8_t* nullBuffer, ft_tuple_idx_t tupleIdx,
ft_col_idx_t colIdx) const;
bool isNonOverflowColNull(const uint8_t* nullBuffer, ft_col_idx_t colIdx) const;
bool isNonOverflowColNull(ft_tuple_idx_t tupleIdx, ft_col_idx_t colIdx) const;
void setNonOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx);
void clear();
storage::MemoryManager* getMemoryManager() { return memoryManager; }
void resize(uint64_t numTuples);
template<typename Func>
void forEach(Func func) {
for (auto& tupleBlock : flatTupleBlockCollection->getBlocks()) {
uint8_t* tuple = tupleBlock->getData();
for (auto i = 0u; i < tupleBlock->numTuples; i++) {
func(tuple);
tuple += getTableSchema()->getNumBytesPerTuple();
}
}
}
static std::shared_ptr<FactorizedTable> EmptyTable(storage::MemoryManager* mm) {
return std::make_shared<FactorizedTable>(mm, FactorizedTableSchema());
}
void setPreventDestruction(bool preventDestruction) {
this->preventDestruction = preventDestruction;
}
private:
void setOverflowColNull(uint8_t* nullBuffer, ft_col_idx_t colIdx, ft_tuple_idx_t tupleIdx);
uint64_t computeNumTuplesToAppend(
const std::vector<common::ValueVector*>& vectorsToAppend) const;
uint8_t* getCell(ft_block_idx_t blockIdx, ft_block_offset_t blockOffset,
ft_col_offset_t colOffset) const {
return flatTupleBlockCollection->getBlock(blockIdx)->getData() +
blockOffset * tableSchema.getNumBytesPerTuple() + colOffset;
}
std::pair<ft_block_idx_t, ft_block_offset_t> getBlockIdxAndTupleIdxInBlock(
uint64_t tupleIdx) const {
return std::make_pair(tupleIdx / numFlatTuplesPerBlock, tupleIdx % numFlatTuplesPerBlock);
}
std::vector<BlockAppendingInfo> allocateFlatTupleBlocks(uint64_t numTuplesToAppend);
uint8_t* allocateUnflatTupleBlock(uint32_t numBytes);
void copyFlatVectorToFlatColumn(const common::ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, ft_col_idx_t colIdx);
void copyUnflatVectorToFlatColumn(const common::ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, ft_col_idx_t colIdx);
void copyVectorToFlatColumn(const common::ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples,
ft_col_idx_t colIdx) {
vector.state->isFlat() ?
copyFlatVectorToFlatColumn(vector, blockAppendInfo, colIdx) :
copyUnflatVectorToFlatColumn(vector, blockAppendInfo, numAppendedTuples, colIdx);
}
void copyVectorToUnflatColumn(const common::ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, ft_col_idx_t colIdx);
void copyVectorToColumn(const common::ValueVector& vector,
const BlockAppendingInfo& blockAppendInfo, uint64_t numAppendedTuples, ft_col_idx_t colIdx);
common::overflow_value_t appendVectorToUnflatTupleBlocks(const common::ValueVector& vector,
ft_col_idx_t colIdx);
void readUnflatCol(uint8_t** tuplesToRead, ft_col_idx_t colIdx,
common::ValueVector& vector) const;
void readUnflatCol(const uint8_t* tupleToRead, const common::SelectionVector& selVector,
ft_col_idx_t colIdx, common::ValueVector& vector) const;
void readFlatColToFlatVector(uint8_t* tupleToRead, ft_col_idx_t colIdx,
common::ValueVector& vector, common::sel_t pos) const;
void readFlatColToUnflatVector(uint8_t** tuplesToRead, ft_col_idx_t colIdx,
common::ValueVector& vector, uint64_t numTuplesToRead) const;
void readFlatCol(uint8_t** tuplesToRead, ft_col_idx_t colIdx, common::ValueVector& vector,
uint64_t numTuplesToRead) const;
private:
storage::MemoryManager* memoryManager;
FactorizedTableSchema tableSchema;
uint64_t numTuples;
uint64_t flatTupleBlockSize;
uint32_t numFlatTuplesPerBlock;
std::unique_ptr<DataBlockCollection> flatTupleBlockCollection;
std::unique_ptr<DataBlockCollection> unFlatTupleBlockCollection;
std::unique_ptr<common::InMemOverflowBuffer> inMemOverflowBuffer;
bool preventDestruction = false;
};
class FactorizedTableIterator {
public:
explicit FactorizedTableIterator(FactorizedTable& factorizedTable);
bool hasNext() {
return nextTupleIdx < factorizedTable.getNumTuples() || nextFlatTupleIdx < numFlatTuples;
}
void getNext(FlatTuple& tuple);
void resetState();
private:
bool isValidDataChunkPos(uint32_t dataChunkPos) const {
return flatTuplePositionsInDataChunk[dataChunkPos].first != UINT64_MAX;
}
void readUnflatColToFlatTuple(ft_col_idx_t colIdx, uint8_t* valueBuffer, FlatTuple& tuple);
void readFlatColToFlatTuple(ft_col_idx_t colIdx, uint8_t* valueBuffer, FlatTuple& tuple);
void updateInvalidEntriesInFlatTuplePositionsInDataChunk();
void updateNumElementsInDataChunk();
void updateFlatTuplePositionsInDataChunk();
const FactorizedTable& factorizedTable;
uint8_t* currentTupleBuffer;
uint64_t numFlatTuples;
ft_tuple_idx_t nextFlatTupleIdx;
ft_tuple_idx_t nextTupleIdx;
std::vector<std::pair<uint64_t, uint64_t>> flatTuplePositionsInDataChunk;
};
} }