#include "internal.h"
/**
 * Cookie shared by every packet of a single broadcast command (stats,
 * verbosity, flush, versions).  One instance is heap-allocated per command
 * and attached to each scheduled packet through pkt->u_rdata.exdata.
 */
typedef struct {
/* Must remain the first member: the handlers in this file cast the
 * mc_REQDATAEX pointer stored in the packet straight back to bcast_cookie. */
mc_REQDATAEX base;
/* Callback type looked up by handle_bcast(); left zeroed for stats, whose
 * handler always uses LCB_CALLBACK_STATS. */
lcb_CALLBACKTYPE type;
/* Number of packets still referencing this cookie.  Incremented once per
 * scheduled packet; whoever decrements it to zero frees the cookie. */
int remaining;
} bcast_cookie;
/**
 * Release one packet's reference on its shared broadcast cookie.
 *
 * Registered as the second entry of both procs tables in this file.  The
 * cookie is freed once the last outstanding reference has been dropped.
 *
 * @param pkt packet whose u_rdata.exdata points at a bcast_cookie
 */
static void
refcnt_dtor_common(mc_PACKET *pkt)
{
    bcast_cookie *cookie = (bcast_cookie *)pkt->u_rdata.exdata;

    cookie->remaining -= 1;
    if (cookie->remaining == 0) {
        free(cookie);
    }
}
/**
 * Response handler for packets scheduled by lcb_stats3().
 *
 * A non-NULL @p arg is a single statistic entry: it is stamped with the
 * "host:port" of the originating server and the user cookie, then passed to
 * the STATS callback.  A NULL @p arg means this server is finished: the
 * cookie's outstanding count is decremented, and only when it reaches zero is
 * a client-generated FINAL response delivered and the cookie freed.
 *
 * @param pl  pipeline (server) that produced the response
 * @param req request packet carrying the shared cookie
 * @param err status placed into the final response
 * @param arg lcb_RESPSTATS for an entry, or NULL when the server is done
 */
static void
stats_handler(mc_PIPELINE *pl, mc_PACKET *req, lcb_error_t err, const void *arg)
{
    mc_SERVER *srv = (mc_SERVER *)pl;
    lcb_t instance = srv->instance;
    bcast_cookie *cookie = (bcast_cookie *)req->u_rdata.exdata;
    char endpoint[NI_MAXHOST + NI_MAXSERV + 4];
    lcb_RESPCALLBACK cb;
    lcb_RESPSTATS final_resp = { 0 };

    sprintf(endpoint, "%s:%s", mcserver_get_host(srv), mcserver_get_port(srv));
    cb = lcb_find_callback(instance, LCB_CALLBACK_STATS);

    if (arg != NULL) {
        /* Individual stat entry: annotate it and forward it as-is. */
        lcb_RESPSTATS *entry = (void *)arg;

        entry->server = endpoint;
        entry->cookie = (void *)cookie->base.cookie;
        cb(instance, LCB_CALLBACK_STATS, (lcb_RESPBASE *)entry);
        return;
    }

    /* This server is done; wait until every other server is done too. */
    cookie->remaining--;
    if (cookie->remaining != 0) {
        return;
    }

    final_resp.rc = err;
    final_resp.cookie = (void *)cookie->base.cookie;
    final_resp.rflags = LCB_RESP_F_CLIENTGEN|LCB_RESP_F_FINAL;
    cb(instance, LCB_CALLBACK_STATS, (lcb_RESPBASE *)&final_resp);
    free(cookie);
}
/*
 * Procedure table installed on the cookie by lcb_stats3() (base.procs).
 * NOTE(review): the slots are presumably "response handler" followed by
 * "destructor for packets dropped without a response" -- confirm against the
 * mc_REQDATAPROCS declaration.
 */
static mc_REQDATAPROCS stats_procs = {
stats_handler,
refcnt_dtor_common
};
/**
 * Schedule a STAT request on the servers of the current configuration.
 *
 * Without LCB_CMDSTATS_F_KV the command key (possibly empty) is sent verbatim
 * to every pipeline.  With LCB_CMDSTATS_F_KV the key is treated as a document
 * key: it is mapped to its vbucket, rewritten as "key <key> <vbid>" and sent
 * only to pipelines for which lcbvb_has_vbucket() reports that vbucket.
 *
 * All scheduled packets share one reference-counted bcast_cookie which is
 * released by stats_handler() or refcnt_dtor_common().
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param cmd      stats command (key and flags)
 * @return LCB_SUCCESS when at least one packet was scheduled;
 *         LCB_EINVAL for an empty, oversized or whitespace-containing KV key;
 *         LCB_CLIENT_ETMPFAIL when no configuration or vbucket is available;
 *         LCB_NOT_SUPPORTED when the configuration is not vbucket-distributed;
 *         LCB_CLIENT_ENOMEM on allocation failure;
 *         LCB_NO_MATCHING_SERVER when no pipeline received a packet.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_stats3(lcb_t instance, const void *cookie, const lcb_CMDSTATS * cmd)
{
    unsigned ii;
    int vbid = -1;
    char ksbuf[512] = { 0 };
    mc_CMDQUEUE *cq = &instance->cmdq;
    bcast_cookie *ckwrap = NULL;
    lcbvb_CONFIG *vbc = cq->config;
    const lcb_CONTIGBUF *kbuf_in = &cmd->key.contig;
    lcb_KEYBUF kbuf_out;

    kbuf_out.type = LCB_KV_COPY;

    if (cmd->cmdflags & LCB_CMDSTATS_F_KV) {
        /* Leave 30 bytes of headroom for the "key ", the separator, the
         * decimal vbucket id and the terminator added below. */
        if (kbuf_in->nbytes == 0 || kbuf_in->nbytes > sizeof(ksbuf) - 30) {
            return LCB_EINVAL;
        }
        if (vbc == NULL) {
            return LCB_CLIENT_ETMPFAIL;
        }
        if (lcbvb_get_distmode(vbc) != LCBVB_DIST_VBUCKET) {
            return LCB_NOT_SUPPORTED;
        }
        vbid = lcbvb_k2vb(vbc, kbuf_in->bytes, kbuf_in->nbytes);
        if (vbid < 0) {
            return LCB_CLIENT_ETMPFAIL;
        }
        for (ii = 0; ii < kbuf_in->nbytes; ii++) {
            /* isspace() requires a value representable as unsigned char;
             * a plain (possibly negative) char is undefined behaviour. */
            if (isspace(((const unsigned char *)kbuf_in->bytes)[ii])) {
                return LCB_EINVAL;
            }
        }
        /* Bounded by the length check above, so this cannot overflow. */
        sprintf(ksbuf, "key %.*s %d", (int)kbuf_in->nbytes,
                (const char *)kbuf_in->bytes, vbid);
        kbuf_out.contig.nbytes = strlen(ksbuf);
        kbuf_out.contig.bytes = ksbuf;
    } else {
        kbuf_out.contig = *kbuf_in;
    }

    ckwrap = calloc(1, sizeof(*ckwrap));
    if (ckwrap == NULL) {
        return LCB_CLIENT_ENOMEM;
    }
    ckwrap->base.cookie = cookie;
    ckwrap->base.start = gethrtime();
    ckwrap->base.procs = &stats_procs;

    for (ii = 0; ii < cq->npipelines; ii++) {
        mc_PACKET *pkt;
        mc_PIPELINE *pl = cq->pipelines[ii];
        protocol_binary_request_header hdr = { { 0 } };

        /* KV mode: skip servers that do not hold the key's vbucket. */
        if (vbid > -1 && lcbvb_has_vbucket(vbc, vbid, ii) == 0) {
            continue;
        }

        pkt = mcreq_allocate_packet(pl);
        if (!pkt) {
            /* Packets already scheduled still reference the cookie and are
             * expected to release it through refcnt_dtor_common(); only free
             * it here when nothing references it yet. */
            if (ckwrap->remaining == 0) {
                free(ckwrap);
            }
            return LCB_CLIENT_ENOMEM;
        }

        hdr.request.opcode = PROTOCOL_BINARY_CMD_STAT;
        hdr.request.magic = PROTOCOL_BINARY_REQ;

        if (cmd->key.contig.nbytes) {
            mcreq_reserve_key(pl, pkt, MCREQ_PKT_BASESIZE, &kbuf_out);
            /* Host-to-network conversion for the wire header. */
            hdr.request.keylen = htons((lcb_U16)kbuf_out.contig.nbytes);
            hdr.request.bodylen = htonl((lcb_U32)kbuf_out.contig.nbytes);
        } else {
            mcreq_reserve_header(pl, pkt, MCREQ_PKT_BASESIZE);
        }

        pkt->u_rdata.exdata = &ckwrap->base;
        pkt->flags |= MCREQ_F_REQEXT;
        ckwrap->remaining++;

        hdr.request.opaque = pkt->opaque;
        memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes));
        mcreq_sched_add(pl, pkt);
    }

    /* Covers both "no pipelines at all" and "every pipeline was skipped by
     * the vbucket filter"; in either case no response would ever arrive. */
    if (ckwrap->remaining == 0) {
        free(ckwrap);
        return LCB_NO_MATCHING_SERVER;
    }

    MAYBE_SCHEDLEAVE(instance);
    return LCB_SUCCESS;
}
/**
 * Response handler shared by the verbosity, flush and versions broadcasts.
 *
 * Every invocation delivers one per-server response to the callback
 * registered for the cookie's type: the server-supplied one when @p arg is
 * non-NULL, otherwise a zeroed client-generated one.  After the last server
 * has reported, a second, FINAL client-generated response with a NULL server
 * is delivered and the cookie is freed.
 *
 * @param pipeline pipeline (server) the response belongs to
 * @param req      request packet carrying the shared cookie
 * @param err      status stored in the delivered response(s)
 * @param arg      server response structure, or NULL if none is available
 */
static void
handle_bcast(mc_PIPELINE *pipeline, mc_PACKET *req, lcb_error_t err,
             const void *arg)
{
    union {
        lcb_RESPSERVERBASE *base;
        lcb_RESPVERBOSITY *verbosity;
        lcb_RESPMCVERSION *version;
        lcb_RESPFLUSH *flush;
    } cur;
    union {
        lcb_RESPSERVERBASE base;
        lcb_RESPVERBOSITY verbosity;
        lcb_RESPMCVERSION version;
        lcb_RESPFLUSH flush;
    } blank;
    mc_SERVER *srv = (mc_SERVER *)pipeline;
    bcast_cookie *cookie = (bcast_cookie *)req->u_rdata.exdata;
    char endpoint[NI_MAXHOST + NI_MAXSERV + 4];
    lcb_RESPCALLBACK cb;

    /* The union is sized for the largest response type, so zeroing it gives
     * a valid empty response whichever callback type is in use. */
    memset(&blank, 0, sizeof(blank));

    if (arg == NULL) {
        blank.base.rflags = LCB_RESP_F_CLIENTGEN;
        cur.base = &blank.base;
    } else {
        cur.base = (void *)arg;
    }

    sprintf(endpoint, "%s:%s", mcserver_get_host(srv), mcserver_get_port(srv));
    cur.base->server = endpoint;
    cur.base->cookie = (void *)cookie->base.cookie;
    cur.base->rc = err;

    cb = lcb_find_callback(srv->instance, cookie->type);
    cb(srv->instance, cookie->type, (lcb_RESPBASE *)cur.base);

    cookie->remaining--;
    if (cookie->remaining != 0) {
        return;
    }

    /* Last server: reset the scratch response (it may have been used above)
     * and emit the terminating notification. */
    blank.base.cookie = (void *)cookie->base.cookie;
    blank.base.rflags = LCB_RESP_F_CLIENTGEN|LCB_RESP_F_FINAL;
    blank.base.rc = err;
    blank.base.server = NULL;
    cb(srv->instance, cookie->type, (lcb_RESPBASE *)&blank.base);
    free(cookie);
}
/*
 * Procedure table installed on the cookie by pkt_bcast_simple() and
 * lcb_server_verbosity3() (base.procs).
 * NOTE(review): the slots are presumably "response handler" followed by
 * "destructor for packets dropped without a response" -- confirm against the
 * mc_REQDATAPROCS declaration.
 */
static mc_REQDATAPROCS bcast_procs = {
handle_bcast,
refcnt_dtor_common
};
/**
 * Broadcast a body-less request (FLUSH or VERSION) to every pipeline.
 *
 * All packets share one reference-counted bcast_cookie whose responses are
 * processed by handle_bcast().
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param type     LCB_CALLBACK_FLUSH or LCB_CALLBACK_VERSIONS
 * @return LCB_SUCCESS when packets were scheduled;
 *         LCB_CLIENT_ETMPFAIL when there is no configuration yet;
 *         LCB_EINVAL for an unsupported @p type (after asserting in debug
 *         builds);
 *         LCB_CLIENT_ENOMEM on allocation failure;
 *         LCB_NO_MATCHING_SERVER when there are no pipelines.
 */
static lcb_error_t
pkt_bcast_simple(lcb_t instance, const void *cookie, lcb_CALLBACKTYPE type)
{
    mc_CMDQUEUE *cq = &instance->cmdq;
    unsigned ii;
    bcast_cookie *ckwrap;
    uint8_t opcode;

    if (!cq->config) {
        return LCB_CLIENT_ETMPFAIL;
    }

    /* Resolve the opcode once, before anything is allocated, so that an
     * unsupported type can never put a packet with a bogus opcode on the
     * wire or strand an allocated packet. */
    if (type == LCB_CALLBACK_FLUSH) {
        opcode = PROTOCOL_BINARY_CMD_FLUSH;
    } else if (type == LCB_CALLBACK_VERSIONS) {
        opcode = PROTOCOL_BINARY_CMD_VERSION;
    } else {
        fprintf(stderr, "pkt_bcast_simple passed unknown type %u\n", type);
        assert(0);
        return LCB_EINVAL;
    }

    ckwrap = calloc(1, sizeof(*ckwrap));
    if (ckwrap == NULL) {
        return LCB_CLIENT_ENOMEM;
    }
    ckwrap->base.cookie = cookie;
    ckwrap->base.start = gethrtime();
    ckwrap->base.procs = &bcast_procs;
    ckwrap->type = type;

    for (ii = 0; ii < cq->npipelines; ii++) {
        mc_PIPELINE *pl = cq->pipelines[ii];
        mc_PACKET *pkt = mcreq_allocate_packet(pl);
        protocol_binary_request_header hdr;

        memset(&hdr, 0, sizeof(hdr));

        if (!pkt) {
            /* Packets already scheduled still reference the cookie and are
             * expected to release it through refcnt_dtor_common(); only free
             * it here when nothing references it yet. */
            if (ckwrap->remaining == 0) {
                free(ckwrap);
            }
            return LCB_CLIENT_ENOMEM;
        }

        pkt->u_rdata.exdata = &ckwrap->base;
        pkt->flags |= MCREQ_F_REQEXT;

        hdr.request.magic = PROTOCOL_BINARY_REQ;
        hdr.request.opaque = pkt->opaque;
        hdr.request.opcode = opcode;

        mcreq_reserve_header(pl, pkt, MCREQ_PKT_BASESIZE);
        memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof(hdr.bytes));
        mcreq_sched_add(pl, pkt);
        ckwrap->remaining++;
    }

    /* ii is still zero only when there were no pipelines to iterate. */
    if (ii == 0) {
        free(ckwrap);
        return LCB_NO_MATCHING_SERVER;
    }

    MAYBE_SCHEDLEAVE(instance);
    return LCB_SUCCESS;
}
/**
 * Schedule a VERSION request on every server.
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param cmd      unused
 * @return whatever pkt_bcast_simple() returns for LCB_CALLBACK_VERSIONS
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_server_versions3(lcb_t instance, const void *cookie, const lcb_CMDBASE * cmd)
{
    lcb_error_t rc;

    (void)cmd;
    rc = pkt_bcast_simple(instance, cookie, LCB_CALLBACK_VERSIONS);
    return rc;
}
/**
 * Schedule a FLUSH request on every server.
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param cmd      unused
 * @return whatever pkt_bcast_simple() returns for LCB_CALLBACK_FLUSH
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_flush3(lcb_t instance, const void *cookie, const lcb_CMDFLUSH *cmd)
{
    lcb_error_t rc;

    (void)cmd;
    rc = pkt_bcast_simple(instance, cookie, LCB_CALLBACK_FLUSH);
    return rc;
}
/**
 * Schedule a VERBOSITY request on one or all servers.
 *
 * When cmd->server is set, only pipelines whose "host:port" string starts
 * with that value receive the request (the comparison covers just the first
 * strlen(cmd->server) bytes, i.e. it is a prefix match).  All packets share
 * one reference-counted bcast_cookie processed by handle_bcast().
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param cmd      verbosity level and optional server filter
 * @return LCB_SUCCESS when at least one packet was scheduled;
 *         LCB_CLIENT_ETMPFAIL when there is no configuration yet;
 *         LCB_CLIENT_ENOMEM on allocation failure;
 *         LCB_NO_MATCHING_SERVER when no pipeline matched.
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_server_verbosity3(lcb_t instance, const void *cookie,
                      const lcb_CMDVERBOSITY *cmd)
{
    mc_CMDQUEUE *cq = &instance->cmdq;
    unsigned ii;
    bcast_cookie *ckwrap;
    uint32_t level;
    size_t filter_len;

    if (!cq->config) {
        return LCB_CLIENT_ETMPFAIL;
    }

    /* The wire level and the filter length do not depend on the pipeline,
     * so compute them once.  Unrecognised levels map to 0. */
    if (cmd->level == LCB_VERBOSITY_DETAIL) {
        level = 3;
    } else if (cmd->level == LCB_VERBOSITY_DEBUG) {
        level = 2;
    } else if (cmd->level == LCB_VERBOSITY_INFO) {
        level = 1;
    } else {
        level = 0;
    }
    filter_len = cmd->server ? strlen(cmd->server) : 0;

    ckwrap = calloc(1, sizeof(*ckwrap));
    if (ckwrap == NULL) {
        return LCB_CLIENT_ENOMEM;
    }
    ckwrap->base.cookie = cookie;
    ckwrap->base.start = gethrtime();
    ckwrap->base.procs = &bcast_procs;
    ckwrap->type = LCB_CALLBACK_VERBOSITY;

    for (ii = 0; ii < cq->npipelines; ii++) {
        mc_PACKET *pkt;
        mc_PIPELINE *pl = cq->pipelines[ii];
        mc_SERVER *server = (mc_SERVER *)pl;
        char cmpbuf[NI_MAXHOST + NI_MAXSERV + 4];
        protocol_binary_request_verbosity vcmd;
        protocol_binary_request_header *hdr = &vcmd.message.header;

        sprintf(cmpbuf, "%s:%s",
                mcserver_get_host(server), mcserver_get_port(server));
        if (cmd->server && strncmp(cmpbuf, cmd->server, filter_len)) {
            continue;
        }

        pkt = mcreq_allocate_packet(pl);
        if (!pkt) {
            /* Packets already scheduled still reference the cookie and are
             * expected to release it through refcnt_dtor_common(); only free
             * it here when nothing references it yet. */
            if (ckwrap->remaining == 0) {
                free(ckwrap);
            }
            return LCB_CLIENT_ENOMEM;
        }

        pkt->u_rdata.exdata = &ckwrap->base;
        pkt->flags |= MCREQ_F_REQEXT;

        /* Header plus the 4-byte level carried in the extras section. */
        mcreq_reserve_header(pl, pkt, MCREQ_PKT_BASESIZE + 4);
        hdr->request.magic = PROTOCOL_BINARY_REQ;
        hdr->request.opcode = PROTOCOL_BINARY_CMD_VERBOSITY;
        hdr->request.datatype = PROTOCOL_BINARY_RAW_BYTES;
        hdr->request.cas = 0;
        hdr->request.vbucket = 0;
        hdr->request.opaque = pkt->opaque;
        hdr->request.extlen = 4;
        hdr->request.keylen = 0;
        hdr->request.bodylen = htonl((uint32_t)hdr->request.extlen);
        vcmd.message.body.level = htonl((uint32_t)level);

        memcpy(SPAN_BUFFER(&pkt->kh_span), vcmd.bytes, sizeof(vcmd.bytes));
        mcreq_sched_add(pl, pkt);
        ckwrap->remaining++;
    }

    if (!ckwrap->remaining) {
        free(ckwrap);
        return LCB_NO_MATCHING_SERVER;
    }

    MAYBE_SCHEDLEAVE(instance);
    return LCB_SUCCESS;
}
/**
 * Legacy batch entry point for statistics requests.
 *
 * Each v0 command is converted into an lcb_CMDSTATS (name/nname become the
 * key) and scheduled through lcb_stats3() inside one scheduling context.  The
 * first failure aborts the context via lcb_sched_fail() and is returned.
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param num      number of entries in @p items
 * @param items    legacy command structures
 * @return the first lcb_stats3() error; otherwise the value produced by
 *         SYNCMODE_INTERCEPT (macro defined elsewhere; presumably it supplies
 *         the return statement -- confirm)
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_server_stats(lcb_t instance, const void *cookie, lcb_size_t num,
                 const lcb_server_stats_cmd_t * const * items)
{
    unsigned idx = 0;

    lcb_sched_enter(instance);

    while (idx < num) {
        const lcb_server_stats_cmd_t *legacy = items[idx];
        lcb_CMDSTATS converted;
        lcb_error_t rc;

        memset(&converted, 0, sizeof(converted));
        converted.key.contig.nbytes = legacy->v.v0.nname;
        converted.key.contig.bytes = legacy->v.v0.name;

        rc = lcb_stats3(instance, cookie, &converted);
        if (rc != LCB_SUCCESS) {
            lcb_sched_fail(instance);
            return rc;
        }
        idx++;
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}
/**
 * Legacy batch entry point for verbosity requests.
 *
 * Each v0 command is converted into an lcb_CMDVERBOSITY (level and server are
 * copied) and scheduled through lcb_server_verbosity3() inside one scheduling
 * context.  The first failure aborts the context via lcb_sched_fail() and is
 * returned.
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param num      number of entries in @p items
 * @param items    legacy command structures
 * @return the first lcb_server_verbosity3() error; otherwise the value
 *         produced by SYNCMODE_INTERCEPT (macro defined elsewhere; presumably
 *         it supplies the return statement -- confirm)
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_set_verbosity(lcb_t instance, const void *cookie, lcb_size_t num,
                  const lcb_verbosity_cmd_t * const * items)
{
    unsigned idx = 0;

    lcb_sched_enter(instance);

    while (idx < num) {
        const lcb_verbosity_cmd_t *legacy = items[idx];
        lcb_CMDVERBOSITY converted;
        lcb_error_t rc;

        memset(&converted, 0, sizeof(converted));
        converted.server = legacy->v.v0.server;
        converted.level = legacy->v.v0.level;

        rc = lcb_server_verbosity3(instance, cookie, &converted);
        if (rc != LCB_SUCCESS) {
            lcb_sched_fail(instance);
            return rc;
        }
        idx++;
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}
/**
 * Legacy batch entry point for flush requests.
 *
 * The command structures carry nothing that is used here; lcb_flush3() is
 * simply invoked @p num times inside one scheduling context.  The first
 * failure aborts the context via lcb_sched_fail() and is returned.
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param num      number of flush broadcasts to schedule
 * @param items    unused
 * @return the first lcb_flush3() error; otherwise the value produced by
 *         SYNCMODE_INTERCEPT (macro defined elsewhere; presumably it supplies
 *         the return statement -- confirm)
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_flush(lcb_t instance, const void *cookie, lcb_size_t num,
          const lcb_flush_cmd_t * const * items)
{
    unsigned idx = 0;

    (void)items;
    lcb_sched_enter(instance);

    while (idx < num) {
        lcb_error_t status = lcb_flush3(instance, cookie, NULL);

        if (status != LCB_SUCCESS) {
            lcb_sched_fail(instance);
            return status;
        }
        idx++;
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}
/**
 * Legacy batch entry point for server version requests.
 *
 * The command structures carry nothing that is used here;
 * lcb_server_versions3() is simply invoked @p num times inside one scheduling
 * context.  The first failure aborts the context via lcb_sched_fail() and is
 * returned.
 *
 * @param instance library handle
 * @param cookie   opaque user pointer handed back in every response
 * @param num      number of version broadcasts to schedule
 * @param items    unused
 * @return the first lcb_server_versions3() error; otherwise the value
 *         produced by SYNCMODE_INTERCEPT (macro defined elsewhere; presumably
 *         it supplies the return statement -- confirm)
 */
LIBCOUCHBASE_API
lcb_error_t
lcb_server_versions(lcb_t instance, const void *cookie, lcb_size_t num,
                    const lcb_server_version_cmd_t * const * items)
{
    unsigned idx = 0;

    (void)items;
    lcb_sched_enter(instance);

    while (idx < num) {
        lcb_error_t status = lcb_server_versions3(instance, cookie, NULL);

        if (status != LCB_SUCCESS) {
            lcb_sched_fail(instance);
            return status;
        }
        idx++;
    }

    lcb_sched_leave(instance);
    SYNCMODE_INTERCEPT(instance)
}