Skip to main content

omnigraph_compiler/
json_output.rs

1use arrow_array::{
2    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, FixedSizeListArray, Float32Array,
3    Float64Array, Int32Array, Int64Array, ListArray, RecordBatch, StringArray, StructArray,
4    UInt32Array, UInt64Array,
5};
6use arrow_schema::DataType;
7
8pub const JS_MAX_SAFE_INTEGER_I64: i64 = 9_007_199_254_740_991;
9pub const JS_MAX_SAFE_INTEGER_U64: u64 = 9_007_199_254_740_991;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12enum JsonIntegerMode {
13    JavaScript,
14    Native,
15}
16
17pub fn is_js_safe_integer_i64(value: i64) -> bool {
18    (-JS_MAX_SAFE_INTEGER_I64..=JS_MAX_SAFE_INTEGER_I64).contains(&value)
19}
20
21/// Convert Arrow RecordBatches into a Vec of JSON objects (one per row).
22pub fn record_batches_to_json_rows(results: &[RecordBatch]) -> Vec<serde_json::Value> {
23    record_batches_to_json_rows_with_mode(results, JsonIntegerMode::JavaScript)
24}
25
26/// Convert Arrow RecordBatches into JSON rows without JS-safe integer coercion.
27pub fn record_batches_to_rust_json_rows(results: &[RecordBatch]) -> Vec<serde_json::Value> {
28    record_batches_to_json_rows_with_mode(results, JsonIntegerMode::Native)
29}
30
31fn record_batches_to_json_rows_with_mode(
32    results: &[RecordBatch],
33    integer_mode: JsonIntegerMode,
34) -> Vec<serde_json::Value> {
35    let total_rows = results.iter().map(RecordBatch::num_rows).sum();
36    let mut out = Vec::with_capacity(total_rows);
37    for batch in results {
38        let schema = batch.schema();
39        for row in 0..batch.num_rows() {
40            let mut map = serde_json::Map::new();
41            for (col_idx, field) in schema.fields().iter().enumerate() {
42                let col_arr = batch.column(col_idx);
43                map.insert(
44                    field.name().clone(),
45                    array_value_to_json_with_mode(col_arr, row, integer_mode),
46                );
47            }
48            out.push(serde_json::Value::Object(map));
49        }
50    }
51    out
52}
53
54/// Convert a single cell from an Arrow array to a serde_json::Value.
55pub fn array_value_to_json(array: &ArrayRef, row: usize) -> serde_json::Value {
56    array_value_to_json_with_mode(array, row, JsonIntegerMode::JavaScript)
57}
58
59fn array_value_to_json_with_mode(
60    array: &ArrayRef,
61    row: usize,
62    integer_mode: JsonIntegerMode,
63) -> serde_json::Value {
64    if array.is_null(row) {
65        return serde_json::Value::Null;
66    }
67
68    match array.data_type() {
69        DataType::Utf8 => array
70            .as_any()
71            .downcast_ref::<StringArray>()
72            .map(|a| serde_json::Value::String(a.value(row).to_string()))
73            .unwrap_or(serde_json::Value::Null),
74        DataType::Boolean => array
75            .as_any()
76            .downcast_ref::<BooleanArray>()
77            .map(|a| serde_json::Value::Bool(a.value(row)))
78            .unwrap_or(serde_json::Value::Null),
79        DataType::Int32 => array
80            .as_any()
81            .downcast_ref::<Int32Array>()
82            .map(|a| serde_json::Value::Number((a.value(row) as i64).into()))
83            .unwrap_or(serde_json::Value::Null),
84        DataType::Int64 => array
85            .as_any()
86            .downcast_ref::<Int64Array>()
87            .map(|a| {
88                let value = a.value(row);
89                match integer_mode {
90                    JsonIntegerMode::JavaScript if !is_js_safe_integer_i64(value) => {
91                        serde_json::Value::String(value.to_string())
92                    }
93                    JsonIntegerMode::JavaScript | JsonIntegerMode::Native => {
94                        serde_json::Value::Number(value.into())
95                    }
96                }
97            })
98            .unwrap_or(serde_json::Value::Null),
99        DataType::UInt32 => array
100            .as_any()
101            .downcast_ref::<UInt32Array>()
102            .map(|a| serde_json::Value::Number((a.value(row) as u64).into()))
103            .unwrap_or(serde_json::Value::Null),
104        DataType::UInt64 => array
105            .as_any()
106            .downcast_ref::<UInt64Array>()
107            .map(|a| {
108                let value = a.value(row);
109                match integer_mode {
110                    JsonIntegerMode::JavaScript if value > JS_MAX_SAFE_INTEGER_U64 => {
111                        serde_json::Value::String(value.to_string())
112                    }
113                    JsonIntegerMode::JavaScript | JsonIntegerMode::Native => {
114                        serde_json::Value::Number(value.into())
115                    }
116                }
117            })
118            .unwrap_or(serde_json::Value::Null),
119        DataType::Float32 => array
120            .as_any()
121            .downcast_ref::<Float32Array>()
122            .map(|a| json_float_value(a.value(row) as f64))
123            .unwrap_or(serde_json::Value::Null),
124        DataType::Float64 => array
125            .as_any()
126            .downcast_ref::<Float64Array>()
127            .map(|a| json_float_value(a.value(row)))
128            .unwrap_or(serde_json::Value::Null),
129        DataType::Date32 => array
130            .as_any()
131            .downcast_ref::<Date32Array>()
132            .map(|a| {
133                let days = a.value(row);
134                arrow_array::temporal_conversions::date32_to_datetime(days)
135                    .map(|dt| serde_json::Value::String(dt.format("%Y-%m-%d").to_string()))
136                    .unwrap_or_else(|| serde_json::Value::Number((days as i64).into()))
137            })
138            .unwrap_or(serde_json::Value::Null),
139        DataType::Date64 => array
140            .as_any()
141            .downcast_ref::<Date64Array>()
142            .map(|a| {
143                let ms = a.value(row);
144                arrow_array::temporal_conversions::date64_to_datetime(ms)
145                    .map(|dt| {
146                        serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string())
147                    })
148                    .unwrap_or_else(|| serde_json::Value::Number(ms.into()))
149            })
150            .unwrap_or(serde_json::Value::Null),
151        DataType::List(_) => array
152            .as_any()
153            .downcast_ref::<ListArray>()
154            .map(|a| {
155                let values = a.value(row);
156                serde_json::Value::Array(
157                    (0..values.len())
158                        .map(|idx| array_value_to_json_with_mode(&values, idx, integer_mode))
159                        .collect(),
160                )
161            })
162            .unwrap_or(serde_json::Value::Null),
163        DataType::FixedSizeList(_, _) => array
164            .as_any()
165            .downcast_ref::<FixedSizeListArray>()
166            .map(|a| fixed_size_list_value_to_json(a, row, integer_mode))
167            .unwrap_or(serde_json::Value::Null),
168        DataType::Struct(_) => array
169            .as_any()
170            .downcast_ref::<StructArray>()
171            .map(|struct_arr| {
172                let mut obj = serde_json::Map::new();
173                for (i, field) in struct_arr.fields().iter().enumerate() {
174                    let col = struct_arr.column(i);
175                    obj.insert(
176                        field.name().clone(),
177                        array_value_to_json_with_mode(col, row, integer_mode),
178                    );
179                }
180                serde_json::Value::Object(obj)
181            })
182            .unwrap_or(serde_json::Value::Null),
183        _ => {
184            let display =
185                arrow_cast::display::array_value_to_string(array, row).unwrap_or_default();
186            serde_json::Value::String(display)
187        }
188    }
189}
190
191fn json_float_value(value: f64) -> serde_json::Value {
192    if value.is_nan() {
193        return serde_json::Value::String("NaN".to_string());
194    }
195    if value == f64::INFINITY {
196        return serde_json::Value::String("Infinity".to_string());
197    }
198    if value == f64::NEG_INFINITY {
199        return serde_json::Value::String("-Infinity".to_string());
200    }
201
202    serde_json::Number::from_f64(value)
203        .map(serde_json::Value::Number)
204        .unwrap_or(serde_json::Value::Null)
205}
206
207fn fixed_size_list_value_to_json(
208    array: &FixedSizeListArray,
209    row: usize,
210    integer_mode: JsonIntegerMode,
211) -> serde_json::Value {
212    let value_len = array.value_length() as usize;
213    let values = array.values();
214    if let Some(float_values) = values.as_any().downcast_ref::<Float32Array>() {
215        let start = row.saturating_mul(value_len);
216        return float32_json_array(float_values, start, value_len);
217    }
218
219    let values = array.value(row);
220    serde_json::Value::Array(
221        (0..values.len())
222            .map(|idx| array_value_to_json_with_mode(&values, idx, integer_mode))
223            .collect(),
224    )
225}
226
227fn float32_json_array(values: &Float32Array, start: usize, len: usize) -> serde_json::Value {
228    let mut out = Vec::with_capacity(len);
229    let end = start.saturating_add(len).min(values.len());
230    for idx in start..end {
231        if values.is_null(idx) {
232            out.push(serde_json::Value::Null);
233            continue;
234        }
235        let value = values.value(idx) as f64;
236        out.push(
237            serde_json::Number::from_f64(value)
238                .map(serde_json::Value::Number)
239                .unwrap_or(serde_json::Value::Null),
240        );
241    }
242    serde_json::Value::Array(out)
243}
244
245#[cfg(test)]
246mod tests {
247    use super::{array_value_to_json, record_batches_to_rust_json_rows};
248    use std::sync::Arc;
249
250    use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};
251    use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch, UInt64Array};
252    use arrow_schema::{DataType, Field, Schema};
253
254    #[test]
255    fn int64_outside_js_safe_range_is_stringified() {
256        let values: ArrayRef = Arc::new(Int64Array::from(vec![Some(9_007_199_254_740_992)]));
257        assert_eq!(
258            array_value_to_json(&values, 0),
259            serde_json::Value::String("9007199254740992".to_string())
260        );
261    }
262
263    #[test]
264    fn uint64_outside_js_safe_range_is_stringified() {
265        let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(9_007_199_254_740_992)]));
266        assert_eq!(
267            array_value_to_json(&values, 0),
268            serde_json::Value::String("9007199254740992".to_string())
269        );
270    }
271
272    #[test]
273    fn uint64_within_js_safe_range_stays_numeric() {
274        let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(9_007_199_254_740_991)]));
275        assert_eq!(
276            array_value_to_json(&values, 0),
277            serde_json::json!(9_007_199_254_740_991u64)
278        );
279    }
280
281    #[test]
282    fn rust_json_rows_preserve_full_width_integers() {
283        let schema = Arc::new(Schema::new(vec![
284            Field::new("signed", DataType::Int64, false),
285            Field::new("unsigned", DataType::UInt64, false),
286        ]));
287        let batch = RecordBatch::try_new(
288            schema,
289            vec![
290                Arc::new(Int64Array::from(vec![i64::MIN])),
291                Arc::new(UInt64Array::from(vec![u64::MAX])),
292            ],
293        )
294        .expect("batch");
295
296        assert_eq!(
297            record_batches_to_rust_json_rows(&[batch]),
298            vec![serde_json::json!({
299                "signed": i64::MIN,
300                "unsigned": u64::MAX,
301            })]
302        );
303    }
304
305    #[test]
306    fn fixed_size_float32_vectors_serialize_without_recursive_dispatch() {
307        let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), 3);
308        builder.values().append_value(0.25);
309        builder.values().append_value(0.5);
310        builder.values().append_value(0.75);
311        builder.append(true);
312
313        for _ in 0..3 {
314            builder.values().append_null();
315        }
316        builder.append(false);
317
318        builder.values().append_value(1.0);
319        builder.values().append_value(2.0);
320        builder.values().append_value(3.0);
321        builder.append(true);
322
323        let values: ArrayRef = Arc::new(builder.finish());
324        assert_eq!(
325            array_value_to_json(&values, 0),
326            serde_json::json!([0.25, 0.5, 0.75])
327        );
328        assert_eq!(array_value_to_json(&values, 1), serde_json::Value::Null);
329        assert_eq!(
330            array_value_to_json(&values, 2),
331            serde_json::json!([1.0, 2.0, 3.0])
332        );
333    }
334
335    #[test]
336    fn non_finite_floats_are_stringified() {
337        let values: ArrayRef = Arc::new(Float64Array::from(vec![
338            Some(f64::NAN),
339            Some(f64::INFINITY),
340            Some(f64::NEG_INFINITY),
341        ]));
342        assert_eq!(array_value_to_json(&values, 0), serde_json::json!("NaN"));
343        assert_eq!(
344            array_value_to_json(&values, 1),
345            serde_json::json!("Infinity")
346        );
347        assert_eq!(
348            array_value_to_json(&values, 2),
349            serde_json::json!("-Infinity")
350        );
351    }
352}