datafusion_comet_spark_expr/json_funcs/
to_json.rs1use crate::SparkCastOptions;
23use crate::{spark_cast, EvalMode};
24use arrow_array::builder::StringBuilder;
25use arrow_array::{Array, ArrayRef, RecordBatch, StringArray, StructArray};
26use arrow_schema::{DataType, Schema};
27use datafusion_common::Result;
28use datafusion_expr::ColumnarValue;
29use datafusion_physical_expr::PhysicalExpr;
30use std::any::Any;
31use std::fmt::{Debug, Display, Formatter};
32use std::hash::Hash;
33use std::sync::Arc;
34
35#[derive(Debug, Eq)]
37pub struct ToJson {
38 expr: Arc<dyn PhysicalExpr>,
40 timezone: String,
42}
43
44impl Hash for ToJson {
45 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
46 self.expr.hash(state);
47 self.timezone.hash(state);
48 }
49}
50impl PartialEq for ToJson {
51 fn eq(&self, other: &Self) -> bool {
52 self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
53 }
54}
55
56impl ToJson {
57 pub fn new(expr: Arc<dyn PhysicalExpr>, timezone: &str) -> Self {
58 Self {
59 expr,
60 timezone: timezone.to_owned(),
61 }
62 }
63}
64
65impl Display for ToJson {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 write!(f, "to_json({}, timezone={})", self.expr, self.timezone)
68 }
69}
70
71impl PartialEq<dyn Any> for ToJson {
72 fn eq(&self, other: &dyn Any) -> bool {
73 if let Some(other) = other.downcast_ref::<ToJson>() {
74 self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone)
75 } else {
76 false
77 }
78 }
79}
80
81impl PhysicalExpr for ToJson {
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85
86 fn data_type(&self, _: &Schema) -> Result<DataType> {
87 Ok(DataType::Utf8)
88 }
89
90 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
91 self.expr.nullable(input_schema)
92 }
93
94 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
95 let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
96 Ok(ColumnarValue::Array(array_to_json_string(
97 &input,
98 &self.timezone,
99 )?))
100 }
101
102 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
103 vec![&self.expr]
104 }
105
106 fn with_new_children(
107 self: Arc<Self>,
108 children: Vec<Arc<dyn PhysicalExpr>>,
109 ) -> Result<Arc<dyn PhysicalExpr>> {
110 assert!(children.len() == 1);
111 Ok(Arc::new(Self::new(
112 Arc::clone(&children[0]),
113 &self.timezone,
114 )))
115 }
116}
117
118fn array_to_json_string(arr: &Arc<dyn Array>, timezone: &str) -> Result<ArrayRef> {
120 if let Some(struct_array) = arr.as_any().downcast_ref::<StructArray>() {
121 struct_to_json(struct_array, timezone)
122 } else {
123 spark_cast(
124 ColumnarValue::Array(Arc::clone(arr)),
125 &DataType::Utf8,
126 &SparkCastOptions::new(EvalMode::Legacy, timezone, false),
127 )?
128 .into_array(arr.len())
129 }
130}
131
/// Escapes `input` so that it can be placed between double quotes in a JSON
/// document.
///
/// * `"` and `\` are prefixed with a backslash.
/// * Tab, carriage return, line feed, form feed and backspace are written
///   with their two-character short forms (`\t`, `\r`, `\n`, `\f`, `\b`).
/// * Every other control character below U+0020 is written as `\u00XX`,
///   because RFC 8259 does not allow raw control characters inside a JSON
///   string. Upper-case hex digits are used.
///   NOTE(review): upper case is assumed to match what Spark's JSON writer
///   emits — confirm against Spark output.
/// * All remaining characters are copied unchanged.
///
/// Returns a newly allocated string; `input` is not modified.
fn escape_string(input: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut escaped = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' | '\\' => {
                escaped.push('\\');
                escaped.push(c);
            }
            '\t' => escaped.push_str("\\t"),
            '\r' => escaped.push_str("\\r"),
            '\n' => escaped.push_str("\\n"),
            '\x0C' => escaped.push_str("\\f"),
            '\x08' => escaped.push_str("\\b"),
            // Remaining C0 control characters have no short form; the code
            // point is below 0x20, so the first two hex digits are always "00".
            '\0'..='\x1F' => {
                let code = c as u32;
                escaped.push_str("\\u00");
                escaped.push(char::from(HEX_DIGITS[(code >> 4) as usize]));
                escaped.push(char::from(HEX_DIGITS[(code & 0x0F) as usize]));
            }
            _ => escaped.push(c),
        }
    }
    escaped
}
179
180fn struct_to_json(array: &StructArray, timezone: &str) -> Result<ArrayRef> {
181 let field_names: Vec<String> = array
183 .fields()
184 .iter()
185 .map(|f| escape_string(f.name().as_str()))
186 .collect();
187 let is_string: Vec<bool> = array
189 .fields()
190 .iter()
191 .map(|f| match f.data_type() {
192 DataType::Utf8 | DataType::LargeUtf8 => true,
193 DataType::Dictionary(_, dt) => {
194 matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8)
195 }
196 _ => false,
197 })
198 .collect();
199 let string_arrays: Vec<ArrayRef> = array
201 .columns()
202 .iter()
203 .map(|arr| array_to_json_string(arr, timezone))
204 .collect::<Result<Vec<_>>>()?;
205 let string_arrays: Vec<&StringArray> = string_arrays
206 .iter()
207 .map(|arr| {
208 arr.as_any()
209 .downcast_ref::<StringArray>()
210 .expect("string array")
211 })
212 .collect();
213 let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16);
215 let mut json = String::with_capacity(array.len() * 16);
216 for row_index in 0..array.len() {
217 if array.is_null(row_index) {
218 builder.append_null();
219 } else {
220 json.clear();
221 let mut any_fields_written = false;
222 json.push('{');
223 for col_index in 0..string_arrays.len() {
224 if !string_arrays[col_index].is_null(row_index) {
225 if any_fields_written {
226 json.push(',');
227 }
228 json.push('"');
230 json.push_str(&field_names[col_index]);
231 json.push_str("\":");
232 let string_value = string_arrays[col_index].value(row_index);
234 if is_string[col_index] {
235 json.push('"');
236 json.push_str(&escape_string(string_value));
237 json.push('"');
238 } else {
239 json.push_str(string_value);
240 }
241 any_fields_written = true;
242 }
243 }
244 json.push('}');
245 builder.append_value(&json);
246 }
247 }
248 Ok(Arc::new(builder.finish()))
249}
250
#[cfg(test)]
mod test {
    use crate::json_funcs::to_json::struct_to_json;
    use arrow_array::types::Int32Type;
    use arrow_array::{Array, PrimitiveArray, StringArray};
    use arrow_array::{ArrayRef, BooleanArray, Int32Array, StructArray};
    use arrow_schema::{DataType, Field};
    use datafusion_common::Result;
    use std::sync::Arc;

    /// Converts `input` with the "UTC" timezone and checks the row count and
    /// every row against `expected`.
    fn assert_json_rows(input: &StructArray, expected: &[&str]) -> Result<()> {
        let output = struct_to_json(input, "UTC")?;
        let rows = output
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(expected.len(), rows.len());
        for (index, want) in expected.iter().enumerate() {
            assert_eq!(*want, rows.value(index));
        }
        Ok(())
    }

    /// Boolean, integer and string fields, with nulls omitted per row.
    #[test]
    fn test_primitives() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();
        let strings: ArrayRef = create_strings();
        let input = StructArray::from(vec![
            (Arc::new(Field::new("a", DataType::Boolean, true)), bools),
            (Arc::new(Field::new("b", DataType::Int32, true)), ints),
            (Arc::new(Field::new("c", DataType::Utf8, true)), strings),
        ]);
        assert_json_rows(
            &input,
            &[
                r#"{"b":123}"#,
                r#"{"a":true,"c":"foo"}"#,
                r#"{"a":false,"b":2147483647,"c":"bar"}"#,
                r#"{"a":false,"b":-2147483648,"c":""}"#,
            ],
        )
    }

    /// A struct nested inside another struct becomes a nested JSON object.
    #[test]
    fn test_nested_struct() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();

        let inner_fields = vec![
            Arc::new(Field::new("a", DataType::Boolean, true)),
            Arc::new(Field::new("b", DataType::Int32, true)),
        ];
        let inner = StructArray::from(
            inner_fields
                .iter()
                .cloned()
                .zip(vec![bools, ints])
                .collect::<Vec<_>>(),
        );

        let outer_field = Arc::new(Field::new(
            "a",
            DataType::Struct(inner_fields.into()),
            true,
        ));
        let outer_column: ArrayRef = Arc::new(inner);
        let outer = StructArray::from(vec![(outer_field, outer_column)]);

        assert_json_rows(
            &outer,
            &[
                r#"{"a":{"b":123}}"#,
                r#"{"a":{"a":true}}"#,
                r#"{"a":{"a":false,"b":2147483647}}"#,
                r#"{"a":{"a":false,"b":-2147483648}}"#,
            ],
        )
    }

    /// Four integers: a small value, a null, and both `i32` extremes.
    fn create_ints() -> Arc<PrimitiveArray<Int32Type>> {
        let values = vec![Some(123), None, Some(i32::MAX), Some(i32::MIN)];
        Arc::new(Int32Array::from(values))
    }

    /// Four booleans, the first of which is null.
    fn create_bools() -> Arc<BooleanArray> {
        let values = vec![None, Some(true), Some(false), Some(false)];
        Arc::new(BooleanArray::from(values))
    }

    /// Four strings: a null, two words, and an empty string.
    fn create_strings() -> Arc<StringArray> {
        let values = vec![None, Some("foo"), Some("bar"), Some("")];
        Arc::new(StringArray::from(values))
    }
}