use std::sync::Arc;
use arrow::array::{Array, ArrayRef, BinaryViewArray, StructArray};
use arrow::buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};
use crate::liquid_array::{
LiquidArrayRef, LiquidDataType, LiquidSqueezedArray, NeedsBacking, SqueezedBacking,
};
use ahash::AHashMap;
/// Squeezed (in-memory) representation of a shredded variant column.
///
/// Each entry maps a dot-separated variant path (for example `"a.b"`) to the
/// liquid-encoded values stored for that path. Converting back to Arrow
/// regroups the paths into a nested `typed_value` struct.
#[derive(Debug)]
pub struct VariantStructSqueezedArray {
    /// Liquid-encoded values keyed by their dot-separated variant path.
    values: AHashMap<Arc<str>, LiquidArrayRef>,
    /// Number of rows; every array in `values` is expected to have this length.
    len: usize,
    /// Row-level validity, applied to the root variant struct.
    nulls: Option<NullBuffer>,
    /// Arrow data type reported by `original_arrow_data_type`.
    original_arrow_type: DataType,
}
impl VariantStructSqueezedArray {
pub fn new(
values: Vec<(Arc<str>, LiquidArrayRef)>,
nulls: Option<NullBuffer>,
original_arrow_type: DataType,
) -> Self {
let len = values.first().map(|(_, array)| array.len()).unwrap_or(0);
let mut map = AHashMap::with_capacity(values.len());
for (path, array) in values {
debug_assert_eq!(array.len(), len, "variant paths must share length");
map.insert(path, array);
}
Self {
values: map,
len,
nulls,
original_arrow_type,
}
}
fn build_root_struct(&self) -> StructArray {
let metadata = Arc::new(BinaryViewArray::from(vec![b"" as &[u8]; self.len])) as ArrayRef;
let value_placeholder = Arc::new(BinaryViewArray::new_null(self.len)) as ArrayRef;
let typed_struct = self.build_typed_struct();
let metadata_field = Arc::new(Field::new("metadata", DataType::BinaryView, false));
let value_field = Arc::new(Field::new("value", DataType::BinaryView, true));
let typed_field = Arc::new(Field::new(
"typed_value",
typed_struct.data_type().clone(),
true,
));
StructArray::new(
Fields::from(vec![metadata_field, value_field, typed_field]),
vec![metadata, value_placeholder, typed_struct as ArrayRef],
self.nulls.clone(),
)
}
fn build_typed_struct(&self) -> Arc<StructArray> {
let mut root = VariantTreeNode::new(self.len);
for (path, array) in &self.values {
let segments: Vec<&str> = path
.split('.')
.filter(|segment| !segment.is_empty())
.collect();
if segments.is_empty() {
continue;
}
root.insert(&segments, array.to_arrow_array());
}
root.into_struct_array()
}
pub fn contains_path(&self, path: &str) -> bool {
self.values.contains_key(path)
}
pub fn to_arrow_array_with_paths<'a>(
&self,
paths: impl IntoIterator<Item = &'a str>,
) -> Result<ArrayRef, NeedsBacking> {
let mut filtered: Vec<(Arc<str>, LiquidArrayRef)> = Vec::new();
for path in paths.into_iter() {
if let Some(array) = self.values.get(path) {
filtered.push((Arc::from(path.to_string()), array.clone()));
}
}
if filtered.is_empty() {
return Ok(Arc::new(self.build_root_struct()) as ArrayRef);
}
let filtered = VariantStructSqueezedArray::new(
filtered,
self.nulls.clone(),
self.original_arrow_type.clone(),
);
Ok(Arc::new(filtered.build_root_struct()) as ArrayRef)
}
pub fn typed_values(&self) -> Vec<(Arc<str>, LiquidArrayRef)> {
self.values
.iter()
.map(|(path, array)| (path.clone(), array.clone()))
.collect()
}
pub fn nulls(&self) -> Option<NullBuffer> {
self.nulls.clone()
}
}
#[async_trait::async_trait]
impl LiquidSqueezedArray for VariantStructSqueezedArray {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Memory held by the per-path liquid arrays plus the validity buffer.
    fn get_array_memory_size(&self) -> usize {
        let values_size: usize = self
            .values
            .values()
            .map(|array| array.get_array_memory_size())
            .sum();
        // The validity buffer is owned by this array as well. Count its
        // capacity, as arrow-rs does for struct nulls, so accounting does not
        // undercount columns with nulls.
        let nulls_size = self
            .nulls
            .as_ref()
            .map_or(0, |nulls| nulls.buffer().capacity());
        values_size + nulls_size
    }

    fn len(&self) -> usize {
        self.len
    }

    /// Rebuilds the full variant struct containing every stored path.
    async fn to_arrow_array(&self) -> ArrayRef {
        Arc::new(self.build_root_struct()) as ArrayRef
    }

    fn data_type(&self) -> LiquidDataType {
        LiquidDataType::ByteArray
    }

    fn original_arrow_data_type(&self) -> DataType {
        self.original_arrow_type.clone()
    }

    fn disk_backing(&self) -> SqueezedBacking {
        SqueezedBacking::Arrow
    }
}
/// Intermediate tree that regroups dot-separated variant paths into nested
/// structs before conversion to Arrow.
#[derive(Default)]
struct VariantTreeNode {
    /// Row count, propagated from the root to every child node.
    len: usize,
    /// Values stored at the path that ends at this node, if any.
    leaf: Option<ArrayRef>,
    /// Child nodes keyed by the next path segment.
    children: AHashMap<String, VariantTreeNode>,
}
impl VariantTreeNode {
    /// Creates an empty node for `len` rows.
    fn new(len: usize) -> Self {
        Self {
            len,
            leaf: None,
            children: AHashMap::new(),
        }
    }

    /// Stores `values` at the node reached by following `segments`.
    ///
    /// Intermediate nodes are created as needed. Inserting twice at the same
    /// path keeps the last `values`.
    fn insert(&mut self, segments: &[&str], values: ArrayRef) {
        let Some((head, tail)) = segments.split_first() else {
            self.leaf = Some(values);
            return;
        };
        let len = self.len;
        self.children
            .entry(head.to_string())
            .or_insert_with(|| VariantTreeNode::new(len))
            .insert(tail, values);
    }

    /// Converts this node's children into a struct with one field per child.
    ///
    /// Fields are sorted by name so the output schema is deterministic.
    fn into_struct_array(self) -> Arc<StructArray> {
        if self.children.is_empty() {
            // `StructArray::new` infers its length from the first child. With
            // no children it would report 0 rows (or be rejected) instead of
            // `self.len`, and the enclosing root struct would then fail its
            // length check. This happens at the root when no path produced a
            // non-empty segment.
            return Arc::new(StructArray::new_empty_fields(self.len, None));
        }
        let mut entries: Vec<(String, VariantTreeNode)> = self.children.into_iter().collect();
        // Names are unique map keys, so an unstable sort is still deterministic.
        entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        let (fields, arrays): (Vec<_>, Vec<_>) = entries
            .into_iter()
            .map(|(name, child)| {
                let field_array = child.into_field_array();
                let field = Arc::new(Field::new(name, field_array.data_type().clone(), false));
                (field, field_array)
            })
            .unzip();
        Arc::new(StructArray::new(Fields::from(fields), arrays, None))
    }

    /// Converts this node into a `{ value, typed_value }` field struct.
    ///
    /// A node without children must hold a leaf. A node with children becomes
    /// a nested struct.
    ///
    /// # Panics
    ///
    /// Panics if the node has neither children nor a leaf.
    fn into_field_array(self) -> ArrayRef {
        let len = self.len;
        let typed = if self.children.is_empty() {
            self.leaf.expect("variant leaf value present")
        } else {
            // NOTE(review): if both `x` and `x.y` were stored, the leaf for `x`
            // is discarded here in favour of the nested struct. Confirm that
            // callers never shred a path as both a leaf and an object.
            self.into_struct_array() as ArrayRef
        };
        wrap_typed_value(len, typed)
    }
}
/// Wraps `values` into a `len`-row shredded field struct
/// `{ value: BinaryView (all null), typed_value: values }`.
///
/// Both child fields are nullable and the wrapper itself has no validity.
fn wrap_typed_value(len: usize, values: ArrayRef) -> ArrayRef {
    // Derive the typed field before `values` is moved into the column list.
    let typed_field = Field::new("typed_value", values.data_type().clone(), true);
    let value_field = Field::new("value", DataType::BinaryView, true);
    let placeholder: ArrayRef = Arc::new(BinaryViewArray::new_null(len));
    let fields = Fields::from(vec![Arc::new(value_field), Arc::new(typed_field)]);
    let columns = vec![placeholder, values];
    Arc::new(StructArray::new(fields, columns, None)) as ArrayRef
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use arrow_schema::DataType;
    use crate::liquid_array::{LiquidByteArray, LiquidPrimitiveArray};

    /// One-row squeezed variant with a string path `did` and an int64 path `time_us`.
    fn did_and_time_fixture() -> VariantStructSqueezedArray {
        let did_source = StringArray::from(vec![Some("d")]);
        let (_compressor, did) = LiquidByteArray::train_from_arrow(&did_source);
        let did: LiquidArrayRef = Arc::new(did);

        let time_source = Int64Array::from(vec![1_i64]);
        let time = LiquidPrimitiveArray::<arrow::datatypes::Int64Type>::from_arrow_array(time_source);
        let time: LiquidArrayRef = Arc::new(time);

        VariantStructSqueezedArray::new(
            vec![(Arc::from("did"), did), (Arc::from("time_us"), time)],
            None,
            DataType::Struct(Fields::empty()),
        )
    }

    #[test]
    fn to_arrow_array_with_paths_prunes_extra_fields() {
        let squeezed = did_and_time_fixture();
        let array = squeezed
            .to_arrow_array_with_paths(["time_us"])
            .expect("arrow array");

        let root = array
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct root");
        let typed_value = root
            .column_by_name("typed_value")
            .and_then(|column| column.as_any().downcast_ref::<StructArray>())
            .unwrap();

        // Only the requested path should survive pruning.
        let field_names: Vec<&str> = typed_value
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(field_names, ["time_us"]);
    }
}