#include <cinttypes>
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "logging/logging.h"
#include "memtable/wbwi_memtable.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
namespace ROCKSDB_NAMESPACE {
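// Convenience write APIs. Each overload validates user-defined-timestamp
// expectations for the column family (FailIfCfHasTs / FailIfTsMismatchCf)
// and then delegates to the DB:: base implementation, which wraps the
// operation in a WriteBatch and funnels it through Write().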
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::Put(o, column_family, key, val);
}
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& val) {
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
return DB::Put(o, column_family, key, ts, val);
}
Status DBImpl::PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::PutEntity(options, column_family, key, columns);
}
Status DBImpl::PutEntity(const WriteOptions& options, const Slice& key,
const AttributeGroups& attribute_groups) {
for (const AttributeGroup& ag : attribute_groups) {
const Status s = FailIfCfHasTs(ag.column_family());
if (!s.ok()) {
return s;
}
}
return DB::PutEntity(options, key, attribute_groups);
}
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
if (!cfh->cfd()->ioptions().merge_operator) {
return Status::NotSupported("Provide a merge_operator when opening DB");
} else {
return DB::Merge(o, column_family, key, val);
}
}
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& val) {
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
return DB::Merge(o, column_family, key, ts, val);
}
Status DBImpl::Delete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::Delete(write_options, column_family, key);
}
Status DBImpl::Delete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
return DB::Delete(write_options, column_family, key, ts);
}
Status DBImpl::SingleDelete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family,
const Slice& key) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::SingleDelete(write_options, column_family, key);
}
Status DBImpl::SingleDelete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
return DB::SingleDelete(write_options, column_family, key, ts);
}
Status DBImpl::DeleteRange(const WriteOptions& write_options,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::DeleteRange(write_options, column_family, begin_key, end_key);
}
Status DBImpl::DeleteRange(const WriteOptions& write_options,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts);
if (!s.ok()) {
return s;
}
return DB::DeleteRange(write_options, column_family, begin_key, end_key, ts);
}
void DBImpl::SetRecoverableStatePreReleaseCallback(
PreReleaseCallback* callback) {
recoverable_state_pre_release_callback_.reset(callback);
}
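// DB::Write() and its callback-taking variants. When
// WriteOptions::protection_bytes_per_key is set, the batch's protection
// info is brought up to date before handing off to WriteImpl().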
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
    s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,
                  /*user_write_cb=*/nullptr,
                  /*wal_used=*/nullptr);
}
return s;
}
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
UserWriteCallback* user_write_cb) {
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, callback, user_write_cb);
}
return s;
}
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
UserWriteCallback* user_write_cb) {
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, nullptr, user_write_cb);
}
return s;
}
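// Ingests a WriteBatchWithIndex without replaying it into the active
// memtables: an empty placeholder batch runs through the regular write path
// to reserve sequence numbers, and the WBWI itself is then installed as an
// immutable memtable via IngestWBWIAsMemtable(). The WBWI is not written to
// the WAL, so the caller must set disableWAL.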
Status DBImpl::IngestWriteBatchWithIndex(
const WriteOptions& write_options,
std::shared_ptr<WriteBatchWithIndex> wbwi) {
if (!wbwi) {
return Status::InvalidArgument("Batch is nullptr!");
}
if (!write_options.disableWAL) {
return Status::NotSupported(
"IngestWriteBatchWithIndex does not support disableWAL=true");
}
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
wbwi->GetWriteBatch(), write_options.protection_bytes_per_key);
}
if (s.ok()) {
WriteBatch dummy_empty_batch;
    s = WriteImpl(
        write_options, &dummy_empty_batch, /*callback=*/nullptr,
        /*user_write_cb=*/nullptr, /*wal_used=*/nullptr, /*log_ref=*/0,
        /*disable_memtable=*/false, /*seq_used=*/nullptr,
        /*batch_cnt=*/0, /*pre_release_callback=*/nullptr,
        /*post_memtable_callback=*/nullptr, wbwi);
}
return s;
}
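// Installs `wbwi` as an immutable WBWIMemTable in every column family it
// touches. Each affected column family's mutable memtable is switched out
// first so the ingested entries (covering `assigned_seqno`) sit in their own
// immutable memtable, and flushes are scheduled for the affected column
// families afterwards. REQUIRES: assigned_seqno lies entirely above the
// currently published last sequence.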
Status DBImpl::IngestWBWIAsMemtable(
std::shared_ptr<WriteBatchWithIndex> wbwi,
const WBWIMemTable::SeqnoRange& assigned_seqno, uint64_t min_prep_log,
SequenceNumber last_seqno_after_ingest, bool memtable_updated,
bool ignore_missing_cf) {
assert(assigned_seqno.upper_bound <= last_seqno_after_ingest);
assert(assigned_seqno.lower_bound > versions_->LastSequence());
autovector<ReadOnlyMemTable*> memtables;
autovector<ColumnFamilyData*> cfds;
InstrumentedMutexLock lock(&mutex_);
ColumnFamilySet* cf_set = versions_->GetColumnFamilySet();
for (const auto& [cf_id, stat] : wbwi->GetCFStats()) {
ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf_id);
if (!cfd) {
if (ignore_missing_cf) {
continue;
}
for (auto mem : memtables) {
mem->Unref();
delete mem;
}
for (auto cfd_ptr : cfds) {
cfd_ptr->UnrefAndTryDelete();
}
Status s = Status::InvalidArgument(
"Invalid column family id from WriteBatchWithIndex: " +
std::to_string(cf_id));
if (memtable_updated) {
        s = Status::Corruption(
            "Part of the write batch is applied. Memtable is in an "
            "inconsistent state. " +
            s.ToString());
error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
}
return s;
}
WBWIMemTable* wbwi_memtable =
new WBWIMemTable(wbwi, cfd->user_comparator(), cf_id, &cfd->ioptions(),
&cfd->GetLatestMutableCFOptions(), stat);
wbwi_memtable->Ref();
wbwi_memtable->AssignSequenceNumbers(assigned_seqno);
wbwi_memtable->SetMinPrepLog(min_prep_log);
memtables.push_back(wbwi_memtable);
cfd->Ref();
cfds.push_back(cfd);
}
autovector<ColumnFamilyData*> cfds_for_atomic_flush;
if (immutable_db_options_.atomic_flush) {
SelectColumnFamiliesForAtomicFlush(&cfds_for_atomic_flush);
for (auto cfd : cfds_for_atomic_flush) {
bool found = false;
for (auto existing_cfd : cfds) {
if (existing_cfd == cfd) {
found = true;
break;
}
}
if (!found) {
cfd->Ref();
cfds.push_back(cfd);
}
}
}
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
WaitForPendingWrites();
Status s;
for (size_t i = 0; i < cfds.size(); ++i) {
WriteContext write_context;
if (i < memtables.size()) {
s = SwitchMemtable(cfds[i], &write_context, memtables[i],
last_seqno_after_ingest);
} else {
s = SwitchMemtable(cfds[i], &write_context);
}
if (!s.ok()) {
assert(i == 0);
      if (i != 0 || memtable_updated) {
        s = Status::Corruption(
            "Part of the write batch is applied. Memtable is in an "
            "inconsistent state. " +
            s.ToString());
        error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
      } else {
        // Nothing has been applied to the memtables yet, so the error can
        // simply be returned to the caller.
      }
for (size_t j = i; j < memtables.size(); j++) {
memtables[j]->Unref();
delete memtables[j];
}
break;
}
}
for (size_t i = 0; i < cfds.size(); ++i) {
if (cfds[i]->UnrefAndTryDelete()) {
cfds[i] = nullptr;
}
}
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (s.ok()) {
for (const auto cfd : cfds) {
if (cfd == nullptr) {
continue;
}
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kExternalFileIngestion,
&flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kExternalFileIngestion,
&flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
return s;
}
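// The central write entry point. After validating the batch against the
// options, it dispatches to WriteImplWALOnly() (second write queue or
// unordered writes), PipelinedWriteImpl(), or falls through to the default
// batched path below, where the group leader writes one WAL record for the
// whole group and memtable inserts may run in parallel across writers.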
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
UserWriteCallback* user_write_cb, uint64_t* wal_used,
uint64_t log_ref, bool disable_memtable,
uint64_t* seq_used, size_t batch_cnt,
PreReleaseCallback* pre_release_callback,
PostMemTableCallback* post_memtable_callback,
std::shared_ptr<WriteBatchWithIndex> wbwi) {
assert(!seq_per_batch_ || batch_cnt != 0);
assert(my_batch == nullptr || my_batch->Count() == 0 ||
write_options.protection_bytes_per_key == 0 ||
write_options.protection_bytes_per_key ==
my_batch->GetProtectionBytesPerKey());
if (my_batch == nullptr) {
return Status::InvalidArgument("Batch is nullptr!");
} else if (!disable_memtable &&
WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) {
return Status::InvalidArgument("write batch must have timestamp(s) set");
} else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&
write_options.rate_limiter_priority != Env::IO_USER) {
return Status::InvalidArgument(
"WriteOptions::rate_limiter_priority only allows "
"Env::IO_TOTAL and Env::IO_USER due to implementation constraints");
} else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&
(write_options.disableWAL || manual_wal_flush_)) {
return Status::InvalidArgument(
"WriteOptions::rate_limiter_priority currently only supports "
"rate-limiting automatic WAL flush, which requires "
"`WriteOptions::disableWAL` and "
"`DBOptions::manual_wal_flush` both set to false");
} else if (write_options.protection_bytes_per_key != 0 &&
write_options.protection_bytes_per_key != 8) {
return Status::InvalidArgument(
"`WriteOptions::protection_bytes_per_key` must be zero or eight");
} else if (write_options.disableWAL &&
immutable_db_options_.recycle_log_file_num > 0 &&
!(two_write_queues_ && disable_memtable)) {
return Status::InvalidArgument(
"WriteOptions::disableWAL option is not supported if "
"DBOptions::recycle_log_file_num > 0");
}
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_ && !tracer_->IsWriteOrderPreserved()) {
tracer_->Write(my_batch).PermitUncheckedError();
}
}
if (write_options.sync && write_options.disableWAL) {
return Status::InvalidArgument("Sync writes has to enable WAL.");
}
if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with seq_per_batch");
}
if (immutable_db_options_.unordered_write &&
immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with unordered_write");
}
if (immutable_db_options_.enable_pipelined_write &&
post_memtable_callback != nullptr) {
return Status::NotSupported(
"pipelined write currently does not honor post_memtable_callback");
}
if (seq_per_batch_ && post_memtable_callback != nullptr) {
return Status::NotSupported(
"seq_per_batch currently does not honor post_memtable_callback");
}
if (my_batch->HasDeleteRange() && immutable_db_options_.row_cache) {
return Status::NotSupported(
"DeleteRange is not compatible with row cache.");
}
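  // A WriteBatchWithIndex is ingested either as part of a transaction commit
  // (my_batch carries the commit record and log_ref points at the prepare
  // log, so its entries are presumably already durable) or standalone with
  // the WAL disabled; see IngestWriteBatchWithIndex().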
bool ingest_wbwi_for_commit = false;
if (wbwi) {
if (my_batch->HasCommit()) {
ingest_wbwi_for_commit = true;
assert(log_ref);
} else {
assert(write_options.disableWAL);
}
assert(!callback);
if (immutable_db_options_.unordered_write) {
return Status::NotSupported(
"Ingesting WriteBatch does not support unordered_write");
}
if (immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"Ingesting WriteBatch does not support pipelined_write");
}
if (!wbwi->GetOverwriteKey()) {
return Status::NotSupported(
"WriteBatchWithIndex ingestion requires overwrite_key=true");
}
}
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
disable_memtable);
if (write_options.low_pri) {
Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
if (!s.ok()) {
return s;
}
}
if (two_write_queues_ && disable_memtable) {
AssignOrder assign_order =
seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
return WriteImplWALOnly(
&nonmem_write_thread_, write_options, my_batch, callback, user_write_cb,
wal_used, log_ref, seq_used, batch_cnt, pre_release_callback,
assign_order, kDontPublishLastSeq, disable_memtable);
}
if (immutable_db_options_.unordered_write) {
const size_t sub_batch_cnt = batch_cnt != 0
? batch_cnt
: WriteBatchInternal::Count(my_batch);
uint64_t seq = 0;
Status status = WriteImplWALOnly(
&write_thread_, write_options, my_batch, callback, user_write_cb,
wal_used, log_ref, &seq, sub_batch_cnt, pre_release_callback,
kDoAssignOrder, kDoPublishLastSeq, disable_memtable);
TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
if (!status.ok()) {
return status;
}
if (seq_used) {
*seq_used = seq;
}
if (!disable_memtable) {
TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable");
status = UnorderedWriteMemtable(write_options, my_batch, callback,
log_ref, seq, sub_batch_cnt);
}
return status;
}
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, user_write_cb,
wal_used, log_ref, disable_memtable, seq_used);
}
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
log_ref, disable_memtable, batch_cnt,
pre_release_callback, post_memtable_callback,
wbwi != nullptr);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) {
write_thread_.SetMemWritersEachStride(&w);
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
if (w.ShouldWriteToMemtable()) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
          batch_per_txn_, write_options.memtable_insert_hint_per_batch);
PERF_TIMER_START(write_pre_and_post_process_time);
}
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
auto last_sequence = w.write_group->last_sequence;
for (auto* tmp_w : *(w.write_group)) {
assert(tmp_w);
if (tmp_w->post_memtable_callback) {
Status tmp_s =
(*tmp_w->post_memtable_callback)(last_sequence, disable_memtable);
assert(tmp_s.ok());
}
}
if (w.status.ok()) { versions_->SetLastSequence(last_sequence);
} else {
HandleMemTableInsertFailure(w.status);
}
write_thread_.ExitAsBatchGroupFollower(&w);
}
assert(w.state == WriteThread::STATE_COMPLETED);
}
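  // A follower whose write was already committed by its group leader is
  // marked STATE_COMPLETED; only the results need to be propagated.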
if (w.state == WriteThread::STATE_COMPLETED) {
if (wal_used != nullptr) {
*wal_used = w.wal_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
return w.FinalStatus();
}
assert(w.state == WriteThread::STATE_GROUP_LEADER);
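  // This writer is the group leader: preprocess the write, form the group,
  // write the WAL on behalf of all followers, then coordinate the memtable
  // inserts and publish the new last sequence.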
Status status;
WriteContext write_context;
WalContext wal_context(write_options.sync);
WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = kMaxSequenceNumber;
assert(!two_write_queues_ || !disable_memtable);
{
PERF_TIMER_STOP(write_pre_and_post_process_time);
status = PreprocessWrite(write_options, &wal_context, &write_context);
if (!two_write_queues_) {
last_sequence = versions_->LastSequence();
}
PERF_TIMER_START(write_pre_and_post_process_time);
}
TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
if (wbwi) {
assert(write_group.size == 1);
}
IOStatus io_s;
Status pre_release_cb_status;
size_t seq_inc = 0;
if (status.ok()) {
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size > 1;
size_t total_count = 0;
size_t valid_batches = 0;
size_t total_byte_size = 0;
size_t pre_release_callback_cnt = 0;
for (auto* writer : write_group) {
assert(writer);
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
parallel = parallel && !writer->batch->HasMerge();
}
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_ && tracer_->IsWriteOrderPreserved()) {
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
if (wbwi && !ingest_wbwi_for_commit) {
tracer_->Write(wbwi->GetWriteBatch()).PermitUncheckedError();
} else {
tracer_->Write(writer->batch).PermitUncheckedError();
}
}
}
}
seq_inc = seq_per_batch_ ? valid_batches : total_count;
if (wbwi) {
seq_inc += wbwi->GetWriteBatch()->Count();
}
const bool concurrent_update = two_write_queues_;
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count,
concurrent_update);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
concurrent_update);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
write_done_by_other, concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
assert(wal_context.wal_file_number_size);
wal_context.prev_size = wal_context.writer->file()->GetFileSize();
PERF_TIMER_GUARD(write_wal_time);
io_s = WriteGroupToWAL(write_group, wal_context.writer, wal_used,
wal_context.need_wal_sync,
wal_context.need_wal_dir_sync, last_sequence + 1,
*wal_context.wal_file_number_size);
}
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
io_s = ConcurrentWriteGroupToWAL(write_group, wal_used, &last_sequence,
seq_inc);
} else {
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
status = io_s;
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
if (wal_context.need_wal_sync) {
VersionEdit synced_wals;
wal_write_mutex_.Lock();
if (status.ok()) {
MarkLogsSynced(cur_wal_number_, wal_context.need_wal_dir_sync,
&synced_wals);
} else {
MarkLogsNotSynced(cur_wal_number_);
}
wal_write_mutex_.Unlock();
if (status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
const ReadOptions read_options;
status = ApplyWALToManifest(read_options, write_options, &synced_wals);
}
if (status.ok() && two_write_queues_) {
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
status = SyncWAL();
}
}
}
if (status.ok()) {
SequenceNumber next_sequence = current_sequence;
size_t index = 0;
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = next_sequence;
if (writer->pre_release_callback) {
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->wal_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = pre_release_cb_status = ws;
break;
}
}
if (seq_per_batch_) {
assert(writer->batch_cnt);
next_sequence += writer->batch_cnt;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
}
if (status.ok()) {
PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time);
if (!parallel) {
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, &trim_history_scheduler_,
write_options.ignore_missing_column_families,
            0 /*recovery_log_number*/, this, seq_per_batch_, batch_per_txn_);
} else {
write_group.last_sequence = last_sequence;
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*concurrent_memtable_writes*/, seq_per_batch_,
              w.batch_cnt, batch_per_txn_,
              write_options.memtable_insert_hint_per_batch);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
if (!io_s.ok()) {
WALIOStatusCheck(io_s);
}
if (!w.CallbackFailed()) {
if (!io_s.ok()) {
assert(pre_release_cb_status.ok());
} else {
WriteStatusCheck(pre_release_cb_status);
}
} else {
assert(pre_release_cb_status.ok());
}
bool should_exit_batch_group = true;
if (in_parallel_group) {
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
if (wbwi && status.ok() && w.status.ok()) {
uint32_t wbwi_count = wbwi->GetWriteBatch()->Count();
if (wbwi_count) {
uint32_t memtable_update_count = w.batch->Count();
assert(seq_inc == memtable_update_count + wbwi_count);
assert(wbwi_count > 0);
assert(last_sequence != kMaxSequenceNumber);
SequenceNumber lb = last_sequence + 1 - wbwi_count;
SequenceNumber ub = last_sequence;
if (two_write_queues_) {
assert(ub <= versions_->LastAllocatedSequence());
}
      status = IngestWBWIAsMemtable(
          wbwi, {lb, ub}, /*min_prep_log=*/log_ref,
          /*last_seqno_after_ingest=*/last_sequence,
          /*memtable_updated=*/memtable_update_count > 0,
          /*ignore_missing_cf=*/write_options.ignore_missing_column_families);
RecordTick(stats_, NUMBER_WBWI_INGEST);
}
}
if (should_exit_batch_group) {
if (status.ok()) {
for (auto* tmp_w : write_group) {
assert(tmp_w);
if (tmp_w->post_memtable_callback) {
Status tmp_s =
(*tmp_w->post_memtable_callback)(last_sequence, disable_memtable);
assert(tmp_s.ok());
}
}
if (w.status.ok()) { versions_->SetLastSequence(last_sequence);
}
}
if (!w.status.ok()) {
if (wal_context.prev_size < SIZE_MAX) {
InstrumentedMutexLock l(&wal_write_mutex_);
if (logs_.back().number == wal_context.wal_file_number_size->number) {
logs_.back().SetAttemptTruncateSize(wal_context.prev_size);
}
}
HandleMemTableInsertFailure(w.status);
}
write_thread_.ExitAsBatchGroupLeader(write_group, status);
}
if (status.ok()) {
status = w.FinalStatus();
}
return status;
}
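// Pipelined write path (enable_pipelined_write): WAL appends and memtable
// inserts are handled by two chained writer queues, so one group can insert
// into the memtables while the next group is writing the WAL.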
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
UserWriteCallback* user_write_cb,
uint64_t* wal_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);
WriteContext write_context;
  WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
                        log_ref, disable_memtable, /*batch_cnt=*/0,
                        /*pre_release_callback=*/nullptr);
write_thread_.JoinBatchGroup(&w);
TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
if (w.state == WriteThread::STATE_GROUP_LEADER) {
WriteThread::WriteGroup wal_write_group;
if (w.callback && !w.callback->AllowWriteBatching()) {
write_thread_.WaitForMemTableWriters();
}
WalContext wal_context(!write_options.disableWAL && write_options.sync);
PERF_TIMER_STOP(write_pre_and_post_process_time);
w.status = PreprocessWrite(write_options, &wal_context, &write_context);
PERF_TIMER_START(write_pre_and_post_process_time);
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
const SequenceNumber current_sequence =
write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
size_t total_count = 0;
size_t total_byte_size = 0;
if (w.status.ok()) {
SequenceNumber next_sequence = current_sequence;
for (auto* writer : wal_write_group) {
assert(writer);
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
writer->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(writer->batch);
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
next_sequence += count;
total_count += count;
}
}
}
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) {
for (auto* writer : wal_write_group) {
if (writer->CallbackFailed()) {
continue;
}
tracer_->Write(writer->batch).PermitUncheckedError();
}
}
}
if (w.disable_wal) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
}
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
IOStatus io_s;
io_s.PermitUncheckedError();
if (w.status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
if (wal_write_group.size > 1) {
stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
wal_write_group.size - 1);
RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
}
assert(wal_context.wal_file_number_size);
WalFileNumberSize& wal_file_number_size =
*(wal_context.wal_file_number_size);
io_s = WriteGroupToWAL(wal_write_group, wal_context.writer, wal_used,
wal_context.need_wal_sync,
wal_context.need_wal_dir_sync, current_sequence,
wal_file_number_size);
w.status = io_s;
}
if (!io_s.ok()) {
WALIOStatusCheck(io_s);
} else if (!w.CallbackFailed()) {
WriteStatusCheck(w.status);
}
VersionEdit synced_wals;
if (wal_context.need_wal_sync) {
InstrumentedMutexLock l(&wal_write_mutex_);
if (w.status.ok()) {
MarkLogsSynced(cur_wal_number_, wal_context.need_wal_dir_sync,
&synced_wals);
} else {
MarkLogsNotSynced(cur_wal_number_);
}
}
if (w.status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
const ReadOptions read_options;
w.status = ApplyWALToManifest(read_options, write_options, &synced_wals);
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}
WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time);
assert(w.ShouldWriteToMemtable());
write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
if (memtable_write_group.size > 1 &&
immutable_db_options_.allow_concurrent_memtable_write) {
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
memtable_write_group.status = WriteBatchInternal::InsertInto(
memtable_write_group, w.sequence, column_family_memtables_.get(),
&flush_scheduler_, &trim_history_scheduler_,
          write_options.ignore_missing_column_families,
          0 /*recovery_log_number*/, this, seq_per_batch_, batch_per_txn_);
      if (memtable_write_group.status.ok()) {
        versions_->SetLastSequence(memtable_write_group.last_sequence);
} else {
HandleMemTableInsertFailure(memtable_write_group.status);
}
write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
}
} else {
memtable_write_group.status.PermitUncheckedError();
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_CALLER) {
write_thread_.SetMemWritersEachStride(&w);
}
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time);
assert(w.ShouldWriteToMemtable());
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);
PERF_TIMER_STOP(write_memtable_time);
PERF_TIMER_START(write_pre_and_post_process_time);
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
if (w.status.ok()) { versions_->SetLastSequence(w.write_group->last_sequence);
} else {
HandleMemTableInsertFailure(w.status);
}
write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
assert(w.state == WriteThread::STATE_COMPLETED);
return w.FinalStatus();
}
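// Memtable phase of an unordered write: the WAL record was already written
// and sequence numbers assigned in WriteImplWALOnly(), so this only inserts
// into the memtables and wakes up a pending memtable switch once the number
// of in-flight memtable writes drops to zero.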
Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback, uint64_t log_ref,
SequenceNumber seq,
const size_t sub_batch_cnt) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);
  WriteThread::Writer w(write_options, my_batch, callback,
                        /*user_write_cb=*/nullptr, log_ref,
                        false /*disable_memtable*/);
if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
w.sequence = seq;
size_t total_count = WriteBatchInternal::Count(my_batch);
InternalStats* stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_FOR_WAIT_GUARD(write_memtable_time);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
&trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
        seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
write_options.memtable_insert_hint_per_batch);
if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed);
}
PERF_TIMER_START(write_pre_and_post_process_time);
}
size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
if (pending_cnt == 0) {
std::lock_guard<std::mutex> lck(switch_mutex_);
switch_cv_.notify_all();
}
WriteStatusCheck(w.status);
if (!w.FinalStatus().ok()) {
return w.FinalStatus();
}
return Status::OK();
}
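// Write path that appends to the WAL without touching the memtables. Used
// by the second (nonmem) write queue under two_write_queues_ and as the WAL
// phase of unordered writes, whose memtable inserts follow in
// UnorderedWriteMemtable().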
Status DBImpl::WriteImplWALOnly(
WriteThread* write_thread, const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
UserWriteCallback* user_write_cb, uint64_t* wal_used,
const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
const PublishLastSeq publish_last_seq, const bool disable_memtable) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
log_ref, disable_memtable, sub_batch_cnt,
pre_release_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);
write_thread->JoinBatchGroup(&w);
assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
if (w.state == WriteThread::STATE_COMPLETED) {
if (wal_used != nullptr) {
*wal_used = w.wal_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
return w.FinalStatus();
}
assert(w.state == WriteThread::STATE_GROUP_LEADER);
if (publish_last_seq == kDoPublishLastSeq) {
assert(immutable_db_options_.unordered_write);
WalContext wal_context;
WriteContext write_context;
Status status =
PreprocessWrite(write_options, &wal_context, &write_context);
WriteStatusCheckOnLocked(status);
if (!status.ok()) {
WriteThread::WriteGroup write_group;
write_thread->EnterAsBatchGroupLeader(&w, &write_group);
write_thread->ExitAsBatchGroupLeader(write_group, status);
return status;
}
} else {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_FOR_WAIT_GUARD(write_delay_time);
InstrumentedMutexLock lock(&mutex_);
Status status =
        DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options);
PERF_TIMER_STOP(write_delay_time);
PERF_TIMER_START(write_pre_and_post_process_time);
if (!status.ok()) {
WriteThread::WriteGroup write_group;
write_thread->EnterAsBatchGroupLeader(&w, &write_group);
write_thread->ExitAsBatchGroupLeader(write_group, status);
return status;
}
}
WriteThread::WriteGroup write_group;
uint64_t last_sequence;
write_thread->EnterAsBatchGroupLeader(&w, &write_group);
size_t pre_release_callback_cnt = 0;
size_t total_byte_size = 0;
for (auto* writer : write_group) {
assert(writer);
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
}
}
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) {
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
tracer_->Write(writer->batch).PermitUncheckedError();
}
}
}
const bool concurrent_update = true;
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size,
concurrent_update);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1,
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
write_done_by_other, concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_wal_time);
  size_t seq_inc = 0 /* total count */;
if (assign_order == kDoAssignOrder) {
size_t total_batch_cnt = 0;
for (auto* writer : write_group) {
assert(writer->batch_cnt || !seq_per_batch_);
if (!writer->CallbackFailed()) {
total_batch_cnt += writer->batch_cnt;
}
}
seq_inc = total_batch_cnt;
}
Status status;
if (!write_options.disableWAL) {
IOStatus io_s = ConcurrentWriteGroupToWAL(write_group, wal_used,
&last_sequence, seq_inc);
status = io_s;
if (!io_s.ok()) {
WALIOStatusCheck(io_s);
write_thread->ExitAsBatchGroupLeader(write_group, status);
return status;
}
} else {
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
size_t memtable_write_cnt = 0;
auto curr_seq = last_sequence + 1;
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = curr_seq;
if (assign_order == kDoAssignOrder) {
assert(writer->batch_cnt || !seq_per_batch_);
curr_seq += writer->batch_cnt;
}
if (!writer->disable_memtable) {
memtable_write_cnt++;
}
}
if (status.ok() && write_options.sync) {
assert(!write_options.disableWAL);
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
status = SyncWAL();
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
WriteStatusCheck(status);
}
if (status.ok()) {
size_t index = 0;
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->wal_used, index++,
pre_release_callback_cnt);
if (!ws.ok()) {
status = ws;
break;
}
}
}
}
if (publish_last_seq == kDoPublishLastSeq) {
versions_->SetLastSequence(last_sequence + seq_inc);
assert(immutable_db_options_.unordered_write);
}
if (immutable_db_options_.unordered_write && status.ok()) {
pending_memtable_writes_ += memtable_write_cnt;
}
write_thread->ExitAsBatchGroupLeader(write_group, status);
if (status.ok()) {
status = w.FinalStatus();
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
return status;
}
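// WriteStatusCheck(OnLocked) and WALIOStatusCheck escalate a failed write
// status to the error handler under mutex_ when paranoid checks are
// enabled; Busy and Incomplete are expected (stalls, callback failures) and
// are not treated as background errors.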
void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
InstrumentedMutexLock l(&mutex_);
assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!status.IsBusy() && !status.IsIncomplete()) {
error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
}
}
void DBImpl::WriteStatusCheck(const Status& status) {
assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!status.IsBusy() && !status.IsIncomplete()) {
mutex_.Lock();
error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
mutex_.Unlock();
}
}
void DBImpl::WALIOStatusCheck(const IOStatus& io_status) {
if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
!io_status.IsBusy() && !io_status.IsIncomplete()) ||
io_status.IsIOFenced()) {
mutex_.Lock();
error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback,
true);
mutex_.Unlock();
} else {
logs_.back().writer->file()->reset_seen_error();
}
}
void DBImpl::HandleMemTableInsertFailure(const Status& status) {
assert(!status.ok());
mutex_.Lock();
assert(!error_handler_.IsBGWorkStopped());
error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
mutex_.Unlock();
}
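// Runs before a write group proceeds: surfaces a stopped-DB error, switches
// the WAL or schedules flushes when the WAL or the write buffers outgrow
// their limits, trims memtable history, applies write delays/stalls, and
// fills in the WalContext (current WAL writer, sync requirements) used by
// the caller.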
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
WalContext* wal_context,
WriteContext* write_context) {
assert(write_context != nullptr && wal_context != nullptr);
Status status;
if (error_handler_.IsDBStopped()) {
InstrumentedMutexLock l(&mutex_);
status = error_handler_.GetBGError();
}
PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);
if (UNLIKELY(status.ok() &&
wals_total_size_.LoadRelaxed() > GetMaxTotalWalSize())) {
assert(versions_);
InstrumentedMutexLock l(&mutex_);
const ColumnFamilySet* const column_families =
versions_->GetColumnFamilySet();
assert(column_families);
size_t num_cfs = column_families->NumberOfColumnFamilies();
assert(num_cfs >= 1);
if (num_cfs > 1) {
WaitForPendingWrites();
status = SwitchWAL(write_context);
}
}
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
InstrumentedMutexLock l(&mutex_);
WaitForPendingWrites();
status = HandleWriteBufferManagerFlush(write_context);
}
if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
InstrumentedMutexLock l(&mutex_);
status = TrimMemtableHistory(write_context);
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
InstrumentedMutexLock l(&mutex_);
WaitForPendingWrites();
status = ScheduleFlushes(write_context);
}
PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
PERF_TIMER_GUARD(write_pre_and_post_process_time);
if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
write_controller_.NeedsDelay()))) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_FOR_WAIT_GUARD(write_delay_time);
InstrumentedMutexLock l(&mutex_);
status = DelayWrite(last_batch_group_size_, write_thread_, write_options);
PERF_TIMER_START(write_pre_and_post_process_time);
}
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldStall())) {
default_cf_internal_stats_->AddDBStats(
InternalStats::kIntStatsWriteBufferManagerLimitStopsCounts, 1,
        true /* concurrent */);
if (write_options.no_slowdown) {
status = Status::Incomplete("Write stall");
} else {
InstrumentedMutexLock l(&mutex_);
WriteBufferManagerStallWrites();
}
}
InstrumentedMutexLock l(&wal_write_mutex_);
if (status.ok() && wal_context->need_wal_sync) {
while (logs_.front().IsSyncing()) {
wal_sync_cv_.Wait();
}
for (auto& log : logs_) {
log.PrepareForSync();
}
} else {
wal_context->need_wal_sync = false;
}
wal_context->writer = logs_.back().writer;
wal_context->need_wal_dir_sync =
wal_context->need_wal_dir_sync && !wal_dir_synced_;
wal_context->wal_file_number_size = std::addressof(alive_wal_files_.back());
return status;
}
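// Merges the write group's batches into a single batch for one WAL record.
// A sole writer with no WAL termination point contributes its batch
// directly; otherwise the batches are appended into `tmp_batch`.
// `write_with_wal` returns the number of batches that went into the WAL and
// `to_be_cached_state` the batch flagged as latest persistent state, if any.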
Status DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, WriteBatch** merged_batch,
size_t* write_with_wal,
WriteBatch** to_be_cached_state) {
assert(write_with_wal != nullptr);
assert(tmp_batch != nullptr);
assert(*to_be_cached_state == nullptr);
*write_with_wal = 0;
auto* leader = write_group.leader;
assert(!leader->disable_wal); if (write_group.size == 1 && !leader->CallbackFailed() &&
leader->batch->GetWalTerminationPoint().is_cleared()) {
*merged_batch = leader->batch;
if (WriteBatchInternal::IsLatestPersistentState(*merged_batch)) {
*to_be_cached_state = *merged_batch;
}
*write_with_wal = 1;
} else {
*merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
Status s = WriteBatchInternal::Append(*merged_batch, writer->batch,
true);
if (!s.ok()) {
tmp_batch->Clear();
return s;
}
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
*to_be_cached_state = writer->batch;
}
(*write_with_wal)++;
}
}
}
return Status::OK();
}
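// Appends one (possibly merged) batch to the given WAL writer, verifying
// the batch checksum first and updating the WAL size accounting. With
// manual_wal_flush_ on a single write queue, the append is serialized with
// FlushWAL() via wal_write_mutex_.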
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
const WriteOptions& write_options,
log::Writer* log_writer, uint64_t* wal_used,
uint64_t* log_size,
WalFileNumberSize& wal_file_number_size,
SequenceNumber sequence) {
assert(log_size != nullptr);
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
TEST_SYNC_POINT_CALLBACK("DBImpl::WriteToWAL:log_entry", &log_entry);
auto s = merged_batch.VerifyChecksum();
if (!s.ok()) {
return status_to_io_status(std::move(s));
}
*log_size = log_entry.size();
const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
if (UNLIKELY(needs_locking)) {
wal_write_mutex_.Lock();
}
IOStatus io_s = log_writer->MaybeAddUserDefinedTimestampSizeRecord(
write_options, versions_->GetColumnFamiliesTimestampSizeForRecord());
if (!io_s.ok()) {
return io_s;
}
io_s = log_writer->AddRecord(write_options, log_entry, sequence);
if (UNLIKELY(needs_locking)) {
wal_write_mutex_.Unlock();
}
if (wal_used != nullptr) {
*wal_used = cur_wal_number_;
assert(*wal_used == wal_file_number_size.number);
}
wals_total_size_.FetchAddRelaxed(log_entry.size());
wal_file_number_size.AddSize(*log_size);
wal_empty_ = false;
return io_s;
}
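// WAL write for the default (single write queue) path: merges the group's
// batches, appends them as one record, and optionally syncs the WAL file(s)
// and the WAL directory before recording the group's statistics.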
IOStatus DBImpl::WriteGroupToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* wal_used,
bool need_wal_sync, bool need_wal_dir_sync,
SequenceNumber sequence,
WalFileNumberSize& wal_file_number_size) {
IOStatus io_s;
assert(!two_write_queues_);
assert(!write_group.leader->disable_wal);
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
WriteBatch* merged_batch;
io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch_, &merged_batch,
&write_with_wal, &to_be_cached_state));
if (UNLIKELY(!io_s.ok())) {
return io_s;
}
if (merged_batch == write_group.leader->batch) {
write_group.leader->wal_used = cur_wal_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
writer->wal_used = cur_wal_number_;
}
}
WriteBatchInternal::SetSequence(merged_batch, sequence);
uint64_t log_size;
WriteOptions write_options;
write_options.rate_limiter_priority =
write_group.leader->rate_limiter_priority;
io_s = WriteToWAL(*merged_batch, write_options, log_writer, wal_used,
&log_size, wal_file_number_size, sequence);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
if (io_s.ok() && need_wal_sync) {
StopWatch sw(immutable_db_options_.clock, stats_, WAL_FILE_SYNC_MICROS);
const bool needs_locking = manual_wal_flush_ && !two_write_queues_;
if (UNLIKELY(needs_locking)) {
wal_write_mutex_.Lock();
}
if (io_s.ok()) {
for (auto& log : logs_) {
IOOptions opts;
io_s = WritableFileWriter::PrepareIOOptions(write_options, opts);
if (!io_s.ok()) {
break;
}
if (auto* f = log.writer->file()) {
io_s = f->Sync(opts, immutable_db_options_.use_fsync);
if (!io_s.ok()) {
break;
}
}
}
}
if (UNLIKELY(needs_locking)) {
wal_write_mutex_.Unlock();
}
if (io_s.ok() && need_wal_dir_sync) {
io_s = directories_.GetWalDir()->FsyncWithDirOptions(
IOOptions(), nullptr,
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
}
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
if (io_s.ok()) {
auto stats = default_cf_internal_stats_;
if (need_wal_sync) {
stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
RecordTick(stats_, WAL_FILE_SYNCED);
}
stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size);
RecordTick(stats_, WAL_FILE_BYTES, log_size);
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
for (auto* writer : write_group) {
if (!writer->CallbackFailed()) {
writer->CheckPostWalWriteCallback();
}
}
}
return io_s;
}
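// WAL write used when two queues may append concurrently (two_write_queues_
// or unordered_write): the sequence allocation and the WAL append happen
// atomically under wal_write_mutex_ so records stay in sequence order.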
IOStatus DBImpl::ConcurrentWriteGroupToWAL(
const WriteThread::WriteGroup& write_group, uint64_t* wal_used,
SequenceNumber* last_sequence, size_t seq_inc) {
IOStatus io_s;
assert(two_write_queues_ || immutable_db_options_.unordered_write);
assert(!write_group.leader->disable_wal);
WriteBatch tmp_batch;
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
WriteBatch* merged_batch;
io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch, &merged_batch,
&write_with_wal, &to_be_cached_state));
if (UNLIKELY(!io_s.ok())) {
return io_s;
}
wal_write_mutex_.Lock();
if (merged_batch == write_group.leader->batch) {
write_group.leader->wal_used = cur_wal_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
writer->wal_used = cur_wal_number_;
}
}
*last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
auto sequence = *last_sequence + 1;
WriteBatchInternal::SetSequence(merged_batch, sequence);
log::Writer* log_writer = logs_.back().writer;
WalFileNumberSize& wal_file_number_size = alive_wal_files_.back();
assert(log_writer->get_log_number() == wal_file_number_size.number);
uint64_t log_size;
WriteOptions write_options;
write_options.rate_limiter_priority =
write_group.leader->rate_limiter_priority;
io_s = WriteToWAL(*merged_batch, write_options, log_writer, wal_used,
&log_size, wal_file_number_size, sequence);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
wal_write_mutex_.Unlock();
if (io_s.ok()) {
const bool concurrent = true;
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
concurrent);
RecordTick(stats_, WAL_FILE_BYTES, log_size);
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
concurrent);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
for (auto* writer : write_group) {
if (!writer->CallbackFailed()) {
writer->CheckPostWalWriteCallback();
}
}
}
return io_s;
}
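// Inserts the cached batch that was flagged as the latest persistent state
// (see MergeBatch()) into the memtables, allocating fresh sequence numbers
// and invoking the registered pre-release callback per sub-batch. Only used
// with seq_per_batch_. REQUIRES: mutex_ is held.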
Status DBImpl::WriteRecoverableState() {
mutex_.AssertHeld();
if (!cached_recoverable_state_empty_) {
assert(seq_per_batch_);
bool dont_care_bool;
SequenceNumber next_seq;
if (two_write_queues_) {
wal_write_mutex_.Lock();
}
SequenceNumber seq;
if (two_write_queues_) {
seq = versions_->FetchAddLastAllocatedSequence(0);
} else {
seq = versions_->LastSequence();
}
WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
auto status = WriteBatchInternal::InsertInto(
&cached_recoverable_state_, column_family_memtables_.get(),
&flush_scheduler_, &trim_history_scheduler_, true,
        0 /*recovery_log_number*/, this, false /*concurrent_memtable_writes*/,
        &next_seq, &dont_care_bool, seq_per_batch_);
auto last_seq = next_seq - 1;
    if (status.ok()) {
      if (two_write_queues_) {
versions_->FetchAddLastAllocatedSequence(last_seq - seq);
versions_->SetLastPublishedSequence(last_seq);
}
versions_->SetLastSequence(last_seq);
} else {
HandleMemTableInsertFailure(status);
}
if (two_write_queues_) {
wal_write_mutex_.Unlock();
}
if (status.ok() && recoverable_state_pre_release_callback_) {
const bool DISABLE_MEMTABLE = true;
for (uint64_t sub_batch_seq = seq + 1;
sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
uint64_t const no_log_num = 0;
mutex_.Unlock();
status = recoverable_state_pre_release_callback_->Callback(
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1);
mutex_.Lock();
}
}
    if (status.ok()) {
      cached_recoverable_state_.Clear();
      cached_recoverable_state_empty_ = true;
    } else {
      // Keep the cached state so a later attempt can retry the insert.
    }
return status;
}
return Status::OK();
}
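// Selects the column families to include in an atomic flush: every
// initialized, non-dropped candidate with unflushed data (any candidate
// qualifies during a recovery flush). If no candidates are provided, all
// column families are considered. REQUIRES: mutex_ is held.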
void DBImpl::SelectColumnFamiliesForAtomicFlush(
autovector<ColumnFamilyData*>* selected_cfds,
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
FlushReason flush_reason) {
mutex_.AssertHeld();
assert(selected_cfds);
autovector<ColumnFamilyData*> candidate_cfds;
if (provided_candidate_cfds.empty()) {
for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized()) {
cfd->Ref();
candidate_cfds.push_back(cfd);
}
}
} else {
candidate_cfds = provided_candidate_cfds;
}
for (ColumnFamilyData* cfd : candidate_cfds) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load() ||
IsRecoveryFlush(flush_reason)) {
selected_cfds->push_back(cfd);
}
}
if (provided_candidate_cfds.empty()) {
for (auto candidate_cfd : candidate_cfds) {
candidate_cfd->UnrefAndTryDelete();
}
}
}
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
assert(immutable_db_options_.atomic_flush);
auto seq = versions_->LastSequence();
for (auto cfd : cfds) {
if (cfd) {
cfd->imm()->AssignAtomicFlushSeq(seq);
}
}
}
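// Invoked when the total size of the live WALs exceeds GetMaxTotalWalSize().
// Marks the oldest live WAL as getting flushed and switches the memtables
// that still depend on it, so the WAL can be obsoleted once those flushes
// finish. Under 2PC, the oldest WAL cannot be released while it still
// contains an uncommitted prepared transaction.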
Status DBImpl::SwitchWAL(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
Status status;
if (alive_wal_files_.begin()->getting_flushed) {
return status;
}
auto oldest_alive_log = alive_wal_files_.begin()->number;
bool flush_wont_release_oldest_log = false;
if (allow_2pc()) {
auto oldest_log_with_uncommitted_prep =
logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
assert(oldest_log_with_uncommitted_prep == 0 ||
oldest_log_with_uncommitted_prep >= oldest_alive_log);
if (oldest_log_with_uncommitted_prep > 0 &&
oldest_log_with_uncommitted_prep == oldest_alive_log) {
if (unable_to_release_oldest_log_) {
return status;
} else {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to release oldest log due to uncommitted transaction");
unable_to_release_oldest_log_ = true;
flush_wont_release_oldest_log = true;
}
}
}
if (!flush_wont_release_oldest_log) {
unable_to_release_oldest_log_ = false;
alive_wal_files_.begin()->getting_flushed = true;
}
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
oldest_alive_log, wals_total_size_.LoadRelaxed(), GetMaxTotalWalSize());
autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {
SelectColumnFamiliesForAtomicFlush(&cfds);
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
cfds.push_back(cfd);
}
}
MaybeFlushStatsCF(&cfds);
}
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
for (const auto cfd : cfds) {
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
}
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (status.ok()) {
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
}
for (auto cfd : cfds) {
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
return status;
}
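// Invoked when the WriteBufferManager signals that its memory limit has
// been reached. With atomic_flush every column family with data is flushed;
// otherwise the column family whose live memtable was created earliest is
// picked, since flushing it frees the memory held the longest.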
Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
Status status;
autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {
SelectColumnFamiliesForAtomicFlush(&cfds);
} else {
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) {
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
}
if (cfd_picked != nullptr) {
cfds.push_back(cfd_picked);
}
MaybeFlushStatsCF(&cfds);
}
if (!cfds.empty()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing triggered to alleviate write buffer memory usage. Write "
"buffer is using %" ROCKSDB_PRIszt
" bytes out of a total of %" ROCKSDB_PRIszt ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
}
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
for (const auto cfd : cfds) {
if (cfd->mem()->IsEmpty()) {
continue;
}
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->UnrefAndTryDelete();
if (!status.ok()) {
break;
}
}
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (status.ok()) {
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
}
for (const auto cfd : cfds) {
cfd->imm()->FlushRequested();
if (!immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager,
&flush_req);
EnqueuePendingFlush(flush_req);
}
}
if (immutable_db_options_.atomic_flush) {
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req);
EnqueuePendingFlush(flush_req);
}
MaybeScheduleFlushOrCompaction();
}
return status;
}
uint64_t DBImpl::GetMaxTotalWalSize() const {
uint64_t max_total_wal_size =
max_total_wal_size_.load(std::memory_order_acquire);
if (max_total_wal_size > 0) {
return max_total_wal_size;
}
return 4 * max_total_in_memory_state_.load(std::memory_order_acquire);
}
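// Applies write throttling dictated by the WriteController: first sleeps
// out any bounded delay, then blocks on bg_cv_ while writes are completely
// stopped. Returns Incomplete for no_slowdown writers that would otherwise
// have to wait. REQUIRES: mutex_ is held and this thread is at the front of
// the writer queue.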
Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
const WriteOptions& write_options) {
mutex_.AssertHeld();
uint64_t start_time = 0;
bool delayed = false;
{
uint64_t delay;
if (&write_thread == &write_thread_) {
delay =
write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
} else {
assert(num_bytes == 0);
delay = 0;
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Start");
start_time = immutable_db_options_.clock->NowMicros();
if (delay > 0) {
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
}
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
write_thread.BeginWriteStall();
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone");
const uint64_t kDelayInterval = 1001;
uint64_t stall_end = start_time + delay;
while (write_controller_.NeedsDelay()) {
if (immutable_db_options_.clock->NowMicros() >= stall_end) {
break;
}
delayed = true;
immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
write_thread.EndWriteStall();
}
while ((error_handler_.GetBGError().ok() ||
error_handler_.IsRecoveryInProgress()) &&
write_controller_.IsStopped() &&
!shutting_down_.load(std::memory_order_relaxed)) {
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
}
delayed = true;
write_thread.BeginWriteStall();
if (&write_thread == &write_thread_) {
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
} else {
TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait");
}
bg_cv_.Wait();
TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_);
write_thread.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);
if (delayed) {
auto time_delayed = immutable_db_options_.clock->NowMicros() - start_time;
default_cf_internal_stats_->AddDBStats(
InternalStats::kIntStatsWriteStallMicros, time_delayed);
RecordTick(stats_, STALL_MICROS, time_delayed);
RecordInHistogram(stats_, WRITE_STALL, time_delayed);
}
Status s;
if (write_controller_.IsStopped()) {
if (!shutting_down_.load(std::memory_order_relaxed)) {
s = Status::Incomplete(error_handler_.GetBGError().ToString());
} else {
s = Status::ShutdownInProgress("stalled writes");
}
}
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
}
return s;
}
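// Blocks the write queue until the WriteBufferManager unblocks the
// wbm_stall_ object, i.e. until enough memtable memory has been freed.
// REQUIRES: mutex_ is held.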
void DBImpl::WriteBufferManagerStallWrites() {
mutex_.AssertHeld();
write_thread_.BeginWriteStall();
mutex_.Unlock();
static_cast<WBMStallInterface*>(wbm_stall_.get())
->SetState(WBMStallInterface::State::BLOCKED);
write_buffer_manager_->BeginWriteStall(wbm_stall_.get());
wbm_stall_->Block();
mutex_.Lock();
write_thread_.EndWriteStall();
}
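// Rate-limits a low-priority write when the write controller reports that
// compaction needs to catch up. Under 2PC, commit and rollback batches are
// exempt, so only the prepare phase pays the throttling cost. Other batches
// either fail fast with Status::Incomplete when no_slowdown is set, or
// drain their data size through the low-pri rate limiter before the write
// proceeds.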
Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
WriteBatch* my_batch) {
assert(write_options.low_pri);
if (write_controller_.NeedSpeedupCompaction()) {
if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) {
return Status::OK();
}
if (write_options.no_slowdown) {
return Status::Incomplete("Low priority write stall");
} else {
assert(my_batch != nullptr);
PERF_TIMER_FOR_WAIT_GUARD(write_delay_time);
auto data_size = my_batch->GetDataSize();
while (data_size > 0) {
size_t allowed = write_controller_.low_pri_rate_limiter()->RequestToken(
data_size, 0 /* alignment */, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kWrite);
data_size -= allowed;
}
}
}
return Status::OK();
}
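// When stats persistence to disk is enabled, decides whether the hidden
// stats column family must be flushed along with `cfds`: if every other CF
// already writes to a newer WAL than the stats CF, the stats CF alone would
// pin the old WAL, so it is appended to `cfds` for a forced flush.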
void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
assert(cfds != nullptr);
if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
ColumnFamilyData* cfd_stats =
versions_->GetColumnFamilySet()->GetColumnFamily(
kPersistentStatsColumnFamilyName);
if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
for (ColumnFamilyData* cfd : *cfds) {
if (cfd == cfd_stats) {
return;
}
}
bool force_flush_stats_cf = true;
for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
if (loop_cfd == cfd_stats) {
continue;
}
if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
force_flush_stats_cf = false;
}
}
if (force_flush_stats_cf) {
cfds->push_back(cfd_stats);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Force flushing stats CF with automated flush "
"to avoid holding old logs");
}
}
}
}
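// Drains the trim-history scheduler: for each scheduled column family,
// trims the list of flushed-but-retained memtables down to the configured
// history budget (max_write_buffer_size_to_maintain) and installs a new
// superversion whenever memtables were actually freed.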
Status DBImpl::TrimMemtableHistory(WriteContext* context) {
autovector<ColumnFamilyData*> cfds;
ColumnFamilyData* tmp_cfd;
while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) !=
nullptr) {
cfds.push_back(tmp_cfd);
}
for (auto& cfd : cfds) {
autovector<MemTable*> to_delete;
bool trimmed = cfd->imm()->TrimHistory(&context->memtables_to_free_,
cfd->mem()->MemoryAllocatedBytes());
if (trimmed) {
context->superversion_context.NewSuperVersion();
assert(context->superversion_context.new_superversion.get() != nullptr);
cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
}
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
}
return Status::OK();
}
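// Switches the memtable of every column family picked up from the flush
// scheduler (or of all eligible CFs when atomic_flush is set) and enqueues
// the corresponding kWriteBufferFull flush requests. With two write queues,
// the second (WAL-only) queue is entered unbatched so memtable/WAL
// switching cannot race with writes published through that queue.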
Status DBImpl::ScheduleFlushes(WriteContext* context) {
autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {
SelectColumnFamiliesForAtomicFlush(&cfds);
for (auto cfd : cfds) {
cfd->Ref();
}
flush_scheduler_.Clear();
} else {
ColumnFamilyData* tmp_cfd;
while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfds.push_back(tmp_cfd);
}
MaybeFlushStatsCF(&cfds);
}
Status status;
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::ScheduleFlushes:PreSwitchMemtable",
nullptr);
for (auto& cfd : cfds) {
if (status.ok() && !cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, context);
}
if (cfd->UnrefAndTryDelete()) {
cfd = nullptr;
}
}
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
if (status.ok()) {
if (immutable_db_options_.atomic_flush) {
AssignAtomicFlushSeq(cfds);
FlushRequest flush_req;
GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req);
EnqueuePendingFlush(flush_req);
} else {
for (auto* cfd : cfds) {
FlushRequest flush_req;
GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req);
EnqueuePendingFlush(flush_req);
}
}
MaybeScheduleFlushOrCompaction();
}
return status;
}
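// Invokes EventListener::OnMemTableSealed for all registered listeners.
// mutex_ is released around the callbacks, so listeners run without the DB
// mutex held; the notification is skipped entirely during shutdown.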
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
const MemTableInfo& mem_table_info) {
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
mutex_.Unlock();
for (const auto& listener : immutable_db_options_.listeners) {
listener->OnMemTableSealed(mem_table_info);
}
mutex_.Lock();
}
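// REQUIRES: mutex_ is held, and this thread is at the front of the writer
// queue(s).
// Seals cfd's active memtable into its immutable list and installs a fresh
// one, creating (or recycling, when configured) a new WAL if the current
// WAL is non-empty. If `new_imm` is non-null, the caller supplies an extra
// ready-made immutable memtable: it is assigned a memtable ID, tagged with
// the new WAL number, and added alongside the sealed memtable, with
// `last_seqno` becoming the new active memtable's starting sequence number.
// On failure the background error is set and returned.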
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
ReadOnlyMemTable* new_imm,
SequenceNumber last_seqno) {
mutex_.AssertHeld();
assert(lock_wal_count_ == 0);
const WriteOptions write_options;
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
IOStatus io_s;
Status s = WriteRecoverableState();
if (!s.ok()) {
return s;
}
assert(versions_->prev_log_number() == 0);
if (two_write_queues_) {
wal_write_mutex_.Lock();
}
bool creating_new_log = !wal_empty_;
if (two_write_queues_) {
wal_write_mutex_.Unlock();
}
uint64_t recycle_log_number = 0;
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
!wal_recycle_files_.empty() && IsFileDeletionsEnabled()) {
recycle_log_number = wal_recycle_files_.front();
}
uint64_t new_log_number =
creating_new_log ? versions_->NewFileNumber() : cur_wal_number_;
const MutableCFOptions mutable_cf_options_copy =
cfd->GetLatestMutableCFOptions();
MemTableInfo memtable_info;
memtable_info.cf_name = cfd->GetName();
memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
memtable_info.num_entries = cfd->mem()->NumEntries();
memtable_info.num_deletes = cfd->mem()->NumDeletion();
if (!cfd->ioptions().persist_user_defined_timestamps &&
cfd->user_comparator()->timestamp_size() > 0) {
const Slice& newest_udt = cfd->mem()->GetNewestUDT();
memtable_info.newest_udt.assign(newest_udt.data(), newest_udt.size());
}
int num_imm_unflushed = cfd->imm()->NumNotFlushed();
const auto preallocate_block_size =
GetWalPreallocateBlockSize(mutable_cf_options_copy.write_buffer_size);
mutex_.Unlock();
if (creating_new_log) {
PredecessorWALInfo info;
wal_write_mutex_.Lock();
if (!logs_.empty()) {
log::Writer* cur_log_writer = logs_.back().writer;
info = PredecessorWALInfo(cur_log_writer->get_log_number(),
cur_log_writer->file()->GetFileSize(),
cur_log_writer->GetLastSeqnoRecorded());
}
wal_write_mutex_.Unlock();
io_s = CreateWAL(write_options, new_log_number, recycle_log_number,
preallocate_block_size, info, &new_log);
if (s.ok()) {
s = io_s;
}
}
if (s.ok()) {
SequenceNumber seq;
if (new_imm) {
assert(last_seqno > versions_->LastSequence());
seq = last_seqno;
} else {
seq = versions_->LastSequence();
}
new_mem = cfd->ConstructNewMemtable(mutable_cf_options_copy, seq);
context->superversion_context.NewSuperVersion();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64
". Immutable memtables: %d.\n",
cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
cfd->mem()->ConstructFragmentedRangeTombstones();
}
mutex_.Lock();
if (recycle_log_number != 0) {
assert(wal_recycle_files_.front() == recycle_log_number);
wal_recycle_files_.pop_front();
}
if (s.ok() && creating_new_log) {
InstrumentedMutexLock l(&wal_write_mutex_);
assert(new_log != nullptr);
if (!logs_.empty()) {
// Flush any buffered data of the outgoing WAL before the new one takes
// over, so its records reach the file in order.
log::Writer* cur_log_writer = logs_.back().writer;
if (error_handler_.IsRecoveryInProgress()) {
cur_log_writer->file()->reset_seen_error();
}
io_s = cur_log_writer->WriteBuffer(write_options);
if (s.ok()) {
s = io_s;
}
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
" WAL file\n",
cfd->GetName().c_str(), cur_log_writer->get_log_number(),
new_log_number);
}
}
if (s.ok()) {
cur_wal_number_ = new_log_number;
wal_empty_ = true;
wal_dir_synced_ = false;
logs_.emplace_back(cur_wal_number_, new_log);
alive_wal_files_.emplace_back(cur_wal_number_);
}
}
if (!s.ok()) {
assert(creating_new_log);
delete new_mem;
delete new_log;
context->superversion_context.new_superversion.reset();
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable,
true /* WAL-related error (assumed parameter meaning) */);
} else {
error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
}
s = error_handler_.GetBGError();
return s;
}
bool empty_cf_updated = false;
// In non-2PC mode a WAL becomes obsolete once no CF has unflushed data in
// it. Bumping the empty CFs' log numbers below can obsolete WALs, so the
// WAL deletion is recorded in the MANIFEST first, then the empty CFs are
// updated.
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
!immutable_db_options_.allow_2pc && creating_new_log) {
uint64_t min_wal_number_to_keep =
versions_->PreComputeMinLogNumberWithUnflushedData(cur_wal_number_);
if (min_wal_number_to_keep >
versions_->GetWalSet().GetMinWalNumberToKeep()) {
const ReadOptions read_options;
autovector<ColumnFamilyData*> empty_cfs;
for (auto cf : *versions_->GetColumnFamilySet()) {
if (cf->IsEmpty()) {
empty_cfs.push_back(cf);
}
}
VersionEdit wal_deletion;
wal_deletion.DeleteWalsBefore(min_wal_number_to_keep);
s = versions_->LogAndApplyToDefaultColumnFamily(
read_options, write_options, &wal_deletion, &mutex_,
directories_.GetDbDir());
if (!s.ok() && versions_->io_status().IsIOError()) {
error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
if (!s.ok()) {
return s;
}
for (auto cf : empty_cfs) {
if (cf->IsEmpty()) {
cf->SetLogNumber(cur_wal_number_);
cf->mem()->SetCreationSeq(versions_->LastSequence());
}
}
empty_cf_updated = true;
}
}
if (!empty_cf_updated) {
for (auto cf : *versions_->GetColumnFamilySet()) {
if (cf->IsEmpty()) {
if (creating_new_log) {
cf->SetLogNumber(cur_wal_number_);
}
cf->mem()->SetCreationSeq(versions_->LastSequence());
}
}
}
cfd->mem()->SetNextLogNumber(cur_wal_number_);
assert(new_mem != nullptr);
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
if (new_imm) {
cfd->AssignMemtableID(new_imm);
new_imm->SetNextLogNumber(cur_wal_number_);
cfd->imm()->Add(new_imm, &context->memtables_to_free_);
}
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context);
NotifyOnMemTableSealed(cfd, memtable_info);
// s.ok() holds here (asserted below), so any earlier io_s failure has
// already been folded into s; mark io_s as checked.
io_s.PermitUncheckedError();
assert(s.ok());
return s;
}
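// Heuristic for the WAL's preallocation block size: the write buffer size
// plus 10% slack (e.g. a 64MB write buffer yields roughly 70.4MB), then
// capped by max_total_wal_size, db_write_buffer_size, and the write buffer
// manager's buffer size wherever those limits are in effect.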
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
mutex_.AssertHeld();
size_t bsize =
static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
if (mutable_db_options_.max_total_wal_size > 0) {
bsize = std::min<size_t>(
bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
}
if (immutable_db_options_.db_write_buffer_size > 0) {
bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
}
if (immutable_db_options_.write_buffer_manager &&
immutable_db_options_.write_buffer_manager->enabled()) {
bsize = std::min<size_t>(
bsize, immutable_db_options_.write_buffer_manager->buffer_size());
}
return bsize;
}
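// Default implementations of the DB write convenience methods. Each builds
// a single-entry WriteBatch, configured with the write options' per-key
// protection and, for the timestamped variants, the default CF comparator's
// timestamp size, and funnels it through Write(). For example,
//   db->Put(WriteOptions(), cf, "key", "value");
// behaves like constructing a WriteBatch, calling
// batch.Put(cf, "key", "value"), and then db->Write(WriteOptions(), &batch).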
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch(key.size() + value.size() + 24, /*max_bytes=*/0,
opt.protection_bytes_per_key, /*default_cf_ts_sz=*/0);
Status s = batch.Put(column_family, key, value);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.Put(column_family, key, ts, value);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) {
const ColumnFamilyHandle* const default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
options.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
const Status s = batch.PutEntity(column_family, key, columns);
if (!s.ok()) {
return s;
}
return Write(options, &batch);
}
Status DB::PutEntity(const WriteOptions& options, const Slice& key,
const AttributeGroups& attribute_groups) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
options.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
const Status s = batch.PutEntity(key, attribute_groups);
if (!s.ok()) {
return s;
}
return Write(options, &batch);
}
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key, /*default_cf_ts_sz=*/0);
Status s = batch.Delete(column_family, key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.Delete(column_family, key, ts);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::SingleDelete(const WriteOptions& opt,
ColumnFamilyHandle* column_family, const Slice& key) {
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key, /*default_cf_ts_sz=*/0);
Status s = batch.SingleDelete(column_family, key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::SingleDelete(const WriteOptions& opt,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.SingleDelete(column_family, key, ts);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::DeleteRange(const WriteOptions& opt,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key, /*default_cf_ts_sz=*/0);
Status s = batch.DeleteRange(column_family, begin_key, end_key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::DeleteRange(const WriteOptions& opt,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.DeleteRange(column_family, begin_key, end_key, ts);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key, /*default_cf_ts_sz=*/0);
Status s = batch.Merge(column_family, key, value);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.Merge(column_family, key, ts, value);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
}