#include "file/readahead_raf.h"
#include <algorithm>
#include <mutex>
#include "file/read_write_util.h"
#include "rocksdb/file_system.h"
#include "util/aligned_buffer.h"
#include "util/rate_limiter_impl.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// FSRandomAccessFile wrapper that serves small reads out of a single in-memory
// buffer of `readahead_size_` bytes (the requested readahead size rounded up to
// the wrapped file's required buffer alignment). Requests too large to benefit
// from the buffer are forwarded directly to the wrapped file. All access to the
// buffer and its bookkeeping is serialized by `lock_`.
class ReadaheadRandomAccessFile : public FSRandomAccessFile {
 public:
  // Takes ownership of `file`. The buffer is allocated eagerly with a
  // capacity of `readahead_size_` bytes.
  ReadaheadRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
                            size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
  ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) =
      delete;

  // Reads up to `n` bytes at `offset` into `scratch` and points `*result` at
  // the bytes obtained. Small requests are served from the buffer, refilling
  // it with one aligned chunk of `readahead_size_` bytes when the cached data
  // does not cover the whole request. A result shorter than `n` indicates the
  // data ran out (EOF) before `n` bytes were available.
  IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
                Slice* result, char* scratch,
                IODebugContext* dbg) const override {
    // A refill chunk starts at `offset` rounded down to the alignment, so a
    // request this large might not fit in one buffer-sized chunk; forward it.
    if (n + alignment_ >= readahead_size_) {
      return file_->Read(offset, n, options, result, scratch, dbg);
    }

    std::lock_guard<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // A buffer holding fewer than `readahead_size_` bytes came from a short
    // read, i.e. it already ends at EOF, so a partial hit is the full answer.
    if (TryReadFromCache(offset, n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }

    // Keep file offsets in 64 bits: narrowing to size_t would address the
    // wrong position in files larger than 4 GiB on 32-bit builds.
    const uint64_t advanced_offset = offset + cached_len;
    const uint64_t chunk_offset = AlignDown(advanced_offset);
    IOStatus s = ReadIntoBuffer(chunk_offset, readahead_size_, options, dbg);
    if (s.ok()) {
      size_t remaining_len = 0;
      TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
                       scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  // Loads the aligned range covering [offset, offset + n) into the buffer,
  // capped at the buffer capacity. Requests smaller than `readahead_size_` are
  // ignored, because `Read()` treats a buffer shorter than `readahead_size_`
  // as ending at EOF.
  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
                    IODebugContext* dbg) override {
    if (n < readahead_size_) {
      return IOStatus::OK();
    }

    std::lock_guard<std::mutex> lk(lock_);

    const uint64_t prefetch_offset = AlignDown(offset);
    // Skip only when the buffer really holds data starting here. An empty
    // buffer (initial state, after InvalidateCache, or after a failed read)
    // keeps a stale `buffer_offset_` and must still be filled.
    if (buffer_.CurrentSize() > 0 && prefetch_offset == buffer_offset_) {
      return IOStatus::OK();
    }
    // `prefetch_offset` is a multiple of `alignment_` and
    // `offset - prefetch_offset < alignment_`, so this equals
    // Roundup(offset + n) - prefetch_offset without narrowing `offset`.
    const size_t prefetch_len = Roundup(
        static_cast<size_t>(offset - prefetch_offset) + n, alignment_);
    return ReadIntoBuffer(prefetch_offset, prefetch_len, options, dbg);
  }

  size_t GetUniqueId(char* id, size_t max_size) const override {
    return file_->GetUniqueId(id, max_size);
  }

  void Hint(AccessPattern pattern) override { file_->Hint(pattern); }

  // Discards everything buffered here, then forwards the invalidation.
  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::lock_guard<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

  IOStatus GetFileSize(uint64_t* result) override {
    return file_->GetFileSize(result);
  }

 private:
  // Rounds `off` down to a multiple of `alignment_`, in 64-bit arithmetic.
  uint64_t AlignDown(uint64_t off) const { return off - off % alignment_; }

  // If `offset` lies inside the buffered range, copies up to `n` bytes
  // starting there into `scratch`, stores the copied count in `*cached_len`,
  // and returns true. Otherwise sets `*cached_len` to 0 and returns false.
  // Called with `lock_` held.
  bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
                        char* scratch) const {
    if (offset < buffer_offset_ ||
        offset >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = offset - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    return true;
  }

  // Replaces the buffer contents with up to `n` bytes (capped at the buffer
  // capacity) read from the aligned `offset`. Called with `lock_` held.
  IOStatus ReadIntoBuffer(uint64_t offset, size_t n, const IOOptions& options,
                          IODebugContext* dbg) const {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(offset, alignment_));
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s =
        file_->Read(offset, n, options, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      buffer_offset_ = offset;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    } else {
      // The failed read targeted the buffer memory directly, so the previous
      // contents can no longer be trusted; drop them instead of serving them.
      buffer_.Clear();
    }
    return s;
  }

  const std::unique_ptr<FSRandomAccessFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  mutable std::mutex lock_;
  // Aligned readahead buffer and the file offset of its first byte; both
  // guarded by `lock_`.
  mutable AlignedBuffer buffer_;
  mutable uint64_t buffer_offset_;
};
}
// Creates a wrapper around `file` that buffers small reads through a readahead
// window of `readahead_size` bytes (rounded up to the wrapped file's required
// buffer alignment). Ownership of `file` is moved into the returned object.
std::unique_ptr<FSRandomAccessFile> NewReadaheadRandomAccessFile(
    std::unique_ptr<FSRandomAccessFile>&& file, size_t readahead_size) {
  return std::unique_ptr<FSRandomAccessFile>(
      new ReadaheadRandomAccessFile(std::move(file), readahead_size));
}
}