use crate::error::ChartonError;
use ahash::{AHashMap, AHashSet};
use std::fmt;
use std::sync::Arc;
use time::OffsetDateTime;
/// A typed, column-oriented buffer of values.
///
/// Null handling differs by variant:
/// - `F64`/`F32` use `NaN` as the null sentinel (see `get_f64`).
/// - All other variants carry an optional LSB-first validity bitmask where a
///   set bit means "valid" and `None` means every row is valid
///   (see `is_valid_in_mask`). Null rows still hold a placeholder value in
///   `data` so indices stay aligned (see `collect_with_validity`).
#[derive(Clone, Debug)]
pub enum ColumnVector {
    F64 { data: Vec<f64> },
    F32 { data: Vec<f32> },
    I64 {
        data: Vec<i64>,
        validity: Option<Vec<u8>>,
    },
    I32 {
        data: Vec<i32>,
        validity: Option<Vec<u8>>,
    },
    U32 {
        data: Vec<u32>,
        validity: Option<Vec<u8>>,
    },
    String {
        data: Vec<String>,
        validity: Option<Vec<u8>>,
    },
    DateTime {
        data: Vec<OffsetDateTime>,
        validity: Option<Vec<u8>>,
    },
}
/// Coarse role of a column, as classified by `ColumnVector::semantic_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SemanticType {
    /// Numeric data (all float and integer variants).
    Continuous,
    /// Categorical data (string variants).
    Discrete,
    /// Time-based data (datetime variants).
    Temporal,
}
impl ColumnVector {
pub fn semantic_type(&self) -> SemanticType {
match self {
ColumnVector::F64 { .. }
| ColumnVector::F32 { .. }
| ColumnVector::I64 { .. }
| ColumnVector::I32 { .. }
| ColumnVector::U32 { .. } => SemanticType::Continuous,
ColumnVector::String { .. } => SemanticType::Discrete,
ColumnVector::DateTime { .. } => SemanticType::Temporal,
}
}
pub fn dtype_name(&self) -> &'static str {
match self {
ColumnVector::F64 { .. } => "f64",
ColumnVector::F32 { .. } => "f32",
ColumnVector::I64 { .. } => "i64",
ColumnVector::I32 { .. } => "i32",
ColumnVector::U32 { .. } => "u32",
ColumnVector::String { .. } => "str", ColumnVector::DateTime { .. } => "datetime", }
}
pub fn len(&self) -> usize {
match self {
ColumnVector::F64 { data } => data.len(),
ColumnVector::F32 { data } => data.len(),
ColumnVector::I64 { data, .. } => data.len(),
ColumnVector::I32 { data, .. } => data.len(),
ColumnVector::U32 { data, .. } => data.len(),
ColumnVector::String { data, .. } => data.len(),
ColumnVector::DateTime { data, .. } => data.len(),
}
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn is_valid_in_mask(mask: &Option<Vec<u8>>, index: usize) -> bool {
match mask {
None => true,
Some(bits) => {
let byte_idx = index / 8;
let bit_idx = index % 8;
bits.get(byte_idx)
.map(|&byte| (byte >> bit_idx) & 1 == 1)
.unwrap_or(false)
}
}
}
pub fn get_f64(&self, row: usize) -> Option<f64> {
match self {
ColumnVector::F64 { data } => {
let v = data[row];
if v.is_nan() { None } else { Some(v) }
}
ColumnVector::F32 { data } => {
let v = data[row];
if v.is_nan() { None } else { Some(v as f64) }
}
ColumnVector::I64 { data, validity } => {
if ColumnVector::is_valid_in_mask(validity, row) {
Some(data[row] as f64)
} else {
None
}
}
ColumnVector::I32 { data, validity } => {
if ColumnVector::is_valid_in_mask(validity, row) {
Some(data[row] as f64)
} else {
None
}
}
ColumnVector::U32 { data, validity } => {
if ColumnVector::is_valid_in_mask(validity, row) {
Some(data[row] as f64)
} else {
None
}
}
ColumnVector::DateTime { data, validity } => {
if ColumnVector::is_valid_in_mask(validity, row) {
Some(data[row].unix_timestamp_nanos() as f64) } else {
None
}
}
ColumnVector::String { .. } => None,
}
}
pub fn get_f64_or(&self, row: usize, default: f64) -> f64 {
self.get_f64(row).unwrap_or(default)
}
pub fn to_f64_vec(&self) -> Vec<f64> {
let n = self.len();
let mut out = Vec::with_capacity(n);
match self {
ColumnVector::F64 { data } => {
out.extend(data.iter().map(|&v| if v.is_nan() { 0.0 } else { v }));
}
ColumnVector::F32 { data } => {
out.extend(
data.iter()
.map(|&v| if v.is_nan() { 0.0 } else { v as f64 }),
);
}
_ => {
for i in 0..n {
out.push(self.get_f64(i).unwrap_or(0.0));
}
}
}
out
}
pub fn to_f64_options(&self) -> Vec<Option<f64>> {
(0..self.len()).map(|i| self.get_f64(i)).collect()
}
pub fn get_str(&self, row: usize) -> Option<String> {
match self {
ColumnVector::String { data, validity } => {
if Self::is_valid_in_mask(validity, row) {
Some(data[row].clone())
} else {
None
}
}
ColumnVector::I64 { data, validity } => {
if Self::is_valid_in_mask(validity, row) {
Some(format!("{}", data[row]))
} else {
None
}
}
ColumnVector::I32 { data, validity } => {
if Self::is_valid_in_mask(validity, row) {
Some(format!("{}", data[row]))
} else {
None
}
}
ColumnVector::U32 { data, validity } => {
if Self::is_valid_in_mask(validity, row) {
Some(format!("{}", data[row]))
} else {
None
}
}
ColumnVector::F64 { data } => {
let v = data[row];
if v.is_nan() {
None
} else {
Some(format!("{}", v))
}
}
ColumnVector::F32 { data } => {
let v = data[row];
if v.is_nan() {
None
} else {
Some(format!("{}", v))
}
}
ColumnVector::DateTime { data, validity } => {
if Self::is_valid_in_mask(validity, row) {
Some(format!("{}", data[row]))
} else {
None
}
}
}
}
pub fn get_str_or(&self, row: usize, default: &str) -> String {
self.get_str(row).unwrap_or_else(|| default.to_string())
}
pub fn n_unique(&self) -> usize {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
match self {
ColumnVector::F64 { data } => {
data.par_iter()
.filter(|&&v| !v.is_nan())
.fold(AHashSet::new, |mut set, &v| {
let norm = if v == 0.0 { 0.0 } else { v };
set.insert(norm.to_bits());
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len()
}
ColumnVector::F32 { data } => data
.par_iter()
.filter(|&&v| !v.is_nan())
.fold(AHashSet::new, |mut set, &v| {
let norm = if v == 0.0 { 0.0 } else { v };
set.insert(norm.to_bits());
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len(),
ColumnVector::String { data, validity } => (0..data.len())
.into_par_iter()
.fold(AHashSet::new, |mut set, i| {
if Self::is_valid_in_mask(validity, i) {
set.insert(&data[i]);
}
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len(),
ColumnVector::I64 { data, validity } => (0..data.len())
.into_par_iter()
.fold(AHashSet::new, |mut set, i| {
if Self::is_valid_in_mask(validity, i) {
set.insert(data[i]);
}
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len(),
ColumnVector::I32 { data, validity } => (0..data.len())
.into_par_iter()
.fold(AHashSet::new, |mut set, i| {
if Self::is_valid_in_mask(validity, i) {
set.insert(data[i]);
}
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len(),
ColumnVector::U32 { data, validity } => (0..data.len())
.into_par_iter()
.fold(AHashSet::new, |mut set, i| {
if Self::is_valid_in_mask(validity, i) {
set.insert(data[i]);
}
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len(),
ColumnVector::DateTime { data, validity } => (0..data.len())
.into_par_iter()
.fold(AHashSet::new, |mut set, i| {
if Self::is_valid_in_mask(validity, i) {
set.insert(data[i]);
}
set
})
.reduce(AHashSet::new, |mut s1, s2| {
s1.extend(s2);
s1
})
.len(),
}
}
#[cfg(not(feature = "parallel"))]
{
self.n_unique_serial()
}
}
#[cfg(not(feature = "parallel"))]
fn n_unique_serial(&self) -> usize {
match self {
ColumnVector::F64 { data } => {
let mut seen = AHashSet::with_capacity(data.len() / 4);
for &v in data {
if !v.is_nan() {
let norm = if v == 0.0 { 0.0 } else { v };
seen.insert(norm.to_bits());
}
}
seen.len()
}
ColumnVector::F32 { data } => {
let mut seen = AHashSet::with_capacity(data.len() / 4);
for &v in data {
if !v.is_nan() {
let norm = if v == 0.0 { 0.0 } else { v };
seen.insert(norm.to_bits());
}
}
seen.len()
}
ColumnVector::I64 { data, validity } => {
let mut seen = AHashSet::new();
for (i, &v) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
seen.insert(v);
}
}
seen.len()
}
ColumnVector::I32 { data, validity } => {
let mut seen = AHashSet::new();
for (i, &v) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
seen.insert(v);
}
}
seen.len()
}
ColumnVector::U32 { data, validity } => {
let mut seen = AHashSet::new();
for (i, &v) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
seen.insert(v);
}
}
seen.len()
}
ColumnVector::String { data, validity } => {
let mut seen = AHashSet::new();
for (i, s) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
seen.insert(s);
}
}
seen.len()
}
ColumnVector::DateTime { data, validity } => {
let mut seen = AHashSet::new();
for (i, &dt) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
seen.insert(dt);
}
}
seen.len()
}
}
}
pub fn unique_values(&self) -> Vec<String> {
let mut result = Vec::new();
let mut seen = AHashSet::new();
match self {
ColumnVector::F64 { data } => {
for &v in data {
if !v.is_nan() {
let s = v.to_string();
if seen.insert(s.clone()) {
result.push(s);
}
}
}
}
ColumnVector::F32 { data } => {
for &v in data {
if !v.is_nan() {
let s = v.to_string();
if seen.insert(s.clone()) {
result.push(s);
}
}
}
}
ColumnVector::I64 { data, validity } => {
for (i, &v) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
let s = v.to_string();
if seen.insert(s.clone()) {
result.push(s);
}
}
}
}
ColumnVector::I32 { data, validity } => {
for (i, &v) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
let s = v.to_string();
if seen.insert(s.clone()) {
result.push(s);
}
}
}
}
ColumnVector::U32 { data, validity } => {
for (i, &v) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
let s = v.to_string();
if seen.insert(s.clone()) {
result.push(s);
}
}
}
}
ColumnVector::String { data, validity } => {
for (i, s) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) && seen.insert(s.clone()) {
result.push(s.clone());
}
}
}
ColumnVector::DateTime { data, validity } => {
for (i, dt) in data.iter().enumerate() {
if Self::is_valid_in_mask(validity, i) {
let s = dt.to_string();
if seen.insert(s.clone()) {
result.push(s);
}
}
}
}
}
result
}
    /// Returns `(min, max)` over all valid, non-NaN values, converted to `f64`.
    ///
    /// An empty or all-null numeric column yields `(INFINITY, NEG_INFINITY)`;
    /// `String` columns yield `(0.0, 0.0)`.
    pub fn min_max(&self) -> (f64, f64) {
        #[cfg(feature = "parallel")]
        {
            use rayon::prelude::*;
            // Fold identity: any real value tightens both bounds.
            let identity = (f64::INFINITY, f64::NEG_INFINITY);
            match self {
                ColumnVector::F64 { data } => data
                    .par_iter()
                    .filter(|&&v| !v.is_nan())
                    .fold(|| identity, |(min, max), &v| (min.min(v), max.max(v)))
                    .reduce(|| identity, |(m1, x1), (m2, x2)| (m1.min(m2), x1.max(x2))),
                ColumnVector::F32 { data } => data
                    .par_iter()
                    .filter(|&&v| !v.is_nan())
                    .fold(
                        || identity,
                        |(min, max), &v| {
                            let v64 = v as f64;
                            (min.min(v64), max.max(v64))
                        },
                    )
                    .reduce(|| identity, |(m1, x1), (m2, x2)| (m1.min(m2), x1.max(x2))),
                ColumnVector::I64 { data, validity } => {
                    self.parallel_scan_with_mask(data, validity, |&v| v as f64)
                }
                ColumnVector::I32 { data, validity } => {
                    self.parallel_scan_with_mask(data, validity, |&v| v as f64)
                }
                ColumnVector::U32 { data, validity } => {
                    self.parallel_scan_with_mask(data, validity, |&v| v as f64)
                }
                ColumnVector::DateTime { data, validity } => {
                    self.parallel_scan_with_mask(data, validity, |&v| {
                        v.unix_timestamp_nanos() as f64
                    })
                }
                // String columns have no numeric ordering here.
                _ => (0.0, 0.0),
            }
        }
        #[cfg(not(feature = "parallel"))]
        {
            self.min_max_serial()
        }
    }
#[cfg(feature = "parallel")]
fn parallel_scan_with_mask<T, F>(
&self,
data: &[T],
validity: &Option<Vec<u8>>,
convert: F,
) -> (f64, f64)
where
T: Copy + Sync + Send,
F: Fn(&T) -> f64 + Sync + Send,
{
use rayon::prelude::*;
let identity = (f64::INFINITY, f64::NEG_INFINITY);
if let Some(mask) = validity {
data.par_iter()
.enumerate()
.fold(
|| identity,
|(min, max), (i, v)| {
if (mask[i / 8] >> (i % 8)) & 1 == 1 {
let val = convert(v);
(min.min(val), max.max(val))
} else {
(min, max)
}
},
)
.reduce(|| identity, |(m1, x1), (m2, x2)| (m1.min(m2), x1.max(x2)))
} else {
data.par_iter()
.fold(
|| identity,
|(min, max), v| {
let val = convert(v);
(min.min(val), max.max(val))
},
)
.reduce(|| identity, |(m1, x1), (m2, x2)| (m1.min(m2), x1.max(x2)))
}
}
#[cfg(not(feature = "parallel"))]
fn min_max_serial(&self) -> (f64, f64) {
let identity = (f64::INFINITY, f64::NEG_INFINITY);
match self {
ColumnVector::F64 { data } => {
let mut m = identity;
for &v in data {
if !v.is_nan() {
m.0 = m.0.min(v);
m.1 = m.1.max(v);
}
}
m
}
ColumnVector::F32 { data } => {
let mut m = identity;
for &v in data {
if !v.is_nan() {
let v64 = v as f64;
m.0 = m.0.min(v64);
m.1 = m.1.max(v64);
}
}
m
}
ColumnVector::I64 { data, validity } => {
self.serial_scan_with_mask(data, validity, |&v| v as f64)
}
ColumnVector::I32 { data, validity } => {
self.serial_scan_with_mask(data, validity, |&v| v as f64)
}
ColumnVector::U32 { data, validity } => {
self.serial_scan_with_mask(data, validity, |&v| v as f64)
}
ColumnVector::DateTime { data, validity } => {
self.serial_scan_with_mask(data, validity, |&v| v.unix_timestamp_nanos() as f64)
}
_ => (0.0, 0.0),
}
}
#[cfg(not(feature = "parallel"))]
fn serial_scan_with_mask<T, F>(
&self,
data: &[T],
validity: &Option<Vec<u8>>,
convert: F,
) -> (f64, f64)
where
F: Fn(&T) -> f64,
{
let mut min = f64::INFINITY;
let mut max = f64::NEG_INFINITY;
if let Some(mask) = validity {
for (i, v) in data.iter().enumerate() {
if (mask[i / 8] >> (i % 8)) & 1 == 1 {
let val = convert(v);
if val < min {
min = val;
}
if val > max {
max = val;
}
}
}
} else {
for v in data {
let val = convert(v);
if val < min {
min = val;
}
if val > max {
max = val;
}
}
}
(min, max)
}
#[cfg(feature = "arrow")]
pub fn from_arrow(array: &dyn Array) -> Result<Self, ChartonError> {
match array.data_type() {
DataType::Float64 => {
let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
let data: Vec<f64> = (0..arr.len())
.map(|i| {
if arr.is_null(i) {
f64::NAN
} else {
arr.value(i)
}
})
.collect();
Ok(ColumnVector::F64 { data })
}
DataType::Float32 => {
let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
let data: Vec<f32> = (0..arr.len())
.map(|i| {
if arr.is_null(i) {
f32::NAN
} else {
arr.value(i)
}
})
.collect();
Ok(ColumnVector::F32 { data })
}
DataType::Int64 => {
let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
let (data, validity) = collect_with_validity(
(0..arr.len()).map(|i| {
if arr.is_valid(i) {
Some(arr.value(i))
} else {
None
}
}),
0i64,
);
Ok(ColumnVector::I64 { data, validity })
}
DataType::Utf8 | DataType::LargeUtf8 => {
let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
let (data, validity) = collect_with_validity(
(0..arr.len()).map(|i| {
if arr.is_valid(i) {
Some(arr.value(i).to_string())
} else {
None
}
}),
String::new(),
);
Ok(ColumnVector::String { data, validity })
}
DataType::Timestamp(unit, _) => {
let (data, validity) = match unit {
TimeUnit::Second => {
let arr = array
.as_any()
.downcast_ref::<arrow::array::TimestampSecondArray>()
.unwrap();
collect_with_validity(
(0..arr.len()).map(|i| {
if arr.is_valid(i) {
Some(
OffsetDateTime::from_unix_timestamp(arr.value(i))
.unwrap_or(OffsetDateTime::UNIX_EPOCH),
)
} else {
None
}
}),
OffsetDateTime::UNIX_EPOCH,
)
}
TimeUnit::Millisecond => {
let arr = array
.as_any()
.downcast_ref::<arrow::array::TimestampMillisecondArray>()
.unwrap();
collect_with_validity(
(0..arr.len()).map(|i| {
if arr.is_valid(i) {
Some(
OffsetDateTime::from_unix_timestamp_nanos(
arr.value(i) as i128 * 1_000_000,
)
.unwrap_or(OffsetDateTime::UNIX_EPOCH),
)
} else {
None
}
}),
OffsetDateTime::UNIX_EPOCH,
)
}
TimeUnit::Microsecond => {
let arr = array
.as_any()
.downcast_ref::<arrow::array::TimestampMicrosecondArray>()
.unwrap();
collect_with_validity(
(0..arr.len()).map(|i| {
if arr.is_valid(i) {
Some(
OffsetDateTime::from_unix_timestamp_nanos(
arr.value(i) as i128 * 1_000,
)
.unwrap_or(OffsetDateTime::UNIX_EPOCH),
)
} else {
None
}
}),
OffsetDateTime::UNIX_EPOCH,
)
}
TimeUnit::Nanosecond => {
let arr = array
.as_any()
.downcast_ref::<arrow::array::TimestampNanosecondArray>()
.unwrap();
collect_with_validity(
(0..arr.len()).map(|i| {
if arr.is_valid(i) {
Some(
OffsetDateTime::from_unix_timestamp_nanos(
arr.value(i) as i128
)
.unwrap_or(OffsetDateTime::UNIX_EPOCH),
)
} else {
None
}
}),
OffsetDateTime::UNIX_EPOCH,
)
}
};
Ok(ColumnVector::DateTime { data, validity })
}
_ => Err(ChartonError::Data(format!(
"Unsupported Arrow type: {:?}",
array.data_type()
))),
}
}
pub fn slice(&self, offset: usize, len: usize) -> Self {
match self {
ColumnVector::F64 { data } => ColumnVector::F64 {
data: data[offset..offset + len].to_vec(),
},
ColumnVector::F32 { data } => ColumnVector::F32 {
data: data[offset..offset + len].to_vec(),
},
ColumnVector::I64 { data, validity } => ColumnVector::I64 {
data: data[offset..offset + len].to_vec(),
validity: validity
.as_ref()
.map(|v| self.slice_validity(v, offset, len)),
},
ColumnVector::I32 { data, validity } => ColumnVector::I32 {
data: data[offset..offset + len].to_vec(),
validity: validity
.as_ref()
.map(|v| self.slice_validity(v, offset, len)),
},
ColumnVector::U32 { data, validity } => ColumnVector::U32 {
data: data[offset..offset + len].to_vec(),
validity: validity
.as_ref()
.map(|v| self.slice_validity(v, offset, len)),
},
ColumnVector::String { data, validity } => ColumnVector::String {
data: data[offset..offset + len].to_vec(),
validity: validity
.as_ref()
.map(|v| self.slice_validity(v, offset, len)),
},
ColumnVector::DateTime { data, validity } => ColumnVector::DateTime {
data: data[offset..offset + len].to_vec(),
validity: validity
.as_ref()
.map(|v| self.slice_validity(v, offset, len)),
},
}
}
fn slice_validity(&self, v: &[u8], offset: usize, len: usize) -> Vec<u8> {
let mut new_v = vec![0u8; len.div_ceil(8)];
for i in 0..len {
let old_idx = offset + i;
let byte_idx = old_idx / 8;
let bit_idx = old_idx % 8;
let is_valid = (v[byte_idx] & (1 << bit_idx)) != 0;
if is_valid {
let new_byte_idx = i / 8;
let new_bit_idx = i % 8;
new_v[new_byte_idx] |= 1 << new_bit_idx;
}
}
new_v
}
}
// --- Conversions from vectors of optionals: `None` becomes a null ---
// (NaN sentinel for floats, a cleared validity bit for everything else).
impl From<Vec<Option<f64>>> for ColumnVector {
    fn from(v: Vec<Option<f64>>) -> Self {
        let data = v.into_iter().map(|opt| opt.unwrap_or(f64::NAN)).collect();
        ColumnVector::F64 { data }
    }
}
impl From<Vec<Option<f32>>> for ColumnVector {
    fn from(v: Vec<Option<f32>>) -> Self {
        let data = v.into_iter().map(|opt| opt.unwrap_or(f32::NAN)).collect();
        ColumnVector::F32 { data }
    }
}
impl From<Vec<Option<i64>>> for ColumnVector {
    fn from(v: Vec<Option<i64>>) -> Self {
        let (data, validity) = collect_with_validity(v, 0i64);
        ColumnVector::I64 { data, validity }
    }
}
impl From<Vec<Option<i32>>> for ColumnVector {
    fn from(v: Vec<Option<i32>>) -> Self {
        let (data, validity) = collect_with_validity(v, 0i32);
        ColumnVector::I32 { data, validity }
    }
}
impl From<Vec<Option<u32>>> for ColumnVector {
    fn from(v: Vec<Option<u32>>) -> Self {
        let (data, validity) = collect_with_validity(v, 0u32);
        ColumnVector::U32 { data, validity }
    }
}
impl From<Vec<Option<String>>> for ColumnVector {
    fn from(v: Vec<Option<String>>) -> Self {
        let (data, validity) = collect_with_validity(v, String::new());
        ColumnVector::String { data, validity }
    }
}
impl From<Vec<Option<&str>>> for ColumnVector {
    fn from(v: Vec<Option<&str>>) -> Self {
        let (data, validity) = collect_with_validity(
            v.into_iter().map(|opt| opt.map(|s| s.to_string())),
            String::new(),
        );
        ColumnVector::String { data, validity }
    }
}
impl From<Vec<Option<OffsetDateTime>>> for ColumnVector {
    fn from(v: Vec<Option<OffsetDateTime>>) -> Self {
        let (data, validity) = collect_with_validity(v, OffsetDateTime::UNIX_EPOCH);
        ColumnVector::DateTime { data, validity }
    }
}
// --- Conversions from plain vectors: every row is valid (no mask) ---
impl From<Vec<f64>> for ColumnVector {
    fn from(data: Vec<f64>) -> Self {
        ColumnVector::F64 { data }
    }
}
impl From<Vec<f32>> for ColumnVector {
    fn from(data: Vec<f32>) -> Self {
        ColumnVector::F32 { data }
    }
}
impl From<Vec<i64>> for ColumnVector {
    fn from(data: Vec<i64>) -> Self {
        ColumnVector::I64 {
            data,
            validity: None,
        }
    }
}
impl From<Vec<i32>> for ColumnVector {
    fn from(data: Vec<i32>) -> Self {
        ColumnVector::I32 {
            data,
            validity: None,
        }
    }
}
impl From<Vec<u32>> for ColumnVector {
    fn from(data: Vec<u32>) -> Self {
        ColumnVector::U32 {
            data,
            validity: None,
        }
    }
}
impl From<Vec<String>> for ColumnVector {
    fn from(data: Vec<String>) -> Self {
        ColumnVector::String {
            data,
            validity: None,
        }
    }
}
impl From<Vec<&str>> for ColumnVector {
    fn from(v: Vec<&str>) -> Self {
        let data = v.into_iter().map(|s| s.to_string()).collect();
        ColumnVector::String {
            data,
            validity: None,
        }
    }
}
impl From<Vec<OffsetDateTime>> for ColumnVector {
    fn from(data: Vec<OffsetDateTime>) -> Self {
        ColumnVector::DateTime {
            data,
            validity: None,
        }
    }
}
/// Splits an iterator of optional values into a dense data vector plus an
/// LSB-first validity bitmask (set bit = valid).
///
/// Null slots are filled with `default` so indices stay aligned. The mask is
/// `None` when no nulls were seen, which callers treat as "all valid".
fn collect_with_validity<T, I>(iter: I, default: T) -> (Vec<T>, Option<Vec<u8>>)
where
    I: IntoIterator<Item = Option<T>>,
    T: Clone,
{
    let iter = iter.into_iter();
    let (lower, _) = iter.size_hint();
    let mut data = Vec::with_capacity(lower);
    let mut validity = Vec::with_capacity(lower.div_ceil(8));
    let mut has_nulls = false;
    let mut pending = 0u8;
    for (position, opt) in iter.enumerate() {
        match opt {
            Some(value) => {
                // Mark this row valid in the byte being assembled.
                pending |= 1 << (position % 8);
                data.push(value);
            }
            None => {
                has_nulls = true;
                data.push(default.clone());
            }
        }
        // Flush a completed byte after every 8th row.
        if position % 8 == 7 {
            validity.push(pending);
            pending = 0;
        }
    }
    // Flush the trailing partial byte, if any.
    if !data.len().is_multiple_of(8) {
        validity.push(pending);
    }
    (data, if has_nulls { Some(validity) } else { None })
}
/// Anything that can be turned into a `ColumnVector`.
pub trait IntoColumn {
    fn into_column(self) -> ColumnVector;
}
// Blanket impl: every `Into<ColumnVector>` type gets `into_column` for free.
impl<T> IntoColumn for T
where
    T: Into<ColumnVector>,
{
    #[inline]
    fn into_column(self) -> ColumnVector {
        self.into()
    }
}
// Arrays, array refs, slices, and vec refs all defer to the owned-Vec impls
// above by cloning into a `Vec` first.
impl<Item, const N: usize> From<[Item; N]> for ColumnVector
where
    Vec<Item>: Into<ColumnVector>,
    Item: Clone,
{
    fn from(arr: [Item; N]) -> Self {
        arr.to_vec().into()
    }
}
impl<Item, const N: usize> From<&[Item; N]> for ColumnVector
where
    Vec<Item>: Into<ColumnVector>,
    Item: Clone,
{
    fn from(arr: &[Item; N]) -> Self {
        arr.to_vec().into()
    }
}
impl<Item> From<&[Item]> for ColumnVector
where
    Vec<Item>: Into<ColumnVector>,
    Item: Clone,
{
    fn from(slice: &[Item]) -> Self {
        slice.to_vec().into()
    }
}
impl<Item> From<&Vec<Item>> for ColumnVector
where
    Vec<Item>: Into<ColumnVector>,
    Item: Clone,
{
    fn from(v: &Vec<Item>) -> Self {
        v.as_slice().into()
    }
}
/// Typed read access to the raw backing buffer of a `ColumnVector`.
///
/// Returns `None` when the column's variant does not match `Self`. Note the
/// buffer still contains placeholder values at null positions.
pub trait FromColumnVector: Sized {
    fn try_from_col(col: &ColumnVector) -> Option<&[Self]>;
}
// Generates one `FromColumnVector` impl per (element type, variant) pair.
macro_rules! impl_from_col {
    ($t:ty, $variant:ident) => {
        impl FromColumnVector for $t {
            fn try_from_col(col: &ColumnVector) -> Option<&[Self]> {
                match col {
                    ColumnVector::$variant { data, .. } => Some(data),
                    _ => None,
                }
            }
        }
    };
}
impl_from_col!(f64, F64);
impl_from_col!(f32, F32);
impl_from_col!(i64, I64);
impl_from_col!(i32, I32);
impl_from_col!(u32, U32);
impl_from_col!(String, String);
impl_from_col!(OffsetDateTime, DateTime);
/// Result of `Dataset::group_by`: each entry pairs a group key (`None` for
/// null keys or the ungrouped case) with the sorted row indices it owns.
pub struct GroupedIndices {
    pub groups: Vec<(Option<String>, Vec<usize>)>,
}
/// A named collection of equal-length columns.
///
/// `schema` maps column name -> index into `columns`; `row_count` is the
/// shared length of every column. Columns are `Arc`-shared, so cloning the
/// dataset only bumps reference counts.
#[derive(Clone, Default)]
pub struct Dataset {
    pub(crate) schema: AHashMap<String, usize>,
    pub(crate) columns: Vec<Arc<ColumnVector>>,
    pub(crate) row_count: usize,
}
impl Dataset {
pub fn new() -> Self {
Self::default()
}
fn validate_len(&mut self, name: &str, incoming_len: usize) -> Result<(), ChartonError> {
if self.columns.is_empty() {
self.row_count = incoming_len;
Ok(())
} else if incoming_len != self.row_count {
Err(ChartonError::Data(format!(
"Inconsistent column length in '{}': expected {} rows, found {}",
name, self.row_count, incoming_len
)))
} else {
Ok(())
}
}
pub fn add_column<S, V>(&mut self, name: S, data: V) -> Result<(), ChartonError>
where
S: Into<String>,
V: Into<ColumnVector>,
{
let name_str = name.into();
let vec: ColumnVector = data.into();
self.validate_len(&name_str, vec.len())?;
if let Some(&index) = self.schema.get(&name_str) {
self.columns[index] = Arc::new(vec);
} else {
let index = self.columns.len();
self.columns.push(Arc::new(vec));
self.schema.insert(name_str, index);
}
Ok(())
}
pub fn with_column<S, V>(mut self, name: S, data: V) -> Result<Self, ChartonError>
where
S: Into<String>,
V: Into<ColumnVector>,
{
self.add_column(name, data)?;
Ok(self)
}
pub fn height(&self) -> usize {
self.row_count
}
pub fn width(&self) -> usize {
self.columns.len()
}
pub fn get_column_names(&self) -> Vec<String> {
self.schema.keys().cloned().collect()
}
pub fn column(&self, name: &str) -> Result<&ColumnVector, ChartonError> {
let index = self
.schema
.get(name)
.ok_or_else(|| ChartonError::Data(format!("Column '{}' not found in dataset", name)))?;
Ok(&self.columns[*index])
}
pub fn get_column<T: FromColumnVector>(&self, name: &str) -> Result<&[T], ChartonError> {
let index = self
.schema
.get(name)
.ok_or_else(|| ChartonError::Data(format!("Column '{}' not found", name)))?;
T::try_from_col(&self.columns[*index]).ok_or_else(|| {
ChartonError::Data(format!(
"Type mismatch: Column '{}' cannot be accessed as the requested type",
name
))
})
}
pub fn get_f64(&self, name: &str, row: usize) -> Option<f64> {
self.column(name).ok().and_then(|col| col.get_f64(row))
}
pub fn get_f64_or(&self, name: &str, row: usize, default: f64) -> f64 {
self.get_f64(name, row).unwrap_or(default)
}
pub fn get_str(&self, name: &str, row: usize) -> Option<String> {
self.column(name).ok().and_then(|col| col.get_str(row))
}
pub fn get_str_or(&self, name: &str, row: usize, default: &str) -> String {
self.get_str(name, row)
.unwrap_or_else(|| default.to_string())
}
pub fn is_null(&self, name: &str, row: usize) -> bool {
let index = match self.schema.get(name) {
Some(i) => *i,
None => return true,
};
match &*self.columns[index] {
ColumnVector::F64 { data } => data[row].is_nan(),
ColumnVector::F32 { data } => data[row].is_nan(),
ColumnVector::I64 { validity, .. }
| ColumnVector::I32 { validity, .. }
| ColumnVector::U32 { validity, .. }
| ColumnVector::String { validity, .. }
| ColumnVector::DateTime { validity, .. } => {
if let Some(v) = validity {
(v[row / 8] >> (row % 8)) & 1 == 0
} else {
false }
}
}
}
    /// Builds an LSB-first bitmask with a 1 for every row that is non-null
    /// in *all* of `column_names`.
    ///
    /// Float columns contribute nulls via NaN; other columns via their
    /// validity masks. Trailing bits past `row_count` in the last byte are
    /// forced to 0.
    ///
    /// # Errors
    /// Returns an error when any named column does not exist.
    pub fn get_combined_mask(&self, column_names: &[&str]) -> Result<Vec<u8>, ChartonError> {
        if self.row_count == 0 {
            return Ok(Vec::new());
        }
        let byte_count = self.row_count.div_ceil(8);
        // Start fully valid and clear bits as nulls are found.
        let mut final_mask = vec![0xFFu8; byte_count];
        for &name in column_names {
            let col = self.column(name)?;
            match col {
                ColumnVector::F64 { data } => {
                    for (i, val) in data.iter().enumerate() {
                        if val.is_nan() {
                            final_mask[i / 8] &= !(1 << (i % 8));
                        }
                    }
                }
                ColumnVector::F32 { data } => {
                    for (i, val) in data.iter().enumerate() {
                        if val.is_nan() {
                            final_mask[i / 8] &= !(1 << (i % 8));
                        }
                    }
                }
                ColumnVector::I64 { validity, .. }
                | ColumnVector::I32 { validity, .. }
                | ColumnVector::U32 { validity, .. }
                | ColumnVector::String { validity, .. }
                | ColumnVector::DateTime { validity, .. } => {
                    if let Some(v) = validity {
                        // AND byte-wise: a 0 bit in any column clears it here.
                        for (i, byte) in v.iter().enumerate() {
                            final_mask[i] &= byte;
                        }
                    }
                }
            }
        }
        if !self.row_count.is_multiple_of(8) {
            // Zero the unused high bits of the final byte.
            let last_idx = byte_count - 1;
            let mask = (1 << (self.row_count % 8)) - 1;
            final_mask[last_idx] &= mask;
        }
        Ok(final_mask)
    }
    /// Partitions row indices by the stringified value of `col_name`.
    ///
    /// `None` (or an unknown column name) produces a single group keyed by
    /// `None` containing every row. Groups come back ordered by first
    /// occurrence with ascending indices (see `finalize_groups`).
    pub fn group_by(&self, col_name: Option<&str>) -> GroupedIndices {
        let col_vector = col_name.and_then(|name| self.column(name).ok());
        let vector = match col_vector {
            Some(v) => v,
            None => {
                return GroupedIndices {
                    groups: vec![(None, (0..self.row_count).collect())],
                };
            }
        };
        #[cfg(feature = "parallel")]
        {
            self.group_by_parallel(vector)
        }
        #[cfg(not(feature = "parallel"))]
        {
            self.group_by_serial(vector)
        }
    }
#[cfg(feature = "parallel")]
fn group_by_parallel(&self, vector: &ColumnVector) -> GroupedIndices {
use rayon::prelude::*;
let groups_map = (0..self.row_count)
.into_par_iter()
.fold(
|| AHashMap::<Option<String>, (usize, Vec<usize>)>::with_capacity(64),
|mut local_map, i| {
let key = vector.get_str(i);
local_map
.entry(key)
.and_modify(|(_, indices)| {
indices.push(i);
})
.or_insert((i, vec![i]));
local_map
},
)
.reduce(AHashMap::default, |mut map1, mut map2| {
for (key, (first_idx2, mut indices2)) in map2.drain() {
map1.entry(key)
.and_modify(|(first_idx1, indices1)| {
if first_idx2 < *first_idx1 {
*first_idx1 = first_idx2;
}
indices1.append(&mut indices2);
})
.or_insert((first_idx2, indices2));
}
map1
});
self.finalize_groups(groups_map)
}
#[cfg(not(feature = "parallel"))]
fn group_by_serial(&self, vector: &ColumnVector) -> GroupedIndices {
let mut groups_map = AHashMap::<Option<String>, (usize, Vec<usize>)>::with_capacity(64);
for i in 0..self.row_count {
let key = vector.get_str(i);
groups_map
.entry(key)
.and_modify(|(_, indices)| {
indices.push(i);
})
.or_insert((i, vec![i]));
}
self.finalize_groups(groups_map)
}
#[allow(clippy::type_complexity)]
fn finalize_groups(
&self,
groups_map: ahash::AHashMap<Option<String>, (usize, Vec<usize>)>,
) -> GroupedIndices {
let mut sorted_groups: Vec<(Option<String>, (usize, Vec<usize>))> =
groups_map.into_iter().collect();
sorted_groups.sort_by_key(|(_key, (first_idx, _indices))| *first_idx);
let groups = sorted_groups
.into_iter()
.map(|(key, (_first_idx, mut indices))| {
indices.sort_unstable();
(key, indices)
})
.collect();
GroupedIndices { groups }
}
#[cfg(feature = "arrow")]
pub fn from_record_batches(
batches: &[arrow::record_batch::RecordBatch],
) -> Result<Self, ChartonError> {
use arrow::array::{Array, Float32Array, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, TimeUnit};
if batches.is_empty() {
return Ok(Self::new());
}
let schema = batches[0].schema();
let mut dataset = Self::new();
for (i, field) in schema.fields().iter().enumerate() {
let column_arrays: Vec<&dyn arrow::array::Array> =
batches.iter().map(|b| b.column(i).as_ref()).collect();
let merged_array = arrow::compute::concat(&column_arrays)
.map_err(|e| ChartonError::Data(format!("Arrow concat error: {}", e)))?;
let column_vector = ColumnVector::from_arrow(merged_array.as_ref())?;
dataset.add_column(field.name(), column_vector)?;
}
Ok(dataset)
}
pub fn head(&self, n: usize) -> Self {
let actual_n = n.min(self.row_count);
self.slice(0, actual_n)
}
pub fn tail(&self, n: usize) -> Self {
let actual_n = n.min(self.row_count);
let offset = self.row_count - actual_n;
self.slice(offset, actual_n)
}
pub fn slice(&self, offset: usize, len: usize) -> Self {
if len == 0 {
return Self::new();
}
let new_columns: Vec<Arc<ColumnVector>> = self
.columns
.iter()
.map(|col| Arc::new(col.slice(offset, len)))
.collect();
Self {
schema: self.schema.clone(), columns: new_columns,
row_count: len,
}
}
    /// Renders one cell as a short string for the `Debug` table.
    ///
    /// Nulls print as "null"; floats with 4 decimals; strings longer than 10
    /// chars are cut to their first 7 chars plus "..."; datetimes as RFC 3339.
    fn debug_cell(&self, col_name: &str, row: usize) -> String {
        if self.is_null(col_name, row) {
            return "null".to_string();
        }
        let idx = *self.schema.get(col_name).expect("Schema integrity error");
        match &*self.columns[idx] {
            ColumnVector::F64 { data } => format!("{:.4}", data[row]),
            ColumnVector::F32 { data } => format!("{:.4}", data[row]),
            ColumnVector::I64 { data, .. } => data[row].to_string(),
            ColumnVector::I32 { data, .. } => data[row].to_string(),
            ColumnVector::U32 { data, .. } => data[row].to_string(),
            ColumnVector::String { data, .. } => {
                let s = &data[row];
                if s.chars().count() > 10 {
                    // Byte offset of the 8th char, so the slice below can
                    // never split a multi-byte character.
                    let safe_index = s
                        .char_indices()
                        .nth(7)
                        .map(|(idx, _char)| idx)
                        .unwrap_or(s.len());
                    format!("{}...", &s[..safe_index])
                } else {
                    s.clone()
                }
            }
            ColumnVector::DateTime { data, .. } => data[row]
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap_or_else(|_| "err_date".to_string()),
        }
    }
    /// Writes a fixed-width text table of rows `offset..offset + len` to `f`:
    /// a title line, a header row, a "(dtype)" row, a divider, then the data.
    fn render_to_format(
        &self,
        f: &mut fmt::Formatter<'_>,
        offset: usize,
        len: usize,
    ) -> fmt::Result {
        writeln!(
            f,
            "Dataset View: rows {}..{} (Total {} rows)",
            offset,
            offset + len,
            self.row_count
        )?;
        // Column names in schema-index (insertion) order.
        let mut names: Vec<_> = self.schema.keys().collect();
        names.sort_by_key(|name| self.schema.get(*name).expect("Schema integrity error"));
        let header = names
            .iter()
            .map(|n| format!("{:<12}", n))
            .collect::<Vec<_>>()
            .join("| ");
        writeln!(f, "{}", header)?;
        // Second row: "(dtype)" labels under each column name.
        let types_row = names
            .iter()
            .map(|name| {
                let dtype = self
                    .column(name)
                    .map(|col| col.dtype_name())
                    .unwrap_or("unknown");
                let type_label = format!("({})", dtype);
                format!("{:<12}", type_label)
            })
            .collect::<Vec<_>>()
            .join("| ");
        writeln!(f, "{}", types_row)?;
        writeln!(f, "{}", "-".repeat(header.len()))?;
        for row in offset..(offset + len) {
            let mut row_str = Vec::new();
            for name in &names {
                let cell = self.debug_cell(name, row);
                row_str.push(format!("{:<12}", cell));
            }
            writeln!(f, "{}", row_str.join("| "))?;
        }
        Ok(())
    }
pub fn view(&self, offset: usize, len: usize) -> DatasetView<'_> {
let safe_len = if offset >= self.row_count {
0
} else {
len.min(self.row_count - offset)
};
DatasetView {
ds: self,
offset,
len: safe_len,
}
}
}
/// A borrowed window over a `Dataset`, used for `Debug` rendering.
pub struct DatasetView<'a> {
    pub(crate) ds: &'a Dataset,
    pub(crate) offset: usize,
    pub(crate) len: usize,
}
impl<'a> std::fmt::Debug for DatasetView<'a> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.ds.render_to_format(f, self.offset, self.len)
    }
}
impl fmt::Debug for Dataset {
    /// Shows up to the first 10 rows, then a count of the remainder.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.view(0, 10).fmt(f)?;
        if self.row_count > 10 {
            writeln!(f, "... and {} more rows", self.row_count - 10)?;
        }
        Ok(())
    }
}
pub trait ToDataset {
fn to_dataset(self) -> Result<Dataset, ChartonError>;
}
/// Blanket impl: anything yielding `(name, column)` pairs becomes a dataset.
impl<I, S, V> ToDataset for I
where
    I: IntoIterator<Item = (S, V)>,
    S: Into<String>,
    V: Into<ColumnVector>,
{
    fn to_dataset(self) -> Result<Dataset, ChartonError> {
        // Fold each pair into a fresh dataset, stopping at the first
        // add_column error.
        self.into_iter()
            .try_fold(Dataset::new(), |mut ds, (name, col)| {
                ds.add_column(name, col)?;
                Ok(ds)
            })
    }
}
/// Identity conversion: an owned `Dataset` is already a dataset.
impl ToDataset for Dataset {
    #[inline]
    fn to_dataset(self) -> Result<Dataset, ChartonError> {
        Ok(self)
    }
}
/// Conversion from a borrowed dataset; clones the underlying data.
/// Prefer the owned impl when the caller can give up ownership.
impl ToDataset for &Dataset {
    #[inline]
    fn to_dataset(self) -> Result<Dataset, ChartonError> {
        Ok(self.clone())
    }
}
/// Cursor-style accessor pinned to a single row of a [`Dataset`].
/// Cheap to copy (two words: reference + index).
#[derive(Copy, Clone)]
pub struct RowAccessor<'a> {
    // Dataset the row belongs to.
    ds: &'a Dataset,
    // Zero-based row index this accessor reads from.
    current_row: usize,
}
impl<'a> RowAccessor<'a> {
    /// Creates an accessor for `row` of `ds`. The index is not validated
    /// here; out-of-range behavior follows the underlying getters.
    pub fn new(ds: &'a Dataset, row: usize) -> Self {
        Self {
            ds,
            current_row: row,
        }
    }
    /// Numeric value of `field` at this row (delegates to `Dataset::get_f64`).
    #[inline]
    pub fn val(&self, field: &str) -> Option<f64> {
        self.ds.get_f64(field, self.current_row)
    }
    /// String value of `field` at this row (delegates to `Dataset::get_str`).
    #[inline]
    pub fn str(&self, field: &str) -> Option<String> {
        self.ds.get_str(field, self.current_row)
    }
    /// Zero-based index of the row this accessor points at.
    pub fn index(&self) -> usize {
        self.current_row
    }
}
/// Reduction operations applied to a column over a set of row indices.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AggregateOp {
    /// Sum of valid values; also the fallback/default operation.
    #[default]
    Sum,
    Mean,
    Median,
    Min,
    Max,
    Count,
}

/// Case-insensitive parsing of an operation name. Unrecognized names fall
/// back to `Sum` (the default) rather than erroring — `From` cannot fail.
impl From<&str> for AggregateOp {
    fn from(s: &str) -> Self {
        let key = s.to_lowercase();
        match key.as_str() {
            "sum" => AggregateOp::Sum,
            "mean" | "avg" => AggregateOp::Mean,
            "median" => AggregateOp::Median,
            "min" => AggregateOp::Min,
            "max" => AggregateOp::Max,
            "count" | "n" => AggregateOp::Count,
            _ => AggregateOp::Sum,
        }
    }
}
impl AggregateOp {
    /// Applies this aggregate to `col` restricted to the rows in `indices`.
    ///
    /// Rows where `get_f64` returns `None` (nulls / missing) are skipped.
    /// Returns `f64::NAN` when `indices` is empty or when no row yields a
    /// valid value. (Previously `Min`/`Max` leaked their fold identities
    /// `+INFINITY`/`-INFINITY` for all-null selections, inconsistent with
    /// `Mean`/`Median` which already returned NaN.)
    pub fn aggregate_by_index(&self, col: &ColumnVector, indices: &[usize]) -> f64 {
        if indices.is_empty() {
            return f64::NAN;
        }
        match self {
            AggregateOp::Count => indices.len() as f64,
            AggregateOp::Sum => indices.iter().filter_map(|&i| col.get_f64(i)).sum(),
            AggregateOp::Mean => {
                let mut sum = 0.0;
                let mut count = 0;
                for &i in indices {
                    if let Some(v) = col.get_f64(i) {
                        sum += v;
                        count += 1;
                    }
                }
                if count > 0 {
                    sum / count as f64
                } else {
                    f64::NAN
                }
            }
            // Fold from NaN: f64::min/max ignore a NaN operand, so the first
            // valid value replaces the seed, and an all-null (or empty after
            // filtering) selection correctly yields NaN instead of +/-inf.
            AggregateOp::Min => indices
                .iter()
                .filter_map(|&i| col.get_f64(i))
                .fold(f64::NAN, f64::min),
            AggregateOp::Max => indices
                .iter()
                .filter_map(|&i| col.get_f64(i))
                .fold(f64::NAN, f64::max),
            AggregateOp::Median => {
                let vals = self.extract_and_sort(col, indices);
                get_quantile(&vals, 0.5)
            }
        }
    }
    /// Collects the valid values at `indices` into an ascending-sorted vec.
    /// NaN values compare as Equal here, so their final position among the
    /// sorted values is unspecified.
    fn extract_and_sort(&self, col: &ColumnVector, indices: &[usize]) -> Vec<f64> {
        let mut vals: Vec<f64> = indices.iter().filter_map(|&i| col.get_f64(i)).collect();
        vals.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        vals
    }
}
/// Linearly-interpolated quantile of pre-sorted (ascending) data.
///
/// `q` is clamped to `[0.0, 1.0]`; previously a `q` greater than 1.0
/// produced `base >= len` and panicked with an out-of-bounds index.
/// Returns `f64::NAN` for empty input (and for a NaN `q`).
pub fn get_quantile(sorted_data: &[f64], q: f64) -> f64 {
    let len = sorted_data.len();
    if len == 0 {
        return f64::NAN;
    }
    // Clamp guards the index computation below against out-of-range q.
    let pos = q.clamp(0.0, 1.0) * (len - 1) as f64;
    let base = pos.floor() as usize;
    let fract = pos - base as f64;
    if base + 1 < len {
        // Interpolate between the two neighboring order statistics.
        sorted_data[base] + fract * (sorted_data[base + 1] - sorted_data[base])
    } else {
        sorted_data[base]
    }
}
// Unit tests covering dataset construction, typed column access, null
// handling via validity bitmaps, Debug rendering, and (feature-gated)
// Arrow ingestion.
#[cfg(test)]
mod tests {
    use super::*;
    // Exercises the four public construction paths: imperative add_column,
    // ToDataset on (name, column) tuples, heterogeneous tuple data, and
    // the fluent with_column builder.
    #[test]
    fn test_dataset_construction_methods() {
        use time::macros::datetime;
        // Method 1: imperative add_column calls on a mutable dataset.
        let mut ds_manual = Dataset::new();
        ds_manual.add_column("id", vec![1i64, 2, 3]).unwrap();
        ds_manual
            .add_column("value", vec![Some(10.1), None, Some(30.3)])
            .unwrap();
        assert_eq!(ds_manual.row_count, 3);
        assert!(ds_manual.is_null("value", 1));
        // Method 2: ToDataset over a Vec of (name, ColumnVector) tuples.
        let raw_data = vec![
            (
                "name",
                vec![Some("A".to_string()), Some("B".to_string())].into_column(),
            ),
            ("score", vec![100i64, 200i64].into_column()),
        ];
        let ds_from_tuples = raw_data
            .to_dataset()
            .expect("Should convert from tuples successfully");
        assert_eq!(ds_from_tuples.row_count, 2);
        assert_eq!(ds_from_tuples.get_str("name", 0).unwrap(), "A");
        // Method 3: heterogeneous column types (datetime, f32, nullable string).
        let complex_data = vec![
            (
                "timestamp",
                vec![
                    datetime!(2026-03-30 00:00 UTC),
                    datetime!(2026-03-31 00:00 UTC),
                ]
                .into_column(),
            ),
            ("f32_val", vec![1.1f32, 2.2f32].into_column()),
            ("tags", vec![Some("heavy".to_string()), None].into_column()),
        ];
        let ds_complex = complex_data
            .to_dataset()
            .expect("Should handle heterogeneous types");
        assert_eq!(ds_complex.row_count, 2);
        assert!(!ds_complex.is_null("timestamp", 0));
        assert!(ds_complex.is_null("tags", 1));
        println!("\n--- Construction Method 3 Output ---");
        println!("{:?}", ds_complex);
        // Method 4: fluent builder via with_column chaining.
        let ds_fluent = Dataset::new()
            .with_column("x", vec![10.0, 20.0, 30.0])
            .unwrap()
            .with_column("y", vec![Some(100i64), None, Some(300i64)])
            .unwrap()
            .with_column("category", vec!["A", "B", "C"])
            .unwrap();
        assert_eq!(ds_fluent.row_count, 3);
        assert_eq!(ds_fluent.width(), 3);
        assert!(ds_fluent.is_null("y", 1)); assert!(!ds_fluent.is_null("x", 1));
        println!("\n--- Construction Method 4 (Fluent) Output ---");
        println!("{:?}", ds_fluent);
    }
    // get_column must hand back typed data (preserving NaN) and reject a
    // mismatched type request with an error.
    #[test]
    fn test_get_column_and_nan_handling() {
        let mut ds = Dataset::new();
        let prices = vec![10.5, f64::NAN, 30.2];
        ds.add_column("price", prices).unwrap();
        let col = ds.get_column::<f64>("price").expect("Column should exist");
        assert_eq!(col.len(), 3);
        assert_eq!(col[0], 10.5);
        assert!(col[1].is_nan());
        let wrong_type = ds.get_column::<i64>("price");
        assert!(wrong_type.is_err());
    }
    // Validity bitmaps: Option<_> columns must report nulls correctly, and
    // a lookup on a missing column reads as null rather than panicking.
    #[test]
    fn test_get_value_with_bitmaps() {
        let mut ds = Dataset::new();
        let ids = vec![Some(100), None, Some(300)];
        ds.add_column("id", ids).unwrap();
        assert_eq!(ds.get_f64("id", 0).unwrap(), 100.0);
        assert!(!ds.is_null("id", 0));
        assert!(ds.is_null("id", 1));
        assert_eq!(ds.get_f64("id", 2).unwrap(), 300.0);
        // Unknown column name is treated as null.
        assert!(ds.is_null("non_existent", 0));
    }
    // Smoke test for the Debug table rendering (output inspected manually
    // via println; the assertion only checks the dataset itself).
    #[test]
    fn test_dataset_display_and_truncation() {
        let mut ds = Dataset::new();
        ds.add_column("id", vec![Some(1), Some(2)]).unwrap();
        ds.add_column("city", vec![Some("San Francisco"), None])
            .unwrap();
        ds.add_column("value", vec![1.234567, 8.9]).unwrap();
        println!("\n--- Dataset Debug Output ---");
        println!("{:?}", ds);
        println!("----------------------------");
        assert_eq!(ds.row_count, 2);
    }
    // Arrow interop tests: nulls map to NaN for floats, and to a zero/empty
    // placeholder plus a validity bitmap for ints, strings, and timestamps.
    // Compiled only with the "arrow" feature.
    #[cfg(feature = "arrow")]
    mod arrow_tests {
        use super::*;
        use arrow::array::{Float64Array, Int64Array, StringArray, TimestampMillisecondArray};
        #[test]
        fn test_arrow_ingestion() {
            // F64: nulls become NaN; no separate validity mask is kept.
            let f64_array = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
            let col_f64 = ColumnVector::from_arrow(&f64_array).expect("F64 ingestion failed");
            if let ColumnVector::F64 { data } = col_f64 {
                println!("F64 Data (converted): {:?}", data);
                assert_eq!(data[0], 1.1);
                assert!(data[1].is_nan()); assert_eq!(data[2], 3.3);
            }
            // I64: nulls become 0 with the corresponding validity bit cleared.
            let i64_array = Int64Array::from(vec![Some(10), None, Some(30)]);
            let col_i64 = ColumnVector::from_arrow(&i64_array).expect("I64 ingestion failed");
            if let ColumnVector::I64 { data, validity } = col_i64 {
                println!("I64 Data: {:?}, Validity Mask: {:?}", data, validity);
                assert_eq!(data, vec![10, 0, 30]);
                assert!(validity.is_some());
                // Rows 0 and 2 valid -> bits 0 and 2 set -> 0b101.
                assert_eq!(validity.unwrap()[0], 0b101);
            }
            // Strings: nulls become "" alongside a validity mask.
            let str_array = StringArray::from(vec![Some("Charton"), None, Some("Rust")]);
            let col_str = ColumnVector::from_arrow(&str_array).expect("String ingestion failed");
            if let ColumnVector::String { data, validity } = col_str {
                println!("String Data: {:?}, Validity Mask: {:?}", data, validity);
                assert_eq!(data[0], "Charton");
                assert_eq!(data[1], ""); assert_eq!(data[2], "Rust");
                assert!(validity.is_some());
            }
            // Timestamps: null slots fall back to the epoch (year 1970) with
            // the validity mask marking them invalid.
            let ts_array = TimestampMillisecondArray::from(vec![Some(1711872000000), None]);
            let col_ts = ColumnVector::from_arrow(&ts_array).expect("Timestamp ingestion failed");
            if let ColumnVector::DateTime { data, validity } = col_ts {
                println!("DateTime Data: {:?}, Validity Mask: {:?}", data, validity);
                assert_eq!(data[0].year(), 2024);
                assert_eq!(data[0].month(), time::Month::March);
                assert_eq!(data[1].year(), 1970);
                assert!(validity.is_some());
            }
        }
    }
}