#include "db_config.h"
#include "db_int.h"
#include "dbinc/mp.h"
/*
 * A message in the course of being sent.
 *
 * iovecs describes the pieces of the message to be written.  fmsg is NULL
 * until the message has to be queued; flatten() then allocates a single
 * contiguous copy of the message and re-points iovecs at that copy.
 */
struct sending_msg {
REPMGR_IOVECS *iovecs;
REPMGR_FLAT *fmsg;
};
/*
 * Context passed from __repmgr_send() to got_acks() while waiting for
 * acknowledgements of a DB_REP_PERMANENT message.
 */
struct repmgr_permanence {
/* LSN that each site's max_ack is compared against. */
DB_LSN lsn;
/* Number of acks needed, as computed by __repmgr_send(). */
u_int threshold;
/* Peer-ack count used by the QUORUM and ALL_AVAILABLE policies. */
u_int quorum;
/* Effective ack policy (one of the DB_REPMGR_ACKS_* values). */
int policy;
/* Set to TRUE by got_acks() once the policy has been satisfied. */
int is_durable;
};
/*
 * Forward declarations of the functions that are private to this file.
 * fake_port() exists only in CONFIG_TEST builds.
 */
#ifdef CONFIG_TEST
static u_int fake_port __P((ENV *, u_int));
#endif
static int final_cleanup __P((ENV *, REPMGR_CONNECTION *, void *));
static int flatten __P((ENV *, struct sending_msg *));
static int got_acks __P((ENV *, void *));
static int __repmgr_finish_connect
__P((ENV *, socket_t s, REPMGR_CONNECTION **));
static int __repmgr_propose_version __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_start_connect __P((ENV*, socket_t *, ADDRINFO *, int *));
static void setup_sending_msg __P((ENV *,
struct sending_msg *, u_int8_t *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
__P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, db_timeout_t));
static int enqueue_msg
__P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static REPMGR_SITE *connected_site __P((ENV *, int));
static REPMGR_SITE *__repmgr_find_available_peer __P((ENV *));
static int send_connection __P((ENV *, u_int,
REPMGR_CONNECTION *, struct sending_msg *, int *));
/*
 * __repmgr_connect --
 *	Open an outgoing connection to the given network address.
 *
 * Each address returned by the resolver is tried in turn until one of them
 * connects or fails with something other than DB_REP_UNAVAIL.  On success
 * the new connection is stored through connp.  When the final result is
 * DB_REP_UNAVAIL, the most recent system error is reported and stored
 * through errp.
 */
int
__repmgr_connect(env, netaddr, connp, errp)
	ENV *env;
	repmgr_netaddr_t *netaddr;
	REPMGR_CONNECTION **connp;
	int *errp;
{
	REPMGR_CONNECTION *new_conn;
	ADDRINFO *addr_list, *addr;
	socket_t sock;
	int err, ret;
	u_int port;

	COMPQUIET(err, 0);
#ifdef CONFIG_TEST
	port = fake_port(env, netaddr->port);
#else
	port = netaddr->port;
#endif
	ret = __repmgr_getaddr(env, netaddr->host, port, 0, &addr_list);
	if (ret != 0)
		return (ret);

	for (addr = addr_list; addr != NULL; addr = addr->ai_next) {
		ret = __repmgr_start_connect(env, &sock, addr, &err);

		/* This address was unreachable: move on to the next one. */
		if (ret == DB_REP_UNAVAIL)
			continue;

		if (ret == 0) {
			ret = __repmgr_finish_connect(env, sock, &new_conn);
			if (ret == 0)
				*connp = new_conn;
			else
				(void)closesocket(sock);
		}

		/* Either success or a non-retryable failure: stop here. */
		break;
	}

	__os_freeaddrinfo(env, addr_list);
	if (ret == DB_REP_UNAVAIL) {
		__repmgr_print_conn_err(env, netaddr, err);
		*errp = err;
	}
	return (ret);
}
/*
 * __repmgr_start_connect --
 *	Create a socket for one resolved address and connect it.
 *
 * Returns 0 and stores the socket through socket_result on success.  A
 * socket() failure is logged and returned as the system error.  A connect()
 * failure stores the system error through err, closes the socket and
 * returns DB_REP_UNAVAIL.
 */
static int
__repmgr_start_connect(env, socket_result, ai, err)
	ENV *env;
	socket_t *socket_result;
	ADDRINFO *ai;
	int *err;
{
	socket_t sock;
	int ret;

	sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
	if (sock == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "create socket");
		return (ret);
	}

	if (connect(sock, ai->ai_addr, (socklen_t)ai->ai_addrlen) == 0) {
		RPRINT(env, (env, DB_VERB_REPMGR_MISC,
		    "connection established"));
		*socket_result = sock;
		return (0);
	}

	*err = net_errno;
	(void)closesocket(sock);
	return (DB_REP_UNAVAIL);
}
/*
 * __repmgr_finish_connect --
 *	Wrap a freshly connected socket in a connection structure, configure
 *	keepalive on it and send our version proposal.
 *
 * On success the connection is stored through connp.  If either follow-up
 * step fails the connection structure is destroyed and the error returned;
 * the caller (see __repmgr_connect) closes the socket in that case.
 */
static int
__repmgr_finish_connect(env, s, connp)
	ENV *env;
	socket_t s;
	REPMGR_CONNECTION **connp;
{
	REPMGR_CONNECTION *new_conn;
	int ret;

	ret = __repmgr_new_connection(env, &new_conn, s, CONN_CONNECTED);
	if (ret != 0)
		return (ret);

	ret = __repmgr_set_keepalive(env, new_conn);
	if (ret == 0)
		ret = __repmgr_propose_version(env, new_conn);

	if (ret != 0) {
		(void)__repmgr_destroy_conn(env, new_conn);
		return (ret);
	}

	*connp = new_conn;
	return (0);
}
/*
 * __repmgr_propose_version --
 *	Send the initial handshake on a new outgoing connection, carrying our
 *	host name and the range of protocol versions we support.
 *
 * The record that is sent is laid out as:
 *
 *	host name, NUL, marshaled version proposal, one trailing byte
 *
 * The trailing byte is now explicitly set to NUL.  Previously it was never
 * written, so one byte of uninitialized heap memory was transmitted to the
 * remote site.
 *
 * Returns 0 on success, or the error from the allocation or the send.
 */
static int
__repmgr_propose_version(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	__repmgr_version_proposal_args versions;
	repmgr_netaddr_t *my_addr;
	size_t hostname_len, rec_length;
	u_int8_t *buf, *p;
	int ret;

	db_rep = env->rep_handle;
	my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;

	hostname_len = strlen(my_addr->host);
	rec_length = hostname_len + 1 +
	    __REPMGR_VERSION_PROPOSAL_SIZE + 1;
	if ((ret = __os_malloc(env, rec_length, &buf)) != 0)
		return (ret);

	/*
	 * Define the final byte before anything else is written, so that no
	 * uninitialized memory goes out on the wire.
	 */
	buf[rec_length - 1] = '\0';

	/* Host name, including its terminating NUL. */
	memcpy(buf, my_addr->host, hostname_len + 1);
	p = buf + hostname_len + 1;

	/*
	 * The proposal follows the host name.
	 * NOTE(review): presumably the marshal routine writes exactly
	 * __REPMGR_VERSION_PROPOSAL_SIZE bytes -- confirm.
	 */
	versions.min = DB_REPMGR_MIN_VERSION;
	versions.max = DB_REPMGR_VERSION;
	__repmgr_version_proposal_marshal(env, &versions, p);

	ret = __repmgr_send_v1_handshake(env, conn, buf, rec_length);
	__os_free(env, buf);
	return (ret);
}
/*
 * __repmgr_send --
 *	Send a replication message (control + rec) either to every remote
 *	site (eid == DB_EID_BROADCAST) or to a single known remote site, and,
 *	when DB_REP_PERMANENT is set in flags, wait for acknowledgements
 *	according to the ack policy.
 *
 * lsnp is dereferenced only when DB_REP_PERMANENT is set.
 *
 * Returns 0 on success.  Returns DB_REP_UNAVAIL when repmgr is stopped, when
 * the target site is not connected, or when the required acknowledgements
 * could not be (or were not) obtained.  Other errors come from the helpers
 * called here.
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
DB_ENV *dbenv;
const DBT *control, *rec;
const DB_LSN *lsnp;
int eid;
u_int32_t flags;
{
DB_REP *db_rep;
REP *rep;
ENV *env;
REPMGR_CONNECTION *conn;
REPMGR_SITE *site;
struct repmgr_permanence perm;
db_timeout_t maxblock;
u_int32_t available, nclients, needed, npeers_sent, nsites_sent, quorum;
int missed_peer, policy, ret, t_ret;
env = dbenv->env;
db_rep = env->rep_handle;
rep = db_rep->region;
ret = 0;
LOCK_MUTEX(db_rep->mutex);
/* Nothing can be sent once repmgr has been stopped. */
if (db_rep->repmgr_status == stopped) {
ret = DB_REP_UNAVAIL;
goto out;
}
/*
 * The shared region holds a newer site-info sequence number than our
 * handle does: synchronize the local site list before sending.
 */
if (rep->siteinfo_seq > db_rep->siteinfo_seq &&
(ret = __repmgr_sync_siteaddr(env)) != 0)
goto out;
if (eid == DB_EID_BROADCAST) {
if ((ret = __repmgr_send_broadcast(env,
REPMGR_REP_MESSAGE, control, rec,
&nsites_sent, &npeers_sent, &missed_peer)) != 0)
goto out;
} else {
DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(eid));
/*
 * With leases in use and start-up not yet complete, clear the
 * "send anywhere" option.
 */
if (IS_USING_LEASES(env) && !rep->stat.st_startup_complete)
LF_CLR(DB_REP_ANYWHERE);
/*
 * An ANYWHERE request that is not a REREQUEST goes to an available
 * peer if there is one; otherwise the message goes to the addressed
 * site, provided it is connected.
 */
if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
DB_REP_ANYWHERE &&
(site = __repmgr_find_available_peer(env)) != NULL) {
VPRINT(env, (env, DB_VERB_REPMGR_MISC,
"sending request to peer"));
} else if ((site = connected_site(env, eid)) == NULL) {
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"ignoring message sent to unavailable site"));
ret = DB_REP_UNAVAIL;
goto out;
}
/*
 * Blocking limit passed to __repmgr_send_one(): OUT_QUEUE_LIMIT times
 * the ack timeout (or the default ack timeout when none is set).
 */
maxblock = OUT_QUEUE_LIMIT *
(rep->ack_timeout == 0 ?
DB_REPMGR_DEFAULT_ACK_TIMEOUT : rep->ack_timeout);
/*
 * SEND_ONE_CONNECTION(c): copy the connection pointer into the local
 * "conn", and if it is non-NULL and in a ready state, send the message
 * on it.  A DB_REP_UNAVAIL result busts the connection; a failure to
 * bust replaces ret.  When the connection is skipped, ret is left
 * unchanged.
 */
#undef SEND_ONE_CONNECTION
#define SEND_ONE_CONNECTION(c) \
do { \
if ((conn = (c)) != NULL && \
IS_READY_STATE(conn->state) && \
(ret = __repmgr_send_one(env, \
conn, REPMGR_REP_MESSAGE, \
control, rec, maxblock)) == DB_REP_UNAVAIL && \
(t_ret = \
__repmgr_bust_connection(env, conn)) != 0) \
ret = t_ret; \
} while (0)
/* Try the incoming connection first; tolerate DB_REP_UNAVAIL there. */
SEND_ONE_CONNECTION(site->ref.conn.in);
if (ret != 0 && ret != DB_REP_UNAVAIL)
goto out;
SEND_ONE_CONNECTION(site->ref.conn.out);
if (ret != 0)
goto out;
#undef SEND_ONE_CONNECTION
nsites_sent = 1;
npeers_sent = F_ISSET(site, SITE_ELECTABLE) ? 1 : 0;
missed_peer = FALSE;
}
if (LF_ISSET(DB_REP_PERMANENT)) {
/* Configured group size, less one (presumably the local site). */
nclients = db_rep->region->config_nsites -1;
policy = rep->perm_policy;
/* Adjust the policy while a group membership DB update is active. */
switch (db_rep->active_gmdb_update) {
case gmdb_primary:
if (policy == DB_REPMGR_ACKS_ALL ||
policy == DB_REPMGR_ACKS_ALL_PEERS)
policy = DB_REPMGR_ACKS_ALL_AVAILABLE;
else if (policy == DB_REPMGR_ACKS_QUORUM &&
nclients == 1)
nclients = 0;
else if ((policy == DB_REPMGR_ACKS_ONE ||
policy == DB_REPMGR_ACKS_ONE_PEER) &&
nclients == 1) {
nclients = 0;
policy = DB_REPMGR_ACKS_QUORUM;
}
break;
case gmdb_secondary:
policy = DB_REPMGR_ACKS_NONE;
break;
case none:
break;
}
quorum = 0;
/*
 * From the policy, work out how many acks are needed ("needed") and how
 * many sites that could supply them were actually sent to ("available").
 */
switch (policy) {
case DB_REPMGR_ACKS_NONE:
needed = 0;
COMPQUIET(available, 0);
break;
case DB_REPMGR_ACKS_ONE:
needed = 1;
available = nsites_sent;
break;
case DB_REPMGR_ACKS_ALL:
needed = nclients;
available = nsites_sent;
break;
case DB_REPMGR_ACKS_ONE_PEER:
needed = 1;
available = npeers_sent;
break;
case DB_REPMGR_ACKS_ALL_PEERS:
/*
 * "needed" is only a lower bound here; got_acks() decides this policy
 * by whether any peer remains unacknowledged.
 */
needed = 1;
available = npeers_sent;
break;
case DB_REPMGR_ACKS_QUORUM:
case DB_REPMGR_ACKS_ALL_AVAILABLE:
/*
 * Quorum is half of nclients, except in a group with at most one
 * client that is not configured 2SITE_STRICT and is not doing a
 * primary GMDB update, where it is nclients itself.
 */
if (nclients > 1 ||
FLD_ISSET(db_rep->region->config,
REP_C_2SITE_STRICT) ||
db_rep->active_gmdb_update == gmdb_primary)
quorum = nclients / 2;
else
quorum = nclients;
if (policy == DB_REPMGR_ACKS_ALL_AVAILABLE) {
if (nsites_sent > 0)
needed = available = nsites_sent;
else {
/* Sent to nobody: a failure only if a quorum was required. */
ret = quorum > 0 ? DB_REP_UNAVAIL : 0;
goto out;
}
} else {
DB_ASSERT(env, policy == DB_REPMGR_ACKS_QUORUM);
needed = quorum;
available = npeers_sent;
/*
 * Fewer peers than the quorum were sent to, but none was missed:
 * require acks only from those peers.
 */
if (npeers_sent < quorum && !missed_peer) {
needed = npeers_sent;
}
}
break;
default:
ret = __db_unknown_path(env, "__repmgr_send");
goto out;
}
/*
 * Except for ALL_AVAILABLE, skip the wait when no acks are needed, and
 * fail at once when too few sites were sent to.
 */
if (policy != DB_REPMGR_ACKS_ALL_AVAILABLE) {
if (needed == 0)
goto out;
if (available < needed) {
ret = DB_REP_UNAVAIL;
goto out;
}
}
VPRINT(env, (env, DB_VERB_REPMGR_MISC,
"will await acknowledgement: need %u", needed));
perm.lsn = *lsnp;
perm.threshold = needed;
perm.policy = policy;
perm.quorum = quorum;
perm.is_durable = FALSE;
ret = __repmgr_await_cond(env, got_acks,
&perm, rep->ack_timeout, &db_rep->ack_waiters);
/*
 * Whether the wait completed or timed out, the outcome is decided by
 * the is_durable flag that got_acks() maintains.
 */
if (ret == 0 || ret == DB_TIMEOUT)
ret = perm.is_durable ? 0 : DB_REP_UNAVAIL;
}
out: UNLOCK_MUTEX(db_rep->mutex);
if (LF_ISSET(DB_REP_PERMANENT)) {
if (ret != 0) {
switch (db_rep->active_gmdb_update) {
case none:
/* Ordinary transaction: count the failure and fire the event. */
STAT(db_rep->region->mstat.st_perm_failed++);
DB_EVENT(env, DB_EVENT_REP_PERM_FAILED, NULL);
break;
case gmdb_primary:
/* Record the LSN of the failed update, then log it below. */
db_rep->limbo_failure = *lsnp;
/* FALLTHROUGH */
case gmdb_secondary:
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"GMDB perm failure %d at [%lu][%lu]",
(int)db_rep->active_gmdb_update,
(u_long)lsnp->file, (u_long)lsnp->offset));
break;
}
} else if (db_rep->limbo_resolution_needed) {
/*
 * Success while a limbo resolution is pending: record the durable LSN,
 * clear the flag and schedule the deferred resolution.  A failure to
 * schedule is logged but does not change the return value.
 */
db_rep->durable_lsn = *lsnp;
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"perm success [%lu][%lu] with limbo resolution needed",
(u_long)lsnp->file, (u_long)lsnp->offset));
db_rep->limbo_resolution_needed = FALSE;
LOCK_MUTEX(db_rep->mutex);
if ((t_ret = __repmgr_defer_op(env,
REPMGR_RESOLVE_LIMBO)) != 0)
__db_err(env, t_ret, "repmgr_defer_op");
UNLOCK_MUTEX(db_rep->mutex);
}
}
return (ret);
}
/*
 * connected_site --
 *	Look up the site with the given EID, returning it only if its state
 *	is SITE_CONNECTED; otherwise return NULL.
 */
static REPMGR_SITE *
connected_site(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *candidate;

	db_rep = env->rep_handle;
	DB_ASSERT(env, IS_VALID_EID(eid));

	candidate = SITE_FROM_EID(eid);
	return (candidate->state == SITE_CONNECTED ? candidate : NULL);
}
/*
 * __repmgr_sync_siteaddr --
 *	Bring this handle's site list up to date with the shared region.
 *
 * While holding the region's repmgr mutex: adopt the region's self_eid if
 * ours is not yet valid, copy in any added sites, and then initialize the
 * sites between the old and the new site count.
 */
int
__repmgr_sync_siteaddr(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	u_int prev_cnt;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	MUTEX_LOCK(env, rep->mtx_repmgr);

	if (!IS_VALID_EID(db_rep->self_eid))
		db_rep->self_eid = rep->self_eid;

	/* Remember the site count as it was before the copy. */
	prev_cnt = db_rep->site_cnt;
	ret = __repmgr_copy_in_added_sites(env);
	if (ret == 0)
		ret = __repmgr_init_new_sites(env,
		    (int)prev_cnt, (int)db_rep->site_cnt);

	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	return (ret);
}
/*
 * __repmgr_send_broadcast --
 *	Send a message of the given type on the main connections of every
 *	remote site.
 *
 * Sites whose membership is not SITE_PRESENT are sent to only when we are
 * the master, and are never counted.  On return:
 *	*nsitesp  - number of full members that the message was sent to;
 *	*npeersp  - how many of those are electable;
 *	*missingp - TRUE if some full member that was not sent to is
 *		    electable, or has no priority information yet.
 *
 * Returns 0, or the first error from send_connection().
 */
int
__repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp, missingp)
	ENV *env;
	u_int type;
	const DBT *control, *rec;
	u_int *nsitesp, *npeersp;
	int *missingp;
{
	DB_REP *db_rep;
	REP *rep;
	struct sending_msg msg;
	REPMGR_SITE *site;
	REPMGR_IOVECS iovecs;
	u_int8_t msg_hdr_buf[__REPMGR_MSG_HDR_SIZE];
	u_int site_total, peer_total;
	int eid, full_member, missed_peer, ret, sent_in, sent_out;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	/* Record the time of this broadcast before doing any sending. */
	__os_gettime(env, &db_rep->last_bcast, 1);

	msg.iovecs = &iovecs;
	setup_sending_msg(env, &msg, msg_hdr_buf, type, control, rec);

	site_total = 0;
	peer_total = 0;
	missed_peer = FALSE;

	FOR_EACH_REMOTE_SITE_INDEX(eid) {
		site = SITE_FROM_EID(eid);
		sent_in = FALSE;
		sent_out = FALSE;
		full_member = site->membership == SITE_PRESENT ? TRUE : FALSE;

		/* Non-members get the message only when we are the master. */
		if (full_member || rep->master_id == db_rep->self_eid) {
			ret = send_connection(env,
			    type, site->ref.conn.in, &msg, &sent_in);
			if (ret != 0)
				return (ret);
			ret = send_connection(env,
			    type, site->ref.conn.out, &msg, &sent_out);
			if (ret != 0)
				return (ret);
		}

		/* Only full members take part in the bookkeeping. */
		if (!full_member)
			continue;

		if (sent_in || sent_out) {
			site_total++;
			if (F_ISSET(site, SITE_ELECTABLE))
				peer_total++;
		} else if (!F_ISSET(site, SITE_HAS_PRIO) ||
		    F_ISSET(site, SITE_ELECTABLE))
			missed_peer = TRUE;
	}

	*nsitesp = site_total;
	*npeersp = peer_total;
	*missingp = missed_peer;
	return (0);
}
/*
 * send_connection --
 *	Send an already set-up message on one connection, without blocking.
 *
 * *sent is set to TRUE only if the send succeeded.  Nothing is sent (and 0
 * is returned) when the connection is absent or not ready, or when the
 * message type exceeds the highest type listed for the connection's
 * protocol version.  DB_TIMEOUT from the send is turned into success with
 * *sent left FALSE; DB_REP_UNAVAIL causes the connection to be busted.
 */
static int
send_connection(env, type, conn, msg, sent)
	ENV *env;
	u_int type;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	int *sent;
{
	DB_REP *db_rep;
	int ret;

	/* Highest message type per protocol version, indexed by version. */
	static const u_int version_max_msg_type[] = {
		0,
		REPMGR_MAX_V1_MSG_TYPE,
		REPMGR_MAX_V2_MSG_TYPE,
		REPMGR_MAX_V3_MSG_TYPE,
		REPMGR_MAX_V4_MSG_TYPE
	};

	db_rep = env->rep_handle;
	*sent = FALSE;

	if (conn == NULL)
		return (0);
	if (!IS_READY_STATE(conn->state))
		return (0);

	DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid) &&
	    conn->version > 0 &&
	    conn->version <= DB_REPMGR_VERSION);

	if (type > version_max_msg_type[conn->version])
		return (0);

	ret = __repmgr_send_internal(env, conn, msg, 0);
	switch (ret) {
	case 0:
		*sent = TRUE;
		break;
	case DB_TIMEOUT:
		/* Not sent, but not treated as an error either. */
		ret = 0;
		break;
	case DB_REP_UNAVAIL:
		ret = __repmgr_bust_connection(env, conn);
		break;
	default:
		break;
	}
	return (ret);
}
/*
 * __repmgr_send_one --
 *	Build a message of the given type from control and rec, and send it
 *	on a single connection, blocking for at most maxblock.
 *
 * When maxblock is 0 a DB_TIMEOUT result is reported as success.
 */
int
__repmgr_send_one(env, conn, msg_type, control, rec, maxblock)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int msg_type;
	const DBT *control, *rec;
	db_timeout_t maxblock;
{
	struct sending_msg msg;
	REPMGR_IOVECS iovecs;
	u_int8_t hdr_buf[__REPMGR_MSG_HDR_SIZE];
	int ret;

	msg.iovecs = &iovecs;
	setup_sending_msg(env, &msg, hdr_buf, msg_type, control, rec);

	ret = __repmgr_send_internal(env, conn, &msg, maxblock);
	if (ret == DB_TIMEOUT && maxblock == 0)
		return (0);
	return (ret);
}
/*
 * __repmgr_send_many --
 *	Send a message that the caller has already laid out in iovecs.
 *
 * Returns DB_REP_UNAVAIL at once for a defunct connection.  A DB_TIMEOUT
 * result is reported as success when maxblock is 0 and is returned
 * unchanged otherwise.  Any other failure disables the connection before
 * the error is returned.
 */
int
__repmgr_send_many(env, conn, iovecs, maxblock)
	ENV *env;
	REPMGR_CONNECTION *conn;
	REPMGR_IOVECS *iovecs;
	db_timeout_t maxblock;
{
	struct sending_msg msg;
	int ret;

	if (conn->state == CONN_DEFUNCT)
		return (DB_REP_UNAVAIL);

	msg.fmsg = NULL;
	msg.iovecs = iovecs;

	ret = __repmgr_send_internal(env, conn, &msg, maxblock);
	if (ret == 0)
		return (0);
	if (ret == DB_TIMEOUT)
		return (maxblock == 0 ? 0 : DB_TIMEOUT);

	(void)__repmgr_disable_connection(env, conn);
	return (ret);
}
/*
 * __repmgr_send_own_msg --
 *	Send a REPMGR_OWN_MSG message of the given type, with an optional
 *	payload of len bytes, without blocking.
 *
 * Nothing is sent, and 0 is returned, when the connection's protocol
 * version is below OWN_MIN_VERSION.
 */
int
__repmgr_send_own_msg(env, conn, type, buf, len)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int8_t *buf;
	u_int32_t len, type;
{
	REPMGR_IOVECS iovecs;
	struct sending_msg msg;
	__repmgr_msg_hdr_args hdr;
	u_int8_t hdr_buf[__REPMGR_MSG_HDR_SIZE];

	if (conn->version < OWN_MIN_VERSION)
		return (0);

	/* Fill in the header and marshal it into its wire form. */
	hdr.type = REPMGR_OWN_MSG;
	REPMGR_OWN_BUF_SIZE(hdr) = len;
	REPMGR_OWN_MSG_TYPE(hdr) = type;
	__repmgr_msg_hdr_marshal(env, &hdr, hdr_buf);

	/* Header first, then the payload if there is one. */
	__repmgr_iovec_init(&iovecs);
	__repmgr_add_buffer(&iovecs, hdr_buf, __REPMGR_MSG_HDR_SIZE);
	if (len > 0)
		__repmgr_add_buffer(&iovecs, buf, len);

	msg.fmsg = NULL;
	msg.iovecs = &iovecs;
	return (__repmgr_send_internal(env, conn, &msg, 0));
}
/*
 * __repmgr_send_internal --
 *	Attempt to send a message on a connection, queueing whatever cannot
 *	be written immediately.
 *
 * If the connection's outbound queue is non-empty the message is only
 * queued (optionally after waiting up to maxblock for the queue to drain);
 * otherwise it is written directly, and any unwritten remainder is queued.
 *
 * Returns 0 when the message was written or queued, DB_TIMEOUT when it was
 * dropped because the queue is full (or repmgr stopped while waiting), and
 * DB_REP_UNAVAIL when the write failed with a non-retryable error.
 */
static int
__repmgr_send_internal(env, conn, msg, maxblock)
ENV *env;
REPMGR_CONNECTION *conn;
struct sending_msg *msg;
db_timeout_t maxblock;
{
DB_REP *db_rep;
SITE_STRING_BUFFER buffer;
int ret;
size_t total_written;
db_rep = env->rep_handle;
DB_ASSERT(env, conn->state != CONN_DEFUNCT);
/* Output is already pending on this connection: we may only queue. */
if (!STAILQ_EMPTY(&conn->outbound_queue)) {
VPRINT(env, (env, DB_VERB_REPMGR_MISC,
"msg to %s to be queued",
__repmgr_format_eid_loc(db_rep, conn, buffer)));
/*
 * The queue is full, the caller allows blocking and the connection is
 * not marked congested: wait for the queue to drain.
 */
if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
maxblock > 0 && conn->state != CONN_CONGESTED) {
VPRINT(env, (env, DB_VERB_REPMGR_MISC,
"block thread, awaiting output queue space"));
/* Hold a reference on the connection for the duration of the wait. */
conn->ref_count++;
ret = __repmgr_await_drain(env, conn, maxblock);
conn->ref_count--;
VPRINT(env, (env, DB_VERB_REPMGR_MISC,
"drain returned %d (%d,%d)", ret,
db_rep->repmgr_status, conn->out_queue_length));
if (db_rep->repmgr_status == stopped)
return (DB_TIMEOUT);
if (ret != 0)
return (ret);
/* The queue emptied completely while we waited: write directly. */
if (STAILQ_EMPTY(&conn->outbound_queue))
goto empty;
}
if (conn->out_queue_length < OUT_QUEUE_LIMIT)
return (enqueue_msg(env, conn, msg, 0));
else {
/* Still no room: drop the message and count it. */
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"queue limit exceeded"));
STAT(env->rep_handle->
region->mstat.st_msgs_dropped++);
return (DB_TIMEOUT);
}
}
empty:
if ((ret = __repmgr_write_iovecs(env,
conn, msg->iovecs, &total_written)) == 0)
return (0);
/*
 * A would-block result means a partial write, handled below; any other
 * error is reported as a connection failure.
 */
switch (ret) {
case WOULDBLOCK:
#if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
case DB_REPMGR_EAGAIN:
#endif
break;
default:
#ifdef EBADF
DB_ASSERT(env, ret != EBADF);
#endif
__repmgr_fire_conn_err_event(env, conn, ret);
STAT(env->rep_handle->region->mstat.st_connection_drop++);
return (DB_REP_UNAVAIL);
}
VPRINT(env, (env, DB_VERB_REPMGR_MISC, "wrote only %lu bytes to %s",
(u_long)total_written,
__repmgr_format_eid_loc(db_rep, conn, buffer)));
/* Queue the message, recording how many bytes have already gone out. */
if ((ret = enqueue_msg(env, conn, msg, total_written)) != 0)
return (ret);
STAT(env->rep_handle->region->mstat.st_msgs_queued++);
/* Wake the main thread now that this connection has queued output. */
return (__repmgr_wake_main_thread(env));
}
/*
 * __repmgr_write_iovecs --
 *	Write as much of the message described by iovecs as the socket will
 *	take, repeating for as long as each write succeeds.
 *
 * The caller's iovecs are not modified: the work is done on a scratch copy,
 * which lives on the stack when the vector count is at most MIN_IOVEC and
 * on the heap otherwise.  The number of bytes written is stored through
 * writtenp.  Returns 0 once everything has been written, otherwise the
 * error from the write (or from the scratch allocation, in which case
 * *writtenp is not set).
 */
int
__repmgr_write_iovecs(env, conn, iovecs, writtenp)
	ENV *env;
	REPMGR_CONNECTION *conn;
	REPMGR_IOVECS *iovecs;
	size_t *writtenp;
{
	REPMGR_IOVECS stack_copy, *work;
	size_t copy_sz, nbytes, sent_so_far;
	int ret;

	if (iovecs->count > MIN_IOVEC) {
		copy_sz = (size_t)REPMGR_IOVECS_ALLOC_SZ((u_int)iovecs->count);
		if ((ret = __os_malloc(env, copy_sz, &work)) != 0)
			return (ret);
	} else {
		work = &stack_copy;
		copy_sz = sizeof(stack_copy);
	}
	memcpy(work, iovecs, copy_sz);

	sent_so_far = 0;
	for (;;) {
		ret = __repmgr_writev(conn->fd, &work->vectors[work->offset],
		    work->count-work->offset, &nbytes);
		if (ret != 0)
			break;
		sent_so_far += nbytes;
		/* A non-zero result means that nothing is left to write. */
		if (__repmgr_update_consumed(work, nbytes))
			break;
	}
	*writtenp = sent_so_far;

	if (work != &stack_copy)
		__os_free(env, work);
	return (ret);
}
/*
 * got_acks --
 *	Wait predicate for __repmgr_send(): decide whether enough sites have
 *	acknowledged the LSN held in the repmgr_permanence context.
 *
 * Only sites whose membership is SITE_PRESENT are considered.  A site with
 * no priority information is treated as an unacknowledged peer.  When the
 * policy is satisfied, perm->is_durable is set.  The return value says
 * whether waiting may stop: for ALL_AVAILABLE that requires the number of
 * acking sites to reach the threshold; for every other policy it is the
 * same as the durability result.
 */
static int
got_acks(env, context)
	ENV *env;
	void *context;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	struct repmgr_permanence *perm;
	u_int sites_acked, peers_acked;
	int eid, has_unacked_peer, is_perm;

	db_rep = env->rep_handle;
	perm = context;

	sites_acked = 0;
	peers_acked = 0;
	has_unacked_peer = FALSE;
	FOR_EACH_REMOTE_SITE_INDEX(eid) {
		site = SITE_FROM_EID(eid);
		if (site->membership != SITE_PRESENT)
			continue;

		if (!F_ISSET(site, SITE_HAS_PRIO)) {
			/* Electability unknown: count it as an unacked peer. */
			has_unacked_peer = TRUE;
		} else if (LOG_COMPARE(&site->max_ack, &perm->lsn) < 0) {
			/* This site has not acknowledged far enough yet. */
			if (F_ISSET(site, SITE_ELECTABLE))
				has_unacked_peer = TRUE;
		} else {
			sites_acked++;
			if (F_ISSET(site, SITE_ELECTABLE))
				peers_acked++;
		}
	}
	VPRINT(env, (env, DB_VERB_REPMGR_MISC,
	    "checking perm result, %lu, %lu, %d",
	    (u_long)sites_acked, (u_long)peers_acked, has_unacked_peer));

	switch (perm->policy) {
	case DB_REPMGR_ACKS_ALL:
	case DB_REPMGR_ACKS_ONE:
		is_perm = (sites_acked >= perm->threshold);
		break;
	case DB_REPMGR_ACKS_ONE_PEER:
		is_perm = (peers_acked >= perm->threshold);
		break;
	case DB_REPMGR_ACKS_QUORUM:
	case DB_REPMGR_ACKS_ALL_AVAILABLE:
		is_perm = (peers_acked >= perm->quorum) || !has_unacked_peer;
		break;
	case DB_REPMGR_ACKS_ALL_PEERS:
		is_perm = !has_unacked_peer;
		break;
	default:
		is_perm = FALSE;
		(void)__db_unknown_path(env, "got_acks");
		break;
	}

	if (is_perm)
		perm->is_durable = TRUE;

	if (perm->policy == DB_REPMGR_ACKS_ALL_AVAILABLE)
		return (sites_acked >= perm->threshold);
	return (is_perm);
}
/*
 * __repmgr_bust_connection --
 *	Take a failed connection out of service and deal with the
 *	consequences for the site it belonged to.
 *
 * The connection is disabled.  If it was the last remaining main ("in" or
 * "out") connection of a known remote site, a reconnection attempt is
 * scheduled; if that site is the master and we are not a subordinate
 * process, an election is initiated; and if we are the master ourselves,
 * MASTER_UPDATE is invoked.  A connection that is already defunct is left
 * alone.
 */
int
__repmgr_bust_connection(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
DB_REP *db_rep;
REP *rep;
REPMGR_SITE *site;
u_int32_t flags;
int ret, eid;
db_rep = env->rep_handle;
rep = db_rep->region;
if (conn->state == CONN_DEFUNCT)
return (0);
/* Save the EID first: disabling the connection resets conn->eid. */
eid = conn->eid;
if ((ret = __repmgr_disable_connection(env, conn)) != 0)
return (ret);
/* Nothing more to do unless this was a rep connection to a known site. */
if (conn->type != REP_CONNECTION || !IS_KNOWN_REMOTE_SITE(eid))
goto out;
site = SITE_FROM_EID(eid);
/*
 * Clear whichever main-connection slot referred to this connection.  If
 * the other main connection is still present the site remains reachable,
 * so stop here.  A connection in neither slot needs no further handling.
 */
if (conn == site->ref.conn.in) {
site->ref.conn.in = NULL;
if (site->ref.conn.out != NULL)
goto out;
} else if (conn == site->ref.conn.out) {
site->ref.conn.out = NULL;
if (site->ref.conn.in != NULL)
goto out;
} else
goto out;
/* The site has lost its last main connection: schedule a reconnect. */
if ((ret = __repmgr_schedule_connection_attempt(env, eid, FALSE)) != 0)
goto out;
/*
 * We lost the master and we are not a subordinate process: start the
 * election machinery, requesting an immediate, fast election only when
 * elections are configured.
 * NOTE(review): LF_SET presumably operates on the local "flags" -- confirm.
 */
if (!IS_SUBORDINATE(db_rep) && eid == rep->master_id) {
flags = ELECT_F_EVENT_NOTIFY;
if (FLD_ISSET(db_rep->region->config, REP_C_ELECTIONS))
LF_SET(ELECT_F_IMMED | ELECT_F_FAST);
else
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"Master failure, but no elections"));
if ((ret = __repmgr_init_election(env, flags)) != 0)
goto out;
}
/* We are the master and have lost a site's last main connection. */
if (rep->master_id == db_rep->self_eid) {
RPRINT(env, (env, DB_VERB_REPMGR_MISC,
"Repmgr: bust connection. Block archive"));
MASTER_UPDATE(env, (REGENV *)env->reginfo->primary);
}
out:
return (ret);
}
/*
 * __repmgr_disable_connection --
 *	Mark a connection defunct and wake up anything that may be waiting
 *	on it.
 *
 * For a rep connection with a valid EID: remove it from its site's list of
 * subordinate connections (unless it is one of the site's main
 * connections), append it to db_rep->connections and take a reference on
 * it.  Its EID is then reset to -1.  For an app connection: complete every
 * in-use response that has a thread waiting, with DB_REP_UNAVAIL, and wake
 * the response waiters.  In all cases the "drained" condition is signalled
 * and the main thread is woken.
 *
 * Returns the first error encountered, if any.
 */
int
__repmgr_disable_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	REPMGR_RESPONSE *resp;
	u_int32_t i;
	int eid, is_main, ret, t_ret;

	db_rep = env->rep_handle;
	ret = 0;

	conn->state = CONN_DEFUNCT;
	if (conn->type == REP_CONNECTION) {
		eid = conn->eid;
		if (IS_VALID_EID(eid)) {
			site = SITE_FROM_EID(eid);
			is_main = (conn == site->ref.conn.in ||
			    conn == site->ref.conn.out);
			if (!is_main)
				TAILQ_REMOVE(&site->sub_conns, conn, entries);
			TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
			conn->ref_count++;
		}
		conn->eid = -1;
	} else if (conn->type == APP_CONNECTION) {
		for (i = 0; i < conn->aresp; i++) {
			resp = &conn->responses[i];
			if (!F_ISSET(resp, RESP_IN_USE))
				continue;
			if (!F_ISSET(resp, RESP_THREAD_WAITING))
				continue;
			F_SET(resp, RESP_COMPLETE);
			resp->ret = DB_REP_UNAVAIL;
		}
		ret = __repmgr_wake_waiters(env, &conn->response_waiters);
	}

	/* Always perform both wake-ups; keep only the first error. */
	t_ret = __repmgr_signal(&conn->drained);
	if (ret == 0)
		ret = t_ret;
	t_ret = __repmgr_wake_main_thread(env);
	if (ret == 0)
		ret = t_ret;
	return (ret);
}
/*
 * __repmgr_cleanup_defunct --
 *	Finish off a defunct connection: close it, unlink it from
 *	db_rep->connections and drop one reference to it.
 *
 * Returns the close error if there was one, otherwise the result of
 * dropping the reference.
 */
int
__repmgr_cleanup_defunct(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int close_ret, ref_ret;

	db_rep = env->rep_handle;

	close_ret = __repmgr_close_connection(env, conn);
	TAILQ_REMOVE(&db_rep->connections, conn, entries);
	ref_ret = __repmgr_decr_conn_ref(env, conn);

	return (close_ret != 0 ? close_ret : ref_ret);
}
/*
 * __repmgr_close_connection --
 *	Close a connection's socket, and on Windows its WSA event object,
 *	leaving both fields set to their "invalid" values.
 *
 * Failures are logged.  The first error encountered is returned, and 0 is
 * returned when there was nothing to close.
 */
int
__repmgr_close_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	int ret;
#ifdef DB_WIN32
	int t_ret;
#endif

	ret = 0;
	if (conn->fd != INVALID_SOCKET) {
		if (closesocket(conn->fd) == SOCKET_ERROR) {
			ret = net_errno;
			__db_err(env, ret, DB_STR("3582", "closing socket"));
		}
	}
	conn->fd = INVALID_SOCKET;

#ifdef DB_WIN32
	if (conn->event_object != WSA_INVALID_EVENT) {
		if (!WSACloseEvent(conn->event_object)) {
			t_ret = net_errno;
			__db_err(env, t_ret, DB_STR("3583",
			    "releasing WSA event object"));
			if (ret == 0)
				ret = t_ret;
		}
	}
	conn->event_object = WSA_INVALID_EVENT;
#endif
	return (ret);
}
/*
 * __repmgr_decr_conn_ref --
 *	Drop one reference to a connection, destroying the connection when
 *	no references remain.
 *
 * Returns 0 while references remain, otherwise the result of the destroy.
 */
int
__repmgr_decr_conn_ref(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_ASSERT(env, conn->ref_count > 0);

	if (--conn->ref_count > 0)
		return (0);
	return (__repmgr_destroy_conn(env, conn));
}
/*
 * __repmgr_destroy_conn --
 *	Free a connection structure and everything it owns: any partially
 *	read input, the responses array of an app connection, the response
 *	waiters, all queued output and the "drained" condition.
 *
 * The connection's reference count must already be zero.  The socket is
 * not closed here.  Returns the first error encountered; the structure is
 * freed regardless.
 */
int
__repmgr_destroy_conn(env, conn)
ENV *env;
REPMGR_CONNECTION *conn;
{
QUEUED_OUTPUT *out;
REPMGR_FLAT *msg;
REPMGR_RESPONSE *resp;
DBT *dbt;
int ret, t_ret;
ret = 0;
DB_ASSERT(env, conn->ref_count == 0);
/*
 * A message body was in the course of being read: release the input
 * buffer(s), whose form depends on the message type.
 */
if (conn->reading_phase == DATA_PHASE) {
switch (conn->msg_type) {
case REPMGR_OWN_MSG:
/* Nothing to free when there is no buffer. */
if (conn->input.rep_message == NULL)
break;
/* FALLTHROUGH */
case REPMGR_APP_MESSAGE:
case REPMGR_HEARTBEAT:
case REPMGR_REP_MESSAGE:
__os_free(env, conn->input.rep_message);
break;
case REPMGR_APP_RESPONSE:
DB_ASSERT(env, conn->cur_resp < conn->aresp &&
conn->responses != NULL);
resp = &conn->responses[conn->cur_resp];
DB_ASSERT(env, F_ISSET(resp, RESP_READING));
/* Only a buffer flagged RESP_DUMMY_BUF is freed here. */
if (F_ISSET(resp, RESP_DUMMY_BUF))
__os_free(env, resp->dbt.data);
break;
case REPMGR_PERMLSN:
case REPMGR_HANDSHAKE:
/* Free the control and rec parts that have a non-zero size. */
dbt = &conn->input.repmgr_msg.cntrl;
if (dbt->size > 0)
__os_free(env, dbt->data);
dbt = &conn->input.repmgr_msg.rec;
if (dbt->size > 0)
__os_free(env, dbt->data);
break;
case REPMGR_RESP_ERROR:
default:
/* Not expected in DATA_PHASE: report it as an unknown path. */
ret = __db_unknown_path(env, "destroy_conn");
}
}
if (conn->type == APP_CONNECTION && conn->responses != NULL)
__os_free(env, conn->responses);
if ((t_ret = __repmgr_destroy_waiters(env,
&conn->response_waiters)) != 0 && ret == 0)
ret = t_ret;
/*
 * Discard all queued output.  Each queue entry holds a reference to a
 * flattened message, which is freed when its count drops to zero.
 */
while (!STAILQ_EMPTY(&conn->outbound_queue)) {
out = STAILQ_FIRST(&conn->outbound_queue);
STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
msg = out->msg;
if (--msg->ref_count <= 0)
__os_free(env, msg);
__os_free(env, out);
}
if ((t_ret = __repmgr_free_cond(&conn->drained)) != 0 &&
ret == 0)
ret = t_ret;
__os_free(env, conn);
return (ret);
}
static int
enqueue_msg(env, conn, msg, offset)
ENV *env;
REPMGR_CONNECTION *conn;
struct sending_msg *msg;
size_t offset;
{
QUEUED_OUTPUT *q_element;
int ret;
if (msg->fmsg == NULL && ((ret = flatten(env, msg)) != 0))
return (ret);
if ((ret = __os_malloc(env, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
return (ret);
q_element->msg = msg->fmsg;
msg->fmsg->ref_count++;
q_element->offset = offset;
STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
conn->out_queue_length++;
return (0);
}
/*
 * setup_sending_msg --
 *	Prepare a sending_msg for a message of the given type, built from
 *	optional control and rec DBTs.
 *
 * msg->iovecs must already point at storage for the vector.  hdr_buf
 * receives the marshaled header and becomes the first buffer of the
 * vector, followed by the control and rec data when they are non-empty.
 * msg->fmsg is reset to NULL.
 */
static void
setup_sending_msg(env, msg, hdr_buf, type, control, rec)
	ENV *env;
	struct sending_msg *msg;
	u_int8_t *hdr_buf;
	u_int type;
	const DBT *control, *rec;
{
	__repmgr_msg_hdr_args hdr;

	__repmgr_iovec_init(msg->iovecs);

	/*
	 * The header buffer goes into the vector first; its contents are
	 * filled in at the end, once both sizes are known.
	 */
	__repmgr_add_buffer(msg->iovecs, hdr_buf, __REPMGR_MSG_HDR_SIZE);

	hdr.type = type;

	REP_MSG_CONTROL_SIZE(hdr) = (control == NULL ? 0 : control->size);
	if (REP_MSG_CONTROL_SIZE(hdr) > 0)
		__repmgr_add_dbt(msg->iovecs, control);

	REP_MSG_REC_SIZE(hdr) = (rec == NULL ? 0 : rec->size);
	if (REP_MSG_REC_SIZE(hdr) > 0)
		__repmgr_add_dbt(msg->iovecs, rec);

	__repmgr_msg_hdr_marshal(env, &hdr, hdr_buf);
	msg->fmsg = NULL;
}
static int
flatten(env, msg)
ENV *env;
struct sending_msg *msg;
{
u_int8_t *p;
size_t msg_size;
int i, ret;
DB_ASSERT(env, msg->fmsg == NULL);
msg_size = msg->iovecs->total_bytes;
if ((ret = __os_malloc(env, sizeof(*msg->fmsg) + msg_size,
&msg->fmsg)) != 0)
return (ret);
msg->fmsg->length = msg_size;
msg->fmsg->ref_count = 0;
p = &msg->fmsg->data[0];
for (i = 0; i < msg->iovecs->count; i++) {
memcpy(p, msg->iovecs->vectors[i].iov_base,
msg->iovecs->vectors[i].iov_len);
p = &p[msg->iovecs->vectors[i].iov_len];
}
__repmgr_iovec_init(msg->iovecs);
__repmgr_add_buffer(msg->iovecs, &msg->fmsg->data[0], msg_size);
return (0);
}
/*
 * __repmgr_find_available_peer --
 *	Return the first remote site that is configured as a peer, is not
 *	the current master, is connected, and has a main connection in the
 *	CONN_READY state.  Returns NULL if there is no such site.
 */
static REPMGR_SITE *
__repmgr_find_available_peer(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int i;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	FOR_EACH_REMOTE_SITE_INDEX(i) {
		site = &db_rep->sites[i];

		if (!FLD_ISSET(site->config, DB_REPMGR_PEER))
			continue;
		if (EID_FROM_SITE(site) == rep->master_id)
			continue;
		if (site->state != SITE_CONNECTED)
			continue;

		/* Either main connection will do, provided it is ready. */
		conn = site->ref.conn.in;
		if (conn != NULL && conn->state == CONN_READY)
			return (site);
		conn = site->ref.conn.out;
		if (conn != NULL && conn->state == CONN_READY)
			return (site);
	}
	return (NULL);
}
/*
 * __repmgr_pack_netaddr --
 *	Fill in a repmgr_netaddr_t with a newly allocated copy of host and
 *	with port narrowed to 16 bits.
 *
 * Returns 0, or the error from duplicating the host string (in which case
 * the port is not stored).
 */
int
__repmgr_pack_netaddr(env, host, port, addr)
	ENV *env;
	const char *host;
	u_int port;
	repmgr_netaddr_t *addr;
{
	int ret;

	DB_ASSERT(env, host != NULL);

	ret = __os_strdup(env, host, &addr->host);
	if (ret == 0)
		addr->port = (u_int16_t)port;
	return (ret);
}
/*
 * __repmgr_getaddr --
 *	Resolve host and port into a list of stream-socket addresses of any
 *	address family.
 *
 * flags is passed through as the resolver's ai_flags.  On success the list
 * is stored through result.  Any resolver failure is reported as
 * DB_REP_UNAVAIL.
 */
int
__repmgr_getaddr(env, host, port, flags, result)
	ENV *env;
	const char *host;
	u_int port;
	int flags;
	ADDRINFO **result;
{
	ADDRINFO hints, *answer;
	char service[10];

	/* The port is also handed to the resolver in decimal text form. */
	(void)snprintf(service, sizeof(service), "%u", port);

	memset(&hints, 0, sizeof(hints));
	hints.ai_flags = flags;
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;

	if (__os_getaddrinfo(env, host, port, service, &hints, &answer) != 0)
		return (DB_REP_UNAVAIL);

	*result = answer;
	return (0);
}
int
__repmgr_listen(env)
ENV *env;
{
ADDRINFO *ai;
DB_REP *db_rep;
repmgr_netaddr_t *addrp;
char *why;
int sockopt, ret;
socket_t s;
db_rep = env->rep_handle;
s = INVALID_SOCKET;
addrp = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
if ((ret = __repmgr_getaddr(env,
addrp->host, addrp->port, AI_PASSIVE, &ai)) != 0)
return (ret);
COMPQUIET(why, "");
DB_ASSERT(env, ai != NULL);
for (; ai != NULL; ai = ai->ai_next) {
if ((s = socket(ai->ai_family,
ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
why = DB_STR("3584", "can't create listen socket");
continue;
}
sockopt = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
sizeof(sockopt)) != 0) {
why = DB_STR("3585",
"can't set REUSEADDR socket option");
break;
}
if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
why = DB_STR("3586",
"can't bind socket to listening address");
ret = net_errno;
(void)closesocket(s);
s = INVALID_SOCKET;
continue;
}
if (listen(s, 5) != 0) {
why = DB_STR("3587", "listen()");
break;
}
if ((ret = __repmgr_set_nonblocking(s)) != 0) {
__db_err(env, ret, DB_STR("3588",
"can't unblock listen socket"));
goto clean;
}
db_rep->listen_fd = s;
goto out;
}
if (ret == 0)
ret = net_errno;
__db_err(env, ret, "%s", why);
clean: if (s != INVALID_SOCKET)
(void)closesocket(s);
out:
__os_freeaddrinfo(env, ai);
return (ret);
}
/*
 * __repmgr_net_close --
 *	Shut down networking: run final_cleanup() on every connection, clear
 *	the main-connection pointers of every remote site (only if that
 *	cleanup succeeded), and close the listening socket if one is open.
 *
 * Returns the first error encountered.
 */
int
__repmgr_net_close(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	REPMGR_SITE *site;
	u_int eid;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	ret = __repmgr_each_connection(env, final_cleanup, NULL, FALSE);
	if (ret == 0) {
		FOR_EACH_REMOTE_SITE_INDEX(eid) {
			site = SITE_FROM_EID(eid);
			site->ref.conn.in = NULL;
			site->ref.conn.out = NULL;
		}
	}

	if (db_rep->listen_fd == INVALID_SOCKET)
		return (ret);

	if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0)
		ret = net_errno;
	db_rep->listen_fd = INVALID_SOCKET;
	rep->listener = 0;
	return (ret);
}
/*
 * final_cleanup --
 *	Per-connection callback used by __repmgr_net_close(): close the
 *	connection and release it.
 *
 * A rep connection with a valid EID is destroyed outright, after being
 * unlinked from its site's list of subordinate connections unless it is a
 * main connection of a connected site.  Any other connection is unlinked
 * from db_rep->connections and has one reference dropped.
 *
 * Returns the close error if there was one, otherwise the release result.
 */
static int
final_cleanup(env, conn, unused)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *unused;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int ret, t_ret;

	COMPQUIET(unused, NULL);
	db_rep = env->rep_handle;

	ret = __repmgr_close_connection(env, conn);

	if (conn->type == REP_CONNECTION && IS_VALID_EID(conn->eid)) {
		site = SITE_FROM_EID(conn->eid);

		/* A main connection of a connected site is on no list. */
		if (site->state != SITE_CONNECTED ||
		    (conn != site->ref.conn.in && conn != site->ref.conn.out))
			TAILQ_REMOVE(&site->sub_conns, conn, entries);
		t_ret = __repmgr_destroy_conn(env, conn);
	} else {
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
		t_ret = __repmgr_decr_conn_ref(env, conn);
	}

	return (ret != 0 ? ret : t_ret);
}
/*
 * __repmgr_net_destroy --
 *	Free every entry on the list of pending connection retries.  The
 *	list of connections is expected to be empty already.
 */
void
__repmgr_net_destroy(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	REPMGR_RETRY *pending;

	for (;;) {
		if (TAILQ_EMPTY(&db_rep->retries))
			break;
		pending = TAILQ_FIRST(&db_rep->retries);
		TAILQ_REMOVE(&db_rep->retries, pending, entries);
		__os_free(env, pending);
	}

	DB_ASSERT(env, TAILQ_EMPTY(&db_rep->connections));
}
#ifdef CONFIG_TEST
/*
 * fake_port --
 *	Test-only port substitution.
 *
 * If the DB_TEST_FAKE_PORT environment variable holds a valid port number,
 * connect to that port on "localhost", send the line
 *
 *	{config,<port>}\r\n
 *
 * and read back one line.  If that line parses as a port number, it is
 * returned in place of the port passed in.  In every other case (variable
 * unset or invalid, connection or I/O failure, unparsable reply) the
 * original port is returned.
 *
 * Fixes relative to the previous version: the connect loop now stops at the
 * first address that connects.  Previously it carried on through the rest
 * of the address list, overwriting (and leaking) the connected socket, and
 * an error left over from an earlier address caused a bail-out even after a
 * later address had connected.  An empty address list is now treated as a
 * failure instead of attempting I/O on an invalid socket.
 */
static u_int
fake_port(env, port)
	ENV *env;
	u_int port;
{
#define	MIN_PORT	1
#define	MAX_PORT	65535
	ADDRINFO *ai0, *ai;
	db_iovec_t iovec;
	char *arbiter, buf[100], *end, *p;
	socket_t s;
	long result;
	size_t count;
	int ret;
	u_int arbiter_port;

	if ((arbiter = getenv("DB_TEST_FAKE_PORT")) == NULL)
		return (port);
	if (__db_getlong(env->dbenv, "repmgr_net.c:fake_port",
	    arbiter, MIN_PORT, MAX_PORT, &result) != 0)
		return (port);
	arbiter_port = (u_int)result;

	if ((ret = __repmgr_getaddr(env,
	    "localhost", arbiter_port, 0, &ai0)) != 0) {
		__db_err(env, ret, "fake_port:getaddr");
		return (port);
	}

	/* Try each address until one of them connects. */
	s = INVALID_SOCKET;
	for (ai = ai0; ai != NULL; ai = ai->ai_next) {
		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) {
			ret = net_errno;
			s = INVALID_SOCKET;
			__db_err(env, ret, "fake_port:socket");
			goto err;
		}
		if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) == 0) {
			/* Connected: forget any earlier failure and stop. */
			ret = 0;
			break;
		}
		ret = net_errno;
		(void)closesocket(s);
		s = INVALID_SOCKET;
	}
	/* No address connected (or the list was empty). */
	if (s == INVALID_SOCKET)
		goto err;

	/* Send the request, looping over partial writes. */
	(void)snprintf(buf, sizeof(buf), "{config,%u}\r\n", port);
	iovec.iov_base = buf;
	iovec.iov_len = (u_long)strlen(buf);
	while ((ret = __repmgr_writev(s, &iovec, 1, &count)) == 0) {
		iovec.iov_base = (u_int8_t *)iovec.iov_base + count;
		if ((iovec.iov_len -= (u_long)count) == 0)
			break;
	}
	if (ret != 0) {
		__db_err(env, ret, "fake_port:writev");
		goto err;
	}

	/*
	 * Read the reply into buf until a line terminator shows up, then
	 * replace the terminator with a NUL so that buf holds just the
	 * number.
	 */
	iovec.iov_base = buf;
	iovec.iov_len = sizeof(buf);
	p = buf;
	while ((ret = __repmgr_readv(s, &iovec, 1, &count)) == 0) {
		if (count == 0) {
			__db_errx(env, "fake_port: premature EOF");
			goto err;
		}
		for (p = iovec.iov_base, end = &p[count]; p < end; p++)
			if (*p == '\r' || *p == '\n')
				break;
		if (p < end) {
			*p = '\0';
			break;
		}
		iovec.iov_base = (u_int8_t *)iovec.iov_base + count;
		iovec.iov_len -= (u_long)count;
		DB_ASSERT(env, iovec.iov_len > 0);
	}
	if (ret != 0)
		goto err;

	if (__db_getlong(env->dbenv, "repmgr_net.c:fake_port",
	    buf, MIN_PORT, MAX_PORT, &result) == 0)
		port = (u_int)result;

err:
	if (s != INVALID_SOCKET)
		(void)closesocket(s);
	__os_freeaddrinfo(env, ai0);
	return (port);
}
#endif