#include "util/compression.h"

#include <algorithm>
#include <cstring>
#include <limits>

#ifdef BZIP2
#include <bzlib.h>
#endif
#ifdef LZ4
#include <lz4.h>
#include <lz4hc.h>
#if LZ4_VERSION_NUMBER < 10700
#error "LZ4 support requires version >= 1.7.0 (lz4-devel)"
#endif
#endif
#ifdef SNAPPY
#include <snappy-sinksource.h>
#include <snappy.h>
#endif
#ifdef ZLIB
#include <zlib.h>
#endif

#include "options/options_helper.h"
#include "port/likely.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
// Returns a display name for `compression_type`. Built-in types map to a fixed
// name; any other value is reported as "Custom" (if it lies in the custom
// range) or "Reserved", followed by its byte value as two base-16 digits.
std::string CompressionTypeToString(CompressionType compression_type) {
  const char* builtin_name = nullptr;
  switch (compression_type) {
    case kNoCompression:
      builtin_name = "NoCompression";
      break;
    case kSnappyCompression:
      builtin_name = "Snappy";
      break;
    case kZlibCompression:
      builtin_name = "Zlib";
      break;
    case kBZip2Compression:
      builtin_name = "BZip2";
      break;
    case kLZ4Compression:
      builtin_name = "LZ4";
      break;
    case kLZ4HCCompression:
      builtin_name = "LZ4HC";
      break;
    case kXpressCompression:
      builtin_name = "Xpress";
      break;
    case kZSTD:
      builtin_name = "ZSTD";
      break;
    case kDisableCompressionOption:
      builtin_name = "DisableOption";
      break;
    default:
      break;
  }
  if (builtin_name != nullptr) {
    return builtin_name;
  }

  // Not a built-in type: classify it and append the raw byte value.
  const bool in_custom_range = compression_type >= kFirstCustomCompression &&
                               compression_type <= kLastCustomCompression;
  const unsigned char raw_byte = lossless_cast<unsigned char>(compression_type);
  std::string text = in_custom_range ? "Custom" : "Reserved";
  text += ToBaseCharsString<16>(2, raw_byte, true);
  return text;
}
// Parses a built-in compression type name, as produced by
// CompressionTypeToString. Any unrecognized string, including the empty
// string, yields kDisableCompressionOption.
CompressionType CompressionTypeFromString(std::string compression_type_str) {
  struct NamedType {
    const char* name;
    CompressionType type;
  };
  static const NamedType kKnownTypes[] = {
      {"NoCompression", kNoCompression},
      {"Snappy", kSnappyCompression},
      {"ZSTD", kZSTD},
      {"Zlib", kZlibCompression},
      {"BZip2", kBZip2Compression},
      {"LZ4", kLZ4Compression},
      {"LZ4HC", kLZ4HCCompression},
      {"Xpress", kXpressCompression},
  };
  for (const NamedType& candidate : kKnownTypes) {
    if (compression_type_str == candidate.name) {
      return candidate.type;
    }
  }
  return kDisableCompressionOption;
}
// Renders the CompressionOptions fields as a sequence of "name=value; "
// entries, in a fixed order. Each value is formatted with std::to_string, so
// boolean fields appear as 0/1.
std::string CompressionOptionsToString(
    const CompressionOptions& compression_options) {
  std::string out;
  out.reserve(512);
  // Appends a single "name=value; " entry to `out`.
  auto add_entry = [&out](const char* name, auto value) {
    out += name;
    out += '=';
    out += std::to_string(value);
    out += "; ";
  };
  add_entry("window_bits", compression_options.window_bits);
  add_entry("level", compression_options.level);
  add_entry("strategy", compression_options.strategy);
  add_entry("max_dict_bytes", compression_options.max_dict_bytes);
  add_entry("zstd_max_train_bytes", compression_options.zstd_max_train_bytes);
  add_entry("enabled", compression_options.enabled);
  add_entry("max_dict_buffer_bytes",
            compression_options.max_dict_buffer_bytes);
  add_entry("use_zstd_dict_trainer",
            compression_options.use_zstd_dict_trainer);
  add_entry("max_compressed_bytes_per_kb",
            compression_options.max_compressed_bytes_per_kb);
  add_entry("checksum", compression_options.checksum);
  return out;
}
// Factory for streaming compressors. Only ZSTD has a streaming implementation,
// and only when ZSTD_Streaming_Supported() reports support; every other case
// returns nullptr. The caller owns the returned object.
StreamingCompress* StreamingCompress::Create(CompressionType compression_type,
                                             const CompressionOptions& opts,
                                             uint32_t compress_format_version,
                                             size_t max_output_len) {
  if (compression_type != kZSTD || !ZSTD_Streaming_Supported()) {
    return nullptr;
  }
  return new ZSTDStreamingCompress(opts, compress_format_version,
                                   max_output_len);
}
// Factory for streaming decompressors. Mirrors StreamingCompress::Create: only
// ZSTD (when ZSTD_Streaming_Supported()) yields an object, otherwise nullptr.
// The caller owns the returned object.
StreamingUncompress* StreamingUncompress::Create(
    CompressionType compression_type, uint32_t compress_format_version,
    size_t max_output_len) {
  const bool zstd_streaming =
      compression_type == kZSTD && ZSTD_Streaming_Supported();
  if (!zstd_streaming) {
    return nullptr;
  }
  return new ZSTDStreamingUncompress(compress_format_version, max_output_len);
}
// Compresses (part of) `input` into `output`, ending the frame with
// ZSTD_e_end. Passing the same `input` pointer again continues consuming a
// partially processed input; a different pointer starts a new input.
// Writes the number of bytes produced to *output_pos and returns the value
// reported by ZSTD_compressStream2 (0 once everything is flushed), 0 for an
// empty input, or -1 on error / when ZSTD is not compiled in.
int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
                                    char* output, size_t* output_pos) {
  assert(input != nullptr && output != nullptr && output_pos != nullptr);
  *output_pos = 0;
  if (input_size == 0) {
    return 0;
  }
#ifdef ZSTD
  const bool starts_new_input =
      input_buffer_.src == nullptr || input_buffer_.src != input;
  if (starts_new_input) {
    // The previous input must have been fully consumed before switching.
    assert(input_buffer_.pos == input_buffer_.size);
    input_buffer_ = {input, input_size, 0};
  }
  // Otherwise this is a follow-up call on the same, partially consumed input,
  // so the existing position in input_buffer_ is kept.
  ZSTD_outBuffer out_buf = {output, max_output_len_, 0};
  const size_t left_to_flush =
      ZSTD_compressStream2(cctx_, &out_buf, &input_buffer_, ZSTD_e_end);
  if (ZSTD_isError(left_to_flush)) {
    Reset();
    return -1;
  }
  *output_pos = out_buf.pos;
  return static_cast<int>(left_to_flush);
#else
  (void)input;
  (void)input_size;
  (void)output;
  return -1;
#endif
}
// Resets the ZSTD compression session and forgets any pending input.
// No-op when ZSTD is not compiled in.
void ZSTDStreamingCompress::Reset() {
#ifdef ZSTD
  ZSTD_CCtx_reset(cctx_, ZSTD_reset_session_only);
  input_buffer_.src = nullptr;
  input_buffer_.size = 0;
  input_buffer_.pos = 0;
#endif
}
// Decompresses into `output` (up to max_output_len_ bytes). A non-null
// `input` starts a new input buffer; a null `input` continues with the buffer
// from an earlier call. Writes bytes produced to *output_pos and returns the
// number of bytes of the current input not yet consumed, 0 for an empty
// input, or -1 on error / when ZSTD is not compiled in.
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
                                        char* output, size_t* output_pos) {
  assert(output != nullptr && output_pos != nullptr);
  *output_pos = 0;
  if (input_size == 0) {
    return 0;
  }
#ifndef ZSTD
  (void)input;
  (void)input_size;
  (void)output;
  return -1;
#else
  if (input != nullptr) {
    input_buffer_ = {input, input_size, 0};
  }
  ZSTD_outBuffer out_buf = {output, max_output_len_, 0};
  const size_t rc = ZSTD_decompressStream(dctx_, &out_buf, &input_buffer_);
  if (ZSTD_isError(rc)) {
    Reset();
    return -1;
  }
  *output_pos = out_buf.pos;
  const size_t unconsumed = input_buffer_.size - input_buffer_.pos;
  return static_cast<int>(unconsumed);
#endif
}
// Resets the ZSTD decompression session and forgets any pending input.
// No-op when ZSTD is not compiled in.
void ZSTDStreamingUncompress::Reset() {
#ifdef ZSTD
  ZSTD_DCtx_reset(dctx_, ZSTD_reset_session_only);
  input_buffer_.src = nullptr;
  input_buffer_.size = 0;
  input_buffer_.pos = 0;
#endif
}
// Builds the dictionary-specialized decompressor for `dict` by cloning
// `from_decompressor`. On an empty dictionary or a failed clone, owned
// dictionary storage is dropped and a FailureDecompressor carrying the error
// is installed instead. Finally recomputes memory_usage_.
void DecompressorDict::Populate(Decompressor& from_decompressor, Slice dict) {
  // Releases owned dictionary bytes and installs an always-failing decompressor.
  auto install_failure = [this](Status failure) {
    dict_str_ = {};
    dict_allocation_ = {};
    decompressor_ = std::make_unique<FailureDecompressor>(std::move(failure));
  };

  if (UNLIKELY(dict.empty())) {
    install_failure(Status::Corruption("Decompression dictionary is empty"));
  } else {
    Status s = from_decompressor.MaybeCloneForDict(dict, &decompressor_);
    if (decompressor_ != nullptr) {
      assert(s.ok());
      assert(decompressor_->GetSerializedDict() == dict);
    } else {
      assert(!s.ok());
      install_failure(std::move(s));
    }
  }

  // Memory accounting: this object, owned dictionary bytes (usable size when
  // an allocator is attached), and the decompressor's own estimate.
  memory_usage_ = sizeof(DecompressorDict);
  memory_usage_ += dict_str_.size();
  if (dict_allocation_) {
    const size_t raw_dict_size = GetRawDict().size();
    auto allocator = dict_allocation_.get_deleter().allocator;
    memory_usage_ +=
        allocator ? allocator->UsableSize(dict_allocation_.get(), raw_dict_size)
                  : raw_dict_size;
  }
  memory_usage_ += decompressor_->ApproximateOwnedMemoryUsage();
}
// Trains a ZSTD dictionary of at most `max_dict_bytes` bytes with
// ZDICT_trainFromBuffer.
//
// @param samples        all samples concatenated back to back.
// @param sample_lens    length of each sample within `samples`; must be empty
//                       exactly when `samples` is empty.
// @param max_dict_bytes capacity of the produced dictionary.
// @return the trained dictionary, or an empty string when there are no
//         samples or training fails. Without ZSTD support this asserts in
//         debug builds and returns an empty string.
std::string ZSTD_TrainDictionary(const std::string& samples,
                                 const std::vector<size_t>& sample_lens,
                                 size_t max_dict_bytes) {
#ifdef ZSTD
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  std::string dict_data(max_dict_bytes, '\0');
  const size_t dict_len = ZDICT_trainFromBuffer(
      dict_data.data(), max_dict_bytes, samples.data(), sample_lens.data(),
      static_cast<unsigned>(sample_lens.size()));
  if (ZDICT_isError(dict_len)) {
    return "";
  }
  assert(dict_len <= max_dict_bytes);
  dict_data.resize(dict_len);
  return dict_data;
#else
  assert(false);
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  return "";
#endif
}
// Convenience overload: treats `samples` as a run of equally sized samples of
// 2^sample_len_shift bytes each and trains a dictionary of at most
// `max_dict_bytes` bytes. Bytes past the last full sample are not covered by
// any sample length. Returns an empty string when there is not even one full
// sample or training fails. Without ZSTD support this asserts in debug builds
// and returns an empty string.
std::string ZSTD_TrainDictionary(const std::string& samples,
                                 size_t sample_len_shift,
                                 size_t max_dict_bytes) {
#ifdef ZSTD
  const size_t sample_len = size_t{1} << sample_len_shift;
  const size_t num_samples = samples.size() >> sample_len_shift;
  if (num_samples == 0) {
    // Non-empty `samples` with an empty length list would violate the
    // samples/sample_lens consistency check of the other overload.
    return "";
  }
  std::vector<size_t> sample_lens(num_samples, sample_len);
  return ZSTD_TrainDictionary(samples, sample_lens, max_dict_bytes);
#else
  assert(false);
  (void)samples;
  (void)sample_len_shift;
  (void)max_dict_bytes;
  return "";
#endif
}
// Builds a ZSTD dictionary of at most `max_dict_bytes` bytes with
// ZDICT_finalizeDictionary, using the first min(samples.size(),
// max_dict_bytes) bytes of `samples` as the dictionary content and all
// samples (delimited by `sample_lens`) as the statistics input.
//
// `level` is the target compression level; kDefaultCompressionLevel maps to
// ZSTD_CLEVEL_DEFAULT. Returns an empty string when there are no samples or
// finalization fails. Only available with ROCKSDB_ZDICT_FINALIZE; otherwise
// asserts in debug builds and returns an empty string.
std::string ZSTD_FinalizeDictionary(const std::string& samples,
                                    const std::vector<size_t>& sample_lens,
                                    size_t max_dict_bytes, int level) {
#ifdef ROCKSDB_ZDICT_FINALIZE
  assert(samples.empty() == sample_lens.empty());
  if (samples.empty()) {
    return "";
  }
  if (level == CompressionOptions::kDefaultCompressionLevel) {
    level = ZSTD_CLEVEL_DEFAULT;
  }
  const size_t content_size = std::min(samples.size(), max_dict_bytes);
  std::string dict_data(max_dict_bytes, '\0');
  const size_t dict_len = ZDICT_finalizeDictionary(
      dict_data.data(), max_dict_bytes, samples.data(), content_size,
      samples.data(), sample_lens.data(),
      static_cast<unsigned>(sample_lens.size()),
      {level, /*notificationLevel=*/0, /*dictID=*/0});
  if (ZDICT_isError(dict_len)) {
    return "";
  }
  assert(dict_len <= max_dict_bytes);
  dict_data.resize(dict_len);
  return dict_data;
#else
  assert(false);
  (void)samples;
  (void)sample_lens;
  (void)max_dict_bytes;
  (void)level;
  return "";
#endif
}
// Parses the varint64 uncompressed-size prefix from args.compressed_data into
// args.uncompressed_size. Fails with Corruption if the varint cannot be read,
// or MemoryLimit if the size cannot be represented as size_t.
Status Decompressor::ExtractUncompressedSize(Args& args) {
  if (UNLIKELY(!GetVarint64(&args.compressed_data, &args.uncompressed_size))) {
    return Status::Corruption("Unable to extract uncompressed size");
  }
  if (UNLIKELY(args.uncompressed_size > SIZE_MAX)) {
    return Status::MemoryLimit("Uncompressed size too large for platform");
  }
  return Status::OK();
}
// Default: this decompressor has no dictionary, so a reference to a shared,
// empty Slice is returned.
const Slice& Decompressor::GetSerializedDict() const {
  static const Slice kNoDictionary{};
  return kNoDictionary;
}
namespace {
// Common base for the built-in compressors: holds a copy of the options the
// compressor was created with.
class CompressorBase : public Compressor {
 public:
  explicit CompressorBase(const CompressionOptions& opts) : opts_{opts} {}

 protected:
  // Configuration captured at construction.
  CompressionOptions opts_;
};
// Base for compressors whose dictionary is just raw bytes: sampled data (or a
// predefined dictionary) is stored verbatim in dict_data_. Subclasses supply
// CloneForDict to build an instance of themselves with a given dictionary.
class CompressorWithSimpleDictBase : public CompressorBase {
 public:
  explicit CompressorWithSimpleDictBase(const CompressionOptions& opts,
                                        std::string&& dict_data = {})
      : CompressorBase(opts), dict_data_(std::move(dict_data)) {}

  // Dictionaries are disabled when max_dict_bytes is 0; otherwise request up
  // to max_dict_bytes of samples.
  DictConfig GetDictGuidance(CacheEntryRole /*block_type*/) const override {
    if (opts_.max_dict_bytes != 0) {
      return DictSampling{opts_.max_dict_bytes};
    }
    return DictDisabled{};
  }

  Slice GetSerializedDict() const override { return Slice(dict_data_); }

  std::unique_ptr<Compressor> Clone() const override {
    std::string dict_copy = dict_data_;
    return CloneForDict(std::move(dict_copy));
  }

  // Returns a clone using the sampled or predefined dictionary bytes as-is,
  // or nullptr when there is no (non-empty) dictionary to specialize for.
  std::unique_ptr<Compressor> MaybeCloneSpecialized(
      CacheEntryRole /*block_type*/,
      DictConfigArgs&& dict_config) const final override {
    auto* samples = std::get_if<DictSamples>(&dict_config);
    if (samples != nullptr) {
      assert(samples->Verify());
      if (samples->empty()) {
        return nullptr;
      }
      return CloneForDict(std::move(samples->sample_data));
    }
    auto* predef = std::get_if<DictPreDefined>(&dict_config);
    if (predef != nullptr) {
      if (predef->dict_data.empty()) {
        return nullptr;
      }
      return CloneForDict(std::move(predef->dict_data));
    }
    assert(std::holds_alternative<DictDisabled>(dict_config));
    return nullptr;
  }

  // Creates a new compressor of the concrete type that uses `dict_data`.
  virtual std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const = 0;

 protected:
  // Raw dictionary bytes; empty means no dictionary.
  const std::string dict_data_;
};
// Snappy compressor. Unlike the other V2 compressors in this file it writes
// no StartCompressBlockV2 size header; the output is exactly what
// snappy::Compress produces. dict_data_ is not consulted by CompressBlock.
//
// Result conventions: on success *out_compression_type = kSnappyCompression.
// Otherwise the type is kNoCompression and *compressed_output_size is 1 if
// compression ran but the output did not fit, or 0 if Snappy is not built in.
class BuiltinSnappyCompressorV2 final : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;

  const char* Name() const override { return "BuiltinSnappyCompressorV2"; }

  CompressionType GetPreferredCompressionType() const override {
    return kSnappyCompression;
  }

  std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const override {
    return std::make_unique<BuiltinSnappyCompressorV2>(opts_,
                                                       std::move(dict_data));
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea*) override {
#ifdef SNAPPY
    // Sink that writes into the caller's fixed-size buffer. On overflow it
    // sets pos_ past output_size_ so the caller can detect the failure.
    struct MySink : public snappy::Sink {
      MySink(char* output, size_t output_size)
          : output_(output), output_size_(output_size) {}
      char* output_;
      size_t output_size_;
      size_t pos_ = 0;

      void Append(const char* data, size_t n) override {
        if (pos_ + n <= output_size_) {
          char* dest = output_ + pos_;
          // If snappy wrote into the buffer returned by GetAppendBuffer, the
          // bytes are already in place. memcpy with overlapping (here,
          // identical) ranges is undefined behavior, so skip the copy.
          if (data != dest) {
            std::memcpy(dest, data, n);
          }
          pos_ += n;
        } else {
          pos_ = output_size_ + 1;
        }
      }

      // Let snappy write directly into the output when there is room;
      // otherwise fall back to its scratch buffer (and overflow in Append).
      char* GetAppendBuffer(size_t length, char* scratch) override {
        if (pos_ + length <= output_size_) {
          return output_ + pos_;
        }
        return scratch;
      }
    };
    MySink sink{compressed_output, *compressed_output_size};
    snappy::ByteArraySource source{uncompressed_data.data(),
                                   uncompressed_data.size()};
    size_t outlen = snappy::Compress(&source, &sink);
    if (outlen > 0 && sink.pos_ <= sink.output_size_) {
      assert(outlen == sink.pos_);
      *compressed_output_size = outlen;
      *out_compression_type = kSnappyCompression;
      return Status::OK();
    }
    // Compression ran but the output did not fit.
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    // Snappy not compiled in: compression not attempted.
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }

  std::shared_ptr<Decompressor> GetOptimizedDecompressor() const override;
};
// Writes the varint32 uncompressed-size header at the start of
// `compressed_output` and returns {where algorithm output should begin,
// bytes of room left after the header}. Returns {nullptr, 0} if the input
// size does not fit in 32 bits or the buffer is not larger than 5 bytes
// (the maximum varint32 length).
[[maybe_unused]]
std::pair<char*, size_t> StartCompressBlockV2(Slice uncompressed_data,
                                              char* compressed_output,
                                              size_t compressed_output_size) {
  const size_t input_size = uncompressed_data.size();
  const bool can_start = input_size <= std::numeric_limits<uint32_t>::max() &&
                         compressed_output_size > 5;
  if (!can_start) {
    return {nullptr, 0};
  }
  char* const body_start =
      EncodeVarint32(compressed_output, static_cast<uint32_t>(input_size));
  const size_t header_len = static_cast<size_t>(body_start - compressed_output);
  return {body_start, compressed_output_size - header_len};
}
// Zlib (deflate) compressor. Output layout: the varint32 size header from
// StartCompressBlockV2, then the deflate output produced with
// opts_.window_bits / opts_.strategy and, if present, dict_data_ as a preset
// dictionary.
//
// Result conventions: on success *out_compression_type = kZlibCompression.
// Otherwise the type is kNoCompression and *compressed_output_size is 1 if
// deflate ran but did not finish (e.g. output too small), or 0 if compression
// was not attempted (no room for the header, setup failure, or Zlib not built
// in).
class BuiltinZlibCompressorV2 final : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;

  const char* Name() const override { return "BuiltinZlibCompressorV2"; }

  CompressionType GetPreferredCompressionType() const override {
    return kZlibCompression;
  }

  std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const override {
    return std::make_unique<BuiltinZlibCompressorV2>(opts_,
                                                     std::move(dict_data));
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea*) override {
#ifdef ZLIB
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    static const int memLevel = 8;
    int level = opts_.level;
    if (level == CompressionOptions::kDefaultCompressionLevel) {
      level = Z_DEFAULT_COMPRESSION;
    }
    z_stream stream;
    memset(&stream, 0, sizeof(z_stream));
    int st = deflateInit2(&stream, level, Z_DEFLATED, opts_.window_bits,
                          memLevel, opts_.strategy);
    if (st != Z_OK) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    if (!dict_data_.empty()) {
      st = deflateSetDictionary(
          &stream, reinterpret_cast<const Bytef*>(dict_data_.data()),
          static_cast<unsigned int>(dict_data_.size()));
      if (st != Z_OK) {
        deflateEnd(&stream);
        *compressed_output_size = 0;
        *out_compression_type = kNoCompression;
        return Status::OK();
      }
    }
    // z_stream counts bytes in `unsigned int`. Clamp the output capacity so
    // that the byte count computed below matches what deflate was actually
    // allowed to write (a plain narrowing cast could wrap and make
    // `capacity - avail_out` overstate the output length).
    const auto out_capacity = static_cast<unsigned int>(std::min<size_t>(
        alg_max_output_size, std::numeric_limits<unsigned int>::max()));
    stream.next_in = reinterpret_cast<Bytef*>(
        const_cast<char*>(uncompressed_data.data()));
    // StartCompressBlockV2 guarantees the input size fits in 32 bits.
    stream.avail_in = static_cast<unsigned int>(uncompressed_data.size());
    stream.next_out = reinterpret_cast<Bytef*>(alg_output);
    stream.avail_out = out_capacity;
    st = deflate(&stream, Z_FINISH);
    const size_t outlen = out_capacity - stream.avail_out;
    deflateEnd(&stream);
    if (st == Z_STREAM_END) {
      *compressed_output_size = outlen + (alg_output - compressed_output);
      *out_compression_type = kZlibCompression;
      return Status::OK();
    }
    // deflate ran but did not reach the end of the stream.
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};
// BZip2 compressor. Output layout: the varint32 size header from
// StartCompressBlockV2, then the bzip2 stream (blockSize100k = 1,
// verbosity = 0, workFactor = 30). dict_data_ is not consulted.
//
// Result conventions: on success *out_compression_type = kBZip2Compression.
// Otherwise the type is kNoCompression and *compressed_output_size is 1 if
// compression ran but did not finish, or 0 if it was not attempted.
class BuiltinBZip2CompressorV2 final : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;

  const char* Name() const override { return "BuiltinBZip2CompressorV2"; }

  CompressionType GetPreferredCompressionType() const override {
    return kBZip2Compression;
  }

  std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const override {
    return std::make_unique<BuiltinBZip2CompressorV2>(opts_,
                                                      std::move(dict_data));
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea*) override {
#ifdef BZIP2
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    bz_stream stream;
    memset(&stream, 0, sizeof(bz_stream));
    int st = BZ2_bzCompressInit(&stream, /*blockSize100k=*/1,
                                /*verbosity=*/0, /*workFactor=*/30);
    if (st != BZ_OK) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    // bz_stream counts bytes in `unsigned int`; clamp so that the length
    // computed from avail_out below is exact.
    const auto out_capacity = static_cast<unsigned int>(std::min<size_t>(
        alg_max_output_size, std::numeric_limits<unsigned int>::max()));
    stream.next_in = const_cast<char*>(uncompressed_data.data());
    // StartCompressBlockV2 guarantees the input size fits in 32 bits.
    stream.avail_in = static_cast<unsigned int>(uncompressed_data.size());
    stream.next_out = alg_output;
    stream.avail_out = out_capacity;
    st = BZ2_bzCompress(&stream, BZ_FINISH);
    const size_t outlen = out_capacity - stream.avail_out;
    BZ2_bzCompressEnd(&stream);
    if (st == BZ_STREAM_END) {
      *compressed_output_size = outlen + (alg_output - compressed_output);
      *out_compression_type = kBZip2Compression;
      return Status::OK();
    }
    // Compression ran but did not reach the end of the stream.
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};
// LZ4 (fast) compressor that supports a dictionary. An LZ4_stream_t serves as
// the working area; dict_data_ (if any) is loaded into it before each block.
// Output layout: varint32 size header from StartCompressBlockV2, then the LZ4
// block.
//
// Result conventions: on success *out_compression_type = kLZ4Compression.
// Otherwise the type is kNoCompression and *compressed_output_size is 1 if
// LZ4 ran but produced nothing (e.g. output too small), or 0 if compression
// was not attempted.
class BuiltinLZ4CompressorV2WithDict : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;

  const char* Name() const override { return "BuiltinLZ4CompressorV2"; }

  CompressionType GetPreferredCompressionType() const override {
    return kLZ4Compression;
  }

  std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const override {
    return std::make_unique<BuiltinLZ4CompressorV2WithDict>(
        opts_, std::move(dict_data));
  }

  ManagedWorkingArea ObtainWorkingArea() override {
#ifdef LZ4
    LZ4_stream_t* const fresh_stream = LZ4_createStream();
    return ManagedWorkingArea(reinterpret_cast<WorkingArea*>(fresh_stream),
                              this);
#else
    return {};
#endif
  }

  void ReleaseWorkingArea(WorkingArea* wa) override {
#ifdef LZ4
    if (wa != nullptr) {
      LZ4_freeStream(reinterpret_cast<LZ4_stream_t*>(wa));
    }
#else
    (void)wa;
#endif
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override {
    // Marks the block as "compression not attempted".
    auto not_attempted = [compressed_output_size, out_compression_type]() {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    };
#ifdef LZ4
    auto [body, body_capacity] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (body_capacity == 0) {
      return not_attempted();
    }

    // Reuse the caller's working area only if this compressor created it;
    // otherwise allocate a stream just for this call.
    ManagedWorkingArea tmp_wa;
    LZ4_stream_t* stream = nullptr;
    const bool owns_caller_wa = wa != nullptr && wa->owner() == this;
    if (owns_caller_wa) {
      stream = reinterpret_cast<LZ4_stream_t*>(wa->get());
      // A reused stream carries state from earlier blocks; clear it.
#if LZ4_VERSION_NUMBER >= 10900
      LZ4_resetStream_fast(stream);
#else
      LZ4_resetStream(stream);
#endif
    } else {
      tmp_wa = ObtainWorkingArea();
      stream = reinterpret_cast<LZ4_stream_t*>(tmp_wa.get());
    }

    if (!dict_data_.empty()) {
      LZ4_loadDict(stream, dict_data_.data(),
                   static_cast<int>(dict_data_.size()));
    }

    // Negative levels select a higher LZ4 acceleration factor.
    const int acceleration = opts_.level < 0 ? -opts_.level : 1;
    const auto outlen = LZ4_compress_fast_continue(
        stream, uncompressed_data.data(), body,
        static_cast<int>(uncompressed_data.size()),
        static_cast<int>(body_capacity), acceleration);
    if (outlen <= 0) {
      *compressed_output_size = 1;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    const size_t total_size =
        static_cast<size_t>(outlen + (body - compressed_output));
    assert(total_size <= *compressed_output_size);
    *compressed_output_size = total_size;
    *out_compression_type = kLZ4Compression;
    return Status::OK();
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    return not_attempted();
#endif
  }
};
// LZ4 (fast) compressor for the no-dictionary case. It uses the one-shot
// LZ4_compress_fast, so it needs no working area. Dictionary clones, made
// through the inherited CloneForDict, are BuiltinLZ4CompressorV2WithDict.
// Output layout and result conventions match BuiltinLZ4CompressorV2WithDict.
class BuiltinLZ4CompressorV2NoDict final
    : public BuiltinLZ4CompressorV2WithDict {
 public:
  // explicit: a CompressionOptions should never silently convert into a
  // compressor.
  explicit BuiltinLZ4CompressorV2NoDict(const CompressionOptions& opts)
      : BuiltinLZ4CompressorV2WithDict(opts, {}) {}

  std::unique_ptr<Compressor> Clone() const override {
    return std::make_unique<BuiltinLZ4CompressorV2NoDict>(opts_);
  }

  // No working area is needed by LZ4_compress_fast.
  ManagedWorkingArea ObtainWorkingArea() override { return {}; }

  void ReleaseWorkingArea(WorkingArea* wa) override {
    (void)wa;
    assert(wa == nullptr);
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override {
#ifdef LZ4
    (void)wa;
    auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (alg_max_output_size == 0) {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    // Negative levels select a higher LZ4 acceleration factor.
    const int acceleration = opts_.level < 0 ? -opts_.level : 1;
    auto outlen =
        LZ4_compress_fast(uncompressed_data.data(), alg_output,
                          static_cast<int>(uncompressed_data.size()),
                          static_cast<int>(alg_max_output_size), acceleration);
    if (outlen > 0) {
      size_t output_size =
          static_cast<size_t>(outlen + (alg_output - compressed_output));
      assert(output_size <= *compressed_output_size);
      *compressed_output_size = output_size;
      *out_compression_type = kLZ4Compression;
      return Status::OK();
    }
    // LZ4 ran but produced no output (e.g. output buffer too small).
    *compressed_output_size = 1;
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    *compressed_output_size = 0;
#endif
    *out_compression_type = kNoCompression;
    return Status::OK();
  }
};
// LZ4HC (high compression) compressor with optional dictionary. An
// LZ4_streamHC_t serves as the working area and is reset with the configured
// level before every block. Output layout: varint32 size header from
// StartCompressBlockV2, then the LZ4 block.
//
// Result conventions: on success *out_compression_type = kLZ4HCCompression.
// Otherwise the type is kNoCompression and *compressed_output_size is 1 if
// LZ4HC ran but produced nothing, or 0 if compression was not attempted.
class BuiltinLZ4HCCompressorV2 final : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;

  const char* Name() const override { return "BuiltinLZ4HCCompressorV2"; }

  CompressionType GetPreferredCompressionType() const override {
    return kLZ4HCCompression;
  }

  std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const override {
    return std::make_unique<BuiltinLZ4HCCompressorV2>(opts_,
                                                      std::move(dict_data));
  }

  ManagedWorkingArea ObtainWorkingArea() override {
#ifdef LZ4
    LZ4_streamHC_t* const fresh_stream = LZ4_createStreamHC();
    return ManagedWorkingArea(reinterpret_cast<WorkingArea*>(fresh_stream),
                              this);
#else
    return {};
#endif
  }

  void ReleaseWorkingArea(WorkingArea* wa) override {
#ifdef LZ4
    if (wa != nullptr) {
      LZ4_freeStreamHC(reinterpret_cast<LZ4_streamHC_t*>(wa));
    }
#else
    (void)wa;
#endif
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea* wa) override {
    // Marks the block as "compression not attempted".
    auto not_attempted = [compressed_output_size, out_compression_type]() {
      *compressed_output_size = 0;
      *out_compression_type = kNoCompression;
      return Status::OK();
    };
#ifdef LZ4
    auto [body, body_capacity] = StartCompressBlockV2(
        uncompressed_data, compressed_output, *compressed_output_size);
    if (body_capacity == 0) {
      return not_attempted();
    }

    // kDefaultCompressionLevel is passed to LZ4HC as 0, leaving the choice of
    // level to the library.
    const int hc_level =
        opts_.level == CompressionOptions::kDefaultCompressionLevel
            ? 0
            : opts_.level;

    ManagedWorkingArea tmp_wa;
    LZ4_streamHC_t* stream = nullptr;
    if (wa == nullptr || wa->owner() != this) {
      tmp_wa = ObtainWorkingArea();
      stream = reinterpret_cast<LZ4_streamHC_t*>(tmp_wa.get());
    } else {
      stream = reinterpret_cast<LZ4_streamHC_t*>(wa->get());
    }

    // Reset (and apply the level) for both fresh and reused streams.
#if LZ4_VERSION_NUMBER >= 10900
    LZ4_resetStreamHC_fast(stream, hc_level);
#else
    LZ4_resetStreamHC(stream, hc_level);
#endif
    if (!dict_data_.empty()) {
      LZ4_loadDictHC(stream, dict_data_.data(),
                     static_cast<int>(dict_data_.size()));
    }

    const auto outlen = LZ4_compress_HC_continue(
        stream, uncompressed_data.data(), body,
        static_cast<int>(uncompressed_data.size()),
        static_cast<int>(body_capacity));
    if (outlen <= 0) {
      *compressed_output_size = 1;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    const size_t total_size =
        static_cast<size_t>(outlen + (body - compressed_output));
    assert(total_size <= *compressed_output_size);
    *compressed_output_size = total_size;
    *out_compression_type = kLZ4HCCompression;
    return Status::OK();
#else
    (void)uncompressed_data;
    (void)compressed_output;
    (void)wa;
    return not_attempted();
#endif
  }
};
// XPRESS compressor. Writes the output of
// port::xpress::CompressWithMaxSize directly, with no StartCompressBlockV2
// header; dict_data_ is not consulted.
//
// Result conventions: on success *out_compression_type = kXpressCompression.
// Otherwise the type is kNoCompression and *compressed_output_size is 1 if
// compression ran but returned 0 bytes, or 0 if XPRESS is not built in.
class BuiltinXpressCompressorV2 final : public CompressorWithSimpleDictBase {
 public:
  using CompressorWithSimpleDictBase::CompressorWithSimpleDictBase;

  const char* Name() const override { return "BuiltinXpressCompressorV2"; }

  CompressionType GetPreferredCompressionType() const override {
    return kXpressCompression;
  }

  std::unique_ptr<Compressor> CloneForDict(
      std::string&& dict_data) const override {
    return std::make_unique<BuiltinXpressCompressorV2>(opts_,
                                                       std::move(dict_data));
  }

  Status CompressBlock(Slice uncompressed_data, char* compressed_output,
                       size_t* compressed_output_size,
                       CompressionType* out_compression_type,
                       ManagedWorkingArea*) override {
#ifdef XPRESS
    const size_t produced = port::xpress::CompressWithMaxSize(
        uncompressed_data.data(), uncompressed_data.size(), compressed_output,
        *compressed_output_size);
    if (produced == 0) {
      *compressed_output_size = 1;
      *out_compression_type = kNoCompression;
      return Status::OK();
    }
    *compressed_output_size = produced;
    *out_compression_type = kXpressCompression;
    return Status::OK();
#else
    (void)uncompressed_data;
    (void)compressed_output;
    *compressed_output_size = 0;
    *out_compression_type = kNoCompression;
    return Status::OK();
#endif
  }
};
class BuiltinZSTDCompressorV2 final : public CompressorBase {
public:
explicit BuiltinZSTDCompressorV2(const CompressionOptions& opts,
CompressionDict&& dict = {})
: CompressorBase(opts), dict_(std::move(dict)) {}
const char* Name() const override { return "BuiltinZSTDCompressorV2"; }
CompressionType GetPreferredCompressionType() const override { return kZSTD; }
std::unique_ptr<Compressor> Clone() const override {
CompressionDict dict_copy{dict_.GetRawDict().ToString(), kZSTD,
opts_.level};
return std::make_unique<BuiltinZSTDCompressorV2>(opts_,
std::move(dict_copy));
}
DictConfig GetDictGuidance(CacheEntryRole ) const override {
if (opts_.max_dict_bytes == 0) {
return DictDisabled{};
} else {
size_t max_sample_bytes = opts_.zstd_max_train_bytes > 0
? opts_.zstd_max_train_bytes
: opts_.max_dict_bytes;
return DictSampling{max_sample_bytes};
}
}
Slice GetSerializedDict() const override { return dict_.GetRawDict(); }
ManagedWorkingArea ObtainWorkingArea() override {
#ifdef ZSTD
ZSTD_CCtx* ctx =
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else
ZSTD_createCCtx();
#endif auto level = opts_.level;
if (level == CompressionOptions::kDefaultCompressionLevel) {
level = ZSTD_CLEVEL_DEFAULT;
}
size_t err = ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, level);
if (ZSTD_isError(err)) {
assert(false);
ZSTD_freeCCtx(ctx);
ctx = ZSTD_createCCtx();
}
if (opts_.checksum) {
err = ZSTD_CCtx_setParameter(ctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(err)) {
assert(false);
ZSTD_freeCCtx(ctx);
ctx = ZSTD_createCCtx();
}
}
return ManagedWorkingArea(reinterpret_cast<WorkingArea*>(ctx), this);
#else
return {};
#endif }
void ReleaseWorkingArea(WorkingArea* wa) override {
if (wa) {
#ifdef ZSTD
ZSTD_freeCCtx(reinterpret_cast<ZSTD_CCtx*>(wa));
#endif }
}
Status CompressBlock(Slice uncompressed_data, char* compressed_output,
size_t* compressed_output_size,
CompressionType* out_compression_type,
ManagedWorkingArea* wa) override {
#ifdef ZSTD
auto [alg_output, alg_max_output_size] = StartCompressBlockV2(
uncompressed_data, compressed_output, *compressed_output_size);
if (alg_max_output_size == 0) {
*compressed_output_size = 0;
*out_compression_type = kNoCompression;
return Status::OK();
}
ManagedWorkingArea tmp_wa;
if (wa == nullptr || wa->owner() != this) {
tmp_wa = ObtainWorkingArea();
wa = &tmp_wa;
}
assert(wa->get() != nullptr);
ZSTD_CCtx* ctx = reinterpret_cast<ZSTD_CCtx*>(wa->get());
if (dict_.GetDigestedZstdCDict() != nullptr) {
ZSTD_CCtx_refCDict(ctx, dict_.GetDigestedZstdCDict());
} else {
ZSTD_CCtx_loadDictionary(ctx, dict_.GetRawDict().data(),
dict_.GetRawDict().size());
}
size_t outlen =
ZSTD_compress2(ctx, alg_output, alg_max_output_size,
uncompressed_data.data(), uncompressed_data.size());
if (!ZSTD_isError(outlen)) {
size_t output_size = static_cast<size_t>(
outlen + (alg_output - compressed_output));
assert(output_size <= *compressed_output_size);
*compressed_output_size = output_size;
*out_compression_type = kZSTD;
return Status::OK();
}
if (ZSTD_getErrorCode(outlen) != ZSTD_error_dstSize_tooSmall) {
return Status::Corruption(std::string("ZSTD_compress2 failed: ") +
ZSTD_getErrorName(outlen));
}
*compressed_output_size = 1;
#else
(void)uncompressed_data;
(void)compressed_output;
(void)wa;
*compressed_output_size = 0;
#endif
*out_compression_type = kNoCompression;
return Status::OK();
}
std::unique_ptr<Compressor> MaybeCloneSpecialized(
CacheEntryRole ,
DictConfigArgs&& dict_config) const override {
if (auto* disabled = std::get_if<DictDisabled>(&dict_config)) {
(void)disabled;
return nullptr;
}
std::string dict_data;
if (auto* predef = std::get_if<DictPreDefined>(&dict_config)) {
if (predef->dict_data.empty()) {
return nullptr;
}
dict_data = std::move(predef->dict_data);
}
if (auto* samples = std::get_if<DictSamples>(&dict_config)) {
assert(samples->Verify());
if (samples->empty()) {
return nullptr;
}
if (opts_.zstd_max_train_bytes > 0) {
assert(samples->sample_data.size() <= opts_.zstd_max_train_bytes);
if (opts_.use_zstd_dict_trainer) {
dict_data = ZSTD_TrainDictionary(
samples->sample_data, samples->sample_lens, opts_.max_dict_bytes);
} else {
dict_data = ZSTD_FinalizeDictionary(
samples->sample_data, samples->sample_lens, opts_.max_dict_bytes,
opts_.level);
}
} else {
assert(samples->sample_data.size() <= opts_.max_dict_bytes);
dict_data = std::move(samples->sample_data);
}
}
CompressionDict dict{std::move(dict_data), kZSTD, opts_.level};
return std::make_unique<BuiltinZSTDCompressorV2>(opts_, std::move(dict));
}
std::shared_ptr<Decompressor> GetOptimizedDecompressor() const override;
protected:
const CompressionDict dict_;
};
// Decompresses a Snappy block from args.compressed_data into
// uncompressed_output with snappy::RawUncompress. Returns Corruption if Snappy
// rejects the data, or NotSupported when Snappy is not built in.
Status Snappy_DecompressBlock(const Decompressor::Args& args,
                              char* uncompressed_output) {
#ifndef SNAPPY
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("Snappy not supported in this build");
#else
  const auto& input = args.compressed_data;
  const bool decoded =
      snappy::RawUncompress(input.data(), input.size(), uncompressed_output);
  if (!decoded) {
    return Status::Corruption("Error decompressing snappy data");
  }
  return Status::OK();
#endif
}
// Inflates a raw-deflate block (window bits -14) from args.compressed_data
// into uncompressed_output, which must hold args.uncompressed_size bytes. If
// `dict` is non-empty it is installed as the preset dictionary.
//
// Returns OK only if inflate reaches the end of the stream having produced
// exactly args.uncompressed_size bytes; Corruption otherwise. Returns
// NotSupported when Zlib is not built in.
Status Zlib_DecompressBlock(const Decompressor::Args& args, Slice dict,
                            char* uncompressed_output) {
#ifdef ZLIB
  // z_stream counts bytes in `unsigned int`. A size that does not fit would be
  // silently truncated, so a corrupt size prefix could appear to decode
  // successfully while leaving the rest of the output buffer unwritten.
  constexpr uint64_t kMaxZlibSize = std::numeric_limits<unsigned int>::max();
  if (UNLIKELY(args.uncompressed_size > kMaxZlibSize ||
               args.compressed_data.size() > kMaxZlibSize)) {
    return Status::Corruption("Size too large for zlib decompression");
  }

  // Negative window bits: raw deflate data (no zlib header/trailer).
  constexpr int kWindowBits = -14;
  z_stream _stream;
  memset(&_stream, 0, sizeof(z_stream));
  int st = inflateInit2(&_stream, kWindowBits);
  if (UNLIKELY(st != Z_OK)) {
    return Status::Corruption("Failed to initialize zlib inflate: " +
                              std::to_string(st));
  }
  if (!dict.empty()) {
    st = inflateSetDictionary(&_stream,
                              reinterpret_cast<const Bytef*>(dict.data()),
                              static_cast<unsigned int>(dict.size()));
    if (UNLIKELY(st != Z_OK)) {
      // Release the state allocated by inflateInit2 before bailing out.
      inflateEnd(&_stream);
      return Status::Corruption("Failed to initialize zlib dictionary: " +
                                std::to_string(st));
    }
  }
  _stream.next_in = const_cast<Bytef*>(
      reinterpret_cast<const Bytef*>(args.compressed_data.data()));
  _stream.avail_in = static_cast<unsigned int>(args.compressed_data.size());
  _stream.next_out = reinterpret_cast<Bytef*>(uncompressed_output);
  _stream.avail_out = static_cast<unsigned int>(args.uncompressed_size);
  st = inflate(&_stream, Z_SYNC_FLUSH);
  if (UNLIKELY(st != Z_STREAM_END)) {
    inflateEnd(&_stream);
    return Status::Corruption("Failed zlib inflate: " + std::to_string(st));
  }
  if (_stream.avail_out != 0) {
    inflateEnd(&_stream);
    return Status::Corruption("Size mismatch decompressing zlib data");
  }
  inflateEnd(&_stream);
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)uncompressed_output;
  return Status::NotSupported("Zlib not supported in this build");
#endif
}
// Decompresses a bzip2 block into uncompressed_output (capacity
// args.uncompressed_size) with BZ2_bzBuffToBuffDecompress. Returns Corruption
// on a bzip2 error or if the produced length differs from
// args.uncompressed_size; NotSupported when BZip2 is not built in.
Status BZip2_DecompressBlock(const Decompressor::Args& args,
                             char* uncompressed_output) {
#ifndef BZIP2
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("BZip2 not supported in this build");
#else
  // In: output capacity. Out: bytes actually produced. A narrowing cast here
  // is caught by the full-width comparison below.
  unsigned int produced = static_cast<unsigned int>(args.uncompressed_size);
  const int rc = BZ2_bzBuffToBuffDecompress(
      uncompressed_output, &produced,
      const_cast<char*>(args.compressed_data.data()),
      static_cast<unsigned int>(args.compressed_data.size()),
      /*small=*/0, /*verbosity=*/0);
  if (rc != BZ_OK) {
    return Status::Corruption("Error decompressing bzip2 data");
  }
  if (produced != args.uncompressed_size) {
    return Status::Corruption("Size mismatch decompressing bzip2 data");
  }
  return Status::OK();
#endif
}
// Decompresses an LZ4 block into uncompressed_output (capacity
// args.uncompressed_size), optionally using `dict` as the preceding
// dictionary. Returns OK only if exactly args.uncompressed_size bytes were
// produced; Corruption on decode errors, size mismatches or sizes LZ4 cannot
// represent; MemoryLimit if the decode stream cannot be allocated;
// NotSupported when LZ4 is not built in.
Status LZ4_DecompressBlock(const Decompressor::Args& args, Slice dict,
                           char* uncompressed_output) {
#ifdef LZ4
  // LZ4's block API measures sizes in `int`. A size that does not fit would be
  // truncated, and a truncated expected size could match a short decode and
  // report success while the rest of the output stays unwritten.
  constexpr uint64_t kMaxLz4Size =
      static_cast<uint64_t>(std::numeric_limits<int>::max());
  if (UNLIKELY(args.uncompressed_size > kMaxLz4Size ||
               args.compressed_data.size() > kMaxLz4Size ||
               dict.size() > kMaxLz4Size)) {
    return Status::Corruption("Size too large for LZ4 decompression");
  }
  const int expected_uncompressed_size =
      static_cast<int>(args.uncompressed_size);
  LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
  if (UNLIKELY(stream == nullptr)) {
    return Status::MemoryLimit("Unable to allocate LZ4 decode stream");
  }
  if (!dict.empty()) {
    LZ4_setStreamDecode(stream, dict.data(), static_cast<int>(dict.size()));
  }
  const int uncompressed_size = LZ4_decompress_safe_continue(
      stream, args.compressed_data.data(), uncompressed_output,
      static_cast<int>(args.compressed_data.size()),
      expected_uncompressed_size);
  LZ4_freeStreamDecode(stream);
  if (uncompressed_size != expected_uncompressed_size) {
    if (uncompressed_size < 0) {
      return Status::Corruption("Error decompressing LZ4 data");
    } else {
      return Status::Corruption("Size mismatch decompressing LZ4 data");
    }
  }
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)uncompressed_output;
  return Status::NotSupported("LZ4 not supported in this build");
#endif
}
// Decompresses an XPRESS block into `uncompressed_output` (capacity
// args.uncompressed_size). Succeeds only when the produced length equals
// args.uncompressed_size. A negative result from the port layer is reported
// as a decoding error; any other mismatch is a size mismatch.
Status XPRESS_DecompressBlock(const Decompressor::Args& args,
                              char* uncompressed_output) {
#ifdef XPRESS
  const int64_t produced = port::xpress::DecompressToBuffer(
      args.compressed_data.data(), args.compressed_data.size(),
      uncompressed_output, args.uncompressed_size);
  if (produced == static_cast<int64_t>(args.uncompressed_size)) {
    return Status::OK();
  }
  if (produced < 0) {
    return Status::Corruption("Error decompressing XPRESS data");
  }
  return Status::Corruption("Size mismatch decompressing XPRESS data");
#else
  (void)args;
  (void)uncompressed_output;
  return Status::NotSupported("XPRESS not supported in this build");
#endif  // XPRESS
}
// Decompresses a ZSTD block using the caller-supplied native decompression
// context `zstd_context` (must be non-null).
//
// When kIsDigestedDict is true, `dict` is an opaque pointer to a ZSTD_DDict;
// that instantiation is only valid when ROCKSDB_ZSTD_DDICT is defined.
// Otherwise `dict` holds raw dictionary bytes and may be empty.
//
// Returns Corruption when ZSTD reports an error or when the decoded length
// differs from args.uncompressed_size; NotSupported when built without ZSTD.
template <bool kIsDigestedDict = false>
Status ZSTD_DecompressBlockWithContext(
    const Decompressor::Args& args,
    std::conditional_t<kIsDigestedDict, void*, Slice> dict,
    ZSTDUncompressCachedData::ZSTDNativeContext zstd_context,
    char* uncompressed_output) {
#ifdef ZSTD
  assert(zstd_context != nullptr);
  size_t uncompressed_size = 0;
  if constexpr (kIsDigestedDict) {
#ifdef ROCKSDB_ZSTD_DDICT
    uncompressed_size = ZSTD_decompress_usingDDict(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size(),
        static_cast<ZSTD_DDict*>(dict));
#else
    // The digested-dict instantiation must only be requested when DDict
    // support is compiled in.
    static_assert(!kIsDigestedDict,
                  "Inconsistent expectation of ZSTD digested dict support");
#endif  // ROCKSDB_ZSTD_DDICT
  } else if (dict.empty()) {
    uncompressed_size = ZSTD_decompressDCtx(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size());
  } else {
    uncompressed_size = ZSTD_decompress_usingDict(
        zstd_context, uncompressed_output, args.uncompressed_size,
        args.compressed_data.data(), args.compressed_data.size(), dict.data(),
        dict.size());
  }
  if (ZSTD_isError(uncompressed_size)) {
    return Status::Corruption(std::string("ZSTD ") +
                              ZSTD_getErrorName(uncompressed_size));
  }
  if (uncompressed_size != args.uncompressed_size) {
    return Status::Corruption("ZSTD decompression size mismatch");
  }
  return Status::OK();
#else
  (void)args;
  (void)dict;
  (void)zstd_context;
  (void)uncompressed_output;
  return Status::NotSupported("ZSTD not supported in this build");
#endif  // ZSTD
}
// Decompresses a ZSTD block. The ZSTD context cached in args.working_area is
// reused when that working area is owned by `decompressor` and holds a
// context. Otherwise a temporary UncompressionContext is created just for
// this call.
template <bool kIsDigestedDict = false>
Status ZSTD_DecompressBlock(
    const Decompressor::Args& args,
    std::conditional_t<kIsDigestedDict, void*, Slice> dict,
    const Decompressor* decompressor, char* uncompressed_output) {
  const bool area_is_ours =
      args.working_area && args.working_area->owner() == decompressor;
  if (area_is_ours) {
    auto* cached =
        static_cast<UncompressionContext*>(args.working_area->get());
    assert(cached != nullptr);
    if (auto native_ctx = cached->GetZSTDContext(); native_ctx != nullptr) {
      return ZSTD_DecompressBlockWithContext<kIsDigestedDict>(
          args, dict, native_ctx, uncompressed_output);
    }
  }
  // No reusable context available: fall back to a one-off context.
  UncompressionContext scratch{kZSTD};
  return ZSTD_DecompressBlockWithContext<kIsDigestedDict>(
      args, dict, scratch.GetZSTDContext(), uncompressed_output);
}
// General-purpose decompressor for every built-in compression type, with no
// dictionary. Dictionary-aware variants are produced via MaybeCloneForDict().
class BuiltinDecompressorV2 : public Decompressor {
 public:
  const char* Name() const override { return "BuiltinDecompressorV2"; }

  // Snappy and XPRESS can report the uncompressed size from the compressed
  // data itself. Every other type defers to the base-class implementation.
  Status ExtractUncompressedSize(Args& args) override {
    assert(args.compression_type != kNoCompression);
    switch (args.compression_type) {
      case kSnappyCompression: {
#ifdef SNAPPY
        size_t snappy_len = 0;
        const bool have_len = snappy::GetUncompressedLength(
            args.compressed_data.data(), args.compressed_data.size(),
            &snappy_len);
        if (!have_len) {
          return Status::Corruption("Error reading snappy compressed length");
        }
        args.uncompressed_size = snappy_len;
        return Status::OK();
#else
        return Status::NotSupported("Snappy not supported in this build");
#endif  // SNAPPY
      }
      case kXpressCompression: {
#ifdef XPRESS
        const int64_t decoded_len = port::xpress::GetDecompressedSize(
            args.compressed_data.data(), args.compressed_data.size());
        if (decoded_len < 0) {
          return Status::Corruption("Error reading XPRESS compressed length");
        }
        args.uncompressed_size = static_cast<size_t>(decoded_len);
        return Status::OK();
#else
        return Status::NotSupported("XPRESS not supported in this build");
#endif  // XPRESS
      }
      default:
        return Decompressor::ExtractUncompressedSize(args);
    }
  }

  // Dispatches to the per-codec helper, always with an empty dictionary.
  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    switch (args.compression_type) {
      case kSnappyCompression:
        return Snappy_DecompressBlock(args, uncompressed_output);
      case kZlibCompression:
        return Zlib_DecompressBlock(args, /*dict=*/Slice(),
                                    uncompressed_output);
      case kBZip2Compression:
        return BZip2_DecompressBlock(args, uncompressed_output);
      case kLZ4Compression:
      case kLZ4HCCompression:
        return LZ4_DecompressBlock(args, /*dict=*/Slice(),
                                   uncompressed_output);
      case kXpressCompression:
        return XPRESS_DecompressBlock(args, uncompressed_output);
      case kZSTD:
        return ZSTD_DecompressBlock(args, /*dict=*/Slice(), this,
                                    uncompressed_output);
      default:
        return Status::NotSupported(
            "Compression type not supported or not built-in: " +
            CompressionTypeToString(args.compression_type));
    }
  }

  Status MaybeCloneForDict(const Slice&,
                           std::unique_ptr<Decompressor>*) override;

  size_t ApproximateOwnedMemoryUsage() const override {
    return sizeof(BuiltinDecompressorV2);
  }
};
// Specialization of BuiltinDecompressorV2 for callers that only ever hand it
// kSnappyCompression data. This is asserted, not checked.
class BuiltinDecompressorV2SnappyOnly final : public BuiltinDecompressorV2 {
 public:
  const char* Name() const override {
    return "BuiltinDecompressorV2SnappyOnly";
  }

  // Reads the uncompressed length from the Snappy data into
  // args.uncompressed_size.
  Status ExtractUncompressedSize(Args& args) override {
    assert(args.compression_type == kSnappyCompression);
#ifdef SNAPPY
    size_t uncompressed_length = 0;
    if (!snappy::GetUncompressedLength(args.compressed_data.data(),
                                       args.compressed_data.size(),
                                       &uncompressed_length)) {
      return Status::Corruption("Error reading snappy compressed length");
    }
    args.uncompressed_size = uncompressed_length;
    return Status::OK();
#else
    // With NDEBUG (assert compiled out) nothing else reads `args` here;
    // avoid an unused-parameter warning, as the other helpers in this file do.
    (void)args;
    return Status::NotSupported("Snappy not supported in this build");
#endif  // SNAPPY
  }

  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    assert(args.compression_type == kSnappyCompression);
    return Snappy_DecompressBlock(args, uncompressed_output);
  }
};
// Built-in decompressor bound to a serialized dictionary. The dictionary is
// applied to Zlib, LZ4/LZ4HC and ZSTD. `dict_` is a Slice, so the bytes it
// refers to must outlive this object.
class BuiltinDecompressorV2WithDict final : public BuiltinDecompressorV2 {
 public:
  explicit BuiltinDecompressorV2WithDict(const Slice& dict) : dict_(dict) {}

  const char* Name() const override { return "BuiltinDecompressorV2WithDict"; }

  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    switch (args.compression_type) {
      case kZlibCompression:
        return Zlib_DecompressBlock(args, dict_, uncompressed_output);
      case kLZ4Compression:
      case kLZ4HCCompression:
        return LZ4_DecompressBlock(args, dict_, uncompressed_output);
      case kZSTD:
        return ZSTD_DecompressBlock(args, dict_, this, uncompressed_output);
      default:
        // Snappy, BZip2 and XPRESS take no dictionary here. The base class
        // handles them, and any unsupported type, identically.
        return BuiltinDecompressorV2::DecompressBlock(args,
                                                      uncompressed_output);
    }
  }

  const Slice& GetSerializedDict() const override { return dict_; }

  size_t ApproximateOwnedMemoryUsage() const override {
    return sizeof(BuiltinDecompressorV2WithDict);
  }

 protected:
  const Slice dict_;
};
// Produces a dictionary-aware sibling of this decompressor. `dict` must be
// non-empty and must outlive the clone, which only keeps a Slice of it.
Status BuiltinDecompressorV2::MaybeCloneForDict(
    const Slice& dict, std::unique_ptr<Decompressor>* out) {
  assert(!dict.empty());
  auto clone = std::make_unique<BuiltinDecompressorV2WithDict>(dict);
  *out = std::move(clone);
  return Status::OK();
}
// Built-in decompressor tuned for ZSTD. For kZSTD it hands out working areas
// holding an UncompressionContext, so ZSTD_DecompressBlock can reuse the
// native context across calls. Other types fall back to the general path.
class BuiltinDecompressorV2OptimizeZstd : public BuiltinDecompressorV2 {
 public:
  const char* Name() const override {
    return "BuiltinDecompressorV2OptimizeZstd";
  }

  ManagedWorkingArea ObtainWorkingArea(CompressionType preferred) override {
    if (preferred != kZSTD) {
      // No reusable state for non-ZSTD types.
      return {};
    }
    return ManagedWorkingArea(new UncompressionContext(kZSTD), this);
  }

  // Counterpart of ObtainWorkingArea: frees the UncompressionContext
  // allocated there.
  void ReleaseWorkingArea(WorkingArea* wa) override {
    delete static_cast<UncompressionContext*>(wa);
  }

  Status DecompressBlock(const Args& args, char* uncompressed_output) override {
    if (UNLIKELY(args.compression_type != kZSTD)) {
      return BuiltinDecompressorV2::DecompressBlock(args, uncompressed_output);
    }
    return ZSTD_DecompressBlock(args, /*dict=*/Slice(), this,
                                uncompressed_output);
  }

  Status MaybeCloneForDict(const Slice& /*serialized_dict*/,
                           std::unique_ptr<Decompressor>* /*out*/) override;
};
class BuiltinDecompressorV2OptimizeZstdWithDict final
: public BuiltinDecompressorV2OptimizeZstd {
public:
explicit BuiltinDecompressorV2OptimizeZstdWithDict(const Slice& dict)
:
#ifdef ROCKSDB_ZSTD_DDICT
dict_(dict),
ddict_(ZSTD_createDDict_byReference(dict.data(), dict.size())) {
assert(ddict_ != nullptr);
}
#else
dict_(dict) {
}
#endif
const char* Name() const override {
return "BuiltinDecompressorV2OptimizeZstdWithDict";
}
~BuiltinDecompressorV2OptimizeZstdWithDict() override {
#ifdef ROCKSDB_ZSTD_DDICT
size_t res = ZSTD_freeDDict(ddict_);
assert(res == 0); (void)res; #endif }
const Slice& GetSerializedDict() const override { return dict_; }
size_t ApproximateOwnedMemoryUsage() const override {
size_t sz = sizeof(BuiltinDecompressorV2WithDict);
#ifdef ROCKSDB_ZSTD_DDICT
sz += ZSTD_sizeof_DDict(ddict_);
#endif return sz;
}
Status DecompressBlock(const Args& args, char* uncompressed_output) override {
if (LIKELY(args.compression_type == kZSTD)) {
#ifdef ROCKSDB_ZSTD_DDICT
return ZSTD_DecompressBlock<true>(
args, ddict_, this, uncompressed_output);
#else
return ZSTD_DecompressBlock(args, dict_, this, uncompressed_output);
#endif } else {
return BuiltinDecompressorV2WithDict(dict_).DecompressBlock(
args, uncompressed_output);
}
}
protected:
const Slice dict_;
#ifdef ROCKSDB_ZSTD_DDICT
ZSTD_DDict* const ddict_;
#endif };
// Produces a ZSTD-optimized, dictionary-aware sibling of this decompressor.
// The clone keeps only a Slice of `serialized_dict`, so the bytes must
// outlive it.
Status BuiltinDecompressorV2OptimizeZstd::MaybeCloneForDict(
    const Slice& serialized_dict, std::unique_ptr<Decompressor>* out) {
  auto clone =
      std::make_unique<BuiltinDecompressorV2OptimizeZstdWithDict>(
          serialized_dict);
  *out = std::move(clone);
  return Status::OK();
}
class BuiltinCompressionManagerV2 final : public CompressionManager {
public:
BuiltinCompressionManagerV2() = default;
~BuiltinCompressionManagerV2() override = default;
const char* Name() const override { return "BuiltinCompressionManagerV2"; }
const char* CompatibilityName() const override { return "BuiltinV2"; }
std::unique_ptr<Compressor> GetCompressor(const CompressionOptions& opts,
CompressionType type) override {
if (opts.max_compressed_bytes_per_kb <= 0) {
return nullptr;
}
if (!SupportsCompressionType(type)) {
type = ColumnFamilyOptions{}.compression;
}
switch (type) {
case kNoCompression:
default:
assert(type == kNoCompression); return nullptr;
case kSnappyCompression:
return std::make_unique<BuiltinSnappyCompressorV2>(opts);
case kZlibCompression:
return std::make_unique<BuiltinZlibCompressorV2>(opts);
case kBZip2Compression:
return std::make_unique<BuiltinBZip2CompressorV2>(opts);
case kLZ4Compression:
return std::make_unique<BuiltinLZ4CompressorV2NoDict>(opts);
case kLZ4HCCompression:
return std::make_unique<BuiltinLZ4HCCompressorV2>(opts);
case kXpressCompression:
return std::make_unique<BuiltinXpressCompressorV2>(opts);
case kZSTD:
return std::make_unique<BuiltinZSTDCompressorV2>(opts);
}
}
std::shared_ptr<Decompressor> GetDecompressor() override {
return GetGeneralDecompressor();
}
std::shared_ptr<Decompressor> GetDecompressorOptimizeFor(
CompressionType optimize_for_type) override {
if (optimize_for_type == kZSTD) {
return GetZstdDecompressor();
} else {
return GetGeneralDecompressor();
}
}
std::shared_ptr<Decompressor> GetDecompressorForTypes(
const CompressionType* types_begin,
const CompressionType* types_end) override {
if (types_begin == types_end) {
return nullptr;
} else if (types_begin + 1 == types_end &&
*types_begin == kSnappyCompression) {
return GetSnappyDecompressor();
} else if (std::find(types_begin, types_end, kZSTD) != types_end) {
return GetZstdDecompressor();
} else {
return GetGeneralDecompressor();
}
}
bool SupportsCompressionType(CompressionType type) const override {
return CompressionTypeSupported(type);
}
protected:
BuiltinDecompressorV2 decompressor_;
BuiltinDecompressorV2OptimizeZstd zstd_decompressor_;
BuiltinDecompressorV2SnappyOnly snappy_decompressor_;
public:
inline std::shared_ptr<Decompressor> GetGeneralDecompressor() {
return std::shared_ptr<Decompressor>(shared_from_this(), &decompressor_);
}
inline std::shared_ptr<Decompressor> GetZstdDecompressor() {
return std::shared_ptr<Decompressor>(shared_from_this(),
&zstd_decompressor_);
}
inline std::shared_ptr<Decompressor> GetSnappyDecompressor() {
return std::shared_ptr<Decompressor>(shared_from_this(),
&snappy_decompressor_);
}
};
// Shared instance of the built-in V2 manager. It is used by
// GetBuiltinV2CompressionManager(), CreateFromString()'s factory registration
// and the compressors' GetOptimizedDecompressor().
// NOTE(review): this is a dynamically initialized namespace-scope object, so
// touching it from another translation unit's static initializer would be
// initialization-order dependent.
const std::shared_ptr<BuiltinCompressionManagerV2> kBuiltinCompressionManagerV2{
    std::make_shared<BuiltinCompressionManagerV2>()};
std::shared_ptr<Decompressor>
BuiltinZSTDCompressorV2::GetOptimizedDecompressor() const {
return kBuiltinCompressionManagerV2->GetZstdDecompressor();
}
std::shared_ptr<Decompressor>
BuiltinSnappyCompressorV2::GetOptimizedDecompressor() const {
return kBuiltinCompressionManagerV2->GetSnappyDecompressor();
}
}
// Creates (and configures) a CompressionManager from a string spec.
//
// An empty value or kNullptrString resets `*result` and succeeds. On the
// first call, a factory for the built-in V2 manager is registered in the
// default ObjectLibrary under its compatibility name. A spec with options
// but no id is rejected. When config_options.ignore_unsupported_options is
// set, a NotSupported lookup result is downgraded to OK.
Status CompressionManager::CreateFromString(
    const ConfigOptions& config_options, const std::string& value,
    std::shared_ptr<CompressionManager>* result) {
  if (value.empty() || value == kNullptrString) {
    result->reset();
    return Status::OK();
  }

  static std::once_flag register_builtin_once;
  std::call_once(register_builtin_once, []() {
    ObjectLibrary::Default()->AddFactory<CompressionManager>(
        kBuiltinCompressionManagerV2->CompatibilityName(),
        [](const std::string& /*uri*/,
           std::unique_ptr<CompressionManager>* guard,
           std::string* /*errmsg*/) {
          *guard = std::make_unique<BuiltinCompressionManagerV2>();
          return guard->get();
        });
  });

  std::string id;
  std::unordered_map<std::string, std::string> opt_map;
  Status s = Customizable::GetOptionsMap(config_options, result->get(), value,
                                         &id, &opt_map);
  if (!s.ok()) {
    return s;
  }
  if (id.empty()) {
    // Options were given but no id: there is nothing to create.
    return Status::NotSupported("Cannot reset object ", id);
  }

  s = config_options.registry->NewSharedObject(id, result);
  if (s.ok()) {
    return Customizable::ConfigureNewObject(config_options, result->get(),
                                            opt_map);
  }
  if (config_options.ignore_unsupported_options && s.IsNotSupported()) {
    return Status::OK();
  }
  return s;
}
// Returns this manager if its CompatibilityName() equals
// `compatibility_name`. Otherwise tries to create one through
// CreateFromString() with default ConfigOptions, returning nullptr on
// failure.
std::shared_ptr<CompressionManager>
CompressionManager::FindCompatibleCompressionManager(Slice compatibility_name) {
  if (compatibility_name.compare(CompatibilityName()) == 0) {
    return shared_from_this();
  }
  std::shared_ptr<CompressionManager> created;
  const Status s =
      CreateFromString(ConfigOptions(), compatibility_name.ToString(), &created);
  if (!s.ok()) {
    return nullptr;
  }
  return created;
}
// Returns the built-in V2 manager as a base-class shared_ptr. A function-local
// static holds the upcast copy so that a stable reference can be returned.
const std::shared_ptr<CompressionManager>& GetBuiltinV2CompressionManager() {
  static const std::shared_ptr<CompressionManager> builtin_v2{
      kBuiltinCompressionManagerV2};
  return builtin_v2;
}
// Compresses `from` into `to` with a built-in compressor, treating a declined
// or aborted compression as an error rather than a fallback.
//
// `to` is resized to a worst-case bound before compressing. On success, its
// size is whatever CompressBlock wrote through MutableSize().
Status LegacyForceBuiltinCompression(
    Compressor& builtin_compressor,
    Compressor::ManagedWorkingArea* working_area, Slice from,
    GrowableBuffer* to) {
  assert(builtin_compressor.GetPreferredCompressionType() <= kZSTD);

  // 19/16 of the input plus 604 bytes. This is presumably a conservative
  // worst-case expansion bound for the built-in codecs (TODO confirm).
  const size_t input_size = from.size();
  to->ResetForSize(((19 * input_size) >> 4) + 604);

  CompressionType produced_type = kNoCompression;
  Status status = builtin_compressor.CompressBlock(
      from, to->data(), &to->MutableSize(), &produced_type, working_area);
  TEST_SYNC_POINT_CALLBACK("LegacyForceBuiltinCompression:TamperWithStatus",
                           &status);
  if (!status.ok()) {
    return status;
  }

  if (produced_type == kNoCompression) {
    // Deliberately fails in debug builds; release builds report Corruption.
    assert(produced_type != kNoCompression);
    return Status::Corruption("Compression unexpectedly declined or aborted");
  }
  assert(produced_type == builtin_compressor.GetPreferredCompressionType());
  return Status::OK();
}
}