use anyhow::{Result, anyhow};
use datafusion::arrow::array::{Array, ArrayRef, GenericListArray, RecordBatch};
use datafusion::arrow::datatypes::DataType;
use std::sync::Arc;
use crate::arrow_properties::{read_property_list, serialize_properties_to_jsonb};
use crate::dfext::binary_column_accessor::{BinaryColumnAccessor, create_binary_accessor};
use std::collections::HashMap;
/// Uniform, row-indexed view over a properties column.
///
/// The implementations in this file either hand back bytes held by a binary
/// column accessor or build the bytes from a list-of-struct column on each
/// call. Every implementor must be `Send` and `Debug`.
pub trait PropertiesColumnAccessor: Send + std::fmt::Debug {
/// Returns the bytes for row `index` as an owned vector, or an error if
/// they cannot be produced.
fn jsonb_value(&self, index: usize) -> Result<Vec<u8>>;
/// Returns the number of rows in the underlying column.
fn len(&self) -> usize;
/// Returns whether row `index` is null in the underlying column.
fn is_null(&self, index: usize) -> bool;
/// Returns `true` when `len()` is zero (default implementation).
fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// Properties accessor that delegates to a binary column accessor and returns
/// each row's bytes as stored (presumably already JSONB-encoded — confirm
/// against the producers of these columns).
struct JsonbColumnAccessor {
// Boxed trait object produced by `create_binary_accessor`; the concrete
// column representation is hidden behind `BinaryColumnAccessor`.
binary_accessor: Box<dyn BinaryColumnAccessor + Send>,
}
impl JsonbColumnAccessor {
fn new(binary_accessor: Box<dyn BinaryColumnAccessor + Send>) -> Self {
Self { binary_accessor }
}
}
impl PropertiesColumnAccessor for JsonbColumnAccessor {
    /// Copies the stored bytes of row `index` into a fresh vector.
    ///
    /// This implementation always returns `Ok`; the `Result` exists to satisfy
    /// the trait signature.
    fn jsonb_value(&self, index: usize) -> Result<Vec<u8>> {
        let stored = self.binary_accessor.value(index);
        Ok(stored.to_vec())
    }

    /// Row count reported by the wrapped binary accessor.
    fn len(&self) -> usize {
        let inner = &self.binary_accessor;
        inner.len()
    }

    /// Null check forwarded to the wrapped binary accessor.
    fn is_null(&self, index: usize) -> bool {
        let inner = &self.binary_accessor;
        inner.is_null(index)
    }
}
impl std::fmt::Debug for JsonbColumnAccessor {
    /// Formats as `JsonbColumnAccessor { len: .. }`.
    ///
    /// Only the row count is printed; the wrapped accessor itself is not
    /// included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let row_count = self.binary_accessor.len();
        let mut builder = f.debug_struct("JsonbColumnAccessor");
        builder.field("len", &row_count);
        builder.finish()
    }
}
/// Properties accessor over a list array with `i32` offsets.
///
/// Its `PropertiesColumnAccessor` implementation converts a row's list of
/// properties into bytes on each `jsonb_value` call.
#[derive(Debug)]
struct StructArrayAccessor {
// Shared handle to the list array holding one property list per row.
array: Arc<GenericListArray<i32>>,
}
impl StructArrayAccessor {
fn new(array: Arc<GenericListArray<i32>>) -> Self {
Self { array }
}
}
impl PropertiesColumnAccessor for StructArrayAccessor {
    /// Builds the bytes for row `index` from that row's property list.
    ///
    /// The row's list is decoded with `read_property_list`, gathered into a
    /// key/value string map, and handed to `serialize_properties_to_jsonb`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `read_property_list` or
    /// `serialize_properties_to_jsonb`.
    fn jsonb_value(&self, index: usize) -> Result<Vec<u8>> {
        let entries = read_property_list(self.array.value(index))?;

        // A repeated key keeps the value that appears last in the list.
        let mut properties_map: HashMap<String, String> = HashMap::new();
        for prop in entries {
            let key = prop.key_str().to_string();
            let value = prop.value_str().to_string();
            properties_map.insert(key, value);
        }

        serialize_properties_to_jsonb(&properties_map)
    }

    /// Number of rows in the list array.
    fn len(&self) -> usize {
        let rows = &self.array;
        rows.len()
    }

    /// Whether row `index` of the list array is null.
    fn is_null(&self, index: usize) -> bool {
        let rows = &self.array;
        rows.is_null(index)
    }
}
/// Picks a [`PropertiesColumnAccessor`] implementation for `array` based on
/// its Arrow data type.
///
/// Supported layouts:
/// - `Binary`, or `Dictionary` with `Int32` keys and `Binary` values: wrapped
///   via `create_binary_accessor`.
/// - `List` whose element type is a `Struct` with fields named `key` and
///   `value`: wrapped as a list accessor with `i32` offsets.
///
/// # Errors
///
/// Returns an error for any other data type, for a dictionary with different
/// key/value types, for a list whose elements are not structs or lack the
/// `key`/`value` fields, when the list downcast fails, or when
/// `create_binary_accessor` fails.
pub fn create_properties_accessor(
    array: &ArrayRef,
) -> Result<Box<dyn PropertiesColumnAccessor + Send>> {
    match array.data_type() {
        DataType::Binary => {
            let binary_accessor = create_binary_accessor(array)?;
            Ok(Box::new(JsonbColumnAccessor::new(binary_accessor)))
        }
        DataType::Dictionary(key_type, value_type) => {
            let supported = matches!(
                (key_type.as_ref(), value_type.as_ref()),
                (DataType::Int32, DataType::Binary)
            );
            if !supported {
                return Err(anyhow!(
                    "Unsupported dictionary format for properties: key={:?}, value={:?}",
                    key_type,
                    value_type
                ));
            }
            let binary_accessor = create_binary_accessor(array)?;
            Ok(Box::new(JsonbColumnAccessor::new(binary_accessor)))
        }
        DataType::List(field) => {
            // The list elements must be structs before field names can be checked.
            let struct_fields = match field.data_type() {
                DataType::Struct(struct_fields) => struct_fields,
                other => {
                    return Err(anyhow!(
                        "List array does not contain struct elements: {:?}",
                        other
                    ));
                }
            };

            let has_field = |wanted: &str| struct_fields.iter().any(|f| f.name() == wanted);
            if !(has_field("key") && has_field("value")) {
                return Err(anyhow!(
                    "List array does not contain struct with key/value fields"
                ));
            }

            let typed = array
                .as_any()
                .downcast_ref::<GenericListArray<i32>>()
                .ok_or_else(|| anyhow!("Failed to downcast to GenericListArray<i32>"))?;
            let shared = Arc::new(typed.clone());
            Ok(Box::new(StructArrayAccessor::new(shared)))
        }
        _ => Err(anyhow!(
            "Unsupported array type for properties accessor: {:?}",
            array.data_type()
        )),
    }
}
/// Looks up the column called `name` in `batch` and builds a properties
/// accessor for it.
///
/// # Errors
///
/// Returns an error when the batch has no column with that name, or when
/// [`create_properties_accessor`] rejects the column.
pub fn properties_column_by_name(
    batch: &RecordBatch,
    name: &str,
) -> Result<Box<dyn PropertiesColumnAccessor + Send>> {
    match batch.column_by_name(name) {
        Some(column) => create_properties_accessor(column),
        None => Err(anyhow!("Column '{}' not found", name)),
    }
}