liquid_cache/cache/policies/
hydration.rs1use std::sync::Arc;
4
5use arrow::array::ArrayRef;
6
7use crate::{
8 cache::{
9 CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
10 utils::EntryID,
11 },
12 liquid_array::{
13 LiquidArrayRef, LiquidSqueezedArray, LiquidSqueezedArrayRef, VariantStructSqueezedArray,
14 },
15};
16
17use super::squeeze::try_variant_squeeze;
18
/// Borrowed, materialized form of a cached entry that is handed to a [`HydrationPolicy`].
#[derive(Debug, Clone)]
pub enum MaterializedEntry<'a> {
    /// The entry materialized as an Arrow array.
    Arrow(&'a ArrayRef),
    /// The entry materialized as a Liquid array.
    Liquid(&'a LiquidArrayRef),
}
27
/// Inputs a [`HydrationPolicy`] receives when deciding whether to hydrate an entry.
#[derive(Debug, Clone)]
pub struct HydrationRequest<'a> {
    /// Identifier of the cache entry under consideration.
    pub entry_id: EntryID,
    /// The entry as it is currently represented in the cache.
    pub cached: &'a CacheEntry,
    /// The materialized data for `cached`, as either an Arrow or a Liquid array.
    pub materialized: MaterializedEntry<'a>,
    /// Optional expression attached to the access (e.g. a `CacheExpression::VariantGet`),
    /// which a policy may use to pick a more compact representation.
    pub expression: Option<&'a CacheExpression>,
    /// Compressor state forwarded to squeeze helpers when a policy re-encodes data.
    pub compressor: Arc<LiquidCompressorStates>,
}
42
/// Strategy deciding whether, and in which form, a materialized entry should be hydrated.
pub trait HydrationPolicy: std::fmt::Debug + Send + Sync {
    /// Returns a replacement [`CacheEntry`] for `request`, or `None` when the policy
    /// declines to hydrate.
    ///
    /// NOTE(review): presumably the caller installs the returned entry in place of
    /// `request.cached` — confirm at the call site.
    fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry>;
}
49
/// Hydration policy that moves every supported materialized entry back into memory.
#[derive(Debug, Default, Clone)]
pub struct AlwaysHydrate;

impl AlwaysHydrate {
    /// Builds the policy; identical to [`AlwaysHydrate::default`].
    pub fn new() -> Self {
        AlwaysHydrate
    }
}
61
62fn hydrate_variant_paths(
63 requests: &[VariantRequest],
64 materialized: &ArrayRef,
65 squeezed: &VariantStructSqueezedArray,
66 compressor: &LiquidCompressorStates,
67) -> Option<CacheEntry> {
68 let missing_requests: Vec<VariantRequest> = requests
69 .iter()
70 .filter(|request| !squeezed.contains_path(request.path()))
71 .cloned()
72 .collect();
73 if missing_requests.is_empty() {
74 return None;
75 }
76
77 let (new_squeezed, _) = try_variant_squeeze(materialized, &missing_requests, compressor)?;
78 let new_variant = new_squeezed
79 .as_any()
80 .downcast_ref::<VariantStructSqueezedArray>()?;
81
82 let mut combined_values = squeezed.typed_values();
83 combined_values.extend(new_variant.typed_values());
84
85 let nulls = squeezed.nulls().or_else(|| new_variant.nulls());
86 let merged = VariantStructSqueezedArray::new(
87 combined_values,
88 nulls,
89 squeezed.original_arrow_data_type(),
90 squeezed.disk_backing().disk_bytes(),
91 );
92 Some(CacheEntry::memory_squeezed_liquid(
93 Arc::new(merged) as LiquidSqueezedArrayRef
94 ))
95}
96
97impl HydrationPolicy for AlwaysHydrate {
98 fn hydrate(&self, request: &HydrationRequest<'_>) -> Option<CacheEntry> {
99 match (request.cached, &request.materialized) {
100 (CacheEntry::DiskArrow { disk_bytes, .. }, MaterializedEntry::Arrow(arr)) => {
101 if let Some(CacheExpression::VariantGet { requests }) = request.expression
102 && let Some((squeezed, _bytes)) =
103 try_variant_squeeze(arr, requests, request.compressor.as_ref())
104 {
105 let variant = squeezed
106 .as_any()
107 .downcast_ref::<VariantStructSqueezedArray>()?;
108 let squeezed = VariantStructSqueezedArray::new(
109 variant.typed_values(),
110 variant.nulls(),
111 variant.original_arrow_data_type(),
112 *disk_bytes,
113 );
114 return Some(CacheEntry::memory_squeezed_liquid(
115 Arc::new(squeezed) as LiquidSqueezedArrayRef
116 ));
117 }
118 Some(CacheEntry::memory_arrow((*arr).clone()))
119 }
120 (CacheEntry::DiskLiquid { .. }, MaterializedEntry::Liquid(liq)) => {
121 Some(CacheEntry::memory_liquid((*liq).clone()))
122 }
123 (CacheEntry::MemoryLiquid(_), _) => None,
124 (CacheEntry::MemorySqueezedLiquid(squeezed_entry), MaterializedEntry::Arrow(arr)) => {
126 if let Some(CacheExpression::VariantGet { requests }) = request.expression
127 && let Some(squeezed) = squeezed_entry
128 .as_any()
129 .downcast_ref::<VariantStructSqueezedArray>()
130 && let Some(entry) =
131 hydrate_variant_paths(requests, arr, squeezed, request.compressor.as_ref())
132 {
133 return Some(entry);
134 }
135 Some(CacheEntry::memory_arrow((*arr).clone()))
136 }
137 (CacheEntry::MemorySqueezedLiquid(_), MaterializedEntry::Liquid(liq)) => {
138 Some(CacheEntry::memory_liquid((*liq).clone()))
139 }
140 _ => None,
141 }
142 }
143}
144
/// Hydration policy that never hydrates anything.
#[derive(Debug, Default, Clone)]
pub struct NoHydration;

impl NoHydration {
    /// Builds the policy; identical to [`NoHydration::default`].
    pub fn new() -> Self {
        NoHydration
    }
}
155
156impl HydrationPolicy for NoHydration {
157 fn hydrate(&self, _request: &HydrationRequest<'_>) -> Option<CacheEntry> {
158 None
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use parquet_variant_compute::json_to_variant;

    use crate::cache::utils::LiquidCompressorStates;
    use arrow::array::StringArray;
    use arrow_schema::DataType;

    /// Builds a variant struct array by parsing each JSON string in `values`.
    fn variant_array_from_json(values: &[&str]) -> ArrayRef {
        let strings: ArrayRef = Arc::new(StringArray::from_iter_values(values.iter().copied()));
        let variant = json_to_variant(&strings).expect("variant array");
        let struct_array = variant.into_inner();
        Arc::new(struct_array) as ArrayRef
    }

    #[test]
    fn hydrates_disk_arrow_variant_to_squeezed() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let expr = CacheExpression::variant_get("age", DataType::Int64);
        let policy = AlwaysHydrate::new();
        let compressor = Arc::new(LiquidCompressorStates::new());
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone(), 1);

        let hydrated = policy.hydrate(&HydrationRequest {
            entry_id: EntryID::from(0),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&expr),
            compressor,
        });

        let entry = hydrated.expect("should hydrate");
        match entry {
            CacheEntry::MemorySqueezedLiquid(squeezed) => {
                let variant = squeezed
                    .as_any()
                    .downcast_ref::<VariantStructSqueezedArray>()
                    .expect("variant squeezed");
                assert!(variant.contains_path("age"));
            }
            other => panic!("expected squeezed entry, got {other:?}"),
        }
    }

    #[test]
    fn hydrates_disk_arrow_without_expression_to_unsqueezed() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone(), 1);

        let hydrated = AlwaysHydrate::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(2),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: None,
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        // Without a variant expression the Arrow array is hydrated as-is, not squeezed.
        let entry = hydrated.expect("should hydrate");
        assert!(
            !matches!(entry, CacheEntry::MemorySqueezedLiquid(_)),
            "expected an unsqueezed entry, got {entry:?}"
        );
    }

    #[test]
    fn hydrates_squeezed_variant_adds_missing_path() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let name_expr = CacheExpression::variant_get("name", DataType::Utf8);
        let age_expr = CacheExpression::variant_get("age", DataType::Int64);
        let compressor = Arc::new(LiquidCompressorStates::new());

        let (name_only, _) = try_variant_squeeze(
            &arr,
            name_expr.variant_requests().unwrap(),
            compressor.as_ref(),
        )
        .expect("squeeze name");
        // `name_only` is not needed afterwards, so it is moved rather than cloned.
        let cached_entry = CacheEntry::memory_squeezed_liquid(name_only);

        let policy = AlwaysHydrate::new();
        let hydrated = policy.hydrate(&HydrationRequest {
            entry_id: EntryID::from(1),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&age_expr),
            compressor,
        });

        let entry = hydrated.expect("should hydrate");
        let squeezed = match entry {
            CacheEntry::MemorySqueezedLiquid(sq) => sq,
            other => panic!("expected squeezed entry, got {other:?}"),
        };
        let variant = squeezed
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("variant squeezed");
        assert!(variant.contains_path("name"));
        assert!(variant.contains_path("age"));
    }

    #[test]
    fn no_hydration_never_hydrates() {
        let arr = variant_array_from_json(&[r#"{"name":"Ada","age":30}"#]);
        let expr = CacheExpression::variant_get("age", DataType::Int64);
        let cached_entry = CacheEntry::disk_arrow(arr.data_type().clone(), 1);

        let hydrated = NoHydration::new().hydrate(&HydrationRequest {
            entry_id: EntryID::from(3),
            cached: &cached_entry,
            materialized: MaterializedEntry::Arrow(&arr),
            expression: Some(&expr),
            compressor: Arc::new(LiquidCompressorStates::new()),
        });

        assert!(hydrated.is_none());
    }
}