#include "db_config.h"
#include "db_int.h"
/*
 * Sentinel stored in db_rep->read_pipe / db_rep->write_pipe while no
 * signalling pipe is open (before init, after a failed init, after deinit).
 */
#define NO_SUCH_FILE_DESC (-1)
/*
 * Scratch state shared between the select loop and its per-connection
 * callbacks (prepare_io, __repmgr_conn_work).
 */
struct io_info {
	fd_set *reads;		/* Descriptors to watch for / found readable. */
	fd_set *writes;		/* Descriptors to watch for / found writable. */
	int maxfd;		/* Highest descriptor placed in either set. */
};
/*
 * Per-connection callbacks handed to __repmgr_each_connection() by the
 * select loop; the void * argument is a struct io_info *.
 */
static int __repmgr_conn_work __P((ENV *, REPMGR_CONNECTION *, void *));
static int prepare_io __P((ENV *, REPMGR_CONNECTION *, void *));
/*
 * __repmgr_thread_start --
 *	Start a new thread that executes runnable->run(runnable).
 *
 * When both _POSIX_THREAD_ATTR_STACKSIZE and DB_STACKSIZE are defined, the
 * thread is created with a stack of DB_STACKSIZE bytes (raised to
 * PTHREAD_STACK_MIN if that is larger); otherwise default attributes are used.
 *
 * The runnable's finished/quit_requested flags are cleared and its env is
 * set before the thread is created.
 *
 * Returns 0 on success, or the error number reported by the failing
 * pthread call.
 */
int
__repmgr_thread_start(env, runnable)
	ENV *env;
	REPMGR_RUNNABLE *runnable;
{
	pthread_attr_t *attrp;
	int ret;
#if defined(_POSIX_THREAD_ATTR_STACKSIZE) && defined(DB_STACKSIZE)
	pthread_attr_t attributes;
	size_t size;

	attrp = &attributes;
	if ((ret = pthread_attr_init(&attributes)) != 0) {
		__db_err(env, ret, DB_STR("3630",
		    "pthread_attr_init in repmgr_thread_start"));
		return (ret);
	}

	size = DB_STACKSIZE;
#ifdef PTHREAD_STACK_MIN
	if (size < PTHREAD_STACK_MIN)
		size = PTHREAD_STACK_MIN;
#endif
	if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) {
		__db_err(env, ret, DB_STR("3631",
		    "pthread_attr_setstacksize in repmgr_thread_start"));
		/* The attribute object was initialized; release it. */
		(void)pthread_attr_destroy(&attributes);
		return (ret);
	}
#else
	attrp = NULL;
#endif

	runnable->finished = FALSE;
	runnable->quit_requested = FALSE;
	runnable->env = env;

	ret = pthread_create(&runnable->thread_id, attrp,
	    runnable->run, runnable);

	/*
	 * The attribute object is only needed for the duration of
	 * pthread_create(); destroying it afterwards does not affect the
	 * thread that was created with it.
	 */
	if (attrp != NULL)
		(void)pthread_attr_destroy(attrp);
	return (ret);
}
/*
 * __repmgr_thread_join --
 *	Wait for the thread recorded in thread->thread_id to terminate; its
 *	exit value is discarded.  Returns the pthread_join() result.
 */
int
__repmgr_thread_join(thread)
	REPMGR_RUNNABLE *thread;
{
	int ret;

	ret = pthread_join(thread->thread_id, NULL);
	return (ret);
}
/*
 * __repmgr_set_nonblock_conn --
 *	Put the connection's socket (conn->fd) into non-blocking mode.
 *	Returns whatever __repmgr_set_nonblocking() returns.
 */
int
__repmgr_set_nonblock_conn(conn)
	REPMGR_CONNECTION *conn;
{
	int ret;

	ret = __repmgr_set_nonblocking(conn->fd);
	return (ret);
}
/*
 * __repmgr_set_nonblocking --
 *	Add O_NONBLOCK to the file status flags of fd, preserving the flags
 *	that are already set.  Returns 0 on success, or errno from the
 *	failing fcntl() call.
 */
int
__repmgr_set_nonblocking(fd)
	socket_t fd;
{
	int cur_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	cur_flags = fcntl(fd, F_GETFL, 0);
	if (cur_flags < 0)
		return (errno);

	if (fcntl(fd, F_SETFL, cur_flags | O_NONBLOCK) >= 0)
		return (0);
	return (errno);
}
/*
 * __repmgr_wake_waiters --
 *	Wake every thread currently blocked on the given condition variable.
 *	The env argument is unused here.  Returns the
 *	pthread_cond_broadcast() result.
 */
int
__repmgr_wake_waiters(env, waiter)
	ENV *env;
	waiter_t *waiter;
{
	int ret;

	COMPQUIET(env, NULL);

	ret = pthread_cond_broadcast(waiter);
	return (ret);
}
/*
 * __repmgr_await_cond --
 *	Block on wait_condition until pred(env, ctx) becomes true.
 *
 * A timeout greater than zero bounds the total wait (a single deadline is
 * computed up front); a timeout of zero waits indefinitely.  db_rep->mutex
 * is passed to the pthread wait calls, so the caller is expected to hold it.
 *
 * Returns:
 *	0		the predicate is satisfied
 *	DB_REP_UNAVAIL	repmgr was found stopped after a wakeup
 *	DB_TIMEOUT	the deadline passed
 *	other		error number from the pthread wait call
 */
int
__repmgr_await_cond(env, pred, ctx, timeout, wait_condition)
	ENV *env;
	PREDICATE pred;
	void *ctx;
	db_timeout_t timeout;
	waiter_t *wait_condition;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret, timed;

	db_rep = env->rep_handle;

	timed = (timeout > 0);
	if (timed)
		__repmgr_compute_wait_deadline(env, &deadline, timeout);
	else
		COMPQUIET(deadline.tv_sec, 0);

	for (;;) {
		if ((*pred)(env, ctx))
			return (0);

		ret = timed ?
		    pthread_cond_timedwait(wait_condition,
		    db_rep->mutex, &deadline) :
		    pthread_cond_wait(wait_condition, db_rep->mutex);

		/* A shutdown takes precedence over any wait result. */
		if (db_rep->repmgr_status == stopped)
			return (DB_REP_UNAVAIL);
		if (ret == ETIMEDOUT)
			return (DB_TIMEOUT);
		if (ret != 0)
			return (ret);
	}
	/* NOTREACHED */
}
/*
 * __repmgr_await_gmdbop --
 *	Wait on db_rep->gmdb_idle until db_rep->gmdb_busy is clear.
 *	db_rep->mutex is passed to pthread_cond_wait(), so the caller is
 *	expected to hold it.  Returns 0 once the flag is clear, or the error
 *	number from a failed wait.
 */
int
__repmgr_await_gmdbop(env)
	ENV *env;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	ret = 0;
	while (ret == 0 && db_rep->gmdb_busy)
		ret = pthread_cond_wait(&db_rep->gmdb_idle, db_rep->mutex);
	return (ret);
}
void
__repmgr_compute_wait_deadline(env, result, wait)
ENV *env;
struct timespec *result;
db_timeout_t wait;
{
__os_gettime(env, (db_timespec *)result, 0);
TIMESPEC_ADD_DB_TIMEOUT(result, wait);
}
/*
 * __repmgr_await_drain --
 *	Wait, for at most timeout, on conn->drained until the connection's
 *	outbound queue length drops below OUT_QUEUE_LIMIT.  db_rep->mutex is
 *	passed to pthread_cond_timedwait(), so the caller is expected to
 *	hold it.
 *
 * Returns:
 *	0		the queue has room, or the wait timed out (in which
 *			case conn->state is set to CONN_CONGESTED), or repmgr
 *			was found stopped
 *	DB_REP_UNAVAIL	the connection became defunct while waiting
 *	other		error number from pthread_cond_timedwait()
 */
int
__repmgr_await_drain(env, conn, timeout)
	ENV *env;
	REPMGR_CONNECTION *conn;
	db_timeout_t timeout;
{
	DB_REP *db_rep;
	struct timespec deadline;
	int ret;

	db_rep = env->rep_handle;
	__repmgr_compute_wait_deadline(env, &deadline, timeout);

	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
		ret = pthread_cond_timedwait(&conn->drained,
		    db_rep->mutex, &deadline);

		if (ret == ETIMEDOUT) {
			/* Timing out is not an error: mark it congested. */
			conn->state = CONN_CONGESTED;
			return (0);
		}
		if (ret != 0)
			return (ret);

		/*
		 * NOTE(review): a stopped repmgr yields success here, unlike
		 * __repmgr_await_cond() which reports DB_REP_UNAVAIL --
		 * confirm that this is intended.
		 */
		if (db_rep->repmgr_status == stopped)
			return (0);
		if (conn->state == CONN_DEFUNCT)
			return (DB_REP_UNAVAIL);
	}
	return (0);
}
/*
 * __repmgr_alloc_cond --
 *	Initialize a condition variable with default attributes.  Returns
 *	the pthread_cond_init() result.
 */
int
__repmgr_alloc_cond(c)
	cond_var_t *c;
{
	int ret;

	ret = pthread_cond_init(c, NULL);
	return (ret);
}
/*
 * __repmgr_free_cond --
 *	Destroy a condition variable.  Returns the pthread_cond_destroy()
 *	result.
 */
int
__repmgr_free_cond(c)
	cond_var_t *c;
{
	int ret;

	ret = pthread_cond_destroy(c);
	return (ret);
}
/*
 * __repmgr_env_create_pf --
 *	Mark both ends of the signalling pipe in db_rep as "not open".
 */
void
__repmgr_env_create_pf(db_rep)
	DB_REP *db_rep;
{
	db_rep->write_pipe = NO_SUCH_FILE_DESC;
	db_rep->read_pipe = NO_SUCH_FILE_DESC;
}
/*
 * __repmgr_create_mutex_pf --
 *	Initialize a mutex with default attributes.  Returns the
 *	pthread_mutex_init() result.
 */
int
__repmgr_create_mutex_pf(mutex)
	mgr_mutex_t *mutex;
{
	int ret;

	ret = pthread_mutex_init(mutex, NULL);
	return (ret);
}
/*
 * __repmgr_destroy_mutex_pf --
 *	Destroy a mutex.  Returns the pthread_mutex_destroy() result.
 */
int
__repmgr_destroy_mutex_pf(mutex)
	mgr_mutex_t *mutex;
{
	int ret;

	ret = pthread_mutex_destroy(mutex);
	return (ret);
}
/*
 * __repmgr_init --
 *	Set up the synchronization objects and the signalling pipe kept in
 *	env->rep_handle.
 *
 * If SIGPIPE still has its default disposition it is changed to SIG_IGN;
 * a handler that was already installed is left alone.  Then the ack
 * waiters, the check_election, gmdb_idle and msg_avail condition variables
 * and the pipe are created, in that order.  If any step fails, everything
 * created so far is destroyed in reverse order and the pipe descriptors are
 * reset to NO_SUCH_FILE_DESC.
 *
 * Returns 0 on success, or the error number of the failing step.
 */
int
__repmgr_init(env)
	ENV *env;
{
	DB_REP *db_rep;
	struct sigaction sigact;
	int file_desc[2], ret;

	db_rep = env->rep_handle;

	/* Query the current SIGPIPE disposition without changing it. */
	if (sigaction(SIGPIPE, NULL, &sigact) == -1) {
		ret = errno;
		__db_err(env, ret, DB_STR("3632",
		    "can't access signal handler"));
		return (ret);
	}
	if (sigact.sa_handler == SIG_DFL) {
		sigact.sa_handler = SIG_IGN;
		sigact.sa_flags = 0;
		if (sigaction(SIGPIPE, &sigact, NULL) == -1) {
			ret = errno;
			__db_err(env, ret, DB_STR("3633",
			    "can't access signal handler"));
			return (ret);
		}
	}

	/* Each failure jumps to the label that undoes the earlier steps. */
	if ((ret = __repmgr_init_waiters(env, &db_rep->ack_waiters)) != 0)
		goto err_none;
	if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0)
		goto err_ack;
	if ((ret = pthread_cond_init(&db_rep->gmdb_idle, NULL)) != 0)
		goto err_elect;
	if ((ret = pthread_cond_init(&db_rep->msg_avail, NULL)) != 0)
		goto err_gmdb;
	if (pipe(file_desc) == -1) {
		ret = errno;
		goto err_queue;
	}

	db_rep->read_pipe = file_desc[0];
	db_rep->write_pipe = file_desc[1];
	return (0);

err_queue:
	(void)pthread_cond_destroy(&db_rep->msg_avail);
err_gmdb:
	(void)pthread_cond_destroy(&db_rep->gmdb_idle);
err_elect:
	(void)pthread_cond_destroy(&db_rep->check_election);
err_ack:
	(void)__repmgr_destroy_waiters(env, &db_rep->ack_waiters);
err_none:
	db_rep->write_pipe = NO_SUCH_FILE_DESC;
	db_rep->read_pipe = NO_SUCH_FILE_DESC;
	return (ret);
}
/*
 * __repmgr_deinit --
 *	Tear down what __repmgr_init() created: the msg_avail, gmdb_idle and
 *	check_election condition variables, the ack waiters, and both ends
 *	of the signalling pipe.  Does nothing (and returns 0) when
 *	REPMGR_INITED(db_rep) is false.
 *
 * Every teardown step is attempted even after a failure; the first error
 * encountered is the one returned.
 */
int
__repmgr_deinit(env)
	ENV *env;
{
	DB_REP *db_rep;
	int ret, t_ret;

	db_rep = env->rep_handle;
	if (!(REPMGR_INITED(db_rep)))
		return (0);

	ret = pthread_cond_destroy(&db_rep->msg_avail);

	t_ret = pthread_cond_destroy(&db_rep->gmdb_idle);
	if (ret == 0)
		ret = t_ret;

	t_ret = pthread_cond_destroy(&db_rep->check_election);
	if (ret == 0)
		ret = t_ret;

	t_ret = __repmgr_destroy_waiters(env, &db_rep->ack_waiters);
	if (ret == 0)
		ret = t_ret;

	/* Capture errno right after each close(), before anything else. */
	t_ret = close(db_rep->read_pipe) == -1 ? errno : 0;
	if (ret == 0)
		ret = t_ret;

	t_ret = close(db_rep->write_pipe) == -1 ? errno : 0;
	if (ret == 0)
		ret = t_ret;

	db_rep->write_pipe = NO_SUCH_FILE_DESC;
	db_rep->read_pipe = NO_SUCH_FILE_DESC;
	return (ret);
}
/*
 * __repmgr_init_waiters --
 *	Initialize the waiters object, which on this platform is a condition
 *	variable created with default attributes.  The env argument is
 *	unused here.  Returns the pthread_cond_init() result.
 */
int
__repmgr_init_waiters(env, waiters)
	ENV *env;
	waiter_t *waiters;
{
	int ret;

	COMPQUIET(env, NULL);

	ret = pthread_cond_init(waiters, NULL);
	return (ret);
}
/*
 * __repmgr_destroy_waiters --
 *	Destroy the condition variable behind the waiters object.  The env
 *	argument is unused here.  Returns the pthread_cond_destroy() result.
 */
int
__repmgr_destroy_waiters(env, waiters)
	ENV *env;
	waiter_t *waiters;
{
	int ret;

	COMPQUIET(env, NULL);

	ret = pthread_cond_destroy(waiters);
	return (ret);
}
/*
 * __repmgr_lock_mutex --
 *	Acquire the mutex, blocking until it is available.  Returns the
 *	pthread_mutex_lock() result.
 */
int
__repmgr_lock_mutex(mutex)
	mgr_mutex_t *mutex;
{
	int ret;

	ret = pthread_mutex_lock(mutex);
	return (ret);
}
/*
 * __repmgr_unlock_mutex --
 *	Release the mutex.  Returns the pthread_mutex_unlock() result.
 */
int
__repmgr_unlock_mutex(mutex)
	mgr_mutex_t *mutex;
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	return (ret);
}
/*
 * __repmgr_signal --
 *	Wake all threads blocked on the condition variable (this is a
 *	broadcast, not a single-waiter signal).  Returns the
 *	pthread_cond_broadcast() result.
 */
int
__repmgr_signal(v)
	cond_var_t *v;
{
	int ret;

	ret = pthread_cond_broadcast(v);
	return (ret);
}
/*
 * __repmgr_wake_msngers --
 *	Wake the threads waiting on db_rep->msg_avail.  The count n is
 *	ignored here: __repmgr_signal() broadcasts to all waiters.
 */
int
__repmgr_wake_msngers(env, n)
	ENV *env;
	u_int n;
{
	DB_REP *db_rep;
	int ret;

	COMPQUIET(n, 0);

	db_rep = env->rep_handle;
	ret = __repmgr_signal(&db_rep->msg_avail);
	return (ret);
}
/*
 * __repmgr_wake_main_thread --
 *	Write a single byte to db_rep->write_pipe.  The select loop watches
 *	the read end of that pipe, so the byte's arrival makes it readable;
 *	the byte's value is never examined.
 *
 * Returns 0 on success, or errno if the write fails.
 */
int
__repmgr_wake_main_thread(env)
	ENV *env;
{
	DB_REP *db_rep;
	u_int8_t any_value;

	COMPQUIET(any_value, 0);
	db_rep = env->rep_handle;

	if (write(db_rep->write_pipe, VOID_STAR_CAST &any_value, 1) != -1)
		return (0);
	return (errno);
}
/*
 * __repmgr_writev --
 *	Gather-write buf_count buffers described by iovec to fd.
 *
 * On success stores the number of bytes written in *byte_count_p and
 * returns 0.  On failure returns errno and leaves *byte_count_p untouched.
 *
 * The writev() result is kept in an ssize_t (as __repmgr_readv() does for
 * readv()) so that a large byte count is not truncated by an int.
 */
int
__repmgr_writev(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	int result;
	ssize_t nw;

	if ((nw = writev(fd, iovec, buf_count)) == -1) {
		/* Capture errno before anything else can overwrite it. */
		result = errno;
		DB_ASSERT(NULL, result != 0);
		return (result);
	}
	*byte_count_p = (size_t)nw;
	return (0);
}
/*
 * __repmgr_readv --
 *	Scatter-read from fd into the buf_count buffers described by iovec.
 *
 * On success stores the number of bytes read in *byte_count_p and returns
 * 0.  On failure returns errno and leaves *byte_count_p untouched.
 */
int
__repmgr_readv(fd, iovec, buf_count, byte_count_p)
	socket_t fd;
	db_iovec_t *iovec;
	int buf_count;
	size_t *byte_count_p;
{
	int result;
	ssize_t nread;

	nread = readv(fd, iovec, buf_count);
	if (nread != -1) {
		*byte_count_p = (size_t)nread;
		return (0);
	}

	/* Capture errno before anything else can overwrite it. */
	result = errno;
	DB_ASSERT(NULL, result != 0);
	return (result);
}
/*
 * __repmgr_select_loop --
 *	Main I/O loop: repeatedly select() on the signalling pipe, the listen
 *	socket (unless IS_SUBORDINATE) and every connection, then service
 *	timeouts, connection I/O, wake-up bytes and incoming connections.
 *
 * db_rep->mutex is taken with LOCK_MUTEX for the bookkeeping and released
 * with UNLOCK_MUTEX around the select() call itself.
 *
 * The loop ends when repmgr_status is found to be stopped (result 0) or
 * when any step fails.  On the way out through "out", a DB_DELETED result
 * is replaced by that of __repmgr_bow_out(), and __repmgr_net_close() is
 * called with the mutex held.
 *
 * NOTE(review): LOCK_MUTEX/UNLOCK_MUTEX are macros defined elsewhere; if
 * they can return on failure, those paths bypass the "out" cleanup too --
 * confirm.
 */
int
__repmgr_select_loop(env)
ENV *env;
{
struct timeval select_timeout, *select_timeout_p;
DB_REP *db_rep;
db_timespec timeout;
fd_set reads, writes;
struct io_info io_info;
int ret;
/* Scratch space for draining wake-up bytes from the pipe. */
u_int8_t buf[10];
db_rep = env->rep_handle;
LOCK_MUTEX(db_rep->mutex);
if ((ret = __repmgr_first_try_connections(env)) != 0)
goto out;
for (;;) {
/* Rebuild both descriptor sets from scratch on every pass. */
FD_ZERO(&reads);
FD_ZERO(&writes);
/* The signalling pipe is always watched for input. */
FD_SET((u_int)db_rep->read_pipe, &reads);
io_info.maxfd = db_rep->read_pipe;
if (!IS_SUBORDINATE(db_rep)) {
FD_SET((u_int)db_rep->listen_fd, &reads);
if (db_rep->listen_fd > io_info.maxfd)
io_info.maxfd = db_rep->listen_fd;
}
io_info.reads = &reads;
io_info.writes = &writes;
/* prepare_io adds each connection's descriptor and raises maxfd. */
if ((ret = __repmgr_each_connection(env,
prepare_io, &io_info, TRUE)) != 0)
goto out;
if (__repmgr_compute_timeout(env, &timeout)) {
/* Convert the db_timespec (sec/nsec) to a timeval (sec/usec). */
select_timeout.tv_sec = timeout.tv_sec;
select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US;
select_timeout_p = &select_timeout;
} else {
/* No timeout computed: select() waits for I/O only. */
select_timeout_p = NULL;
}
/* The mutex is not held while blocked in select(). */
UNLOCK_MUTEX(db_rep->mutex);
if ((ret = select(io_info.maxfd + 1,
&reads, &writes, NULL, select_timeout_p)) == -1) {
switch (ret = errno) {
case EINTR:
case EWOULDBLOCK:
/* Transient: re-take the mutex and rebuild the sets. */
LOCK_MUTEX(db_rep->mutex);
continue;
default:
__db_err(env, ret, DB_STR("3634",
"select"));
/*
 * NOTE(review): this returns directly, with the mutex released
 * above and without the __repmgr_net_close() done at "out" --
 * confirm that skipping the cleanup is intended.
 */
return (ret);
}
}
LOCK_MUTEX(db_rep->mutex);
if (db_rep->repmgr_status == stopped) {
ret = 0;
goto out;
}
if ((ret = __repmgr_check_timeouts(env)) != 0)
goto out;
/* Perform the pending reads/writes that select() reported. */
if ((ret = __repmgr_each_connection(env,
__repmgr_conn_work, &io_info, TRUE)) != 0)
goto out;
/*
 * Drain wake-up bytes from the signalling pipe; their values are not
 * examined.
 * NOTE(review): a read() result of 0 also takes this branch, where
 * errno may not have been set by the call -- confirm.
 */
if (FD_ISSET((u_int)db_rep->read_pipe, &reads) &&
read(db_rep->read_pipe, VOID_STAR_CAST buf,
sizeof(buf)) <= 0) {
ret = errno;
goto out;
}
/* The listen socket being readable triggers __repmgr_accept(). */
if (!IS_SUBORDINATE(db_rep) &&
FD_ISSET((u_int)db_rep->listen_fd, &reads) &&
(ret = __repmgr_accept(env)) != 0)
goto out;
}
out:
/* __repmgr_bow_out() is called with the mutex released. */
UNLOCK_MUTEX(db_rep->mutex);
if (ret == DB_DELETED)
ret = __repmgr_bow_out(env);
LOCK_MUTEX(db_rep->mutex);
(void)__repmgr_net_close(env);
UNLOCK_MUTEX(db_rep->mutex);
return (ret);
}
/*
 * prepare_io --
 *	Per-connection callback that registers a connection's descriptor
 *	in the select() sets held by the struct io_info passed as info_.
 *
 * A defunct connection is handed to __repmgr_cleanup_defunct() instead and
 * its result is returned.  Otherwise the descriptor is always added to the
 * read set, added to the write set only when the outbound queue is not
 * empty, and maxfd is raised if needed.  Returns 0 in that case.
 */
static int
prepare_io(env, conn, info_)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *info_;
{
	struct io_info *io;

	io = info_;

	if (conn->state == CONN_DEFUNCT)
		return (__repmgr_cleanup_defunct(env, conn));

	/* Ask for writability only when there is something to send. */
	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		FD_SET((u_int)conn->fd, io->writes);
	}
	FD_SET((u_int)conn->fd, io->reads);

	/* The descriptor is in at least the read set, so track it once. */
	if (io->maxfd < conn->fd)
		io->maxfd = conn->fd;
	return (0);
}
/*
 * __repmgr_conn_work --
 *	Per-connection callback that performs the I/O select() reported as
 *	ready, using the sets in the struct io_info passed as info_.
 *
 * Defunct connections are skipped (returning 0).  A pending write is
 * attempted first; a read is attempted only if the write step did not
 * fail.  A DB_REP_UNAVAIL result from either step is replaced by the
 * result of __repmgr_bust_connection().
 */
static int
__repmgr_conn_work(env, conn, info_)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *info_;
{
	struct io_info *io;
	int ret;
	u_int sock;

	if (conn->state == CONN_DEFUNCT)
		return (0);

	io = info_;
	sock = (u_int)conn->fd;

	ret = FD_ISSET(sock, io->writes) ?
	    __repmgr_write_some(env, conn) : 0;

	if (ret == 0 && FD_ISSET(sock, io->reads))
		ret = __repmgr_read_from_site(env, conn);

	if (ret == DB_REP_UNAVAIL)
		ret = __repmgr_bust_connection(env, conn);
	return (ret);
}