1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::array::{
5 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
6 Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringBuilder,
7 StringArray, Time64MicrosecondArray, Time64NanosecondArray, Time32MillisecondArray,
8 Time32SecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
9 TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
10 UInt64Array,
11};
12use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
13
14use crate::error::ChartError;
15
/// A single data row: maps a field name to its JSON value.
pub type Row = HashMap<String, serde_json::Value>;
19
20pub fn get_f64(row: &Row, field: &str) -> Option<f64> {
25 match row.get(field)? {
26 serde_json::Value::Number(n) => n.as_f64(),
27 serde_json::Value::String(s) => s.parse::<f64>().ok(),
28 _ => None,
29 }
30}
31
32pub fn get_string(row: &Row, field: &str) -> Option<String> {
34 match row.get(field)? {
35 serde_json::Value::String(s) => Some(s.clone()),
36 serde_json::Value::Number(n) => Some(n.to_string()),
37 serde_json::Value::Bool(b) => Some(b.to_string()),
38 serde_json::Value::Null => None,
39 other => Some(other.to_string()),
40 }
41}
42
43pub fn extent_rows(data: &[Row], field: &str) -> Option<(f64, f64)> {
45 let values: Vec<f64> = data.iter().filter_map(|row| get_f64(row, field)).collect();
46 if values.is_empty() {
47 return None;
48 }
49 let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
50 let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
51 Some((min, max))
52}
53
54pub fn sum_rows(data: &[Row], field: &str) -> f64 {
56 data.iter().filter_map(|row| get_f64(row, field)).sum()
57}
58
59pub fn group_by_rows<'a>(data: &'a [Row], field: &str) -> HashMap<String, Vec<&'a Row>> {
61 let mut groups: HashMap<String, Vec<&'a Row>> = HashMap::new();
62 for row in data {
63 if let Some(key) = get_string(row, field) {
64 groups.entry(key).or_default().push(row);
65 }
66 }
67 groups
68}
69
70pub fn unique_values_rows(data: &[Row], field: &str) -> Vec<String> {
72 let mut seen = std::collections::HashSet::new();
73 let mut result = Vec::new();
74 for row in data {
75 if let Some(val) = get_string(row, field) {
76 if seen.insert(val.clone()) {
77 result.push(val);
78 }
79 }
80 }
81 result
82}
83
// Short aliases kept for backward compatibility with callers that use the
// un-suffixed names.
pub use extent_rows as extent;
pub use sum_rows as sum;
pub use group_by_rows as group_by;
pub use unique_values_rows as unique_values;
89
/// Columnar data table backed by an Arrow `RecordBatch`.
#[derive(Debug, Clone)]
pub struct DataTable {
    /// The underlying Arrow data.
    batch: RecordBatch,
    /// Cached mapping from column name to column index for O(1) lookups.
    field_index: HashMap<String, usize>,
}
103
104impl DataTable {
105 pub fn from_record_batch(batch: RecordBatch) -> Self {
107 let field_index = batch
108 .schema()
109 .fields()
110 .iter()
111 .enumerate()
112 .map(|(i, f)| (f.name().clone(), i))
113 .collect();
114 Self { batch, field_index }
115 }
116
    /// Builds a `DataTable` from JSON-style rows by inferring a column schema.
    ///
    /// Types are inferred across all rows (number -> Float64, bool -> Boolean,
    /// everything else -> Utf8); columns mixing incompatible types fall back
    /// to Utf8, and all-null columns become Utf8. Column names are sorted so
    /// the schema is deterministic regardless of HashMap iteration order.
    /// All columns are nullable; missing fields become nulls.
    ///
    /// # Errors
    /// Returns `ChartError::DataError` if the RecordBatch cannot be assembled.
    pub fn from_rows(rows: &[Row]) -> Result<Self, ChartError> {
        // Empty input: an empty batch with an empty schema.
        if rows.is_empty() {
            let schema = Arc::new(Schema::new(Vec::<Field>::new()));
            let batch = RecordBatch::new_empty(schema);
            return Ok(Self::from_record_batch(batch));
        }

        // Collect the union of all field names across rows.
        let mut column_names: Vec<String> = Vec::new();
        let mut seen = std::collections::HashSet::new();
        for row in rows {
            for key in row.keys() {
                if seen.insert(key.clone()) {
                    column_names.push(key.clone());
                }
            }
        }
        column_names.sort();

        // Infer one type per column by merging the type of every cell.
        let mut col_types: Vec<InferredType> = vec![InferredType::Null; column_names.len()];
        for row in rows {
            for (i, name) in column_names.iter().enumerate() {
                if let Some(val) = row.get(name) {
                    let val_type = match val {
                        serde_json::Value::Number(_) => InferredType::Float64,
                        serde_json::Value::Bool(_) => InferredType::Boolean,
                        serde_json::Value::String(_) => InferredType::Utf8,
                        serde_json::Value::Null => InferredType::Null,
                        // Arrays/objects are stringified, hence Utf8.
                        _ => InferredType::Utf8,
                    };
                    col_types[i] = merge_inferred(col_types[i], val_type);
                }
            }
        }

        // Columns that never saw a non-null value default to Utf8.
        for t in &mut col_types {
            if *t == InferredType::Null {
                *t = InferredType::Utf8;
            }
        }

        let fields: Vec<Field> = column_names
            .iter()
            .zip(col_types.iter())
            .map(|(name, typ)| {
                let dt = match typ {
                    InferredType::Float64 => DataType::Float64,
                    InferredType::Boolean => DataType::Boolean,
                    InferredType::Utf8 | InferredType::Null => DataType::Utf8,
                };
                // `true`: every column is nullable.
                Field::new(name, dt, true)
            })
            .collect();
        let schema = Arc::new(Schema::new(fields));

        // Materialize one Arrow array per column, in schema (sorted) order.
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_names.len());
        for (i, name) in column_names.iter().enumerate() {
            let arr: ArrayRef = match col_types[i] {
                InferredType::Float64 => {
                    // Numeric strings are also accepted, mirroring `get_f64`.
                    let values: Vec<Option<f64>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Number(n) => n.as_f64(),
                                serde_json::Value::String(s) => s.parse::<f64>().ok(),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(Float64Array::from(values))
                }
                InferredType::Boolean => {
                    let values: Vec<Option<bool>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Bool(b) => Some(*b),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(BooleanArray::from(values))
                }
                InferredType::Utf8 | InferredType::Null => {
                    // Stringify non-string scalars; nulls/missing become null.
                    let mut builder = StringBuilder::new();
                    for row in rows {
                        match row.get(name) {
                            Some(serde_json::Value::String(s)) => builder.append_value(s),
                            Some(serde_json::Value::Number(n)) => {
                                builder.append_value(n.to_string())
                            }
                            Some(serde_json::Value::Bool(b)) => {
                                builder.append_value(b.to_string())
                            }
                            Some(serde_json::Value::Null) | None => builder.append_null(),
                            Some(other) => builder.append_value(other.to_string()),
                        }
                    }
                    Arc::new(builder.finish())
                }
            };
            arrays.push(arr);
        }

        let batch = RecordBatch::try_new(schema, arrays)
            .map_err(|e| ChartError::DataError(format!("Failed to create RecordBatch: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }
231
232 pub fn from_ipc_bytes(bytes: &[u8]) -> Result<Self, ChartError> {
234 use arrow::ipc::reader::StreamReader;
235 use std::io::Cursor;
236
237 let cursor = Cursor::new(bytes);
238 let reader = StreamReader::try_new(cursor, None)
239 .map_err(|e| ChartError::DataError(format!("Failed to read Arrow IPC: {}", e)))?;
240
241 let schema = reader.schema();
242 let mut batches = Vec::new();
243 for batch_result in reader {
244 let batch = batch_result.map_err(|e| {
245 ChartError::DataError(format!("Failed to read Arrow batch: {}", e))
246 })?;
247 batches.push(batch);
248 }
249
250 if batches.is_empty() {
251 return Ok(Self::from_record_batch(RecordBatch::new_empty(schema)));
252 }
253
254 if batches.len() == 1 {
255 return Ok(Self::from_record_batch(batches.remove(0)));
256 }
257
258 let batch = arrow::compute::concat_batches(&schema, &batches)
260 .map_err(|e| ChartError::DataError(format!("Failed to concat batches: {}", e)))?;
261 Ok(Self::from_record_batch(batch))
262 }
263
264 pub fn to_ipc_bytes(&self) -> Result<Vec<u8>, ChartError> {
266 use arrow::ipc::writer::StreamWriter;
267
268 let mut buf = Vec::new();
269 {
270 let mut writer = StreamWriter::try_new(&mut buf, &self.batch.schema())
271 .map_err(|e| ChartError::DataError(format!("Failed to create IPC writer: {}", e)))?;
272 writer.write(&self.batch).map_err(|e| {
273 ChartError::DataError(format!("Failed to write Arrow batch: {}", e))
274 })?;
275 writer.finish().map_err(|e| {
276 ChartError::DataError(format!("Failed to finish IPC stream: {}", e))
277 })?;
278 }
279 Ok(buf)
280 }
281
    /// Number of rows in the table.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }

    /// Number of columns in the table.
    pub fn num_columns(&self) -> usize {
        self.batch.num_columns()
    }

    /// True when the table holds no rows.
    pub fn is_empty(&self) -> bool {
        self.batch.num_rows() == 0
    }

    /// Borrows the underlying Arrow `RecordBatch`.
    pub fn record_batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Consumes the table, yielding the underlying `RecordBatch`.
    pub fn into_record_batch(self) -> RecordBatch {
        self.batch
    }

    /// Returns the table's schema.
    pub fn schema(&self) -> Arc<Schema> {
        self.batch.schema()
    }

    /// Looks up a column by name via the cached name -> index map.
    fn column(&self, field: &str) -> Option<&ArrayRef> {
        self.field_index.get(field).map(|&i| self.batch.column(i))
    }
318
319 pub fn get_f64(&self, row: usize, field: &str) -> Option<f64> {
322 let col = self.column(field)?;
323 if col.is_null(row) {
324 return None;
325 }
326 arrow_to_f64(col, row)
327 }
328
329 pub fn get_string(&self, row: usize, field: &str) -> Option<String> {
332 let col = self.column(field)?;
333 if col.is_null(row) {
334 return None;
335 }
336 arrow_to_string(col, row)
337 }
338
339 pub fn unique_values(&self, field: &str) -> Vec<String> {
341 let col = match self.column(field) {
342 Some(c) => c,
343 None => return Vec::new(),
344 };
345 let mut seen = std::collections::HashSet::new();
346 let mut result = Vec::new();
347 for i in 0..self.batch.num_rows() {
348 if col.is_null(i) {
349 continue;
350 }
351 if let Some(val) = arrow_to_string(col, i) {
352 if seen.insert(val.clone()) {
353 result.push(val);
354 }
355 }
356 }
357 result
358 }
359
360 pub fn all_values(&self, field: &str) -> Vec<String> {
362 let col = match self.column(field) {
363 Some(c) => c,
364 None => return Vec::new(),
365 };
366 let mut result = Vec::new();
367 for i in 0..self.batch.num_rows() {
368 if col.is_null(i) {
369 continue;
370 }
371 if let Some(val) = arrow_to_string(col, i) {
372 result.push(val);
373 }
374 }
375 result
376 }
377
378 pub fn extent(&self, field: &str) -> Option<(f64, f64)> {
380 let col = self.column(field)?;
381 let mut min = f64::INFINITY;
382 let mut max = f64::NEG_INFINITY;
383 let mut found = false;
384 for i in 0..self.batch.num_rows() {
385 if col.is_null(i) {
386 continue;
387 }
388 if let Some(v) = arrow_to_f64(col, i) {
389 found = true;
390 if v < min {
391 min = v;
392 }
393 if v > max {
394 max = v;
395 }
396 }
397 }
398 if found {
399 Some((min, max))
400 } else {
401 None
402 }
403 }
404
405 pub fn sum(&self, field: &str) -> f64 {
407 let col = match self.column(field) {
408 Some(c) => c,
409 None => return 0.0,
410 };
411 let mut total = 0.0;
412 for i in 0..self.batch.num_rows() {
413 if !col.is_null(i) {
414 if let Some(v) = arrow_to_f64(col, i) {
415 total += v;
416 }
417 }
418 }
419 total
420 }
421
    /// Partitions the table into sub-tables keyed by the string value of
    /// `field`. Rows with a null key are dropped; an unknown field yields an
    /// empty map.
    pub fn group_by(&self, field: &str) -> HashMap<String, DataTable> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return HashMap::new(),
        };

        // Collect row indices per key. `key_order` records first-appearance
        // order, though the returned `HashMap` does not itself preserve it.
        let mut group_indices: HashMap<String, Vec<u32>> = HashMap::new();
        let mut key_order: Vec<String> = Vec::new();
        let mut seen_keys = std::collections::HashSet::new();

        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(key) = arrow_to_string(col, i) {
                if seen_keys.insert(key.clone()) {
                    key_order.push(key.clone());
                }
                group_indices.entry(key).or_default().push(i as u32);
            }
        }

        // Materialize each group by gathering its rows with the `take` kernel.
        // NOTE(review): take/try_new failures are silently skipped, so a group
        // can be absent from the result on an internal Arrow error — confirm
        // this best-effort behavior is intended.
        let mut result = HashMap::new();
        for key in key_order {
            if let Some(indices) = group_indices.get(&key) {
                let indices_arr = UInt32Array::from(indices.clone());
                let take_result: Result<Vec<ArrayRef>, _> = self
                    .batch
                    .columns()
                    .iter()
                    .map(|col| arrow::compute::take(col.as_ref(), &indices_arr, None))
                    .collect();
                if let Ok(columns) = take_result {
                    if let Ok(sub_batch) = RecordBatch::try_new(self.batch.schema(), columns) {
                        result.insert(key, DataTable::from_record_batch(sub_batch));
                    }
                }
            }
        }
        result
    }
466
    /// True when the table has a column named `field`.
    pub fn has_field(&self, field: &str) -> bool {
        self.field_index.contains_key(field)
    }

    /// The column names, in schema order.
    pub fn field_names(&self) -> Vec<String> {
        self.batch
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }
481
    /// Converts the table back into JSON-style rows.
    ///
    /// Booleans become JSON bools; numeric types (ints, floats, Decimal128)
    /// become JSON numbers via `arrow_to_f64`; dates and timestamps become
    /// ISO strings via `arrow_to_string`; anything else is stringified when
    /// possible. Nulls (and unconvertible cells) become JSON null.
    pub fn to_rows(&self) -> Vec<Row> {
        let num_rows = self.batch.num_rows();
        let schema = self.batch.schema();
        let fields = schema.fields();

        let mut rows = Vec::with_capacity(num_rows);
        for row_idx in 0..num_rows {
            let mut row = Row::new();
            for (col_idx, field) in fields.iter().enumerate() {
                let col = self.batch.column(col_idx);
                if col.is_null(row_idx) {
                    row.insert(field.name().clone(), serde_json::Value::Null);
                    continue;
                }
                let value = match col.data_type() {
                    DataType::Boolean => {
                        let v = col.as_any().downcast_ref::<BooleanArray>()
                            .expect("DataType::Boolean arm guarantees BooleanArray")
                            .value(row_idx);
                        serde_json::json!(v)
                    }
                    // All numeric types round-trip through f64.
                    DataType::Float64 | DataType::Float32 |
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 |
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 |
                    DataType::Decimal128(_, _) => {
                        if let Some(v) = arrow_to_f64(col, row_idx) {
                            serde_json::json!(v)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    // Temporal types serialize as ISO strings.
                    DataType::Date32 | DataType::Date64 |
                    DataType::Timestamp(_, _) => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    // Fallback: best-effort stringification, else null.
                    _ => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                };
                row.insert(field.name().clone(), value);
            }
            rows.push(row);
        }
        rows
    }
538}
539
/// Reads the value at `idx` of `col` as an `f64`, when the data type has a
/// numeric interpretation.
///
/// Integers and floats are cast; booleans map to 1.0/0.0; `Utf8` is parsed;
/// `Date32` yields the raw day count since epoch while `Date64` and all
/// timestamps yield epoch milliseconds; `Decimal128` is scaled by its scale.
/// Returns `None` for unsupported types or unparsable strings. Callers are
/// expected to have checked `is_null(idx)` already.
fn arrow_to_f64(col: &ArrayRef, idx: usize) -> Option<f64> {
    match col.data_type() {
        DataType::Float64 => {
            Some(col.as_any().downcast_ref::<Float64Array>()
                .expect("DataType::Float64 arm guarantees Float64Array")
                .value(idx))
        }
        DataType::Float32 => {
            Some(col.as_any().downcast_ref::<Float32Array>()
                .expect("DataType::Float32 arm guarantees Float32Array")
                .value(idx) as f64)
        }
        // Integer widths all widen to f64; u64/i64 above 2^53 lose precision.
        DataType::Int64 => {
            Some(col.as_any().downcast_ref::<Int64Array>()
                .expect("DataType::Int64 arm guarantees Int64Array")
                .value(idx) as f64)
        }
        DataType::Int32 => {
            Some(col.as_any().downcast_ref::<Int32Array>()
                .expect("DataType::Int32 arm guarantees Int32Array")
                .value(idx) as f64)
        }
        DataType::Int16 => {
            Some(col.as_any().downcast_ref::<Int16Array>()
                .expect("DataType::Int16 arm guarantees Int16Array")
                .value(idx) as f64)
        }
        DataType::Int8 => {
            Some(col.as_any().downcast_ref::<Int8Array>()
                .expect("DataType::Int8 arm guarantees Int8Array")
                .value(idx) as f64)
        }
        DataType::UInt64 => {
            Some(col.as_any().downcast_ref::<UInt64Array>()
                .expect("DataType::UInt64 arm guarantees UInt64Array")
                .value(idx) as f64)
        }
        DataType::UInt32 => {
            Some(col.as_any().downcast_ref::<UInt32Array>()
                .expect("DataType::UInt32 arm guarantees UInt32Array")
                .value(idx) as f64)
        }
        DataType::UInt16 => {
            Some(col.as_any().downcast_ref::<UInt16Array>()
                .expect("DataType::UInt16 arm guarantees UInt16Array")
                .value(idx) as f64)
        }
        DataType::UInt8 => {
            Some(col.as_any().downcast_ref::<UInt8Array>()
                .expect("DataType::UInt8 arm guarantees UInt8Array")
                .value(idx) as f64)
        }
        DataType::Boolean => {
            let v = col.as_any().downcast_ref::<BooleanArray>()
                .expect("DataType::Boolean arm guarantees BooleanArray")
                .value(idx);
            Some(if v { 1.0 } else { 0.0 })
        }
        // Date32 is days since epoch — returned as the raw day count, unlike
        // Date64/Timestamp below which are normalized to milliseconds.
        DataType::Date32 => {
            Some(col.as_any().downcast_ref::<Date32Array>()
                .expect("DataType::Date32 arm guarantees Date32Array")
                .value(idx) as f64)
        }
        // Date64 is already milliseconds since epoch.
        DataType::Date64 => {
            Some(col.as_any().downcast_ref::<Date64Array>()
                .expect("DataType::Date64 arm guarantees Date64Array")
                .value(idx) as f64)
        }
        DataType::Timestamp(unit, _) => {
            let raw = match unit {
                TimeUnit::Second => col
                    .as_any()
                    .downcast_ref::<TimestampSecondArray>()
                    .expect("Timestamp(Second) arm guarantees TimestampSecondArray")
                    .value(idx),
                TimeUnit::Millisecond => col
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .expect("Timestamp(Millisecond) arm guarantees TimestampMillisecondArray")
                    .value(idx),
                TimeUnit::Microsecond => col
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .expect("Timestamp(Microsecond) arm guarantees TimestampMicrosecondArray")
                    .value(idx),
                TimeUnit::Nanosecond => col
                    .as_any()
                    .downcast_ref::<TimestampNanosecondArray>()
                    .expect("Timestamp(Nanosecond) arm guarantees TimestampNanosecondArray")
                    .value(idx),
            };
            // Normalize every unit to epoch milliseconds.
            // NOTE(review): `/` truncates toward zero, so pre-epoch micro/nano
            // values round toward zero rather than down — confirm acceptable
            // at charting granularity.
            let millis = match unit {
                TimeUnit::Second => raw * 1000,
                TimeUnit::Millisecond => raw,
                TimeUnit::Microsecond => raw / 1000,
                TimeUnit::Nanosecond => raw / 1_000_000,
            };
            Some(millis as f64)
        }
        DataType::Decimal128(_, scale) => {
            let raw = col
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .expect("DataType::Decimal128 arm guarantees Decimal128Array")
                .value(idx);
            // 10^scale; a negative scale gives a fractional divisor, which
            // effectively multiplies, matching Decimal semantics.
            let divisor = 10_f64.powi(*scale as i32);
            Some(raw as f64 / divisor)
        }
        DataType::Utf8 => {
            let s = col.as_any().downcast_ref::<StringArray>()
                .expect("DataType::Utf8 arm guarantees StringArray")
                .value(idx);
            s.parse::<f64>().ok()
        }
        _ => None,
    }
}
664
665fn arrow_to_string(col: &ArrayRef, idx: usize) -> Option<String> {
667 match col.data_type() {
668 DataType::Utf8 => {
669 Some(
670 col.as_any()
671 .downcast_ref::<StringArray>()
672 .expect("DataType::Utf8 arm guarantees StringArray")
673 .value(idx)
674 .to_string(),
675 )
676 }
677 DataType::LargeUtf8 => {
678 Some(
679 col.as_any()
680 .downcast_ref::<arrow::array::LargeStringArray>()
681 .expect("DataType::LargeUtf8 arm guarantees LargeStringArray")
682 .value(idx)
683 .to_string(),
684 )
685 }
686 DataType::Float64 => {
687 let v = col.as_any().downcast_ref::<Float64Array>()
688 .expect("DataType::Float64 arm guarantees Float64Array")
689 .value(idx);
690 Some(format_f64(v))
691 }
692 DataType::Float32 => {
693 let v = col.as_any().downcast_ref::<Float32Array>()
694 .expect("DataType::Float32 arm guarantees Float32Array")
695 .value(idx) as f64;
696 Some(format_f64(v))
697 }
698 DataType::Int64 => {
699 Some(col.as_any().downcast_ref::<Int64Array>()
700 .expect("DataType::Int64 arm guarantees Int64Array")
701 .value(idx).to_string())
702 }
703 DataType::Int32 => {
704 Some(col.as_any().downcast_ref::<Int32Array>()
705 .expect("DataType::Int32 arm guarantees Int32Array")
706 .value(idx).to_string())
707 }
708 DataType::Int16 => {
709 Some(col.as_any().downcast_ref::<Int16Array>()
710 .expect("DataType::Int16 arm guarantees Int16Array")
711 .value(idx).to_string())
712 }
713 DataType::Int8 => {
714 Some(col.as_any().downcast_ref::<Int8Array>()
715 .expect("DataType::Int8 arm guarantees Int8Array")
716 .value(idx).to_string())
717 }
718 DataType::UInt64 => {
719 Some(col.as_any().downcast_ref::<UInt64Array>()
720 .expect("DataType::UInt64 arm guarantees UInt64Array")
721 .value(idx).to_string())
722 }
723 DataType::UInt32 => {
724 Some(col.as_any().downcast_ref::<UInt32Array>()
725 .expect("DataType::UInt32 arm guarantees UInt32Array")
726 .value(idx).to_string())
727 }
728 DataType::UInt16 => {
729 Some(col.as_any().downcast_ref::<UInt16Array>()
730 .expect("DataType::UInt16 arm guarantees UInt16Array")
731 .value(idx).to_string())
732 }
733 DataType::UInt8 => {
734 Some(col.as_any().downcast_ref::<UInt8Array>()
735 .expect("DataType::UInt8 arm guarantees UInt8Array")
736 .value(idx).to_string())
737 }
738 DataType::Boolean => {
739 Some(col.as_any().downcast_ref::<BooleanArray>()
740 .expect("DataType::Boolean arm guarantees BooleanArray")
741 .value(idx).to_string())
742 }
743 DataType::Date32 => {
744 let days = col.as_any().downcast_ref::<Date32Array>()
745 .expect("DataType::Date32 arm guarantees Date32Array")
746 .value(idx);
747 Some(days_to_iso(days as i64))
748 }
749 DataType::Date64 => {
750 let millis = col.as_any().downcast_ref::<Date64Array>()
751 .expect("DataType::Date64 arm guarantees Date64Array")
752 .value(idx);
753 let days = millis / 86_400_000;
755 Some(days_to_iso(days))
756 }
757 DataType::Timestamp(unit, tz) => {
758 let raw = match unit {
759 TimeUnit::Second => col
760 .as_any()
761 .downcast_ref::<TimestampSecondArray>()
762 .expect("Timestamp(Second) arm guarantees TimestampSecondArray")
763 .value(idx),
764 TimeUnit::Millisecond => col
765 .as_any()
766 .downcast_ref::<TimestampMillisecondArray>()
767 .expect("Timestamp(Millisecond) arm guarantees TimestampMillisecondArray")
768 .value(idx),
769 TimeUnit::Microsecond => col
770 .as_any()
771 .downcast_ref::<TimestampMicrosecondArray>()
772 .expect("Timestamp(Microsecond) arm guarantees TimestampMicrosecondArray")
773 .value(idx),
774 TimeUnit::Nanosecond => col
775 .as_any()
776 .downcast_ref::<TimestampNanosecondArray>()
777 .expect("Timestamp(Nanosecond) arm guarantees TimestampNanosecondArray")
778 .value(idx),
779 };
780 let (secs, nanos_u32) = match unit {
783 TimeUnit::Second => (raw, 0u32),
784 TimeUnit::Millisecond => {
785 let (s, r) = (raw.div_euclid(1000), raw.rem_euclid(1000));
786 (s, (r * 1_000_000) as u32)
787 }
788 TimeUnit::Microsecond => {
789 let (s, r) = (raw.div_euclid(1_000_000), raw.rem_euclid(1_000_000));
790 (s, (r * 1000) as u32)
791 }
792 TimeUnit::Nanosecond => {
793 let (s, r) = (raw.div_euclid(1_000_000_000), raw.rem_euclid(1_000_000_000));
794 (s, r as u32)
795 }
796 };
797 let iso = epoch_to_iso(secs, nanos_u32);
798 if tz.is_some() {
799 Some(format!("{}Z", iso))
800 } else {
801 Some(iso)
802 }
803 }
804 DataType::Time64(TimeUnit::Microsecond) => {
805 let micros = col
806 .as_any()
807 .downcast_ref::<Time64MicrosecondArray>()
808 .expect("Time64(Microsecond) arm guarantees Time64MicrosecondArray")
809 .value(idx);
810 Some(micros_to_hms(micros))
811 }
812 DataType::Time64(TimeUnit::Nanosecond) => {
813 let nanos = col
814 .as_any()
815 .downcast_ref::<Time64NanosecondArray>()
816 .expect("Time64(Nanosecond) arm guarantees Time64NanosecondArray")
817 .value(idx);
818 Some(micros_to_hms(nanos / 1000))
819 }
820 DataType::Time32(TimeUnit::Second) => {
821 let secs = col
822 .as_any()
823 .downcast_ref::<Time32SecondArray>()
824 .expect("Time32(Second) arm guarantees Time32SecondArray")
825 .value(idx);
826 Some(micros_to_hms(secs as i64 * 1_000_000))
827 }
828 DataType::Time32(TimeUnit::Millisecond) => {
829 let millis = col
830 .as_any()
831 .downcast_ref::<Time32MillisecondArray>()
832 .expect("Time32(Millisecond) arm guarantees Time32MillisecondArray")
833 .value(idx);
834 Some(micros_to_hms(millis as i64 * 1000))
835 }
836 DataType::Decimal128(_, scale) => {
837 let raw = col
838 .as_any()
839 .downcast_ref::<Decimal128Array>()
840 .expect("DataType::Decimal128 arm guarantees Decimal128Array")
841 .value(idx);
842 Some(format_decimal128(raw, *scale))
843 }
844 _ => None,
845 }
846}
847
/// Formats an `f64` for display, rendering whole numbers without a trailing
/// ".0" (only while they still fit exactly in an i64 / below 1e15).
fn format_f64(v: f64) -> String {
    let prints_as_integer = v.fract() == 0.0 && v.abs() < 1e15;
    if prints_as_integer {
        (v as i64).to_string()
    } else {
        v.to_string()
    }
}
856
857fn days_to_iso(days: i64) -> String {
859 let (year, month, day) = civil_from_days(days);
860 format!("{:04}-{:02}-{:02}", year, month, day)
861}
862
863fn epoch_to_iso(secs: i64, nanos: u32) -> String {
865 let days = if secs >= 0 {
866 secs / 86400
867 } else {
868 (secs - 86399) / 86400
869 };
870 let day_secs = secs - days * 86400;
871 let (year, month, day) = civil_from_days(days);
872 let hours = day_secs / 3600;
873 let minutes = (day_secs % 3600) / 60;
874 let seconds = day_secs % 60;
875
876 if nanos == 0 {
877 format!(
878 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}",
879 year, month, day, hours, minutes, seconds
880 )
881 } else {
882 let millis = nanos / 1_000_000;
883 format!(
884 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}",
885 year, month, day, hours, minutes, seconds, millis
886 )
887 }
888}
889
/// Formats a time-of-day in microseconds as `HH:MM:SS`, truncating any
/// subsecond fraction and prefixing `-` for negative inputs.
fn micros_to_hms(micros: i64) -> String {
    let sign = if micros < 0 { "-" } else { "" };
    let secs = micros.unsigned_abs() / 1_000_000;
    let (hours, rem) = (secs / 3600, secs % 3600);
    format!("{sign}{:02}:{:02}:{:02}", hours, rem / 60, rem % 60)
}
901
/// Converts days since the Unix epoch (1970-01-01) to a `(year, month, day)`
/// civil (proleptic Gregorian) date.
///
/// This is Howard Hinnant's `civil_from_days` algorithm; the constants come
/// from the 400-year Gregorian cycle (146097 days per era, epoch shift of
/// 719468 days to an era boundary at 0000-03-01).
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    // Shift so day 0 is 0000-03-01, putting leap days at the end of the year.
    let z = days + 719468;
    // Which 400-year era the day falls in (floor division for negatives).
    let era = if z >= 0 { z } else { z - 146096 } / 146097;
    let doe = (z - era * 146097) as u32; // day-of-era, [0, 146096]
    // year-of-era, correcting for leap days within the era.
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
    let y = yoe as i64 + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day-of-year (Mar 1 = 0)
    let mp = (5 * doy + 2) / 153; // March-based month index [0, 11]
    let d = doy - (153 * mp + 2) / 5 + 1;
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    // Jan/Feb belong to the next civil year in the March-based reckoning.
    let y = if m <= 2 { y + 1 } else { y };
    (y, m, d)
}
917
/// Formats a raw `Decimal128` integer with the given scale as a plain decimal
/// string (e.g. raw 12345 at scale 2 -> "123.45"). Non-positive scales print
/// the raw integer unchanged.
fn format_decimal128(raw: i128, scale: i8) -> String {
    if scale <= 0 {
        return raw.to_string();
    }
    let divisor = 10_i128.pow(scale as u32);
    let (whole, frac) = (raw / divisor, (raw % divisor).abs());
    // A value like -0.50 truncates to whole == 0, losing the sign — restore
    // it manually in that case.
    let sign = if raw < 0 && whole == 0 { "-" } else { "" };
    format!("{}{}.{:0>width$}", sign, whole, frac, width = scale as usize)
}
931
/// Column type inferred while scanning JSON rows in `DataTable::from_rows`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum InferredType {
    Float64,
    Boolean,
    Utf8,
    Null,
}

/// Merges the running inferred type of a column with the type of one more
/// cell: `Null` never overrides anything, equal types stay, and any
/// disagreement widens the column to `Utf8`.
fn merge_inferred(existing: InferredType, new: InferredType) -> InferredType {
    match (existing, new) {
        (kept, InferredType::Null) => kept,
        (InferredType::Null, seen) => seen,
        (kept, seen) if kept == seen => kept,
        _ => InferredType::Utf8,
    }
}
955
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]
    // Tests cover both the plain row-based helpers and the Arrow-backed
    // DataTable, including temporal-column string conversion.
    use super::*;
    use serde_json::json;

    // Convenience constructor for a Row from (key, value) pairs.
    fn make_row(pairs: Vec<(&str, serde_json::Value)>) -> Row {
        pairs
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect()
    }

    #[test]
    fn get_f64_from_number() {
        let row = make_row(vec![("value", json!(42.5))]);
        assert_eq!(get_f64(&row, "value"), Some(42.5));
    }

    #[test]
    fn get_f64_from_string() {
        let row = make_row(vec![("value", json!("123.45"))]);
        assert_eq!(get_f64(&row, "value"), Some(123.45));
    }

    #[test]
    fn get_f64_missing_field() {
        let row = make_row(vec![("other", json!(1.0))]);
        assert_eq!(get_f64(&row, "value"), None);
    }

    #[test]
    fn get_string_from_various() {
        let row_num = make_row(vec![("x", json!(42))]);
        assert_eq!(get_string(&row_num, "x"), Some("42".to_string()));

        let row_str = make_row(vec![("x", json!("hello"))]);
        assert_eq!(get_string(&row_str, "x"), Some("hello".to_string()));

        let row_bool = make_row(vec![("x", json!(true))]);
        assert_eq!(get_string(&row_bool, "x"), Some("true".to_string()));

        let row_null = make_row(vec![("x", json!(null))]);
        assert_eq!(get_string(&row_null, "x"), None);
    }

    #[test]
    fn extent_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        assert_eq!(extent(&data, "v"), Some((10.0, 30.0)));
    }

    #[test]
    fn extent_empty() {
        let data: Vec<Row> = vec![];
        assert_eq!(extent(&data, "v"), None);

        // A present-but-different field also yields no extent.
        let data = vec![make_row(vec![("other", json!(1.0))])];
        assert_eq!(extent(&data, "v"), None);
    }

    #[test]
    fn sum_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        assert_eq!(sum(&data, "v"), 60.0);
    }

    #[test]
    fn group_by_basic() {
        let data = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let groups = group_by(&data, "cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].len(), 2);
        assert_eq!(groups["B"].len(), 1);
    }

    #[test]
    fn unique_values_preserves_order() {
        let data = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
            make_row(vec![("x", json!("apple"))]),
        ];
        let uniq = unique_values(&data, "x");
        assert_eq!(uniq, vec!["banana", "apple", "cherry"]);
    }

    #[test]
    fn datatable_from_rows_roundtrip() {
        let rows = vec![
            make_row(vec![
                ("name", json!("Alice")),
                ("age", json!(30)),
                ("active", json!(true)),
            ]),
            make_row(vec![
                ("name", json!("Bob")),
                ("age", json!(25)),
                ("active", json!(false)),
            ]),
        ];

        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.num_rows(), 2);
        assert_eq!(dt.num_columns(), 3);

        assert_eq!(dt.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt.get_f64(0, "age"), Some(30.0));
        assert_eq!(dt.get_string(1, "name"), Some("Bob".to_string()));
        assert_eq!(dt.get_f64(1, "age"), Some(25.0));
    }

    #[test]
    fn datatable_empty() {
        let dt = DataTable::from_rows(&[]).unwrap();
        assert_eq!(dt.num_rows(), 0);
        assert!(dt.is_empty());
    }

    #[test]
    fn datatable_unique_values() {
        let rows = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.unique_values("x"), vec!["banana", "apple", "cherry"]);
    }

    #[test]
    fn datatable_extent() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.extent("v"), Some((10.0, 30.0)));
    }

    #[test]
    fn datatable_group_by() {
        let rows = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let groups = dt.group_by("cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].num_rows(), 2);
        assert_eq!(groups["B"].num_rows(), 1);
    }

    #[test]
    fn datatable_sum() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.sum("v"), 60.0);
    }

    #[test]
    fn datatable_ipc_roundtrip() {
        let rows = vec![
            make_row(vec![("name", json!("Alice")), ("score", json!(95.5))]),
            make_row(vec![("name", json!("Bob")), ("score", json!(87.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let bytes = dt.to_ipc_bytes().unwrap();
        let dt2 = DataTable::from_ipc_bytes(&bytes).unwrap();
        assert_eq!(dt2.num_rows(), 2);
        assert_eq!(dt2.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt2.get_f64(1, "score"), Some(87.0));
    }

    #[test]
    fn datatable_record_batch_with_timestamps() {
        use arrow::array::TimestampMicrosecondArray;
        use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), true),
            Field::new("value", DataType::Float64, true),
        ]));
        // 1768474200000000 us == 2026-01-15 in UTC.
        let ts_array = TimestampMicrosecondArray::from(vec![Some(1768474200000000i64)])
            .with_timezone("UTC");
        let val_array = Float64Array::from(vec![Some(42.0)]);
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(ts_array) as ArrayRef, Arc::new(val_array) as ArrayRef],
        )
        .unwrap();

        let dt = DataTable::from_record_batch(batch);
        // Numeric reading: timestamps normalize to epoch milliseconds.
        assert_eq!(dt.get_f64(0, "ts"), Some(1768474200000.0));
        let ts_str = dt.get_string(0, "ts").unwrap();
        assert!(ts_str.ends_with('Z'), "Expected Z suffix, got: {}", ts_str);
        assert!(ts_str.starts_with("2026-01-15T"), "Expected 2026-01-15T, got: {}", ts_str);
    }

    #[test]
    fn datatable_record_batch_with_dates() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("d", DataType::Date32, true),
        ]));
        // Day 20468 since epoch == 2026-01-15.
        let date_array = Date32Array::from(vec![Some(20468)]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(date_array) as ArrayRef]).unwrap();

        let dt = DataTable::from_record_batch(batch);
        assert_eq!(dt.get_string(0, "d"), Some("2026-01-15".to_string()));
        // Numeric reading of Date32 is the raw day count.
        assert_eq!(dt.get_f64(0, "d"), Some(20468.0));
    }

    #[test]
    fn datatable_has_field() {
        let rows = vec![make_row(vec![("x", json!(1))])];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert!(dt.has_field("x"));
        assert!(!dt.has_field("y"));
    }

    #[test]
    fn datatable_record_batch_with_time_columns() {
        use arrow::array::{Time64MicrosecondArray, Time32SecondArray};

        let schema = Arc::new(Schema::new(vec![
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("t32_s", DataType::Time32(TimeUnit::Second), false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                // 37_845 s == 10:30:45; subsecond micros are truncated.
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(37_845_000_000i64),
                    Some(0i64),
                    Some(37_845_123_456i64),
                    None,
                ])),
                Arc::new(Time32SecondArray::from(vec![
                    37845i32,
                    0i32,
                    86399i32,
                    3661i32,
                ])),
            ],
        )
        .unwrap();

        let dt = DataTable::from_record_batch(batch);

        assert_eq!(dt.get_string(0, "t64_us"), Some("10:30:45".to_string()));
        assert_eq!(dt.get_string(1, "t64_us"), Some("00:00:00".to_string()));
        assert_eq!(dt.get_string(2, "t64_us"), Some("10:30:45".to_string()));
        assert_eq!(dt.get_string(3, "t64_us"), None);

        assert_eq!(dt.get_string(0, "t32_s"), Some("10:30:45".to_string()));
        assert_eq!(dt.get_string(1, "t32_s"), Some("00:00:00".to_string()));
        assert_eq!(dt.get_string(2, "t32_s"), Some("23:59:59".to_string()));
        assert_eq!(dt.get_string(3, "t32_s"), Some("01:01:01".to_string()));
    }

    #[test]
    fn datatable_null_values() {
        let rows = vec![
            make_row(vec![("x", json!(1.0)), ("y", json!(null))]),
            make_row(vec![("x", json!(null)), ("y", json!("hello"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.get_f64(0, "x"), Some(1.0));
        assert_eq!(dt.get_f64(0, "y"), None);
        assert_eq!(dt.get_f64(1, "x"), None);
        assert_eq!(dt.get_string(1, "y"), Some("hello".to_string()));
    }
}