#ifndef TEST_MAIN
#include <config.h>
#endif
#include <stdlib.h>
#include <inttypes.h>
#include <signal.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <assert.h>
#include <stdarg.h>
#include <unistd.h>
#include <limits.h>
#include "thread_pool_internal.h"
#ifdef DEBUG
static int worker_id(hts_tpool *p) {
int i;
pthread_t s = pthread_self();
for (i = 0; i < p->tsize; i++) {
if (pthread_equal(s, p->t[i].tid))
return i;
}
return -1;
}
int DBG_OUT(FILE *fp, char *fmt, ...) {
va_list args;
va_start(args, fmt);
return vfprintf(fp, fmt, args);
}
#else
#define DBG_OUT(...) do{}while(0)
#endif
static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
hts_tpool_process *q = j->q;
hts_tpool_result *r;
pthread_mutex_lock(&q->p->pool_m);
DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);
if (--q->n_processing == 0)
pthread_cond_signal(&q->none_processing_c);
if (q->in_only) {
pthread_mutex_unlock(&q->p->pool_m);
return 0;
}
if (!(r = malloc(sizeof(*r))))
return -1;
r->next = NULL;
r->data = data;
r->serial = j->serial;
q->n_output++;
if (q->output_tail) {
q->output_tail->next = r;
q->output_tail = r;
} else {
q->output_head = q->output_tail = r;
}
assert(r->serial >= q->next_serial || q->next_serial == INT_MAX); if (r->serial == q->next_serial) {
DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
worker_id(j->p), r->serial);
pthread_cond_broadcast(&q->output_avail_c);
DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p));
}
pthread_mutex_unlock(&q->p->pool_m);
return 0;
}
static void wake_next_worker(hts_tpool_process *q, int locked);
static hts_tpool_result *hts_tpool_next_result_locked(hts_tpool_process *q) {
hts_tpool_result *r, *last;
if (q->shutdown)
return NULL;
for (last = NULL, r = q->output_head; r; last = r, r = r->next) {
if (r->serial == q->next_serial)
break;
}
if (r) {
if (q->output_head == r)
q->output_head = r->next;
else
last->next = r->next;
if (q->output_tail == r)
q->output_tail = last;
if (!q->output_head)
q->output_tail = NULL;
q->next_serial++;
q->n_output--;
if (q->qsize && q->n_output < q->qsize) {
if (q->n_input < q->qsize)
pthread_cond_signal(&q->input_not_full_c);
if (!q->shutdown)
wake_next_worker(q, 1);
}
}
return r;
}
hts_tpool_result *hts_tpool_next_result(hts_tpool_process *q) {
hts_tpool_result *r;
DBG_OUT(stderr, "Requesting next result on queue %p\n", q);
pthread_mutex_lock(&q->p->pool_m);
r = hts_tpool_next_result_locked(q);
pthread_mutex_unlock(&q->p->pool_m);
DBG_OUT(stderr, "(q=%p) Found %p\n", q, r);
return r;
}
hts_tpool_result *hts_tpool_next_result_wait(hts_tpool_process *q) {
hts_tpool_result *r;
pthread_mutex_lock(&q->p->pool_m);
while (!(r = hts_tpool_next_result_locked(q))) {
struct timeval now;
struct timespec timeout;
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec + 10;
timeout.tv_nsec = now.tv_usec * 1000;
q->ref_count++;
if (q->shutdown) {
int rc = --q->ref_count;
pthread_mutex_unlock(&q->p->pool_m);
if (rc == 0)
hts_tpool_process_destroy(q);
return NULL;
}
pthread_cond_timedwait(&q->output_avail_c, &q->p->pool_m, &timeout);
q->ref_count--;
}
pthread_mutex_unlock(&q->p->pool_m);
return r;
}
int hts_tpool_process_empty(hts_tpool_process *q) {
int empty;
pthread_mutex_lock(&q->p->pool_m);
empty = q->n_input == 0 && q->n_processing == 0 && q->n_output == 0;
pthread_mutex_unlock(&q->p->pool_m);
return empty;
}
void hts_tpool_process_ref_incr(hts_tpool_process *q) {
pthread_mutex_lock(&q->p->pool_m);
q->ref_count++;
pthread_mutex_unlock(&q->p->pool_m);
}
void hts_tpool_process_ref_decr(hts_tpool_process *q) {
pthread_mutex_lock(&q->p->pool_m);
if (--q->ref_count <= 0) {
pthread_mutex_unlock(&q->p->pool_m);
hts_tpool_process_destroy(q);
return;
}
pthread_mutex_unlock(&q->p->pool_m);
}
int hts_tpool_process_len(hts_tpool_process *q) {
int len;
pthread_mutex_lock(&q->p->pool_m);
len = q->n_output;
pthread_mutex_unlock(&q->p->pool_m);
return len;
}
int hts_tpool_process_sz(hts_tpool_process *q) {
int len;
pthread_mutex_lock(&q->p->pool_m);
len = q->n_output + q->n_input + q->n_processing;
pthread_mutex_unlock(&q->p->pool_m);
return len;
}
void hts_tpool_process_shutdown(hts_tpool_process *q) {
pthread_mutex_lock(&q->p->pool_m);
q->shutdown = 1;
pthread_cond_broadcast(&q->output_avail_c);
pthread_cond_broadcast(&q->input_not_full_c);
pthread_cond_broadcast(&q->input_empty_c);
pthread_cond_broadcast(&q->none_processing_c);
pthread_mutex_unlock(&q->p->pool_m);
}
void hts_tpool_delete_result(hts_tpool_result *r, int free_data) {
if (!r)
return;
if (free_data && r->data)
free(r->data);
free(r);
}
void *hts_tpool_result_data(hts_tpool_result *r) {
return r->data;
}
hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
hts_tpool_process *q = malloc(sizeof(*q));
pthread_cond_init(&q->output_avail_c, NULL);
pthread_cond_init(&q->input_not_full_c, NULL);
pthread_cond_init(&q->input_empty_c, NULL);
pthread_cond_init(&q->none_processing_c,NULL);
q->p = p;
q->input_head = NULL;
q->input_tail = NULL;
q->output_head = NULL;
q->output_tail = NULL;
q->next_serial = 0;
q->curr_serial = 0;
q->n_input = 0;
q->n_output = 0;
q->n_processing= 0;
q->qsize = qsize;
q->in_only = in_only;
q->shutdown = 0;
q->wake_dispatch = 0;
q->ref_count = 1;
q->next = NULL;
q->prev = NULL;
hts_tpool_process_attach(p, q);
return q;
}
void hts_tpool_process_destroy(hts_tpool_process *q) {
DBG_OUT(stderr, "Destroying results queue %p\n", q);
if (!q)
return;
hts_tpool_process_reset(q, 0);
pthread_mutex_lock(&q->p->pool_m);
hts_tpool_process_detach(q->p, q);
hts_tpool_process_shutdown(q);
if (--q->ref_count > 0) {
pthread_mutex_unlock(&q->p->pool_m);
return;
}
pthread_cond_destroy(&q->output_avail_c);
pthread_cond_destroy(&q->input_not_full_c);
pthread_cond_destroy(&q->input_empty_c);
pthread_cond_destroy(&q->none_processing_c);
pthread_mutex_unlock(&q->p->pool_m);
free(q);
DBG_OUT(stderr, "Destroyed results queue %p\n", q);
}
void hts_tpool_process_attach(hts_tpool *p, hts_tpool_process *q) {
pthread_mutex_lock(&p->pool_m);
if (p->q_head) {
q->next = p->q_head;
q->prev = p->q_head->prev;
p->q_head->prev->next = q;
p->q_head->prev = q;
} else {
q->next = q;
q->prev = q;
}
p->q_head = q;
assert(p->q_head && p->q_head->prev && p->q_head->next);
pthread_mutex_unlock(&p->pool_m);
}
void hts_tpool_process_detach(hts_tpool *p, hts_tpool_process *q) {
pthread_mutex_lock(&p->pool_m);
if (!p->q_head || !q->prev || !q->next)
goto done;
hts_tpool_process *curr = p->q_head, *first = curr;
do {
if (curr == q) {
q->next->prev = q->prev;
q->prev->next = q->next;
p->q_head = q->next;
q->next = q->prev = NULL;
if (p->q_head == q)
p->q_head = NULL;
break;
}
curr = curr->next;
} while (curr != first);
done:
pthread_mutex_unlock(&p->pool_m);
}
#define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
static void *tpool_worker(void *arg) {
hts_tpool_worker *w = (hts_tpool_worker *)arg;
hts_tpool *p = w->p;
hts_tpool_job *j;
for (;;) {
pthread_mutex_lock(&p->pool_m);
assert(p->q_head == 0 || (p->q_head->prev && p->q_head->next));
int work_to_do = 0;
hts_tpool_process *first = p->q_head, *q = first;
do {
if (p->shutdown)
break;
if (q && q->input_head && q->qsize - q->n_output > p->tsize - p->nwaiting) {
work_to_do = 1;
break;
}
if (q) q = q->next;
} while (q && q != first);
if (p->shutdown) {
shutdown:
#ifdef DEBUG
fprintf(stderr, "%d: Shutting down\n", worker_id(p));
#endif
pthread_mutex_unlock(&p->pool_m);
return NULL;
}
if (!work_to_do) {
p->nwaiting++;
if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
p->t_stack_top = w->idx;
p->t_stack[w->idx] = 1;
pthread_cond_wait(&w->pending_c, &p->pool_m);
p->t_stack[w->idx] = 0;
int i;
p->t_stack_top = -1;
for (i = 0; i < p->tsize; i++) {
if (p->t_stack[i]) {
p->t_stack_top = i;
break;
}
}
p->nwaiting--;
pthread_mutex_unlock(&p->pool_m);
continue; }
q->ref_count++;
while (q->input_head && q->qsize - q->n_output > q->n_processing) {
if (p->shutdown)
goto shutdown;
j = q->input_head;
assert(j->p == p);
if (!(q->input_head = j->next))
q->input_tail = NULL;
q->n_processing++;
if (q->n_input-- >= q->qsize)
pthread_cond_broadcast(&q->input_not_full_c);
if (q->n_input == 0)
pthread_cond_signal(&q->input_empty_c);
p->njobs--;
pthread_mutex_unlock(&p->pool_m);
DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
worker_id(j->p), q, j->serial);
hts_tpool_add_result(j, j->func(j->arg));
free(j);
pthread_mutex_lock(&p->pool_m);
}
if (--q->ref_count == 0) hts_tpool_process_destroy(q);
else
p->q_head = q->next;
pthread_mutex_unlock(&p->pool_m);
}
}
static void wake_next_worker(hts_tpool_process *q, int locked) {
hts_tpool *p = q->p;
if (!locked)
pthread_mutex_lock(&p->pool_m);
assert(q->prev && q->next); p->q_head = q;
assert(p->njobs >= q->n_input);
int running = p->tsize - p->nwaiting;
int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting
&& (!q || q->n_processing < q->qsize - q->n_output);
#ifdef AVG_USAGE
if (++p->n_count == 256) {
p->n_count >>= 1;
p->n_running >>= 1;
}
p->n_running += running;
if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
#endif
if (0) {
printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs,
running, q->n_output, q->qsize - q->n_output,
p->n_running/p->n_count, sig);
int i;
for (i = 0; i < p->tsize; i++)
putchar("x "[p->t_stack[i]]);
putchar('\n');
}
if (sig)
pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
if (!locked)
pthread_mutex_unlock(&p->pool_m);
}
hts_tpool *hts_tpool_init(int n) {
int i;
hts_tpool *p = malloc(sizeof(*p));
p->tsize = n;
p->njobs = 0;
p->nwaiting = 0;
p->shutdown = 0;
p->q_head = NULL;
p->t_stack = NULL;
p->n_count = 0;
p->n_running = 0;
p->t = malloc(n * sizeof(p->t[0]));
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&p->pool_m, &attr);
pthread_mutexattr_destroy(&attr);
if (!(p->t_stack = malloc(n * sizeof(*p->t_stack))))
return NULL;
p->t_stack_top = -1;
pthread_mutex_lock(&p->pool_m);
for (i = 0; i < n; i++) {
hts_tpool_worker *w = &p->t[i];
p->t_stack[i] = 0;
w->p = p;
w->idx = i;
pthread_cond_init(&w->pending_c, NULL);
if (0 != pthread_create(&w->tid, NULL, tpool_worker, w)) {
pthread_mutex_unlock(&p->pool_m);
return NULL;
}
}
pthread_mutex_unlock(&p->pool_m);
return p;
}
int hts_tpool_size(hts_tpool *p) {
return p->tsize;
}
int hts_tpool_dispatch(hts_tpool *p, hts_tpool_process *q,
void *(*func)(void *arg), void *arg) {
return hts_tpool_dispatch2(p, q, func, arg, 0);
}
int hts_tpool_dispatch2(hts_tpool *p, hts_tpool_process *q,
void *(*func)(void *arg), void *arg, int nonblock) {
hts_tpool_job *j;
pthread_mutex_lock(&p->pool_m);
DBG_OUT(stderr, "Dispatching job for queue %p, serial %"PRId64"\n",
q, q->curr_serial);
if (q->n_input >= q->qsize && nonblock == 1) {
pthread_mutex_unlock(&p->pool_m);
errno = EAGAIN;
return -1;
}
if (!(j = malloc(sizeof(*j)))) {
pthread_mutex_unlock(&p->pool_m);
return -1;
}
j->func = func;
j->arg = arg;
j->next = NULL;
j->p = p;
j->q = q;
j->serial = q->curr_serial++;
if (nonblock == 0) {
while (q->n_input >= q->qsize && !q->shutdown && !q->wake_dispatch)
pthread_cond_wait(&q->input_not_full_c, &q->p->pool_m);
if (q->shutdown) {
free(j);
pthread_mutex_unlock(&p->pool_m);
return -1;
}
if (q->wake_dispatch) {
q->wake_dispatch = 0;
}
}
p->njobs++; q->n_input++;
if (q->input_tail) {
q->input_tail->next = j;
q->input_tail = j;
} else {
q->input_head = q->input_tail = j;
}
DBG_OUT(stderr, "Dispatched (serial %"PRId64")\n", j->serial);
if (!q->shutdown)
wake_next_worker(q, 1);
pthread_mutex_unlock(&p->pool_m);
return 0;
}
void hts_tpool_wake_dispatch(hts_tpool_process *q) {
pthread_mutex_lock(&q->p->pool_m);
q->wake_dispatch = 1;
pthread_cond_signal(&q->input_not_full_c);
pthread_mutex_unlock(&q->p->pool_m);
}
int hts_tpool_process_flush(hts_tpool_process *q) {
int i;
hts_tpool *p = q->p;
DBG_OUT(stderr, "Flushing pool %p\n", p);
pthread_mutex_lock(&p->pool_m);
for (i = 0; i < p->tsize; i++)
if (p->t_stack[i])
pthread_cond_signal(&p->t[i].pending_c);
if (q->qsize < q->n_output + q->n_input + q->n_processing)
q->qsize = q->n_output + q->n_input + q->n_processing;
while (q->n_input || q->n_processing) {
while (q->n_input)
pthread_cond_wait(&q->input_empty_c, &p->pool_m);
if (q->shutdown) break;
while (q->n_processing)
pthread_cond_wait(&q->none_processing_c, &p->pool_m);
if (q->shutdown) break;
}
pthread_mutex_unlock(&p->pool_m);
DBG_OUT(stderr, "Flushed complete for pool %p, queue %p\n", p, q);
return 0;
}
int hts_tpool_process_reset(hts_tpool_process *q, int free_results) {
pthread_mutex_lock(&q->p->pool_m);
q->next_serial = INT_MAX;
hts_tpool_job *j, *jn;
for (j = q->input_head; j; j = jn) {
jn = j->next;
free(j);
}
q->input_head = q->input_tail = NULL;
q->n_input = 0;
hts_tpool_result *r, *rn;
for (r = q->output_head; r; r = rn) {
rn = r->next;
hts_tpool_delete_result(r, free_results);
}
q->output_head = q->output_tail = NULL;
q->n_output = 0;
pthread_mutex_unlock(&q->p->pool_m);
if (hts_tpool_process_flush(q) != 0)
return -1;
pthread_mutex_lock(&q->p->pool_m);
for (r = q->output_head; r; r = rn) {
rn = r->next;
hts_tpool_delete_result(r, free_results);
}
q->output_head = q->output_tail = NULL;
q->n_output = 0;
q->next_serial = q->curr_serial = 0;
pthread_cond_signal(&q->input_not_full_c);
pthread_mutex_unlock(&q->p->pool_m);
return 0;
}
int hts_tpool_process_qsize(hts_tpool_process *q) {
return q->qsize;
}
void hts_tpool_destroy(hts_tpool *p) {
int i;
DBG_OUT(stderr, "Destroying pool %p\n", p);
pthread_mutex_lock(&p->pool_m);
p->shutdown = 1;
DBG_OUT(stderr, "Sending shutdown request\n");
for (i = 0; i < p->tsize; i++)
pthread_cond_signal(&p->t[i].pending_c);
pthread_mutex_unlock(&p->pool_m);
DBG_OUT(stderr, "Shutdown complete\n");
for (i = 0; i < p->tsize; i++)
pthread_join(p->t[i].tid, NULL);
pthread_mutex_destroy(&p->pool_m);
for (i = 0; i < p->tsize; i++)
pthread_cond_destroy(&p->t[i].pending_c);
if (p->t_stack)
free(p->t_stack);
free(p->t);
free(p);
DBG_OUT(stderr, "Destroyed pool %p\n", p);
}
void hts_tpool_kill(hts_tpool *p) {
int i;
DBG_OUT(stderr, "Destroying pool %p, kill=%d\n", p, kill);
for (i = 0; i < p->tsize; i++)
pthread_kill(p->t[i].tid, SIGINT);
pthread_mutex_destroy(&p->pool_m);
for (i = 0; i < p->tsize; i++)
pthread_cond_destroy(&p->t[i].pending_c);
if (p->t_stack)
free(p->t_stack);
free(p->t);
free(p);
DBG_OUT(stderr, "Destroyed pool %p\n", p);
}
#ifdef TEST_MAIN
#include <stdio.h>
#ifndef TASK_SIZE
#define TASK_SIZE 1000
#endif
void *doit_square_u(void *arg) {
int job = *(int *)arg;
usleep(random() % 100000);
printf("RESULT: %d\n", job*job);
free(arg);
return NULL;
}
int test_square_u(int n) {
hts_tpool *p = hts_tpool_init(n);
hts_tpool_process *q = hts_tpool_process_init(p, n*2, 1);
int i;
for (i = 0; i < TASK_SIZE; i++) {
int *ip = malloc(sizeof(*ip));
*ip = i;
hts_tpool_dispatch(p, q, doit_square_u, ip);
}
hts_tpool_process_flush(q);
hts_tpool_process_destroy(q);
hts_tpool_destroy(p);
return 0;
}
void *doit_square(void *arg) {
int job = *(int *)arg;
int *res;
usleep(500000 * ((job&31)==31) + random() % 10000);
res = malloc(sizeof(*res));
*res = (job<0) ? -job*job : job*job;
free(arg);
return res;
}
int test_square(int n) {
hts_tpool *p = hts_tpool_init(n);
hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
int i;
hts_tpool_result *r;
for (i = 0; i < TASK_SIZE; i++) {
int *ip = malloc(sizeof(*ip));
*ip = i;
int blk;
do {
blk = hts_tpool_dispatch2(p, q, doit_square, ip, 1);
if ((r = hts_tpool_next_result(q))) {
printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
hts_tpool_delete_result(r, 1);
}
if (blk == -1) {
putchar('.'); fflush(stdout);
usleep(10000);
}
} while (blk == -1);
}
hts_tpool_process_flush(q);
while ((r = hts_tpool_next_result(q))) {
printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
hts_tpool_delete_result(r, 1);
}
hts_tpool_process_destroy(q);
hts_tpool_destroy(p);
return 0;
}
struct squareB_opt {
hts_tpool *p;
hts_tpool_process *q;
int n;
};
static void *test_squareB_dispatcher(void *arg) {
struct squareB_opt *o = (struct squareB_opt *)arg;
int i, *ip;
for (i = 0; i < o->n; i++) {
ip = malloc(sizeof(*ip));
*ip = i;
hts_tpool_dispatch(o->p, o->q, doit_square, ip);
}
*(ip = malloc(sizeof(*ip))) = -1;
hts_tpool_dispatch(o->p, o->q, doit_square, ip);
pthread_exit(NULL);
}
int test_squareB(int n) {
hts_tpool *p = hts_tpool_init(n);
hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
struct squareB_opt o = {p, q, TASK_SIZE};
pthread_t tid;
pthread_create(&tid, NULL, test_squareB_dispatcher, &o);
for(;;) {
hts_tpool_result *r = hts_tpool_next_result_wait(q);
int x = *(int *)hts_tpool_result_data(r);
hts_tpool_delete_result(r, 1);
if (x == -1)
break;
printf("RESULT: %d\n", x);
}
hts_tpool_process_flush(q);
assert(hts_tpool_next_result(q) == NULL);
hts_tpool_process_destroy(q);
hts_tpool_destroy(p);
pthread_join(tid, NULL);
return 0;
}
static void *pipe_input_thread(void *arg);
static void *pipe_stage1(void *arg);
static void *pipe_stage2(void *arg);
static void *pipe_stage3(void *arg);
static void *pipe_output_thread(void *arg);
typedef struct {
hts_tpool *p;
hts_tpool_process *q1;
hts_tpool_process *q2;
hts_tpool_process *q3;
int n;
} pipe_opt;
typedef struct {
pipe_opt *o;
unsigned int x;
int eof; } pipe_job;
static void *pipe_input_thread(void *arg) {
pipe_opt *o = (pipe_opt *)arg;
int i;
for (i = 1; i <= o->n; i++) {
pipe_job *j = malloc(sizeof(*j));
j->o = o;
j->x = i;
j->eof = (i == o->n);
printf("I %08x\n", j->x);
if (hts_tpool_dispatch(o->p, o->q1, pipe_stage1, j) != 0) {
free(j);
pthread_exit((void *)1);
}
}
pthread_exit(NULL);
}
static void *pipe_stage1(void *arg) {
pipe_job *j = (pipe_job *)arg;
j->x <<= 8;
usleep(random() % 10000); printf("1 %08x\n", j->x);
return j;
}
static void *pipe_stage1to2(void *arg) {
pipe_opt *o = (pipe_opt *)arg;
hts_tpool_result *r;
while ((r = hts_tpool_next_result_wait(o->q1))) {
pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
hts_tpool_delete_result(r, 0);
if (hts_tpool_dispatch(j->o->p, j->o->q2, pipe_stage2, j) != 0)
pthread_exit((void *)1);
if (j->eof)
break;
}
pthread_exit(NULL);
}
static void *pipe_stage2(void *arg) {
pipe_job *j = (pipe_job *)arg;
j->x <<= 8;
usleep(random() % 100000); printf("2 %08x\n", j->x);
return j;
}
static void *pipe_stage2to3(void *arg) {
pipe_opt *o = (pipe_opt *)arg;
hts_tpool_result *r;
while ((r = hts_tpool_next_result_wait(o->q2))) {
pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
hts_tpool_delete_result(r, 0);
if (hts_tpool_dispatch(j->o->p, j->o->q3, pipe_stage3, j) != 0)
pthread_exit((void *)1);
if (j->eof)
break;
}
pthread_exit(NULL);
}
static void *pipe_stage3(void *arg) {
pipe_job *j = (pipe_job *)arg;
usleep(random() % 10000); j->x <<= 8;
return j;
}
static void *pipe_output_thread(void *arg) {
pipe_opt *o = (pipe_opt *)arg;
hts_tpool_result *r;
while ((r = hts_tpool_next_result_wait(o->q3))) {
pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
int eof = j->eof;
printf("O %08x\n", j->x);
hts_tpool_delete_result(r, 1);
if (eof)
break;
}
pthread_exit(NULL);
}
int test_pipe(int n) {
hts_tpool *p = hts_tpool_init(n);
hts_tpool_process *q1 = hts_tpool_process_init(p, n*2, 0);
hts_tpool_process *q2 = hts_tpool_process_init(p, n*2, 0);
hts_tpool_process *q3 = hts_tpool_process_init(p, n*2, 0);
pipe_opt o = {p, q1, q2, q3, TASK_SIZE};
pthread_t tidIto1, tid1to2, tid2to3, tid3toO;
void *retv;
int ret;
pthread_create(&tidIto1, NULL, pipe_input_thread, &o);
pthread_create(&tid1to2, NULL, pipe_stage1to2, &o);
pthread_create(&tid2to3, NULL, pipe_stage2to3, &o);
pthread_create(&tid3toO, NULL, pipe_output_thread, &o);
ret = 0;
pthread_join(tidIto1, &retv); ret |= (retv != NULL);
pthread_join(tid1to2, &retv); ret |= (retv != NULL);
pthread_join(tid2to3, &retv); ret |= (retv != NULL);
pthread_join(tid3toO, &retv); ret |= (retv != NULL);
printf("Return value %d\n", ret);
hts_tpool_process_destroy(q1);
hts_tpool_process_destroy(q2);
hts_tpool_process_destroy(q3);
hts_tpool_destroy(p);
return 0;
}
int main(int argc, char **argv) {
int n;
srandom(0);
if (argc < 3) {
fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
fprintf(stderr, "Where commands are:\n\n");
fprintf(stderr, "unordered # Unordered output\n");
fprintf(stderr, "ordered1 # Main thread with non-block API\n");
fprintf(stderr, "ordered2 # Dispatch thread, blocking API\n");
fprintf(stderr, "pipe # Multi-stage pipeline, several queues\n");
exit(1);
}
n = atoi(argv[2]);
if (strcmp(argv[1], "unordered") == 0) return test_square_u(n);
if (strcmp(argv[1], "ordered1") == 0) return test_square(n);
if (strcmp(argv[1], "ordered2") == 0) return test_squareB(n);
if (strcmp(argv[1], "pipe") == 0) return test_pipe(n);
fprintf(stderr, "Unknown sub-command\n");
exit(1);
}
#endif