#ifdef MPI_YES
#include <mpi.h>
#else
#include <mpi_dummy.h>
#endif
#include <zmq.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdio.h>
#include "msg_zmq.h"
using namespace CSLIB_NS;
MsgZMQ::MsgZMQ(int csflag, const void *ptr, MPI_Comm cworld) :
Msg(csflag, ptr, cworld)
{
char *port = (char *) ptr;
init(port);
}
MsgZMQ::MsgZMQ(int csflag, const void *ptr) : Msg(csflag, ptr)
{
char *port = (char *) ptr;
init(port);
}
MsgZMQ::~MsgZMQ()
{
if (me == 0) {
zmq_close(socket);
zmq_ctx_destroy(context);
}
}
void MsgZMQ::init(char *port)
{
#ifdef ZMQ_NO
error_all("constructor(): Library not built with ZMQ support");
#endif
if (me == 0) {
int n = strlen(port) + 8;
char *socket_name = new char[n];
strcpy(socket_name,"tcp://");
strcat(socket_name,port);
if (client) {
context = zmq_ctx_new();
socket = zmq_socket(context,ZMQ_REQ);
zmq_connect(socket,socket_name);
} else if (server) {
context = zmq_ctx_new();
socket = zmq_socket(context,ZMQ_REP);
int rc = zmq_bind(socket,socket_name);
if (rc) error_one("constructor(): Server could not make socket connection");
}
delete [] socket_name;
}
}
void MsgZMQ::send(int nheader, int *header, int nbuf, char *buf)
{
lengths[0] = nheader;
lengths[1] = nbuf;
if (me == 0) {
zmq_send(socket,lengths,2*sizeof(int),0);
zmq_recv(socket,NULL,0,0);
}
if (me == 0) {
zmq_send(socket,header,nheader*sizeof(int),0);
zmq_recv(socket,NULL,0,0);
}
if (me == 0) zmq_send(socket,buf,nbuf,0);
}
void MsgZMQ::recv(int &maxheader, int *&header, int &maxbuf, char *&buf)
{
if (me == 0) {
zmq_recv(socket,lengths,2*sizeof(int),0);
zmq_send(socket,NULL,0,0);
}
if (nprocs > 1) MPI_Bcast(lengths,2,MPI_INT,0,world);
int nheader = lengths[0];
int nbuf = lengths[1];
allocate(nheader,maxheader,header,nbuf,maxbuf,buf);
if (me == 0) {
zmq_recv(socket,header,nheader*sizeof(int),0);
zmq_send(socket,NULL,0,0);
}
if (nprocs > 1) MPI_Bcast(header,nheader,MPI_INT,0,world);
if (me == 0) zmq_recv(socket,buf,nbuf,0);
if (nprocs > 1) MPI_Bcast(buf,nbuf,MPI_CHAR,0,world);
}