#include "docreq.h"
#include "internal.h"
#include "sllist-inl.h"
static void docreq_handler(void *arg);
static void invoke_pending(lcb_DOCQUEUE*);
static void doc_callback(lcb_t,int, const lcb_RESPBASE *);
/* Default tuning values for a document request queue. */
enum {
    /* Initial value of lcb_DOCQUEUE::max_pending_response. */
    MAX_PENDING_DOCREQ = 10,
    /* Initial value of lcb_DOCQUEUE::min_batch_size. */
    MIN_SCHED_SIZE = 5,
    /* Interval passed to lcbio_timer_rearm() when the queue timer is
     * (re)armed; presumably microseconds, i.e. 200ms -- confirm against
     * the lcbio timer API. */
    DOCQ_DELAY_US = 200000
};
/* Take one reference on the queue `q`. */
#define DOCQ_REF(q) ((q)->refcount++)

/*
 * Drop one reference on the queue `q`, releasing it through docq_free() once
 * the count reaches zero. Wrapped in do { } while (0) so that the macro
 * behaves as a single statement (safe in an unbraced if/else).
 * Note: `q` is evaluated more than once.
 */
#define DOCQ_UNREF(q) \
    do { \
        if (!--(q)->refcount) { \
            docq_free(q); \
        } \
    } while (0)
/**
 * Allocate and initialise a document request queue for `instance`.
 *
 * The queue starts zero-filled, with a reference count of one, the default
 * limits (MAX_PENDING_DOCREQ / MIN_SCHED_SIZE) and a timer whose callback is
 * docreq_handler().
 *
 * @param instance the library handle the queue issues its requests against
 * @return the new queue, or NULL if the queue or its timer could not be
 *         allocated
 */
lcb_DOCQUEUE *
lcbdocq_create(lcb_t instance)
{
    lcb_DOCQUEUE *q = calloc(1, sizeof *q);

    if (q == NULL) {
        return NULL;
    }

    q->timer = lcbio_timer_new(instance->iotable, q, docreq_handler);
    if (q->timer == NULL) {
        /* Without a timer nothing would ever be scheduled; bail out rather
         * than hand back a half-constructed queue. */
        free(q);
        return NULL;
    }

    q->refcount = 1;
    q->instance = instance;
    q->max_pending_response = MAX_PENDING_DOCREQ;
    q->min_batch_size = MIN_SCHED_SIZE;
    return q;
}
/*
 * Destroy the queue: flag it as cancelled, destroy its timer and release its
 * memory. Invoked by DOCQ_UNREF() when the reference count drops to zero;
 * `q` must not be used afterwards.
 */
static void
docq_free(lcb_DOCQUEUE *q)
{
lcbdocq_cancel(q);
/* The timer was created with `q` as its argument, so it is destroyed
 * before the queue memory itself is freed. */
lcbio_timer_destroy(q->timer);
free(q);
}
/**
 * Release one reference on the queue. When the last reference goes away the
 * queue is destroyed via docq_free() and `q` must no longer be used.
 */
void
lcbdocq_unref(lcb_DOCQUEUE *q)
{
    /* Same operation as DOCQ_UNREF(), spelled out. */
    if (--q->refcount == 0) {
        docq_free(q);
    }
}
/**
 * Mark the queue as cancelled. Requests that docreq_handler() picks up
 * afterwards are completed with LCB_EINTERNAL instead of being sent.
 * Calling this on an already cancelled queue is a no-op.
 */
void
lcbdocq_cancel(lcb_DOCQUEUE *q)
{
    if (q->cancelled) {
        return;
    }
    q->cancelled = 1;
}
/*
 * Make sure the queue's timer (and therefore docreq_handler()) will run.
 *
 * When fewer than max_pending_response requests are awaiting a response and
 * more than min_batch_size requests are awaiting scheduling, the timer is
 * signalled through lcbio_async_signal() (presumably so it fires as soon as
 * possible -- confirm against lcbio) and cb_throttle(q, 0) is called.
 * In any case, if the timer is not armed afterwards it is armed with the
 * DOCQ_DELAY_US interval.
 */
static void
docq_poke(lcb_DOCQUEUE *q)
{
    const int has_capacity = q->n_awaiting_response < q->max_pending_response;
    const int batch_ready = q->n_awaiting_schedule > q->min_batch_size;

    if (has_capacity && batch_ready) {
        lcbio_async_signal(q->timer);
        q->cb_throttle(q, 0);
    }

    if (lcbio_timer_armed(q->timer)) {
        return;
    }
    lcbio_timer_rearm(q->timer, DOCQ_DELAY_US);
}
/**
 * Queue a document request for scheduling.
 *
 * The request is linked to `q`, marked not ready and appended to the list of
 * pending gets. Each queued request holds one reference on the queue; that
 * reference is dropped by invoke_pending() after the request's cb_ready
 * callback has run.
 *
 * @param q   the queue to add to
 * @param req the request; its docid is read later by docreq_handler()
 */
void
lcbdocq_add(lcb_DOCQUEUE *q, lcb_DOCQREQ *req)
{
    /* Initialise the request's bookkeeping before it becomes visible in the
     * pending list. */
    req->parent = q;
    req->ready = 0;

    sllist_append(&q->pending_gets, &req->slnode);
    q->n_awaiting_schedule++;

    DOCQ_REF(q);
    docq_poke(q);
}
/*
 * Timer callback: moves requests from pending_gets to cb_queue, issuing a
 * GET (lcb_get3) for each one.
 *
 * - If more than max_pending_response requests are already awaiting a
 *   response, the timer is re-armed, cb_throttle(q, 1) is called and the
 *   remaining requests stay pending.
 * - If the queue was cancelled, or lcb_get3() fails, the request is marked
 *   ready immediately with the corresponding error code.
 *
 * Afterwards cb_throttle(q, 0) is called when fewer than min_batch_size
 * requests remain unscheduled, the timer is poked, and any ready requests at
 * the head of cb_queue are delivered. invoke_pending() may drop the last
 * reference on the queue, so `q` must not be touched after that call.
 *
 * NOTE(review): the throttle check uses `>` whereas docq_poke() uses `<`, so
 * one extra request can be issued when the two counters are equal -- confirm
 * this is intended.
 */
static void
docreq_handler(void *arg)
{
    lcb_DOCQUEUE *q = arg;
    lcb_t instance = q->instance;
    sllist_iterator iter;

    lcb_sched_enter(instance);

    SLLIST_ITERFOR(&q->pending_gets, &iter) {
        lcb_DOCQREQ *dreq = SLLIST_ITEM(iter.cur, lcb_DOCQREQ, slnode);

        if (q->n_awaiting_response > q->max_pending_response) {
            /* Too many requests in flight: retry later and tell the owner. */
            lcbio_timer_rearm(q->timer, DOCQ_DELAY_US);
            q->cb_throttle(q, 1);
            break;
        }

        q->n_awaiting_schedule--;

        if (!q->cancelled) {
            lcb_CMDGET getcmd = { 0 };
            lcb_error_t err;

            LCB_CMD_SET_KEY(&getcmd, dreq->docid.iov_base, dreq->docid.iov_len);
            getcmd.cmdflags |= LCB_CMD_F_INTERNAL_CALLBACK;
            /* The address of the request's callback slot is passed as the
             * operation cookie; doc_callback() recovers the request from it. */
            dreq->callback = doc_callback;
            err = lcb_get3(instance, &dreq->callback, &getcmd);

            if (err == LCB_SUCCESS) {
                q->n_awaiting_response++;
            } else {
                /* Could not be scheduled: complete it with the error. */
                dreq->docresp.rc = err;
                dreq->ready = 1;
            }
        } else {
            dreq->docresp.rc = LCB_EINTERNAL;
            dreq->ready = 1;
        }

        /* Scheduled or failed, the request now waits in the callback queue. */
        sllist_iter_remove(&q->pending_gets, &iter);
        sllist_append(&q->cb_queue, &dreq->slnode);
    }

    lcb_sched_leave(instance);
    lcb_sched_flush(instance);

    if (q->n_awaiting_schedule < q->min_batch_size) {
        q->cb_throttle(q, 0);
    }

    docq_poke(q);
    invoke_pending(q);
}
/*
 * Deliver completed requests from the head of cb_queue, stopping at the
 * first request that is not yet ready.
 *
 * For every delivered request: it is unlinked, cb_ready(q, request) is
 * called, the response buffer handle (if the response succeeded and has one)
 * is unreferenced, and the queue reference taken in lcbdocq_add() is dropped.
 * An extra reference is held for the duration of the loop; releasing it at
 * the end may destroy the queue.
 */
static void
invoke_pending(lcb_DOCQUEUE *q)
{
    sllist_iterator it = { NULL };

    DOCQ_REF(q);

    SLLIST_ITERFOR(&q->cb_queue, &it) {
        lcb_DOCQREQ *head = SLLIST_ITEM(it.cur, lcb_DOCQREQ, slnode);
        void *backbuf;

        if (!head->ready) {
            break;
        }

        /* Capture the buffer handle before the callback runs; the request is
         * not read again once cb_ready() has been called. */
        backbuf = (head->docresp.rc == LCB_SUCCESS) ? head->docresp.bufh : NULL;

        sllist_iter_remove(&q->cb_queue, &it);
        q->cb_ready(q, head);

        if (backbuf) {
            lcb_backbuf_unref(backbuf);
        }

        /* Release the reference held on behalf of this request. */
        DOCQ_UNREF(q);
    }

    DOCQ_UNREF(q);
}
/*
 * Response callback for the GETs issued by docreq_handler().
 *
 * Copies the response into the request, marks it ready, points the response
 * key at the request's own docid, and then delivers whatever is ready at the
 * head of the callback queue before poking the timer.
 *
 * @param instance unused
 * @param cbtype   unused
 * @param rb       the response; treated as an lcb_RESPGET. Its cookie is
 *                 used as the lcb_DOCQREQ (docreq_handler() passes the
 *                 address of the request's `callback` field, so this
 *                 presumably relies on that field being the first member --
 *                 confirm against the lcb_DOCQREQ definition).
 */
static void
doc_callback(lcb_t instance, int cbtype, const lcb_RESPBASE *rb)
{
    const lcb_RESPGET *rg = (const lcb_RESPGET *)rb;
    lcb_DOCQREQ *dreq = rb->cookie;
    lcb_DOCQUEUE *q = dreq->parent;

    (void)instance;
    (void)cbtype;

    /* Keep the queue alive until we are done with it below;
     * invoke_pending() may drop the last per-request reference. */
    DOCQ_REF(q);

    q->n_awaiting_response--;
    dreq->docresp = *rg;
    dreq->ready = 1;
    dreq->docresp.key = dreq->docid.iov_base;
    dreq->docresp.nkey = dreq->docid.iov_len;

    /* Hold on to the response buffer, since the request may not be delivered
     * right away. Use the same condition as the unref in invoke_pending()
     * so ref/unref stay paired and a NULL handle is never referenced. */
    if (rg->rc == LCB_SUCCESS && dreq->docresp.bufh) {
        lcb_backbuf_ref(dreq->docresp.bufh);
    }

    invoke_pending(q);
    docq_poke(q);

    DOCQ_UNREF(q);
}