#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <inttypes.h>
#include <libpmemobj.h>
#include "hashmap_tx.h"
#include "hashmap_internal.h"
TOID_DECLARE(struct buckets, HASHMAP_TX_TYPE_OFFSET + 1);
TOID_DECLARE(struct entry, HASHMAP_TX_TYPE_OFFSET + 2);
struct entry {
uint64_t key;
PMEMoid value;
TOID(struct entry) next;
};
struct buckets {
size_t nbuckets;
TOID(struct entry) bucket[];
};
struct hashmap_tx {
uint32_t seed;
uint32_t hash_fun_a;
uint32_t hash_fun_b;
uint64_t hash_fun_p;
uint64_t count;
TOID(struct buckets) buckets;
};
static void
create_hashmap(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, uint32_t seed)
{
size_t len = INIT_BUCKETS_NUM;
size_t sz = sizeof(struct buckets) +
len * sizeof(TOID(struct entry));
TX_BEGIN(pop) {
TX_ADD(hashmap);
D_RW(hashmap)->seed = seed;
do {
D_RW(hashmap)->hash_fun_a = (uint32_t)rand();
} while (D_RW(hashmap)->hash_fun_a == 0);
D_RW(hashmap)->hash_fun_b = (uint32_t)rand();
D_RW(hashmap)->hash_fun_p = HASH_FUNC_COEFF_P;
D_RW(hashmap)->buckets = TX_ZALLOC(struct buckets, sz);
D_RW(D_RW(hashmap)->buckets)->nbuckets = len;
} TX_ONABORT {
fprintf(stderr, "%s: transaction aborted: %s\n", __func__,
pmemobj_errormsg());
abort();
} TX_END
}
static uint64_t
hash(const TOID(struct hashmap_tx) *hashmap,
const TOID(struct buckets) *buckets, uint64_t value)
{
uint32_t a = D_RO(*hashmap)->hash_fun_a;
uint32_t b = D_RO(*hashmap)->hash_fun_b;
uint64_t p = D_RO(*hashmap)->hash_fun_p;
size_t len = D_RO(*buckets)->nbuckets;
return ((a * value + b) % p) % len;
}
static void
hm_tx_rebuild(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, size_t new_len)
{
TOID(struct buckets) buckets_old = D_RO(hashmap)->buckets;
if (new_len == 0)
new_len = D_RO(buckets_old)->nbuckets;
size_t sz_old = sizeof(struct buckets) +
D_RO(buckets_old)->nbuckets *
sizeof(TOID(struct entry));
size_t sz_new = sizeof(struct buckets) +
new_len * sizeof(TOID(struct entry));
TX_BEGIN(pop) {
TX_ADD_FIELD(hashmap, buckets);
TOID(struct buckets) buckets_new =
TX_ZALLOC(struct buckets, sz_new);
D_RW(buckets_new)->nbuckets = new_len;
pmemobj_tx_add_range(buckets_old.oid, 0, sz_old);
for (size_t i = 0; i < D_RO(buckets_old)->nbuckets; ++i) {
while (!TOID_IS_NULL(D_RO(buckets_old)->bucket[i])) {
TOID(struct entry) en =
D_RO(buckets_old)->bucket[i];
uint64_t h = hash(&hashmap, &buckets_new,
D_RO(en)->key);
D_RW(buckets_old)->bucket[i] = D_RO(en)->next;
TX_ADD_FIELD(en, next);
D_RW(en)->next = D_RO(buckets_new)->bucket[h];
D_RW(buckets_new)->bucket[h] = en;
}
}
D_RW(hashmap)->buckets = buckets_new;
TX_FREE(buckets_old);
} TX_ONABORT {
fprintf(stderr, "%s: transaction aborted: %s\n", __func__,
pmemobj_errormsg());
} TX_END
}
int
hm_tx_insert(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap,
uint64_t key, PMEMoid value)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
int num = 0;
for (var = D_RO(buckets)->bucket[h];
!TOID_IS_NULL(var);
var = D_RO(var)->next) {
if (D_RO(var)->key == key)
return 1;
num++;
}
int ret = 0;
TX_BEGIN(pop) {
TX_ADD_FIELD(D_RO(hashmap)->buckets, bucket[h]);
TX_ADD_FIELD(hashmap, count);
TOID(struct entry) e = TX_NEW(struct entry);
D_RW(e)->key = key;
D_RW(e)->value = value;
D_RW(e)->next = D_RO(buckets)->bucket[h];
D_RW(buckets)->bucket[h] = e;
D_RW(hashmap)->count++;
num++;
} TX_ONABORT {
fprintf(stderr, "transaction aborted: %s\n",
pmemobj_errormsg());
ret = -1;
} TX_END
if (ret)
return ret;
if (num > MAX_HASHSET_THRESHOLD ||
(num > MIN_HASHSET_THRESHOLD &&
D_RO(hashmap)->count > 2 * D_RO(buckets)->nbuckets))
hm_tx_rebuild(pop, hashmap, D_RO(buckets)->nbuckets * 2);
return 0;
}
PMEMoid
hm_tx_remove(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, uint64_t key)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var, prev = TOID_NULL(struct entry);
uint64_t h = hash(&hashmap, &buckets, key);
for (var = D_RO(buckets)->bucket[h];
!TOID_IS_NULL(var);
prev = var, var = D_RO(var)->next) {
if (D_RO(var)->key == key)
break;
}
if (TOID_IS_NULL(var))
return OID_NULL;
int ret = 0;
TX_BEGIN(pop) {
if (TOID_IS_NULL(prev))
TX_ADD_FIELD(D_RO(hashmap)->buckets, bucket[h]);
else
TX_ADD_FIELD(prev, next);
TX_ADD_FIELD(hashmap, count);
if (TOID_IS_NULL(prev))
D_RW(buckets)->bucket[h] = D_RO(var)->next;
else
D_RW(prev)->next = D_RO(var)->next;
D_RW(hashmap)->count--;
TX_FREE(var);
} TX_ONABORT {
fprintf(stderr, "transaction aborted: %s\n",
pmemobj_errormsg());
ret = -1;
} TX_END
if (ret)
return OID_NULL;
if (D_RO(hashmap)->count < D_RO(buckets)->nbuckets)
hm_tx_rebuild(pop, hashmap, D_RO(buckets)->nbuckets / 2);
return D_RO(var)->value;
}
int
hm_tx_foreach(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap,
int (*cb)(uint64_t key, PMEMoid value, void *arg), void *arg)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
int ret = 0;
for (size_t i = 0; i < D_RO(buckets)->nbuckets; ++i) {
if (TOID_IS_NULL(D_RO(buckets)->bucket[i]))
continue;
for (var = D_RO(buckets)->bucket[i]; !TOID_IS_NULL(var);
var = D_RO(var)->next) {
ret = cb(D_RO(var)->key, D_RO(var)->value, arg);
if (ret)
break;
}
}
return ret;
}
static void
hm_tx_debug(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, FILE *out)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
fprintf(out, "a: %u b: %u p: %" PRIu64 "\n", D_RO(hashmap)->hash_fun_a,
D_RO(hashmap)->hash_fun_b, D_RO(hashmap)->hash_fun_p);
fprintf(out, "count: %" PRIu64 ", buckets: %zu\n",
D_RO(hashmap)->count, D_RO(buckets)->nbuckets);
for (size_t i = 0; i < D_RO(buckets)->nbuckets; ++i) {
if (TOID_IS_NULL(D_RO(buckets)->bucket[i]))
continue;
int num = 0;
fprintf(out, "%zu: ", i);
for (var = D_RO(buckets)->bucket[i]; !TOID_IS_NULL(var);
var = D_RO(var)->next) {
fprintf(out, "%" PRIu64 " ", D_RO(var)->key);
num++;
}
fprintf(out, "(%d)\n", num);
}
}
PMEMoid
hm_tx_get(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, uint64_t key)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
for (var = D_RO(buckets)->bucket[h];
!TOID_IS_NULL(var);
var = D_RO(var)->next)
if (D_RO(var)->key == key)
return D_RO(var)->value;
return OID_NULL;
}
int
hm_tx_lookup(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap, uint64_t key)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
for (var = D_RO(buckets)->bucket[h];
!TOID_IS_NULL(var);
var = D_RO(var)->next)
if (D_RO(var)->key == key)
return 1;
return 0;
}
size_t
hm_tx_count(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap)
{
return D_RO(hashmap)->count;
}
int
hm_tx_init(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap)
{
srand(D_RO(hashmap)->seed);
return 0;
}
int
hm_tx_create(PMEMobjpool *pop, TOID(struct hashmap_tx) *map, void *arg)
{
struct hashmap_args *args = (struct hashmap_args *)arg;
int ret = 0;
TX_BEGIN(pop) {
TX_ADD_DIRECT(map);
*map = TX_ZNEW(struct hashmap_tx);
uint32_t seed = args ? args->seed : 0;
create_hashmap(pop, *map, seed);
} TX_ONABORT {
ret = -1;
} TX_END
return ret;
}
int
hm_tx_check(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap)
{
return TOID_IS_NULL(hashmap) || !TOID_VALID(hashmap);
}
int
hm_tx_cmd(PMEMobjpool *pop, TOID(struct hashmap_tx) hashmap,
unsigned cmd, uint64_t arg)
{
switch (cmd) {
case HASHMAP_CMD_REBUILD:
hm_tx_rebuild(pop, hashmap, arg);
return 0;
case HASHMAP_CMD_DEBUG:
if (!arg)
return -EINVAL;
hm_tx_debug(pop, hashmap, (FILE *)arg);
return 0;
default:
return -EINVAL;
}
}