#include <inttypes.h>
#include "redo.h"
#include "out.h"
#include "util.h"
#include "valgrind_internal.h"
#define REDO_FINISH_FLAG ((uint64_t)1<<0)
#define REDO_FLAG_MASK (~REDO_FINISH_FLAG)
struct redo_ctx {
void *base;
struct pmem_ops p_ops;
redo_check_offset_fn check_offset;
void *check_offset_ctx;
unsigned redo_num_entries;
};
struct redo_ctx *
redo_log_config_new(void *base,
const struct pmem_ops *p_ops,
redo_check_offset_fn check_offset,
void *check_offset_ctx,
unsigned redo_num_entries)
{
struct redo_ctx *cfg = Malloc(sizeof(*cfg));
if (!cfg) {
ERR("!can't create redo log config");
return NULL;
}
cfg->base = base;
cfg->p_ops = *p_ops;
cfg->check_offset = check_offset;
cfg->check_offset_ctx = check_offset_ctx;
cfg->redo_num_entries = redo_num_entries;
return cfg;
}
void
redo_log_config_delete(struct redo_ctx *ctx)
{
Free(ctx);
}
size_t
redo_log_nflags(const struct redo_log *redo, size_t nentries)
{
size_t ret = 0;
size_t i;
for (i = 0; i < nentries; i++) {
if (redo[i].offset & REDO_FINISH_FLAG)
ret++;
}
LOG(15, "redo %p nentries %zu nflags %zu", redo, nentries, ret);
return ret;
}
void
redo_log_store(const struct redo_ctx *ctx, struct redo_log *redo, size_t index,
uint64_t offset, uint64_t value)
{
LOG(15, "redo %p index %zu offset %" PRIu64 " value %" PRIu64,
redo, index, offset, value);
ASSERTeq(offset & REDO_FINISH_FLAG, 0);
ASSERT(index < ctx->redo_num_entries);
redo[index].offset = offset;
redo[index].value = value;
}
void
redo_log_store_last(const struct redo_ctx *ctx, struct redo_log *redo,
size_t index, uint64_t offset, uint64_t value)
{
LOG(15, "redo %p index %zu offset %" PRIu64 " value %" PRIu64,
redo, index, offset, value);
ASSERTeq(offset & REDO_FINISH_FLAG, 0);
ASSERT(index < ctx->redo_num_entries);
const struct pmem_ops *p_ops = &ctx->p_ops;
redo[index].value = value;
pmemops_persist(p_ops, redo, (index + 1) * sizeof(struct redo_log));
redo[index].offset = offset | REDO_FINISH_FLAG;
pmemops_persist(p_ops, &redo[index].offset, sizeof(redo[index].offset));
}
void
redo_log_set_last(const struct redo_ctx *ctx, struct redo_log *redo,
size_t index)
{
LOG(15, "redo %p index %zu", redo, index);
ASSERT(index < ctx->redo_num_entries);
const struct pmem_ops *p_ops = &ctx->p_ops;
pmemops_persist(p_ops, redo, (index + 1) * sizeof(struct redo_log));
redo[index].offset |= REDO_FINISH_FLAG;
pmemops_persist(p_ops, &redo[index].offset, sizeof(redo[index].offset));
}
void
redo_log_process(const struct redo_ctx *ctx, struct redo_log *redo,
size_t nentries)
{
LOG(15, "redo %p nentries %zu", redo, nentries);
#ifdef DEBUG
ASSERTeq(redo_log_check(ctx, redo, nentries), 0);
#endif
const struct pmem_ops *p_ops = &ctx->p_ops;
uint64_t *val;
while ((redo->offset & REDO_FINISH_FLAG) == 0) {
val = (uint64_t *)((uintptr_t)ctx->base + redo->offset);
VALGRIND_ADD_TO_TX(val, sizeof(*val));
*val = redo->value;
VALGRIND_REMOVE_FROM_TX(val, sizeof(*val));
pmemops_flush(p_ops, val, sizeof(uint64_t));
redo++;
}
uint64_t offset = redo->offset & REDO_FLAG_MASK;
val = (uint64_t *)((uintptr_t)ctx->base + offset);
VALGRIND_ADD_TO_TX(val, sizeof(*val));
*val = redo->value;
VALGRIND_REMOVE_FROM_TX(val, sizeof(*val));
pmemops_persist(p_ops, val, sizeof(uint64_t));
redo->offset = 0;
pmemops_persist(p_ops, &redo->offset, sizeof(redo->offset));
}
void
redo_log_recover(const struct redo_ctx *ctx, struct redo_log *redo,
size_t nentries)
{
LOG(15, "redo %p nentries %zu", redo, nentries);
ASSERTne(ctx, NULL);
size_t nflags = redo_log_nflags(redo, nentries);
ASSERT(nflags < 2);
if (nflags == 1)
redo_log_process(ctx, redo, nentries);
}
int
redo_log_check(const struct redo_ctx *ctx, struct redo_log *redo,
size_t nentries)
{
LOG(15, "redo %p nentries %zu", redo, nentries);
ASSERTne(ctx, NULL);
size_t nflags = redo_log_nflags(redo, nentries);
if (nflags > 1) {
LOG(15, "redo %p too many finish flags", redo);
return -1;
}
if (nflags == 1) {
void *cctx = ctx->check_offset_ctx;
while ((redo->offset & REDO_FINISH_FLAG) == 0) {
if (!ctx->check_offset(cctx, redo->offset)) {
LOG(15, "redo %p invalid offset %" PRIu64,
redo, redo->offset);
return -1;
}
redo++;
}
uint64_t offset = redo->offset & REDO_FLAG_MASK;
if (!ctx->check_offset(cctx, offset)) {
LOG(15, "redo %p invalid offset %" PRIu64,
redo, offset);
return -1;
}
}
return 0;
}
uint64_t
redo_log_offset(const struct redo_log *redo)
{
return redo->offset & REDO_FLAG_MASK;
}
int
redo_log_is_last(const struct redo_log *redo)
{
return redo->offset & REDO_FINISH_FLAG;
}
const struct pmem_ops *
redo_get_pmem_ops(const struct redo_ctx *ctx)
{
return &ctx->p_ops;
}