#include "rocksdb/write_batch.h"
#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/lang.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// Bit flags summarizing which kinds of records a WriteBatch holds. They are
// OR-ed into a uint32_t, so each enumerator occupies its own bit.
//
// DEFERRED marks a batch whose flags have not been computed yet; the
// remaining bits each stand for one record kind.
enum ContentFlags : uint32_t {
  DEFERRED = 0x001,
  HAS_PUT = 0x002,
  HAS_DELETE = 0x004,
  HAS_SINGLE_DELETE = 0x008,
  HAS_MERGE = 0x010,
  HAS_BEGIN_PREPARE = 0x020,
  HAS_END_PREPARE = 0x040,
  HAS_COMMIT = 0x080,
  HAS_ROLLBACK = 0x100,
  HAS_DELETE_RANGE = 0x200,
  HAS_BLOB_INDEX = 0x400,
  HAS_BEGIN_UNPREPARE = 0x800,
};
struct BatchContentClassifier : public WriteBatch::Handler {
uint32_t content_flags = 0;
Status PutCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_PUT;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE;
return Status::OK();
}
Status SingleDeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_SINGLE_DELETE;
return Status::OK();
}
Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE_RANGE;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_MERGE;
return Status::OK();
}
Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_BLOB_INDEX;
return Status::OK();
}
Status MarkBeginPrepare(bool unprepare) override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
if (unprepare) {
content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
}
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
content_flags |= ContentFlags::HAS_END_PREPARE;
return Status::OK();
}
Status MarkCommit(const Slice&) override {
content_flags |= ContentFlags::HAS_COMMIT;
return Status::OK();
}
Status MarkRollback(const Slice&) override {
content_flags |= ContentFlags::HAS_ROLLBACK;
return Status::OK();
}
};
class TimestampAssigner : public WriteBatch::Handler {
public:
explicit TimestampAssigner(const Slice& ts,
WriteBatch::ProtectionInfo* prot_info)
: timestamp_(ts),
timestamps_(kEmptyTimestampList),
prot_info_(prot_info) {}
explicit TimestampAssigner(const std::vector<Slice>& ts_list,
WriteBatch::ProtectionInfo* prot_info)
: timestamps_(ts_list), prot_info_(prot_info) {
SanityCheck();
}
~TimestampAssigner() override {}
Status PutCF(uint32_t, const Slice& key, const Slice&) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice& key) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status SingleDeleteCF(uint32_t, const Slice& key) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status DeleteRangeCF(uint32_t, const Slice& begin_key,
const Slice& ) override {
AssignTimestamp(begin_key);
++idx_;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkBeginPrepare(bool) override {
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
return Status::OK();
}
Status MarkCommit(const Slice&) override {
return Status::OK();
}
Status MarkRollback(const Slice&) override {
return Status::OK();
}
private:
void SanityCheck() const {
assert(!timestamps_.empty());
#ifndef NDEBUG
const size_t ts_sz = timestamps_[0].size();
for (size_t i = 1; i != timestamps_.size(); ++i) {
assert(ts_sz == timestamps_[i].size());
}
#endif }
void AssignTimestamp(const Slice& key) {
assert(timestamps_.empty() || idx_ < timestamps_.size());
const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
size_t ts_sz = ts.size();
char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
if (prot_info_ != nullptr) {
Slice old_ts(ptr, ts_sz), new_ts(ts.data(), ts_sz);
prot_info_->entries_[idx_].UpdateT(old_ts, new_ts);
}
memcpy(ptr, ts.data(), ts_sz);
}
static const std::vector<Slice> kEmptyTimestampList;
const Slice timestamp_;
const std::vector<Slice>& timestamps_;
WriteBatch::ProtectionInfo* const prot_info_;
size_t idx_ = 0;
TimestampAssigner(const TimestampAssigner&) = delete;
TimestampAssigner(TimestampAssigner&&) = delete;
TimestampAssigner& operator=(const TimestampAssigner&) = delete;
TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
}
// Holder for a batch's save points: a LIFO stack of SavePoint values stored
// in an autovector.
struct SavePoints {
  std::stack<SavePoint, autovector<SavePoint>> stack;
};
// Creates an empty batch with no timestamp. The buffer capacity is
// `reserved_bytes`, but never less than the fixed header, and the buffer is
// sized to hold exactly the header.
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
  size_t initial_capacity = WriteBatchInternal::kHeader;
  if (reserved_bytes > initial_capacity) {
    initial_capacity = reserved_bytes;
  }
  rep_.reserve(initial_capacity);
  rep_.resize(WriteBatchInternal::kHeader);
}
// Creates an empty batch whose keys carry `ts_sz` bytes of timestamp. The
// buffer capacity is `reserved_bytes`, but never less than the fixed header,
// and the buffer is sized to hold exactly the header.
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  size_t initial_capacity = WriteBatchInternal::kHeader;
  if (reserved_bytes > initial_capacity) {
    initial_capacity = reserved_bytes;
  }
  rep_.reserve(initial_capacity);
  rep_.resize(WriteBatchInternal::kHeader);
}
// Creates an empty batch with `ts_sz` bytes of timestamp per key and optional
// per-key protection info. `protection_bytes_per_key` must be 0 (protection
// disabled) or 8; any non-zero value allocates a ProtectionInfo.
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz,
                       size_t protection_bytes_per_key)
    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
  assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
  const bool protect_entries = (protection_bytes_per_key != 0);
  if (protect_entries) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
  }

  // Capacity is never smaller than the fixed header.
  size_t initial_capacity = WriteBatchInternal::kHeader;
  if (reserved_bytes > initial_capacity) {
    initial_capacity = reserved_bytes;
  }
  rep_.reserve(initial_capacity);
  rep_.resize(WriteBatchInternal::kHeader);
}
// Wraps a copy of an already-serialized batch. The content flags are marked
// DEFERRED, so they are derived from the bytes the first time they are needed.
WriteBatch::WriteBatch(const std::string& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep),
      timestamp_size_(0) {
}
// Takes ownership of an already-serialized batch without copying it. The
// content flags are marked DEFERRED, so they are derived from the bytes the
// first time they are needed.
WriteBatch::WriteBatch(std::string&& rep)
    : content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(std::move(rep)),
      timestamp_size_(0) {
}
// Copy constructor. Save points and protection info are deep-copied into
// freshly allocated holders, and only when the source has them.
WriteBatch::WriteBatch(const WriteBatch& src)
    : wal_term_point_(src.wal_term_point_),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      rep_(src.rep_),
      timestamp_size_(src.timestamp_size_) {
  const bool has_save_points = (src.save_points_ != nullptr);
  if (has_save_points) {
    save_points_.reset(new SavePoints());
    save_points_->stack = src.save_points_->stack;
  }

  const bool has_protection = (src.prot_info_ != nullptr);
  if (has_protection) {
    prot_info_.reset(new WriteBatch::ProtectionInfo());
    prot_info_->entries_ = src.prot_info_->entries_;
  }
}
// Move constructor: takes over the source's save points, WAL termination
// point, protection info and serialized bytes; scalar state is copied.
WriteBatch::WriteBatch(WriteBatch&& src) noexcept
    : save_points_(std::move(src.save_points_)),
      wal_term_point_(std::move(src.wal_term_point_)),
      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
      max_bytes_(src.max_bytes_),
      prot_info_(std::move(src.prot_info_)),
      rep_(std::move(src.rep_)),
      timestamp_size_(src.timestamp_size_) {
}
// Copy assignment, implemented by destroying this object and copy-constructing
// a new one in the same storage. Self-assignment is a no-op.
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (this == &src) {
    return *this;
  }
  this->~WriteBatch();
  new (this) WriteBatch(src);
  return *this;
}
// Move assignment, implemented by destroying this object and move-constructing
// a new one in the same storage. Self-assignment is a no-op.
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (this == &src) {
    return *this;
  }
  this->~WriteBatch();
  new (this) WriteBatch(std::move(src));
  return *this;
}
// Nothing to release by hand; members clean up after themselves.
WriteBatch::~WriteBatch() = default;
// Out-of-line destructor for the handler base class; it has no work to do.
WriteBatch::Handler::~Handler() = default;
// Default implementation: log-data blobs are ignored.
void WriteBatch::Handler::LogData(const Slice& /*blob*/) {}
// Default implementation: always asks the iteration to keep going.
bool WriteBatch::Handler::Continue() { return true; }
// Resets the batch to empty: the buffer shrinks back to a zero-filled header,
// the content flags are cleared, and save points, protection entries and the
// WAL termination point are dropped.
void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(WriteBatchInternal::kHeader);

  content_flags_.store(0, std::memory_order_relaxed);

  if (save_points_ != nullptr) {
    auto& pending = save_points_->stack;
    while (!pending.empty()) {
      pending.pop();
    }
  }

  if (prot_info_ != nullptr) {
    prot_info_->entries_.clear();
  }

  wal_term_point_.clear();
}
// Number of entries recorded in the batch header.
uint32_t WriteBatch::Count() const {
  return WriteBatchInternal::Count(this);
}
// Returns the content flags. If they are still marked DEFERRED, the batch is
// scanned once with a classifier and the result is cached for later calls.
uint32_t WriteBatch::ComputeContentFlags() const {
  const auto cached = content_flags_.load(std::memory_order_relaxed);
  if ((cached & ContentFlags::DEFERRED) == 0) {
    return cached;
  }

  BatchContentClassifier classifier;
  // The iteration status is deliberately ignored; whatever flags were
  // collected are used.
  Iterate(&classifier).PermitUncheckedError();
  content_flags_.store(classifier.content_flags, std::memory_order_relaxed);
  return classifier.content_flags;
}
// Snapshots the batch's current data size, entry count and content flags as
// the WAL termination point.
void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_.load();
}
// Protection bytes kept per key, or 0 when the batch has no protection info.
size_t WriteBatch::GetProtectionBytesPerKey() const {
  if (prot_info_ == nullptr) {
    return 0;
  }
  return prot_info_->GetBytesPerKey();
}
// True when the HAS_PUT content flag is set for this batch.
bool WriteBatch::HasPut() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_PUT);
}
// True when the HAS_DELETE content flag is set for this batch.
bool WriteBatch::HasDelete() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_DELETE);
}
// True when the HAS_SINGLE_DELETE content flag is set for this batch.
bool WriteBatch::HasSingleDelete() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_SINGLE_DELETE);
}
// True when the HAS_DELETE_RANGE content flag is set for this batch.
bool WriteBatch::HasDeleteRange() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_DELETE_RANGE);
}
// True when the HAS_MERGE content flag is set for this batch.
bool WriteBatch::HasMerge() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_MERGE);
}
// Extracts the key of the record at the front of `input`.
//
// Skips the one-byte tag, then (when `cf_record` is true) the varint32
// column family id, and finally reads the length-prefixed key into `*key`.
// `input` is advanced past everything that was consumed.
//
// Returns false if the record is truncated: no tag byte, a bad column family
// varint, or a bad length-prefixed key.
bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
  assert(input != nullptr && key != nullptr);
  // Removing a prefix from an empty slice is only caught by an assert in
  // debug builds, so an empty input is rejected explicitly here.
  if (input->empty()) {
    return false;
  }
  // Skip the tag byte.
  input->remove_prefix(1);

  if (cf_record) {
    // The column family id is decoded only to step over it.
    uint32_t cf = 0;
    if (!GetVarint32(input, &cf)) {
      return false;
    }
  }

  return GetLengthPrefixedSlice(input, key);
}
// True when the HAS_BEGIN_PREPARE content flag is set for this batch.
bool WriteBatch::HasBeginPrepare() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_BEGIN_PREPARE);
}
// True when the HAS_END_PREPARE content flag is set for this batch.
bool WriteBatch::HasEndPrepare() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_END_PREPARE);
}
// True when the HAS_COMMIT content flag is set for this batch.
bool WriteBatch::HasCommit() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_COMMIT);
}
// True when the HAS_ROLLBACK content flag is set for this batch.
bool WriteBatch::HasRollback() const {
  const uint32_t flags = ComputeContentFlags();
  return 0 != (flags & ContentFlags::HAS_ROLLBACK);
}
// Decodes the record at the front of `input` and advances `input` past it.
//
// Outputs:
//   *tag            the record's tag byte
//   *column_family  the id stored in the record, or 0 when the tag carries
//                   no column family id
//   *key / *value   filled for put, merge, blob-index and range-deletion
//                   records (for a range deletion, key/value hold the begin
//                   and end keys); only *key for point deletions
//   *blob           filled for log-data records (must be non-null for those)
//   *xid            filled for end-prepare, commit and rollback markers
//
// Returns Status::Corruption if a field cannot be decoded or the tag is
// unknown, Status::OK otherwise.
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;

  // Small decoding helpers shared by the cases below.
  const auto read_cf_id = [&]() { return GetVarint32(input, column_family); };
  const auto read_key = [&]() { return GetLengthPrefixedSlice(input, key); };
  const auto read_key_and_value = [&]() {
    return GetLengthPrefixedSlice(input, key) &&
           GetLengthPrefixedSlice(input, value);
  };
  const auto read_xid = [&]() { return GetLengthPrefixedSlice(input, xid); };

  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!read_cf_id()) {
        return Status::Corruption("bad WriteBatch Put");
      }
      FALLTHROUGH_INTENDED;
    case kTypeValue:
      if (!read_key_and_value()) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;

    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!read_cf_id()) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      FALLTHROUGH_INTENDED;
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!read_key()) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;

    case kTypeColumnFamilyRangeDeletion:
      if (!read_cf_id()) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      FALLTHROUGH_INTENDED;
    case kTypeRangeDeletion:
      if (!read_key_and_value()) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;

    case kTypeColumnFamilyMerge:
      if (!read_cf_id()) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      FALLTHROUGH_INTENDED;
    case kTypeMerge:
      if (!read_key_and_value()) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;

    case kTypeColumnFamilyBlobIndex:
      if (!read_cf_id()) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      FALLTHROUGH_INTENDED;
    case kTypeBlobIndex:
      if (!read_key_and_value()) {
        return Status::Corruption("bad WriteBatch BlobIndex");
      }
      break;

    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;

    // These markers consist of the tag byte only.
    case kTypeNoop:
    case kTypeBeginPrepareXID:
    case kTypeBeginPersistedPrepareXID:
    case kTypeBeginUnprepareXID:
      break;

    case kTypeEndPrepareXID:
      if (!read_xid()) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;

    case kTypeCommitXID:
      if (!read_xid()) {
        return Status::Corruption("bad Commit XID");
      }
      break;

    case kTypeRollbackXID:
      if (!read_xid()) {
        return Status::Corruption("bad Rollback XID");
      }
      break;

    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
// Feeds every record of the batch (everything after the header) to `handler`.
// Returns Status::Corruption if the buffer is too small to hold a header.
Status WriteBatch::Iterate(Handler* handler) const {
  const size_t total_bytes = rep_.size();
  if (total_bytes < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }
  return WriteBatchInternal::Iterate(this, handler, WriteBatchInternal::kHeader,
                                     total_bytes);
}
// Walks the records stored in wb->rep_ within the byte range [begin, end),
// decoding each with ReadRecordFromWriteBatch and dispatching it to the
// matching callback on `handler`.
//
// Returns:
//  - Corruption if the bounds are invalid, a record cannot be decoded, a tag
//    is unknown, a handler returns TryAgain twice in a row, or (when the whole
//    batch was iterated without the handler stopping) the number of dispatched
//    records differs from the count stored in the header;
//  - the first non-OK status produced by a handler callback or by the
//    write-policy checks on the begin-prepare markers;
//  - OK otherwise, including when handler->Continue() stops the loop early.
Status WriteBatchInternal::Iterate(const WriteBatch* wb,
WriteBatch::Handler* handler, size_t begin,
size_t end) {
if (begin > wb->rep_.size() || end > wb->rep_.size() || end < begin) {
return Status::Corruption("Invalid start/end bounds for Iterate");
}
assert(begin <= end);
Slice input(wb->rep_.data() + begin, static_cast<size_t>(end - begin));
// The header count can only be checked when every record is visited.
bool whole_batch =
(begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
Slice key, value, blob, xid;
// Passed to MarkNoop; cleared by data records, log data and begin-prepare
// markers, and set again after end-prepare, commit, rollback and noop.
bool empty_batch = true;
// Number of put/delete/single-delete/range-delete/merge/blob-index records
// whose handler callback returned OK.
uint32_t found = 0;
Status s;
char tag = 0;
uint32_t column_family = 0; bool last_was_try_again = false;
bool handler_continue = true;
// Keep going while there is input left and the last step succeeded, or while
// the handler asked for the previous record to be retried.
while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) {
handler_continue = handler->Continue();
if (!handler_continue) {
break;
}
if (LIKELY(!s.IsTryAgain())) {
// Normal path: decode the next record from the input.
last_was_try_again = false;
tag = 0;
column_family = 0;
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid);
if (!s.ok()) {
return s;
}
} else {
// Retry path: nothing is read; tag/column_family/key/value still describe
// the previous record, which is dispatched again below.
assert(s.IsTryAgain());
assert(!last_was_try_again); if (UNLIKELY(last_was_try_again)) {
return Status::Corruption(
"two consecutive TryAgain in WriteBatch handler; this is either a "
"software bug or data corruption.");
}
last_was_try_again = true;
s = Status::OK();
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value);
if (LIKELY(s.ok())) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key);
if (LIKELY(s.ok())) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key);
if (LIKELY(s.ok())) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
// For range deletions, key/value hold the begin and end keys.
s = handler->DeleteRangeCF(column_family, key, value);
if (LIKELY(s.ok())) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value);
if (LIKELY(s.ok())) {
empty_batch = false;
found++;
}
break;
case kTypeColumnFamilyBlobIndex:
case kTypeBlobIndex:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
s = handler->PutBlobIndexCF(column_family, key, value);
// Counted in `found`, but unlike the other data records this case does not
// clear empty_batch.
if (LIKELY(s.ok())) {
found++;
}
break;
case kTypeLogData:
handler->LogData(blob);
empty_batch = false;
break;
case kTypeBeginPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false;
// This marker is rejected unless the handler reports write-after-commit
// and does not report write-before-prepare.
if (!handler->WriteAfterCommit()) {
s = Status::NotSupported(
"WriteCommitted txn tag when write_after_commit_ is disabled (in "
"WritePrepared/WriteUnprepared mode). If it is not due to "
"corruption, the WAL must be emptied before changing the "
"WritePolicy.");
}
if (handler->WriteBeforePrepare()) {
s = Status::NotSupported(
"WriteCommitted txn tag when write_before_prepare_ is enabled "
"(in WriteUnprepared mode). If it is not due to corruption, the "
"WAL must be emptied before changing the WritePolicy.");
}
break;
case kTypeBeginPersistedPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false;
// This marker is rejected when the handler reports write-after-commit.
if (handler->WriteAfterCommit()) {
s = Status::NotSupported(
"WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
"is enabled (in default WriteCommitted mode). If it is not due "
"to corruption, the WAL must be emptied before changing the "
"WritePolicy.");
}
break;
case kTypeBeginUnprepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
s = handler->MarkBeginPrepare(true );
assert(s.ok());
empty_batch = false;
// This marker is rejected when the handler reports write-after-commit or
// does not report write-before-prepare.
if (handler->WriteAfterCommit()) {
s = Status::NotSupported(
"WriteUnprepared txn tag when write_after_commit_ is enabled (in "
"default WriteCommitted mode). If it is not due to corruption, "
"the WAL must be emptied before changing the WritePolicy.");
}
if (!handler->WriteBeforePrepare()) {
s = Status::NotSupported(
"WriteUnprepared txn tag when write_before_prepare_ is disabled "
"(in WriteCommitted/WritePrepared mode). If it is not due to "
"corruption, the WAL must be emptied before changing the "
"WritePolicy.");
}
break;
case kTypeEndPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
s = handler->MarkEndPrepare(xid);
assert(s.ok());
empty_batch = true;
break;
case kTypeCommitXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
s = handler->MarkCommit(xid);
assert(s.ok());
empty_batch = true;
break;
case kTypeRollbackXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
s = handler->MarkRollback(xid);
assert(s.ok());
empty_batch = true;
break;
case kTypeNoop:
s = handler->MarkNoop(empty_batch);
assert(s.ok());
empty_batch = true;
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (!s.ok()) {
return s;
}
// The header count is only verified after a complete, uninterrupted pass.
if (handler_continue && whole_batch &&
found != WriteBatchInternal::Count(wb)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
// Reads the batch's latest-persistent-state marker.
bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
  const bool marked = b->is_latest_persistent_state_;
  return marked;
}
// Sets the batch's latest-persistent-state marker. There is no counterpart
// here that clears it.
void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
  auto& marker = b->is_latest_persistent_state_;
  marker = true;
}
// Decodes the entry count: a fixed 32-bit value at byte offset 8 of the
// serialized batch.
uint32_t WriteBatchInternal::Count(const WriteBatch* b) {
  const char* count_field = b->rep_.data() + 8;
  return DecodeFixed32(count_field);
}
// Stores `n` as the entry count: a fixed 32-bit value at byte offset 8 of the
// serialized batch.
void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) {
  char* count_field = &b->rep_[8];
  EncodeFixed32(count_field, n);
}
// Decodes the sequence number: a fixed 64-bit value at the very start of the
// serialized batch.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  const char* sequence_field = b->rep_.data();
  return SequenceNumber(DecodeFixed64(sequence_field));
}
// Stores `seq` as the sequence number: a fixed 64-bit value at the very start
// of the serialized batch.
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  char* sequence_field = &b->rep_[0];
  EncodeFixed64(sequence_field, seq);
}
// Byte offset of the first record, which is the header size. The batch
// argument is unused.
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* /*b*/) {
  const size_t first_record_offset = WriteBatchInternal::kHeader;
  return first_record_offset;
}
// Appends a Put record for `key` -> `value` in the given column family.
//
// Returns InvalidArgument if the key or the value is longer than
// port::kMaxUint32 bytes; otherwise returns the result of committing the
// local save point taken before the batch was modified.
//
// When the batch has a non-zero timestamp size, the stored key is followed by
// that many zero bytes as a placeholder.
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const Slice& key, const Slice& value) {
  const size_t max_len = size_t{port::kMaxUint32};
  if (key.size() > max_len) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > max_len) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(
      static_cast<char>(default_cf ? kTypeValue : kTypeColumnFamilyValue));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }

  const size_t ts_sz = b->timestamp_size_;
  std::string timestamp(ts_sz, '\0');
  if (ts_sz != 0) {
    // The length prefix covers the user key plus the timestamp placeholder.
    PutVarint32(&b->rep_, static_cast<uint32_t>(key.size() + ts_sz));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(timestamp);
  } else {
    PutLengthPrefixedSlice(&b->rep_, key);
  }
  PutLengthPrefixedSlice(&b->rep_, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_PUT,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeValue, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a Put record.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::Put(this, cf_id, key, value);
}
// Validates the combined length of a multi-part key and a multi-part value.
// Returns InvalidArgument when either total reaches port::kMaxUint32 bytes,
// OK otherwise. The key is checked before the value.
//
// NOTE(review): the limit here is inclusive (>=), whereas the single-Slice
// Put/Merge overloads in this file use an exclusive (>) check - confirm
// whether the difference is intended.
Status WriteBatchInternal::CheckSlicePartsLength(const SliceParts& key,
                                                 const SliceParts& value) {
  // Sums the sizes of all parts of one SliceParts.
  const auto total_size = [](const SliceParts& sp) {
    size_t bytes = 0;
    for (int i = 0; i < sp.num_parts; ++i) {
      bytes += sp.parts[i].size();
    }
    return bytes;
  };

  const size_t limit = size_t{port::kMaxUint32};
  if (total_size(key) >= limit) {
    return Status::InvalidArgument("key is too large");
  }
  if (total_size(value) >= limit) {
    return Status::InvalidArgument("value is too large");
  }
  return Status::OK();
}
// Appends a Put record whose key and value are each given as several parts.
//
// Returns the error from CheckSlicePartsLength if the key or value is too
// large; otherwise returns the result of committing the local save point
// taken before the batch was modified.
//
// When the batch has a non-zero timestamp size, the key is written with that
// many bytes of padding after it.
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                               const SliceParts& key, const SliceParts& value) {
  const Status length_check = CheckSlicePartsLength(key, value);
  if (!length_check.ok()) {
    return length_check;
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(
      static_cast<char>(default_cf ? kTypeValue : kTypeColumnFamilyValue));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }

  const size_t ts_sz = b->timestamp_size_;
  std::string timestamp(ts_sz, '\0');
  if (ts_sz != 0) {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, ts_sz);
  } else {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  }
  PutLengthPrefixedSliceParts(&b->rep_, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_PUT,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeValue, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a multi-part Put
// record.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::Put(this, cf_id, key, value);
}
// Appends a single Noop tag byte. The entry count and content flags are left
// unchanged. Always returns OK.
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  const char noop_tag = static_cast<char>(kTypeNoop);
  b->rep_.push_back(noop_tag);
  return Status::OK();
}
// Turns the batch into a prepared-transaction section.
//
// The byte at offset 12 (asserted to be a Noop tag) is rewritten into the
// begin marker selected by `write_after_commit` / `unprepared_batch`, an
// EndPrepare record carrying `xid` is appended, all save points are dropped,
// and the matching content flags are set. Always returns OK.
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid,
                                          bool write_after_commit,
                                          bool unprepared_batch) {
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // Save points are discarded before the markers are written.
  if (b->save_points_ != nullptr) {
    auto& pending = b->save_points_->stack;
    while (!pending.empty()) {
      pending.pop();
    }
  }

  // Pick the begin marker: write_after_commit takes precedence, then
  // unprepared_batch, otherwise the persisted-prepare marker is used.
  char begin_tag;
  if (write_after_commit) {
    begin_tag = static_cast<char>(kTypeBeginPrepareXID);
  } else if (unprepared_batch) {
    begin_tag = static_cast<char>(kTypeBeginUnprepareXID);
  } else {
    begin_tag = static_cast<char>(kTypeBeginPersistedPrepareXID);
  }
  b->rep_[12] = begin_tag;

  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  if (unprepared_batch) {
    const auto with_prepare = b->content_flags_.load(std::memory_order_relaxed);
    b->content_flags_.store(with_prepare | ContentFlags::HAS_BEGIN_UNPREPARE,
                            std::memory_order_relaxed);
  }
  return Status::OK();
}
// Appends a Commit marker carrying `xid` and sets HAS_COMMIT. The entry count
// is not changed. Always returns OK.
Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
  PutLengthPrefixedSlice(&b->rep_, xid);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_COMMIT,
                          std::memory_order_relaxed);
  return Status::OK();
}
// Appends a Rollback marker carrying `xid` and sets HAS_ROLLBACK. The entry
// count is not changed. Always returns OK.
Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
  PutLengthPrefixedSlice(&b->rep_, xid);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_ROLLBACK,
                          std::memory_order_relaxed);
  return Status::OK();
}
// Appends a Delete record for `key` in the given column family and returns
// the result of committing the local save point taken before the batch was
// modified.
//
// When the batch has a non-zero timestamp size, the stored key is followed by
// that many zero bytes as a placeholder.
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(
      static_cast<char>(default_cf ? kTypeDeletion : kTypeColumnFamilyDeletion));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }

  const size_t ts_sz = b->timestamp_size_;
  std::string timestamp(ts_sz, '\0');
  if (ts_sz != 0) {
    // The length prefix covers the user key plus the timestamp placeholder.
    PutVarint32(&b->rep_, static_cast<uint32_t>(key.size() + ts_sz));
    b->rep_.append(key.data(), key.size());
    b->rep_.append(timestamp);
  } else {
    PutLengthPrefixedSlice(&b->rep_, key);
  }

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // A deletion has no value, so an empty one is protected.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, "", kTypeDeletion, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a Delete record.
Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::Delete(this, cf_id, key);
}
// Appends a Delete record whose key is given as several parts and returns the
// result of committing the local save point taken before the batch was
// modified.
//
// When the batch has a non-zero timestamp size, the key is written with that
// many bytes of padding after it.
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(
      static_cast<char>(default_cf ? kTypeDeletion : kTypeColumnFamilyDeletion));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }

  const size_t ts_sz = b->timestamp_size_;
  std::string timestamp(ts_sz, '\0');
  if (ts_sz != 0) {
    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, ts_sz);
  } else {
    PutLengthPrefixedSliceParts(&b->rep_, key);
  }

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // A deletion has no value, so an empty SliceParts is protected.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, SliceParts(nullptr, 0), kTypeDeletion, timestamp)
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a multi-part Delete
// record.
Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::Delete(this, cf_id, key);
}
// Appends a SingleDelete record for `key` in the given column family and
// returns the result of committing the local save point taken before the
// batch was modified. No timestamp placeholder is written for this record
// kind.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(static_cast<char>(
      default_cf ? kTypeSingleDeletion : kTypeColumnFamilySingleDeletion));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // Neither a value nor a timestamp exists here; both are protected as
    // empty.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, "", kTypeSingleDeletion, "")
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a SingleDelete
// record.
Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::SingleDelete(this, cf_id, key);
}
// Appends a SingleDelete record whose key is given as several parts and
// returns the result of committing the local save point taken before the
// batch was modified. No timestamp placeholder is written for this record
// kind.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(static_cast<char>(
      default_cf ? kTypeSingleDeletion : kTypeColumnFamilySingleDeletion));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // Neither a value nor a timestamp exists here; both are protected as
    // empty.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, SliceParts(nullptr, 0), kTypeSingleDeletion, "")
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a multi-part
// SingleDelete record.
Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::SingleDelete(this, cf_id, key);
}
// Appends a DeleteRange record holding `begin_key` followed by `end_key` and
// returns the result of committing the local save point taken before the
// batch was modified. No timestamp placeholder is written for this record
// kind.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const Slice& begin_key,
                                       const Slice& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(static_cast<char>(
      default_cf ? kTypeRangeDeletion : kTypeColumnFamilyRangeDeletion));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, begin_key);
  PutLengthPrefixedSlice(&b->rep_, end_key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // The end key takes the place of the value in the protection info.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(begin_key, end_key, kTypeRangeDeletion, "")
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a DeleteRange record.
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
}
// Appends a DeleteRange record whose begin and end keys are each given as
// several parts, and returns the result of committing the local save point
// taken before the batch was modified. No timestamp placeholder is written
// for this record kind.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(static_cast<char>(
      default_cf ? kTypeRangeDeletion : kTypeColumnFamilyRangeDeletion));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // The end key takes the place of the value in the protection info.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(begin_key, end_key, kTypeRangeDeletion, "")
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a multi-part
// DeleteRange record.
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
}
// Appends a Merge record for `key` with operand `value`.
//
// Returns InvalidArgument if the key or the value is longer than
// port::kMaxUint32 bytes; otherwise returns the result of committing the
// local save point taken before the batch was modified. No timestamp
// placeholder is written for this record kind.
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const Slice& key, const Slice& value) {
  const size_t max_len = size_t{port::kMaxUint32};
  if (key.size() > max_len) {
    return Status::InvalidArgument("key is too large");
  }
  if (value.size() > max_len) {
    return Status::InvalidArgument("value is too large");
  }

  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // Column family 0 uses the short tag without an explicit id.
  const bool default_cf = (column_family_id == 0);
  b->rep_.push_back(
      static_cast<char>(default_cf ? kTypeMerge : kTypeColumnFamilyMerge));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);

  const auto flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(flags | ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeMerge, "")
            .ProtectC(column_family_id));
  }
  return save.commit();
}
// Resolves the handle to a column family id and appends a Merge record.
Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::Merge(this, cf_id, key, value);
}
// Appends one Merge record to batch `b`, with key and value supplied as
// SliceParts (scatter/gather pieces).
//
// Returns the non-OK status from CheckSlicePartsLength() if that validation
// fails; otherwise returns whatever the LocalSavePoint's commit() reports.
//
// Effects on `b` mirror the Slice overload: count + 1, record appended to
// rep_, HAS_MERGE set in the content flags, and one protection entry
// appended when protection info is enabled.
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  const Status length_check = CheckSlicePartsLength(key, value);
  if (!length_check.ok()) {
    return length_check;
  }

  // Must be created before the batch is modified; its commit() produces the
  // final status.
  LocalSavePoint rollback_guard(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // The default column family (id 0) uses the short tag with no id field.
  const bool in_default_cf = (column_family_id == 0);
  const auto tag = in_default_cf ? kTypeMerge : kTypeColumnFamilyMerge;
  b->rep_.push_back(static_cast<char>(tag));
  if (!in_default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);

  const auto old_flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(old_flags | ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // The protection entry always records kTypeMerge, regardless of which
    // tag was written above; the column family is folded in via ProtectC.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeMerge, "")
            .ProtectC(column_family_id));
  }
  return rollback_guard.commit();
}
// Public Merge entry point for key and value supplied as SliceParts. Resolves
// `column_family` to its numeric id and forwards to the SliceParts overload
// of WriteBatchInternal::Merge, returning its Status unchanged.
Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  const auto cf_id = GetColumnFamilyID(column_family);
  return WriteBatchInternal::Merge(this, cf_id, key, value);
}
// Appends one blob-index record for `key`/`value` to batch `b`.
//
// Unlike the Merge overloads in this file, no key/value length validation is
// performed here. Returns whatever the LocalSavePoint's commit() reports.
//
// Effects on `b`: count + 1, record appended to rep_ (type tag, optional
// varint column family id, length-prefixed key and value), HAS_BLOB_INDEX set
// in the content flags, and one protection entry appended when protection
// info is enabled.
Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const Slice& key, const Slice& value) {
  // Must be created before the batch is modified; its commit() produces the
  // final status.
  LocalSavePoint rollback_guard(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  // The default column family (id 0) uses the short tag with no id field.
  const bool in_default_cf = (column_family_id == 0);
  const auto tag =
      in_default_cf ? kTypeBlobIndex : kTypeColumnFamilyBlobIndex;
  b->rep_.push_back(static_cast<char>(tag));
  if (!in_default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
  PutLengthPrefixedSlice(&b->rep_, value);

  const auto old_flags = b->content_flags_.load(std::memory_order_relaxed);
  b->content_flags_.store(old_flags | ContentFlags::HAS_BLOB_INDEX,
                          std::memory_order_relaxed);

  if (b->prot_info_ != nullptr) {
    // The protection entry always records kTypeBlobIndex; the column family
    // is folded in via ProtectC.
    b->prot_info_->entries_.emplace_back(
        ProtectionInfo64()
            .ProtectKVOT(key, value, kTypeBlobIndex, "")
            .ProtectC(column_family_id));
  }
  return rollback_guard.commit();
}
// Appends a log-data record carrying `blob` to this batch: a kTypeLogData tag
// followed by the length-prefixed blob. Note that this neither changes the
// record count nor touches the content flags or protection info.
// Returns whatever the LocalSavePoint's commit() reports.
Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint rollback_guard(this);
  const char tag = static_cast<char>(kTypeLogData);
  rep_.push_back(tag);
  PutLengthPrefixedSlice(&rep_, blob);
  return rollback_guard.commit();
}
// Pushes a save point capturing the batch's current data size, record count
// and content flags. The save-point container is allocated on first use.
void WriteBatch::SetSavePoint() {
  if (!save_points_) {
    save_points_.reset(new SavePoints());
  }
  const auto flags_now = content_flags_.load(std::memory_order_relaxed);
  save_points_->stack.push(SavePoint(GetDataSize(), Count(), flags_now));
}
// Pops the most recent save point and restores the batch to the state it
// captured.
//
// Returns NotFound when no save point exists, OK otherwise.
Status WriteBatch::RollbackToSavePoint() {
  if (!save_points_ || save_points_->stack.empty()) {
    return Status::NotFound();
  }

  // Take a copy before popping; top() would dangle afterwards.
  const SavePoint target = save_points_->stack.top();
  save_points_->stack.pop();

  assert(target.size <= rep_.size());
  assert(static_cast<uint32_t>(target.count) <= Count());

  if (target.size == rep_.size()) {
    // Nothing was written since the save point was taken.
    return Status::OK();
  }
  if (target.size == 0) {
    // The save point describes an empty batch: reset everything.
    Clear();
    return Status::OK();
  }

  // Truncate the serialized records, the per-record protection entries, the
  // header count and the content flags back to the saved values.
  rep_.resize(target.size);
  if (prot_info_ != nullptr) {
    prot_info_->entries_.resize(target.count);
  }
  WriteBatchInternal::SetCount(this, target.count);
  content_flags_.store(target.content_flags, std::memory_order_relaxed);
  return Status::OK();
}
// Discards the most recent save point without modifying the batch contents.
// Returns NotFound when no save point exists, OK otherwise.
Status WriteBatch::PopSavePoint() {
  const bool has_save_point =
      save_points_ != nullptr && !save_points_->stack.empty();
  if (!has_save_point) {
    return Status::NotFound();
  }
  save_points_->stack.pop();
  return Status::OK();
}
// Runs a TimestampAssigner built from the single timestamp `ts` (and this
// batch's protection info, which may be null) over every record in the
// batch. Returns the Status produced by Iterate().
Status WriteBatch::AssignTimestamp(const Slice& ts) {
  auto* const prot = prot_info_.get();
  TimestampAssigner assigner(ts, prot);
  return Iterate(&assigner);
}
// Runs a TimestampAssigner built from the timestamp list `ts_list` (and this
// batch's protection info, which may be null) over every record in the
// batch. Returns the Status produced by Iterate().
Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
  auto* const prot = prot_info_.get();
  TimestampAssigner assigner(ts_list, prot);
  return Iterate(&assigner);
}
// WriteBatch::Handler that applies the records of a write batch to the
// memtables of the column families they target, while tracking the sequence
// number to assign to each entry.
//
// When `recovering_log_number_` is non-zero the inserter additionally
// rebuilds prepared transactions found between MarkBeginPrepare() and
// MarkEndPrepare() into `rebuilding_trx_` and registers them with the DB.
//
// Three helper objects (post-process map, duplicate detector, hint map) are
// kept in raw aligned storage and constructed with placement new only on
// first use; the destructor destroys whichever were created.
class MemTableInserter : public WriteBatch::Handler {
  // Sequence number that the next inserted entry will carry.
  SequenceNumber sequence_;
  // Used to locate the target column family and its memtable. Never null
  // (asserted in the constructor).
  ColumnFamilyMemTables* const cf_mems_;
  // Optional; when non-null, full memtables are scheduled for flush.
  FlushScheduler* const flush_scheduler_;
  // Optional; when non-null, history trimming may be scheduled.
  TrimHistoryScheduler* const trim_history_scheduler_;
  // When true, records for unknown column families are skipped with OK
  // instead of failing with InvalidArgument.
  const bool ignore_missing_column_families_;
  // Non-zero selects the recovery code paths throughout this class.
  const uint64_t recovering_log_number_;
  // When > 0, every memtable written to is told to reference this log via
  // RefLogContainingPrepSection().
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  // True once mem_post_info_map_ holds a constructed MemPostInfoMap.
  bool post_info_created_;
  // Per-record protection info of the batch being iterated (may be null) and
  // the index of the next entry to consume.
  const WriteBatch::ProtectionInfo* prot_info_;
  size_t prot_info_idx_;

  // Optional out-flag set to true once a record is actually applicable.
  bool* has_valid_writes_;
  using MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
  using PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
  // Raw storage for the lazily constructed MemPostInfoMap.
  PostMapType mem_post_info_map_;
  // Batch being rebuilt for a prepared transaction. Owned by this object
  // (deleted in the destructor) until MarkEndPrepare() hands it to the DB
  // and resets the pointer.
  WriteBatch* rebuilding_trx_;
  // Value of sequence_ when the rebuild started.
  SequenceNumber rebuilding_trx_seq_;
  // Controls when MaybeAdvanceSeq() increments: at batch boundaries when
  // true, per record when false.
  bool seq_per_batch_;
  // Initialized to !seq_per_batch.
  bool write_after_commit_;
  // Initialized to !batch_per_txn.
  bool write_before_prepare_;
  // Records the `unprepare` flag of the prepare section being rebuilt.
  bool unprepared_batch_;
  using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
  // Raw storage for the lazily constructed DuplicateDetector.
  DupDetector duplicate_detector_;
  // True once duplicate_detector_ holds a constructed DuplicateDetector.
  bool dup_dectector_on_;

  bool hint_per_batch_;
  // True once hint_ holds a constructed HintMap.
  bool hint_created_;
  // Per-memtable insert hints; the mapped buffers are released with
  // delete[] (as char arrays) in the destructor.
  using HintMap = std::unordered_map<MemTable*, void*>;
  using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
  HintMapType hint_;

  // Returns the hint map, constructing it in place on first use.
  HintMap& GetHintMap() {
    assert(hint_per_batch_);
    if (!hint_created_) {
      new (&hint_) HintMap();
      hint_created_ = true;
    }
    return *reinterpret_cast<HintMap*>(&hint_);
  }

  // Returns the post-process map, constructing it in place on first use.
  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if (!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }

  // Asks the (lazily constructed) DuplicateDetector whether `key` in
  // `column_family_id` is a duplicate at the current sequence number.
  // Only valid while rebuilding a transaction in a non-write-after-commit
  // mode (both asserted).
  bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
    assert(!write_after_commit_);
    assert(rebuilding_trx_ != nullptr);
    if (!dup_dectector_on_) {
      new (&duplicate_detector_) DuplicateDetector(db_);
      dup_dectector_on_ = true;
    }
    return reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
        ->IsDuplicateKeySeq(column_family_id, key, sequence_);
  }

  // Returns the protection entry for the record currently being handled and
  // advances the cursor, or nullptr when protection info is disabled.
  const ProtectionInfoKVOTC64* NextProtectionInfo() {
    const ProtectionInfoKVOTC64* res = nullptr;
    if (prot_info_ != nullptr) {
      assert(prot_info_idx_ < prot_info_->entries_.size());
      res = &prot_info_->entries_[prot_info_idx_];
      ++prot_info_idx_;
    }
    return res;
  }

 protected:
  bool WriteBeforePrepare() const override { return write_before_prepare_; }
  bool WriteAfterCommit() const override { return write_after_commit_; }

 public:
  // `cf_mems` must be non-null. `flush_scheduler`, `trim_history_scheduler`,
  // `prot_info` and `has_valid_writes` may be null. `db` is down-cast to
  // DBImpl.
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   TrimHistoryScheduler* trim_history_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   const WriteBatch::ProtectionInfo* prot_info,
                   bool* has_valid_writes = nullptr, bool seq_per_batch = false,
                   bool batch_per_txn = true, bool hint_per_batch = false)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        trim_history_scheduler_(trim_history_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(static_cast_with_check<DBImpl>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        prot_info_(prot_info),
        prot_info_idx_(0),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr),
        rebuilding_trx_seq_(0),
        seq_per_batch_(seq_per_batch),
        write_after_commit_(!seq_per_batch),
        write_before_prepare_(!batch_per_txn),
        unprepared_batch_(false),
        duplicate_detector_(),
        dup_dectector_on_(false),
        hint_per_batch_(hint_per_batch),
        hint_created_(false) {
    assert(cf_mems_);
  }

  // Destroys whichever lazily constructed helpers exist, frees the hint
  // buffers, and deletes any transaction batch still owned.
  ~MemTableInserter() override {
    if (dup_dectector_on_) {
      reinterpret_cast<DuplicateDetector*>(&duplicate_detector_)
          ->~DuplicateDetector();
    }
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_)
          ->~MemPostInfoMap();
    }
    if (hint_created_) {
      // Iterate by reference: there is no need to copy each map entry just
      // to release the buffer it points to.
      for (auto& hint_entry : GetHintMap()) {
        delete[] reinterpret_cast<char*>(hint_entry.second);
      }
      reinterpret_cast<HintMap*>(&hint_)->~HintMap();
    }
    delete rebuilding_trx_;
  }

  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // Increments sequence_ when `batch_boundry` equals seq_per_batch_: i.e. at
  // batch boundaries in seq-per-batch mode, and per record otherwise.
  void MaybeAdvanceSeq(bool batch_boundry = false) {
    if (batch_boundry == seq_per_batch_) {
      sequence_++;
    }
  }

  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }

  // Switches to a new batch's protection info and rewinds the cursor.
  void set_prot_info(const WriteBatch::ProtectionInfo* prot_info) {
    prot_info_ = prot_info;
    prot_info_idx_ = 0;
  }

  SequenceNumber sequence() const { return sequence_; }

  // Applies the accumulated per-memtable post-process info. Only valid with
  // concurrent memtable writes (asserted). No-op if nothing was accumulated.
  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created there is nothing to process; avoid
    // constructing the map just to iterate over it.
    if (post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }

  // Positions cf_mems_ on `column_family_id`.
  //
  // Returns true when the record should be applied to the memtable. Returns
  // false with `*s` set when it should not:
  //  - unknown column family: OK if missing families are ignored, otherwise
  //    InvalidArgument;
  //  - recovery, and the recovering log number is below the column family's
  //    log number: OK (the record is skipped).
  // On success also raises *has_valid_writes_ (if provided) and, when
  // log_number_ref_ > 0, makes the memtable reference that log.
  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }

  // Shared implementation of PutCF() and PutBlobIndexCF(); `value_type` is
  // the entry type stored in the memtable and `kv_prot_info` may be null.
  //
  // Depending on the memtable options one of three paths is taken: a plain
  // Add(), an in-place Update(), or UpdateCallback() with a fallback that
  // reads the previous value through the DB and runs the in-place callback.
  Status PutCFImpl(uint32_t column_family_id, const Slice& key,
                   const Slice& value, ValueType value_type,
                   const ProtectionInfoKVOTS64* kv_prot_info) {
    // Write-after-commit while rebuilding: only record the operation in the
    // transaction batch, do not touch the memtable.
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
                                     value);
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // The record is skipped for the memtable but is still tracked in the
        // rebuilt transaction.
        ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                             key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundry */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    // In-place updates are not expected together with seq_per_batch_.
    assert(!seq_per_batch_ || !moptions->inplace_update_support);
    if (!moptions->inplace_update_support) {
      ret_status =
          mem->Add(sequence_, value_type, key, value, kv_prot_info,
                   concurrent_memtable_writes_, get_post_process_info(mem),
                   hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    } else if (moptions->inplace_callback == nullptr) {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->Update(sequence_, key, value, kv_prot_info);
    } else {
      assert(!concurrent_memtable_writes_);
      ret_status = mem->UpdateCallback(sequence_, key, value, kv_prot_info);
      if (ret_status.IsNotFound()) {
        // The key is not in the memtable: read the previous value through
        // the DB (as of sequence_), run the callback, and add the result.
        SnapshotImpl read_from_snapshot;
        read_from_snapshot.number_ = sequence_;
        ReadOptions ropts;
        ropts.fill_cache = false;
        ropts.snapshot = &read_from_snapshot;

        std::string prev_value;
        std::string merged_value;

        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
        // Stays NotSupported when there is no DB or we are recovering, in
        // which case no read is attempted.
        Status get_status = Status::NotSupported();
        if (db_ != nullptr && recovering_log_number_ == 0) {
          if (cf_handle == nullptr) {
            cf_handle = db_->DefaultColumnFamily();
          }
          get_status = db_->Get(ropts, cf_handle, key, &prev_value);
        }
        // Deliberately replaces the NotFound held in ret_status: a missing
        // previous value is fine, any other read failure is propagated.
        if (!get_status.ok() && !get_status.IsNotFound()) {
          ret_status = get_status;
        } else {
          ret_status = Status::OK();
        }
        if (ret_status.ok()) {
          UpdateStatus update_status;
          char* prev_buffer = const_cast<char*>(prev_value.c_str());
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
          if (get_status.ok()) {
            update_status = moptions->inplace_callback(prev_buffer, &prev_size,
                                                       value, &merged_value);
          } else {
            update_status = moptions->inplace_callback(
                nullptr /* existing_value */, nullptr /* existing_value_size */,
                value, &merged_value);
          }
          if (update_status == UpdateStatus::UPDATED_INPLACE) {
            assert(get_status.ok());
            // The callback rewrote prev_buffer/prev_size in place; that is
            // the final value to store.
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOTS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value,
                                           Slice(prev_buffer, prev_size));
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    &updated_kv_prot_info);
            } else {
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(prev_buffer, prev_size),
                                    nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          } else if (update_status == UpdateStatus::UPDATED) {
            // The callback produced the final value in merged_value.
            if (kv_prot_info != nullptr) {
              ProtectionInfoKVOTS64 updated_kv_prot_info(*kv_prot_info);
              updated_kv_prot_info.UpdateV(value, merged_value);
              ret_status = mem->Add(sequence_, value_type, key,
                                    Slice(merged_value), &updated_kv_prot_info);
            } else {
              ret_status =
                  mem->Add(sequence_, value_type, key, Slice(merged_value),
                           nullptr /* kv_prot_info */);
            }
            if (ret_status.ok()) {
              RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
            }
          }
          // Any other update status: nothing is added.
        }
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // Only a successful insert is mirrored into the rebuilt transaction, so
    // that a failure status is never overwritten.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
                                           key, value);
    }
    return ret_status;
  }

  // Handler callback for a Put record: derives the memtable-level protection
  // info (column family stripped, sequence number added) when available and
  // delegates to PutCFImpl() with kTypeValue.
  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      return PutCFImpl(column_family_id, key, value, kTypeValue,
                       &mem_kv_prot_info);
    }
    return PutCFImpl(column_family_id, key, value, kTypeValue,
                     nullptr /* kv_prot_info */);
  }

  // Adds a deletion-style entry of type `delete_type` to the memtable that
  // cf_mems_ is currently positioned on (the column family id argument is
  // unused), then advances the sequence and checks for a full memtable on
  // success. The caller must have called SeekToColumnFamily() first.
  Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
                    const Slice& value, ValueType delete_type,
                    const ProtectionInfoKVOTS64* kv_prot_info) {
    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    ret_status =
        mem->Add(sequence_, delete_type, key, value, kv_prot_info,
                 concurrent_memtable_writes_, get_post_process_info(mem),
                 hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    return ret_status;
  }

  // Handler callback for a Delete record. Uses kTypeDeletionWithTimestamp
  // instead of kTypeDeletion when the column family's user comparator
  // reports a non-zero timestamp size.
  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // Write-after-commit while rebuilding: only record the operation.
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // Skipped for the memtable but still tracked in the transaction.
        ret_status =
            WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundry */);
      }
      return ret_status;
    }

    ColumnFamilyData* cfd = cf_mems_->current();
    assert(!cfd || cfd->user_comparator());
    const size_t ts_sz = (cfd && cfd->user_comparator())
                             ? cfd->user_comparator()->timestamp_size()
                             : 0;
    const ValueType delete_type =
        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      // The batch-level entry was computed for kTypeDeletion; re-key it to
      // the type actually stored.
      mem_kv_prot_info.UpdateO(kTypeDeletion, delete_type);
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type,
                              nullptr /* kv_prot_info */);
    }
    // Only a successful insert is mirrored into the rebuilt transaction.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      ret_status =
          WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
    }
    return ret_status;
  }

  // Handler callback for a SingleDelete record.
  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // Write-after-commit while rebuilding: only record the operation.
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
                                              key);
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // Skipped for the memtable but still tracked in the transaction.
        ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                      column_family_id, key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundry */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, key, Slice(),
                              kTypeSingleDeletion, nullptr /* kv_prot_info */);
    }
    // Only a successful insert is mirrored into the rebuilt transaction.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
                                                    column_family_id, key);
    }
    return ret_status;
  }

  // Handler callback for a DeleteRange record.
  //
  // When a DB is attached, returns NotSupported if the column family does
  // not support range deletions, InvalidArgument if `end_key` orders before
  // `begin_key`, and OK without inserting anything if the two keys compare
  // equal.
  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // Write-after-commit while rebuilding: only record the operation.
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                             begin_key, end_key);
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // Skipped for the memtable but still tracked in the transaction.
        ret_status = WriteBatchInternal::DeleteRange(
            rebuilding_trx_, column_family_id, begin_key, end_key);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundry */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd =
          static_cast_with_check<ColumnFamilyHandleImpl>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        // ret_status is abandoned on each early return below; mark it as
        // intentionally unchecked.
        ret_status.PermitUncheckedError();
        return Status::NotSupported(
            std::string("DeleteRange not supported for table type ") +
            cfd->ioptions()->table_factory->Name() + " in CF " +
            cfd->GetName());
      }
      int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
      if (cmp > 0) {
        ret_status.PermitUncheckedError();
        return Status::InvalidArgument("end key comes before start key");
      } else if (cmp == 0) {
        // Empty range: nothing to apply.
        ret_status.PermitUncheckedError();
        return Status::OK();
      }
    }

    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, &mem_kv_prot_info);
    } else {
      ret_status = DeleteImpl(column_family_id, begin_key, end_key,
                              kTypeRangeDeletion, nullptr /* kv_prot_info */);
    }
    // Only a successful insert is mirrored into the rebuilt transaction.
    // Testing ok() (as every other handler in this class does) rather than
    // !IsTryAgain() keeps a genuine insert failure from being overwritten by
    // the status of the append below.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      ret_status = WriteBatchInternal::DeleteRange(
          rebuilding_trx_, column_family_id, begin_key, end_key);
    }
    return ret_status;
  }

  // Handler callback for a Merge record.
  //
  // Returns InvalidArgument when the column family has no merge operator.
  // When max_successive_merges is configured, a DB is attached and we are
  // not recovering, a run of that many merge entries at the head of the key
  // triggers an eager full merge whose result is stored as a kTypeValue
  // entry; if the read or the merge fails, the operand is stored as a plain
  // kTypeMerge entry instead.
  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    // Write-after-commit while rebuilding: only record the operation.
    if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
      return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
                                       value);
    }

    Status ret_status;
    if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) {
      if (ret_status.ok() && rebuilding_trx_ != nullptr) {
        assert(!write_after_commit_);
        // Skipped for the memtable but still tracked in the transaction.
        ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
                                               column_family_id, key, value);
        if (ret_status.ok()) {
          MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key));
        }
      } else if (ret_status.ok()) {
        MaybeAdvanceSeq(false /* batch_boundry */);
      }
      return ret_status;
    }
    assert(ret_status.ok());

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    if (moptions->merge_operator == nullptr) {
      return Status::InvalidArgument(
          "Merge requires `ColumnFamilyOptions::merge_operator != nullptr`");
    }
    bool perform_merge = false;
    assert(!concurrent_memtable_writes_ ||
           moptions->max_successive_merges == 0);

    // The eager merge below reads through the DB, so it is only considered
    // when a DB is attached and we are not in recovery.
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      assert(!concurrent_memtable_writes_);
      LookupKey lkey(key, sequence_);

      // Number of successive merge entries at the head of this key.
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Read the existing value as of sequence_.
      std::string get_value;

      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      Status get_status = db_->Get(read_options, cf_handle, key, &get_value);
      if (!get_status.ok()) {
        // Could not read the base value: fall back to storing the operand.
        perform_merge = false;
      } else {
        Slice get_value_slice = Slice(get_value);

        // 2) Merge the existing value with the new operand.
        auto merge_operator = moptions->merge_operator;
        assert(merge_operator);

        std::string new_value;

        Status merge_status = MergeHelper::TimedFullMerge(
            merge_operator, key, &get_value_slice, {value}, &new_value,
            moptions->info_log, moptions->statistics,
            SystemClock::Default().get());

        if (!merge_status.ok()) {
          // Merge failed: fall back to storing the operand.
          perform_merge = false;
        } else {
          // 3) Store the merged result as a regular value.
          assert(!concurrent_memtable_writes_);
          if (kv_prot_info != nullptr) {
            auto merged_kv_prot_info =
                kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
            // Re-key the protection info for the new value and entry type.
            merged_kv_prot_info.UpdateV(value, new_value);
            merged_kv_prot_info.UpdateO(kTypeMerge, kTypeValue);
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  &merged_kv_prot_info);
          } else {
            ret_status = mem->Add(sequence_, kTypeValue, key, new_value,
                                  nullptr /* kv_prot_info */);
          }
        }
      }
    }

    if (!perform_merge) {
      assert(ret_status.ok());
      // Store the merge operand itself.
      if (kv_prot_info != nullptr) {
        auto mem_kv_prot_info =
            kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
        ret_status =
            mem->Add(sequence_, kTypeMerge, key, value, &mem_kv_prot_info,
                     concurrent_memtable_writes_, get_post_process_info(mem));
      } else {
        ret_status = mem->Add(
            sequence_, kTypeMerge, key, value, nullptr /* kv_prot_info */,
            concurrent_memtable_writes_, get_post_process_info(mem));
      }
    }

    if (UNLIKELY(ret_status.IsTryAgain())) {
      assert(seq_per_batch_);
      const bool kBatchBoundary = true;
      MaybeAdvanceSeq(kBatchBoundary);
    } else if (ret_status.ok()) {
      MaybeAdvanceSeq();
      CheckMemtableFull();
    }
    // Only a successful insert is mirrored into the rebuilt transaction.
    if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) {
      assert(!write_after_commit_);
      ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
                                             key, value);
    }
    return ret_status;
  }

  // Handler callback for a blob-index record: same as PutCF() but stores the
  // entry with type kTypeBlobIndex.
  Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
    const auto* kv_prot_info = NextProtectionInfo();
    if (kv_prot_info != nullptr) {
      auto mem_kv_prot_info =
          kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
      return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                       &mem_kv_prot_info);
    } else {
      return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                       nullptr /* kv_prot_info */);
    }
  }

  // After a successful insert: schedules a flush if the current memtable
  // asks for one, and schedules history trimming if the memory kept by the
  // active plus immutable memtables reaches
  // max_write_buffer_size_to_maintain. Each part is skipped when the
  // corresponding scheduler is null.
  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      // MarkFlushScheduled() is only evaluated when a flush is wanted; the
      // work is scheduled only if it also returns true.
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        flush_scheduler_->ScheduleWork(cfd);
      }
    }
    if (trim_history_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();

      assert(cfd);
      assert(cfd->ioptions());

      const size_t size_to_maintain = static_cast<size_t>(
          cfd->ioptions()->max_write_buffer_size_to_maintain);

      if (size_to_maintain > 0) {
        MemTableList* const imm = cfd->imm();
        assert(imm);

        if (imm->HasHistory()) {
          const MemTable* const mem = cfd->mem();
          assert(mem);

          if (mem->ApproximateMemoryUsageFast() +
                      imm->ApproximateMemoryUsageExcludingLast() >=
                  size_to_maintain &&
              imm->MarkTrimHistoryNeeded()) {
            trim_history_scheduler_->ScheduleWork(cfd);
          }
        }
      }
    }
  }

  // Handler callback for the start of a prepare section. During recovery
  // this starts rebuilding the transaction into a fresh WriteBatch; it fails
  // with NotSupported when the DB does not allow 2PC. Outside recovery it is
  // a no-op.
  Status MarkBeginPrepare(bool unprepare) override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // Subsequent records are appended to this batch until MarkEndPrepare().
      rebuilding_trx_ = new WriteBatch();
      rebuilding_trx_seq_ = sequence_;
      assert(!unprepared_batch_);
      unprepared_batch_ = unprepare;

      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    }

    return Status::OK();
  }

  // Handler callback for the end of a prepare section named `name`. During
  // recovery the rebuilt batch is handed to the DB via
  // InsertRecoveredTransaction() and this object stops tracking it. Always
  // treated as a batch boundary for sequence advancement.
  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      assert(db_->allow_2pc());
      // In write-after-commit mode 0 is passed as the batch count; otherwise
      // it is the number of sequence numbers spanned by the rebuilt section.
      size_t batch_cnt =
          write_after_commit_
              ? 0
              : static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_, rebuilding_trx_seq_,
                                      batch_cnt, unprepared_batch_);
      unprepared_batch_ = false;
      // The pointer is cleared so the destructor does not delete the batch.
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
    }
    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return Status::OK();
  }

  // Handler callback for a no-op marker. A non-empty batch counts as a batch
  // boundary for sequence advancement; an empty one does not.
  Status MarkNoop(bool empty_batch) override {
    if (!empty_batch) {
      const bool batch_boundry = true;
      MaybeAdvanceSeq(batch_boundry);
    }
    return Status::OK();
  }

  // Handler callback for a commit marker named `name`. During recovery, if
  // the DB still holds the recovered transaction, its batch is replayed
  // through this inserter (write-after-commit mode only) and the transaction
  // is removed once the replay succeeded. Always treated as a batch
  // boundary. Returns the replay status (OK when nothing was replayed).
  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // A null result means the DB holds no recovered transaction under this
      // name; there is then nothing to replay.
      if (trx != nullptr) {
        assert(log_number_ref_ == 0);
        if (write_after_commit_) {
          // Exactly one batch is expected in this mode.
          assert(trx->batches_.size() == 1);
          const auto& batch_info = trx->batches_.begin()->second;
          // Every insert made during the replay references the log that
          // holds the prepare section.
          log_number_ref_ = batch_info.log_number_;
          s = batch_info.batch_->Iterate(this);
          log_number_ref_ = 0;
        }
        // Otherwise nothing is replayed here.

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // Outside recovery, write-after-commit mode expects a log reference.
      assert(!write_after_commit_ || log_number_ref_ > 0);
    }
    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return s;
  }

  // Handler callback for a rollback marker named `name`. During recovery the
  // matching recovered transaction, if any, is dropped. Always treated as a
  // batch boundary. Always returns OK.
  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // A null result means there is nothing to drop.
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // Nothing to do outside recovery.
    }

    const bool batch_boundry = true;
    MaybeAdvanceSeq(batch_boundry);

    return Status::OK();
  }

 private:
  // Returns the post-process accumulator for `mem` (created on demand), or
  // nullptr when concurrent memtable writes are disabled.
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
};
// Applies every writer of `write_group` to the memtables, in order, using a
// single MemTableInserter that starts at `sequence`.
//
// Writers whose callback failed are skipped entirely. Writers that should
// not write to the memtable only get a sequence number assigned and advance
// the sequence as a batch boundary. Returns the first non-OK status produced
// by a writer's batch (also stored in that writer's `status`), or OK.
Status WriteBatchInternal::InsertInto(
    WriteThread::WriteGroup& write_group, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter mem_inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, recovery_log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /* has_valid_writes */, seq_per_batch, batch_per_txn);

  for (auto writer : write_group) {
    if (writer->CallbackFailed()) {
      continue;
    }
    writer->sequence = mem_inserter.sequence();

    if (writer->ShouldWriteToMemtable()) {
      SetSequence(writer->batch, mem_inserter.sequence());
      mem_inserter.set_log_number_ref(writer->log_ref);
      // Protection info is per batch, so it is re-pointed for each writer.
      mem_inserter.set_prot_info(writer->batch->prot_info_.get());
      writer->status = writer->batch->Iterate(&mem_inserter);
      if (!writer->status.ok()) {
        return writer->status;
      }
      assert(!seq_per_batch || writer->batch_cnt != 0);
      assert(!seq_per_batch ||
             mem_inserter.sequence() - writer->sequence == writer->batch_cnt);
    } else {
      // Nothing is inserted, but the writer still counts as a batch boundary.
      mem_inserter.MaybeAdvanceSeq(true);
    }
  }
  return Status::OK();
}
// Applies the batch of a single `writer` to the memtables, starting at
// `sequence`. The writer must be one that writes to the memtable (asserted).
//
// `batch_cnt` is only consulted by debug assertions. When
// `concurrent_memtable_writes` is set, the inserter's accumulated
// post-process info is applied before returning. Returns the status of
// iterating the batch.
Status WriteBatchInternal::InsertInto(
    WriteThread::Writer* writer, SequenceNumber sequence,
    ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
    bool batch_per_txn, bool hint_per_batch) {
  // Only the asserts below read batch_cnt; this keeps release builds free of
  // an unused-parameter warning.
  (void)batch_cnt;
  assert(writer->ShouldWriteToMemtable());

  MemTableInserter mem_inserter(
      sequence, memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, log_number, db,
      concurrent_memtable_writes, nullptr /* prot_info */,
      nullptr /* has_valid_writes */, seq_per_batch, batch_per_txn,
      hint_per_batch);

  SetSequence(writer->batch, sequence);
  mem_inserter.set_log_number_ref(writer->log_ref);
  mem_inserter.set_prot_info(writer->batch->prot_info_.get());

  const Status iterate_status = writer->batch->Iterate(&mem_inserter);
  assert(!seq_per_batch || batch_cnt != 0);
  assert(!seq_per_batch || mem_inserter.sequence() - sequence == batch_cnt);

  if (concurrent_memtable_writes) {
    mem_inserter.PostProcess();
  }
  return iterate_status;
}
// Applies `batch` to the memtables, starting at the sequence number stored in
// the batch itself and using the batch's own protection info.
//
// If `next_seq` is non-null it receives the inserter's sequence number after
// iteration. `has_valid_writes`, if non-null, is passed to the inserter as
// its out-flag. When `concurrent_memtable_writes` is set, accumulated
// post-process info is applied before returning. Returns the status of
// iterating the batch.
Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler,
    TrimHistoryScheduler* trim_history_scheduler,
    bool ignore_missing_column_families, uint64_t log_number, DB* db,
    bool concurrent_memtable_writes, SequenceNumber* next_seq,
    bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) {
  MemTableInserter mem_inserter(
      Sequence(batch), memtables, flush_scheduler, trim_history_scheduler,
      ignore_missing_column_families, log_number, db,
      concurrent_memtable_writes, batch->prot_info_.get(), has_valid_writes,
      seq_per_batch, batch_per_txn);

  const Status iterate_status = batch->Iterate(&mem_inserter);

  if (next_seq != nullptr) {
    *next_seq = mem_inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    mem_inserter.PostProcess();
  }
  return iterate_status;
}
// Replaces the serialized representation of `b` with a copy of `contents`
// and marks the content flags as DEFERRED.
//
// Preconditions (asserted): `contents` is at least kHeader bytes long and
// `b` carries no protection info. Always returns OK.
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  assert(b->prot_info_ == nullptr);

  const char* const bytes = contents.data();
  const size_t num_bytes = contents.size();
  b->rep_.assign(bytes, num_bytes);
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}
// Appends the records of `src` to `dst`.
//
// When `wal_only` is true and `src` has a WAL termination point set, only
// the records up to that point are appended; otherwise all of `src` is.
// `dst`'s record count, serialized data, content flags and protection info
// are updated accordingly. Always returns OK.
//
// Precondition (asserted): unless `dst` is empty, `dst` and `src` must agree
// on whether they carry protection info.
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  assert(dst->Count() == 0 ||
         (dst->prot_info_ == nullptr) == (src->prot_info_ == nullptr));
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    // Take only the prefix of `src` that ends at the termination point.
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  if (src->prot_info_ == nullptr) {
    // `src` carries no protection info, so the combined batch cannot either.
    // By the precondition `dst` is empty whenever it still had some; drop it
    // rather than dereferencing the null `src->prot_info_`.
    dst->prot_info_.reset();
  } else if (dst->prot_info_ == nullptr) {
    // Adopt a copy of `src`'s protection info, trimmed to the entries that
    // are actually appended (fewer than all of them when `wal_only` cuts the
    // batch short), so entries and records stay one-to-one.
    dst->prot_info_.reset(new WriteBatch::ProtectionInfo(*src->prot_info_));
    dst->prot_info_->entries_.resize(src_count);
  } else {
    std::copy(src->prot_info_->entries_.begin(),
              src->prot_info_->entries_.begin() + src_count,
              std::back_inserter(dst->prot_info_->entries_));
  }

  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  // Skip `src`'s header; `dst` keeps its own.
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
// Returns the byte size of the batch that results from appending a batch of
// `rightByteSize` bytes to one of `leftByteSize` bytes. When both sizes are
// non-zero, one kHeader is subtracted from their sum; when either is zero,
// the plain sum is returned.
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
                                            size_t rightByteSize) {
  const size_t combined = leftByteSize + rightByteSize;
  const bool either_empty = (leftByteSize == 0) || (rightByteSize == 0);
  return either_empty ? combined : combined - WriteBatchInternal::kHeader;
}
}