#include <stdint.h>
#include "ringbuf.h"
#include "util.h"
#include "unittest.h"
static void
fill_fetch_all(void)
{
#define RINGBUF_LEN 16
struct ringbuf *rbuf = ringbuf_new(RINGBUF_LEN);
UT_ASSERTne(rbuf, NULL);
for (uint64_t i = 1; i <= RINGBUF_LEN; ++i) {
ringbuf_enqueue(rbuf, (void *)i);
}
UT_ASSERTne(ringbuf_tryenqueue(rbuf, (void *)1), 0);
for (uint64_t i = 1; i <= RINGBUF_LEN; ++i) {
void *data = ringbuf_dequeue(rbuf);
UT_ASSERTeq(data, (void *)i);
}
UT_ASSERTeq(ringbuf_trydequeue(rbuf), NULL);
ringbuf_delete(rbuf);
#undef RINGBUF_LEN
}
struct th_msg {
int th_id;
int msg_id;
int consumed;
};
struct th_arg {
int th_id;
int nmsg;
struct th_msg *msgs;
int nconsumers;
int nproducers;
long long *consumers_msg_sum;
int *msg_per_producer_sum;
struct ringbuf *rbuf;
};
static void *
producer(void *arg)
{
struct th_arg *thp = arg;
struct th_msg *m;
for (int i = 0; i < thp->nmsg; ++i) {
m = &thp->msgs[i];
m->th_id = thp->th_id;
m->msg_id = i;
UT_ASSERTeq(m->consumed, 0);
ringbuf_enqueue(thp->rbuf, m);
}
return NULL;
}
static void *
consumer(void *arg)
{
struct th_arg *thp = arg;
struct th_msg *m;
int *last_msg_id = MALLOC(sizeof(int) * thp->nproducers);
for (int i = 0; i < thp->nproducers; ++i)
last_msg_id[i] = -1;
for (int i = 0; i < thp->nmsg; ++i) {
m = ringbuf_dequeue_s(thp->rbuf, sizeof(struct th_msg));
long long nmsg_consumed = util_fetch_and_add32(
&thp->msg_per_producer_sum[m->th_id], 1);
util_fetch_and_add32(&m->consumed, 1);
if (thp->nconsumers == 1) {
UT_ASSERTeq(last_msg_id[m->th_id], m->msg_id - 1);
last_msg_id[m->th_id] = m->msg_id;
}
util_fetch_and_add64(thp->consumers_msg_sum, m->msg_id);
UT_ASSERT(nmsg_consumed - (thp->nproducers / 2) <= m->msg_id ||
nmsg_consumed + (thp->nproducers / 2) >= m->msg_id);
}
FREE(last_msg_id);
return NULL;
}
static void
many_consumers_many_producers(int nconsumers, int nproducers, int msg_total)
{
#define RINGBUF_LEN 256
os_thread_t *consumers = MALLOC(sizeof(os_thread_t) * nconsumers);
os_thread_t *producers = MALLOC(sizeof(os_thread_t) * nproducers);
long long consumers_msg_sum = 0;
int *msg_per_producer_sum = ZALLOC(sizeof(int) * nproducers);
struct th_arg arg_proto = {
.th_id = 0,
.nmsg = 0,
.msgs = NULL,
.nconsumers = nconsumers,
.nproducers = nproducers,
.rbuf = ringbuf_new(RINGBUF_LEN),
.consumers_msg_sum = &consumers_msg_sum,
.msg_per_producer_sum = msg_per_producer_sum,
};
UT_ASSERTne(arg_proto.rbuf, NULL);
int msg_per_producer = msg_total / nproducers;
int msg_per_consumer = msg_total / nconsumers;
struct th_arg *targ;
struct th_arg **producer_args =
MALLOC(sizeof(struct th_arg *) * nproducers);
struct th_arg **consumer_args =
MALLOC(sizeof(struct th_arg *) * nconsumers);
UT_ASSERTeq(msg_total % nproducers, 0);
UT_ASSERTeq(msg_total % nconsumers, 0);
for (int i = 0; i < nconsumers; ++i) {
targ = MALLOC(sizeof(*targ));
*targ = arg_proto;
targ->th_id = i;
targ->nmsg = msg_per_consumer;
consumer_args[i] = targ;
PTHREAD_CREATE(&consumers[i], NULL, consumer, targ);
}
for (int i = 0; i < nproducers; ++i) {
targ = MALLOC(sizeof(*targ));
*targ = arg_proto;
targ->th_id = i;
targ->nmsg = msg_per_producer;
targ->msgs = ZALLOC(sizeof(struct th_msg) * msg_per_producer);
producer_args[i] = targ;
PTHREAD_CREATE(&producers[i], NULL, producer, targ);
}
for (int i = 0; i < nproducers; ++i)
PTHREAD_JOIN(&producers[i], NULL);
for (int i = 0; i < nconsumers; ++i)
PTHREAD_JOIN(&consumers[i], NULL);
long long expected_sum = 0;
for (int i = 0; i < nproducers; ++i) {
for (int j = 0; j < msg_per_producer; ++j) {
expected_sum += j;
UT_ASSERTeq(producer_args[i]->msgs[j].consumed, 1);
}
FREE(producer_args[i]->msgs);
FREE(producer_args[i]);
}
for (int i = 0; i < nconsumers; ++i) {
FREE(consumer_args[i]);
}
UT_ASSERTeq(consumers_msg_sum, expected_sum);
ringbuf_delete(arg_proto.rbuf);
FREE(producer_args);
FREE(consumer_args);
FREE(msg_per_producer_sum);
FREE(consumers);
FREE(producers);
#undef RINGBUF_LEN
}
int
main(int argc, char *argv[])
{
START(argc, argv, "obj_ringbuf");
fill_fetch_all();
many_consumers_many_producers(1, 1, 1000000);
many_consumers_many_producers(1, 10, 1000000);
many_consumers_many_producers(10, 1, 1000000);
many_consumers_many_producers(10, 10, 1000000);
DONE(NULL);
}