#include "linkop.h"
#include "idxop.h"
#include "ops/internal.h"
#include "ops/ops.h"
#include "mem/cow.h"
#include "vec/vec.h"
#include "table/table.h"
#include "table/sym.h"
#include "lang/eval.h"
#include "lang/env.h"
#include <string.h>
static ray_err_t promote_inline_to_ext(ray_t* vec) {
if (!(vec->attrs & RAY_ATTR_HAS_NULLS)) return RAY_OK;
if (vec->attrs & RAY_ATTR_NULLMAP_EXT) return RAY_OK;
int64_t bitmap_len = (vec->len + 7) / 8;
if (bitmap_len < 1) bitmap_len = 1;
ray_t* ext = ray_vec_new(RAY_U8, bitmap_len);
if (!ext || RAY_IS_ERR(ext)) return RAY_ERR_OOM;
ext->len = bitmap_len;
int64_t copy = bitmap_len < 16 ? bitmap_len : 16;
memcpy(ray_data(ext), vec->nullmap, (size_t)copy);
if (bitmap_len > 16) {
memset((char*)ray_data(ext) + 16, 0, (size_t)(bitmap_len - 16));
}
vec->ext_nullmap = ext;
vec->attrs |= RAY_ATTR_NULLMAP_EXT;
return RAY_OK;
}
ray_t* ray_link_attach(ray_t** vp, int64_t target_sym_id) {
if (!vp || !*vp || RAY_IS_ERR(*vp))
return ray_error("type", "link: null/error vector");
ray_t* v = *vp;
if (!ray_is_vec(v) || (v->type != RAY_I32 && v->type != RAY_I64))
return ray_error("type", "link: column must be RAY_I32 or RAY_I64 (got %d)",
(int)v->type);
if (v->attrs & RAY_ATTR_SLICE)
return ray_error("type", "link: cannot attach to a slice; materialize first");
if (target_sym_id < 0)
return ray_error("type", "link: invalid target sym ID");
ray_t* target = ray_env_get(target_sym_id);
if (!target || target->type != RAY_TABLE)
return ray_error("name", "link: target sym does not name a table");
int64_t tcols = ray_table_ncols(target);
for (int64_t c = 0; c < tcols; c++) {
ray_t* tcol = ray_table_get_col_idx(target, c);
if (tcol && RAY_IS_PARTED(tcol->type))
return ray_error("nyi",
"link: target table has a parted column (%d); "
"link targets must be non-parted (in-memory or splayed) tables",
(int)c);
}
v = ray_cow(v);
if (!v || RAY_IS_ERR(v)) return v;
*vp = v;
ray_err_t err = promote_inline_to_ext(v);
if (err != RAY_OK) return ray_error(ray_err_code_str(err), "link: oom");
v->link_target = target_sym_id;
v->attrs |= RAY_ATTR_HAS_LINK;
if (v->attrs & RAY_ATTR_HAS_INDEX) {
ray_index_t* ix = ray_index_payload(v->index);
memcpy(&ix->saved_nullmap[8], &target_sym_id, 8);
}
return v;
}
ray_t* ray_link_detach(ray_t** vp) {
if (!vp || !*vp || RAY_IS_ERR(*vp)) return *vp;
ray_t* v = *vp;
if (!(v->attrs & RAY_ATTR_HAS_LINK)) return v;
v = ray_cow(v);
if (!v || RAY_IS_ERR(v)) { *vp = v; return v; }
*vp = v;
v->link_target = 0;
v->attrs &= (uint8_t)~RAY_ATTR_HAS_LINK;
if (v->attrs & RAY_ATTR_HAS_INDEX) {
ray_index_t* ix = ray_index_payload(v->index);
memset(&ix->saved_nullmap[8], 0, 8);
}
return v;
}
ray_t* ray_link_deref(ray_t* v, int64_t sym_id) {
if (!ray_link_has(v)) return NULL;
if (v->type != RAY_I32 && v->type != RAY_I64) return NULL;
int64_t target_sym = (v->attrs & RAY_ATTR_SLICE)
? v->slice_parent->link_target
: v->link_target;
ray_t* target_tab = ray_env_get(target_sym);
if (!target_tab || target_tab->type != RAY_TABLE) return NULL;
int64_t tcols = ray_table_ncols(target_tab);
for (int64_t c = 0; c < tcols; c++) {
ray_t* tcol = ray_table_get_col_idx(target_tab, c);
if (tcol && RAY_IS_PARTED(tcol->type))
return ray_error("nyi",
"link deref: target table has a parted column (%d); "
"links to parted dim tables are not supported in v1",
(int)c);
}
ray_t* target_col = ray_table_get_col(target_tab, sym_id);
if (!target_col) return NULL;
int64_t n = v->len;
int64_t target_n = target_col->len;
int8_t out_type = target_col->type;
ray_t* col_owner = (target_col->attrs & RAY_ATTR_SLICE)
? target_col->slice_parent : target_col;
int64_t col_off = (target_col->attrs & RAY_ATTR_SLICE)
? target_col->slice_offset : 0;
uint8_t target_width = col_owner->attrs & RAY_SYM_W_MASK;
uint8_t target_esz = (out_type == RAY_SYM)
? (uint8_t)(1u << target_width)
: ray_sym_elem_size(out_type, col_owner->attrs);
ray_t* result;
if (out_type == RAY_SYM) {
result = ray_sym_vec_new(target_width, n);
} else {
result = ray_vec_new(out_type, n);
}
if (!result || RAY_IS_ERR(result)) return result;
result->len = n;
uint8_t out_esz = ray_sym_elem_size(out_type, result->attrs);
if (out_esz > 0) memset(ray_data(result), 0, (size_t)n * out_esz);
const uint8_t* link_base = (const uint8_t*)ray_data(v);
uint8_t link_esz = ray_sym_elem_size(v->type, v->attrs);
char* out_base = (char*)ray_data(result);
const char* col_data_base = (const char*)ray_data(col_owner);
const char* tgt_base = col_data_base + (size_t)col_off * target_esz;
for (int64_t i = 0; i < n; i++) {
if (ray_vec_is_null(v, i)) {
ray_vec_set_null(result, i, true);
continue;
}
int64_t rid;
if (link_esz == 4) {
int32_t r;
memcpy(&r, link_base + i * 4, 4);
rid = (int64_t)r;
} else {
memcpy(&rid, link_base + i * 8, 8);
}
if (rid < 0 || rid >= target_n) {
ray_vec_set_null(result, i, true);
continue;
}
if (ray_vec_is_null(target_col, rid)) {
ray_vec_set_null(result, i, true);
continue;
}
if (target_esz > 0 && out_esz == target_esz) {
memcpy(out_base + i * out_esz,
tgt_base + rid * target_esz,
target_esz);
}
}
if (out_type == RAY_STR) {
col_propagate_str_pool(result, target_col);
} else if (out_type == RAY_SYM) {
if (col_owner && !(col_owner->attrs & RAY_ATTR_SLICE) &&
(!(col_owner->attrs & RAY_ATTR_HAS_NULLS) ||
(col_owner->attrs & RAY_ATTR_NULLMAP_EXT)) &&
col_owner->sym_dict) {
ray_retain(col_owner->sym_dict);
result->sym_dict = col_owner->sym_dict;
}
}
return result;
}
ray_t* ray_col_link_fn(ray_t* target_sym, ray_t* int_vec) {
if (!target_sym || target_sym->type != -RAY_SYM)
return ray_error("type", "(.col.link target v): target must be a sym");
if (!int_vec || RAY_IS_ERR(int_vec))
return int_vec ? int_vec : ray_error("type", "(.col.link target v): null v");
int64_t target_id = target_sym->i64;
ray_t* w = int_vec;
ray_retain(w);
ray_t* r = ray_link_attach(&w, target_id);
if (RAY_IS_ERR(r)) { ray_release(w); return r; }
return w;
}
ray_t* ray_col_unlink_fn(ray_t* v) {
if (!v || RAY_IS_ERR(v)) return v;
ray_t* w = v;
ray_retain(w);
ray_t* r = ray_link_detach(&w);
if (RAY_IS_ERR(r)) { ray_release(w); return r; }
return w;
}
ray_t* ray_col_link_p_fn(ray_t* v) {
return ray_bool(ray_link_has(v) ? 1 : 0);
}
ray_t* ray_col_target_fn(ray_t* v) {
if (!ray_link_has(v)) return RAY_NULL_OBJ;
return ray_sym(ray_link_target_id(v));
}