Skip to main content

liquid_cache/cache/policies/
hydration.rs

1//! Hydration policies decide whether and how to promote squeezed/on-disk entries back into memory.
2
3use std::sync::Arc;
4
5use arrow::array::ArrayRef;
6
7use crate::{
8    cache::{
9        CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
10        utils::EntryID,
11    },
12    liquid_array::{
13        LiquidArrayRef, LiquidSqueezedArray, LiquidSqueezedArrayRef, VariantStructSqueezedArray,
14    },
15};
16
17use super::squeeze::try_variant_squeeze;
18
19/// The materialized representation produced by a cache read.
20#[derive(Debug, Clone)]
21pub enum MaterializedEntry<'a> {
22    /// Arrow array in memory.
23    Arrow(&'a ArrayRef),
24    /// Liquid array in memory.
25    Liquid(&'a LiquidArrayRef),
26}
27
28/// Request context provided to a [`HydrationPolicy`].
29#[derive(Debug, Clone)]
30pub struct HydrationRequest<'a> {
31    /// Cache key being materialized.
32    pub entry_id: EntryID,
33    /// The cached entry before materialization (e.g., `DiskArrow`).
34    pub cached: &'a CacheEntry,
35    /// The fully materialized entry produced by the read path.
36    pub materialized: MaterializedEntry<'a>,
37    /// Optional expression hint associated with the read.
38    pub expression: Option<&'a CacheExpression>,
39    /// Compressor state used for hydrating into squeezed representations.
40    pub compressor: Arc<LiquidCompressorStates>,
41}
42
43/// Decide if a materialized entry should be promoted back into memory.
44pub trait HydrationPolicy: std::fmt::Debug + Send + Sync {
45    /// Determine how to hydrate a cache entry that was just materialized.
46    /// Return a new cache entry to insert if hydration is desired.
47    fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry>;
48}
49
/// Default hydration policy: after a read, keep the materialized value in memory.
///
/// Disk-resident entries are brought back in the form the read produced (Arrow or
/// Liquid, or, for `VariantGet` reads, a squeezed variant holding the requested paths).
/// Squeezed in-memory entries are upgraded to the materialized form or, for variants,
/// extended with newly requested paths. Entries already held as in-memory Liquid are
/// left as they are.
#[derive(Clone, Debug, Default)]
pub struct AlwaysHydrate;

impl AlwaysHydrate {
    /// Construct an [`AlwaysHydrate`] policy (same value as `AlwaysHydrate::default()`).
    pub fn new() -> Self {
        AlwaysHydrate
    }
}
61
62fn hydrate_variant_paths(
63    requests: &[VariantRequest],
64    materialized: &ArrayRef,
65    squeezed: &VariantStructSqueezedArray,
66    compressor: &LiquidCompressorStates,
67) -> Option<CacheEntry> {
68    let missing_requests: Vec<VariantRequest> = requests
69        .iter()
70        .filter(|request| !squeezed.contains_path(request.path()))
71        .cloned()
72        .collect();
73    if missing_requests.is_empty() {
74        return None;
75    }
76
77    let (new_squeezed, _) = try_variant_squeeze(materialized, &missing_requests, compressor)?;
78    let new_variant = new_squeezed
79        .as_any()
80        .downcast_ref::<VariantStructSqueezedArray>()?;
81
82    let mut combined_values = squeezed.typed_values();
83    combined_values.extend(new_variant.typed_values());
84
85    let nulls = squeezed.nulls().or_else(|| new_variant.nulls());
86    let merged = VariantStructSqueezedArray::new(
87        combined_values,
88        nulls,
89        squeezed.original_arrow_data_type(),
90        squeezed.disk_backing().disk_bytes(),
91    );
92    Some(CacheEntry::memory_squeezed_liquid(
93        Arc::new(merged) as LiquidSqueezedArrayRef
94    ))
95}
96
97impl HydrationPolicy for AlwaysHydrate {
98    fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry> {
99        match (request.cached, &request.materialized) {
100            (CacheEntry::DiskArrow { disk_bytes, .. }, MaterializedEntry::Arrow(arr)) => {
101                if let Some(CacheExpression::VariantGet { requests }) = request.expression
102                    && let Some((squeezed, _bytes)) =
103                        try_variant_squeeze(arr, requests, request.compressor.as_ref())
104                {
105                    let variant = squeezed
106                        .as_any()
107                        .downcast_ref::<VariantStructSqueezedArray>()?;
108                    let squeezed = VariantStructSqueezedArray::new(
109                        variant.typed_values(),
110                        variant.nulls(),
111                        variant.original_arrow_data_type(),
112                        *disk_bytes,
113                    );
114                    return Some(CacheEntry::memory_squeezed_liquid(
115                        Arc::new(squeezed) as LiquidSqueezedArrayRef
116                    ));
117                }
118                Some(CacheEntry::memory_arrow((*arr).clone()))
119            }
120            (CacheEntry::DiskLiquid { .. }, MaterializedEntry::Liquid(liq)) => {
121                Some(CacheEntry::memory_liquid((*liq).clone()))
122            }
123            (CacheEntry::MemoryLiquid(_), _) => None,
124            // When already squeezed/hybrid or liquid in memory, prefer promoting to Arrow if available.
125            (CacheEntry::MemorySqueezedLiquid(squeezed_entry), MaterializedEntry::Arrow(arr)) => {
126                if let Some(CacheExpression::VariantGet { requests }) = request.expression
127                    && let Some(squeezed) = squeezed_entry
128                        .as_any()
129                        .downcast_ref::<VariantStructSqueezedArray>()
130                    && let Some(entry) =
131                        hydrate_variant_paths(requests, arr, squeezed, request.compressor.as_ref())
132                {
133                    return Some(entry);
134                }
135                Some(CacheEntry::memory_arrow((*arr).clone()))
136            }
137            (CacheEntry::MemorySqueezedLiquid(_), MaterializedEntry::Liquid(liq)) => {
138                Some(CacheEntry::memory_liquid((*liq).clone()))
139            }
140            _ => None,
141        }
142    }
143}
144
/// Hydration policy that never promotes anything: reads leave the cache untouched.
#[derive(Clone, Debug, Default)]
pub struct NoHydration;

impl NoHydration {
    /// Construct a [`NoHydration`] policy (same value as `NoHydration::default()`).
    pub fn new() -> Self {
        NoHydration
    }
}
155
156impl HydrationPolicy for NoHydration {
157    fn hydrate(&self, _request: &HydrationRequest<'_>) -> Option<CacheEntry> {
158        None
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use parquet_variant_compute::json_to_variant;

    use crate::cache::utils::LiquidCompressorStates;
    use arrow::array::StringArray;
    use arrow_schema::DataType;

    /// Build a variant struct array (as produced by `json_to_variant`) from JSON strings.
    fn variant_array_from_json(values: &[&str]) -> ArrayRef {
        let strings: ArrayRef = Arc::new(StringArray::from_iter_values(values.iter().copied()));
        let variant = json_to_variant(&strings).expect("variant array");
        let struct_array = variant.into_inner();
        Arc::new(struct_array) as ArrayRef
    }

    #[test]
    fn hydrates_disk_arrow_variant_to_squeezed() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let expr = CacheExpression::variant_get("age", DataType::Int64);
        let policy = AlwaysHydrate::new();
        let compressor = Arc::new(LiquidCompressorStates::new());
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone(), 1);

        let hydrated = policy.hydrate(&HydrationRequest {
            entry_id: EntryID::from(0),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&expr),
            compressor,
        });

        let entry = hydrated.expect("should hydrate");
        match entry {
            CacheEntry::MemorySqueezedLiquid(squeezed) => {
                let variant = squeezed
                    .as_any()
                    .downcast_ref::<VariantStructSqueezedArray>()
                    .expect("variant squeezed");
                assert!(variant.contains_path("age"));
            }
            other => panic!("expected squeezed entry, got {other:?}"),
        }
    }

    #[test]
    fn hydrates_squeezed_variant_adds_missing_path() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let name_expr = CacheExpression::variant_get("name", DataType::Utf8);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let compressor = Arc::new(LiquidCompressorStates::new());

        // Build an initial squeezed array containing only the "name" path.
        let (squeezed, _) = try_variant_squeeze(
            &arr,
            name_expr.variant_requests().unwrap(),
            compressor.as_ref(),
        )
        .expect("squeeze name");
        let cached_entry = CacheEntry::memory_squeezed_liquid(squeezed.clone());

        let policy = AlwaysHydrate::new();
        let hydrated = policy.hydrate(&HydrationRequest {
            entry_id: EntryID::from(1),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&age_expr),
            compressor,
        });

        let entry = hydrated.expect("should hydrate");
        let squeezed = match entry {
            CacheEntry::MemorySqueezedLiquid(sq) => sq,
            other => panic!("expected squeezed entry, got {other:?}"),
        };
        let variant = squeezed
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("variant squeezed");
        assert!(variant.contains_path("name"));
        assert!(variant.contains_path("age"));
    }

    /// Without a variant expression, a disk Arrow read must still be hydrated, and it
    /// must not be squeezed.
    #[test]
    fn hydrates_disk_arrow_without_expression_keeps_arrow() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone(), 1);

        let hydrated = AlwaysHydrate::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(2),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: None,
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        let entry = hydrated.expect("disk arrow reads should always be hydrated");
        assert!(
            !matches!(entry, CacheEntry::MemorySqueezedLiquid(_)),
            "without a variant expression the Arrow array should be kept, got {entry:?}"
        );
    }

    /// `NoHydration` must decline even when `AlwaysHydrate` would have hydrated.
    #[test]
    fn no_hydration_never_promotes() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let expr = CacheExpression::variant_get("age", DataType::Int64);
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone(), 1);

        let hydrated = NoHydration::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(3),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&expr),
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        assert!(hydrated.is_none(), "NoHydration must never return an entry");
    }
}