#include <inttypes.h>
#include <wchar.h>
#include "queue.h"
#include "ctree.h"
#include "obj.h"
#include "out.h"
#include "pmalloc.h"
#include "tx.h"
#include "valgrind_internal.h"
/*
 * Values stored in the per-transaction ranges tree pack the range size in
 * the low 48 bits and RANGE_FLAG_* bits in the top 16 bits.
 */
#define RANGE_FLAGS_MIN_BIT 48
#define RANGE_FLAGS_MASK (~0ULL << RANGE_FLAGS_MIN_BIT)
#define RANGE_GET_SIZE(val) ((val) & ~RANGE_FLAGS_MASK)
#define RANGE_FLAG_NO_FLUSH (1ULL << RANGE_FLAGS_MIN_BIT)
/*
 * tx_data -- one entry per (possibly nested) transaction level
 *
 * pmemobj_tx_begin pushes an entry at the head of tx->tx_entries and
 * pmemobj_tx_end pops it, so the head is always the innermost level.
 */
struct tx_data {
SLIST_ENTRY(tx_data) tx_entry;
/* jump target used on abort; all zeroes when begin was given env == NULL */
jmp_buf env;
};
/*
 * tx -- transaction state of the calling thread (see get_tx)
 */
struct tx {
/* pool of the running transaction; set in begin, NULLed when the outermost level ends */
PMEMobjpool *pop;
/* current transaction stage */
enum pobj_tx_stage stage;
/* error number recorded by the last abort; reset to 0 by pmemobj_tx_begin */
int last_errnum;
/* lane section held by the outermost level; NULL after it commits or aborts */
struct lane_section *section;
/* locks taken via TX_PARAM_MUTEX/TX_PARAM_RWLOCK or pmemobj_tx_lock */
SLIST_HEAD(txl, tx_lock_data) tx_locks;
/* nesting levels, innermost first */
SLIST_HEAD(txd, tx_data) tx_entries;
/* optional user stage callback (TX_PARAM_CB) and its argument */
pmemobj_tx_callback stage_callback;
void *stage_callback_arg;
};
/*
 * get_tx -- returns the calling thread's transaction state
 *
 * The state is a function-scope thread-local with static storage duration,
 * so each thread starts with its own zero-initialized struct tx.
 */
static struct tx *
get_tx(void)
{
	static __thread struct tx tx;

	return &tx;
}
/*
 * tx_lock_data -- a pmem lock taken on behalf of the transaction; released
 * by release_and_free_tx_locks when the outermost level ends
 */
struct tx_lock_data {
/* the valid member is selected by lock_type */
union {
PMEMmutex *mutex;
PMEMrwlock *rwlock;
} lock;
/* TX_PARAM_MUTEX or TX_PARAM_RWLOCK */
enum pobj_tx_param lock_type;
SLIST_ENTRY(tx_lock_data) tx_lock;
};
/*
 * tx_undo_runtime -- volatile pvector contexts over the lane's persistent
 * undo logs (layout->undo_log[i]), one per undo log type; entries in
 * UNDO_ALLOC..MAX_UNDO_TYPES-1 are managed by tx_rebuild_undo_runtime
 */
struct tx_undo_runtime {
struct pvector_context *ctx[MAX_UNDO_TYPES];
};
/*
 * Capacity of lane_tx_runtime::alloc_actv, i.e. how many allocations can be
 * kept as unpublished reservations before tx_alloc_common forces a publish.
 * NOTE(review): presumably each published allocation consumes up to
 * MAX_MEMOPS_ENTRIES_PER_TX_ALLOC operation entries, so a full batch fits in
 * MAX_MEMOPS_ENTRIES -- confirm against palloc_publish.
 */
#define MAX_MEMOPS_ENTRIES_PER_TX_ALLOC 2
#define MAX_TX_ALLOC_RESERVATIONS (MAX_MEMOPS_ENTRIES /\
MAX_MEMOPS_ENTRIES_PER_TX_ALLOC)
/*
 * lane_tx_runtime -- volatile per-lane transaction state
 */
struct lane_tx_runtime {
/* index returned by lane_hold; used by lane_attach for detached cleanup */
unsigned lane_idx;
/* ranges already snapshotted or allocated in this tx: offset -> size | RANGE_FLAG_* */
struct ctree *ranges;
/* bytes already used in the most recently added set cache */
uint64_t cache_offset;
/* undo log contexts of this lane */
struct tx_undo_runtime undo;
/* allocations reserved in this tx but not yet published */
struct pobj_action alloc_actv[MAX_TX_ALLOC_RESERVATIONS];
/* number of valid entries in alloc_actv */
int actvcnt;
/* number of trailing UNDO_ALLOC entries that belong to those reservations */
int actvundo;
};
/*
 * tx_alloc_args -- argument block handed to constructor_tx_alloc
 */
struct tx_alloc_args {
	uint64_t flags;		/* allocation flags, e.g. POBJ_FLAG_ZERO */
	const void *copy_ptr;	/* optional source copied into the new object */
	size_t copy_size;	/* number of bytes taken from copy_ptr */
};

/* arguments for an allocation initialized from existing data (realloc) */
#define COPY_ARGS(f, src, len)\
(struct tx_alloc_args){ .flags = (f), .copy_ptr = (src), .copy_size = (len) }

/* arguments for a plain allocation: nothing to copy */
#define ALLOC_ARGS(f)\
(struct tx_alloc_args){ .flags = (f), .copy_ptr = NULL, .copy_size = 0 }
/*
 * tx_add_range_args -- a range of the pool to snapshot into the undo log
 */
struct tx_add_range_args {
PMEMobjpool *pop;
/* pool-relative offset of the first byte */
uint64_t offset;
/* length in bytes */
uint64_t size;
/* POBJ_XADD_* flags, e.g. POBJ_FLAG_NO_FLUSH */
uint64_t flags;
};
/*
 * tx_clr_flag -- what tx_clear_undo_log does for each undo log entry
 */
enum tx_clr_flag {
/* free the referenced object (otherwise the entry is only zeroed) */
TX_CLR_FLAG_FREE = 1 << 0,
/* pmemcheck builds: mark the object's memory clean */
TX_CLR_FLAG_VG_CLEAN = 1 << 1,
/* pmemcheck builds: remove the object from valgrind's tx tracking */
TX_CLR_FLAG_VG_TX_REMOVE = 1 << 2,
/* free only if the heap still reports the object as allocated */
TX_CLR_FLAG_FREE_IF_EXISTS = 1 << 3,
};
/*
 * tx_parameters -- tunables of the transaction snapshot machinery
 *
 * cache_size: bytes requested for each new range cache.
 * cache_threshold: snapshots larger than this get their own undo object
 * instead of being packed into a range cache.
 */
struct tx_parameters {
	size_t cache_size;
	size_t cache_threshold;
};

/*
 * tx_params_new -- allocates a tx_parameters initialized to the defaults
 *
 * Returns NULL if the allocation fails.
 */
struct tx_parameters *
tx_params_new(void)
{
	struct tx_parameters *params = Malloc(sizeof(*params));

	if (params != NULL) {
		params->cache_size = TX_DEFAULT_RANGE_CACHE_SIZE;
		params->cache_threshold = TX_DEFAULT_RANGE_CACHE_THRESHOLD;
	}

	return params;
}

/*
 * tx_params_delete -- releases an object obtained from tx_params_new
 */
void
tx_params_delete(struct tx_parameters *tx_params)
{
	Free(tx_params);
}
static void
obj_tx_abort(int errnum, int user);

/*
 * obj_tx_abort_err -- internal abort for functions returning an int error
 *
 * obj_tx_abort longjmps when the level has a jump buffer; otherwise control
 * comes back here and errnum is handed to the caller.
 */
static inline int
obj_tx_abort_err(int errnum)
{
	const int internal = 0;

	obj_tx_abort(errnum, internal);

	return errnum;
}

/*
 * obj_tx_abort_null -- internal abort for functions returning a PMEMoid;
 * yields OID_NULL when obj_tx_abort returns
 */
static inline PMEMoid
obj_tx_abort_null(int errnum)
{
	const int internal = 0;

	obj_tx_abort(errnum, internal);

	return OID_NULL;
}
/*
 * ASSERT_IN_TX -- reports a FATAL error when the given transaction state
 * has no active transaction (stage TX_STAGE_NONE)
 */
#define ASSERT_IN_TX(tx) do {\
	if ((tx)->stage == TX_STAGE_NONE)\
		FATAL("%s called outside of transaction", __func__);\
} while (0)

/*
 * ASSERT_TX_STAGE_WORK -- reports a FATAL error unless the transaction is
 * in TX_STAGE_WORK
 */
#define ASSERT_TX_STAGE_WORK(tx) do {\
	if ((tx)->stage != TX_STAGE_WORK)\
		FATAL("%s called in invalid stage %d", __func__, (tx)->stage);\
} while (0)
static int
constructor_tx_alloc(void *ctx, void *ptr, size_t usable_size, void *arg)
{
LOG(5, NULL);
ASSERTne(ptr, NULL);
ASSERTne(arg, NULL);
struct tx_alloc_args *args = arg;
VALGRIND_ADD_TO_TX(ptr, usable_size);
if (args->flags & POBJ_FLAG_ZERO)
memset(ptr, 0, usable_size);
if (args->copy_ptr && args->copy_size != 0) {
memcpy(ptr, args->copy_ptr, args->copy_size);
}
return 0;
}
/*
 * constructor_tx_add_range -- palloc constructor for a large snapshot
 *
 * ctx is the pool and arg a struct tx_add_range_args. The new object gets a
 * tx_range header (offset, size) followed by a copy of args->size bytes
 * taken from args->offset in the pool. Always returns 0.
 */
static int
constructor_tx_add_range(void *ctx, void *ptr, size_t usable_size, void *arg)
{
LOG(5, NULL);
PMEMobjpool *pop = ctx;
ASSERTne(ptr, NULL);
ASSERTne(arg, NULL);
struct tx_add_range_args *args = arg;
struct tx_range *range = ptr;
const struct pmem_ops *p_ops = &pop->p_ops;
/* the snapshot object itself is written outside of valgrind's tx view */
VALGRIND_ADD_TO_TX(range, sizeof(struct tx_range) + args->size);
range->offset = args->offset;
range->size = args->size;
void *src = OBJ_OFF_TO_PTR(args->pop, args->offset);
/*
 * The header is only flushed here. NOTE(review): presumably the drain done
 * by the memcpy_persist below makes header and data durable together.
 */
pmemops_flush(p_ops, range, sizeof(struct tx_range));
pmemops_memcpy_persist(p_ops, range->data, src, args->size);
VALGRIND_REMOVE_FROM_TX(range, sizeof(struct tx_range) + args->size);
/* the snapshotted source range may now be modified by the transaction */
VALGRIND_ADD_TO_TX(src, args->size);
return 0;
}
/*
 * tx_set_state -- records the transaction state in the lane layout and
 * persists just that field
 */
static inline void
tx_set_state(PMEMobjpool *pop, struct lane_tx_layout *layout, uint64_t state)
{
	const struct pmem_ops *p_ops = &pop->p_ops;

	layout->state = state;
	pmemops_persist(p_ops, &layout->state, sizeof(layout->state));
}
/*
 * tx_clear_vec_entry -- pvector pop callback that zeroes an undo log entry
 * and persists it, leaving the referenced object untouched
 */
static void
tx_clear_vec_entry(PMEMobjpool *pop, uint64_t *entry)
{
/* bracket the store so valgrind's tx checker accepts the write */
VALGRIND_ADD_TO_TX(entry, sizeof(*entry));
*entry = 0;
pmemops_persist(&pop->p_ops, entry, sizeof(*entry));
VALGRIND_REMOVE_FROM_TX(entry, sizeof(*entry));
}
/*
 * tx_free_vec_entry -- pvector pop callback that hands the entry (which
 * holds an object offset) to pfree
 */
static void
tx_free_vec_entry(PMEMobjpool *pop, uint64_t *entry)
{
	pfree(pop, entry);
}

/*
 * tx_free_existing_vec_entry -- pvector pop callback that frees the
 * referenced object only while the heap still reports it as allocated;
 * otherwise the entry is merely zeroed
 */
static void
tx_free_existing_vec_entry(PMEMobjpool *pop, uint64_t *entry)
{
	if (!palloc_is_allocated(&pop->heap, *entry)) {
		tx_clear_vec_entry(pop, entry);
		return;
	}

	pfree(pop, entry);
}
/*
 * tx_clear_undo_log_vg -- pmemcheck annotations for an object referenced
 * from an undo log; a no-op unless built with USE_VG_PMEMCHECK and running
 * under valgrind
 */
static void
tx_clear_undo_log_vg(PMEMobjpool *pop, uint64_t off, enum tx_clr_flag flags)
{
#ifdef USE_VG_PMEMCHECK
	if (!On_valgrind)
		return;

	int set_clean = (flags & TX_CLR_FLAG_VG_CLEAN) != 0;
	int tx_remove = (flags & TX_CLR_FLAG_VG_TX_REMOVE) != 0;
	if (!set_clean && !tx_remove)
		return;

	void *ptr = OBJ_OFF_TO_PTR(pop, off);
	size_t size = palloc_usable_size(&pop->heap, off);

	if (set_clean)
		VALGRIND_SET_CLEAN(ptr, size);
	if (tx_remove)
		VALGRIND_REMOVE_FROM_TX(ptr, size);
#endif
}
/*
 * tx_clear_undo_log -- empties an undo log from the back
 *
 * Every entry gets the valgrind annotations selected by flags. The last
 * nskip entries are only zeroed; the remaining ones are freed
 * (TX_CLR_FLAG_FREE), freed if still allocated (TX_CLR_FLAG_FREE_IF_EXISTS)
 * or zeroed (neither flag).
 */
static void
tx_clear_undo_log(PMEMobjpool *pop, struct pvector_context *undo, int nskip,
	enum tx_clr_flag flags)
{
	LOG(7, NULL);

	void (*on_entry)(PMEMobjpool *, uint64_t *) = tx_clear_vec_entry;
	if (flags & TX_CLR_FLAG_FREE)
		on_entry = tx_free_vec_entry;
	else if (flags & TX_CLR_FLAG_FREE_IF_EXISTS)
		on_entry = tx_free_existing_vec_entry;

	for (uint64_t last = pvector_last(undo); last != 0;
			last = pvector_last(undo)) {
		tx_clear_undo_log_vg(pop, last, flags);

		if (nskip > 0) {
			--nskip;
			pvector_pop_back(undo, tx_clear_vec_entry);
		} else {
			pvector_pop_back(undo, on_entry);
		}
	}
}
/*
 * tx_abort_alloc -- rolls back the allocations made by the transaction
 *
 * With a lane (runtime abort) the trailing lane->actvundo entries belong to
 * reservations that were never published; those are cancelled separately
 * by tx_cancel_reservations, so here they are only cleared. Without a lane
 * (recovery) objects are freed only if the heap still has them allocated.
 */
static void
tx_abort_alloc(PMEMobjpool *pop, struct tx_undo_runtime *tx_rt,
	struct lane_tx_runtime *lane)
{
	LOG(5, NULL);

	enum tx_clr_flag flags = TX_CLR_FLAG_VG_TX_REMOVE | TX_CLR_FLAG_VG_CLEAN;
	int nskip = 0;

	if (lane != NULL) {
		flags |= TX_CLR_FLAG_FREE;
		nskip = lane->actvundo;
	} else {
		flags |= TX_CLR_FLAG_FREE_IF_EXISTS;
	}

	tx_clear_undo_log(pop, tx_rt->ctx[UNDO_ALLOC], nskip, flags);
}

/*
 * tx_abort_free -- rolls back frees requested by the transaction: the
 * objects stay allocated, only the undo entries are cleared
 */
static void
tx_abort_free(PMEMobjpool *pop, struct tx_undo_runtime *tx_rt)
{
	LOG(5, NULL);

	struct pvector_context *undo = tx_rt->ctx[UNDO_FREE];
	tx_clear_undo_log(pop, undo, 0, 0);
}
/*
 * tx_range_data -- a half-open piece [begin, end) of pool memory that still
 * has to be restored from a snapshot (see tx_restore_range)
 */
struct tx_range_data {
void *begin;
void *end;
SLIST_ENTRY(tx_range_data) tx_range;
};
/* declares struct txr, the list head type for tx_range_data pieces */
SLIST_HEAD(txr, tx_range_data);
/*
 * tx_remove_range -- cuts the half-open interval [begin, end) out of the
 * list of pieces to restore
 *
 * Each piece overlapping [begin, end) is replaced by up to two pieces: the
 * part before begin and the part after end. Pieces that do not overlap,
 * including pieces that merely touch the interval, are left as they are.
 */
static void
tx_remove_range(struct txr *tx_ranges, void *begin, void *end)
{
	struct tx_range_data *txr = SLIST_FIRST(tx_ranges);

	while (txr) {
		/*
		 * All intervals are half-open, so a piece ending exactly at
		 * begin or starting exactly at end does not overlap and must
		 * not be "split" into an identical copy.
		 */
		if (begin >= txr->end || end <= txr->begin) {
			txr = SLIST_NEXT(txr, tx_range);
			continue;
		}

		LOG(4, "detected PMEM lock in undo log; "
			"range %p-%p, lock %p-%p",
			txr->begin, txr->end, begin, end);

		/* keep the part of the piece in front of the interval */
		if (begin > txr->begin) {
			struct tx_range_data *txrn = Malloc(sizeof(*txrn));
			if (txrn == NULL)
				FATAL("!Malloc");

			txrn->begin = txr->begin;
			txrn->end = begin;
			LOG(4, "range split; %p-%p", txrn->begin, txrn->end);
			SLIST_INSERT_HEAD(tx_ranges, txrn, tx_range);
		}

		/* keep the part of the piece behind the interval */
		if (end < txr->end) {
			struct tx_range_data *txrn = Malloc(sizeof(*txrn));
			if (txrn == NULL)
				FATAL("!Malloc");

			txrn->begin = end;
			txrn->end = txr->end;
			LOG(4, "range split; %p-%p", txrn->begin, txrn->end);
			SLIST_INSERT_HEAD(tx_ranges, txrn, tx_range);
		}

		/*
		 * New pieces went to the head, i.e. before txr, so continuing
		 * from txr's successor never revisits them.
		 */
		struct tx_range_data *next = SLIST_NEXT(txr, tx_range);
		SLIST_REMOVE(tx_ranges, txr, tx_range_data, tx_range);
		Free(txr);
		txr = next;
	}
}
/*
 * tx_restore_range -- copies a snapshot back into the pool during a
 * runtime abort, skipping the cache lines occupied by locks the
 * transaction currently holds
 *
 * NOTE(review): presumably the held locks are skipped so that their
 * current (locked) state is not overwritten by the snapshot -- confirm.
 */
static void
tx_restore_range(PMEMobjpool *pop, struct tx *tx, struct tx_range *range)
{
/* every pmem lock type spans exactly one _POBJ_CL_SIZE region */
COMPILE_ERROR_ON(sizeof(PMEMmutex) != _POBJ_CL_SIZE);
COMPILE_ERROR_ON(sizeof(PMEMrwlock) != _POBJ_CL_SIZE);
COMPILE_ERROR_ON(sizeof(PMEMcond) != _POBJ_CL_SIZE);
struct lane_tx_runtime *runtime =
(struct lane_tx_runtime *)tx->section->runtime;
ASSERTne(runtime, NULL);
struct txr tx_ranges;
SLIST_INIT(&tx_ranges);
struct tx_range_data *txr;
/* start with the whole snapshotted range... */
txr = Malloc(sizeof(*txr));
if (txr == NULL) {
FATAL("!Malloc");
}
txr->begin = OBJ_OFF_TO_PTR(pop, range->offset);
txr->end = (char *)txr->begin + range->size;
SLIST_INSERT_HEAD(&tx_ranges, txr, tx_range);
/* ...and cut out every held lock (any lock type, via the size checks above) */
struct tx_lock_data *txl;
SLIST_FOREACH(txl, &tx->tx_locks, tx_lock) {
void *lock_begin = txl->lock.mutex;
void *lock_end = (char *)lock_begin + _POBJ_CL_SIZE;
tx_remove_range(&tx_ranges, lock_begin, lock_end);
}
/* NOTE(review): a snapshot consisting only of held locks would trip this */
ASSERT(!SLIST_EMPTY(&tx_ranges));
void *dst_ptr = OBJ_OFF_TO_PTR(pop, range->offset);
/* restore each remaining piece from the matching part of range->data */
while (!SLIST_EMPTY(&tx_ranges)) {
txr = SLIST_FIRST(&tx_ranges);
SLIST_REMOVE_HEAD(&tx_ranges, tx_range);
ASSERT((char *)txr->begin >= (char *)dst_ptr);
uint8_t *src = &range->data[
(char *)txr->begin - (char *)dst_ptr];
ASSERT((char *)txr->end >= (char *)txr->begin);
size_t size = (size_t)((char *)txr->end - (char *)txr->begin);
pmemops_memcpy_persist(&pop->p_ops, txr->begin, src, size);
Free(txr);
}
}
/*
 * tx_foreach_set -- calls cb for every snapshot recorded by the transaction
 *
 * Large snapshots are separate objects referenced from UNDO_SET. Small
 * snapshots are packed back to back in range caches referenced from
 * UNDO_SET_CACHE; a header with zero offset or zero size marks the end of
 * the used part of a cache.
 */
static void
tx_foreach_set(PMEMobjpool *pop, struct tx *tx, struct tx_undo_runtime *tx_rt,
	void (*cb)(PMEMobjpool *pop, struct tx *tx, struct tx_range *range))
{
	LOG(7, NULL);

	struct tx_range *range = NULL;
	uint64_t off;

	struct pvector_context *ctx = tx_rt->ctx[UNDO_SET];
	for (off = pvector_first(ctx); off != 0; off = pvector_next(ctx)) {
		range = OBJ_OFF_TO_PTR(pop, off);
		cb(pop, tx, range);
	}

	/*
	 * With CONVERSION_FLAG_OLD_SET_CACHE the packed ranges are aligned by
	 * TX_RANGE_MASK_LEGACY; the mask is the same for every cache, so it is
	 * computed once instead of per range.
	 */
	size_t amask = (pop->conversion_flags & CONVERSION_FLAG_OLD_SET_CACHE) ?
		TX_RANGE_MASK_LEGACY : TX_RANGE_MASK;

	ctx = tx_rt->ctx[UNDO_SET_CACHE];
	for (off = pvector_first(ctx); off != 0; off = pvector_next(ctx)) {
		struct tx_range_cache *cache = OBJ_OFF_TO_PTR(pop, off);
		uint64_t cache_size = palloc_usable_size(&pop->heap, off);

		for (uint64_t cache_offset = 0; cache_offset < cache_size; ) {
			range = (struct tx_range *)
				((char *)cache + cache_offset);

			/* zeroed header: no more ranges in this cache */
			if (range->offset == 0 || range->size == 0)
				break;

			cb(pop, tx, range);

			cache_offset += TX_ALIGN_SIZE(range->size, amask) +
				sizeof(struct tx_range);
		}
	}
}
/*
 * tx_abort_restore_range -- tx_foreach_set callback for a runtime abort:
 * restores the snapshot (minus held locks) and drops the range from
 * valgrind's tx tracking
 */
static void
tx_abort_restore_range(PMEMobjpool *pop, struct tx *tx, struct tx_range *range)
{
	tx_restore_range(pop, tx, range);

	VALGRIND_REMOVE_FROM_TX(OBJ_OFF_TO_PTR(pop, range->offset),
		range->size);
}

/*
 * tx_abort_recover_range -- tx_foreach_set callback for recovery: copies
 * the whole snapshot back; there is no thread transaction state here
 */
static void
tx_abort_recover_range(PMEMobjpool *pop, struct tx *tx, struct tx_range *range)
{
	ASSERTeq(tx, NULL);

	pmemops_memcpy_persist(&pop->p_ops,
		OBJ_OFF_TO_PTR(pop, range->offset),
		range->data, range->size);
}
/*
 * tx_clear_set_cache_but_first -- frees every range cache except the first
 * one and zeroes the first one so it can be reused by the next transaction
 *
 * When tx is NULL, or when additional caches had to be freed, the whole
 * usable size of the first cache is zeroed; otherwise only the
 * runtime->cache_offset bytes that were actually used.
 */
static void
tx_clear_set_cache_but_first(PMEMobjpool *pop, struct tx_undo_runtime *tx_rt,
struct tx *tx, enum tx_clr_flag vg_flags)
{
LOG(4, NULL);
struct pvector_context *cache_undo = tx_rt->ctx[UNDO_SET_CACHE];
uint64_t first_cache = pvector_first(cache_undo);
if (first_cache == 0)
return;
uint64_t off;
int zero_all = tx == NULL;
while ((off = pvector_last(cache_undo)) != first_cache) {
tx_clear_undo_log_vg(pop, off, vg_flags);
pvector_pop_back(cache_undo, tx_free_vec_entry);
/* cache_offset describes the last cache, not the first one */
zero_all = 1;
}
tx_clear_undo_log_vg(pop, first_cache, vg_flags);
struct tx_range_cache *cache = OBJ_OFF_TO_PTR(pop, first_cache);
size_t sz;
if (zero_all) {
sz = palloc_usable_size(&pop->heap, first_cache);
} else {
ASSERTne(tx, NULL);
struct lane_tx_runtime *r = tx->section->runtime;
sz = r->cache_offset;
}
if (sz) {
VALGRIND_ADD_TO_TX(cache, sz);
pmemops_memset_persist(&pop->p_ops, cache, 0, sz);
VALGRIND_REMOVE_FROM_TX(cache, sz);
}
#ifdef DEBUG
/* when only the used prefix was zeroed, verify the rest was already zero */
if (!zero_all &&
!pop->tx_debug_skip_expensive_checks) {
uint64_t usable_size = palloc_usable_size(&pop->heap,
first_cache);
ASSERTeq(util_is_zeroed(cache, usable_size), 1);
}
#endif
}
/*
 * tx_abort_set -- restores all snapshots and empties the set undo logs
 *
 * During recovery every range cache is freed; at runtime the first cache is
 * kept (zeroed) for reuse.
 */
static void
tx_abort_set(PMEMobjpool *pop, struct tx_undo_runtime *tx_rt, int recovery)
{
	LOG(7, NULL);

	if (recovery) {
		tx_foreach_set(pop, NULL, tx_rt, tx_abort_recover_range);
		tx_clear_undo_log(pop, tx_rt->ctx[UNDO_SET_CACHE], 0,
			TX_CLR_FLAG_FREE | TX_CLR_FLAG_VG_CLEAN);
	} else {
		struct tx *tx = get_tx();

		tx_foreach_set(pop, tx, tx_rt, tx_abort_restore_range);
		tx_clear_set_cache_but_first(pop, tx_rt, tx,
			TX_CLR_FLAG_VG_CLEAN);
	}

	tx_clear_undo_log(pop, tx_rt->ctx[UNDO_SET], 0,
		TX_CLR_FLAG_FREE | TX_CLR_FLAG_VG_CLEAN);
}
/*
 * tx_post_commit_alloc -- committed allocations stay allocated; only their
 * undo entries are cleared
 */
static void
tx_post_commit_alloc(PMEMobjpool *pop, struct tx_undo_runtime *tx_rt)
{
	LOG(7, NULL);

	struct pvector_context *undo = tx_rt->ctx[UNDO_ALLOC];
	tx_clear_undo_log(pop, undo, 0, TX_CLR_FLAG_VG_TX_REMOVE);
}

/*
 * tx_post_commit_free -- objects freed by a committed transaction are
 * actually freed now
 */
static void
tx_post_commit_free(PMEMobjpool *pop, struct tx_undo_runtime *tx_rt)
{
	LOG(7, NULL);

	struct pvector_context *undo = tx_rt->ctx[UNDO_FREE];
	tx_clear_undo_log(pop, undo, 0,
		TX_CLR_FLAG_FREE | TX_CLR_FLAG_VG_TX_REMOVE);
}
#ifdef USE_VG_PMEMCHECK
/*
 * tx_post_commit_range_vg_tx_remove -- tx_foreach_set callback removing a
 * snapshotted range from valgrind's tx tracking
 */
static void
tx_post_commit_range_vg_tx_remove(PMEMobjpool *pop, struct tx *tx,
	struct tx_range *range)
{
	VALGRIND_REMOVE_FROM_TX(OBJ_OFF_TO_PTR(pop, range->offset),
		range->size);
}
#endif

/*
 * tx_post_commit_set -- discards all snapshots of a committed transaction
 *
 * Outside recovery the first range cache is kept (and zeroed) for reuse.
 */
static void
tx_post_commit_set(PMEMobjpool *pop, struct tx *tx,
	struct tx_undo_runtime *tx_rt, int recovery)
{
	LOG(7, NULL);

#ifdef USE_VG_PMEMCHECK
	if (On_valgrind)
		tx_foreach_set(pop, tx, tx_rt,
			tx_post_commit_range_vg_tx_remove);
#endif

	if (!recovery) {
		tx_clear_set_cache_but_first(pop, tx_rt, tx, 0);
	} else {
		tx_clear_undo_log(pop, tx_rt->ctx[UNDO_SET_CACHE], 0,
			TX_CLR_FLAG_FREE);
	}

	tx_clear_undo_log(pop, tx_rt->ctx[UNDO_SET], 0, TX_CLR_FLAG_FREE);
}
/*
 * tx_flush_range -- ctree_delete_cb callback flushing one modified range,
 * unless it was registered with RANGE_FLAG_NO_FLUSH
 */
static void
tx_flush_range(uint64_t offset, uint64_t size_flags, void *ctx)
{
	PMEMobjpool *pop = ctx;

	if ((size_flags & RANGE_FLAG_NO_FLUSH) == 0)
		pmemops_flush(&pop->p_ops, OBJ_OFF_TO_PTR(pop, offset),
			RANGE_GET_SIZE(size_flags));
}
/*
 * tx_fulfill_reservations -- publishes all pending allocation reservations
 * of the lane in one redo-logged operation and resets the counters
 */
static void
tx_fulfill_reservations(struct tx *tx)
{
	struct lane_tx_runtime *lane = tx->section->runtime;

	if (lane->actvcnt == 0)
		return; /* nothing reserved since the last publish */

	PMEMobjpool *pop = tx->pop;

	struct operation_context ctx;
	operation_init(&ctx, pop, pop->redo, pmalloc_redo_hold(pop));

	palloc_publish(&pop->heap, lane->alloc_actv, lane->actvcnt, &ctx);

	lane->actvundo = 0;
	lane->actvcnt = 0;

	pmalloc_redo_release(pop);
}
/*
 * tx_cancel_reservations -- returns all unpublished reservations to the
 * heap and resets the counters
 */
static void
tx_cancel_reservations(PMEMobjpool *pop, struct lane_tx_runtime *lane)
{
	struct pobj_action *pending = lane->alloc_actv;

	palloc_cancel(&pop->heap, pending, lane->actvcnt);

	lane->actvundo = 0;
	lane->actvcnt = 0;
}
/*
 * tx_pre_commit -- publishes pending allocations and flushes every range
 * touched by the transaction, consuming the ranges tree
 */
static void
tx_pre_commit(PMEMobjpool *pop, struct tx *tx, struct lane_tx_runtime *lane)
{
	LOG(5, NULL);

	ASSERTne(tx->section->runtime, NULL);

	tx_fulfill_reservations(tx);

	struct ctree *ranges = lane->ranges;
	lane->ranges = NULL;
	ctree_delete_cb(ranges, tx_flush_range, pop);
}
/*
 * tx_rebuild_undo_runtime -- (re)creates the volatile pvector contexts for
 * all undo logs in the lane layout
 *
 * Missing contexts are created, existing ones are reinitialized.
 * Returns 0 on success, -1 on failure. On failure every context handled
 * by this call is deleted and its slot reset to NULL, so tx_rt never keeps
 * a dangling pointer that a later call would try to reinitialize.
 */
static int
tx_rebuild_undo_runtime(PMEMobjpool *pop, struct lane_tx_layout *layout,
	struct tx_undo_runtime *tx_rt)
{
	LOG(7, NULL);

	int i;
	for (i = UNDO_ALLOC; i < MAX_UNDO_TYPES; ++i) {
		if (tx_rt->ctx[i] == NULL)
			tx_rt->ctx[i] = pvector_new(pop, &layout->undo_log[i]);
		else
			pvector_reinit(tx_rt->ctx[i]);

		if (tx_rt->ctx[i] == NULL)
			goto error_init;
	}

	return 0;

error_init:
	/* unwind only the slots handled above (UNDO_ALLOC .. i-1) */
	while (--i >= UNDO_ALLOC) {
		pvector_delete(tx_rt->ctx[i]);
		tx_rt->ctx[i] = NULL;
	}

	return -1;
}
/*
 * tx_destroy_undo_runtime -- deletes every pvector context of tx
 */
static void
tx_destroy_undo_runtime(struct tx_undo_runtime *tx)
{
	LOG(7, NULL);

	int type = UNDO_ALLOC;
	while (type < MAX_UNDO_TYPES) {
		pvector_delete(tx->ctx[type]);
		++type;
	}
}
/*
 * tx_post_commit -- cleans the undo logs of a committed transaction
 *
 * At runtime the lane's contexts are used; during recovery temporary
 * contexts are built from layout and destroyed afterwards.
 */
static void
tx_post_commit(PMEMobjpool *pop, struct tx *tx, struct lane_tx_layout *layout,
	int recovery)
{
	LOG(7, NULL);

	struct tx_undo_runtime new_rt = { .ctx = {NULL, } };
	struct tx_undo_runtime *tx_rt = &new_rt;

	if (!recovery) {
		struct lane_tx_runtime *lane = tx->section->runtime;
		tx_rt = &lane->undo;
	} else if (tx_rebuild_undo_runtime(pop, layout, tx_rt) != 0) {
		FATAL("!Cannot rebuild runtime undo log state");
	}

	tx_post_commit_set(pop, tx, tx_rt, recovery);
	tx_post_commit_alloc(pop, tx_rt);
	tx_post_commit_free(pop, tx_rt);

	if (recovery)
		tx_destroy_undo_runtime(tx_rt);
}
#ifdef USE_VG_MEMCHECK
/*
 * tx_abort_register_valgrind -- registers with valgrind every object
 * referenced by the given undo log
 */
static void
tx_abort_register_valgrind(PMEMobjpool *pop, struct pvector_context *ctx)
{
	uint64_t off = pvector_first(ctx);

	while (off != 0) {
		palloc_vg_register_off(&pop->heap, off);
		off = pvector_next(ctx);
	}
}
#endif
/*
 * tx_abort -- rolls back a transaction: restores snapshots, undoes
 * allocations and forgets requested frees
 *
 * recovery == 0: runtime abort on the lane's contexts; lane must be
 * non-NULL and its reservations and ranges tree are released as well.
 * recovery != 0: contexts are rebuilt from layout and destroyed afterwards.
 */
static void
tx_abort(PMEMobjpool *pop, struct lane_tx_runtime *lane,
	struct lane_tx_layout *layout, int recovery)
{
	LOG(7, NULL);

	struct tx_undo_runtime *tx_rt;
	struct tx_undo_runtime new_rt = { .ctx = {NULL, } };

	if (recovery) {
		if (tx_rebuild_undo_runtime(pop, layout, &new_rt) != 0)
			FATAL("!Cannot rebuild runtime undo log state");

		tx_rt = &new_rt;
	} else {
		/* check before the first use of lane, not after it */
		ASSERTne(lane, NULL);
		tx_rt = &lane->undo;
	}

#ifdef USE_VG_MEMCHECK
	if (recovery && On_valgrind) {
		tx_abort_register_valgrind(pop, tx_rt->ctx[UNDO_SET]);
		tx_abort_register_valgrind(pop, tx_rt->ctx[UNDO_ALLOC]);
		tx_abort_register_valgrind(pop, tx_rt->ctx[UNDO_SET_CACHE]);
	}
#endif

	tx_abort_set(pop, tx_rt, recovery);
	tx_abort_alloc(pop, tx_rt, lane);
	tx_abort_free(pop, tx_rt);

	if (recovery) {
		tx_destroy_undo_runtime(tx_rt);
	} else {
		tx_cancel_reservations(pop, lane);
		ctree_delete(lane->ranges);
		lane->ranges = NULL;
	}
}
PMEMobjpool *
tx_get_pop(void)
{
return get_tx()->pop;
}
/*
 * add_to_tx_and_lock -- acquires a pmem lock for the transaction and
 * remembers it so it is released when the outermost level ends
 *
 * A lock already held by the transaction is not taken again.
 * Returns 0 on success, ENOMEM if bookkeeping memory could not be allocated,
 * the lock function's error code if locking failed, or EINVAL for an
 * unknown lock type. Only locks that were actually acquired are recorded:
 * release_and_free_tx_locks unlocks every recorded entry, and unlocking a
 * lock that is not held is invalid.
 */
static int
add_to_tx_and_lock(struct tx *tx, enum pobj_tx_param type, void *lock)
{
	LOG(15, NULL);

	int retval = 0;
	struct tx_lock_data *txl;

	/* already held by this transaction? */
	SLIST_FOREACH(txl, &tx->tx_locks, tx_lock) {
		if (memcmp(&txl->lock, &lock, sizeof(lock)) == 0)
			return 0;
	}

	txl = Malloc(sizeof(*txl));
	if (txl == NULL)
		return ENOMEM;

	txl->lock_type = type;
	switch (txl->lock_type) {
	case TX_PARAM_MUTEX:
		txl->lock.mutex = lock;
		retval = pmemobj_mutex_lock(tx->pop, txl->lock.mutex);
		if (retval) {
			errno = retval;
			ERR("!pmemobj_mutex_lock");
		}
		break;
	case TX_PARAM_RWLOCK:
		txl->lock.rwlock = lock;
		retval = pmemobj_rwlock_wrlock(tx->pop, txl->lock.rwlock);
		if (retval) {
			errno = retval;
			ERR("!pmemobj_rwlock_wrlock");
		}
		break;
	default:
		ERR("Unrecognized lock type");
		ASSERT(0);
		retval = EINVAL;
		break;
	}

	if (retval != 0) {
		/* not acquired: do not hand it to release_and_free_tx_locks */
		Free(txl);
		return retval;
	}

	SLIST_INSERT_HEAD(&tx->tx_locks, txl, tx_lock);

	return 0;
}
static void
release_and_free_tx_locks(struct tx *tx)
{
LOG(15, NULL);
while (!SLIST_EMPTY(&tx->tx_locks)) {
struct tx_lock_data *tx_lock = SLIST_FIRST(&tx->tx_locks);
SLIST_REMOVE_HEAD(&tx->tx_locks, tx_lock);
switch (tx_lock->lock_type) {
case TX_PARAM_MUTEX:
pmemobj_mutex_unlock(tx->pop,
tx_lock->lock.mutex);
break;
case TX_PARAM_RWLOCK:
pmemobj_rwlock_unlock(tx->pop,
tx_lock->lock.rwlock);
break;
default:
ERR("Unrecognized lock type");
ASSERT(0);
break;
}
Free(tx_lock);
}
}
/*
 * tx_alloc_common -- reserves a new object inside the transaction
 *
 * The object is only reserved (published at commit or when the reservation
 * array fills up). Its offset is recorded in the ranges tree and in the
 * UNDO_ALLOC log. Returns the new oid, or aborts the transaction with
 * ENOMEM (returning OID_NULL if the abort does not jump).
 */
static PMEMoid
tx_alloc_common(struct tx *tx, size_t size, type_num_t type_num,
palloc_constr constructor, struct tx_alloc_args args)
{
LOG(3, NULL);
if (size > PMEMOBJ_MAX_ALLOC_SIZE) {
ERR("requested size too large");
return obj_tx_abort_null(ENOMEM);
}
struct lane_tx_runtime *lane =
(struct lane_tx_runtime *)tx->section->runtime;
PMEMobjpool *pop = tx->pop;
/* reservation array about to fill up: publish what we have so far */
if ((lane->actvcnt + 1) == MAX_TX_ALLOC_RESERVATIONS) {
tx_fulfill_reservations(tx);
}
int rs = lane->actvcnt;
uint64_t flags = args.flags;
if (palloc_reserve(&pop->heap, size, constructor, &args, type_num, 0,
CLASS_ID_FROM_FLAG(flags), &lane->alloc_actv[rs]) != 0) {
ERR("out of memory");
return obj_tx_abort_null(ENOMEM);
}
lane->actvcnt++;
PMEMoid retoid = OID_NULL;
retoid.off = lane->alloc_actv[rs].heap.offset;
retoid.pool_uuid_lo = pop->uuid_lo;
uint64_t range_flags = (flags & POBJ_FLAG_NO_FLUSH) ?
RANGE_FLAG_NO_FLUSH : 0;
size = palloc_usable_size(&pop->heap, retoid.off);
/* the size must leave the top bits free for RANGE_FLAG_* */
ASSERTeq(size & RANGE_FLAGS_MASK, 0);
/*
 * Registering the whole object as a range gets it flushed at commit
 * (tx_flush_range) and lets later add_range calls on it skip snapshotting.
 */
if (ctree_insert_unlocked(lane->ranges, retoid.off,
size | range_flags) != 0)
goto err_oom;
uint64_t *entry_offset = pvector_push_back(lane->undo.ctx[UNDO_ALLOC]);
if (entry_offset == NULL)
goto err_oom;
*entry_offset = retoid.off;
pmemops_persist(&pop->p_ops, entry_offset, sizeof(*entry_offset));
/* this entry belongs to an unpublished reservation (see tx_abort_alloc) */
lane->actvundo++;
return retoid;
err_oom:
ERR("out of memory");
return obj_tx_abort_null(ENOMEM);
}
/*
 * tx_realloc_common -- transactional realloc
 *
 * A NULL oid behaves like an allocation, size 0 like a free. Otherwise a
 * new object is allocated with constructor_realloc, which copies
 * min(old usable size, size) bytes, and the old object is freed in the tx.
 */
static PMEMoid
tx_realloc_common(struct tx *tx, PMEMoid oid, size_t size, uint64_t type_num,
palloc_constr constructor_alloc,
palloc_constr constructor_realloc,
uint64_t flags)
{
LOG(3, NULL);
if (size > PMEMOBJ_MAX_ALLOC_SIZE) {
ERR("requested size too large");
return obj_tx_abort_null(ENOMEM);
}
struct lane_tx_runtime *lane =
(struct lane_tx_runtime *)tx->section->runtime;
if (OBJ_OID_IS_NULL(oid))
return tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_alloc, ALLOC_ARGS(flags));
ASSERT(OBJ_OID_IS_VALID(tx->pop, oid));
if (size == 0) {
if (pmemobj_tx_free(oid)) {
ERR("pmemobj_tx_free failed");
return oid;
} else {
return OID_NULL;
}
}
void *ptr = OBJ_OFF_TO_PTR(tx->pop, oid.off);
size_t old_size = palloc_usable_size(&tx->pop->heap, oid.off);
size_t copy_size = old_size < size ? old_size : size;
PMEMoid new_obj = tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_realloc, COPY_ARGS(flags, ptr, copy_size));
if (!OBJ_OID_IS_NULL(new_obj)) {
if (pmemobj_tx_free(oid)) {
ERR("pmemobj_tx_free failed");
/*
 * NOTE(review): the new object is still an unpublished reservation
 * here, and a failing pmemobj_tx_free presumably has already aborted the
 * transaction (possibly releasing the lane); pfree-ing the entry via
 * tx_free_vec_entry through lane->undo looks unsafe -- confirm.
 */
pvector_pop_back(lane->undo.ctx[UNDO_ALLOC],
tx_free_vec_entry);
return OID_NULL;
}
}
return new_obj;
}
/*
 * pmemobj_tx_begin -- starts a new transaction level
 *
 * The outermost level acquires a lane and sets up its runtime; nested
 * levels must use the same pool. The variadic list is a sequence of
 * (enum pobj_tx_param, argument(s)) terminated by TX_PARAM_NONE:
 * TX_PARAM_CB takes a callback and its argument, any other type takes one
 * lock pointer. Returns 0 on success or an error number.
 */
int
pmemobj_tx_begin(PMEMobjpool *pop, jmp_buf env, ...)
{
LOG(3, NULL);
int err = 0;
struct tx *tx = get_tx();
struct lane_tx_runtime *lane = NULL;
if (tx->stage == TX_STAGE_WORK) {
/* nested level */
ASSERTne(tx->section, NULL);
if (tx->pop != pop) {
ERR("nested transaction for different pool");
return obj_tx_abort_err(EINVAL);
}
VALGRIND_START_TX;
} else if (tx->stage == TX_STAGE_NONE) {
/* outermost level: take a lane and reset its runtime */
VALGRIND_START_TX;
unsigned idx = lane_hold(pop, &tx->section,
LANE_SECTION_TRANSACTION);
lane = tx->section->runtime;
VALGRIND_ANNOTATE_NEW_MEMORY(lane, sizeof(*lane));
SLIST_INIT(&tx->tx_entries);
SLIST_INIT(&tx->tx_locks);
lane->ranges = ctree_new();
lane->cache_offset = 0;
lane->lane_idx = idx;
lane->actvcnt = 0;
lane->actvundo = 0;
struct lane_tx_layout *layout =
(struct lane_tx_layout *)tx->section->layout;
/*
 * NOTE(review): this error path keeps the lane held and leaves tx->pop and
 * tx_entries unset, which pmemobj_tx_end does not expect -- confirm.
 */
if (tx_rebuild_undo_runtime(pop, layout, &lane->undo) != 0) {
tx->stage = TX_STAGE_ONABORT;
err = errno;
return err;
}
tx->pop = pop;
} else {
FATAL("Invalid stage %d to begin new transaction", tx->stage);
}
struct tx_data *txd = Malloc(sizeof(*txd));
if (txd == NULL) {
err = errno;
ERR("!Malloc");
/*
 * NOTE(review): for an outermost level the stage is still NONE here, so
 * err_abort only sets ONABORT without releasing the lane -- confirm.
 */
goto err_abort;
}
tx->last_errnum = 0;
/* an all-zero env means "do not longjmp on abort" (see obj_tx_abort) */
if (env != NULL)
memcpy(txd->env, env, sizeof(jmp_buf));
else
memset(txd->env, 0, sizeof(jmp_buf));
SLIST_INSERT_HEAD(&tx->tx_entries, txd, tx_entry);
tx->stage = TX_STAGE_WORK;
va_list argp;
va_start(argp, env);
enum pobj_tx_param param_type;
while ((param_type = va_arg(argp, enum pobj_tx_param)) !=
TX_PARAM_NONE) {
if (param_type == TX_PARAM_CB) {
pmemobj_tx_callback cb =
va_arg(argp, pmemobj_tx_callback);
void *arg = va_arg(argp, void *);
/* only one callback per transaction; re-registering the same one is allowed */
if (tx->stage_callback &&
(tx->stage_callback != cb ||
tx->stage_callback_arg != arg)) {
FATAL("transaction callback is already set, "
"old %p new %p old_arg %p new_arg %p",
tx->stage_callback, cb,
tx->stage_callback_arg, arg);
}
tx->stage_callback = cb;
tx->stage_callback_arg = arg;
} else {
err = add_to_tx_and_lock(tx, param_type,
va_arg(argp, void *));
if (err) {
va_end(argp);
goto err_abort;
}
}
}
va_end(argp);
ASSERT(err == 0);
return 0;
err_abort:
if (tx->stage == TX_STAGE_WORK)
obj_tx_abort(err, 0);
else
tx->stage = TX_STAGE_ONABORT;
return err;
}
/*
 * pmemobj_tx_lock -- acquires a lock for the rest of the transaction;
 * only valid in TX_STAGE_WORK
 */
int
pmemobj_tx_lock(enum pobj_tx_param type, void *lockp)
{
	struct tx *cur = get_tx();

	ASSERT_IN_TX(cur);
	ASSERT_TX_STAGE_WORK(cur);

	return add_to_tx_and_lock(cur, type, lockp);
}
/*
 * obj_tx_callback -- reports the current stage to the user callback, but
 * only from the outermost transaction level
 */
static void
obj_tx_callback(struct tx *tx)
{
	if (tx->stage_callback == NULL)
		return;

	struct tx_data *level = SLIST_FIRST(&tx->tx_entries);
	if (SLIST_NEXT(level, tx_entry) != NULL)
		return; /* nested level */

	tx->stage_callback(tx->pop, tx->stage, tx->stage_callback_arg);
}
enum pobj_tx_stage
pmemobj_tx_stage(void)
{
LOG(3, NULL);
return get_tx()->stage;
}
/*
 * obj_tx_abort -- aborts the current transaction level
 *
 * Moves to TX_STAGE_ONABORT. Only the outermost level rolls back the undo
 * logs and releases the lane; for inner levels the error is recorded and
 * pmemobj_tx_end propagates it outward. Sets errno and last_errnum, runs
 * the stage callback and longjmps to the level's jump buffer if one was
 * given; otherwise it returns.
 *
 * errnum: 0 is replaced by ECANCELED.
 * user: non-zero for pmemobj_tx_abort, which additionally logs an error.
 */
static void
obj_tx_abort(int errnum, int user)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
ASSERT(tx->section != NULL);
if (errnum == 0)
errnum = ECANCELED;
tx->stage = TX_STAGE_ONABORT;
struct lane_tx_runtime *lane = tx->section->runtime;
struct tx_data *txd = SLIST_FIRST(&tx->tx_entries);
/* outermost level: roll back and give up the lane */
if (SLIST_NEXT(txd, tx_entry) == NULL) {
struct lane_tx_layout *layout =
(struct lane_tx_layout *)tx->section->layout;
tx_abort(tx->pop, lane, layout, 0 );
lane_release(tx->pop);
tx->section = NULL;
}
tx->last_errnum = errnum;
errno = errnum;
if (user)
ERR("!explicit transaction abort");
obj_tx_callback(tx);
if (!util_is_zeroed(txd->env, sizeof(jmp_buf)))
longjmp(txd->env, errnum);
}
void
pmemobj_tx_abort(int errnum)
{
obj_tx_abort(errnum, 1);
}
int
pmemobj_tx_errno(void)
{
LOG(3, NULL);
return get_tx()->last_errnum;
}
/*
 * tx_post_commit_cleanup -- finishes a committed transaction: cleans the
 * undo logs, resets the persistent state to TX_STATE_NONE and releases the
 * lane
 *
 * detached is non-zero when pmemobj_tx_commit queued the section on
 * pop->tx_postcommit_tasks and detached the lane; the lane is then
 * reattached and the calling thread's tx state is set up first.
 */
static void
tx_post_commit_cleanup(PMEMobjpool *pop,
	struct lane_section *section, int detached)
{
	struct lane_tx_runtime *runtime =
		(struct lane_tx_runtime *)section->runtime;
	struct lane_tx_layout *layout =
		(struct lane_tx_layout *)section->layout;

	struct tx *tx = get_tx();

	if (detached) {
#if defined(USE_VG_HELGRIND) || defined(USE_VG_DRD)
		if (On_valgrind) {
			VALGRIND_ANNOTATE_NEW_MEMORY(layout, sizeof(*layout));
			VALGRIND_ANNOTATE_NEW_MEMORY(runtime, sizeof(*runtime));
			int ret = tx_rebuild_undo_runtime(pop, layout,
				&runtime->undo);
			ASSERTeq(ret, 0);
		}
#endif
		lane_attach(pop, runtime->lane_idx);

		tx->pop = pop;
		tx->section = section;
		tx->stage = TX_STAGE_ONCOMMIT;
	}

	tx_post_commit(pop, tx, layout, 0 /* recovery */);

	tx_set_state(pop, layout, TX_STATE_NONE);

	runtime->cache_offset = 0;

	ASSERTeq(pvector_nvalues(runtime->undo.ctx[UNDO_ALLOC]), 0);
	ASSERTeq(pvector_nvalues(runtime->undo.ctx[UNDO_SET]), 0);
	ASSERTeq(pvector_nvalues(runtime->undo.ctx[UNDO_FREE]), 0);
	/* tx_clear_set_cache_but_first keeps at most the first set cache */
	ASSERT(pvector_nvalues(runtime->undo.ctx[UNDO_SET_CACHE]) == 0 ||
		pvector_nvalues(runtime->undo.ctx[UNDO_SET_CACHE]) == 1);

	lane_release(pop);
}
/*
 * pmemobj_tx_commit -- commits the current transaction level
 *
 * For the outermost level: publishes pending allocations and flushes all
 * modified ranges (tx_pre_commit), drains, and persists TX_STATE_COMMITTED.
 * The undo log cleanup then runs either asynchronously, when the section
 * can be queued on pop->tx_postcommit_tasks (the lane is detached), or
 * inline via tx_post_commit_cleanup. Inner levels only change the stage.
 */
void
pmemobj_tx_commit(void)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
/* report TX_STAGE_WORK to the callback before committing */
obj_tx_callback(tx);
ASSERT(tx->section != NULL);
struct lane_tx_runtime *lane =
(struct lane_tx_runtime *)tx->section->runtime;
struct tx_data *txd = SLIST_FIRST(&tx->tx_entries);
if (SLIST_NEXT(txd, tx_entry) == NULL) {
struct lane_tx_layout *layout =
(struct lane_tx_layout *)tx->section->layout;
PMEMobjpool *pop = tx->pop;
tx_pre_commit(pop, tx, lane);
/* all data must be durable before the commit mark */
pmemops_drain(&pop->p_ops);
tx_set_state(pop, layout, TX_STATE_COMMITTED);
if (pop->tx_postcommit_tasks != NULL &&
ringbuf_tryenqueue(pop->tx_postcommit_tasks,
tx->section) == 0) {
lane_detach(pop);
} else {
tx_post_commit_cleanup(pop, tx->section, 0);
}
tx->section = NULL;
}
tx->stage = TX_STAGE_ONCOMMIT;
obj_tx_callback(tx);
}
int
pmemobj_tx_end(void)
{
LOG(3, NULL);
struct tx *tx = get_tx();
if (tx->stage == TX_STAGE_WORK)
FATAL("pmemobj_tx_end called without pmemobj_tx_commit");
if (tx->pop == NULL)
FATAL("pmemobj_tx_end called without pmemobj_tx_begin");
if (tx->stage_callback &&
(tx->stage == TX_STAGE_ONCOMMIT ||
tx->stage == TX_STAGE_ONABORT)) {
tx->stage = TX_STAGE_FINALLY;
obj_tx_callback(tx);
}
struct tx_data *txd = SLIST_FIRST(&tx->tx_entries);
SLIST_REMOVE_HEAD(&tx->tx_entries, tx_entry);
Free(txd);
VALGRIND_END_TX;
if (SLIST_EMPTY(&tx->tx_entries)) {
ASSERTeq(tx->section, NULL);
release_and_free_tx_locks(tx);
tx->pop = NULL;
tx->stage = TX_STAGE_NONE;
if (tx->stage_callback) {
pmemobj_tx_callback cb = tx->stage_callback;
void *arg = tx->stage_callback_arg;
tx->stage_callback = NULL;
tx->stage_callback_arg = NULL;
cb(tx->pop, TX_STAGE_NONE, arg);
}
} else {
tx->stage = TX_STAGE_WORK;
if (tx->last_errnum)
obj_tx_abort(tx->last_errnum, 0);
}
return tx->last_errnum;
}
void
pmemobj_tx_process(void)
{
LOG(5, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
switch (tx->stage) {
case TX_STAGE_NONE:
break;
case TX_STAGE_WORK:
pmemobj_tx_commit();
break;
case TX_STAGE_ONABORT:
case TX_STAGE_ONCOMMIT:
tx->stage = TX_STAGE_FINALLY;
obj_tx_callback(tx);
break;
case TX_STAGE_FINALLY:
tx->stage = TX_STAGE_NONE;
break;
case MAX_TX_STAGE:
ASSERT(0);
}
}
/*
 * pmemobj_tx_add_large -- snapshots a range into its own undo object
 * (header plus copy of the data) referenced from the UNDO_SET log
 *
 * Returns 0 on success, -1 if the undo log is full, or the error of
 * pmalloc_construct; in the latter case the reserved log slot is dropped.
 */
static int
pmemobj_tx_add_large(struct tx *tx, struct tx_add_range_args *args)
{
	struct lane_tx_runtime *runtime = tx->section->runtime;
	struct pvector_context *undo = runtime->undo.ctx[UNDO_SET];

	uint64_t *entry = pvector_push_back(undo);
	if (entry == NULL) {
		ERR("large set undo log too large");
		return -1;
	}

	int ret = pmalloc_construct(args->pop, entry,
		args->size + sizeof(struct tx_range),
		constructor_tx_add_range, args,
		0, OBJ_INTERNAL_OBJECT_MASK, 0);
	if (ret == 0)
		return 0;

	pvector_pop_back(undo, NULL);

	return ret;
}
/*
 * constructor_tx_range_cache -- palloc constructor for a new range cache:
 * zeroes and persists the whole usable size (an all-zero header marks the
 * end of the used part, see tx_foreach_set). ctx is the pool. Returns 0.
 */
static int
constructor_tx_range_cache(void *ctx, void *ptr, size_t usable_size, void *arg)
{
LOG(5, NULL);
PMEMobjpool *pop = ctx;
const struct pmem_ops *p_ops = &pop->p_ops;
ASSERTne(ptr, NULL);
VALGRIND_ADD_TO_TX(ptr, usable_size);
pmemops_memset_persist(p_ops, ptr, 0, usable_size);
VALGRIND_REMOVE_FROM_TX(ptr, usable_size);
return 0;
}
/*
 * pmemobj_tx_get_range_cache -- returns the range cache to append the next
 * small snapshot to, allocating a new one if needed
 *
 * The last cache in the undo log is reused while it can hold another range
 * header plus at least one byte of data. On success *remaining_space is
 * set to the free bytes left in the returned cache. Returns NULL if the
 * undo log is full or the cache cannot be allocated.
 */
static struct tx_range_cache *
pmemobj_tx_get_range_cache(PMEMobjpool *pop, struct tx *tx,
	struct pvector_context *undo, uint64_t *remaining_space)
{
	struct lane_tx_runtime *runtime = tx->section->runtime;

	uint64_t last_cache = pvector_last(undo);
	uint64_t cache_size = 0;
	struct tx_range_cache *cache = NULL;

	if (last_cache != 0) {
		cache = OBJ_OFF_TO_PTR(pop, last_cache);
		cache_size = palloc_usable_size(&pop->heap, last_cache);
	}

	if (cache == NULL || runtime->cache_offset +
			sizeof(struct tx_range) >= cache_size) {
		uint64_t *entry = pvector_push_back(undo);
		if (entry == NULL) {
			ERR("cache set undo log too large");
			return NULL;
		}

		int err = pmalloc_construct(pop, entry,
			pop->tx_params->cache_size,
			constructor_tx_range_cache, NULL,
			0, OBJ_INTERNAL_OBJECT_MASK, 0);
		if (err != 0) {
			pvector_pop_back(undo, NULL);
			return NULL;
		}

		cache = OBJ_OFF_TO_PTR(pop, *entry);
		cache_size = palloc_usable_size(&pop->heap, *entry);
		runtime->cache_offset = 0;
	}

	*remaining_space = cache_size - runtime->cache_offset;

	return cache;
}
/*
 * pmemobj_tx_add_small -- snapshots a range into the set caches
 *
 * A range that does not fit into the rest of the current cache fills that
 * cache up and the remainder continues in a fresh one, until everything
 * is copied. args->offset/args->size are advanced as data is consumed.
 * Returns 0 on success, 1 if a cache could not be obtained.
 */
static int
pmemobj_tx_add_small(struct tx *tx, struct tx_add_range_args *args)
{
	PMEMobjpool *pop = args->pop;
	struct lane_tx_runtime *runtime = tx->section->runtime;
	struct pvector_context *undo = runtime->undo.ctx[UNDO_SET_CACHE];
	const struct pmem_ops *p_ops = &pop->p_ops;

	do {
		uint64_t remaining_space;
		struct tx_range_cache *cache = pmemobj_tx_get_range_cache(pop,
			tx, undo, &remaining_space);
		if (cache == NULL) {
			ERR("Failed to create range cache");
			return 1;
		}

		struct tx_range *range = (struct tx_range *)
			((char *)cache + runtime->cache_offset);

		uint64_t data_offset = args->offset;
		uint64_t data_size = args->size;
		uint64_t range_size = TX_ALIGN_SIZE(args->size, TX_RANGE_MASK) +
			sizeof(struct tx_range);

		if (remaining_space >= range_size) {
			/* everything left fits into this cache */
			args->size = 0;
		} else {
			/* fill this cache to its end, continue in the next */
			ASSERT(remaining_space > sizeof(struct tx_range));
			range_size = remaining_space;
			data_size = remaining_space - sizeof(struct tx_range);
			args->offset += data_size;
			args->size -= data_size;
		}

		runtime->cache_offset += range_size;

		VALGRIND_ADD_TO_TX(range, range_size);

		void *src = OBJ_OFF_TO_PTR(pop, data_offset);
		VALGRIND_ADD_TO_TX(src, data_size);
		pmemops_memcpy_persist(p_ops, range->data, src, data_size);

		range->size = data_size;
		range->offset = data_offset;
		pmemops_persist(p_ops, range, sizeof(struct tx_range));

		VALGRIND_REMOVE_FROM_TX(range, range_size);
	} while (args->size != 0);

	return 0;
}
/*
 * pmemobj_tx_add_common -- snapshots [args->offset, args->offset +
 * args->size) unless parts of it are already covered in this transaction
 *
 * The range is walked backwards from its last byte. Each step looks up the
 * ranges tree at or below spoint, and any gap between that entry's end and
 * apoint (one past the part still to examine) is registered in the tree
 * and snapshotted. Gaps above cache_threshold get their own undo object;
 * smaller ones go to the set caches.
 * NOTE(review): assumes ctree_find_le_unlocked updates spoint to the key
 * found (the greatest key <= spoint) and returns its value -- confirm.
 *
 * Returns 0, or aborts the transaction (EINVAL for a bad range, ENOMEM on
 * allocation failure) and returns that error if the abort does not jump.
 */
static int
pmemobj_tx_add_common(struct tx *tx, struct tx_add_range_args *args)
{
LOG(15, NULL);
if (args->size > PMEMOBJ_MAX_ALLOC_SIZE) {
ERR("snapshot size too large");
return obj_tx_abort_err(EINVAL);
}
if (args->offset < args->pop->heap_offset ||
(args->offset + args->size) >
(args->pop->heap_offset + args->pop->heap_size)) {
ERR("object outside of heap");
return obj_tx_abort_err(EINVAL);
}
struct lane_tx_runtime *runtime = tx->section->runtime;
/* spoint: last byte still to examine */
uint64_t spoint = args->offset + args->size - 1;
/* apoint: one past the part still to examine */
uint64_t apoint = 0;
int ret = 0;
uint64_t range_flags = (args->flags & POBJ_FLAG_NO_FLUSH) ?
RANGE_FLAG_NO_FLUSH : 0;
while (spoint >= args->offset) {
apoint = spoint + 1;
uint64_t size_flags = ctree_find_le_unlocked(runtime->ranges,
&spoint);
uint64_t size = RANGE_GET_SIZE(size_flags);
struct tx_add_range_args nargs;
nargs.pop = args->pop;
if (spoint < args->offset) {
/* found entry starts before the requested range: last gap */
nargs.size = apoint - args->offset;
if (spoint + size > args->offset) {
/* it still covers the beginning of the range; trim the gap */
nargs.offset = spoint + size;
if (nargs.size <= nargs.offset - args->offset)
break;
nargs.size -= nargs.offset - args->offset;
} else {
nargs.offset = args->offset;
}
if (args->size == 0)
break;
/* terminate the loop after this gap */
spoint = 0;
} else {
/* entry starts inside the range: gap is between its end and apoint */
nargs.offset = spoint + size;
spoint -= 1;
if (nargs.offset >= apoint)
continue;
nargs.size = apoint - nargs.offset;
}
ret = ctree_insert_unlocked(runtime->ranges, nargs.offset,
nargs.size | range_flags);
if (ret != 0) {
if (ret == EEXIST)
FATAL("invalid state of ranges tree");
break;
}
ret = nargs.size > tx->pop->tx_params->cache_threshold ?
pmemobj_tx_add_large(tx, &nargs) :
pmemobj_tx_add_small(tx, &nargs);
if (ret != 0)
break;
}
if (ret != 0) {
ERR("out of memory");
return obj_tx_abort_err(ENOMEM);
}
return 0;
}
/*
 * pmemobj_tx_add_range_direct -- snapshots size bytes at ptr, which must
 * point into the transaction's pool
 */
int
pmemobj_tx_add_range_direct(const void *ptr, size_t size)
{
	LOG(3, NULL);

	struct tx *tx = get_tx();

	ASSERT_IN_TX(tx);
	ASSERT_TX_STAGE_WORK(tx);

	PMEMobjpool *pop = tx->pop;

	if (!OBJ_PTR_FROM_POOL(pop, ptr)) {
		ERR("object outside of pool");
		return obj_tx_abort_err(EINVAL);
	}

	/* pool-relative offset of ptr */
	uint64_t off = (uint64_t)((const char *)ptr - (const char *)pop);

	struct tx_add_range_args args = {
		.pop = pop,
		.offset = off,
		.size = size,
		.flags = 0,
	};

	return pmemobj_tx_add_common(tx, &args);
}
int
pmemobj_tx_xadd_range_direct(const void *ptr, size_t size, uint64_t flags)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
if (!OBJ_PTR_FROM_POOL(tx->pop, ptr)) {
ERR("object outside of pool");
return obj_tx_abort_err(EINVAL);
}
if (flags & ~POBJ_XADD_VALID_FLAGS) {
ERR("unknown flags 0x%" PRIx64, flags & ~POBJ_XADD_VALID_FLAGS);
return obj_tx_abort_err(EINVAL);
}
struct tx_add_range_args args = {
.pop = tx->pop,
.offset = (uint64_t)((char *)ptr - (char *)tx->pop),
.size = size,
.flags = flags,
};
return pmemobj_tx_add_common(tx, &args);
}
/*
 * pmemobj_tx_add_range -- adds size bytes, starting hoff bytes into the
 *	object oid, to the current transaction
 *
 * An oid whose pool_uuid_lo does not match the transaction's pool aborts
 * the transaction with EINVAL.
 */
int
pmemobj_tx_add_range(PMEMoid oid, uint64_t hoff, size_t size)
{
	LOG(3, NULL);

	struct tx *tx = get_tx();

	ASSERT_IN_TX(tx);
	ASSERT_TX_STAGE_WORK(tx);

	PMEMobjpool *pool = tx->pop;

	if (pool->uuid_lo != oid.pool_uuid_lo) {
		ERR("invalid pool uuid");
		return obj_tx_abort_err(EINVAL);
	}
	ASSERT(OBJ_OID_IS_VALID(pool, oid));

	struct tx_add_range_args range;
	range.pop = pool;
	range.offset = oid.off + hoff;
	range.size = size;
	range.flags = 0;

	return pmemobj_tx_add_common(tx, &range);
}
/*
 * pmemobj_tx_xadd_range -- adds size bytes, starting hoff bytes into the
 *	object oid, to the current transaction with flags
 *
 * The transaction is aborted with EINVAL when oid belongs to another pool,
 * or when flags contains bits outside POBJ_XADD_VALID_FLAGS. The pool check
 * runs first.
 */
int
pmemobj_tx_xadd_range(PMEMoid oid, uint64_t hoff, size_t size, uint64_t flags)
{
	LOG(3, NULL);

	struct tx *tx = get_tx();

	ASSERT_IN_TX(tx);
	ASSERT_TX_STAGE_WORK(tx);

	PMEMobjpool *pool = tx->pop;

	if (pool->uuid_lo != oid.pool_uuid_lo) {
		ERR("invalid pool uuid");
		return obj_tx_abort_err(EINVAL);
	}
	ASSERT(OBJ_OID_IS_VALID(pool, oid));

	uint64_t unknown = flags & ~POBJ_XADD_VALID_FLAGS;
	if (unknown != 0) {
		ERR("unknown flags 0x%" PRIx64, unknown);
		return obj_tx_abort_err(EINVAL);
	}

	struct tx_add_range_args range;
	range.pop = pool;
	range.offset = oid.off + hoff;
	range.size = size;
	range.flags = flags;

	return pmemobj_tx_add_common(tx, &range);
}
PMEMoid
pmemobj_tx_alloc(size_t size, uint64_t type_num)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
if (size == 0) {
ERR("allocation with size 0");
return obj_tx_abort_null(EINVAL);
}
return tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_tx_alloc, ALLOC_ARGS(0));
}
PMEMoid
pmemobj_tx_zalloc(size_t size, uint64_t type_num)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
if (size == 0) {
ERR("allocation with size 0");
return obj_tx_abort_null(EINVAL);
}
return tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_tx_alloc, ALLOC_ARGS(POBJ_FLAG_ZERO));
}
PMEMoid
pmemobj_tx_xalloc(size_t size, uint64_t type_num, uint64_t flags)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
if (size == 0) {
ERR("allocation with size 0");
return obj_tx_abort_null(EINVAL);
}
if (flags & ~POBJ_TX_XALLOC_VALID_FLAGS) {
ERR("unknown flags 0x%" PRIx64,
flags & ~POBJ_TX_XALLOC_VALID_FLAGS);
return obj_tx_abort_null(EINVAL);
}
return tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_tx_alloc, ALLOC_ARGS(flags));
}
/*
 * pmemobj_tx_realloc -- resizes an object inside the current transaction
 *
 * Delegates to tx_realloc_common, using constructor_tx_alloc for both
 * constructor slots and passing no flags.
 */
PMEMoid
pmemobj_tx_realloc(PMEMoid oid, size_t size, uint64_t type_num)
{
	LOG(3, NULL);

	struct tx *tx = get_tx();

	ASSERT_IN_TX(tx);
	ASSERT_TX_STAGE_WORK(tx);

	uint64_t no_flags = 0;

	return tx_realloc_common(tx, oid, size, type_num,
		constructor_tx_alloc, constructor_tx_alloc, no_flags);
}
/*
 * pmemobj_tx_zrealloc -- resizes an object inside the current transaction,
 *	passing POBJ_FLAG_ZERO
 *
 * Delegates to tx_realloc_common, using constructor_tx_alloc for both
 * constructor slots.
 */
PMEMoid
pmemobj_tx_zrealloc(PMEMoid oid, size_t size, uint64_t type_num)
{
	LOG(3, NULL);

	struct tx *tx = get_tx();

	ASSERT_IN_TX(tx);
	ASSERT_TX_STAGE_WORK(tx);

	uint64_t zero_flag = POBJ_FLAG_ZERO;

	return tx_realloc_common(tx, oid, size, type_num,
		constructor_tx_alloc, constructor_tx_alloc, zero_flag);
}
PMEMoid
pmemobj_tx_strdup(const char *s, uint64_t type_num)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
if (NULL == s) {
ERR("cannot duplicate NULL string");
return obj_tx_abort_null(EINVAL);
}
size_t len = strlen(s);
if (len == 0)
return tx_alloc_common(tx, sizeof(char), (type_num_t)type_num,
constructor_tx_alloc,
ALLOC_ARGS(POBJ_FLAG_ZERO));
size_t size = (len + 1) * sizeof(char);
return tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_tx_alloc, COPY_ARGS(0, s, size));
}
PMEMoid
pmemobj_tx_wcsdup(const wchar_t *s, uint64_t type_num)
{
LOG(3, NULL);
struct tx *tx = get_tx();
ASSERT_IN_TX(tx);
ASSERT_TX_STAGE_WORK(tx);
if (NULL == s) {
ERR("cannot duplicate NULL string");
return obj_tx_abort_null(EINVAL);
}
size_t len = wcslen(s);
if (len == 0)
return tx_alloc_common(tx, sizeof(wchar_t),
(type_num_t)type_num, constructor_tx_alloc,
ALLOC_ARGS(POBJ_FLAG_ZERO));
size_t size = (len + 1) * sizeof(wchar_t);
return tx_alloc_common(tx, size, (type_num_t)type_num,
constructor_tx_alloc, COPY_ARGS(0, s, size));
}
/*
 * pmemobj_tx_free -- schedules an object for freeing inside the current
 *	transaction
 *
 * A null oid is accepted and ignored (returns 0). For any other oid, an oid
 * from a different pool aborts the transaction with EINVAL. Otherwise the
 * object's offset is appended to the lane's UNDO_FREE vector and persisted.
 * If that vector cannot grow, the transaction is aborted with ENOMEM.
 */
int
pmemobj_tx_free(PMEMoid oid)
{
	LOG(3, NULL);

	struct tx *tx = get_tx();

	ASSERT_IN_TX(tx);
	ASSERT_TX_STAGE_WORK(tx);

	if (OBJ_OID_IS_NULL(oid))
		return 0;

	PMEMobjpool *pool = tx->pop;

	if (oid.pool_uuid_lo != pool->uuid_lo) {
		ERR("invalid pool uuid");
		return obj_tx_abort_err(EINVAL);
	}
	ASSERT(OBJ_OID_IS_VALID(pool, oid));

	struct lane_tx_runtime *runtime =
		(struct lane_tx_runtime *)tx->section->runtime;
	struct pvector_context *free_log = runtime->undo.ctx[UNDO_FREE];

	uint64_t *slot = pvector_push_back(free_log);
	if (slot == NULL) {
		ERR("free undo log too large");
		return obj_tx_abort_err(ENOMEM);
	}

	*slot = oid.off;
	pmemops_persist(&pool->p_ops, slot, sizeof(*slot));

	return 0;
}
/*
 * pmemobj_tx_publish -- publishes reserved heap actions inside the current
 *	transaction
 *
 * Pending reservations are fulfilled first (tx_fulfill_reservations). Every
 * action must have type POBJ_ACTION_TYPE_HEAP. For each one:
 *   - its heap offset is appended to the lane's UNDO_ALLOC vector and
 *     persisted;
 *   - its usable size is inserted into the lane's ranges tree with
 *     RANGE_FLAG_NO_FLUSH.
 * On success the actions are copied into lane->alloc_actv and 0 is
 * returned.
 *
 * On failure the UNDO_ALLOC entries pushed by this call are popped again.
 * The transaction is then aborted with a code specific to the failure:
 * EINVAL for a non-heap action, ENOMEM when an undo entry or a range could
 * not be recorded. Previously every failure reported ENOMEM, and a second
 * ERR call overwrote the more specific error message.
 */
int
pmemobj_tx_publish(struct pobj_action *actv, int actvcnt)
{
	struct tx *tx = get_tx();

	ASSERT_TX_STAGE_WORK(tx);

	tx_fulfill_reservations(tx);

	ASSERT((unsigned)actvcnt <= MAX_TX_ALLOC_RESERVATIONS);

	struct lane_tx_runtime *lane =
		(struct lane_tx_runtime *)tx->section->runtime;
	struct pvector_context *ctx = lane->undo.ctx[UNDO_ALLOC];

	int nentries = 0; /* UNDO_ALLOC entries pushed by this call */
	int err = 0;

	for (int i = 0; i < actvcnt; ++i) {
		if (actv[i].type != POBJ_ACTION_TYPE_HEAP) {
			ERR("only heap actions can be "
				"published with a transaction");
			err = EINVAL;
			break;
		}

		uint64_t *e = pvector_push_back(ctx);
		if (e == NULL) {
			ERR("alloc undo log too large");
			err = ENOMEM;
			break;
		}

		*e = actv[i].heap.offset;
		pmemops_persist(&tx->pop->p_ops, e, sizeof(*e));
		nentries++;

		size_t size = palloc_usable_size(&tx->pop->heap,
			actv[i].heap.offset);
		/* the high bits of a range's value are reserved for flags */
		ASSERTeq(size & RANGE_FLAGS_MASK, 0);

		if (ctree_insert_unlocked(lane->ranges, actv[i].heap.offset,
			size | RANGE_FLAG_NO_FLUSH) != 0) {
			ERR("out of memory");
			err = ENOMEM;
			break;
		}
	}

	if (err != 0) {
		/*
		 * Drop the undo entries of this call so that the abort does
		 * not act on them. NOTE(review): ranges already inserted into
		 * lane->ranges are not removed here; presumably the abort
		 * discards the ranges tree -- confirm.
		 */
		while (nentries--)
			pvector_pop_back(ctx, tx_clear_vec_entry);

		return obj_tx_abort_err(err);
	}

	memcpy(lane->alloc_actv, actv,
		sizeof(struct pobj_action) * (unsigned)actvcnt);
	lane->actvcnt = actvcnt;
	lane->actvundo = actvcnt;

	return 0;
}
/*
 * lane_transaction_construct_rt -- allocates a zero-filled per-lane
 *	transaction runtime (NULL if the allocation fails)
 */
static void *
lane_transaction_construct_rt(PMEMobjpool *pop)
{
	struct lane_tx_runtime *runtime = Zalloc(sizeof(*runtime));

	return runtime;
}
/*
 * lane_transaction_destroy_rt -- releases a per-lane transaction runtime:
 *	first its undo runtime, then the structure itself
 */
static void
lane_transaction_destroy_rt(PMEMobjpool *pop, void *rt)
{
	struct lane_tx_runtime *runtime = (struct lane_tx_runtime *)rt;

	tx_destroy_undo_runtime(&runtime->undo);

	Free(runtime);
}
/*
 * lane_transaction_recovery -- recovers a transaction lane section
 *
 * A committed transaction has its post-commit work finished and its state
 * reset to TX_STATE_NONE. A lane in any other state is aborted through
 * tx_abort. Always returns 0.
 */
static int
lane_transaction_recovery(PMEMobjpool *pop, void *data, unsigned length)
{
	struct lane_tx_layout *layout = data;

	ASSERT(sizeof(*layout) <= length);

	if (layout->state != TX_STATE_COMMITTED) {
		/* NOTE(review): the trailing 1 presumably marks recovery -- confirm */
		tx_abort(pop, NULL, layout, 1);
		return 0;
	}

	tx_post_commit(pop, NULL, layout, 1);
	tx_set_state(pop, layout, TX_STATE_NONE);

	return 0;
}
/*
 * lane_transaction_check -- consistency check of a transaction lane section
 *
 * Only TX_STATE_NONE and TX_STATE_COMMITTED are accepted (returns 0). Any
 * other state is reported with ERR and yields -1.
 */
static int
lane_transaction_check(PMEMobjpool *pop, void *data, unsigned length)
{
	LOG(3, "tx lane %p", data);

	struct lane_tx_layout *tx_sec = data;
	int state_ok = tx_sec->state == TX_STATE_NONE ||
		tx_sec->state == TX_STATE_COMMITTED;

	if (state_ok)
		return 0;

	ERR("tx lane: invalid transaction state");
	return -1;
}
/*
 * lane_transaction_boot -- boot callback of the transaction lane section;
 *	it has nothing to do and always succeeds
 */
static int
lane_transaction_boot(PMEMobjpool *pop)
{
	(void) pop;

	return 0;
}
/*
 * transaction_ops -- lane section callbacks for transactions, registered
 *	below for LANE_SECTION_TRANSACTION
 */
static struct section_operations transaction_ops = {
	/* runtime (volatile) state management */
	.construct_rt = lane_transaction_construct_rt,
	.destroy_rt = lane_transaction_destroy_rt,
	/* persistent lane section handling */
	.boot = lane_transaction_boot,
	.check = lane_transaction_check,
	.recover = lane_transaction_recovery,
};

SECTION_PARM(LANE_SECTION_TRANSACTION, &transaction_ops);
/*
 * CTL_READ_HANDLER(size) -- stores the current transaction cache size into
 *	*arg as an ssize_t
 */
static int
CTL_READ_HANDLER(size)(PMEMobjpool *pop,
	enum ctl_query_source source, void *arg, struct ctl_indexes *indexes)
{
	size_t current = pop->tx_params->cache_size;

	*(ssize_t *)arg = (ssize_t)current;

	return 0;
}
/*
 * CTL_WRITE_HANDLER(size) -- sets the transaction cache size
 *
 * The argument is declared CTL_ARG_LONG_LONG (see CTL_ARG(size) below), so
 * it is read as a long long. Reading it through an int pointer, as before,
 * kept only part of the value and truncated any size that does not fit in
 * an int. Values outside [0, PMEMOBJ_MAX_ALLOC_SIZE] are rejected with
 * errno = EINVAL. If the current cache_threshold exceeds the new size, the
 * threshold is lowered to it.
 */
static int
CTL_WRITE_HANDLER(size)(PMEMobjpool *pop,
	enum ctl_query_source source, void *arg, struct ctl_indexes *indexes)
{
	long long arg_in = *(long long *)arg;

	if (arg_in < 0 || arg_in > (long long)PMEMOBJ_MAX_ALLOC_SIZE) {
		errno = EINVAL;
		ERR("invalid cache size, must be between 0 and max alloc size");
		return -1;
	}

	size_t argu = (size_t)arg_in;

	pop->tx_params->cache_size = argu;
	if (pop->tx_params->cache_threshold > argu)
		pop->tx_params->cache_threshold = argu;

	return 0;
}

static struct ctl_argument CTL_ARG(size) = CTL_ARG_LONG_LONG;
/*
 * CTL_READ_HANDLER(threshold) -- stores the current transaction cache
 *	threshold into *arg as an ssize_t
 */
static int
CTL_READ_HANDLER(threshold)(PMEMobjpool *pop,
	enum ctl_query_source source, void *arg, struct ctl_indexes *indexes)
{
	size_t current = pop->tx_params->cache_threshold;

	*(ssize_t *)arg = (ssize_t)current;

	return 0;
}
/*
 * CTL_WRITE_HANDLER(threshold) -- sets the transaction cache threshold
 *
 * The argument is declared CTL_ARG_LONG_LONG (see CTL_ARG(threshold)
 * below), so it is read as a long long rather than through an int pointer,
 * which would truncate any value that does not fit in an int. The threshold
 * must lie in [0, cache_size]; anything else is rejected with
 * errno = EINVAL.
 */
static int
CTL_WRITE_HANDLER(threshold)(PMEMobjpool *pop,
	enum ctl_query_source source, void *arg, struct ctl_indexes *indexes)
{
	long long arg_in = *(long long *)arg;

	if (arg_in < 0 || arg_in > (long long)pop->tx_params->cache_size) {
		errno = EINVAL;
		ERR("invalid threshold size, must be between 0 and cache size");
		return -1;
	}

	pop->tx_params->cache_threshold = (size_t)arg_in;

	return 0;
}

static struct ctl_argument CTL_ARG(threshold) = CTL_ARG_LONG_LONG;
/*
 * CTL_NODE(cache) -- "cache" child of the tx ctl tree: read-write "size"
 *	and "threshold" leaves backed by pop->tx_params
 */
static const struct ctl_node CTL_NODE(cache)[] = {
CTL_LEAF_RW(size),
CTL_LEAF_RW(threshold),
CTL_NODE_END
};
/*
 * CTL_READ_HANDLER(skip_expensive_checks) -- stores the pool's
 *	tx_debug_skip_expensive_checks setting into *arg as an int
 */
static int
CTL_READ_HANDLER(skip_expensive_checks)(PMEMobjpool *pop,
	enum ctl_query_source source, void *arg, struct ctl_indexes *indexes)
{
	*(int *)arg = pop->tx_debug_skip_expensive_checks;

	return 0;
}
/*
 * CTL_WRITE_HANDLER(skip_expensive_checks) -- copies the int in *arg into
 *	the pool's tx_debug_skip_expensive_checks setting
 */
static int
CTL_WRITE_HANDLER(skip_expensive_checks)(PMEMobjpool *pop,
	enum ctl_query_source source, void *arg, struct ctl_indexes *indexes)
{
	pop->tx_debug_skip_expensive_checks = *(int *)arg;

	return 0;
}

static struct ctl_argument CTL_ARG(skip_expensive_checks) = CTL_ARG_BOOLEAN;
/*
 * CTL_NODE(debug) -- "debug" child of the tx ctl tree with the read-write
 *	"skip_expensive_checks" leaf
 */
static const struct ctl_node CTL_NODE(debug)[] = {
CTL_LEAF_RW(skip_expensive_checks),
CTL_NODE_END
};
/*
 * CTL_READ_HANDLER(queue_depth) -- stores the length of the post-commit
 *	task ring buffer into *arg as an int
 */
static int
CTL_READ_HANDLER(queue_depth)(PMEMobjpool *pop, enum ctl_query_source source,
	void *arg, struct ctl_indexes *indexes)
{
	struct ringbuf *tasks = pop->tx_postcommit_tasks;

	*(int *)arg = (int)ringbuf_length(tasks);

	return 0;
}
/*
 * CTL_WRITE_HANDLER(queue_depth) -- replaces the post-commit task queue with
 *	a new ring buffer of the requested depth
 *
 * A negative depth is rejected with errno = EINVAL. Previously it was cast
 * to unsigned and wrapped to an enormous ring buffer size. The old queue is
 * deleted only after the new one has been created, so a failed allocation
 * (return -1) leaves the current queue in place.
 *
 * NOTE(review): the old queue is deleted without draining it or
 * synchronizing with threads running the "worker" handler; confirm that
 * callers only change the depth while no tasks or workers are active.
 */
static int
CTL_WRITE_HANDLER(queue_depth)(PMEMobjpool *pop, enum ctl_query_source source,
	void *arg, struct ctl_indexes *indexes)
{
	int arg_in = *(int *)arg;

	if (arg_in < 0) {
		errno = EINVAL;
		ERR("invalid queue depth, must be non-negative");
		return -1;
	}

	struct ringbuf *ntasks = ringbuf_new((unsigned)arg_in);
	if (ntasks == NULL)
		return -1;

	if (pop->tx_postcommit_tasks != NULL)
		ringbuf_delete(pop->tx_postcommit_tasks);

	pop->tx_postcommit_tasks = ntasks;

	return 0;
}

static struct ctl_argument CTL_ARG(queue_depth) = CTL_ARG_INT;
/*
 * CTL_READ_HANDLER(worker) -- runs post-commit cleanup for queued lane
 *	sections
 *
 * Dequeues sections from the post-commit ring buffer and passes each one
 * to tx_post_commit_cleanup until ringbuf_dequeue_s returns NULL.
 * NOTE(review): whether the dequeue blocks until a task or a "stop"
 * arrives is not visible here -- confirm against ringbuf.
 */
static int
CTL_READ_HANDLER(worker)(PMEMobjpool *pop, enum ctl_query_source source,
	void *arg, struct ctl_indexes *indexes)
{
	for (;;) {
		struct lane_section *task = ringbuf_dequeue_s(
			pop->tx_postcommit_tasks, sizeof(*task));
		if (task == NULL)
			break;

		tx_post_commit_cleanup(pop, task, 1);
	}

	return 0;
}
/*
 * CTL_READ_HANDLER(stop) -- calls ringbuf_stop on the post-commit task
 *	queue
 */
static int
CTL_READ_HANDLER(stop)(PMEMobjpool *pop, enum ctl_query_source source,
	void *arg, struct ctl_indexes *indexes)
{
	struct ringbuf *tasks = pop->tx_postcommit_tasks;

	ringbuf_stop(tasks);

	return 0;
}
/*
 * CTL_NODE(post_commit) -- "post_commit" child of the tx ctl tree:
 *	a read-write "queue_depth" leaf, plus read-only "worker" and "stop"
 *	leaves whose read handlers drain and stop the post-commit queue
 */
static const struct ctl_node CTL_NODE(post_commit)[] = {
CTL_LEAF_RW(queue_depth),
CTL_LEAF_RO(worker),
CTL_LEAF_RO(stop),
CTL_NODE_END
};
/*
 * CTL_NODE(tx) -- root of the transaction ctl tree, with the "debug",
 *	"cache" and "post_commit" children; registered by tx_ctl_register
 */
static const struct ctl_node CTL_NODE(tx)[] = {
CTL_CHILD(debug),
CTL_CHILD(cache),
CTL_CHILD(post_commit),
CTL_NODE_END
};
/*
 * tx_ctl_register -- registers the "tx" ctl tree (CTL_NODE(tx)) with the
 *	pool's ctl instance, pop->ctl
 */
void
tx_ctl_register(PMEMobjpool *pop)
{
CTL_REGISTER_MODULE(pop->ctl, tx);
}