#include <ex_common.h>
#include <sys/stat.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include "libpmemobj.h"
#include "libpmem.h"
#include "libpmemlog.h"
#define POOL_SIZE ((size_t)(1024 * 1024 * 100))
POBJ_LAYOUT_BEGIN(obj_pmemlog_macros);
POBJ_LAYOUT_ROOT(obj_pmemlog_macros, struct base);
POBJ_LAYOUT_TOID(obj_pmemlog_macros, struct log);
POBJ_LAYOUT_END(obj_pmemlog_macros);
struct log_hdr {
TOID(struct log) next;
size_t size;
};
struct log {
struct log_hdr hdr;
char data[];
};
struct base {
TOID(struct log) head;
TOID(struct log) tail;
PMEMrwlock rwlock;
size_t bytes_written;
};
PMEMlogpool *
pmemlog_open(const char *path)
{
return (PMEMlogpool *)pmemobj_open(path,
POBJ_LAYOUT_NAME(obj_pmemlog_macros));
}
PMEMlogpool *
pmemlog_create(const char *path, size_t poolsize, mode_t mode)
{
return (PMEMlogpool *)pmemobj_create(path,
POBJ_LAYOUT_NAME(obj_pmemlog_macros),
poolsize, mode);
}
void
pmemlog_close(PMEMlogpool *plp)
{
pmemobj_close((PMEMobjpool *)plp);
}
size_t
pmemlog_nbyte(PMEMlogpool *plp)
{
return 0;
}
int
pmemlog_append(PMEMlogpool *plp, const void *buf, size_t count)
{
PMEMobjpool *pop = (PMEMobjpool *)plp;
int retval = 0;
TOID(struct base) bp;
bp = POBJ_ROOT(pop, struct base);
TX_BEGIN_PARAM(pop, TX_PARAM_RWLOCK, &D_RW(bp)->rwlock, TX_PARAM_NONE) {
TOID(struct log) logp;
logp = TX_ALLOC(struct log, count + sizeof(struct log_hdr));
D_RW(logp)->hdr.size = count;
memcpy(D_RW(logp)->data, buf, count);
D_RW(logp)->hdr.next = TOID_NULL(struct log);
TX_ADD(bp);
if (TOID_IS_NULL(D_RO(bp)->tail)) {
D_RW(bp)->head = logp;
} else {
TX_ADD(D_RW(bp)->tail);
D_RW(D_RW(bp)->tail)->hdr.next = logp;
}
D_RW(bp)->tail = logp;
D_RW(bp)->bytes_written += count;
} TX_ONABORT {
retval = -1;
} TX_END
return retval;
}
int
pmemlog_appendv(PMEMlogpool *plp, const struct iovec *iov, int iovcnt)
{
PMEMobjpool *pop = (PMEMobjpool *)plp;
int retval = 0;
TOID(struct base) bp;
bp = POBJ_ROOT(pop, struct base);
TX_BEGIN_PARAM(pop, TX_PARAM_RWLOCK, &D_RW(bp)->rwlock, TX_PARAM_NONE) {
TX_ADD(bp);
if (!TOID_IS_NULL(D_RO(bp)->tail))
TX_ADD(D_RW(bp)->tail);
for (int i = 0; i < iovcnt; ++i) {
char *buf = (char *)iov[i].iov_base;
size_t count = iov[i].iov_len;
TOID(struct log) logp;
logp = TX_ALLOC(struct log,
count + sizeof(struct log_hdr));
D_RW(logp)->hdr.size = count;
memcpy(D_RW(logp)->data, buf, count);
D_RW(logp)->hdr.next = TOID_NULL(struct log);
if (TOID_IS_NULL(D_RO(bp)->tail))
D_RW(bp)->head = logp;
else
D_RW(D_RW(bp)->tail)->hdr.next = logp;
D_RW(bp)->tail = logp;
D_RW(bp)->bytes_written += count;
}
} TX_ONABORT {
retval = -1;
} TX_END
return retval;
}
long long
pmemlog_tell(PMEMlogpool *plp)
{
TOID(struct base) bp;
bp = POBJ_ROOT((PMEMobjpool *)plp, struct base);
return D_RO(bp)->bytes_written;
}
void
pmemlog_rewind(PMEMlogpool *plp)
{
PMEMobjpool *pop = (PMEMobjpool *)plp;
TOID(struct base) bp;
bp = POBJ_ROOT(pop, struct base);
TX_BEGIN_PARAM(pop, TX_PARAM_RWLOCK, &D_RW(bp)->rwlock, TX_PARAM_NONE) {
TX_ADD(bp);
while (!TOID_IS_NULL(D_RO(bp)->head)) {
TOID(struct log) nextp;
nextp = D_RW(D_RW(bp)->head)->hdr.next;
TX_FREE(D_RW(bp)->head);
D_RW(bp)->head = nextp;
}
D_RW(bp)->head = TOID_NULL(struct log);
D_RW(bp)->tail = TOID_NULL(struct log);
D_RW(bp)->bytes_written = 0;
} TX_END
}
void
pmemlog_walk(PMEMlogpool *plp, size_t chunksize,
int (*process_chunk)(const void *buf, size_t len, void *arg), void *arg)
{
PMEMobjpool *pop = (PMEMobjpool *)plp;
TOID(struct base) bp;
bp = POBJ_ROOT(pop, struct base);
if (pmemobj_rwlock_rdlock(pop, &D_RW(bp)->rwlock) != 0)
return;
TOID(struct log) next;
next = D_RO(bp)->head;
while (!TOID_IS_NULL(next)) {
(*process_chunk)(D_RO(next)->data,
D_RO(next)->hdr.size, arg);
next = D_RO(next)->hdr.next;
}
pmemobj_rwlock_unlock(pop, &D_RW(bp)->rwlock);
}
static int
process_chunk(const void *buf, size_t len, void *arg)
{
char *tmp = (char *)malloc(len + 1);
if (tmp == NULL) {
fprintf(stderr, "malloc error\n");
return 0;
}
memcpy(tmp, buf, len);
tmp[len] = '\0';
printf("log contains:\n");
printf("%s\n", tmp);
free(tmp);
return 1;
}
static int
count_iovec(char *arg)
{
int count = 1;
char *pch = strchr(arg, ':');
while (pch != NULL) {
++count;
pch = strchr(++pch, ':');
}
return count;
}
static void
fill_iovec(struct iovec *iov, char *arg)
{
char *pch = strtok(arg, ":");
while (pch != NULL) {
iov->iov_base = pch;
iov->iov_len = strlen((char *)iov->iov_base);
++iov;
pch = strtok(NULL, ":");
}
}
int
main(int argc, char *argv[])
{
if (argc < 2) {
fprintf(stderr, "usage: %s [o,c] file [val...]\n", argv[0]);
return 1;
}
PMEMlogpool *plp;
if (strncmp(argv[1], "c", 1) == 0) {
plp = pmemlog_create(argv[2], POOL_SIZE, CREATE_MODE_RW);
} else if (strncmp(argv[1], "o", 1) == 0) {
plp = pmemlog_open(argv[2]);
} else {
fprintf(stderr, "usage: %s [o,c] file [val...]\n", argv[0]);
return 1;
}
if (plp == NULL) {
perror("pmemlog_create/pmemlog_open");
return 1;
}
for (int i = 3; i < argc; i++) {
switch (*argv[i]) {
case 'a': {
printf("append: %s\n", argv[i] + 2);
if (pmemlog_append(plp, argv[i] + 2,
strlen(argv[i] + 2)))
fprintf(stderr, "pmemlog_append"
" error\n");
break;
}
case 'v': {
printf("appendv: %s\n", argv[i] + 2);
int count = count_iovec(argv[i] + 2);
struct iovec *iov = (struct iovec *)malloc(
count * sizeof(struct iovec));
if (iov == NULL) {
fprintf(stderr, "malloc error\n");
break;
}
fill_iovec(iov, argv[i] + 2);
if (pmemlog_appendv(plp, iov, count))
fprintf(stderr, "pmemlog_appendv"
" error\n");
free(iov);
break;
}
case 'r': {
printf("rewind\n");
pmemlog_rewind(plp);
break;
}
case 'w': {
printf("walk\n");
pmemlog_walk(plp, 0, process_chunk, NULL);
break;
}
case 'n': {
printf("nbytes: %zu\n", pmemlog_nbyte(plp));
break;
}
case 't': {
printf("offset: %lld\n", pmemlog_tell(plp));
break;
}
default: {
fprintf(stderr, "unrecognized command %s\n",
argv[i]);
break;
}
};
}
pmemlog_close(plp);
return 0;
}