tonbo 0.3.2

An embedded persistent KV database in Rust.
Documentation
use std::{any::Any, marker::PhantomData, mem, sync::Arc};

use arrow::{
    array::{Array, ArrayRef, ArrowPrimitiveType, AsArray},
    datatypes::{
        Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema, UInt16Type, UInt32Type,
        UInt64Type, UInt8Type,
    },
};
use fusio::Write;
use fusio_log::Encode;

use super::{DataType, DynRecord, Value};
use crate::{
    magic::USER_COLUMN_OFFSET,
    record::{option::OptionRecordRef, Key, Record, RecordEncodeError, RecordRef, Schema},
};

/// A dynamically-typed record made of a list of [`Value`] columns together
/// with the position of the primary-key column inside that list.
#[derive(Clone)]
pub struct DynRecordRef<'r> {
    /// Column values of this record.
    pub columns: Vec<Value>,
    // XXX: log encode should keep the same behavior
    /// Index into `columns` of the primary-key column.
    pub primary_index: usize,
    /// Carries the `'r` lifetime parameter; the struct stores no field that
    /// borrows for `'r` directly.
    _marker: PhantomData<&'r ()>,
}

impl<'r> DynRecordRef<'r> {
    pub(crate) fn new(columns: Vec<Value>, primary_index: usize) -> Self {
        Self {
            columns,
            primary_index,
            _marker: PhantomData,
        }
    }
}

impl<'r> Encode for DynRecordRef<'r> {
    type Error = RecordEncodeError;

    /// Writes the record as: column count (`u32`), primary-key index (`u32`),
    /// then every column in order.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while writing the header or a column.
    async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
    where
        W: Write,
    {
        // Header: both values are narrowed to `u32` before being written.
        let column_count = self.columns.len() as u32;
        let primary_index = self.primary_index as u32;
        column_count.encode(writer).await?;
        primary_index.encode(writer).await?;

        for column in &self.columns {
            column
                .encode(writer)
                .await
                .map_err(RecordEncodeError::Fusio)?;
        }
        Ok(())
    }

    /// Returns the encoded size: two `u32` header fields plus the size
    /// reported by each column.
    fn size(&self) -> usize {
        let header = 2 * mem::size_of::<u32>();
        self.columns
            .iter()
            .fold(header, |total, column| total + column.size())
    }
}

impl<'r> RecordRef<'r> for DynRecordRef<'r> {
    type Record = DynRecord;

    /// Consumes the record and returns its primary-key column.
    ///
    /// # Panics
    ///
    /// Panics if `primary_index` is out of bounds for `columns`.
    fn key(self) -> <<<Self::Record as Record>::Schema as Schema>::Key as Key>::Ref<'r> {
        // `self` is owned here, so the key column is moved out rather than cloned.
        let Self {
            columns,
            primary_index,
            ..
        } = self;
        columns
            .into_iter()
            .nth(primary_index)
            .expect("The primary key must exist")
    }

    /// Builds the record stored at row `offset` of `record_batch`.
    ///
    /// Column 0 of the batch is read as a boolean and column 1 as a `u32`
    /// timestamp; both are forwarded to [`OptionRecordRef::new`]. Every field
    /// of `full_schema` after those two becomes one entry of `columns`:
    ///
    /// * a field with no matching field in the batch is filled through
    ///   `Value::with_none_value`;
    /// * the primary-key column is stored as a bare value;
    /// * every other column is stored as an `Option`, which is `None` when
    ///   the cell is null or its leaf is not included in `projection_mask`.
    ///
    /// # Panics
    ///
    /// Panics if the schema metadata has no parsable `primary_key_index`
    /// entry, or if the batch columns do not have the expected array types.
    fn from_record_batch(
        record_batch: &'r arrow::array::RecordBatch,
        offset: usize,
        projection_mask: &'r parquet::arrow::ProjectionMask,
        full_schema: &'r Arc<ArrowSchema>,
    ) -> OptionRecordRef<'r, Self> {
        let null = record_batch.column(0).as_boolean().value(offset);
        let primary_index = full_schema
            .metadata()
            .get("primary_key_index")
            .expect("schema metadata must contain `primary_key_index`")
            .parse::<usize>()
            .expect("`primary_key_index` metadata must be a valid usize");
        let ts = record_batch
            .column(1)
            .as_primitive::<arrow::datatypes::UInt32Type>()
            .value(offset)
            .into();

        // Both field lists are loop-invariant, so build them once up front.
        let full_fields = full_schema.flattened_fields();
        let batch_schema = record_batch.schema();
        let batch_fields = batch_schema.flattened_fields();

        // The leading columns are the two read above; the user columns follow.
        // NOTE(review): relies on USER_COLUMN_OFFSET == 2, the same mapping
        // `projection` uses between `columns` indices and leaf indices.
        let user_columns = full_fields.len().saturating_sub(USER_COLUMN_OFFSET);
        let mut columns = Vec::with_capacity(user_columns);

        for (idx, field) in full_fields.iter().enumerate().skip(USER_COLUMN_OFFSET) {
            let datatype = DataType::from(field.data_type());
            let is_nullable = field.is_nullable();

            // The batch may carry only a subset of the full schema's fields.
            let Some(batch_idx) = batch_fields.iter().position(|f| field.contains(f)) else {
                columns.push(Value::with_none_value(
                    datatype,
                    field.name().to_owned(),
                    is_nullable,
                ));
                continue;
            };

            let col = record_batch.column(batch_idx);
            let primary = primary_index == idx - USER_COLUMN_OFFSET;
            let value = match datatype {
                DataType::UInt8 => {
                    Self::primitive_value::<UInt8Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::UInt16 => {
                    Self::primitive_value::<UInt16Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::UInt32 => {
                    Self::primitive_value::<UInt32Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::UInt64 => {
                    Self::primitive_value::<UInt64Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::Int8 => {
                    Self::primitive_value::<Int8Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::Int16 => {
                    Self::primitive_value::<Int16Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::Int32 => {
                    Self::primitive_value::<Int32Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::Int64 => {
                    Self::primitive_value::<Int64Type>(col, offset, idx, projection_mask, primary)
                }
                DataType::String => {
                    let array = col.as_string::<i32>();
                    dyn_record_cell(
                        primary,
                        || !array.is_null(offset) && projection_mask.leaf_included(idx),
                        || array.value(offset).to_owned(),
                    )
                }
                DataType::Boolean => {
                    let array = col.as_boolean();
                    dyn_record_cell(
                        primary,
                        || !array.is_null(offset) && projection_mask.leaf_included(idx),
                        || array.value(offset),
                    )
                }
                DataType::Bytes => {
                    let array = col.as_binary::<i32>();
                    dyn_record_cell(
                        primary,
                        || !array.is_null(offset) && projection_mask.leaf_included(idx),
                        || array.value(offset).to_owned(),
                    )
                }
            };
            columns.push(Value::new(
                datatype,
                field.name().to_owned(),
                value,
                is_nullable,
            ));
        }

        let record = DynRecordRef {
            columns,
            primary_index,
            _marker: PhantomData,
        };
        OptionRecordRef::new(ts, record, null)
    }

    /// Replaces the value of every non-primary column whose leaf is not
    /// included in `projection_mask` with a typed `None`.
    fn projection(&mut self, projection_mask: &parquet::arrow::ProjectionMask) {
        let primary_index = self.primary_index;
        for (idx, col) in self.columns.iter_mut().enumerate() {
            // `columns` omits the leading columns, hence the offset into the mask.
            let keep =
                idx == primary_index || projection_mask.leaf_included(idx + USER_COLUMN_OFFSET);
            if keep {
                continue;
            }
            match col.datatype() {
                DataType::UInt8 => col.value = Arc::<Option<u8>>::new(None),
                DataType::UInt16 => col.value = Arc::<Option<u16>>::new(None),
                DataType::UInt32 => col.value = Arc::<Option<u32>>::new(None),
                DataType::UInt64 => col.value = Arc::<Option<u64>>::new(None),
                DataType::Int8 => col.value = Arc::<Option<i8>>::new(None),
                DataType::Int16 => col.value = Arc::<Option<i16>>::new(None),
                DataType::Int32 => col.value = Arc::<Option<i32>>::new(None),
                DataType::Int64 => col.value = Arc::<Option<i64>>::new(None),
                DataType::String => col.value = Arc::<Option<String>>::new(None),
                DataType::Boolean => col.value = Arc::<Option<bool>>::new(None),
                DataType::Bytes => col.value = Arc::<Option<Vec<u8>>>::new(None),
            };
        }
    }
}

/// Boxes one cell for storage in a `Value`.
///
/// A primary-key cell is stored as the bare `T`. Any other cell is stored as
/// `Option<T>`, and `read` only runs when `visible` returns `true`, so hidden
/// or null cells never pay for a copy of the underlying data.
fn dyn_record_cell<T, P, R>(primary: bool, visible: P, read: R) -> Arc<dyn Any + Send + Sync>
where
    T: Any + Send + Sync,
    P: FnOnce() -> bool,
    R: FnOnce() -> T,
{
    if primary {
        Arc::new(read()) as Arc<dyn Any + Send + Sync>
    } else {
        Arc::new(visible().then(read)) as Arc<dyn Any + Send + Sync>
    }
}

impl<'r> DynRecordRef<'r> {
    /// Reads the primitive cell at row `offset` of `col` and boxes it.
    ///
    /// When `primary` is true the bare native value is boxed. Otherwise an
    /// `Option` is boxed, holding the value only if the cell is non-null and
    /// leaf `idx` is included in `projection_mask`.
    fn primitive_value<T>(
        col: &ArrayRef,
        offset: usize,
        idx: usize,
        projection_mask: &'r parquet::arrow::ProjectionMask,
        primary: bool,
    ) -> Arc<dyn Any + Send + Sync>
    where
        T: ArrowPrimitiveType,
    {
        let array = col.as_primitive::<T>();

        if !primary {
            let visible = !array.is_null(offset) && projection_mask.leaf_included(idx);
            let cell: Option<T::Native> = visible.then_some(array.value(offset));
            return Arc::new(cell) as Arc<dyn Any + Send + Sync>;
        }

        Arc::new(array.value(offset)) as Arc<dyn Any + Send + Sync>
    }
}