#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <inttypes.h>
#include <libpmemobj.h>
#include "hashmap_atomic.h"
#include "hashmap_internal.h"
TOID_DECLARE(struct buckets, HASHMAP_ATOMIC_TYPE_OFFSET + 1);
TOID_DECLARE(struct entry, HASHMAP_ATOMIC_TYPE_OFFSET + 2);
struct entry {
uint64_t key;
PMEMoid value;
POBJ_LIST_ENTRY(struct entry) list;
};
struct entry_args {
uint64_t key;
PMEMoid value;
};
POBJ_LIST_HEAD(entries_head, struct entry);
struct buckets {
size_t nbuckets;
struct entries_head bucket[];
};
struct hashmap_atomic {
uint32_t seed;
uint32_t hash_fun_a;
uint32_t hash_fun_b;
uint64_t hash_fun_p;
uint64_t count;
uint32_t count_dirty;
TOID(struct buckets) buckets;
TOID(struct buckets) buckets_tmp;
};
static int
create_entry(PMEMobjpool *pop, void *ptr, void *arg)
{
struct entry *e = (struct entry *)ptr;
struct entry_args *args = (struct entry_args *)arg;
e->key = args->key;
e->value = args->value;
memset(&e->list, 0, sizeof(e->list));
pmemobj_persist(pop, e, sizeof(*e));
return 0;
}
static int
create_buckets(PMEMobjpool *pop, void *ptr, void *arg)
{
struct buckets *b = (struct buckets *)ptr;
b->nbuckets = *((size_t *)arg);
pmemobj_memset_persist(pop, &b->bucket, 0,
b->nbuckets * sizeof(b->bucket[0]));
pmemobj_persist(pop, &b->nbuckets, sizeof(b->nbuckets));
return 0;
}
static void
create_hashmap(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
uint32_t seed)
{
D_RW(hashmap)->seed = seed;
do {
D_RW(hashmap)->hash_fun_a = (uint32_t)rand();
} while (D_RW(hashmap)->hash_fun_a == 0);
D_RW(hashmap)->hash_fun_b = (uint32_t)rand();
D_RW(hashmap)->hash_fun_p = HASH_FUNC_COEFF_P;
size_t len = INIT_BUCKETS_NUM;
size_t sz = sizeof(struct buckets) +
len * sizeof(struct entries_head);
if (POBJ_ALLOC(pop, &D_RW(hashmap)->buckets, struct buckets, sz,
create_buckets, &len)) {
fprintf(stderr, "root alloc failed: %s\n", pmemobj_errormsg());
abort();
}
pmemobj_persist(pop, D_RW(hashmap), sizeof(*D_RW(hashmap)));
}
static uint64_t
hash(const TOID(struct hashmap_atomic) *hashmap,
const TOID(struct buckets) *buckets,
uint64_t value)
{
uint32_t a = D_RO(*hashmap)->hash_fun_a;
uint32_t b = D_RO(*hashmap)->hash_fun_b;
uint64_t p = D_RO(*hashmap)->hash_fun_p;
size_t len = D_RO(*buckets)->nbuckets;
return ((a * value + b) % p) % len;
}
static void
hm_atomic_rebuild_finish(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap)
{
TOID(struct buckets) cur = D_RO(hashmap)->buckets;
TOID(struct buckets) tmp = D_RO(hashmap)->buckets_tmp;
for (size_t i = 0; i < D_RO(cur)->nbuckets; ++i) {
while (!POBJ_LIST_EMPTY(&D_RO(cur)->bucket[i])) {
TOID(struct entry) en =
POBJ_LIST_FIRST(&D_RO(cur)->bucket[i]);
uint64_t h = hash(&hashmap, &tmp, D_RO(en)->key);
if (POBJ_LIST_MOVE_ELEMENT_HEAD(pop,
&D_RW(cur)->bucket[i],
&D_RW(tmp)->bucket[h],
en, list, list)) {
fprintf(stderr, "move failed: %s\n",
pmemobj_errormsg());
abort();
}
}
}
POBJ_FREE(&D_RO(hashmap)->buckets);
D_RW(hashmap)->buckets = D_RO(hashmap)->buckets_tmp;
pmemobj_persist(pop, &D_RW(hashmap)->buckets,
sizeof(D_RW(hashmap)->buckets));
D_RW(hashmap)->buckets_tmp.oid.off = 0;
pmemobj_persist(pop, &D_RW(hashmap)->buckets_tmp,
sizeof(D_RW(hashmap)->buckets_tmp));
}
static void
hm_atomic_rebuild(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
size_t new_len)
{
if (new_len == 0)
new_len = D_RO(D_RO(hashmap)->buckets)->nbuckets;
size_t sz = sizeof(struct buckets) +
new_len * sizeof(struct entries_head);
POBJ_ALLOC(pop, &D_RW(hashmap)->buckets_tmp, struct buckets, sz,
create_buckets, &new_len);
if (TOID_IS_NULL(D_RO(hashmap)->buckets_tmp)) {
fprintf(stderr,
"failed to allocate temporary space of size: %zu"
", %s\n",
new_len, pmemobj_errormsg());
return;
}
hm_atomic_rebuild_finish(pop, hashmap);
}
int
hm_atomic_insert(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
uint64_t key, PMEMoid value)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
int num = 0;
POBJ_LIST_FOREACH(var, &D_RO(buckets)->bucket[h], list) {
if (D_RO(var)->key == key)
return 1;
num++;
}
D_RW(hashmap)->count_dirty = 1;
pmemobj_persist(pop, &D_RW(hashmap)->count_dirty,
sizeof(D_RW(hashmap)->count_dirty));
struct entry_args args;
args.key = key;
args.value = value;
PMEMoid oid = POBJ_LIST_INSERT_NEW_HEAD(pop,
&D_RW(buckets)->bucket[h],
list, sizeof(struct entry), create_entry, &args);
if (OID_IS_NULL(oid)) {
fprintf(stderr, "failed to allocate entry: %s\n",
pmemobj_errormsg());
return -1;
}
D_RW(hashmap)->count++;
pmemobj_persist(pop, &D_RW(hashmap)->count,
sizeof(D_RW(hashmap)->count));
D_RW(hashmap)->count_dirty = 0;
pmemobj_persist(pop, &D_RW(hashmap)->count_dirty,
sizeof(D_RW(hashmap)->count_dirty));
num++;
if (num > MAX_HASHSET_THRESHOLD ||
(num > MIN_HASHSET_THRESHOLD &&
D_RO(hashmap)->count > 2 * D_RO(buckets)->nbuckets))
hm_atomic_rebuild(pop, hashmap, D_RW(buckets)->nbuckets * 2);
return 0;
}
PMEMoid
hm_atomic_remove(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
uint64_t key)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
POBJ_LIST_FOREACH(var, &D_RW(buckets)->bucket[h], list) {
if (D_RO(var)->key == key)
break;
}
if (TOID_IS_NULL(var))
return OID_NULL;
D_RW(hashmap)->count_dirty = 1;
pmemobj_persist(pop, &D_RW(hashmap)->count_dirty,
sizeof(D_RW(hashmap)->count_dirty));
if (POBJ_LIST_REMOVE_FREE(pop, &D_RW(buckets)->bucket[h],
var, list)) {
fprintf(stderr, "list remove failed: %s\n",
pmemobj_errormsg());
return OID_NULL;
}
D_RW(hashmap)->count--;
pmemobj_persist(pop, &D_RW(hashmap)->count,
sizeof(D_RW(hashmap)->count));
D_RW(hashmap)->count_dirty = 0;
pmemobj_persist(pop, &D_RW(hashmap)->count_dirty,
sizeof(D_RW(hashmap)->count_dirty));
if (D_RO(hashmap)->count < D_RO(buckets)->nbuckets)
hm_atomic_rebuild(pop, hashmap, D_RO(buckets)->nbuckets / 2);
return D_RO(var)->value;
}
int
hm_atomic_foreach(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
int (*cb)(uint64_t key, PMEMoid value, void *arg), void *arg)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
int ret = 0;
for (size_t i = 0; i < D_RO(buckets)->nbuckets; ++i)
POBJ_LIST_FOREACH(var, &D_RO(buckets)->bucket[i], list) {
ret = cb(D_RO(var)->key, D_RO(var)->value, arg);
if (ret)
return ret;
}
return 0;
}
static void
hm_atomic_debug(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
FILE *out)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
fprintf(out, "a: %u b: %u p: %" PRIu64 "\n", D_RO(hashmap)->hash_fun_a,
D_RO(hashmap)->hash_fun_b, D_RO(hashmap)->hash_fun_p);
fprintf(out, "count: %" PRIu64 ", buckets: %zu\n",
D_RO(hashmap)->count, D_RO(buckets)->nbuckets);
for (size_t i = 0; i < D_RO(buckets)->nbuckets; ++i) {
if (POBJ_LIST_EMPTY(&D_RO(buckets)->bucket[i]))
continue;
int num = 0;
fprintf(out, "%zu: ", i);
POBJ_LIST_FOREACH(var, &D_RO(buckets)->bucket[i], list) {
fprintf(out, "%" PRIu64 " ", D_RO(var)->key);
num++;
}
fprintf(out, "(%d)\n", num);
}
}
PMEMoid
hm_atomic_get(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
uint64_t key)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
POBJ_LIST_FOREACH(var, &D_RO(buckets)->bucket[h], list)
if (D_RO(var)->key == key)
return D_RO(var)->value;
return OID_NULL;
}
int
hm_atomic_lookup(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
uint64_t key)
{
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
TOID(struct entry) var;
uint64_t h = hash(&hashmap, &buckets, key);
POBJ_LIST_FOREACH(var, &D_RO(buckets)->bucket[h], list)
if (D_RO(var)->key == key)
return 1;
return 0;
}
int
hm_atomic_create(PMEMobjpool *pop, TOID(struct hashmap_atomic) *map, void *arg)
{
struct hashmap_args *args = (struct hashmap_args *)arg;
uint32_t seed = args ? args->seed : 0;
POBJ_ZNEW(pop, map, struct hashmap_atomic);
create_hashmap(pop, *map, seed);
return 0;
}
int
hm_atomic_init(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap)
{
srand(D_RO(hashmap)->seed);
if (!TOID_IS_NULL(D_RO(hashmap)->buckets_tmp)) {
printf("rebuild, previous attempt crashed\n");
if (TOID_EQUALS(D_RO(hashmap)->buckets,
D_RO(hashmap)->buckets_tmp)) {
D_RW(hashmap)->buckets_tmp.oid.off = 0;
pmemobj_persist(pop, &D_RW(hashmap)->buckets_tmp,
sizeof(D_RW(hashmap)->buckets_tmp));
} else if (TOID_IS_NULL(D_RW(hashmap)->buckets)) {
D_RW(hashmap)->buckets = D_RW(hashmap)->buckets_tmp;
pmemobj_persist(pop, &D_RW(hashmap)->buckets,
sizeof(D_RW(hashmap)->buckets));
D_RW(hashmap)->buckets_tmp.oid.off = 0;
pmemobj_persist(pop, &D_RW(hashmap)->buckets_tmp,
sizeof(D_RW(hashmap)->buckets_tmp));
} else {
hm_atomic_rebuild_finish(pop, hashmap);
}
}
if (D_RO(hashmap)->count_dirty) {
printf("count dirty, recalculating\n");
TOID(struct entry) var;
TOID(struct buckets) buckets = D_RO(hashmap)->buckets;
uint64_t cnt = 0;
for (size_t i = 0; i < D_RO(buckets)->nbuckets; ++i)
POBJ_LIST_FOREACH(var, &D_RO(buckets)->bucket[i], list)
cnt++;
printf("old count: %" PRIu64 ", new count: %" PRIu64 "\n",
D_RO(hashmap)->count, cnt);
D_RW(hashmap)->count = cnt;
pmemobj_persist(pop, &D_RW(hashmap)->count,
sizeof(D_RW(hashmap)->count));
D_RW(hashmap)->count_dirty = 0;
pmemobj_persist(pop, &D_RW(hashmap)->count_dirty,
sizeof(D_RW(hashmap)->count_dirty));
}
return 0;
}
int
hm_atomic_check(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap)
{
return TOID_IS_NULL(hashmap) || !TOID_VALID(hashmap);
}
size_t
hm_atomic_count(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap)
{
return D_RO(hashmap)->count;
}
int
hm_atomic_cmd(PMEMobjpool *pop, TOID(struct hashmap_atomic) hashmap,
unsigned cmd, uint64_t arg)
{
switch (cmd) {
case HASHMAP_CMD_REBUILD:
hm_atomic_rebuild(pop, hashmap, arg);
return 0;
case HASHMAP_CMD_DEBUG:
if (!arg)
return -EINVAL;
hm_atomic_debug(pop, hashmap, (FILE *)arg);
return 0;
default:
return -EINVAL;
}
}