#include "internal.h"
#include "clconfig.h"
#include "bc_http.h"
#include <lcbio/ssl.h>
#include "ctx-log-inl.h"
#define LOGARGS(ht, lvlbase) ht->base.parent->settings, "htconfig", LCB_LOG_##lvlbase, __FILE__, __LINE__
#define LOGFMT "<%s:%s> "
#define LOGID(h) get_ctx_host(h->ioctx), get_ctx_port(h->ioctx)
static void io_error_handler(lcbio_CTX *, lcb_error_t);
static void on_connected(lcbio_SOCKET *, void *, lcb_error_t, lcbio_OSERR);
static lcb_error_t connect_next(http_provider *);
static void read_common(lcbio_CTX *, unsigned);
static lcb_error_t setup_request_header(http_provider *, const lcb_host_t *);
static int is_v220_compat(http_provider *http)
{
lcb_uint32_t setting = PROVIDER_SETTING(&http->base, bc_http_stream_time);
if (setting == (lcb_uint32_t)-1) {
return 1;
}
return 0;
}
static void close_current(http_provider *http)
{
lcbio_timer_disarm(http->disconn_timer);
if (http->ioctx) {
lcbio_ctx_close(http->ioctx, NULL, NULL);
} else if (http->creq){
lcbio_connect_cancel(http->creq);
}
http->creq = NULL;
http->ioctx = NULL;
}
static lcb_error_t
io_error(http_provider *http, lcb_error_t origerr)
{
lcb_confmon *mon = http->base.parent;
lcb_settings *settings = mon->settings;
close_current(http);
http->creq = lcbio_connect_hl(
mon->iot, settings, http->nodes, 0, settings->config_node_timeout,
on_connected, http);
if (http->creq) {
return LCB_SUCCESS;
}
lcb_confmon_provider_failed(&http->base, origerr);
lcbio_timer_disarm(http->io_timer);
if (is_v220_compat(http) && http->base.parent->config != NULL) {
lcb_log(LOGARGS(http, INFO), "HTTP node list finished. Trying to obtain connection from first node in list");
if (!lcbio_timer_armed(http->as_reconnect)) {
lcbio_timer_rearm(http->as_reconnect,
PROVIDER_SETTING(&http->base, grace_next_cycle));
}
}
return origerr;
}
static void set_new_config(http_provider *http)
{
const lcb_host_t *curhost;
if (http->current_config) {
lcb_clconfig_decref(http->current_config);
}
curhost = lcbio_get_host(lcbio_ctx_sock(http->ioctx));
http->current_config = http->last_parsed;
lcb_clconfig_incref(http->current_config);
lcbvb_replace_host(http->current_config->vbc, curhost->host);
lcb_confmon_provider_success(&http->base, http->current_config);
}
static lcb_error_t
process_chunk(http_provider *http, const void *buf, unsigned nbuf)
{
lcb_error_t err = LCB_SUCCESS;
char *term;
int rv;
lcbvb_CONFIG *cfgh;
lcbht_RESPSTATE state, oldstate, diff;
lcbht_RESPONSE *resp = lcbht_get_response(http->htp);
oldstate = resp->state;
state = lcbht_parse(http->htp, buf, nbuf);
diff = state ^ oldstate;
if (state & LCBHT_S_ERROR) {
return LCB_PROTOCOL_ERROR;
}
if (diff & LCBHT_S_HEADER) {
if (resp->status == 200) {
} else if (resp->status == 404) {
const int urlmode = PROVIDER_SETTING(&http->base, bc_http_urltype);
err = LCB_BUCKET_ENOENT;
if (++http->uritype > LCB_HTCONFIG_URLTYPE_COMPAT) {
lcb_log(LOGARGS(http, ERR), LOGFMT "Got 404 on config stream. Assuming bucket does not exist as we've tried both URL types", LOGID(http));
goto GT_HT_ERROR;
} else if ((urlmode & LCB_HTCONFIG_URLTYPE_COMPAT) == 0) {
lcb_log(LOGARGS(http, ERR), LOGFMT "Got 404 on config stream for terse URI. Compat URI disabled, so not trying", LOGID(http));
} else {
lcb_log(LOGARGS(http, WARN), LOGFMT "Got 404 on config stream. Assuming terse URI not supported on cluster", LOGID(http));
http->try_nexturi = 1;
err = LCB_SUCCESS;
goto GT_CHECKDONE;
}
} else if (resp->status == 401) {
err = LCB_AUTH_ERROR;
} else {
err = LCB_ERROR;
}
GT_HT_ERROR:
if (err != LCB_SUCCESS) {
lcb_log(LOGARGS(http, ERR), LOGFMT "Got non-success HTTP status code %d", LOGID(http), resp->status);
return err;
}
}
GT_CHECKDONE:
if (http->try_nexturi) {
lcb_host_t *host;
if (!(state & LCBHT_S_DONE)) {
return LCB_SUCCESS;
}
host = lcbio_get_host(lcbio_ctx_sock(http->ioctx));
http->try_nexturi = 0;
if ((err = setup_request_header(http, host)) != LCB_SUCCESS) {
return err;
}
lcbht_reset(http->htp);
lcbio_ctx_put(http->ioctx, http->request_buf, strlen(http->request_buf));
return LCB_SUCCESS;
}
if (PROVIDER_SETTING(&http->base, conntype) == LCB_TYPE_CLUSTER) {
resp->body.nused = 0;
return LCB_SUCCESS;
}
if (!(state & LCBHT_S_BODY)) {
return LCB_SUCCESS;
}
term = strstr(resp->body.base, CONFIG_DELIMITER);
if (!term) {
return LCB_SUCCESS;
}
*term = '\0';
cfgh = lcbvb_create();
if (!cfgh) {
return LCB_CLIENT_ENOMEM;
}
rv = lcbvb_load_json(cfgh, resp->body.base);
if (rv != 0) {
lcb_log(LOGARGS(http, ERR), LOGFMT "Failed to parse a valid config from HTTP stream", LOGID(http));
lcb_log_badconfig(LOGARGS(http, ERR), cfgh, resp->body.base);
lcbvb_destroy(cfgh);
return LCB_PROTOCOL_ERROR;
}
if (http->last_parsed) {
lcb_clconfig_decref(http->last_parsed);
}
http->last_parsed = lcb_clconfig_create(cfgh, LCB_CLCONFIG_HTTP);
http->last_parsed->cmpclock = gethrtime();
http->generation++;
lcb_string_erase_beginning(&resp->body,
(term+sizeof(CONFIG_DELIMITER)-1)-resp->body.base);
return LCB_SUCCESS;
}
static void
read_common(lcbio_CTX *ctx, unsigned nr)
{
lcbio_CTXRDITER riter;
http_provider *http = lcbio_ctx_data(ctx);
int old_generation = http->generation;
lcb_log(LOGARGS(http, TRACE), LOGFMT "Received %d bytes on HTTP stream", LOGID(http), nr);
lcbio_timer_rearm(http->io_timer,
PROVIDER_SETTING(&http->base, config_node_timeout));
LCBIO_CTX_ITERFOR(ctx, &riter, nr) {
unsigned nbuf = lcbio_ctx_risize(&riter);
void *buf = lcbio_ctx_ribuf(&riter);
lcb_error_t err = process_chunk(http, buf, nbuf);
if (err != LCB_SUCCESS) {
io_error(http, err);
return;
}
}
if (http->generation != old_generation) {
lcb_log(LOGARGS(http, DEBUG), LOGFMT "Generation %d -> %d", LOGID(http), old_generation, http->generation);
lcbio_timer_disarm(http->io_timer);
set_new_config(http);
}
lcbio_ctx_rwant(ctx, 1);
lcbio_ctx_schedule(ctx);
}
static lcb_error_t
setup_request_header(http_provider *http, const lcb_host_t *host)
{
lcb_settings *settings = http->base.parent->settings;
char *buf = http->request_buf;
const char *username = NULL, *password = NULL;
lcb_size_t nbuf = sizeof(http->request_buf);
lcb_size_t offset = 0;
http->request_buf[0] = '\0';
if (settings->conntype == LCB_TYPE_BUCKET) {
const char *fmt;
if (http->uritype == LCB_HTCONFIG_URLTYPE_25PLUS) {
fmt = REQBUCKET_TERSE_FMT;
} else {
fmt = REQBUCKET_COMPAT_FMT;
}
offset = snprintf(buf, nbuf, fmt, settings->bucket);
} else if (settings->conntype == LCB_TYPE_CLUSTER) {
offset = snprintf(buf, nbuf, REQPOOLS_FMT);
} else {
return LCB_EINVAL;
}
lcbauth_get_upass(settings->auth, &username, &password);
if (password) {
char cred[256], b64[256];
snprintf(cred, sizeof(cred), "%s:%s", username, password);
if (lcb_base64_encode(cred, b64, sizeof(b64)) == -1) {
return LCB_EINTERNAL;
}
offset += snprintf(buf + offset, nbuf - offset, AUTHDR_FMT, b64);
}
offset += snprintf(buf + offset, nbuf - offset, HOSTHDR_FMT,
host->host, host->port);
offset += snprintf(buf + offset, nbuf - offset, "%s\r\n", LAST_HTTP_HEADER);
return LCB_SUCCESS;
}
static void reset_stream_state(http_provider *http)
{
const int urlmode = PROVIDER_SETTING(&http->base, bc_http_urltype);
if (http->last_parsed) {
lcb_clconfig_decref(http->last_parsed);
http->last_parsed = NULL;
}
if (urlmode & LCB_HTCONFIG_URLTYPE_25PLUS) {
http->uritype = LCB_HTCONFIG_URLTYPE_25PLUS;
} else {
http->uritype = LCB_HTCONFIG_URLTYPE_COMPAT;
}
http->try_nexturi = 0;
lcbht_reset(http->htp);
}
static void
on_connected(lcbio_SOCKET *sock, void *arg, lcb_error_t err, lcbio_OSERR syserr)
{
http_provider *http = arg;
lcb_host_t *host;
lcbio_CTXPROCS procs;
http->creq = NULL;
if (err != LCB_SUCCESS) {
lcb_log(LOGARGS(http, ERR), "Connection to REST API failed with code=0x%x (%d)", err, syserr);
io_error(http, err);
return;
}
host = lcbio_get_host(sock);
lcb_log(LOGARGS(http, DEBUG), "Successfuly connected to REST API %s:%s", host->host, host->port);
lcbio_sslify_if_needed(sock, http->base.parent->settings);
reset_stream_state(http);
if ((err = setup_request_header(http, host)) != LCB_SUCCESS) {
lcb_log(LOGARGS(http, ERR), "Couldn't setup request header");
io_error(http, err);
return;
}
memset(&procs, 0, sizeof(procs));
procs.cb_err = io_error_handler;
procs.cb_read = read_common;
http->ioctx = lcbio_ctx_new(sock, http, &procs);
http->ioctx->subsys = "bc_http";
lcbio_ctx_put(http->ioctx, http->request_buf, strlen(http->request_buf));
lcbio_ctx_rwant(http->ioctx, 1);
lcbio_ctx_schedule(http->ioctx);
lcbio_timer_rearm(http->io_timer,
PROVIDER_SETTING(&http->base, config_node_timeout));
}
static void
timeout_handler(void *arg)
{
http_provider *http = arg;
lcb_log(LOGARGS(http, ERR), LOGFMT "HTTP Provider timed out waiting for I/O", LOGID(http));
if (&http->base != http->base.parent->cur_provider ||
lcb_confmon_is_refreshing(http->base.parent) == 0) {
lcb_log(LOGARGS(http, DEBUG), LOGFMT "Ignoring timeout because we're either not in a refresh or not the current provider", LOGID(http));
return;
}
io_error(http, LCB_ETIMEDOUT);
}
static lcb_error_t
connect_next(http_provider *http)
{
lcb_settings *settings = http->base.parent->settings;
lcb_log(LOGARGS(http, TRACE), "Starting HTTP Configuration Provider %p", (void*)http);
close_current(http);
lcbio_timer_disarm(http->as_reconnect);
if (!hostlist_size(http->nodes)) {
lcb_log(LOGARGS(http, ERROR), "Not scheduling HTTP provider since no nodes have been configured for HTTP bootstrap");
return LCB_CONNECT_ERROR;
}
http->creq = lcbio_connect_hl(http->base.parent->iot, settings, http->nodes, 1,
settings->config_node_timeout, on_connected, http);
if (http->creq) {
return LCB_SUCCESS;
}
lcb_log(LOGARGS(http, ERROR), "%p: Couldn't schedule connection", (void*)http);
return LCB_CONNECT_ERROR;
}
static void delayed_disconn(void *arg)
{
http_provider *http = arg;
lcb_log(LOGARGS(http, DEBUG), "Stopping HTTP provider %p", (void*)http);
close_current(http);
lcbio_timer_disarm(http->io_timer);
}
static void delayed_reconnect(void *arg)
{
http_provider *http = arg;
lcb_error_t err;
if (http->ioctx) {
return;
}
err = connect_next(http);
if (err != LCB_SUCCESS) {
io_error(http, err);
}
}
static lcb_error_t pause_http(clconfig_provider *pb)
{
http_provider *http = (http_provider *)pb;
if (is_v220_compat(http)) {
return LCB_SUCCESS;
}
if (!lcbio_timer_armed(http->disconn_timer)) {
lcbio_timer_rearm(http->disconn_timer,
PROVIDER_SETTING(pb, bc_http_stream_time));
}
return LCB_SUCCESS;
}
static lcb_error_t get_refresh(clconfig_provider *provider)
{
http_provider *http = (http_provider *)provider;
if (http->ioctx == NULL && http->creq == NULL) {
lcbio_async_signal(http->as_reconnect);
}
lcbio_timer_disarm(http->disconn_timer);
if (http->ioctx) {
lcbio_timer_rearm(http->io_timer,
PROVIDER_SETTING(provider, config_node_timeout));
}
return LCB_SUCCESS;
}
static clconfig_info* http_get_cached(clconfig_provider *provider)
{
http_provider *http = (http_provider *)provider;
return http->current_config;
}
static void
config_updated(clconfig_provider *pb, lcbvb_CONFIG *newconfig)
{
unsigned int ii;
http_provider *http = (http_provider *)pb;
lcb_SSLOPTS sopts;
lcbvb_SVCMODE mode;
hostlist_clear(http->nodes);
sopts = PROVIDER_SETTING(pb, sslopts);
if (sopts & LCB_SSL_ENABLED) {
mode = LCBVB_SVCMODE_SSL;
} else {
mode = LCBVB_SVCMODE_PLAIN;
}
for (ii = 0; ii < newconfig->nsrv; ++ii) {
const char *ss;
lcb_error_t status;
ss = lcbvb_get_hostport(newconfig, ii, LCBVB_SVCTYPE_MGMT, mode);
if (!ss) {
continue;
}
status = hostlist_add_stringz(http->nodes, ss, LCB_CONFIG_HTTP_PORT);
lcb_assert(status == LCB_SUCCESS);
}
if (!hostlist_size(http->nodes)) {
lcb_log(LOGARGS(http, FATAL), "New nodes do not contain management ports");
}
if (PROVIDER_SETTING(pb, randomize_bootstrap_nodes)) {
hostlist_randomize(http->nodes);
}
}
static void
configure_nodes(clconfig_provider *pb, const hostlist_t newnodes)
{
http_provider *http = (void *)pb;
hostlist_assign(http->nodes, newnodes);
if (PROVIDER_SETTING(pb, randomize_bootstrap_nodes)) {
hostlist_randomize(http->nodes);
}
}
static hostlist_t
get_nodes(const clconfig_provider *pb)
{
return ((http_provider *)pb)->nodes;
}
static void shutdown_http(clconfig_provider *provider)
{
http_provider *http = (http_provider *)provider;
reset_stream_state(http);
close_current(http);
lcbht_free(http->htp);
if (http->current_config) {
lcb_clconfig_decref(http->current_config);
}
if (http->disconn_timer) {
lcbio_timer_destroy(http->disconn_timer);
}
if (http->io_timer) {
lcbio_timer_destroy(http->io_timer);
}
if (http->as_reconnect) {
lcbio_timer_destroy(http->as_reconnect);
}
if (http->nodes) {
hostlist_destroy(http->nodes);
}
free(http);
}
static void
do_http_dump(clconfig_provider *pb, FILE *fp)
{
http_provider *ht = (http_provider *)pb;
fprintf(fp, "## BEGIN HTTP PROVIDER DUMP\n");
fprintf(fp, "NUMBER OF CONFIGS RECEIVED: %u\n", ht->generation);
fprintf(fp, "DUMPING I/O TIMER\n");
lcbio_timer_dump(ht->io_timer, fp);
if (ht->ioctx) {
fprintf(fp, "DUMPING CURRENT CONNECTION:\n");
lcbio_ctx_dump(ht->ioctx, fp);
} else if (ht->creq) {
fprintf(fp, "CURRENTLY CONNECTING..\n");
} else {
fprintf(fp, "NO CONNECTION ACTIVE\n");
}
}
clconfig_provider * lcb_clconfig_create_http(lcb_confmon *parent)
{
http_provider *http = calloc(1, sizeof(*http));
if (!http) {
return NULL;
}
if (! (http->nodes = hostlist_create())) {
free(http);
return NULL;
}
http->base.type = LCB_CLCONFIG_HTTP;
http->base.refresh = get_refresh;
http->base.pause = pause_http;
http->base.get_cached = http_get_cached;
http->base.shutdown = shutdown_http;
http->base.config_updated = config_updated;
http->base.configure_nodes = configure_nodes;
http->base.get_nodes = get_nodes;
http->base.dump = do_http_dump;
http->base.enabled = 0;
http->io_timer = lcbio_timer_new(parent->iot, http, timeout_handler);
http->disconn_timer = lcbio_timer_new(parent->iot, http, delayed_disconn);
http->as_reconnect = lcbio_timer_new(parent->iot, http, delayed_reconnect);
http->htp = lcbht_new(parent->settings);
return &http->base;
}
static void
io_error_handler(lcbio_CTX *ctx, lcb_error_t err)
{
io_error((http_provider *)lcbio_ctx_data(ctx), err);
}
void lcb_clconfig_http_enable(clconfig_provider *http)
{
http->enabled = 1;
}
lcbio_SOCKET *
lcb_confmon_get_rest_connection(lcb_confmon *mon)
{
http_provider *http = (http_provider *)mon->all_providers[LCB_CLCONFIG_HTTP];
if (!http->ioctx) {
return NULL;
}
return lcbio_ctx_sock(http->ioctx);
}
lcb_host_t *
lcb_confmon_get_rest_host(lcb_confmon *mon)
{
lcbio_SOCKET *sock = lcb_confmon_get_rest_connection(mon);
if (sock) {
return lcbio_get_host(sock);
}
return NULL;
}