//! chartml_core/data.rs — row-oriented (JSON) and columnar (Arrow) data access.

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::array::{
5    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
6    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringBuilder,
7    StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
8    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
9};
10use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
11
12use crate::error::ChartError;
13
14/// A row of data — a map of field names to values.
15/// Used for inline YAML data and backward compatibility.
16pub type Row = HashMap<String, serde_json::Value>;
17
18// ── Legacy free functions (kept for backward compat / built-in transforms) ──
19
20/// Extract an f64 value from a Row by field name.
21/// Handles both Number and String (parsed) values.
22pub fn get_f64(row: &Row, field: &str) -> Option<f64> {
23    match row.get(field)? {
24        serde_json::Value::Number(n) => n.as_f64(),
25        serde_json::Value::String(s) => s.parse::<f64>().ok(),
26        _ => None,
27    }
28}
29
30/// Extract a string value from a Row by field name.
31pub fn get_string(row: &Row, field: &str) -> Option<String> {
32    match row.get(field)? {
33        serde_json::Value::String(s) => Some(s.clone()),
34        serde_json::Value::Number(n) => Some(n.to_string()),
35        serde_json::Value::Bool(b) => Some(b.to_string()),
36        serde_json::Value::Null => None,
37        other => Some(other.to_string()),
38    }
39}
40
41/// Compute the extent (min, max) of a numeric field across rows.
42pub fn extent_rows(data: &[Row], field: &str) -> Option<(f64, f64)> {
43    let values: Vec<f64> = data.iter().filter_map(|row| get_f64(row, field)).collect();
44    if values.is_empty() {
45        return None;
46    }
47    let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
48    let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
49    Some((min, max))
50}
51
52/// Sum a numeric field across rows.
53pub fn sum_rows(data: &[Row], field: &str) -> f64 {
54    data.iter().filter_map(|row| get_f64(row, field)).sum()
55}
56
57/// Group rows by a field value.
58pub fn group_by_rows<'a>(data: &'a [Row], field: &str) -> HashMap<String, Vec<&'a Row>> {
59    let mut groups: HashMap<String, Vec<&'a Row>> = HashMap::new();
60    for row in data {
61        if let Some(key) = get_string(row, field) {
62            groups.entry(key).or_default().push(row);
63        }
64    }
65    groups
66}
67
68/// Get unique values for a field, in order of first appearance.
69pub fn unique_values_rows(data: &[Row], field: &str) -> Vec<String> {
70    let mut seen = std::collections::HashSet::new();
71    let mut result = Vec::new();
72    for row in data {
73        if let Some(val) = get_string(row, field) {
74            if seen.insert(val.clone()) {
75                result.push(val);
76            }
77        }
78    }
79    result
80}
81
82// Keep old names as aliases for backward compatibility
83pub use extent_rows as extent;
84pub use sum_rows as sum;
85pub use group_by_rows as group_by;
86pub use unique_values_rows as unique_values;
87
88// ── DataTable: Arrow-backed columnar data ──
89
/// Type-preserving columnar data backed by Arrow RecordBatch.
///
/// Provides row-oriented accessors for renderer compatibility while
/// maintaining full Arrow type fidelity (timestamps, dates, decimals, etc.)
/// for the transform pipeline.
///
/// Cloning is cheap-ish: Arrow arrays are internally reference-counted
/// buffers, so `Clone` copies metadata, not the column data itself.
#[derive(Debug, Clone)]
pub struct DataTable {
    // Single batch holding all rows; multi-batch IPC input is concatenated
    // on load (see `from_ipc_bytes`).
    batch: RecordBatch,
    /// Column name → column index for fast lookup.
    field_index: HashMap<String, usize>,
}
101
impl DataTable {
    /// Wrap an existing Arrow RecordBatch.
    pub fn from_record_batch(batch: RecordBatch) -> Self {
        // Precompute name → column index once so per-field lookups are O(1).
        let field_index = batch
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(i, f)| (f.name().clone(), i))
            .collect();
        Self { batch, field_index }
    }

    /// Convert JSON rows (from inline YAML data) into a DataTable.
    /// Type inference: Numbers → Float64, Booleans → Boolean, Strings → Utf8.
    /// Columns with mixed value types widen to Utf8; all-null columns default
    /// to Utf8. Every field in the resulting schema is nullable.
    pub fn from_rows(rows: &[Row]) -> Result<Self, ChartError> {
        if rows.is_empty() {
            // No rows → empty batch with an empty schema.
            let schema = Arc::new(Schema::new(Vec::<Field>::new()));
            let batch = RecordBatch::new_empty(schema);
            return Ok(Self::from_record_batch(batch));
        }

        // Collect column names preserving first-appearance order, then sort for determinism
        // (rows are HashMaps, so raw key iteration order would be unstable).
        let mut column_names: Vec<String> = Vec::new();
        let mut seen = std::collections::HashSet::new();
        for row in rows {
            for key in row.keys() {
                if seen.insert(key.clone()) {
                    column_names.push(key.clone());
                }
            }
        }
        column_names.sort();

        // Infer types by merging the JSON type of every value seen per column.
        let mut col_types: Vec<InferredType> = vec![InferredType::Null; column_names.len()];
        for row in rows {
            for (i, name) in column_names.iter().enumerate() {
                if let Some(val) = row.get(name) {
                    let val_type = match val {
                        serde_json::Value::Number(_) => InferredType::Float64,
                        serde_json::Value::Bool(_) => InferredType::Boolean,
                        serde_json::Value::String(_) => InferredType::Utf8,
                        serde_json::Value::Null => InferredType::Null,
                        // Arrays/objects are treated as strings (stringified below).
                        _ => InferredType::Utf8,
                    };
                    col_types[i] = merge_inferred(col_types[i], val_type);
                }
            }
        }

        // Null → Utf8
        for t in &mut col_types {
            if *t == InferredType::Null {
                *t = InferredType::Utf8;
            }
        }

        // Build schema — third argument `true` marks every field nullable.
        let fields: Vec<Field> = column_names
            .iter()
            .zip(col_types.iter())
            .map(|(name, typ)| {
                let dt = match typ {
                    InferredType::Float64 => DataType::Float64,
                    InferredType::Boolean => DataType::Boolean,
                    InferredType::Utf8 | InferredType::Null => DataType::Utf8,
                };
                Field::new(name, dt, true)
            })
            .collect();
        let schema = Arc::new(Schema::new(fields));

        // Build arrays — one pass over the rows per column. Values that don't
        // match the inferred type become nulls (Float64/Boolean) or are
        // stringified (Utf8).
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_names.len());
        for (i, name) in column_names.iter().enumerate() {
            let arr: ArrayRef = match col_types[i] {
                InferredType::Float64 => {
                    let values: Vec<Option<f64>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Number(n) => n.as_f64(),
                                serde_json::Value::String(s) => s.parse::<f64>().ok(),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(Float64Array::from(values))
                }
                InferredType::Boolean => {
                    let values: Vec<Option<bool>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Bool(b) => Some(*b),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(BooleanArray::from(values))
                }
                InferredType::Utf8 | InferredType::Null => {
                    let mut builder = StringBuilder::new();
                    for row in rows {
                        match row.get(name) {
                            Some(serde_json::Value::String(s)) => builder.append_value(s),
                            Some(serde_json::Value::Number(n)) => {
                                builder.append_value(n.to_string())
                            }
                            Some(serde_json::Value::Bool(b)) => {
                                builder.append_value(b.to_string())
                            }
                            Some(serde_json::Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(other.to_string()),
                        }
                    }
                    Arc::new(builder.finish())
                }
            };
            arrays.push(arr);
        }

        let batch = RecordBatch::try_new(schema, arrays)
            .map_err(|e| ChartError::DataError(format!("Failed to create RecordBatch: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }

    /// Deserialize from Arrow IPC bytes.
    /// Multiple stream batches are concatenated into a single batch so the
    /// rest of the API can assume one contiguous table.
    pub fn from_ipc_bytes(bytes: &[u8]) -> Result<Self, ChartError> {
        use arrow::ipc::reader::StreamReader;
        use std::io::Cursor;

        let cursor = Cursor::new(bytes);
        let reader = StreamReader::try_new(cursor, None)
            .map_err(|e| ChartError::DataError(format!("Failed to read Arrow IPC: {}", e)))?;

        let schema = reader.schema();
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                ChartError::DataError(format!("Failed to read Arrow batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            // Schema-only stream: keep the schema, zero rows.
            return Ok(Self::from_record_batch(RecordBatch::new_empty(schema)));
        }

        if batches.len() == 1 {
            // Common case: avoid the concat copy entirely.
            return Ok(Self::from_record_batch(batches.remove(0)));
        }

        // Concatenate multiple batches
        let batch = arrow::compute::concat_batches(&schema, &batches)
            .map_err(|e| ChartError::DataError(format!("Failed to concat batches: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }

    /// Serialize to Arrow IPC bytes.
    pub fn to_ipc_bytes(&self) -> Result<Vec<u8>, ChartError> {
        use arrow::ipc::writer::StreamWriter;

        let mut buf = Vec::new();
        {
            // Scope the writer: it mutably borrows `buf`, so it must be dropped
            // before `buf` can be returned.
            let mut writer = StreamWriter::try_new(&mut buf, &self.batch.schema())
                .map_err(|e| ChartError::DataError(format!("Failed to create IPC writer: {}", e)))?;
            writer.write(&self.batch).map_err(|e| {
                ChartError::DataError(format!("Failed to write Arrow batch: {}", e))
            })?;
            writer.finish().map_err(|e| {
                ChartError::DataError(format!("Failed to finish IPC stream: {}", e))
            })?;
        }
        Ok(buf)
    }

    // ── Accessors ──

    /// Number of rows.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }

    /// Number of columns.
    pub fn num_columns(&self) -> usize {
        self.batch.num_columns()
    }

    /// Whether the table has no rows.
    pub fn is_empty(&self) -> bool {
        self.batch.num_rows() == 0
    }

    /// Get the underlying RecordBatch.
    pub fn record_batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Get the Arrow schema.
    pub fn schema(&self) -> Arc<Schema> {
        self.batch.schema()
    }

    /// Get a column by name.
    fn column(&self, field: &str) -> Option<&ArrayRef> {
        self.field_index.get(field).map(|&i| self.batch.column(i))
    }

    /// Extract an f64 value from a specific row and field.
    /// Handles all numeric Arrow types, Date32 (days since epoch), and Timestamps (epoch millis).
    /// Returns `None` for nulls, unknown fields, and non-numeric types.
    pub fn get_f64(&self, row: usize, field: &str) -> Option<f64> {
        let col = self.column(field)?;
        if col.is_null(row) {
            return None;
        }
        arrow_to_f64(col, row)
    }

    /// Extract a string value from a specific row and field.
    /// Formats temporal types as ISO strings, numbers as decimal strings.
    /// Returns `None` for nulls, unknown fields, and unsupported types.
    pub fn get_string(&self, row: usize, field: &str) -> Option<String> {
        let col = self.column(field)?;
        if col.is_null(row) {
            return None;
        }
        arrow_to_string(col, row)
    }

    /// Get unique string values for a field, preserving first-appearance order.
    /// Unknown fields and null cells are skipped (an unknown field yields an
    /// empty Vec rather than an error).
    pub fn unique_values(&self, field: &str) -> Vec<String> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return Vec::new(),
        };
        let mut seen = std::collections::HashSet::new();
        let mut result = Vec::new();
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(val) = arrow_to_string(col, i) {
                if seen.insert(val.clone()) {
                    result.push(val);
                }
            }
        }
        result
    }

    /// Get all string values for a field, preserving row order (including duplicates).
    /// Null cells are skipped, so the result may be shorter than `num_rows()`.
    pub fn all_values(&self, field: &str) -> Vec<String> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return Vec::new(),
        };
        let mut result = Vec::new();
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(val) = arrow_to_string(col, i) {
                result.push(val);
            }
        }
        result
    }

    /// Compute the extent (min, max) of a numeric field.
    /// Returns `None` if the field is unknown or yields no numeric values.
    pub fn extent(&self, field: &str) -> Option<(f64, f64)> {
        let col = self.column(field)?;
        let mut min = f64::INFINITY;
        let mut max = f64::NEG_INFINITY;
        let mut found = false;
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(v) = arrow_to_f64(col, i) {
                found = true;
                if v < min {
                    min = v;
                }
                if v > max {
                    max = v;
                }
            }
        }
        if found {
            Some((min, max))
        } else {
            None
        }
    }

    /// Sum a numeric field across all rows.
    /// Nulls and non-numeric cells contribute 0; an unknown field sums to 0.0.
    pub fn sum(&self, field: &str) -> f64 {
        let col = match self.column(field) {
            Some(c) => c,
            None => return 0.0,
        };
        let mut total = 0.0;
        for i in 0..self.batch.num_rows() {
            if !col.is_null(i) {
                if let Some(v) = arrow_to_f64(col, i) {
                    total += v;
                }
            }
        }
        total
    }

    /// Group rows by a field value, returning a map of group key → DataTable.
    /// Rows whose key is null (or unconvertible) are dropped; within a group,
    /// original row order is preserved.
    pub fn group_by(&self, field: &str) -> HashMap<String, DataTable> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return HashMap::new(),
        };

        // Collect indices per group
        // NOTE(review): indices are u32 because that's what the take kernel
        // consumes — tables beyond u32::MAX rows would truncate. Presumably
        // fine for chart-sized data; confirm if huge inputs are expected.
        let mut group_indices: HashMap<String, Vec<u32>> = HashMap::new();
        let mut key_order: Vec<String> = Vec::new();
        let mut seen_keys = std::collections::HashSet::new();

        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(key) = arrow_to_string(col, i) {
                if seen_keys.insert(key.clone()) {
                    key_order.push(key.clone());
                }
                group_indices.entry(key).or_default().push(i as u32);
            }
        }

        // Build sub-tables using arrow::compute::take
        // (a take per column gathers just the rows belonging to the group).
        let mut result = HashMap::new();
        for key in key_order {
            if let Some(indices) = group_indices.get(&key) {
                let indices_arr = UInt32Array::from(indices.clone());
                let take_result: Result<Vec<ArrayRef>, _> = self
                    .batch
                    .columns()
                    .iter()
                    .map(|col| arrow::compute::take(col.as_ref(), &indices_arr, None))
                    .collect();
                // Failures here are silently skipped: the group is simply
                // absent from the result rather than aborting the whole map.
                if let Ok(columns) = take_result {
                    if let Ok(sub_batch) = RecordBatch::try_new(self.batch.schema(), columns) {
                        result.insert(key, DataTable::from_record_batch(sub_batch));
                    }
                }
            }
        }
        result
    }

    /// Check if a field exists in the schema.
    pub fn has_field(&self, field: &str) -> bool {
        self.field_index.contains_key(field)
    }

    /// Get all field names.
    pub fn field_names(&self) -> Vec<String> {
        self.batch
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    /// Convert the Arrow RecordBatch back to JSON rows.
    /// Used for the built-in sync transform fallback path which operates on `Vec<Row>`.
    /// Lossy by design: all numerics (including Decimal128) become f64 JSON
    /// numbers; dates/timestamps become ISO strings; anything else is
    /// stringified or Null.
    pub fn to_rows(&self) -> Vec<Row> {
        let num_rows = self.batch.num_rows();
        let schema = self.batch.schema();
        let fields = schema.fields();

        let mut rows = Vec::with_capacity(num_rows);
        for row_idx in 0..num_rows {
            let mut row = Row::new();
            for (col_idx, field) in fields.iter().enumerate() {
                let col = self.batch.column(col_idx);
                if col.is_null(row_idx) {
                    row.insert(field.name().clone(), serde_json::Value::Null);
                    continue;
                }
                let value = match col.data_type() {
                    DataType::Boolean => {
                        let v = col.as_any().downcast_ref::<BooleanArray>().unwrap().value(row_idx);
                        serde_json::json!(v)
                    }
                    DataType::Float64 | DataType::Float32 |
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 |
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 |
                    DataType::Decimal128(_, _) => {
                        if let Some(v) = arrow_to_f64(col, row_idx) {
                            serde_json::json!(v)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    // Dates and timestamps → preserve as ISO strings, not raw f64
                    DataType::Date32 | DataType::Date64 |
                    DataType::Timestamp(_, _) => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    _ => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                };
                row.insert(field.name().clone(), value);
            }
            rows.push(row);
        }
        rows
    }
}
530
531// ── Arrow value extraction helpers ──
532
533/// Extract an f64 from any Arrow array at the given index.
534fn arrow_to_f64(col: &ArrayRef, idx: usize) -> Option<f64> {
535    match col.data_type() {
536        DataType::Float64 => {
537            Some(col.as_any().downcast_ref::<Float64Array>().unwrap().value(idx))
538        }
539        DataType::Float32 => {
540            Some(col.as_any().downcast_ref::<Float32Array>().unwrap().value(idx) as f64)
541        }
542        DataType::Int64 => {
543            Some(col.as_any().downcast_ref::<Int64Array>().unwrap().value(idx) as f64)
544        }
545        DataType::Int32 => {
546            Some(col.as_any().downcast_ref::<Int32Array>().unwrap().value(idx) as f64)
547        }
548        DataType::Int16 => {
549            Some(col.as_any().downcast_ref::<Int16Array>().unwrap().value(idx) as f64)
550        }
551        DataType::Int8 => {
552            Some(col.as_any().downcast_ref::<Int8Array>().unwrap().value(idx) as f64)
553        }
554        DataType::UInt64 => {
555            Some(col.as_any().downcast_ref::<UInt64Array>().unwrap().value(idx) as f64)
556        }
557        DataType::UInt32 => {
558            Some(col.as_any().downcast_ref::<UInt32Array>().unwrap().value(idx) as f64)
559        }
560        DataType::UInt16 => {
561            Some(col.as_any().downcast_ref::<UInt16Array>().unwrap().value(idx) as f64)
562        }
563        DataType::UInt8 => {
564            Some(col.as_any().downcast_ref::<UInt8Array>().unwrap().value(idx) as f64)
565        }
566        DataType::Boolean => {
567            let v = col.as_any().downcast_ref::<BooleanArray>().unwrap().value(idx);
568            Some(if v { 1.0 } else { 0.0 })
569        }
570        DataType::Date32 => {
571            // Days since epoch
572            Some(col.as_any().downcast_ref::<Date32Array>().unwrap().value(idx) as f64)
573        }
574        DataType::Date64 => {
575            // Milliseconds since epoch
576            Some(col.as_any().downcast_ref::<Date64Array>().unwrap().value(idx) as f64)
577        }
578        DataType::Timestamp(unit, _) => {
579            let raw = match unit {
580                TimeUnit::Second => col
581                    .as_any()
582                    .downcast_ref::<TimestampSecondArray>()
583                    .unwrap()
584                    .value(idx),
585                TimeUnit::Millisecond => col
586                    .as_any()
587                    .downcast_ref::<TimestampMillisecondArray>()
588                    .unwrap()
589                    .value(idx),
590                TimeUnit::Microsecond => col
591                    .as_any()
592                    .downcast_ref::<TimestampMicrosecondArray>()
593                    .unwrap()
594                    .value(idx),
595                TimeUnit::Nanosecond => col
596                    .as_any()
597                    .downcast_ref::<TimestampNanosecondArray>()
598                    .unwrap()
599                    .value(idx),
600            };
601            // Convert to epoch milliseconds for consistent f64 representation
602            let millis = match unit {
603                TimeUnit::Second => raw * 1000,
604                TimeUnit::Millisecond => raw,
605                TimeUnit::Microsecond => raw / 1000,
606                TimeUnit::Nanosecond => raw / 1_000_000,
607            };
608            Some(millis as f64)
609        }
610        DataType::Decimal128(_, scale) => {
611            let raw = col
612                .as_any()
613                .downcast_ref::<Decimal128Array>()
614                .unwrap()
615                .value(idx);
616            let divisor = 10_f64.powi(*scale as i32);
617            Some(raw as f64 / divisor)
618        }
619        DataType::Utf8 => {
620            // Try parsing string as number
621            let s = col.as_any().downcast_ref::<StringArray>().unwrap().value(idx);
622            s.parse::<f64>().ok()
623        }
624        _ => None,
625    }
626}
627
/// Extract a string from any Arrow array at the given index.
///
/// Utf8/LargeUtf8 are returned verbatim; numbers and booleans use their
/// display form (floats via `format_f64`, which drops a ".0" suffix);
/// Date32/Date64 become "YYYY-MM-DD"; timestamps become ISO-8601 datetimes;
/// Decimal128 is rendered at its declared scale. Returns `None` for
/// unsupported types. Callers are expected to have checked `is_null` first.
fn arrow_to_string(col: &ArrayRef, idx: usize) -> Option<String> {
    match col.data_type() {
        DataType::Utf8 => {
            Some(
                col.as_any()
                    .downcast_ref::<StringArray>()
                    .unwrap()
                    .value(idx)
                    .to_string(),
            )
        }
        DataType::LargeUtf8 => {
            Some(
                col.as_any()
                    .downcast_ref::<arrow::array::LargeStringArray>()
                    .unwrap()
                    .value(idx)
                    .to_string(),
            )
        }
        DataType::Float64 => {
            let v = col.as_any().downcast_ref::<Float64Array>().unwrap().value(idx);
            Some(format_f64(v))
        }
        DataType::Float32 => {
            let v = col.as_any().downcast_ref::<Float32Array>().unwrap().value(idx) as f64;
            Some(format_f64(v))
        }
        DataType::Int64 => {
            Some(col.as_any().downcast_ref::<Int64Array>().unwrap().value(idx).to_string())
        }
        DataType::Int32 => {
            Some(col.as_any().downcast_ref::<Int32Array>().unwrap().value(idx).to_string())
        }
        DataType::Int16 => {
            Some(col.as_any().downcast_ref::<Int16Array>().unwrap().value(idx).to_string())
        }
        DataType::Int8 => {
            Some(col.as_any().downcast_ref::<Int8Array>().unwrap().value(idx).to_string())
        }
        DataType::UInt64 => {
            Some(col.as_any().downcast_ref::<UInt64Array>().unwrap().value(idx).to_string())
        }
        DataType::UInt32 => {
            Some(col.as_any().downcast_ref::<UInt32Array>().unwrap().value(idx).to_string())
        }
        DataType::UInt16 => {
            Some(col.as_any().downcast_ref::<UInt16Array>().unwrap().value(idx).to_string())
        }
        DataType::UInt8 => {
            Some(col.as_any().downcast_ref::<UInt8Array>().unwrap().value(idx).to_string())
        }
        DataType::Boolean => {
            Some(col.as_any().downcast_ref::<BooleanArray>().unwrap().value(idx).to_string())
        }
        DataType::Date32 => {
            let days = col.as_any().downcast_ref::<Date32Array>().unwrap().value(idx);
            Some(days_to_iso(days as i64))
        }
        DataType::Date64 => {
            let millis = col.as_any().downcast_ref::<Date64Array>().unwrap().value(idx);
            // Convert millis to days, then to ISO — only the date component is
            // rendered; any time-of-day in the value is dropped. (Truncating
            // `/` also means pre-epoch values with a partial day round toward
            // zero — assumes midnight-aligned Date64 values; confirm upstream.)
            let days = millis / 86_400_000;
            Some(days_to_iso(days))
        }
        DataType::Timestamp(unit, tz) => {
            let raw = match unit {
                TimeUnit::Second => col
                    .as_any()
                    .downcast_ref::<TimestampSecondArray>()
                    .unwrap()
                    .value(idx),
                TimeUnit::Millisecond => col
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap()
                    .value(idx),
                TimeUnit::Microsecond => col
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .unwrap()
                    .value(idx),
                TimeUnit::Nanosecond => col
                    .as_any()
                    .downcast_ref::<TimestampNanosecondArray>()
                    .unwrap()
                    .value(idx),
            };
            // Convert to seconds + subseconds using Euclidean division
            // so the remainder is always non-negative (safe for pre-epoch timestamps).
            let (secs, nanos_u32) = match unit {
                TimeUnit::Second => (raw, 0u32),
                TimeUnit::Millisecond => {
                    let (s, r) = (raw.div_euclid(1000), raw.rem_euclid(1000));
                    (s, (r * 1_000_000) as u32)
                }
                TimeUnit::Microsecond => {
                    let (s, r) = (raw.div_euclid(1_000_000), raw.rem_euclid(1_000_000));
                    (s, (r * 1000) as u32)
                }
                TimeUnit::Nanosecond => {
                    let (s, r) = (raw.div_euclid(1_000_000_000), raw.rem_euclid(1_000_000_000));
                    (s, r as u32)
                }
            };
            let iso = epoch_to_iso(secs, nanos_u32);
            // NOTE(review): "Z" is appended whenever *any* timezone is present,
            // even a non-UTC offset — the raw instant is not shifted into that
            // zone. Consistent with Arrow storing UTC instants, but confirm
            // for tz values other than "UTC"/"+00:00".
            if tz.is_some() {
                Some(format!("{}Z", iso))
            } else {
                Some(iso)
            }
        }
        DataType::Decimal128(_, scale) => {
            let raw = col
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap()
                .value(idx);
            Some(format_decimal128(raw, *scale))
        }
        _ => None,
    }
}
752
/// Render an f64 compactly: integral values with magnitude below 1e15
/// (safely inside i64 range) print with no fractional part; everything
/// else — including NaN and infinities — uses the default Display form.
fn format_f64(v: f64) -> String {
    let integral = v.fract() == 0.0 && v.abs() < 1e15;
    if integral {
        (v as i64).to_string()
    } else {
        v.to_string()
    }
}
761
762/// Convert days-since-epoch to ISO date string (YYYY-MM-DD).
763fn days_to_iso(days: i64) -> String {
764    let (year, month, day) = civil_from_days(days);
765    format!("{:04}-{:02}-{:02}", year, month, day)
766}
767
768/// Convert epoch seconds + nanos to ISO datetime string.
769fn epoch_to_iso(secs: i64, nanos: u32) -> String {
770    let days = if secs >= 0 {
771        secs / 86400
772    } else {
773        (secs - 86399) / 86400
774    };
775    let day_secs = secs - days * 86400;
776    let (year, month, day) = civil_from_days(days);
777    let hours = day_secs / 3600;
778    let minutes = (day_secs % 3600) / 60;
779    let seconds = day_secs % 60;
780
781    if nanos == 0 {
782        format!(
783            "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}",
784            year, month, day, hours, minutes, seconds
785        )
786    } else {
787        let millis = nanos / 1_000_000;
788        format!(
789            "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}",
790            year, month, day, hours, minutes, seconds, millis
791        )
792    }
793}
794
/// Convert days since Unix epoch to a proleptic-Gregorian (year, month, day).
/// Howard Hinnant's branch-light "civil_from_days" algorithm: days are shifted
/// so day 0 is 0000-03-01, decomposed into 400-year eras, then a March-based
/// year/month/day is recovered and adjusted back to January-based months.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = (z - era * 146_097) as u32; // day-of-era   [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; // year-of-era [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // March-based day-of-year [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let mut year = yoe as i64 + era * 400;
    if month <= 2 {
        // Jan/Feb belong to the following civil year in the March-based count.
        year += 1;
    }
    (year, month, day)
}
810
/// Render a raw Decimal128 integer at the given scale as a plain decimal
/// string (e.g. raw 12345, scale 2 → "123.45"). Non-positive scales print
/// the raw integer unchanged.
fn format_decimal128(raw: i128, scale: i8) -> String {
    if scale <= 0 {
        return raw.to_string();
    }
    let divisor = 10_i128.pow(scale as u32);
    let whole = raw / divisor;
    let frac = (raw % divisor).abs();
    // Integer division truncates toward zero, so for values in (-1, 0)
    // (e.g. -0.05) `whole` is 0 and the sign vanishes — restore it explicitly.
    let sign = if raw < 0 && whole == 0 { "-" } else { "" };
    format!("{}{}.{:0>width$}", sign, whole, frac, width = scale as usize)
}
824
825// ── Type inference helpers ──
826
/// Column type inferred while scanning inline row data.
///
/// `Null` is the identity element for [`merge_inferred`]: it records that no
/// typed value has been seen yet and never overrides a concrete type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InferredType {
    /// Numeric (f64) column.
    Float64,
    /// Boolean column.
    Boolean,
    /// String column — also the fallback for mixed-type columns.
    Utf8,
    /// No non-null value observed yet.
    Null,
}

/// Combine the type inferred so far for a column with the type of a newly
/// seen value.
///
/// Nulls are transparent on either side; agreement keeps the type; any
/// genuine mismatch degrades the column to `Utf8`.
fn merge_inferred(existing: InferredType, new: InferredType) -> InferredType {
    match (existing, new) {
        // A null never changes the other side's verdict.
        (e, InferredType::Null) => e,
        (InferredType::Null, n) => n,
        // Same concrete type on both sides: keep it.
        (e, n) if e == n => e,
        // Mixed concrete types → fall back to string.
        _ => InferredType::Utf8,
    }
}
848
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // NOTE(review): the legacy tests below call free functions `extent`,
    // `sum`, `group_by`, and `unique_values`, while the top of this file
    // defines `extent_rows` — confirm the un-suffixed variants exist in the
    // portion of the file between the header and this module.

    /// Build a `Row` (HashMap<String, serde_json::Value>) from literal pairs.
    fn make_row(pairs: Vec<(&str, serde_json::Value)>) -> Row {
        pairs
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect()
    }

    // ── Legacy Row tests ──

    #[test]
    fn get_f64_from_number() {
        let row = make_row(vec![("value", json!(42.5))]);
        assert_eq!(get_f64(&row, "value"), Some(42.5));
    }

    #[test]
    fn get_f64_from_string() {
        // Numeric strings are parsed, per get_f64's String arm.
        let row = make_row(vec![("value", json!("123.45"))]);
        assert_eq!(get_f64(&row, "value"), Some(123.45));
    }

    #[test]
    fn get_f64_missing_field() {
        let row = make_row(vec![("other", json!(1.0))]);
        assert_eq!(get_f64(&row, "value"), None);
    }

    #[test]
    fn get_string_from_various() {
        // Numbers and booleans are stringified; only Null maps to None.
        let row_num = make_row(vec![("x", json!(42))]);
        assert_eq!(get_string(&row_num, "x"), Some("42".to_string()));

        let row_str = make_row(vec![("x", json!("hello"))]);
        assert_eq!(get_string(&row_str, "x"), Some("hello".to_string()));

        let row_bool = make_row(vec![("x", json!(true))]);
        assert_eq!(get_string(&row_bool, "x"), Some("true".to_string()));

        let row_null = make_row(vec![("x", json!(null))]);
        assert_eq!(get_string(&row_null, "x"), None);
    }

    #[test]
    fn extent_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        assert_eq!(extent(&data, "v"), Some((10.0, 30.0)));
    }

    #[test]
    fn extent_empty() {
        // No rows at all, and rows lacking the field, both yield None.
        let data: Vec<Row> = vec![];
        assert_eq!(extent(&data, "v"), None);

        let data = vec![make_row(vec![("other", json!(1.0))])];
        assert_eq!(extent(&data, "v"), None);
    }

    #[test]
    fn sum_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        assert_eq!(sum(&data, "v"), 60.0);
    }

    #[test]
    fn group_by_basic() {
        let data = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let groups = group_by(&data, "cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].len(), 2);
        assert_eq!(groups["B"].len(), 1);
    }

    #[test]
    fn unique_values_preserves_order() {
        // Duplicates are dropped; first-appearance order is kept.
        let data = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
            make_row(vec![("x", json!("apple"))]),
        ];
        let uniq = unique_values(&data, "x");
        assert_eq!(uniq, vec!["banana", "apple", "cherry"]);
    }

    // ── DataTable tests ──

    #[test]
    fn datatable_from_rows_roundtrip() {
        // Rows → DataTable preserves row/column counts and cell values.
        let rows = vec![
            make_row(vec![
                ("name", json!("Alice")),
                ("age", json!(30)),
                ("active", json!(true)),
            ]),
            make_row(vec![
                ("name", json!("Bob")),
                ("age", json!(25)),
                ("active", json!(false)),
            ]),
        ];

        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.num_rows(), 2);
        assert_eq!(dt.num_columns(), 3);

        assert_eq!(dt.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt.get_f64(0, "age"), Some(30.0));
        assert_eq!(dt.get_string(1, "name"), Some("Bob".to_string()));
        assert_eq!(dt.get_f64(1, "age"), Some(25.0));
    }

    #[test]
    fn datatable_empty() {
        let dt = DataTable::from_rows(&[]).unwrap();
        assert_eq!(dt.num_rows(), 0);
        assert!(dt.is_empty());
    }

    #[test]
    fn datatable_unique_values() {
        let rows = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        // Values preserve first-appearance order from the rows
        assert_eq!(dt.unique_values("x"), vec!["banana", "apple", "cherry"]);
    }

    #[test]
    fn datatable_extent() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.extent("v"), Some((10.0, 30.0)));
    }

    #[test]
    fn datatable_group_by() {
        let rows = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let groups = dt.group_by("cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].num_rows(), 2);
        assert_eq!(groups["B"].num_rows(), 1);
    }

    #[test]
    fn datatable_sum() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.sum("v"), 60.0);
    }

    #[test]
    fn datatable_ipc_roundtrip() {
        // Serialize to Arrow IPC bytes and back; data must survive intact.
        let rows = vec![
            make_row(vec![("name", json!("Alice")), ("score", json!(95.5))]),
            make_row(vec![("name", json!("Bob")), ("score", json!(87.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let bytes = dt.to_ipc_bytes().unwrap();
        let dt2 = DataTable::from_ipc_bytes(&bytes).unwrap();
        assert_eq!(dt2.num_rows(), 2);
        assert_eq!(dt2.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt2.get_f64(1, "score"), Some(87.0));
    }

    #[test]
    fn datatable_record_batch_with_timestamps() {
        use arrow::array::TimestampMicrosecondArray;
        use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

        // Build a RecordBatch with a proper Timestamp column
        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), true),
            Field::new("value", DataType::Float64, true),
        ]));
        // 2026-01-15T10:30:00Z = 1768474200 seconds = 1768474200000000 microseconds
        let ts_array = TimestampMicrosecondArray::from(vec![Some(1768474200000000i64)])
            .with_timezone("UTC");
        let val_array = Float64Array::from(vec![Some(42.0)]);
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(ts_array) as ArrayRef, Arc::new(val_array) as ArrayRef],
        )
        .unwrap();

        let dt = DataTable::from_record_batch(batch);
        // get_f64 returns epoch millis
        assert_eq!(dt.get_f64(0, "ts"), Some(1768474200000.0));
        // get_string returns ISO 8601 with Z suffix (has timezone)
        let ts_str = dt.get_string(0, "ts").unwrap();
        assert!(ts_str.ends_with('Z'), "Expected Z suffix, got: {}", ts_str);
        assert!(ts_str.starts_with("2026-01-15T"), "Expected 2026-01-15T, got: {}", ts_str);
    }

    #[test]
    fn datatable_record_batch_with_dates() {
        // Date32 values render as ISO dates via get_string and as raw
        // days-since-epoch via get_f64.
        let schema = Arc::new(Schema::new(vec![
            Field::new("d", DataType::Date32, true),
        ]));
        // 2026-01-15 = 20468 days since epoch
        let date_array = Date32Array::from(vec![Some(20468)]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(date_array) as ArrayRef]).unwrap();

        let dt = DataTable::from_record_batch(batch);
        assert_eq!(dt.get_string(0, "d"), Some("2026-01-15".to_string()));
        assert_eq!(dt.get_f64(0, "d"), Some(20468.0));
    }

    #[test]
    fn datatable_has_field() {
        let rows = vec![make_row(vec![("x", json!(1))])];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert!(dt.has_field("x"));
        assert!(!dt.has_field("y"));
    }

    #[test]
    fn datatable_null_values() {
        // Null cells surface as None from both typed accessors.
        let rows = vec![
            make_row(vec![("x", json!(1.0)), ("y", json!(null))]),
            make_row(vec![("x", json!(null)), ("y", json!("hello"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.get_f64(0, "x"), Some(1.0));
        assert_eq!(dt.get_f64(0, "y"), None);
        assert_eq!(dt.get_f64(1, "x"), None);
        assert_eq!(dt.get_string(1, "y"), Some("hello".to_string()));
    }
}