#include "ops/internal.h"
#include "lang/internal.h"
#include "ops/temporal.h"
#include <time.h>
/* Number of microseconds in one second, minute, hour and day (long long). */
#define RTE_USEC_PER_SEC 1000000LL
#define RTE_USEC_PER_MIN (60LL * RTE_USEC_PER_SEC)
#define RTE_USEC_PER_HOUR (3600LL * RTE_USEC_PER_SEC)
#define RTE_USEC_PER_DAY (86400LL * RTE_USEC_PER_SEC)
/*
 * Pull a single calendar or clock component out of a microsecond count.
 *
 * `us` is microseconds relative to 2000-01-01 00:00:00; the day number is
 * shifted by 10957 days (1970-01-01 .. 2000-01-01) before the civil-calendar
 * conversion. Negative inputs are floored, so instants before the origin
 * resolve to the correct day and time of day.
 *
 * Returns:
 *   - `us` itself for RAY_EXTRACT_EPOCH,
 *   - hour / minute / second within the day for the clock fields,
 *   - year, month (1..12), day of month, day of week (1..7, where day 0,
 *     a Saturday, maps to 6) or day of year (1-based) for the date fields,
 *   - 0 for any other field value.
 */
static int64_t rte_extract_one(int64_t us, int field) {
    if (field == RAY_EXTRACT_EPOCH) {
        return us;
    }

    /* Clock fields only need the non-negative offset into the day. */
    if (field == RAY_EXTRACT_HOUR || field == RAY_EXTRACT_MINUTE ||
        field == RAY_EXTRACT_SECOND) {
        int64_t tod = us % RTE_USEC_PER_DAY;
        if (tod < 0) {
            tod += RTE_USEC_PER_DAY;
        }
        if (field == RAY_EXTRACT_HOUR) {
            return tod / RTE_USEC_PER_HOUR;
        }
        if (field == RAY_EXTRACT_MINUTE) {
            return (tod % RTE_USEC_PER_HOUR) / RTE_USEC_PER_MIN;
        }
        return (tod % RTE_USEC_PER_MIN) / RTE_USEC_PER_SEC;
    }

    /* Floor-divide to a day number (C division truncates toward zero). */
    int64_t day_no = us / RTE_USEC_PER_DAY;
    if (us % RTE_USEC_PER_DAY < 0) {
        day_no -= 1;
    }

    /*
     * Days -> (year, month, day) using 400-year eras that start on March 1,
     * so the leap day falls at the end of each shifted year.
     */
    const int64_t shifted = day_no + 10957 + 719468;
    const int64_t era = (shifted >= 0 ? shifted : shifted - 146096) / 146097;
    const int64_t day_of_era = shifted - era * 146097; /* 0 .. 146096 */
    const int64_t year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 -
         day_of_era / 146096) / 365;
    const int64_t day_of_march_year =
        day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    const int64_t march_month = (5 * day_of_march_year + 2) / 153;
    const int64_t day_of_month =
        day_of_march_year - (153 * march_month + 2) / 5 + 1;
    const int64_t month = march_month < 10 ? march_month + 3 : march_month - 9;
    /* January and February belong to the next civil year. */
    const int64_t year = year_of_era + era * 400 + (month <= 2 ? 1 : 0);

    if (field == RAY_EXTRACT_YEAR) {
        return year;
    }
    if (field == RAY_EXTRACT_MONTH) {
        return month;
    }
    if (field == RAY_EXTRACT_DAY) {
        return day_of_month;
    }
    if (field == RAY_EXTRACT_DOW) {
        /* +12 = +7 (keep the remainder non-negative) +5 (weekday shift). */
        return (day_no % 7 + 12) % 7 + 1;
    }
    if (field == RAY_EXTRACT_DOY) {
        /* Days preceding each month in a non-leap year, indexed 1..12. */
        static const int days_before_month[13] = {
            0, 0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334
        };
        if (month < 1 || month > 12) {
            return 0;
        }
        const int is_leap =
            (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0));
        int64_t ordinal = days_before_month[month] + day_of_month;
        if (is_leap && month > 2) {
            ordinal += 1;
        }
        return ordinal;
    }

    return 0;
}
/*
 * Convert a raw temporal payload to microseconds.
 *
 *   RAY_DATE / -RAY_DATE : raw is a day count         -> raw * usec-per-day
 *   RAY_TIME / -RAY_TIME : raw is milliseconds         -> raw * 1000
 *   anything else        : raw is taken as nanoseconds -> floor(raw / 1000)
 *
 * The nanosecond case rounds toward negative infinity so that instants before
 * the origin stay in the correct microsecond. It is computed from the
 * truncated quotient and remainder rather than by negating `raw`, which would
 * overflow (undefined behavior) for values close to INT64_MIN.
 */
static inline int64_t rte_to_us(int8_t type, int64_t raw) {
    if (type == RAY_DATE || type == -RAY_DATE) return raw * RTE_USEC_PER_DAY;
    if (type == RAY_TIME || type == -RAY_TIME) return raw * 1000LL;

    int64_t us = raw / 1000LL;
    if (raw % 1000LL < 0) us -= 1; /* truncation rounded up; step back down */
    return us;
}
/* Scale a microsecond count up to the raw timestamp unit (multiply by 1000). */
static inline int64_t rte_us_to_ts_raw(int64_t us) {
    const int64_t raw_units_per_us = 1000LL;
    return us * raw_units_per_us;
}
/*
 * Extract `field` (a RAY_EXTRACT_* code) from a DATE, TIME or TIMESTAMP
 * atom or vector.
 *
 * - NULL or error inputs are returned unchanged.
 * - Atoms (negative type code) yield an i64 atom, or a typed i64 null when
 *   the atom is null.
 * - Vectors yield a new RAY_I64 vector of the same length; null elements are
 *   stored as 0 and flagged null in the result.
 * - Any other input type produces ray_error("type", NULL).
 */
ray_t* ray_temporal_extract(ray_t* input, int field) {
    if (!input || RAY_IS_ERR(input)) {
        return input;
    }

    if (input->type < 0) {
        const int8_t atom_type = input->type;
        const bool atom_ok = atom_type == -RAY_DATE ||
                             atom_type == -RAY_TIME ||
                             atom_type == -RAY_TIMESTAMP;
        if (!atom_ok) {
            return ray_error("type", NULL);
        }
        if (RAY_ATOM_IS_NULL(input)) {
            return ray_typed_null(-RAY_I64);
        }
        const int64_t atom_us = rte_to_us(atom_type, input->i64);
        return ray_i64(rte_extract_one(atom_us, field));
    }

    const int8_t vec_type = input->type;
    const bool vec_ok = vec_type == RAY_DATE ||
                        vec_type == RAY_TIME ||
                        vec_type == RAY_TIMESTAMP;
    if (!vec_ok) {
        return ray_error("type", NULL);
    }

    const int64_t count = input->len;
    ray_t* result = ray_vec_new(RAY_I64, count);
    if (!result || RAY_IS_ERR(result)) {
        return result;
    }
    result->len = count;
    int64_t* dst = (int64_t*)ray_data(result);

    /* A slice consults its parent's null flag as well as its own. */
    bool may_have_nulls = (input->attrs & RAY_ATTR_HAS_NULLS) != 0;
    if (!may_have_nulls && (input->attrs & RAY_ATTR_SLICE) && input->slice_parent) {
        may_have_nulls = (input->slice_parent->attrs & RAY_ATTR_HAS_NULLS) != 0;
    }

    /* DATE and TIME elements are read as int32, TIMESTAMP as int64. */
    const char* src = (const char*)ray_data(input);
    const bool narrow = (vec_type == RAY_DATE) || (vec_type == RAY_TIME);
    const int32_t* src32 = (const int32_t*)src;
    const int64_t* src64 = (const int64_t*)src;

    for (int64_t idx = 0; idx < count; idx++) {
        if (may_have_nulls && ray_vec_is_null(input, idx)) {
            dst[idx] = 0;
            ray_vec_set_null(result, idx, true);
            continue;
        }
        const int64_t raw = narrow ? (int64_t)src32[idx] : src64[idx];
        dst[idx] = rte_extract_one(rte_to_us(vec_type, raw), field);
    }
    return result;
}
/*
 * Map a symbol id to a RAY_EXTRACT_* field code by the symbol's text.
 *
 * Recognized names: "yyyy", "mm", "dd", "hh", "minute", "ss", "dow", "doy".
 * Returns -1 when the symbol has no string, no character data, or an
 * unrecognized name.
 */
int ray_temporal_field_from_sym(int64_t sym_id) {
    static const struct {
        const char* name;
        size_t len;
        int field;
    } known_fields[] = {
        { "yyyy",   4, RAY_EXTRACT_YEAR   },
        { "mm",     2, RAY_EXTRACT_MONTH  },
        { "dd",     2, RAY_EXTRACT_DAY    },
        { "hh",     2, RAY_EXTRACT_HOUR   },
        { "minute", 6, RAY_EXTRACT_MINUTE },
        { "ss",     2, RAY_EXTRACT_SECOND },
        { "dow",    3, RAY_EXTRACT_DOW    },
        { "doy",    3, RAY_EXTRACT_DOY    },
    };

    ray_t* str = ray_sym_str(sym_id);
    if (!str) {
        return -1;
    }

    const char* text = ray_str_ptr(str);
    const size_t text_len = ray_str_len(str);
    if (!text) {
        return -1;
    }

    const size_t n_known = sizeof known_fields / sizeof known_fields[0];
    for (size_t k = 0; k < n_known; k++) {
        if (text_len == known_fields[k].len &&
            memcmp(text, known_fields[k].name, known_fields[k].len) == 0) {
            return known_fields[k].field;
        }
    }
    return -1;
}
/*
 * Single-argument entry points, one per extractable field. Each forwards its
 * argument to ray_temporal_extract() with the matching RAY_EXTRACT_* code.
 */

/* Second within the minute. */
ray_t* ray_extract_ss_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_SECOND);
}

/* Hour within the day. */
ray_t* ray_extract_hh_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_HOUR);
}

/* Minute within the hour. */
ray_t* ray_extract_minute_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_MINUTE);
}

/* Calendar year. */
ray_t* ray_extract_yyyy_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_YEAR);
}

/* Calendar month. */
ray_t* ray_extract_mm_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_MONTH);
}

/* Day of the month. */
ray_t* ray_extract_dd_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_DAY);
}

/* Day of the week. */
ray_t* ray_extract_dow_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_DOW);
}

/* Day of the year. */
ray_t* ray_extract_doy_fn(ray_t* x) {
    return ray_temporal_extract(x, RAY_EXTRACT_DOY);
}
/*
 * Map a symbol id to a truncation kind by the symbol's text:
 * "date" -> RAY_EXTRACT_DAY, "time" -> RAY_EXTRACT_SECOND.
 * Returns -1 when the symbol has no string, no character data, or any
 * other name.
 */
int ray_temporal_trunc_from_sym(int64_t sym_id) {
    ray_t* str = ray_sym_str(sym_id);
    if (!str) {
        return -1;
    }

    const char* text = ray_str_ptr(str);
    const size_t text_len = ray_str_len(str);
    /* Both accepted names are exactly four characters long. */
    if (!text || text_len != 4) {
        return -1;
    }

    if (memcmp(text, "date", 4) == 0) {
        return RAY_EXTRACT_DAY;
    }
    return memcmp(text, "time", 4) == 0 ? RAY_EXTRACT_SECOND : -1;
}
/*
 * Floor a DATE, TIME or TIMESTAMP atom or vector to a bucket boundary and
 * return the result as TIMESTAMP.
 *
 * `kind` selects the bucket: RAY_EXTRACT_DAY floors to whole days, every
 * other value floors to whole seconds. Flooring rounds toward negative
 * infinity, so values before the origin move to the earlier boundary.
 *
 * - NULL or error inputs are returned unchanged.
 * - A null atom yields a typed TIMESTAMP null; null vector elements are
 *   stored as 0 and flagged null in the result.
 * - Any other input type produces ray_error("type", NULL).
 */
ray_t* ray_temporal_truncate(ray_t* input, int kind) {
    if (!input || RAY_IS_ERR(input)) {
        return input;
    }

    const int64_t step =
        (kind == RAY_EXTRACT_DAY) ? RTE_USEC_PER_DAY : RTE_USEC_PER_SEC;

    if (input->type < 0) {
        const int8_t atom_type = input->type;
        const bool atom_ok = atom_type == -RAY_DATE ||
                             atom_type == -RAY_TIME ||
                             atom_type == -RAY_TIMESTAMP;
        if (!atom_ok) {
            return ray_error("type", NULL);
        }
        if (RAY_ATOM_IS_NULL(input)) {
            return ray_typed_null(-RAY_TIMESTAMP);
        }
        const int64_t us = rte_to_us(atom_type, input->i64);
        const int64_t rem = us % step;
        int64_t floored = us - rem;
        if (rem < 0) {
            floored -= step; /* C remainder is negative for negative us */
        }
        return ray_timestamp(rte_us_to_ts_raw(floored));
    }

    const int8_t vec_type = input->type;
    const bool vec_ok = vec_type == RAY_DATE ||
                        vec_type == RAY_TIME ||
                        vec_type == RAY_TIMESTAMP;
    if (!vec_ok) {
        return ray_error("type", NULL);
    }

    const int64_t count = input->len;
    ray_t* result = ray_vec_new(RAY_TIMESTAMP, count);
    if (!result || RAY_IS_ERR(result)) {
        return result;
    }
    result->len = count;
    int64_t* dst = (int64_t*)ray_data(result);

    /* A slice consults its parent's null flag as well as its own. */
    bool may_have_nulls = (input->attrs & RAY_ATTR_HAS_NULLS) != 0;
    if (!may_have_nulls && (input->attrs & RAY_ATTR_SLICE) && input->slice_parent) {
        may_have_nulls = (input->slice_parent->attrs & RAY_ATTR_HAS_NULLS) != 0;
    }

    /* DATE and TIME elements are read as int32, TIMESTAMP as int64. */
    const char* src = (const char*)ray_data(input);
    const bool narrow = (vec_type == RAY_DATE) || (vec_type == RAY_TIME);
    const int32_t* src32 = (const int32_t*)src;
    const int64_t* src64 = (const int64_t*)src;

    for (int64_t idx = 0; idx < count; idx++) {
        if (may_have_nulls && ray_vec_is_null(input, idx)) {
            dst[idx] = 0;
            ray_vec_set_null(result, idx, true);
            continue;
        }
        const int64_t raw = narrow ? (int64_t)src32[idx] : src64[idx];
        const int64_t us = rte_to_us(vec_type, raw);
        const int64_t rem = us % step;
        int64_t floored = us - rem;
        if (rem < 0) {
            floored -= step;
        }
        dst[idx] = rte_us_to_ts_raw(floored);
    }
    return result;
}
/*
 * Graph executor for the extract op.
 *
 * Evaluates the op's first input, then writes one int64 per element into a
 * new RAY_I64 vector holding the field selected by the op's extension record
 * (`ext->sym`, compared against the RAY_EXTRACT_* codes).
 *
 * Element decoding depends on the input type: RAY_DATE is read as int32 days,
 * RAY_TIME as int32 milliseconds, anything else as int64 nanoseconds. All are
 * converted to microseconds relative to 2000-01-01 before extraction.
 *
 * Returns the evaluated input unchanged if it is NULL or an error,
 * ray_error("nyi", NULL) if the op has no extension record, or the result of
 * ray_vec_new() if allocation fails. Null elements are stored as 0 and
 * flagged null; unrecognized fields yield 0. The evaluated input is released
 * before returning on every path that obtained one.
 */
ray_t* exec_extract(ray_graph_t* g, ray_op_t* op) {
    ray_t* input = exec_node(g, op->inputs[0]);
    if (!input || RAY_IS_ERR(input)) return input;

    ray_op_ext_t* ext = find_ext(g, op->id);
    if (!ext) {
        ray_release(input);
        return ray_error("nyi", NULL);
    }

    int64_t field = ext->sym;
    int64_t len = input->len;
    int8_t in_type = input->type;

    ray_t* result = ray_vec_new(RAY_I64, len);
    if (!result || RAY_IS_ERR(result)) {
        ray_release(input);
        return result;
    }
    result->len = len;
    int64_t* out = (int64_t*)ray_data(result);

/* Function-local unit constants; all four are undefined again below. */
#undef USEC_PER_SEC
#define USEC_PER_SEC 1000000LL
#define USEC_PER_MIN (60LL * USEC_PER_SEC)
#define USEC_PER_HOUR (3600LL * USEC_PER_SEC)
#define USEC_PER_DAY (86400LL * USEC_PER_SEC)

    /* A slice consults its parent's null flag as well as its own. */
    bool src_has_nulls =
        (input->attrs & RAY_ATTR_HAS_NULLS) ||
        ((input->attrs & RAY_ATTR_SLICE) && input->slice_parent &&
         (input->slice_parent->attrs & RAY_ATTR_HAS_NULLS));

    ray_morsel_t m;
    ray_morsel_init(&m, input);
    int64_t off = 0; /* index of the current morsel's first element */
    while (ray_morsel_next(&m)) {
        int64_t n = m.morsel_len;
        for (int64_t i = 0; i < n; i++) {
            const int64_t row = off + i;

            if (src_has_nulls && ray_vec_is_null(input, row)) {
                out[row] = 0;
                ray_vec_set_null(result, row, true);
                continue;
            }

            /* Normalize the element to microseconds. */
            int64_t us;
            if (in_type == RAY_DATE) {
                int32_t d = ((const int32_t*)m.morsel_ptr)[i];
                us = (int64_t)d * USEC_PER_DAY;
            } else if (in_type == RAY_TIME) {
                int32_t ms = ((const int32_t*)m.morsel_ptr)[i];
                us = (int64_t)ms * 1000LL;
            } else {
                /*
                 * Floor division built from the truncated quotient and
                 * remainder; negating ns instead would overflow (undefined
                 * behavior) for values close to INT64_MIN.
                 */
                int64_t ns = ((const int64_t*)m.morsel_ptr)[i];
                us = ns / 1000LL;
                if (ns % 1000LL < 0) us -= 1;
            }

            int64_t value = 0; /* unrecognized fields yield 0 */

            if (field == RAY_EXTRACT_EPOCH) {
                value = us;
            } else if (field == RAY_EXTRACT_HOUR ||
                       field == RAY_EXTRACT_MINUTE ||
                       field == RAY_EXTRACT_SECOND) {
                /* Non-negative offset into the day. */
                int64_t day_us = us % USEC_PER_DAY;
                if (day_us < 0) day_us += USEC_PER_DAY;

                if (field == RAY_EXTRACT_HOUR) {
                    value = day_us / USEC_PER_HOUR;
                } else if (field == RAY_EXTRACT_MINUTE) {
                    value = (day_us % USEC_PER_HOUR) / USEC_PER_MIN;
                } else {
                    value = (day_us % USEC_PER_MIN) / USEC_PER_SEC;
                }
            } else {
                /* Floor to a day number, then days -> (year, month, day). */
                int64_t days_since_2000 = us / USEC_PER_DAY;
                if (us < 0 && us % USEC_PER_DAY != 0) days_since_2000--;

                int64_t z = days_since_2000 + 10957 + 719468;
                int64_t era = (z >= 0 ? z : z - 146096) / 146097;
                uint64_t doe = (uint64_t)(z - era * 146097);
                uint64_t yoe = (doe - doe/1460 + doe/36524 - doe/146096) / 365;
                int64_t y = (int64_t)yoe + era * 400;
                uint64_t doy_mar = doe - (365*yoe + yoe/4 - yoe/100);
                uint64_t mp = (5*doy_mar + 2) / 153;
                uint64_t d = doy_mar - (153*mp + 2) / 5 + 1;
                uint64_t mo = mp < 10 ? mp + 3 : mp - 9;
                y += (mo <= 2); /* Jan/Feb belong to the next civil year */

                if (field == RAY_EXTRACT_YEAR) {
                    value = y;
                } else if (field == RAY_EXTRACT_MONTH) {
                    value = (int64_t)mo;
                } else if (field == RAY_EXTRACT_DAY) {
                    value = (int64_t)d;
                } else if (field == RAY_EXTRACT_DOW) {
                    /* 1..7; day 0 (a Saturday) maps to 6. */
                    value = ((days_since_2000 % 7) + 7 + 5) % 7 + 1;
                } else if (field == RAY_EXTRACT_DOY) {
                    /* Days preceding each month in a non-leap year. */
                    static const int dbm[13] = {
                        0, 0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334
                    };
                    if (mo >= 1 && mo <= 12) {
                        int leap = (y % 4 == 0 && (y % 100 != 0 || y % 400 == 0));
                        int64_t doy_jan = dbm[mo] + (int64_t)d;
                        if (mo > 2 && leap) doy_jan++;
                        value = doy_jan;
                    }
                }
            }

            out[row] = value;
        }
        off += n;
    }

#undef USEC_PER_SEC
#undef USEC_PER_MIN
#undef USEC_PER_HOUR
#undef USEC_PER_DAY

    ray_release(input);
    return result;
}
/*
 * Convert a proleptic Gregorian date (year y, month m in 1..12, day d) to a
 * day count relative to 2000-01-01 (which maps to 0).
 *
 * The year is shifted to start on March 1 so the leap day is the last day of
 * the shifted year; 719468 rebases the era count onto 1970-01-01 and 10957
 * moves the origin from 1970-01-01 to 2000-01-01.
 */
static int64_t days_from_civil(int64_t y, int64_t m, int64_t d) {
    /* January and February count as the tail of the previous year. */
    if (m <= 2) {
        y -= 1;
    }

    const int64_t era = (y >= 0 ? y : y - 399) / 400;
    const uint64_t year_of_era = (uint64_t)(y - era * 400); /* 0 .. 399 */

    /* Month index counted from March (March = 0 .. February = 11). */
    const uint64_t month_from_march =
        (m > 2) ? (uint64_t)m - 3 : (uint64_t)m + 9;
    const uint64_t day_of_year =
        (153 * month_from_march + 2) / 5 + (uint64_t)d - 1;
    const uint64_t day_of_era =
        year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    return era * 146097 + (int64_t)day_of_era - 719468 - 10957;
}
/*
 * Graph executor for the date_trunc op.
 *
 * Evaluates the op's first input and floors every element to the start of
 * the unit selected by the op's extension record (`ext->sym`): second,
 * minute, hour, day, month or year. Any other field code passes the value
 * through untruncated. The result is a new RAY_TIMESTAMP vector.
 *
 * Element decoding depends on the input type: RAY_DATE is read as int32 days,
 * RAY_TIME as int32 milliseconds, anything else as int64 nanoseconds. Work is
 * done in microseconds and multiplied by 1000 when stored.
 *
 * Returns the evaluated input unchanged if it is NULL or an error,
 * ray_error("nyi", NULL) if the op has no extension record, or the result of
 * ray_vec_new() if allocation fails. Null elements are stored as 0 and
 * flagged null. The evaluated input is released before returning on every
 * path that obtained one.
 */
ray_t* exec_date_trunc(ray_graph_t* g, ray_op_t* op) {
    ray_t* input = exec_node(g, op->inputs[0]);
    if (!input || RAY_IS_ERR(input)) return input;

    ray_op_ext_t* ext = find_ext(g, op->id);
    if (!ext) {
        ray_release(input);
        return ray_error("nyi", NULL);
    }

    int64_t field = ext->sym;
    int64_t len = input->len;
    int8_t in_type = input->type;

    ray_t* result = ray_vec_new(RAY_TIMESTAMP, len);
    if (!result || RAY_IS_ERR(result)) {
        ray_release(input);
        return result;
    }
    result->len = len;
    int64_t* out = (int64_t*)ray_data(result);

/* Function-local unit constants; undefined again at the end. */
#define DT_USEC_PER_SEC 1000000LL
#define DT_USEC_PER_MIN (60LL * DT_USEC_PER_SEC)
#define DT_USEC_PER_HOUR (3600LL * DT_USEC_PER_SEC)
#define DT_USEC_PER_DAY (86400LL * DT_USEC_PER_SEC)

    /* A slice consults its parent's null flag as well as its own. */
    bool src_has_nulls =
        (input->attrs & RAY_ATTR_HAS_NULLS) ||
        ((input->attrs & RAY_ATTR_SLICE) && input->slice_parent &&
         (input->slice_parent->attrs & RAY_ATTR_HAS_NULLS));

    ray_morsel_t m;
    ray_morsel_init(&m, input);
    int64_t off = 0; /* index of the current morsel's first element */
    while (ray_morsel_next(&m)) {
        int64_t n = m.morsel_len;
        for (int64_t i = 0; i < n; i++) {
            const int64_t row = off + i;

            if (src_has_nulls && ray_vec_is_null(input, row)) {
                out[row] = 0;
                ray_vec_set_null(result, row, true);
                continue;
            }

            /* Normalize the element to microseconds. */
            int64_t us;
            if (in_type == RAY_DATE) {
                int32_t d = ((const int32_t*)m.morsel_ptr)[i];
                us = (int64_t)d * DT_USEC_PER_DAY;
            } else if (in_type == RAY_TIME) {
                int32_t ms = ((const int32_t*)m.morsel_ptr)[i];
                us = (int64_t)ms * 1000LL;
            } else {
                /*
                 * Floor division built from the truncated quotient and
                 * remainder; negating ns instead would overflow (undefined
                 * behavior) for values close to INT64_MIN.
                 */
                int64_t ns = ((const int64_t*)m.morsel_ptr)[i];
                us = ns / 1000LL;
                if (ns % 1000LL < 0) us -= 1;
            }

            int64_t out_us = us; /* unrecognized fields pass through */
            int64_t bucket = 0;  /* non-zero for fixed-width units */

            switch (field) {
                case RAY_EXTRACT_SECOND: bucket = DT_USEC_PER_SEC;  break;
                case RAY_EXTRACT_MINUTE: bucket = DT_USEC_PER_MIN;  break;
                case RAY_EXTRACT_HOUR:   bucket = DT_USEC_PER_HOUR; break;
                case RAY_EXTRACT_DAY:    bucket = DT_USEC_PER_DAY;  break;
                case RAY_EXTRACT_MONTH:
                case RAY_EXTRACT_YEAR: {
                    /* Months and years vary in length: go via the calendar. */
                    int64_t days2k = us / DT_USEC_PER_DAY;
                    if (us < 0 && us % DT_USEC_PER_DAY != 0) days2k--;

                    int64_t z = days2k + 10957 + 719468;
                    int64_t era = (z >= 0 ? z : z - 146096) / 146097;
                    uint64_t doe = (uint64_t)(z - era * 146097);
                    uint64_t yoe = (doe - doe/1460 + doe/36524 - doe/146096) / 365;
                    int64_t y = (int64_t)yoe + era * 400;
                    uint64_t doy_mar = doe - (365*yoe + yoe/4 - yoe/100);
                    uint64_t mp = (5*doy_mar + 2) / 153;
                    uint64_t mo = mp < 10 ? mp + 3 : mp - 9;
                    y += (mo <= 2); /* Jan/Feb belong to the next civil year */

                    /* Year truncation snaps to January, month keeps mo. */
                    int64_t start_month =
                        (field == RAY_EXTRACT_YEAR) ? 1 : (int64_t)mo;
                    out_us = days_from_civil(y, start_month, 1) * DT_USEC_PER_DAY;
                    break;
                }
                default:
                    break;
            }

            if (bucket != 0) {
                /* Floor toward negative infinity. */
                int64_t r = us % bucket;
                out_us = us - r - (r < 0 ? bucket : 0);
            }

            out[row] = out_us * 1000LL;
        }
        off += n;
    }

#undef DT_USEC_PER_SEC
#undef DT_USEC_PER_MIN
#undef DT_USEC_PER_HOUR
#undef DT_USEC_PER_DAY

    ray_release(input);
    return result;
}
/*
 * Return true when `arg` is a symbol atom whose text is exactly "global".
 * A NULL argument, a non-symbol, or a symbol without string data is false.
 */
static bool is_global_arg(ray_t* arg) {
    if (!arg || arg->type != -RAY_SYM) {
        return false;
    }

    ray_t* s = ray_sym_str(arg->i64);
    if (!s || ray_str_len(s) != 6) {
        return false;
    }

    /*
     * The *_from_sym lookups in this file treat ray_str_ptr() as able to
     * return NULL; apply the same guard here before handing it to memcmp.
     */
    const char* p = ray_str_ptr(s);
    if (!p) {
        return false;
    }
    return memcmp(p, "global", 6) == 0;
}
/*
 * Seconds between 1970-01-01 00:00:00 UTC and 2000-01-01 00:00:00 UTC
 * (10957 days * 86400 seconds).
 */
static time_t ray_epoch_offset(void) {
    const time_t y2k_unix_seconds = (time_t)946684800;
    return y2k_unix_seconds;
}
/*
 * date builtin.
 *
 * With a DATE, TIME or TIMESTAMP argument (atom or vector) the argument is
 * floored to whole days via ray_temporal_truncate(arg, RAY_EXTRACT_DAY).
 *
 * Otherwise returns today's date as a day count relative to 2000-01-01:
 * in UTC when `arg` is the symbol `global`, in the process's local time zone
 * for any other argument (including NULL). Returns a "domain" error if the
 * C library cannot break the current time down.
 */
ray_t* ray_date_clock_fn(ray_t* arg) {
    if (arg) {
        int8_t t = arg->type < 0 ? (int8_t)-arg->type : arg->type;
        if (t == RAY_DATE || t == RAY_TIME || t == RAY_TIMESTAMP)
            return ray_temporal_truncate(arg, RAY_EXTRACT_DAY);
    }

    bool local = !is_global_arg(arg);
    time_t now = time(NULL);
    struct tm* t = local ? localtime(&now) : gmtime(&now);
    if (!t) return ray_error("domain", "date: failed to get current time");

    /*
     * Build the day number from the broken-down calendar fields rather than
     * from an instant. mktime() of local midnight is a UTC instant, and
     * dividing that by 86400 gives the UTC day on which local midnight fell,
     * which is the previous day in every zone east of UTC.
     *
     * Leap days in [2000, year) = leap years in 1..year-1 minus the 484 leap
     * years in 1..1999. Valid for years >= 1.
     */
    int64_t year = (int64_t)t->tm_year + 1900;
    int64_t prev = year - 1;
    int64_t leap_days = (prev / 4 - prev / 100 + prev / 400) - 484;
    int32_t days = (int32_t)((year - 2000) * 365 + leap_days + t->tm_yday);
    return ray_date((int64_t)days);
}
/*
 * time builtin.
 *
 * With a DATE, TIME or TIMESTAMP argument (atom or vector) the argument is
 * floored to whole seconds via ray_temporal_truncate(arg, RAY_EXTRACT_SECOND).
 *
 * Otherwise returns the current time of day in milliseconds (whole-second
 * resolution): UTC when `arg` is the symbol `global`, local time for any
 * other argument (including NULL). Returns a "domain" error if the C library
 * cannot break the current time down.
 */
ray_t* ray_time_clock_fn(ray_t* arg) {
    if (arg) {
        const int8_t base_type =
            arg->type < 0 ? (int8_t)-arg->type : arg->type;
        const bool is_temporal = base_type == RAY_DATE ||
                                 base_type == RAY_TIME ||
                                 base_type == RAY_TIMESTAMP;
        if (is_temporal) {
            return ray_temporal_truncate(arg, RAY_EXTRACT_SECOND);
        }
    }

    const bool use_utc = is_global_arg(arg);
    time_t now = time(NULL);
    struct tm* parts = use_utc ? gmtime(&now) : localtime(&now);
    if (!parts) {
        return ray_error("domain", "time: failed to get current time");
    }

    const int32_t seconds_of_day =
        (parts->tm_hour * 60 + parts->tm_min) * 60 + parts->tm_sec;
    const int32_t ms = seconds_of_day * 1000;
    return ray_time((int64_t)ms);
}
/*
 * timestamp builtin: the current time as nanoseconds relative to
 * 2000-01-01 00:00:00, at whole-second resolution.
 *
 * When `arg` is the symbol `global` the value is the UTC instant. For any
 * other argument (including NULL) it is the local wall-clock reading, i.e.
 * the local calendar date and time of day encoded as if they were UTC, so
 * that its time-of-day matches what ray_time_clock_fn reports in local mode.
 *
 * Returns a "domain" error if the C library cannot break the current time
 * down.
 */
ray_t* ray_timestamp_clock_fn(ray_t* arg) {
    bool local = !is_global_arg(arg);
    time_t now = time(NULL);
    struct tm* t = local ? localtime(&now) : gmtime(&now);
    if (!t) return ray_error("domain", "timestamp: failed to get current time");

    int64_t secs;
    if (!local) {
        secs = now - ray_epoch_offset();
    } else {
        /*
         * Assemble the value from the local broken-down fields. Feeding the
         * local struct tm back through mktime() just reproduces `now`
         * (mktime interprets its argument as local time), which would make
         * local mode indistinguishable from global mode.
         *
         * Leap days in [2000, year) = leap years in 1..year-1 minus the 484
         * leap years in 1..1999. Valid for years >= 1.
         */
        int64_t year = (int64_t)t->tm_year + 1900;
        int64_t prev = year - 1;
        int64_t leap_days = (prev / 4 - prev / 100 + prev / 400) - 484;
        int64_t days = (year - 2000) * 365 + leap_days + t->tm_yday;
        secs = days * 86400LL
             + (int64_t)t->tm_hour * 3600LL
             + (int64_t)t->tm_min * 60LL
             + (int64_t)t->tm_sec;
    }

    int64_t nanos = secs * 1000000000LL;
    return ray_timestamp(nanos);
}