#[cfg(feature = "fast_hash")]
use ahash::AHashSet as HashSet;
#[cfg(not(feature = "fast_hash"))]
use std::collections::HashSet;
use std::simd::{Mask, MaskElement};
use std::{fmt::Display, sync::Arc};
use crate::enums::error::KernelError;
#[cfg(feature = "chunked")]
use crate::enums::error::MinarrowError;
#[cfg(feature = "chunked")]
use crate::structs::field_array::create_field_for_array;
use crate::traits::masked_array::MaskedArray;
#[cfg(feature = "chunked")]
use crate::{Array, FieldArray, SuperArray};
use crate::{
Bitmask, CategoricalArray, Float, FloatArray, Integer, IntegerArray, StringArray, TextArray,
};
/// Asserts that an optional null mask covers exactly `data_len` entries.
///
/// Does nothing when `null_mask` is `None`.
///
/// # Panics
/// Panics when a mask is present and its length differs from `data_len`.
#[inline(always)]
pub fn validate_null_mask_len(data_len: usize, null_mask: &Option<Bitmask>) {
    // Nothing to validate when the array carries no mask.
    let Some(mask) = null_mask else {
        return;
    };
    assert_eq!(
        mask.len(),
        data_len,
        "Validation Error: Null mask length ({}) does not match data length ({})",
        mask.len(),
        data_len
    );
}
/// Parses a textual timestamp into milliseconds since the Unix epoch.
///
/// Returns `None` for an empty string or when no supported representation
/// matches.
///
/// With the `datetime_ops` feature enabled, the following are tried in order:
/// 1. RFC 3339
/// 2. ISO 8601 (default configuration)
/// 3. `YYYY-MM-DD HH:MM:SS` (interpreted as UTC)
/// 4. `YYYY-MM-DD` (midnight UTC)
/// 5. `HH:MM:SS` (combined with today's UTC date)
///
/// Regardless of the feature, the final fallback parses the input as a plain
/// `i64`, which is returned unchanged.
pub fn parse_datetime_str(s: &str) -> Option<i64> {
    if s.is_empty() {
        return None;
    }

    #[cfg(feature = "datetime_ops")]
    {
        use time::{
            Date, OffsetDateTime, PrimitiveDateTime, Time,
            format_description::well_known::{Iso8601, Rfc3339},
            macros::format_description,
        };

        // Whole seconds scaled to milliseconds plus the sub-second millisecond part.
        let to_epoch_millis = |moment: OffsetDateTime| -> i64 {
            moment.unix_timestamp() * 1_000 + (moment.nanosecond() / 1_000_000) as i64
        };

        if let Ok(parsed) = OffsetDateTime::parse(s, &Rfc3339) {
            return Some(to_epoch_millis(parsed));
        }
        if let Ok(parsed) = OffsetDateTime::parse(s, &Iso8601::DEFAULT) {
            return Some(to_epoch_millis(parsed));
        }

        // Date and time without an offset: treated as UTC.
        let datetime_layout = format_description!("[year]-[month]-[day] [hour]:[minute]:[second]");
        if let Ok(naive) = PrimitiveDateTime::parse(s, datetime_layout) {
            return Some(to_epoch_millis(naive.assume_utc()));
        }

        // Date only: midnight UTC of that day.
        let date_layout = format_description!("[year]-[month]-[day]");
        if let Ok(day) = Date::parse(s, date_layout) {
            if let Ok(midnight) = day.with_hms(0, 0, 0) {
                return Some(midnight.assume_utc().unix_timestamp() * 1_000);
            }
        }

        // Time only: anchored to the current UTC date, so the result depends on "now".
        let time_layout = format_description!("[hour]:[minute]:[second]");
        if let Ok(clock) = Time::parse(s, time_layout) {
            let today = OffsetDateTime::now_utc().date();
            return Some(to_epoch_millis(today.with_time(clock).assume_utc()));
        }
    }

    // Last resort: a bare integer is passed through as-is.
    s.parse::<i64>().ok()
}
/// Renders every element of an integer array via `Display` into a
/// `TextArray::String32`.
///
/// Null positions are written as empty strings, and the source array's null
/// mask is cloned onto the result.
pub fn int_to_text_array<T: Display + Integer>(arr: &Arc<IntegerArray<T>>) -> TextArray {
    let rendered: Vec<String> = (0..arr.len())
        .map(|idx| {
            if arr.is_null(idx) {
                // Placeholder text; the cloned mask carries the null marker.
                String::new()
            } else {
                arr.data[idx].to_string()
            }
        })
        .collect();
    let borrowed: Vec<&str> = rendered.iter().map(|text| text.as_str()).collect();
    let strings = StringArray::<u32>::from_vec(borrowed, arr.null_mask.clone());
    TextArray::String32(Arc::new(strings))
}
/// Renders every element of a float array via `Display` into a
/// `TextArray::String32`.
///
/// Null positions are written as empty strings, and the source array's null
/// mask is cloned onto the result.
pub fn float_to_text_array<T: Display + Float>(arr: &Arc<FloatArray<T>>) -> TextArray {
    let rendered: Vec<String> = (0..arr.len())
        .map(|idx| {
            if arr.is_null(idx) {
                // Placeholder text; the cloned mask carries the null marker.
                String::new()
            } else {
                arr.data[idx].to_string()
            }
        })
        .collect();
    let borrowed: Vec<&str> = rendered.iter().map(|text| text.as_str()).collect();
    let strings = StringArray::<u32>::from_vec(borrowed, arr.null_mask.clone());
    TextArray::String32(Arc::new(strings))
}
/// Checks that two lengths are equal.
///
/// # Errors
/// Returns `KernelError::LengthMismatch` carrying `label` and both lengths
/// when `a != b`.
#[inline(always)]
pub fn confirm_equal_len(label: &str, a: usize, b: usize) -> Result<(), KernelError> {
    if a == b {
        return Ok(());
    }
    let message = format!("{}: length mismatch (lhs: {}, rhs: {})", label, a, b);
    Err(KernelError::LengthMismatch(message))
}
/// Rounds `byte_count` up to the next multiple of 64.
///
/// Values that are already a multiple of 64 (including 0) are returned
/// unchanged.
#[inline(always)]
pub fn align64(byte_count: usize) -> usize {
    const ALIGNMENT: usize = 64;
    // Add (alignment - 1), then clear the low bits to snap down to a multiple.
    (byte_count + (ALIGNMENT - 1)) & !(ALIGNMENT - 1)
}
/// Reports whether a slice's data pointer sits on a 64-byte boundary.
///
/// An empty slice is always considered aligned, whatever its pointer value.
#[inline(always)]
pub fn is_simd_aligned<T>(slice: &[T]) -> bool {
    slice.is_empty() || ((slice.as_ptr() as usize) & 63) == 0
}
/// Loads up to `N` consecutive bits of `mask`, starting at bit `offset`, into
/// a SIMD `Mask<T, N>`.
///
/// * `mask`   - source bitmask, read 64 bits at a time.
/// * `offset` - index of the first bit to load.
/// * `len`    - logical length; bits at positions `>= len` are forced to `false`.
///
/// When `offset >= len` the result is an all-`false` mask and the bitmask's
/// storage is not touched. Previously the unchecked word read still happened
/// in that case, even though its value was always masked away.
#[inline(always)]
pub fn simd_mask<T: MaskElement, const N: usize>(
    mask: &Bitmask,
    offset: usize,
    len: usize,
) -> Mask<T, N> {
    // No lanes are in range: avoid any unchecked access past the logical end.
    if offset >= len {
        return Mask::from_bitmask(0);
    }
    let remaining = len - offset;

    let word_idx = offset / 64;
    let bit_shift = offset % 64;
    let total_words = (mask.len + 63) / 64;

    // SAFETY: `offset < len` here; this relies on the caller passing a `len`
    // that does not exceed the mask's length so that `word_idx` is in range.
    // NOTE(review): `word_unchecked`'s exact contract is not visible here — confirm.
    let mut raw = unsafe { mask.word_unchecked(word_idx) } >> bit_shift;

    // An unaligned start needs the low bits of the following word, if one exists.
    if bit_shift > 0 && word_idx + 1 < total_words {
        // SAFETY: `word_idx + 1 < total_words`, the word count derived from `mask.len`.
        raw |= unsafe { mask.word_unchecked(word_idx + 1) } << (64 - bit_shift);
    }

    // Zero out lanes that fall beyond the logical length.
    if remaining < N && remaining < 64 {
        raw &= (1u64 << remaining) - 1;
    }

    Mask::from_bitmask(raw)
}
/// Writes the `N` lane bits of a SIMD mask into `out_mask`, starting at bit
/// `offset`.
///
/// The `N` target bits are cleared and then overwritten with the lane bits.
/// When the run crosses a 64-bit word boundary, the overflowing high lanes are
/// written into the low bits of the following word. Bits outside the
/// `N`-bit window are preserved.
///
/// No bounds checks are performed on the word indices.
#[inline(always)]
pub fn write_simd_mask_bits<T: MaskElement, const N: usize>(
    out_mask: &mut Bitmask,
    offset: usize,
    m: Mask<T, N>,
) {
    let lane_bits = m.to_bitmask();
    let word = offset / 64;
    let shift = offset % 64;
    // One set bit per lane; shifting by 64 is invalid, so that case is handled explicitly.
    let lanes: u64 = if N >= 64 { !0u64 } else { (1u64 << N) - 1 };

    // SAFETY: the word accessors are unchecked. The caller is assumed to
    // guarantee that `word` (and `word + 1` when the lanes spill over) lies
    // inside `out_mask`'s storage — TODO confirm against `Bitmask`'s contract.
    unsafe {
        let current = out_mask.word_unchecked(word);
        out_mask.set_word_unchecked(word, (current & !(lanes << shift)) | (lane_bits << shift));

        if shift > 0 && shift + N > 64 {
            // Number of lanes that did not fit into the first word.
            let spill = N - (64 - shift);
            let spill_mask = (1u64 << spill) - 1;
            let next = out_mask.word_unchecked(word + 1);
            out_mask.set_word_unchecked(
                word + 1,
                (next & !spill_mask) | (lane_bits >> (64 - shift)),
            );
        }
    }
}
/// Verifies that an optional bitmask's capacity equals `cmp_len`.
///
/// Succeeds immediately when no mask is supplied.
///
/// # Errors
/// Forwards the error from `confirm_capacity` when the mask's capacity does
/// not equal `cmp_len`.
#[inline(always)]
pub fn confirm_mask_capacity(cmp_len: usize, mask: Option<&Bitmask>) -> Result<(), KernelError> {
    match mask {
        Some(m) => confirm_capacity("mask (Bitmask)", m.capacity(), cmp_len),
        None => Ok(()),
    }
}
/// Checks that `actual` is exactly equal to `expected`.
///
/// Note that this is a strict equality check, not an "at least" check.
///
/// # Errors
/// Returns `KernelError::InvalidArguments` naming `label` and both values
/// when they differ.
#[inline(always)]
pub fn confirm_capacity(label: &str, actual: usize, expected: usize) -> Result<(), KernelError> {
    if actual == expected {
        return Ok(());
    }
    let message = format!(
        "{}: capacity mismatch (expected {}, got {})",
        label, expected, actual
    );
    Err(KernelError::InvalidArguments(message))
}
/// Estimates the distinct-value ratio of a categorical array from an evenly
/// strided sample.
///
/// Walks the array with a stride of `len / sample_size` (at least 1),
/// collecting distinct strings until the sample target is reached, and returns
/// `distinct / target` where `target = min(max(sample_size, 1), len)`.
///
/// Returns `0.0` for an empty array. A `sample_size` of 0 is treated as 1;
/// previously it produced a division by zero and returned infinity.
#[inline(always)]
pub fn estimate_categorical_cardinality(cat: &CategoricalArray<u32>, sample_size: usize) -> f64 {
    let len = cat.data.len();
    if len == 0 {
        return 0.0;
    }

    // Clamp once so the stride, the early exit and the denominator all agree
    // and the denominator can never be zero.
    let requested = sample_size.max(1);
    let target = requested.min(len);
    let step = (len / requested).max(1);

    let mut seen = HashSet::with_capacity(target);
    for i in (0..len).step_by(step) {
        // SAFETY: `i < len` where `len == cat.data.len()`.
        // NOTE(review): assumes that is the bound `get_str_unchecked` requires — confirm.
        let s = unsafe { cat.get_str_unchecked(i) };
        seen.insert(s);
        if seen.len() >= target {
            break;
        }
    }

    (seen.len() as f64) / (target as f64)
}
/// Estimates the distinct-value ratio of a string array from an evenly
/// strided sample.
///
/// Walks the array with a stride of `len / sample_size` (at least 1),
/// collecting distinct strings until the sample target is reached, and returns
/// `distinct / target` where `target = min(max(sample_size, 1), len)`.
///
/// Returns `0.0` for an empty array. A `sample_size` of 0 is treated as 1;
/// previously it produced a division by zero and returned infinity.
#[inline(always)]
pub fn estimate_string_cardinality<T: Integer>(arr: &StringArray<T>, sample_size: usize) -> f64 {
    let len = arr.len();
    if len == 0 {
        return 0.0;
    }

    // Clamp once so the stride, the early exit and the denominator all agree
    // and the denominator can never be zero.
    let requested = sample_size.max(1);
    let target = requested.min(len);
    let step = (len / requested).max(1);

    let mut seen = HashSet::with_capacity(target);
    for i in (0..len).step_by(step) {
        // SAFETY: `i < len` where `len == arr.len()`.
        // NOTE(review): assumes that is the bound `get_str_unchecked` requires — confirm.
        let s = unsafe { arr.get_str_unchecked(i) };
        seen.insert(s);
        if seen.len() >= target {
            break;
        }
    }

    (seen.len() as f64) / (target as f64)
}
/// Combines the null mask of `array` with the concatenated null masks of all
/// chunks in `super_array`.
///
/// Chunk masks are concatenated bit-by-bit in chunk order. A chunk without a
/// mask contributes `true` for each of its elements. If no chunk has a mask,
/// the super-array side is treated as having no mask at all.
///
/// Returns:
/// * `Ok(None)` when neither side has a mask,
/// * the one existing mask when only one side has one,
/// * `arr_mask.union(&super_mask)` when both exist.
///
/// NOTE(review): whether a set bit means "valid" or "null", and therefore
/// what `Bitmask::union` means for null propagation, is not visible here —
/// confirm.
///
/// # Errors
/// Returns `MinarrowError::ShapeError` when both masks exist but their
/// lengths differ.
#[cfg(feature = "chunked")]
fn union_array_superarray_masks(
    array: &Array,
    super_array: &SuperArray,
) -> Result<Option<Bitmask>, MinarrowError> {
    let chunks = super_array.chunks();

    // Only build a concatenated mask when at least one chunk actually has one.
    let any_chunk_masked = chunks.iter().any(|chunk| chunk.null_mask().is_some());
    let super_mask = if any_chunk_masked {
        let mut bits: Vec<bool> = Vec::new();
        for chunk in chunks.iter() {
            match chunk.null_mask() {
                Some(chunk_mask) => {
                    bits.extend((0..chunk_mask.len()).map(|i| chunk_mask.get(i)));
                }
                // Mask-less chunk: fill its span with `true`.
                None => bits.extend(std::iter::repeat(true).take(chunk.len())),
            }
        }
        Some(Bitmask::from_bools(&bits))
    } else {
        None
    };

    match (array.null_mask(), super_mask) {
        (None, None) => Ok(None),
        (None, Some(only_super)) => Ok(Some(only_super)),
        (Some(only_array), None) => Ok(Some(only_array.clone())),
        (Some(arr_mask), Some(super_mask)) if arr_mask.len() != super_mask.len() => {
            Err(MinarrowError::ShapeError {
                message: format!(
                    "Mask lengths must match for union: {} vs {}",
                    arr_mask.len(),
                    super_mask.len()
                ),
            })
        }
        (Some(arr_mask), Some(super_mask)) => Ok(Some(arr_mask.union(&super_mask))),
    }
}
/// Splits `array` into chunks whose lengths mirror the chunks of
/// `super_array`, returning them as a new `SuperArray`.
///
/// The combined mask from `union_array_superarray_masks` (if any) is sliced
/// per chunk and installed on each piece via `set_null_mask`. Each piece is
/// wrapped in a `FieldArray` whose field is built by `create_field_for_array`
/// from `field_name`, the piece itself and the first chunk of `super_array`.
///
/// # Errors
/// Returns `MinarrowError::ShapeError` when
/// * the total lengths of `array` and `super_array` differ,
/// * the masks of the two inputs cannot be combined, or
/// * a chunk boundary would exceed `array.len()`.
#[cfg(feature = "chunked")]
pub fn create_aligned_chunks_from_array(
    array: Array,
    super_array: &SuperArray,
    field_name: &str,
) -> Result<SuperArray, MinarrowError> {
    if array.len() != super_array.len() {
        return Err(MinarrowError::ShapeError {
            message: format!(
                "Array and SuperArray must have same total length for broadcasting: {} vs {}",
                array.len(),
                super_array.len()
            ),
        });
    }

    let full_mask = union_array_superarray_masks(&array, super_array)?;

    let mut aligned = Vec::with_capacity(super_array.chunks().len());
    // Running element offset into `array`; also indexes the combined mask.
    let mut cursor: usize = 0;

    for source_chunk in super_array.chunks().iter() {
        let chunk_len = source_chunk.len();
        let end = cursor + chunk_len;
        if end > array.len() {
            return Err(MinarrowError::ShapeError {
                message: format!(
                    "Chunk alignment failed: index {} out of bounds for length {}",
                    end,
                    array.len()
                ),
            });
        }

        let view = array.view(cursor, chunk_len);
        let mut piece = view.array.slice_clone(view.offset, view.len());

        // Replace the piece's mask with the matching window of the combined mask.
        if let Some(mask) = full_mask.as_ref() {
            let window: Vec<bool> = (cursor..end).map(|i| mask.get(i)).collect();
            piece.set_null_mask(Bitmask::from_bools(&window));
        }

        // The first source chunk is handed to the field builder for every piece.
        let template_chunk = &super_array.chunks()[0];
        let field = create_field_for_array(field_name, &piece, Some(template_chunk), None);
        aligned.push(FieldArray::new(field, piece));

        cursor = end;
    }

    Ok(SuperArray::from_chunks(aligned))
}