#ifndef LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
#define LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
#include <jxl/memory_manager.h>
#include <jxl/parallel_runner.h>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <thread>
#include <vector>
namespace jpegxl {
/// Thread pool exposing a static `Runner` entry point with the argument list
/// of a JxlParallelRunner callback.
///
/// Only declarations and small inline helpers are visible here; the
/// constructor, destructor, Runner(), RunRange() and ThreadFunc() are defined
/// out of line. The pool owns its threads (`threads_`), a mutex and two
/// condition variables, so it can be neither copied nor moved.
class ThreadParallelRunner {
 public:
  /// Static entry point with the JxlParallelRunner signature.
  /// NOTE(review): runner_opaque is presumably the ThreadParallelRunner*
  /// instance and [start_range, end_range) the task range - confirm against
  /// the out-of-line definition.
  static JxlParallelRetCode Runner(void* runner_opaque, void* jpegxl_opaque,
                                   JxlParallelRunInit init,
                                   JxlParallelRunFunction func,
                                   uint32_t start_range, uint32_t end_range);

  /// @param num_worker_threads Number of worker threads; defaults to
  ///        std::thread::hardware_concurrency(), which may legitimately be 0.
  explicit ThreadParallelRunner(
      int num_worker_threads = std::thread::hardware_concurrency());
  ~ThreadParallelRunner();

  // Owns threads, a mutex and condition variables: not copyable. (Moves are
  // also unavailable because std::mutex is neither copyable nor movable.)
  ThreadParallelRunner(const ThreadParallelRunner&) = delete;
  ThreadParallelRunner& operator=(const ThreadParallelRunner&) = delete;

  /// @return num_threads_, as set by the out-of-line constructor.
  size_t NumThreads() const { return num_threads_; }

  /// Runs `func` via the worker pool and blocks until every worker has
  /// reported ready again.
  ///
  /// With zero worker threads, `func(0, 0)` is called inline on the calling
  /// thread. Otherwise `func` is published through data_func_/jpegxl_opaque_
  /// and the kWorkerOnce command is broadcast. The workers presumably invoke
  /// it as func(thread, thread) from RunRange (defined elsewhere) - confirm.
  /// `func` is only referenced by address, which is safe because this call
  /// does not return until WorkersReadyBarrier() completes.
  template <class Func>
  void RunOnEachThread(const Func& func) {
    if (num_worker_threads_ == 0) {
      const int thread = 0;
      func(thread, thread);
      return;
    }
    // reinterpret_cast is kept: JxlParallelRunFunction comes from a C header,
    // and its language linkage may differ from that of CallClosure<Func>.
    data_func_ = reinterpret_cast<JxlParallelRunFunction>(&CallClosure<Func>);
    jpegxl_opaque_ = const_cast<void*>(static_cast<const void*>(&func));
    StartWorkers(kWorkerOnce);
    WorkersReadyBarrier();
  }

  // Zero-initialized here; presumably assigned by the code that creates the
  // runner (not visible in this header) - confirm.
  JxlMemoryManager memory_manager{};

 private:
  // Command broadcast to workers through worker_start_command_. The three
  // sentinels sit at the top of the uint64_t range; other values are
  // presumably encoded task ranges decoded by RunRange - confirm.
  using WorkerCommand = uint64_t;
  static constexpr WorkerCommand kWorkerWait = ~1ULL;
  static constexpr WorkerCommand kWorkerOnce = ~2ULL;
  static constexpr WorkerCommand kWorkerExit = ~3ULL;

  /// Type-erased trampoline: reinterprets `f` as a `const Closure*` and calls
  /// it with (task, thread).
  template <class Closure>
  static void CallClosure(void* f, const uint32_t task, const size_t thread) {
    (*reinterpret_cast<const Closure*>(f))(task, thread);
  }

  /// Blocks until workers_ready_ equals the number of worker threads, then
  /// resets the counter and parks the command at kWorkerWait.
  /// NOTE(review): workers presumably increment workers_ready_ and notify
  /// workers_ready_cv_ in ThreadFunc (defined elsewhere).
  void WorkersReadyBarrier() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Predicate form re-checks the condition on every (possibly spurious)
    // wakeup, exactly like an explicit while loop.
    workers_ready_cv_.wait(
        lock, [this] { return workers_ready_ == threads_.size(); });
    workers_ready_ = 0;
    worker_start_command_ = kWorkerWait;
  }

  /// Publishes `worker_command` under the mutex, then wakes all workers.
  void StartWorkers(const WorkerCommand worker_command) {
    {
      // RAII: the mutex is released even if the scope is left abnormally.
      std::lock_guard<std::mutex> lock(mutex_);
      worker_start_command_ = worker_command;
    }
    // Notify after releasing the mutex so that woken workers do not
    // immediately block on it.
    worker_start_cv_.notify_all();
  }

  // Defined out of line.
  static void RunRange(ThreadParallelRunner* self, WorkerCommand command,
                       int thread);
  static void ThreadFunc(ThreadParallelRunner* self, int thread);

  std::vector<std::thread> threads_;
  const uint32_t num_worker_threads_;
  const uint32_t num_threads_;

  // NOTE(review): presumably tracks nested Runner() calls - confirm.
  std::atomic<int> depth_{0};

  // mutex_ guards workers_ready_ and worker_start_command_ in the inline
  // helpers above.
  std::mutex mutex_;
  std::condition_variable workers_ready_cv_;
  uint32_t workers_ready_ = 0;
  std::condition_variable worker_start_cv_;
  WorkerCommand worker_start_command_{kWorkerWait};

  // Closure published to workers by RunOnEachThread().
  JxlParallelRunFunction data_func_{nullptr};
  void* jpegxl_opaque_{nullptr};

  // 64-byte padding on both sides of num_reserved_, presumably to keep this
  // atomic off cache lines shared with other members (false sharing).
  uint8_t padding1[64];
  std::atomic<uint32_t> num_reserved_{0};
  uint8_t padding2[64];
};
}
#endif