#include "protoZMQ.h"
#include "protoTimer.h"
#include "protoApp.h"
#include <stdio.h>
#include <stdlib.h>
class ZmqExample : public ProtoApp
{
public:
ZmqExample();
~ZmqExample();
bool OnStartup(int argc, const char*const* argv);
bool ProcessCommands(int argc, const char*const* argv);
void OnShutdown();
bool OnCommand(const char* cmd, const char* val);
private:
enum CmdType {CMD_INVALID, CMD_ARG, CMD_NOARG};
static CmdType GetCmdType(const char* string);
static const char* const CMD_LIST[];
void Usage();
bool OnTxTimeout(ProtoTimer& theTimer);
void OnSocketEvent(ProtoEvent& theEvent);
ProtoTimer tx_timer;
ProtoZmq::Socket zmq_socket;
char* msg_buffer;
unsigned int msg_len;
unsigned int msg_index;
int msg_repeat;
int msg_repeat_count;
};
PROTO_INSTANTIATE_APP(ZmqExample)
ZmqExample::ZmqExample()
: msg_buffer(NULL), msg_len(0), msg_index(0),
msg_repeat(0), msg_repeat_count(0)
{
tx_timer.SetListener(this, &ZmqExample::OnTxTimeout);
tx_timer.SetInterval(1.0);
tx_timer.SetRepeat(-1);
zmq_socket.SetNotifier(&dispatcher);
zmq_socket.SetListener(this, &ZmqExample::OnSocketEvent);
}
ZmqExample::~ZmqExample()
{
}
const char* const ZmqExample::CMD_LIST[] =
{
"+publish", "+subscribe", "+radio", "+dish", "+repeat", "+send", NULL
};
void ZmqExample::Usage()
{
fprintf(stderr, "zmqExample [publish <endpoint>][subscribe <endpoint>]\n"
" [send <message>][repeat <repeatCount>]\n");
}
bool ZmqExample::OnStartup(int argc, const char*const* argv)
{
if (argc < 2)
{
Usage();
return false;
}
if (!ProcessCommands(argc, argv))
{
PLOG(PL_ERROR, "ZmqExample::OnStartup() error processing command line\n");
Usage();
return false;
}
return true;
}
void ZmqExample::OnShutdown()
{
if (tx_timer.IsActive()) tx_timer.Deactivate();
if (zmq_socket.IsOpen()) zmq_socket.Close();
if (zmq_socket.IsOpen()) zmq_socket.Close();
PLOG(PL_ERROR, "zmqExample: Done.\n");
}
bool ZmqExample::OnCommand(const char* cmd, const char* val)
{
CmdType type = GetCmdType(cmd);
ASSERT(CMD_INVALID != type);
unsigned int len = strlen(cmd);
if ((CMD_ARG == type) && !val)
{
PLOG(PL_ERROR, "zmqExample::ProcessCommand(%s) missing argument\n", cmd);
return false;
}
else if (!strncmp("publish", cmd, len))
{
if (zmq_socket.IsOpen()) zmq_socket.Close();
if (!zmq_socket.Open(ZMQ_PUB))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Open() error\n");
return false;
}
if (!zmq_socket.Bind(val))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Bind() error\n");
return false;
}
TRACE("zmqExample: publishing to %s ...\n", val);
}
else if (!strncmp("subscribe", cmd, len))
{
if (zmq_socket.IsOpen()) zmq_socket.Close();
if (!zmq_socket.Open(ZMQ_SUB))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Open() error\n");
return false;
}
if (!zmq_socket.Connect(val))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Connect() error\n");
return false;
}
if (!zmq_socket.Subscribe(NULL))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_setsockopt(ZMQ_SUBSCRIBE) error\n");
}
TRACE("zmqExample: subscribed to %s ...\n", val);
}
else if (!strncmp("radio", cmd, len))
{
if (zmq_socket.IsOpen()) zmq_socket.Close();
if (!zmq_socket.Open(ZMQ_RADIO))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Open() error\n");
return false;
}
if (!zmq_socket.Connect(val))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Bind() error\n");
return false;
}
TRACE("zmqExample: publishing to %s ...\n", val);
}
else if (!strncmp("dish", cmd, len))
{
if (zmq_socket.IsOpen()) zmq_socket.Close();
if (!zmq_socket.Open(ZMQ_DISH))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Open() error\n");
return false;
}
if (!zmq_socket.Bind(val))
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_socket.Connect() error\n");
return false;
}
if (!zmq_socket.Join("")) {
PLOG(PL_ERROR, "ZmqExample::OnCommand() zmq_join() error: %s\n", zmq_strerror(zmq_errno()));
return false;
}
TRACE("zmqExample: subscribed to %s ...\n", val);
}
else if (!strncmp("repeat", cmd, len))
{
msg_repeat = atoi(val);
}
else if (!strncmp("send", cmd, len))
{
if (msg_buffer) delete[] msg_buffer;
msg_len = strlen(val);
if (!(msg_buffer = new char[msg_len + 8]))
{
PLOG(PL_ERROR, "zmqExample: new msg_buffer error: %s\n", GetErrorString());
msg_len = 0;
return false;
}
strcpy(msg_buffer, val);
msg_index = 0;
msg_repeat_count = msg_repeat;
if (tx_timer.IsActive()) tx_timer.Deactivate();
if (msg_repeat_count) ActivateTimer(tx_timer);
OnTxTimeout(tx_timer); }
else
{
PLOG(PL_ERROR, "ZmqExample::OnCommand() unknown command error?\n");
return false;
}
return true;
}
bool ZmqExample::OnTxTimeout(ProtoTimer& )
{
TRACE("zmqExample::OnTxTimeout() ...\n");
sprintf(msg_buffer + msg_len, " %u", (unsigned int)msg_index++);
unsigned int numBytes = strlen(msg_buffer) + 1;
TRACE("sending %u bytes: %s ...\n", numBytes, msg_buffer);
zmq_socket.Send(msg_buffer, numBytes);
msg_buffer[msg_len] = '\0';
msg_index %= 1024; return true;
}
void ZmqExample::OnSocketEvent(ProtoEvent& )
{
int events;
size_t len = sizeof(int);
unsigned int numBytes;
do
{
if (0 != zmq_getsockopt(zmq_socket.GetSocket(), ZMQ_EVENTS, &events, &len))
{
PLOG(PL_ERROR, "ZmqExample::OnSubEvent() zmq_getsockopt() error: %s\n", GetErrorString());
break;
}
if (0 != (ZMQ_POLLIN & events))
{
char buffer[2048];
buffer[2047] = '\0';
numBytes = 2047;
if (zmq_socket.Recv(buffer, numBytes))
{
buffer[numBytes] = '\0';
TRACE("received %u bytes: %s\n", numBytes, buffer);
}
else
{
TRACE("receive error?\n");
}
}
} while (0 != (ZMQ_POLLIN & events));
if ((0 != (ZMQ_POLLOUT & zmq_socket.GetPollFlags())) &&
(0 != (ZMQ_POLLOUT & events)))
{
TRACE(" ZMQ_POLLOUT\n"); }
if (0 != (ZMQ_POLLERR & events))
{
TRACE(" ZMQ_POLLERR\n");
}
}
ZmqExample::CmdType ZmqExample::GetCmdType(const char* cmd)
{
if (!cmd) return CMD_INVALID;
unsigned int len = strlen(cmd);
bool matched = false;
CmdType type = CMD_INVALID;
const char* const* nextCmd = CMD_LIST;
while (*nextCmd)
{
if (!strncmp(cmd, *nextCmd+1, len))
{
if (matched)
{
return CMD_INVALID;
}
else
{
matched = true;
if ('+' == *nextCmd[0])
type = CMD_ARG;
else
type = CMD_NOARG;
}
}
nextCmd++;
}
return type;
}
bool ZmqExample::ProcessCommands(int argc, const char*const* argv)
{
int i = 1;
while ( i < argc)
{
switch (GetCmdType(argv[i]))
{
case CMD_INVALID:
PLOG(PL_ERROR, "zmqExample::ProcessCommands() Invalid command:%s\n",
argv[i]);
return false;
case CMD_NOARG:
if (!OnCommand(argv[i], NULL))
{
PLOG(PL_ERROR, "zmqExample::ProcessCommands() ProcessCommand(%s) error\n",
argv[i]);
return false;
}
i++;
break;
case CMD_ARG:
if (!OnCommand(argv[i], argv[i+1]))
{
PLOG(PL_ERROR, "zmqExample::ProcessCommands() ProcessCommand(%s, %s) error\n",
argv[i], argv[i+1]);
return false;
}
i += 2;
break;
}
}
return true;
}