#include "dada_client.h"
#include "dada_hdu.h"
#include "dada_def.h"
#include "node_array.h"
#include "string_array.h"
#include "ascii_header.h"
#include "daemon.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
typedef struct {
unsigned verbose;
unsigned quit;
unsigned check_xfers;
void * local_buffer;
size_t local_buffer_size;
} dada_dbnull_t;
#define DADA_DBNULL_INIT { 0, 0, 0, 0, 0 }
void usage()
{
fprintf (stdout,
"dada_dbnull [options]\n"
" -k key connect to key data block\n"
" -l copy contents into a local memory buffer\n"
" -v be verbose\n"
" -q be quiet\n"
" -x mbytes transfer size MB [default 64]\n"
" -X mbytes transfer size MiB [default 64]\n"
" -o mbytes transfer block sizei MB [default 8]\n"
" -O mbytes transfer block size Mi [default 8]\n"
" -s 1 transfer, then exit\n"
" -S multiple transfers until OBS_XFER=-1, then exit\n"
" -z use zero copy direct shm access\n"
" -d run as daemon\n");
}
int sock_open_function (dada_client_t* client)
{
assert (client != 0);
multilog_t* log = client->log;
assert (log != 0);
dada_dbnull_t * ctx = (dada_dbnull_t *) client->context;
assert(ctx != 0);
char * header = client->header;
assert (header != 0);
if (ctx->check_xfers)
{
int64_t obs_xfer = 0;
if (ascii_header_get (header, "OBS_XFER", "%"PRIi64, &obs_xfer) != 1)
{
multilog (log, LOG_WARNING, "header with no OBS_XFER, assuming END of XFERS\n");
obs_xfer = -1;
}
if (obs_xfer == -1)
{
if (ctx->verbose)
multilog (log, LOG_INFO, "open: OBS_XFER == -1\n");
ctx->quit = 1;
}
}
if (ctx->verbose)
{
uint64_t obs_offset = 0;
if (ascii_header_get (header, "OBS_OFFSET", "%"PRIu64, &obs_offset) != 1)
{
multilog (log, LOG_WARNING, "header with no OBS_OFFSET\n");
}
char utc_start[64];
if (ascii_header_get (header, "UTC_START", "%s", utc_start) != 1)
{
multilog (log, LOG_WARNING, "header with no UTC_START\n");
}
multilog (log, LOG_INFO, "UTC_START=%s, OBS_OFFSET=%"PRIu64"\n", utc_start, obs_offset);
}
uint64_t file_size = 0;
if (ascii_header_get (header, "FILE_SIZE", "%"PRIu64, &file_size) != 1)
{
multilog (log, LOG_WARNING, "header with no FILE_SIZE, transfer_bytes set to 0\n");
file_size = 0;
}
client->transfer_bytes = file_size;
if (ctx->verbose)
{
multilog (log, LOG_INFO, "open: transfer_bytes=%"PRIu64" bytes\n", client->transfer_bytes);
}
client->fd = 1;
return 0;
}
int sock_close_function (dada_client_t* client, uint64_t bytes_written)
{
assert (client != 0);
multilog_t* log = client->log;
assert (log != 0);
dada_dbnull_t * ctx = (dada_dbnull_t *) client->context;
assert(ctx != 0);
if (bytes_written < client->transfer_bytes) {
if (!client->quiet || ctx->verbose)
multilog (log, LOG_INFO, "close: Transfer stopped early at %"PRIu64" bytes\n",
bytes_written);
}
return 0;
}
int64_t sock_send_function (dada_client_t* client,
void* data, uint64_t data_size)
{
return data_size;
}
int64_t sock_send_block_function (dada_client_t* client,
void* data, uint64_t data_size, uint64_t block_id)
{
dada_dbnull_t * ctx = (dada_dbnull_t *) client->context;
if (ctx->local_buffer)
memcpy (ctx->local_buffer, data, data_size);
return data_size;
}
int64_t sock_send_block_cuda_function (dada_client_t* client,
void* data, uint64_t data_size, uint64_t block_id)
{
return data_size;
}
int main (int argc, char **argv)
{
dada_dbnull_t dbnull = DADA_DBNULL_INIT;
dada_hdu_t* hdu = 0;
dada_client_t* client = 0;
multilog_t* log = 0;
char daemon = 0;
char verbose = 0;
char quiet = 0;
char single_transfer = 0;
char multiple_xfers = 0;
int optimal_mbytes = 8;
int transfer_size_mbytes = 64;
int byte_base = 1024*1024;
key_t dada_key = DADA_DEFAULT_BLOCK_KEY;
char zero_copy = 0;
int arg = 0;
char local_copy = 0;
while ((arg=getopt(argc,argv,"dlN:vk:o:O:qsSx:X:z")) != -1)
switch (arg) {
case 'd':
daemon=1;
break;
case 'v':
verbose=1;
break;
case 'k':
if (sscanf (optarg, "%x", &dada_key) != 1) {
fprintf (stderr, "dada_db: could not parse key from %s\n", optarg);
return -1;
}
break;
case 'l':
local_copy = 1;
break;
case 'o':
optimal_mbytes = atoi(optarg);
break;
case 'O':
optimal_mbytes = atoi(optarg);
byte_base = 1000000;
break;
case 'q':
quiet = 1;
break;
case 's':
single_transfer = 1;
break;
case 'S':
multiple_xfers = 1;
break;
case 'x':
transfer_size_mbytes = atoi(optarg);
break;
case 'X':
transfer_size_mbytes = atoi(optarg);
byte_base = 1000000;
break;
case 'z':
zero_copy = 1;
break;
default:
usage ();
return 0;
}
log = multilog_open ("dada_dbnull", daemon);
if (daemon) {
be_a_daemon ();
multilog_serve (log, DADA_DEFAULT_DBNULL_LOG);
}
else
multilog_add (log, stderr);
hdu = dada_hdu_create (log);
dada_hdu_set_key(hdu, dada_key);
if (dada_hdu_connect (hdu) < 0)
return EXIT_FAILURE;
if (dada_hdu_lock_read (hdu) < 0)
return EXIT_FAILURE;
if (local_copy)
{
dbnull.local_buffer_size = ipcbuf_get_bufsz ((ipcbuf_t *) hdu->data_block);
dbnull.local_buffer = malloc (dbnull.local_buffer_size);
}
client = dada_client_create ();
dbnull.check_xfers = multiple_xfers;
dbnull.verbose = verbose;
client->context = &dbnull;
client->log = log;
client->data_block = hdu->data_block;
client->header_block = hdu->header_block;
client->open_function = sock_open_function;
if (zero_copy)
client->io_block_function = sock_send_block_function;
client->io_block_function_cuda = sock_send_block_cuda_function;
client->io_function = sock_send_function;
client->close_function = sock_close_function;
client->direction = dada_client_reader;
client->transfer_bytes = transfer_size_mbytes * byte_base;
client->optimal_bytes = optimal_mbytes * byte_base;
client->quiet = quiet;
while (!client->quit)
{
if (dada_client_read (client) < 0)
multilog (log, LOG_ERR, "Error during transfer\n");
if (dada_hdu_unlock_read (hdu) < 0)
{
multilog (log, LOG_ERR, "could not unlock read on hdu\n");
return EXIT_FAILURE;
}
if (single_transfer || (multiple_xfers && dbnull.quit))
client->quit = 1;
if (!client->quit)
{
if (dada_hdu_lock_read (hdu) < 0)
{
multilog (log, LOG_ERR, "could not lock read on hdu\n");
return EXIT_FAILURE;
}
}
}
if (dada_hdu_disconnect (hdu) < 0)
return EXIT_FAILURE;
if (local_copy)
free (dbnull.local_buffer);
return EXIT_SUCCESS;
}