#ifndef _DB_REP_H_
#define _DB_REP_H_
#include "dbinc_auto/rep_automsg.h"
#if defined(__cplusplus)
extern "C" {
#endif
/*
 * Names replication uses for its internal files and databases.
 */
#define	REPFILEPREFIX	"__db.rep"
#define	REPDBNAME	"__db.rep.db"
#define	REPPAGENAME	"__db.reppg.db"
#define	REPSYSDBNAME	"__db.rep.system"
#define	REPLSNHIST	"__db.lsn.history"
#define	REPMEMBERSHIP	"__db.membership"

/* Page size used for the replication system database (REPSYSDBNAME). */
#define	REPSYSDBPGSZ	1024

/* Non-zero only when "name" is exactly the replication system database name. */
#define	IS_REP_FILE(name)	(strcmp((name), REPSYSDBNAME) == 0)

/*
 * Format version numbers; per their names, for the commit token and the
 * LSN history records.
 */
#define	REP_COMMIT_TOKEN_FMT_VERSION	1
#define	REP_LSN_HISTORY_FMT_VERSION	1
#define REP_INVALID 0
#define REP_ALIVE 1
#define REP_ALIVE_REQ 2
#define REP_ALL_REQ 3
#define REP_BULK_LOG 4
#define REP_BULK_PAGE 5
#define REP_DUPMASTER 6
#define REP_FILE 7
#define REP_FILE_FAIL 8
#define REP_FILE_REQ 9
#define REP_LEASE_GRANT 10
#define REP_LOG 11
#define REP_LOG_MORE 12
#define REP_LOG_REQ 13
#define REP_MASTER_REQ 14
#define REP_NEWCLIENT 15
#define REP_NEWFILE 16
#define REP_NEWMASTER 17
#define REP_NEWSITE 18
#define REP_PAGE 19
#define REP_PAGE_FAIL 20
#define REP_PAGE_MORE 21
#define REP_PAGE_REQ 22
#define REP_REREQUEST 23
#define REP_START_SYNC 24
#define REP_UPDATE 25
#define REP_UPDATE_REQ 26
#define REP_VERIFY 27
#define REP_VERIFY_FAIL 28
#define REP_VERIFY_REQ 29
#define REP_VOTE1 30
#define REP_VOTE2 31
#define REP_MAX_MSG 31
#define REP_MSG_REQ(rectype) \
(rectype == REP_ALL_REQ || \
rectype == REP_LOG_REQ || \
rectype == REP_PAGE_REQ || \
rectype == REP_VERIFY_REQ)
/*
 * Log format version number for each release.  Releases that kept the
 * previous log format share a value: 5.0 and 5.1 are both 17.
 * DB_LOGVERSION_MIN names the 4.4 value as the minimum.
 */
#define	DB_LOGVERSION_42	8
#define	DB_LOGVERSION_43	10
#define	DB_LOGVERSION_44	11
#define	DB_LOGVERSION_45	12
#define	DB_LOGVERSION_46	13
#define	DB_LOGVERSION_47	14
#define	DB_LOGVERSION_48	15
#define	DB_LOGVERSION_48p2	16
#define	DB_LOGVERSION_50	17
#define	DB_LOGVERSION_51	17
#define	DB_LOGVERSION_52	18
#define	DB_LOGVERSION_53	19
#define	DB_LOGVERSION_MIN	DB_LOGVERSION_44

/*
 * Replication protocol version number for each release.  Several
 * releases share a protocol version (4.4/4.5 are 3; 4.7 through 5.1 are 5).
 * DB_REPVERSION is the current (5.3) value.  DB_REPVERSION_MIN names the
 * 4.4 value as the minimum.  0 is reserved as "invalid".
 */
#define	DB_REPVERSION_INVALID	0
#define	DB_REPVERSION_44	3
#define	DB_REPVERSION_45	3
#define	DB_REPVERSION_46	4
#define	DB_REPVERSION_47	5
#define	DB_REPVERSION_48	5
#define	DB_REPVERSION_50	5
#define	DB_REPVERSION_51	5
#define	DB_REPVERSION_52	6
#define	DB_REPVERSION_53	7
#define	DB_REPVERSION		DB_REPVERSION_53
#define	DB_REPVERSION_MIN	DB_REPVERSION_44
/*
 * Diagnostic output files.  REP_DIAGNAME is a printf-style template that
 * takes one integer index, printed as at least two zero-padded digits.
 * REP_DIAGSIZE is defined in terms of MEGABYTE.
 */
#define	REP_DIAGNAME	"__db.rep.diag%02d"
#define	REP_DIAGSIZE	MEGABYTE

/*
 * Verbose-output wrappers.  Each one does nothing unless
 * (env)->dbenv->verbose is non-zero.
 *
 * RPRINT and VPRINT take the callee's complete, parenthesized argument
 * list as their second argument, e.g. RPRINT(env, (args...)).
 */
#define	RPRINT(env, x) do {						\
	if ((env)->dbenv->verbose)					\
		(void)__rep_print_system x;				\
} while (0)
#define	VPRINT(env, x) do {						\
	if ((env)->dbenv->verbose)					\
		(void)__rep_print x;					\
} while (0)
#define	REP_PRINT_MESSAGE(env, eid, rp, str, fl) do {			\
	if ((env)->dbenv->verbose)					\
		__rep_print_message(env, eid, rp, str, fl);		\
} while (0)
/*
 * Names of files replication uses; per their names, for the election
 * generation, the generation, and internal-init state.
 */
#define	REP_EGENNAME	"__db.rep.egen"
#define	REP_GENNAME	"__db.rep.gen"
#define	REP_INITNAME	"__db.rep.init"

/*
 * Version numbers, presumably of the REP_INITNAME file format, by release.
 * REP_INITVERSION is the current one.
 */
#define	REP_INITVERSION_46	1
#define	REP_INITVERSION_47	2
#define	REP_INITVERSION		3

/*
 * Selects one of two replication-internal databases.  By name, presumably
 * REPDBNAME and REPPAGENAME respectively.
 */
typedef enum {
	REP_DB = 0,
	REP_PG = 1
} repdb_t;
/*
 * Lock/unlock the mutexes kept in the replication region REP:
 * REP_SYSTEM_* uses mtx_region and REP_EVENT_* uses mtx_event.  Both pairs
 * delegate to MUTEX_LOCK/MUTEX_UNLOCK.
 */
#define	REP_SYSTEM_LOCK(env)	MUTEX_LOCK(env, (env)->rep_handle->region->mtx_region)
#define	REP_SYSTEM_UNLOCK(env)	MUTEX_UNLOCK(env, (env)->rep_handle->region->mtx_region)
#define	REP_EVENT_LOCK(env)	MUTEX_LOCK(env, (env)->rep_handle->region->mtx_event)
#define	REP_EVENT_UNLOCK(env)	MUTEX_UNLOCK(env, (env)->rep_handle->region->mtx_event)
/*
 * Synchronization state stored in REP's sync_state field.
 * - SYNC_OFF means no synchronization is in progress (IS_REP_RECOVERING).
 * - SYNC_LOG and SYNC_PAGE are the states IN_INTERNAL_INIT reports as
 *   internal initialization.
 * Values are spelled out explicitly; they match the implicit numbering.
 */
typedef enum {
	SYNC_OFF = 0,
	SYNC_LOG = 1,
	SYNC_PAGE = 2,
	SYNC_UPDATE = 3,
	SYNC_VERIFY = 4
} repsync_t;
/*
 * VOTE1_CONTENT --
 *	A site's vote as recorded for an election.  REP keeps one copy in
 *	its "vote1" field.  By name, this is presumably the data associated
 *	with REP_VOTE1 messages; confirm against the election code.
 *	Compared with REP_OLD_VOTE_INFO, it adds lsn, ctlflags and data_gen,
 *	drops egen, and uses u_int32_t for the counts and priority.
 *	NOTE(review): embedded in REP, so field order presumably matters.
 */
typedef struct {
DB_LSN lsn;
u_int32_t nsites;
u_int32_t nvotes;
u_int32_t priority;
u_int32_t tiebreaker;
u_int32_t ctlflags;
u_int32_t data_gen;
} VOTE1_CONTENT;
/*
 * REP --
 *	Replication state reached through (env)->rep_handle->region; see
 *	the REP_SYSTEM_LOCK and IS_REP_* macros in this header.  Fields of
 *	type roff_t are region offsets that are turned into pointers with
 *	R_ADDR (GET_CURINFO does this for curinfo_off).
 *
 *	NOTE(review): the use of region offsets suggests this struct lives
 *	in shared memory.  If so, field order and sizes form a layout
 *	contract, so do not reorder or resize fields casually.
 */
typedef struct __rep {
/* mtx_region is taken by REP_SYSTEM_LOCK/REP_SYSTEM_UNLOCK; the others are named by purpose. */
db_mutex_t mtx_region;
db_mutex_t mtx_clientdb;
db_mutex_t mtx_ckp;
db_mutex_t mtx_diag;
db_mutex_t mtx_repstart;
/* Current diagnostic file index/offset, presumably for REP_DIAGNAME files. */
int diag_index;
off_t diag_off;
/* Region offsets, by name of the lease table and election tally arrays. */
roff_t lease_off;
roff_t tally_off;
roff_t v2tally_off;
/* This site's id, the current master's id, and a version number (by name). */
int eid;
int master_id;
u_int32_t version;
/* Generation numbers.  "gen" is set via SET_GEN, which also zeroes gen_base_lsn. */
u_int32_t egen;
u_int32_t spent_egen;
u_int32_t gen;
u_int32_t mgen;
/* Group size and election parameters (by name). */
u_int32_t asites;
u_int32_t nsites;
u_int32_t nvotes;
u_int32_t priority;
u_int32_t config_nsites;
db_timeout_t elect_timeout;
db_timeout_t full_elect_timeout;
db_timeout_t chkpt_delay;
/* Throttle limit split into gbytes/bytes (by name); the default is 10 * MEGABYTE. */
#define REP_DEFAULT_THROTTLE (10 * MEGABYTE)
u_int32_t gbytes;
u_int32_t bytes;
/*
 * Bounds for the re-request interval.  NOTE(review): the units of the
 * defaults are not visible here (likely microseconds); confirm.
 */
#define DB_REP_REQUEST_GAP 40000
#define DB_REP_MAX_GAP 1280000
db_timespec request_gap;
db_timespec max_gap;
/* Counts of threads/handles/operations active in replication paths (by name). */
u_int32_t apply_th;
u_int32_t arch_th;
u_int32_t elect_th;
u_int32_t msg_th;
u_int32_t handle_cnt;
u_int32_t op_cnt;
DB_LSN ckp_lsn;
DB_LSN max_prep_lsn;
/* mtx_event is taken by REP_EVENT_LOCK/REP_EVENT_UNLOCK. */
db_mutex_t mtx_event;
u_int32_t newmaster_event_gen;
u_int32_t notified_egen;
/*
 * File/page transfer progress, presumably for internal init.
 * originfo_off and curinfo_off are region offsets; GET_CURINFO resolves
 * curinfo_off.
 */
u_int32_t nfiles;
u_int32_t curfile;
roff_t originfo_off;
u_int32_t infolen;
u_int32_t originfolen;
u_int32_t infoversion;
DB_LSN first_lsn;
u_int32_t first_vers;
DB_LSN last_lsn;
db_timespec last_pg_ts;
db_pgno_t ready_pg;
db_pgno_t waiting_pg;
db_pgno_t max_wait_pg;
u_int32_t npages;
roff_t curinfo_off;
/* Election tally: the current winner and its w_* attributes, vote counts, this site's vote1. */
u_int32_t sites;
int winner;
u_int32_t w_priority;
u_int32_t w_gen;
u_int32_t w_datagen;
DB_LSN w_lsn;
u_int32_t w_tiebreaker;
u_int32_t votes;
VOTE1_CONTENT vote1;
db_timespec etime;
int full_elect;
/* Lease settings and state (by name); IS_USING_LEASES tests REP_C_LEASE in config. */
db_timeout_t lease_timeout;
db_timespec lease_duration;
u_int32_t clock_skew;
u_int32_t clock_base;
db_timespec grant_expire;
/* Cleared by SET_GEN whenever gen changes. */
DB_LSN gen_base_lsn;
u_int32_t master_envid;
/*
 * Waiter lists.  struct __rep_waiter carries the SH_TAILQ_ENTRY links,
 * so these presumably hold __rep_waiter entries (active and free).
 */
SH_TAILQ_HEAD(__wait) waiters;
SH_TAILQ_HEAD(__wfree) free_waiters;
#ifdef HAVE_REPLICATION_THREADS
/* Replication manager state, compiled only with HAVE_REPLICATION_THREADS. */
db_mutex_t mtx_repmgr;
roff_t siteinfo_off;
u_int site_cnt;
u_int site_max;
int self_eid;
u_int siteinfo_seq;
u_int32_t min_log_file;
pid_t listener;
int perm_policy;
db_timeout_t ack_timeout;
db_timeout_t election_retry_wait;
db_timeout_t connection_retry_wait;
db_timeout_t heartbeat_frequency;
db_timeout_t heartbeat_monitor_timeout;
#endif
/* Statistics; mstat exists only with both replication threads and statistics enabled. */
DB_REP_STAT stat;
#if defined(HAVE_REPLICATION_THREADS) && defined(HAVE_STATISTICS)
DB_REPMGR_STAT mstat;
#endif
/* Configuration bits stored in "config" (REP_C_ prefix). */
#define REP_C_2SITE_STRICT 0x00001
#define REP_C_AUTOINIT 0x00002
#define REP_C_AUTOROLLBACK 0x00004
#define REP_C_BULK 0x00008
#define REP_C_DELAYCLIENT 0x00010
#define REP_C_ELECTIONS 0x00020
#define REP_C_INMEM 0x00040
#define REP_C_LEASE 0x00080
#define REP_C_NOWAIT 0x00100
u_int32_t config;
/*
 * Election phase bits stored in "elect_flags".  IN_ELECTION tests
 * PHASE1|PHASE2; IN_ELECTION_TALLY also tests TALLY.
 */
#define REP_E_PHASE0 0x00000001
#define REP_E_PHASE1 0x00000002
#define REP_E_PHASE2 0x00000004
#define REP_E_TALLY 0x00000008
u_int32_t elect_flags;
/* Lockout bits stored in "lockout_flags"; ISSET_LOCKOUT_BDB tests API|OP. */
#define REP_LOCKOUT_API 0x00000001
#define REP_LOCKOUT_APPLY 0x00000002
#define REP_LOCKOUT_ARCHIVE 0x00000004
#define REP_LOCKOUT_MSG 0x00000008
#define REP_LOCKOUT_OP 0x00000010
u_int32_t lockout_flags;
/* SYNC_OFF unless a synchronization is in progress (see IS_REP_RECOVERING). */
repsync_t sync_state;
/*
 * General state bits stored in "flags".  For example, IS_REP_MASTER and
 * IS_REP_CLIENT test REP_F_MASTER and REP_F_CLIENT, and the APP_SET_
 * macros set REP_F_APP_BASEAPI or REP_F_APP_REPMGR.
 */
#define REP_F_ABBREVIATED 0x00000001
#define REP_F_APP_BASEAPI 0x00000002
#define REP_F_APP_REPMGR 0x00000004
#define REP_F_CLIENT 0x00000008
#define REP_F_DELAY 0x00000010
#define REP_F_GROUP_ESTD 0x00000020
#define REP_F_INUPDREQ 0x00000040
#define REP_F_LEASE_EXPIRED 0x00000080
#define REP_F_MASTER 0x00000100
#define REP_F_MASTERELECT 0x00000200
#define REP_F_NEWFILE 0x00000400
#define REP_F_NIMDBS_LOADED 0x00000800
#define REP_F_SKIPPED_APPLY 0x00001000
#define REP_F_START_CALLED 0x00002000
#define REP_F_SYS_DB_OP 0x00004000
u_int32_t flags;
} REP;
/*
 * rep_waitreason_t --
 *	Reason a thread is waiting.  By name: a generation (AWAIT_GEN),
 *	LSN history (AWAIT_HISTORY), an LSN (AWAIT_LSN), in-memory
 *	databases (AWAIT_NIMDB), or a lockout (LOCKOUT).  Confirm these
 *	meanings against the wait code.
 */
typedef enum {
AWAIT_GEN,
AWAIT_HISTORY,
AWAIT_LSN,
AWAIT_NIMDB,
LOCKOUT
} rep_waitreason_t;
/*
 * struct rep_waitgoal --
 *	A wait condition: the reason plus the value it refers to.  The
 *	pairing is presumably u.lsn for AWAIT_LSN and u.gen for AWAIT_GEN.
 */
struct rep_waitgoal {
rep_waitreason_t why;
union {
DB_LSN lsn;
u_int32_t gen;
} u;
};
/*
 * struct __rep_waiter --
 *	One waiting thread.  It holds:
 *	- mtx_repwait, presumably used to block and wake the thread;
 *	- the goal it waits for;
 *	- SH_TAILQ links, presumably placing it on REP's waiters or
 *	  free_waiters list.
 *
 *	NOTE(review): REP_F_PENDING_LOCKOUT and REP_F_WOKEN apply only to
 *	this struct's flags field.  Their bit values overlap REP's own
 *	REP_F_ region flags (e.g. REP_F_ABBREVIATED is also 0x1).
 */
struct __rep_waiter {
db_mutex_t mtx_repwait;
struct rep_waitgoal goal;
SH_TAILQ_ENTRY links;
#define REP_F_PENDING_LOCKOUT 0x00000001
#define REP_F_WOKEN 0x00000002
u_int32_t flags;
};
/*
 * "BDB" lockout: either REP_LOCKOUT_API or REP_LOCKOUT_OP is set in R's
 * lockout_flags.  CLR_LOCKOUT_BDB clears both bits.
 */
#define	ISSET_LOCKOUT_BDB(R)						\
	(FLD_ISSET((R)->lockout_flags, (REP_LOCKOUT_API | REP_LOCKOUT_OP)))
#define	CLR_LOCKOUT_BDB(R)						\
	(FLD_CLR((R)->lockout_flags, (REP_LOCKOUT_API | REP_LOCKOUT_OP)))

/* Return R to the non-recovering state: sync_state SYNC_OFF, BDB lockouts cleared. */
#define	CLR_RECOVERY_SETTINGS(R)					\
do {									\
	(R)->sync_state = SYNC_OFF;					\
	CLR_LOCKOUT_BDB(R);						\
} while (0)

/* Recovering while any sync is in progress or a BDB lockout is set. */
#define	IS_REP_RECOVERING(R)						\
	((R)->sync_state != SYNC_OFF || ISSET_LOCKOUT_BDB(R))

/*
 * Election predicates.  Fully parenthesized, like ISSET_LOCKOUT_BDB, so
 * uses such as "!IN_ELECTION(rep)" negate the whole test regardless of how
 * FLD_ISSET itself is written.
 */
#define	IN_ELECTION(R)							\
	(FLD_ISSET((R)->elect_flags, REP_E_PHASE1 | REP_E_PHASE2))
#define	IN_ELECTION_TALLY(R)						\
	(FLD_ISSET((R)->elect_flags,					\
	    REP_E_PHASE1 | REP_E_PHASE2 | REP_E_TALLY))

/* Smallest integer strictly greater than n / 2, e.g. 3 for both 4 and 5. */
#define	ELECTION_MAJORITY(n)	(((n) / 2) + 1)

/* Internal init covers the SYNC_LOG and SYNC_PAGE phases. */
#define	IN_INTERNAL_INIT(R)						\
	((R)->sync_state == SYNC_LOG || (R)->sync_state == SYNC_PAGE)

/* Role/state tests; all are false when replication is not on (REP_ON). */
#define	IS_REP_MASTER(env)						\
	(REP_ON(env) &&							\
	    F_ISSET(((env)->rep_handle->region), REP_F_MASTER))
#define	IS_REP_CLIENT(env)						\
	(REP_ON(env) &&							\
	    F_ISSET(((env)->rep_handle->region), REP_F_CLIENT))
#define	IS_REP_STARTED(env)						\
	(REP_ON(env) &&							\
	    F_ISSET(((env)->rep_handle->region), REP_F_START_CALLED))
#define	IS_USING_LEASES(env)						\
	(REP_ON(env) &&							\
	    FLD_ISSET(((env)->rep_handle->region)->config, REP_C_LEASE))
#define	IS_CLIENT_PGRECOVER(env)					\
	(IS_REP_CLIENT(env) &&						\
	    (((env)->rep_handle->region)->sync_state == SYNC_PAGE))

/* Non-zero when any of the region's flags, elect_flags or lockout_flags is set. */
#define	REP_FLAGS_SET(env)						\
	((env)->rep_handle->region->flags != 0 ||			\
	    (env)->rep_handle->region->elect_flags != 0 ||		\
	    (env)->rep_handle->region->lockout_flags != 0)
#define	IS_ENV_REPLICATED(env)						\
	(REP_ON(env) && REP_FLAGS_SET(env))
/*
 * MASTER_UPDATE --
 *	While holding the replication region mutex (REP_SYSTEM_LOCK), set
 *	DB_REGENV_REPLOCKED on renv and store the current time(2) in
 *	renv->op_timestamp.
 */
#define	MASTER_UPDATE(env, renv) do {					\
	REP_SYSTEM_LOCK(env);						\
	F_SET((renv), DB_REGENV_REPLOCKED);				\
	(void)time(&(renv)->op_timestamp);				\
	REP_SYSTEM_UNLOCK(env);						\
} while (0)

/*
 * SET_GEN --
 *	Make g the current generation and zero gen_base_lsn.  It writes
 *	through a variable named "rep" (presumably a REP *), which must be
 *	in scope at the call site.
 */
#define	SET_GEN(g) do {							\
	rep->gen = (g);							\
	ZERO_LSN(rep->gen_base_lsn);					\
} while (0)

/* Gap-request flag values (by name). */
#define	REP_GAP_FORCE		0x001
#define	REP_GAP_REREQUEST	0x002

/* Record-processing flag values (by name). */
#define	REP_REC_COMMIT		0x001
#define	REP_REC_PERM		0x002
/*
 * REPLICATION_WRAP --
 *	Evaluate func_call and store its result in ret.
 *
 *	When IS_ENV_REPLICATED(env) is true, the call is bracketed by
 *	__env_rep_enter(env, checklock) and __env_db_rep_exit(env):
 *	- if the enter call fails, its error is left in ret and func_call
 *	  is not evaluated;
 *	- an exit error replaces ret only when func_call returned 0.
 *	When the environment is not replicated, func_call runs alone.
 */
#define	REPLICATION_WRAP(env, func_call, checklock, ret) do {		\
	int __rep_check, __t_ret;					\
	__rep_check = IS_ENV_REPLICATED(env) != 0;			\
	if (__rep_check)						\
		(ret) = __env_rep_enter(env, checklock);		\
	else								\
		(ret) = 0;						\
	if ((ret) != 0)							\
		break;							\
	(ret) = func_call;						\
	if (__rep_check) {						\
		__t_ret = __env_db_rep_exit(env);			\
		if (__t_ret != 0 && (ret) == 0)				\
			(ret) = __t_ret;				\
	}								\
} while (0)
/*
 * GET_CURINFO --
 *	Point curinfo at the __rep_fileinfo_args located at region offset
 *	rep->curinfo_off in infop, then set its uid, info and dir data
 *	pointers.
 *
 *	The variable-length items are stored right after the struct, in the
 *	order uid, info, dir.  An item whose size is 0 gets a NULL data
 *	pointer.
 *
 *	All arguments are parenthesized, so expressions such as "*repp"
 *	work.  rep, infop and curinfo are evaluated several times and must
 *	be free of side effects.
 */
#define	GET_CURINFO(rep, infop, curinfo)				\
do {									\
	(curinfo) = R_ADDR((infop), (rep)->curinfo_off);		\
	if ((curinfo)->uid.size > 0)					\
		(curinfo)->uid.data = R_ADDR((infop),			\
		    (rep)->curinfo_off + sizeof(__rep_fileinfo_args));	\
	else								\
		(curinfo)->uid.data = NULL;				\
	if ((curinfo)->info.size > 0)					\
		(curinfo)->info.data = R_ADDR((infop),			\
		    (rep)->curinfo_off + sizeof(__rep_fileinfo_args) +	\
		    (curinfo)->uid.size);				\
	else								\
		(curinfo)->info.data = NULL;				\
	if ((curinfo)->dir.size > 0)					\
		(curinfo)->dir.data = R_ADDR((infop),			\
		    (rep)->curinfo_off + sizeof(__rep_fileinfo_args) +	\
		    (curinfo)->uid.size + (curinfo)->info.size);	\
	else								\
		(curinfo)->dir.data = NULL;				\
} while (0)
/*
 * struct __db_rep --
 *	Per-environment replication handle, reached as (env)->rep_handle.
 *	Its "region" field points at the REP state.
 *
 *	When REP_ON(env) is false, the APP_IS_* macros read this handle's
 *	DBREP_APP_* flags instead of the region's.  Several fields here
 *	(gbytes/bytes, request_gap/max_gap, clock_skew, the timeouts) have
 *	REP counterparts.
 *	NOTE(review): presumably they hold configuration made before the
 *	region exists; confirm how values move between handle and region.
 */
struct __db_rep {
/* Configuration values (see the note above about REP counterparts). */
int eid;
u_int32_t gbytes;
u_int32_t bytes;
db_timespec request_gap;
db_timespec max_gap;
u_int32_t clock_skew;
u_int32_t clock_base;
u_int32_t config;
u_int32_t config_nsites;
db_timeout_t elect_timeout;
db_timeout_t full_elect_timeout;
db_timeout_t chkpt_delay;
u_int32_t my_priority;
db_timeout_t lease_timeout;
/* Message-sending callback supplied by the application (by signature: control DBT, record DBT, LSN, eid, flags). */
int (*send)
__P((DB_ENV *, const DBT *, const DBT *,
const DB_LSN *, int, u_int32_t));
/* Replication-internal database handles (by name: the rep database and LSN history). */
DB *rep_db;
DB *lsn_db;
/* Replication region state; see REP. */
REP *region;
u_int8_t *bulk;
/* DBREP_DIAG_FILES diagnostic file handles and the current write offset. */
#define DBREP_DIAG_FILES 2
DB_FH *diagfile[DBREP_DIAG_FILES];
off_t diag_off;
DB_MPOOLFILE *file_mpf;
DB *file_dbp;
DBC *queue_dbc;
/* Handle flags; DBREP_APP_* are read by APP_IS_* when REP_ON(env) is false. */
#define DBREP_APP_BASEAPI 0x0001
#define DBREP_APP_REPMGR 0x0002
#define DBREP_OPENFILES 0x0004
u_int32_t flags;
#ifdef HAVE_REPLICATION_THREADS
/* Replication manager state, compiled only with HAVE_REPLICATION_THREADS. */
u_int nthreads;
u_int athreads;
u_int non_rep_th;
u_int aelect_threads;
u_int32_t init_policy;
int perm_policy;
DB_LSN perm_lsn;
db_timeout_t ack_timeout;
db_timeout_t election_retry_wait;
db_timeout_t connection_retry_wait;
db_timeout_t heartbeat_frequency;
db_timeout_t heartbeat_monitor_timeout;
REPMGR_RUNNABLE *selector, **messengers, **elect_threads;
REPMGR_RUNNABLE *preferred_elect_thr;
db_timespec repstart_time;
/* Synchronization primitives of the replication manager threads. */
mgr_mutex_t *mutex;
cond_var_t check_election, gmdb_idle, msg_avail;
waiter_t ack_waiters;
/* Platform-specific signaling: a HANDLE on Windows, otherwise a pipe pair. */
#ifdef DB_WIN32
HANDLE signaler;
#else
int read_pipe, write_pipe;
#endif
/* Known sites, connections and pending connection retries (by name). */
REPMGR_SITE *sites;
u_int site_cnt;
u_int site_max;
int self_eid;
u_int siteinfo_seq;
CONNECTION_LIST connections;
RETRY_Q_HEADER retries;
/* Queue of incoming __repmgr_message items; "size" is presumably its length. */
struct {
int size;
STAILQ_HEAD(__repmgr_q_header, __repmgr_message) header;
} input_queue;
socket_t listen_fd;
db_timespec last_bcast;
enum { ready, running, stopped } repmgr_status;
int new_connection;
int takeover_pending;
/* Group membership database (gmdb) state and membership bookkeeping (by name). */
int gmdb_busy;
int client_intent;
int gmdb_dirty;
int have_gmdb;
int seen_repmsg;
enum { none, gmdb_primary, gmdb_secondary } active_gmdb_update;
int limbo_resolution_needed;
u_int32_t membership_version;
u_int32_t member_version_gen;
DB_LSN limbo_failure;
int limbo_victim;
DB_LSN durable_lsn;
DB *gmdb;
u_int8_t *restored_list;
size_t restored_list_length;
/* Callback for messages arriving on a DB_CHANNEL (by signature). */
void (*msg_dispatch) __P((DB_ENV *, DB_CHANNEL *,
DBT *, u_int32_t, u_int32_t));
#endif
};
#ifdef HAVE_REPLICATION_THREADS
/*
 * Record and query which replication API the application uses: the
 * replication manager (REPMGR) or the base API.
 *
 * Once REP_ON(env) is true, the choice is kept in the region's
 * REP_F_APP_REPMGR/REP_F_APP_BASEAPI flags.  Before that, it is kept in
 * the handle's DBREP_APP_REPMGR/DBREP_APP_BASEAPI flags.
 *
 * A setter records its API only if the other one has not already been
 * recorded.  The region path runs under ENV_ENTER and REP_SYSTEM_LOCK.
 * NOTE: the setters pass a variable named "ip" to ENV_ENTER/ENV_LEAVE;
 * the caller must declare it.
 */
#define	APP_IS_REPMGR(env)						\
	(REP_ON(env) ?							\
	    F_ISSET((env)->rep_handle->region, REP_F_APP_REPMGR) :	\
	    F_ISSET((env)->rep_handle, DBREP_APP_REPMGR))
#define	APP_IS_BASEAPI(env)						\
	(REP_ON(env) ?							\
	    F_ISSET((env)->rep_handle->region, REP_F_APP_BASEAPI) :	\
	    F_ISSET((env)->rep_handle, DBREP_APP_BASEAPI))
#define	APP_SET_REPMGR(env) do {					\
	if (!(REP_ON(env))) {						\
		if (!F_ISSET((env)->rep_handle, DBREP_APP_BASEAPI))	\
			F_SET((env)->rep_handle, DBREP_APP_REPMGR);	\
	} else {							\
		ENV_ENTER(env, ip);					\
		REP_SYSTEM_LOCK(env);					\
		if (!F_ISSET((env)->rep_handle->region,			\
		    REP_F_APP_BASEAPI))					\
			F_SET((env)->rep_handle->region,		\
			    REP_F_APP_REPMGR);				\
		REP_SYSTEM_UNLOCK(env);					\
		ENV_LEAVE(env, ip);					\
	}								\
} while (0)
#define	APP_SET_BASEAPI(env) do {					\
	if (!(REP_ON(env))) {						\
		if (!F_ISSET((env)->rep_handle, DBREP_APP_REPMGR))	\
			F_SET((env)->rep_handle, DBREP_APP_BASEAPI);	\
	} else {							\
		ENV_ENTER(env, ip);					\
		REP_SYSTEM_LOCK(env);					\
		if (!F_ISSET((env)->rep_handle->region,			\
		    REP_F_APP_REPMGR))					\
			F_SET((env)->rep_handle->region,		\
			    REP_F_APP_BASEAPI);				\
		REP_SYSTEM_UNLOCK(env);					\
		ENV_LEAVE(env, ip);					\
	}								\
} while (0)
#else
/*
 * Without replication threads there is no replication manager: the
 * application always uses the base API, and the setters do nothing.
 */
#define	APP_IS_REPMGR(env)	0
#define	APP_SET_REPMGR(env)	do {					\
} while (0)
#define	APP_IS_BASEAPI(env)	1
#define	APP_SET_BASEAPI(env)	do {					\
} while (0)
#endif
/*
 * Flag values named for the 4.2-4.4 releases.  REPCTL_PERM and
 * REPCTL_RESEND reuse them as control bits.
 */
#define	DB_LOG_PERM_42_44	0x20
#define	DB_LOG_RESEND_42_44	0x40

/*
 * Control-flag bits, presumably stored in a message control header's
 * flags field.  REPCTL_INIT_45 (by name, the 4.5 value of the INIT bit)
 * uses the same bit, 0x02, as REPCTL_FLUSH.
 */
#define	REPCTL_INIT_45		0x02
#define	REPCTL_ELECTABLE	0x01
#define	REPCTL_FLUSH		0x02
#define	REPCTL_GROUP_ESTD	0x04
#define	REPCTL_INIT		0x08
#define	REPCTL_LEASE		0x10
#define	REPCTL_LOG_END		0x80
#define	REPCTL_PERM		DB_LOG_PERM_42_44
#define	REPCTL_RESEND		DB_LOG_RESEND_42_44

/* Byte-order bits (by name: database and page data are little-endian). */
#define	REPINFO_DB_LITTLEENDIAN	0x0001
#define	REPINFO_PG_LITTLEENDIAN	0x0002
/*
 * REP_46_CONTROL / REP_OLD_CONTROL --
 *	Message control-header layouts named for older protocol releases:
 *	4.6, and releases before it.  The two differ only in the msg_time
 *	field that REP_46_CONTROL carries between gen and flags.  rectype
 *	presumably holds a REP_ message type and flags a set of REPCTL_ bits.
 *
 *	NOTE(review): these are presumably byte layouts exchanged with
 *	sites running older versions, so field order and types must not
 *	change.
 */
typedef struct {
u_int32_t rep_version;
u_int32_t log_version;
DB_LSN lsn;
u_int32_t rectype;
u_int32_t gen;
db_timespec msg_time;
u_int32_t flags;
} REP_46_CONTROL;
typedef struct {
u_int32_t rep_version;
u_int32_t log_version;
DB_LSN lsn;
u_int32_t rectype;
u_int32_t gen;
u_int32_t flags;
} REP_OLD_CONTROL;
/*
 * Lease refresh tuning constants.
 * NOTE(review): units are not visible here; by name, LEASE_REFRESH_USEC
 * is in microseconds.
 */
#define LEASE_REFRESH_MIN 30
#define LEASE_REFRESH_USEC 50000
/*
 * REP_LEASE_ENTRY --
 *	Lease record for one site (eid): lease start and end times and an
 *	associated LSN (by name).  REP's lease_off is presumably the region
 *	offset of an array of these.
 */
typedef struct __rep_lease_entry {
int eid;
db_timespec start_time;
db_timespec end_time;
DB_LSN lease_lsn;
} REP_LEASE_ENTRY;
/*
 * REP_OLD_VOTE_INFO --
 *	Vote information in the older format (by name).  Compared with
 *	VOTE1_CONTENT, it carries egen, has no lsn, ctlflags or data_gen,
 *	and uses signed int for nsites, nvotes and priority.
 *	NOTE(review): likely exchanged with older-version sites; do not
 *	change the layout.
 */
typedef struct {
u_int32_t egen;
int nsites;
int nvotes;
int priority;
u_int32_t tiebreaker;
} REP_OLD_VOTE_INFO;
/*
 * REP_VTALLY --
 *	One tally entry: a site id (eid) and the election generation (egen)
 *	of its vote (by name).  REP's tally_off/v2tally_off are presumably
 *	region offsets of arrays of these.
 */
typedef struct {
u_int32_t egen;
int eid;
} REP_VTALLY;
/* Flag value, by name requesting throttling behavior only -- confirm its call sites. */
#define REP_THROTTLE_ONLY 0x0001
/*
 * REP_THROTTLE --
 *	Send-throttling state (by name).  It holds:
 *	- an LSN position;
 *	- the data DBT being sent;
 *	- a byte budget split into gbytes/bytes, like REP's gbytes/bytes;
 *	- a message type.
 */
typedef struct {
DB_LSN lsn;
DBT *data_dbt;
u_int32_t gbytes;
u_int32_t bytes;
u_int32_t type;
} REP_THROTTLE;
/*
 * REP_BULK --
 *	Description of a bulk-transfer buffer (by name).  It holds:
 *	- the base address (addr) and total length (len);
 *	- a pointer to the current offset (offp);
 *	- a message type, an LSN and a target site (eid);
 *	- a pointer to a flags word (flagsp), which may hold BULK_XMIT.
 *	offp and flagsp are pointers, presumably so the offset and flags live
 *	outside this struct and can be shared; confirm.
 */
typedef struct {
u_int8_t *addr;
roff_t *offp;
u_int32_t len;
u_int32_t type;
DB_LSN lsn;
int eid;
#define BULK_XMIT 0x001
u_int32_t *flagsp;
} REP_BULK;
/*
 * LSN_COLLECTION --
 *	An array of LSNs: nlsns entries in use out of nalloc allocated
 *	(by name).
 */
typedef struct {
u_int nlsns;
u_int nalloc;
DB_LSN *array;
} LSN_COLLECTION;
/*
 * linfo_t --
 *	A count n together with lock requests (reqs) and lock object DBTs
 *	(objs).  Presumably these are parallel arrays of n entries; confirm.
 */
typedef struct {
int n;
DB_LOCKREQ *reqs;
DBT *objs;
} linfo_t;
#if defined(__cplusplus)
}
#endif
#include "dbinc_auto/rep_ext.h"
#endif