#pragma once
#include <cstddef>
#include <cstdint>
#include "common/assert.h"
#include "common/cast.h"
#include "common/types/types.h"
#include "storage/enums/residency_state.h"
#include "storage/table/column_chunk_data.h"
#include "storage/table/update_info.h"
namespace lbug {
namespace storage {
class PageAllocator;
class MemoryManager;
class Column;
struct ColumnChunkScanner;
// Bundles an owned ColumnChunkData with a start row and a row count.
// NOTE(review): presumably describes the row range of a chunk that a checkpoint should write
// chunkData into -- confirm against ColumnChunk::checkpoint.
struct ChunkCheckpointState {
    // Data owned by this state object.
    std::unique_ptr<ColumnChunkData> chunkData;
    // Start row associated with chunkData.
    common::row_idx_t startRow;
    // Number of rows associated with chunkData.
    common::length_t numRows;

    // Takes ownership of `chunkData`; `startRow` and `numRows` are stored as given.
    ChunkCheckpointState(std::unique_ptr<ColumnChunkData> chunkData, common::row_idx_t startRow,
        common::length_t numRows)
        : chunkData(std::move(chunkData)), startRow(startRow), numRows(numRows) {}
};
// Describes a run of rows taken from a ColumnChunkData and where that run lands in a segment.
// The chunk data is held by const reference (not owned), so it must outlive this object.
struct SegmentCheckpointState {
    // Source data (borrowed).
    const ColumnChunkData& chunkData;
    // Row position inside chunkData.
    // NOTE(review): presumably the first row to read from chunkData -- confirm at call sites.
    common::row_idx_t startRowInData;
    // Row position inside the target segment; ColumnCheckpointState adds numRows to it to
    // compute the end row to write.
    common::row_idx_t offsetInSegment;
    // Number of rows in the run.
    common::row_idx_t numRows;

    // Stores all four arguments unchanged; `chunkData` is bound by reference.
    SegmentCheckpointState(const ColumnChunkData& chunkData, common::row_idx_t startRowInData,
        common::row_idx_t offsetInSegment, common::row_idx_t numRows)
        : chunkData(chunkData), startRowInData(startRowInData), offsetInSegment(offsetInSegment),
          numRows(numRows) {}
};
// Finds the segment holding the value at `offsetInChunk`.
//
// Walks `segments` in order, subtracting each segment's getNumValues() from the remaining
// offset until the offset falls inside a segment.
//
// Returns {iterator to the containing segment, offset inside that segment}. When
// `offsetInChunk` is at or beyond the sum of all segments' value counts, returns
// {segments.end(), 0}.
template<class SegmentView>
std::pair<typename std::span<SegmentView>::iterator, common::offset_t> genericFindSegment(
    std::span<SegmentView> segments, common::offset_t offsetInChunk) {
    auto remainingOffset = offsetInChunk;
    for (auto it = segments.begin(); it != segments.end(); ++it) {
        const auto numValuesInSegment = (**it).getNumValues();
        if (remainingOffset < numValuesInSegment) {
            return std::make_pair(it, remainingOffset);
        }
        remainingOffset -= numValuesInSegment;
    }
    return std::make_pair(segments.end(), 0);
}
// Invokes `func` once for every segment overlapping a value range, starting from an
// already-located position.
//
// `segment` / `offsetInSegment` identify the first value of the range (as returned by
// genericFindSegment); `length` is the maximum number of values to cover. For each visited
// segment `func` receives:
//   - the segment view,
//   - the offset of the first covered value inside that segment,
//   - the number of covered values inside that segment,
//   - the number of values already covered by earlier segments in this call.
// Iteration stops once `length` values were covered or the segments are exhausted.
//
// Returns the number of values actually covered (may be less than `length`).
//
// This overload is defined before genericRangeSegments so that the call there is resolved by
// ordinary lookup instead of relying on argument-dependent lookup at instantiation time.
template<class SegmentView,
    std::invocable<SegmentView&, common::offset_t /*offsetInSegment*/,
        common::offset_t /*lengthInSegment*/, common::offset_t /*lengthScanned*/>
        Func>
common::offset_t genericRangeSegmentsFromIt(std::span<SegmentView> segments,
    typename std::span<SegmentView>::iterator segment, common::offset_t offsetInSegment,
    common::length_t length, Func func) {
    common::offset_t lengthScanned = 0;
    while (lengthScanned < length && segment != segments.end()) {
        DASSERT((**segment).getNumValues() > offsetInSegment);
        // Clamp to whichever runs out first: the requested range or the current segment.
        const auto lengthInSegment =
            std::min(length - lengthScanned, (**segment).getNumValues() - offsetInSegment);
        func(*segment, offsetInSegment, lengthInSegment, lengthScanned);
        lengthScanned += lengthInSegment;
        ++segment;
        // Every segment after the first is covered from its beginning.
        offsetInSegment = 0;
    }
    return lengthScanned;
}

// Invokes `func` once for every segment overlapping the value range
// [offsetInChunk, offsetInChunk + length) of the chunk formed by `segments`.
//
// Locates the starting segment with genericFindSegment and then delegates to
// genericRangeSegmentsFromIt; see that function for the arguments passed to `func`.
// If `offsetInChunk` is past the last value, `func` is never called and 0 is returned.
//
// Returns the number of values actually covered.
template<class SegmentView,
    std::invocable<SegmentView&, common::offset_t /*offsetInSegment*/,
        common::offset_t /*lengthInSegment*/, common::offset_t /*lengthScanned*/>
        Func>
common::offset_t genericRangeSegments(std::span<SegmentView> segments,
    common::offset_t offsetInChunk, common::length_t length, Func func) {
    auto [segment, offsetInSegment] = genericFindSegment(segments, offsetInChunk);
    return genericRangeSegmentsFromIt(segments, segment, offsetInSegment, length, std::move(func));
}
class ColumnChunk;
// Checkpoint input for one column: the persistent data being checkpointed, the per-segment
// states to apply, and the largest end row any of those states touches.
struct ColumnCheckpointState {
    // Persistent data (borrowed by reference, not owned).
    ColumnChunkData& persistentData;
    // States to apply; owned by this object.
    std::vector<SegmentCheckpointState> segmentCheckpointStates;
    // Maximum of (offsetInSegment + numRows) over all states; 0 when there are none.
    common::row_idx_t endRowIdxToWrite;

    // Stores the arguments and derives endRowIdxToWrite from the stored states.
    ColumnCheckpointState(ColumnChunkData& persistentData,
        std::vector<SegmentCheckpointState> segmentCheckpointStates)
        : persistentData(persistentData),
          segmentCheckpointStates(std::move(segmentCheckpointStates)), endRowIdxToWrite(0) {
        // Use the member explicitly: the parameter of the same name has been moved from.
        for (const auto& state : this->segmentCheckpointStates) {
            const auto stateEndRow = state.offsetInSegment + state.numRows;
            if (endRowIdxToWrite < stateEndRow) {
                endRowIdxToWrite = stateEndRow;
            }
        }
    }
};
// State attached to a chunk: the column it refers to and a list of SegmentState objects.
// NOTE(review): presumably one SegmentState per segment of the chunk -- confirm against
// ColumnChunk::initializeScanState.
struct ChunkState {
    const Column* column = nullptr;
    std::vector<SegmentState> segmentStates;

    // Defined out of line.
    void reclaimAllocatedPages(PageAllocator& pageAllocator) const;
    // Defined out of line.
    std::pair<const SegmentState*, common::offset_t> findSegment(
        common::offset_t offsetInChunk) const;

    // Applies `func` to every segment state overlapping `length` values starting at
    // `offsetInChunk`, via genericRangeSegments. Returns the number of values covered.
    template<std::invocable<SegmentState&, common::offset_t /*offsetInSegment*/,
        common::offset_t /*lengthInSegment*/, common::offset_t /*lengthScanned*/>
            Func>
    common::offset_t rangeSegments(common::offset_t offsetInChunk, common::length_t length,
        Func func) {
        auto states = std::span(segmentStates);
        return genericRangeSegments(states, offsetInChunk, length, func);
    }

    // Const overload: forwards to the non-const overload, so `func` still receives mutable
    // SegmentState references.
    template<std::invocable<SegmentState&, common::offset_t /*offsetInSegment*/,
        common::offset_t /*lengthInSegment*/, common::offset_t /*lengthScanned*/>
            Func>
    common::offset_t rangeSegments(common::offset_t offsetInChunk, common::length_t length,
        Func func) const {
        auto* mutableThis = const_cast<ChunkState*>(this);
        return mutableThis->rangeSegments(offsetInChunk, length, func);
    }
};
class ColumnChunk {
public:
ColumnChunk(MemoryManager& mm, common::LogicalType&& dataType, uint64_t capacity,
bool enableCompression, ResidencyState residencyState, bool initializeToZero = true);
ColumnChunk(MemoryManager& mm, common::LogicalType&& dataType, bool enableCompression,
ColumnChunkMetadata metadata);
ColumnChunk(bool enableCompression, std::unique_ptr<ColumnChunkData> data);
ColumnChunk(bool enableCompression, std::vector<std::unique_ptr<ColumnChunkData>> segments);
void initializeScanState(ChunkState& state, const Column* column) const;
void scan(const transaction::Transaction* transaction, const ChunkState& state,
common::ValueVector& output, common::offset_t offsetInChunk, common::length_t length) const;
template<ResidencyState SCAN_RESIDENCY_STATE>
void scanCommitted(const transaction::Transaction* transaction, ChunkState& chunkState,
ColumnChunkScanner& output, common::row_idx_t startRow = 0,
common::row_idx_t numRows = common::INVALID_ROW_IDX) const;
template<ResidencyState SCAN_RESIDENCY_STATE>
void scanCommitted(const transaction::Transaction* transaction, ChunkState& chunkState,
ColumnChunkData& output, common::row_idx_t startRow = 0,
common::row_idx_t numRows = common::INVALID_ROW_IDX) const;
void lookup(const transaction::Transaction* transaction, const ChunkState& state,
common::offset_t rowInChunk, common::ValueVector& output,
common::sel_t posInOutputVector) const;
void update(const transaction::Transaction* transaction, common::offset_t offsetInChunk,
const common::ValueVector& values);
uint64_t getEstimatedMemoryUsage() const {
if (getResidencyState() == ResidencyState::ON_DISK) {
return 0;
}
uint64_t memUsage = 0;
for (auto& segment : data) {
memUsage += segment->getEstimatedMemoryUsage();
}
return memUsage;
}
void serialize(common::Serializer& serializer) const;
static std::unique_ptr<ColumnChunk> deserialize(MemoryManager& mm, common::Deserializer& deSer);
uint64_t getNumValues() const {
uint64_t numValues = 0;
for (const auto& chunk : data) {
numValues += chunk->getNumValues();
}
return numValues;
}
uint64_t getCapacity() const {
uint64_t capacity = 0;
for (const auto& chunk : data) {
capacity += chunk->getCapacity();
}
return capacity;
}
void truncate(uint64_t numValues) {
uint64_t seenValues = 0;
uint64_t seenSegments = 0;
for (auto& segment : data) {
seenSegments++;
if (seenValues + segment->getNumValues() < numValues) {
seenValues += segment->getNumValues();
} else {
segment->setNumValues(numValues - seenValues);
break;
}
}
data.resize(seenSegments);
}
common::row_idx_t getNumUpdatedRows(const transaction::Transaction* transaction) const;
const common::LogicalType& getDataType() const { return data.front()->getDataType(); }
bool isCompressionEnabled() const { return enableCompression; }
ResidencyState getResidencyState() const {
auto state = data.front()->getResidencyState();
RUNTIME_CHECK(for (auto& chunk : data) { DASSERT(chunk->getResidencyState() == state); });
return state;
}
bool hasUpdates() const { return updateInfo.isSet(); }
bool hasUpdates(const transaction::Transaction* transaction, common::row_idx_t startRow,
common::length_t numRows) const;
void resetUpdateInfo() { updateInfo.reset(); }
MergedColumnChunkStats getMergedColumnChunkStats() const;
void reclaimStorage(PageAllocator& pageAllocator) const;
void append(common::ValueVector* vector, const common::SelectionView& selView);
void append(const ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);
void append(const ColumnChunkData* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);
template<typename T, std::invocable<T&, uint64_t> Func>
void mapValues(Func func, uint64_t startOffset = 0, uint64_t endOffset = UINT64_MAX) {
rangeSegments(startOffset, endOffset == UINT64_MAX ? UINT64_MAX : endOffset - startOffset,
[&](auto& segment, auto offsetInSegment, auto lengthInSegment, auto dstOffset) {
DASSERT(segment->getResidencyState() == ResidencyState::IN_MEMORY);
auto* segmentData = segment->template getData<T>();
for (size_t i = offsetInSegment; i < lengthInSegment; i++) {
func(segmentData[i], dstOffset + i - offsetInSegment);
}
});
}
template<typename T>
T getValue(common::offset_t pos) const {
DASSERT(pos < getCapacity());
auto [segment, offsetInSegment] = genericFindSegment(std::span(data), pos);
DASSERT(segment->get() != nullptr);
DASSERT((*segment)->getResidencyState() == ResidencyState::IN_MEMORY);
return (*segment)->template getValue<T>(offsetInSegment);
}
template<typename T>
void setValue(T val, common::offset_t pos) const {
DASSERT(pos < getCapacity());
auto [segment, offsetInSegment] = genericFindSegment(std::span(data), pos);
DASSERT(segment->get() != nullptr);
DASSERT((*segment)->getResidencyState() == ResidencyState::IN_MEMORY);
(*segment)->template setValue<T>(val, pos);
}
void flush(PageAllocator& pageAllocator) {
for (auto& segment : data) {
DASSERT(segment->getResidencyState() == ResidencyState::IN_MEMORY);
segment->flush(pageAllocator);
}
}
void populateWithDefaultVal(evaluator::ExpressionEvaluator& defaultEvaluator,
uint64_t& numValues_, ColumnStats* newColumnStats) {
DASSERT(data.size() == 1 && data.back()->getNumValues() == 0);
data.back()->populateWithDefaultVal(defaultEvaluator, numValues_, newColumnStats);
}
void finalize() {
for (auto& segment : data) {
DASSERT(segment->getResidencyState() == ResidencyState::IN_MEMORY);
segment->finalize();
}
}
std::vector<const ColumnChunkData*> getSegments() const {
std::vector<const ColumnChunkData*> segments;
for (const auto& segment : data) {
segments.push_back(segment.get());
}
return segments;
}
void checkpoint(Column& column, std::vector<ChunkCheckpointState>&& chunkCheckpointStates,
PageAllocator& pageAllocator);
void write(Column& column, ChunkState& state, common::offset_t dstOffset,
const ColumnChunkData& dataToWrite, common::offset_t srcOffset, common::length_t numValues);
void syncNumValues() {
for (auto& segment : data) {
segment->syncNumValues();
}
}
void setTableID(common::table_id_t tableID) {
for (const auto& segment : data) {
auto internalIDSegment =
common::dynamic_cast_checked<InternalIDChunkData*>(segment.get());
internalIDSegment->setTableID(tableID);
}
}
private:
template<typename Func>
void rangeSegments(common::offset_t offsetInChunk, common::length_t length, Func func) const {
genericRangeSegments(std::span(data), offsetInChunk, length, func);
}
void scanInMemSegments(ColumnChunkScanner& output, common::offset_t startRow,
common::offset_t numRows) const;
private:
bool enableCompression;
std::vector<std::unique_ptr<ColumnChunkData>> data;
UpdateInfo updateInfo;
};
} }