#include "common/attributes.h"
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#ifndef _WIN32
#include "log-private.h"
#endif
#ifdef _WIN32
#if !defined(UNDER_CE)
# define _NO_OLDNAMES 1
# include <io.h>
#endif
#include <winsock2.h>
#include "log-private.h"
typedef unsigned long int nfds_t;
#if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
struct pollfd {
int fd;
short events;
short revents;
};
#define POLLIN 0x001
#define POLLPRI 0x002
#define POLLOUT 0x004
# define POLLRDNORM 0x040
# define POLLRDBAND 0x080
# define POLLWRNORM 0x100
# define POLLWRBAND 0x200
#define POLLERR 0x008
#define POLLHUP 0x010
#define POLLNVAL 0x020
#endif
#include "contrib/poll_win.c"
#else
# include <poll.h>
#endif
#include "stdio-shim.h"
#include "libevsocket.h"
#include "socket-shim.h"
#include "pthread-shim.h"
#include "librist/udpsocket.h"
struct evsocket_event {
int fd;
short events;
void (*callback)(struct evsocket_ctx *ctx, int fd, short revents, void *arg);
void (*err_callback)(struct evsocket_ctx *ctx, int fd, short revents, void *arg);
void *arg;
struct evsocket_event *next;
};
struct evsocket_ctx {
int changed;
int n_events;
int last_served;
struct pollfd *pfd;
struct evsocket_event *events;
struct evsocket_event *_array;
int giveup;
struct evsocket_ctx *next;
};
#if !defined(_WIN32) || HAVE_PTHREADS
static pthread_mutex_t ctx_list_mutex = PTHREAD_MUTEX_INITIALIZER;
#else
static pthread_mutex_t ctx_list_mutex;
static INIT_ONCE once_var;
#endif
static struct evsocket_ctx *CTX_LIST = NULL;
static void ctx_add(struct evsocket_ctx *c)
{
#if defined(_WIN32) && !HAVE_PTHREADS
init_mutex_once(&ctx_list_mutex, &once_var);
#endif
pthread_mutex_lock(&ctx_list_mutex);
c->next = CTX_LIST;
CTX_LIST = c;
pthread_mutex_unlock(&ctx_list_mutex);
}
static void ctx_del(struct evsocket_ctx *delme)
{
pthread_mutex_lock(&ctx_list_mutex);
struct evsocket_ctx *p = NULL, *c = CTX_LIST;
while(c) {
if (c == delme) {
if (p) {
p->next = c->next;
} else {
CTX_LIST = NULL;
}
goto out;
}
p = c;
c = c->next;
}
out:
pthread_mutex_unlock(&ctx_list_mutex);
}
struct evsocket_event *evsocket_addevent(struct evsocket_ctx *ctx, int fd, short events,
void (*callback)(struct evsocket_ctx *ctx, int fd, short revents, void *arg),
void (*err_callback)(struct evsocket_ctx *ctx, int fd, short revents, void *arg),
void *arg)
{
struct evsocket_event *e;
if (!ctx) {
return NULL;
}
e = malloc(sizeof(struct evsocket_event));
if (!e) {
return e;
}
e->fd = fd;
e->events = events;
e->callback = callback;
e->err_callback = err_callback;
e->arg = arg;
ctx->changed = 1;
e->next = ctx->events;
ctx->events = e;
ctx->n_events++;
return e;
}
void evsocket_delevent(struct evsocket_ctx *ctx, struct evsocket_event *e)
{
struct evsocket_event *cur, *prev;
if (!ctx) {
return;
}
ctx->changed = 1;
cur = ctx->events;
prev = NULL;
while(cur) {
if (cur == e) {
if (!prev) {
ctx->events = e->next;
} else {
prev->next = e->next;
}
free(e);
break;
}
prev = cur;
cur = cur->next;
}
ctx->n_events--;
}
static void rebuild_poll(struct evsocket_ctx *ctx)
{
struct evsocket_event *e;
void *ptr = NULL;
if (!ctx) {
return;
}
if (ctx->pfd) {
ptr = ctx->pfd;
ctx->pfd = NULL;
free(ptr);
}
if (ctx->_array) {
ptr = ctx->_array;
ctx->_array = NULL;
free(ptr);
}
if (ctx->n_events > 0) {
ctx->pfd = malloc(sizeof(struct pollfd) * ctx->n_events);
ctx->_array = calloc(sizeof(struct evsocket_event), ctx->n_events);
}
if ((!ctx->pfd) || (!ctx->_array)) {
if (ctx->n_events > 0) {
rist_log_priv3( RIST_LOG_ERROR, "libevsocket, rebuild_poll: events are disabled (%d)\n",
ctx->n_events);
}
ctx->n_events = 0;
ctx->changed = 0;
return;
}
int i = 0;
e = ctx->events;
while(e) {
memcpy(ctx->_array + i, e, sizeof(struct evsocket_event));
ctx->pfd[i].fd = e->fd;
ctx->pfd[i++].events = (e->events & (POLLIN | POLLOUT)) | (POLLHUP | POLLERR);
e = e->next;
}
ctx->last_served = 1;
ctx->changed = 0;
}
static void serve_event(struct evsocket_ctx *ctx, int n)
{
struct evsocket_event *e = ctx->_array + n;
if (!ctx) {
return;
}
if (n >= ctx->n_events) {
rist_log_priv3( RIST_LOG_ERROR, "libevsocket, serve_event: Invalid event %d >= %d\n",
n, ctx->n_events);
return;
}
if (e) {
ctx->last_served = n;
if ((ctx->pfd[n].revents & (POLLHUP | POLLERR)) && e->err_callback)
e->err_callback(ctx, e->fd, ctx->pfd[n].revents, e->arg);
else if (e->callback) {
e->callback(ctx, e->fd, ctx->pfd[n].revents, e->arg);
}
}
}
struct evsocket_ctx *evsocket_create(void)
{
struct evsocket_ctx *ctx;
pthread_mutex_init(&ctx_list_mutex, NULL);
ctx = calloc(1, sizeof(struct evsocket_ctx));
if (!ctx) {
return NULL;
}
ctx->giveup = 0;
ctx->n_events = 0;
ctx->changed = 0;
ctx_add(ctx);
return ctx;
}
void evsocket_loop(struct evsocket_ctx *ctx, int timeout)
{
for(;;) {
if (!ctx || ctx->giveup)
break;
evsocket_loop_single(ctx, timeout, 10);
}
}
int evsocket_loop_single(struct evsocket_ctx *ctx, int timeout, int max_events)
{
int pollret, i;
int event_count = 0;
int retval = 0;
if (!ctx || ctx->giveup) {
retval = -1;
goto loop_error;
}
if (ctx->changed) {
rebuild_poll(ctx);
}
if (ctx->pfd == NULL) {
ctx->changed = 1;
retval = -2;
goto loop_error;
}
if (ctx->n_events < 1) {
rist_log_priv3( RIST_LOG_ERROR, "libevsocket, evsocket_loop_single: no events (%d)\n",
ctx->n_events);
retval = -3;
goto loop_error;
}
pollret = poll(ctx->pfd, ctx->n_events, timeout);
if (pollret <= 0) {
if (pollret < 0) {
rist_log_priv3( RIST_LOG_ERROR, "libevsocket, evsocket_loop: poll returned %d, n_events = %d, error = %d\n",
pollret, ctx->n_events, errno);
retval = -4;
goto loop_error;
}
return 0;
}
for (i = ctx->last_served +1; i < ctx->n_events; i++) {
if (ctx->pfd[i].revents != 0) {
serve_event(ctx, i);
if (max_events > 0 && ++event_count >= max_events)
return 0;
}
}
for (i = 0; i <= ctx->last_served; i++) {
if (ctx->pfd[i].revents != 0) {
serve_event(ctx, i);
if (max_events > 0 && ++event_count >= max_events)
return 0;
}
}
return 0;
loop_error:
if (timeout > 0)
usleep(timeout * 1000);
return retval;
}
void evsocket_destroy(struct evsocket_ctx *ctx)
{
ctx_del(ctx);
if (ctx->pfd)
free(ctx->pfd);
if (ctx->_array)
free(ctx->_array);
free(ctx);
ctx = NULL;
}
void evsocket_loop_stop(struct evsocket_ctx *ctx)
{
if (ctx)
ctx->giveup = 1;
}
int evsocket_geteventcount(struct evsocket_ctx *ctx)
{
if (ctx)
return ctx->n_events;
else
return 0;
}