#include "lang/internal.h"
#include "lang/env.h"
#include "lang/parse.h"
#include "mem/heap.h"
#include "mem/sys.h"
#include "store/serde.h"
#include "store/splay.h"
#include "store/part.h"
#include "core/ipc.h"
#include <time.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#if !defined(RAY_OS_WINDOWS)
#include <unistd.h>
#endif
ray_t* ray_ser_fn(ray_t* val) {
return ray_ser(val);
}
ray_t* ray_de_fn(ray_t* val) {
return ray_de(val);
}
static const char* splay_default_sym(const char* dir, char* buf, size_t bufsz,
bool must_exist) {
int n = snprintf(buf, bufsz, "%s/sym", dir);
if (n < 0 || (size_t)n >= bufsz) return NULL;
if (must_exist && access(buf, F_OK) != 0) return NULL;
return buf;
}
static const char* str_to_cpath(ray_t* s, char* buf, size_t bufsz) {
if (!s || s->type != -RAY_STR) return NULL;
const char* p = ray_str_ptr(s);
size_t len = ray_str_len(s);
if (!p || len == 0 || len >= bufsz) return NULL;
memcpy(buf, p, len);
buf[len] = '\0';
return buf;
}
ray_t* ray_set_splayed_fn(ray_t** args, int64_t n) {
if (n < 2 || n > 3) return ray_error("domain", NULL);
char dir[1024];
if (!str_to_cpath(args[0], dir, sizeof(dir))) return ray_error("type", NULL);
ray_t* tbl = args[1];
if (!tbl || tbl->type != RAY_TABLE) return ray_error("type", NULL);
char sym[1024];
const char* sym_path = NULL;
if (n == 3 && args[2] && args[2]->type == -RAY_STR)
sym_path = str_to_cpath(args[2], sym, sizeof(sym));
else
sym_path = splay_default_sym(dir, sym, sizeof(sym), false);
ray_err_t err = ray_splay_save(tbl, dir, sym_path);
if (err != RAY_OK) return ray_error(ray_err_code_str(err), NULL);
ray_retain(tbl);
return tbl;
}
ray_t* ray_get_splayed_fn(ray_t** args, int64_t n) {
if (n < 1 || n > 2) return ray_error("domain", NULL);
char dir[1024];
if (!str_to_cpath(args[0], dir, sizeof(dir))) return ray_error("type", NULL);
char sym[1024];
const char* sym_path = NULL;
if (n == 2 && args[1] && args[1]->type == -RAY_STR)
sym_path = str_to_cpath(args[1], sym, sizeof(sym));
else
sym_path = splay_default_sym(dir, sym, sizeof(sym), true);
return ray_splay_load(dir, sym_path);
}
ray_t* ray_get_parted_fn(ray_t** args, int64_t n) {
if (n != 2) return ray_error("domain", NULL);
char root[1024];
if (!str_to_cpath(args[0], root, sizeof(root))) return ray_error("type", NULL);
if (!args[1] || args[1]->type != -RAY_SYM) return ray_error("type", NULL);
ray_t* name_atom = ray_sym_str(args[1]->i64);
if (!name_atom) return ray_error("name", NULL);
char name[256];
size_t nlen = ray_str_len(name_atom);
if (nlen == 0 || nlen >= sizeof(name)) return ray_error("domain", NULL);
memcpy(name, ray_str_ptr(name_atom), nlen);
name[nlen] = '\0';
return ray_read_parted(root, name);
}
#include <sys/stat.h>
#include <dirent.h>
static int dir_is_splayed(const char* dir) {
char path[1024];
int n = snprintf(path, sizeof(path), "%s/.d", dir);
if (n <= 0 || n >= (int)sizeof(path)) return 0;
return access(path, F_OK) == 0;
}
static int name_looks_partition(const char* name) {
if (!name || !name[0]) return 0;
for (const char* c = name; *c; c++)
if (!(*c == '.' || (*c >= '0' && *c <= '9'))) return 0;
return 1;
}
static int dir_is_parted_root(const char* dir) {
DIR* d = opendir(dir);
if (!d) return 0;
int found = 0;
struct dirent* ent;
while ((ent = readdir(d)) != NULL) {
if (ent->d_name[0] == '.') continue;
if (strcmp(ent->d_name, "sym") == 0) continue;
if (!name_looks_partition(ent->d_name)) continue;
char child[2048];
int n = snprintf(child, sizeof(child), "%s/%s", dir, ent->d_name);
if (n <= 0 || n >= (int)sizeof(child)) continue;
struct stat st;
if (stat(child, &st) == 0 && S_ISDIR(st.st_mode)) { found = 1; break; }
}
closedir(d);
return found;
}
static void mount_record(int64_t* names_buf, ray_t** vals_buf, int* count,
int max, const char* name, size_t nlen, ray_t* tbl) {
if (*count >= max) return;
int64_t sym_id = ray_sym_intern(name, nlen);
ray_env_set(sym_id, tbl);
names_buf[*count] = sym_id;
ray_retain(tbl);
vals_buf[*count] = tbl;
(*count)++;
}
static ray_t* finalize_mount_dict(int64_t* names_buf, ray_t** vals_buf, int count) {
if (count == 0) return ray_dict_new(ray_list_new(0), ray_list_new(0));
ray_t* keys = ray_vec_new(RAY_SYM, count);
if (!keys || RAY_IS_ERR(keys)) return keys ? keys : ray_error("oom", NULL);
keys->len = count;
int64_t* k = (int64_t*)ray_data(keys);
for (int i = 0; i < count; i++) k[i] = names_buf[i];
ray_t* vals = ray_list_new(count);
if (!vals || RAY_IS_ERR(vals)) { ray_release(keys); return vals ? vals : ray_error("oom", NULL); }
for (int i = 0; i < count; i++) {
vals = ray_list_append(vals, vals_buf[i]);
ray_release(vals_buf[i]);
}
return ray_dict_new(keys, vals);
}
ray_t* ray_db_splayed_mount_fn(ray_t** args, int64_t n) {
if (n != 1) return ray_error("domain", NULL);
char root[1024];
if (!str_to_cpath(args[0], root, sizeof(root))) return ray_error("type", NULL);
DIR* d = opendir(root);
if (!d) return ray_error("io", "cannot open directory");
int64_t names_buf[256];
ray_t* vals_buf[256];
int count = 0;
struct dirent* ent;
while ((ent = readdir(d)) != NULL) {
if (ent->d_name[0] == '.') continue;
char child[2048];
int cn = snprintf(child, sizeof(child), "%s/%s", root, ent->d_name);
if (cn <= 0 || cn >= (int)sizeof(child)) continue;
struct stat st;
if (stat(child, &st) != 0 || !S_ISDIR(st.st_mode)) continue;
if (!dir_is_splayed(child)) continue;
ray_t* tbl = ray_splay_load(child, NULL);
if (!tbl || RAY_IS_ERR(tbl)) {
if (tbl) ray_release(tbl);
continue;
}
mount_record(names_buf, vals_buf, &count, 256,
ent->d_name, strlen(ent->d_name), tbl);
ray_release(tbl);
}
closedir(d);
return finalize_mount_dict(names_buf, vals_buf, count);
}
ray_t* ray_db_parted_mount_fn(ray_t** args, int64_t n) {
if (n != 1) return ray_error("domain", NULL);
char root[1024];
if (!str_to_cpath(args[0], root, sizeof(root))) return ray_error("type", NULL);
if (!dir_is_parted_root(root))
return ray_error("domain", "not a parted-table root (no partition directories found)");
DIR* d = opendir(root);
if (!d) return ray_error("io", "cannot open directory");
char first_part[2048] = {0};
struct dirent* ent;
while ((ent = readdir(d)) != NULL) {
if (ent->d_name[0] == '.') continue;
if (strcmp(ent->d_name, "sym") == 0) continue;
if (!name_looks_partition(ent->d_name)) continue;
int cn = snprintf(first_part, sizeof(first_part), "%s/%s", root, ent->d_name);
if (cn <= 0 || cn >= (int)sizeof(first_part)) { first_part[0] = '\0'; continue; }
struct stat st;
if (stat(first_part, &st) == 0 && S_ISDIR(st.st_mode)) break;
first_part[0] = '\0';
}
closedir(d);
if (!first_part[0])
return ray_error("io", "parted root has no readable partition");
DIR* dp = opendir(first_part);
if (!dp) return ray_error("io", "cannot scan partition");
int64_t names_buf[256];
ray_t* vals_buf[256];
int count = 0;
while ((ent = readdir(dp)) != NULL) {
if (ent->d_name[0] == '.') continue;
char tbl_in_part[3072];
int cn = snprintf(tbl_in_part, sizeof(tbl_in_part), "%s/%s", first_part, ent->d_name);
if (cn <= 0 || cn >= (int)sizeof(tbl_in_part)) continue;
struct stat st;
if (stat(tbl_in_part, &st) != 0 || !S_ISDIR(st.st_mode)) continue;
ray_t* tbl = ray_read_parted(root, ent->d_name);
if (!tbl || RAY_IS_ERR(tbl)) {
if (tbl) ray_release(tbl);
continue;
}
mount_record(names_buf, vals_buf, &count, 256,
ent->d_name, strlen(ent->d_name), tbl);
ray_release(tbl);
}
closedir(dp);
return finalize_mount_dict(names_buf, vals_buf, count);
}
ray_t* ray_os_size_fn(ray_t* x) {
if (!ray_is_atom(x) || x->type != -RAY_STR)
return ray_error("type", ".os.size expects a string path");
char path[1024];
if (!str_to_cpath(x, path, sizeof(path))) return ray_error("type", NULL);
struct stat st;
if (stat(path, &st) != 0)
return ray_error("io", "%s: %s", path, strerror(errno));
if (S_ISDIR(st.st_mode))
return ray_error("io", "%s: is a directory", path);
return ray_i64((int64_t)st.st_size);
}
static int dir_entry_cmp(const void* a, const void* b) {
const char* sa = *(const char* const*)a;
const char* sb = *(const char* const*)b;
return strcmp(sa, sb);
}
ray_t* ray_os_list_fn(ray_t* x) {
if (!ray_is_atom(x) || x->type != -RAY_STR)
return ray_error("type", ".os.list expects a string path");
char path[1024];
if (!str_to_cpath(x, path, sizeof(path))) return ray_error("type", NULL);
DIR* d = opendir(path);
if (!d) return ray_error("io", "%s: %s", path, strerror(errno));
char** names = NULL;
int64_t count = 0;
int64_t cap = 0;
struct dirent* ent;
while ((ent = readdir(d)) != NULL) {
if (ent->d_name[0] == '.' &&
(ent->d_name[1] == '\0' || (ent->d_name[1] == '.' && ent->d_name[2] == '\0')))
continue;
if (count >= cap) {
int64_t new_cap = cap == 0 ? 16 : cap * 2;
char** tmp = (char**)ray_sys_realloc(names, (size_t)new_cap * sizeof(char*));
if (!tmp) {
closedir(d);
for (int64_t i = 0; i < count; i++) ray_sys_free(names[i]);
ray_sys_free(names);
return ray_error("oom", NULL);
}
names = tmp;
cap = new_cap;
}
char* dup = ray_sys_strdup(ent->d_name);
if (!dup) {
closedir(d);
for (int64_t i = 0; i < count; i++) ray_sys_free(names[i]);
ray_sys_free(names);
return ray_error("oom", NULL);
}
names[count++] = dup;
}
closedir(d);
qsort(names, (size_t)count, sizeof(char*), dir_entry_cmp);
ray_t* result = ray_vec_new(RAY_SYM, count);
if (!result || RAY_IS_ERR(result)) {
for (int64_t i = 0; i < count; i++) ray_sys_free(names[i]);
ray_sys_free(names);
return result ? result : ray_error("oom", NULL);
}
result->len = count;
int64_t* out = (int64_t*)ray_data(result);
for (int64_t i = 0; i < count; i++) {
out[i] = ray_sym_intern(names[i], strlen(names[i]));
ray_sys_free(names[i]);
}
ray_sys_free(names);
return result;
}
static __thread uint64_t guid_rng_state = 0;
static inline uint64_t guid_rng_next(void) {
uint64_t x = guid_rng_state;
if (RAY_UNLIKELY(x == 0)) {
uint64_t a = (uint64_t)rand();
uint64_t b = (uint64_t)rand();
uint64_t c = (uint64_t)rand();
x = (a << 33) ^ (b << 17) ^ c ^ 0x9E3779B97F4A7C15ULL;
if (x == 0) x = 0x9E3779B97F4A7C15ULL;
}
x ^= x >> 12;
x ^= x << 25;
x ^= x >> 27;
guid_rng_state = x;
return x * 0x2545F4914F6CDD1DULL;
}
ray_t* ray_guid_fn(ray_t* n_arg) {
if (!n_arg || !is_numeric(n_arg)) return ray_error("type", NULL);
int64_t n = as_i64(n_arg);
if (n < 0) return ray_error("domain", NULL);
ray_t* result = ray_vec_new(RAY_GUID, n);
if (RAY_IS_ERR(result)) return result;
result->len = n;
uint8_t* data = (uint8_t*)ray_data(result);
for (int64_t i = 0; i < n; i++) {
uint64_t lo = guid_rng_next();
uint64_t hi = guid_rng_next();
memcpy(data + i * 16, &lo, 8);
memcpy(data + i * 16 + 8, &hi, 8);
data[i * 16 + 6] = (data[i * 16 + 6] & 0x0F) | 0x40;
data[i * 16 + 8] = (data[i * 16 + 8] & 0x3F) | 0x80;
}
return result;
}
ray_t* ray_eval_builtin_fn(ray_t* x) {
return ray_eval(x);
}
ray_t* ray_parse_builtin_fn(ray_t* x) {
if (x->type != -RAY_STR) return ray_error("type", "parse expects a string");
const char* src = ray_str_ptr(x);
if (!src) return ray_error("domain", NULL);
ray_t* parsed = ray_parse(src);
return parsed ? parsed : ray_error("parse", NULL);
}
ray_t* ray_meta_fn(ray_t* x) {
if (!x) return ray_error("type", NULL);
const char* tname = ray_type_name(x->type);
int64_t type_sym = ray_sym_intern("type", 4);
int64_t type_id = ray_sym_intern(tname, strlen(tname));
int64_t cap = ray_is_atom(x) ? 1 : 2;
ray_t* keys = ray_sym_vec_new(RAY_SYM_W64, cap);
if (RAY_IS_ERR(keys)) return keys;
ray_t* vals = ray_list_new(cap);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
keys = ray_vec_append(keys, &type_sym);
if (RAY_IS_ERR(keys)) { ray_release(vals); return keys; }
ray_t* tv = ray_sym(type_id);
vals = ray_list_append(vals, tv);
ray_release(tv);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
if (!ray_is_atom(x)) {
int64_t len_sym = ray_sym_intern("len", 3);
keys = ray_vec_append(keys, &len_sym);
if (RAY_IS_ERR(keys)) { ray_release(vals); return keys; }
int64_t row_count;
if (x->type == RAY_DICT) row_count = ray_dict_len(x);
else if (x->type == RAY_TABLE) row_count = ray_table_ncols(x);
else row_count = x->len;
ray_t* lv = make_i64(row_count);
vals = ray_list_append(vals, lv);
ray_release(lv);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
}
return ray_dict_new(keys, vals);
}
ray_t* ray_gc_fn(ray_t** args, int64_t n) { (void)args; (void)n; return ray_i64(0); }
ray_t* ray_system_fn(ray_t* x) {
if (x->type != -RAY_STR) return ray_error("type", "system expects a string");
const char* cmd = ray_str_ptr(x);
if (!cmd) return ray_error("domain", NULL);
int rc = system(cmd);
return make_i64(rc);
}
ray_t* ray_getenv_fn(ray_t* x) {
if (x->type != -RAY_STR) return ray_error("type", "getenv expects a string");
const char* name = ray_str_ptr(x);
if (!name) return ray_error("domain", NULL);
const char* val = getenv(name);
return val ? ray_str(val, strlen(val)) : ray_str("", 0);
}
#if !defined(RAY_OS_WINDOWS)
extern int setenv(const char*, const char*, int);
#endif
ray_t* ray_setenv_fn(ray_t* name, ray_t* val) {
if (name->type != -RAY_STR || val->type != -RAY_STR)
return ray_error("type", "setenv expects two strings");
const char* n = ray_str_ptr(name);
const char* v = ray_str_ptr(val);
if (!n || !v) return ray_error("domain", NULL);
#if defined(RAY_OS_WINDOWS)
_putenv_s(n, v);
#else
setenv(n, v, 1);
#endif
return val;
}
ray_t* ray_quote_fn(ray_t** args, int64_t n) {
if (n < 1) return ray_error("domain", "quote expects 1 argument");
ray_retain(args[0]);
return args[0];
}
ray_t* ray_return_fn(ray_t* x) {
ray_retain(x);
return x;
}
ray_t* ray_args_fn(ray_t* x) {
(void)x;
ray_t* list = ray_list_new(0);
if (!list) return ray_error("oom", NULL);
return list;
}
ray_t* ray_rc_fn(ray_t* x) {
if (!x || RAY_IS_ERR(x)) return make_i64(0);
return make_i64((int64_t)x->rc);
}
ray_t* ray_diverse_fn(ray_t* x) {
if (ray_is_atom(x)) return make_bool(1);
if (!is_collection(x)) return ray_error("type", "diverse expects a collection");
int64_t n = ray_len(x);
if (n <= 1) return make_bool(1);
ray_t* d = ray_distinct_fn(x);
if (RAY_IS_ERR(d)) return d;
int64_t dn = ray_len(d);
ray_release(d);
return make_bool(dn == n ? 1 : 0);
}
ray_t* ray_get_fn(ray_t* dict, ray_t* key) {
return ray_at_fn(dict, key);
}
ray_t* ray_remove_fn(ray_t* dict, ray_t* key) {
if (!dict || dict->type != RAY_DICT)
return ray_error("type", "remove expects a dict");
ray_retain(dict);
return ray_dict_remove(dict, key);
}
ray_t* ray_timer_fn(ray_t* x) {
(void)x;
clock_t t = clock();
int64_t nanos = (int64_t)((double)t / (double)CLOCKS_PER_SEC * 1e9);
return make_i64(nanos);
}
ray_t* ray_env_fn(ray_t* x) {
(void)x;
int64_t sym_ids[1024];
ray_t* vals_buf[1024];
int32_t count = ray_env_list(sym_ids, vals_buf, 1024);
ray_t* keys = ray_sym_vec_new(RAY_SYM_W64, count);
if (RAY_IS_ERR(keys)) return keys;
ray_t* vals = ray_list_new(count);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
for (int32_t i = 0; i < count; i++) {
keys = ray_vec_append(keys, &sym_ids[i]);
if (RAY_IS_ERR(keys)) { ray_release(vals); return keys; }
vals = ray_list_append(vals, vals_buf[i]);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
}
return ray_dict_new(keys, vals);
}
ray_t* ray_internals_fn(ray_t** args, int64_t n) {
(void)args; (void)n;
ray_t* keys = ray_sym_vec_new(RAY_SYM_W64, 2);
if (RAY_IS_ERR(keys)) return keys;
ray_t* vals = ray_list_new(2);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
int64_t ver_sym = ray_sym_intern("version", 7);
keys = ray_vec_append(keys, &ver_sym);
#ifdef RAYFORCE_VERSION
ray_t* v1 = ray_str(RAYFORCE_VERSION, strlen(RAYFORCE_VERSION));
#else
ray_t* v1 = ray_str("unknown", 7);
#endif
vals = ray_list_append(vals, v1); ray_release(v1);
int64_t date_sym = ray_sym_intern("build-date", 10);
keys = ray_vec_append(keys, &date_sym);
#ifdef RAYFORCE_BUILD_DATE
ray_t* v2 = ray_str(RAYFORCE_BUILD_DATE, strlen(RAYFORCE_BUILD_DATE));
#else
ray_t* v2 = ray_str("unknown", 7);
#endif
vals = ray_list_append(vals, v2); ray_release(v2);
return ray_dict_new(keys, vals);
}
ray_t* ray_memstat_fn(ray_t** args, int64_t n) {
(void)args; (void)n;
ray_mem_stats_t st;
ray_mem_stats(&st);
ray_t* keys = ray_sym_vec_new(RAY_SYM_W64, 5);
if (RAY_IS_ERR(keys)) return keys;
ray_t* vals = ray_list_new(5);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
struct { const char* name; size_t nlen; int64_t v; } rows[] = {
{ "alloc-count", 11, (int64_t)st.alloc_count },
{ "bytes-allocated", 15, (int64_t)st.bytes_allocated },
{ "peak-bytes", 10, (int64_t)st.peak_bytes },
{ "slab-hits", 9, (int64_t)st.slab_hits },
{ "sys-current", 11, (int64_t)st.sys_current },
};
for (size_t i = 0; i < sizeof(rows)/sizeof(rows[0]); i++) {
int64_t s = ray_sym_intern(rows[i].name, rows[i].nlen);
keys = ray_vec_append(keys, &s);
ray_t* v = make_i64(rows[i].v);
vals = ray_list_append(vals, v); ray_release(v);
}
return ray_dict_new(keys, vals);
}
ray_t* ray_sysinfo_fn(ray_t** args, int64_t n) {
(void)args; (void)n;
ray_t* keys = ray_sym_vec_new(RAY_SYM_W64, 3);
if (RAY_IS_ERR(keys)) return keys;
ray_t* vals = ray_list_new(3);
if (RAY_IS_ERR(vals)) { ray_release(keys); return vals; }
#if !defined(RAY_OS_WINDOWS)
int64_t s1 = ray_sym_intern("cores", 5);
keys = ray_vec_append(keys, &s1);
ray_t* v1 = make_i64(sysconf(_SC_NPROCESSORS_ONLN));
vals = ray_list_append(vals, v1); ray_release(v1);
int64_t s2 = ray_sym_intern("page-size", 9);
keys = ray_vec_append(keys, &s2);
ray_t* v2 = make_i64(sysconf(_SC_PAGESIZE));
vals = ray_list_append(vals, v2); ray_release(v2);
long pages = sysconf(_SC_PHYS_PAGES);
long psize = sysconf(_SC_PAGESIZE);
int64_t s3 = ray_sym_intern("total-mem", 9);
keys = ray_vec_append(keys, &s3);
ray_t* v3 = make_i64((int64_t)pages * (int64_t)psize);
vals = ray_list_append(vals, v3); ray_release(v3);
#else
int64_t s1 = ray_sym_intern("cores", 5);
keys = ray_vec_append(keys, &s1);
ray_t* v1 = make_i64(1);
vals = ray_list_append(vals, v1); ray_release(v1);
#endif
return ray_dict_new(keys, vals);
}
ray_t* ray_hopen_fn(ray_t* x) {
if (!ray_is_atom(x) || x->type != -RAY_STR)
return ray_error("type", NULL);
const char* s = ray_str_ptr(x);
size_t slen = ray_str_len(x);
const char* parts[4] = {0};
size_t part_lens[4] = {0};
int n_parts = 0;
const char* start = s;
for (size_t i = 0; i <= slen && n_parts < 4; i++) {
if (i == slen || s[i] == ':') {
parts[n_parts] = start;
part_lens[n_parts] = (size_t)(&s[i] - start);
n_parts++;
start = &s[i + 1];
}
}
if (n_parts < 2) return ray_error("domain", NULL);
char host[256];
if (part_lens[0] >= sizeof(host)) return ray_error("domain", NULL);
memcpy(host, parts[0], part_lens[0]);
host[part_lens[0]] = '\0';
char port_str[8];
if (part_lens[1] >= sizeof(port_str)) return ray_error("domain", NULL);
memcpy(port_str, parts[1], part_lens[1]);
port_str[part_lens[1]] = '\0';
int port = atoi(port_str);
if (port <= 0 || port > 65535) return ray_error("domain", NULL);
char user[128] = "";
char password[128] = "";
if (n_parts >= 4) {
if (part_lens[2] < sizeof(user)) {
memcpy(user, parts[2], part_lens[2]);
user[part_lens[2]] = '\0';
}
if (part_lens[3] < sizeof(password)) {
memcpy(password, parts[3], part_lens[3]);
password[part_lens[3]] = '\0';
}
}
const char* pw_ptr = (n_parts >= 4) ? password : NULL;
const char* us_ptr = (n_parts >= 4) ? user : NULL;
int64_t h = ray_ipc_connect(host, (uint16_t)port, us_ptr, pw_ptr);
if (h == -2) return ray_error("access", "server requires authentication");
if (h == -3) return ray_error("access", "authentication failed");
if (h < 0) return ray_error("io", "connection refused: %s:%d", host, port);
return make_i64(h);
}
ray_t* ray_hclose_fn(ray_t* x) {
if (!ray_is_atom(x) || (x->type != -RAY_I64 && x->type != -RAY_I32))
return ray_error("type", NULL);
int64_t h = (x->type == -RAY_I64) ? x->i64 : x->i32;
ray_ipc_close(h);
return RAY_NULL_OBJ;
}
ray_t* ray_hsend_fn(ray_t* handle, ray_t* msg) {
if (!ray_is_atom(handle) || (handle->type != -RAY_I64 && handle->type != -RAY_I32))
return ray_error("type", NULL);
int64_t h = (handle->type == -RAY_I64) ? handle->i64 : handle->i32;
if (ray_serde_size(msg) <= 0)
return ray_error("type", "message not serializable");
return ray_ipc_send(h, msg);
}