#include "ops/internal.h"
#include "ops/rowsel.h"
#include "mem/sys.h"
/* Process-wide profiling state. In this file exec_node() reads its `active`
 * flag to decide whether heavy operators are wrapped in profile spans. */
ray_profile_t g_ray_profile;
/*
 * Expand a MAPCOMMON column into a flat vector.
 *
 * The column payload holds two vectors: slot 0 carries one key value per
 * partition, slot 1 carries the int64 row count of each partition. The
 * result repeats key p exactly counts[p] times, in partition order, and has
 * the key vector's type.
 *
 * Returns the new vector, or ray_error("oom", NULL) if it cannot be created.
 */
ray_t* materialize_mapcommon(ray_t* mc) {
    ray_t** slots = (ray_t**)ray_data(mc);
    ray_t* keys = slots[0];
    ray_t* row_counts = slots[1];
    int64_t n_parts = keys->len;
    int8_t key_type = keys->type;
    size_t width = (size_t)ray_sym_elem_size(key_type, keys->attrs);
    const char* key_bytes = (const char*)ray_data(keys);
    const int64_t* counts = (const int64_t*)ray_data(row_counts);

    /* The output length is the sum of all partition row counts. */
    int64_t total = 0;
    for (int64_t p = 0; p < n_parts; p++)
        total += counts[p];

    ray_t* flat = ray_vec_new(key_type, total);
    if (!flat || RAY_IS_ERR(flat))
        return ray_error("oom", NULL);
    flat->len = total;

    char* out = (char*)ray_data(flat);
    int64_t written = 0;
    for (int64_t p = 0; p < n_parts; p++) {
        int64_t reps = counts[p];
        switch (width) {
        case 8: {
            /* Load the key once, then splat it with plain stores. */
            uint64_t key;
            memcpy(&key, key_bytes + (size_t)p * 8, 8);
            uint64_t* dst = (uint64_t*)(out + written * 8);
            for (int64_t r = 0; r < reps; r++)
                dst[r] = key;
            break;
        }
        case 4: {
            uint32_t key;
            memcpy(&key, key_bytes + (size_t)p * 4, 4);
            uint32_t* dst = (uint32_t*)(out + written * 4);
            for (int64_t r = 0; r < reps; r++)
                dst[r] = key;
            break;
        }
        default: {
            /* Any other element width: byte-wise copy per row. */
            const char* src = key_bytes + (size_t)p * width;
            for (int64_t r = 0; r < reps; r++)
                memcpy(out + (written + r) * width, src, width);
            break;
        }
        }
        written += reps;
    }
    return flat;
}
/*
 * Materialize only the first `n` rows of a MAPCOMMON column.
 *
 * The column payload holds a key vector (slot 0, one value per partition)
 * and an int64 row-count vector (slot 1). Keys are repeated in partition
 * order until `n` rows have been produced.
 *
 * `n` is clamped to [0, rows available in the partitions]: a request larger
 * than the column yields a shorter vector rather than one whose tail was
 * never written, and a negative request yields an empty vector.
 *
 * Returns the new vector, or ray_error("oom", NULL) if it cannot be created.
 */
ray_t* materialize_mapcommon_head(ray_t* mc, int64_t n) {
    ray_t** mc_ptrs = (ray_t**)ray_data(mc);
    ray_t* kv = mc_ptrs[0];
    ray_t* rc = mc_ptrs[1];
    int64_t n_parts = kv->len;
    int8_t kv_type = kv->type;
    size_t esz = (size_t)ray_sym_elem_size(kv_type, kv->attrs);
    const char* kdata = (const char*)ray_data(kv);
    const int64_t* counts = (const int64_t*)ray_data(rc);

    /* Count how many rows the partitions can actually supply, stopping as
     * soon as the request is known to be satisfiable. */
    int64_t avail = 0;
    for (int64_t p = 0; p < n_parts && avail < n; p++)
        avail += counts[p];
    if (n > avail) n = avail;
    if (n < 0) n = 0;

    ray_t* flat = ray_vec_new(kv_type, n);
    if (!flat || RAY_IS_ERR(flat)) return ray_error("oom", NULL);
    flat->len = n;

    char* out = (char*)ray_data(flat);
    int64_t off = 0;
    for (int64_t p = 0; p < n_parts && off < n; p++) {
        /* Take the whole partition, or just enough to reach n. */
        int64_t take = counts[p];
        if (take > n - off) take = n - off;
        if (esz == 8) {
            uint64_t v;
            memcpy(&v, kdata + (size_t)p * 8, 8);
            uint64_t* dst = (uint64_t*)(out + off * 8);
            for (int64_t r = 0; r < take; r++) dst[r] = v;
        } else if (esz == 4) {
            uint32_t v;
            memcpy(&v, kdata + (size_t)p * 4, 4);
            uint32_t* dst = (uint32_t*)(out + off * 4);
            for (int64_t r = 0; r < take; r++) dst[r] = v;
        } else {
            for (int64_t r = 0; r < take; r++)
                memcpy(out + (off + r) * esz, kdata + (size_t)p * esz, esz);
        }
        off += take;
    }
    return flat;
}
/*
 * Materialize the rows of a MAPCOMMON column selected by a predicate.
 *
 * mc         - MAPCOMMON column: slot 0 is the per-partition key vector,
 *              slot 1 the int64 per-partition row counts.
 * pred       - predicate iterated morsel by morsel; each morsel is read as
 *              one byte per row, non-zero meaning "keep".
 * pass_count - number of passing rows the caller expects; the result vector
 *              is created with exactly this length.
 *
 * Robustness: writes never go past `pass_count` elements even if the
 * predicate contains more set bytes than announced, an empty key vector is
 * never dereferenced, and any slots left unwritten (fewer set bytes than
 * announced) are zero-filled instead of being returned uninitialized.
 *
 * Returns the new vector, or ray_error("oom", NULL) if it cannot be created.
 */
ray_t* materialize_mapcommon_filter(ray_t* mc, ray_t* pred, int64_t pass_count) {
    ray_t** mc_ptrs = (ray_t**)ray_data(mc);
    ray_t* kv = mc_ptrs[0];
    ray_t* rc = mc_ptrs[1];
    int64_t n_parts = kv->len;
    int8_t kv_type = kv->type;
    size_t esz = (size_t)ray_sym_elem_size(kv_type, kv->attrs);
    const char* kdata = (const char*)ray_data(kv);
    const int64_t* counts = (const int64_t*)ray_data(rc);

    ray_t* flat = ray_vec_new(kv_type, pass_count);
    if (!flat || RAY_IS_ERR(flat)) return ray_error("oom", NULL);
    flat->len = pass_count;

    char* out = (char*)ray_data(flat);
    int64_t out_idx = 0;
    int64_t row = 0;
    int64_t part_idx = 0;
    /* End row (exclusive) of the current partition; no partitions => 0. */
    int64_t part_end = (n_parts > 0) ? counts[0] : 0;

    ray_morsel_t mp;
    ray_morsel_init(&mp, pred);
    while (ray_morsel_next(&mp)) {
        const uint8_t* bits = (const uint8_t*)mp.morsel_ptr;
        for (int64_t i = 0; i < mp.morsel_len; i++, row++) {
            /* Advance to the partition containing `row`; rows past the last
             * boundary stay attributed to the final partition. */
            while (part_idx < n_parts - 1 && row >= part_end) {
                part_idx++;
                part_end += counts[part_idx];
            }
            if (!bits[i]) continue;
            /* No key to copy, or output already full: skip the write. */
            if (n_parts <= 0 || out_idx >= pass_count) continue;
            memcpy(out + (size_t)out_idx * esz,
                   kdata + (size_t)part_idx * esz, esz);
            out_idx++;
        }
    }

    /* Fewer passing rows than announced: zero the unused tail. */
    if (out_idx < pass_count)
        memset(out + (size_t)out_idx * esz, 0,
               (size_t)(pass_count - out_idx) * esz);
    return flat;
}
/*
 * Pool task: gather rows [start, end) for several columns at once.
 *
 * raw   - multi_gather_ctx_t with the shared row-index array and, per
 *         column, source pointer, destination pointer and element width.
 * wid   - worker id (unused).
 *
 * For each output row i and each column, dst[i] = src[idx[i]]. Rows are
 * walked in fixed-size batches with the column loop inside the batch, and a
 * read prefetch is issued a fixed distance ahead within the current batch.
 * Indices are used as-is; no null or bounds handling is performed here.
 */
void multi_gather_fn(void* raw, uint32_t wid, int64_t start, int64_t end) {
    (void)wid;
    enum { BATCH_ROWS = 512, PREFETCH_AHEAD = 32 };

    multi_gather_ctx_t* ctx = (multi_gather_ctx_t*)raw;
    const int64_t* restrict idx = ctx->idx;
    const int64_t n_cols = ctx->ncols;

    for (int64_t lo = start; lo < end; lo += BATCH_ROWS) {
        int64_t hi = lo + BATCH_ROWS;
        if (hi > end)
            hi = end;

        for (int64_t col = 0; col < n_cols; col++) {
            uint8_t width = ctx->esz[col];
            char* src = ctx->srcs[col];
            char* dst = ctx->dsts[col];

            switch (width) {
            case 8: {
                const uint64_t* restrict in = (const uint64_t*)src;
                uint64_t* restrict outp = (uint64_t*)dst;
                for (int64_t i = lo; i < hi; i++) {
                    if (i + PREFETCH_AHEAD < hi)
                        __builtin_prefetch(&in[idx[i + PREFETCH_AHEAD]], 0, 0);
                    outp[i] = in[idx[i]];
                }
                break;
            }
            case 4: {
                const uint32_t* restrict in = (const uint32_t*)src;
                uint32_t* restrict outp = (uint32_t*)dst;
                for (int64_t i = lo; i < hi; i++) {
                    if (i + PREFETCH_AHEAD < hi)
                        __builtin_prefetch(&in[idx[i + PREFETCH_AHEAD]], 0, 0);
                    outp[i] = in[idx[i]];
                }
                break;
            }
            default:
                /* Arbitrary width: copy each element byte-wise. */
                for (int64_t i = lo; i < hi; i++) {
                    if (i + PREFETCH_AHEAD < hi)
                        __builtin_prefetch(src + idx[i + PREFETCH_AHEAD] * width, 0, 0);
                    memcpy(dst + i * width, src + idx[i] * width, width);
                }
                break;
            }
        }
    }
}
/*
 * Pool task: gather rows [start, end) of a single column.
 *
 * raw - gather_ctx_t holding the source and destination vectors, the
 *       element width and the row-index array.
 * wid - worker id (unused).
 *
 * When the context is flagged nullable, a negative index writes a zeroed
 * element instead of copying; otherwise every index is used as-is.
 * A read prefetch is issued a fixed number of rows ahead.
 */
void gather_fn(void* raw, uint32_t wid, int64_t start, int64_t end) {
    (void)wid;
    enum { PREFETCH_AHEAD = 16 };

    gather_ctx_t* ctx = (gather_ctx_t*)raw;
    char* restrict src = (char*)ray_data(ctx->src_col);
    char* restrict dst = (char*)ray_data(ctx->dst_col);
    uint8_t width = ctx->esz;
    const int64_t* restrict idx = ctx->idx;

    if (!ctx->nullable) {
        for (int64_t i = start; i < end; i++) {
            if (i + PREFETCH_AHEAD < end)
                __builtin_prefetch(src + idx[i + PREFETCH_AHEAD] * width, 0, 0);
            memcpy(dst + i * width, src + idx[i] * width, width);
        }
        return;
    }

    for (int64_t i = start; i < end; i++) {
        if (i + PREFETCH_AHEAD < end) {
            /* Only prefetch real rows; negative means "no source row". */
            int64_t ahead = idx[i + PREFETCH_AHEAD];
            if (ahead >= 0)
                __builtin_prefetch(src + ahead * width, 0, 0);
        }
        int64_t row = idx[i];
        if (row < 0)
            memset(dst + i * width, 0, width);
        else
            memcpy(dst + i * width, src + row * width, width);
    }
}
/* Partitioned gather: source rows are grouped into blocks of
 * 2^PG_BSHIFT rows; PG_MIN is the smallest index count for which
 * partitioned_gather() uses the partitioned path. */
#define PG_BSHIFT 14
#define PG_BSIZE (1 << PG_BSHIFT)
#define PG_MIN (PG_BSIZE * 8)

/* Shared state for the histogram pass. */
typedef struct {
    const int64_t* idx;   /* source row index per output row            */
    int64_t* hist;        /* n_tasks rows of n_parts counters            */
    int64_t n_parts;      /* number of source blocks                     */
    int64_t n;            /* number of entries in idx                    */
    uint32_t n_tasks;     /* number of tasks the index range is split in */
} pg_hist_ctx_t;

/*
 * Histogram task: `start` is the task number (`end` and `wid` are unused).
 * The task owns an equal-sized slice of idx (ceil(n / n_tasks) entries,
 * the last slices possibly short or empty), zeroes its own histogram row
 * and counts how many of its indices fall into each source block.
 */
static void pg_hist_fn(void* arg, uint32_t wid, int64_t start, int64_t end) {
    (void)wid;
    (void)end;
    pg_hist_ctx_t* ctx = (pg_hist_ctx_t*)arg;

    const int64_t task = start;
    const int64_t per_task = (ctx->n + ctx->n_tasks - 1) / ctx->n_tasks;
    const int64_t first = task * per_task;
    int64_t last = first + per_task;
    if (last > ctx->n)
        last = ctx->n;

    /* Always clear the row, even when this task's slice is empty. */
    int64_t* counters = ctx->hist + task * ctx->n_parts;
    memset(counters, 0, (size_t)ctx->n_parts * sizeof(int64_t));

    const int64_t* rows = ctx->idx;
    for (int64_t i = first; i < last; i++)
        counters[rows[i] >> PG_BSHIFT]++;
}
/* Shared state for the routing pass. */
typedef struct {
    const int64_t* idx;   /* source row index per output row                 */
    int32_t* rdest;       /* out: destination row of each routed pair        */
    int32_t* rsrc;        /* out: source row of each routed pair             */
    int64_t* offsets;     /* n_tasks rows of n_parts write cursors (mutated) */
    int64_t n_parts;      /* number of source blocks                         */
    int64_t n;            /* number of entries in idx                        */
    uint32_t n_tasks;     /* number of tasks the index range is split in     */
} pg_route_ctx_t;

/*
 * Routing task: `start` is the task number (`end` and `wid` are unused).
 * Walks the same idx slice as the histogram pass and, for every output row,
 * appends the (destination row, source row) pair at this task's cursor for
 * the source block the row falls in, advancing that cursor.
 */
static void pg_route_fn(void* arg, uint32_t wid, int64_t start, int64_t end) {
    (void)wid;
    (void)end;
    pg_route_ctx_t* ctx = (pg_route_ctx_t*)arg;

    const int64_t task = start;
    const int64_t per_task = (ctx->n + ctx->n_tasks - 1) / ctx->n_tasks;
    const int64_t first = task * per_task;
    int64_t last = first + per_task;
    if (last > ctx->n)
        last = ctx->n;
    if (first >= last)
        return;

    int64_t* cursor = ctx->offsets + task * ctx->n_parts;
    const int64_t* rows = ctx->idx;
    int32_t* dest_rows = ctx->rdest;
    int32_t* src_rows = ctx->rsrc;

    for (int64_t i = first; i < last; i++) {
        const int64_t src_row = rows[i];
        const int64_t slot = cursor[src_row >> PG_BSHIFT]++;
        dest_rows[slot] = (int32_t)i;
        src_rows[slot] = (int32_t)src_row;
    }
}
/* Shared state for the per-block copy pass. */
typedef struct {
    const int32_t* rdest;     /* destination row of each routed pair        */
    const int32_t* rsrc;      /* source row of each routed pair             */
    const int64_t* part_off;  /* pair range of block b: [off[b], off[b+1])  */
    char** srcs;              /* per-column source base pointers            */
    char** dsts;              /* per-column destination base pointers       */
    const uint8_t* esz;       /* per-column element width in bytes          */
    int64_t ncols;            /* number of columns                          */
} pg_block_ctx_t;

/*
 * Copy task: `start` is the block number (`end` and `wid` are unused).
 * For every routed pair of the block and every column, copies
 * dst[rdest] = src[rsrc], using typed loads/stores for widths 8, 4, 2
 * and 1 and memcpy for any other width.
 */
static void pg_block_fn(void* arg, uint32_t wid, int64_t start, int64_t end) {
    (void)wid;
    (void)end;
    pg_block_ctx_t* ctx = (pg_block_ctx_t*)arg;

    const int64_t first = ctx->part_off[start];
    const int64_t n_pairs = ctx->part_off[start + 1] - first;
    if (n_pairs <= 0)
        return;

    const int32_t* to = ctx->rdest + first;
    const int32_t* from = ctx->rsrc + first;

    for (int64_t col = 0; col < ctx->ncols; col++) {
        const uint8_t width = ctx->esz[col];
        const char* src = ctx->srcs[col];
        char* dst = ctx->dsts[col];

        switch (width) {
        case 8: {
            const uint64_t* in = (const uint64_t*)src;
            uint64_t* outp = (uint64_t*)dst;
            for (int64_t k = 0; k < n_pairs; k++)
                outp[to[k]] = in[from[k]];
            break;
        }
        case 4: {
            const uint32_t* in = (const uint32_t*)src;
            uint32_t* outp = (uint32_t*)dst;
            for (int64_t k = 0; k < n_pairs; k++)
                outp[to[k]] = in[from[k]];
            break;
        }
        case 2: {
            const uint16_t* in = (const uint16_t*)src;
            uint16_t* outp = (uint16_t*)dst;
            for (int64_t k = 0; k < n_pairs; k++)
                outp[to[k]] = in[from[k]];
            break;
        }
        case 1:
            for (int64_t k = 0; k < n_pairs; k++)
                dst[to[k]] = src[from[k]];
            break;
        default:
            /* Widen the 32-bit rows before scaling to a byte offset. */
            for (int64_t k = 0; k < n_pairs; k++)
                memcpy(dst + (int64_t)to[k] * width,
                       src + (int64_t)from[k] * width, width);
            break;
        }
    }
}
/*
 * Plain (non-partitioned) multi-column gather used as the fallback path.
 *
 * multi_gather_ctx_t holds at most MGATHER_MAX_COLS columns, so the columns
 * are processed in batches of that size; every column is gathered no matter
 * how many there are. Runs on the pool when one is given, inline otherwise.
 */
static void pg_plain_gather(ray_pool_t* pool, const int64_t* idx, int64_t n,
                            char** srcs, char** dsts, const uint8_t* esz,
                            int64_t ncols) {
    for (int64_t base = 0; base < ncols; base += MGATHER_MAX_COLS) {
        multi_gather_ctx_t mg = { .idx = idx, .ncols = 0 };
        int64_t k = 0;
        for (int64_t c = base; c < ncols && k < MGATHER_MAX_COLS; c++, k++) {
            mg.srcs[k] = srcs[c];
            mg.dsts[k] = dsts[c];
            mg.esz[k] = esz[c];
        }
        mg.ncols = k;
        if (pool) ray_pool_dispatch(pool, multi_gather_fn, &mg, n);
        else multi_gather_fn(&mg, 0, 0, n);
    }
}

/*
 * Gather `n` rows of `ncols` columns: dsts[c][i] = srcs[c][idx[i]].
 *
 * pool     - worker pool, may be NULL (then the gather runs inline).
 * idx      - source row per output row.
 * n        - number of output rows.
 * src_rows - number of rows in the source columns.
 * srcs/dsts/esz - per-column source pointer, destination pointer and
 *            element width in bytes.
 *
 * Large gathers are done in three pool passes: a per-task histogram of
 * indices per source block of PG_BSIZE rows, a routing pass that writes
 * (destination, source) row pairs grouped by source block, and a copy pass
 * that handles one source block per task.
 *
 * The plain gather is used instead when there is no pool, when n is below
 * PG_MIN, when n or src_rows does not fit the 32-bit routing arrays, when
 * the pool reports zero workers (the passes divide by that count), or when
 * scratch memory cannot be allocated.
 */
void partitioned_gather(ray_pool_t* pool, const int64_t* idx, int64_t n,
                        int64_t src_rows, char** srcs, char** dsts,
                        const uint8_t* esz, int64_t ncols) {
    if (!pool || n < PG_MIN || n > INT32_MAX || src_rows > INT32_MAX) {
        pg_plain_gather(pool, idx, n, srcs, dsts, esz, ncols);
        return;
    }

    int64_t n_parts = (src_rows + PG_BSIZE - 1) >> PG_BSHIFT;
    uint32_t nw = ray_pool_total_workers(pool);
    if (nw == 0) {
        pg_plain_gather(pool, idx, n, srcs, dsts, esz, ncols);
        return;
    }

    ray_t *hist_hdr = NULL, *off_hdr = NULL;
    ray_t *rdest_hdr = NULL, *rsrc_hdr = NULL, *poff_hdr = NULL;
    int64_t* hist = (int64_t*)scratch_alloc(&hist_hdr,
        (size_t)nw * (size_t)n_parts * sizeof(int64_t));
    int64_t* offsets = (int64_t*)scratch_alloc(&off_hdr,
        (size_t)nw * (size_t)n_parts * sizeof(int64_t));
    int32_t* rdest = (int32_t*)scratch_alloc(&rdest_hdr,
        (size_t)n * sizeof(int32_t));
    int32_t* rsrc = (int32_t*)scratch_alloc(&rsrc_hdr,
        (size_t)n * sizeof(int32_t));
    int64_t* part_off = (int64_t*)scratch_alloc(&poff_hdr,
        (size_t)(n_parts + 1) * sizeof(int64_t));
    if (!hist || !offsets || !rdest || !rsrc || !part_off) {
        scratch_free(hist_hdr); scratch_free(off_hdr);
        scratch_free(rdest_hdr); scratch_free(rsrc_hdr);
        scratch_free(poff_hdr);
        pg_plain_gather(pool, idx, n, srcs, dsts, esz, ncols);
        return;
    }

    /* Pass 1: per-task histogram of indices per source block. */
    pg_hist_ctx_t hctx = {
        .idx = idx, .hist = hist, .n_parts = n_parts,
        .n = n, .n_tasks = nw,
    };
    ray_pool_dispatch_n(pool, pg_hist_fn, &hctx, nw);

    /* Prefix sum, block-major then task-major: each (task, block) pair gets
     * its own write cursor and each block a contiguous pair range. */
    int64_t running = 0;
    for (int64_t p = 0; p < n_parts; p++) {
        part_off[p] = running;
        for (uint32_t t = 0; t < nw; t++) {
            offsets[t * n_parts + p] = running;
            running += hist[t * n_parts + p];
        }
    }
    part_off[n_parts] = running;

    /* Pass 2: route (destination, source) pairs into their block ranges. */
    pg_route_ctx_t rctx = {
        .idx = idx, .rdest = rdest, .rsrc = rsrc,
        .offsets = offsets, .n_parts = n_parts,
        .n = n, .n_tasks = nw,
    };
    ray_pool_dispatch_n(pool, pg_route_fn, &rctx, nw);

    /* Pass 3: copy, one source block per task. */
    pg_block_ctx_t bctx = {
        .rdest = rdest, .rsrc = rsrc, .part_off = part_off,
        .srcs = srcs, .dsts = dsts, .esz = esz, .ncols = ncols,
    };
    ray_pool_dispatch_n(pool, pg_block_fn, &bctx, (uint32_t)n_parts);

    scratch_free(hist_hdr);
    scratch_free(off_hdr);
    scratch_free(rdest_hdr);
    scratch_free(rsrc_hdr);
    scratch_free(poff_hdr);
}
/* Forward declaration: the opcode dispatcher is defined after exec_node(),
 * which calls it. */
static ray_t* exec_node_inner(ray_graph_t* g, ray_op_t* op);
/*
 * Build a vector of `nrows` copies of a scalar atom.
 *
 * Supported atoms are string, i64, f64, bool and symbol (atom types are the
 * negated vector type codes). For nrows <= 0 an empty vector of the matching
 * type is returned. For non-string atoms, a null atom produces a vector in
 * which every row is marked null.
 *
 * Errors: "domain" for a NULL atom, "type" for an unsupported atom type;
 * a failed vector creation or append result is returned to the caller as-is.
 */
ray_t* broadcast_scalar(ray_t* atom, int64_t nrows) {
    if (!atom)
        return ray_error("domain", NULL);

    /* Map the atom type onto its vector type once, up front. */
    int8_t vec_type;
    switch (atom->type) {
    case -RAY_STR:  vec_type = RAY_STR;  break;
    case -RAY_I64:  vec_type = RAY_I64;  break;
    case -RAY_F64:  vec_type = RAY_F64;  break;
    case -RAY_BOOL: vec_type = RAY_BOOL; break;
    case -RAY_SYM:  vec_type = RAY_SYM;  break;
    default:
        return ray_error("type", NULL);
    }

    if (nrows <= 0)
        return ray_vec_new(vec_type, 0);

    if (vec_type == RAY_STR) {
        const char* text = ray_str_ptr(atom);
        size_t text_len = ray_str_len(atom);
        ray_t* strs = ray_vec_new(RAY_STR, nrows);
        if (!strs || RAY_IS_ERR(strs))
            return strs;
        for (int64_t row = 0; row < nrows; row++) {
            strs = ray_str_vec_append(strs, text, text_len);
            if (RAY_IS_ERR(strs))
                return strs;
        }
        return strs;
    }

    /* Fixed-width atoms: bool occupies one byte, everything else eight. */
    size_t width = (vec_type == RAY_BOOL) ? 1 : 8;
    ray_t* result = ray_vec_new(vec_type, nrows);
    if (!result || RAY_IS_ERR(result))
        return result;

    uint8_t value[8] = {0};
    memcpy(value, &atom->i64, width);
    for (int64_t row = 0; row < nrows; row++) {
        result = ray_vec_append(result, value);
        if (RAY_IS_ERR(result))
            return result;
    }

    if (RAY_ATOM_IS_NULL(atom)) {
        for (int64_t row = 0; row < nrows; row++)
            ray_vec_set_null(result, row, true);
    }
    return result;
}
/* Per-call state shared by all exec_in_worker() tasks; filled by exec_in(). */
typedef struct {
ray_t* col;            /* left operand: vector or atom being tested          */
const double* svf;     /* set values as doubles; read when use_double        */
const int64_t* svi;    /* set values as int64; read when !use_double         */
int64_t sv_len;        /* number of set values (exec_in skips null members)  */
uint8_t* ob;           /* output: one byte per row                           */
int8_t ct;             /* column element type used to decode its data        */
bool col_has_nulls;    /* vector column carries the HAS_NULLS attribute      */
bool col_atom_null;    /* column is an atom and that atom is null            */
bool col_is_atom;      /* column is an atom rather than a vector             */
bool use_double;       /* compare as doubles instead of int64                */
bool negate;           /* true for OP_NOT_IN: invert the membership result   */
} in_worker_ctx_t;
/*
 * Pool task for IN / NOT IN: fills ob[start, end) with 0/1.
 *
 * For each row the column value is decoded (as double when use_double is
 * set, otherwise as int64) and searched linearly in the set values; the
 * result is the hit flag, inverted when `negate` is set. Rows whose column
 * value is null always produce 0, for both IN and NOT IN.
 *
 * vctx      - in_worker_ctx_t shared by all tasks.
 * worker_id - unused.
 */
static void exec_in_worker(void* vctx, uint32_t worker_id,
                           int64_t start, int64_t end) {
    (void)worker_id;
    in_worker_ctx_t* w = (in_worker_ctx_t*)vctx;
    ray_t* col = w->col;
    const bool is_atom = w->col_is_atom;
    /* cd, ct and cattrs are referenced by the IN_READ_* macros below. */
    const void* cd = is_atom ? NULL : ray_data(col);
    int8_t ct = w->ct;
    uint8_t cattrs = is_atom ? 0 : col->attrs;
    uint8_t* ob = w->ob;
    const int64_t n_set = w->sv_len;
    const int flip = w->negate ? 1 : 0;
    const bool always_null = w->col_atom_null;
    const bool probe_nulls = w->col_has_nulls && !is_atom;

#define IN_READ_I64(dst, idx) do { \
    switch (ct) { \
    case RAY_BOOL: case RAY_U8: (dst) = ((const uint8_t*)cd)[idx]; break; \
    case RAY_I16: (dst) = ((const int16_t*)cd)[idx]; break; \
    case RAY_I32: case RAY_DATE: case RAY_TIME: \
        (dst) = ((const int32_t*)cd)[idx]; break; \
    case RAY_I64: case RAY_TIMESTAMP: \
        (dst) = ((const int64_t*)cd)[idx]; break; \
    case RAY_SYM: (dst) = ray_read_sym(cd, (idx), ct, cattrs); break; \
    default: (dst) = 0; break; \
    } \
} while (0)
#define IN_READ_F64(dst, idx) do { \
    switch (ct) { \
    case RAY_BOOL: case RAY_U8: (dst) = (double)((const uint8_t*)cd)[idx]; break; \
    case RAY_I16: (dst) = (double)((const int16_t*)cd)[idx]; break; \
    case RAY_I32: case RAY_DATE: case RAY_TIME: \
        (dst) = (double)((const int32_t*)cd)[idx]; break; \
    case RAY_I64: case RAY_TIMESTAMP: \
        (dst) = (double)((const int64_t*)cd)[idx]; break; \
    case RAY_F32: (dst) = (double)((const float*)cd)[idx]; break; \
    case RAY_F64: (dst) = ((const double*)cd)[idx]; break; \
    default: (dst) = 0.0; break; \
    } \
} while (0)

    if (w->use_double) {
        const double* members = w->svf;
        for (int64_t i = start; i < end; i++) {
            if (always_null || (probe_nulls && ray_vec_is_null(col, i))) {
                ob[i] = 0;
                continue;
            }
            double value;
            if (is_atom) {
                value = (ct == RAY_F64) ? col->f64 : (double)col->i64;
            } else {
                IN_READ_F64(value, i);
            }
            int hit = 0;
            for (int64_t j = 0; j < n_set && !hit; j++)
                hit = (value == members[j]);
            ob[i] = (uint8_t)(hit ^ flip);
        }
    } else {
        const int64_t* members = w->svi;
        for (int64_t i = start; i < end; i++) {
            if (always_null || (probe_nulls && ray_vec_is_null(col, i))) {
                ob[i] = 0;
                continue;
            }
            int64_t value;
            if (is_atom) {
                value = col->i64;
            } else {
                IN_READ_I64(value, i);
            }
            int hit = 0;
            for (int64_t j = 0; j < n_set && !hit; j++)
                hit = (value == members[j]);
            ob[i] = (uint8_t)(hit ^ flip);
        }
    }
#undef IN_READ_I64
#undef IN_READ_F64
}
/*
 * Evaluate OP_IN / OP_NOT_IN: for every row of `col`, test whether its value
 * occurs in `set`.
 *
 * g   - graph (unused here).
 * op  - the operator node; opcode OP_NOT_IN inverts the result.
 * col - left operand, vector or atom (an atom is treated as one row).
 * set - right operand, vector or atom (an atom is a one-element set).
 *
 * Returns a RAY_BOOL vector with one byte per column row, a "nyi" error when
 * either side is RAY_STR, an "oom" error when the set buffer cannot be
 * allocated, or whatever ray_vec_new() returned if creating the output fails.
 * The inputs are not released here.
 */
static ray_t* exec_in(ray_graph_t* g, ray_op_t* op, ray_t* col, ray_t* set) {
(void)g;
bool negate = (op->opcode == OP_NOT_IN);
int64_t col_len = ray_is_atom(col) ? 1 : col->len;
int64_t set_len = ray_is_atom(set) ? 1 : set->len;
/* Empty column: empty result, nothing to compare. */
if (col_len == 0) {
ray_t* out = ray_vec_new(RAY_BOOL, 0);
if (!out || RAY_IS_ERR(out)) return out;
out->len = 0;
return out;
}
/* Atom type codes are negated vector type codes; normalize both sides to
 * the vector code, and parted types to their base type. */
int8_t ct = ray_is_atom(col) ? (int8_t)(-col->type) : col->type;
int8_t st = ray_is_atom(set) ? (int8_t)(-set->type) : set->type;
if (RAY_IS_PARTED(ct)) ct = (int8_t)RAY_PARTED_BASETYPE(ct);
if (RAY_IS_PARTED(st)) st = (int8_t)RAY_PARTED_BASETYPE(st);
if (ct == RAY_STR || st == RAY_STR)
return ray_error("nyi", "OP_IN on RAY_STR not yet implemented");
/* Type classes: 2 = symbol, 1 = floating point, 0 = everything else. */
#define CLASSIFY(t) \
((t) == RAY_SYM ? 2 : \
((t) == RAY_F32 || (t) == RAY_F64) ? 1 : 0)
int col_class = CLASSIFY(ct);
int set_class = CLASSIFY(st);
/* Exactly one side is a symbol: treat the set as empty, so no row matches
 * (IN yields 0, NOT IN yields 1 for non-null rows). */
if ((col_class == 2) != (set_class == 2)) {
set_len = 0;
}
/* If either side is floating point, compare everything as doubles. */
int use_double = (col_class == 1 || set_class == 1);
ray_t* out = ray_vec_new(RAY_BOOL, col_len);
if (!out || RAY_IS_ERR(out)) return out;
out->len = col_len;
uint8_t* ob = (uint8_t*)ray_data(out);
bool col_has_nulls = !ray_is_atom(col) && (col->attrs & RAY_ATTR_HAS_NULLS);
bool col_atom_null = ray_is_atom(col) && RAY_ATOM_IS_NULL(col);
bool set_has_nulls = !ray_is_atom(set) && (set->attrs & RAY_ATTR_HAS_NULLS);
/* Element readers: decode element `idx` of `vec` (whose element type is
 * `type`) as int64 / double; unhandled types read as 0. */
#define READ_I64(dst, vec, type, idx) do { \
const void* _d = ray_data(vec); \
switch (type) { \
case RAY_BOOL: case RAY_U8: (dst) = ((const uint8_t*)_d)[idx]; break; \
case RAY_I16: (dst) = ((const int16_t*)_d)[idx]; break; \
case RAY_I32: case RAY_DATE: case RAY_TIME: \
(dst) = ((const int32_t*)_d)[idx]; break; \
case RAY_I64: case RAY_TIMESTAMP: \
(dst) = ((const int64_t*)_d)[idx]; break; \
case RAY_SYM: (dst) = ray_read_sym(_d, (idx), (type), \
(vec)->attrs); break; \
default: (dst) = 0; break; \
} \
} while (0)
#define READ_F64(dst, vec, type, idx) do { \
const void* _d = ray_data(vec); \
switch (type) { \
case RAY_BOOL: case RAY_U8: (dst) = (double)((const uint8_t*)_d)[idx]; break; \
case RAY_I16: (dst) = (double)((const int16_t*)_d)[idx]; break; \
case RAY_I32: case RAY_DATE: case RAY_TIME: \
(dst) = (double)((const int32_t*)_d)[idx]; break; \
case RAY_I64: case RAY_TIMESTAMP: \
(dst) = (double)((const int64_t*)_d)[idx]; break; \
case RAY_F32: (dst) = (double)((const float*)_d)[idx]; break; \
case RAY_F64: (dst) = ((const double*)_d)[idx]; break; \
default: (dst) = 0.0; break; \
} \
} while (0)
/* Set values are copied into a flat buffer: the stack arrays hold up to 32
 * values, larger sets use a buffer obtained from ray_alloc(). Only the
 * buffer matching use_double is redirected to that allocation. */
int64_t sv_len = 0;
double svf_stack[32];
int64_t svi_stack[32];
double* svf = svf_stack;
int64_t* svi = svi_stack;
ray_t* sv_hdr = NULL;
if (set_len > 32) {
size_t bytes = (size_t)set_len * (use_double ? sizeof(double) : sizeof(int64_t));
sv_hdr = ray_alloc(bytes);
if (!sv_hdr) { ray_release(out); return ray_error("oom", NULL); }
if (use_double) svf = (double*)ray_data(sv_hdr);
else svi = (int64_t*)ray_data(sv_hdr);
}
/* Fill the buffer, skipping null set members (a null atom gives an empty
 * set), so sv_len may end up smaller than set_len. */
if (use_double) {
if (set_len > 0 && ray_is_atom(set)) {
if (!RAY_ATOM_IS_NULL(set)) {
svf[0] = (st == RAY_F64) ? set->f64 : (double)set->i64;
sv_len = 1;
}
} else if (set_len > 0) {
for (int64_t i = 0; i < set_len; i++) {
if (set_has_nulls && ray_vec_is_null(set, i)) continue;
READ_F64(svf[sv_len], set, st, i);
sv_len++;
}
}
} else {
if (set_len > 0 && ray_is_atom(set)) {
if (!RAY_ATOM_IS_NULL(set)) { svi[0] = set->i64; sv_len = 1; }
} else if (set_len > 0) {
for (int64_t i = 0; i < set_len; i++) {
if (set_has_nulls && ray_vec_is_null(set, i)) continue;
READ_I64(svi[sv_len], set, st, i);
sv_len++;
}
}
}
in_worker_ctx_t in_ctx = {
.col = col,
.svf = svf, .svi = svi, .sv_len = sv_len,
.ob = ob, .ct = ct,
.col_has_nulls = col_has_nulls,
.col_atom_null = col_atom_null,
.col_is_atom = ray_is_atom(col),
.use_double = use_double,
.negate = negate,
};
/* Run on the pool only for vector columns of at least
 * RAY_PARALLEL_THRESHOLD rows; otherwise evaluate inline. */
ray_pool_t* pool = ray_pool_get();
if (pool && col_len >= RAY_PARALLEL_THRESHOLD && !ray_is_atom(col))
ray_pool_dispatch(pool, exec_in_worker, &in_ctx, col_len);
else
exec_in_worker(&in_ctx, 0, 0, col_len);
if (sv_hdr) ray_free(sv_hdr);
#undef READ_I64
#undef READ_F64
#undef CLASSIFY
return out;
}
/*
 * True for the opcodes that exec_node() labels for progress reporting and
 * wraps in profile spans: the listed operators plus every opcode in the
 * inclusive range OP_EXPAND..OP_KNN_RERANK.
 */
static inline bool op_is_heavy(uint16_t opc) {
    if (opc >= OP_EXPAND && opc <= OP_KNN_RERANK)
        return true;

    switch (opc) {
    case OP_FILTER:
    case OP_SORT:
    case OP_GROUP:
    case OP_JOIN:
    case OP_WINDOW_JOIN:
    case OP_SELECT:
    case OP_HEAD:
    case OP_TAIL:
    case OP_WINDOW:
    case OP_PIVOT:
        return true;
    default:
        return false;
    }
}
/*
 * Execute one operator node and return its result.
 *
 * Returns a "nyi" error for a NULL node and a "cancel" error when an
 * interrupt is pending. Heavy operators (see op_is_heavy) publish their
 * opcode name as the progress label and, while profiling is active, are
 * wrapped in a profile span. The actual work is done by exec_node_inner().
 */
ray_t* exec_node(ray_graph_t* g, ray_op_t* op) {
    if (!op)
        return ray_error("nyi", NULL);
    if (ray_interrupted())
        return ray_error("cancel", "interrupted");

    /* Light operators run without any labelling or span overhead. */
    if (!op_is_heavy(op->opcode))
        return exec_node_inner(g, op);

    /* Sample the flag once so span start and end always pair up. */
    const bool profiling = g_ray_profile.active;
    const char* label = ray_opcode_name(op->opcode);
    ray_progress_label(label, NULL);
    if (profiling)
        ray_profile_span_start(label);

    ray_t* result = exec_node_inner(g, op);

    if (profiling)
        ray_profile_span_end(label);
    return result;
}
static ray_t* exec_node_inner(ray_graph_t* g, ray_op_t* op) {
if (!op) return ray_error("nyi", NULL);
switch (op->opcode) {
case OP_SCAN: {
ray_op_ext_t* ext = find_ext(g, op->id);
if (!ext) return ray_error("nyi", NULL);
uint16_t stored_table_id = 0;
memcpy(&stored_table_id, ext->base.pad, sizeof(uint16_t));
ray_t* scan_tbl;
if (stored_table_id > 0 && g->tables && (stored_table_id - 1) < g->n_tables) {
scan_tbl = g->tables[stored_table_id - 1];
} else {
scan_tbl = g->table;
}
if (!scan_tbl) return ray_error("schema", NULL);
ray_t* col = ray_table_get_col(scan_tbl, ext->sym);
if (!col) return ray_error("schema", NULL);
if (col->type == RAY_MAPCOMMON)
return materialize_mapcommon(col);
if (RAY_IS_PARTED(col->type)) {
int8_t base = (int8_t)RAY_PARTED_BASETYPE(col->type);
ray_t** sps = (ray_t**)ray_data(col);
int64_t total = ray_parted_nrows(col);
if (base == RAY_STR)
return parted_flatten_str(sps, col->len, total);
uint8_t sba = (base == RAY_SYM)
? parted_first_attrs(sps, col->len) : 0;
ray_t* flat = typed_vec_new(base, sba, total);
if (!flat || RAY_IS_ERR(flat)) return ray_error("oom", NULL);
flat->len = total;
ray_t** segs = sps;
size_t esz = (size_t)ray_sym_elem_size(base, sba);
int64_t off = 0;
for (int64_t s = 0; s < col->len; s++) {
if (segs[s] && segs[s]->len > 0 &&
parted_seg_esz_ok(segs[s], base, (uint8_t)esz)) {
memcpy((char*)ray_data(flat) + off * esz,
ray_data(segs[s]), (size_t)segs[s]->len * esz);
off += segs[s]->len;
} else if (segs[s] && segs[s]->len > 0) {
memset((char*)ray_data(flat) + off * esz, 0,
(size_t)segs[s]->len * esz);
off += segs[s]->len;
}
}
return flat;
}
ray_retain(col);
return col;
}
case OP_CONST: {
ray_op_ext_t* ext = find_ext(g, op->id);
if (!ext || !ext->literal) return ray_error("nyi", NULL);
ray_retain(ext->literal);
return ext->literal;
}
case OP_IN: case OP_NOT_IN: {
ray_t* col = exec_node(g, op->inputs[0]);
if (!col || RAY_IS_ERR(col)) return col;
ray_t* set = exec_node(g, op->inputs[1]);
if (!set || RAY_IS_ERR(set)) { ray_release(col); return set; }
ray_t* result = exec_in(g, op, col, set);
ray_release(col);
ray_release(set);
return result;
}
case OP_NEG: case OP_ABS: case OP_NOT: case OP_SQRT:
case OP_LOG: case OP_EXP: case OP_CEIL: case OP_FLOOR: case OP_ROUND:
case OP_ISNULL: case OP_CAST:
case OP_ADD: case OP_SUB: case OP_MUL: case OP_DIV: case OP_MOD:
case OP_EQ: case OP_NE: case OP_LT: case OP_LE:
case OP_GT: case OP_GE: case OP_AND: case OP_OR:
case OP_MIN2: case OP_MAX2: {
if (g->table) {
int64_t nr = ray_table_nrows(g->table);
if (nr > 0) {
ray_expr_t ex;
if (expr_compile(g, g->table, op, &ex)) {
ray_t* vec = expr_eval_full(&ex, nr);
if (vec && !RAY_IS_ERR(vec)) return vec;
}
}
}
if (op->arity == 1) {
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
ray_t* result = exec_elementwise_unary(g, op, input);
ray_release(input);
return result;
} else {
ray_t* lhs = exec_node(g, op->inputs[0]);
ray_t* rhs = exec_node(g, op->inputs[1]);
if (!lhs || RAY_IS_ERR(lhs)) { if (rhs && !RAY_IS_ERR(rhs)) ray_release(rhs); return lhs; }
if (!rhs || RAY_IS_ERR(rhs)) { ray_release(lhs); return rhs; }
ray_t* result = exec_elementwise_binary(g, op, lhs, rhs);
ray_release(lhs);
ray_release(rhs);
return result;
}
}
case OP_SUM: case OP_PROD: case OP_MIN: case OP_MAX:
case OP_COUNT: case OP_AVG: case OP_FIRST: case OP_LAST:
case OP_STDDEV: case OP_STDDEV_POP: case OP_VAR: case OP_VAR_POP: {
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
bool own_input = (input != g->table);
if (g->selection && input->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, input, g->selection);
if (own_input) ray_release(input);
ray_release(g->selection);
g->selection = NULL;
input = compacted;
own_input = true;
}
ray_t* result = exec_reduction(g, op, input);
if (own_input) ray_release(input);
return result;
}
case OP_COUNT_DISTINCT: {
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
ray_t* result = exec_count_distinct(g, op, input);
ray_release(input);
return result;
}
case OP_FILTER: {
ray_op_t* filter_child = op->inputs[0];
if (filter_child && filter_child->opcode == OP_GROUP) {
ray_t* group_result = exec_node(g, filter_child);
if (!group_result || RAY_IS_ERR(group_result))
return group_result;
ray_t* saved_table = g->table;
ray_t* saved_sel = g->selection;
g->table = group_result;
g->selection = NULL;
ray_t* pred = exec_node(g, op->inputs[1]);
g->table = saved_table;
g->selection = saved_sel;
if (!pred || RAY_IS_ERR(pred)) {
ray_release(group_result);
return pred;
}
ray_t* result = exec_filter(g, op, group_result, pred);
ray_release(pred);
ray_release(group_result);
return result;
}
ray_t* input = exec_node(g, op->inputs[0]);
ray_t* pred = exec_node(g, op->inputs[1]);
if (!input || RAY_IS_ERR(input)) { if (pred && !RAY_IS_ERR(pred)) ray_release(pred); return input; }
if (!pred || RAY_IS_ERR(pred)) { ray_release(input); return pred; }
if (pred->type == RAY_BOOL && input->type == RAY_TABLE) {
if (g->selection) {
ray_t* merged = ray_rowsel_refine(g->selection, pred);
ray_release(pred);
ray_release(g->selection);
g->selection = merged;
} else {
ray_t* new_sel = ray_rowsel_from_pred(pred);
ray_release(pred);
g->selection = new_sel;
}
return input;
}
ray_t* result = exec_filter(g, op, input, pred);
ray_release(input);
ray_release(pred);
return result;
}
case OP_SORT: {
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
ray_t* tbl = (input->type == RAY_TABLE) ? input : g->table;
if (g->selection && tbl && !RAY_IS_ERR(tbl) && tbl->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, tbl, g->selection);
if (input != g->table) ray_release(input);
ray_release(g->selection);
g->selection = NULL;
input = compacted;
tbl = compacted;
}
ray_t* result = exec_sort(g, op, tbl, 0);
if (input != g->table) ray_release(input);
return result;
}
case OP_GROUP: {
ray_t* tbl = g->table;
ray_t* owned_tbl = NULL;
{
ray_op_ext_t* gext = find_ext(g, op->id);
if (gext && gext->n_keys == 1) {
ray_op_ext_t* kx = find_ext(g, gext->keys[0]->id);
int64_t src_sym = ray_sym_intern("_src", 4);
if (kx && kx->base.opcode == OP_SCAN && kx->sym == src_sym) {
for (uint32_t ei = 0; ei < g->ext_count; ei++) {
ray_op_ext_t* ex = g->ext_nodes[ei];
if (ex && ex->base.id < g->node_count
&& g->nodes[ex->base.id].opcode == OP_EXPAND
&& ex->graph.factorized) {
ray_op_t* expand_op = &g->nodes[ex->base.id];
ray_t* expand_result = exec_node(g, expand_op);
if (!expand_result || RAY_IS_ERR(expand_result))
return expand_result;
if (expand_result->type == RAY_TABLE) {
ray_t* saved = g->table;
g->table = expand_result;
ray_t* result = exec_group(g, op, expand_result, 0);
g->table = saved;
ray_release(expand_result);
return result;
}
ray_release(expand_result);
break;
}
}
}
}
}
ray_t* result = exec_group(g, op, tbl, 0);
if (owned_tbl) ray_release(owned_tbl);
if (g->selection) {
ray_release(g->selection);
g->selection = NULL;
}
return result;
}
case OP_PIVOT: {
ray_t* tbl = g->table;
ray_t* owned_tbl = NULL;
if (g->selection) {
ray_t* compacted = sel_compact(g, tbl, g->selection);
if (!compacted || RAY_IS_ERR(compacted)) return compacted;
ray_release(g->selection);
g->selection = NULL;
owned_tbl = compacted;
tbl = compacted;
}
ray_t* result = exec_pivot(g, op, tbl);
if (owned_tbl) ray_release(owned_tbl);
return result;
}
case OP_JOIN: {
ray_t* left = exec_node(g, op->inputs[0]);
ray_t* right = exec_node(g, op->inputs[1]);
if (!left || RAY_IS_ERR(left)) { if (right && !RAY_IS_ERR(right)) ray_release(right); return left; }
if (!right || RAY_IS_ERR(right)) { ray_release(left); return right; }
if (g->selection && left && !RAY_IS_ERR(left) && left->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, left, g->selection);
ray_release(left);
ray_release(g->selection);
g->selection = NULL;
left = compacted;
}
ray_t* result = exec_join(g, op, left, right);
ray_release(left);
ray_release(right);
return result;
}
case OP_ANTIJOIN: {
ray_t* left = exec_node(g, op->inputs[0]);
ray_t* right = exec_node(g, op->inputs[1]);
if (!left || RAY_IS_ERR(left)) { if (right && !RAY_IS_ERR(right)) ray_release(right); return left; }
if (!right || RAY_IS_ERR(right)) { ray_release(left); return right; }
if (g->selection && left && !RAY_IS_ERR(left) && left->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, left, g->selection);
ray_release(left);
ray_release(g->selection);
g->selection = NULL;
left = compacted;
}
ray_t* result = exec_antijoin(g, op, left, right);
ray_release(left);
ray_release(right);
return result;
}
case OP_WINDOW_JOIN: {
ray_t* left = exec_node(g, op->inputs[0]);
ray_t* right = exec_node(g, op->inputs[1]);
if (!left || RAY_IS_ERR(left)) { if (right && !RAY_IS_ERR(right)) ray_release(right); return left; }
if (!right || RAY_IS_ERR(right)) { ray_release(left); return right; }
if (g->selection && left && !RAY_IS_ERR(left) && left->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, left, g->selection);
ray_release(left);
ray_release(g->selection);
g->selection = NULL;
left = compacted;
}
ray_t* result = exec_window_join(g, op, left, right);
ray_release(left);
ray_release(right);
return result;
}
case OP_WINDOW: {
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
ray_t* wdf = (input->type == RAY_TABLE) ? input : g->table;
if (g->selection && wdf && !RAY_IS_ERR(wdf) && wdf->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, wdf, g->selection);
if (input != g->table) ray_release(input);
ray_release(g->selection);
g->selection = NULL;
input = compacted;
wdf = compacted;
}
ray_t* result = exec_window(g, op, wdf);
if (input != g->table) ray_release(input);
return result;
}
case OP_HEAD: {
ray_op_ext_t* ext = find_ext(g, op->id);
int64_t n = ext ? ext->sym : 10;
ray_op_t* child_op = op->inputs[0];
if (child_op && child_op->opcode == OP_SORT) {
ray_t* sort_input = exec_node(g, child_op->inputs[0]);
if (!sort_input || RAY_IS_ERR(sort_input)) return sort_input;
ray_t* tbl = (sort_input->type == RAY_TABLE) ? sort_input : g->table;
if (g->selection && tbl && !RAY_IS_ERR(tbl) && tbl->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, tbl, g->selection);
if (sort_input != g->table) ray_release(sort_input);
ray_release(g->selection);
g->selection = NULL;
sort_input = compacted;
tbl = compacted;
}
ray_t* result = exec_sort(g, child_op, tbl, n);
if (sort_input != g->table) ray_release(sort_input);
return result;
}
ray_t* input;
if (child_op && child_op->opcode == OP_GROUP) {
ray_t* tbl = g->table;
if (!tbl || RAY_IS_ERR(tbl)) return tbl;
ray_t* owned_tbl = NULL;
if (g->selection && tbl->type == RAY_TABLE) {
int needs = 0;
int64_t nc = ray_table_ncols(tbl);
for (int64_t c = 0; c < nc; c++) {
ray_t* col = ray_table_get_col_idx(tbl, c);
if (col && !RAY_IS_PARTED(col->type)
&& col->type != RAY_MAPCOMMON) {
needs = 1; break;
}
}
if (needs) {
ray_t* compacted = sel_compact(g, tbl, g->selection);
if (!compacted || RAY_IS_ERR(compacted)) return compacted;
ray_release(g->selection);
g->selection = NULL;
owned_tbl = compacted;
tbl = compacted;
}
}
input = exec_group(g, child_op, tbl, n);
if (owned_tbl) ray_release(owned_tbl);
} else if (child_op && child_op->opcode == OP_FILTER) {
ray_t* filter_input = exec_node(g, child_op->inputs[0]);
if (!filter_input || RAY_IS_ERR(filter_input))
return filter_input;
ray_t* ftbl = (filter_input->type == RAY_TABLE)
? filter_input : g->table;
if (g->selection && ftbl && ftbl->type == RAY_TABLE) {
ray_t* compacted = sel_compact(g, ftbl, g->selection);
if (filter_input != g->table) ray_release(filter_input);
ray_release(g->selection);
g->selection = NULL;
filter_input = compacted;
ftbl = compacted;
}
ray_t* saved_table = g->table;
g->table = ftbl;
ray_t* pred = exec_node(g, child_op->inputs[1]);
g->table = saved_table;
if (!pred || RAY_IS_ERR(pred)) {
if (filter_input != saved_table)
ray_release(filter_input);
return pred;
}
ray_t* result = exec_filter_head(ftbl, pred, n);
ray_release(pred);
if (filter_input != saved_table)
ray_release(filter_input);
return result;
} else {
input = exec_node(g, op->inputs[0]);
}
if (!input || RAY_IS_ERR(input)) return input;
if (input->type == RAY_TABLE) {
int64_t ncols = ray_table_ncols(input);
int64_t nrows = ray_table_nrows(input);
if (n > nrows) n = nrows;
ray_t* result = ray_table_new(ncols);
for (int64_t c = 0; c < ncols; c++) {
ray_t* col = ray_table_get_col_idx(input, c);
int64_t name_id = ray_table_col_name(input, c);
if (!col) continue;
if (col->type == RAY_MAPCOMMON) {
ray_t* mc_head = materialize_mapcommon_head(col, n);
if (mc_head && !RAY_IS_ERR(mc_head)) {
result = ray_table_add_col(result, name_id, mc_head);
ray_release(mc_head);
}
continue;
}
if (RAY_IS_PARTED(col->type)) {
int8_t base = (int8_t)RAY_PARTED_BASETYPE(col->type);
ray_t** sp = (ray_t**)ray_data(col);
ray_t* head_vec;
if (base == RAY_STR) {
head_vec = parted_head_str(sp, col->len, n);
} else {
uint8_t ba = (base == RAY_SYM)
? parted_first_attrs(sp, col->len) : 0;
uint8_t esz = ray_sym_elem_size(base, ba);
head_vec = typed_vec_new(base, ba, n);
if (head_vec && !RAY_IS_ERR(head_vec)) {
head_vec->len = n;
ray_t** segs = (ray_t**)ray_data(col);
int64_t remaining = n;
int64_t dst_off = 0;
for (int64_t s = 0; s < col->len && remaining > 0; s++) {
if (!segs[s]) continue;
int64_t take = segs[s]->len;
if (take > remaining) take = remaining;
if (parted_seg_esz_ok(segs[s], base, esz)) {
memcpy((char*)ray_data(head_vec) + dst_off * esz,
ray_data(segs[s]), (size_t)take * esz);
} else {
memset((char*)ray_data(head_vec) + dst_off * esz,
0, (size_t)take * esz);
}
dst_off += take;
remaining -= take;
}
}
}
result = ray_table_add_col(result, name_id, head_vec);
ray_release(head_vec);
} else {
uint8_t esz = col_esz(col);
ray_t* head_vec = col_vec_new(col, n);
if (head_vec && !RAY_IS_ERR(head_vec)) {
head_vec->len = n;
memcpy(ray_data(head_vec), ray_data(col),
(size_t)n * esz);
col_propagate_nulls_range(head_vec, 0, col, 0, n);
}
result = ray_table_add_col(result, name_id, head_vec);
ray_release(head_vec);
}
}
ray_release(input);
return result;
}
if (n > input->len) n = input->len;
uint8_t esz = col_esz(input);
ray_t* result = col_vec_new(input, n);
if (result && !RAY_IS_ERR(result)) {
result->len = n;
memcpy(ray_data(result), ray_data(input), (size_t)n * esz);
col_propagate_nulls_range(result, 0, input, 0, n);
}
ray_release(input);
return result;
}
case OP_TAIL: {
ray_op_ext_t* ext = find_ext(g, op->id);
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
int64_t n = ext ? ext->sym : 10;
if (input->type == RAY_TABLE) {
int64_t ncols = ray_table_ncols(input);
int64_t nrows = ray_table_nrows(input);
if (n > nrows) n = nrows;
int64_t skip = nrows - n;
ray_t* result = ray_table_new(ncols);
for (int64_t c = 0; c < ncols; c++) {
ray_t* col = ray_table_get_col_idx(input, c);
int64_t name_id = ray_table_col_name(input, c);
if (!col) continue;
if (col->type == RAY_MAPCOMMON) {
ray_t** mc_ptrs = (ray_t**)ray_data(col);
ray_t* kv = mc_ptrs[0];
ray_t* rc = mc_ptrs[1];
int64_t n_parts = kv->len;
size_t esz = (size_t)col_esz(kv);
const char* kdata = (const char*)ray_data(kv);
const int64_t* counts = (const int64_t*)ray_data(rc);
ray_t* flat = col_vec_new(kv, n);
if (flat && !RAY_IS_ERR(flat)) {
flat->len = n;
char* out = (char*)ray_data(flat);
int64_t remaining = n;
int64_t dst = n;
for (int64_t p = n_parts - 1; p >= 0 && remaining > 0; p--) {
int64_t take = counts[p];
if (take > remaining) take = remaining;
dst -= take;
for (int64_t r = 0; r < take; r++)
memcpy(out + (dst + r) * esz, kdata + (size_t)p * esz, esz);
remaining -= take;
}
}
result = ray_table_add_col(result, name_id, flat);
ray_release(flat);
continue;
}
if (RAY_IS_PARTED(col->type)) {
int8_t base = (int8_t)RAY_PARTED_BASETYPE(col->type);
ray_t** tsp = (ray_t**)ray_data(col);
ray_t* tail_vec;
if (base == RAY_STR) {
tail_vec = parted_tail_str(tsp, col->len, n);
} else {
uint8_t tba = (base == RAY_SYM)
? parted_first_attrs(tsp, col->len) : 0;
uint8_t esz = ray_sym_elem_size(base, tba);
tail_vec = typed_vec_new(base, tba, n);
if (tail_vec && !RAY_IS_ERR(tail_vec)) {
tail_vec->len = n;
ray_t** segs = (ray_t**)ray_data(col);
int64_t remaining = n;
int64_t dst = n;
for (int64_t s = col->len - 1; s >= 0 && remaining > 0; s--) {
if (!segs[s]) continue;
int64_t take = segs[s]->len;
if (take > remaining) take = remaining;
dst -= take;
if (parted_seg_esz_ok(segs[s], base, esz)) {
memcpy((char*)ray_data(tail_vec) + (size_t)dst * esz,
(char*)ray_data(segs[s]) + (size_t)(segs[s]->len - take) * esz,
(size_t)take * esz);
} else {
memset((char*)ray_data(tail_vec) + (size_t)dst * esz,
0, (size_t)take * esz);
}
remaining -= take;
}
}
}
result = ray_table_add_col(result, name_id, tail_vec);
ray_release(tail_vec);
} else {
uint8_t esz = col_esz(col);
ray_t* tail_vec = col_vec_new(col, n);
if (tail_vec && !RAY_IS_ERR(tail_vec)) {
tail_vec->len = n;
memcpy(ray_data(tail_vec),
(char*)ray_data(col) + (size_t)skip * esz,
(size_t)n * esz);
col_propagate_nulls_range(tail_vec, 0, col, skip, n);
}
result = ray_table_add_col(result, name_id, tail_vec);
ray_release(tail_vec);
}
}
ray_release(input);
return result;
}
if (n > input->len) n = input->len;
int64_t skip = input->len - n;
uint8_t esz = col_esz(input);
ray_t* result = col_vec_new(input, n);
if (result && !RAY_IS_ERR(result)) {
result->len = n;
memcpy(ray_data(result),
(char*)ray_data(input) + (size_t)skip * esz,
(size_t)n * esz);
col_propagate_nulls_range(result, 0, input, skip, n);
}
ray_release(input);
return result;
}
case OP_IF: {
return exec_if(g, op);
}
case OP_LIKE: {
return exec_like(g, op);
}
case OP_ILIKE: {
return exec_ilike(g, op);
}
case OP_UPPER: case OP_LOWER: case OP_TRIM: {
return exec_string_unary(g, op);
}
case OP_STRLEN: {
return exec_strlen(g, op);
}
case OP_SUBSTR: {
return exec_substr(g, op);
}
case OP_REPLACE: {
return exec_replace(g, op);
}
case OP_CONCAT: {
return exec_concat(g, op);
}
case OP_EXTRACT: {
return exec_extract(g, op);
}
case OP_DATE_TRUNC: {
return exec_date_trunc(g, op);
}
case OP_ALIAS: {
return exec_node(g, op->inputs[0]);
}
case OP_MATERIALIZE: {
return exec_node(g, op->inputs[0]);
}
case OP_SELECT: {
ray_t* input = exec_node(g, op->inputs[0]);
if (!input || RAY_IS_ERR(input)) return input;
if (input->type != RAY_TABLE) {
ray_release(input);
return ray_error("nyi", NULL);
}
ray_op_ext_t* ext = find_ext(g, op->id);
if (!ext) { ray_release(input); return ray_error("nyi", NULL); }
uint8_t n_cols = ext->sort.n_cols;
ray_op_t** columns = ext->sort.columns;
ray_t* result = ray_table_new(n_cols);
ray_t* saved_table = g->table;
g->table = input;
for (uint8_t c = 0; c < n_cols; c++) {
if (columns[c]->opcode == OP_SCAN) {
ray_op_ext_t* col_ext = find_ext(g, columns[c]->id);
if (!col_ext) continue;
int64_t name_id = col_ext->sym;
ray_t* src_col = ray_table_get_col(input, name_id);
if (src_col) {
ray_retain(src_col);
result = ray_table_add_col(result, name_id, src_col);
ray_release(src_col);
}
} else {
ray_t* vec = exec_node(g, columns[c]);
if (!vec || RAY_IS_ERR(vec)) {
ray_release(result);
g->table = saved_table;
ray_release(input);
return vec ? vec : ray_error("nyi", NULL);
}
if (vec->type < 0) {
int64_t nr = ray_table_nrows(input);
ray_t* col = broadcast_scalar(vec, nr);
ray_release(vec);
vec = col;
if (!vec || RAY_IS_ERR(vec)) {
ray_release(result);
g->table = saved_table;
ray_release(input);
return vec ? vec : ray_error("nyi", NULL);
}
}
char name_buf[16];
int n = 0;
name_buf[n++] = '_'; name_buf[n++] = 'e';
if (c >= 100) name_buf[n++] = '0' + (c / 100);
if (c >= 10) name_buf[n++] = '0' + ((c / 10) % 10);
name_buf[n++] = '0' + (c % 10);
int64_t name_id = ray_sym_intern(name_buf, (size_t)n);
result = ray_table_add_col(result, name_id, vec);
ray_release(vec);
}
}
g->table = saved_table;
ray_release(input);
return result;
}
case OP_EXPAND: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* result = exec_expand(g, op, src);
ray_release(src);
return result;
}
case OP_VAR_EXPAND: {
ray_t* start = exec_node(g, op->inputs[0]);
if (!start || RAY_IS_ERR(start)) return start;
ray_t* result = exec_var_expand(g, op, start);
ray_release(start);
return result;
}
case OP_SHORTEST_PATH: {
ray_t* src = exec_node(g, op->inputs[0]);
ray_t* dst = exec_node(g, op->inputs[1]);
if (!src || RAY_IS_ERR(src)) {
if (dst && !RAY_IS_ERR(dst)) ray_release(dst);
return src;
}
if (!dst || RAY_IS_ERR(dst)) { ray_release(src); return dst; }
ray_t* result = exec_shortest_path(g, op, src, dst);
ray_release(src);
ray_release(dst);
return result;
}
case OP_WCO_JOIN: {
return exec_wco_join(g, op);
}
case OP_PAGERANK: {
return exec_pagerank(g, op);
}
case OP_CONNECTED_COMP: {
return exec_connected_comp(g, op);
}
case OP_DIJKSTRA: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* dst = op->inputs[1] ? exec_node(g, op->inputs[1]) : NULL;
if (dst && RAY_IS_ERR(dst)) { ray_release(src); return dst; }
ray_t* result = exec_dijkstra(g, op, src, dst);
ray_release(src);
if (dst) ray_release(dst);
return result;
}
case OP_LOUVAIN: {
return exec_louvain(g, op);
}
case OP_DEGREE_CENT: {
return exec_degree_cent(g, op);
}
case OP_TOPSORT: {
return exec_topsort(g, op);
}
case OP_DFS: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* result = exec_dfs(g, op, src);
ray_release(src);
return result;
}
case OP_CLUSTER_COEFF: {
return exec_cluster_coeff(g, op);
}
case OP_BETWEENNESS: {
return exec_betweenness(g, op);
}
case OP_CLOSENESS: {
return exec_closeness(g, op);
}
case OP_MST: {
return exec_mst(g, op);
}
case OP_RANDOM_WALK: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* result = exec_random_walk(g, op, src);
ray_release(src);
return result;
}
case OP_ASTAR: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* dst = exec_node(g, op->inputs[1]);
if (!dst || RAY_IS_ERR(dst)) { ray_release(src); return dst; }
ray_t* result = exec_astar(g, op, src, dst);
ray_release(src); ray_release(dst);
return result;
}
case OP_K_SHORTEST: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* dst = exec_node(g, op->inputs[1]);
if (!dst || RAY_IS_ERR(dst)) { ray_release(src); return dst; }
ray_t* result = exec_k_shortest(g, op, src, dst);
ray_release(src); ray_release(dst);
return result;
}
case OP_ANN_RERANK: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* result = exec_ann_rerank(g, op, src);
ray_release(src);
return result;
}
case OP_KNN_RERANK: {
ray_t* src = exec_node(g, op->inputs[0]);
if (!src || RAY_IS_ERR(src)) return src;
ray_t* result = exec_knn_rerank(g, op, src);
ray_release(src);
return result;
}
default:
return ray_error("nyi", NULL);
}
}
/*
 * Combine the running result `accum` with one more `partial` result.
 *
 * Neither input is consumed; the caller still releases both and owns the
 * returned reference:
 *   - accum NULL or error   -> `partial` is returned as-is (retained when it
 *                              is a valid object, passed through otherwise)
 *   - partial NULL or error -> `accum`, retained
 *   - both are tables       -> a new table whose columns are the index-wise
 *                              concatenation of the inputs' columns, named
 *                              after accum's columns; "schema" error when a
 *                              column is missing on either side
 *   - both are non-tables   -> ray_vec_concat(accum, partial)
 *   - one table, one not    -> "type" error
 */
static ray_t* ray_result_merge(ray_t* accum, ray_t* partial) {
    /* Nothing usable accumulated yet: the partial becomes the result. */
    if (!accum || RAY_IS_ERR(accum)) {
        if (partial && !RAY_IS_ERR(partial)) ray_retain(partial);
        return partial;
    }
    /* Nothing usable to append: keep the accumulator unchanged. */
    if (!partial || RAY_IS_ERR(partial)) {
        ray_retain(accum);
        return accum;
    }

    if (accum->type == RAY_TABLE && partial->type == RAY_TABLE) {
        int64_t ncols = ray_table_ncols(accum);
        ray_t* merged = ray_table_new(ncols);
        /* Table allocation can fail; report that instead of adding columns
         * to (and later releasing) a NULL/error value. */
        if (!merged || RAY_IS_ERR(merged)) return merged;

        for (int64_t c = 0; c < ncols; c++) {
            int64_t name_id = ray_table_col_name(accum, c);
            ray_t* a_col = ray_table_get_col_idx(accum, c);
            ray_t* p_col = ray_table_get_col_idx(partial, c);
            if (!a_col || !p_col) {
                ray_release(merged);
                return ray_error("schema", NULL);
            }
            ray_t* combined = ray_vec_concat(a_col, p_col);
            if (!combined || RAY_IS_ERR(combined)) {
                ray_release(merged);
                return combined;
            }
            merged = ray_table_add_col(merged, name_id, combined);
            /* Drop the local reference to the concatenated column. */
            ray_release(combined);
        }
        return merged;
    }

    if (accum->type != RAY_TABLE && partial->type != RAY_TABLE) {
        return ray_vec_concat(accum, partial);
    }

    /* Mixing a table with a non-table result is not supported. */
    return ray_error("type", NULL);
}
/*
 * Build a table that contains only segment `seg_idx` of `parted_tbl`.
 *
 * Column handling:
 *   - parted column    -> the segment vector itself is added to the new table
 *   - RAY_MAPCOMMON    -> element `seg_idx` of its first child vector is
 *                         repeated once per row of the segment
 *   - NULL column slot -> skipped
 *   - anything else    -> "schema" error
 *
 * The row count is the length of segment `seg_idx` in the first parted
 * column found (0 when there is none or that segment is absent).
 *
 * Returns the new table, or a NULL/error value on failure.
 */
static ray_t* build_segment_table(ray_t* parted_tbl, int32_t seg_idx) {
    const int64_t n_columns = ray_table_ncols(parted_tbl);
    ray_t* out_tbl = ray_table_new(n_columns);
    if (!out_tbl || RAY_IS_ERR(out_tbl)) return out_tbl;

    /* Determine how many rows this segment has. */
    int64_t n_rows = 0;
    for (int64_t i = 0; i < n_columns; i++) {
        ray_t* probe = ray_table_get_col_idx(parted_tbl, i);
        if (!probe || !RAY_IS_PARTED(probe->type)) continue;
        ray_t** parts = (ray_t**)ray_data(probe);
        if (seg_idx < probe->len && parts[seg_idx])
            n_rows = parts[seg_idx]->len;
        break; /* only the first parted column is consulted */
    }

    for (int64_t i = 0; i < n_columns; i++) {
        int64_t name_id = ray_table_col_name(parted_tbl, i);
        ray_t* col = ray_table_get_col_idx(parted_tbl, i);
        if (!col) continue;

        if (col->type == RAY_MAPCOMMON) {
            if (col->len < 2) goto fail_schema;
            ray_t* keys = ((ray_t**)ray_data(col))[0];
            if (!keys || seg_idx >= keys->len) goto fail_schema;

            int8_t key_type = keys->type;
            size_t esz = (size_t)ray_sym_elem_size(key_type, keys->attrs);
            if (esz == 0) goto fail_type;

            ray_t* filled = ray_vec_new(key_type, n_rows);
            if (!filled || RAY_IS_ERR(filled)) goto fail_oom;
            filled->len = n_rows;

            /* Broadcast this segment's single key value over every row. */
            const char* key_src = (const char*)ray_data(keys) + (size_t)seg_idx * esz;
            char* fill_dst = (char*)ray_data(filled);
            switch (esz) {
            case 8: {
                uint64_t v;
                memcpy(&v, key_src, 8);
                for (int64_t r = 0; r < n_rows; r++)
                    ((uint64_t*)fill_dst)[r] = v;
                break;
            }
            case 4: {
                uint32_t v;
                memcpy(&v, key_src, 4);
                for (int64_t r = 0; r < n_rows; r++)
                    ((uint32_t*)fill_dst)[r] = v;
                break;
            }
            default:
                for (int64_t r = 0; r < n_rows; r++)
                    memcpy(fill_dst + r * esz, key_src, esz);
                break;
            }

            out_tbl = ray_table_add_col(out_tbl, name_id, filled);
            ray_release(filled);
            continue;
        }

        /* Flat columns cannot be sliced by segment. */
        if (!RAY_IS_PARTED(col->type)) goto fail_schema;

        ray_t** parts = (ray_t**)ray_data(col);
        if (seg_idx >= col->len || !parts[seg_idx]) goto fail_schema;

        /* Hold a reference on the segment while it is being added. */
        ray_t* seg = parts[seg_idx];
        ray_retain(seg);
        out_tbl = ray_table_add_col(out_tbl, name_id, seg);
        ray_release(seg);
    }
    return out_tbl;

fail_schema:
    ray_release(out_tbl);
    return ray_error("schema", NULL);

fail_type:
    ray_release(out_tbl);
    return ray_error("type", NULL);

fail_oom:
    ray_release(out_tbl);
    return ray_error("oom", NULL);
}
/*
 * Whitelist of opcodes that subtree_has_default_scan() accepts inside a DAG
 * that is to be executed segment by segment. Any opcode not listed here
 * makes the caller reject streaming.
 */
static bool op_streamable(uint16_t opc) {
    switch (opc) {
    /* column scan */
    case OP_SCAN:
    /* one-input element-wise operators */
    case OP_NEG:
    case OP_ABS:
    case OP_NOT:
    case OP_SQRT:
    case OP_LOG:
    case OP_EXP:
    case OP_CEIL:
    case OP_FLOOR:
    case OP_ROUND:
    case OP_ISNULL:
    case OP_CAST:
    /* arithmetic */
    case OP_ADD:
    case OP_SUB:
    case OP_MUL:
    case OP_DIV:
    case OP_MOD:
    /* comparison and logic */
    case OP_EQ:
    case OP_NE:
    case OP_LT:
    case OP_LE:
    case OP_GT:
    case OP_GE:
    case OP_AND:
    case OP_OR:
    /* min/max, conditional, membership */
    case OP_MIN2:
    case OP_MAX2:
    case OP_IF:
    case OP_IN:
    case OP_NOT_IN:
    /* string operators */
    case OP_LIKE:
    case OP_ILIKE:
    case OP_UPPER:
    case OP_LOWER:
    case OP_STRLEN:
    case OP_SUBSTR:
    case OP_REPLACE:
    case OP_TRIM:
    case OP_CONCAT:
    /* date/time operators */
    case OP_EXTRACT:
    case OP_DATE_TRUNC:
    /* table-level operators */
    case OP_FILTER:
    case OP_SELECT:
    case OP_ALIAS:
    case OP_MATERIALIZE:
        return true;
    default:
        break;
    }
    return false;
}
/*
 * Walk the DAG below `op` and report whether it contains an OP_SCAN whose
 * ext record stores id 0 in the first two bytes of base.pad.
 * NOTE(review): id 0 presumably denotes the graph's default table, as the
 * function name suggests — confirm against the code that writes base.pad.
 *
 * `*ok` is cleared (and stays cleared) as soon as something that prevents
 * streaming is found:
 *   - an OP_CONST whose literal is not an atom,
 *   - an OP_SCAN with a non-zero stored id,
 *   - any opcode rejected by op_streamable().
 *
 * `visited` is a bitmap indexed by node id. A node already marked is not
 * walked again and contributes false on the repeated visit.
 */
static bool subtree_has_default_scan(ray_graph_t* g, ray_op_t* op, bool* ok,
                                     uint64_t* visited) {
    if (!op || !*ok) return false;

    /* Mark the node; bail out if it was seen before. */
    const uint32_t node_id = op->id;
    if (node_id < g->node_count) {
        uint64_t* word = &visited[node_id / 64];
        const uint64_t bit = 1ULL << (node_id % 64);
        if (*word & bit) return false;
        *word |= bit;
    }

    const uint16_t opcode = op->opcode;

    if (opcode == OP_CONST) {
        ray_op_ext_t* const_ext = find_ext(g, op->id);
        bool non_atom_literal = const_ext && const_ext->literal
                                && !ray_is_atom(const_ext->literal);
        if (non_atom_literal) *ok = false;
        return false;
    }

    if (opcode == OP_SCAN) {
        ray_op_ext_t* scan_ext = find_ext(g, op->id);
        if (!scan_ext) return false;
        uint16_t stored_id = 0;
        memcpy(&stored_id, scan_ext->base.pad, sizeof(uint16_t));
        if (stored_id == 0) return true;
        *ok = false;
        return false;
    }

    if (!op_streamable(opcode)) {
        *ok = false;
        return false;
    }

    /* Direct inputs: at most the first two are stored in op->inputs. */
    bool found = false;
    for (uint8_t i = 0; i < 2 && i < op->arity; i++) {
        if (subtree_has_default_scan(g, op->inputs[i], ok, visited))
            found = true;
    }

    /* Some operators keep further children in their ext record. */
    switch (opcode) {
    case OP_SELECT: {
        ray_op_ext_t* ext = find_ext(g, op->id);
        if (!ext) break;
        for (uint8_t c = 0; c < ext->sort.n_cols && *ok; c++) {
            if (subtree_has_default_scan(g, ext->sort.columns[c], ok, visited))
                found = true;
        }
        break;
    }
    case OP_IF:
    case OP_SUBSTR:
    case OP_REPLACE: {
        ray_op_ext_t* ext = find_ext(g, op->id);
        if (!ext) break;
        /* The literal slot is read as the node id of the extra input. */
        uint32_t child_id = (uint32_t)(uintptr_t)ext->literal;
        if (child_id < g->node_count
            && subtree_has_default_scan(g, &g->nodes[child_id], ok, visited))
            found = true;
        break;
    }
    case OP_CONCAT: {
        ray_op_ext_t* ext = find_ext(g, op->id);
        if (!ext) break;
        /* ext->sym is the argument count; ids of arguments 2.. are read
         * from the memory directly after the ext record. */
        int n_args = (int)ext->sym;
        uint32_t* extra_ids = (uint32_t*)((char*)(ext + 1));
        for (int i = 2; i < n_args && *ok; i++) {
            uint32_t child_id = extra_ids[i - 2];
            if (child_id >= g->node_count) continue;
            if (subtree_has_default_scan(g, &g->nodes[child_id], ok, visited))
                found = true;
        }
        break;
    }
    default:
        break;
    }

    return found;
}
/*
 * Decide whether the DAG rooted at `root` may be executed one segment at a
 * time: every node reached must pass subtree_has_default_scan()'s checks
 * and at least one scan it accepts must be present.
 *
 * The visited bitmap (one bit per graph node) lives on the stack when it
 * fits in 16 words, otherwise in scratch memory. Returns false if that
 * scratch allocation fails.
 */
static bool dag_can_stream(ray_graph_t* g, ray_op_t* root) {
    enum { INLINE_WORDS = 16 };
    uint64_t inline_bits[INLINE_WORDS];
    ray_t* scratch_hdr = NULL;

    const uint32_t n_words = (g->node_count + 63) / 64;
    uint64_t* visited = inline_bits;
    if (n_words > INLINE_WORDS) {
        visited = (uint64_t*)scratch_alloc(&scratch_hdr, n_words * 8);
        if (!visited) return false;
    }
    memset(visited, 0, n_words * 8);

    bool ok = true;
    bool found_scan = subtree_has_default_scan(g, root, &ok, visited);

    if (scratch_hdr) scratch_free(scratch_hdr);
    return ok && found_scan;
}
static ray_t* ray_execute_inner(ray_graph_t* g, ray_op_t* root);

/*
 * Public entry point: run the DAG rooted at `root` on graph `g`.
 * Delegates to ray_execute_inner() and then calls ray_progress_end()
 * unconditionally, whatever the inner call returned.
 */
ray_t* ray_execute(ray_graph_t* g, ray_op_t* root) {
    ray_t* outcome = ray_execute_inner(g, root);
    ray_progress_end();
    return outcome;
}
/* Release the graph's pending row selection, if any, and clear the slot. */
static void ray_execute_drop_selection(ray_graph_t* g) {
    if (g->selection) {
        ray_release(g->selection);
        g->selection = NULL;
    }
}

/*
 * If a selection is pending and `res` is a valid table, replace `res` by
 * sel_compact()'s output, releasing both the original table and the
 * selection. Any other `res` is returned untouched and the selection is
 * left in place.
 */
static ray_t* ray_execute_apply_selection(ray_graph_t* g, ray_t* res) {
    if (!g->selection || !res || RAY_IS_ERR(res) || res->type != RAY_TABLE)
        return res;
    ray_t* compacted = sel_compact(g, res, g->selection);
    ray_release(res);
    ray_release(g->selection);
    g->selection = NULL;
    return compacted;
}

/*
 * Execute the DAG rooted at `root`.
 *
 * Two strategies:
 *   - Single pass: exec_node() on the whole graph table, followed by
 *     selection compaction of a table result.
 *   - Streaming: used when g->table holds at least one parted column, no
 *     flat (non-parted, non-MAPCOMMON) column, and dag_can_stream()
 *     accepts the DAG. The DAG then runs once per segment on a table built
 *     by build_segment_table(), and the per-segment results are combined
 *     with ray_result_merge().
 *
 * Returns "nyi" for NULL arguments, "schema" for inconsistent segment
 * counts or a segment mask of the wrong size, "cancel" when the pool's
 * cancelled flag is seen between segments, and "oom" when no result could
 * be produced at all.
 */
static ray_t* ray_execute_inner(ray_graph_t* g, ray_op_t* root) {
    if (!g || !root) return ray_error("nyi", NULL);

    /* Start every execution with a cleared cancellation flag. */
    ray_pool_t* pool = ray_pool_get();
    if (pool)
        atomic_store_explicit(&pool->cancelled, 0, memory_order_relaxed);

    /* Count segments; all parted columns must agree, and any flat column
     * disables streaming altogether. */
    int32_t n_segs = 0;
    if (g->table) {
        bool saw_flat = false;
        for (int64_t c = 0; c < ray_table_ncols(g->table); c++) {
            ray_t* col = ray_table_get_col_idx(g->table, c);
            if (!col) continue;
            if (!RAY_IS_PARTED(col->type)) {
                if (col->type != RAY_MAPCOMMON) saw_flat = true;
                continue;
            }
            int32_t col_segs = (int32_t)col->len;
            if (n_segs == 0)
                n_segs = col_segs;
            else if (col_segs != n_segs)
                return ray_error("schema", NULL);
        }
        if (saw_flat) n_segs = 0;
    }

    /* Single-pass execution. */
    if (n_segs == 0 || !dag_can_stream(g, root)) {
        ray_t* whole = exec_node(g, root);
        return ray_execute_apply_selection(g, whole);
    }

    /* Use the first segment mask carried by any ext node; a clear bit
     * means the segment is skipped. */
    uint64_t* mask = NULL;
    int64_t mask_bits = 0;
    for (uint32_t e = 0; e < g->ext_count; e++) {
        if (!g->ext_nodes[e] || !g->ext_nodes[e]->seg_mask) continue;
        mask = g->ext_nodes[e]->seg_mask;
        mask_bits = g->ext_nodes[e]->seg_mask_count;
        break;
    }
    if (mask && mask_bits != (int64_t)n_segs)
        return ray_error("schema", NULL);

    ray_t* const full_table = g->table;
    ray_t* acc = NULL;

    for (int32_t s = 0; s < n_segs; s++) {
        if (mask && !(mask[s / 64] & (1ULL << (s % 64))))
            continue;

        if (pool && atomic_load_explicit(&pool->cancelled, memory_order_relaxed)) {
            g->table = full_table;
            ray_execute_drop_selection(g);
            ray_release(acc);
            return ray_error("cancel", NULL);
        }

        ray_t* seg_tbl = build_segment_table(full_table, s);
        if (!seg_tbl || RAY_IS_ERR(seg_tbl)) {
            g->table = full_table;
            ray_execute_drop_selection(g);
            ray_release(acc);
            return seg_tbl;
        }

        /* Run the DAG against this segment only, starting without a
         * selection; compaction happens while the segment table is still
         * installed as g->table. */
        g->table = seg_tbl;
        ray_execute_drop_selection(g);
        ray_t* partial = exec_node(g, root);
        partial = ray_execute_apply_selection(g, partial);
        g->table = full_table;
        ray_release(seg_tbl);

        if (!partial || RAY_IS_ERR(partial)) {
            ray_execute_drop_selection(g);
            ray_release(acc);
            return partial;
        }

        /* ray_result_merge() hands back its own reference, so both inputs
         * are released here. */
        ray_t* merged = ray_result_merge(acc, partial);
        ray_release(acc);
        ray_release(partial);
        if (!merged || RAY_IS_ERR(merged)) {
            ray_execute_drop_selection(g);
            return merged;
        }
        acc = merged;
    }

    ray_execute_drop_selection(g);
    if (acc) return acc;

    /* No segment produced a result: run the DAG once over a zero-row table
     * with the same column names and base element types. */
    int64_t ncols = ray_table_ncols(full_table);
    ray_t* empty_tbl = ray_table_new(ncols);
    if (!empty_tbl || RAY_IS_ERR(empty_tbl))
        return ray_error("oom", NULL);

    for (int64_t c = 0; c < ncols; c++) {
        int64_t name_id = ray_table_col_name(full_table, c);
        ray_t* col = ray_table_get_col_idx(full_table, c);
        if (!col) continue;

        int8_t base = col->type;
        if (col->type == RAY_MAPCOMMON) {
            ray_t** children = (ray_t**)ray_data(col);
            base = children[0] ? children[0]->type : RAY_I64;
        } else if (RAY_IS_PARTED(col->type)) {
            base = (int8_t)RAY_PARTED_BASETYPE(col->type);
        }

        ray_t* ecol = ray_vec_new(base, 0);
        if (!ecol || RAY_IS_ERR(ecol)) {
            /* Fall back to a bare zero-length allocation tagged with the
             * base type; skip the column if even that fails. */
            ecol = ray_alloc(0);
            if (!ecol || RAY_IS_ERR(ecol)) continue;
            ecol->type = base;
            ecol->len = 0;
        }
        empty_tbl = ray_table_add_col(empty_tbl, name_id, ecol);
        ray_release(ecol);
    }

    g->table = empty_tbl;
    ray_execute_drop_selection(g);
    acc = exec_node(g, root);
    ray_execute_drop_selection(g);
    g->table = full_table;
    ray_release(empty_tbl);

    if (!acc) return ray_error("oom", NULL);
    return acc;
}