#define _DEFAULT_SOURCE
#include <ctype.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "rs_ctrl_os.h"
#define DYN_PREFIX_MAX 256
#define PAYLOAD_MAX 512
static const char *skip_ws(const char *p) {
while (*p == ' ' || *p == '\t' || *p == '\n' || *p == '\r') {
++p;
}
return p;
}
static int toml_line_string(const char *tbl, const char *key, char *out, size_t cap) {
const char *p = tbl;
size_t keylen = strlen(key);
while (p && *p) {
const char *line = p;
const char *nl = strchr(p, '\n');
size_t linelen = nl ? (size_t)(nl - p) : strlen(p);
const char *end = p + linelen;
const char *s = skip_ws(line);
if ((size_t)(end - s) >= keylen && strncmp(s, key, keylen) == 0) {
unsigned char c = (unsigned char)s[keylen];
if (isalnum(c) || c == '_') {
goto next_line;
}
const char *after = s + keylen;
after = skip_ws(after);
if (*after != '=') {
return -1;
}
after = skip_ws(after + 1);
size_t i = 0;
if (*after == '"') {
++after;
while (*after && *after != '"' && i + 1 < cap) {
out[i++] = *after++;
}
} else {
while (after < end && !isspace((unsigned char)*after) && *after != '#' && i + 1 < cap) {
out[i++] = *after++;
}
}
out[i] = '\0';
return 0;
}
next_line:
if (!nl) {
break;
}
p = nl + 1;
}
return -1;
}
static int toml_line_u64(const char *tbl, const char *key, uint64_t *out) {
const char *p = tbl;
size_t keylen = strlen(key);
while (p && *p) {
const char *line = p;
const char *nl = strchr(p, '\n');
size_t linelen = nl ? (size_t)(nl - p) : strlen(p);
const char *end = p + linelen;
const char *s = skip_ws(line);
if ((size_t)(end - s) >= keylen && strncmp(s, key, keylen) == 0) {
unsigned char c = (unsigned char)s[keylen];
if (isalnum(c) || c == '_') {
goto next_line_u;
}
const char *after = s + keylen;
after = skip_ws(after);
if (*after != '=') {
return -1;
}
after = skip_ws(after + 1);
return sscanf(after, "%" SCNu64, out) == 1 ? 0 : -1;
}
next_line_u:
if (!nl) {
break;
}
p = nl + 1;
}
return -1;
}
static void print_last_error(const char *where) {
char buf[512];
buf[0] = '\0';
(void)rs_ctrl_os_last_error(buf, sizeof(buf));
fprintf(stderr, "[%s] %s\n", where, buf);
}
int main(int argc, char **argv) {
const char *config_path = (argc >= 2) ? argv[1] : "example_config.toml";
int max_ticks = (argc >= 3) ? atoi(argv[2]) : 600;
if (max_ticks <= 0) {
max_ticks = 600;
}
(void)rs_ctrl_os_init_logging();
RcOsConfig *cfg = rs_ctrl_os_config_open(config_path);
if (!cfg) {
print_last_error("config_open");
return 1;
}
char *my_id = rs_ctrl_os_config_get_my_id(cfg);
char *host = rs_ctrl_os_config_get_host(cfg);
if (!my_id || !host) {
fprintf(stderr, "missing my_id/host\n");
if (my_id) {
rs_ctrl_os_str_free(my_id);
}
if (host) {
rs_ctrl_os_str_free(host);
}
rs_ctrl_os_config_destroy(cfg);
return 1;
}
printf("static: my_id=%s host=%s port=%u master=%d publish_hz=%" PRId64 " subscribe_hz=%" PRId64
" dynamic_hot_reload=%d\n",
my_id,
host,
(unsigned)rs_ctrl_os_config_get_port(cfg),
rs_ctrl_os_config_get_is_master(cfg),
(int64_t)rs_ctrl_os_config_get_publish_hz(cfg),
(int64_t)rs_ctrl_os_config_get_subscribe_hz(cfg),
rs_ctrl_os_config_get_dynamic_load_enable(cfg));
RcOsTimeSyncHandle *ts = rs_ctrl_os_time_sync_new();
RcOsServiceRegistry *reg =
rs_ctrl_os_discovery_start(my_id, host, rs_ctrl_os_config_get_port(cfg),
rs_ctrl_os_config_get_is_master(cfg), ts);
rs_ctrl_os_str_free(my_id);
rs_ctrl_os_str_free(host);
if (!reg) {
print_last_error("discovery_start");
rs_ctrl_os_time_sync_destroy(ts);
rs_ctrl_os_config_destroy(cfg);
return 1;
}
RcOsPubSub *bus = rs_ctrl_os_pubsub_new(cfg, reg);
if (!bus) {
print_last_error("pubsub_new");
rs_ctrl_os_time_sync_destroy(ts);
rs_ctrl_os_config_destroy(cfg);
return 1;
}
char prefix[DYN_PREFIX_MAX] = "hello";
uint64_t interval_ms = 200;
char prev_dyn[1024] = "";
for (int tick = 0; tick < max_ticks; ++tick) {
char *dyn = NULL;
if (rs_ctrl_os_config_get_dynamic_toml(cfg, &dyn) != RCOS_OK || !dyn) {
print_last_error("get_dynamic_toml");
break;
}
if (strcmp(dyn, prev_dyn) != 0) {
printf("[dynamic] TOML updated:\n%s\n", dyn);
strncpy(prev_dyn, dyn, sizeof(prev_dyn) - 1);
prev_dyn[sizeof(prev_dyn) - 1] = '\0';
}
{
char tmp[DYN_PREFIX_MAX];
uint64_t iv = interval_ms;
if (toml_line_string(dyn, "message_prefix", tmp, sizeof(tmp)) == 0) {
if (strcmp(tmp, prefix) != 0) {
printf("[dynamic] message_prefix -> %s\n", tmp);
strncpy(prefix, tmp, sizeof(prefix) - 1);
prefix[sizeof(prefix) - 1] = '\0';
}
}
if (toml_line_u64(dyn, "interval_ms", &iv) == 0 && iv > 0 && iv != interval_ms) {
printf("[dynamic] interval_ms -> %" PRIu64 "\n", iv);
interval_ms = iv;
}
}
rs_ctrl_os_str_free(dyn);
uint64_t ts_ms = rs_ctrl_os_time_sync_now_ms(ts);
int synced = rs_ctrl_os_time_sync_is_synced(ts);
char payload[PAYLOAD_MAX];
int n = snprintf(payload,
sizeof(payload),
"%s|node|%" PRIu64 "|synced=%d",
prefix,
ts_ms,
synced);
if (n < 0 || (size_t)n >= sizeof(payload)) {
fprintf(stderr, "payload truncated\n");
}
rcos_err_t pr = rs_ctrl_os_pubsub_publish_raw(
bus, "control", "telemetry", (const uint8_t *)payload, (size_t)strlen(payload));
if (pr != RCOS_OK) {
print_last_error("publish_raw");
}
for (int r = 0; r < 3; ++r) {
char *st = NULL;
uint8_t *pl = NULL;
size_t plen = 0;
int got = 0;
rcos_err_t rr =
rs_ctrl_os_pubsub_try_recv_raw(bus, "local_sub", NULL, &st, &pl, &plen, &got);
if (rr != RCOS_OK) {
print_last_error("try_recv_raw");
break;
}
if (got && st) {
printf("[recv] sub_topic=%s payload=%.*s\n", st, (int)plen, (const char *)pl);
rs_ctrl_os_str_free(st);
rs_ctrl_os_payload_free(pl, plen);
}
}
usleep((useconds_t)(interval_ms * 1000u));
}
rs_ctrl_os_pubsub_destroy(bus);
rs_ctrl_os_time_sync_destroy(ts);
rs_ctrl_os_config_destroy(cfg);
printf("done (%d ticks).\n", max_ticks);
return 0;
}