#include "internal.h"
#include <vector>
#include <string>
#include <include/libcouchbase/subdoc.h>
static lcb_size_t
get_value_size(mc_PACKET *packet)
{
if (packet->flags & MCREQ_F_HASVALUE) {
if (packet->flags & MCREQ_F_VALUE_IOV) {
return packet->u_value.multi.total_length;
} else {
return packet->u_value.single.size;
}
} else {
return 0;
}
}
namespace SubdocCmdTraits {
enum Options {
EMPTY_PATH = 1<<0,
ALLOW_EXPIRY = 1<<1,
HAS_VALUE = 1<<2,
ALLOW_MKDIRP = 1<<3,
IS_LOOKUP = 1<<4
};
struct Traits {
const unsigned allow_empty_path;
const unsigned allow_expiry;
const unsigned has_value;
const unsigned allow_mkdir_p;
const unsigned is_lookup;
const uint8_t opcode;
inline bool valid() const {
return opcode != 0;
}
inline unsigned mode() const {
return is_lookup ? LCB_SDMULTI_MODE_LOOKUP : LCB_SDMULTI_MODE_MUTATE;
}
inline Traits(uint8_t op, unsigned options) :
allow_empty_path(options & EMPTY_PATH),
allow_expiry(options & ALLOW_EXPIRY),
has_value(options & HAS_VALUE),
allow_mkdir_p(options & ALLOW_MKDIRP),
is_lookup(options & IS_LOOKUP),
opcode(op) {}
};
static const Traits
Get(PROTOCOL_BINARY_CMD_SUBDOC_GET, IS_LOOKUP);
static const Traits
Exists(PROTOCOL_BINARY_CMD_SUBDOC_EXISTS, IS_LOOKUP);
static const Traits
GetCount(PROTOCOL_BINARY_CMD_SUBDOC_GET_COUNT, IS_LOOKUP|EMPTY_PATH);
static const Traits
DictAdd(PROTOCOL_BINARY_CMD_SUBDOC_DICT_ADD, ALLOW_EXPIRY|HAS_VALUE);
static const Traits
DictUpsert(PROTOCOL_BINARY_CMD_SUBDOC_DICT_UPSERT,
ALLOW_EXPIRY|HAS_VALUE|ALLOW_MKDIRP);
static const Traits
Remove(PROTOCOL_BINARY_CMD_SUBDOC_DELETE, ALLOW_EXPIRY);
static const Traits
ArrayInsert(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_INSERT,
ALLOW_EXPIRY|HAS_VALUE);
static const Traits
Replace(PROTOCOL_BINARY_CMD_SUBDOC_REPLACE, ALLOW_EXPIRY|HAS_VALUE);
static const Traits
ArrayAddFirst(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_FIRST,
ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP);
static const Traits
ArrayAddLast(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_PUSH_LAST,
ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP);
static const Traits
ArrayAddUnique(PROTOCOL_BINARY_CMD_SUBDOC_ARRAY_ADD_UNIQUE,
ALLOW_EXPIRY|HAS_VALUE|EMPTY_PATH|ALLOW_MKDIRP);
static const Traits
Counter(PROTOCOL_BINARY_CMD_SUBDOC_COUNTER, ALLOW_EXPIRY|HAS_VALUE|ALLOW_MKDIRP);
static const Traits
Invalid(0, 0);
const Traits&
find(unsigned mode)
{
switch (mode) {
case LCB_SDCMD_REPLACE:
return Replace;
case LCB_SDCMD_DICT_ADD:
return DictAdd;
case LCB_SDCMD_DICT_UPSERT:
return DictUpsert;
case LCB_SDCMD_ARRAY_ADD_FIRST:
return ArrayAddFirst;
case LCB_SDCMD_ARRAY_ADD_LAST:
return ArrayAddLast;
case LCB_SDCMD_ARRAY_ADD_UNIQUE:
return ArrayAddUnique;
case LCB_SDCMD_ARRAY_INSERT:
return ArrayInsert;
case LCB_SDCMD_GET:
return Get;
case LCB_SDCMD_EXISTS:
return Exists;
case LCB_SDCMD_GET_COUNT:
return GetCount;
case LCB_SDCMD_REMOVE:
return Remove;
case LCB_SDCMD_COUNTER:
return Counter;
default:
return Invalid;
}
}
}
static size_t
get_valbuf_size(const lcb_VALBUF& vb)
{
if (vb.vtype == LCB_KV_COPY || vb.vtype == LCB_KV_CONTIG) {
return vb.u_buf.contig.nbytes;
} else {
if (vb.u_buf.multi.total_length) {
return vb.u_buf.multi.total_length;
} else {
size_t tmp = 0;
for (size_t ii = 0; ii < vb.u_buf.multi.niov; ++ii) {
tmp += vb.u_buf.multi.iov[ii].iov_len;
}
return tmp;
}
}
}
struct MultiBuilder {
MultiBuilder(const lcb_CMDSUBDOC *cmd_)
: cmd(cmd_), payload_size(0), mode(0) {
size_t ebufsz = is_lookup() ? cmd->nspecs * 4 : cmd->nspecs * 8;
extra_body = new char[ebufsz];
bodysz = 0;
}
~MultiBuilder() {
if (extra_body != NULL) {
delete[] extra_body;
}
}
inline MultiBuilder(const MultiBuilder&);
const lcb_CMDSUBDOC *cmd;
std::vector<lcb_IOV> iovs;
char *extra_body;
size_t bodysz;
size_t payload_size;
unsigned mode;
bool is_lookup() const {
return mode == LCB_SDMULTI_MODE_LOOKUP;
}
bool is_mutate() const {
return mode == LCB_SDMULTI_MODE_MUTATE;
}
void maybe_setmode(const SubdocCmdTraits::Traits& t) {
if (mode == 0) {
mode = t.mode();
}
}
template <typename T> void add_field(T itm, size_t len) {
const char *b = reinterpret_cast<const char *>(&itm);
memcpy(extra_body + bodysz, b, len);
bodysz += len;
}
const char *extra_mark() const {
return extra_body + bodysz;
}
void add_extras_iov(const char *last_begin) {
const char *p_end = extra_mark();
add_iov(last_begin, p_end - last_begin);
}
void add_iov(const void *b, size_t n) {
if (!n) {
return;
}
lcb_IOV iov;
iov.iov_base = const_cast<void*>(b);
iov.iov_len = n;
iovs.push_back(iov);
payload_size += n;
}
void add_iov(const lcb_VALBUF& vb) {
if (vb.vtype == LCB_KV_CONTIG || vb.vtype == LCB_KV_COPY) {
add_iov(vb.u_buf.contig.bytes, vb.u_buf.contig.nbytes);
} else {
for (size_t ii = 0; ii < vb.u_buf.contig.nbytes; ++ii) {
const lcb_IOV& iov = vb.u_buf.multi.iov[ii];
if (!iov.iov_len) {
continue; }
payload_size += iov.iov_len;
iovs.push_back(iov);
}
}
}
inline lcb_error_t add_spec(const lcb_SDSPEC *);
};
lcb_error_t
MultiBuilder::add_spec(const lcb_SDSPEC *spec)
{
const SubdocCmdTraits::Traits& trait = SubdocCmdTraits::find(spec->sdcmd);
if (!trait.valid()) {
return LCB_UNKNOWN_SDCMD;
}
maybe_setmode(trait);
if (trait.mode() != mode) {
return LCB_OPTIONS_CONFLICT;
}
const char *p_begin = extra_mark();
add_field(trait.opcode, 1);
uint8_t sdflags = 0;
if (spec->options & LCB_SDSPEC_F_MKINTERMEDIATES) {
sdflags = SUBDOC_FLAG_MKDIR_P;
}
if (spec->options & LCB_SDSPEC_F_MKDOCUMENT) {
sdflags |= SUBDOC_FLAG_MKDOC;
}
add_field(sdflags, 1);
uint16_t npath = static_cast<uint16_t>(spec->path.contig.nbytes);
if (!npath && !trait.allow_empty_path) {
return LCB_EMPTY_PATH;
}
add_field(static_cast<uint16_t>(htons(npath)), 2);
uint32_t vsize = 0;
if (is_mutate()) {
vsize = get_valbuf_size(spec->value);
add_field(static_cast<uint32_t>(htonl(vsize)), 4);
}
add_extras_iov(p_begin);
add_iov(spec->path.contig.bytes, spec->path.contig.nbytes);
if (vsize) {
add_iov(spec->value);
}
return LCB_SUCCESS;
}
static lcb_error_t
sd3_single(lcb_t instance, const void *cookie, const lcb_CMDSUBDOC *cmd)
{
const lcb_SDSPEC *spec = cmd->specs;
const SubdocCmdTraits::Traits& traits = SubdocCmdTraits::find(spec->sdcmd);
lcb_error_t rc;
if (cmd->error_index) {
*cmd->error_index = 0;
}
if (!traits.valid()) {
return LCB_UNKNOWN_SDCMD;
}
if (cmd->multimode != 0 && cmd->multimode != traits.mode()) {
return LCB_OPTIONS_CONFLICT;
}
if (LCB_KEYBUF_IS_EMPTY(&cmd->key)) {
return LCB_EMPTY_KEY;
}
if (LCB_KEYBUF_IS_EMPTY(&spec->path) && !traits.allow_empty_path) {
return LCB_EMPTY_PATH;
}
lcb_VALBUF valbuf;
const lcb_VALBUF *valbuf_p = &valbuf;
lcb_IOV tmpiov[2];
lcb_FRAGBUF *fbuf = &valbuf.u_buf.multi;
valbuf.vtype = LCB_KV_IOVCOPY;
fbuf->iov = tmpiov;
fbuf->niov = 1;
fbuf->total_length = 0;
tmpiov[0].iov_base = const_cast<void*>(spec->path.contig.bytes);
tmpiov[0].iov_len = spec->path.contig.nbytes;
if (traits.has_value) {
if (spec->value.vtype == LCB_KV_COPY) {
fbuf->niov = 2;
tmpiov[1].iov_base = (void *)spec->value.u_buf.contig.bytes;
tmpiov[1].iov_len = spec->value.u_buf.contig.nbytes;
} else {
valbuf_p = &spec->value;
}
}
uint8_t extlen = 3;
uint32_t exptime = 0;
if (cmd->exptime) {
if (!traits.allow_expiry) {
return LCB_OPTIONS_CONFLICT;
}
exptime = cmd->exptime;
extlen = 7;
}
protocol_binary_request_subdocument request;
protocol_binary_request_header *hdr = &request.message.header;
mc_PACKET *packet;
mc_PIPELINE *pipeline;
rc = mcreq_basic_packet(&instance->cmdq,
(const lcb_CMDBASE*)cmd,
hdr, extlen, &packet, &pipeline, MCREQ_BASICPACKET_F_FALLBACKOK);
if (rc != LCB_SUCCESS) {
return rc;
}
rc = mcreq_reserve_value(pipeline, packet, valbuf_p);
if (rc != LCB_SUCCESS) {
mcreq_wipe_packet(pipeline, packet);
mcreq_release_packet(pipeline, packet);
return rc;
}
MCREQ_PKT_RDATA(packet)->cookie = cookie;
MCREQ_PKT_RDATA(packet)->start = gethrtime();
hdr->request.magic = PROTOCOL_BINARY_REQ;
hdr->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
hdr->request.extlen = packet->extlen;
hdr->request.opaque = packet->opaque;
hdr->request.cas = lcb_htonll(cmd->cas);
hdr->request.bodylen = htonl(hdr->request.extlen +
ntohs(hdr->request.keylen) + get_value_size(packet));
request.message.extras.pathlen = htons(spec->path.contig.nbytes);
request.message.extras.subdoc_flags = 0;
if (spec->options & LCB_SDSPEC_F_MKINTERMEDIATES) {
request.message.extras.subdoc_flags |= SUBDOC_FLAG_MKDIR_P;
}
if (spec->options & LCB_SDSPEC_F_MKDOCUMENT) {
request.message.extras.subdoc_flags |= SUBDOC_FLAG_MKDOC;
}
hdr->request.opcode = traits.opcode;
memcpy(SPAN_BUFFER(&packet->kh_span), request.bytes, sizeof request.bytes);
if (exptime) {
exptime = htonl(exptime);
memcpy(SPAN_BUFFER(&packet->kh_span) + sizeof request.bytes, &exptime, 4);
}
LCB_SCHED_ADD(instance, pipeline, packet);
return LCB_SUCCESS;
}
LIBCOUCHBASE_API
lcb_error_t
lcb_subdoc3(lcb_t instance, const void *cookie, const lcb_CMDSUBDOC *cmd)
{
if (cmd->nspecs == 0) {
return LCB_ENO_COMMANDS;
}
if (cmd->nspecs == 1) {
return sd3_single(instance, cookie, cmd);
}
uint32_t exp = cmd->exptime;
lcb_error_t rc = LCB_SUCCESS;
MultiBuilder ctx(cmd);
if (cmd->error_index) {
*cmd->error_index = -1;
}
if (exp && !ctx.is_mutate()) {
return LCB_OPTIONS_CONFLICT;
}
for (size_t ii = 0; ii < cmd->nspecs; ++ii) {
if (cmd->error_index) {
*cmd->error_index = ii;
}
rc = ctx.add_spec(cmd->specs + ii);
if (rc != LCB_SUCCESS) {
return rc;
}
}
mc_PIPELINE *pl;
mc_PACKET *pkt;
uint8_t extlen = exp ? 4 : 0;
protocol_binary_request_header hdr;
if (cmd->error_index) {
*cmd->error_index = -1;
}
rc = mcreq_basic_packet(
&instance->cmdq, reinterpret_cast<const lcb_CMDBASE*>(cmd),
&hdr, extlen, &pkt, &pl, MCREQ_BASICPACKET_F_FALLBACKOK);
if (rc != LCB_SUCCESS) {
return rc;
}
lcb_VALBUF vb = { LCB_KV_IOVCOPY };
vb.u_buf.multi.iov = &ctx.iovs[0];
vb.u_buf.multi.niov = ctx.iovs.size();
vb.u_buf.multi.total_length = ctx.payload_size;
rc = mcreq_reserve_value(pl, pkt, &vb);
if (rc != LCB_SUCCESS) {
mcreq_wipe_packet(pl, pkt);
mcreq_release_packet(pl, pkt);
return rc;
}
hdr.request.magic = PROTOCOL_BINARY_REQ;
if (ctx.is_lookup()) {
hdr.request.opcode = PROTOCOL_BINARY_CMD_SUBDOC_MULTI_LOOKUP;
} else {
hdr.request.opcode = PROTOCOL_BINARY_CMD_SUBDOC_MULTI_MUTATION;
}
hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
hdr.request.extlen = pkt->extlen;
hdr.request.opaque = pkt->opaque;
hdr.request.cas = lcb_htonll(cmd->cas);
hdr.request.bodylen = htonl(hdr.request.extlen +
ntohs(hdr.request.keylen) + ctx.payload_size);
memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof hdr.bytes);
if (exp) {
exp = htonl(exp);
memcpy(SPAN_BUFFER(&pkt->kh_span) + 24, &exp, 4);
}
MCREQ_PKT_RDATA(pkt)->cookie = cookie;
MCREQ_PKT_RDATA(pkt)->start = gethrtime();
LCB_SCHED_ADD(instance, pl, pkt);
return LCB_SUCCESS;
}