#include "utilities/cache_dump_load_impl.h"
#include <limits>
#include "cache/cache_entry_roles.h"
#include "cache/cache_key.h"
#include "file/writable_file_writer.h"
#include "port/lang.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/utilities/ldb_cmd.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/format.h"
#include "util/crc32c.h"
namespace ROCKSDB_NAMESPACE {
// Restricts the dump to cache entries whose keys share a prefix with the
// tables of the given DBs.
//
// For every table returned by DB::GetPropertiesOfAllTables() a base cache key
// is set up; when that key is reported as stable, its common prefix is
// recorded in prefix_filter_. Calling this function clears dump_all_keys_, so
// the dump callback consults ShouldFilterOut() for every entry afterwards.
//
// Returns InvalidArgument if db_list contains a null pointer, or the first
// non-OK status from GetPropertiesOfAllTables(). Prefixes collected from DBs
// processed before a failure stay in prefix_filter_.
Status CacheDumperImpl::SetDumpFilter(const std::vector<DB*>& db_list) {
  dump_all_keys_ = false;
  for (DB* db : db_list) {
    if (db == nullptr) {
      // An assert alone disappears in release builds and would leave a null
      // dereference behind, so report the bad argument to the caller.
      return Status::InvalidArgument(
          "DB pointer passed to SetDumpFilter is null");
    }
    TablePropertiesCollection ptc;
    Status s = db->GetPropertiesOfAllTables(&ptc);
    if (!s.ok()) {
      return s;
    }
    for (const auto& table : ptc) {
      OffsetableCacheKey base;
      // Initialized defensively; SetupBaseCacheKey is expected to overwrite
      // it through the out-pointer.
      bool is_stable = false;
      // NOTE(review): the "" and 0 arguments are presumably placeholders for
      // the current session id / file number -- confirm against the
      // declaration of SetupBaseCacheKey.
      BlockBasedTable::SetupBaseCacheKey(table.second.get(), "", 0, &base,
                                         &is_stable);
      if (is_stable) {
        Slice prefix_slice = base.CommonPrefixSlice();
        assert(prefix_slice.size() == OffsetableCacheKey::kCommonPrefixSize);
        prefix_filter_.insert(prefix_slice.ToString());
      }
    }
  }
  return Status::OK();
}
// Writes the whole dump to writer_: a header unit, one unit per cache entry
// accepted by the callback from DumpOneBlockCallBack(), a footer unit, and
// finally closes the writer.
//
// Returns InvalidArgument when the cache, the writer, or options_.clock is
// null. Otherwise returns the first failure from writing the header or the
// footer, or the result of closing the writer. Failures while writing
// individual cache entries are not reported here (the callback discards the
// status of those writes).
IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
  // Reject unusable configurations before touching any member state.
  if (cache_ == nullptr) {
    return IOStatus::InvalidArgument("Cache is null");
  }
  if (writer_ == nullptr) {
    return IOStatus::InvalidArgument("CacheDumpWriter is null");
  }
  if (options_.clock == nullptr) {
    return IOStatus::InvalidArgument("System clock is null");
  }

  // Per-dump state taken from the options; numbering restarts at zero.
  clock_ = options_.clock;
  deadline_ = options_.deadline;
  sequence_num_ = 0;

  IOStatus header_status = WriteHeader();
  if (!header_status.ok()) {
    return header_status;
  }

  // The callback keeps a reference to this scratch buffer, which outlives the
  // ApplyToAllEntries() call below.
  std::string scratch;
  cache_->ApplyToAllEntries(DumpOneBlockCallBack(scratch), {});

  IOStatus footer_status = WriteFooter();
  if (footer_status.ok()) {
    return writer_->Close();
  }
  return footer_status;
}
bool CacheDumperImpl::ShouldFilterOut(const Slice& key) {
if (key.size() < OffsetableCacheKey::kCommonPrefixSize) {
return true;
}
Slice key_prefix(key.data(), OffsetableCacheKey::kCommonPrefixSize);
std::string prefix = key_prefix.ToString();
return prefix_filter_.find(prefix) == prefix_filter_.end();
}
// Builds the per-entry callback handed to Cache::ApplyToAllEntries().
//
// `buf` is a caller-owned scratch buffer captured by reference; it must stay
// alive for as long as the returned callback may be invoked. For each entry
// the callback, in order:
//   1. skips entries whose helper lacks size_cb or saveto_cb;
//   2. skips everything once dumped_size_bytes_ exceeds a non-zero
//      options_.max_size_bytes;
//   3. skips everything once the clock reaches a non-zero deadline_;
//   4. skips roles other than data, filter, filter-meta and index blocks;
//   5. skips keys rejected by ShouldFilterOut() unless dump_all_keys_ is set;
//   6. serializes the value into `buf` and, if that succeeds, writes it with
//      WriteBlock() and adds its length to dumped_size_bytes_.
// The status of WriteBlock() is deliberately discarded.
std::function<void(const Slice&, Cache::ObjectPtr, size_t,
                   const Cache::CacheItemHelper*)>
CacheDumperImpl::DumpOneBlockCallBack(std::string& buf) {
  return [this, &buf](const Slice& key, Cache::ObjectPtr value,
                      size_t /*unused*/,
                      const Cache::CacheItemHelper* helper) {
    const bool can_serialize = helper != nullptr &&
                               helper->size_cb != nullptr &&
                               helper->saveto_cb != nullptr;
    if (!can_serialize) {
      return;
    }

    const bool size_limit_hit =
        options_.max_size_bytes > 0 &&
        dumped_size_bytes_ > options_.max_size_bytes;
    if (size_limit_hit) {
      return;
    }

    // The same reading serves as the deadline check and the unit timestamp.
    const uint64_t now_micros = clock_->NowMicros();
    if (deadline_.count() != 0 &&
        std::chrono::microseconds(now_micros) >= deadline_) {
      return;
    }

    // Translate the cache entry role into a dump unit type; any other role
    // is not dumped.
    CacheDumpUnitType unit_type = CacheDumpUnitType::kBlockTypeMax;
    const CacheEntryRole role = helper->role;
    if (role == CacheEntryRole::kDataBlock) {
      unit_type = CacheDumpUnitType::kData;
    } else if (role == CacheEntryRole::kFilterBlock) {
      unit_type = CacheDumpUnitType::kFilter;
    } else if (role == CacheEntryRole::kFilterMetaBlock) {
      unit_type = CacheDumpUnitType::kFilterMetaBlock;
    } else if (role == CacheEntryRole::kIndexBlock) {
      unit_type = CacheDumpUnitType::kIndex;
    } else {
      return;
    }

    if (!dump_all_keys_ && ShouldFilterOut(key)) {
      return;
    }
    assert(unit_type != CacheDumpUnitType::kBlockTypeMax);

    // Serialize the cached object into the shared scratch buffer.
    const size_t value_len = helper->size_cb(value);
    buf.assign(value_len, '\0');
    Status save_status = helper->saveto_cb(value, 0, value_len, buf.data());
    if (!save_status.ok()) {
      return;
    }

    WriteBlock(unit_type, key, buf, now_micros).PermitUncheckedError();
    dumped_size_bytes_ += value_len;
  };
}
// Encodes one dump unit (timestamp, key, type, value and a crc32c of the
// value) and writes it through writer_, preceded by its metadata record.
//
// The metadata carries the current sequence_num_ (which is then incremented),
// the size of the encoded unit and a crc32c over the encoded unit. Returns
// the status of the metadata write if it fails, otherwise the status of the
// unit write. sequence_num_ advances even when a write fails.
IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key,
                                     const Slice& value, uint64_t timestamp) {
  // Serialize the unit itself.
  DumpUnit unit;
  unit.type = type;
  unit.timestamp = timestamp;
  unit.key = key;
  unit.value = const_cast<char*>(value.data());
  unit.value_len = value.size();
  unit.value_checksum = crc32c::Value(value.data(), value.size());
  std::string unit_bytes;
  CacheDumperHelper::EncodeDumpUnit(unit, &unit_bytes);

  // Serialize the metadata describing the encoded unit.
  DumpUnitMeta meta;
  meta.sequence_num = sequence_num_++;
  meta.dump_unit_size = unit_bytes.size();
  meta.dump_unit_checksum = crc32c::Value(unit_bytes.data(), unit_bytes.size());
  std::string meta_bytes;
  CacheDumperHelper::EncodeDumpUnitMeta(meta, &meta_bytes);

  // Metadata goes out first; the unit follows only if that succeeded.
  assert(writer_ != nullptr);
  IOStatus io_s = writer_->WriteMetadata(meta_bytes);
  if (io_s.ok()) {
    io_s = writer_->WritePacket(unit_bytes);
  }
  return io_s;
}
// Writes the header unit: key "header", type kHeader, and a tab-separated,
// newline-terminated value made of the trace magic, the cache dump format
// version, the RocksDB version and a textual description of the layout.
// Returns the status of WriteBlock().
IOStatus CacheDumperImpl::WriteHeader() {
  std::ostringstream text;
  text << kTraceMagic << "\t";
  text << "Cache dump format version: " << kCacheDumpMajorVersion << "."
       << kCacheDumpMinorVersion << "\t";
  text << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion
       << "\t";
  text << "Format: dump_unit_metadata <sequence_number, dump_unit_checksum, "
          "dump_unit_size>, dump_unit <timestamp, key, block_type, "
          "block_size, block_data, block_checksum> cache_value\n";

  const std::string header_value = text.str();
  const std::string header_key = "header";
  const uint64_t now_micros = clock_->NowMicros();
  return WriteBlock(CacheDumpUnitType::kHeader, header_key, header_value,
                    now_micros);
}
// Writes the footer unit: key "footer", type kFooter, value
// "cache dump completed", stamped with the current clock reading.
// Returns the status of WriteBlock().
IOStatus CacheDumperImpl::WriteFooter() {
  const std::string footer_key("footer");
  const std::string footer_value = "cache dump completed";
  const uint64_t now_micros = clock_->NowMicros();
  return WriteBlock(CacheDumpUnitType::kFooter, footer_key, footer_value,
                    now_micros);
}
// Reads a cache dump through reader_ and inserts every dumped block into
// secondary_cache_ via InsertSaved().
//
// The first unit is read with ReadHeader(); the following units are read with
// ReadCacheBlock() until a footer unit is seen.
//
// Returns:
//   - InvalidArgument if the secondary cache or the reader is null;
//   - the failing status if the header or any later unit cannot be read or
//     fails verification, including a unit that decodes as the footer but
//     does not match its checksum;
//   - the converted status of the first failing InsertSaved() call;
//   - OK once a successfully read footer unit is reached.
IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
  // options_ is not consulted here; the cast only marks it as used.
  (void)options_;
  if (secondary_cache_ == nullptr) {
    return IOStatus::InvalidArgument("Secondary Cache is null");
  }
  if (reader_ == nullptr) {
    return IOStatus::InvalidArgument("CacheDumpReader is null");
  }

  DumpUnit dump_unit;
  std::string data;
  IOStatus io_s = ReadHeader(&data, &dump_unit);
  if (!io_s.ok()) {
    return io_s;
  }

  for (;;) {
    dump_unit.reset();
    data.clear();
    io_s = ReadCacheBlock(&data, &dump_unit);
    if (!io_s.ok()) {
      // Return the error as is. ReadCacheBlock() decodes the unit before it
      // verifies the checksum, so dump_unit.type may already say kFooter for
      // a corrupted unit; that must not be reported as a clean end of dump.
      return io_s;
    }
    if (dump_unit.type == CacheDumpUnitType::kFooter) {
      return IOStatus::OK();
    }
    // dump_unit.value points at the block payload of dump_unit.value_len
    // bytes; hand it to the secondary cache under the dumped key.
    Slice content(static_cast<char*>(dump_unit.value), dump_unit.value_len);
    Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
    if (!s.ok()) {
      return status_to_io_status(std::move(s));
    }
  }
}
// Reads one metadata record from reader_ into `*data` and decodes it into
// `*unit_meta`. Returns the read failure if reading fails, otherwise the
// decode status converted to an IOStatus. Neither pointer may be null.
IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
                                                 DumpUnitMeta* unit_meta) {
  assert(reader_ != nullptr);
  assert(data != nullptr);
  assert(unit_meta != nullptr);

  IOStatus result = reader_->ReadMetadata(data);
  if (result.ok()) {
    result = status_to_io_status(
        CacheDumperHelper::DecodeDumpUnitMeta(*data, unit_meta));
  }
  return result;
}
// Reads one dump unit packet from reader_ into `*data` and decodes it into
// `*unit`.
//
// `len` is the unit size expected by the caller (taken from the unit's
// metadata). Returns the read failure if reading fails, Corruption if the
// packet size differs from `len`, otherwise the decode status converted to an
// IOStatus. Neither pointer may be null.
IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
                                             DumpUnit* unit) {
  assert(reader_ != nullptr);
  assert(data != nullptr);
  assert(unit != nullptr);
  IOStatus io_s = reader_->ReadPacket(data);
  if (!io_s.ok()) {
    return io_s;
  }
  if (data->size() != len) {
    return IOStatus::Corruption(
        "The data being read out does not match the size stored in metadata!");
  }
  return status_to_io_status(CacheDumperHelper::DecodeDumpUnit(*data, unit));
}
// Reads the first metadata record and dump unit of the dump, leaving the raw
// unit bytes in `*data` and the decoded unit in `*dump_unit`.
//
// Returns the first failure from reading/decoding the metadata or the unit,
// Corruption if the crc32c of the raw unit bytes differs from the checksum in
// the metadata, and an OK status otherwise.
IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
                                           DumpUnit* dump_unit) {
  DumpUnitMeta meta;
  meta.reset();
  std::string meta_bytes;

  IOStatus io_s = ReadDumpUnitMeta(&meta_bytes, &meta);
  if (io_s.ok()) {
    io_s = ReadDumpUnit(meta.dump_unit_size, data, dump_unit);
  }
  if (!io_s.ok()) {
    return io_s;
  }

  // Verify the raw unit bytes against the checksum stored in the metadata.
  const uint32_t actual_checksum = crc32c::Value(data->data(), data->size());
  if (actual_checksum == meta.dump_unit_checksum) {
    return io_s;
  }
  return IOStatus::Corruption("Read header unit corrupted!");
}
// Reads the next metadata record and dump unit, leaving the raw unit bytes in
// `*data` and the decoded unit in `*dump_unit`.
//
// Returns the first failure from reading/decoding the metadata or the unit,
// Corruption if the crc32c of the raw unit bytes differs from the checksum in
// the metadata, and an OK status otherwise. Note that `*dump_unit` has
// already been decoded when the checksum mismatch is detected.
IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
                                               DumpUnit* dump_unit) {
  DumpUnitMeta meta;
  meta.reset();
  std::string meta_bytes;

  IOStatus io_s = ReadDumpUnitMeta(&meta_bytes, &meta);
  if (io_s.ok()) {
    io_s = ReadDumpUnit(meta.dump_unit_size, data, dump_unit);
  }
  if (!io_s.ok()) {
    return io_s;
  }

  // Verify the raw unit bytes against the checksum stored in the metadata.
  const uint32_t actual_checksum = crc32c::Value(data->data(), data->size());
  if (actual_checksum == meta.dump_unit_checksum) {
    return io_s;
  }
  return IOStatus::Corruption(
      "Checksum does not match! Read dumped unit corrupted!");
}
}