#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/txn.h"
#define INITIAL_SITES_ALLOCATION 3
static int get_eid __P((ENV *, const char *, u_int, int *));
static int __repmgr_addrcmp __P((repmgr_netaddr_t *, repmgr_netaddr_t *));
static int read_gmdb __P((ENV *, DB_THREAD_INFO *, u_int8_t **, size_t *));
int
__repmgr_schedule_connection_attempt(env, eid, immediate)
ENV *env;
int eid;
int immediate;
{
DB_REP *db_rep;
REP *rep;
REPMGR_RETRY *retry, *target;
REPMGR_SITE *site;
db_timespec t;
int ret;
db_rep = env->rep_handle;
rep = db_rep->region;
if ((ret = __os_malloc(env, sizeof(*retry), &retry)) != 0)
return (ret);
DB_ASSERT(env, IS_VALID_EID(eid));
site = SITE_FROM_EID(eid);
__os_gettime(env, &t, 1);
if (immediate)
TAILQ_INSERT_HEAD(&db_rep->retries, retry, entries);
else {
TIMESPEC_ADD_DB_TIMEOUT(&t, rep->connection_retry_wait);
TAILQ_FOREACH(target, &db_rep->retries, entries) {
if (timespeccmp(&target->time, &t, >))
break;
}
if (target == NULL)
TAILQ_INSERT_TAIL(&db_rep->retries, retry, entries);
else
TAILQ_INSERT_BEFORE(target, retry, entries);
}
retry->eid = eid;
retry->time = t;
site->state = SITE_PAUSING;
site->ref.retry = retry;
return (__repmgr_wake_main_thread(env));
}
/*
 * __repmgr_is_server --
 *	Compare the network address of the remote "site" with the local
 *	site's address.  Returns non-zero when the remote address sorts
 *	first; per the function's name, that site then plays the server role
 *	for the connection between the two.
 *
 *	Only the sign of __repmgr_addrcmp's result is meaningful.  When host
 *	names differ it may be strcmp(3)'s raw result, which is any negative
 *	or positive value, not necessarily -1.
 */
int
__repmgr_is_server(env, site)
	ENV *env;
	REPMGR_SITE *site;
{
	DB_REP *db_rep;
	int cmp;

	db_rep = env->rep_handle;
	cmp = __repmgr_addrcmp(&site->net_addr,
	    &SITE_FROM_EID(db_rep->self_eid)->net_addr);
	/* A site is never compared against itself. */
	DB_ASSERT(env, cmp != 0);
	return (cmp < 0);
}
/*
 * __repmgr_addrcmp --
 *	Total order on network addresses: by host name (strcmp order), then
 *	by port number.  Returns exactly -1, 0 or 1.  strcmp's arbitrary-
 *	magnitude result is normalized so callers may compare against -1/1.
 */
static int
__repmgr_addrcmp(addr1, addr2)
	repmgr_netaddr_t *addr1, *addr2;
{
	int cmp;

	if ((cmp = strcmp(addr1->host, addr2->host)) != 0)
		return (cmp < 0 ? -1 : 1);
	if (addr1->port < addr2->port)
		return (-1);
	if (addr1->port > addr2->port)
		return (1);
	return (0);
}
void
__repmgr_reset_for_reading(con)
REPMGR_CONNECTION *con;
{
con->reading_phase = SIZES_PHASE;
__repmgr_iovec_init(&con->iovecs);
__repmgr_add_buffer(&con->iovecs,
con->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE);
}
/*
 * __repmgr_new_connection --
 *	Allocate a zeroed connection object for socket "s" in the given
 *	state.  It gets a "drained" condition variable, its response-waiter
 *	set, an empty outbound queue and an I/O vector ready to read a
 *	message header.  On success the new connection is stored in *connp.
 *	On failure everything allocated so far is released.
 */
int
__repmgr_new_connection(env, connp, s, state)
	ENV *env;
	REPMGR_CONNECTION **connp;
	socket_t s;
	int state;
{
	REPMGR_CONNECTION *conn;
	int ret;

	if ((ret = __os_calloc(env, 1, sizeof(REPMGR_CONNECTION), &conn)) != 0)
		return (ret);
	if ((ret = __repmgr_alloc_cond(&conn->drained)) != 0)
		goto free_conn;
	if ((ret = __repmgr_init_waiters(env, &conn->response_waiters)) != 0)
		goto free_cond;

	conn->fd = s;
	conn->state = state;
	conn->type = UNKNOWN_CONN_TYPE;
#ifdef DB_WIN32
	conn->event_object = WSA_INVALID_EVENT;
#endif
	STAILQ_INIT(&conn->outbound_queue);
	conn->out_queue_length = 0;
	__repmgr_reset_for_reading(conn);

	*connp = conn;
	return (0);

free_cond:
	(void)__repmgr_free_cond(&conn->drained);
free_conn:
	__os_free(env, conn);
	return (ret);
}
/*
 * __repmgr_set_keepalive --
 *	Enable SO_KEEPALIVE on the connection's socket, when the platform
 *	defines that option.  If setsockopt fails, the error is reported, the
 *	connection is destroyed, and the socket error is returned.
 *	Returns 0 on success or when SO_KEEPALIVE is unavailable.
 */
int
__repmgr_set_keepalive(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	int ret, sockopt;

	ret = 0;
#ifdef SO_KEEPALIVE
	sockopt = 1;
	if (setsockopt(conn->fd, SOL_SOCKET, SO_KEEPALIVE,
	    (sockopt_t)&sockopt, sizeof(sockopt)) == 0)
		return (0);
	/* Capture the socket error before further calls can overwrite it. */
	ret = net_errno;
	__db_err(env, ret, DB_STR("3626",
	    "can't set KEEPALIVE socket option"));
	(void)__repmgr_destroy_conn(env, conn);
#endif
	return (ret);
}
/*
 * __repmgr_new_site --
 *	Append a new entry for host:port to the private site table and return
 *	it in *sitep.  The table grows to INITIAL_SITES_ALLOCATION entries
 *	first and then doubles.  The host name is copied.  The new entry
 *	starts SITE_IDLE with no connections, no membership status and no
 *	config.
 */
int
__repmgr_new_site(env, sitep, host, port)
	ENV *env;
	REPMGR_SITE **sitep;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site, *grown, *old;
	char *hostcopy;
	u_int n, new_max;
	int ret;

	db_rep = env->rep_handle;

	if (db_rep->site_cnt >= db_rep->site_max) {
		if (db_rep->site_max == 0)
			new_max = INITIAL_SITES_ALLOCATION;
		else
			new_max = db_rep->site_max * 2;
		if ((ret = __os_malloc(env,
		    sizeof(REPMGR_SITE) * new_max, &grown)) != 0)
			return (ret);
		if (db_rep->site_max > 0) {
			for (n = 0; n < db_rep->site_cnt; n++) {
				old = &db_rep->sites[n];
				grown[n] = *old;
				/*
				 * A struct copy of a TAILQ head still refers
				 * to the old array, so rebuild the list by
				 * moving each subordinate connection across.
				 */
				TAILQ_INIT(&grown[n].sub_conns);
				while ((conn =
				    TAILQ_FIRST(&old->sub_conns)) != NULL) {
					TAILQ_REMOVE(&old->sub_conns,
					    conn, entries);
					TAILQ_INSERT_TAIL(&grown[n].sub_conns,
					    conn, entries);
				}
			}
			__os_free(env, db_rep->sites);
		}
		db_rep->sites = grown;
		db_rep->site_max = new_max;
	}

	if ((ret = __os_strdup(env, host, &hostcopy)) != 0)
		return (ret);

	site = &db_rep->sites[db_rep->site_cnt];
	db_rep->site_cnt++;
	site->net_addr.host = hostcopy;
	site->net_addr.port = (u_int16_t)port;
	ZERO_LSN(site->max_ack);
	site->ack_policy = 0;
	site->alignment = 0;
	site->flags = 0;
	timespecclear(&site->last_rcvd_timestamp);
	TAILQ_INIT(&site->sub_conns);
	site->connector = NULL;
	site->ref.conn.out = NULL;
	site->ref.conn.in = NULL;
	site->state = SITE_IDLE;
	site->membership = 0;
	site->config = 0;

	*sitep = site;
	return (0);
}
/*
 * __repmgr_create_mutex --
 *	Allocate and initialize a platform mutex, storing it in *mtxp.
 *	On initialization failure the storage is freed and *mtxp is untouched.
 */
int
__repmgr_create_mutex(env, mtxp)
	ENV *env;
	mgr_mutex_t **mtxp;
{
	mgr_mutex_t *m;
	int ret;

	if ((ret = __os_malloc(env, sizeof(mgr_mutex_t), &m)) != 0)
		return (ret);
	if ((ret = __repmgr_create_mutex_pf(m)) != 0) {
		__os_free(env, m);
		return (ret);
	}
	*mtxp = m;
	return (0);
}
/*
 * __repmgr_destroy_mutex --
 *	Destroy a mutex made by __repmgr_create_mutex and free its storage.
 *	The storage is freed even if destruction reports an error.  That
 *	error is returned.
 */
int
__repmgr_destroy_mutex(env, mtx)
	ENV *env;
	mgr_mutex_t *mtx;
{
	int t_ret;

	t_ret = __repmgr_destroy_mutex_pf(mtx);
	__os_free(env, mtx);
	return (t_ret);
}
/*
 * __repmgr_cleanup_netaddr --
 *	Free the host-name string of a network address, if any, and clear the
 *	pointer so a second call is harmless.
 */
void
__repmgr_cleanup_netaddr(env, addr)
	ENV *env;
	repmgr_netaddr_t *addr;
{
	if (addr->host == NULL)
		return;
	__os_free(env, addr->host);
	addr->host = NULL;
}
/*
 * __repmgr_iovec_init --
 *	Reset an I/O vector set to empty: no vectors, nothing consumed, zero
 *	total bytes.
 */
void
__repmgr_iovec_init(v)
	REPMGR_IOVECS *v;
{
	v->total_bytes = 0;
	v->count = 0;
	v->offset = 0;
}
/*
 * __repmgr_add_buffer --
 *	Append the buffer [address, address+length) as the next vector.
 *	Zero-length buffers are ignored.  No capacity check is made on
 *	v->vectors.
 */
void
__repmgr_add_buffer(v, address, length)
	REPMGR_IOVECS *v;
	void *address;
	size_t length;
{
	if (length == 0)
		return;
	v->vectors[v->count].iov_base = address;
	v->vectors[v->count].iov_len = (u_long)length;
	v->count++;
	v->total_bytes += length;
}
void
__repmgr_add_dbt(v, dbt)
REPMGR_IOVECS *v;
const DBT *dbt;
{
if (dbt->size > 0) {
v->vectors[v->count].iov_base = dbt->data;
v->vectors[v->count++].iov_len = dbt->size;
v->total_bytes += dbt->size;
}
}
/*
 * __repmgr_update_consumed --
 *	Record that "byte_count" more bytes have been transferred.  Whole
 *	vectors are skipped, starting at v->offset.  The vector where the
 *	count runs out is trimmed in place, and v->offset points at the first
 *	vector with data left.  Returns non-zero once every vector has been
 *	fully consumed.
 *
 *	The caller must not report more bytes than remain (asserted).
 */
int
__repmgr_update_consumed(v, byte_count)
	REPMGR_IOVECS *v;
	size_t byte_count;
{
	db_iovec_t *iov;
	int i;

	i = v->offset;
	for (;;) {
		DB_ASSERT(NULL, i < v->count && byte_count > 0);
		iov = &v->vectors[i];
		if (byte_count <= iov->iov_len)
			break;
		byte_count -= iov->iov_len;
		i++;
	}

	/* Vector i is consumed partially or exactly up to its end. */
	iov->iov_len -= (u_int32_t)byte_count;
	if (iov->iov_len == 0)
		v->offset = i + 1;
	else {
		iov->iov_base =
		    (void *)((u_int8_t *)iov->iov_base + byte_count);
		v->offset = i;
	}
	return (v->offset >= v->count);
}
/*
 * __repmgr_prepare_my_addr --
 *	Build the local site's address in wire form into a newly allocated
 *	DBT: a 2-byte port in network byte order followed by the
 *	NUL-terminated host name.  The caller frees dbt->data.
 */
int
__repmgr_prepare_my_addr(env, dbt)
	ENV *env;
	DBT *dbt;
{
	DB_REP *db_rep;
	repmgr_netaddr_t addr;
	size_t hostlen, total;
	u_int16_t nport;
	u_int8_t *buf;
	int ret;

	db_rep = env->rep_handle;
	/* Snapshot the local address under the mutex. */
	LOCK_MUTEX(db_rep->mutex);
	addr = SITE_FROM_EID(db_rep->self_eid)->net_addr;
	UNLOCK_MUTEX(db_rep->mutex);

	nport = htons(addr.port);
	hostlen = strlen(addr.host) + 1;
	total = sizeof(nport) + hostlen;
	if ((ret = __os_malloc(env, total, &buf)) != 0)
		return (ret);
	DB_INIT_DBT(*dbt, buf, total);
	memcpy(buf, &nport, sizeof(nport));
	memcpy(buf + sizeof(nport), addr.host, hostlen);
	return (0);
}
/*
 * __repmgr_get_nsites --
 *	Return the configured number of sites in *nsitesp.  The value comes
 *	from the region's config_nsites.  If it is still zero, the error is
 *	reported and EINVAL is returned.
 */
int
__repmgr_get_nsites(env, nsitesp)
	ENV *env;
	u_int32_t *nsitesp;
{
	u_int32_t nsites;

	nsites = env->rep_handle->region->config_nsites;
	if (nsites != 0) {
		*nsitesp = nsites;
		return (0);
	}
	__db_errx(env, DB_STR("3672",
	    "Nsites unknown before repmgr_start()"));
	return (EINVAL);
}
/*
 * __repmgr_thread_failure --
 *	Handle a fatal failure in a repmgr thread.  The other repmgr threads
 *	are stopped under the repmgr mutex, then the environment is panicked
 *	with "why".  Returns __env_panic's result.
 */
int
__repmgr_thread_failure(env, why)
	ENV *env;
	int why;
{
	LOCK_MUTEX(env->rep_handle->mutex);
	(void)__repmgr_stop_threads(env);
	UNLOCK_MUTEX(env->rep_handle->mutex);
	return (__env_panic(env, why));
}
/*
 * __repmgr_format_eid_loc --
 *	Describe the peer of "conn" into "buffer" (at most
 *	MAX_SITE_LOC_STRING chars) for diagnostics, and return "buffer".
 *	A replication connection with a valid EID is described by its site's
 *	address.  Application channels and anything else get fixed labels.
 */
char *
__repmgr_format_eid_loc(db_rep, conn, buffer)
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	char *buffer;
{
	int eid;

	if (conn->type == REP_CONNECTION && IS_VALID_EID(eid = conn->eid)) {
		(void)__repmgr_format_site_loc(SITE_FROM_EID(eid), buffer);
		return (buffer);
	}
	if (conn->type == APP_CONNECTION)
		snprintf(buffer,
		    MAX_SITE_LOC_STRING, "(application channel)");
	else
		snprintf(buffer, MAX_SITE_LOC_STRING, "(unidentified site)");
	return (buffer);
}
char *
__repmgr_format_site_loc(site, buffer)
REPMGR_SITE *site;
char *buffer;
{
return (__repmgr_format_addr_loc(&site->net_addr, buffer));
}
char *
__repmgr_format_addr_loc(addr, buffer)
repmgr_netaddr_t *addr;
char *buffer;
{
snprintf(buffer, MAX_SITE_LOC_STRING, "site %s:%lu",
addr->host, (u_long)addr->port);
return (buffer);
}
/*
 * __repmgr_repstart --
 *	Call __rep_start_int with the local site's wire-format address and
 *	"flags" (e.g. DB_REP_MASTER / DB_REP_CLIENT).  The temporary address
 *	buffer is always freed.  A start failure is reported and returned.
 */
int
__repmgr_repstart(env, flags)
	ENV *env;
	u_int32_t flags;
{
	DBT my_addr;
	int ret;

	if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) == 0) {
		ret = __rep_start_int(env, &my_addr, flags);
		__os_free(env, my_addr.data);
		if (ret != 0)
			__db_err(env, ret, DB_STR("3673", "rep_start"));
	}
	return (ret);
}
/*
 * __repmgr_become_master --
 *	Make this site the master.
 *
 *	First, under the repmgr mutex, reset the limbo-tracking state and
 *	note the first remote site whose membership is SITE_ADDING or
 *	SITE_DELETING as the limbo victim.  Then start replication as
 *	DB_REP_MASTER.
 *
 *	If no group membership database (GMDB) is held yet, create it
 *	(DB_CREATE).  Set its version to 1 and store one record per known
 *	site with a non-zero membership status.  The whole transaction is
 *	retried on DB_LOCK_DEADLOCK or DB_LOCK_NOTGRANTED.
 *
 *	Returns 0 or an error code.
 */
int
__repmgr_become_master(env)
	ENV *env;
{
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	DB *dbp;
	DB_TXN *txn;
	REPMGR_SITE *site;
	DBT key_dbt, data_dbt;
	__repmgr_membership_key_args key;
	__repmgr_membership_data_args member_status;
	repmgr_netaddr_t addr;
	u_int32_t status;
	u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
	u_int8_t key_buf[MAX_MSG_BUF];
	size_t len;
	u_int i;
	int ret, t_ret;

	db_rep = env->rep_handle;
	dbp = NULL;
	txn = NULL;

	LOCK_MUTEX(db_rep->mutex);
	ZERO_LSN(db_rep->limbo_failure);
	ZERO_LSN(db_rep->durable_lsn);
	db_rep->limbo_victim = DB_EID_INVALID;
	db_rep->limbo_resolution_needed = FALSE;
	FOR_EACH_REMOTE_SITE_INDEX(i) {
		site = SITE_FROM_EID(i);
		if (site->membership == SITE_ADDING ||
		    site->membership == SITE_DELETING) {
			db_rep->limbo_victim = (int)i;
			db_rep->limbo_resolution_needed = TRUE;
			break;
		}
	}
	db_rep->client_intent = FALSE;
	UNLOCK_MUTEX(db_rep->mutex);

	if ((ret = __repmgr_repstart(env, DB_REP_MASTER)) != 0)
		return (ret);

	/* A GMDB is already held: nothing more to create. */
	if (db_rep->have_gmdb)
		return (0);

	db_rep->member_version_gen = db_rep->region->gen;
	ENV_ENTER(env, ip);
	if ((ret = __repmgr_hold_master_role(env, NULL)) != 0)
		goto leave;
retry:
	if ((ret = __repmgr_setup_gmdb_op(env, ip, &txn, DB_CREATE)) != 0)
		goto err;
	DB_ASSERT(env, txn != NULL);
	dbp = db_rep->gmdb;
	DB_ASSERT(env, dbp != NULL);

	if ((ret = __repmgr_set_gm_version(env, ip, txn, 1)) != 0)
		goto err;

	for (i = 0; i < db_rep->site_cnt; i++) {
		/* Snapshot address and status under the mutex. */
		LOCK_MUTEX(db_rep->mutex);
		site = SITE_FROM_EID(i);
		addr = site->net_addr;
		status = site->membership;
		UNLOCK_MUTEX(db_rep->mutex);
		if (status == 0)
			continue;

		DB_INIT_DBT(key.host, addr.host, strlen(addr.host) + 1);
		key.port = addr.port;
		ret = __repmgr_membership_key_marshal(env,
		    &key, key_buf, sizeof(key_buf), &len);
		DB_ASSERT(env, ret == 0);
		DB_INIT_DBT(key_dbt, key_buf, len);

		member_status.flags = status;
		__repmgr_membership_data_marshal(env, &member_status, data_buf);
		DB_INIT_DBT(data_dbt, data_buf, __REPMGR_MEMBERSHIP_DATA_SIZE);

		if ((ret = __db_put(dbp, ip, txn, &key_dbt, &data_dbt, 0)) != 0)
			goto err;
	}

err:
	if (txn != NULL) {
		if ((t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 &&
		    ret == 0)
			ret = t_ret;
		/*
		 * The transaction is finished.  Forget it, so that a retry
		 * whose __repmgr_setup_gmdb_op fails without supplying a new
		 * transaction cannot resolve this stale handle a second time.
		 */
		txn = NULL;
		if ((t_ret = __repmgr_cleanup_gmdb_op(env, TRUE)) != 0 &&
		    ret == 0)
			ret = t_ret;
	}
	if (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED)
		goto retry;
	if ((t_ret = __repmgr_rlse_master_role(env)) != 0 && ret == 0)
		ret = t_ret;
leave:
	ENV_LEAVE(env, ip);
	return (ret);
}
/*
 * __repmgr_each_connection --
 *	Apply "callback" to every connection this site knows about, in this
 *	order:
 *	- every connection on db_rep->connections;
 *	- for each remote site that is SITE_CONNECTED, its incoming and
 *	  outgoing main connections;
 *	- for each remote site, every connection on its sub_conns list.
 *	List successors are fetched before each callback, presumably so a
 *	callback may remove the connection it is handed.
 *
 *	If "err_quit" is non-zero, the first callback error is returned at
 *	once.  Otherwise all connections are still visited and the first
 *	error seen is returned at the end (0 if there were none).
 */
int
__repmgr_each_connection(env, callback, info, err_quit)
	ENV *env;
	CONNECTION_ACTION callback;
	void *info;
	int err_quit;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn, *next;
	REPMGR_SITE *site;
	int eid, ret, t_ret;

#define	HANDLE_ERROR						\
	do {							\
		if (err_quit)					\
			return (t_ret);				\
		if (ret == 0)					\
			ret = t_ret;				\
	} while (0)

	db_rep = env->rep_handle;
	ret = 0;

	for (conn = TAILQ_FIRST(&db_rep->connections);
	    conn != NULL;
	    conn = next) {
		next = TAILQ_NEXT(conn, entries);
		if ((t_ret = (*callback)(env, conn, info)) != 0)
			HANDLE_ERROR;
	}

	FOR_EACH_REMOTE_SITE_INDEX(eid) {
		site = SITE_FROM_EID(eid);

		if (site->state == SITE_CONNECTED) {
			if ((conn = site->ref.conn.in) != NULL &&
			    (t_ret = (*callback)(env, conn, info)) != 0)
				HANDLE_ERROR;
			if ((conn = site->ref.conn.out) != NULL &&
			    (t_ret = (*callback)(env, conn, info)) != 0)
				HANDLE_ERROR;
		}

		for (conn = TAILQ_FIRST(&site->sub_conns);
		    conn != NULL;
		    conn = next) {
			next = TAILQ_NEXT(conn, entries);
			if ((t_ret = (*callback)(env, conn, info)) != 0)
				HANDLE_ERROR;
		}
	}
#undef	HANDLE_ERROR

	/* Report the first deferred error rather than discarding it. */
	return (ret);
}
/*
 * __repmgr_open --
 *	Initialize repmgr state in a newly created replication region
 *	"rep_".  Allocate the repmgr mutex, publish all locally configured
 *	sites in a fresh shared site list, and copy this handle's
 *	configuration values into the region.
 */
int
__repmgr_open(env, rep_)
	ENV *env;
	void *rep_;
{
	DB_REP *db_rep;
	REP *rep;
	int ret;

	db_rep = env->rep_handle;
	rep = rep_;

	if ((ret = __mutex_alloc(env, MTX_REPMGR, 0, &rep->mtx_repmgr)) != 0)
		return (ret);

	DB_ASSERT(env, rep->siteinfo_seq == 0 && db_rep->siteinfo_seq == 0);
	rep->siteinfo_off = INVALID_ROFF;
	rep->siteinfo_seq = 0;
	if ((ret = __repmgr_share_netaddrs(env, rep, 0, db_rep->site_cnt)) != 0)
		return (ret);

	rep->self_eid = db_rep->self_eid;
	rep->perm_policy = db_rep->perm_policy;
	rep->ack_timeout = db_rep->ack_timeout;
	rep->connection_retry_wait = db_rep->connection_retry_wait;
	rep->election_retry_wait = db_rep->election_retry_wait;
	rep->heartbeat_monitor_timeout = db_rep->heartbeat_monitor_timeout;
	rep->heartbeat_frequency = db_rep->heartbeat_frequency;
	return (0);
}
/*
 * __repmgr_join --
 *	Reconcile this handle's private site table with the site list already
 *	in the shared region "rep_".  Runs under rep->mtx_repmgr.
 *
 *	For each shared entry i:
 *	- find the matching local site (searching from slot i), or create it;
 *	- if it was found, push the local config into the shared entry;
 *	- adopt the shared membership status (and, for a new site, the
 *	  shared config);
 *	- swap the site into local slot i so local and shared EIDs agree.
 *	Local sites left over are then appended to the shared list, and
 *	self_eid is reconciled between the handle and the region.
 *
 *	Returns EINVAL if both name a local site and they differ.
 */
int
__repmgr_join(env, rep_)
	ENV *env;
	void *rep_;
{
	DB_REP *db_rep;
	REGINFO *infop;
	REP *rep;
	SITEINFO *p;
	REPMGR_SITE *site, temp;
	repmgr_netaddr_t *addrp;
	char *host;
	u_int i, j;
	int ret;

	db_rep = env->rep_handle;
	infop = env->reginfo;
	rep = rep_;
	ret = 0;

	MUTEX_LOCK(env, rep->mtx_repmgr);

	i = 0;
	if (rep->siteinfo_off != INVALID_ROFF) {
		p = R_ADDR(infop, rep->siteinfo_off);
		for (; i < rep->site_cnt; i++) {
			host = R_ADDR(infop, p[i].addr.host);
			RPRINT(env, (env, DB_VERB_REPMGR_MISC,
			    "Site %s:%lu found at EID %u",
			    host, (u_long)p[i].addr.port, i));

			/* Slots below i already match the shared list. */
			for (j = i; j < db_rep->site_cnt; j++) {
				site = &db_rep->sites[j];
				addrp = &site->net_addr;
				if (strcmp(host, addrp->host) == 0 &&
				    p[i].addr.port == addrp->port) {
					p[i].config = site->config;
					site->membership = p[i].status;
					break;
				}
			}

			/* Unknown locally: it becomes the last slot, j. */
			if (j == db_rep->site_cnt) {
				if ((ret = __repmgr_new_site(env,
				    &site, host, p[i].addr.port)) != 0)
					goto unlock;
				site->config = p[i].config;
				site->membership = p[i].status;
			}
			DB_ASSERT(env, j < db_rep->site_cnt);

			if (i != j) {
				/*
				 * NOTE(review): the swap copies REPMGR_SITE by
				 * value, including its sub_conns TAILQ head.
				 * That is only safe while those lists are
				 * empty, as they presumably are at join
				 * time; confirm.
				 */
				temp = db_rep->sites[j];
				db_rep->sites[j] = db_rep->sites[i];
				db_rep->sites[i] = temp;
				/* Keep self_eid following the local site. */
				if (db_rep->self_eid == (int)j)
					db_rep->self_eid = (int)i;
				else if (db_rep->self_eid == (int)i)
					db_rep->self_eid = (int)j;
			}
		}
	}

	if ((ret = __repmgr_share_netaddrs(env, rep, i, db_rep->site_cnt)) != 0)
		goto unlock;

	if (db_rep->self_eid == DB_EID_INVALID)
		db_rep->self_eid = rep->self_eid;
	else if (rep->self_eid == DB_EID_INVALID)
		rep->self_eid = db_rep->self_eid;
	else if (db_rep->self_eid != rep->self_eid) {
		__db_errx(env, DB_STR("3674",
	"A mismatching local site address has been set in the environment"));
		ret = EINVAL;
		goto unlock;
	}

	db_rep->siteinfo_seq = rep->siteinfo_seq;
unlock:
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	return (ret);
}
/*
 * __repmgr_env_refresh --
 *	Release repmgr's shared-region resources, only for a private
 *	environment: the repmgr mutex, each shared host-name string for the
 *	handle's site_cnt entries, and the shared SITEINFO array.  Returns
 *	the mutex-free result.
 */
int
__repmgr_env_refresh(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	REGINFO *infop;
	SITEINFO *shared_array;
	u_int n;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	infop = env->reginfo;
	ret = 0;

	if (!F_ISSET(env, ENV_PRIVATE))
		return (ret);

	ret = __mutex_free(env, &rep->mtx_repmgr);
	if (rep->siteinfo_off == INVALID_ROFF)
		return (ret);

	shared_array = R_ADDR(infop, rep->siteinfo_off);
	for (n = 0; n < db_rep->site_cnt; n++)
		__env_alloc_free(infop,
		    R_ADDR(infop, shared_array[n].addr.host));
	__env_alloc_free(infop, shared_array);
	rep->siteinfo_off = INVALID_ROFF;
	return (ret);
}
/*
 * __repmgr_share_netaddrs --
 *	Append the local sites with indexes [start, limit) to the SITEINFO
 *	array in the shared region.  The array is allocated with
 *	INITIAL_SITES_ALLOCATION entries and doubled (copy, then free the old
 *	array) whenever it is full.  Each host name is copied into region
 *	memory.  Runs under the region mutex renv->mtx_regenv.
 *
 *	If at least one entry was appended, the siteinfo sequence number is
 *	bumped in the region and copied to this handle.  Returns 0 or the
 *	region-allocation error.  Entries appended before a failure remain.
 */
int
__repmgr_share_netaddrs(env, rep_, start, limit)
ENV *env;
void *rep_;
u_int start, limit;
{
DB_REP *db_rep;
REP *rep;
REGINFO *infop;
REGENV *renv;
SITEINFO *orig, *shared_array;
char *host, *hostbuf;
size_t sz;
u_int i, n;
int eid, ret, touched;
db_rep = env->rep_handle;
infop = env->reginfo;
renv = infop->primary;
rep = rep_;
ret = 0;
touched = FALSE;
MUTEX_LOCK(env, renv->mtx_regenv);
for (i = start; i < limit; i++) {
/* Grow (or first create) the shared array when it is full. */
if (rep->site_cnt >= rep->site_max) {
if (rep->siteinfo_off == INVALID_ROFF) {
n = INITIAL_SITES_ALLOCATION;
sz = n * sizeof(SITEINFO);
if ((ret = __env_alloc(infop,
sz, &shared_array)) != 0)
goto out;
} else {
n = 2 * rep->site_max;
sz = n * sizeof(SITEINFO);
if ((ret = __env_alloc(infop,
sz, &shared_array)) != 0)
goto out;
orig = R_ADDR(infop, rep->siteinfo_off);
memcpy(shared_array, orig,
sizeof(SITEINFO) * rep->site_cnt);
__env_alloc_free(infop, orig);
}
rep->siteinfo_off = R_OFFSET(infop, shared_array);
rep->site_max = n;
} else
shared_array = R_ADDR(infop, rep->siteinfo_off);
DB_ASSERT(env, rep->site_cnt < rep->site_max &&
rep->siteinfo_off != INVALID_ROFF);
host = db_rep->sites[i].net_addr.host;
sz = strlen(host) + 1;
if ((ret = __env_alloc(infop, sz, &hostbuf)) != 0)
goto out;
/*
 * The shared EID is the next free shared slot, not i.
 * NOTE(review): callers presumably ensure these coincide -- confirm.
 */
eid = (int)rep->site_cnt++;
(void)strcpy(hostbuf, host);
shared_array[eid].addr.host = R_OFFSET(infop, hostbuf);
shared_array[eid].addr.port = db_rep->sites[i].net_addr.port;
shared_array[eid].config = db_rep->sites[i].config;
shared_array[eid].status = db_rep->sites[i].membership;
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"EID %d is assigned for site %s:%lu",
eid, host, (u_long)shared_array[eid].addr.port));
touched = TRUE;
}
out:
/* Let other handles notice the shared list changed. */
if (touched)
db_rep->siteinfo_seq = ++rep->siteinfo_seq;
MUTEX_UNLOCK(env, renv->mtx_regenv);
return (ret);
}
/*
 * __repmgr_copy_in_added_sites --
 *	Bring the private site table up to date with the shared SITEINFO
 *	array.  A local entry is created for each shared entry beyond the
 *	local count.  Every site's config and membership are then refreshed
 *	from the shared copy, and the handle's siteinfo sequence is synced.
 */
int
__repmgr_copy_in_added_sites(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	REGINFO *infop;
	SITEINFO *base, *p;
	REPMGR_SITE *site;
	char *host;
	u_int i;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	if (rep->siteinfo_off != INVALID_ROFF) {
		infop = env->reginfo;
		base = R_ADDR(infop, rep->siteinfo_off);

		/* Sites present in the region but not yet known locally. */
		for (i = db_rep->site_cnt; i < rep->site_cnt; i++) {
			p = &base[i];
			host = R_ADDR(infop, p->addr.host);
			if ((ret = __repmgr_new_site(env,
			    &site, host, p->addr.port)) != 0)
				return (ret);
			RPRINT(env, (env, DB_VERB_REPMGR_MISC,
			    "Site %s:%lu found at EID %u",
			    host, (u_long)p->addr.port, i));
		}

		for (i = 0; i < db_rep->site_cnt; i++) {
			site = SITE_FROM_EID(i);
			site->config = base[i].config;
			site->membership = base[i].status;
		}
	}

	DB_ASSERT(env, db_rep->site_cnt == rep->site_cnt);
	db_rep->siteinfo_seq = rep->siteinfo_seq;
	return (0);
}
int
__repmgr_init_new_sites(env, from, limit)
ENV *env;
int from, limit;
{
DB_REP *db_rep;
REPMGR_SITE *site;
int i, ret;
db_rep = env->rep_handle;
if (db_rep->selector == NULL)
return (0);
DB_ASSERT(env, IS_VALID_EID(from) && IS_VALID_EID(limit) &&
from <= limit);
for (i = from; i < limit; i++) {
site = SITE_FROM_EID(i);
if (site->membership == SITE_PRESENT &&
(ret = __repmgr_schedule_connection_attempt(env,
i, TRUE)) != 0)
return (ret);
}
return (0);
}
/*
 * __repmgr_failchk --
 *	Failure-check hook.  Clears rep->listener if the recorded listener is
 *	no longer alive according to dbenv->is_alive (process-level check).
 *	Always returns 0.
 */
int
__repmgr_failchk(env)
	ENV *env;
{
	DB_ENV *dbenv;
	REP *rep;
	db_threadid_t unused;

	dbenv = env->dbenv;
	rep = env->rep_handle->region;
	DB_THREADID_INIT(unused);

	MUTEX_LOCK(env, rep->mtx_repmgr);
	if (rep->listener != 0 && !dbenv->is_alive(dbenv,
	    rep->listener, unused, DB_MUTEX_PROCESS_ONLY))
		rep->listener = 0;
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	return (0);
}
/*
 * __repmgr_master_is_known --
 *	Return TRUE if this site is the master, or if the connected master
 *	has an incoming or outgoing main connection in a ready state.
 *	Otherwise return FALSE.
 */
int
__repmgr_master_is_known(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *master;

	db_rep = env->rep_handle;
	if (db_rep->region->master_id == db_rep->self_eid)
		return (TRUE);
	if ((master = __repmgr_connected_master(env)) == NULL)
		return (FALSE);

	conn = master->ref.conn.in;
	if (conn != NULL && IS_READY_STATE(conn->state))
		return (TRUE);
	conn = master->ref.conn.out;
	if (conn != NULL && IS_READY_STATE(conn->state))
		return (TRUE);
	return (FALSE);
}
/*
 * __repmgr_stable_lsn --
 *	Lower *stable_lsn to the start of rep->min_log_file when that file is
 *	set (non-zero) and older than stable_lsn's file.  The result is
 *	logged.  Always returns 0.
 */
int
__repmgr_stable_lsn(env, stable_lsn)
	ENV *env;
	DB_LSN *stable_lsn;
{
	REP *rep;

	rep = env->rep_handle->region;
	if (rep->min_log_file != 0 && stable_lsn->file > rep->min_log_file) {
		stable_lsn->file = rep->min_log_file;
		stable_lsn->offset = 0;
	}
	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
	    "Repmgr_stable_lsn: Returning stable_lsn[%lu][%lu]",
	    (u_long)stable_lsn->file, (u_long)stable_lsn->offset));
	return (0);
}
/*
 * __repmgr_send_sync_msg --
 *	Synchronously write a repmgr-own message of "type" on "conn".  The
 *	message is a marshaled REPMGR_OWN_MSG header followed by the "len"
 *	bytes at "buf" (omitted when "len" is 0, which add_buffer ignores).
 */
int
__repmgr_send_sync_msg(env, conn, type, buf, len)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int8_t *buf;
	u_int32_t len, type;
{
	REPMGR_IOVECS vecs;
	__repmgr_msg_hdr_args hdr;
	u_int8_t header[__REPMGR_MSG_HDR_SIZE];
	size_t written;

	hdr.type = REPMGR_OWN_MSG;
	REPMGR_OWN_BUF_SIZE(hdr) = len;
	REPMGR_OWN_MSG_TYPE(hdr) = type;
	__repmgr_msg_hdr_marshal(env, &hdr, header);

	__repmgr_iovec_init(&vecs);
	__repmgr_add_buffer(&vecs, header, __REPMGR_MSG_HDR_SIZE);
	__repmgr_add_buffer(&vecs, buf, len);
	return (__repmgr_write_iovecs(env, conn, &vecs, &written));
}
/*
 * __repmgr_marshal_member_list --
 *	Marshal this site's view of group membership into a newly allocated
 *	buffer.  The layout is a membership-version header (version and
 *	current generation) followed by one site-info record per site with
 *	non-zero membership.  On success the caller owns *bufp, which holds
 *	*lenp bytes.
 *
 *	The buffer is sized assuming host names of at most MAXHOSTNAMELEN
 *	chars.  If a record does not fit, or fails to marshal for any other
 *	reason, the buffer is freed and the error returned.  The
 *	marshalled-length output is not relied on after a failure.
 */
int
__repmgr_marshal_member_list(env, bufp, lenp)
	ENV *env;
	u_int8_t **bufp;
	size_t *lenp;
{
	DB_REP *db_rep;
	REP *rep;
	REPMGR_SITE *site;
	__repmgr_membr_vers_args membr_vers;
	__repmgr_site_info_args site_info;
	u_int8_t *buf, *p;
	size_t bufsize, len;
	u_int i;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	bufsize = __REPMGR_MEMBR_VERS_SIZE +
	    db_rep->site_cnt * (__REPMGR_SITE_INFO_SIZE + MAXHOSTNAMELEN + 1);
	if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
		return (ret);
	p = buf;

	membr_vers.version = db_rep->membership_version;
	membr_vers.gen = rep->gen;
	__repmgr_membr_vers_marshal(env, &membr_vers, p);
	p += __REPMGR_MEMBR_VERS_SIZE;

	for (i = 0; i < db_rep->site_cnt; i++) {
		site = SITE_FROM_EID(i);
		if (site->membership == 0)
			continue;

		site_info.host.data = site->net_addr.host;
		site_info.host.size =
		    (u_int32_t)strlen(site->net_addr.host) + 1;
		site_info.port = site->net_addr.port;
		site_info.flags = site->membership;
		if ((ret = __repmgr_site_info_marshal(env,
		    &site_info, p, (size_t)(&buf[bufsize] - p), &len)) != 0) {
			__os_free(env, buf);
			return (ret);
		}
		p += len;
	}

	*bufp = buf;
	*lenp = (size_t)(p - buf);
	return (0);
}
/*
 * read_gmdb --
 *	Read the whole group membership database (REPMEMBERSHIP) in one
 *	transaction and marshal it into a newly allocated buffer.  The buffer
 *	holds a membership-version header (version from the metadata record,
 *	generation from __rep_get_datagen) followed by one site-info record
 *	for every remaining database record.
 *
 *	On success *bufp and *lenp receive the buffer (caller frees it) and
 *	its length.  On error the buffer is freed and the outputs are left
 *	untouched.
 */
static int
read_gmdb(env, ip, bufp, lenp)
ENV *env;
DB_THREAD_INFO *ip;
u_int8_t **bufp;
size_t *lenp;
{
DB_TXN *txn;
DB *dbp;
DBC *dbc;
DBT key_dbt, data_dbt;
__repmgr_membership_key_args key;
__repmgr_membership_data_args member_status;
__repmgr_member_metadata_args metadata;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
u_int8_t data_buf[__REPMGR_MEMBERSHIP_DATA_SIZE];
u_int8_t key_buf[MAX_MSG_BUF];
u_int8_t metadata_buf[__REPMGR_MEMBER_METADATA_SIZE];
char *host;
size_t bufsize, len;
u_int8_t *buf, *p;
u_int32_t gen;
int ret, t_ret;
txn = NULL;
dbp = NULL;
dbc = NULL;
buf = NULL;
COMPQUIET(len, 0);
if ((ret = __rep_get_datagen(env, &gen)) != 0)
return (ret);
if ((ret = __txn_begin(env, ip, NULL, &txn, DB_IGNORE_LEASE)) != 0)
goto err;
if ((ret = __rep_open_sysdb(env, ip, txn, REPMEMBERSHIP, 0, &dbp)) != 0)
goto err;
if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
goto err;
/* Keys and data are read into fixed-size caller-owned buffers. */
memset(&key_dbt, 0, sizeof(key_dbt));
key_dbt.data = key_buf;
key_dbt.ulen = sizeof(key_buf);
F_SET(&key_dbt, DB_DBT_USERMEM);
memset(&data_dbt, 0, sizeof(data_dbt));
data_dbt.data = metadata_buf;
data_dbt.ulen = sizeof(metadata_buf);
F_SET(&data_dbt, DB_DBT_USERMEM);
/*
 * The first record is the metadata record.  Its key has an empty host and
 * port 0 (asserted below); its data carries the format and the version.
 */
if ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) != 0)
goto err;
ret = __repmgr_membership_key_unmarshal(env,
&key, key_buf, key_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
DB_ASSERT(env, key.host.size == 0);
DB_ASSERT(env, key.port == 0);
ret = __repmgr_member_metadata_unmarshal(env,
&metadata, metadata_buf, data_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
DB_ASSERT(env, metadata.format == REPMGR_GMDB_FMT_VERSION);
DB_ASSERT(env, metadata.version > 0);
/* Initial guess; grown by doubling below when a record doesn't fit. */
bufsize = 1000;
if ((ret = __os_malloc(env, bufsize, &buf)) != 0)
goto err;
membr_vers.version = metadata.version;
membr_vers.gen = gen;
__repmgr_membr_vers_marshal(env, &membr_vers, buf);
p = &buf[__REPMGR_MEMBR_VERS_SIZE];
/* Remaining records are per-site membership entries. */
data_dbt.data = data_buf;
data_dbt.ulen = sizeof(data_buf);
while ((ret = __dbc_get(dbc, &key_dbt, &data_dbt, DB_NEXT)) == 0) {
ret = __repmgr_membership_key_unmarshal(env,
&key, key_buf, key_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
DB_ASSERT(env, key.host.size <= MAXHOSTNAMELEN + 1 &&
key.host.size > 1);
host = (char*)key.host.data;
DB_ASSERT(env, host[key.host.size-1] == '\0');
DB_ASSERT(env, key.port > 0);
ret = __repmgr_membership_data_unmarshal(env,
&member_status, data_buf, data_dbt.size, NULL);
DB_ASSERT(env, ret == 0);
DB_ASSERT(env, member_status.flags != 0);
site_info.host = key.host;
site_info.port = key.port;
site_info.flags = member_status.flags;
/*
 * On ENOMEM, double the buffer once (preserving the write offset)
 * and retry.  The retry is only asserted to succeed.
 */
if ((ret = __repmgr_site_info_marshal(env, &site_info,
p, (size_t)(&buf[bufsize]-p), &len)) == ENOMEM) {
bufsize *= 2;
len = (size_t)(p - buf);
if ((ret = __os_realloc(env, bufsize, &buf)) != 0)
goto err;
p = &buf[len];
ret = __repmgr_site_info_marshal(env,
&site_info, p, (size_t)(&buf[bufsize]-p), &len);
DB_ASSERT(env, ret == 0);
}
p += len;
}
len = (size_t)(p - buf);
/* Running off the end of the cursor is the normal loop exit. */
if (ret == DB_NOTFOUND)
ret = 0;
err:
if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
ret = t_ret;
if (dbp != NULL &&
(t_ret = __db_close(dbp, txn, DB_NOSYNC)) != 0 && ret == 0)
ret = t_ret;
if (txn != NULL &&
(t_ret = __db_txn_auto_resolve(env, txn, 0, ret)) != 0 && ret == 0)
ret = t_ret;
if (ret == 0) {
*bufp = buf;
*lenp = len;
} else if (buf != NULL)
__os_free(env, buf);
return (ret);
}
/*
 * __repmgr_refresh_membership --
 *	Apply a marshaled membership list to this site.  The layout is a
 *	membership-version header followed by site-info records, the format
 *	produced by read_gmdb and __repmgr_marshal_member_list.  Nothing is
 *	done if repmgr is stopped or the list is not newer than the version
 *	already held.
 *
 *	Under db_rep->mutex: record the new version and generation, set each
 *	listed site's status, set nsites to the number of listed sites, and
 *	give status 0 to every known site that was not listed.
 *
 *	Side effect: writes a NUL into the last byte of each host name
 *	inside "buf".
 */
int
__repmgr_refresh_membership(env, buf, len)
ENV *env;
u_int8_t *buf;
size_t len;
{
DB_REP *db_rep;
REPMGR_SITE *site;
__repmgr_membr_vers_args membr_vers;
__repmgr_site_info_args site_info;
char *host;
u_int8_t *p;
u_int16_t port;
u_int32_t i, n;
int eid, ret;
db_rep = env->rep_handle;
ret = __repmgr_membr_vers_unmarshal(env, &membr_vers, buf, len, &p);
DB_ASSERT(env, ret == 0);
if (db_rep->repmgr_status == stopped)
return (0);
/* Ignore lists that are not strictly newer than what we have. */
if (__repmgr_gmdb_version_cmp(env,
membr_vers.gen, membr_vers.version) <= 0)
return (0);
LOCK_MUTEX(db_rep->mutex);
db_rep->membership_version = membr_vers.version;
db_rep->member_version_gen = membr_vers.gen;
/* SITE_TOUCHED marks sites present in the incoming list. */
for (i = 0; i < db_rep->site_cnt; i++)
F_CLR(SITE_FROM_EID(i), SITE_TOUCHED);
for (n = 0; p < &buf[len]; ++n) {
ret = __repmgr_site_info_unmarshal(env,
&site_info, p, (size_t)(&buf[len] - p), &p);
DB_ASSERT(env, ret == 0);
host = site_info.host.data;
DB_ASSERT(env,
(u_int8_t*)site_info.host.data + site_info.host.size <= p);
/*
 * Force NUL termination in place.
 * NOTE(review): a host.size of 0 would index host[-1] (u_int32_t wrap);
 * not checked here.
 */
host[site_info.host.size-1] = '\0';
port = site_info.port;
if ((ret = __repmgr_set_membership(env,
host, port, site_info.flags)) != 0)
goto err;
if ((ret = __repmgr_find_site(env, host, port, &eid)) != 0)
goto err;
DB_ASSERT(env, IS_VALID_EID(eid));
F_SET(SITE_FROM_EID(eid), SITE_TOUCHED);
}
ret = __rep_set_nsites_int(env, n);
DB_ASSERT(env, ret == 0);
/* Any known site absent from the list loses its membership status. */
for (i = 0; i < db_rep->site_cnt; i++) {
site = SITE_FROM_EID(i);
if (F_ISSET(site, SITE_TOUCHED))
continue;
host = site->net_addr.host;
port = site->net_addr.port;
if ((ret = __repmgr_set_membership(env, host, port, 0)) != 0)
goto err;
}
err:
UNLOCK_MUTEX(db_rep->mutex);
return (ret);
}
/*
 * __repmgr_reload_gmdb --
 *	Read the group membership database.  On success, mark it as held
 *	(have_gmdb) and apply its contents via __repmgr_refresh_membership.
 *	The temporary marshal buffer is always freed.
 */
int
__repmgr_reload_gmdb(env)
	ENV *env;
{
	DB_THREAD_INFO *ip;
	u_int8_t *buf;
	size_t len;
	int ret;

	ENV_ENTER(env, ip);
	ret = read_gmdb(env, ip, &buf, &len);
	if (ret == 0) {
		env->rep_handle->have_gmdb = TRUE;
		ret = __repmgr_refresh_membership(env, buf, len);
		__os_free(env, buf);
	}
	ENV_LEAVE(env, ip);
	return (ret);
}
/*
 * __repmgr_gmdb_version_cmp --
 *	Compare (gen, version) with the membership version this site holds.
 *	Generation is compared first, then version.  Returns -1, 0 or 1 as
 *	the argument pair is older than, equal to, or newer than the held
 *	one.
 */
int
__repmgr_gmdb_version_cmp(env, gen, version)
	ENV *env;
	u_int32_t gen, version;
{
	DB_REP *db_rep;
	u_int32_t cur_gen, cur_version;

	db_rep = env->rep_handle;
	cur_gen = db_rep->member_version_gen;
	cur_version = db_rep->membership_version;

	if (gen != cur_gen)
		return (gen < cur_gen ? -1 : 1);
	if (version == cur_version)
		return (0);
	return (version < cur_version ? -1 : 1);
}
/*
 * __repmgr_init_save --
 *	Capture the current membership list into "dbt", under the repmgr
 *	mutex.  The list is marshaled into a newly allocated buffer.  With no
 *	sites known, the DBT is set empty (NULL data, size 0).
 */
int
__repmgr_init_save(env, dbt)
	ENV *env;
	DBT *dbt;
{
	DB_REP *db_rep;
	u_int8_t *buf;
	size_t len;
	int ret;

	db_rep = env->rep_handle;
	ret = 0;
	LOCK_MUTEX(db_rep->mutex);
	if (db_rep->site_cnt > 0) {
		if ((ret = __repmgr_marshal_member_list(env,
		    &buf, &len)) == 0) {
			dbt->data = buf;
			dbt->size = (u_int32_t)len;
		}
	} else {
		dbt->data = NULL;
		dbt->size = 0;
	}
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
/*
 * __repmgr_init_restore --
 *	Remember a previously saved membership list.  Only dbt's data pointer
 *	and size are stored; the data is not copied.  Always returns 0.
 */
int
__repmgr_init_restore(env, dbt)
	ENV *env;
	DBT *dbt;
{
	DB_REP *db_rep;

	db_rep = env->rep_handle;
	db_rep->restored_list_length = dbt->size;
	db_rep->restored_list = dbt->data;
	return (0);
}
/*
 * __repmgr_defer_op --
 *	Queue a zeroed REPMGR_OWN_MSG message whose own-message type is "op".
 *	Returns the allocation error or __repmgr_queue_put's result.
 */
int
__repmgr_defer_op(env, op)
	ENV *env;
	u_int32_t op;
{
	REPMGR_MESSAGE *msg;
	int ret;

	if ((ret = __os_calloc(env, 1, sizeof(*msg), &msg)) == 0) {
		msg->msg_hdr.type = REPMGR_OWN_MSG;
		REPMGR_OWN_MSG_TYPE(msg->msg_hdr) = op;
		ret = __repmgr_queue_put(env, msg);
	}
	return (ret);
}
void
__repmgr_fire_conn_err_event(env, conn, err)
ENV *env;
REPMGR_CONNECTION *conn;
int err;
{
DB_REP *db_rep;
DB_REPMGR_CONN_ERR info;
db_rep = env->rep_handle;
if (conn->type == REP_CONNECTION && IS_VALID_EID(conn->eid)) {
__repmgr_print_conn_err(env,
&SITE_FROM_EID(conn->eid)->net_addr, err);
info.eid = conn->eid;
info.error = err;
DB_EVENT(env, DB_EVENT_REP_CONNECT_BROKEN, &info);
}
}
/*
 * __repmgr_print_conn_err --
 *	Emit a verbose (DB_VERB_REPMGR_MISC) message about a connection
 *	failure to "netaddr".  err == 0 is reported as EOF; otherwise the
 *	error text and code are included.
 */
void
__repmgr_print_conn_err(env, netaddr, err)
	ENV *env;
	repmgr_netaddr_t *netaddr;
	int err;
{
	SITE_STRING_BUFFER where;
	char errtext[200];

	(void)__repmgr_format_addr_loc(netaddr, where);
	if (err != 0)
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "`%s' (%d) on connection to %s",
		    __os_strerror(err, errtext, sizeof(errtext)),
		    err, where));
	else
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "EOF on connection to %s", where));
}
/*
 * __repmgr_become_client --
 *	Wait (under the repmgr mutex) for any pending GMDB operation, record
 *	the intent to be a client, and then start replication as
 *	DB_REP_CLIENT.
 */
int
__repmgr_become_client(env)
	ENV *env;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;
	LOCK_MUTEX(db_rep->mutex);
	ret = __repmgr_await_gmdbop(env);
	if (ret == 0)
		db_rep->client_intent = TRUE;
	UNLOCK_MUTEX(db_rep->mutex);

	if (ret != 0)
		return (ret);
	return (__repmgr_repstart(env, DB_REP_CLIENT));
}
/*
 * __repmgr_lookup_site --
 *	Linear search of the private site table for host:port.  Returns the
 *	matching site or NULL.
 */
REPMGR_SITE *
__repmgr_lookup_site(env, host, port)
	ENV *env;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int idx;

	db_rep = env->rep_handle;
	for (idx = 0; idx < db_rep->site_cnt; idx++) {
		site = SITE_FROM_EID(idx);
		if (strcmp(site->net_addr.host, host) != 0)
			continue;
		if (site->net_addr.port == port)
			return (site);
	}
	return (NULL);
}
/*
 * __repmgr_find_site --
 *	Return in *eidp the EID for host:port, adding the site if unknown.
 *	When replication is on, this goes through get_eid under the shared
 *	repmgr mutex.  Before that, only the private site table is used.
 */
int
__repmgr_find_site(env, host, port, eidp)
	ENV *env;
	const char *host;
	u_int port;
	int *eidp;
{
	DB_REP *db_rep;
	REP *rep;
	REPMGR_SITE *site;
	int eid, ret;

	db_rep = env->rep_handle;
	if (REP_ON(env)) {
		rep = db_rep->region;
		MUTEX_LOCK(env, rep->mtx_repmgr);
		ret = get_eid(env, host, port, &eid);
		MUTEX_UNLOCK(env, rep->mtx_repmgr);
		if (ret == 0)
			*eidp = eid;
		return (ret);
	}

	site = __repmgr_lookup_site(env, host, port);
	if (site == NULL &&
	    (ret = __repmgr_new_site(env, &site, host, port)) != 0)
		return (ret);
	*eidp = EID_FROM_SITE(site);
	return (0);
}
/*
 * get_eid --
 *	Return in *eidp the EID for host:port; the caller holds
 *	rep->mtx_repmgr.  First sync in sites added by other handles.  If
 *	the address is still unknown, add it locally and publish it to the
 *	shared region.  If publishing fails, the local addition is rolled
 *	back.
 */
static int
get_eid(env, host, port, eidp)
	ENV *env;
	const char *host;
	u_int port;
	int *eidp;
{
	DB_REP *db_rep;
	REP *rep;
	REPMGR_SITE *site;
	int eid, ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	if ((ret = __repmgr_copy_in_added_sites(env)) != 0)
		return (ret);

	if ((site = __repmgr_lookup_site(env, host, port)) != NULL) {
		*eidp = EID_FROM_SITE(site);
		return (0);
	}

	if ((ret = __repmgr_new_site(env, &site, host, port)) != 0)
		return (ret);
	eid = EID_FROM_SITE(site);
	DB_ASSERT(env, (u_int)eid == db_rep->site_cnt - 1);
	if ((ret = __repmgr_share_netaddrs(env,
	    rep, (u_int)eid, db_rep->site_cnt)) != 0) {
		/* Undo the local append. */
		db_rep->site_cnt--;
		__repmgr_cleanup_netaddr(env, &site->net_addr);
		return (ret);
	}
	db_rep->siteinfo_seq = ++rep->siteinfo_seq;
	*eidp = eid;
	return (0);
}
/*
 * __repmgr_set_membership --
 *	Set the membership status of host:port, adding the site if it is
 *	unknown.  The status is written to both the local site table and the
 *	shared SITEINFO array, under rep->mtx_repmgr.  The siteinfo sequence
 *	is bumped when the shared status actually changes.
 *
 *	If repmgr is running and its selector is up, the change is acted on
 *	after the mutex is dropped:
 *	- the local site leaving SITE_PRESENT returns DB_DELETED;
 *	- a site becoming SITE_PRESENT while SITE_IDLE gets an immediate
 *	  connection attempt, plus DB_EVENT_REP_SITE_ADDED if it is remote;
 *	- a site dropping from non-zero status to 0 raises
 *	  DB_EVENT_REP_SITE_REMOVED.
 *	Requires replication to be on (asserted).
 */
int
__repmgr_set_membership(env, host, port, status)
ENV *env;
const char *host;
u_int port;
u_int32_t status;
{
DB_REP *db_rep;
REP *rep;
REGINFO *infop;
REPMGR_SITE *site;
SITEINFO *sites;
u_int32_t orig;
int eid, ret;
db_rep = env->rep_handle;
rep = db_rep->region;
infop = env->reginfo;
COMPQUIET(orig, 0);
COMPQUIET(site, NULL);
DB_ASSERT(env, REP_ON(env));
MUTEX_LOCK(env, rep->mtx_repmgr);
if ((ret = get_eid(env, host, port, &eid)) == 0) {
DB_ASSERT(env, IS_VALID_EID(eid));
site = SITE_FROM_EID(eid);
orig = site->membership;
sites = R_ADDR(infop, rep->siteinfo_off);
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"set membership for %s:%lu %lu (was %lu)",
host, (u_long)port, (u_long)status, (u_long)orig));
/* Only a real change to the shared status bumps the sequence. */
if (status != sites[eid].status) {
db_rep->siteinfo_seq = ++rep->siteinfo_seq;
}
site->membership = status;
sites[eid].status = status;
}
MUTEX_UNLOCK(env, rep->mtx_repmgr);
/* "site" and "orig" are only meaningful when get_eid succeeded. */
if (ret == 0 && db_rep->repmgr_status == running &&
SELECTOR_RUNNING(db_rep)) {
if (eid == db_rep->self_eid && status != SITE_PRESENT)
ret = DB_DELETED;
/*
 * NOTE(review): this branch is also reached for the local site when it
 * becomes SITE_PRESENT while SITE_IDLE; only the event is skipped for
 * self, not the connection attempt.  Confirm that this is intended.
 */
else if (orig != SITE_PRESENT && status == SITE_PRESENT &&
site->state == SITE_IDLE) {
ret = __repmgr_schedule_connection_attempt(env,
eid, TRUE);
if (eid != db_rep->self_eid)
DB_EVENT(env, DB_EVENT_REP_SITE_ADDED, &eid);
} else if (orig != 0 && status == 0)
DB_EVENT(env, DB_EVENT_REP_SITE_REMOVED, &eid);
}
return (ret);
}
/*
 * __repmgr_bcast_parm_refresh --
 *	Broadcast this site's ack policy and electability to connected peers
 *	as a REPMGR_PARM_REFRESH message.  The site is electable when its
 *	priority is non-zero.  Runs under the repmgr mutex.
 */
int
__repmgr_bcast_parm_refresh(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	__repmgr_parm_refresh_args parms;
	u_int8_t msgbuf[__REPMGR_PARM_REFRESH_SIZE];
	int ret;

	DB_ASSERT(env, REP_ON(env));
	db_rep = env->rep_handle;
	rep = db_rep->region;

	LOCK_MUTEX(db_rep->mutex);
	parms.ack_policy = (u_int32_t)rep->perm_policy;
	parms.flags = rep->priority != 0 ? SITE_ELECTABLE : 0;
	__repmgr_parm_refresh_marshal(env, &parms, msgbuf);
	ret = __repmgr_bcast_own_msg(env,
	    REPMGR_PARM_REFRESH, msgbuf, __REPMGR_PARM_REFRESH_SIZE);
	UNLOCK_MUTEX(db_rep->mutex);
	return (ret);
}
/*
 * __repmgr_chg_prio --
 *	React to a priority change from "prev" to "cur".  Only a move between
 *	zero and non-zero priority changes electability, and only then is
 *	the parameter refresh broadcast.
 */
int
__repmgr_chg_prio(env, prev, cur)
	ENV *env;
	u_int32_t prev, cur;
{
	if ((prev == 0) == (cur == 0))
		return (0);
	return (__repmgr_bcast_parm_refresh(env));
}
/*
 * __repmgr_bcast_own_msg --
 *	Send a repmgr-own message on every CONN_READY main connection
 *	(incoming first, then outgoing) of every SITE_CONNECTED remote site.
 *	A connection whose send fails is busted.  Only a failure to bust it
 *	is returned.  Does nothing if the selector is not running.
 */
int
__repmgr_bcast_own_msg(env, type, buf, len)
	ENV *env;
	u_int32_t type;
	u_int8_t *buf;
	size_t len;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	int ret, which;
	u_int i;

	db_rep = env->rep_handle;
	if (!SELECTOR_RUNNING(db_rep))
		return (0);

	FOR_EACH_REMOTE_SITE_INDEX(i) {
		site = SITE_FROM_EID(i);
		if (site->state != SITE_CONNECTED)
			continue;
		/* Fetch each reference only when it is about to be used. */
		for (which = 0; which < 2; which++) {
			conn = which == 0 ?
			    site->ref.conn.in : site->ref.conn.out;
			if (conn == NULL || conn->state != CONN_READY)
				continue;
			if ((ret = __repmgr_send_own_msg(env,
			    conn, type, buf, (u_int32_t)len)) == 0)
				continue;
			if ((ret = __repmgr_bust_connection(env, conn)) != 0)
				return (ret);
		}
	}
	return (0);
}