#include "internal.h"
#include "packetutils.h"
#include "mc/mcreq.h"
#include "mc/compress.h"
#include "trace.h"
#define MK_RESPACK(dstname, srcname) \
typedef struct { srcname resp; lcb_MUTATION_TOKEN mt; } dstname
MK_RESPACK(respack_STORE, lcb_RESPSTORE);
MK_RESPACK(respack_COUNTER, lcb_RESPCOUNTER);
MK_RESPACK(respack_REMOVE, lcb_RESPREMOVE);
MK_RESPACK(respack_SUBDOC, lcb_RESPSUBDOC);
LIBCOUCHBASE_API
lcb_error_t
lcb_errmap_default(lcb_t instance, lcb_uint16_t in)
{
(void)instance;
switch (in) {
case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
return LCB_ETIMEDOUT;
case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
return LCB_AUTH_CONTINUE;
case PROTOCOL_BINARY_RESPONSE_EBUSY:
return LCB_EBUSY;
case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
return LCB_ETMPFAIL;
case PROTOCOL_BINARY_RESPONSE_EINTERNAL:
default:
fprintf(stderr, "COUCHBASE: Unhandled memcached status=0x%x\n", in);
return LCB_UNKNOWN_MEMCACHED_ERROR;
}
}
static lcb_error_t
map_error(lcb_t instance, int in)
{
(void)instance;
switch (in) {
case PROTOCOL_BINARY_RESPONSE_SUCCESS:
return LCB_SUCCESS;
case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
return LCB_KEY_ENOENT;
case PROTOCOL_BINARY_RESPONSE_E2BIG:
return LCB_E2BIG;
case PROTOCOL_BINARY_RESPONSE_ENOMEM:
return LCB_ENOMEM;
case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
return LCB_KEY_EEXISTS;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_ENOENT:
return LCB_SUBDOC_PATH_ENOENT;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_MISMATCH:
return LCB_SUBDOC_PATH_MISMATCH;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_EINVAL:
return LCB_SUBDOC_PATH_EINVAL;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_E2BIG:
return LCB_SUBDOC_PATH_E2BIG;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_DOC_E2DEEP:
return LCB_SUBDOC_DOC_E2DEEP;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_VALUE_ETOODEEP:
return LCB_SUBDOC_VALUE_E2DEEP;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_VALUE_CANTINSERT:
return LCB_SUBDOC_VALUE_CANTINSERT;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_DOC_NOTJSON:
return LCB_SUBDOC_DOC_NOTJSON;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_NUM_ERANGE:
return LCB_SUBDOC_NUM_ERANGE;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_DELTA_ERANGE:
return LCB_SUBDOC_BAD_DELTA;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_PATH_EEXISTS:
return LCB_SUBDOC_PATH_EEXISTS;
case PROTOCOL_BINARY_RESPONSE_SUBDOC_MULTI_PATH_FAILURE:
return LCB_SUBDOC_MULTI_FAILURE;
case PROTOCOL_BINARY_RESPONSE_EINVAL:
return LCB_EINVAL_MCD;
case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
return LCB_NOT_STORED;
case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
return LCB_DELTA_BADVAL;
case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
return LCB_AUTH_ERROR;
case PROTOCOL_BINARY_RESPONSE_ERANGE:
return LCB_ERANGE;
case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
return LCB_UNKNOWN_COMMAND;
case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
return LCB_NOT_SUPPORTED;
default:
if (instance != NULL) {
return instance->callbacks.errmap(instance, in);
} else {
return lcb_errmap_default(NULL, in);
}
}
}
static lcb_RESPCALLBACK
find_callback(lcb_t instance, lcb_CALLBACKTYPE type)
{
lcb_RESPCALLBACK cb = instance->callbacks.v3callbacks[type];
if (!cb) {
cb = lcb_find_callback(instance, type);
}
return cb;
}
#define MK_ERROR(root, resp, response, imm) do { \
if (imm) { \
(resp)->rc = imm; \
(resp)->rflags |= LCB_RESP_F_CLIENTGEN; \
} else if (PACKET_STATUS(response) == PROTOCOL_BINARY_RESPONSE_SUCCESS) { \
(resp)->rc = LCB_SUCCESS; \
} else { \
(resp)->rc = map_error(root, PACKET_STATUS(response)); \
} \
} while (0);
static void
init_resp3(lcb_t instance, const packet_info *mc_resp, const mc_PACKET *req,
lcb_error_t immerr, lcb_RESPBASE *resp)
{
MK_ERROR(instance, resp, mc_resp, immerr);
resp->cas = PACKET_CAS(mc_resp);
resp->cookie = (void *)MCREQ_PKT_COOKIE(req);
mcreq_get_key(req, &resp->key, &resp->nkey);
}
static void
handle_mutation_token(lcb_t instance, const packet_info *mc_resp,
const mc_PACKET *req, lcb_MUTATION_TOKEN *stok)
{
const char *sbuf;
uint16_t vbid;
if (PACKET_EXTLEN(mc_resp) == 0) {
return;
}
if (!instance->dcpinfo && LCBT_SETTING(instance, dur_mutation_tokens)) {
size_t nvb = LCBT_VBCONFIG(instance)->nvb;
if (nvb) {
instance->dcpinfo = calloc(nvb, sizeof(*instance->dcpinfo));
}
}
sbuf = PACKET_BODY(mc_resp);
vbid = mcreq_get_vbucket(req);
stok->vbid_ = vbid;
memcpy(&stok->uuid_, sbuf, 8);
memcpy(&stok->seqno_, sbuf + 8, 8);
stok->uuid_ = lcb_ntohll(stok->uuid_);
stok->seqno_ = lcb_ntohll(stok->seqno_);
if (instance->dcpinfo) {
instance->dcpinfo[vbid] = *stok;
}
}
#define MK_RESPKEY3(resp, req) do { \
mcreq_get_key(req, &(resp)->key, &(resp)->nkey); \
} while (0);
static void
invoke_callback3(const mc_PACKET *pkt,
lcb_t instance,lcb_CALLBACKTYPE cbtype, lcb_RESPBASE * resp)
{
if (!(pkt->flags & MCREQ_F_INVOKED)) {
resp->cookie = (void *)MCREQ_PKT_COOKIE(pkt);
if ((pkt->flags & MCREQ_F_PRIVCALLBACK) == 0) {
find_callback(instance, cbtype)(instance, cbtype, resp);
} else {
(*(lcb_RESPCALLBACK*)resp->cookie)(instance, cbtype, resp);
}
}
}
#define INVOKE_CALLBACK3(req, res_, instance, type) { \
invoke_callback3(req, instance, type, (lcb_RESPBASE *)res_); \
}
static void
maybe_decompress(lcb_t o,
const packet_info *respkt, lcb_RESPGET *rescmd, void **freeptr)
{
lcb_U8 dtype = 0;
if (!PACKET_NVALUE(respkt)) {
return;
}
if (PACKET_DATATYPE(respkt) & PROTOCOL_BINARY_DATATYPE_JSON) {
dtype = LCB_VALUE_F_JSON;
}
if (PACKET_DATATYPE(respkt) & PROTOCOL_BINARY_DATATYPE_COMPRESSED) {
if (LCBT_SETTING(o, compressopts) & LCB_COMPRESS_IN) {
mcreq_inflate_value(
PACKET_VALUE(respkt), PACKET_NVALUE(respkt),
&rescmd->value, &rescmd->nvalue, freeptr);
} else {
dtype |= LCB_VALUE_F_SNAPPYCOMP;
}
}
rescmd->datatype = dtype;
}
static void
H_get(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t o;
lcb_RESPGET resp = { 0 };
void *freeptr = NULL;
o = pipeline->parent->cqdata;
init_resp3(o, response, request, immerr, (lcb_RESPBASE *)&resp);
resp.rflags |= LCB_RESP_F_FINAL;
if (resp.rc == LCB_SUCCESS) {
const protocol_binary_response_getq *getq =
PACKET_EPHEMERAL_START(response);
resp.datatype = PACKET_DATATYPE(response);
resp.itmflags = ntohl(getq->message.body.flags);
resp.value = PACKET_VALUE(response);
resp.nvalue = PACKET_NVALUE(response);
resp.bufh = response->bufh;
}
maybe_decompress(o, response, &resp, &freeptr);
TRACE_GET_END(response, &resp);
INVOKE_CALLBACK3(request, &resp, o, LCB_CALLBACK_GET);
free(freeptr);
}
static void
H_getreplica(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_RESPGET resp = { 0 };
lcb_t instance = pipeline->parent->cqdata;
void *freeptr = NULL;
mc_REQDATAEX *rd = request->u_rdata.exdata;
init_resp3(instance, response, request, immerr, (lcb_RESPBASE *)&resp);
if (resp.rc == LCB_SUCCESS) {
const protocol_binary_response_get *get = PACKET_EPHEMERAL_START(response);
resp.itmflags = ntohl(get->message.body.flags);
resp.datatype = PACKET_DATATYPE(response);
resp.value = PACKET_VALUE(response);
resp.nvalue = PACKET_NVALUE(response);
resp.bufh = response->bufh;
}
maybe_decompress(instance, response, &resp, &freeptr);
rd->procs->handler(pipeline, request, resp.rc, &resp);
free(freeptr);
}
static void
H_subdoc(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t o;
respack_SUBDOC w = { { 0 } };
int cbtype;
o = pipeline->parent->cqdata;
init_resp3(o, response, request, immerr, (lcb_RESPBASE *)&w.resp);
w.resp.rflags |= LCB_RESP_F_FINAL;
switch (PACKET_OPCODE(response)) {
case PROTOCOL_BINARY_CMD_SUBDOC_GET:
case PROTOCOL_BINARY_CMD_SUBDOC_EXISTS:
case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP:
cbtype = LCB_CALLBACK_SDLOOKUP;
break;
default:
handle_mutation_token(o, response, request, &w.mt);
w.resp.rflags |= LCB_RESP_F_EXTDATA;
cbtype = LCB_CALLBACK_SDMUTATE;
break;
}
if (PACKET_OPCODE(response) == PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP ||
PACKET_OPCODE(response) == PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION) {
if (w.resp.rc == LCB_SUCCESS || w.resp.rc == LCB_SUBDOC_MULTI_FAILURE) {
w.resp.responses = response;
}
} else {
w.resp.rflags |= LCB_RESP_F_SDSINGLE;
if (w.resp.rc == LCB_SUCCESS) {
w.resp.responses = response;
} else if (LCB_EIFSUBDOC(w.resp.rc)) {
w.resp.responses = response;
w.resp.rc = LCB_SUBDOC_MULTI_FAILURE;
}
}
INVOKE_CALLBACK3(request, &w.resp, o, cbtype);
}
static int
sdlookup_next(const packet_info *response, lcb_SDENTRY *ent, size_t *iter)
{
const char *buf;
lcb_U16 rc;
lcb_U32 vlen;
if (*iter == PACKET_NVALUE(response)) {
return 0;
}
buf = PACKET_VALUE(response);
buf += *iter;
memcpy(&rc, buf, 2);
memcpy(&vlen, buf + 2, 4);
rc = ntohs(rc);
vlen = ntohl(vlen);
ent->status = map_error(NULL, rc);
ent->nvalue = vlen;
if (ent->status == LCB_SUCCESS) {
ent->value = buf + 6;
} else {
ent->value = NULL;
ent->nvalue = 0;
}
*iter += (6 + vlen);
return 1;
}
static int
sdmutate_next(const packet_info *response, lcb_SDENTRY *ent, size_t *iter)
{
const char *buf, *buf_end;
lcb_U16 rc;
lcb_U32 vlen;
if (*iter == PACKET_NVALUE(response)) {
return 0;
}
buf_end = (const char *)PACKET_VALUE(response) + PACKET_NVALUE(response);
buf = ((const char *)(PACKET_VALUE(response))) + *iter;
#define ADVANCE_BUF(sz) \
buf += sz; \
*iter += sz; \
assert(buf <= buf_end); \
ent->index = *(lcb_U8*)buf;
ADVANCE_BUF(1);
memcpy(&rc, buf, 2);
ADVANCE_BUF(2);
rc = ntohs(rc);
ent->status = map_error(NULL, rc);
if (rc == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
memcpy(&vlen, buf, 4);
ADVANCE_BUF(4);
vlen = ntohl(vlen);
ent->nvalue = vlen;
ent->value = buf;
ADVANCE_BUF(vlen);
} else {
ent->value = NULL;
ent->nvalue = 0;
}
return 1;
#undef ADVANCE_BUF
}
LIBCOUCHBASE_API
int
lcb_sdresult_next(const lcb_RESPSUBDOC *resp, lcb_SDENTRY *ent, size_t *iter)
{
size_t iter_s = 0;
const packet_info *response = (const packet_info*)resp->responses;
if (!response) {
return 0;
}
if (!iter) {
iter = &iter_s;
}
switch (PACKET_OPCODE(response)) {
case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP:
return sdlookup_next(response, ent, iter);
case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION:
return sdmutate_next(response, ent, iter);
default:
if (*iter) {
return 0;
}
*iter = 1;
if (resp->rc == LCB_SUCCESS || resp->rc == LCB_SUBDOC_MULTI_FAILURE) {
ent->status = map_error(NULL, PACKET_STATUS(response));
ent->value = PACKET_VALUE(response);
ent->nvalue = PACKET_NVALUE(response);
ent->index = 0;
return 1;
} else {
return 0;
}
}
}
static void
H_delete(mc_PIPELINE *pipeline, mc_PACKET *packet, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
respack_REMOVE w = { { 0 } };
w.resp.rflags |= LCB_RESP_F_EXTDATA | LCB_RESP_F_FINAL;
init_resp3(root, response, packet, immerr, (lcb_RESPBASE *)&w.resp);
handle_mutation_token(root, response, packet, &w.mt);
TRACE_REMOVE_END(response, &w.resp);
INVOKE_CALLBACK3(packet, &w.resp, root, LCB_CALLBACK_REMOVE);
}
static void
H_observe(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
uint32_t ttp;
uint32_t ttr;
lcb_size_t pos;
lcbvb_CONFIG* config;
const char *end, *ptr;
mc_REQDATAEX *rd = request->u_rdata.exdata;
lcb_RESPOBSERVE resp = { 0 };
MK_ERROR(root, &resp, response, immerr);
if (resp.rc != LCB_SUCCESS) {
if (! (request->flags & MCREQ_F_INVOKED)) {
rd->procs->handler(pipeline, request, resp.rc, NULL);
}
return;
}
ptr = (char *)&response->res.response.cas;
memcpy(&ttp, ptr, sizeof(ttp));
memcpy(&ttr, ptr + sizeof(ttp), sizeof(ttp));
ttp = ntohl(ttp);
ttr = ntohl(ttr);
ptr = response->payload;
end = (char *)ptr + PACKET_NBODY(response);
config = pipeline->parent->config;
for (pos = 0; ptr < end; pos++) {
lcb_cas_t cas;
lcb_uint8_t obs;
lcb_uint16_t nkey, vb;
const char *key;
memcpy(&vb, ptr, sizeof(vb));
vb = ntohs(vb);
ptr += sizeof(vb);
memcpy(&nkey, ptr, sizeof(nkey));
nkey = ntohs(nkey);
ptr += sizeof(nkey);
key = (const char *)ptr;
ptr += nkey;
obs = *((lcb_uint8_t *)ptr);
ptr += sizeof(obs);
memcpy(&cas, ptr, sizeof(cas));
ptr += sizeof(cas);
resp.key = key;
resp.nkey = nkey;
resp.cas = lcb_ntohll(cas);
resp.status = obs;
resp.ismaster = pipeline->index == lcbvb_vbmaster(config, vb);
resp.ttp = 0;
resp.ttr = 0;
TRACE_OBSERVE_PROGRESS(response, &resp);
if (! (request->flags & MCREQ_F_INVOKED)) {
rd->procs->handler(pipeline, request, resp.rc, &resp);
}
}
TRACE_OBSERVE_END(response);
}
static void
H_observe_seqno(mc_PIPELINE *pipeline, mc_PACKET *request,
packet_info *response, lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPOBSEQNO resp = { 0 };
init_resp3(root, response, request, immerr, (lcb_RESPBASE*)&resp);
resp.server_index = pipeline->index;
if (resp.rc == LCB_SUCCESS) {
const lcb_U8 *data = PACKET_BODY(response);
int is_failover = *data != 0;
data++;
#define COPY_ADV(dstfld, n, conv_fn) \
memcpy(&resp.dstfld, data, n); \
data += n; \
resp.dstfld = conv_fn(resp.dstfld);
COPY_ADV(vbid, 2, ntohs);
COPY_ADV(cur_uuid, 8, lcb_ntohll);
COPY_ADV(persisted_seqno, 8, lcb_ntohll);
COPY_ADV(mem_seqno, 8, lcb_ntohll);
if (is_failover) {
COPY_ADV(old_uuid, 8, lcb_ntohll);
COPY_ADV(old_seqno, 8, lcb_ntohll);
}
#undef COPY_ADV
}
INVOKE_CALLBACK3(request, &resp, root, LCB_CALLBACK_OBSEQNO);
}
static void
H_store(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
respack_STORE w = { { 0 } };
lcb_U8 opcode;
init_resp3(root, response, request, immerr, (lcb_RESPBASE*)&w.resp);
if (!immerr) {
opcode = PACKET_OPCODE(response);
} else {
protocol_binary_request_header hdr;
mcreq_read_hdr(request, &hdr);
opcode = hdr.request.opcode;
}
if (opcode == PROTOCOL_BINARY_CMD_ADD) {
w.resp.op = LCB_ADD;
} else if (opcode == PROTOCOL_BINARY_CMD_REPLACE) {
w.resp.op = LCB_REPLACE;
} else if (opcode == PROTOCOL_BINARY_CMD_APPEND) {
w.resp.op = LCB_APPEND;
} else if (opcode == PROTOCOL_BINARY_CMD_PREPEND) {
w.resp.op = LCB_PREPEND;
} else if (opcode == PROTOCOL_BINARY_CMD_SET) {
w.resp.op = LCB_SET;
}
w.resp.rflags |= LCB_RESP_F_EXTDATA | LCB_RESP_F_FINAL;
handle_mutation_token(root, response, request, &w.mt);
TRACE_STORE_END(response, &w.resp);
if (request->flags & MCREQ_F_REQEXT) {
request->u_rdata.exdata->procs->handler(pipeline, request, immerr, &w.resp);
} else {
INVOKE_CALLBACK3(request, &w.resp, root, LCB_CALLBACK_STORE);
}
}
static void
H_arithmetic(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
respack_COUNTER w = { { 0 } };
init_resp3(root, response, request, immerr, (lcb_RESPBASE*)&w.resp);
if (w.resp.rc == LCB_SUCCESS) {
memcpy(&w.resp.value, PACKET_VALUE(response), sizeof(w.resp.value));
w.resp.value = lcb_ntohll(w.resp.value);
w.resp.rflags |= LCB_RESP_F_EXTDATA;
handle_mutation_token(root, response, request, &w.mt);
}
w.resp.rflags |= LCB_RESP_F_FINAL;
w.resp.cas = PACKET_CAS(response);
TRACE_ARITHMETIC_END(response, &w.resp);
INVOKE_CALLBACK3(request, &w.resp, root, LCB_CALLBACK_COUNTER);
}
static void
H_stats(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPSTATS resp = { 0 };
mc_REQDATAEX *exdata;
MK_ERROR(root, &resp, response, immerr);
resp.version = 0;
exdata = request->u_rdata.exdata;
if (resp.rc != LCB_SUCCESS || PACKET_NKEY(response) == 0) {
exdata->procs->handler(pipeline, request, resp.rc, NULL);
return;
}
if ((resp.nkey = PACKET_NKEY(response))) {
resp.key = PACKET_KEY(response);
if ((resp.value = PACKET_VALUE(response))) {
resp.nvalue = PACKET_NVALUE(response);
}
}
exdata->procs->handler(pipeline, request, resp.rc, &resp);
}
static void
H_verbosity(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPBASE dummy = { 0 };
mc_REQDATAEX *exdata = request->u_rdata.exdata;
MK_ERROR(root, &dummy, response, immerr);
exdata->procs->handler(pipeline, request, dummy.rc, NULL);
}
static void
H_version(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPMCVERSION resp = { 0 };
mc_REQDATAEX *exdata = request->u_rdata.exdata;
MK_ERROR(root, &resp, response, immerr);
if (PACKET_NBODY(response)) {
resp.mcversion = response->payload;
resp.nversion = PACKET_NBODY(response);
}
exdata->procs->handler(pipeline, request, resp.rc, &resp);
}
static void
H_touch(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPTOUCH resp = { 0 };
init_resp3(root, response, request, immerr, &resp);
resp.rflags |= LCB_RESP_F_FINAL;
TRACE_TOUCH_END(response, &resp);
INVOKE_CALLBACK3(request, &resp, root, LCB_CALLBACK_TOUCH);
}
static void
H_flush(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPFLUSH resp = { 0 };
mc_REQDATAEX *exdata = request->u_rdata.exdata;
MK_ERROR(root, &resp, response, immerr);
exdata->procs->handler(pipeline, request, resp.rc, &resp);
}
static void
H_unlock(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_t root = pipeline->parent->cqdata;
lcb_RESPUNLOCK resp = { 0 };
init_resp3(root, response, request, immerr, &resp);
resp.rflags |= LCB_RESP_F_FINAL;
TRACE_UNLOCK_END(response, &resp);
INVOKE_CALLBACK3(request, &resp, root, LCB_CALLBACK_UNLOCK);
}
static void
H_config(mc_PIPELINE *pipeline, mc_PACKET *request, packet_info *response,
lcb_error_t immerr)
{
lcb_RESPBASE dummy;
mc_REQDATAEX *exdata = request->u_rdata.exdata;
MK_ERROR(pipeline->parent->cqdata, &dummy, response, immerr);
exdata->procs->handler(pipeline, request, dummy.rc, response);
}
static void
record_metrics(mc_PIPELINE *pipeline, mc_PACKET *req, packet_info *res)
{
lcb_t instance = pipeline->parent->cqdata;
if (instance->kv_timings) {
lcb_histogram_record(instance->kv_timings,
gethrtime() - MCREQ_PKT_RDATA(req)->start);
}
(void)res;
}
static void
dispatch_ufwd_error(mc_PIPELINE *pipeline, mc_PACKET *req, lcb_error_t immerr)
{
lcb_PKTFWDRESP resp = { 0 };
lcb_t instance = ((mc_SERVER*)pipeline)->instance;
assert(immerr != LCB_SUCCESS);
resp.version = 0;
instance->callbacks.pktfwd(instance, MCREQ_PKT_COOKIE(req), immerr, &resp);
}
int
mcreq_dispatch_response(
mc_PIPELINE *pipeline, mc_PACKET *req, packet_info *res,
lcb_error_t immerr)
{
record_metrics(pipeline, req, res);
if (req->flags & MCREQ_F_UFWD) {
dispatch_ufwd_error(pipeline, req, immerr);
return 0;
}
#define INVOKE_OP(handler) \
handler(pipeline, req, res, immerr); \
return 0; \
break;
switch (PACKET_OPCODE(res)) {
case PROTOCOL_BINARY_CMD_GETQ:
case PROTOCOL_BINARY_CMD_GATQ:
case PROTOCOL_BINARY_CMD_GET:
case PROTOCOL_BINARY_CMD_GAT:
case PROTOCOL_BINARY_CMD_GET_LOCKED:
INVOKE_OP(H_get);
case PROTOCOL_BINARY_CMD_ADD:
case PROTOCOL_BINARY_CMD_REPLACE:
case PROTOCOL_BINARY_CMD_SET:
case PROTOCOL_BINARY_CMD_APPEND:
case PROTOCOL_BINARY_CMD_PREPEND:
INVOKE_OP(H_store);
case PROTOCOL_BINARY_CMD_INCREMENT:
case PROTOCOL_BINARY_CMD_DECREMENT:
INVOKE_OP(H_arithmetic);
case PROTOCOL_BINARY_CMD_SUBDOC_GET:
case PROTOCOL_BINARY_CMD_SUBDOC_EXISTS:
case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_ADD_UNIQUE:
case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_FIRST:
case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_LAST:
case PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_INSERT:
case PROTOCOL_BINARY_CMD_SUBDOC_DICT_ADD:
case PROTOCOL_BINARY_CMD_SUBDOC_DICT_UPSERT:
case PROTOCOL_BINARY_CMD_SUBDOC_REPLACE:
case PROTOCOL_BINARY_CMD_SUBDOC_DELETE:
case PROTOCOL_BINARY_CMD_SUBDOC_COUNTER:
case PROTOCOL_BINARY_CMD_SUBDOC_GET_COUNT:
case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP:
case PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION:
INVOKE_OP(H_subdoc);
case PROTOCOL_BINARY_CMD_OBSERVE:
INVOKE_OP(H_observe);
case PROTOCOL_BINARY_CMD_GET_REPLICA:
INVOKE_OP(H_getreplica);
case PROTOCOL_BINARY_CMD_UNLOCK_KEY:
INVOKE_OP(H_unlock);
case PROTOCOL_BINARY_CMD_DELETE:
INVOKE_OP(H_delete);
case PROTOCOL_BINARY_CMD_TOUCH:
INVOKE_OP(H_touch);
case PROTOCOL_BINARY_CMD_OBSERVE_SEQNO:
INVOKE_OP(H_observe_seqno);
case PROTOCOL_BINARY_CMD_STAT:
INVOKE_OP(H_stats);
case PROTOCOL_BINARY_CMD_FLUSH:
INVOKE_OP(H_flush);
case PROTOCOL_BINARY_CMD_VERSION:
INVOKE_OP(H_version);
case PROTOCOL_BINARY_CMD_VERBOSITY:
INVOKE_OP(H_verbosity);
#if 0#endif
case PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG:
INVOKE_OP(H_config);
default:
fprintf(stderr, "COUCHBASE: Received unknown opcode=0x%x\n", PACKET_OPCODE(res));
return -1;
}
}
const lcb_MUTATION_TOKEN *
lcb_resp_get_mutation_token(int cbtype, const lcb_RESPBASE *rb)
{
const lcb_MUTATION_TOKEN *ss = NULL;
if ((rb->rflags & LCB_RESP_F_EXTDATA) == 0) {
return NULL;
}
switch (cbtype) {
case LCB_CALLBACK_STORE:
ss = &((const respack_STORE*)rb)->mt;
break;
case LCB_CALLBACK_COUNTER:
ss = &((const respack_COUNTER*)rb)->mt;
break;
case LCB_CALLBACK_REMOVE:
ss = &((const respack_REMOVE*)rb)->mt;
break;
case LCB_CALLBACK_SDMUTATE:
ss = &((const respack_SUBDOC*)rb)->mt;
break;
default:
return NULL;
}
if (ss->uuid_ == 0 && ss->seqno_ == 0) {
return NULL;
}
return ss;
}