#include "dada_pwc_main.h"
#include "dada_def.h"
#include "ascii_header.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
/*
 * Allocate a primary write client main loop structure with every member
 * cleared.
 *
 * The returned structure is released with dada_pwc_main_destroy().
 * Allocation failure is caught by assert(), as before.
 */
dada_pwc_main_t* dada_pwc_main_create ()
{
  /* calloc rather than malloc: members that are not assigned explicitly
     below (for example the command member) start out zeroed instead of
     holding indeterminate values */
  dada_pwc_main_t* pwcm = calloc (1, sizeof(dada_pwc_main_t));
  assert (pwcm != 0);

  /* connections and buffers supplied later by the caller */
  pwcm -> pwc = 0;
  pwcm -> log = 0;
  pwcm -> data_block = 0;
  pwcm -> header_block = 0;

  /* callbacks supplied later by the caller */
  pwcm -> start_function = 0;
  pwcm -> buffer_function = 0;
  pwcm -> block_function = 0;
  pwcm -> stop_function = 0;
  pwcm -> error_function = 0;
  pwcm -> header_valid_function = 0;
  pwcm -> xfer_pending_function = 0;
  pwcm -> new_xfer_function = 0;

  /* remaining state */
  pwcm -> context = 0;
  pwcm -> header = 0;
  pwcm -> header_size = 0;
  pwcm -> verbose = 0;
  pwcm -> header_valid = 0;

  return pwcm;
}
/*
 * Release a structure obtained from dada_pwc_main_create().
 *
 * Only the structure itself is freed; nothing it points to is touched.
 */
void dada_pwc_main_destroy (dada_pwc_main_t* pwcm)
{
  free (pwcm);
}
/* Forward declarations of the stages called from dada_pwc_main();
   each one is defined further down in this file. */

/* processes commands until a clock or start command has been received */
int dada_pwc_main_prepare (dada_pwc_main_t* pwcm);
/* calls the start function and enters the clocking or recording state */
int dada_pwc_main_start_transfer (dada_pwc_main_t* pwcm);
/* transfer loop built on buffer_function and ipcio_write */
int dada_pwc_main_transfer_data (dada_pwc_main_t* pwcm);
/* transfer loop built on block_function and direct data block access */
int dada_pwc_main_transfer_data_block (dada_pwc_main_t* pwcm);
/* calls the stop function and closes the data block */
int dada_pwc_main_stop_transfer (dada_pwc_main_t* pwcm);
/*
 * Run the primary write client main loop until the command connection
 * reports quit.
 *
 * Each pass runs prepare, start_transfer, one of the two transfer loops
 * (the block variant when block_function is set) and stop_transfer.  A
 * negative result from any stage is handed to dada_pwc_main_process_error().
 * A fatal error state sets the quit flag.
 *
 * Returns -1 when pwcm or one of its required members is missing, otherwise
 * the last stage result that was stored (0 if the loop never ran).
 */
int dada_pwc_main (dada_pwc_main_t* pwcm)
{
  const char* complaint = 0;
  int status = 0;

  /* the first missing requirement, in fixed order, decides the message */
  if (!pwcm)
    complaint = "dada_pwc_main no main!\n";
  else if (!pwcm->pwc)
    complaint = "dada_pwc_main no PWC command connection\n";
  else if (!pwcm->log)
    complaint = "dada_pwc_main no logging facility\n";
  else if (!pwcm->start_function)
    complaint = "dada_pwc_main no start function\n";
  else if (!pwcm->buffer_function)
    complaint = "dada_pwc_main no buffer function\n";
  else if (!pwcm->stop_function)
    complaint = "dada_pwc_main no stop function\n";

  if (complaint)
  {
    fprintf (stderr, "%s", complaint);
    return -1;
  }

  /* the command connection inherits the log when it has none of its own */
  if (!pwcm->pwc->log)
    pwcm->pwc->log = pwcm->log;

  while (!dada_pwc_quit (pwcm->pwc))
  {
    status = dada_pwc_main_prepare (pwcm);

    /* quit requested while preparing: leave without touching the state */
    if (dada_pwc_quit (pwcm->pwc))
      break;

    if (status >= 0)
    {
      status = dada_pwc_main_start_transfer (pwcm);

      /* the transfer loop runs only after a successful start */
      if (status >= 0)
      {
        status = pwcm->block_function
               ? dada_pwc_main_transfer_data_block (pwcm)
               : dada_pwc_main_transfer_data (pwcm);
      }

      /* covers a failed start as well as a failed transfer */
      if (status < 0)
        dada_pwc_main_process_error (pwcm, status);

      /* the stop stage always runs once prepare has succeeded */
      status = dada_pwc_main_stop_transfer (pwcm);
      if (status < 0)
        dada_pwc_main_process_error (pwcm, status);
    }
    else
    {
      dada_pwc_main_process_error (pwcm, status);
    }

    if (pwcm->pwc->state == dada_pwc_fatal_error)
      pwcm->pwc->quit = 1;
  }

  return status;
}
/*
 * Idle/prepared stage: process commands until a clock or start command
 * opens the data block.
 *
 * When a header block is attached, the next header buffer is fetched first.
 * A data block that is still open is closed.  Then commands are handled:
 *   reset  -> state idle
 *   header -> copy the command header into the header buffer, state prepared
 *   clock  -> open the data block with mode 'w' and return 0
 *   start  -> open the data block with mode 'W' and return 0
 *   stop   -> warning, state idle
 *
 * Returns 0 on success or when quit was requested, DADA_ERROR_SOFT when a
 * clock command carries a byte count, DADA_ERROR_HARD otherwise.
 */
int dada_pwc_main_prepare (dada_pwc_main_t* pwcm)
{
  if (pwcm->header_block) {
    pwcm->header_size = ipcbuf_get_bufsz (pwcm->header_block);
    pwcm->header = ipcbuf_get_next_write (pwcm->header_block);
    if (!pwcm->header) {
      multilog (pwcm->log, LOG_ERR, "Could not get next header block\n");
      return DADA_ERROR_HARD;
    }
  }

  /* ensure that the data block is closed before a new transfer begins */
  if (pwcm->data_block && ipcio_is_open (pwcm->data_block)
      && ipcio_close (pwcm->data_block) < 0)
  {
    multilog (pwcm->log, LOG_ERR, "Could not close Data Block\n");
    return DADA_ERROR_HARD;
  }

  while (!dada_pwc_quit (pwcm->pwc))
  {
    pwcm->command = dada_pwc_command_get (pwcm->pwc);

    /* quit may have been raised while waiting for the command */
    if (dada_pwc_quit (pwcm->pwc))
      break;

    if (pwcm->command.code == dada_pwc_reset)
    {
      dada_pwc_set_state (pwcm->pwc, dada_pwc_idle, 0);
    }
    else if (pwcm->command.code == dada_pwc_header)
    {
#ifdef _DEBUG
      multilog (pwcm->log, LOG_INFO,
                "HEADER START\n%s\nHEADER END\n", pwcm->command.header);
#endif
      if (pwcm->header_block) {
        strncpy (pwcm->header, pwcm->command.header, pwcm->header_size);
        /* strncpy leaves the destination unterminated when the source
           fills it completely; later string operations on the header
           need a terminator */
        if (pwcm->header_size > 0)
          pwcm->header[pwcm->header_size - 1] = '\0';
      }
      dada_pwc_set_state (pwcm->pwc, dada_pwc_prepared, 0);
    }
    else if (pwcm->command.code == dada_pwc_clock)
    {
      /* a clock command must not specify a byte count */
      if (pwcm->command.byte_count) {
        multilog (pwcm->log, LOG_ERR, "dada_pwc_main_prepare internal error. "
                  "byte count specified in CLOCK command\n");
        return DADA_ERROR_SOFT;
      }
      if (pwcm->data_block && ipcio_open (pwcm->data_block, 'w') < 0) {
        multilog (pwcm->log, LOG_ERR, "Could not open data block\n");
        return DADA_ERROR_HARD;
      }
      /* leave the command loop and enter the clocking state */
      return 0;
    }
    else if (pwcm->command.code == dada_pwc_start)
    {
#ifdef _DEBUG
      multilog (pwcm->log, LOG_INFO, "Start recording data\n");
#endif
      if (pwcm->command.byte_count)
        multilog (pwcm->log, LOG_INFO,
                  "Will record %"PRIu64" bytes\n", pwcm->command.byte_count);
      if (pwcm->data_block && ipcio_open (pwcm->data_block, 'W') < 0) {
        multilog (pwcm->log, LOG_ERR, "Could not open data block\n");
        return DADA_ERROR_HARD;
      }
      /* leave the command loop and enter the recording state */
      return 0;
    }
    else if (pwcm->command.code == dada_pwc_stop)
    {
      if (pwcm->pwc->state == dada_pwc_soft_error)
        multilog (pwcm->log, LOG_WARNING, "Resetting soft_error to idle\n");
      else
        multilog (pwcm->log, LOG_WARNING, "dada_pwc_main_prepare: Unexpected stop command\n");
      dada_pwc_set_state (pwcm->pwc, dada_pwc_idle, 0);
    }
    else
    {
      multilog (pwcm->log, LOG_ERR, "dada_pwc_main_prepare internal error = "
                "unexpected command code %s\n",
                dada_pwc_cmd_code_string(pwcm->command.code));
      return DADA_ERROR_HARD;
    }
  }

  return 0;
}
/*
 * Call the start function, record UTC_START and enter the clocking or
 * recording state.
 *
 * start_function returns the UTC of the first sample: a negative value is
 * an error and 0 is written to the header as "UNKNOWN".  When a header
 * block is attached, UTC_START is written to the header and, for a start
 * command, the header is marked filled once it is considered valid.
 *
 * Returns the result of dada_pwc_set_state() on success, DADA_ERROR_SOFT if
 * UTC_START could not be written, DADA_ERROR_HARD on any other failure.
 */
int dada_pwc_main_start_transfer (dada_pwc_main_t* pwcm)
{
  /* constant-size buffer for the formatted time (previously a VLA) */
  enum { buffer_size = 32 };
  char buffer[buffer_size];

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_main_start_transfer: call start function\n");
#endif

  time_t utc = pwcm->start_function (pwcm, pwcm->command.utc);

  if (utc < 0) {
    multilog (pwcm->log, LOG_ERR, "start_function returned invalid UTC\n");
    return DADA_ERROR_HARD;
  }

  if (utc == 0)
    snprintf (buffer, buffer_size, "%s", "UNKNOWN");
  else
  {
    /* gmtime can return NULL and strftime returns 0 when nothing usable
       was written; either case would leave buffer indeterminate */
    struct tm* utc_tm = gmtime (&utc);
    if (!utc_tm || strftime (buffer, buffer_size, DADA_TIMESTR, utc_tm) == 0) {
      multilog (pwcm->log, LOG_ERR, "start_function returned invalid UTC\n");
      return DADA_ERROR_HARD;
    }
  }

  multilog (pwcm->log, LOG_INFO, "UTC_START = %s\n", buffer);

  if (pwcm->header_block) {

    if (ascii_header_set (pwcm->header, "UTC_START", "%s", buffer) < 0) {
      multilog (pwcm->log, LOG_ERR, "failed ascii_header_set UTC_START\n");
      return DADA_ERROR_SOFT;
    }

    if (utc > 0) {
      pwcm->pwc->utc_start = utc;
      /* time_t is not int: cast explicitly to match the conversion */
      multilog(pwcm->log, LOG_INFO, "Setting pwcm->pwc->utc_start = %ld\n",
               (long) pwcm->pwc->utc_start);
    }

    /* only a start command marks the header filled at this point */
    if (pwcm->command.code == dada_pwc_start) {

      /* without a validity callback the header is taken as valid */
      if (pwcm->header_valid_function)
        pwcm->header_valid = pwcm->header_valid_function(pwcm);
      else
        pwcm->header_valid = 1;

      if (pwcm->header_valid) {
#ifdef _DEBUG
        multilog(pwcm->log, LOG_INFO, "dada_pwc_main_start_transfer: marking header filled\n");
#endif
        if ( ipcbuf_mark_filled (pwcm->header_block, pwcm->header_size) < 0) {
          multilog (pwcm->log, LOG_ERR, "Could not marked header filled or command.code != start\n");
          return DADA_ERROR_HARD;
        }
      }
    }
  }

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_main_start_transfer: exit change state\n");
#endif

  /* the data block is optional everywhere else in this file, so do not
     hand a NULL pointer to the ring buffer here either */
  if (pwcm->data_block)
    ipcbuf_set_soclock_buf ((ipcbuf_t*) pwcm->data_block);

  if (pwcm->command.code == dada_pwc_clock)
    return dada_pwc_set_state (pwcm->pwc, dada_pwc_clocking, utc);

  else if (pwcm->command.code == dada_pwc_start)
    return dada_pwc_set_state (pwcm->pwc, dada_pwc_recording, utc);

  multilog (pwcm->log, LOG_ERR, "dada_pwc_main_start_transfer"
            " internal error = invalid state\n");
  return DADA_ERROR_HARD;
}
/*
 * Switch from clocking to recording at the byte requested by the current
 * command.
 *
 * The start byte is the start-of-clocking byte plus command.byte_count,
 * raised to the data block's minimum start byte when it lies before it.
 * The header is carried over to the next header buffer, UTC_START is reset
 * to command.utc and OBS_OFFSET to 0, the data block is started at that
 * byte, and the header is marked filled if it was not already valid.
 *
 * Returns 0 on success, DADA_ERROR_HARD on any failure.
 */
int dada_pwc_main_record_start (dada_pwc_main_t* pwcm)
{
  uint64_t minimum_record_start = 0;
  uint64_t utc_start_byte = 0;
  uint64_t command_start_byte = 0;
  char* header = 0;

  /* constant-size buffer for the formatted time (previously a VLA) */
  enum { buffer_size = 64 };
  char buffer[buffer_size];

  minimum_record_start = ipcio_get_start_minimum (pwcm->data_block);
  utc_start_byte = ipcio_get_soclock_byte(pwcm->data_block);
  command_start_byte = utc_start_byte + pwcm->command.byte_count;

  /* a start before the earliest available byte is moved forward */
  if (command_start_byte < minimum_record_start) {
    multilog (pwcm->log, LOG_ERR, "Requested start byte=%"PRIu64
              " reset to minimum=%"PRIu64"\n", command_start_byte,
              minimum_record_start);
    command_start_byte = minimum_record_start;
    pwcm->command.byte_count = minimum_record_start - utc_start_byte;
  }

  multilog (pwcm->log, LOG_INFO, "REC_START\n");
  /* time_t is not int: cast explicitly to match the conversion */
  multilog (pwcm->log, LOG_INFO, "pwcm->command.utc = %ld\n",
            (long) pwcm->command.utc);
  multilog (pwcm->log, LOG_INFO, "pwcm->pwc->utc_start = %ld\n",
            (long) pwcm->pwc->utc_start);

  header = ipcbuf_get_next_write (pwcm->header_block);
  if (!header) {
    /* previously an unchecked NULL would have been passed to memcpy */
    multilog (pwcm->log, LOG_ERR, "Could not get next header block\n");
    return DADA_ERROR_HARD;
  }

  /* carry the current header over when a different buffer was returned */
  if (header != pwcm->header) {
    memcpy (header, pwcm->header, pwcm->header_size);
    pwcm->header = header;
  }

  time_t utc = pwcm->command.utc;
  struct tm* utc_tm = gmtime (&utc);
  if (!utc_tm || strftime (buffer, buffer_size, DADA_TIMESTR, utc_tm) == 0) {
    /* buffer would be indeterminate; do not write it to the header */
    multilog (pwcm->log, LOG_ERR, "fail ascii_header_set UTC_START\n");
    return DADA_ERROR_HARD;
  }

  multilog (pwcm->log, LOG_INFO, "dada_pwc_main_record_start: UTC_START reset to REC_START = %s\n", buffer);
  if (ascii_header_set (pwcm->header, "UTC_START", "%s", buffer) < 0) {
    multilog (pwcm->log, LOG_ERR, "fail ascii_header_set UTC_START\n");
    return DADA_ERROR_HARD;
  }

  multilog (pwcm->log, LOG_INFO, "dada_pwc_main_record_start: OBS_OFFSET = 0\n");
  /* the variadic argument must really be 64 bits wide for PRIu64 */
  if (ascii_header_set (pwcm->header, "OBS_OFFSET", "%"PRIu64, (uint64_t) 0) < 0) {
    multilog (pwcm->log, LOG_ERR, "fail ascii_header_set OBS_OFFSET\n");
    return DADA_ERROR_HARD;
  }

  multilog (pwcm->log, LOG_INFO,"command_start_byte = %"PRIu64", command.byte_"
            "count = %"PRIu64"\n",command_start_byte,pwcm->command.byte_count);

  if (ipcio_start (pwcm->data_block, command_start_byte) < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not start data block"
              " at %"PRIu64"\n", command_start_byte);
    return DADA_ERROR_HARD;
  }

  if (!(pwcm->header_valid)) {

    /* without a validity callback the header is taken as valid */
    if (pwcm->header_valid_function)
      pwcm->header_valid = pwcm->header_valid_function(pwcm);
    else
      pwcm->header_valid = 1;

    if (pwcm->header_valid) {
#ifdef _DEBUG
      multilog(pwcm->log, LOG_INFO, "dada_pwc_main_record_start: marking header filled\n");
#endif
      if (ipcbuf_mark_filled (pwcm->header_block, pwcm->header_size) < 0) {
        multilog (pwcm->log, LOG_ERR, "Could not mark filled header\n");
        return DADA_ERROR_HARD;
      }
    } else {
      multilog (pwcm->log, LOG_ERR, "Cannot transit from clocking to recoding "
                "if when header is invalid\n");
      return DADA_ERROR_HARD;
    }
  }

  return 0;
}
/*
 * Transfer loop built on buffer_function and ipcio_write.
 *
 * Until quit is requested or the error state exceeds 1, each pass:
 *   1. handles a pending command: set_utc_start writes UTC_START to the
 *      header, record_stop/record_start/stop schedule a state transition
 *      at command.byte_count (or immediately if that byte has passed);
 *   2. unless only a transition is outstanding, fetches the next buffer
 *      from buffer_function and writes it to the data block, truncated at
 *      the transit byte;
 *   3. performs the scheduled transition once the transit byte is reached
 *      and writes the rest of the buffer fetched in this pass.
 *
 * Returns 0 on a normal stop or end of observation, a negative buffer size
 * reported by buffer_function, or DADA_ERROR_HARD / DADA_ERROR_FATAL.
 */
int dada_pwc_main_transfer_data (dada_pwc_main_t* pwcm)
{
  /* total number of bytes written to the data block */
  uint64_t total_bytes_written = 0;

  /* byte at which the pending state transition takes place (0 = none) */
  uint64_t transit_byte = pwcm->command.byte_count;

  /* bytes still to be written before the pending transition */
  uint64_t bytes_to_write = pwcm->command.byte_count;

  /* bytes to be written from the current buffer */
  uint64_t buf_bytes = 0;

  /* result of the last ipcio_write */
  int64_t bytes_written = 0;

  /* buffer handed out by buffer_function and its size */
  char* buffer = 0;
  int64_t buffer_size = 0;

  /* set when buffer_function was called in the current pass */
  int got_buffer = 0;

  /* 0 = ok, 1 = recoverable, 2 = stop; values come from error_function */
  uint64_t data_transfer_error_state = 0;

  char* command_string = 0;

  /* whether the header already carries a real UTC_START */
  uint64_t utc_start_set = 0;

  enum { utc_size = 1024 };
  char utc_buffer[utc_size];

  int64_t first_byte_of_xfer = 1;

  if (pwcm->new_xfer_function) {
    total_bytes_written = pwcm->new_xfer_function(pwcm);
    if (pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "transfer_data: first byte expected on %"PRIu64"\n", total_bytes_written);
  }

  if (ascii_header_get (pwcm->header, "UTC_START", "%s", utc_buffer) < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not read UTC_START from header\n");
    return DADA_ERROR_HARD;
  }

  if (strcmp(utc_buffer,"UNKNOWN") != 0)
    utc_start_set = 1;

#ifdef _DEBUG
  fprintf (stderr, "transfer_data: enter main loop\n");
#endif

  while ((!dada_pwc_quit (pwcm->pwc)) && (data_transfer_error_state <= 1)) {

    got_buffer = 0;

#ifdef _DEBUG
    fprintf (stderr, "transfer_data: check for command\n");
#endif

    if (dada_pwc_command_check (pwcm->pwc)) {

#ifdef _DEBUG
      fprintf (stderr, "transfer_data: get command\n");
#endif

      pwcm->command = dada_pwc_command_get (pwcm->pwc);

      if (pwcm->command.code == dada_pwc_set_utc_start) {

        assert(pwcm->command.utc > 0);

        if (utc_start_set) {
          multilog (pwcm->log, LOG_WARNING, "WARNING, UTC_START was already "
                    "set. Ignoring set_utc_start command\n");
        } else {

          strftime (utc_buffer, utc_size, DADA_TIMESTR,
                    (struct tm*) gmtime(&(pwcm->command.utc)));

#ifdef _DEBUG
          fprintf (stderr,"transfer_data: set UTC_START in header to : %s\n",
                   utc_buffer);
#endif
          multilog (pwcm->log, LOG_INFO,"UTC_START = %s\n",utc_buffer);

          if (ascii_header_set (pwcm->header,"UTC_START","%s",utc_buffer) < 0) {
            multilog (pwcm->log,LOG_ERR,"failed ascii_header_set UTC_START\n");
            return DADA_ERROR_HARD;
          }

#ifdef _DEBUG
          fprintf (stderr,"transfer_data: header block filled\n");
#endif

          /* while recording, the header can now be released to readers */
          if (pwcm->pwc->state == dada_pwc_recording) {

            if (pwcm->header_valid_function)
              pwcm->header_valid = pwcm->header_valid_function(pwcm);
            else
              pwcm->header_valid = 1;

            if (pwcm->header_valid) {
#ifdef _DEBUG
              multilog (pwcm->log, LOG_INFO, "transfer_data: marking header valid\n");
#endif
              if (ipcbuf_mark_filled (pwcm->header_block,pwcm->header_size) < 0) {
                multilog (pwcm->log, LOG_ERR, "Could not mark filled header\n");
                return DADA_ERROR_HARD;
              }
            }
          }
          utc_start_set = 1;
        }

        /* signal the condition on the command connection;
           NOTE(review): presumably wakes the thread that issued the
           command - confirm against dada_pwc */
        pthread_mutex_lock (&(pwcm->pwc->mutex));
        pthread_cond_signal (&(pwcm->pwc->cond));
        pthread_mutex_unlock (&(pwcm->pwc->mutex));

      } else {

        if (pwcm->command.code == dada_pwc_record_stop)
          command_string = "recording->clocking";
        else if (pwcm->command.code == dada_pwc_record_start)
          command_string = "clocking->recording";
        else if (pwcm->command.code == dada_pwc_stop)
          command_string = "stopping";
        else {
          multilog (pwcm->log, LOG_ERR, "transfer_data: internal error = "
                    "unexpected command code %d\n", pwcm->command.code);
          return DADA_ERROR_HARD;
        }

        if (pwcm->verbose)
          multilog (pwcm->log, LOG_INFO, "transfer_data: %s byte count = %"PRIu64" bytes\n",
                    command_string, pwcm->command.byte_count);

        if (pwcm->command.byte_count > total_bytes_written) {
          /* the transition lies in the future */
          transit_byte = pwcm->command.byte_count;
          bytes_to_write = pwcm->command.byte_count - total_bytes_written;

          multilog (pwcm->log, LOG_INFO, "%s in %"PRIu64" bytes\n",
                    command_string, bytes_to_write);
        } else {
          /* the requested byte is now or already past: transit at once */
          multilog (pwcm->log, LOG_INFO, "%s immediately\n", command_string);

          /* transit_byte == 0 means "no transition", so a transition at
             byte 0 is represented as byte 1 */
          if (total_bytes_written == 0)
          {
            multilog (pwcm->log, LOG_INFO, "Received 0 bytes, setting to 1\n");
            total_bytes_written = 1;
          }
          transit_byte = total_bytes_written;
          bytes_to_write = 0;

          if (pwcm->command.byte_count &&
              pwcm->command.byte_count < total_bytes_written &&
              pwcm->verbose)
            multilog (pwcm->log, LOG_INFO,
                      "requested transit byte=%"PRIu64" passed\n",
                      pwcm->command.byte_count);
        }
      }

      if (pwcm->verbose) {

        if (pwcm->pwc->state == dada_pwc_recording)
          multilog (pwcm->log, LOG_INFO, "recording\n");
        else if (pwcm->pwc->state == dada_pwc_clocking)
          multilog (pwcm->log, LOG_INFO, "clocking\n");

        if (transit_byte) {
          if (pwcm->command.code == dada_pwc_record_stop)
            multilog (pwcm->log, LOG_INFO, "record stop in %"PRIu64" bytes\n",
                      bytes_to_write);
          else if (pwcm->command.code == dada_pwc_record_start)
            multilog (pwcm->log, LOG_INFO, "record start in %"PRIu64" bytes\n",
                      bytes_to_write);
          /* was a second test for record_stop, which could never match */
          else if (pwcm->command.code == dada_pwc_stop)
            multilog (pwcm->log, LOG_INFO, "stop in %"PRIu64" bytes\n",
                      bytes_to_write);
        }
      }
    }

    /* fetch and write data unless only a transition is outstanding */
    if (!transit_byte || bytes_to_write) {

#ifdef _DEBUG
      fprintf (stderr, "transfer_data: call buffer function\n");
#endif

      buffer = pwcm->buffer_function (pwcm, &buffer_size);

      /* a negative size is returned to the caller as the error code */
      if (buffer_size < 0)
        return (int) buffer_size;

      /* data arrived again after a recoverable error */
      if ((data_transfer_error_state) && (buffer_size != 0)) {
        data_transfer_error_state = 0;
        if (total_bytes_written)
          multilog (pwcm->log, LOG_WARNING, "pwc buffer_function "
                    "recovered from error state\n");
      }

      if (buffer_size == 0) {

        if (pwcm->xfer_pending_function)
        {
          if (pwcm->verbose)
            multilog (pwcm->log, LOG_INFO, "transfer_data: buffer_size=0, checking xfer status\n");

          first_byte_of_xfer = dada_pwc_main_process_xfer (pwcm, 0, 0);

          if (first_byte_of_xfer == -2)
          {
            multilog (pwcm->log, LOG_ERR, "transfer_data: process_xfer failed\n");
            return DADA_ERROR_HARD;
          }
          else if (first_byte_of_xfer == -1)
          {
            if (pwcm->verbose)
              multilog (pwcm->log, LOG_INFO, "transfer_data: process_xfer indicates end of observation\n");
            return 0;
          }
          else if (first_byte_of_xfer < 0)
          {
            /* process_xfer also returns DADA_ERROR_FATAL; any other
               negative value must not be taken as a byte offset */
            multilog (pwcm->log, LOG_ERR, "transfer_data: process_xfer failed\n");
            return DADA_ERROR_FATAL;
          }
          else if (first_byte_of_xfer == 0)
          {
            if (pwcm->verbose)
              multilog (pwcm->log, LOG_INFO, "transfer_data: process_xfer indicates waiting for SOD on xfer\n");
          }
          else
          {
            if (pwcm->verbose)
              multilog (pwcm->log, LOG_INFO, "transfer_data: process_xfer indicates next byte is %"PRIi64"\n", first_byte_of_xfer);
            total_bytes_written = (uint64_t) first_byte_of_xfer;
          }

        } else {

          /* without an error callback an empty buffer stops the transfer */
          if (pwcm->error_function) {
            data_transfer_error_state = pwcm->error_function(pwcm);
          } else {
            data_transfer_error_state = 2;
          }

          if (data_transfer_error_state == 2) {
            multilog (pwcm->log, LOG_ERR, "pwc buffer_function returned 0 bytes."
                      " Stopping\n");
            /* NOTE(review): writes a single byte before giving up,
               presumably so that the data block is not left empty;
               buffer and data_block are not checked here - confirm */
            bytes_written = ipcio_write (pwcm->data_block, buffer, 1);
            return DADA_ERROR_HARD;
          }

          if ((data_transfer_error_state == 1) && total_bytes_written) {
            multilog (pwcm->log, LOG_INFO, "PWC buffer function "
                      "returned 0 bytes, trying to continue\n");
          }
        }
      }

      if (!buffer) {
        multilog (pwcm->log, LOG_ERR, "buffer function error\n");
        return DADA_ERROR_HARD;
      }

      /* header not yet released although recording: check it again */
      if ((!pwcm->header_valid) && (pwcm->pwc->state == dada_pwc_recording)) {

        /* same rule as elsewhere in this file: without a validity
           callback the header is taken as valid (the callback used to
           be called here without a NULL check) */
        if (pwcm->header_valid_function)
          pwcm->header_valid = pwcm->header_valid_function(pwcm);
        else
          pwcm->header_valid = 1;

        if (pwcm->header_valid) {
#ifdef _DEBUG
          multilog (pwcm->log, LOG_INFO,"transfer_data: marking header filled\n");
#endif
          if (ipcbuf_mark_filled (pwcm->header_block,pwcm->header_size) < 0) {
            multilog (pwcm->log, LOG_ERR, "Could not mark filled header\n");
            return DADA_ERROR_HARD;
          }
        }
      }

      got_buffer = 1;

      /* write no further than the transit byte */
      buf_bytes = buffer_size;
      if (transit_byte && buf_bytes > bytes_to_write)
        buf_bytes = bytes_to_write;

      if (pwcm->data_block) {

#ifdef _DEBUG
        fprintf (stderr, "transfer_data: write to data block buffer=%p bytes=%"
                 PRIu64"\n", buffer, buf_bytes);
#endif

        /* with UTC_START unset the header cannot be released, so a full
           data block is treated as fatal rather than waited on */
        if ((!utc_start_set) && (ipcio_space_left (pwcm->data_block) < buf_bytes)) {
          multilog (pwcm->log, LOG_ERR, "Data block full and UTC_START not "
                    "set.\n");
          return DADA_ERROR_FATAL;
        }

        bytes_written = ipcio_write (pwcm->data_block, buffer, buf_bytes);

#ifdef _DEBUG
        fprintf (stderr, "transfer_data: return from write\n");
#endif

        if (bytes_written < 0 || (uint64_t) bytes_written < buf_bytes) {
          multilog (pwcm->log, LOG_ERR, "Cannot write %"PRIu64
                    " bytes to Data Block\n", buf_bytes);
          return DADA_ERROR_FATAL;
        }
      }

      total_bytes_written += buf_bytes;

      if (bytes_to_write)
        bytes_to_write -= buf_bytes;

      if (pwcm->verbose)
        multilog (pwcm->log, LOG_INFO, "Written %"PRIu64" bytes\n",
                  total_bytes_written);
    }

    if (transit_byte && pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "transit=%"PRIu64" total=%"PRIu64"\n",
                transit_byte, total_bytes_written);

    if (transit_byte && (transit_byte <= total_bytes_written)) {

#ifdef _DEBUG
      fprintf (stderr, "transfer_data: transit state\n");
#endif

      /* the transition happens now; clear it */
      transit_byte = 0;

      if (pwcm->command.code == dada_pwc_record_stop) {

        multilog (pwcm->log, LOG_INFO, "record stop\n");

        if (pwcm->data_block && ipcio_stop (pwcm->data_block) < 0){
          multilog (pwcm->log, LOG_ERR, "Could not stop data block\n");
          return DADA_ERROR_FATAL;
        }

        dada_pwc_set_state (pwcm->pwc, dada_pwc_clocking, 0);
      }
      else if (pwcm->command.code == dada_pwc_record_start) {

        multilog (pwcm->log, LOG_INFO, "record start\n");

        if (dada_pwc_main_record_start (pwcm) < 0)
          return DADA_ERROR_HARD;

        dada_pwc_set_state (pwcm->pwc, dada_pwc_recording, 0);
      }
      else if (pwcm->command.code == dada_pwc_stop) {

        if (pwcm->xfer_pending_function)
        {
          dada_pwc_main_process_xfer(pwcm, 2, 0);
        }

#ifdef _DEBUG
        multilog (pwcm->log, LOG_INFO, "stopping... entering idle state\n");
#endif
        return 0;
      }
      else if (pwcm->command.code == dada_pwc_set_utc_start) {
        /* only tolerated before any data has been written */
        if (total_bytes_written) {
          multilog (pwcm->log, LOG_ERR, "Error. unexpected set_utc_start\n");
          return DADA_ERROR_HARD;
        }
      } else if (pwcm->command.code == dada_pwc_clock) {
        if (total_bytes_written) {
          multilog (pwcm->log, LOG_ERR, "Error. unexpected clock command\n");
          return DADA_ERROR_HARD;
        }
      } else if (pwcm->command.code == dada_pwc_start) {
        if (total_bytes_written) {
          multilog (pwcm->log, LOG_ERR, "Error. unexpected start command\n");
          return DADA_ERROR_HARD;
        }
      } else {
        multilog (pwcm->log, LOG_ERR, "Error. unpected command: %d\n",
                  pwcm->command.code);
        return DADA_ERROR_HARD;
      }

      /* Write what is left of the buffer after the transit byte.  This is
         only meaningful for a buffer fetched in this pass: with the
         values of an earlier pass the pointer and remainder would
         describe data that has already been written or lies outside the
         buffer. */
      if (got_buffer) {

        buffer += buf_bytes;
        buf_bytes = buffer_size - buf_bytes;

        if (buf_bytes && pwcm->data_block) {

          if (ipcio_space_left(pwcm->data_block) < buf_bytes)
          {
            if (!utc_start_set) {
              multilog (pwcm->log, LOG_ERR, "Data block full and UTC_START not set.\n");
              return DADA_ERROR_FATAL;
            }
            else
              multilog(pwcm->log, LOG_WARNING, "Data block full, waiting for space\n");
          }

          /* keep the result signed: comparing a negative return directly
             with the unsigned byte count would hide the failure */
          bytes_written = ipcio_write (pwcm->data_block, buffer, buf_bytes);
          if (bytes_written < 0 || (uint64_t) bytes_written < buf_bytes) {
            multilog (pwcm->log, LOG_ERR, "Cannot write %"PRIu64
                      " bytes to Data Block\n", buf_bytes);
            return DADA_ERROR_FATAL;
          }

          total_bytes_written += buf_bytes;
        }
      }
    }
  }

  return 0;
}
/*
 * Transfer loop built on block_function and direct data block access.
 *
 * Until quit is requested, each pass:
 *   1. handles a pending command exactly as dada_pwc_main_transfer_data()
 *      does (set_utc_start, record_stop, record_start, stop);
 *   2. unless only a transition is outstanding, opens the next data block
 *      buffer, lets block_function fill it and closes it - directly, or
 *      through dada_pwc_main_process_xfer() when xfer_pending_function is
 *      set;
 *   3. performs the scheduled transition once the transit byte is reached.
 *
 * Returns 0 on a normal stop, DADA_ERROR_HARD or DADA_ERROR_FATAL on error.
 */
int dada_pwc_main_transfer_data_block (dada_pwc_main_t* pwcm)
{
  /* total number of bytes written to the data block */
  uint64_t total_bytes_written = 0;

  /* byte at which the pending state transition takes place (0 = none) */
  uint64_t transit_byte = pwcm->command.byte_count;

  /* bytes still to be written before the pending transition */
  uint64_t bytes_to_write = pwcm->command.byte_count;

  /* bytes accounted for in the current block */
  uint64_t buf_bytes = 0;

  /* bytes reported by block_function */
  int64_t bytes_written = 0;

  /* bytes written since the current xfer began */
  uint64_t bytes_written_this_xfer = 0;

  char* buffer = 0;

  /* never changed in this function; only its initial value keeps the loop
     condition true */
  uint64_t data_transfer_error_state = 0;

  uint64_t block_id = 0;
  uint64_t block_size = ipcbuf_get_bufsz ((ipcbuf_t*) pwcm->data_block);

  char* command_string = 0;

  /* whether the header already carries a real UTC_START */
  uint64_t utc_start_set = 0;

  enum { utc_size = 1024 };
  char utc_buffer[utc_size];

  int64_t first_byte_of_xfer = 1;

  if (pwcm->new_xfer_function) {
    total_bytes_written = pwcm->new_xfer_function(pwcm);
    if (pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "first byte expected on %"PRIu64"\n", total_bytes_written);
  }

  if (ascii_header_get (pwcm->header, "UTC_START", "%s", utc_buffer) < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not read UTC_START from header\n");
    return DADA_ERROR_HARD;
  }

  if (strcmp(utc_buffer,"UNKNOWN") != 0)
    utc_start_set = 1;

#ifdef _DEBUG
  fprintf (stderr, "transfer_data_block: enter main loop\n");
#endif

  while ((!dada_pwc_quit (pwcm->pwc)) && (data_transfer_error_state <= 1)) {

#ifdef _DEBUG
    fprintf (stderr, "transfer_data_block: check for command\n");
#endif

    if (dada_pwc_command_check (pwcm->pwc)) {

#ifdef _DEBUG
      fprintf (stderr, "transfer_data_block: get command\n");
#endif

      pwcm->command = dada_pwc_command_get (pwcm->pwc);

#ifdef _DEBUG
      /* time_t is not int: cast explicitly to match the conversion */
      multilog (pwcm->log, LOG_INFO, "pwcm->command.utc = %ld\n",
                (long) pwcm->command.utc);
      multilog (pwcm->log, LOG_INFO, "pwcm->pwc->utc_start = %ld\n",
                (long) pwcm->pwc->utc_start);
#endif

      if (pwcm->command.code == dada_pwc_set_utc_start) {

#ifdef _DEBUG
        multilog (pwcm->log, LOG_INFO, "pwcm->command.code = dada_pwc_set_utc_start\n");
        multilog (pwcm->log, LOG_INFO, "pwcm->command.utc = %ld\n",
                  (long) pwcm->command.utc);
        multilog (pwcm->log, LOG_INFO, "pwcm->command.byte_count = %"PRIu64"\n", pwcm->command.byte_count);
#endif

        assert(pwcm->command.utc > 0);

        if (utc_start_set) {
          multilog (pwcm->log, LOG_WARNING, "WARNING, UTC_START was already "
                    "set. Ignoring set_utc_start command\n");
        } else {

          strftime (utc_buffer, utc_size, DADA_TIMESTR,
                    (struct tm*) gmtime(&(pwcm->command.utc)));

#ifdef _DEBUG
          fprintf (stderr,"transfer_data_block: set UTC_START in "
                   "header to : %s\n",utc_buffer);
#endif
          multilog (pwcm->log, LOG_INFO,"UTC_START = %s\n",utc_buffer);

          if (ascii_header_set (pwcm->header,"UTC_START","%s",utc_buffer) < 0) {
            multilog (pwcm->log,LOG_ERR,"failed ascii_header_set UTC_START\n");
            return DADA_ERROR_HARD;
          }

#ifdef _DEBUG
          fprintf (stderr,"transfer_data_block: header block filled\n");
#endif

          /* while recording, the header can now be released to readers */
          if (pwcm->pwc->state == dada_pwc_recording) {

            if (pwcm->header_valid_function)
              pwcm->header_valid = pwcm->header_valid_function(pwcm);
            else
              pwcm->header_valid = 1;

            if (pwcm->header_valid) {
#ifdef _DEBUG
              multilog (pwcm->log, LOG_INFO, "transfer_data_block: marking header valid\n");
#endif
              if (ipcbuf_mark_filled (pwcm->header_block,pwcm->header_size) < 0) {
                multilog (pwcm->log, LOG_ERR, "Could not mark filled header\n");
                return DADA_ERROR_HARD;
              }
            }
          }
          utc_start_set = 1;
        }

        /* signal the condition on the command connection;
           NOTE(review): presumably wakes the thread that issued the
           command - confirm against dada_pwc */
        pthread_mutex_lock (&(pwcm->pwc->mutex));
        pthread_cond_signal (&(pwcm->pwc->cond));
        pthread_mutex_unlock (&(pwcm->pwc->mutex));

      } else {

        if (pwcm->command.code == dada_pwc_record_stop)
          command_string = "recording->clocking";
        else if (pwcm->command.code == dada_pwc_record_start)
          command_string = "clocking->recording";
        else if (pwcm->command.code == dada_pwc_stop)
          command_string = "stopping";
        else {
          multilog (pwcm->log, LOG_ERR,
                    "dada_pwc_main_transfer data internal error = "
                    "unexpected command code %d\n", pwcm->command.code);
          return DADA_ERROR_HARD;
        }

        if (pwcm->verbose)
          multilog (pwcm->log, LOG_INFO, "%s byte count = %"PRIu64" bytes\n",
                    command_string, pwcm->command.byte_count);

        if (pwcm->command.byte_count > total_bytes_written) {
          /* the transition lies in the future */
          transit_byte = pwcm->command.byte_count;
          bytes_to_write = pwcm->command.byte_count - total_bytes_written;

          multilog (pwcm->log, LOG_INFO, "%s in %"PRIu64" bytes\n",
                    command_string, bytes_to_write);
        } else {
          /* the requested byte is now or already past: transit at once */
          multilog (pwcm->log, LOG_INFO, "%s immediately\n", command_string);
          transit_byte = total_bytes_written;
          bytes_to_write = 0;

          if (pwcm->command.byte_count &&
              pwcm->command.byte_count < total_bytes_written)
            multilog (pwcm->log, LOG_NOTICE,
                      "requested transit byte=%"PRIu64" passed\n",
                      pwcm->command.byte_count);
        }
      }

      if (pwcm->verbose) {

        if (pwcm->pwc->state == dada_pwc_recording)
          multilog (pwcm->log, LOG_INFO, "recording\n");
        else if (pwcm->pwc->state == dada_pwc_clocking)
          multilog (pwcm->log, LOG_INFO, "clocking\n");

        if (transit_byte) {
          if (pwcm->command.code == dada_pwc_record_stop)
            multilog (pwcm->log, LOG_INFO, "record stop in %"PRIu64" bytes\n",
                      bytes_to_write);
          else if (pwcm->command.code == dada_pwc_record_start)
            multilog (pwcm->log, LOG_INFO, "record start in %"PRIu64" bytes\n",
                      bytes_to_write);
          /* was a second test for record_stop, which could never match */
          else if (pwcm->command.code == dada_pwc_stop)
            multilog (pwcm->log, LOG_INFO, "stop in %"PRIu64" bytes\n",
                      bytes_to_write);
        }
      }
    }

    /* nothing has been written in this pass yet */
    buf_bytes = 0;

    /* fill a block unless only a transition is outstanding */
    if (!transit_byte || bytes_to_write)
    {
      if (pwcm->verbose)
        multilog (pwcm->log, LOG_INFO, "transfer_data_block: ipcio_open_block_write()\n");

      buffer = ipcio_open_block_write (pwcm->data_block, &block_id);
      if (!buffer)
      {
        multilog (pwcm->log, LOG_ERR, "ipcio_open_block_write error %s\n", strerror(errno));
        return DADA_ERROR_FATAL;
      }

      if (pwcm->verbose)
        multilog (pwcm->log, LOG_INFO, "transfer_data_block: opened block id=%"PRIu64"\n", block_id);

      if (pwcm->verbose)
        multilog (pwcm->log, LOG_INFO, "transfer_data_block: block_function(%"PRIu64", %"PRIu64")\n", block_size, block_id);

      bytes_written = pwcm->block_function (pwcm, buffer, block_size, block_id);

      if (bytes_written < 0)
      {
        multilog (pwcm->log, LOG_ERR, "transfer_data_block: block_function failed\n");
        ipcio_close_block_write (pwcm->data_block, 0);
        return DADA_ERROR_FATAL;
      }

      /* bytes_written is known to be non-negative from here on */
      if ((uint64_t) bytes_written > block_size)
      {
        multilog (pwcm->log, LOG_ERR, "transfer_data_block: block_function wrote %"PRIi64
                  " > block_size %"PRIu64"\n", bytes_written, block_size);
        ipcio_close_block_write (pwcm->data_block, block_size);
        return DADA_ERROR_FATAL;
      }

      /* account for no more than the bytes up to the transit byte */
      buf_bytes = (uint64_t) bytes_written;
      if (transit_byte && (uint64_t) bytes_written > bytes_to_write)
        buf_bytes = bytes_to_write;

      if (bytes_written == 0)
      {
        multilog (pwcm->log, LOG_INFO, "transfer_data_block: block_function returned 0 bytes, "
                  "buf_bytes=%"PRIu64" bytes_written_this_xfer=%"PRIu64"\n",
                  buf_bytes, bytes_written_this_xfer);
      }

      total_bytes_written += buf_bytes;

      if (bytes_to_write)
        bytes_to_write -= buf_bytes;

      if (pwcm->verbose)
        multilog (pwcm->log, LOG_INFO, "Written %"PRIu64" bytes\n", total_bytes_written);

      if (!pwcm->xfer_pending_function)
      {
        /* plain case: close the block with the number of bytes used */
        if (pwcm->verbose)
          multilog (pwcm->log, LOG_INFO, "transfer_data_block: ipcio_close_block_write(%"PRIu64")\n", (uint64_t) buf_bytes);

        if (ipcio_close_block_write (pwcm->data_block, (uint64_t) buf_bytes) < 0) {
          multilog (pwcm->log, LOG_ERR, "transfer_data_block: ipcio_close_block_write error\n");
          return DADA_ERROR_FATAL;
        }
      }
      else
      {
        bytes_written_this_xfer += buf_bytes;

        if (transit_byte && (transit_byte == total_bytes_written))
        {
          if (pwcm->verbose)
            multilog (pwcm->log, LOG_INFO, "transfer_data_block: transit_byte == total_bytes_written\n");

          /* block_function is called once more with a null buffer and
             zero size; its result is ignored.
             NOTE(review): presumably tells the callback that the
             observation is ending - confirm against implementations */
          void * ending_buffer = 0;
          uint64_t ending_block_size = 0;
          pwcm->block_function (pwcm, ending_buffer, ending_block_size, block_id);
        }

        /* process_xfer closes or updates the block depending on the
           state reported by xfer_pending_function */
        first_byte_of_xfer = dada_pwc_main_process_xfer (pwcm, buf_bytes, bytes_written_this_xfer);

        if (first_byte_of_xfer == 0)
        {
          if (pwcm->verbose)
            multilog (pwcm->log, LOG_INFO, "transfer_data_block: process_xfer reports xfer continuing\n");
        }
        else if (first_byte_of_xfer < 0)
        {
          multilog (pwcm->log, LOG_ERR, "dada_pwc_main_process_xfer failed\n");
          return DADA_ERROR_HARD;
        }
        else
        {
          if (pwcm->verbose)
            multilog (pwcm->log, LOG_INFO, "transfer_data_block: new XFER, next byte=%"PRIi64"\n", first_byte_of_xfer);
          total_bytes_written = (uint64_t) first_byte_of_xfer;
          bytes_written_this_xfer = 0;
        }
      }

      /* header not yet released although recording: check it again */
      if ((!pwcm->header_valid) && (pwcm->pwc->state == dada_pwc_recording)) {

        /* same rule as elsewhere in this file: without a validity
           callback the header is taken as valid (the callback used to
           be called here without a NULL check) */
        if (pwcm->header_valid_function)
          pwcm->header_valid = pwcm->header_valid_function(pwcm);
        else
          pwcm->header_valid = 1;

        if (pwcm->header_valid) {
          multilog (pwcm->log, LOG_INFO,"transfer_data_block: marking header filled\n");
          if (ipcbuf_mark_filled (pwcm->header_block,pwcm->header_size) < 0) {
            multilog (pwcm->log, LOG_ERR, "Could not mark filled header\n");
            return DADA_ERROR_HARD;
          }
        }
      }

    } else {
      if (pwcm->verbose)
        multilog (pwcm->log, LOG_INFO, "transfer_data_block: transit byte was set, block_function ignored\n");
    }

    if (pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "transit=%"PRIu64" total=%"PRIu64"\n",
                transit_byte, total_bytes_written);

    if (transit_byte && (transit_byte <= total_bytes_written)) {

#ifdef _DEBUG
      fprintf (stderr, "transfer_data_block: transit state\n");
#endif

      /* the transition happens now; clear it */
      transit_byte = 0;

      if (pwcm->command.code == dada_pwc_record_stop) {

        multilog (pwcm->log, LOG_INFO, "record stop\n");

        if (pwcm->data_block && ipcio_stop (pwcm->data_block) < 0){
          multilog (pwcm->log, LOG_ERR, "Could not stop data block\n");
          return DADA_ERROR_FATAL;
        }

        dada_pwc_set_state (pwcm->pwc, dada_pwc_clocking, 0);
      }
      else if (pwcm->command.code == dada_pwc_record_start) {

        multilog (pwcm->log, LOG_INFO, "record start\n");

        if (dada_pwc_main_record_start (pwcm) < 0)
          return DADA_ERROR_HARD;

        dada_pwc_set_state (pwcm->pwc, dada_pwc_recording, 0);
      }
      else if (pwcm->command.code == dada_pwc_stop) {

        if (pwcm->xfer_pending_function)
        {
          if (pwcm->verbose)
            multilog (pwcm->log, LOG_INFO, "stopping, but calling process_xfer\n");
          int64_t result = dada_pwc_main_process_xfer (pwcm, buf_bytes, bytes_written_this_xfer);
          multilog (pwcm->log, LOG_INFO, "process_xfer returned %"PRIi64"\n", result);
        }

        multilog (pwcm->log, LOG_INFO, "stopping... entering idle state\n");
        return 0;
      }
      else if (pwcm->command.code == dada_pwc_set_utc_start) {
        /* only tolerated before any data has been written */
        if (total_bytes_written) {
          multilog (pwcm->log, LOG_ERR, "Error. unexpected set_utc_start\n");
          return DADA_ERROR_HARD;
        }
      } else if (pwcm->command.code == dada_pwc_clock) {
        if (total_bytes_written) {
          multilog (pwcm->log, LOG_ERR, "Error. unexpected clock command\n");
          return DADA_ERROR_HARD;
        }
      } else if (pwcm->command.code == dada_pwc_start) {
        if (total_bytes_written) {
          multilog (pwcm->log, LOG_ERR, "Error. unexpected start command\n");
          return DADA_ERROR_HARD;
        }
      } else {
        multilog (pwcm->log, LOG_ERR, "Error. unpected command: %d\n",
                  pwcm->command.code);
        return DADA_ERROR_HARD;
      }
    }
  }

  return 0;
}
/*
 * End the current transfer.
 *
 * Clears header_valid, calls the stop function, closes the data block when
 * one is attached and returns the state to idle unless an error state is
 * active.
 *
 * Returns 0 on success, DADA_ERROR_HARD if the stop function fails and
 * DADA_ERROR_FATAL if the data block cannot be closed.
 */
int dada_pwc_main_stop_transfer (dada_pwc_main_t* pwcm)
{
  int stop_result = 0;

  /* the next transfer has to validate its header afresh */
  pwcm->header_valid = 0;

  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "stop_transfer: calling stop_function()\n");

  stop_result = pwcm->stop_function (pwcm);
  if (stop_result < 0)
  {
    multilog (pwcm->log, LOG_ERR, "dada_pwc_main_stop_transfer"
              " stop function returned error code\n");
    return DADA_ERROR_HARD;
  }

  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "stop_transfer: closing data_block()\n");

  if (pwcm->data_block)
  {
    if (ipcio_close (pwcm->data_block) < 0)
    {
      multilog (pwcm->log, LOG_ERR, "Could not close Data Block\n");
      return DADA_ERROR_FATAL;
    }
  }

  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "stop_transfer: setting state\n");

  /* an error state stays in place; anything else goes back to idle */
  if (!(pwcm->pwc->state == dada_pwc_soft_error ||
        pwcm->pwc->state == dada_pwc_hard_error ||
        pwcm->pwc->state == dada_pwc_fatal_error))
  {
    dada_pwc_set_state (pwcm->pwc, dada_pwc_idle, 0);
  }

  return 0;
}
/*
 * Translate a stage error code into a PWC error state and apply it.
 *
 * The state is never lowered in severity: a soft error leaves a hard or
 * fatal state alone, a hard error leaves a fatal state alone.  An unknown
 * code is logged and treated as fatal.
 */
void dada_pwc_main_process_error (dada_pwc_main_t* pwcm, int rval)
{
  /* default: stay in the current state */
  int target_state = pwcm->pwc->state;

  if (rval == DADA_ERROR_SOFT)
  {
    if ( !(pwcm->pwc->state == dada_pwc_hard_error ||
           pwcm->pwc->state == dada_pwc_fatal_error) )
      target_state = dada_pwc_soft_error;
  }
  else if (rval == DADA_ERROR_HARD)
  {
    if (pwcm->pwc->state != dada_pwc_fatal_error)
      target_state = dada_pwc_hard_error;
  }
  else if (rval == DADA_ERROR_FATAL)
  {
    target_state = dada_pwc_fatal_error;
  }
  else
  {
    multilog (pwcm->log, LOG_ERR, "Unknown error state: %d\n",rval);
    target_state = dada_pwc_fatal_error;
  }

  multilog(pwcm->log, LOG_WARNING, "PWC entering error state: %s\n",
           dada_pwc_state_to_string(target_state));

  if (dada_pwc_set_state (pwcm->pwc, target_state, 0) < 0)
  {
    multilog(pwcm->log, LOG_ERR, "Failed to change state from %s to %s\n",
             dada_pwc_state_to_string(pwcm->pwc->state),
             dada_pwc_state_to_string(target_state));
  }
}
/*
 * Close or update the current data block buffer according to the state
 * reported by xfer_pending_function and, when the xfer has ended, start
 * the next one.
 *
 *   DADA_XFER_NORMAL: close the block with buf_bytes and return 0.
 *   DADA_OBS_ENDING:  return 0 without touching the block.
 *   anything else:    update the block with buf_bytes, close and reopen
 *                     the data block, unlock and relock the header block,
 *                     copy the header into the next header buffer, call
 *                     new_xfer_function and mark the header filled.  If
 *                     the observation is then ending, one byte is written
 *                     to the new data block.
 *
 * buf_bytes       bytes written to the currently open block
 * bytes_this_xfer only used in the verbose log message
 *
 * Returns 0 when the xfer continues or the observation is ending, the
 * value of new_xfer_function when a new xfer has begun, DADA_ERROR_FATAL
 * if the block could not be closed or updated, and -2 on any other failure.
 *
 * NOTE(review): new_xfer_function is called without a NULL check; it is
 * assumed to be set whenever xfer_pending_function is - confirm.
 */
int64_t dada_pwc_main_process_xfer (dada_pwc_main_t* pwcm, uint64_t buf_bytes, uint64_t bytes_this_xfer)
{
  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer(%"PRIu64", %"PRIu64")\n", buf_bytes, bytes_this_xfer);

  int xfer_status = 0;
  int64_t first_byte = 0;

  /* value returned through the cleanup label; -2 until everything worked */
  int64_t result = -2;

  /* private copy of the header, carried across the reopen */
  char * last_header = 0;

  xfer_status = pwcm->xfer_pending_function(pwcm);
  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer: xfer_status=%d\n", xfer_status);

  if (xfer_status == DADA_XFER_NORMAL)
  {
    if (pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "process_xfer: status=XFER_NORMAL, ipcio_close_block_write(%"PRIu64")\n", buf_bytes);

    int rval = ipcio_close_block_write (pwcm->data_block, buf_bytes);
    if (rval < 0) {
      multilog (pwcm->log, LOG_ERR, "process_xfer: ipcio_close_block_write error [%d]\n", rval);
      return DADA_ERROR_FATAL;
    }
    return 0;
  }

  if (xfer_status == DADA_OBS_ENDING)
  {
    if (pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "process_xfer: status=OBS_ENDING\n");
    return 0;
  }

  /* any other status: the current xfer is over */
  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer: status=%d, ipcio_update_block_write(%"PRIu64")\n", xfer_status, buf_bytes);

  if (ipcio_update_block_write (pwcm->data_block, (uint64_t) buf_bytes) < 0)
  {
    multilog (pwcm->log, LOG_ERR, "process_xfer: ipcio_update_block_write error\n");
    return DADA_ERROR_FATAL;
  }

  last_header = (char *) malloc (sizeof(char) * pwcm->header_size);
  if (!last_header)
  {
    multilog (pwcm->log, LOG_ERR, "process_xfer: malloc failed for last_header\n");
    return -2;
  }
  memcpy(last_header, pwcm->header, pwcm->header_size);

  /* from here on every exit goes through cleanup so that last_header is
     released; it used to be leaked on every path */

  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer: closing header/data block\n");

  if (ipcio_close (pwcm->data_block) < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not unlock Data Block write\n");
    goto cleanup;
  }

  if (ipcbuf_unlock_write (pwcm->header_block) < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not unlock Header Block write\n");
    goto cleanup;
  }

  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer: opening header/data block\n");

  if (ipcbuf_lock_write (pwcm->header_block) < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not lock Header Block for writing\n");
    goto cleanup;
  }

  if (ipcio_open (pwcm->data_block, 'W') < 0) {
    multilog (pwcm->log, LOG_ERR, "Could not lock Data Block for writing\n");
    goto cleanup;
  }

  pwcm->header = ipcbuf_get_next_write (pwcm->header_block);
  if (!pwcm->header) {
    /* previously an unchecked NULL would have been passed to memcpy */
    multilog (pwcm->log, LOG_ERR, "Could not get next header block\n");
    goto cleanup;
  }
  memcpy(pwcm->header, last_header, pwcm->header_size);

  first_byte = (int64_t) pwcm->new_xfer_function (pwcm);
  if (pwcm->verbose)
    multilog(pwcm->log, LOG_INFO, "process_xfer: next XFER begins on byte=%"PRIi64"\n", first_byte);

  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer: marking header block filled\n");

  if (ipcbuf_mark_filled (pwcm->header_block, pwcm->header_size) < 0) {
    multilog (pwcm->log, LOG_ERR, "process_xfer: Could not mark header block filled\n");
    goto cleanup;
  }

  /* the status may have changed while the blocks were reopened */
  xfer_status = pwcm->xfer_pending_function(pwcm);
  if (pwcm->verbose)
    multilog (pwcm->log, LOG_INFO, "process_xfer: xfer_status=%d\n", xfer_status);

  if (xfer_status == DADA_OBS_ENDING)
  {
    if (pwcm->verbose)
      multilog (pwcm->log, LOG_INFO, "process_xfer: OBS_ENDING, write 1 byte to empty DB\n");

    char zerod_char = '0';
    if (ipcio_write(pwcm->data_block, &zerod_char, 1) != 1)
    {
      multilog(pwcm->log, LOG_ERR, "process_xfer: failed to write 1 byte to empty datablock at end of OBS\n");
      goto cleanup;
    }
  }

  result = first_byte;

cleanup:
  free (last_header);
  return result;
}