#include "packetutils.h"
#include "retryq.h"
#include "config.h"
#include "logging.h"
#include "internal.h"
#include "bucketconfig/clconfig.h"
/* Expands to the leading lcb_log() arguments used throughout this file: the
 * queue's settings, the literal "retryq", the level LCB_LOG_<lvl>, and the
 * call site's file and line. */
#define LOGARGS(rq, lvl) (rq)->settings, "retryq", LCB_LOG_##lvl, __FILE__, __LINE__
/* Key passed to mcreq_epkt_find()/stored in epd.key to locate the retry state
 * attached to a packet. */
#define RETRY_PKT_KEY "retry_queue"
/* Retry state attached to a single packet held by the retry queue. */
typedef struct {
/* Packet datum header. Must stay the first member: this file casts the
 * mc_EPKTDATUM* returned by mcreq_epkt_find() directly to lcb_RETRYOP*. */
mc_EPKTDATUM epd;
/* Link in the queue's schedops list (inserted sorted with cmpfn_retry). */
lcb_list_t ll_sched;
/* Link in the queue's tmoops list (inserted sorted with cmpfn_tmo). */
lcb_list_t ll_tmo;
/* Base timestamp for the timeout check: the op is failed once
 * start + operation_timeout has passed. */
hrtime_t start;
/* Timestamp at which the next retry attempt is due. */
hrtime_t trytime;
/* The packet being retried. */
mc_PACKET *pkt;
/* Error recorded by assign_error(); reported when the op is failed and
 * returned by lcb_retryq_origerr(). */
lcb_error_t origerr;
} lcb_RETRYOP;
/* The queue's settings->retry_interval passed through LCB_US2NS
 * (presumably microseconds converted to nanoseconds -- confirm against the
 * macro's definition). */
#define RETRY_INTERVAL_NS(q) LCB_US2NS((q)->settings->retry_interval)
/* Slack subtracted from an op's trytime in rq_flush() before comparing it with
 * the current time. Built as LCB_US2NS(LCB_MS2US(5)); presumably 5 ms expressed
 * in nanoseconds. */
#define TIMEFUZZ_NS LCB_US2NS(LCB_MS2US(5))
/**
 * Compute and store the time of the next retry attempt for an operation.
 *
 * The delay added to the base time is
 *   RETRY_INTERVAL_NS(rq) * pkt->retries * settings->retry_backoff
 * evaluated in single-precision float and converted to hrtime_t.
 *
 * @param rq  queue whose settings provide the interval and backoff factor
 * @param op  operation whose trytime field is overwritten
 * @param now base timestamp; when 0 the current gethrtime() value is used
 */
static void
update_trytime(lcb_RETRYQ *rq, lcb_RETRYOP *op, hrtime_t now)
{
    const hrtime_t base = now ? now : gethrtime();

    /* Kept as one expression so the float rounding matches exactly. */
    op->trytime = base + (hrtime_t)((float)RETRY_INTERVAL_NS(rq) *
                                    (float)op->pkt->retries *
                                    (float)rq->settings->retry_backoff);
}
/**
 * Ordering callback for the timeout list: compares two operations by their
 * start timestamp.
 *
 * @param ll_a ll_tmo link of the first operation
 * @param ll_b ll_tmo link of the second operation
 * @return 0 if both start times are equal, 1 if a's is greater, -1 otherwise
 */
static int
cmpfn_tmo(lcb_list_t *ll_a, lcb_list_t *ll_b)
{
    const lcb_RETRYOP *lhs = LCB_LIST_ITEM(ll_a, lcb_RETRYOP, ll_tmo);
    const lcb_RETRYOP *rhs = LCB_LIST_ITEM(ll_b, lcb_RETRYOP, ll_tmo);

    /* (greater) - (less) yields exactly 1, 0 or -1. */
    return (lhs->start > rhs->start) - (lhs->start < rhs->start);
}
/**
 * Ordering callback for the schedule list: compares two operations by the
 * time their next retry is due.
 *
 * @param ll_a ll_sched link of the first operation
 * @param ll_b ll_sched link of the second operation
 * @return 0 if both trytime values are equal, 1 if a's is greater, -1 otherwise
 */
static int
cmpfn_retry(lcb_list_t *ll_a, lcb_list_t *ll_b)
{
    const lcb_RETRYOP *lhs = LCB_LIST_ITEM(ll_a, lcb_RETRYOP, ll_sched);
    const lcb_RETRYOP *rhs = LCB_LIST_ITEM(ll_b, lcb_RETRYOP, ll_sched);

    /* (greater) - (less) yields exactly 1, 0 or -1. */
    return (lhs->trytime > rhs->trytime) - (lhs->trytime < rhs->trytime);
}
/**
 * Merge a newly observed error into the error recorded on an operation.
 *
 * Rules, in order:
 *  - LCB_NOT_MY_VBUCKET is treated as LCB_ETIMEDOUT.
 *  - If nothing is recorded yet (origerr == LCB_SUCCESS) the new error is stored.
 *  - A timeout never replaces an already recorded error.
 *  - A recorded network-class error (LCB_EIFNET) other than a timeout is not
 *    replaced by LCB_NETWORK_ERROR or LCB_CONNECT_ERROR.
 *  - Otherwise the new error replaces the recorded one.
 *
 * @param op  operation whose origerr is updated
 * @param err error that was just observed
 */
static void
assign_error(lcb_RETRYOP *op, lcb_error_t err)
{
    const lcb_error_t incoming =
        (err == LCB_NOT_MY_VBUCKET) ? LCB_ETIMEDOUT : err;

    if (op->origerr == LCB_SUCCESS) {
        /* First error seen; every later rule would leave this value in place. */
        op->origerr = incoming;
        return;
    }

    if (incoming == LCB_ETIMEDOUT) {
        return;
    }

    if (LCB_EIFNET(op->origerr) && op->origerr != LCB_ETIMEDOUT) {
        if (incoming == LCB_NETWORK_ERROR || incoming == LCB_CONNECT_ERROR) {
            return;
        }
    }

    op->origerr = incoming;
}
/**
 * Unlink an operation from both the schedule list and the timeout list.
 *
 * @param op operation to detach; the operation itself is not freed here
 */
static void
clean_op(lcb_RETRYOP *op)
{
    lcb_list_t *const sched_link = &op->ll_sched;
    lcb_list_t *const tmo_link = &op->ll_tmo;

    lcb_list_delete(sched_link);
    lcb_list_delete(tmo_link);
}
/**
 * Fail an operation held in the retry queue.
 *
 * Builds a synthetic response carrying the request's opcode and opaque,
 * dispatches it with the operation's recorded error, marks the packet as
 * flushed and invoked, unlinks the operation from both lists and hands the
 * packet to mcreq_packet_done().
 *
 * @param rq  queue owning the operation
 * @param op  operation to fail
 * @param err error merged into the operation through assign_error(); the value
 *            actually reported is op->origerr after that merge
 */
static void
bail_op(lcb_RETRYQ *rq, lcb_RETRYOP *op, lcb_error_t err)
{
    mc_SERVER fake_server;
    mc_PIPELINE *fake_pl = &fake_server.pipeline;
    packet_info resp;
    protocol_binary_request_header req;

    /* Zeroed stand-in server; only the instance and the pipeline's parent
     * queue are filled in. */
    memset(&fake_server, 0, sizeof fake_server);
    fake_server.instance = rq->cq->cqdata;
    fake_pl->parent = rq->cq;

    /* Synthetic response mirroring the request header. */
    memset(&resp, 0, sizeof(resp));
    mcreq_read_hdr(op->pkt, &req);
    resp.res.response.opcode = req.request.opcode;
    resp.res.response.status = ntohs(PROTOCOL_BINARY_RESPONSE_EINVAL);
    resp.res.response.opaque = req.request.opaque;

    assign_error(op, err);
    lcb_log(LOGARGS(rq, WARN), "Failing command (seq=%u) from retry queue with error code 0x%x", op->pkt->opaque, op->origerr);

    mcreq_dispatch_response(fake_pl, op->pkt, &resp, op->origerr);
    op->pkt->flags |= MCREQ_F_FLUSHED|MCREQ_F_INVOKED;

    /* Unlink before releasing the packet.
     * NOTE(review): releasing the packet presumably runs op_dtorfn and frees
     * op, so op must not be touched after mcreq_packet_done() -- confirm. */
    clean_op(op);
    mcreq_packet_done(fake_pl, op->pkt);
    lcb_maybe_breakout(rq->cq->cqdata);
}
/**
 * (Re)arm the queue timer for the next event, or disarm it when nothing is
 * scheduled.
 *
 * The next event is the earlier of the first scheduled retry time and the
 * first operation's timeout (start + operation_timeout). A time that is
 * already due results in a zero interval.
 *
 * @param q   retry queue
 * @param now current timestamp; when 0 gethrtime() is used
 */
static void
do_schedule(lcb_RETRYQ *q, hrtime_t now)
{
    lcb_RETRYOP *head_sched, *head_tmo;
    hrtime_t retry_at, expire_at, wake_at, wait_ns;
    uint32_t wait_us;

    if (!now) {
        now = gethrtime();
    }

    if (LCB_LIST_IS_EMPTY(&q->schedops)) {
        lcbio_timer_disarm(q->timer);
        return;
    }

    head_tmo = LCB_LIST_ITEM(LCB_LIST_HEAD(&q->tmoops), lcb_RETRYOP, ll_tmo);
    head_sched = LCB_LIST_ITEM(LCB_LIST_HEAD(&q->schedops), lcb_RETRYOP, ll_sched);

    retry_at = head_sched->trytime;
    expire_at = head_tmo->start;
    expire_at += LCB_US2NS(q->settings->operation_timeout);

    /* Whichever comes first decides when the timer fires. */
    if (retry_at > expire_at) {
        wake_at = expire_at;
    } else {
        wake_at = retry_at;
    }

    wait_ns = (wake_at <= now) ? 0 : wake_at - now;
    wait_us = LCB_NS2US(wait_ns);

    lcb_log(LOGARGS(q, TRACE), "Next tick in %u ms", (unsigned)wait_us/1000);
    lcbio_timer_rearm(q->timer, wait_us);
}
/**
 * Process the retry queue.
 *
 * 1. Fails, with LCB_ETIMEDOUT, every operation at the front of the timeout
 *    list whose start + operation_timeout has passed.
 * 2. Walks the schedule list. Each operation is mapped to the master of its
 *    vbucket: if a valid pipeline index is found the packet is enqueued there
 *    and flushed; otherwise a throttled config refresh is requested and the
 *    operation is either deferred for another attempt or failed with
 *    LCB_NO_MATCHING_SERVER.
 * 3. Re-inserts deferred operations into both lists and reschedules the timer.
 *
 * @param rq       retry queue
 * @param throttle when nonzero, the schedule walk stops at the first operation
 *                 whose trytime (minus TIMEFUZZ_NS) is still in the future;
 *                 when zero every scheduled operation is attempted
 */
static void
rq_flush(lcb_RETRYQ *rq, int throttle)
{
    const hrtime_t now = gethrtime();
    lcb_list_t *cur, *next;
    lcb_list_t deferred;

    /* Expired operations first; the walk stops at the first unexpired one. */
    LCB_LIST_SAFE_FOR(cur, next, &rq->tmoops) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(cur, lcb_RETRYOP, ll_tmo);
        const hrtime_t deadline =
            op->start + LCB_US2NS(rq->settings->operation_timeout);

        if (deadline > now) {
            break;
        }
        bail_op(rq, op, LCB_ETIMEDOUT);
    }

    lcb_list_init(&deferred);
    LCB_LIST_SAFE_FOR(cur, next, &rq->schedops) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(cur, lcb_RETRYOP, ll_sched);
        protocol_binary_request_header req;
        int vbid, srvix;
        hrtime_t due;

        due = op->trytime - TIMEFUZZ_NS;
        if (throttle && due > now) {
            break;
        }

        mcreq_read_hdr(op->pkt, &req);
        vbid = ntohs(req.request.vbucket);
        srvix = lcbvb_vbmaster(rq->cq->config, vbid);

        if (srvix >= 0 && (unsigned)srvix < rq->cq->npipelines) {
            /* A pipeline exists for this vbucket's master: hand the packet over. */
            mc_PIPELINE *target = rq->cq->pipelines[srvix];
            mcreq_enqueue_packet(target, op->pkt);
            target->flush_start(target);
            clean_op(op);
        } else {
            /* No usable server index for this vbucket. */
            lcb_t instance = rq->cq->cqdata;

            assign_error(op, LCB_NO_MATCHING_SERVER);
            lcb_bootstrap_common(instance, LCB_BS_REFRESH_THROTTLE);

            if (!lcb_confmon_is_refreshing(instance->confmon) &&
                    !rq->settings->retry[LCB_RETRY_ON_MISSINGNODE]) {
                bail_op(rq, op, LCB_NO_MATCHING_SERVER);
            } else {
                /* Park the op on the local list until this pass is over; it
                 * is re-inserted into both queue lists below. */
                clean_op(op);
                lcb_list_append(&deferred, &op->ll_sched);
                op->pkt->retries++;
                update_trytime(rq, op, now);
            }
        }
    }

    /* Put deferred operations back, ordered by their new trytime / start. */
    LCB_LIST_SAFE_FOR(cur, next, &deferred) {
        lcb_RETRYOP *op = LCB_LIST_ITEM(cur, lcb_RETRYOP, ll_sched);
        lcb_list_add_sorted(&rq->schedops, &op->ll_sched, cmpfn_retry);
        lcb_list_add_sorted(&rq->tmoops, &op->ll_tmo, cmpfn_tmo);
    }

    do_schedule(rq, now);
}
static void
rq_tick(void *arg)
{
rq_flush(arg, 1);
}
/**
 * Flush the queue immediately without throttling, so every scheduled
 * operation is attempted regardless of its trytime.
 *
 * @param rq retry queue
 */
void
lcb_retryq_signal(lcb_RETRYQ *rq)
{
    const int throttle = 0;

    rq_flush(rq, throttle);
}
/**
 * Destructor installed in epd.dtorfn by add_op(): releases the memory of the
 * datum. The datum is the first member of a calloc'ed lcb_RETRYOP, so freeing
 * it frees the whole operation.
 *
 * @param d datum to free
 */
static void
op_dtorfn(mc_EPKTDATUM *d)
{
    free((void *)d);
}
/* add_op() option bit: set the operation's trytime to the current time instead
 * of computing a delayed retry time. */
#define RETRY_SCHED_IMM 0x01
/**
 * Place a packet in the retry queue.
 *
 * Reuses the retry state already attached to the packet under RETRY_PKT_KEY,
 * or allocates and attaches a new one whose start time is taken from the
 * packet's request data. The packet's retry counter is incremented, the error
 * is merged via assign_error(), the next attempt time is chosen, and the
 * operation is inserted into both sorted lists before the timer is rescheduled.
 *
 * @param rq      retry queue
 * @param pkt     packet to retry
 * @param err     error that caused the retry
 * @param options bit mask; RETRY_SCHED_IMM makes the retry due immediately
 */
static void
add_op(lcb_RETRYQ *rq, mc_EXPACKET *pkt, const lcb_error_t err, int options)
{
    mc_EPKTDATUM *datum = mcreq_epkt_find(pkt, RETRY_PKT_KEY);
    lcb_RETRYOP *op = (lcb_RETRYOP *)datum;

    if (!datum) {
        /* First time this packet is queued: create and attach its state.
         * NOTE(review): the calloc() result is not checked here. */
        op = calloc(1, sizeof *op);
        op->epd.dtorfn = op_dtorfn;
        op->epd.key = RETRY_PKT_KEY;
        op->start = MCREQ_PKT_RDATA(&pkt->base)->start;
        mcreq_epkt_insert(pkt, &op->epd);
    }

    op->pkt = &pkt->base;
    pkt->base.retries++;
    assign_error(op, err);

    if (options & RETRY_SCHED_IMM) {
        /* Due right away. */
        op->trytime = gethrtime();
    } else if (err == LCB_NOT_MY_VBUCKET) {
        /* Not-my-vbucket uses its own configured interval. */
        op->trytime = gethrtime() + LCB_US2NS(rq->settings->retry_nmv_interval);
    } else {
        update_trytime(rq, op, 0);
    }

    lcb_list_add_sorted(&rq->schedops, &op->ll_sched, cmpfn_retry);
    lcb_list_add_sorted(&rq->tmoops, &op->ll_tmo, cmpfn_tmo);

    lcb_log(LOGARGS(rq, DEBUG), "Adding PKT=%p to retry queue. Try count=%u", (void*)pkt, pkt->base.retries);
    do_schedule(rq, 0);
}
/**
 * Queue a packet for retry with no scheduling options.
 *
 * @param rq  retry queue
 * @param pkt packet to retry
 * @param err error that caused the retry
 */
void
lcb_retryq_add(lcb_RETRYQ *rq, mc_EXPACKET *pkt, lcb_error_t err)
{
    const int no_options = 0;

    add_op(rq, pkt, err, no_options);
}
/**
 * Queue a packet for retry after a not-my-vbucket response.
 *
 * The retry is scheduled immediately when settings->nmv_retry_imm is set;
 * otherwise add_op() applies its LCB_NOT_MY_VBUCKET delay.
 *
 * @param rq       retry queue
 * @param detchpkt packet to retry
 */
void
lcb_retryq_nmvadd(lcb_RETRYQ *rq, mc_EXPACKET *detchpkt)
{
    const int flags = rq->settings->nmv_retry_imm ? RETRY_SCHED_IMM : 0;

    add_op(rq, detchpkt, LCB_NOT_MY_VBUCKET, flags);
}
/**
 * Command-queue fallback handler installed by lcb_retryq_new().
 *
 * Obtains a packet from mcreq_renew_packet() and queues it for an immediate
 * retry with LCB_NO_MATCHING_SERVER as the cause.
 *
 * @param cq  command queue; its cqdata is the owning instance
 * @param pkt packet handed to the fallback handler
 */
static void
fallback_handler(mc_CMDQUEUE *cq, mc_PACKET *pkt)
{
    lcb_t instance = cq->cqdata;
    mc_EXPACKET *renewed = (mc_EXPACKET*)mcreq_renew_packet(pkt);

    add_op(instance->retryq, renewed, LCB_NO_MATCHING_SERVER, RETRY_SCHED_IMM);
}
/**
 * Set the start timestamp of every scheduled operation to the given value,
 * which restarts the timeout window of each one.
 *
 * @param rq  retry queue
 * @param now timestamp to store as each operation's new start time
 */
void
lcb_retryq_reset_timeouts(lcb_RETRYQ *rq, lcb_U64 now)
{
    lcb_list_t *node;

    LCB_LIST_FOR(node, &rq->schedops) {
        lcb_RETRYOP *queued = LCB_LIST_ITEM(node, lcb_RETRYOP, ll_sched);
        queued->start = now;
    }
}
/**
 * Create a retry queue.
 *
 * Allocates the queue, creates its timer (callback rq_tick), takes a reference
 * on the settings, initialises both operation lists and installs
 * fallback_handler on the command queue.
 *
 * @param cq       command queue the retry queue dispatches into
 * @param table    I/O table used to create the timer
 * @param settings settings object; a reference is taken with lcb_settings_ref()
 *                 and released by lcb_retryq_destroy()
 * @return the new queue, or NULL if memory for it could not be allocated
 *         (in which case nothing else has been touched)
 */
lcb_RETRYQ *
lcb_retryq_new(mc_CMDQUEUE *cq, lcbio_pTABLE table, lcb_settings *settings)
{
    lcb_RETRYQ *rq = calloc(1, sizeof(*rq));

    /* Bail out before dereferencing a failed allocation; no reference has
     * been taken and no handler installed at this point. */
    if (rq == NULL) {
        return NULL;
    }

    rq->settings = settings;
    rq->cq = cq;
    rq->timer = lcbio_timer_new(table, rq, rq_tick);
    lcb_settings_ref(settings);
    lcb_list_init(&rq->tmoops);
    lcb_list_init(&rq->schedops);
    mcreq_set_fallback_handler(cq, fallback_handler);
    return rq;
}
/**
 * Destroy a retry queue.
 *
 * Every operation still on the schedule list is failed with LCB_ERROR, then
 * the timer is destroyed, the settings reference is dropped and the queue is
 * freed.
 *
 * @param rq retry queue to destroy; must not be used afterwards
 */
void
lcb_retryq_destroy(lcb_RETRYQ *rq)
{
    lcb_list_t *node, *after;

    /* Safe iteration: bail_op() unlinks the current node. */
    LCB_LIST_SAFE_FOR(node, after, &rq->schedops) {
        lcb_RETRYOP *pending = LCB_LIST_ITEM(node, lcb_RETRYOP, ll_sched);
        bail_op(rq, pending, LCB_ERROR);
    }

    lcbio_timer_destroy(rq->timer);
    lcb_settings_unref(rq->settings);
    free(rq);
}
/**
 * Look up the error recorded by the retry queue for a packet.
 *
 * @param packet packet to inspect
 * @return the recorded origerr, or LCB_SUCCESS when the packet does not have
 *         MCREQ_F_DETACHED set or carries no retry state under RETRY_PKT_KEY
 */
lcb_error_t
lcb_retryq_origerr(const mc_PACKET *packet)
{
    const mc_EPKTDATUM *datum;

    if ((packet->flags & MCREQ_F_DETACHED) == 0) {
        return LCB_SUCCESS;
    }

    datum = mcreq_epkt_find((mc_EXPACKET*)packet, RETRY_PKT_KEY);
    if (datum) {
        /* The datum is the first member of the retry state. */
        return ((const lcb_RETRYOP *)datum)->origerr;
    }
    return LCB_SUCCESS;
}
/**
 * Dump every packet on the schedule list, in list order, via
 * mcreq_dump_packet().
 *
 * @param rq     retry queue
 * @param fp     stream passed through to mcreq_dump_packet()
 * @param dumpfn payload dump callback passed through to mcreq_dump_packet()
 */
void
lcb_retryq_dump(lcb_RETRYQ *rq, FILE *fp, mcreq_payload_dump_fn dumpfn)
{
    lcb_list_t *node;

    LCB_LIST_FOR(node, &rq->schedops) {
        lcb_RETRYOP *queued = LCB_LIST_ITEM(node, lcb_RETRYOP, ll_sched);
        mcreq_dump_packet(queued->pkt, fp, dumpfn);
    }

    /* No-op reference to fp, retained from the original. */
    (void)fp;
}