#include "threadpool.hpp"
#include <cstdlib>
namespace {
void joinThreadPoolInstance() { rtc::ThreadPool::Instance().join(); }
}
namespace rtc {
ThreadPool &ThreadPool::Instance() {
static ThreadPool *instance = new ThreadPool;
return *instance;
}
ThreadPool::ThreadPool() { std::atexit(joinThreadPoolInstance); }
ThreadPool::~ThreadPool() {}
int ThreadPool::count() const {
std::unique_lock lock(mWorkersMutex);
return int(mWorkers.size());
}
void ThreadPool::spawn(int count) {
std::unique_lock lock(mWorkersMutex);
while (count-- > 0)
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
}
void ThreadPool::join() {
{
std::unique_lock lock(mMutex);
mWaitingCondition.wait(lock, [&]() { return mWaitingWorkers == int(mWorkers.size()); });
mJoining = true;
mTasksCondition.notify_all();
}
std::unique_lock lock(mWorkersMutex);
for (auto &w : mWorkers)
w.join();
mWorkers.clear();
mJoining = false;
}
void ThreadPool::run() {
while (runOne()) {
}
}
bool ThreadPool::runOne() {
if (auto task = dequeue()) {
task();
return true;
}
return false;
}
std::function<void()> ThreadPool::dequeue() {
std::unique_lock lock(mMutex);
while (!mJoining) {
if (!mTasks.empty()) {
if (mTasks.top().time <= clock::now()) {
auto func = std::move(mTasks.top().func);
mTasks.pop();
return func;
}
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait_until(lock, mTasks.top().time);
} else {
++mWaitingWorkers;
mWaitingCondition.notify_all();
mTasksCondition.wait(lock);
}
--mWaitingWorkers;
}
return nullptr;
}
}