#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include "rope.h"
#define MINIMUM(a, b) (a) < (b) ? a : b
#define SEG_RELEASE(seg) (seg)->allocator->s_release((seg)->allocator, seg)
#define SEG_REALLOC(seg, n) (seg)->allocator->s_realloc((seg)->allocator, seg, n)
#define ROPE_SALLOC(rope, n) (rope)->allocator->s_alloc((rope)->allocator, n)
static void wipe_rope(rdb_ROPEBUF *rope);
unsigned
rdb_rdstart(rdb_IOROPE *ior, nb_IOV *iov, unsigned niov)
{
unsigned orig_niov = niov;
unsigned cur_rdsize = 0;
lcb_list_t *ll;
rdb_ROPESEG *seg = RDB_SEG_LAST(&ior->recvd);
if (seg && RDB_SEG_SPACE(seg)) {
iov->iov_base = RDB_SEG_WBUF(seg);
iov->iov_len = RDB_SEG_SPACE(seg);
cur_rdsize += iov->iov_len;
++iov;
--niov;
if (cur_rdsize >= ior->rdsize) {
return 1;
}
}
if (!niov) {
return orig_niov - niov;
}
ior->avail.allocator->r_reserve(
ior->avail.allocator, &ior->avail, ior->rdsize - cur_rdsize);
assert(!LCB_LIST_IS_EMPTY(&ior->avail.segments));
LCB_LIST_FOR(ll, &ior->avail.segments) {
rdb_ROPESEG *cur = LCB_LIST_ITEM(ll, rdb_ROPESEG, llnode);
iov->iov_base = RDB_SEG_WBUF(cur);
iov->iov_len = RDB_SEG_SPACE(cur);
++iov;
if (!--niov) {
break;
}
}
return orig_niov - niov;
}
void
rdb_rdend(rdb_IOROPE *ior, unsigned nr)
{
unsigned to_chop;
lcb_list_t *llcur, *llnext;
rdb_ROPESEG *seg = RDB_SEG_LAST(&ior->recvd);
if (seg && RDB_SEG_SPACE(seg)) {
to_chop = MINIMUM(nr, RDB_SEG_SPACE(seg));
seg->nused += to_chop;
ior->recvd.nused += to_chop;
if (! (nr -= to_chop)) {
wipe_rope(&ior->avail);
return;
}
}
LCB_LIST_SAFE_FOR(llcur, llnext, &ior->avail.segments) {
seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
to_chop = MINIMUM(nr, RDB_SEG_SPACE(seg));
seg->nused += to_chop;
ior->recvd.nused += seg->nused;
lcb_list_delete(&seg->llnode);
lcb_list_append(&ior->recvd.segments, &seg->llnode);
if (! (nr -= to_chop)) {
wipe_rope(&ior->avail);
return;
}
}
fprintf(stderr, "RDB: Tried to consume more than available in the buffer (n=%u)\n", nr);
assert(0);
}
static void
seg_consumed(rdb_ROPEBUF *rope, rdb_ROPESEG *seg, unsigned nr)
{
assert(nr <= seg->nused);
seg->nused -= nr;
seg->start += nr;
rope->nused -= nr;
if (!seg->nused) {
lcb_list_delete(&seg->llnode);
seg->shflags &= ~RDB_ROPESEG_F_LIB;
if (rdb_seg_recyclable(seg)) {
SEG_RELEASE(seg);
}
}
}
static void
rope_consumed(rdb_ROPEBUF *rope, unsigned nr)
{
lcb_list_t *llcur, *llnext;
assert(nr <= rope->nused);
LCB_LIST_SAFE_FOR(llcur, llnext, &rope->segments) {
unsigned to_chop;
rdb_ROPESEG *seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
to_chop = MINIMUM(nr, seg->nused);
seg_consumed(rope, seg, to_chop);
if (! (nr -= to_chop)) {
break;
}
}
}
void
rdb_consumed(rdb_IOROPE *ior, unsigned nr)
{
rope_consumed(&ior->recvd, nr);
}
static void
try_compact(rdb_ROPESEG *seg)
{
char *cp_end;
if (!rdb_seg_recyclable(seg)) {
return;
}
if (seg->start < seg->nalloc / 2) {
return;
}
cp_end = seg->root + seg->nused;
if (seg->root + seg->start < cp_end) {
return;
}
memcpy(seg->root, seg->root + seg->start, seg->nused);
seg->start = 0;
}
static void
rope_consolidate(rdb_ROPEBUF *rope, unsigned nr)
{
rdb_ROPESEG *seg, *newseg;
lcb_list_t *llcur, *llnext;
seg = RDB_SEG_FIRST(rope);
if (seg->nused + RDB_SEG_SPACE(seg) >= nr || nr < 2) {
return;
}
try_compact(seg);
lcb_list_delete(&seg->llnode);
if (rdb_seg_recyclable(seg)) {
unsigned to_alloc = nr + seg->start;
newseg = SEG_REALLOC(seg, to_alloc);
} else {
newseg = ROPE_SALLOC(rope, nr);
memcpy(RDB_SEG_WBUF(newseg), RDB_SEG_RBUF(seg), seg->nused);
newseg->nused = seg->nused;
seg->shflags &= ~RDB_ROPESEG_F_LIB;
}
rope->nused -= newseg->nused;
nr -= newseg->nused;
LCB_LIST_SAFE_FOR(llcur, llnext, &rope->segments) {
unsigned to_copy;
seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
to_copy = MINIMUM(nr, seg->nused);
memcpy(RDB_SEG_WBUF(newseg), RDB_SEG_RBUF(seg), to_copy);
newseg->nused += to_copy;
seg_consumed(rope, seg, to_copy);
if (! (nr -= to_copy)) {
break;
}
}
lcb_list_prepend(&rope->segments, &newseg->llnode);
rope->nused += newseg->nused;
assert(rope->nused >= nr);
}
void
rdb_consolidate(rdb_IOROPE *ior, unsigned nr)
{
rope_consolidate(&ior->recvd, nr);
}
void
rdb_copyread(rdb_IOROPE *ior, void *tgt, unsigned n)
{
lcb_list_t *ll;
char *p = tgt;
LCB_LIST_FOR(ll, &ior->recvd.segments) {
rdb_ROPESEG *seg = LCB_LIST_ITEM(ll, rdb_ROPESEG, llnode);
unsigned to_copy = MINIMUM(seg->nused, n);
memcpy(p, RDB_SEG_RBUF(seg), to_copy);
p += to_copy;
n -= to_copy;
if (!n) {
break;
}
}
}
int
rdb_refread_ex(rdb_IOROPE *ior, nb_IOV *iov, rdb_ROPESEG **segs,
unsigned nelem, unsigned ndata)
{
unsigned orig_nelem = nelem;
lcb_list_t *ll;
LCB_LIST_FOR(ll, &ior->recvd.segments) {
rdb_ROPESEG *seg = LCB_LIST_ITEM(ll, rdb_ROPESEG, llnode);
unsigned cur_len = MINIMUM(ndata, seg->nused);
iov->iov_len = cur_len;
iov->iov_base = RDB_SEG_RBUF(seg);
*segs = seg;
++iov;
++segs;
--nelem;
ndata -= cur_len;
if (!ndata) {
return orig_nelem - nelem;
}
if (!nelem) {
return -1;
}
}
fprintf(stderr, "RDB: refread_ex was passed a size greater than our buffer (n=%u)\n", ndata);
return -1;
}
unsigned
rdb_get_contigsize(rdb_IOROPE *ior)
{
rdb_ROPESEG *seg = RDB_SEG_FIRST(&ior->recvd);
if (!seg) {
return 0;
}
return seg->nused;
}
char *
rdb_get_consolidated(rdb_IOROPE *ior, unsigned n)
{
assert(ior->recvd.nused >= n);
rdb_consolidate(ior, n);
return RDB_SEG_RBUF(RDB_SEG_FIRST(&ior->recvd));
}
void
rdb_seg_ref(rdb_ROPESEG *seg)
{
seg->refcnt++;
seg->shflags |= RDB_ROPESEG_F_USER;
}
void
rdb_seg_unref(rdb_ROPESEG *seg)
{
if (--seg->refcnt) {
return;
}
seg->shflags &= ~RDB_ROPESEG_F_USER;
if (seg->shflags & RDB_ROPESEG_F_LIB) {
return;
}
SEG_RELEASE(seg);
}
void
rdb_init(rdb_IOROPE *ior, rdb_ALLOCATOR *alloc)
{
memset(ior, 0, sizeof(*ior));
lcb_list_init(&ior->recvd.segments);
lcb_list_init(&ior->avail.segments);
rdb_challoc(ior, alloc);
ior->rdsize = 32768;
}
static void
wipe_rope(rdb_ROPEBUF *rope)
{
lcb_list_t *llcur, *llnext;
LCB_LIST_SAFE_FOR(llcur, llnext, &rope->segments) {
rdb_ROPESEG *seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
seg_consumed(rope, seg, seg->nused);
}
}
void
rdb_cleanup(rdb_IOROPE *ior)
{
wipe_rope(&ior->recvd);
wipe_rope(&ior->avail);
ior->recvd.allocator->a_release(ior->recvd.allocator);
}
void
rdb_challoc(rdb_IOROPE *ior, rdb_ALLOCATOR *alloc)
{
if (ior->recvd.allocator) {
ior->recvd.allocator->a_release(ior->recvd.allocator);
}
ior->recvd.allocator = alloc;
ior->avail.allocator = alloc;
}
void
rdb_copywrite(rdb_IOROPE *ior, void *buf, unsigned nbuf)
{
char *cur = buf;
while (nbuf) {
unsigned ii;
unsigned orig_nbuf = nbuf;
nb_IOV iov[32];
unsigned niov;
niov = rdb_rdstart(ior, iov, 32);
for (ii = 0; ii < niov && nbuf; ii++) {
unsigned to_copy = MINIMUM(nbuf, iov[ii].iov_len);
memcpy(iov[ii].iov_base, cur, to_copy);
cur += to_copy;
nbuf -= to_copy;
}
rdb_rdend(ior, orig_nbuf - nbuf);
}
}
static void
dump_ropebuf(const rdb_ROPEBUF *buf, FILE *fp)
{
lcb_list_t *llcur;
fprintf(fp, "TOTAL LENGTH: %u\n", buf->nused);
fprintf(fp, "WILL DUMP SEGMENTS..\n");
LCB_LIST_FOR(llcur, &buf->segments) {
const char *indent = " ";
rdb_ROPESEG *seg = LCB_LIST_ITEM(llcur, rdb_ROPESEG, llnode);
fprintf(fp, "%sSEG=%p\n", indent, (void*)seg);
fprintf(fp, "%sALLOCATOR=%p [%u]\n", indent, (void*)seg->allocator, seg->allocid);
fprintf(fp, "%sBUFROOT=%p\n", indent, seg->root);
fprintf(fp, "%sALLOC SIZE: %u\n", indent, seg->nalloc);
fprintf(fp, "%sDATA SIZE: %u\n", indent, seg->nused);
fprintf(fp, "%sDATS OFFSET: %u\n", indent, seg->start);
fprintf(fp, "%sSEG FLAGS: 0x%x\n", indent, seg->shflags);
fprintf(fp, "%sSEG REFCNT: %u\n", indent, seg->refcnt);
fprintf(fp, "\n");
}
}
void
rdb_dump(const rdb_IOROPE *ior, FILE *fp)
{
fprintf(fp, "@@ DUMP IOROPE=%p\n", (void*)ior);
fprintf(fp, "@@ ROPEBUF[AVAIL]=%p\n", (void*)&ior->avail);
dump_ropebuf(&ior->avail, fp);
fprintf(fp, "@@ ROPEBUF[ACTIVE]=%p\n", (void*)&ior->recvd);
dump_ropebuf(&ior->recvd, fp);
if (ior->avail.allocator && ior->avail.allocator->dump) {
ior->avail.allocator->dump(ior->avail.allocator, fp);
}
}