#pragma once

#include <algorithm>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

#include "memory/memory_allocator_impl.h"
#include "rocksdb/advanced_compression.h"
#include "rocksdb/options.h"
#include "table/block_based/block_type.h"
#include "util/aligned_buffer.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"

#ifdef ZSTD
#include <zstd.h>
#include <zstd_errors.h>
#if ZSTD_VERSION_NUMBER < 10400
#error "ZSTD support requires version >= 1.4.0 (libzstd-devel)"
#endif  // ZSTD_VERSION_NUMBER < 10400
#if defined(ZSTD_STATIC_LINKING_ONLY)
#define ROCKSDB_ZSTD_DDICT
#endif  // defined(ZSTD_STATIC_LINKING_ONLY)
#include <zdict.h>
// Enables ZSTD_FinalizeDictionarySupported() for ZSTD >= 1.4.5.
#if ZSTD_VERSION_NUMBER >= 10405
#define ROCKSDB_ZDICT_FINALIZE
#endif  // ZSTD_VERSION_NUMBER >= 10405
#endif  // ZSTD
namespace ROCKSDB_NAMESPACE {
#if defined(ZSTD) && defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
    defined(ZSTD_STATIC_LINKING_ONLY)
// When ZSTD, jemalloc, Windows and the ZSTD static-linking-only API are all
// available, ZSTD contexts in this header are created through the
// ZSTD_create*Ctx_advanced() variants with the allocator hooks returned by
// port::GetJeZstdAllocationOverrides().
#define ROCKSDB_ZSTD_CUSTOM_MEM
namespace port {
// Declared here; the definition lives in the platform port layer
// (not visible in this header).
ZSTD_customMem GetJeZstdAllocationOverrides();
}  // namespace port
#endif  // ZSTD && ROCKSDB_JEMALLOC && OS_WIN && ZSTD_STATIC_LINKING_ONLY
// Holds a ZSTD decompression context (ZSTD_DCtx* in ZSTD builds, otherwise an
// always-null void*) together with a cache index.
//
// A cache index of -1 means this object created the context itself
// (CreateIfNeeded) and frees it in the destructor. Any other index was
// supplied through InitFromCache(); such a context is not freed here.
class ZSTDUncompressCachedData {
 public:
#if defined(ZSTD)
  using ZSTDNativeContext = ZSTD_DCtx*;
#else
  using ZSTDNativeContext = void*;
#endif  // ZSTD

  ZSTDUncompressCachedData() = default;

  ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
  ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;

  // Takes over `o`'s context and cache index, leaving `o` empty
  // (null context, cache index -1).
  ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) noexcept
      : zstd_ctx_(std::exchange(o.zstd_ctx_, nullptr)),
        cache_idx_(std::exchange(o.cache_idx_, -1)) {}

  // Only valid on a target that holds no context: the state is swapped, so
  // `o` receives this object's (null) context rather than it being released.
  ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) noexcept {
    assert(zstd_ctx_ == nullptr);
    std::swap(zstd_ctx_, o.zstd_ctx_);
    std::swap(cache_idx_, o.cache_idx_);
    return *this;
  }

  ZSTDNativeContext Get() const { return zstd_ctx_; }
  int64_t GetCacheIndex() const { return cache_idx_; }

  // Creates an owned context (cache index -1) if none is held yet. Without
  // ZSTD support the context stays null.
  void CreateIfNeeded() {
    if (zstd_ctx_ != nullptr) {
      return;
    }
#if !defined(ZSTD)
    zstd_ctx_ = nullptr;
#elif defined(ROCKSDB_ZSTD_CUSTOM_MEM)
    zstd_ctx_ =
        ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#else
    zstd_ctx_ = ZSTD_createDCtx();
#endif
    cache_idx_ = -1;
  }

  // Adopts `o`'s context pointer without taking ownership and records `idx`
  // as the cache slot it came from.
  void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
    zstd_ctx_ = o.zstd_ctx_;
    cache_idx_ = idx;
  }

  ~ZSTDUncompressCachedData() {
#if defined(ZSTD)
    // Only self-created contexts (cache index -1) are freed here.
    if (zstd_ctx_ != nullptr && cache_idx_ == -1) {
      ZSTD_freeDCtx(zstd_ctx_);
    }
#endif  // ZSTD
  }

 private:
  ZSTDNativeContext zstd_ctx_ = nullptr;
  int64_t cache_idx_ = -1;
};
}
#if defined(XPRESS)
#include "port/xpress.h"
#endif
namespace ROCKSDB_NAMESPACE {
// A Decompressor that can only fail: every operation hands back the (non-OK)
// Status supplied at construction.
class FailureDecompressor : public Decompressor {
 public:
  explicit FailureDecompressor(Status&& status) : status_(std::move(status)) {
    assert(!status_.ok());
  }

  ~FailureDecompressor() override { status_.PermitUncheckedError(); }

  const char* Name() const override { return "FailureDecompressor"; }

  Status ExtractUncompressedSize(Args& /*args*/) override { return Fail(); }

  Status DecompressBlock(const Args& /*args*/,
                         char* /*uncompressed_output*/) override {
    return Fail();
  }

 private:
  // Copy of the stored failure, returned by every operation.
  Status Fail() const { return status_; }

 protected:
  Status status_;
};
// A decompression dictionary together with the Decompressor prepared for it.
//
// The dictionary bytes are held either in `dict_str_` (string constructor) or
// in `dict_allocation_` (allocation constructor). `decompressor_` is expected
// to be set by Populate(), since GetRawDict() dereferences it. Presumably it
// refers to those bytes — NOTE(review): confirm against Populate().
struct DecompressorDict {
  std::string dict_str_;
  CacheAllocationPtr dict_allocation_;
  std::unique_ptr<Decompressor> decompressor_;
  // Value reported by ApproximateMemoryUsage(); expected to be set by
  // Populate(). Zero-initialized so it is never read indeterminate.
  size_t memory_usage_ = 0;

  DecompressorDict(std::string&& dict, Decompressor& from_decompressor)
      : dict_str_(std::move(dict)) {
    Populate(from_decompressor, dict_str_);
  }

  DecompressorDict(Slice slice, CacheAllocationPtr&& allocation,
                   Decompressor& from_decompressor)
      : dict_allocation_(std::move(allocation)) {
    Populate(from_decompressor, slice);
  }

  DecompressorDict(DecompressorDict&& rhs) noexcept
      : dict_str_(std::move(rhs.dict_str_)),
        dict_allocation_(std::move(rhs.dict_allocation_)),
        decompressor_(std::move(rhs.decompressor_)),
        memory_usage_(rhs.memory_usage_) {}

  DecompressorDict& operator=(DecompressorDict&& rhs) noexcept {
    if (this == &rhs) {
      return *this;
    }
    // Replace the decompressor first so our previous one is destroyed while
    // the dictionary storage it was built from is still alive (this also
    // mirrors the reverse-declaration destruction order of the members).
    decompressor_ = std::move(rhs.decompressor_);
    dict_str_ = std::move(rhs.dict_str_);
    dict_allocation_ = std::move(rhs.dict_allocation_);
    // Keep ApproximateMemoryUsage() in sync with the adopted dictionary.
    memory_usage_ = rhs.memory_usage_;
    return *this;
  }

  DecompressorDict(const DecompressorDict&) = delete;
  DecompressorDict& operator=(const DecompressorDict&) = delete;

  // True when this object holds the dictionary bytes itself.
  bool own_bytes() const { return !dict_str_.empty() || dict_allocation_; }

  const Slice& GetRawDict() const { return decompressor_->GetSerializedDict(); }

  const Slice& ContentSlice() const { return GetRawDict(); }

  static constexpr CacheEntryRole kCacheEntryRole = CacheEntryRole::kOtherBlock;
  static constexpr BlockType kBlockType = BlockType::kCompressionDictionary;

  size_t ApproximateMemoryUsage() const { return memory_usage_; }

 private:
  // Defined out of line; not visible in this header.
  void Populate(Decompressor& from_decompressor, Slice dict);
};
struct CompressionDict {
#ifdef ZSTD
ZSTD_CDict* zstd_cdict_ = nullptr;
#endif std::string dict_;
public:
CompressionDict() = default;
CompressionDict(std::string&& dict, CompressionType type, int level) {
dict_ = std::move(dict);
#ifdef ZSTD
zstd_cdict_ = nullptr;
if (!dict_.empty() && type == kZSTD) {
if (level == CompressionOptions::kDefaultCompressionLevel) {
level = ZSTD_CLEVEL_DEFAULT;
}
zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
assert(zstd_cdict_ != nullptr);
}
#else
(void)type;
(void)level;
#endif }
CompressionDict(CompressionDict&& other) {
#ifdef ZSTD
zstd_cdict_ = other.zstd_cdict_;
other.zstd_cdict_ = nullptr;
#endif dict_ = std::move(other.dict_);
}
CompressionDict& operator=(CompressionDict&& other) {
if (this == &other) {
return *this;
}
#ifdef ZSTD
zstd_cdict_ = other.zstd_cdict_;
other.zstd_cdict_ = nullptr;
#endif dict_ = std::move(other.dict_);
return *this;
}
~CompressionDict() {
#ifdef ZSTD
size_t res = 0;
if (zstd_cdict_ != nullptr) {
res = ZSTD_freeCDict(zstd_cdict_);
}
assert(res == 0); (void)res; #endif }
#ifdef ZSTD
const ZSTD_CDict* GetDigestedZstdCDict() const { return zstd_cdict_; }
#endif
Slice GetRawDict() const { return dict_; }
bool empty() const { return dict_.empty(); }
static const CompressionDict& GetEmptyDict() {
static CompressionDict empty_dict{};
return empty_dict;
}
CompressionDict(const CompressionDict&) = delete;
CompressionDict& operator=(const CompressionDict&) = delete;
};
class CompressionContext : public Compressor::WorkingArea {
private:
#ifdef ZSTD
ZSTD_CCtx* zstd_ctx_ = nullptr;
ZSTD_CCtx* CreateZSTDContext() {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
return ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else
return ZSTD_createCCtx();
#endif }
public:
ZSTD_CCtx* ZSTDPreallocCtx() const {
assert(zstd_ctx_ != nullptr);
return zstd_ctx_;
}
private:
#endif
void CreateNativeContext(CompressionType type, int level, bool checksum) {
#ifdef ZSTD
if (type == kZSTD) {
zstd_ctx_ = CreateZSTDContext();
if (level == CompressionOptions::kDefaultCompressionLevel) {
level = ZSTD_CLEVEL_DEFAULT;
}
size_t err =
ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_compressionLevel, level);
if (ZSTD_isError(err)) {
assert(false);
ZSTD_freeCCtx(zstd_ctx_);
zstd_ctx_ = CreateZSTDContext();
}
if (checksum) {
err = ZSTD_CCtx_setParameter(zstd_ctx_, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(err)) {
assert(false);
ZSTD_freeCCtx(zstd_ctx_);
zstd_ctx_ = CreateZSTDContext();
}
}
}
#else
(void)type;
(void)level;
(void)checksum;
#endif }
void DestroyNativeContext() {
#ifdef ZSTD
if (zstd_ctx_ != nullptr) {
ZSTD_freeCCtx(zstd_ctx_);
}
#endif }
public:
explicit CompressionContext(CompressionType type,
const CompressionOptions& options) {
CreateNativeContext(type, options.level, options.checksum);
}
~CompressionContext() { DestroyNativeContext(); }
CompressionContext(const CompressionContext&) = delete;
CompressionContext& operator=(const CompressionContext&) = delete;
};
// Decompressor working area. For kZSTD it obtains cached ZSTD uncompress data
// from CompressionContextCache::Instance(). On destruction it returns the
// cache slot, if one was assigned (cache index other than -1). For other
// compression types it holds nothing.
class UncompressionContext : public Decompressor::WorkingArea {
 private:
  CompressionContextCache* ctx_cache_ = nullptr;
  ZSTDUncompressCachedData uncomp_cached_data_;

 public:
  explicit UncompressionContext(CompressionType type) {
    if (type != kZSTD) {
      return;
    }
    ctx_cache_ = CompressionContextCache::Instance();
    uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
  }

  ~UncompressionContext() {
    const int64_t slot = uncomp_cached_data_.GetCacheIndex();
    if (slot == -1) {
      return;
    }
    assert(ctx_cache_ != nullptr);
    ctx_cache_->ReturnCachedZSTDUncompressData(slot);
  }

  UncompressionContext(const UncompressionContext&) = delete;
  UncompressionContext& operator=(const UncompressionContext&) = delete;

  ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
    return uncomp_cached_data_.Get();
  }
};
// Whether this build has Snappy compression compiled in (SNAPPY defined).
inline bool Snappy_Supported() {
#if defined(SNAPPY)
  constexpr bool kCompiledIn = true;
#else
  constexpr bool kCompiledIn = false;
#endif  // SNAPPY
  return kCompiledIn;
}
// Whether this build has zlib compression compiled in (ZLIB defined).
inline bool Zlib_Supported() {
  bool supported = false;
#ifdef ZLIB
  supported = true;
#endif  // ZLIB
  return supported;
}
// Whether this build has bzip2 compression compiled in (BZIP2 defined).
inline bool BZip2_Supported() {
#ifndef BZIP2
  return false;
#else
  return true;
#endif  // BZIP2
}
// Whether this build has LZ4 compression compiled in (LZ4 defined).
inline bool LZ4_Supported() {
#if defined(LZ4)
  const bool compiled_in = true;
#else
  const bool compiled_in = false;
#endif  // LZ4
  return compiled_in;
}
// Whether this build has XPRESS compression compiled in (XPRESS defined).
inline bool XPRESS_Supported() {
#ifndef XPRESS
  constexpr bool kCompiledIn = false;
#else
  constexpr bool kCompiledIn = true;
#endif  // XPRESS
  return kCompiledIn;
}
// Whether this build has ZSTD compression compiled in (ZSTD defined).
inline bool ZSTD_Supported() {
#if !defined(ZSTD)
  return false;
#else
  return true;
#endif  // ZSTD
}
// Whether ZSTD streaming compression is available. The only requirement
// checked is that ZSTD itself is compiled in.
inline bool ZSTD_Streaming_Supported() {
  return
#if defined(ZSTD)
      true
#else
      false
#endif  // ZSTD
      ;
}
// Whether `compression_type` can be used for streaming compression: always
// for kNoCompression, for kZSTD when ZSTD streaming is available, never
// otherwise.
inline bool StreamingCompressionTypeSupported(
    CompressionType compression_type) {
  if (compression_type == kNoCompression) {
    return true;
  }
  if (compression_type == kZSTD) {
    return ZSTD_Streaming_Supported();
  }
  return false;
}
// Whether `compression_type` is usable in this build. kNoCompression is
// always supported; each algorithm defers to its *_Supported() check, and
// unrecognized types are reported unsupported.
inline bool CompressionTypeSupported(CompressionType compression_type) {
  switch (compression_type) {
    case kNoCompression:
      return true;
    case kSnappyCompression:
      return Snappy_Supported();
    case kZlibCompression:
      return Zlib_Supported();
    case kBZip2Compression:
      return BZip2_Supported();
    case kLZ4Compression:
    case kLZ4HCCompression:
      // Both LZ4 variants share the same LZ4_Supported() check.
      return LZ4_Supported();
    case kXpressCompression:
      return XPRESS_Supported();
    case kZSTD:
      return ZSTD_Supported();
    default:
      return false;
  }
}
// Whether `compression_type` supports dictionary compression in this build:
// zlib, LZ4/LZ4HC (only with LZ4 >= 1.4.0 headers) and ZSTD, each subject to
// being compiled in. All other types return false.
inline bool DictCompressionTypeSupported(CompressionType compression_type) {
  switch (compression_type) {
    case kZlibCompression:
      return Zlib_Supported();
    case kLZ4Compression:
    case kLZ4HCCompression:
      // NOTE(review): LZ4_VERSION_NUMBER comes from <lz4.h>, which is not
      // included in the visible part of this header. If it is undefined here,
      // the #if treats it as 0 and LZ4 is reported unsupported — confirm.
#if LZ4_VERSION_NUMBER >= 10400
      return LZ4_Supported();
#else
      return false;
#endif
    case kZSTD:
      return ZSTD_Supported();
    case kNoCompression:
    case kSnappyCompression:
    case kBZip2Compression:
    case kXpressCompression:
    default:
      return false;
  }
}
// String conversions for compression settings. These are declared here and
// defined out of line. CompressionTypeToString() is used by
// CompressionTypeRecord::DebugString() in this header.
std::string CompressionTypeToString(CompressionType compression_type);
// Presumably the inverse of CompressionTypeToString(); behavior for unknown
// strings is not visible here — check the definition.
CompressionType CompressionTypeFromString(std::string compression_type_str);
std::string CompressionOptionsToString(
const CompressionOptions& compression_options);
// Whether ZSTD dictionary training is available. The only requirement
// checked is that ZSTD is compiled in.
inline bool ZSTD_TrainDictionarySupported() {
  bool available = false;
#ifdef ZSTD
  available = true;
#endif  // ZSTD
  return available;
}
// Whether ZSTD dictionary finalization is available, i.e. whether
// ROCKSDB_ZDICT_FINALIZE was defined by the ZSTD version checks at the top
// of this header.
inline bool ZSTD_FinalizeDictionarySupported() {
#if !defined(ROCKSDB_ZDICT_FINALIZE)
  return false;
#else
  return true;
#endif  // ROCKSDB_ZDICT_FINALIZE
}
// Declared here, defined out of line. Judging by the signature, it compresses
// `from` into `to` using `builtin_compressor` and the given working area. The
// exact "legacy"/"force" semantics and the failure behavior are not visible
// here — NOTE(review): confirm against the definition.
Status LegacyForceBuiltinCompression(
Compressor& builtin_compressor,
Compressor::ManagedWorkingArea* working_area, Slice from,
GrowableBuffer* to);
// Record carrying the WAL compression type, serialized as a fixed 32-bit
// value. Decoding rejects types that StreamingCompressionTypeSupported()
// does not accept.
class CompressionTypeRecord {
 public:
  explicit CompressionTypeRecord(CompressionType compression_type)
      : compression_type_(compression_type) {}

  CompressionType GetCompressionType() const { return compression_type_; }

  // Appends the fixed 32-bit encoding of the compression type to *dst.
  inline void EncodeTo(std::string* dst) const {
    assert(dst != nullptr);
    PutFixed32(dst, compression_type_);
  }

  // Reads a fixed 32-bit compression type from *src. On any failure a
  // Corruption status is returned and the stored type is left unchanged.
  inline Status DecodeFrom(Slice* src) {
    constexpr char class_name[] = "CompressionTypeRecord";
    uint32_t raw = 0;
    if (!GetFixed32(src, &raw)) {
      return Status::Corruption(class_name,
                                "Error decoding WAL compression type");
    }
    const CompressionType decoded = static_cast<CompressionType>(raw);
    if (!StreamingCompressionTypeSupported(decoded)) {
      return Status::Corruption(class_name,
                                "WAL compression type not supported");
    }
    compression_type_ = decoded;
    return Status::OK();
  }

  inline std::string DebugString() const {
    return "compression_type: " + CompressionTypeToString(compression_type_);
  }

 private:
  CompressionType compression_type_;
};
// Abstract interface for incremental (streaming) compression. Instances are
// obtained from Create(); subclasses implement Compress() and Reset().
class StreamingCompress {
 public:
  StreamingCompress(CompressionType compression_type,
                    const CompressionOptions& opts,
                    uint32_t compress_format_version, size_t max_output_len)
      : compression_type_(compression_type),
        opts_(opts),
        compress_format_version_(compress_format_version),
        max_output_len_(max_output_len) {}

  virtual ~StreamingCompress() = default;

  // Compresses `input` into `output`. The meaning of the return value and of
  // *output_pos is defined by the implementations — NOTE(review): confirm
  // against the out-of-line definitions.
  virtual int Compress(const char* input, size_t input_size, char* output,
                       size_t* output_pos) = 0;

  // Factory returning a raw pointer; the caller presumably owns the result —
  // confirm against the definition.
  static StreamingCompress* Create(CompressionType compression_type,
                                   const CompressionOptions& opts,
                                   uint32_t compress_format_version,
                                   size_t max_output_len);

  virtual void Reset() = 0;

 protected:
  const CompressionType compression_type_;
  const CompressionOptions opts_;
  const uint32_t compress_format_version_;
  const size_t max_output_len_;
};
// Abstract interface for incremental (streaming) decompression. Instances are
// obtained from Create(); subclasses implement Uncompress() and Reset().
class StreamingUncompress {
 public:
  StreamingUncompress(CompressionType compression_type,
                      uint32_t compress_format_version, size_t max_output_len)
      : compression_type_(compression_type),
        compress_format_version_(compress_format_version),
        max_output_len_(max_output_len) {}

  virtual ~StreamingUncompress() = default;

  // Decompresses `input` into `output`. The meaning of the return value and
  // of *output_pos is defined by the implementations — NOTE(review): confirm
  // against the out-of-line definitions.
  virtual int Uncompress(const char* input, size_t input_size, char* output,
                         size_t* output_pos) = 0;

  // Factory returning a raw pointer; the caller presumably owns the result —
  // confirm against the definition.
  static StreamingUncompress* Create(CompressionType compression_type,
                                     uint32_t compress_format_version,
                                     size_t max_output_len);

  virtual void Reset() = 0;

 protected:
  CompressionType compression_type_;
  uint32_t compress_format_version_;
  size_t max_output_len_;
};
// StreamingCompress backed by a ZSTD compression context. In ZSTD builds the
// context is created with the frame checksum flag enabled.
class ZSTDStreamingCompress final : public StreamingCompress {
 public:
  explicit ZSTDStreamingCompress(const CompressionOptions& opts,
                                 uint32_t compress_format_version,
                                 size_t max_output_len)
      : StreamingCompress(kZSTD, opts, compress_format_version,
                          max_output_len) {
#ifdef ZSTD
    cctx_ = ZSTD_createCCtx();
    // Verify the allocation before the context is first used.
    assert(cctx_ != nullptr);
    ZSTD_CCtx_setParameter(cctx_, ZSTD_c_checksumFlag, 1);
    input_buffer_ = {nullptr, 0, 0};
#endif  // ZSTD
  }

  ~ZSTDStreamingCompress() override {
#ifdef ZSTD
    ZSTD_freeCCtx(cctx_);
#endif  // ZSTD
  }

  // The object owns a raw ZSTD_CCtx*; an implicit copy would free it twice.
  ZSTDStreamingCompress(const ZSTDStreamingCompress&) = delete;
  ZSTDStreamingCompress& operator=(const ZSTDStreamingCompress&) = delete;

  int Compress(const char* input, size_t input_size, char* output,
               size_t* output_pos) override;
  void Reset() override;

#ifdef ZSTD
  ZSTD_CCtx* cctx_ = nullptr;
  ZSTD_inBuffer input_buffer_;
#endif  // ZSTD
};
// StreamingUncompress backed by a ZSTD decompression context, which is
// created in the constructor and freed in the destructor (ZSTD builds only).
class ZSTDStreamingUncompress final : public StreamingUncompress {
 public:
  explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
                                   size_t max_output_len)
      : StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
#ifdef ZSTD
    dctx_ = ZSTD_createDCtx();
    assert(dctx_ != nullptr);
    input_buffer_ = {nullptr, 0, 0};
#endif  // ZSTD
  }

  ~ZSTDStreamingUncompress() override {
#ifdef ZSTD
    ZSTD_freeDCtx(dctx_);
#endif  // ZSTD
  }

  // The object owns a raw ZSTD_DCtx*; an implicit copy would free it twice.
  ZSTDStreamingUncompress(const ZSTDStreamingUncompress&) = delete;
  ZSTDStreamingUncompress& operator=(const ZSTDStreamingUncompress&) = delete;

  int Uncompress(const char* input, size_t input_size, char* output,
                 size_t* output_pos) override;
  void Reset() override;

 private:
#ifdef ZSTD
  ZSTD_DCtx* dctx_ = nullptr;
  ZSTD_inBuffer input_buffer_;
#endif  // ZSTD
};
}