#ifndef _LIBPOMP_HPP_
#define _LIBPOMP_HPP_
#include <errno.h>
#include <string>
#include <utility>
#include <vector>
#include <functional>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#endif
#if defined(__cplusplus) && (__cplusplus >= 201103L)
# define POMP_CXX11
#elif defined(__GNUC__) && defined(__GXX_EXPERIMENTAL_CXX0X__)
# define POMP_CXX11
#endif
#define POMP_DISABLE_COPY(_cls) \
private: \
_cls(const _cls &); \
_cls &operator=(const _cls &);
#include "libpomp.h"
#ifdef POMP_CXX11
# include "libpomp-cxx11.hpp"
#endif
namespace pomp {
class Message;
class Connection;
class EventHandler;
class Loop;
class Event;
class Timer;
class Context;
class Message {
POMP_DISABLE_COPY(Message)
private:
struct pomp_msg *mMsg;
const struct pomp_msg *mConstMsg;
friend class Connection;
friend class Context;
friend class Decoder;
friend class Encoder;
private:
inline Message(const struct pomp_msg *msg) {
mMsg = NULL;
mConstMsg = msg;
}
inline const struct pomp_msg *getMsg() const {
return mMsg != NULL ? mMsg : mConstMsg;
}
public:
inline Message() {
mMsg = pomp_msg_new();
mConstMsg = NULL;
}
inline ~Message() {
if (mMsg != NULL)
(void)pomp_msg_destroy(mMsg);
}
inline uint32_t getId() const {
return pomp_msg_get_id(getMsg());
}
inline int write(uint32_t msgid, const char *fmt, ...) POMP_ATTRIBUTE_FORMAT_PRINTF(3, 4) {
va_list args;
va_start(args, fmt);
pomp_msg_clear(mMsg);
int res = pomp_msg_writev(mMsg, msgid, fmt, args);
va_end(args);
return res;
}
inline int writev(uint32_t msgid, const char *fmt, va_list args) {
pomp_msg_clear(mMsg);
return pomp_msg_writev(mMsg, msgid, fmt, args);
}
inline int read(const char *fmt, ...) const POMP_ATTRIBUTE_FORMAT_SCANF(2, 3) {
va_list args;
va_start(args, fmt);
int res = pomp_msg_readv(getMsg(), fmt, args);
va_end(args);
return res;
}
inline int readv(const char *fmt, va_list args) const {
return pomp_msg_readv(getMsg(), fmt, args);
}
#ifdef POMP_CXX11
template<typename Fmt, typename... ArgsW>
inline int write(const ArgsW&... args) {
if (mMsg == NULL)
return -EINVAL;
struct pomp_encoder *enc = pomp_encoder_new();
pomp_msg_clear(mMsg);
pomp_msg_init(mMsg, Fmt::id);
pomp_encoder_init(enc, mMsg);
int res = Fmt::encode(enc, std::forward<const ArgsW&>(args)...);
pomp_msg_finish(mMsg);
pomp_encoder_destroy(enc);
return res;
}
template<typename Fmt, typename... ArgsR>
inline int read(ArgsR&... args) const {
if (getId() != Fmt::id)
return -EINVAL;
struct pomp_decoder *dec = pomp_decoder_new();
pomp_decoder_init(dec, getMsg());
int res = Fmt::decode(dec, std::forward<ArgsR&>(args)...);
pomp_decoder_destroy(dec);
return res;
}
#endif
};
class Decoder {
POMP_DISABLE_COPY(Decoder)
private:
const Message &mMsg;
pomp_decoder *mDec;
public:
Decoder(const Message &message) :
mMsg(message),
mDec(pomp_decoder_new()) {
pomp_decoder_init(mDec, mMsg.getMsg());
}
~Decoder() {
pomp_decoder_destroy(mDec);
}
int read(int8_t &v) {
return pomp_decoder_read_i8(mDec, &v);
}
int read(uint8_t &v) {
return pomp_decoder_read_u8(mDec, &v);
}
int read(int16_t &v) {
return pomp_decoder_read_i16(mDec, &v);
}
int read(uint16_t &v) {
return pomp_decoder_read_u16(mDec, &v);
}
int read(int32_t &v) {
return pomp_decoder_read_i32(mDec, &v);
}
int read(uint32_t &v) {
return pomp_decoder_read_u32(mDec, &v);
}
int read(int64_t &v) {
return pomp_decoder_read_i64(mDec, &v);
}
int read(uint64_t &v) {
return pomp_decoder_read_u64(mDec, &v);
}
int read(float &v) {
return pomp_decoder_read_f32(mDec, &v);
}
int read(double &v) {
return pomp_decoder_read_f64(mDec, &v);
}
int read(std::string &v) {
const char *s = NULL;
int res = pomp_decoder_read_cstr(mDec, &s);
if (res == 0)
v.assign(s);
return res;
}
int read(std::vector<uint8_t> &v) {
const void *p = NULL;
uint32_t n = 0;
int res = pomp_decoder_read_cbuf(mDec, &p, &n);
if (res == 0) {
const uint8_t *start = reinterpret_cast<const uint8_t *>(p);
const uint8_t *end = start + n;
v.assign(start, end);
}
return res;
}
int readFd(int &v) {
return pomp_decoder_read_fd(mDec, &v);
}
};
class Encoder {
POMP_DISABLE_COPY(Encoder)
private:
Message mMsg;
pomp_encoder *mEnc;
bool mFinished;
public:
Encoder(uint32_t msgId) :
mEnc(pomp_encoder_new()),
mFinished(false) {
pomp_msg_init(mMsg.mMsg, msgId);
pomp_encoder_init(mEnc, mMsg.mMsg);
}
~Encoder() {
pomp_encoder_destroy(mEnc);
}
const Message &getMessage() {
if (!mFinished) {
pomp_msg_finish(mMsg.mMsg);
mFinished = true;
}
return mMsg;
}
int writeI8(int8_t v) { return write(v); }
int write(int8_t v) {
return pomp_encoder_write_i8(mEnc, v);
}
int writeU8(uint8_t v) { return write(v); }
int write(uint8_t v) {
return pomp_encoder_write_u8(mEnc, v);
}
int writeI16(int16_t v) { return write(v); }
int write(int16_t v) {
return pomp_encoder_write_i16(mEnc, v);
}
int writeU16(uint16_t v) { return write(v); }
int write(uint16_t v) {
return pomp_encoder_write_u16(mEnc, v);
}
int writeI32(int32_t v) { return write(v); }
int write(int32_t v) {
return pomp_encoder_write_i32(mEnc, v);
}
int writeU32(uint32_t v) { return write(v); }
int write(uint32_t v) {
return pomp_encoder_write_u32(mEnc, v);
}
int writeI64(int64_t v) { return write(v); }
int write(int64_t v) {
return pomp_encoder_write_i64(mEnc, v);
}
int writeU64(uint64_t v) { return write(v); }
int write(uint64_t v) {
return pomp_encoder_write_u64(mEnc, v);
}
int writeF32(float v) { return write(v); }
int write(float v) {
return pomp_encoder_write_f32(mEnc, v);
}
int writeF64(double v) { return write(v); }
int write(double v) {
return pomp_encoder_write_f64(mEnc, v);
}
int write(const std::string &v) { return write(v.c_str()); }
int write(const char *v) {
return pomp_encoder_write_str(mEnc, v);
}
int write(const std::vector<uint8_t> &v) {
const uint8_t *p = v.data();
uint32_t n = static_cast<uint32_t>(v.size());
return pomp_encoder_write_buf(mEnc, p, n);
}
int writeFd(int fd) {
return pomp_encoder_write_fd(mEnc, fd);
}
};
class Connection {
POMP_DISABLE_COPY(Connection)
private:
struct pomp_conn *mConn;
friend class Context;
private:
inline Connection(struct pomp_conn *conn) {
mConn = conn;
}
public:
inline int disconnect() {
return pomp_conn_disconnect(mConn);
}
inline const struct sockaddr *getLocalAddr(uint32_t *addrlen) {
return pomp_conn_get_local_addr(mConn, addrlen);
}
inline const struct sockaddr *getPeerAddr(uint32_t *addrlen) {
return pomp_conn_get_peer_addr(mConn, addrlen);
}
inline const struct pomp_cred *getPeerCred() {
return pomp_conn_get_peer_cred(mConn);
}
inline int getFd() {
return pomp_conn_get_fd(mConn);
}
inline int sendMsg(const Message &msg) {
return pomp_conn_send_msg(mConn, msg.getMsg());
}
inline int send(uint32_t msgid, const char *fmt, ...) POMP_ATTRIBUTE_FORMAT_PRINTF(3, 4) {
va_list args;
va_start(args, fmt);
int res = pomp_conn_sendv(mConn, msgid, fmt, args);
va_end(args);
return res;
}
inline int sendv(uint32_t msgid, const char *fmt, va_list args) {
return pomp_conn_sendv(mConn, msgid, fmt, args);
}
inline int send(const std::vector<uint8_t> &v) {
int res;
struct pomp_buffer *buf;
buf = pomp_buffer_new_with_data(v.data(), v.size());
if (buf != NULL) {
res = pomp_conn_send_raw_buf(mConn, buf);
pomp_buffer_unref(buf);
return res;
}
return -ENOMEM;
}
#ifdef POMP_CXX11
template<typename Fmt, typename... ArgsW>
inline int send(const ArgsW&... args) {
Message msg;
int res = msg.write<Fmt>(args...);
if (res == 0)
res = sendMsg(msg);
return res;
}
#endif
};
typedef std::vector<Connection *> ConnectionArray;
class Loop {
POMP_DISABLE_COPY(Loop)
private:
struct pomp_loop *mLoop;
bool mOwner;
friend class Context;
friend class Timer;
friend class Event;
private:
inline static void fdEventCb(int _fd, uint32_t _revents, void *_userdata) {
Handler *handler = reinterpret_cast<Handler *>(_userdata);
handler->processFd(_fd, _revents);
}
public:
enum {
EVENT_IN = POMP_FD_EVENT_IN,
EVENT_PRI = POMP_FD_EVENT_PRI,
EVENT_OUT = POMP_FD_EVENT_OUT,
EVENT_ERR = POMP_FD_EVENT_ERR,
EVENT_HUP = POMP_FD_EVENT_HUP,
};
class Handler {
POMP_DISABLE_COPY(Handler)
public:
inline Handler() {}
inline virtual ~Handler() {}
virtual void processFd(int fd, uint32_t revents) = 0;
};
inline Loop() {
mLoop = pomp_loop_new();
mOwner = true;
}
inline Loop(struct pomp_loop *loop) {
mLoop = loop;
mOwner = false;
}
inline ~Loop() {
if (mOwner)
(void)pomp_loop_destroy(mLoop);
}
inline int add(int fd, uint32_t events, Handler *handler) {
return pomp_loop_add(mLoop, fd, events, &Loop::fdEventCb, handler);
}
inline int update(int fd, uint32_t events) {
return pomp_loop_update(mLoop, fd, events);
}
inline int remove(int fd) {
return pomp_loop_remove(mLoop, fd);
}
inline bool hasFd(int fd) {
return pomp_loop_has_fd(mLoop, fd) != 0;
}
inline intptr_t getFd() {
return pomp_loop_get_fd(mLoop);
}
inline int processFd() {
return pomp_loop_process_fd(mLoop);
}
inline int waitAndProcess(int timeout) {
return pomp_loop_wait_and_process(mLoop, timeout);
}
inline int wakeup() {
return pomp_loop_wakeup(mLoop);
}
inline operator struct pomp_loop *() {
return mLoop;
}
inline struct pomp_loop *get() const {
return mLoop;
}
#ifdef POMP_CXX11
class HandlerFunc : public Handler {
POMP_DISABLE_COPY(HandlerFunc)
public:
typedef std::function<void (int fd, uint32_t revents)> Func;
inline HandlerFunc() {}
inline HandlerFunc(const Func &func) : mFunc(func) {}
inline void set(const Func &func) {mFunc = func;}
inline virtual void processFd(int fd, uint32_t revents) {mFunc(fd, revents);}
private:
Func mFunc;
};
#endif
};
class Event {
POMP_DISABLE_COPY(Event)
private:
struct pomp_evt *mEvt;
private:
inline static void eventCb(struct pomp_evt *_evt, void *_userdata) {
(void)_evt;
Handler *handler = reinterpret_cast<Handler *>(_userdata);
handler->processEvent();
}
public:
class Handler {
POMP_DISABLE_COPY(Handler)
public:
inline Handler() {}
inline virtual ~Handler() {}
virtual void processEvent() = 0;
};
inline Event() {
mEvt = pomp_evt_new();
}
inline ~Event() {
(void)pomp_evt_destroy(mEvt);
}
inline int signal() {
return pomp_evt_signal(mEvt);
}
inline int clear() {
return pomp_evt_clear(mEvt);
}
inline int attachToLoop(Loop *loop, Handler *handler) {
return pomp_evt_attach_to_loop(mEvt, loop->mLoop,
&Event::eventCb, handler);
}
inline int detachFromLoop(Loop *loop) {
return pomp_evt_detach_from_loop(mEvt, loop->mLoop);
}
inline bool isAttached(Loop *loop) {
struct pomp_loop *_loop = loop ? loop->mLoop : NULL;
return pomp_evt_is_attached(mEvt, _loop) != 0;
}
#ifdef POMP_CXX11
class HandlerFunc : public Handler {
POMP_DISABLE_COPY(HandlerFunc)
public:
typedef std::function<void ()> Func;
inline HandlerFunc() {}
inline HandlerFunc(const Func &func) : mFunc(func) {}
inline void set(const Func &func) {mFunc = func;}
inline virtual void processEvent() {mFunc();}
private:
Func mFunc;
};
#endif
};
class Timer {
POMP_DISABLE_COPY(Timer)
private:
struct pomp_timer *mTimer;
private:
inline static void timerCb(struct pomp_timer *_timer, void *_userdata) {
(void)_timer;
Handler *handler = reinterpret_cast<Handler *>(_userdata);
handler->processTimer();
}
public:
class Handler {
POMP_DISABLE_COPY(Handler)
public:
inline Handler() {}
inline virtual ~Handler() {}
virtual void processTimer() = 0;
};
inline Timer(Loop *loop, Handler *handler) {
mTimer = pomp_timer_new(loop->mLoop, &Timer::timerCb, handler);
}
inline ~Timer() {
(void)pomp_timer_destroy(mTimer);
}
inline int set(uint32_t delay, uint32_t period = 0) {
if (period == 0)
return pomp_timer_set(mTimer, delay);
else
return pomp_timer_set_periodic(mTimer, delay, period);
}
inline int setPeriodic(uint32_t delay, uint32_t period) {
return pomp_timer_set_periodic(mTimer, delay, period);
}
inline int clear() {
return pomp_timer_clear(mTimer);
}
#ifdef POMP_CXX11
class HandlerFunc : public Handler {
POMP_DISABLE_COPY(HandlerFunc)
public:
typedef std::function<void ()> Func;
inline HandlerFunc() {}
inline HandlerFunc(const Func &func) : mFunc(func) {}
inline void set(const Func &func) {mFunc = func;}
inline virtual void processTimer() {mFunc();}
private:
Func mFunc;
};
#endif
};
class Address
{
public:
Address() : mAddressLength(0), mValid(false) {}
Address(const char * str) :
mAddressLength(sizeof(mAddress)),
mValid(pomp_addr_parse(str, &mAddress.sa, &mAddressLength) == 0)
{
}
bool isValid() const { return mValid; }
const sockaddr * addr() const { return &mAddress.sa; }
uint32_t len() const { return mAddressLength; }
inline static int getRealAddr(const char * str, std::string &dst)
{
char *s = NULL;
int res = pomp_addr_get_real_addr(str, &s);
if (res == 0) {
dst.assign(s);
free(s);
}
return res;
}
#ifdef POMP_CXX11
explicit operator bool () const { return isValid(); }
#endif
private:
union { sockaddr sa;
sockaddr_storage sa_storage;
} mAddress;
uint32_t mAddressLength;
const bool mValid;
};
class EventHandler {
POMP_DISABLE_COPY(EventHandler)
public:
inline EventHandler() {}
inline virtual ~EventHandler() {}
inline virtual void onConnected(Context *ctx, Connection *conn) { (void)ctx; (void)conn; }
inline virtual void onDisconnected(Context *ctx, Connection *conn) { (void)ctx; (void)conn; }
inline virtual void recvMessage(Context *ctx, Connection *conn, const Message &msg) { (void)ctx; (void)conn; (void)msg; }
inline virtual void recvRawBuffer(Context *ctx, Connection *conn, const std::vector<uint8_t> &v) { (void)ctx; (void)conn; (void)v; }
};
class Context {
POMP_DISABLE_COPY(Context)
private:
struct pomp_ctx *mCtx;
EventHandler *mEventHandler;
ConnectionArray mConnections;
Loop *mLoop;
bool mExtLoop;
private:
inline ConnectionArray::iterator findConn(struct pomp_conn *_conn) {
ConnectionArray::iterator it;
for (it = mConnections.begin(); it != mConnections.end(); ++it) {
if ((*it)->mConn == _conn)
return it;
}
return mConnections.end();
}
inline static void eventCb(struct pomp_ctx *_ctx,
enum pomp_event _event,
struct pomp_conn *_conn,
const struct pomp_msg *_msg,
void *_userdata) {
(void)_ctx;
Context *self = reinterpret_cast<Context *>(_userdata);
Connection *conn = NULL;
ConnectionArray::iterator it;
switch (_event) {
case POMP_EVENT_CONNECTED:
conn = new Connection(_conn);
self->mConnections.push_back(conn);
self->mEventHandler->onConnected(self, conn);
break;
case POMP_EVENT_DISCONNECTED:
it = self->findConn(_conn);
if (it != self->mConnections.end()) {
conn = *it;
self->mEventHandler->onDisconnected(self, conn);
self->mConnections.erase(it);
delete conn;
}
break;
case POMP_EVENT_MSG:
it = self->findConn(_conn);
if (it != self->mConnections.end())
conn = *it;
self->mEventHandler->recvMessage(self, conn, Message(_msg));
break;
default:
break;
}
}
inline static void rawCb(struct pomp_ctx *_ctx,
struct pomp_conn *_conn,
struct pomp_buffer *_buf,
void *_userdata) {
(void)_ctx;
std::vector<uint8_t> v;
const void *cdata = NULL;
size_t len;
size_t capacity;
int res;
Context *self = reinterpret_cast<Context *>(_userdata);
Connection *conn = NULL;
ConnectionArray::iterator it;
it = self->findConn(_conn);
if (it != self->mConnections.end())
conn = *it;
res = pomp_buffer_get_cdata(_buf, &cdata, &len, &capacity);
if (res == 0) {
const uint8_t *start = reinterpret_cast<const uint8_t *>(cdata);
const uint8_t *end = start + len;
v.assign(start, end);
self->mEventHandler->recvRawBuffer(self, conn, v);
}
}
public:
inline Context(EventHandler *eventHandler, Loop *loop = NULL) {
if (loop == NULL) {
mCtx = pomp_ctx_new(&Context::eventCb, this);
mEventHandler = eventHandler;
mLoop = NULL;
mExtLoop = false;
} else {
mCtx = pomp_ctx_new_with_loop(&Context::eventCb, this, loop->mLoop);
mEventHandler = eventHandler;
mLoop = loop;
mExtLoop = true;
}
}
inline ~Context() {
(void)pomp_ctx_destroy(mCtx);
if (mLoop != NULL && !mExtLoop)
delete mLoop;
}
inline int listen(const struct sockaddr *addr, uint32_t addrlen) {
return pomp_ctx_listen(mCtx, addr, addrlen);
}
inline int listen(const Address &address) {
return listen(address.addr(), address.len());
}
inline int listen(const struct sockaddr *addr, uint32_t addrlen, uint32_t mode) {
return pomp_ctx_listen_with_access_mode(mCtx, addr, addrlen, mode);
}
inline int listen(const Address &address, uint32_t mode) {
return listen(address.addr(), address.len(), mode);
}
inline int setRaw() {
return pomp_ctx_set_raw(mCtx, &Context::rawCb);
}
inline int connect(const struct sockaddr *addr, uint32_t addrlen) {
return pomp_ctx_connect(mCtx, addr, addrlen);
}
inline int connect(const Address &address) {
return connect(address.addr(), address.len());
}
inline int bind(const struct sockaddr *addr, uint32_t addrlen) {
return pomp_ctx_bind(mCtx, addr, addrlen);
}
inline int bind(const Address & address) {
return bind(address.addr(), address.len());
}
inline int stop() {
return pomp_ctx_stop(mCtx);
}
inline intptr_t getFd() {
return pomp_ctx_get_fd(mCtx);
}
inline int processFd() {
return pomp_ctx_process_fd(mCtx);
}
inline int waitAndProcess(int timeout) {
return pomp_ctx_wait_and_process(mCtx, timeout);
}
inline int wakeup() {
return pomp_ctx_wakeup(mCtx);
}
inline const struct sockaddr *getLocalAddr(uint32_t *addrlen) {
return pomp_ctx_get_local_addr(mCtx, addrlen);
}
inline Loop *getLoop() {
if (mLoop == NULL)
mLoop = new Loop(pomp_ctx_get_loop(mCtx));
return mLoop;
}
inline Connection *getConnection() const {
return mConnections.size() > 0 ? mConnections[0] : NULL;
}
inline const ConnectionArray &getConnections() const {
return mConnections;
}
inline int sendMsg(const Message &msg) {
return pomp_ctx_send_msg(mCtx, msg.getMsg());
}
inline int sendMsgTo(const Message &msg, const struct sockaddr *addr, uint32_t addrlen) {
return pomp_ctx_send_msg_to(mCtx, msg.getMsg(), addr, addrlen);
}
inline int sendMsgTo(const Message &msg, const Address & address) {
return sendMsgTo(msg, address.addr(), address.len());
}
inline int send(uint32_t msgid, const char *fmt, ...) POMP_ATTRIBUTE_FORMAT_PRINTF(3, 4) {
va_list args;
va_start(args, fmt);
int res = pomp_ctx_sendv(mCtx, msgid, fmt, args);
va_end(args);
return res;
}
inline int sendv(uint32_t msgid, const char *fmt, va_list args) {
return pomp_ctx_sendv(mCtx, msgid, fmt, args);
}
inline int send(const std::vector<uint8_t> &v) {
int res;
struct pomp_buffer *buf;
buf = pomp_buffer_new_with_data(v.data(), v.size());
if (buf != NULL) {
res = pomp_ctx_send_raw_buf(mCtx, buf);
pomp_buffer_unref(buf);
return res;
}
return -ENOMEM;
}
#ifdef POMP_CXX11
template<typename Fmt, typename... ArgsW>
inline int send(const ArgsW&... args) {
Message msg;
int res = msg.write<Fmt>(args...);
if (res == 0)
res = sendMsg(msg);
return res;
}
#endif
};
}
#endif