datafusion_comet_spark_expr/json_funcs/to_json.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
// TODO: upstream this to DataFusion once we have a way to specify all of the
// Spark-specific compatibility features that we need (including being able to
// specify a Spark-compatible cast from all types to string)
21
22use crate::SparkCastOptions;
23use crate::{spark_cast, EvalMode};
24use arrow::array::builder::StringBuilder;
25use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, StructArray};
26use arrow::datatypes::{DataType, Schema};
27use datafusion::common::Result;
28use datafusion::physical_expr::PhysicalExpr;
29use datafusion::physical_plan::ColumnarValue;
30use std::any::Any;
31use std::fmt::{Debug, Display, Formatter};
32use std::hash::Hash;
33use std::sync::Arc;
34
35/// to_json function
36#[derive(Debug, Eq)]
37pub struct ToJson {
38    /// The input to convert to JSON
39    expr: Arc<dyn PhysicalExpr>,
40    /// Timezone to use when converting timestamps to JSON
41    timezone: String,
42}
43
44impl Hash for ToJson {
45    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
46        self.expr.hash(state);
47        self.timezone.hash(state);
48    }
49}
50impl PartialEq for ToJson {
51    fn eq(&self, other: &Self) -> bool {
52        self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
53    }
54}
55
56impl ToJson {
57    pub fn new(expr: Arc<dyn PhysicalExpr>, timezone: &str) -> Self {
58        Self {
59            expr,
60            timezone: timezone.to_owned(),
61        }
62    }
63}
64
65impl Display for ToJson {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        write!(f, "to_json({}, timezone={})", self.expr, self.timezone)
68    }
69}
70
71impl PartialEq<dyn Any> for ToJson {
72    fn eq(&self, other: &dyn Any) -> bool {
73        if let Some(other) = other.downcast_ref::<ToJson>() {
74            self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
75        } else {
76            false
77        }
78    }
79}
80
81impl PhysicalExpr for ToJson {
82    fn as_any(&self) -> &dyn Any {
83        self
84    }
85
86    fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
87        unimplemented!()
88    }
89
90    fn data_type(&self, _: &Schema) -> Result<DataType> {
91        Ok(DataType::Utf8)
92    }
93
94    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
95        self.expr.nullable(input_schema)
96    }
97
98    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
99        let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
100        Ok(ColumnarValue::Array(array_to_json_string(
101            &input,
102            &self.timezone,
103        )?))
104    }
105
106    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
107        vec![&self.expr]
108    }
109
110    fn with_new_children(
111        self: Arc<Self>,
112        children: Vec<Arc<dyn PhysicalExpr>>,
113    ) -> Result<Arc<dyn PhysicalExpr>> {
114        assert!(children.len() == 1);
115        Ok(Arc::new(Self::new(
116            Arc::clone(&children[0]),
117            &self.timezone,
118        )))
119    }
120}
121
122/// Convert an array into a JSON value string representation
123fn array_to_json_string(arr: &Arc<dyn Array>, timezone: &str) -> Result<ArrayRef> {
124    if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
125        struct_to_json(struct_array, timezone)
126    } else {
127        spark_cast(
128            ColumnarValue::Array(Arc::clone(arr)),
129            &DataType::Utf8,
130            &SparkCastOptions::new(EvalMode::Legacy, timezone, false),
131        )?
132        .into_array(arr.len())
133    }
134}
135
/// Escape `input` so that it can be placed between double quotes in a JSON document.
///
/// - `"` and `\` are backslash-escaped.
/// - The common control characters use their short forms (`\b`, `\t`, `\n`, `\f`, `\r`).
/// - Every other character below U+0020 is written as `\u00XX` with uppercase hex digits.
///
/// JSON (RFC 8259, section 7) does not allow control characters to appear raw inside a
/// string. This matches the default escaping emitted by Jackson, which Spark's `to_json`
/// uses. All other characters, including non-ASCII ones, are copied unchanged.
fn escape_string(input: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";
    let mut escaped = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\t' => escaped.push_str("\\t"),
            '\r' => escaped.push_str("\\r"),
            '\n' => escaped.push_str("\\n"),
            '\x0C' => escaped.push_str("\\f"),
            '\x08' => escaped.push_str("\\b"),
            // Any remaining C0 control character must use the generic \uXXXX form.
            c if c < '\u{20}' => {
                let code = c as usize; // < 0x20, so both nibble lookups are in range
                escaped.push_str("\\u00");
                escaped.push(char::from(HEX_DIGITS[code >> 4]));
                escaped.push(char::from(HEX_DIGITS[code & 0xF]));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
183
184fn struct_to_json(array: &StructArray, timezone: &str) -> Result<ArrayRef> {
185    // get field names and escape any quotes
186    let field_names: Vec<String> = array
187        .fields()
188        .iter()
189        .map(|f| escape_string(f.name().as_str()))
190        .collect();
191    // determine which fields need to have their values quoted
192    let is_string: Vec<bool> = array
193        .fields()
194        .iter()
195        .map(|f| match f.data_type() {
196            DataType::Utf8 | DataType::LargeUtf8 => true,
197            DataType::Dictionary(_, dt) => {
198                matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8)
199            }
200            _ => false,
201        })
202        .collect();
203    // create JSON string representation of each column
204    let string_arrays: Vec<ArrayRef> = array
205        .columns()
206        .iter()
207        .map(|arr| array_to_json_string(arr, timezone))
208        .collect::<Result<Vec<_>>>()?;
209    let string_arrays: Vec<&StringArray> = string_arrays
210        .iter()
211        .map(|arr| {
212            arr.as_any()
213                .downcast_ref::<StringArray>()
214                .expect("string array")
215        })
216        .collect();
217    // build the JSON string containing entries in the format `"field_name":field_value`
218    let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
219    let mut json = String::with_capacity(array.len() * 16);
220    for row_index in 0..array.len() {
221        if array.is_null(row_index) {
222            builder.append_null();
223        } else {
224            json.clear();
225            let mut any_fields_written = false;
226            json.push('{');
227            for col_index in 0..string_arrays.len() {
228                if !string_arrays[col_index].is_null(row_index) {
229                    if any_fields_written {
230                        json.push(',');
231                    }
232                    // quoted field name
233                    json.push('"');
234                    json.push_str(&field_names[col_index]);
235                    json.push_str("\":");
236                    // value
237                    let string_value = string_arrays[col_index].value(row_index);
238                    if is_string[col_index] {
239                        json.push('"');
240                        json.push_str(&escape_string(string_value));
241                        json.push('"');
242                    } else {
243                        json.push_str(string_value);
244                    }
245                    any_fields_written = true;
246                }
247            }
248            json.push('}');
249            builder.append_value(&json);
250        }
251    }
252    Ok(Arc::new(builder.finish()))
253}
254
#[cfg(test)]
mod test {
    use crate::json_funcs::to_json::struct_to_json;
    use arrow::array::types::Int32Type;
    use arrow::array::{Array, PrimitiveArray, StringArray};
    use arrow::array::{ArrayRef, BooleanArray, Int32Array, StructArray};
    use arrow::buffer::NullBuffer;
    use arrow::datatypes::{DataType, Field, Fields};
    use datafusion::common::Result;
    use std::sync::Arc;

    #[test]
    fn test_primitives() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();
        let strings: ArrayRef = create_strings();
        let struct_array = StructArray::from(vec![
            (Arc::new(Field::new("a", DataType::Boolean, true)), bools),
            (Arc::new(Field::new("b", DataType::Int32, true)), ints),
            (Arc::new(Field::new("c", DataType::Utf8, true)), strings),
        ]);
        let json = struct_to_json(&struct_array, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(4, json.len());
        assert_eq!(r#"{"b":123}"#, json.value(0));
        assert_eq!(r#"{"a":true,"c":"foo"}"#, json.value(1));
        assert_eq!(r#"{"a":false,"b":2147483647,"c":"bar"}"#, json.value(2));
        assert_eq!(r#"{"a":false,"b":-2147483648,"c":""}"#, json.value(3));
        Ok(())
    }

    #[test]
    fn test_nested_struct() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();

        // create first struct array
        let struct_fields = vec![
            Arc::new(Field::new("a", DataType::Boolean, true)),
            Arc::new(Field::new("b", DataType::Int32, true)),
        ];
        let struct_values = vec![bools, ints];
        let struct_array = StructArray::from(
            struct_fields
                .clone()
                .into_iter()
                .zip(struct_values)
                .collect::<Vec<_>>(),
        );

        // create second struct array containing the first struct array
        let struct_fields2 = vec![Arc::new(Field::new(
            "a",
            DataType::Struct(struct_fields.into()),
            true,
        ))];
        let struct_values2: Vec<ArrayRef> = vec![Arc::new(struct_array.clone())];
        let struct_array2 = StructArray::from(
            struct_fields2
                .into_iter()
                .zip(struct_values2)
                .collect::<Vec<_>>(),
        );

        let json = struct_to_json(&struct_array2, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(4, json.len());
        assert_eq!(r#"{"a":{"b":123}}"#, json.value(0));
        assert_eq!(r#"{"a":{"a":true}}"#, json.value(1));
        assert_eq!(r#"{"a":{"a":false,"b":2147483647}}"#, json.value(2));
        assert_eq!(r#"{"a":{"a":false,"b":-2147483648}}"#, json.value(3));
        Ok(())
    }

    /// Quotes, backslashes and tabs in both field names and string values must be escaped.
    /// A row whose only field is null renders as an empty object.
    #[test]
    fn test_escaping() -> Result<()> {
        let strings: ArrayRef = Arc::new(StringArray::from(vec![
            Some("x\"y\\z"),
            Some("tab\there"),
            None,
        ]));
        let struct_array = StructArray::from(vec![(
            Arc::new(Field::new("a\"b", DataType::Utf8, true)),
            strings,
        )]);
        let json = struct_to_json(&struct_array, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(3, json.len());
        assert_eq!(r#"{"a\"b":"x\"y\\z"}"#, json.value(0));
        assert_eq!(r#"{"a\"b":"tab\there"}"#, json.value(1));
        assert_eq!("{}", json.value(2));
        Ok(())
    }

    /// A null struct row must produce a null JSON value, not an object.
    #[test]
    fn test_null_struct_row() -> Result<()> {
        let ints: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), Some(2)]));
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new("a", DataType::Int32, true)]),
            vec![ints],
            Some(NullBuffer::from(vec![true, false])),
        );
        let json = struct_to_json(&struct_array, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(2, json.len());
        assert_eq!(r#"{"a":1}"#, json.value(0));
        assert!(json.is_null(1));
        Ok(())
    }

    fn create_ints() -> Arc<PrimitiveArray<Int32Type>> {
        Arc::new(Int32Array::from(vec![
            Some(123),
            None,
            Some(i32::MAX),
            Some(i32::MIN),
        ]))
    }

    fn create_bools() -> Arc<BooleanArray> {
        Arc::new(BooleanArray::from(vec![
            None,
            Some(true),
            Some(false),
            Some(false),
        ]))
    }

    fn create_strings() -> Arc<StringArray> {
        Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("bar"),
            Some(""),
        ]))
    }
}