gauc 0.3.0

Couchbase Rust Adapter / CLI
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2011-2014 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "internal.h"
#include "logging.h"
#include "vbucket/aliases.h"
#include "settings.h"
#include "negotiate.h"
#include "bucketconfig/clconfig.h"
#include "mc/mcreq-flush-inl.h"
#include <lcbio/ssl.h>
#include "ctx-log-inl.h"

#define LOGARGS(c, lvl) (c)->settings, "server", LCB_LOG_##lvl, __FILE__, __LINE__
#define LOGFMT "<%s:%s> (SRV=%p,IX=%d) "
#define LOGID(server) get_ctx_host(server->connctx), get_ctx_port(server->connctx), (void*)server, server->pipeline.index
#define MCREQ_MAXIOV 32
#define LCBCONN_UNWANT(conn, flags) (conn)->want &= ~(flags)

typedef enum {
    /* There are no known errored commands on this server */
    S_CLEAN,

    /* In the process of draining remaining commands to be flushed. The commands
     * being drained may have already been rescheduled to another server or
     * placed inside the error queue, but are pending being flushed. This will
     * only happen in completion-style I/O plugins. When this state is in effect,
     * subsequent attempts to connect will be blocked until all commands have
     * been properly drained.
     */
    S_ERRDRAIN,

    /* The server object has been closed, either because it has been removed
     * from the cluster or because the related lcb_t has been destroyed.
     */
    S_CLOSED
} mcserver_STATE;

static int check_closed(mc_SERVER *);
static void start_errored_ctx(mc_SERVER *server, mcserver_STATE next_state);
static void finalize_errored_ctx(mc_SERVER *server);
static void on_error(lcbio_CTX *ctx, lcb_error_t err);
static void server_socket_failed(mc_SERVER *server, lcb_error_t err);

static void
on_flush_ready(lcbio_CTX *ctx)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);
    nb_IOV iov[MCREQ_MAXIOV];
    int ready;

    do {
        int niov = 0;
        unsigned nb;
        nb = mcreq_flush_iov_fill(&server->pipeline, iov, MCREQ_MAXIOV, &niov);
        if (!nb) {
            return;
        }
        ready = lcbio_ctx_put_ex(ctx, (lcb_IOV *)iov, niov, nb);
    } while (ready);
    lcbio_ctx_wwant(ctx);
}

static void
on_flush_done(lcbio_CTX *ctx, unsigned expected, unsigned actual)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);
    lcb_U64 now = 0;
    if (server->settings->readj_ts_wait) {
        now = gethrtime();
    }

    mcreq_flush_done_ex(&server->pipeline, actual, expected, now);
    check_closed(server);
}

void
mcserver_flush(mc_SERVER *server)
{
    /** Call into the wwant stuff.. */
    if (!server->connctx->rdwant) {
        lcbio_ctx_rwant(server->connctx, 24);
    }

    lcbio_ctx_wwant(server->connctx);
    lcbio_ctx_schedule(server->connctx);

    if (!lcbio_timer_armed(server->io_timer)) {
        /**
         * XXX: Maybe use get_next_timeout(), although here we can assume
         * that a command was just scheduled
         */
        lcbio_timer_rearm(server->io_timer, MCSERVER_TIMEOUT(server));
    }
}

LIBCOUCHBASE_API
void
lcb_sched_flush(lcb_t instance)
{
    unsigned ii;
    for (ii = 0; ii < LCBT_NSERVERS(instance); ii++) {
        mc_SERVER *server = LCBT_GET_SERVER(instance, ii);

        if (!mcserver_has_pending(server)) {
            continue;
        }
        server->pipeline.flush_start(&server->pipeline);
    }
}

/**
 * Invoked when get a NOT_MY_VBUCKET response. If the response contains a JSON
 * payload then we refresh the configuration with it.
 *
 * This function returns 1 if the operation was successfully rescheduled;
 * otherwise it returns 0. If it returns 0 then we give the error back to the
 * user.
 */
static int
handle_nmv(mc_SERVER *oldsrv, packet_info *resinfo, mc_PACKET *oldpkt)
{
    mc_PACKET *newpkt;
    protocol_binary_request_header hdr;
    lcb_error_t err = LCB_ERROR;
    lcb_t instance = oldsrv->instance;
    lcb_U16 vbid;
    clconfig_provider *cccp = lcb_confmon_get_provider(instance->confmon,
        LCB_CLCONFIG_CCCP);

    mcreq_read_hdr(oldpkt, &hdr);
    vbid = ntohs(hdr.request.vbucket);
    lcb_log(LOGARGS(oldsrv, WARN), LOGFMT "NOT_MY_VBUCKET. Packet=%p (S=%u). VBID=%u", LOGID(oldsrv), (void*)oldpkt, oldpkt->opaque, vbid);

    /* Notify of new map */
    lcb_vbguess_remap(instance, vbid, oldsrv->pipeline.index);

    if (PACKET_NBODY(resinfo) && cccp->enabled) {
        lcb_string s;

        lcb_string_init(&s);
        lcb_string_append(&s, PACKET_VALUE(resinfo), PACKET_NVALUE(resinfo));
        err = lcb_cccp_update(cccp, mcserver_get_host(oldsrv), &s);
        lcb_string_release(&s);
    }

    if (err != LCB_SUCCESS) {
        int bs_options;
        if (instance->cur_configinfo->origin == LCB_CLCONFIG_CCCP) {
            /**
             * XXX: Not enough to see if cccp was enabled, since cccp might
             * be requested by a user, but would still not actually be active
             * for clusters < 2.5 If our current config is from CCCP
             * then we can be fairly certain that CCCP is indeed working.
             *
             * For this reason, we don't use if (cccp->enabled) {...}
             */
            bs_options = LCB_BS_REFRESH_THROTTLE;
        } else {
            bs_options = LCB_BS_REFRESH_ALWAYS;
        }
        lcb_bootstrap_common(instance, bs_options);
    }

    if (!lcb_should_retry(oldsrv->settings, oldpkt, LCB_NOT_MY_VBUCKET)) {
        return 0;
    }

    /** Reschedule the packet again .. */
    newpkt = mcreq_renew_packet(oldpkt);
    newpkt->flags &= ~MCREQ_STATE_FLAGS;
    lcb_retryq_nmvadd(instance->retryq, (mc_EXPACKET*)newpkt);
    return 1;
}

#define PKT_READ_COMPLETE 1
#define PKT_READ_PARTIAL 0

/* This function is called within a loop to process a single packet.
 *
 * If a full packet is available, it will process the packet and return
 * PKT_READ_COMPLETE, resulting in the `on_read()` function calling this
 * function in a loop.
 *
 * When a complete packet is not available, PKT_READ_PARTIAL will be returned
 * and the `on_read()` loop will exit, scheduling any required pending I/O.
 */
static int
try_read(lcbio_CTX *ctx, mc_SERVER *server, rdb_IOROPE *ior)
{
    packet_info info_s, *info = &info_s;
    mc_PACKET *request;
    mc_PIPELINE *pl = &server->pipeline;
    unsigned pktsize = 24, is_last = 1;

    #define RETURN_NEED_MORE(n) \
        if (mcserver_has_pending(server)) { \
            lcbio_ctx_rwant(ctx, n); \
        } \
        return PKT_READ_PARTIAL; \

    #define DO_ASSIGN_PAYLOAD() \
        rdb_consumed(ior, sizeof(info->res.bytes)); \
        if (PACKET_NBODY(info)) { \
            info->payload = rdb_get_consolidated(ior, PACKET_NBODY(info)); \
        } {

    #define DO_SWALLOW_PAYLOAD() \
        } if (PACKET_NBODY(info)) { \
            rdb_consumed(ior, PACKET_NBODY(info)); \
        }

    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize)
    }

    /* copy bytes into the info structure */
    rdb_copyread(ior, info->res.bytes, sizeof info->res.bytes);

    pktsize += PACKET_NBODY(info);
    if (rdb_get_nused(ior) < pktsize) {
        RETURN_NEED_MORE(pktsize);
    }

    /* Find the packet */
    if (PACKET_OPCODE(info) == PROTOCOL_BINARY_CMD_STAT && PACKET_NKEY(info) != 0) {
        is_last = 0;
        request = mcreq_pipeline_find(pl, PACKET_OPAQUE(info));
    } else {
        is_last = 1;
        request = mcreq_pipeline_remove(pl, PACKET_OPAQUE(info));
    }

    if (!request) {
        lcb_log(LOGARGS(server, WARN), LOGFMT "Found stale packet (OP=0x%x, RC=0x%x, SEQ=%u)", LOGID(server), PACKET_OPCODE(info), PACKET_STATUS(info), PACKET_OPAQUE(info));
        rdb_consumed(ior, pktsize);
        return PKT_READ_COMPLETE;
    }

    if (PACKET_STATUS(info) == PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET) {
        /* consume the header */
        DO_ASSIGN_PAYLOAD()
        if (!handle_nmv(server, info, request)) {
            mcreq_dispatch_response(pl, request, info, LCB_NOT_MY_VBUCKET);
        }
        DO_SWALLOW_PAYLOAD()
        goto GT_DONE;
    }

    /* Figure out if the request is 'ufwd' or not */
    if (!(request->flags & MCREQ_F_UFWD)) {
        DO_ASSIGN_PAYLOAD();
        info->bufh = rdb_get_first_segment(ior);
        mcreq_dispatch_response(pl, request, info, LCB_SUCCESS);
        DO_SWALLOW_PAYLOAD()

    } else {
        /* figure out how many buffers we want to use as an upper limit for the
         * IOV arrays. Currently we'll keep it simple and ensure the entire
         * response is contiguous. */
        lcb_PKTFWDRESP resp = { 0 };
        rdb_ROPESEG *segs;
        nb_IOV iov;

        rdb_consolidate(ior, pktsize);
        rdb_refread_ex(ior, &iov, &segs, 1, pktsize);

        resp.bufs = &segs;
        resp.iovs = (lcb_IOV*)&iov;
        resp.nitems = 1;
        resp.header = info->res.bytes;
        server->instance->callbacks.pktfwd(
            server->instance, MCREQ_PKT_COOKIE(request), LCB_SUCCESS, &resp);
        rdb_consumed(ior, pktsize);
    }

    GT_DONE:
    if (is_last) {
        mcreq_packet_handled(pl, request);
    }
    return PKT_READ_COMPLETE;
}

static void
on_read(lcbio_CTX *ctx, unsigned nb)
{
    int rv;
    mc_SERVER *server = lcbio_ctx_data(ctx);
    rdb_IOROPE *ior = &ctx->ior;

    if (check_closed(server)) {
        return;
    }

    while ((rv = try_read(ctx, server, ior)) == PKT_READ_COMPLETE);
    lcbio_ctx_schedule(ctx);
    lcb_maybe_breakout(server->instance);

    (void)nb;
}

LCB_INTERNAL_API
int
mcserver_has_pending(mc_SERVER *server)
{
    return !SLLIST_IS_EMPTY(&server->pipeline.requests);
}

static void flush_noop(mc_PIPELINE *pipeline) {
    (void)pipeline;
}
static void server_connect(mc_SERVER *server);

typedef enum {
    REFRESH_ALWAYS,
    REFRESH_ONFAILED,
    REFRESH_NEVER
} mc_REFRESHPOLICY;

static int
maybe_retry(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_error_t err)
{
    mc_SERVER *srv = (mc_SERVER *)pipeline;
    mc_PACKET *newpkt;
    lcb_t instance = pipeline->parent->cqdata;
    lcbvb_DISTMODE dist_t = lcbvb_get_distmode(pipeline->parent->config);

    if (dist_t != LCBVB_DIST_VBUCKET) {
        /** memcached bucket */
        return 0;
    }
    if (!lcb_should_retry(srv->settings, pkt, err)) {
        return 0;
    }

    newpkt = mcreq_renew_packet(pkt);
    newpkt->flags &= ~MCREQ_STATE_FLAGS;
    lcb_retryq_add(instance->retryq, (mc_EXPACKET *)newpkt, err);
    return 1;
}

static void
fail_callback(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_error_t err, void *arg)
{
    int rv;
    mc_SERVER *server = (mc_SERVER *)pipeline;
    packet_info info;
    protocol_binary_request_header hdr;
    protocol_binary_response_header *res = &info.res;

    if (maybe_retry(pipeline, pkt, err)) {
        return;
    }

    if (err == LCB_AUTH_ERROR) {
        /* In-situ auth errors are actually dead servers. Let's provide this
         * as the actual error code. */
        err = LCB_MAP_CHANGED;
    }

    if (err == LCB_ETIMEDOUT) {
        lcb_error_t tmperr = lcb_retryq_origerr(pkt);
        if (tmperr != LCB_SUCCESS) {
            err = tmperr;
        }
    }

    memset(&info, 0, sizeof(info));
    memcpy(hdr.bytes, SPAN_BUFFER(&pkt->kh_span), sizeof(hdr.bytes));

    res->response.status = ntohs(PROTOCOL_BINARY_RESPONSE_EINVAL);
    res->response.opcode = hdr.request.opcode;
    res->response.opaque = hdr.request.opaque;

    lcb_log(LOGARGS(server, WARN), LOGFMT "Failing command (pkt=%p, opaque=%lu, opcode=0x%x) with error 0x%x", LOGID(server), (void*)pkt, (unsigned long)pkt->opaque, hdr.request.opcode, err);
    rv = mcreq_dispatch_response(pipeline, pkt, &info, err);
    lcb_assert(rv == 0);
    (void)arg;
}

static int
purge_single_server(mc_SERVER *server, lcb_error_t error,
                    hrtime_t thresh, hrtime_t *next, int policy)
{
    unsigned affected;
    mc_PIPELINE *pl = &server->pipeline;

    if (thresh) {
        affected = mcreq_pipeline_timeout(
                pl, error, fail_callback, NULL, thresh, next);

    } else {
        mcreq_pipeline_fail(pl, error, fail_callback, NULL);
        affected = -1;
    }

    if (policy == REFRESH_NEVER) {
        return affected;
    }

    if (affected || policy == REFRESH_ALWAYS) {
        lcb_bootstrap_common(server->instance,
            LCB_BS_REFRESH_THROTTLE|LCB_BS_REFRESH_INCRERR);
    }
    return affected;
}

static void flush_errdrain(mc_PIPELINE *pipeline)
{
    /* Called when we are draining errors. */
    mc_SERVER *server = (mc_SERVER *)pipeline;
    if (!lcbio_timer_armed(server->io_timer)) {
        lcbio_timer_rearm(server->io_timer, MCSERVER_TIMEOUT(server));
    }
}

void
mcserver_fail_chain(mc_SERVER *server, lcb_error_t err)
{
    purge_single_server(server, err, 0, NULL, REFRESH_NEVER);
}


static uint32_t
get_next_timeout(mc_SERVER *server)
{
    hrtime_t now, expiry, diff;
    mc_PACKET *pkt = mcreq_first_packet(&server->pipeline);

    if (!pkt) {
        return MCSERVER_TIMEOUT(server);
    }

    now = gethrtime();
    expiry = MCREQ_PKT_RDATA(pkt)->start + LCB_US2NS(MCSERVER_TIMEOUT(server));
    if (expiry <= now) {
        diff = 0;
    } else {
        diff = expiry - now;
    }

    return LCB_NS2US(diff);
}

static void
timeout_server(void *arg)
{
    mc_SERVER *server = arg;
    hrtime_t now, min_valid, next_ns = 0;
    uint32_t next_us;
    int npurged;

    now = gethrtime();
    min_valid = now - LCB_US2NS(MCSERVER_TIMEOUT(server));
    npurged = purge_single_server(server,
        LCB_ETIMEDOUT, min_valid, &next_ns, REFRESH_ONFAILED);
    if (npurged) {
        lcb_log(LOGARGS(server, ERROR), LOGFMT "Server timed out. Some commands have failed", LOGID(server));
    }

    next_us = get_next_timeout(server);
    lcb_log(LOGARGS(server, DEBUG), LOGFMT "Scheduling next timeout for %u ms", LOGID(server), next_us / 1000);
    lcbio_timer_rearm(server->io_timer, next_us);
    lcb_maybe_breakout(server->instance);
}

static int
maybe_retry_timeout_connection(mc_SERVER *server, lcb_error_t err)
{
    lcb_U32 next_tmo;
    if (err != LCB_ETIMEDOUT) {
        return 0; /* not a timeout */
    }
    if (!server->settings->readj_ts_wait) {
        return 0; /* normal timeout behavior */
    }
    if (!mcserver_has_pending(server)) {
        return 0; /* nothing pending */
    }

    next_tmo = get_next_timeout(server);
    if (next_tmo < MCSERVER_TIMEOUT(server) / 2) {
        /* Ideally we'd have a fuzz interval to shave off the actual timeout,
         * since there will inevitably be some time taken off the next timeout */
        return 0;
    }

    lcb_log(LOGARGS(server, INFO), LOGFMT "Retrying connection. Assuming timeout because of stalled event loop", LOGID(server));
    server_connect(server);
    return 1;
}

static void
on_connected(lcbio_SOCKET *sock, void *data, lcb_error_t err, lcbio_OSERR syserr)
{
    mc_SERVER *server = data;
    lcbio_CTXPROCS procs;
    uint32_t tmo;
    mc_pSESSINFO sessinfo = NULL;
    LCBIO_CONNREQ_CLEAR(&server->connreq);

    if (err != LCB_SUCCESS) {
        lcb_log(LOGARGS(server, ERR), LOGFMT "Got error for connection! (OS=%d)", LOGID(server), syserr);
        if (maybe_retry_timeout_connection(server, err)) {
            return;
        }
        server_socket_failed(server, err);
        return;
    }

    lcb_assert(sock);

    /** Do we need sasl? */
    sessinfo = mc_sess_get(sock);
    if (sessinfo == NULL) {
        mc_pSESSREQ sreq;
        lcb_log(LOGARGS(server, TRACE), "<%s:%s> (SRV=%p) Session not yet negotiated. Negotiating", server->curhost->host, server->curhost->port, (void*)server);
        sreq = mc_sessreq_start(sock, server->settings, MCSERVER_TIMEOUT(server),
            on_connected, data);
        LCBIO_CONNREQ_MKGENERIC(&server->connreq, sreq, mc_sessreq_cancel);
        return;
    } else {
        server->compsupport = mc_sess_chkfeature(sessinfo,
            PROTOCOL_BINARY_FEATURE_DATATYPE);
        server->mutation_tokens = mc_sess_chkfeature(sessinfo,
            PROTOCOL_BINARY_FEATURE_MUTATION_SEQNO);
    }

    procs.cb_err = on_error;
    procs.cb_read = on_read;
    procs.cb_flush_done = on_flush_done;
    procs.cb_flush_ready = on_flush_ready;
    server->connctx = lcbio_ctx_new(sock, server, &procs);
    server->connctx->subsys = "memcached";
    server->pipeline.flush_start = (mcreq_flushstart_fn)mcserver_flush;

    tmo = get_next_timeout(server);
    lcb_log(LOGARGS(server, DEBUG), LOGFMT "Setting initial timeout=%ums", LOGID(server), tmo/1000);
    lcbio_timer_rearm(server->io_timer, get_next_timeout(server));
    mcserver_flush(server);
}

static void
server_connect(mc_SERVER *server)
{
    lcbio_pMGRREQ mr;
    mr = lcbio_mgr_get(server->instance->memd_sockpool, server->curhost,
                       MCSERVER_TIMEOUT(server), on_connected, server);
    LCBIO_CONNREQ_MKPOOLED(&server->connreq, mr);
    server->pipeline.flush_start = flush_noop;
    server->state = S_CLEAN;
}

static void
buf_done_cb(mc_PIPELINE *pl, const void *cookie, void *kbuf, void *vbuf)
{
    mc_SERVER *server = (mc_SERVER*)pl;
    server->instance->callbacks.pktflushed(server->instance, cookie);
    (void)kbuf; (void)vbuf;
}

mc_SERVER *
mcserver_alloc(lcb_t instance, int ix)
{
    mc_SERVER *ret;
    lcbvb_SVCMODE mode;
    const char *datahost;

    ret = calloc(1, sizeof(*ret));
    if (!ret) {
        return ret;
    }

    ret->instance = instance;
    ret->settings = instance->settings;
    ret->curhost = calloc(1, sizeof(*ret->curhost));
    mode = ret->settings->sslopts & LCB_SSL_ENABLED
            ? LCBVB_SVCMODE_SSL : LCBVB_SVCMODE_PLAIN;

    lcb_settings_ref(ret->settings);
    mcreq_pipeline_init(&ret->pipeline);
    ret->pipeline.flush_start = (mcreq_flushstart_fn)server_connect;
    ret->pipeline.buf_done_callback = buf_done_cb;

    datahost = lcbvb_get_hostport(
        LCBT_VBCONFIG(instance), ix, LCBVB_SVCTYPE_DATA, mode);
    if (datahost) {
        lcb_host_parsez(ret->curhost, datahost, LCB_CONFIG_MCD_PORT);
    }
    ret->io_timer = lcbio_timer_new(instance->iotable, ret, timeout_server);
    return ret;
}

static void
server_free(mc_SERVER *server)
{
    mcreq_pipeline_cleanup(&server->pipeline);

    if (server->io_timer) {
        lcbio_timer_destroy(server->io_timer);
    }

    free(server->curhost);
    lcb_settings_unref(server->settings);
    free(server);
}

static void
close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
{
    lcbio_ref(sock);
    lcbio_mgr_discard(sock);
    (void)reusable;(void)arg;
}


/**Marks any unflushed data inside this server as being already flushed. This
 * should be done within error handling. If subsequent data is flushed on this
 * pipeline to the same connection, the results are undefined. */
static void
release_unflushed_packets(mc_SERVER *server)
{
    unsigned toflush;
    nb_IOV iov;
    mc_PIPELINE *pl = &server->pipeline;
    while ((toflush = mcreq_flush_iov_fill(pl, &iov, 1, NULL))) {
        mcreq_flush_done(pl, toflush, toflush);
    }
}

static void
on_error(lcbio_CTX *ctx, lcb_error_t err)
{
    mc_SERVER *server = lcbio_ctx_data(ctx);
    lcb_log(LOGARGS(server, WARN), LOGFMT "Got socket error 0x%x", LOGID(server), err);
    if (check_closed(server)) {
        return;
    }
    server_socket_failed(server, err);
}

/**Handle a socket error. This function will close the current connection
 * and trigger a failout of any pending commands.
 * This function triggers a configuration refresh */
static void
server_socket_failed(mc_SERVER *server, lcb_error_t err)
{
    if (check_closed(server)) {
        return;
    }

    purge_single_server(server, err, 0, NULL, REFRESH_ALWAYS);
    lcb_maybe_breakout(server->instance);
    start_errored_ctx(server, S_ERRDRAIN);
}

void
mcserver_close(mc_SERVER *server)
{
    /* Should never be called twice */
    lcb_assert(server->state != S_CLOSED);
    start_errored_ctx(server, S_CLOSED);
}

/**
 * Call to signal an error or similar on the current socket.
 * @param server The server
 * @param next_state The next state (S_CLOSED or S_ERRDRAIN)
 */
static void
start_errored_ctx(mc_SERVER *server, mcserver_STATE next_state)
{
    lcbio_CTX *ctx = server->connctx;

    server->state = next_state;
    /* Cancel any pending connection attempt? */
    lcbio_connreq_cancel(&server->connreq);

    /* If the server is being destroyed, silence the timer */
    if (next_state == S_CLOSED && server->io_timer != NULL) {
        lcbio_timer_destroy(server->io_timer);
        server->io_timer = NULL;
    }

    if (ctx == NULL) {
        if (next_state == S_CLOSED) {
            server_free(server);
            return;
        } else {
            /* Not closed but don't have a current context */
            server->pipeline.flush_start = (mcreq_flushstart_fn)server_connect;
            if (mcserver_has_pending(server)) {
                if (!lcbio_timer_armed(server->io_timer)) {
                    /* TODO: Maybe throttle reconnection attempts? */
                    lcbio_timer_rearm(server->io_timer, MCSERVER_TIMEOUT(server));
                }
                server_connect(server);
            }
        }

    } else {
        if (ctx->npending) {
            /* Have pending items? */

            /* Flush any remaining events */
            lcbio_ctx_schedule(ctx);

            /* Close the socket not to leak resources */
            lcbio_shutdown(lcbio_ctx_sock(ctx));
            if (next_state == S_ERRDRAIN) {
                server->pipeline.flush_start = (mcreq_flushstart_fn)flush_errdrain;
            }
        } else {
            finalize_errored_ctx(server);
        }
    }
}

/**
 * This function actually finalizes a ctx which has an error on it. If the
 * ctx has pending operations remaining then this function returns immediately.
 * Otherwise this will either reinitialize the connection or free the server
 * object depending on the actual object state (i.e. if it was closed or
 * simply errored).
 */
static void
finalize_errored_ctx(mc_SERVER *server)
{
    if (server->connctx->npending) {
        return;
    }

    lcb_log(LOGARGS(server, DEBUG), LOGFMT "Finalizing ctx %p", LOGID(server), (void*)server->connctx);

    /* Always close the existing context. */
    lcbio_ctx_close(server->connctx, close_cb, NULL);
    server->connctx = NULL;

    /* And pretend to flush any outstanding data. There's nothing pending! */
    release_unflushed_packets(server);

    if (server->state == S_CLOSED) {
        /* If the server is closed, time to free it */
        server_free(server);
    } else {
        /* Otherwise, cycle the state back to CLEAN and reinit
         * the connection */
        server->state = S_CLEAN;
        server->pipeline.flush_start = (mcreq_flushstart_fn)server_connect;
        server_connect(server);
    }
}

/**
 * This little function checks to see if the server struct is still valid, or
 * whether it should just be cleaned once no pending I/O remainds.
 *
 * If this function returns false then the server is still valid; otherwise it
 * is invalid and must not be used further.
 */
static int
check_closed(mc_SERVER *server)
{
    if (server->state == S_CLEAN) {
        return 0;
    }
    lcb_log(LOGARGS(server, INFO), LOGFMT "Got handler after close. Checking pending calls (pending=%u)", LOGID(server), server->connctx->npending);
    finalize_errored_ctx(server);
    return 1;
}