1use std::sync::Arc;
4
5use arrow::array::{Array, ArrayRef, StructArray};
6use arrow_schema::DataType;
7use bytes::Bytes;
8use parquet::variant::VariantPath;
9use parquet_variant_compute::{VariantArray, shred_variant, unshred_variant};
10
11use crate::cache::{
12 CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
13 transcode_liquid_inner, transcode_liquid_inner_with_hint, utils::arrow_to_bytes,
14};
15use crate::liquid_array::{
16 LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, VariantStructSqueezedArray,
17};
18use crate::utils::VariantSchema;
19
/// A strategy for pushing a cache entry one step further out of memory.
///
/// Implementations either keep the data in memory in a more compact form
/// (e.g. Arrow -> liquid, liquid -> squeezed liquid) or move it to disk,
/// returning the replacement [`CacheEntry`] together with any bytes that were
/// serialized along the way.
pub trait SqueezePolicy: std::fmt::Debug + Send + Sync {
    /// Squeezes `entry` and returns `(replacement_entry, bytes)`.
    ///
    /// * `entry` - the entry to squeeze (borrowed; the replacement is returned).
    /// * `compressor` - compressor state used when transcoding Arrow data into
    ///   liquid arrays; policies that never transcode may ignore it.
    /// * `squeeze_hint` - optional expression describing how the data will be
    ///   read; policies may use it to pick a squeezed representation.
    /// * `squeeze_io` - I/O handler passed to liquid arrays when they squeeze
    ///   themselves.
    ///
    /// The second element is `Some` when the policy serialized data (Arrow IPC
    /// or liquid encoding) — presumably for the caller to persist as the
    /// entry's disk backing; confirm against callers — and `None` when no new
    /// bytes were produced.
    fn squeeze(
        &self,
        entry: &CacheEntry,
        compressor: &LiquidCompressorStates,
        squeeze_hint: Option<&CacheExpression>,
        squeeze_io: &Arc<dyn SqueezeIoHandler>,
    ) -> (CacheEntry, Option<Bytes>);
}
32
/// Squeeze policy that evicts entries straight to disk without transcoding.
///
/// In-memory Arrow and liquid entries are serialized and replaced by disk
/// entries; squeezed liquid entries collapse to the disk entry matching their
/// backing; entries already on disk are returned unchanged.
#[derive(Debug, Default, Clone)]
pub struct Evict;
36
37impl SqueezePolicy for Evict {
38 fn squeeze(
39 &self,
40 entry: &CacheEntry,
41 _compressor: &LiquidCompressorStates,
42 _squeeze_hint: Option<&CacheExpression>,
43 _squeeze_io: &Arc<dyn SqueezeIoHandler>,
44 ) -> (CacheEntry, Option<Bytes>) {
45 match entry {
46 CacheEntry::MemoryArrow(array) => {
47 let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
48 (
49 CacheEntry::disk_arrow(array.data_type().clone()),
50 Some(bytes),
51 )
52 }
53 CacheEntry::MemoryLiquid(liquid_array) => {
54 let disk_data = liquid_array.to_bytes();
55 (
56 CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
57 Some(Bytes::from(disk_data)),
58 )
59 }
60 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
61 let data_type = squeezed_array.original_arrow_data_type();
62 let new_entry = match squeezed_array.disk_backing() {
63 SqueezedBacking::Liquid => CacheEntry::disk_liquid(data_type),
64 SqueezedBacking::Arrow => CacheEntry::disk_arrow(data_type),
65 };
66 (new_entry, None)
67 }
68 CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => (entry.clone(), None),
69 }
70 }
71}
72
/// Squeeze policy that degrades entries gradually:
/// Arrow -> liquid (or a variant-aware squeezed entry when the hint names
/// variant paths), liquid -> squeezed liquid, squeezed liquid -> disk.
///
/// When a step is not possible (transcoding or squeezing fails), the entry is
/// serialized and evicted to disk instead.
#[derive(Debug, Default, Clone)]
pub struct TranscodeSqueezeEvict;
76
77impl SqueezePolicy for TranscodeSqueezeEvict {
78 fn squeeze(
79 &self,
80 entry: &CacheEntry,
81 compressor: &LiquidCompressorStates,
82 squeeze_hint: Option<&CacheExpression>,
83 squeeze_io: &Arc<dyn SqueezeIoHandler>,
84 ) -> (CacheEntry, Option<Bytes>) {
85 match entry {
86 CacheEntry::MemoryArrow(array) => {
87 if let Some(requests) =
88 squeeze_hint.and_then(|expression| expression.variant_requests())
89 && let Some((squeezed_array, bytes)) =
90 try_variant_squeeze(array, requests, compressor)
91 {
92 return (
93 CacheEntry::memory_squeezed_liquid(squeezed_array),
94 Some(bytes),
95 );
96 }
97 match transcode_liquid_inner_with_hint(array, compressor, squeeze_hint) {
98 Ok(liquid_array) => (CacheEntry::memory_liquid(liquid_array), None),
99 Err(_) => {
100 let bytes =
101 arrow_to_bytes(array).expect("failed to convert arrow to bytes");
102 (
103 CacheEntry::disk_arrow(array.data_type().clone()),
104 Some(bytes),
105 )
106 }
107 }
108 }
109 CacheEntry::MemoryLiquid(liquid_array) => {
110 let (squeezed_array, bytes) =
111 match liquid_array.squeeze(squeeze_io.clone(), squeeze_hint) {
112 Some(result) => result,
113 None => {
114 let bytes = Bytes::from(liquid_array.to_bytes());
115 return (
116 CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
117 Some(bytes),
118 );
119 }
120 };
121 (
122 CacheEntry::memory_squeezed_liquid(squeezed_array),
123 Some(bytes),
124 )
125 }
126 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
127 let data_type = squeezed_array.original_arrow_data_type();
128 let new_entry = match squeezed_array.disk_backing() {
129 SqueezedBacking::Liquid => CacheEntry::disk_liquid(data_type),
130 SqueezedBacking::Arrow => CacheEntry::disk_arrow(data_type),
131 };
132 (new_entry, None)
133 }
134 CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => (entry.clone(), None),
135 }
136 }
137}
138
/// Squeeze policy that transcodes in-memory Arrow entries to liquid and then
/// evicts liquid entries to disk; it never produces squeezed entries itself.
///
/// Arrow arrays that cannot be transcoded are evicted to disk as Arrow IPC.
#[derive(Debug, Default, Clone)]
pub struct TranscodeEvict;
142
143impl SqueezePolicy for TranscodeEvict {
144 fn squeeze(
145 &self,
146 entry: &CacheEntry,
147 compressor: &LiquidCompressorStates,
148 _squeeze_hint: Option<&CacheExpression>,
149 _squeeze_io: &Arc<dyn SqueezeIoHandler>,
150 ) -> (CacheEntry, Option<Bytes>) {
151 match entry {
152 CacheEntry::MemoryArrow(array) => {
153 match transcode_liquid_inner_with_hint(array, compressor, None) {
154 Ok(liquid_array) => (CacheEntry::memory_liquid(liquid_array), None),
155 Err(_) => {
156 let bytes =
157 arrow_to_bytes(array).expect("failed to convert arrow to bytes");
158 (
159 CacheEntry::disk_arrow(array.data_type().clone()),
160 Some(bytes),
161 )
162 }
163 }
164 }
165 CacheEntry::MemoryLiquid(liquid_array) => {
166 let bytes = Bytes::from(liquid_array.to_bytes());
167 (
168 CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
169 Some(bytes),
170 )
171 }
172 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
173 let data_type = squeezed_array.original_arrow_data_type();
174 let new_entry = match squeezed_array.disk_backing() {
175 SqueezedBacking::Liquid => CacheEntry::disk_liquid(data_type),
176 SqueezedBacking::Arrow => CacheEntry::disk_arrow(data_type),
177 };
178 (new_entry, None)
179 }
180 CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => (entry.clone(), None),
181 }
182 }
183}
184
185pub(crate) fn try_variant_squeeze(
186 array: &ArrayRef,
187 requests: &[VariantRequest],
188 compressor: &LiquidCompressorStates,
189) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
190 let struct_array = array.as_any().downcast_ref::<StructArray>()?;
191 let mut variant_array = VariantArray::try_new(struct_array).ok()?;
192 if variant_array.is_empty() {
193 return None;
194 }
195
196 if requests.is_empty() {
197 return None;
198 }
199
200 let mut shredded_array: Option<ArrayRef> = None;
201 if let Some(shredding_type) = build_shredding_schema(struct_array, requests)
202 && let Ok(unshredded) = unshred_variant(&variant_array)
203 && let Ok(shredded) = shred_variant(&unshredded, &shredding_type)
204 {
205 let shredded_struct: ArrayRef = Arc::new(shredded.into_inner());
206 variant_array = VariantArray::try_new(shredded_struct.as_ref()).ok()?;
207 shredded_array = Some(shredded_struct);
208 }
209
210 let typed_root = variant_array.typed_value_field()?;
211 let typed_root = typed_root.as_any().downcast_ref::<StructArray>()?;
212
213 let mut collected = Vec::new();
214 for request in requests {
215 let path = request.path().trim();
216 if path.is_empty() {
217 continue;
218 }
219 let Some(path_struct) = extract_typed_values_for_path(typed_root, path) else {
220 continue;
221 };
222 let path_struct = path_struct.as_any().downcast_ref::<StructArray>()?;
223 let Some(typed_values) = path_struct.column_by_name("typed_value") else {
224 continue;
225 };
226 if typed_values.len() != array.len() {
227 continue;
228 }
229 collected.push((Arc::<str>::from(path.to_string()), typed_values.clone()));
230 }
231
232 if collected.is_empty() {
233 return None;
234 }
235
236 let backing_array = shredded_array.as_ref().unwrap_or(array);
237 let nulls = variant_array.inner().nulls().cloned();
238 let bytes = arrow_to_bytes(backing_array).ok()?;
239 let mut liquid_values = Vec::with_capacity(collected.len());
240 for (path, typed_values) in collected {
241 let Ok(liquid_array) = transcode_liquid_inner(&typed_values, compressor) else {
242 return None;
243 };
244 liquid_values.push((path, liquid_array));
245 }
246 let squeezed =
247 VariantStructSqueezedArray::new(liquid_values, nulls, backing_array.data_type().clone());
248 Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, bytes))
249}
250
251fn build_shredding_schema(
252 variant_struct: &StructArray,
253 requests: &[VariantRequest],
254) -> Option<DataType> {
255 let typed_field = match variant_struct.data_type() {
256 DataType::Struct(fields) => fields
257 .iter()
258 .find(|child| child.name() == "typed_value")
259 .cloned(),
260 _ => None,
261 };
262
263 let mut schema = VariantSchema::new(typed_field.as_deref());
264 for request in requests {
265 let path = request.path().trim();
266 if path.is_empty() {
267 continue;
268 }
269 schema.insert_path(path, request.data_type());
270 }
271 schema.shredding_type()
272}
273
274fn extract_typed_values_for_path(typed_root: &StructArray, path: &str) -> Option<ArrayRef> {
275 let path = VariantPath::from(path);
276 if path.is_empty() {
277 return None;
278 }
279
280 let mut cursor = typed_root;
281 for (idx, element) in path.iter().enumerate() {
282 let field_name = match element {
283 parquet::variant::VariantPathElement::Field { name } => name.as_ref(),
284 parquet::variant::VariantPathElement::Index { .. } => return None,
285 };
286 let field = cursor.column_by_name(field_name)?;
287 if idx == path.len() - 1 {
288 return Some(field.clone());
289 }
290 let struct_field = field.as_any().downcast_ref::<StructArray>()?;
291 let typed_value = struct_field.column_by_name("typed_value")?;
292 cursor = typed_value.as_any().downcast_ref::<StructArray>()?;
293 }
294
295 None
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::cached_batch::CacheEntry;
    use crate::cache::{CacheExpression, io_context::TestSqueezeIo};
    use crate::liquid_array::{LiquidSqueezedArray, SqueezedBacking, VariantStructSqueezedArray};
    use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
    use arrow_schema::Fields;
    use arrow_schema::{DataType, Field};
    use parquet::variant::VariantPath;
    use parquet_variant_compute::{GetOptions, json_to_variant, variant_get};
    use std::collections::BTreeMap;
    use std::sync::Arc;

    /// Builds an `Int32Array` containing the values `0..n`.
    fn int_array(n: i32) -> ArrayRef {
        Arc::new(Int32Array::from_iter_values(0..n))
    }

    /// Decodes Arrow IPC stream `bytes` and returns the first column of the
    /// first record batch.
    ///
    /// Panics if the stream cannot be opened, is empty, or fails to decode.
    fn decode_arrow(bytes: &Bytes) -> ArrayRef {
        let cursor = std::io::Cursor::new(bytes.to_vec());
        let mut reader =
            arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("arrow stream");
        let batch = reader
            .next()
            .expect("non-empty stream")
            .expect("read stream");
        batch.column(0).clone()
    }

    /// A three-row struct array with one nullable `Int32` child named `value`.
    /// The fallback tests below expect it to fail transcoding to liquid.
    fn struct_array() -> ArrayRef {
        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
        let field = Arc::new(Field::new("value", DataType::Int32, true));
        Arc::new(StructArray::from(vec![(field, values)]))
    }

    /// `Evict` moves every in-memory entry kind to disk and leaves disk
    /// entries untouched.
    #[test]
    fn test_squeeze_to_disk_policy() {
        let disk = Evict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let arr = int_array(8);
        // Arrow in memory -> DiskArrow, and the bytes round-trip to the same array.
        let (new_batch, bytes) = disk.squeeze(
            &CacheEntry::memory_arrow(arr.clone()),
            &states,
            None,
            &squeeze_io,
        );
        let data = new_batch;
        match (data, bytes) {
            (CacheEntry::DiskArrow(dt), Some(b)) => {
                assert_eq!(dt, DataType::Int32);
                let decoded = decode_arrow(&b);
                assert_eq!(decoded.as_ref(), arr.as_ref());
            }
            other => panic!("unexpected: {other:?}"),
        }

        let strings = Arc::new(StringArray::from(vec!["a", "b", "a"])) as ArrayRef;
        // Liquid in memory -> DiskLiquid with non-empty serialized bytes.
        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
        let (new_batch, bytes) = disk.squeeze(
            &CacheEntry::memory_liquid(liquid.clone()),
            &states,
            None,
            &squeeze_io,
        );
        let data = new_batch;
        match (data, bytes) {
            (CacheEntry::DiskLiquid(_), Some(b)) => {
                assert!(!b.is_empty());
            }
            other => panic!("unexpected: {other:?}"),
        }

        // Squeezed liquid -> DiskLiquid with no new bytes.
        let expression = Some(&CacheExpression::PredicateColumn);
        let squeezed = match liquid.squeeze(squeeze_io.clone(), expression) {
            // Only the squeezed handle is needed; its bytes are discarded here.
            Some((h, _b)) => h,
            None => panic!("squeeze should succeed for byte-view"),
        };
        let (new_batch, bytes) = disk.squeeze(
            &CacheEntry::memory_squeezed_liquid(squeezed),
            &states,
            expression,
            &squeeze_io,
        );
        let data = new_batch;
        match (data, bytes) {
            (CacheEntry::DiskLiquid(_data_type), None) => {}
            other => panic!("unexpected: {other:?}"),
        }

        // Entries already on disk are returned unchanged with no bytes.
        let (b1, w1) = disk.squeeze(
            &CacheEntry::disk_arrow(DataType::Utf8),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b1, CacheEntry::DiskArrow(DataType::Utf8)) && w1.is_none());
        let (b2, w2) = disk.squeeze(
            &CacheEntry::disk_liquid(DataType::Utf8),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b2, CacheEntry::DiskLiquid(DataType::Utf8)) && w2.is_none());
    }

    /// `TranscodeSqueezeEvict` walks Arrow -> liquid -> squeezed -> disk and
    /// leaves disk entries untouched.
    #[test]
    fn test_squeeze_to_liquid_policy() {
        let to_liquid = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let arr = int_array(8);
        // Arrow in memory -> MemoryLiquid that decodes back to the same array.
        let (new_batch, bytes) = to_liquid.squeeze(
            &CacheEntry::memory_arrow(arr.clone()),
            &states,
            None,
            &squeeze_io,
        );
        assert!(bytes.is_none());
        match new_batch {
            CacheEntry::MemoryLiquid(liq) => {
                assert_eq!(liq.to_arrow_array().as_ref(), arr.as_ref());
            }
            other => panic!("unexpected: {other:?}"),
        }
        let expression = Some(&CacheExpression::PredicateColumn);

        let strings = Arc::new(StringArray::from(vec!["x", "y", "x"])) as ArrayRef;
        // Liquid in memory -> MemorySqueezedLiquid plus non-empty backing bytes.
        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
        let (new_batch, bytes) = to_liquid.squeeze(
            &CacheEntry::memory_liquid(liquid),
            &states,
            expression,
            &squeeze_io,
        );
        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(_), Some(b)) => assert!(!b.is_empty()),
            other => panic!("unexpected: {other:?}"),
        }

        let strings = Arc::new(StringArray::from(vec!["m", "n"])) as ArrayRef;
        // Squeezed liquid -> DiskLiquid carrying the original Utf8 type, no bytes.
        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
        let squeezed = liquid.squeeze(squeeze_io.clone(), expression).unwrap().0;
        let (new_batch, bytes) = to_liquid.squeeze(
            &CacheEntry::memory_squeezed_liquid(squeezed),
            &states,
            expression,
            &squeeze_io,
        );
        match (new_batch, bytes) {
            (CacheEntry::DiskLiquid(DataType::Utf8), None) => {}
            other => panic!("unexpected: {other:?}"),
        }

        // Entries already on disk are returned unchanged with no bytes.
        let (b1, w1) = to_liquid.squeeze(
            &CacheEntry::disk_arrow(DataType::Utf8),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b1, CacheEntry::DiskArrow(DataType::Utf8)) && w1.is_none());
        let (b2, w2) = to_liquid.squeeze(
            &CacheEntry::disk_liquid(DataType::Utf8),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b2, CacheEntry::DiskLiquid(DataType::Utf8)) && w2.is_none());
    }

    /// A struct array cannot be transcoded to liquid, so
    /// `TranscodeSqueezeEvict` must evict it to disk as Arrow IPC.
    #[test]
    fn transcode_squeeze_struct_falls_back_to_disk_arrow() {
        let to_liquid = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let struct_arr = struct_array();
        let (new_batch, bytes) = to_liquid.squeeze(
            &CacheEntry::memory_arrow(struct_arr.clone()),
            &states,
            None,
            &squeeze_io,
        );
        match (new_batch, bytes) {
            (CacheEntry::DiskArrow(dt), Some(b)) => {
                assert_eq!(&dt, struct_arr.data_type());
                assert_eq!(decode_arrow(&b).as_ref(), struct_arr.as_ref());
            }
            other => panic!("expected disk arrow fallback, got {other:?}"),
        }
    }

    /// Same fallback as above, exercised through `TranscodeEvict`.
    #[test]
    fn transcode_evict_struct_falls_back_to_disk_arrow() {
        let to_disk = TranscodeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let struct_arr = struct_array();
        let (new_batch, bytes) = to_disk.squeeze(
            &CacheEntry::memory_arrow(struct_arr.clone()),
            &states,
            None,
            &squeeze_io,
        );
        match (new_batch, bytes) {
            (CacheEntry::DiskArrow(dt), Some(b)) => {
                assert_eq!(&dt, struct_arr.data_type());
                assert_eq!(decode_arrow(&b).as_ref(), struct_arr.as_ref());
            }
            other => panic!("expected disk arrow fallback, got {other:?}"),
        }
    }

    /// Convenience wrapper: a variant array with a single typed path.
    fn enriched_variant_array(path: &str, data_type: DataType) -> ArrayRef {
        enriched_variant_array_with_paths(&[(path, data_type)])
    }

    /// Builds a three-row variant struct (`metadata`, `value`, `typed_value`)
    /// from the JSON objects `{"name", "age"}`.
    ///
    /// For each `(path, data_type)` entry:
    /// - the values at `path` are extracted with `variant_get` as `data_type`;
    /// - the extracted values are wrapped in a `{typed_value}` struct;
    /// - that struct is stored under a `typed_value` child named after the path.
    ///
    /// A `BTreeMap` orders the children by path name and keeps only the first
    /// entry for a repeated path.
    ///
    /// NOTE(review): `value` is copied from the unshredded base variant, so the
    /// typed fields also remain present in `value` — confirm this partially
    /// shredded layout is what the tests intend.
    fn enriched_variant_array_with_paths(entries: &[(&str, DataType)]) -> ArrayRef {
        let values: ArrayRef = Arc::new(StringArray::from(vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            Some(r#"{"name": "Bob", "age": 25}"#),
            Some(r#"{"name": "Charlie", "age": 35}"#),
        ]));
        let base_variant = json_to_variant(&values).unwrap();
        let base_arr: ArrayRef = Arc::new(base_variant.inner().clone());

        let mut typed_structs: BTreeMap<String, ArrayRef> = BTreeMap::new();

        for (path, data_type) in entries.iter() {
            let typed_values = variant_get(
                &base_arr,
                GetOptions::new_with_path(VariantPath::from(*path)).with_as_type(Some(Arc::new(
                    Field::new("typed_value", data_type.clone(), true),
                ))),
            )
            .unwrap();

            typed_structs
                .entry(path.to_string())
                .or_insert(Arc::new(StructArray::new(
                    Fields::from(vec![Arc::new(Field::new(
                        "typed_value",
                        data_type.clone(),
                        true,
                    ))]),
                    vec![typed_values.clone()],
                    None,
                )));
        }

        let mut typed_fields: Vec<Arc<Field>> = Vec::new();
        let mut typed_columns: Vec<ArrayRef> = Vec::new();
        for (name, tree) in typed_structs {
            typed_fields.push(Arc::new(Field::new(
                name.as_str(),
                tree.data_type().clone(),
                true,
            )));
            typed_columns.push(tree.clone());
        }

        let typed_struct = Arc::new(StructArray::new(
            Fields::from(typed_fields),
            typed_columns,
            base_variant.inner().nulls().cloned(),
        ));

        let inner = base_variant.inner();
        use arrow::array::BinaryViewArray;
        Arc::new(StructArray::new(
            Fields::from(vec![
                Arc::new(Field::new("metadata", DataType::BinaryView, false)),
                Arc::new(Field::new("value", DataType::BinaryView, true)),
                Arc::new(Field::new(
                    "typed_value",
                    typed_struct.data_type().clone(),
                    true,
                )),
            ]),
            vec![
                inner
                    .column_by_name("metadata")
                    .cloned()
                    .unwrap_or_else(|| Arc::new(base_variant.metadata_field().clone()) as ArrayRef),
                // Fall back to an all-null `value` column if the base has none.
                inner.column_by_name("value").cloned().unwrap_or_else(|| {
                    Arc::new(BinaryViewArray::from(vec![None::<&[u8]>; inner.len()])) as ArrayRef
                }),
                typed_struct as ArrayRef,
            ],
            inner.nulls().cloned(),
        )) as ArrayRef
    }

    /// Asserts the common shape of a variant squeeze result:
    /// - the backing bytes are non-empty and the disk backing is Arrow;
    /// - the squeezed array reconstructs to a variant struct whose `value`
    ///   column is entirely null;
    /// - `expected_path` resolves under that struct's `typed_value`.
    fn assert_variant_squeezed(
        squeezed: &LiquidSqueezedArrayRef,
        expected_path: &str,
        bytes: &Bytes,
    ) {
        use futures::executor::block_on;

        assert!(!bytes.is_empty());
        assert_eq!(squeezed.disk_backing(), SqueezedBacking::Arrow);
        let struct_squeezed = squeezed
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("squeezed variant struct");
        // `to_arrow_array` is async here, so drive it to completion synchronously.
        let arrow_array = block_on(struct_squeezed.to_arrow_array());
        let struct_array = arrow_array
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("variant struct");
        let value_column = struct_array
            .column_by_name("value")
            .expect("value column present");
        assert_eq!(value_column.len(), value_column.null_count());
        let typed_struct = struct_array
            .column_by_name("typed_value")
            .expect("typed_value column")
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("typed struct");
        assert!(
            extract_typed_values_for_path(typed_struct, expected_path).is_some(),
            "typed path {expected_path} missing from squeezed variant"
        );
    }

    /// A variant with a shredded Utf8 `name` plus a matching hint squeezes
    /// into a `MemorySqueezedLiquid`.
    #[test]
    fn test_variant_squeeze_with_hint() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array("name", DataType::Utf8);
        let hint = CacheExpression::variant_get("name", DataType::Utf8);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            Some(&hint),
            &squeeze_io,
        );

        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
                assert_variant_squeezed(&squeezed, "name", &b);
            }
            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
        }
    }

    /// Same as above, for an `Int64` typed path.
    #[test]
    fn test_variant_squeeze_with_int64_path() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array("age", DataType::Int64);
        let hint = CacheExpression::variant_get("age", DataType::Int64);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            Some(&hint),
            &squeeze_io,
        );

        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
                assert_variant_squeezed(&squeezed, "age", &b);
            }
            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
        }
    }

    /// With both `name` and `age` shredded but only `name` hinted, the
    /// in-memory squeezed array materializes just the hinted path.
    ///
    /// NOTE(review): despite the test name, the assertions check that `age` is
    /// ABSENT from the reconstructed array. Presumably "preserves all fields"
    /// refers to the full variant kept in the backing bytes, which this test
    /// only checks for non-emptiness — confirm and consider renaming.
    #[test]
    fn test_variant_squeeze_with_multiple_paths_preserves_all_fields() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array_with_paths(&[
            ("name", DataType::Utf8),
            ("age", DataType::Int64),
        ]);
        let hint = CacheExpression::variant_get("name", DataType::Utf8);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            Some(&hint),
            &squeeze_io,
        );

        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
                assert!(!b.is_empty());
                let struct_squeezed = squeezed
                    .as_any()
                    .downcast_ref::<VariantStructSqueezedArray>()
                    .unwrap();
                let arrow_array = futures::executor::block_on(struct_squeezed.to_arrow_array());
                let struct_array = arrow_array.as_any().downcast_ref::<StructArray>().unwrap();
                let typed_value = struct_array
                    .column_by_name("typed_value")
                    .unwrap()
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                assert!(typed_value.column_by_name("name").is_some());
                assert!(typed_value.column_by_name("age").is_none());
            }
            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
        }
    }

    /// Without a hint the variant squeeze is never attempted. The policy then
    /// either transcodes to liquid or falls back to disk Arrow; both are accepted.
    #[test]
    fn test_variant_squeeze_without_hint() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array("name", DataType::Utf8);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            None,
            &squeeze_io,
        );

        match (new_batch, bytes) {
            (CacheEntry::DiskArrow(_), Some(b)) => assert!(!b.is_empty()),
            (CacheEntry::MemoryLiquid(_), None) => {}
            other => panic!("expected DiskArrow with bytes or MemoryLiquid, got {other:?}"),
        }
    }

    /// The hint asks for `age`, but only `name` is shredded in the input. The
    /// variant squeeze is expected to find no usable typed column, and the
    /// policy falls back to evicting the whole variant struct as disk Arrow.
    #[test]
    fn test_variant_squeeze_skips_when_path_missing() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let variant_arr = enriched_variant_array("name", DataType::Utf8);
        let hint = CacheExpression::variant_get("age", DataType::Int64);

        let (new_batch, bytes) = policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr.clone()),
            &states,
            Some(&hint),
            &squeeze_io,
        );

        match (new_batch, bytes) {
            (CacheEntry::DiskArrow(dt), Some(b)) => {
                assert_eq!(dt, variant_arr.data_type().clone());
                assert!(!b.is_empty());
            }
            other => panic!("expected DiskArrow fallback when path missing, got {other:?}"),
        }
    }
}