#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"
#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>
/** First and last priority values.  They are used as the inclusive bounds
 * of every loop in this file that walks the per-priority work queues. */
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
/** Number of per-priority queues; this is the length of threadpool_t.work[].
 * NOTE(review): the "+1" assumes priority values are numbered upward from
 * 0 -- confirm against the definition of workqueue_priority_t. */
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
/** Tail-queue head type for a list of workqueue_entry_t, linked through
 * each entry's next_work member. */
TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;
/** A pool of worker threads sharing a set of per-priority queues of pending
 * work, plus the state needed to push an "update" to every thread. */
struct threadpool_t {
/** Array of pointers to the pool's worker threads; the first n_threads
 * slots are filled in by threadpool_start_threads(). */
struct workerthread_t **threads;
/** Condition variable that idle workers wait on.  Signaled (one waiter) when
 * work is queued, and (all waiters) when an update is queued. */
tor_cond_t condition;
/** Queues of pending work; work[p] holds the entries queued at priority p. */
work_tailq_t work[WORKQUEUE_N_PRIORITIES];
/** Update generation of the pool.  threadpool_queue_update() increments it;
 * a worker whose own generation differs runs update_fn before other work. */
unsigned generation;
/** Function each worker runs (with its state and its update argument) when
 * it notices a new generation. */
workqueue_reply_t (*update_fn)(void *, void *);
/** Used by threadpool_queue_update() to free arguments of a superseded
 * update that no worker consumed.  May be NULL. */
void (*free_update_arg_fn)(void *);
/** Array with one update argument per thread, indexed by the worker's
 * index.  A worker sets its slot to NULL when it takes the argument. */
void **update_args;
/** libevent event watching the read side of reply_queue's alert sockets;
 * created by threadpool_register_reply_event(). */
struct event *reply_event;
/** Optional callback invoked by reply_event_cb() after the reply queue has
 * been processed.  May be NULL. */
void (*reply_cb)(threadpool_t *);
/** Number of worker threads currently recorded in threads. */
int n_threads;
/** Mutex held by the functions in this file while they read or modify the
 * work queues, generation, the update_* fields, threads and n_threads. */
tor_mutex_t lock;
/** Reply queue handed to every worker thread created for this pool. */
replyqueue_t *reply_queue;
/** Called with new_thread_state_arg to create the per-thread state for each
 * new worker. */
void *(*new_thread_state_fn)(void*);
/** Called to release a per-thread state when its worker could not be
 * launched. */
void (*free_thread_state_fn)(void*);
/** Argument passed to new_thread_state_fn. */
void *new_thread_state_arg;
};
/** Type used to declare the bitfield that stores a workqueue_priority_t in
 * workqueue_entry_t. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Width, in bits, of the priority bitfield in workqueue_entry_t. */
#define WORKQUEUE_PRIORITY_BITS 2
/** One unit of work: a function to run in a worker thread, a function to run
 * afterwards while the reply queue is processed, and their shared argument. */
struct workqueue_entry_t {
/** Link to the neighbouring entries on whichever list currently holds this
 * entry: one of the pool's work queues, or a reply queue's answers list. */
TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
/** The pool this entry was queued on.  Set when the entry is queued and
 * cleared by replyqueue_process() just before reply_fn runs. */
struct threadpool_t *on_pool;
/** Nonzero while the entry sits on a work queue waiting for a worker;
 * cleared when a worker extracts it. */
uint8_t pending;
/** Priority of this entry; selects which of the pool's work queues it is
 * placed on. */
workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
/** Function run in the worker thread, given the thread's state and arg. */
workqueue_reply_t (*fn)(void *state, void *arg);
/** Function run with arg by replyqueue_process(). */
void (*reply_fn)(void *arg);
/** Argument passed to both fn and reply_fn. */
void *arg;
};
/** A queue of finished work entries waiting for replyqueue_process(). */
struct replyqueue_t {
/** Mutex held while the answers list is read or modified. */
tor_mutex_t lock;
/** List of finished entries, appended to by worker threads and drained by
 * replyqueue_process(). */
TOR_TAILQ_HEAD(, workqueue_entry_t) answers;
/** Alert mechanism: workers call alert_fn on write_fd when they add to an
 * empty list; replyqueue_process() calls drain_fn on read_fd. */
alert_sockets_t alert;
};
/** State for a single worker thread in a thread pool. */
typedef struct workerthread_t {
/** Position of this thread in in_pool->threads; also the index of its slot
 * in in_pool->update_args. */
int index;
/** The pool this thread belongs to. */
struct threadpool_t *in_pool;
/** Per-thread state, passed as the first argument to work and update
 * functions. */
void *state;
/** Reply queue on which this thread places finished work. */
replyqueue_t *reply_queue;
/** The pool generation this thread has most recently caught up with. */
unsigned generation;
/** Passed as N to crypto_fast_rng_one_in_n() when choosing a queue: when
 * that call returns true the thread keeps looking at lower-priority queues
 * instead of stopping at the first non-empty one. */
int32_t lower_priority_chance;
} workerthread_t;
/* Forward declaration: worker_thread_main() calls queue_reply() before its
 * definition appears in this file. */
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
/**
 * Allocate and return a zero-filled workqueue_entry_t that will run
 * <b>fn</b> in a worker and <b>reply_fn</b> afterwards, both with
 * <b>arg</b>.  The priority defaults to WQ_PRI_HIGH; the entry is not yet
 * attached to any pool or queue.
 */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *entry = tor_malloc_zero(sizeof(*entry));

  entry->priority = WQ_PRI_HIGH;
  entry->arg = arg;
  entry->reply_fn = reply_fn;
  entry->fn = fn;

  return entry;
}
/** Release <b>ent</b> through workqueue_entry_free_().
 * NOTE(review): going by the FREE_AND_NULL name this presumably also sets
 * the caller's pointer to NULL -- confirm against that macro's definition. */
#define workqueue_entry_free(ent) \
FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
/**
 * Release the storage held by <b>ent</b>.  Does nothing when <b>ent</b> is
 * NULL.  The entry is overwritten with 0xf0 bytes before being freed so that
 * a stale pointer to it is easier to recognize.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (ent != NULL) {
    memset(ent, 0xf0, sizeof(workqueue_entry_t));
    tor_free(ent);
  }
}
/**
 * Try to cancel <b>ent</b> before a worker thread starts on it.
 *
 * If the entry is still pending it is removed from its pool's work queue and
 * freed, and its argument is returned so the caller can dispose of it;
 * neither of its functions will run.  If a worker has already taken the
 * entry, nothing is changed and NULL is returned.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  threadpool_t *pool = ent->on_pool;
  void *cancelled_arg = NULL;

  tor_mutex_acquire(&pool->lock);
  const workqueue_priority_t entry_prio = ent->priority;
  const int was_pending = (ent->pending != 0);
  if (was_pending) {
    TOR_TAILQ_REMOVE(&pool->work[entry_prio], ent, next_work);
    cancelled_arg = ent->arg;
  }
  tor_mutex_release(&pool->lock);

  /* The entry is freed only after the pool lock has been dropped. */
  if (was_pending)
    workqueue_entry_free(ent);

  return cancelled_arg;
}
/**
 * Return nonzero if <b>thread</b> has something to do: either one of its
 * pool's work queues is non-empty, or the pool's update generation differs
 * from the thread's.  The caller in this file invokes it with the pool lock
 * held.
 */
static int
worker_thread_has_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  unsigned prio = WORKQUEUE_PRIORITY_FIRST;

  while (prio <= WORKQUEUE_PRIORITY_LAST) {
    if (!TOR_TAILQ_EMPTY(&pool->work[prio]))
      return 1;
    ++prio;
  }

  /* No queued work; an outstanding update still counts as work. */
  return thread->generation != pool->generation;
}
/**
 * Remove and return the next work entry for <b>thread</b>, or NULL if every
 * queue in its pool is empty.
 *
 * Queues are examined from WORKQUEUE_PRIORITY_FIRST to
 * WORKQUEUE_PRIORITY_LAST.  Each time a non-empty queue is found it becomes
 * the current choice, and a random draw parameterized by the thread's
 * lower_priority_chance decides whether to go on looking at later queues;
 * the last non-empty queue seen is the one used.  The returned entry has its
 * pending flag cleared.
 */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *chosen = NULL;
  unsigned prio;

  for (prio = WORKQUEUE_PRIORITY_FIRST;
       prio <= WORKQUEUE_PRIORITY_LAST; ++prio) {
    work_tailq_t *candidate = &pool->work[prio];
    if (TOR_TAILQ_EMPTY(candidate))
      continue;

    chosen = candidate;
    /* The draw happens only after a non-empty queue has been found. */
    const int keep_looking =
      crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                               thread->lower_priority_chance);
    if (!keep_looking)
      break;
  }

  if (chosen == NULL)
    return NULL;

  workqueue_entry_t *entry = TOR_TAILQ_FIRST(chosen);
  TOR_TAILQ_REMOVE(chosen, entry, next_work);
  entry->pending = 0;
  return entry;
}
/**
 * Main function of a worker thread; <b>thread_</b> is its workerthread_t.
 *
 * Runs an endless loop.  The pool lock is held at the top of both the outer
 * and the inner loop.  While there is something to do, the thread first
 * runs any pending pool update (when the pool's generation differs from its
 * own); otherwise it extracts one work entry, runs it with the lock
 * released, and hands the entry to the reply queue.  When nothing is left
 * it waits on the pool's condition variable.
 *
 * The function returns when an update function or a work function returns
 * anything other than WQ_RPL_REPLY; the pool lock is not held at either of
 * those return points.
 */
static void
worker_thread_main(void *thread_)
{
workerthread_t *thread = thread_;
threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work;
workqueue_reply_t result;
tor_mutex_acquire(&pool->lock);
while (1) {
/* The pool lock is held here. */
while (worker_thread_has_work(thread)) {
/* The pool lock is held here.  Updates take precedence over queued work. */
if (thread->in_pool->generation != thread->generation) {
/* Take this thread's update argument and clear the slot, so that a later
 * threadpool_queue_update() does not free an argument we are using. */
void *arg = thread->in_pool->update_args[thread->index];
thread->in_pool->update_args[thread->index] = NULL;
/* Copy the function pointer while the lock still protects it. */
workqueue_reply_t (*update_fn)(void*,void*) =
thread->in_pool->update_fn;
/* Record that we have caught up before dropping the lock. */
thread->generation = thread->in_pool->generation;
tor_mutex_release(&pool->lock);
/* The update function runs without the pool lock. */
workqueue_reply_t r = update_fn(thread->state, arg);
if (r != WQ_RPL_REPLY) {
return;
}
tor_mutex_acquire(&pool->lock);
continue;
}
work = worker_thread_extract_next_work(thread);
/* has_work reported work and no update was pending, so a NULL result is
 * unexpected; leave the inner loop (lock still held) and wait. */
if (BUG(work == NULL))
break;
tor_mutex_release(&pool->lock);
/* The work function runs without the pool lock, so other threads can queue
 * or take work meanwhile. */
result = work->fn(thread->state, work->arg);
/* Hand the entry to the reply queue.  replyqueue_process() frees entries it
 * handles, so <b>work</b> must not be touched after this call. */
queue_reply(thread->reply_queue, work);
if (result != WQ_RPL_REPLY) {
return;
}
tor_mutex_acquire(&pool->lock);
}
/* The lock is held and there is nothing to do: wait to be signaled.  A
 * failed wait is only logged; the loop then re-checks for work. */
if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
log_warn(LD_GENERAL, "Fail tor_cond_wait.");
}
}
}
/**
 * Append the finished entry <b>work</b> to the answers list of <b>queue</b>
 * and, if the list was empty beforehand, alert the queue's reader through
 * the queue's alert sockets.
 *
 * Ownership of <b>work</b> passes to the reply queue: replyqueue_process()
 * frees entries it handles, so the caller must not use <b>work</b> after
 * this function returns.
 *
 * A failure to deliver the alert is logged instead of being silently
 * dropped.  The entry stays on the list either way and is handled the next
 * time replyqueue_process() runs on this queue.  Because later calls skip
 * the alert while the list is non-empty, an unreported failure here would
 * otherwise be very hard to diagnose.
 */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;

  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  /* Only the empty -> non-empty transition needs a wakeup.  The alert is
   * sent after the lock is released. */
  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* Plain log_warn, matching how worker_thread_main() reports its own
       * failure from a worker thread. */
      log_warn(LD_GENERAL, "Failed to alert the reply queue's reader; "
               "queued replies may be delayed.");
    }
  }
}
/**
 * Allocate a workerthread_t for <b>pool</b> and launch its thread running
 * worker_thread_main().
 *
 * <b>state</b> is the per-thread state, <b>replyqueue</b> receives the
 * thread's finished work, and <b>lower_priority_chance</b> is stored for use
 * when the thread chooses between queues.  All fields are filled in before
 * the thread is spawned.  Returns the new structure, or NULL (after logging
 * and freeing it) if the thread could not be launched; <b>state</b> is not
 * freed here.
 */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *worker = tor_malloc_zero(sizeof(*worker));

  worker->lower_priority_chance = lower_priority_chance;
  worker->in_pool = pool;
  worker->reply_queue = replyqueue;
  worker->state = state;

  const int spawn_result = spawn_func(worker_thread_main, worker);
  if (spawn_result >= 0)
    return worker;

  /* Launch failed: report it and give back the allocation. */
  tor_assert_nonfatal_unreached();
  log_err(LD_GENERAL, "Can't launch worker thread.");
  tor_free(worker);
  return NULL;
}
/**
 * Queue a unit of work on <b>pool</b> at priority <b>prio</b>.
 *
 * A worker thread will call <b>fn</b> with its thread state and <b>arg</b>;
 * afterwards <b>reply_fn</b> is called with <b>arg</b> when the reply queue
 * is processed.  <b>prio</b> must lie between WORKQUEUE_PRIORITY_FIRST and
 * WORKQUEUE_PRIORITY_LAST inclusive (asserted).  One waiting worker is
 * signaled.  Returns the new entry, which can be passed to
 * workqueue_entry_cancel().
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  /* Fully set up the entry before it becomes visible to workers. */
  workqueue_entry_t *entry = workqueue_entry_new(fn, reply_fn, arg);
  entry->priority = prio;
  entry->pending = 1;
  entry->on_pool = pool;

  tor_mutex_acquire(&pool->lock);
  {
    work_tailq_t *target = &pool->work[prio];
    TOR_TAILQ_INSERT_TAIL(target, entry, next_work);
    tor_cond_signal_one(&pool->condition);
  }
  tor_mutex_release(&pool->lock);

  return entry;
}
/**
 * Queue a unit of work on <b>pool</b> at priority WQ_PRI_HIGH.  Arguments
 * and return value are as for threadpool_queue_work_priority().
 */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  const workqueue_priority_t default_prio = WQ_PRI_HIGH;

  return threadpool_queue_work_priority(pool, default_prio,
                                        fn, reply_fn, arg);
}
/**
 * Schedule <b>fn</b> to run once in every worker thread of <b>pool</b>,
 * ahead of any further queued work.
 *
 * Each thread gets its own argument: <b>dup_fn</b>(<b>arg</b>) if
 * <b>dup_fn</b> is non-NULL, otherwise <b>arg</b> itself.  The pool's
 * generation is incremented and all waiting workers are woken.  Arguments
 * left over from a previous update that no worker consumed are released
 * with that update's free function (if it had one) after the pool lock has
 * been dropped; <b>free_fn</b> is stored to play the same role for this
 * update.  Always returns 0.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int idx;

  tor_mutex_acquire(&pool->lock);

  const int n_threads = pool->n_threads;
  void **stale_args = pool->update_args;
  void (*stale_free_fn)(void *) = pool->free_update_arg_fn;

  /* Build one argument per thread while still holding the lock. */
  void **fresh_args = tor_calloc(n_threads, sizeof(void*));
  for (idx = 0; idx < n_threads; ++idx)
    fresh_args[idx] = dup_fn ? dup_fn(arg) : arg;

  pool->update_fn = fn;
  pool->free_update_arg_fn = free_fn;
  pool->update_args = fresh_args;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);
  tor_mutex_release(&pool->lock);

  /* Dispose of whatever the previous update left unconsumed. */
  if (stale_args != NULL) {
    if (stale_free_fn != NULL) {
      for (idx = 0; idx < n_threads; ++idx) {
        if (stale_args[idx] != NULL)
          stale_free_fn(stale_args[idx]);
      }
    }
    tor_free(stale_args);
  }

  return 0;
}
/** Upper bound on the number of threads in a pool;
 * threadpool_start_threads() clamps its argument to this value. */
#define MAX_THREADS 1024
/** lower_priority_chance given to threads created at an even index. */
#define CHANCE_PERMISSIVE 37
/** lower_priority_chance given to threads created at an odd index. */
#define CHANCE_STRICT INT32_MAX
/**
 * Launch worker threads until <b>pool</b> has <b>n</b> of them (at most
 * MAX_THREADS).  Returns 0 on success, or -1 if <b>n</b> is negative or a
 * thread cannot be created; threads started before a failure are left
 * running.
 *
 * The pool lock is held for the whole operation.  Since a new worker begins
 * by acquiring that lock, its index has been assigned by the time it can
 * read it.
 */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1;

  const int target = (n > MAX_THREADS) ? MAX_THREADS : n;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < target) {
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), target);
  }

  while (pool->n_threads < target) {
    const int slot = pool->n_threads;
    /* Threads alternate between the two settings: odd slots use
     * CHANCE_STRICT, even slots CHANCE_PERMISSIVE. */
    int32_t chance;
    if (slot & 1)
      chance = CHANCE_STRICT;
    else
      chance = CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *worker = workerthread_new(chance, state, pool,
                                              pool->reply_queue);
    if (worker == NULL) {
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
    }

    worker->index = slot;
    pool->threads[slot] = worker;
    pool->n_threads = slot + 1;
  }

  tor_mutex_release(&pool->lock);
  return 0;
}
/**
 * Create a thread pool with <b>n_threads</b> workers that deliver their
 * finished work to <b>replyqueue</b>.
 *
 * <b>new_thread_state_fn</b> is called with <b>arg</b> to build each
 * worker's private state; <b>free_thread_state_fn</b> releases such a state
 * if its worker cannot be launched.  Returns the new pool, or NULL if the
 * threads could not be started.
 *
 * NOTE(review): the failure path uninitializes the condition variable and
 * mutex and frees the pool, but it does not release pool->threads or stop
 * workers that were already launched.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *tp = tor_malloc_zero(sizeof(*tp));

  tor_mutex_init_nonrecursive(&tp->lock);
  tor_cond_init(&tp->condition);

  unsigned prio = WORKQUEUE_PRIORITY_FIRST;
  while (prio <= WORKQUEUE_PRIORITY_LAST) {
    TOR_TAILQ_INIT(&tp->work[prio]);
    ++prio;
  }

  /* Everything the workers read must be in place before they start. */
  tp->reply_queue = replyqueue;
  tp->free_thread_state_fn = free_thread_state_fn;
  tp->new_thread_state_arg = arg;
  tp->new_thread_state_fn = new_thread_state_fn;

  const int started = threadpool_start_threads(tp, n_threads);
  if (started >= 0)
    return tp;

  tor_assert_nonfatal_unreached();
  tor_cond_uninit(&tp->condition);
  tor_mutex_uninit(&tp->lock);
  tor_free(tp);
  return NULL;
}
/** Return the reply queue that <b>tp</b>'s workers deliver their results
 * to. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  replyqueue_t *rq = tp->reply_queue;
  return rq;
}
/**
 * Allocate a new, empty reply queue whose alert sockets are created with
 * <b>alertsocks_flags</b>.  Returns NULL if the alert sockets cannot be
 * created.
 */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *queue = tor_malloc_zero(sizeof(*queue));

  const int created = alert_sockets_create(&queue->alert, alertsocks_flags);
  if (created < 0) {
    /* Nothing else has been initialized yet, so freeing is enough. */
    tor_free(queue);
    return NULL;
  }

  TOR_TAILQ_INIT(&queue->answers);
  tor_mutex_init(&queue->lock);
  return queue;
}
/**
 * Event callback installed by threadpool_register_reply_event().
 * <b>arg</b> is the thread pool; <b>sock</b> and <b>events</b> are unused.
 * Processes the pool's reply queue and then, if one is set, calls the
 * pool's reply callback.
 */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  (void) events;
  (void) sock;

  threadpool_t *pool = arg;
  replyqueue_process(pool->reply_queue);

  /* The callback pointer is read only after the queue has been processed. */
  if (pool->reply_cb != NULL)
    pool->reply_cb(pool);
}
/**
 * Arrange for <b>tp</b>'s reply queue to be processed whenever the read
 * side of its alert sockets becomes readable, using the event base returned
 * by tor_libevent_get_base().
 *
 * Any event registered earlier for this pool is freed first.  <b>cb</b>,
 * which may be NULL, is stored as the pool's reply callback and is called
 * after each round of reply processing.  Returns the result of event_add().
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event != NULL) {
    tor_event_free(tp->reply_event);
  }

  struct event *reply_ev = tor_event_new(base,
                                         tp->reply_queue->alert.read_fd,
                                         EV_READ|EV_PERSIST,
                                         reply_event_cb,
                                         tp);
  tp->reply_event = reply_ev;
  tor_assert(tp->reply_event);

  tp->reply_cb = cb;
  return event_add(reply_ev, NULL);
}
/**
 * Handle every entry currently waiting on <b>queue</b>.
 *
 * The alert sockets are drained first; a drain failure is logged with rate
 * limiting and processing carries on.  Each entry is then removed from the
 * answers list, its reply function is called with its argument while the
 * queue lock is released, and the entry is freed.  Entries that workers add
 * while this function runs are handled in the same call.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  const int drain_result = queue->alert.drain_fn(queue->alert.read_fd);
  if (drain_result < 0) {
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-drain_result));
  }

  tor_mutex_acquire(&queue->lock);
  for (;;) {
    /* The queue lock is held at the top of every iteration. */
    if (TOR_TAILQ_EMPTY(&queue->answers))
      break;

    workqueue_entry_t *finished = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, finished, next_work);
    tor_mutex_release(&queue->lock);

    /* The reply function runs without the queue lock held. */
    finished->on_pool = NULL;
    finished->reply_fn(finished->arg);
    workqueue_entry_free(finished);

    tor_mutex_acquire(&queue->lock);
  }
  tor_mutex_release(&queue->lock);
}