#include "plugin-internal.h"
/*
 * Forward declarations for file-local helpers that are referenced before
 * their definitions appear further down in this file.
 */

/* Allocates a zeroed request object bound to `sock` that stores `callback`. */
static my_uvreq_t *alloc_uvreq(my_sockdata_t *sock, generic_callback_t callback);
/* Stores `error`, passed through uvc_last_errno(), as the iops' last error. */
static void set_last_error(my_iops_t *io, int error);
/* Close callback handed to uv_close() for socket handles; frees the socket. */
static void socket_closed_callback(uv_handle_t *handle);
/* Fills in the procedure tables and I/O model for the version-2 iops. */
static void wire_iops2(int version,
lcb_loop_procs *loop,
lcb_timer_procs *timer,
lcb_bsd_procs *bsd,
lcb_ev_procs *ev,
lcb_completion_procs *iocp,
lcb_iomodel_t *model);
/**
 * Drop one reference from the iops instance.
 *
 * When the count reaches zero the structure is overwritten with 0xff bytes
 * (so stale pointers are easy to spot) and then released.
 *
 * @param iobase the iops object; must still hold at least one reference
 */
static void decref_iops(lcb_io_opt_t iobase)
{
    my_iops_t *self = (my_iops_t *)iobase;

    lcb_assert(self->iops_refcount);
    self->iops_refcount--;
    if (self->iops_refcount != 0) {
        /* Other holders remain; nothing to tear down yet. */
        return;
    }

    memset(self, 0xff, sizeof(*self));
    free(self);
}
/**
 * Destructor installed on the iops object.
 *
 * Unless start/stop handling is disabled (startstop_noop), the loop is run
 * one iteration at a time until only a single reference to the iops remains,
 * and the loop is deleted if it was not supplied externally. In every case
 * the final step is to drop one reference to the iops.
 *
 * @param iobase the iops object being destroyed
 */
static void iops_lcb_dtor(lcb_io_opt_t iobase)
{
    my_iops_t *self = (my_iops_t *)iobase;

    if (!self->startstop_noop) {
        /* Keep iterating while anything besides us still holds a reference. */
        while (self->iops_refcount > 1) {
            UVC_RUN_ONCE(self->loop);
        }

        if (self->external_loop == 0) {
            uv_loop_delete(self->loop);
        }
    }

    decref_iops(iobase);
}
#if UV_VERSION < 0x000900
/**
 * Run the loop one iteration at a time until either uv_run_once() returns
 * zero or a stop has been requested through do_stop. The stop flag is
 * cleared before returning.
 */
static void do_run_loop(my_iops_t *io)
{
    for (;;) {
        if (!uv_run_once(io->loop)) {
            break;
        }
        if (io->do_stop != 0) {
            break;
        }
    }
    io->do_stop = 0;
}

/** Request that do_run_loop() returns after its current iteration. */
static void do_stop_loop(my_iops_t *io)
{
    io->do_stop = 1;
}
#else
/** Run the loop in UV_RUN_DEFAULT mode. */
static void do_run_loop(my_iops_t *io)
{
    uv_run(io->loop, UV_RUN_DEFAULT);
}

/** Ask libuv to stop the loop via uv_stop(). */
static void do_stop_loop(my_iops_t *io)
{
    uv_stop(io->loop);
}
#endif
/**
 * Loop "start" procedure: runs the event loop, unless start/stop handling
 * has been disabled with startstop_noop.
 */
static void run_event_loop(lcb_io_opt_t iobase)
{
    my_iops_t *self = (my_iops_t *)iobase;

    if (self->startstop_noop) {
        return;
    }
    do_run_loop(self);
}
/**
 * Loop "tick" procedure: performs a single pass over the event loop, unless
 * start/stop handling has been disabled with startstop_noop.
 *
 * Before libuv 0.9 this uses uv_run_once() and clears the stop flag;
 * otherwise it calls uv_run() in UV_RUN_NOWAIT mode.
 */
static void tick_event_loop(lcb_io_opt_t iobase)
{
    my_iops_t *self = (my_iops_t *)iobase;

    if (self->startstop_noop) {
        return;
    }
#if UV_VERSION < 0x000900
    uv_run_once(self->loop);
    self->do_stop = 0;
#else
    uv_run(self->loop, UV_RUN_NOWAIT);
#endif
}
/**
 * Loop "stop" procedure: asks the running loop to stop, unless start/stop
 * handling has been disabled with startstop_noop.
 */
static void stop_event_loop(lcb_io_opt_t iobase)
{
    my_iops_t *self = (my_iops_t *)iobase;

    if (self->startstop_noop) {
        return;
    }
    do_stop_loop(self);
}
/**
 * Create a libuv-backed I/O operations object.
 *
 * @param version  plugin interface version; only 0 is accepted
 * @param io       out parameter receiving the new iops object. It is written
 *                 only when the function returns LCB_SUCCESS.
 * @param options  optional settings (may be NULL). If options->v.v0.loop is
 *                 set, that loop is used and is treated as externally owned;
 *                 otherwise a new loop is created with uv_loop_new().
 *
 * @return LCB_SUCCESS on success,
 *         LCB_PLUGIN_VERSION_MISMATCH if version is not 0,
 *         LCB_CLIENT_ENOMEM if the iops structure or a new loop could not be
 *         allocated,
 *         LCB_EINTERNAL (Windows only) if GetModuleHandleEx() fails.
 */
LCBUV_API
lcb_error_t lcb_create_libuv_io_opts(int version,
                                     lcb_io_opt_t *io,
                                     lcbuv_options_t *options)
{
    lcb_io_opt_t iop;
    uv_loop_t *loop = NULL;
    my_iops_t *ret;

    if (version != 0) {
        return LCB_PLUGIN_VERSION_MISMATCH;
    }

#ifdef _WIN32
    {
        /*
         * Look up the module containing this code (by the address of a
         * local static) with the PIN flag set.
         * NOTE(review): presumably done so the plugin DLL is never
         * unloaded while libuv state is alive -- confirm.
         */
        HMODULE module;
        static int dummy;
        BOOL result;
        result = GetModuleHandleEx(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS |
                                   GET_MODULE_HANDLE_EX_FLAG_PIN,
                                   (LPCSTR)&dummy,
                                   &module);
        if (!result) {
            return LCB_EINTERNAL;
        }
    }
#endif

    ret = calloc(1, sizeof(*ret));
    if (!ret) {
        return LCB_CLIENT_ENOMEM;
    }

    iop = &ret->base;
    iop->version = 2;
    iop->destructor = iops_lcb_dtor;
    iop->v.v2.get_procs = wire_iops2;
    ret->iops_refcount = 1;

    if (options) {
        if (options->v.v0.loop) {
            ret->external_loop = 1;
            loop = options->v.v0.loop;
        }
        /* "startsop_noop" is the field's actual spelling in the options. */
        ret->startstop_noop = options->v.v0.startsop_noop;
    }

    if (!loop) {
        loop = uv_loop_new();
        if (!loop) {
            /*
             * Without a loop every later call would dereference NULL, so
             * fail here instead of handing back an unusable object.
             */
            free(ret);
            return LCB_CLIENT_ENOMEM;
        }
    }

    ret->loop = loop;
    /* Publish the object only once it is fully initialized. */
    *io = iop;
    return LCB_SUCCESS;
}
/*
 * Bump / drop the per-socket counter `pending.<fld>` of socket `s`.
 * The whole expansion is parenthesized so it stays a single expression
 * wherever it is used.
 */
#define SOCK_INCR_PENDING(s, fld) ((s)->pending.fld++)
#define SOCK_DECR_PENDING(s, fld) ((s)->pending.fld--)
#ifdef DEBUG
/**
 * Debug helper: print the socket's address and its pending read/write
 * counters to standard output.
 */
static void sock_dump_pending(my_sockdata_t *sock)
{
    fprintf(stdout, "Socket %p:\n", (void *)sock);
    fprintf(stdout, "\tRead: %d\n", sock->pending.read);
    fprintf(stdout, "\tWrite: %d\n", sock->pending.write);
}
#endif
/**
 * Start closing the socket's handle, at most once.
 *
 * The uv_close_called flag records that uv_close() has already been issued
 * so that repeated calls become no-ops. The socket itself is released later
 * from socket_closed_callback().
 */
static void sock_do_uv_close(my_sockdata_t *sock)
{
    if (sock->uv_close_called) {
        return;
    }

    sock->uv_close_called = 1;
    uv_close((uv_handle_t *)&sock->tcp, socket_closed_callback);
}
/**
 * Drop one reference from the socket; when the last one goes away the
 * handle close is initiated.
 *
 * @param sock socket that still holds at least one reference
 */
static void decref_sock(my_sockdata_t *sock)
{
    lcb_assert(sock->refcount);
    sock->refcount--;
    if (sock->refcount == 0) {
        sock_do_uv_close(sock);
    }
}
/* Take one additional reference on socket `sd`. */
#define incref_sock(sd) ((sd)->refcount++)
/**
 * Allocate a new socket object and initialize its TCP handle on the iops'
 * loop.
 *
 * The new socket starts with one reference and takes one reference on the
 * iops. The iops' last error is reset with set_last_error(io, 0).
 *
 * @param iobase   the iops object
 * @param domain   ignored
 * @param type     ignored
 * @param protocol ignored
 * @return the new socket, or NULL if allocation failed
 */
static lcb_sockdata_t *create_socket(lcb_io_opt_t iobase,
                                     int domain,
                                     int type,
                                     int protocol)
{
    my_iops_t *self = (my_iops_t *)iobase;
    my_sockdata_t *sd;

    (void)domain;
    (void)type;
    (void)protocol;

    sd = calloc(1, sizeof(*sd));
    if (sd == NULL) {
        return NULL;
    }

    uv_tcp_init(self->loop, &sd->tcp.t);
    incref_iops(self);
    incref_sock(sd);
    set_last_error(self, 0);

    return (lcb_sockdata_t *)sd;
}
/**
 * Close callback for socket handles (passed to uv_close()).
 *
 * If a read is still pending, its callback is invoked with -1 first. The
 * socket is then overwritten with 0xEE bytes and freed, and the reference
 * the socket held on the iops is dropped.
 *
 * @param handle the handle embedded in the socket's `tcp` field
 */
static void socket_closed_callback(uv_handle_t *handle)
{
    my_sockdata_t *sd = PTR_FROM_FIELD(my_sockdata_t, handle, tcp);
    /* Grab the parent now: the socket memory is gone after free(). */
    my_iops_t *parent = (my_iops_t *)sd->base.parent;

    if (sd->pending.read) {
        lcb_ioC_read2_callback pending_cb = CbREQ(&sd->tcp);
        pending_cb(&sd->base, -1, sd->rdarg);
    }

    memset(sd, 0xEE, sizeof(*sd));
    free(sd);

    decref_iops(&parent->base);
}
/**
 * Completion-model "close" procedure: begin closing the socket's handle.
 *
 * The close goes through sock_do_uv_close() so that uv_close() is issued at
 * most once per handle, even if a close was already started (for example by
 * decref_sock() dropping the last reference). The socket memory is released
 * later in socket_closed_callback().
 *
 * @param iobase   the iops object (unused)
 * @param sockbase the socket to close
 * @return always 0
 */
static unsigned int close_socket(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase)
{
    my_sockdata_t *sock = (my_sockdata_t *)sockbase;

    sock_do_uv_close(sock);

    (void)iobase;
    return 0;
}
/**
 * Socket control procedure.
 *
 * Only setting LCB_IO_CNTL_TCP_NODELAY is supported; the value is read from
 * `arg` as an int and forwarded to uv_tcp_nodelay(). Every other
 * option/mode combination sets the iops errno to ENOTSUP and fails.
 *
 * @param iobase   the iops object
 * @param sockbase the socket to configure
 * @param mode     LCB_IO_CNTL_SET to apply a setting
 * @param option   the option identifier
 * @param arg      pointer to the option value (an int for TCP_NODELAY)
 * @return the uv_tcp_nodelay() result for the supported case, -1 otherwise
 */
static int cntl_socket(lcb_io_opt_t iobase, lcb_sockdata_t *sockbase,
                       int mode, int option, void *arg)
{
    my_sockdata_t *sd = (my_sockdata_t *)sockbase;
    int status;

    if (option != LCB_IO_CNTL_TCP_NODELAY || mode != LCB_IO_CNTL_SET) {
        LCB_IOPS_ERRNO(iobase) = ENOTSUP;
        return -1;
    }

    status = uv_tcp_nodelay(&sd->tcp.t, *(int *)arg);
    if (status != 0) {
        set_last_error((my_iops_t*)iobase, status);
    }
    return status;
}
/**
 * libuv connect completion handler.
 *
 * Records `status` as the iops' last error, forwards it to the user's
 * connect callback (if one was supplied), then drops the socket reference
 * taken by start_connect() and frees the request.
 *
 * @param req    the connect request; it is the leading part of a my_uvreq_t
 * @param status connect result as reported by libuv
 */
static void connect_callback(uv_connect_t *req, int status)
{
    my_uvreq_t *request = (my_uvreq_t *)req;
    my_sockdata_t *sd = request->socket;

    set_last_error((my_iops_t *)sd->base.parent, status);

    if (request->cb.conn) {
        request->cb.conn(&sd->base, status);
    }

    decref_sock(sd);
    free(request);
}
/**
 * Begin an asynchronous connect on the socket.
 *
 * The address family is chosen from `namelen`: sizeof(struct sockaddr_in)
 * selects the IPv4 connect macro, sizeof(struct sockaddr_in6) the IPv6 one;
 * any other length fails with EINVAL stored in the iops error field.
 *
 * On success the socket gains a reference that connect_callback() releases.
 * On failure the request is freed and the error is recorded.
 *
 * @param iobase   the iops object
 * @param sockbase the socket to connect
 * @param name     destination address
 * @param namelen  size of `name`, used to select the address family
 * @param callback invoked from connect_callback() on completion
 * @return 0 if the connect was started, non-zero otherwise
 */
static int start_connect(lcb_io_opt_t iobase,
                         lcb_sockdata_t *sockbase,
                         const struct sockaddr *name,
                         unsigned int namelen,
                         lcb_io_connect_cb callback)
{
    my_sockdata_t *sock = (my_sockdata_t *)sockbase;
    my_iops_t *io = (my_iops_t *)iobase;
    my_uvreq_t *uvr;
    int ret;

    uvr = alloc_uvreq(sock, (generic_callback_t)callback);
    if (uvr == NULL) {
        return -1;
    }

    if (namelen == sizeof(struct sockaddr_in)) {
        ret = UVC_TCP_CONNECT(&uvr->uvreq.conn,
                              &sock->tcp.t,
                              name,
                              connect_callback);
        if (ret) {
            set_last_error(io, ret);
        }
    } else if (namelen == sizeof(struct sockaddr_in6)) {
        ret = UVC_TCP_CONNECT6(&uvr->uvreq.conn,
                               &sock->tcp.t,
                               name,
                               connect_callback);
        if (ret) {
            set_last_error(io, ret);
        }
    } else {
        /* Unknown address size: report EINVAL directly, untranslated. */
        io->base.v.v1.error = EINVAL;
        ret = -1;
    }

    if (ret == 0) {
        /* Held until connect_callback() runs. */
        incref_sock(sock);
        return ret;
    }

    free(uvr);
    return ret;
}
/**
 * libuv write completion handler.
 *
 * Records a non-zero status as the iops' last error, hands the status and
 * the user argument (stored in the request's `data` field) to the write
 * callback, and frees the request.
 *
 * @param req    the write request; it is the leading part of a my_write_t
 * @param status write result as reported by libuv
 */
static void write2_callback(uv_write_t *req, int status)
{
    my_write_t *wreq = (my_write_t *)req;
    my_sockdata_t *sd = wreq->sock;

    if (status != 0) {
        set_last_error((my_iops_t *)sd->base.parent, status);
    }

    wreq->callback(&sd->base, status, wreq->w.data);
    free(wreq);
}
/**
 * Queue an asynchronous write of `niov` buffers on the socket.
 *
 * A my_write_t request is allocated to carry the user argument and the
 * callback; it is freed by write2_callback() on completion, or here if the
 * write could not be queued.
 *
 * @param iobase   the iops object
 * @param sockbase the socket to write to
 * @param iov      buffers to write (passed to uv_write() as uv_buf_t)
 * @param niov     number of entries in `iov`
 * @param uarg     user argument handed back to `callback`
 * @param callback invoked from write2_callback() when the write completes
 * @return 0 if the write was queued; -1 with the iops errno set to ENOMEM if
 *         the request could not be allocated; otherwise the non-zero
 *         uv_write() result, with the iops' last error set from it
 */
static int start_write2(lcb_io_opt_t iobase,
                        lcb_sockdata_t *sockbase,
                        struct lcb_iovec_st *iov,
                        lcb_size_t niov,
                        void *uarg,
                        lcb_ioC_write2_callback callback)
{
    my_write_t *w;
    my_sockdata_t *sd = (my_sockdata_t *)sockbase;
    int ret;

    w = calloc(1, sizeof(*w));
    if (!w) {
        /* Report the allocation failure instead of dereferencing NULL. */
        LCB_IOPS_ERRNO(iobase) = ENOMEM;
        return -1;
    }

    w->w.data = uarg;
    w->callback = callback;
    w->sock = sd;

    ret = uv_write(&w->w, (uv_stream_t *)&sd->tcp,
                   (uv_buf_t *)iov,
                   niov,
                   write2_callback);
    if (ret != 0) {
        free(w);
        /*
         * Record the actual uv_write() status, matching how start_connect()
         * and start_read() report their failures.
         */
        set_last_error((my_iops_t *)iobase, ret);
    }
    return ret;
}
/*
 * Buffer-allocation callback passed to uv_read_start() by start_read().
 *
 * Instead of allocating, it hands libuv the buffer currently stored in the
 * socket's `iov` field (set by start_read()); the suggested size is ignored.
 *
 * NOTE(review): the parameter list (`handle`, `suggested_size`, `buf`) and
 * the return statement come from the UVC_ALLOC_CB* macros, presumably to
 * cover differing libuv callback signatures -- confirm in the header.
 */
static UVC_ALLOC_CB(alloc_cb)
{
UVC_ALLOC_CB_VARS()
my_sockdata_t *sock = PTR_FROM_FIELD(my_sockdata_t, handle, tcp);
/* Point libuv at the caller-supplied read buffer. */
buf->base = sock->iov.iov_base;
buf->len = sock->iov.iov_len;
(void)suggested_size;
UVC_ALLOC_CB_RETURN();
}
/*
 * Read callback passed to uv_read_start() by start_read().
 *
 * A zero-length read is ignored and reading continues. For any other result
 * the pending-read counter is decremented, reading is stopped, the stored
 * callback is cleared, and the user callback is invoked with the byte count
 * (or 0 for end-of-file, or the negative value for other errors). Finally
 * the socket reference taken by start_read() is dropped.
 *
 * NOTE(review): the parameter list (`stream`, `nread`, `buf`) comes from the
 * UVC_READ_CB* macros -- confirm their expansion in the header.
 */
static UVC_READ_CB(read_cb)
{
UVC_READ_CB_VARS()
my_tcp_t *mt = (my_tcp_t *)stream;
my_sockdata_t *sock = PTR_FROM_FIELD(my_sockdata_t, mt, tcp);
my_iops_t *io = (my_iops_t *)sock->base.parent;
/* Save the callback now; the stored one is cleared below. */
lcb_ioC_read2_callback callback = CbREQ(mt);
if (nread == 0) {
/* Nothing was read: leave the read armed and wait for the next call. */
return;
}
SOCK_DECR_PENDING(sock, read);
uv_read_stop(stream);
CbREQ(mt) = NULL;
if (nread < 0) {
/*
 * NOTE(review): set_last_error() itself passes its argument through
 * uvc_last_errno(), so the code is translated twice here -- confirm that
 * uvc_last_errno() gives the same result when applied to its own output.
 */
set_last_error(io, uvc_last_errno(io->loop, nread));
if (uvc_is_eof(io->loop, nread)) {
/* End-of-file is reported to the callback as a zero-byte read. */
nread = 0;
}
}
callback(&sock->base, nread, sock->rdarg);
/* Release the reference taken in start_read(). */
decref_sock(sock);
(void)buf;
}
/**
 * Arm an asynchronous read on the socket.
 *
 * The first IOV entry, the user argument and the callback are stored on the
 * socket for alloc_cb()/read_cb() to use, then uv_read_start() is called.
 * Its result is always recorded as the iops' last error. On success the
 * pending-read counter is incremented and the socket gains a reference that
 * read_cb() releases.
 *
 * @param iobase   the iops object
 * @param sockbase the socket to read from
 * @param iov      buffer description; only `*iov` (the first entry) is used
 * @param niov     number of entries in `iov` (not used)
 * @param uarg     user argument handed back to `callback`
 * @param callback invoked when data, EOF or an error arrives
 * @return the uv_read_start() result (0 on success)
 */
static int start_read(lcb_io_opt_t iobase,
                      lcb_sockdata_t *sockbase,
                      lcb_IOV *iov,
                      lcb_size_t niov,
                      void *uarg,
                      lcb_ioC_read2_callback callback)
{
    my_sockdata_t *sd = (my_sockdata_t *)sockbase;
    my_iops_t *self = (my_iops_t *)iobase;
    int status;

    (void)niov;

    sd->tcp.callback = callback;
    sd->rdarg = uarg;
    sd->iov = *iov;

    status = uv_read_start((uv_stream_t *)&sd->tcp.t, alloc_cb, read_cb);
    set_last_error(self, status);
    if (status != 0) {
        return status;
    }

    SOCK_INCR_PENDING(sd, read);
    incref_sock(sd);
    return status;
}
/**
 * Fill in the remote and local addresses of a socket.
 *
 * Both lookups are always attempted. If either fails, the failure is
 * recorded as the iops' last error and -1 is returned, so callers do not
 * mistake unfilled address buffers for valid results.
 *
 * @param iobase   the iops object
 * @param sockbase the socket to query
 * @param ni       name/length pairs for the remote and local addresses;
 *                 passed straight to uv_tcp_getpeername()/getsockname()
 * @return 0 if both lookups succeeded, -1 otherwise
 */
static int get_nameinfo(lcb_io_opt_t iobase,
                        lcb_sockdata_t *sockbase,
                        struct lcb_nameinfo_st *ni)
{
    my_sockdata_t *sock = (my_sockdata_t *)sockbase;
    my_iops_t *io = (my_iops_t *)iobase;
    int failed = 0;
    int rv;

    rv = uv_tcp_getpeername(&sock->tcp.t, ni->remote.name, ni->remote.len);
    if (rv != 0) {
        set_last_error(io, rv);
        failed = 1;
    }

    rv = uv_tcp_getsockname(&sock->tcp.t, ni->local.name, ni->local.len);
    if (rv != 0) {
        set_last_error(io, rv);
        failed = 1;
    }

    return failed ? -1 : 0;
}
static UVC_TIMER_CB(timer_cb)
{
my_timer_t *mytimer = (my_timer_t *)timer;
if (mytimer->callback) {
mytimer->callback(-1, 0, mytimer->cb_arg);
}
}
/**
 * Allocate a timer bound to the iops' loop.
 *
 * The timer remembers its parent iops and takes a reference on it, which
 * timer_close_cb() releases.
 *
 * @param iobase the iops object
 * @return an opaque timer pointer, or NULL if allocation failed
 */
static void *create_timer(lcb_io_opt_t iobase)
{
    my_iops_t *owner = (my_iops_t *)iobase;
    my_timer_t *tm = calloc(1, sizeof(*tm));

    if (tm == NULL) {
        return NULL;
    }

    tm->parent = owner;
    incref_iops(owner);
    uv_timer_init(owner->loop, &tm->uvt);
    return tm;
}
/**
 * Schedule (or reschedule) a one-shot timer.
 *
 * @param iobase       the iops object (unused)
 * @param timer_opaque timer returned by create_timer()
 * @param usec         delay in microseconds; divided by 1000 (integer
 *                     division) to get the millisecond timeout for libuv
 * @param cbdata       argument handed to `callback`
 * @param callback     invoked from timer_cb() when the timer fires
 * @return the uv_timer_start() result
 */
static int update_timer(lcb_io_opt_t iobase,
                        void *timer_opaque,
                        lcb_uint32_t usec,
                        void *cbdata,
                        v0_callback_t callback)
{
    my_timer_t *tm = (my_timer_t *)timer_opaque;
    lcb_uint32_t msec = usec / 1000;

    (void)iobase;

    tm->cb_arg = cbdata;
    tm->callback = callback;

    /* Repeat interval 0: the timer fires once. */
    return uv_timer_start(&tm->uvt, timer_cb, msec, 0);
}
/**
 * Cancel a timer: stop it in libuv and clear its callback so timer_cb()
 * delivers nothing. The timer object itself stays allocated.
 *
 * @param iobase       the iops object (unused)
 * @param timer_opaque timer returned by create_timer()
 */
static void delete_timer(lcb_io_opt_t iobase, void *timer_opaque)
{
    my_timer_t *tm = (my_timer_t *)timer_opaque;

    (void)iobase;

    uv_timer_stop(&tm->uvt);
    tm->callback = NULL;
}
/**
 * Close callback for timer handles (passed to uv_close()).
 *
 * Drops the reference the timer held on its parent iops, overwrites the
 * timer with 0xff bytes and frees it.
 *
 * @param handle the timer handle; it is the leading part of a my_timer_t
 */
static void timer_close_cb(uv_handle_t *handle)
{
    my_timer_t *tm = (my_timer_t *)handle;
    my_iops_t *owner = tm->parent;

    decref_iops(&owner->base);

    memset(tm, 0xff, sizeof(*tm));
    free(tm);
}
/**
 * Destroy a timer: cancel it, then close its handle. The memory is released
 * from timer_close_cb() once libuv has finished closing the handle.
 *
 * @param io           the iops object
 * @param timer_opaque timer returned by create_timer()
 */
static void destroy_timer(lcb_io_opt_t io, void *timer_opaque)
{
    uv_handle_t *handle = (uv_handle_t *)timer_opaque;

    delete_timer(io, timer_opaque);
    uv_close(handle, timer_close_cb);
}
/**
 * Allocate a zero-initialized request tied to a socket.
 *
 * @param sock     socket the request belongs to
 * @param callback callback stored in the request's generic callback slot
 * @return the new request, or NULL on allocation failure, in which case
 *         ENOMEM is stored in the error field of the socket's parent iops
 */
static my_uvreq_t *alloc_uvreq(my_sockdata_t *sock, generic_callback_t callback)
{
    my_uvreq_t *req = calloc(1, sizeof(*req));

    if (req != NULL) {
        req->socket = sock;
        req->cb.cb_ = callback;
    } else {
        sock->base.parent->v.v1.error = ENOMEM;
    }
    return req;
}
/**
 * Record the most recent error on the iops object.
 *
 * @param io    the iops whose error field is updated
 * @param error status code; it is passed through uvc_last_errno() together
 *              with the loop before being stored
 */
static void set_last_error(my_iops_t *io, int error)
{
    int translated = uvc_last_errno(io->loop, error);

    io->base.v.v1.error = translated;
}
/**
 * Populate the version-2 procedure tables for this plugin.
 *
 * Selects the completion I/O model and installs the loop, timer and
 * completion-socket procedures defined in this file. The `write`,
 * `wballoc`, `wbfree` and `serve` completion entries are set to NULL; the
 * BSD and event tables are left untouched.
 *
 * @param version requested table version (unused)
 * @param loop    loop procedures to fill in
 * @param timer   timer procedures to fill in
 * @param bsd     unused
 * @param ev      unused
 * @param iocp    completion-model procedures to fill in
 * @param model   receives LCB_IOMODEL_COMPLETION
 */
static void wire_iops2(int version,
                       lcb_loop_procs *loop,
                       lcb_timer_procs *timer,
                       lcb_bsd_procs *bsd,
                       lcb_ev_procs *ev,
                       lcb_completion_procs *iocp,
                       lcb_iomodel_t *model)
{
    (void)version;
    (void)bsd;
    (void)ev;

    *model = LCB_IOMODEL_COMPLETION;

    /* Event loop control. */
    loop->start = run_event_loop;
    loop->tick = tick_event_loop;
    loop->stop = stop_event_loop;

    /* Timers. */
    timer->create = create_timer;
    timer->schedule = update_timer;
    timer->cancel = delete_timer;
    timer->destroy = destroy_timer;

    /* Completion-model socket operations. */
    iocp->socket = create_socket;
    iocp->connect = start_connect;
    iocp->read2 = start_read;
    iocp->write2 = start_write2;
    iocp->cntl = cntl_socket;
    iocp->nameinfo = get_nameinfo;
    iocp->close = close_socket;

    /* Entries this plugin does not provide. */
    iocp->write = NULL;
    iocp->wballoc = NULL;
    iocp->wbfree = NULL;
    iocp->serve = NULL;
}