#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
#ifdef REP_DIAGNOSTIC
#include "dbinc/db_page.h"
#include "dbinc/fop.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/qam.h"
#endif
/*
 * TIMESTAMP_CHECK --
 *	If the region records an operation timestamp (renv->op_timestamp is
 *	non-zero) that is more than DB_REGENV_TIMEOUT older than "ts", clear
 *	DB_REGENV_REPLOCKED and reset the timestamp under the replication
 *	system lock.  Callers re-test DB_REGENV_REPLOCKED afterwards to decide
 *	whether they are still locked out.
 *
 *	NOTE(review): the staleness test reads op_timestamp before
 *	REP_SYSTEM_LOCK is taken; only the reset itself is serialized.
 */
#define TIMESTAMP_CHECK(env, ts, renv) do { \
if (renv->op_timestamp != 0 && \
renv->op_timestamp + DB_REGENV_TIMEOUT < ts) { \
REP_SYSTEM_LOCK(env); \
F_CLR(renv, DB_REGENV_REPLOCKED); \
renv->op_timestamp = 0; \
REP_SYSTEM_UNLOCK(env); \
} \
} while (0)
/* Forward declarations of file-local helpers. */
static int __rep_lockout_int __P((ENV *, REP *, u_int32_t *, u_int32_t,
const char *, u_int32_t));
static int __rep_newmaster_empty __P((ENV *, int));
static int __rep_print_int __P((ENV *, u_int32_t, const char *, va_list));
#ifdef REP_DIAGNOSTIC
static void __rep_print_logmsg __P((ENV *, const DBT *, DB_LSN *));
#endif
static int __rep_show_progress __P((ENV *, const char *, int mins));
/*
 * __rep_bulk_message --
 *	Append one record ("dbt", stamped with "lsn") to a bulk-transfer
 *	buffer, transmitting the buffer first if the record does not fit.
 *
 *	Returns:
 *	  DB_REP_BULKOVF  another thread is transmitting this buffer
 *			  (BULK_XMIT is set), or the record is larger than
 *			  the entire buffer (the current contents are
 *			  flushed first);
 *	  0		  the record was buffered, or throttling (repth) hit
 *			  its limit and the record was not buffered;
 *	  other		  error from __rep_send_bulk, __rep_send_throttle or
 *			  __rep_bulk_marshal.
 *	If REPCTL_PERM is set in "flags", the buffer is sent right after the
 *	record is appended.
 *
 *	rep->mtx_clientdb is acquired here and released on every return
 *	path (__rep_send_bulk drops and re-acquires it around each send).
 */
int
__rep_bulk_message(env, bulk, repth, lsn, dbt, flags)
	ENV *env;
	REP_BULK *bulk;
	REP_THROTTLE *repth;
	DB_LSN *lsn;
	const DBT *dbt;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REP *rep;
	__rep_bulk_args b_args;
	size_t len;
	int ret;
	u_int32_t recsize, typemore;
	u_int8_t *p;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	ret = 0;

	/*
	 * Upper bound on the bytes this record occupies in the buffer: a
	 * length word, the LSN, a size word and the data itself.
	 */
	recsize = sizeof(len) + dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);

	MUTEX_LOCK(env, rep->mtx_clientdb);

	/* Another thread is sending this buffer right now; back off. */
	if (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		return (DB_REP_BULKOVF);
	}

	/* The record can never fit: flush what we have, report overflow. */
	if (recsize > bulk->len) {
		RPRINT(env, (env, DB_VERB_REP_MSGS,
		    "bulk_msg: Record %lu (0x%lx) larger than entire buffer 0x%lx",
		    (u_long)recsize, (u_long)recsize, (u_long)bulk->len));
		STAT(rep->stat.st_bulk_overflows++);
		(void)__rep_send_bulk(env, bulk, flags);
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		return (DB_REP_BULKOVF);
	}

	/* Drain the buffer until there is room for this record. */
	while (recsize + *(bulk->offp) > bulk->len) {
		RPRINT(env, (env, DB_VERB_REP_MSGS,
	    "bulk_msg: Record %lu (%#lx) doesn't fit. Send %lu (%#lx) now.",
		    (u_long)recsize, (u_long)recsize,
		    (u_long)bulk->len, (u_long)bulk->len));
		STAT(rep->stat.st_bulk_fills++);
		if ((ret = __rep_send_bulk(env, bulk, flags)) != 0) {
			MUTEX_UNLOCK(env, rep->mtx_clientdb);
			return (ret);
		}
	}

	if (bulk->type == REP_BULK_LOG)
		typemore = REP_LOG_MORE;
	else
		typemore = REP_PAGE_MORE;

	if (repth != NULL) {
		if ((ret = __rep_send_throttle(env,
		    bulk->eid, repth, REP_THROTTLE_ONLY, flags)) != 0) {
			MUTEX_UNLOCK(env, rep->mtx_clientdb);
			return (ret);
		}
		/* Throttle limit reached: do not buffer this record. */
		if (repth->type == typemore) {
			VPRINT(env, (env, DB_VERB_REP_MSGS,
			    "bulk_msg: Record %lu (0x%lx) hit throttle limit.",
			    (u_long)recsize, (u_long)recsize));
			MUTEX_UNLOCK(env, rep->mtx_clientdb);
			return (ret);
		}
	}

	p = bulk->addr + *(bulk->offp);
	b_args.len = dbt->size;
	b_args.lsn = *lsn;
	b_args.bulkdata = *dbt;

	/* The buffer's LSN is that of its first record. */
	if (*(bulk->offp) == 0)
		bulk->lsn = *lsn;

	if (rep->version < DB_REPVERSION_47) {
		/* Pre-4.7 peers expect the raw, unmarshaled layout. */
		len = 0;
		memcpy(p, &dbt->size, sizeof(dbt->size));
		p += sizeof(dbt->size);
		memcpy(p, lsn, sizeof(DB_LSN));
		p += sizeof(DB_LSN);
		memcpy(p, dbt->data, dbt->size);
		p += dbt->size;
	} else if ((ret = __rep_bulk_marshal(env, &b_args, p,
	    (size_t)(bulk->len - *(bulk->offp)), &len)) != 0)
		/*
		 * The limit handed to the marshaller is the space remaining
		 * after the current offset, not the whole buffer, so its own
		 * overflow check is measured against the right bound.
		 */
		goto err;

	*(bulk->offp) = (roff_t)(p + len - bulk->addr);
	STAT(rep->stat.st_bulk_records++);

	/* Permanent records must not linger in the buffer. */
	if (LF_ISSET(REPCTL_PERM)) {
		VPRINT(env, (env, DB_VERB_REP_MSGS,
		    "bulk_msg: Send buffer after copy due to PERM"));
		ret = __rep_send_bulk(env, bulk, flags);
	}
err:
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	return (ret);
}
/*
 * __rep_send_bulk --
 *	Transmit the contents of a bulk buffer, if it holds anything.
 *
 *	Must be entered with rep->mtx_clientdb held.  The mutex is released
 *	while the message is sent and re-acquired before returning.  While
 *	the send is in progress BULK_XMIT is set in *flagsp, which makes
 *	concurrent __rep_bulk_message callers return DB_REP_BULKOVF.  On
 *	return the buffer offset is reset to 0 and BULK_XMIT is cleared,
 *	whether or not the send succeeded.  Any send failure is reported as
 *	DB_REP_UNAVAIL.
 */
int
__rep_send_bulk(env, bulkp, ctlflags)
ENV *env;
REP_BULK *bulkp;
u_int32_t ctlflags;
{
DBT dbt;
DB_REP *db_rep;
REP *rep;
int ret;
/* Empty buffer: nothing to send. */
if (*(bulkp->offp) == 0)
return (0);
db_rep = env->rep_handle;
rep = db_rep->region;
/* Mark the buffer busy before dropping the mutex. */
FLD_SET(*(bulkp->flagsp), BULK_XMIT);
DB_INIT_DBT(dbt, bulkp->addr, *(bulkp->offp));
MUTEX_UNLOCK(env, rep->mtx_clientdb);
VPRINT(env, (env, DB_VERB_REP_MSGS,
"send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size));
STAT(rep->stat.st_bulk_transfers++);
if ((ret = __rep_send_message(env,
bulkp->eid, bulkp->type, &bulkp->lsn, &dbt, ctlflags, 0)) != 0)
ret = DB_REP_UNAVAIL;
MUTEX_LOCK(env, rep->mtx_clientdb);
/* Ready the buffer for further records. */
*(bulkp->offp) = 0;
FLD_CLR(*(bulkp->flagsp), BULK_XMIT);
return (ret);
}
/*
 * __rep_bulk_alloc --
 *	Prepare a bulk-transfer descriptor: clear it, reset the caller's
 *	offset and flag words to zero, allocate a MEGABYTE-sized buffer and,
 *	if that succeeds, record the target eid, message type and the
 *	caller's offset/flag words in the descriptor.
 *
 *	Returns 0, or the error from __os_malloc (in which case the
 *	descriptor's offp/type/eid/flagsp fields are left zeroed).
 */
int
__rep_bulk_alloc(env, bulkp, eid, offp, flagsp, type)
	ENV *env;
	REP_BULK *bulkp;
	int eid;
	uintptr_t *offp;
	u_int32_t *flagsp, type;
{
	int status;

	memset(bulkp, 0, sizeof(REP_BULK));
	*flagsp = 0;
	*offp = 0;

	bulkp->len = MEGABYTE;
	status = __os_malloc(env, bulkp->len, &bulkp->addr);
	if (status == 0) {
		/* The caller's offset word is accessed as a roff_t. */
		bulkp->offp = (roff_t *)offp;
		bulkp->type = type;
		bulkp->eid = eid;
		bulkp->flagsp = flagsp;
	}
	return (status);
}
int
__rep_bulk_free(env, bulkp, flags)
ENV *env;
REP_BULK *bulkp;
u_int32_t flags;
{
DB_REP *db_rep;
int ret;
db_rep = env->rep_handle;
MUTEX_LOCK(env, db_rep->region->mtx_clientdb);
ret = __rep_send_bulk(env, bulkp, flags);
MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb);
__os_free(env, bulkp->addr);
return (ret);
}
/*
 * __rep_send_message --
 *	Build a replication control record for message type "rtype" and hand
 *	it, with payload "dbt" (may be NULL), to the application's transport
 *	callback (db_rep->send) for site "eid".
 *
 *	The control record is encoded for the protocol version in
 *	rep->version: the old (<= 4.5) struct, the 4.6 struct, or the
 *	marshaled current format.  For older versions the message type is
 *	translated with __rep_msg_to_old; if there is no equivalent
 *	(REP_INVALID) nothing is sent and 0 is returned.  An unknown, newer
 *	rep->version panics the environment.
 *
 *	Returns the transport callback's result (0 on success).
 */
int
__rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags)
ENV *env;
int eid;
u_int32_t rtype;
DB_LSN *lsnp;
const DBT *dbt;
u_int32_t ctlflags, repflags;
{
DBT cdbt, scrap_dbt;
DB_ENV *dbenv;
DB_LOG *dblp;
DB_REP *db_rep;
LOG *lp;
REP *rep;
REP_46_CONTROL cntrl46;
REP_OLD_CONTROL ocntrl;
__rep_control_args cntrl;
db_timespec msg_time;
int ret;
u_int32_t myflags;
u_int8_t buf[__REP_CONTROL_SIZE];
size_t len;
dbenv = env->dbenv;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
ret = 0;
/* In these debug builds a missing transport callback makes this a no-op. */
#if defined(DEBUG_ROP) || defined(DEBUG_WOP)
if (db_rep->send == NULL)
return (0);
#endif
memset(&cntrl, 0, sizeof(cntrl));
memset(&ocntrl, 0, sizeof(ocntrl));
memset(&cntrl46, 0, sizeof(cntrl46));
if (lsnp == NULL)
ZERO_LSN(cntrl.lsn);
else
cntrl.lsn = *lsnp;
/* Translate the message type for older protocol versions. */
if (rep->version == DB_REPVERSION)
cntrl.rectype = rtype;
else if (rep->version < DB_REPVERSION) {
cntrl.rectype = __rep_msg_to_old(rep->version, rtype);
VPRINT(env, (env, DB_VERB_REP_MSGS,
"rep_send_msg: rtype %lu to version %lu record %lu.",
(u_long)rtype, (u_long)rep->version,
(u_long)cntrl.rectype));
/* No equivalent message in that version: silently skip (ret is 0). */
if (cntrl.rectype == REP_INVALID)
return (ret);
} else {
__db_errx(env, DB_STR_A("3503",
"rep_send_message: Unknown rep version %lu, my version %lu",
"%lu %lu"), (u_long)rep->version, (u_long)DB_REPVERSION);
return (__env_panic(env, EINVAL));
}
cntrl.flags = ctlflags;
cntrl.rep_version = rep->version;
cntrl.log_version = lp->persist.version;
cntrl.gen = rep->gen;
/* Always hand the transport a valid (possibly empty) payload DBT. */
if (dbt == NULL) {
memset(&scrap_dbt, 0, sizeof(DBT));
dbt = &scrap_dbt;
}
/*
 * Transport flags: PERM messages request DB_REP_PERMANENT unless this
 * is a system-database operation (REP_F_SYS_DB_OP); every non-PERM
 * message other than a first-time REP_LOG gets DB_REP_NOBUFFER.
 */
myflags = repflags;
if (FLD_ISSET(ctlflags, REPCTL_PERM)) {
if (!F_ISSET(rep, REP_F_SYS_DB_OP))
myflags |= DB_REP_PERMANENT;
} else if (rtype != REP_LOG || FLD_ISSET(ctlflags, REPCTL_RESEND))
myflags |= DB_REP_NOBUFFER;
if (F_ISSET(rep, REP_F_GROUP_ESTD))
F_SET(&cntrl, REPCTL_GROUP_ESTD);
/* A master using leases timestamps its PERM and LEASE messages. */
if (IS_REP_MASTER(env) && IS_USING_LEASES(env) &&
FLD_ISSET(ctlflags, REPCTL_LEASE | REPCTL_PERM)) {
F_SET(&cntrl, REPCTL_LEASE);
DB_ASSERT(env, rep->version == DB_REPVERSION);
__os_gettime(env, &msg_time, 1);
cntrl.msg_sec = (u_int32_t)msg_time.tv_sec;
cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec;
}
REP_PRINT_MESSAGE(env, eid, &cntrl, "rep_send_message", myflags);
#ifdef REP_DIAGNOSTIC
if (FLD_ISSET(
env->dbenv->verbose, DB_VERB_REP_MSGS) && rtype == REP_LOG)
__rep_print_logmsg(env, dbt, lsnp);
#endif
DB_ASSERT(env, !FLD_ISSET(myflags, DB_REP_PERMANENT) ||
!IS_ZERO_LSN(cntrl.lsn));
/* Encode the control record in the layout the peer's version expects. */
memset(&cdbt, 0, sizeof(cdbt));
if (rep->version <= DB_REPVERSION_45) {
if (rep->version == DB_REPVERSION_45 &&
F_ISSET(&cntrl, REPCTL_INIT)) {
F_CLR(&cntrl, REPCTL_INIT);
F_SET(&cntrl, REPCTL_INIT_45);
}
ocntrl.rep_version = cntrl.rep_version;
ocntrl.log_version = cntrl.log_version;
ocntrl.lsn = cntrl.lsn;
ocntrl.rectype = cntrl.rectype;
ocntrl.gen = cntrl.gen;
ocntrl.flags = cntrl.flags;
cdbt.data = &ocntrl;
cdbt.size = sizeof(ocntrl);
} else if (rep->version == DB_REPVERSION_46) {
cntrl46.rep_version = cntrl.rep_version;
cntrl46.log_version = cntrl.log_version;
cntrl46.lsn = cntrl.lsn;
cntrl46.rectype = cntrl.rectype;
cntrl46.gen = cntrl.gen;
cntrl46.msg_time.tv_sec = (time_t)cntrl.msg_sec;
cntrl46.msg_time.tv_nsec = (long)cntrl.msg_nsec;
cntrl46.flags = cntrl.flags;
cdbt.data = &cntrl46;
cdbt.size = sizeof(cntrl46);
} else {
/* buf is exactly __REP_CONTROL_SIZE bytes; the result is ignored. */
(void)__rep_control_marshal(env, &cntrl, buf,
__REP_CONTROL_SIZE, &len);
DB_INIT_DBT(cdbt, buf, len);
}
ret = db_rep->send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags);
/* Statistics (when compiled in) count sent vs. failed messages. */
if (ret != 0) {
RPRINT(env, (env, DB_VERB_REP_MSGS,
"rep_send_function returned: %d", ret));
#ifdef HAVE_STATISTICS
rep->stat.st_msgs_send_failures++;
} else
rep->stat.st_msgs_sent++;
#else
}
#endif
return (ret);
}
#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *	Diagnostic helper: print the log record in "logdbt" at "lsnp" by
 *	dispatching it with DB_TXN_PRINT.  The print dispatch table is
 *	populated on first use and cached in function-static storage.
 *
 *	NOTE(review): the one-time initialization is not synchronized.
 */
static void
__rep_print_logmsg(env, logdbt, lsnp)
	ENV *env;
	const DBT *logdbt;
	DB_LSN *lsnp;
{
	static DB_DISTAB dtab;
	static int initialized = 0;

	if (!initialized) {
		initialized = 1;
		(void)__bam_init_print(env, &dtab);
		(void)__crdel_init_print(env, &dtab);
		(void)__db_init_print(env, &dtab);
		(void)__dbreg_init_print(env, &dtab);
		(void)__fop_init_print(env, &dtab);
		(void)__ham_init_print(env, &dtab);
		(void)__qam_init_print(env, &dtab);
		(void)__repmgr_init_print(env, &dtab);
		(void)__txn_init_print(env, &dtab);
	}

	(void)__db_dispatch(env,
	    &dtab, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
}
#endif
/*
 * __rep_new_master --
 *	Process a control record ("cntrl", from site "eid") naming the
 *	current master.
 *
 *	If the generation or master id differs from what is recorded
 *	("change"), this site locks out message processing, waits out any
 *	remaining lease time, aligns its log version with the master's,
 *	cleans up any in-progress internal init, truncates db_rep->rep_db,
 *	and adopts the new gen/master/rep version.  It then starts
 *	synchronizing: via __rep_newmaster_empty when the log is empty or no
 *	usable record is found (the log is truncated first in the latter
 *	case), otherwise by sending REP_VERIFY_REQ for the record located by
 *	__rep_log_backup(..., REP_REC_PERM), unless REP_F_DELAY is set.
 *	These paths return DB_REP_NEWMASTER on success.
 *
 *	If nothing changed, it only re-requests records when
 *	__rep_check_doreq allows it, clears the archive lockout when
 *	sync_state is SYNC_OFF, and returns 0 or the resend error.
 *
 *	If another thread already holds REP_LOCKOUT_MSG, returns 0 at once.
 *	The err/errlck paths clear REP_LOCKOUT_MSG (only if this call set
 *	it), REP_F_DELAY and the recovery settings.
 */
int
__rep_new_master(env, cntrl, eid)
ENV *env;
__rep_control_args *cntrl;
int eid;
{
DBT dbt;
DB_LOG *dblp;
DB_LOGC *logc;
DB_LSN first_lsn, lsn;
DB_REP *db_rep;
DB_THREAD_INFO *ip;
LOG *lp;
REGENV *renv;
REGINFO *infop;
REP *rep;
db_timeout_t lease_to;
u_int32_t unused, vers;
int change, do_req, lockout_msg, ret, t_ret;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
ret = 0;
logc = NULL;
/* Non-zero while this call owns REP_LOCKOUT_MSG. */
lockout_msg = 0;
REP_SYSTEM_LOCK(env);
change = rep->gen != cntrl->gen || rep->master_id != eid;
/* Any word from a master ends election phase 0. */
FLD_CLR(rep->elect_flags, REP_E_PHASE0);
if (change) {
/* Another thread already has messages locked out; leave it alone. */
if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG))
goto lckout;
/* Wait until only this message thread remains. */
if ((ret = __rep_lockout_msg(env, rep, 1)) != 0)
goto errlck;
(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
lockout_msg = 1;
/* Wait out any remaining lease time before accepting a new master. */
if (IS_USING_LEASES(env) &&
((lease_to = __rep_lease_waittime(env)) != 0)) {
REP_SYSTEM_UNLOCK(env);
__os_yield(env, 0, (u_long)lease_to);
REP_SYSTEM_LOCK(env);
F_SET(rep, REP_F_LEASE_EXPIRED);
}
/* Use the lower of our log version and the master's. */
vers = lp->persist.version;
if (cntrl->log_version != vers) {
DB_ASSERT(env, vers != 0);
if (cntrl->log_version < vers)
vers = cntrl->log_version;
RPRINT(env, (env, DB_VERB_REP_MISC,
"newmaster: Setting log version to %d",vers));
__log_set_version(env, vers);
if ((ret = __env_init_rec(env, vers)) != 0)
goto errlck;
}
REP_SYSTEM_UNLOCK(env);
/* Reset client log-request state. */
MUTEX_LOCK(env, rep->mtx_clientdb);
__os_gettime(env, &lp->rcvd_ts, 1);
lp->wait_ts = rep->request_gap;
ZERO_LSN(lp->verify_lsn);
ZERO_LSN(lp->prev_ckp);
ZERO_LSN(lp->waiting_lsn);
ZERO_LSN(lp->max_wait_lsn);
/* Open rep_db if needed so it can be truncated below. */
if (db_rep->rep_db == NULL &&
(ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
goto err;
}
REP_SYSTEM_LOCK(env);
/* Abandon any internal init that was in progress for the old master. */
if (ISSET_LOCKOUT_BDB(rep)) {
ret = __rep_init_cleanup(env, rep, DB_FORCE);
F_CLR(rep, REP_F_ABBREVIATED);
CLR_RECOVERY_SETTINGS(rep);
}
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (ret != 0) {
goto errlck;
}
ENV_GET_THREAD_INFO(env, ip);
if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused))
!= 0)
goto errlck;
STAT(rep->stat.st_log_queued = 0);
/* Adopt the new master's generation and identity. */
__rep_elect_done(env, rep);
RPRINT(env, (env, DB_VERB_REP_MISC,
"Updating gen from %lu to %lu from master %d",
(u_long)rep->gen, (u_long)cntrl->gen, eid));
SET_GEN(cntrl->gen);
rep->mgen = cntrl->gen;
if ((ret = __rep_notify_threads(env, AWAIT_GEN)) != 0)
goto errlck;
(void)__rep_write_gen(env, rep, rep->gen);
if (rep->egen <= rep->gen)
rep->egen = rep->gen + 1;
rep->master_id = eid;
STAT(rep->stat.st_master_changes++);
rep->stat.st_startup_complete = 0;
rep->version = cntrl->rep_version;
RPRINT(env, (env, DB_VERB_REP_MISC,
"egen: %lu. rep version %lu",
(u_long)rep->egen, (u_long)rep->version));
/* With delayed client sync configured, mark sync-up as delayed. */
if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT))
F_SET(rep, REP_F_DELAY);
if ((ret = __rep_lockout_archive(env, rep)) != 0)
goto errlck;
rep->sync_state = SYNC_VERIFY;
/* Release the message lockout taken above. */
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
lockout_msg = 0;
} else
__rep_elect_done(env, rep);
REP_SYSTEM_UNLOCK(env);
MUTEX_LOCK(env, rep->mtx_clientdb);
lsn = lp->ready_lsn;
/* Same master: at most re-request records we are still missing. */
if (!change) {
ret = 0;
do_req = __rep_check_doreq(env, rep);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (do_req &&
(rep->sync_state != SYNC_OFF ||
LOG_COMPARE(&lsn, &cntrl->lsn) < 0)) {
ret = __rep_resend_req(env, 0);
if (ret != 0)
RPRINT(env, (env, DB_VERB_REP_MISC,
"resend_req ret is %lu", (u_long)ret));
}
if (rep->sync_state == SYNC_OFF) {
REP_SYSTEM_LOCK(env);
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE);
REP_SYSTEM_UNLOCK(env);
}
return (ret);
}
MUTEX_UNLOCK(env, rep->mtx_clientdb);
/* Empty log: nothing to verify, go straight to update. */
if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
if ((ret = __rep_newmaster_empty(env, eid)) != 0)
goto err;
goto newmaster_complete;
}
memset(&dbt, 0, sizeof(dbt));
/*
 * When the master's LSN is in an earlier log file than ours, check
 * that our log still reaches back that far; otherwise start over.
 */
if (cntrl->lsn.file < lsn.file) {
if ((ret = __log_cursor(env, &logc)) != 0)
goto err;
ret = __logc_get(logc, &first_lsn, &dbt, DB_FIRST);
if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (ret == DB_NOTFOUND)
goto notfound;
else if (ret != 0)
goto err;
if (cntrl->lsn.file < first_lsn.file)
goto notfound;
}
/* Find the record to ask the master to verify. */
if ((ret = __log_cursor(env, &logc)) != 0)
goto err;
ret = __rep_log_backup(env, logc, &lsn, REP_REC_PERM);
if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (ret == DB_NOTFOUND)
goto notfound;
else if (ret != 0)
goto err;
MUTEX_LOCK(env, rep->mtx_clientdb);
lp->verify_lsn = lsn;
__os_gettime(env, &lp->rcvd_ts, 1);
lp->wait_ts = rep->request_gap;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (!F_ISSET(rep, REP_F_DELAY))
(void)__rep_send_message(env,
eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE);
goto newmaster_complete;
/* Error unwinding; "errlck" is entered with REP_SYSTEM_LOCK held. */
err:
REP_SYSTEM_LOCK(env);
errlck: if (lockout_msg) {
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
(void)__memp_set_config(env->dbenv, DB_MEMP_SYNC_INTERRUPT, 0);
}
F_CLR(rep, REP_F_DELAY);
CLR_RECOVERY_SETTINGS(rep);
lckout: REP_SYSTEM_UNLOCK(env);
return (ret);
/*
 * No usable record: discard our log (zero it when in memory, truncate
 * to the initial LSN otherwise) and treat it as empty.
 * NOTE(review): a truncate failure returns directly, skipping the
 * REP_F_DELAY / recovery-settings cleanup done on the err path.
 */
notfound:
RPRINT(env, (env, DB_VERB_REP_MISC,
"No commit or ckp found. Truncate log."));
if (lp->db_log_inmemory) {
ZERO_LSN(lsn);
ret = __log_zero(env, &lsn);
} else {
INIT_LSN(lsn);
ret = __log_vtruncate(env, &lsn, &lsn, NULL);
}
if (ret != 0 && ret != DB_NOTFOUND)
return (ret);
infop = env->reginfo;
renv = infop->primary;
REP_SYSTEM_LOCK(env);
(void)time(&renv->rep_timestamp);
REP_SYSTEM_UNLOCK(env);
if ((ret = __rep_newmaster_empty(env, eid)) != 0)
goto err;
newmaster_complete:
return (DB_REP_NEWMASTER);
}
/*
 * __rep_newmaster_empty --
 *	Start synchronizing with master "eid" when there is no local log to
 *	verify.  The sync state moves to SYNC_UPDATE and the request timer is
 *	reset.  Then:
 *	  - REP_F_DELAY set: nothing further now;
 *	  - REP_C_AUTOINIT configured: send REP_UPDATE_REQ to the master
 *	    (after releasing the locks);
 *	  - otherwise: drop the archive lockout and recovery settings and
 *	    return DB_REP_JOIN_FAILURE.
 *	Returns 0 in the first two cases.
 */
static int
__rep_newmaster_empty(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	int ret, send_update_req;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	lp = env->lg_handle->reginfo.primary;
	ret = 0;
	send_update_req = 0;

	MUTEX_LOCK(env, rep->mtx_clientdb);
	REP_SYSTEM_LOCK(env);
	lp->wait_ts = rep->request_gap;
	rep->sync_state = SYNC_UPDATE;
	if (!F_ISSET(rep, REP_F_DELAY)) {
		if (FLD_ISSET(rep->config, REP_C_AUTOINIT))
			send_update_req = 1;
		else {
			FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE);
			CLR_RECOVERY_SETTINGS(rep);
			ret = DB_REP_JOIN_FAILURE;
		}
	}
	REP_SYSTEM_UNLOCK(env);
	MUTEX_UNLOCK(env, rep->mtx_clientdb);

	/* Send outside both locks. */
	if (send_update_req)
		(void)__rep_send_message(env, eid, REP_UPDATE_REQ,
		    NULL, NULL, 0, 0);
	return (ret);
}
void
__rep_elect_done(env, rep)
ENV *env;
REP *rep;
{
int inelect;
db_timespec endtime;
inelect = IN_ELECTION(rep);
FLD_CLR(rep->elect_flags, REP_E_PHASE1 | REP_E_PHASE2 | REP_E_TALLY);
rep->sites = 0;
rep->votes = 0;
if (inelect) {
if (timespecisset(&rep->etime)) {
__os_gettime(env, &endtime, 1);
timespecsub(&endtime, &rep->etime);
#ifdef HAVE_STATISTICS
rep->stat.st_election_sec = (u_int32_t)endtime.tv_sec;
rep->stat.st_election_usec = (u_int32_t)
(endtime.tv_nsec / NS_PER_US);
#endif
RPRINT(env, (env, DB_VERB_REP_ELECT,
"Election finished in %lu.%09lu sec",
(u_long)endtime.tv_sec, (u_long)endtime.tv_nsec));
timespecclear(&rep->etime);
}
rep->egen++;
}
RPRINT(env, (env, DB_VERB_REP_ELECT,
"Election done; egen %lu", (u_long)rep->egen));
}
/*
 * __env_rep_enter --
 *	Register an environment-handle operation with replication, waiting
 *	while the API is locked out (REP_LOCKOUT_API).
 *
 *	checklock: when non-zero and the region is marked
 *	DB_REGENV_REPLOCKED, first let a stale lock time out
 *	(TIMESTAMP_CHECK); return EINVAL if the region is still locked.
 *
 *	While locked out, re-checks once per second, reporting progress each
 *	minute.  With REP_C_NOWAIT configured, fails immediately with
 *	DB_REP_LOCKOUT instead of waiting.  On success increments
 *	rep->handle_cnt and returns 0.  A no-op when DB_ENV_NOLOCKING is set.
 */
int
__env_rep_enter(env, checklock)
	ENV *env;
	int checklock;
{
	DB_REP *db_rep;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int cnt, ret;
	time_t timestamp;

	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	infop = env->reginfo;
	renv = infop->primary;

	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Was "×tamp": an HTML-entity-mangled "&timestamp". */
		(void)time(&timestamp);
		TIMESTAMP_CHECK(env, timestamp, renv);
		/* Still locked after the timeout check: refuse. */
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	REP_SYSTEM_LOCK(env);
	for (cnt = 0; FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_API);) {
		REP_SYSTEM_UNLOCK(env);
		/* We may be spinning on a hung environment. */
		PANIC_CHECK(env);
		if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
			__db_errx(env, DB_STR("3504",
"Operation locked out. Waiting for replication lockout to complete"));
			return (DB_REP_LOCKOUT);
		}
		/* One-second yield, so cnt counts seconds waited. */
		__os_yield(env, 1, 0);
		if (++cnt % 60 == 0 &&
		    (ret = __rep_show_progress(env,
		    DB_STR_P("DB_ENV handle"), cnt / 60)) != 0)
			return (ret);
		REP_SYSTEM_LOCK(env);
	}
	rep->handle_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __rep_show_progress --
 *	Report that "which" has been waiting "mins" minutes for a replication
 *	lockout to complete.  The message goes to both __db_errx and the
 *	DB_VERB_REP_SYNC verbose channel, followed by sync-state detail: file
 *	and page progress for SYNC_PAGE, LSN range (and, with statistics,
 *	queued record count) for SYNC_LOG, just the state number otherwise.
 *	Always returns 0.
 *
 *	NOTE(review): the WAITING_*, PAGE_*, LSN_ARG, LOG_* macros defined
 *	inside this function remain defined for the rest of the file.
 */
static int
__rep_show_progress(env, which, mins)
ENV *env;
const char *which;
int mins;
{
DB_LOG *dblp;
LOG *lp;
REP *rep;
DB_LSN ready_lsn;
rep = env->rep_handle->region;
dblp = env->lg_handle;
/* Logging may not be configured; then there is no ready_lsn to show. */
lp = dblp == NULL ? NULL : dblp->reginfo.primary;
#define WAITING_MSG DB_STR_A("3505", \
"%s waiting %d minutes for replication lockout to complete", "%s %d")
#define WAITING_ARGS WAITING_MSG, which, mins
__db_errx(env, WAITING_ARGS);
RPRINT(env, (env, DB_VERB_REP_SYNC, WAITING_ARGS));
if (lp == NULL)
ZERO_LSN(ready_lsn);
else {
MUTEX_LOCK(env, rep->mtx_clientdb);
ready_lsn = lp->ready_lsn;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
}
REP_SYSTEM_LOCK(env);
switch (rep->sync_state) {
case SYNC_PAGE:
#define PAGE_MSG DB_STR_A("3506", \
"SYNC_PAGE: files %lu/%lu; pages %lu (%lu next)", "%lu %lu %lu %lu")
#define PAGE_ARGS (u_long)rep->curfile, (u_long)rep->nfiles, \
(u_long)rep->npages, (u_long)rep->ready_pg
__db_errx(env, PAGE_MSG, PAGE_ARGS);
RPRINT(env, (env, DB_VERB_REP_SYNC, PAGE_MSG, PAGE_ARGS));
break;
case SYNC_LOG:
#define LSN_ARG(lsn) (u_long)(lsn).file, (u_long)(lsn).offset
#define LOG_LSN_ARGS LSN_ARG(ready_lsn), \
LSN_ARG(rep->first_lsn), LSN_ARG(rep->last_lsn)
#ifdef HAVE_STATISTICS
#define LOG_MSG DB_STR_A("3507", \
"SYNC_LOG: thru [%lu][%lu] from [%lu][%lu]/[%lu][%lu] (%lu queued)",\
"%lu %lu %lu %lu %lu %lu %lu")
#define LOG_ARGS LOG_LSN_ARGS, (u_long)rep->stat.st_log_queued
#else
#define LOG_MSG DB_STR_A("3508", \
"SYNC_LOG: thru [%lu][%lu] from [%lu][%lu]/[%lu][%lu]", \
"%lu %lu %lu %lu %lu %lu")
#define LOG_ARGS LOG_LSN_ARGS
#endif
__db_errx(env, LOG_MSG, LOG_ARGS);
RPRINT(env, (env, DB_VERB_REP_SYNC, LOG_MSG, LOG_ARGS));
break;
default:
RPRINT(env, (env, DB_VERB_REP_SYNC,
"sync state %d", (int)rep->sync_state));
break;
}
REP_SYSTEM_UNLOCK(env);
return (0);
}
/*
 * __env_db_rep_exit --
 *	Undo a successful __env_rep_enter / __db_rep_enter by decrementing
 *	rep->handle_cnt under the replication system lock.  A no-op when
 *	DB_ENV_NOLOCKING is set.  Always returns 0.
 */
int
__env_db_rep_exit(env)
	ENV *env;
{
	REP *rep;

	if (!F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) {
		rep = env->rep_handle->region;
		REP_SYSTEM_LOCK(env);
		rep->handle_cnt--;
		REP_SYSTEM_UNLOCK(env);
	}
	return (0);
}
/*
 * __db_rep_enter --
 *	Register a DB-handle operation with replication.
 *
 *	checklock: when non-zero and the region is DB_REGENV_REPLOCKED, let
 *	a stale lock time out first; return EINVAL if still locked.
 *	checkgen: when non-zero, fail with DB_REP_HANDLE_DEAD if, on a client,
 *	the file is under exclusive lockout, or if the handle's timestamp no
 *	longer matches the region's rep_timestamp.
 *	return_now: when zero, sleep 5 seconds before returning
 *	DB_LOCK_DEADLOCK because REP_LOCKOUT_OP is set.
 *
 *	REP_LOCKOUT_OP (not REP_LOCKOUT_API) is tested even though
 *	handle_cnt is incremented; returning DB_LOCK_DEADLOCK lets callers
 *	abort their transactions so a pending lockout can proceed.  On
 *	success increments rep->handle_cnt and returns 0.  A no-op when
 *	DB_ENV_NOLOCKING is set.
 */
int
__db_rep_enter(dbp, checkgen, checklock, return_now)
	DB *dbp;
	int checkgen, checklock, return_now;
{
	DB_REP *db_rep;
	ENV *env;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	time_t timestamp;

	env = dbp->env;
	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	infop = env->reginfo;
	renv = infop->primary;

	if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Was "×tamp": an HTML-entity-mangled "&timestamp". */
		(void)time(&timestamp);
		TIMESTAMP_CHECK(env, timestamp, renv);
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (EINVAL);
	}

	if (checkgen && dbp->mpf->mfp && IS_REP_CLIENT(env)) {
		if (dbp->mpf->mfp->excl_lockout)
			return (DB_REP_HANDLE_DEAD);
	}

	REP_SYSTEM_LOCK(env);
	if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_OP)) {
		REP_SYSTEM_UNLOCK(env);
		if (!return_now)
			__os_yield(env, 5, 0);
		return (DB_LOCK_DEADLOCK);
	}

	/* The handle predates a log truncation/new master: it is stale. */
	if (checkgen && dbp->timestamp != renv->rep_timestamp) {
		REP_SYSTEM_UNLOCK(env);
		return (DB_REP_HANDLE_DEAD);
	}
	rep->handle_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __op_handle_enter --
 *	Non-blocking handle registration: if REP_LOCKOUT_OP is set return
 *	DB_LOCK_DEADLOCK, otherwise increment rep->handle_cnt and return 0.
 */
int
__op_handle_enter(env)
	ENV *env;
{
	REP *rep;
	int result;

	rep = env->rep_handle->region;
	result = 0;

	REP_SYSTEM_LOCK(env);
	if (!FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_OP))
		rep->handle_cnt++;
	else
		result = DB_LOCK_DEADLOCK;
	REP_SYSTEM_UNLOCK(env);

	return (result);
}
/*
 * __op_rep_enter --
 *	Register a long-running operation with replication, waiting while
 *	REP_LOCKOUT_OP is set.
 *
 *	local_nowait: return DB_REP_LOCKOUT at once instead of waiting.
 *	obey_user: honor the user's REP_C_NOWAIT setting (also returns
 *	DB_REP_LOCKOUT, with an error message).
 *
 *	Otherwise re-checks every 5 seconds and reports progress once per
 *	minute of waiting.  On success increments rep->op_cnt and returns 0.
 *	A no-op when DB_ENV_NOLOCKING is set.
 */
int
__op_rep_enter(env, local_nowait, obey_user)
	ENV *env;
	int local_nowait, obey_user;
{
	DB_REP *db_rep;
	REP *rep;
	int cnt, ret;

	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;

	REP_SYSTEM_LOCK(env);
	/* cnt counts seconds spent waiting. */
	for (cnt = 0; FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_OP);) {
		REP_SYSTEM_UNLOCK(env);
		/* We may be spinning on a hung environment. */
		PANIC_CHECK(env);
		if (local_nowait)
			return (DB_REP_LOCKOUT);
		if (FLD_ISSET(rep->config, REP_C_NOWAIT) && obey_user) {
			__db_errx(env, DB_STR("3509",
"Operation locked out. Waiting for replication lockout to complete"));
			return (DB_REP_LOCKOUT);
		}
		__os_yield(env, 5, 0);
		/*
		 * Account for exactly the 5 seconds just slept.  The counter
		 * used to be bumped by 6 per pass ("cnt += 5" plus "++cnt"),
		 * so progress fired every ~50s with a wrong minute count.
		 */
		cnt += 5;
		if (cnt % 60 == 0 &&
		    (ret = __rep_show_progress(env,
		    "__op_rep_enter", cnt / 60)) != 0)
			return (ret);
		REP_SYSTEM_LOCK(env);
	}
	rep->op_cnt++;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __op_rep_exit --
 *	Undo a successful __op_rep_enter: decrement rep->op_cnt (asserted
 *	positive) under the replication system lock.  A no-op when
 *	DB_ENV_NOLOCKING is set.  Always returns 0.
 */
int
__op_rep_exit(env)
	ENV *env;
{
	REP *rep;

	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
		return (0);

	rep = env->rep_handle->region;

	REP_SYSTEM_LOCK(env);
	DB_ASSERT(env, rep->op_cnt > 0);
	--rep->op_cnt;
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __archive_rep_enter --
 *	Register a log-archive operation with replication.
 *
 *	The region's DB_REGENV_REPLOCKED flag is honored even when
 *	replication is not running: after letting a stale lock time out,
 *	DB_REP_LOCKOUT is returned if it is still set.  With replication on,
 *	returns DB_REP_LOCKOUT while REP_LOCKOUT_ARCHIVE is set, otherwise
 *	increments rep->arch_th and returns 0.
 */
int
__archive_rep_enter(env)
	ENV *env;
{
	DB_REP *db_rep;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	time_t timestamp;
	int ret;

	ret = 0;
	infop = env->reginfo;
	renv = infop->primary;

	/* Checked before REP_ON: an archive lockout applies regardless. */
	if (F_ISSET(renv, DB_REGENV_REPLOCKED)) {
		/* Was "×tamp": an HTML-entity-mangled "&timestamp". */
		(void)time(&timestamp);
		TIMESTAMP_CHECK(env, timestamp, renv);
		if (F_ISSET(renv, DB_REGENV_REPLOCKED))
			return (DB_REP_LOCKOUT);
	}

	if (!REP_ON(env))
		return (0);

	db_rep = env->rep_handle;
	rep = db_rep->region;
	REP_SYSTEM_LOCK(env);
	if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_ARCHIVE))
		ret = DB_REP_LOCKOUT;
	else
		rep->arch_th++;
	REP_SYSTEM_UNLOCK(env);

	return (ret);
}
/*
 * __archive_rep_exit --
 *	Undo a successful __archive_rep_enter by decrementing rep->arch_th
 *	under the replication system lock.  A no-op when replication is not
 *	running.  Always returns 0.
 */
int
__archive_rep_exit(env)
	ENV *env;
{
	REP *rep;

	if (REP_ON(env)) {
		rep = env->rep_handle->region;
		REP_SYSTEM_LOCK(env);
		rep->arch_th--;
		REP_SYSTEM_UNLOCK(env);
	}
	return (0);
}
/*
 * __rep_lockout_archive --
 *	Set REP_LOCKOUT_ARCHIVE and wait until no archive threads
 *	(rep->arch_th) remain.  Entered with REP_SYSTEM_LOCK held.
 */
int
__rep_lockout_archive(env, rep)
	ENV *env;
	REP *rep;
{
	int ret;

	ret = __rep_lockout_int(env, rep,
	    &rep->arch_th, 0, "arch_th", REP_LOCKOUT_ARCHIVE);
	return (ret);
}
/*
 * __rep_lockout_api --
 *	Lock out the API in two stages: first drain long-running operations
 *	(REP_LOCKOUT_OP / op_cnt), then handle users (REP_LOCKOUT_API /
 *	handle_cnt).  If the second stage fails, the first stage's
 *	REP_LOCKOUT_OP flag is cleared again.  Entered with REP_SYSTEM_LOCK
 *	held.
 */
int
__rep_lockout_api(env, rep)
	ENV *env;
	REP *rep;
{
	int ret;

	ret = __rep_lockout_int(env, rep,
	    &rep->op_cnt, 0, "op_cnt", REP_LOCKOUT_OP);
	if (ret == 0) {
		ret = __rep_lockout_int(env, rep,
		    &rep->handle_cnt, 0, "handle_cnt", REP_LOCKOUT_API);
		if (ret != 0)
			FLD_CLR(rep->lockout_flags, REP_LOCKOUT_OP);
	}
	return (ret);
}
/*
 * __rep_take_apilockout --
 *	Acquire REP_SYSTEM_LOCK and run __rep_lockout_api, returning its
 *	result.
 */
int
__rep_take_apilockout(env)
	ENV *env;
{
	int status;

	REP_SYSTEM_LOCK(env);
	status = __rep_lockout_api(env, env->rep_handle->region);
	REP_SYSTEM_UNLOCK(env);

	return (status);
}
/*
 * __rep_clear_apilockout --
 *	Clear the BDB lockout flags (CLR_LOCKOUT_BDB) under REP_SYSTEM_LOCK.
 *	Always returns 0.
 */
int
__rep_clear_apilockout(env)
	ENV *env;
{
	REP *rp;

	rp = env->rep_handle->region;

	REP_SYSTEM_LOCK(env);
	CLR_LOCKOUT_BDB(rp);
	REP_SYSTEM_UNLOCK(env);

	return (0);
}
/*
 * __rep_lockout_apply --
 *	Set REP_LOCKOUT_APPLY and wait until at most "apply_th" apply threads
 *	remain.  Entered with REP_SYSTEM_LOCK held.
 */
int
__rep_lockout_apply(env, rep, apply_th)
	ENV *env;
	REP *rep;
	u_int32_t apply_th;
{
	int ret;

	ret = __rep_lockout_int(env, rep,
	    &rep->apply_th, apply_th, "apply_th", REP_LOCKOUT_APPLY);
	return (ret);
}
/*
 * __rep_lockout_msg --
 *	Set REP_LOCKOUT_MSG and wait until at most "msg_th" message threads
 *	remain.  Entered with REP_SYSTEM_LOCK held.
 */
int
__rep_lockout_msg(env, rep, msg_th)
	ENV *env;
	REP *rep;
	u_int32_t msg_th;
{
	int ret;

	ret = __rep_lockout_int(env, rep,
	    &rep->msg_th, msg_th, "msg_th", REP_LOCKOUT_MSG);
	return (ret);
}
/*
 * __rep_lockout_int --
 *	Set "lockout_flag" in rep->lockout_flags, then wait until the counter
 *	at *fieldp drops to "field_val" or below.
 *
 *	Must be entered with REP_SYSTEM_LOCK held; the lock is dropped around
 *	each one-second yield and held again on return.  Each pass first
 *	calls __rep_notify_threads(env, LOCKOUT); if that fails its error is
 *	returned immediately, with the lock still held and lockout_flag still
 *	set (callers such as __rep_lockout_api clear it themselves).
 *	"msg" names the counter in DIAGNOSTIC-build progress messages only.
 *	Returns 0 once the counter is low enough.
 */
static int
__rep_lockout_int(env, rep, fieldp, field_val, msg, lockout_flag)
ENV *env;
REP *rep;
u_int32_t *fieldp;
const char *msg;
u_int32_t field_val, lockout_flag;
{
int ret, wait_cnt;
FLD_SET(rep->lockout_flags, lockout_flag);
for (wait_cnt = 0; *fieldp > field_val;) {
if ((ret = __rep_notify_threads(env, LOCKOUT)) != 0)
return (ret);
REP_SYSTEM_UNLOCK(env);
/* We may be spinning on a hung environment. */
PANIC_CHECK(env);
__os_yield(env, 1, 0);
#ifdef DIAGNOSTIC
if (wait_cnt == 5) {
RPRINT(env, (env, DB_VERB_REP_MISC,
"Waiting for %s (%lu) to complete lockout to %lu",
msg, (u_long)*fieldp, (u_long)field_val));
__db_errx(env, DB_STR_A("3510",
"Waiting for %s (%lu) to complete replication lockout",
"%s %lu"), msg, (u_long)*fieldp);
}
if (++wait_cnt % 60 == 0)
__db_errx(env, DB_STR_A("3511",
"Waiting for %s (%lu) to complete replication lockout for %d minutes",
"%s %lu %d"), msg, (u_long)*fieldp, wait_cnt / 60);
#endif
REP_SYSTEM_LOCK(env);
}
/* msg is only consumed in DIAGNOSTIC builds. */
COMPQUIET(msg, NULL);
return (0);
}
/*
 * __rep_send_throttle --
 *	Charge the message in repth->data_dbt (plus control-record overhead)
 *	against the throttle budget and send it to "eid".
 *
 *	The budget is repth->bytes, refilled a GIGABYTE at a time from
 *	repth->gbytes.  When it is exhausted, repth->type is switched to the
 *	"more" variant (REP_LOG_MORE / REP_PAGE_MORE) and the message is sent
 *	with that type.  With REP_THROTTLE_ONLY in "flags" the message is
 *	sent only in that exhausted case (and nothing happens at all when no
 *	limit is configured); otherwise it is always sent, with REPCTL_RESEND
 *	added to "ctlflags".  repth->type must be REP_LOG or REP_PAGE on
 *	entry.  Returns DB_REP_UNAVAIL if the send fails, else 0.
 */
int
__rep_send_throttle(env, eid, repth, flags, ctlflags)
ENV *env;
int eid;
REP_THROTTLE *repth;
u_int32_t ctlflags, flags;
{
DB_REP *db_rep;
REP *rep;
u_int32_t size, typemore;
int check_limit;
check_limit = repth->gbytes != 0 || repth->bytes != 0;
/* Throttle-only call with no limit configured: nothing to do. */
if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY))
return (0);
db_rep = env->rep_handle;
rep = db_rep->region;
typemore = 0;
if (repth->type == REP_LOG)
typemore = REP_LOG_MORE;
if (repth->type == REP_PAGE)
typemore = REP_PAGE_MORE;
DB_ASSERT(env, typemore != 0);
/* Count the control record too, so small records are not undercounted. */
size = repth->data_dbt->size + sizeof(__rep_control_args);
if (check_limit) {
while (repth->bytes <= size) {
if (repth->gbytes > 0) {
repth->bytes += GIGABYTE;
--(repth->gbytes);
continue;
}
/* Budget exhausted: switch to the "more" type and send that. */
STAT(rep->stat.st_nthrottles++);
repth->type = typemore;
goto snd;
}
repth->bytes -= size;
}
/* The caller is responsible for marshaling data_dbt if needed. */
snd: if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) &&
(__rep_send_message(env, eid, repth->type,
&repth->lsn, repth->data_dbt, (REPCTL_RESEND | ctlflags), 0) != 0))
return (DB_REP_UNAVAIL);
return (0);
}
/*
 * __rep_msg_to_old --
 *	Translate message type "rectype", numbered for the current protocol,
 *	into the numbering used by the older replication protocol "version".
 *
 *	Returns REP_INVALID when the message has no equivalent in that
 *	version.  It also returns REP_INVALID when "version" or "rectype"
 *	lies outside the translation table, rather than reading past it.
 *
 *	Table rows are protocol versions: rows 0-2 are entirely REP_INVALID;
 *	row 3 has no equivalent for current types 10 and 24 and renumbers the
 *	remaining types down to 1..29; rows 4-6 are the identity mapping.
 */
u_int32_t
__rep_msg_to_old(version, rectype)
	u_int32_t version, rectype;
{
	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
	/* Version 0. */
	{ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Version 1. */
	{ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Version 2. */
	{ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Version 3: no equivalent for current types 10 and 24. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, REP_INVALID, 10, 11, 12, 13, 14,
	  15, 16, 17, 18, 19, 20, 21, 22,
	  REP_INVALID, 23, 24, 25, 26, 27, 28, 29 },
	/* Version 4: identity. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 10, 11, 12, 13, 14, 15,
	  16, 17, 18, 19, 20, 21, 22, 23,
	  24, 25, 26, 27, 28, 29, 30, 31 },
	/* Version 5: identity. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 10, 11, 12, 13, 14, 15,
	  16, 17, 18, 19, 20, 21, 22, 23,
	  24, 25, 26, 27, 28, 29, 30, 31 },
	/* Version 6: identity. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 10, 11, 12, 13, 14, 15,
	  16, 17, 18, 19, 20, 21, 22, 23,
	  24, 25, 26, 27, 28, 29, 30, 31 }
	};

	/* Reject out-of-range indices instead of reading past the table. */
	if (version >= DB_REPVERSION || rectype > REP_MAX_MSG)
		return (REP_INVALID);
	return (table[version][rectype]);
}
/*
 * __rep_msg_from_old --
 *	Inverse of __rep_msg_to_old: translate message type "rectype",
 *	numbered for the older replication protocol "version", into the
 *	current numbering.
 *
 *	Returns REP_INVALID when the old number has no current meaning.  It
 *	also returns REP_INVALID when "version" or "rectype" lies outside the
 *	translation table, rather than reading past it.  Callers such as
 *	__rep_print_message pass the version taken from a control record.
 *
 *	Table rows are protocol versions: rows 0-2 are entirely REP_INVALID;
 *	row 3 maps old 10-22 to 11-23 and old 23-29 to 25-31, and old 30 and
 *	31 are invalid; rows 4-6 are the identity mapping.
 */
u_int32_t
__rep_msg_from_old(version, rectype)
	u_int32_t version, rectype;
{
	static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = {
	/* Version 0. */
	{ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Version 1. */
	{ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Version 2. */
	{ REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID,
	  REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID },
	/* Version 3: skips current 10 and 24; old 30, 31 unused. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 11, 12, 13, 14, 15, 16,
	  17, 18, 19, 20, 21, 22, 23, 25,
	  26, 27, 28, 29, 30, 31, REP_INVALID, REP_INVALID },
	/* Version 4: identity. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 10, 11, 12, 13, 14, 15,
	  16, 17, 18, 19, 20, 21, 22, 23,
	  24, 25, 26, 27, 28, 29, 30, 31 },
	/* Version 5: identity. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 10, 11, 12, 13, 14, 15,
	  16, 17, 18, 19, 20, 21, 22, 23,
	  24, 25, 26, 27, 28, 29, 30, 31 },
	/* Version 6: identity. */
	{ REP_INVALID, 1, 2, 3, 4, 5, 6, 7,
	  8, 9, 10, 11, 12, 13, 14, 15,
	  16, 17, 18, 19, 20, 21, 22, 23,
	  24, 25, 26, 27, 28, 29, 30, 31 }
	};

	/* Reject out-of-range indices instead of reading past the table. */
	if (version >= DB_REPVERSION || rectype > REP_MAX_MSG)
		return (REP_INVALID);
	return (table[version][rectype]);
}
/*
 * __rep_print_system --
 *	printf-style replication verbose message that is also eligible for
 *	the system diagnostic log: DB_VERB_REP_SYSTEM is OR'd into "verbose"
 *	before formatting via __rep_print_int.  Returns that function's
 *	result.
 */
int
#ifdef STDC_HEADERS
__rep_print_system(ENV *env, u_int32_t verbose, const char *fmt, ...)
#else
__rep_print_system(env, verbose, fmt, va_alist)
ENV *env;
u_int32_t verbose;
const char *fmt;
va_dcl
#endif
{
va_list ap;
int ret;
/* ANSI and pre-ANSI (varargs.h) variable-argument setup. */
#ifdef STDC_HEADERS
va_start(ap, fmt);
#else
va_start(ap);
#endif
ret = __rep_print_int(env, verbose | DB_VERB_REP_SYSTEM, fmt, ap);
va_end(ap);
return (ret);
}
/*
 * __rep_print --
 *	printf-style replication verbose message, emitted only when the
 *	environment's verbose settings select one of the "verbose" categories
 *	(or DB_VERB_REPLICATION).  Formatting is done by __rep_print_int,
 *	whose result is returned.
 */
int
#ifdef STDC_HEADERS
__rep_print(ENV *env, u_int32_t verbose, const char *fmt, ...)
#else
__rep_print(env, verbose, fmt, va_alist)
ENV *env;
u_int32_t verbose;
const char *fmt;
va_dcl
#endif
{
va_list ap;
int ret;
/* ANSI and pre-ANSI (varargs.h) variable-argument setup. */
#ifdef STDC_HEADERS
va_start(ap, fmt);
#else
va_start(ap);
#endif
ret = __rep_print_int(env, verbose, fmt, ap);
va_end(ap);
return (ret);
}
/*
 * __rep_print_int --
 *	Format and emit a replication verbose message.
 *
 *	Two destinations are decided independently:
 *	  diag_msg    -- DB_VERB_REP_SYSTEM is set both in the environment's
 *			 verbose flags and in "verbose", replication is on,
 *			 and the region is not configured REP_C_INMEM;
 *	  regular_msg -- the environment's verbose flags, ignoring
 *			 DB_VERB_REP_SYSTEM, intersect "verbose" or
 *			 DB_VERB_REPLICATION.
 *	The message is prefixed "[sec:usec][thread-id] ROLE: ", where ROLE is
 *	the error prefix if set, else CLIENT/MASTER/REP_UNDEF, and is handed
 *	to DB_MSGBUF_REP_FLUSH along with both flags.  Diagnostic output is
 *	serialized with rep->mtx_diag.  Always returns 0.
 */
static int
__rep_print_int(env, verbose, fmt, ap)
ENV *env;
u_int32_t verbose;
const char *fmt;
va_list ap;
{
DB_MSGBUF mb;
REP *rep;
db_timespec ts;
pid_t pid;
db_threadid_t tid;
int diag_msg;
u_int32_t regular_msg, tmp_verbose;
const char *s;
char buf[DB_THREADID_STRLEN];
tmp_verbose = env->dbenv->verbose;
/* Cheap early exit when no requested category is enabled. */
if (FLD_ISSET(tmp_verbose, verbose | DB_VERB_REPLICATION) == 0)
return (0);
DB_MSGBUF_INIT(&mb);
diag_msg = 0;
if (REP_ON(env)) {
rep = env->rep_handle->region;
diag_msg = FLD_ISSET(tmp_verbose, DB_VERB_REP_SYSTEM) &&
FLD_ISSET(verbose, DB_VERB_REP_SYSTEM) &&
!FLD_ISSET(rep->config, REP_C_INMEM);
} else
rep = NULL;
/* REP_SYSTEM alone does not make a regular verbose message. */
FLD_CLR(tmp_verbose, DB_VERB_REP_SYSTEM);
regular_msg = FLD_ISSET(tmp_verbose,
verbose | DB_VERB_REPLICATION);
if (diag_msg == 0 && regular_msg == 0)
return (0);
/* Role prefix: explicit error prefix wins, else the site's role. */
s = NULL;
if (env->dbenv->db_errpfx != NULL)
s = env->dbenv->db_errpfx;
else if (rep != NULL) {
if (F_ISSET(rep, REP_F_CLIENT))
s = "CLIENT";
else if (F_ISSET(rep, REP_F_MASTER))
s = "MASTER";
}
if (s == NULL)
s = "REP_UNDEF";
__os_id(env->dbenv, &pid, &tid);
/* diag_msg implies rep != NULL (set only under REP_ON). */
if (diag_msg)
MUTEX_LOCK(env, rep->mtx_diag);
__os_gettime(env, &ts, 1);
__db_msgadd(env, &mb, "[%lu:%lu][%s] %s: ",
(u_long)ts.tv_sec, (u_long)ts.tv_nsec/NS_PER_US,
env->dbenv->thread_id_string(env->dbenv, pid, tid, buf), s);
__db_msgadd_ap(env, &mb, fmt, ap);
DB_MSGBUF_REP_FLUSH(env, &mb, diag_msg, regular_msg);
if (diag_msg)
MUTEX_UNLOCK(env, rep->mtx_diag);
return (0);
}
void
__rep_print_message(env, eid, rp, str, flags)
ENV *env;
int eid;
__rep_control_args *rp;
char *str;
u_int32_t flags;
{
u_int32_t ctlflags, rectype, verbflag;
char ftype[64], *home, *type;
rectype = rp->rectype;
ctlflags = rp->flags;
verbflag = DB_VERB_REP_MSGS | DB_VERB_REPLICATION;
if (rp->rep_version != DB_REPVERSION)
rectype = __rep_msg_from_old(rp->rep_version, rectype);
switch (rectype) {
case REP_ALIVE:
FLD_SET(verbflag, DB_VERB_REP_ELECT | DB_VERB_REP_MISC);
type = "alive";
break;
case REP_ALIVE_REQ:
type = "alive_req";
break;
case REP_ALL_REQ:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "all_req";
break;
case REP_BULK_LOG:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "bulk_log";
break;
case REP_BULK_PAGE:
FLD_SET(verbflag, DB_VERB_REP_SYNC);
type = "bulk_page";
break;
case REP_DUPMASTER:
FLD_SET(verbflag, DB_VERB_REP_SYSTEM);
type = "dupmaster";
break;
case REP_FILE:
type = "file";
break;
case REP_FILE_FAIL:
type = "file_fail";
break;
case REP_FILE_REQ:
type = "file_req";
break;
case REP_LEASE_GRANT:
FLD_SET(verbflag, DB_VERB_REP_LEASE);
type = "lease_grant";
break;
case REP_LOG:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "log";
break;
case REP_LOG_MORE:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "log_more";
break;
case REP_LOG_REQ:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "log_req";
break;
case REP_MASTER_REQ:
type = "master_req";
break;
case REP_NEWCLIENT:
FLD_SET(verbflag, DB_VERB_REP_MISC | DB_VERB_REP_SYSTEM);
type = "newclient";
break;
case REP_NEWFILE:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "newfile";
break;
case REP_NEWMASTER:
FLD_SET(verbflag, DB_VERB_REP_MISC | DB_VERB_REP_SYSTEM);
type = "newmaster";
break;
case REP_NEWSITE:
type = "newsite";
break;
case REP_PAGE:
FLD_SET(verbflag, DB_VERB_REP_SYNC);
type = "page";
break;
case REP_PAGE_FAIL:
FLD_SET(verbflag, DB_VERB_REP_SYNC);
type = "page_fail";
break;
case REP_PAGE_MORE:
FLD_SET(verbflag, DB_VERB_REP_SYNC);
type = "page_more";
break;
case REP_PAGE_REQ:
FLD_SET(verbflag, DB_VERB_REP_SYNC);
type = "page_req";
break;
case REP_REREQUEST:
type = "rerequest";
break;
case REP_START_SYNC:
FLD_SET(verbflag, DB_VERB_REP_MISC);
type = "start_sync";
break;
case REP_UPDATE:
FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
type = "update";
break;
case REP_UPDATE_REQ:
FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
type = "update_req";
break;
case REP_VERIFY:
FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
type = "verify";
break;
case REP_VERIFY_FAIL:
FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
type = "verify_fail";
break;
case REP_VERIFY_REQ:
FLD_SET(verbflag, DB_VERB_REP_SYNC | DB_VERB_REP_SYSTEM);
type = "verify_req";
break;
case REP_VOTE1:
FLD_SET(verbflag, DB_VERB_REP_ELECT | DB_VERB_REP_SYSTEM);
type = "vote1";
break;
case REP_VOTE2:
FLD_SET(verbflag, DB_VERB_REP_ELECT | DB_VERB_REP_SYSTEM);
type = "vote2";
break;
default:
type = "NOTYPE";
break;
}
ftype[0] = '\0';
if (LF_ISSET(DB_REP_ANYWHERE))
(void)strcat(ftype, " any");
if (FLD_ISSET(ctlflags, REPCTL_FLUSH))
(void)strcat(ftype, " flush");
if (!FLD_ISSET(ctlflags, REPCTL_GROUP_ESTD))
(void)strcat(ftype, " nogroup");
if (FLD_ISSET(ctlflags, REPCTL_LEASE))
(void)strcat(ftype, " lease");
if (LF_ISSET(DB_REP_NOBUFFER))
(void)strcat(ftype, " nobuf");
if (FLD_ISSET(ctlflags, REPCTL_PERM))
(void)strcat(ftype, " perm");
if (LF_ISSET(DB_REP_REREQUEST))
(void)strcat(ftype, " rereq");
if (FLD_ISSET(ctlflags, REPCTL_RESEND))
(void)strcat(ftype, " resend");
if (FLD_ISSET(ctlflags, REPCTL_LOG_END))
(void)strcat(ftype, " logend");
if ((home = env->db_home) == NULL)
home = "NULL";
VPRINT(env, (env, verbflag,
"%s %s: msgv = %lu logv %lu gen = %lu eid %d, type %s, LSN [%lu][%lu] %s",
home, str,
(u_long)rp->rep_version, (u_long)rp->log_version, (u_long)rp->gen,
eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset, ftype));
DB_ASSERT(env, rp->rep_version <= DB_REPVERSION+10);
DB_ASSERT(env, rp->log_version <= DB_LOGVERSION+10);
}
/*
 * __rep_fire_event --
 *	Deliver a replication event.  Replication Manager gets the first
 *	chance to consume it; only when __repmgr_handle_event reports
 *	DB_EVENT_NOT_HANDLED is the event passed on through DB_EVENT.
 */
void
__rep_fire_event(env, event, info)
	ENV *env;
	u_int32_t event;
	void *info;
{
	int rc;

	rc = __repmgr_handle_event(env, event, info);
	DB_ASSERT(env, rc == 0 || rc == DB_EVENT_NOT_HANDLED);

	/* Consumed by repmgr (or an unexpected code): nothing more to do. */
	if (rc != DB_EVENT_NOT_HANDLED)
		return;

	DB_EVENT(env, event, info);
}
/*
 * __rep_msg --
 *	Append one diagnostic message, followed by a newline, to the current
 *	replication diagnostic file.
 *
 *	The current file index and write offset are kept in the REP region
 *	(rep->diag_index, rep->diag_off).  This DB_REP's open handles are in
 *	db_rep->diagfile[].  After a write pushes the offset to REP_DIAGSIZE
 *	or beyond, output moves to the next of DBREP_DIAG_FILES files,
 *	starting at offset 0.  Errors are not reported; a message that cannot
 *	be written is dropped.  Nothing is written once the environment has
 *	panicked.
 *
 *	NOTE(review): the shared offset and index are read and updated here
 *	without taking any lock.  This presumably relies on callers holding
 *	rep->mtx_diag; __rep_print_int locks it around DB_MSGBUF_REP_FLUSH.
 *	Confirm.
 */
void
__rep_msg(env, msg)
	const ENV *env;
	const char *msg;
{
	DB_FH *fhp;
	DB_REP *db_rep;
	REP *rep;
	int i;
	size_t cnt, nlcnt;
	char nl = '\n';

	if (PANIC_ISSET(env))
		return;
	db_rep = env->rep_handle;
	rep = db_rep->region;
	DB_ASSERT((ENV *)env, !FLD_ISSET(rep->config, REP_C_INMEM));
	i = rep->diag_index;
	fhp = db_rep->diagfile[i];

	/*
	 * Always position the handle at the shared offset before writing.
	 * Skipping the seek whenever db_rep->diag_off equals rep->diag_off is
	 * unsafe, because db_rep->diag_off is one number shared by all of
	 * this DB_REP's diag file handles.
	 *
	 * Suppose another process wraps to a different file and reaches the
	 * same numeric offset.  This handle's position in that file is
	 * unrelated, and skipping the seek would overwrite earlier output.
	 * A failed or partial write can also leave the handle's position
	 * ahead of the cached offset.
	 *
	 * If the seek itself fails, drop the message rather than write it at
	 * an unknown position.
	 */
	if (__os_seek((ENV *)env, fhp, 0, 0, rep->diag_off) != 0)
		return;
	if (__os_write((ENV *)env, fhp, (void *)msg, strlen(msg), &cnt) != 0)
		return;
	if (__os_write((ENV *)env, fhp, &nl, 1, &nlcnt) != 0)
		return;
	db_rep->diag_off = rep->diag_off += (cnt + nlcnt);

	/*
	 * Once this message reaches the size threshold, switch to the next
	 * file.  Overshooting REP_DIAGSIZE by part of one message is fine.
	 */
	if (rep->diag_off >= REP_DIAGSIZE) {
		rep->diag_index = (++i % DBREP_DIAG_FILES);
		rep->diag_off = 0;
	}
	return;
}
int
__rep_notify_threads(env, wake_reason)
ENV *env;
rep_waitreason_t wake_reason;
{
REP *rep;
struct __rep_waiter *waiter;
struct rep_waitgoal *goal;
int ret, wake;
ret = 0;
rep = env->rep_handle->region;
SH_TAILQ_FOREACH(waiter, &rep->waiters, links, __rep_waiter) {
goal = &waiter->goal;
wake = 0;
if (wake_reason == LOCKOUT) {
F_SET(waiter, REP_F_PENDING_LOCKOUT);
wake = 1;
} else if (wake_reason == goal->why ||
(goal->why == AWAIT_HISTORY && wake_reason == AWAIT_LSN)) {
if ((ret = __rep_check_goal(env, goal)) == 0)
wake = 1;
else if (ret == DB_TIMEOUT)
ret = 0;
else
goto out;
}
if (wake) {
MUTEX_UNLOCK(env, waiter->mtx_repwait);
SH_TAILQ_REMOVE(&rep->waiters,
waiter, links, __rep_waiter);
F_SET(waiter, REP_F_WOKEN);
}
}
out:
return (ret);
}
/*
 * __rep_check_goal --
 *	Report whether a waiter's goal has been reached.
 *
 *	AWAIT_LSN:	lp->max_perm_lsn is at or beyond goal->u.lsn.
 *	AWAIT_HISTORY:	lp->max_perm_lsn is strictly beyond goal->u.lsn.
 *	AWAIT_GEN:	rep->gen is at least goal->u.gen.
 *	AWAIT_NIMDB:	REP_F_NIMDBS_LOADED is set on the REP region.
 *
 *	Returns 0 if the goal is met, DB_TIMEOUT if not (including after
 *	the diagnostic assertion for an unknown goal type).
 */
int
__rep_check_goal(env, goal)
	ENV *env;
	struct rep_waitgoal *goal;
{
	LOG *lp;
	REP *rep;
	int reached;

	rep = env->rep_handle->region;
	lp = env->lg_handle->reginfo.primary;

	switch (goal->why) {
	case AWAIT_LSN:
		reached = LOG_COMPARE(&lp->max_perm_lsn, &goal->u.lsn) >= 0;
		break;
	case AWAIT_HISTORY:
		reached = LOG_COMPARE(&lp->max_perm_lsn, &goal->u.lsn) > 0;
		break;
	case AWAIT_GEN:
		reached = rep->gen >= goal->u.gen;
		break;
	case AWAIT_NIMDB:
		reached = F_ISSET(rep, REP_F_NIMDBS_LOADED) ? 1 : 0;
		break;
	default:
		DB_ASSERT(env, 0);
		reached = 0;
		break;
	}

	return (reached ? 0 : DB_TIMEOUT);
}
/*
 * __rep_log_backup --
 *	Step the log cursor backward (DB_PREV) until it lands on a record of
 *	the requested kind:
 *	    REP_REC_COMMIT	a DB___txn_regop record;
 *	    REP_REC_PERM	a DB___txn_regop or DB___txn_ckp record.
 *
 *	*lsn is whatever __logc_get last filled in.  Returns 0 when a match
 *	is found; otherwise returns the __logc_get error that ended the scan
 *	(presumably DB_NOTFOUND at the start of the log; confirm).
 */
int
__rep_log_backup(env, logc, lsn, match)
	ENV *env;
	DB_LOGC *logc;
	DB_LSN *lsn;
	u_int32_t match;
{
	DBT rec;
	u_int32_t rtype;
	int ret;

	memset(&rec, 0, sizeof(rec));
	for (;;) {
		if ((ret = __logc_get(logc, lsn, &rec, DB_PREV)) != 0)
			break;
		LOGCOPY_32(env, &rtype, rec.data);

		/* A txn_regop satisfies both match kinds; a ckp only PERM. */
		if (rtype == DB___txn_regop) {
			if (match == REP_REC_COMMIT || match == REP_REC_PERM)
				break;
		} else if (rtype == DB___txn_ckp && match == REP_REC_PERM)
			break;
	}
	return (ret);
}
/*
 * __rep_get_maxpermlsn --
 *	Copy the log region's max_perm_lsn into *max_perm_lsnp.
 *
 *	The read is done while holding rep->mtx_clientdb, inside an
 *	ENV_ENTER/ENV_LEAVE pair (ENV_ENTER may itself return an error).
 *	Otherwise returns 0.
 */
int
__rep_get_maxpermlsn(env, max_perm_lsnp)
	ENV *env;
	DB_LSN *max_perm_lsnp;
{
	DB_THREAD_INFO *ip;
	LOG *lp;
	REP *rep;

	rep = env->rep_handle->region;
	lp = env->lg_handle->reginfo.primary;

	ENV_ENTER(env, ip);
	MUTEX_LOCK(env, rep->mtx_clientdb);
	*max_perm_lsnp = lp->max_perm_lsn;
	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	ENV_LEAVE(env, ip);

	return (0);
}
/*
 * __rep_is_internal_rep_file --
 *	Return 1 if filename starts with REPFILEPREFIX, else 0.  REPFILEPREFIX
 *	is presumably the name prefix of replication's own internal files.
 *	The prefix length comes from sizeof, which requires REPFILEPREFIX to
 *	be a string literal.
 */
int
__rep_is_internal_rep_file(filename)
	char *filename;
{
	size_t plen;

	plen = sizeof(REPFILEPREFIX) - 1;	/* prefix length without NUL */
	return (strncmp(filename, REPFILEPREFIX, plen) == 0);
}
/*
 * __rep_get_datagen --
 *	Fetch the generation recorded in the last key of the LSN history
 *	database (REPLSNHIST) into *data_genp.
 *
 *	*data_genp is 0 unless a record is read successfully.
 *
 *	Opening the database:
 *	  - If db_rep->lsn_db is not yet set, the database is opened within
 *	    a transaction and cached in db_rep->lsn_db.
 *	  - If that open fails, it is treated as "no history yet": the
 *	    result is 0 and the function returns success.
 *
 *	Retries:
 *	  - If the cursor read fails with DB_LOCK_DEADLOCK or
 *	    DB_LOCK_NOTGRANTED, the cursor is closed and the transaction is
 *	    aborted.  After a short yield the whole attempt restarts; at
 *	    most 5 attempts are made in total.
 *	  - Any other read error is returned after close/abort.
 *
 *	All other paths commit the transaction (DB_TXN_NOSYNC) before
 *	returning.
 */
int
__rep_get_datagen(env, data_genp)
	ENV *env;
	u_int32_t *data_genp;
{
	DB_REP *db_rep;
	DB_TXN *txn;
	DB *dbp;
	DBC *dbc;
	__rep_lsn_hist_key_args key;
	u_int8_t key_buf[__REP_LSN_HIST_KEY_SIZE];
	u_int8_t data_buf[__REP_LSN_HIST_DATA_SIZE];
	DBT key_dbt, data_dbt;
	int ret, t_ret, tries;

	db_rep = env->rep_handle;
	*data_genp = 0;

	for (tries = 0;;) {
		if ((ret = __txn_begin(env,
		    NULL, NULL, &txn, DB_IGNORE_LEASE)) != 0)
			return (ret);

		/* Open and cache the history database on first use. */
		if ((dbp = db_rep->lsn_db) == NULL) {
			if (__rep_open_sysdb(env,
			    NULL, txn, REPLSNHIST, 0, &dbp) != 0) {
				/* No database yet: first startup, gen 0. */
				ret = 0;
				break;
			}
			db_rep->lsn_db = dbp;
		}

		if ((ret = __db_cursor(dbp, NULL, txn, &dbc, 0)) != 0)
			break;

		/* Read into fixed-size stack buffers; no allocation. */
		DB_INIT_DBT(key_dbt, key_buf, __REP_LSN_HIST_KEY_SIZE);
		key_dbt.ulen = __REP_LSN_HIST_KEY_SIZE;
		F_SET(&key_dbt, DB_DBT_USERMEM);
		memset(&data_dbt, 0, sizeof(data_dbt));
		data_dbt.data = data_buf;
		data_dbt.ulen = __REP_LSN_HIST_DATA_SIZE;
		F_SET(&data_dbt, DB_DBT_USERMEM);

		if ((ret = __dbc_get(dbc,
		    &key_dbt, &data_dbt, DB_LAST)) == 0) {
			if ((ret = __dbc_close(dbc)) == 0 &&
			    (ret = __rep_lsn_hist_key_unmarshal(env, &key,
			    key_buf, __REP_LSN_HIST_KEY_SIZE, NULL)) == 0)
				*data_genp = key.gen;
			break;
		}

		/* Transient lock conflicts are retried a bounded number of times. */
		if ((ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) &&
		    ++tries < 5)
			ret = 0;
		if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
			ret = t_ret;
		if ((t_ret = __txn_abort(txn)) != 0 && ret == 0)
			ret = t_ret;
		if (ret != 0)
			return (ret);
		__os_yield(env, 0, 10000);
	}

	if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}