#ifndef U_QUEUE_H
#define U_QUEUE_H
#include <string.h>
#include "util/futex.h"
#include "util/list.h"
#include "util/macros.h"
#include "util/os_time.h"
#include "util/u_atomic.h"
#include "util/u_thread.h"
#ifdef __cplusplus
extern "C" {
#endif
#define UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY (1 << 0)
#define UTIL_QUEUE_INIT_RESIZE_IF_FULL (1 << 1)
#define UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY (1 << 2)
#if UTIL_FUTEX_SUPPORTED
#define UTIL_QUEUE_FENCE_FUTEX
#else
#define UTIL_QUEUE_FENCE_STANDARD
#endif
#ifdef UTIL_QUEUE_FENCE_FUTEX
struct util_queue_fence {
uint32_t val;
};
static inline void
util_queue_fence_init(struct util_queue_fence *fence)
{
fence->val = 0;
}
static inline void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
assert(fence->val == 0);
}
static inline void
util_queue_fence_signal(struct util_queue_fence *fence)
{
uint32_t val = p_atomic_xchg(&fence->val, 0);
assert(val != 0);
if (val == 2)
futex_wake(&fence->val, INT_MAX);
}
static inline void
util_queue_fence_reset(struct util_queue_fence *fence)
{
#ifdef NDEBUG
fence->val = 1;
#else
uint32_t v = p_atomic_xchg(&fence->val, 1);
assert(v == 0);
#endif
}
static inline bool
util_queue_fence_is_signalled(struct util_queue_fence *fence)
{
return fence->val == 0;
}
#endif
#ifdef UTIL_QUEUE_FENCE_STANDARD
struct util_queue_fence {
mtx_t mutex;
cnd_t cond;
int signalled;
};
void util_queue_fence_init(struct util_queue_fence *fence);
void util_queue_fence_destroy(struct util_queue_fence *fence);
void util_queue_fence_signal(struct util_queue_fence *fence);
static inline void
util_queue_fence_reset(struct util_queue_fence *fence)
{
assert(fence->signalled);
fence->signalled = 0;
}
static inline bool
util_queue_fence_is_signalled(struct util_queue_fence *fence)
{
return fence->signalled != 0;
}
#endif
void
_util_queue_fence_wait(struct util_queue_fence *fence);
static inline void
util_queue_fence_wait(struct util_queue_fence *fence)
{
if (unlikely(!util_queue_fence_is_signalled(fence)))
_util_queue_fence_wait(fence);
}
bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
int64_t abs_timeout);
static inline bool
util_queue_fence_wait_timeout(struct util_queue_fence *fence,
int64_t abs_timeout)
{
if (util_queue_fence_is_signalled(fence))
return true;
if (abs_timeout == (int64_t)OS_TIMEOUT_INFINITE) {
_util_queue_fence_wait(fence);
return true;
}
return _util_queue_fence_wait_timeout(fence, abs_timeout);
}
typedef void (*util_queue_execute_func)(void *job, int thread_index);
struct util_queue_job {
void *job;
size_t job_size;
struct util_queue_fence *fence;
util_queue_execute_func execute;
util_queue_execute_func cleanup;
};
struct util_queue {
char name[14];
mtx_t finish_lock;
mtx_t lock;
cnd_t has_queued_cond;
cnd_t has_space_cond;
thrd_t *threads;
unsigned flags;
int num_queued;
unsigned max_threads;
unsigned num_threads;
int max_jobs;
int write_idx, read_idx;
size_t total_jobs_size;
struct util_queue_job *jobs;
struct list_head head;
};
bool util_queue_init(struct util_queue *queue,
const char *name,
unsigned max_jobs,
unsigned num_threads,
unsigned flags);
void util_queue_destroy(struct util_queue *queue);
void util_queue_add_job(struct util_queue *queue,
void *job,
struct util_queue_fence *fence,
util_queue_execute_func execute,
util_queue_execute_func cleanup,
const size_t job_size);
void util_queue_drop_job(struct util_queue *queue,
struct util_queue_fence *fence);
void util_queue_finish(struct util_queue *queue);
void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads);
int64_t util_queue_get_thread_time_nano(struct util_queue *queue,
unsigned thread_index);
static inline bool
util_queue_is_initialized(struct util_queue *queue)
{
return queue->threads != NULL;
}
struct util_queue_monitoring
{
struct util_queue *queue;
unsigned num_offloaded_items;
unsigned num_direct_items;
unsigned num_syncs;
};
#ifdef __cplusplus
}
#endif
#endif