1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow::array::{
5 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
6 Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringBuilder,
7 StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
8 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
9};
10use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
11
12use crate::error::ChartError;
13
/// A single data row: a map from field (column) name to a JSON value.
pub type Row = HashMap<String, serde_json::Value>;
17
18pub fn get_f64(row: &Row, field: &str) -> Option<f64> {
23 match row.get(field)? {
24 serde_json::Value::Number(n) => n.as_f64(),
25 serde_json::Value::String(s) => s.parse::<f64>().ok(),
26 _ => None,
27 }
28}
29
30pub fn get_string(row: &Row, field: &str) -> Option<String> {
32 match row.get(field)? {
33 serde_json::Value::String(s) => Some(s.clone()),
34 serde_json::Value::Number(n) => Some(n.to_string()),
35 serde_json::Value::Bool(b) => Some(b.to_string()),
36 serde_json::Value::Null => None,
37 other => Some(other.to_string()),
38 }
39}
40
41pub fn extent_rows(data: &[Row], field: &str) -> Option<(f64, f64)> {
43 let values: Vec<f64> = data.iter().filter_map(|row| get_f64(row, field)).collect();
44 if values.is_empty() {
45 return None;
46 }
47 let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
48 let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
49 Some((min, max))
50}
51
52pub fn sum_rows(data: &[Row], field: &str) -> f64 {
54 data.iter().filter_map(|row| get_f64(row, field)).sum()
55}
56
57pub fn group_by_rows<'a>(data: &'a [Row], field: &str) -> HashMap<String, Vec<&'a Row>> {
59 let mut groups: HashMap<String, Vec<&'a Row>> = HashMap::new();
60 for row in data {
61 if let Some(key) = get_string(row, field) {
62 groups.entry(key).or_default().push(row);
63 }
64 }
65 groups
66}
67
68pub fn unique_values_rows(data: &[Row], field: &str) -> Vec<String> {
70 let mut seen = std::collections::HashSet::new();
71 let mut result = Vec::new();
72 for row in data {
73 if let Some(val) = get_string(row, field) {
74 if seen.insert(val.clone()) {
75 result.push(val);
76 }
77 }
78 }
79 result
80}
81
// Short aliases so callers can use the concise names (`extent`, `sum`,
// `group_by`, `unique_values`) for the row-based helpers above.
pub use extent_rows as extent;
pub use sum_rows as sum;
pub use group_by_rows as group_by;
pub use unique_values_rows as unique_values;
87
/// A columnar table backed by an Arrow [`RecordBatch`], with O(1)
/// column lookup by field name.
#[derive(Debug, Clone)]
pub struct DataTable {
    // The underlying Arrow data; all columns share the same row count.
    batch: RecordBatch,
    // Field name -> column index in `batch`, built once at construction.
    field_index: HashMap<String, usize>,
}
101
impl DataTable {
    /// Wraps an existing Arrow [`RecordBatch`], building the
    /// field-name -> column-index map used for fast column lookup.
    pub fn from_record_batch(batch: RecordBatch) -> Self {
        let field_index = batch
            .schema()
            .fields()
            .iter()
            .enumerate()
            .map(|(i, f)| (f.name().clone(), i))
            .collect();
        Self { batch, field_index }
    }

    /// Builds a table from JSON-style rows by inferring a schema.
    ///
    /// Column order is alphabetical by field name. Types are inferred per
    /// column via `merge_inferred`: all JSON numbers become `Float64`,
    /// booleans become `Boolean`, everything else (strings, mixed-type
    /// columns, and all-null columns) falls back to `Utf8`. Every column
    /// is nullable; a missing key in a row becomes a null.
    ///
    /// # Errors
    /// Returns `ChartError::DataError` if Arrow rejects the assembled
    /// columns (e.g. an internal length/schema mismatch).
    pub fn from_rows(rows: &[Row]) -> Result<Self, ChartError> {
        // Empty input: an empty batch over an empty schema.
        if rows.is_empty() {
            let schema = Arc::new(Schema::new(Vec::<Field>::new()));
            let batch = RecordBatch::new_empty(schema);
            return Ok(Self::from_record_batch(batch));
        }

        // Collect the union of keys over all rows (first-seen order)...
        let mut column_names: Vec<String> = Vec::new();
        let mut seen = std::collections::HashSet::new();
        for row in rows {
            for key in row.keys() {
                if seen.insert(key.clone()) {
                    column_names.push(key.clone());
                }
            }
        }
        // ...then fix a deterministic (alphabetical) column order.
        column_names.sort();

        // Infer one type per column by folding over every value.
        let mut col_types: Vec<InferredType> = vec![InferredType::Null; column_names.len()];
        for row in rows {
            for (i, name) in column_names.iter().enumerate() {
                if let Some(val) = row.get(name) {
                    let val_type = match val {
                        serde_json::Value::Number(_) => InferredType::Float64,
                        serde_json::Value::Bool(_) => InferredType::Boolean,
                        serde_json::Value::String(_) => InferredType::Utf8,
                        serde_json::Value::Null => InferredType::Null,
                        // Arrays/objects are stringified below, so infer text.
                        _ => InferredType::Utf8,
                    };
                    col_types[i] = merge_inferred(col_types[i], val_type);
                }
            }
        }

        // Columns that never saw a non-null value default to Utf8.
        for t in &mut col_types {
            if *t == InferredType::Null {
                *t = InferredType::Utf8;
            }
        }

        // Every field is nullable because any row may omit any key.
        let fields: Vec<Field> = column_names
            .iter()
            .zip(col_types.iter())
            .map(|(name, typ)| {
                let dt = match typ {
                    InferredType::Float64 => DataType::Float64,
                    InferredType::Boolean => DataType::Boolean,
                    InferredType::Utf8 | InferredType::Null => DataType::Utf8,
                };
                Field::new(name, dt, true)
            })
            .collect();
        let schema = Arc::new(Schema::new(fields));

        // Materialize one Arrow array per column.
        let mut arrays: Vec<ArrayRef> = Vec::with_capacity(column_names.len());
        for (i, name) in column_names.iter().enumerate() {
            let arr: ArrayRef = match col_types[i] {
                InferredType::Float64 => {
                    let values: Vec<Option<f64>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Number(n) => n.as_f64(),
                                // Defensive: inference widens mixed columns to
                                // Utf8, so strings shouldn't reach this branch.
                                serde_json::Value::String(s) => s.parse::<f64>().ok(),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(Float64Array::from(values))
                }
                InferredType::Boolean => {
                    let values: Vec<Option<bool>> = rows
                        .iter()
                        .map(|row| {
                            row.get(name).and_then(|v| match v {
                                serde_json::Value::Bool(b) => Some(*b),
                                _ => None,
                            })
                        })
                        .collect();
                    Arc::new(BooleanArray::from(values))
                }
                InferredType::Utf8 | InferredType::Null => {
                    let mut builder = StringBuilder::new();
                    for row in rows {
                        match row.get(name) {
                            Some(serde_json::Value::String(s)) => builder.append_value(s),
                            Some(serde_json::Value::Number(n)) => {
                                builder.append_value(n.to_string())
                            }
                            Some(serde_json::Value::Bool(b)) => {
                                builder.append_value(b.to_string())
                            }
                            Some(serde_json::Value::Null) | None => builder.append_null(),
                            // Arrays/objects: serialized JSON text.
                            Some(other) => builder.append_value(other.to_string()),
                        }
                    }
                    Arc::new(builder.finish())
                }
            };
            arrays.push(arr);
        }

        let batch = RecordBatch::try_new(schema, arrays)
            .map_err(|e| ChartError::DataError(format!("Failed to create RecordBatch: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }

    /// Deserializes a table from Arrow IPC *stream*-format bytes,
    /// concatenating multiple batches into one when necessary.
    ///
    /// # Errors
    /// Returns `ChartError::DataError` when the bytes are not a valid
    /// IPC stream or the batches cannot be read or concatenated.
    pub fn from_ipc_bytes(bytes: &[u8]) -> Result<Self, ChartError> {
        use arrow::ipc::reader::StreamReader;
        use std::io::Cursor;

        let cursor = Cursor::new(bytes);
        let reader = StreamReader::try_new(cursor, None)
            .map_err(|e| ChartError::DataError(format!("Failed to read Arrow IPC: {}", e)))?;

        let schema = reader.schema();
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                ChartError::DataError(format!("Failed to read Arrow batch: {}", e))
            })?;
            batches.push(batch);
        }

        // Stream with no batches: keep the schema, zero rows.
        if batches.is_empty() {
            return Ok(Self::from_record_batch(RecordBatch::new_empty(schema)));
        }

        // Single batch: skip the concat copy.
        if batches.len() == 1 {
            return Ok(Self::from_record_batch(batches.remove(0)));
        }

        let batch = arrow::compute::concat_batches(&schema, &batches)
            .map_err(|e| ChartError::DataError(format!("Failed to concat batches: {}", e)))?;
        Ok(Self::from_record_batch(batch))
    }

    /// Serializes the table to Arrow IPC *stream*-format bytes
    /// (the inverse of [`DataTable::from_ipc_bytes`]).
    ///
    /// # Errors
    /// Returns `ChartError::DataError` if the IPC writer fails.
    pub fn to_ipc_bytes(&self) -> Result<Vec<u8>, ChartError> {
        use arrow::ipc::writer::StreamWriter;

        let mut buf = Vec::new();
        {
            // Scope the writer so its borrow of `buf` ends before we return it.
            let mut writer = StreamWriter::try_new(&mut buf, &self.batch.schema())
                .map_err(|e| ChartError::DataError(format!("Failed to create IPC writer: {}", e)))?;
            writer.write(&self.batch).map_err(|e| {
                ChartError::DataError(format!("Failed to write Arrow batch: {}", e))
            })?;
            writer.finish().map_err(|e| {
                ChartError::DataError(format!("Failed to finish IPC stream: {}", e))
            })?;
        }
        Ok(buf)
    }

    /// Number of rows in the table.
    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }

    /// Number of columns in the table.
    pub fn num_columns(&self) -> usize {
        self.batch.num_columns()
    }

    /// True when the table has no rows (it may still have columns).
    pub fn is_empty(&self) -> bool {
        self.batch.num_rows() == 0
    }

    /// Borrows the underlying Arrow [`RecordBatch`].
    pub fn record_batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// Returns the table's schema (a cheap `Arc` clone).
    pub fn schema(&self) -> Arc<Schema> {
        self.batch.schema()
    }

    /// Looks up a column by field name via the prebuilt index.
    fn column(&self, field: &str) -> Option<&ArrayRef> {
        self.field_index.get(field).map(|&i| self.batch.column(i))
    }

    /// Reads the value at (`row`, `field`) as `f64`; `None` for unknown
    /// fields, nulls, or types `arrow_to_f64` cannot convert.
    ///
    /// NOTE(review): `row` is not bounds-checked here; an out-of-range
    /// index is handed straight to Arrow — confirm callers stay within
    /// `num_rows()`.
    pub fn get_f64(&self, row: usize, field: &str) -> Option<f64> {
        let col = self.column(field)?;
        if col.is_null(row) {
            return None;
        }
        arrow_to_f64(col, row)
    }

    /// Reads the value at (`row`, `field`) as a string; `None` for
    /// unknown fields, nulls, or types `arrow_to_string` cannot render.
    ///
    /// NOTE(review): `row` is not bounds-checked here — see `get_f64`.
    pub fn get_string(&self, row: usize, field: &str) -> Option<String> {
        let col = self.column(field)?;
        if col.is_null(row) {
            return None;
        }
        arrow_to_string(col, row)
    }

    /// Distinct string values of `field` in first-seen (row) order,
    /// skipping nulls. Unknown fields yield an empty vector.
    pub fn unique_values(&self, field: &str) -> Vec<String> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return Vec::new(),
        };
        let mut seen = std::collections::HashSet::new();
        let mut result = Vec::new();
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(val) = arrow_to_string(col, i) {
                if seen.insert(val.clone()) {
                    result.push(val);
                }
            }
        }
        result
    }

    /// All non-null string values of `field` in row order (duplicates
    /// kept). Unknown fields yield an empty vector.
    pub fn all_values(&self, field: &str) -> Vec<String> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return Vec::new(),
        };
        let mut result = Vec::new();
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(val) = arrow_to_string(col, i) {
                result.push(val);
            }
        }
        result
    }

    /// `(min, max)` over the numeric values of `field`, skipping nulls
    /// and unconvertible values; `None` when nothing converts.
    pub fn extent(&self, field: &str) -> Option<(f64, f64)> {
        let col = self.column(field)?;
        let mut min = f64::INFINITY;
        let mut max = f64::NEG_INFINITY;
        let mut found = false;
        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(v) = arrow_to_f64(col, i) {
                found = true;
                if v < min {
                    min = v;
                }
                if v > max {
                    max = v;
                }
            }
        }
        if found {
            Some((min, max))
        } else {
            None
        }
    }

    /// Sum of the numeric values of `field`; nulls, unknown fields, and
    /// unconvertible values contribute 0.
    pub fn sum(&self, field: &str) -> f64 {
        let col = match self.column(field) {
            Some(c) => c,
            None => return 0.0,
        };
        let mut total = 0.0;
        for i in 0..self.batch.num_rows() {
            if !col.is_null(i) {
                if let Some(v) = arrow_to_f64(col, i) {
                    total += v;
                }
            }
        }
        total
    }

    /// Splits the table into sub-tables keyed by the string value of
    /// `field`, using Arrow's `take` kernel to gather each group's rows.
    /// Rows with null keys are dropped; rows keep their relative order
    /// within each group.
    ///
    /// NOTE(review): `take`/`try_new` failures silently omit the
    /// affected group rather than surfacing an error — confirm that is
    /// the intended best-effort behavior.
    pub fn group_by(&self, field: &str) -> HashMap<String, DataTable> {
        let col = match self.column(field) {
            Some(c) => c,
            None => return HashMap::new(),
        };

        // First pass: record each group's row indices (u32 for `take`).
        let mut group_indices: HashMap<String, Vec<u32>> = HashMap::new();
        let mut key_order: Vec<String> = Vec::new();
        let mut seen_keys = std::collections::HashSet::new();

        for i in 0..self.batch.num_rows() {
            if col.is_null(i) {
                continue;
            }
            if let Some(key) = arrow_to_string(col, i) {
                if seen_keys.insert(key.clone()) {
                    key_order.push(key.clone());
                }
                group_indices.entry(key).or_default().push(i as u32);
            }
        }

        // Second pass: gather each group's rows into its own batch.
        let mut result = HashMap::new();
        for key in key_order {
            if let Some(indices) = group_indices.get(&key) {
                let indices_arr = UInt32Array::from(indices.clone());
                let take_result: Result<Vec<ArrayRef>, _> = self
                    .batch
                    .columns()
                    .iter()
                    .map(|col| arrow::compute::take(col.as_ref(), &indices_arr, None))
                    .collect();
                if let Ok(columns) = take_result {
                    if let Ok(sub_batch) = RecordBatch::try_new(self.batch.schema(), columns) {
                        result.insert(key, DataTable::from_record_batch(sub_batch));
                    }
                }
            }
        }
        result
    }

    /// True when the table has a column named `field`.
    pub fn has_field(&self, field: &str) -> bool {
        self.field_index.contains_key(field)
    }

    /// All column names in schema order.
    pub fn field_names(&self) -> Vec<String> {
        self.batch
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    /// Converts the table back into JSON-style rows.
    ///
    /// Booleans and numeric types (including Decimal128, via
    /// `arrow_to_f64` — large 64-bit integers lose precision past 2^53)
    /// become JSON numbers/bools; dates and timestamps become ISO-8601
    /// strings via `arrow_to_string`; any other type is stringified when
    /// possible; nulls become JSON null.
    pub fn to_rows(&self) -> Vec<Row> {
        let num_rows = self.batch.num_rows();
        let schema = self.batch.schema();
        let fields = schema.fields();

        let mut rows = Vec::with_capacity(num_rows);
        for row_idx in 0..num_rows {
            let mut row = Row::new();
            for (col_idx, field) in fields.iter().enumerate() {
                let col = self.batch.column(col_idx);
                if col.is_null(row_idx) {
                    row.insert(field.name().clone(), serde_json::Value::Null);
                    continue;
                }
                let value = match col.data_type() {
                    DataType::Boolean => {
                        let v = col.as_any().downcast_ref::<BooleanArray>().unwrap().value(row_idx);
                        serde_json::json!(v)
                    }
                    DataType::Float64 | DataType::Float32 |
                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 |
                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 |
                    DataType::Decimal128(_, _) => {
                        if let Some(v) = arrow_to_f64(col, row_idx) {
                            serde_json::json!(v)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    DataType::Date32 | DataType::Date64 |
                    DataType::Timestamp(_, _) => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                    _ => {
                        if let Some(s) = arrow_to_string(col, row_idx) {
                            serde_json::Value::String(s)
                        } else {
                            serde_json::Value::Null
                        }
                    }
                };
                row.insert(field.name().clone(), value);
            }
            rows.push(row);
        }
        rows
    }
}
530
/// Reads the value at `idx` from an Arrow column as an `f64`.
///
/// Conversions: all integer/float widths cast to `f64` (i64/u64 values
/// beyond 2^53 lose precision); booleans map to 1.0/0.0; `Date32` stays
/// in days since the Unix epoch and `Date64` in milliseconds; timestamps
/// of any unit are normalized to epoch milliseconds; `Decimal128` is
/// scaled by `10^-scale`; `Utf8` is parsed as an `f64`. Unsupported
/// types return `None`.
///
/// Callers handle nulls before calling; `idx` must be in bounds.
fn arrow_to_f64(col: &ArrayRef, idx: usize) -> Option<f64> {
    match col.data_type() {
        DataType::Float64 => {
            Some(col.as_any().downcast_ref::<Float64Array>().unwrap().value(idx))
        }
        DataType::Float32 => {
            Some(col.as_any().downcast_ref::<Float32Array>().unwrap().value(idx) as f64)
        }
        DataType::Int64 => {
            Some(col.as_any().downcast_ref::<Int64Array>().unwrap().value(idx) as f64)
        }
        DataType::Int32 => {
            Some(col.as_any().downcast_ref::<Int32Array>().unwrap().value(idx) as f64)
        }
        DataType::Int16 => {
            Some(col.as_any().downcast_ref::<Int16Array>().unwrap().value(idx) as f64)
        }
        DataType::Int8 => {
            Some(col.as_any().downcast_ref::<Int8Array>().unwrap().value(idx) as f64)
        }
        DataType::UInt64 => {
            Some(col.as_any().downcast_ref::<UInt64Array>().unwrap().value(idx) as f64)
        }
        DataType::UInt32 => {
            Some(col.as_any().downcast_ref::<UInt32Array>().unwrap().value(idx) as f64)
        }
        DataType::UInt16 => {
            Some(col.as_any().downcast_ref::<UInt16Array>().unwrap().value(idx) as f64)
        }
        DataType::UInt8 => {
            Some(col.as_any().downcast_ref::<UInt8Array>().unwrap().value(idx) as f64)
        }
        DataType::Boolean => {
            let v = col.as_any().downcast_ref::<BooleanArray>().unwrap().value(idx);
            Some(if v { 1.0 } else { 0.0 })
        }
        // Date32 is days since epoch; returned unscaled (days, not ms).
        DataType::Date32 => {
            Some(col.as_any().downcast_ref::<Date32Array>().unwrap().value(idx) as f64)
        }
        // Date64 is already milliseconds since epoch.
        DataType::Date64 => {
            Some(col.as_any().downcast_ref::<Date64Array>().unwrap().value(idx) as f64)
        }
        DataType::Timestamp(unit, _) => {
            let raw = match unit {
                TimeUnit::Second => col
                    .as_any()
                    .downcast_ref::<TimestampSecondArray>()
                    .unwrap()
                    .value(idx),
                TimeUnit::Millisecond => col
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap()
                    .value(idx),
                TimeUnit::Microsecond => col
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .unwrap()
                    .value(idx),
                TimeUnit::Nanosecond => col
                    .as_any()
                    .downcast_ref::<TimestampNanosecondArray>()
                    .unwrap()
                    .value(idx),
            };
            // Normalize every unit to epoch milliseconds.
            // NOTE(review): `/` truncates toward zero, so pre-1970
            // micro/nano timestamps round toward the epoch instead of
            // flooring — confirm that is acceptable for callers.
            let millis = match unit {
                TimeUnit::Second => raw * 1000,
                TimeUnit::Millisecond => raw,
                TimeUnit::Microsecond => raw / 1000,
                TimeUnit::Nanosecond => raw / 1_000_000,
            };
            Some(millis as f64)
        }
        DataType::Decimal128(_, scale) => {
            let raw = col
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap()
                .value(idx);
            // powi handles negative scales too (divisor < 1 multiplies).
            let divisor = 10_f64.powi(*scale as i32);
            Some(raw as f64 / divisor)
        }
        DataType::Utf8 => {
            let s = col.as_any().downcast_ref::<StringArray>().unwrap().value(idx);
            s.parse::<f64>().ok()
        }
        _ => None,
    }
}
627
628fn arrow_to_string(col: &ArrayRef, idx: usize) -> Option<String> {
630 match col.data_type() {
631 DataType::Utf8 => {
632 Some(
633 col.as_any()
634 .downcast_ref::<StringArray>()
635 .unwrap()
636 .value(idx)
637 .to_string(),
638 )
639 }
640 DataType::LargeUtf8 => {
641 Some(
642 col.as_any()
643 .downcast_ref::<arrow::array::LargeStringArray>()
644 .unwrap()
645 .value(idx)
646 .to_string(),
647 )
648 }
649 DataType::Float64 => {
650 let v = col.as_any().downcast_ref::<Float64Array>().unwrap().value(idx);
651 Some(format_f64(v))
652 }
653 DataType::Float32 => {
654 let v = col.as_any().downcast_ref::<Float32Array>().unwrap().value(idx) as f64;
655 Some(format_f64(v))
656 }
657 DataType::Int64 => {
658 Some(col.as_any().downcast_ref::<Int64Array>().unwrap().value(idx).to_string())
659 }
660 DataType::Int32 => {
661 Some(col.as_any().downcast_ref::<Int32Array>().unwrap().value(idx).to_string())
662 }
663 DataType::Int16 => {
664 Some(col.as_any().downcast_ref::<Int16Array>().unwrap().value(idx).to_string())
665 }
666 DataType::Int8 => {
667 Some(col.as_any().downcast_ref::<Int8Array>().unwrap().value(idx).to_string())
668 }
669 DataType::UInt64 => {
670 Some(col.as_any().downcast_ref::<UInt64Array>().unwrap().value(idx).to_string())
671 }
672 DataType::UInt32 => {
673 Some(col.as_any().downcast_ref::<UInt32Array>().unwrap().value(idx).to_string())
674 }
675 DataType::UInt16 => {
676 Some(col.as_any().downcast_ref::<UInt16Array>().unwrap().value(idx).to_string())
677 }
678 DataType::UInt8 => {
679 Some(col.as_any().downcast_ref::<UInt8Array>().unwrap().value(idx).to_string())
680 }
681 DataType::Boolean => {
682 Some(col.as_any().downcast_ref::<BooleanArray>().unwrap().value(idx).to_string())
683 }
684 DataType::Date32 => {
685 let days = col.as_any().downcast_ref::<Date32Array>().unwrap().value(idx);
686 Some(days_to_iso(days as i64))
687 }
688 DataType::Date64 => {
689 let millis = col.as_any().downcast_ref::<Date64Array>().unwrap().value(idx);
690 let days = millis / 86_400_000;
692 Some(days_to_iso(days))
693 }
694 DataType::Timestamp(unit, tz) => {
695 let raw = match unit {
696 TimeUnit::Second => col
697 .as_any()
698 .downcast_ref::<TimestampSecondArray>()
699 .unwrap()
700 .value(idx),
701 TimeUnit::Millisecond => col
702 .as_any()
703 .downcast_ref::<TimestampMillisecondArray>()
704 .unwrap()
705 .value(idx),
706 TimeUnit::Microsecond => col
707 .as_any()
708 .downcast_ref::<TimestampMicrosecondArray>()
709 .unwrap()
710 .value(idx),
711 TimeUnit::Nanosecond => col
712 .as_any()
713 .downcast_ref::<TimestampNanosecondArray>()
714 .unwrap()
715 .value(idx),
716 };
717 let (secs, nanos_u32) = match unit {
720 TimeUnit::Second => (raw, 0u32),
721 TimeUnit::Millisecond => {
722 let (s, r) = (raw.div_euclid(1000), raw.rem_euclid(1000));
723 (s, (r * 1_000_000) as u32)
724 }
725 TimeUnit::Microsecond => {
726 let (s, r) = (raw.div_euclid(1_000_000), raw.rem_euclid(1_000_000));
727 (s, (r * 1000) as u32)
728 }
729 TimeUnit::Nanosecond => {
730 let (s, r) = (raw.div_euclid(1_000_000_000), raw.rem_euclid(1_000_000_000));
731 (s, r as u32)
732 }
733 };
734 let iso = epoch_to_iso(secs, nanos_u32);
735 if tz.is_some() {
736 Some(format!("{}Z", iso))
737 } else {
738 Some(iso)
739 }
740 }
741 DataType::Decimal128(_, scale) => {
742 let raw = col
743 .as_any()
744 .downcast_ref::<Decimal128Array>()
745 .unwrap()
746 .value(idx);
747 Some(format_decimal128(raw, *scale))
748 }
749 _ => None,
750 }
751}
752
/// Renders an `f64` compactly: values that are exactly integral and
/// small enough to be represented exactly as `i64` print without a
/// fractional part; everything else uses the standard float display.
fn format_f64(v: f64) -> String {
    let is_small_integer = v.fract() == 0.0 && v.abs() < 1e15;
    if is_small_integer {
        (v as i64).to_string()
    } else {
        v.to_string()
    }
}
761
762fn days_to_iso(days: i64) -> String {
764 let (year, month, day) = civil_from_days(days);
765 format!("{:04}-{:02}-{:02}", year, month, day)
766}
767
768fn epoch_to_iso(secs: i64, nanos: u32) -> String {
770 let days = if secs >= 0 {
771 secs / 86400
772 } else {
773 (secs - 86399) / 86400
774 };
775 let day_secs = secs - days * 86400;
776 let (year, month, day) = civil_from_days(days);
777 let hours = day_secs / 3600;
778 let minutes = (day_secs % 3600) / 60;
779 let seconds = day_secs % 60;
780
781 if nanos == 0 {
782 format!(
783 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}",
784 year, month, day, hours, minutes, seconds
785 )
786 } else {
787 let millis = nanos / 1_000_000;
788 format!(
789 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}",
790 year, month, day, hours, minutes, seconds, millis
791 )
792 }
793}
794
/// Converts a day count relative to the Unix epoch (1970-01-01) into a
/// proleptic-Gregorian `(year, month, day)` triple.
///
/// This is the standard era-based algorithm (cf. Howard Hinnant's
/// `civil_from_days`): shift the epoch to 0000-03-01 so leap days fall
/// at the end of the internal "March-first" year, work in 400-year eras
/// of 146097 days, then map back to calendar months.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    // Shift so day 0 is 0000-03-01.
    let z = days + 719468;
    // 400-year era index (floored division for negative z).
    let era = if z >= 0 { z } else { z - 146096 } / 146097;
    // Day-of-era in [0, 146096].
    let doe = (z - era * 146097) as u32;
    // Year-of-era in [0, 399], correcting for leap-day distribution.
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
    let y = yoe as i64 + era * 400;
    // Day-of-year in the March-first calendar, [0, 365].
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    // Month index with March = 0.
    let mp = (5 * doy + 2) / 153;
    let d = doy - (153 * mp + 2) / 5 + 1;
    // Map March-first months back to January-first months.
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    // Jan/Feb belong to the next civil year in the shifted calendar.
    let y = if m <= 2 { y + 1 } else { y };
    (y, m, d)
}
810
/// Renders a raw `Decimal128` integer at the given scale as a plain
/// decimal string, e.g. `raw = 12345, scale = 2` -> `"123.45"`.
///
/// A zero scale prints the integer unchanged. A negative scale means
/// the value is `raw * 10^|scale|`, so the appropriate number of zeros
/// is appended (previously the scaling was silently dropped and the raw
/// integer was printed).
fn format_decimal128(raw: i128, scale: i8) -> String {
    if scale == 0 {
        return raw.to_string();
    }
    if scale < 0 {
        // Negative scale: value = raw * 10^|scale|.
        if raw == 0 {
            return "0".to_string();
        }
        return format!("{}{}", raw, "0".repeat(scale.unsigned_abs() as usize));
    }
    let divisor = 10_i128.pow(scale as u32);
    let whole = raw / divisor;
    // The remainder's magnitude is < divisor, so `abs` cannot overflow.
    let frac = (raw % divisor).abs();
    // `whole` loses the sign for values in (-1, 0); restore it here.
    let sign = if raw < 0 && whole == 0 { "-" } else { "" };
    format!("{}{}.{:0>width$}", sign, whole, frac, width = scale as usize)
}
824
/// Column type inferred from JSON values while building a schema in
/// `DataTable::from_rows`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum InferredType {
    // Any JSON number (always widened to f64).
    Float64,
    Boolean,
    // Strings, and the fallback for arrays/objects/mixed columns.
    Utf8,
    // No non-null value seen yet; resolved to Utf8 if it never is.
    Null,
}
834
835fn merge_inferred(existing: InferredType, new: InferredType) -> InferredType {
836 if new == InferredType::Null {
837 return existing;
838 }
839 if existing == InferredType::Null {
840 return new;
841 }
842 if existing == new {
843 return existing;
844 }
845 InferredType::Utf8
847}
848
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a `Row` from `(key, value)` pairs for concise fixtures.
    fn make_row(pairs: Vec<(&str, serde_json::Value)>) -> Row {
        pairs
            .into_iter()
            .map(|(k, v)| (k.to_string(), v))
            .collect()
    }

    // --- row-based helper tests ---

    #[test]
    fn get_f64_from_number() {
        let row = make_row(vec![("value", json!(42.5))]);
        assert_eq!(get_f64(&row, "value"), Some(42.5));
    }

    #[test]
    fn get_f64_from_string() {
        // Numeric strings are parsed, not rejected.
        let row = make_row(vec![("value", json!("123.45"))]);
        assert_eq!(get_f64(&row, "value"), Some(123.45));
    }

    #[test]
    fn get_f64_missing_field() {
        let row = make_row(vec![("other", json!(1.0))]);
        assert_eq!(get_f64(&row, "value"), None);
    }

    #[test]
    fn get_string_from_various() {
        let row_num = make_row(vec![("x", json!(42))]);
        assert_eq!(get_string(&row_num, "x"), Some("42".to_string()));

        let row_str = make_row(vec![("x", json!("hello"))]);
        assert_eq!(get_string(&row_str, "x"), Some("hello".to_string()));

        let row_bool = make_row(vec![("x", json!(true))]);
        assert_eq!(get_string(&row_bool, "x"), Some("true".to_string()));

        // Null is "no value", not the string "null".
        let row_null = make_row(vec![("x", json!(null))]);
        assert_eq!(get_string(&row_null, "x"), None);
    }

    #[test]
    fn extent_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        assert_eq!(extent(&data, "v"), Some((10.0, 30.0)));
    }

    #[test]
    fn extent_empty() {
        // No rows at all, and no rows carrying the field, both give None.
        let data: Vec<Row> = vec![];
        assert_eq!(extent(&data, "v"), None);

        let data = vec![make_row(vec![("other", json!(1.0))])];
        assert_eq!(extent(&data, "v"), None);
    }

    #[test]
    fn sum_basic() {
        let data = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        assert_eq!(sum(&data, "v"), 60.0);
    }

    #[test]
    fn group_by_basic() {
        let data = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let groups = group_by(&data, "cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].len(), 2);
        assert_eq!(groups["B"].len(), 1);
    }

    #[test]
    fn unique_values_preserves_order() {
        // First-seen order, duplicates dropped.
        let data = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
            make_row(vec![("x", json!("apple"))]),
        ];
        let uniq = unique_values(&data, "x");
        assert_eq!(uniq, vec!["banana", "apple", "cherry"]);
    }

    // --- DataTable tests ---

    #[test]
    fn datatable_from_rows_roundtrip() {
        let rows = vec![
            make_row(vec![
                ("name", json!("Alice")),
                ("age", json!(30)),
                ("active", json!(true)),
            ]),
            make_row(vec![
                ("name", json!("Bob")),
                ("age", json!(25)),
                ("active", json!(false)),
            ]),
        ];

        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.num_rows(), 2);
        assert_eq!(dt.num_columns(), 3);

        assert_eq!(dt.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt.get_f64(0, "age"), Some(30.0));
        assert_eq!(dt.get_string(1, "name"), Some("Bob".to_string()));
        assert_eq!(dt.get_f64(1, "age"), Some(25.0));
    }

    #[test]
    fn datatable_empty() {
        let dt = DataTable::from_rows(&[]).unwrap();
        assert_eq!(dt.num_rows(), 0);
        assert!(dt.is_empty());
    }

    #[test]
    fn datatable_unique_values() {
        let rows = vec![
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("apple"))]),
            make_row(vec![("x", json!("banana"))]),
            make_row(vec![("x", json!("cherry"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.unique_values("x"), vec!["banana", "apple", "cherry"]);
    }

    #[test]
    fn datatable_extent() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(30.0))]),
            make_row(vec![("v", json!(20.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.extent("v"), Some((10.0, 30.0)));
    }

    #[test]
    fn datatable_group_by() {
        let rows = vec![
            make_row(vec![("cat", json!("A")), ("v", json!(1))]),
            make_row(vec![("cat", json!("B")), ("v", json!(2))]),
            make_row(vec![("cat", json!("A")), ("v", json!(3))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let groups = dt.group_by("cat");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups["A"].num_rows(), 2);
        assert_eq!(groups["B"].num_rows(), 1);
    }

    #[test]
    fn datatable_sum() {
        let rows = vec![
            make_row(vec![("v", json!(10.0))]),
            make_row(vec![("v", json!(20.0))]),
            make_row(vec![("v", json!(30.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.sum("v"), 60.0);
    }

    #[test]
    fn datatable_ipc_roundtrip() {
        // Serialize to Arrow IPC stream bytes and read them back.
        let rows = vec![
            make_row(vec![("name", json!("Alice")), ("score", json!(95.5))]),
            make_row(vec![("name", json!("Bob")), ("score", json!(87.0))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        let bytes = dt.to_ipc_bytes().unwrap();
        let dt2 = DataTable::from_ipc_bytes(&bytes).unwrap();
        assert_eq!(dt2.num_rows(), 2);
        assert_eq!(dt2.get_string(0, "name"), Some("Alice".to_string()));
        assert_eq!(dt2.get_f64(1, "score"), Some(87.0));
    }

    #[test]
    fn datatable_record_batch_with_timestamps() {
        use arrow::array::TimestampMicrosecondArray;
        use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

        let schema = Arc::new(Schema::new(vec![
            Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), true),
            Field::new("value", DataType::Float64, true),
        ]));
        let ts_array = TimestampMicrosecondArray::from(vec![Some(1768474200000000i64)])
            .with_timezone("UTC");
        let val_array = Float64Array::from(vec![Some(42.0)]);
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(ts_array) as ArrayRef, Arc::new(val_array) as ArrayRef],
        )
        .unwrap();

        let dt = DataTable::from_record_batch(batch);
        // Numeric access normalizes microseconds to epoch milliseconds.
        assert_eq!(dt.get_f64(0, "ts"), Some(1768474200000.0));
        // String access renders ISO-8601 with a Z suffix for tz-tagged columns.
        let ts_str = dt.get_string(0, "ts").unwrap();
        assert!(ts_str.ends_with('Z'), "Expected Z suffix, got: {}", ts_str);
        assert!(ts_str.starts_with("2026-01-15T"), "Expected 2026-01-15T, got: {}", ts_str);
    }

    #[test]
    fn datatable_record_batch_with_dates() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("d", DataType::Date32, true),
        ]));
        // 20468 days after the epoch = 2026-01-15.
        let date_array = Date32Array::from(vec![Some(20468)]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(date_array) as ArrayRef]).unwrap();

        let dt = DataTable::from_record_batch(batch);
        assert_eq!(dt.get_string(0, "d"), Some("2026-01-15".to_string()));
        // Date32 numeric access stays in days, not milliseconds.
        assert_eq!(dt.get_f64(0, "d"), Some(20468.0));
    }

    #[test]
    fn datatable_has_field() {
        let rows = vec![make_row(vec![("x", json!(1))])];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert!(dt.has_field("x"));
        assert!(!dt.has_field("y"));
    }

    #[test]
    fn datatable_null_values() {
        let rows = vec![
            make_row(vec![("x", json!(1.0)), ("y", json!(null))]),
            make_row(vec![("x", json!(null)), ("y", json!("hello"))]),
        ];
        let dt = DataTable::from_rows(&rows).unwrap();
        assert_eq!(dt.get_f64(0, "x"), Some(1.0));
        assert_eq!(dt.get_f64(0, "y"), None);
        assert_eq!(dt.get_f64(1, "x"), None);
        assert_eq!(dt.get_string(1, "y"), Some("hello".to_string()));
    }
1111}