#pragma once
#include <algorithm>
#include <concepts>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
#include <span>
#include <utility>
#include <variant>
#include <vector>

#include "common/data_chunk/sel_vector.h"
#include "common/enums/rel_multiplicity.h"
#include "common/null_mask.h"
#include "common/system_config.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/compression/compression.h"
#include "storage/enums/residency_state.h"
#include "storage/table/column_chunk_metadata.h"
#include "storage/table/column_chunk_stats.h"
#include "storage/table/in_memory_exception_chunk.h"
// Forward declaration only: this header does not need the full definition of PageManager.
namespace lbug::storage {
class PageManager;
}
namespace lbug {
// Forward declarations of types from sibling namespaces.
// ExpressionEvaluator is only taken by reference (ColumnChunkData::populateWithDefaultVal).
namespace evaluator {
class ExpressionEvaluator;
}
// Transaction is declared here but not referenced in the visible part of this header.
namespace transaction {
class Transaction;
}
namespace storage {
// Forward declarations of storage types that this header only uses through pointers,
// references or std::unique_ptr (NullChunkData is fully defined further down).
class Column;
class NullChunkData;
class ColumnStats;
class PageAllocator;
class FileHandle;
// State attached to one column segment: its metadata plus the nested states that hang off it.
//
// A SegmentState may own
//   - `nullState`: a nested state constructed with hasNull == false,
//   - `childrenStates`: one nested state per child, addressed through getChildState(),
//   - `alpExceptionChunk`: an in-memory exception chunk for either double or float values.
struct SegmentState {
    // Column this state is bound to; both constructors leave it null.
    const Column* column = nullptr;
    ColumnChunkMetadata metadata;
    // UINT64_MAX until a real value is supplied.
    uint64_t numValuesPerPage = UINT64_MAX;
    std::unique_ptr<SegmentState> nullState;
    std::vector<SegmentState> childrenStates;
    std::variant<std::unique_ptr<InMemoryExceptionChunk<double>>,
        std::unique_ptr<InMemoryExceptionChunk<float>>>
        alpExceptionChunk;

    // Creates a state with default metadata. When `hasNull` is true a nested null state is
    // allocated; that nested state is itself created without a null state.
    explicit SegmentState(bool hasNull = true) {
        if (!hasNull) {
            return;
        }
        nullState = std::make_unique<SegmentState>(false /*hasNull*/);
    }

    // Creates a state from existing metadata; a nested null state is always allocated.
    SegmentState(ColumnChunkMetadata metadata, uint64_t numValuesPerPage)
        : metadata{std::move(metadata)}, numValuesPerPage{numValuesPerPage},
          nullState{std::make_unique<SegmentState>(false /*hasNull*/)} {}

    // Returns the state of child `childIdx`; the index must be within childrenStates.
    SegmentState& getChildState(common::idx_t childIdx) {
        DASSERT(childrenStates.size() > childIdx);
        return childrenStates[childIdx];
    }
    const SegmentState& getChildState(common::idx_t childIdx) const {
        DASSERT(childrenStates.size() > childIdx);
        return childrenStates[childIdx];
    }

    // Returns the exception chunk for value type T (non-owning; may be null if the held
    // unique_ptr is empty). The variant must currently hold the alternative for T.
    template<std::floating_point T>
    InMemoryExceptionChunk<T>* getExceptionChunk() {
        using ChunkPtr = std::unique_ptr<InMemoryExceptionChunk<T>>;
        DASSERT(std::holds_alternative<ChunkPtr>(alpExceptionChunk));
        auto& owner = std::get<ChunkPtr>(alpExceptionChunk);
        return owner.get();
    }
    // Const counterpart of getExceptionChunk().
    template<std::floating_point T>
    const InMemoryExceptionChunk<T>* getExceptionChunkConst() const {
        using ChunkPtr = std::unique_ptr<InMemoryExceptionChunk<T>>;
        DASSERT(std::holds_alternative<ChunkPtr>(alpExceptionChunk));
        const auto& owner = std::get<ChunkPtr>(alpExceptionChunk);
        return owner.get();
    }

    void reclaimAllocatedPages(PageAllocator& pageAllocator) const;

    // Dereferencing a SegmentState yields the state itself.
    SegmentState& operator*() { return *this; }
    const SegmentState& operator*() const { return *this; }

    // Number of values recorded in this segment's metadata.
    uint64_t getNumValues() const { return metadata.numValues; }
};
// Forward declaration; Spiller is declared a friend of ColumnChunkData.
class Spiller;
class LBUG_API ColumnChunkData {
public:
friend struct ColumnChunkFactory;
friend class Spiller;
ColumnChunkData(MemoryManager& mm, common::LogicalType dataType, uint64_t capacity,
bool enableCompression, ResidencyState residencyState, bool hasNullData,
bool initializeToZero = true);
ColumnChunkData(MemoryManager& mm, common::LogicalType dataType, bool enableCompression,
const ColumnChunkMetadata& metadata, bool hasNullData, bool initializeToZero = true);
ColumnChunkData(MemoryManager& mm, common::PhysicalTypeID physicalType, bool enableCompression,
const ColumnChunkMetadata& metadata, bool hasNullData, bool initializeToZero = true);
virtual ~ColumnChunkData();
template<typename T>
T getValue(common::offset_t pos) const {
DASSERT(pos < numValues);
DASSERT(residencyState != ResidencyState::ON_DISK);
return getData<T>()[pos];
}
template<typename T>
void setValue(T val, common::offset_t pos) {
DASSERT(pos < capacity);
DASSERT(residencyState != ResidencyState::ON_DISK);
getData<T>()[pos] = val;
if (pos >= numValues) {
numValues = pos + 1;
}
if constexpr (StorageValueType<T>) {
inMemoryStats.update(StorageValue{val}, dataType.getPhysicalType());
}
}
virtual bool isNull(common::offset_t pos) const;
void setNullData(std::unique_ptr<NullChunkData> nullData_);
bool hasNullData() const { return nullData != nullptr; }
NullChunkData* getNullData() { return nullData.get(); }
const NullChunkData* getNullData() const { return nullData.get(); }
std::optional<common::NullMask> getNullMask() const;
std::unique_ptr<NullChunkData> moveNullData();
common::LogicalType& getDataType() { return dataType; }
const common::LogicalType& getDataType() const { return dataType; }
ResidencyState getResidencyState() const { return residencyState; }
bool isCompressionEnabled() const { return enableCompression; }
ColumnChunkMetadata& getMetadata() {
DASSERT(residencyState == ResidencyState::ON_DISK);
return metadata;
}
const ColumnChunkMetadata& getMetadata() const {
DASSERT(residencyState == ResidencyState::ON_DISK);
return metadata;
}
void setMetadata(const ColumnChunkMetadata& metadata_) {
DASSERT(residencyState == ResidencyState::ON_DISK);
metadata = metadata_;
}
virtual void resetToAllNull();
virtual void resetToEmpty();
virtual ColumnChunkMetadata getMetadataToFlush() const;
virtual void append(common::ValueVector* vector, const common::SelectionView& selView);
virtual void append(const ColumnChunkData* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);
virtual void flush(PageAllocator& pageAllocator);
ColumnChunkMetadata flushBuffer(PageAllocator& pageAllocator, const PageRange& entry,
const ColumnChunkMetadata& metadata) const;
static common::page_idx_t getNumPagesForBytes(uint64_t numBytes) {
return (numBytes + common::LBUG_PAGE_SIZE - 1) / common::LBUG_PAGE_SIZE;
}
uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
uint8_t* getData() const;
template<typename T>
T* getData() const {
return reinterpret_cast<T*>(getData());
}
uint64_t getBufferSize() const;
virtual void initializeScanState(SegmentState& state, const Column* column) const;
virtual void scan(common::ValueVector& output, common::offset_t offset, common::length_t length,
common::sel_t posInOutputVector = 0) const;
virtual void lookup(common::offset_t offsetInChunk, common::ValueVector& output,
common::sel_t posInOutputVector) const;
virtual void write(const common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk);
virtual void write(ColumnChunkData* chunk, ColumnChunkData* offsetsInChunk,
common::RelMultiplicity multiplicity);
virtual void write(const ColumnChunkData* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy);
virtual void setToInMemory();
virtual void resize(uint64_t newCapacity);
virtual void resizeWithoutPreserve(uint64_t newCapacity);
void populateWithDefaultVal(evaluator::ExpressionEvaluator& defaultEvaluator,
uint64_t& numValues_, ColumnStats* newColumnStats);
virtual void finalize() {
DASSERT(residencyState != ResidencyState::ON_DISK);
}
uint64_t getCapacity() const { return capacity; }
uint64_t getNumValues() const { return numValues; }
virtual void resetNumValuesFromMetadata();
virtual void setNumValues(uint64_t numValues_);
inline void truncate(uint64_t numValues_) { setNumValues(numValues_); }
virtual void syncNumValues() {}
virtual bool numValuesSanityCheck() const;
virtual bool sanityCheck() const;
virtual uint64_t getEstimatedMemoryUsage() const;
bool shouldSplit() const {
return numValues > 1 && getSizeOnDisk() > std::max(getMinimumSizeOnDisk(),
common::StorageConfig::MAX_SEGMENT_SIZE);
}
const ColumnChunkStats& getInMemoryStats() const;
virtual uint64_t getMinimumSizeOnDisk() const;
virtual uint64_t getSizeOnDisk() const;
virtual uint64_t getSizeOnDiskInMemoryStats() const;
virtual void serialize(common::Serializer& serializer) const;
static std::unique_ptr<ColumnChunkData> deserialize(MemoryManager& mm,
common::Deserializer& deSer);
template<typename TARGET>
TARGET& cast() {
return common::dynamic_cast_checked<TARGET&>(*this);
}
template<typename TARGET>
const TARGET& cast() const {
return common::dynamic_cast_checked<const TARGET&>(*this);
}
MemoryManager& getMemoryManager() const;
void loadFromDisk();
SpillResult spillToDisk();
MergedColumnChunkStats getMergedColumnChunkStats() const;
void updateStats(const common::ValueVector* vector, const common::SelectionView& selVector);
virtual void reclaimStorage(PageAllocator& pageAllocator);
std::vector<std::unique_ptr<ColumnChunkData>> split(bool targetMaxSize = false) const;
protected:
void initializeBuffer(common::PhysicalTypeID physicalType, MemoryManager& mm,
bool initializeToZero);
void initializeFunction();
void setToOnDisk(const ColumnChunkMetadata& metadata);
virtual void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk,
const common::SelectionView& selView);
void resetInMemoryStats();
private:
using flush_buffer_func_t = std::function<ColumnChunkMetadata(const std::span<uint8_t>,
FileHandle*, const PageRange&, const ColumnChunkMetadata&)>;
flush_buffer_func_t initializeFlushBufferFunction(
std::shared_ptr<CompressionAlg> compression) const;
uint64_t getBufferSize(uint64_t capacity_) const;
protected:
using get_metadata_func_t = std::function<ColumnChunkMetadata(const std::span<uint8_t>,
uint64_t, StorageValue, StorageValue)>;
using get_min_max_func_t =
std::function<std::pair<StorageValue, StorageValue>(const uint8_t*, uint64_t)>;
ResidencyState residencyState;
common::LogicalType dataType;
bool enableCompression;
uint32_t numBytesPerValue;
uint64_t capacity;
std::unique_ptr<MemoryBuffer> buffer;
std::unique_ptr<NullChunkData> nullData;
uint64_t numValues;
flush_buffer_func_t flushBufferFunction;
get_metadata_func_t getMetadataFunction;
ColumnChunkMetadata metadata;
ColumnChunkStats inMemoryStats;
};
// bool specialization of setValue: the buffer is viewed as uint64_t words and the bit for
// `pos` is written through NullMask::setNull instead of a plain array store.
// numValues and the in-memory stats are maintained as in the generic version.
template<>
inline void ColumnChunkData::setValue(bool val, common::offset_t pos) {
    DASSERT(pos < capacity);
    DASSERT(residencyState != ResidencyState::ON_DISK);
    auto* packedWords = getData<uint64_t>();
    common::NullMask::setNull(packedWords, pos, val);
    if (numValues <= pos) {
        numValues = pos + 1;
    }
    inMemoryStats.update(StorageValue{val}, dataType.getPhysicalType());
}
// bool specialization of getValue: reads the bit for `pos` through NullMask::isNull over the
// buffer viewed as uint64_t words. Unlike the generic version, no bounds or residency
// assertions are made here.
template<>
inline bool ColumnChunkData::getValue(common::offset_t pos) const {
    auto* packedWords = getData<uint64_t>();
    return common::NullMask::isNull(packedWords, pos);
}
// Column chunk with logical type BOOL. Both constructors forward to ColumnChunkData with
// LogicalType::BOOL() and always request a zero-initialized buffer.
// All data-path operations are overridden; their implementations live out of line.
class BoolChunkData : public ColumnChunkData {
public:
    // In-memory/empty chunk with the given capacity and residency.
    BoolChunkData(MemoryManager& mm, uint64_t capacity, bool enableCompression, ResidencyState type,
        bool hasNullChunk)
        : ColumnChunkData{mm, common::LogicalType::BOOL(), capacity, enableCompression, type,
              hasNullChunk, true /*initializeToZero*/} {}
    // Chunk described by existing metadata.
    BoolChunkData(MemoryManager& mm, bool enableCompression, const ColumnChunkMetadata& metadata,
        bool hasNullData)
        : ColumnChunkData{mm, common::LogicalType::BOOL(), enableCompression, metadata,
              hasNullData, true /*initializeToZero*/} {}

    void append(common::ValueVector* vector, const common::SelectionView& sel) final;
    void append(const ColumnChunkData* other, common::offset_t startPosInOtherChunk,
        uint32_t numValuesToAppend) override;

    void scan(common::ValueVector& output, common::offset_t offset, common::length_t length,
        common::sel_t posInOutputVector = 0) const override;
    void lookup(common::offset_t offsetInChunk, common::ValueVector& output,
        common::sel_t posInOutputVector) const override;

    void write(const common::ValueVector* vector, common::offset_t offsetInVector,
        common::offset_t offsetInChunk) override;
    void write(ColumnChunkData* chunk, ColumnChunkData* dstOffsets,
        common::RelMultiplicity multiplicity) final;
    void write(const ColumnChunkData* srcChunk, common::offset_t srcOffsetInChunk,
        common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
};
// Bool chunk that stores the null flags of another chunk.
//
// Convention visible in this class: a set bit / `true` value means "null". resetToAllNull()
// fills the buffer with 0xFF and records min = max = true; resetToNoNull() fills it with 0 and
// records min = max = false. A NullChunkData never carries null data of its own.
class NullChunkData final : public BoolChunkData {
public:
    NullChunkData(MemoryManager& mm, uint64_t capacity, bool enableCompression, ResidencyState type)
        : BoolChunkData(mm, capacity, enableCompression, type, false /*hasNullChunk*/) {}
    NullChunkData(MemoryManager& mm, bool enableCompression, const ColumnChunkMetadata& metadata)
        : BoolChunkData{mm, enableCompression, metadata, false /*hasNullData*/} {}

    // The stored bool at `pos` is the null flag itself.
    bool isNull(common::offset_t pos) const override { return getValue<bool>(pos); }
    void setNull(common::offset_t pos, bool isNull);

    // True when the in-memory stats have no max recorded, or the recorded max is false
    // (i.e. no `true`/null value has been observed by the stats).
    bool noNullsGuaranteedInMem() const {
        return !inMemoryStats.max || !inMemoryStats.max->get<bool>();
    }
    // True when the in-memory stats have no min recorded, or the recorded min is true
    // (i.e. no `false`/non-null value has been observed by the stats).
    bool allNullsGuaranteedInMem() const {
        return !inMemoryStats.min || inMemoryStats.min->get<bool>();
    }
    bool haveNoNullsGuaranteed() const;
    bool haveAllNullsGuaranteed() const;

    // Clears every bit, drops all values (numValues = 0) and forgets the min/max stats.
    void resetToEmpty() override {
        std::memset(getData(), 0 /* non null */, getBufferSize());
        numValues = 0;
        inMemoryStats.min = inMemoryStats.max = std::nullopt;
    }
    // Clears every bit and records min = max = false. numValues is left unchanged.
    void resetToNoNull() {
        std::memset(getData(), 0 /* non null */, getBufferSize());
        inMemoryStats.min = inMemoryStats.max = false;
    }
    // Sets every bit and records min = max = true. numValues is left unchanged.
    void resetToAllNull() override {
        std::memset(getData(), 0xFF /* null */, getBufferSize());
        inMemoryStats.min = inMemoryStats.max = true;
    }

    // Copies `numBits` null bits from `srcBuffer` (starting at bit `srcOffset`) into this chunk
    // starting at bit `dstOffset`, widens the in-memory min/max stats with the min/max of the
    // copied source range, and grows numValues if the copy reaches or passes the current end.
    // `numBits` must be greater than zero (asserted).
    void copyFromBuffer(const uint64_t* srcBuffer, uint64_t srcOffset, uint64_t dstOffset,
        uint64_t numBits) {
        DASSERT(numBits > 0);
        common::NullMask::copyNullMask(srcBuffer, srcOffset, getData<uint64_t>(), dstOffset,
            numBits);
        auto [min, max] = common::NullMask::getMinMax(srcBuffer, srcOffset, numBits);
        if (!inMemoryStats.min.has_value() || min < inMemoryStats.min->get<bool>()) {
            inMemoryStats.min = min;
        }
        if (!inMemoryStats.max.has_value() || max > inMemoryStats.max->get<bool>()) {
            inMemoryStats.max = max;
        }
        if ((dstOffset + numBits) >= numValues) {
            numValues = dstOffset + numBits;
        }
    }

    void appendNulls(const common::ValueVector* vector, const common::SelectionView& selView,
        common::offset_t startPosInChunk);

    void scan(common::ValueVector& output, common::offset_t offset, common::length_t length,
        common::sel_t posInOutputVector = 0) const override;
    void append(const ColumnChunkData* other, common::offset_t startPosInOtherChunk,
        uint32_t numValuesToAppend) override;
    void write(const common::ValueVector* vector, common::offset_t offsetInVector,
        common::offset_t offsetInChunk) override;
    void write(const ColumnChunkData* srcChunk, common::offset_t srcOffsetInChunk,
        common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;

    void serialize(common::Serializer& serializer) const override;
    static std::unique_ptr<NullChunkData> deserialize(MemoryManager& mm,
        common::Deserializer& deSer);

    // Note: hides (does not override) ColumnChunkData::getNullMask(), which returns an optional.
    common::NullMask getNullMask() const;
};
// Column chunk with logical type INTERNAL_ID. It is always constructed without null data and
// carries one table ID for the whole chunk, which starts out as INVALID_TABLE_ID until
// setTableID() is called.
class LBUG_API InternalIDChunkData final : public ColumnChunkData {
public:
    InternalIDChunkData(MemoryManager& mm, uint64_t capacity, bool enableCompression,
        ResidencyState residencyState)
        : ColumnChunkData(mm, common::LogicalType::INTERNAL_ID(), capacity, enableCompression,
              residencyState, false /*hasNullData*/) {}
    InternalIDChunkData(MemoryManager& mm, bool enableCompression,
        const ColumnChunkMetadata& metadata)
        : ColumnChunkData{mm, common::LogicalType::INTERNAL_ID(), enableCompression, metadata,
              false /*hasNullData*/} {}

    void append(common::ValueVector* vector, const common::SelectionView& selView) override;

    void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk,
        const common::SelectionView& selView) override;
    void copyInt64VectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk,
        const common::SelectionView& selView) const;

    void scan(common::ValueVector& output, common::offset_t offset, common::length_t length,
        common::sel_t posInOutputVector = 0) const override;
    void lookup(common::offset_t offsetInChunk, common::ValueVector& output,
        common::sel_t posInOutputVector) const override;

    void write(const common::ValueVector* vector, common::offset_t offsetInVector,
        common::offset_t offsetInChunk) override;

    void append(const ColumnChunkData* other, common::offset_t startPosInOtherChunk,
        uint32_t numValuesToAppend) override;

    void setTableID(common::table_id_t tableID) { commonTableID = tableID; }
    common::table_id_t getTableID() const { return commonTableID; }

    // Read access goes through getValue(), so it carries that function's assertions.
    common::offset_t operator[](common::offset_t pos) const {
        return getValue<common::offset_t>(pos);
    }
    // Mutable access indexes the raw buffer directly: no assertions, and neither numValues nor
    // the in-memory stats are updated.
    common::offset_t& operator[](common::offset_t pos) {
        common::offset_t* offsets = getData<common::offset_t>();
        return offsets[pos];
    }

private:
    common::table_id_t commonTableID{common::INVALID_TABLE_ID};
};
struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunkData> createColumnChunkData(MemoryManager& mm,
common::LogicalType dataType, bool enableCompression, uint64_t capacity,
ResidencyState residencyState, bool hasNullData = true, bool initializeToZero = true);
static std::unique_ptr<ColumnChunkData> createColumnChunkData(MemoryManager& mm,
common::LogicalType dataType, bool enableCompression, ColumnChunkMetadata& metadata,
bool hasNullData, bool initializeToZero);
static std::unique_ptr<ColumnChunkData> createNullChunkData(MemoryManager& mm,
bool enableCompression, uint64_t capacity, ResidencyState type) {
return std::make_unique<NullChunkData>(mm, capacity, enableCompression, type);
}
};
} }