#include "internal.h"
#include "packetutils.h"
#include "bucketconfig/clconfig.h"
#include "vbucket/aliases.h"
#include "sllist-inl.h"
/* Expands to the leading arguments of lcb_log(): the instance's settings, the
 * literal tag "newconfig", the LCB_LOG_<lvl> level, and the call site's file
 * and line. The format string and its arguments follow at the call site. */
#define LOGARGS(instance, lvl) (instance)->settings, "newconfig", LCB_LOG_##lvl, __FILE__, __LINE__
/* Logs a single fixed message (no format arguments) with the "newconfig" tag.
 * Note that `instance` is not parenthesized in the expansion. */
#define LOG(instance, lvlbase, msg) lcb_log(instance->settings, "newconfig", LCB_LOG_##lvlbase, __FILE__, __LINE__, msg)
/* printf-style fragment describing a server as "host:port (pointer)"; each use
 * must be paired with one SERVER_ARGS() in the argument list. */
#define SERVER_FMT "%s:%s (%p)"
/* Arguments matching SERVER_FMT. `s` is evaluated three times and its last use
 * is not parenthesized, so pass a simple pointer expression. */
#define SERVER_ARGS(s) (s)->curhost->host, (s)->curhost->port, (void *)s
/* Per-vBucket record of a guessed master index. Entries are written by
 * lcb_vbguess_remap() and consumed by lcb_vbguess_newconfig(); the array they
 * live in is indexed by vBucket id. */
typedef struct lcb_GUESSVB_st {
time_t last_update; /* time() value at which the guess was recorded */
char newix; /* server index that was guessed as the new master */
char oldix; /* server index that was reported bad when the guess was made */
char used; /* nonzero while this entry holds a live guess */
} lcb_GUESSVB;
/* Maximum age of a guess before should_keep_guess() rejects it. The value is
 * compared against a difference of two time() results (seconds on POSIX
 * systems). */
#define MAX_KEEP_GUESS 20
/**
 * Decide whether a recorded guess should still override the master index
 * carried by a newly received configuration.
 *
 * @param guess the recorded guess for one vBucket
 * @param vb the matching vBucket entry of the new configuration
 * @return 1 if the guess should be kept, 0 if it should be discarded
 *
 * A guess is kept only when all of the following hold:
 *  - it actually points somewhere else (newix differs from oldix),
 *  - the new configuration still names the old (bad) index as master,
 *  - the guess is no older than MAX_KEEP_GUESS.
 */
static int
should_keep_guess(lcb_GUESSVB *guess, lcbvb_VBUCKET *vb)
{
    if (guess->newix != guess->oldix && vb->servers[0] == guess->oldix) {
        /* The clock is only consulted once the cheap checks have passed. */
        time_t age = time(NULL) - guess->last_update;
        return age > MAX_KEEP_GUESS ? 0 : 1;
    }
    return 0;
}
/**
 * Reconcile previously recorded guesses with a newly received configuration.
 *
 * @param instance the library handle (used for logging)
 * @param cfg the new configuration; its vBucket master entries may be
 *        overwritten with still-valid guesses
 * @param guesses array of guesses indexed by vBucket id, or NULL if no guess
 *        was ever recorded (in which case nothing is done)
 *
 * For every live guess, either the guessed index is written into the new
 * configuration as master, or the guess is marked unused.
 */
void
lcb_vbguess_newconfig(lcb_t instance, lcbvb_CONFIG *cfg, lcb_GUESSVB *guesses)
{
    unsigned vbid;

    if (guesses == NULL) {
        return;
    }

    for (vbid = 0; vbid < cfg->nvb; vbid++) {
        lcb_GUESSVB *cur = &guesses[vbid];
        lcbvb_VBUCKET *vbucket = &cfg->vbuckets[vbid];

        if (cur->used) {
            if (!should_keep_guess(cur, vbucket)) {
                /* Stale or superseded guess: drop it, leave the config alone. */
                lcb_log(LOGARGS(instance, TRACE), "Ignoring heuristically guessed index. VBID=%d. Current=%d. Old=%d. New=%d", vbid, cur->newix, cur->oldix, vbucket->servers[0]);
                cur->used = 0;
            } else {
                /* The config still names the bad node; keep our guess. */
                lcb_log(LOGARGS(instance, TRACE), "Keeping heuristically guessed index. VBID=%d. Current=%d. Old=%d.", vbid, cur->newix, cur->oldix);
                vbucket->servers[0] = cur->newix;
            }
        }
    }
}
/**
 * Pick a new server index for a vBucket whose current master was reported as
 * wrong, optionally recording the choice as a guess.
 *
 * @param instance the library handle
 * @param vbid the vBucket id being remapped
 * @param bad the server index that was reported as not being the master
 * @return the index returned by lcbvb_nmv_remap_ex() (values below 0 mean no
 *         new index was produced)
 *
 * When the vb_noguess setting is enabled, the remap is requested with a final
 * argument of 0 and nothing is recorded. Otherwise it is requested with a
 * final argument of 1 and a successful, different index is stored in
 * instance->vbguess so that lcb_vbguess_newconfig() can re-apply it later.
 * NOTE(review): judging by the log messages, 0 restricts the lookup to the
 * fast-forward map and 1 enables heuristic guessing; confirm against
 * lcbvb_nmv_remap_ex().
 */
int
lcb_vbguess_remap(lcb_t instance, int vbid, int bad)
{
    if (LCBT_SETTING(instance, vb_noguess)) {
        int newix = lcbvb_nmv_remap_ex(LCBT_VBCONFIG(instance), vbid, bad, 0);
        if (newix > -1 && newix != bad) {
            lcb_log(LOGARGS(instance, TRACE), "Got new index from ffmap. VBID=%d. Old=%d. New=%d", vbid, bad, newix);
        }
        return newix;
    } else {
        lcb_GUESSVB *guesses = instance->vbguess;
        lcb_GUESSVB *guess;
        int newix = lcbvb_nmv_remap_ex(LCBT_VBCONFIG(instance), vbid, bad, 1);

        if (!guesses) {
            /* The guess table is created lazily, one entry per vBucket. */
            guesses = instance->vbguess = calloc(
                LCBT_VBCONFIG(instance)->nvb, sizeof *guesses);
            if (!guesses) {
                /* Out of memory: the remap result is still valid, we just
                 * cannot remember it. */
                return newix;
            }
        }

        if (newix > -1 && newix != bad) {
            /* Take the entry address only now, after the table is known to
             * exist; computing it from a NULL table yields a bogus pointer. */
            guess = guesses + vbid;
            guess->newix = newix;
            guess->oldix = bad;
            guess->used = 1;
            guess->last_update = time(NULL);
            lcb_log(LOGARGS(instance, TRACE), "Guessed new heuristic index VBID=%d. Old=%d. New=%d", vbid, bad, newix);
        }
        return newix;
    }
}
/**
 * Locate, in a new configuration, the server that an existing connection
 * already talks to.
 *
 * @param oldconfig the configuration the server object was created for
 * @param newconfig the configuration being installed
 * @param server the existing server object; its pipeline index is used to
 *        look up its plain data host:port in oldconfig
 * @return the index in newconfig whose plain data host:port string equals the
 *         server's old one, or -1 if the old entry has no such string or no
 *         entry in newconfig matches
 */
static int
find_new_data_index(lcbvb_CONFIG *oldconfig, lcbvb_CONFIG* newconfig,
                    mc_SERVER *server)
{
    const char *wanted = lcbvb_get_hostport(oldconfig,
        server->pipeline.index, LCBVB_SVCTYPE_DATA, LCBVB_SVCMODE_PLAIN);
    size_t pos = 0;

    if (wanted == NULL) {
        return -1;
    }

    while (pos < LCBVB_NSERVERS(newconfig)) {
        const char *candidate = lcbvb_get_hostport(newconfig, pos,
            LCBVB_SVCTYPE_DATA, LCBVB_SVCMODE_PLAIN);
        if (candidate != NULL && strcmp(candidate, wanted) == 0) {
            return pos;
        }
        pos++;
    }

    return -1;
}
/**
 * Write a summary of a configuration diff to the log at INFO level.
 *
 * @param instance the library handle (used for logging)
 * @param diff the diff to describe; servers_added and servers_removed may be
 *        NULL, otherwise they are walked until a NULL entry is found
 */
static void
log_vbdiff(lcb_t instance, lcbvb_CONFIGDIFF *diff)
{
    char **name;

    lcb_log(LOGARGS(instance, INFO), "Config Diff: [ vBuckets Modified=%d ], [Sequence Changed=%d]", diff->n_vb_changes, diff->sequence_changed);

    name = diff->servers_added;
    while (name != NULL && *name != NULL) {
        lcb_log(LOGARGS(instance, INFO), "Detected server %s added", *name);
        name++;
    }

    name = diff->servers_removed;
    while (name != NULL && *name != NULL) {
        lcb_log(LOGARGS(instance, INFO), "Detected server %s removed", *name);
        name++;
    }
}
/**
 * mcreq_iterwipe() callback: move a queued packet to the pipeline that the
 * current configuration maps it to.
 *
 * @param cq the command queue; cq->config is used for the mapping and
 *        cq->cqdata is used as the library handle for logging
 * @param oldpl the pipeline the packet currently sits in
 * @param oldpkt the packet under consideration
 * @param arg unused
 * @return MCREQ_KEEP_PACKET if the packet stays where it is,
 *         MCREQ_REMOVE_PACKET if a renewed copy was enqueued elsewhere
 *
 * The packet is left in place when it should not be retried, when no valid
 * target index can be computed, or when the target is missing or is the
 * pipeline it already belongs to.
 */
static int
iterwipe_cb(mc_CMDQUEUE *cq, mc_PIPELINE *oldpl, mc_PACKET *oldpkt, void *arg)
{
    protocol_binary_request_header reqhdr;
    mc_SERVER *oldsrv = (mc_SERVER *)oldpl;
    mc_PIPELINE *target;
    mc_PACKET *copy;
    int target_ix;

    (void)arg;

    mcreq_read_hdr(oldpkt, &reqhdr);
    if (!lcb_should_retry(oldsrv->settings, oldpkt, LCB_MAX_ERROR)) {
        return MCREQ_KEEP_PACKET;
    }

    if (LCBVB_DISTTYPE(cq->config) != LCBVB_DIST_VBUCKET) {
        /* No vBucket map: derive the target server from the key itself. */
        const void *key = NULL;
        lcb_SIZE nkey = 0;
        int unused_vbid;
        mcreq_get_key(oldpkt, &key, &nkey);
        lcbvb_map_key(cq->config, key, nkey, &unused_vbid, &target_ix);
    } else {
        /* The header already carries the vBucket id (network byte order). */
        target_ix = lcbvb_vbmaster(cq->config, ntohs(reqhdr.request.vbucket));
    }

    if (target_ix < 0 || target_ix >= (int)cq->npipelines) {
        return MCREQ_KEEP_PACKET;
    }

    target = cq->pipelines[target_ix];
    if (target == NULL || target == oldpl) {
        return MCREQ_KEEP_PACKET;
    }

    lcb_log(LOGARGS((lcb_t)cq->cqdata, DEBUG), "Remapped packet %p (SEQ=%u) from "SERVER_FMT " to " SERVER_FMT,
        (void*)oldpkt, oldpkt->opaque, SERVER_ARGS((mc_SERVER*)oldpl), SERVER_ARGS((mc_SERVER*)target));

    /* Enqueue a renewed copy with its state flags cleared on the target,
     * then report the original as handled on its old pipeline. */
    copy = mcreq_renew_packet(oldpkt);
    copy->flags &= ~MCREQ_STATE_FLAGS;
    mcreq_reenqueue_packet(target, copy);
    mcreq_packet_handled(oldpl, oldpkt);
    return MCREQ_REMOVE_PACKET;
}
/**
 * Rebuild the command queue's pipeline list for a new configuration, reusing
 * existing server objects where possible.
 *
 * @param instance the library handle; its current vBucket config must already
 *        be newconfig (asserted below)
 * @param oldconfig the configuration the existing pipelines were built for
 * @param newconfig the configuration being installed
 * @return always LCB_CONFIGURATION_CHANGED
 *
 * The order of the steps below matters: the new pipeline list must be
 * installed in the queue before packets are relocated, and old pipelines are
 * only failed and closed after their packets had a chance to move.
 */
static int
replace_config(lcb_t instance, lcbvb_CONFIG *oldconfig, lcbvb_CONFIG *newconfig)
{
mc_CMDQUEUE *cq = &instance->cmdq;
mc_PIPELINE **ppold, **ppnew;
unsigned ii, nold, nnew;
assert(LCBT_VBCONFIG(instance) == newconfig);
nnew = LCBVB_NSERVERS(newconfig);
/* Zero-filled so that an empty slot can be recognised as NULL below. */
/* NOTE(review): the calloc() result is not checked before being indexed. */
ppnew = calloc(nnew, sizeof(*ppnew));
/* Obtain the queue's current pipeline array and its length; the array is
 * released with free() at the end of this function. */
ppold = mcreq_queue_take_pipelines(cq, &nold);
/* Pass 1: carry over every old server whose data host:port also appears in
 * the new configuration, moving it to its new slot. */
for (ii = 0; ii < nold; ii++) {
mc_SERVER *cur = (mc_SERVER *)ppold[ii];
int newix = find_new_data_index(oldconfig, newconfig, cur);
if (newix > -1) {
cur->pipeline.index = newix;
ppnew[newix] = &cur->pipeline;
/* Mark the old slot as consumed so it is skipped during teardown. */
ppold[ii] = NULL;
lcb_log(LOGARGS(instance, INFO), "Reusing server "SERVER_FMT". OldIndex=%d. NewIndex=%d", SERVER_ARGS(cur), ii, newix);
}
}
/* Pass 2: allocate fresh server objects for slots nothing was carried into. */
for (ii = 0; ii < nnew; ii++) {
if (!ppnew[ii]) {
/* NOTE(review): the mcserver_alloc() result is dereferenced unchecked. */
ppnew[ii] = (mc_PIPELINE *)mcserver_alloc(instance, ii);
ppnew[ii]->index = ii;
}
}
/* Install the new pipeline list in the queue. */
mcreq_queue_add_pipelines(cq, ppnew, nnew, newconfig);
/* Let iterwipe_cb relocate packets queued on the pipelines that remain. */
for (ii = 0; ii < nnew; ii++) {
mcreq_iterwipe(cq, ppnew[ii], iterwipe_cb, NULL);
}
/* Tear down old pipelines that were not carried over: relocate what can be
 * relocated, fail the rest with LCB_MAP_CHANGED, then close the server. */
for (ii = 0; ii < nold; ii++) {
if (!ppold[ii]) {
continue;
}
mcreq_iterwipe(cq, ppold[ii], iterwipe_cb, NULL);
mcserver_fail_chain((mc_SERVER *)ppold[ii], LCB_MAP_CHANGED);
mcserver_close((mc_SERVER *)ppold[ii]);
}
/* Start flushing any pipeline that now has pending data. */
for (ii = 0; ii < nnew; ii++) {
if (mcserver_has_pending((mc_SERVER*)ppnew[ii])) {
ppnew[ii]->flush_start(ppnew[ii]);
}
}
/* Only the temporary pointer arrays are released here. */
free(ppnew);
free(ppold);
return LCB_CONFIGURATION_CHANGED;
}
/**
 * Install a new cluster configuration on an instance.
 *
 * @param instance the library handle
 * @param config the configuration to install; a reference is taken on it
 *
 * The instance and its command queue are pointed at the new configuration
 * first. If a configuration was already installed, the diff is logged,
 * recorded vBucket guesses are reconciled, the pipelines are rebuilt via
 * replace_config() and the old configuration's reference is dropped. If this
 * is the first configuration, one server object is created per server entry.
 * Finally the management host list is rebuilt and the configuration callback
 * is invoked with the resulting change status.
 *
 * On an allocation failure during first-time setup, a FATAL message is logged
 * and the function returns early without invoking the callback.
 */
void lcb_update_vbconfig(lcb_t instance, clconfig_info *config)
{
    lcb_size_t ii;
    int change_status;
    clconfig_info *old_config;
    mc_CMDQUEUE *q = &instance->cmdq;

    old_config = instance->cur_configinfo;
    instance->cur_configinfo = config;
    lcb_clconfig_incref(config);
    q->config = instance->cur_configinfo->vbc;
    q->cqdata = instance;

    if (old_config) {
        lcbvb_CONFIGDIFF *diff = lcbvb_compare(old_config->vbc, config->vbc);
        if (diff) {
            log_vbdiff(instance, diff);
            lcbvb_free_diff(diff);
        }

        /* Re-apply still-valid guesses before packets are remapped against
         * the new configuration. */
        lcb_vbguess_newconfig(instance, config->vbc, instance->vbguess);
        change_status = replace_config(instance, old_config->vbc, config->vbc);
        if (change_status == -1) {
            /* NOTE(review): replace_config() currently never returns -1, and
             * this path would leave old_config's reference held. */
            LOG(instance, ERR, "Couldn't replace config");
            return;
        }
        lcb_clconfig_decref(old_config);
    } else {
        unsigned nservers;
        mc_PIPELINE **servers;

        nservers = VB_NSERVERS(config->vbc);
        if ((servers = malloc(sizeof(*servers) * nservers)) == NULL) {
            assert(servers);
            lcb_log(LOGARGS(instance, FATAL), "Couldn't allocate memory for new server list! (n=%u)", nservers);
            return;
        }

        for (ii = 0; ii < nservers; ii++) {
            mc_SERVER *srv;
            if ((srv = mcserver_alloc(instance, ii)) == NULL) {
                assert(srv);
                lcb_log(LOGARGS(instance, FATAL), "Couldn't allocate memory for server instance!");
                /* Release the temporary pointer array instead of leaking it.
                 * NOTE(review): server objects allocated in earlier
                 * iterations are still not released here. */
                free(servers);
                return;
            }
            servers[ii] = &srv->pipeline;
        }

        mcreq_queue_add_pipelines(q, servers, nservers, config->vbc);
        change_status = LCB_CONFIGURATION_NEW;
        /* Only the temporary pointer array is released. */
        free(servers);
    }

    /* Rebuild the management host list from the new configuration. */
    hostlist_clear(instance->ht_nodes);
    for (ii = 0; ii < LCBVB_NSERVERS(config->vbc); ++ii) {
        const char *hp = lcbvb_get_hostport(config->vbc, ii,
            LCBVB_SVCTYPE_MGMT, LCBVB_SVCMODE_PLAIN);
        if (hp) {
            hostlist_add_stringz(instance->ht_nodes, hp, LCB_CONFIG_HTTP_PORT);
        }
    }

    instance->callbacks.configuration(instance, change_status);
    lcb_maybe_breakout(instance);
}