#include <ex_common.h>
#include <sys/stat.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include "libpmemobj.h"
#include "libpmem.h"
#include "libpmemlog.h"
/* layout name handed to pmemobj_open() and pmemobj_create() for this pool */
#define LAYOUT_NAME "obj_pmemlog"
/* pool size main() passes to pmemlog_create(): 100 * 1024 * 1024 bytes */
#define POOL_SIZE ((size_t)(1024 * 1024 * 100))
/*
 * types -- numeric tags for the kinds of objects this example deals with
 *
 * Only LOG_TYPE is used by the code visible in this file; presumably these
 * are libpmemobj type numbers -- confirm against the libpmemobj headers.
 */
enum types {
/* passed as the second argument of pmemobj_tx_alloc() for every log entry */
LOG_TYPE,
/* not referenced by the code visible in this file */
LOG_HDR_TYPE,
/* not referenced by the code visible in this file */
BASE_TYPE,
/* number of enumerators above; not referenced by the code in this file */
MAX_TYPES
};
/*
 * log_hdr -- bookkeeping stored in front of every log entry's data
 */
struct log_hdr {
/* OID of the following entry; OID_NULL on the most recently appended one */
PMEMoid next;
/* number of bytes copied into the entry's data[] by the append routines */
size_t size;
};
/*
 * log -- one log entry: a header followed directly by the payload
 *
 * Entries are allocated with pmemobj_tx_alloc() as
 * sizeof(struct log_hdr) + payload length.
 */
struct log {
struct log_hdr hdr;
/* flexible array member holding hdr.size bytes of payload */
char data[];
};
/*
 * base -- the pool's root object
 *
 * Every entry point obtains it with pmemobj_root(pop, sizeof(struct base)).
 */
struct base {
/* first entry of the singly linked list of log entries (walk starts here) */
PMEMoid head;
/* last entry of the list; new entries are linked after it */
PMEMoid tail;
/*
 * handed to pmemobj_tx_begin() with TX_PARAM_RWLOCK by append/appendv/rewind
 * and taken with pmemobj_rwlock_rdlock() by tell/walk
 */
PMEMrwlock rwlock;
/* running sum of the payload sizes appended; reset to 0 by rewind */
size_t bytes_written;
};
PMEMlogpool *
pmemlog_open(const char *path)
{
return (PMEMlogpool *)pmemobj_open(path, LAYOUT_NAME);
}
PMEMlogpool *
pmemlog_create(const char *path, size_t poolsize, mode_t mode)
{
return (PMEMlogpool *)pmemobj_create(path, LAYOUT_NAME,
poolsize, mode);
}
/*
 * pmemlog_close -- close a pool obtained from pmemlog_open()/pmemlog_create()
 *
 * The handle is really a PMEMobjpool *, so it is cast back and passed to
 * pmemobj_close().
 */
void
pmemlog_close(PMEMlogpool *plp)
{
	PMEMobjpool *pop = (PMEMobjpool *)plp;

	pmemobj_close(pop);
}
/*
 * pmemlog_nbyte -- stub; this implementation always reports 0
 *
 * The pool handle is accepted only for API compatibility and is never
 * consulted.
 */
size_t
pmemlog_nbyte(PMEMlogpool *plp)
{
	(void) plp;	/* intentionally unused */

	return 0;
}
/*
 * pmemlog_append -- add one entry holding `count` bytes from `buf` to the
 * end of the log
 *
 * The entry is allocated, filled and linked behind the current tail inside
 * a single libpmemobj transaction that is started with the root object's
 * rwlock.
 *
 * Returns 0 on success, -1 when the root object is unavailable or the
 * transaction cannot be started, and 1 when control comes back through
 * `env` after the transaction was started (presumably on abort -- confirm
 * against the libpmemobj version in use).
 */
int
pmemlog_append(PMEMlogpool *plp, const void *buf, size_t count)
{
	PMEMobjpool *pop = (PMEMobjpool *)plp;
	PMEMoid baseoid = pmemobj_root(pop, sizeof(struct base));
	struct base *bp = pmemobj_direct(baseoid);

	/* no usable root object: bail out before touching bp->rwlock */
	if (bp == NULL)
		return -1;

	/* return point handed to pmemobj_tx_begin() below */
	jmp_buf env;
	if (setjmp(env)) {
		/* we were jumped back to: close the transaction and report */
		(void) pmemobj_tx_end();
		return 1;
	}

	/* start the transaction, passing the log's rwlock along with it */
	if (pmemobj_tx_begin(pop, env, TX_PARAM_RWLOCK, &bp->rwlock,
			TX_PARAM_NONE))
		return -1;

	/* allocate the new entry: header plus `count` payload bytes */
	PMEMoid log = pmemobj_tx_alloc(count + sizeof(struct log_hdr),
			LOG_TYPE);

	/* fill the fresh entry directly through a plain pointer */
	struct log *logp = pmemobj_direct(log);
	logp->hdr.size = count;
	memcpy(logp->data, buf, count);
	logp->hdr.next = OID_NULL;

	/* register the root object with the transaction before changing it */
	pmemobj_tx_add_range(baseoid, 0, sizeof(struct base));
	if (bp->tail.off == 0) {
		/* empty log: the new entry is also the head */
		bp->head = log;
	} else {
		/* register the old tail too, then link the new entry after it */
		pmemobj_tx_add_range(bp->tail, 0, sizeof(struct log));
		((struct log *)pmemobj_direct(bp->tail))->hdr.next = log;
	}

	bp->tail = log;
	bp->bytes_written += count;

	pmemobj_tx_commit();
	(void) pmemobj_tx_end();
	return 0;
}
/*
 * pmemlog_appendv -- add one log entry per iovec, all in one transaction
 *
 * For each of the `iovcnt` elements of `iov` a new entry is allocated,
 * filled with iov_len bytes from iov_base and linked behind the current
 * tail.
 *
 * Returns 0 on success, -1 when the root object is unavailable or the
 * transaction cannot be started, and 1 when control comes back through
 * `env` after the transaction was started (presumably on abort -- confirm
 * against the libpmemobj version in use).
 */
int
pmemlog_appendv(PMEMlogpool *plp, const struct iovec *iov, int iovcnt)
{
	PMEMobjpool *pop = (PMEMobjpool *)plp;
	PMEMoid baseoid = pmemobj_root(pop, sizeof(struct base));
	struct base *bp = pmemobj_direct(baseoid);

	/* no usable root object: bail out before touching bp->rwlock */
	if (bp == NULL)
		return -1;

	/* return point handed to pmemobj_tx_begin() below */
	jmp_buf env;
	if (setjmp(env)) {
		/* we were jumped back to: close the transaction and report */
		(void) pmemobj_tx_end();
		return 1;
	}

	/* start the transaction, passing the log's rwlock along with it */
	if (pmemobj_tx_begin(pop, env, TX_PARAM_RWLOCK, &bp->rwlock,
			TX_PARAM_NONE))
		return -1;

	/* register the root object with the transaction before changing it */
	pmemobj_tx_add_range(baseoid, 0, sizeof(struct base));

	/*
	 * Only the tail that existed before this call is registered; every
	 * later tail is an entry allocated inside this same transaction.
	 */
	if (!OID_IS_NULL(bp->tail))
		pmemobj_tx_add_range(bp->tail, 0, sizeof(struct log));

	for (int i = 0; i < iovcnt; ++i) {
		const char *buf = iov[i].iov_base;
		size_t count = iov[i].iov_len;

		/* allocate the new entry: header plus `count` payload bytes */
		PMEMoid log = pmemobj_tx_alloc(count + sizeof(struct log_hdr),
				LOG_TYPE);

		struct log *logp = pmemobj_direct(log);
		logp->hdr.size = count;
		/* a zero-length element may carry a NULL base; skip the copy */
		if (count != 0)
			memcpy(logp->data, buf, count);
		logp->hdr.next = OID_NULL;

		if (bp->tail.off == 0) {
			/* empty log: the new entry is also the head */
			bp->head = log;
		} else {
			((struct log *)pmemobj_direct(bp->tail))->hdr.next =
				log;
		}

		bp->tail = log;
		bp->bytes_written += count;
	}

	pmemobj_tx_commit();
	(void) pmemobj_tx_end();
	return 0;
}
/*
 * pmemlog_tell -- report how many payload bytes the log currently holds
 *
 * Reads base.bytes_written while holding the root object's rwlock for
 * reading.  Returns 0 when the root object is unavailable or the lock
 * cannot be taken, which is indistinguishable from an empty log.
 */
long long
pmemlog_tell(PMEMlogpool *plp)
{
	PMEMobjpool *pop = (PMEMobjpool *)plp;
	struct base *bp = pmemobj_direct(pmemobj_root(pop,
			sizeof(struct base)));

	/* no usable root object: nothing to lock, nothing to report */
	if (bp == NULL)
		return 0;

	if (pmemobj_rwlock_rdlock(pop, &bp->rwlock) != 0)
		return 0;

	/* copy the counter out before the lock is dropped */
	long long bytes_written = (long long)bp->bytes_written;

	pmemobj_rwlock_unlock(pop, &bp->rwlock);

	return bytes_written;
}
/*
 * pmemlog_rewind -- discard every entry and reset the log to empty
 *
 * Inside one transaction (started with the root object's rwlock) the list
 * is walked from head, each entry is released with pmemobj_tx_free(), and
 * head, tail and bytes_written are cleared.  Nothing is reported to the
 * caller; if the root object is unavailable or the transaction cannot be
 * started the function just returns.
 */
void
pmemlog_rewind(PMEMlogpool *plp)
{
	PMEMobjpool *pop = (PMEMobjpool *)plp;
	PMEMoid baseoid = pmemobj_root(pop, sizeof(struct base));
	struct base *bp = pmemobj_direct(baseoid);

	/* no usable root object: bail out before touching bp->rwlock */
	if (bp == NULL)
		return;

	/* return point handed to pmemobj_tx_begin() below */
	jmp_buf env;
	if (setjmp(env)) {
		/* we were jumped back to: close the transaction and give up */
		(void) pmemobj_tx_end();
		return;
	}

	/* start the transaction, passing the log's rwlock along with it */
	if (pmemobj_tx_begin(pop, env, TX_PARAM_RWLOCK, &bp->rwlock,
			TX_PARAM_NONE))
		return;

	/* register the root object with the transaction before changing it */
	pmemobj_tx_add_range(baseoid, 0, sizeof(struct base));

	/* free entries front to back, reading `next` before each free */
	while (bp->head.off != 0) {
		PMEMoid nextoid =
			((struct log *)pmemobj_direct(bp->head))->hdr.next;
		pmemobj_tx_free(bp->head);
		bp->head = nextoid;
	}

	bp->head = OID_NULL;
	bp->tail = OID_NULL;
	bp->bytes_written = 0;

	pmemobj_tx_commit();
	(void) pmemobj_tx_end();
}
/*
 * pmemlog_walk -- hand every log entry, oldest first, to `process_chunk`
 *
 * The callback is invoked once per entry with the entry's payload, its
 * length and the caller's `arg`.  `chunksize` is not used by this
 * implementation.  The walk stops early as soon as the callback returns 0;
 * any non-zero return continues with the next entry.
 *
 * The root object's rwlock is held for reading during the whole walk.  If
 * the root object is unavailable or the lock cannot be taken, the function
 * returns without calling the callback.
 */
void
pmemlog_walk(PMEMlogpool *plp, size_t chunksize,
	int (*process_chunk)(const void *buf, size_t len, void *arg), void *arg)
{
	(void) chunksize;	/* every entry is delivered whole */

	PMEMobjpool *pop = (PMEMobjpool *)plp;
	struct base *bp = pmemobj_direct(pmemobj_root(pop,
			sizeof(struct base)));

	/* no usable root object: nothing to lock, nothing to walk */
	if (bp == NULL)
		return;

	if (pmemobj_rwlock_rdlock(pop, &bp->rwlock) != 0)
		return;

	/* pmemobj_direct() yields NULL for the OID_NULL that ends the list */
	struct log *next = pmemobj_direct(bp->head);
	while (next != NULL) {
		/* a zero return from the callback asks us to stop walking */
		if ((*process_chunk)(next->data, next->hdr.size, arg) == 0)
			break;
		next = pmemobj_direct(next->hdr.next);
	}

	pmemobj_rwlock_unlock(pop, &bp->rwlock);
}
/*
 * process_chunk -- walk callback that prints one log entry to stdout
 *
 * The `len` bytes at `buf` are copied into a NUL-terminated scratch buffer
 * and printed after a "log contains:" line.  `arg` is not used.
 *
 * Returns 1 after printing, 0 if the scratch buffer cannot be allocated.
 */
static int
process_chunk(const void *buf, size_t len, void *arg)
{
	char *text = malloc(len + 1);

	if (!text) {
		fputs("malloc error\n", stderr);
		return 0;
	}

	/* the stored payload has no terminator of its own; add one */
	memcpy(text, buf, len);
	text[len] = '\0';

	fputs("log contains:\n", stdout);
	printf("%s\n", text);

	free(text);
	return 1;
}
/*
 * count_iovec -- number of ':'-separated fields in `arg`
 *
 * This is one more than the number of ':' characters, so an empty string
 * or a string without any ':' yields 1.
 */
static int
count_iovec(char *arg)
{
	int nfields = 1;

	for (const char *p = arg; *p != '\0'; ++p) {
		if (*p == ':')
			++nfields;
	}

	return nfields;
}
/*
 * fill_iovec -- split `arg` at every ':' and describe each field in `iov`
 *
 * `arg` is modified in place: each ':' is overwritten with '\0' so that
 * every field becomes a NUL-terminated string.  One iovec is written per
 * field, empty fields included, so exactly (number of ':' in arg) + 1
 * elements are filled -- the same number count_iovec() reports.  The caller
 * must provide an array of at least that many elements.
 *
 * iov_base points into `arg`; iov_len is the field length without the
 * terminator.
 */
static void
fill_iovec(struct iovec *iov, char *arg)
{
	char *field = arg;

	for (char *p = arg; ; ++p) {
		if (*p != ':' && *p != '\0')
			continue;

		/* remember whether this is the end before overwriting *p */
		int last = (*p == '\0');

		*p = '\0';
		iov->iov_base = field;
		iov->iov_len = (size_t)(p - field);
		++iov;

		if (last)
			break;

		field = p + 1;
	}
}
/*
 * main -- command-line driver for the log implementation above
 *
 * usage: prog [o,c] file [val...]
 *
 * argv[1] selects create ("c...") or open ("o..."), argv[2] is the pool
 * file.  Every further argument is a command chosen by its first character:
 *   a  append the text that follows the two-character prefix
 *   v  append each ':'-separated field after the prefix as its own entry
 *   r  rewind the log
 *   w  walk the log and print every entry
 *   n  print pmemlog_nbyte()
 *   t  print pmemlog_tell()
 *
 * Returns 1 on a usage error or when the pool cannot be created/opened,
 * 0 otherwise (failures of individual commands are only reported on
 * stderr).
 */
int
main(int argc, char *argv[])
{
	/* both the mode and the pool file are required: argv[2] is used below */
	if (argc < 3) {
		fprintf(stderr, "usage: %s [o,c] file [val...]\n", argv[0]);
		return 1;
	}

	PMEMlogpool *plp;
	if (strncmp(argv[1], "c", 1) == 0) {
		plp = pmemlog_create(argv[2], POOL_SIZE, CREATE_MODE_RW);
	} else if (strncmp(argv[1], "o", 1) == 0) {
		plp = pmemlog_open(argv[2]);
	} else {
		fprintf(stderr, "usage: %s [o,c] file [val...]\n", argv[0]);
		return 1;
	}

	if (plp == NULL) {
		perror("pmemlog_create/pmemlog_open");
		return 1;
	}

	/* run the commands in the order given */
	for (int i = 3; i < argc; i++) {
		/*
		 * 'a' and 'v' carry their value after a two-character
		 * prefix.  An argument shorter than that has no value, and
		 * argv[i] + 2 would point past its terminator.
		 */
		char *value = NULL;
		if (argv[i][0] != '\0' && argv[i][1] != '\0')
			value = argv[i] + 2;

		switch (*argv[i]) {
		case 'a': {
			if (value == NULL) {
				fprintf(stderr, "missing value for command"
					" %s\n", argv[i]);
				break;
			}
			printf("append: %s\n", value);
			if (pmemlog_append(plp, value, strlen(value)))
				fprintf(stderr, "pmemlog_append"
					" error\n");
			break;
		}
		case 'v': {
			if (value == NULL) {
				fprintf(stderr, "missing value for command"
					" %s\n", argv[i]);
				break;
			}
			printf("appendv: %s\n", value);
			int count = count_iovec(value);
			struct iovec *iov = calloc((size_t)count,
					sizeof(*iov));
			if (iov == NULL) {
				perror("calloc");
				break;
			}
			/* splits `value` in place; iov points into argv[i] */
			fill_iovec(iov, value);
			if (pmemlog_appendv(plp, iov, count))
				fprintf(stderr, "pmemlog_appendv"
					" error\n");
			free(iov);
			break;
		}
		case 'r': {
			printf("rewind\n");
			pmemlog_rewind(plp);
			break;
		}
		case 'w': {
			printf("walk\n");
			pmemlog_walk(plp, 0, process_chunk, NULL);
			break;
		}
		case 'n': {
			printf("nbytes: %zu\n", pmemlog_nbyte(plp));
			break;
		}
		case 't': {
			printf("offset: %lld\n", pmemlog_tell(plp));
			break;
		}
		default: {
			fprintf(stderr, "unrecognized command %s\n",
				argv[i]);
			break;
		}
		}
	}

	pmemlog_close(plp);
	return 0;
}