#include <stdio.h>
#include <stdint.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <limits.h>
#include <dirent.h>
#include <assert.h>
#include "replica.h"
#include "out.h"
#include "file.h"
#include "os.h"
#include "libpmem.h"
#include "util_pmem.h"
/*
 * poolset_compare_status -- per-poolset result of comparing two poolsets
 *
 * One instance is created for each of the two compared poolsets; the
 * trailing array is sized by the number of replicas of that poolset.
 */
struct poolset_compare_status
{
/* number of entries in replica[]; copied from the poolset's nreplicas */
unsigned nreplicas;
/* option bits taken from the poolset; only OPTION_NO_HDRS is set here */
unsigned flags;
/*
 * replica[r] is the index of the matching replica in the other poolset,
 * or UNDEF_REPLICA when replica r has no counterpart there
 */
unsigned replica[];
};
/*
 * transform_op -- kind of transformation identified by comparing the input
 *                 and the output poolset
 */
enum transform_op {
/* poolsets are equal or the requested change is not supported */
NOT_TRANSFORMABLE,
/* the output poolset has replicas with no counterpart in the input */
ADD_REPLICAS,
/* the input poolset has replicas with no counterpart in the output */
RM_REPLICAS,
/* same replicas; input has OPTION_NO_HDRS set and output does not */
ADD_HDRS,
/* same replicas; output has OPTION_NO_HDRS set and input does not */
RM_HDRS,
};
/*
 * check_if_part_used_once -- (internal) check that the file of the given part
 *                            is not used by any later part of the poolset
 *
 * The part (repn, partn) is compared against every following part of the
 * same replica and every part of all subsequent local replicas. Paths are
 * resolved with util_part_realpath(); when a path cannot be resolved the
 * path as written in the poolset is used instead.
 *
 * Returns 0 when no other part refers to the same file, -1 otherwise
 * (duplicate found, path resolution error, inode comparison error or
 * out of memory). errno is set to EINVAL when a duplicate is found.
 */
static int
check_if_part_used_once(struct pool_set *set, unsigned repn, unsigned partn)
{
	LOG(3, "set %p, repn %u, partn %u", set, repn, partn);
	struct pool_replica *rep = REP(set, repn);
	char *path = util_part_realpath(PART(rep, partn).path);
	if (path == NULL) {
		LOG(1, "cannot get absolute path for %s, replica %u, part %u",
			PART(rep, partn).path, repn, partn);
		errno = 0;
		path = strdup(PART(rep, partn).path);
		if (path == NULL) {
			ERR("!strdup");
			return -1;
		}
	}

	int ret = 0;
	for (unsigned r = repn; r < set->nreplicas; ++r) {
		struct pool_replica *repr = set->replica[r];
		/* remote replicas have no local part files to compare */
		if (repr->remote != NULL)
			continue;

		/* within the same replica only later parts are compared */
		unsigned i = (r == repn) ? partn + 1 : 0;
		for (unsigned p = i; p < repr->nparts; ++p) {
			char *pathp = util_part_realpath(PART(repr, p).path);
			if (pathp == NULL) {
				if (errno != ENOENT) {
					ERR("realpath failed for %s, errno %d",
						PART(repr, p).path, errno);
					ret = -1;
					goto out;
				}
				/* report the part whose path failed to resolve */
				LOG(1, "cannot get absolute path for %s,"
					" replica %u, part %u",
					PART(repr, p).path, r, p);
				/*
				 * Clear the ENOENT before duplicating, so that
				 * a strdup failure is reported with its own
				 * errno instead of being silently wiped.
				 */
				errno = 0;
				pathp = strdup(PART(repr, p).path);
				if (pathp == NULL) {
					ERR("!strdup");
					ret = -1;
					goto out;
				}
			}

			int result = util_compare_file_inodes(path, pathp);
			if (result == 0) {
				/* both paths refer to the same file */
				ERR("some part file's path is"
					" used multiple times");
				ret = -1;
				errno = EINVAL;
				free(pathp);
				goto out;
			} else if (result < 0) {
				ERR("comparing file inodes failed for %s and"
					" %s", path, pathp);
				ret = -1;
				free(pathp);
				goto out;
			}
			free(pathp);
		}
	}
out:
	free(path);
	return ret;
}
/*
 * check_if_remote_replica_used_once -- (internal) check that no later remote
 *                                      replica has the same node address and
 *                                      pool descriptor as replica repn
 *
 * Returns 0 when the remote replica is unique among the following replicas,
 * -1 when a later remote replica has both strings equal.
 */
static int
check_if_remote_replica_used_once(struct pool_set *set, unsigned repn)
{
	LOG(3, "set %p, repn %u", set, repn);

	struct remote_replica *rep = REP(set, repn)->remote;
	ASSERTne(rep, NULL);

	for (unsigned next = repn + 1; next < set->nreplicas; ++next) {
		struct remote_replica *other = REP(set, next)->remote;

		/* local replicas cannot collide with a remote one */
		if (other == NULL)
			continue;

		if (strcmp(rep->node_addr, other->node_addr) != 0)
			continue;

		if (strcmp(rep->pool_desc, other->pool_desc) != 0)
			continue;

		ERR("remote replica %u is used multiple times", repn);
		return -1;
	}

	return 0;
}
/*
 * check_paths -- (internal) check every replica of the poolset for duplicated
 *                use and, for local parts, run the part directory check
 *
 * Returns 0 when all checks pass, -1 as soon as any of them fails.
 */
static int
check_paths(struct pool_set *set)
{
	LOG(3, "set %p", set);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		struct pool_replica *replica = set->replica[repn];

		if (replica->remote != NULL) {
			if (check_if_remote_replica_used_once(set, repn) != 0)
				return -1;
			continue;
		}

		/* local replica: verify each part in turn */
		for (unsigned partn = 0; partn < replica->nparts; ++partn) {
			if (replica_check_local_part_dir(set, repn, partn) != 0 ||
			    check_if_part_used_once(set, repn, partn) != 0)
				return -1;
		}
	}

	return 0;
}
/*
 * validate_args -- (internal) check whether the pair of poolsets can be
 *                  passed to the transform operation
 *
 * The checks are run in order and stop at the first failure:
 * input must not be directory based, output part sizes must be acceptable,
 * output paths must pass check_paths(), and the output pool size must not be
 * smaller than the size of replica 0 of the input.
 *
 * Returns 0 when all checks pass. On failure returns -1 and, if errno is
 * still 0, sets errno to EINVAL.
 */
static int
validate_args(struct pool_set *set_in, struct pool_set *set_out)
{
	LOG(3, "set_in %p, set_out %p", set_in, set_out);

	int valid = 0;

	if (set_in->directory_based) {
		ERR("transform of directory poolsets is not supported");
	} else if (replica_check_part_sizes(set_out, PMEMOBJ_MIN_POOL)) {
		ERR("part sizes check failed");
	} else if (check_paths(set_out)) {
		/* no additional message is emitted for this failure */
	} else if (set_out->poolsize < replica_get_pool_size(set_in, 0)) {
		ERR("target poolset is too small");
	} else {
		valid = 1;
	}

	if (valid)
		return 0;

	/* keep an errno set by a failed check, otherwise report EINVAL */
	if (errno == 0)
		errno = EINVAL;
	return -1;
}
/*
 * create_poolset_compare_status -- (internal) allocate a compare status for
 *                                  the poolset with every replica marked as
 *                                  having no counterpart
 *
 * On success stores the new structure in *set_sp and returns 0.
 * Returns -1 when the allocation fails; *set_sp is not written then.
 */
static int
create_poolset_compare_status(struct pool_set *set,
		struct poolset_compare_status **set_sp)
{
	LOG(3, "set %p, set_sp %p", set, set_sp);

	struct poolset_compare_status *status = Zalloc(
		sizeof(struct poolset_compare_status)
		+ set->nreplicas * sizeof(unsigned));
	if (status == NULL) {
		ERR("!Zalloc for poolset status");
		return -1;
	}

	status->nreplicas = set->nreplicas;

	unsigned idx = 0;
	while (idx < set->nreplicas) {
		status->replica[idx] = UNDEF_REPLICA;
		++idx;
	}

	*set_sp = status;
	return 0;
}
/*
 * compare_parts -- (internal) compare two parts by path and file size
 *
 * Returns 0 when both the path strings and the file sizes are equal,
 * 1 otherwise.
 */
static int
compare_parts(struct pool_set_part *p1, struct pool_set_part *p2)
{
	LOG(3, "p1 %p, p2 %p", p1, p2);
	LOG(4, "p1->path: %s, p1->filesize: %lu", p1->path, p1->filesize);
	LOG(4, "p2->path: %s, p2->filesize: %lu", p2->path, p2->filesize);

	if (strcmp(p1->path, p2->path) != 0)
		return 1;

	return (p1->filesize != p2->filesize) ? 1 : 0;
}
/*
 * compare_replicas -- (internal) compare two replicas
 *
 * Two local replicas are equal when they have the same number of parts and
 * all corresponding parts compare equal. Two remote replicas are equal when
 * their node addresses and pool descriptors are equal. A local and a remote
 * replica are never equal.
 *
 * Returns 0 when the replicas are equal, 1 otherwise.
 */
static int
compare_replicas(struct pool_replica *r1, struct pool_replica *r2)
{
	LOG(3, "r1 %p, r2 %p", r1, r2);
	LOG(4, "r1->nparts: %u, r2->nparts: %u", r1->nparts, r2->nparts);

	int r1_is_remote = (r1->remote != NULL);
	int r2_is_remote = (r2->remote != NULL);

	/* replicas of different kinds can never match */
	if (r1_is_remote != r2_is_remote)
		return 1;

	if (r1_is_remote) {
		return strcmp(r1->remote->node_addr, r2->remote->node_addr) ||
			strcmp(r1->remote->pool_desc, r2->remote->pool_desc);
	}

	if (r1->nparts != r2->nparts)
		return 1;

	for (unsigned idx = 0; idx < r1->nparts; ++idx) {
		if (compare_parts(&r1->part[idx], &r2->part[idx]) != 0)
			return 1;
	}

	return 0;
}
/*
 * check_compare_poolsets_status -- (internal) find matching replicas in the
 *                                  two poolsets and record the mapping in
 *                                  both compare status structures
 *
 * Every input replica is compared with every output replica. A match stores
 * the counterpart index on both sides.
 *
 * Returns 0 on success. Returns -1 with errno set to EINVAL when a replica
 * on either side would get a second counterpart.
 */
static int
check_compare_poolsets_status(struct pool_set *set_in,
		struct pool_set *set_out,
		struct poolset_compare_status *set_in_s,
		struct poolset_compare_status *set_out_s)
{
	LOG(3, "set_in %p, set_out %p, set_in_s %p, set_out_s %p", set_in,
		set_out, set_in_s, set_out_s);

	for (unsigned in_idx = 0; in_idx < set_in->nreplicas; ++in_idx) {
		struct pool_replica *in_rep = REP(set_in, in_idx);

		for (unsigned out_idx = 0; out_idx < set_out->nreplicas;
				++out_idx) {
			struct pool_replica *out_rep = REP(set_out, out_idx);

			LOG(1, "comparing rep_in %u with rep_out %u",
				in_idx, out_idx);

			if (compare_replicas(in_rep, out_rep) != 0)
				continue;

			/* a match: neither side may already be paired */
			int in_paired =
				(set_in_s->replica[in_idx] != UNDEF_REPLICA);
			int out_paired =
				(set_out_s->replica[out_idx] != UNDEF_REPLICA);

			if (in_paired || out_paired) {
				ERR("there are more then one corresponding"
					" replicas; cannot transform");
				errno = EINVAL;
				return -1;
			}

			set_out_s->replica[out_idx] = in_idx;
			set_in_s->replica[in_idx] = out_idx;
		}
	}

	return 0;
}
/*
 * check_compare_poolsets_options -- (internal) record in each compare status
 *                                   whether its poolset has OPTION_NO_HDRS
 *                                   set
 */
static void
check_compare_poolsets_options(struct pool_set *set_in,
		struct pool_set *set_out,
		struct poolset_compare_status *set_in_s,
		struct poolset_compare_status *set_out_s)
{
	int in_no_hdrs = (set_in->options & OPTION_NO_HDRS) != 0;
	int out_no_hdrs = (set_out->options & OPTION_NO_HDRS) != 0;

	if (in_no_hdrs) {
		set_in_s->flags |= OPTION_NO_HDRS;
	}

	if (out_no_hdrs) {
		set_out_s->flags |= OPTION_NO_HDRS;
	}
}
/*
 * compare_poolsets -- (internal) compare two poolsets; for each replica
 *                     find its counterpart in the other poolset
 *
 * On success *set_in_s and *set_out_s point to newly allocated compare
 * status structures which the caller must release with Free(), and 0 is
 * returned.
 *
 * On failure -1 is returned and every structure allocated here is freed.
 * The corresponding output pointer is reset to NULL, so the caller is never
 * left holding a pointer to freed memory.
 */
static int
compare_poolsets(struct pool_set *set_in, struct pool_set *set_out,
		struct poolset_compare_status **set_in_s,
		struct poolset_compare_status **set_out_s)
{
	LOG(3, "set_in %p, set_out %p, set_in_s %p, set_out_s %p", set_in,
		set_out, set_in_s, set_out_s);

	if (create_poolset_compare_status(set_in, set_in_s))
		return -1;

	if (create_poolset_compare_status(set_out, set_out_s))
		goto err_free_in;

	if (check_compare_poolsets_status(set_in, set_out, *set_in_s,
			*set_out_s))
		goto err_free_out;

	check_compare_poolsets_options(set_in, set_out, *set_in_s, *set_out_s);
	return 0;

err_free_out:
	Free(*set_out_s);
	*set_out_s = NULL; /* do not leave a dangling pointer behind */
err_free_in:
	Free(*set_in_s);
	*set_in_s = NULL; /* do not leave a dangling pointer behind */
	return -1;
}
/*
 * replica_counterpart -- (internal) return the value stored for replica repn
 *                        in the compare status: the index of its counterpart
 *                        or UNDEF_REPLICA
 */
static unsigned
replica_counterpart(unsigned repn,
		struct poolset_compare_status *set_s)
{
	const unsigned *counterparts = set_s->replica;

	return counterparts[repn];
}
/*
 * identify_transform_operation -- (internal) check which kind of transform
 *                                 operation is requested and whether it is
 *                                 supported
 *
 * Side effects on set_out_hs:
 *  - for each input replica with a counterpart, the counterpart's pool_size
 *    is copied from the input health status,
 *  - each output replica without a counterpart gets the IS_BROKEN flag.
 * These updates may already be partially applied when NOT_TRANSFORMABLE is
 * returned.
 *
 * Returns NOT_TRANSFORMABLE when:
 *  - no input replica has a counterpart in the output,
 *  - replicas are both added and removed,
 *  - nothing changes at all,
 *  - replicas change together with the OPTION_NO_HDRS setting.
 * Otherwise returns RM_REPLICAS, ADD_REPLICAS, RM_HDRS or ADD_HDRS.
 */
static enum transform_op
identify_transform_operation(struct poolset_compare_status *set_in_s,
		struct poolset_compare_status *set_out_s,
		struct poolset_health_status *set_in_hs,
		struct poolset_health_status *set_out_hs)
{
	LOG(3, "set_in_s %p, set_out_s %p", set_in_s, set_out_s);

	int has_replica_to_keep = 0;
	int is_removing_replicas = 0;
	int is_adding_replicas = 0;

	/* check if there are replicas to be removed */
	for (unsigned r = 0; r < set_in_s->nreplicas; ++r) {
		unsigned c = replica_counterpart(r, set_in_s);
		if (c != UNDEF_REPLICA) {
			LOG(2, "replica %u has a counterpart %u", r,
				set_in_s->replica[r]);
			has_replica_to_keep = 1;
			/* a kept replica carries its pool size over */
			REP(set_out_hs, c)->pool_size =
				REP(set_in_hs, r)->pool_size;
		} else {
			LOG(2, "replica %u has no counterpart", r);
			is_removing_replicas = 1;
		}
	}

	/* at least one replica must survive the transformation */
	if (!has_replica_to_keep)
		return NOT_TRANSFORMABLE;

	/* check if there are replicas to be added */
	for (unsigned r = 0; r < set_out_s->nreplicas; ++r) {
		if (replica_counterpart(r, set_out_s) == UNDEF_REPLICA) {
			LOG(2, "Replica %u from output set has no counterpart",
				r);
			if (is_removing_replicas) {
				LOG(2, "adding and removing replicas at the"
					" same time is not allowed");
				return NOT_TRANSFORMABLE;
			}
			/* mark the new replica so that it gets populated */
			REP(set_out_hs, r)->flags |= IS_BROKEN;
			is_adding_replicas = 1;
		}
	}

	int replicas_change = is_removing_replicas || is_adding_replicas;
	int nohdrs_change = (set_in_s->flags & OPTION_NO_HDRS) !=
			(set_out_s->flags & OPTION_NO_HDRS);

	if (!replicas_change && !nohdrs_change) {
		LOG(2, "both poolsets are equal");
		return NOT_TRANSFORMABLE;
	}

	if (replicas_change && nohdrs_change) {
		LOG(2, "cannot add/remove replicas and change the NOHDRS option"
			" at the same time");
		return NOT_TRANSFORMABLE;
	}

	if (is_removing_replicas)
		return RM_REPLICAS;

	if (is_adding_replicas)
		return ADD_REPLICAS;

	/* only the headers option differs */
	if (set_out_s->flags & OPTION_NO_HDRS)
		return RM_HDRS;

	if (set_in_s->flags & OPTION_NO_HDRS)
		return ADD_HDRS;

	/* unreachable: one of the cases above must have matched */
	ASSERT(0);
	return NOT_TRANSFORMABLE;
}
static int
do_added_parts_exist(struct pool_set *set,
struct poolset_health_status *set_hs)
{
for (unsigned r = 0; r < set->nreplicas; ++r) {
if (!replica_is_replica_broken(r, set_hs))
continue;
struct pool_replica *rep = REP(set, r);
if (rep->remote)
continue;
for (unsigned p = 0; p < rep->nparts; ++p) {
int oerrno = errno;
if (os_access(rep->part[p].path, F_OK) == 0 &&
!rep->part[p].is_dev_dax) {
LOG(1, "part file %s exists",
rep->part[p].path);
return 1;
}
errno = oerrno;
}
}
return 0;
}
/*
 * delete_replicas -- (internal) close, with DELETE_ALL_PARTS, every replica
 *                    of the poolset which has no counterpart recorded in the
 *                    compare status
 *
 * Returns 0 on success, -1 as soon as closing one replica fails.
 */
static int
delete_replicas(struct pool_set *set, struct poolset_compare_status *set_s)
{
	LOG(3, "set %p, set_s %p", set, set_s);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		struct pool_replica *replica = REP(set, repn);

		/* replicas with a counterpart are kept */
		if (replica_counterpart(repn, set_s) != UNDEF_REPLICA)
			continue;

		int failed;
		if (replica->remote) {
			failed = util_replica_close_remote(replica, repn,
					DELETE_ALL_PARTS);
		} else {
			failed = util_replica_close_local(replica, repn,
					DELETE_ALL_PARTS);
		}

		if (failed)
			return -1;
	}

	return 0;
}
/*
 * copy_replica_data_fw -- (internal) copy replica data from set_src to
 *                         set_dst, starting at the address of part 1 and
 *                         moving towards higher addresses
 *
 * The data is copied in POOL_HDR_SIZE chunks, lowest chunk first. The length
 * is the source replica pool size less POOL_HDR_SIZE and less the data
 * length of part 0.
 */
static void
copy_replica_data_fw(struct pool_set *set_dst, struct pool_set *set_src,
		unsigned repn)
{
	LOG(3, "set_in %p, set_out %p, repn %u", set_src, set_dst, repn);

	size_t len = replica_get_pool_size(set_src, repn) - POOL_HDR_SIZE -
			replica_get_part_data_len(set_src, repn, 0);
	void *src_base = PART(REP(set_src, repn), 1).addr;
	void *dst_base = PART(REP(set_dst, repn), 1).addr;
	size_t nchunks = len / POOL_HDR_SIZE;

	for (size_t chunk = 0; chunk < nchunks; ++chunk) {
		size_t offset = chunk * POOL_HDR_SIZE;

		pmem_memcpy_persist(ADDR_SUM(dst_base, offset),
				ADDR_SUM(src_base, offset), POOL_HDR_SIZE);
	}
}
/*
 * copy_replica_data_bw -- (internal) copy replica data from set_src to
 *                         set_dst, starting at the end of the data range and
 *                         moving towards lower addresses
 *
 * The data is copied in POOL_HDR_SIZE chunks, highest chunk first. Chunk
 * offsets are counted down from the total length, relative to the address
 * of part 1.
 */
static void
copy_replica_data_bw(struct pool_set *set_dst, struct pool_set *set_src,
		unsigned repn)
{
	LOG(3, "set_in %p, set_out %p, repn %u", set_src, set_dst, repn);

	size_t len = replica_get_pool_size(set_src, repn) - POOL_HDR_SIZE -
			replica_get_part_data_len(set_src, repn, 0);
	size_t nchunks = len / POOL_HDR_SIZE;
	void *src_base = PART(REP(set_src, repn), 1).addr;
	void *dst_base = PART(REP(set_dst, repn), 1).addr;

	/* offset of the end of the chunk copied next */
	size_t offset = len;

	for (size_t left = nchunks; left > 0; --left) {
		offset -= POOL_HDR_SIZE;
		pmem_memcpy_persist(ADDR_SUM(dst_base, offset),
				ADDR_SUM(src_base, offset), POOL_HDR_SIZE);
	}
}
/*
 * create_missing_headers -- (internal) create headers 1..nhdrs-1 of the
 *                           replica using the values found in header 0
 *
 * The POOL_FEAT_NOHDRS bit is cleared in the incompat features passed to
 * each created header.
 *
 * Returns 0 on success. Returns -1 with errno set to EINVAL when creating
 * any header fails.
 */
static int
create_missing_headers(struct pool_set *set, unsigned repn)
{
	LOG(3, "set %p, repn %u", set, repn);

	struct pool_hdr *hdr0 = HDR(REP(set, repn), 0);
	unsigned partn = 1;

	while (partn < set->replica[repn]->nhdrs) {
		int created = util_header_create(set, repn, partn,
				hdr0->signature, hdr0->major,
				hdr0->compat_features,
				hdr0->incompat_features &
					(uint32_t)(~POOL_FEAT_NOHDRS),
				hdr0->ro_compat_features,
				NULL, NULL, NULL, 1);

		if (created != 0) {
			LOG(1, "part headers create failed for"
				" replica %u part %u", repn, partn);
			errno = EINVAL;
			return -1;
		}

		++partn;
	}

	return 0;
}
static void
update_replica_header(struct pool_set *set, unsigned repn)
{
LOG(3, "set %p, repn %u", set, repn);
struct pool_set_part part = PART(REP(set, repn), 0);
struct pool_hdr *hdr = (struct pool_hdr *)part.hdr;
if (set->options & OPTION_NO_HDRS) {
hdr->incompat_features |= POOL_FEAT_NOHDRS;
memcpy(hdr->next_part_uuid, hdr->uuid, POOL_HDR_UUID_LEN);
memcpy(hdr->prev_part_uuid, hdr->uuid, POOL_HDR_UUID_LEN);
} else {
hdr->incompat_features &= (uint32_t)(~POOL_FEAT_NOHDRS);
}
util_checksum(hdr, sizeof(*hdr), &hdr->checksum, 1);
util_persist_auto(part.is_dev_dax, hdr, sizeof(*hdr));
}
/*
 * fill_replica_struct_uuids -- (internal) fill the part uuids kept in the
 *                              replica structure
 *
 * The uuid of part 0 is copied from header 0; uuids of parts 1..nhdrs-1 are
 * newly generated.
 *
 * Returns 0 on success. Returns -1 with errno set to EINVAL when a uuid
 * cannot be generated.
 */
static int
fill_replica_struct_uuids(struct pool_set *set, unsigned repn)
{
	LOG(3, "set %p, repn %u", set, repn);

	struct pool_replica *replica = REP(set, repn);

	memcpy(PART(replica, 0).uuid, HDR(replica, 0)->uuid,
		POOL_HDR_UUID_LEN);

	unsigned partn = 1;
	while (partn < replica->nhdrs) {
		if (util_uuid_generate(replica->part[partn].uuid) < 0) {
			ERR("cannot generate part UUID");
			errno = EINVAL;
			return -1;
		}
		++partn;
	}

	return 0;
}
/*
 * update_uuids -- (internal) set the linkage uuids in every header of the
 *                 replica
 *
 * For each header the next/previous part uuids are taken from the
 * neighbouring parts of the replica structure, while the next/previous
 * replica uuids and the poolset uuid are copied from header 0. The checksum
 * is then recomputed and the header persisted.
 */
static void
update_uuids(struct pool_set *set, unsigned repn)
{
	LOG(3, "set %p, repn %u", set, repn);

	struct pool_replica *replica = REP(set, repn);
	struct pool_hdr *first_hdr = HDR(replica, 0);

	unsigned partn = 0;
	while (partn < replica->nhdrs) {
		struct pool_hdr *hdr = HDR(replica, partn);

		/* links between the parts of this replica */
		memcpy(hdr->next_part_uuid, PARTN(replica, partn).uuid,
			POOL_HDR_UUID_LEN);
		memcpy(hdr->prev_part_uuid, PARTP(replica, partn).uuid,
			POOL_HDR_UUID_LEN);

		/* values shared by all headers of the replica */
		memcpy(hdr->next_repl_uuid, first_hdr->next_repl_uuid,
			POOL_HDR_UUID_LEN);
		memcpy(hdr->prev_repl_uuid, first_hdr->prev_repl_uuid,
			POOL_HDR_UUID_LEN);
		memcpy(hdr->poolset_uuid, first_hdr->poolset_uuid,
			POOL_HDR_UUID_LEN);

		util_checksum(hdr, sizeof(*hdr), &hdr->checksum, 1);
		util_persist(PART(replica, partn).is_dev_dax, hdr,
			sizeof(*hdr));

		++partn;
	}
}
static void
copy_part_fds(struct pool_set *set_dst, struct pool_set *set_src)
{
ASSERTeq(set_src->nreplicas, set_dst->nreplicas);
for (unsigned r = 0; r < set_dst->nreplicas; ++r) {
ASSERTeq(REP(set_src, r)->nparts, REP(set_dst, r)->nparts);
for (unsigned p = 0; p < REP(set_dst, r)->nparts; ++p) {
PART(REP(set_dst, r), p).fd =
PART(REP(set_src, r), p).fd;
}
}
}
/*
 * remove_hdrs_replica -- (internal) remove part headers from one replica
 *
 * Opens the part files of the input replica, shares the descriptors with the
 * output poolset, maps the replica through both poolsets, copies the data
 * forward when the replica has more than one part and updates the header of
 * the output replica. Everything opened here is closed before returning.
 *
 * Returns 0 on success, -1 when the part files or either mapping cannot be
 * opened.
 */
static int
remove_hdrs_replica(struct pool_set *set_in, struct pool_set *set_out,
		unsigned repn)
{
	LOG(3, "set %p, repn %u", set_in, repn);

	if (replica_open_replica_part_files(set_in, repn)) {
		LOG(1, "opening replica %u, part files failed", repn);
		return -1;
	}

	/* the output poolset operates on the very same files */
	copy_part_fds(set_out, set_in);

	int status = -1;

	if (util_replica_open(set_in, repn, MAP_SHARED)) {
		LOG(1, "opening input replica failed: replica %u", repn);
	} else {
		if (util_replica_open(set_out, repn, MAP_SHARED)) {
			LOG(1, "opening output replica failed: replica %u",
				repn);
		} else {
			/* move data only if there is more than one part */
			if (REP(set_in, repn)->nparts > 1)
				copy_replica_data_fw(set_out, set_in, repn);

			update_replica_header(set_out, repn);
			util_replica_close(set_out, repn);
			status = 0;
		}
		util_replica_close(set_in, repn);
	}

	util_replica_fdclose(REP(set_in, repn));
	return status;
}
/*
 * add_hdrs_mapped_replica -- (internal) add part headers to a replica which
 *                            is already mapped through both poolsets
 *
 * Returns 0 on success, -1 on failure. When creating the headers fails and
 * the replica has more than one part, the data is copied forward from
 * set_out back to set_in before returning.
 */
static int
add_hdrs_mapped_replica(struct pool_set *set_in, struct pool_set *set_out,
		unsigned repn)
{
	if (fill_replica_struct_uuids(set_out, repn)) {
		LOG(1, "generating lacking uuids for parts failed: replica %u",
			repn);
		return -1;
	}

	/* make room for the headers: shift the data, last chunk first */
	if (REP(set_in, repn)->nparts > 1)
		copy_replica_data_bw(set_out, set_in, repn);

	if (create_missing_headers(set_out, repn)) {
		LOG(1, "creating lacking headers failed: replica %u", repn);
		/* undo the shift made above */
		if (REP(set_in, repn)->nparts > 1)
			copy_replica_data_fw(set_in, set_out, repn);
		return -1;
	}

	update_replica_header(set_out, repn);
	update_uuids(set_out, repn);
	return 0;
}

/*
 * add_hdrs_replica -- (internal) add lacking part headers to one replica
 *
 * Opens the part files of the input replica, shares the descriptors with the
 * output poolset, maps the replica through both poolsets and adds the
 * headers. Everything opened here is closed before returning.
 *
 * Returns 0 on success, -1 on failure.
 */
static int
add_hdrs_replica(struct pool_set *set_in, struct pool_set *set_out,
		unsigned repn)
{
	LOG(3, "set %p, repn %u", set_in, repn);

	if (replica_open_replica_part_files(set_in, repn)) {
		LOG(1, "opening replica %u, part files failed", repn);
		return -1;
	}

	/* the output poolset operates on the very same files */
	copy_part_fds(set_out, set_in);

	int status = -1;

	if (util_replica_open(set_in, repn, MAP_SHARED)) {
		LOG(1, "opening input replica failed: replica %u", repn);
		goto close_fds;
	}

	if (util_replica_open(set_out, repn, MAP_SHARED)) {
		LOG(1, "opening output replica failed: replica %u", repn);
		goto unmap_in;
	}

	status = add_hdrs_mapped_replica(set_in, set_out, repn);

	util_replica_close(set_out, repn);
unmap_in:
	util_replica_close(set_in, repn);
close_fds:
	util_replica_fdclose(REP(set_in, repn));
	return status;
}
/*
 * remove_hdrs -- (internal) remove part headers from every replica of the
 *                poolset
 *
 * When a replica fails, all replicas processed before it are flagged
 * IS_BROKEN in set_in_hs (the failing replica itself is not flagged) and
 * -1 is returned. Returns 0 when all replicas are processed successfully.
 */
static int
remove_hdrs(struct pool_set *set_in, struct pool_set *set_out,
		struct poolset_health_status *set_in_hs, unsigned flags)
{
	LOG(3, "set_in %p, set_out %p, set_in_hs %p, flags %u",
		set_in, set_out, set_in_hs, flags);

	for (unsigned r = 0; r < set_in->nreplicas; ++r) {
		if (remove_hdrs_replica(set_in, set_out, r) == 0)
			continue;

		LOG(1, "removing headers from replica %u failed", r);

		/* flag the already converted replicas: r-1 down to 0 */
		for (unsigned done = r; done > 0; --done)
			REP(set_in_hs, done - 1)->flags |= IS_BROKEN;

		return -1;
	}

	return 0;
}
/*
 * add_hdrs -- (internal) add lacking part headers to every replica of the
 *             poolset
 *
 * When a replica fails, all replicas processed before it are flagged
 * IS_BROKEN in set_in_hs (the failing replica itself is not flagged) and
 * -1 is returned. Returns 0 when all replicas are processed successfully.
 */
static int
add_hdrs(struct pool_set *set_in, struct pool_set *set_out,
		struct poolset_health_status *set_in_hs,
		unsigned flags)
{
	LOG(3, "set_in %p, set_out %p, set_in_hs %p, flags %u",
		set_in, set_out, set_in_hs, flags);

	for (unsigned r = 0; r < set_in->nreplicas; ++r) {
		if (add_hdrs_replica(set_in, set_out, r) == 0)
			continue;

		LOG(1, "adding headers to replica %u failed", r);

		/* flag the already converted replicas: r-1 down to 0 */
		for (unsigned done = r; done > 0; --done)
			REP(set_in_hs, done - 1)->flags |= IS_BROKEN;

		return -1;
	}

	return 0;
}
/*
 * replica_transform -- transform the poolset described by set_in into the
 *                      one described by set_out
 *
 * The steps are: validate both poolsets, require the input poolset to be
 * healthy, match replicas between the two poolsets, identify the requested
 * operation and carry it out.
 *
 * Returns 0 on success and -1 on failure. All status structures allocated
 * here are released before returning.
 */
int
replica_transform(struct pool_set *set_in, struct pool_set *set_out,
unsigned flags)
{
LOG(3, "set_in %p, set_out %p", set_in, set_out);
int ret = 0;
/* reject arguments which cannot be transformed at all */
if (validate_args(set_in, set_out))
return -1;
/* the input poolset has to be healthy before it is modified */
struct poolset_health_status *set_in_hs = NULL;
if (replica_check_poolset_health(set_in, &set_in_hs, flags)) {
ERR("source poolset health check failed");
return -1;
}
if (!replica_is_poolset_healthy(set_in_hs)) {
ERR("source poolset is broken");
ret = -1;
goto free_hs_in;
}
/* health status of the output set; filled in further below */
struct poolset_health_status *set_out_hs = NULL;
if (replica_create_poolset_health_status(set_out, &set_out_hs)) {
ERR("creating poolset health status failed");
ret = -1;
goto free_hs_in;
}
/* find the counterpart of each replica in the other poolset */
struct poolset_compare_status *set_in_cs = NULL;
struct poolset_compare_status *set_out_cs = NULL;
if (compare_poolsets(set_in, set_out, &set_in_cs, &set_out_cs)) {
ERR("comparing poolsets failed");
ret = -1;
/* compare_poolsets frees what it allocated, so skip free_cs */
goto free_hs_out;
}
/* this call also updates set_out_hs (pool sizes, IS_BROKEN flags) */
enum transform_op operation = identify_transform_operation(set_in_cs,
set_out_cs, set_in_hs, set_out_hs);
if (operation == NOT_TRANSFORMABLE) {
ERR("poolsets are not transformable");
ret = -1;
errno = EINVAL;
goto free_cs;
}
/*
 * Removing headers: skipped entirely in a dry run. On failure a sync of
 * the input poolset is attempted as a fallback; the result is -1 either
 * way.
 */
if (operation == RM_HDRS) {
if (!is_dry_run(flags) &&
remove_hdrs(set_in, set_out, set_in_hs,
flags)) {
ERR("removing headers failed; falling back to the "
"input poolset");
if (replica_sync(set_in, set_in_hs,
flags | IS_TRANSFORMED)) {
LOG(1, "falling back to the input poolset "
"failed");
} else {
LOG(1, "falling back to the input poolset "
"succeeded");
}
ret = -1;
}
goto free_cs;
}
/*
 * Adding headers: skipped entirely in a dry run. On failure a sync of
 * the input poolset is attempted as a fallback; the result is -1 either
 * way.
 */
if (operation == ADD_HDRS) {
if (!is_dry_run(flags) &&
add_hdrs(set_in, set_out, set_in_hs, flags)) {
ERR("adding headers failed; falling back to the "
"input poolset");
if (replica_sync(set_in, set_in_hs,
flags | IS_TRANSFORMED)) {
LOG(1, "falling back to the input poolset "
"failed");
} else {
LOG(1, "falling back to the input poolset "
"succeeded");
}
ret = -1;
}
goto free_cs;
}
/* refuse to add replicas whose part files already exist */
if (operation == ADD_REPLICAS) {
if (do_added_parts_exist(set_out, set_out_hs)) {
ERR("some parts being added already exist");
ret = -1;
errno = EINVAL;
goto free_cs;
}
}
/*
 * Sync the output poolset using the health status prepared above.
 * NOTE(review): replica_sync is defined elsewhere; presumably it fills
 * the replicas flagged IS_BROKEN from the healthy ones -- confirm.
 */
if (replica_sync(set_out, set_out_hs, flags | IS_TRANSFORMED)) {
ret = -1;
goto free_cs;
}
/* delete the input replicas with no counterpart, unless in a dry run */
if (operation == RM_REPLICAS) {
if (!is_dry_run(flags) && delete_replicas(set_in, set_in_cs))
ret = -1;
}
free_cs:
Free(set_in_cs);
Free(set_out_cs);
free_hs_out:
replica_free_poolset_health_status(set_out_hs);
free_hs_in:
replica_free_poolset_health_status(set_in_hs);
return ret;
}