#include "internal.h"
#include "trace.h"
/**
 * Schedule a single-key retrieval.
 *
 * Depending on the command, the request is encoded as a plain GET, a
 * GET_LOCKED (when cmd->lock is set) or a GAT (when cmd->exptime is non-zero
 * or LCB_CMDGET_F_CLEAREXP is present in cmd->cmdflags). The latter two carry
 * a 4-byte expiration field in the extras section.
 *
 * @param instance the library handle whose command queue receives the packet
 * @param cookie   opaque pointer stored in the packet's request data
 * @param cmd      the command; must have a non-empty key and a zero cas
 * @return LCB_EMPTY_KEY if the key is empty, LCB_OPTIONS_CONFLICT if a cas
 *         was supplied, any error reported by mcreq_basic_packet(), or
 *         LCB_SUCCESS once the packet has been scheduled.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_get3(lcb_t instance, const void *cookie, const lcb_CMDGET *cmd)
{
    mc_CMDQUEUE *cmdq = &instance->cmdq;
    mc_PIPELINE *pipeline;
    mc_PACKET *packet;
    mc_REQDATA *reqdata;
    protocol_binary_request_gat gat;
    protocol_binary_request_header *req = &gat.message.header;
    lcb_uint8_t opcode;
    lcb_uint8_t extlen;
    lcb_error_t rc;

    if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) {
        return LCB_EMPTY_KEY;
    }
    if (cmd->cas) {
        return LCB_OPTIONS_CONFLICT;
    }

    /* Pick the wire opcode; lock takes precedence over touch semantics. */
    if (cmd->lock) {
        opcode = PROTOCOL_BINARY_CMD_GET_LOCKED;
        extlen = 4;
    } else if (cmd->exptime || (cmd->cmdflags & LCB_CMDGET_F_CLEAREXP)) {
        opcode = PROTOCOL_BINARY_CMD_GAT;
        extlen = 4;
    } else {
        opcode = PROTOCOL_BINARY_CMD_GET;
        extlen = 0;
    }

    rc = mcreq_basic_packet(cmdq, (const lcb_CMDBASE *)cmd, req, extlen,
                            &packet, &pipeline,
                            MCREQ_BASICPACKET_F_FALLBACKOK);
    if (rc != LCB_SUCCESS) {
        return rc;
    }

    reqdata = &packet->u_rdata.reqdata;
    reqdata->cookie = cookie;
    reqdata->start = gethrtime();

    /* Fill in the remaining header fields; the key length is read back from
     * the header populated by mcreq_basic_packet() above. */
    req->request.magic = PROTOCOL_BINARY_REQ;
    req->request.opcode = opcode;
    req->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
    req->request.bodylen = htonl(extlen + ntohs(req->request.keylen));
    req->request.opaque = packet->opaque;
    req->request.cas = 0;

    if (extlen != 0) {
        gat.message.body.expiration = htonl(cmd->exptime);
    }

    if (cmd->cmdflags & LCB_CMD_F_INTERNAL_CALLBACK) {
        packet->flags |= MCREQ_F_PRIVCALLBACK;
    }

    /* Copy header (plus extras, if any) into the packet's key/header span. */
    memcpy(SPAN_BUFFER(&packet->kh_span), gat.bytes,
           MCREQ_PKT_BASESIZE + extlen);
    LCB_SCHED_ADD(instance, pipeline, packet);
    TRACE_GET_BEGIN(req, cmd);
    return LCB_SUCCESS;
}
/**
 * Legacy multi-get entry point.
 *
 * Converts each v0 command in `items` into an lcb_CMDGET and schedules it via
 * lcb_get3() inside a single scheduling context. If any command fails, the
 * whole context is failed with lcb_sched_fail() and that error is returned.
 *
 * @param instance       the library handle
 * @param command_cookie opaque pointer passed through to every command
 * @param num            number of entries in `items`
 * @param items          array of pointers to legacy get commands
 * @return the first error reported by lcb_get3(), otherwise whatever
 *         SYNCMODE_INTERCEPT yields after the context is left.
 */
LIBCOUCHBASE_API
lcb_error_t lcb_get(lcb_t instance,
                    const void *command_cookie,
                    lcb_size_t num,
                    const lcb_get_cmd_t *const *items)
{
    unsigned idx;
    lcb_error_t rc = LCB_SUCCESS;

    lcb_sched_enter(instance);

    for (idx = 0; idx < num; idx++) {
        const lcb_get_cmd_t *legacy = items[idx];
        lcb_CMDGET cmd;

        /* Start from an all-zero command and copy over the v0 fields. */
        memset(&cmd, 0, sizeof(cmd));
        cmd.key.contig.bytes = legacy->v.v0.key;
        cmd.key.contig.nbytes = legacy->v.v0.nkey;
        cmd._hashkey.contig.bytes = legacy->v.v0.hashkey;
        cmd._hashkey.contig.nbytes = legacy->v.v0.nhashkey;
        cmd.lock = legacy->v.v0.lock;
        cmd.exptime = legacy->v.v0.exptime;

        rc = lcb_get3(instance, command_cookie, &cmd);
        if (rc != LCB_SUCCESS) {
            break;
        }
    }

    if (rc != LCB_SUCCESS) {
        lcb_sched_fail(instance);
        return rc;
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}
/**
 * Schedule an UNLOCK_KEY request for a single key.
 *
 * @param instance the library handle whose command queue receives the packet
 * @param cookie   opaque pointer stored in the packet's request data
 * @param cmd      the command; the key must be non-empty and cmd->cas is
 *                 sent (in network byte order) in the request header
 * @return LCB_EMPTY_KEY if the key is empty, any error reported by
 *         mcreq_basic_packet(), or LCB_SUCCESS once scheduled.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_unlock3(lcb_t instance, const void *cookie, const lcb_CMDUNLOCK *cmd)
{
    mc_CMDQUEUE *cmdq = &instance->cmdq;
    mc_PIPELINE *pipeline;
    mc_PACKET *packet;
    mc_REQDATA *reqdata;
    protocol_binary_request_header hdr;
    protocol_binary_request_header *req = &hdr;
    lcb_error_t rc;

    if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) {
        return LCB_EMPTY_KEY;
    }

    rc = mcreq_basic_packet(cmdq, cmd, req, 0, &packet, &pipeline,
                            MCREQ_BASICPACKET_F_FALLBACKOK);
    if (rc != LCB_SUCCESS) {
        return rc;
    }

    reqdata = &packet->u_rdata.reqdata;
    reqdata->cookie = cookie;
    reqdata->start = gethrtime();

    /* No extras and no value: the body consists of the key only. */
    req->request.magic = PROTOCOL_BINARY_REQ;
    req->request.opcode = PROTOCOL_BINARY_CMD_UNLOCK_KEY;
    req->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
    req->request.bodylen = htonl((lcb_uint32_t)ntohs(req->request.keylen));
    req->request.opaque = packet->opaque;
    req->request.cas = lcb_htonll(cmd->cas);

    memcpy(SPAN_BUFFER(&packet->kh_span), hdr.bytes, sizeof(hdr.bytes));
    TRACE_UNLOCK_BEGIN(req, cmd);
    LCB_SCHED_ADD(instance, pipeline, packet);
    return LCB_SUCCESS;
}
/**
 * Legacy multi-unlock entry point.
 *
 * Converts each v0 command in `items` into an lcb_CMDUNLOCK and schedules it
 * via lcb_unlock3() inside a single scheduling context. On the first failure
 * the context is failed with lcb_sched_fail() and that error is returned.
 *
 * @param instance the library handle
 * @param cookie   opaque pointer passed through to every command
 * @param num      number of entries in `items`
 * @param items    array of pointers to legacy unlock commands
 * @return the first error reported by lcb_unlock3(), otherwise whatever
 *         SYNCMODE_INTERCEPT yields after the context is left.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_unlock(lcb_t instance, const void *cookie, lcb_size_t num,
           const lcb_unlock_cmd_t * const * items)
{
    unsigned idx;

    lcb_sched_enter(instance);

    for (idx = 0; idx < num; idx++) {
        const lcb_unlock_cmd_t *legacy = items[idx];
        lcb_CMDUNLOCK cmd;
        lcb_error_t rc;

        /* Start from an all-zero command and copy over the v0 fields. */
        memset(&cmd, 0, sizeof(cmd));
        cmd.key.contig.bytes = legacy->v.v0.key;
        cmd.key.contig.nbytes = legacy->v.v0.nkey;
        cmd._hashkey.contig.bytes = legacy->v.v0.hashkey;
        cmd._hashkey.contig.nbytes = legacy->v.v0.nhashkey;
        cmd.cas = legacy->v.v0.cas;

        rc = lcb_unlock3(instance, cookie, &cmd);
        if (rc != LCB_SUCCESS) {
            lcb_sched_fail(instance);
            return rc;
        }
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}
/**
 * Shared state for one replica-get operation. A single instance is allocated
 * by lcb_rget3() and referenced by every packet it schedules.
 */
typedef struct {
/* Must remain the first member: packets store &base in u_rdata.exdata and
 * the handlers cast that pointer back to rget_cookie *. */
mc_REQDATAEX base;
/* Replica index currently being tried; initialised to the first replica
 * used and advanced by rget_callback() when moving to the next one. */
unsigned r_cur;
/* Set to LCBT_NREPLICAS(instance) when the operation is created. */
unsigned r_max;
/* Count of outstanding references; incremented per scheduled packet and
 * decremented by rget_callback()/rget_dtor(). The struct is freed at zero. */
int remaining;
/* vBucket the key maps to, as computed by mcreq_map_key(). */
int vbucket;
/* Copy of the strategy requested in the command. */
lcb_replica_t strategy;
/* Owning library handle; used to look up the user callback and queue. */
lcb_t instance;
} rget_cookie;
/**
 * Destructor hook for a replica-get packet: drops this packet's reference on
 * the shared rget_cookie and frees the cookie once no references remain.
 */
static void rget_dtor(mc_PACKET *pkt)
{
    rget_cookie *shared = (rget_cookie *)pkt->u_rdata.exdata;

    shared->remaining--;
    if (shared->remaining == 0) {
        free(shared);
    }
}
/**
 * Response handler for replica-get packets.
 *
 * Looks up the user's LCB_CALLBACK_GETREPLICA callback and either delivers the
 * response directly (SELECT / ALL strategies) or, for any other strategy,
 * retries the request against the next replica when the current one failed.
 * Drops one reference on the shared rget_cookie before returning and frees it
 * when the count reaches zero.
 *
 * @param pl  pipeline the response arrived on (unused)
 * @param pkt the request packet; its exdata points at the rget_cookie
 * @param err status of the response
 * @param arg the response structure, treated as an lcb_RESPGET
 */
static void
rget_callback(mc_PIPELINE *pl, mc_PACKET *pkt, lcb_error_t err, const void *arg)
{
rget_cookie *rck = (rget_cookie *)pkt->u_rdata.exdata;
lcb_RESPGET *resp = (void *)arg;
lcb_RESPCALLBACK callback;
lcb_t instance = rck->instance;
/* NOTE(review): the returned callback is invoked without a NULL check;
 * presumably lcb_find_callback() always yields a callable - confirm. */
callback = lcb_find_callback(instance, LCB_CALLBACK_GETREPLICA);
if (rck->strategy == LCB_REPLICA_SELECT || rck->strategy == LCB_REPLICA_ALL) {
/* SELECT has exactly one response, so it is always final. For ALL, the
 * response is final when this is the last outstanding reference. */
if (rck->strategy == LCB_REPLICA_SELECT || rck->remaining == 1) {
resp->rflags |= LCB_RESP_F_FINAL;
}
callback(instance, LCB_CALLBACK_GETREPLICA, (const lcb_RESPBASE *)resp);
} else {
mc_CMDQUEUE *cq = &instance->cmdq;
mc_PIPELINE *nextpl = NULL;
/* Advance r_cur until a replica maps to a valid pipeline index or the
 * replica count is reached. r_cur is incremented before each lookup. */
do {
int nextix;
rck->r_cur++;
nextix = lcbvb_vbreplica(cq->config, rck->vbucket, rck->r_cur);
if (nextix > -1 && nextix < (int)cq->npipelines) {
nextpl = cq->pipelines[nextix];
break;
}
} while (rck->r_cur < rck->r_max);
if (err == LCB_SUCCESS || rck->r_cur == rck->r_max || nextpl == NULL) {
/* Either we have a result or there is nowhere left to retry: deliver
 * the final response. remaining is forced to 1 so the decrement at the
 * end of this function releases the cookie. */
resp->rflags |= LCB_RESP_F_FINAL;
callback(instance, LCB_CALLBACK_GETREPLICA, (lcb_RESPBASE *)resp);
rck->remaining = 1;
} else if (err != LCB_SUCCESS) {
/* This condition is always true here, since err == LCB_SUCCESS is
 * handled by the branch above. Re-issue the request on the next
 * replica's pipeline without invoking the user callback.
 * NOTE(review): the result of mcreq_renew_packet() is used without a
 * NULL check - confirm it cannot fail. */
mc_PACKET *newpkt = mcreq_renew_packet(pkt);
newpkt->flags &= ~MCREQ_STATE_FLAGS;
mcreq_sched_add(nextpl, newpkt);
mcreq_sched_leave(cq, 1);
/* Set to 2 so that one reference is left after the decrement below. */
rck->remaining = 2;
}
}
/* Drop this invocation's reference; free the shared cookie at zero. */
if (!--rck->remaining) {
free(rck);
}
(void)pl;
}
/* Handler table installed on every rget_cookie (see base.procs assignment in
 * lcb_rget3). Initialised positionally: first the response handler, then the
 * destructor hook. */
static mc_REQDATAPROCS rget_procs = {
rget_callback,
rget_dtor
};
/**
 * Schedule a get-from-replica request for a single key.
 *
 * The set of replicas contacted depends on cmd->strategy:
 *  - LCB_REPLICA_SELECT: only the replica at cmd->index; it must map to a
 *    server.
 *  - LCB_REPLICA_ALL: every replica; each one must map to a server.
 *  - anything else: the first replica that maps to a server. Later replicas
 *    are tried by rget_callback() if that one fails.
 *
 * All packets scheduled by one call share a single heap-allocated
 * rget_cookie, which is released by rget_callback()/rget_dtor() once its
 * reference count drops to zero.
 *
 * @param instance the library handle
 * @param cookie   opaque pointer stored in the shared request data
 * @param cmd      the command; the key must be non-empty
 * @return LCB_EMPTY_KEY for an empty key, LCB_CLIENT_ETMPFAIL when no cluster
 *         configuration is present, LCB_NO_MATCHING_SERVER when the requested
 *         replica(s) cannot be resolved to a pipeline, LCB_CLIENT_ENOMEM on
 *         allocation failure, or LCB_SUCCESS once all packets are scheduled.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_rget3(lcb_t instance, const void *cookie, const lcb_CMDGETREPLICA *cmd)
{
    mc_CMDQUEUE *cq = &instance->cmdq;
    int vbid, ixtmp;
    protocol_binary_request_header req;
    unsigned r0, r1 = 0;
    rget_cookie *rck = NULL;
    lcb_error_t err = LCB_SUCCESS;

    if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) {
        return LCB_EMPTY_KEY;
    }
    if (!cq->config) {
        return LCB_CLIENT_ETMPFAIL;
    }
    if (!LCBT_NREPLICAS(instance)) {
        return LCB_NO_MATCHING_SERVER;
    }

    mcreq_map_key(cq, &cmd->key, &cmd->_hashkey, MCREQ_PKT_BASESIZE,
                  &vbid, &ixtmp);

    /* Work out the range [r0, r1) of replica indices to contact. The
     * do/while below always executes once, so r0 == r1 means "only r0". */
    if (cmd->strategy == LCB_REPLICA_SELECT) {
        r0 = r1 = cmd->index;
        if ((ixtmp = lcbvb_vbreplica(cq->config, vbid, r0)) < 0) {
            return LCB_NO_MATCHING_SERVER;
        }
    } else if (cmd->strategy == LCB_REPLICA_ALL) {
        unsigned ii;
        r0 = 0;
        r1 = LCBT_NREPLICAS(instance);
        /* Refuse up front unless every replica is currently mapped. */
        for (ii = 0; ii < LCBT_NREPLICAS(instance); ii++) {
            if ((ixtmp = lcbvb_vbreplica(cq->config, vbid, ii)) < 0) {
                return LCB_NO_MATCHING_SERVER;
            }
        }
    } else {
        /* Start with the first replica that is mapped to a server. */
        for (r0 = 0; r0 < LCBT_NREPLICAS(instance); r0++) {
            if ((ixtmp = lcbvb_vbreplica(cq->config, vbid, r0)) > -1) {
                r1 = r0;
                break;
            }
        }
        if (r0 == LCBT_NREPLICAS(instance)) {
            return LCB_NO_MATCHING_SERVER;
        }
    }

    if (r1 < r0 || r1 >= cq->npipelines) {
        return LCB_NO_MATCHING_SERVER;
    }

    rck = calloc(1, sizeof(*rck));
    if (rck == NULL) {
        /* Previously dereferenced unconditionally. */
        return LCB_CLIENT_ENOMEM;
    }
    rck->base.cookie = cookie;
    rck->base.start = gethrtime();
    rck->base.procs = &rget_procs;
    rck->strategy = cmd->strategy;
    rck->r_cur = r0;
    rck->r_max = LCBT_NREPLICAS(instance);
    rck->instance = instance;
    rck->vbucket = vbid;

    /* The header is identical for every replica except for the opaque,
     * which is filled in per packet inside the loop. */
    req.request.magic = PROTOCOL_BINARY_REQ;
    req.request.opcode = PROTOCOL_BINARY_CMD_GET_REPLICA;
    req.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
    req.request.vbucket = htons((lcb_uint16_t)vbid);
    req.request.cas = 0;
    req.request.extlen = 0;
    req.request.keylen = htons((lcb_uint16_t)cmd->key.contig.nbytes);
    req.request.bodylen = htonl((lcb_uint32_t)cmd->key.contig.nbytes);

    do {
        int curix = lcbvb_vbreplica(cq->config, vbid, r0);
        mc_PIPELINE *pl;
        mc_PACKET *pkt;

        /* Same bounds check rget_callback() applies before indexing the
         * pipeline array with a replica's server index. */
        if (curix < 0 || curix >= (int)cq->npipelines) {
            err = LCB_NO_MATCHING_SERVER;
            break;
        }

        pl = cq->pipelines[curix];
        pkt = mcreq_allocate_packet(pl);
        if (!pkt) {
            err = LCB_CLIENT_ENOMEM;
            break;
        }

        pkt->u_rdata.exdata = &rck->base;
        pkt->flags |= MCREQ_F_REQEXT;

        mcreq_reserve_key(pl, pkt, sizeof(req.bytes), &cmd->key);

        req.request.opaque = pkt->opaque;
        rck->remaining++;
        mcreq_write_hdr(pkt, &req);
        mcreq_sched_add(pl, pkt);
    } while (++r0 < r1);

    if (err != LCB_SUCCESS) {
        /* If no packet references the cookie yet, nobody else will free it.
         * Otherwise the already-scheduled packets still point at it, so it
         * must be left for their handlers to release. */
        if (rck->remaining == 0) {
            free(rck);
        }
        return err;
    }

    MAYBE_SCHEDLEAVE(instance);

    return LCB_SUCCESS;
}
/**
 * Legacy multi get-from-replica entry point.
 *
 * Converts each v1 command in `items` into an lcb_CMDGETREPLICA and schedules
 * it via lcb_rget3() inside a single scheduling context. On the first failure
 * the context is failed with lcb_sched_fail() and that error is returned.
 *
 * @param instance the library handle
 * @param cookie   opaque pointer passed through to every command
 * @param num      number of entries in `items`
 * @param items    array of pointers to legacy get-replica commands
 * @return the first error reported by lcb_rget3(), otherwise whatever
 *         SYNCMODE_INTERCEPT yields after the context is left.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_get_replica(lcb_t instance, const void *cookie, lcb_size_t num,
                const lcb_get_replica_cmd_t * const * items)
{
    unsigned idx;

    lcb_sched_enter(instance);

    for (idx = 0; idx < num; idx++) {
        const lcb_get_replica_cmd_t *legacy = items[idx];
        lcb_CMDGETREPLICA cmd;
        lcb_error_t rc;

        /* Start from an all-zero command and copy over the v1 fields. */
        memset(&cmd, 0, sizeof(cmd));
        cmd.key.contig.bytes = legacy->v.v1.key;
        cmd.key.contig.nbytes = legacy->v.v1.nkey;
        cmd._hashkey.contig.bytes = legacy->v.v1.hashkey;
        cmd._hashkey.contig.nbytes = legacy->v.v1.nhashkey;
        cmd.strategy = legacy->v.v1.strategy;
        cmd.index = legacy->v.v1.index;

        rc = lcb_rget3(instance, cookie, &cmd);
        if (rc != LCB_SUCCESS) {
            lcb_sched_fail(instance);
            return rc;
        }
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}