#include "db/flush_scheduler.h"
#include <cassert>
#include "db/column_family.h"
namespace ROCKSDB_NAMESPACE {
void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) {
#ifndef NDEBUG
{
std::lock_guard<std::mutex> lock(checking_mutex_);
assert(checking_set_.count(cfd) == 0);
checking_set_.insert(cfd);
}
#endif cfd->Ref();
#ifndef __clang_analyzer__
Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
while (!head_.compare_exchange_strong(
node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
}
#endif }
ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
while (true) {
if (head_.load(std::memory_order_relaxed) == nullptr) {
return nullptr;
}
Node* node = head_.load(std::memory_order_relaxed);
head_.store(node->next, std::memory_order_relaxed);
ColumnFamilyData* cfd = node->column_family;
delete node;
#ifndef NDEBUG
{
std::lock_guard<std::mutex> lock(checking_mutex_);
auto iter = checking_set_.find(cfd);
assert(iter != checking_set_.end());
checking_set_.erase(iter);
}
#endif
if (!cfd->IsDropped()) {
return cfd;
}
cfd->UnrefAndTryDelete();
}
}
bool FlushScheduler::Empty() {
auto rv = head_.load(std::memory_order_relaxed) == nullptr;
#ifndef NDEBUG
std::lock_guard<std::mutex> lock(checking_mutex_);
assert((rv == checking_set_.empty()) || rv);
#endif return rv;
}
void FlushScheduler::Clear() {
ColumnFamilyData* cfd;
while ((cfd = TakeNextColumnFamily()) != nullptr) {
cfd->UnrefAndTryDelete();
}
assert(head_.load(std::memory_order_relaxed) == nullptr);
}
}