#ifndef _DB_REPMGR_H_
#define _DB_REPMGR_H_
#include "dbinc_auto/repmgr_automsg.h"
#if defined(__cplusplus)
extern "C" {
#endif
/*
 * Message type codes, listed in numeric order.
 */
#define	REPMGR_PERMLSN		1
#define	REPMGR_HANDSHAKE	2
#define	REPMGR_REP_MESSAGE	3
#define	REPMGR_HEARTBEAT	4
#define	REPMGR_APP_MESSAGE	5
#define	REPMGR_APP_RESPONSE	6
#define	REPMGR_RESP_ERROR	7
#define	REPMGR_OWN_MSG		8

/*
 * Upper bound on the message type code for each protocol version (going by
 * the names: versions 1 through 4).  Each value coincides with one of the
 * type codes above.
 */
#define	REPMGR_MAX_V1_MSG_TYPE	3	/* == REPMGR_REP_MESSAGE */
#define	REPMGR_MAX_V2_MSG_TYPE	4	/* == REPMGR_HEARTBEAT */
#define	REPMGR_MAX_V3_MSG_TYPE	4	/* == REPMGR_HEARTBEAT */
#define	REPMGR_MAX_V4_MSG_TYPE	8	/* == REPMGR_OWN_MSG */
/*
 * Protocol version range named by this header: oldest and newest.
 */
#define	DB_REPMGR_MIN_VERSION	1
#define	DB_REPMGR_VERSION	4

/*
 * Version thresholds for individual features.  NOTE(review): by their names
 * these are the lowest protocol version at which each feature is available;
 * confirm against the code that compares a connection's version to them.
 */
#define	HEARTBEAT_MIN_VERSION	2
#define	CHANNEL_MIN_VERSION	4
#define	CONN_COLLISION_VERSION	4
#define	GM_MIN_VERSION		4
#define	OWN_MIN_VERSION		4
/*
 * Codes 1 through 11 identifying individual repmgr-internal message kinds.
 * NOTE(review): presumably these are the values read via
 * REPMGR_OWN_MSG_TYPE() from the header of a REPMGR_OWN_MSG message --
 * confirm against the send/receive code.
 */
#define	REPMGR_CONNECT_REJECT	1
#define	REPMGR_GM_FAILURE	2
#define	REPMGR_GM_FORWARD	3
#define	REPMGR_JOIN_REQUEST	4
#define	REPMGR_JOIN_SUCCESS	5
#define	REPMGR_PARM_REFRESH	6
#define	REPMGR_REJOIN		7
#define	REPMGR_REMOVE_REQUEST	8
#define	REPMGR_REMOVE_SUCCESS	9
#define	REPMGR_RESOLVE_LIMBO	10
#define	REPMGR_SHARING		11
/*
 * Forward declarations of the struct tags used throughout this header, so
 * that the types can refer to one another by pointer before they are defined.
 */
struct __cond_waiters_table;
struct __queued_output;
struct __repmgr_connection;
struct __repmgr_queue;
struct __repmgr_response;
struct __repmgr_retry;
struct __repmgr_runnable;
struct __repmgr_site;

/* Upper-case typedef names for the tags declared above. */
typedef struct __cond_waiters_table	COND_WAITERS_TABLE;
typedef struct __queued_output		QUEUED_OUTPUT;
typedef struct __repmgr_connection	REPMGR_CONNECTION;
typedef struct __repmgr_queue		REPMGR_QUEUE;
typedef struct __repmgr_response	REPMGR_RESPONSE;
typedef struct __repmgr_retry		REPMGR_RETRY;
typedef struct __repmgr_runnable	REPMGR_RUNNABLE;
typedef struct __repmgr_site		REPMGR_SITE;

/*
 * NOTE(review): by name, the format version of the group membership
 * database -- confirm where it is written and checked.
 */
#define	REPMGR_GMDB_FMT_VERSION	1
/*
 * Platform-neutral names for the socket, thread, mutex, condition-variable
 * and I/O-vector primitives.  The pthread/BSD-socket flavour is used unless
 * DB_WIN32 is defined, in which case the Win32 equivalents are used.
 *
 * Note that waiter_t differs in kind between the two: a pthread_cond_t
 * object on POSIX, but a pointer to a COND_WAITERS_TABLE on Windows.
 */
#ifndef DB_WIN32
typedef int			socket_t;
typedef pthread_t		thread_id_t;
typedef pthread_mutex_t		mgr_mutex_t;
typedef pthread_cond_t		cond_var_t;
typedef pthread_cond_t		waiter_t;
typedef struct iovec		db_iovec_t;
#else
typedef SOCKET			socket_t;
typedef HANDLE			thread_id_t;
typedef HANDLE			mgr_mutex_t;
typedef HANDLE			cond_var_t;
typedef COND_WAITERS_TABLE	*waiter_t;
typedef WSABUF			db_iovec_t;
#endif
/*
 * NOTE(review): by name, a cap on the number of queued outbound messages
 * (cf. out_queue_length in struct __repmgr_connection); confirm in the
 * sending code.
 */
#define	OUT_QUEUE_LIMIT		10

/* Fallback for platforms whose headers do not supply MAXHOSTNAMELEN. */
#if !defined(MAXHOSTNAMELEN)
#define	MAXHOSTNAMELEN		256
#endif

/*
 * Longest site-location string: a host name plus 20 further characters
 * (NOTE(review): presumably room for the port number and punctuation --
 * confirm against the formatting routine).  SITE_STRING_BUFFER adds one
 * byte on top of that, enough for a terminating NUL.
 */
#define	MAX_SITE_LOC_STRING	(MAXHOSTNAMELEN+20)
typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];

/*
 * Scratch-buffer size: __REPMGR_MAXMSG_SIZE (defined outside this file)
 * plus a host name plus one byte.
 */
#define	MAX_MSG_BUF	(__REPMGR_MAXMSG_SIZE + MAXHOSTNAMELEN + 1)
/*
 * Default timeout and retry intervals, shortest first.  Each is a whole
 * number of seconds scaled by US_PER_SEC (defined outside this file;
 * presumably microseconds per second).
 */
#define	DB_REPMGR_DEFAULT_ACK_TIMEOUT		(1 * US_PER_SEC)
#define	DB_REPMGR_DEFAULT_CHANNEL_TIMEOUT	(5 * US_PER_SEC)
#define	DB_REPMGR_DEFAULT_ELECTION_RETRY	(10 * US_PER_SEC)
#define	DB_REPMGR_DEFAULT_CONNECTION_RETRY	(30 * US_PER_SEC)
/*
 * List-head types for the queues used in this header.  Each is produced by a
 * TAILQ_HEAD/STAILQ_HEAD macro (defined outside this file) from a head tag
 * and the tag of the element struct:
 *   CONNECTION_LIST - tail queue of struct __repmgr_connection
 *   OUT_Q_HEADER    - singly-linked tail queue of struct __queued_output
 *   RETRY_Q_HEADER  - tail queue of struct __repmgr_retry
 */
typedef TAILQ_HEAD(__repmgr_conn_list, __repmgr_connection) CONNECTION_LIST;
typedef STAILQ_HEAD(__repmgr_out_q_head, __queued_output) OUT_Q_HEADER;
typedef TAILQ_HEAD(__repmgr_retry_q, __repmgr_retry) RETRY_Q_HEADER;
/*
 * Bookkeeping record for one repmgr thread: the environment it belongs to,
 * its platform thread handle, the function it runs, and a small argument
 * union.
 */
struct __repmgr_runnable {
ENV *env;
thread_id_t thread_id;
/* Function pointer with a pthread-style start-routine signature. */
void *(*run) __P((void *));
/*
 * NOTE(review): by name, "finished" is set once the thread is done and
 * "quit_requested" asks it to stop; confirm who reads and writes each.
 */
int finished;
int quit_requested;
#ifdef DB_WIN32
/* Windows only.  NOTE(review): presumably signalled to request a quit; confirm. */
HANDLE quit_event;
#endif
/*
 * Argument passed to the thread: either a flag word or a site EID, never
 * both (they share storage).  NOTE(review): the ELECT_F_* bits are
 * presumably the values stored in "flags"; confirm with the election code.
 */
union {
#define ELECT_F_EVENT_NOTIFY 0x01
#define ELECT_F_FAST 0x02
#define ELECT_F_IMMED 0x04
#define ELECT_F_INVITEE 0x08
#define ELECT_F_STARTUP 0x10
u_int32_t flags;
int eid;
} args;
};
/*
 * Element of a RETRY_Q_HEADER tail queue: pairs a site EID with a timestamp.
 * NOTE(review): presumably "time" is when a connection to site "eid" should
 * next be attempted -- confirm against the retry-queue code.
 */
struct __repmgr_retry {
/* Tail-queue linkage. */
TAILQ_ENTRY(__repmgr_retry) entries;
int eid;
db_timespec time;
};
/*
 * Number of bytes needed for a REPMGR_IOVECS holding n vectors: the base
 * struct (which already embeds MIN_IOVEC of them) plus space for the
 * remaining (n - MIN_IOVEC).
 * NOTE(review): for n < MIN_IOVEC the result is smaller than
 * sizeof(REPMGR_IOVECS); confirm callers always pass n >= MIN_IOVEC.
 */
#define REPMGR_IOVECS_ALLOC_SZ(n) \
(sizeof(REPMGR_IOVECS) + ((n) - MIN_IOVEC) * sizeof(db_iovec_t))
/*
 * A set of I/O vectors (db_iovec_t is struct iovec or WSABUF, depending on
 * platform) together with a running byte total.
 */
typedef struct {
/*
 * NOTE(review): presumably "offset" is the index of the first vector still
 * in use and "count" the number of vectors filled in; confirm.
 */
int offset;
int count;
size_t total_bytes;
/* Number of vectors embedded directly in the struct. */
#define MIN_IOVEC 3
db_iovec_t vectors[MIN_IOVEC];
} REPMGR_IOVECS;
/*
 * A length-prefixed byte buffer with a reference count.
 * NOTE(review): "data" is declared with one element; it is presumably the
 * start of a longer buffer allocated together with the struct ("length"
 * bytes) -- confirm at the allocation site before relying on sizeof().
 */
typedef struct {
size_t length;
int ref_count;
u_int8_t data[1];
} REPMGR_FLAT;
/*
 * Element of an OUT_Q_HEADER queue: a pointer to a REPMGR_FLAT message plus a
 * byte offset.  NOTE(review): "offset" is presumably how much of "msg" has
 * already been written to the socket; confirm in the send path.
 */
struct __queued_output {
/* Singly-linked tail-queue linkage. */
STAILQ_ENTRY(__queued_output) entries;
REPMGR_FLAT *msg;
size_t offset;
};
/*
 * A received message: queue linkage, the decoded message header, and a
 * payload whose shape depends on the kind of message.  Only one member of
 * the union "v" is meaningful for a given message.
 * NOTE(review): which member applies is presumably selected by the type
 * recorded in msg_hdr; confirm in the dispatch code.
 */
typedef struct __repmgr_message {
STAILQ_ENTRY(__repmgr_message) entries;
__repmgr_msg_hdr_args msg_hdr;
union {
/* Replication message: originating site plus control and record DBTs. */
struct {
int originating_eid;
DBT control, rec;
} repmsg;
/* Request tied to the connection it arrived on. */
struct {
REPMGR_CONNECTION *conn;
DBT request;
} gmdb_msg;
/*
 * Application message tied to its connection.  NOTE(review): "segments"
 * is declared with one element and is presumably over-allocated to hold
 * more; confirm at the allocation site.
 */
struct {
REPMGR_CONNECTION *conn;
DBT buf;
DBT segments[1];
} appmsg;
} v;
} REPMGR_MESSAGE;
/*
 * Two-step reading phase, used as the type of reading_phase in
 * struct __repmgr_connection.  NOTE(review): by name, sizes are read first
 * and the data second; confirm in the read loop.
 */
typedef enum {
	SIZES_PHASE = 0,
	DATA_PHASE = 1
} phase_t;
/*
 * Kind of connection, used as the type of the "type" member of
 * struct __repmgr_connection: application, replication, or not yet known.
 */
typedef enum {
	APP_CONNECTION = 0,
	REP_CONNECTION = 1,
	UNKNOWN_CONN_TYPE = 2
} conn_type_t;
/*
 * State kept for one socket connection: the socket itself, its kind and
 * state, the in-progress input message, the outbound queue, and
 * response-tracking data.
 */
struct __repmgr_connection {
/* Tail-queue linkage (element type of CONNECTION_LIST). */
TAILQ_ENTRY(__repmgr_connection) entries;
socket_t fd;
#ifdef DB_WIN32
/* Windows only: event object associated with the socket. */
WSAEVENT event_object;
#endif
/* NOTE(review): presumably the number of outstanding references to this struct; confirm. */
u_int32_t ref_count;
conn_type_t type;
/* NOTE(review): presumably the protocol version in use on this connection; confirm. */
u_int32_t version;
/*
 * Values for "state".  IS_READY_STATE() below treats CONN_READY and
 * CONN_CONGESTED as the "ready" states.
 */
#define CONN_CONGESTED 1
#define CONN_CONNECTED 2
#define CONN_DEFUNCT 3
#define CONN_NEGOTIATE 4
#define CONN_PARAMETERS 5
#define CONN_READY 6
int state;
/*
 * Input side: current reading phase, the I/O vectors being filled, the
 * message type, a raw buffer of __REPMGR_MSG_HDR_SIZE bytes for the
 * header, and the message under construction (either a REPMGR_MESSAGE
 * pointer or a pair of DBTs; the two share storage).
 */
phase_t reading_phase;
REPMGR_IOVECS iovecs;
u_int8_t msg_type;
u_int8_t msg_hdr_buf[__REPMGR_MSG_HDR_SIZE];
union {
REPMGR_MESSAGE *rep_message;
struct {
DBT cntrl, rec;
} repmgr_msg;
} input;
/*
 * Output side: queue of pending output and its length.
 * NOTE(review): "drained" is presumably signalled when the queue empties
 * (cf. OUT_QUEUE_LIMIT); confirm.
 */
OUT_Q_HEADER outbound_queue;
int out_queue_length;
cond_var_t drained;
/*
 * Response tracking.  NOTE(review): "responses" presumably points to an
 * array of "aresp" entries with "cur_resp" as the current index; confirm.
 */
waiter_t response_waiters;
REPMGR_RESPONSE *responses;
u_int32_t aresp;
u_int32_t cur_resp;
/* EID of the site this connection belongs to (see IS_VALID_EID). */
int eid;
};
/*
 * True when connection state s is CONN_READY or CONN_CONGESTED.  The
 * argument may be evaluated twice, so it must have no side effects.
 */
#define IS_READY_STATE(s) ((s) == CONN_READY || (s) == CONN_CONGESTED)
/*
 * Address-resolution types.  When the platform has getaddrinfo()
 * (HAVE_GETADDRINFO), ADDRINFO is simply struct addrinfo and ACCEPT_ADDR is
 * struct sockaddr_storage.  Otherwise a local stand-in with the same member
 * names is declared, along with the three AI_* flag values, and ACCEPT_ADDR
 * falls back to struct sockaddr_in.
 */
#ifndef HAVE_GETADDRINFO
typedef struct sockaddr_in ACCEPT_ADDR;

/* Override any AI_* values the system headers may have supplied. */
#undef	AI_PASSIVE
#define	AI_PASSIVE	0x01
#undef	AI_CANONNAME
#define	AI_CANONNAME	0x02
#undef	AI_NUMERICHOST
#define	AI_NUMERICHOST	0x04

/* Local replacement for struct addrinfo; a singly-linked list via ai_next. */
struct __addrinfo {
	int ai_flags;
	int ai_family;
	int ai_socktype;
	int ai_protocol;
	size_t ai_addrlen;
	char *ai_canonname;
	struct sockaddr *ai_addr;
	struct __addrinfo *ai_next;
};
typedef struct __addrinfo ADDRINFO;
#else
typedef struct addrinfo ADDRINFO;
typedef struct sockaddr_storage ACCEPT_ADDR;
#endif
/*
 * Site address in which the host is held as a roff_t rather than a pointer.
 * NOTE(review): roff_t is presumably a region offset, making this the
 * shared-memory form of repmgr_netaddr_t; confirm.
 */
typedef struct {
roff_t host;
u_int16_t port;
} SITEADDR;
/*
 * A SITEADDR together with a configuration word and a status word.
 * NOTE(review): "status" presumably takes the SITE_ADDING / SITE_DELETING /
 * SITE_PRESENT values; confirm.
 */
typedef struct {
SITEADDR addr;
u_int32_t config;
u_int32_t status;
} SITEINFO;
/*
 * Network address of a site: host name string and port number.  Used as the
 * net_addr member of struct __repmgr_site.
 */
typedef struct {
char *host;
u_int16_t port;
} repmgr_netaddr_t;
/*
 * Site / EID helpers.  An EID is an index into db_rep->sites[].  Every macro
 * here except IS_VALID_EID expects a variable named "db_rep" to be in scope
 * at the point of use, with "sites", "self_eid" and "site_cnt" members.
 */

/* Address of the sites[] entry for eid.  No bounds check is performed. */
#define	SITE_FROM_EID(eid)	(&db_rep->sites[eid])

/* Inverse of SITE_FROM_EID: index of site pointer s within sites[]. */
#define	EID_FROM_SITE(s)	((int)((s) - (&db_rep->sites[0])))

/* True for any non-negative EID. */
#define	IS_VALID_EID(e)		((e) >= 0)

/*
 * True when e is non-negative, is not the local site (self_eid), and is
 * below site_cnt.  The argument is evaluated more than once.
 */
#define	IS_KNOWN_REMOTE_SITE(e)	((e) >= 0 && ((e) != db_rep->self_eid) && \
	(((u_int)(e)) < db_rep->site_cnt))

/*
 * Loop header that walks i over every index in [0, site_cnt) except
 * self_eid.  i must be a modifiable integer lvalue; it is evaluated several
 * times per iteration.  Every use of the parameter is parenthesized so the
 * macro does not depend on the precedence of whatever expression is passed.
 */
#define	FOR_EACH_REMOTE_SITE_INDEX(i)					\
	for ((i) = (db_rep->self_eid == 0 ? 1 : 0);			\
	    ((u_int)(i)) < db_rep->site_cnt;				\
	    (int)(++(i)) == db_rep->self_eid ? ++(i) : (i))
/*
 * Per-site record; db_rep->sites[] is an array of these, indexed by EID
 * (see SITE_FROM_EID / EID_FROM_SITE).
 */
struct __repmgr_site {
repmgr_netaddr_t net_addr;
/* NOTE(review): presumably holds SITE_ADDING / SITE_DELETING / SITE_PRESENT; confirm. */
u_int32_t membership;
u_int32_t config;
/* NOTE(review): by name, the highest LSN this site has acknowledged; confirm. */
DB_LSN max_ack;
int ack_policy;
/* NOTE(review): by name, present for alignment only; confirm it is never read. */
u_int16_t alignment;
db_timespec last_rcvd_timestamp;
/*
 * References held on behalf of this site: an incoming and an outgoing
 * connection, and a retry-queue entry.
 */
struct {
struct {
REPMGR_CONNECTION *in;
REPMGR_CONNECTION *out;
} conn;
REPMGR_RETRY *retry;
} ref;
/* Further connections associated with this site, kept as a list. */
CONNECTION_LIST sub_conns;
/* Thread record.  NOTE(review): presumably the thread establishing the connection; confirm. */
REPMGR_RUNNABLE *connector;
/* Values for "state". */
#define SITE_CONNECTED 1
#define SITE_CONNECTING 2
#define SITE_IDLE 3
#define SITE_PAUSING 4
int state;
/* Bits for "flags". */
#define SITE_HAS_PRIO 0x01
#define SITE_ELECTABLE 0x02
#define SITE_TOUCHED 0x04
u_int32_t flags;
};
/*
 * NOTE(review): not referenced elsewhere in this header; by name a flag for
 * a DB_SITE handle created before the environment is opened -- confirm.
 */
#define DB_SITE_PREOPEN 0x01
/*
 * Slot for one response: the DBT that holds it, a return code, and state
 * bits.  Used both as the element type of REPMGR_CONNECTION.responses and
 * embedded by value in struct __channel.
 */
struct __repmgr_response {
DBT dbt;
int ret;
/*
 * Bits for "flags".  NOTE(review): meanings are inferred from the names
 * only; confirm in the response-handling code.
 */
#define RESP_COMPLETE 0x01
#define RESP_DUMMY_BUF 0x02
#define RESP_IN_USE 0x04
#define RESP_READING 0x08
#define RESP_THREAD_WAITING 0x10
u_int32_t flags;
};
/*
 * Internal state behind a DB_CHANNEL handle.
 */
struct __channel {
/* Back-pointer to the public handle and its environment. */
DB_CHANNEL *db_channel;
ENV *env;
/*
 * Underlying connection(s): either a single connection, or a
 * mutex-protected array of "cnt" connection pointers.  The two forms
 * share storage.  NOTE(review): how callers tell which form is active is
 * not visible here; confirm.
 */
union {
REPMGR_CONNECTION *conn;
struct {
mgr_mutex_t *mutex;
REPMGR_CONNECTION **array;
u_int32_t cnt;
} conns;
} c;
/*
 * NOTE(review): presumably the incoming message being handled, with
 * "responded" recording whether a reply has been sent; confirm.
 */
REPMGR_MESSAGE *msg;
int responded;
__repmgr_msg_metadata_args *meta;
/* Response slot embedded by value (not a pointer). */
struct __repmgr_response response;
};
/*
 * Accessors for the two generic words of a message header.  "hdr" is the
 * header object itself (not a pointer); the result is an lvalue.
 */
#define	REPMGR_HDR1(hdr)	((hdr).word1)
#define	REPMGR_HDR2(hdr)	((hdr).word2)

/*
 * Aliases naming what each word holds for a given kind of message.  Each
 * alias expands to one of the accessors above, so it is used the same way:
 * e.g. REP_MSG_REC_SIZE(hdr).
 */

/* REP_MSG_*: sizes of the control and record parts. */
#define	REP_MSG_CONTROL_SIZE	REPMGR_HDR1
#define	REP_MSG_REC_SIZE	REPMGR_HDR2

/* APP_MSG_*: buffer size and segment count. */
#define	APP_MSG_BUFFER_SIZE	REPMGR_HDR1
#define	APP_MSG_SEGMENT_COUNT	REPMGR_HDR2

/* APP_RESP_*: buffer size and tag. */
#define	APP_RESP_BUFFER_SIZE	REPMGR_HDR1
#define	APP_RESP_TAG		REPMGR_HDR2

/* RESP_ERROR_*: error code and tag. */
#define	RESP_ERROR_CODE		REPMGR_HDR1
#define	RESP_ERROR_TAG		REPMGR_HDR2

/* REPMGR_OWN_*: buffer size and message sub-type. */
#define	REPMGR_OWN_BUF_SIZE	REPMGR_HDR1
#define	REPMGR_OWN_MSG_TYPE	REPMGR_HDR2
/*
 * Two independent sets of flag bits, each listed in bit order.  The values
 * repeat between the sets, so the two sets cannot share a flags word.
 * NOTE(review): the field each set applies to is not visible in this
 * header; confirm where they are tested.
 */

/* Set 1. */
#define	REPMGR_SUBORDINATE	0x01
#define	APP_CHANNEL_CONNECTION	0x02
#define	ELECTABLE_SITE		0x04

/* Set 2. */
#define	REPMGR_MULTI_RESP	0x01
#define	REPMGR_REQUEST_MSG_TYPE	0x02
#define	REPMGR_RESPONSE_LIMIT	0x04
/*
 * Handshake record for protocol version 1 (going by the name): version,
 * port and priority.
 * NOTE(review): the struct is not packed, so on ABIs that align u_int32_t
 * to 4 bytes there is padding after "port".  If this is copied to the wire
 * as-is, member order and types must not change; confirm in the handshake
 * code.
 */
typedef struct {
u_int32_t version;
u_int16_t port;
u_int32_t priority;
} DB_REPMGR_V1_HANDSHAKE;
/*
 * Site status bits.  NOTE(review): presumably the values stored in the
 * "membership" member of struct __repmgr_site and the "status" member of
 * SITEINFO; confirm.
 */
#define	SITE_ADDING	0x01
#define	SITE_DELETING	0x02
#define	SITE_PRESENT	0x04
/*
 * True for the two message types REPMGR_OWN_MSG and REPMGR_APP_MESSAGE.
 * The argument may be evaluated twice.
 */
#define	IS_DEFERRABLE(t) ((t) == REPMGR_OWN_MSG || (t) == REPMGR_APP_MESSAGE)

/*
 * 2 when IS_USING_LEASES(env) is true, otherwise 1.
 * NOTE(review): by name, the number of message threads held in reserve;
 * confirm in the thread-pool code.
 */
#define	RESERVED_MSG_TH(env) (IS_USING_LEASES(env) ? 2 : 1)

/*
 * True when the given db_rep has no listening socket (listen_fd equals
 * INVALID_SOCKET).  The parameter is parenthesized before "->" so that
 * arguments such as "*pp" or "&rep" are dereferenced as a whole.
 */
#define	IS_SUBORDINATE(db_rep) ((db_rep)->listen_fd == INVALID_SOCKET)

/*
 * True when ack policy p is one of the three policies named here:
 * ALL_PEERS, QUORUM or ONE_PEER.  The argument may be evaluated up to three
 * times.
 */
#define	IS_PEER_POLICY(p) ((p) == DB_REPMGR_ACKS_ALL_PEERS ||		\
	(p) == DB_REPMGR_ACKS_QUORUM ||					\
	(p) == DB_REPMGR_ACKS_ONE_PEER)
/*
 * Lock mutex m via __repmgr_lock_mutex().  If that call returns non-zero,
 * this macro executes "return (DB_RUNRECOVERY)" in the ENCLOSING function.
 * It can therefore only be used inside a function whose return type accepts
 * DB_RUNRECOVERY, and any cleanup the caller needs on that path is skipped.
 * Wrapped in do/while(0) so it behaves as a single statement.
 */
#define LOCK_MUTEX(m) do { \
if (__repmgr_lock_mutex(m) != 0) \
return (DB_RUNRECOVERY); \
} while (0)
/*
 * Unlock mutex m via __repmgr_unlock_mutex(); same hidden-return behaviour
 * as LOCK_MUTEX when the call returns non-zero.
 */
#define UNLOCK_MUTEX(m) do { \
if (__repmgr_unlock_mutex(m) != 0) \
return (DB_RUNRECOVERY); \
} while (0)
/*
 * Socket-API portability shim: gives the Winsock and BSD-socket
 * environments a common vocabulary.
 *
 *   WOULDBLOCK        - the "operation would block" error code
 *   DB_REPMGR_EAGAIN  - EAGAIN on POSIX; deliberately left undefined on
 *                       Windows, so uses must be guarded with #ifdef
 *   net_errno         - last network error (WSAGetLastError() or errno)
 *   sockopt_t         - pointer type for socket-option values
 *   sendsocket()      - send(); the Windows form casts the length to int
 *   closesocket()     - mapped to close() on POSIX (native on Windows)
 *   threadsync_timeout_t - timeout representation (DWORD or struct timespec)
 *   REPMGR_INITED()   - tests a per-platform member of db_rep: a non-NULL
 *                       "signaler" on Windows, a non-negative "read_pipe"
 *                       elsewhere
 *
 * REPMGR_INITED parenthesizes its parameter before "->" so that arguments
 * such as "*pp" or "&rep" are dereferenced as a whole.
 */
#ifdef DB_WIN32
#define	WOULDBLOCK		WSAEWOULDBLOCK
#undef	DB_REPMGR_EAGAIN
#define	net_errno		WSAGetLastError()
typedef int socklen_t;
typedef char * sockopt_t;
#define	sendsocket(s, buf, len, flags) send((s), (buf), (int)(len), (flags))
/* Let struct-iovec member names be used on a WSABUF (see db_iovec_t). */
#define	iov_len len
#define	iov_base buf
typedef DWORD threadsync_timeout_t;
#define	REPMGR_INITED(db_rep) ((db_rep)->signaler != NULL)
#else
#define	INVALID_SOCKET -1
#define	SOCKET_ERROR -1
#define	WOULDBLOCK		EWOULDBLOCK
#define	DB_REPMGR_EAGAIN	EAGAIN
#define	net_errno		errno
typedef void * sockopt_t;
#define	sendsocket(s, buf, len, flags) send((s), (buf), (len), (flags))
#define	closesocket(fd)		close(fd)
typedef struct timespec threadsync_timeout_t;
#define	REPMGR_INITED(db_rep) ((db_rep)->read_pipe >= 0)
#endif
/* True when db_rep's "selector" member is non-NULL. */
#define SELECTOR_RUNNING(db_rep) ((db_rep)->selector != NULL)
/*
 * Callback applied to a connection: receives the environment, the
 * connection and an opaque caller-supplied pointer; returns an int.
 * NOTE(review): presumably 0 means success and non-zero is an error code;
 * confirm at the call sites.
 */
typedef int (*CONNECTION_ACTION) __P((ENV *, REPMGR_CONNECTION *, void *));
/*
 * Condition callback: receives the environment and an opaque pointer and
 * returns an int treated as a truth value (going by the type name).
 */
typedef int (*PREDICATE) __P((ENV *, void *));
#include "dbinc_auto/repmgr_ext.h"
#if defined(__cplusplus)
}
#endif
#endif