#ifndef ROCKSDB_LITE
#include "db/transaction_log_impl.h"
#include <cinttypes>
#include "db/write_batch_internal.h"
#include "file/sequence_file_reader.h"
namespace ROCKSDB_NAMESPACE {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const ImmutableDBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
: dir_(dir),
options_(options),
read_options_(read_options),
soptions_(soptions),
starting_sequence_number_(seq),
files_(std::move(files)),
started_(false),
is_valid_(false),
current_file_index_(0),
current_batch_seq_(0),
current_last_seq_(0),
versions_(versions),
seq_per_batch_(seq_per_batch),
io_tracer_(io_tracer) {
assert(files_ != nullptr);
assert(versions_ != nullptr);
current_status_.PermitUncheckedError(); reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get();
SeekToStartSequence(); }
Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile* log_file,
std::unique_ptr<SequentialFileReader>* file_reader) {
FileSystemPtr fs(options_->fs, io_tracer_);
std::unique_ptr<FSSequentialFile> file;
std::string fname;
Status s;
EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
if (log_file->Type() == kArchivedLogFile) {
fname = ArchivedLogFileName(dir_, log_file->LogNumber());
s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
} else {
fname = LogFileName(dir_, log_file->LogNumber());
s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
if (!s.ok()) {
fname = ArchivedLogFileName(dir_, log_file->LogNumber());
s = fs->NewSequentialFile(fname, optimized_env_options,
&file, nullptr);
}
}
if (s.ok()) {
file_reader->reset(
new SequentialFileReader(std::move(file), fname, io_tracer_));
}
return s;
}
BatchResult TransactionLogIteratorImpl::GetBatch() {
assert(is_valid_); BatchResult result;
result.sequence = current_batch_seq_;
result.writeBatchPtr = std::move(current_batch_);
return result;
}
Status TransactionLogIteratorImpl::status() { return current_status_; }
bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
if (current_last_seq_ >= versions_->LastSequence()) {
return false;
}
return current_log_reader_->ReadRecord(record, &scratch_);
}
void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
bool strict) {
Slice record;
started_ = false;
is_valid_ = false;
if (files_->size() <= start_file_index) {
return;
}
Status s =
OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
if (!s.ok()) {
current_status_ = s;
reporter_.Info(current_status_.ToString().c_str());
return;
}
while (RestrictedRead(&record)) {
if (record.size() < WriteBatchInternal::kHeader) {
reporter_.Corruption(
record.size(), Status::Corruption("very small log record"));
continue;
}
UpdateCurrentWriteBatch(record);
if (current_last_seq_ >= starting_sequence_number_) {
if (strict && current_batch_seq_ != starting_sequence_number_) {
current_status_ = Status::Corruption(
"Gap in sequence number. Could not "
"seek to required sequence number");
reporter_.Info(current_status_.ToString().c_str());
return;
} else if (strict) {
reporter_.Info("Could seek required sequence number. Iterator will "
"continue.");
}
is_valid_ = true;
started_ = true; return;
} else {
is_valid_ = false;
}
}
if (strict) {
current_status_ = Status::Corruption(
"Gap in sequence number. Could not "
"seek to required sequence number");
reporter_.Info(current_status_.ToString().c_str());
} else if (files_->size() != 1) {
current_status_ = Status::Corruption(
"Start sequence was not found, "
"skipping to the next available");
reporter_.Info(current_status_.ToString().c_str());
NextImpl(true);
}
}
void TransactionLogIteratorImpl::Next() {
return NextImpl(false);
}
void TransactionLogIteratorImpl::NextImpl(bool internal) {
Slice record;
is_valid_ = false;
if (!internal && !started_) {
return SeekToStartSequence();
}
while(true) {
assert(current_log_reader_);
if (current_log_reader_->IsEOF()) {
current_log_reader_->UnmarkEOF();
}
while (RestrictedRead(&record)) {
if (record.size() < WriteBatchInternal::kHeader) {
reporter_.Corruption(
record.size(), Status::Corruption("very small log record"));
continue;
} else {
assert(internal || started_);
assert(!internal || !started_);
UpdateCurrentWriteBatch(record);
if (internal && !started_) {
started_ = true;
}
return;
}
}
if (current_file_index_ < files_->size() - 1) {
++current_file_index_;
Status s = OpenLogReader(files_->at(current_file_index_).get());
if (!s.ok()) {
is_valid_ = false;
current_status_ = s;
return;
}
} else {
is_valid_ = false;
if (current_last_seq_ == versions_->LastSequence()) {
current_status_ = Status::OK();
} else {
const char* msg = "Create a new iterator to fetch the new tail.";
current_status_ = Status::TryAgain(msg);
}
return;
}
}
}
bool TransactionLogIteratorImpl::IsBatchExpected(
const WriteBatch* batch, const SequenceNumber expected_seq) {
assert(batch);
SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
if (batchSeq != expected_seq) {
char buf[200];
snprintf(buf, sizeof(buf),
"Discontinuity in log records. Got seq=%" PRIu64
", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
".Log iterator will reseek the correct batch.",
batchSeq, expected_seq, versions_->LastSequence());
reporter_.Info(buf);
return false;
}
return true;
}
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
std::unique_ptr<WriteBatch> batch(new WriteBatch());
Status s = WriteBatchInternal::SetContents(batch.get(), record);
s.PermitUncheckedError();
SequenceNumber expected_seq = current_last_seq_ + 1;
if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
if (current_file_index_ != 0) {
current_file_index_--;
}
}
starting_sequence_number_ = expected_seq;
current_status_ = Status::NotFound("Gap in sequence numbers");
return SeekToStartSequence(current_file_index_, !seq_per_batch_);
}
struct BatchCounter : public WriteBatch::Handler {
SequenceNumber sequence_;
BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
Status MarkNoop(bool empty_batch) override {
if (!empty_batch) {
sequence_++;
}
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
sequence_++;
return Status::OK();
}
Status MarkCommit(const Slice&) override {
sequence_++;
return Status::OK();
}
Status PutCF(uint32_t , const Slice& ,
const Slice& ) override {
return Status::OK();
}
Status DeleteCF(uint32_t , const Slice& ) override {
return Status::OK();
}
Status SingleDeleteCF(uint32_t , const Slice& ) override {
return Status::OK();
}
Status MergeCF(uint32_t , const Slice& ,
const Slice& ) override {
return Status::OK();
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
};
current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
if (seq_per_batch_) {
BatchCounter counter(current_batch_seq_);
batch->Iterate(&counter);
current_last_seq_ = counter.sequence_;
} else {
current_last_seq_ =
current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
}
assert(current_last_seq_ <= versions_->LastSequence());
current_batch_ = std::move(batch);
is_valid_ = true;
current_status_ = Status::OK();
}
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
std::unique_ptr<SequentialFileReader> file;
Status s = OpenLogFile(log_file, &file);
if (!s.ok()) {
return s;
}
assert(file);
current_log_reader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, log_file->LogNumber()));
return Status::OK();
}
} #endif