Skip to main content

liquid_cache/cache/policies/
squeeze.rs

1//! Squeeze policies for liquid cache.
2
3use std::sync::Arc;
4
5use arrow::array::{Array, ArrayRef, StructArray};
6use arrow_schema::DataType;
7use bytes::Bytes;
8use parquet::variant::VariantPath;
9use parquet_variant_compute::{VariantArray, shred_variant, unshred_variant};
10
11use crate::cache::{
12    CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
13    transcode_liquid_inner, transcode_liquid_inner_with_hint, utils::arrow_to_bytes,
14};
15use crate::liquid_array::{
16    LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, VariantStructSqueezedArray,
17};
18use crate::utils::VariantSchema;
19
/// What to do with an entry that has been selected for squeezing.
///
/// Returned by [`SqueezePolicy::squeeze`].
#[derive(Debug, Clone)]
pub enum SqueezeOutcome {
    /// Replace the cache entry, optionally writing bytes to disk first.
    Replace {
        /// Replacement cache entry.
        entry: CacheEntry,
        /// Bytes that must be written before inserting the replacement.
        ///
        /// `None` when the replacement needs no new data written (the policies in this module
        /// use it when an in-memory Arrow array becomes in-memory Liquid, and when a squeezed
        /// entry is demoted to the disk entry described by its existing disk backing).
        bytes_to_write: Option<Bytes>,
    },
    /// Remove the entry entirely.
    ///
    /// The policies in this module return this for entries that are already on disk.
    Remove,
}
33
/// Policy that chooses the next representation for an entry under memory pressure.
pub trait SqueezePolicy: std::fmt::Debug + Send + Sync {
    /// Squeeze the entry into a cheaper representation, or decide to drop it.
    ///
    /// * `entry` – the cache entry to squeeze.
    /// * `compressor` – compressor states handed to the Liquid transcoder when a policy
    ///   converts Arrow data to Liquid.
    /// * `squeeze_hint` – optional expression describing how the entry is used; a policy may
    ///   use it to pick a representation or ignore it.
    /// * `squeeze_io` – I/O handler that a policy passes to Liquid arrays it squeezes.
    ///
    /// Returns a [`SqueezeOutcome`]: either the replacement entry (plus any bytes that must be
    /// written before it is inserted) or an instruction to remove the entry.
    fn squeeze(
        &self,
        entry: &CacheEntry,
        compressor: &LiquidCompressorStates,
        squeeze_hint: Option<&CacheExpression>,
        squeeze_io: &Arc<dyn SqueezeIoHandler>,
    ) -> SqueezeOutcome;
}
45
46/// Squeeze the entry to disk.
47#[derive(Debug, Default, Clone)]
48pub struct Evict;
49
50impl SqueezePolicy for Evict {
51    fn squeeze(
52        &self,
53        entry: &CacheEntry,
54        _compressor: &LiquidCompressorStates,
55        _squeeze_hint: Option<&CacheExpression>,
56        _squeeze_io: &Arc<dyn SqueezeIoHandler>,
57    ) -> SqueezeOutcome {
58        match entry {
59            CacheEntry::MemoryArrow(array) => {
60                let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
61                SqueezeOutcome::Replace {
62                    entry: CacheEntry::disk_arrow(array.data_type().clone(), bytes.len()),
63                    bytes_to_write: Some(bytes),
64                }
65            }
66            CacheEntry::MemoryLiquid(liquid_array) => {
67                let disk_data = liquid_array.to_bytes();
68                SqueezeOutcome::Replace {
69                    entry: CacheEntry::disk_liquid(
70                        liquid_array.original_arrow_data_type(),
71                        disk_data.len(),
72                    ),
73                    bytes_to_write: Some(Bytes::from(disk_data)),
74                }
75            }
76            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
77                let data_type = squeezed_array.original_arrow_data_type();
78                let new_entry = match squeezed_array.disk_backing() {
79                    SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
80                    SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
81                };
82                SqueezeOutcome::Replace {
83                    entry: new_entry,
84                    bytes_to_write: None,
85                }
86            }
87            CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => SqueezeOutcome::Remove,
88        }
89    }
90}
91
92/// Squeeze the entry to liquid memory.
93#[derive(Debug, Default, Clone)]
94pub struct TranscodeSqueezeEvict;
95
96impl SqueezePolicy for TranscodeSqueezeEvict {
97    fn squeeze(
98        &self,
99        entry: &CacheEntry,
100        compressor: &LiquidCompressorStates,
101        squeeze_hint: Option<&CacheExpression>,
102        squeeze_io: &Arc<dyn SqueezeIoHandler>,
103    ) -> SqueezeOutcome {
104        match entry {
105            CacheEntry::MemoryArrow(array) => {
106                if let Some(requests) =
107                    squeeze_hint.and_then(|expression| expression.variant_requests())
108                    && let Some((squeezed_array, bytes)) =
109                        try_variant_squeeze(array, requests, compressor)
110                {
111                    return SqueezeOutcome::Replace {
112                        entry: CacheEntry::memory_squeezed_liquid(squeezed_array),
113                        bytes_to_write: Some(bytes),
114                    };
115                }
116                match transcode_liquid_inner_with_hint(array, compressor, squeeze_hint) {
117                    Ok(liquid_array) => SqueezeOutcome::Replace {
118                        entry: CacheEntry::memory_liquid(liquid_array),
119                        bytes_to_write: None,
120                    },
121                    Err(_) => {
122                        let bytes =
123                            arrow_to_bytes(array).expect("failed to convert arrow to bytes");
124                        SqueezeOutcome::Replace {
125                            entry: CacheEntry::disk_arrow(array.data_type().clone(), bytes.len()),
126                            bytes_to_write: Some(bytes),
127                        }
128                    }
129                }
130            }
131            CacheEntry::MemoryLiquid(liquid_array) => {
132                let (squeezed_array, bytes) =
133                    match liquid_array.squeeze(squeeze_io.clone(), squeeze_hint) {
134                        Some(result) => result,
135                        None => {
136                            let bytes = Bytes::from(liquid_array.to_bytes());
137                            return SqueezeOutcome::Replace {
138                                entry: CacheEntry::disk_liquid(
139                                    liquid_array.original_arrow_data_type(),
140                                    bytes.len(),
141                                ),
142                                bytes_to_write: Some(bytes),
143                            };
144                        }
145                    };
146                SqueezeOutcome::Replace {
147                    entry: CacheEntry::memory_squeezed_liquid(squeezed_array),
148                    bytes_to_write: Some(bytes),
149                }
150            }
151            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
152                let data_type = squeezed_array.original_arrow_data_type();
153                let new_entry = match squeezed_array.disk_backing() {
154                    SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
155                    SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
156                };
157                SqueezeOutcome::Replace {
158                    entry: new_entry,
159                    bytes_to_write: None,
160                }
161            }
162            CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => SqueezeOutcome::Remove,
163        }
164    }
165}
166
167/// Squeeze the entry to liquid memory, but don't convert to squeezed.
168#[derive(Debug, Default, Clone)]
169pub struct TranscodeEvict;
170
171impl SqueezePolicy for TranscodeEvict {
172    fn squeeze(
173        &self,
174        entry: &CacheEntry,
175        compressor: &LiquidCompressorStates,
176        _squeeze_hint: Option<&CacheExpression>,
177        _squeeze_io: &Arc<dyn SqueezeIoHandler>,
178    ) -> SqueezeOutcome {
179        match entry {
180            CacheEntry::MemoryArrow(array) => {
181                match transcode_liquid_inner_with_hint(array, compressor, None) {
182                    Ok(liquid_array) => SqueezeOutcome::Replace {
183                        entry: CacheEntry::memory_liquid(liquid_array),
184                        bytes_to_write: None,
185                    },
186                    Err(_) => {
187                        let bytes =
188                            arrow_to_bytes(array).expect("failed to convert arrow to bytes");
189                        SqueezeOutcome::Replace {
190                            entry: CacheEntry::disk_arrow(array.data_type().clone(), bytes.len()),
191                            bytes_to_write: Some(bytes),
192                        }
193                    }
194                }
195            }
196            CacheEntry::MemoryLiquid(liquid_array) => {
197                let bytes = Bytes::from(liquid_array.to_bytes());
198                SqueezeOutcome::Replace {
199                    entry: CacheEntry::disk_liquid(
200                        liquid_array.original_arrow_data_type(),
201                        bytes.len(),
202                    ),
203                    bytes_to_write: Some(bytes),
204                }
205            }
206            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
207                let data_type = squeezed_array.original_arrow_data_type();
208                let new_entry = match squeezed_array.disk_backing() {
209                    SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
210                    SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
211                };
212                SqueezeOutcome::Replace {
213                    entry: new_entry,
214                    bytes_to_write: None,
215                }
216            }
217            CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => SqueezeOutcome::Remove,
218        }
219    }
220}
221
222pub(crate) fn try_variant_squeeze(
223    array: &ArrayRef,
224    requests: &[VariantRequest],
225    compressor: &LiquidCompressorStates,
226) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
227    let struct_array = array.as_any().downcast_ref::<StructArray>()?;
228    let mut variant_array = VariantArray::try_new(struct_array).ok()?;
229    if variant_array.is_empty() {
230        return None;
231    }
232
233    if requests.is_empty() {
234        return None;
235    }
236
237    let mut shredded_array: Option<ArrayRef> = None;
238    if let Some(shredding_type) = build_shredding_schema(struct_array, requests)
239        && let Ok(unshredded) = unshred_variant(&variant_array)
240        && let Ok(shredded) = shred_variant(&unshredded, &shredding_type)
241    {
242        let shredded_struct: ArrayRef = Arc::new(shredded.into_inner());
243        variant_array = VariantArray::try_new(shredded_struct.as_ref()).ok()?;
244        shredded_array = Some(shredded_struct);
245    }
246
247    let typed_root = variant_array.typed_value_field()?;
248    let typed_root = typed_root.as_any().downcast_ref::<StructArray>()?;
249
250    let mut collected = Vec::new();
251    for request in requests {
252        let path = request.path().trim();
253        if path.is_empty() {
254            continue;
255        }
256        let Some(path_struct) = extract_typed_values_for_path(typed_root, path) else {
257            continue;
258        };
259        let path_struct = path_struct.as_any().downcast_ref::<StructArray>()?;
260        let Some(typed_values) = path_struct.column_by_name("typed_value") else {
261            continue;
262        };
263        if typed_values.len() != array.len() {
264            continue;
265        }
266        collected.push((Arc::<str>::from(path.to_string()), typed_values.clone()));
267    }
268
269    if collected.is_empty() {
270        return None;
271    }
272
273    let backing_array = shredded_array.as_ref().unwrap_or(array);
274    let nulls = variant_array.inner().nulls().cloned();
275    let bytes = arrow_to_bytes(backing_array).ok()?;
276    let mut liquid_values = Vec::with_capacity(collected.len());
277    for (path, typed_values) in collected {
278        let Ok(liquid_array) = transcode_liquid_inner(&typed_values, compressor) else {
279            return None;
280        };
281        liquid_values.push((path, liquid_array));
282    }
283    let squeezed = VariantStructSqueezedArray::new(
284        liquid_values,
285        nulls,
286        backing_array.data_type().clone(),
287        bytes.len(),
288    );
289    Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, bytes))
290}
291
292fn build_shredding_schema(
293    variant_struct: &StructArray,
294    requests: &[VariantRequest],
295) -> Option<DataType> {
296    let typed_field = match variant_struct.data_type() {
297        DataType::Struct(fields) => fields
298            .iter()
299            .find(|child| child.name() == "typed_value")
300            .cloned(),
301        _ => None,
302    };
303
304    let mut schema = VariantSchema::new(typed_field.as_deref());
305    for request in requests {
306        let path = request.path().trim();
307        if path.is_empty() {
308            continue;
309        }
310        schema.insert_path(path, request.data_type());
311    }
312    schema.shredding_type()
313}
314
315fn extract_typed_values_for_path(typed_root: &StructArray, path: &str) -> Option<ArrayRef> {
316    let path = VariantPath::try_from(path).ok()?;
317    if path.is_empty() {
318        return None;
319    }
320
321    let mut cursor = typed_root;
322    for (idx, element) in path.iter().enumerate() {
323        let field_name = match element {
324            parquet::variant::VariantPathElement::Field { name } => name.as_ref(),
325            parquet::variant::VariantPathElement::Index { .. } => return None,
326        };
327        let field = cursor.column_by_name(field_name)?;
328        if idx == path.len() - 1 {
329            return Some(field.clone());
330        }
331        let struct_field = field.as_any().downcast_ref::<StructArray>()?;
332        let typed_value = struct_field.column_by_name("typed_value")?;
333        cursor = typed_value.as_any().downcast_ref::<StructArray>()?;
334    }
335
336    None
337}
338
// Unit tests for the squeeze policies and the variant squeeze helpers in this module.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::cached_batch::CacheEntry;
    use crate::cache::{CacheExpression, io_context::TestSqueezeIo};
    use crate::liquid_array::{LiquidSqueezedArray, SqueezedBacking, VariantStructSqueezedArray};
    use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
    use arrow_schema::Fields;
    use arrow_schema::{DataType, Field};
    use parquet::variant::VariantPath;
    use parquet_variant_compute::{GetOptions, json_to_variant, variant_get};
    use std::collections::BTreeMap;
    use std::sync::Arc;

    /// Returns an `Int32Array` holding the values `0..n`.
    fn int_array(n: i32) -> ArrayRef {
        Arc::new(Int32Array::from_iter_values(0..n))
    }

    /// Decodes Arrow IPC stream bytes and returns the first column of the first batch.
    fn decode_arrow(bytes: &Bytes) -> ArrayRef {
        let cursor = std::io::Cursor::new(bytes.to_vec());
        let mut reader =
            arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("arrow stream");
        let batch = reader
            .next()
            .expect("non-empty stream")
            .expect("read stream");
        batch.column(0).clone()
    }

    /// Unwraps a `SqueezeOutcome::Replace` into `(entry, bytes_to_write)`; panics on `Remove`.
    fn into_replace(outcome: SqueezeOutcome) -> (CacheEntry, Option<Bytes>) {
        match outcome {
            SqueezeOutcome::Replace {
                entry,
                bytes_to_write,
            } => (entry, bytes_to_write),
            SqueezeOutcome::Remove => panic!("expected replacement"),
        }
    }

    /// Single-field struct array (`value: Int32`, with one null), used to exercise the
    /// disk-Arrow fallback for arrays that the transcode policies do not keep as Liquid.
    fn struct_array() -> ArrayRef {
        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
        let field = Arc::new(Field::new("value", DataType::Int32, true));
        Arc::new(StructArray::from(vec![(field, values)]))
    }

    #[test]
    fn test_squeeze_to_disk_policy() {
        let disk = Evict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        // MemoryArrow -> DiskArrow + bytes (Arrow IPC)
        let arr = int_array(8);
        let (new_batch, bytes) = into_replace(disk.squeeze(
            &CacheEntry::memory_arrow(arr.clone()),
            &states,
            None,
            &squeeze_io,
        ));
        let data = new_batch;
        match (data, bytes) {
            (
                CacheEntry::DiskArrow {
                    data_type: dt,
                    disk_bytes,
                },
                Some(b),
            ) => {
                assert_eq!(dt, DataType::Int32);
                assert_eq!(disk_bytes, b.len());
                let decoded = decode_arrow(&b);
                assert_eq!(decoded.as_ref(), arr.as_ref());
            }
            other => panic!("unexpected: {other:?}"),
        }

        // MemoryLiquid (strings) -> DiskLiquid + bytes
        let strings = Arc::new(StringArray::from(vec!["a", "b", "a"])) as ArrayRef;
        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
        let (new_batch, bytes) = into_replace(disk.squeeze(
            &CacheEntry::memory_liquid(liquid.clone()),
            &states,
            None,
            &squeeze_io,
        ));
        let data = new_batch;
        match (data, bytes) {
            (CacheEntry::DiskLiquid { disk_bytes, .. }, Some(b)) => {
                assert_eq!(disk_bytes, b.len());
                assert!(!b.is_empty());
            }
            other => panic!("unexpected: {other:?}"),
        }

        let expression = Some(&CacheExpression::PredicateColumn);
        // MemorySqueezedLiquid -> DiskLiquid, no extra bytes
        let squeezed = match liquid.squeeze(squeeze_io.clone(), expression) {
            Some((h, _b)) => h,
            None => panic!("squeeze should succeed for byte-view"),
        };
        let (new_batch, bytes) = into_replace(disk.squeeze(
            &CacheEntry::memory_squeezed_liquid(squeezed),
            &states,
            expression,
            &squeeze_io,
        ));
        let data = new_batch;
        match (data, bytes) {
            (
                CacheEntry::DiskLiquid {
                    data_type: _data_type,
                    ..
                },
                None,
            ) => {}
            other => panic!("unexpected: {other:?}"),
        }

        // Disk* -> remove
        let b1 = disk.squeeze(
            &CacheEntry::disk_arrow(DataType::Utf8, 1),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b1, SqueezeOutcome::Remove));
        let b2 = disk.squeeze(
            &CacheEntry::disk_liquid(DataType::Utf8, 1),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b2, SqueezeOutcome::Remove));
    }

    #[test]
    fn test_squeeze_to_liquid_policy() {
        let to_liquid = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        // MemoryArrow -> MemoryLiquid, no bytes
        let arr = int_array(8);
        let (new_batch, bytes) = into_replace(to_liquid.squeeze(
            &CacheEntry::memory_arrow(arr.clone()),
            &states,
            None,
            &squeeze_io,
        ));
        assert!(bytes.is_none());
        match new_batch {
            CacheEntry::MemoryLiquid(liq) => {
                assert_eq!(liq.to_arrow_array().as_ref(), arr.as_ref());
            }
            other => panic!("unexpected: {other:?}"),
        }
        let expression = Some(&CacheExpression::PredicateColumn);

        // MemoryLiquid (strings) -> MemorySqueezedLiquid + bytes
        let strings = Arc::new(StringArray::from(vec!["x", "y", "x"])) as ArrayRef;
        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
        let (new_batch, bytes) = into_replace(to_liquid.squeeze(
            &CacheEntry::memory_liquid(liquid),
            &states,
            expression,
            &squeeze_io,
        ));
        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(_), Some(b)) => assert!(!b.is_empty()),
            other => panic!("unexpected: {other:?}"),
        }

        // MemorySqueezedLiquid -> DiskLiquid, no bytes
        let strings = Arc::new(StringArray::from(vec!["m", "n"])) as ArrayRef;
        let liquid = transcode_liquid_inner(&strings, &states).unwrap();
        let squeezed = liquid.squeeze(squeeze_io.clone(), expression).unwrap().0;
        let (new_batch, bytes) = into_replace(to_liquid.squeeze(
            &CacheEntry::memory_squeezed_liquid(squeezed),
            &states,
            expression,
            &squeeze_io,
        ));
        match (new_batch, bytes) {
            (
                CacheEntry::DiskLiquid {
                    data_type: DataType::Utf8,
                    ..
                },
                None,
            ) => {}
            other => panic!("unexpected: {other:?}"),
        }

        // Disk* -> remove
        let b1 = to_liquid.squeeze(
            &CacheEntry::disk_arrow(DataType::Utf8, 1),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b1, SqueezeOutcome::Remove));
        let b2 = to_liquid.squeeze(
            &CacheEntry::disk_liquid(DataType::Utf8, 1),
            &states,
            expression,
            &squeeze_io,
        );
        assert!(matches!(b2, SqueezeOutcome::Remove));
    }

    // A plain (non-variant) struct cannot be transcoded, so TranscodeSqueezeEvict must spill
    // it to disk as Arrow IPC bytes that round-trip to the original array.
    #[test]
    fn transcode_squeeze_struct_falls_back_to_disk_arrow() {
        let to_liquid = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let struct_arr = struct_array();
        let (new_batch, bytes) = into_replace(to_liquid.squeeze(
            &CacheEntry::memory_arrow(struct_arr.clone()),
            &states,
            None,
            &squeeze_io,
        ));
        match (new_batch, bytes) {
            (
                CacheEntry::DiskArrow {
                    data_type: dt,
                    disk_bytes,
                },
                Some(b),
            ) => {
                assert_eq!(&dt, struct_arr.data_type());
                assert_eq!(disk_bytes, b.len());
                assert_eq!(decode_arrow(&b).as_ref(), struct_arr.as_ref());
            }
            other => panic!("expected disk arrow fallback, got {other:?}"),
        }
    }

    // Same fallback as above, for the TranscodeEvict policy.
    #[test]
    fn transcode_evict_struct_falls_back_to_disk_arrow() {
        let to_disk = TranscodeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let struct_arr = struct_array();
        let (new_batch, bytes) = into_replace(to_disk.squeeze(
            &CacheEntry::memory_arrow(struct_arr.clone()),
            &states,
            None,
            &squeeze_io,
        ));
        match (new_batch, bytes) {
            (
                CacheEntry::DiskArrow {
                    data_type: dt,
                    disk_bytes,
                },
                Some(b),
            ) => {
                assert_eq!(&dt, struct_arr.data_type());
                assert_eq!(disk_bytes, b.len());
                assert_eq!(decode_arrow(&b).as_ref(), struct_arr.as_ref());
            }
            other => panic!("expected disk arrow fallback, got {other:?}"),
        }
    }

    /// Variant test fixture with a single typed path; see `enriched_variant_array_with_paths`.
    fn enriched_variant_array(path: &str, data_type: DataType) -> ArrayRef {
        enriched_variant_array_with_paths(&[(path, data_type)])
    }

    /// Builds a three-row variant struct (`metadata`, `value`, `typed_value`) from JSON objects
    /// with `name` and `age` fields.
    ///
    /// For each `(path, data_type)` entry, the values at `path` are extracted with `variant_get`
    /// as `data_type` and stored under `typed_value.<path>.typed_value`. The `metadata` and
    /// `value` columns are taken from the JSON-derived variant.
    fn enriched_variant_array_with_paths(entries: &[(&str, DataType)]) -> ArrayRef {
        let values: ArrayRef = Arc::new(StringArray::from(vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            Some(r#"{"name": "Bob", "age": 25}"#),
            Some(r#"{"name": "Charlie", "age": 35}"#),
        ]));
        let base_variant = json_to_variant(&values).unwrap();
        let base_arr: ArrayRef = Arc::new(base_variant.inner().clone());

        // Keyed by path; BTreeMap keeps the typed_value fields in a deterministic order.
        let mut typed_structs: BTreeMap<String, ArrayRef> = BTreeMap::new();

        for (path, data_type) in entries.iter() {
            let typed_values = variant_get(
                &base_arr,
                GetOptions::new_with_path(
                    VariantPath::try_from(*path).expect("variant path should parse"),
                )
                .with_as_type(Some(Arc::new(Field::new(
                    "typed_value",
                    data_type.clone(),
                    true,
                )))),
            )
            .unwrap();

            typed_structs
                .entry(path.to_string())
                .or_insert(Arc::new(StructArray::new(
                    Fields::from(vec![Arc::new(Field::new(
                        "typed_value",
                        data_type.clone(),
                        true,
                    ))]),
                    vec![typed_values.clone()],
                    None,
                )));
        }

        let mut typed_fields: Vec<Arc<Field>> = Vec::new();
        let mut typed_columns: Vec<ArrayRef> = Vec::new();
        for (name, tree) in typed_structs {
            typed_fields.push(Arc::new(Field::new(
                name.as_str(),
                tree.data_type().clone(),
                true,
            )));
            typed_columns.push(tree.clone());
        }

        let typed_struct = Arc::new(StructArray::new(
            Fields::from(typed_fields),
            typed_columns,
            base_variant.inner().nulls().cloned(),
        ));

        let inner = base_variant.inner();
        use arrow::array::BinaryViewArray;
        Arc::new(StructArray::new(
            Fields::from(vec![
                Arc::new(Field::new("metadata", DataType::BinaryView, false)),
                Arc::new(Field::new("value", DataType::BinaryView, true)),
                Arc::new(Field::new(
                    "typed_value",
                    typed_struct.data_type().clone(),
                    true,
                )),
            ]),
            vec![
                inner
                    .column_by_name("metadata")
                    .cloned()
                    .unwrap_or_else(|| Arc::new(base_variant.metadata_field().clone()) as ArrayRef),
                inner.column_by_name("value").cloned().unwrap_or_else(|| {
                    Arc::new(BinaryViewArray::from(vec![None::<&[u8]>; inner.len()])) as ArrayRef
                }),
                typed_struct as ArrayRef,
            ],
            inner.nulls().cloned(),
        )) as ArrayRef
    }

    /// Asserts the properties a variant squeeze result should have:
    /// - the backing bytes are non-empty and of Arrow kind;
    /// - the squeezed array is a `VariantStructSqueezedArray`;
    /// - its reconstructed Arrow form has an all-null `value` column;
    /// - `expected_path` is present under `typed_value`.
    fn assert_variant_squeezed(
        squeezed: &LiquidSqueezedArrayRef,
        expected_path: &str,
        bytes: &Bytes,
    ) {
        use futures::executor::block_on;

        assert!(!bytes.is_empty());
        assert!(matches!(squeezed.disk_backing(), SqueezedBacking::Arrow(_)));
        let struct_squeezed = squeezed
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()
            .expect("squeezed variant struct");
        let arrow_array = block_on(struct_squeezed.to_arrow_array());
        let struct_array = arrow_array
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("variant struct");
        let value_column = struct_array
            .column_by_name("value")
            .expect("value column present");
        assert_eq!(value_column.len(), value_column.null_count());
        let typed_struct = struct_array
            .column_by_name("typed_value")
            .expect("typed_value column")
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("typed struct");
        assert!(
            extract_typed_values_for_path(typed_struct, expected_path).is_some(),
            "typed path {expected_path} missing from squeezed variant"
        );
    }

    #[test]
    fn test_variant_squeeze_with_hint() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array("name", DataType::Utf8);
        let hint = CacheExpression::variant_get("name", DataType::Utf8);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = into_replace(policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            Some(&hint),
            &squeeze_io,
        ));

        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
                assert_variant_squeezed(&squeezed, "name", &b);
            }
            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
        }
    }

    #[test]
    fn test_variant_squeeze_with_int64_path() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array("age", DataType::Int64);
        let hint = CacheExpression::variant_get("age", DataType::Int64);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = into_replace(policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            Some(&hint),
            &squeeze_io,
        ));

        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
                assert_variant_squeezed(&squeezed, "age", &b);
            }
            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
        }
    }

    // NOTE(review): the input shreds both `name` and `age`, but only `name` is hinted. The
    // assertions expect the reconstructed in-memory `typed_value` to contain `name` and NOT
    // `age`, which the test name ("preserves_all_fields") seems to contradict. Confirm the
    // intended behavior and rename accordingly.
    #[test]
    fn test_variant_squeeze_with_multiple_paths_preserves_all_fields() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array_with_paths(&[
            ("name", DataType::Utf8),
            ("age", DataType::Int64),
        ]);
        let hint = CacheExpression::variant_get("name", DataType::Utf8);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = into_replace(policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            Some(&hint),
            &squeeze_io,
        ));

        match (new_batch, bytes) {
            (CacheEntry::MemorySqueezedLiquid(squeezed), Some(b)) => {
                assert!(!b.is_empty());
                let struct_squeezed = squeezed
                    .as_any()
                    .downcast_ref::<VariantStructSqueezedArray>()
                    .unwrap();
                let arrow_array = futures::executor::block_on(struct_squeezed.to_arrow_array());
                let struct_array = arrow_array.as_any().downcast_ref::<StructArray>().unwrap();
                let typed_value = struct_array
                    .column_by_name("typed_value")
                    .unwrap()
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .unwrap();
                assert!(typed_value.column_by_name("name").is_some());
                assert!(typed_value.column_by_name("age").is_none());
            }
            other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
        }
    }

    // Without a hint there are no variant requests, so the variant path is not taken. Either
    // the generic transcoder handles the array (MemoryLiquid) or it spills to DiskArrow.
    #[test]
    fn test_variant_squeeze_without_hint() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let variant_arr = enriched_variant_array("name", DataType::Utf8);
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

        let (new_batch, bytes) = into_replace(policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr),
            &states,
            None,
            &squeeze_io,
        ));

        match (new_batch, bytes) {
            (CacheEntry::DiskArrow { disk_bytes, .. }, Some(b)) => {
                assert_eq!(disk_bytes, b.len());
                assert!(!b.is_empty());
            }
            (CacheEntry::MemoryLiquid(_), None) => {}
            other => panic!("expected DiskArrow with bytes or MemoryLiquid, got {other:?}"),
        }
    }

    // The hint asks for `age`, but the fixture only shreds `name`. The expected outcome is the
    // DiskArrow fallback with the original variant data type.
    #[test]
    fn test_variant_squeeze_skips_when_path_missing() {
        let policy = TranscodeSqueezeEvict;
        let states = LiquidCompressorStates::new();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
        let variant_arr = enriched_variant_array("name", DataType::Utf8);
        let hint = CacheExpression::variant_get("age", DataType::Int64);

        let (new_batch, bytes) = into_replace(policy.squeeze(
            &CacheEntry::memory_arrow(variant_arr.clone()),
            &states,
            Some(&hint),
            &squeeze_io,
        ));

        match (new_batch, bytes) {
            (
                CacheEntry::DiskArrow {
                    data_type: dt,
                    disk_bytes,
                },
                Some(b),
            ) => {
                assert_eq!(dt, variant_arr.data_type().clone());
                assert_eq!(disk_bytes, b.len());
                assert!(!b.is_empty());
            }
            other => panic!("expected DiskArrow fallback when path missing, got {other:?}"),
        }
    }
}