Skip to main content

perspective_client/virtual_server/
data.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::error::Error;
14use std::sync::Arc;
15
16use arrow_array::builder::{
17    BooleanBuilder, Float64Builder, Int32Builder, StringDictionaryBuilder,
18    TimestampMillisecondBuilder,
19};
20use arrow_array::cast::AsArray;
21use arrow_array::types::Int32Type;
22use arrow_array::{
23    Array, ArrayAccessor, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
24    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeStringArray,
25    RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
26    Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
27    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
28    UInt64Array,
29};
30use arrow_ipc::reader::{FileReader, StreamReader};
31use arrow_ipc::writer::StreamWriter;
32use arrow_schema::{DataType, Field, Schema, TimeUnit};
33use indexmap::IndexMap;
34use serde::Serialize;
35
36use crate::config::{GroupRollupMode, Scalar, ViewConfig};
37
/// An Arrow column builder, used during the population phase of
/// [`VirtualDataSlice`].
///
/// Each variant wraps the Arrow builder for one supported column type.
/// Values are appended through [`SetVirtualDataColumn::write_to`], and the
/// builders are turned into arrays when the slice is frozen.
pub enum ColumnBuilder {
    /// Builder for a nullable boolean column.
    Boolean(BooleanBuilder),
    /// Builder for a nullable string column, dictionary-encoded with
    /// `Int32` keys.
    String(StringDictionaryBuilder<Int32Type>),
    /// Builder for a nullable `f64` column.
    Float(Float64Builder),
    /// Builder for a nullable `i32` column.
    Integer(Int32Builder),
    /// Builder for a nullable millisecond-timestamp column.
    Datetime(TimestampMillisecondBuilder),
}
47
48fn dict_data_type() -> DataType {
49    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
50}
51
/// A single cell value in a row-oriented data representation.
///
/// Used when converting [`VirtualDataSlice`] to row format for JSON
/// serialization.
///
/// The enum is `#[serde(untagged)]`, so the variant name is not part of
/// the serialized output. In every `Option` variant, `None` stands for a
/// null cell.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum VirtualDataCell {
    /// A cell from a boolean column.
    Boolean(Option<bool>),
    /// A cell from a string (plain or dictionary-encoded) column.
    String(Option<String>),
    /// A cell from a `Float64` column.
    Float(Option<f64>),
    /// A cell from an `Int32` column.
    Integer(Option<i32>),
    /// A cell from a millisecond-timestamp column.
    Datetime(Option<i64>),
    /// The `__ROW_PATH__` sidecar entry of a row: its group-by path.
    RowPath(Vec<Scalar>),
}
66
/// Selects how group-by row paths are represented in rendered output:
/// either as one combined `__ROW_PATH__` sidecar or as the individual
/// `__ROW_PATH_N__` columns.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RowPathStyle {
    /// Legacy: emit a single `__ROW_PATH__` sidecar (per-row nested
    /// array in `render_to_rows`, array-of-arrays in
    /// `render_to_columns_json`). `__ROW_PATH_N__` per-level columns
    /// are filtered out. Matches the native engine's `to_json` /
    /// `to_columns` shape.
    Sidecar,

    /// Native: emit per-level `__ROW_PATH_0__`, `__ROW_PATH_1__`, …
    /// columns directly. No `__ROW_PATH__` sidecar. Matches the native
    /// engine's Arrow IPC, CSV, and NDJSON shapes.
    PerLevel,
}
81
/// Trait for types that can be written to a [`ColumnBuilder`] which
/// enforces sequential construction.
///
/// This trait enables type-safe insertion of values into virtual data columns,
/// ensuring that values are written to columns of the correct type.
///
/// It is implemented in this module for `Option<String>`, `Option<f64>`,
/// `Option<i32>`, `Option<i64>` and `Option<bool>`, where `None` is
/// written as a null.
pub trait SetVirtualDataColumn {
    /// Writes this value (sequentially) to the given column builder.
    ///
    /// # Errors
    ///
    /// Returns an error if the column type does not match the value type.
    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str>;

    /// Creates a new empty column builder of the appropriate type for this
    /// value.
    fn new_builder() -> ColumnBuilder;

    /// Converts this value to a [`Scalar`] representation.
    fn to_scalar(self) -> Scalar;
}
100
101impl SetVirtualDataColumn for Option<String> {
102    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
103        if let ColumnBuilder::String(builder) = col {
104            match self {
105                Some(s) => builder.append_value(&s),
106                None => builder.append_null(),
107            }
108            Ok(())
109        } else {
110            Err("Bad type")
111        }
112    }
113
114    fn new_builder() -> ColumnBuilder {
115        ColumnBuilder::String(StringDictionaryBuilder::new())
116    }
117
118    fn to_scalar(self) -> Scalar {
119        if let Some(x) = self {
120            Scalar::String(x)
121        } else {
122            Scalar::Null
123        }
124    }
125}
126
127impl SetVirtualDataColumn for Option<f64> {
128    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
129        if let ColumnBuilder::Float(builder) = col {
130            match self {
131                Some(v) => builder.append_value(v),
132                None => builder.append_null(),
133            }
134            Ok(())
135        } else {
136            Err("Bad type")
137        }
138    }
139
140    fn new_builder() -> ColumnBuilder {
141        ColumnBuilder::Float(Float64Builder::new())
142    }
143
144    fn to_scalar(self) -> Scalar {
145        if let Some(x) = self {
146            Scalar::Float(x)
147        } else {
148            Scalar::Null
149        }
150    }
151}
152
153impl SetVirtualDataColumn for Option<i32> {
154    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
155        if let ColumnBuilder::Integer(builder) = col {
156            match self {
157                Some(v) => builder.append_value(v),
158                None => builder.append_null(),
159            }
160            Ok(())
161        } else {
162            Err("Bad type")
163        }
164    }
165
166    fn new_builder() -> ColumnBuilder {
167        ColumnBuilder::Integer(Int32Builder::new())
168    }
169
170    fn to_scalar(self) -> Scalar {
171        if let Some(x) = self {
172            Scalar::Float(x as f64)
173        } else {
174            Scalar::Null
175        }
176    }
177}
178
179impl SetVirtualDataColumn for Option<i64> {
180    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
181        if let ColumnBuilder::Datetime(builder) = col {
182            match self {
183                Some(v) => builder.append_value(v),
184                None => builder.append_null(),
185            }
186            Ok(())
187        } else {
188            Err("Bad type")
189        }
190    }
191
192    fn new_builder() -> ColumnBuilder {
193        ColumnBuilder::Datetime(TimestampMillisecondBuilder::new())
194    }
195
196    fn to_scalar(self) -> Scalar {
197        if let Some(x) = self {
198            Scalar::Float(x as f64)
199        } else {
200            Scalar::Null
201        }
202    }
203}
204
205impl SetVirtualDataColumn for Option<bool> {
206    fn write_to(self, col: &mut ColumnBuilder) -> Result<(), &'static str> {
207        if let ColumnBuilder::Boolean(builder) = col {
208            match self {
209                Some(v) => builder.append_value(v),
210                None => builder.append_null(),
211            }
212            Ok(())
213        } else {
214            Err("Bad type")
215        }
216    }
217
218    fn new_builder() -> ColumnBuilder {
219        ColumnBuilder::Boolean(BooleanBuilder::new())
220    }
221
222    fn to_scalar(self) -> Scalar {
223        if let Some(x) = self {
224            Scalar::Bool(x)
225        } else {
226            Scalar::Null
227        }
228    }
229}
230
/// A columnar data slice returned from a virtual server view query.
///
/// This struct represents a rectangular slice of data from a view, stored
/// internally as Arrow builders during population and frozen into a
/// `RecordBatch` on first consumption.
#[derive(Debug)]
pub struct VirtualDataSlice {
    /// View configuration; its `group_by`, `split_by` and
    /// `group_rollup_mode` drive the post-processing in `from_arrow_ipc`.
    config: ViewConfig,
    /// Column builders keyed by column name, consumed by `freeze`.
    builders: IndexMap<String, ColumnBuilder>,
    /// One group-by path per row, set by `from_arrow_ipc` when `group_by`
    /// is active; `None` otherwise.
    row_path: Option<Vec<Vec<Scalar>>>,
    /// The frozen batch, set either by `from_arrow_ipc` or by the first
    /// call to `freeze`.
    frozen: Option<RecordBatch>,
}
243
244impl std::fmt::Debug for ColumnBuilder {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            ColumnBuilder::Boolean(_) => write!(f, "ColumnBuilder::Boolean(..)"),
248            ColumnBuilder::String(_) => write!(f, "ColumnBuilder::String(..)"),
249            ColumnBuilder::Float(_) => write!(f, "ColumnBuilder::Float(..)"),
250            ColumnBuilder::Integer(_) => write!(f, "ColumnBuilder::Integer(..)"),
251            ColumnBuilder::Datetime(_) => write!(f, "ColumnBuilder::Datetime(..)"),
252        }
253    }
254}
255
256/// Extracts grouping ID values from an Arrow array as `i64`.
257fn cast_to_int64(array: &ArrayRef) -> Result<Vec<i64>, Box<dyn Error>> {
258    let num_rows = array.len();
259    let mut result = Vec::with_capacity(num_rows);
260    match array.data_type() {
261        DataType::Int32 => {
262            let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
263            for i in 0..num_rows {
264                result.push(if arr.is_null(i) {
265                    0
266                } else {
267                    arr.value(i) as i64
268                });
269            }
270        },
271        DataType::Int64 => {
272            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
273            for i in 0..num_rows {
274                result.push(if arr.is_null(i) { 0 } else { arr.value(i) });
275            }
276        },
277        DataType::Float64 => {
278            let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
279            for i in 0..num_rows {
280                result.push(if arr.is_null(i) {
281                    0
282                } else {
283                    arr.value(i) as i64
284                });
285            }
286        },
287        dt => return Err(format!("Cannot cast {} to Int64", dt).into()),
288    }
289    Ok(result)
290}
291
292/// Extracts a single cell from an Arrow array as a [`Scalar`].
293fn extract_scalar(array: &ArrayRef, row_idx: usize) -> Scalar {
294    if array.is_null(row_idx) {
295        return Scalar::Null;
296    }
297    match array.data_type() {
298        DataType::Utf8 => {
299            let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
300            Scalar::String(arr.value(row_idx).to_string())
301        },
302        DataType::Dictionary(..) => {
303            let dict = array.as_dictionary::<Int32Type>();
304            let values = dict.downcast_dict::<StringArray>().unwrap();
305            Scalar::String(values.value(row_idx).to_string())
306        },
307        DataType::Float64 => {
308            let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
309            Scalar::Float(arr.value(row_idx))
310        },
311        DataType::Int32 => {
312            let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
313            Scalar::Float(arr.value(row_idx) as f64)
314        },
315        DataType::Int64 => {
316            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
317            Scalar::Float(arr.value(row_idx) as f64)
318        },
319        DataType::Boolean => {
320            let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
321            Scalar::Bool(arr.value(row_idx))
322        },
323        DataType::Timestamp(TimeUnit::Millisecond, _) => {
324            let arr = array
325                .as_any()
326                .downcast_ref::<TimestampMillisecondArray>()
327                .unwrap();
328            Scalar::Float(arr.value(row_idx) as f64)
329        },
330        DataType::Date32 => {
331            let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
332            Scalar::Float(arr.value(row_idx) as f64 * 86_400_000.0)
333        },
334        _ => {
335            let scalar_arr = array.slice(row_idx, 1);
336            Scalar::String(format!("{:?}", scalar_arr))
337        },
338    }
339}
340
341/// Coerces an Arrow column to Perspective-compatible types, optionally
342/// renaming.
343/// Manually converts a timestamp array of any unit to milliseconds.
344fn timestamp_to_millis(array: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
345    let millis: TimestampMillisecondArray = match unit {
346        TimeUnit::Second => {
347            let arr = array
348                .as_any()
349                .downcast_ref::<TimestampSecondArray>()
350                .unwrap();
351            arr.iter().map(|v| v.map(|v| v * 1_000)).collect()
352        },
353        TimeUnit::Microsecond => {
354            let arr = array
355                .as_any()
356                .downcast_ref::<TimestampMicrosecondArray>()
357                .unwrap();
358            arr.iter().map(|v| v.map(|v| v / 1_000)).collect()
359        },
360        TimeUnit::Nanosecond => {
361            let arr = array
362                .as_any()
363                .downcast_ref::<TimestampNanosecondArray>()
364                .unwrap();
365            arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect()
366        },
367        TimeUnit::Millisecond => {
368            return array.clone();
369        },
370    };
371    Arc::new(millis) as ArrayRef
372}
373
374fn coerce_column(
375    name: &str,
376    field: &Field,
377    array: &ArrayRef,
378) -> Result<(Field, ArrayRef), Box<dyn Error>> {
379    match field.data_type() {
380        DataType::Boolean | DataType::Float64 | DataType::Int32 | DataType::Date32 => Ok((
381            Field::new(name, field.data_type().clone(), true),
382            array.clone(),
383        )),
384        DataType::Dictionary(..) => Ok((Field::new(name, dict_data_type(), true), array.clone())),
385        DataType::Utf8 => {
386            let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
387            let mut builder = StringDictionaryBuilder::<Int32Type>::new();
388            for i in 0..arr.len() {
389                if arr.is_null(i) {
390                    builder.append_null();
391                } else {
392                    builder.append_value(arr.value(i));
393                }
394            }
395            Ok((
396                Field::new(name, dict_data_type(), true),
397                Arc::new(builder.finish()) as ArrayRef,
398            ))
399        },
400        DataType::Timestamp(TimeUnit::Millisecond, _) => Ok((
401            Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
402            array.clone(),
403        )),
404        DataType::Int8 => {
405            let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
406            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
407            Ok((
408                Field::new(name, DataType::Int32, true),
409                Arc::new(result) as ArrayRef,
410            ))
411        },
412        DataType::Int16 => {
413            let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
414            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
415            Ok((
416                Field::new(name, DataType::Int32, true),
417                Arc::new(result) as ArrayRef,
418            ))
419        },
420        DataType::UInt8 => {
421            let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
422            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
423            Ok((
424                Field::new(name, DataType::Int32, true),
425                Arc::new(result) as ArrayRef,
426            ))
427        },
428        DataType::UInt16 => {
429            let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
430            let result: Int32Array = arr.iter().map(|v| v.map(|v| v as i32)).collect();
431            Ok((
432                Field::new(name, DataType::Int32, true),
433                Arc::new(result) as ArrayRef,
434            ))
435        },
436        DataType::UInt32 => {
437            let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
438            let result: Int64Array = arr.iter().map(|v| v.map(|v| v as i64)).collect();
439            let result: Float64Array = result.iter().map(|v| v.map(|v| v as f64)).collect();
440            Ok((
441                Field::new(name, DataType::Float64, true),
442                Arc::new(result) as ArrayRef,
443            ))
444        },
445        DataType::Int64 => {
446            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
447            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
448            Ok((
449                Field::new(name, DataType::Float64, true),
450                Arc::new(result) as ArrayRef,
451            ))
452        },
453        DataType::UInt64 => {
454            let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
455            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
456            Ok((
457                Field::new(name, DataType::Float64, true),
458                Arc::new(result) as ArrayRef,
459            ))
460        },
461        DataType::Float32 => {
462            let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
463            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64)).collect();
464            Ok((
465                Field::new(name, DataType::Float64, true),
466                Arc::new(result) as ArrayRef,
467            ))
468        },
469        DataType::Decimal128(_, scale) => {
470            let scale = *scale;
471            let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
472            let divisor = 10_f64.powi(scale as i32);
473            let result: Float64Array = arr.iter().map(|v| v.map(|v| v as f64 / divisor)).collect();
474            Ok((
475                Field::new(name, DataType::Float64, true),
476                Arc::new(result) as ArrayRef,
477            ))
478        },
479        DataType::Date64 => {
480            let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
481            let result: Date32Array = arr
482                .iter()
483                .map(|v| v.map(|v| (v / 86_400_000) as i32))
484                .collect();
485            Ok((
486                Field::new(name, DataType::Date32, true),
487                Arc::new(result) as ArrayRef,
488            ))
489        },
490        DataType::Timestamp(unit, _) => {
491            let casted = timestamp_to_millis(array, unit);
492            Ok((
493                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
494                casted,
495            ))
496        },
497        DataType::Time32(TimeUnit::Second) => {
498            let arr = array.as_any().downcast_ref::<Time32SecondArray>().unwrap();
499            let result: TimestampMillisecondArray =
500                arr.iter().map(|v| v.map(|v| v as i64 * 1_000)).collect();
501            Ok((
502                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
503                Arc::new(result) as ArrayRef,
504            ))
505        },
506        DataType::Time32(TimeUnit::Millisecond) => {
507            let arr = array
508                .as_any()
509                .downcast_ref::<Time32MillisecondArray>()
510                .unwrap();
511            let result: TimestampMillisecondArray =
512                arr.iter().map(|v| v.map(|v| v as i64)).collect();
513            Ok((
514                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
515                Arc::new(result) as ArrayRef,
516            ))
517        },
518        DataType::Time64(TimeUnit::Microsecond) => {
519            let arr = array
520                .as_any()
521                .downcast_ref::<Time64MicrosecondArray>()
522                .unwrap();
523            let result: TimestampMillisecondArray =
524                arr.iter().map(|v| v.map(|v| v / 1_000)).collect();
525            Ok((
526                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
527                Arc::new(result) as ArrayRef,
528            ))
529        },
530        DataType::Time64(TimeUnit::Nanosecond) => {
531            let arr = array
532                .as_any()
533                .downcast_ref::<Time64NanosecondArray>()
534                .unwrap();
535            let result: TimestampMillisecondArray =
536                arr.iter().map(|v| v.map(|v| v / 1_000_000)).collect();
537            Ok((
538                Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
539                Arc::new(result) as ArrayRef,
540            ))
541        },
542        DataType::LargeUtf8 => {
543            let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
544            let mut builder = StringDictionaryBuilder::<Int32Type>::new();
545            for i in 0..arr.len() {
546                if arr.is_null(i) {
547                    builder.append_null();
548                } else {
549                    builder.append_value(arr.value(i));
550                }
551            }
552            Ok((
553                Field::new(name, dict_data_type(), true),
554                Arc::new(builder.finish()) as ArrayRef,
555            ))
556        },
557        dt => {
558            tracing::warn!(
559                "Coercing unknown Arrow type {} to Dictionary for column '{}'",
560                dt,
561                name
562            );
563            let num_rows = array.len();
564            let mut builder = StringDictionaryBuilder::<Int32Type>::new();
565            for i in 0..num_rows {
566                if array.is_null(i) {
567                    builder.append_null();
568                } else {
569                    let scalar_arr = array.slice(i, 1);
570                    builder.append_value(format!("{:?}", scalar_arr));
571                }
572            }
573            Ok((
574                Field::new(name, dict_data_type(), true),
575                Arc::new(builder.finish()) as ArrayRef,
576            ))
577        },
578    }
579}
580
581impl VirtualDataSlice {
582    pub fn new(config: ViewConfig) -> Self {
583        VirtualDataSlice {
584            config,
585            builders: IndexMap::default(),
586            row_path: None,
587            frozen: None,
588        }
589    }
590
591    /// Loads data from Arrow IPC file format bytes, with automatic
592    /// post-processing based on the view configuration.
593    ///
594    /// When `group_by` is active, extracts `__GROUPING_ID__` and
595    /// `__ROW_PATH_N__` columns to build `self.row_path`, then removes
596    /// `__GROUPING_ID__` from the output `RecordBatch`. The
597    /// `__ROW_PATH_N__` columns are *kept* in the frozen batch so
598    /// downstream Arrow IPC consumers (`with_typed_arrays`, used by
599    /// viewer-charts to drive its categorical/numeric axis resolvers
600    /// and tree-hierarchy walkers) see them inline — matching the
601    /// native `perspective-server`'s `to_arrow` output when
602    /// `emit_legacy_row_path_names: false`.
603    ///
604    /// When `split_by` is active, renames data columns by replacing `_`
605    /// with `|` (the DuckDB PIVOT separator).
606    ///
607    /// Also coerces non-standard Arrow types (e.g. `Decimal128`, `Int64`)
608    /// to Perspective-compatible types.
609    pub fn from_arrow_ipc(&mut self, ipc: &[u8]) -> Result<(), Box<dyn Error>> {
610        let cursor = std::io::Cursor::new(ipc);
611        let batches: Vec<RecordBatch> = if &ipc[0..6] == "ARROW1".as_bytes() {
612            FileReader::try_new(cursor, None)?.collect::<Result<Vec<_>, _>>()?
613        } else {
614            StreamReader::try_new(cursor, None)?.collect::<Result<Vec<_>, _>>()?
615        };
616
617        let batch = match batches.len() {
618            0 => return Err("Arrow IPC stream contained no record batches".into()),
619            1 => batches.into_iter().next().unwrap(),
620            _ => arrow_select::concat::concat_batches(&batches[0].schema(), &batches)?,
621        };
622
623        let has_group_by = !self.config.group_by.is_empty();
624        let has_split_by = !self.config.split_by.is_empty();
625        let is_total = self.config.group_rollup_mode == GroupRollupMode::Total;
626
627        if !has_group_by && !has_split_by && !is_total {
628            self.frozen = Some(batch);
629            return Ok(());
630        }
631
632        let num_rows = batch.num_rows();
633        let schema = batch.schema();
634
635        // Phase A: Extract row_path from __GROUPING_ID__ and __ROW_PATH_N__
636        if has_group_by {
637            let group_by_len = self.config.group_by.len();
638            let is_flat = self.config.group_rollup_mode == GroupRollupMode::Flat;
639            let grouping_ids = if is_flat {
640                None
641            } else {
642                let grouping_id_idx = schema
643                    .index_of("__GROUPING_ID__")
644                    .map_err(|_| "Missing __GROUPING_ID__ column")?;
645                Some(cast_to_int64(batch.column(grouping_id_idx))?)
646            };
647
648            let mut row_paths: Vec<Vec<Scalar>> = (0..num_rows).map(|_| Vec::new()).collect();
649            for gidx in 0..group_by_len {
650                let col_name = format!("__ROW_PATH_{}__", gidx);
651                let col_idx = schema
652                    .index_of(&col_name)
653                    .map_err(|_| format!("Missing {} column", col_name))?;
654
655                let col = batch.column(col_idx);
656
657                // In flat mode, all rows are leaf rows
658                if is_flat {
659                    // TODO I may be dumb but I'm not exactly sure what Clippy
660                    // wants here. This could be an `enumerate` but how is this
661                    // better?
662                    #[allow(clippy::needless_range_loop)]
663                    for row_idx in 0..num_rows {
664                        row_paths[row_idx].push(extract_scalar(col, row_idx));
665                    }
666                } else {
667                    let gids = grouping_ids.as_ref().unwrap();
668                    let max_grouping_id = 2_i64.pow(group_by_len as u32 - gidx as u32) - 1;
669                    for row_idx in 0..num_rows {
670                        if gids[row_idx] < max_grouping_id {
671                            row_paths[row_idx].push(extract_scalar(col, row_idx));
672                        }
673                    }
674                }
675            }
676
677            self.row_path = Some(row_paths);
678        }
679
680        // Phase B: Rebuild RecordBatch without metadata columns, with
681        // column renames and type coercion.
682        let mut new_fields = Vec::new();
683        let mut new_arrays: Vec<ArrayRef> = Vec::new();
684        for (col_idx, field) in schema.fields().iter().enumerate() {
685            let name = field.name();
686            // `__GROUPING_ID__` is an internal SQL-rollup discriminator
687            // (used in Phase A above to decide which row-path levels
688            // belong to each row). No JS consumer reads it, so it's
689            // dropped from the frozen batch.
690            //
691            // `__ROW_PATH_N__` columns are kept. Phase A copied their
692            // values into `self.row_path` for the JSON sidecar paths
693            // (`render_to_columns_json`, `render_to_rows`), but
694            // viewer-charts' `with_typed_arrays` callback needs the
695            // per-level columns inline in the Arrow stream — its
696            // categorical-axis resolver, numeric-position lookup, and
697            // tree hierarchy walker all do `columns.get(\`__ROW_PATH_${n}__\`)`.
698            // Keeping the columns here lets `render_to_arrow_ipc`
699            // serialize them naturally, matching native
700            // `perspective-server`'s `to_arrow` output.
701            if name == "__GROUPING_ID__" {
702                continue;
703            }
704
705            let new_name = if has_split_by && !name.starts_with("__") {
706                name.replace('_', "|")
707            } else {
708                name.clone()
709            };
710
711            let (coerced_field, coerced_array) =
712                coerce_column(&new_name, field, batch.column(col_idx))?;
713            new_fields.push(coerced_field);
714            new_arrays.push(coerced_array);
715        }
716
717        let new_schema = Arc::new(Schema::new(new_fields));
718        self.frozen = if new_arrays.is_empty() {
719            Some(RecordBatch::new_empty(new_schema))
720        } else {
721            Some(RecordBatch::try_new(new_schema, new_arrays)?)
722        };
723        Ok(())
724    }
725
726    /// Freezes the builders into a `RecordBatch`. Idempotent — subsequent
727    /// calls return the cached batch.
728    pub(crate) fn freeze(&mut self) -> &RecordBatch {
729        if self.frozen.is_none() {
730            let mut fields = Vec::new();
731            let mut arrays: Vec<ArrayRef> = Vec::new();
732
733            for (name, builder) in &mut self.builders {
734                let (field, array): (Field, ArrayRef) = match builder {
735                    ColumnBuilder::Boolean(b) => (
736                        Field::new(name, DataType::Boolean, true),
737                        Arc::new(b.finish()),
738                    ),
739                    ColumnBuilder::String(b) => (
740                        Field::new(name, dict_data_type(), true),
741                        Arc::new(b.finish()),
742                    ),
743                    ColumnBuilder::Float(b) => (
744                        Field::new(name, DataType::Float64, true),
745                        Arc::new(b.finish()),
746                    ),
747                    ColumnBuilder::Integer(b) => (
748                        Field::new(name, DataType::Int32, true),
749                        Arc::new(b.finish()),
750                    ),
751                    ColumnBuilder::Datetime(b) => (
752                        Field::new(name, DataType::Timestamp(TimeUnit::Millisecond, None), true),
753                        Arc::new(b.finish()),
754                    ),
755                };
756                fields.push(field);
757                arrays.push(array);
758            }
759
760            let schema = Arc::new(Schema::new(fields));
761            self.frozen = Some(
762                RecordBatch::try_new(schema, arrays)
763                    .expect("RecordBatch construction should not fail for well-formed builders"),
764            );
765        }
766        self.frozen.as_ref().unwrap()
767    }
768
769    /// Serializes the data to Arrow IPC streaming format.
770    pub(crate) fn render_to_arrow_ipc(&mut self) -> Result<Vec<u8>, Box<dyn Error>> {
771        let batch = self.freeze().clone();
772        let schema = batch.schema();
773        let mut buf = Vec::new();
774        {
775            let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
776            writer.write(&batch)?;
777            writer.finish()?;
778        }
779        Ok(buf)
780    }
781
782    /// Converts the columnar data to a row-oriented representation for JSON
783    /// serialization.
784    ///
785    /// `style` selects between the legacy `__ROW_PATH__` sidecar
786    /// (`Sidecar`, used by `to_json`) and the native per-level
787    /// `__ROW_PATH_N__` columns (`PerLevel`, used by `to_csv` /
788    /// `to_ndjson`). See [`RowPathStyle`] for the deprecation plan.
789    pub(crate) fn render_to_rows(
790        &mut self,
791        style: RowPathStyle,
792    ) -> Vec<IndexMap<String, VirtualDataCell>> {
793        let batch = self.freeze().clone();
794        let num_rows = batch.num_rows();
795        let schema = batch.schema();
796
797        (0..num_rows)
798            .map(|row_idx| {
799                let mut row = IndexMap::new();
800                if style == RowPathStyle::Sidecar
801                    && let Some(ref rp) = self.row_path
802                    && row_idx < rp.len()
803                {
804                    row.insert(
805                        "__ROW_PATH__".to_string(),
806                        VirtualDataCell::RowPath(rp[row_idx].clone()),
807                    );
808                }
809
810                for (col_idx, field) in schema.fields().iter().enumerate() {
811                    if style == RowPathStyle::Sidecar && field.name().starts_with("__ROW_PATH_") {
812                        continue;
813                    }
814
815                    let col = batch.column(col_idx);
816                    let cell = if col.is_null(row_idx) {
817                        match field.data_type() {
818                            DataType::Boolean => VirtualDataCell::Boolean(None),
819                            DataType::Utf8 | DataType::Dictionary(..) => {
820                                VirtualDataCell::String(None)
821                            },
822                            DataType::Float64 => VirtualDataCell::Float(None),
823                            DataType::Int32 => VirtualDataCell::Integer(None),
824                            DataType::Timestamp(TimeUnit::Millisecond, _) => {
825                                VirtualDataCell::Datetime(None)
826                            },
827                            _ => continue,
828                        }
829                    } else {
830                        match field.data_type() {
831                            DataType::Boolean => {
832                                let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
833                                VirtualDataCell::Boolean(Some(arr.value(row_idx)))
834                            },
835                            DataType::Utf8 => {
836                                let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
837                                VirtualDataCell::String(Some(arr.value(row_idx).to_string()))
838                            },
839                            DataType::Dictionary(..) => {
840                                let dict = col.as_dictionary::<Int32Type>();
841                                let values = dict.downcast_dict::<StringArray>().unwrap();
842                                VirtualDataCell::String(Some(values.value(row_idx).to_string()))
843                            },
844                            DataType::Float64 => {
845                                let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
846                                VirtualDataCell::Float(Some(arr.value(row_idx)))
847                            },
848                            DataType::Int32 => {
849                                let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
850                                VirtualDataCell::Integer(Some(arr.value(row_idx)))
851                            },
852                            DataType::Int64 => {
853                                // TODO ????
854                                let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
855                                VirtualDataCell::Float(Some(arr.value(row_idx) as f64))
856                            },
857                            DataType::Time64(TimeUnit::Microsecond) => {
858                                let arr = col
859                                    .as_any()
860                                    .downcast_ref::<Time64MicrosecondArray>()
861                                    .unwrap();
862                                VirtualDataCell::Float(Some(arr.value(row_idx) as f64))
863                            },
864                            DataType::Timestamp(TimeUnit::Microsecond, _) => {
865                                let arr = col
866                                    .as_any()
867                                    .downcast_ref::<Time64MicrosecondArray>()
868                                    .unwrap();
869                                VirtualDataCell::Datetime(Some(arr.value(row_idx) * 1000))
870                            },
871                            DataType::Timestamp(TimeUnit::Millisecond, _) => {
872                                let arr = col
873                                    .as_any()
874                                    .downcast_ref::<TimestampMillisecondArray>()
875                                    .unwrap();
876                                VirtualDataCell::Datetime(Some(arr.value(row_idx)))
877                            },
878                            DataType::Date32 => {
879                                let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
880                                VirtualDataCell::Datetime(Some(
881                                    arr.value(row_idx) as i64 * 86_400_000,
882                                ))
883                            },
884                            x => {
885                                tracing::error!("Unknown Arrow IPC type {}", x);
886                                continue;
887                            },
888                        }
889                    };
890                    row.insert(field.name().clone(), cell);
891                }
892
893                row
894            })
895            .collect()
896    }
897
898    /// Serializes the data to a column-oriented JSON string.
899    ///
900    /// `style` selects between the legacy `__ROW_PATH__` sidecar
901    /// (`Sidecar`, used by `to_columns`) and the native per-level
902    /// `__ROW_PATH_N__` columns (`PerLevel`, currently unused — reserved
903    /// for the future deprecation of `__ROW_PATH__`). See
904    /// [`RowPathStyle`] for context.
905    pub fn render_to_columns_json(
906        &mut self,
907        style: RowPathStyle,
908    ) -> Result<String, Box<dyn Error>> {
909        let batch = self.freeze().clone();
910        let schema = batch.schema();
911        let mut map = serde_json::Map::new();
912
913        if style == RowPathStyle::Sidecar
914            && let Some(ref rp) = self.row_path
915        {
916            map.insert("__ROW_PATH__".to_string(), serde_json::to_value(rp)?);
917        }
918
919        for (col_idx, field) in schema.fields().iter().enumerate() {
920            if style == RowPathStyle::Sidecar && field.name().starts_with("__ROW_PATH_") {
921                continue;
922            }
923
924            let col = batch.column(col_idx);
925            let num_rows = col.len();
926            let values: serde_json::Value = match field.data_type() {
927                DataType::Boolean => {
928                    let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
929                    serde_json::to_value(
930                        (0..num_rows)
931                            .map(|i| {
932                                if arr.is_null(i) {
933                                    None
934                                } else {
935                                    Some(arr.value(i))
936                                }
937                            })
938                            .collect::<Vec<_>>(),
939                    )?
940                },
941                DataType::Utf8 => {
942                    let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
943                    serde_json::to_value(
944                        (0..num_rows)
945                            .map(|i| {
946                                if arr.is_null(i) {
947                                    None
948                                } else {
949                                    Some(arr.value(i))
950                                }
951                            })
952                            .collect::<Vec<_>>(),
953                    )?
954                },
955                DataType::Dictionary(..) => {
956                    let dict = col.as_dictionary::<Int32Type>();
957                    let values = dict.downcast_dict::<StringArray>().unwrap();
958                    serde_json::to_value(
959                        (0..num_rows)
960                            .map(|i| {
961                                if col.is_null(i) {
962                                    None
963                                } else {
964                                    Some(values.value(i))
965                                }
966                            })
967                            .collect::<Vec<_>>(),
968                    )?
969                },
970                DataType::Float64 => {
971                    let arr = col.as_any().downcast_ref::<Float64Array>().unwrap();
972                    serde_json::to_value(
973                        (0..num_rows)
974                            .map(|i| {
975                                if arr.is_null(i) {
976                                    None
977                                } else {
978                                    Some(arr.value(i))
979                                }
980                            })
981                            .collect::<Vec<_>>(),
982                    )?
983                },
984                DataType::Int32 => {
985                    let arr = col.as_any().downcast_ref::<Int32Array>().unwrap();
986                    serde_json::to_value(
987                        (0..num_rows)
988                            .map(|i| {
989                                if arr.is_null(i) {
990                                    None
991                                } else {
992                                    Some(arr.value(i))
993                                }
994                            })
995                            .collect::<Vec<_>>(),
996                    )?
997                },
998                DataType::Int64 => {
999                    let arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
1000                    serde_json::to_value(
1001                        (0..num_rows)
1002                            .map(|i| {
1003                                if arr.is_null(i) {
1004                                    None
1005                                } else {
1006                                    Some(arr.value(i) as f64)
1007                                }
1008                            })
1009                            .collect::<Vec<_>>(),
1010                    )?
1011                },
1012                DataType::Timestamp(TimeUnit::Millisecond, _) => {
1013                    let arr = col
1014                        .as_any()
1015                        .downcast_ref::<TimestampMillisecondArray>()
1016                        .unwrap();
1017                    serde_json::to_value(
1018                        (0..num_rows)
1019                            .map(|i| {
1020                                if arr.is_null(i) {
1021                                    None
1022                                } else {
1023                                    Some(arr.value(i))
1024                                }
1025                            })
1026                            .collect::<Vec<_>>(),
1027                    )?
1028                },
1029                DataType::Time64(TimeUnit::Microsecond) => {
1030                    let arr = col
1031                        .as_any()
1032                        .downcast_ref::<Time64MicrosecondArray>()
1033                        .unwrap();
1034                    serde_json::to_value(
1035                        (0..num_rows)
1036                            .map(|i| {
1037                                if arr.is_null(i) {
1038                                    None
1039                                } else {
1040                                    Some(arr.value(i) as f64)
1041                                }
1042                            })
1043                            .collect::<Vec<_>>(),
1044                    )?
1045                },
1046                DataType::Date32 => {
1047                    let arr = col.as_any().downcast_ref::<Date32Array>().unwrap();
1048                    serde_json::to_value(
1049                        (0..num_rows)
1050                            .map(|i| {
1051                                if arr.is_null(i) {
1052                                    None
1053                                } else {
1054                                    Some(arr.value(i) as i64 * 86_400_000)
1055                                }
1056                            })
1057                            .collect::<Vec<_>>(),
1058                    )?
1059                },
1060                x => {
1061                    tracing::error!("Unknown Arrow IPC type {}", x);
1062                    continue;
1063                },
1064            };
1065            map.insert(field.name().clone(), values);
1066        }
1067
1068        Ok(serde_json::to_string(&map)?)
1069    }
1070
1071    /// Sets a value in a column at the specified row index.
1072    ///
1073    /// If `group_by_index` is `Some`, the value is added to the `__ROW_PATH__`
1074    /// column as part of the row's group-by path. Otherwise, the value is
1075    /// inserted into the named column.
1076    ///
1077    /// Creates the column if it does not already exist.
1078    pub fn set_col<T: SetVirtualDataColumn>(
1079        &mut self,
1080        name: &str,
1081        grouping_id: Option<usize>,
1082        index: usize,
1083        value: T,
1084    ) -> Result<(), Box<dyn Error>> {
1085        if name == "__GROUPING_ID__" {
1086            return Ok(());
1087        }
1088
1089        if name.starts_with("__ROW_PATH_") {
1090            let group_by_index: u32 = name[11..name.len() - 2].parse()?;
1091            let max_grouping_id =
1092                2_i32.pow((self.config.group_by.len() as u32) - group_by_index) - 1;
1093
1094            if grouping_id.map(|x| x as i32).unwrap_or(i32::MAX) < max_grouping_id {
1095                let col = self.row_path.get_or_insert_with(Vec::new);
1096                if let Some(row) = col.get_mut(index) {
1097                    let scalar = value.to_scalar();
1098                    row.push(scalar);
1099                } else {
1100                    while col.len() < index {
1101                        col.push(vec![])
1102                    }
1103
1104                    let scalar = value.to_scalar();
1105                    col.push(vec![scalar]);
1106                }
1107            }
1108
1109            Ok(())
1110        } else {
1111            let col_name = if !self.config.split_by.is_empty() && !name.starts_with("__") {
1112                name.replace('_', "|")
1113            } else {
1114                name.to_owned()
1115            };
1116
1117            if !self.builders.contains_key(&col_name) {
1118                self.builders.insert(col_name.clone(), T::new_builder());
1119            }
1120
1121            let col = self
1122                .builders
1123                .get_mut(&col_name)
1124                .ok_or_else(|| format!("Column '{}' not found after insertion", col_name))?;
1125
1126            Ok(value.write_to(col)?)
1127        }
1128    }
1129}