micromegas-analytics 0.24.0

analytics module of micromegas
Documentation
use datafusion::arrow::array::{
    Array, AsArray, DictionaryArray, GenericListArray, Int32Array, ListBuilder, StringBuilder,
    StructArray, StructBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Fields, Int32Type};
use datafusion::common::{Result, internal_err};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use micromegas_datafusion_extensions::properties::properties_udf::extract_properties_as_vec;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Scalar UDF implementation registered under the name `properties_to_dict`.
///
/// It accepts a single column holding lists of `{key, value}` string structs and
/// returns the same data dictionary-encoded with `Int32` keys.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct PropertiesToDict {
    // Accepted argument types and volatility, handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
}

impl PropertiesToDict {
    /// Creates the UDF with its standard signature.
    ///
    /// This is a convenience alias for the [`Default`] implementation.
    pub fn new() -> Self {
        Default::default()
    }
}

impl Default for PropertiesToDict {
    /// Builds the UDF with an exact, immutable signature taking one argument:
    /// a list of non-nullable `Property` structs, each made of a non-nullable
    /// UTF-8 `key` and a non-nullable UTF-8 `value`.
    fn default() -> Self {
        let key_field = Field::new("key", DataType::Utf8, false);
        let value_field = Field::new("value", DataType::Utf8, false);
        let property_type = DataType::Struct(Fields::from(vec![key_field, value_field]));
        let property_list_type =
            DataType::List(Arc::new(Field::new("Property", property_type, false)));

        let signature = Signature::exact(vec![property_list_type], Volatility::Immutable);
        Self { signature }
    }
}

impl ScalarUDFImpl for PropertiesToDict {
    /// Exposes this implementation as a `dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name of the function: `properties_to_dict`.
    fn name(&self) -> &str {
        "properties_to_dict"
    }

    /// Signature built when the UDF was constructed.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always returns `Dictionary<Int32, List<Property{key, value}>>`, regardless
    /// of the argument types passed in.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        let property_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ]));
        let values_type = DataType::List(Arc::new(Field::new("Property", property_type, false)));

        Ok(DataType::Dictionary(
            Box::new(DataType::Int32),
            Box::new(values_type),
        ))
    }

    /// Dictionary-encodes the single list-array argument.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the argument count is not exactly one,
    /// when the argument is a scalar, or when the array is not a
    /// `GenericListArray<i32>`. Errors from building the dictionary are
    /// propagated unchanged.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // Exactly one argument is accepted.
        let [input] = args.args.as_slice() else {
            return internal_err!("properties_to_dict expects exactly one argument");
        };

        // Only array inputs are handled; scalars are rejected.
        let array = match input {
            ColumnarValue::Scalar(_) => {
                return internal_err!("properties_to_dict does not support scalar inputs");
            }
            ColumnarValue::Array(array) => array,
        };

        let Some(list_array) = array.as_any().downcast_ref::<GenericListArray<i32>>() else {
            return Err(DataFusionError::Internal(
                "properties_to_dict requires a list array as input".to_string(),
            ));
        };

        build_dictionary_from_properties(list_array)
            .map(|dict_array| ColumnarValue::Array(Arc::new(dict_array)))
    }
}

/// Accumulates property lists into the pieces of an `Int32`-keyed dictionary
/// array, storing each distinct property list only once.
struct PropertiesDictionaryBuilder {
    // Maps each distinct property list to its position among the dictionary values.
    map: HashMap<Vec<(String, String)>, usize>,
    // Dictionary values: one list entry per distinct property list.
    values_builder: ListBuilder<StructBuilder>,
    // One dictionary key per appended row; `None` marks a null row.
    keys: Vec<Option<i32>>,
}

impl PropertiesDictionaryBuilder {
    fn new(capacity: usize) -> Self {
        let prop_struct_fields = vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, false),
        ];
        let prop_field = Arc::new(Field::new(
            "Property",
            DataType::Struct(Fields::from(prop_struct_fields.clone())),
            false,
        ));
        let values_builder =
            ListBuilder::new(StructBuilder::from_fields(prop_struct_fields, capacity))
                .with_field(prop_field);

        Self {
            map: HashMap::new(),
            values_builder,
            keys: Vec::with_capacity(capacity),
        }
    }

    fn append_property_list(&mut self, struct_array: &StructArray) -> Result<()> {
        let prop_vec = extract_properties_as_vec(struct_array)?;

        match self.map.get(&prop_vec) {
            Some(&index) => {
                self.keys.push(Some(index as i32));
            }
            None => {
                let new_index = self.map.len();
                self.add_to_values(&prop_vec)?;
                self.map.insert(prop_vec, new_index);
                self.keys.push(Some(new_index as i32));
            }
        }
        Ok(())
    }

    fn append_null(&mut self) {
        self.keys.push(None);
    }

    fn add_to_values(&mut self, properties: &[(String, String)]) -> Result<()> {
        let struct_builder = self.values_builder.values();
        for (key, value) in properties {
            struct_builder
                .field_builder::<StringBuilder>(0)
                .ok_or_else(|| DataFusionError::Internal("Failed to get key builder".to_string()))?
                .append_value(key);
            struct_builder
                .field_builder::<StringBuilder>(1)
                .ok_or_else(|| {
                    DataFusionError::Internal("Failed to get value builder".to_string())
                })?
                .append_value(value);
            struct_builder.append(true);
        }
        self.values_builder.append(true);
        Ok(())
    }

    fn finish(mut self) -> Result<DictionaryArray<Int32Type>> {
        let keys = Int32Array::from(self.keys);
        let values = Arc::new(self.values_builder.finish());
        DictionaryArray::try_new(keys, values)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
    }
}

/// Dictionary-encodes a list array of property structs.
///
/// Each row becomes one `Int32` dictionary key. Null rows stay null, and rows
/// with equal property lists share a single dictionary value.
///
/// # Errors
///
/// Propagates any error raised while appending a row or while assembling the
/// final dictionary array.
pub fn build_dictionary_from_properties(
    list_array: &GenericListArray<i32>,
) -> Result<DictionaryArray<Int32Type>> {
    let mut dict_builder = PropertiesDictionaryBuilder::new(list_array.len());

    // The iterator yields `None` for null rows and the row's slice of the
    // child values array otherwise.
    for row in list_array.iter() {
        match row {
            Some(row_values) => dict_builder.append_property_list(row_values.as_struct())?,
            None => dict_builder.append_null(),
        }
    }

    dict_builder.finish()
}