#include "storage/table/dictionary_column.h"
#include <algorithm>
#include <cstdint>
#include "common/types/string_t.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/storage_utils.h"
#include "storage/table/column_chunk_data.h"
#include "storage/table/dictionary_chunk.h"
#include "storage/table/string_chunk_data.h"
#include "storage/table/string_column.h"
#include <bit>
#include <concepts>
using namespace lbug::common;
using namespace lbug::transaction;
namespace lbug {
namespace storage {
using string_index_t = DictionaryChunk::string_index_t;
using string_offset_t = DictionaryChunk::string_offset_t;
DictionaryColumn::DictionaryColumn(const std::string& name, FileHandle* dataFH, MemoryManager* mm,
ShadowFile* shadowFile, bool enableCompression) {
auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, "");
dataColumn = std::make_unique<Column>(dataColName, LogicalType::UINT8(), dataFH, mm, shadowFile,
false , false );
auto offsetColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, "");
offsetColumn = std::make_unique<Column>(offsetColName, LogicalType::UINT64(), dataFH, mm,
shadowFile, enableCompression, false );
}
void DictionaryColumn::scan(const SegmentState& state, DictionaryChunk& dictChunk) const {
auto offsetChunk = dictChunk.getOffsetChunk();
auto stringDataChunk = dictChunk.getStringDataChunk();
auto initialDictSize = offsetChunk->getNumValues();
auto initialDictDataSize = stringDataChunk->getNumValues();
auto& dataMetadata =
StringColumn::getChildState(state, StringColumn::ChildStateIndex::DATA).metadata;
if (stringDataChunk->getNumValues() + dataMetadata.numValues > stringDataChunk->getCapacity()) {
stringDataChunk->resize(
std::bit_ceil(stringDataChunk->getNumValues() + dataMetadata.numValues));
}
dataColumn->scanSegment(StringColumn::getChildState(state, StringColumn::ChildStateIndex::DATA),
stringDataChunk, 0,
StringColumn::getChildState(state, StringColumn::ChildStateIndex::DATA).metadata.numValues);
auto& offsetMetadata =
StringColumn::getChildState(state, StringColumn::ChildStateIndex::OFFSET).metadata;
if (offsetChunk->getNumValues() + offsetMetadata.numValues > offsetChunk->getCapacity()) {
offsetChunk->resize(std::bit_ceil(offsetChunk->getNumValues() + offsetMetadata.numValues));
}
offsetColumn->scanSegment(
StringColumn::getChildState(state, StringColumn::ChildStateIndex::OFFSET), offsetChunk, 0,
StringColumn::getChildState(state, StringColumn::ChildStateIndex::OFFSET)
.metadata.numValues);
for (row_idx_t i = initialDictSize; i < offsetChunk->getNumValues(); i++) {
offsetChunk->setValue<string_offset_t>(
offsetChunk->getValue<string_offset_t>(i) + initialDictDataSize, i);
}
}
template<typename Result>
void DictionaryColumn::scan(const SegmentState& offsetState, const SegmentState& dataState,
std::vector<std::pair<string_index_t, uint64_t>>& offsetsToScan, Result* result,
const ColumnChunkMetadata& indexMeta) const {
string_index_t firstOffsetToScan = 0, lastOffsetToScan = 0;
auto comp = [](auto pair1, auto pair2) { return pair1.first < pair2.first; };
auto duplicationFactor = (double)offsetState.metadata.numValues / indexMeta.numValues;
if (duplicationFactor <= 0.5) {
std::sort(offsetsToScan.begin(), offsetsToScan.end(), comp);
firstOffsetToScan = offsetsToScan.front().first;
lastOffsetToScan = offsetsToScan.back().first;
} else {
const auto& [min, max] =
std::minmax_element(offsetsToScan.begin(), offsetsToScan.end(), comp);
firstOffsetToScan = min->first;
lastOffsetToScan = max->first;
}
auto numOffsetsToScan = lastOffsetToScan - firstOffsetToScan + 1;
std::vector<string_offset_t> offsets(numOffsetsToScan + 1);
scanOffsets(offsetState, offsets.data(), firstOffsetToScan, numOffsetsToScan,
dataState.metadata.numValues);
if constexpr (std::same_as<Result, ColumnChunkData>) {
auto& offsetChunk = *result->getDictionaryChunk()->getOffsetChunk();
if (offsetChunk.getNumValues() + offsetsToScan.size() > offsetChunk.getCapacity()) {
offsetChunk.resize(std::bit_ceil(offsetChunk.getNumValues() + offsetsToScan.size()));
}
}
for (auto pos = 0u; pos < offsetsToScan.size(); pos++) {
auto startOffset = offsets[offsetsToScan[pos].first - firstOffsetToScan];
auto endOffset = offsets[offsetsToScan[pos].first - firstOffsetToScan + 1];
auto lengthToScan = endOffset - startOffset;
DASSERT(endOffset >= startOffset);
scanValue(dataState, startOffset, lengthToScan, result, offsetsToScan[pos].second);
if constexpr (std::same_as<Result, ValueVector>) {
auto& scannedString = result->template getValue<string_t>(offsetsToScan[pos].second);
while (pos + 1 < offsetsToScan.size() &&
offsetsToScan[pos + 1].first == offsetsToScan[pos].first) {
pos++;
result->template setValue<string_t>(offsetsToScan[pos].second, scannedString);
}
} else {
DASSERT(pos == offsetsToScan.size() - 1 ||
offsetsToScan[pos].first != offsetsToScan[pos + 1].first);
}
}
}
template void DictionaryColumn::scan<common::ValueVector>(const SegmentState& offsetState,
const SegmentState& dataState,
std::vector<std::pair<DictionaryChunk::string_index_t, uint64_t>>& offsetsToScan,
common::ValueVector* result, const ColumnChunkMetadata& indexMeta) const;
template void DictionaryColumn::scan<StringChunkData>(const SegmentState& offsetState,
const SegmentState& dataState,
std::vector<std::pair<DictionaryChunk::string_index_t, uint64_t>>& offsetsToScan,
StringChunkData* result, const ColumnChunkMetadata& indexMeta) const;
string_index_t DictionaryColumn::append(const DictionaryChunk& dictChunk, SegmentState& state,
std::string_view val) const {
const auto startOffset = dataColumn->appendValues(*dictChunk.getStringDataChunk(),
StringColumn::getChildState(state, StringColumn::ChildStateIndex::DATA),
reinterpret_cast<const uint8_t*>(val.data()), nullptr , val.size());
return offsetColumn->appendValues(*dictChunk.getOffsetChunk(),
StringColumn::getChildState(state, StringColumn::ChildStateIndex::OFFSET),
reinterpret_cast<const uint8_t*>(&startOffset), nullptr , 1 );
}
void DictionaryColumn::scanOffsets(const SegmentState& state,
DictionaryChunk::string_offset_t* offsets, uint64_t index, uint64_t numValues,
uint64_t dataSize) const {
if (index + numValues < state.metadata.numValues) {
offsetColumn->scanSegment(state, index, numValues + 1, (uint8_t*)offsets);
} else {
offsetColumn->scanSegment(state, index, numValues, (uint8_t*)offsets);
offsets[numValues] = dataSize;
}
}
void DictionaryColumn::scanValue(const SegmentState& dataState, uint64_t startOffset,
uint64_t length, ValueVector* resultVector, uint64_t offsetInVector) const {
auto& str = StringVector::reserveString(resultVector, offsetInVector, length);
dataColumn->scanSegment(dataState, startOffset, length, (uint8_t*)str.getData());
if (!string_t::isShortString(str.len)) {
memcpy(str.prefix, str.getData(), string_t::PREFIX_LENGTH);
}
}
void DictionaryColumn::scanValue(const SegmentState& dataState, uint64_t startOffset,
uint64_t length, StringChunkData* result, uint64_t offsetInResult) const {
auto& stringDataChunk = *result->getDictionaryChunk().getStringDataChunk();
auto& offsetChunk = *result->getDictionaryChunk().getOffsetChunk();
auto& indexChunk = *result->getIndexColumnChunk();
if (stringDataChunk.getCapacity() < stringDataChunk.getNumValues() + length) {
stringDataChunk.resize(std::bit_ceil(stringDataChunk.getNumValues() + length));
}
if (offsetChunk.getNumValues() == offsetChunk.getCapacity()) {
offsetChunk.resize(std::bit_ceil(offsetChunk.getNumValues() + 1));
}
if (offsetInResult >= indexChunk.getCapacity()) {
indexChunk.resize(std::bit_ceil(offsetInResult + 1));
}
dataColumn->scanSegment(dataState, startOffset, length,
stringDataChunk.getData<uint8_t>() + stringDataChunk.getNumValues());
indexChunk.setValue<string_index_t>(offsetChunk.getNumValues(), offsetInResult);
offsetChunk.setValue<string_offset_t>(stringDataChunk.getNumValues(),
offsetChunk.getNumValues());
stringDataChunk.setNumValues(stringDataChunk.getNumValues() + length);
}
bool DictionaryColumn::canCommitInPlace(const SegmentState& state, uint64_t numNewStrings,
uint64_t totalStringLengthToAdd) const {
if (!canDataCommitInPlace(
StringColumn::getChildState(state, StringColumn::ChildStateIndex::DATA),
totalStringLengthToAdd)) {
return false;
}
if (!canOffsetCommitInPlace(
StringColumn::getChildState(state, StringColumn::ChildStateIndex::OFFSET),
StringColumn::getChildState(state, StringColumn::ChildStateIndex::DATA), numNewStrings,
totalStringLengthToAdd)) {
return false;
}
return true;
}
bool DictionaryColumn::canDataCommitInPlace(const SegmentState& dataState,
uint64_t totalStringLengthToAdd) {
auto totalStringDataAfterUpdate = dataState.metadata.numValues + totalStringLengthToAdd;
if (totalStringDataAfterUpdate > dataState.metadata.getNumPages() * LBUG_PAGE_SIZE) {
return false;
}
return true;
}
bool DictionaryColumn::canOffsetCommitInPlace(const SegmentState& offsetState,
const SegmentState& dataState, uint64_t numNewStrings, uint64_t totalStringLengthToAdd) const {
auto totalStringOffsetsAfterUpdate = dataState.metadata.numValues + totalStringLengthToAdd;
auto offsetCapacity =
offsetState.metadata.compMeta.numValues(LBUG_PAGE_SIZE, offsetColumn->getDataType()) *
offsetState.metadata.getNumPages();
auto numStringsAfterUpdate = offsetState.metadata.numValues + numNewStrings;
if (numStringsAfterUpdate > offsetCapacity) {
return false;
}
if (numStringsAfterUpdate > std::numeric_limits<string_index_t>::max()) [[unlikely]] {
return false;
}
if (offsetState.metadata.compMeta.canAlwaysUpdateInPlace()) {
return true;
}
InPlaceUpdateLocalState localUpdateState{};
if (!offsetState.metadata.compMeta.canUpdateInPlace(
(const uint8_t*)&totalStringOffsetsAfterUpdate, 0 , 1 ,
offsetColumn->getDataType().getPhysicalType(), localUpdateState)) {
return false;
}
return true;
}
} }