#include "internal.h"
lcb_error_t
lcb_observe_seqno3(lcb_t instance, const void *cookie, const lcb_CMDOBSEQNO *cmd)
{
mc_PACKET *pkt;
mc_SERVER *server;
mc_PIPELINE *pl;
protocol_binary_request_header hdr;
lcb_U64 uuid;
if (cmd->server_index > LCBT_NSERVERS(instance)) {
return LCB_EINVAL;
}
server = LCBT_GET_SERVER(instance, cmd->server_index);
pl = &server->pipeline;
pkt = mcreq_allocate_packet(pl);
mcreq_reserve_header(pl, pkt, MCREQ_PKT_BASESIZE);
mcreq_reserve_value2(pl, pkt, 8);
MCREQ_PKT_RDATA(pkt)->cookie = cookie;
MCREQ_PKT_RDATA(pkt)->start = gethrtime();
if (cmd->cmdflags & LCB_CMD_F_INTERNAL_CALLBACK) {
pkt->flags |= MCREQ_F_PRIVCALLBACK;
}
memset(&hdr, 0, sizeof hdr);
hdr.request.opaque = pkt->opaque;
hdr.request.magic = PROTOCOL_BINARY_REQ;
hdr.request.opcode = PROTOCOL_BINARY_CMD_OBSERVE_SEQNO;
hdr.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
hdr.request.bodylen = htonl((lcb_U32)8);
hdr.request.vbucket = htons(cmd->vbid);
memcpy(SPAN_BUFFER(&pkt->kh_span), hdr.bytes, sizeof hdr.bytes);
uuid = lcb_htonll(cmd->uuid);
memcpy(SPAN_BUFFER(&pkt->u_value.single), &uuid, sizeof uuid);
LCB_SCHED_ADD(instance, pl, pkt);
return LCB_SUCCESS;
}
const lcb_MUTATION_TOKEN *
lcb_get_mutation_token(lcb_t instance, const lcb_KEYBUF *kb, lcb_error_t *errp)
{
int vbix, srvix;
lcb_error_t err_s;
const lcb_MUTATION_TOKEN *existing;
if (!errp) {
errp = &err_s;
}
if (!LCBT_VBCONFIG(instance)) {
*errp = LCB_CLIENT_ETMPFAIL;
return NULL;
}
if (LCBT_VBCONFIG(instance)->dtype != LCBVB_DIST_VBUCKET) {
*errp = LCB_NOT_SUPPORTED;
return NULL;
}
if (!LCBT_SETTING(instance, fetch_mutation_tokens)) {
*errp = LCB_NOT_SUPPORTED;
return NULL;
}
if (!instance->dcpinfo) {
*errp = LCB_DURABILITY_NO_MUTATION_TOKENS;
return NULL;
}
mcreq_map_key(&instance->cmdq, kb, kb, 0, &vbix, &srvix);
existing = instance->dcpinfo + vbix;
if (existing->uuid_ == 0 && existing->seqno_ == 0) {
*errp = LCB_DURABILITY_NO_MUTATION_TOKENS;
return NULL;
}
*errp = LCB_SUCCESS;
return existing;
}