#include <assert.h>
#include <limits.h>
#include "dds/ddsrt/mh3.h"
#include "dds/ddsrt/md5.h"
#include "dds/ddsrt/bswap.h"
#include "dds/dds.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/dds.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
/* Application-level sample handled by this topic type: a key string and a
   value string.  The key is what sd_eqkey compares and what the serdata hash
   is computed over.  When a sample is filled in by sd_to_sample the members
   are (re)allocated with dds_realloc; stp_free_samples releases them with
   dds_free. */
struct sampletype {
char *key;
char *value;
};
/* Topic type for struct sampletype.  It only wraps the generic sertopic; "c"
   has to stay the first member because the callbacks cast between
   struct ddsi_sertopic * and struct stp *. */
struct stp {
struct ddsi_sertopic c;
};
/* Topic equality callback.  Neither argument is inspected: any two topics of
   this type are reported as equal. */
static bool stp_equal (const struct ddsi_sertopic *acmn, const struct ddsi_sertopic *bcmn)
{
  const bool always_equal = true;
  (void) bcmn;
  (void) acmn;
  return always_equal;
}
/* Topic hash callback.  The argument is ignored and the hash is the constant 0,
   which is consistent with stp_equal treating all topics as equal. */
static uint32_t stp_hash (const struct ddsi_sertopic *tpcmn)
{
  const uint32_t constant_hash = 0;
  (void) tpcmn;
  return constant_hash;
}
/* Topic destructor callback: finalizes the embedded generic sertopic and then
   releases the enclosing struct stp. */
static void stp_free (struct ddsi_sertopic *tpcmn)
{
  /* tpcmn points at the first member of a struct stp, so the cast recovers the container */
  struct stp *self = (struct stp *) tpcmn;

  ddsi_sertopic_fini (&self->c);
  free (self);
}
/* Clears "count" samples starting at "samples" to all-zero bytes.  The topic
   argument is not used. */
static void stp_zero_samples (const struct ddsi_sertopic *dcmn, void *samples, size_t count)
{
  const size_t nbytes = count * sizeof (struct sampletype);

  (void) dcmn;
  memset (samples, 0, nbytes);
}
/* Resizes the sample array "old" from "oldcount" to "count" elements and stores
   the address of every element in ptrs[0 .. count-1].

   The array is only reallocated when the count actually changes; any newly
   added tail elements are zero-filled.  The topic argument is not used. */
static void stp_realloc_samples (void **ptrs, const struct ddsi_sertopic *dcmn, void *old, size_t oldcount, size_t count)
{
  const size_t elem = sizeof (struct sampletype);
  char *block;

  (void) dcmn;

  if (count == oldcount)
    block = old;
  else
    block = dds_realloc (old, count * elem);

  /* zero only the part that did not exist before */
  if (block != NULL && oldcount < count)
    memset (block + oldcount * elem, 0, (count - oldcount) * elem);

  for (size_t idx = 0; idx < count; idx++)
    ptrs[idx] = block + idx * elem;
}
/* Releases the key and value strings of "count" samples and, when
   DDS_FREE_ALL_BIT is set in "op", the sample array itself.

   The samples are treated as one contiguous array starting at ptrs[0]; only
   ptrs[0] is ever dereferenced.  The topic argument is not used.

   After freeing, the string pointers are reset to NULL: when the array is kept
   (DDS_FREE_ALL_BIT not set) the samples stay reachable by the caller, and
   leaving the old addresses in place would make a later dds_realloc or
   dds_free on them operate on freed memory. */
static void stp_free_samples (const struct ddsi_sertopic *dcmn, void **ptrs, size_t count, dds_free_op_t op)
{
  (void) dcmn;
  if (count == 0)
    return;

  struct sampletype *samples = ptrs[0];
  for (size_t i = 0; i < count; i++)
  {
    dds_free (samples[i].key);
    samples[i].key = NULL;
    dds_free (samples[i].value);
    samples[i].value = NULL;
  }

  if (op & DDS_FREE_ALL_BIT)
  {
    dds_free (ptrs[0]);
  }
}
/* Callback table for the topic type; passed to ddsi_sertopic_init by
   make_sertopic. */
static const struct ddsi_sertopic_ops stp_ops = {
  /* topic object: destruction and identity */
  .free = stp_free,
  .equal = stp_equal,
  .hash = stp_hash,
  /* arrays of application samples */
  .zero_samples = stp_zero_samples,
  .realloc_samples = stp_realloc_samples,
  .free_samples = stp_free_samples
};
/* Select the CDR encoding identifier that matches the byte order of the host:
   CDR_LE on little-endian machines, CDR_BE on big-endian ones.  Any other
   value of DDSRT_ENDIAN is a build error. */
#if DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN
#define NATIVE_ENCODING CDR_LE
#elif DDSRT_ENDIAN == DDSRT_BIG_ENDIAN
#define NATIVE_ENCODING CDR_BE
#else
#error "DDSRT_ENDIAN neither LITTLE nor BIG"
#endif
/* Serdata representation for struct sampletype.  "c" has to stay the first
   member because the callbacks cast between struct ddsi_serdata * and
   struct sd *. */
struct sd {
struct ddsi_serdata c;
/* private copies of the key and value bytes; value is NULL for key-only serdata */
struct sampletype data;
/* keysz/valuesz: number of bytes stored in data.key/data.value (when built
   from a sample this includes the terminating NUL);
   pad0/pad1: number of padding bytes following the key/value in the
   serialized form, chosen to reach a multiple of 4 */
uint32_t keysz, pad0, valuesz, pad1;
};
static uint32_t sd_get_size (const struct ddsi_serdata *dcmn)
{
const struct sd *d = (const struct sd *) dcmn;
return 4 + d->keysz + d->pad0 + d->valuesz + d->pad1;
}
/* Key comparison callback: two serdata have the same key when the stored key
   lengths match and the key bytes are identical. */
static bool sd_eqkey (const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn)
{
  const struct sd *lhs = (const struct sd *) acmn;
  const struct sd *rhs = (const struct sd *) bcmn;

  if (lhs->keysz != rhs->keysz)
    return false;
  return memcmp (lhs->data.key, rhs->data.key, lhs->keysz) == 0;
}
/* Serdata destructor callback: releases the private key and value copies and
   then the serdata itself. */
static void sd_free (struct ddsi_serdata *dcmn)
{
  struct sd *self = (struct sd *) dcmn;

  /* value is NULL for key-only serdata; free (NULL) is a no-op */
  free (self->data.value);
  free (self->data.key);
  free (self);
}
/* Returns a malloc'd copy of the first "l" bytes at "x".

   Despite the name this is a plain byte copy: nothing is appended, so the
   result is NUL-terminated only if the copied bytes include the terminator.
   The caller owns the result and releases it with free.

   Returns NULL when the allocation fails (which malloc is also allowed to do
   for l == 0); in that case nothing is copied, instead of handing a null
   destination to memcpy. */
static char *strdup_with_len (const char *x, size_t l)
{
  char *y = malloc (l);
  if (y != NULL && l > 0)
    memcpy (y, x, l);
  return y;
}
/* Builds a serdata from serialized data held in a single iovec.

   Layout expected in iov[0]: a 4-byte header, a 4-byte key length followed by
   the key bytes, padding up to a multiple of 4 and, for SDK_DATA, a 4-byte
   value length followed by the value bytes.  The length fields are read in
   host byte order.
   NOTE(review): the header bytes are skipped without being inspected, so data
   produced with the other byte order is not detected - confirm whether that
   matters for the peers this is used with.

   The input is not trusted: every length field is checked against
   iov[0].iov_len before anything is copied, and NULL is returned when the
   buffer is too short for what the length fields claim (or when allocating
   the serdata fails).  "size" is not used; exactly one iovec is supported.

   On success the caller owns the returned serdata; its hash is the murmur3
   hash of the key bytes xor'd with the topic's serdata_basehash. */
static struct ddsi_serdata *sd_from_ser_iov (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size)
{
  struct stp const * const stp = (const struct stp *) tpcmn;
  (void) size;
  assert (niov == 1);
  (void) niov;

  const size_t buflen = (size_t) iov[0].iov_len;
  if (buflen < 4)
    return NULL;
  /* "base" is the first byte after the header, "avail" the number of bytes from there on */
  const char *base = (const char *) iov[0].iov_base + 4;
  const size_t avail = buflen - 4;

  /* first pass: parse and validate the layout without allocating anything */
  uint32_t keysz, pad0, valuesz = 0, pad1 = 0;
  size_t off = 0, valueoff = 0;
  if (avail < sizeof (uint32_t))
    return NULL;
  memcpy (&keysz, base + off, sizeof (uint32_t));
  off += sizeof (uint32_t);
  const size_t keyoff = off;
  if (keysz > avail - off)
    return NULL;
  off += keysz;
  pad0 = ((off % 4) == 0) ? 0 : (uint32_t) (4 - (off % 4));
  if (kind == SDK_DATA)
  {
    /* the padding is only required to be present when a value follows it */
    if (pad0 > avail - off || avail - off - pad0 < sizeof (uint32_t))
      return NULL;
    off += pad0;
    memcpy (&valuesz, base + off, sizeof (uint32_t));
    off += sizeof (uint32_t);
    valueoff = off;
    if (valuesz > avail - off)
      return NULL;
    off += valuesz;
    pad1 = ((off % 4) == 0) ? 0 : (uint32_t) (4 - (off % 4));
  }

  /* second pass: the layout is known to fit, so copy it out */
  struct sd *sd = malloc (sizeof (*sd));
  if (sd == NULL)
    return NULL;
  ddsi_serdata_init (&sd->c, &stp->c, kind);
  sd->keysz = keysz;
  sd->pad0 = pad0;
  sd->valuesz = valuesz;
  sd->pad1 = pad1;
  sd->data.key = strdup_with_len (base + keyoff, keysz);
  sd->data.value = (kind == SDK_DATA) ? strdup_with_len (base + valueoff, valuesz) : NULL;
  sd->c.hash = ddsrt_mh3 (sd->data.key, sd->keysz, 0) ^ tpcmn->serdata_basehash;
  return &sd->c;
}
static struct ddsi_serdata *sd_from_ser (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
assert (fragchain->nextfrag == NULL);
ddsrt_iovec_t iov = {
.iov_base = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)),
.iov_len = fragchain->maxp1 };
return sd_from_ser_iov (tpcmn, kind, 1, &iov, size);
}
/* Constructing a serdata from a key hash is not implemented for this type:
   both arguments are ignored and NULL is always returned. */
static struct ddsi_serdata *sd_from_keyhash (const struct ddsi_sertopic *tpcmn, const ddsi_keyhash_t *keyhash)
{
  (void) keyhash;
  (void) tpcmn;
  return NULL;
}
/* Builds a serdata from an application sample (a struct sampletype).

   The key string is copied including its terminating NUL; for SDK_DATA the
   value string is copied the same way, otherwise the serdata carries no
   value.  pad0/pad1 record how many bytes are needed to round the key/value
   length up to a multiple of 4.  The hash is the murmur3 hash of the key bytes
   xor'd with the topic's serdata_basehash.  The caller owns the result. */
static struct ddsi_serdata *sd_from_sample (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample)
{
  const struct sampletype *src = sample;
  const struct stp *tp = (const struct stp *) tpcmn;
  struct sd *out = malloc (sizeof (*out));

  ddsi_serdata_init (&out->c, &tp->c, kind);

  out->keysz = (uint32_t) strlen (src->key) + 1;
  out->data.key = strdup_with_len (src->key, out->keysz);
  out->pad0 = ((out->keysz % 4) != 0) ? 4 - (out->keysz % 4) : 0;

  if (kind != SDK_DATA)
  {
    out->data.value = NULL;
    out->valuesz = 0;
    out->pad1 = 0;
  }
  else
  {
    out->valuesz = (uint32_t) strlen (src->value) + 1;
    out->data.value = strdup_with_len (src->value, out->valuesz);
    out->pad1 = ((out->valuesz % 4) != 0) ? 4 - (out->valuesz % 4) : 0;
  }

  out->c.hash = ddsrt_mh3 (out->data.key, out->keysz, 0) ^ tpcmn->serdata_basehash;
  return &out->c;
}
/* Creates a key-only copy of a serdata that is detached from its topic.

   The copy is initialised as SDK_KEY, after which its topic pointer is
   cleared, its hash is taken over from the source and its timestamp is set to
   INT64_MIN.  Only the key bytes are duplicated; the copy has no value.  The
   caller owns the result. */
static struct ddsi_serdata *sd_to_topicless (const struct ddsi_serdata *serdata_common)
{
  const struct sd *src = (const struct sd *) serdata_common;
  const struct stp *tp = (const struct stp *) src->c.topic;
  struct sd *copy = malloc (sizeof (*copy));

  /* generic part: initialise first, then override what must differ */
  ddsi_serdata_init (&copy->c, &tp->c, SDK_KEY);
  copy->c.topic = NULL;
  copy->c.hash = src->c.hash;
  copy->c.timestamp.v = INT64_MIN;

  /* key part */
  copy->keysz = src->keysz;
  copy->pad0 = src->pad0;
  copy->data.key = strdup_with_len (src->data.key, src->keysz);

  /* no value in a key-only serdata */
  copy->data.value = NULL;
  copy->valuesz = 0;
  copy->pad1 = 0;

  return &copy->c;
}
/* Serializes a serdata into a freshly malloc'd buffer and describes that
   buffer in *ref; the buffer is released again by sd_to_ser_unref.

   Only whole-sample serialization is supported: cdr_off must be 0 and cdr_sz
   must equal sd_get_size (asserted).  The buffer is allocated with the size
   reported by sd_get_size, so that function has to account for every byte
   written here.

   Layout written: 4-byte header (byte 1 is 1 on little-endian hosts, 0
   otherwise, the other bytes are 0), 4-byte key length, key bytes, pad0 zero
   bytes and, for SDK_DATA, 4-byte value length, value bytes, pad1 zero bytes.
   Length fields are written in host byte order.

   Returns the serdata that was passed in (const cast away). */
static struct ddsi_serdata *sd_to_ser_ref (const struct ddsi_serdata *serdata_common, size_t cdr_off, size_t cdr_sz, ddsrt_iovec_t *ref)
{
  const struct sd *sd = (const struct sd *) serdata_common;
  assert (cdr_off == 0 && cdr_sz == sd_get_size (&sd->c));
  (void) cdr_off;
  (void) cdr_sz;
  ref->iov_len = sd_get_size (&sd->c);
  ref->iov_base = malloc (ref->iov_len);
  char * const header = ref->iov_base;
  header[0] = 0;
  header[1] = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN) ? 1 : 0;
  header[2] = 0;
  header[3] = 0;
  char * const base = header + 4;
  size_t off = 0;
  memcpy (base + off, &sd->keysz, sizeof (uint32_t));
  off += sizeof (uint32_t);
  memcpy (base + off, sd->data.key, sd->keysz);
  off += sd->keysz;
  for (uint32_t i = 0; i < sd->pad0; i++)
    base[off++] = 0;
  if (sd->c.kind == SDK_DATA)
  {
    memcpy (base + off, &sd->valuesz, sizeof (uint32_t));
    off += sizeof (uint32_t);
    memcpy (base + off, sd->data.value, sd->valuesz);
    off += sd->valuesz;
    /* the value is followed by its own padding (pad1), not by the key's (pad0) */
    for (uint32_t i = 0; i < sd->pad1; i++)
      base[off++] = 0;
  }
  return (struct ddsi_serdata *) &sd->c;
}
/* Releases the buffer that sd_to_ser_ref allocated and stored in *ref.  The
   serdata argument is not used. */
static void sd_to_ser_unref (struct ddsi_serdata *serdata_common, const ddsrt_iovec_t *ref)
{
  void *buffer = (void *) ref->iov_base;

  (void) serdata_common;
  free (buffer);
}
/* Serializes a serdata into the caller-provided buffer "buf".

   Implemented on top of sd_to_ser_ref: the serialized form is produced in a
   temporary buffer, copied to "buf" in full and the temporary is released
   again.  "off" and "sz" are simply forwarded to sd_to_ser_ref. */
static void sd_to_ser (const struct ddsi_serdata *serdata_common, size_t off, size_t sz, void *buf)
{
  ddsrt_iovec_t tmp;

  (void) sd_to_ser_ref (serdata_common, off, sz, &tmp);
  memcpy (buf, tmp.iov_base, tmp.iov_len);
  sd_to_ser_unref ((struct ddsi_serdata *) serdata_common, &tmp);
}
/* Copies the contents of a serdata into an application sample.

   The key is always copied; the value only for SDK_DATA serdata (otherwise the
   sample's value member is left untouched).  The destination strings are
   resized with dds_realloc, so the sample's members must be NULL or previously
   allocated.  Caller-supplied buffers are not supported: a non-NULL "bufptr"
   aborts the process.  Always returns true. */
static bool sd_to_sample (const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{
  const struct sd *src = (const struct sd *) serdata_common;
  struct sampletype *dst = sample;

  (void) buflim;
  if (bufptr != NULL)
    abort ();

  dst->key = dds_realloc (dst->key, src->keysz);
  memcpy (dst->key, src->data.key, src->keysz);

  if (src->c.kind != SDK_DATA)
    return true;

  dst->value = dds_realloc (dst->value, src->valuesz);
  memcpy (dst->value, src->data.value, src->valuesz);
  return true;
}
/* Copies the key of a topicless serdata into an application sample; the
   sample's value member is left untouched.

   The destination key is resized with dds_realloc.  The topic argument is not
   used, and caller-supplied buffers are not supported: a non-NULL "bufptr"
   aborts the process.  Always returns true. */
static bool sd_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{
  const struct sd *src = (const struct sd *) serdata_common;
  struct sampletype *dst = sample;

  (void) topic;
  (void) buflim;
  if (bufptr != NULL)
    abort ();

  dst->key = dds_realloc (dst->key, src->keysz);
  memcpy (dst->key, src->data.key, src->keysz);
  return true;
}
/* Formats a serdata as text into "buf" (capacity "size"): "key -> value" for
   SDK_DATA, just the key otherwise.  Key and value are printed with %s, i.e.
   as NUL-terminated strings.

   Returns the value reported by snprintf, capped at "size".  The topic
   argument is not used. */
static size_t sd_print (const struct ddsi_sertopic *sertopic_common, const struct ddsi_serdata *serdata_common, char *buf, size_t size)
{
  const struct sd *self = (const struct sd *) serdata_common;
  int res;

  (void) sertopic_common;

  if (self->c.kind != SDK_DATA)
    res = snprintf (buf, size, "%s", self->data.key);
  else
    res = snprintf (buf, size, "%s -> %s", self->data.key, self->data.value);

  const size_t wanted = (size_t) res;
  if (wanted > size)
    return size;
  return wanted;
}
/* Computes the key hash of a serdata into buf->value: the MD5 digest of the
   key length as a big-endian 32-bit integer followed by the key bytes.
   "force_md5" is ignored; MD5 is always used. */
static void sd_get_keyhash (const struct ddsi_serdata *serdata_common, struct ddsi_keyhash *buf, bool force_md5)
{
  const struct sd *self = (const struct sd *) serdata_common;
  const uint32_t len_be = ddsrt_toBE4u (self->keysz);
  ddsrt_md5_state_t ctx;

  (void) force_md5;

  ddsrt_md5_init (&ctx);
  ddsrt_md5_append (&ctx, (ddsrt_md5_byte_t *) &len_be, sizeof (len_be));
  ddsrt_md5_append (&ctx, (ddsrt_md5_byte_t *) self->data.key, self->keysz);
  ddsrt_md5_finish (&ctx, (ddsrt_md5_byte_t *) (buf->value));
}
/* Callback table for the serdata type; passed to ddsi_sertopic_init by
   make_sertopic. */
static const struct ddsi_serdata_ops sd_ops = {
  /* construction */
  .from_ser = sd_from_ser,
  .from_ser_iov = sd_from_ser_iov,
  .from_keyhash = sd_from_keyhash,
  .from_sample = sd_from_sample,
  /* serialization */
  .get_size = sd_get_size,
  .to_ser = sd_to_ser,
  .to_ser_ref = sd_to_ser_ref,
  .to_ser_unref = sd_to_ser_unref,
  /* conversion to samples and to topicless form */
  .to_sample = sd_to_sample,
  .to_topicless = sd_to_topicless,
  .topicless_to_sample = sd_topicless_to_sample,
  /* keys, printing, destruction */
  .eqkey = sd_eqkey,
  .get_keyhash = sd_get_keyhash,
  .print = sd_print,
  .free = sd_free
};
/* Allocates a struct stp and initialises its embedded sertopic with the given
   topic and type names and the stp_ops/sd_ops callback tables.  Returns a
   pointer to the embedded sertopic. */
static struct ddsi_sertopic *make_sertopic (const char *topicname, const char *typename)
{
  struct stp *topic = malloc (sizeof (*topic));
  struct ddsi_sertopic *common = &topic->c;

  ddsi_sertopic_init (common, topicname, typename, &stp_ops, &sd_ops, false);
  return common;
}
/* Writes a topic name of the form "<prefix><nr>_pid1_tid1" into "name" (at
   most "size" bytes including the terminator; longer names are truncated by
   snprintf) and returns "name".

   <nr> comes from a counter that is atomically incremented on every call, so
   names differ between calls within one process.

   NOTE(review): the pid and tid fields are fixed at 1 rather than taken from
   the calling process/thread, presumably so that separate processes (e.g. a
   writer and this reader) arrive at the same topic name - confirm.  That
   output is kept as is; the constants are merely cast to the types that the
   PRIdPID/PRIdTID conversions expect, because passing a plain int is a
   mismatch wherever those types are not int. */
char *create_unique_topic_name (const char *prefix, char *name, size_t size)
{
  /* 32-bit atomic, so use the matching 32-bit initialiser */
  static ddsrt_atomic_uint32_t count = DDSRT_ATOMIC_UINT32_INIT (0);
  const uint32_t nr = ddsrt_atomic_inc32_nv (&count);
  (void) snprintf (name, size, "%s%"PRIu32"_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, (ddsrt_pid_t) 1, (ddsrt_tid_t) 1);
  return name;
}
int main(int argc, char *argv[])
{
dds_return_t rc;
char topicname[100];
const dds_entity_t pp = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
create_unique_topic_name ("ddsc_cdr_basic", topicname, sizeof topicname);
printf("Topic: %s\n", topicname);
struct ddsi_sertopic *st = make_sertopic (topicname, "x");
const dds_entity_t tp = dds_create_topic_generic (pp, &st, NULL, NULL, NULL);
const dds_entity_t rd = dds_create_reader (pp, tp, NULL, NULL);
struct sampletype xs[] = {
{ .key = "aap", .value = "banaan" },
{ .key = "kolibrie", .value = "nectar" }
};
do
{
{
struct sampletype s = { .key = NULL, .value = NULL };
void *raw = &s;
dds_sample_info_t si;
for (size_t i = 0; i < sizeof (xs) / sizeof (xs[0]); i++)
{
rc = dds_read_mask (rd, &raw, &si, 1, 1, DDS_NOT_READ_SAMPLE_STATE);
if (NULL != s.key)
{
printf("1 Reading message KEY: %s\n", s.key);
}
if (NULL != s.value)
{
printf("1 Reading message VALUE: %s\n", s.value);
}
}
rc = dds_read_mask (rd, &raw, &si, 1, 1, DDS_NOT_READ_SAMPLE_STATE);
dds_free (s.key);
dds_free (s.value);
}
printf("Reading message\n");
dds_sleepfor (DDS_MSECS (100));
} while(true);
rc = dds_delete (pp);
}