#include "storage/table/string_column.h"
#include <algorithm>
#include <unordered_map>
#include "common/assert.h"
#include "common/cast.h"
#include "common/null_mask.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/compression/compression.h"
#include "storage/page_allocator.h"
#include "storage/storage_utils.h"
#include "storage/table/column.h"
#include "storage/table/column_chunk.h"
#include "storage/table/null_column.h"
#include "storage/table/string_chunk_data.h"
using namespace lbug::catalog;
using namespace lbug::common;
namespace lbug {
namespace storage {
using string_index_t = DictionaryChunk::string_index_t;
using string_offset_t = DictionaryChunk::string_offset_t;
StringColumn::StringColumn(std::string name, common::LogicalType dataType, FileHandle* dataFH,
MemoryManager* mm, ShadowFile* shadowFile, bool enableCompression)
: Column{std::move(name), std::move(dataType), dataFH, mm, shadowFile, enableCompression,
true },
dictionary{this->name, dataFH, mm, shadowFile, enableCompression} {
auto indexColumnName =
StorageUtils::getColumnName(this->name, StorageUtils::ColumnType::INDEX, "index");
indexColumn = std::make_unique<Column>(indexColumnName, LogicalType::UINT32(), dataFH, mm,
shadowFile, enableCompression, false );
}
SegmentState& StringColumn::getChildState(SegmentState& state, ChildStateIndex child) {
const auto childIdx = static_cast<idx_t>(child);
return state.getChildState(childIdx);
}
const SegmentState& StringColumn::getChildState(const SegmentState& state, ChildStateIndex child) {
const auto childIdx = static_cast<idx_t>(child);
return state.getChildState(childIdx);
}
std::unique_ptr<ColumnChunkData> StringColumn::flushChunkData(const ColumnChunkData& chunkData,
PageAllocator& pageAllocator) {
auto flushedChunkData = flushNonNestedChunkData(chunkData, pageAllocator);
auto& flushedStringData = flushedChunkData->cast<StringChunkData>();
auto& stringChunk = chunkData.cast<StringChunkData>();
flushedStringData.setIndexChunk(
Column::flushChunkData(*stringChunk.getIndexColumnChunk(), pageAllocator));
auto& dictChunk = stringChunk.getDictionaryChunk();
flushedStringData.getDictionaryChunk().setOffsetChunk(
Column::flushChunkData(*dictChunk.getOffsetChunk(), pageAllocator));
flushedStringData.getDictionaryChunk().setStringDataChunk(
Column::flushChunkData(*dictChunk.getStringDataChunk(), pageAllocator));
return flushedChunkData;
}
void StringColumn::lookupInternal(const SegmentState& state, offset_t nodeOffset,
ValueVector* resultVector, uint32_t posInVector) const {
auto [nodeGroupIdx, offsetInChunk] = StorageUtils::getNodeGroupIdxAndOffsetInChunk(nodeOffset);
string_index_t index = 0;
indexColumn->scanSegment(getChildState(state, ChildStateIndex::INDEX), offsetInChunk, 1,
reinterpret_cast<uint8_t*>(&index));
std::vector<std::pair<string_index_t, uint64_t>> offsetsToScan;
offsetsToScan.emplace_back(index, posInVector);
dictionary.scan(getChildState(state, ChildStateIndex::OFFSET),
getChildState(state, ChildStateIndex::DATA), offsetsToScan, resultVector,
getChildState(state, ChildStateIndex::INDEX).metadata);
}
void StringColumn::writeSegment(ColumnChunkData& persistentChunk, SegmentState& state,
offset_t dstOffsetInSegment, const ColumnChunkData& data, offset_t srcOffset,
length_t numValues) const {
auto& stringPersistentChunk = persistentChunk.cast<StringChunkData>();
numValues = std::min(numValues, data.getNumValues() - srcOffset);
auto& strChunkToWriteFrom = data.cast<StringChunkData>();
std::vector<string_index_t> indices;
indices.resize(numValues);
for (auto i = 0u; i < numValues; i++) {
if (strChunkToWriteFrom.getNullData()->isNull(i + srcOffset)) {
indices[i] = 0;
continue;
}
const auto strVal = strChunkToWriteFrom.getValue<std::string_view>(i + srcOffset);
indices[i] = dictionary.append(persistentChunk.cast<StringChunkData>().getDictionaryChunk(),
state, strVal);
}
NullMask nullMask(numValues);
nullMask.copyFromNullBits(data.getNullData()->getNullMask().getData(), srcOffset,
0 , numValues);
indexColumn->writeValuesInternal(getChildState(state, ChildStateIndex::INDEX),
dstOffsetInSegment, reinterpret_cast<const uint8_t*>(&indices[0]), &nullMask,
0 , numValues);
auto [min, max] = std::minmax_element(indices.begin(), indices.end());
auto minWritten = StorageValue(*min);
auto maxWritten = StorageValue(*max);
updateStatistics(persistentChunk.getMetadata(), dstOffsetInSegment + numValues - 1, minWritten,
maxWritten);
indexColumn->updateStatistics(stringPersistentChunk.getIndexColumnChunk()->getMetadata(),
dstOffsetInSegment + numValues - 1, minWritten, maxWritten);
}
std::vector<std::unique_ptr<ColumnChunkData>> StringColumn::checkpointSegment(
ColumnCheckpointState&& checkpointState, PageAllocator& pageAllocator,
bool canSplitSegment) const {
auto& persistentData = checkpointState.persistentData;
auto result =
Column::checkpointSegment(std::move(checkpointState), pageAllocator, canSplitSegment);
persistentData.syncNumValues();
return result;
}
void StringColumn::scanSegment(const SegmentState& state, offset_t startOffsetInChunk,
row_idx_t numValuesToScan, ValueVector* resultVector, offset_t offsetInResult) const {
if (nullColumn) {
DASSERT(state.nullState);
nullColumn->scanSegment(*state.nullState, startOffsetInChunk, numValuesToScan, resultVector,
offsetInResult);
}
DASSERT(resultVector->dataType.getPhysicalType() == PhysicalTypeID::STRING ||
resultVector->dataType.getPhysicalType() == PhysicalTypeID::JSON);
if (!resultVector->state || resultVector->state->getSelVector().isUnfiltered()) {
scanUnfiltered(state, startOffsetInChunk, numValuesToScan, resultVector, offsetInResult);
} else {
scanFiltered(state, startOffsetInChunk, resultVector, offsetInResult);
}
}
void StringColumn::scanSegment(const SegmentState& state, ColumnChunkData* resultChunk,
common::offset_t startOffsetInSegment, common::row_idx_t numValuesToScan) const {
auto startOffsetInResult = resultChunk->getNumValues();
Column::scanSegment(state, resultChunk, startOffsetInSegment, numValuesToScan);
DASSERT(resultChunk->getDataType().getPhysicalType() == PhysicalTypeID::STRING ||
resultChunk->getDataType().getPhysicalType() == PhysicalTypeID::JSON);
auto* stringResultChunk = dynamic_cast_checked<StringChunkData*>(resultChunk);
stringResultChunk->getIndexColumnChunk()->setNumValues(startOffsetInResult);
auto* indexChunk = stringResultChunk->getIndexColumnChunk();
indexColumn->scanSegment(getChildState(state, ChildStateIndex::INDEX), indexChunk,
startOffsetInSegment, numValuesToScan);
const auto initialDictSize =
stringResultChunk->getDictionaryChunk().getOffsetChunk()->getNumValues();
if (numValuesToScan == state.metadata.numValues) {
for (row_idx_t i = 0; i < numValuesToScan; i++) {
indexChunk->setValue<string_index_t>(
indexChunk->getValue<string_index_t>(startOffsetInResult + i) + initialDictSize,
startOffsetInResult + i);
}
dictionary.scan(state, stringResultChunk->getDictionaryChunk());
} else {
std::unordered_map<string_index_t, uint64_t> indexMap;
std::vector<std::pair<string_index_t, uint64_t>> offsetsToScan;
for (auto i = 0u; i < numValuesToScan; i++) {
if (!resultChunk->isNull(startOffsetInResult + i)) {
auto index = indexChunk->getValue<string_index_t>(startOffsetInResult + i);
auto element = indexMap.find(index);
if (element == indexMap.end()) {
indexMap.insert(std::make_pair(index, initialDictSize + offsetsToScan.size()));
indexChunk->setValue<string_index_t>(initialDictSize + offsetsToScan.size(),
startOffsetInResult + i);
offsetsToScan.emplace_back(index, initialDictSize + offsetsToScan.size());
} else {
indexChunk->setValue<string_index_t>(element->second, startOffsetInResult + i);
}
}
}
if (offsetsToScan.size() == 0) {
return;
}
dictionary.scan(getChildState(state, ChildStateIndex::OFFSET),
getChildState(state, ChildStateIndex::DATA), offsetsToScan, stringResultChunk,
getChildState(state, ChildStateIndex::INDEX).metadata);
}
DASSERT(resultChunk->getNumValues() == startOffsetInResult + numValuesToScan &&
stringResultChunk->getIndexColumnChunk()->getNumValues() ==
startOffsetInResult + numValuesToScan);
RUNTIME_CHECK({
auto dictionarySize =
stringResultChunk->getDictionaryChunk().getOffsetChunk()->getNumValues();
auto indexSize = stringResultChunk->getIndexColumnChunk()->getNumValues();
for (offset_t i = 0; i < indexSize; i++) {
if (!stringResultChunk->isNull(i)) {
auto stringIndex =
stringResultChunk->getIndexColumnChunk()->getValue<string_index_t>(i);
DASSERT(stringIndex < dictionarySize);
}
}
});
}
void StringColumn::scanUnfiltered(const SegmentState& state, offset_t startOffsetInChunk,
offset_t numValuesToRead, ValueVector* resultVector, sel_t startPosInVector) const {
auto indices = std::make_unique<string_index_t[]>(numValuesToRead);
indexColumn->scanSegment(getChildState(state, ChildStateIndex::INDEX), startOffsetInChunk,
numValuesToRead, reinterpret_cast<uint8_t*>(indices.get()));
std::vector<std::pair<string_index_t, uint64_t>> offsetsToScan;
for (auto i = 0u; i < numValuesToRead; i++) {
if (!resultVector->isNull(startPosInVector + i)) {
offsetsToScan.emplace_back(indices[i], startPosInVector + i);
}
}
if (offsetsToScan.size() == 0) {
return;
}
dictionary.scan(getChildState(state, ChildStateIndex::OFFSET),
getChildState(state, ChildStateIndex::DATA), offsetsToScan, resultVector,
getChildState(state, ChildStateIndex::INDEX).metadata);
}
void StringColumn::scanFiltered(const SegmentState& state, offset_t startOffsetInChunk,
ValueVector* resultVector, offset_t offsetInResult) const {
std::vector<std::pair<string_index_t, uint64_t>> offsetsToScan;
for (sel_t i = 0; i < resultVector->state->getSelVector().getSelSize(); i++) {
const auto pos = resultVector->state->getSelVector()[i];
if (pos >= offsetInResult && startOffsetInChunk + pos < state.metadata.numValues &&
!resultVector->isNull(pos)) {
const auto offsetInGroup = startOffsetInChunk + pos - offsetInResult;
string_index_t index = 0;
indexColumn->scanSegment(getChildState(state, ChildStateIndex::INDEX), offsetInGroup, 1,
reinterpret_cast<uint8_t*>(&index));
offsetsToScan.emplace_back(index, pos);
}
}
if (offsetsToScan.size() == 0) {
return;
}
dictionary.scan(getChildState(state, ChildStateIndex::OFFSET),
getChildState(state, ChildStateIndex::DATA), offsetsToScan, resultVector,
getChildState(state, ChildStateIndex::INDEX).metadata);
}
bool StringColumn::canCheckpointInPlace(const SegmentState& state,
const ColumnCheckpointState& checkpointState) const {
row_idx_t strLenToAdd = 0u;
idx_t numStrings = 0u;
for (auto& segmentCheckpointState : checkpointState.segmentCheckpointStates) {
auto& strChunk = segmentCheckpointState.chunkData.cast<StringChunkData>();
numStrings += segmentCheckpointState.numRows;
for (auto i = 0u; i < segmentCheckpointState.numRows; i++) {
if (strChunk.getNullData()->isNull(segmentCheckpointState.startRowInData + i)) {
continue;
}
strLenToAdd += strChunk.getStringLength(segmentCheckpointState.startRowInData + i);
}
}
if (!dictionary.canCommitInPlace(state, numStrings, strLenToAdd)) {
return false;
}
return canIndexCommitInPlace(state, numStrings, checkpointState.endRowIdxToWrite);
}
bool StringColumn::canIndexCommitInPlace(const SegmentState& state, uint64_t numStrings,
offset_t maxOffset) const {
const SegmentState& indexState = getChildState(state, ChildStateIndex::INDEX);
if (indexColumn->isEndOffsetOutOfPagesCapacity(indexState.metadata, maxOffset)) {
return false;
}
if (indexState.metadata.compMeta.canAlwaysUpdateInPlace()) {
return true;
}
const auto totalStringsAfterUpdate =
getChildState(state, ChildStateIndex::OFFSET).metadata.numValues + numStrings;
InPlaceUpdateLocalState localUpdateState{};
if (!indexState.metadata.compMeta.canUpdateInPlace(
reinterpret_cast<const uint8_t*>(&totalStringsAfterUpdate), 0 , 1 ,
PhysicalTypeID::UINT32, localUpdateState)) {
return false;
}
return true;
}
} }