liquid_cache/cache/policies/
hydration.rs1use std::sync::Arc;
4
5use arrow::array::ArrayRef;
6
7use crate::{
8 cache::{
9 CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
10 utils::EntryID,
11 },
12 liquid_array::{
13 LiquidArrayRef, LiquidSqueezedArray, LiquidSqueezedArrayRef, VariantStructSqueezedArray,
14 },
15};
16
17use super::squeeze::try_variant_squeeze;
18
19#[derive(Debug, Clone)]
21pub enum MaterializedEntry<'a> {
22 Arrow(&'a ArrayRef),
24 Liquid(&'a LiquidArrayRef),
26}
27
28#[derive(Debug, Clone)]
30pub struct HydrationRequest<'a> {
31 pub entry_id: EntryID,
33 pub cached: &'a CacheEntry,
35 pub materialized: MaterializedEntry<'a>,
37 pub expression: Option<&'a CacheExpression>,
39 pub compressor: Arc<LiquidCompressorStates>,
41}
42
43pub trait HydrationPolicy: std::fmt::Debug + Send + Sync {
45 fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry>;
48}
49
/// Hydration policy that proposes an in-memory entry whenever the cached and
/// materialized representations form a combination it knows how to handle
/// (see its [`HydrationPolicy`] implementation).
#[derive(Debug, Default, Clone)]
pub struct AlwaysHydrate;

impl AlwaysHydrate {
    /// Creates the policy.
    ///
    /// The type carries no state, so this is a `const fn` and the value can
    /// be built in `const`/`static` contexts as well as at runtime.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
61
62fn hydrate_variant_paths(
63 requests: &[VariantRequest],
64 materialized: &ArrayRef,
65 squeezed: &VariantStructSqueezedArray,
66 compressor: &LiquidCompressorStates,
67) -> Option<CacheEntry> {
68 let missing_requests: Vec<VariantRequest> = requests
69 .iter()
70 .filter(|request| !squeezed.contains_path(request.path()))
71 .cloned()
72 .collect();
73 if missing_requests.is_empty() {
74 return None;
75 }
76
77 let (new_squeezed, _) = try_variant_squeeze(materialized, &missing_requests, compressor)?;
78 let new_variant = new_squeezed
79 .as_any()
80 .downcast_ref::<VariantStructSqueezedArray>()?;
81
82 let mut combined_values = squeezed.typed_values();
83 combined_values.extend(new_variant.typed_values());
84
85 let nulls = squeezed.nulls().or_else(|| new_variant.nulls());
86 let merged = VariantStructSqueezedArray::new(
87 combined_values,
88 nulls,
89 squeezed.original_arrow_data_type(),
90 );
91 Some(CacheEntry::memory_squeezed_liquid(
92 Arc::new(merged) as LiquidSqueezedArrayRef
93 ))
94}
95
96impl HydrationPolicy for AlwaysHydrate {
97 fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry> {
98 match (request.cached, &request.materialized) {
99 (CacheEntry::DiskArrow(_), MaterializedEntry::Arrow(arr)) => {
100 if let Some(CacheExpression::VariantGet { requests }) = request.expression
101 && let Some((squeezed, _bytes)) =
102 try_variant_squeeze(arr, requests, request.compressor.as_ref())
103 {
104 return Some(CacheEntry::memory_squeezed_liquid(squeezed));
105 }
106 Some(CacheEntry::memory_arrow((*arr).clone()))
107 }
108 (CacheEntry::DiskLiquid(_), MaterializedEntry::Liquid(liq)) => {
109 Some(CacheEntry::memory_liquid((*liq).clone()))
110 }
111 (CacheEntry::MemoryLiquid(_), _) => None,
112 (CacheEntry::MemorySqueezedLiquid(squeezed_entry), MaterializedEntry::Arrow(arr)) => {
114 if let Some(CacheExpression::VariantGet { requests }) = request.expression
115 && let Some(squeezed) = squeezed_entry
116 .as_any()
117 .downcast_ref::<VariantStructSqueezedArray>()
118 && let Some(entry) =
119 hydrate_variant_paths(requests, arr, squeezed, request.compressor.as_ref())
120 {
121 return Some(entry);
122 }
123 Some(CacheEntry::memory_arrow((*arr).clone()))
124 }
125 (CacheEntry::MemorySqueezedLiquid(_), MaterializedEntry::Liquid(liq)) => {
126 Some(CacheEntry::memory_liquid((*liq).clone()))
127 }
128 _ => None,
129 }
130 }
131}
132
/// Hydration policy that never proposes an entry: its [`HydrationPolicy`]
/// implementation returns `None` for every request.
#[derive(Debug, Default, Clone)]
pub struct NoHydration;

impl NoHydration {
    /// Creates the policy.
    ///
    /// The type carries no state, so this is a `const fn` and the value can
    /// be built in `const`/`static` contexts as well as at runtime.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
143
144impl HydrationPolicy for NoHydration {
145 fn hydrate(&self, _request: &HydrationRequest<'_>) -> Option<CacheEntry> {
146 None
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use parquet_variant_compute::json_to_variant;

    use crate::cache::utils::LiquidCompressorStates;
    use arrow::array::StringArray;
    use arrow_schema::DataType;

    /// Converts the given JSON documents into a variant array and returns its
    /// inner struct array as an `ArrayRef`.
    fn variant_array_from_json(values: &[&str]) -> ArrayRef {
        let json: ArrayRef = Arc::new(StringArray::from_iter_values(values.iter().copied()));
        let inner = json_to_variant(&json)
            .expect("variant array")
            .into_inner();
        Arc::new(inner) as ArrayRef
    }

    /// A disk-Arrow entry queried through a variant-get expression hydrates
    /// into a squeezed entry that contains the requested path.
    #[test]
    fn hydrates_disk_arrow_variant_to_squeezed() {
        let array = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let policy = AlwaysHydrate::new();
        let compressor = Arc::new(LiquidCompressorStates::new());
        let on_disk = CacheEntry::disk_arrow(array.data_type().clone());

        let request = HydrationRequest {
            entry_id: EntryID::from(0),
            cached: &on_disk,
            materialized: MaterializedEntry::Arrow(&array),
            expression: Some(&age_expr),
            compressor,
        };

        let squeezed = match policy.hydrate(&request).expect("should hydrate") {
            CacheEntry::MemorySqueezedLiquid(squeezed) => squeezed,
            other => panic!("expected squeezed entry, got {other:?}"),
        };
        let variant = squeezed
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("variant squeezed");
        assert!(variant.contains_path("age"));
    }

    /// A squeezed entry that only holds `name` gains `age` when hydrated with
    /// an `age` request, and keeps `name`.
    #[test]
    fn hydrates_squeezed_variant_adds_missing_path() {
        let array = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let name_expr = CacheExpression::variant_get("name", DataType::Utf8);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let compressor = Arc::new(LiquidCompressorStates::new());

        // Seed the cache with a squeezed entry that only contains `name`.
        let (name_only, _) = try_variant_squeeze(
            &array,
            name_expr.variant_requests().unwrap(),
            compressor.as_ref(),
        )
        .expect("squeeze name");
        let seeded = CacheEntry::memory_squeezed_liquid(name_only.clone());

        let policy = AlwaysHydrate::new();
        let request = HydrationRequest {
            entry_id: EntryID::from(1),
            cached: &seeded,
            materialized: MaterializedEntry::Arrow(&array),
            expression: Some(&age_expr),
            compressor,
        };

        let merged = match policy.hydrate(&request).expect("should hydrate") {
            CacheEntry::MemorySqueezedLiquid(sq) => sq,
            other => panic!("expected squeezed entry, got {other:?}"),
        };
        let variant = merged
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("variant squeezed");
        assert!(variant.contains_path("name"));
        assert!(variant.contains_path("age"));
    }
}
232}