use std::sync::Arc;
use arrow::array::ArrayRef;
use crate::{
cache::{
CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
utils::EntryID,
},
liquid_array::{
LiquidArrayRef, LiquidSqueezedArray, LiquidSqueezedArrayRef, VariantStructSqueezedArray,
},
};
use super::squeeze::try_variant_squeeze;
/// Borrowed handle to the materialized data of an entry, held either as an
/// Arrow array or as a Liquid array.
///
/// Both variants only borrow the underlying array for `'a`; nothing is copied
/// when this enum itself is cloned.
#[derive(Debug, Clone)]
pub enum MaterializedEntry<'a> {
/// The data is available as an Arrow `ArrayRef`.
Arrow(&'a ArrayRef),
/// The data is available as a `LiquidArrayRef`.
Liquid(&'a LiquidArrayRef),
}
/// Arguments handed to [`HydrationPolicy::hydrate`].
#[derive(Debug, Clone)]
pub struct HydrationRequest<'a> {
/// Identifier of the entry this request refers to. Neither policy in this
/// file reads it.
pub entry_id: EntryID,
/// Cache entry the policy matches on.
/// NOTE(review): presumably the entry's current state in the cache; confirm
/// against the caller.
pub cached: &'a CacheEntry,
/// Materialized form of the entry's data (Arrow or Liquid).
pub materialized: MaterializedEntry<'a>,
/// Optional expression accompanying the request. `AlwaysHydrate` reads the
/// requests of a `CacheExpression::VariantGet` to decide which variant paths
/// to squeeze.
pub expression: Option<&'a CacheExpression>,
/// Shared compressor states; `AlwaysHydrate` forwards them to
/// `try_variant_squeeze`.
pub compressor: Arc<LiquidCompressorStates>,
}
/// Strategy that may produce a new `CacheEntry` for a hydration request.
///
/// Implementors must be `Debug`, `Send` and `Sync`.
pub trait HydrationPolicy: std::fmt::Debug + Send + Sync {
/// Returns `Some(entry)` when the policy produces an entry for `request`,
/// or `None` when it produces nothing.
///
/// NOTE(review): how the caller applies the returned entry is not visible in
/// this file; confirm at the call site.
fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry>;
}
/// Hydration policy that produces an in-memory entry for every
/// cached/materialized combination its `HydrationPolicy` impl recognises.
///
/// This is a stateless unit struct.
#[derive(Debug, Default, Clone)]
pub struct AlwaysHydrate;

impl AlwaysHydrate {
    /// Creates the policy. There is nothing to configure.
    pub fn new() -> Self {
        AlwaysHydrate
    }
}
/// Extends an already-squeezed variant array with the requested paths it lacks.
///
/// Every request whose `path()` is not reported by `squeezed.contains_path` is
/// squeezed out of `materialized` through `try_variant_squeeze`; the typed
/// values obtained that way are appended to the ones `squeezed` already holds.
///
/// Returns `None` when:
/// - every requested path is already present in `squeezed`,
/// - `try_variant_squeeze` returns `None` for the missing requests, or
/// - the freshly squeezed array is not a `VariantStructSqueezedArray`.
///
/// Otherwise returns a memory-squeezed-liquid `CacheEntry` wrapping the merged
/// array, built with the original Arrow data type reported by `squeezed`.
fn hydrate_variant_paths(
    requests: &[VariantRequest],
    materialized: &ArrayRef,
    squeezed: &VariantStructSqueezedArray,
    compressor: &LiquidCompressorStates,
) -> Option<CacheEntry> {
    // Keep only the requests the existing squeezed array cannot serve.
    let mut absent: Vec<VariantRequest> = Vec::new();
    for request in requests {
        if !squeezed.contains_path(request.path()) {
            absent.push(request.clone());
        }
    }
    if absent.is_empty() {
        return None;
    }

    let (fresh, _) = try_variant_squeeze(materialized, &absent, compressor)?;
    let fresh_variant = fresh
        .as_any()
        .downcast_ref::<VariantStructSqueezedArray>()?;

    // Existing typed values come first, followed by the newly squeezed ones.
    let mut typed_values = squeezed.typed_values();
    typed_values.extend(fresh_variant.typed_values());

    // Prefer the nulls of the existing array; fall back to the fresh one.
    let nulls = match squeezed.nulls() {
        Some(existing) => Some(existing),
        None => fresh_variant.nulls(),
    };

    let merged: LiquidSqueezedArrayRef = Arc::new(VariantStructSqueezedArray::new(
        typed_values,
        nulls,
        squeezed.original_arrow_data_type(),
    ));
    Some(CacheEntry::memory_squeezed_liquid(merged))
}
impl HydrationPolicy for AlwaysHydrate {
fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry> {
match (request.cached, &request.materialized) {
(CacheEntry::DiskArrow(_), MaterializedEntry::Arrow(arr)) => {
if let Some(CacheExpression::VariantGet { requests }) = request.expression
&& let Some((squeezed, _bytes)) =
try_variant_squeeze(arr, requests, request.compressor.as_ref())
{
return Some(CacheEntry::memory_squeezed_liquid(squeezed));
}
Some(CacheEntry::memory_arrow((*arr).clone()))
}
(CacheEntry::DiskLiquid(_), MaterializedEntry::Liquid(liq)) => {
Some(CacheEntry::memory_liquid((*liq).clone()))
}
(CacheEntry::MemoryLiquid(_), _) => None,
(CacheEntry::MemorySqueezedLiquid(squeezed_entry), MaterializedEntry::Arrow(arr)) => {
if let Some(CacheExpression::VariantGet { requests }) = request.expression
&& let Some(squeezed) = squeezed_entry
.as_any()
.downcast_ref::<VariantStructSqueezedArray>()
&& let Some(entry) =
hydrate_variant_paths(requests, arr, squeezed, request.compressor.as_ref())
{
return Some(entry);
}
Some(CacheEntry::memory_arrow((*arr).clone()))
}
(CacheEntry::MemorySqueezedLiquid(_), MaterializedEntry::Liquid(liq)) => {
Some(CacheEntry::memory_liquid((*liq).clone()))
}
_ => None,
}
}
}
/// Hydration policy whose `HydrationPolicy` impl never produces an entry.
///
/// This is a stateless unit struct.
#[derive(Debug, Default, Clone)]
pub struct NoHydration;

impl NoHydration {
    /// Creates the policy. There is nothing to configure.
    pub fn new() -> Self {
        NoHydration
    }
}
impl HydrationPolicy for NoHydration {
    /// Ignores the request and always returns `None`.
    fn hydrate(&self, _: &HydrationRequest<'_>) -> Option<CacheEntry> {
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use parquet_variant_compute::json_to_variant;
    use crate::cache::utils::LiquidCompressorStates;
    use arrow::array::StringArray;
    use arrow_schema::DataType;

    /// Converts each JSON document in `values` to a variant and returns the
    /// variant's inner struct array as an `ArrayRef`.
    fn variant_array_from_json(values: &[&str]) -> ArrayRef {
        let json: ArrayRef = Arc::new(StringArray::from_iter_values(values.iter().copied()));
        let parsed = json_to_variant(&json).expect("variant array");
        let inner: ArrayRef = Arc::new(parsed.into_inner());
        inner
    }

    /// A disk-Arrow entry requested with a variant-get expression is hydrated
    /// into a squeezed entry containing the requested path.
    #[test]
    fn hydrates_disk_arrow_variant_to_squeezed() {
        let source = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let on_disk = CacheEntry::disk_arrow(source.data_type().clone());

        let request = HydrationRequest {
            entry_id: EntryID::from(0),
            cached: &on_disk,
            materialized: MaterializedEntry::Arrow(&source),
            expression: Some(&age_expr),
            compressor: Arc::new(LiquidCompressorStates::new()),
        };
        let entry = AlwaysHydrate::new()
            .hydrate(&request)
            .expect("should hydrate");

        match entry {
            CacheEntry::MemorySqueezedLiquid(squeezed) => {
                let variant = squeezed
                    .as_any()
                    .downcast_ref::<VariantStructSqueezedArray>()
                    .expect("variant squeezed");
                assert!(variant.contains_path("age"));
            }
            other => panic!("expected squeezed entry, got {other:?}"),
        }
    }

    /// A squeezed entry holding only `name` gains `age` when `age` is requested
    /// and the full Arrow array is materialized.
    #[test]
    fn hydrates_squeezed_variant_adds_missing_path() {
        let source = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let name_expr = CacheExpression::variant_get("name", DataType::Utf8);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let compressor = Arc::new(LiquidCompressorStates::new());

        // Seed the cache with an entry squeezed for `name` only.
        let (name_only, _) = try_variant_squeeze(
            &source,
            name_expr.variant_requests().unwrap(),
            compressor.as_ref(),
        )
        .expect("squeeze name");
        let seeded = CacheEntry::memory_squeezed_liquid(name_only.clone());

        let request = HydrationRequest {
            entry_id: EntryID::from(1),
            cached: &seeded,
            materialized: MaterializedEntry::Arrow(&source),
            expression: Some(&age_expr),
            compressor,
        };
        let entry = AlwaysHydrate::new()
            .hydrate(&request)
            .expect("should hydrate");

        let merged = match entry {
            CacheEntry::MemorySqueezedLiquid(sq) => sq,
            other => panic!("expected squeezed entry, got {other:?}"),
        };
        let variant = merged
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("variant squeezed");
        assert!(variant.contains_path("name"));
        assert!(variant.contains_path("age"));
    }
}