#include <cassert>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <unistd.h>
#include "benchmark.hpp"
#include "libpmemlog.h"
#include "os.h"
#define POOL_HDR_SIZE (3 * 4096)
#define MIN_VEC_SIZE 1
/*
 * prog_args -- benchmark specific command line options
 *
 * Most fields are filled in through the log_clo[] option table;
 * el_size is overwritten by log_init() with args->dsize.
 */
struct prog_args {
unsigned seed; /* "seed" option */
bool rand; /* "random" option: use random sizes for append/read */
int vec_size; /* "vector" option: number of elements per vector */
size_t el_size; /* size of a single element, set in log_init() */
size_t min_size; /* "min-size" option: minimum size in random mode */
bool no_warmup; /* "no-warmup" option: skip do_warmup() */
bool fileio; /* "file-io" option: use plain file I/O instead of pmemlog */
};
/*
 * log_worker_info -- per-worker state, allocated in log_init_worker() and
 * released in log_free_worker()
 */
struct log_worker_info {
unsigned seed; /* seed passed to os_rand_r() when generating rand_sizes */
struct iovec *iov; /* n_ops_per_thread * vec_size iovec entries */
char *buf; /* data buffer of buf_size bytes */
size_t buf_size; /* el_size * vec_size */
size_t buf_ptr; /* current offset in buf, used by the read operations */
size_t *rand_sizes; /* random element sizes, NULL when not in random mode */
size_t *vec_sizes; /* total number of bytes of each operation's vector */
};
/*
 * log_bench -- benchmark-wide state, allocated in log_init() and released in
 * log_exit()
 */
struct log_bench {
size_t psize; /* computed pool size; set to 0 when a poolset is used */
PMEMlogpool *plp; /* pmemlog pool handle (pmemlog mode) */
struct prog_args *args; /* benchmark specific options */
int fd; /* file descriptor (file I/O mode) */
unsigned seed; /* seed used to derive the per-worker seeds */
/* NOTE(review): not referenced anywhere in the visible code */
int (*func_op)(struct benchmark *, struct operation_info *);
};
/*
 * do_warmup -- touch the pool/file before the measured run
 *
 * Appends (pmemlog mode) or writes (file I/O mode) nops zero-filled elements
 * of el_size bytes each and then rewinds the log / seeks back to the start
 * of the file.
 *
 * The file descriptor and the pool handle are owned by the caller: this
 * function never closes them, so the caller's error path can release them
 * exactly once.
 *
 * Returns 0 on success, -1 on failure.
 */
static int
do_warmup(struct log_bench *lb, size_t nops)
{
	int ret = 0;
	size_t bsize = lb->args->vec_size * lb->args->el_size;
	char *buf = (char *)calloc(1, bsize);
	if (!buf) {
		perror("calloc");
		return -1;
	}

	if (!lb->args->fileio) {
		for (size_t i = 0; i < nops; i++) {
			if (pmemlog_append(lb->plp, buf, lb->args->el_size) <
			    0) {
				ret = -1;
				perror("pmemlog_append");
				goto out;
			}
		}

		pmemlog_rewind(lb->plp);
	} else {
		for (size_t i = 0; i < nops; i++) {
			if (write(lb->fd, buf, (unsigned)lb->args->el_size) !=
			    (ssize_t)lb->args->el_size) {
				ret = -1;
				perror("write");
				/* fd is closed by the caller, not here */
				goto out;
			}
		}

		if (os_lseek(lb->fd, 0, SEEK_SET) < 0) {
			ret = -1;
			perror("lseek");
		}
	}

out:
	free(buf);
	return ret;
}
/*
 * log_append -- benchmark operation: append a single element to the log
 *
 * The element size is el_size, or the pre-generated random size for this
 * operation index when random mode is on.
 */
static int
log_append(struct benchmark *bench, struct operation_info *info)
{
	struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);
	assert(lb);

	struct log_worker_info *winfo =
		(struct log_worker_info *)info->worker->priv;
	assert(winfo);

	size_t nbytes = lb->args->el_size;
	if (lb->args->rand)
		nbytes = winfo->rand_sizes[info->index];

	if (pmemlog_append(lb->plp, winfo->buf, nbytes) >= 0)
		return 0;

	perror("pmemlog_append");
	return -1;
}
/*
 * log_appendv -- benchmark operation: append one vector of vec_size elements
 * to the log with pmemlog_appendv()
 */
static int
log_appendv(struct benchmark *bench, struct operation_info *info)
{
	struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);
	assert(lb);

	struct log_worker_info *winfo =
		(struct log_worker_info *)info->worker->priv;
	assert(winfo);

	/* each operation owns vec_size consecutive iovec entries */
	int nvec = lb->args->vec_size;
	struct iovec *vec = winfo->iov + info->index * nvec;

	if (pmemlog_appendv(lb->plp, vec, nvec) >= 0)
		return 0;

	perror("pmemlog_appendv");
	return -1;
}
/*
 * fileio_append -- benchmark operation: write a single element to the file
 *
 * The element size is el_size, or the pre-generated random size for this
 * operation index when random mode is on. A short write is treated as
 * a failure.
 */
static int
fileio_append(struct benchmark *bench, struct operation_info *info)
{
	struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);
	assert(lb);

	struct log_worker_info *winfo =
		(struct log_worker_info *)info->worker->priv;
	assert(winfo);

	size_t nbytes = lb->args->el_size;
	if (lb->args->rand)
		nbytes = winfo->rand_sizes[info->index];

	ssize_t written = write(lb->fd, winfo->buf, (unsigned)nbytes);
	if (written == (ssize_t)nbytes)
		return 0;

	perror("write");
	return -1;
}
/*
 * fileio_appendv -- benchmark operation: write one vector of vec_size
 * elements to the file with os_writev()
 *
 * The call fails unless exactly vec_sizes[index] bytes were written.
 */
static int
fileio_appendv(struct benchmark *bench, struct operation_info *info)
{
	struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);
	assert(lb != NULL);

	struct log_worker_info *winfo =
		(struct log_worker_info *)info->worker->priv;
	assert(winfo);

	/* each operation owns vec_size consecutive iovec entries */
	int nvec = lb->args->vec_size;
	struct iovec *vec = winfo->iov + info->index * nvec;
	size_t expected = winfo->vec_sizes[info->index];

	if (os_writev(lb->fd, vec, nvec) == (ssize_t)expected)
		return 0;

	perror("writev");
	return -1;
}
/*
 * log_process_data -- callback handed to pmemlog_walk()
 *
 * Copies the chunk into the worker's buffer, treating the buffer as cyclic:
 * when the chunk does not fit in the remaining space, writing restarts from
 * the beginning of the buffer. The chunk is truncated to the buffer size.
 *
 * Always returns 1 (per the libpmemlog API a non-zero value asks
 * pmemlog_walk() to continue with the next chunk).
 */
static int
log_process_data(const void *buf, size_t len, void *arg)
{
	struct log_worker_info *winfo = (struct log_worker_info *)arg;
	size_t room = winfo->buf_size - winfo->buf_ptr;

	if (room < len) {
		/* not enough space at the tail -- wrap around */
		winfo->buf_ptr = 0;
		room = winfo->buf_size;
	}

	if (len > room)
		len = room;
	assert(len <= room);

	memcpy(winfo->buf + winfo->buf_ptr, buf, len);
	winfo->buf_ptr += len;

	return 1;
}
/*
 * fileio_read -- read one chunk of the file into the worker's cyclic buffer
 *
 * Reads up to len bytes from the file position *file_off into the worker's
 * buffer. The buffer is cyclic: when the chunk does not fit in the remaining
 * space, writing restarts from the beginning of the buffer. The file position
 * is tracked separately from the buffer position, so wrapping the buffer
 * does not rewind the file.
 *
 * On a successful read both the buffer position and *file_off are advanced
 * by the number of bytes actually read.
 *
 * Returns 1 when data was read, 0 when nothing more can be read (end of
 * file), -1 when pread() fails.
 */
static int
fileio_read(int fd, ssize_t len, struct log_worker_info *worker_info,
	    size_t *file_off)
{
	ssize_t left = worker_info->buf_size - worker_info->buf_ptr;
	if (len > left) {
		/* not enough space at the tail -- wrap around */
		worker_info->buf_ptr = 0;
		left = worker_info->buf_size;
	}

	len = len < left ? len : left;
	assert(len <= left);

	void *buff = &worker_info->buf[worker_info->buf_ptr];
	ssize_t nread = pread(fd, buff, len, *file_off);
	if (nread < 0)
		return -1;

	/* end of file (or zero-length request): tell the caller to stop */
	if (nread == 0)
		return 0;

	worker_info->buf_ptr += nread;
	*file_off += nread;

	return 1;
}

/*
 * log_read_op -- benchmark operation: read the whole log/file chunk by chunk
 *
 * The chunk size is el_size, or the pre-generated random size for this
 * operation index when random mode is on.
 *
 * In pmemlog mode the log is walked with pmemlog_walk() and 0 is returned.
 * In file I/O mode the file is read from offset 0 until end of file;
 * returns 0 once the end of file is reached, -1 on a read error.
 */
static int
log_read_op(struct benchmark *bench, struct operation_info *info)
{
	struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);
	assert(lb);

	struct log_worker_info *worker_info =
		(struct log_worker_info *)info->worker->priv;
	assert(worker_info);

	worker_info->buf_ptr = 0;

	size_t chunk_size = lb->args->rand
		? worker_info->rand_sizes[info->index]
		: lb->args->el_size;

	if (!lb->args->fileio) {
		pmemlog_walk(lb->plp, chunk_size, log_process_data,
			     worker_info);
		return 0;
	}

	/* every operation starts reading from the beginning of the file */
	size_t file_off = 0;
	int ret;
	while ((ret = fileio_read(lb->fd, chunk_size, worker_info,
				  &file_off)) == 1)
		;

	return ret;
}
static int
log_init_worker(struct benchmark *bench, struct benchmark_args *args,
struct worker_info *worker)
{
int ret = 0;
struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);
size_t i_size, n_vectors;
assert(lb);
struct log_worker_info *worker_info = (struct log_worker_info *)malloc(
sizeof(struct log_worker_info));
if (!worker_info) {
perror("malloc");
return -1;
}
worker_info->buf_size = lb->args->el_size * lb->args->vec_size;
worker_info->buf = (char *)malloc(worker_info->buf_size);
if (!worker_info->buf) {
perror("malloc");
ret = -1;
goto err_free_worker_info;
}
n_vectors = args->n_ops_per_thread;
worker_info->iov = (struct iovec *)malloc(
n_vectors * lb->args->vec_size * sizeof(struct iovec));
if (!worker_info->iov) {
perror("malloc");
ret = -1;
goto err_free_buf;
}
if (lb->args->rand) {
worker_info->seed = (unsigned)os_rand_r(&lb->seed);
size_t n_sizes = args->n_ops_per_thread * lb->args->vec_size;
worker_info->rand_sizes = (size_t *)malloc(
n_sizes * sizeof(*worker_info->rand_sizes));
if (!worker_info->rand_sizes) {
perror("malloc");
ret = -1;
goto err_free_iov;
}
for (size_t i = 0; i < n_sizes; i++) {
uint32_t hr = (uint32_t)os_rand_r(&worker_info->seed);
uint32_t lr = (uint32_t)os_rand_r(&worker_info->seed);
uint64_t r64 = (uint64_t)hr << 32 | lr;
size_t width = lb->args->el_size - lb->args->min_size;
worker_info->rand_sizes[i] =
r64 % width + lb->args->min_size;
}
} else {
worker_info->rand_sizes = NULL;
}
worker_info->vec_sizes = (size_t *)calloc(
args->n_ops_per_thread, sizeof(*worker_info->vec_sizes));
if (!worker_info->vec_sizes) {
perror("malloc\n");
ret = -1;
goto err_free_rand_sizes;
}
i_size = 0;
for (size_t n = 0; n < args->n_ops_per_thread; n++) {
size_t buf_ptr = 0;
size_t vec_off = n * lb->args->vec_size;
for (int i = 0; i < lb->args->vec_size; ++i) {
size_t el_size = lb->args->rand
? worker_info->rand_sizes[i_size++]
: lb->args->el_size;
worker_info->iov[vec_off + i].iov_base =
&worker_info->buf[buf_ptr];
worker_info->iov[vec_off + i].iov_len = el_size;
worker_info->vec_sizes[n] += el_size;
buf_ptr += el_size;
}
}
worker->priv = worker_info;
return 0;
err_free_rand_sizes:
free(worker_info->rand_sizes);
err_free_iov:
free(worker_info->iov);
err_free_buf:
free(worker_info->buf);
err_free_worker_info:
free(worker_info);
return ret;
}
/*
 * log_free_worker -- release everything allocated by log_init_worker()
 */
static void
log_free_worker(struct benchmark *bench, struct benchmark_args *args,
		struct worker_info *worker)
{
	struct log_worker_info *winfo = (struct log_worker_info *)worker->priv;
	assert(winfo);

	/* the arrays are independent of each other; the owner goes last */
	free(winfo->vec_sizes);
	free(winfo->rand_sizes);
	free(winfo->iov);
	free(winfo->buf);
	free(winfo);
}
/*
 * log_init -- benchmark initialization
 *
 * Validates the options, computes the pool size, creates the pmemlog pool
 * (or opens and preallocates the file in file I/O mode), selects the append
 * operation matching the mode and vector size, and optionally performs
 * a warmup run.
 *
 * On success the benchmark state is stored with pmembench_set_priv() and
 * 0 is returned; on failure all acquired resources are released and -1
 * is returned.
 */
static int
log_init(struct benchmark *bench, struct benchmark_args *args)
{
	int ret = 0;
	assert(bench);
	assert(args != NULL);
	assert(args->opts != NULL);

	struct benchmark_info *bench_info;
	struct log_bench *lb =
		(struct log_bench *)malloc(sizeof(struct log_bench));
	if (!lb) {
		perror("malloc");
		return -1;
	}

	/* malloc() does not zero the memory -- give every field a value */
	lb->plp = NULL;
	lb->fd = -1;
	lb->func_op = NULL;

	lb->args = (struct prog_args *)args->opts;
	lb->args->el_size = args->dsize;

	/*
	 * The per-worker seeds are derived from lb->seed in
	 * log_init_worker(), so it must start from the user-supplied seed.
	 */
	lb->seed = lb->args->seed;

	if (lb->args->vec_size == 0)
		lb->args->vec_size = 1;

	if (lb->args->rand && lb->args->min_size > lb->args->el_size) {
		errno = EINVAL;
		ret = -1;
		goto err_free_lb;
	}

	/* a single possible size is not random at all */
	if (lb->args->rand && lb->args->min_size == lb->args->el_size)
		lb->args->rand = false;

	/* calculate a size of a pool */
	lb->psize =
		MMAP_ALIGN_UP(POOL_HDR_SIZE +
			      args->n_ops_per_thread * args->n_threads *
				      lb->args->vec_size * lb->args->el_size);

	if (lb->psize < PMEMLOG_MIN_POOL)
		lb->psize = PMEMLOG_MIN_POOL;

	if (args->is_poolset) {
		if (lb->psize > args->fsize) {
			fprintf(stderr, "insufficient size of poolset\n");
			ret = -1;
			goto err_free_lb;
		}

		/* the size comes from the poolset file */
		lb->psize = 0;
	}

	bench_info = pmembench_get_info(bench);

	/*
	 * NOTE(review): the operation is overridden unconditionally. If
	 * pmembench_get_info() returns the registered benchmark_info, this
	 * also replaces log_read_op for the "log_read" benchmark -- confirm
	 * whether that is intended.
	 */
	if (!lb->args->fileio) {
		if ((lb->plp = pmemlog_create(args->fname, lb->psize,
					      args->fmode)) == NULL) {
			perror("pmemlog_create");
			ret = -1;
			goto err_free_lb;
		}

		bench_info->operation =
			(lb->args->vec_size > 1) ? log_appendv : log_append;
	} else {
		int flags = O_CREAT | O_RDWR | O_SYNC;

		/* create a file if it does not exist */
		if ((lb->fd = os_open(args->fname, flags, args->fmode)) < 0) {
			perror(args->fname);
			ret = -1;
			goto err_free_lb;
		}

		/* allocate the pmem */
		if ((errno = os_posix_fallocate(lb->fd, 0, lb->psize)) != 0) {
			perror("posix_fallocate");
			ret = -1;
			goto err_close;
		}

		bench_info->operation = (lb->args->vec_size > 1)
			? fileio_appendv
			: fileio_append;
	}

	if (!lb->args->no_warmup) {
		size_t warmup_nops = args->n_threads * args->n_ops_per_thread;
		if (do_warmup(lb, warmup_nops)) {
			fprintf(stderr, "warmup failed\n");
			ret = -1;
			goto err_close;
		}
	}

	pmembench_set_priv(bench, lb);

	return 0;

err_close:
	if (lb->args->fileio)
		os_close(lb->fd);
	else
		pmemlog_close(lb->plp);
err_free_lb:
	free(lb);

	return ret;
}
/*
 * log_exit -- benchmark cleanup: close the file or the pool, depending on
 * the mode, and free the benchmark state
 */
static int
log_exit(struct benchmark *bench, struct benchmark_args *args)
{
	struct log_bench *lb = (struct log_bench *)pmembench_get_priv(bench);

	if (lb->args->fileio)
		os_close(lb->fd);
	else
		pmemlog_close(lb->plp);

	free(lb);

	return 0;
}
/* command line options shared by both benchmarks */
static struct benchmark_clo log_clo[6];
/* descriptors of the two registered benchmarks */
static struct benchmark_info log_append_info;
static struct benchmark_info log_read_info;

/*
 * log_costructor -- fill in the option table and the benchmark descriptors,
 * then register the "log_append" and "log_read" benchmarks
 */
CONSTRUCTOR(log_costructor)
void
log_costructor(void)
{
	struct benchmark_clo *opt;
	struct benchmark_info *bi;

	/* -r, --random */
	opt = &log_clo[0];
	opt->opt_short = 'r';
	opt->opt_long = "random";
	opt->descr = "Use random sizes for append/read";
	opt->off = clo_field_offset(struct prog_args, rand);
	opt->type = CLO_TYPE_FLAG;

	/* -S, --seed */
	opt = &log_clo[1];
	opt->opt_short = 'S';
	opt->opt_long = "seed";
	opt->descr = "Random mode";
	opt->off = clo_field_offset(struct prog_args, seed);
	opt->def = "1";
	opt->type = CLO_TYPE_UINT;
	opt->type_uint.size = clo_field_size(struct prog_args, seed);
	opt->type_uint.base = CLO_INT_BASE_DEC;
	opt->type_uint.min = 1;
	opt->type_uint.max = UINT_MAX;

	/* -i, --file-io */
	opt = &log_clo[2];
	opt->opt_short = 'i';
	opt->opt_long = "file-io";
	opt->descr = "File I/O mode";
	opt->off = clo_field_offset(struct prog_args, fileio);
	opt->type = CLO_TYPE_FLAG;

	/* -w, --no-warmup */
	opt = &log_clo[3];
	opt->opt_short = 'w';
	opt->opt_long = "no-warmup";
	opt->descr = "Don't do warmup";
	opt->type = CLO_TYPE_FLAG;
	opt->off = clo_field_offset(struct prog_args, no_warmup);

	/* -m, --min-size */
	opt = &log_clo[4];
	opt->opt_short = 'm';
	opt->opt_long = "min-size";
	opt->descr = "Minimum size of append/read for random mode";
	opt->type = CLO_TYPE_UINT;
	opt->off = clo_field_offset(struct prog_args, min_size);
	opt->def = "1";
	opt->type_uint.size = clo_field_size(struct prog_args, min_size);
	opt->type_uint.base = CLO_INT_BASE_DEC;
	opt->type_uint.min = 1;
	opt->type_uint.max = UINT64_MAX;

	/* -v, --vector: must stay last, log_read leaves it out */
	opt = &log_clo[5];
	opt->opt_short = 'v';
	opt->opt_long = "vector";
	opt->descr = "Vector size";
	opt->off = clo_field_offset(struct prog_args, vec_size);
	opt->def = "1";
	opt->type = CLO_TYPE_INT;
	opt->type_int.size = clo_field_size(struct prog_args, vec_size);
	opt->type_int.base = CLO_INT_BASE_DEC;
	opt->type_int.min = MIN_VEC_SIZE;
	opt->type_int.max = INT_MAX;

	/* "log_append": the operation is chosen later, in log_init() */
	bi = &log_append_info;
	bi->name = "log_append";
	bi->brief = "Benchmark for pmemlog_append() operation";
	bi->init = log_init;
	bi->exit = log_exit;
	bi->multithread = true;
	bi->multiops = true;
	bi->init_worker = log_init_worker;
	bi->free_worker = log_free_worker;
	bi->operation = NULL;
	bi->measure_time = true;
	bi->clos = log_clo;
	bi->nclos = ARRAY_SIZE(log_clo);
	bi->opts_size = sizeof(struct prog_args);
	bi->rm_file = true;
	bi->allow_poolset = true;
	REGISTER_BENCHMARK(log_append_info);

	/* "log_read": same options except the trailing "vector" one */
	bi = &log_read_info;
	bi->name = "log_read";
	bi->brief = "Benchmark for pmemlog_walk() operation";
	bi->init = log_init;
	bi->exit = log_exit;
	bi->multithread = true;
	bi->multiops = true;
	bi->init_worker = log_init_worker;
	bi->free_worker = log_free_worker;
	bi->operation = log_read_op;
	bi->measure_time = true;
	bi->clos = log_clo;
	bi->nclos = ARRAY_SIZE(log_clo) - 1;
	bi->opts_size = sizeof(struct prog_args);
	bi->rm_file = true;
	bi->allow_poolset = true;
	REGISTER_BENCHMARK(log_read_info);
}