use arrow_array::builder::{
BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
};
use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
use arrow_schema::DataType;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{ScalarUDF, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datafusion::prelude::create_udf;
use std::sync::Arc;
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
Null = 0,
Boolean = 1,
Int64 = 2,
Float64 = 3,
String = 4,
Array = 5,
Object = 6,
}
impl JsonbType {
pub fn from_u8(value: u8) -> Option<Self> {
match value {
0 => Some(Self::Null),
1 => Some(Self::Boolean),
2 => Some(Self::Int64),
3 => Some(Self::Float64),
4 => Some(Self::String),
5 => Some(Self::Array),
6 => Some(Self::Object),
_ => None,
}
}
pub fn as_u8(self) -> u8 {
self as u8
}
}
mod common {
use super::*;
#[derive(Debug, Clone)]
pub enum KeyType {
Field(String),
Index(usize),
}
impl KeyType {
pub fn parse(key: &str) -> Self {
if let Ok(index) = key.parse::<usize>() {
Self::Index(index)
} else {
Self::Field(key.to_string())
}
}
}
pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
args.iter()
.map(|arg| match arg {
ColumnarValue::Array(arr) => arr.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
})
.collect()
}
pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
DataFusionError::Execution(msg.into())
}
pub fn validate_arg_count(
args: &[ArrayRef],
expected: usize,
function_name: &str,
) -> Result<()> {
if args.len() != expected {
return Err(execution_error(format!(
"{} requires exactly {} arguments",
function_name, expected
)));
}
Ok(())
}
pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
args[0]
.as_any()
.downcast_ref::<LargeBinaryArray>()
.ok_or_else(|| execution_error("First argument must be LargeBinary"))
}
pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
args[arg_index]
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
}
pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
let actual_index = if string_array.len() == 1 { 0 } else { index };
if string_array.is_null(actual_index) {
None
} else {
Some(string_array.value(actual_index))
}
}
pub fn get_json_value_by_key(
raw_jsonb: &jsonb::RawJsonb,
key_type: &KeyType,
) -> Result<Option<jsonb::OwnedJsonb>> {
match key_type {
KeyType::Field(field) => raw_jsonb
.get_by_name(field, false)
.map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
execution_error(format!("Failed to get array element [{}]: {}", index, e))
}),
}
}
pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
jsonb::jsonpath::parse_json_path(path.as_bytes())
.map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
}
}
fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
let raw_jsonb = value.as_raw();
if raw_jsonb
.is_null()
.map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
{
return Ok(None);
}
raw_jsonb
.to_str()
.map(Some)
.map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
}
fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
let raw_jsonb = value.as_raw();
if raw_jsonb
.is_null()
.map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
{
return Ok(None);
}
raw_jsonb
.to_i64()
.map(Some)
.map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
}
fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
let raw_jsonb = value.as_raw();
if raw_jsonb
.is_null()
.map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
{
return Ok(None);
}
raw_jsonb
.to_f64()
.map(Some)
.map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
}
fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
let raw_jsonb = value.as_raw();
if raw_jsonb
.is_null()
.map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
{
return Ok(None);
}
raw_jsonb
.to_bool()
.map(Some)
.map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
}
pub fn json_extract_udf() -> ScalarUDF {
create_udf(
"json_extract",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Utf8,
Volatility::Immutable,
Arc::new(json_extract_columnar_impl),
)
}
pub fn json_extract_with_type_udf() -> ScalarUDF {
use arrow_schema::Fields;
let return_type = DataType::Struct(Fields::from(vec![
arrow_schema::Field::new("value", DataType::LargeBinary, true),
arrow_schema::Field::new("type_tag", DataType::UInt8, false),
]));
create_udf(
"json_extract_with_type",
vec![DataType::LargeBinary, DataType::Utf8],
return_type,
Volatility::Immutable,
Arc::new(json_extract_with_type_columnar_impl),
)
}
fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_extract_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_extract_with_type_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_extract")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let path_array = common::extract_string_array(args, 1)?;
let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(path) = common::get_string_value_at(path_array, i) {
let jsonb_bytes = jsonb_array.value(i);
match extract_json_path(jsonb_bytes, path)? {
Some(value) => builder.append_value(&value),
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
use arrow_array::StructArray;
use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
common::validate_arg_count(args, 2, "json_extract_with_type")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let path_array = common::extract_string_array(args, 1)?;
let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
value_builder.append_null();
type_builder.append_value(JsonbType::Null.as_u8());
} else if let Some(path) = common::get_string_value_at(path_array, i) {
let jsonb_bytes = jsonb_array.value(i);
match extract_json_path_with_type(jsonb_bytes, path)? {
Some((value_bytes, type_tag)) => {
value_builder.append_value(&value_bytes);
type_builder.append_value(type_tag);
}
None => {
value_builder.append_null();
type_builder.append_value(JsonbType::Null.as_u8());
}
}
} else {
value_builder.append_null();
type_builder.append_value(JsonbType::Null.as_u8());
}
}
let value_array = Arc::new(value_builder.finish()) as ArrayRef;
let type_array = Arc::new(type_builder.finish()) as ArrayRef;
let struct_array = StructArray::from(vec![
(
Arc::new(arrow_schema::Field::new(
"value",
DataType::LargeBinary,
true,
)),
value_array,
),
(
Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
type_array,
),
]);
Ok(Arc::new(struct_array))
}
fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
let json_path = common::parse_json_path(path)?;
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
match selector.select_values(&json_path) {
Ok(values) => {
if values.is_empty() {
Ok(None)
} else {
let owned_value = &values[0];
let raw = owned_value.as_raw();
let jsonb_type = if raw.is_null().unwrap_or(false) {
JsonbType::Null
} else if raw.is_boolean().unwrap_or(false) {
JsonbType::Boolean
} else if raw.is_number().unwrap_or(false) {
if raw.to_i64().is_ok() {
JsonbType::Int64
} else {
JsonbType::Float64
}
} else if raw.is_string().unwrap_or(false) {
JsonbType::String
} else if raw.is_array().unwrap_or(false) {
JsonbType::Array
} else if raw.is_object().unwrap_or(false) {
JsonbType::Object
} else {
JsonbType::String };
Ok(Some((owned_value.clone().to_vec(), jsonb_type.as_u8())))
}
}
Err(e) => Err(common::execution_error(format!(
"Failed to select values from path '{}': {}",
path, e
))),
}
}
fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
let json_path = common::parse_json_path(path)?;
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
match selector.select_values(&json_path) {
Ok(values) => {
if values.is_empty() {
Ok(None)
} else {
Ok(Some(values[0].to_string()))
}
}
Err(e) => Err(common::execution_error(format!(
"Failed to select values from path '{}': {}",
path, e
))),
}
}
pub fn json_exists_udf() -> ScalarUDF {
create_udf(
"json_exists",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Boolean,
Volatility::Immutable,
Arc::new(json_exists_columnar_impl),
)
}
fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_exists_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_exists")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let path_array = common::extract_string_array(args, 1)?;
let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(path) = common::get_string_value_at(path_array, i) {
let jsonb_bytes = jsonb_array.value(i);
let exists = check_json_path_exists(jsonb_bytes, path)?;
builder.append_value(exists);
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
let json_path = common::parse_json_path(path)?;
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
match selector.exists(&json_path) {
Ok(exists) => Ok(exists),
Err(e) => Err(common::execution_error(format!(
"Failed to check existence of path '{}': {}",
path, e
))),
}
}
pub fn json_get_udf() -> ScalarUDF {
create_udf(
"json_get",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::LargeBinary,
Volatility::Immutable,
Arc::new(json_get_columnar_impl),
)
}
fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_get_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_get")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let key_array = common::extract_string_array(args, 1)?;
let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(key) = common::get_string_value_at(key_array, i) {
let jsonb_bytes = jsonb_array.value(i);
let key_type = common::KeyType::parse(key);
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
Some(value) => builder.append_value(value.as_raw().as_ref()),
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
pub fn json_get_string_udf() -> ScalarUDF {
create_udf(
"json_get_string",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Utf8,
Volatility::Immutable,
Arc::new(json_get_string_columnar_impl),
)
}
fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_get_string_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_get_string")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let key_array = common::extract_string_array(args, 1)?;
let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(key) = common::get_string_value_at(key_array, i) {
let jsonb_bytes = jsonb_array.value(i);
let key_type = common::KeyType::parse(key);
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
Some(value) => match json_value_to_string(value)? {
Some(string_val) => builder.append_value(&string_val),
None => builder.append_null(),
},
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
pub fn json_get_int_udf() -> ScalarUDF {
create_udf(
"json_get_int",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Int64,
Volatility::Immutable,
Arc::new(json_get_int_columnar_impl),
)
}
fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_get_int_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_get_int")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let key_array = common::extract_string_array(args, 1)?;
let mut builder = Int64Builder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(key) = common::get_string_value_at(key_array, i) {
let jsonb_bytes = jsonb_array.value(i);
let key_type = common::KeyType::parse(key);
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
Some(value) => match json_value_to_int(value)? {
Some(int_val) => builder.append_value(int_val),
None => builder.append_null(),
},
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
pub fn json_get_float_udf() -> ScalarUDF {
create_udf(
"json_get_float",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Float64,
Volatility::Immutable,
Arc::new(json_get_float_columnar_impl),
)
}
fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_get_float_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_get_float")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let key_array = common::extract_string_array(args, 1)?;
let mut builder = Float64Builder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(key) = common::get_string_value_at(key_array, i) {
let jsonb_bytes = jsonb_array.value(i);
let key_type = common::KeyType::parse(key);
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
Some(value) => match json_value_to_float(value)? {
Some(float_val) => builder.append_value(float_val),
None => builder.append_null(),
},
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
pub fn json_get_bool_udf() -> ScalarUDF {
create_udf(
"json_get_bool",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Boolean,
Volatility::Immutable,
Arc::new(json_get_bool_columnar_impl),
)
}
fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_get_bool_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_get_bool")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let key_array = common::extract_string_array(args, 1)?;
let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(key) = common::get_string_value_at(key_array, i) {
let jsonb_bytes = jsonb_array.value(i);
let key_type = common::KeyType::parse(key);
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
Some(value) => match json_value_to_bool(value)? {
Some(bool_val) => builder.append_value(bool_val),
None => builder.append_null(),
},
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
pub fn json_array_contains_udf() -> ScalarUDF {
create_udf(
"json_array_contains",
vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
DataType::Boolean,
Volatility::Immutable,
Arc::new(json_array_contains_columnar_impl),
)
}
fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_array_contains_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 3, "json_array_contains")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let path_array = common::extract_string_array(args, 1)?;
let value_array = common::extract_string_array(args, 2)?;
let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else {
let path = common::get_string_value_at(path_array, i);
let value = common::get_string_value_at(value_array, i);
match (path, value) {
(Some(p), Some(v)) => {
let jsonb_bytes = jsonb_array.value(i);
let contains = check_array_contains(jsonb_bytes, p, v)?;
builder.append_value(contains);
}
_ => builder.append_null(),
}
}
}
Ok(Arc::new(builder.finish()))
}
fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
let json_path = common::parse_json_path(path)?;
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
match selector.select_values(&json_path) {
Ok(values) => {
for v in values {
let raw = v.as_raw();
let mut index = 0;
loop {
match raw.get_by_index(index) {
Ok(Some(elem)) => {
let elem_str = elem.to_string();
if elem_str == value || elem_str == format!("\"{}\"", value) {
return Ok(true);
}
index += 1;
}
Ok(None) => break, Err(_) => break, }
}
}
Ok(false)
}
Err(e) => Err(common::execution_error(format!(
"Failed to check array contains at path '{}': {}",
path, e
))),
}
}
pub fn json_array_length_udf() -> ScalarUDF {
create_udf(
"json_array_length",
vec![DataType::LargeBinary, DataType::Utf8],
DataType::Int64,
Volatility::Immutable,
Arc::new(json_array_length_columnar_impl),
)
}
fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays = common::columnar_to_arrays(args);
let result = json_array_length_impl(&arrays)?;
Ok(ColumnarValue::Array(result))
}
fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
common::validate_arg_count(args, 2, "json_array_length")?;
let jsonb_array = common::extract_jsonb_array(args)?;
let path_array = common::extract_string_array(args, 1)?;
let mut builder = Int64Builder::with_capacity(jsonb_array.len());
for i in 0..jsonb_array.len() {
if jsonb_array.is_null(i) {
builder.append_null();
} else if let Some(path) = common::get_string_value_at(path_array, i) {
let jsonb_bytes = jsonb_array.value(i);
match get_array_length(jsonb_bytes, path)? {
Some(len) => builder.append_value(len),
None => builder.append_null(),
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
let json_path = common::parse_json_path(path)?;
let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
match selector.select_values(&json_path) {
Ok(values) => {
if values.is_empty() {
return Ok(None);
}
let first = &values[0];
let raw = first.as_raw();
let mut count = 0;
loop {
match raw.get_by_index(count) {
Ok(Some(_)) => count += 1,
Ok(None) => break, Err(_) => {
if count == 0 {
return Err(common::execution_error(format!(
"Path '{}' does not point to an array",
path
)));
}
break;
}
}
}
Ok(Some(count as i64))
}
Err(e) => Err(common::execution_error(format!(
"Failed to get array length at path '{}': {}",
path, e
))),
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::builder::LargeBinaryBuilder;
use arrow_array::{BooleanArray, Int64Array};
fn create_test_jsonb(json_str: &str) -> Vec<u8> {
jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
}
#[test]
fn test_jsonb_type_enum() {
assert_eq!(JsonbType::Null.as_u8(), 0);
assert_eq!(JsonbType::Boolean.as_u8(), 1);
assert_eq!(JsonbType::Int64.as_u8(), 2);
assert_eq!(JsonbType::Float64.as_u8(), 3);
assert_eq!(JsonbType::String.as_u8(), 4);
assert_eq!(JsonbType::Array.as_u8(), 5);
assert_eq!(JsonbType::Object.as_u8(), 6);
assert_eq!(JsonbType::from_u8(0), Some(JsonbType::Null));
assert_eq!(JsonbType::from_u8(1), Some(JsonbType::Boolean));
assert_eq!(JsonbType::from_u8(2), Some(JsonbType::Int64));
assert_eq!(JsonbType::from_u8(3), Some(JsonbType::Float64));
assert_eq!(JsonbType::from_u8(4), Some(JsonbType::String));
assert_eq!(JsonbType::from_u8(5), Some(JsonbType::Array));
assert_eq!(JsonbType::from_u8(6), Some(JsonbType::Object));
assert_eq!(JsonbType::from_u8(7), None); }
#[tokio::test]
async fn test_json_extract_udf() -> Result<()> {
let json = r#"{"user": {"name": "Alice", "age": 30}}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_null();
let jsonb_array = Arc::new(binary_builder.finish());
let path_array = Arc::new(StringArray::from(vec![
Some("$.user.name"),
Some("$.user.age"),
Some("$.user.name"),
]));
let result = json_extract_impl(&[jsonb_array, path_array])?;
let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_array.len(), 3);
assert_eq!(string_array.value(0), "\"Alice\"");
assert_eq!(string_array.value(1), "30");
assert!(string_array.is_null(2));
Ok(())
}
#[tokio::test]
async fn test_json_exists_udf() -> Result<()> {
let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_null();
let jsonb_array = Arc::new(binary_builder.finish());
let path_array = Arc::new(StringArray::from(vec![
Some("$.user.name"),
Some("$.user.email"),
Some("$.tags"),
Some("$.any"),
]));
let result = json_exists_impl(&[jsonb_array, path_array])?;
let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
assert_eq!(bool_array.len(), 4);
assert!(bool_array.value(0));
assert!(!bool_array.value(1));
assert!(bool_array.value(2));
assert!(bool_array.is_null(3));
Ok(())
}
#[tokio::test]
async fn test_json_get_string_udf() -> Result<()> {
let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
let jsonb_array = Arc::new(binary_builder.finish());
let key_array = Arc::new(StringArray::from(vec![
Some("str"),
Some("num"),
Some("bool"),
Some("null"),
]));
let result = json_get_string_impl(&[jsonb_array, key_array])?;
let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_array.len(), 4);
assert_eq!(string_array.value(0), "hello");
assert_eq!(string_array.value(1), "123");
assert_eq!(string_array.value(2), "true");
assert!(string_array.is_null(3));
Ok(())
}
#[tokio::test]
async fn test_json_get_int_udf() -> Result<()> {
let json = r#"{"int": 42, "str_num": "99", "bool": true}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
let jsonb_array = Arc::new(binary_builder.finish());
let key_array = Arc::new(StringArray::from(vec![
Some("int"),
Some("str_num"),
Some("bool"),
]));
let result = json_get_int_impl(&[jsonb_array, key_array])?;
let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int_array.len(), 3);
assert_eq!(int_array.value(0), 42);
assert_eq!(int_array.value(1), 99);
assert_eq!(int_array.value(2), 1);
Ok(())
}
#[tokio::test]
async fn test_json_get_bool_udf() -> Result<()> {
let json =
r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
let jsonb_array = Arc::new(binary_builder.finish());
let key_array = Arc::new(StringArray::from(vec![
Some("bool_true"),
Some("bool_false"),
Some("str_true"),
Some("str_false"),
]));
let result = json_get_bool_impl(&[jsonb_array, key_array])?;
let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
assert_eq!(bool_array.len(), 4);
assert!(bool_array.value(0));
assert!(!bool_array.value(1));
assert!(bool_array.value(2)); assert!(!bool_array.value(3));
Ok(())
}
#[tokio::test]
async fn test_json_array_contains_udf() -> Result<()> {
let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_null();
let jsonb_array = Arc::new(binary_builder.finish());
let path_array = Arc::new(StringArray::from(vec![
Some("$.tags"),
Some("$.tags"),
Some("$.nums"),
Some("$.tags"),
]));
let value_array = Arc::new(StringArray::from(vec![
Some("rust"),
Some("python"),
Some("2"),
Some("any"),
]));
let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
assert_eq!(bool_array.len(), 4);
assert!(bool_array.value(0));
assert!(!bool_array.value(1));
assert!(bool_array.value(2));
assert!(bool_array.is_null(3));
Ok(())
}
#[tokio::test]
async fn test_json_array_length_udf() -> Result<()> {
let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_null();
let jsonb_array = Arc::new(binary_builder.finish());
let path_array = Arc::new(StringArray::from(vec![
Some("$.empty"),
Some("$.tags"),
Some("$.nested.arr"),
Some("$.any"),
]));
let result = json_array_length_impl(&[jsonb_array, path_array])?;
let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int_array.len(), 4);
assert_eq!(int_array.value(0), 0);
assert_eq!(int_array.value(1), 3);
assert_eq!(int_array.value(2), 2);
assert!(int_array.is_null(3));
Ok(())
}
#[tokio::test]
async fn test_json_array_access() -> Result<()> {
let json = r#"["first", "second", "third"]"#;
let jsonb_bytes = create_test_jsonb(json);
let mut binary_builder = LargeBinaryBuilder::new();
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
binary_builder.append_value(&jsonb_bytes);
let jsonb_array = Arc::new(binary_builder.finish());
let key_array = Arc::new(StringArray::from(vec![
Some("0"),
Some("1"),
Some("10"), ]));
let result = json_get_string_impl(&[jsonb_array, key_array])?;
let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(string_array.len(), 3);
assert_eq!(string_array.value(0), "first");
assert_eq!(string_array.value(1), "second");
assert!(string_array.is_null(2));
Ok(())
}
}