#include "common/arrow/arrow_converter.h"
#include "common/exception/runtime.h"
#include "common/types/int128_t.h"
#include "common/types/interval_t.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "function/cast/functions/numeric_limits.h"
namespace lbug {
namespace common {
// Calls `func(pos)` once for every row position that should be scanned into `outputVector`.
// If the vector has a state whose selection vector is filtered, the positions are supplied by
// that selection vector (and `count` is not consulted). Otherwise positions 0 .. count-1 are
// visited in ascending order.
template<typename Func>
static void rowIter(const ValueVector& outputVector, uint64_t count, Func&& func) {
    const bool useSelection =
        outputVector.state != nullptr && !outputVector.state->getSelVector().isUnfiltered();
    if (useSelection) {
        outputVector.state->getSelVector().forEach(func);
        return;
    }
    uint64_t row = 0;
    while (row < count) {
        func(row);
        ++row;
    }
}
// Copies fixed-width values of type T from Arrow buffer 1 into `outputVector`.
// The null mask is transferred first; afterwards every non-null row `r` reads source element
// `r + srcOffset` and writes destination slot `r + dstOffset`.
template<typename T>
static void scanArrowArrayFixedSizePrimitive(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const T* values = static_cast<const T*>(array->buffers[1]);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        outputVector.setValue<T>(row + dstOffset, values[row + srcOffset]);
    });
}
// Same as the plain fixed-size scan, except that the Arrow buffer holds elements of type SRC
// and each non-null element is converted to DST before being stored in `outputVector`.
template<typename SRC, typename DST>
static void scanArrowArrayFixedSizePrimitiveAndCastTo(const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    const SRC* values = static_cast<const SRC*>(array->buffers[1]);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const SRC source = values[row + srcOffset];
        outputVector.setValue<DST>(row + dstOffset, (DST)source);
    });
}
// Boolean specialisation: Arrow buffer 1 is treated as a packed bitmap, so each row is read as
// a single bit (at bit index `row + srcOffset`) via NullMask::isNull and stored as a bool.
// Unlike the generic version, the value is written for every visited row, null or not.
template<>
void scanArrowArrayFixedSizePrimitive<bool>(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const uint8_t* bitmapBytes = static_cast<const uint8_t*>(array->buffers[1]);
    const uint64_t* bitmapWords = reinterpret_cast<const uint64_t*>(bitmapBytes);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        const bool bit = NullMask::isNull(bitmapWords, row + srcOffset);
        outputVector.setValue<bool>(row + dstOffset, bit);
    });
}
// Reads 64-bit duration values from Arrow buffer 1 and stores each non-null one as an
// interval_t whose first two components are 0 and whose third component is
// `value * scaleFactor`.
// NOTE(review): the multiplication is unchecked, so very large inputs could overflow int64_t
// -- confirm whether callers need a guard.
static void scanArrowArrayDurationScaledUp(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    const int64_t* durations = static_cast<const int64_t*>(array->buffers[1]) + srcOffset;
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const int64_t scaled = durations[row] * scaleFactor;
        outputVector.setValue<interval_t>(row + dstOffset, interval_t(0, 0, scaled));
    });
}
// Reads 64-bit duration values from Arrow buffer 1 and stores each non-null one as an
// interval_t whose first two components are 0 and whose third component is
// `value / scaleFactor` (integer division, so the remainder is discarded).
static void scanArrowArrayDurationScaledDown(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, int64_t scaleFactor, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    const int64_t* durations = static_cast<const int64_t*>(array->buffers[1]) + srcOffset;
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const int64_t scaled = durations[row] / scaleFactor;
        outputVector.setValue<interval_t>(row + dstOffset, interval_t(0, 0, scaled));
    });
}
// Reads 32-bit values from Arrow buffer 1 and stores each non-null one as the first (month)
// component of an interval_t; the remaining two components are set to 0.
static void scanArrowArrayMonthInterval(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const int32_t* months = static_cast<const int32_t*>(array->buffers[1]) + srcOffset;
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        outputVector.setValue<interval_t>(row + dstOffset, interval_t(months[row], 0, 0));
    });
}
// Each source element is one 64-bit word: the low 32 bits are taken as the day count and the
// high 32 bits are multiplied by 1000 to produce the micros component of the interval_t.
// The month component is always 0.
static void scanArrowArrayDayTimeInterval(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const int64_t* packed = static_cast<const int64_t*>(array->buffers[1]) + srcOffset;
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const int64_t raw = packed[row];
        // Truncation keeps the low half of the word.
        const int32_t days = static_cast<int32_t>(raw);
        // Shifting by half the word width (32 bits) exposes the high half.
        const int64_t highHalf = raw >> (4 * sizeof(int64_t));
        outputVector.setValue<interval_t>(row + dstOffset, interval_t(0, days, highHalf * 1000));
    });
}
// Each source element is 16 bytes, read here as two 64-bit words. The first word carries the
// month in its low 32 bits and the day in its high 32 bits; the second word is divided by 1000
// to produce the micros component of the interval_t.
static void scanArrowArrayMonthDayNanoInterval(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const uint8_t* rawBytes = static_cast<const uint8_t*>(array->buffers[1]);
    // Skip srcOffset elements of 16 bytes each before reinterpreting as 64-bit words.
    const int64_t* words = reinterpret_cast<const int64_t*>(rawBytes + srcOffset * 16);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const int64_t monthAndDay = words[2 * row];
        const int64_t secondWord = words[2 * row + 1];
        const int32_t month = static_cast<int32_t>(monthAndDay);
        const int32_t day = static_cast<int32_t>(monthAndDay >> (4 * sizeof(int64_t)));
        outputVector.setValue<interval_t>(row + dstOffset,
            interval_t(month, day, secondWord / 1000));
    });
}
// Scans a variable-length byte/string array. Buffer 1 holds offsets of type offsetsT and
// buffer 2 holds the concatenated bytes; row `r` spans [offsets[r], offsets[r + 1]) in the
// data buffer. Non-null rows are appended to `outputVector` with BlobVector::addBlob.
template<typename offsetsT>
static void scanArrowArrayBLOB(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const offsetsT* valueOffsets = static_cast<const offsetsT*>(array->buffers[1]) + srcOffset;
    const uint8_t* bytes = static_cast<const uint8_t*>(array->buffers[2]);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const auto start = valueOffsets[row];
        const auto stop = valueOffsets[row + 1];
        const uint8_t* payload = bytes + start;
        BlobVector::addBlob(&outputVector, row + dstOffset, payload, stop - start);
    });
}
// Scans a binary/string "view" array. Buffer 1 holds 16-byte view records, read here as four
// int32 fields; field 0 is the length. Views of at most 12 bytes keep their payload inline
// right after the length field. Longer views name a data buffer (field 2, indexed from
// array->buffers[2] onwards) and a byte offset into it (field 3).
static void scanArrowArrayBLOBView(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset, uint64_t count) {
    const uint8_t* viewBytes = static_cast<const uint8_t*>(array->buffers[1]);
    auto dataBuffers = (const uint8_t**)(array->buffers + 2);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        auto view = (const int32_t*)(viewBytes + (row + srcOffset) * 16);
        const auto length = view[0];
        if (length > 12) {
            // Out-of-line payload: resolve the buffer and the offset within it.
            const auto bufferIndex = view[2];
            const auto bufferOffset = view[3];
            BlobVector::addBlob(&outputVector, row + dstOffset,
                dataBuffers[bufferIndex] + bufferOffset, length);
            return;
        }
        // Inline payload starts immediately after the 4-byte length.
        BlobVector::addBlob(&outputVector, row + dstOffset, (uint8_t*)(view + 1), length);
    });
}
// Scans a fixed-width binary array: every element occupies exactly `BLOBsize` bytes in
// buffer 1, so row `r` starts at byte (srcOffset + r) * BLOBsize. Non-null rows are appended
// to `outputVector` with BlobVector::addBlob.
static void scanArrowArrayFixedBLOB(const ArrowArray* array, ValueVector& outputVector,
    ArrowNullMaskTree* mask, int64_t BLOBsize, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    const uint8_t* firstBlob =
        static_cast<const uint8_t*>(array->buffers[1]) + srcOffset * BLOBsize;
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const uint8_t* blob = firstBlob + row * BLOBsize;
        BlobVector::addBlob(&outputVector, row + dstOffset, blob, BLOBsize);
    });
}
template<typename offsetsT>
static void scanArrowArrayList(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
uint64_t auxDstPosition = function::NumericLimits<uint64_t>::maximum();
mask->copyToValueVector(&outputVector, dstOffset, count);
rowIter(outputVector, count, [&](auto i) {
auto curOffset = offsets[i], nextOffset = offsets[i + 1];
auto newEntry = ListVector::addList(&outputVector, nextOffset - curOffset);
outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
if (auxDstPosition == function::NumericLimits<uint64_t>::maximum()) {
auxDstPosition = newEntry.offset;
}
});
if (auxDstPosition == function::NumericLimits<uint64_t>::maximum()) {
auxDstPosition = 0;
}
ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *auxiliaryBuffer,
mask->getChild(0), offsets[0] + array->children[0]->offset, auxDstPosition,
offsets[count] - offsets[0]);
}
template<typename offsetsT>
static void scanArrowArrayListView(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
auto offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
auto sizes = ((const offsetsT*)array->buffers[2]) + srcOffset;
ValueVector* auxiliaryBuffer = ListVector::getDataVector(&outputVector);
mask->copyToValueVector(&outputVector, dstOffset, count);
rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
auto curOffset = offsets[i], size = sizes[i];
auto newEntry = ListVector::addList(&outputVector, size);
outputVector.setValue<list_entry_t>(i + dstOffset, newEntry);
ArrowNullMaskTree childTree(schema->children[0], array->children[0], srcOffset, count);
ArrowConverter::fromArrowArray(schema->children[0], array->children[0],
*auxiliaryBuffer, &childTree, curOffset, newEntry.offset, newEntry.size);
}
});
}
// Scans a fixed-size list array. The per-row element count comes from the output vector's
// ARRAY type. Every visited row (null or not) reserves an entry of that size; the child
// elements are then converted in one recursive call covering count * numElements values.
static void scanArrowArrayFixedList(const ArrowSchema* schema, const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    auto elementsPerRow = ArrayType::getNumElements(outputVector.dataType);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        auto entry = ListVector::addList(&outputVector, elementsPerRow);
        outputVector.setValue<list_entry_t>(row + dstOffset, entry);
    });
    auto childVector = ListVector::getDataVector(&outputVector);
    ArrowConverter::fromArrowArray(schema->children[0], array->children[0], *childVector,
        mask->getChild(0), srcOffset * elementsPerRow + array->children[0]->offset,
        dstOffset * elementsPerRow, count * elementsPerRow);
}
static void scanArrowArrayStruct(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
mask->copyToValueVector(&outputVector, dstOffset, count);
rowIter(outputVector, count, [&](auto i) {
if (!mask->isNull(i)) {
outputVector.setValue<int64_t>(i + dstOffset,
i + dstOffset); }
});
for (int64_t j = 0; j < schema->n_children; j++) {
ArrowConverter::fromArrowArray(schema->children[j], array->children[j],
*StructVector::getFieldVector(&outputVector, j).get(), mask->getChild(j),
srcOffset + array->children[j]->offset, dstOffset, count);
}
}
// Scans a dense union array. Buffer 0 holds one type tag per row and buffer 1 holds, per row,
// the offset of its value inside the tagged child array. Each non-null row records its tag in
// the union's tag vector and converts exactly one value from the selected child.
//
// Fix: the tag is now written at `i + dstOffset`, the same destination row that receives the
// value. It used to be written at `i`, which put tags on the wrong rows whenever
// dstOffset != 0.
//
// `firstIncident[t]` tracks the smallest child offset seen so far for tag t; the child's null
// mask is offset relative to that value.
// NOTE(review): the tag byte is used directly as a child index -- confirm type ids always
// equal child positions for the unions this code receives.
static void scanArrowArrayDenseUnion(const ArrowSchema* schema, const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    auto types = ((const uint8_t*)array->buffers[0]) + srcOffset;
    auto dstTypes = (uint16_t*)UnionVector::getTagVector(&outputVector)->getData();
    auto offsets = ((const int32_t*)array->buffers[1]) + srcOffset;
    std::vector<int32_t> firstIncident(array->n_children, INT32_MAX);
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto i) {
        auto curType = types[i];
        auto curOffset = offsets[i];
        // Updated for null rows too, so later rows of the same tag stay relative to it.
        if (curOffset < firstIncident[curType]) {
            firstIncident[curType] = curOffset;
        }
        if (!mask->isNull(i)) {
            // Tag and value must land on the same destination row.
            dstTypes[i + dstOffset] = curType;
            auto childOffset =
                mask->getChild(curType)->offsetBy(curOffset - firstIncident[curType]);
            ArrowConverter::fromArrowArray(schema->children[curType], array->children[curType],
                *UnionVector::getValVector(&outputVector, curType), &childOffset,
                curOffset + array->children[curType]->offset, i + dstOffset, 1);
        }
    });
}
// Scans a sparse union array. Buffer 0 holds one type tag per row; every child array has the
// same length as the union itself, so each child is converted over the full row range into
// its value vector.
//
// Fixes:
//  * The tag is now written at `i + dstOffset`, matching the destination rows the children
//    are written to. It used to be written at `i`, misplacing tags whenever dstOffset != 0.
//  * The child loop counter is widened from int8_t to int64_t (the type already used for the
//    struct child loop), so it can no longer overflow when n_children reaches 128.
static void scanArrowArraySparseUnion(const ArrowSchema* schema, const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    auto types = ((const uint8_t*)array->buffers[0]) + srcOffset;
    auto dstTypes = (uint16_t*)UnionVector::getTagVector(&outputVector)->getData();
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto i) {
        if (!mask->isNull(i)) {
            dstTypes[i + dstOffset] = types[i];
        }
    });
    for (int64_t i = 0; i < array->n_children; i++) {
        ArrowConverter::fromArrowArray(schema->children[i], array->children[i],
            *UnionVector::getValVector(&outputVector, i), mask->getChild(i),
            srcOffset + array->children[i]->offset, dstOffset, count);
    }
}
// Scans a dictionary-encoded array. Buffer 1 holds indices of type offsetsT into
// array->dictionary. Each non-null row converts exactly one dictionary value straight into its
// destination slot of `outputVector`, using the dictionary's null mask shifted to that index.
template<typename offsetsT>
static void scanArrowArrayDictionaryEncoded(const ArrowSchema* schema, const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    const offsetsT* indices = static_cast<const offsetsT*>(array->buffers[1]) + srcOffset;
    mask->copyToValueVector(&outputVector, dstOffset, count);
    rowIter(outputVector, count, [&](auto row) {
        if (mask->isNull(row)) {
            return;
        }
        const auto dictIndex = indices[row];
        auto entryMask = mask->getDictionary()->offsetBy(dictIndex);
        ArrowConverter::fromArrowArray(schema->dictionary, array->dictionary, outputVector,
            &entryMask, dictIndex + array->dictionary->offset, row + dstOffset, 1);
    });
}
// Scans a run-end encoded array: children[0] supplies the run ends and children[1] supplies
// the values. For every visited row a run index is determined and a single value is converted
// from the values child into `outputVector` at `i + dstOffset`.
// NOTE(review): unlike the other scanners in this file, mask->copyToValueVector is not called
// here -- confirm that the recursive single-value conversions are meant to set nulls instead.
static void scanArrowArrayRunEndEncoded(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
uint64_t count) {
const ArrowArray* runEndArray = array->children[0];
// Run ends are always read as 32-bit unsigned integers.
// NOTE(review): confirm that other run-end widths never reach this function.
auto runEndBuffer = (const uint32_t*)runEndArray->buffers[1];
// Starting run index; defaults to the run-end child's own offset.
auto runEndIdx = runEndArray->offset;
{
// Bisection over [offset, offset + length] that picks the starting run index for srcOffset.
// NOTE(review): when runEndBuffer[M] < srcOffset the search moves left (H = M - 1), and the
// initial upper bound is offset + length, so index offset + length may be probed -- verify
// the intended direction and bounds before modifying.
auto L = runEndArray->offset, H = L + runEndArray->length;
while (H >= L) {
auto M = (H + L) >> 1;
if (runEndBuffer[M] < srcOffset) {
runEndIdx = M;
H = M - 1;
} else {
L = M + 1;
}
}
}
rowIter(outputVector, count, [&](auto i) {
// Advance the run cursor while the logical row is at or past the run end stored at
// runEndIdx + 1. The cursor only moves forward, so rows must be visited in increasing order.
// NOTE(review): the comparison reads runEndBuffer[runEndIdx + 1] rather than
// runEndBuffer[runEndIdx] -- confirm this is not off by one.
while (i + srcOffset >= runEndBuffer[runEndIdx + 1]) {
runEndIdx++;
}
// Convert exactly one value (the one at runEndIdx) into this row's destination slot.
auto valuesOffseted = mask->getChild(1)->offsetBy(runEndIdx);
ArrowConverter::fromArrowArray(schema->children[1], array->children[1], outputVector,
&valuesOffseted, runEndIdx, i + dstOffset,
1); });
}
// Converts `count` rows of `array`, starting at source row `srcOffset`, into `outputVector`
// starting at destination row `dstOffset`. The scanner is chosen from the Arrow format string
// in `schema->format`; `mask` is the null-mask tree node that corresponds to `array`.
//
// Arrays that carry a dictionary are dispatched on the index type first. Everything else is
// dispatched on the first one to three characters of the format string.
// Throws RuntimeException when a dictionary array has an unsupported index type; other
// unsupported formats end in UNREACHABLE_CODE.
void ArrowConverter::fromArrowArray(const ArrowSchema* schema, const ArrowArray* array,
    ValueVector& outputVector, ArrowNullMaskTree* mask, uint64_t srcOffset, uint64_t dstOffset,
    uint64_t count) {
    const auto format = schema->format;

    // Dictionary-encoded input: the format describes the integer type of the indices.
    if (array->dictionary != nullptr) {
        switch (format[0]) {
        case 'c':
            return scanArrowArrayDictionaryEncoded<int8_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'C':
            return scanArrowArrayDictionaryEncoded<uint8_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 's':
            return scanArrowArrayDictionaryEncoded<int16_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'S':
            return scanArrowArrayDictionaryEncoded<uint16_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'i':
            return scanArrowArrayDictionaryEncoded<int32_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'I':
            return scanArrowArrayDictionaryEncoded<uint32_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'l':
            return scanArrowArrayDictionaryEncoded<int64_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'L':
            return scanArrowArrayDictionaryEncoded<uint64_t>(schema, array, outputVector, mask,
                srcOffset, dstOffset, count);
        default:
            throw RuntimeException("Invalid Index Type: " + std::string(format));
        }
    }

    switch (format[0]) {
    case 'n':
        // NOTE(review): marks the whole vector null regardless of dstOffset/count -- confirm
        // this is intended when the null array is nested or appended at a non-zero offset.
        outputVector.setAllNull();
        return;

    // Fixed-width primitives.
    case 'b':
        return scanArrowArrayFixedSizePrimitive<bool>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'c':
        return scanArrowArrayFixedSizePrimitive<int8_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'C':
        return scanArrowArrayFixedSizePrimitive<uint8_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 's':
        return scanArrowArrayFixedSizePrimitive<int16_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'S':
        return scanArrowArrayFixedSizePrimitive<uint16_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'i':
        return scanArrowArrayFixedSizePrimitive<int32_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'I':
        return scanArrowArrayFixedSizePrimitive<uint32_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'l':
        return scanArrowArrayFixedSizePrimitive<int64_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'L':
        return scanArrowArrayFixedSizePrimitive<uint64_t>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'f':
        return scanArrowArrayFixedSizePrimitive<float>(array, outputVector, mask, srcOffset,
            dstOffset, count);
    case 'g':
        return scanArrowArrayFixedSizePrimitive<double>(array, outputVector, mask, srcOffset,
            dstOffset, count);

    // Variable-length bytes/strings: lowercase formats use 32-bit offsets, uppercase 64-bit.
    case 'z':
    case 'u':
        return scanArrowArrayBLOB<int32_t>(array, outputVector, mask, srcOffset, dstOffset, count);
    case 'Z':
    case 'U':
        return scanArrowArrayBLOB<int64_t>(array, outputVector, mask, srcOffset, dstOffset, count);

    // Bytes/string views.
    case 'v':
        switch (format[1]) {
        case 'z':
        case 'u':
            return scanArrowArrayBLOBView(array, outputVector, mask, srcOffset, dstOffset, count);
        default:
            UNREACHABLE_CODE;
        }

    // Decimal: the source is read as int128_t and narrowed to the vector's physical type.
    case 'd': {
        switch (outputVector.dataType.getPhysicalType()) {
        case PhysicalTypeID::INT16:
            return scanArrowArrayFixedSizePrimitiveAndCastTo<int128_t, int16_t>(array,
                outputVector, mask, srcOffset, dstOffset, count);
        case PhysicalTypeID::INT32:
            return scanArrowArrayFixedSizePrimitiveAndCastTo<int128_t, int32_t>(array,
                outputVector, mask, srcOffset, dstOffset, count);
        case PhysicalTypeID::INT64:
            return scanArrowArrayFixedSizePrimitiveAndCastTo<int128_t, int64_t>(array,
                outputVector, mask, srcOffset, dstOffset, count);
        case PhysicalTypeID::INT128:
            return scanArrowArrayFixedSizePrimitive<int128_t>(array, outputVector, mask,
                srcOffset, dstOffset, count);
        default:
            UNREACHABLE_CODE;
        }
    }

    // Fixed-width binary: the byte width follows the two-character "w:" prefix.
    case 'w':
        return scanArrowArrayFixedBLOB(array, outputVector, mask, std::stoi(format + 2),
            srcOffset, dstOffset, count);

    // Temporal types.
    case 't':
        switch (format[1]) {
        case 'd':
            // "tdD" is scanned as 32-bit values, any other date unit as 64-bit values.
            if (format[2] != 'D') {
                return scanArrowArrayFixedSizePrimitive<int64_t>(array, outputVector, mask,
                    srcOffset, dstOffset, count);
            }
            return scanArrowArrayFixedSizePrimitive<int32_t>(array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 't':
            UNREACHABLE_CODE;
        case 's':
            return scanArrowArrayFixedSizePrimitive<int64_t>(array, outputVector, mask,
                srcOffset, dstOffset, count);
        case 'D':
            // Durations are scaled by the factor chosen for each unit character.
            switch (format[2]) {
            case 's':
                return scanArrowArrayDurationScaledUp(array, outputVector, mask, 1000000,
                    srcOffset, dstOffset, count);
            case 'm':
                return scanArrowArrayDurationScaledUp(array, outputVector, mask, 1000,
                    srcOffset, dstOffset, count);
            case 'u':
                return scanArrowArrayDurationScaledUp(array, outputVector, mask, 1, srcOffset,
                    dstOffset, count);
            case 'n':
                return scanArrowArrayDurationScaledDown(array, outputVector, mask, 1000,
                    srcOffset, dstOffset, count);
            default:
                UNREACHABLE_CODE;
            }
        case 'i':
            switch (format[2]) {
            case 'M':
                return scanArrowArrayMonthInterval(array, outputVector, mask, srcOffset,
                    dstOffset, count);
            case 'D':
                return scanArrowArrayDayTimeInterval(array, outputVector, mask, srcOffset,
                    dstOffset, count);
            case 'n':
                return scanArrowArrayMonthDayNanoInterval(array, outputVector, mask, srcOffset,
                    dstOffset, count);
            default:
                UNREACHABLE_CODE;
            }
        default:
            UNREACHABLE_CODE;
        }

    // Nested types.
    case '+':
        switch (format[1]) {
        case 'r':
            return scanArrowArrayRunEndEncoded(schema, array, outputVector, mask, srcOffset,
                dstOffset, count);
        case 'l':
        case 'm':
            // "+l" and "+m" are both scanned as lists with 32-bit offsets.
            return scanArrowArrayList<int32_t>(schema, array, outputVector, mask, srcOffset,
                dstOffset, count);
        case 'L':
            return scanArrowArrayList<int64_t>(schema, array, outputVector, mask, srcOffset,
                dstOffset, count);
        case 'w': {
            // The element count in the format must match the output ARRAY type.
            RUNTIME_CHECK({
                auto arrowNumElements = std::stoul(format + 3);
                auto outputNumElements = ArrayType::getNumElements(outputVector.dataType);
                DASSERT(arrowNumElements == outputNumElements);
            });
            return scanArrowArrayFixedList(schema, array, outputVector, mask, srcOffset,
                dstOffset, count);
        }
        case 's':
            return scanArrowArrayStruct(schema, array, outputVector, mask, srcOffset, dstOffset,
                count);
        case 'u':
            // A third character of 'd' selects the dense scanner, anything else the sparse one.
            if (format[2] != 'd') {
                return scanArrowArraySparseUnion(schema, array, outputVector, mask, srcOffset,
                    dstOffset, count);
            }
            return scanArrowArrayDenseUnion(schema, array, outputVector, mask, srcOffset,
                dstOffset, count);
        case 'v':
            switch (format[2]) {
            case 'l':
                return scanArrowArrayListView<int32_t>(schema, array, outputVector, mask,
                    srcOffset, dstOffset, count);
            case 'L':
                return scanArrowArrayListView<int64_t>(schema, array, outputVector, mask,
                    srcOffset, dstOffset, count);
            default:
                UNREACHABLE_CODE;
            }
        default:
            UNREACHABLE_CODE;
        }
    default:
        UNREACHABLE_CODE;
    }
}
void ArrowConverter::fromArrowArray(const ArrowSchema* schema, const ArrowArray* array,
ValueVector& outputVector) {
ArrowNullMaskTree mask(schema, array, array->offset, array->length);
return fromArrowArray(schema, array, outputVector, &mask, array->offset, 0, array->length);
}
} }