use std::error::Error;
use std::sync::Arc;

use arrow_array::builder::{
    BooleanBuilder, Float64Builder, Int32Builder, StringDictionaryBuilder,
    TimestampMillisecondBuilder,
};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{
    Array, ArrayAccessor, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray,
    RecordBatch, RecordBatchOptions, StringArray, Time32MillisecondArray, Time32SecondArray,
    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array,
    UInt16Array, UInt32Array, UInt64Array,
};
use arrow_ipc::reader::{FileReader, StreamReader};
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use indexmap::IndexMap;
use serde::Serialize;

use crate::config::{GroupRollupMode, Scalar, ViewConfig};
37
/// A growable Arrow column, one variant per column type a `VirtualDataSlice`
/// can build with `set_col`.
///
/// `VirtualDataSlice::freeze` finishes the variants into arrays of type
/// `Boolean`, `Dictionary<Int32, Utf8>`, `Float64`, `Int32` and
/// `Timestamp(Millisecond, None)` respectively.
pub enum ColumnBuilder {
    /// Boolean values.
    Boolean(BooleanBuilder),
    /// Strings, dictionary-encoded with `Int32` keys.
    String(StringDictionaryBuilder<Int32Type>),
    /// 64-bit floating-point values.
    Float(Float64Builder),
    /// 32-bit signed integers.
    Integer(Int32Builder),
    /// Millisecond-precision timestamps.
    Datetime(TimestampMillisecondBuilder),
}
47
48fn dict_data_type() -> DataType {
49 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
50}
51
/// One cell of a row produced by `VirtualDataSlice::render_to_rows`.
///
/// Serialized `untagged`: each variant serializes as its payload alone, so a
/// `None` payload becomes a null (JSON `null`) and `RowPath` becomes a
/// sequence of scalars.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum VirtualDataCell {
    /// A boolean value; `None` for null.
    Boolean(Option<bool>),
    /// A string value; `None` for null.
    String(Option<String>),
    /// A floating-point value; `None` for null. The row renderer also uses it
    /// for `Int64` and `Time64(Microsecond)` columns.
    Float(Option<f64>),
    /// A 32-bit integer value; `None` for null.
    Integer(Option<i32>),
    /// A timestamp in milliseconds (`Date32` days are scaled to milliseconds);
    /// `None` for null.
    Datetime(Option<i64>),
    /// The row's group-by path, emitted as the `__ROW_PATH__` cell.
    RowPath(Vec<Scalar>),
}
66
/// How the row renderers (`render_to_rows`, `render_to_columns_json`) emit the
/// group-by row path.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RowPathStyle {
    /// Emit the slice's stored row paths as a single `__ROW_PATH__` entry and
    /// omit every column whose name starts with `__ROW_PATH_`.
    Sidecar,

    /// Emit no `__ROW_PATH__` entry; any `__ROW_PATH_*` columns present in the
    /// frozen batch are rendered like ordinary columns.
    PerLevel,
}
81
/// A nullable value type that `VirtualDataSlice::set_col` can append to a
/// [`ColumnBuilder`] or record as a row-path [`Scalar`].
pub trait SetVirtualDataColumn {
    /// Appends `self` to `col`, appending a null for `None`.
    ///
    /// # Errors
    /// Returns `Err` when `col` is not the builder variant for this type (the
    /// implementations in this module return `"Bad type"`).
    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str>;

    /// Creates an empty builder of the variant that `write_to` accepts.
    fn new_builder() -> ColumnBuilder;

    /// Converts `self` into a row-path scalar; `None` becomes `Scalar::Null`.
    fn to_scalar(self) -> Scalar;
}
100
101impl SetVirtualDataColumn for Option<String> {
102 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
103 if let ColumnBuilder::String(builder) = col {
104 match self {
105 Some(s) => builder.append_value(&s),
106 None => builder.append_null(),
107 }
108 Ok(())
109 } else {
110 Err("Bad type")
111 }
112 }
113
114 fn new_builder() -> ColumnBuilder {
115 ColumnBuilder::String(StringDictionaryBuilder::new())
116 }
117
118 fn to_scalar(self) -> Scalar {
119 if let Some(x) = self {
120 Scalar::String(x)
121 } else {
122 Scalar::Null
123 }
124 }
125}
126
127impl SetVirtualDataColumn for Option<f64> {
128 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
129 if let ColumnBuilder::Float(builder) = col {
130 match self {
131 Some(v) => builder.append_value(v),
132 None => builder.append_null(),
133 }
134 Ok(())
135 } else {
136 Err("Bad type")
137 }
138 }
139
140 fn new_builder() -> ColumnBuilder {
141 ColumnBuilder::Float(Float64Builder::new())
142 }
143
144 fn to_scalar(self) -> Scalar {
145 if let Some(x) = self {
146 Scalar::Float(x)
147 } else {
148 Scalar::Null
149 }
150 }
151}
152
153impl SetVirtualDataColumn for Option<i32> {
154 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
155 if let ColumnBuilder::Integer(builder) = col {
156 match self {
157 Some(v) => builder.append_value(v),
158 None => builder.append_null(),
159 }
160 Ok(())
161 } else {
162 Err("Bad type")
163 }
164 }
165
166 fn new_builder() -> ColumnBuilder {
167 ColumnBuilder::Integer(Int32Builder::new())
168 }
169
170 fn to_scalar(self) -> Scalar {
171 if let Some(x) = self {
172 Scalar::Float(x as f64)
173 } else {
174 Scalar::Null
175 }
176 }
177}
178
179impl SetVirtualDataColumn for Option<i64> {
180 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
181 if let ColumnBuilder::Datetime(builder) = col {
182 match self {
183 Some(v) => builder.append_value(v),
184 None => builder.append_null(),
185 }
186 Ok(())
187 } else {
188 Err("Bad type")
189 }
190 }
191
192 fn new_builder() -> ColumnBuilder {
193 ColumnBuilder::Datetime(TimestampMillisecondBuilder::new())
194 }
195
196 fn to_scalar(self) -> Scalar {
197 if let Some(x) = self {
198 Scalar::Float(x as f64)
199 } else {
200 Scalar::Null
201 }
202 }
203}
204
205impl SetVirtualDataColumn for Option<bool> {
206 fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
207 if let ColumnBuilder::Boolean(builder) = col {
208 match self {
209 Some(v) => builder.append_value(v),
210 None => builder.append_null(),
211 }
212 Ok(())
213 } else {
214 Err("Bad type")
215 }
216 }
217
218 fn new_builder() -> ColumnBuilder {
219 ColumnBuilder::Boolean(BooleanBuilder::new())
220 }
221
222 fn to_scalar(self) -> Scalar {
223 if let Some(x) = self {
224 Scalar::Bool(x)
225 } else {
226 Scalar::Null
227 }
228 }
229}
230
/// A slice of view data. It is either built column by column with `set_col`
/// or loaded from Arrow IPC with `from_arrow_ipc`. It is then rendered to Arrow
/// IPC, row maps, or column-oriented JSON.
#[derive(Debug)]
pub struct VirtualDataSlice {
    /// View configuration; its group-by, split-by and rollup mode drive
    /// row-path extraction and column naming.
    config: ViewConfig,
    /// Column builders keyed by output column name, in insertion order.
    builders: IndexMap<String, ColumnBuilder>,
    /// Group-by path of each row, indexed by row; `None` until one is recorded.
    row_path: Option<Vec<Vec<Scalar>>>,
    /// Cached batch, built by `freeze` or loaded by `from_arrow_ipc`.
    frozen: Option<RecordBatch>,
}
243
244impl std::fmt::Debug for ColumnBuilder {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 ColumnBuilder::Boolean(_) => write!(f, "ColumnBuilder::Boolean(..)"),
248 ColumnBuilder::String(_) => write!(f, "ColumnBuilder::String(..)"),
249 ColumnBuilder::Float(_) => write!(f, "ColumnBuilder::Float(..)"),
250 ColumnBuilder::Integer(_) => write!(f, "ColumnBuilder::Integer(..)"),
251 ColumnBuilder::Datetime(_) => write!(f, "ColumnBuilder::Datetime(..)"),
252 }
253 }
254}
255
256fn cast_to_int64(array: &ArrayRef) -> Result<Vec<i64>, Box<dyn Error>> {
258 let num_rows = array.len();
259 let mut result = Vec::with_capacity(num_rows);
260 match array.data_type() {
261 DataType::Int32 => {
262 let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
263 for i in 0..num_rows {
264 result.push(if arr.is_null(i) {
265 0
266 } else {
267 arr.value(i) as i64
268 });
269 }
270 },
271 DataType::Int64 => {
272 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
273 for i in 0..num_rows {
274 result.push(if arr.is_null(i) { 0 } else { arr.value(i) });
275 }
276 },
277 DataType::Float64 => {
278 let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
279 for i in 0..num_rows {
280 result.push(if arr.is_null(i) {
281 0
282 } else {
283 arr.value(i) as i64
284 });
285 }
286 },
287 dt => return Err(format!("Cannot cast {} to Int64", dt).into()),
288 }
289 Ok(result)
290}
291
292fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar {
294 if array.is_null(row_idx) {
295 return Scalar::Null;
296 }
297 match array.data_type() {
298 DataType::Utf8 => {
299 let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
300 Scalar::String(arr.value(row_idx).to_string())
301 },
302 DataType::Dictionary(..) => {
303 let dict = array.as_dictionary::<Int32Type>();
304 let values = dict.downcast_dict::<StringArray>().unwrap();
305 Scalar::String(values.value(row_idx).to_string())
306 },
307 DataType::Float64 => {
308 let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
309 Scalar::Float(arr.value(row_idx))
310 },
311 DataType::Int32 => {
312 let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
313 Scalar::Float(arr.value(row_idx) as f64)
314 },
315 DataType::Int64 => {
316 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
317 Scalar::Float(arr.value(row_idx) as f64)
318 },
319 DataType::Boolean => {
320 let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
321 Scalar::Bool(arr.value(row_idx))
322 },
323 DataType::Timestamp(TimeUnit::Millisecond, _) => {
324 let arr = array
325 .as_any()
326 .downcast_ref::<TimestampMillisecondArray>()
327 .unwrap();
328 Scalar::Float(arr.value(row_idx) as f64)
329 },
330 DataType::Date32 => {
331 let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
332 Scalar::Float(arr.value(row_idx) as f64 * 86_400_000.0)
333 },
334 _ => {
335 let scalar_arr = array.slice(row_idx, 1);
336 Scalar::String(format!("{:?}", scalar_arr))
337 },
338 }
339}
340
341fn timestamp_to_millis(array: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
345 let millis: TimestampMillisecondArray = match unit {
346 TimeUnit::Second => {
347 let arr = array
348 .as_any()
349 .downcast_ref::<TimestampSecondArray>()
350 .unwrap();
351 arr.iter().map(|v| v.map(|v| v * 1_000)).collect()
352 },
353 TimeUnit::Microsecond => {
354 let arr = array
355 .as_any()
356 .downcast_ref::<TimestampMicrosecondArray>()
357 .unwrap();
358 arr.iter().map(|v| v.map(|v| v / 1_000)).collect()
359 },
360 TimeUnit::Nanosecond => {
361 let arr = array
362 .as_any()
363 .downcast_ref::<TimestampNanosecondArray>()
364 .unwrap();
365 arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect()
366 },
367 TimeUnit::Millisecond => {
368 return array.clone();
369 },
370 };
371 Arc::new(millis) as ArrayRef
372}
373
374fn coerce_column(
375 name: &str,
376 field: &Field,
377 array: &ArrayRef,
378) -> Result<(Field, ArrayRef), Box<dyn Error>> {
379 match field.data_type() {
380 DataType::Boolean | DataType::Float64 | DataType::Int32 | DataType::Date32 => Ok((
381 Field::new(name, field.data_type().clone(), true),
382 array.clone(),
383 )),
384 DataType::Dictionary(..) => Ok((Field::new(name, dict_data_type(), true), array.clone())),
385 DataType::Utf8 => {
386 let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
387 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
388 for i in 0..arr.len() {
389 if arr.is_null(i) {
390 builder.append_null();
391 } else {
392 builder.append_value(arr.value(i));
393 }
394 }
395 Ok((
396 Field::new(name, dict_data_type(), true),
397 Arc::new(builder.finish()) as ArrayRef,
398 ))
399 },
400 DataType::Timestamp(TimeUnit::Millisecond, _) => Ok((
401 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
402 array.clone(),
403 )),
404 DataType::Int8 => {
405 let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
406 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
407 Ok((
408 Field::new(name, DataType::Int32, true),
409 Arc::new(result) as ArrayRef,
410 ))
411 },
412 DataType::Int16 => {
413 let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
414 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
415 Ok((
416 Field::new(name, DataType::Int32, true),
417 Arc::new(result) as ArrayRef,
418 ))
419 },
420 DataType::UInt8 => {
421 let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
422 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
423 Ok((
424 Field::new(name, DataType::Int32, true),
425 Arc::new(result) as ArrayRef,
426 ))
427 },
428 DataType::UInt16 => {
429 let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
430 let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
431 Ok((
432 Field::new(name, DataType::Int32, true),
433 Arc::new(result) as ArrayRef,
434 ))
435 },
436 DataType::UInt32 => {
437 let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
438 let result: Int64Array = arr.iter().map(|v| v.map(|v| v as i64)).collect();
439 let result: Float64Array = result.iter().map(|v| v.map(|v| v as f64)).collect();
440 Ok((
441 Field::new(name, DataType::Float64, true),
442 Arc::new(result) as ArrayRef,
443 ))
444 },
445 DataType::Int64 => {
446 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
447 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
448 Ok((
449 Field::new(name, DataType::Float64, true),
450 Arc::new(result) as ArrayRef,
451 ))
452 },
453 DataType::UInt64 => {
454 let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
455 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
456 Ok((
457 Field::new(name, DataType::Float64, true),
458 Arc::new(result) as ArrayRef,
459 ))
460 },
461 DataType::Float32 => {
462 let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
463 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
464 Ok((
465 Field::new(name, DataType::Float64, true),
466 Arc::new(result) as ArrayRef,
467 ))
468 },
469 DataType::Decimal128(_, scale) => {
470 let scale = *scale;
471 let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
472 let divisor = 10_f64.powi(scale as i32);
473 let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64 / divisor)).collect();
474 Ok((
475 Field::new(name, DataType::Float64, true),
476 Arc::new(result) as ArrayRef,
477 ))
478 },
479 DataType::Date64 => {
480 let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
481 let result: Date32Array = arr
482 .iter()
483 .map(|v| v.map(|v| (v / 86_400_000) as i32))
484 .collect();
485 Ok((
486 Field::new(name, DataType::Date32, true),
487 Arc::new(result) as ArrayRef,
488 ))
489 },
490 DataType::Timestamp(unit, _) => {
491 let casted = timestamp_to_millis(array, unit);
492 Ok((
493 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
494 casted,
495 ))
496 },
497 DataType::Time32(TimeUnit::Second) => {
498 let arr = array.as_any().downcast_ref::<Time32SecondArray>().unwrap();
499 let result: TimestampMillisecondArray =
500 arr.iter().map(|v| v.map(|v| v as i64 * 1_000)).collect();
501 Ok((
502 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
503 Arc::new(result) as ArrayRef,
504 ))
505 },
506 DataType::Time32(TimeUnit::Millisecond) => {
507 let arr = array
508 .as_any()
509 .downcast_ref::<Time32MillisecondArray>()
510 .unwrap();
511 let result: TimestampMillisecondArray =
512 arr.iter().map(|v| v.map(|v| v as i64)).collect();
513 Ok((
514 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
515 Arc::new(result) as ArrayRef,
516 ))
517 },
518 DataType::Time64(TimeUnit::Microsecond) => {
519 let arr = array
520 .as_any()
521 .downcast_ref::<Time64MicrosecondArray>()
522 .unwrap();
523 let result: TimestampMillisecondArray =
524 arr.iter().map(|v| v.map(|v| v / 1_000)).collect();
525 Ok((
526 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
527 Arc::new(result) as ArrayRef,
528 ))
529 },
530 DataType::Time64(TimeUnit::Nanosecond) => {
531 let arr = array
532 .as_any()
533 .downcast_ref::<Time64NanosecondArray>()
534 .unwrap();
535 let result: TimestampMillisecondArray =
536 arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect();
537 Ok((
538 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
539 Arc::new(result) as ArrayRef,
540 ))
541 },
542 DataType::LargeUtf8 => {
543 let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
544 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
545 for i in 0..arr.len() {
546 if arr.is_null(i) {
547 builder.append_null();
548 } else {
549 builder.append_value(arr.value(i));
550 }
551 }
552 Ok((
553 Field::new(name, dict_data_type(), true),
554 Arc::new(builder.finish()) as ArrayRef,
555 ))
556 },
557 dt => {
558 tracing::warn!(
559 "Coercing unknown Arrow type {} to Dictionary for column '{}'",
560 dt,
561 name
562 );
563 let num_rows = array.len();
564 let mut builder = StringDictionaryBuilder::<Int32Type>::new();
565 for i in 0..num_rows {
566 if array.is_null(i) {
567 builder.append_null();
568 } else {
569 let scalar_arr = array.slice(i, 1);
570 builder.append_value(format!("{:?}", scalar_arr));
571 }
572 }
573 Ok((
574 Field::new(name, dict_data_type(), true),
575 Arc::new(builder.finish()) as ArrayRef,
576 ))
577 },
578 }
579}
580
581impl VirtualDataSlice {
582 pub fn new(config: ViewConfig) -> Self {
583 VirtualDataSlice {
584 config,
585 builders: IndexMap::default(),
586 row_path: None,
587 frozen: None,
588 }
589 }
590
591 pub fn from_arrow_ipc(&mut self, ipc: &[u8]) -> Result<(), Box<dyn Error>> {
610 let cursor = std::io::Cursor::new(ipc);
611 let batches: Vec<RecordBatch> = if &ipc[0..6] == "ARROW1".as_bytes() {
612 FileReader::try_new(cursor, None)?.collect::<Result<Vec<_>, _>>()?
613 } else {
614 StreamReader::try_new(cursor, None)?.collect::<Result<Vec<_>, _>>()?
615 };
616
617 let batch = match batches.len() {
618 0 => return Err("Arrow IPC stream contained no record batches".into()),
619 1 => batches.into_iter().next().unwrap(),
620 _ => arrow_select::concat::concat_batches(&batches[0].schema(), &batches)?,
621 };
622
623 let has_group_by = !self.config.group_by.is_empty();
624 let has_split_by = !self.config.split_by.is_empty();
625 let is_total = self.config.group_rollup_mode == GroupRollupMode::Total;
626
627 if !has_group_by && !has_split_by && !is_total {
628 self.frozen = Some(batch);
629 return Ok(());
630 }
631
632 let num_rows = batch.num_rows();
633 let schema = batch.schema();
634
635 if has_group_by {
637 let group_by_len = self.config.group_by.len();
638 let is_flat = self.config.group_rollup_mode == GroupRollupMode::Flat;
639 let grouping_ids = if is_flat {
640 None
641 } else {
642 let grouping_id_idx = schema
643 .index_of("__GROUPING_ID__")
644 .map_err(|_| "Missing __GROUPING_ID__ column")?;
645 Some(cast_to_int64(batch.column(grouping_id_idx))?)
646 };
647
648 let mut row_paths: Vec<Vec<Scalar>> = (0..num_rows).map(|_| Vec::new()).collect();
649 for gidx in 0..group_by_len {
650 let col_name = format!("__ROW_PATH_{}__", gidx);
651 let col_idx = schema
652 .index_of(&col_name)
653 .map_err(|_| format!("Missing {} column", col_name))?;
654
655 let col = batch.column(col_idx);
656
657 if is_flat {
659 #[allow(clippy::needless_range_loop)]
663 for row_idx in 0..num_rows {
664 row_paths[row_idx].push(extract_scalar(col, row_idx));
665 }
666 } else {
667 let gids = grouping_ids.as_ref().unwrap();
668 let max_grouping_id = 2_i64.pow(group_by_len as u32 - gidx as u32) - 1;
669 for row_idx in 0..num_rows {
670 if gids[row_idx] < max_grouping_id {
671 row_paths[row_idx].push(extract_scalar(col, row_idx));
672 }
673 }
674 }
675 }
676
677 self.row_path = Some(row_paths);
678 }
679
680 let mut new_fields = Vec::new();
683 let mut new_arrays: Vec<ArrayRef> = Vec::new();
684 for (col_idx, field) in schema.fields().iter().enumerate() {
685 let name = field.name();
686 if name == "__GROUPING_ID__" {
702 continue;
703 }
704
705 let new_name = if has_split_by && !name.starts_with("__") {
706 name.replace('_', "|")
707 } else {
708 name.clone()
709 };
710
711 let (coerced_field, coerced_array) =
712 coerce_column(&new_name, field, batch.column(col_idx))?;
713 new_fields.push(coerced_field);
714 new_arrays.push(coerced_array);
715 }
716
717 let new_schema = Arc::new(Schema::new(new_fields));
718 self.frozen = if new_arrays.is_empty() {
719 Some(RecordBatch::new_empty(new_schema))
720 } else {
721 Some(RecordBatch::try_new(new_schema, new_arrays)?)
722 };
723 Ok(())
724 }
725
726 pub(crate) fn freeze(&mut self) -> &RecordBatch {
729 if self.frozen.is_none() {
730 let mut fields = Vec::new();
731 let mut arrays: Vec<ArrayRef> = Vec::new();
732
733 for (name, builder) in &mut self.builders {
734 let (field, array): (Field, ArrayRef) = match builder {
735 ColumnBuilder::Boolean(b) => (
736 Field::new(name, DataType::Boolean, true),
737 Arc::new(b.finish()),
738 ),
739 ColumnBuilder::String(b) => (
740 Field::new(name, dict_data_type(), true),
741 Arc::new(b.finish()),
742 ),
743 ColumnBuilder::Float(b) => (
744 Field::new(name, DataType::Float64, true),
745 Arc::new(b.finish()),
746 ),
747 ColumnBuilder::Integer(b) => (
748 Field::new(name, DataType::Int32, true),
749 Arc::new(b.finish()),
750 ),
751 ColumnBuilder::Datetime(b) => (
752 Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
753 Arc::new(b.finish()),
754 ),
755 };
756 fields.push(field);
757 arrays.push(array);
758 }
759
760 let schema = Arc::new(Schema::new(fields));
761 self.frozen = Some(
762 RecordBatch::try_new(schema, arrays)
763 .expect("RecordBatch construction should not fail for well-formed builders"),
764 );
765 }
766 self.frozen.as_ref().unwrap()
767 }
768
769 pub(crate) fn render_to_arrow_ipc(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
771 let batch = self.freeze().clone();
772 let schema = batch.schema();
773 let mut buf = Vec::new();
774 {
775 let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
776 writer.write(&batch)?;
777 writer.finish()?;
778 }
779 Ok(buf)
780 }
781
782 pub(crate) fn render_to_rows(
790 &mut self,
791 style: RowPathStyle,
792 ) -> Vec<IndexMap<String, VirtualDataCell>> {
793 let batch = self.freeze().clone();
794 let num_rows = batch.num_rows();
795 let schema = batch.schema();
796
797 (0..num_rows)
798 .map(|row_idx| {
799 let mut row = IndexMap::new();
800 if style == RowPathStyle::Sidecar
801 && let Some(ref rp) = self.row_path
802 && row_idx < rp.len()
803 {
804 row.insert(
805 "__ROW_PATH__".to_string(),
806 VirtualDataCell::RowPath(rp[row_idx].clone()),
807 );
808 }
809
810 for (col_idx, field) in schema.fields().iter().enumerate() {
811 if style == RowPathStyle::Sidecar && field.name().starts_with("__ROW_PATH_") {
812 continue;
813 }
814
815 let col = batch.column(col_idx);
816 let cell = if col.is_null(row_idx) {
817 match field.data_type() {
818 DataType::Boolean => VirtualDataCell::Boolean(None),
819 DataType::Utf8 | DataType::Dictionary(..) => {
820 VirtualDataCell::String(None)
821 },
822 DataType::Float64 => VirtualDataCell::Float(None),
823 DataType::Int32 => VirtualDataCell::Integer(None),
824 DataType::Timestamp(TimeUnit::Millisecond, _) => {
825 VirtualDataCell::Datetime(None)
826 },
827 _ => continue,
828 }
829 } else {
830 match field.data_type() {
831 DataType::Boolean => {
832 let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
833 VirtualDataCell::Boolean(Some(arr.value(row_idx)))
834 },
835 DataType::Utf8 => {
836 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
837 VirtualDataCell::String(Some(arr.value(row_idx).to_string()))
838 },
839 DataType::Dictionary(..) => {
840 let dict = col.as_dictionary::<Int32Type>();
841 let values = dict.downcast_dict::<StringArray>().unwrap();
842 VirtualDataCell::String(Some(values.value(row_idx).to_string()))
843 },
844 DataType::Float64 => {
845 let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
846 VirtualDataCell::Float(Some(arr.value(row_idx)))
847 },
848 DataType::Int32 => {
849 let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
850 VirtualDataCell::Integer(Some(arr.value(row_idx)))
851 },
852 DataType::Int64 => {
853 let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
855 VirtualDataCell::Float(Some(arr.value(row_idx) as f64))
856 },
857 DataType::Time64(TimeUnit::Microsecond) => {
858 let arr = col
859 .as_any()
860 .downcast_ref::<Time64MicrosecondArray>()
861 .unwrap();
862 VirtualDataCell::Float(Some(arr.value(row_idx) as f64))
863 },
864 DataType::Timestamp(TimeUnit::Microsecond, _) => {
865 let arr = col
866 .as_any()
867 .downcast_ref::<Time64MicrosecondArray>()
868 .unwrap();
869 VirtualDataCell::Datetime(Some(arr.value(row_idx) * 1000))
870 },
871 DataType::Timestamp(TimeUnit::Millisecond, _) => {
872 let arr = col
873 .as_any()
874 .downcast_ref::<TimestampMillisecondArray>()
875 .unwrap();
876 VirtualDataCell::Datetime(Some(arr.value(row_idx)))
877 },
878 DataType::Date32 => {
879 let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
880 VirtualDataCell::Datetime(Some(
881 arr.value(row_idx) as i64 * 86_400_000,
882 ))
883 },
884 x => {
885 tracing::error!("Unknown Arrow IPC type {}", x);
886 continue;
887 },
888 }
889 };
890 row.insert(field.name().clone(), cell);
891 }
892
893 row
894 })
895 .collect()
896 }
897
898 pub fn render_to_columns_json(
906 &mut self,
907 style: RowPathStyle,
908 ) -> Result<String, Box<dyn Error>> {
909 let batch = self.freeze().clone();
910 let schema = batch.schema();
911 let mut map = serde_json::Map::new();
912
913 if style == RowPathStyle::Sidecar
914 && let Some(ref rp) = self.row_path
915 {
916 map.insert("__ROW_PATH__".to_string(), serde_json::to_value(rp)?);
917 }
918
919 for (col_idx, field) in schema.fields().iter().enumerate() {
920 if style == RowPathStyle::Sidecar && field.name().starts_with("__ROW_PATH_") {
921 continue;
922 }
923
924 let col = batch.column(col_idx);
925 let num_rows = col.len();
926 let values: serde_json::Value = match field.data_type() {
927 DataType::Boolean => {
928 let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
929 serde_json::to_value(
930 (0..num_rows)
931 .map(|i| {
932 if arr.is_null(i) {
933 None
934 } else {
935 Some(arr.value(i))
936 }
937 })
938 .collect::<Vec<_>>(),
939 )?
940 },
941 DataType::Utf8 => {
942 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
943 serde_json::to_value(
944 (0..num_rows)
945 .map(|i| {
946 if arr.is_null(i) {
947 None
948 } else {
949 Some(arr.value(i))
950 }
951 })
952 .collect::<Vec<_>>(),
953 )?
954 },
955 DataType::Dictionary(..) => {
956 let dict = col.as_dictionary::<Int32Type>();
957 let values = dict.downcast_dict::<StringArray>().unwrap();
958 serde_json::to_value(
959 (0..num_rows)
960 .map(|i| {
961 if col.is_null(i) {
962 None
963 } else {
964 Some(values.value(i))
965 }
966 })
967 .collect::<Vec<_>>(),
968 )?
969 },
970 DataType::Float64 => {
971 let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
972 serde_json::to_value(
973 (0..num_rows)
974 .map(|i| {
975 if arr.is_null(i) {
976 None
977 } else {
978 Some(arr.value(i))
979 }
980 })
981 .collect::<Vec<_>>(),
982 )?
983 },
984 DataType::Int32 => {
985 let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
986 serde_json::to_value(
987 (0..num_rows)
988 .map(|i| {
989 if arr.is_null(i) {
990 None
991 } else {
992 Some(arr.value(i))
993 }
994 })
995 .collect::<Vec<_>>(),
996 )?
997 },
998 DataType::Int64 => {
999 let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1000 serde_json::to_value(
1001 (0..num_rows)
1002 .map(|i| {
1003 if arr.is_null(i) {
1004 None
1005 } else {
1006 Some(arr.value(i) as f64)
1007 }
1008 })
1009 .collect::<Vec<_>>(),
1010 )?
1011 },
1012 DataType::Timestamp(TimeUnit::Millisecond, _) => {
1013 let arr = col
1014 .as_any()
1015 .downcast_ref::<TimestampMillisecondArray>()
1016 .unwrap();
1017 serde_json::to_value(
1018 (0..num_rows)
1019 .map(|i| {
1020 if arr.is_null(i) {
1021 None
1022 } else {
1023 Some(arr.value(i))
1024 }
1025 })
1026 .collect::<Vec<_>>(),
1027 )?
1028 },
1029 DataType::Time64(TimeUnit::Microsecond) => {
1030 let arr = col
1031 .as_any()
1032 .downcast_ref::<Time64MicrosecondArray>()
1033 .unwrap();
1034 serde_json::to_value(
1035 (0..num_rows)
1036 .map(|i| {
1037 if arr.is_null(i) {
1038 None
1039 } else {
1040 Some(arr.value(i) as f64)
1041 }
1042 })
1043 .collect::<Vec<_>>(),
1044 )?
1045 },
1046 DataType::Date32 => {
1047 let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
1048 serde_json::to_value(
1049 (0..num_rows)
1050 .map(|i| {
1051 if arr.is_null(i) {
1052 None
1053 } else {
1054 Some(arr.value(i) as i64 * 86_400_000)
1055 }
1056 })
1057 .collect::<Vec<_>>(),
1058 )?
1059 },
1060 x => {
1061 tracing::error!("Unknown Arrow IPC type {}", x);
1062 continue;
1063 },
1064 };
1065 map.insert(field.name().clone(), values);
1066 }
1067
1068 Ok(serde_json::to_string(&map)?)
1069 }
1070
1071 pub fn set_col<T: SetVirtualDataColumn>(
1079 &mut self,
1080 name: &str,
1081 grouping_id: Option<usize>,
1082 index: usize,
1083 value: T,
1084 ) -> Result<(), Box<dyn Error>> {
1085 if name == "__GROUPING_ID__" {
1086 return Ok(());
1087 }
1088
1089 if name.starts_with("__ROW_PATH_") {
1090 let group_by_index: u32 = name[11..name.len() - 2].parse()?;
1091 let max_grouping_id =
1092 2_i32.pow((self.config.group_by.len() as u32) - group_by_index) - 1;
1093
1094 if grouping_id.map(|x| x as i32).unwrap_or(i32::MAX) < max_grouping_id {
1095 let col = self.row_path.get_or_insert_with(Vec::new);
1096 if let Some(row) = col.get_mut(index) {
1097 let scalar = value.to_scalar();
1098 row.push(scalar);
1099 } else {
1100 while col.len() < index {
1101 col.push(vec![])
1102 }
1103
1104 let scalar = value.to_scalar();
1105 col.push(vec![scalar]);
1106 }
1107 }
1108
1109 Ok(())
1110 } else {
1111 let col_name = if !self.config.split_by.is_empty() && !name.starts_with("__") {
1112 name.replace('_', "|")
1113 } else {
1114 name.to_owned()
1115 };
1116
1117 if !self.builders.contains_key(&col_name) {
1118 self.builders.insert(col_name.clone(), T::new_builder());
1119 }
1120
1121 let col = self
1122 .builders
1123 .get_mut(&col_name)
1124 .ok_or_else(|| format!("Column '{}' not found after insertion", col_name))?;
1125
1126 Ok(value.write_to(col)?)
1127 }
1128 }
1129}