use datafusion::arrow::array::{
Array, ArrayRef, DictionaryArray, GenericBinaryArray, StringArray, StringDictionaryBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
use jsonb::RawJsonb;
use micromegas_analytics::dfext::jsonb::parse::make_jsonb_parse_udf;
use std::sync::Arc;
/// Builds a Utf8 `StringArray` holding `values` in order (every row non-null) and
/// returns it type-erased as an `ArrayRef`.
fn create_string_array(values: Vec<&str>) -> ArrayRef {
    let strings = StringArray::from(values);
    Arc::new(strings)
}
/// Builds a `Dictionary<Int32, Utf8>` array with one row per entry of `keys`.
///
/// Row `i` holds `values[keys[i]]` when `keys[i]` is `Some`, and is null when it is `None`.
///
/// # Panics
/// Panics if a key, cast to `usize`, is not a valid index into `values`.
fn create_string_dictionary_array(values: Vec<&str>, keys: Vec<Option<i32>>) -> ArrayRef {
    let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
    keys.into_iter()
        .map(|key| key.map(|idx| values[idx as usize]))
        .for_each(|entry| dict_builder.append_option(entry));
    Arc::new(dict_builder.finish())
}
/// Wraps `json_array` in a single-column `RecordBatch`.
///
/// The column is named `json_data`, is nullable, and takes the array's own data type.
///
/// # Panics
/// Panics if the batch cannot be constructed.
fn create_record_batch(json_array: ArrayRef) -> RecordBatch {
    let column_type = json_array.data_type().clone();
    let json_field = Field::new("json_data", column_type, true);
    let schema = Arc::new(Schema::new(vec![json_field]));
    let columns = vec![json_array];
    RecordBatch::try_new(schema, columns).expect("Failed to create RecordBatch")
}
/// Runs `SELECT jsonb_parse(json_data)` against `batch` and returns one entry per row.
///
/// Each entry is `None` when that row's result is null, and otherwise a copy of its
/// JSONB bytes.
///
/// # Panics
/// Panics if any of the following holds:
/// - registering or running the query fails;
/// - the query yields anything other than exactly one result batch;
/// - the result column is not a `Dictionary<Int32, Binary>` array.
async fn execute_jsonb_parse(batch: RecordBatch) -> Vec<Option<Vec<u8>>> {
    let ctx = SessionContext::new();
    ctx.register_udf(make_jsonb_parse_udf());
    ctx.register_batch("test_table", batch)
        .expect("Failed to register batch");
    let df = ctx
        .sql("SELECT jsonb_parse(json_data) as result FROM test_table")
        .await
        .expect("Failed to execute query");
    let results = df.collect().await.expect("Failed to collect results");
    // Callers index the returned rows positionally. Requiring a single batch avoids
    // merging several batches whose relative order might not match the input rows.
    assert_eq!(results.len(), 1, "Expected single result batch");
    let result_array = results[0].column(0);
    // Check the key type as well as the value type. A wrong key type then fails with
    // this descriptive message rather than at the bare downcast below.
    assert!(
        matches!(
            result_array.data_type(),
            DataType::Dictionary(key_type, value_type)
                if matches!(key_type.as_ref(), DataType::Int32)
                    && matches!(value_type.as_ref(), DataType::Binary)
        ),
        "Expected Dictionary<Int32, Binary> result, got {:?}",
        result_array.data_type()
    );
    let dict_array = result_array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .expect("Expected DictionaryArray");
    let binary_values = dict_array
        .values()
        .as_any()
        .downcast_ref::<GenericBinaryArray<i32>>()
        .expect("Expected BinaryArray values");
    let keys = dict_array.keys();
    (0..dict_array.len())
        .map(|i| {
            // A dictionary row is logically null in two cases:
            // - its key is null;
            // - its key points at a null dictionary value.
            // `DictionaryArray::is_null` only reports the first case.
            if keys.is_null(i) {
                return None;
            }
            let key_index = keys.value(i) as usize;
            if binary_values.is_null(key_index) {
                None
            } else {
                Some(binary_values.value(key_index).to_vec())
            }
        })
        .collect()
}
/// Parses two plain Utf8 JSON objects and checks that each decoded `key` field holds
/// the expected string.
#[tokio::test]
async fn test_jsonb_parse_with_string_input() {
    let documents = vec![r#"{"key": "value1"}"#, r#"{"key": "value2"}"#];
    let batch = create_record_batch(create_string_array(documents));
    let results = execute_jsonb_parse(batch).await;
    assert_eq!(results.len(), 2);
    assert!(results[0].is_some());
    assert!(results[1].is_some());
    for (row, expected) in results.iter().zip(["value1", "value2"]) {
        let jsonb = RawJsonb::new(row.as_ref().unwrap());
        let field = jsonb.get_by_name("key", true).unwrap().unwrap();
        let field_raw = field.as_raw();
        let decoded = field_raw.as_str().unwrap().unwrap();
        assert_eq!(decoded, expected);
    }
}
/// Parses a dictionary-encoded input whose rows repeat two distinct JSON objects.
/// Checks that every row yields a result and that each decoded `status` matches the
/// row's source object.
#[tokio::test]
async fn test_jsonb_parse_with_dictionary_input() {
    let input = create_string_dictionary_array(
        vec![r#"{"status": "active"}"#, r#"{"status": "inactive"}"#],
        vec![Some(0), Some(1), Some(0), Some(1)],
    );
    let results = execute_jsonb_parse(create_record_batch(input)).await;
    assert_eq!(results.len(), 4);
    assert!(results.iter().all(Option::is_some));
    let expected_statuses = ["active", "inactive", "active", "inactive"];
    for (row, expected) in results.iter().zip(expected_statuses) {
        let jsonb = RawJsonb::new(row.as_ref().unwrap());
        let field = jsonb.get_by_name("status", true).unwrap().unwrap();
        let field_raw = field.as_raw();
        let status = field_raw.as_str().unwrap().unwrap();
        assert_eq!(status, expected);
    }
}
/// Checks that a null dictionary-encoded input row produces a null result, while the
/// surrounding non-null rows still parse.
#[tokio::test]
async fn test_jsonb_parse_with_nulls() {
    let input = create_string_dictionary_array(
        vec![r#"{"key": "value"}"#],
        vec![Some(0), None, Some(0)],
    );
    let results = execute_jsonb_parse(create_record_batch(input)).await;
    assert_eq!(results.len(), 3);
    let presence: Vec<bool> = results.iter().map(Option::is_some).collect();
    assert_eq!(presence, vec![true, false, true]);
}
/// Checks the planned output schema, without executing the query: `jsonb_parse` must
/// be declared as returning `Dictionary<Int32, Binary>`.
#[tokio::test]
async fn test_jsonb_parse_returns_dictionary_type() {
    let input = create_string_array(vec![r#"{"test": "data"}"#]);
    let batch = create_record_batch(input);
    let ctx = SessionContext::new();
    ctx.register_udf(make_jsonb_parse_udf());
    ctx.register_batch("test_table", batch)
        .expect("Failed to register batch");
    let df = ctx
        .sql("SELECT jsonb_parse(json_data) as result FROM test_table")
        .await
        .expect("Failed to execute query");
    let output_type = df.schema().field(0).data_type();
    let is_dictionary_of_binary = match output_type {
        DataType::Dictionary(key_type, value_type) => {
            **key_type == DataType::Int32 && **value_type == DataType::Binary
        }
        _ => false,
    };
    assert!(
        is_dictionary_of_binary,
        "Expected Dictionary<Int32, Binary> return type, got {:?}",
        output_type
    );
}
/// Checks that malformed JSON yields a null result for its row rather than failing
/// the query, while a valid neighbouring row still parses.
#[tokio::test]
async fn test_jsonb_parse_with_invalid_json() {
    let rows = vec![r#"{"key": "value"}"#, r#"{"key": invalid}"#];
    let results = execute_jsonb_parse(create_record_batch(create_string_array(rows))).await;
    assert_eq!(results.len(), 2);
    assert!(results[0].is_some());
    assert!(results[1].is_none());
}
/// Checks that every top-level JSON kind parses to a non-null result: object, array,
/// string, number, boolean, and the JSON `null` literal.
#[tokio::test]
async fn test_jsonb_parse_various_json_types() {
    let documents = vec![
        r#"{"a": 1, "b": 2}"#, // object
        r#"[1, 2, 3]"#,        // array
        r#""hello""#,          // string
        r#"42"#,               // number
        r#"true"#,             // boolean
        r#"null"#,             // JSON null is a value, not an SQL NULL
    ];
    let results = execute_jsonb_parse(create_record_batch(create_string_array(documents))).await;
    assert_eq!(results.len(), 6);
    if let Some(i) = results.iter().position(Option::is_none) {
        panic!("Result {i} should not be null");
    }
}