#include "replica.h"

#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <libgen.h>

#include "obj.h"
#include "palloc.h"
#include "file.h"
#include "os.h"
#include "out.h"
#include "pool_hdr.h"
#include "set.h"
#include "util.h"
#include "uuid.h"
/*
 * check_flags_sync -- (internal) check if flags are supported for sync
 *
 * Returns non-zero when any bit other than PMEMPOOL_DRY_RUN is set in flags,
 * zero otherwise.
 */
static int
check_flags_sync(unsigned flags)
{
	const unsigned supported = (unsigned)PMEMPOOL_DRY_RUN;
	unsigned unsupported = flags & ~supported;

	return unsupported != 0;
}
/*
 * check_flags_transform -- (internal) check if flags are supported for
 *                          transform
 *
 * Returns non-zero when any bit other than PMEMPOOL_DRY_RUN is set in flags,
 * zero otherwise.
 */
static int
check_flags_transform(unsigned flags)
{
	unsigned remaining = flags & ~(unsigned)PMEMPOOL_DRY_RUN;

	if (remaining != 0)
		return 1;
	return 0;
}
size_t
replica_get_part_data_len(struct pool_set *set_in, unsigned repn,
unsigned partn)
{
size_t hdrsize = (set_in->options & OPTION_NO_HDRS) ? 0 : Mmap_align;
return MMAP_ALIGN_DOWN(
set_in->replica[repn]->part[partn].filesize) -
((partn == 0) ? POOL_HDR_SIZE : hdrsize);
}
/*
 * replica_get_part_range_data_len -- get data length in given range
 *
 * Sums replica_get_part_data_len() over parts pstart (inclusive) to pend
 * (exclusive) of replica repn. An empty range yields 0.
 */
size_t
replica_get_part_range_data_len(struct pool_set *set, unsigned repn,
		unsigned pstart, unsigned pend)
{
	LOG(3, "set %p, repn %u, pstart %u, pend %u", set, repn, pstart, pend);

	size_t total = 0;
	unsigned partn = pstart;

	while (partn < pend) {
		total += replica_get_part_data_len(set, repn, partn);
		++partn;
	}

	return total;
}
/*
 * replica_get_part_data_offset -- get data offset for given part
 *
 * The offset is the combined data length of all parts preceding partn in
 * replica repn, plus POOL_HDR_SIZE.
 */
uint64_t
replica_get_part_data_offset(struct pool_set *set, unsigned repn,
		unsigned partn)
{
	size_t preceding = replica_get_part_range_data_len(set, repn, 0, partn);

	return preceding + POOL_HDR_SIZE;
}
/*
 * replica_remove_part -- unlink part from replica
 *
 * Closes the part's file descriptor if it is open and unlinks the part file.
 * A missing file (ENOENT) is not treated as an error. On success errno is
 * restored to the value it had before the unlink attempt.
 *
 * Returns 0 on success, -1 if unlinking failed for any other reason.
 */
int
replica_remove_part(struct pool_set *set, unsigned repn, unsigned partn)
{
	LOG(3, "set %p, repn %u, partn %u", set, repn, partn);

	struct pool_set_part *part = &PART(REP(set, repn), partn);

	if (part->fd != -1) {
		os_close(part->fd);
		part->fd = -1;
	}

	/* remember errno so that a tolerated ENOENT does not leak out */
	int saved_errno = errno;
	int unlink_failed = util_unlink(part->path);

	if (unlink_failed && errno != ENOENT) {
		ERR("!removing part %u from replica %u failed",
				partn, repn);
		return -1;
	}

	errno = saved_errno;
	LOG(4, "Removed part %s number %u from replica %u", part->path, partn,
			repn);
	return 0;
}
static struct replica_health_status *
create_replica_health_status(struct pool_set *set, unsigned repn)
{
LOG(3, "set %p, repn %u", set, repn);
unsigned nparts = set->replica[repn]->nparts;
struct replica_health_status *replica_hs;
replica_hs = Zalloc(sizeof(struct replica_health_status)
+ nparts * sizeof(unsigned));
if (replica_hs == NULL) {
ERR("!Zalloc for replica health status");
return NULL;
}
replica_hs->nparts = nparts;
replica_hs->nhdrs = set->replica[repn]->nhdrs;
return replica_hs;
}
/*
 * replica_free_poolset_health_status -- free memory allocated for helping
 *                                       structure
 *
 * Frees every per-replica status entry and then the poolset status itself.
 */
void
replica_free_poolset_health_status(struct poolset_health_status *set_hs)
{
	LOG(3, "set_hs %p", set_hs);

	unsigned r = 0;

	while (r < set_hs->nreplicas) {
		Free(set_hs->replica[r]);
		++r;
	}

	Free(set_hs);
}
/*
 * replica_create_poolset_health_status -- create helping structure for storing
 *                                         poolset's health status
 *
 * On success stores the newly allocated structure in *set_hsp and returns 0.
 * On failure everything allocated so far is freed, *set_hsp is left untouched
 * and -1 is returned.
 */
int
replica_create_poolset_health_status(struct pool_set *set,
		struct poolset_health_status **set_hsp)
{
	LOG(3, "set %p, set_hsp %p", set, set_hsp);

	unsigned nreplicas = set->nreplicas;
	size_t hs_size = sizeof(struct poolset_health_status) +
			nreplicas * sizeof(struct replica_health_status *);

	struct poolset_health_status *set_hs = Zalloc(hs_size);
	if (!set_hs) {
		ERR("!Zalloc for poolset health state");
		return -1;
	}
	set_hs->nreplicas = nreplicas;

	unsigned r = 0;
	while (r < nreplicas) {
		set_hs->replica[r] = create_replica_health_status(set, r);
		if (set_hs->replica[r] == NULL) {
			/* entries not yet created are still zeroed */
			replica_free_poolset_health_status(set_hs);
			return -1;
		}
		++r;
	}

	*set_hsp = set_hs;
	return 0;
}
int
replica_is_part_broken(unsigned repn, unsigned partn,
struct poolset_health_status *set_hs)
{
return (REP(set_hs, repn)->flags & IS_BROKEN) ||
(PART(REP(set_hs, repn), partn) & IS_BROKEN);
}
/*
 * replica_is_replica_broken -- check if any part in the replica is marked as
 *                              broken
 *
 * Returns 1 when the replica itself or at least one of its parts is flagged
 * IS_BROKEN, 0 otherwise.
 */
int
replica_is_replica_broken(unsigned repn, struct poolset_health_status *set_hs)
{
	LOG(3, "repn %u, set_hs %p", repn, set_hs);

	struct replica_health_status *rep_hs = REP(set_hs, repn);
	int broken = (rep_hs->flags & IS_BROKEN) != 0;

	/* stop scanning parts as soon as a broken one is found */
	for (unsigned partn = 0; !broken && partn < rep_hs->nparts; ++partn)
		broken = replica_is_part_broken(repn, partn, set_hs) != 0;

	return broken;
}
int
replica_is_replica_consistent(unsigned repn,
struct poolset_health_status *set_hs)
{
return !(REP(set_hs, repn)->flags & IS_INCONSISTENT);
}
/*
 * replica_is_replica_healthy -- check if replica is unbroken and consistent
 *
 * Returns 1 when the replica is neither broken nor inconsistent, 0 otherwise.
 */
int
replica_is_replica_healthy(unsigned repn,
		struct poolset_health_status *set_hs)
{
	if (replica_is_replica_broken(repn, set_hs))
		return 0;

	return replica_is_replica_consistent(repn, set_hs) ? 1 : 0;
}
/*
 * replica_is_poolset_healthy -- check if all replicas in a poolset are not
 *                               marked as broken nor inconsistent in the
 *                               helping structure
 *
 * Returns 1 when every replica is healthy, 0 as soon as one is not.
 */
int
replica_is_poolset_healthy(struct poolset_health_status *set_hs)
{
	LOG(3, "set_hs %p", set_hs);

	unsigned r = 0;

	/* advance past the leading run of healthy replicas */
	while (r < set_hs->nreplicas && replica_is_replica_healthy(r, set_hs))
		++r;

	return r == set_hs->nreplicas;
}
/*
 * replica_is_poolset_transformed -- check if the flag indicating a call from
 *                                   pmempool_transform is on
 *
 * Returns the IS_TRANSFORMED bit of flags (non-zero when set).
 */
int
replica_is_poolset_transformed(unsigned flags)
{
	return (int)(flags & IS_TRANSFORMED);
}
/*
 * replica_find_unbroken_part -- find a part number in a given
 *                               replica, which is not marked as broken
 *
 * Returns the lowest such part number, or UNDEF_PART when every part of the
 * replica is broken.
 */
unsigned
replica_find_unbroken_part(unsigned repn, struct poolset_health_status *set_hs)
{
	LOG(3, "repn %u, set_hs %p", repn, set_hs);

	unsigned found = UNDEF_PART;

	for (unsigned partn = 0; partn < REP(set_hs, repn)->nparts; ++partn) {
		if (replica_is_part_broken(repn, partn, set_hs))
			continue;

		found = partn;
		break;
	}

	return found;
}
/*
 * replica_find_healthy_replica -- find a replica number which is a good source
 *                                 of data
 *
 * With a single replica it only has to be unbroken; with several replicas the
 * first one that is both unbroken and consistent is chosen.
 *
 * Returns the replica number, or UNDEF_REPLICA when none qualifies.
 */
unsigned
replica_find_healthy_replica(struct poolset_health_status *set_hs)
{
	LOG(3, "set_hs %p", set_hs);

	if (set_hs->nreplicas == 1) {
		if (replica_is_replica_broken(0, set_hs))
			return UNDEF_REPLICA;
		return 0;
	}

	for (unsigned repn = 0; repn < set_hs->nreplicas; ++repn) {
		if (replica_is_replica_healthy(repn, set_hs))
			return repn;
	}

	return UNDEF_REPLICA;
}
static int
replica_check_store_size(struct pool_set *set,
struct poolset_health_status *set_hs, unsigned repn)
{
LOG(3, "set %p, set_hs %p, repn %u", set, set_hs, repn);
struct pool_replica *rep = set->replica[repn];
struct pmemobjpool pop;
if (rep->remote) {
memcpy(&pop.hdr, rep->part[0].hdr, sizeof(pop.hdr));
void *descr = (void *)((uintptr_t)&pop + POOL_HDR_SIZE);
if (Rpmem_read(rep->remote->rpp, descr, 0,
sizeof(pop) - POOL_HDR_SIZE, 0)) {
return -1;
}
} else {
if (util_map_part(&rep->part[0], NULL,
MMAP_ALIGN_UP(sizeof(pop)), 0, MAP_SHARED, 1)) {
return -1;
}
memcpy(&pop, rep->part[0].addr, sizeof(pop));
util_unmap_part(&rep->part[0]);
}
void *dscp = (void *)((uintptr_t)&pop + sizeof(pop.hdr));
if (!util_checksum(dscp, OBJ_DSC_P_SIZE, &pop.checksum, 0)) {
set_hs->replica[repn]->flags |= IS_BROKEN;
return 0;
}
set_hs->replica[repn]->pool_size = pop.heap_offset + pop.heap_size;
return 0;
}
/*
 * check_store_all_sizes -- (internal) store sizes from pool descriptor for all
 *                          healthy replicas
 *
 * Replicas that are not healthy are skipped. Returns -1 as soon as reading the
 * size of a healthy replica fails, 0 otherwise.
 */
static int
check_store_all_sizes(struct pool_set *set,
		struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, set_hs %p", set, set_hs);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		if (replica_is_replica_healthy(repn, set_hs) &&
		    replica_check_store_size(set, set_hs, repn))
			return -1;
	}

	return 0;
}
/*
 * check_and_open_poolset_part_files -- (internal) for each part in a poolset
 * check if the part file is accessible and try to open it
 *
 * Failures concerning individual parts (or a remote pool open) are recorded
 * as IS_BROKEN in set_hs and do not fail the function. The only path that
 * returns -1 is a failure of util_replica_open_remote(); otherwise 0 is
 * returned.
 */
static int
check_and_open_poolset_part_files(struct pool_set *set,
struct poolset_health_status *set_hs, unsigned flags)
{
LOG(3, "set %p, set_hs %p, flags %u", set, set_hs, flags);
for (unsigned r = 0; r < set->nreplicas; ++r) {
struct pool_replica *rep = set->replica[r];
struct replica_health_status *rep_hs = set_hs->replica[r];
/* remote replica: it has no local part files to check */
if (rep->remote) {
if (util_replica_open_remote(set, r, 0)) {
LOG(1, "cannot open remote replica no %u", r);
return -1;
}
unsigned nlanes = REMOTE_NLANES;
int ret = util_poolset_remote_open(rep, r,
rep->repsize, 0,
rep->part[0].addr,
rep->part[0].size, &nlanes);
/* a failed remote pool open marks the whole replica as broken */
if (ret)
rep_hs->flags |= IS_BROKEN;
continue;
}
for (unsigned p = 0; p < rep->nparts; ++p) {
/* the part file must be both readable and writable */
if (os_access(rep->part[p].path, R_OK|W_OK) != 0) {
LOG(1, "part file %s is not accessible",
rep->part[p].path);
/* the failure is recorded in set_hs; errno is cleared */
errno = 0;
rep_hs->part[p] |= IS_BROKEN;
/*
 * in dry-run mode the inaccessible part is not opened; otherwise
 * the open below is still attempted
 */
if (is_dry_run(flags))
continue;
}
if (util_part_open(&rep->part[p], 0, 0)) {
LOG(1, "opening part %s failed",
rep->part[p].path);
errno = 0;
rep_hs->part[p] |= IS_BROKEN;
}
}
}
return 0;
}
/*
 * map_all_unbroken_headers -- (internal) map all headers in a poolset,
 *                             skipping those marked as broken in a helping
 *                             structure
 *
 * Remote replicas are skipped. A part whose header cannot be mapped is
 * flagged IS_BROKEN in set_hs. Always returns 0.
 */
static int
map_all_unbroken_headers(struct pool_set *set,
		struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, set_hs %p", set, set_hs);

	for (unsigned r = 0; r < set->nreplicas; ++r) {
		struct pool_replica *rep = set->replica[r];
		struct replica_health_status *rep_hs = set_hs->replica[r];

		if (rep->remote)
			continue;

		for (unsigned p = 0; p < rep->nhdrs; ++p) {
			/* skip broken parts */
			if (replica_is_part_broken(r, p, set_hs))
				continue;

			LOG(4, "mapping header for part %u, replica %u", p, r);
			if (util_map_hdr(&rep->part[p], MAP_SHARED, 0) != 0) {
				/* p is unsigned, so it is printed with %u */
				LOG(1, "header mapping failed - part #%u", p);
				rep_hs->part[p] |= IS_BROKEN;
			}
		}
	}

	return 0;
}
/*
 * unmap_all_headers -- (internal) unmap all headers in a poolset
 *
 * Closes every replica with util_replica_close() and, for remote replicas
 * that still hold an rpmem handle, closes the handle and clears it.
 * Always returns 0.
 */
static int
unmap_all_headers(struct pool_set *set)
{
	LOG(3, "set %p", set);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		struct pool_replica *rep = set->replica[repn];

		util_replica_close(set, repn);

		if (!rep->remote || !rep->remote->rpp)
			continue;

		Rpmem_close(rep->remote->rpp);
		rep->remote->rpp = NULL;
	}

	return 0;
}
/*
 * check_checksums -- (internal) check if checksums are correct for parts in
 *                    a given replica
 *
 * Only local replicas are examined; remote replicas are skipped entirely.
 * Every unbroken part whose header has an invalid checksum, or whose header
 * is all zeroes, is flagged IS_BROKEN in set_hs. Always returns 0.
 */
static int
check_checksums(struct pool_set *set, struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, set_hs %p", set, set_hs);

	for (unsigned r = 0; r < set->nreplicas; ++r) {
		struct pool_replica *rep = REP(set, r);
		struct replica_health_status *rep_hs = REP(set_hs, r);

		if (rep->remote)
			continue;

		for (unsigned p = 0; p < rep->nhdrs; ++p) {
			/* skip broken parts */
			if (replica_is_part_broken(r, p, set_hs))
				continue;

			/* check part's checksum */
			LOG(4, "checking checksum for part %u, replica %u",
					p, r);

			/*
			 * Remote replicas never reach this point, so the
			 * header is always the locally mapped one.
			 */
			struct pool_hdr *hdr = HDR(rep, p);

			if (!util_checksum(hdr, sizeof(*hdr),
					&hdr->checksum, 0)) {
				ERR("invalid checksum of pool header");
				rep_hs->part[p] |= IS_BROKEN;
			} else if (util_is_zeroed(hdr, sizeof(*hdr))) {
				rep_hs->part[p] |= IS_BROKEN;
			}
		}
	}

	return 0;
}
/*
 * check_uuids_between_parts -- (internal) check if uuids between adjacent
 *                              parts are consistent for a given replica
 *
 * Three passes are made over the headers of replica repn, each ignoring parts
 * marked as broken:
 *  1. all unbroken parts must carry the same poolset uuid,
 *  2. all unbroken parts must carry the same prev/next replica uuids,
 *  3. every pair of consecutive unbroken parts must be linked to each other
 *     through their prev_part_uuid/next_part_uuid fields.
 *
 * Returns 0 when all checks pass; otherwise sets errno to EINVAL and
 * returns -1.
 */
static int
check_uuids_between_parts(struct pool_set *set, unsigned repn,
struct poolset_health_status *set_hs)
{
LOG(3, "set %p, repn %u, set_hs %p", set, repn, set_hs);
struct pool_replica *rep = REP(set, repn);
/* check poolset_uuid consistency between replica's parts */
LOG(4, "checking consistency of poolset uuid in replica %u", repn);
uuid_t poolset_uuid;
int uuid_stored = 0;
unsigned part_stored = UNDEF_PART;
for (unsigned p = 0; p < rep->nhdrs; ++p) {
/* skip broken parts */
if (replica_is_part_broken(repn, p, set_hs))
continue;
/* the first unbroken part provides the reference poolset uuid */
if (!uuid_stored) {
memcpy(poolset_uuid, HDR(rep, p)->poolset_uuid,
POOL_HDR_UUID_LEN);
uuid_stored = 1;
part_stored = p;
continue;
}
if (uuidcmp(HDR(rep, p)->poolset_uuid, poolset_uuid)) {
ERR("different poolset uuids in parts from the same"
" replica (repn %u, parts %u and %u); cannot"
" synchronize", repn, part_stored, p);
errno = EINVAL;
return -1;
}
}
/* check if all uuids for adjacent replicas are the same across parts */
LOG(4, "checking consistency of adjacent replicas' uuids in replica %u",
repn);
unsigned unbroken_p = UNDEF_PART;
for (unsigned p = 0; p < rep->nhdrs; ++p) {
/* skip broken parts */
if (replica_is_part_broken(repn, p, set_hs))
continue;
/* the first unbroken part is the reference for the remaining ones */
if (unbroken_p == UNDEF_PART) {
unbroken_p = p;
continue;
}
struct pool_hdr *hdrp = HDR(rep, p);
int prev_differ = uuidcmp(HDR(rep, unbroken_p)->prev_repl_uuid,
hdrp->prev_repl_uuid);
int next_differ = uuidcmp(HDR(rep, unbroken_p)->next_repl_uuid,
hdrp->next_repl_uuid);
if (prev_differ || next_differ) {
ERR("different adjacent replica UUID between parts"
" (repn %u, parts %u and %u);"
" cannot synchronize", repn, unbroken_p, p);
errno = EINVAL;
return -1;
}
}
/* check parts linkage */
LOG(4, "checking parts linkage in replica %u", repn);
for (unsigned p = 0; p < rep->nhdrs; ++p) {
/* skip broken parts */
if (replica_is_part_broken(repn, p, set_hs))
continue;
struct pool_hdr *hdrp = HDR(rep, p);
/*
 * NOTE(review): for the last header p + 1 equals nhdrs; this relies on
 * HDRN() and the indexing inside replica_is_part_broken() handling that
 * value (presumably by wrapping around to the first part) -- confirm
 * against the macro definitions.
 */
struct pool_hdr *next_hdrp = HDRN(rep, p);
int next_is_broken = replica_is_part_broken(repn, p + 1,
set_hs);
if (!next_is_broken) {
/* both directions of the link must agree */
int next_decoupled =
uuidcmp(next_hdrp->prev_part_uuid,
hdrp->uuid) ||
uuidcmp(hdrp->next_part_uuid, next_hdrp->uuid);
if (next_decoupled) {
ERR("two consecutive unbroken parts are not"
" linked to each other (repn %u, parts"
" %u and %u); cannot synchronize",
repn, p, p + 1);
errno = EINVAL;
return -1;
}
}
}
return 0;
}
/*
 * check_replicas_consistency -- (internal) check if all uuids within each
 *                               replica are consistent
 *
 * Returns -1 as soon as the per-replica check fails for some replica,
 * 0 otherwise.
 */
static int
check_replicas_consistency(struct pool_set *set,
		struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, set_hs %p", set, set_hs);

	unsigned repn = 0;

	while (repn < set->nreplicas) {
		if (check_uuids_between_parts(set, repn, set_hs) != 0)
			return -1;
		++repn;
	}

	return 0;
}
/*
 * check_replica_options -- (internal) check if options are consistent in the
 *                          replica
 *
 * For every unbroken header, the POOL_FEAT_NOHDRS incompat feature must be
 * set exactly when the poolset has OPTION_NO_HDRS set. Parts for which this
 * does not hold are flagged IS_BROKEN. Always returns 0.
 */
static int
check_replica_options(struct pool_set *set, unsigned repn,
		struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, repn %u, set_hs %p", set, repn, set_hs);

	struct pool_replica *rep = REP(set, repn);
	struct replica_health_status *rep_hs = REP(set_hs, repn);
	int set_no_hdrs = (set->options & OPTION_NO_HDRS) != 0;

	for (unsigned partn = 0; partn < rep->nhdrs; ++partn) {
		/* skip broken parts */
		if (replica_is_part_broken(repn, partn, set_hs))
			continue;

		struct pool_hdr *hdr = HDR(rep, partn);
		int hdr_no_hdrs =
			(hdr->incompat_features & POOL_FEAT_NOHDRS) != 0;

		if (hdr_no_hdrs == set_no_hdrs)
			continue;

		LOG(1, "improper options are set in part %u's header in"
				" replica %u", partn, repn);
		rep_hs->part[partn] |= IS_BROKEN;
	}

	return 0;
}
/*
 * check_options -- (internal) check if options are consistent in all replicas
 *
 * Returns -1 as soon as the per-replica options check reports a failure,
 * 0 otherwise.
 */
static int
check_options(struct pool_set *set, struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, set_hs %p", set, set_hs);

	unsigned repn = 0;

	while (repn < set->nreplicas) {
		if (check_replica_options(set, repn, set_hs) != 0)
			return -1;
		++repn;
	}

	return 0;
}
/*
 * check_replica_poolset_uuids -- (internal) check if poolset_uuid fields are
 *                                consistent among all parts of a replica;
 *                                the replica is initially considered as
 *                                consistent
 *
 * Only the first unbroken header is compared against poolset_uuid.
 * Returns -1 when it differs, 0 when it matches or when every header of the
 * replica is broken.
 */
static int
check_replica_poolset_uuids(struct pool_set *set, unsigned repn,
		uuid_t poolset_uuid, struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, repn %u, poolset_uuid %p, set_hs %p", set, repn,
			poolset_uuid, set_hs);

	struct pool_replica *rep = REP(set, repn);

	for (unsigned partn = 0; partn < rep->nhdrs; ++partn) {
		/* skip broken parts */
		if (replica_is_part_broken(repn, partn, set_hs))
			continue;

		/* a single unbroken part decides the result */
		return uuidcmp(HDR(rep, partn)->poolset_uuid, poolset_uuid)
			? -1 : 0;
	}

	return 0;
}
/*
 * check_poolset_uuids -- (internal) check if poolset_uuid fields are consistent
 *                        among all internally consistent replicas
 *
 * The poolset uuid of a healthy replica (taken from its header 0) is the
 * reference; every other consistent replica must agree with it.
 *
 * Returns -1 when there is no healthy replica or when some replica disagrees,
 * 0 otherwise.
 */
static int
check_poolset_uuids(struct pool_set *set,
		struct poolset_health_status *set_hs)
{
	LOG(3, "set %p, set_hs %p", set, set_hs);

	unsigned r_h = replica_find_healthy_replica(set_hs);
	if (r_h == UNDEF_REPLICA) {
		ERR("no healthy replica. Cannot synchronize.");
		return -1;
	}

	uuid_t poolset_uuid;
	memcpy(poolset_uuid, HDR(REP(set, r_h), 0)->poolset_uuid,
			POOL_HDR_UUID_LEN);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		/* the reference replica trivially agrees with itself */
		if (repn == r_h)
			continue;

		/* skip inconsistent replicas */
		if (!replica_is_replica_consistent(repn, set_hs))
			continue;

		if (check_replica_poolset_uuids(set, repn, poolset_uuid,
				set_hs) == 0)
			continue;

		ERR("inconsistent poolset uuids between replicas %u and"
			" %u; cannot synchronize", r_h, repn);
		return -1;
	}

	return 0;
}
/*
 * get_replica_uuid -- (internal) get replica uuid
 *
 * Stores in *uuidpp a pointer to a uuid taken from the first usable source:
 * the uuid field of header 0, the prev_part_uuid field of header 1, or the
 * next_part_uuid field of the last header. The latter two are considered only
 * when the replica has more than one header.
 *
 * Returns 0 when a uuid was found, -1 when all of those parts are broken.
 */
static int
get_replica_uuid(struct pool_replica *rep, unsigned repn,
		struct poolset_health_status *set_hs, uuid_t **uuidpp)
{
	unsigned nhdrs = REP(set_hs, repn)->nhdrs;

	/* the first part is not broken */
	if (!replica_is_part_broken(repn, 0, set_hs)) {
		*uuidpp = &HDR(rep, 0)->uuid;
		return 0;
	}

	/* the remaining sources need at least two headers */
	if (nhdrs <= 1)
		return -1;

	/* the second part is not broken */
	if (!replica_is_part_broken(repn, 1, set_hs)) {
		*uuidpp = &HDR(rep, 1)->prev_part_uuid;
		return 0;
	}

	/* the last part is not broken */
	unsigned last = nhdrs - 1;
	if (!replica_is_part_broken(repn, last, set_hs)) {
		*uuidpp = &HDR(rep, last)->next_part_uuid;
		return 0;
	}

	return -1;
}
/*
 * check_uuids_between_replicas -- (internal) check if uuids between internally
 *                                 consistent adjacent replicas are consistent
 *
 * For each replica r and its successor, the uuid of r must equal the
 * prev_repl_uuid stored in the successor, and the uuid of the successor must
 * equal the next_repl_uuid stored in r. Pairs that cannot be compared
 * (an inconsistent replica, no obtainable uuid, no unbroken part) are
 * skipped.
 *
 * Returns -1 on the first mismatch, 0 otherwise.
 */
static int
check_uuids_between_replicas(struct pool_set *set,
struct poolset_health_status *set_hs)
{
LOG(3, "set %p, set_hs %p", set, set_hs);
for (unsigned r = 0; r < set->nreplicas; ++r) {
/*
 * skip comparing inconsistent pairs of replicas
 *
 * NOTE(review): for the last replica r + 1 equals nreplicas; this
 * relies on REP()/REPN()/REPNidx() handling that index (presumably by
 * wrapping around to replica 0) -- confirm against the macro
 * definitions.
 */
if (!replica_is_replica_consistent(r, set_hs) ||
!replica_is_replica_consistent(r + 1, set_hs))
continue;
struct pool_replica *rep = REP(set, r);
struct pool_replica *rep_n = REPN(set, r);
/* get uuids of the two adjacent replicas */
uuid_t *rep_uuidp;
uuid_t *rep_n_uuidp;
if (get_replica_uuid(rep, r, set_hs, &rep_uuidp)) {
LOG(2, "cannot get replica uuid, replica %u", r);
continue;
}
unsigned r_n = REPNidx(set_hs, r);
if (get_replica_uuid(rep_n, r_n, set_hs, &rep_n_uuidp)) {
LOG(2, "cannot get replica uuid, replica %u", r_n);
continue;
}
/* an unbroken part of each replica supplies the adjacent-replica uuids */
unsigned p = replica_find_unbroken_part(r, set_hs);
unsigned p_n = replica_find_unbroken_part(r_n, set_hs);
if (p == UNDEF_PART || p_n == UNDEF_PART) {
LOG(2, "cannot compare uuids between replicas %u and"
" %u", r, r_n);
continue;
}
/* the two replicas must point at each other */
if (uuidcmp(*rep_uuidp, HDR(rep_n, p_n)->prev_repl_uuid) ||
uuidcmp(*rep_n_uuidp,
HDR(rep, p)->next_repl_uuid)) {
ERR("inconsistent replica uuids between replicas %u and"
" %u", r, r_n);
return -1;
}
}
return 0;
}
/*
 * check_replica_cycles -- (internal) check if healthy replicas form cycles
 *                         shorter than the number of all replicas
 *
 * Returns -1 when such a cycle is detected, 0 otherwise.
 */
static int
check_replica_cycles(struct pool_set *set,
struct poolset_health_status *set_hs)
{
LOG(3, "set %p, set_hs %p", set, set_hs);
/*
 * first_healthy is the first replica of the current run of consecutive
 * healthy replicas and count_healthy is the length of that run so far.
 * first_healthy is always assigned before it is read, because
 * count_healthy is 0 whenever a new run starts.
 */
unsigned first_healthy;
unsigned count_healthy = 0;
for (unsigned r = 0; r < set->nreplicas; ++r) {
/* an unhealthy replica ends the current run */
if (!replica_is_replica_healthy(r, set_hs)) {
count_healthy = 0;
continue;
}
if (count_healthy == 0)
first_healthy = r;
++count_healthy;
struct pool_hdr *hdrh =
PART(REP(set, first_healthy), 0).hdr;
struct pool_hdr *hdr = PART(REP(set, r), 0).hdr;
/*
 * replica r points back at the first replica of the run although the
 * run is shorter than the whole poolset: the healthy replicas close a
 * cycle that does not include all replicas
 */
if (uuidcmp(hdrh->uuid, hdr->next_repl_uuid) == 0 &&
count_healthy < set->nreplicas) {
ERR("there exist healthy replicas which come"
" from a different poolset file");
return -1;
}
}
return 0;
}
static int
check_replica_sizes(struct pool_set *set, struct poolset_health_status *set_hs)
{
LOG(3, "set %p, set_hs %p", set, set_hs);
unsigned healthy_replica = replica_find_healthy_replica(set_hs);
if (set->poolsize < replica_get_pool_size(set, healthy_replica)) {
ERR("some replicas are too small to hold synchronized data");
return -1;
}
return 0;
}
/*
 * replica_check_poolset_health -- check if a given poolset can be considered
 *                                 as healthy, and store the status in a
 *                                 helping structure
 *
 * set     - poolset to examine
 * set_hsp - receives the newly created health status structure
 * flags   - passed to the part file check (e.g. to detect dry-run mode)
 *
 * On success returns 0 and *set_hsp points to the filled-in health status,
 * which the caller releases with replica_free_poolset_health_status().
 * On failure returns -1; if the health status had already been created it is
 * freed and *set_hsp is set to NULL, so that the caller is never left with a
 * dangling pointer.
 *
 * In both cases headers are unmapped and part files are closed before
 * returning.
 */
int
replica_check_poolset_health(struct pool_set *set,
		struct poolset_health_status **set_hsp, unsigned flags)
{
	LOG(3, "set %p, set_hsp %p, flags %u", set, set_hsp, flags);

	if (replica_create_poolset_health_status(set, set_hsp)) {
		LOG(1, "creating poolset health status failed");
		return -1;
	}

	struct poolset_health_status *set_hs = *set_hsp;

	/*
	 * Check if part files exist and are accessible. This step can fail
	 * (a remote replica that cannot be opened), so its result must not
	 * be ignored.
	 */
	if (check_and_open_poolset_part_files(set, set_hs, flags)) {
		LOG(1, "poolset part files check failed");
		goto err;
	}

	/* map all headers; failures are recorded in set_hs */
	map_all_unbroken_headers(set, set_hs);

	/* check checksums; failures are recorded in set_hs */
	check_checksums(set, set_hs);

	/* check if option flags are consistent */
	if (check_options(set, set_hs)) {
		LOG(1, "flags check failed");
		goto err;
	}

	/* check if uuids in parts across each replica are consistent */
	if (check_replicas_consistency(set, set_hs)) {
		LOG(1, "replica consistency check failed");
		goto err;
	}

	/* check poolset_uuid values between replicas */
	if (check_poolset_uuids(set, set_hs)) {
		LOG(1, "poolset uuids check failed");
		goto err;
	}

	/* check if uuids for adjacent replicas are consistent */
	if (check_uuids_between_replicas(set, set_hs)) {
		LOG(1, "replica uuids check failed");
		goto err;
	}

	/* check if healthy replicas make up another poolset */
	if (check_replica_cycles(set, set_hs)) {
		LOG(1, "replica cycles check failed");
		goto err;
	}

	/* check if replicas are large enough */
	if (check_replica_sizes(set, set_hs)) {
		LOG(1, "replica sizes check failed");
		goto err;
	}

	if (check_store_all_sizes(set, set_hs)) {
		LOG(1, "reading pool sizes failed");
		goto err;
	}

	unmap_all_headers(set);
	util_poolset_fdclose_always(set);
	return 0;

err:
	unmap_all_headers(set);
	util_poolset_fdclose_always(set);
	replica_free_poolset_health_status(set_hs);
	/* the structure is gone; do not hand a stale pointer to the caller */
	*set_hsp = NULL;
	return -1;
}
/*
 * replica_get_pool_size -- find the effective size (mapped) of a pool based
 *                          on metadata from given replica
 *
 * Reads heap_offset + heap_size from the pool structure at the beginning of
 * part 0 of replica repn. If the part is not open or not mapped, it is opened
 * and/or mapped temporarily; whatever this function opened or mapped is
 * released again before returning, and nothing else is touched.
 *
 * If the part cannot be opened or mapped, set->poolsize is returned instead.
 */
size_t
replica_get_pool_size(struct pool_set *set, unsigned repn)
{
	LOG(3, "set %p, repn %u", set, repn);

	struct pool_set_part *part = &PART(REP(set, repn), 0);
	int should_close_part = 0;
	int should_unmap_part = 0;

	if (part->fd == -1) {
		if (util_part_open(part, 0, 0))
			return set->poolsize;

		should_close_part = 1;
	}

	if (part->addr == NULL) {
		if (util_map_part(part, NULL,
				MMAP_ALIGN_UP(sizeof(PMEMobjpool)), 0,
				MAP_SHARED, 1)) {
			/*
			 * Close the descriptor only if it was opened here;
			 * one that was already open belongs to the caller.
			 */
			if (should_close_part)
				util_part_fdclose(part);
			return set->poolsize;
		}
		should_unmap_part = 1;
	}

	PMEMobjpool *pop = (PMEMobjpool *)part->addr;
	size_t ret = pop->heap_offset + pop->heap_size;

	if (should_unmap_part)
		util_unmap_part(part);
	if (should_close_part)
		util_part_fdclose(part);

	return ret;
}
/*
 * replica_check_part_sizes -- check if all parts are large enough
 *
 * Remote replicas are skipped. Returns -1 with errno set to EINVAL when some
 * local part file is smaller than min_size, 0 otherwise.
 */
int
replica_check_part_sizes(struct pool_set *set, size_t min_size)
{
	LOG(3, "set %p, min_size %zu", set, min_size);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		struct pool_replica *rep = set->replica[repn];

		/* only local replicas have part files to measure */
		if (rep->remote == NULL) {
			for (unsigned partn = 0; partn < rep->nparts; ++partn) {
				if (PART(rep, partn).filesize >= min_size)
					continue;

				ERR("replica %u, part %u: file is too small",
						repn, partn);
				errno = EINVAL;
				return -1;
			}
		}
	}

	return 0;
}
int
replica_check_local_part_dir(struct pool_set *set, unsigned repn,
unsigned partn)
{
LOG(3, "set %p, repn %u, partn %u", set, repn, partn);
char *path = Strdup(PART(REP(set, repn), partn).path);
const char *dir = dirname(path);
os_stat_t sb;
if (os_stat(dir, &sb) != 0 || !(sb.st_mode & S_IFDIR)) {
ERR("directory %s for part %u in replica %u"
" does not exist or is not accessible",
path, partn, repn);
Free(path);
return -1;
}
Free(path);
return 0;
}
/*
 * replica_check_part_dirs -- check if directories for part files exist
 *
 * Remote replicas are skipped. Returns -1 as soon as the directory check
 * fails for some local part, 0 otherwise.
 */
int
replica_check_part_dirs(struct pool_set *set)
{
	LOG(3, "set %p", set);

	for (unsigned repn = 0; repn < set->nreplicas; ++repn) {
		struct pool_replica *rep = set->replica[repn];

		/* only local replicas have part directories to check */
		if (rep->remote == NULL) {
			for (unsigned partn = 0; partn < rep->nparts; ++partn) {
				if (replica_check_local_part_dir(set, repn,
						partn) != 0)
					return -1;
			}
		}
	}

	return 0;
}
int
replica_open_replica_part_files(struct pool_set *set, unsigned repn)
{
LOG(3, "set %p, repn %u", set, repn);
struct pool_replica *rep = set->replica[repn];
for (unsigned p = 0; p < rep->nparts; ++p) {
if (rep->part[p].fd != -1)
continue;
if (util_part_open(&rep->part[p], 0, 0)) {
LOG(1, "part files open failed for replica %u, part %u",
repn, p);
errno = EINVAL;
goto err;
}
}
return 0;
err:
util_replica_fdclose(set->replica[repn]);
return -1;
}
/*
 * replica_open_poolset_part_files -- open all part files for a poolset
 *
 * Remote replicas are skipped. If opening the part files of any local replica
 * fails, all file descriptors of the poolset are closed with
 * util_poolset_fdclose_always() and -1 is returned. Returns 0 on success.
 */
int
replica_open_poolset_part_files(struct pool_set *set)
{
	LOG(3, "set %p", set);

	unsigned repn = 0;

	while (repn < set->nreplicas) {
		if (!set->replica[repn]->remote &&
		    replica_open_replica_part_files(set, repn)) {
			LOG(1, "opening replica %u, part files failed", repn);
			util_poolset_fdclose_always(set);
			return -1;
		}
		++repn;
	}

	return 0;
}
/*
 * pmempool_syncU -- synchronize replicas within a poolset
 *
 * poolset - path to the poolset file
 * flags   - only PMEMPOOL_DRY_RUN is accepted
 *
 * Returns 0 on success. On failure returns -1; errno is set to EINVAL unless
 * a more specific value has already been recorded. Every resource acquired
 * here (the poolset file descriptor and the parsed poolset structure) is
 * released on all paths.
 */
#ifndef _WIN32
static inline
#endif
int
pmempool_syncU(const char *poolset, unsigned flags)
{
	LOG(3, "poolset %s, flags %u", poolset, flags);
	ASSERTne(poolset, NULL);

	/* check if poolset has correct signature */
	if (util_is_poolset_file(poolset) != 1) {
		ERR("file is not a poolset file");
		goto err;
	}

	/* check if flags are supported */
	if (check_flags_sync(flags)) {
		ERR("unsupported flags");
		errno = EINVAL;
		goto err;
	}

	/* open poolset file */
	int fd = util_file_open(poolset, NULL, 0, O_RDONLY);
	if (fd < 0) {
		ERR("cannot open a poolset file");
		goto err;
	}

	/* fill up pool_set structure */
	struct pool_set *set = NULL;
	if (util_poolset_parse(&set, poolset, fd)) {
		ERR("parsing input poolset failed");
		goto err_close_file;
	}

	if (set->remote && util_remote_load()) {
		ERR("remote replication not available");
		/* the poolset is already parsed, so it has to be released */
		goto err_close_all;
	}

	/* sync all replicas */
	if (replica_sync(set, NULL, flags)) {
		LOG(1, "synchronization failed");
		goto err_close_all;
	}

	util_poolset_close(set, DO_NOT_DELETE_PARTS);
	os_close(fd);
	return 0;

err_close_all:
	util_poolset_close(set, DO_NOT_DELETE_PARTS);

err_close_file:
	os_close(fd);

err:
	if (errno == 0)
		errno = EINVAL;

	return -1;
}
#ifndef _WIN32
/*
 * pmempool_sync -- synchronize replicas within a poolset
 *
 * Non-Windows entry point; forwards to pmempool_syncU().
 */
int
pmempool_sync(const char *poolset, unsigned flags)
{
	int ret = pmempool_syncU(poolset, flags);

	return ret;
}
#else
/*
 * pmempool_syncW -- synchronize replicas within a poolset in widechar
 *
 * Converts the path with util_toUTF8() and forwards to pmempool_syncU().
 * Returns -1 if the conversion fails, otherwise the result of
 * pmempool_syncU().
 */
int
pmempool_syncW(const wchar_t *poolset, unsigned flags)
{
	char *path = util_toUTF8(poolset);

	if (path != NULL) {
		int ret = pmempool_syncU(path, flags);

		util_free_UTF8(path);
		return ret;
	}

	ERR("Invalid poolest file path.");
	return -1;
}
#endif
/*
 * pmempool_transformU -- alter poolset structure
 *
 * poolset_src - path to the poolset file describing the current layout
 * poolset_dst - path to the poolset file describing the desired layout
 * flags       - only PMEMPOOL_DRY_RUN is accepted
 *
 * Returns 0 on success. On failure returns -1; errno is set to EINVAL unless
 * a more specific value has already been recorded. Both parsed poolset
 * structures are released on all paths once they have been created.
 */
#ifndef _WIN32
static inline
#endif
int
pmempool_transformU(const char *poolset_src,
		const char *poolset_dst, unsigned flags)
{
	LOG(3, "poolset_src %s, poolset_dst %s, flags %u", poolset_src,
			poolset_dst, flags);
	ASSERTne(poolset_src, NULL);
	ASSERTne(poolset_dst, NULL);

	/* check if the source poolset has correct signature */
	if (util_is_poolset_file(poolset_src) != 1) {
		ERR("source file is not a poolset file");
		goto err;
	}

	/* check if the destination poolset has correct signature */
	if (util_is_poolset_file(poolset_dst) != 1) {
		ERR("destination file is not a poolset file");
		goto err;
	}

	/* check if flags are supported */
	if (check_flags_transform(flags)) {
		ERR("unsupported flags");
		errno = EINVAL;
		goto err;
	}

	/* parse the source poolset file */
	int fd_in = util_file_open(poolset_src, NULL, 0, O_RDONLY);
	if (fd_in < 0) {
		ERR("cannot open source poolset file");
		goto err;
	}

	struct pool_set *set_in = NULL;
	if (util_poolset_parse(&set_in, poolset_src, fd_in)) {
		ERR("parsing source poolset failed");
		os_close(fd_in);
		goto err;
	}
	os_close(fd_in);

	/* parse the destination poolset file */
	int fd_out = util_file_open(poolset_dst, NULL, 0, O_RDONLY);
	if (fd_out < 0) {
		ERR("cannot open destination poolset file");
		/* set_in is already parsed, so it has to be released */
		goto err_free_poolin;
	}

	enum del_parts_mode del = DO_NOT_DELETE_PARTS;

	struct pool_set *set_out = NULL;
	if (util_poolset_parse(&set_out, poolset_dst, fd_out)) {
		ERR("parsing destination poolset failed");
		os_close(fd_out);
		goto err_free_poolin;
	}
	os_close(fd_out);

	/* check if the source poolset is of a correct type */
	if (pool_set_type(set_in) != POOL_TYPE_OBJ) {
		ERR("source poolset is of a wrong type");
		goto err_free_poolout;
	}

	/* load remote library if needed */
	if (set_in->remote && util_remote_load()) {
		ERR("remote replication not available");
		goto err_free_poolout;
	}
	if (set_out->remote && util_remote_load()) {
		ERR("remote replication not available");
		goto err_free_poolout;
	}

	/*
	 * From here on a failure closes the destination poolset with
	 * DELETE_CREATED_PARTS, unless this is a dry run.
	 */
	del = is_dry_run(flags) ? DO_NOT_DELETE_PARTS : DELETE_CREATED_PARTS;

	/* transform poolset */
	if (replica_transform(set_in, set_out, flags)) {
		ERR("transformation failed");
		goto err_free_poolout;
	}

	util_poolset_close(set_in, DO_NOT_DELETE_PARTS);
	util_poolset_close(set_out, DO_NOT_DELETE_PARTS);
	return 0;

err_free_poolout:
	util_poolset_close(set_out, del);

err_free_poolin:
	util_poolset_close(set_in, DO_NOT_DELETE_PARTS);

err:
	if (errno == 0)
		errno = EINVAL;

	return -1;
}
#ifndef _WIN32
/*
 * pmempool_transform -- alter poolset structure
 *
 * Non-Windows entry point; forwards to pmempool_transformU().
 */
int
pmempool_transform(const char *poolset_src,
		const char *poolset_dst, unsigned flags)
{
	int ret = pmempool_transformU(poolset_src, poolset_dst, flags);

	return ret;
}
#else
/*
 * pmempool_transformW -- alter poolset structure in widechar
 *
 * Converts both paths with util_toUTF8() and forwards to
 * pmempool_transformU(). Returns -1 if either conversion fails, otherwise the
 * result of pmempool_transformU(). Both converted paths are always released
 * with util_free_UTF8().
 */
int
pmempool_transformW(const wchar_t *poolset_src,
		const wchar_t *poolset_dst, unsigned flags)
{
	char *path_src = util_toUTF8(poolset_src);
	if (path_src == NULL) {
		ERR("Invalid source poolest file path.");
		return -1;
	}

	char *path_dst = util_toUTF8(poolset_dst);
	if (path_dst == NULL) {
		ERR("Invalid destination poolest file path.");
		/* release with the deallocator paired with util_toUTF8() */
		util_free_UTF8(path_src);
		return -1;
	}

	int ret = pmempool_transformU(path_src, path_dst, flags);

	util_free_UTF8(path_src);
	util_free_UTF8(path_dst);
	return ret;
}
#endif