#include "protoZMQ.h"
#include "protoDebug.h"
#ifdef WIN32
#include <processthreadsapi.h>
#else
#include <unistd.h>
#endif
ProtoZmq::PollerThread* ProtoZmq::Socket::default_poller_thread = NULL;
ProtoMutex ProtoZmq::Socket::default_poller_mutex;
ProtoZmq::Socket::Socket()
: state(CLOSED), zmq_ctx(NULL), zmq_sock(NULL), poll_flags(ZMQ_POLLIN)
{
}
ProtoZmq::Socket::~Socket()
{
Close();
}
bool ProtoZmq::Socket::Open(int socketType, void* zmqSocket, void* zmqContext)
{
if (IsOpen()) Close();
ext_ctx = false;
if (NULL == zmqContext)
zmqContext = zmq_ctx_new();
else
ext_ctx = true;
if (NULL == zmqContext)
{
PLOG(PL_ERROR, " ProtoZmq::Socket::Open() zmq_ctx_new() error: %s\n", GetErrorString());
return false;
}
zmq_ctx = zmqContext;
ext_sock = false;
if (NULL == zmqSocket)
zmqSocket = zmq_socket(zmq_ctx, socketType);
else
ext_sock = true;
if (NULL == zmqSocket)
{
PLOG(PL_ERROR, " ProtoZmq::Socket::Open() zmq_socket() error: %s\n", GetErrorString());
Close();
return false;
}
zmq_sock = zmqSocket;
if (!ProtoEvent::Open())
{
PLOG(PL_ERROR, " ProtoZmq::Socket::Open() ProtoEvent::Open() error: %s\n", GetErrorString());
Close();
return false;
}
state = IDLE;
return true;
}
void ProtoZmq::Socket::Close()
{
state = CLOSED;
UpdateNotification();
if (ProtoEvent::IsOpen())
ProtoEvent::Close();
if (NULL != zmq_sock)
{
if (!ext_sock)
zmq_close(zmq_sock);
zmq_sock = NULL;
ext_sock = false;
}
if (NULL != zmq_ctx)
{
if (!ext_ctx)
zmq_ctx_term(zmq_ctx);
zmq_ctx = NULL;
ext_ctx = false;
}
}
bool ProtoZmq::Socket::StartInputNotification()
{
poller_mutex.Lock();
if (0 == (ZMQ_POLLIN & poll_flags))
{
poll_flags |= ZMQ_POLLIN;
if (!UpdateNotification())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::StartInputNotification() error: unable to update notifications\n");
poll_flags &= ~ZMQ_POLLIN;
poller_mutex.Unlock();
return false;
}
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::StopInputNotification()
{
poller_mutex.Lock();
if (0 != (ZMQ_POLLIN & poll_flags))
{
poll_flags &= ~ZMQ_POLLIN;
if (!UpdateNotification())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::StopInputNotification() error: unable to update notifications\n");
poll_flags |= ~ZMQ_POLLIN;
poller_mutex.Unlock();
return false;
}
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::StartOutputNotification()
{
poller_mutex.Lock();
if (0 == (ZMQ_POLLOUT & poll_flags))
{
poll_flags |= ZMQ_POLLOUT;
if (!UpdateNotification())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::StartOutputNotification() error: unable to update notifications\n");
poll_flags &= ~ZMQ_POLLOUT;
poller_mutex.Unlock();
return false;
}
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::StopOutputNotification()
{
poller_mutex.Lock();
if (0 != (ZMQ_POLLOUT & poll_flags))
{
poll_flags &= ~ZMQ_POLLOUT;
if (!UpdateNotification())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::StopOutputNotification() error: unable to update notifications\n");
poll_flags |= ~ZMQ_POLLOUT;
poller_mutex.Unlock();
return false;
}
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::SetNotifier(ProtoEvent::Notifier* theNotifier, PollerThread* pollerThread)
{
poller_mutex.Lock();
if (NULL != theNotifier)
{
if (theNotifier == GetNotifier())
{
poller_mutex.Unlock();
return true; }
if (HasNotifier())
{
if ((NULL != poller_thread) && poller_active)
{
poller_thread->RemoveSocket(*this);
poller_active = false;
}
ProtoEvent::SetNotifier(NULL);
}
if (NULL == pollerThread)
{
default_poller_mutex.Lock();
if (NULL == default_poller_thread)
{
if (NULL == (default_poller_thread = new PollerThread()))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::SetNotifier() new default_poller_thread error: %s\n", GetErrorString());
default_poller_mutex.Unlock();
poller_mutex.Unlock();
return false;
}
}
poller_thread = default_poller_thread;
default_poller_mutex.Unlock();
}
else
{
poller_thread = pollerThread;
}
}
else if (HasNotifier())
{
if ((NULL != poller_thread) && poller_active)
{
poller_thread->RemoveSocket(*this);
poller_active = false;
}
poller_thread = NULL;
}
ProtoEvent::SetNotifier(theNotifier);
poller_mutex.Unlock();
return UpdateNotification(); }
bool ProtoZmq::Socket::UpdateNotification()
{
if (CONNECTED == state)
{
if (HasNotifier())
{
ASSERT(NULL != poller_thread);
if (poller_active)
{
if (0 != poll_flags)
{
if (!poller_thread->ModSocket(*this))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::UpdateNotification() error: unable to modify socket on poller_thread!\n");
return false;
}
}
else
{
poller_thread->RemoveSocket(*this);
poller_active = false;
}
}
else
{
if (!poller_thread->AddSocket(*this))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::UpdateNotification() error: unable to add socket to poller_thread!\n");
return false;
}
poller_active = true;
}
}
}
else if ((NULL != poller_thread) && poller_active)
{
poller_thread->RemoveSocket(*this);
poller_active = false;
}
return true;
}
bool ProtoZmq::Socket::Bind(const char* endpoint)
{
poller_mutex.Lock();
if (!IsOpen())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Bind() error: socket not open!\n");
poller_mutex.Unlock();
return false;
}
if (0 != zmq_bind(zmq_sock, endpoint))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Bind() zmq_bind() error: %s\n", GetErrorString());
poller_mutex.Unlock();
return false;
}
state = CONNECTED; if (!UpdateNotification())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Bind() error: unable to update notification status\n");
Close();
poller_mutex.Unlock();
return false;
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::Connect(const char* endpoint)
{
poller_mutex.Lock();
if (!IsOpen())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Connect() error: socket not open!\n");
poller_mutex.Unlock();
return false;
}
if (0 != zmq_connect(zmq_sock, endpoint))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Connect() zmq_bind() error: %s\n", GetErrorString());
poller_mutex.Unlock();
return false;
}
state = CONNECTED; if (!UpdateNotification())
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Connect() error: unable to update notification status\n");
Close();
poller_mutex.Unlock();
return false;
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::Subscribe(const char* prefix, unsigned int length)
{
if (NULL == zmq_sock)
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Subscribe() error: socket not open\n");
return false;
}
if (NULL == prefix)
length = 0;
else if (0 == length)
length = (unsigned int)strlen(prefix);
if (0 != zmq_setsockopt(zmq_sock, ZMQ_SUBSCRIBE, prefix, length))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Subscribe() zmq_setsockopt(ZMQ_SUBSCRIBE) error: %s\n", zmq_strerror(zmq_errno()));
return false;
}
return true;
}
bool ProtoZmq::Socket::Join(const char* group)
{
if (NULL == zmq_sock)
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Join() error: socket not open\n");
return false;
}
if (0 != zmq_join(zmq_sock, group))
{
PLOG(PL_ERROR, "ProtoZmq::Socket::Join() zmq_join() error: %s\n", zmq_strerror(zmq_errno()));
return false;
}
return true;
}
bool ProtoZmq::Socket::Send(char* buffer, unsigned int& numBytes)
{
poller_mutex.Lock();
int result = zmq_send(zmq_sock, buffer, numBytes, ZMQ_DONTWAIT);
if (poller_active && (0 != (ZMQ_POLLOUT & poll_flags)))
UpdateNotification(); if (result < 0)
{
switch (zmq_errno())
{
case EAGAIN:
case EINTR: numBytes = 0;
break;
default:
PLOG(PL_ERROR, "ProtoZmq::Socket::Send() zmq_send() error: %s\n", GetErrorString());
numBytes = 0;
poller_mutex.Unlock();
return false;
}
}
else
{
numBytes = result;
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::SendToGroup(char* buffer, unsigned int& numBytes,const char* group)
{
poller_mutex.Lock();
zmq_msg_t msg;
int result = zmq_msg_init_size (&msg, numBytes);
assert (result == 0);
memcpy (zmq_msg_data (&msg),buffer,numBytes);
zmq_msg_set_group(&msg,group);
result = zmq_msg_send (&msg, zmq_sock, ZMQ_DONTWAIT);
if (poller_active && (0 != (ZMQ_POLLOUT & poll_flags)))
UpdateNotification(); if (result < 0)
{
switch (zmq_errno())
{
case EAGAIN:
case EINTR: numBytes = 0;
break;
default:
PLOG(PL_ERROR, "ProtoZmq::Socket::SendToGroup() zmq_msg_send() error: %s\n", GetErrorString());
numBytes = 0;
return false;
poller_mutex.Unlock();
}
}
else
{
numBytes = result;
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::Recv(char* buffer, unsigned int& numBytes)
{
poller_mutex.Lock();
int result = zmq_recv(zmq_sock, buffer, numBytes, ZMQ_DONTWAIT);
if (poller_active && (0 != (ZMQ_POLLIN & poll_flags)))
UpdateNotification(); if (result < 0)
{
switch (zmq_errno())
{
case EAGAIN:
case EINTR: numBytes = 0;
break;
default:
PLOG(PL_ERROR, "ProtoZmq::Socket::Recv() zmq_recv() error: %s\n", GetErrorString());
numBytes = 0;
poller_mutex.Unlock();
return false;
}
}
else
{
numBytes = result;
}
poller_mutex.Unlock();
return true;
}
bool ProtoZmq::Socket::RecvMsg(zmq_msg_t* zmqMsg)
{
poller_mutex.Lock();
int result = zmq_msg_recv(zmqMsg, zmq_sock, ZMQ_DONTWAIT);
if (poller_active && (0 != (ZMQ_POLLIN & poll_flags)))
UpdateNotification(); if (result < 0)
{
switch (zmq_errno())
{
case EAGAIN:
case EINTR: break;
default:
PLOG(PL_ERROR, "ProtoZmq::Socket::RecvMsg() zmq_recv() error: %s\n", GetErrorString());
poller_mutex.Unlock();
return false;
}
}
poller_mutex.Unlock();
return true;
}
ProtoZmq::PollerThread::PollerThread()
: zmq_ctx(NULL), ext_ctx(false), zmq_poller(NULL), event_array(NULL),
event_array_length(0), socket_count(0)
{
}
ProtoZmq::PollerThread::~PollerThread()
{
bool isDefault = (this == ProtoZmq::Socket::default_poller_thread);
if (isDefault)
{
ProtoZmq::Socket::default_poller_mutex.Lock();
if (NULL == ProtoZmq::Socket::default_poller_thread)
{
ProtoZmq::Socket::default_poller_mutex.Unlock();
return; }
}
Close();
if (isDefault)
{
ProtoZmq::Socket::default_poller_thread = NULL;
ProtoZmq::Socket::default_poller_mutex.Unlock();
}
}
bool ProtoZmq::PollerThread::Open(void* zmqContext, bool retainLock)
{
suspend_mutex.Lock();
if (NULL != zmq_poller)
{
if (!retainLock) suspend_mutex.Unlock();
return true;
}
if (NULL == (zmq_poller = zmq_poller_new()))
{
PLOG(PL_ERROR, "ProtoZmq::PollerThread::Open() zmoq_poller_new() error %s\n", zmq_strerror(zmq_errno()));
suspend_mutex.Unlock();
return false;
}
ext_ctx = false;
if (NULL == zmqContext)
zmqContext = zmq_ctx_new();
else
ext_ctx = true;
if (NULL == zmqContext)
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() zmq_ctx_new() error: %s\n", zmq_strerror(zmq_errno()));
ClosePrivate();
suspend_mutex.Unlock();
return false;
}
zmq_ctx = zmqContext;
if (!break_server.Open(ZMQ_SERVER, NULL, zmq_ctx))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() error: unable to open break_server\n");
ClosePrivate();
suspend_mutex.Unlock();
return false;
}
char endpoint[256];
endpoint[255] = '\0';
#ifdef WIN32
snprintf(endpoint, 255, "inproc://ProtoZmq-%d-%p", GetCurrentProcessId(), this);
#else
snprintf(endpoint, 255, "inproc://ProtoZmq-%d-%p",getpid(), this);
#endif if (!break_server.Bind(endpoint))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() error: unable to bind break_server\n");
ClosePrivate();
suspend_mutex.Unlock();
return false;
}
if (!AddSocketPrivate(break_server))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() error: unable to add break_server socket to poller\n");
ClosePrivate();
suspend_mutex.Unlock();
return false;
}
if (!break_client.Open(ZMQ_CLIENT, NULL, zmq_ctx))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() error: unable to open break_client\n");
ClosePrivate();
suspend_mutex.Unlock();
return false;
}
if (!break_client.Connect(endpoint))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() error: unable to connect break_client\n");
ClosePrivate();
suspend_mutex.Unlock();
return false;
}
if (!StartThread())
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::Open() error: unable to start poller thread!\n");
ClosePrivate();
suspend_mutex.Unlock();
}
while(!poller_running); if (!retainLock) suspend_mutex.Unlock();
return true;
}
void ProtoZmq::PollerThread::Close()
{
suspend_mutex.Lock();
ClosePrivate();
suspend_mutex.Unlock();
if (this == ProtoZmq::Socket::default_poller_thread)
{
delete ProtoZmq::Socket::default_poller_thread; }
}
void ProtoZmq::PollerThread::ClosePrivate()
{
if (poller_running)
{
poller_running = false;
if (Signal()) {
suspend_mutex.Unlock(); StopThread(); suspend_mutex.Lock();
signal_mutex.Unlock();
}
}
if (break_client.IsOpen())
{
break_client.Close();
}
if (break_server.IsOpen()) break_server.Close();
if (NULL != zmq_ctx)
{
if (!ext_ctx)
zmq_ctx_term(zmq_ctx);
zmq_ctx = NULL;
ext_ctx = false;
}
if (NULL != zmq_poller)
{
zmq_poller_destroy(&zmq_poller);
zmq_poller = NULL;
}
}
int ProtoZmq::PollerThread::RunThread()
{
poller_running = true;
suspend_mutex.Lock();
while(poller_running)
{
signal_mutex.Lock();
suspend_mutex.Unlock();
int result = zmq_poller_wait_all(zmq_poller, event_array, event_array_length, -1);
signal_mutex.Unlock();
suspend_mutex.Lock();
if (!poller_running) break;
if (result < 0)
{
switch (zmq_errno())
{
case ETERM:
PLOG(PL_ERROR, " ProtoZmq::PollerThread::RunThread() zmq_poller_wait_all() error: %s\n", zmq_strerror(ETERM));
poller_running = false;
break;
case EINTR:
case EAGAIN: break;
default:
PLOG(PL_ERROR, " ProtoZmq::PollerThread::RunThread() zmq_poller_wait_all() error: %s\n", zmq_strerror(zmq_errno()));
poller_running = false;
break;
}
continue;
}
for (int i = 0; i < result; i++)
{
zmq_poller_event_t* item = event_array + i;
if (break_server.GetSocket() == item->socket)
{
char dummy[64];
unsigned int numBytes;
do
{
numBytes = 64;
if (!break_server.Recv(dummy, numBytes))
{
PLOG(PL_ERROR, "ProtoZmq::PollerThread::RunThread() error: break_server.Recv() failure!\n");
break;
}
} while (0 != numBytes);
continue;
}
Socket* zmqSocket = reinterpret_cast<Socket*>(item->user_data);
zmqSocket->poller_mutex.Lock();
zmqSocket->SetPollStatus(item->events);
int tempFlags = zmqSocket->GetPollFlags() & ~item->events;
zmq_poller_modify(zmq_poller, item->socket, tempFlags | ZMQ_POLLERR);
zmqSocket->SetEvent();
zmqSocket->poller_mutex.Unlock();
}
}
suspend_mutex.Unlock();
return ProtoThread::GetExitCode();
}
bool ProtoZmq::PollerThread::AddSocket(Socket& zmqSocket)
{
void* zmqContext = zmqSocket.UsingExternalContext() ? zmqSocket.GetContext() : NULL;
if (!Open(zmqContext, true))
{
PLOG(PL_ERROR, "ProtoZmq::PollerThread::AddSocket() error: unable to open/start poller thread!\n");
return false;
}
if (!Signal())
{
PLOG(PL_ERROR, "ProtoZmq::PollerThread::AddSocket() error: unable to signal thread!\n");
suspend_mutex.Unlock();
return false;
}
if (!AddSocketPrivate(zmqSocket))
{
PLOG(PL_ERROR, "ProtoZmq::PollerThread::AddSocket() error: unable to add socket to poller!\n");
Unsignal();
suspend_mutex.Unlock();
return false;
}
Unsignal();
suspend_mutex.Unlock();
return true;
}
bool ProtoZmq::PollerThread::ModSocket(Socket& zmqSocket)
{
suspend_mutex.Lock();
ASSERT (NULL != zmq_poller);
if (!Signal())
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::ModSocket() error: unable to signal poller thread!\n");
suspend_mutex.Unlock();
return false;
}
if (-1 == zmq_poller_modify(zmq_poller, zmqSocket.GetSocket(), zmqSocket.GetPollFlags() | ZMQ_POLLERR))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::ModSocket() zmq_poller_modify() error: %s\n", zmq_strerror(zmq_errno()));
Unsignal();
suspend_mutex.Unlock();
return false;
}
Unsignal();
suspend_mutex.Unlock();
return true;
}
bool ProtoZmq::PollerThread::AddSocketPrivate(Socket& zmqSocket)
{
if (socket_count >= event_array_length)
{
unsigned int length = (0 != event_array_length) ? event_array_length*2 : 2;
zmq_poller_event_t* tempArray = new zmq_poller_event_t[length];
if (NULL == tempArray)
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::AddSocketPrivate() new event_array error: %s\n", GetErrorString());
return false;
}
if (NULL != event_array) delete[] event_array; event_array = tempArray; event_array_length = length; }
if (-1 == zmq_poller_add(zmq_poller, zmqSocket.GetSocket(), &zmqSocket, zmqSocket.GetPollFlags() | ZMQ_POLLERR))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::AddSocketPrivate() zmq_poller_add() error: %s\n", zmq_strerror(zmq_errno()));
return false;
}
socket_count += 1;
return true;
}
bool ProtoZmq::PollerThread::RemoveSocket(Socket& zmqSocket)
{
suspend_mutex.Lock();
if (NULL == zmq_poller)
{
suspend_mutex.Unlock();
return true;
}
if (!Signal())
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::ModSocket() error: unable to signal poller thread!\n");
suspend_mutex.Unlock();
return false;
}
if (-1 == zmq_poller_remove(zmq_poller, zmqSocket.GetSocket()))
{
PLOG(PL_ERROR, " ProtoZmq::PollerThread::RemoveSocket() zmq_poller_remove() error: %s\n", zmq_strerror(zmq_errno()));
Unsignal();
suspend_mutex.Unlock();
return false;
}
socket_count -= 1;
Unsignal();
if (socket_count < 2)
ClosePrivate(); suspend_mutex.Unlock();
return true;
}
bool ProtoZmq::PollerThread::Signal()
{
if (IsStarted() && !IsMyself())
{
char dummy = 0;
unsigned int one;
do
{
one = 1;
if (!break_client.Send(&dummy, one))
{
PLOG(PL_ERROR, "ProtoDispatcher::Signal() error: SetBreak() failed!\n");
return false;
}
} while (0 == one);
signal_mutex.Lock();
}
if (!poller_running)
{
signal_mutex.Unlock();
return false;
}
return true;
}
void ProtoZmq::PollerThread::Unsignal()
{
if (IsStarted() && !IsMyself())
{
signal_mutex.Unlock();
}
}