#include "internal.h"
#include "logging.h"
#include "vbucket/aliases.h"
#include "settings.h"
#include "negotiate.h"
#include "bucketconfig/clconfig.h"
#include "mc/mcreq-flush-inl.h"
#include <lcbio/ssl.h>
#include "ctx-log-inl.h"
/* Leading arguments for lcb_log(): the settings of `c`, the subsystem name
 * "server", the severity LCB_LOG_<lvl>, and the call site. */
#define LOGARGS(c, lvl) (c)->settings, "server", LCB_LOG_##lvl, __FILE__, __LINE__
/* Common log-line prefix: host, port, server pointer and pipeline index.
 * Must be paired with LOGID() to supply the matching arguments. */
#define LOGFMT "<%s:%s> (SRV=%p,IX=%d) "
/* Arguments for LOGFMT, taken from the server's connection context and
 * its pipeline. */
#define LOGID(server) get_ctx_host(server->connctx), get_ctx_port(server->connctx), (void*)server, server->pipeline.index
/* Number of IOV slots filled per iteration by on_flush_ready(). */
#define MCREQ_MAXIOV 32
/* Clears the bits in `flags` from the `want` mask of `conn`. */
#define LCBCONN_UNWANT(conn, flags) (conn)->want &= ~(flags)
/* Lifecycle state of a server, stored in mc_SERVER::state. */
typedef enum {
/* Normal state; set by server_connect(). check_closed() returns 0 here. */
S_CLEAN,
/* Entered from server_socket_failed(); once the context has no pending
 * operations, finalize_errored_ctx() reconnects and returns to S_CLEAN. */
S_ERRDRAIN,
/* Entered from mcserver_close(); the server object is freed once its
 * context (if any) has no pending operations. */
S_CLOSED
} mcserver_STATE;
/* Forward declarations for the error/teardown helpers defined near the end
 * of this file; they are referenced by the I/O callbacks that precede them. */
static int check_closed(mc_SERVER *);
static void start_errored_ctx(mc_SERVER *server, mcserver_STATE next_state);
static void finalize_errored_ctx(mc_SERVER *server);
static void on_error(lcbio_CTX *ctx, lcb_error_t err);
static void server_socket_failed(mc_SERVER *server, lcb_error_t err);
/**
 * Flush-ready callback installed on the connection context.
 *
 * Repeatedly fills up to MCREQ_MAXIOV IOVs from the pipeline and passes them
 * to lcbio_ctx_put_ex(). Stops without requesting another write when the
 * pipeline has nothing to flush; otherwise, once lcbio_ctx_put_ex() returns
 * zero, asks the context for another write notification.
 */
static void
on_flush_ready(lcbio_CTX *ctx)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);
    nb_IOV iov[MCREQ_MAXIOV];

    for (;;) {
        int niov = 0;
        unsigned nbytes = mcreq_flush_iov_fill(
            &server->pipeline, iov, MCREQ_MAXIOV, &niov);

        if (nbytes == 0) {
            /* Nothing queued for flushing. */
            return;
        }
        if (!lcbio_ctx_put_ex(ctx, (lcb_IOV *)iov, niov, nbytes)) {
            break;
        }
    }

    lcbio_ctx_wwant(ctx);
}
/**
 * Flush-done callback installed on the connection context.
 *
 * Reports the flushed byte counts to the pipeline and then runs
 * check_closed() so a server that is no longer S_CLEAN can be finalized.
 *
 * @param ctx      context whose data pointer is the owning mc_SERVER
 * @param expected number of bytes that were scheduled
 * @param actual   number of bytes actually flushed
 */
static void
on_flush_done(lcbio_CTX *ctx, unsigned expected, unsigned actual)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);
    /* A timestamp is only sampled when readj_ts_wait is set; else 0. */
    lcb_U64 flushed_at = server->settings->readj_ts_wait ? gethrtime() : 0;

    mcreq_flush_done_ex(&server->pipeline, actual, expected, flushed_at);
    (void)check_closed(server);
}
/**
 * Schedule I/O on an already-connected server.
 *
 * Requests a read of 24 bytes (the same minimum try_read() waits for before
 * parsing a packet) if no read is currently wanted, requests a write,
 * schedules the context, and arms the I/O timer if it is not armed yet.
 *
 * @param server server with a non-NULL connctx
 */
void
mcserver_flush(mc_SERVER *server)
{
    lcbio_CTX *ctx = server->connctx;

    if (!ctx->rdwant) {
        lcbio_ctx_rwant(ctx, 24);
    }
    lcbio_ctx_wwant(ctx);
    lcbio_ctx_schedule(ctx);

    if (lcbio_timer_armed(server->io_timer)) {
        return;
    }
    lcbio_timer_rearm(server->io_timer, MCSERVER_TIMEOUT(server));
}
/**
 * Invoke the flush_start handler of every server of the instance that has
 * at least one request in its pipeline.
 *
 * @param instance the library handle
 */
LIBCOUCHBASE_API
void
lcb_sched_flush(lcb_t instance)
{
    unsigned ii;

    for (ii = 0; ii < LCBT_NSERVERS(instance); ii++) {
        mc_SERVER *server = LCBT_GET_SERVER(instance, ii);

        if (mcserver_has_pending(server)) {
            server->pipeline.flush_start(&server->pipeline);
        }
    }
}
/**
 * Handle a NOT_MY_VBUCKET response.
 *
 * Remaps the vbucket guess, feeds the response body to the CCCP provider
 * when there is a body and the provider is enabled, and triggers a bootstrap
 * if no configuration update succeeded. Finally, if the retry policy allows
 * it, a renewed copy of the packet is placed on the retry queue.
 *
 * @param oldsrv  server that sent the response
 * @param resinfo the response
 * @param oldpkt  the request the response belongs to
 * @return 1 if the request was queued for retry, 0 if the caller must
 *         deliver the failure itself
 */
static int
handle_nmv(mc_SERVER *oldsrv, packet_info *resinfo, mc_PACKET *oldpkt)
{
    lcb_t instance = oldsrv->instance;
    clconfig_provider *cccp =
        lcb_confmon_get_provider(instance->confmon, LCB_CLCONFIG_CCCP);
    protocol_binary_request_header hdr;
    /* Stays LCB_ERROR unless the CCCP update below is attempted. */
    lcb_error_t update_rc = LCB_ERROR;
    lcb_U16 vbid;
    mc_PACKET *renewed;

    mcreq_read_hdr(oldpkt, &hdr);
    vbid = ntohs(hdr.request.vbucket);
    lcb_log(LOGARGS(oldsrv, WARN), LOGFMT "NOT_MY_VBUCKET. Packet=%p (S=%u). VBID=%u", LOGID(oldsrv), (void*)oldpkt, oldpkt->opaque, vbid);

    lcb_vbguess_remap(instance, vbid, oldsrv->pipeline.index);

    if (PACKET_NBODY(resinfo) && cccp->enabled) {
        lcb_string body;

        lcb_string_init(&body);
        lcb_string_append(&body, PACKET_VALUE(resinfo), PACKET_NVALUE(resinfo));
        update_rc = lcb_cccp_update(cccp, mcserver_get_host(oldsrv), &body);
        lcb_string_release(&body);
    }

    if (update_rc != LCB_SUCCESS) {
        /* Throttle the refresh when the current config came from CCCP,
         * otherwise always refresh. */
        int bs_options =
            (instance->cur_configinfo->origin == LCB_CLCONFIG_CCCP)
                ? LCB_BS_REFRESH_THROTTLE
                : LCB_BS_REFRESH_ALWAYS;

        lcb_bootstrap_common(instance, bs_options);
    }

    if (lcb_should_retry(oldsrv->settings, oldpkt, LCB_NOT_MY_VBUCKET)) {
        renewed = mcreq_renew_packet(oldpkt);
        renewed->flags &= ~MCREQ_STATE_FLAGS;
        lcb_retryq_nmvadd(instance->retryq, (mc_EXPACKET*)renewed);
        return 1;
    }
    return 0;
}
/* Return values of try_read(). */
#define PKT_READ_COMPLETE 1
#define PKT_READ_PARTIAL 0

/*
 * Helper macros for try_read(). They rely on its locals `server`, `ctx`,
 * `ior` and `info`.
 *
 * RETURN_NEED_MORE: request at least `n` bytes (only when requests are still
 * pending) and return PKT_READ_PARTIAL. Wrapped in do/while(0) so it is a
 * single statement; the definition must NOT end in a line continuation,
 * otherwise the following #define is spliced into it.
 */
#define RETURN_NEED_MORE(n) \
    do { \
        if (mcserver_has_pending(server)) { \
            lcbio_ctx_rwant(ctx, n); \
        } \
        return PKT_READ_PARTIAL; \
    } while (0)

/*
 * DO_ASSIGN_PAYLOAD consumes the header bytes, points info->payload at the
 * consolidated body (if any) and OPENS a brace scope. DO_SWALLOW_PAYLOAD
 * CLOSES that scope and consumes the body. They must always be used as a
 * matched pair.
 */
#define DO_ASSIGN_PAYLOAD() \
    rdb_consumed(ior, sizeof(info->res.bytes)); \
    if (PACKET_NBODY(info)) { \
        info->payload = rdb_get_consolidated(ior, PACKET_NBODY(info)); \
    } {

#define DO_SWALLOW_PAYLOAD() \
    } if (PACKET_NBODY(info)) { \
        rdb_consumed(ior, PACKET_NBODY(info)); \
    }

/**
 * Try to parse and dispatch exactly one response packet from the read buffer.
 *
 * @param ctx    connection context (used to request more data)
 * @param server server owning the pipeline the response belongs to
 * @param ior    read buffer to parse from
 * @return PKT_READ_COMPLETE if a full packet was consumed (including a stale
 *         packet with no matching request), PKT_READ_PARTIAL if more data is
 *         needed
 */
static int
try_read(lcbio_CTX *ctx, mc_SERVER *server, rdb_IOROPE *ior)
{
    packet_info info_s, *info = &info_s;
    mc_PACKET *request;
    mc_PIPELINE *pl = &server->pipeline;
    /* pktsize starts at 24, the amount required before the header is read. */
    unsigned pktsize = 24, is_last = 1;

    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize);
    }

    /* Peek at the header (not consumed yet) to learn the body length. */
    rdb_copyread(ior, info->res.bytes, sizeof info->res.bytes);
    pktsize += PACKET_NBODY(info);
    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize);
    }

    if (PACKET_OPCODE(info) == PROTOCOL_BINARY_CMD_STAT && PACKET_NKEY(info) != 0) {
        /* STAT response carrying a key: the request stays in the pipeline. */
        is_last = 0;
        request = mcreq_pipeline_find(pl, PACKET_OPAQUE(info));
    } else {
        is_last = 1;
        request = mcreq_pipeline_remove(pl, PACKET_OPAQUE(info));
    }

    if (!request) {
        /* No matching request: drop the whole packet. */
        lcb_log(LOGARGS(server, WARN), LOGFMT "Found stale packet (OP=0x%x, RC=0x%x, SEQ=%u)", LOGID(server), PACKET_OPCODE(info), PACKET_STATUS(info), PACKET_OPAQUE(info));
        rdb_consumed(ior, pktsize);
        return PKT_READ_COMPLETE;
    }

    if (PACKET_STATUS(info) == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
        DO_ASSIGN_PAYLOAD()
        if (!handle_nmv(server, info, request)) {
            /* Not retried: deliver the failure to the caller. */
            mcreq_dispatch_response(pl, request, info, LCB_NOT_MY_VBUCKET);
        }
        DO_SWALLOW_PAYLOAD()
        goto GT_DONE;
    }

    if (!(request->flags & MCREQ_F_UFWD)) {
        DO_ASSIGN_PAYLOAD()
        info->bufh = rdb_get_first_segment(ior);
        mcreq_dispatch_response(pl, request, info, LCB_SUCCESS);
        DO_SWALLOW_PAYLOAD()
    } else {
        /* Forwarded packet: hand the whole consolidated packet (header and
         * body) to the pktfwd callback as a single IOV. */
        lcb_PKTFWDRESP resp = { 0 };
        rdb_ROPESEG *segs;
        nb_IOV iov;

        rdb_consolidate(ior, pktsize);
        rdb_refread_ex(ior, &iov, &segs, 1, pktsize);

        resp.bufs = &segs;
        resp.iovs = (lcb_IOV*)&iov;
        resp.nitems = 1;
        resp.header = info->res.bytes;
        server->instance->callbacks.pktfwd(
            server->instance, MCREQ_PKT_COOKIE(request), LCB_SUCCESS, &resp);
        rdb_consumed(ior, pktsize);
    }

GT_DONE:
    if (is_last) {
        mcreq_packet_handled(pl, request);
    }
    return PKT_READ_COMPLETE;
}
/**
 * Read callback installed on the connection context.
 *
 * Unless the server is no longer S_CLEAN, parses packets with try_read()
 * until it stops returning PKT_READ_COMPLETE, then reschedules the context
 * and gives the instance a chance to break out.
 *
 * @param ctx context whose data pointer is the owning mc_SERVER
 * @param nb  unused
 */
static void
on_read(lcbio_CTX *ctx, unsigned nb)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);
    rdb_IOROPE *ior = &ctx->ior;

    (void)nb;

    if (check_closed(server)) {
        return;
    }

    for (;;) {
        if (try_read(ctx, server, ior) != PKT_READ_COMPLETE) {
            break;
        }
    }

    lcbio_ctx_schedule(ctx);
    lcb_maybe_breakout(server->instance);
}
/**
 * Tell whether the server's pipeline has any requests in its list.
 *
 * @param server server to inspect
 * @return 1 if the pipeline's request list is non-empty, 0 otherwise
 */
LCB_INTERNAL_API
int
mcserver_has_pending(mc_SERVER *server)
{
    if (SLLIST_IS_EMPTY(&server->pipeline.requests)) {
        return 0;
    }
    return 1;
}
/**
 * flush_start handler that does nothing. server_connect() installs it on the
 * pipeline after issuing a connection request.
 */
static void
flush_noop(mc_PIPELINE *pipeline)
{
    (void)pipeline;
}
/* Forward declaration; defined after on_connected(). */
static void server_connect(mc_SERVER *server);
/* Controls whether purge_single_server() requests a config refresh. */
typedef enum {
/* Refresh regardless of how many commands were purged. */
REFRESH_ALWAYS,
/* Refresh only if the purge reported a non-zero result. */
REFRESH_ONFAILED,
/* Never refresh. */
REFRESH_NEVER
} mc_REFRESHPOLICY;
/**
 * Try to move a failing packet onto the retry queue instead of failing it.
 *
 * Retrying is only attempted when the configuration's distribution mode is
 * LCBVB_DIST_VBUCKET and lcb_should_retry() approves the packet/error.
 *
 * @param pipeline pipeline (an mc_SERVER) the packet belongs to
 * @param pkt      packet about to be failed
 * @param err      error it is being failed with
 * @return 1 if a renewed copy was queued for retry, 0 otherwise
 */
static int
maybe_retry(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_error_t err)
{
    mc_SERVER *server = (mc_SERVER *)pipeline;
    lcb_t instance = pipeline->parent->cqdata;
    mc_PACKET *renewed;

    if (lcbvb_get_distmode(pipeline->parent->config) != LCBVB_DIST_VBUCKET ||
            !lcb_should_retry(server->settings, pkt, err)) {
        return 0;
    }

    renewed = mcreq_renew_packet(pkt);
    renewed->flags &= ~MCREQ_STATE_FLAGS;
    lcb_retryq_add(instance->retryq, (mc_EXPACKET *)renewed, err);
    return 1;
}
/**
 * Per-packet failure callback used by purge_single_server().
 *
 * If the packet cannot be retried, a synthetic response (zeroed, with the
 * request's opcode and opaque and an EINVAL status) is dispatched with the
 * final error code.
 *
 * @param pipeline pipeline (an mc_SERVER) the packet belongs to
 * @param pkt      packet being failed
 * @param err      error code; LCB_AUTH_ERROR is reported as LCB_MAP_CHANGED,
 *                 and LCB_ETIMEDOUT is replaced by the retry queue's original
 *                 error when that is not LCB_SUCCESS
 * @param arg      unused
 */
static void
fail_callback(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_error_t err, void *arg)
{
    mc_SERVER *server = (mc_SERVER *)pipeline;
    protocol_binary_request_header reqhdr;
    packet_info fake;
    int rc;

    (void)arg;

    if (maybe_retry(pipeline, pkt, err)) {
        return;
    }

    if (err == LCB_AUTH_ERROR) {
        err = LCB_MAP_CHANGED;
    }

    if (err == LCB_ETIMEDOUT) {
        lcb_error_t origerr = lcb_retryq_origerr(pkt);

        if (origerr != LCB_SUCCESS) {
            err = origerr;
        }
    }

    /* Build the synthetic response from the request header. */
    memset(&fake, 0, sizeof(fake));
    memcpy(reqhdr.bytes, SPAN_BUFFER(&pkt->kh_span), sizeof(reqhdr.bytes));
    fake.res.response.status = ntohs(PROTOCOL_BINARY_RESPONSE_EINVAL);
    fake.res.response.opcode = reqhdr.request.opcode;
    fake.res.response.opaque = reqhdr.request.opaque;

    lcb_log(LOGARGS(server, WARN), LOGFMT "Failing command (pkt=%p, opaque=%lu, opcode=0x%x) with error 0x%x", LOGID(server), (void*)pkt, (unsigned long)pkt->opaque, reqhdr.request.opcode, err);

    rc = mcreq_dispatch_response(pipeline, pkt, &fake, err);
    lcb_assert(rc == 0);
}
/**
 * Fail commands on one server and optionally request a config refresh.
 *
 * @param server server whose pipeline is purged
 * @param error  error code handed to fail_callback()
 * @param thresh if non-zero, passed to mcreq_pipeline_timeout() as the
 *               threshold; if zero, the whole pipeline is failed
 * @param next   passed through to mcreq_pipeline_timeout(); unused when
 *               thresh is zero
 * @param policy one of mc_REFRESHPOLICY
 * @return the value reported by mcreq_pipeline_timeout(), or -1 when the
 *         whole pipeline was failed
 */
static int
purge_single_server(mc_SERVER *server, lcb_error_t error,
                    hrtime_t thresh, hrtime_t *next, int policy)
{
    mc_PIPELINE *pl = &server->pipeline;
    unsigned affected;

    if (!thresh) {
        mcreq_pipeline_fail(pl, error, fail_callback, NULL);
        affected = -1;
    } else {
        affected = mcreq_pipeline_timeout(
            pl, error, fail_callback, NULL, thresh, next);
    }

    if (policy != REFRESH_NEVER &&
            (affected || policy == REFRESH_ALWAYS)) {
        lcb_bootstrap_common(server->instance,
            LCB_BS_REFRESH_THROTTLE|LCB_BS_REFRESH_INCRERR);
    }
    return affected;
}
/**
 * flush_start handler installed by start_errored_ctx() while a context with
 * pending operations is being shut down in S_ERRDRAIN. It performs no I/O;
 * it only makes sure the I/O timer is armed.
 */
static void
flush_errdrain(mc_PIPELINE *pipeline)
{
    mc_SERVER *server = (mc_SERVER *)pipeline;

    if (lcbio_timer_armed(server->io_timer)) {
        return;
    }
    lcbio_timer_rearm(server->io_timer, MCSERVER_TIMEOUT(server));
}
/**
 * Fail every command in the server's pipeline with `err`, without
 * requesting a configuration refresh.
 *
 * @param server server whose pipeline is failed
 * @param err    error code delivered for each command
 */
void
mcserver_fail_chain(mc_SERVER *server, lcb_error_t err)
{
    /* thresh == 0 selects the fail-everything path. */
    (void)purge_single_server(server, err, 0, NULL, REFRESH_NEVER);
}
/**
 * Compute how long the I/O timer should wait next.
 *
 * @param server server to inspect
 * @return the full server timeout when the pipeline has no first packet;
 *         otherwise the time remaining until that packet's start time plus
 *         the server timeout, converted with LCB_NS2US (0 if already past)
 */
static uint32_t
get_next_timeout(mc_SERVER *server)
{
    mc_PACKET *first = mcreq_first_packet(&server->pipeline);
    hrtime_t now, deadline, remaining;

    if (first == NULL) {
        return MCSERVER_TIMEOUT(server);
    }

    now = gethrtime();
    deadline = MCREQ_PKT_RDATA(first)->start + LCB_US2NS(MCSERVER_TIMEOUT(server));
    remaining = (deadline > now) ? deadline - now : 0;

    return LCB_NS2US(remaining);
}
/**
 * I/O timer callback (registered in mcserver_alloc()).
 *
 * Times out commands using a threshold of "now minus the server timeout",
 * logs if the purge reported anything, then re-arms the timer with the value
 * from get_next_timeout().
 *
 * @param arg the mc_SERVER the timer belongs to
 */
static void
timeout_server(void *arg)
{
    mc_SERVER *server = arg;
    hrtime_t next_ns = 0;
    hrtime_t cutoff = gethrtime() - LCB_US2NS(MCSERVER_TIMEOUT(server));
    uint32_t next_us;

    if (purge_single_server(server, LCB_ETIMEDOUT, cutoff, &next_ns,
                            REFRESH_ONFAILED)) {
        lcb_log(LOGARGS(server, ERROR), LOGFMT "Server timed out. Some commands have failed", LOGID(server));
    }

    next_us = get_next_timeout(server);
    lcb_log(LOGARGS(server, DEBUG), LOGFMT "Scheduling next timeout for %u ms", LOGID(server), next_us / 1000);
    lcbio_timer_rearm(server->io_timer, next_us);
    lcb_maybe_breakout(server->instance);
}
/**
 * Decide whether a timed-out connection attempt should simply be retried.
 *
 * A retry happens only when the error is LCB_ETIMEDOUT, readj_ts_wait is
 * set, requests are pending, and the next timeout is at least half of the
 * full server timeout.
 *
 * @param server server whose connection attempt failed
 * @param err    error reported for the attempt
 * @return 1 if a new connection attempt was started, 0 otherwise
 */
static int
maybe_retry_timeout_connection(mc_SERVER *server, lcb_error_t err)
{
    lcb_U32 remaining;

    if (err != LCB_ETIMEDOUT ||
            !server->settings->readj_ts_wait ||
            !mcserver_has_pending(server)) {
        return 0;
    }

    remaining = get_next_timeout(server);
    if (remaining < MCSERVER_TIMEOUT(server) / 2) {
        return 0;
    }

    lcb_log(LOGARGS(server, INFO), LOGFMT "Retrying connection. Assuming timeout because of stalled event loop", LOGID(server));
    server_connect(server);
    return 1;
}
/**
 * Connection-completion callback. It is also passed to mc_sessreq_start()
 * as the completion callback for session negotiation.
 *
 * On error, either retries the connection (see
 * maybe_retry_timeout_connection()) or fails the server. On success, starts
 * session negotiation if the socket has no session info yet; otherwise
 * records the negotiated features, creates the I/O context, installs
 * mcserver_flush() as the flush handler, arms the timer and flushes.
 *
 * @param sock   the connected socket (asserted non-NULL on success)
 * @param data   the mc_SERVER the connection was requested for
 * @param err    LCB_SUCCESS or the connection error
 * @param syserr OS-level error code, used for logging only
 */
static void
on_connected(lcbio_SOCKET *sock, void *data, lcb_error_t err, lcbio_OSERR syserr)
{
    mc_SERVER *server = data;
    lcbio_CTXPROCS procs;
    uint32_t tmo;
    mc_pSESSINFO sessinfo = NULL;

    LCBIO_CONNREQ_CLEAR(&server->connreq);

    if (err != LCB_SUCCESS) {
        lcb_log(LOGARGS(server, ERR), LOGFMT "Got error for connection! (OS=%d)", LOGID(server), syserr);
        if (maybe_retry_timeout_connection(server, err)) {
            return;
        }
        server_socket_failed(server, err);
        return;
    }

    lcb_assert(sock);

    sessinfo = mc_sess_get(sock);
    if (sessinfo == NULL) {
        /* No session yet: negotiate first; this function is invoked again
         * as the negotiation's callback. */
        mc_pSESSREQ sreq;
        lcb_log(LOGARGS(server, TRACE), "<%s:%s> (SRV=%p) Session not yet negotiated. Negotiating", server->curhost->host, server->curhost->port, (void*)server);
        sreq = mc_sessreq_start(sock, server->settings, MCSERVER_TIMEOUT(server),
            on_connected, data);
        LCBIO_CONNREQ_MKGENERIC(&server->connreq, sreq, mc_sessreq_cancel);
        return;
    } else {
        server->compsupport = mc_sess_chkfeature(sessinfo,
            PROTOCOL_BINARY_FEATURE_DATATYPE);
        server->mutation_tokens = mc_sess_chkfeature(sessinfo,
            PROTOCOL_BINARY_FEATURE_MUTATION_SEQNO);
    }

    procs.cb_err = on_error;
    procs.cb_read = on_read;
    procs.cb_flush_done = on_flush_done;
    procs.cb_flush_ready = on_flush_ready;
    server->connctx = lcbio_ctx_new(sock, server, &procs);
    server->connctx->subsys = "memcached";
    server->pipeline.flush_start = (mcreq_flushstart_fn)mcserver_flush;

    /* Compute the timeout once so the logged value is the armed value. */
    tmo = get_next_timeout(server);
    lcb_log(LOGARGS(server, DEBUG), LOGFMT "Setting initial timeout=%ums", LOGID(server), tmo/1000);
    lcbio_timer_rearm(server->io_timer, tmo);
    mcserver_flush(server);
}
/**
 * Request a socket for the server's current host from the instance's
 * memcached socket pool; on_connected() is the completion callback.
 *
 * While the request is outstanding, flushes are no-ops (flush_noop) and the
 * server state is S_CLEAN.
 */
static void
server_connect(mc_SERVER *server)
{
    lcbio_pMGRREQ poolreq = lcbio_mgr_get(
        server->instance->memd_sockpool, server->curhost,
        MCSERVER_TIMEOUT(server), on_connected, server);

    LCBIO_CONNREQ_MKPOOLED(&server->connreq, poolreq);
    server->state = S_CLEAN;
    server->pipeline.flush_start = flush_noop;
}
/**
 * Pipeline buf_done callback: forwards the cookie to the instance's
 * pktflushed callback. The key and value buffers are not used.
 */
static void
buf_done_cb(mc_PIPELINE *pl, const void *cookie, void *kbuf, void *vbuf)
{
    lcb_t instance = ((mc_SERVER*)pl)->instance;

    (void)kbuf;
    (void)vbuf;
    instance->callbacks.pktflushed(instance, cookie);
}
/**
 * Allocate and initialise a server object for index `ix` of the instance's
 * current vbucket configuration.
 *
 * The new server takes a reference on the instance's settings, starts with
 * server_connect() as its flush handler (the first flush triggers the
 * connection), and owns a timer that runs timeout_server().
 *
 * @param instance the library handle
 * @param ix       server index within the vbucket configuration
 * @return the new server, or NULL if memory allocation failed
 */
mc_SERVER *
mcserver_alloc(lcb_t instance, int ix)
{
    mc_SERVER *ret;
    lcbvb_SVCMODE mode;
    const char *datahost;

    ret = calloc(1, sizeof(*ret));
    if (!ret) {
        return ret;
    }

    ret->instance = instance;
    ret->settings = instance->settings;
    ret->curhost = calloc(1, sizeof(*ret->curhost));
    if (!ret->curhost) {
        /* curhost is dereferenced below and by log statements elsewhere.
         * No settings reference has been taken yet, so just free and fail. */
        free(ret);
        return NULL;
    }

    mode = ret->settings->sslopts & LCB_SSL_ENABLED
            ? LCBVB_SVCMODE_SSL : LCBVB_SVCMODE_PLAIN;

    lcb_settings_ref(ret->settings);
    mcreq_pipeline_init(&ret->pipeline);
    ret->pipeline.flush_start = (mcreq_flushstart_fn)server_connect;
    ret->pipeline.buf_done_callback = buf_done_cb;

    datahost = lcbvb_get_hostport(
        LCBT_VBCONFIG(instance), ix, LCBVB_SVCTYPE_DATA, mode);
    if (datahost) {
        lcb_host_parsez(ret->curhost, datahost, LCB_CONFIG_MCD_PORT);
    }

    ret->io_timer = lcbio_timer_new(instance->iotable, ret, timeout_server);
    return ret;
}
/**
 * Release everything owned by the server: the pipeline, the timer (if it
 * still exists), the host buffer, the settings reference, and finally the
 * server object itself.
 */
static void
server_free(mc_SERVER *server)
{
    mcreq_pipeline_cleanup(&server->pipeline);

    if (server->io_timer != NULL) {
        lcbio_timer_destroy(server->io_timer);
    }

    free(server->curhost);
    lcb_settings_unref(server->settings);
    free(server);
}
/**
 * Close callback passed to lcbio_ctx_close() by finalize_errored_ctx():
 * takes a reference on the socket and discards it through the pool manager.
 * `reusable` and `arg` are ignored.
 */
static void
close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
{
    (void)reusable;
    (void)arg;

    lcbio_ref(sock);
    lcbio_mgr_discard(sock);
}
/**
 * Mark all data still queued for flushing as flushed, one IOV at a time,
 * without performing any I/O.
 */
static void
release_unflushed_packets(mc_SERVER *server)
{
    mc_PIPELINE *pl = &server->pipeline;
    nb_IOV iov;

    for (;;) {
        unsigned nbytes = mcreq_flush_iov_fill(pl, &iov, 1, NULL);

        if (!nbytes) {
            break;
        }
        mcreq_flush_done(pl, nbytes, nbytes);
    }
}
/**
 * Error callback installed on the connection context. Logs the error and,
 * unless the server is no longer S_CLEAN, fails the socket.
 */
static void
on_error(lcbio_CTX *ctx, lcb_error_t err)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);

    lcb_log(LOGARGS(server, WARN), LOGFMT "Got socket error 0x%x", LOGID(server), err);

    if (!check_closed(server)) {
        server_socket_failed(server, err);
    }
}
/**
 * React to a failed socket or connection attempt: fail every command in the
 * pipeline with `err` (always requesting a config refresh), let the instance
 * break out, and move the server into S_ERRDRAIN.
 *
 * Does nothing beyond check_closed() if the server is not S_CLEAN.
 */
static void
server_socket_failed(mc_SERVER *server, lcb_error_t err)
{
    if (!check_closed(server)) {
        (void)purge_single_server(server, err, 0, NULL, REFRESH_ALWAYS);
        lcb_maybe_breakout(server->instance);
        start_errored_ctx(server, S_ERRDRAIN);
    }
}
/**
 * Begin closing the server. The server must not already be S_CLOSED
 * (asserted). The object is freed by start_errored_ctx() immediately or
 * later, once its context has no pending operations.
 */
void
mcserver_close(mc_SERVER *server)
{
    const mcserver_STATE target = S_CLOSED;

    lcb_assert(server->state != S_CLOSED);
    start_errored_ctx(server, target);
}
/**
 * Move the server into `next_state` (S_ERRDRAIN or S_CLOSED) and start
 * tearing down its current connection.
 *
 * Any outstanding connection request is cancelled, and when closing, the
 * timer is destroyed. Then:
 *  - with a context that still has pending operations: schedule it, shut
 *    the socket down, and (for S_ERRDRAIN) install flush_errdrain();
 *  - with a context without pending operations: finalize it right away;
 *  - without a context: free the server when closing, otherwise restore
 *    server_connect() as the flush handler and reconnect if requests are
 *    pending.
 *
 * The server may have been freed when this function returns.
 */
static void
start_errored_ctx(mc_SERVER *server, mcserver_STATE next_state)
{
    lcbio_CTX *ctx = server->connctx;
    const int closing = (next_state == S_CLOSED);

    server->state = next_state;
    lcbio_connreq_cancel(&server->connreq);

    if (closing && server->io_timer != NULL) {
        lcbio_timer_destroy(server->io_timer);
        server->io_timer = NULL;
    }

    if (ctx != NULL) {
        if (!ctx->npending) {
            finalize_errored_ctx(server);
            return;
        }

        /* Pending operations remain: finalization happens later from
         * check_closed(). */
        lcbio_ctx_schedule(ctx);
        lcbio_shutdown(lcbio_ctx_sock(ctx));
        if (next_state == S_ERRDRAIN) {
            server->pipeline.flush_start = (mcreq_flushstart_fn)flush_errdrain;
        }
        return;
    }

    /* No context from here on. */
    if (closing) {
        server_free(server);
        return;
    }

    server->pipeline.flush_start = (mcreq_flushstart_fn)server_connect;
    if (!mcserver_has_pending(server)) {
        return;
    }
    if (!lcbio_timer_armed(server->io_timer)) {
        lcbio_timer_rearm(server->io_timer, MCSERVER_TIMEOUT(server));
    }
    server_connect(server);
}
/**
 * Finish tearing down the server's context once it has no pending
 * operations (returns immediately otherwise).
 *
 * Closes the context, marks unflushed data as flushed, and then either
 * frees the server (S_CLOSED) or resets it to S_CLEAN and reconnects.
 * Requires server->connctx to be non-NULL.
 */
static void
finalize_errored_ctx(mc_SERVER *server)
{
    lcbio_CTX *ctx = server->connctx;

    if (ctx->npending) {
        return;
    }

    lcb_log(LOGARGS(server, DEBUG), LOGFMT "Finalizing ctx %p", LOGID(server), (void*)server->connctx);

    lcbio_ctx_close(ctx, close_cb, NULL);
    server->connctx = NULL;
    release_unflushed_packets(server);

    if (server->state != S_CLOSED) {
        server->state = S_CLEAN;
        server->pipeline.flush_start = (mcreq_flushstart_fn)server_connect;
        server_connect(server);
        return;
    }

    server_free(server);
}
/**
 * Check whether a handler was invoked on a server that is no longer
 * S_CLEAN; if so, log it and try to finalize the errored context.
 *
 * @return 0 if the server is S_CLEAN, 1 otherwise (the caller should stop
 *         processing; the server may have been freed)
 */
static int
check_closed(mc_SERVER *server)
{
    if (server->state != S_CLEAN) {
        lcb_log(LOGARGS(server, INFO), LOGFMT "Got handler after close. Checking pending calls (pending=%u)", LOGID(server), server->connctx->npending);
        finalize_errored_ctx(server);
        return 1;
    }
    return 0;
}