#include "db_config.h"
#include "db_int.h"
typedef int (*HEARTBEAT_ACTION) __P((ENV *));
static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static void check_min_log_file __P((ENV *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
static int prepare_input __P((ENV *, REPMGR_CONNECTION *));
static int process_own_msg __P((ENV *, REPMGR_CONNECTION *));
static int process_parameters __P((ENV *,
REPMGR_CONNECTION *, char *, u_int, u_int32_t, int, u_int32_t));
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_call_election __P((ENV *));
static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
static void *__repmgr_connector_thread __P((void *));
static int __repmgr_next_timeout __P((ENV *,
db_timespec *, HEARTBEAT_ACTION *));
static int __repmgr_retry_connections __P((ENV *));
static int __repmgr_send_heartbeat __P((ENV *));
static int __repmgr_try_one __P((ENV *, int));
static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
/*
 * ONLY_HANDSHAKE --
 *	Guard for connection states in which the only acceptable incoming
 * message is a handshake.  If conn->msg_type is not REPMGR_HANDSHAKE, an
 * error naming the message type and connection state is logged and the
 * ENCLOSING FUNCTION returns DB_REP_UNAVAIL.  Because the expansion contains
 * a return statement, it can only be used inside a function returning int.
 */
#define ONLY_HANDSHAKE(env, conn) do { \
if (conn->msg_type != REPMGR_HANDSHAKE) { \
__db_errx(env, DB_STR_A("3613", \
"unexpected msg type %d in state %d", "%d %d"), \
(int)conn->msg_type, conn->state); \
return (DB_REP_UNAVAIL); \
} \
} while (0)
/*
 * __repmgr_select_thread --
 *	Thread entry point that runs the repmgr select loop.
 *
 * argsp is the REPMGR_RUNNABLE describing this thread; only its env member
 * is consulted here.  A non-zero result from the loop is logged and handed
 * to __repmgr_thread_failure.  The thread's return value is always NULL.
 */
void *
__repmgr_select_thread(argsp)
	void *argsp;
{
	REPMGR_RUNNABLE *runnable;
	ENV *env;
	int ret;

	runnable = argsp;
	env = runnable->env;

	ret = __repmgr_select_loop(env);
	if (ret != 0) {
		__db_err(env, ret, DB_STR("3614", "select loop failed"));
		(void)__repmgr_thread_failure(env, ret);
	}

	return (NULL);
}
/*
 * __repmgr_bow_out --
 *	Stop the repmgr threads and announce that the local site was removed.
 *
 * Returns the result of __repmgr_stop_threads.  The
 * DB_EVENT_REP_LOCAL_SITE_REMOVED event is fired regardless of that result.
 */
int
__repmgr_bow_out(env)
	ENV *env;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	/* The stop request is issued while holding the repmgr mutex. */
	LOCK_MUTEX(db_rep->mutex);
	ret = __repmgr_stop_threads(env);
	UNLOCK_MUTEX(db_rep->mutex);

	/* Fire the event only after the mutex has been released. */
	DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);

	return (ret);
}
/*
 * __repmgr_accept --
 *	Accept one pending connection on the listening socket and register it.
 *
 * A set of transient accept() errors is treated as harmless: they are
 * reported only in verbose output and the function returns 0.  Any other
 * accept() failure is logged and returned.  On success the new connection is
 * created in CONN_NEGOTIATE state, configured (keepalive, non-blocking),
 * given eid -1, appended to db_rep->connections and has its ref_count
 * incremented.
 */
int
__repmgr_accept(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	ACCEPT_ADDR siaddr;
	socklen_t addrlen;
	socket_t s;
	int ret;

	db_rep = env->rep_handle;
	addrlen = sizeof(siaddr);

	s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr, &addrlen);
	if (s == -1) {
		ret = net_errno;
		switch (ret) {
#ifdef DB_WIN32
		case WSAECONNRESET:
		case WSAEWOULDBLOCK:
#else
		case EINTR:
		case EWOULDBLOCK:
		case ECONNABORTED:
		case ENETDOWN:
#ifdef EPROTO
		case EPROTO:
#endif
		case ENOPROTOOPT:
		case EHOSTDOWN:
#ifdef ENONET
		case ENONET:
#endif
		case EHOSTUNREACH:
		case EOPNOTSUPP:
		case ENETUNREACH:
#endif
			/* Transient condition: not an error for the caller. */
			VPRINT(env, (env, DB_VERB_REPMGR_MISC,
			    "accept error %d considered innocuous", ret));
			return (0);
		default:
			__db_err(env, ret, DB_STR("3615", "accept error"));
			return (ret);
		}
	}
	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "accepted a new connection"));

	ret = __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE);
	if (ret != 0) {
		/* No connection object was created, so close the socket here. */
		(void)closesocket(s);
		return (ret);
	}

	if ((ret = __repmgr_set_keepalive(env, conn)) != 0)
		goto err;
	if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
		__db_err(env, ret, DB_STR("3616",
		    "can't set nonblock after accept"));
		goto err;
	}

	/*
	 * The peer has not identified itself yet, so the connection carries
	 * no valid EID and lives on the general connections list for now.
	 */
	conn->eid = -1;
	TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
	conn->ref_count++;
	return (0);

err:	(void)__repmgr_destroy_conn(env, conn);
	return (ret);
}
int
__repmgr_compute_timeout(env, timeout)
ENV *env;
db_timespec *timeout;
{
DB_REP *db_rep;
REPMGR_RETRY *retry;
db_timespec now, t;
int have_timeout;
db_rep = env->rep_handle;
have_timeout = __repmgr_next_timeout(env, &t, NULL);
if (!TAILQ_EMPTY(&db_rep->retries)) {
retry = TAILQ_FIRST(&db_rep->retries);
if (have_timeout) {
t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
} else {
t = retry->time;
have_timeout = TRUE;
}
}
if (have_timeout) {
__os_gettime(env, &now, 1);
if (timespeccmp(&now, &t, >=))
timespecclear(timeout);
else {
*timeout = t;
timespecsub(timeout, &now);
}
}
return (have_timeout);
}
/*
 * __repmgr_next_timeout --
 *	Determine the next heartbeat-related deadline, if any, and the action
 * to take when it expires.
 *
 * When this site is the master and heartbeat_frequency is set, the deadline
 * is last_bcast + heartbeat_frequency and the action sends a heartbeat.
 * Otherwise, when there is a connected master, this process is not
 * subordinate, heartbeat_monitor_timeout is set, and at least one ready
 * connection to the master speaks HEARTBEAT_MIN_VERSION or later, the
 * deadline is last_rcvd_timestamp + heartbeat_monitor_timeout and the action
 * is __repmgr_call_election.
 *
 * Returns TRUE with *deadline (and *action, when action is non-NULL) filled
 * in, or FALSE when no heartbeat deadline applies.
 */
static int
__repmgr_next_timeout(env, deadline, action)
	ENV *env;
	db_timespec *deadline;
	HEARTBEAT_ACTION *action;
{
	DB_REP *db_rep;
	REP *rep;
	HEARTBEAT_ACTION my_action;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *master;
	db_timespec t;
	u_int32_t version;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	if (rep->master_id == db_rep->self_eid &&
	    rep->heartbeat_frequency > 0) {
		/* We are the master: schedule the next heartbeat broadcast. */
		t = db_rep->last_bcast;
		TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
		my_action = __repmgr_send_heartbeat;
	} else {
		master = __repmgr_connected_master(env);
		if (master == NULL || IS_SUBORDINATE(db_rep) ||
		    !(rep->heartbeat_monitor_timeout > 0))
			return (FALSE);

		/* Highest version among the ready connections to the master. */
		version = 0;
		conn = master->ref.conn.in;
		if (conn != NULL && IS_READY_STATE(conn->state))
			version = conn->version;
		conn = master->ref.conn.out;
		if (conn != NULL && IS_READY_STATE(conn->state) &&
		    conn->version > version)
			version = conn->version;

		if (!(version >= HEARTBEAT_MIN_VERSION))
			return (FALSE);

		t = master->last_rcvd_timestamp;
		TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_monitor_timeout);
		my_action = __repmgr_call_election;
	}

	*deadline = t;
	if (action != NULL)
		*action = my_action;
	return (TRUE);
}
/*
 * __repmgr_send_heartbeat --
 *	Broadcast a REPMGR_HEARTBEAT message.
 *
 * The control part carries the marshaled current generation and maximum
 * perm LSN; the rec part is empty.  Returns the error from
 * __rep_get_maxpermlsn if that fails, otherwise the result of
 * __repmgr_send_broadcast.
 */
static int
__repmgr_send_heartbeat(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	DBT control, rec;
	__repmgr_permlsn_args permlsn;
	u_int8_t buf[__REPMGR_PERMLSN_SIZE];
	u_int unused1, unused2;
	int ret, unused3;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	permlsn.generation = rep->gen;
	ret = __rep_get_maxpermlsn(env, &permlsn.lsn);
	if (ret != 0)
		return (ret);
	__repmgr_permlsn_marshal(env, &permlsn, buf);

	control.data = buf;
	control.size = __REPMGR_PERMLSN_SIZE;
	DB_INIT_DBT(rec, NULL, 0);

	/* The three output values of the broadcast are not needed here. */
	ret = __repmgr_send_broadcast(env,
	    REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3);
	return (ret);
}
/*
 * __repmgr_connected_master --
 *	Return the site structure of the current master if it is a known
 * remote site whose state is SITE_CONNECTED; otherwise return NULL.
 */
REPMGR_SITE *
__repmgr_connected_master(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_SITE *master;
	int master_id;

	db_rep = env->rep_handle;
	master_id = db_rep->region->master_id;

	if (IS_KNOWN_REMOTE_SITE(master_id)) {
		master = SITE_FROM_EID(master_id);
		if (master->state == SITE_CONNECTED)
			return (master);
	}
	return (NULL);
}
/*
 * __repmgr_call_election --
 *	Heartbeat-monitor expiry action: bust the connections to the master.
 *
 * Does nothing (returns 0) when there is no connected master.  Otherwise
 * the incoming and then the outgoing connection to the master, where
 * present, are passed to __repmgr_bust_connection; the first failure is
 * returned.
 */
static int
__repmgr_call_election(env)
	ENV *env;
{
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *master;
	int ret;

	if ((master = __repmgr_connected_master(env)) == NULL)
		return (0);

	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
	    "heartbeat monitor timeout expired"));
	STAT(env->rep_handle->region->mstat.st_connection_drop++);

	conn = master->ref.conn.in;
	if (conn != NULL) {
		ret = __repmgr_bust_connection(env, conn);
		if (ret != 0)
			return (ret);
	}

	/* Read the outgoing reference only after handling the incoming one. */
	conn = master->ref.conn.out;
	if (conn != NULL) {
		ret = __repmgr_bust_connection(env, conn);
		if (ret != 0)
			return (ret);
	}

	return (0);
}
/*
 * __repmgr_check_timeouts --
 *	Run whatever timed work is now due.
 *
 * If a heartbeat deadline exists and has been reached, its associated
 * action is invoked; a failure from the action is returned immediately.
 * Afterwards expired connection retries are processed and that result is
 * returned.
 */
int
__repmgr_check_timeouts(env)
	ENV *env;
{
	db_timespec when, now;
	HEARTBEAT_ACTION action;
	int ret;

	if (__repmgr_next_timeout(env, &when, &action)) {
		__os_gettime(env, &now, 1);
		if (timespeccmp(&when, &now, <=)) {
			ret = (*action)(env);
			if (ret != 0)
				return (ret);
		}
	}

	return (__repmgr_retry_connections(env));
}
/*
 * __repmgr_retry_connections --
 *	Process every entry at the head of the retry list whose time is
 * strictly earlier than now.
 *
 * Each such entry is removed and freed.  If the corresponding site's
 * membership is SITE_PRESENT a new connection attempt is started;
 * otherwise the site is simply marked SITE_IDLE.  Processing stops at the
 * first entry that is not yet due.  Returns 0, or the first error from
 * __repmgr_try_one.
 */
static int
__repmgr_retry_connections(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	REPMGR_RETRY *retry;
	db_timespec now;
	int eid, ret;

	db_rep = env->rep_handle;
	__os_gettime(env, &now, 1);

	for (;;) {
		if (TAILQ_EMPTY(&db_rep->retries))
			break;
		retry = TAILQ_FIRST(&db_rep->retries);
		if (timespeccmp(&retry->time, &now, >=))
			break;

		/* Remember the EID, then discard the list entry. */
		TAILQ_REMOVE(&db_rep->retries, retry, entries);
		eid = retry->eid;
		__os_free(env, retry);

		DB_ASSERT(env, IS_VALID_EID(eid));
		site = SITE_FROM_EID(eid);
		DB_ASSERT(env, site->state == SITE_PAUSING);

		if (site->membership != SITE_PRESENT) {
			site->state = SITE_IDLE;
			continue;
		}
		ret = __repmgr_try_one(env, eid);
		if (ret != 0)
			return (ret);
	}
	return (0);
}
/*
 * __repmgr_first_try_connections --
 *	Start a connection attempt to every remote site that is currently
 * SITE_IDLE and whose membership is SITE_PRESENT.
 *
 * Returns 0, or the first error from __repmgr_try_one.
 */
int
__repmgr_first_try_connections(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int eid, ret;

	db_rep = env->rep_handle;

	FOR_EACH_REMOTE_SITE_INDEX(eid) {
		site = SITE_FROM_EID(eid);
		if (site->state == SITE_IDLE &&
		    site->membership == SITE_PRESENT) {
			ret = __repmgr_try_one(env, eid);
			if (ret != 0)
				return (ret);
		}
	}
	return (0);
}
/*
 * __repmgr_try_one --
 *	Launch a connector thread for the site identified by eid.
 *
 * The site's REPMGR_RUNNABLE is allocated on first use.  If one already
 * exists and its thread has finished, the thread is joined and the
 * structure reused.  If the previous connector thread is still running,
 * no new thread is started; another attempt is scheduled instead and that
 * result is returned.
 *
 * On a failed thread start the runnable is freed and site->connector is
 * cleared.  Note that site->state has already been set to SITE_CONNECTING
 * by then.
 */
static int
__repmgr_try_one(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	REPMGR_RUNNABLE *th;
	int ret;

	db_rep = env->rep_handle;
	DB_ASSERT(env, IS_VALID_EID(eid));
	site = SITE_FROM_EID(eid);

	th = site->connector;
	if (th == NULL) {
		ret = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &th);
		if (ret != 0)
			return (ret);
		site->connector = th;
	} else if (!th->finished) {
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		  "eid %lu previous connector thread still running; will retry",
		    (u_long)eid));
		return (__repmgr_schedule_connection_attempt(env,
		    eid, FALSE));
	} else {
		/* Reap the finished thread before reusing its runnable. */
		ret = __repmgr_thread_join(th);
		if (ret != 0)
			return (ret);
	}

	site->state = SITE_CONNECTING;

	th->run = __repmgr_connector_thread;
	th->args.eid = eid;
	ret = __repmgr_thread_start(env, th);
	if (ret != 0) {
		__os_free(env, th);
		site->connector = NULL;
	}
	return (ret);
}
/*
 * __repmgr_connector_thread --
 *	Thread entry point for a connector thread.
 *
 * argsp is the thread's REPMGR_RUNNABLE.  Runs __repmgr_connector_main; a
 * failure is logged and reported through __repmgr_thread_failure.  The
 * runnable's finished flag is set just before returning, whether or not the
 * main routine succeeded.  Always returns NULL.
 */
static void *
__repmgr_connector_thread(argsp)
	void *argsp;
{
	REPMGR_RUNNABLE *th;
	ENV *env;
	int ret;

	th = argsp;
	env = th->env;

	RPRINT(env, (env, DB_VERB_REPMGR_MISC,
	    "starting connector thread, eid %u", th->args.eid));

	ret = __repmgr_connector_main(env, th);
	if (ret != 0) {
		__db_err(env, ret, DB_STR("3617", "connector thread failed"));
		(void)__repmgr_thread_failure(env, ret);
	}

	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));

	/* This flag is what __repmgr_try_one checks before joining. */
	th->finished = TRUE;
	return (NULL);
}
/*
 * __repmgr_connector_main --
 *	Body of a connector thread: try to open an outgoing connection to the
 * site named by th->args.eid.
 *
 * The repmgr mutex is held while site state is examined or changed and is
 * released around the call to __repmgr_connect.
 *
 * Outcomes:
 *  - The site is no longer SITE_CONNECTING, or repmgr is stopped: do nothing
 *    further and return 0.  The same test is applied before connecting and
 *    again after every re-acquisition of the mutex; a connection made in the
 *    meantime is destroyed.
 *  - Connect succeeded: the connection becomes the site's outgoing
 *    connection, the site becomes SITE_CONNECTED, last_rcvd_timestamp is
 *    refreshed and the main thread is woken.
 *  - Connect returned DB_REP_UNAVAIL: a DB_EVENT_REP_CONNECT_TRY_FAILED event
 *    is fired and another attempt is scheduled.
 *  - Any other connect error is returned as is.
 */
static int
__repmgr_connector_main(env, th)
	ENV *env;
	REPMGR_RUNNABLE *th;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	REPMGR_CONNECTION *conn;
	DB_REPMGR_CONN_ERR info;
	repmgr_netaddr_t netaddr;
	SITE_STRING_BUFFER site_string;
	int err, ret, t_ret;

	db_rep = env->rep_handle;
	ret = 0;

	LOCK_MUTEX(db_rep->mutex);
	DB_ASSERT(env, IS_VALID_EID(th->args.eid));
	site = SITE_FROM_EID(th->args.eid);
	/*
	 * Bail out if EITHER the site is no longer waiting for us or repmgr is
	 * stopping.  This matches the tests made after the mutex is
	 * re-acquired below, and avoids a pointless blocking connect whose
	 * result would just be thrown away.
	 */
	if (site->state != SITE_CONNECTING || db_rep->repmgr_status == stopped)
		goto unlock;

	/*
	 * Work from a private copy of the address while the mutex is dropped;
	 * the site pointer is looked up afresh after re-locking.
	 */
	netaddr = site->net_addr;
	RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connecting to %s",
	    __repmgr_format_site_loc(site, site_string)));
	UNLOCK_MUTEX(db_rep->mutex);

	if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
		DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
		LOCK_MUTEX(db_rep->mutex);

		if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
			__db_err(env, ret, DB_STR("3618",
			    "set_nonblock in connnect thread"));
			goto cleanup;
		}
		conn->type = REP_CONNECTION;

		/* Things may have changed while we were not holding the mutex. */
		site = SITE_FROM_EID(th->args.eid);
		if (site->state != SITE_CONNECTING ||
		    db_rep->repmgr_status == stopped)
			goto cleanup;

		conn->eid = th->args.eid;
		site->ref.conn.out = conn;
		site->state = SITE_CONNECTED;
		__os_gettime(env, &site->last_rcvd_timestamp, 1);
		ret = __repmgr_wake_main_thread(env);
	} else if (ret == DB_REP_UNAVAIL) {
		/* Ordinary connection failure: report it and try again later. */
		info.eid = th->args.eid;
		info.error = err;
		DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
		STAT(db_rep->region->mstat.st_connect_fail++);

		LOCK_MUTEX(db_rep->mutex);
		site = SITE_FROM_EID(th->args.eid);
		if (site->state != SITE_CONNECTING ||
		    db_rep->repmgr_status == stopped) {
			ret = 0;
			goto unlock;
		}
		ret = __repmgr_schedule_connection_attempt(env,
		    th->args.eid, FALSE);
	} else
		goto out;	/* Mutex is not held on this path. */

	if (0) {
cleanup:
		/* Preserve an earlier error over a destroy failure. */
		if ((t_ret = __repmgr_destroy_conn(env, conn)) != 0 &&
		    ret == 0)
			ret = t_ret;
	}

unlock:
	UNLOCK_MUTEX(db_rep->mutex);
out:
	return (ret);
}
/*
 * __repmgr_send_v1_handshake --
 *	Send a version-1 style handshake on conn.
 *
 * The control part is a DB_REPMGR_V1_HANDSHAKE structure, zeroed first and
 * then filled with version 1, this site's priority (converted with htonl)
 * and its port.  The rec part is the caller's buf/len.  Returns the result
 * of __repmgr_send_one.
 */
int
__repmgr_send_v1_handshake(env, conn, buf, len)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *buf;
	size_t len;
{
	DB_REP *db_rep;
	REP *rep;
	repmgr_netaddr_t *my_addr;
	DB_REPMGR_V1_HANDSHAKE buffer;
	DBT cntrl, rec;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;

	/* Clear the whole structure so that no stray bytes are sent. */
	memset(&buffer, 0, sizeof(buffer));
	buffer.version = 1;
	buffer.port = my_addr->port;
	buffer.priority = htonl(rep->priority);

	cntrl.size = sizeof(buffer);
	cntrl.data = &buffer;

	rec.size = (u_int32_t)len;
	rec.data = buf;

	return (__repmgr_send_one(env,
	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0));
}
/*
 * __repmgr_read_from_site --
 *	Read whatever is available on conn and act on a completed phase.
 *
 * Result of __repmgr_read_conn:
 *  - EINTR (non-Windows): retry the read.
 *  - WOULDBLOCK / DB_REPMGR_EAGAIN: nothing more to do now; return 0.
 *  - DB_REP_UNAVAIL: fire a connection-error event with code 0 and return
 *    DB_REP_UNAVAIL.
 *  - 0: the current phase is fully read.  Refresh the site's
 *    last_rcvd_timestamp when the connection has a valid EID, then either
 *    prepare for the message body (SIZES_PHASE) or dispatch the message.
 *  - anything else: fire a connection-error event with that code and return
 *    DB_REP_UNAVAIL.
 */
int
__repmgr_read_from_site(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int ret;

	db_rep = env->rep_handle;

	for (;;) {
		ret = __repmgr_read_conn(conn);
		switch (ret) {
#ifndef DB_WIN32
		case EINTR:
			continue;
#endif

#if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
		case DB_REPMGR_EAGAIN:
#endif
		case WOULDBLOCK:
			return (0);

		case DB_REP_UNAVAIL:
			__repmgr_fire_conn_err_event(env, conn, 0);
			STAT(db_rep->region->mstat.st_connection_drop++);
			return (DB_REP_UNAVAIL);

		case 0:
			if (IS_VALID_EID(conn->eid)) {
				site = SITE_FROM_EID(conn->eid);
				__os_gettime(env,
				    &site->last_rcvd_timestamp, 1);
			}
			if (conn->reading_phase == SIZES_PHASE)
				return (prepare_input(env, conn));
			return (dispatch_msgin(env, conn));

		default:
#ifdef EBADF
			DB_ASSERT(env, ret != EBADF);
#endif
			__repmgr_fire_conn_err_event(env, conn, ret);
			STAT(db_rep->region->mstat.st_connection_drop++);
			return (DB_REP_UNAVAIL);
		}
	}
}
/*
 * __repmgr_read_conn --
 *	Keep reading into the connection's pending iovecs until they have been
 * completely filled.
 *
 * Returns 0 once __repmgr_update_consumed reports that the iovecs are
 * complete, DB_REP_UNAVAIL when a read delivers zero bytes, or the non-zero
 * result of __repmgr_readv.
 */
int
__repmgr_read_conn(conn)
	REPMGR_CONNECTION *conn;
{
	size_t nr;
	int ret;

	do {
		ret = __repmgr_readv(conn->fd,
		    &conn->iovecs.vectors[conn->iovecs.offset],
		    conn->iovecs.count - conn->iovecs.offset, &nr);
		if (ret != 0)
			return (ret);

		/* A successful read of zero bytes is reported as unavailable. */
		if (nr == 0)
			return (DB_REP_UNAVAIL);
	} while (!__repmgr_update_consumed(&conn->iovecs, nr));

	return (0);
}
/*
 * prepare_input --
 *	Called when a message header has been completely read (SIZES_PHASE).
 * Decode the header and set up conn->iovecs so that the message body is read
 * into freshly prepared buffers.
 *
 * For messages that have no body to read (an empty heartbeat, an empty
 * application response, or an error response) the connection is reset to
 * read the next header instead.  Otherwise the connection moves on to
 * DATA_PHASE.
 *
 * Returns 0 on success, DB_REP_UNAVAIL for a malformed or unexpected
 * header, or an allocation/wake-up error.
 */
static int
prepare_input(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
/* Alignment used for the data areas that follow a REPMGR_MESSAGE. */
#define MEM_ALIGN sizeof(double)
	DBT *dbt;
	__repmgr_msg_hdr_args msg_hdr;
	REPMGR_RESPONSE *resp;
	u_int32_t control_size, rec_size, size;
	size_t memsize, control_offset, rec_offset;
	void *membase;
	int ret, skip;

	DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);

	ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
	    conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
	DB_ASSERT(env, ret == 0);

	__repmgr_iovec_init(&conn->iovecs);
	skip = FALSE;

	switch ((conn->msg_type = msg_hdr.type)) {
	case REPMGR_HEARTBEAT:
	case REPMGR_REP_MESSAGE:
		env->rep_handle->seen_repmsg = TRUE;
		control_size = REP_MSG_CONTROL_SIZE(msg_hdr);
		rec_size = REP_MSG_REC_SIZE(msg_hdr);
		if (control_size == 0) {
			/* Only a heartbeat may legitimately be empty. */
			if (conn->msg_type == REPMGR_HEARTBEAT) {
				skip = TRUE;
				break;
			} else {
				__db_errx(env, DB_STR("3619",
				    "illegal size for rep msg"));
				return (DB_REP_UNAVAIL);
			}
		}

		/*
		 * One allocation holds the REPMGR_MESSAGE, then the control
		 * data, then (if any) the rec data, each starting on a
		 * MEM_ALIGN boundary.
		 */
		memsize = DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
		control_offset = memsize;
		memsize += control_size;
		if (rec_size > 0) {
			memsize = DB_ALIGN(memsize, MEM_ALIGN);
			rec_offset = memsize;
			memsize += rec_size;
		} else
			COMPQUIET(rec_offset, 0);
		if ((ret = __os_malloc(env, memsize, &membase)) != 0)
			return (ret);
		conn->input.rep_message = membase;
		conn->input.rep_message->msg_hdr = msg_hdr;
		conn->input.rep_message->v.repmsg.originating_eid = conn->eid;

		DB_INIT_DBT(conn->input.rep_message->v.repmsg.control,
		    (u_int8_t*)membase + control_offset, control_size);
		__repmgr_add_dbt(&conn->iovecs,
		    &conn->input.rep_message->v.repmsg.control);

		if (rec_size > 0) {
			DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
			    (u_int8_t*)membase + rec_offset, rec_size);
			__repmgr_add_dbt(&conn->iovecs,
			    &conn->input.rep_message->v.repmsg.rec);
		} else
			DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
			    NULL, 0);
		break;

	case REPMGR_APP_MESSAGE:
		/*
		 * Layout: REPMGR_MESSAGE, room for the segment DBTs, then
		 * (aligned) the raw message buffer.
		 */
		size = DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
		    APP_MSG_SEGMENT_COUNT(msg_hdr) * sizeof(DBT)),
		    MEM_ALIGN);
		memsize = size + APP_MSG_BUFFER_SIZE(msg_hdr);
		if ((ret = __os_malloc(env, memsize, &membase)) != 0)
			return (ret);
		conn->input.rep_message = membase;
		conn->input.rep_message->msg_hdr = msg_hdr;
		conn->input.rep_message->v.appmsg.conn = conn;

		DB_INIT_DBT(conn->input.rep_message->v.appmsg.buf,
		    (u_int8_t*)membase + size,
		    APP_MSG_BUFFER_SIZE(msg_hdr));
		__repmgr_add_dbt(&conn->iovecs,
		    &conn->input.rep_message->v.appmsg.buf);
		break;

	case REPMGR_OWN_MSG:
		/*
		 * Validate the advertised size BEFORE allocating, so that the
		 * rejection path has nothing to release.
		 */
		if (REPMGR_OWN_BUF_SIZE(msg_hdr) == 0) {
			__db_errx(env, DB_STR_A("3680",
			    "invalid own buf size %lu in prepare_input", "%lu"),
			    (u_long)REPMGR_OWN_BUF_SIZE(msg_hdr));
			return (DB_REP_UNAVAIL);
		}
		size = sizeof(REPMGR_MESSAGE) + REPMGR_OWN_BUF_SIZE(msg_hdr);
		if ((ret = __os_malloc(env, size, &membase)) != 0)
			return (ret);
		conn->input.rep_message = membase;
		conn->input.rep_message->msg_hdr = msg_hdr;

		/* The data follows immediately after the REPMGR_MESSAGE. */
		DB_INIT_DBT(conn->input.rep_message->v.gmdb_msg.request,
		    (u_int8_t*)membase + sizeof(REPMGR_MESSAGE),
		    REPMGR_OWN_BUF_SIZE(msg_hdr));
		__repmgr_add_dbt(&conn->iovecs,
		    &conn->input.rep_message->v.gmdb_msg.request);
		break;

	case REPMGR_APP_RESPONSE:
		size = APP_RESP_BUFFER_SIZE(msg_hdr);
		conn->cur_resp = APP_RESP_TAG(msg_hdr);
		if (conn->cur_resp >= conn->aresp) {
			__db_errx(env, DB_STR_A("3681",
			    "invalid cur resp %lu in prepare_input", "%lu"),
			    (u_long)conn->cur_resp);
			return (DB_REP_UNAVAIL);
		}
		resp = &conn->responses[conn->cur_resp];
		DB_ASSERT(env, F_ISSET(resp, RESP_IN_USE));

		dbt = &resp->dbt;

		/*
		 * Choose where the response body goes.  With no thread
		 * waiting, a non-empty body is read into a throw-away buffer
		 * (RESP_DUMMY_BUF) and an empty one just releases the slot.
		 * Otherwise the waiting thread's DBT flags decide how the
		 * buffer is provided.
		 */
		ret = 0;
		if (!F_ISSET(resp, RESP_THREAD_WAITING)) {
			if (size > 0) {
				memset(dbt, 0, sizeof(*dbt));
				ret = __os_malloc(env, size, &dbt->data);
				F_SET(resp, RESP_DUMMY_BUF);
			} else
				F_CLR(resp, RESP_IN_USE);
		} else if (F_ISSET(dbt, DB_DBT_MALLOC))
			ret = __os_umalloc(env, size, &dbt->data);
		else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
			if (dbt->data == NULL || dbt->size < size)
				ret = __os_urealloc(env, size, &dbt->data);
		} else if (F_ISSET(dbt, DB_DBT_USERMEM)) {
			/*
			 * NOTE(review): this is only an assertion; confirm
			 * that the sender can never exceed ulen in
			 * non-diagnostic builds.
			 */
			DB_ASSERT(env, size <= dbt->ulen);
		}
		dbt->size = size;
		if (ret != 0)
			return (ret);

		if (size > 0) {
			__repmgr_add_dbt(&conn->iovecs, dbt);
			F_SET(resp, RESP_READING);
		} else {
			/* Nothing to read: the response is already complete. */
			skip = TRUE;
			if (F_ISSET(resp, RESP_THREAD_WAITING)) {
				F_SET(resp, RESP_COMPLETE);
				if ((ret = __repmgr_wake_waiters(env,
				    &conn->response_waiters)) != 0)
					return (ret);
			}
		}
		break;

	case REPMGR_RESP_ERROR:
		/*
		 * NOTE(review): unlike REPMGR_APP_RESPONSE above, the tag is
		 * range-checked only by assertion here; confirm whether a
		 * run-time check is wanted.
		 */
		DB_ASSERT(env, RESP_ERROR_TAG(msg_hdr) < conn->aresp &&
		    conn->responses != NULL);
		resp = &conn->responses[RESP_ERROR_TAG(msg_hdr)];
		DB_ASSERT(env, !F_ISSET(resp, RESP_READING));
		if (F_ISSET(resp, RESP_THREAD_WAITING)) {
			F_SET(resp, RESP_COMPLETE);

			/* The error code is delivered negated. */
			resp->ret = -((int)RESP_ERROR_CODE(msg_hdr));
			if ((ret = __repmgr_wake_waiters(env,
			    &conn->response_waiters)) != 0)
				return (ret);
		} else
			F_CLR(resp, RESP_IN_USE);
		skip = TRUE;
		break;

	case REPMGR_HANDSHAKE:
	case REPMGR_PERMLSN:
		if ((ret = __repmgr_prepare_simple_input(env,
		    conn, &msg_hdr)) != 0)
			return (ret);
		break;

	default:
		__db_errx(env, DB_STR_A("3676",
		    "unexpected msg type %lu in prepare_input", "%lu"),
		    (u_long)conn->msg_type);
		return (DB_REP_UNAVAIL);
	}

	if (skip) {
		/* No body to read: go straight back to reading a header. */
		__repmgr_reset_for_reading(conn);
	} else
		conn->reading_phase = DATA_PHASE;

	return (0);
}
/*
 * __repmgr_prepare_simple_input --
 *	Set up input buffers for a message made of a control part and a rec
 * part, each read into its own allocation.
 *
 * The sizes come from the message header.  A part of size zero gets no
 * buffer and no iovec entry.  If allocating the rec buffer fails, a control
 * buffer allocated by this call is freed before the error is returned.
 */
int
__repmgr_prepare_simple_input(env, conn, msg_hdr)
	ENV *env;
	REPMGR_CONNECTION *conn;
	__repmgr_msg_hdr_args *msg_hdr;
{
	DBT *cntrl, *rec;
	u_int32_t control_size, rec_size;
	int ret;

	control_size = REP_MSG_CONTROL_SIZE(*msg_hdr);
	rec_size = REP_MSG_REC_SIZE(*msg_hdr);

	cntrl = &conn->input.repmgr_msg.cntrl;
	cntrl->size = control_size;
	if (cntrl->size > 0) {
		ret = __os_malloc(env, cntrl->size, &cntrl->data);
		if (ret != 0)
			return (ret);
		__repmgr_add_dbt(&conn->iovecs, cntrl);
	}

	rec = &conn->input.repmgr_msg.rec;
	rec->size = rec_size;
	if (rec->size > 0) {
		ret = __os_malloc(env, rec->size, &rec->data);
		if (ret != 0) {
			/* Undo the control allocation made just above. */
			if (cntrl->size > 0)
				__os_free(env, cntrl->data);
			return (ret);
		}
		__repmgr_add_dbt(&conn->iovecs, rec);
	}

	return (0);
}
static int
dispatch_msgin(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
DB_REP *db_rep;
REPMGR_SITE *site;
REPMGR_RUNNABLE *th;
REPMGR_RESPONSE *resp;
DBT *dbt;
char *hostname;
int eid, ret;
DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
db_rep = env->rep_handle;
switch (conn->state) {
case CONN_CONNECTED:
ONLY_HANDSHAKE(env, conn);
eid = conn->eid;
DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
site = SITE_FROM_EID(eid);
th = site->connector;
if (th != NULL && th->finished) {
if ((ret = __repmgr_thread_join(th)) != 0)
return (ret);
__os_free(env, th);
site->connector = NULL;
}
if ((ret = read_version_response(env, conn)) != 0)
return (ret);
break;
case CONN_NEGOTIATE:
ONLY_HANDSHAKE(env, conn);
if ((ret = send_version_response(env, conn)) != 0)
return (ret);
break;
case CONN_PARAMETERS:
switch (conn->msg_type) {
case REPMGR_HANDSHAKE:
dbt = &conn->input.repmgr_msg.rec;
hostname = dbt->data;
hostname[dbt->size-1] = '\0';
if ((ret = accept_handshake(env, conn, hostname)) != 0)
return (ret);
conn->state = CONN_READY;
break;
case REPMGR_OWN_MSG:
conn->input.rep_message->v.gmdb_msg.conn = conn;
TAILQ_REMOVE(&db_rep->connections, conn, entries);
if ((ret = __repmgr_queue_put(env,
conn->input.rep_message)) != 0)
return (ret);
break;
default:
__db_errx(env, DB_STR_A("3620",
"unexpected msg type %d in PARAMETERS state", "%d"),
(int)conn->msg_type);
return (DB_REP_UNAVAIL);
}
break;
case CONN_READY:
case CONN_CONGESTED:
switch (conn->msg_type) {
case REPMGR_PERMLSN:
if ((ret = record_permlsn(env, conn)) != 0)
return (ret);
break;
case REPMGR_HEARTBEAT:
case REPMGR_APP_MESSAGE:
case REPMGR_REP_MESSAGE:
if ((ret = __repmgr_queue_put(env,
conn->input.rep_message)) != 0)
return (ret);
if (conn->msg_type == REPMGR_APP_MESSAGE)
conn->ref_count++;
break;
case REPMGR_OWN_MSG:
conn->input.rep_message->v.gmdb_msg.conn = NULL;
if ((ret = process_own_msg(env, conn)) != 0)
return (ret);
break;
case REPMGR_APP_RESPONSE:
DB_ASSERT(env, conn->cur_resp < conn->aresp &&
conn->responses != NULL);
resp = &conn->responses[conn->cur_resp];
DB_ASSERT(env, F_ISSET(resp, RESP_READING));
F_CLR(resp, RESP_READING);
if (F_ISSET(resp, RESP_THREAD_WAITING)) {
F_SET(resp, RESP_COMPLETE);
if ((ret = __repmgr_wake_waiters(env,
&conn->response_waiters)) != 0)
return (ret);
} else {
DB_ASSERT(env, F_ISSET(resp, RESP_DUMMY_BUF));
__os_free(env, resp->dbt.data);
F_CLR(resp, RESP_IN_USE);
}
break;
case REPMGR_RESP_ERROR:
default:
__db_errx(env, DB_STR_A("3621",
"unexpected msg type rcvd in ready state: %d",
"%d"), (int)conn->msg_type);
return (DB_REP_UNAVAIL);
}
break;
case CONN_DEFUNCT:
break;
default:
DB_ASSERT(env, FALSE);
}
switch (conn->msg_type) {
case REPMGR_HANDSHAKE:
case REPMGR_PERMLSN:
dbt = &conn->input.repmgr_msg.cntrl;
if (dbt->size > 0)
__os_free(env, dbt->data);
dbt = &conn->input.repmgr_msg.rec;
if (dbt->size > 0)
__os_free(env, dbt->data);
break;
default:
break;
}
__repmgr_reset_for_reading(conn);
return (0);
}
static int
process_own_msg(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
DB_REP *db_rep;
DBT *dbt;
REPMGR_SITE *site;
REPMGR_MESSAGE *msg;
__repmgr_connect_reject_args reject;
__repmgr_parm_refresh_args parms;
int ret;
ret = 0;
switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
case REPMGR_CONNECT_REJECT:
dbt = &msg->v.gmdb_msg.request;
if ((ret = __repmgr_connect_reject_unmarshal(env,
&reject, dbt->data, dbt->size, NULL)) != 0)
return (DB_REP_UNAVAIL);
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"got rejection msg citing version %lu/%lu",
(u_long)reject.gen, (u_long)reject.version));
if (__repmgr_gmdb_version_cmp(env,
reject.gen, reject.version) > 0) {
if (env->rep_handle->seen_repmsg)
ret = DB_DELETED;
else if ((ret = __repmgr_defer_op(env,
REPMGR_REJOIN)) == 0)
ret = DB_REP_UNAVAIL;
} else
ret = DB_REP_UNAVAIL;
DB_ASSERT(env, ret != 0);
return (ret);
case REPMGR_SHARING:
if ((ret = __repmgr_queue_put(env, msg)) != 0)
return (ret);
msg = NULL;
break;
case REPMGR_PARM_REFRESH:
dbt = &conn->input.rep_message->v.gmdb_msg.request;
if ((ret = __repmgr_parm_refresh_unmarshal(env,
&parms, dbt->data, dbt->size, NULL)) != 0)
return (DB_REP_UNAVAIL);
db_rep = env->rep_handle;
DB_ASSERT(env, conn->type == REP_CONNECTION &&
IS_KNOWN_REMOTE_SITE(conn->eid));
site = SITE_FROM_EID(conn->eid);
site->ack_policy = (int)parms.ack_policy;
if (F_ISSET(&parms, ELECTABLE_SITE))
F_SET(site, SITE_ELECTABLE);
else
F_CLR(site, SITE_ELECTABLE);
F_SET(site, SITE_HAS_PRIO);
break;
case REPMGR_GM_FAILURE:
case REPMGR_GM_FORWARD:
case REPMGR_JOIN_REQUEST:
case REPMGR_JOIN_SUCCESS:
case REPMGR_REMOVE_REQUEST:
case REPMGR_RESOLVE_LIMBO:
default:
__db_errx(env, DB_STR_A("3677",
"unexpected msg type %lu in process_own_msg", "%lu"),
(u_long)REPMGR_OWN_MSG_TYPE(msg->msg_hdr));
return (DB_REP_UNAVAIL);
}
if (msg != NULL)
__os_free(env, msg);
return (ret);
}
/*
 * send_version_response --
 *	Respond to the first handshake received on an incoming connection.
 *
 * When the handshake carries version information, a protocol version is
 * chosen: ours if it lies within the proposed range, otherwise the peer's
 * maximum if we support it.  The choice is recorded in conn->version and
 * sent back as a confirmation appended to our handshake, and the connection
 * moves to CONN_PARAMETERS.  If no version is acceptable, DB_REP_UNAVAIL is
 * returned.
 *
 * When there is no version information the peer is treated as version 1:
 * its handshake is accepted, a V1 handshake is sent back, and the
 * connection becomes CONN_READY.
 */
static int
send_version_response(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	__repmgr_version_proposal_args versions;
	__repmgr_version_confirmation_args conf;
	repmgr_netaddr_t *my_addr;
	char *hostname;
	u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
	DBT vi;
	int ret;

	db_rep = env->rep_handle;
	my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;

	ret = __repmgr_find_version_info(env, conn, &vi);
	if (ret != 0)
		return (ret);

	if (vi.size != 0) {
		ret = __repmgr_version_proposal_unmarshal(env,
		    &versions, vi.data, vi.size, NULL);
		if (ret != 0)
			return (DB_REP_UNAVAIL);

		if (DB_REPMGR_VERSION >= versions.min &&
		    DB_REPMGR_VERSION <= versions.max)
			conf.version = DB_REPMGR_VERSION;
		else if (versions.max >= DB_REPMGR_MIN_VERSION &&
		    versions.max <= DB_REPMGR_VERSION)
			conf.version = versions.max;
		else {
			__db_errx(env, DB_STR_A("3622",
			    "No available version between %lu and %lu",
			    "%lu %lu"), (u_long)versions.min,
			    (u_long)versions.max);
			return (DB_REP_UNAVAIL);
		}
		conn->version = conf.version;

		/* The marshaled confirmation is sent with a trailing NUL. */
		__repmgr_version_confirmation_marshal(env, &conf, buf);
		buf[__REPMGR_VERSION_CONFIRMATION_SIZE] = '\0';
		DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
		ret = __repmgr_send_handshake(env,
		    conn, buf, sizeof(buf), 0);
		if (ret != 0)
			return (ret);

		conn->state = CONN_PARAMETERS;
	} else {
		/* No version info: fall back to the version-1 exchange. */
		hostname = conn->input.repmgr_msg.rec.data;
		ret = accept_v1_handshake(env, conn, hostname);
		if (ret != 0)
			return (ret);
		ret = __repmgr_send_v1_handshake(env,
		    conn, my_addr->host, strlen(my_addr->host) + 1);
		if (ret != 0)
			return (ret);

		conn->state = CONN_READY;
	}
	return (ret);
}
int
__repmgr_send_handshake(env, conn, opt, optlen, flags)
ENV *env;
REPMGR_CONNECTION *conn;
void *opt;
size_t optlen;
u_int32_t flags;
{
DB_REP *db_rep;
REP *rep;
DBT cntrl, rec;
__repmgr_handshake_args hs;
__repmgr_v2handshake_args v2hs;
__repmgr_v3handshake_args v3hs;
repmgr_netaddr_t *my_addr;
size_t hostname_len, rec_len;
void *buf;
u_int8_t *p;
u_int32_t cntrl_len;
int ret;
db_rep = env->rep_handle;
rep = db_rep->region;
my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
switch (conn->version) {
case 2:
cntrl_len = __REPMGR_V2HANDSHAKE_SIZE;
break;
case 3:
cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
break;
case 4:
cntrl_len = __REPMGR_HANDSHAKE_SIZE;
break;
default:
__db_errx(env, DB_STR_A("3678",
"unexpected conn version %lu in send_handshake", "%lu"),
(u_long)conn->version);
return (DB_REP_UNAVAIL);
}
hostname_len = strlen(my_addr->host);
rec_len = hostname_len + 1 +
(opt == NULL ? 0 : optlen);
if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
return (ret);
cntrl.data = p = buf;
switch (conn->version) {
case 2:
DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
v2hs.port = my_addr->port;
v2hs.priority = rep->priority;
__repmgr_v2handshake_marshal(env, &v2hs, p);
break;
case 3:
v3hs.port = my_addr->port;
v3hs.priority = rep->priority;
v3hs.flags = flags;
__repmgr_v3handshake_marshal(env, &v3hs, p);
break;
case 4:
hs.port = my_addr->port;
hs.alignment = MEM_ALIGN;
hs.ack_policy = (u_int32_t)rep->perm_policy;
hs.flags = flags;
if (rep->priority > 0)
F_SET(&hs, ELECTABLE_SITE);
__repmgr_handshake_marshal(env, &hs, p);
break;
default:
DB_ASSERT(env, FALSE);
break;
}
cntrl.size = cntrl_len;
p = rec.data = &p[cntrl_len];
(void)strcpy((char*)p, my_addr->host);
p += hostname_len + 1;
if (opt != NULL) {
memcpy(p, opt, optlen);
p += optlen;
}
rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
ret = __repmgr_send_one(env,
conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0);
__os_free(env, buf);
return (ret);
}
/*
 * read_version_response --
 *	Process the peer's reply to our initial handshake on an outgoing
 * connection.
 *
 * With no version information in the reply the peer is treated as
 * version 1 and its handshake is accepted as such.  Otherwise the confirmed
 * version must lie within [DB_REPMGR_MIN_VERSION, DB_REPMGR_VERSION]; it is
 * recorded in conn->version, the peer's handshake is accepted, and our own
 * parameters are sent back (flagged REPMGR_SUBORDINATE when this process is
 * subordinate).  On success the connection becomes CONN_READY.
 */
static int
read_version_response(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	__repmgr_version_confirmation_args conf;
	DBT vi;
	char *hostname;
	u_int32_t flags;
	int ret;

	db_rep = env->rep_handle;

	ret = __repmgr_find_version_info(env, conn, &vi);
	if (ret != 0)
		return (ret);
	hostname = conn->input.repmgr_msg.rec.data;

	if (vi.size != 0) {
		ret = __repmgr_version_confirmation_unmarshal(env,
		    &conf, vi.data, vi.size, NULL);
		if (ret != 0)
			return (DB_REP_UNAVAIL);

		if (!(conf.version >= DB_REPMGR_MIN_VERSION &&
		    conf.version <= DB_REPMGR_VERSION)) {
			__db_errx(env, DB_STR_A("3623",
			    "Can't support confirmed version %lu", "%lu"),
			    (u_long)conf.version);
			return (DB_REP_UNAVAIL);
		}
		conn->version = conf.version;

		ret = accept_handshake(env, conn, hostname);
		if (ret != 0)
			return (ret);

		flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
		ret = __repmgr_send_handshake(env, conn, NULL, 0, flags);
		if (ret != 0)
			return (ret);
	} else {
		ret = accept_v1_handshake(env, conn, hostname);
		if (ret != 0)
			return (ret);
	}

	conn->state = CONN_READY;
	return (ret);
}
/*
 * __repmgr_find_version_info --
 *	Locate optional version information in the rec part of a handshake.
 *
 * The rec part starts with a host name.  Its last byte is overwritten with
 * NUL so that the host name is always terminated.  If the host name and its
 * terminator account for the entire rec part, vi->size is set to 0 and
 * vi->data is left untouched.  Otherwise vi->data points just past the host
 * name's terminator and vi->size is the number of remaining bytes minus the
 * final one.
 *
 * Returns 0, or DB_REP_UNAVAIL when the rec part is empty.
 */
int
__repmgr_find_version_info(env, conn, vi)
	ENV *env;
	REPMGR_CONNECTION *conn;
	DBT *vi;
{
	DBT *rec;
	char *hostname;
	u_int32_t name_bytes;

	rec = &conn->input.repmgr_msg.rec;
	if (rec->size == 0) {
		__db_errx(env, DB_STR("3624",
		    "handshake is missing rec part"));
		return (DB_REP_UNAVAIL);
	}

	hostname = rec->data;
	hostname[rec->size - 1] = '\0';

	/* Host name length including its terminating NUL. */
	name_bytes = (u_int32_t)strlen(hostname) + 1;

	if (name_bytes == rec->size)
		vi->size = 0;
	else {
		vi->data = (u_int8_t *)rec->data + name_bytes;
		vi->size = (rec->size - name_bytes) - 1;
	}
	return (0);
}
static int
accept_handshake(env, conn, hostname)
ENV *env;
REPMGR_CONNECTION *conn;
char *hostname;
{
__repmgr_handshake_args hs;
__repmgr_v2handshake_args hs2;
__repmgr_v3handshake_args hs3;
u_int port;
u_int32_t ack, flags;
int electable;
switch (conn->version) {
case 2:
if (__repmgr_v2handshake_unmarshal(env, &hs2,
conn->input.repmgr_msg.cntrl.data,
conn->input.repmgr_msg.cntrl.size, NULL) != 0)
return (DB_REP_UNAVAIL);
port = hs2.port;
electable = hs2.priority > 0;
ack = flags = 0;
break;
case 3:
if (__repmgr_v3handshake_unmarshal(env, &hs3,
conn->input.repmgr_msg.cntrl.data,
conn->input.repmgr_msg.cntrl.size, NULL) != 0)
return (DB_REP_UNAVAIL);
port = hs3.port;
electable = hs3.priority > 0;
flags = hs3.flags;
ack = 0;
break;
case 4:
if (__repmgr_handshake_unmarshal(env, &hs,
conn->input.repmgr_msg.cntrl.data,
conn->input.repmgr_msg.cntrl.size, NULL) != 0)
return (DB_REP_UNAVAIL);
port = hs.port;
electable = F_ISSET(&hs, ELECTABLE_SITE);
flags = hs.flags;
ack = hs.ack_policy;
break;
default:
__db_errx(env, DB_STR_A("3679",
"unexpected conn version %lu in accept_handshake", "%lu"),
(u_long)conn->version);
return (DB_REP_UNAVAIL);
}
return (process_parameters(env,
conn, hostname, port, ack, electable, flags));
}
/*
 * accept_v1_handshake --
 *	Validate a version-1 handshake and pass its parameters on.
 *
 * The control part must be exactly one DB_REPMGR_V1_HANDSHAKE structure
 * whose version field is 1; otherwise DB_REP_UNAVAIL is returned.  On
 * success conn->version is set to 1 and process_parameters is called with
 * the peer's port, an ack policy and flags of 0, and electable set when the
 * priority (converted with ntohl) is greater than zero.
 */
static int
accept_v1_handshake(env, conn, hostname)
	ENV *env;
	REPMGR_CONNECTION *conn;
	char *hostname;
{
	DB_REPMGR_V1_HANDSHAKE *handshake;
	u_int32_t prio;
	int electable;

	handshake = conn->input.repmgr_msg.cntrl.data;

	/* The size test comes first so the struct is only read when valid. */
	if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
	    handshake->version != 1) {
		__db_errx(env, DB_STR("3625", "malformed V1 handshake"));
		return (DB_REP_UNAVAIL);
	}

	conn->version = 1;

	prio = ntohl(handshake->priority);
	electable = prio > 0;

	return (process_parameters(env,
	    conn, hostname, handshake->port, 0, electable, 0));
}
/*
 * process_parameters --
 *	Apply the parameters received in a handshake to a connection and to
 * the site it belongs to.
 *
 * Parameters:
 *	env		environment handle
 *	conn		connection the handshake arrived on
 *	host, port	address the remote site reported for itself
 *	ack		remote site's ack policy (stored in site->ack_policy)
 *	electable	non-zero if the remote site is electable
 *	flags		handshake flags (tested below through LF_ISSET;
 *			presumably that macro reads this local -- confirm)
 *
 * Returns 0 on success; DB_REP_UNAVAIL after a reject message has been
 * sent to a site that is unknown or whose membership is not SITE_PRESENT;
 * otherwise an error passed up from resolve_collision(),
 * __repmgr_send_own_msg() or __repmgr_signal().
 */
static int
process_parameters(env, conn, host, port, ack, electable, flags)
ENV *env;
REPMGR_CONNECTION *conn;
char *host;
u_int port;
int electable;
u_int32_t ack, flags;
{
DB_REP *db_rep;
REPMGR_RETRY *retry;
REPMGR_SITE *site;
__repmgr_connect_reject_args reject;
u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
int eid, ret;
db_rep = env->rep_handle;
/*
 * In CONN_CONNECTED state the connection already carries a known remote
 * EID, so the site comes straight from conn->eid and host/port are not
 * consulted.
 * NOTE(review): this looks like the "we initiated the connection" case --
 * confirm against the code that sets CONN_CONNECTED.
 */
if (conn->state == CONN_CONNECTED) {
DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
site = SITE_FROM_EID(conn->eid);
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"handshake from connection to %s:%lu EID %u",
site->net_addr.host,
(u_long)site->net_addr.port, conn->eid));
} else {
DB_ASSERT(env, conn->state == CONN_NEGOTIATE ||
conn->state == CONN_PARAMETERS);
/*
 * Classify the connection from the handshake flags.  An application
 * channel connection needs no site bookkeeping, so we are done.
 */
if (LF_ISSET(APP_CHANNEL_CONNECTION)) {
conn->type = APP_CONNECTION;
return (0);
} else
conn->type = REP_CONNECTION;
/*
 * Find the site by the host/port it reported.  Only a site that is known
 * and has membership SITE_PRESENT is accepted; everything else is
 * rejected in the else branch below.
 */
if ((site = __repmgr_lookup_site(env, host, port)) != NULL &&
site->membership == SITE_PRESENT) {
/*
 * Take the connection off the env-wide connections list and drop one
 * reference (presumably the one that list membership accounted for --
 * confirm).
 */
TAILQ_REMOVE(&db_rep->connections, conn, entries);
conn->ref_count--;
eid = EID_FROM_SITE(site);
if (LF_ISSET(REPMGR_SUBORDINATE)) {
/*
 * Connection flagged as subordinate: just add it to the site's sub_conns
 * list; the site's state and main connection slots are left untouched.
 */
TAILQ_INSERT_TAIL(&site->sub_conns,
conn, entries);
conn->eid = eid;
} else {
DB_EVENT(env,
DB_EVENT_REP_CONNECT_ESTD, &eid);
switch (site->state) {
case SITE_PAUSING:
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"handshake from paused site %s:%u EID %u",
host, port, eid));
/*
 * A retry entry is pending for this site; remove it from the retries
 * list and free it, since this connection takes its place.
 */
retry = site->ref.retry;
TAILQ_REMOVE(&db_rep->retries,
retry, entries);
__os_free(env, retry);
break;
case SITE_CONNECTED:
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"connection from %s:%u EID %u while already connected",
host, port, eid));
/* Decide which existing connection(s) this new one supersedes. */
if ((ret = resolve_collision(env,
site, conn)) != 0)
return (ret);
break;
case SITE_CONNECTING:
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"handshake from connecting site %s:%u EID %u",
host, port, eid));
break;
default:
DB_ASSERT(env, FALSE);
}
/*
 * Whatever the previous state, this connection now becomes the site's
 * "in" connection, the site is marked connected, and its last-received
 * timestamp is refreshed.
 */
conn->eid = eid;
site->state = SITE_CONNECTED;
site->ref.conn.in = conn;
__os_gettime(env,
&site->last_rcvd_timestamp, 1);
}
} else {
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"rejecting connection from unknown or provisional site %s:%u",
host, port));
/*
 * Send a REPMGR_CONNECT_REJECT carrying our membership version and gen,
 * then fail the handshake with DB_REP_UNAVAIL (or with the send error if
 * the reject itself could not be sent).
 */
reject.version = db_rep->membership_version;
reject.gen = db_rep->member_version_gen;
__repmgr_connect_reject_marshal(env,
&reject, reject_buf);
if ((ret = __repmgr_send_own_msg(env, conn,
REPMGR_CONNECT_REJECT, reject_buf,
__REPMGR_CONNECT_REJECT_SIZE)) != 0)
return (ret);
return (DB_REP_UNAVAIL);
}
}
/*
 * Common tail for every accepted replication connection: record the
 * remote site's electability and ack policy, and note that its priority
 * information is now available.
 */
if (electable)
F_SET(site, SITE_ELECTABLE);
else
F_CLR(site, SITE_ELECTABLE);
F_SET(site, SITE_HAS_PRIO);
site->ack_policy = (int)ack;
/*
 * If this process is not a subordinate, no master is currently known, and
 * the remote end did not flag itself as subordinate, record that a new
 * connection arrived and signal the check_election condition.
 */
if (!IS_SUBORDINATE(db_rep) &&
!__repmgr_master_is_known(env) &&
!LF_ISSET(REPMGR_SUBORDINATE)) {
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"handshake with no known master to wake election thread"));
db_rep->new_connection = TRUE;
if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
return (ret);
}
return (0);
}
/*
 * resolve_collision --
 *	Called when a new incoming connection arrives for a site that is
 * already marked connected; disables the existing connection(s) that the
 * new one supersedes.
 *
 * An existing "in" connection is always disabled.  An existing "out"
 * connection is disabled only when the new connection's version is at
 * least CONN_COLLISION_VERSION and __repmgr_is_server() is true for the
 * site.  Each slot is cleared after its connection is disabled, whether or
 * not the disable call succeeded.
 *
 * Returns 0, or the first error from __repmgr_disable_connection().
 */
static int
resolve_collision(env, site, conn)
	ENV *env;
	REPMGR_SITE *site;
	REPMGR_CONNECTION *conn;
{
	int ret;

	ret = 0;

	/*
	 * Disable first, clear the slot afterwards: the callee is handed the
	 * connection while the site still refers to it.
	 */
	if (site->ref.conn.in != NULL) {
		ret = __repmgr_disable_connection(env, site->ref.conn.in);
		site->ref.conn.in = NULL;
	}

	/* A failure above stops us before the outgoing side is considered. */
	if (ret == 0 &&
	    site->ref.conn.out != NULL &&
	    conn->version >= CONN_COLLISION_VERSION &&
	    __repmgr_is_server(env, site)) {
		ret = __repmgr_disable_connection(env, site->ref.conn.out);
		site->ref.conn.out = NULL;
	}

	return (ret);
}
/*
 * record_permlsn --
 *	Process an acknowledgement message that has been read into the
 * connection's input buffers, updating the sending site's max_ack.
 *
 * The connection must have a non-zero version, be in a ready state and
 * carry a valid EID.  Acks from a generation older than the region's
 * current one are ignored.  An ack of the current generation whose LSN is
 * greater than the site's recorded max_ack replaces it; if the log file
 * number advanced, the minimum log file is rechecked, and in either case
 * the ack waiters are woken.
 *
 * Returns 0 on success (including ignored acks), DB_REP_UNAVAIL for a
 * malformed message or unexpected connection state, or an error from
 * __repmgr_wake_waiters().
 */
static int
record_permlsn(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	__repmgr_permlsn_args *ackp, ack;
	SITE_STRING_BUFFER location;
	u_int32_t gen;
	u_int do_log_check;

	db_rep = env->rep_handle;

	if (conn->version == 0 ||
	    !IS_READY_STATE(conn->state) || !IS_VALID_EID(conn->eid)) {
		__db_errx(env, DB_STR("3682",
		    "unexpected connection info in record_permlsn"));
		return (DB_REP_UNAVAIL);
	}
	site = SITE_FROM_EID(conn->eid);

	if (conn->version != 1) {
		/* Later versions send a marshaled ack; decode it locally. */
		ackp = &ack;
		if (__repmgr_permlsn_unmarshal(env, ackp,
		    conn->input.repmgr_msg.cntrl.data,
		    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
			return (DB_REP_UNAVAIL);
	} else if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
	    conn->input.repmgr_msg.rec.size != 0) {
		/* V1: control part must be exactly one ack, with no record. */
		__db_errx(env, DB_STR("3627", "bad ack msg size"));
		return (DB_REP_UNAVAIL);
	} else {
		/* V1: the control buffer is used in place as the ack. */
		ackp = conn->input.repmgr_msg.cntrl.data;
	}

	gen = db_rep->region->gen;
	if (ackp->generation < gen) {
		VPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "ignoring stale ack (%lu<%lu), from %s",
		    (u_long)ackp->generation, (u_long)gen,
		    __repmgr_format_site_loc(site, location)));
		return (0);
	}
	VPRINT(env, (env, DB_VERB_REPMGR_MISC,
	    "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
	    (u_long)ackp->lsn.offset, (u_long)ackp->generation,
	    __repmgr_format_site_loc(site, location)));

	/* Nothing to record unless this is a newer LSN in the current gen. */
	if (ackp->generation != gen ||
	    LOG_COMPARE(&ackp->lsn, &site->max_ack) != 1)
		return (0);

	/*
	 * Decide whether the file number moved forward before overwriting
	 * max_ack; the comparison needs the old value.
	 */
	do_log_check = ackp->lsn.file > site->max_ack.file ? 1 : 0;
	memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
	if (do_log_check)
		check_min_log_file(env);

	return (__repmgr_wake_waiters(env, &db_rep->ack_waiters));
}
/*
 * check_min_log_file --
 *	Recompute rep->min_log_file as the smallest max_ack file number among
 * the qualifying remote sites.
 *
 * A site qualifies when it is not the current master, its state is
 * SITE_CONNECTED, at least one of its "in"/"out" connections exists and is
 * in CONN_READY state, and its max_ack LSN is non-zero.  If no site
 * qualifies, rep->min_log_file is left unchanged.
 */
static void
check_min_log_file(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int32_t min_log;
	int eid;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	/* Zero doubles as "no candidate seen yet". */
	min_log = 0;
	FOR_EACH_REMOTE_SITE_INDEX(eid) {
		if (eid == rep->master_id)
			continue;

		site = SITE_FROM_EID(eid);
		if (site->state != SITE_CONNECTED)
			continue;

		/* Need a ready connection: try incoming, then outgoing. */
		conn = site->ref.conn.in;
		if (conn == NULL || conn->state != CONN_READY) {
			conn = site->ref.conn.out;
			if (conn == NULL || conn->state != CONN_READY)
				continue;
		}

		/* A site that has never acked contributes nothing. */
		if (IS_ZERO_LSN(site->max_ack))
			continue;

		if (min_log == 0 || site->max_ack.file < min_log)
			min_log = site->max_ack.file;
	}

	/* Only store when there is a candidate and the value changed. */
	if (min_log != 0 && min_log != rep->min_log_file)
		rep->min_log_file = min_log;
}
/*
 * __repmgr_write_some --
 *	Send queued outbound data on a connection until the queue is empty
 * or the socket cannot take any more.
 *
 * Returns 0 when the queue has been emptied or the send would block;
 * DB_REP_UNAVAIL on any other socket error (after firing a connection
 * error event and counting the drop); or an error from __repmgr_signal().
 */
int
__repmgr_write_some(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	QUEUED_OUTPUT *output;
	REPMGR_FLAT *msg;
	int bytes, ret;

	for (; !STAILQ_EMPTY(&conn->outbound_queue);) {
		output = STAILQ_FIRST(&conn->outbound_queue);
		msg = output->msg;

		/* Try to send whatever remains of the message at the head. */
		bytes = sendsocket(conn->fd, &msg->data[output->offset],
		    msg->length - output->offset, 0);
		if (bytes == SOCKET_ERROR) {
			ret = net_errno;
			switch (ret) {
			case WOULDBLOCK:
#if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
			case DB_REPMGR_EAGAIN:
#endif
				/* Socket is full for now; not an error. */
				return (0);
			default:
				__repmgr_fire_conn_err_event(env, conn, ret);
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				return (DB_REP_UNAVAIL);
			}
		}

		/* Partial send: keep the entry queued and go round again. */
		output->offset += (size_t)bytes;
		if (output->offset < msg->length)
			continue;

		/*
		 * Whole message sent: dequeue and free the queue entry, then
		 * drop our reference on the message, freeing it when no
		 * references remain.
		 */
		STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
		__os_free(env, output);
		conn->out_queue_length--;
		if (--msg->ref_count <= 0)
			__os_free(env, msg);

		/*
		 * A queue slot has been freed: put the connection back in
		 * CONN_READY state and signal the "drained" condition
		 * (presumably waking threads waiting for queue space --
		 * confirm against the waiters on conn->drained).
		 */
		conn->state = CONN_READY;
		if ((ret = __repmgr_signal(&conn->drained)) != 0)
			return (ret);
	}
	return (0);
}