#pragma once
#include <optional>
#include <utility>
#include "common/assert.h"
#include "common/cast.h"
#include "common/copy_constructors.h"
#include "common/data_chunk/data_chunk_state.h"
#include "common/null_mask.h"
#include "common/types/string_t.h"
#include "common/vector/auxiliary_buffer.h"
namespace lbug {
namespace common {
class Value;
class LBUG_API ValueVector {
friend class ListVector;
friend class ListAuxiliaryBuffer;
friend class StructVector;
friend class StringVector;
friend class ArrowColumnVector;
public:
explicit ValueVector(LogicalType dataType, storage::MemoryManager* memoryManager = nullptr,
std::shared_ptr<DataChunkState> dataChunkState = nullptr);
explicit ValueVector(LogicalTypeID dataTypeID, storage::MemoryManager* memoryManager = nullptr)
: ValueVector(LogicalType(dataTypeID), memoryManager) {
DASSERT(dataTypeID != LogicalTypeID::LIST);
}
DELETE_COPY_AND_MOVE(ValueVector);
~ValueVector() = default;
template<typename T>
std::optional<T> firstNonNull() const {
sel_t selectedSize = state->getSelSize();
if (selectedSize == 0) {
return std::nullopt;
}
if (hasNoNullsGuarantee()) {
return getValue<T>(state->getSelVector()[0]);
} else {
for (size_t i = 0; i < selectedSize; i++) {
auto pos = state->getSelVector()[i];
if (!isNull(pos)) {
return std::make_optional(getValue<T>(pos));
}
}
}
return std::nullopt;
}
template<class Func>
void forEachNonNull(Func&& func) const {
if (hasNoNullsGuarantee()) {
state->getSelVector().forEach(func);
} else {
state->getSelVector().forEach([&](auto i) {
if (!isNull(i)) {
func(i);
}
});
}
}
uint32_t countNonNull() const;
void setState(const std::shared_ptr<DataChunkState>& state_);
void setAllNull() { nullMask.setAllNull(); }
void setAllNonNull() { nullMask.setAllNonNull(); }
bool hasNoNullsGuarantee() const { return nullMask.hasNoNullsGuarantee(); }
void setNullRange(uint32_t startPos, uint32_t len, bool value) {
nullMask.setNullFromRange(startPos, len, value);
}
const NullMask& getNullMask() const { return nullMask; }
void setNull(uint32_t pos, bool isNull);
uint8_t isNull(uint32_t pos) const { return nullMask.isNull(pos); }
void setAsSingleNullEntry() {
state->getSelVectorUnsafe().setSelSize(1);
setNull(state->getSelVector()[0], true);
}
bool setNullFromBits(const uint64_t* srcNullEntries, uint64_t srcOffset, uint64_t dstOffset,
uint64_t numBitsToCopy, bool invert = false);
uint32_t getNumBytesPerValue() const { return numBytesPerValue; }
template<typename T>
const T& getValue(uint32_t pos) const {
return ((T*)valueBuffer.get())[pos];
}
template<typename T>
T& getValue(uint32_t pos) {
return ((T*)valueBuffer.get())[pos];
}
template<typename T>
void setValue(uint32_t pos, T val);
void copyFromRowData(uint32_t pos, const uint8_t* rowData);
void copyToRowData(uint32_t pos, uint8_t* rowData,
InMemOverflowBuffer* rowOverflowBuffer) const;
void copyFromVectorData(uint8_t* dstData, const ValueVector* srcVector,
const uint8_t* srcVectorData);
void copyFromVectorData(uint64_t dstPos, const ValueVector* srcVector, uint64_t srcPos);
void copyFromValue(uint64_t pos, const Value& value);
std::unique_ptr<Value> getAsValue(uint64_t pos) const;
uint8_t* getData() const { return valueBuffer.get(); }
offset_t readNodeOffset(uint32_t pos) const {
DASSERT(dataType.getLogicalTypeID() == LogicalTypeID::INTERNAL_ID);
return getValue<nodeID_t>(pos).offset;
}
void resetAuxiliaryBuffer();
static bool discardNull(ValueVector& vector);
void serialize(Serializer& ser) const;
static std::unique_ptr<ValueVector> deSerialize(Deserializer& deSer, storage::MemoryManager* mm,
std::shared_ptr<DataChunkState> dataChunkState);
SelectionVector* getSelVectorPtr() const {
return state ? &state->getSelVectorUnsafe() : nullptr;
}
private:
uint32_t getDataTypeSize(const LogicalType& type);
void initializeValueBuffer();
public:
LogicalType dataType;
std::shared_ptr<DataChunkState> state;
private:
std::unique_ptr<uint8_t[]> valueBuffer;
NullMask nullMask;
uint32_t numBytesPerValue;
std::unique_ptr<AuxiliaryBuffer> auxiliaryBuffer;
};
/**
 * Static helpers for ValueVectors whose physical type is STRING (or JSON, per the assertion in
 * getInMemOverflowBuffer). All members are static; the class carries no state.
 */
class LBUG_API StringVector {
public:
    /// Returns the overflow buffer held by the vector's StringAuxiliaryBuffer.
    /// The physical type is checked by a debug assertion only.
    static InMemOverflowBuffer* getInMemOverflowBuffer(ValueVector* vector) {
        DASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING ||
                vector->dataType.getPhysicalType() == PhysicalTypeID::JSON);
        auto stringBuffer =
            dynamic_cast_checked<StringAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
        return stringBuffer->getOverflowBuffer();
    }

    // Overloads addressing the destination by position in the vector (defined out of line).
    static void addString(ValueVector* vector, uint32_t vectorPos, string_t& srcStr);
    static void addString(ValueVector* vector, uint32_t vectorPos, const char* srcStr,
        uint64_t length);
    static void addString(ValueVector* vector, uint32_t vectorPos, std::string_view srcStr);
    static string_t& reserveString(ValueVector* vector, uint32_t vectorPos, uint64_t length);

    // Overloads addressing the destination by string_t reference (defined out of line).
    static void reserveString(ValueVector* vector, string_t& dstStr, uint64_t length);
    static void addString(ValueVector* vector, string_t& dstStr, string_t& srcStr);
    static void addString(ValueVector* vector, string_t& dstStr, const char* srcStr,
        uint64_t length);
    static void addString(ValueVector* vector, string_t& dstStr, const std::string& srcStr);

    static void copyToRowData(const ValueVector* vector, uint32_t pos, uint8_t* rowData,
        InMemOverflowBuffer* rowOverflowBuffer);
};
/**
 * Static helpers for writing blobs into a ValueVector. Both overloads delegate to
 * StringVector::addString.
 */
struct LBUG_API BlobVector {
    /// Stores `length` bytes starting at `data` at position `pos`.
    static void addBlob(ValueVector* vector, uint32_t pos, const char* data, uint32_t length) {
        StringVector::addString(vector, pos, data, length);
    }

    /// Same as above for a byte pointer; the bytes are reinterpreted as chars, not converted.
    static void addBlob(ValueVector* vector, uint32_t pos, const uint8_t* data, uint64_t length) {
        const char* bytesAsChars = reinterpret_cast<const char*>(data);
        StringVector::addString(vector, pos, bytesAsChars, length);
    }
};
/**
 * Static helpers for ValueVectors whose physical type is LIST or ARRAY (see validateType).
 * They reach the vector's ListAuxiliaryBuffer and the child data vector stored in it.
 */
class LBUG_API ListVector {
public:
    /// Read-only view of the vector's auxiliary buffer as a ListAuxiliaryBuffer.
    static const ListAuxiliaryBuffer& getAuxBuffer(const ValueVector& vector) {
        auto& auxBuffer = *vector.auxiliaryBuffer;
        return auxBuffer.constCast<ListAuxiliaryBuffer>();
    }

    /// Mutable view of the auxiliary buffer, obtainable even through a const vector.
    static ListAuxiliaryBuffer& getAuxBufferUnsafe(const ValueVector& vector) {
        auto& auxBuffer = *vector.auxiliaryBuffer;
        return auxBuffer.cast<ListAuxiliaryBuffer>();
    }

    /// Hands `dataVector` over to the list's auxiliary buffer.
    static void setDataVector(const ValueVector* vector, std::shared_ptr<ValueVector> dataVector) {
        DASSERT(validateType(*vector));
        getAuxBufferUnsafe(*vector).setDataVector(std::move(dataVector));
    }

    static void copyListEntryAndBufferMetaData(ValueVector& vector,
        const SelectionVector& selVector, const ValueVector& other,
        const SelectionVector& otherSelVector);

    /// Non-owning pointer to the child data vector.
    static ValueVector* getDataVector(const ValueVector* vector) {
        DASSERT(validateType(*vector));
        const auto& listBuffer = getAuxBuffer(*vector);
        return listBuffer.getDataVector();
    }

    /// Shared-ownership handle to the child data vector.
    static std::shared_ptr<ValueVector> getSharedDataVector(const ValueVector* vector) {
        DASSERT(validateType(*vector));
        const auto& listBuffer = getAuxBuffer(*vector);
        return listBuffer.getSharedDataVector();
    }

    /// Size reported by the list's auxiliary buffer.
    static uint64_t getDataVectorSize(const ValueVector* vector) {
        DASSERT(validateType(*vector));
        const auto& listBuffer = getAuxBuffer(*vector);
        return listBuffer.getSize();
    }

    /// Pointer into the child data buffer at element index `listEntry.offset`.
    static uint8_t* getListValues(const ValueVector* vector, const list_entry_t& listEntry) {
        DASSERT(validateType(*vector));
        auto childVector = getDataVector(vector);
        const auto byteOffset = childVector->getNumBytesPerValue() * listEntry.offset;
        return childVector->getData() + byteOffset;
    }

    /// Like getListValues, advanced by a further `elementOffsetInList` elements.
    static uint8_t* getListValuesWithOffset(const ValueVector* vector,
        const list_entry_t& listEntry, offset_t elementOffsetInList) {
        DASSERT(validateType(*vector));
        uint8_t* listStart = getListValues(vector, listEntry);
        const auto extraBytes = elementOffsetInList * getDataVector(vector)->getNumBytesPerValue();
        return listStart + extraBytes;
    }

    /// Forwards to ListAuxiliaryBuffer::addList and returns the entry it produces.
    static list_entry_t addList(ValueVector* vector, uint64_t listSize) {
        DASSERT(validateType(*vector));
        auto& listBuffer = getAuxBufferUnsafe(*vector);
        return listBuffer.addList(listSize);
    }

    /// Forwards to ListAuxiliaryBuffer::resize.
    static void resizeDataVector(ValueVector* vector, uint64_t numValues) {
        DASSERT(validateType(*vector));
        auto& listBuffer = getAuxBufferUnsafe(*vector);
        listBuffer.resize(numValues);
    }

    // Declared here; defined out of line.
    static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
    static void copyToRowData(const ValueVector* vector, uint32_t pos, uint8_t* rowData,
        InMemOverflowBuffer* rowOverflowBuffer);
    static void copyFromVectorData(ValueVector* dstVector, uint8_t* dstData,
        const ValueVector* srcVector, const uint8_t* srcData);
    static void appendDataVector(ValueVector* dstVector, ValueVector* srcDataVector,
        uint64_t numValuesToAppend);
    static void sliceDataVector(ValueVector* vectorToSlice, uint64_t offset, uint64_t numValues);

private:
    /// True exactly when the vector's physical type is LIST or ARRAY.
    static bool validateType(const ValueVector& vector) {
        const auto physicalType = vector.dataType.getPhysicalType();
        return physicalType == PhysicalTypeID::LIST || physicalType == PhysicalTypeID::ARRAY;
    }
};
/**
 * Static helpers for ValueVectors backed by a StructAuxiliaryBuffer. Every accessor obtains the
 * buffer through dynamic_cast_checked on the vector's auxiliary buffer.
 */
class StructVector {
public:
    /// All child field vectors, by reference into the auxiliary buffer.
    static const std::vector<std::shared_ptr<ValueVector>>& getFieldVectors(
        const ValueVector* vector) {
        auto structBuffer =
            dynamic_cast_checked<StructAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
        return structBuffer->getFieldVectors();
    }

    /// Shared handle to the child vector at field index `idx`.
    static std::shared_ptr<ValueVector> getFieldVector(const ValueVector* vector,
        struct_field_idx_t idx) {
        auto structBuffer =
            dynamic_cast_checked<StructAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
        return structBuffer->getFieldVectorShared(idx);
    }

    /// Non-owning pointer to the child vector of the field named `fieldName`; the name is
    /// resolved to an index via StructType::getFieldIdx.
    static ValueVector* getFieldVectorRaw(const ValueVector& vector, const std::string& fieldName) {
        auto fieldIdx = StructType::getFieldIdx(vector.dataType, fieldName);
        auto structBuffer =
            dynamic_cast_checked<StructAuxiliaryBuffer*>(vector.auxiliaryBuffer.get());
        return structBuffer->getFieldVectorPtr(fieldIdx);
    }

    /// Forwards to StructAuxiliaryBuffer::referenceChildVector for field `idx`.
    static void referenceVector(ValueVector* vector, struct_field_idx_t idx,
        std::shared_ptr<ValueVector> vectorToReference) {
        auto structBuffer =
            dynamic_cast_checked<StructAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
        structBuffer->referenceChildVector(idx, std::move(vectorToReference));
    }

    // Declared here; defined out of line.
    static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
    static void copyToRowData(const ValueVector* vector, uint32_t pos, uint8_t* rowData,
        InMemOverflowBuffer* rowOverflowBuffer);
    static void copyFromVectorData(ValueVector* dstVector, const uint8_t* dstData,
        const ValueVector* srcVector, const uint8_t* srcData);
};
/**
 * Static helpers for ValueVectors of logical type UNION. Child lookups go through StructVector,
 * using UnionType::TAG_FIELD_IDX for the tag and UnionType::getInternalFieldIdx for the values.
 */
class UnionVector {
public:
    /// Non-owning pointer to the child vector at UnionType::TAG_FIELD_IDX.
    static ValueVector* getTagVector(const ValueVector* vector) {
        DASSERT(vector->dataType.getLogicalTypeID() == LogicalTypeID::UNION);
        const auto tagVector = StructVector::getFieldVector(vector, UnionType::TAG_FIELD_IDX);
        return tagVector.get();
    }

    /// Non-owning pointer to the child vector of union field `fieldIdx`.
    static ValueVector* getValVector(const ValueVector* vector, union_field_idx_t fieldIdx) {
        DASSERT(vector->dataType.getLogicalTypeID() == LogicalTypeID::UNION);
        const auto valVector =
            StructVector::getFieldVector(vector, UnionType::getInternalFieldIdx(fieldIdx));
        return valVector.get();
    }

    /// Shared handle to the child vector of union field `fieldIdx`.
    static std::shared_ptr<ValueVector> getSharedValVector(const ValueVector* vector,
        union_field_idx_t fieldIdx) {
        DASSERT(vector->dataType.getLogicalTypeID() == LogicalTypeID::UNION);
        const auto internalIdx = UnionType::getInternalFieldIdx(fieldIdx);
        return StructVector::getFieldVector(vector, internalIdx);
    }

    /// Forwards to StructVector::referenceVector for the translated field index.
    static void referenceVector(ValueVector* vector, union_field_idx_t fieldIdx,
        std::shared_ptr<ValueVector> vectorToReference) {
        const auto internalIdx = UnionType::getInternalFieldIdx(fieldIdx);
        StructVector::referenceVector(vector, internalIdx, std::move(vectorToReference));
    }

    /// Writes `tag` (as struct_field_idx_t) into `vector` at every position selected by `sel`.
    static void setTagField(ValueVector& vector, SelectionVector& sel, union_field_idx_t tag) {
        DASSERT(vector.dataType.getLogicalTypeID() == LogicalTypeID::UNION);
        for (auto selIdx = 0u; selIdx < sel.getSelSize(); ++selIdx) {
            const auto pos = sel[selIdx];
            vector.setValue<struct_field_idx_t>(pos, tag);
        }
    }
};
class MapVector {
public:
static inline ValueVector* getKeyVector(const ValueVector* vector) {
return StructVector::getFieldVector(ListVector::getDataVector(vector), 0 )
.get();
}
static inline ValueVector* getValueVector(const ValueVector* vector) {
return StructVector::getFieldVector(ListVector::getDataVector(vector), 1 )
.get();
}
static inline uint8_t* getMapKeys(const ValueVector* vector, const list_entry_t& listEntry) {
auto keyVector = getKeyVector(vector);
return keyVector->getData() + keyVector->getNumBytesPerValue() * listEntry.offset;
}
static inline uint8_t* getMapValues(const ValueVector* vector, const list_entry_t& listEntry) {
auto valueVector = getValueVector(vector);
return valueVector->getData() + valueVector->getNumBytesPerValue() * listEntry.offset;
}
};
} }