#pragma once
#include "rocksdb/env.h"
#include "util/thread_status_util.h"
namespace rocksdb {
class ThreadPool {
public:
ThreadPool();
~ThreadPool();
void JoinAllThreads();
void LowerIOPriority();
void BGThread(size_t thread_id);
void WakeUpAllThreads();
void IncBackgroundThreadsIfNeeded(int num);
void SetBackgroundThreads(int num);
void StartBGThreads();
void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg));
int UnSchedule(void* arg);
unsigned int GetQueueLen() const {
return queue_len_.load(std::memory_order_relaxed);
}
void SetHostEnv(Env* env) { env_ = env; }
Env* GetHostEnv() { return env_; }
bool HasExcessiveThread() {
return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
}
bool IsLastExcessiveThread(size_t thread_id) {
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
}
bool IsExcessiveThread(size_t thread_id) {
return static_cast<int>(thread_id) >= total_threads_limit_;
}
Env::Priority GetThreadPriority() { return priority_; }
void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
static void PthreadCall(const char* label, int result);
private:
struct BGItem {
void* arg;
void (*function)(void*);
void* tag;
void (*unschedFunction)(void*);
};
typedef std::deque<BGItem> BGQueue;
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
int total_threads_limit_;
std::vector<pthread_t> bgthreads_;
BGQueue queue_;
std::atomic_uint queue_len_; bool exit_all_threads_;
bool low_io_priority_;
Env::Priority priority_;
Env* env_;
void SetBackgroundThreadsInternal(int num, bool allow_reduce);
};
}