#include "valgrind_internal.h"
#include "ringbuf.h"
#include "util.h"
#include "out.h"
#include "os.h"
#include "os_thread.h"
#include "sys_util.h"
#define RINGBUF_MAX_CONSUMER_THREADS 1024
#define CACHELINE_PADDING(type, name)\
union { type name; uint64_t name##_padding[8]; }
struct ringbuf {
CACHELINE_PADDING(uint64_t, read_pos);
CACHELINE_PADDING(uint64_t, write_pos);
CACHELINE_PADDING(os_semaphore_t, nfree);
CACHELINE_PADDING(os_semaphore_t, nused);
unsigned len;
uint64_t len_mask;
int running;
void *data[];
};
struct ringbuf *
ringbuf_new(unsigned length)
{
LOG(4, NULL);
if (util_popcount(length) > 1)
return NULL;
struct ringbuf *rbuf =
Zalloc(sizeof(*rbuf) + (length * sizeof(void *)));
if (rbuf == NULL)
return NULL;
if (os_semaphore_init(&rbuf->nfree, length)) {
Free(rbuf);
return NULL;
}
if (os_semaphore_init(&rbuf->nused, 0)) {
util_semaphore_destroy(&rbuf->nfree);
Free(rbuf);
return NULL;
}
rbuf->read_pos = 0;
rbuf->write_pos = 0;
rbuf->len = length;
rbuf->len_mask = length - 1;
rbuf->running = 1;
return rbuf;
}
unsigned
ringbuf_length(struct ringbuf *rbuf)
{
LOG(4, NULL);
return rbuf->len;
}
void
ringbuf_stop(struct ringbuf *rbuf)
{
LOG(4, NULL);
while (rbuf->read_pos != rbuf->write_pos)
util_synchronize();
int ret = util_bool_compare_and_swap32(&rbuf->running, 1, 0);
ASSERTeq(ret, 1);
for (int64_t i = 0; i < RINGBUF_MAX_CONSUMER_THREADS; ++i)
util_semaphore_post(&rbuf->nused);
}
void
ringbuf_delete(struct ringbuf *rbuf)
{
LOG(4, NULL);
ASSERTeq(rbuf->read_pos, rbuf->write_pos);
util_semaphore_destroy(&rbuf->nfree);
util_semaphore_destroy(&rbuf->nused);
Free(rbuf);
}
static void
ringbuf_enqueue_atomic(struct ringbuf *rbuf, void *data)
{
LOG(4, NULL);
size_t w = util_fetch_and_add64(&rbuf->write_pos, 1) & rbuf->len_mask;
ASSERT(rbuf->running);
while (!util_bool_compare_and_swap64(&rbuf->data[w], NULL, data))
;
VALGRIND_ANNOTATE_HAPPENS_BEFORE(&rbuf->data[w]);
}
int
ringbuf_enqueue(struct ringbuf *rbuf, void *data)
{
LOG(4, NULL);
util_semaphore_wait(&rbuf->nfree);
ringbuf_enqueue_atomic(rbuf, data);
util_semaphore_post(&rbuf->nused);
return 0;
}
int
ringbuf_tryenqueue(struct ringbuf *rbuf, void *data)
{
LOG(4, NULL);
if (util_semaphore_trywait(&rbuf->nfree) != 0)
return -1;
ringbuf_enqueue_atomic(rbuf, data);
util_semaphore_post(&rbuf->nused);
return 0;
}
static void *
ringbuf_dequeue_atomic(struct ringbuf *rbuf)
{
LOG(4, NULL);
size_t r = util_fetch_and_add64(&rbuf->read_pos, 1) & rbuf->len_mask;
void *data = NULL;
VALGRIND_ANNOTATE_HAPPENS_AFTER(&rbuf->data[r]);
do {
while ((data = rbuf->data[r]) == NULL)
util_synchronize();
} while (!util_bool_compare_and_swap64(&rbuf->data[r], data, NULL));
return data;
}
void *
ringbuf_dequeue(struct ringbuf *rbuf)
{
LOG(4, NULL);
util_semaphore_wait(&rbuf->nused);
if (!rbuf->running)
return NULL;
void *data = ringbuf_dequeue_atomic(rbuf);
util_semaphore_post(&rbuf->nfree);
return data;
}
void *
ringbuf_trydequeue(struct ringbuf *rbuf)
{
LOG(4, NULL);
if (util_semaphore_trywait(&rbuf->nused) != 0)
return NULL;
if (!rbuf->running)
return NULL;
void *data = ringbuf_dequeue_atomic(rbuf);
util_semaphore_post(&rbuf->nfree);
return data;
}
void *
ringbuf_trydequeue_s(struct ringbuf *rbuf, size_t data_size)
{
LOG(4, NULL);
void *r = ringbuf_trydequeue(rbuf);
if (r != NULL)
VALGRIND_ANNOTATE_NEW_MEMORY(r, data_size);
return r;
}
void *
ringbuf_dequeue_s(struct ringbuf *rbuf, size_t data_size)
{
LOG(4, NULL);
void *r = ringbuf_dequeue(rbuf);
if (r != NULL)
VALGRIND_ANNOTATE_NEW_MEMORY(r, data_size);
return r;
}