Skip to main content

liquid_cache/cache/policies/
squeeze.rs

1//! Squeeze policies for liquid cache.
2
3use std::sync::Arc;
4
5use arrow::array::{Array, ArrayRef, StructArray};
6use arrow_schema::DataType;
7use bytes::Bytes;
8use parquet::variant::VariantPath;
9use parquet_variant_compute::{VariantArray, shred_variant, unshred_variant};
10
11use crate::cache::{
12    CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
13    transcode_liquid_inner, transcode_liquid_inner_with_hint, utils::arrow_to_bytes,
14};
15use crate::liquid_array::{
16    LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, VariantStructSqueezedArray,
17};
18use crate::utils::VariantSchema;
19
20/// What to do when we need to squeeze an entry?
21pub trait SqueezePolicy: std::fmt::Debug + Send + Sync {
22    /// Squeeze the entry.
23    /// Returns the squeezed entry and the bytes that were used to store the entry on disk.
24    fn squeeze(
25        &self,
26        entry: &CacheEntry,
27        compressor: &LiquidCompressorStates,
28        squeeze_hint: Option<&CacheExpression>,
29        squeeze_io: &Arc<dyn SqueezeIoHandler>,
30    ) -> (CacheEntry, Option<Bytes>);
31}
32
33/// Squeeze the entry to disk.
34#[derive(Debug, Default, Clone)]
35pub struct Evict;
36
37impl SqueezePolicy for Evict {
38    fn squeeze(
39        &self,
40        entry: &CacheEntry,
41        _compressor: &LiquidCompressorStates,
42        _squeeze_hint: Option<&CacheExpression>,
43        _squeeze_io: &Arc<dyn SqueezeIoHandler>,
44    ) -> (CacheEntry, Option<Bytes>) {
45        match entry {
46            CacheEntry::MemoryArrow(array) => {
47                let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
48                (
49                    CacheEntry::disk_arrow(array.data_type().clone()),
50                    Some(bytes),
51                )
52            }
53            CacheEntry::MemoryLiquid(liquid_array) => {
54                let disk_data = liquid_array.to_bytes();
55                (
56                    CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
57                    Some(Bytes::from(disk_data)),
58                )
59            }
60            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
61                let data_type = squeezed_array.original_arrow_data_type();
62                let new_entry = match squeezed_array.disk_backing() {
63                    SqueezedBacking::Liquid => CacheEntry::disk_liquid(data_type),
64                    SqueezedBacking::Arrow => CacheEntry::disk_arrow(data_type),
65                };
66                (new_entry, None)
67            }
68            CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => (entry.clone(), None),
69        }
70    }
71}
72
73/// Squeeze the entry to liquid memory.
74#[derive(Debug, Default, Clone)]
75pub struct TranscodeSqueezeEvict;
76
77impl SqueezePolicy for TranscodeSqueezeEvict {
78    fn squeeze(
79        &self,
80        entry: &CacheEntry,
81        compressor: &LiquidCompressorStates,
82        squeeze_hint: Option<&CacheExpression>,
83        squeeze_io: &Arc<dyn SqueezeIoHandler>,
84    ) -> (CacheEntry, Option<Bytes>) {
85        match entry {
86            CacheEntry::MemoryArrow(array) => {
87                if let Some(requests) =
88                    squeeze_hint.and_then(|expression| expression.variant_requests())
89                    && let Some((squeezed_array, bytes)) =
90                        try_variant_squeeze(array, requests, compressor)
91                {
92                    return (
93                        CacheEntry::memory_squeezed_liquid(squeezed_array),
94                        Some(bytes),
95                    );
96                }
97                match transcode_liquid_inner_with_hint(array, compressor, squeeze_hint) {
98                    Ok(liquid_array) => (CacheEntry::memory_liquid(liquid_array), None),
99                    Err(_) => {
100                        let bytes =
101                            arrow_to_bytes(array).expect("failed to convert arrow to bytes");
102                        (
103                            CacheEntry::disk_arrow(array.data_type().clone()),
104                            Some(bytes),
105                        )
106                    }
107                }
108            }
109            CacheEntry::MemoryLiquid(liquid_array) => {
110                let (squeezed_array, bytes) =
111                    match liquid_array.squeeze(squeeze_io.clone(), squeeze_hint) {
112                        Some(result) => result,
113                        None => {
114                            let bytes = Bytes::from(liquid_array.to_bytes());
115                            return (
116                                CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
117                                Some(bytes),
118                            );
119                        }
120                    };
121                (
122                    CacheEntry::memory_squeezed_liquid(squeezed_array),
123                    Some(bytes),
124                )
125            }
126            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
127                let data_type = squeezed_array.original_arrow_data_type();
128                let new_entry = match squeezed_array.disk_backing() {
129                    SqueezedBacking::Liquid => CacheEntry::disk_liquid(data_type),
130                    SqueezedBacking::Arrow => CacheEntry::disk_arrow(data_type),
131                };
132                (new_entry, None)
133            }
134            CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => (entry.clone(), None),
135        }
136    }
137}
138
139/// Squeeze the entry to liquid memory, but don't convert to squeezed.
140#[derive(Debug, Default, Clone)]
141pub struct TranscodeEvict;
142
143impl SqueezePolicy for TranscodeEvict {
144    fn squeeze(
145        &self,
146        entry: &CacheEntry,
147        compressor: &LiquidCompressorStates,
148        _squeeze_hint: Option<&CacheExpression>,
149        _squeeze_io: &Arc<dyn SqueezeIoHandler>,
150    ) -> (CacheEntry, Option<Bytes>) {
151        match entry {
152            CacheEntry::MemoryArrow(array) => {
153                match transcode_liquid_inner_with_hint(array, compressor, None) {
154                    Ok(liquid_array) => (CacheEntry::memory_liquid(liquid_array), None),
155                    Err(_) => {
156                        let bytes =
157                            arrow_to_bytes(array).expect("failed to convert arrow to bytes");
158                        (
159                            CacheEntry::disk_arrow(array.data_type().clone()),
160                            Some(bytes),
161                        )
162                    }
163                }
164            }
165            CacheEntry::MemoryLiquid(liquid_array) => {
166                let bytes = Bytes::from(liquid_array.to_bytes());
167                (
168                    CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
169                    Some(bytes),
170                )
171            }
172            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
173                let data_type = squeezed_array.original_arrow_data_type();
174                let new_entry = match squeezed_array.disk_backing() {
175                    SqueezedBacking::Liquid => CacheEntry::disk_liquid(data_type),
176                    SqueezedBacking::Arrow => CacheEntry::disk_arrow(data_type),
177                };
178                (new_entry, None)
179            }
180            CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => (entry.clone(), None),
181        }
182    }
183}
184
185pub(crate) fn try_variant_squeeze(
186    array: &ArrayRef,
187    requests: &[VariantRequest],
188    compressor: &LiquidCompressorStates,
189) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
190    let struct_array = array.as_any().downcast_ref::<StructArray>()?;
191    let mut variant_array = VariantArray::try_new(struct_array).ok()?;
192    if variant_array.is_empty() {
193        return None;
194    }
195
196    if requests.is_empty() {
197        return None;
198    }
199
200    let mut shredded_array: Option<ArrayRef> = None;
201    if let Some(shredding_type) = build_shredding_schema(struct_array, requests)
202        && let Ok(unshredded) = unshred_variant(&variant_array)
203        && let Ok(shredded) = shred_variant(&unshredded, &shredding_type)
204    {
205        let shredded_struct: ArrayRef = Arc::new(shredded.into_inner());
206        variant_array = VariantArray::try_new(shredded_struct.as_ref()).ok()?;
207        shredded_array = Some(shredded_struct);
208    }
209
210    let typed_root = variant_array.typed_value_field()?;
211    let typed_root = typed_root.as_any().downcast_ref::<StructArray>()?;
212
213    let mut collected = Vec::new();
214    for request in requests {
215        let path = request.path().trim();
216        if path.is_empty() {
217            continue;
218        }
219        let Some(path_struct) = extract_typed_values_for_path(typed_root, path) else {
220            continue;
221        };
222        let path_struct = path_struct.as_any().downcast_ref::<StructArray>()?;
223        let Some(typed_values) = path_struct.column_by_name("typed_value") else {
224            continue;
225        };
226        if typed_values.len() != array.len() {
227            continue;
228        }
229        collected.push((Arc::<str>::from(path.to_string()), typed_values.clone()));
230    }
231
232    if collected.is_empty() {
233        return None;
234    }
235
236    let backing_array = shredded_array.as_ref().unwrap_or(array);
237    let nulls = variant_array.inner().nulls().cloned();
238    let bytes = arrow_to_bytes(backing_array).ok()?;
239    let mut liquid_values = Vec::with_capacity(collected.len());
240    for (path, typed_values) in collected {
241        let Ok(liquid_array) = transcode_liquid_inner(&typed_values, compressor) else {
242            return None;
243        };
244        liquid_values.push((path, liquid_array));
245    }
246    let squeezed =
247        VariantStructSqueezedArray::new(liquid_values, nulls, backing_array.data_type().clone());
248    Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, bytes))
249}
250
251fn build_shredding_schema(
252    variant_struct: &StructArray,
253    requests: &[VariantRequest],
254) -> Option<DataType> {
255    let typed_field = match variant_struct.data_type() {
256        DataType::Struct(fields) => fields
257            .iter()
258            .find(|child| child.name() == "typed_value")
259            .cloned(),
260        _ => None,
261    };
262
263    let mut schema = VariantSchema::new(typed_field.as_deref());
264    for request in requests {
265        let path = request.path().trim();
266        if path.is_empty() {
267            continue;
268        }
269        schema.insert_path(path, request.data_type());
270    }
271    schema.shredding_type()
272}
273
274fn extract_typed_values_for_path(typed_root: &StructArray, path: &str) -> Option<ArrayRef> {
275    let path = VariantPath::from(path);
276    if path.is_empty() {
277        return None;
278    }
279
280    let mut cursor = typed_root;
281    for (idx, element) in path.iter().enumerate() {
282        let field_name = match element {
283            parquet::variant::VariantPathElement::Field { name } => name.as_ref(),
284            parquet::variant::VariantPathElement::Index { .. } => return None,
285        };
286        let field = cursor.column_by_name(field_name)?;
287        if idx == path.len() - 1 {
288            return Some(field.clone());
289        }
290        let struct_field = field.as_any().downcast_ref::<StructArray>()?;
291        let typed_value = struct_field.column_by_name("typed_value")?;
292        cursor = typed_value.as_any().downcast_ref::<StructArray>()?;
293    }
294
295    None
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301    use crate::cache::cached_batch::CacheEntry;
302    use crate::cache::{CacheExpression, io_context::TestSqueezeIo};
303    use crate::liquid_array::{LiquidSqueezedArray, SqueezedBacking, VariantStructSqueezedArray};
304    use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
305    use arrow_schema::Fields;
306    use arrow_schema::{DataType, Field};
307    use parquet::variant::VariantPath;
308    use parquet_variant_compute::{GetOptions, json_to_variant, variant_get};
309    use std::collections::BTreeMap;
310    use std::sync::Arc;
311
312    fn int_array(n: i32) -> ArrayRef {
313        Arc::new(Int32Array::from_iter_values(0..n))
314    }
315
316    fn decode_arrow(bytes: &Bytes) -> ArrayRef {
317        let cursor = std::io::Cursor::new(bytes.to_vec());
318        let mut reader =
319            arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("arrow stream");
320        let batch = reader
321            .next()
322            .expect("non-empty stream")
323            .expect("read stream");
324        batch.column(0).clone()
325    }
326
327    fn struct_array() -> ArrayRef {
328        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
329        let field = Arc::new(Field::new("value", DataType::Int32, true));
330        Arc::new(StructArray::from(vec![(field, values)]))
331    }
332
333    #[test]
334    fn test_squeeze_to_disk_policy() {
335        let disk = Evict;
336        let states = LiquidCompressorStates::new();
337        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
338        // MemoryArrow -> DiskArrow + bytes (Arrow IPC)
339        let arr = int_array(8);
340        let (new_batch, bytes) = disk.squeeze(
341            &CacheEntry::memory_arrow(arr.clone()),
342            &states,
343            None,
344            &squeeze_io,
345        );
346        let data = new_batch;
347        match (data, bytes) {
348            (CacheEntry::DiskArrow(dt), Some(b)) => {
349                assert_eq!(dt, DataType::Int32);
350                let decoded = decode_arrow(&b);
351                assert_eq!(decoded.as_ref(), arr.as_ref());
352            }
353            other => panic!("unexpected: {other:?}"),
354        }
355
356        // MemoryLiquid (strings) -> MemoryHybridLiquid + bytes
357        let strings = Arc::new(StringArray::from(vec!["a", "b", "a"])) as ArrayRef;
358        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
359        let (new_batch, bytes) = disk.squeeze(
360            &CacheEntry::memory_liquid(liquid.clone()),
361            &states,
362            None,
363            &squeeze_io,
364        );
365        let data = new_batch;
366        match (data, bytes) {
367            (CacheEntry::DiskLiquid(_), Some(b)) => {
368                assert!(!b.is_empty());
369            }
370            other => panic!("unexpected: {other:?}"),
371        }
372
373        let expression = Some(&CacheExpression::PredicateColumn);
374        // MemorySqueezedLiquid -> DiskLiquid, no extra bytes
375        let squeezed = match liquid.squeeze(squeeze_io.clone(), expression) {
376            Some((h, _b)) => h,
377            None => panic!("squeeze should succeed for byte-view"),
378        };
379        let (new_batch, bytes) = disk.squeeze(
380            &CacheEntry::memory_squeezed_liquid(squeezed),
381            &states,
382            expression,
383            &squeeze_io,
384        );
385        let data = new_batch;
386        match (data, bytes) {
387            (CacheEntry::DiskLiquid(_data_type), None) => {}
388            other => panic!("unexpected: {other:?}"),
389        }
390
391        // Disk* -> unchanged, no bytes
392        let (b1, w1) = disk.squeeze(
393            &CacheEntry::disk_arrow(DataType::Utf8),
394            &states,
395            expression,
396            &squeeze_io,
397        );
398        assert!(matches!(b1, CacheEntry::DiskArrow(DataType::Utf8)) && w1.is_none());
399        let (b2, w2) = disk.squeeze(
400            &CacheEntry::disk_liquid(DataType::Utf8),
401            &states,
402            expression,
403            &squeeze_io,
404        );
405        assert!(matches!(b2, CacheEntry::DiskLiquid(DataType::Utf8)) && w2.is_none());
406    }
407
408    #[test]
409    fn test_squeeze_to_liquid_policy() {
410        let to_liquid = TranscodeSqueezeEvict;
411        let states = LiquidCompressorStates::new();
412        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
413
414        // MemoryArrow -> MemoryLiquid, no bytes
415        let arr = int_array(8);
416        let (new_batch, bytes) = to_liquid.squeeze(
417            &CacheEntry::memory_arrow(arr.clone()),
418            &states,
419            None,
420            &squeeze_io,
421        );
422        assert!(bytes.is_none());
423        match new_batch {
424            CacheEntry::MemoryLiquid(liq) => {
425                assert_eq!(liq.to_arrow_array().as_ref(), arr.as_ref());
426            }
427            other => panic!("unexpected: {other:?}"),
428        }
429        let expression = Some(&CacheExpression::PredicateColumn);
430
431        // MemoryLiquid (strings) -> MemorySqueezedLiquid + bytes
432        let strings = Arc::new(StringArray::from(vec!["x", "y", "x"])) as ArrayRef;
433        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
434        let (new_batch, bytes) = to_liquid.squeeze(
435            &CacheEntry::memory_liquid(liquid),
436            &states,
437            expression,
438            &squeeze_io,
439        );
440        match (new_batch, bytes) {
441            (CacheEntry::MemorySqueezedLiquid(_), Some(b)) => assert!(!b.is_empty()),
442            other => panic!("unexpected: {other:?}"),
443        }
444
445        // MemorySqueezedLiquid -> DiskLiquid, no bytes
446        let strings = Arc::new(StringArray::from(vec!["m", "n"])) as ArrayRef;
447        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
448        let squeezed = liquid.squeeze(squeeze_io.clone(), expression).unwrap().0;
449        let (new_batch, bytes) = to_liquid.squeeze(
450            &CacheEntry::memory_squeezed_liquid(squeezed),
451            &states,
452            expression,
453            &squeeze_io,
454        );
455        match (new_batch, bytes) {
456            (CacheEntry::DiskLiquid(DataType::Utf8), None) => {}
457            other => panic!("unexpected: {other:?}"),
458        }
459
460        // Disk* -> unchanged
461        let (b1, w1) = to_liquid.squeeze(
462            &CacheEntry::disk_arrow(DataType::Utf8),
463            &states,
464            expression,
465            &squeeze_io,
466        );
467        assert!(matches!(b1, CacheEntry::DiskArrow(DataType::Utf8)) && w1.is_none());
468        let (b2, w2) = to_liquid.squeeze(
469            &CacheEntry::disk_liquid(DataType::Utf8),
470            &states,
471            expression,
472            &squeeze_io,
473        );
474        assert!(matches!(b2, CacheEntry::DiskLiquid(DataType::Utf8)) && w2.is_none());
475    }
476
477    #[test]
478    fn transcode_squeeze_struct_falls_back_to_disk_arrow() {
479        let to_liquid = TranscodeSqueezeEvict;
480        let states = LiquidCompressorStates::new();
481        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
482        let struct_arr = struct_array();
483        let (new_batch, bytes) = to_liquid.squeeze(
484            &CacheEntry::memory_arrow(struct_arr.clone()),
485            &states,
486            None,
487            &squeeze_io,
488        );
489        match (new_batch, bytes) {
490            (CacheEntry::DiskArrow(dt), Some(b)) => {
491                assert_eq!(&dt, struct_arr.data_type());
492                assert_eq!(decode_arrow(&b).as_ref(), struct_arr.as_ref());
493            }
494            other => panic!("expected disk arrow fallback, got {other:?}"),
495        }
496    }
497
498    #[test]
499    fn transcode_evict_struct_falls_back_to_disk_arrow() {
500        let to_disk = TranscodeEvict;
501        let states = LiquidCompressorStates::new();
502        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
503        let struct_arr = struct_array();
504        let (new_batch, bytes) = to_disk.squeeze(
505            &CacheEntry::memory_arrow(struct_arr.clone()),
506            &states,
507            None,
508            &squeeze_io,
509        );
510        match (new_batch, bytes) {
511            (CacheEntry::DiskArrow(dt), Some(b)) => {
512                assert_eq!(&dt, struct_arr.data_type());
513                assert_eq!(decode_arrow(&b).as_ref(), struct_arr.as_ref());
514            }
515            other => panic!("expected disk arrow fallback, got {other:?}"),
516        }
517    }
518
519    fn enriched_variant_array(path: &str, data_type: DataType) -> ArrayRef {
520        enriched_variant_array_with_paths(&[(path, data_type)])
521    }
522
523    fn enriched_variant_array_with_paths(entries: &[(&str, DataType)]) -> ArrayRef {
524        let values: ArrayRef = Arc::new(StringArray::from(vec![
525            Some(r#"{"name": "Alice", "age": 30}"#),
526            Some(r#"{"name": "Bob", "age": 25}"#),
527            Some(r#"{"name": "Charlie", "age": 35}"#),
528        ]));
529        let base_variant = json_to_variant(&values).unwrap();
530        let base_arr: ArrayRef = Arc::new(base_variant.inner().clone());
531
532        let mut typed_structs: BTreeMap<String, ArrayRef> = BTreeMap::new();
533
534        for (path, data_type) in entries.iter() {
535            let typed_values = variant_get(
536                &base_arr,
537                GetOptions::new_with_path(VariantPath::from(*path)).with_as_type(Some(Arc::new(
538                    Field::new("typed_value", data_type.clone(), true),
539                ))),
540            )
541            .unwrap();
542
543            typed_structs
544                .entry(path.to_string())
545                .or_insert(Arc::new(StructArray::new(
546                    Fields::from(vec![Arc::new(Field::new(
547                        "typed_value",
548                        data_type.clone(),
549                        true,
550                    ))]),
551                    vec![typed_values.clone()],
552                    None,
553                )));
554        }
555
556        let mut typed_fields: Vec<Arc<Field>> = Vec::new();
557        let mut typed_columns: Vec<ArrayRef> = Vec::new();
558        for (name, tree) in typed_structs {
559            typed_fields.push(Arc::new(Field::new(
560                name.as_str(),
561                tree.data_type().clone(),
562                true,
563            )));
564            typed_columns.push(tree.clone());
565        }
566
567        let typed_struct = Arc::new(StructArray::new(
568            Fields::from(typed_fields),
569            typed_columns,
570            base_variant.inner().nulls().cloned(),
571        ));
572
573        let inner = base_variant.inner();
574        use arrow::array::BinaryViewArray;
575        Arc::new(StructArray::new(
576            Fields::from(vec![
577                Arc::new(Field::new("metadata", DataType::BinaryView, false)),
578                Arc::new(Field::new("value", DataType::BinaryView, true)),
579                Arc::new(Field::new(
580                    "typed_value",
581                    typed_struct.data_type().clone(),
582                    true,
583                )),
584            ]),
585            vec![
586                inner
587                    .column_by_name("metadata")
588                    .cloned()
589                    .unwrap_or_else(|| Arc::new(base_variant.metadata_field().clone()) as ArrayRef),
590                inner.column_by_name("value").cloned().unwrap_or_else(|| {
591                    Arc::new(BinaryViewArray::from(vec![None::<&[u8]>; inner.len()])) as ArrayRef
592                }),
593                typed_struct as ArrayRef,
594            ],
595            inner.nulls().cloned(),
596        )) as ArrayRef
597    }
598
599    fn assert_variant_squeezed(
600        squeezed: &LiquidSqueezedArrayRef,
601        expected_path: &str,
602        bytes: &Bytes,
603    ) {
604        use futures::executor::block_on;
605
606        assert!(!bytes.is_empty());
607        assert_eq!(squeezed.disk_backing(), SqueezedBacking::Arrow);
608        let struct_squeezed = squeezed
609            .as_any()
610            .downcast_ref::<VariantStructSqueezedArray>()
611            .expect("squeezed variant struct");
612        let arrow_array = block_on(struct_squeezed.to_arrow_array());
613        let struct_array = arrow_array
614            .as_any()
615            .downcast_ref::<StructArray>()
616            .expect("variant struct");
617        let value_column = struct_array
618            .column_by_name("value")
619            .expect("value column present");
620        assert_eq!(value_column.len(), value_column.null_count());
621        let typed_struct = struct_array
622            .column_by_name("typed_value")
623            .expect("typed_value column")
624            .as_any()
625            .downcast_ref::<StructArray>()
626            .expect("typed struct");
627        assert!(
628            extract_typed_values_for_path(typed_struct, expected_path).is_some(),
629            "typed path {expected_path} missing from squeezed variant"
630        );
631    }
632
633    #[test]
634    fn test_variant_squeeze_with_hint() {
635        let policy = TranscodeSqueezeEvict;
636        let states = LiquidCompressorStates::new();
637        let variant_arr = enriched_variant_array("name", DataType::Utf8);
638        let hint = CacheExpression::variant_get("name", DataType::Utf8);
639        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
640
641        let (new_batch, bytes) = policy.squeeze(
642            &CacheEntry::memory_arrow(variant_arr),
643            &states,
644            Some(&hint),
645            &squeeze_io,
646        );
647
648        match (new_batch, bytes) {
649            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
650                assert_variant_squeezed(&squeezed, "name", &b);
651            }
652            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
653        }
654    }
655
656    #[test]
657    fn test_variant_squeeze_with_int64_path() {
658        let policy = TranscodeSqueezeEvict;
659        let states = LiquidCompressorStates::new();
660        let variant_arr = enriched_variant_array("age", DataType::Int64);
661        let hint = CacheExpression::variant_get("age", DataType::Int64);
662        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
663
664        let (new_batch, bytes) = policy.squeeze(
665            &CacheEntry::memory_arrow(variant_arr),
666            &states,
667            Some(&hint),
668            &squeeze_io,
669        );
670
671        match (new_batch, bytes) {
672            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
673                assert_variant_squeezed(&squeezed, "age", &b);
674            }
675            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
676        }
677    }
678
679    #[test]
680    fn test_variant_squeeze_with_multiple_paths_preserves_all_fields() {
681        let policy = TranscodeSqueezeEvict;
682        let states = LiquidCompressorStates::new();
683        let variant_arr = enriched_variant_array_with_paths(&[
684            ("name", DataType::Utf8),
685            ("age", DataType::Int64),
686        ]);
687        let hint = CacheExpression::variant_get("name", DataType::Utf8);
688        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
689
690        let (new_batch, bytes) = policy.squeeze(
691            &CacheEntry::memory_arrow(variant_arr),
692            &states,
693            Some(&hint),
694            &squeeze_io,
695        );
696
697        match (new_batch, bytes) {
698            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
699                assert!(!b.is_empty());
700                let struct_squeezed = squeezed
701                    .as_any()
702                    .downcast_ref::<VariantStructSqueezedArray>()
703                    .unwrap();
704                let arrow_array = futures::executor::block_on(struct_squeezed.to_arrow_array());
705                let struct_array = arrow_array.as_any().downcast_ref::<StructArray>().unwrap();
706                let typed_value = struct_array
707                    .column_by_name("typed_value")
708                    .unwrap()
709                    .as_any()
710                    .downcast_ref::<StructArray>()
711                    .unwrap();
712                assert!(typed_value.column_by_name("name").is_some());
713                assert!(typed_value.column_by_name("age").is_none());
714            }
715            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
716        }
717    }
718
719    #[test]
720    fn test_variant_squeeze_without_hint() {
721        let policy = TranscodeSqueezeEvict;
722        let states = LiquidCompressorStates::new();
723        let variant_arr = enriched_variant_array("name", DataType::Utf8);
724        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
725
726        let (new_batch, bytes) = policy.squeeze(
727            &CacheEntry::memory_arrow(variant_arr),
728            &states,
729            None,
730            &squeeze_io,
731        );
732
733        match (new_batch, bytes) {
734            (CacheEntry::DiskArrow(_), Some(b)) => assert!(!b.is_empty()),
735            (CacheEntry::MemoryLiquid(_), None) => {}
736            other => panic!("expected DiskArrow with bytes or MemoryLiquid, got {other:?}"),
737        }
738    }
739
740    #[test]
741    fn test_variant_squeeze_skips_when_path_missing() {
742        let policy = TranscodeSqueezeEvict;
743        let states = LiquidCompressorStates::new();
744        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
745        let variant_arr = enriched_variant_array("name", DataType::Utf8);
746        let hint = CacheExpression::variant_get("age", DataType::Int64);
747
748        let (new_batch, bytes) = policy.squeeze(
749            &CacheEntry::memory_arrow(variant_arr.clone()),
750            &states,
751            Some(&hint),
752            &squeeze_io,
753        );
754
755        match (new_batch, bytes) {
756            (CacheEntry::DiskArrow(dt), Some(b)) => {
757                assert_eq!(dt, variant_arr.data_type().clone());
758                assert!(!b.is_empty());
759            }
760            other => panic!("expected DiskArrow fallback when path missing, got {other:?}"),
761        }
762    }
763}