#include "splay.h"
#include "store/col.h"
#include "store/fileio.h"
#include <string.h>
#include <stdio.h>
/*
 * Sanity-check a freshly loaded table against the state of the symbol table.
 *
 * When ray_sym_count() is non-zero nothing is inspected and RAY_OK is
 * returned. When it is zero, RAY_ERR_CORRUPT is returned if either
 *   - the schema declared columns (schema_ncols > 0) but the table has none, or
 *   - any non-NULL column of the table has type RAY_SYM.
 * In every other case the result is RAY_OK.
 *
 * NOTE(review): presumably RAY_SYM columns hold ids that can only be resolved
 * through a populated symbol table - confirm.
 */
static ray_err_t validate_sym_columns(ray_t* tbl, int64_t schema_ncols) {
    if (ray_sym_count() == 0) {
        const int64_t table_cols = ray_table_ncols(tbl);
        if (table_cols == 0 && schema_ncols > 0) {
            return RAY_ERR_CORRUPT;
        }

        int64_t idx = 0;
        while (idx < table_cols) {
            ray_t* column = ray_table_get_col_idx(tbl, idx);
            if (column && column->type == RAY_SYM) {
                return RAY_ERR_CORRUPT;
            }
            idx++;
        }
    }
    return RAY_OK;
}
/*
 * Save `tbl` as a splayed table: a directory with one file per column plus a
 * ".d" file holding the table schema.
 *
 * Steps, in order:
 *   1. create `dir` with ray_mkdir_p;
 *   2. if `sym_path` is non-NULL, write the symbol table there (ray_sym_save);
 *   3. if ray_table_schema(tbl) is non-NULL, write it to "<dir>/.d";
 *   4. write every column to "<dir>/<column name>" with ray_col_save.
 *
 * A column is skipped (nothing written, no error reported) when its pointer
 * is NULL, its name symbol cannot be resolved, or its name is unusable as a
 * file name: empty, starting with '.', or containing '/', '\\' or a NUL byte.
 * NOTE(review): a skipped column presumably still appears in the schema
 * written to ".d", and the loader in this file reports such names as
 * "corrupt" - confirm that silently skipping is the intended behaviour.
 *
 * Returns:
 *   RAY_ERR_TYPE   `tbl` is NULL or an error object;
 *   RAY_ERR_IO     `dir` is NULL;
 *   RAY_ERR_RANGE  a generated path does not fit in the 1024-byte buffer;
 *   the first non-RAY_OK code from ray_mkdir_p / ray_sym_save / ray_col_save;
 *   RAY_OK         otherwise.
 * On an error return, files written before the failure are left in place.
 */
ray_err_t ray_splay_save(ray_t* tbl, const char* dir, const char* sym_path) {
    if (!tbl || RAY_IS_ERR(tbl)) return RAY_ERR_TYPE;
    if (!dir) return RAY_ERR_IO;

    ray_err_t err = ray_mkdir_p(dir);
    if (err != RAY_OK) return err;

    if (sym_path) {
        err = ray_sym_save(sym_path);
        if (err != RAY_OK) return err;
    }

    char path[1024];
    int path_len;

    int64_t ncols = ray_table_ncols(tbl);
    ray_t* schema = ray_table_schema(tbl);
    if (schema) {
        path_len = snprintf(path, sizeof(path), "%s/.d", dir);
        if (path_len < 0 || (size_t)path_len >= sizeof(path)) return RAY_ERR_RANGE;
        err = ray_col_save(schema, path);
        if (err != RAY_OK) return err;
    }

    for (int64_t c = 0; c < ncols; c++) {
        ray_t* col = ray_table_get_col_idx(tbl, c);
        int64_t name_id = ray_table_col_name(tbl, c);
        if (!col) continue;

        ray_t* name_atom = ray_sym_str(name_id);
        if (!name_atom) continue;

        const char* name = ray_str_ptr(name_atom);
        size_t name_len = ray_str_len(name_atom);

        /* Refuse names that could escape `dir` or collide with ".d". */
        if (name_len == 0 || name[0] == '.' ||
            memchr(name, '/', name_len) || memchr(name, '\\', name_len) ||
            memchr(name, '\0', name_len))
            continue;

        /* The "%.*s" precision is an int. A name this long could never fit
         * in `path` (the truncation check below would reject it), so fail
         * here instead of narrowing a huge size_t to a negative int, which
         * snprintf would interpret as "no precision" and read past the
         * length-counted name. */
        if (name_len >= sizeof(path)) return RAY_ERR_RANGE;

        path_len = snprintf(path, sizeof(path), "%s/%.*s", dir, (int)name_len, name);
        if (path_len < 0 || (size_t)path_len >= sizeof(path)) return RAY_ERR_RANGE;

        err = ray_col_save(col, path);
        if (err != RAY_OK) return err;
    }
    return RAY_OK;
}
/*
 * Shared implementation behind ray_splay_load() and ray_read_splayed().
 *
 * Steps, in order:
 *   1. if `sym_path` is non-NULL, load the symbol table from it (ray_sym_load);
 *   2. load the schema from "<dir>/.d"; its `len` is used as the column count
 *      and its data is read as an array of int64_t column-name symbol ids;
 *   3. for each id, resolve the name, check it is safe as a file name, load
 *      the column from "<dir>/<name>" and append it with ray_table_add_col;
 *   4. run validate_sym_columns() on the finished table.
 *
 * Column loading: with `use_mmap` false, ray_col_load is used. With
 * `use_mmap` true, ray_col_mmap is tried first and, if it fails with error
 * code "nyi", the column is loaded with ray_col_load instead.
 *
 * Returns the table on success. Every failure is reported as an error object
 * (never NULL): "io" for a NULL `dir` or a NULL result from a column/schema
 * loader, "range" for a path that does not fit the 1024-byte buffer,
 * "corrupt" for an unresolvable or unsafe column name, "oom" for a NULL
 * result from ray_table_new / ray_table_add_col, or whatever error object or
 * code the failing callee produced.
 */
static ray_t* splay_load_impl(const char* dir, const char* sym_path, bool use_mmap) {
    if (!dir) return ray_error("io", NULL);

    if (sym_path) {
        ray_err_t sym_err = ray_sym_load(sym_path);
        if (sym_err != RAY_OK) return ray_error(ray_err_code_str(sym_err), NULL);
    }

    char path[1024];
    int path_len = snprintf(path, sizeof(path), "%s/.d", dir);
    if (path_len < 0 || (size_t)path_len >= sizeof(path))
        return ray_error("range", NULL);

    ray_t* schema = ray_col_load(path);
    /* A NULL result is turned into an error object, matching how a NULL
     * column is handled further down. */
    if (!schema) return ray_error("io", NULL);
    if (RAY_IS_ERR(schema)) return schema;

    int64_t ncols = schema->len;
    int64_t* name_ids = (int64_t*)ray_data(schema);

    ray_t* tbl = ray_table_new(ncols);
    if (!tbl || RAY_IS_ERR(tbl)) {
        ray_release(schema);
        return tbl ? tbl : ray_error("oom", NULL);
    }

    /* Failure description consumed at the `fail` label: return `fail_obj`
     * when a callee already produced an error object, otherwise build one
     * from `fail_code` after the releases. */
    const char* fail_code = NULL;
    ray_t* fail_obj = NULL;

    for (int64_t c = 0; c < ncols; c++) {
        int64_t name_id = name_ids[c];
        ray_t* name_atom = ray_sym_str(name_id);
        if (!name_atom) {
            fail_code = "corrupt";
            goto fail;
        }

        const char* name = ray_str_ptr(name_atom);
        size_t name_len = ray_str_len(name_atom);

        /* Names come from disk: refuse anything that could escape `dir`. */
        if (name_len == 0 || name[0] == '.' ||
            memchr(name, '/', name_len) || memchr(name, '\\', name_len) ||
            memchr(name, '\0', name_len)) {
            fail_code = "corrupt";
            goto fail;
        }

        /* The "%.*s" precision is an int. A name this long could never fit
         * in `path`, so report "range" here instead of narrowing a huge
         * size_t to a negative int, which snprintf would interpret as
         * "no precision" and read past the length-counted name. */
        if (name_len >= sizeof(path)) {
            fail_code = "range";
            goto fail;
        }

        path_len = snprintf(path, sizeof(path), "%s/%.*s", dir, (int)name_len, name);
        if (path_len < 0 || (size_t)path_len >= sizeof(path)) {
            fail_code = "range";
            goto fail;
        }

        ray_t* col = use_mmap ? ray_col_mmap(path) : ray_col_load(path);
        if (use_mmap && col && RAY_IS_ERR(col) &&
            strcmp(ray_err_code(col), "nyi") == 0) {
            /* mmap reported "nyi" for this column: fall back to a full load. */
            ray_error_free(col);
            col = ray_col_load(path);
        }
        if (!col || RAY_IS_ERR(col)) {
            fail_obj = col;
            fail_code = "io";
            goto fail;
        }

        ray_t* new_df = ray_table_add_col(tbl, name_id, col);
        if (!new_df || RAY_IS_ERR(new_df)) {
            ray_release(col);
            fail_obj = new_df;
            fail_code = "oom";
            goto fail;
        }

        /* Drop the local reference to the column and continue with the table
         * returned by ray_table_add_col.
         * NOTE(review): presumably the table keeps its own reference to
         * `col` - confirm. */
        ray_release(col);
        tbl = new_df;
    }

    ray_release(schema);

    ray_err_t sym_check = validate_sym_columns(tbl, ncols);
    if (sym_check != RAY_OK) {
        ray_release(tbl);
        return ray_error(ray_err_code_str(sym_check), NULL);
    }
    return tbl;

fail:
    /* Only reached from inside the column loop, where both objects are live. */
    ray_release(schema);
    ray_release(tbl);
    return fail_obj ? fail_obj : ray_error(fail_code, NULL);
}
/*
 * Load the splayed table stored under `dir` with memory mapping disabled, so
 * every column file is read through ray_col_load. `sym_path` is forwarded
 * unchanged; when non-NULL the symbol table is loaded from it first.
 * Returns splay_load_impl()'s result unchanged.
 */
ray_t* ray_splay_load(const char* dir, const char* sym_path) {
    const bool use_mmap = false;
    ray_t* loaded = splay_load_impl(dir, sym_path, use_mmap);
    return loaded;
}
/*
 * Load the splayed table stored under `dir` with memory mapping enabled:
 * each column is opened through ray_col_mmap, and splay_load_impl() retries
 * with ray_col_load when the mmap attempt fails with error code "nyi".
 * `sym_path` is forwarded unchanged; when non-NULL the symbol table is loaded
 * from it first. Returns splay_load_impl()'s result unchanged.
 */
ray_t* ray_read_splayed(const char* dir, const char* sym_path) {
    const bool use_mmap = true;
    ray_t* loaded = splay_load_impl(dir, sym_path, use_mmap);
    return loaded;
}