#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdint.h>
#include <endian.h>
#include <errno.h>
#include <stddef.h>
#include <time.h>
#include <ctype.h>
#include <linux/limits.h>
#include <sys/mman.h>
#include "libpmem.h"
#include "librpmem.h"
#include "set.h"
#include "file.h"
#include "os.h"
#include "mmap.h"
#include "util.h"
#include "out.h"
#include "dlsym.h"
#include "valgrind_internal.h"
#include "sys_util.h"
#include "util_pmem.h"
#include "fs.h"
/* name of the remote replication library opened by util_remote_load() */
#define LIBRARY_REMOTE "librpmem.so.1"
/* size token in a pool set line that requests size autodetection */
#define SIZE_AUTODETECT_STR "AUTO"
/* extension of part files found inside a poolset directory */
#define PMEM_EXT ".pmem"
/* NOTE: sizeof() counts the terminating NUL, so this is strlen(PMEM_EXT) + 1 */
#define PMEM_EXT_LEN sizeof(PMEM_EXT)
/* minimum number of (zero padded) digits of a generated part file name */
#define PMEM_FILE_PADDING 6
#define PMEM_FILE_NAME_MAX_LEN 20
/* bytes added to a directory path length when building a part file path */
#define PMEM_FILE_MAX_LEN (PMEM_FILE_NAME_MAX_LEN + PMEM_FILE_PADDING)
/* handle of the dynamically loaded LIBRARY_REMOTE, NULL when not loaded */
static void *Rpmem_handle_remote;
/* entry points resolved from LIBRARY_REMOTE by util_remote_load() */
static RPMEMpool *(*Rpmem_create)(const char *target, const char *pool_set_name,
void *pool_addr, size_t pool_size, unsigned *nlanes,
const struct rpmem_pool_attr *rpmem_attr);
static RPMEMpool *(*Rpmem_open)(const char *target, const char *pool_set_name,
void *pool_addr, size_t pool_size, unsigned *nlanes,
struct rpmem_pool_attr *rpmem_attr);
int (*Rpmem_close)(RPMEMpool *rpp);
int (*Rpmem_persist)(RPMEMpool *rpp, size_t offset, size_t length,
unsigned lane);
int (*Rpmem_read)(RPMEMpool *rpp, void *buff, size_t offset,
size_t length, unsigned lane);
int (*Rpmem_remove)(const char *target, const char *pool_set_name, int flags);
int (*Rpmem_set_attr)(RPMEMpool *rpp, const struct rpmem_pool_attr *attr);
/* set by util_remote_init(), cleared by util_remote_fini() */
static int Remote_replication_available;
/* guards Rpmem_handle_remote, the Rpmem_* pointers and the usage counter */
static os_mutex_t Remote_lock;
/* util_remote_load() calls not yet balanced by util_remote_unload() */
static int Remote_usage_counter;
/* NOTE(review): not referenced in this part of the file; presumably prefault toggles */
int Prefault_at_open = 0;
int Prefault_at_create = 0;
/* options accepted after the pool set OPTION signature; NULL-name terminated */
static struct pool_set_option Options[] = {
{ "NOHDRS", OPTION_NO_HDRS },
{ NULL, OPTION_UNKNOWN }
};
/*
 * util_remote_init -- mark remote replication as available
 *
 * The first call initializes Remote_lock; further calls are no-ops until
 * util_remote_fini() clears the availability flag again.
 */
void
util_remote_init(void)
{
	LOG(3, NULL);

	if (Remote_replication_available)
		return;

	util_mutex_init(&Remote_lock);
	Remote_replication_available = 1;
}
/*
 * util_remote_fini -- undo util_remote_init()
 *
 * Clears the availability flag and destroys Remote_lock; does nothing if
 * remote replication was never initialized.
 */
void
util_remote_fini(void)
{
	LOG(3, NULL);

	if (!Remote_replication_available)
		return;

	Remote_replication_available = 0;
	util_mutex_destroy(&Remote_lock);
}
/*
 * util_dl_check_error -- report a failed dlopen/dlsym result
 *
 * Returns 0 when handle is non-NULL. Otherwise logs the dynamic loader's
 * error message (if any) prefixed with func, sets errno to ELIBACC and
 * returns -1.
 */
static int
util_dl_check_error(void *handle, const char *func)
{
	LOG(15, "handle %p func %s", handle, func);

	if (handle != NULL)
		return 0;

	char *dl_msg = util_dlerror();
	if (dl_msg != NULL)
		ERR("%s(): %s", func, dl_msg);

	errno = ELIBACC;
	return -1;
}
/*
 * util_remote_unload_core -- close the remote library and forget its symbols
 *
 * Caller is expected to hold Remote_lock (both callers in this file do).
 */
static void
util_remote_unload_core(void)
{
	if (Rpmem_handle_remote) {
		util_dlclose(Rpmem_handle_remote);
		Rpmem_handle_remote = NULL;
	}

	/* resolved symbols are meaningless once the library is closed */
	Rpmem_set_attr = NULL;
	Rpmem_remove = NULL;
	Rpmem_read = NULL;
	Rpmem_persist = NULL;
	Rpmem_close = NULL;
	Rpmem_open = NULL;
	Rpmem_create = NULL;
}
/*
 * util_remote_unload -- drop one reference to the remote library
 *
 * The library is closed when the last reference obtained through
 * util_remote_load() is released. A zero counter leaves everything as is.
 */
void
util_remote_unload(void)
{
	LOG(3, NULL);

	if (!Remote_replication_available)
		return;

	util_mutex_lock(&Remote_lock);

	if (Remote_usage_counter != 0) {
		/* the last user actually releases the library */
		if (Remote_usage_counter <= 1)
			util_remote_unload_core();
		Remote_usage_counter--;
	}

	util_mutex_unlock(&Remote_lock);
}
int
util_remote_load(void)
{
LOG(3, NULL);
if (!Remote_replication_available) {
ERR("remote replication is not available");
return -1;
}
CHECK_FUNC_COMPATIBLE(rpmem_create, *Rpmem_create);
CHECK_FUNC_COMPATIBLE(rpmem_open, *Rpmem_open);
CHECK_FUNC_COMPATIBLE(rpmem_close, *Rpmem_close);
CHECK_FUNC_COMPATIBLE(rpmem_persist, *Rpmem_persist);
CHECK_FUNC_COMPATIBLE(rpmem_read, *Rpmem_read);
CHECK_FUNC_COMPATIBLE(rpmem_remove, *Rpmem_remove);
util_mutex_lock(&Remote_lock);
if (Remote_usage_counter > 0)
goto end;
Rpmem_handle_remote = util_dlopen(LIBRARY_REMOTE);
if (util_dl_check_error(Rpmem_handle_remote, "dlopen")) {
ERR("the pool set requires a remote replica, "
"but the '%s' library cannot be loaded",
LIBRARY_REMOTE);
goto err;
}
Rpmem_create = util_dlsym(Rpmem_handle_remote, "rpmem_create");
if (util_dl_check_error(Rpmem_create, "dlsym")) {
ERR("symbol 'rpmem_create' not found");
goto err;
}
Rpmem_open = util_dlsym(Rpmem_handle_remote, "rpmem_open");
if (util_dl_check_error(Rpmem_open, "dlsym")) {
ERR("symbol 'rpmem_open' not found");
goto err;
}
Rpmem_close = util_dlsym(Rpmem_handle_remote, "rpmem_close");
if (util_dl_check_error(Rpmem_close, "dlsym")) {
ERR("symbol 'rpmem_close' not found");
goto err;
}
Rpmem_persist = util_dlsym(Rpmem_handle_remote, "rpmem_persist");
if (util_dl_check_error(Rpmem_persist, "dlsym")) {
ERR("symbol 'rpmem_persist' not found");
goto err;
}
Rpmem_read = util_dlsym(Rpmem_handle_remote, "rpmem_read");
if (util_dl_check_error(Rpmem_read, "dlsym")) {
ERR("symbol 'rpmem_read' not found");
goto err;
}
Rpmem_remove = util_dlsym(Rpmem_handle_remote, "rpmem_remove");
if (util_dl_check_error(Rpmem_remove, "dlsym")) {
ERR("symbol 'rpmem_remove' not found");
goto err;
}
Rpmem_set_attr = util_dlsym(Rpmem_handle_remote, "rpmem_set_attr");
if (util_dl_check_error(Rpmem_set_attr, "dlsym")) {
ERR("symbol 'rpmem_set_attr' not found");
goto err;
}
end:
Remote_usage_counter++;
util_mutex_unlock(&Remote_lock);
return 0;
err:
LOG(4, "error clean up");
util_remote_unload_core();
util_mutex_unlock(&Remote_lock);
return -1;
}
/* initial buffer size used by util_readline(); longer lines grow the buffer */
#define PARSER_MAX_LINE (PATH_MAX + 1024)
/*
 * parser_codes -- results of the pool set file parser
 *
 * PARSER_CONTINUE keeps the parse loop going, PARSER_FORMAT_OK ends it
 * successfully; every other value is an error described by
 * parser_errstr[code].
 */
enum parser_codes {
PARSER_CONTINUE = 0,
PARSER_PMEMPOOLSET,
PARSER_REPLICA,
PARSER_SIZE_PATH_EXPECTED,
PARSER_REMOTE_REPLICA_EXPECTED,
PARSER_WRONG_SIZE,
PARSER_ABSOLUTE_PATH_EXPECTED,
PARSER_RELATIVE_PATH_EXPECTED,
PARSER_SET_NO_PARTS,
PARSER_REP_NO_PARTS,
PARSER_SIZE_MISMATCH,
PARSER_OUT_OF_MEMORY,
PARSER_OPTION_UNKNOWN,
PARSER_OPTION_EXPECTED,
PARSER_FORMAT_OK,
PARSER_MAX_CODE
};
/* error messages indexed by enum parser_codes -- keep both in the same order */
static const char *parser_errstr[PARSER_MAX_CODE] = {
"",
"the first line must be exactly 'PMEMPOOLSET'",
"exactly 'REPLICA' expected",
"size and path expected",
"address of remote node and descriptor of remote pool set expected",
"incorrect format of size",
"incorrect path (must be an absolute one)",
"incorrect descriptor (must be a relative path)",
"no pool set parts",
"no replica parts",
"sizes of pool set and replica mismatch",
"allocating memory failed",
"unknown option",
"missing option name",
""
};
/*
 * util_replica_force_page_allocation -- touch every page of the first part
 *
 * Reads and writes back one byte per Pagesize step across part[0]'s mapping
 * (through a volatile pointer so the accesses are not optimized away).
 */
static void
util_replica_force_page_allocation(struct pool_replica *rep)
{
	volatile char *page = rep->part[0].addr;
	char *end = (char *)page + rep->part[0].size;

	while (page < end) {
		*page = *page;
		VALGRIND_SET_CLEAN(page, 1);
		page += Pagesize;
	}
}
/*
 * util_map_hdr -- map the header region of a part file
 *
 * Maps max(part->alignment, POOL_HDR_SIZE) bytes from file offset 0,
 * read-only when rdonly is set. On success stores the mapping in
 * part->hdr / part->hdrsize and returns 0; returns -1 on failure.
 */
int
util_map_hdr(struct pool_set_part *part, int flags, int rdonly)
{
	LOG(3, "part %p flags %d", part, flags);

	COMPILE_ERROR_ON(POOL_HDR_SIZE == 0);
	ASSERTeq(POOL_HDR_SIZE % Pagesize, 0);

	size_t hdrsize = POOL_HDR_SIZE;
	if (part->alignment > POOL_HDR_SIZE)
		hdrsize = part->alignment;

	void *hint = NULL;
#ifdef USE_VG_MEMCHECK
	if (On_valgrind) {
		hint = util_map_hint(hdrsize, hdrsize);
		if (hint == MAP_FAILED) {
			ERR("cannot find a contiguous region of given size");
			return -1;
		}
	}
#endif

	int prot = rdonly ? PROT_READ : PROT_READ|PROT_WRITE;
	void *hdrp = mmap(hint, hdrsize, prot, flags, part->fd, 0);
	if (hdrp == MAP_FAILED) {
		ERR("!mmap: %s", part->path);
		return -1;
	}

	part->hdrsize = hdrsize;
	part->hdr = hdrp;

	VALGRIND_REGISTER_PMEM_MAPPING(part->hdr, part->hdrsize);
	VALGRIND_REGISTER_PMEM_FILE(part->fd, part->hdr, part->hdrsize, 0);

	return 0;
}
/*
 * util_unmap_hdr -- unmap a part's header region, if mapped
 *
 * A munmap() failure is logged but not propagated; the part's header
 * fields are reset either way. Always returns 0.
 */
int
util_unmap_hdr(struct pool_set_part *part)
{
	if (part->hdr == NULL || part->hdrsize == 0)
		return 0;

	LOG(4, "munmap: addr %p size %zu", part->hdr, part->hdrsize);
	VALGRIND_REMOVE_PMEM_MAPPING(part->hdr, part->hdrsize);
	if (munmap(part->hdr, part->hdrsize) != 0)
		ERR("!munmap: %s", part->path);

	part->hdr = NULL;
	part->hdrsize = 0;
	return 0;
}
/*
 * util_map_part -- map (a range of) a part file
 *
 * A size of 0 maps everything from offset to the end of the file, truncated
 * to the part's alignment; otherwise size is rounded up to that alignment.
 * When MAP_FIXED is requested with a non-NULL addr, a mapping placed
 * elsewhere is treated as an error. Returns 0 on success, -1 on failure.
 */
int
util_map_part(struct pool_set_part *part, void *addr, size_t size,
	size_t offset, int flags, int rdonly)
{
	LOG(3, "part %p addr %p size %zu offset %zu flags %d",
		part, addr, size, offset, flags);

	ASSERTeq((uintptr_t)addr % Mmap_align, 0);
	ASSERTeq(offset % Mmap_align, 0);
	ASSERTeq(size % Mmap_align, 0);
	ASSERT(((os_off_t)offset) >= 0);
	ASSERTeq(offset % part->alignment, 0);
	ASSERT(offset < part->filesize);

	size_t maplen;
	if (size == 0)
		maplen = (part->filesize - offset) & ~(part->alignment - 1);
	else
		maplen = roundup(size, part->alignment);

	int prot = rdonly ? PROT_READ : PROT_READ|PROT_WRITE;
	void *mapped = mmap(addr, maplen, prot, flags, part->fd,
			(os_off_t)offset);
	if (mapped == MAP_FAILED) {
		ERR("!mmap: %s", part->path);
		return -1;
	}

	int fixed_requested = addr != NULL && (flags & MAP_FIXED);
	if (fixed_requested && mapped != addr) {
		ERR("unable to map at requested address %p", addr);
		munmap(mapped, maplen);
		return -1;
	}

	part->addr = mapped;
	part->size = maplen;

	VALGRIND_REGISTER_PMEM_MAPPING(part->addr, part->size);
	VALGRIND_REGISTER_PMEM_FILE(part->fd, part->addr, part->size, offset);

	return 0;
}
/*
 * util_unmap_part -- unmap a part's data region, if mapped
 *
 * A munmap() failure is logged but not propagated; the part's mapping
 * fields are reset either way. Always returns 0.
 */
int
util_unmap_part(struct pool_set_part *part)
{
	LOG(3, "part %p", part);

	if (part->addr == NULL || part->size == 0)
		return 0;

	LOG(4, "munmap: addr %p size %zu", part->addr, part->size);
	VALGRIND_REMOVE_PMEM_MAPPING(part->addr, part->size);
	if (munmap(part->addr, part->size) != 0)
		ERR("!munmap: %s", part->path);

	part->addr = NULL;
	part->size = 0;
	return 0;
}
/*
 * util_unmap_parts -- unmap parts start_index..end_index (inclusive)
 *
 * Always returns 0.
 */
int
util_unmap_parts(struct pool_replica *rep, unsigned start_index,
	unsigned end_index)
{
	LOG(3, "rep: %p, start_index: %u, end_index: %u", rep, start_index,
		end_index);

	unsigned idx = start_index;
	while (idx <= end_index) {
		util_unmap_part(&rep->part[idx]);
		idx++;
	}

	return 0;
}
/*
 * util_poolset_free -- release all memory owned by a pool set
 *
 * Frees local part paths (for every allocated slot), remote replica
 * descriptors, directory paths, the replicas and finally the set itself.
 * Does not close files or unmap anything.
 */
void
util_poolset_free(struct pool_set *set)
{
	LOG(3, "set %p", set);

	for (unsigned r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];

		if (rep->remote != NULL) {
			/* a remote replica is represented by a single part */
			ASSERTeq(rep->nparts, 1);
			Free(rep->remote->node_addr);
			Free(rep->remote->pool_desc);
			Free(rep->remote);
		} else {
			for (unsigned p = 0; p < rep->nallocated; p++)
				Free((void *)(rep->part[p].path));
		}

		struct pool_set_directory *dir;
		VEC_FOREACH_BY_PTR(dir, &rep->directory) {
			Free((void *)dir->path);
		}
		VEC_DELETE(&rep->directory);

		Free(rep);
	}

	Free(set);
}
/*
 * util_poolset_open -- open every replica of the set with MAP_SHARED
 *
 * Stops at the first failing replica, sets errno to EINVAL and returns -1.
 */
int
util_poolset_open(struct pool_set *set)
{
	unsigned r = 0;
	while (r < set->nreplicas) {
		if (util_replica_open(set, r, MAP_SHARED) != 0) {
			LOG(2, "replica open failed: replica %u", r);
			errno = EINVAL;
			return -1;
		}
		++r;
	}
	return 0;
}
int
util_replica_close_local(struct pool_replica *rep, unsigned repn,
enum del_parts_mode del)
{
for (unsigned p = 0; p < rep->nparts; p++) {
if (rep->part[p].fd != -1)
(void) os_close(rep->part[p].fd);
if ((del == DELETE_CREATED_PARTS && rep->part[p].created) ||
del == DELETE_ALL_PARTS) {
LOG(4, "unlink %s", rep->part[p].path);
int olderrno = errno;
if (util_unlink(rep->part[p].path) && errno != ENOENT) {
ERR("!unlink %s failed (part %u, replica %u)",
rep->part[p].path, p, repn);
return -1;
}
errno = olderrno;
}
}
return 0;
}
/*
 * util_replica_close_remote -- close (and optionally remove) a remote replica
 *
 * No-op for a local replica. The rpmem connection is closed if open; the
 * remote pool is removed when del requests it. Returns 0, or -1 when the
 * removal fails.
 */
int
util_replica_close_remote(struct pool_replica *rep, unsigned repn,
	enum del_parts_mode del)
{
	if (rep->remote == NULL)
		return 0;

	if (rep->remote->rpp != NULL) {
		LOG(4, "closing remote replica #%u", repn);
		Rpmem_close(rep->remote->rpp);
		rep->remote->rpp = NULL;
	}

	int remove_replica = del == DELETE_ALL_PARTS ||
		(del == DELETE_CREATED_PARTS && rep->part[0].created);
	if (!remove_replica)
		return 0;

	LOG(4, "removing remote replica #%u", repn);
	int ret = Rpmem_remove(rep->remote->node_addr,
		rep->remote->pool_desc, 0);
	if (ret != 0) {
		LOG(1, "!removing remote replica #%u failed", repn);
		return -1;
	}

	return 0;
}
void
util_poolset_close(struct pool_set *set, enum del_parts_mode del)
{
LOG(3, "set %p del %d", set, del);
int oerrno = errno;
for (unsigned r = 0; r < set->nreplicas; r++) {
util_replica_close(set, r);
struct pool_replica *rep = set->replica[r];
if (!rep->remote)
(void) util_replica_close_local(rep, r, del);
else
(void) util_replica_close_remote(rep, r, del);
}
if (set->remote)
util_remote_unload();
#ifdef __FreeBSD__
util_poolset_fdclose_always(set);
#endif
util_poolset_free(set);
errno = oerrno;
}
/*
 * util_poolset_chmod -- set the mode of every part file created by this set
 *
 * Remote replicas and pre-existing parts are skipped. Non-zero permission
 * bits found before the chmod are only logged. Returns 0, or -1 on the first
 * fstat/chmod failure.
 */
int
util_poolset_chmod(struct pool_set *set, mode_t mode)
{
	LOG(3, "set %p mode %o", set, mode);

	for (unsigned r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];
		if (rep->remote != NULL)
			continue;

		for (unsigned p = 0; p < rep->nparts; p++) {
			struct pool_set_part *part = &rep->part[p];
			if (!part->created)
				continue;

			os_stat_t st;
			if (os_fstat(part->fd, &st) != 0) {
				ERR("!fstat");
				return -1;
			}

			if (st.st_mode & ~(unsigned)S_IFMT) {
				LOG(1, "file permissions changed during pool "
					"initialization, file: %s (%o)",
					part->path,
					st.st_mode & ~(unsigned)S_IFMT);
			}

			if (os_chmod(part->path, mode) != 0) {
				ERR("!chmod %u/%u/%s", r, p, part->path);
				return -1;
			}
		}
	}

	return 0;
}
/*
 * util_poolset_fdclose_always -- close the part descriptors of every replica
 */
void
util_poolset_fdclose_always(struct pool_set *set)
{
	LOG(3, "set %p", set);

	unsigned nrep = set->nreplicas;
	for (unsigned i = 0; i < nrep; i++)
		util_replica_fdclose(set->replica[i]);
}
/*
 * util_poolset_fdclose -- close part descriptors unless running on FreeBSD
 *
 * On FreeBSD the descriptors are kept open here; util_poolset_close()
 * closes them later via util_poolset_fdclose_always().
 */
void
util_poolset_fdclose(struct pool_set *set)
{
#ifndef __FreeBSD__
	util_poolset_fdclose_always(set);
#else
	LOG(3, "set %p: holding open", set);
#endif
}
/*
 * util_autodetect_size -- size of a device dax, or -1 for any other path
 */
static ssize_t
util_autodetect_size(const char *path)
{
	if (util_file_is_device_dax(path))
		return util_file_get_size(path);

	ERR("size autodetection is supported only for device dax");
	return -1;
}
/*
 * parser_read_line -- parse a "<size> <absolute path>" pool set line
 *
 * line is tokenized in place. On PARSER_CONTINUE *path holds a Strdup'ed
 * copy of the path (owned by the caller) and *size the parsed (non-zero) or
 * autodetected size; on any error *path is not left allocated.
 */
static enum parser_codes
parser_read_line(char *line, size_t *size, char **path)
{
	char *ctx = NULL;
	char *size_tok = strtok_r(line, " \t", &ctx);
	char *path_tok = strtok_r(NULL, " \t", &ctx);

	if (size_tok == NULL || path_tok == NULL)
		return PARSER_SIZE_PATH_EXPECTED;

	LOG(10, "size '%s' path '%s'", size_tok, path_tok);

	if (!util_is_absolute_path(path_tok))
		return PARSER_ABSOLUTE_PATH_EXPECTED;

	*path = Strdup(path_tok);
	if (*path == NULL) {
		ERR("!Strdup");
		return PARSER_OUT_OF_MEMORY;
	}

	int size_ok;
	if (strcmp(SIZE_AUTODETECT_STR, size_tok) == 0) {
		ssize_t detected = util_autodetect_size(path_tok);
		size_ok = detected >= 0;
		if (size_ok)
			*size = (size_t)detected;
	} else {
		size_ok = util_parse_size(size_tok, size) == 0 && *size != 0;
	}

	if (!size_ok) {
		Free(*path);
		*path = NULL;
		return PARSER_WRONG_SIZE;
	}

	return PARSER_CONTINUE;
}
/*
 * parser_read_replica -- parse the "<node address> <descriptor>" part of a
 * remote REPLICA line
 *
 * line is tokenized in place; the descriptor must be a relative path.
 * On PARSER_CONTINUE *node_addr and *pool_desc receive Strdup'ed copies
 * owned by the caller. On any error neither output is modified, so the
 * caller never sees a pointer to freed memory.
 */
static enum parser_codes
parser_read_replica(char *line, char **node_addr, char **pool_desc)
{
	char *saveptr = NULL;
	char *addr_str = strtok_r(line, " \t", &saveptr);
	char *desc_str = strtok_r(NULL, " \t", &saveptr);

	if (!addr_str || !desc_str)
		return PARSER_REMOTE_REPLICA_EXPECTED;

	LOG(10, "node address '%s' pool set descriptor '%s'",
		addr_str, desc_str);

	if (util_is_absolute_path(desc_str))
		return PARSER_RELATIVE_PATH_EXPECTED;

	char *addr = Strdup(addr_str);
	if (addr == NULL) {
		ERR("!Strdup");
		return PARSER_OUT_OF_MEMORY;
	}

	char *desc = Strdup(desc_str);
	if (desc == NULL) {
		ERR("!Strdup");
		Free(addr);
		return PARSER_OUT_OF_MEMORY;
	}

	/* publish only once both copies exist */
	*node_addr = addr;
	*pool_desc = desc;
	return PARSER_CONTINUE;
}
static enum parser_codes
parser_read_options(char *line, unsigned *options)
{
LOG(3, "line '%s'", line);
int opt_cnt = 0;
char *saveptr = NULL;
char *opt_str = strtok_r(line, " \t", &saveptr);
while (opt_str != NULL) {
LOG(4, "option '%s'", opt_str);
int i = 0;
while (Options[i].name && strcmp(opt_str, Options[i].name) != 0)
i++;
if (Options[i].name == NULL) {
LOG(4, "unknown option '%s'", opt_str);
return PARSER_OPTION_UNKNOWN;
}
if (*options & Options[i].flag)
LOG(4, "duplicated option '%s'", opt_str);
*options |= Options[i].flag;
opt_cnt++;
opt_str = strtok_r(NULL, " \t", &saveptr);
}
if (opt_cnt == 0)
return PARSER_OPTION_EXPECTED;
return PARSER_CONTINUE;
}
/*
 * util_replica_reserve -- make room for at least n parts in a replica
 *
 * The part array is allocated together with the replica structure, so
 * growing it may move the replica; *repp is updated on success. Newly added
 * part slots are zero-filled. Returns 0 on success, -1 on allocation
 * failure (*repp stays valid and unchanged).
 */
static int
util_replica_reserve(struct pool_replica **repp, unsigned n)
{
	LOG(3, "replica %p n %u", *repp, n);

	struct pool_replica *old = *repp;
	if (n <= old->nallocated)
		return 0;

	struct pool_replica *grown = Realloc(old,
		sizeof(struct pool_replica) +
		(n) * sizeof(struct pool_set_part));
	if (grown == NULL) {
		ERR("!Realloc");
		return -1;
	}

	memset(grown->part + grown->nallocated, 0,
		(n - grown->nallocated) * sizeof(struct pool_set_part));

	grown->nallocated = n;
	*repp = grown;
	return 0;
}
static int
util_replica_add_part_by_idx(struct pool_replica **repp,
const char *path, size_t filesize, unsigned p)
{
LOG(3, "replica %p path %s filesize %zu", *repp, path, filesize);
if (util_replica_reserve(repp, p + 1) != 0)
return -1;
struct pool_replica *rep = *repp;
ASSERTne(rep, NULL);
int is_dev_dax = util_file_is_device_dax(path);
rep->part[p].path = path;
rep->part[p].filesize = filesize;
rep->part[p].fd = -1;
rep->part[p].is_dev_dax = is_dev_dax;
rep->part[p].created = 0;
rep->part[p].hdr = NULL;
rep->part[p].addr = NULL;
rep->part[p].remote_hdr = NULL;
if (is_dev_dax)
rep->part[p].alignment = util_file_device_dax_alignment(path);
else
rep->part[p].alignment = Mmap_align;
ASSERTne(rep->part[p].alignment, 0);
rep->nparts += 1;
return 0;
}
/*
 * util_replica_add_part -- append a part after the replica's current parts
 */
static int
util_replica_add_part(struct pool_replica **repp,
	const char *path, size_t filesize)
{
	unsigned next_idx = (*repp)->nparts;
	return util_replica_add_part_by_idx(repp, path, filesize, next_idx);
}
/*
 * util_parse_add_part -- append a file part to the last replica of the set
 *
 * Rejected (errno = EINVAL) once the set has become directory based.
 */
static int
util_parse_add_part(struct pool_set *set, const char *path, size_t filesize)
{
	LOG(3, "set %p path %s filesize %zu", set, path, filesize);

	ASSERTne(set, NULL);

	if (!set->directory_based) {
		return util_replica_add_part(
			&set->replica[set->nreplicas - 1], path, filesize);
	}

	ERR("cannot mix directories and files in a set");
	errno = EINVAL;
	return -1;
}
/*
 * util_parse_add_directory -- append a directory to the last replica
 *
 * The first directory switches the set to directory based mode, which is
 * only allowed while the replica has no file parts (errno = EINVAL
 * otherwise). On success the replica takes ownership of path and its
 * reservation size grows by filesize. Returns 0 on success, -1 on failure;
 * on failure path is NOT stored and remains owned by the caller.
 */
static int
util_parse_add_directory(struct pool_set *set, const char *path,
	size_t filesize)
{
	LOG(3, "set %p path %s filesize %zu", set, path, filesize);

	ASSERTne(set, NULL);

	struct pool_replica *rep = set->replica[set->nreplicas - 1];
	ASSERTne(rep, NULL);

	if (set->directory_based == 0) {
		if (rep->nparts != 0) {
			ERR("cannot mix directories and files in a set");
			errno = EINVAL;
			return -1;
		}
		set->directory_based = 1;
	}

	struct pool_set_directory d;
	d.path = path;
	d.resvsize = filesize;

	/* growing the vector can fail; don't report success for a lost entry */
	if (VEC_PUSH_BACK(&rep->directory, d) != 0) {
		ERR("cannot add directory %s to the replica", path);
		return -1;
	}

	rep->resvsize += filesize;
	return 0;
}
/*
 * util_parse_add_element -- add a path as a directory or as a file part
 *
 * An existing directory is added as a directory; anything else (including
 * a path that cannot be stat'ed yet) as a file part, with the stat errno
 * discarded.
 */
static int
util_parse_add_element(struct pool_set *set, const char *path, size_t filesize)
{
	LOG(3, "set %p path %s filesize %zu", set, path, filesize);

	int saved_errno = errno;
	os_stat_t st;
	int is_dir = os_stat(path, &st) == 0 && S_ISDIR(st.st_mode);
	if (is_dir)
		return util_parse_add_directory(set, path, filesize);

	errno = saved_errno;
	return util_parse_add_part(set, path, filesize);
}
/*
 * util_parse_add_replica -- append a new, empty replica to the set
 *
 * The replica pointer array lives at the end of the set structure, so the
 * set may be reallocated; *setp is updated as soon as that succeeds.
 * Returns 0 on success, -1 on allocation failure.
 */
static int
util_parse_add_replica(struct pool_set **setp)
{
	LOG(3, "setp %p", setp);

	ASSERTne(setp, NULL);
	ASSERTne(*setp, NULL);

	struct pool_set *set = Realloc(*setp, sizeof(struct pool_set) +
		((*setp)->nreplicas + 1) * sizeof(struct pool_replica *));
	if (set == NULL) {
		ERR("!Realloc");
		return -1;
	}
	*setp = set;

	struct pool_replica *rep = Zalloc(sizeof(struct pool_replica));
	if (rep == NULL) {
		ERR("!Malloc");
		return -1;
	}
	VEC_INIT(&rep->directory);

	set->replica[set->nreplicas] = rep;
	set->nreplicas++;
	return 0;
}
static int
util_poolset_check_devdax(struct pool_set *set)
{
LOG(3, "set %p", set);
if (set->directory_based)
return 0;
for (unsigned r = 0; r < set->nreplicas; r++) {
struct pool_replica *rep = set->replica[r];
int is_dev_dax = rep->part[0].is_dev_dax;
for (unsigned p = 0; p < rep->nparts; p++) {
if (rep->part[p].is_dev_dax != is_dev_dax) {
ERR(
"either all the parts must be Device DAX or none");
return -1;
}
if (is_dev_dax && rep->nparts > 1 &&
(set->options & OPTION_NO_HDRS) == 0 &&
util_file_device_dax_alignment(rep->part[p].path)
!= Pagesize) {
ERR(
"Device DAX using huge pages must be the only part of the replica");
return -1;
}
}
}
return 0;
}
/*
 * util_poolset_set_size -- compute replica and pool sizes
 *
 * Each replica's size is the sum of its parts' sizes (truncated to
 * Mmap_align) minus Mmap_align for every header after the first; nhdrs is 1
 * with the NOHDRS option and the part count otherwise. The pool size and
 * reservation size are the minima over the local replicas.
 */
static void
util_poolset_set_size(struct pool_set *set)
{
	set->poolsize = SIZE_MAX;
	set->resvsize = SIZE_MAX;

	for (unsigned r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];

		if (set->options & OPTION_NO_HDRS)
			rep->nhdrs = 1;
		else
			rep->nhdrs = rep->nparts;

		rep->repsize = 0;
		for (unsigned p = 0; p < rep->nparts; p++)
			rep->repsize += rep->part[p].filesize & ~(Mmap_align - 1);
		rep->repsize -= (rep->nhdrs - 1) * Mmap_align;

		if (rep->resvsize == 0)
			rep->resvsize = rep->repsize;

		/* remote replicas do not limit the pool size */
		if (rep->remote != NULL)
			continue;

		if (rep->repsize < set->poolsize)
			set->poolsize = rep->repsize;
		if (rep->resvsize < set->resvsize)
			set->resvsize = rep->resvsize;
	}

	LOG(3, "pool size set to %zu", set->poolsize);
}
/*
 * util_parse_add_remote_replica -- append a remote replica to the set
 *
 * The replica gets a single path-less part of 2 * POOL_HDR_SIZE bytes and a
 * remote descriptor. Ownership of node_addr and pool_desc always passes to
 * this function: on success they are stored in the replica, on failure they
 * are freed here (the caller just bails out on error). Returns 0 / -1.
 */
static int
util_parse_add_remote_replica(struct pool_set **setp, char *node_addr,
	char *pool_desc)
{
	LOG(3, "setp %p node_addr %s pool_desc %s", setp, node_addr, pool_desc);

	ASSERTne(setp, NULL);
	ASSERTne(node_addr, NULL);
	ASSERTne(pool_desc, NULL);

	int ret = util_parse_add_replica(setp);
	if (ret != 0)
		goto err_free_strings;

	ret = util_parse_add_part(*setp, NULL, 2 * POOL_HDR_SIZE);
	if (ret != 0)
		goto err_free_strings;

	struct pool_set *set = *setp;
	struct pool_replica *rep = set->replica[set->nreplicas - 1];
	ASSERTne(rep, NULL);

	rep->remote = Zalloc(sizeof(struct remote_replica));
	if (rep->remote == NULL) {
		ERR("!Malloc");
		ret = -1;
		goto err_free_strings;
	}
	rep->remote->node_addr = node_addr;
	rep->remote->pool_desc = pool_desc;
	set->remote = 1;

	return 0;

err_free_strings:
	Free(node_addr);
	Free(pool_desc);
	return ret;
}
/*
 * util_readline -- read one line of arbitrary length from fh
 *
 * Returns a Realloc'ed buffer (caller frees with Free) holding the line and
 * its trailing '\n' when present, or NULL on EOF before any data, on a read
 * error or on allocation failure. The buffer starts at PARSER_MAX_LINE bytes
 * and doubles until a newline or EOF is reached. A final line without '\n'
 * is returned as-is, even when it exactly filled the previous read chunk.
 * Data starting with a NUL byte yields an empty string.
 */
static char *
util_readline(FILE *fh)
{
	size_t bufsize = PARSER_MAX_LINE;
	size_t position = 0;
	char *buffer = NULL;

	do {
		char *tmp = buffer;
		buffer = Realloc(buffer, bufsize);
		if (buffer == NULL) {
			Free(tmp);
			return NULL;
		}

		ASSERT(bufsize / 2 <= INT_MAX);
		ASSERT((bufsize - position) >= (bufsize / 2));

		/* divide before narrowing: bufsize itself may exceed INT_MAX */
		char *s = util_fgets(buffer + position, (int)(bufsize / 2), fh);
		if (s == NULL) {
			/*
			 * Clean EOF after a partial line: fgets() left the
			 * buffer untouched, so hand back what was read.
			 */
			if (position > 0 && !ferror(fh))
				return buffer;
			Free(buffer);
			return NULL;
		}

		position = strlen(buffer);
		bufsize *= 2;
		/* position == 0 (leading NUL byte) must not index buffer[-1] */
	} while (position > 0 && !feof(fh) && buffer[position - 1] != '\n');

	return buffer;
}
/*
 * util_part_idx_by_file_name -- part index encoded in a part file name
 *
 * Part files are named "<zero padded decimal index>.pmem"; the leading
 * number is parsed as DECIMAL (base 0 would treat the zero padding as an
 * octal prefix, turning "000010" into 8 and "000008" into 0). Returns the
 * index, or -1 when the name does not start with a number or it overflows.
 * errno is preserved in all cases.
 */
static long
util_part_idx_by_file_name(const char *filename)
{
	int olderrno = errno;
	errno = 0;

	char *end = NULL;
	long part_idx = strtol(filename, &end, 10);
	int failed = errno != 0 || end == filename;

	errno = olderrno;
	return failed ? -1 : part_idx;
}
/*
 * util_poolset_directory_load -- add the part files found in a directory
 *
 * Every regular file whose name ends in PMEM_EXT and starts with a part
 * index is added to *repp at that index (the replica may be reallocated).
 * Returns the number of parts added, or -1 on error.
 */
static int
util_poolset_directory_load(struct pool_replica **repp, const char *directory)
{
	struct fs *f = fs_new(directory);
	if (f == NULL)
		return -1;

	int ret = -1;
	int nparts = 0;
	struct fs_entry *entry;

	while ((entry = fs_read(f)) != NULL) {
		if (entry->type != FS_ENTRY_FILE ||
				entry->namelen < PMEM_EXT_LEN)
			continue;

		/* PMEM_EXT_LEN counts the NUL, hence the + 1 */
		const char *ext = entry->path + entry->pathlen -
			PMEM_EXT_LEN + 1;
		if (strcmp(PMEM_EXT, ext) != 0)
			continue;

		long part_idx = util_part_idx_by_file_name(entry->name);
		if (part_idx < 0)
			continue;

		ssize_t size = util_file_get_size(entry->path);
		if (size < 0) {
			LOG(3,
			"cannot read size of file (%s) in a poolset directory",
				entry->path);
			goto out;
		}

		char *path = Strdup(entry->path);
		if (path == NULL)
			goto out;

		if (util_replica_add_part_by_idx(repp, path,
				(size_t)size, (unsigned)part_idx) != 0) {
			ERR("unable to load part %s",
				entry->path);
			Free(path);
			goto out;
		}

		nparts++;
	}

	ret = nparts;

out:
	fs_delete(f);
	return ret;
}
/*
 * util_poolset_directories_load -- load part files of a directory based set
 *
 * First loads every replica's directories and records, for replica 0, the
 * next free part id and directory id. Then every replica whose part count
 * differs from the replica with the most parts (mrep) is filled up to
 * mrep->nparts: part descriptions are copied from mrep and each gets a path
 * "<first directory>/<zero padded index>.pmem".
 * Returns 0 on success (or for non directory based sets), -1 on error.
 */
static int
util_poolset_directories_load(struct pool_set *set)
{
	LOG(3, "set %p", set);

	if (!set->directory_based)
		return 0;

	unsigned next_part_id = 0;
	unsigned max_parts_rep = 0;
	for (unsigned r = 0; r < set->nreplicas; r++) {
		next_part_id = 0;
		struct pool_set_directory *d;
		int nparts = 0;
		int prev_nparts = 0;
		VEC_FOREACH_BY_PTR(d, &set->replica[r]->directory) {
			prev_nparts = nparts;
			nparts = util_poolset_directory_load(&set->replica[r],
				d->path);
			if (nparts < 0) {
				ERR("failed to load parts from directory %s",
					d->path);
				return -1;
			}

			next_part_id += (unsigned)nparts;

			if (r == 0 && prev_nparts > nparts)
				set->next_directory_id++;
		}

		if (next_part_id > set->replica[max_parts_rep]->nparts)
			max_parts_rep = r;

		if (r == 0)
			set->next_id = next_part_id;
	}

	struct pool_replica *rep;
	struct pool_replica *mrep = set->replica[max_parts_rep];

	for (unsigned r = 0; r < set->nreplicas; r++) {
		if (set->replica[r]->nparts == mrep->nparts)
			continue;

		if (VEC_SIZE(&set->replica[r]->directory) == 0) {
			ERR("no directories in replica");
			return -1;
		}

		if (util_replica_reserve(&set->replica[r], mrep->nparts) != 0)
			return -1;

		rep = set->replica[r];

		struct pool_set_directory *d = VEC_GET(&rep->directory, 0);

		/*
		 * Bound the loop by mrep->nparts, not rep->nallocated: rep may
		 * have more slots allocated than mrep, and reading those
		 * indices from mrep->part could run past its allocation.
		 */
		for (unsigned pidx = 0; pidx < mrep->nparts; ++pidx) {
			size_t path_len = strlen(d->path) + PMEM_FILE_MAX_LEN;
			char *part_path = Malloc(path_len);
			if (part_path == NULL) {
				ERR("!Malloc");
				return -1;
			}
			snprintf(part_path, path_len, "%s/%0*u%s",
				d->path, PMEM_FILE_PADDING,
				pidx, PMEM_EXT);

			struct pool_set_part *p = &rep->part[pidx];
			/* the slot may already own a path loaded from disk */
			Free((void *)p->path);
			*p = mrep->part[pidx];
			p->path = part_path;
		}

		rep->nparts = mrep->nparts;
	}

	return 0;
}
/*
 * util_poolset_parse -- parse a pool set file read from fd
 *
 * The file is read from its beginning through a dup()ed descriptor, so the
 * caller's fd stays open. Its first line must be exactly the pool set
 * signature; it is followed by option lines, "REPLICA" lines (optionally
 * with a remote node address and descriptor) and "<size> <path>" lines.
 * A '#' starts a comment (a line that is empty after stripping is skipped).
 * On success *setp receives the new set and 0 is returned; on failure -1 is
 * returned and nothing is allocated.
 */
int
util_poolset_parse(struct pool_set **setp, const char *path, int fd)
{
	LOG(3, "setp %p path %s fd %d", setp, path, fd);

	struct pool_set *set = NULL;
	enum parser_codes result;
	char *line;
	char *ppath;
	char *pool_desc;
	char *node_addr;
	char *cp;
	size_t psize;
	FILE *fs;

	if (os_lseek(fd, 0, SEEK_SET) != 0) {
		ERR("!lseek %d", fd);
		return -1;
	}

	/* fclose() below must not close the caller's descriptor */
	fd = dup(fd);
	if (fd < 0) {
		ERR("!dup");
		return -1;
	}

	if ((fs = os_fdopen(fd, "r")) == NULL) {
		ERR("!fdopen %d", fd);
		os_close(fd);
		return -1;
	}

	unsigned nlines = 0;
	/* number of parts seen in the replica currently being parsed */
	unsigned nparts = 0;

	line = util_readline(fs);
	if (line == NULL) {
		ERR("!Reading poolset file");
		goto err;
	}
	nlines++;

	set = Zalloc(sizeof(struct pool_set));
	if (set == NULL) {
		ERR("!Malloc for pool set");
		goto err;
	}

	/* check the signature line */
	if (strncmp(line, POOLSET_HDR_SIG, POOLSET_HDR_SIG_LEN) == 0 &&
			line[POOLSET_HDR_SIG_LEN] == '\n') {
		LOG(10, "PMEMPOOLSET");
		int ret = util_parse_add_replica(&set);
		if (ret != 0)
			goto err;
		nparts = 0;
		result = PARSER_CONTINUE;
	} else {
		result = PARSER_PMEMPOOLSET;
	}

	while (result == PARSER_CONTINUE) {
		Free(line);
		line = util_readline(fs);
		nlines++;

		if (line) {
			/* drop the newline, then anything after a '#' */
			if ((cp = strchr(line, '\n')) != NULL)
				*cp = '\0';
			if (cp != line && (cp = strchr(line, '#')) != NULL)
				*cp = '\0';
			/* empty or comment-only line */
			if (cp == line)
				continue;
		}

		if (!line) {
			/* EOF: the last replica must have at least one part */
			if (nparts >= 1) {
				result = PARSER_FORMAT_OK;
			} else {
				if (set->nreplicas == 1)
					result = PARSER_SET_NO_PARTS;
				else
					result = PARSER_REP_NO_PARTS;
			}
		} else if (strncmp(line, POOLSET_OPTION_SIG,
				POOLSET_OPTION_SIG_LEN) == 0) {
			result = parser_read_options(
				line + POOLSET_OPTION_SIG_LEN,
				&set->options);
			if (result == PARSER_CONTINUE) {
				LOG(10, "OPTIONS: %x", set->options);
			}
		} else if (strncmp(line, POOLSET_REPLICA_SIG,
				POOLSET_REPLICA_SIG_LEN) == 0) {
			if (line[POOLSET_REPLICA_SIG_LEN] != '\0') {
				/* something follows REPLICA: a remote replica */
				char c = line[POOLSET_REPLICA_SIG_LEN];
				if (!isblank((unsigned char)c)) {
					result = PARSER_REPLICA;
					continue;
				}
				result = parser_read_replica(
					line + POOLSET_REPLICA_SIG_LEN,
					&node_addr, &pool_desc);
				if (result == PARSER_CONTINUE) {
					LOG(10, "REMOTE REPLICA "
						"node address '%s' "
						"pool set descriptor '%s'",
						node_addr, pool_desc);
					if (util_parse_add_remote_replica(&set,
							node_addr, pool_desc))
						goto err;
				}
			} else if (nparts >= 1) {
				/* a plain REPLICA closes a non-empty replica */
				LOG(10, "REPLICA");
				int ret = util_parse_add_replica(&set);
				if (ret != 0)
					goto err;
				nparts = 0;
				result = PARSER_CONTINUE;
			} else {
				if (set->nreplicas == 1)
					result = PARSER_SET_NO_PARTS;
				else
					result = PARSER_REP_NO_PARTS;
			}
		} else {
			/* "<size> <path>": a part file or a directory */
			result = parser_read_line(line, &psize, &ppath);
			if (result == PARSER_CONTINUE) {
				int ret = util_parse_add_element(set,
						ppath, psize);
				if (ret != 0) {
					Free(ppath);
					goto err;
				}
				nparts++;
			}
		}
	}

	if (result != PARSER_FORMAT_OK) {
		/* nlines is unsigned: print it with %u */
		ERR("%s [%s:%u]", path, parser_errstr[result], nlines);
		errno = EINVAL;
		goto err;
	}

	if (util_poolset_check_devdax(set) != 0) {
		errno = EINVAL;
		goto err;
	}

	if (util_poolset_directories_load(set) != 0) {
		ERR("cannot load part files from directories");
		goto err;
	}

	LOG(4, "set file format correct (%s)", path);
	(void) fclose(fs);
	Free(line);
	util_poolset_set_size(set);
	*setp = set;
	return 0;

err:
	Free(line);
	(void) fclose(fs);
	if (set)
		util_poolset_free(set);
	return -1;
}
/*
 * util_poolset_single -- build a one-replica, one-part set for a plain file
 *
 * The part path is a private copy of path; the pool size is filesize
 * truncated to the part's alignment (device dax alignment or Mmap_align).
 * Returns the new set, or NULL on allocation failure (nothing leaked).
 */
static struct pool_set *
util_poolset_single(const char *path, size_t filesize, int create)
{
	LOG(3, "path %s filesize %zu create %d",
		path, filesize, create);

	struct pool_set *set;
	set = Zalloc(sizeof(struct pool_set) +
		sizeof(struct pool_replica *));
	if (set == NULL) {
		ERR("!Malloc for pool set");
		return NULL;
	}

	struct pool_replica *rep;
	rep = Zalloc(sizeof(struct pool_replica) +
		sizeof(struct pool_set_part));
	if (rep == NULL) {
		ERR("!Malloc for pool set replica");
		Free(set);
		return NULL;
	}

	/* the set owns its path; a NULL path would crash later users */
	rep->part[0].path = Strdup(path);
	if (rep->part[0].path == NULL) {
		ERR("!Strdup");
		Free(rep);
		Free(set);
		return NULL;
	}

	VEC_INIT(&rep->directory);

	set->replica[0] = rep;

	rep->part[0].filesize = filesize;
	rep->part[0].fd = -1;
	rep->part[0].is_dev_dax = util_file_is_device_dax(path);
	rep->part[0].created = create;
	rep->part[0].hdr = NULL;
	rep->part[0].addr = NULL;

	if (rep->part[0].is_dev_dax)
		rep->part[0].alignment = util_file_device_dax_alignment(path);
	else
		rep->part[0].alignment = Mmap_align;
	ASSERTne(rep->part[0].alignment, 0);

	rep->nallocated = 1;
	rep->nparts = 1;
	rep->nhdrs = 1;

	rep->remote = NULL;
	set->remote = 0;

	rep->repsize = rep->part[0].filesize & ~(rep->part[0].alignment - 1);
	rep->resvsize = rep->repsize;

	set->poolsize = rep->repsize;
	set->resvsize = rep->resvsize;

	set->nreplicas = 1;

	return set;
}
/*
 * util_part_open -- open or create the file backing a part
 *
 * An existing file is always opened (never re-created) and its size must
 * equal part->filesize (errno = EINVAL otherwise). part->created tells
 * whether the file was created by this call. Returns 0 / -1; on failure
 * part->fd may still hold an open descriptor.
 */
int
util_part_open(struct pool_set_part *part, size_t minsize, int create)
{
	LOG(3, "part %p minsize %zu create %d", part, minsize, create);

	int exists = os_access(part->path, F_OK) == 0;
	if (exists)
		create = 0;

	part->created = 0;

	if (!create) {
		size_t size = 0;
		part->fd = util_file_open(part->path, &size, minsize, O_RDWR);
		if (part->fd == -1) {
			LOG(2, "failed to open file: %s", part->path);
			return -1;
		}

		if (part->filesize != size) {
			ERR("file size does not match config: %s, %zu != %zu",
				part->path, size, part->filesize);
			errno = EINVAL;
			return -1;
		}

		return 0;
	}

	part->fd = util_file_create(part->path, part->filesize, minsize);
	if (part->fd == -1) {
		LOG(2, "failed to create file: %s", part->path);
		return -1;
	}
	part->created = 1;

	return 0;
}
/*
 * util_part_fdclose -- close a part's descriptor (if open) and mark it closed
 */
void
util_part_fdclose(struct pool_set_part *part)
{
	LOG(3, "part %p", part);

	if (part->fd == -1)
		return;

	(void) os_close(part->fd);
	part->fd = -1;
}
/*
 * util_part_set_attr -- fill a pool header from individual attributes
 *
 * uuid, next_part_uuid and prev_part_uuid may be NULL, in which case the
 * corresponding header fields are left untouched; all other pointer
 * arguments must be valid.
 */
static void
util_part_set_attr(struct pool_hdr *hdrp, const char *sig,
	const uint32_t major, const uint32_t compat, const uint32_t incompat,
	const uint32_t ro_compat, const unsigned char *poolset_uuid,
	const unsigned char *uuid, const unsigned char *next_part_uuid,
	const unsigned char *prev_part_uuid,
	const unsigned char *next_repl_uuid,
	const unsigned char *prev_repl_uuid,
	const unsigned char *arch_flags)
{
	/* keep a space between every "name %p" pair of the message */
	LOG(3, "hdrp %p sig %.8s major %u compat %#x incompat %#x "
		"ro_compat %#x poolset_uuid %p uuid %p next_part_uuid %p "
		"prev_part_uuid %p next_repl_uuid %p prev_repl_uuid %p "
		"arch_flags %p", hdrp, sig, major, compat, incompat, ro_compat,
		poolset_uuid, uuid, next_part_uuid, prev_part_uuid,
		next_repl_uuid, prev_repl_uuid, arch_flags);

	memcpy(hdrp->signature, sig, POOL_HDR_SIG_LEN);
	hdrp->major = major;
	hdrp->compat_features = compat;
	hdrp->incompat_features = incompat;
	hdrp->ro_compat_features = ro_compat;
	memcpy(hdrp->poolset_uuid, poolset_uuid, POOL_HDR_UUID_LEN);

	if (uuid)
		memcpy(hdrp->uuid, uuid, POOL_HDR_UUID_LEN);
	if (next_part_uuid)
		memcpy(hdrp->next_part_uuid, next_part_uuid,
			POOL_HDR_UUID_LEN);
	if (prev_part_uuid)
		memcpy(hdrp->prev_part_uuid, prev_part_uuid,
			POOL_HDR_UUID_LEN);

	memcpy(hdrp->next_repl_uuid, next_repl_uuid, POOL_HDR_UUID_LEN);
	memcpy(hdrp->prev_repl_uuid, prev_repl_uuid, POOL_HDR_UUID_LEN);
	memcpy(&hdrp->arch_flags, arch_flags, sizeof(struct arch_flags));
}
/*
 * util_get_attr -- convert a pool header into rpmem pool attributes
 *
 * The header's replica links become rpmem next/prev uuids and its arch
 * flags are carried in rpmem's user_flags field.
 */
static void
util_get_attr(struct rpmem_pool_attr *rpmem_attr, const struct pool_hdr *hdrp)
{
	LOG(4, "hdrp %p rpmem_attr %p", hdrp, rpmem_attr);

	rpmem_attr->major = hdrp->major;
	rpmem_attr->compat_features = hdrp->compat_features;
	rpmem_attr->incompat_features = hdrp->incompat_features;
	rpmem_attr->ro_compat_features = hdrp->ro_compat_features;

	memcpy(rpmem_attr->signature, hdrp->signature, POOL_HDR_SIG_LEN);
	memcpy(rpmem_attr->poolset_uuid, hdrp->poolset_uuid,
		POOL_HDR_UUID_LEN);
	memcpy(rpmem_attr->uuid, hdrp->uuid, POOL_HDR_UUID_LEN);
	memcpy(rpmem_attr->next_uuid, hdrp->next_repl_uuid, POOL_HDR_UUID_LEN);
	memcpy(rpmem_attr->prev_uuid, hdrp->prev_repl_uuid, POOL_HDR_UUID_LEN);
	memcpy(rpmem_attr->user_flags, &hdrp->arch_flags,
		sizeof(struct arch_flags));
}
/*
 * util_remote_store_attr -- write rpmem pool attributes into a pool header
 *
 * A remote replica has a single part, so its uuid is also used as its own
 * next and previous part uuid.
 */
static void
util_remote_store_attr(const struct rpmem_pool_attr *rpmem_attr,
	struct pool_hdr *hdrp)
{
	LOG(4, "rpmem_attr %p hdrp %p", rpmem_attr, hdrp);

	const unsigned char *self_uuid = rpmem_attr->uuid;

	util_part_set_attr(hdrp, rpmem_attr->signature, rpmem_attr->major,
		rpmem_attr->compat_features, rpmem_attr->incompat_features,
		rpmem_attr->ro_compat_features, rpmem_attr->poolset_uuid,
		self_uuid, self_uuid, self_uuid,
		rpmem_attr->next_uuid, rpmem_attr->prev_uuid,
		rpmem_attr->user_flags);
}
/*
 * util_update_remote_header -- push replica repn's first header to its
 * remote node via Rpmem_set_attr
 *
 * The replica must be remote and connected. Returns 0 / -1.
 */
int
util_update_remote_header(struct pool_set *set, unsigned repn)
{
	LOG(3, "set %p, repn %u", set, repn);

	struct pool_replica *rep = REP(set, repn);
	ASSERTne(rep->remote, NULL);
	ASSERTne(rep->remote->rpp, NULL);

	struct rpmem_pool_attr attributes;
	util_get_attr(&attributes, HDR(rep, 0));

	if (Rpmem_set_attr(rep->remote->rpp, &attributes) != 0) {
		ERR("!Rpmem_set_attr");
		return -1;
	}

	return 0;
}
/*
 * util_pool_close_remote -- close a remote pool handle; the result of
 *	Rpmem_close() is returned unchanged
 */
int
util_pool_close_remote(RPMEMpool *rpp)
{
	LOG(3, "rpp %p", rpp);

	int ret = Rpmem_close(rpp);
	return ret;
}
/*
 * util_poolset_remote_open -- create or open, through librpmem, the remote
 *	pool that backs remote replica 'rep' (index 'repidx')
 *
 * pool_addr/pool_size describe the local memory range passed to
 * Rpmem_create()/Rpmem_open(). On input *nlanes is the requested number
 * of lanes; on success it is lowered to the value reported back by
 * librpmem when that is smaller (it is never raised). 'minsize' is only
 * logged here.
 *
 * Returns 0 on success, -1 when the remote library handle is not set or
 * the remote create/open call fails.
 */
int
util_poolset_remote_open(struct pool_replica *rep, unsigned repidx,
size_t minsize, int create, void *pool_addr,
size_t pool_size, unsigned *nlanes)
{
LOG(3, "rep %p repidx %u minsize %zu create %d "
"pool_addr %p pool_size %zu nlanes %p",
rep, repidx, minsize, create,
pool_addr, pool_size, nlanes);
ASSERTne(nlanes, NULL);
/* presumably NULL when librpmem has not been loaded -- confirm */
if (!Rpmem_handle_remote) {
return -1;
}
/* passed by pointer to librpmem, which may lower it */
unsigned remote_nlanes = *nlanes;
if (create) {
/* send the attributes of the local copy of part #0's header */
struct rpmem_pool_attr rpmem_attr_create;
util_get_attr(&rpmem_attr_create, rep->part[0].hdr);
rep->remote->rpp = Rpmem_create(rep->remote->node_addr,
rep->remote->pool_desc,
pool_addr,
pool_size,
&remote_nlanes,
&rpmem_attr_create);
if (rep->remote->rpp == NULL) {
ERR("creating remote replica #%u failed", repidx);
return -1;
}
/* remember that this call created the (remote) part */
rep->part[0].created = 1;
} else {
/* attributes reported by the remote node on open ... */
struct rpmem_pool_attr rpmem_attr_open;
rep->remote->rpp = Rpmem_open(rep->remote->node_addr,
rep->remote->pool_desc,
pool_addr,
pool_size,
&remote_nlanes,
&rpmem_attr_open);
if (rep->remote->rpp == NULL) {
ERR("opening remote replica #%u failed", repidx);
return -1;
}
/* ... are stored into the local copy of part #0's header */
util_remote_store_attr(&rpmem_attr_open, rep->part[0].hdr);
}
/* never increase the caller's lane count */
if (remote_nlanes < *nlanes)
*nlanes = remote_nlanes;
return 0;
}
/*
 * util_poolset_files_local -- open (or create, when 'create' is non-zero)
 *	every part file of every local replica; remote replicas are skipped
 *
 * Returns 0 on success, -1 as soon as one part cannot be opened.
 */
static int
util_poolset_files_local(struct pool_set *set, size_t minpartsize, int create)
{
	LOG(3, "set %p minpartsize %zu create %d", set, minpartsize, create);

	for (unsigned r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];
		if (rep->remote)
			continue;

		for (unsigned p = 0; p < rep->nparts; p++) {
			int err = util_part_open(&rep->part[p], minpartsize,
					create);
			if (err)
				return -1;
		}
	}

	return 0;
}
/*
 * util_poolset_remote_replica_open -- open (or create) remote replica
 *	'repidx' of the set
 *
 * The local memory range handed to librpmem is replica #0's mapping with
 * the first POOL_HDR_SIZE bytes skipped, and its size is the set's pool
 * size minus POOL_HDR_SIZE. Returns 0 on success, non-zero on error.
 */
int
util_poolset_remote_replica_open(struct pool_set *set, unsigned repidx,
	size_t minsize, int create, unsigned *nlanes)
{
	LOG(3, "set %p repidx %u minsize %zu create %d nlanes %p",
		set, repidx, minsize, create, nlanes);

#ifndef _WIN32
	/*
	 * For device DAX, the whole mapping of replica #0 is madvise'd with
	 * MADV_DONTFORK before the range is handed to librpmem.
	 * NOTE(review): presumably a workaround for fork() handling /
	 * alignment requirements of the RDMA memory registration -- confirm.
	 */
	if (set->replica[0]->part[0].is_dev_dax) {
		int ret = os_madvise(set->replica[0]->part[0].addr,
				set->replica[0]->part[0].filesize,
				MADV_DONTFORK);
		if (ret) {
			ERR("!madvise");
			return ret;
		}
	}
#endif

	/* the pool header (first POOL_HDR_SIZE bytes) is not replicated */
	void *pool_addr = (void *)((uintptr_t)set->replica[0]->part[0].addr +
		POOL_HDR_SIZE);
	size_t pool_size = set->poolsize - POOL_HDR_SIZE;

	return util_poolset_remote_open(set->replica[repidx], repidx, minsize,
		create, pool_addr, pool_size, nlanes);
}
/*
 * util_poolset_files_remote -- open (or create) every remote replica of
 *	the set; local replicas are skipped
 *
 * Returns 0 on success, -1 as soon as one remote replica fails.
 */
static int
util_poolset_files_remote(struct pool_set *set, size_t minsize,
	unsigned *nlanes, int create)
{
	LOG(3, "set %p minsize %zu nlanes %p create %d",
		set, minsize, nlanes, create);

	for (unsigned r = 0; r < set->nreplicas; r++) {
		if (!set->replica[r]->remote)
			continue;

		if (util_poolset_remote_replica_open(set, r, minsize,
				create, nlanes) != 0)
			return -1;
	}

	return 0;
}
/*
 * util_poolset_read -- open the pool set description file at 'path' and
 *	parse it into *setp
 *
 * Returns the parser's result, or -1 if the file cannot be opened. errno
 * from the parser survives closing the file.
 */
int
util_poolset_read(struct pool_set **setp, const char *path)
{
	LOG(3, "setp %p path %s", setp, path);

	int fd = os_open(path, O_RDONLY);
	if (fd < 0)
		return -1;

	int ret = util_poolset_parse(setp, path, fd);

	/* closing the file must not clobber the parser's errno */
	int saved_errno = errno;
	(void) os_close(fd);
	errno = saved_errno;

	return ret;
}
/*
 * util_poolset_create_set -- build a pool set description for 'path'
 *
 * - poolsize != 0: describe a single pool file of that size (rejected
 *   for device dax).
 * - otherwise: open the file; if it starts with POOLSET_HDR_SIG it is
 *   parsed as a pool set description, else it is treated as a single
 *   existing pool file whose size must be at least 'minsize'.
 * On Windows, pool sets with remote replicas are rejected (ENOTSUP).
 *
 * Returns 0 on success (with *setp set) or -1 with errno set.
 */
int
util_poolset_create_set(struct pool_set **setp, const char *path,
size_t poolsize, size_t minsize)
{
LOG(3, "setp %p path %s poolsize %zu minsize %zu",
setp, path, poolsize, minsize);
int oerrno;
/* 'ret' holds the bytes read first, then the parser's result */
int ret = 0;
int fd;
size_t size = 0;
int is_dev_dax = util_file_is_device_dax(path);
if (poolsize != 0) {
if (is_dev_dax) {
ERR("size must be zero for device dax");
return -1;
}
/* NOTE(review): last argument presumably means "new file" -- confirm */
*setp = util_poolset_single(path, poolsize, 1);
if (*setp == NULL)
return -1;
return 0;
}
if ((fd = util_file_open(path, &size, 0, O_RDONLY)) == -1)
return -1;
char signature[POOLSET_HDR_SIG_LEN];
if (!is_dev_dax) {
/* at most POOLSET_HDR_SIG_LEN bytes are read, so the cast is safe */
ret = (int)read(fd, signature, POOLSET_HDR_SIG_LEN);
if (ret < 0) {
ERR("!read %d", fd);
goto err;
}
}
/*
 * Device dax, a short read or a signature mismatch: the file itself
 * is the pool. The short-read test guards the strncmp() against
 * reading bytes that were never filled in.
 */
if (is_dev_dax || ret < POOLSET_HDR_SIG_LEN ||
strncmp(signature, POOLSET_HDR_SIG, POOLSET_HDR_SIG_LEN)) {
LOG(4, "not a pool set header");
(void) os_close(fd);
if (size < minsize) {
ERR("file is not a poolset file and its size (%zu)"
" is smaller than %zu", size, minsize);
errno = EINVAL;
return -1;
}
*setp = util_poolset_single(path, size, 0);
if (*setp == NULL)
return -1;
return 0;
}
ret = util_poolset_parse(setp, path, fd);
#ifdef _WIN32
if (ret)
goto err;
if ((*setp)->remote) {
/* NOTE(review): *setp still points at the freed set after this */
util_poolset_free(*setp);
ERR("remote replication is not supported on Windows");
errno = ENOTSUP;
ret = -1;
goto err;
}
#endif
err:
/* close the descriptor without losing the errno of the failure */
oerrno = errno;
(void) os_close(fd);
errno = oerrno;
return ret;
}
/*
 * util_header_create -- initialize and store the pool header of part
 *	'partidx' of replica 'repidx'
 *
 * sig/major/compat/incompat/ro_compat are written as given, plus
 * POOL_FEAT_NOHDRS when the set uses OPTION_NO_HDRS. prev_repl_uuid and
 * next_repl_uuid, when non-NULL, override the replica links that are
 * otherwise taken from part #0 of the neighbouring replicas. When
 * arch_flags is NULL the current architecture flags are used; otherwise
 * the caller's bytes are copied in verbatim after the byte-order
 * conversion. A header that is not all zeros is only replaced when
 * 'overwrite' is non-zero.
 *
 * Returns 0 on success, -1 with errno set (EEXIST for a non-empty header,
 * or the fstat error).
 */
int
util_header_create(struct pool_set *set, unsigned repidx, unsigned partidx,
const char *sig, uint32_t major, uint32_t compat, uint32_t incompat,
uint32_t ro_compat, const unsigned char *prev_repl_uuid,
const unsigned char *next_repl_uuid, const unsigned char *arch_flags,
int overwrite)
{
LOG(3, "set %p repidx %u partidx %u sig %.8s major %u "
"compat %#x incompat %#x ro_compat %#x "
"prev_repl_uuid %p next_repl_uuid %p arch_flags %p "
"overwrite %d",
set, repidx, partidx, sig, major, compat, incompat,
ro_compat, prev_repl_uuid, next_repl_uuid, arch_flags,
overwrite);
struct pool_replica *rep = set->replica[repidx];
struct pool_hdr *hdrp = rep->part[partidx].hdr;
/* refuse to clobber an existing header unless asked to */
if (!util_is_zeroed(hdrp, sizeof(*hdrp)) && !overwrite) {
ERR("Non-empty file detected");
errno = EEXIST;
return -1;
}
/* pool type, version and features (host byte order for now) */
memcpy(hdrp->signature, sig, POOL_HDR_SIG_LEN);
hdrp->major = major;
hdrp->compat_features = compat;
hdrp->incompat_features = incompat;
hdrp->ro_compat_features = ro_compat;
if (set->options & OPTION_NO_HDRS)
hdrp->incompat_features |= POOL_FEAT_NOHDRS;
memcpy(hdrp->poolset_uuid, set->uuid, POOL_HDR_UUID_LEN);
memcpy(hdrp->uuid, PART(rep, partidx).uuid, POOL_HDR_UUID_LEN);
/* link parts: with a single header, part #0 links to itself */
if (set->options & OPTION_NO_HDRS) {
ASSERTeq(partidx, 0);
memcpy(hdrp->prev_part_uuid, PART(rep, 0).uuid,
POOL_HDR_UUID_LEN);
memcpy(hdrp->next_part_uuid, PART(rep, 0).uuid,
POOL_HDR_UUID_LEN);
} else {
memcpy(hdrp->prev_part_uuid, PARTP(rep, partidx).uuid,
POOL_HDR_UUID_LEN);
memcpy(hdrp->next_part_uuid, PARTN(rep, partidx).uuid,
POOL_HDR_UUID_LEN);
}
/* link replicas: caller-provided UUIDs take precedence */
if (prev_repl_uuid) {
memcpy(hdrp->prev_repl_uuid, prev_repl_uuid, POOL_HDR_UUID_LEN);
} else {
memcpy(hdrp->prev_repl_uuid, PART(REPP(set, repidx), 0).uuid,
POOL_HDR_UUID_LEN);
}
if (next_repl_uuid) {
memcpy(hdrp->next_repl_uuid, next_repl_uuid, POOL_HDR_UUID_LEN);
} else {
memcpy(hdrp->next_repl_uuid, PART(REPN(set, repidx), 0).uuid,
POOL_HDR_UUID_LEN);
}
/* creation time comes from the part file; remote parts have no fd */
if (!rep->remote) {
os_stat_t stbuf;
if (os_fstat(rep->part[partidx].fd, &stbuf) != 0) {
ERR("!fstat");
return -1;
}
ASSERT(stbuf.st_ctime);
hdrp->crtime = (uint64_t)stbuf.st_ctime;
}
if (!arch_flags)
util_get_arch_flags(&hdrp->arch_flags);
util_convert2le_hdr(hdrp);
/* caller-supplied arch flags are stored as-is, after the conversion */
if (arch_flags) {
memcpy(&hdrp->arch_flags, arch_flags,
sizeof(struct arch_flags));
}
/* the checksum is computed over the final on-media representation */
util_checksum(hdrp, sizeof(*hdrp), &hdrp->checksum, 1);
util_persist_auto(rep->part[partidx].is_dev_dax, hdrp, sizeof(*hdrp));
return 0;
}
/*
 * util_header_check -- validate the header of part 'partidx' of replica
 *	'repidx' against the expected pool type/version and against the
 *	rest of the pool set
 *
 * On success the part's rdonly flag is set when util_feature_check()
 * returns 0, and cleared otherwise. Returns 0 on success, -1 on failure
 * (errno is set to EINVAL for the mismatches detected here).
 */
static int
util_header_check(struct pool_set *set, unsigned repidx, unsigned partidx,
const char *sig, uint32_t major, uint32_t compat, uint32_t incompat,
uint32_t ro_compat)
{
LOG(3, "set %p repidx %u partidx %u sig %.8s major %u "
"compat %#x incompat %#x ro_compat %#x",
set, repidx, partidx, sig, major, compat, incompat, ro_compat);
struct pool_replica *rep = set->replica[repidx];
struct pool_hdr *hdrp = rep->part[partidx].hdr;
/* validate a converted copy; the mapped header is left untouched */
struct pool_hdr hdr;
memcpy(&hdr, hdrp, sizeof(hdr));
/* the local copy of a remote replica's header is not converted */
if (rep->remote == NULL && !util_convert_hdr(&hdr)) {
errno = EINVAL;
return -1;
}
if (memcmp(hdr.signature, sig, POOL_HDR_SIG_LEN)) {
ERR("wrong pool type: \"%.8s\"", hdr.signature);
errno = EINVAL;
return -1;
}
if (hdr.major != major) {
ERR("pool version %d (library expects %d)", hdr.major, major);
if (hdr.major < major) {
ERR("Please run the pmempool convert utility to "
"upgrade the pool.");
}
errno = EINVAL;
return -1;
}
if (util_check_arch_flags(&hdr.arch_flags)) {
ERR("wrong architecture flags");
errno = EINVAL;
return -1;
}
/* every part of every replica must belong to the same pool set */
if (memcmp(HDR(REP(set, 0), 0)->poolset_uuid, hdr.poolset_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong pool set UUID");
errno = EINVAL;
return -1;
}
/* part linkage within the replica */
if (memcmp(HDRP(rep, partidx)->uuid, hdr.prev_part_uuid,
POOL_HDR_UUID_LEN) ||
memcmp(HDRN(rep, partidx)->uuid, hdr.next_part_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong part UUID");
errno = EINVAL;
return -1;
}
/* the remaining checks compare raw (unconverted) headers with part #0 */
if (HDR(rep, 0)->major != hdrp->major) {
ERR("incompatible pool format");
errno = EINVAL;
return -1;
}
if (HDR(rep, 0)->compat_features != hdrp->compat_features ||
HDR(rep, 0)->incompat_features != hdrp->incompat_features ||
HDR(rep, 0)->ro_compat_features != hdrp->ro_compat_features) {
ERR("incompatible feature flags");
errno = EINVAL;
return -1;
}
rep->part[partidx].rdonly = 0;
/* < 0: unsupported features; 0: the part may only be used read-only */
int retval = util_feature_check(&hdr, incompat, ro_compat, compat);
if (retval < 0)
return -1;
if (retval == 0)
rep->part[partidx].rdonly = 1;
return 0;
}
/*
 * util_header_check_remote -- check that the header of part 'partidx' of
 *	replica 'rep' is non-empty, passes util_convert_hdr_remote(), and
 *	agrees with part #0's header on every pool-wide field
 *
 * Returns 0 (and clears the part's rdonly flag) on success, or -1 with
 * errno set to EINVAL.
 */
static int
util_header_check_remote(struct pool_replica *rep, unsigned partidx)
{
LOG(3, "rep %p partidx %u ", rep, partidx);
struct pool_hdr *hdrp = rep->part[partidx].hdr;
struct pool_hdr hdr;
if (util_is_zeroed(hdrp, sizeof(*hdrp))) {
ERR("pool header zeroed");
errno = EINVAL;
return -1;
}
/* the converted copy is only used to validate the header ... */
memcpy(&hdr, hdrp, sizeof(hdr));
if (!util_convert_hdr_remote(&hdr)) {
errno = EINVAL;
return -1;
}
/* ... all comparisons below use the headers as stored */
if (memcmp(HDR(rep, 0)->signature, hdrp->signature, POOL_HDR_SIG_LEN)) {
ERR("pool signature mismatch in part %d", partidx);
errno = EINVAL;
return -1;
}
if (HDR(rep, 0)->major != hdrp->major) {
ERR("pool version mismatch in part %d", partidx);
errno = EINVAL;
return -1;
}
if (HDR(rep, 0)->compat_features != hdrp->compat_features) {
ERR("'may have' compatibility flags mismatch in part %d",
partidx);
errno = EINVAL;
return -1;
}
if (HDR(rep, 0)->incompat_features != hdrp->incompat_features) {
ERR("'must support' compatibility flags mismatch in part %d",
partidx);
errno = EINVAL;
return -1;
}
if (HDR(rep, 0)->ro_compat_features != hdrp->ro_compat_features) {
ERR("'force read-only' compatibility flags mismatch in part %d",
partidx);
errno = EINVAL;
return -1;
}
if (memcmp(HDR(rep, 0)->poolset_uuid, hdrp->poolset_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong pool set UUID in part %d", partidx);
errno = EINVAL;
return -1;
}
/* all parts of a replica share the same replica links */
if (memcmp(HDR(rep, 0)->prev_repl_uuid, hdrp->prev_repl_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong previous replica UUID in part %d", partidx);
errno = EINVAL;
return -1;
}
if (memcmp(HDR(rep, 0)->next_repl_uuid, hdrp->next_repl_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong next replica UUID in part %d", partidx);
errno = EINVAL;
return -1;
}
if (memcmp(&HDR(rep, 0)->arch_flags, &hdrp->arch_flags,
sizeof(hdrp->arch_flags))) {
ERR("wrong architecture flags");
errno = EINVAL;
return -1;
}
/* part linkage within the replica */
if (memcmp(HDRP(rep, partidx)->uuid, hdrp->prev_part_uuid,
POOL_HDR_UUID_LEN) ||
memcmp(HDRN(rep, partidx)->uuid, hdrp->next_part_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong part UUID in part %d", partidx);
errno = EINVAL;
return -1;
}
rep->part[partidx].rdonly = 0;
return 0;
}
/*
 * util_replica_map_local -- map all parts of a local replica being created
 *
 * Part #0 is mapped with a length of rep->resvsize, which covers the whole
 * replica; the remaining parts are then mapped with MAP_FIXED right after
 * it (skipping 'hdrsize' bytes of each of them). On Windows a MAP_FIXED
 * failure with EINVAL is retried up to 10 times at a new hint address;
 * elsewhere it is not retried.
 *
 * Returns 0 on success, or -1 with errno preserved and all parts unmapped.
 */
static int
util_replica_map_local(struct pool_set *set, unsigned repidx, int flags)
{
	LOG(3, "set %p repidx %u flags %d", set, repidx, flags);

#ifndef _WIN32
	int remaining_retries = 0;
#else
	int remaining_retries = 10;
#endif
	int retry_for_contiguous_addr;
	size_t mapsize;
	/* header size for all parts but the first one */
	size_t hdrsize = (set->options & OPTION_NO_HDRS) ? 0 : Mmap_align;
	void *addr;
	struct pool_replica *rep = set->replica[repidx];

	ASSERTeq(rep->remote, NULL);
	ASSERTne(rep->part, NULL);

	do {
		retry_for_contiguous_addr = 0;
		mapsize = rep->part[0].filesize & ~(Mmap_align - 1);

		/* determine a hint address for mmap() */
		addr = util_map_hint(rep->resvsize, 0);
		if (addr == MAP_FAILED) {
			ERR("cannot find a contiguous region of given size");
			return -1;
		}

		/* map the first part over the whole reserved length */
		if (util_map_part(&rep->part[0], addr, rep->resvsize, 0,
				flags, 0) != 0) {
			LOG(2, "pool mapping failed - replica #%u part #0",
				repidx);
			return -1;
		}

		VALGRIND_REGISTER_PMEM_MAPPING(rep->part[0].addr,
			rep->part[0].size);
		VALGRIND_REGISTER_PMEM_FILE(rep->part[0].fd,
			rep->part[0].addr, rep->part[0].size, 0);

		set->zeroed &= rep->part[0].created;

		/* 'addr' is the cursor just past the parts mapped so far */
		addr = (char *)rep->part[0].addr + mapsize;

		for (unsigned p = 1; p < rep->nparts; p++) {
			if (util_map_part(&rep->part[p], addr, 0, hdrsize,
					flags | MAP_FIXED, 0) != 0) {
				/*
				 * The part could not be placed right after the
				 * previous one: drop everything and try again
				 * at a different address (if allowed).
				 */
				if ((errno == EINVAL) &&
				    (remaining_retries > 0)) {
					LOG(2, "usable space mapping failed - "
						"part #%d - retrying", p);
					retry_for_contiguous_addr = 1;
					remaining_retries--;
					util_unmap_parts(rep, 0, p - 1);

					/* release the rest of the reserved VA */
					ASSERTne(addr, NULL);
					ASSERTne(addr, MAP_FAILED);
					munmap(addr, rep->resvsize - mapsize);
					break;
				}
				LOG(2, "usable space mapping failed - part #%d",
					p);
				goto err;
			}

			VALGRIND_REGISTER_PMEM_FILE(rep->part[p].fd,
				rep->part[p].addr, rep->part[p].size,
				hdrsize);

			mapsize += rep->part[p].size;
			set->zeroed &= rep->part[p].created;
			addr = (char *)addr + rep->part[p].size;
		}
	} while (retry_for_contiguous_addr);

	rep->is_pmem = rep->part[0].is_dev_dax ||
		pmem_is_pmem(rep->part[0].addr, rep->part[0].size);

	if (Prefault_at_create)
		util_replica_force_page_allocation(rep);

	ASSERTeq(mapsize, rep->repsize);

	LOG(3, "replica #%u addr %p", repidx, rep->part[0].addr);

	return 0;

err:
	LOG(4, "error clean up");
	int oerrno = errno;
	if (mapsize < rep->repsize) {
		ASSERTne(rep->part[0].addr, NULL);
		ASSERTne(rep->part[0].addr, MAP_FAILED);
		/*
		 * Release the unused tail of the reservation, which starts
		 * right after the parts mapped so far -- not at part #0.
		 */
		munmap((char *)rep->part[0].addr + mapsize,
			rep->resvsize - mapsize);
	}
	for (unsigned p = 0; p < rep->nparts; p++) {
		util_unmap_part(&rep->part[p]);
	}
	errno = oerrno;
	return -1;
}
/*
 * util_replica_init_headers_local -- map the headers of a local replica,
 *	initialize each of them with util_header_create() and unmap them
 *
 * Headers must not already contain data (overwrite is 0). Returns 0 on
 * success, or -1 with errno preserved and all headers unmapped.
 */
static int
util_replica_init_headers_local(struct pool_set *set, unsigned repidx,
	int flags, const char *sig, uint32_t major, uint32_t compat,
	uint32_t incompat, uint32_t ro_compat,
	const unsigned char *prev_repl_uuid,
	const unsigned char *next_repl_uuid, const unsigned char *arch_flags)
{
	/* separate "ro_compat %#x" from the next field in the trace */
	LOG(3, "set %p repidx %u flags %d sig %.8s major %u "
		"compat %#x incompat %#x ro_compat %#x "
		"prev_repl_uuid %p next_repl_uuid %p arch_flags %p",
		set, repidx, flags, sig, major,
		compat, incompat, ro_compat,
		prev_repl_uuid, next_repl_uuid, arch_flags);

	struct pool_replica *rep = set->replica[repidx];

	/* map all headers - the address does not matter */
	for (unsigned p = 0; p < rep->nhdrs; p++) {
		if (util_map_hdr(&rep->part[p], flags, 0) != 0) {
			LOG(2, "header mapping failed - part #%d", p);
			goto err;
		}
	}

	/* create the headers and set the UUIDs */
	for (unsigned p = 0; p < rep->nhdrs; p++) {
		if (util_header_create(set, repidx, p, sig, major,
				compat, incompat, ro_compat,
				prev_repl_uuid, next_repl_uuid,
				arch_flags, 0) != 0) {
			LOG(2, "header creation failed - part #%d", p);
			goto err;
		}
	}

	for (unsigned p = 0; p < rep->nhdrs; p++)
		util_unmap_hdr(&rep->part[p]);

	return 0;

err:
	LOG(4, "error clean up");
	int oerrno = errno;
	for (unsigned p = 0; p < rep->nhdrs; p++) {
		util_unmap_hdr(&rep->part[p]);
	}
	errno = oerrno;
	return -1;
}
/*
 * util_replica_create_local -- create a local replica: map it (unless
 *	part #0 is already mapped) and write all of its headers
 *
 * Returns 0 on success, -1 if mapping or header initialization fails.
 */
static int
util_replica_create_local(struct pool_set *set, unsigned repidx, int flags,
	const char *sig, uint32_t major, uint32_t compat, uint32_t incompat,
	uint32_t ro_compat, const unsigned char *prev_repl_uuid,
	const unsigned char *next_repl_uuid, const unsigned char *arch_flags)
{
	LOG(3, "set %p repidx %u flags %d sig %.8s major %u "
		"compat %#x incompat %#x ro_compat %#x "
		"prev_repl_uuid %p next_repl_uuid %p arch_flags %p",
		set, repidx, flags, sig, major,
		compat, incompat, ro_compat,
		prev_repl_uuid, next_repl_uuid, arch_flags);

	int needs_mapping = PART(REP(set, repidx), 0).addr == NULL;
	if (needs_mapping && util_replica_map_local(set, repidx, flags) != 0) {
		LOG(2, "replica #%u map failed", repidx);
		return -1;
	}

	int ret = util_replica_init_headers_local(set, repidx, flags, sig,
			major, compat, incompat, ro_compat, prev_repl_uuid,
			next_repl_uuid, arch_flags);
	if (ret != 0) {
		LOG(2, "replica #%u headers initialization failed", repidx);
		return -1;
	}

	return 0;
}
/*
 * util_replica_create_remote -- set up the local, heap-allocated copy of
 *	a remote replica and create its (single) header in it
 *
 * Returns 0 on success, -1 on error. On error no pointer into the freed
 * buffer is left behind, so a later util_replica_close() on this replica
 * is safe.
 */
static int
util_replica_create_remote(struct pool_set *set, unsigned repidx, int flags,
	const char *sig, uint32_t major, uint32_t compat, uint32_t incompat,
	uint32_t ro_compat, const unsigned char *prev_repl_uuid,
	const unsigned char *next_repl_uuid)
{
	LOG(3, "set %p repidx %u flags %d sig %.8s major %u "
		"compat %#x incompat %#x ro_compat %#x "
		"prev_repl_uuid %p next_repl_uuid %p",
		set, repidx, flags, sig, major,
		compat, incompat, ro_compat,
		prev_repl_uuid, next_repl_uuid);

	struct pool_replica *rep = set->replica[repidx];

	ASSERTne(rep->remote, NULL);
	ASSERTne(rep->part, NULL);
	ASSERTeq(rep->nparts, 1);
	ASSERTeq(rep->nhdrs, 1);

	struct pool_set_part *part = rep->part;

	part->size = rep->repsize;
	ASSERT(IS_PAGE_ALIGNED(part->size));

	/* one extra page so that the buffer can be page-aligned */
	part->remote_hdr = Zalloc(part->size + Pagesize);
	if (!part->remote_hdr) {
		ERR("!Zalloc");
		return -1;
	}

	part->hdr = PAGE_ALIGN_UP(part->remote_hdr);
	part->addr = PAGE_ALIGN_UP(part->remote_hdr);
	part->hdrsize = POOL_HDR_SIZE;

	if (util_header_create(set, repidx, 0, sig, major,
			compat, incompat, ro_compat,
			prev_repl_uuid, next_repl_uuid, NULL, 0) != 0) {
		LOG(2, "header creation failed - part #0");
		Free(part->remote_hdr);
		/*
		 * Reset everything that pointed into the freed buffer, the
		 * same way util_replica_close() does; otherwise the caller's
		 * error path (which closes every replica) frees it twice.
		 */
		part->remote_hdr = NULL;
		part->hdr = NULL;
		part->hdrsize = 0;
		part->addr = NULL;
		part->size = 0;
		return -1;
	}

	LOG(3, "replica #%u addr %p", repidx, rep->part[0].addr);

	return 0;
}
int
util_replica_close(struct pool_set *set, unsigned repidx)
{
LOG(3, "set %p repidx %u", set, repidx);
struct pool_replica *rep = set->replica[repidx];
if (rep->remote == NULL) {
for (unsigned p = 0; p < rep->nhdrs; p++)
util_unmap_hdr(&rep->part[p]);
util_unmap_part(&rep->part[0]);
} else {
LOG(4, "freeing volatile header of remote replica #%u", repidx);
Free(rep->part[0].remote_hdr);
rep->part[0].remote_hdr = NULL;
rep->part[0].hdr = NULL;
rep->part[0].hdrsize = 0;
rep->part[0].addr = NULL;
rep->part[0].size = 0;
}
return 0;
}
/*
 * util_poolset_append_new_part -- create a new part file of 'size' bytes
 *	in every replica of a directory-based pool set and register it
 *
 * For each replica the file is "<dir>/<set->next_id, zero-padded>.pmem",
 * where <dir> is picked round-robin by set->next_directory_id. Files are
 * created with O_EXCL and preallocated. On success the path strings are
 * passed to util_replica_add_part() and not freed here (presumably the
 * replica takes ownership), and the set size is recomputed.
 *
 * Returns 0 on success. On failure returns -1 with errno describing the
 * error; every file created by this call is closed and removed again,
 * while files that already existed are never touched.
 */
static int
util_poolset_append_new_part(struct pool_set *set, size_t size)
{
	LOG(3, "set %p size %zu", set, size);

	if (!set->directory_based)
		return -1;

	struct pool_set_directory *d;
	size_t directory_id;

	/* per-replica state of the part file being created */
	struct part {
		char *path;
		size_t path_len;
		int fd;		/* >= 0 only for a file this call created */
	} *parts = Zalloc(sizeof(*parts) * set->nreplicas);

	if (parts == NULL)
		return -1;

	/* Zalloc() leaves fd == 0, which is a valid descriptor */
	for (unsigned r = 0; r < set->nreplicas; ++r)
		parts[r].fd = -1;

	for (unsigned r = 0; r < set->nreplicas; ++r) {
		if (util_replica_reserve(&set->replica[r],
				set->replica[r]->nparts + 1) != 0)
			goto err_part_init;

		struct pool_replica *rep = set->replica[r];
		directory_id = set->next_directory_id %
			VEC_SIZE(&rep->directory);
		d = VEC_GET(&rep->directory, directory_id);

		struct part *p = &parts[r];
		p->path_len = strlen(d->path) + PMEM_FILE_MAX_LEN;
		if ((p->path = Malloc(p->path_len)) == NULL)
			goto err_part_init;

		snprintf(p->path, p->path_len, "%s/%0*u%s",
			d->path, PMEM_FILE_PADDING, set->next_id, PMEM_EXT);

		/*
		 * O_EXCL: never reuse an existing file. If this fails, the
		 * path is not ours and must not be unlinked below.
		 */
		p->fd = os_open(p->path, O_RDWR | O_CREAT | O_EXCL,
			S_IRUSR | S_IWUSR);
		if (p->fd < 0)
			goto err_part_init;

		/* posix_fallocate() reports its error by return value */
		int ret = os_posix_fallocate(p->fd, 0, (os_off_t)size);
		if (ret != 0) {
			errno = ret;
			goto err_part_init;
		}
	}

	for (unsigned r = 0; r < set->nreplicas; ++r) {
		struct part *p = &parts[r];
		if (util_replica_add_part(&set->replica[r], p->path, size) != 0)
			FATAL("cannot add a new part to the replica info");

		os_close(p->fd);
		p->fd = -1;
	}

	Free(parts);

	set->next_directory_id += 1;
	set->next_id += 1;

	util_poolset_set_size(set);

	return 0;

err_part_init:
	LOG(4, "error clean up");
	int oerrno = errno;
	for (unsigned r = 0; r < set->nreplicas; ++r) {
		struct part *p = &parts[r];
		/*
		 * Remove only the files this call created, and close them
		 * before unlinking (unlinking an open file fails on Windows).
		 */
		if (p->fd >= 0) {
			os_close(p->fd);
			os_unlink(p->path);
		}
		if (p->path != NULL)
			Free(p->path);
	}
	Free(parts);
	errno = oerrno;
	return -1;
}
/*
 * util_pool_extend -- grow a header-less (OPTION_NO_HDRS) pool by 'size'
 *	bytes by appending a new part to every replica
 *
 * The new part of each replica is opened and mapped with MAP_FIXED right
 * after the current end of that replica's pool, inside the address range
 * starting at part #0. Returns the address of the new region in the first
 * replica, or NULL on error.
 *
 * On error the new part is removed from EVERY replica again (it was added
 * to all of them by util_poolset_append_new_part()): its fd is closed, its
 * file unlinked and its path freed. errno is preserved.
 * NOTE(review): parts already mapped with MAP_FIXED are not unmapped here.
 */
void *
util_pool_extend(struct pool_set *set, size_t size)
{
	LOG(3, "set %p size %zu", set, size);

	if (size == 0) {
		ERR("cannot extend pool by 0 bytes");
		return NULL;
	}

	if ((set->options & OPTION_NO_HDRS) == 0) {
		ERR(
		"extending the pool by appending parts with headers is not supported!");
		return NULL;
	}

	/* same as poolsize + size > resvsize, but cannot wrap around */
	if (set->poolsize > set->resvsize ||
	    size > set->resvsize - set->poolsize) {
		ERR("exceeded reservation size");
		return NULL;
	}

	size_t old_poolsize = set->poolsize;

	if (util_poolset_append_new_part(set, size) != 0) {
		ERR("unable to append a new part to the pool");
		return NULL;
	}

	size_t hdrsize = (set->options & OPTION_NO_HDRS) ? 0 : Mmap_align;

	void *addr = NULL;
	void *addr_base = NULL;

	unsigned r;
	for (r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];
		unsigned pidx = rep->nparts - 1;
		struct pool_set_part *p = &rep->part[pidx];

		if (util_part_open(p, 0, 0) != 0) {
			ERR("cannot open the new part");
			goto err;
		}

		/* the new part goes right after the current end of the pool */
		addr = (char *)rep->part[0].addr + old_poolsize;
		if (addr_base == NULL)
			addr_base = addr;

		if (util_map_part(p, addr, 0, hdrsize,
				MAP_SHARED | MAP_FIXED, 0) != 0) {
			ERR("cannot map the new part");
			goto err;
		}
	}

	return addr_base;

err:
	LOG(4, "error clean up");
	int oerrno = errno;
	for (unsigned rn = 0; rn < set->nreplicas; ++rn) {
		struct pool_replica *rep = set->replica[rn];
		unsigned pidx = rep->nparts - 1;
		struct pool_set_part *p = &rep->part[pidx];
		rep->nparts--;

		if (p->fd != 0)
			os_close(p->fd);
		os_unlink(p->path);
		Free((void *)p->path);
		p->path = NULL;
	}
	util_poolset_set_size(set);
	errno = oerrno;
	return NULL;
}
/*
 * util_pool_create_uuids -- create a new pool (single file or pool set)
 *
 * When 'pattr' is given, its poolset/first-part UUIDs are used instead of
 * freshly generated ones. When 'remote' is set (the pool being created is
 * itself the target of a remote replica), pattr is mandatory and supplies
 * the replica links and arch flags of replica #0. *nlanes is passed to the
 * remote-replica creation path.
 *
 * Returns 0 on success with *setp set. On failure returns -1 with errno
 * preserved; the set is freed (or closed, deleting the parts created by
 * this call).
 */
int
util_pool_create_uuids(struct pool_set **setp, const char *path,
	size_t poolsize, size_t minsize, size_t minpartsize, const char *sig,
	uint32_t major, uint32_t compat, uint32_t incompat, uint32_t ro_compat,
	unsigned *nlanes, int can_have_rep, int remote, struct pool_attr *pattr)
{
	LOG(3, "setp %p path %s poolsize %zu minsize %zu minpartsize %zu "
		"sig %.8s major %u compat %#x incompat %#x ro_compat %#x "
		"nlanes %p can_have_rep %i remote %i pattr %p",
		setp, path, poolsize, minsize, minpartsize,
		sig, major, compat, incompat, ro_compat,
		nlanes, can_have_rep, remote, pattr);

	/* attributes are mandatory for the target of a remote replica */
	ASSERT(!remote || pattr != NULL);

	int flags = MAP_SHARED;
	int oerrno;

	/* an explicit size means a new file: never overwrite one */
	if (poolsize > 0 && os_access(path, F_OK) == 0) {
		ERR("file %s already exists", path);
		errno = EEXIST;
		return -1;
	}

	int ret = util_poolset_create_set(setp, path, poolsize, minsize);
	if (ret < 0) {
		LOG(2, "cannot create pool set -- '%s'", path);
		return -1;
	}

	struct pool_set *set = *setp;

	if (set->directory_based &&
	    util_poolset_append_new_part(set, minsize) != 0) {
		ERR("cannot create a new part in provided directories");
		util_poolset_free(set);
		return -1;
	}

	ASSERT(set->nreplicas > 0);

	if (set->poolsize < minsize) {
		ERR("net pool size %zu smaller than %zu", set->poolsize,
			minsize);
		util_poolset_free(set);
		errno = EINVAL;
		return -1;
	}

	if (remote) {
		/* a remote replica target cannot have replicas of its own */
		if (set->nreplicas > 1) {
			LOG(2, "remote pool set cannot have replicas");
			util_poolset_free(set);
			errno = EINVAL;
			return -1;
		}
	}

	if (!can_have_rep && set->nreplicas > 1) {
		ERR("replication not supported");
		util_poolset_free(set);
		errno = ENOTSUP;
		return -1;
	}

	if (set->remote && util_remote_load()) {
		ERR("the pool set requires a remote replica, "
			"but the '%s' library cannot be loaded",
			LIBRARY_REMOTE);
		util_poolset_free(set);
		return -1;
	}

	set->zeroed = 1;

	if (pattr && pattr->poolset_uuid) {
		memcpy(set->uuid, pattr->poolset_uuid, POOL_HDR_UUID_LEN);
	} else {
		ret = util_uuid_generate(set->uuid);
		if (ret < 0) {
			LOG(2, "cannot generate pool set UUID");
			goto err_unload;
		}
	}

	/* a UUID for every part that has a header */
	for (unsigned r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];
		for (unsigned i = 0; i < rep->nhdrs; i++) {
			ret = util_uuid_generate(rep->part[i].uuid);
			if (ret < 0) {
				LOG(2, "cannot generate pool set part UUID");
				goto err_unload;
			}
		}
	}

	if (pattr && pattr->first_part_uuid) {
		memcpy(set->replica[0]->part[0].uuid, pattr->first_part_uuid,
			POOL_HDR_UUID_LEN);
	}

	ret = util_poolset_files_local(set, minpartsize, 1);
	if (ret != 0)
		goto err_poolset;

	/* replica #0 is mapped first; remote replicas mirror its memory */
	ret = util_replica_map_local(set, 0, flags);
	if (ret != 0)
		goto err_poolset;

	if (set->remote) {
		for (unsigned r = 0; r < set->nreplicas; r++) {
			if (REP(set, r)->remote == NULL) {
				continue;
			}
			if (util_replica_create_remote(set, r, flags, sig,
					major, compat, incompat,
					ro_compat, NULL, NULL) != 0) {
				LOG(2, "replica #%u creation failed", r);
				goto err_create;
			}
		}

		ret = util_poolset_files_remote(set, minsize, nlanes,
			1 /* create */);
		if (ret != 0)
			goto err_create;
	}

	if (remote) {
		/* links and arch flags come from the replicating side */
		if (util_replica_create_local(set, 0, flags, sig, major,
				compat, incompat, ro_compat,
				pattr->prev_repl_uuid,
				pattr->next_repl_uuid,
				pattr->user_flags) != 0) {
			LOG(2, "replica #0 creation failed");
			goto err_create;
		}
	} else {
		for (unsigned r = 0; r < set->nreplicas; r++) {
			if (REP(set, r)->remote != NULL) {
				continue;
			}
			if (util_replica_create_local(set, r, flags, sig, major,
					compat, incompat, ro_compat,
					NULL, NULL,
					NULL) != 0) {
				LOG(2, "replica #%u creation failed", r);
				goto err_create;
			}
		}
	}

	return 0;

	/*
	 * NOTE(review): err_create/err_poolset do not call
	 * util_remote_unload(); confirm util_poolset_close() handles it.
	 */
err_create:
	oerrno = errno;
	for (unsigned r = 0; r < set->nreplicas; r++)
		util_replica_close(set, r);
	errno = oerrno;
err_poolset:
	oerrno = errno;
	util_poolset_close(set, DELETE_CREATED_PARTS);
	errno = oerrno;
	return -1;

err_unload:
	/*
	 * No file has been opened and nothing mapped yet, so (as in the
	 * early error paths above) the set only needs to be freed.
	 */
	oerrno = errno;
	if (set->remote)
		util_remote_unload();
	util_poolset_free(set);
	errno = oerrno;
	return -1;
}
/*
 * util_pool_create -- create a new local pool (single file or pool set)
 *	with freshly generated UUIDs; see util_pool_create_uuids()
 */
int
util_pool_create(struct pool_set **setp, const char *path, size_t poolsize,
	size_t minsize, size_t minpartsize, const char *sig, uint32_t major,
	uint32_t compat, uint32_t incompat, uint32_t ro_compat,
	unsigned *nlanes, int can_have_rep)
{
	LOG(3, "setp %p path %s poolsize %zu minsize %zu minpartsize %zu "
		"sig %.8s major %u compat %#x incompat %#x "
		"ro_compat %#x nlanes %p can_have_rep %i",
		setp, path, poolsize, minsize, minpartsize,
		sig, major, compat, incompat, ro_compat, nlanes, can_have_rep);

	/* a plain local pool: no caller-provided UUIDs or replica links */
	struct pool_attr *no_attr = NULL;

	int ret = util_pool_create_uuids(setp, path, poolsize, minsize,
			minpartsize, sig, major, compat, incompat, ro_compat,
			nlanes, can_have_rep, POOL_LOCAL, no_attr);
	return ret;
}
/*
 * util_replica_open_local -- map all parts and headers of an existing
 *	local replica
 *
 * Part #0 is mapped with a length of rep->resvsize at rep->mapaddr (only
 * replica #0 may have one) or at a hint address; the other parts follow it
 * with MAP_FIXED. A MAP_FIXED failure with EINVAL is retried up to 10
 * times at a freshly chosen hint address.
 *
 * Returns 0 on success, or -1 with errno preserved and everything unmapped.
 */
static int
util_replica_open_local(struct pool_set *set, unsigned repidx, int flags)
{
	LOG(3, "set %p repidx %u flags %d", set, repidx, flags);

	int remaining_retries = 10;
	int retry_for_contiguous_addr;
	size_t mapsize;
	/* header size for all parts but the first one */
	size_t hdrsize = (set->options & OPTION_NO_HDRS) ? 0 : Mmap_align;
	struct pool_replica *rep = set->replica[repidx];
	void *addr = rep->mapaddr;

	ASSERT(repidx == 0 || addr == NULL);

	do {
		retry_for_contiguous_addr = 0;

		/* determine a hint address for mmap() if not specified */
		if (addr == NULL)
			addr = util_map_hint(rep->resvsize, 0);
		if (addr == MAP_FAILED) {
			ERR("cannot find a contiguous region of given size");
			return -1;
		}

		mapsize = rep->part[0].filesize & ~(Mmap_align - 1);

		/* map the first part over the whole reserved length */
		if (util_map_part(&rep->part[0], addr, rep->resvsize, 0,
				flags, 0) != 0) {
			LOG(2, "pool mapping failed - replica #%u part #0",
				repidx);
			return -1;
		}

		VALGRIND_REGISTER_PMEM_MAPPING(rep->part[0].addr,
			rep->resvsize);
		VALGRIND_REGISTER_PMEM_FILE(rep->part[0].fd,
			rep->part[0].addr, rep->part[0].size, 0);

		/* map all headers - the address does not matter */
		for (unsigned p = 0; p < rep->nhdrs; p++) {
			if (util_map_hdr(&rep->part[p], flags, 0) != 0) {
				LOG(2, "header mapping failed - part #%d", p);
				goto err;
			}
		}

		/* 'addr' is the cursor just past the parts mapped so far */
		addr = (char *)rep->part[0].addr + mapsize;

		for (unsigned p = 1; p < rep->nparts; p++) {
			if (util_map_part(&rep->part[p], addr, 0, hdrsize,
					flags | MAP_FIXED, 0) != 0) {
				if ((errno == EINVAL) &&
				    (remaining_retries > 0)) {
					LOG(2, "usable space mapping failed - "
						"part #%d - retrying", p);
					retry_for_contiguous_addr = 1;
					remaining_retries--;

					/* headers are mapped again next round */
					for (unsigned h = 0; h < rep->nhdrs;
							h++)
						util_unmap_hdr(&rep->part[h]);

					util_unmap_parts(rep, 0, p - 1);

					/*
					 * Release the rest of the reserved VA
					 * from the cursor, as in
					 * util_replica_map_local(); part[0].addr
					 * is not relied upon after
					 * util_unmap_parts().
					 */
					ASSERTne(addr, NULL);
					ASSERTne(addr, MAP_FAILED);
					munmap(addr, rep->resvsize - mapsize);

					/* pick a fresh hint on the next try */
					addr = NULL;
					break;
				}
				LOG(2, "usable space mapping failed - part #%d",
					p);
				goto err;
			}

			VALGRIND_REGISTER_PMEM_FILE(rep->part[p].fd,
				rep->part[p].addr, rep->part[p].size,
				hdrsize);

			mapsize += rep->part[p].size;
			addr = (char *)addr + rep->part[p].size;
		}
	} while (retry_for_contiguous_addr);

	rep->is_pmem = rep->part[0].is_dev_dax ||
		pmem_is_pmem(rep->part[0].addr, rep->part[0].size);

	if (Prefault_at_open)
		util_replica_force_page_allocation(rep);

	ASSERTeq(mapsize, rep->repsize);

	/* the usable pool size is limited by the smallest replica */
	if (rep->repsize < set->poolsize)
		set->poolsize = rep->repsize;

	LOG(3, "replica addr %p", rep->part[0].addr);

	return 0;

err:
	LOG(4, "error clean up");
	int oerrno = errno;
	if (mapsize < rep->repsize) {
		ASSERTne(rep->part[0].addr, NULL);
		ASSERTne(rep->part[0].addr, MAP_FAILED);
		/* release the unused tail of the reservation, not its head */
		munmap((char *)rep->part[0].addr + mapsize,
			rep->resvsize - mapsize);
	}
	for (unsigned p = 0; p < rep->nhdrs; p++)
		util_unmap_hdr(&rep->part[p]);
	for (unsigned p = 0; p < rep->nparts; p++)
		util_unmap_part(&rep->part[p]);
	errno = oerrno;
	return -1;
}
/*
 * util_replica_open_remote -- allocate the local, page-aligned copy of a
 *	remote replica (its single part) and point hdr/addr at it
 *
 * Returns 0 on success, -1 if the buffer cannot be allocated.
 */
int
util_replica_open_remote(struct pool_set *set, unsigned repidx, int flags)
{
	LOG(3, "set %p repidx %u flags %d", set, repidx, flags);

	struct pool_replica *rep = set->replica[repidx];

	ASSERTne(rep->remote, NULL);
	ASSERTne(rep->part, NULL);
	ASSERTeq(rep->nparts, 1);
	ASSERTeq(rep->nhdrs, 1);

	struct pool_set_part *part = &rep->part[0];

	part->size = rep->repsize;
	ASSERT(IS_PAGE_ALIGNED(part->size));

	/* one extra page so that the buffer can be page-aligned */
	part->remote_hdr = Zalloc(part->size + Pagesize);
	if (part->remote_hdr == NULL) {
		ERR("!Zalloc");
		return -1;
	}

	/* header and data share the same aligned start */
	part->hdr = PAGE_ALIGN_UP(part->remote_hdr);
	part->addr = PAGE_ALIGN_UP(part->remote_hdr);
	part->hdrsize = POOL_HDR_SIZE;

	LOG(3, "replica #%u addr %p", repidx, part->addr);

	return 0;
}
/*
 * util_replica_open -- open replica 'repidx', dispatching to the local or
 *	the remote implementation
 */
int
util_replica_open(struct pool_set *set, unsigned repidx, int flags)
{
	LOG(3, "set %p repidx %u flags %d", set, repidx, flags);

	int is_remote = set->replica[repidx]->remote != NULL;

	return is_remote ? util_replica_open_remote(set, repidx, flags)
			: util_replica_open_local(set, repidx, flags);
}
/*
 * util_replica_set_attr -- rewrite the attributes stored in every header
 *	of a local replica
 *
 * Each header is converted to host order, updated via
 * util_part_set_attr(), converted back, re-checksummed and persisted.
 * 'uuid' becomes the UUID of part #0 and is stored as the next/prev part
 * link of part #0's neighbours.
 *
 * Returns 0 on success, -1 if a header cannot be mapped.
 */
int
util_replica_set_attr(struct pool_replica *rep, const char *sig,
	uint32_t major, uint32_t compat, uint32_t incompat, uint32_t ro_compat,
	const unsigned char *poolset_uuid, const unsigned char *uuid,
	const unsigned char *next_repl_uuid,
	const unsigned char *prev_repl_uuid,
	const unsigned char *arch_flags)
{
	LOG(3, "rep %p", rep);

	int flags = MAP_SHARED;

	/*
	 * Map exactly the headers that are rewritten and unmapped below.
	 * Mapping rep->nparts headers leaked the extra mappings whenever
	 * rep->nhdrs < rep->nparts.
	 */
	for (unsigned p = 0; p < rep->nhdrs; p++) {
		if (util_map_hdr(&rep->part[p], flags, 0) != 0) {
			LOG(2, "header mapping failed - part #%d", p);
			goto err;
		}
	}

	const unsigned char *part_uuid;
	const unsigned char *next_part_uuid;
	const unsigned char *prev_part_uuid;

	for (unsigned p = 0; p < rep->nhdrs; p++) {
		struct pool_hdr *hdrp = HDR(rep, p);
		ASSERTne(hdrp, NULL);

		/* edit in host byte order */
		util_convert2h_hdr_nocheck(hdrp);

		/*
		 * NULL for headers that are not part #0 or its neighbours;
		 * util_part_set_attr() skips NULL next/prev part UUIDs.
		 */
		part_uuid = (hdrp == HDR(rep, 0)) ? uuid : NULL;
		next_part_uuid = (hdrp == HDRP(rep, 0)) ? uuid : NULL;
		prev_part_uuid = (hdrp == HDRN(rep, 0)) ? uuid : NULL;

		util_part_set_attr(hdrp, sig, major, compat, incompat,
			ro_compat, poolset_uuid, part_uuid,
			next_part_uuid, prev_part_uuid, next_repl_uuid,
			prev_repl_uuid, arch_flags);

		util_convert2le_hdr(hdrp);
		util_checksum(hdrp, sizeof(*hdrp), &hdrp->checksum, 1);
		util_persist_auto(rep->part[p].is_dev_dax, hdrp, sizeof(*hdrp));
	}

	for (unsigned p = 0; p < rep->nhdrs; p++)
		util_unmap_hdr(&rep->part[p]);

	return 0;

err:
	for (unsigned p = 0; p < rep->nhdrs; p++) {
		util_unmap_hdr(&rep->part[p]);
	}
	return -1;
}
/*
 * util_unmap_all_hdrs -- drop the header mappings of every replica
 *
 * Local replicas have their header mappings unmapped. For remote replicas
 * the header points into the part #0 buffer rather than a mapping, so only
 * the pointer and size are cleared.
 */
static void
util_unmap_all_hdrs(struct pool_set *set)
{
	LOG(3, "set %p", set);

	for (unsigned r = 0; r < set->nreplicas; r++) {
		struct pool_replica *rep = set->replica[r];

		if (rep->remote != NULL) {
			rep->part[0].hdr = NULL;
			rep->part[0].hdrsize = 0;
			continue;
		}

		for (unsigned p = 0; p < rep->nhdrs; p++)
			util_unmap_hdr(&rep->part[p]);
	}
}
static int
util_replica_check(struct pool_set *set, const char *sig, uint32_t major,
uint32_t compat, uint32_t incompat, uint32_t ro_compat)
{
LOG(3, "set %p sig %.8s major %u compat %#x incompat %#x ro_compat %#x",
set, sig, major, compat, incompat, ro_compat);
for (unsigned r = 0; r < set->nreplicas; r++) {
struct pool_replica *rep = set->replica[r];
for (unsigned p = 0; p < rep->nhdrs; p++) {
if (util_header_check(set, r, p, sig, major,
compat, incompat, ro_compat) != 0) {
LOG(2, "header check failed - part #%d", p);
return -1;
}
set->rdonly |= rep->part[p].rdonly;
}
if (memcmp(HDR(REPP(set, r), 0)->uuid,
HDR(REP(set, r), 0)->prev_repl_uuid,
POOL_HDR_UUID_LEN) ||
memcmp(HDR(REPN(set, r), 0)->uuid,
HDR(REP(set, r), 0)->next_repl_uuid,
POOL_HDR_UUID_LEN)) {
ERR("wrong replica UUID");
errno = EINVAL;
return -1;
}
}
return 0;
}
/*
 * util_pool_open_nocheck -- open all replicas of an already described
 *	pool set without validating the headers against an expected type
 *
 * With 'cow' the pool is mapped MAP_PRIVATE|MAP_NORESERVE (rejected for
 * device dax), otherwise MAP_SHARED. Returns 0 on success; on failure
 * returns -1 with errno preserved, and once the files have been touched
 * the set is closed (without deleting parts).
 */
int
util_pool_open_nocheck(struct pool_set *set, int cow)
{
	LOG(3, "set %p cow %i", set, cow);

	/* validate the argument before it is dereferenced below */
	ASSERTne(set, NULL);
	ASSERT(set->nreplicas > 0);

	if (cow && set->replica[0]->part[0].is_dev_dax) {
		ERR("device dax cannot be mapped privately");
		errno = ENOTSUP;
		return -1;
	}

	int flags = cow ? MAP_PRIVATE|MAP_NORESERVE : MAP_SHARED;
	int oerrno;

	if (set->remote && util_remote_load()) {
		ERR("the pool set requires a remote replica, "
			"but the '%s' library cannot be loaded",
			LIBRARY_REMOTE);
		return -1;
	}

	int ret = util_poolset_files_local(set, 0 /* minpartsize */,
			0 /* create */);
	if (ret != 0)
		goto err_poolset;

	set->rdonly = 0;

	for (unsigned r = 0; r < set->nreplicas; r++) {
		if (util_replica_open(set, r, flags) != 0) {
			LOG(2, "replica #%u open failed", r);
			goto err_replica;
		}
	}

	if (set->remote) {
		/*
		 * NOTE(review): nlanes is NULL here, but
		 * util_poolset_remote_open() asserts nlanes != NULL and
		 * dereferences it; opening remote replicas through this
		 * path looks unsupported -- confirm.
		 */
		ret = util_poolset_files_remote(set, 0, NULL, 0);
		if (ret != 0)
			goto err_replica;
	}

	util_unmap_all_hdrs(set);

	return 0;

err_replica:
	LOG(4, "error clean up");
	oerrno = errno;
	for (unsigned r = 0; r < set->nreplicas; r++)
		util_replica_close(set, r);
	errno = oerrno;
err_poolset:
	oerrno = errno;
	util_poolset_close(set, DO_NOT_DELETE_PARTS);
	errno = oerrno;
	return -1;
}
/*
 * util_pool_open -- open an existing pool (single file or pool set) at
 *	'path' and validate it
 *
 * With 'cow' the pool is mapped MAP_PRIVATE|MAP_NORESERVE (rejected for
 * device dax), otherwise MAP_SHARED. 'addr' is stored as the requested
 * mapping address of replica #0. All headers are validated with
 * util_replica_check() against sig/major and the feature masks; *nlanes
 * is passed to the remote-replica open path.
 *
 * Returns 0 on success with *setp set. On failure returns -1 with errno
 * preserved; the set is freed or closed without deleting parts (*setp
 * is not reset).
 */
int
util_pool_open(struct pool_set **setp, const char *path, int cow,
size_t minpartsize, const char *sig, uint32_t major,
uint32_t compat, uint32_t incompat, uint32_t ro_compat,
unsigned *nlanes, void *addr)
{
LOG(3, "setp %p path %s cow %d minpartsize %zu sig %.8s major %u "
"compat %#x incompat %#x ro_compat %#x nlanes %p addr %p",
setp, path, cow, minpartsize, sig, major,
compat, incompat, ro_compat, nlanes, addr);
int flags = cow ? MAP_PRIVATE|MAP_NORESERVE : MAP_SHARED;
int oerrno;
/* poolsize 0: describe the existing file / pool set at 'path' */
int ret = util_poolset_create_set(setp, path, 0, 0);
if (ret < 0) {
LOG(2, "cannot open pool set -- '%s'", path);
return -1;
}
/* used as the mapping address by util_replica_open_local() */
(*setp)->replica[0]->mapaddr = addr;
if (cow && (*setp)->replica[0]->part[0].is_dev_dax) {
ERR("device dax cannot be mapped privately");
errno = ENOTSUP;
util_poolset_free(*setp);
return -1;
}
struct pool_set *set = *setp;
ASSERT(set->nreplicas > 0);
if (set->remote && util_remote_load()) {
ERR("the pool set requires a remote replica, "
"but the '%s' library cannot be loaded",
LIBRARY_REMOTE);
util_poolset_free(*setp);
return -1;
}
ret = util_poolset_files_local(set, minpartsize, 0);
if (ret != 0)
goto err_poolset;
for (unsigned r = 0; r < set->nreplicas; r++)
if (util_replica_open(set, r, flags) != 0) {
LOG(2, "replica #%u open failed", r);
goto err_replica;
}
/* remote replicas are opened only after the local mappings exist */
if (set->remote) {
ret = util_poolset_files_remote(set, 0, nlanes, 0);
if (ret != 0)
goto err_replica;
}
if (util_replica_check(set, sig, major, compat, incompat, ro_compat))
goto err_replica;
/* headers are not kept mapped once validated */
util_unmap_all_hdrs(set);
return 0;
err_replica:
LOG(4, "error clean up");
oerrno = errno;
for (unsigned r = 0; r < set->nreplicas; r++)
util_replica_close(set, r);
errno = oerrno;
err_poolset:
oerrno = errno;
util_poolset_close(set, DO_NOT_DELETE_PARTS);
errno = oerrno;
return -1;
}
/*
 * util_pool_open_remote -- open a single-replica pool set and report its
 *	header fields
 *
 * Opens the pool set at path, which must contain exactly one replica.
 * Checks every part header with util_header_check_remote() and copies the
 * header fields of the first part into the caller-supplied buffers:
 *
 * sig                          -- POOL_HDR_SIG_LEN bytes
 * major, compat, incompat,
 * ro_compat                    -- single values
 * poolset_uuid, first_part_uuid,
 * prev_repl_uuid, next_repl_uuid -- POOL_HDR_UUID_LEN bytes each
 * arch_flags                   -- sizeof(struct arch_flags) bytes
 *
 * cow -- if non-zero, map with MAP_PRIVATE|MAP_NORESERVE (rejected for
 *	  Device DAX), otherwise MAP_SHARED.
 *
 * Returns 0 on success. On failure returns -1 with errno set; the pool set
 * has then been freed or closed, so *setp must not be used.
 */
int
util_pool_open_remote(struct pool_set **setp, const char *path, int cow,
	size_t minpartsize, char *sig, uint32_t *major,
	uint32_t *compat, uint32_t *incompat, uint32_t *ro_compat,
	unsigned char *poolset_uuid, unsigned char *first_part_uuid,
	unsigned char *prev_repl_uuid, unsigned char *next_repl_uuid,
	unsigned char *arch_flags)
{
	LOG(3, "setp %p path %s cow %d minpartsize %zu "
		"sig %p major %p compat %p incompat %p ro_compat %p "
		"poolset_uuid %p first_part_uuid %p "
		"prev_repl_uuid %p next_repl_uuid %p arch_flags %p",
		setp, path, cow, minpartsize,
		sig, major, compat, incompat, ro_compat,
		poolset_uuid, first_part_uuid, prev_repl_uuid, next_repl_uuid,
		arch_flags);

	int flags = cow ? MAP_PRIVATE|MAP_NORESERVE : MAP_SHARED;
	int oerrno;

	int ret = util_poolset_create_set(setp, path, 0, 0);
	if (ret < 0) {
		LOG(2, "cannot open pool set -- '%s'", path);
		return -1;
	}

	struct pool_set *set = *setp;

	if (cow && set->replica[0]->part[0].is_dev_dax) {
		ERR("device dax cannot be mapped privately");
		errno = ENOTSUP;
		/*
		 * Release the set created above (it used to leak here);
		 * util_poolset_free() may overwrite errno, so preserve it.
		 */
		oerrno = errno;
		util_poolset_free(set);
		errno = oerrno;
		return -1;
	}

	if (set->nreplicas > 1) {
		LOG(2, "remote pool set cannot have replicas");
		/* report a meaningful cause instead of a stale errno */
		errno = EINVAL;
		goto err_poolset;
	}

	ret = util_poolset_files_local(set, minpartsize, 0);
	if (ret != 0)
		goto err_poolset;

	if (util_replica_open(set, 0, flags) != 0) {
		LOG(2, "replica open failed");
		goto err_replica;
	}

	struct pool_replica *rep = set->replica[0];
	struct pool_hdr *hdr = rep->part[0].hdr;

	/* part 0 counts even if the replica carries no headers */
	set->rdonly |= rep->part[0].rdonly;

	for (unsigned p = 0; p < rep->nhdrs; p++) {
		if (util_header_check_remote(rep, p) != 0) {
			LOG(2, "header check failed - part #%u", p);
			goto err_replica;
		}
		set->rdonly |= rep->part[p].rdonly;
	}

	memcpy(sig, hdr->signature, POOL_HDR_SIG_LEN);
	*major = hdr->major;
	*compat = hdr->compat_features;
	*incompat = hdr->incompat_features;
	*ro_compat = hdr->ro_compat_features;
	memcpy(poolset_uuid, hdr->poolset_uuid, POOL_HDR_UUID_LEN);
	memcpy(first_part_uuid, hdr->uuid, POOL_HDR_UUID_LEN);
	memcpy(prev_repl_uuid, hdr->prev_repl_uuid, POOL_HDR_UUID_LEN);
	memcpy(next_repl_uuid, hdr->next_repl_uuid, POOL_HDR_UUID_LEN);
	memcpy(arch_flags, &hdr->arch_flags, sizeof(struct arch_flags));

	/* headers have been copied out; drop their mappings */
	for (unsigned p = 0; p < rep->nhdrs; p++)
		util_unmap_hdr(&rep->part[p]);

	return 0;

err_replica:
	LOG(4, "error clean up");
	oerrno = errno;
	util_replica_close(set, 0);
	errno = oerrno;
err_poolset:
	oerrno = errno;
	util_poolset_close(set, DO_NOT_DELETE_PARTS);
	errno = oerrno;
	return -1;
}
/*
 * util_is_poolset_file -- tell whether path names a pool set file
 *
 * Returns:
 *   1  -- the file starts with POOLSET_HDR_SIG
 *   0  -- path is a Device DAX, or the file is shorter than the signature,
 *	   or its first bytes do not match the signature
 *  -1  -- the file could not be opened or read
 */
int
util_is_poolset_file(const char *path)
{
	/* a Device DAX is never treated as a pool set file */
	if (util_file_is_device_dax(path))
		return 0;

	int fd = util_file_open(path, NULL, 0, O_RDONLY);
	if (fd < 0)
		return -1;

	char sigbuf[POOLSET_HDR_SIG_LEN];
	size_t got = 0;
	ssize_t n;

	/* read until the buffer is full (next read returns 0), EOF or error */
	for (;;) {
		n = util_read(fd, sigbuf + got, sizeof(sigbuf) - got);
		if (n <= 0)
			break;
		got += (size_t)n;
	}

	int result;
	if (n < 0) {
		ERR("!read");
		result = -1;
	} else if (got == sizeof(sigbuf)) {
		result = memcmp(sigbuf, POOLSET_HDR_SIG,
				POOLSET_HDR_SIG_LEN) == 0;
	} else {
		/* too short to contain the signature */
		result = 0;
	}

	os_close(fd);
	return result;
}
int
util_poolset_foreach_part(const char *path,
int (*cb)(struct part_file *pf, void *arg), void *arg)
{
int fd = os_open(path, O_RDONLY);
if (fd < 0)
return -1;
struct pool_set *set;
int ret = util_poolset_parse(&set, path, fd);
if (ret) {
ret = -1;
goto err_close;
}
for (unsigned r = 0; r < set->nreplicas; r++) {
struct part_file part;
if (set->replica[r]->remote) {
part.is_remote = 1;
part.node_addr = set->replica[r]->remote->node_addr;
part.pool_desc = set->replica[r]->remote->pool_desc;
ret = cb(&part, arg);
if (ret)
goto out;
} else {
part.is_remote = 0;
for (unsigned p = 0; p < set->replica[r]->nparts; p++) {
part.path = set->replica[r]->part[p].path;
ret = cb(&part, arg);
if (ret)
goto out;
}
}
}
out:
ASSERTne(ret, -1);
util_poolset_free(set);
err_close:
os_close(fd);
return ret;
}
/*
 * util_poolset_size -- get the pool size described by a pool set file
 *
 * Returns the poolsize computed by util_poolset_parse(), or 0 when the
 * file cannot be opened or parsed.
 */
size_t
util_poolset_size(const char *path)
{
	int fd = os_open(path, O_RDONLY);
	if (fd < 0)
		return 0;

	size_t poolsize = 0;
	struct pool_set *set;

	if (util_poolset_parse(&set, path, fd) == 0) {
		poolsize = set->poolsize;
		util_poolset_free(set);
	}

	os_close(fd);
	return poolsize;
}
/*
 * util_replica_fdclose -- call util_part_fdclose() on every part of a
 *	replica, in order
 */
void
util_replica_fdclose(struct pool_replica *rep)
{
	unsigned idx = 0;

	while (idx < rep->nparts) {
		util_part_fdclose(&rep->part[idx]);
		++idx;
	}
}