#ifndef RAY_EXEC_INTERNAL_H
#define RAY_EXEC_INTERNAL_H
#if !defined(RAY_OS_WINDOWS) && !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif
#include "exec.h"
#include "hash.h"
#include "core/pool.h"
#include "core/profile.h"
#include "store/csr.h"
#include "store/hnsw.h"
#include "lftj.h"
#include "mem/heap.h"
#include "table/sym.h"
#include "table/table.h"
#include "vec/str.h"
#include "vec/vec.h"
#include <string.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <float.h>
#include <ctype.h>
/* Return the attrs byte of the first non-NULL segment, or 0 when every
 * segment is NULL. Used to pick a representative width for a partitioned
 * column. */
static inline uint8_t parted_first_attrs(ray_t** segs, int64_t n_segs) {
    int64_t i = 0;
    while (i < n_segs) {
        ray_t* seg = segs[i++];
        if (seg) return seg->attrs;
    }
    return 0;
}
/* Check that a partition segment is usable with the expected element size.
 * Non-SYM segments always match (their width is fixed by the base type);
 * SYM segments must encode at exactly `expected_esz` bytes per element. */
static inline bool parted_seg_esz_ok(ray_t* seg, int8_t base, uint8_t expected_esz) {
    if (seg == NULL) return false;
    if (base != RAY_SYM) return true;
    uint8_t actual = ray_sym_elem_size(base, seg->attrs);
    return actual == expected_esz;
}
extern ray_profile_t g_ray_profile;
/* Allocate a zero-filled scratch buffer backed by a ray header.
 * On success stores the header in *hdr_out (free with scratch_free) and
 * returns the data pointer; on OOM stores NULL and returns NULL. */
static inline void* scratch_calloc(ray_t** hdr_out, size_t nbytes) {
    ray_t* hdr = ray_alloc(nbytes);
    *hdr_out = hdr;
    if (!hdr) return NULL;
    void* buf = ray_data(hdr);
    memset(buf, 0, nbytes);
    return buf;
}
/* Allocate an uninitialized scratch buffer backed by a ray header.
 * Same contract as scratch_calloc but without the memset. */
static inline void* scratch_alloc(ray_t** hdr_out, size_t nbytes) {
    ray_t* hdr = ray_alloc(nbytes);
    *hdr_out = hdr;
    return hdr ? ray_data(hdr) : NULL;
}
/* Grow/shrink a scratch buffer: allocates a new block, copies
 * min(old_bytes, new_bytes) from the old one, frees the old header and
 * stores the replacement in *hdr_out. On OOM returns NULL and leaves
 * *hdr_out (and its data) untouched, so the caller still owns the old
 * buffer — the classic safe-realloc contract. */
static inline void* scratch_realloc(ray_t** hdr_out, size_t old_bytes, size_t new_bytes) {
    ray_t* replacement = ray_alloc(new_bytes);
    if (!replacement) return NULL;
    void* dst = ray_data(replacement);
    ray_t* prev = *hdr_out;
    if (prev) {
        size_t ncopy = (old_bytes < new_bytes) ? old_bytes : new_bytes;
        memcpy(dst, ray_data(prev), ncopy);
        ray_free(prev);
    }
    *hdr_out = replacement;
    return dst;
}
/* Release a scratch buffer header; NULL is a no-op. */
static inline void scratch_free(ray_t* hdr) {
    if (hdr) ray_free(hdr);
}
/* Intern a string into the symbol table, mapping any negative (error)
 * id to 0 so callers can index without checking. */
static inline int64_t sym_intern_safe(const char* s, size_t len) {
    int64_t id = ray_sym_intern(s, len);
    if (id < 0) return 0;
    return id;
}
/* Read element `row` of a raw column buffer and widen it to int64_t.
 * `type` selects the physical layout; for RAY_SYM the per-element width
 * is carried in `attrs` (RAY_SYM_W_MASK). The final default covers the
 * remaining 1-byte layouts. */
static inline int64_t read_col_i64(const void* data, int64_t row,
int8_t type, uint8_t attrs) {
switch (type) {
case RAY_I64: case RAY_TIMESTAMP:
return ((const int64_t*)data)[row];
case RAY_SYM:
/* Symbol ids are stored at 1/2/4/8 bytes depending on dictionary size. */
switch (attrs & RAY_SYM_W_MASK) {
case RAY_SYM_W8: return (int64_t)((const uint8_t*)data)[row];
case RAY_SYM_W16: return (int64_t)((const uint16_t*)data)[row];
case RAY_SYM_W32: return (int64_t)((const uint32_t*)data)[row];
default: return ((const int64_t*)data)[row];
}
case RAY_I32: case RAY_DATE: case RAY_TIME:
return (int64_t)((const int32_t*)data)[row];
case RAY_I16:
return (int64_t)((const int16_t*)data)[row];
default:
/* 1-byte element types (read unsigned, then widened). */
return (int64_t)((const uint8_t*)data)[row];
}
}
/* Store `val` into element `row` of a raw column buffer, narrowing to the
 * column's physical width. Mirror of read_col_i64: RAY_SYM writes go
 * through ray_write_sym so the width encoded in `attrs` is honored;
 * other types truncate with a plain cast. */
static inline void write_col_i64(void* data, int64_t row, int64_t val,
int8_t type, uint8_t attrs) {
switch (type) {
case RAY_I64: case RAY_TIMESTAMP:
((int64_t*)data)[row] = val; return;
case RAY_SYM:
ray_write_sym(data, row, (uint64_t)val, type, attrs); return;
case RAY_I32: case RAY_DATE: case RAY_TIME:
((int32_t*)data)[row] = (int32_t)val; return;
case RAY_I16:
((int16_t*)data)[row] = (int16_t)val; return;
default:
/* 1-byte element types. */
((uint8_t*)data)[row] = (uint8_t)val; return;
}
}
/* Physical bytes per element for a column vector. */
static inline uint8_t col_esz(const ray_t* col) {
    return ray_sym_elem_size(col->type, col->attrs);
}
/* Read element `row` from a buffer of `esz`-byte unsigned elements
 * (1/2/4), widened to int64_t; any other esz is treated as 8-byte
 * signed. */
static inline int64_t read_by_esz(const void* data, int64_t row, uint8_t esz) {
    if (esz == 1) return (int64_t)((const uint8_t*)data)[row];
    if (esz == 2) return (int64_t)((const uint16_t*)data)[row];
    if (esz == 4) return (int64_t)((const uint32_t*)data)[row];
    return ((const int64_t*)data)[row];
}
/* Allocate a fresh vector with the same element layout as `src`.
 * SYM columns must preserve the source's symbol width. */
static inline ray_t* col_vec_new(const ray_t* src, int64_t cap) {
    return (src->type == RAY_SYM)
        ? ray_sym_vec_new(src->attrs & RAY_SYM_W_MASK, cap)
        : ray_vec_new(src->type, cap);
}
/* Share the source column's string pool with `dst` (both must be STR).
 * For slices the pool lives on the slice parent. Releases any pool dst
 * already held, then retains the shared one. */
static inline void col_propagate_str_pool(ray_t* dst, const ray_t* src) {
    if (src->type != RAY_STR || dst->type != RAY_STR) return;
    const ray_t* owner = src;
    if (src->attrs & RAY_ATTR_SLICE) owner = src->slice_parent;
    ray_t* pool = owner->str_pool;
    if (!pool) return;
    if (dst->str_pool) ray_release(dst->str_pool);
    ray_retain(pool);
    dst->str_pool = pool;
}
/* Share a string pool into `dst` from the first STR segment that has one.
 * NOTE: only meaningful when all segments share one pool — see
 * parted_str_single_pool. */
static inline void col_propagate_str_pool_parted(ray_t* dst, ray_t** segs, int64_t n_segs) {
    if (dst->type != RAY_STR) return;
    for (int64_t i = 0; i < n_segs; i++) {
        ray_t* seg = segs[i];
        if (!seg || seg->type != RAY_STR || !seg->str_pool) continue;
        col_propagate_str_pool(dst, seg);
        return;
    }
}
/* True when every STR segment that carries a pool points at the same
 * pool object (also true when no segment has a pool at all). */
static inline bool parted_str_single_pool(ray_t** segs, int64_t n_segs) {
    ray_t* seen = NULL;
    for (int64_t i = 0; i < n_segs; i++) {
        ray_t* seg = segs[i];
        if (!seg || seg->type != RAY_STR) continue;
        ray_t* p = seg->str_pool;
        if (!p) continue;
        if (!seen) { seen = p; continue; }
        if (p != seen) return false;
    }
    return true;
}
/* After gathering rows via `indices`, mark dst rows null where the source
 * row was null or the index itself was negative (negative = missing). */
static inline void col_propagate_nulls_gather(ray_t* dst, const ray_t* src,
                                              const int64_t* indices,
                                              int64_t count) {
    const bool check_src = (src->attrs & RAY_ATTR_HAS_NULLS) != 0;
    for (int64_t r = 0; r < count; r++) {
        int64_t idx = indices[r];
        bool is_null = (idx < 0) ||
                       (check_src && ray_vec_is_null((ray_t*)src, idx));
        if (is_null) ray_vec_set_null(dst, r, true);
    }
}
/* Copy null flags for a contiguous row range from src[src_off..] to
 * dst[dst_off..]. No-op when the source carries no nulls. */
static inline void col_propagate_nulls_range(ray_t* dst, int64_t dst_off,
                                             const ray_t* src, int64_t src_off,
                                             int64_t count) {
    if ((src->attrs & RAY_ATTR_HAS_NULLS) == 0) return;
    for (int64_t i = 0; i < count; i++) {
        if (!ray_vec_is_null((ray_t*)src, src_off + i)) continue;
        ray_vec_set_null(dst, dst_off + i, true);
    }
}
/* Copy null flags through a selection mask: dst row k corresponds to the
 * k-th src row whose mask byte is non-zero. No-op when src has no nulls. */
static inline void col_propagate_nulls_filter(ray_t* dst, const ray_t* src,
                                              const uint8_t* mask,
                                              int64_t src_len) {
    if ((src->attrs & RAY_ATTR_HAS_NULLS) == 0) return;
    int64_t dst_row = 0;
    for (int64_t src_row = 0; src_row < src_len; src_row++) {
        if (!mask[src_row]) continue;
        if (ray_vec_is_null((ray_t*)src, src_row))
            ray_vec_set_null(dst, dst_row, true);
        dst_row++;
    }
}
/* Append element `local_idx` of string segment `seg` onto `out`.
 * Null elements are appended as an empty string and then flagged null so
 * row positions stay aligned. `pool_base` is the base pointer of the
 * segment's out-of-line string pool (NULL when the segment has none).
 * Returns the possibly-reallocated vector, or an error vector on append
 * failure — callers must check RAY_IS_ERR before continuing. */
static inline ray_t* parted_str_append_elem(ray_t* out, ray_t* seg,
int64_t local_idx,
const char* pool_base) {
if ((seg->attrs & RAY_ATTR_HAS_NULLS) && ray_vec_is_null(seg, local_idx)) {
out = ray_str_vec_append(out, "", 0);
if (!RAY_IS_ERR(out))
ray_vec_set_null(out, out->len - 1, true);
} else {
ray_str_t* elems = (ray_str_t*)ray_data(seg);
const char* str = ray_str_t_ptr(&elems[local_idx], pool_base);
out = ray_str_vec_append(out, str, elems[local_idx].len);
}
return out;
}
/* Gather string rows from a partitioned column by global row index.
 * Builds a cumulative-end table over the segments, then resolves each
 * global index to (segment, local offset) and appends the element.
 * Returns a fresh STR vector, or an error vector on OOM / append failure.
 *
 * Fixes vs. previous version:
 *  - the segment cursor only ever advanced, so a row index smaller than
 *    the current segment's start resolved into the wrong segment (wrong
 *    local offset, potential out-of-bounds read). The cursor now rewinds
 *    when the index moves backwards; sorted indices keep the old O(1)
 *    amortized behavior.
 *  - negative row indices now produce a null row (matching the
 *    missing-row convention used by col_propagate_nulls_gather) instead
 *    of indexing with a negative local offset. */
static inline ray_t* parted_gather_str_rows(ray_t** segs, int64_t n_segs,
                                            const int64_t* row_indices,
                                            int64_t count) {
    int64_t cumul = 0;
    int64_t stack_ends[64];
    /* Small segment counts use the stack; larger ones a scratch buffer. */
    int64_t* seg_ends = (n_segs <= 64) ? stack_ends : NULL;
    ray_t* ends_hdr = NULL;
    if (!seg_ends) {
        seg_ends = (int64_t*)scratch_alloc(&ends_hdr, (size_t)n_segs * sizeof(int64_t));
        if (!seg_ends) return ray_error("oom", NULL);
    }
    for (int64_t i = 0; i < n_segs; i++) {
        cumul += (segs[i]) ? segs[i]->len : 0;
        seg_ends[i] = cumul;
    }
    ray_t* out = ray_vec_new(RAY_STR, count);
    if (!out || RAY_IS_ERR(out)) { if (ends_hdr) scratch_free(ends_hdr); return out; }
    int64_t seg = 0;
    for (int64_t i = 0; i < count; i++) {
        int64_t row = row_indices[i];
        if (row < 0) {
            /* Negative index: emit a null placeholder row. */
            out = ray_str_vec_append(out, "", 0);
            if (!RAY_IS_ERR(out))
                ray_vec_set_null(out, out->len - 1, true);
            if (RAY_IS_ERR(out)) { if (ends_hdr) scratch_free(ends_hdr); return out; }
            continue;
        }
        /* Rewind when the index jumps backwards, then scan forward. */
        if (seg > 0 && row < seg_ends[seg - 1]) seg = 0;
        while (seg < n_segs - 1 && row >= seg_ends[seg]) seg++;
        if (!segs[seg]) {
            /* Missing segment: the row cannot be materialized — null it. */
            out = ray_str_vec_append(out, "", 0);
            if (!RAY_IS_ERR(out))
                ray_vec_set_null(out, out->len - 1, true);
        } else {
            int64_t seg_start = (seg > 0) ? seg_ends[seg - 1] : 0;
            int64_t local = row - seg_start;
            const char* pool_base = segs[seg]->str_pool
                ? (const char*)ray_data(segs[seg]->str_pool) : NULL;
            out = parted_str_append_elem(out, segs[seg], local, pool_base);
        }
        if (RAY_IS_ERR(out)) { if (ends_hdr) scratch_free(ends_hdr); return out; }
    }
    if (ends_hdr) scratch_free(ends_hdr);
    return out;
}
/* Materialize the first `n` string rows of a partitioned column by
 * walking segments in order until the quota is filled. NULL segments
 * contribute no rows. Returns the vector or an error vector. */
static inline ray_t* parted_head_str(ray_t** segs, int64_t n_segs, int64_t n) {
    ray_t* out = ray_vec_new(RAY_STR, n);
    if (!out || RAY_IS_ERR(out)) return out;
    int64_t left = n;
    for (int64_t s = 0; s < n_segs; s++) {
        if (left <= 0) break;
        ray_t* seg = segs[s];
        if (!seg) continue;
        int64_t take = (seg->len < left) ? seg->len : left;
        const char* base = seg->str_pool
            ? (const char*)ray_data(seg->str_pool) : NULL;
        for (int64_t i = 0; i < take; i++) {
            out = parted_str_append_elem(out, seg, i, base);
            if (RAY_IS_ERR(out)) return out;
        }
        left -= take;
    }
    return out;
}
/* Materialize the last `n` string rows of a partitioned column.
 * Computes the total row count, skips the first (total - n) rows, then
 * appends everything after the skip point. If n exceeds total, the whole
 * column is returned. Returns the vector or an error vector. */
static inline ray_t* parted_tail_str(ray_t** segs, int64_t n_segs, int64_t n) {
int64_t total = 0;
for (int64_t s = 0; s < n_segs; s++)
if (segs[s]) total += segs[s]->len;
int64_t skip = total - n;
if (skip < 0) { skip = 0; n = total; }
ray_t* out = ray_vec_new(RAY_STR, n);
if (!out || RAY_IS_ERR(out)) return out;
int64_t skipped = 0;
for (int64_t s = 0; s < n_segs; s++) {
if (!segs[s]) continue;
int64_t seg_len = segs[s]->len;
int64_t seg_start = 0;
/* Segment entirely inside the skip region: count it and move on. */
if (skipped + seg_len <= skip) { skipped += seg_len; continue; }
/* Segment straddles the skip boundary: start mid-segment. */
if (skipped < skip) { seg_start = skip - skipped; skipped = skip; }
const char* pool_base = segs[s]->str_pool
? (const char*)ray_data(segs[s]->str_pool) : NULL;
for (int64_t i = seg_start; i < seg_len; i++) {
out = parted_str_append_elem(out, segs[s], i, pool_base);
if (RAY_IS_ERR(out)) return out;
}
skipped += seg_len;
}
return out;
}
/* Concatenate every segment of a partitioned string column into a single
 * flat STR vector of (pre-counted) length `total`. */
static inline ray_t* parted_flatten_str(ray_t** segs, int64_t n_segs, int64_t total) {
    ray_t* out = ray_vec_new(RAY_STR, total);
    if (!out || RAY_IS_ERR(out)) return out;
    for (int64_t s = 0; s < n_segs; s++) {
        ray_t* seg = segs[s];
        if (!seg) continue;
        int64_t len = seg->len;
        if (len <= 0) continue;
        const char* base = seg->str_pool
            ? (const char*)ray_data(seg->str_pool) : NULL;
        for (int64_t i = 0; i < len; i++) {
            out = parted_str_append_elem(out, seg, i, base);
            if (RAY_IS_ERR(out)) return out;
        }
    }
    return out;
}
/* Allocate a vector from an explicit (type, attrs) pair; SYM vectors
 * preserve the symbol width encoded in attrs. */
static inline ray_t* typed_vec_new(int8_t type, uint8_t attrs, int64_t cap) {
    return (type == RAY_SYM)
        ? ray_sym_vec_new(attrs & RAY_SYM_W_MASK, cap)
        : ray_vec_new(type, cap);
}
/* True when execution should stop early: either the process was
 * interrupted or some worker flagged the pool as cancelled. A relaxed
 * atomic load is sufficient — this is a best-effort early-exit check,
 * not a synchronization point. `pool` may be NULL. */
static inline bool pool_cancelled(ray_pool_t* pool) {
if (RAY_UNLIKELY(ray_interrupted())) return true;
return pool && RAY_UNLIKELY(atomic_load_explicit(&pool->cancelled,
memory_order_relaxed));
}
/* Abort the current function with a "cancel" error when the pool has been
 * cancelled or the process interrupted. For functions returning ray_t*. */
#define CHECK_CANCEL(pool) \
do { if (pool_cancelled(pool)) \
return ray_error("cancel", NULL); } while(0)
/* goto-cleanup variant: stores the error in a `result` variable (which
 * must be in scope) and jumps to the given cleanup label. */
#define CHECK_CANCEL_GOTO(pool, lbl) \
do { if (pool_cancelled(pool)) { \
result = ray_error("cancel", NULL); \
goto lbl; \
} \
} while(0)
/* Linear search for the extended-op record attached to graph node
 * `node_id`; returns NULL when the node has none. */
static inline ray_op_ext_t* find_ext(ray_graph_t* g, uint32_t node_id) {
    for (uint32_t i = 0; i < g->ext_count; i++) {
        ray_op_ext_t* ext = g->ext_nodes[i];
        if (ext && ext->base.id == node_id) return ext;
    }
    return NULL;
}
/* Convert a scalar/atom value into a ray_str_t element plus the pool base
 * pointer needed to dereference it (*out_pool is NULL when the bytes are
 * inline). Handles: string atoms (type == -RAY_STR), 1-element STR
 * vectors (including slices), SYM atoms (resolved via the symbol table),
 * and falls back to the empty string for anything else. */
static inline void atom_to_str_t(ray_t* atom, ray_str_t* out, const char** out_pool) {
const char* sp;
size_t sl;
if (atom->type == -RAY_STR) {
sp = ray_str_ptr(atom);
sl = ray_str_len(atom);
} else if (atom->type == RAY_STR) {
/* STR vector: take element 0 (or the slice's first element). An
 * empty vector yields a zeroed element with no pool. */
if (atom->len < 1) {
memset(out, 0, sizeof(ray_str_t));
*out_pool = NULL;
return;
}
ray_t* src = atom;
int64_t idx = 0;
if (atom->attrs & RAY_ATTR_SLICE) {
src = atom->slice_parent;
idx = atom->slice_offset;
}
const ray_str_t* elems = (const ray_str_t*)ray_data(src);
*out = elems[idx];
*out_pool = src->str_pool ? (const char*)ray_data(src->str_pool) : NULL;
return;
} else if (RAY_IS_SYM(atom->type) && ray_is_atom(atom)) {
ray_t* s = ray_sym_str(atom->i64);
sp = s ? ray_str_ptr(s) : "";
sl = s ? ray_str_len(s) : 0;
} else {
sp = ""; sl = 0;
}
memset(out, 0, sizeof(ray_str_t));
out->len = (uint32_t)sl;
if (sl <= RAY_STR_INLINE_MAX) {
/* Short strings are stored inline in the element itself. */
if (sl > 0) memcpy(out->data, sp, sl);
*out_pool = NULL;
} else {
/* Long strings: keep the 4-byte prefix inline and point the "pool"
 * at the source bytes with offset 0, so ray_str_t_ptr resolves to
 * sp directly. The source buffer must outlive the element. */
memcpy(out->prefix, sp, 4);
out->pool_off = 0;
*out_pool = sp;
}
}
/* Resolve a STR vector (possibly a slice) to its element array and the
 * base pointer of its string pool (NULL when there is no pool). */
static inline void str_resolve(const ray_t* v, const ray_str_t** elems,
                               const char** pool) {
    const ray_t* owner = v;
    int64_t base = 0;
    if (v->attrs & RAY_ATTR_SLICE) {
        owner = v->slice_parent;
        base = v->slice_offset;
    }
    *elems = (const ray_str_t*)ray_data((ray_t*)owner) + base;
    *pool = owner->str_pool ? (const char*)ray_data(owner->str_pool) : NULL;
}
static inline void sym_elem(const ray_t* input, int64_t i,
const char** out_str, size_t* out_len) {
int64_t sym_id = ray_read_sym(ray_data((ray_t*)input), i, input->type, input->attrs);
ray_t* atom = ray_sym_str(sym_id);
if (!atom) { *out_str = ""; *out_len = 0; return; }
*out_str = ray_str_ptr(atom);
*out_len = ray_str_len(atom);
}
/* SUM/AVG over an affine input (col * k + b): the aggregate runs on the
 * raw column and the bias is folded in once at the end. */
typedef struct {
bool enabled;
double bias_f64;
int64_t bias_i64;
} agg_affine_t;
#define AGG_LINEAR_MAX_TERMS 8
/* SUM/AVG over an integer linear combination of up to
 * AGG_LINEAR_MAX_TERMS columns: sum(coeff[i] * col[i]) + bias. */
typedef struct {
bool enabled;
uint8_t n_terms;
void* term_ptrs[AGG_LINEAR_MAX_TERMS];
int8_t term_types[AGG_LINEAR_MAX_TERMS];
int64_t coeff_i64[AGG_LINEAR_MAX_TERMS];
int64_t bias_i64;
} agg_linear_t;
/* Symbolic form of an i64 linear expression (terms referenced by symbol
 * rather than resolved data pointers). */
typedef struct {
uint8_t n_terms;
int64_t syms[AGG_LINEAR_MAX_TERMS];
int64_t coeff_i64[AGG_LINEAR_MAX_TERMS];
int64_t bias_i64;
} linear_expr_i64_t;
/* Limits and morsel size for the compiled expression interpreter. */
#define EXPR_MAX_REGS 16
#define EXPR_MAX_INS 48
#define EXPR_MORSEL RAY_MORSEL_ELEMS
/* One three-address instruction: dst = opcode(src1, src2). */
typedef struct {
uint8_t opcode;
uint8_t dst;
uint8_t src1;
uint8_t src2;
} expr_ins_t;
/* Register kinds for ray_expr_t.regs[].kind: a scanned column, an
 * embedded constant, or a scratch temporary. */
enum { REG_SCAN = 0, REG_CONST = 1, REG_SCRATCH = 2 };
/* A compiled elementwise expression: a short register-machine program
 * (at most EXPR_MAX_INS instructions over EXPR_MAX_REGS registers)
 * evaluated morsel-at-a-time by expr_eval_full. */
typedef struct {
uint8_t n_ins;
uint8_t n_regs;
uint8_t n_scratch;
uint8_t out_reg;
int8_t out_type;
bool has_parted;
struct {
uint8_t kind;
int8_t type;
int8_t col_type;
uint8_t col_attrs;
bool is_parted;
const void* data;
ray_t* parted_col;
double const_f64;
int64_t const_i64;
} regs[EXPR_MAX_REGS];
expr_ins_t ins[EXPR_MAX_INS];
} ray_expr_t;
#define MGATHER_MAX_COLS 16
/* Worker context for multi_gather_fn: gather the same index list from up
 * to MGATHER_MAX_COLS columns at once (per-column element sizes in esz). */
typedef struct {
const int64_t* idx;
char* srcs[MGATHER_MAX_COLS];
char* dsts[MGATHER_MAX_COLS];
uint8_t esz[MGATHER_MAX_COLS];
int64_t ncols;
} multi_gather_ctx_t;
/* Worker context for gather_fn: single-column gather, optionally
 * propagating nulls when `nullable` is set. */
typedef struct {
int64_t* idx;
ray_t* src_col;
ray_t* dst_col;
uint8_t esz;
bool nullable;
} gather_ctx_t;
/* Sort tuning knobs: minimum row count for radix sort, small string-pool
 * cutoff, the "nearly sorted" inversion fraction, and the max key count
 * for the multi-key range prescan. */
#define RADIX_SORT_THRESHOLD 4096
#define SMALL_POOL_THRESHOLD 8192
#define NEARLY_SORTED_FRAC 0.05
#define MK_PRESCAN_MAX_KEYS 8
/* Comparator state for sort_cmp: the key vectors plus per-key direction
 * and null-ordering flags. */
typedef struct {
ray_t** vecs;
uint8_t* desc;
uint8_t* nulls_first;
uint8_t n_sort;
} sort_cmp_ctx_t;
/* One LSD radix pass: scatter (keys, idx) into (keys_out, idx_out) using
 * per-task histograms/offsets computed for byte `shift`. */
typedef struct {
const uint64_t* keys;
const int64_t* idx;
uint64_t* keys_out;
int64_t* idx_out;
int64_t n;
uint8_t shift;
uint32_t n_tasks;
uint32_t* hist;
int64_t* offsets;
} radix_pass_ctx_t;
/* Context for radix_encode_fn: encode column values into order-preserving
 * uint64 keys. The trailing arrays (mins/ranges/bit_shifts/descs/
 * enum_ranks) hold per-key packing parameters for multi-key encoding. */
typedef struct {
uint64_t* keys;
int64_t* indices;
const void* data;
ray_t* col;
int8_t type;
uint8_t col_attrs;
bool desc;
bool nulls_first;
const uint32_t* enum_rank;
uint8_t n_keys;
ray_t** vecs;
int64_t mins[16];
int64_t ranges[16];
uint8_t bit_shifts[16];
uint8_t descs[16];
const uint32_t* enum_ranks[16];
} radix_encode_ctx_t;
/* Context for mk_prescan_fn: per-worker min/max of each key column,
 * used to size the packed multi-key encoding. */
typedef struct {
ray_t* const* vecs;
uint32_t* const* enum_ranks;
uint8_t n_keys;
int64_t nrows;
uint32_t n_workers;
int64_t* pw_mins;
int64_t* pw_maxs;
} mk_prescan_ctx_t;
/* Phase 1 of parallel comparison sort: sort independent index chunks. */
typedef struct {
const sort_cmp_ctx_t* cmp_ctx;
int64_t* indices;
int64_t* tmp;
int64_t nrows;
uint32_t n_chunks;
} sort_phase1_ctx_t;
/* Phase 2: merge adjacent sorted runs of length run_size from src to dst. */
typedef struct {
const sort_cmp_ctx_t* cmp_ctx;
const int64_t* src;
int64_t* dst;
int64_t nrows;
int64_t run_size;
} sort_merge_ctx_t;
/* Number of significant key bytes a column type contributes to a radix
 * sort key (unlisted types use the full 8 bytes). */
static inline uint8_t radix_key_bytes(int8_t type) {
    if (type == RAY_BOOL || type == RAY_U8) return 1;
    if (type == RAY_I16) return 2;
    if (type == RAY_I32 || type == RAY_DATE || type == RAY_TIME) return 4;
    return 8;
}
/* -- parallel gather kernels (pool worker entry points) -- */
void multi_gather_fn(void* raw, uint32_t wid, int64_t start, int64_t end);
void gather_fn(void* raw, uint32_t wid, int64_t start, int64_t end);
void partitioned_gather(ray_pool_t* pool, const int64_t* idx, int64_t n,
int64_t src_rows, char** srcs, char** dsts,
const uint8_t* esz, int64_t ncols);
/* -- filter / selection -- */
ray_t* exec_filter(ray_graph_t* g, ray_op_t* op, ray_t* input, ray_t* pred);
ray_t* exec_filter_head(ray_t* input, ray_t* pred, int64_t limit);
ray_t* sel_compact(ray_graph_t* g, ray_t* tbl, ray_t* sel);
/* -- aggregate input rewrites (affine/linear shortcut detection) -- */
bool try_affine_sumavg_input(ray_graph_t* g, ray_t* tbl, ray_op_t* input_op,
ray_t** out_vec, agg_affine_t* out_affine);
bool try_linear_sumavg_input_i64(ray_graph_t* g, ray_t* tbl, ray_op_t* input_op,
agg_linear_t* out_plan);
/* -- compiled expression evaluation -- */
bool expr_compile(ray_graph_t* g, ray_t* tbl, ray_op_t* root, ray_expr_t* out);
ray_t* expr_eval_full(const ray_expr_t* expr, int64_t nrows);
ray_t* exec_elementwise_unary(ray_graph_t* g, ray_op_t* op, ray_t* input);
ray_t* exec_elementwise_binary(ray_graph_t* g, ray_op_t* op, ray_t* lhs, ray_t* rhs);
/* -- sorting (comparison sort, radix sort, key encoding) -- */
int sort_cmp(const sort_cmp_ctx_t* ctx, int64_t a, int64_t b);
void sort_insertion(const sort_cmp_ctx_t* ctx, int64_t* arr, int64_t n);
void sort_merge_recursive(const sort_cmp_ctx_t* ctx,
int64_t* arr, int64_t* tmp, int64_t n);
void sort_phase1_fn(void* arg, uint32_t worker_id, int64_t start, int64_t end);
void sort_merge_fn(void* arg, uint32_t worker_id, int64_t start, int64_t end);
void key_introsort(uint64_t* keys, int64_t* idx, int64_t n);
double detect_sortedness(ray_pool_t* pool, const uint64_t* keys, int64_t n);
uint8_t compute_key_nbytes(ray_pool_t* pool, const uint64_t* keys,
int64_t n, uint8_t type_max);
int64_t* radix_sort_run(ray_pool_t* pool, uint64_t* keys, int64_t* indices,
uint64_t* keys_tmp, int64_t* idx_tmp,
int64_t n, uint8_t n_bytes,
uint64_t** sorted_keys_out);
uint64_t* packed_radix_sort_run(ray_pool_t* pool, uint64_t* data,
uint64_t* tmp, int64_t n, uint8_t n_bytes);
int64_t* msd_radix_sort_run(ray_pool_t* pool, uint64_t* keys, int64_t* indices,
uint64_t* keys_tmp, int64_t* idx_tmp,
int64_t n, uint8_t n_bytes,
uint64_t** sorted_keys_out);
void radix_encode_fn(void* arg, uint32_t wid, int64_t start, int64_t end);
void mk_prescan_fn(void* arg, uint32_t wid, int64_t start, int64_t end);
uint32_t* build_enum_rank(ray_t* col, int64_t nrows, ray_t** hdr_out);
ray_t* exec_sort(ray_graph_t* g, ray_op_t* op, ray_t* tbl, int64_t limit);
/* -- joins and reductions -- */
ray_t* exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_table);
ray_t* exec_antijoin(ray_graph_t* g, ray_op_t* op,
ray_t* left_table, ray_t* right_table);
ray_t* exec_window_join(ray_graph_t* g, ray_op_t* op,
ray_t* left_table, ray_t* right_table);
ray_t* exec_reduction(ray_graph_t* g, ray_op_t* op, ray_t* input);
ray_t* exec_count_distinct(ray_graph_t* g, ray_op_t* op, ray_t* input);
ray_t* exec_group(ray_graph_t* g, ray_op_t* op, ray_t* tbl, int64_t group_limit);
/* Which running aggregates a group row must maintain. */
#define GHT_NEED_SUM 0x01
#define GHT_NEED_MIN 0x02
#define GHT_NEED_MAX 0x04
#define GHT_NEED_SUMSQ 0x08
/* Byte layout of one group row: fixed key slots followed by the needed
 * aggregate arrays at the off_* byte offsets. agg_is_* are per-aggregate
 * bitmasks; wide_key_mask/wide_key_esz describe keys stored out-of-row. */
typedef struct {
uint16_t entry_stride;
uint16_t row_stride;
uint8_t n_keys;
uint8_t n_aggs;
uint8_t n_agg_vals;
uint8_t need_flags;
uint8_t agg_is_f64;
uint8_t agg_is_first;
uint8_t agg_is_last;
uint8_t agg_is_prod;
int8_t agg_val_slot[8];
uint16_t off_sum;
uint16_t off_min;
uint16_t off_max;
uint16_t off_sumsq;
uint16_t off_first_row;
uint16_t off_last_row;
uint8_t wide_key_mask;
uint8_t wide_key_esz[8];
} ght_layout_t;
/* Open-addressing group hash table: `slots` holds packed (salt, gid)
 * entries, `rows` the group payload (grp_count rows of layout.row_stride
 * bytes). _h_* headers own the allocations; `oom` latches alloc failure. */
typedef struct {
uint32_t* slots;
uint32_t ht_cap;
char* rows;
uint32_t grp_count;
uint32_t grp_cap;
ght_layout_t layout;
void* key_data[8];
ray_t* _h_slots;
ray_t* _h_rows;
uint8_t oom;
} group_ht_t;
/* Slot packing: top 8 bits of the hash become a salt, the low 24 bits of
 * the packed slot hold the group id — so a table is limited to 2^24
 * (~16.7M) groups. HT_EMPTY marks a free slot. */
#define HT_SALT(h) ((uint8_t)((h) >> 56))
#define HT_EMPTY UINT32_MAX
#define HT_PACK(salt, gid) (((uint32_t)(uint8_t)(salt) << 24) | ((gid) & 0xFFFFFF))
#define HT_GID(s) ((s) & 0xFFFFFF)
#define HT_SALT_V(s) ((uint8_t)((s) >> 24))
/* Typed access to aggregate slot `slot` at byte offset `off` in a row. */
#define ROW_RD_F64(row, off, slot) (((const double*)((const void*)((row) + (off))))[(slot)])
#define ROW_RD_I64(row, off, slot) (((const int64_t*)((const void*)((row) + (off))))[(slot)])
#define ROW_WR_F64(row, off, slot) (((double*)((void*)((row) + (off))))[(slot)])
#define ROW_WR_I64(row, off, slot) (((int64_t*)((void*)((row) + (off))))[(slot)])
/* -- group hash table construction / ingestion -- */
ght_layout_t ght_compute_layout(uint8_t n_keys, uint8_t n_aggs,
ray_t** agg_vecs, uint8_t need_flags,
const uint16_t* agg_ops,
const int8_t* key_types);
bool group_ht_init(group_ht_t* ht, uint32_t cap, const ght_layout_t* ly);
void group_ht_free(group_ht_t* ht);
void group_rows_range(group_ht_t* ht, void** key_data, int8_t* key_types,
uint8_t* key_attrs, ray_t** key_vecs, ray_t** agg_vecs,
int64_t start, int64_t end,
const int64_t* match_idx);
/* Result of a partitioned (radix-sharded) pivot ingest: one hash table
 * per partition plus the cumulative group-id offsets that map a
 * per-partition gid to a global gid. The _hdr / _radix_bufs members own
 * the backing allocations; release with pivot_ingest_free. */
typedef struct {
group_ht_t* part_hts;
uint32_t* part_offsets;
uint32_t n_parts;
uint32_t total_grps;
uint16_t row_stride;
ray_t* _part_hts_hdr;
ray_t* _offsets_hdr;
void* _radix_bufs;
ray_t* _radix_bufs_hdr;
size_t _n_bufs;
} pivot_ingest_t;
bool pivot_ingest_run(pivot_ingest_t* out,
const ght_layout_t* ly,
void** key_data, int8_t* key_types, uint8_t* key_attrs,
ray_t** key_vecs, ray_t** agg_vecs,
int64_t n_scan);
void pivot_ingest_free(pivot_ingest_t* out);
/* -- window / table operators -- */
ray_t* exec_window(ray_graph_t* g, ray_op_t* op, ray_t* tbl);
/* -- graph traversal and analytics operators -- */
ray_t* exec_expand(ray_graph_t* g, ray_op_t* op, ray_t* src_vec);
ray_t* exec_var_expand(ray_graph_t* g, ray_op_t* op, ray_t* start_vec);
ray_t* exec_shortest_path(ray_graph_t* g, ray_op_t* op,
ray_t* src_val, ray_t* dst_val);
ray_t* exec_pagerank(ray_graph_t* g, ray_op_t* op);
ray_t* exec_connected_comp(ray_graph_t* g, ray_op_t* op);
ray_t* exec_dijkstra(ray_graph_t* g, ray_op_t* op,
ray_t* src_val, ray_t* dst_val);
ray_t* exec_wco_join(ray_graph_t* g, ray_op_t* op);
ray_t* exec_louvain(ray_graph_t* g, ray_op_t* op);
ray_t* exec_degree_cent(ray_graph_t* g, ray_op_t* op);
ray_t* exec_topsort(ray_graph_t* g, ray_op_t* op);
ray_t* exec_cluster_coeff(ray_graph_t* g, ray_op_t* op);
ray_t* exec_betweenness(ray_graph_t* g, ray_op_t* op);
ray_t* exec_closeness(ray_graph_t* g, ray_op_t* op);
ray_t* exec_mst(ray_graph_t* g, ray_op_t* op);
ray_t* exec_random_walk(ray_graph_t* g, ray_op_t* op, ray_t* src_val);
ray_t* exec_dfs(ray_graph_t* g, ray_op_t* op, ray_t* src_val);
ray_t* exec_astar(ray_graph_t* g, ray_op_t* op,
ray_t* src_val, ray_t* dst_val);
ray_t* exec_k_shortest(ray_graph_t* g, ray_op_t* op,
ray_t* src_val, ray_t* dst_val);
/* -- control flow, pivot, vector search -- */
ray_t* exec_if(ray_graph_t* g, ray_op_t* op);
ray_t* exec_pivot(ray_graph_t* g, ray_op_t* op, ray_t* tbl);
ray_t* exec_ann_rerank(ray_graph_t* g, ray_op_t* op, ray_t* src);
ray_t* exec_knn_rerank(ray_graph_t* g, ray_op_t* op, ray_t* src);
/* -- temporal and string operators -- */
ray_t* exec_extract(ray_graph_t* g, ray_op_t* op);
ray_t* exec_date_trunc(ray_graph_t* g, ray_op_t* op);
ray_t* exec_like(ray_graph_t* g, ray_op_t* op);
ray_t* exec_ilike(ray_graph_t* g, ray_op_t* op);
ray_t* exec_string_unary(ray_graph_t* g, ray_op_t* op);
ray_t* exec_strlen(ray_graph_t* g, ray_op_t* op);
ray_t* exec_substr(ray_graph_t* g, ray_op_t* op);
ray_t* exec_replace(ray_graph_t* g, ray_op_t* op);
ray_t* exec_concat(ray_graph_t* g, ray_op_t* op);
/* -- lazy-column materialization and the main dispatch entry point -- */
ray_t* materialize_mapcommon(ray_t* mc);
ray_t* materialize_mapcommon_head(ray_t* mc, int64_t n);
ray_t* materialize_mapcommon_filter(ray_t* mc, ray_t* pred, int64_t pass_count);
ray_t* broadcast_scalar(ray_t* atom, int64_t nrows);
ray_t* exec_node(ray_graph_t* g, ray_op_t* op);
/* Atomically mark row `idx` null — safe to call from concurrent workers.
 * Sets the bit with __atomic_fetch_or on either the inline nullmap
 * (first 128 rows / 16 bytes) or the external nullmap.
 * NOTE(review): when there is no external map and idx >= 128 this falls
 * back to the NON-atomic ray_vec_set_null; presumably callers run
 * par_prepare_nullmap() first so long vectors already have an external
 * map before parallel writes begin — confirm at call sites. */
static inline void par_set_null(ray_t* vec, int64_t idx) {
if (!(vec->attrs & RAY_ATTR_NULLMAP_EXT)) {
if (idx >= 128) {
ray_vec_set_null(vec, idx, true);
return;
}
int byte_idx = (int)(idx / 8);
int bit_idx = (int)(idx % 8);
__atomic_fetch_or(&vec->nullmap[byte_idx],
(uint8_t)(1u << bit_idx), __ATOMIC_RELAXED);
return;
}
ray_t* ext = vec->ext_nullmap;
uint8_t* bits = (uint8_t*)ray_data(ext);
int byte_idx = (int)(idx / 8);
int bit_idx = (int)(idx % 8);
__atomic_fetch_or(&bits[byte_idx],
(uint8_t)(1u << bit_idx), __ATOMIC_RELAXED);
}
/* Pre-size the nullmap before parallel null writes. Vectors longer than
 * 128 rows need out-of-line null storage; setting then clearing bit 0 via
 * the checked setter forces that allocation up front (so par_set_null
 * never allocates concurrently), after which HAS_NULLS is cleared so the
 * vector starts out all-valid. Returns RAY_OK or the allocation error. */
static inline ray_err_t par_prepare_nullmap(ray_t* vec) {
if (vec->len <= 128) return RAY_OK;
ray_err_t err = ray_vec_set_null_checked(vec, 0, true);
if (err != RAY_OK) return err;
ray_vec_set_null_checked(vec, 0, false);
vec->attrs &= (uint8_t)~RAY_ATTR_HAS_NULLS;
return RAY_OK;
}
/* After parallel null writes, scan the nullmap and raise HAS_NULLS if any
 * bit is set (par_set_null writes bits without touching the flag). The
 * inline map holds at most 16 bytes; the external map covers vec->len. */
static inline void par_finalize_nulls(ray_t* vec) {
    const uint8_t* bits;
    int64_t nbytes = (vec->len + 7) / 8;
    if (vec->attrs & RAY_ATTR_NULLMAP_EXT) {
        bits = (const uint8_t*)ray_data(vec->ext_nullmap);
    } else {
        bits = vec->nullmap;
        if (nbytes > 16) nbytes = 16;
    }
    for (int64_t i = 0; i < nbytes; i++) {
        if (bits[i]) {
            vec->attrs |= RAY_ATTR_HAS_NULLS;
            return;
        }
    }
}
#endif