#include "storage/table/in_memory_exception_chunk.h"
#include <algorithm>
#include "common/utils.h"
#include "storage/buffer_manager/memory_manager.h"
#include "storage/compression/float_compression.h"
#include "storage/storage_utils.h"
#include "storage/table/column.h"
#include "storage/table/column_chunk_data.h"
#include <concepts>
namespace lbug::storage {
using namespace common;
using namespace transaction;
template<std::floating_point T>
using ExceptionInBuffer = std::array<std::byte, EncodeException<T>::sizeInBytes()>;
template<std::floating_point T>
InMemoryExceptionChunk<T>::InMemoryExceptionChunk(const SegmentState& state, FileHandle* dataFH,
MemoryManager* memoryManager, ShadowFile* shadowFile)
: exceptionCount(state.metadata.compMeta.floatMetadata()->exceptionCount),
finalizedExceptionCount(exceptionCount),
exceptionCapacity(state.metadata.compMeta.floatMetadata()->exceptionCapacity),
emptyMask(exceptionCapacity),
column(std::make_unique<Column>("ALPExceptionChunk", physicalType, dataFH, memoryManager,
shadowFile, false, false )) {
const auto exceptionBaseCursor =
getExceptionPageCursor(state.metadata, PageCursor{state.metadata.getStartPageIdx(), 0},
state.metadata.compMeta.floatMetadata()->exceptionCapacity);
const auto compMeta =
CompressionMetadata(StorageValue{0}, StorageValue{1}, CompressionType::UNCOMPRESSED);
const auto exceptionChunkMeta = ColumnChunkMetadata(exceptionBaseCursor.pageIdx,
safeIntegerConversion<page_idx_t>(
EncodeException<T>::numPagesFromExceptions(exceptionCapacity)),
exceptionCapacity, compMeta);
chunkState = std::make_unique<SegmentState>(exceptionChunkMeta,
EncodeException<T>::exceptionBytesPerPage() / EncodeException<T>::sizeInBytes());
chunkData =
std::make_unique<ColumnChunkData>(*memoryManager, physicalType, false, exceptionChunkMeta,
false );
chunkData->setToInMemory();
column->scanSegment(*chunkState, chunkData.get(), 0, chunkState->metadata.numValues);
}
template<std::floating_point T>
InMemoryExceptionChunk<T>::~InMemoryExceptionChunk() = default;
template<std::floating_point T>
void InMemoryExceptionChunk<T>::finalizeAndFlushToDisk(SegmentState& state) {
finalize(state);
column->writeSegment(*chunkData, *chunkState, 0, *chunkData, 0, exceptionCapacity);
}
template<std::floating_point T>
void InMemoryExceptionChunk<T>::finalize(SegmentState& state) {
finalizedExceptionCount = 0;
for (size_t i = 0; i < exceptionCount; ++i) {
if (!emptyMask.isNull(i)) {
++finalizedExceptionCount;
if (finalizedExceptionCount - 1 == i) {
continue;
}
writeException(getExceptionAt(i), finalizedExceptionCount - 1);
}
}
DASSERT(finalizedExceptionCount <= state.metadata.compMeta.floatMetadata()->exceptionCapacity);
state.metadata.compMeta.floatMetadata()->exceptionCount = finalizedExceptionCount;
ExceptionInBuffer<T>* exceptionWordBuffer =
reinterpret_cast<ExceptionInBuffer<T>*>(chunkData->getData());
std::sort(exceptionWordBuffer, exceptionWordBuffer + finalizedExceptionCount,
[](ExceptionInBuffer<T>& a, ExceptionInBuffer<T>& b) {
return EncodeExceptionView<T>{reinterpret_cast<std::byte*>(&a)}.getValue() <
EncodeExceptionView<T>{reinterpret_cast<std::byte*>(&b)}.getValue();
});
std::memset(chunkData->getData() + finalizedExceptionCount * EncodeException<T>::sizeInBytes(),
0, (exceptionCount - finalizedExceptionCount) * EncodeException<T>::sizeInBytes());
emptyMask.setNullFromRange(0, finalizedExceptionCount, false);
emptyMask.setNullFromRange(finalizedExceptionCount, (exceptionCount - finalizedExceptionCount),
true);
exceptionCount = finalizedExceptionCount;
chunkData->setNumValues(finalizedExceptionCount);
}
template<std::floating_point T>
void InMemoryExceptionChunk<T>::addException(EncodeException<T> exception) {
DASSERT(exceptionCount < exceptionCapacity);
++exceptionCount;
writeException(exception, exceptionCount - 1);
emptyMask.setNull(exceptionCount - 1, false);
}
template<std::floating_point T>
void InMemoryExceptionChunk<T>::removeExceptionAt(size_t exceptionIdx) {
emptyMask.setNull(exceptionIdx, true);
}
template<std::floating_point T>
EncodeException<T> InMemoryExceptionChunk<T>::getExceptionAt(size_t exceptionIdx) const {
DASSERT(exceptionIdx < exceptionCount);
auto bytesInBuffer = chunkData->getValue<ExceptionInBuffer<T>>(exceptionIdx);
return EncodeExceptionView<T>{reinterpret_cast<std::byte*>(&bytesInBuffer)}.getValue();
}
template<std::floating_point T>
void InMemoryExceptionChunk<T>::writeException(EncodeException<T> exception, size_t exceptionIdx) {
DASSERT(exceptionIdx < exceptionCount);
EncodeExceptionView<T>{reinterpret_cast<std::byte*>(chunkData->getData())}.setValue(exception,
exceptionIdx);
}
template<std::floating_point T>
offset_t InMemoryExceptionChunk<T>::findFirstExceptionAtOrPastOffset(offset_t offsetInChunk) const {
offset_t lo = 0;
offset_t hi = finalizedExceptionCount;
while (lo < hi) {
const size_t curExceptionIdx = (lo + hi) / 2;
EncodeException<T> lastException = getExceptionAt(curExceptionIdx);
if (lastException.posInChunk < offsetInChunk) {
lo = curExceptionIdx + 1;
} else {
hi = curExceptionIdx;
}
}
return lo;
}
template<std::floating_point T>
PageCursor InMemoryExceptionChunk<T>::getExceptionPageCursor(const ColumnChunkMetadata& metadata,
PageCursor pageBaseCursor, size_t exceptionCapacity) {
const size_t numExceptionPages = EncodeException<T>::numPagesFromExceptions(exceptionCapacity);
const size_t exceptionPageOffset = metadata.getNumPages() - numExceptionPages;
DASSERT(exceptionPageOffset == (page_idx_t)exceptionPageOffset);
return {pageBaseCursor.pageIdx + (page_idx_t)exceptionPageOffset, 0};
}
template<std::floating_point T>
size_t InMemoryExceptionChunk<T>::getExceptionCount() const {
return finalizedExceptionCount;
}
template class InMemoryExceptionChunk<float>;
template class InMemoryExceptionChunk<double>;
}