#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "config.h"
#include "ipcio.h"
#ifdef HAVE_CUDA
#include <cuda_runtime.h>
#endif
/*
 * Reset all ipcio-level bookkeeping in *ipc to its idle state.
 *
 * Only the fields owned by this layer are touched.  The buf_ptrs pointer is
 * overwritten with NULL here, not freed.
 */
void ipcio_init (ipcio_t* ipc)
{
  /* no open mode and no buffer in progress */
  ipc->rdwrt = 0;
  ipc->bytes = 0;
  ipc->curbuf = 0;
  ipc->curbufsz = 0;
  ipc->marked_filled = 0;

  /* no start-of-data request outstanding */
  ipc->sod_pending = 0;
  ipc->sod_buf = 0;
  ipc->sod_byte = 0;

  /* block-write tracking */
  ipc->bufs_opened = 0;
  ipc->bufs_opened_max = 0;
  ipc->buf_ptrs = NULL;
}
/*
 * (Re)allocate the array that tracks pointers to blocks opened for writing.
 *
 * Any previous array is released first.  The new array has one zeroed slot per
 * ring buffer (as reported by ipcbuf_get_nbufs) and bufs_opened_max is set to
 * nbufs - 1.
 *
 * The function returns void, so failure is signalled through state: if the
 * ring reports zero buffers or the allocation fails, buf_ptrs is left NULL
 * and bufs_opened_max is left at 0, which makes ipcio_open_block_write refuse
 * to open a block instead of dereferencing a NULL array.
 */
void ipcio_alloc (ipcio_t* ipc)
{
  uint64_t nbufs = ipcbuf_get_nbufs ((ipcbuf_t *) ipc);

#ifdef _DEBUG
  fprintf (stderr, "ipcio_alloc: nbufs=%"PRIu64" buf_ptrs=%p\n", nbufs, (void *) ipc->buf_ptrs);
#endif

  /* free(NULL) is a no-op, so no guard is required */
  free (ipc->buf_ptrs);
  ipc->buf_ptrs = NULL;
  ipc->bufs_opened_max = 0;

  /* avoid the unsigned wrap-around of nbufs - 1 */
  if (nbufs == 0)
    return;

  /* calloc so that every slot starts out as a NULL pointer */
  ipc->buf_ptrs = calloc (nbufs, sizeof(char *));
  if (!ipc->buf_ptrs)
  {
    fprintf (stderr, "ipcio_alloc: failed to allocate %"PRIu64" buffer pointers\n", nbufs);
    return;
  }

  ipc->bufs_opened_max = nbufs - 1;

#ifdef _DEBUG
  fprintf (stderr, "ipc->buf_ptrs=()\n");
#endif
}
/*
 * Release the block-pointer array and reset the block-write counters.
 * Safe to call when no array is currently allocated.
 */
void ipcio_dealloc (ipcio_t* ipc)
{
  /* free() accepts NULL, so the pointer needs no explicit test */
  free (ipc->buf_ptrs);
  ipc->buf_ptrs = NULL;

  ipc->bufs_opened = 0;
  ipc->bufs_opened_max = 0;
}
/*
 * Convenience wrapper around ipcio_create_work that passes -1 as the
 * device_id.  Returns whatever ipcio_create_work returns.
 */
int ipcio_create (ipcio_t* ipc, key_t key, uint64_t nbufs, uint64_t bufsz, unsigned num_read)
{
  /* NOTE(review): -1 presumably means "no CUDA device" -- confirm in ipcbuf */
  const int device_id = -1;

  return ipcio_create_work (ipc, key, nbufs, bufsz, num_read, device_id);
}
/*
 * Create the underlying ring buffer via ipcbuf_create_work (all arguments are
 * forwarded unchanged) and, on success, reset the ipcio bookkeeping.
 *
 * Returns 0 on success, -1 if ipcbuf_create_work reports an error.
 */
int ipcio_create_work (ipcio_t* ipc, key_t key, uint64_t nbufs, uint64_t bufsz, unsigned num_read, int device_id)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;

  if (ipcbuf_create_work (ring, key, nbufs, bufsz, num_read, device_id) >= 0)
  {
    ipcio_init (ipc);
    return 0;
  }

  fprintf (stderr, "ipcio_create: ipcbuf_create error\n");
  return -1;
}
/*
 * Attach to an existing ring buffer identified by key and reset the ipcio
 * bookkeeping.
 *
 * Returns 0 on success, -1 if ipcbuf_connect reports an error.
 */
int ipcio_connect (ipcio_t* ipc, key_t key)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;

  if (ipcbuf_connect (ring, key) >= 0)
  {
    ipcio_init (ipc);
    return 0;
  }

  fprintf (stderr, "ipcio_connect: ipcbuf_connect error\n");
  return -1;
}
/*
 * Detach from the ring buffer and reset the ipcio bookkeeping.
 *
 * ipcio_open allocates the block-pointer array through ipcio_alloc, and
 * ipcio_init only overwrites that pointer with NULL.  The array is therefore
 * released here before the reset so it is not leaked.
 *
 * Returns 0 on success, -1 if ipcbuf_disconnect reports an error (in which
 * case the ipcio state is left untouched).
 */
int ipcio_disconnect (ipcio_t* ipc)
{
  if (ipcbuf_disconnect ((ipcbuf_t*)ipc) < 0) {
    fprintf (stderr, "ipcio_disconnect: ipcbuf_disconnect error\n");
    return -1;
  }

  /* free buf_ptrs (no-op when it is already NULL) before init discards it */
  ipcio_dealloc (ipc);
  ipcio_init (ipc);
  return 0;
}
/*
 * Reset the ipcio bookkeeping, then destroy the underlying ring buffer.
 * Returns the result of ipcbuf_destroy.
 */
int ipcio_destroy (ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;

  /* the reset happens first, regardless of whether the destroy succeeds */
  ipcio_init (ipc);

  return ipcbuf_destroy (ring);
}
/*
 * Open the ring buffer in the requested mode.
 *
 *   'W' : take the write lock
 *   'w' : take the write lock and call ipcbuf_disable_sod
 *   'R' : take the read lock
 *   'r' : no lock is taken
 *
 * On success the mode is stored in ipc->rdwrt and the block-pointer array is
 * (re)allocated.  Returns 0 on success, -1 on an invalid mode or if any
 * ipcbuf call fails; on failure ipc->rdwrt is left at 0 unless the mode
 * itself was invalid, in which case nothing is modified.
 */
int ipcio_open (ipcio_t* ipc, char rdwrt)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;
  const int writer = (rdwrt == 'w' || rdwrt == 'W');
  const int reader = (rdwrt == 'r' || rdwrt == 'R');

  if (!writer && !reader)
  {
    fprintf (stderr, "ipcio_open: invalid rdwrt = '%c'\n", rdwrt);
    return -1;
  }

  ipc->rdwrt = 0;
  ipc->bytes = 0;
  ipc->curbuf = 0;

  if (writer)
  {
    if (ipcbuf_lock_write (ring) < 0)
    {
      fprintf (stderr, "ipcio_open: error ipcbuf_lock_write\n");
      return -1;
    }

    /* only the lower-case write mode disables start-of-data */
    if (rdwrt == 'w' && ipcbuf_disable_sod (ring) < 0)
    {
      fprintf (stderr, "ipcio_open: error ipcbuf_disable_sod\n");
      return -1;
    }
  }
  else if (rdwrt == 'R')
  {
#ifdef _DEBUG
    fprintf (stderr, "ipcio_open: ipcbuf_lock_read(ipc)\n");
#endif
    if (ipcbuf_lock_read (ring) < 0)
    {
      fprintf (stderr, "ipcio_open: error ipcbuf_lock_read\n");
      return -1;
    }
  }

  /* common success path for every mode */
  ipc->rdwrt = rdwrt;
  ipcio_alloc (ipc);
  return 0;
}
/*
 * Return the byte offset obtained by multiplying the value reported by
 * ipcbuf_get_sod_minbuf by the buffer size.
 */
uint64_t ipcio_get_start_minimum (ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;

  const uint64_t block_size = ipcbuf_get_bufsz (ring);
  const uint64_t first_block = ipcbuf_get_sod_minbuf (ring);

  return first_block * block_size;
}
/*
 * Raise a previously requested start-of-data once it can be applied.
 *
 * Nothing is done (and 0 is returned) when no request is pending, or while
 * the write count reported by ipcbuf_get_write_count has not yet exceeded
 * sod_buf.  Otherwise ipcbuf_enable_sod is called with the stored buffer and
 * byte; on success the pending flag is cleared.
 *
 * Returns 0 on success or when there is nothing to do, -1 if
 * ipcbuf_enable_sod fails.
 */
int ipcio_check_pending_sod (ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;

  /* short-circuit keeps the write count from being queried needlessly */
  if (!ipc->sod_pending || ipcbuf_get_write_count (ring) <= ipc->sod_buf)
    return 0;

  if (ipcbuf_enable_sod (ring, ipc->sod_buf, ipc->sod_byte) >= 0)
  {
    ipc->sod_pending = 0;
    return 0;
  }

  fprintf (stderr, "ipcio_check_pendind_sod: fail ipcbuf_enable_sod\n");
  return -1;
}
/*
 * Request start-of-data at absolute byte offset `byte` and switch the handle
 * from mode 'w' to mode 'W'.
 *
 * The offset is split into a buffer index and a byte within that buffer, the
 * request is recorded as pending, and ipcio_check_pending_sod is asked to
 * apply it immediately if possible.
 *
 * Returns the result of ipcio_check_pending_sod, or -1 if the handle is not
 * in mode 'w'.
 */
int ipcio_start (ipcio_t* ipc, uint64_t byte)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;
  const uint64_t block_size = ipcbuf_get_bufsz (ring);

  if (ipc->rdwrt == 'w')
  {
    ipc->sod_buf = byte / block_size;
    ipc->sod_byte = byte % block_size;
    ipc->sod_pending = 1;
    ipc->rdwrt = 'W';

    return ipcio_check_pending_sod (ipc);
  }

  fprintf (stderr, "ipcio_start: invalid ipcio_t (%c)\n",ipc->rdwrt);
  return -1;
}
/*
 * Shared implementation of ipcio_stop (unlock == 0) and ipcio_close
 * (unlock != 0).
 *
 * The behaviour depends on the current mode in ipc->rdwrt:
 *   'W' : finish the current write (close open blocks, raise end-of-data,
 *         mark the current buffer filled), drop to mode 'w', and -- only if
 *         unlock is set -- continue into the 'w' handling below.
 *   'w' : release the write lock and set the mode to 0.
 *   'R' : release the read lock and set the mode to 0.
 *   anything else : reported as an invalid handle.
 *
 * Returns 0 on success, -1 on any failure.
 *
 * NOTE(review): mode 'r' (accepted by ipcio_open) is not handled here and
 * falls through to the "invalid ipcio_t" error -- confirm this is intended.
 */
int ipcio_stop_close (ipcio_t* ipc, char unlock)
{
if (ipc -> rdwrt == 'W') {
#ifdef _DEBUG
if (ipc->curbuf)
fprintf (stderr, "ipcio_close:W curbuf: %"PRIu64" nextbuf: %"PRIu64" %"
PRIu64" bytes. buf[0]=%x\n", ipc->buf.sync->w_buf_curr,
ipc->buf.sync->w_buf_next, ipc->bytes, (void *) ipc->curbuf);
#endif
if (ipcbuf_is_writing((ipcbuf_t*)ipc)) {
/* close every block still open from ipcio_open_block_write, each one
   being reported as a full buffer (bufsz bytes) */
while (ipc->bufs_opened > 0)
{
uint64_t bufsz = ipcbuf_get_bufsz((ipcbuf_t*)ipc);
#ifdef _DEBUG
fprintf (stderr, "ipcio_close: bufs_opened=%u, close_block_write(%lu)\n", ipc->bufs_opened, bufsz);
#endif
if (ipcio_close_block_write(ipc, bufsz) < 0)
{
fprintf (stderr, "ipcio_close: failed to close an open buffer bufs_opened=%u\n", ipc->bufs_opened);
return -1;
}
}
/* end-of-data is raised before the current buffer is marked filled */
if (ipcbuf_enable_eod ((ipcbuf_t*)ipc) < 0) {
fprintf (stderr, "ipcio_close:W error ipcbuf_enable_eod\n");
return -1;
}
/* the current buffer is marked filled with however many bytes it holds */
if (ipcbuf_mark_filled ((ipcbuf_t*)ipc, ipc->bytes) < 0) {
fprintf (stderr, "ipcio_close:W error ipcbuf_mark_filled\n");
return -1;
}
if (ipcio_check_pending_sod (ipc) < 0) {
fprintf (stderr, "ipcio_close:W error ipcio_check_pending_sod\n");
return -1;
}
ipc->marked_filled = 1;
/* a completely full buffer is finished with, so forget the pointer */
if (ipc->bytes == ipcbuf_get_bufsz((ipcbuf_t*)ipc)) {
#ifdef _DEBUG
fprintf (stderr, "ipcio_close:W last buffer was filled\n");
#endif
ipc->curbuf = 0;
}
}
/* writing has stopped: drop back to the lower-case write mode */
ipc->rdwrt = 'w';
/* ipcio_stop ends here; ipcio_close continues into the 'w' branch below */
if (!unlock)
return 0;
}
if (ipc -> rdwrt == 'w') {
/* if at least one transfer has been recorded, reset both write-buffer
   counters to one past the e_buf entry of the most recent transfer */
if (ipc->buf.sync->w_xfer > 0) {
uint64_t prev_xfer = ipc->buf.sync->w_xfer - 1;
ipc->buf.sync->w_buf_curr = ipc->buf.sync->e_buf[prev_xfer % IPCBUF_XFERS]+1;
ipc->buf.sync->w_buf_next = ipc->buf.sync->w_buf_curr;
}
if (ipcbuf_unlock_write ((ipcbuf_t*)ipc) < 0) {
fprintf (stderr, "ipcio_close:W error ipcbuf_unlock_write\n");
return -1;
}
ipc -> rdwrt = 0;
return 0;
}
if (ipc -> rdwrt == 'R') {
#ifdef _DEBUG
fprintf (stderr, "ipcio_close:R ipcbuf_unlock_read()\n");
#endif
if (ipcbuf_unlock_read ((ipcbuf_t*)ipc) < 0) {
fprintf (stderr, "ipcio_close:R error ipcbuf_unlock_read\n");
return -1;
}
ipc -> rdwrt = 0;
return 0;
}
/* reached for any other mode value, including 0 (not open) and 'r' */
fprintf (stderr, "ipcio_close: invalid ipcio_t\n");
return -1;
}
/*
 * Stop writing without releasing the write lock.
 *
 * Only valid in mode 'W'; delegates to ipcio_stop_close with unlock == 0.
 * Returns the result of ipcio_stop_close, or -1 if not in mode 'W'.
 */
int ipcio_stop (ipcio_t* ipc)
{
  if (ipc->rdwrt == 'W')
    return ipcio_stop_close (ipc, 0);

  fprintf (stderr, "ipcio_stop: not writing!\n");
  return -1;
}
/*
 * Close the handle: delegates to ipcio_stop_close with unlock == 1 and
 * returns its result.
 */
int ipcio_close (ipcio_t* ipc)
{
  const char unlock = 1;

  return ipcio_stop_close (ipc, unlock);
}
/*
 * Return 1 if the handle is in one of the four open modes
 * ('R', 'r', 'w', 'W'), otherwise 0.
 */
int ipcio_is_open (ipcio_t* ipc)
{
  switch (ipc->rdwrt)
  {
    case 'R':
    case 'r':
    case 'w':
    case 'W':
      return 1;

    default:
      return 0;
  }
}
/*
 * Copy `bytes` bytes from `ptr` into the ring buffer, advancing through as
 * many buffers as required.
 *
 * ipc   : handle in mode 'W' or 'w'
 * ptr   : source data (host memory)
 * bytes : number of bytes to write
 *
 * When built with HAVE_CUDA and ipcbuf_get_device reports a device id >= 0,
 * the copy is performed with cudaMemcpy (host to device); otherwise memcpy
 * is used.
 *
 * Returns the number of bytes requested on success, -1 on error.  A failed
 * CUDA copy or synchronize is treated as an error: the byte counters are not
 * advanced for data that was never transferred.
 */
ssize_t ipcio_write (ipcio_t* ipc, char* ptr, size_t bytes)
{
  size_t space = 0;
  size_t towrite = bytes;

  if (ipc->rdwrt != 'W' && ipc->rdwrt != 'w') {
    fprintf (stderr, "ipcio_write: invalid ipcio_t (%c)\n",ipc->rdwrt);
    return -1;
  }

#ifdef HAVE_CUDA
  int device_id = ipcbuf_get_device ((ipcbuf_t*)ipc);
  cudaError_t err;
  if (device_id >= 0)
  {
#ifdef _DEBUG
    fprintf (stderr, "ipcio_write: cudaSetDevice(%d)\n", device_id);
#endif
    err = cudaSetDevice (device_id);
    if (err != cudaSuccess)
    {
      fprintf (stderr, "ipcio_write: cudaSetDevice failed %s\n",
               cudaGetErrorString(err));
      return -1;
    }
  }
  else
  {
#ifdef _DEBUG
    fprintf (stderr, "ipcio_write: device_id=%d\n", device_id);
#endif
  }
#endif

  while (bytes) {

    /* the current buffer is full: hand it over and forget it */
    if (ipc->bytes == ipcbuf_get_bufsz((ipcbuf_t*)ipc)) {

      if (!ipc->marked_filled) {
#ifdef _DEBUG
        fprintf (stderr, "ipcio_write buffer:%"PRIu64" mark_filled\n",
                 ipc->buf.sync->w_buf_curr);
#endif
        if (ipcbuf_mark_filled ((ipcbuf_t*)ipc, ipc->bytes) < 0) {
          fprintf (stderr, "ipcio_write: error ipcbuf_mark_filled\n");
          return -1;
        }
        if (ipcio_check_pending_sod (ipc) < 0) {
          fprintf (stderr, "ipcio_write: error ipcio_check_pending_sod\n");
          return -1;
        }
      }

      ipc->curbuf = 0;
      ipc->bytes = 0;
      ipc->marked_filled = 1;
    }

    /* no buffer in hand: fetch the next one to write into */
    if (!ipc->curbuf) {
#ifdef _DEBUG
      fprintf (stderr, "ipcio_write buffer:%"PRIu64" ipcbuf_get_next_write\n",
               ipc->buf.sync->w_buf_next);
#endif
      ipc->curbuf = ipcbuf_get_next_write ((ipcbuf_t*)ipc);
#ifdef _DEBUG
      fprintf (stderr, "ipcio_write: ipcbuf_get_next_write returns\n");
#endif
      if (!ipc->curbuf) {
        fprintf (stderr, "ipcio_write: ipcbuf_next_write\n");
        return -1;
      }
      ipc->marked_filled = 0;
      ipc->bytes = 0;
    }

    /* copy as much as fits in the remainder of the current buffer */
    space = ipcbuf_get_bufsz((ipcbuf_t*)ipc) - ipc->bytes;
    if (space > bytes)
      space = bytes;

#ifdef _DEBUG
    fprintf (stderr, "ipcio_write space=%"PRIu64"\n", (uint64_t) space);
#endif

    if (space > 0) {
#ifdef _DEBUG
      fprintf (stderr, "ipcio_write buffer:%"PRIu64" offset:%"PRIu64
               " count=%"PRIu64"\n", ipc->buf.sync->w_buf_curr, ipc->bytes,
               (uint64_t) space);
#endif

#ifdef HAVE_CUDA
      if (device_id >= 0)
      {
#ifdef _DEBUG
        fprintf (stderr, "ipcio_write: cudaMemcpy (%p, %p, :%"PRIu64", cudaMemcpyHostToDevice\n",
                 (void *) (ipc->curbuf + ipc->bytes), (void *) ptr, (uint64_t) space);
#endif
        err = cudaMemcpy(ipc->curbuf + ipc->bytes, ptr, space, cudaMemcpyHostToDevice);
        if (err != cudaSuccess)
        {
          fprintf (stderr, "ipcio_write: cudaMemcpy failed %s\n", cudaGetErrorString(err));
          /* do not account for bytes that never reached the device */
          return -1;
        }
#ifdef _DEBUG
        fprintf (stderr, "ipcio_write: cudaDeviceSynchronize()\n");
#endif
        err = cudaDeviceSynchronize();
        if (err != cudaSuccess)
        {
          fprintf (stderr, "ipcio_write: cudaDeviceSynchronize failed %s\n", cudaGetErrorString(err));
          return -1;
        }
      }
      else
#endif
      {
#ifdef _DEBUG
        fprintf (stderr, "ipcio_write: memcpy (%p, %p, :%"PRIu64"\n",
                 (void *) (ipc->curbuf + ipc->bytes), (void *) ptr, (uint64_t) space);
#endif
        memcpy (ipc->curbuf + ipc->bytes, ptr, space);
      }

      ipc->bytes += space;
      ptr += space;
      bytes -= space;
    }
  }

  return towrite;
}
/*
 * Open the next readable block for direct access.
 *
 * ipc      : handle in mode 'r' or 'R' with no block currently open
 * curbufsz : out - size reported for the block by ipcbuf_get_next_read
 * block_id : out - value of ipcbuf_get_read_index after the block is fetched
 *
 * Returns a pointer to the block, or NULL if a precondition fails, if
 * ipcbuf_eod reports true, or if no block could be obtained.
 */
char * ipcio_open_block_read (ipcio_t *ipc, uint64_t *curbufsz, uint64_t *block_id)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;
  const char* problem = NULL;

  /* preconditions are tested in a fixed order; the first failure wins */
  if (ipc->bytes != 0)
    problem = "ipcio_open_block_read: ipc->bytes != 0\n";
  else if (ipc->curbuf)
    problem = "ipcio_open_block_read: ipc->curbuf != 0\n";
  else if (ipc->rdwrt != 'r' && ipc->rdwrt != 'R')
    problem = "ipcio_open_block_read: ipc -> rdwrt != [rR]\n";
  else if (ipcbuf_eod (ring))
    problem = "ipcio_open_block_read: ipcbuf_eod true, returning null ptr\n";

  if (problem)
  {
    fputs (problem, stderr);
    return NULL;
  }

  ipc->curbuf = ipcbuf_get_next_read (ring, &(ipc->curbufsz));
  if (ipc->curbuf == NULL)
  {
    fprintf (stderr, "ipcio_open_block_read: could not get next block rdwrt=%c\n", ipc -> rdwrt);
    return NULL;
  }

  *block_id = ipcbuf_get_read_index (ring);
  *curbufsz = ipc->curbufsz;
  ipc->bytes = 0;

  return ipc->curbuf;
}
/*
 * Close the block previously opened with ipcio_open_block_read.
 *
 * ipc   : handle in mode 'R' with a block open and no bytes consumed yet
 * bytes : number of bytes the caller consumed from the block
 *
 * If `bytes` equals the block size the block is marked cleared and the
 * handle is ready to open the next block.  Otherwise a warning is printed,
 * the consumed count is recorded and the block stays open.
 *
 * Returns 0 on success, -1 if a precondition fails or ipcbuf_mark_cleared
 * reports an error.
 *
 * NOTE(review): ipcio_open_block_read also accepts mode 'r', but this
 * function rejects it -- confirm how such readers are expected to close.
 */
ssize_t ipcio_close_block_read (ipcio_t *ipc, uint64_t bytes)
{
  if (ipc->bytes != 0)
  {
    fprintf (stderr, "ipcio_close_block_read: ipc->bytes != 0\n");
    return -1;
  }

  if (!ipc->curbuf)
  {
    fprintf (stderr, "ipcio_close_block_read: ipc->curbuf == 0\n");
    return -1;
  }

  if (ipc -> rdwrt != 'R')
  {
    /* message now names the mode that is actually tested */
    fprintf (stderr, "ipcio_close_block_read: ipc->rdwrt != R\n");
    return -1;
  }

  if (bytes != ipc->curbufsz)
  {
    fprintf (stderr, "ipcio_close_block_read: WARNING! bytes [%"PRIu64"] != ipc->curbufsz [%"PRIu64"]\n",
             bytes, ipc->curbufsz);
  }

  ipc->bytes += bytes;

  /* only a fully consumed block is handed back to the ring */
  if (ipc->bytes == ipc->curbufsz)
  {
    if (ipcbuf_mark_cleared ((ipcbuf_t*)ipc) < 0)
    {
      /* message now names this function and the call that failed */
      fprintf (stderr, "ipcio_close_block_read: error ipcbuf_mark_cleared\n");
      return -1;
    }

    ipc->curbuf = 0;
    ipc->bytes = 0;
  }

  return 0;
}
/*
 * Open the next writable block for direct access.
 *
 * ipc      : handle in mode 'W'
 * block_id : out - value of ipcbuf_get_write_index after the block is fetched
 *
 * Several blocks may be open at once, up to bufs_opened_max.  Each opened
 * block pointer is appended to buf_ptrs; the first one opened also becomes
 * the current buffer.
 *
 * Returns a pointer to the block, or NULL if a precondition fails or no
 * block could be obtained.
 */
char * ipcio_open_block_write (ipcio_t *ipc, uint64_t *block_id)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;
  char * block = NULL;

  if (ipc->bytes != 0)
  {
    fprintf (stderr, "ipcio_open_block_write: ipc->bytes != 0\n");
    return NULL;
  }

  /* a current buffer with no tracked open blocks was not opened here */
  if (ipc->bufs_opened == 0 && ipc->curbuf)
  {
    fprintf (stderr, "ipcio_open_block_write: ipc->curbuf != 0\n");
    return NULL;
  }

  if (ipc->rdwrt != 'W')
  {
    fprintf (stderr, "ipcio_open_block_write: ipc -> rdwrt != W\n");
    return NULL;
  }

  if (ipc->bufs_opened >= ipc->bufs_opened_max)
  {
    fprintf (stderr, "ipcio_open_block_write: cannot open more than %u bufs\n",
             ipc->bufs_opened_max);
    return NULL;
  }

#ifdef _DEBUG
  fprintf (stderr, "ipcio_open_block_write: ipcbuf_get_next_write(ipc)\n");
#endif

  block = ipcbuf_get_next_write (ring);
  if (block == NULL)
  {
    fprintf (stderr, "ipcio_open_block_write: could not get next block rdwrt=%c\n", ipc -> rdwrt);
    return NULL;
  }

#ifdef _DEBUG
  fprintf (stderr, "ipcio_open_block_write: saving buf_ptr=%p to ipc->buf_ptrs[%u]=%p\n",
           block, ipc->bufs_opened, ipc->buf_ptrs[ipc->bufs_opened]);
#endif

  /* append to the list of open blocks */
  ipc->buf_ptrs[ipc->bufs_opened] = block;

  /* the first open block doubles as the current buffer */
  if (ipc->bufs_opened == 0)
  {
    ipc->curbuf = block;
    ipc->marked_filled = 0;
    ipc->bytes = 0;
  }

  ipc->bufs_opened++;

#ifdef _DEBUG
  fprintf (stderr, "ipcio_open_block_write: ipcbuf_get_write_index (ipc)\n");
#endif

  *block_id = ipcbuf_get_write_index (ring);

  return block;
}
/*
 * Forward to ipcbuf_zero_next_write for a handle in mode 'W'.
 *
 * Returns the result of ipcbuf_zero_next_write, or -1 if the handle is not
 * in mode 'W'.
 */
int ipcio_zero_next_block (ipcio_t *ipc)
{
  if (ipc -> rdwrt != 'W')
  {
    /* message now names this function rather than ipcio_open_block_write */
    fprintf(stderr, "ipcio_zero_next_block: ipc -> rdwrt != W\n");
    return -1;
  }

  return ipcbuf_zero_next_write ((ipcbuf_t*)ipc);
}
/*
 * Record that `bytes` bytes have been written directly into the current
 * block.
 *
 * Requires mode 'W', a current buffer, and a byte counter of zero; `bytes`
 * must not exceed the buffer size.
 *
 * Returns 0 on success, -1 if any of these conditions is violated.
 */
ssize_t ipcio_update_block_write (ipcio_t *ipc, uint64_t bytes)
{
  uint64_t capacity = 0;

  if (ipc->bytes != 0)
  {
    fprintf (stderr, "ipcio_update_block_write: ipc->bytes [%"PRIu64"] != 0\n", ipc->bytes);
    return -1;
  }

  if (ipc->curbuf == 0)
  {
    fprintf (stderr, "ipcio_update_block_write: ipc->curbuf == 0\n");
    return -1;
  }

  if (ipc->rdwrt != 'W')
  {
    fprintf(stderr, "ipcio_update_block_write: ipc->rdwrt != W\n");
    return -1;
  }

  capacity = ipcbuf_get_bufsz ((ipcbuf_t*) ipc);
  if (ipc->bytes + bytes > capacity)
  {
    fprintf(stderr, "ipcio_update_block_write: wrote more bytes than there was space for! [%"PRIu64" + %"PRIu64"] > %"PRIu64"\n",
            ipc->bytes, bytes, capacity);
    return -1;
  }

  ipc->bytes += bytes;
  return 0;
}
/*
 * Close the oldest block opened with ipcio_open_block_write, reporting that
 * `bytes` bytes were written into it.
 *
 * The byte count is validated and recorded by ipcio_update_block_write.  If
 * the block has not already been marked filled, it is marked filled, any
 * pending start-of-data is checked, and the list of open blocks is shifted
 * down by one so that the next open block (if any) becomes current.
 *
 * Returns 0 on success, -1 if ipcio_update_block_write fails, -2 if
 * ipcbuf_mark_filled fails, -3 if ipcio_check_pending_sod fails.
 */
ssize_t ipcio_close_block_write (ipcio_t *ipc, uint64_t bytes)
{
  int idx;

  if (ipcio_update_block_write (ipc, bytes) < 0)
  {
    fprintf (stderr, "ipcio_close_block_write: ipcio_update_block_write failed\n");
    return -1;
  }

  /* already handed over: nothing further to do */
  if (ipc->marked_filled)
    return 0;

  if (ipcbuf_mark_filled ((ipcbuf_t*)ipc, ipc->bytes) < 0)
  {
    fprintf (stderr, "ipcio_close_block_write: error ipcbuf_mark_filled\n");
    return -2;
  }

  if (ipcio_check_pending_sod (ipc) < 0)
  {
    fprintf (stderr, "ipcio_close_block_write: error ipcio_check_pending_sod\n");
    return -3;
  }

  ipc->bufs_opened--;

  /* drop the head of the list; this loop is empty when no blocks remain */
  for (idx = 0; idx < ipc->bufs_opened; idx++)
    ipc->buf_ptrs[idx] = ipc->buf_ptrs[idx+1];
  ipc->buf_ptrs[ipc->bufs_opened] = NULL;

  /* the new head is the current buffer, or NULL when the list is empty */
  ipc->curbuf = ipc->buf_ptrs[0];
  ipc->marked_filled = (ipc->bufs_opened == 0) ? 1 : 0;
  ipc->bytes = 0;

  return 0;
}
/*
 * Read up to `bytes` bytes from the ring buffer into `ptr`, advancing
 * through as many buffers as required, until the request is satisfied or
 * ipcbuf_eod reports true.
 *
 * ipc   : handle in mode 'r' or 'R'
 * ptr   : destination (host memory); if NULL the bytes are skipped without
 *         being copied (ipcio_seek relies on this)
 * bytes : maximum number of bytes to read
 *
 * When built with HAVE_CUDA and ipcbuf_get_device reports a device id >= 0,
 * the copy is performed with cudaMemcpy (device to host); otherwise memcpy
 * is used.  A fully consumed buffer is marked cleared only in mode 'R'.
 *
 * Returns the number of bytes consumed, or -1 on error.  A failed CUDA copy
 * or synchronize is treated as an error: the byte counters are not advanced
 * for data that was never transferred.
 */
ssize_t ipcio_read (ipcio_t* ipc, char* ptr, size_t bytes)
{
  size_t space = 0;
  size_t toread = bytes;

  if (ipc -> rdwrt != 'r' && ipc -> rdwrt != 'R')
  {
    fprintf (stderr, "ipcio_read: invalid ipcio_t (rdwrt=%c)\n", ipc->rdwrt);
    return -1;
  }

#ifdef HAVE_CUDA
  int device_id = ipcbuf_get_device ((ipcbuf_t*)ipc);
  cudaError_t err;
  if (device_id >= 0)
  {
#ifdef _DEBUG
    fprintf (stderr, "ipcio_read: cudaSetDevice(%d)\n", device_id);
#endif
    err = cudaSetDevice (device_id);
    if (err != cudaSuccess)
    {
      fprintf (stderr, "ipcio_read: cudaSetDevice failed %s\n",
               cudaGetErrorString(err));
      return -1;
    }
  }
  else
  {
#ifdef _DEBUG
    fprintf (stderr, "ipcio_read: device_id=%d\n", device_id);
#endif
  }
#endif

  while (!ipcbuf_eod((ipcbuf_t*)ipc))
  {
    /* no buffer in hand: fetch the next one to read from */
    if (!ipc->curbuf)
    {
      ipc->curbuf = ipcbuf_get_next_read ((ipcbuf_t*)ipc, &(ipc->curbufsz));
#ifdef _DEBUG
      fprintf (stderr, "ipcio_read buffer:%"PRIu64" %"PRIu64" bytes. buf[0]=%p\n",
               ipc->buf.sync->r_bufs[0], ipc->curbufsz, (void *) (ipc->curbuf));
#endif
      if (!ipc->curbuf)
      {
        fprintf (stderr, "ipcio_read: error ipcbuf_next_read\n");
        return -1;
      }
      ipc->bytes = 0;
    }

    if (bytes)
    {
      /* consume as much as remains in the current buffer */
      space = ipc->curbufsz - ipc->bytes;
      if (space > bytes)
        space = bytes;

      if (ptr)
      {
#ifdef HAVE_CUDA
        if (device_id >= 0)
        {
#ifdef _DEBUG
          fprintf (stderr, "ipcio_read: cudaMemcpy (%p, %p, ,%"PRIu64", cudaMemcpyDeviceToHost)\n",
                   (void *) ptr, (void *) (ipc->curbuf + ipc->bytes), (uint64_t) space);
#endif
          err = cudaMemcpy(ptr, ipc->curbuf + ipc->bytes, space, cudaMemcpyDeviceToHost);
          if (err != cudaSuccess)
          {
            fprintf (stderr, "ipcio_read: cudaMemcpy failed %s\n", cudaGetErrorString(err));
            /* do not account for bytes that never reached the host */
            return -1;
          }
#ifdef _DEBUG
          fprintf (stderr, "ipcio_read: cudaDeviceSynchronize()\n");
#endif
          err = cudaDeviceSynchronize();
          if (err != cudaSuccess)
          {
            fprintf (stderr, "ipcio_read: cudaDeviceSynchronize failed %s\n", cudaGetErrorString(err));
            return -1;
          }
        }
        else
#endif
        {
#ifdef _DEBUG
          fprintf (stderr, "ipcio_read: memcpy (%p, %p, ,%"PRIu64")\n",
                   (void *) ptr, (void *) (ipc->curbuf + ipc->bytes), (uint64_t) space);
#endif
          memcpy (ptr, ipc->curbuf + ipc->bytes, space);
        }
        ptr += space;
      }

      ipc->bytes += space;
      bytes -= space;
    }

    if (ipc->bytes == ipc->curbufsz)
    {
      /* buffer exhausted: only mode 'R' marks it cleared */
      if (ipc -> rdwrt == 'R' && ipcbuf_mark_cleared ((ipcbuf_t*)ipc) < 0)
      {
        fprintf (stderr, "ipcio_read: error ipcbuf_mark_cleared\n");
        return -1;
      }
      ipc->curbuf = 0;
      ipc->bytes = 0;
    }
    else if (!bytes)
      break;
  }

  return toread - bytes;
}
/*
 * Return the current byte position: the value reported by ipcbuf_tell_read
 * (modes 'R'/'r') or ipcbuf_tell_write (modes 'W'/'w'), plus the number of
 * bytes consumed or written in the current buffer.
 *
 * Returns 0, after printing an error, if the handle is in no open mode or
 * the ipcbuf call reports a negative value.
 */
uint64_t ipcio_tell (ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t*) ipc;
  int64_t current = -1;

  switch (ipc->rdwrt)
  {
    case 'R':
    case 'r':
      current = ipcbuf_tell_read (ring);
      break;

    case 'W':
    case 'w':
      current = ipcbuf_tell_write (ring);
      break;

    default:
      break;
  }

  if (current >= 0)
    return current + ipc->bytes;

  fprintf (stderr, "ipcio_tell: failed ipcbuf_tell"
           " mode=%c current=%"PRIi64"\n", ipc->rdwrt, current);
  return 0;
}
/*
 * Move the current position.
 *
 * ipc    : open handle
 * offset : target position; relative to the current position when whence is
 *          SEEK_CUR, otherwise treated as an absolute byte offset
 * whence : SEEK_CUR or anything else (handled as absolute)
 *
 * Seeking forward is implemented as a discarding read (ipcio_read with a
 * NULL destination).  Seeking backward is only possible within the current
 * buffer, i.e. by at most ipc->bytes.
 *
 * The target is validated in the signed domain first: a negative target is
 * rejected instead of being converted to a huge unsigned value and read
 * forward to end-of-data.
 *
 * Returns the new position as reported by ipcio_tell, or -1 on error.
 */
int64_t ipcio_seek (ipcio_t* ipc, int64_t offset, int whence)
{
  uint64_t current = ipcio_tell (ipc);
  int64_t target = offset;

#ifdef _DEBUG
  fprintf (stderr, "ipcio_seek: offset=%"PRIi64" tell=%"PRIu64"\n",
           offset, current);
#endif

  if (whence == SEEK_CUR)
    target += (int64_t) current;

  if (target < 0)
  {
    fprintf (stderr, "ipcio_seek: invalid negative target %"PRIi64"\n", target);
    return -1;
  }

  if ((uint64_t) target > current)
  {
    /* forward: read and discard the intervening bytes */
    uint64_t forward = (uint64_t) target - current;

    if (ipcio_read (ipc, 0, forward) < 0)
    {
      fprintf (stderr, "ipcio_seek: empty read %"PRIu64" bytes error\n",
               forward);
      return -1;
    }
  }
  else if ((uint64_t) target < current)
  {
    /* backward: cannot go further back than the start of this buffer */
    uint64_t backward = current - (uint64_t) target;

    if (backward > ipc->bytes)
    {
      fprintf (stderr, "ipcio_seek: %"PRIu64" > max backwards %"PRIu64"\n",
               backward, ipc->bytes);
      return -1;
    }
    ipc->bytes -= backward;
  }

  return ipcio_tell (ipc);
}
/*
 * Return the number of bytes in buffers that are not currently full:
 * (nbufs - nfull) * bufsz.
 */
int64_t ipcio_space_left(ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t *) ipc;

  uint64_t bufsz = ipcbuf_get_bufsz (ring);
  uint64_t total_bufs = ipcbuf_get_nbufs (ring);
  uint64_t full_bufs = ipcbuf_get_nfull (ring);
  int64_t available_bufs = (total_bufs - full_bufs);

#ifdef _DEBUG
  uint64_t clear_bufs = ipcbuf_get_nclear (ring);
  fprintf (stderr,"ipcio_space_left: full_bufs = %"PRIu64", clear_bufs = %"
           PRIu64", available_bufs = %"PRIu64", sum = %"PRIu64"\n",
           full_bufs, clear_bufs, available_bufs, available_bufs*bufsz);
#endif

  return available_bufs * bufsz;
}
/*
 * Return the fraction of buffers that are full: nfull / nbufs, computed in
 * single precision.  Despite the name, the value is a ratio, not a
 * percentage.
 */
float ipcio_percent_full(ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t *) ipc;

  const uint64_t total_bufs = ipcbuf_get_nbufs (ring);
  const uint64_t full_bufs = ipcbuf_get_nfull (ring);

  const float numerator = (float) full_bufs;
  const float denominator = (float) total_bufs;

  return numerator / denominator;
}
/*
 * Return the byte offset of the buffer recorded in soclock_buf, i.e.
 * soclock_buf multiplied by the buffer size.
 */
uint64_t ipcio_get_soclock_byte(ipcio_t* ipc)
{
  ipcbuf_t* ring = (ipcbuf_t *) ipc;
  const uint64_t block_size = ipcbuf_get_bufsz (ring);

  return block_size * ring->soclock_buf;
}