Skip to main content

perspective_client/virtual_server/
data.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::sync::Arc;
15
16use arrow_array::builder::{
17    BooleanBuilder, Float64Builder, Int32Builder, StringBuilder, TimestampMillisecondBuilder,
18};
19use arrow_array::{
20    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, Float32Array,
21    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray, RecordBatch,
22    StringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
23    Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
24    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
25    UInt64Array,
26};
27use arrow_ipc::reader::{FileReader, StreamReader};
28use arrow_ipc::writer::StreamWriter;
29use arrow_schema::{DataType, Field, Schema, TimeUnit};
30use indexmap::IndexMap;
31use serde::Serialize;
32
33use crate::config::{GroupRollupMode, Scalar, ViewConfig};
34
35/// An Arrow column builder, used during the population phase of
36/// [`VirtualDataSlice`].
37pub enum ColumnBuilder {
38    Boolean(BooleanBuilder),
39    String(StringBuilder),
40    Float(Float64Builder),
41    Integer(Int32Builder),
42    Datetime(TimestampMillisecondBuilder),
43}
44
45/// A single cell value in a row-oriented data representation.
46///
47/// Used when converting [`VirtualDataSlice`] to row format for JSON
48/// serialization.
49#[derive(Debug, Serialize)]
50#[serde(untagged)]
51pub enum VirtualDataCell {
52    Boolean(Option<bool>),
53    String(Option<String>),
54    Float(Option<f64>),
55    Integer(Option<i32>),
56    Datetime(Option<i64>),
57    RowPath(Vec<Scalar>),
58}
59
60/// Trait for types that can be written to a [`ColumnBuilder`] which
61/// enforces sequential construction.
62///
63/// This trait enables type-safe insertion of values into virtual data columns,
64/// ensuring that values are written to columns of the correct type.
65pub trait SetVirtualDataColumn {
66    /// Writes this value (sequentially) to the given column builder.
67    ///
68    /// Returns an error if the column type does not match the value type.
69    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str>;
70
71    /// Creates a new empty column builder of the appropriate type for this
72    /// value.
73    fn new_builder() -> ColumnBuilder;
74
75    /// Converts this value to a [`Scalar`] representation.
76    fn to_scalar(self) -> Scalar;
77}
78
79impl SetVirtualDataColumn for Option<String> {
80    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
81        if let ColumnBuilder::String(builder) = col {
82            match self {
83                Some(s) => builder.append_value(&s),
84                None => builder.append_null(),
85            }
86            Ok(())
87        } else {
88            Err("Bad type")
89        }
90    }
91
92    fn new_builder() -> ColumnBuilder {
93        ColumnBuilder::String(StringBuilder::new())
94    }
95
96    fn to_scalar(self) -> Scalar {
97        if let Some(x) = self {
98            Scalar::String(x)
99        } else {
100            Scalar::Null
101        }
102    }
103}
104
105impl SetVirtualDataColumn for Option<f64> {
106    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
107        if let ColumnBuilder::Float(builder) = col {
108            match self {
109                Some(v) => builder.append_value(v),
110                None => builder.append_null(),
111            }
112            Ok(())
113        } else {
114            Err("Bad type")
115        }
116    }
117
118    fn new_builder() -> ColumnBuilder {
119        ColumnBuilder::Float(Float64Builder::new())
120    }
121
122    fn to_scalar(self) -> Scalar {
123        if let Some(x) = self {
124            Scalar::Float(x)
125        } else {
126            Scalar::Null
127        }
128    }
129}
130
131impl SetVirtualDataColumn for Option<i32> {
132    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
133        if let ColumnBuilder::Integer(builder) = col {
134            match self {
135                Some(v) => builder.append_value(v),
136                None => builder.append_null(),
137            }
138            Ok(())
139        } else {
140            Err("Bad type")
141        }
142    }
143
144    fn new_builder() -> ColumnBuilder {
145        ColumnBuilder::Integer(Int32Builder::new())
146    }
147
148    fn to_scalar(self) -> Scalar {
149        if let Some(x) = self {
150            Scalar::Float(x as f64)
151        } else {
152            Scalar::Null
153        }
154    }
155}
156
157impl SetVirtualDataColumn for Option<i64> {
158    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
159        if let ColumnBuilder::Datetime(builder) = col {
160            match self {
161                Some(v) => builder.append_value(v),
162                None => builder.append_null(),
163            }
164            Ok(())
165        } else {
166            Err("Bad type")
167        }
168    }
169
170    fn new_builder() -> ColumnBuilder {
171        ColumnBuilder::Datetime(TimestampMillisecondBuilder::new())
172    }
173
174    fn to_scalar(self) -> Scalar {
175        if let Some(x) = self {
176            Scalar::Float(x as f64)
177        } else {
178            Scalar::Null
179        }
180    }
181}
182
183impl SetVirtualDataColumn for Option<bool> {
184    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
185        if let ColumnBuilder::Boolean(builder) = col {
186            match self {
187                Some(v) => builder.append_value(v),
188                None => builder.append_null(),
189            }
190            Ok(())
191        } else {
192            Err("Bad type")
193        }
194    }
195
196    fn new_builder() -> ColumnBuilder {
197        ColumnBuilder::Boolean(BooleanBuilder::new())
198    }
199
200    fn to_scalar(self) -> Scalar {
201        if let Some(x) = self {
202            Scalar::Bool(x)
203        } else {
204            Scalar::Null
205        }
206    }
207}
208
209/// A columnar data slice returned from a virtual server view query.
210///
211/// This struct represents a rectangular slice of data from a view, stored
212/// internally as Arrow builders during population and frozen into a
213/// `RecordBatch` on first consumption.
214#[derive(Debug)]
215pub struct VirtualDataSlice {
216    config: ViewConfig,
217    builders: IndexMap<String, ColumnBuilder>,
218    row_path: Option<Vec<Vec<Scalar>>>,
219    frozen: Option<RecordBatch>,
220}
221
222impl std::fmt::Debug for ColumnBuilder {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        match self {
225            ColumnBuilder::Boolean(_) => write!(f, "ColumnBuilder::Boolean(..)"),
226            ColumnBuilder::String(_) => write!(f, "ColumnBuilder::String(..)"),
227            ColumnBuilder::Float(_) => write!(f, "ColumnBuilder::Float(..)"),
228            ColumnBuilder::Integer(_) => write!(f, "ColumnBuilder::Integer(..)"),
229            ColumnBuilder::Datetime(_) => write!(f, "ColumnBuilder::Datetime(..)"),
230        }
231    }
232}
233
234/// Extracts grouping ID values from an Arrow array as `i64`.
235fn cast_to_int64(array: &ArrayRef) -> Result<Vec<i64>, Box<dyn Error>> {
236    let num_rows = array.len();
237    let mut result = Vec::with_capacity(num_rows);
238    match array.data_type() {
239        DataType::Int32 => {
240            let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
241            for i in 0..num_rows {
242                result.push(if arr.is_null(i) {
243                    0
244                } else {
245                    arr.value(i) as i64
246                });
247            }
248        },
249        DataType::Int64 => {
250            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
251            for i in 0..num_rows {
252                result.push(if arr.is_null(i) { 0 } else { arr.value(i) });
253            }
254        },
255        DataType::Float64 => {
256            let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
257            for i in 0..num_rows {
258                result.push(if arr.is_null(i) {
259                    0
260                } else {
261                    arr.value(i) as i64
262                });
263            }
264        },
265        dt => return Err(format!("Cannot cast {} to Int64", dt).into()),
266    }
267    Ok(result)
268}
269
270/// Extracts a single cell from an Arrow array as a [`Scalar`].
271fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar {
272    if array.is_null(row_idx) {
273        return Scalar::Null;
274    }
275    match array.data_type() {
276        DataType::Utf8 => {
277            let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
278            Scalar::String(arr.value(row_idx).to_string())
279        },
280        DataType::Float64 => {
281            let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
282            Scalar::Float(arr.value(row_idx))
283        },
284        DataType::Int32 => {
285            let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
286            Scalar::Float(arr.value(row_idx) as f64)
287        },
288        DataType::Int64 => {
289            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
290            Scalar::Float(arr.value(row_idx) as f64)
291        },
292        DataType::Boolean => {
293            let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
294            Scalar::Bool(arr.value(row_idx))
295        },
296        DataType::Timestamp(TimeUnit::Millisecond, _) => {
297            let arr = array
298                .as_any()
299                .downcast_ref::<TimestampMillisecondArray>()
300                .unwrap();
301            Scalar::Float(arr.value(row_idx) as f64)
302        },
303        DataType::Date32 => {
304            let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
305            Scalar::Float(arr.value(row_idx) as f64 * 86_400_000.0)
306        },
307        _ => {
308            let scalar_arr = array.slice(row_idx, 1);
309            Scalar::String(format!("{:?}", scalar_arr))
310        },
311    }
312}
313
314/// Coerces an Arrow column to Perspective-compatible types, optionally
315/// renaming.
316/// Manually converts a timestamp array of any unit to milliseconds.
317fn timestamp_to_millis(array: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
318    let millis: TimestampMillisecondArray = match unit {
319        TimeUnit::Second => {
320            let arr = array
321                .as_any()
322                .downcast_ref::<TimestampSecondArray>()
323                .unwrap();
324            arr.iter().map(|v| v.map(|v| v * 1_000)).collect()
325        },
326        TimeUnit::Microsecond => {
327            let arr = array
328                .as_any()
329                .downcast_ref::<TimestampMicrosecondArray>()
330                .unwrap();
331            arr.iter().map(|v| v.map(|v| v / 1_000)).collect()
332        },
333        TimeUnit::Nanosecond => {
334            let arr = array
335                .as_any()
336                .downcast_ref::<TimestampNanosecondArray>()
337                .unwrap();
338            arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect()
339        },
340        TimeUnit::Millisecond => {
341            return array.clone();
342        },
343    };
344    Arc::new(millis) as ArrayRef
345}
346
347fn coerce_column(
348    name: &str,
349    field: &Field,
350    array: &ArrayRef,
351) -> Result<(Field, ArrayRef), Box<dyn Error>> {
352    match field.data_type() {
353        DataType::Boolean
354        | DataType::Utf8
355        | DataType::Float64
356        | DataType::Int32
357        | DataType::Date32 => Ok((
358            Field::new(name, field.data_type().clone(), true),
359            array.clone(),
360        )),
361        DataType::Timestamp(TimeUnit::Millisecond, _) => Ok((
362            Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
363            array.clone(),
364        )),
365        DataType::Int8 => {
366            let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
367            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
368            Ok((
369                Field::new(name, DataType::Int32, true),
370                Arc::new(result) as ArrayRef,
371            ))
372        },
373        DataType::Int16 => {
374            let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
375            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
376            Ok((
377                Field::new(name, DataType::Int32, true),
378                Arc::new(result) as ArrayRef,
379            ))
380        },
381        DataType::UInt8 => {
382            let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
383            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
384            Ok((
385                Field::new(name, DataType::Int32, true),
386                Arc::new(result) as ArrayRef,
387            ))
388        },
389        DataType::UInt16 => {
390            let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
391            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
392            Ok((
393                Field::new(name, DataType::Int32, true),
394                Arc::new(result) as ArrayRef,
395            ))
396        },
397        DataType::UInt32 => {
398            let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
399            let result: Int64Array = arr.iter().map(|v| v.map(|v| v as i64)).collect();
400            let result: Float64Array = result.iter().map(|v| v.map(|v| v as f64)).collect();
401            Ok((
402                Field::new(name, DataType::Float64, true),
403                Arc::new(result) as ArrayRef,
404            ))
405        },
406        DataType::Int64 => {
407            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
408            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
409            Ok((
410                Field::new(name, DataType::Float64, true),
411                Arc::new(result) as ArrayRef,
412            ))
413        },
414        DataType::UInt64 => {
415            let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
416            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
417            Ok((
418                Field::new(name, DataType::Float64, true),
419                Arc::new(result) as ArrayRef,
420            ))
421        },
422        DataType::Float32 => {
423            let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
424            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
425            Ok((
426                Field::new(name, DataType::Float64, true),
427                Arc::new(result) as ArrayRef,
428            ))
429        },
430        DataType::Decimal128(_, scale) => {
431            let scale = *scale;
432            let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
433            let divisor = 10_f64.powi(scale as i32);
434            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64 / divisor)).collect();
435            Ok((
436                Field::new(name, DataType::Float64, true),
437                Arc::new(result) as ArrayRef,
438            ))
439        },
440        DataType::Date64 => {
441            let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
442            let result: Date32Array = arr
443                .iter()
444                .map(|v| v.map(|v| (v / 86_400_000) as i32))
445                .collect();
446            Ok((
447                Field::new(name, DataType::Date32, true),
448                Arc::new(result) as ArrayRef,
449            ))
450        },
451        DataType::Timestamp(unit, _) => {
452            let casted = timestamp_to_millis(array, unit);
453            Ok((
454                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
455                casted,
456            ))
457        },
458        DataType::Time32(TimeUnit::Second) => {
459            let arr = array.as_any().downcast_ref::<Time32SecondArray>().unwrap();
460            let result: TimestampMillisecondArray =
461                arr.iter().map(|v| v.map(|v| v as i64 * 1_000)).collect();
462            Ok((
463                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
464                Arc::new(result) as ArrayRef,
465            ))
466        },
467        DataType::Time32(TimeUnit::Millisecond) => {
468            let arr = array
469                .as_any()
470                .downcast_ref::<Time32MillisecondArray>()
471                .unwrap();
472            let result: TimestampMillisecondArray =
473                arr.iter().map(|v| v.map(|v| v as i64)).collect();
474            Ok((
475                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
476                Arc::new(result) as ArrayRef,
477            ))
478        },
479        DataType::Time64(TimeUnit::Microsecond) => {
480            let arr = array
481                .as_any()
482                .downcast_ref::<Time64MicrosecondArray>()
483                .unwrap();
484            let result: TimestampMillisecondArray =
485                arr.iter().map(|v| v.map(|v| v / 1_000)).collect();
486            Ok((
487                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
488                Arc::new(result) as ArrayRef,
489            ))
490        },
491        DataType::Time64(TimeUnit::Nanosecond) => {
492            let arr = array
493                .as_any()
494                .downcast_ref::<Time64NanosecondArray>()
495                .unwrap();
496            let result: TimestampMillisecondArray =
497                arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect();
498            Ok((
499                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
500                Arc::new(result) as ArrayRef,
501            ))
502        },
503        DataType::LargeUtf8 => {
504            let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
505            let result: StringArray = arr.iter().map(|v| v.map(|v| v.to_string())).collect();
506            Ok((
507                Field::new(name, DataType::Utf8, true),
508                Arc::new(result) as ArrayRef,
509            ))
510        },
511        dt => {
512            tracing::warn!(
513                "Coercing unknown Arrow type {} to Utf8 for column '{}'",
514                dt,
515                name
516            );
517            let num_rows = array.len();
518            let mut builder = StringBuilder::new();
519            for i in 0..num_rows {
520                if array.is_null(i) {
521                    builder.append_null();
522                } else {
523                    let scalar_arr = array.slice(i, 1);
524                    builder.append_value(format!("{:?}", scalar_arr));
525                }
526            }
527            Ok((
528                Field::new(name, DataType::Utf8, true),
529                Arc::new(builder.finish()) as ArrayRef,
530            ))
531        },
532    }
533}
534
535impl VirtualDataSlice {
536    pub fn new(config: ViewConfig) -> Self {
537        VirtualDataSlice {
538            config,
539            builders: IndexMap::default(),
540            row_path: None,
541            frozen: None,
542        }
543    }
544
545    /// Loads data from Arrow IPC file format bytes, with automatic
546    /// post-processing based on the view configuration.
547    ///
548    /// When `group_by` is active, extracts `__GROUPING_ID__` and
549    /// `__ROW_PATH_N__` columns to build `self.row_path`, then removes
550    /// them from the output `RecordBatch`.
551    ///
552    /// When `split_by` is active, renames data columns by replacing `_`
553    /// with `|` (the DuckDB PIVOT separator).
554    ///
555    /// Also coerces non-standard Arrow types (e.g. `Decimal128`, `Int64`)
556    /// to Perspective-compatible types.
557    pub fn from_arrow_ipc(&mut self, ipc: &[u8]) -> Result<(), Box<dyn Error>> {
558        let cursor = std::io::Cursor::new(ipc);
559        let batch = if &ipc[0..6] == "ARROW1".as_bytes() {
560            FileReader::try_new(cursor, None)?
561                .next()
562                .ok_or("Arrow IPC stream contained no record batches")??
563        } else {
564            StreamReader::try_new(cursor, None)?
565                .next()
566                .ok_or("Arrow IPC stream contained no record batches")??
567        };
568
569        let has_group_by = !self.config.group_by.is_empty();
570        let has_split_by = !self.config.split_by.is_empty();
571        let is_total = self.config.group_rollup_mode == GroupRollupMode::Total;
572
573        if !has_group_by && !has_split_by && !is_total {
574            self.frozen = Some(batch);
575            return Ok(());
576        }
577
578        let num_rows = batch.num_rows();
579        let schema = batch.schema();
580
581        // Phase A: Extract row_path from __GROUPING_ID__ and __ROW_PATH_N__
582        if has_group_by {
583            let group_by_len = self.config.group_by.len();
584            let is_flat = self.config.group_rollup_mode == GroupRollupMode::Flat;
585            let grouping_ids = if is_flat {
586                None
587            } else {
588                let grouping_id_idx = schema
589                    .index_of("__GROUPING_ID__")
590                    .map_err(|_| "Missing __GROUPING_ID__ column")?;
591                Some(cast_to_int64(batch.column(grouping_id_idx))?)
592            };
593
594            let mut row_paths: Vec<Vec<Scalar>> = (0..num_rows).map(|_| Vec::new()).collect();
595            for gidx in 0..group_by_len {
596                let col_name = format!("__ROW_PATH_{}__", gidx);
597                let col_idx = schema
598                    .index_of(&col_name)
599                    .map_err(|_| format!("Missing {} column", col_name))?;
600
601                let col = batch.column(col_idx);
602
603                // In flat mode, all rows are leaf rows
604                if is_flat {
605                    // TODO I may be dumb but I'm not exactly sure what Clippy
606                    // wants here. This could be an `enumerate` but how is this
607                    // better?
608                    #[allow(clippy::needless_range_loop)]
609                    for row_idx in 0..num_rows {
610                        row_paths[row_idx].push(extract_scalar(col, row_idx));
611                    }
612                } else {
613                    let gids = grouping_ids.as_ref().unwrap();
614                    let max_grouping_id = 2_i64.pow(group_by_len as u32 - gidx as u32) - 1;
615                    for row_idx in 0..num_rows {
616                        if gids[row_idx] < max_grouping_id {
617                            row_paths[row_idx].push(extract_scalar(col, row_idx));
618                        }
619                    }
620                }
621            }
622
623            self.row_path = Some(row_paths);
624        }
625
626        // Phase B: Rebuild RecordBatch without metadata columns, with
627        // column renames and type coercion.
628        let mut new_fields = Vec::new();
629        let mut new_arrays: Vec<ArrayRef> = Vec::new();
630        for (col_idx, field) in schema.fields().iter().enumerate() {
631            let name = field.name();
632            if name == "__GROUPING_ID__" || name.starts_with("__ROW_PATH_") {
633                continue;
634            }
635
636            let new_name = if has_split_by && !name.starts_with("__") {
637                name.replace('_', "|")
638            } else {
639                name.clone()
640            };
641
642            let (coerced_field, coerced_array) =
643                coerce_column(&new_name, field, batch.column(col_idx))?;
644            new_fields.push(coerced_field);
645            new_arrays.push(coerced_array);
646        }
647
648        let new_schema = Arc::new(Schema::new(new_fields));
649        self.frozen = if new_arrays.is_empty() {
650            Some(RecordBatch::new_empty(new_schema))
651        } else {
652            Some(RecordBatch::try_new(new_schema, new_arrays)?)
653        };
654        Ok(())
655    }
656
657    /// Freezes the builders into a `RecordBatch`. Idempotent — subsequent
658    /// calls return the cached batch.
659    pub(crate) fn freeze(&mut self) -> &RecordBatch {
660        if self.frozen.is_none() {
661            let mut fields = Vec::new();
662            let mut arrays: Vec<ArrayRef> = Vec::new();
663
664            for (name, builder) in &mut self.builders {
665                let (field, array): (Field, ArrayRef) = match builder {
666                    ColumnBuilder::Boolean(b) => (
667                        Field::new(name, DataType::Boolean, true),
668                        Arc::new(b.finish()),
669                    ),
670                    ColumnBuilder::String(b) => {
671                        (Field::new(name, DataType::Utf8, true), Arc::new(b.finish()))
672                    },
673                    ColumnBuilder::Float(b) => (
674                        Field::new(name, DataType::Float64, true),
675                        Arc::new(b.finish()),
676                    ),
677                    ColumnBuilder::Integer(b) => (
678                        Field::new(name, DataType::Int32, true),
679                        Arc::new(b.finish()),
680                    ),
681                    ColumnBuilder::Datetime(b) => (
682                        Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
683                        Arc::new(b.finish()),
684                    ),
685                };
686                fields.push(field);
687                arrays.push(array);
688            }
689
690            let schema = Arc::new(Schema::new(fields));
691            self.frozen = Some(
692                RecordBatch::try_new(schema, arrays)
693                    .expect("RecordBatch construction should not fail for well-formed builders"),
694            );
695        }
696        self.frozen.as_ref().unwrap()
697    }
698
699    /// Serializes the data to Arrow IPC streaming format.
700    pub(crate) fn render_to_arrow_ipc(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
701        let batch = self.freeze().clone();
702        let schema = batch.schema();
703        let mut buf = Vec::new();
704        {
705            let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
706            writer.write(&batch)?;
707            writer.finish()?;
708        }
709        Ok(buf)
710    }
711
712    /// Converts the columnar data to a row-oriented representation for JSON
713    /// serialization.
714    pub(crate) fn render_to_rows(&mut self) -> Vec<IndexMap<String, VirtualDataCell>> {
715        let batch = self.freeze().clone();
716        let num_rows = batch.num_rows();
717        let schema = batch.schema();
718
719        (0..num_rows)
720            .map(|row_idx| {
721                let mut row = IndexMap::new();
722
723                // Add RowPath column first if present
724                if let Some(ref rp) = self.row_path
725                    && row_idx < rp.len()
726                {
727                    row.insert(
728                        "__ROW_PATH__".to_string(),
729                        VirtualDataCell::RowPath(rp[row_idx].clone()),
730                    );
731                }
732
733                // Add Arrow columns
734                for (col_idx, field) in schema.fields().iter().enumerate() {
735                    let col = batch.column(col_idx);
736                    let cell = if col.is_null(row_idx) {
737                        match field.data_type() {
738                            DataType::Boolean => VirtualDataCell::Boolean(None),
739                            DataType::Utf8 => VirtualDataCell::String(None),
740                            DataType::Float64 => VirtualDataCell::Float(None),
741                            DataType::Int32 => VirtualDataCell::Integer(None),
742                            DataType::Timestamp(TimeUnit::Millisecond, _) => {
743                                VirtualDataCell::Datetime(None)
744                            },
745                            _ => continue,
746                        }
747                    } else {
748                        match field.data_type() {
749                            DataType::Boolean => {
750                                let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
751                                VirtualDataCell::Boolean(Some(arr.value(row_idx)))
752                            },
753                            DataType::Utf8 => {
754                                let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
755                                VirtualDataCell::String(Some(arr.value(row_idx).to_string()))
756                            },
757                            DataType::Float64 => {
758                                let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
759                                VirtualDataCell::Float(Some(arr.value(row_idx)))
760                            },
761                            DataType::Int32 => {
762                                let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
763                                VirtualDataCell::Integer(Some(arr.value(row_idx)))
764                            },
765                            DataType::Timestamp(TimeUnit::Millisecond, _) => {
766                                let arr = col
767                                    .as_any()
768                                    .downcast_ref::<TimestampMillisecondArray>()
769                                    .unwrap();
770                                VirtualDataCell::Datetime(Some(arr.value(row_idx)))
771                            },
772                            DataType::Date32 => {
773                                let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
774                                VirtualDataCell::Datetime(Some(
775                                    arr.value(row_idx) as i64 * 86_400_000,
776                                ))
777                            },
778                            x => {
779                                tracing::error!("Unknown Arrow IPC type {}", x);
780                                continue;
781                            },
782                        }
783                    };
784                    row.insert(field.name().clone(), cell);
785                }
786
787                row
788            })
789            .collect()
790    }
791
792    /// Serializes the data to a column-oriented JSON string.
793    pub fn render_to_columns_json(&mut self) -> Result<String, Box<dyn Error>> {
794        let batch = self.freeze().clone();
795        let schema = batch.schema();
796        let mut map = serde_json::Map::new();
797
798        // Add RowPath if present
799        if let Some(ref rp) = self.row_path {
800            map.insert("__ROW_PATH__".to_string(), serde_json::to_value(rp)?);
801        }
802
803        for (col_idx, field) in schema.fields().iter().enumerate() {
804            let col = batch.column(col_idx);
805            let num_rows = col.len();
806            let values: serde_json::Value = match field.data_type() {
807                DataType::Boolean => {
808                    let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
809                    serde_json::to_value(
810                        (0..num_rows)
811                            .map(|i| {
812                                if arr.is_null(i) {
813                                    None
814                                } else {
815                                    Some(arr.value(i))
816                                }
817                            })
818                            .collect::<Vec<_>>(),
819                    )?
820                },
821                DataType::Utf8 => {
822                    let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
823                    serde_json::to_value(
824                        (0..num_rows)
825                            .map(|i| {
826                                if arr.is_null(i) {
827                                    None
828                                } else {
829                                    Some(arr.value(i))
830                                }
831                            })
832                            .collect::<Vec<_>>(),
833                    )?
834                },
835                DataType::Float64 => {
836                    let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
837                    serde_json::to_value(
838                        (0..num_rows)
839                            .map(|i| {
840                                if arr.is_null(i) {
841                                    None
842                                } else {
843                                    Some(arr.value(i))
844                                }
845                            })
846                            .collect::<Vec<_>>(),
847                    )?
848                },
849                DataType::Int32 => {
850                    let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
851                    serde_json::to_value(
852                        (0..num_rows)
853                            .map(|i| {
854                                if arr.is_null(i) {
855                                    None
856                                } else {
857                                    Some(arr.value(i))
858                                }
859                            })
860                            .collect::<Vec<_>>(),
861                    )?
862                },
863                DataType::Timestamp(TimeUnit::Millisecond, _) => {
864                    let arr = col
865                        .as_any()
866                        .downcast_ref::<TimestampMillisecondArray>()
867                        .unwrap();
868                    serde_json::to_value(
869                        (0..num_rows)
870                            .map(|i| {
871                                if arr.is_null(i) {
872                                    None
873                                } else {
874                                    Some(arr.value(i))
875                                }
876                            })
877                            .collect::<Vec<_>>(),
878                    )?
879                },
880                DataType::Date32 => {
881                    let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
882                    serde_json::to_value(
883                        (0..num_rows)
884                            .map(|i| {
885                                if arr.is_null(i) {
886                                    None
887                                } else {
888                                    Some(arr.value(i) as i64 * 86_400_000)
889                                }
890                            })
891                            .collect::<Vec<_>>(),
892                    )?
893                },
894                x => {
895                    tracing::error!("Unknown Arrow IPC type {}", x);
896                    continue;
897                },
898            };
899            map.insert(field.name().clone(), values);
900        }
901
902        Ok(serde_json::to_string(&map)?)
903    }
904
905    /// Sets a value in a column at the specified row index.
906    ///
907    /// If `group_by_index` is `Some`, the value is added to the `__ROW_PATH__`
908    /// column as part of the row's group-by path. Otherwise, the value is
909    /// inserted into the named column.
910    ///
911    /// Creates the column if it does not already exist.
912    pub fn set_col<T: SetVirtualDataColumn>(
913        &mut self,
914        name: &str,
915        grouping_id: Option<usize>,
916        index: usize,
917        value: T,
918    ) -> Result<(), Box<dyn Error>> {
919        if name == "__GROUPING_ID__" {
920            return Ok(());
921        }
922
923        if name.starts_with("__ROW_PATH_") {
924            let group_by_index: u32 = name[11..name.len() - 2].parse()?;
925            let max_grouping_id =
926                2_i32.pow((self.config.group_by.len() as u32) - group_by_index) - 1;
927
928            if grouping_id.map(|x| x as i32).unwrap_or(i32::MAX) < max_grouping_id {
929                let col = self.row_path.get_or_insert_with(Vec::new);
930                if let Some(row) = col.get_mut(index) {
931                    let scalar = value.to_scalar();
932                    row.push(scalar);
933                } else {
934                    while col.len() < index {
935                        col.push(vec![])
936                    }
937
938                    let scalar = value.to_scalar();
939                    col.push(vec![scalar]);
940                }
941            }
942
943            Ok(())
944        } else {
945            let col_name = if !self.config.split_by.is_empty() && !name.starts_with("__") {
946                name.replace('_', "|")
947            } else {
948                name.to_owned()
949            };
950
951            if !self.builders.contains_key(&col_name) {
952                self.builders.insert(col_name.clone(), T::new_builder());
953            }
954
955            let col = self
956                .builders
957                .get_mut(&col_name)
958                .ok_or_else(|| format!("Column '{}' not found after insertion", col_name))?;
959
960            Ok(value.write_to(col)?)
961        }
962    }
963}