1use std::error::Error;
14use std::sync::Arc;
15
16use arrow_array::builder::{
17 BooleanBuilder, Float64Builder, Int32Builder, StringBuilder, TimestampMillisecondBuilder,
18};
use arrow_array::{
    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch,
    RecordBatchOptions, StringArray, Time32MillisecondArray, Time32SecondArray,
    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
    UInt16Array, UInt32Array, UInt64Array,
};
27use arrow_ipc::reader::{FileReader, StreamReader};
28use arrow_ipc::writer::StreamWriter;
29use arrow_schema::{DataType, Field, Schema, TimeUnit};
30use indexmap::IndexMap;
31use serde::Serialize;
32
33use crate::config::{GroupRollupMode, Scalar, ViewConfig};
34
/// A type-erased Arrow column builder, one per data column written through
/// `VirtualDataSlice::set_col`.
///
/// The variant determines the Arrow type of the column when the slice is
/// frozen into a `RecordBatch`.
pub enum ColumnBuilder {
    /// Builds a `Boolean` column.
    Boolean(BooleanBuilder),
    /// Builds a `Utf8` column.
    String(StringBuilder),
    /// Builds a `Float64` column.
    Float(Float64Builder),
    /// Builds an `Int32` column.
    Integer(Int32Builder),
    /// Builds a `Timestamp(Millisecond, None)` column (milliseconds since the
    /// Unix epoch).
    Datetime(TimestampMillisecondBuilder),
}
44
#[derive(Debug, Serialize)]
/// A single cell of a row produced by `VirtualDataSlice::render_to_rows`.
///
/// Because of `#[serde(untagged)]`, each variant serializes as its bare
/// payload: `Float(Some(1.5))` becomes `1.5`, and any `None` becomes `null`.
#[serde(untagged)]
pub enum VirtualDataCell {
    /// Value of a `Boolean` column.
    Boolean(Option<bool>),
    /// Value of a `Utf8` column.
    String(Option<String>),
    /// Value of a `Float64` column.
    Float(Option<f64>),
    /// Value of an `Int32` column.
    Integer(Option<i32>),
    /// Milliseconds since the Unix epoch (used for millisecond timestamps and
    /// for `Date32` values scaled from days).
    Datetime(Option<i64>),
    /// The group-by row path of the row, emitted under `__ROW_PATH__`.
    RowPath(Vec<Scalar>),
}
59
/// A Rust value type that can be written as one cell of a `VirtualDataSlice`
/// column via `VirtualDataSlice::set_col`.
///
/// Each implementation visible here (`Option<String>`, `Option<f64>`,
/// `Option<i32>`, `Option<i64>`, `Option<bool>`) is tied to exactly one
/// [`ColumnBuilder`] variant, and `None` writes a null cell.
pub trait SetVirtualDataColumn {
    /// Append `self` as the next cell of `col`.
    ///
    /// # Errors
    ///
    /// The implementations visible here return `Err("Bad type")` when `col` is
    /// not the builder variant this type writes to.
    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str>;

    /// Create a new, empty builder of the variant this type writes to.
    fn new_builder() -> ColumnBuilder;

    /// Convert `self` into a [`Scalar`] for use as a row-path element.
    /// `None` becomes `Scalar::Null`.
    fn to_scalar(self) -> Scalar;
}
78
79impl SetVirtualDataColumn for Option<String> {
80 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
81 if let ColumnBuilder::String(builder) = col {
82 match self {
83 Some(s) => builder.append_value(&s),
84 None => builder.append_null(),
85 }
86 Ok(())
87 } else {
88 Err("Bad type")
89 }
90 }
91
92 fn new_builder() -> ColumnBuilder {
93 ColumnBuilder::String(StringBuilder::new())
94 }
95
96 fn to_scalar(self) -> Scalar {
97 if let Some(x) = self {
98 Scalar::String(x)
99 } else {
100 Scalar::Null
101 }
102 }
103}
104
105impl SetVirtualDataColumn for Option<f64> {
106 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
107 if let ColumnBuilder::Float(builder) = col {
108 match self {
109 Some(v) => builder.append_value(v),
110 None => builder.append_null(),
111 }
112 Ok(())
113 } else {
114 Err("Bad type")
115 }
116 }
117
118 fn new_builder() -> ColumnBuilder {
119 ColumnBuilder::Float(Float64Builder::new())
120 }
121
122 fn to_scalar(self) -> Scalar {
123 if let Some(x) = self {
124 Scalar::Float(x)
125 } else {
126 Scalar::Null
127 }
128 }
129}
130
131impl SetVirtualDataColumn for Option<i32> {
132 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
133 if let ColumnBuilder::Integer(builder) = col {
134 match self {
135 Some(v) => builder.append_value(v),
136 None => builder.append_null(),
137 }
138 Ok(())
139 } else {
140 Err("Bad type")
141 }
142 }
143
144 fn new_builder() -> ColumnBuilder {
145 ColumnBuilder::Integer(Int32Builder::new())
146 }
147
148 fn to_scalar(self) -> Scalar {
149 if let Some(x) = self {
150 Scalar::Float(x as f64)
151 } else {
152 Scalar::Null
153 }
154 }
155}
156
157impl SetVirtualDataColumn for Option<i64> {
158 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
159 if let ColumnBuilder::Datetime(builder) = col {
160 match self {
161 Some(v) => builder.append_value(v),
162 None => builder.append_null(),
163 }
164 Ok(())
165 } else {
166 Err("Bad type")
167 }
168 }
169
170 fn new_builder() -> ColumnBuilder {
171 ColumnBuilder::Datetime(TimestampMillisecondBuilder::new())
172 }
173
174 fn to_scalar(self) -> Scalar {
175 if let Some(x) = self {
176 Scalar::Float(x as f64)
177 } else {
178 Scalar::Null
179 }
180 }
181}
182
183impl SetVirtualDataColumn for Option<bool> {
184 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
185 if let ColumnBuilder::Boolean(builder) = col {
186 match self {
187 Some(v) => builder.append_value(v),
188 None => builder.append_null(),
189 }
190 Ok(())
191 } else {
192 Err("Bad type")
193 }
194 }
195
196 fn new_builder() -> ColumnBuilder {
197 ColumnBuilder::Boolean(BooleanBuilder::new())
198 }
199
200 fn to_scalar(self) -> Scalar {
201 if let Some(x) = self {
202 Scalar::Bool(x)
203 } else {
204 Scalar::Null
205 }
206 }
207}
208
#[derive(Debug)]
/// A block of view data plus optional row paths.
///
/// It is filled either in bulk from Arrow IPC (`from_arrow_ipc`) or cell by
/// cell (`set_col`). It is rendered as Arrow IPC, as per-row maps, or as
/// column-oriented JSON.
pub struct VirtualDataSlice {
    /// View configuration. Its `group_by`, `split_by` and `group_rollup_mode`
    /// control how incoming columns are interpreted.
    config: ViewConfig,
    /// Per-column builders filled by `set_col`, keyed by (possibly
    /// `|`-renamed) column name, in first-write order.
    builders: IndexMap<String, ColumnBuilder>,
    /// Row-path scalars for each row, when row paths were supplied.
    row_path: Option<Vec<Vec<Scalar>>>,
    /// The finished batch. It is set by `from_arrow_ipc`, or lazily by
    /// `freeze`, and then reused.
    frozen: Option<RecordBatch>,
}
221
222impl std::fmt::Debug for ColumnBuilder {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 match self {
225 ColumnBuilder::Boolean(_) => write!(f, "ColumnBuilder::Boolean(..)"),
226 ColumnBuilder::String(_) => write!(f, "ColumnBuilder::String(..)"),
227 ColumnBuilder::Float(_) => write!(f, "ColumnBuilder::Float(..)"),
228 ColumnBuilder::Integer(_) => write!(f, "ColumnBuilder::Integer(..)"),
229 ColumnBuilder::Datetime(_) => write!(f, "ColumnBuilder::Datetime(..)"),
230 }
231 }
232}
233
234fn cast_to_int64(array: &ArrayRef) -> Result<Vec<i64>, Box<dyn Error>> {
236 let num_rows = array.len();
237 let mut result = Vec::with_capacity(num_rows);
238 match array.data_type() {
239 DataType::Int32 => {
240 let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
241 for i in 0..num_rows {
242 result.push(if arr.is_null(i) {
243 0
244 } else {
245 arr.value(i) as i64
246 });
247 }
248 },
249 DataType::Int64 => {
250 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
251 for i in 0..num_rows {
252 result.push(if arr.is_null(i) { 0 } else { arr.value(i) });
253 }
254 },
255 DataType::Float64 => {
256 let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
257 for i in 0..num_rows {
258 result.push(if arr.is_null(i) {
259 0
260 } else {
261 arr.value(i) as i64
262 });
263 }
264 },
265 dt => return Err(format!("Cannot cast {} to Int64", dt).into()),
266 }
267 Ok(result)
268}
269
270fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar {
272 if array.is_null(row_idx) {
273 return Scalar::Null;
274 }
275 match array.data_type() {
276 DataType::Utf8 => {
277 let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
278 Scalar::String(arr.value(row_idx).to_string())
279 },
280 DataType::Float64 => {
281 let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
282 Scalar::Float(arr.value(row_idx))
283 },
284 DataType::Int32 => {
285 let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
286 Scalar::Float(arr.value(row_idx) as f64)
287 },
288 DataType::Int64 => {
289 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
290 Scalar::Float(arr.value(row_idx) as f64)
291 },
292 DataType::Boolean => {
293 let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
294 Scalar::Bool(arr.value(row_idx))
295 },
296 DataType::Timestamp(TimeUnit::Millisecond, _) => {
297 let arr = array
298 .as_any()
299 .downcast_ref::<TimestampMillisecondArray>()
300 .unwrap();
301 Scalar::Float(arr.value(row_idx) as f64)
302 },
303 DataType::Date32 => {
304 let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
305 Scalar::Float(arr.value(row_idx) as f64 * 86_400_000.0)
306 },
307 _ => {
308 let scalar_arr = array.slice(row_idx, 1);
309 Scalar::String(format!("{:?}", scalar_arr))
310 },
311 }
312}
313
314fn timestamp_to_millis(array: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
318 let millis: TimestampMillisecondArray = match unit {
319 TimeUnit::Second => {
320 let arr = array
321 .as_any()
322 .downcast_ref::<TimestampSecondArray>()
323 .unwrap();
324 arr.iter().map(|v| v.map(|v| v * 1_000)).collect()
325 },
326 TimeUnit::Microsecond => {
327 let arr = array
328 .as_any()
329 .downcast_ref::<TimestampMicrosecondArray>()
330 .unwrap();
331 arr.iter().map(|v| v.map(|v| v / 1_000)).collect()
332 },
333 TimeUnit::Nanosecond => {
334 let arr = array
335 .as_any()
336 .downcast_ref::<TimestampNanosecondArray>()
337 .unwrap();
338 arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect()
339 },
340 TimeUnit::Millisecond => {
341 return array.clone();
342 },
343 };
344 Arc::new(millis) as ArrayRef
345}
346
347fn coerce_column(
348 name: &str,
349 field: &Field,
350 array: &ArrayRef,
351) -> Result<(Field, ArrayRef), Box<dyn Error>> {
352 match field.data_type() {
353 DataType::Boolean
354 | DataType::Utf8
355 | DataType::Float64
356 | DataType::Int32
357 | DataType::Date32 => Ok((
358 Field::new(name, field.data_type().clone(), true),
359 array.clone(),
360 )),
361 DataType::Timestamp(TimeUnit::Millisecond, _) => Ok((
362 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
363 array.clone(),
364 )),
365 DataType::Int8 => {
366 let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
367 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
368 Ok((
369 Field::new(name, DataType::Int32, true),
370 Arc::new(result) as ArrayRef,
371 ))
372 },
373 DataType::Int16 => {
374 let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
375 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
376 Ok((
377 Field::new(name, DataType::Int32, true),
378 Arc::new(result) as ArrayRef,
379 ))
380 },
381 DataType::UInt8 => {
382 let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
383 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
384 Ok((
385 Field::new(name, DataType::Int32, true),
386 Arc::new(result) as ArrayRef,
387 ))
388 },
389 DataType::UInt16 => {
390 let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
391 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
392 Ok((
393 Field::new(name, DataType::Int32, true),
394 Arc::new(result) as ArrayRef,
395 ))
396 },
397 DataType::UInt32 => {
398 let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
399 let result: Int64Array = arr.iter().map(|v| v.map(|v| v as i64)).collect();
400 let result: Float64Array = result.iter().map(|v| v.map(|v| v as f64)).collect();
401 Ok((
402 Field::new(name, DataType::Float64, true),
403 Arc::new(result) as ArrayRef,
404 ))
405 },
406 DataType::Int64 => {
407 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
408 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
409 Ok((
410 Field::new(name, DataType::Float64, true),
411 Arc::new(result) as ArrayRef,
412 ))
413 },
414 DataType::UInt64 => {
415 let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
416 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
417 Ok((
418 Field::new(name, DataType::Float64, true),
419 Arc::new(result) as ArrayRef,
420 ))
421 },
422 DataType::Float32 => {
423 let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
424 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
425 Ok((
426 Field::new(name, DataType::Float64, true),
427 Arc::new(result) as ArrayRef,
428 ))
429 },
430 DataType::Decimal128(_, scale) => {
431 let scale = *scale;
432 let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
433 let divisor = 10_f64.powi(scale as i32);
434 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64 / divisor)).collect();
435 Ok((
436 Field::new(name, DataType::Float64, true),
437 Arc::new(result) as ArrayRef,
438 ))
439 },
440 DataType::Date64 => {
441 let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
442 let result: Date32Array = arr
443 .iter()
444 .map(|v| v.map(|v| (v / 86_400_000) as i32))
445 .collect();
446 Ok((
447 Field::new(name, DataType::Date32, true),
448 Arc::new(result) as ArrayRef,
449 ))
450 },
451 DataType::Timestamp(unit, _) => {
452 let casted = timestamp_to_millis(array, unit);
453 Ok((
454 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
455 casted,
456 ))
457 },
458 DataType::Time32(TimeUnit::Second) => {
459 let arr = array.as_any().downcast_ref::<Time32SecondArray>().unwrap();
460 let result: TimestampMillisecondArray =
461 arr.iter().map(|v| v.map(|v| v as i64 * 1_000)).collect();
462 Ok((
463 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
464 Arc::new(result) as ArrayRef,
465 ))
466 },
467 DataType::Time32(TimeUnit::Millisecond) => {
468 let arr = array
469 .as_any()
470 .downcast_ref::<Time32MillisecondArray>()
471 .unwrap();
472 let result: TimestampMillisecondArray =
473 arr.iter().map(|v| v.map(|v| v as i64)).collect();
474 Ok((
475 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
476 Arc::new(result) as ArrayRef,
477 ))
478 },
479 DataType::Time64(TimeUnit::Microsecond) => {
480 let arr = array
481 .as_any()
482 .downcast_ref::<Time64MicrosecondArray>()
483 .unwrap();
484 let result: TimestampMillisecondArray =
485 arr.iter().map(|v| v.map(|v| v / 1_000)).collect();
486 Ok((
487 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
488 Arc::new(result) as ArrayRef,
489 ))
490 },
491 DataType::Time64(TimeUnit::Nanosecond) => {
492 let arr = array
493 .as_any()
494 .downcast_ref::<Time64NanosecondArray>()
495 .unwrap();
496 let result: TimestampMillisecondArray =
497 arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect();
498 Ok((
499 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
500 Arc::new(result) as ArrayRef,
501 ))
502 },
503 DataType::LargeUtf8 => {
504 let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
505 let result: StringArray = arr.iter().map(|v| v.map(|v| v.to_string())).collect();
506 Ok((
507 Field::new(name, DataType::Utf8, true),
508 Arc::new(result) as ArrayRef,
509 ))
510 },
511 dt => {
512 tracing::warn!(
513 "Coercing unknown Arrow type {} to Utf8 for column '{}'",
514 dt,
515 name
516 );
517 let num_rows = array.len();
518 let mut builder = StringBuilder::new();
519 for i in 0..num_rows {
520 if array.is_null(i) {
521 builder.append_null();
522 } else {
523 let scalar_arr = array.slice(i, 1);
524 builder.append_value(format!("{:?}", scalar_arr));
525 }
526 }
527 Ok((
528 Field::new(name, DataType::Utf8, true),
529 Arc::new(builder.finish()) as ArrayRef,
530 ))
531 },
532 }
533}
534
535impl VirtualDataSlice {
536 pub fn new(config: ViewConfig) -> Self {
537 VirtualDataSlice {
538 config,
539 builders: IndexMap::default(),
540 row_path: None,
541 frozen: None,
542 }
543 }
544
545 pub fn from_arrow_ipc(&mut self, ipc: &[u8]) -> Result<(), Box<dyn Error>> {
558 let cursor = std::io::Cursor::new(ipc);
559 let batch = if &ipc[0..6] == "ARROW1".as_bytes() {
560 FileReader::try_new(cursor, None)?
561 .next()
562 .ok_or("Arrow IPC stream contained no record batches")??
563 } else {
564 StreamReader::try_new(cursor, None)?
565 .next()
566 .ok_or("Arrow IPC stream contained no record batches")??
567 };
568
569 let has_group_by = !self.config.group_by.is_empty();
570 let has_split_by = !self.config.split_by.is_empty();
571 let is_total = self.config.group_rollup_mode == GroupRollupMode::Total;
572
573 if !has_group_by && !has_split_by && !is_total {
574 self.frozen = Some(batch);
575 return Ok(());
576 }
577
578 let num_rows = batch.num_rows();
579 let schema = batch.schema();
580
581 if has_group_by {
583 let group_by_len = self.config.group_by.len();
584 let is_flat = self.config.group_rollup_mode == GroupRollupMode::Flat;
585 let grouping_ids = if is_flat {
586 None
587 } else {
588 let grouping_id_idx = schema
589 .index_of("__GROUPING_ID__")
590 .map_err(|_| "Missing __GROUPING_ID__ column")?;
591 Some(cast_to_int64(batch.column(grouping_id_idx))?)
592 };
593
594 let mut row_paths: Vec<Vec<Scalar>> = (0..num_rows).map(|_| Vec::new()).collect();
595 for gidx in 0..group_by_len {
596 let col_name = format!("__ROW_PATH_{}__", gidx);
597 let col_idx = schema
598 .index_of(&col_name)
599 .map_err(|_| format!("Missing {} column", col_name))?;
600
601 let col = batch.column(col_idx);
602
603 if is_flat {
605 #[allow(clippy::needless_range_loop)]
609 for row_idx in 0..num_rows {
610 row_paths[row_idx].push(extract_scalar(col, row_idx));
611 }
612 } else {
613 let gids = grouping_ids.as_ref().unwrap();
614 let max_grouping_id = 2_i64.pow(group_by_len as u32 - gidx as u32) - 1;
615 for row_idx in 0..num_rows {
616 if gids[row_idx] < max_grouping_id {
617 row_paths[row_idx].push(extract_scalar(col, row_idx));
618 }
619 }
620 }
621 }
622
623 self.row_path = Some(row_paths);
624 }
625
626 let mut new_fields = Vec::new();
629 let mut new_arrays: Vec<ArrayRef> = Vec::new();
630 for (col_idx, field) in schema.fields().iter().enumerate() {
631 let name = field.name();
632 if name == "__GROUPING_ID__" || name.starts_with("__ROW_PATH_") {
633 continue;
634 }
635
636 let new_name = if has_split_by && !name.starts_with("__") {
637 name.replace('_', "|")
638 } else {
639 name.clone()
640 };
641
642 let (coerced_field, coerced_array) =
643 coerce_column(&new_name, field, batch.column(col_idx))?;
644 new_fields.push(coerced_field);
645 new_arrays.push(coerced_array);
646 }
647
648 let new_schema = Arc::new(Schema::new(new_fields));
649 self.frozen = if new_arrays.is_empty() {
650 Some(RecordBatch::new_empty(new_schema))
651 } else {
652 Some(RecordBatch::try_new(new_schema, new_arrays)?)
653 };
654 Ok(())
655 }
656
657 pub(crate) fn freeze(&mut self) -> &RecordBatch {
660 if self.frozen.is_none() {
661 let mut fields = Vec::new();
662 let mut arrays: Vec<ArrayRef> = Vec::new();
663
664 for (name, builder) in &mut self.builders {
665 let (field, array): (Field, ArrayRef) = match builder {
666 ColumnBuilder::Boolean(b) => (
667 Field::new(name, DataType::Boolean, true),
668 Arc::new(b.finish()),
669 ),
670 ColumnBuilder::String(b) => {
671 (Field::new(name, DataType::Utf8, true), Arc::new(b.finish()))
672 },
673 ColumnBuilder::Float(b) => (
674 Field::new(name, DataType::Float64, true),
675 Arc::new(b.finish()),
676 ),
677 ColumnBuilder::Integer(b) => (
678 Field::new(name, DataType::Int32, true),
679 Arc::new(b.finish()),
680 ),
681 ColumnBuilder::Datetime(b) => (
682 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
683 Arc::new(b.finish()),
684 ),
685 };
686 fields.push(field);
687 arrays.push(array);
688 }
689
690 let schema = Arc::new(Schema::new(fields));
691 self.frozen = Some(
692 RecordBatch::try_new(schema, arrays)
693 .expect("RecordBatch construction should not fail for well-formed builders"),
694 );
695 }
696 self.frozen.as_ref().unwrap()
697 }
698
699 pub(crate) fn render_to_arrow_ipc(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
701 let batch = self.freeze().clone();
702 let schema = batch.schema();
703 let mut buf = Vec::new();
704 {
705 let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
706 writer.write(&batch)?;
707 writer.finish()?;
708 }
709 Ok(buf)
710 }
711
712 pub(crate) fn render_to_rows(&mut self) -> Vec<IndexMap<String, VirtualDataCell>> {
715 let batch = self.freeze().clone();
716 let num_rows = batch.num_rows();
717 let schema = batch.schema();
718
719 (0..num_rows)
720 .map(|row_idx| {
721 let mut row = IndexMap::new();
722
723 if let Some(ref rp) = self.row_path
725 && row_idx < rp.len()
726 {
727 row.insert(
728 "__ROW_PATH__".to_string(),
729 VirtualDataCell::RowPath(rp[row_idx].clone()),
730 );
731 }
732
733 for (col_idx, field) in schema.fields().iter().enumerate() {
735 let col = batch.column(col_idx);
736 let cell = if col.is_null(row_idx) {
737 match field.data_type() {
738 DataType::Boolean => VirtualDataCell::Boolean(None),
739 DataType::Utf8 => VirtualDataCell::String(None),
740 DataType::Float64 => VirtualDataCell::Float(None),
741 DataType::Int32 => VirtualDataCell::Integer(None),
742 DataType::Timestamp(TimeUnit::Millisecond, _) => {
743 VirtualDataCell::Datetime(None)
744 },
745 _ => continue,
746 }
747 } else {
748 match field.data_type() {
749 DataType::Boolean => {
750 let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
751 VirtualDataCell::Boolean(Some(arr.value(row_idx)))
752 },
753 DataType::Utf8 => {
754 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
755 VirtualDataCell::String(Some(arr.value(row_idx).to_string()))
756 },
757 DataType::Float64 => {
758 let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
759 VirtualDataCell::Float(Some(arr.value(row_idx)))
760 },
761 DataType::Int32 => {
762 let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
763 VirtualDataCell::Integer(Some(arr.value(row_idx)))
764 },
765 DataType::Timestamp(TimeUnit::Millisecond, _) => {
766 let arr = col
767 .as_any()
768 .downcast_ref::<TimestampMillisecondArray>()
769 .unwrap();
770 VirtualDataCell::Datetime(Some(arr.value(row_idx)))
771 },
772 DataType::Date32 => {
773 let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
774 VirtualDataCell::Datetime(Some(
775 arr.value(row_idx) as i64 * 86_400_000,
776 ))
777 },
778 x => {
779 tracing::error!("Unknown Arrow IPC type {}", x);
780 continue;
781 },
782 }
783 };
784 row.insert(field.name().clone(), cell);
785 }
786
787 row
788 })
789 .collect()
790 }
791
792 pub fn render_to_columns_json(&mut self) -> Result<String, Box<dyn Error>> {
794 let batch = self.freeze().clone();
795 let schema = batch.schema();
796 let mut map = serde_json::Map::new();
797
798 if let Some(ref rp) = self.row_path {
800 map.insert("__ROW_PATH__".to_string(), serde_json::to_value(rp)?);
801 }
802
803 for (col_idx, field) in schema.fields().iter().enumerate() {
804 let col = batch.column(col_idx);
805 let num_rows = col.len();
806 let values: serde_json::Value = match field.data_type() {
807 DataType::Boolean => {
808 let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
809 serde_json::to_value(
810 (0..num_rows)
811 .map(|i| {
812 if arr.is_null(i) {
813 None
814 } else {
815 Some(arr.value(i))
816 }
817 })
818 .collect::<Vec<_>>(),
819 )?
820 },
821 DataType::Utf8 => {
822 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
823 serde_json::to_value(
824 (0..num_rows)
825 .map(|i| {
826 if arr.is_null(i) {
827 None
828 } else {
829 Some(arr.value(i))
830 }
831 })
832 .collect::<Vec<_>>(),
833 )?
834 },
835 DataType::Float64 => {
836 let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
837 serde_json::to_value(
838 (0..num_rows)
839 .map(|i| {
840 if arr.is_null(i) {
841 None
842 } else {
843 Some(arr.value(i))
844 }
845 })
846 .collect::<Vec<_>>(),
847 )?
848 },
849 DataType::Int32 => {
850 let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
851 serde_json::to_value(
852 (0..num_rows)
853 .map(|i| {
854 if arr.is_null(i) {
855 None
856 } else {
857 Some(arr.value(i))
858 }
859 })
860 .collect::<Vec<_>>(),
861 )?
862 },
863 DataType::Timestamp(TimeUnit::Millisecond, _) => {
864 let arr = col
865 .as_any()
866 .downcast_ref::<TimestampMillisecondArray>()
867 .unwrap();
868 serde_json::to_value(
869 (0..num_rows)
870 .map(|i| {
871 if arr.is_null(i) {
872 None
873 } else {
874 Some(arr.value(i))
875 }
876 })
877 .collect::<Vec<_>>(),
878 )?
879 },
880 DataType::Date32 => {
881 let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
882 serde_json::to_value(
883 (0..num_rows)
884 .map(|i| {
885 if arr.is_null(i) {
886 None
887 } else {
888 Some(arr.value(i) as i64 * 86_400_000)
889 }
890 })
891 .collect::<Vec<_>>(),
892 )?
893 },
894 x => {
895 tracing::error!("Unknown Arrow IPC type {}", x);
896 continue;
897 },
898 };
899 map.insert(field.name().clone(), values);
900 }
901
902 Ok(serde_json::to_string(&map)?)
903 }
904
905 pub fn set_col<T: SetVirtualDataColumn>(
913 &mut self,
914 name: &str,
915 grouping_id: Option<usize>,
916 index: usize,
917 value: T,
918 ) -> Result<(), Box<dyn Error>> {
919 if name == "__GROUPING_ID__" {
920 return Ok(());
921 }
922
923 if name.starts_with("__ROW_PATH_") {
924 let group_by_index: u32 = name[11..name.len() - 2].parse()?;
925 let max_grouping_id =
926 2_i32.pow((self.config.group_by.len() as u32) - group_by_index) - 1;
927
928 if grouping_id.map(|x| x as i32).unwrap_or(i32::MAX) < max_grouping_id {
929 let col = self.row_path.get_or_insert_with(Vec::new);
930 if let Some(row) = col.get_mut(index) {
931 let scalar = value.to_scalar();
932 row.push(scalar);
933 } else {
934 while col.len() < index {
935 col.push(vec![])
936 }
937
938 let scalar = value.to_scalar();
939 col.push(vec![scalar]);
940 }
941 }
942
943 Ok(())
944 } else {
945 let col_name = if !self.config.split_by.is_empty() && !name.starts_with("__") {
946 name.replace('_', "|")
947 } else {
948 name.to_owned()
949 };
950
951 if !self.builders.contains_key(&col_name) {
952 self.builders.insert(col_name.clone(), T::new_builder());
953 }
954
955 let col = self
956 .builders
957 .get_mut(&col_name)
958 .ok_or_else(|| format!("Column '{}' not found after insertion", col_name))?;
959
960 Ok(value.write_to(col)?)
961 }
962 }
963}