//! `chartml_core` — `data.rs`: row-based data helpers and the Arrow-backed
//! `DataTable` columnar representation.

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::array::{
5    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
6    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringBuilder,
7    StringArray, Time64MicrosecondArray, Time64NanosecondArray, Time32MillisecondArray,
8    Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
9    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
10    UInt64Array,
11};
12use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
13
14use crate::error::ChartError;
15
/// A row of data — a map of field names to values.
/// Used for inline YAML data and backward compatibility.
///
/// Values are raw `serde_json::Value`s; use [`get_f64`] / [`get_string`]
/// for typed access.
pub type Row = HashMap<String, serde_json::Value>;

// ── Legacy free functions (kept for backward compat / built-in transforms) ──

22/// Extract an f64 value from a Row by field name.
23/// Handles both Number and String (parsed) values.
24pub fn get_f64(row: &Row, field: &str) -> Option<f64> {
25    match row.get(field)? {
26        serde_json::Value::Number(n) => n.as_f64(),
27        serde_json::Value::String(s) => s.parse::<f64>().ok(),
28        _ => None,
29    }
30}
31
32/// Extract a string value from a Row by field name.
33pub fn get_string(row: &Row, field: &str) -> Option<String> {
34    match row.get(field)? {
35        serde_json::Value::String(s) => Some(s.clone()),
36        serde_json::Value::Number(n) => Some(n.to_string()),
37        serde_json::Value::Bool(b) => Some(b.to_string()),
38        serde_json::Value::Null => None,
39        other => Some(other.to_string()),
40    }
41}
42
43/// Compute the extent (min, max) of a numeric field across rows.
44pub fn extent_rows(data: &[Row], field: &str) -> Option<(f64, f64)> {
45    let values: Vec<f64> = data.iter().filter_map(|row| get_f64(row, field)).collect();
46    if values.is_empty() {
47        return None;
48    }
49    let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
50    let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
51    Some((min, max))
52}
53
54/// Sum a numeric field across rows.
55pub fn sum_rows(data: &[Row], field: &str) -> f64 {
56    data.iter().filter_map(|row| get_f64(row, field)).sum()
57}
58
59/// Group rows by a field value.
60pub fn group_by_rows<'a>(data: &'a [Row], field: &str) -> HashMap<String, Vec<&'a Row>> {
61    let mut groups: HashMap<String, Vec<&'a Row>> = HashMap::new();
62    for row in data {
63        if let Some(key) = get_string(row, field) {
64            groups.entry(key).or_default().push(row);
65        }
66    }
67    groups
68}
69
70/// Get unique values for a field, in order of first appearance.
71pub fn unique_values_rows(data: &[Row], field: &str) -> Vec<String> {
72    let mut seen = std::collections::HashSet::new();
73    let mut result = Vec::new();
74    for row in data {
75        if let Some(val) = get_string(row, field) {
76            if seen.insert(val.clone()) {
77                result.push(val);
78            }
79        }
80    }
81    result
82}
83
// Keep old names as aliases for backward compatibility:
// callers that imported `extent`, `sum`, `group_by`, or `unique_values`
// before the `_rows` rename continue to compile unchanged.
pub use extent_rows as extent;
pub use sum_rows as sum;
pub use group_by_rows as group_by;
pub use unique_values_rows as unique_values;

// ── DataTable: Arrow-backed columnar data ──

/// Type-preserving columnar data backed by Arrow RecordBatch.
///
/// Provides row-oriented accessors for renderer compatibility while
/// maintaining full Arrow type fidelity (timestamps, dates, decimals, etc.)
/// for the transform pipeline.
#[derive(Debug, Clone)]
pub struct DataTable {
    batch: RecordBatch,
    /// Column name → column index for fast lookup.
    // Derived from `batch`'s schema at construction time; never mutated
    // afterwards, so it stays consistent with `batch`.
    field_index: HashMap<String, usize>,
}

impl DataTable {
    /// Wrap an existing Arrow RecordBatch.
    pub fn from_record_batch(batch: RecordBatch) -> Self {
        // Cache name → column index up front so per-cell accessors avoid
        // a linear schema scan on every lookup.
        let field_index = batch
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(i, f)| (f.name().clone(), i))
            .collect();
        Self { batch, field_index }
    }

    /// Convert JSON rows (from inline YAML data) into a DataTable.
    /// Type inference: Numbers → Float64, Booleans → Boolean, Strings → Utf8.
    ///
    /// # Errors
    /// Returns `ChartError::DataError` if the inferred schema and built
    /// arrays cannot be assembled into a `RecordBatch`.
    pub fn from_rows(rows: &[Row]) -> Result<Self, ChartError> {
        if rows.is_empty() {
            // No rows → empty batch with an empty schema.
            let schema = Arc::new(Schema::new(Vec::<Field>::new()));
            let batch = RecordBatch::new_empty(schema);
            return Ok(Self::from_record_batch(batch));
        }

        // Collect column names preserving first-appearance order, then sort for determinism
        let mut column_names: Vec<String> = Vec::new();
        let mut seen = std::collections::HashSet::new();
        for row in rows {
            for key in row.keys() {
                if seen.insert(key.clone()) {
                    column_names.push(key.clone());
                }
            }
        }
        column_names.sort();

        // Infer types.
        // NOTE(review): `InferredType` / `merge_inferred` are defined elsewhere
        // in this module; `merge_inferred` presumably widens a column's type
        // across rows (e.g. Float64 + Utf8 → Utf8) — confirm against its impl.
        let mut col_types: Vec<InferredType> = vec![InferredType::Null; column_names.len()];
        for row in rows {
            for (i, name) in column_names.iter().enumerate() {
                if let Some(val) = row.get(name) {
                    let val_type = match val {
                        serde_json::Value::Number(_) => InferredType::Float64,
                        serde_json::Value::Bool(_) => InferredType::Boolean,
                        serde_json::Value::String(_) => InferredType::Utf8,
                        serde_json::Value::Null => InferredType::Null,
                        _ => InferredType::Utf8,
                    };
                    col_types[i] = merge_inferred(col_types[i], val_type);
                }
            }
        }

        // Null → Utf8: columns that were entirely null/missing default to strings.
        for t in &mut col_types {
            if *t == InferredType::Null {
                *t = InferredType::Utf8;
            }
        }

        // Build schema. All fields are nullable because any row may omit a key.
        let fields: Vec<Field> = column_names
            .iter()
            .zip(col_types.iter())
            .map(|(name, typ)| {
                let dt = match typ {
                    InferredType::Float64 => DataType::Float64,
                    InferredType::Boolean => DataType::Boolean,
                    InferredType::Utf8 | InferredType::Null => DataType::Utf8,
                };
                Field::new(name, dt, true)
            })
            .collect();
        let schema = Arc::new(Schema::new(fields));

        // Build arrays
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_names.len());
        for (i, name) in column_names.iter().enumerate() {
            let arr: ArrayRef = match col_types[i] {
                InferredType::Float64 => {
                    // Numeric column: strings that parse as numbers are
                    // coerced; anything else becomes null.
                    let values: Vec<Option<f64>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Number(n) => n.as_f64(),
                                serde_json::Value::String(s) => s.parse::<f64>().ok(),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(Float64Array::from(values))
                }
                InferredType::Boolean => {
                    let values: Vec<Option<bool>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Bool(b) => Some(*b),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(BooleanArray::from(values))
                }
                InferredType::Utf8 | InferredType::Null => {
                    // String fallback column: stringify scalars, null out
                    // missing/null cells, JSON-stringify arrays/objects.
                    let mut builder = StringBuilder::new();
                    for row in rows {
                        match row.get(name) {
                            Some(serde_json::Value::String(s)) => builder.append_value(s),
                            Some(serde_json::Value::Number(n)) => {
                                builder.append_value(n.to_string())
                            }
                            Some(serde_json::Value::Bool(b)) => {
                                builder.append_value(b.to_string())
                            }
                            Some(serde_json::Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(other.to_string()),
                        }
                    }
                    Arc::new(builder.finish())
                }
            };
            arrays.push(arr);
        }

        let batch = RecordBatch::try_new(schema, arrays)
            .map_err(|e| ChartError::DataError(format!("Failed to create RecordBatch: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }

    /// Deserialize from Arrow IPC bytes.
    ///
    /// Multiple IPC batches are concatenated into a single RecordBatch.
    ///
    /// # Errors
    /// Returns `ChartError::DataError` if the bytes are not a valid Arrow
    /// IPC stream or the batches cannot be read/concatenated.
    pub fn from_ipc_bytes(bytes: &[u8]) -> Result<Self, ChartError> {
        use arrow::ipc::reader::StreamReader;
        use std::io::Cursor;

        let cursor = Cursor::new(bytes);
        let reader = StreamReader::try_new(cursor, None)
            .map_err(|e| ChartError::DataError(format!("Failed to read Arrow IPC: {}", e)))?;

        let schema = reader.schema();
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                ChartError::DataError(format!("Failed to read Arrow batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Ok(Self::from_record_batch(RecordBatch::new_empty(schema)));
        }

        if batches.len() == 1 {
            // Single batch: avoid the concat copy entirely.
            return Ok(Self::from_record_batch(batches.remove(0)));
        }

        // Concatenate multiple batches
        let batch = arrow::compute::concat_batches(&schema, &batches)
            .map_err(|e| ChartError::DataError(format!("Failed to concat batches: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }

    /// Serialize to Arrow IPC bytes.
    ///
    /// # Errors
    /// Returns `ChartError::DataError` if the IPC stream cannot be written.
    pub fn to_ipc_bytes(&self) -> Result<Vec<u8>, ChartError> {
        use arrow::ipc::writer::StreamWriter;

        let mut buf = Vec::new();
        {
            // Scope the writer so its mutable borrow of `buf` ends before we return it.
            let mut writer = StreamWriter::try_new(&mut buf, &self.batch.schema())
                .map_err(|e| ChartError::DataError(format!("Failed to create IPC writer: {}", e)))?;
            writer.write(&self.batch).map_err(|e| {
                ChartError::DataError(format!("Failed to write Arrow batch: {}", e))
            })?;
            writer.finish().map_err(|e| {
                ChartError::DataError(format!("Failed to finish IPC stream: {}", e))
            })?;
        }
        Ok(buf)
    }

    // ── Accessors ──

    /// Number of rows.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }

    /// Number of columns.
    pub fn num_columns(&self) -> usize {
        self.batch.num_columns()
    }

    /// Whether the table has no rows.
    pub fn is_empty(&self) -> bool {
        self.batch.num_rows() == 0
    }

    /// Get the underlying RecordBatch.
    pub fn record_batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Consume self and return the owned RecordBatch.
    pub fn into_record_batch(self) -> RecordBatch {
        self.batch
    }

    /// Get the Arrow schema.
    pub fn schema(&self) -> Arc<Schema> {
        self.batch.schema()
    }

    /// Get a column by name. Returns `None` for unknown fields.
    fn column(&self, field: &str) -> Option<&ArrayRef> {
        self.field_index.get(field).map(|&i| self.batch.column(i))
    }

    /// Extract an f64 value from a specific row and field.
    /// Handles all numeric Arrow types, Date32 (days since epoch), and Timestamps (epoch millis).
    /// Returns `None` for null cells, unknown fields, or non-numeric types.
    pub fn get_f64(&self, row: usize, field: &str) -> Option<f64> {
        let col = self.column(field)?;
        if col.is_null(row) {
            return None;
        }
        arrow_to_f64(col, row)
    }

    /// Extract a string value from a specific row and field.
    /// Formats temporal types as ISO strings, numbers as decimal strings.
    /// Returns `None` for null cells or unknown fields.
    pub fn get_string(&self, row: usize, field: &str) -> Option<String> {
        let col = self.column(field)?;
        if col.is_null(row) {
            return None;
        }
        arrow_to_string(col, row)
    }

    /// Get unique string values for a field, preserving first-appearance order.
    /// Nulls are skipped; an unknown field yields an empty Vec.
    pub fn unique_values(&self, field: &str) -> Vec<String> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return Vec::new(),
        };
        let mut seen = std::collections::HashSet::new();
        let mut result = Vec::new();
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(val) = arrow_to_string(col, i) {
                if seen.insert(val.clone()) {
                    result.push(val);
                }
            }
        }
        result
    }

    /// Get all string values for a field, preserving row order (including duplicates).
    /// Nulls are skipped; an unknown field yields an empty Vec.
    pub fn all_values(&self, field: &str) -> Vec<String> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return Vec::new(),
        };
        let mut result = Vec::new();
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(val) = arrow_to_string(col, i) {
                result.push(val);
            }
        }
        result
    }

    /// Compute the extent (min, max) of a numeric field.
    /// Returns `None` if the field is unknown or no cell yields an f64.
    pub fn extent(&self, field: &str) -> Option<(f64, f64)> {
        let col = self.column(field)?;
        let mut min = f64::INFINITY;
        let mut max = f64::NEG_INFINITY;
        let mut found = false;
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(v) = arrow_to_f64(col, i) {
                found = true;
                if v < min {
                    min = v;
                }
                if v > max {
                    max = v;
                }
            }
        }
        if found {
            Some((min, max))
        } else {
            None
        }
    }

    /// Sum a numeric field across all rows.
    /// Null and non-numeric cells contribute 0; an unknown field sums to 0.
    pub fn sum(&self, field: &str) -> f64 {
        let col = match self.column(field) {
            Some(c) => c,
            None => return 0.0,
        };
        let mut total = 0.0;
        for i in 0..self.batch.num_rows() {
            if !col.is_null(i) {
                if let Some(v) = arrow_to_f64(col, i) {
                    total += v;
                }
            }
        }
        total
    }

    /// Group rows by a field value, returning a map of group key → DataTable.
    /// Rows with a null key are excluded; an unknown field yields an empty map.
    pub fn group_by(&self, field: &str) -> HashMap<String, DataTable> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return HashMap::new(),
        };

        // Collect indices per group
        let mut group_indices: HashMap<String, Vec<u32>> = HashMap::new();
        let mut key_order: Vec<String> = Vec::new();
        let mut seen_keys = std::collections::HashSet::new();

        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(key) = arrow_to_string(col, i) {
                if seen_keys.insert(key.clone()) {
                    key_order.push(key.clone());
                }
                group_indices.entry(key).or_default().push(i as u32);
            }
        }

        // Build sub-tables using arrow::compute::take.
        // NOTE(review): take/try_new failures are silently dropped here, so a
        // failing group simply goes missing from the result — confirm this
        // best-effort behavior is intended.
        let mut result = HashMap::new();
        for key in key_order {
            if let Some(indices) = group_indices.get(&key) {
                let indices_arr = UInt32Array::from(indices.clone());
                let take_result: Result<Vec<ArrayRef>, _> = self
                    .batch
                    .columns()
                    .iter()
                    .map(|col| arrow::compute::take(col.as_ref(), &indices_arr, None))
                    .collect();
                if let Ok(columns) = take_result {
                    if let Ok(sub_batch) = RecordBatch::try_new(self.batch.schema(), columns) {
                        result.insert(key, DataTable::from_record_batch(sub_batch));
                    }
                }
            }
        }
        result
    }

    /// Check if a field exists in the schema.
    pub fn has_field(&self, field: &str) -> bool {
        self.field_index.contains_key(field)
    }

    /// Get all field names.
    pub fn field_names(&self) -> Vec<String> {
        self.batch
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    /// Convert the Arrow RecordBatch back to JSON rows.
    /// Used for the built-in sync transform fallback path which operates on `Vec<Row>`.
    ///
    /// Null cells become explicit `Value::Null` entries (keys are never omitted).
    pub fn to_rows(&self) -> Vec<Row> {
        let num_rows = self.batch.num_rows();
        let schema = self.batch.schema();
        let fields = schema.fields();

        let mut rows = Vec::with_capacity(num_rows);
        for row_idx in 0..num_rows {
            let mut row = Row::new();
            for (col_idx, field) in fields.iter().enumerate() {
                let col = self.batch.column(col_idx);
                if col.is_null(row_idx) {
                    row.insert(field.name().clone(), serde_json::Value::Null);
                    continue;
                }
                let value = match col.data_type() {
                    DataType::Boolean => {
                        let v = col.as_any().downcast_ref::<BooleanArray>()
                            .expect("DataType::Boolean arm guarantees BooleanArray")
                            .value(row_idx);
                        serde_json::json!(v)
                    }
                    DataType::Float64 | DataType::Float32 |
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 |
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 |
                    DataType::Decimal128(_, _) => {
                        if let Some(v) = arrow_to_f64(col, row_idx) {
                            serde_json::json!(v)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    // Dates and timestamps → preserve as ISO strings, not raw f64
                    DataType::Date32 | DataType::Date64 |
                    DataType::Timestamp(_, _) => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    _ => {
                        // Everything else (Utf8, times, …) falls back to its string form.
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                };
                row.insert(field.name().clone(), value);
            }
            rows.push(row);
        }
        rows
    }
}

// ── Arrow value extraction helpers ──

542/// Extract an f64 from any Arrow array at the given index.
543fn arrow_to_f64(col: &ArrayRef, idx: usize) -> Option<f64> {
544    match col.data_type() {
545        DataType::Float64 => {
546            Some(col.as_any().downcast_ref::<Float64Array>()
547                .expect("DataType::Float64 arm guarantees Float64Array")
548                .value(idx))
549        }
550        DataType::Float32 => {
551            Some(col.as_any().downcast_ref::<Float32Array>()
552                .expect("DataType::Float32 arm guarantees Float32Array")
553                .value(idx) as f64)
554        }
555        DataType::Int64 => {
556            Some(col.as_any().downcast_ref::<Int64Array>()
557                .expect("DataType::Int64 arm guarantees Int64Array")
558                .value(idx) as f64)
559        }
560        DataType::Int32 => {
561            Some(col.as_any().downcast_ref::<Int32Array>()
562                .expect("DataType::Int32 arm guarantees Int32Array")
563                .value(idx) as f64)
564        }
565        DataType::Int16 => {
566            Some(col.as_any().downcast_ref::<Int16Array>()
567                .expect("DataType::Int16 arm guarantees Int16Array")
568                .value(idx) as f64)
569        }
570        DataType::Int8 => {
571            Some(col.as_any().downcast_ref::<Int8Array>()
572                .expect("DataType::Int8 arm guarantees Int8Array")
573                .value(idx) as f64)
574        }
575        DataType::UInt64 => {
576            Some(col.as_any().downcast_ref::<UInt64Array>()
577                .expect("DataType::UInt64 arm guarantees UInt64Array")
578                .value(idx) as f64)
579        }
580        DataType::UInt32 => {
581            Some(col.as_any().downcast_ref::<UInt32Array>()
582                .expect("DataType::UInt32 arm guarantees UInt32Array")
583                .value(idx) as f64)
584        }
585        DataType::UInt16 => {
586            Some(col.as_any().downcast_ref::<UInt16Array>()
587                .expect("DataType::UInt16 arm guarantees UInt16Array")
588                .value(idx) as f64)
589        }
590        DataType::UInt8 => {
591            Some(col.as_any().downcast_ref::<UInt8Array>()
592                .expect("DataType::UInt8 arm guarantees UInt8Array")
593                .value(idx) as f64)
594        }
595        DataType::Boolean => {
596            let v = col.as_any().downcast_ref::<BooleanArray>()
597                .expect("DataType::Boolean arm guarantees BooleanArray")
598                .value(idx);
599            Some(if v { 1.0 } else { 0.0 })
600        }
601        DataType::Date32 => {
602            // Days since epoch
603            Some(col.as_any().downcast_ref::<Date32Array>()
604                .expect("DataType::Date32 arm guarantees Date32Array")
605                .value(idx) as f64)
606        }
607        DataType::Date64 => {
608            // Milliseconds since epoch
609            Some(col.as_any().downcast_ref::<Date64Array>()
610                .expect("DataType::Date64 arm guarantees Date64Array")
611                .value(idx) as f64)
612        }
613        DataType::Timestamp(unit, _) => {
614            let raw = match unit {
615                TimeUnit::Second => col
616                    .as_any()
617                    .downcast_ref::<TimestampSecondArray>()
618                    .expect("Timestamp(Second) arm guarantees TimestampSecondArray")
619                    .value(idx),
620                TimeUnit::Millisecond => col
621                    .as_any()
622                    .downcast_ref::<TimestampMillisecondArray>()
623                    .expect("Timestamp(Millisecond) arm guarantees TimestampMillisecondArray")
624                    .value(idx),
625                TimeUnit::Microsecond => col
626                    .as_any()
627                    .downcast_ref::<TimestampMicrosecondArray>()
628                    .expect("Timestamp(Microsecond) arm guarantees TimestampMicrosecondArray")
629                    .value(idx),
630                TimeUnit::Nanosecond => col
631                    .as_any()
632                    .downcast_ref::<TimestampNanosecondArray>()
633                    .expect("Timestamp(Nanosecond) arm guarantees TimestampNanosecondArray")
634                    .value(idx),
635            };
636            // Convert to epoch milliseconds for consistent f64 representation
637            let millis = match unit {
638                TimeUnit::Second => raw * 1000,
639                TimeUnit::Millisecond => raw,
640                TimeUnit::Microsecond => raw / 1000,
641                TimeUnit::Nanosecond => raw / 1_000_000,
642            };
643            Some(millis as f64)
644        }
645        DataType::Decimal128(_, scale) => {
646            let raw = col
647                .as_any()
648                .downcast_ref::<Decimal128Array>()
649                .expect("DataType::Decimal128 arm guarantees Decimal128Array")
650                .value(idx);
651            let divisor = 10_f64.powi(*scale as i32);
652            Some(raw as f64 / divisor)
653        }
654        DataType::Utf8 => {
655            // Try parsing string as number
656            let s = col.as_any().downcast_ref::<StringArray>()
657                .expect("DataType::Utf8 arm guarantees StringArray")
658                .value(idx);
659            s.parse::<f64>().ok()
660        }
661        _ => None,
662    }
663}
664
665/// Extract a string from any Arrow array at the given index.
666fn arrow_to_string(col: &ArrayRef, idx: usize) -> Option<String> {
667    match col.data_type() {
668        DataType::Utf8 => {
669            Some(
670                col.as_any()
671                    .downcast_ref::<StringArray>()
672                    .expect("DataType::Utf8 arm guarantees StringArray")
673                    .value(idx)
674                    .to_string(),
675            )
676        }
677        DataType::LargeUtf8 => {
678            Some(
679                col.as_any()
680                    .downcast_ref::<arrow::array::LargeStringArray>()
681                    .expect("DataType::LargeUtf8 arm guarantees LargeStringArray")
682                    .value(idx)
683                    .to_string(),
684            )
685        }
686        DataType::Float64 => {
687            let v = col.as_any().downcast_ref::<Float64Array>()
688                .expect("DataType::Float64 arm guarantees Float64Array")
689                .value(idx);
690            Some(format_f64(v))
691        }
692        DataType::Float32 => {
693            let v = col.as_any().downcast_ref::<Float32Array>()
694                .expect("DataType::Float32 arm guarantees Float32Array")
695                .value(idx) as f64;
696            Some(format_f64(v))
697        }
698        DataType::Int64 => {
699            Some(col.as_any().downcast_ref::<Int64Array>()
700                .expect("DataType::Int64 arm guarantees Int64Array")
701                .value(idx).to_string())
702        }
703        DataType::Int32 => {
704            Some(col.as_any().downcast_ref::<Int32Array>()
705                .expect("DataType::Int32 arm guarantees Int32Array")
706                .value(idx).to_string())
707        }
708        DataType::Int16 => {
709            Some(col.as_any().downcast_ref::<Int16Array>()
710                .expect("DataType::Int16 arm guarantees Int16Array")
711                .value(idx).to_string())
712        }
713        DataType::Int8 => {
714            Some(col.as_any().downcast_ref::<Int8Array>()
715                .expect("DataType::Int8 arm guarantees Int8Array")
716                .value(idx).to_string())
717        }
718        DataType::UInt64 => {
719            Some(col.as_any().downcast_ref::<UInt64Array>()
720                .expect("DataType::UInt64 arm guarantees UInt64Array")
721                .value(idx).to_string())
722        }
723        DataType::UInt32 => {
724            Some(col.as_any().downcast_ref::<UInt32Array>()
725                .expect("DataType::UInt32 arm guarantees UInt32Array")
726                .value(idx).to_string())
727        }
728        DataType::UInt16 => {
729            Some(col.as_any().downcast_ref::<UInt16Array>()
730                .expect("DataType::UInt16 arm guarantees UInt16Array")
731                .value(idx).to_string())
732        }
733        DataType::UInt8 => {
734            Some(col.as_any().downcast_ref::<UInt8Array>()
735                .expect("DataType::UInt8 arm guarantees UInt8Array")
736                .value(idx).to_string())
737        }
738        DataType::Boolean => {
739            Some(col.as_any().downcast_ref::<BooleanArray>()
740                .expect("DataType::Boolean arm guarantees BooleanArray")
741                .value(idx).to_string())
742        }
743        DataType::Date32 => {
744            let days = col.as_any().downcast_ref::<Date32Array>()
745                .expect("DataType::Date32 arm guarantees Date32Array")
746                .value(idx);
747            Some(days_to_iso(days as i64))
748        }
749        DataType::Date64 => {
750            let millis = col.as_any().downcast_ref::<Date64Array>()
751                .expect("DataType::Date64 arm guarantees Date64Array")
752                .value(idx);
753            // Convert millis to days, then to ISO
754            let days = millis / 86_400_000;
755            Some(days_to_iso(days))
756        }
757        DataType::Timestamp(unit, tz) => {
758            let raw = match unit {
759                TimeUnit::Second => col
760                    .as_any()
761                    .downcast_ref::<TimestampSecondArray>()
762                    .expect("Timestamp(Second) arm guarantees TimestampSecondArray")
763                    .value(idx),
764                TimeUnit::Millisecond => col
765                    .as_any()
766                    .downcast_ref::<TimestampMillisecondArray>()
767                    .expect("Timestamp(Millisecond) arm guarantees TimestampMillisecondArray")
768                    .value(idx),
769                TimeUnit::Microsecond => col
770                    .as_any()
771                    .downcast_ref::<TimestampMicrosecondArray>()
772                    .expect("Timestamp(Microsecond) arm guarantees TimestampMicrosecondArray")
773                    .value(idx),
774                TimeUnit::Nanosecond => col
775                    .as_any()
776                    .downcast_ref::<TimestampNanosecondArray>()
777                    .expect("Timestamp(Nanosecond) arm guarantees TimestampNanosecondArray")
778                    .value(idx),
779            };
780            // Convert to seconds + subseconds using Euclidean division
781            // so the remainder is always non-negative (safe for pre-epoch timestamps).
782            let (secs, nanos_u32) = match unit {
783                TimeUnit::Second => (raw, 0u32),
784                TimeUnit::Millisecond => {
785                    let (s, r) = (raw.div_euclid(1000), raw.rem_euclid(1000));
786                    (s, (r * 1_000_000) as u32)
787                }
788                TimeUnit::Microsecond => {
789                    let (s, r) = (raw.div_euclid(1_000_000), raw.rem_euclid(1_000_000));
790                    (s, (r * 1000) as u32)
791                }
792                TimeUnit::Nanosecond => {
793                    let (s, r) = (raw.div_euclid(1_000_000_000), raw.rem_euclid(1_000_000_000));
794                    (s, r as u32)
795                }
796            };
797            let iso = epoch_to_iso(secs, nanos_u32);
798            if tz.is_some() {
799                Some(format!("{}Z", iso))
800            } else {
801                Some(iso)
802            }
803        }
804        DataType::Time64(TimeUnit::Microsecond) => {
805            let micros = col
806                .as_any()
807                .downcast_ref::<Time64MicrosecondArray>()
808                .expect("Time64(Microsecond) arm guarantees Time64MicrosecondArray")
809                .value(idx);
810            Some(micros_to_hms(micros))
811        }
812        DataType::Time64(TimeUnit::Nanosecond) => {
813            let nanos = col
814                .as_any()
815                .downcast_ref::<Time64NanosecondArray>()
816                .expect("Time64(Nanosecond) arm guarantees Time64NanosecondArray")
817                .value(idx);
818            Some(micros_to_hms(nanos / 1000))
819        }
820        DataType::Time32(TimeUnit::Second) => {
821            let secs = col
822                .as_any()
823                .downcast_ref::<Time32SecondArray>()
824                .expect("Time32(Second) arm guarantees Time32SecondArray")
825                .value(idx);
826            Some(micros_to_hms(secs as i64 * 1_000_000))
827        }
828        DataType::Time32(TimeUnit::Millisecond) => {
829            let millis = col
830                .as_any()
831                .downcast_ref::<Time32MillisecondArray>()
832                .expect("Time32(Millisecond) arm guarantees Time32MillisecondArray")
833                .value(idx);
834            Some(micros_to_hms(millis as i64 * 1000))
835        }
836        DataType::Decimal128(_, scale) => {
837            let raw = col
838                .as_any()
839                .downcast_ref::<Decimal128Array>()
840                .expect("DataType::Decimal128 arm guarantees Decimal128Array")
841                .value(idx);
842            Some(format_decimal128(raw, *scale))
843        }
844        _ => None,
845    }
846}
847
/// Format an f64 nicely — integral values within i64's exact range are
/// printed without a trailing ".0"; everything else uses `f64::to_string`.
fn format_f64(v: f64) -> String {
    // `abs < 1e15` keeps the cast comfortably inside the range where an
    // f64 represents integers exactly and the i64 cast cannot overflow.
    let prints_as_integer = v.fract() == 0.0 && v.abs() < 1e15;
    if prints_as_integer {
        (v as i64).to_string()
    } else {
        v.to_string()
    }
}
856
857/// Convert days-since-epoch to ISO date string (YYYY-MM-DD).
858fn days_to_iso(days: i64) -> String {
859    let (year, month, day) = civil_from_days(days);
860    format!("{:04}-{:02}-{:02}", year, month, day)
861}
862
863/// Convert epoch seconds + nanos to ISO datetime string.
864fn epoch_to_iso(secs: i64, nanos: u32) -> String {
865    let days = if secs >= 0 {
866        secs / 86400
867    } else {
868        (secs - 86399) / 86400
869    };
870    let day_secs = secs - days * 86400;
871    let (year, month, day) = civil_from_days(days);
872    let hours = day_secs / 3600;
873    let minutes = (day_secs % 3600) / 60;
874    let seconds = day_secs % 60;
875
876    if nanos == 0 {
877        format!(
878            "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}",
879            year, month, day, hours, minutes, seconds
880        )
881    } else {
882        let millis = nanos / 1_000_000;
883        format!(
884            "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}",
885            year, month, day, hours, minutes, seconds, millis
886        )
887    }
888}
889
/// Convert microseconds since midnight to HH:MM:SS format.
/// Sub-second precision is intentionally truncated — the rendering
/// layer does not need fractional seconds for time-of-day columns.
/// Negative inputs are formatted with a leading minus sign.
fn micros_to_hms(micros: i64) -> String {
    let prefix = if micros.is_negative() { "-" } else { "" };
    let whole_secs = micros.unsigned_abs() / 1_000_000;
    let (hours, rest) = (whole_secs / 3600, whole_secs % 3600);
    format!("{prefix}{hours:02}:{:02}:{:02}", rest / 60, rest % 60)
}
901
/// Convert days since the Unix epoch to a civil (year, month, day) triple.
/// Algorithm from Howard Hinnant's date algorithms (`civil_from_days`).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    // Shift the epoch to 0000-03-01 so each 400-year era starts on March 1.
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    // Day-of-era in [0, 146096].
    let doe = (z - era * 146_097) as u32;
    // Year-of-era in [0, 399].
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let year = era * 400 + yoe as i64;
    // Day-of-year counted from March 1, in [0, 365].
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    // Shifted month index: 0 = March … 11 = February.
    let mp = (5 * doy + 2) / 153;
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the following civil year.
    let year = if month <= 2 { year + 1 } else { year };
    (year, month, day)
}
917
/// Format a Decimal128 value with the given scale as a plain decimal string.
///
/// `scale` is the number of fractional digits. Arrow also permits negative
/// scales, where the logical value is `raw * 10^(-scale)`; the original
/// `scale <= 0` branch printed just `raw` for those, dropping the trailing
/// zeros (e.g. raw = 5, scale = -2 rendered "5" instead of "500").
fn format_decimal128(raw: i128, scale: i8) -> String {
    if scale == 0 {
        return raw.to_string();
    }
    if scale < 0 {
        if raw == 0 {
            return "0".to_string();
        }
        // Negative scale: append |scale| zeros. String repetition avoids
        // overflowing 10^|scale| as an i128 for extreme scales.
        return format!("{}{}", raw, "0".repeat(scale.unsigned_abs() as usize));
    }
    let divisor = 10_i128.pow(scale as u32);
    let whole = raw / divisor;
    let frac = (raw % divisor).abs();
    // For negative values between -1 and 0 (e.g., -0.05), whole truncates to 0
    // and loses the sign. Restore it explicitly.
    let sign = if raw < 0 && whole == 0 { "-" } else { "" };
    format!("{}{}.{:0>width$}", sign, whole, frac, width = scale as usize)
}
931
932// ── Type inference helpers ──
933
/// Coarse column type inferred from heterogeneous values (see
/// `merge_inferred` for how per-value types are combined).
/// `Eq` is derived alongside `PartialEq` since all variants are units
/// and equality is total (clippy: `derive_partial_eq_without_eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    // Numeric values widen to 64-bit float.
    Float64,
    Boolean,
    Utf8,
    // No non-null value observed yet.
    Null,
}
941
942fn merge_inferred(existing: InferredType, new: InferredType) -> InferredType {
943    if new == InferredType::Null {
944        return existing;
945    }
946    if existing == InferredType::Null {
947        return new;
948    }
949    if existing == new {
950        return existing;
951    }
952    // Mixed types → string
953    InferredType::Utf8
954}
955
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use serde_json::json;

    // Helper: build a `Row` (HashMap<String, serde_json::Value>) from
    // (&str, Value) pairs for concise test fixtures.
    fn make_row(pairs: Vec<(&str, serde_json::Value)>) -> Row {
        pairs
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect()
    }

    // ── Legacy Row tests ──

    #[test]
    fn get_f64_from_number() {
        let row = make_row(vec![("value", json!(42.5))]);
        assert_eq!(get_f64(&row, "value"), Some(42.5));
    }

    #[test]
    fn get_f64_from_string() {
        // Numeric strings are parsed, per get_f64's String arm.
        let row = make_row(vec![("value", json!("123.45"))]);
        assert_eq!(get_f64(&row, "value"), Some(123.45));
    }

    #[test]
    fn get_f64_missing_field() {
        let row = make_row(vec![("other", json!(1.0))]);
        assert_eq!(get_f64(&row, "value"), None);
    }

    #[test]
    fn get_string_from_various() {
        let row_num = make_row(vec![("x", json!(42))]);
        assert_eq!(get_string(&row_num, "x"), Some("42".to_string()));

        let row_str = make_row(vec![("x", json!("hello"))]);
        assert_eq!(get_string(&row_str, "x"), Some("hello".to_string()));

        let row_bool = make_row(vec![("x", json!(true))]);
        assert_eq!(get_string(&row_bool, "x"), Some("true".to_string()));

        // JSON null maps to None, not the string "null".
        let row_null = make_row(vec![("x", json!(null))]);
        assert_eq!(get_string(&row_null, "x"), None);
    }

    // NOTE(review): these two tests call `extent`, but the legacy section at
    // the top of this file defines `extent_rows` with this exact signature —
    // confirm a plain `extent` free function still exists (or that these
    // should call `extent_rows`).
    #[test]
    fn extent_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        assert_eq!(extent(&data, "v"), Some((10.0, 30.0)));
    }

    #[test]
    fn extent_empty() {
        // No rows at all → None.
        let data: Vec<Row> = vec![];
        assert_eq!(extent(&data, "v"), None);

        // Rows present but field absent → None as well.
        let data = vec![make_row(vec![("other", json!(1.0))])];
        assert_eq!(extent(&data, "v"), None);
    }

    #[test]
    fn sum_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        assert_eq!(sum(&data, "v"), 60.0);
    }

    #[test]
    fn group_by_basic() {
        let data = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let groups = group_by(&data, "cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].len(), 2);
        assert_eq!(groups["B"].len(), 1);
    }

    #[test]
    fn unique_values_preserves_order() {
        let data = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
            make_row(vec![("x", json!("apple"))]),
        ];
        let uniq = unique_values(&data, "x");
        // First-appearance order, duplicates dropped.
        assert_eq!(uniq, vec!["banana", "apple", "cherry"]);
    }

    // ── DataTable tests ──

    #[test]
    fn datatable_from_rows_roundtrip() {
        let rows = vec![
            make_row(vec![
                ("name", json!("Alice")),
                ("age", json!(30)),
                ("active", json!(true)),
            ]),
            make_row(vec![
                ("name", json!("Bob")),
                ("age", json!(25)),
                ("active", json!(false)),
            ]),
        ];

        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.num_rows(), 2);
        assert_eq!(dt.num_columns(), 3);

        // JSON integers come back as f64 through the numeric accessor.
        assert_eq!(dt.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt.get_f64(0, "age"), Some(30.0));
        assert_eq!(dt.get_string(1, "name"), Some("Bob".to_string()));
        assert_eq!(dt.get_f64(1, "age"), Some(25.0));
    }

    #[test]
    fn datatable_empty() {
        let dt = DataTable::from_rows(&[]).unwrap();
        assert_eq!(dt.num_rows(), 0);
        assert!(dt.is_empty());
    }

    #[test]
    fn datatable_unique_values() {
        let rows = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        // Values preserve first-appearance order from the rows
        assert_eq!(dt.unique_values("x"), vec!["banana", "apple", "cherry"]);
    }

    #[test]
    fn datatable_extent() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.extent("v"), Some((10.0, 30.0)));
    }

    #[test]
    fn datatable_group_by() {
        let rows = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let groups = dt.group_by("cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].num_rows(), 2);
        assert_eq!(groups["B"].num_rows(), 1);
    }

    #[test]
    fn datatable_sum() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.sum("v"), 60.0);
    }

    // Round-trip through Arrow IPC bytes preserves rows and values.
    #[test]
    fn datatable_ipc_roundtrip() {
        let rows = vec![
            make_row(vec![("name", json!("Alice")), ("score", json!(95.5))]),
            make_row(vec![("name", json!("Bob")), ("score", json!(87.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let bytes = dt.to_ipc_bytes().unwrap();
        let dt2 = DataTable::from_ipc_bytes(&bytes).unwrap();
        assert_eq!(dt2.num_rows(), 2);
        assert_eq!(dt2.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt2.get_f64(1, "score"), Some(87.0));
    }

    #[test]
    fn datatable_record_batch_with_timestamps() {
        use arrow::array::TimestampMicrosecondArray;
        use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

        // Build a RecordBatch with a proper Timestamp column
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), true),
            Field::new("value", DataType::Float64, true),
        ]));
        // 2026-01-15T10:30:00Z = 1768474200 seconds = 1768474200000000 microseconds
        let ts_array = TimestampMicrosecondArray::from(vec![Some(1768474200000000i64)])
            .with_timezone("UTC");
        let val_array = Float64Array::from(vec![Some(42.0)]);
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(ts_array) as ArrayRef, Arc::new(val_array) as ArrayRef],
        )
        .unwrap();

        let dt = DataTable::from_record_batch(batch);
        // get_f64 returns epoch millis
        assert_eq!(dt.get_f64(0, "ts"), Some(1768474200000.0));
        // get_string returns ISO 8601 with Z suffix (has timezone)
        let ts_str = dt.get_string(0, "ts").unwrap();
        assert!(ts_str.ends_with('Z'), "Expected Z suffix, got: {}", ts_str);
        assert!(ts_str.starts_with("2026-01-15T"), "Expected 2026-01-15T, got: {}", ts_str);
    }

    #[test]
    fn datatable_record_batch_with_dates() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("d", DataType::Date32, true),
        ]));
        // 2026-01-15 = 20468 days since epoch
        let date_array = Date32Array::from(vec![Some(20468)]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(date_array) as ArrayRef]).unwrap();

        let dt = DataTable::from_record_batch(batch);
        assert_eq!(dt.get_string(0, "d"), Some("2026-01-15".to_string()));
        // Date32 columns expose the raw day count through get_f64.
        assert_eq!(dt.get_f64(0, "d"), Some(20468.0));
    }

    #[test]
    fn datatable_has_field() {
        let rows = vec![make_row(vec![("x", json!(1))])];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert!(dt.has_field("x"));
        assert!(!dt.has_field("y"));
    }

    #[test]
    fn datatable_record_batch_with_time_columns() {
        use arrow::array::{Time64MicrosecondArray, Time32SecondArray};

        let schema = Arc::new(Schema::new(vec![
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("t32_s", DataType::Time32(TimeUnit::Second), false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                // 10:30:45 = 10*3600 + 30*60 + 45 = 37845 seconds = 37_845_000_000 micros
                // Also test midnight (0) and a value with sub-second micros (truncated)
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(37_845_000_000i64),
                    Some(0i64),
                    Some(37_845_123_456i64),
                    None,
                ])),
                Arc::new(Time32SecondArray::from(vec![
                    37845i32,
                    0i32,
                    86399i32,
                    3661i32,
                ])),
            ],
        )
        .unwrap();

        let dt = DataTable::from_record_batch(batch);

        // Time64(Microsecond) formatting
        assert_eq!(dt.get_string(0, "t64_us"), Some("10:30:45".to_string()));
        assert_eq!(dt.get_string(1, "t64_us"), Some("00:00:00".to_string()));
        // Sub-second precision is intentionally truncated
        assert_eq!(dt.get_string(2, "t64_us"), Some("10:30:45".to_string()));
        assert_eq!(dt.get_string(3, "t64_us"), None);

        // Time32(Second) formatting
        assert_eq!(dt.get_string(0, "t32_s"), Some("10:30:45".to_string()));
        assert_eq!(dt.get_string(1, "t32_s"), Some("00:00:00".to_string()));
        assert_eq!(dt.get_string(2, "t32_s"), Some("23:59:59".to_string()));
        assert_eq!(dt.get_string(3, "t32_s"), Some("01:01:01".to_string()));
    }

    // Nulls in either column type surface as None from both accessors.
    #[test]
    fn datatable_null_values() {
        let rows = vec![
            make_row(vec![("x", json!(1.0)), ("y", json!(null))]),
            make_row(vec![("x", json!(null)), ("y", json!("hello"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.get_f64(0, "x"), Some(1.0));
        assert_eq!(dt.get_f64(0, "y"), None);
        assert_eq!(dt.get_f64(1, "x"), None);
        assert_eq!(dt.get_string(1, "y"), Some("hello".to_string()));
    }
}