datafusion_comet_spark_expr/json_funcs/
to_json.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// TODO upstream this to DataFusion as long as we have a way to specify all
19// of the Spark-specific compatibility features that we need (including
20// being able to specify Spark-compatible cast from all types to string)
21
22use crate::SparkCastOptions;
23use crate::{spark_cast, EvalMode};
24use arrow_array::builder::StringBuilder;
25use arrow_array::{Array, ArrayRef, RecordBatch, StringArray, StructArray};
26use arrow_schema::{DataType, Schema};
27use datafusion_common::Result;
28use datafusion_expr::ColumnarValue;
29use datafusion_physical_expr::PhysicalExpr;
30use std::any::Any;
31use std::fmt::{Debug, Display, Formatter};
32use std::hash::Hash;
33use std::sync::Arc;
34
35/// to_json function
36#[derive(Debug, Eq)]
37pub struct ToJson {
38    /// The input to convert to JSON
39    expr: Arc<dyn PhysicalExpr>,
40    /// Timezone to use when converting timestamps to JSON
41    timezone: String,
42}
43
44impl Hash for ToJson {
45    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
46        self.expr.hash(state);
47        self.timezone.hash(state);
48    }
49}
50impl PartialEq for ToJson {
51    fn eq(&self, other: &Self) -> bool {
52        self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
53    }
54}
55
56impl ToJson {
57    pub fn new(expr: Arc<dyn PhysicalExpr>, timezone: &str) -> Self {
58        Self {
59            expr,
60            timezone: timezone.to_owned(),
61        }
62    }
63}
64
65impl Display for ToJson {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        write!(f, "to_json({}, timezone={})", self.expr, self.timezone)
68    }
69}
70
71impl PartialEq<dyn Any> for ToJson {
72    fn eq(&self, other: &dyn Any) -> bool {
73        if let Some(other) = other.downcast_ref::<ToJson>() {
74            self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
75        } else {
76            false
77        }
78    }
79}
80
81impl PhysicalExpr for ToJson {
82    fn as_any(&self) -> &dyn Any {
83        self
84    }
85
86    fn data_type(&self, _: &Schema) -> Result<DataType> {
87        Ok(DataType::Utf8)
88    }
89
90    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
91        self.expr.nullable(input_schema)
92    }
93
94    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
95        let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
96        Ok(ColumnarValue::Array(array_to_json_string(
97            &input,
98            &self.timezone,
99        )?))
100    }
101
102    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
103        vec![&self.expr]
104    }
105
106    fn with_new_children(
107        self: Arc<Self>,
108        children: Vec<Arc<dyn PhysicalExpr>>,
109    ) -> Result<Arc<dyn PhysicalExpr>> {
110        assert!(children.len() == 1);
111        Ok(Arc::new(Self::new(
112            Arc::clone(&children[0]),
113            &self.timezone,
114        )))
115    }
116}
117
118/// Convert an array into a JSON value string representation
119fn array_to_json_string(arr: &Arc<dyn Array>, timezone: &str) -> Result<ArrayRef> {
120    if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
121        struct_to_json(struct_array, timezone)
122    } else {
123        spark_cast(
124            ColumnarValue::Array(Arc::clone(arr)),
125            &DataType::Utf8,
126            &SparkCastOptions::new(EvalMode::Legacy, timezone, false),
127        )?
128        .into_array(arr.len())
129    }
130}
131
/// Escapes `input` so it can be placed between double quotes in a JSON document.
///
/// * `"` and `\` are prefixed with a backslash.
/// * Tab, carriage return, line feed, form feed and backspace are written
///   using their two-character short forms (`\t`, `\r`, `\n`, `\f`, `\b`).
/// * Every other control character below U+0020 is written as `\u00XX`,
///   because JSON does not allow raw control characters inside strings.
/// * All remaining characters are copied through unchanged.
///
/// Returns a newly allocated string; `input` is not modified.
fn escape_string(input: &str) -> String {
    // NOTE(review): uppercase hex digits are used on the assumption that this
    // matches the JSON writer behind Spark's `to_json` — confirm against Spark output.
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    // The output is at least as long as the input; escapes only make it grow.
    let mut escaped = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' | '\\' => {
                escaped.push('\\');
                escaped.push(c);
            }
            '\t' => escaped.push_str("\\t"),
            '\r' => escaped.push_str("\\r"),
            '\n' => escaped.push_str("\\n"),
            '\x0C' => escaped.push_str("\\f"),
            '\x08' => escaped.push_str("\\b"),
            control if control < '\u{20}' => {
                // The guard bounds the code point to 0x00..=0x1F, so two hex
                // digits after the fixed "00" prefix always suffice.
                let code = control as usize;
                escaped.push_str("\\u00");
                escaped.push(char::from(HEX_DIGITS[code >> 4]));
                escaped.push(char::from(HEX_DIGITS[code & 0x0F]));
            }
            other => escaped.push(other),
        }
    }
    escaped
}
179
180fn struct_to_json(array: &StructArray, timezone: &str) -> Result<ArrayRef> {
181    // get field names and escape any quotes
182    let field_names: Vec<String> = array
183        .fields()
184        .iter()
185        .map(|f| escape_string(f.name().as_str()))
186        .collect();
187    // determine which fields need to have their values quoted
188    let is_string: Vec<bool> = array
189        .fields()
190        .iter()
191        .map(|f| match f.data_type() {
192            DataType::Utf8 | DataType::LargeUtf8 => true,
193            DataType::Dictionary(_, dt) => {
194                matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8)
195            }
196            _ => false,
197        })
198        .collect();
199    // create JSON string representation of each column
200    let string_arrays: Vec<ArrayRef> = array
201        .columns()
202        .iter()
203        .map(|arr| array_to_json_string(arr, timezone))
204        .collect::<Result<Vec<_>>>()?;
205    let string_arrays: Vec<&StringArray> = string_arrays
206        .iter()
207        .map(|arr| {
208            arr.as_any()
209                .downcast_ref::<StringArray>()
210                .expect("string array")
211        })
212        .collect();
213    // build the JSON string containing entries in the format `"field_name":field_value`
214    let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
215    let mut json = String::with_capacity(array.len() * 16);
216    for row_index in 0..array.len() {
217        if array.is_null(row_index) {
218            builder.append_null();
219        } else {
220            json.clear();
221            let mut any_fields_written = false;
222            json.push('{');
223            for col_index in 0..string_arrays.len() {
224                if !string_arrays[col_index].is_null(row_index) {
225                    if any_fields_written {
226                        json.push(',');
227                    }
228                    // quoted field name
229                    json.push('"');
230                    json.push_str(&field_names[col_index]);
231                    json.push_str("\":");
232                    // value
233                    let string_value = string_arrays[col_index].value(row_index);
234                    if is_string[col_index] {
235                        json.push('"');
236                        json.push_str(&escape_string(string_value));
237                        json.push('"');
238                    } else {
239                        json.push_str(string_value);
240                    }
241                    any_fields_written = true;
242                }
243            }
244            json.push('}');
245            builder.append_value(&json);
246        }
247    }
248    Ok(Arc::new(builder.finish()))
249}
250
#[cfg(test)]
mod test {
    use crate::json_funcs::to_json::struct_to_json;
    use arrow_array::types::Int32Type;
    use arrow_array::{Array, PrimitiveArray, StringArray};
    use arrow_array::{ArrayRef, BooleanArray, Int32Array, StructArray};
    use arrow_schema::{DataType, Field};
    use datafusion_common::Result;
    use std::sync::Arc;

    /// Converts `input` with the "UTC" timezone and checks that the output
    /// has exactly `expected.len()` rows whose values match `expected` in order.
    fn assert_json_rows(input: &StructArray, expected: &[&str]) -> Result<()> {
        let output = struct_to_json(input, "UTC")?;
        let rows = output
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(expected.len(), rows.len());
        for (index, want) in expected.iter().enumerate() {
            assert_eq!(*want, rows.value(index));
        }
        Ok(())
    }

    /// A flat struct of boolean, int and string columns; null values are omitted.
    #[test]
    fn test_primitives() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();
        let strings: ArrayRef = create_strings();
        let struct_array = StructArray::from(vec![
            (Arc::new(Field::new("a", DataType::Boolean, true)), bools),
            (Arc::new(Field::new("b", DataType::Int32, true)), ints),
            (Arc::new(Field::new("c", DataType::Utf8, true)), strings),
        ]);
        assert_json_rows(
            &struct_array,
            &[
                r#"{"b":123}"#,
                r#"{"a":true,"c":"foo"}"#,
                r#"{"a":false,"b":2147483647,"c":"bar"}"#,
                r#"{"a":false,"b":-2147483648,"c":""}"#,
            ],
        )
    }

    /// A struct whose only field is itself a struct of boolean and int columns.
    #[test]
    fn test_nested_struct() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();

        // inner struct with fields `a` (boolean) and `b` (int32)
        let field_a = Arc::new(Field::new("a", DataType::Boolean, true));
        let field_b = Arc::new(Field::new("b", DataType::Int32, true));
        let inner = StructArray::from(vec![
            (Arc::clone(&field_a), bools),
            (Arc::clone(&field_b), ints),
        ]);

        // outer struct with a single field `a` holding the inner struct
        let inner_type = DataType::Struct(vec![field_a, field_b].into());
        let outer_field = Arc::new(Field::new("a", inner_type, true));
        let inner_column: ArrayRef = Arc::new(inner);
        let outer = StructArray::from(vec![(outer_field, inner_column)]);

        assert_json_rows(
            &outer,
            &[
                r#"{"a":{"b":123}}"#,
                r#"{"a":{"a":true}}"#,
                r#"{"a":{"a":false,"b":2147483647}}"#,
                r#"{"a":{"a":false,"b":-2147483648}}"#,
            ],
        )
    }

    /// Four int32 values: 123, null, `i32::MAX`, `i32::MIN`.
    fn create_ints() -> Arc<PrimitiveArray<Int32Type>> {
        let values = vec![Some(123), None, Some(i32::MAX), Some(i32::MIN)];
        Arc::new(Int32Array::from(values))
    }

    /// Four boolean values: null, true, false, false.
    fn create_bools() -> Arc<BooleanArray> {
        let values = vec![None, Some(true), Some(false), Some(false)];
        Arc::new(BooleanArray::from(values))
    }

    /// Four string values: null, "foo", "bar", and the empty string.
    fn create_strings() -> Arc<StringArray> {
        let values = vec![None, Some("foo"), Some("bar"), Some("")];
        Arc::new(StringArray::from(values))
    }
}