Skip to main content

liquid_cache/cache/policies/
hydration.rs

//! Hydration policies decide whether and how to promote squeezed or on-disk cache entries back into memory.
2
3use std::sync::Arc;
4
5use arrow::array::ArrayRef;
6
7use crate::{
8    cache::{
9        CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
10        utils::EntryID,
11    },
12    liquid_array::{
13        LiquidArrayRef, LiquidSqueezedArray, LiquidSqueezedArrayRef, VariantStructSqueezedArray,
14    },
15};
16
17use super::squeeze::try_variant_squeeze;
18
19/// The materialized representation produced by a cache read.
20#[derive(Debug, Clone)]
21pub enum MaterializedEntry<'a> {
22    /// Arrow array in memory.
23    Arrow(&'a ArrayRef),
24    /// Liquid array in memory.
25    Liquid(&'a LiquidArrayRef),
26}
27
28/// Request context provided to a [`HydrationPolicy`].
29#[derive(Debug, Clone)]
30pub struct HydrationRequest<'a> {
31    /// Cache key being materialized.
32    pub entry_id: EntryID,
33    /// The cached entry before materialization (e.g., `DiskArrow`).
34    pub cached: &'a CacheEntry,
35    /// The fully materialized entry produced by the read path.
36    pub materialized: MaterializedEntry<'a>,
37    /// Optional expression hint associated with the read.
38    pub expression: Option<&'a CacheExpression>,
39    /// Compressor state used for hydrating into squeezed representations.
40    pub compressor: Arc<LiquidCompressorStates>,
41}
42
43/// Decide if a materialized entry should be promoted back into memory.
44pub trait HydrationPolicy: std::fmt::Debug + Send + Sync {
45    /// Determine how to hydrate a cache entry that was just materialized.
46    /// Return a new cache entry to insert if hydration is desired.
47    fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry>;
48}
49
/// Default hydration policy: keep the result of a cache miss in memory.
///
/// Disk-resident entries are promoted to the in-memory form of what was read:
/// - Arrow on disk becomes in-memory Arrow. For `VariantGet` reads it instead becomes a
///   squeezed variant array holding only the requested paths.
/// - Liquid on disk becomes in-memory Liquid.
///
/// Squeezed in-memory entries are handled two ways:
/// - They are widened with newly requested variant paths.
/// - Otherwise they are replaced by the materialized Arrow or Liquid array.
///
/// Some entries are left untouched (no new entry is produced):
/// - Entries already held as full Liquid arrays in memory.
/// - Any combination of cached entry and materialization the policy does not handle.
#[derive(Debug, Default, Clone)]
pub struct AlwaysHydrate;
54
55impl AlwaysHydrate {
56    /// Create a new [`AlwaysHydrate`] policy.
57    pub fn new() -> Self {
58        Self
59    }
60}
61
62fn hydrate_variant_paths(
63    requests: &[VariantRequest],
64    materialized: &ArrayRef,
65    squeezed: &VariantStructSqueezedArray,
66    compressor: &LiquidCompressorStates,
67) -> Option<CacheEntry> {
68    let missing_requests: Vec<VariantRequest> = requests
69        .iter()
70        .filter(|request| !squeezed.contains_path(request.path()))
71        .cloned()
72        .collect();
73    if missing_requests.is_empty() {
74        return None;
75    }
76
77    let (new_squeezed, _) = try_variant_squeeze(materialized, &missing_requests, compressor)?;
78    let new_variant = new_squeezed
79        .as_any()
80        .downcast_ref::<VariantStructSqueezedArray>()?;
81
82    let mut combined_values = squeezed.typed_values();
83    combined_values.extend(new_variant.typed_values());
84
85    let nulls = squeezed.nulls().or_else(|| new_variant.nulls());
86    let merged = VariantStructSqueezedArray::new(
87        combined_values,
88        nulls,
89        squeezed.original_arrow_data_type(),
90    );
91    Some(CacheEntry::memory_squeezed_liquid(
92        Arc::new(merged) as LiquidSqueezedArrayRef
93    ))
94}
95
96impl HydrationPolicy for AlwaysHydrate {
97    fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry> {
98        match (request.cached, &request.materialized) {
99            (CacheEntry::DiskArrow(_), MaterializedEntry::Arrow(arr)) => {
100                if let Some(CacheExpression::VariantGet { requests }) = request.expression
101                    && let Some((squeezed, _bytes)) =
102                        try_variant_squeeze(arr, requests, request.compressor.as_ref())
103                {
104                    return Some(CacheEntry::memory_squeezed_liquid(squeezed));
105                }
106                Some(CacheEntry::memory_arrow((*arr).clone()))
107            }
108            (CacheEntry::DiskLiquid(_), MaterializedEntry::Liquid(liq)) => {
109                Some(CacheEntry::memory_liquid((*liq).clone()))
110            }
111            (CacheEntry::MemoryLiquid(_), _) => None,
112            // When already squeezed/hybrid or liquid in memory, prefer promoting to Arrow if available.
113            (CacheEntry::MemorySqueezedLiquid(squeezed_entry), MaterializedEntry::Arrow(arr)) => {
114                if let Some(CacheExpression::VariantGet { requests }) = request.expression
115                    && let Some(squeezed) = squeezed_entry
116                        .as_any()
117                        .downcast_ref::<VariantStructSqueezedArray>()
118                    && let Some(entry) =
119                        hydrate_variant_paths(requests, arr, squeezed, request.compressor.as_ref())
120                {
121                    return Some(entry);
122                }
123                Some(CacheEntry::memory_arrow((*arr).clone()))
124            }
125            (CacheEntry::MemorySqueezedLiquid(_), MaterializedEntry::Liquid(liq)) => {
126                Some(CacheEntry::memory_liquid((*liq).clone()))
127            }
128            _ => None,
129        }
130    }
131}
132
/// Hydration policy that never promotes a materialized entry back into memory.
#[derive(Clone, Debug, Default)]
pub struct NoHydration;
136
137impl NoHydration {
138    /// Create a new [`NoHydration`] policy.
139    pub fn new() -> Self {
140        Self
141    }
142}
143
144impl HydrationPolicy for NoHydration {
145    fn hydrate(&self, _request: &HydrationRequest<'_>) -> Option<CacheEntry> {
146        None
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use parquet_variant_compute::json_to_variant;

    use crate::cache::utils::LiquidCompressorStates;
    use arrow::array::StringArray;
    use arrow_schema::DataType;

    /// Build a variant struct array with one row per JSON document in `values`.
    fn variant_array_from_json(values: &[&str]) -> ArrayRef {
        let strings: ArrayRef = Arc::new(StringArray::from_iter_values(values.iter().copied()));
        let variant = json_to_variant(&strings).expect("variant array");
        let struct_array = variant.into_inner();
        Arc::new(struct_array) as ArrayRef
    }

    #[test]
    fn hydrates_disk_arrow_variant_to_squeezed() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let expr = CacheExpression::variant_get("age", DataType::Int64);
        let policy = AlwaysHydrate::new();
        let compressor = Arc::new(LiquidCompressorStates::new());
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone());

        let hydrated = policy.hydrate(&HydrationRequest {
            entry_id: EntryID::from(0),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&expr),
            compressor,
        });

        let entry = hydrated.expect("should hydrate");
        match entry {
            CacheEntry::MemorySqueezedLiquid(squeezed) => {
                let variant = squeezed
                    .as_any()
                    .downcast_ref::<VariantStructSqueezedArray>()
                    .expect("variant squeezed");
                assert!(variant.contains_path("age"));
            }
            other => panic!("expected squeezed entry, got {other:?}"),
        }
    }

    #[test]
    fn hydrates_disk_arrow_without_expression_out_of_squeezed_form() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone());

        let hydrated = AlwaysHydrate::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(4),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: None,
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        let entry = hydrated.expect("should hydrate to arrow");
        assert!(
            !matches!(&entry, CacheEntry::MemorySqueezedLiquid(_)),
            "expected a non-squeezed entry, got {entry:?}"
        );
    }

    #[test]
    fn hydrates_squeezed_variant_adds_missing_path() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let name_expr = CacheExpression::variant_get("name", DataType::Utf8);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let compressor = Arc::new(LiquidCompressorStates::new());

        // Build an initial squeezed array containing only the "name" path.
        let (squeezed, _) = try_variant_squeeze(
            &arr,
            name_expr.variant_requests().unwrap(),
            compressor.as_ref(),
        )
        .expect("squeeze name");
        let cached_entry = CacheEntry::memory_squeezed_liquid(squeezed.clone());

        let policy = AlwaysHydrate::new();
        let hydrated = policy.hydrate(&HydrationRequest {
            entry_id: EntryID::from(1),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&age_expr),
            compressor,
        });

        let entry = hydrated.expect("should hydrate");
        let squeezed = match entry {
            CacheEntry::MemorySqueezedLiquid(sq) => sq,
            other => panic!("expected squeezed entry, got {other:?}"),
        };
        let variant = squeezed
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("variant squeezed");
        assert!(variant.contains_path("name"));
        assert!(variant.contains_path("age"));
    }

    #[test]
    fn squeezed_variant_without_expression_promotes_out_of_squeezed_form() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let name_expr = CacheExpression::variant_get("name", DataType::Utf8);
        let compressor = Arc::new(LiquidCompressorStates::new());

        let (squeezed, _) = try_variant_squeeze(
            &arr,
            name_expr.variant_requests().unwrap(),
            compressor.as_ref(),
        )
        .expect("squeeze name");
        let cached_entry = CacheEntry::memory_squeezed_liquid(squeezed);

        // Without a VariantGet hint there is nothing to merge, so the Arrow array wins.
        let hydrated = AlwaysHydrate::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(2),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: None,
            compressor,
        });

        let entry = hydrated.expect("should promote to arrow");
        assert!(
            !matches!(&entry, CacheEntry::MemorySqueezedLiquid(_)),
            "expected a non-squeezed entry, got {entry:?}"
        );
    }

    #[test]
    fn in_memory_arrow_entry_is_left_untouched() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada"}"#]);
        let cached_entry = CacheEntry::memory_arrow(arr.clone());

        let hydrated = AlwaysHydrate::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(3),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: None,
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        assert!(hydrated.is_none(), "got {hydrated:?}");
    }

    #[test]
    fn no_hydration_never_promotes() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let expr = CacheExpression::variant_get("age", DataType::Int64);
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone());

        let hydrated = NoHydration::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(5),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&expr),
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        assert!(hydrated.is_none(), "got {hydrated:?}");
    }
}