datafusion_comet_spark_expr/json_funcs/
to_json.rs1use crate::SparkCastOptions;
23use crate::{spark_cast, EvalMode};
24use arrow::array::builder::StringBuilder;
25use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, StructArray};
26use arrow::datatypes::{DataType, Schema};
27use datafusion::common::Result;
28use datafusion::physical_expr::PhysicalExpr;
29use datafusion::physical_plan::ColumnarValue;
30use std::any::Any;
31use std::fmt::{Debug, Display, Formatter};
32use std::hash::Hash;
33use std::sync::Arc;
34
35#[derive(Debug, Eq)]
37pub struct ToJson {
38 expr: Arc<dyn PhysicalExpr>,
40 timezone: String,
42}
43
44impl Hash for ToJson {
45 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
46 self.expr.hash(state);
47 self.timezone.hash(state);
48 }
49}
50impl PartialEq for ToJson {
51 fn eq(&self, other: &Self) -> bool {
52 self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
53 }
54}
55
56impl ToJson {
57 pub fn new(expr: Arc<dyn PhysicalExpr>, timezone: &str) -> Self {
58 Self {
59 expr,
60 timezone: timezone.to_owned(),
61 }
62 }
63}
64
65impl Display for ToJson {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 write!(f, "to_json({}, timezone={})", self.expr, self.timezone)
68 }
69}
70
71impl PartialEq<dyn Any> for ToJson {
72 fn eq(&self, other: &dyn Any) -> bool {
73 if let Some(other) = other.downcast_ref::<ToJson>() {
74 self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
75 } else {
76 false
77 }
78 }
79}
80
81impl PhysicalExpr for ToJson {
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85
86 fn fmt_sql(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
87 unimplemented!()
88 }
89
90 fn data_type(&self, _: &Schema) -> Result<DataType> {
91 Ok(DataType::Utf8)
92 }
93
94 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
95 self.expr.nullable(input_schema)
96 }
97
98 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
99 let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
100 Ok(ColumnarValue::Array(array_to_json_string(
101 &input,
102 &self.timezone,
103 )?))
104 }
105
106 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
107 vec![&self.expr]
108 }
109
110 fn with_new_children(
111 self: Arc<Self>,
112 children: Vec<Arc<dyn PhysicalExpr>>,
113 ) -> Result<Arc<dyn PhysicalExpr>> {
114 assert!(children.len() == 1);
115 Ok(Arc::new(Self::new(
116 Arc::clone(&children[0]),
117 &self.timezone,
118 )))
119 }
120}
121
122fn array_to_json_string(arr: &Arc<dyn Array>, timezone: &str) -> Result<ArrayRef> {
124 if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
125 struct_to_json(struct_array, timezone)
126 } else {
127 spark_cast(
128 ColumnarValue::Array(Arc::clone(arr)),
129 &DataType::Utf8,
130 &SparkCastOptions::new(EvalMode::Legacy, timezone, false),
131 )?
132 .into_array(arr.len())
133 }
134}
135
/// Escapes `input` for use inside a double-quoted JSON string (RFC 8259, section 7).
///
/// - `"` and `\` are prefixed with a backslash;
/// - backspace, tab, line feed, form feed and carriage return use the short
///   escapes `\b`, `\t`, `\n`, `\f`, `\r`;
/// - every other character below U+0020 becomes `\u00XX` (upper-case hex), because
///   JSON forbids raw control characters inside strings;
/// - all remaining characters, including `/` and non-ASCII text, are copied as-is.
///
/// The returned text does not include the surrounding quotes.
fn escape_string(input: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";
    let mut escaped = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\x08' => escaped.push_str("\\b"),
            '\t' => escaped.push_str("\\t"),
            '\n' => escaped.push_str("\\n"),
            '\x0C' => escaped.push_str("\\f"),
            '\r' => escaped.push_str("\\r"),
            c if c < '\x20' => {
                // Below U+0020 the code point fits in the two low hex digits.
                let code = u32::from(c) as usize;
                escaped.push_str("\\u00");
                escaped.push(char::from(HEX_DIGITS[code >> 4]));
                escaped.push(char::from(HEX_DIGITS[code & 0xF]));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
183
184fn struct_to_json(array: &StructArray, timezone: &str) -> Result<ArrayRef> {
185 let field_names: Vec<String> = array
187 .fields()
188 .iter()
189 .map(|f| escape_string(f.name().as_str()))
190 .collect();
191 let is_string: Vec<bool> = array
193 .fields()
194 .iter()
195 .map(|f| match f.data_type() {
196 DataType::Utf8 | DataType::LargeUtf8 => true,
197 DataType::Dictionary(_, dt) => {
198 matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8)
199 }
200 _ => false,
201 })
202 .collect();
203 let string_arrays: Vec<ArrayRef> = array
205 .columns()
206 .iter()
207 .map(|arr| array_to_json_string(arr, timezone))
208 .collect::<Result<Vec<_>>>()?;
209 let string_arrays: Vec<&StringArray> = string_arrays
210 .iter()
211 .map(|arr| {
212 arr.as_any()
213 .downcast_ref::<StringArray>()
214 .expect("string array")
215 })
216 .collect();
217 let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
219 let mut json = String::with_capacity(array.len() * 16);
220 for row_index in 0..array.len() {
221 if array.is_null(row_index) {
222 builder.append_null();
223 } else {
224 json.clear();
225 let mut any_fields_written = false;
226 json.push('{');
227 for col_index in 0..string_arrays.len() {
228 if !string_arrays[col_index].is_null(row_index) {
229 if any_fields_written {
230 json.push(',');
231 }
232 json.push('"');
234 json.push_str(&field_names[col_index]);
235 json.push_str("\":");
236 let string_value = string_arrays[col_index].value(row_index);
238 if is_string[col_index] {
239 json.push('"');
240 json.push_str(&escape_string(string_value));
241 json.push('"');
242 } else {
243 json.push_str(string_value);
244 }
245 any_fields_written = true;
246 }
247 }
248 json.push('}');
249 builder.append_value(&json);
250 }
251 }
252 Ok(Arc::new(builder.finish()))
253}
254
#[cfg(test)]
mod test {
    use crate::json_funcs::to_json::struct_to_json;
    use arrow::array::types::Int32Type;
    use arrow::array::{Array, PrimitiveArray, StringArray};
    use arrow::array::{ArrayRef, BooleanArray, Int32Array, StructArray};
    use arrow::datatypes::{DataType, Field};
    use datafusion::common::Result;
    use std::sync::Arc;

    #[test]
    fn test_primitives() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();
        let strings: ArrayRef = create_strings();
        let struct_array = StructArray::from(vec![
            (Arc::new(Field::new("a", DataType::Boolean, true)), bools),
            (Arc::new(Field::new("b", DataType::Int32, true)), ints),
            (Arc::new(Field::new("c", DataType::Utf8, true)), strings),
        ]);
        let json = struct_to_json(&struct_array, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(4, json.len());
        assert_eq!(r#"{"b":123}"#, json.value(0));
        assert_eq!(r#"{"a":true,"c":"foo"}"#, json.value(1));
        assert_eq!(r#"{"a":false,"b":2147483647,"c":"bar"}"#, json.value(2));
        assert_eq!(r#"{"a":false,"b":-2147483648,"c":""}"#, json.value(3));
        Ok(())
    }

    /// Quotes, backslashes and newlines must be escaped in both keys and values.
    #[test]
    fn test_string_escaping() -> Result<()> {
        let strings: ArrayRef = Arc::new(StringArray::from(vec![
            Some("say \"hi\""),
            Some("back\\slash"),
            Some("line\nbreak"),
        ]));
        let struct_array = StructArray::from(vec![(
            Arc::new(Field::new("q\"k", DataType::Utf8, true)),
            strings,
        )]);
        let json = struct_to_json(&struct_array, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(3, json.len());
        assert_eq!(r#"{"q\"k":"say \"hi\""}"#, json.value(0));
        assert_eq!(r#"{"q\"k":"back\\slash"}"#, json.value(1));
        assert_eq!(r#"{"q\"k":"line\nbreak"}"#, json.value(2));
        Ok(())
    }

    #[test]
    fn test_nested_struct() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();

        let struct_fields = vec![
            Arc::new(Field::new("a", DataType::Boolean, true)),
            Arc::new(Field::new("b", DataType::Int32, true)),
        ];
        let struct_values = vec![bools, ints];
        let struct_array = StructArray::from(
            struct_fields
                .clone()
                .into_iter()
                .zip(struct_values)
                .collect::<Vec<_>>(),
        );

        let struct_fields2 = vec![Arc::new(Field::new(
            "a",
            DataType::Struct(struct_fields.into()),
            true,
        ))];
        let struct_values2: Vec<ArrayRef> = vec![Arc::new(struct_array.clone())];
        let struct_array2 = StructArray::from(
            struct_fields2
                .into_iter()
                .zip(struct_values2)
                .collect::<Vec<_>>(),
        );

        let json = struct_to_json(&struct_array2, "UTC")?;
        let json = json
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(4, json.len());
        assert_eq!(r#"{"a":{"b":123}}"#, json.value(0));
        assert_eq!(r#"{"a":{"a":true}}"#, json.value(1));
        assert_eq!(r#"{"a":{"a":false,"b":2147483647}}"#, json.value(2));
        assert_eq!(r#"{"a":{"a":false,"b":-2147483648}}"#, json.value(3));
        Ok(())
    }

    fn create_ints() -> Arc<PrimitiveArray<Int32Type>> {
        Arc::new(Int32Array::from(vec![
            Some(123),
            None,
            Some(i32::MAX),
            Some(i32::MIN),
        ]))
    }

    fn create_bools() -> Arc<BooleanArray> {
        Arc::new(BooleanArray::from(vec![
            None,
            Some(true),
            Some(false),
            Some(false),
        ]))
    }

    fn create_strings() -> Arc<StringArray> {
        Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("bar"),
            Some(""),
        ]))
    }
}