#include "db_config.h"
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/txn.h"
/*
 * Forward declaration of the file-local helper __rep_internal_init(),
 * which is defined later in this file and is called from __rep_verify().
 */
static int __rep_internal_init __P((ENV *, u_int32_t));
int
__rep_verify(env, rp, rec, eid, savetime)
ENV *env;
__rep_control_args *rp;
DBT *rec;
int eid;
time_t savetime;
{
DBT mylog;
DB_LOG *dblp;
DB_LOGC *logc;
DB_LSN lsn, prev_ckp;
DB_REP *db_rep;
LOG *lp;
REP *rep;
__txn_ckp_args *ckp_args;
u_int32_t logflag, rectype;
int master, match, ret, t_ret;
ret = 0;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
if (rep->sync_state != SYNC_VERIFY)
return (ret);
#ifdef DIAGNOSTIC
if (IS_USING_LEASES(env)) {
REP_SYSTEM_LOCK(env);
DB_ASSERT(env, __rep_islease_granted(env) == 0);
REP_SYSTEM_UNLOCK(env);
}
#endif
if ((ret = __log_cursor(env, &logc)) != 0)
return (ret);
memset(&mylog, 0, sizeof(mylog));
MUTEX_LOCK(env, rep->mtx_clientdb);
logflag = IS_ZERO_LSN(lp->verify_lsn) ? DB_LAST : DB_SET;
prev_ckp = lp->prev_ckp;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if ((ret = __logc_get(logc, &rp->lsn, &mylog, logflag)) != 0)
goto out;
match = 0;
if (mylog.size == rec->size &&
memcmp(mylog.data, rec->data, rec->size) == 0)
match = 1;
if (match == 0) {
master = rep->master_id;
LOGCOPY_32(env, &rectype, mylog.data);
DB_ASSERT(env, ret == 0);
if (!lp->db_log_inmemory && rectype == DB___txn_ckp) {
if ((ret = __txn_ckp_read(env,
mylog.data, &ckp_args)) != 0)
goto out;
lsn = ckp_args->last_ckp;
__os_free(env, ckp_args);
MUTEX_LOCK(env, rep->mtx_clientdb);
lp->prev_ckp = lsn;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (IS_ZERO_LSN(lsn)) {
if ((ret = __logc_get(logc,
&lsn, &mylog, DB_FIRST)) != 0)
goto out;
if (lsn.file != 1) {
ret = __rep_internal_init(env, 0);
goto out;
}
if ((ret = __logc_get(logc,
&rp->lsn, &mylog, DB_SET)) != 0)
goto out;
}
}
if ((ret = __rep_log_backup(env, logc, &lsn,
REP_REC_PERM)) == 0) {
MUTEX_LOCK(env, rep->mtx_clientdb);
lp->verify_lsn = lsn;
__os_gettime(env, &lp->rcvd_ts, 1);
lp->wait_ts = rep->request_gap;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (master != DB_EID_INVALID)
eid = master;
(void)__rep_send_message(env, eid, REP_VERIFY_REQ,
&lsn, NULL, 0, DB_REP_ANYWHERE);
} else if (ret == DB_NOTFOUND) {
ret = __rep_internal_init(env, 0);
}
} else {
if (!lp->db_log_inmemory && !IS_ZERO_LSN(prev_ckp)) {
if ((ret = __logc_get(logc,
&prev_ckp, &mylog, DB_SET)) != 0) {
if (ret == DB_NOTFOUND)
ret = __rep_internal_init(env, 0);
goto out;
}
}
if (rep->version == DB_REPVERSION_44) {
REP_SYSTEM_LOCK(env);
F_SET(rep, REP_F_NIMDBS_LOADED);
REP_SYSTEM_UNLOCK(env);
}
if (F_ISSET(rep, REP_F_NIMDBS_LOADED))
ret = __rep_verify_match(env, &rp->lsn, savetime);
else {
ret = __rep_internal_init(env, REP_F_ABBREVIATED);
}
}
out: if ((t_ret = __logc_close(logc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
 * __rep_internal_init --
 *	Switch the replication region to SYNC_UPDATE and, if a master id is
 *	known, send it a REP_UPDATE_REQ.
 *
 *	env:    environment handle.
 *	abbrev: non-zero to mark the request as abbreviated
 *	        (REP_F_ABBREVIATED is set); zero clears that flag.
 *
 *	A non-abbreviated request is refused with DB_REP_JOIN_FAILURE when
 *	REP_C_AUTOINIT is not configured; an abbreviated one is never
 *	refused.  Returns 0 otherwise.
 */
static int
__rep_internal_init(env, abbrev)
	ENV *env;
	u_int32_t abbrev;
{
	REP *rep;
	int master_eid, ret;

	rep = env->rep_handle->region;
	ret = 0;

	REP_SYSTEM_LOCK(env);
#ifdef HAVE_STATISTICS
	/* Only full (non-abbreviated) requests count as "outdated". */
	if (abbrev == 0)
		rep->stat.st_outdated++;
#endif
	if (abbrev == 0 && !FLD_ISSET(rep->config, REP_C_AUTOINIT)) {
		ret = DB_REP_JOIN_FAILURE;
	} else {
		rep->sync_state = SYNC_UPDATE;
		if (abbrev != 0) {
			RPRINT(env, (env, DB_VERB_REP_SYNC,
			    "send UPDATE_REQ, merely to check for NIMDB refresh"));
			F_SET(rep, REP_F_ABBREVIATED);
		} else {
			F_CLR(rep, REP_F_ABBREVIATED);
		}
		ZERO_LSN(rep->first_lsn);
		ZERO_LSN(rep->ckp_lsn);
	}
	/* Read the master id while the region lock is still held. */
	master_eid = rep->master_id;
	REP_SYSTEM_UNLOCK(env);

	if (ret != 0)
		return (ret);

	if (master_eid != DB_EID_INVALID)
		(void)__rep_send_message(env,
		    master_eid, REP_UPDATE_REQ, NULL, NULL, 0, 0);
	return (0);
}
/*
 * __rep_verify_fail --
 *	Handle a verify failure reported for rp->lsn.
 *
 *	Depending on the current sync state and on where rp->lsn falls, this
 *	either starts an internal init (state becomes SYNC_UPDATE and a
 *	REP_UPDATE_REQ is sent to the master, if one is known) or ignores
 *	the report.
 *
 *	env: environment handle.
 *	rp:  message control information; only rp->lsn is examined.
 *
 *	Returns 0, DB_REP_JOIN_FAILURE when an internal init is called for
 *	but REP_C_AUTOINIT is not configured, or an error from
 *	__rep_lockout_msg() / __rep_init_cleanup().
 */
int
__rep_verify_fail(env, rp)
ENV *env;
__rep_control_args *rp;
{
DB_LOG *dblp;
DB_REP *db_rep;
LOG *lp;
REP *rep;
int clnt_lock_held, lockout, master, ret;
clnt_lock_held = lockout = 0;
ret = 0;
db_rep = env->rep_handle;
rep = db_rep->region;
dblp = env->lg_handle;
lp = dblp->reginfo.primary;
/* Nothing to do while in SYNC_PAGE or SYNC_UPDATE state. */
if (rep->sync_state == SYNC_PAGE || rep->sync_state == SYNC_UPDATE)
return (0);
REP_SYSTEM_LOCK(env);
/* With leases in use, no lease may be granted at this point. */
DB_ASSERT(env,
!IS_USING_LEASES(env) || __rep_islease_granted(env) == 0);
/*
 * In SYNC_LOG state with rp->lsn inside [first_lsn, last_lsn]: clean up
 * the internal init that is in progress, under a message lockout.
 */
if (rep->sync_state == SYNC_LOG &&
LOG_COMPARE(&rep->first_lsn, &rp->lsn) <= 0 &&
LOG_COMPARE(&rep->last_lsn, &rp->lsn) >= 0) {
/* Messages are already locked out: give up, returning 0. */
if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG))
goto unlock;
if ((ret = __rep_lockout_msg(env, rep, 1)) != 0)
goto unlock;
/* From here on, error exits must clear REP_LOCKOUT_MSG (see msglck). */
lockout = 1;
if (ISSET_LOCKOUT_BDB(rep)) {
RPRINT(env, (env, DB_VERB_REP_SYNC,
"VERIFY_FAIL is cleaning up old internal init for missing log"));
if ((ret =
__rep_init_cleanup(env, rep, DB_FORCE)) != 0) {
RPRINT(env, (env, DB_VERB_REP_SYNC,
"VERIFY_FAIL error cleaning up internal init for missing log: %d", ret));
goto msglck;
}
CLR_RECOVERY_SETTINGS(rep);
}
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
lockout = 0;
}
/*
 * Drop the region lock and take it again after mtx_clientdb, so that
 * mtx_clientdb is acquired first (presumably the required lock order --
 * TODO confirm).  The state is re-evaluated below because it may have
 * changed while no lock was held.
 */
REP_SYSTEM_UNLOCK(env);
MUTEX_LOCK(env, rep->mtx_clientdb);
clnt_lock_held = 1;
REP_SYSTEM_LOCK(env);
/*
 * Start an internal init only if one of the following holds:
 *  - SYNC_VERIFY and rp->lsn equals the LSN being verified;
 *  - SYNC_LOG and rp->lsn lies within [first_lsn, last_lsn];
 *  - SYNC_OFF and rp->lsn is at or beyond ready_lsn.
 * Any other report is ignored.
 */
if ((rep->sync_state == SYNC_VERIFY &&
LOG_COMPARE(&rp->lsn, &lp->verify_lsn) == 0) ||
(rep->sync_state == SYNC_LOG &&
LOG_COMPARE(&rep->first_lsn, &rp->lsn) <= 0 &&
LOG_COMPARE(&rep->last_lsn, &rp->lsn) >= 0) ||
(rep->sync_state == SYNC_OFF &&
LOG_COMPARE(&rp->lsn, &lp->ready_lsn) >= 0)) {
STAT(rep->stat.st_outdated++);
/* Without REP_C_AUTOINIT the caller gets DB_REP_JOIN_FAILURE instead. */
if (!FLD_ISSET(rep->config, REP_C_AUTOINIT)) {
ret = DB_REP_JOIN_FAILURE;
goto unlock;
}
rep->sync_state = SYNC_UPDATE;
ZERO_LSN(rep->first_lsn);
ZERO_LSN(rep->ckp_lsn);
lp->wait_ts = rep->request_gap;
/* Capture the master id before releasing the locks. */
master = rep->master_id;
REP_SYSTEM_UNLOCK(env);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
if (master != DB_EID_INVALID)
(void)__rep_send_message(env,
master, REP_UPDATE_REQ, NULL, NULL, 0, 0);
} else {
/*
 * Ignore the report.  This is also the shared exit for the gotos above:
 * msglck undoes the message lockout if it is still held, unlock releases
 * the region lock and, if it was taken, mtx_clientdb.
 */
msglck: if (lockout)
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_MSG);
unlock: REP_SYSTEM_UNLOCK(env);
if (clnt_lock_held)
MUTEX_UNLOCK(env, rep->mtx_clientdb);
}
return (ret);
}
/*
 * __rep_verify_req --
 *	Answer a request for the log record at rp->lsn.
 *
 *	The record is looked up with a log cursor and sent to "eid" in a
 *	REP_VERIFY message.  If the lookup fails the message carries no
 *	record.  On DB_NOTFOUND specifically:
 *	  - if REP_F_CLIENT is set, nothing is sent and DB_NOTFOUND is
 *	    returned to the caller;
 *	  - otherwise, if __log_is_outdated() reports the log file as
 *	    outdated, the message type becomes REP_VERIFY_FAIL.
 *
 *	env: environment handle.
 *	rp:  message control information; rp->lsn is the requested LSN.
 *	eid: id the reply is sent to.
 *
 *	Returns DB_NOTFOUND as described above, an error from
 *	__log_cursor(), or otherwise the result of closing the cursor.
 */
int
__rep_verify_req(env, rp, eid)
	ENV *env;
	__rep_control_args *rp;
	int eid;
{
	DBT rec_dbt, *payload;
	DB_LOGC *logc;
	REP *rep;
	u_int32_t msgtype;
	int old, ret;

	rep = env->rep_handle->region;

	if ((ret = __log_cursor(env, &logc)) != 0)
		return (ret);

	memset(&rec_dbt, 0, sizeof(rec_dbt));
	payload = &rec_dbt;
	msgtype = REP_VERIFY;

	F_SET(logc, DB_LOG_SILENT_ERR);
	ret = __logc_get(logc, &rp->lsn, &rec_dbt, DB_SET);
	if (ret != 0) {
		/* Any lookup failure means there is no record to send. */
		payload = NULL;

		if (ret == DB_NOTFOUND) {
			if (F_ISSET(rep, REP_F_CLIENT)) {
				(void)__logc_close(logc);
				return (DB_NOTFOUND);
			}
			if (__log_is_outdated(env,
			    rp->lsn.file, &old) == 0 && old != 0)
				msgtype = REP_VERIFY_FAIL;
		}
	}

	(void)__rep_send_message(env,
	    eid, msgtype, &rp->lsn, payload, 0, 0);
	return (__logc_close(logc));
}
/*
 * __rep_dorecovery --
 *	Bring the local log back to *lsnp, either by truncating it or by
 *	running __db_apprec().
 *
 *	Unless the region is in SYNC_LOG state, the log is first scanned
 *	backwards (DB_PREV) over every record whose LSN is greater than
 *	*lsnp.  If none of those records is a txn_regop, txn_ckp or
 *	dbreg_register record, the log is simply truncated with
 *	__log_vtruncate().  Otherwise __db_apprec() is run; a txn_regop
 *	whose opcode is not TXN_ABORT additionally marks the operation as a
 *	rollback, which is refused with DB_REP_WOULDROLLBACK unless
 *	REP_C_AUTOROLLBACK is configured.  In SYNC_LOG state the scan is
 *	skipped and __db_apprec() is always run with "update" set.
 *
 *	env:       environment handle.
 *	lsnp:      LSN to go back to.
 *	trunclsnp: passed to __log_vtruncate() / __db_apprec(), which
 *	           presumably store the resulting end-of-log LSN there
 *	           (TODO confirm against those functions).
 *
 *	Returns 0 on success or a non-zero error code.
 */
int
__rep_dorecovery(env, lsnp, trunclsnp)
	ENV *env;
	DB_LSN *lsnp, *trunclsnp;
{
	DBT logrec;
	DB_LOGC *logc;
	DB_LSN cur_lsn, stable_lsn;
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	REP *rep;
	__txn_regop_args *regop;
	__txn_regop_42_args *regop42;
	u_int32_t opcode, rectype;
	int ret, rollback, skip_rec, t_ret, update;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	ENV_GET_THREAD_INFO(env, ip);

	if ((ret = __log_cursor(env, &logc)) != 0)
		return (ret);
	memset(&logrec, 0, sizeof(logrec));

	/* SYNC_LOG never skips recovery and always requests an update. */
	update = (rep->sync_state == SYNC_LOG) ? 1 : 0;
	skip_rec = update ? 0 : 1;
	rollback = 0;

	/* Walk backwards until we reach *lsnp or learn an update is needed. */
	for (;;) {
		if (update != 0)
			break;
		if ((ret = __logc_get(logc, &cur_lsn, &logrec, DB_PREV)) != 0)
			break;
		if (LOG_COMPARE(&cur_lsn, lsnp) <= 0)
			break;

		LOGCOPY_32(env, &rectype, logrec.data);
		DB_ASSERT(env, rep->op_cnt == 0);
		DB_ASSERT(env, rep->msg_th == 1);

		/* Any of these record types rules out the truncate shortcut. */
		if (rectype == DB___txn_regop || rectype == DB___txn_ckp ||
		    rectype == DB___dbreg_register)
			skip_rec = 0;

		if (rectype != DB___txn_regop)
			continue;

		/* The regop record layout depends on the replication version. */
		if (rep->version >= DB_REPVERSION_44) {
			if ((ret = __txn_regop_read(
			    env, logrec.data, &regop)) != 0)
				goto err;
			opcode = regop->opcode;
			__os_free(env, regop);
		} else {
			if ((ret = __txn_regop_42_read(
			    env, logrec.data, &regop42)) != 0)
				goto err;
			opcode = regop42->opcode;
			__os_free(env, regop42);
		}
		if (opcode != TXN_ABORT) {
			rollback = 1;
			update = 1;
		}
	}
	/* A failed cursor read ends up here with ret set. */
	if (ret != 0)
		goto err;

	if (!skip_rec) {
		if (rollback && !FLD_ISSET(rep->config, REP_C_AUTOROLLBACK)) {
			ret = DB_REP_WOULDROLLBACK;
			goto err;
		}
		ret = __db_apprec(env, ip, lsnp, trunclsnp, update, 0);
	} else {
		/* A missing stable LSN is not an error: use a zero LSN. */
		ret = __log_get_stable_lsn(env, &stable_lsn, 0);
		if (ret == DB_NOTFOUND) {
			ZERO_LSN(stable_lsn);
		} else if (ret != 0) {
			goto err;
		}
		RPRINT(env, (env, DB_VERB_REP_SYNC,
		    "Skip sync-up rec. Truncate log to [%lu][%lu], ckp [%lu][%lu]",
		    (u_long)lsnp->file, (u_long)lsnp->offset,
		    (u_long)stable_lsn.file, (u_long)stable_lsn.offset));
		ret = __log_vtruncate(env, lsnp, &stable_lsn, trunclsnp);
	}
	if (ret != 0)
		goto err;

	F_SET(db_rep, DBREP_OPENFILES);

	/* After an update, close the lsn_db handle if one is open. */
	if (update && db_rep->lsn_db != NULL) {
		ret = __db_close(db_rep->lsn_db, NULL, DB_NOSYNC);
		db_rep->lsn_db = NULL;
	}

err:	t_ret = __logc_close(logc);
	if (t_ret != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
int
__rep_verify_match(env, reclsnp, savetime)
ENV *env;
DB_LSN *reclsnp;
time_t savetime;
{
DB_LOG *dblp;
DB_LSN trunclsn;
DB_REP *db_rep;
DB_THREAD_INFO *ip;
LOG *lp;
REGENV *renv;
REGINFO *infop;
REP *rep;
int done, event, master, ret;
u_int32_t unused;
dblp = env->lg_handle;
db_rep = env->rep_handle;
rep = db_rep->region;
lp = dblp->reginfo.primary;
ret = 0;
event = 0;
infop = env->reginfo;
renv = infop->primary;
ENV_GET_THREAD_INFO(env, ip);
MUTEX_LOCK(env, rep->mtx_clientdb);
done = savetime != renv->rep_timestamp;
if (done) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
return (0);
}
ZERO_LSN(lp->verify_lsn);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
REP_SYSTEM_LOCK(env);
if (FLD_ISSET(rep->lockout_flags, REP_LOCKOUT_MSG) ||
(rep->sync_state != SYNC_LOG &&
ISSET_LOCKOUT_BDB(rep))) {
STAT(rep->stat.st_msgs_recover++);
goto errunlock;
}
if ((ret = __rep_lockout_msg(env, rep, 1)) != 0)
goto errunlock;
if ((ret = __rep_lockout_api(env, rep)) != 0)
goto errunlock;
REP_SYSTEM_UNLOCK(env);
if ((ret = __rep_dorecovery(env, reclsnp, &trunclsn)) != 0 ||
(ret = __rep_remove_init_file(env)) != 0) {
REP_SYSTEM_LOCK(env);
FLD_CLR(rep->lockout_flags,
REP_LOCKOUT_API | REP_LOCKOUT_MSG | REP_LOCKOUT_OP);
goto errunlock;
}
MUTEX_LOCK(env, rep->mtx_clientdb);
lp->ready_lsn = trunclsn;
ZERO_LSN(lp->waiting_lsn);
ZERO_LSN(lp->max_wait_lsn);
lp->max_perm_lsn = *reclsnp;
lp->wait_ts = rep->request_gap;
__os_gettime(env, &lp->rcvd_ts, 1);
ZERO_LSN(lp->verify_lsn);
ZERO_LSN(lp->prev_ckp);
if (db_rep->rep_db == NULL &&
(ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
goto out;
}
F_SET(db_rep->rep_db, DB_AM_RECOVER);
MUTEX_UNLOCK(env, rep->mtx_clientdb);
ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused);
MUTEX_LOCK(env, rep->mtx_clientdb);
F_CLR(db_rep->rep_db, DB_AM_RECOVER);
REP_SYSTEM_LOCK(env);
STAT(rep->stat.st_log_queued = 0);
if (IN_INTERNAL_INIT(rep))
event = 1;
CLR_RECOVERY_SETTINGS(rep);
FLD_CLR(rep->lockout_flags, REP_LOCKOUT_ARCHIVE | REP_LOCKOUT_MSG);
if (ret != 0)
goto errunlock2;
master = rep->master_id;
REP_SYSTEM_UNLOCK(env);
if (master == DB_EID_INVALID) {
MUTEX_UNLOCK(env, rep->mtx_clientdb);
ret = 0;
} else {
lp->wait_ts = rep->max_gap;
MUTEX_UNLOCK(env, rep->mtx_clientdb);
(void)__rep_send_message(env,
master, REP_ALL_REQ, reclsnp, NULL, 0, DB_REP_ANYWHERE);
}
if (event)
__rep_fire_event(env, DB_EVENT_REP_INIT_DONE, NULL);
if (0) {
errunlock2: MUTEX_UNLOCK(env, rep->mtx_clientdb);
errunlock: REP_SYSTEM_UNLOCK(env);
}
out: return (ret);
}