1use arrow_array::{
2 Array, ArrayRef, BooleanArray, Date32Array, Date64Array, FixedSizeListArray, Float32Array,
3 Float64Array, Int32Array, Int64Array, ListArray, RecordBatch, StringArray, StructArray,
4 UInt32Array, UInt64Array,
5};
6use arrow_schema::DataType;
7
8pub const JS_MAX_SAFE_INTEGER_I64: i64 = 9_007_199_254_740_991;
9pub const JS_MAX_SAFE_INTEGER_U64: u64 = 9_007_199_254_740_991;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12enum JsonIntegerMode {
13 JavaScript,
14 Native,
15}
16
17pub fn is_js_safe_integer_i64(value: i64) -> bool {
18 (-JS_MAX_SAFE_INTEGER_I64..=JS_MAX_SAFE_INTEGER_I64).contains(&value)
19}
20
21pub fn record_batches_to_json_rows(results: &[RecordBatch]) -> Vec<serde_json::Value> {
23 record_batches_to_json_rows_with_mode(results, JsonIntegerMode::JavaScript)
24}
25
26pub fn record_batches_to_rust_json_rows(results: &[RecordBatch]) -> Vec<serde_json::Value> {
28 record_batches_to_json_rows_with_mode(results, JsonIntegerMode::Native)
29}
30
31fn record_batches_to_json_rows_with_mode(
32 results: &[RecordBatch],
33 integer_mode: JsonIntegerMode,
34) -> Vec<serde_json::Value> {
35 let total_rows = results.iter().map(RecordBatch::num_rows).sum();
36 let mut out = Vec::with_capacity(total_rows);
37 for batch in results {
38 let schema = batch.schema();
39 for row in 0..batch.num_rows() {
40 let mut map = serde_json::Map::new();
41 for (col_idx, field) in schema.fields().iter().enumerate() {
42 let col_arr = batch.column(col_idx);
43 map.insert(
44 field.name().clone(),
45 array_value_to_json_with_mode(col_arr, row, integer_mode),
46 );
47 }
48 out.push(serde_json::Value::Object(map));
49 }
50 }
51 out
52}
53
54pub fn array_value_to_json(array: &ArrayRef, row: usize) -> serde_json::Value {
56 array_value_to_json_with_mode(array, row, JsonIntegerMode::JavaScript)
57}
58
59fn array_value_to_json_with_mode(
60 array: &ArrayRef,
61 row: usize,
62 integer_mode: JsonIntegerMode,
63) -> serde_json::Value {
64 if array.is_null(row) {
65 return serde_json::Value::Null;
66 }
67
68 match array.data_type() {
69 DataType::Utf8 => array
70 .as_any()
71 .downcast_ref::<StringArray>()
72 .map(|a| serde_json::Value::String(a.value(row).to_string()))
73 .unwrap_or(serde_json::Value::Null),
74 DataType::Boolean => array
75 .as_any()
76 .downcast_ref::<BooleanArray>()
77 .map(|a| serde_json::Value::Bool(a.value(row)))
78 .unwrap_or(serde_json::Value::Null),
79 DataType::Int32 => array
80 .as_any()
81 .downcast_ref::<Int32Array>()
82 .map(|a| serde_json::Value::Number((a.value(row) as i64).into()))
83 .unwrap_or(serde_json::Value::Null),
84 DataType::Int64 => array
85 .as_any()
86 .downcast_ref::<Int64Array>()
87 .map(|a| {
88 let value = a.value(row);
89 match integer_mode {
90 JsonIntegerMode::JavaScript if !is_js_safe_integer_i64(value) => {
91 serde_json::Value::String(value.to_string())
92 }
93 JsonIntegerMode::JavaScript | JsonIntegerMode::Native => {
94 serde_json::Value::Number(value.into())
95 }
96 }
97 })
98 .unwrap_or(serde_json::Value::Null),
99 DataType::UInt32 => array
100 .as_any()
101 .downcast_ref::<UInt32Array>()
102 .map(|a| serde_json::Value::Number((a.value(row) as u64).into()))
103 .unwrap_or(serde_json::Value::Null),
104 DataType::UInt64 => array
105 .as_any()
106 .downcast_ref::<UInt64Array>()
107 .map(|a| {
108 let value = a.value(row);
109 match integer_mode {
110 JsonIntegerMode::JavaScript if value > JS_MAX_SAFE_INTEGER_U64 => {
111 serde_json::Value::String(value.to_string())
112 }
113 JsonIntegerMode::JavaScript | JsonIntegerMode::Native => {
114 serde_json::Value::Number(value.into())
115 }
116 }
117 })
118 .unwrap_or(serde_json::Value::Null),
119 DataType::Float32 => array
120 .as_any()
121 .downcast_ref::<Float32Array>()
122 .map(|a| json_float_value(a.value(row) as f64))
123 .unwrap_or(serde_json::Value::Null),
124 DataType::Float64 => array
125 .as_any()
126 .downcast_ref::<Float64Array>()
127 .map(|a| json_float_value(a.value(row)))
128 .unwrap_or(serde_json::Value::Null),
129 DataType::Date32 => array
130 .as_any()
131 .downcast_ref::<Date32Array>()
132 .map(|a| {
133 let days = a.value(row);
134 arrow_array::temporal_conversions::date32_to_datetime(days)
135 .map(|dt| serde_json::Value::String(dt.format("%Y-%m-%d").to_string()))
136 .unwrap_or_else(|| serde_json::Value::Number((days as i64).into()))
137 })
138 .unwrap_or(serde_json::Value::Null),
139 DataType::Date64 => array
140 .as_any()
141 .downcast_ref::<Date64Array>()
142 .map(|a| {
143 let ms = a.value(row);
144 arrow_array::temporal_conversions::date64_to_datetime(ms)
145 .map(|dt| {
146 serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string())
147 })
148 .unwrap_or_else(|| serde_json::Value::Number(ms.into()))
149 })
150 .unwrap_or(serde_json::Value::Null),
151 DataType::List(_) => array
152 .as_any()
153 .downcast_ref::<ListArray>()
154 .map(|a| {
155 let values = a.value(row);
156 serde_json::Value::Array(
157 (0..values.len())
158 .map(|idx| array_value_to_json_with_mode(&values, idx, integer_mode))
159 .collect(),
160 )
161 })
162 .unwrap_or(serde_json::Value::Null),
163 DataType::FixedSizeList(_, _) => array
164 .as_any()
165 .downcast_ref::<FixedSizeListArray>()
166 .map(|a| fixed_size_list_value_to_json(a, row, integer_mode))
167 .unwrap_or(serde_json::Value::Null),
168 DataType::Struct(_) => array
169 .as_any()
170 .downcast_ref::<StructArray>()
171 .map(|struct_arr| {
172 let mut obj = serde_json::Map::new();
173 for (i, field) in struct_arr.fields().iter().enumerate() {
174 let col = struct_arr.column(i);
175 obj.insert(
176 field.name().clone(),
177 array_value_to_json_with_mode(col, row, integer_mode),
178 );
179 }
180 serde_json::Value::Object(obj)
181 })
182 .unwrap_or(serde_json::Value::Null),
183 _ => {
184 let display =
185 arrow_cast::display::array_value_to_string(array, row).unwrap_or_default();
186 serde_json::Value::String(display)
187 }
188 }
189}
190
191fn json_float_value(value: f64) -> serde_json::Value {
192 if value.is_nan() {
193 return serde_json::Value::String("NaN".to_string());
194 }
195 if value == f64::INFINITY {
196 return serde_json::Value::String("Infinity".to_string());
197 }
198 if value == f64::NEG_INFINITY {
199 return serde_json::Value::String("-Infinity".to_string());
200 }
201
202 serde_json::Number::from_f64(value)
203 .map(serde_json::Value::Number)
204 .unwrap_or(serde_json::Value::Null)
205}
206
207fn fixed_size_list_value_to_json(
208 array: &FixedSizeListArray,
209 row: usize,
210 integer_mode: JsonIntegerMode,
211) -> serde_json::Value {
212 let value_len = array.value_length() as usize;
213 let values = array.values();
214 if let Some(float_values) = values.as_any().downcast_ref::<Float32Array>() {
215 let start = row.saturating_mul(value_len);
216 return float32_json_array(float_values, start, value_len);
217 }
218
219 let values = array.value(row);
220 serde_json::Value::Array(
221 (0..values.len())
222 .map(|idx| array_value_to_json_with_mode(&values, idx, integer_mode))
223 .collect(),
224 )
225}
226
227fn float32_json_array(values: &Float32Array, start: usize, len: usize) -> serde_json::Value {
228 let mut out = Vec::with_capacity(len);
229 let end = start.saturating_add(len).min(values.len());
230 for idx in start..end {
231 if values.is_null(idx) {
232 out.push(serde_json::Value::Null);
233 continue;
234 }
235 let value = values.value(idx) as f64;
236 out.push(
237 serde_json::Number::from_f64(value)
238 .map(serde_json::Value::Number)
239 .unwrap_or(serde_json::Value::Null),
240 );
241 }
242 serde_json::Value::Array(out)
243}
244
245#[cfg(test)]
246mod tests {
247 use super::{array_value_to_json, record_batches_to_rust_json_rows};
248 use std::sync::Arc;
249
250 use arrow_array::builder::{FixedSizeListBuilder, Float32Builder};
251 use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch, UInt64Array};
252 use arrow_schema::{DataType, Field, Schema};
253
254 #[test]
255 fn int64_outside_js_safe_range_is_stringified() {
256 let values: ArrayRef = Arc::new(Int64Array::from(vec![Some(9_007_199_254_740_992)]));
257 assert_eq!(
258 array_value_to_json(&values, 0),
259 serde_json::Value::String("9007199254740992".to_string())
260 );
261 }
262
263 #[test]
264 fn uint64_outside_js_safe_range_is_stringified() {
265 let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(9_007_199_254_740_992)]));
266 assert_eq!(
267 array_value_to_json(&values, 0),
268 serde_json::Value::String("9007199254740992".to_string())
269 );
270 }
271
272 #[test]
273 fn uint64_within_js_safe_range_stays_numeric() {
274 let values: ArrayRef = Arc::new(UInt64Array::from(vec![Some(9_007_199_254_740_991)]));
275 assert_eq!(
276 array_value_to_json(&values, 0),
277 serde_json::json!(9_007_199_254_740_991u64)
278 );
279 }
280
281 #[test]
282 fn rust_json_rows_preserve_full_width_integers() {
283 let schema = Arc::new(Schema::new(vec![
284 Field::new("signed", DataType::Int64, false),
285 Field::new("unsigned", DataType::UInt64, false),
286 ]));
287 let batch = RecordBatch::try_new(
288 schema,
289 vec![
290 Arc::new(Int64Array::from(vec![i64::MIN])),
291 Arc::new(UInt64Array::from(vec![u64::MAX])),
292 ],
293 )
294 .expect("batch");
295
296 assert_eq!(
297 record_batches_to_rust_json_rows(&[batch]),
298 vec![serde_json::json!({
299 "signed": i64::MIN,
300 "unsigned": u64::MAX,
301 })]
302 );
303 }
304
305 #[test]
306 fn fixed_size_float32_vectors_serialize_without_recursive_dispatch() {
307 let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), 3);
308 builder.values().append_value(0.25);
309 builder.values().append_value(0.5);
310 builder.values().append_value(0.75);
311 builder.append(true);
312
313 for _ in 0..3 {
314 builder.values().append_null();
315 }
316 builder.append(false);
317
318 builder.values().append_value(1.0);
319 builder.values().append_value(2.0);
320 builder.values().append_value(3.0);
321 builder.append(true);
322
323 let values: ArrayRef = Arc::new(builder.finish());
324 assert_eq!(
325 array_value_to_json(&values, 0),
326 serde_json::json!([0.25, 0.5, 0.75])
327 );
328 assert_eq!(array_value_to_json(&values, 1), serde_json::Value::Null);
329 assert_eq!(
330 array_value_to_json(&values, 2),
331 serde_json::json!([1.0, 2.0, 3.0])
332 );
333 }
334
335 #[test]
336 fn non_finite_floats_are_stringified() {
337 let values: ArrayRef = Arc::new(Float64Array::from(vec![
338 Some(f64::NAN),
339 Some(f64::INFINITY),
340 Some(f64::NEG_INFINITY),
341 ]));
342 assert_eq!(array_value_to_json(&values, 0), serde_json::json!("NaN"));
343 assert_eq!(
344 array_value_to_json(&values, 1),
345 serde_json::json!("Infinity")
346 );
347 assert_eq!(
348 array_value_to_json(&values, 2),
349 serde_json::json!("-Infinity")
350 );
351 }
352}