psrdada-sys 0.2.0

Bindgen wrappers for psrdada
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
#include "dada_pwc_nexus.h"
#include "ascii_header.h"
#include "dada_def.h"

#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/stat.h>


/* #define _DEBUG 1 */

/*! Reset a dada_pwc_node_t to its default, empty values: the node is first
    handed to node_init through a node_t cast, then it is left with no
    header attached and in the pending state */
void dada_pwc_node_init (dada_pwc_node_t* node)
{
  node_init ((node_t*) node);

  node->state = dada_pwc_pending;
  node->header_size = 0;
  node->header = 0;
}

/*! Return pointer to a newly allocated and initialized dada_node_t struct */
node_t* dada_pwc_node_create ()
{
  dada_pwc_node_t* node = (dada_pwc_node_t*) malloc (sizeof(dada_pwc_node_t));
  assert (node != 0);
  dada_pwc_node_init (node);
  return (node_t*) node;
}

int dada_pwc_nexus_update_state (dada_pwc_nexus_t* nexus);

/*! Initialize a new connection with a node.

    Receives the node's welcome message, sends the "state" request, and
    parses the first whitespace-delimited token of the reply into the node
    state before recomputing the overall nexus state.

    \param nexus the nexus (cast to dada_pwc_nexus_t) that owns the node
    \param node  the node (cast to dada_pwc_node_t) being initialized
    \return the result of dada_pwc_nexus_update_state on success; -1 if
            either receive or the send fails, or if the reply contains no
            token.  On every -1 path the node state is set to
            dada_pwc_undefined. */
int dada_pwc_nexus_node_init (nexus_t* nexus, node_t* node)
{
  dada_pwc_nexus_t* dada_pwc_nexus = (dada_pwc_nexus_t*) nexus;
  dada_pwc_node_t* dada_pwc_node = (dada_pwc_node_t*) node;

  /* per-call, zero-filled buffer: a function-static buffer would be shared
     by every caller of this function and was never released */
  char buffer [1024] = "";
  unsigned buffer_size = sizeof buffer;

  char* key = 0;

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_node_init: receiving the welcome message\n");
#endif

  if (node_recv (node, buffer, buffer_size) < 0) {
    dada_pwc_node -> state = dada_pwc_undefined;
    return -1;
  }

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_node_init: requesting state\n");
#endif

  if (node_send (node, "state") < 0) {
    dada_pwc_node -> state = dada_pwc_undefined;
    return -1;
  }

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_node_init: receiving state\n");
#endif

  if (node_recv (node, buffer, buffer_size) < 0) {
    dada_pwc_node -> state = dada_pwc_undefined;
    return -1;
  }

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_node_init: received '%s'\n", buffer);
#endif

  key = strtok (buffer, " \t\n\r");

  /* strtok returns NULL when the reply is empty or only whitespace; treat
     that like any other failed exchange instead of passing NULL on */
  if (!key) {
    dada_pwc_node -> state = dada_pwc_undefined;
    return -1;
  }

  dada_pwc_node->state = dada_pwc_string_to_state (key);

  return dada_pwc_nexus_update_state (dada_pwc_nexus);
}

/*! Parse DADA PWC nexus configuration parameters from the config buffer.

    Keys read from config (PREFIX is n->node_prefix):
      HDR_SIZE        header size passed to dada_pwc_set_header_size
      USE_BASEPORT    stored in the mirror nexus, 0 when absent
      PREFIXC_PORT    control port of the local PWC
      LOGFILE_DIR     directory for log files; when absent nothing is logged
      PREFIX_LOGPORT  node port of the mirror nexus
      PREFIXC_LOGPORT multilog port of the mirror nexus
    The two log ports are only read when LOGFILE_DIR is present.  Finally
    the base class parser nexus_parse is called.

    \param n      the nexus, which is actually a dada_pwc_nexus_t
    \param config the ASCII configuration buffer
    \return 0 on success; -1 if a port has no default, if LOGFILE_DIR is not
            a directory with owner read/write/execute permission, if the
            directory name cannot be allocated, or if nexus_parse fails */
int dada_pwc_nexus_parse (nexus_t* n, const char* config)
{
  /* the nexus is actually a DADA PWC nexus */
  dada_pwc_nexus_t* nexus = (dada_pwc_nexus_t*) n;

  /* the size of the header parsed from the config */
  unsigned hdr_size = 0;

  /* configuration key built from the node prefix; snprintf bounds the
     write whatever the length of the prefix */
  char node_name [64];

  /* zero-filled so that it is a valid (empty) string even if the key is
     present without a value */
  char logfile_dir [256] = "";

  struct stat st;

  /* get the header size */
  if (ascii_header_get (config, "HDR_SIZE", "%u", &hdr_size) < 0)
    multilog_fprintf (stderr, LOG_WARNING, "dada_pwc_nexus_parse: using default HDR_SIZE\n");
  else
    dada_pwc_set_header_size (nexus->pwc, hdr_size);

  if (ascii_header_get (config, "USE_BASEPORT", "%d", &(n->mirror->use_baseport)) < 0)
    n->mirror->use_baseport = 0;

  /* Get the PWCC control port from the config file */
  snprintf (node_name, sizeof node_name, "%sC_PORT", n->node_prefix);
  if (ascii_header_get (config, node_name, "%d", &(nexus->pwc->port)) < 0) {
    multilog_fprintf (stderr, LOG_WARNING, "nexus_parse: %s not specified.\n", node_name);
    nexus->pwc->port = DADA_DEFAULT_PWCC_PORT;
    if (nexus->pwc->port)
      multilog_fprintf (stderr, LOG_WARNING, "using default=%d\n", nexus->pwc->port);
    else {
      multilog_fprintf (stderr, LOG_ERR, "no default available\n");
      return -1;
    }
  }

  n->mirror->logfile_dir = NULL;

  /* If this is the mirror nexus, then setup the log file.  The field width
     keeps the value within logfile_dir (255 characters plus terminator). */
  if (ascii_header_get (config, "LOGFILE_DIR", "%255s", logfile_dir) < 0) {

    multilog_fprintf (stderr, LOG_WARNING, "nexus_parse: LOGFILE_DIR not specified, not logging\n");

  } else {

    /* Get the PWC multilog port from the config file */
    snprintf (node_name, sizeof node_name, "%s_LOGPORT", n->node_prefix);
    if (ascii_header_get (config, node_name, "%d", &(n->mirror->node_port)) < 0) {
      multilog_fprintf (stderr, LOG_WARNING, "nexus_parse: %s not specified.", node_name);
      n->mirror->node_port = DADA_DEFAULT_PWC_LOG;
      if (n->mirror->node_port)
        multilog_fprintf (stderr, LOG_WARNING, " using default=%d\n", n->mirror->node_port);
      else {
        multilog_fprintf (stderr, LOG_ERR, " no default available\n");
        return -1;
      }
    }

    /* Get the PWCC multilog port from the config file */
    snprintf (node_name, sizeof node_name, "%sC_LOGPORT", n->node_prefix);
    if (ascii_header_get (config, node_name, "%d", &(n->mirror->multilog_port)) < 0) {
      multilog_fprintf (stderr, LOG_WARNING, "nexus_parse: %s not specified.", node_name);
      n->mirror->multilog_port = DADA_DEFAULT_PWCC_LOGPORT;
      if (n->mirror->multilog_port)
        multilog_fprintf (stderr, LOG_WARNING, " using default=%d\n", n->mirror->multilog_port);
      else {
        multilog_fprintf (stderr, LOG_ERR, " no default available\n");
        return -1;
      }
    }

    /* Validate the directory before storing it, so that the failure paths
       leave the mirror's logfile_dir NULL and own no memory.  A failed
       stat leaves st unset, so it must be tested before st_mode is read. */
    if (stat (logfile_dir, &st) < 0 || !(S_ISDIR(st.st_mode))) {
      multilog_fprintf (stderr, LOG_WARNING, "nexus_parse: logfile directory %s did not exist\n",
                        logfile_dir);
      return -1;
    }

    /* check the owner read, write and execute permission bits */
    if (!((st.st_mode & S_IWUSR) && (st.st_mode & S_IXUSR) &&
                                    (st.st_mode & S_IRUSR))) {
      /* NOTE(review): this path used to clear n->logfile_dir rather than
         the mirror's field, which looked like a typo; the mirror's field
         is already NULL here, so nothing further is cleared. */
      multilog_fprintf (stderr, LOG_ERR, "nexus_parse: logfile directory %s was not writeable\n",
                        logfile_dir);
      return -1;
    }

    /* strdup allocates room for the terminating NUL as well */
    n->mirror->logfile_dir = strdup (logfile_dir);
    if (!n->mirror->logfile_dir) {
      multilog_fprintf (stderr, LOG_ERR, "nexus_parse: could not allocate logfile directory name\n");
      return -1;
    }
  }

  /* call the base class configuration parser */
  if (nexus_parse (n, config) < 0)
    return -1;

  return 0;
}

/*! Recompute the state of the nexus PWC from the states of its nodes.

    With the nexus mutex held, the nodes are scanned in order:
    - nodes in a soft, hard or fatal error state are only counted;
    - the first node that is not in an error state provides the candidate
      state;
    - any later non-error node in a different state makes the result
      dada_pwc_undefined and ends the scan.
    If every node is in an error state, the result is the strongest error
    present (fatal over hard over soft).  The result is stored in
    nexus->pwc->state after the mutex is released.

    \return always 0 */
int dada_pwc_nexus_update_state (dada_pwc_nexus_t* nexus)
{
  nexus_t* base = (nexus_t*) nexus;
  dada_pwc_node_t* current = 0;
  dada_pwc_state_t overall = 0;

  unsigned idx = 0;
  unsigned total = 0;
  unsigned soft = 0;
  unsigned hard = 0;
  unsigned fatal = 0;

  /* set once a node that is not in an error state has been seen */
  int have_reference = 0;

  pthread_mutex_lock (&(base->mutex));

  total = base->nnode;

  for (idx = 0; idx < total; idx++)
  {
    current = base->nodes[idx];

    /* error nodes are tallied and otherwise ignored */
    switch (current->state)
    {
    case dada_pwc_soft_error:
      soft++;
      continue;
    case dada_pwc_hard_error:
      hard++;
      continue;
    case dada_pwc_fatal_error:
      fatal++;
      continue;
    default:
      break;
    }

    if (!have_reference)
    {
      /* the first non error node defines the candidate state */
      overall = current->state;
      have_reference = 1;
    }
    else if (overall != current->state)
    {
      /* the non error nodes disagree */
      overall = dada_pwc_undefined;
      break;
    }
  }

  /* no node is free of errors: report the strongest error */
  if ((soft + hard + fatal) == total)
  {
    if (soft)
      overall = dada_pwc_soft_error;
    if (hard)
      overall = dada_pwc_hard_error;
    if (fatal)
      overall = dada_pwc_fatal_error;
  }

  pthread_mutex_unlock (&(base->mutex));

  nexus->pwc->state = overall;

  return 0;
}

/*! Handle a message received from a node by the monitor.

    If the message contains "STATE = <name>", the named state is stored in
    node inode and the overall nexus state is recomputed.  Messages without
    a state change, or with nothing after "STATE = ", are ignored.

    \param me    the dada_pwc_nexus_t registered as the monitor context
    \param inode index of the node that produced the message
    \param msg   the message text
    \return always 0 */
int dada_pwc_nexus_handle_message (void* me, unsigned inode, const char* msg)
{
  char state_string [16] = "";
  dada_pwc_nexus_t* nexus = (dada_pwc_nexus_t*) me;
  dada_pwc_node_t* node = 0;
  const char* state_change = 0;

  if (!msg)
    return 0;

  state_change = strstr (msg, "STATE = ");
  if (!state_change)
    return 0;

  /* the field width limits the token to the 15 characters (plus NUL) that
     fit in state_string; a failed conversion leaves nothing to apply */
  if (sscanf (state_change, "STATE = %15s", state_string) != 1)
    return 0;

  node = nexus->nexus.nodes[inode];
  node->state = dada_pwc_string_to_state (state_string);
  dada_pwc_nexus_update_state (nexus);

  return 0;
}

/*! Send a unique header to each of the nodes */
int dada_pwc_nexus_cmd_config (void* context, FILE* fptr, char* args);

/*! Report on the state of each of the nodes.

    Writes one "overall: <state>" line for the nexus PWC followed by one
    "PWC_<index>: <state>" line per node.  The node list is read with the
    nexus mutex held.

    \param context the dada_pwc_nexus_t registered with the command parser
    \param fptr    stream on which the report is written
    \param args    unused
    \return always 0 */
int dada_pwc_nexus_cmd_state (void* context, FILE* fptr, char* args)
{
  dada_pwc_nexus_t* nexus = (dada_pwc_nexus_t*) context;
  nexus_t* base = (nexus_t*) context;
  dada_pwc_node_t* node = 0;

  unsigned inode = 0;
  unsigned nnode = 0;

  (void) args;

  fprintf (fptr, "overall: %s\n",
           dada_pwc_state_to_string( nexus->pwc->state ));

  pthread_mutex_lock (&(base->mutex));

  nnode = base->nnode;

  for (inode = 0; inode < nnode; inode++) {
    node = base->nodes[inode];
    /* inode is unsigned, so it is printed with %u */
    fprintf (fptr, "PWC_%u: %s\n", inode,
             dada_pwc_state_to_string( node->state ));
  }

  pthread_mutex_unlock (&(base->mutex));

  return 0;
}

int dada_pwc_nexus_cmd_duration (void* context, FILE* fptr, char* args)
{
  unsigned buffer_size = 128;
  static char* buffer = 0;

  dada_pwc_nexus_t* nexus = (dada_pwc_nexus_t*) context;

  if (dada_pwc_cmd_duration (nexus->pwc, fptr, args) < 0)
    return -1;

  if (!buffer)
    buffer = malloc (buffer_size);
  assert (buffer != 0);

  snprintf (buffer, buffer_size, "duration %s", args);
  return nexus_send ((nexus_t*)nexus, buffer);
}

/*! Initialize a dada_pwc_nexus_t.

    In order: the base nexus is initialized and given the "PWC" node prefix
    and the dada_pwc_nexus node/parse methods; a mirror nexus and a monitor
    of that mirror are created; the local PWC is created; and its "header",
    "state" and "duration" commands are replaced by the nexus "config",
    "state" and "duration" commands. */
void dada_pwc_nexus_init (dada_pwc_nexus_t* nexus)
{
  /* each entry removes one command of the PWC parser and registers the
     nexus command that takes its place */
  static const struct {
    const char* replaced;
    const char* name;
    const char* help;
    int (*handler) (void*, FILE*, char*);
  } overrides [] = {
    /* replace the header command with the config command */
    { "header", "config", "configure all nodes",
      dada_pwc_nexus_cmd_config },
    /* replace the state command */
    { "state", "state", "get the current state of all nodes",
      dada_pwc_nexus_cmd_state },
    /* replace the duration command */
    { "duration", "duration", "set the duration of next recording",
      dada_pwc_nexus_cmd_duration }
  };

  unsigned icmd = 0;
  nexus_t* base = (nexus_t*) nexus;
  nexus_t* mirror = 0;

  nexus_init (base);

  /* free accepts a null pointer, so no guard is required */
  free (base->node_prefix);
  base->node_prefix = strdup ("PWC");
  assert (base->node_prefix != 0);

  /* over-ride the nexus base class methods with dada_pwc_nexus methods */
  base->node_port = DADA_DEFAULT_PWC_PORT;
  base->node_create = &dada_pwc_node_create;
  base->node_init   = &dada_pwc_nexus_node_init;
  base->nexus_parse = &dada_pwc_nexus_parse;

  /* set up a mirror nexus for monitoring the PWC multilog messages */
  mirror = nexus_create ();
  base->mirror = mirror;
  mirror->node_port = DADA_DEFAULT_PWC_LOG;

  /* set up the monitor of the mirror nexus */
  nexus->monitor = monitor_create ();
  nexus->monitor->nexus = mirror;
  nexus->monitor->handle_message = &dada_pwc_nexus_handle_message;
  nexus->monitor->context = nexus;

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_nexus_init dada_pwc_create\n");
#endif

  nexus->pwc = dada_pwc_create ();

  /* time_t values are converted to strings with gmtime */
  nexus->convert_to_tm = gmtime;

  nexus->header_template = 0;

#ifdef _DEBUG
  fprintf (stderr, "dada_pwc_nexus_init command_parse_add\n");
#endif

  for (icmd = 0; icmd < sizeof overrides / sizeof overrides[0]; icmd++)
  {
    command_parse_remove (nexus->pwc->parser, overrides[icmd].replaced);
    command_parse_add (nexus->pwc->parser, overrides[icmd].handler, nexus,
                       overrides[icmd].name, overrides[icmd].help, NULL);
  }
}

/*! Allocate a dada_pwc_nexus_t on the heap and initialize it with
    dada_pwc_nexus_init.  A failed allocation trips the assert. */
dada_pwc_nexus_t* dada_pwc_nexus_create ()
{
  dada_pwc_nexus_t* created = malloc (sizeof *created);

  assert (created != 0);
  dada_pwc_nexus_init (created);

  return created;
}

/*! Destroy a DADA nexus by passing it, as a base nexus, to nexus_destroy
    and returning that function's result */
int dada_pwc_nexus_destroy (dada_pwc_nexus_t* nexus)
{
  nexus_t* base = (nexus_t*) nexus;

  return nexus_destroy (base);
}

/*! Configure a DADA nexus from the named file by passing it, as a base
    nexus, to nexus_configure and returning that function's result */
int dada_pwc_nexus_configure (dada_pwc_nexus_t* nexus, const char* filename)
{
  nexus_t* base = (nexus_t*) nexus;

  return nexus_configure (base, filename);
}

/*! Format a time-stamped command and send it to every node.

    \param nexus  the nexus whose convert_to_tm is used for the conversion
    \param format strftime format: the command name followed by DADA_TIMESTR
    \param utc    the time to be written into the command
    \return the result of nexus_send; -1 if the time cannot be converted or
            the formatted command does not fit in the buffer */
static int dada_pwc_nexus_send_utc (dada_pwc_nexus_t* nexus,
                                    const char* format, time_t utc)
{
  char buffer [128];
  struct tm* date = nexus->convert_to_tm (&utc);

  /* the conversion can fail (gmtime returns a null pointer on error) and
     strftime must not be handed a null pointer */
  if (!date)
    return -1;

  /* strftime returns 0 when the result does not fit, in which case the
     buffer contents are not usable */
  if (strftime (buffer, sizeof buffer, format, date) == 0)
    return -1;

  return nexus_send ((nexus_t*)nexus, buffer);
}

/*! Apply a command to the local PWC state and forward it to every node.

    For clock, record start/stop, start and stop the state of the local PWC
    is changed first and the command is only sent if that succeeds.  start
    and stop are sent without a time when command.utc is zero;
    set_utc_start requires a non-zero command.utc.

    \param nexus   the DADA PWC nexus
    \param command the command to be issued
    \return the result of nexus_send; -1 if the state change is refused, the
            time stamp cannot be formatted, or the command code is not one
            of those handled here */
int dada_pwc_nexus_send (dada_pwc_nexus_t* nexus, dada_pwc_command_t command)
{
  switch (command.code) {

  case dada_pwc_clock:

    if (dada_pwc_set_state (nexus->pwc, dada_pwc_clocking, time(0)) < 0)
      return -1;

    return nexus_send ((nexus_t*)nexus, "clock");

  case dada_pwc_record_start:

    if (dada_pwc_set_state (nexus->pwc, dada_pwc_recording, time(0)) < 0)
      return -1;

    return dada_pwc_nexus_send_utc (nexus, "rec_start " DADA_TIMESTR,
                                    command.utc);

  case dada_pwc_record_stop:

    if (dada_pwc_set_state (nexus->pwc, dada_pwc_clocking, time(0)) < 0)
      return -1;

    return dada_pwc_nexus_send_utc (nexus, "rec_stop " DADA_TIMESTR,
                                    command.utc);

  case dada_pwc_start:

    if (dada_pwc_set_state (nexus->pwc, dada_pwc_recording, time(0)) < 0)
      return -1;

    if (!command.utc)
      return nexus_send ((nexus_t*)nexus, "start");

    return dada_pwc_nexus_send_utc (nexus, "start " DADA_TIMESTR,
                                    command.utc);

  case dada_pwc_stop:

    if (dada_pwc_set_state (nexus->pwc, dada_pwc_idle, time(0)) < 0)
      return -1;

    if (!command.utc)
      return nexus_send ((nexus_t*)nexus, "stop");

    return dada_pwc_nexus_send_utc (nexus, "stop " DADA_TIMESTR,
                                    command.utc);

  case dada_pwc_set_utc_start:
    /* Special case for PWC's who must be told their UTC_START */
    if (!command.utc)
      return -1;

    return dada_pwc_nexus_send_utc (nexus, "set_utc_start " DADA_TIMESTR,
                                    command.utc);

  case dada_pwc_reset:

    return nexus_send((nexus_t*)nexus, "reset");

  default:
    /* any other command code is not forwarded to the nodes */
    break;

  }

  return -1;
}

/*! Run the nexus command loop.

    Starts the PWC server and launches the monitor, then repeatedly fetches
    the next PWC command until the PWC reports quit.  The exit command sets
    the quit flag; every other command is passed to dada_pwc_nexus_send and
    a failure there is logged without ending the loop.

    \return 0 once the loop ends; -1 if the PWC server could not be started */
int dada_pwc_nexus_serve (dada_pwc_nexus_t* nexus)
{
  /* the DADA PWC command currently being processed */
  dada_pwc_command_t next = DADA_PWC_COMMAND_INIT;

  if (dada_pwc_serve (nexus->pwc) < 0)
  {
    multilog_fprintf (stderr, LOG_ERR, "dada_pwc_nexus_serve: could not start PWC server\n");
    return -1;
  }

  monitor_launch (nexus->monitor);

  while (!dada_pwc_quit (nexus->pwc))
  {
    next = dada_pwc_command_get (nexus->pwc);

    if (next.code == dada_pwc_exit)
    {
      nexus->pwc->quit = 1;
      continue;
    }

    if (dada_pwc_nexus_send (nexus, next) < 0)
    {
      multilog_fprintf (stderr, LOG_ERR, "error issuing command = %d\n", next.code);
    }
  }

  return 0;
}