// datafusion_comet_spark_expr/json_funcs/to_json.rs
use crate::SparkCastOptions;
use crate::{spark_cast, EvalMode};
use arrow_array::builder::StringBuilder;
use arrow_array::{Array, ArrayRef, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::PhysicalExpr;
use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
/// Physical expression that converts the values of a child expression to strings.
///
/// Struct values are rendered as JSON objects; any other value is cast to `Utf8`
/// (see `array_to_json_string`).
#[derive(Debug, Eq)]
pub struct ToJson {
/// Child expression whose evaluated array is converted.
expr: Arc<dyn PhysicalExpr>,
/// Time zone string handed to the Spark cast options when non-struct values are cast.
timezone: String,
}
impl Hash for ToJson {
    /// Feeds the child expression and then the time zone into `state`, in that order.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        Hash::hash(&self.expr, state);
        Hash::hash(&self.timezone, state);
    }
}
impl PartialEq for ToJson {
    /// Two `ToJson` expressions are equal when both the child expression and the
    /// time zone compare equal.
    fn eq(&self, other: &Self) -> bool {
        let same_child = self.expr.eq(&other.expr);
        same_child && self.timezone == other.timezone
    }
}
impl ToJson {
    /// Builds a `ToJson` expression over `expr`.
    ///
    /// The `timezone` string is copied into the new value so the expression owns it.
    pub fn new(expr: Arc<dyn PhysicalExpr>, timezone: &str) -> Self {
        let timezone = String::from(timezone);
        Self { expr, timezone }
    }
}
impl Display for ToJson {
    /// Formats the expression as `to_json(<child>, timezone=<timezone>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { expr, timezone } = self;
        write!(f, "to_json({expr}, timezone={timezone})")
    }
}
impl PartialEq<dyn Any> for ToJson {
    /// Compares against a type-erased value.
    ///
    /// Returns `false` when `other` is not a `ToJson`; otherwise compares the child
    /// expression and the time zone.
    fn eq(&self, other: &dyn Any) -> bool {
        match other.downcast_ref::<ToJson>() {
            Some(that) => self.expr.eq(&that.expr) && self.timezone.eq(&that.timezone),
            None => false,
        }
    }
}
impl PhysicalExpr for ToJson {
    /// Exposes this expression as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The result type is always `Utf8`, regardless of the input schema.
    fn data_type(&self, _: &Schema) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    /// Nullability is delegated to the child expression.
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        self.expr.nullable(input_schema)
    }

    /// Evaluates the child expression on `batch`, materializes it as an array with
    /// one entry per batch row, and converts that array with `array_to_json_string`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let child_value = self.expr.evaluate(batch)?;
        let child_array = child_value.into_array(batch.num_rows())?;
        let rendered = array_to_json_string(&child_array, &self.timezone)?;
        Ok(ColumnarValue::Array(rendered))
    }

    /// The only child is the wrapped expression.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.expr]
    }

    /// Rebuilds the expression around a replacement child, keeping the time zone.
    ///
    /// # Panics
    ///
    /// Panics if `children` does not contain exactly one expression.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        assert!(children.len() == 1);
        let child = Arc::clone(&children[0]);
        let rebuilt = Self::new(child, &self.timezone);
        Ok(Arc::new(rebuilt))
    }
}
/// Converts every element of `arr` to its string form.
///
/// A `StructArray` is handed to `struct_to_json`. Any other array is cast to `Utf8`
/// through `spark_cast` using `EvalMode::Legacy` and the given `timezone`, and the
/// cast result is materialized as an array of `arr.len()` elements.
///
/// # Errors
///
/// Propagates any error returned by `struct_to_json`, `spark_cast`, or `into_array`.
fn array_to_json_string(arr: &Arc<dyn Array>, timezone: &str) -> Result<ArrayRef> {
    match arr.as_any().downcast_ref::<StructArray>() {
        Some(nested) => struct_to_json(nested, timezone),
        None => {
            let value = ColumnarValue::Array(Arc::clone(arr));
            let options = SparkCastOptions::new(EvalMode::Legacy, timezone, false);
            let casted = spark_cast(value, &DataType::Utf8, &options)?;
            casted.into_array(arr.len())
        }
    }
}
/// Escapes `input` so it can be placed between double quotes in a JSON document.
///
/// * `"` and `\` are prefixed with a backslash.
/// * Tab, carriage return, line feed, form feed and backspace use their short
///   escapes (`\t`, `\r`, `\n`, `\f`, `\b`).
/// * Every other control character in `U+0000..=U+001F` is written as `\u00XX`
///   with uppercase hex digits, because JSON forbids raw control characters
///   inside strings (RFC 8259, section 7).
/// * All remaining characters are copied unchanged.
///
/// Returns a newly allocated string; `input` is not modified.
// NOTE(review): uppercase hex is believed to match what Spark's JSON writer
// (Jackson) emits for control characters — confirm against Spark output.
fn escape_string(input: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    // The escaped form is at least as long as the input.
    let mut escaped = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\t' => escaped.push_str("\\t"),
            '\r' => escaped.push_str("\\r"),
            '\n' => escaped.push_str("\\n"),
            '\x0C' => escaped.push_str("\\f"),
            '\x08' => escaped.push_str("\\b"),
            // Remaining C0 control characters have no short escape.
            '\u{00}'..='\u{1F}' => {
                let code = c as u32;
                escaped.push_str("\\u00");
                escaped.push(char::from(HEX_DIGITS[(code >> 4) as usize]));
                escaped.push(char::from(HEX_DIGITS[(code & 0x0F) as usize]));
            }
            _ => escaped.push(c),
        }
    }
    escaped
}
/// Serializes each row of `array` as a JSON object string.
///
/// * A null struct row produces a null output entry.
/// * A field whose converted value is null for the row is left out of the object.
/// * Fields typed `Utf8`/`LargeUtf8` (directly or as dictionary values) are written
///   as quoted, escaped strings; every other field's string form is inserted as-is.
/// * Field names are escaped once up front and reused for every row.
///
/// # Errors
///
/// Propagates any error from converting a child column with `array_to_json_string`.
///
/// # Panics
///
/// Panics if a converted child column is not a `StringArray`.
fn struct_to_json(array: &StructArray, timezone: &str) -> Result<ArrayRef> {
    let fields = array.fields();

    // Per-field metadata: the escaped key and whether the value needs quoting.
    let mut keys: Vec<String> = Vec::with_capacity(fields.len());
    let mut quoted: Vec<bool> = Vec::with_capacity(fields.len());
    for field in fields.iter() {
        keys.push(escape_string(field.name().as_str()));
        let is_text = match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => true,
            DataType::Dictionary(_, value_type) => {
                matches!(value_type.as_ref(), DataType::Utf8 | DataType::LargeUtf8)
            }
            _ => false,
        };
        quoted.push(is_text);
    }

    // Convert every child column to strings first; the first failure aborts.
    let mut converted: Vec<ArrayRef> = Vec::with_capacity(array.num_columns());
    for column in array.columns() {
        converted.push(array_to_json_string(column, timezone)?);
    }
    let mut columns: Vec<&StringArray> = Vec::with_capacity(converted.len());
    for column in &converted {
        let strings = column
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        columns.push(strings);
    }

    let row_count = array.len();
    let mut builder = StringBuilder::with_capacity(row_count, row_count * 16);
    // Scratch buffer reused for every row.
    let mut json = String::with_capacity(row_count * 16);
    for row in 0..row_count {
        if array.is_null(row) {
            builder.append_null();
            continue;
        }

        json.clear();
        json.push('{');
        let mut needs_comma = false;
        for ((key, &is_text), column) in keys.iter().zip(&quoted).zip(&columns) {
            if column.is_null(row) {
                // Null fields are omitted from the object.
                continue;
            }
            if needs_comma {
                json.push(',');
            }
            needs_comma = true;

            json.push('"');
            json.push_str(key);
            json.push_str("\":");

            let value = column.value(row);
            if is_text {
                json.push('"');
                json.push_str(&escape_string(value));
                json.push('"');
            } else {
                json.push_str(value);
            }
        }
        json.push('}');
        builder.append_value(&json);
    }

    Ok(Arc::new(builder.finish()))
}
#[cfg(test)]
mod test {
    use crate::json_funcs::to_json::struct_to_json;
    use arrow_array::types::Int32Type;
    use arrow_array::{Array, PrimitiveArray, StringArray};
    use arrow_array::{ArrayRef, BooleanArray, Int32Array, StructArray};
    use arrow_schema::{DataType, Field};
    use datafusion_common::Result;
    use std::sync::Arc;

    /// A flat struct of boolean, int and string columns: null fields are omitted
    /// and only the string column is quoted.
    #[test]
    fn test_primitives() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();
        let strings: ArrayRef = create_strings();
        let columns = vec![
            (Arc::new(Field::new("a", DataType::Boolean, true)), bools),
            (Arc::new(Field::new("b", DataType::Int32, true)), ints),
            (Arc::new(Field::new("c", DataType::Utf8, true)), strings),
        ];
        let struct_array = StructArray::from(columns);

        assert_json_rows(
            &struct_array,
            [
                r#"{"b":123}"#,
                r#"{"a":true,"c":"foo"}"#,
                r#"{"a":false,"b":2147483647,"c":"bar"}"#,
                r#"{"a":false,"b":-2147483648,"c":""}"#,
            ],
        )
    }

    /// A struct nested inside a struct is rendered as a nested JSON object.
    #[test]
    fn test_nested_struct() -> Result<()> {
        let bools: ArrayRef = create_bools();
        let ints: ArrayRef = create_ints();
        let inner_fields = vec![
            Arc::new(Field::new("a", DataType::Boolean, true)),
            Arc::new(Field::new("b", DataType::Int32, true)),
        ];
        let inner_columns = vec![bools, ints];
        let inner = StructArray::from(
            inner_fields
                .iter()
                .cloned()
                .zip(inner_columns)
                .collect::<Vec<_>>(),
        );

        let outer_field = Arc::new(Field::new(
            "a",
            DataType::Struct(inner_fields.into()),
            true,
        ));
        let outer_column: ArrayRef = Arc::new(inner);
        let outer = StructArray::from(vec![(outer_field, outer_column)]);

        assert_json_rows(
            &outer,
            [
                r#"{"a":{"b":123}}"#,
                r#"{"a":{"a":true}}"#,
                r#"{"a":{"a":false,"b":2147483647}}"#,
                r#"{"a":{"a":false,"b":-2147483648}}"#,
            ],
        )
    }

    /// Converts `input` with the "UTC" time zone and compares every output row
    /// against `expected`.
    fn assert_json_rows(input: &StructArray, expected: [&str; 4]) -> Result<()> {
        let output = struct_to_json(input, "UTC")?;
        let output = output
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string array");
        assert_eq!(expected.len(), output.len());
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(*want, output.value(row));
        }
        Ok(())
    }

    /// Four ints: a small value, a null, and both `i32` extremes.
    fn create_ints() -> Arc<PrimitiveArray<Int32Type>> {
        let values = vec![Some(123), None, Some(i32::MAX), Some(i32::MIN)];
        Arc::new(Int32Array::from(values))
    }

    /// Four booleans, the first of which is null.
    fn create_bools() -> Arc<BooleanArray> {
        let values = vec![None, Some(true), Some(false), Some(false)];
        Arc::new(BooleanArray::from(values))
    }

    /// Four strings: a null, two words, and an empty string.
    fn create_strings() -> Arc<StringArray> {
        let values = vec![None, Some("foo"), Some("bar"), Some("")];
        Arc::new(StringArray::from(values))
    }
}