#include "table/block_fetcher.h"
#include <cassert>
#include <cinttypes>
#include <string>
#include "logging/logging.h"
#include "memory/memory_allocator_impl.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/env.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_type.h"
#include "table/block_based/reader_common.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/compression.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
// Inspects the trailer of the block currently referenced by `slice_`.
//
// When the footer reports a non-zero trailer size, this optionally verifies
// the block checksum (if `read_options_.verify_checksums` is set), storing the
// outcome in `io_status_` and ticking the checksum statistics, and then
// records the block's compression type. When there is no trailer the block is
// treated as uncompressed and `io_status_` is left untouched.
inline void BlockFetcher::ProcessTrailerIfPresent() {
  const bool has_trailer = footer_.GetBlockTrailerSize() > 0;
  if (!has_trailer) {
    compression_type() = kNoCompression;
    return;
  }

  assert(footer_.GetBlockTrailerSize() == BlockBasedTable::kBlockTrailerSize);

  if (read_options_.verify_checksums) {
    io_status_ = status_to_io_status(
        VerifyBlockChecksum(footer_, slice_.data(), block_size_,
                            file_->file_name(), handle_.offset(), block_type_));
    RecordTick(ioptions_.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
    if (!io_status_.ok()) {
      // A failed verification is expected to surface as a corruption status.
      assert(io_status_.IsCorruption());
      RecordTick(ioptions_.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);
    }
  }

  // The compression type is recorded even if checksum verification failed.
  compression_type() =
      BlockBasedTable::GetBlockCompressionType(slice_.data(), block_size_);
}
inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
if (cache_options_.persistent_cache &&
!cache_options_.persistent_cache->IsCompressed()) {
Status status = PersistentCacheHelper::LookupUncompressed(
cache_options_, handle_, contents_);
if (status.ok()) {
return true;
} else {
if (ioptions_.logger && !status.IsNotFound()) {
assert(!status.ok());
ROCKS_LOG_INFO(ioptions_.logger,
"Error reading from persistent cache. %s",
status.ToString().c_str());
}
}
}
return false;
}
inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (prefetch_buffer_ != nullptr) {
IOOptions opts;
IODebugContext dbg;
IOStatus io_s = file_->PrepareIOOptions(read_options_, opts, &dbg);
if (io_s.ok()) {
bool read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, for_compaction_);
if (read_from_prefetch_buffer) {
ProcessTrailerIfPresent();
if (io_status_.ok()) {
got_from_prefetch_buffer_ = true;
used_buf_ = const_cast<char*>(slice_.data());
} else if (io_status_.IsCorruption()) {
return true;
}
}
}
if (!io_s.ok()) {
io_status_ = io_s;
return true;
}
}
return got_from_prefetch_buffer_;
}
inline bool BlockFetcher::TryGetSerializedBlockFromPersistentCache() {
if (cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) {
std::unique_ptr<char[]> buf;
io_status_ = status_to_io_status(PersistentCacheHelper::LookupSerialized(
cache_options_, handle_, &buf, block_size_with_trailer_));
if (io_status_.ok()) {
heap_buf_ = CacheAllocationPtr(buf.release());
used_buf_ = heap_buf_.get();
slice_ = Slice(heap_buf_.get(), block_size_);
ProcessTrailerIfPresent();
return true;
} else if (!io_status_.IsNotFound() && ioptions_.logger) {
assert(!io_status_.ok());
ROCKS_LOG_INFO(ioptions_.logger,
"Error reading from persistent cache. %s",
io_status_.ToString().c_str());
}
}
return false;
}
inline void BlockFetcher::PrepareBufferForBlockFromFile() {
if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
block_size_with_trailer_ < kDefaultStackBufferSize) {
used_buf_ = &stack_buf_[0];
} else if (maybe_compressed_ && !do_uncompress_) {
compressed_buf_ =
AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
used_buf_ = compressed_buf_.get();
} else {
heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
used_buf_ = heap_buf_.get();
}
}
// Inserts the serialized block held in `used_buf_` (including its trailer
// bytes) into the persistent cache. This is done only when the last operation
// succeeded, the read is allowed to fill caches, and a persistent cache that
// reports itself as compressed is configured.
inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
  if (!io_status_.ok() || !read_options_.fill_cache) {
    return;
  }
  if (!cache_options_.persistent_cache ||
      !cache_options_.persistent_cache->IsCompressed()) {
    return;
  }
  PersistentCacheHelper::InsertSerialized(cache_options_, handle_, used_buf_,
                                          block_size_with_trailer_);
}
// Inserts `*contents_` into the persistent cache. This is done only when the
// last operation succeeded, the data did not come from the prefetch buffer,
// the read is allowed to fill caches, and a persistent cache that reports
// itself as not compressed is configured.
inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
  if (!io_status_.ok() || got_from_prefetch_buffer_ ||
      !read_options_.fill_cache) {
    return;
  }
  if (!cache_options_.persistent_cache ||
      cache_options_.persistent_cache->IsCompressed()) {
    return;
  }
  PersistentCacheHelper::InsertUncompressed(cache_options_, handle_,
                                            *contents_);
}
// Allocates a new `heap_buf_` from `memory_allocator_` and copies
// `block_size_with_trailer_` bytes from `used_buf_` into it. `used_buf_` must
// not already be `heap_buf_`. Debug builds count the copy.
inline void BlockFetcher::CopyBufferToHeapBuf() {
  assert(used_buf_ != heap_buf_.get());

  heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
  auto* const destination = heap_buf_.get();
  memcpy(destination, used_buf_, block_size_with_trailer_);

#ifndef NDEBUG
  ++num_heap_buf_memcpy_;
#endif
}
// Allocates a new `compressed_buf_` from `memory_allocator_compressed_` and
// copies `block_size_with_trailer_` bytes from `used_buf_` into it.
// `used_buf_` must not already be `compressed_buf_`. Debug builds count the
// copy.
inline void BlockFetcher::CopyBufferToCompressedBuf() {
  assert(used_buf_ != compressed_buf_.get());

  compressed_buf_ =
      AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
  auto* const destination = compressed_buf_.get();
  memcpy(destination, used_buf_, block_size_with_trailer_);

#ifndef NDEBUG
  ++num_compressed_buf_memcpy_;
#endif
}
// Publishes the block that `slice_` refers to into `*contents_` without
// decompressing it.
//
// If `slice_` does not point at `used_buf_`, `*contents_` is built directly
// from a Slice over the data. Otherwise the data is first brought into
// `heap_buf_` (by copying or by moving an existing buffer, depending on where
// it currently lives) and `heap_buf_` is then moved into `*contents_`.
inline void BlockFetcher::GetBlockContents() {
  if (slice_.data() == used_buf_) {
    const bool in_prefetch_or_stack_buffer =
        got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0];

    if (in_prefetch_or_stack_buffer) {
      // Data lives in a buffer that cannot be handed to `*contents_`, so copy.
      CopyBufferToHeapBuf();
    } else if (used_buf_ == compressed_buf_.get()) {
      const bool uncompressed_with_other_allocator =
          compression_type() == kNoCompression &&
          memory_allocator_ != memory_allocator_compressed_;
      if (uncompressed_with_other_allocator) {
        // Re-home uncompressed data into memory from `memory_allocator_`.
        CopyBufferToHeapBuf();
      } else {
        heap_buf_ = std::move(compressed_buf_);
      }
    } else if (direct_io_buf_.get() != nullptr || use_fs_scratch_) {
      if (compression_type() != kNoCompression) {
        CopyBufferToCompressedBuf();
        heap_buf_ = std::move(compressed_buf_);
      } else {
        CopyBufferToHeapBuf();
      }
    }
    // If no branch above matched, `heap_buf_` is used as it currently is.
    *contents_ = BlockContents(std::move(heap_buf_), block_size_);
  } else {
    // The read returned data somewhere other than the buffer we supplied.
    *contents_ = BlockContents(Slice(slice_.data(), block_size_));
  }

#ifndef NDEBUG
  contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif
}
// Reads `block_size_with_trailer_` bytes at `handle_.offset()` from `file_`
// into `slice_` and validates the result.
//
// `retry` is copied into `opts.verify_and_reconstruct_read` and causes the
// FILE_READ_CORRUPTION_RETRY_* tickers to be recorded.
//
// On return `io_status_` holds the outcome. On success the trailer has been
// processed, the block has been offered to the compressed persistent cache,
// and any file-system-provided scratch buffer has been moved into `fs_buf_`.
// On failure every buffer this function may have filled is released and
// `used_buf_` is reset to nullptr.
void BlockFetcher::ReadBlock(bool retry) {
FSReadRequest read_req;
IOOptions opts;
IODebugContext dbg;
io_status_ = file_->PrepareIOOptions(read_options_, opts, &dbg);
opts.verify_and_reconstruct_read = retry;
// `read_req.status` is only examined on the `use_fs_scratch_` path below.
// NOTE(review): presumably this keeps an unexamined status from tripping
// status-check assertions on the other paths; confirm.
read_req.status.PermitUncheckedError();
// Perform the actual read through one of three paths.
if (io_status_.ok()) {
if (file_->use_direct_io()) {
// Direct I/O: no scratch buffer is supplied; `direct_io_buf_` is passed for
// the reader to fill and `used_buf_` follows wherever `slice_` points.
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_, &slice_,
nullptr, &direct_io_buf_, &dbg);
PERF_COUNTER_ADD(block_read_count, 1);
used_buf_ = const_cast<char*>(slice_.data());
} else if (use_fs_scratch_) {
// File-system scratch: issue a single-request MultiRead with a null scratch
// pointer and take the result slice from the request.
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
read_req.offset = handle_.offset();
read_req.len = block_size_with_trailer_;
read_req.scratch = nullptr;
io_status_ = file_->MultiRead(opts, &read_req, 1,
nullptr, &dbg);
PERF_COUNTER_ADD(block_read_count, 1);
slice_ = Slice(read_req.result.data(), read_req.result.size());
used_buf_ = const_cast<char*>(slice_.data());
} else {
// Regular read: PrepareBufferForBlockFromFile() selects/allocates `used_buf_`,
// which is then passed to Read() as the scratch buffer.
PrepareBufferForBlockFromFile();
PERF_TIMER_GUARD(block_read_time);
PERF_CPU_TIMER_GUARD(
block_read_cpu_time,
ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
io_status_ =
file_->Read(opts, handle_.offset(), block_size_with_trailer_,
&slice_, used_buf_,
nullptr, &dbg);
PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
// Debug-only bookkeeping: count which of our buffers the result landed in.
if (slice_.data() == &stack_buf_[0]) {
num_stack_buf_memcpy_++;
} else if (slice_.data() == heap_buf_.get()) {
num_heap_buf_memcpy_++;
} else if (slice_.data() == compressed_buf_.get()) {
num_compressed_buf_memcpy_++;
}
#endif
}
}
// Per-block-type perf counters; these are bumped regardless of `io_status_`.
// Block types without a dedicated counter fall through to `default`.
switch (block_type_) {
case BlockType::kFilter:
case BlockType::kFilterPartitionIndex:
PERF_COUNTER_ADD(filter_block_read_count, 1);
break;
case BlockType::kCompressionDictionary:
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
break;
case BlockType::kIndex:
PERF_COUNTER_ADD(index_block_read_count, 1);
break;
default:
break;
}
PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
// NOTE(review): IGNORE_STATUS_IF_ERROR is defined outside this file; see its
// definition for the exact effect.
IGNORE_STATUS_IF_ERROR(io_status_);
if (io_status_.ok()) {
// The overall call can succeed while the individual request failed, so the
// per-request status is promoted on the fs-scratch path.
if (use_fs_scratch_ && !read_req.status.ok()) {
io_status_ = read_req.status;
} else if (slice_.size() != block_size_with_trailer_) {
// A short read is reported as corruption.
io_status_ = IOStatus::Corruption(
"truncated block read from " + file_->file_name() + " offset " +
std::to_string(handle_.offset()) + ", expected " +
std::to_string(block_size_with_trailer_) + " bytes, got " +
std::to_string(slice_.size()));
}
}
if (io_status_.ok()) {
// May turn `io_status_` into an error if checksum verification fails.
ProcessTrailerIfPresent();
}
if (retry) {
RecordTick(ioptions_.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
}
if (io_status_.ok()) {
InsertCompressedBlockToPersistentCacheIfNeeded();
// Keep the file-system-provided buffer alive beyond `read_req`.
fs_buf_ = std::move(read_req.fs_scratch);
if (retry) {
RecordTick(ioptions_.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
}
} else {
// Drop every buffer that may hold the failed read so that a subsequent
// ReadBlock() call starts from a clean state.
ReleaseFileSystemProvidedBuffer(&read_req);
direct_io_buf_.reset();
compressed_buf_.reset();
heap_buf_.reset();
used_buf_ = nullptr;
}
}
// Fetches the block identified by `handle_` into `*contents_`.
//
// Sources are tried in this order:
//   1. the uncompressed persistent cache (returns OK immediately on a hit),
//   2. the prefetch buffer,
//   3. the serialized persistent cache,
//   4. a read from the file via ReadBlock().
// When a read reports corruption and `retry_corrupt_read_` is set, the block
// is re-read once with ReadBlock(/*retry=*/true).
//
// After the raw block is available it is either decompressed into
// `*contents_` (when `do_uncompress_` is set and the block is compressed) or
// handed over as-is through GetBlockContents(). The result is then offered to
// the uncompressed persistent cache.
//
// Returns the final status; except on the early cache-hit path this is the
// value left in `io_status_`.
IOStatus BlockFetcher::ReadBlockContents() {
  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type() = kNoCompression;
#ifndef NDEBUG
    contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif  // NDEBUG
    // The return must be on its own line: tokens following `#endif` on the
    // directive line are not compiled as code, which would make a cache hit
    // fall through and fetch the block a second time.
    return IOStatus::OK();
  }

  if (TryGetFromPrefetchBuffer()) {
    // "true" also covers errors; retry a corrupt prefetch hit from the file.
    if (io_status_.IsCorruption() && retry_corrupt_read_) {
      ReadBlock(/*retry=*/true);
    }
    if (!io_status_.ok()) {
      assert(!fs_buf_);
      return io_status_;
    }
  } else if (!TryGetSerializedBlockFromPersistentCache()) {
    ReadBlock(/*retry=*/false);
    // Re-read once if the first attempt reported corruption and retries are
    // enabled.
    if (io_status_.IsCorruption() && retry_corrupt_read_) {
      assert(!fs_buf_);
      ReadBlock(/*retry=*/true);
    }
    if (!io_status_.ok()) {
      assert(!fs_buf_);
      return io_status_;
    }
  }

  if (do_uncompress_ && compression_type() != kNoCompression) {
    PERF_TIMER_GUARD(block_decompress_time);
    // Decompress the block payload only, i.e. without the trailer bytes.
    slice_.size_ = block_size_;
    decomp_args_.compressed_data = slice_;
    io_status_ = status_to_io_status(DecompressSerializedBlock(
        decomp_args_, *decompressor_, contents_, ioptions_, memory_allocator_));
#ifndef NDEBUG
    num_heap_buf_memcpy_++;
#endif
  } else {
    GetBlockContents();
    // `*contents_` now carries the data; drop our reference to it.
    slice_ = Slice();
  }

  InsertUncompressedBlockToPersistentCacheIfNeeded();

  return io_status_;
}
// Asynchronous-prefetch variant of ReadBlockContents().
//
// An uncompressed persistent-cache hit returns OK immediately. Otherwise, if
// the serialized persistent cache does not have the block and this is not a
// compaction read, the block is requested from `prefetch_buffer_` via
// PrefetchAsync():
//   * a failure from PrepareIOOptions() or a TryAgain status from
//     PrefetchAsync() is returned to the caller unchanged;
//   * on OK, the data is taken from the prefetch buffer, verified (with one
//     ReadBlock(/*retry=*/true) attempt on corruption when
//     `retry_corrupt_read_` is set), decompressed or handed over through
//     GetBlockContents(), and offered to the uncompressed persistent cache;
//   * any other status, or a compaction read, falls back to
//     ReadBlockContents().
IOStatus BlockFetcher::ReadAsyncBlockContents() {
  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type() = kNoCompression;
#ifndef NDEBUG
    contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif  // NDEBUG
    // The return must be on its own line: tokens following `#endif` on the
    // directive line are not compiled as code, which would silently drop this
    // early exit.
    return IOStatus::OK();
  } else if (!TryGetSerializedBlockFromPersistentCache()) {
    assert(prefetch_buffer_ != nullptr);
    if (!for_compaction_) {
      IOOptions opts;
      IODebugContext dbg;
      IOStatus io_s = file_->PrepareIOOptions(read_options_, opts, &dbg);
      if (!io_s.ok()) {
        return io_s;
      }
      io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
          opts, file_, handle_.offset(), block_size_with_trailer_, &slice_));
      if (io_s.IsTryAgain()) {
        return io_s;
      }
      if (io_s.ok()) {
        got_from_prefetch_buffer_ = true;
        ProcessTrailerIfPresent();
        if (io_status_.IsCorruption() && retry_corrupt_read_) {
          // The prefetched copy is corrupt; re-read directly from the file.
          got_from_prefetch_buffer_ = false;
          ReadBlock(/*retry=*/true);
        }
        if (!io_status_.ok()) {
          assert(!fs_buf_);
          return io_status_;
        }
        used_buf_ = const_cast<char*>(slice_.data());

        if (do_uncompress_ && compression_type() != kNoCompression) {
          PERF_TIMER_GUARD(block_decompress_time);
          // Decompress the block payload only, i.e. without the trailer bytes.
          slice_.size_ = block_size_;
          decomp_args_.compressed_data = slice_;
          io_status_ = status_to_io_status(
              DecompressSerializedBlock(decomp_args_, *decompressor_, contents_,
                                        ioptions_, memory_allocator_));
#ifndef NDEBUG
          num_heap_buf_memcpy_++;
#endif
        } else {
          GetBlockContents();
        }
        InsertUncompressedBlockToPersistentCacheIfNeeded();
        return io_status_;
      }
    }
    // Compaction read, or PrefetchAsync() returned an error other than
    // TryAgain: fall back to the synchronous path.
    return ReadBlockContents();
  }
  // Reached when the serialized persistent cache lookup succeeded.
  // NOTE(review): on this path neither decompression nor GetBlockContents()
  // runs in this function; confirm callers do not expect `*contents_` to be
  // populated here.
  return io_status_;
}
}