liquid-cache 0.1.12

10x lower latency for cloud-native DataFusion
Documentation
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Date32Array, Int64Array, StringArray};
use arrow_schema::DataType;
use parquet_variant_compute::json_to_variant;

use crate::{
    cache::{
        AlwaysHydrate, CacheExpression, DefaultIoContext, EntryID, LiquidCacheBuilder,
        LiquidPolicy, TranscodeSqueezeEvict,
    },
    liquid_array::Date32Field,
};

fn create_date32_array() -> ArrayRef {
    let date32_array = Date32Array::from_iter_values(0..4096);
    Arc::new(date32_array)
}

#[tokio::test]
async fn read_squeezed_date_time() {
    let temp_dir = tempfile::tempdir().unwrap();
    let array = create_date32_array();
    let array_size = array.get_array_memory_size();

    let cache = LiquidCacheBuilder::new()
        .with_cache_policy(Box::new(LiquidPolicy::new()))
        .with_hydration_policy(Box::new(AlwaysHydrate::new()))
        .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
        .with_max_cache_bytes(array_size * 2)
        .with_io_context(Arc::new(DefaultIoContext::new(
            temp_dir.path().to_path_buf(),
        )))
        .build();

    let expression = Arc::new(CacheExpression::extract_date32(Date32Field::Year));

    for i in 0..4 {
        let entry_id = EntryID::from(i);
        cache
            .insert(entry_id, array.clone())
            .with_squeeze_hint(expression.clone())
            .await;
    }

    for i in 0..4 {
        let entry_id = EntryID::from(i);
        let array = cache
            .get(&entry_id)
            .with_expression_hint(expression.clone())
            .await
            .unwrap();
        assert_eq!(array.len(), array.len());
    }
    cache
        .get(&EntryID::from(1))
        .with_expression_hint(Arc::new(CacheExpression::extract_date32(
            Date32Field::Month,
        )))
        .await
        .unwrap();
    let trace = cache.consume_event_trace();
    insta::assert_snapshot!(trace);
}

fn create_variant_array() -> ArrayRef {
    let mut values = Vec::new();
    for i in 0..64 {
        if i % 2 == 0 {
            values.push(Some(r#"{"name":"Ada", "address": {"zipcode": 90001}}"#));
        } else {
            values.push(Some(
                r#"{"name":"Bob", "age": 29, "address": {"city": "New York"}}"#,
            ));
        }
    }
    let json_values: ArrayRef = Arc::new(StringArray::from(values));
    let variant = json_to_variant(&json_values).expect("variant from json");
    ArrayRef::from(variant)
}

#[tokio::test]
async fn read_squeezed_variant_path() {
    let temp_dir = tempfile::tempdir().unwrap();
    let variant_array = create_variant_array();
    let array_size = variant_array.get_array_memory_size();

    let cache = LiquidCacheBuilder::new()
        .with_cache_policy(Box::new(LiquidPolicy::new()))
        .with_hydration_policy(Box::new(AlwaysHydrate::new()))
        .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
        .with_max_cache_bytes(array_size * 3 / 2)
        .with_io_context(Arc::new(DefaultIoContext::new(
            temp_dir.path().to_path_buf(),
        )))
        .build();

    let name_expr = Arc::new(CacheExpression::variant_get("name", DataType::Utf8));
    let age_expr = Arc::new(CacheExpression::variant_get("age", DataType::Int64));
    let zipcode_expr = Arc::new(CacheExpression::variant_get(
        "address.zipcode",
        DataType::Int64,
    ));
    for i in 0..3 {
        let entry_id = EntryID::from(i);
        cache
            .insert(entry_id, variant_array.clone())
            .with_squeeze_hint(name_expr.clone())
            .await;
    }

    let squeezed = cache
        .get(&EntryID::from(0))
        .with_expression_hint(name_expr.clone())
        .read()
        .await
        .unwrap();
    assert_eq!(squeezed.len(), variant_array.len());

    cache
        .get(&EntryID::from(0))
        .with_expression_hint(age_expr.clone())
        .read()
        .await
        .unwrap();
    cache
        .get(&EntryID::from(1))
        .with_expression_hint(zipcode_expr.clone())
        .read()
        .await
        .unwrap();
    let trace = cache.consume_event_trace();
    insta::assert_snapshot!(trace);
}

fn create_int64_array() -> ArrayRef {
    let int64_array = Int64Array::from_iter_values(0..4096);
    Arc::new(int64_array)
}

#[tokio::test]
async fn read_squeezed_int64_array() {
    let temp_dir = tempfile::tempdir().unwrap();
    let int64_array = create_int64_array();
    let array_size = int64_array.get_array_memory_size();

    let cache = LiquidCacheBuilder::new()
        .with_cache_policy(Box::new(LiquidPolicy::new()))
        .with_hydration_policy(Box::new(AlwaysHydrate::new()))
        .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
        .with_max_cache_bytes(array_size * 2)
        .with_io_context(Arc::new(DefaultIoContext::new(
            temp_dir.path().to_path_buf(),
        )))
        .build();

    let expression = Arc::new(CacheExpression::PredicateColumn);

    for i in 0..4 {
        let entry_id = EntryID::from(i);
        if i % 2 == 0 {
            cache
                .insert(entry_id, int64_array.clone())
                .with_squeeze_hint(expression.clone())
                .await;
        } else {
            cache.insert(entry_id, int64_array.clone()).await;
        }
    }

    for i in 0..4 {
        let entry_id = EntryID::from(i);
        let array = cache
            .get(&entry_id)
            .with_expression_hint(expression.clone())
            .read()
            .await
            .unwrap();
        assert_eq!(array.len(), int64_array.len());
    }
    let trace = cache.consume_event_trace();
    insta::assert_snapshot!(trace);
}