#include "db/db_impl/db_impl.h"
#include <cstdint>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif
#include <cinttypes>
#include <cstdio>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "db/arena_wrapped_db_iter.h"
#include "db/attribute_group_iterator_impl.h"
#include "db/builder.h"
#include "db/coalescing_iterator.h"
#include "db/compaction/compaction_job.h"
#include "db/convenience_impl.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/periodic_task_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/unique_id_gen.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/instrumented_mutex.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "util/udt_util.h"
#ifdef ROCKSDB_JEMALLOC
#include "port/jemalloc_helper.h"
#endif
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/version.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/sst_file_dumper.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "trace_replay/trace_replay.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/defer.h"
#include "util/distributed_mutex.h"
#include "util/hash_containers.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/udt_util.h"
#include "utilities/trace/replayer_impl.h"
namespace ROCKSDB_NAMESPACE {
// Name of the column family that every RocksDB database has by default.
const std::string kDefaultColumnFamilyName("default");
// Hidden column family used to store stats history when
// persist_stats_to_disk is enabled (see PersistStats()).
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
// Forward declaration; logs RocksDB version/build info to the given logger.
void DumpRocksDBBuildVersion(Logger* log);
// Pick the compression type to use when flushing a memtable to an SST file.
// Universal compaction configured with a non-negative
// compression_size_percent defers compression decisions to compaction time,
// so flush output stays uncompressed. Otherwise the first per-level entry
// (L0) wins, falling back to the CF-wide compression setting.
CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  const bool universal_sized_compression =
      ioptions.compaction_style == kCompactionStyleUniversal &&
      mutable_cf_options.compaction_options_universal
              .compression_size_percent >= 0;
  if (universal_sized_compression) {
    return kNoCompression;
  }
  const auto& per_level = mutable_cf_options.compression_per_level;
  return per_level.empty() ? mutable_cf_options.compression : per_level[0];
}
namespace {
// Log (at header level) which optional capabilities this build supports:
// compression codecs, fast CRC32, the DMutex implementation, and jemalloc.
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (const auto& [name, type] : OptionsHelper::compression_type_string_map) {
    // Skip the pseudo-entries that do not denote a real codec.
    const bool real_codec =
        type != kNoCompression && type != kDisableCompressionOption;
    if (real_codec) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", name.c_str(),
                       CompressionTypeSupported(type));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
  ROCKS_LOG_HEADER(logger, "DMutex implementation: %s", DMutex::kName());
#ifdef ROCKSDB_JEMALLOC
  const bool jemalloc_supported = HasJemalloc();
#else
  const bool jemalloc_supported = false;
#endif
  ROCKS_LOG_HEADER(logger, "Jemalloc supported: %d", jemalloc_supported);
}
}  // namespace
// DBImpl constructor. Sanitizes options, builds the core components
// (table cache, version set, write controller, periodic task callbacks)
// and dumps option/support info to the log. Heavy work such as recovery
// happens later in DB::Open, not here.
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn,
               bool read_only)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      // SanitizeOptions may create an info logger; a failure doing so is
      // captured in init_logger_creation_s_ and surfaced later.
      initial_db_options_(SanitizeOptions(dbname, options, read_only,
                                          &init_logger_creation_s_)),
      env_(initial_db_options_.env),
      io_tracer_(std::make_shared<IOTracer>()),
      immutable_db_options_(initial_db_options_),
      fs_(immutable_db_options_.fs, io_tracer_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.stats),
#ifdef COERCE_CONTEXT_SWITCH
      mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS, &bg_cv_,
             immutable_db_options_.use_adaptive_mutex),
#else
      mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
#endif  // COERCE_CONTEXT_SWITCH
      // FIX: this initializer had been fused onto the #endif line above,
      // which made it part of the preprocessor directive and dropped it.
      error_handler_(this, immutable_db_options_, &mutex_),
      event_logger_(immutable_db_options_.info_log.get()),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      bg_cv_(&mutex_),
      wal_sync_cv_(&wal_write_mutex_),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
      wal_manager_(immutable_db_options_, file_options_, io_tracer_,
                   seq_per_batch),
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // With seq_per_batch and two write queues, the last allocated
      // sequence may differ from the published one.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      use_custom_gc_(seq_per_batch),
      own_sfm_(options.sst_file_manager == nullptr),
      atomic_flush_install_cv_(&mutex_),
      blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
                     &error_handler_, &event_logger_,
                     immutable_db_options_.listeners, dbname_) {
  // batch_per_txn may only be disabled when seq_per_batch is enabled.
  assert(batch_per_txn_ || seq_per_batch_);

  // Reserve ten file descriptors for other uses and give the rest to the
  // table cache; max_open_files == -1 means unlimited.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  // Fixed seed keeps table cache sharding deterministic across instances.
  co.hash_seed = 0;
  table_cache_ = NewLRUCache(co);
  SetDbSessionId();
  assert(!db_session_id_.empty());

  // Map each periodic task type to its callback; the tasks themselves are
  // registered with the scheduler later (StartPeriodicTaskScheduler()).
  periodic_task_functions_.emplace(PeriodicTaskType::kDumpStats,
                                   [this]() { this->DumpStats(); });
  periodic_task_functions_.emplace(PeriodicTaskType::kPersistStats,
                                   [this]() { this->PersistStats(); });
  periodic_task_functions_.emplace(PeriodicTaskType::kFlushInfoLog,
                                   [this]() { this->FlushInfoLog(); });
  periodic_task_functions_.emplace(
      PeriodicTaskType::kRecordSeqnoTime,
      [this]() { this->RecordSeqnoToTimeMapping(); });
  periodic_task_functions_.emplace(
      PeriodicTaskType::kTriggerCompaction,
      [this]() { this->TriggerPeriodicCompaction(); });

  versions_.reset(new VersionSet(
      dbname_, &immutable_db_options_, mutable_db_options_, file_options_,
      table_cache_.get(), write_buffer_manager_, &write_controller_,
      &block_cache_tracer_, io_tracer_, db_id_, db_session_id_,
      options.daily_offpeak_time_utc, &error_handler_, read_only));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  // Dump build/version info, file summary, options and capabilities up
  // front so every log starts with a full description of the instance.
  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_, db_session_id_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
                            std::memory_order_relaxed);

  if (write_buffer_manager_) {
    wbm_stall_.reset(new WBMStallInterface());
  }
}
// Attempt to bring the DB out of a background-error (stopped) state.
// Returns OK immediately if nothing is stopped, Busy if an automatic
// recovery is already running, otherwise the result of a forced recovery.
Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to resume: neither writes nor background work are stopped.
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't let a manual Resume() race with in-flight automatic recovery.
    return Status::Busy("Recovery in progress");
  }

  // Drop the mutex around the recovery call (it presumably takes mutex_
  // internally -- hence the explicit Unlock/Lock pair; db_mutex's
  // destructor requires the mutex to be held on scope exit).
  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
// Core recovery routine, called with mutex_ held, after a background error.
// Sequence: quiesce background work, re-establish a writable MANIFEST if the
// previous one hit an I/O error, flush memtables, purge obsolete files,
// clear the background error, then restart background jobs. A shutdown
// observed at any checkpoint aborts with ShutdownInProgress.
Status DBImpl::ResumeImpl(DBRecoverContext context) {
  mutex_.AssertHeld();

  const ReadOptions read_options;
  const WriteOptions write_options;

  WaitForBackgroundWork();

  TEST_SYNC_POINT("DBImpl::ResumeImpl:Start");
  if (immutable_db_options_.two_write_queues) {
    // With two write queues the allocated sequence may run ahead of the
    // published one; re-sync before proceeding.
    versions_->SyncLastSequenceWithAllocated();
  }
  TEST_SYNC_POINT("DBImpl::ResumeImpl:AfterSyncSeq");
  Status s;
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    Status bg_error = error_handler_.GetBGError();
    if (bg_error.severity() > Status::Severity::kHardError) {
      // Fatal/unrecoverable errors cannot be cleared by this path.
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "DB resume requested but failed due to Fatal/Unrecoverable error");
      s = bg_error;
    }
  }
  if (s.ok()) {
    IOStatus io_s = versions_->io_status();
    if (io_s.IsIOError()) {
      // A prior MANIFEST write failed; apply an empty edit to force a new
      // MANIFEST file to be created.
      assert(!versions_->descriptor_log_);
      VersionEdit edit;
      auto cfh =
          static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
      assert(cfh);
      ColumnFamilyData* cfd = cfh->cfd();
      s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
                                 &mutex_, directories_.GetDbDir());
      if (!s.ok()) {
        io_s = versions_->io_status();
        if (!io_s.ok()) {
          error_handler_.SetBGError(io_s,
                                    BackgroundErrorReason::kManifestWrite);
        }
      }
    }
  }
  if (s.ok()) {
    // Flush all column families; retry-flush recoveries use the dedicated
    // retry path, everything else does a full flush allowing write stalls.
    if (context.flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
      s = RetryFlushesForErrorRecovery(FlushReason::kErrorRecoveryRetryFlush,
                                       true);
    } else {
      FlushOptions flush_opts;
      flush_opts.allow_write_stall = true;
      s = FlushAllColumnFamilies(flush_opts, context.flush_reason);
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  // Delete obsolete files regardless of the flush outcome. File I/O is done
  // with the mutex released.
  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  mutex_.Unlock();
  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  mutex_.Lock();

  if (s.ok()) {
    s = error_handler_.ClearBGError();
  } else {
    // Recovery failed; mark the recovery error as inspected.
    error_handler_.GetRecoveryError().PermitUncheckedError();
  }

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Failed to resume DB [%s]",
                   s.ToString().c_str());
  }
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && context.flush_after_recovery) {
    // Best-effort catch-up flush; its failure does not fail the resume.
    Status status = RetryFlushesForErrorRecovery(
        FlushReason::kCatchUpAfterErrorRecovery, false);
    if (!status.ok()) {
      // FIX: previously logged `s` (which is OK on this path) instead of the
      // actually-failing `status`.
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "The catch up flush after successful recovery failed [%s]",
                     status.ToString().c_str());
    }
    if (shutdown_initiated_) {
      s = Status::ShutdownInProgress();
    }
  }

  if (s.ok()) {
    // Re-queue compactions that may have been dropped while stopped.
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      EnqueuePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters (e.g. shutdown) regardless of outcome.
  bg_cv_.SignalAll();
  return s;
}
// Block until no flush or compaction job (including bottom-priority
// compactions) remains scheduled. Caller holds mutex_; bg_cv_.Wait()
// releases and re-acquires it.
void DBImpl::WaitForBackgroundWork() {
  for (;;) {
    const bool work_pending = bg_bottom_compaction_scheduled_ ||
                              bg_compaction_scheduled_ || bg_flush_scheduled_;
    if (!work_pending) {
      break;
    }
    bg_cv_.Wait();
  }
}
// Stop periodic tasks and background jobs ahead of shutdown. If `wait` is
// true, also block until all currently scheduled flush/compaction jobs have
// drained.
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

  // Stop the stats-dump / stats-persist / log-flush / compaction-trigger
  // timers first so no new periodic work starts during teardown.
  Status s = CancelPeriodicTaskScheduler();
  s.PermitUncheckedError();

  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    // Best-effort final flush of unpersisted memtable data.
    s = DBImpl::FlushAllColumnFamilies(FlushOptions(), FlushReason::kShutDown);
    s.PermitUncheckedError();
  }

  if (immutable_db_options_.compaction_service) {
    // Tell the remote compaction service to abandon queued jobs.
    immutable_db_options_.compaction_service->CancelAwaitingJobs();
  }

  // Publish the shutdown flag (release pairs with the acquire loads in the
  // background threads) and wake all waiters.
  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  // Wait for in-flight background jobs to finish (mutex_ is held; Wait
  // releases it while blocked).
  WaitForBackgroundWork();
}
// Release all timestamped snapshots (the max-uint64 threshold covers every
// timestamp). Returns Aborted if any snapshot is still referenced and could
// not be released, since the DB cannot close underneath a live snapshot.
Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
  size_t remaining = 0;
  ReleaseTimestampedSnapshotsOlderThan(std::numeric_limits<uint64_t>::max(),
                                       &remaining);
  return remaining == 0
             ? Status::OK()
             : Status::Aborted("Cannot close DB with unreleased snapshot.");
}
// Stop tracking this DB's data files (called at close when the
// SstFileManager is shared with other DBs -- see CloseHelper()).
void DBImpl::UntrackDataFiles() {
  TrackOrUntrackFiles(/*existing_data_files=*/{}, /*track=*/false);
}
// Tear down the DB: stop background work, drain job queues, release column
// family handles, purge obsolete files, close WALs and the MANIFEST, and
// unlock the DB directory. Returns the first significant error (Aborted is
// converted to Incomplete at the end).
Status DBImpl::CloseHelper() {
  // Signal shutdown so error-recovery threads bail out, and wait for the
  // error handler to be quiescent.
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecoveryForShutDown();
  while (!error_handler_.ReadyForShutdown()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();
  // Mark the recovery error as inspected; it is deliberately ignored here.
  error_handler_.GetRecoveryError().PermitUncheckedError();

  // Stop periodic tasks and (best-effort) flush unpersisted data. Don't
  // wait here -- the drain loop below does the waiting.
  CancelAllBackgroundWork(false);

  if (HasPendingManualCompaction()) {
    DisableManualCompaction();
  }
  mutex_.Lock();
  // Unschedule any jobs this DB queued, at every thread-pool priority.
  for (uint8_t i = 0; i < static_cast<uint8_t>(TaskType::kCount); i++) {
    env_->UnSchedule(GetTaskTag(i), Env::Priority::BOTTOM);
    env_->UnSchedule(GetTaskTag(i), Env::Priority::LOW);
    env_->UnSchedule(GetTaskTag(i), Env::Priority::HIGH);
  }

  Status ret = Status::OK();
  // Wait until every in-flight background job and recovery has finished.
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  // Drop queued flush/compaction requests, releasing their CFD references.
  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // Handle destruction may perform work that needs mutex_; release it.
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Only clean up obsolete files if the DB opened fully; otherwise the
  // DB::Open failure path owns the cleanup.
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // NOTE(review): manifest_file_number is pinned to 1 here, presumably to
    // shield the current MANIFEST from deletion during this purge — confirm.
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }
  {
    // Free retired WAL writers and close every live WAL.
    InstrumentedMutexLock lock(&wal_write_mutex_);
    for (auto l : wals_to_free_) {
      delete l;
    }
    for (auto& log : logs_) {
      uint64_t log_number = log.writer->get_log_number();
      Status s = log.ClearWriter();
      if (!s.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Unable to clear writer for WAL %s with error -- %s",
            LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
            s.ToString().c_str());
        // Keep the first WAL-close failure as the overall result.
        if (ret.ok()) {
          ret = s;
        }
      }
    }
    logs_.clear();
  }

#ifndef NDEBUG
  TEST_VerifyNoObsoleteFilesCached(true);
#endif  // !NDEBUG
  // FIX: this call had been fused onto the #endif line above, making it part
  // of the preprocessor directive (and thus never executed).
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // If the SstFileManager is shared (not owned by this DB), drop our file
  // tracking so it holds no stale entries after this DB is gone.
  if (immutable_db_options_.sst_file_manager && !own_sfm_) {
    mutex_.Unlock();
    UntrackDataFiles();
    mutex_.Lock();
  }

  {
    // Close the MANIFEST / version set.
    Status s = versions_->Close(directories_.GetDbDir(), &mutex_);
    if (!s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Unable to close MANIFEST with error -- %s",
                      s.ToString().c_str());
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    // Release the DB directory LOCK file.
    env_->UnlockFile(db_lock_).PermitUncheckedError();
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    // We own the SstFileManager; shut it down.
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (!s.ok() && !s.IsNotSupported() && ret.ok()) {
      ret = s;
    }
  }

  if (write_buffer_manager_ && wbm_stall_) {
    write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get());
  }

  IOStatus io_s = directories_.Close(IOOptions(), nullptr);
  if (!io_s.ok()) {
    ret = io_s;
  }
  if (ret.IsAborted()) {
    // Report Aborted as Incomplete to the caller.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}
// Delegates the actual teardown to CloseHelper().
Status DBImpl::CloseImpl() { return CloseHelper(); }
DBImpl::~DBImpl() {
  // Temporarily mark this thread's operation as unknown so close-time work
  // is not attributed to whatever operation was previously in progress.
  ThreadStatus::OperationType cur_op_type =
      ThreadStatusUtil::GetThreadOperation();
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);

  // The constructor may have recorded a logger-creation failure; mark it
  // inspected so Status's checked-error machinery stays quiet.
  init_logger_creation_s_.PermitUncheckedError();

  // closing_mutex_ serializes the destructor against an explicit Close().
  InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
  if (!closed_) {
    closed_ = true;

    {
      // Release leftover timestamped snapshots; the returned status is
      // deliberately ignored in the destructor.
      const Status s = MaybeReleaseTimestampedSnapshotsAndCheck();
      s.PermitUncheckedError();
    }

    closing_status_ = CloseImpl();
    closing_status_.PermitUncheckedError();
  }
  ThreadStatusUtil::SetThreadOperation(cur_op_type);
}
// Downgrade a non-OK status to OK (after logging it) when paranoid_checks
// is disabled. With paranoid_checks on, or an already-OK status, *s is left
// untouched.
void DBImpl::MaybeIgnoreError(Status* s) const {
  const bool keep_error = s->ok() || immutable_db_options_.paranoid_checks;
  if (!keep_error) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}
// Create the WAL archival directory when WAL archiving is enabled (via
// WAL_ttl_seconds or WAL_size_limit_MB); no-op otherwise.
const Status DBImpl::CreateArchivalDirectory() {
  const bool archiving_enabled = immutable_db_options_.WAL_ttl_seconds > 0 ||
                                 immutable_db_options_.WAL_size_limit_MB > 0;
  if (!archiving_enabled) {
    return Status::OK();
  }
  return env_->CreateDirIfMissing(
      ArchivalDirectory(immutable_db_options_.GetWalDir()));
}
// Dump the accumulated Statistics object (if one is configured) to the
// info log.
void DBImpl::PrintStatistics() {
  if (auto* db_stats = immutable_db_options_.stats; db_stats != nullptr) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   db_stats->ToString().c_str());
  }
}
// Register the recurring maintenance tasks with the periodic task
// scheduler: stats dumping and persisting (only when their periods are
// non-zero), info-log flushing, and the periodic compaction trigger.
// Returns the first registration failure.
Status DBImpl::StartPeriodicTaskScheduler() {
#ifndef NDEBUG
  // Tests may disable the scheduler entirely, or inspect/replace it before
  // any task is registered.
  bool disable_scheduler = false;
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::StartPeriodicTaskScheduler:DisableScheduler",
      &disable_scheduler);
  if (disable_scheduler) {
    return Status::OK();
  }

  {
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicTaskScheduler:Init",
                             &periodic_task_scheduler_);
  }
#endif  // !NDEBUG
  // FIX: the `if` below had been fused onto the #endif line above, which
  // made the stats-dump registration part of the preprocessor directive.
  if (mutable_db_options_.stats_dump_period_sec > 0) {
    Status s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kDumpStats,
        periodic_task_functions_.at(PeriodicTaskType::kDumpStats),
        mutable_db_options_.stats_dump_period_sec, true);
    if (!s.ok()) {
      return s;
    }
  }
  if (mutable_db_options_.stats_persist_period_sec > 0) {
    Status s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kPersistStats,
        periodic_task_functions_.at(PeriodicTaskType::kPersistStats),
        mutable_db_options_.stats_persist_period_sec, true);
    if (!s.ok()) {
      return s;
    }
  }

  Status s = periodic_task_scheduler_.Register(
      PeriodicTaskType::kFlushInfoLog,
      periodic_task_functions_.at(PeriodicTaskType::kFlushInfoLog), true);
  if (s.ok()) {
    s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kTriggerCompaction,
        periodic_task_functions_.at(PeriodicTaskType::kTriggerCompaction),
        false);
  }

  return s;
}
// (Re)configure the periodic task that records seqno->time mappings, based
// on the preserve/preclude time settings aggregated over all live column
// families. A zero cadence disables the task and empties the mapping.
Status DBImpl::RegisterRecordSeqnoTimeWorker() {
  options_mutex_.AssertHeld();

  MinAndMaxPreserveSeconds preserve_info;
  uint64_t seqno_time_cadence;
  {
    InstrumentedMutexLock l(&mutex_);
    // Aggregate the preserve-seconds settings across non-dropped CFs.
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto& mopts = cfd->GetLatestMutableCFOptions();
      if (!cfd->IsDropped()) {
        preserve_info.Combine(mopts);
      }
    }
    seqno_time_cadence = preserve_info.GetRecodingCadence();

    if (seqno_time_cadence == 0) {
      // Feature disabled everywhere: shrink the mapping to nothing.
      seqno_to_time_mapping_.SetCapacity(0);
      seqno_to_time_mapping_.SetMaxTimeSpan(UINT64_MAX);
      assert(seqno_to_time_mapping_.Empty());
    } else {
      // Capacity scales with the ratio of the longest preserve window to
      // the shortest, capped at kMaxSeqnoToTimeEntries.
      uint64_t cap = std::min(kMaxSeqnoToTimeEntries,
                              preserve_info.max_preserve_seconds *
                                  kMaxSeqnoTimePairsPerCF /
                                  preserve_info.min_preserve_seconds);
      seqno_to_time_mapping_.SetCapacity(cap);
      seqno_to_time_mapping_.SetMaxTimeSpan(preserve_info.max_preserve_seconds);
    }
  }
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::RegisterRecordSeqnoTimeWorker:BeforePeriodicTaskType", nullptr);

  // Register/unregister outside mutex_ (options_mutex_ is still held).
  Status s;
  if (seqno_time_cadence == 0) {
    s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kRecordSeqnoTime);
  } else {
    s = periodic_task_scheduler_.Register(
        PeriodicTaskType::kRecordSeqnoTime,
        periodic_task_functions_.at(PeriodicTaskType::kRecordSeqnoTime),
        seqno_time_cadence, true);
  }

  return s;
}
// Unregister every periodic task type. Failures are logged; the returned
// status is that of the LAST Unregister call (so a later success overwrites
// an earlier failure -- matching the original behavior).
Status DBImpl::CancelPeriodicTaskScheduler() {
  Status last_status = Status::OK();
  const uint8_t num_task_types = static_cast<uint8_t>(PeriodicTaskType::kMax);
  for (uint8_t t = 0; t < num_task_types; ++t) {
    last_status =
        periodic_task_scheduler_.Unregister(static_cast<PeriodicTaskType>(t));
    if (!last_status.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to unregister periodic task %d, status: %s", t,
                     last_status.ToString().c_str());
    }
  }
  return last_status;
}
// Approximate the heap footprint of the in-memory stats history. The first
// slice is used as a size template for all slices, since every slice holds
// the same set of ticker names.
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  stats_history_mutex_.AssertHeld();
  const size_t map_overhead =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.empty()) {
    return map_overhead;
  }
  // Per-slice cost: the timestamp key, the inner map header, and each
  // (name, value) entry of the first slice.
  size_t slice_bytes =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  for (const auto& [name, value] : stats_history_.begin()->second) {
    slice_bytes += name.capacity() + sizeof(name) + sizeof(value);
  }
  return slice_bytes * stats_history_.size();
}
// Periodic task: snapshot ticker statistics and record the per-ticker delta
// since the previous snapshot, either into the hidden persistent-stats
// column family (persist_stats_to_disk) or into the size-bounded in-memory
// history map.
void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
  uint64_t now_seconds =
      immutable_db_options_.clock->NowMicros() / kMicrosInSecond;

  Statistics* statistics = immutable_db_options_.stats;
  if (!statistics) {
    // Nothing to persist without a Statistics object.
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    Status s = Status::OK();
    if (stats_slice_initialized_) {
      // Not the first run: batch up per-ticker deltas keyed by timestamp.
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        if (s.ok()) {
          char key[100];
          int length =
              EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
          // Only tickers present in the previous slice have a valid delta.
          if (stats_slice_.find(stat.first) != stats_slice_.end()) {
            uint64_t delta = stat.second - stats_slice_[stat.first];
            s = batch.Put(persist_stats_cf_handle_,
                          Slice(key, std::min(100, length)),
                          std::to_string(delta));
          }
        }
      }
    }
    stats_slice_initialized_ = true;
    // The freshly read values become the baseline for the next run.
    std::swap(stats_slice_, stats_map);
    if (s.ok()) {
      WriteOptions wo;
      wo.low_pri = true;
      // Stats writes must never stall foreground writes.
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
  } else {
    // In-memory history path, guarded by its own mutex.
    InstrumentedMutexLock l(&stats_history_mutex_);
    if (stats_slice_initialized_) {
      // Compute per-ticker deltas vs. the previous snapshot.
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = std::move(stats_delta);
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // Evict the oldest slices until the history fits its configured budget.
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    // NOTE(review): this logs the pre-GC size value; only the slice count
    // reflects the post-GC state.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:End");
}
// Find the first in-memory stats slice with timestamp in
// [start_time, end_time). On success stores its timestamp into *new_time,
// copies its map into *stats_map, and returns true; otherwise returns false.
bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (new_time == nullptr || stats_map == nullptr) {
    return false;
  }
  InstrumentedMutexLock l(&stats_history_mutex_);
  const auto it = stats_history_.lower_bound(start_time);
  const bool found = it != stats_history_.end() && it->first < end_time;
  if (found) {
    *new_time = it->first;
    *stats_map = it->second;
  }
  return found;
}
// Create an iterator over stats history in [start_time, end_time), backed
// by the persistent stats CF or the in-memory history depending on the
// persist_stats_to_disk setting.
Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (stats_iterator == nullptr) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    *stats_iterator = std::make_unique<PersistentStatsHistoryIterator>(
        start_time, end_time, this);
  } else {
    *stats_iterator = std::make_unique<InMemoryStatsHistoryIterator>(
        start_time, end_time, this);
  }
  return (*stats_iterator)->status();
}
// Periodic task: collect DB-wide and per-CF stats strings and write them to
// the info log, together with block-cache problem reports and (optionally)
// malloc stats.
void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
  std::string stats;
  if (shutdown_initiated_) {
    return;
  }

  // Report problems on each distinct block cache only once, even if several
  // CFs share it.
  UnorderedSet<Cache*> probed_caches;
  TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto cfd : versions_->GetRefedColumnFamilySet()) {
      if (!cfd->initialized() || cfd->IsDropped()) {
        continue;
      }

      auto* table_factory =
          cfd->GetCurrentMutableCFOptions().table_factory.get();
      assert(table_factory != nullptr);
      Cache* cache =
          table_factory->GetOptions<Cache>(TableFactory::kBlockCacheOpts());
      // Release the DB mutex while doing potentially slow per-CF work.
      InstrumentedMutexUnlock u(&mutex_);
      cfd->internal_stats()->CollectCacheEntryStats(false);
      if (immutable_db_options_.info_log) {
        if (cache && probed_caches.insert(cache).second) {
          cache->ReportProblems(immutable_db_options_.info_log);
        }
      }
    }

    // DB-wide stats from the default CF's internal stats ...
    const std::string* property = &DB::Properties::kDBStats;
    const DBPropertyInfo* property_info = GetPropertyInfo(*property);
    assert(property_info != nullptr);
    assert(!property_info->need_out_of_mutex);
    default_cf_internal_stats_->GetStringProperty(*property_info, *property,
                                                  &stats);

    // ... followed by periodic per-CF stats for every initialized CF.
    property = &InternalStats::kPeriodicCFStats;
    property_info = GetPropertyInfo(*property);
    assert(property_info != nullptr);
    assert(!property_info->need_out_of_mutex);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(*property_info, *property,
                                                 &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
  PrintStatistics();
}
// Periodic task: flush buffered info-log output. Skipped once shutdown has
// begun.
void DBImpl::FlushInfoLog() {
  if (!shutdown_initiated_) {
    TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
    LogFlush(immutable_db_options_.info_log);
  }
}
// Append a summary of the range tombstones in the given column family's
// current version to *out_str, printing at most max_entries_to_print
// entries. Pins a super-version for the duration of the read.
Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* handle = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  SuperVersion* sv = handle->cfd()->GetReferencedSuperVersion(this);
  const Status s =
      sv->current->TablesRangeTombstoneSummary(max_entries_to_print, out_str);
  CleanupSuperVersion(sv);
  return s;
}
// Hand the WALs queued for freeing in job_context over to the background
// logs-to-free queue, so a background thread closes/deletes them. Requires
// mutex_ to be held.
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  mutex_.AssertHeld();
  for (auto* wal : job_context->wals_to_free) {
    AddToLogsToFreeQueue(wal);
  }
  job_context->wals_to_free.clear();
}
// Return the data directory for (cfd, path_id): the CF-specific directory
// when one is configured, otherwise the DB-level directory for that path.
FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  FSDirectory* dir = cfd->GetDataDir(path_id);
  return dir != nullptr ? dir : directories_.GetDataDir(path_id);
}
// Apply per-CF mutable option changes for one or more column families.
// Each CF's change is applied under the MANIFEST write via a pre-callback,
// its super-version refreshed, and the options file rewritten. Returns the
// first failure; remaining CFs are skipped once a failure occurs.
Status DBImpl::SetOptions(
    const std::unordered_map<ColumnFamilyHandle*,
                             std::unordered_map<std::string, std::string>>&
        column_families_opts_map) {
  const ReadOptions read_options;
  const WriteOptions write_options;

  if (column_families_opts_map.empty()) {
    return Status::OK();
  }
  // Reject the whole call if any CF was given an empty options map.
  for (const auto& cf_opts : column_families_opts_map) {
    if (cf_opts.second.empty()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "SetOptions() on column family [%s], empty input",
                     cf_opts.first->GetName().c_str());
      return Status::InvalidArgument("empty input");
    }
  }
  // Resolve handles to their underlying ColumnFamilyData.
  autovector<std::pair<ColumnFamilyData*,
                       const std::unordered_map<std::string, std::string>*>>
      column_family_datas;
  for (const auto& cf_opts : column_families_opts_map) {
    column_family_datas.push_back(
        {static_cast_with_check<ColumnFamilyHandleImpl>(cf_opts.first)->cfd(),
         &cf_opts.second});
  }

  // options_mutex_ serializes concurrent SetOptions/SetDBOptions calls.
  InstrumentedMutexLock ol(&options_mutex_);
  autovector<MutableCFOptions> new_options_copy;
  Status s;
  Status persist_options_status;
  SuperVersionContext sv_context(true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);

    // The MANIFEST write itself is a no-op marker; the actual option change
    // happens in the pre-callback below, serialized with manifest writes.
    VersionEdit dummy_edit;
    dummy_edit.MarkNoManifestWriteDummy();
    TEST_SYNC_POINT_CALLBACK("DBImpl::SetOptions:dummy_edit", &dummy_edit);
    for (const auto& cfd_opts : column_family_datas) {
      auto* cfd = cfd_opts.first;
      const auto* options_map_ptr = cfd_opts.second;
      auto pre_cb = [&]() -> Status {
        Status cb_s = cfd->SetOptions(db_options, *options_map_ptr);
        if (cb_s.ok()) {
          // Snapshot the resulting options for later verification/logging.
          new_options_copy.emplace_back(cfd->GetLatestMutableCFOptions());
        }
        return cb_s;
      };
      s = versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 nullptr, {}, pre_cb);
      if (!versions_->io_status().ok()) {
        assert(!s.ok());
        error_handler_.SetBGError(versions_->io_status(),
                                  BackgroundErrorReason::kManifestWrite);
      }
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      // Publish the new options to readers via fresh super-versions.
      for (const auto& cfd_opts : column_family_datas) {
        InstallSuperVersionForConfigChange(cfd_opts.first, &sv_context);
      }
      persist_options_status = WriteOptionsFile(write_options, true);
      // Wake threads that may be waiting on option-dependent conditions.
      bg_cv_.SignalAll();
#ifndef NDEBUG
      // All views of the mutable CF options must now agree.
      for (size_t i = 0; i < column_family_datas.size(); ++i) {
        auto* cfd = column_family_datas[i].first;
        assert(new_options_copy[i] == cfd->GetLatestMutableCFOptions());
        assert(cfd->GetLatestMutableCFOptions() ==
               cfd->GetCurrentMutableCFOptions());
        assert(cfd->GetCurrentMutableCFOptions() ==
               cfd->current()->GetMutableCFOptions());
      }
#endif
    }
  }
  sv_context.Clean();

  if (s.ok()) {
    // Changes to time-preservation settings require reconfiguring the
    // seqno->time recording worker.
    bool needs_seqno_worker = false;
    for (const auto& cf_opts : column_families_opts_map) {
      if (cf_opts.second.count("preserve_internal_time_seconds") > 0 ||
          cf_opts.second.count("preclude_last_level_data_seconds") > 0) {
        needs_seqno_worker = true;
        break;
      }
    }
    if (needs_seqno_worker) {
      s = RegisterRecordSeqnoTimeWorker();
    }
  }

  // Log the requested inputs for all CFs, then (on success) the results.
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "SetOptions() on [%zu] column families, inputs:",
                 column_family_datas.size());
  for (size_t i = 0; i < column_family_datas.size(); ++i) {
    const auto* cfd = column_family_datas[i].first;
    const auto* options_map_ptr = column_family_datas[i].second;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Set options on column family [%s] (%zu/%zu), inputs:",
                   cfd->GetName().c_str(), i, column_family_datas.size());
    for (const auto& o : *options_map_ptr) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n",
                     o.first.c_str(), o.second.c_str());
    }
  }

  if (s.ok()) {
    for (size_t i = 0; i < column_family_datas.size(); ++i) {
      const auto* cfd = column_family_datas[i].first;
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Set options on column family [%s] (%zu/%zu) succeeded, "
                     "updated CF options:",
                     cfd->GetName().c_str(), i, column_family_datas.size());
      new_options_copy[i].Dump(immutable_db_options_.info_log.get());
    }
    // Applying succeeded but persisting the options file may have failed;
    // surface that to the caller.
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    persist_options_status.PermitUncheckedError();
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetOptions() failed: %s",
                   s.ToString().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
}
// Apply runtime changes to mutable DB-wide options, supplied as
// name -> value strings.
//
// Under options_mutex_ (serializing concurrent Set*Options() calls) and the
// DB mutex, this:
//  1. parses and sanitizes the new MutableDBOptions,
//  2. validates them alone and in combination with every live column family,
//  3. grows background thread pools / updates the offpeak time if needed,
//  4. re-registers the stats dump/persist periodic tasks (done with the DB
//     mutex temporarily released, since the scheduler takes its own locks),
//  5. installs the new options and possibly switches the WAL when
//     WAL-related options changed,
//  6. persists the OPTIONS file.
// Returns the first failure; if only persisting the OPTIONS file failed, the
// in-memory update stays applied and an IOError is returned.
Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }
  InstrumentedMutexLock ol(&options_mutex_);
  MutableDBOptions new_options;
  Status s;
  Status persist_options_status = Status::OK();
  bool wal_size_option_changed = false;
  bool wal_other_option_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    // Sanitize: bytes_per_sync == 0 is replaced with 1MB.
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    // No-op change: skip all the work (including persisting OPTIONS).
    if (MutableDBOptionsAreEqual(mutable_db_options_, new_options)) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "SetDBOptions(), input option value is not changed, "
                     "skipping updating.");
      persist_options_status.PermitUncheckedError();
      return s;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      // The new DB options must also be valid in combination with every live
      // column family's options.
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(mutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          new_options.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, true);
      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;
      const bool offpeak_time_changed =
          versions_->offpeak_time_option().daily_offpeak_time_utc !=
          new_db_options.daily_offpeak_time_utc;
      if (max_flushes_increased || max_compactions_increased ||
          offpeak_time_changed) {
        // Only growth of thread pools is requested here
        // (IncBackgroundThreadsIfNeeded).
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }
        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }
        if (offpeak_time_changed) {
          versions_->ChangeOffpeakTimeOption(
              new_db_options.daily_offpeak_time_utc);
        }
        MaybeScheduleFlushOrCompaction();
      }
      // Periodic task (un)registration must not be done while holding the DB
      // mutex; release it for this section.
      mutex_.Unlock();
      if (new_options.stats_dump_period_sec == 0) {
        s = periodic_task_scheduler_.Unregister(PeriodicTaskType::kDumpStats);
      } else {
        s = periodic_task_scheduler_.Register(
            PeriodicTaskType::kDumpStats,
            periodic_task_functions_.at(PeriodicTaskType::kDumpStats),
            new_options.stats_dump_period_sec, true);
      }
      if (new_options.max_total_wal_size !=
          mutable_db_options_.max_total_wal_size) {
        max_total_wal_size_.store(new_options.max_total_wal_size,
                                  std::memory_order_release);
      }
      if (s.ok()) {
        if (new_options.stats_persist_period_sec == 0) {
          s = periodic_task_scheduler_.Unregister(
              PeriodicTaskType::kPersistStats);
        } else {
          s = periodic_task_scheduler_.Register(
              PeriodicTaskType::kPersistStats,
              periodic_task_functions_.at(PeriodicTaskType::kPersistStats),
              new_options.stats_persist_period_sec, true);
        }
      }
      mutex_.Lock();
      if (!s.ok()) {
        return s;
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_other_option_changed = mutable_db_options_.wal_bytes_per_sync !=
                                 new_options.wal_bytes_per_sync;
      wal_size_option_changed = mutable_db_options_.max_total_wal_size !=
                                new_options.max_total_wal_size;
      // Install the new mutable options and derived file options.
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->UpdatedMutableDbOptions(mutable_db_options_, &mutex_);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      if (wal_other_option_changed || wal_size_option_changed) {
        // Switching the WAL requires being the exclusive writer.
        WriteThread::Writer w;
        write_thread_.EnterUnbatched(&w, &mutex_);
        if (wal_other_option_changed ||
            wals_total_size_.LoadRelaxed() > GetMaxTotalWalSize()) {
          Status purge_wal_status = SwitchWAL(&write_context);
          if (!purge_wal_status.ok()) {
            // Best effort: a failure to switch/purge WALs is only logged.
            ROCKS_LOG_WARN(immutable_db_options_.info_log,
                           "Unable to purge WAL files in SetDBOptions() -- %s",
                           purge_wal_status.ToString().c_str());
          }
        }
        write_thread_.ExitUnbatched(&w);
      }
      persist_options_status = WriteOptionsFile(WriteOptions(), true);
    } else {
      persist_options_status.PermitUncheckedError();
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      // The in-memory update succeeded but the OPTIONS file could not be
      // persisted; surface that to the caller.
      s = Status::IOError(
          "SetDBOptions() succeeded, but unable to persist options",
          persist_options_status.ToString());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
}
// Walk downward from `level` and return the lowest-numbered empty level
// that can hold all the bytes currently stored in `level`. Requires the DB
// mutex to be held.
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*unused*/, int level) {
  mutex_.AssertHeld();
  const auto* storage = cfd->current()->storage_info();
  int best = level;
  int candidate = level - 1;
  while (candidate > 0) {
    // Stop at the first non-empty level, or one too small to fit the data.
    if (storage->NumLevelFiles(candidate) > 0 ||
        storage->MaxBytesForLevel(candidate) <
            storage->NumLevelBytes(level)) {
      break;
    }
    best = candidate;
    --candidate;
  }
  return best;
}
// Public FlushWAL entry: translate FlushWALOptions into internal
// WriteOptions and delegate to the WriteOptions-based overload.
Status DBImpl::FlushWAL(const FlushWALOptions& options) {
  WriteOptions wo;
  wo.rate_limiter_priority = options.rate_limiter_priority;
  const bool do_sync = options.sync;
  return FlushWAL(wo, do_sync);
}
// Flush (and optionally sync) buffered WAL data. When manual_wal_flush_ is
// set, the current WAL writer's buffer is written to the file under
// wal_write_mutex_; an I/O error is logged and routed through
// WALIOStatusCheck() before returning. When `sync` is true, the WALs are
// additionally synced via SyncWAL().
Status DBImpl::FlushWAL(const WriteOptions& write_options, bool sync) {
  if (manual_wal_flush_) {
    IOStatus io_s;
    {
      // Only the most recent WAL writer can hold buffered data.
      InstrumentedMutexLock wl(&wal_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer(write_options);
    }
    if (!io_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      io_s.ToString().c_str());
      // Route the WAL I/O error through the standard WAL error handling.
      WALIOStatusCheck(io_s);
      return static_cast<Status>(io_s);
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return static_cast<Status>(io_s);
    }
  }
  if (!sync) {
    // Nothing buffered to flush and no sync requested.
    return Status::OK();
  }
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
// Returns true iff the current (most recent) WAL writer has no buffered
// data. Takes wal_write_mutex_ for a consistent read.
bool DBImpl::WALBufferIsEmpty() {
  InstrumentedMutexLock guard(&wal_write_mutex_);
  return logs_.back().writer->BufferIsEmpty();
}
// Fill `number_to_size` with the flushed size of every WAL that still has
// an open file, keyed by WAL number. The map must be empty on entry.
Status DBImpl::GetOpenWalSizes(std::map<uint64_t, uint64_t>& number_to_size) {
  assert(number_to_size.empty());
  InstrumentedMutexLock guard(&wal_write_mutex_);
  for (auto& wal : logs_) {
    auto* open_file = wal.writer->file();
    if (open_file == nullptr) {
      continue;  // already closed; no live size to report
    }
    number_to_size[wal.number] = open_file->GetFlushedSize();
  }
  return Status::OK();
}
// Sync all WALs up to and including the current one; if WAL tracking
// produced a WAL addition, record the synced WALs in the MANIFEST.
Status DBImpl::SyncWAL() {
  TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
  WriteOptions write_options;
  VersionEdit synced_wals;
  Status s = SyncWalImpl(/*include_current_wal=*/true, write_options,
                         /*job_context=*/nullptr, &synced_wals,
                         /*error_recovery_in_prog=*/false);
  if (s.ok() && synced_wals.IsWalAddition()) {
    // Updating the MANIFEST requires the DB mutex.
    InstrumentedMutexLock l(&mutex_);
    const ReadOptions read_options;
    s = ApplyWALToManifest(read_options, write_options, &synced_wals);
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
  return s;
}
// Core WAL sync routine, shared by SyncWAL() (include_current_wal=true) and
// syncing of already-closed WALs. Three phases:
//  1. Under wal_write_mutex_: wait out any overlapping sync, mark the WALs
//     in range as syncing, and collect their writers.
//  2. Without the mutex: sync (and possibly close fully retired) WAL files.
//  3. Under the mutex again: publish any closes and record the outcome via
//     MarkLogsSynced()/MarkLogsNotSynced().
IOStatus DBImpl::SyncWalImpl(bool include_current_wal,
                             const WriteOptions& write_options,
                             JobContext* job_context, VersionEdit* synced_wals,
                             bool error_recovery_in_prog) {
  autovector<log::Writer*, 1> wals_to_sync;
  bool need_wal_dir_sync;
  uint64_t maybe_active_number;
  uint64_t up_to_number;
  {
    InstrumentedMutexLock l(&wal_write_mutex_);
    assert(!logs_.empty());
    maybe_active_number = cur_wal_number_;
    up_to_number =
        include_current_wal ? maybe_active_number : maybe_active_number - 1;
    // Wait for any in-flight sync covering the same WALs to finish so two
    // syncs never overlap on the front of logs_.
    while (logs_.front().number <= up_to_number && logs_.front().IsSyncing()) {
      wal_sync_cv_.Wait();
    }
    if (include_current_wal &&
        !logs_.back().writer->file()->writable_file()->IsSyncThreadSafe()) {
      return IOStatus::NotSupported(
          "SyncWAL() is not supported for this implementation of WAL file",
          immutable_db_options_.allow_mmap_writes
              ? "try setting Options::allow_mmap_writes to false"
              : Slice());
    }
    // Mark each in-range WAL as syncing and snapshot its writer.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= up_to_number; ++it) {
      auto& log = *it;
      log.PrepareForSync();
      if (log.writer->file()) {
        wals_to_sync.push_back(log.writer);
      }
    }
    need_wal_dir_sync = !wal_dir_synced_;
  }
  if (include_current_wal) {
    TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  }
  RecordTick(stats_, WAL_FILE_SYNCED);
  IOOptions opts;
  IOStatus io_s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  std::list<log::Writer*> wals_internally_closed;
  if (io_s.ok()) {
    for (log::Writer* log : wals_to_sync) {
      if (job_context) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                       log->get_log_number());
      }
      if (error_recovery_in_prog) {
        log->file()->reset_seen_error();
      }
      if (log->get_log_number() >= maybe_active_number) {
        // The (possibly) active WAL is synced without flushing its buffer.
        assert(log->get_log_number() == maybe_active_number);
        io_s = log->file()->SyncWithoutFlush(opts,
                                             immutable_db_options_.use_fsync);
      } else {
        io_s = log->file()->Sync(opts, immutable_db_options_.use_fsync);
      }
      if (!io_s.ok()) {
        break;
      }
      // Inactive WALs may be closed here directly (when recycling WALs or
      // when background closing is disabled); the close is published to
      // logs_ bookkeeping under the mutex below.
      if (log->get_log_number() < maybe_active_number &&
          (immutable_db_options_.recycle_log_file_num > 0 ||
           !immutable_db_options_.background_close_inactive_wals)) {
        if (error_recovery_in_prog) {
          log->file()->reset_seen_error();
        }
        io_s = log->file()->Close(opts);
        wals_internally_closed.push_back(log);
        if (!io_s.ok()) {
          break;
        }
      }
    }
  }
  if (!io_s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
                    io_s.ToString().c_str());
    // Route the WAL I/O error through the standard WAL error handling.
    WALIOStatusCheck(io_s);
  }
  if (io_s.ok() && need_wal_dir_sync) {
    // The WAL directory has not been fsynced yet; do it now.
    io_s = directories_.GetWalDir()->FsyncWithDirOptions(
        IOOptions(), nullptr,
        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
  }
  if (include_current_wal) {
    TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
    TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  } else {
    TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedWals:BeforeReLock",
                             nullptr);
  }
  {
    InstrumentedMutexLock l(&wal_write_mutex_);
    // Publish the closes performed above so logs_ bookkeeping sees them.
    for (auto* wal : wals_internally_closed) {
      bool was_closed = wal->PublishIfClosed();
      assert(was_closed);
      (void)was_closed;
    }
    if (io_s.ok()) {
      MarkLogsSynced(up_to_number, need_wal_dir_sync, synced_wals);
    } else {
      MarkLogsNotSynced(up_to_number);
    }
  }
  return io_s;
}
// Record synced-WAL metadata in the MANIFEST via the default column family.
// Requires the DB mutex to be held; a failed MANIFEST write caused by an
// I/O error is reported to the background error handler.
Status DBImpl::ApplyWALToManifest(const ReadOptions& read_options,
                                  const WriteOptions& write_options,
                                  VersionEdit* synced_wals) {
  mutex_.AssertHeld();
  const Status s = versions_->LogAndApplyToDefaultColumnFamily(
      read_options, write_options, synced_wals, &mutex_,
      directories_.GetDbDir());
  if (!s.ok()) {
    if (versions_->io_status().IsIOError()) {
      error_handler_.SetBGError(versions_->io_status(),
                                BackgroundErrorReason::kManifestWrite);
    }
  }
  return s;
}
// Stop new writes to the WAL. Calls nest: each successful LockWAL() must be
// balanced by UnlockWAL(). The first locker installs a write-controller
// stop token while holding the write queue(s) exclusively; the WAL buffer
// is then flushed so everything written so far reaches the file. If that
// flush fails, the lock is released before returning the error.
Status DBImpl::LockWAL() {
  {
    InstrumentedMutexLock lock(&mutex_);
    if (lock_wal_count_ > 0) {
      // Already locked; just increase the nesting count.
      assert(lock_wal_write_token_);
      ++lock_wal_count_;
    } else {
      // Become the exclusive writer on the queue(s) so no write is in
      // flight while the stop token is installed.
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      WriteThread::Writer nonmem_w;
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
      if (lock_wal_count_ == 0) {
        assert(!lock_wal_write_token_);
        lock_wal_write_token_ = write_controller_.GetStopToken();
      }
      ++lock_wal_count_;
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
      write_thread_.ExitUnbatched(&w);
    }
  }
  // Ensure buffered WAL data is in the file (no sync requested).
  Status s = FlushWAL(false);
  if (!s.ok()) {
    // Undo the lock on failure so the caller need not call UnlockWAL().
    UnlockWAL().PermitUncheckedError();
  }
  return s;
}
// Balance a previous LockWAL(). When the nesting count drops to zero the
// stop token is released, waiters are signaled, and — if a write stall was
// outstanding at that moment — this waits for that stall to end so writes
// are genuinely unblocked on return.
Status DBImpl::UnlockWAL() {
  bool signal = false;
  uint64_t maybe_stall_begun_count = 0;
  uint64_t nonmem_maybe_stall_begun_count = 0;
  {
    InstrumentedMutexLock lock(&mutex_);
    if (lock_wal_count_ == 0) {
      return Status::Aborted("No LockWAL() in effect");
    }
    --lock_wal_count_;
    if (lock_wal_count_ == 0) {
      lock_wal_write_token_.reset();
      signal = true;
      // Snapshot outstanding-stall state under the mutex; the waiting
      // happens after it is released.
      maybe_stall_begun_count = write_thread_.GetBegunCountOfOutstandingStall();
      if (two_write_queues_) {
        nonmem_maybe_stall_begun_count =
            nonmem_write_thread_.GetBegunCountOfOutstandingStall();
      }
    }
  }
  if (signal) {
    bg_cv_.SignalAll();
  }
  if (maybe_stall_begun_count) {
    write_thread_.WaitForStallEndedCount(maybe_stall_begun_count);
  }
  if (nonmem_maybe_stall_begun_count) {
    nonmem_write_thread_.WaitForStallEndedCount(nonmem_maybe_stall_begun_count);
  }
  return Status::OK();
}
// Mark WALs with number <= `up_to` as synced. Inactive WALs that are fully
// on disk are recorded in `synced_wals` (for MANIFEST WAL tracking) and
// their writers queued for deferred deletion; other WALs simply clear their
// syncing flag. Requires wal_write_mutex_ to be held.
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                            VersionEdit* synced_wals) {
  wal_write_mutex_.AssertHeld();
  if (synced_dir && cur_wal_number_ == up_to) {
    wal_dir_synced_ = true;
  }
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& wal = *it;
    assert(wal.IsSyncing());
    if (wal.number < logs_.back().number) {
      // Inactive WAL.
      if (immutable_db_options_.track_and_verify_wals_in_manifest &&
          wal.GetPreSyncSize() > 0) {
        synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
      }
      if (wal.writer->file() == nullptr ||
          (immutable_db_options_.background_close_inactive_wals &&
           wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize())) {
        // Already closed, or everything flushed was synced: retire the
        // writer (actual deletion is deferred) and drop it from logs_.
        wals_to_free_.push_back(wal.ReleaseWriter());
        it = logs_.erase(it);
      } else {
        // Not eligible for removal yet; just clear the syncing flag.
        wal.FinishSync();
        ++it;
      }
    } else {
      // The most recent WAL only clears its syncing flag.
      assert(wal.number == logs_.back().number);
      wal.FinishSync();
      ++it;
    }
  }
  // Wake any SyncWalImpl() waiting for overlapping syncs to finish.
  wal_sync_cv_.SignalAll();
}
// Clear the "syncing" flag on every WAL with number <= `up_to` after a
// failed sync attempt, then wake waiters blocked on wal_sync_cv_.
// Requires wal_write_mutex_ to be held.
void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
  wal_write_mutex_.AssertHeld();
  for (auto& wal : logs_) {
    if (wal.number > up_to) {
      break;  // logs_ is ordered by WAL number
    }
    wal.FinishSync();
  }
  wal_sync_cv_.SignalAll();
}
// Returns the most recent sequence number from the version set.
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}
// Forwards to the version set's last published sequence setter.
void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}
// Read the column family's full_history_ts_low value into `*ts_low`.
// Uses the default column family when `column_family` is null. Fails if
// `ts_low` is null or the CF does not have user-defined timestamps enabled.
Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
                                   std::string* ts_low) {
  if (ts_low == nullptr) {
    return Status::InvalidArgument("ts_low is nullptr");
  }
  ColumnFamilyData* cfd;
  if (column_family != nullptr) {
    auto* handle = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    assert(handle != nullptr);
    cfd = handle->cfd();
  } else {
    cfd = default_cf_handle_->cfd();
  }
  assert(cfd != nullptr && cfd->user_comparator() != nullptr);
  if (cfd->user_comparator()->timestamp_size() == 0) {
    return Status::InvalidArgument(
        "Timestamp is not enabled in this column family");
  }
  // Reading the stored value requires the DB mutex.
  InstrumentedMutexLock guard(&mutex_);
  *ts_low = cfd->GetFullHistoryTsLow();
  assert(ts_low->empty() ||
         cfd->user_comparator()->timestamp_size() == ts_low->size());
  return Status::OK();
}
// Return the newest user-defined timestamp present in the column family's
// data. Only supported when user-defined timestamps are not persisted
// (persist_user_defined_timestamps == false). Checks the active memtable
// first (briefly entering the write thread so its state is stable), then
// the immutable memtables, and finally falls back to a cutoff derived from
// full_history_ts_low when SST files exist.
//
// Bug fix: the NotSupported message was built from two adjacent string
// literals with no separator, producing "...when userdefined timestamps...".
Status DBImpl::GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
                                             std::string* newest_timestamp) {
  if (newest_timestamp == nullptr) {
    return Status::InvalidArgument("newest_timestamp is nullptr");
  }
  ColumnFamilyData* cfd = nullptr;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    assert(cfh != nullptr);
    cfd = cfh->cfd();
  }
  assert(cfd != nullptr && cfd->user_comparator() != nullptr);
  if (cfd->user_comparator()->timestamp_size() == 0) {
    return Status::InvalidArgument(
        "Timestamp is not enabled in this column family");
  }
  if (cfd->ioptions().persist_user_defined_timestamps) {
    return Status::NotSupported(
        "GetNewestUserDefinedTimestamp doesn't support the case when "
        "user-defined timestamps are persisted.");
  }
  Status status;
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  {
    InstrumentedMutexLock l(&mutex_);
    // If the super version still points at the active memtable, become the
    // exclusive writer so its newest UDT cannot change under us.
    bool enter_write_thread = sv->mem == cfd->mem();
    WriteThread::Writer w;
    if (enter_write_thread) {
      write_thread_.EnterUnbatched(&w, &mutex_);
      WaitForPendingWrites();
    }
    *newest_timestamp = sv->mem->GetNewestUDT().ToString();
    assert(!newest_timestamp->empty() || sv->mem->IsEmpty());
    if (enter_write_thread) {
      write_thread_.ExitUnbatched(&w);
    }
  }
  if (newest_timestamp->empty()) {
    // Nothing in the active memtable; check the immutable memtables.
    *newest_timestamp = sv->imm->GetNewestUDT().ToString();
  }
  if (newest_timestamp->empty() && sv->current->GetSstFilesSize() != 0) {
    // Still nothing in memory but SST files exist: derive a cutoff from
    // full_history_ts_low, if it is set.
    Slice full_history_ts_low = sv->full_history_ts_low;
    if (!full_history_ts_low.empty()) {
      GetU64CutoffTsFromFullHistoryTsLow(&full_history_ts_low,
                                         newest_timestamp);
    }
  }
  ReturnAndCleanupSuperVersion(cfd, sv);
  return status;
}
// Convenience overload: resolve the column family (default CF when none is
// given), take a referenced super version under the DB mutex, and delegate
// to the main NewInternalIterator overload.
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              Arena* arena,
                                              SequenceNumber sequence,
                                              ColumnFamilyHandle* column_family,
                                              bool allow_unprepared_value) {
  ColumnFamilyData* cfd;
  if (column_family != nullptr) {
    auto handle = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    cfd = handle->cfd();
  } else {
    cfd = default_cf_handle_->cfd();
  }
  SuperVersion* sv;
  {
    InstrumentedMutexLock guard(&mutex_);
    sv = cfd->GetSuperVersion()->Ref();
  }
  return NewInternalIterator(read_options, cfd, sv, arena, sequence,
                             allow_unprepared_value);
}
// Schedule one background purge task (BGWorkPurge) on the HIGH-priority
// pool. Requires the DB mutex and a successfully opened DB. The counter is
// decremented by BackgroundCallPurge() when the task finishes.
void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}
// Background purge task: frees queued WAL writers and super versions and
// deletes files queued for purging. The DB mutex is dropped around every
// delete so slow I/O cannot block foreground work; each queue is
// re-checked after the mutex is re-acquired.
void DBImpl::BackgroundCallPurge() {
  TEST_SYNC_POINT("DBImpl::BackgroundCallPurge:beforeMutexLock");
  mutex_.Lock();
  while (!wals_to_free_queue_.empty()) {
    assert(!wals_to_free_queue_.empty());
    log::Writer* log_writer = *(wals_to_free_queue_.begin());
    wals_to_free_queue_.pop_front();
    // Destroying the writer may do file I/O; run it without the mutex.
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }
  assert(bg_purge_scheduled_ > 0);
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Copy the purge info out so the map entry can be erased before the
    // mutex is released.
    PurgeFileInfo purge_file = it->second;
    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;
    purge_files_.erase(it);
    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }
  bg_purge_scheduled_--;
  // Wake anyone waiting for outstanding purges to drain.
  bg_cv_.SignalAll();
  mutex_.Unlock();
}
namespace {
// Bundles what an iterator needs to release its SuperVersion reference
// (and trigger obsolete-file cleanup) when the iterator is destroyed.
struct SuperVersionHandle {
  SuperVersionHandle(DBImpl* _db, InstrumentedMutex* _mu,
                     SuperVersion* _super_version, bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}
  DBImpl* db;                   // owning DB
  InstrumentedMutex* mu;        // the DB mutex
  SuperVersion* super_version;  // the referenced super version
  bool background_purge;        // defer cleanup to the background purge task
};
// Iterator cleanup callback: drops the SuperVersion reference. If it was
// the last reference, performs (or schedules, when background_purge is set)
// super-version teardown and obsolete-file purging.
static void CleanupSuperVersionHandle(void* arg1, void* /*arg2*/) {
  SuperVersionHandle* sv_handle = static_cast<SuperVersionHandle*>(arg1);
  if (sv_handle->super_version->Unref()) {
    // This was the last reference to the super version.
    JobContext job_context(0);
    sv_handle->mu->Lock();
    sv_handle->super_version->Cleanup();
    sv_handle->db->FindObsoleteFiles(&job_context, false, true);
    if (sv_handle->background_purge) {
      // Hand deletion work to the background purge task.
      sv_handle->db->ScheduleBgLogWriterClose(&job_context);
      sv_handle->db->AddSuperVersionsToFreeQueue(sv_handle->super_version);
      sv_handle->db->SchedulePurge();
    }
    sv_handle->mu->Unlock();
    if (!sv_handle->background_purge) {
      delete sv_handle->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      sv_handle->db->PurgeObsoleteFiles(job_context,
                                        sv_handle->background_purge);
    }
    job_context.Clean();
  }
  delete sv_handle;
}
// State kept alive by GetMergeOperands() results that reference (rather
// than copy) data owned by a SuperVersion.
struct GetMergeOperandsState {
  MergeContext merge_context;
  PinnedIteratorsManager pinned_iters_mgr;
  SuperVersionHandle* sv_handle;
};
// Cleanup callback for GetMergeOperandsState; also releases the wrapped
// SuperVersionHandle.
static void CleanupGetMergeOperandsState(void* arg1, void* /*arg2*/) {
  GetMergeOperandsState* state = static_cast<GetMergeOperandsState*>(arg1);
  CleanupSuperVersionHandle(state->sv_handle /*arg1*/, nullptr /*arg2*/);
  delete state;
}
}  // anonymous namespace
// Build the layered internal iterator (active memtable -> immutable
// memtables -> SST files) over `super_version` at snapshot `sequence`.
// Ownership of the super-version reference transfers to the returned
// iterator via a SuperVersionHandle cleanup callback. On failure the
// reference is released and an error iterator is returned instead.
InternalIterator* DBImpl::NewInternalIterator(
    const ReadOptions& read_options, ColumnFamilyData* cfd,
    SuperVersion* super_version, Arena* arena, SequenceNumber sequence,
    bool allow_unprepared_value, ArenaWrappedDBIter* db_iter) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  auto prefix_extractor =
      super_version->mutable_cf_options.prefix_extractor.get();
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek && prefix_extractor != nullptr,
      read_options.iterate_upper_bound);
  // Active memtable point iterator.
  auto mem_iter = super_version->mem->NewIterator(
      read_options, super_version->GetSeqnoToTimeMapping(), arena,
      super_version->mutable_cf_options.prefix_extractor.get(),
      false);
  Status s;
  if (!read_options.ignore_range_deletions) {
    // Pair the memtable point iterator with its range tombstones (if any).
    std::unique_ptr<TruncatedRangeDelIterator> mem_tombstone_iter;
    auto range_del_iter = super_version->mem->NewRangeTombstoneIterator(
        read_options, sequence, false);
    if (range_del_iter == nullptr || range_del_iter->empty()) {
      delete range_del_iter;
    } else {
      mem_tombstone_iter = std::make_unique<TruncatedRangeDelIterator>(
          std::unique_ptr<FragmentedRangeTombstoneIterator>(range_del_iter),
          &cfd->ioptions().internal_comparator, nullptr,
          nullptr);
    }
    merge_iter_builder.AddPointAndTombstoneIterator(
        mem_iter, std::move(mem_tombstone_iter));
  } else {
    merge_iter_builder.AddIterator(mem_iter);
  }
  if (s.ok()) {
    // Immutable memtables.
    super_version->imm->AddIterators(
        read_options, super_version->GetSeqnoToTimeMapping(),
        super_version->mutable_cf_options.prefix_extractor.get(),
        &merge_iter_builder, !read_options.ignore_range_deletions);
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // SST files, unless the read is restricted to the memtable tier.
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder,
                                           allow_unprepared_value);
    }
    internal_iter = merge_iter_builder.Finish(
        read_options.ignore_range_deletions ? nullptr : db_iter);
    // Tie the lifetime of the super-version reference to the iterator.
    SuperVersionHandle* cleanup = new SuperVersionHandle(
        this, &mutex_, super_version,
        read_options.background_purge_on_iterator_cleanup ||
            immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, nullptr);
    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}
// Returns the handle for the default column family.
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}
// Returns the handle for the persistent-stats column family (may be null if
// the feature is not in use).
ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}
// Convenience overload of GetImpl without a timestamp output.
Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* value) {
  return GetImpl(read_options, column_family, key, value,
                 /*timestamp=*/nullptr);
}
// Public point-lookup entry: clears the output slice, validates and
// normalizes ReadOptions::io_activity, then forwards to GetImpl.
Status DBImpl::Get(const ReadOptions& _read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value, std::string* timestamp) {
  assert(value != nullptr);
  value->Reset();
  const Env::IOActivity activity = _read_options.io_activity;
  if (activity != Env::IOActivity::kUnknown &&
      activity != Env::IOActivity::kGet) {
    return Status::InvalidArgument(
        "Can only call Get with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  }
  ReadOptions read_options(_read_options);
  if (activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGet;
  }
  return GetImpl(read_options, column_family, key, value, timestamp);
}
// Pack the outputs into GetImplOptions and run the generic lookup path.
Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* value, std::string* timestamp) {
  GetImplOptions impl_options;
  impl_options.column_family = column_family;
  impl_options.value = value;
  impl_options.timestamp = timestamp;
  return GetImpl(read_options, key, impl_options);
}
// Wide-column point lookup for a single column family: validate arguments
// and io_activity, then delegate to GetImpl with a columns output.
Status DBImpl::GetEntity(const ReadOptions& _read_options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         PinnableWideColumns* columns) {
  if (column_family == nullptr) {
    return Status::InvalidArgument(
        "Cannot call GetEntity without a column family handle");
  }
  if (columns == nullptr) {
    return Status::InvalidArgument(
        "Cannot call GetEntity without a PinnableWideColumns object");
  }
  const Env::IOActivity activity = _read_options.io_activity;
  if (activity != Env::IOActivity::kUnknown &&
      activity != Env::IOActivity::kGetEntity) {
    return Status::InvalidArgument(
        "Can only call GetEntity with `ReadOptions::io_activity` set to "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGetEntity`");
  }
  ReadOptions read_options(_read_options);
  if (activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGetEntity;
  }
  columns->Reset();
  GetImplOptions impl_options;
  impl_options.column_family = column_family;
  impl_options.columns = columns;
  return GetImpl(read_options, key, impl_options);
}
// Attribute-group GetEntity: look up `key` in every column family listed in
// `result`. Per-group statuses are always populated; the returned Status
// reflects argument validation (per-CF read outcomes live in the groups).
Status DBImpl::GetEntity(const ReadOptions& _read_options, const Slice& key,
                         PinnableAttributeGroups* result) {
  if (!result) {
    return Status::InvalidArgument(
        "Cannot call GetEntity without PinnableAttributeGroups object");
  }
  Status s;
  const size_t num_column_families = result->size();
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kGetEntity) {
    s = Status::InvalidArgument(
        "Can only call GetEntity with `ReadOptions::io_activity` set to "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGetEntity`");
    // Propagate the argument error into every group as well.
    for (size_t i = 0; i < num_column_families; ++i) {
      (*result)[i].SetStatus(s);
    }
    return s;
  }
  if (num_column_families == 0) {
    return s;
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGetEntity;
  }
  // Expand into parallel key/CF arrays for the batched read; the same key
  // is queried in every group's column family.
  std::vector<Slice> keys;
  std::vector<ColumnFamilyHandle*> column_families;
  for (size_t i = 0; i < num_column_families; ++i) {
    if (!(*result)[i].column_family()) {
      s = Status::InvalidArgument(
          "DB failed to query because one or more group(s) have null column "
          "family handle");
      (*result)[i].SetStatus(
          Status::InvalidArgument("Column family handle cannot be null"));
      break;
    }
    keys.emplace_back(key);
    column_families.emplace_back((*result)[i].column_family());
  }
  if (!s.ok()) {
    // Mark every group that was not itself invalid as not-queried.
    for (size_t i = 0; i < num_column_families; ++i) {
      if ((*result)[i].status().ok()) {
        (*result)[i].SetStatus(
            Status::Incomplete("DB not queried due to invalid argument(s) in "
                               "one or more of the attribute groups"));
      }
    }
    return s;
  }
  std::vector<PinnableWideColumns> columns(num_column_families);
  std::vector<Status> statuses(num_column_families);
  MultiGetCommon(
      read_options, num_column_families, column_families.data(), keys.data(),
      nullptr, columns.data(),
      nullptr, statuses.data(), false);
  // Move the per-CF results back into the caller's groups.
  for (size_t i = 0; i < num_column_families; ++i) {
    (*result)[i].Reset();
    (*result)[i].SetStatus(statuses[i]);
    (*result)[i].SetColumns(std::move(columns[i]));
  }
  return s;
}
bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) {
static const size_t kNumBytesForSvRef = 32768;
static const size_t kLog2AvgBytesForSvRef = 8;
size_t num_bytes = 0;
for (const Slice& sl : merge_context.GetOperands()) {
num_bytes += sl.size();
}
return num_bytes >= kNumBytesForSvRef &&
(num_bytes >> kLog2AvgBytesForSvRef) >=
merge_context.GetOperands().size();
}
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr ||
get_impl_options.columns != nullptr);
assert(get_impl_options.column_family);
if (read_options.timestamp) {
const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
} else {
const Status s = FailIfCfHasTs(get_impl_options.column_family);
if (!s.ok()) {
return s;
}
}
if (get_impl_options.timestamp) {
get_impl_options.timestamp->clear();
}
GetWithTimestampReadCallback read_cb(0);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
get_impl_options.column_family);
auto cfd = cfh->cfd();
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
}
}
if (get_impl_options.get_merge_operands_options != nullptr) {
for (int i = 0; i < get_impl_options.get_merge_operands_options
->expected_max_number_of_operands;
++i) {
get_impl_options.merge_operands[i].Reset();
}
}
SuperVersion* sv = GetAndRefSuperVersion(cfd);
if (read_options.timestamp && read_options.timestamp->size() > 0) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}
TEST_SYNC_POINT_CALLBACK("DBImpl::GetImpl:AfterAcquireSv", nullptr);
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (get_impl_options.callback) {
snapshot = get_impl_options.callback->max_visible_seq();
} else {
snapshot =
static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
}
} else {
snapshot = GetLastPublishedSequence();
if (get_impl_options.callback) {
get_impl_options.callback->Refresh(snapshot);
snapshot = get_impl_options.callback->max_visible_seq();
}
}
SaveAndRestore<ReadCallback*> restore_callback(&get_impl_options.callback);
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
assert(!get_impl_options
.callback); read_cb.Refresh(snapshot);
get_impl_options.callback = &read_cb;
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");
MergeContext merge_context;
merge_context.get_merge_operands_options =
get_impl_options.get_merge_operands_options;
SequenceNumber max_covering_tombstone_seq = 0;
Status s;
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
std::string* timestamp =
ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
if (!skip_memtable) {
if (get_impl_options.get_value) {
if (sv->mem->Get(
lkey,
get_impl_options.value ? get_impl_options.value->GetSelf()
: nullptr,
get_impl_options.columns, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false , get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey,
get_impl_options.value
? get_impl_options.value->GetSelf()
: nullptr,
get_impl_options.columns, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}
RecordTick(stats_, MEMTABLE_HIT);
}
} else {
if (sv->mem->Get(lkey, nullptr, nullptr,
nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false , nullptr, nullptr,
false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, &s, &merge_context,
&max_covering_tombstone_seq,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!s.ok() && !s.IsMergeInProgress() && !s.IsNotFound()) {
assert(done);
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:0");
TEST_SYNC_POINT("DBImpl::GetImpl:PostMemTableGet:1");
PinnedIteratorsManager pinned_iters_mgr;
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(
read_options, lkey, get_impl_options.value, get_impl_options.columns,
timestamp, &s, &merge_context, &max_covering_tombstone_seq,
&pinned_iters_mgr,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
RecordTick(stats_, MEMTABLE_MISS);
}
{
PERF_TIMER_GUARD(get_post_process_time);
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
const auto& merge_threshold = read_options.merge_operand_count_threshold;
if (merge_threshold.has_value() &&
merge_context.GetNumOperands() > merge_threshold.value()) {
s = Status::OkMergeOperandThresholdExceeded();
}
if (get_impl_options.get_value) {
if (get_impl_options.value) {
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
}
} else {
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
assert(*get_impl_options.number_of_operands > 0);
if (*get_impl_options.number_of_operands >
get_impl_options.get_merge_operands_options
->expected_max_number_of_operands) {
s = Status::Incomplete(
Status::SubCode::KMergeOperandsInsufficientCapacity);
} else {
bool ref_sv = ShouldReferenceSuperVersion(merge_context);
if (ref_sv) {
assert(!merge_context.GetOperands().empty());
SharedCleanablePtr shared_cleanable;
GetMergeOperandsState* state = nullptr;
state = new GetMergeOperandsState();
state->merge_context = std::move(merge_context);
state->pinned_iters_mgr = std::move(pinned_iters_mgr);
sv->Ref();
state->sv_handle = new SuperVersionHandle(
this, &mutex_, sv,
immutable_db_options_.avoid_unnecessary_blocking_io);
shared_cleanable.Allocate();
shared_cleanable->RegisterCleanup(CleanupGetMergeOperandsState,
state ,
nullptr );
for (size_t i = 0; i < state->merge_context.GetOperands().size();
++i) {
const Slice& sl = state->merge_context.GetOperands()[i];
size += sl.size();
get_impl_options.merge_operands->PinSlice(
sl, nullptr );
if (i == state->merge_context.GetOperands().size() - 1) {
shared_cleanable.MoveAsCleanupTo(
get_impl_options.merge_operands);
} else {
shared_cleanable.RegisterCopyWith(
get_impl_options.merge_operands);
}
get_impl_options.merge_operands++;
}
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
ReturnAndCleanupSuperVersion(cfd, sv);
RecordInHistogram(stats_, BYTES_PER_READ, size);
}
return s;
}
// Acquires a SuperVersion reference per column family in `cf_list` plus a
// sequence number (`*snapshot`) at which all of those SuperVersions can be
// read consistently.
//
// `iter_deref_func` maps a cf_list iterator to a node exposing `cfd` and
// `super_version` members. If `extra_sv_ref` is true, each SuperVersion is
// acquired with an independent reference rather than from thread-local
// storage. `*sv_from_thread_local` tells the caller which release path to
// use afterwards.
//
// On failure, every SuperVersion acquired so far is released before return.
template <class T, typename IterDerefFuncType>
Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
                               ReadCallback* callback,
                               IterDerefFuncType iter_deref_func, T* cf_list,
                               bool extra_sv_ref, SequenceNumber* snapshot,
                               bool* sv_from_thread_local) {
  PERF_TIMER_GUARD(get_snapshot_time);

  assert(sv_from_thread_local);
  *sv_from_thread_local = true;

  Status s = Status::OK();
  const bool check_read_ts =
      read_options.timestamp && read_options.timestamp->size() > 0;

  // Releases every SuperVersion acquired so far and clears the node
  // pointers. Used both between retries and on the error path.
  const auto sv_cleanup_func = [&]() -> void {
    for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
         ++cf_iter) {
      auto node = iter_deref_func(cf_iter);
      SuperVersion* super_version = node->super_version;
      ColumnFamilyData* cfd = node->cfd;
      if (super_version != nullptr) {
        if (*sv_from_thread_local && !extra_sv_ref) {
          // Reference came from thread-local storage; return it there.
          ReturnAndCleanupSuperVersion(cfd, super_version);
        } else {
          CleanupSuperVersion(super_version);
        }
      }
      node->super_version = nullptr;
    }
  };

  bool acquire_mutex = false;
  if (cf_list->size() == 1) {
    // Single column family: any one SuperVersion is trivially
    // self-consistent, so no retry loop is needed.
    auto cf_iter = cf_list->begin();
    auto node = iter_deref_func(cf_iter);
    if (extra_sv_ref) {
      node->super_version = node->cfd->GetReferencedSuperVersion(this);
    } else {
      node->super_version = GetAndRefSuperVersion(node->cfd);
    }
    if (check_read_ts) {
      s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
                                     *(read_options.timestamp));
    }
    if (s.ok() && read_options.snapshot != nullptr) {
      // Use the caller-provided snapshot, bumped up to the callback's max
      // visible sequence number when a callback is supplied.
      *snapshot =
          static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
      if (callback) {
        *snapshot = std::max(*snapshot, callback->max_visible_seq());
      }
    } else if (s.ok()) {
      *snapshot = GetLastPublishedSequence();
    }
  } else {
    // Multiple column families: optimistically grab SuperVersions and a
    // published sequence number without locking, then verify that no
    // memtable has data newer than the chosen snapshot. Retry on conflict;
    // the final attempt (or a kPersistedTier read) takes the DB mutex so it
    // cannot fail the consistency check.
    constexpr int num_retries = 3;
    for (int i = 0; i < num_retries; ++i) {
      acquire_mutex = ((i == num_retries - 1) && !read_options.snapshot) ||
                      read_options.read_tier == kPersistedTier;
      bool retry = false;

      if (i > 0) {
        // Drop the SuperVersions from the failed attempt before retrying.
        sv_cleanup_func();
      }
      if (read_options.snapshot == nullptr) {
        *snapshot = GetLastPublishedSequence();
      } else {
        *snapshot =
            static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
                ->number_;
      }
      if (acquire_mutex) {
        TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
        mutex_.Lock();
      }
      for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
           ++cf_iter) {
        auto node = iter_deref_func(cf_iter);
        if (!acquire_mutex) {
          if (extra_sv_ref) {
            node->super_version = node->cfd->GetReferencedSuperVersion(this);
          } else {
            node->super_version = GetAndRefSuperVersion(node->cfd);
          }
        } else {
          // Holding the DB mutex: ref the current SuperVersion directly.
          node->super_version = node->cfd->GetSuperVersion()->Ref();
        }
        TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV");
        if (check_read_ts) {
          s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
                                         *(read_options.timestamp));
          if (!s.ok()) {
            // Hard failure -- do not retry; bail out below.
            retry = false;
            break;
          }
        }
        TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeCheckingSnapshot");
        if (read_options.snapshot != nullptr || acquire_mutex) {
          // Explicit snapshot or mutex held: no consistency check needed.
          continue;
        }
        if (!acquire_mutex) {
          // If this memtable contains writes newer than our optimistic
          // snapshot, the snapshot cannot cover this CF consistently.
          SequenceNumber seq =
              node->super_version->mem->GetEarliestSequenceNumber();
          if (seq > *snapshot) {
            retry = true;
            break;
          }
        }
      }
      if (!retry) {
        if (acquire_mutex) {
          mutex_.Unlock();
          TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
        }
        break;
      }
      // The mutex-holding attempt can never be asked to retry.
      assert(!acquire_mutex);
    }
  }

  TEST_SYNC_POINT("DBImpl::MultiCFSnapshot:AfterGetSeqNum1");
  TEST_SYNC_POINT("DBImpl::MultiCFSnapshot:AfterGetSeqNum2");
  PERF_TIMER_STOP(get_snapshot_time);
  *sv_from_thread_local = !acquire_mutex;
  if (!s.ok()) {
    sv_cleanup_func();
  }
  return s;
}
void DBImpl::MultiGet(const ReadOptions& _read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` is "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
MultiGetCommon(read_options, num_keys, column_families, keys, values,
nullptr, timestamps, statuses, sorted_input);
}
// Batched lookup across potentially multiple column families. Exactly one
// of `values`/`columns` must be non-null; the result for key i is stored
// there, with the per-key status in `statuses[i]`.
//
// Fix: restores the mangled address-of expressions `&timestamps[i]` and
// `&timestamp_read_callback` (corrupted to `×tamps[i]` /
// `×tamp_read_callback` by an HTML-entity encoding error).
void DBImpl::MultiGetCommon(const ReadOptions& read_options,
                            const size_t num_keys,
                            ColumnFamilyHandle** column_families,
                            const Slice* keys, PinnableSlice* values,
                            PinnableWideColumns* columns,
                            std::string* timestamps, Status* statuses,
                            const bool sorted_input) {
  if (num_keys == 0) {
    return;
  }

  // Validate timestamp settings against each key's column family up front.
  bool should_fail = false;
  for (size_t i = 0; i < num_keys; ++i) {
    ColumnFamilyHandle* cfh = column_families[i];
    if (read_options.timestamp) {
      statuses[i] = FailIfTsMismatchCf(cfh, *(read_options.timestamp));
      if (!statuses[i].ok()) {
        should_fail = true;
      }
    } else {
      statuses[i] = FailIfCfHasTs(cfh);
      if (!statuses[i].ok()) {
        should_fail = true;
      }
    }
  }
  if (should_fail) {
    // Fail the whole batch: keys that passed validation get Incomplete so
    // the caller knows they were never looked up.
    for (size_t i = 0; i < num_keys; ++i) {
      if (statuses[i].ok()) {
        statuses[i] = Status::Incomplete(
            "DB not queried due to invalid argument(s) in the same MultiGet");
      }
    }
    return;
  }

  if (tracer_) {
    // Double-checked: only take the trace mutex when a tracer was observed.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->MultiGet(num_keys, column_families, keys).PermitUncheckedError();
    }
  }

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    PinnableSlice* val = nullptr;
    PinnableWideColumns* col = nullptr;

    if (values) {
      val = &values[i];
      val->Reset();
    } else {
      assert(columns);
      col = &columns[i];
      col->Reset();
    }

    key_context.emplace_back(column_families[i], keys[i], val, col,
                             timestamps ? &timestamps[i] : nullptr,
                             &statuses[i]);
  }
  // Take element addresses only after all emplace_backs: growing
  // key_context above could otherwise invalidate earlier pointers.
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);

  // Group the sorted keys into contiguous per-column-family ranges.
  autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>
      key_range_per_cf;
  autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
      cf_sv_pairs;
  size_t cf_start = 0;
  ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
  for (size_t i = 0; i < num_keys; ++i) {
    KeyContext* key_ctx = sorted_keys[i];
    if (key_ctx->column_family != cf) {
      key_range_per_cf.emplace_back(cf_start, i - cf_start);
      cf_sv_pairs.emplace_back(cf, nullptr);
      cf_start = i;
      cf = key_ctx->column_family;
    }
  }
  key_range_per_cf.emplace_back(cf_start, num_keys - cf_start);
  cf_sv_pairs.emplace_back(cf, nullptr);

  // Acquire SuperVersions plus a snapshot consistent across all CFs.
  SequenceNumber consistent_seqnum = kMaxSequenceNumber;
  bool sv_from_thread_local = false;
  Status s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
                                        MultiGetContext::MAX_BATCH_SIZE>>(
      read_options, /*callback=*/nullptr,
      [](autovector<ColumnFamilySuperVersionPair,
                    MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
        return &(*cf_iter);
      },
      &cf_sv_pairs,
      /*extra_sv_ref=*/false, &consistent_seqnum, &sv_from_thread_local);

  if (!s.ok()) {
    // MultiCFSnapshot released any acquired SuperVersions on failure.
    for (size_t i = 0; i < num_keys; ++i) {
      if (statuses[i].ok()) {
        statuses[i] = s;
      }
    }
    return;
  }

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = nullptr;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  // Run one MultiGetImpl per contiguous per-CF key range.
  assert(key_range_per_cf.size() == cf_sv_pairs.size());
  auto key_range_per_cf_iter = key_range_per_cf.begin();
  auto cf_sv_pair_iter = cf_sv_pairs.begin();
  while (key_range_per_cf_iter != key_range_per_cf.end() &&
         cf_sv_pair_iter != cf_sv_pairs.end()) {
    s = MultiGetImpl(read_options, key_range_per_cf_iter->start,
                     key_range_per_cf_iter->num_keys, &sorted_keys,
                     cf_sv_pair_iter->super_version, consistent_seqnum,
                     read_callback);
    if (!s.ok()) {
      break;
    }
    ++key_range_per_cf_iter;
    ++cf_sv_pair_iter;
  }
  if (!s.ok()) {
    // MultiGetImpl timed out or aborted; propagate the status to every key
    // in the ranges that were never attempted.
    assert(s.IsTimedOut() || s.IsAborted());
    for (++key_range_per_cf_iter;
         key_range_per_cf_iter != key_range_per_cf.end();
         ++key_range_per_cf_iter) {
      for (size_t i = key_range_per_cf_iter->start;
           i < key_range_per_cf_iter->start + key_range_per_cf_iter->num_keys;
           ++i) {
        *sorted_keys[i]->s = s;
      }
    }
  }

  // Release the SuperVersions via the path matching how they were acquired.
  for (const auto& cf_sv_pair : cf_sv_pairs) {
    if (sv_from_thread_local) {
      ReturnAndCleanupSuperVersion(cf_sv_pair.cfd, cf_sv_pair.super_version);
    } else {
      TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV");
      CleanupSuperVersion(cf_sv_pair.super_version);
    }
  }
}
namespace {
struct CompareKeyContext {
inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
ColumnFamilyHandleImpl* cfh =
static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
uint32_t cfd_id1 = cfh->cfd()->GetID();
const Comparator* comparator = cfh->cfd()->user_comparator();
cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
uint32_t cfd_id2 = cfh->cfd()->GetID();
if (cfd_id1 < cfd_id2) {
return true;
} else if (cfd_id1 > cfd_id2) {
return false;
}
int cmp = comparator->CompareWithoutTimestamp(
*(lhs->key), false, *(rhs->key), false);
if (cmp < 0) {
return true;
}
return false;
}
};
}
// Ensures `sorted_keys` is ordered by (column family ID, user key), sorting
// it when the caller did not claim the input was already sorted.
void DBImpl::PrepareMultiGetKeys(
    size_t num_keys, bool sorted_input,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  if (!sorted_input) {
    std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
              CompareKeyContext());
    return;
  }
#ifndef NDEBUG
  // Trust the caller's claim, but verify it in debug builds.
  assert(std::is_sorted(sorted_keys->begin(), sorted_keys->end(),
                        CompareKeyContext()));
#endif
}
void DB::MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input) {
if (num_keys > MultiGetContext::MAX_BATCH_SIZE) {
std::vector<ColumnFamilyHandle*> column_families(num_keys, column_family);
MultiGet(options, num_keys, column_families.data(), keys, values,
timestamps, statuses, sorted_input);
} else {
std::array<ColumnFamilyHandle*, MultiGetContext::MAX_BATCH_SIZE>
column_families;
std::fill(column_families.begin(), column_families.begin() + num_keys,
column_family);
MultiGet(options, num_keys, column_families.data(), keys, values,
timestamps, statuses, sorted_input);
}
}
// Single-column-family variant of the batched lookup. Builds per-key
// contexts, sorts them, and delegates to MultiGetWithCallbackImpl with no
// read callback.
//
// Fix: restores the mangled address-of expression `&timestamps[i]`
// (corrupted to `×tamps[i]` by an HTML-entity encoding error).
void DBImpl::MultiGetCommon(const ReadOptions& read_options,
                            ColumnFamilyHandle* column_family,
                            const size_t num_keys, const Slice* keys,
                            PinnableSlice* values, PinnableWideColumns* columns,
                            std::string* timestamps, Status* statuses,
                            bool sorted_input) {
  if (tracer_) {
    // Double-checked: only take the trace mutex when a tracer was observed.
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->MultiGet(num_keys, column_family, keys).PermitUncheckedError();
    }
  }

  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  sorted_keys.resize(num_keys);
  for (size_t i = 0; i < num_keys; ++i) {
    // Exactly one of values/columns receives each result; reset the
    // destination so stale data is never returned.
    PinnableSlice* val = nullptr;
    PinnableWideColumns* col = nullptr;

    if (values) {
      val = &values[i];
      val->Reset();
    } else {
      assert(columns);
      col = &columns[i];
      col->Reset();
    }

    key_context.emplace_back(column_family, keys[i], val, col,
                             timestamps ? &timestamps[i] : nullptr,
                             &statuses[i]);
  }
  // Take element addresses only after all emplace_backs: growing
  // key_context above could otherwise invalidate earlier pointers.
  for (size_t i = 0; i < num_keys; ++i) {
    sorted_keys[i] = &key_context[i];
  }
  PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);

  MultiGetWithCallbackImpl(read_options, column_family, /*callback=*/nullptr,
                           &sorted_keys);
}
// Internal MultiGet entry point with a caller-supplied ReadCallback.
// io_activity must be unset or already kMultiGet.
void DBImpl::MultiGetWithCallback(
    const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
    ReadCallback* callback,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  const bool activity_ok =
      _read_options.io_activity == Env::IOActivity::kUnknown ||
      _read_options.io_activity == Env::IOActivity::kMultiGet;
  if (!activity_ok) {
    // Internal-caller contract violation; nothing sensible to report.
    assert(false);
    return;
  }

  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kMultiGet;
  }
  MultiGetWithCallbackImpl(read_options, column_family, callback, sorted_keys);
}
// Shared single-CF batched-lookup core: acquires one SuperVersion and a
// snapshot, wires up the appropriate ReadCallback, runs MultiGetImpl, and
// releases the SuperVersion.
//
// Fix: restores the mangled address-of expression
// `&timestamp_read_callback` (corrupted to `×tamp_read_callback` by an
// HTML-entity encoding error) and splits a jammed two-statement line.
void DBImpl::MultiGetWithCallbackImpl(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    ReadCallback* callback,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  std::array<ColumnFamilySuperVersionPair, 1> cf_sv_pairs;
  cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr);
  size_t num_keys = sorted_keys->size();

  SequenceNumber consistent_seqnum = kMaxSequenceNumber;
  bool sv_from_thread_local = false;
  Status s = MultiCFSnapshot<std::array<ColumnFamilySuperVersionPair, 1>>(
      read_options, callback,
      [](std::array<ColumnFamilySuperVersionPair, 1>::iterator& cf_iter) {
        return &(*cf_iter);
      },
      &cf_sv_pairs,
      /*extra_sv_ref=*/false, &consistent_seqnum, &sv_from_thread_local);
  if (!s.ok()) {
    // MultiCFSnapshot released the SuperVersion reference on failure.
    return;
  }
  // With a single CF there is no retry path, so the SuperVersion always
  // comes from thread-local storage.
#ifndef NDEBUG
  assert(sv_from_thread_local);
#else
  (void)sv_from_thread_local;
#endif

  if (callback && read_options.snapshot == nullptr) {
    // Let the callback derive visibility from the computed snapshot, then
    // read at the callback's max visible sequence number.
    callback->Refresh(consistent_seqnum);
    consistent_seqnum = callback->max_visible_seq();
  }

  GetWithTimestampReadCallback timestamp_read_callback(0);
  ReadCallback* read_callback = callback;
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    // A timestamp read uses its own callback; it cannot be combined with a
    // caller-provided one.
    assert(!read_callback);
    timestamp_read_callback.Refresh(consistent_seqnum);
    read_callback = &timestamp_read_callback;
  }

  s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
                   cf_sv_pairs[0].super_version, consistent_seqnum,
                   read_callback);
  assert(s.ok() || s.IsTimedOut() || s.IsAborted());
  ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd,
                               cf_sv_pairs[0].super_version);
}
// Looks up keys [start_key, start_key + num_keys) of `sorted_keys` within a
// single column family whose SuperVersion and snapshot the caller supplies.
// Per-key results and statuses are written through each KeyContext.
// Returns OK, TimedOut (read deadline exceeded) or Aborted (value size soft
// limit reached, or an external query abort).
Status DBImpl::MultiGetImpl(
    const ReadOptions& read_options, size_t start_key, size_t num_keys,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
    SuperVersion* super_version, SequenceNumber snapshot,
    ReadCallback* callback) {
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);

  assert(sorted_keys);
  assert(start_key + num_keys <= sorted_keys->size());
  // Clear any stale timestamps left over from a previous use of the
  // KeyContexts.
  for (size_t i = start_key; i < start_key + num_keys; ++i) {
    KeyContext* kctx = (*sorted_keys)[i];
    if (kctx->timestamp) {
      kctx->timestamp->clear();
    }
  }

  // Process the keys in batches of up to MAX_BATCH_SIZE.
  size_t keys_left = num_keys;
  Status s;
  uint64_t curr_value_size = 0;
  while (keys_left) {
    // Honor the caller's deadline between batches.
    if (read_options.deadline.count() &&
        immutable_db_options_.clock->NowMicros() >
            static_cast<uint64_t>(read_options.deadline.count())) {
      s = Status::TimedOut();
      break;
    }

    size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
                            ? MultiGetContext::MAX_BATCH_SIZE
                            : keys_left;
    MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
                        batch_size, snapshot, read_options, GetFileSystem(),
                        stats_);
    MultiGetRange range = ctx.GetMultiGetRange();
    // Carry the accumulated value size forward so the soft limit applies to
    // the whole call, not just this batch.
    range.AddValueSize(curr_value_size);
    bool lookup_current = true;

    keys_left -= batch_size;
    // Reset per-key state before the lookup.
    for (auto mget_iter = range.begin(); mget_iter != range.end();
         ++mget_iter) {
      mget_iter->merge_context.Clear();
      *mget_iter->s = Status::OK();
    }

    // kPersistedTier reads must skip memtables while they may hold
    // unpersisted data.
    bool skip_memtable =
        (read_options.read_tier == kPersistedTier &&
         has_unpersisted_data_.load(std::memory_order_relaxed));
    if (!skip_memtable) {
      // Search the mutable memtable first, then the immutable list; keys
      // that are resolved drop out of `range`.
      super_version->mem->MultiGet(read_options, &range, callback,
                                   false );
      if (!range.empty()) {
        super_version->imm->MultiGet(read_options, &range, callback);
      }
      if (!range.empty()) {
        uint64_t left = range.KeysLeft();
        RecordTick(stats_, MEMTABLE_MISS, left);
      } else {
        // Every key was resolved by the memtables; skip the SST lookup.
        lookup_current = false;
      }
    }
    if (lookup_current) {
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->MultiGet(read_options, &range, callback);
    }
    curr_value_size = range.GetValueSize();
    if (curr_value_size > read_options.value_size_soft_limit) {
      s = Status::Aborted();
      break;
    }
    // Yield the CPU and honor an external abort request between batches.
    bool aborted = ROCKSDB_THREAD_YIELD_CHECK_ABORT();
    if (aborted) {
      s = Status::Aborted("Query abort.");
      break;
    }
  }

  // Post-processing: per-key accounting, plus propagating an early-exit
  // status to the keys that were never attempted.
  PERF_TIMER_GUARD(get_post_process_time);
  size_t num_found = 0;
  uint64_t bytes_read = 0;
  for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
    KeyContext* key = (*sorted_keys)[i];
    assert(key);
    assert(key->s);
    if (key->s->ok()) {
      const auto& merge_threshold = read_options.merge_operand_count_threshold;
      if (merge_threshold.has_value() &&
          key->merge_context.GetNumOperands() > merge_threshold) {
        // Report that this key's merge operand count exceeded the caller's
        // configured threshold.
        *(key->s) = Status::OkMergeOperandThresholdExceeded();
      }

      if (key->value) {
        bytes_read += key->value->size();
      } else {
        assert(key->columns);
        bytes_read += key->columns->serialized_size();
      }
      num_found++;
    }
  }
  if (keys_left) {
    // The batch loop exited early; mark the untouched keys with the same
    // terminal status.
    assert(s.IsTimedOut() || s.IsAborted());
    for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
         ++i) {
      KeyContext* key = (*sorted_keys)[i];
      *key->s = s;
    }
  }

  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);

  return s;
}
void DBImpl::MultiGetEntity(const ReadOptions& _read_options, size_t num_keys,
ColumnFamilyHandle** column_families,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
assert(statuses);
if (!column_families) {
const Status s = Status::InvalidArgument(
"Cannot call MultiGetEntity without column families");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
if (!keys) {
const Status s =
Status::InvalidArgument("Cannot call MultiGetEntity without keys");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
if (!results) {
const Status s = Status::InvalidArgument(
"Cannot call MultiGetEntity without PinnableWideColumns objects");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGetEntity) {
const Status s = Status::InvalidArgument(
"Can only call MultiGetEntity with `ReadOptions::io_activity` set to "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGetEntity`");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGetEntity;
}
MultiGetCommon(read_options, num_keys, column_families, keys,
nullptr, results, nullptr,
statuses, sorted_input);
}
void DBImpl::MultiGetEntity(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
assert(statuses);
if (!column_family) {
const Status s = Status::InvalidArgument(
"Cannot call MultiGetEntity without a column family handle");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
if (!keys) {
const Status s =
Status::InvalidArgument("Cannot call MultiGetEntity without keys");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
if (!results) {
const Status s = Status::InvalidArgument(
"Cannot call MultiGetEntity without PinnableWideColumns objects");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGetEntity) {
const Status s = Status::InvalidArgument(
"Can only call MultiGetEntity with `ReadOptions::io_activity` set to "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGetEntity`");
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = s;
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGetEntity;
}
MultiGetCommon(read_options, column_family, num_keys, keys,
nullptr, results, nullptr,
statuses, sorted_input);
}
void DBImpl::MultiGetEntity(const ReadOptions& _read_options, size_t num_keys,
const Slice* keys,
PinnableAttributeGroups* results) {
assert(results);
if (!keys) {
const Status s =
Status::InvalidArgument("Cannot call MultiGetEntity without keys");
for (size_t i = 0; i < num_keys; ++i) {
for (size_t j = 0; j < results[i].size(); ++j) {
results[i][j].SetStatus(s);
}
}
return;
}
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGetEntity) {
const Status s = Status::InvalidArgument(
"Can only call MultiGetEntity with `ReadOptions::io_activity` set to "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGetEntity`");
for (size_t i = 0; i < num_keys; ++i) {
for (size_t j = 0; j < results[i].size(); ++j) {
results[i][j].SetStatus(s);
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGetEntity;
}
std::vector<ColumnFamilyHandle*> column_families;
std::vector<Slice> all_keys;
size_t total_count = 0;
for (size_t i = 0; i < num_keys; ++i) {
for (size_t j = 0; j < results[i].size(); ++j) {
all_keys.emplace_back(keys[i]);
column_families.emplace_back(results[i][j].column_family());
++total_count;
}
}
std::vector<Status> statuses(total_count);
std::vector<PinnableWideColumns> columns(total_count);
MultiGetCommon(read_options, total_count, column_families.data(),
all_keys.data(),
nullptr, columns.data(),
nullptr, statuses.data(),
false);
size_t index = 0;
for (size_t i = 0; i < num_keys; ++i) {
for (size_t j = 0; j < results[i].size(); ++j) {
results[i][j].Reset();
results[i][j].SetStatus(std::move(statuses[index]));
results[i][j].SetColumns(std::move(columns[index]));
++index;
}
}
}
// Internal MultiGetEntity entry point with a caller-supplied ReadCallback.
// The caller must already have set io_activity to kMultiGetEntity;
// delegates to the shared single-CF MultiGetWithCallbackImpl path.
void DBImpl::MultiGetEntityWithCallback(
    const ReadOptions& read_options, ColumnFamilyHandle* column_family,
    ReadCallback* callback,
    autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
  assert(read_options.io_activity == Env::IOActivity::kMultiGetEntity);

  MultiGetWithCallbackImpl(read_options, column_family, callback, sorted_keys);
}
// Common epilogue after creating one or more column families: persist the
// options file, and register the seqno-time worker when any new CF enables
// time tracking. Caller must hold options_mutex_.
Status DBImpl::WrapUpCreateColumnFamilies(
    const WriteOptions& write_options,
    const std::vector<const ColumnFamilyOptions*>& cf_options) {
  options_mutex_.AssertHeld();

  // The worker is needed iff at least one new CF tracks internal time.
  const bool register_worker =
      std::any_of(cf_options.begin(), cf_options.end(),
                  [](const ColumnFamilyOptions* opts_ptr) {
                    return opts_ptr->preserve_internal_time_seconds > 0 ||
                           opts_ptr->preclude_last_level_data_seconds > 0;
                  });

  // Attempt both the options file write and (if needed) the worker
  // registration; the first failure wins.
  Status s = WriteOptionsFile(write_options, false );
  if (register_worker) {
    s.UpdateIfOk(RegisterRecordSeqnoTimeWorker());
  }
  return s;
}
// Creates one column family and, on success, persists the options file.
Status DBImpl::CreateColumnFamily(const ReadOptions& read_options,
                                  const WriteOptions& write_options,
                                  const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family,
                                  ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  InstrumentedMutexLock ol(&options_mutex_);

  Status s = CreateColumnFamilyImpl(read_options, write_options, cf_options,
                                    column_family, handle);
  if (!s.ok()) {
    return s;
  }
  s.UpdateIfOk(WrapUpCreateColumnFamilies(write_options, {&cf_options}));
  return s;
}
// Creates several column families sharing one set of options. Stops at the
// first failure; handles for the CFs created so far are still returned, and
// the options file is persisted if at least one CF was created.
Status DBImpl::CreateColumnFamilies(
    const ReadOptions& read_options, const WriteOptions& write_options,
    const ColumnFamilyOptions& cf_options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  InstrumentedMutexLock ol(&options_mutex_);
  handles->clear();

  Status s;
  bool any_created = false;
  for (const std::string& cf_name : column_family_names) {
    ColumnFamilyHandle* handle = nullptr;
    s = CreateColumnFamilyImpl(read_options, write_options, cf_options,
                               cf_name, &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    any_created = true;
  }
  if (any_created) {
    s.UpdateIfOk(WrapUpCreateColumnFamilies(write_options, {&cf_options}));
  }
  return s;
}
// Creates several column families, each with its own options. Stops at the
// first failure; handles for the CFs created so far are still returned, and
// the options file is persisted if at least one CF was created.
Status DBImpl::CreateColumnFamilies(
    const ReadOptions& read_options, const WriteOptions& write_options,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  InstrumentedMutexLock ol(&options_mutex_);
  handles->clear();

  Status s;
  bool any_created = false;
  // Options of successfully created CFs, for the wrap-up step.
  std::vector<const ColumnFamilyOptions*> created_cf_opts;
  created_cf_opts.reserve(column_families.size());
  for (const ColumnFamilyDescriptor& desc : column_families) {
    ColumnFamilyHandle* handle = nullptr;
    s = CreateColumnFamilyImpl(read_options, write_options, desc.options,
                               desc.name, &handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(handle);
    any_created = true;
    created_cf_opts.push_back(&desc.options);
  }
  if (any_created) {
    s.UpdateIfOk(WrapUpCreateColumnFamilies(write_options, created_cf_opts));
  }
  return s;
}
// Creates a single column family and installs it in the version set.
// Caller must hold options_mutex_. On success `*handle` owns a new
// ColumnFamilyHandleImpl; on failure it is left null.
Status DBImpl::CreateColumnFamilyImpl(const ReadOptions& read_options,
                                      const WriteOptions& write_options,
                                      const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
  options_mutex_.AssertHeld();
  Status s;
  *handle = nullptr;

  // Validate the CF options against the current DB options and make sure
  // all cf_paths directories exist before touching shared state.
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
  if (s.ok()) {
    for (auto& cf_path : cf_options.cf_paths) {
      s = env_->CreateDirIfMissing(cf_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }
  if (!s.ok()) {
    return s;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);

    if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
        nullptr) {
      return Status::InvalidArgument("Column family already exists");
    }
    // Build the manifest record describing the new column family.
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(cur_wal_number_);
    edit.SetComparatorName(cf_options.comparator->Name());
    edit.SetPersistUserDefinedTimestamps(
        cf_options.persist_user_defined_timestamps);

    {
      // Enter the write thread unbatched so no concurrent writer can
      // interleave with the manifest update.
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(nullptr, read_options, write_options, &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      // Create the per-CF data directories.
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
      s = cfd->AddDirectories(&dummy_created_dirs);
    }
    if (s.ok()) {
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      InstallSuperVersionForConfigChange(cfd, &sv_context);

      // A single CF without snapshot support disables it DB-wide.
      if (!cfd->mem()->IsSnapshotSupported()) {
        is_snapshot_supported_ = false;
      }

      cfd->set_initialized();
      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    } else {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Creating column family [%s] FAILED -- %s",
                      column_family_name.c_str(), s.ToString().c_str());
    }
  }  // InstrumentedMutexLock l(&mutex_)

  sv_context.Clean();
  if (s.ok()) {
    NewThreadStatusCfInfo(
        static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
  }
  return s;
}
// Drops one column family and, on success, persists the options file.
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  assert(column_family != nullptr);
  InstrumentedMutexLock ol(&options_mutex_);

  Status s = DropColumnFamilyImpl(column_family);
  if (!s.ok()) {
    return s;
  }
  return WriteOptionsFile(WriteOptions(), false );
}
// Drops several column families, stopping at the first failure. The
// options file is persisted if at least one drop succeeded; a persist
// failure is only surfaced when the drops themselves all succeeded.
Status DBImpl::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  InstrumentedMutexLock ol(&options_mutex_);

  Status s;
  bool any_dropped = false;
  for (ColumnFamilyHandle* handle : column_families) {
    s = DropColumnFamilyImpl(handle);
    if (!s.ok()) {
      break;
    }
    any_dropped = true;
  }
  if (any_dropped) {
    const Status persist_options_status =
        WriteOptionsFile(WriteOptions(), false );
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}
// Drops a column family: records the drop in the MANIFEST and marks the
// ColumnFamilyData as dropped. Caller must hold options_mutex_.
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  options_mutex_.AssertHeld();
  const ReadOptions read_options;
  const WriteOptions write_options;

  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    // ID 0 is the default column family, which cannot be dropped.
    return Status::InvalidArgument("Can't drop default column family");
  }

  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  MinAndMaxPreserveSeconds preserve_info;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // Enter the write thread unbatched so no concurrent writer can
      // interleave with the manifest update.
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, read_options, write_options, &edit,
                                 &mutex_, directories_.GetDbDir());
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      // This CF's memtables no longer count toward the in-memory budget.
      auto& moptions = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -=
          moptions.write_buffer_size * moptions.max_write_buffer_number;
      preserve_info.Combine(moptions);
    }

    if (!cf_support_snapshot) {
      // The dropped CF may have been the only one blocking snapshot
      // support; recompute the flag over the remaining CFs.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    bg_cv_.SignalAll();
  }

  if (preserve_info.IsEnabled()) {
    // The dropped CF had seqno-time preservation enabled; re-register the
    // worker (presumably to refresh its schedule for the remaining CFs --
    // see RegisterRecordSeqnoTimeWorker).
    s = RegisterRecordSeqnoTimeWorker();
  }

  if (s.ok()) {
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }
  return s;
}
bool DBImpl::KeyMayExist(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, std::string* timestamp,
bool* value_found) {
assert(read_options.io_activity == Env::IOActivity::kUnknown);
if (value_found != nullptr) {
*value_found = true;
}
ReadOptions roptions = read_options;
roptions.read_tier = kBlockCacheTier; PinnableSlice pinnable_val;
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = value_found;
get_impl_options.timestamp = timestamp;
auto s = GetImpl(roptions, key, get_impl_options);
if (value_found && *value_found && value) {
value->assign(pinnable_val.data(), pinnable_val.size());
}
return s.ok() || s.IsIncomplete();
}
// Creates a MultiScan object that runs the batch of scans described by
// `scan_opts` against `column_family`.
std::unique_ptr<MultiScan> DBImpl::NewMultiScan(
    const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
    const MultiScanArgs& scan_opts) {
  return std::make_unique<MultiScan>(_read_options, scan_opts, this,
                                     column_family);
}
// Creates a user-facing iterator over `column_family`.
// Validates io_activity / read_tier / timestamp settings, pins a
// SuperVersion, then returns either a tailing ForwardIterator or a regular
// arena-wrapped DBIter bound to the requested (or latest) snapshot. On any
// validation failure an error iterator carrying the status is returned.
Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
                              ColumnFamilyHandle* column_family) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return NewErrorIterator(Status::InvalidArgument(
        "Can only call NewIterator with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  }
  // Copy so io_activity can be defaulted without mutating the caller's
  // options.
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  Iterator* result = nullptr;
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  assert(column_family);
  // A read timestamp must be present iff the CF comparator carries one.
  if (read_options.timestamp) {
    const Status s =
        FailIfTsMismatchCf(column_family, *(read_options.timestamp));
    if (!s.ok()) {
      return NewErrorIterator(s);
    }
  } else {
    const Status s = FailIfCfHasTs(column_family);
    if (!s.ok()) {
      return NewErrorIterator(s);
    }
  }
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  assert(cfh != nullptr);
  ColumnFamilyData* cfd = cfh->cfd();
  assert(cfd != nullptr);
  // Pin a SuperVersion; ownership is handed to the iterator created below,
  // so it is only cleaned up here on the error path.
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
  if (read_options.timestamp && read_options.timestamp->size() > 0) {
    const Status s =
        FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
    if (!s.ok()) {
      CleanupSuperVersion(sv);
      return NewErrorIterator(s);
    }
  }
  if (read_options.tailing) {
    // Tailing iterators follow new writes and are not pinned to a snapshot.
    read_options.total_order_seek |=
        immutable_db_options_.prefix_seek_opt_in_only;
    auto iter = new ForwardIterator(this, read_options, cfd, sv,
                                    true);
    result = DBIter::NewIter(env_, read_options, cfd->ioptions(),
                             sv->mutable_cf_options, cfd->user_comparator(),
                             iter, sv->current, kMaxSequenceNumber,
                             nullptr, nullptr,
                             cfh, false);
  } else {
    // Regular iterator pinned to the caller's snapshot, or to the latest
    // sequence number when no snapshot was given.
    result = NewIteratorImpl(read_options, cfh, sv,
                             (read_options.snapshot != nullptr)
                                 ? read_options.snapshot->GetSequenceNumber()
                                 : kMaxSequenceNumber,
                             nullptr);
  }
  return result;
}
// Builds the arena-wrapped DBIter used by NewIterator()/NewIterators().
// Passing kMaxSequenceNumber pins the iterator to the most recent sequence
// number instead.
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
    const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
    SuperVersion* sv, SequenceNumber snapshot, ReadCallback* read_callback,
    bool expose_blob_index, bool allow_refresh) {
  TEST_SYNC_POINT("DBImpl::NewIterator:1");
  TEST_SYNC_POINT("DBImpl::NewIterator:2");
  SequenceNumber effective_seq = snapshot;
  if (effective_seq == kMaxSequenceNumber) {
    // No explicit snapshot: read everything published so far.
    effective_seq = versions_->LastSequence();
    TEST_SYNC_POINT("DBImpl::NewIterator:3");
    TEST_SYNC_POINT("DBImpl::NewIterator:4");
  }
  return NewArenaWrappedDbIterator(env_, read_options, cfh, sv, effective_seq,
                                   read_callback, this, expose_blob_index,
                                   allow_refresh, true);
}
// Creates an iterator that coalesces entries from the given column families
// into a single logical view.
std::unique_ptr<Iterator> DBImpl::NewCoalescingIterator(
    const ReadOptions& _read_options,
    const std::vector<ColumnFamilyHandle*>& column_families) {
  auto make_error_iter = [](const Status& s) {
    return std::unique_ptr<Iterator>(NewErrorIterator(s));
  };
  return NewMultiCfIterator<Iterator, CoalescingIterator>(
      _read_options, column_families, make_error_iter);
}
// Creates an attribute-group iterator spanning the given column families.
std::unique_ptr<AttributeGroupIterator> DBImpl::NewAttributeGroupIterator(
    const ReadOptions& _read_options,
    const std::vector<ColumnFamilyHandle*>& column_families) {
  auto make_error_iter = [](const Status& s) {
    return NewAttributeGroupErrorIterator(s);
  };
  return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>(
      _read_options, column_families, make_error_iter);
}
// Shared factory behind NewCoalescingIterator()/NewAttributeGroupIterator().
// Validates that at least one CF is given and that all CFs use the same (or
// identically-named) comparator, creates one child iterator per CF via
// NewIterators(), and hands them to ImplType. `error_iterator_func` wraps a
// failure status in the error-iterator flavor matching IterType.
template <typename IterType, typename ImplType, typename ErrorIteratorFuncType>
std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
    const ReadOptions& _read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    ErrorIteratorFuncType error_iterator_func) {
  if (column_families.size() == 0) {
    return error_iterator_func(
        Status::InvalidArgument("No Column Family was provided"));
  }
  const Comparator* first_comparator = column_families[0]->GetComparator();
  for (size_t i = 1; i < column_families.size(); ++i) {
    const Comparator* cf_comparator = column_families[i]->GetComparator();
    // Accept either pointer equality or matching comparator ids.
    if (first_comparator != cf_comparator &&
        first_comparator->GetId().compare(cf_comparator->GetId()) != 0) {
      return error_iterator_func(Status::InvalidArgument(
          "Different comparators are being used across CFs"));
    }
  }
  std::vector<Iterator*> child_iterators;
  Status s = NewIterators(_read_options, column_families, &child_iterators);
  if (!s.ok()) {
    return error_iterator_func(s);
  }
  assert(column_families.size() == child_iterators.size());
  std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
      cfh_iter_pairs;
  cfh_iter_pairs.reserve(column_families.size());
  for (size_t i = 0; i < column_families.size(); ++i) {
    // Each pair takes ownership of its child iterator.
    cfh_iter_pairs.emplace_back(column_families[i], child_iterators[i]);
  }
  return std::make_unique<ImplType>(_read_options,
                                    column_families[0]->GetComparator(),
                                    std::move(cfh_iter_pairs));
}
// Creates one iterator per column family, all reading from a consistent view
// of the DB (a shared sequence number obtained via MultiCFSnapshot), and
// appends them to `*iterators` in order. Validation failures return an error
// status without appending anything.
Status DBImpl::NewIterators(
    const ReadOptions& _read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return Status::InvalidArgument(
        "Can only call NewIterators with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`");
  }
  // Copy so io_activity can be defaulted without mutating the caller's
  // options.
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
      cf_sv_pairs;
  Status s;
  // Validate timestamp settings for every CF before acquiring any
  // SuperVersions.
  for (auto* cf : column_families) {
    assert(cf);
    if (read_options.timestamp) {
      s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
    } else {
      s = FailIfCfHasTs(cf);
    }
    if (!s.ok()) {
      return s;
    }
    cf_sv_pairs.emplace_back(cf, nullptr);
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  SequenceNumber consistent_seqnum = kMaxSequenceNumber;
  bool sv_from_thread_local = false;
  // Acquire SuperVersions for all CFs and compute a sequence number at which
  // all of them can be read consistently.
  s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
                                 MultiGetContext::MAX_BATCH_SIZE>>(
      read_options, nullptr,
      [](autovector<ColumnFamilySuperVersionPair,
                    MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
        return &(*cf_iter);
      },
      &cf_sv_pairs,
      true, &consistent_seqnum, &sv_from_thread_local);
  if (!s.ok()) {
    return s;
  }
  assert(cf_sv_pairs.size() == column_families.size());
  if (read_options.tailing) {
    // Tailing iterators follow new writes; they are not pinned to
    // consistent_seqnum.
    read_options.total_order_seek |=
        immutable_db_options_.prefix_seek_opt_in_only;
    for (const auto& cf_sv_pair : cf_sv_pairs) {
      auto iter = new ForwardIterator(this, read_options, cf_sv_pair.cfd,
                                      cf_sv_pair.super_version,
                                      true);
      iterators->push_back(DBIter::NewIter(
          env_, read_options, cf_sv_pair.cfd->ioptions(),
          cf_sv_pair.super_version->mutable_cf_options,
          cf_sv_pair.cfd->user_comparator(), iter,
          cf_sv_pair.super_version->current, kMaxSequenceNumber,
          nullptr, nullptr, cf_sv_pair.cfh));
    }
  } else {
    for (const auto& cf_sv_pair : cf_sv_pairs) {
      iterators->push_back(NewIteratorImpl(
          read_options, cf_sv_pair.cfh, cf_sv_pair.super_version,
          consistent_seqnum, nullptr));
    }
  }
  return Status::OK();
}
// Takes a regular snapshot of the current DB state.
const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }

// Takes a snapshot used to establish write-conflict boundaries (see
// GetSnapshotImpl's is_write_conflict_boundary flag).
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
  return GetSnapshotImpl(true);
}

// Creates a snapshot associated with user timestamp `ts` at sequence
// `snapshot_seq` (kMaxSequenceNumber means "latest"). `ts` must not be
// UINT64_MAX, which is used to query the latest timestamped snapshot.
std::pair<Status, std::shared_ptr<const Snapshot>>
DBImpl::CreateTimestampedSnapshot(SequenceNumber snapshot_seq, uint64_t ts) {
  assert(ts != std::numeric_limits<uint64_t>::max());
  auto ret = CreateTimestampedSnapshotImpl(snapshot_seq, ts, true /* lock */);
  return ret;
}
// Returns the timestamped snapshot whose timestamp matches `ts` (per
// SnapshotList::GetSnapshot semantics), or nullptr if there is none.
std::shared_ptr<const SnapshotImpl> DBImpl::GetTimestampedSnapshot(
    uint64_t ts) const {
  InstrumentedMutexLock lock_guard(&mutex_);
  return timestamped_snapshots_.GetSnapshot(ts);
}
// Releases all timestamped snapshots older than `ts`. The released
// SnapshotImpl objects are intentionally destroyed outside the DB mutex (the
// clear() below) to keep the critical section short. If `remaining_total_ss`
// is non-null it receives the total number of snapshots still alive.
void DBImpl::ReleaseTimestampedSnapshotsOlderThan(uint64_t ts,
                                                  size_t* remaining_total_ss) {
  autovector<std::shared_ptr<const SnapshotImpl>> snapshots_to_release;
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    timestamped_snapshots_.ReleaseSnapshotsOlderThan(ts, snapshots_to_release);
  }
  // Drop the final references without holding the mutex.
  snapshots_to_release.clear();
  if (remaining_total_ss) {
    InstrumentedMutexLock lock_guard(&mutex_);
    *remaining_total_ss = static_cast<size_t>(snapshots_.count());
  }
}
// Collects the timestamped snapshots whose timestamps lie in [ts_lb, ts_ub).
// The bounds must satisfy ts_lb < ts_ub.
Status DBImpl::GetTimestampedSnapshots(
    uint64_t ts_lb, uint64_t ts_ub,
    std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
  if (ts_lb >= ts_ub) {
    return Status::InvalidArgument(
        "timestamp lower bound must be smaller than upper bound");
  }
  timestamped_snapshots.clear();
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    timestamped_snapshots_.GetSnapshots(ts_lb, ts_ub, timestamped_snapshots);
  }
  return Status::OK();
}
// Common implementation for GetSnapshot()/GetSnapshotForWriteConflictBoundary.
// Returns nullptr when snapshots are not supported by the current memtables.
// With `lock` == false the caller must already hold the DB mutex.
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
                                      bool lock) {
  int64_t unix_time = 0;
  // Best-effort creation time; clock failure is tolerated.
  immutable_db_options_.clock->GetCurrentTime(&unix_time)
      .PermitUncheckedError();
  // Allocate before taking the mutex to keep the critical section short.
  SnapshotImpl* s = new SnapshotImpl;
  if (lock) {
    mutex_.Lock();
  } else {
    mutex_.AssertHeld();
  }
  if (!is_snapshot_supported_) {
    if (lock) {
      mutex_.Unlock();
    }
    delete s;
    return nullptr;
  }
  auto snapshot_seq = GetLastPublishedSequence();
  SnapshotImpl* snapshot =
      snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
  if (lock) {
    mutex_.Unlock();
  }
  return snapshot;
}
// Creates (or reuses) a timestamped snapshot at (`snapshot_seq`, `ts`).
// The mapping from timestamp to sequence number must stay monotonic:
//  - a newer snapshot may not have a smaller timestamp, and
//  - an identical timestamp may not map to a different sequence number
//    (an exact (ts, seq) duplicate returns the existing snapshot).
// With `lock` == false the caller must already hold the DB mutex.
std::pair<Status, std::shared_ptr<const SnapshotImpl>>
DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
                                      bool lock) {
  int64_t unix_time = 0;
  // Best-effort creation time; clock failure is tolerated.
  immutable_db_options_.clock->GetCurrentTime(&unix_time)
      .PermitUncheckedError();
  // Allocate before (possibly) taking the mutex to keep the critical
  // section short.
  SnapshotImpl* s = new SnapshotImpl;
  const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber);
  if (lock) {
    mutex_.Lock();
  } else {
    mutex_.AssertHeld();
  }
  if (!is_snapshot_supported_) {
    if (lock) {
      mutex_.Unlock();
    }
    delete s;
    return std::make_pair(
        Status::NotSupported("Memtable does not support snapshot"), nullptr);
  }
  if (!need_update_seq) {
    snapshot_seq = GetLastPublishedSequence();
  }
  // Fetch the most recent timestamped snapshot (UINT64_MAX queries latest)
  // to validate monotonicity against it.
  std::shared_ptr<const SnapshotImpl> latest =
      timestamped_snapshots_.GetSnapshot(std::numeric_limits<uint64_t>::max());
  if (latest) {
    uint64_t latest_snap_ts = latest->GetTimestamp();
    SequenceNumber latest_snap_seq = latest->GetSequenceNumber();
    assert(latest_snap_seq <= snapshot_seq);
    bool needs_create_snap = true;
    Status status;
    std::shared_ptr<const SnapshotImpl> ret;
    if (latest_snap_ts > ts) {
      // Timestamps must not move backwards.
      needs_create_snap = false;
      std::ostringstream oss;
      oss << "snapshot exists with larger timestamp " << latest_snap_ts << " > "
          << ts;
      status = Status::InvalidArgument(oss.str());
    } else if (latest_snap_ts == ts) {
      if (latest_snap_seq == snapshot_seq) {
        // Exact duplicate: hand back the existing snapshot.
        needs_create_snap = false;
        ret = latest;
      } else if (latest_snap_seq < snapshot_seq) {
        // The same timestamp cannot map to two different seqnos.
        needs_create_snap = false;
        std::ostringstream oss;
        oss << "Allocated seq is " << snapshot_seq
            << ", while snapshot exists with smaller seq " << latest_snap_seq
            << " but same timestamp " << ts;
        status = Status::InvalidArgument(oss.str());
      }
    }
    if (!needs_create_snap) {
      if (lock) {
        mutex_.Unlock();
      }
      delete s;
      return std::make_pair(status, ret);
    } else {
      // status is OK here; mark it checked.
      status.PermitUncheckedError();
    }
  }
  SnapshotImpl* snapshot =
      snapshots_.New(s, snapshot_seq, unix_time,
                     true /* is_write_conflict_boundary */, ts);
  // The custom deleter routes destruction through ReleaseSnapshot().
  std::shared_ptr<const SnapshotImpl> ret(
      snapshot,
      std::bind(&DBImpl::ReleaseSnapshot, this, std::placeholders::_1));
  timestamped_snapshots_.AddSnapshot(ret);
  if (need_update_seq) {
    assert(versions_);
    if (last_seq_same_as_publish_seq_) {
      versions_->SetLastSequence(snapshot_seq);
    } else {
      // Not supported when the published seq differs from the last seq.
      assert(false);
    }
  }
  if (lock) {
    mutex_.Unlock();
  }
  return std::make_pair(Status::OK(), ret);
}
namespace {
using CfdList = autovector<ColumnFamilyData*, 2>;
bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
for (const ColumnFamilyData* t : list) {
if (t == cfd) {
return true;
}
}
return false;
}
}
// Removes `s` from the snapshot list and destroys it. Because the oldest
// live snapshot may have advanced, this also re-evaluates which files become
// eligible for compaction (bottommost files and standalone range-deletion
// files), scheduling compaction for affected CFs.
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  if (s == nullptr) {
    // Tolerate releasing a null snapshot.
    return;
  }
  const SnapshotImpl* casted_s = static_cast<const SnapshotImpl*>(s);
  {
    InstrumentedMutexLock l(&mutex_);
    snapshots_.Delete(casted_s);
    uint64_t oldest_snapshot;
    if (snapshots_.empty()) {
      // No snapshots left: everything up to the published seq is visible.
      oldest_snapshot = GetLastPublishedSequence();
    } else {
      oldest_snapshot = snapshots_.oldest()->number_;
    }
    // Tracks CFs already scheduled, so each CF is enqueued at most once.
    CfdList cf_scheduled;
    if (oldest_snapshot > bottommost_files_mark_threshold_) {
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->AllowIngestBehind()) {
          cfd->current()->storage_info()->UpdateOldestSnapshot(
              oldest_snapshot, false,
              cfd->ioptions().user_comparator, cfd->GetFullHistoryTsLow());
          if (!cfd->current()
                   ->storage_info()
                   ->BottommostFilesMarkedForCompaction()
                   .empty()) {
            EnqueuePendingCompaction(cfd);
            MaybeScheduleFlushOrCompaction();
            cf_scheduled.push_back(cfd);
          }
        }
      }
      // Recompute the global threshold from the CFs that were NOT scheduled;
      // scheduled CFs are covered by their pending compactions.
      SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (CfdListContains(cf_scheduled, cfd) || cfd->AllowIngestBehind()) {
          continue;
        }
        new_bottommost_files_mark_threshold = std::min(
            new_bottommost_files_mark_threshold,
            cfd->current()->storage_info()->bottommost_files_mark_threshold());
      }
      bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
    }
    if (oldest_snapshot >= standalone_range_deletion_files_mark_threshold_) {
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped() || CfdListContains(cf_scheduled, cfd)) {
          continue;
        }
        if (oldest_snapshot >=
            cfd->current()
                ->storage_info()
                ->standalone_range_tombstone_files_mark_threshold()) {
          EnqueuePendingCompaction(cfd);
          MaybeScheduleFlushOrCompaction();
          cf_scheduled.push_back(cfd);
        }
      }
    }
  }
  // Destroy outside the mutex.
  delete casted_s;
}
// Collects table properties for every SST file in the CF's current version.
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  // Pin the current version so it stays alive while we read from it outside
  // the mutex.
  Version* version;
  {
    InstrumentedMutexLock l(&mutex_);
    version = cfd->current();
    version->Ref();
  }
  const ReadOptions read_options;
  Status s = version->GetPropertiesOfAllTables(read_options, props);
  {
    InstrumentedMutexLock l(&mutex_);
    version->Unref();
  }
  return s;
}
// Collects table properties for SST files overlapping any of the `n` ranges.
Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
                                            const Range* range, std::size_t n,
                                            TablePropertiesCollection* props) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  // Pin the current version while reading from it without the mutex.
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();
  const ReadOptions read_options;
  const Comparator* const ucmp = cfd->user_comparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  autovector<UserKeyRange> ukey_ranges;
  // `keys` owns the (possibly timestamp-extended) key bytes the ranges point
  // into; reserve up front so emplace_back never reallocates underneath the
  // Slices in `ukey_ranges`.
  std::vector<std::string> keys;
  ukey_ranges.reserve(n);
  keys.reserve(2 * n);
  for (size_t i = 0; i < n; i++) {
    auto [start, limit] = MaybeAddTimestampsToRange(
        range[i].start, range[i].limit, ts_sz, &keys.emplace_back(),
        &keys.emplace_back(), false);
    assert(start.has_value());
    assert(limit.has_value());
    ukey_ranges.emplace_back(start.value(), limit.value());
  }
  auto s =
      version->GetPropertiesOfTablesInRange(read_options, ukey_ranges, props);
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();
  return s;
}
// Collects table properties for every SST file, grouped by LSM level.
Status DBImpl::GetPropertiesOfTablesByLevel(
    ColumnFamilyHandle* column_family,
    std::vector<std::unique_ptr<TablePropertiesCollection>>* props_by_level) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  // Pin the current version so it stays alive while we read from it outside
  // the mutex.
  Version* version;
  {
    InstrumentedMutexLock l(&mutex_);
    version = cfd->current();
    version->Ref();
  }
  const ReadOptions read_options;
  Status s =
      version->GetPropertiesOfTablesByLevel(read_options, props_by_level);
  {
    InstrumentedMutexLock l(&mutex_);
    version->Unref();
  }
  return s;
}
// Path/name the DB was opened with.
const std::string& DBImpl::GetName() const { return dbname_; }

// Env the DB was opened with.
Env* DBImpl::GetEnv() const { return env_; }

// Base-class default: forward to the Env's file system.
FileSystem* DB::GetFileSystem() const {
  const auto& fs = GetEnv()->GetFileSystem();
  return fs.get();
}

// DBImpl keeps its own FileSystem in the immutable options.
FileSystem* DBImpl::GetFileSystem() const {
  return immutable_db_options_.fs.get();
}

// Clock used by the DB (from the immutable options).
SystemClock* DBImpl::GetSystemClock() const {
  return immutable_db_options_.clock;
}
// Begins capturing an I/O trace into `trace_writer` (takes ownership).
Status DBImpl::StartIOTrace(const TraceOptions& trace_options,
                            std::unique_ptr<TraceWriter>&& trace_writer) {
  assert(trace_writer != nullptr);
  return io_tracer_->StartIOTrace(GetSystemClock(), trace_options,
                                  std::move(trace_writer));
}

// Stops a previously started I/O trace. Always reports success.
Status DBImpl::EndIOTrace() {
  io_tracer_->EndIOTrace();
  return Status::OK();
}
// Returns a combined (DB + CF) Options snapshot for `column_family`.
Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  InstrumentedMutexLock l(&mutex_);
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                 cfh->cfd()->GetLatestCFOptions());
}

// Returns a snapshot of the DB-wide options.
DBOptions DBImpl::GetDBOptions() const {
  InstrumentedMutexLock l(&mutex_);
  return BuildDBOptions(immutable_db_options_, mutable_db_options_);
}
// Retrieves a string-valued DB property for `column_family`. Integer-backed
// properties are rendered in decimal. Returns false for unknown properties
// or when the handler fails.
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
                         const Slice& property, std::string* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_int) {
    // Integer property exposed through the string API.
    uint64_t int_value;
    bool ret_value =
        GetIntPropertyInternal(cfd, *property_info, false, &int_value);
    if (ret_value) {
      *value = std::to_string(int_value);
    }
    return ret_value;
  } else if (property_info->handle_string) {
    // String properties may or may not require the DB mutex.
    if (property_info->need_out_of_mutex) {
      return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                      value);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                      value);
    }
  } else if (property_info->handle_string_dbimpl) {
    // Handler is a DBImpl member function pointer.
    if (property_info->need_out_of_mutex) {
      return (this->*(property_info->handle_string_dbimpl))(value);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return (this->*(property_info->handle_string_dbimpl))(value);
    }
  }
  // Every known property should match one of the handler kinds above.
  assert(false);
  return false;
}
// Retrieves a map-valued DB property for `column_family`. Returns false for
// unknown properties or properties that are not map-typed.
bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
                            const Slice& property,
                            std::map<std::string, std::string>* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (property_info == nullptr || property_info->handle_map == nullptr) {
    return false;
  }
  if (property_info->need_out_of_mutex) {
    return cfd->internal_stats()->GetMapProperty(*property_info, property,
                                                 value);
  }
  // Property requires the DB mutex.
  InstrumentedMutexLock l(&mutex_);
  return cfd->internal_stats()->GetMapProperty(*property_info, property, value);
}
// Retrieves an integer-valued DB property for `column_family`. Returns false
// for unknown or non-integer properties.
bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  const DBPropertyInfo* info = GetPropertyInfo(property);
  if (info == nullptr || info->handle_int == nullptr) {
    return false;
  }
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  return GetIntPropertyInternal(cfh->cfd(), *info, false /* is_locked */,
                                value);
}
// Core integer-property lookup. `is_locked` indicates whether the caller
// already holds the DB mutex. For properties flagged need_out_of_mutex, the
// mutex is temporarily released (if held) so a SuperVersion can be acquired
// and queried without it.
bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
                                    const DBPropertyInfo& property_info,
                                    bool is_locked, uint64_t* value) {
  assert(property_info.handle_int != nullptr);
  if (!property_info.need_out_of_mutex) {
    if (is_locked) {
      mutex_.AssertHeld();
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    }
  } else {
    SuperVersion* sv = nullptr;
    if (is_locked) {
      // Drop the mutex for the out-of-mutex work; re-acquired below so the
      // caller's locking expectations still hold on return.
      mutex_.Unlock();
    }
    sv = GetAndRefSuperVersion(cfd);
    bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_info, sv->current, value);
    ReturnAndCleanupSuperVersion(cfd, sv);
    if (is_locked) {
      mutex_.Lock();
    }
    return ret;
  }
}
bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
assert(value != nullptr);
Statistics* statistics = immutable_db_options_.stats;
if (!statistics) {
return false;
}
*value = statistics->ToString();
return true;
}
// Clears the per-CF internal statistics of every initialized column family.
Status DBImpl::ResetStats() {
  InstrumentedMutexLock l(&mutex_);
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
    if (!cfd->initialized()) {
      continue;
    }
    cfd->internal_stats()->Clear();
  }
  return Status::OK();
}
// Aggregates an integer property across all initialized column families
// using the property-specific aggregator. Returns false for unknown or
// non-integer properties, or when the lookup fails for some CF (in which
// case *aggregated_value still receives the partial aggregate, matching the
// original behavior).
//
// Fix: the original `else { ret = false; break; }` contained a dead store —
// `ret` is already false on that path — so the branch is reduced to a guard
// break.
bool DBImpl::GetAggregatedIntProperty(const Slice& property,
                                      uint64_t* aggregated_value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }
  auto aggregator = CreateIntPropertyAggregator(property);
  if (aggregator == nullptr) {
    return false;
  }
  bool ret = true;
  {
    InstrumentedMutexLock l(&mutex_);
    uint64_t value;
    for (auto* cfd : versions_->GetRefedColumnFamilySet()) {
      if (!cfd->initialized()) {
        continue;
      }
      ret = GetIntPropertyInternal(cfd, *property_info, true /* is_locked */,
                                   &value);
      // GetIntPropertyInternal may release and re-acquire the mutex for
      // out-of-mutex properties; it must be held again here.
      mutex_.AssertHeld();
      if (!ret) {
        break;
      }
      aggregator->Add(cfd, value);
    }
  }
  *aggregated_value = aggregator->Aggregate();
  return ret;
}
// Acquires a referenced SuperVersion for `cfd` (via the CF's thread-local
// path). Must be released with ReturnAndCleanupSuperVersion().
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  return cfd->GetThreadLocalSuperVersion(this);
}

// Overload keyed by CF id; returns nullptr when the id is unknown.
SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);
  if (!cfd) {
    return nullptr;
  }
  return GetAndRefSuperVersion(cfd);
}
// Drops one reference on `sv`. If it was the last reference, the
// SuperVersion is cleaned up under the DB mutex and then either deleted here
// or handed to the background purge path, depending on
// avoid_unnecessary_blocking_io.
void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
  if (sv->Unref()) {
    bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
    {
      InstrumentedMutexLock l(&mutex_);
      sv->Cleanup();
      if (defer_purge) {
        AddSuperVersionsToFreeQueue(sv);
        SchedulePurge();
      }
    }
    if (!defer_purge) {
      // Free outside the mutex.
      delete sv;
    }
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}
// Returns `sv` to the CF's thread-local slot; if the slot does not take it
// back, the reference is dropped via CleanupSuperVersion() instead.
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
    CleanupSuperVersion(sv);
  }
}

// Overload keyed by CF id; the id must refer to an existing CF.
void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
                                          SuperVersion* sv) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);
  assert(cfd != nullptr);
  ReturnAndCleanupSuperVersion(cfd, sv);
}
// Returns the handle for `column_family_id` from the memtables view, or
// nullptr if no such column family exists.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
  ColumnFamilyMemTables* const memtables = column_family_memtables_.get();
  return memtables->Seek(column_family_id)
             ? memtables->GetColumnFamilyHandle()
             : nullptr;
}
std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
uint32_t column_family_id) {
InstrumentedMutexLock l(&mutex_);
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
if (cfd == nullptr) {
return nullptr;
}
return std::unique_ptr<ColumnFamilyHandleImpl>(
new ColumnFamilyHandleImpl(cfd, this, &mutex_));
}
// Estimates the entry count and total byte size within `range`, counting
// only the mutable and immutable memtables (SST files are not included).
void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                         const Range& range,
                                         uint64_t* const count,
                                         uint64_t* const size) {
  ColumnFamilyHandleImpl* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  // For timestamped CFs the user keys are extended with timestamps
  // (see MaybeAddTimestampsToRange); the strings own the extended bytes.
  std::string start_with_ts, limit_with_ts;
  auto [start, limit] = MaybeAddTimestampsToRange(
      range.start, range.limit, ts_sz, &start_with_ts, &limit_with_ts);
  assert(start.has_value());
  assert(limit.has_value());
  InternalKey k1(start.value(), kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey k2(limit.value(), kMaxSequenceNumber, kValueTypeForSeek);
  ReadOnlyMemTable::MemTableStats memStats =
      sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
  ReadOnlyMemTable::MemTableStats immStats =
      sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
  *count = memStats.count + immStats.count;
  *size = memStats.size + immStats.size;
  ReturnAndCleanupSuperVersion(cfd, sv);
}
// Approximates the byte size of each of the `n` ranges. `options` selects
// whether SST files and/or memtables contribute to the estimate; requesting
// neither is an InvalidArgument.
Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const Range* range, int n, uint64_t* sizes) {
  if (!options.include_memtables && !options.include_files) {
    return Status::InvalidArgument("Invalid options");
  }
  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  Version* v;
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  v = sv->current;
  const ReadOptions read_options;
  for (int i = 0; i < n; i++) {
    // Convert user keys to internal seek keys (padding timestamps if the CF
    // comparator uses them).
    std::string start_with_ts, limit_with_ts;
    auto [start, limit] = MaybeAddTimestampsToRange(
        range[i].start, range[i].limit, ts_sz, &start_with_ts, &limit_with_ts);
    assert(start.has_value());
    assert(limit.has_value());
    InternalKey k1(start.value(), kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(limit.value(), kMaxSequenceNumber, kValueTypeForSeek);
    sizes[i] = 0;
    if (options.include_files) {
      // 0 / -1 appear to select the full level range — confirm against the
      // VersionSet::ApproximateSize signature.
      sizes[i] += versions_->ApproximateSize(
          options, read_options, v, k1.Encode(), k2.Encode(),
          0,
          -1, TableReaderCaller::kUserApproximateSize);
    }
    if (options.include_memtables) {
      sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
      sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
    }
  }
  ReturnAndCleanupSuperVersion(cfd, sv);
  return Status::OK();
}
// Records the current next-file-number in pending_outputs_ and returns an
// iterator to the recorded element so it can be released later.
// NOTE(review): pending_outputs_ presumably shields in-flight output files
// from obsolete-file deletion — confirm against FindObsoleteFiles().
std::list<uint64_t>::iterator
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
  pending_outputs_.push_back(versions_->current_next_file_number());
  // Iterator to the element just appended.
  auto pending_outputs_inserted_elem = pending_outputs_.end();
  --pending_outputs_inserted_elem;
  return pending_outputs_inserted_elem;
}

// Erases a previously captured element; a null holder means nothing to do.
void DBImpl::ReleaseFileNumberFromPendingOutputs(
    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
  if (v.get() != nullptr) {
    pending_outputs_.erase(*v.get());
    v.reset();
  }
}
// Records the current OPTIONS file number in min_options_file_numbers_ and
// returns an iterator to the recorded element for later release.
// NOTE(review): presumably protects the OPTIONS file from deletion while a
// job depends on it — confirm at the obsolete-options cleanup site.
std::list<uint64_t>::iterator DBImpl::CaptureOptionsFileNumber() {
  min_options_file_numbers_.push_back(versions_->options_file_number());
  // Iterator to the element just appended.
  auto min_options_file_numbers_inserted_elem = min_options_file_numbers_.end();
  --min_options_file_numbers_inserted_elem;
  return min_options_file_numbers_inserted_elem;
}

// Erases a previously captured element; a null holder means nothing to do.
void DBImpl::ReleaseOptionsFileNumber(
    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
  if (v.get() != nullptr) {
    min_options_file_numbers_.erase(*v.get());
    v.reset();
  }
}
// Returns an iterator over WAL updates starting at sequence number `seq`.
// Not supported in seq_per_batch mode, and NotFound when `seq` is beyond the
// last sequence written to the DB.
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
  if (seq_per_batch_) {
    return Status::NotSupported(
        "This API is not yet compatible with write-prepared/write-unprepared "
        "transactions");
  }
  if (seq > versions_->LastSequence()) {
    return Status::NotFound("Requested sequence not yet written in the db");
  }
  return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}
// Deletes whole SST files (levels >= 1) lying within the given `ranges`
// without rewriting any data. Files currently being compacted, already
// slated for deletion, or (with include_end == false) whose largest key
// equals a range's end key are skipped. Applies the deletions via a
// VersionEdit and purges the now-obsolete files.
Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                   const RangeOpt* ranges, size_t n,
                                   bool include_end) {
  const ReadOptions read_options;
  const WriteOptions write_options;
  Status status = Status::OK();
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  const Comparator* ucmp = cfd->user_comparator();
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();
  autovector<UserKeyRangeOpt> ukey_ranges;
  // `keys` owns the (possibly timestamp-extended) key bytes; reserve up
  // front so emplace_back never reallocates underneath the stored Slices.
  std::vector<std::string> keys;
  std::vector<Slice> key_slices;
  ukey_ranges.reserve(n);
  keys.reserve(2 * n);
  key_slices.reserve(2 * n);
  for (size_t i = 0; i < n; i++) {
    auto [start, limit] = MaybeAddTimestampsToRange(
        ranges[i].start, ranges[i].limit, ts_sz, &keys.emplace_back(),
        &keys.emplace_back(), !include_end);
    assert(ranges[i].start.has_value() == start.has_value());
    assert(ranges[i].limit.has_value() == limit.has_value());
    ukey_ranges.emplace_back(start, limit);
  }
  VersionEdit edit;
  std::set<FileMetaData*> deleted_files;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    Version* input_version = cfd->current();
    auto* vstorage = input_version->storage_info();
    for (const auto& range : ukey_ranges) {
      // A missing start/limit means the range is open on that side.
      auto begin = range.start.has_value() ? &range.start.value() : nullptr;
      auto end = range.limit.has_value() ? &range.limit.value() : nullptr;
      // Level 0 is skipped: its files may overlap one another.
      for (int i = 1; i < cfd->NumberLevels(); i++) {
        if (vstorage->LevelFiles(i).empty() ||
            !vstorage->OverlapInLevel(i, begin, end)) {
          continue;
        }
        std::vector<FileMetaData*> level_files;
        InternalKey begin_storage, end_storage, *begin_key, *end_key;
        if (begin == nullptr) {
          begin_key = nullptr;
        } else {
          begin_storage.SetMinPossibleForUserKey(*begin);
          begin_key = &begin_storage;
        }
        if (end == nullptr) {
          end_key = nullptr;
        } else {
          end_storage.SetMaxPossibleForUserKey(*end);
          end_key = &end_storage;
        }
        // Only files cleanly contained in the interval are candidates.
        vstorage->GetCleanInputsWithinInterval(
            i, begin_key, end_key, &level_files, -1,
            nullptr);
        FileMetaData* level_file;
        for (uint32_t j = 0; j < level_files.size(); j++) {
          level_file = level_files[j];
          if (level_file->being_compacted) {
            continue;
          }
          if (deleted_files.find(level_file) != deleted_files.end()) {
            // Already selected via an earlier range.
            continue;
          }
          if (!include_end && end != nullptr &&
              (ucmp->CompareWithoutTimestamp(level_file->largest.user_key(),
                                             *end) == 0)) {
            // Exclusive end: keep files ending exactly on the boundary.
            continue;
          }
          edit.SetColumnFamily(cfd->GetID());
          edit.DeleteFile(i, level_file->fd.GetNumber());
          deleted_files.insert(level_file);
          // Keep compactions away from this file while the edit is applied;
          // reverted below.
          level_file->being_compacted = true;
        }
      }
    }
    if (!deleted_files.empty()) {
      vstorage->ComputeCompactionScore(cfd->ioptions(),
                                       cfd->GetLatestMutableCFOptions(),
                                       cfd->GetFullHistoryTsLow());
    }
    if (edit.GetDeletedFiles().empty()) {
      // Nothing matched; nothing to apply.
      job_context.Clean();
      return status;
    }
    input_version->Ref();
    status = versions_->LogAndApply(cfd, read_options, write_options, &edit,
                                    &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          cfd, job_context.superversion_contexts.data());
    }
    for (auto* deleted_file : deleted_files) {
      deleted_file->being_compacted = false;
    }
    input_version->Unref();
    FindObsoleteFiles(&job_context, false);
  }
  LogFlush(immutable_db_options_.info_log);
  if (job_context.HaveSomethingToDelete()) {
    // Physically remove the now-obsolete files.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}
// Fills `metadata` with metadata for all live SST files across all CFs.
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  InstrumentedMutexLock l(&mutex_);
  versions_->GetLiveFilesMetaData(metadata);
}

// Fills `checksum_list` with checksum information for all live SST files.
Status DBImpl::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
  InstrumentedMutexLock l(&mutex_);
  return versions_->GetLiveFilesChecksumInfo(checksum_list);
}
// Fills `cf_meta` from the column family's current version.
void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
                                     ColumnFamilyMetaData* cf_meta) {
  assert(column_family);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  {
    InstrumentedMutexLock l(&mutex_);
    cfd->current()->GetColumnFamilyMetaData(cf_meta);
  }
}

// Overload taking options that control which metadata is collected.
void DBImpl::GetColumnFamilyMetaData(
    ColumnFamilyHandle* column_family,
    const GetColumnFamilyMetaDataOptions& options,
    ColumnFamilyMetaData* metadata) {
  assert(column_family);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  {
    InstrumentedMutexLock l(&mutex_);
    cfd->current()->GetColumnFamilyMetaData(options, metadata);
  }
}
// Appends metadata for every column family's current version to `metadata`.
void DBImpl::GetAllColumnFamilyMetaData(
    std::vector<ColumnFamilyMetaData>* metadata) {
  InstrumentedMutexLock l(&mutex_);
  for (auto cfd : *(versions_->GetColumnFamilySet())) {
    metadata->emplace_back();
    cfd->current()->GetColumnFamilyMetaData(&metadata->back());
  }
}
// Copies the in-memory DB id (db_id_) into `identity`.
Status DBImpl::GetDbIdentity(std::string& identity) const {
  identity.assign(db_id_);
  return Status::OK();
}
// Reads the DB id from the IDENTITY file into `identity`, stripping a single
// trailing newline if present. Returns the read error unchanged on failure.
//
// Fix: removed the unused local `const FileOptions soptions;` from the
// original (it was declared but never passed anywhere).
Status DBImpl::GetDbIdentityFromIdentityFile(const IOOptions& opts,
                                             std::string* identity) const {
  const std::string idfilename = IdentityFileName(dbname_);
  Status s = ReadFileToString(fs_.get(), idfilename, opts, identity);
  if (!s.ok()) {
    return s;
  }
  // The IDENTITY file may end with a newline that is not part of the id.
  if (!identity->empty() && identity->back() == '\n') {
    identity->pop_back();
  }
  return s;
}
Status DBImpl::GetDbSessionId(std::string& session_id) const {
session_id.assign(db_session_id_);
return Status::OK();
}
namespace {
SemiStructuredUniqueIdGen* DbSessionIdGen() {
static SemiStructuredUniqueIdGen gen;
return &gen;
}
}
// Test-only hook: resets the process-wide session id generator.
void DBImpl::TEST_ResetDbSessionIdGen() { DbSessionIdGen()->Reset(); }
// Produces a new encoded session id from the process-wide generator.
// The Env parameter is unused (kept for interface compatibility).
std::string DBImpl::GenerateDbSessionId(Env*) {
  auto* id_gen = DbSessionIdGen();
  uint64_t lower;
  uint64_t upper;
  id_gen->GenerateNext(&upper, &lower);
  if (lower == 0) {
    // A zero low word is reserved; draw again (the generator is expected
    // not to yield zero twice in a row).
    id_gen->GenerateNext(&upper, &lower);
    assert(lower != 0);
  }
  return EncodeSessionId(upper, lower);
}
// Assigns a fresh session id for this DB open; the sync point lets tests
// observe/override the generated value.
void DBImpl::SetDbSessionId() {
  db_session_id_ = GenerateDbSessionId(env_);
  TEST_SYNC_POINT_CALLBACK("DBImpl::SetDbSessionId", &db_session_id_);
}
// Default base-class implementation: column family creation is not
// supported unless a subclass (e.g. DBImpl) overrides it.
Status DB::CreateColumnFamily(const ColumnFamilyOptions& ,
                              const std::string& ,
                              ColumnFamilyHandle** ) {
  return Status::NotSupported("");
}
// Default base-class implementation: not supported unless overridden.
Status DB::CreateColumnFamilies(
    const ColumnFamilyOptions& ,
    const std::vector<std::string>& ,
    std::vector<ColumnFamilyHandle*>* ) {
  return Status::NotSupported("");
}
// Default base-class implementation: not supported unless overridden.
Status DB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& ,
    std::vector<ColumnFamilyHandle*>* ) {
  return Status::NotSupported("");
}
// Default base-class implementation: not supported unless overridden.
Status DB::DropColumnFamily(ColumnFamilyHandle* ) {
  return Status::NotSupported("");
}
// Default base-class implementation: not supported unless overridden.
Status DB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& ) {
  return Status::NotSupported("");
}
// Releases a caller-owned column family handle.
// The default-CF handle is owned by the DB itself and must never be deleted
// through this API, so that case is rejected explicitly.
Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
  if (column_family == DefaultColumnFamily()) {
    return Status::InvalidArgument(
        "Cannot destroy the handle returned by DefaultColumnFamily()");
  }
  delete column_family;
  return Status::OK();
}
// Out-of-line defaulted destructor anchors the DB vtable in this TU.
DB::~DB() = default;
// Closes the DB at most once. Repeated calls return the status recorded by
// the first successful pass through CloseImpl(). A failure to release
// timestamped snapshots aborts the close without marking the DB closed.
Status DBImpl::Close() {
  InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
  if (!closed_) {
    const Status snapshot_status = MaybeReleaseTimestampedSnapshotsAndCheck();
    if (!snapshot_status.ok()) {
      // Leave the DB open; caller may retry after releasing snapshots.
      return snapshot_status;
    }
    closing_status_ = CloseImpl();
    closed_ = true;
  }
  return closing_status_;
}
// Lists the column family names recorded in the MANIFEST of the DB at
// `name`, without opening the DB.
Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  FileSystem* fs = db_options.env->GetFileSystem().get();
  return VersionSet::ListColumnFamilies(column_families, name, fs);
}
// Out-of-line defaulted destructor anchors the Snapshot vtable in this TU.
Snapshot::~Snapshot() = default;
// Best-effort destruction of an entire DB: deletes SST/blob/WAL/archive and
// auxiliary files from the DB dir, extra db_paths/cf_paths, the WAL dir and
// the archive dir, then removes the (now empty) directories. The DB lock
// file is acquired first so a live DB cannot be destroyed underneath, and it
// is released/deleted last. The first deletion error is remembered and
// returned, but deletion continues past failures.
Status DestroyDB(const std::string& dbname, const Options& options,
                 const std::vector<ColumnFamilyDescriptor>& column_families) {
  ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  Env* env = soptions.env;
  std::vector<std::string> filenames;
  bool wal_in_db_path = soptions.IsWalDirSameAsDBPath();
  auto sfm = static_cast_with_check<SstFileManagerImpl>(
      options.sst_file_manager.get());
  // With an SstFileManager, deletions go through a dedicated trash bucket so
  // we can wait below for them to finish before removing directories.
  std::optional<int32_t> bucket;
  if (sfm) {
    bucket = sfm->NewTrashBucket();
  }
  // Drop the logger handle so it does not hold the DB directory open.
  soptions.info_log.reset();
  IOOptions io_opts;
  // Ignore errors (e.g. the directory may not exist).
  soptions.fs
      ->GetChildren(dbname, io_opts, &filenames,
                    nullptr)
      .PermitUncheckedError();

  std::set<std::string> paths_to_delete;
  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
    for (const auto& fname : filenames) {
      if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
          type != kDBLockFile) {  // lock file is removed at the very end
        Status del;
        std::string path_to_delete = dbname + "/" + fname;
        if (type == kMetaDatabase) {
          // Recurse into nested meta-databases.
          del = DestroyDB(path_to_delete, options);
        } else if (type == kTableFile || type == kWalFile ||
                   type == kBlobFile) {
          del = DeleteUnaccountedDBFile(&soptions, path_to_delete, dbname,
                                        false,
                                        false, bucket);
        } else {
          del = env->DeleteFile(path_to_delete);
        }
        // Keep the first failure; continue deleting the rest.
        if (!del.ok() && result.ok()) {
          result = del;
        }
      }
    }

    paths_to_delete.insert(dbname);
    // Gather the extra data paths (DB-wide and per-CF); a std::set
    // de-duplicates paths shared between them.
    std::set<std::string> paths;
    for (const DbPath& db_path : options.db_paths) {
      paths.insert(db_path.path);
    }
    for (const ColumnFamilyDescriptor& cf : column_families) {
      for (const DbPath& cf_path : cf.options.cf_paths) {
        paths.insert(cf_path.path);
      }
    }
    for (const auto& path : paths) {
      if (soptions.fs
              ->GetChildren(path, io_opts, &filenames,
                            nullptr)
              .ok()) {
        for (const auto& fname : filenames) {
          // Only SST and blob files are expected in the extra paths.
          if (ParseFileName(fname, &number, &type) &&
              (type == kTableFile ||
               type == kBlobFile)) {
            std::string file_path = path + "/" + fname;
            Status del = DeleteUnaccountedDBFile(&soptions, file_path, dbname,
                                                 false,
                                                 false, bucket);
            if (!del.ok() && result.ok()) {
              result = del;
            }
          }
        }
      }
    }
    // Defer directory removal until after all files are gone.
    paths_to_delete.merge(paths);

    std::vector<std::string> walDirFiles;
    std::string archivedir = ArchivalDirectory(dbname);
    bool wal_dir_exists = false;
    if (!soptions.IsWalDirSameAsDBPath(dbname)) {
      wal_dir_exists =
          soptions.fs
              ->GetChildren(soptions.wal_dir, io_opts, &walDirFiles,
                            nullptr)
              .ok();
      // A separate WAL dir hosts its own archive directory.
      archivedir = ArchivalDirectory(soptions.wal_dir);
    }

    // Archived WALs first: the archive dir may nest inside the WAL/DB dir
    // and must be empty before its parent can be removed.
    std::vector<std::string> archiveFiles;
    if (soptions.fs
            ->GetChildren(archivedir, io_opts, &archiveFiles,
                          nullptr)
            .ok()) {
      for (const auto& file : archiveFiles) {
        if (ParseFileName(file, &number, &type) && type == kWalFile) {
          Status del = DeleteUnaccountedDBFile(
              &soptions, archivedir + "/" + file, archivedir,
              false, !wal_in_db_path, bucket);
          if (!del.ok() && result.ok()) {
            result = del;
          }
        }
      }
      paths_to_delete.insert(archivedir);
    }

    // Live WALs in a separate WAL dir.
    if (wal_dir_exists) {
      for (const auto& file : walDirFiles) {
        if (ParseFileName(file, &number, &type) && type == kWalFile) {
          Status del = DeleteUnaccountedDBFile(
              &soptions, LogFileName(soptions.wal_dir, number),
              soptions.wal_dir, false,
              !wal_in_db_path, bucket);
          if (!del.ok() && result.ok()) {
            result = del;
          }
        }
      }
      paths_to_delete.insert(soptions.wal_dir);
    }

    // Release and remove the lock file last among the files.
    env->UnlockFile(lock).PermitUncheckedError();
    env->DeleteFile(lockname).PermitUncheckedError();

    // Wait for trashed files to really disappear before rmdir.
    if (sfm && bucket.has_value()) {
      sfm->WaitForEmptyTrashBucket(bucket.value());
    }
    // The SstFileManager may hold a logger ref keeping the dir busy.
    soptions.sst_file_manager.reset();

    // Directories may be non-empty (foreign files) or missing; ignore errors.
    for (const auto& path_to_delete : paths_to_delete) {
      env->DeleteDir(path_to_delete).PermitUncheckedError();
    }
  }
  return result;
}
// Persists the current DBOptions plus per-CF options to a new OPTIONS file.
//
// Caller must hold options_mutex_. The DB mutex is held only while
// snapshotting option values; if `db_mutex_already_held` the caller's lock
// is used and re-acquired before returning, otherwise the mutex is taken
// and released internally.
//
// Fix: corrected the misspelled warning message "Unnable to persist
// options" -> "Unable to persist options" (matching the IOError text below).
//
// @param write_options        options for writing the OPTIONS file.
// @param db_mutex_already_held whether mutex_ is held on entry (and must be
//                             held again on return).
// @return OK, or an IOError wrapping the underlying failure.
Status DBImpl::WriteOptionsFile(const WriteOptions& write_options,
                                bool db_mutex_already_held) {
  options_mutex_.AssertHeld();
  if (db_mutex_already_held) {
    mutex_.AssertHeld();
  } else {
    mutex_.Lock();
  }

  // Snapshot names/options of all live (non-dropped) CFs under the mutex.
  std::vector<std::string> cf_names;
  std::vector<ColumnFamilyOptions> cf_opts;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    cf_names.push_back(cfd->GetName());
    cf_opts.push_back(cfd->GetLatestCFOptions());
  }
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  // Release the mutex for the (potentially slow) file I/O below.
  mutex_.Unlock();

  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
  TEST_SYNC_POINT_CALLBACK("DBImpl::WriteOptionsFile:PersistOptions",
                           &db_options);

  // Write to a temp file first, then atomically rename it into place.
  std::string file_name =
      TempOptionsFileName(GetName(), versions_->NewFileNumber());
  Status s = PersistRocksDBOptions(write_options, db_options, cf_names, cf_opts,
                                   file_name, fs_.get());

  if (s.ok()) {
    s = RenameTempFileToOptionsFile(file_name,
                                    db_options.compaction_service != nullptr);
  }

  // Best-effort cleanup of the temp file on failure.
  if (!s.ok() && GetEnv()->FileExists(file_name).ok()) {
    if (!GetEnv()->DeleteFile(file_name).ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to delete temp options file %s",
                     file_name.c_str());
    }
  }
  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Unable to persist options -- %s", s.ToString().c_str());
    s = Status::IOError("Unable to persist options.", s.ToString().c_str());
  }
  if (db_mutex_already_held) {
    mutex_.Lock();
  }
  return s;
}
namespace {
// Deletes all entries of `filenames` past the first `num_files_to_keep`.
// The map is keyed so that iteration visits newer OPTIONS files first, so
// the survivors are the most recent ones. Deletion failures are logged and
// otherwise ignored.
void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
                              const size_t num_files_to_keep,
                              const std::shared_ptr<Logger>& info_log,
                              Env* env) {
  if (filenames.size() <= num_files_to_keep) {
    return;
  }
  auto doomed = std::next(filenames.begin(), num_files_to_keep);
  for (; doomed != filenames.end(); ++doomed) {
    if (!env->DeleteFile(doomed->second).ok()) {
      ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
                     doomed->second.c_str());
    }
  }
}
}  // namespace
// Scans the DB directory for OPTIONS files and deletes all but the newest
// kNumOptionsFilesKept of them. Returns non-OK only if listing the
// directory fails; individual delete failures are logged by the helper.
Status DBImpl::DeleteObsoleteOptionsFiles() {
  std::vector<std::string> filenames;
  IOOptions io_opts;
  io_opts.do_not_recurse = true;
  Status s = fs_->GetChildren(GetName(), io_opts, &filenames,
                              nullptr);
  if (!s.ok()) {
    return s;
  }

  // Key by (max - file_number) so that map order is newest-first, which is
  // the order DeleteOptionsFilesHelper expects.
  std::map<uint64_t, std::string> options_filenames;
  for (auto& filename : filenames) {
    uint64_t file_number;
    FileType type;
    if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
      options_filenames.insert(
          {std::numeric_limits<uint64_t>::max() - file_number,
           GetName() + "/" + filename});
    }
  }

  constexpr size_t kNumOptionsFilesKept = 2;
  DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
                           immutable_db_options_.info_log, GetEnv());
  return Status::OK();
}
// Renames a fully written temp OPTIONS file to its final OPTIONS-<number>
// name, fsyncs the DB directory to make the rename durable, publishes the
// new file number/size to the VersionSet, and finally (unless deletion is
// disabled or remote compaction is enabled) purges obsolete OPTIONS files.
Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name,
                                           bool is_remote_compaction_enabled) {
  Status s;
  uint64_t options_file_number = versions_->NewFileNumber();
  std::string options_file_name =
      OptionsFileName(GetName(), options_file_number);
  uint64_t options_file_size = 0;
  s = GetEnv()->GetFileSize(file_name, &options_file_size);
  if (s.ok()) {
    // Rename, then fsync the directory so the rename itself is durable.
    s = GetEnv()->RenameFile(file_name, options_file_name);
    std::unique_ptr<FSDirectory> dir_obj;
    if (s.ok()) {
      s = fs_->NewDirectory(GetName(), IOOptions(), &dir_obj, nullptr);
    }
    if (s.ok()) {
      s = dir_obj->FsyncWithDirOptions(IOOptions(), nullptr,
                                       DirFsyncOptions(options_file_name));
    }
    if (s.ok()) {
      Status temp_s = dir_obj->Close(IOOptions(), nullptr);
      // Directory Close() may legitimately be unimplemented; only propagate
      // real errors.
      if (!temp_s.ok()) {
        if (temp_s.IsNotSupported()) {
          temp_s.PermitUncheckedError();
        } else {
          s = temp_s;
        }
      }
    }
  }
  if (s.ok()) {
    int my_disable_delete_obsolete_files;
    {
      // Publish the new OPTIONS file to the VersionSet under the DB mutex,
      // and snapshot the delete-obsolete flag for use outside the lock.
      InstrumentedMutexLock l(&mutex_);
      versions_->options_file_number_ = options_file_number;
      versions_->options_file_size_ = options_file_size;
      my_disable_delete_obsolete_files = disable_delete_obsolete_files_;
    }

    if (!my_disable_delete_obsolete_files && !is_remote_compaction_enabled) {
      // Best effort: failure to purge old OPTIONS files is not fatal.
      DeleteObsoleteOptionsFiles().PermitUncheckedError();
    }
  }
  return s;
}
// Thread-status tracking hooks, compiled out when NROCKSDB_THREAD_STATUS is
// defined (guard name per this build's convention — confirm against the
// build system). Even when compiled in, each is a no-op unless
// options.enable_thread_tracking is set.
#ifndef NROCKSDB_THREAD_STATUS

// Registers a column family with the global thread-status tracker.
void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
                                          cfd->ioptions().env);
  }
}

// Removes a column family from the global thread-status tracker.
void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
  }
}

// Removes this DB instance from the global thread-status tracker.
void DBImpl::EraseThreadStatusDbInfo() const {
  if (immutable_db_options_.enable_thread_tracking) {
    ThreadStatusUtil::EraseDatabaseInfo(this);
  }
}

#else
// No-op stubs used when thread-status tracking is compiled out.
void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* ) const {}

void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* ) const {}

void DBImpl::EraseThreadStatusDbInfo() const {}
#endif
// Logs the RocksDB version plus, when available in the build properties,
// the git SHA and compile date, as header lines in the info log.
void DumpRocksDBBuildVersion(Logger* log) {
  ROCKS_LOG_HEADER(log, "RocksDB version: %s\n",
                   GetRocksVersionAsString().c_str());
  const auto& props = GetRocksBuildProperties();
  auto sha_it = props.find("rocksdb_build_git_sha");
  if (sha_it != props.end()) {
    ROCKS_LOG_HEADER(log, "Git sha %s", sha_it->second.c_str());
  }
  auto date_it = props.find("rocksdb_build_date");
  if (date_it != props.end()) {
    ROCKS_LOG_HEADER(log, "Compile date %s", date_it->second.c_str());
  }
}
// Returns the earliest sequence number present in the super version's
// memtables: the immutable memtables (optionally including flushed history)
// if they have any entries, otherwise the active memtable. The active
// memtable can never hold entries older than the result.
SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
                                                         bool include_history) {
  SequenceNumber result =
      sv->imm->GetEarliestSequenceNumber(include_history);
  if (result == kMaxSequenceNumber) {
    // Immutable list is empty; fall back to the active memtable.
    result = sv->mem->GetEarliestSequenceNumber();
  }
  assert(sv->mem->GetEarliestSequenceNumber() >= result);
  return result;
}
// Finds the latest sequence number at which `key` was written, searching in
// recency order: active memtable, immutable memtables, flushed memtable
// history, then (unless cache_only) SST files.
//
// Outputs:
//   *seq                  - latest sequence for the key, or
//                           kMaxSequenceNumber if no record was found.
//   *found_record_for_key - whether an actual record was located (false when
//                           the search stopped early via lower_bound_seq).
//   *timestamp            - the record's timestamp when the CF uses
//                           user-defined timestamps (must be null otherwise).
// `lower_bound_seq` allows an early out: once a memtable's earliest sequence
// number is below this bound, any matching record there would be older than
// the caller cares about.
Status DBImpl::GetLatestSequenceForKey(
    SuperVersion* sv, const Slice& key, bool cache_only,
    SequenceNumber lower_bound_seq, SequenceNumber* seq, std::string* timestamp,
    bool* found_record_for_key, bool* is_blob_index) {
  Status s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  ReadOptions read_options;
  SequenceNumber current_seq = versions_->LastSequence();

  ColumnFamilyData* cfd = sv->cfd;
  assert(cfd);
  const Comparator* const ucmp = cfd->user_comparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  std::string ts_buf;
  if (ts_sz > 0) {
    assert(timestamp);
    // Max timestamp: read the newest version regardless of timestamp.
    ts_buf.assign(ts_sz, '\xff');
  } else {
    assert(!timestamp);
  }
  Slice ts(ts_buf);
  LookupKey lkey(key, current_seq, ts_sz == 0 ? nullptr : &ts);

  *seq = kMaxSequenceNumber;
  *found_record_for_key = false;

  // Phase 1: active memtable.
  sv->mem->Get(lkey, nullptr, nullptr, timestamp, &s,
               &merge_context, &max_covering_tombstone_seq, seq, read_options,
               false, nullptr,
               is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTable::Get: %s\n",
                    s.ToString().c_str());
    return s;
  }
  // With UDTs: a found record must carry a real (non-sentinel) timestamp,
  // and a miss must leave the timestamp empty.
  assert(!ts_sz ||
         (*seq != kMaxSequenceNumber &&
          *timestamp != std::string(ts_sz, '\xff')) ||
         (*seq == kMaxSequenceNumber && timestamp->empty()));
  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLatestSequenceForKey:mem", timestamp);
  if (*seq != kMaxSequenceNumber) {
    // Found a record in the active memtable; it is necessarily the latest.
    *found_record_for_key = true;
    return Status::OK();
  }
  // Early out if anything in the active memtable is already older than the
  // caller's lower bound.
  SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
  if (lower_bound_in_mem != kMaxSequenceNumber &&
      lower_bound_in_mem < lower_bound_seq) {
    *found_record_for_key = false;
    return Status::OK();
  }

  // Phase 2: immutable memtables.
  sv->imm->Get(lkey, nullptr, nullptr, timestamp, &s,
               &merge_context, &max_covering_tombstone_seq, seq, read_options,
               nullptr, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTableList::Get: %s\n",
                    s.ToString().c_str());
    return s;
  }
  assert(!ts_sz ||
         (*seq != kMaxSequenceNumber &&
          *timestamp != std::string(ts_sz, '\xff')) ||
         (*seq == kMaxSequenceNumber && timestamp->empty()));
  if (*seq != kMaxSequenceNumber) {
    *found_record_for_key = true;
    return Status::OK();
  }
  SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
  if (lower_bound_in_imm != kMaxSequenceNumber &&
      lower_bound_in_imm < lower_bound_seq) {
    *found_record_for_key = false;
    return Status::OK();
  }

  // Phase 3: history of memtables that were already flushed.
  sv->imm->GetFromHistory(lkey, nullptr, nullptr,
                          timestamp, &s, &merge_context,
                          &max_covering_tombstone_seq, seq, read_options,
                          is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    ROCKS_LOG_ERROR(
        immutable_db_options_.info_log,
        "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
        s.ToString().c_str());
    return s;
  }
  assert(!ts_sz ||
         (*seq != kMaxSequenceNumber &&
          *timestamp != std::string(ts_sz, '\xff')) ||
         (*seq == kMaxSequenceNumber && timestamp->empty()));
  if (*seq != kMaxSequenceNumber) {
    assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff'));
    *found_record_for_key = true;
    return Status::OK();
  }

  // Phase 4: SST files — skipped in cache_only mode since it requires I/O.
  if (!cache_only) {
    PinnedIteratorsManager pinned_iters_mgr;
    sv->current->Get(read_options, lkey, nullptr, nullptr,
                     timestamp, &s, &merge_context, &max_covering_tombstone_seq,
                     &pinned_iters_mgr, nullptr,
                     found_record_for_key, seq, nullptr,
                     is_blob_index);

    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Unexpected status returned from Version::Get: %s\n",
                      s.ToString().c_str());
    }
  }

  return s;
}
// Single-column-family convenience wrapper: packs the arguments into one
// IngestExternalFileArg and forwards to the multi-CF entry point.
Status DBImpl::IngestExternalFile(
    ColumnFamilyHandle* column_family,
    const std::vector<std::string>& external_files,
    const IngestExternalFileOptions& ingestion_options) {
  IngestExternalFileArg ingest_arg;
  ingest_arg.column_family = column_family;
  ingest_arg.options = ingestion_options;
  ingest_arg.external_files = external_files;
  return IngestExternalFiles({ingest_arg});
}
// Ingests external SST files into one or more column families. High-level
// flow:
//   1. Validate args (no nulls, no duplicate CFs, consistent options).
//   2. Reserve file numbers for all files to be ingested.
//   3. Prepare one ingestion job per CF (copies/links files; no DB mutex).
//   4. Under the DB mutex, with live writes stopped: flush overlapping
//      memtables if needed, run the jobs, and commit all version edits to
//      the MANIFEST as one atomic group.
//   5. Install new super versions, resume writes, clean up, and fire
//      ingestion notifications.
Status DBImpl::IngestExternalFiles(
    const std::vector<IngestExternalFileArg>& args) {
  PERF_TIMER_GUARD(file_ingestion_nanos);
  const WriteOptions write_options;

  // ---- Argument validation (no locks held) ----
  if (args.empty()) {
    return Status::InvalidArgument("ingestion arg list is empty");
  }
  {
    // Reject null or duplicate column family handles.
    std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
    for (const auto& arg : args) {
      if (arg.column_family == nullptr) {
        return Status::InvalidArgument("column family handle is null");
      } else if (unique_cfhs.count(arg.column_family) > 0) {
        return Status::InvalidArgument(
            "ingestion args have duplicate column families");
      }
      unique_cfhs.insert(arg.column_family);
    }
  }
  const size_t num_cfs = args.size();
  for (size_t i = 0; i != num_cfs; ++i) {
    if (args[i].external_files.empty()) {
      std::string err_msg =
          "external_files[" + std::to_string(i) + "] is empty";
      return Status::InvalidArgument(err_msg);
    }
    // fill_cache is applied globally below, so it must agree across args.
    if (i && args[i].options.fill_cache != args[i - 1].options.fill_cache) {
      return Status::InvalidArgument(
          "fill_cache should be the same across ingestion options.");
    }
  }

  // Per-arg option compatibility checks.
  for (const auto& arg : args) {
    const IngestExternalFileOptions& ingest_opts = arg.options;
    if (ingest_opts.ingest_behind) {
      auto ucmp = arg.column_family->GetComparator();
      assert(ucmp);
      if (ucmp->timestamp_size() > 0) {
        return Status::NotSupported(
            "Column family with user-defined "
            "timestamps enabled doesn't support ingest behind.");
      }
      if (!static_cast<ColumnFamilyHandleImpl*>(arg.column_family)
               ->cfd()
               ->AllowIngestBehind()) {
        return Status::InvalidArgument(
            "Can't ingest_behind file in ColumnFamily %s with "
            "cf_allow_ingest_behind=false");
      }
    }
    if (arg.atomic_replace_range.has_value()) {
      if (ingest_opts.ingest_behind) {
        return Status::InvalidArgument(
            "Can't combine atomic_replace_range with ingest_behind.");
      }
      if (ingest_opts.snapshot_consistency) {
        return Status::NotSupported(
            "atomic_replace_range not yet supported with "
            "snapshot_consistency.");
      } else {
        // start/limit must both be set, or both unset.
        if (arg.atomic_replace_range->start.has_value() ^
            arg.atomic_replace_range->limit.has_value()) {
          return Status::NotSupported(
              "Only one of atomic_replace_range.{start,limit}.has_value() is "
              "not supported.");
        }
      }
    }
    if (ingest_opts.allow_db_generated_files) {
      if (ingest_opts.write_global_seqno) {
        return Status::NotSupported(
            "write_global_seqno is deprecated and does not work with "
            "allow_db_generated_files.");
      }
    }
    if (ingest_opts.move_files && ingest_opts.link_files) {
      return Status::InvalidArgument(
          "`move_files` and `link_files` can not both be true.");
    }
  }

  // ---- Reserve a contiguous range of file numbers for all files ----
  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
  size_t total = 0;
  for (const auto& arg : args) {
    total += arg.external_files.size();
  }
  uint64_t next_file_number = 0;
  Status status = ReserveFileNumbersBeforeIngestion(
      static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
      pending_output_elem, &next_file_number);
  if (!status.ok()) {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  // ---- One ingestion job per column family ----
  std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
  for (const auto& arg : args) {
    auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
    ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
                                mutable_db_options_, file_options_, &snapshots_,
                                arg.options, &directories_, &event_logger_,
                                io_tracer_);
  }

  // Prepare jobs 1..n-1 first, then job 0 between the two sync points below
  // (tests depend on this ordering). Prepare copies/links the files and
  // needs no DB mutex.
  uint64_t start_file_number = next_file_number;
  for (size_t i = 1; i != num_cfs; ++i) {
    start_file_number += args[i - 1].external_files.size();
    SuperVersion* super_version =
        ingestion_jobs[i].GetColumnFamilyData()->GetReferencedSuperVersion(
            this);
    Status es = ingestion_jobs[i].Prepare(
        args[i].external_files, args[i].files_checksums,
        args[i].files_checksum_func_names, args[i].atomic_replace_range,
        args[i].file_temperature, start_file_number, super_version);
    // Remember the first failure but still Prepare the rest (uniform
    // Cleanup below).
    if (!es.ok() && status.ok()) {
      status = es;
    }
    CleanupSuperVersion(super_version);
  }
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
  {
    SuperVersion* super_version =
        ingestion_jobs[0].GetColumnFamilyData()->GetReferencedSuperVersion(
            this);
    Status es = ingestion_jobs[0].Prepare(
        args[0].external_files, args[0].files_checksums,
        args[0].files_checksum_func_names, args[0].atomic_replace_range,
        args[0].file_temperature, next_file_number, super_version);
    if (!es.ok()) {
      status = es;
    }
    CleanupSuperVersion(super_version);
  }
  if (!status.ok()) {
    // Some Prepare failed: undo all jobs and release the reserved numbers.
    for (size_t i = 0; i != num_cfs; ++i) {
      ingestion_jobs[i].Cleanup(status);
    }
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  std::vector<SuperVersionContext> sv_ctxs;
  for (size_t i = 0; i != num_cfs; ++i) {
    sv_ctxs.emplace_back(true);
  }
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
  TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
  TEST_SYNC_POINT("DBImpl::AddFile:Start");
  {
    // ---- Critical section: block live writes, then run + commit ----
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");

    // Enter both write queues unbatched so no writes are in flight.
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    WriteThread::Writer nonmem_w;
    if (two_write_queues_) {
      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    }

    PERF_TIMER_GUARD(file_ingestion_blocking_live_writes_nanos);
    WaitForPendingWrites();

    num_running_ingest_file_ += static_cast<int>(num_cfs);
    TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
    TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter:2");

    // Determine which CFs need a memtable flush before ingestion.
    bool at_least_one_cf_need_flush = false;
    std::vector<bool> need_flush(num_cfs, false);
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
      if (cfd->IsDropped()) {
        status = Status::InvalidArgument(
            "cannot ingest an external file into a dropped CF");
        break;
      }
      bool tmp = false;
      status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
      need_flush[i] = tmp;
      at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
      if (!status.ok()) {
        break;
      }
    }
    TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                             &at_least_one_cf_need_flush);

    if (status.ok() && at_least_one_cf_need_flush) {
      FlushOptions flush_opts;
      flush_opts.allow_write_stall = true;
      if (immutable_db_options_.atomic_flush) {
        // Flush all CFs together; mutex is released around the flush call.
        mutex_.Unlock();
        status = AtomicFlushMemTables(
            flush_opts, FlushReason::kExternalFileIngestion,
            {}, true);
        mutex_.Lock();
      } else {
        for (size_t i = 0; i != num_cfs; ++i) {
          if (need_flush[i]) {
            mutex_.Unlock();
            status =
                FlushMemTable(ingestion_jobs[i].GetColumnFamilyData(),
                              flush_opts, FlushReason::kExternalFileIngestion,
                              true);
            mutex_.Lock();
            if (!status.ok()) {
              break;
            }
          }
        }
      }
      if (status.ok()) {
        for (size_t i = 0; i != num_cfs; ++i) {
          if (immutable_db_options_.atomic_flush || need_flush[i]) {
            ingestion_jobs[i].SetFlushedBeforeRun();
          }
        }
      }
    }

    // Run the ingestion jobs (assign levels/seqnos, build version edits).
    if (status.ok()) {
      for (size_t i = 0; i != num_cfs; ++i) {
        mutex_.AssertHeld();
        status = ingestion_jobs[i].Run();
        if (!status.ok()) {
          break;
        }
        ingestion_jobs[i].RegisterRange();
      }
    }

    if (status.ok()) {
      ReadOptions read_options;
      read_options.fill_cache = args[0].options.fill_cache;
      autovector<ColumnFamilyData*> cfds_to_commit;
      autovector<autovector<VersionEdit*>> edit_lists;
      uint32_t num_entries = 0;
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
        assert(!cfd->IsDropped());
        cfds_to_commit.push_back(cfd);
        autovector<VersionEdit*> edit_list;
        edit_list.push_back(ingestion_jobs[i].edit());
        edit_lists.push_back(edit_list);
        ++num_entries;
      }
      // Multiple CFs: mark the edits as one atomic group so recovery either
      // applies all of them or none.
      if (cfds_to_commit.size() > 1) {
        for (auto& edits : edit_lists) {
          assert(edits.size() == 1);
          edits[0]->MarkAtomicGroup(--num_entries);
        }
        assert(0 == num_entries);
      }
      status =
          versions_->LogAndApply(cfds_to_commit, read_options, write_options,
                                 edit_lists, &mutex_, directories_.GetDbDir());

      // Advance the last sequence numbers past anything assigned to the
      // ingested files.
      SequenceNumber max_assigned_seqno =
          ingestion_jobs[0].MaxAssignedSequenceNumber();
      for (size_t i = 1; i != num_cfs; ++i) {
        max_assigned_seqno = std::max(
            max_assigned_seqno, ingestion_jobs[i].MaxAssignedSequenceNumber());
      }
      if (max_assigned_seqno > 0) {
        const SequenceNumber last_seqno = versions_->LastSequence();
        if (max_assigned_seqno > last_seqno) {
          versions_->SetLastAllocatedSequence(max_assigned_seqno);
          versions_->SetLastPublishedSequence(max_assigned_seqno);
          versions_->SetLastSequence(max_assigned_seqno);
        }
      }
    }
    for (auto& job : ingestion_jobs) {
      job.UnregisterRange();
    }
    if (status.ok()) {
      // Publish the new versions to readers.
      for (size_t i = 0; i != num_cfs; ++i) {
        auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
        assert(!cfd->IsDropped());
        InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i]);
#ifndef NDEBUG
        if (0 == i && num_cfs > 1) {
          TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
          TEST_SYNC_POINT("DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
        }
#endif
      }
    } else if (versions_->io_status().IsIOError()) {
      // A MANIFEST write failure is a background error for the whole DB.
      const IOStatus& io_s = versions_->io_status();
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
    }

    // Resume writers.
    if (two_write_queues_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);
    PERF_TIMER_STOP(file_ingestion_blocking_live_writes_nanos);

    if (status.ok()) {
      for (auto& job : ingestion_jobs) {
        job.UpdateStats();
      }
    }
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    num_running_ingest_file_ -= static_cast<int>(num_cfs);
    if (0 == num_running_ingest_file_) {
      bg_cv_.SignalAll();
    }
    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
  }
  // mutex_ is released here.

  // Cleanup jobs (removes leftover files on failure) and super version
  // contexts, then notify listeners on success.
  for (size_t i = 0; i != num_cfs; ++i) {
    sv_ctxs[i].Clean();
    ingestion_jobs[i].Cleanup(status);
  }
  if (status.ok()) {
    for (size_t i = 0; i != num_cfs; ++i) {
      auto* cfd = ingestion_jobs[i].GetColumnFamilyData();
      if (!cfd->IsDropped()) {
        NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
      }
    }
  }
  return status;
}
// Creates a brand-new column family and populates it by importing the files
// described by the exported metadata sets. On any failure after the CF was
// created, the CF is dropped and the handle destroyed so the caller never
// sees a half-imported CF (*handle is reset to nullptr in that case).
Status DBImpl::CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const std::vector<const ExportImportFilesMetaData*>& metadatas,
    ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  assert(*handle == nullptr);
  const ReadOptions read_options;
  const WriteOptions write_options;

  // All exported metadata must have been produced under the same comparator
  // that the new CF will use.
  std::string cf_comparator_name = options.comparator->Name();
  size_t total_file_num = 0;
  std::vector<std::vector<LiveFileMetaData*>> metadata_files(metadatas.size());
  for (size_t i = 0; i < metadatas.size(); i++) {
    if (cf_comparator_name != metadatas[i]->db_comparator_name) {
      return Status::InvalidArgument("Comparator name mismatch");
    }
    for (auto& file : metadatas[i]->files) {
      // NOTE(review): C-style cast strips const from the metadata entries;
      // a const_cast would make the intent explicit. Downstream code is
      // presumably expected not to mutate them — confirm.
      metadata_files[i].push_back((LiveFileMetaData*)&file);
    }
    total_file_num += metadatas[i]->files.size();
  }

  // Create the (empty) target column family.
  auto status = CreateColumnFamily(read_options, write_options, options,
                                   column_family_name, handle);
  if (!status.ok()) {
    return status;
  }

  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
  auto cfd = cfh->cfd();
  ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
                                   file_options_, import_options,
                                   metadata_files, io_tracer_);

  SuperVersionContext dummy_sv_ctx(true);
  VersionEdit dummy_edit;
  uint64_t next_file_number = 0;
  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    if (error_handler_.IsDBStopped()) {
      // Don't proceed if the DB is already in a background-error state.
      status = error_handler_.GetBGError();
    }

    // Protect the file numbers we are about to hand out from being treated
    // as obsolete by background cleanup.
    pending_output_elem.reset(new std::list<uint64_t>::iterator(
        CaptureCurrentFileNumberInPendingOutputs()));

    if (status.ok()) {
      // Reserve file numbers for all imported files, and persist the bump
      // via a dummy version edit.
      next_file_number = versions_->FetchAddFileNumber(total_file_num);
      status =
          versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
                                 &mutex_, directories_.GetDbDir());
      if (status.ok()) {
        InstallSuperVersionForConfigChange(cfd, &dummy_sv_ctx);
      }
    }
  }
  dummy_sv_ctx.Clean();

  if (status.ok()) {
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    // Copies/links the source files into the DB (no DB mutex held).
    status = import_job.Prepare(next_file_number, sv);
    CleanupSuperVersion(sv);
  }

  if (status.ok()) {
    SuperVersionContext sv_context(true);
    {
      // Stop writes while the import's version edit is committed.
      InstrumentedMutexLock l(&mutex_);
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      WriteThread::Writer nonmem_w;
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }

      num_running_ingest_file_++;
      assert(!cfd->IsDropped());
      mutex_.AssertHeld();
      status = import_job.Run();

      if (status.ok()) {
        status = versions_->LogAndApply(cfd, read_options, write_options,
                                        import_job.edit(), &mutex_,
                                        directories_.GetDbDir());
        if (status.ok()) {
          InstallSuperVersionForConfigChange(cfd, &sv_context);
        }
      }

      // Resume writers.
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
      write_thread_.ExitUnbatched(&w);

      num_running_ingest_file_--;
      if (num_running_ingest_file_ == 0) {
        bg_cv_.SignalAll();
      }
    }
    sv_context.Clean();
  }

  {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
  }

  import_job.Cleanup(status);
  if (!status.ok()) {
    // Roll back: drop and destroy the partially imported CF.
    Status temp_s = DropColumnFamily(*handle);
    if (!temp_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DropColumnFamily failed with error %s",
                      temp_s.ToString().c_str());
    }
    temp_s = DestroyColumnFamilyHandle(*handle);
    assert(temp_s.ok());
    *handle = nullptr;
  }
  return status;
}
// Removes all data outside [begin_key, end_key) from the column family:
// flush the memtable, bulk-delete SST files fully outside the range, then
// range-delete the residual boundary data, and finally compact so the
// deletions are physically applied.
Status DBImpl::ClipColumnFamily(ColumnFamilyHandle* column_family,
                                const Slice& begin_key, const Slice& end_key) {
  assert(column_family);
  Status status;
  // Flush first so everything to be clipped resides in SST files.
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (immutable_db_options_.atomic_flush) {
    status = AtomicFlushMemTables(flush_opts, FlushReason::kDeleteFiles,
                                  {},
                                  false);
  } else {
    status = FlushMemTable(cfd, flush_opts, FlushReason::kDeleteFiles,
                           false);
  }

  if (status.ok()) {
    // Drop whole files lying entirely before begin_key or at/after end_key.
    std::vector<RangeOpt> ranges;
    ranges.emplace_back(OptSlice{}, begin_key);
    ranges.emplace_back(end_key, OptSlice{});
    status = DeleteFilesInRanges(column_family, ranges.data(), ranges.size());
  }

  bool empty_after_delete = false;
  if (status.ok()) {
    Slice smallest_user_key, largest_user_key;
    {
      InstrumentedMutexLock l(&mutex_);
      cfd->current()->GetSstFilesBoundaryKeys(&smallest_user_key,
                                              &largest_user_key);
    }
    if (smallest_user_key.empty() && largest_user_key.empty()) {
      // Nothing left after the file deletions.
      empty_after_delete = true;
    } else {
      const Comparator* const ucmp = column_family->GetComparator();
      WriteOptions wo;
      // Range-delete the leftover data below begin_key.
      if (ucmp->Compare(smallest_user_key, begin_key) < 0) {
        status = DeleteRange(wo, column_family, smallest_user_key, begin_key);
      }

      if (status.ok()) {
        if (ucmp->Compare(end_key, largest_user_key) <= 0) {
          // Range-delete [end_key, largest_user_key); DeleteRange's upper
          // bound is exclusive, so delete the largest key separately.
          status = DeleteRange(wo, column_family, end_key, largest_user_key);
          if (status.ok()) {
            status = Delete(wo, column_family, largest_user_key);
          }
        }
      }
    }
  }

  if (status.ok() && !empty_after_delete) {
    // Compact away the tombstoned data so the clip takes effect physically.
    CompactRangeOptions compact_options;
    compact_options.exclusive_manual_compaction = true;
    compact_options.bottommost_level_compaction =
        BottommostLevelCompaction::kForceOptimized;
    status = CompactRange(compact_options, column_family, nullptr, nullptr);
  }
  return status;
}
// Verifies the recorded full-file checksums of all live SST/blob files.
// Accepts only kUnknown or kVerifyFileChecksums as the incoming IO activity,
// tagging kUnknown requests before delegating to VerifyChecksumInternal.
Status DBImpl::VerifyFileChecksums(const ReadOptions& _read_options) {
  const Env::IOActivity activity = _read_options.io_activity;
  if (activity != Env::IOActivity::kUnknown &&
      activity != Env::IOActivity::kVerifyFileChecksums) {
    return Status::InvalidArgument(
        "Can only call VerifyFileChecksums with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or "
        "`Env::IOActivity::kVerifyFileChecksums`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kVerifyFileChecksums;
  }
  return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
}
// Verifies block-level checksums of all live SST files. Accepts only
// kUnknown or kVerifyDBChecksum as the incoming IO activity, tagging
// kUnknown requests before delegating to VerifyChecksumInternal.
Status DBImpl::VerifyChecksum(const ReadOptions& _read_options) {
  const Env::IOActivity activity = _read_options.io_activity;
  if (activity != Env::IOActivity::kUnknown &&
      activity != Env::IOActivity::kVerifyDBChecksum) {
    return Status::InvalidArgument(
        "Can only call VerifyChecksum with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kVerifyDBChecksum`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kVerifyDBChecksum;
  }
  return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
}
// Shared implementation for VerifyChecksum / VerifyFileChecksums.
// Pins a SuperVersion per live column family so the verified files cannot
// be deleted while reading, then checks every SST file of every level
// (and, in use_file_checksum mode, every blob file). Bytes read are
// accounted to VERIFY_CHECKSUM_READ_BYTES. Stops at the first failure.
Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
                                      bool use_file_checksum) {
  uint64_t prev_bytes_read = IOSTATS(bytes_read);

  Status s;

  if (use_file_checksum) {
    // Full-file verification requires a configured checksum factory.
    FileChecksumGenFactory* const file_checksum_gen_factory =
        immutable_db_options_.file_checksum_gen_factory.get();
    if (!file_checksum_gen_factory) {
      s = Status::InvalidArgument(
          "Cannot verify file checksum if options.file_checksum_gen_factory is "
          "null");
      return s;
    }
  }

  // Ref all live, initialized CFs under the mutex so they outlive the scan.
  std::vector<ColumnFamilyData*> cfd_list;
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->IsDropped() && cfd->initialized()) {
        cfd->Ref();
        cfd_list.push_back(cfd);
      }
    }
  }
  // Pin one SuperVersion per CF (keeps the file set stable during reads).
  std::vector<SuperVersion*> sv_list;
  for (auto cfd : cfd_list) {
    sv_list.push_back(cfd->GetReferencedSuperVersion(this));
  }

  for (auto& sv : sv_list) {
    VersionStorageInfo* vstorage = sv->current->storage_info();
    ColumnFamilyData* cfd = sv->current->cfd();
    Options opts;
    if (!use_file_checksum) {
      // Block-level verification needs full Options; snapshot them under
      // the mutex.
      InstrumentedMutexLock l(&mutex_);
      opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                     cfd->GetLatestCFOptions());
    }
    for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
      for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
           j++) {
        const auto& fd_with_krange = vstorage->LevelFilesBrief(i).files[j];
        const auto& fd = fd_with_krange.fd;
        const FileMetaData* fmeta = fd_with_krange.file_metadata;
        assert(fmeta);
        std::string fname = TableFileName(cfd->ioptions().cf_paths,
                                          fd.GetNumber(), fd.GetPathId());
        if (use_file_checksum) {
          s = VerifyFullFileChecksum(fmeta->file_checksum,
                                     fmeta->file_checksum_func_name, fname,
                                     read_options);
        } else {
          FileOptions fopts = file_options_;
          fopts.file_checksum = fmeta->file_checksum;
          fopts.file_checksum_func_name = fmeta->file_checksum_func_name;
          s = ROCKSDB_NAMESPACE::VerifySstFileChecksumInternal(
              opts, fopts, read_options, fname, fd.largest_seqno);
        }
        RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
                   IOSTATS(bytes_read) - prev_bytes_read);
        prev_bytes_read = IOSTATS(bytes_read);
      }
    }

    if (s.ok() && use_file_checksum) {
      // Blob files are only covered in full-file checksum mode.
      const auto& blob_files = vstorage->GetBlobFiles();
      for (const auto& meta : blob_files) {
        assert(meta);

        const uint64_t blob_file_number = meta->GetBlobFileNumber();

        const std::string blob_file_name = BlobFileName(
            cfd->ioptions().cf_paths.front().path, blob_file_number);
        s = VerifyFullFileChecksum(meta->GetChecksumValue(),
                                   meta->GetChecksumMethod(), blob_file_name,
                                   read_options);
        RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
                   IOSTATS(bytes_read) - prev_bytes_read);
        prev_bytes_read = IOSTATS(bytes_read);
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      break;
    }
  }

  // Unpin the super versions and CF refs under the mutex; optionally defer
  // the actual deletion to the purge thread to avoid blocking I/O here.
  bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
  {
    InstrumentedMutexLock l(&mutex_);
    for (auto sv : sv_list) {
      if (sv && sv->Unref()) {
        sv->Cleanup();
        if (defer_purge) {
          AddSuperVersionsToFreeQueue(sv);
        } else {
          delete sv;
        }
      }
    }
    if (defer_purge) {
      SchedulePurge();
    }
    for (auto cfd : cfd_list) {
      cfd->UnrefAndTryDelete();
    }
  }
  RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
             IOSTATS(bytes_read) - prev_bytes_read);
  return s;
}
// Recomputes the whole-file checksum of `fname` and compares it with the
// expected value recorded in metadata. An expectation of
// kUnknownFileChecksum means "nothing recorded" and succeeds immediately.
// Returns Corruption on mismatch, or the generation error if reading failed.
Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
                                      const std::string& func_name_expected,
                                      const std::string& fname,
                                      const ReadOptions& read_options) {
  if (file_checksum_expected == kUnknownFileChecksum) {
    return Status::OK();
  }
  std::string actual_checksum;
  std::string actual_func_name;
  FileOptions fopts;
  fopts.file_checksum = file_checksum_expected;
  fopts.file_checksum_func_name = func_name_expected;
  Status s = ROCKSDB_NAMESPACE::GenerateOneFileChecksum(
      fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
      func_name_expected, &actual_checksum, &actual_func_name,
      read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
      io_tracer_, immutable_db_options_.rate_limiter.get(), read_options,
      immutable_db_options_.stats, immutable_db_options_.clock, fopts);
  if (!s.ok()) {
    return s;
  }
  assert(func_name_expected == actual_func_name);
  if (actual_checksum != file_checksum_expected) {
    std::ostringstream oss;
    oss << fname << " file checksum mismatch, ";
    oss << "expecting " << Slice(file_checksum_expected).ToString(true);
    oss << ", but actual " << Slice(actual_checksum).ToString(true);
    s = Status::Corruption(oss.str());
    TEST_SYNC_POINT_CALLBACK("DBImpl::VerifyFullFileChecksum:mismatch", &s);
  }
  return s;
}
// Fires OnExternalFileIngested on every registered listener, once per file
// ingested by the given job.
void DBImpl::NotifyOnExternalFileIngested(
    ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
  const auto& listeners = immutable_db_options_.listeners;
  if (listeners.empty()) {
    return;  // No observers registered; skip building the info structs.
  }
  for (const IngestedFileInfo& file : ingestion_job.files_to_ingest()) {
    ExternalFileIngestionInfo info;
    info.cf_name = cfd->GetName();
    info.external_file_path = file.external_file_path;
    info.internal_file_path = file.internal_file_path;
    info.global_seqno = file.assigned_seqno;
    info.table_properties = file.table_properties;
    for (const auto& listener : listeners) {
      listener->OnExternalFileIngested(this, info);
    }
  }
}
// Installs a new Tracer that records subsequent DB operations through
// `trace_writer`. Any previously installed tracer is replaced (destroyed).
Status DBImpl::StartTrace(const TraceOptions& trace_options,
                          std::unique_ptr<TraceWriter>&& trace_writer) {
  InstrumentedMutexLock lock(&trace_mutex_);
  tracer_ = std::make_unique<Tracer>(immutable_db_options_.clock, trace_options,
                                     std::move(trace_writer));
  return Status::OK();
}
// Stops operation tracing: closes and releases the active tracer.
// Returns IOError if no trace is currently active.
Status DBImpl::EndTrace() {
  InstrumentedMutexLock lock(&trace_mutex_);
  if (tracer_ == nullptr) {
    return Status::IOError("No trace file to close");
  }
  Status close_status = tracer_->Close();
  tracer_.reset();
  return close_status;
}
// Creates the default trace Replayer bound to this DB and the given column
// family handles, consuming `reader` as the trace source.
Status DBImpl::NewDefaultReplayer(
    const std::vector<ColumnFamilyHandle*>& handles,
    std::unique_ptr<TraceReader>&& reader,
    std::unique_ptr<Replayer>* replayer) {
  *replayer = std::make_unique<ReplayerImpl>(this, handles, std::move(reader));
  return Status::OK();
}
// Begins block cache tracing from legacy TraceOptions: translates the
// relevant fields into block-cache-specific option structs, wraps the
// generic trace writer, and starts the block cache tracer.
Status DBImpl::StartBlockCacheTrace(
    const TraceOptions& trace_options,
    std::unique_ptr<TraceWriter>&& trace_writer) {
  BlockCacheTraceOptions block_trace_opts;
  block_trace_opts.sampling_frequency = trace_options.sampling_frequency;

  BlockCacheTraceWriterOptions writer_opts;
  writer_opts.max_trace_file_size = trace_options.max_trace_file_size;

  auto block_cache_trace_writer = NewBlockCacheTraceWriter(
      env_->GetSystemClock().get(), writer_opts, std::move(trace_writer));
  return block_cache_tracer_.StartTrace(block_trace_opts,
                                        std::move(block_cache_trace_writer));
}
// Begins block cache tracing with caller-provided block cache trace options
// and an already-constructed block cache trace writer; forwards directly to
// the block cache tracer.
Status DBImpl::StartBlockCacheTrace(
    const BlockCacheTraceOptions& trace_options,
    std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) {
  return block_cache_tracer_.StartTrace(trace_options, std::move(trace_writer));
}
// Stops block cache tracing. EndTrace() returns no status, so this always
// reports OK.
Status DBImpl::EndBlockCacheTrace() {
  block_cache_tracer_.EndTrace();
  return Status::OK();
}
// Records an iterator Seek in the active trace, if any. The unlocked read of
// tracer_ is an optimistic fast path to avoid taking trace_mutex_ when
// tracing is off; the decisive check is repeated under the mutex because
// tracing can be stopped concurrently.
Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
                                 const Slice& lower_bound,
                                 const Slice upper_bound) {
  if (!tracer_) {
    return Status::OK();
  }
  InstrumentedMutexLock lock(&trace_mutex_);
  if (!tracer_) {
    return Status::OK();
  }
  return tracer_->IteratorSeek(cf_id, key, lower_bound, upper_bound);
}
// Records an iterator SeekForPrev in the active trace, if any. Same
// optimistic-check-then-lock pattern as TraceIteratorSeek: the unlocked read
// skips the mutex when tracing is off; the locked re-check is authoritative.
Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
                                        const Slice& lower_bound,
                                        const Slice upper_bound) {
  if (!tracer_) {
    return Status::OK();
  }
  InstrumentedMutexLock lock(&trace_mutex_);
  if (!tracer_) {
    return Status::OK();
  }
  return tracer_->IteratorSeekForPrev(cf_id, key, lower_bound, upper_bound);
}
// Reserves `num` consecutive file numbers for an external file ingestion job
// and protects them from background cleanup.
// - pending_output_elem: receives a handle registering the reserved range as
//   pending outputs so the files are not garbage collected.
// - next_file_number: receives the first reserved number.
// An empty VersionEdit is applied so the bumped file number reaches the
// MANIFEST; on success a fresh SuperVersion is installed.
Status DBImpl::ReserveFileNumbersBeforeIngestion(
    ColumnFamilyData* cfd, uint64_t num,
    std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
    uint64_t* next_file_number) {
  const ReadOptions read_options;
  const WriteOptions write_options;
  Status s;
  SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
  assert(nullptr != next_file_number);
  InstrumentedMutexLock l(&mutex_);
  if (error_handler_.IsDBStopped()) {
    // Refuse to reserve anything while the DB is in a background error state.
    return error_handler_.GetBGError();
  }
  pending_output_elem.reset(new std::list<uint64_t>::iterator(
      CaptureCurrentFileNumberInPendingOutputs()));
  // Atomically claim `num` file numbers; returns the first of the range.
  *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
  VersionEdit dummy_edit;
  // Apply an otherwise-empty edit to persist the advanced file number.
  s = versions_->LogAndApply(cfd, read_options, write_options, &dummy_edit,
                             &mutex_, directories_.GetDbDir());
  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx);
  }
  dummy_sv_ctx.Clean();
  return s;
}
// Returns the minimum table-file creation time across all non-dropped column
// families' current versions. Only supported with max_open_files == -1
// (otherwise NotSupported is returned).
Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  if (mutable_db_options_.max_open_files != -1) {
    return Status::NotSupported("This API only works if max_open_files = -1");
  }
  uint64_t oldest_time = std::numeric_limits<uint64_t>::max();
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    uint64_t ctime;
    {
      // Pin a SuperVersion so the Version cannot be freed while queried.
      SuperVersion* sv = GetAndRefSuperVersion(cfd);
      Version* version = sv->current;
      version->GetCreationTimeOfOldestFile(&ctime);
      ReturnAndCleanupSuperVersion(cfd, sv);
    }
    if (ctime < oldest_time) {
      oldest_time = ctime;
    }
    if (oldest_time == 0) {
      break;  // Nothing can be older than 0; stop scanning early.
    }
  }
  *creation_time = oldest_time;
  return Status::OK();
}
std::pair<SequenceNumber, uint64_t> DBImpl::GetSeqnoToTimeSample() const {
mutex_.AssertHeld();
SequenceNumber seqno = GetLatestSequenceNumber();
seqno = std::max(seqno, SequenceNumber{1});
int64_t unix_time_signed = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time_signed)
.PermitUncheckedError(); return {seqno, static_cast<uint64_t>(unix_time_signed)};
}
// Ensures the in-memory seqno-to-time mapping has a recent enough sample,
// appending a new (seqno, now) pair when the newest existing sample is older
// than the recording cadence. Requires the db mutex; the caller must have
// seqno-to-time preservation enabled.
void DBImpl::EnsureSeqnoToTimeMapping(
    const MinAndMaxPreserveSeconds& preserve_info) {
  mutex_.AssertHeld();
  assert(preserve_info.IsEnabled());
  auto [seqno, unix_time_now] = GetSeqnoToTimeSample();
  uint64_t unix_time_last_sample = 0;
  if (seqno_to_time_mapping_.Empty()) {
    // First use: bound the mapping's size before any entries are appended.
    seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
  } else {
    // Time of the newest sample (the one closest to kMaxSequenceNumber).
    unix_time_last_sample =
        seqno_to_time_mapping_.GetProximalTimeBeforeSeqno(kMaxSequenceNumber);
  }
  uint64_t cadence = preserve_info.GetRecodingCadence();
  // Pad the cadence by 3 seconds plus 1% — presumably slack so samples taken
  // marginally early are not appended redundantly; TODO confirm intent.
  cadence += 3 + cadence / 100;
  // Append only if enough wall-clock time has passed since the last sample.
  // The `unix_time_now >= cadence` guard keeps the subtraction below from
  // underflowing the unsigned time.
  if (unix_time_now >= cadence &&
      unix_time_last_sample <= unix_time_now - cadence) {
    assert(seqno > 0); seqno_to_time_mapping_.Append(seqno, unix_time_now);
  }
}
// At open of a brand-new DB with seqno-to-time preservation enabled, bumps
// the sequence number to kMaxSeqnoTimePairsPerSST and back-fills the
// seqno-to-time mapping across the last max_preserve_seconds, so data
// written right after open can be assigned a plausible age.
void DBImpl::PrepopulateSeqnoToTimeMapping(
    const MinAndMaxPreserveSeconds& preserve_info) {
  if (!preserve_info.IsEnabled()) {
    // Caller contract: only invoked when preservation is configured.
    assert(false);
    return;
  }
  if (GetLatestSequenceNumber() != 0) {
    // Only valid on an empty DB; bumping seqnos below on a non-empty DB
    // would be incorrect.
    assert(false);
    return;
  }
  // Reserve the low seqno range [1, kMax] to represent historical time.
  constexpr uint64_t kMax = kMaxSeqnoTimePairsPerSST;
  versions_->SetLastAllocatedSequence(kMax);
  versions_->SetLastPublishedSequence(kMax);
  versions_->SetLastSequence(kMax);
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
  const ReadOptions read_options(Env::IOActivity::kDBOpen);
  // Record the advanced last-sequence in the MANIFEST.
  VersionEdit edit;
  edit.SetLastSequence(kMax);
  Status s = versions_->LogAndApplyToDefaultColumnFamily(
      read_options, write_options, &edit, &mutex_, directories_.GetDbDir());
  if (!s.ok() && versions_->io_status().IsIOError()) {
    error_handler_.SetBGError(versions_->io_status(),
                              BackgroundErrorReason::kManifestWrite);
  }
  auto [seqno, unix_time_now] = GetSeqnoToTimeSample();
  uint64_t populate_historical_seconds = preserve_info.max_preserve_seconds;
  if (seqno > 1 && unix_time_now > populate_historical_seconds) {
    // Spread seqnos [1, seqno) evenly over the trailing
    // populate_historical_seconds of wall-clock time.
    SequenceNumber from_seqno = 1;
    seqno_to_time_mapping_.PrePopulate(
        from_seqno, seqno, unix_time_now - populate_historical_seconds,
        unix_time_now);
  } else {
    // Unexpected: either no seqnos were reserved above or the clock reads
    // earlier than the preserve window.
    assert(seqno > 1);
    assert(unix_time_now > populate_historical_seconds);
  }
}
// Installs a new SuperVersion after an options/config change. If the CF's
// latest options enable seqno-to-time preservation, first refreshes the
// global mapping and hands the new SuperVersion a fresh copy of it.
void DBImpl::InstallSuperVersionForConfigChange(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context) {
  MinAndMaxPreserveSeconds preserve_info{cfd->GetLatestCFOptions()};
  std::shared_ptr<SeqnoToTimeMapping> mapping_copy;
  if (preserve_info.IsEnabled()) {
    EnsureSeqnoToTimeMapping(preserve_info);
    mapping_copy = std::make_shared<SeqnoToTimeMapping>();
    mapping_copy->CopyFrom(seqno_to_time_mapping_);
  }
  InstallSuperVersionAndScheduleWork(cfd, sv_context, std::move(mapping_copy));
}
// Periodic task: appends a fresh (seqno, time) sample to the global mapping
// and installs a new SuperVersion — carrying a snapshot of the mapping —
// into each CF that has seqno-to-time preservation enabled.
void DBImpl::RecordSeqnoToTimeMapping() {
  SuperVersionContext sv_context;
  {
    InstrumentedMutexLock l(&mutex_);
    // Take a sample under the mutex (GetSeqnoToTimeSample asserts it's held).
    seqno_to_time_mapping_.Append(GetSeqnoToTimeSample());
    // Snapshot the mapping so SuperVersion readers see a stable copy without
    // needing the db mutex.
    std::shared_ptr<SeqnoToTimeMapping> new_seqno_to_time_mapping =
        std::make_shared<SeqnoToTimeMapping>();
    new_seqno_to_time_mapping->CopyFrom(seqno_to_time_mapping_);
    for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      MinAndMaxPreserveSeconds preserve_info{cfd->GetLatestCFOptions()};
      if (preserve_info.IsEnabled()) {
        // Only CFs that preserve seqno-time info receive the new mapping.
        sv_context.NewSuperVersion();
        cfd->InstallSuperVersion(&sv_context, &mutex_,
                                 new_seqno_to_time_mapping);
      }
    }
    bg_cv_.SignalAll();
  }
  // Free replaced SuperVersions outside the mutex.
  sv_context.Clean();
}
// Periodic task: for every column family with periodic compaction enabled
// that is not already queued, refreshes its compaction score and enqueues it
// for compaction, then kicks the background scheduler.
void DBImpl::TriggerPeriodicCompaction() {
  TEST_SYNC_POINT("DBImpl::TriggerPeriodicCompaction:StartRunning");
  {
    InstrumentedMutexLock l(&mutex_);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Running the periodic task to trigger compactions.");
    for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->GetLatestCFOptions().periodic_compaction_seconds == 0 ||
          cfd->queued_for_compaction()) {
        continue;
      }
      // Recompute scores so the enqueue decision uses current storage info.
      cfd->current()->storage_info()->ComputeCompactionScore(
          cfd->ioptions(), cfd->GetLatestMutableCFOptions(),
          cfd->GetFullHistoryTsLow());
      EnqueuePendingCompaction(cfd);
      if (cfd->queued_for_compaction()) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Periodic task to trigger compaction queued Column "
                       "family [%s] for compaction.",
                       cfd->GetName().c_str());
      }
    }
    MaybeScheduleFlushOrCompaction();
    bg_cv_.SignalAll();
  }
}
void DBImpl::TrackOrUntrackFiles(
const std::vector<std::string>& existing_data_files, bool track) {
auto sfm = static_cast_with_check<SstFileManagerImpl>(
immutable_db_options_.sst_file_manager.get());
assert(sfm);
std::vector<ColumnFamilyMetaData> metadata;
GetAllColumnFamilyMetaData(&metadata);
auto action = [&](const std::string& file_path,
std::optional<uint64_t> size) {
if (track) {
if (size) {
sfm->OnAddFile(file_path, *size).PermitUncheckedError();
} else {
sfm->OnAddFile(file_path).PermitUncheckedError();
}
} else {
sfm->OnUntrackFile(file_path).PermitUncheckedError();
}
};
std::unordered_set<std::string> referenced_files;
for (const auto& md : metadata) {
for (const auto& lmd : md.levels) {
for (const auto& fmd : lmd.files) {
std::string file_path =
fmd.directory + kFilePathSeparator + fmd.relative_filename;
action(file_path, fmd.size);
referenced_files.insert(file_path);
}
}
for (const auto& bmd : md.blob_files) {
std::string name = bmd.blob_file_name;
if (!name.empty() && name[0] == kFilePathSeparator) {
name = name.substr(1);
}
std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
action(file_path, bmd.blob_file_size);
referenced_files.insert(file_path);
}
}
for (const auto& file_path : existing_data_files) {
if (referenced_files.find(file_path) != referenced_files.end()) {
continue;
}
action(file_path, std::nullopt);
}
}
}