#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <demi/libos.h>
#include <demi/sga.h>
#include <demi/wait.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#define DATA_SIZE 64
#define MAX_MSGS 1024
/*
 * Signal handler: prints the description of the received signal to stderr,
 * announces that the program is exiting, and terminates the process with
 * EXIT_SUCCESS.
 *
 * NOTE(review): fprintf(), fputs(), strsignal() and exit() are not in the
 * POSIX list of async-signal-safe functions. Kept as-is because moving to
 * write()/_exit() would skip exit-time cleanup -- confirm whether that matters.
 */
static void sighandler(int signum)
{
    fprintf(stderr, "\nReceived %s signal\n", strsignal(signum));
    fputs("Exiting...\n", stderr);
    exit(EXIT_SUCCESS);
}
/*
 * Issues demi_push() for `sga` on queue `qd`, then demi_wait()s (NULL timeout)
 * on the returned queue token.
 *
 * qd:  queue descriptor to push to.
 * sga: scatter-gather array passed to demi_push().
 * qr:  filled in by demi_wait(); its opcode is asserted to be DEMI_OPC_PUSH.
 *
 * The demi_*() calls are deliberately made outside of assert(): with NDEBUG
 * defined assert() expands to nothing, which would otherwise drop the calls
 * themselves and not just the checks.
 */
static void push_wait(int qd, demi_sgarray_t *sga, demi_qresult_t *qr)
{
    demi_qtoken_t qt = -1;
    int ret = 0;

    ret = demi_push(&qt, qd, sga);
    assert(ret == 0);

    ret = demi_wait(qr, qt, NULL);
    assert(ret == 0);

    assert(qr->qr_opcode == DEMI_OPC_PUSH);

    /* Silences "set but not used" when assertions are compiled out. */
    (void)ret;
}
/*
 * Issues demi_pop() on queue `qd`, then demi_wait()s (NULL timeout) on the
 * returned queue token.
 *
 * qd: queue descriptor to pop from.
 * qr: filled in by demi_wait(); its opcode is asserted to be DEMI_OPC_POP and
 *     qr_value.sga.sga_segs is asserted to compare unequal to 0.
 *
 * The demi_*() calls are deliberately made outside of assert(): with NDEBUG
 * defined assert() expands to nothing, which would otherwise drop the calls
 * themselves and not just the checks.
 */
static void pop_wait(int qd, demi_qresult_t *qr)
{
    demi_qtoken_t qt = -1;
    int ret = 0;

    ret = demi_pop(&qt, qd);
    assert(ret == 0);

    ret = demi_wait(qr, qt, NULL);
    assert(ret == 0);

    assert(qr->qr_opcode == DEMI_OPC_POP);
    assert(qr->qr_value.sga.sga_segs != 0);

    /* Silences "set but not used" when assertions are compiled out. */
    (void)ret;
}
/*
 * Server side of the ping-pong exchange.
 *
 * Initializes the library with (argc, argv), creates the pipes
 * "<argv[2]>:rx" and "<argv[2]>:tx", and then repeatedly pops a scatter-gather
 * array from the rx pipe and pushes it back on the tx pipe. The loop ends once
 * the accumulated length of the first segment of every received array reaches
 * data_size * max_msgs bytes.
 *
 * argc, argv: command line, forwarded to demi_init(); argv[2] is the pipe name.
 * data_size:  expected bytes per message (only used to compute the byte budget).
 * max_msgs:   number of messages in the byte budget.
 *
 * Exits with EXIT_FAILURE if the pipe name does not fit the local name buffer.
 * Library calls are checked with assert() but executed outside of it, so they
 * still run when the program is built with NDEBUG.
 */
static void server(int argc, char *const argv[], size_t data_size, unsigned max_msgs)
{
    size_t nbytes = 0;
    char name[1024];
    int pipeqd_rx = -1;
    int pipeqd_tx = -1;
    int ret = 0;
    size_t max_bytes = data_size * max_msgs;
    const struct demi_args args = {
        .argc = argc,
        .argv = argv,
        .callback = NULL,
    };

    /*
     * ":rx" and ":tx" have the same length, so a single check guarantees that
     * both derived names (including the terminating NUL) fit into `name`.
     */
    if (strlen(argv[2]) + sizeof(":rx") > sizeof(name))
    {
        fprintf(stderr, "pipe name too long (max %zu characters)\n", sizeof(name) - sizeof(":rx"));
        exit(EXIT_FAILURE);
    }

    ret = demi_init(&args);
    assert(ret == 0);

    snprintf(name, sizeof(name), "%s:rx", argv[2]);
    ret = demi_create_pipe(&pipeqd_rx, name);
    assert(ret == 0);

    snprintf(name, sizeof(name), "%s:tx", argv[2]);
    ret = demi_create_pipe(&pipeqd_tx, name);
    assert(ret == 0);

    while (nbytes < max_bytes)
    {
        demi_qresult_t qr = {0};
        demi_sgarray_t sga = {0};

        /* Receive one message. */
        pop_wait(pipeqd_rx, &qr);

        /* Keep a copy of the received array: `qr` is overwritten by push_wait(). */
        memcpy(&sga, &qr.qr_value.sga, sizeof(demi_sgarray_t));
        nbytes += sga.sga_segs[0].sgaseg_len;

        /* Echo it back and release it. */
        push_wait(pipeqd_tx, &sga, &qr);
        ret = demi_sgafree(&sga);
        assert(ret == 0);

        fprintf(stdout, "ping (%zu)\n", nbytes);
    }

    /* Silences "set but not used" when assertions are compiled out. */
    (void)ret;
}
/*
 * Client side of the ping-pong exchange.
 *
 * Initializes the library with (argc, argv) and opens the pipe
 * "<argv[2]>:tx" for receiving and "<argv[2]>:rx" for sending. Each iteration
 * allocates a scatter-gather array of data_size bytes filled with the value 1,
 * pushes it, pops the reply, asserts that every byte of the reply's first
 * segment equals 1, and adds that segment's length to the running total. The
 * loop ends once the total reaches data_size * max_msgs bytes.
 *
 * argc, argv: command line, forwarded to demi_init(); argv[2] is the pipe name.
 * data_size:  payload size of every message sent.
 * max_msgs:   number of messages in the byte budget.
 *
 * Exits with EXIT_FAILURE if the pipe name does not fit the local name buffer.
 * Library calls are checked with assert() but executed outside of it, so they
 * still run when the program is built with NDEBUG.
 */
static void client(int argc, char *const argv[], size_t data_size, unsigned max_msgs)
{
    size_t nbytes = 0;
    char name[1024];
    int pipeqd_tx = -1;
    int pipeqd_rx = -1;
    int ret = 0;
    size_t max_bytes = data_size * max_msgs;
    const struct demi_args args = {
        .argc = argc,
        .argv = argv,
        .callback = NULL,
    };

    /*
     * ":rx" and ":tx" have the same length, so a single check guarantees that
     * both derived names (including the terminating NUL) fit into `name`.
     */
    if (strlen(argv[2]) + sizeof(":tx") > sizeof(name))
    {
        fprintf(stderr, "pipe name too long (max %zu characters)\n", sizeof(name) - sizeof(":tx"));
        exit(EXIT_FAILURE);
    }

    ret = demi_init(&args);
    assert(ret == 0);

    /* The ":tx" pipe is the one this side reads from... */
    snprintf(name, sizeof(name), "%s:tx", argv[2]);
    ret = demi_open_pipe(&pipeqd_rx, name);
    assert(ret == 0);

    /* ...and the ":rx" pipe is the one it writes to. */
    snprintf(name, sizeof(name), "%s:rx", argv[2]);
    ret = demi_open_pipe(&pipeqd_tx, name);
    assert(ret == 0);

    while (nbytes < max_bytes)
    {
        demi_qresult_t qr = {0};
        demi_sgarray_t sga = demi_sgaalloc(data_size);

        assert(sga.sga_segs != 0);

        /* Send one message whose payload is all ones. */
        memset(sga.sga_segs[0].sgaseg_buf, 1, data_size);
        push_wait(pipeqd_tx, &sga, &qr);
        ret = demi_sgafree(&sga);
        assert(ret == 0);

        /* Receive the reply into a cleared result structure. */
        memset(&qr, 0, sizeof(demi_qresult_t));
        pop_wait(pipeqd_rx, &qr);

        /* The reply must carry the same byte pattern that was sent. */
        for (uint32_t i = 0; i < qr.qr_value.sga.sga_segs[0].sgaseg_len; i++)
            assert(((char *)qr.qr_value.sga.sga_segs[0].sgaseg_buf)[i] == 1);

        nbytes += qr.qr_value.sga.sga_segs[0].sgaseg_len;

        ret = demi_sgafree(&qr.qr_value.sga);
        assert(ret == 0);

        fprintf(stdout, "pong (%zu)\n", nbytes);
    }

    /* Silences "set but not used" when assertions are compiled out. */
    (void)ret;
}
/*
 * Writes the command-line synopsis and the list of supported modes to stderr.
 *
 * progname: program name shown in the "Usage:" line.
 */
static void usage(const char *progname)
{
    /* Fixed help lines printed after the synopsis, in order. */
    static const char *const mode_help[] = {
        "MODE:\n",
        " --client Run in client mode.\n",
        " --server Run in server mode.\n",
    };
    const size_t nlines = sizeof(mode_help) / sizeof(mode_help[0]);

    fprintf(stderr, "Usage: %s MODE pipe-name\n", progname);
    for (size_t line = 0; line < nlines; line++)
        fputs(mode_help[line], stderr);
}
/*
 * Parses `text` as an unsigned decimal integer in the range [0, max].
 *
 * Returns 0 and stores the value in *out on success; returns -1 (leaving *out
 * untouched) if `text` is empty, does not start with a digit, has trailing
 * characters, or does not fit in the requested range.
 */
static int parse_count(const char *text, unsigned long long max, unsigned long long *out)
{
    char *end = NULL;
    unsigned long long value = 0;

    /* strtoull() accepts leading blanks and a sign (negating the result); reject both. */
    if (text[0] < '0' || text[0] > '9')
        return -1;

    errno = 0;
    value = strtoull(text, &end, 10);
    if (errno != 0 || end == text || *end != '\0' || value > max)
        return -1;

    *out = value;
    return 0;
}

/*
 * Program entry point.
 *
 * Command line: MODE pipe-name [data-size [max-msgs]]
 *   argv[1]  "--server" or "--client".
 *   argv[2]  pipe name handed to server()/client().
 *   argv[3]  optional message size in bytes (default DATA_SIZE).
 *   argv[4]  optional number of messages (default MAX_MSGS).
 *
 * Installs sighandler() for SIGINT, SIGQUIT and SIGTSTP. With fewer than two
 * arguments the usage text is printed and EXIT_SUCCESS is returned. An unknown
 * mode or a malformed numeric argument is reported on stderr together with the
 * usage text and yields EXIT_FAILURE.
 */
int main(int argc, char *const argv[])
{
    size_t data_size = DATA_SIZE;
    unsigned max_msgs = MAX_MSGS;
    unsigned long long value = 0;
    int is_server = 0;
    int is_client = 0;

    signal(SIGINT, sighandler);
    signal(SIGQUIT, sighandler);
    signal(SIGTSTP, sighandler);

    if (argc < 3)
    {
        usage(argv[0]);
        return (EXIT_SUCCESS);
    }

    is_server = (strcmp(argv[1], "--server") == 0);
    is_client = (strcmp(argv[1], "--client") == 0);
    if (!is_server && !is_client)
    {
        fprintf(stderr, "unknown mode: %s\n", argv[1]);
        usage(argv[0]);
        return (EXIT_FAILURE);
    }

    if (argc >= 4)
    {
        if (parse_count(argv[3], SIZE_MAX, &value) != 0)
        {
            fprintf(stderr, "invalid data size: %s\n", argv[3]);
            usage(argv[0]);
            return (EXIT_FAILURE);
        }
        data_size = (size_t)value;
    }

    if (argc >= 5)
    {
        if (parse_count(argv[4], UINT_MAX, &value) != 0)
        {
            fprintf(stderr, "invalid message count: %s\n", argv[4]);
            usage(argv[0]);
            return (EXIT_FAILURE);
        }
        max_msgs = (unsigned)value;
    }

    if (is_server)
        server(argc, argv, data_size, max_msgs);
    else
        client(argc, argv, data_size, max_msgs);

    return (EXIT_SUCCESS);
}