#include "db_config.h"
#include "db_int.h"
static db_timeout_t __repmgr_compute_response_time __P((ENV *));
static int __repmgr_elect __P((ENV *, u_int32_t, db_timespec *));
static int __repmgr_elect_main __P((ENV *, REPMGR_RUNNABLE *));
static void *__repmgr_elect_thread __P((void *));
static int send_membership __P((ENV *));
/*
 * __repmgr_init_election --
 *	Start a new election thread that will run with the given flags.
 *
 * A slot in db_rep->elect_threads is chosen for the new thread: the first
 * empty (NULL) slot, or the first slot whose thread has already finished
 * (that thread is joined and its REPMGR_RUNNABLE reused).  If no such slot
 * exists the array is grown by one entry.
 *
 * Returns 0 when repmgr is stopped (the request is ignored) or when the
 * thread was started; otherwise returns the error from the join, the
 * allocation, or the thread start.
 */
int
__repmgr_init_election(env, flags)
	ENV *env;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REPMGR_RUNNABLE *runner;
	u_int slot, grown;
	int ret;

	COMPQUIET(runner, NULL);

	db_rep = env->rep_handle;
	if (db_rep->repmgr_status == stopped) {
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "ignoring elect thread request %#lx; repmgr is stopped",
		    (u_long)flags));
		return (0);
	}

	/* Look for a usable slot; 'slot' ends up indexing it. */
	slot = 0;
	while (slot < db_rep->aelect_threads) {
		runner = db_rep->elect_threads[slot];
		if (runner == NULL)
			break;
		if (runner->finished) {
			/* Reap the old thread; its structure is reused below. */
			ret = __repmgr_thread_join(runner);
			if (ret != 0)
				return (ret);
			break;
		}
		slot++;
	}

	/* Every existing slot is busy: extend the array by one entry. */
	if (slot == db_rep->aelect_threads) {
		grown = db_rep->aelect_threads + 1;
		ret = __os_realloc(env,
		    sizeof(REPMGR_RUNNABLE*) * grown, &db_rep->elect_threads);
		if (ret != 0)
			return (ret);
		db_rep->aelect_threads = grown;
		STAT(db_rep->region->mstat.st_max_elect_threads = grown);
		db_rep->elect_threads[slot] = NULL;
		runner = NULL;
	}

	/* An empty slot needs a fresh runnable structure. */
	if (runner == NULL) {
		ret = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &runner);
		if (ret != 0)
			return (ret);
	}
	runner->run = __repmgr_elect_thread;
	runner->args.flags = flags;

	ret = __repmgr_thread_start(env, runner);
	if (ret != 0) {
		/* Start failed: release the structure and leave the slot empty. */
		__os_free(env, runner);
		runner = NULL;
	} else {
		STAT(db_rep->region->mstat.st_elect_threads++);
	}
	db_rep->elect_threads[slot] = runner;

	return (ret);
}
/*
 * __repmgr_elect_thread --
 *	Entry point of an election thread.  'argsp' is the REPMGR_RUNNABLE
 *	describing this thread.  Runs __repmgr_elect_main; on a non-zero
 *	result the error is reported and passed to __repmgr_thread_failure.
 *	The runnable is flagged as finished just before returning.
 *	Always returns NULL.
 */
static void *
__repmgr_elect_thread(argsp)
	void *argsp;
{
	REPMGR_RUNNABLE *self;
	ENV *env;
	int status;

	self = argsp;
	env = self->env;

	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "starting election thread"));

	status = __repmgr_elect_main(env, self);
	if (status != 0) {
		__db_err(env, status, "election thread failed");
		(void)__repmgr_thread_failure(env, status);
	}

	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "election thread is exiting"));

	/* Mark the runnable as finished so its slot can be reclaimed. */
	self->finished = TRUE;
	return (NULL);
}
/*
 * __repmgr_elect_main --
 *	Body of an election thread.
 *
 * Optionally fires the master-failure event, optionally pauses when leases
 * are configured, optionally runs one immediate election, and then loops:
 * each pass decides whether to run an election, perform a client repstart,
 * or wait on the check_election condition/event until a computed deadline.
 *
 * The loop ends with ret == 0 when repmgr is stopped, a master is known, or
 * another thread has taken over as the "preferred" election thread.  It also
 * ends when an election returns anything other than DB_REP_UNAVAIL, when a
 * repstart fails, or when the wait primitive reports an error; in those
 * cases that result is returned.
 *
 * Parameters:
 *	env - environment handle.
 *	th  - this thread's runnable; th->args.flags carries the ELECT_F_* flags.
 */
static int
__repmgr_elect_main(env, th)
ENV *env;
REPMGR_RUNNABLE *th;
{
DB_REP *db_rep;
REP *rep;
#ifdef DB_WIN32
DWORD duration;
db_timeout_t t;
#else
struct timespec deadline;
#endif
db_timespec failtime, now, repstart_time, target, wait_til;
db_timeout_t delay_time, response_time, tmp_time;
u_long sec, usec;
u_int32_t flags;
int done_repstart, ret, suppress_election;
enum { ELECTION, REPSTART } action;
/* 'action' is only assigned on some paths; quiet compiler warnings. */
COMPQUIET(action, ELECTION);
db_rep = env->rep_handle;
rep = db_rep->region;
flags = th->args.flags;
/* Deliver the master-failure event if the starter asked for it. */
if (LF_ISSET(ELECT_F_EVENT_NOTIFY))
DB_EVENT(env, DB_EVENT_REP_MASTER_FAILURE, NULL);
/*
 * With leases configured, pause before doing anything else.  The pause
 * length is the smallest of the lease timeout, the ack timeout and the
 * connection-retry timeout (all in microseconds, split into sec/usec for
 * __os_yield).
 */
if (FLD_ISSET(db_rep->region->config, REP_C_LEASE)) {
if ((ret = __rep_get_timeout(env->dbenv,
DB_REP_LEASE_TIMEOUT, &delay_time)) != 0)
goto out;
if ((ret = __rep_get_timeout(env->dbenv,
DB_REP_ACK_TIMEOUT, &tmp_time)) != 0)
goto out;
if (tmp_time < delay_time)
delay_time = tmp_time;
if ((ret = __rep_get_timeout(env->dbenv,
DB_REP_CONNECTION_RETRY, &tmp_time)) != 0)
goto out;
if (tmp_time < delay_time)
delay_time = tmp_time;
sec = delay_time / US_PER_SEC;
usec = delay_time % US_PER_SEC;
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"Election with leases pause sec %lu, usec %lu", sec, usec));
__os_yield(env, sec, usec);
}
/*
 * Register this thread as the preferred election thread.  The loop below
 * exits as soon as some other thread has replaced it in that role.
 */
LOCK_MUTEX(db_rep->mutex);
db_rep->preferred_elect_thr = th;
UNLOCK_MUTEX(db_rep->mutex);
/*
 * 'done_repstart' records whether the most recent operation was a repstart
 * (TRUE) or an election (FALSE); the loop uses it to alternate.
 */
if (LF_ISSET(ELECT_F_IMMED)) {
/*
 * Run an election right away.  Only DB_REP_UNAVAIL leads into the retry
 * loop; any other result, success included, ends the thread.
 */
if ((ret = __repmgr_elect(env, flags, &failtime)) ==
DB_REP_UNAVAIL)
done_repstart = FALSE;
else
goto out;
} else {
/*
 * No election attempted yet: use the current time as the "last failure"
 * time from which the retry wait is measured.
 */
__os_gettime(env, &failtime, 1);
done_repstart = TRUE;
}
/* The mutex is held at the top of every loop iteration. */
LOCK_MUTEX(db_rep->mutex);
for (;;) {
/* Also clears any wait-primitive result left from the previous pass. */
ret = 0;
/* Normal exits: repmgr stopped, master known, or no longer preferred. */
if (db_rep->repmgr_status == stopped)
goto unlock;
if (__repmgr_master_is_known(env))
goto unlock;
if (db_rep->preferred_elect_thr != th)
goto unlock;
/* A cleared wait_til means "no need to wait" for the rest of this pass. */
timespecclear(&wait_til);
__os_gettime(env, &now, 1);
/*
 * Elections are suppressed when this is a startup thread and the initial
 * policy is DB_REP_CLIENT, or (for non-startup threads) when
 * REP_C_ELECTIONS is not configured.
 */
suppress_election = LF_ISSET(ELECT_F_STARTUP) ?
db_rep->init_policy == DB_REP_CLIENT :
!FLD_ISSET(rep->config, REP_C_ELECTIONS);
/*
 * The retry deadline is election_retry_wait past the last repstart (when
 * elections are suppressed) or past the last election failure.
 */
repstart_time = db_rep->repstart_time;
target = suppress_election ? repstart_time : failtime;
TIMESPEC_ADD_DB_TIMEOUT(&target, rep->election_retry_wait);
if (timespeccmp(&now, &target, >=)) {
/*
 * Retry time reached.  With elections suppressed only repstart is
 * possible; otherwise alternate with whatever was done last.
 */
action = suppress_election ? REPSTART :
(done_repstart ? ELECTION : REPSTART);
} else if (db_rep->new_connection) {
/* A new connection was flagged: do a repstart without waiting. */
action = REPSTART;
} else
wait_til = target;
/*
 * Even when an action was chosen, hold off until at least the computed
 * response time has elapsed since the last repstart.
 */
if (!timespecisset(&wait_til)) {
response_time = __repmgr_compute_response_time(env);
target = repstart_time;
TIMESPEC_ADD_DB_TIMEOUT(&target, response_time);
if (timespeccmp(&now, &target, <)) {
wait_til = target;
}
}
if (timespecisset(&wait_til)) {
#ifdef DB_WIN32
/*
 * Windows: convert the absolute deadline into a relative duration in
 * milliseconds, then signal the mutex object and wait on the
 * check_election event in one call.
 */
timespecsub(&wait_til, &now);
DB_TIMESPEC_TO_TIMEOUT(t, &wait_til, TRUE);
duration = t / US_PER_MS;
/*
 * Any result other than "event signaled" or "timed out" is an error.
 * The exit goes through 'out', not 'unlock'.
 * NOTE(review): presumably because the mutex is not held here -- confirm.
 */
if ((ret = SignalObjectAndWait(*db_rep->mutex,
db_rep->check_election, duration, FALSE)) !=
WAIT_OBJECT_0 && ret != WAIT_TIMEOUT)
goto out;
LOCK_MUTEX(db_rep->mutex);
/* Only the preferred thread resets the event after being woken by it. */
if (ret == WAIT_OBJECT_0 &&
db_rep->preferred_elect_thr == th &&
!ResetEvent(db_rep->check_election)) {
ret = GetLastError();
goto unlock;
}
#else
/*
 * POSIX: wait on the condition variable with wait_til as the absolute
 * deadline.  Both 0 and ETIMEDOUT are normal; anything else is returned
 * as an error via 'unlock'.
 */
deadline.tv_sec = wait_til.tv_sec;
deadline.tv_nsec = wait_til.tv_nsec;
if ((ret = pthread_cond_timedwait(
&db_rep->check_election, db_rep->mutex, &deadline))
!= ETIMEDOUT && ret != 0)
goto unlock;
#endif
/* Re-evaluate everything from the top after waking up. */
continue;
}
/* Time to act.  The election/repstart calls run without the mutex. */
UNLOCK_MUTEX(db_rep->mutex);
if (action == ELECTION) {
/*
 * NOTE(review): new_connection is cleared here after the mutex has been
 * released, while it is read above with the mutex held -- confirm this
 * unlocked write is intentional.
 */
db_rep->new_connection = FALSE;
/* As above: only DB_REP_UNAVAIL keeps the retry loop going. */
if ((ret = __repmgr_elect(env, 0, &failtime)) ==
DB_REP_UNAVAIL)
done_repstart = FALSE;
else
goto out;
LOCK_MUTEX(db_rep->mutex);
/* Reclaim the preferred role before looping again. */
db_rep->preferred_elect_thr = th;
} else {
DB_ASSERT(env, action == REPSTART);
db_rep->new_connection = FALSE;
if ((ret = __repmgr_repstart(env, DB_REP_CLIENT)) != 0)
goto out;
done_repstart = TRUE;
LOCK_MUTEX(db_rep->mutex);
/* Record when this repstart happened; deadlines above are based on it. */
__os_gettime(env, &db_rep->repstart_time, 1);
}
}
/*
 * Exit paths.  'out' is reached without the mutex held, 'unlock' with it
 * held.  With statistics enabled, 'out' first takes the mutex so that the
 * st_elect_threads decrement happens under it; without statistics, 'unlock'
 * simply releases the mutex and falls through to 'out'.
 */
#ifdef HAVE_STATISTICS
out:
LOCK_MUTEX(db_rep->mutex);
unlock:
rep->mstat.st_elect_threads--;
UNLOCK_MUTEX(db_rep->mutex);
#else
unlock:
UNLOCK_MUTEX(db_rep->mutex);
out:
#endif
return (ret);
}
/*
 * __repmgr_compute_response_time --
 *	Pick the timeout value used as the "response time" by the election
 *	loop.  The election timeout is the default.  The ack timeout is
 *	returned instead when it is non-zero, the perm policy is not
 *	DB_REPMGR_ACKS_NONE, the site priority is above zero, and the ack
 *	timeout is shorter than the election timeout.
 */
static db_timeout_t
__repmgr_compute_response_time(env)
	ENV *env;
{
	REP *rep;
	db_timeout_t ack_wait, elect_wait;
	int ack_applies;

	rep = env->rep_handle->region;
	ack_wait = rep->ack_timeout;
	elect_wait = rep->elect_timeout;

	/* The ack timeout only counts under these configuration conditions. */
	ack_applies = ack_wait > 0 &&
	    rep->perm_policy != DB_REPMGR_ACKS_NONE &&
	    rep->priority > 0;

	return ((ack_applies && ack_wait < elect_wait) ?
	    ack_wait : elect_wait);
}
static int
__repmgr_elect(env, flags, failtimep)
ENV *env;
u_int32_t flags;
db_timespec *failtimep;
{
DB_REP *db_rep;
REP *rep;
u_int32_t invitation, nsites, nvotes;
int ret, t_ret;
db_rep = env->rep_handle;
nsites = db_rep->region->config_nsites;
DB_ASSERT(env, nsites > 0);
if (nsites == 2 &&
!FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT))
nvotes = 1;
else
nvotes = ELECTION_MAJORITY(nsites);
if (LF_ISSET(ELECT_F_INVITEE)) {
rep = db_rep->region;
invitation = rep->nsites;
if (invitation == nsites || invitation == nsites - 1) {
nsites = invitation;
}
}
if (LF_ISSET(ELECT_F_FAST) && nsites > nvotes) {
nsites--;
}
if (IS_USING_LEASES(env))
nsites = 0;
switch (ret = __rep_elect_int(env, nsites, nvotes, 0)) {
case DB_REP_UNAVAIL:
__os_gettime(env, failtimep, 1);
DB_EVENT(env, DB_EVENT_REP_ELECTION_FAILED, NULL);
if ((t_ret = send_membership(env)) != 0)
ret = t_ret;
break;
case 0:
if (db_rep->takeover_pending)
ret = __repmgr_claim_victory(env);
break;
case DB_REP_IGNORE:
ret = 0;
break;
default:
__db_err(env, ret, DB_STR("3629",
"unexpected election failure"));
break;
}
return (ret);
}
/*
 * send_membership --
 *	Marshal the current member list and broadcast it as a
 *	REPMGR_SHARING message.  Both steps run with db_rep->mutex held.
 *	The marshal buffer, if one was produced, is freed before returning.
 *	Returns the marshal error, or else the broadcast result.
 */
static int
send_membership(env)
	ENV *env;
{
	DB_REP *db_rep;
	u_int8_t *payload;
	size_t payload_len;
	int ret;

	db_rep = env->rep_handle;
	payload = NULL;

	LOCK_MUTEX(db_rep->mutex);
	ret = __repmgr_marshal_member_list(env, &payload, &payload_len);
	if (ret == 0) {
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "Broadcast latest membership list"));
		ret = __repmgr_bcast_own_msg(env,
		    REPMGR_SHARING, payload, payload_len);
	}
	UNLOCK_MUTEX(db_rep->mutex);

	if (payload != NULL)
		__os_free(env, payload);
	return (ret);
}
/*
 * __repmgr_claim_victory --
 *	Clear the pending-takeover flag and try to become master.
 *	A DB_REP_UNAVAIL result from __repmgr_become_master is logged and
 *	reported to the caller as success (0); every other result is
 *	returned unchanged.
 */
int
__repmgr_claim_victory(env)
	ENV *env;
{
	int ret;

	env->rep_handle->takeover_pending = FALSE;

	ret = __repmgr_become_master(env);
	if (ret != DB_REP_UNAVAIL)
		return (ret);

	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
	    "Won election but lost race with DUPMASTER client intent"));
	return (0);
}
/*
 * __repmgr_turn_on_elections --
 *	Start an immediate election thread, provided that a selector exists,
 *	REP_C_ELECTIONS is configured, and no master is currently known.
 *	The check and the thread start happen with db_rep->mutex held.
 *	Returns 0 when nothing needed to be started, otherwise the result
 *	of __repmgr_init_election.
 */
int
__repmgr_turn_on_elections(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	ret = 0;

	DB_ASSERT(env, REP_ON(env));

	LOCK_MUTEX(db_rep->mutex);
	if (db_rep->selector != NULL &&
	    FLD_ISSET(rep->config, REP_C_ELECTIONS) &&
	    !__repmgr_master_is_known(env))
		ret = __repmgr_init_election(env, ELECT_F_IMMED);
	UNLOCK_MUTEX(db_rep->mutex);

	return (ret);
}