1use std::sync::Arc;
4
5use arrow::array::{Array, ArrayRef, StructArray};
6use arrow_schema::DataType;
7use bytes::Bytes;
8use parquet::variant::VariantPath;
9use parquet_variant_compute::{VariantArray, shred_variant, unshred_variant};
10
11use crate::cache::{
12 CacheExpression, LiquidCompressorStates, VariantRequest, cached_batch::CacheEntry,
13 transcode_liquid_inner, transcode_liquid_inner_with_hint, utils::arrow_to_bytes,
14};
15use crate::liquid_array::{
16 LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, VariantStructSqueezedArray,
17};
18use crate::utils::VariantSchema;
19
20#[derive(Debug, Clone)]
22pub enum SqueezeOutcome {
23 Replace {
25 entry: CacheEntry,
27 bytes_to_write: Option<Bytes>,
29 },
30 Remove,
32}
33
34pub trait SqueezePolicy: std::fmt::Debug + Send + Sync {
36 fn squeeze(
38 &self,
39 entry: &CacheEntry,
40 compressor: &LiquidCompressorStates,
41 squeeze_hint: Option<&CacheExpression>,
42 squeeze_io: &Arc<dyn SqueezeIoHandler>,
43 ) -> SqueezeOutcome;
44}
45
46#[derive(Debug, Default, Clone)]
48pub struct Evict;
49
50impl SqueezePolicy for Evict {
51 fn squeeze(
52 &self,
53 entry: &CacheEntry,
54 _compressor: &LiquidCompressorStates,
55 _squeeze_hint: Option<&CacheExpression>,
56 _squeeze_io: &Arc<dyn SqueezeIoHandler>,
57 ) -> SqueezeOutcome {
58 match entry {
59 CacheEntry::MemoryArrow(array) => {
60 let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
61 SqueezeOutcome::Replace {
62 entry: CacheEntry::disk_arrow(array.data_type().clone(), bytes.len()),
63 bytes_to_write: Some(bytes),
64 }
65 }
66 CacheEntry::MemoryLiquid(liquid_array) => {
67 let disk_data = liquid_array.to_bytes();
68 SqueezeOutcome::Replace {
69 entry: CacheEntry::disk_liquid(
70 liquid_array.original_arrow_data_type(),
71 disk_data.len(),
72 ),
73 bytes_to_write: Some(Bytes::from(disk_data)),
74 }
75 }
76 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
77 let data_type = squeezed_array.original_arrow_data_type();
78 let new_entry = match squeezed_array.disk_backing() {
79 SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
80 SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
81 };
82 SqueezeOutcome::Replace {
83 entry: new_entry,
84 bytes_to_write: None,
85 }
86 }
87 CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => SqueezeOutcome::Remove,
88 }
89 }
90}
91
92#[derive(Debug, Default, Clone)]
94pub struct TranscodeSqueezeEvict;
95
96impl SqueezePolicy for TranscodeSqueezeEvict {
97 fn squeeze(
98 &self,
99 entry: &CacheEntry,
100 compressor: &LiquidCompressorStates,
101 squeeze_hint: Option<&CacheExpression>,
102 squeeze_io: &Arc<dyn SqueezeIoHandler>,
103 ) -> SqueezeOutcome {
104 match entry {
105 CacheEntry::MemoryArrow(array) => {
106 if let Some(requests) =
107 squeeze_hint.and_then(|expression| expression.variant_requests())
108 && let Some((squeezed_array, bytes)) =
109 try_variant_squeeze(array, requests, compressor)
110 {
111 return SqueezeOutcome::Replace {
112 entry: CacheEntry::memory_squeezed_liquid(squeezed_array),
113 bytes_to_write: Some(bytes),
114 };
115 }
116 match transcode_liquid_inner_with_hint(array, compressor, squeeze_hint) {
117 Ok(liquid_array) => SqueezeOutcome::Replace {
118 entry: CacheEntry::memory_liquid(liquid_array),
119 bytes_to_write: None,
120 },
121 Err(_) => {
122 let bytes =
123 arrow_to_bytes(array).expect("failed to convert arrow to bytes");
124 SqueezeOutcome::Replace {
125 entry: CacheEntry::disk_arrow(array.data_type().clone(), bytes.len()),
126 bytes_to_write: Some(bytes),
127 }
128 }
129 }
130 }
131 CacheEntry::MemoryLiquid(liquid_array) => {
132 let (squeezed_array, bytes) =
133 match liquid_array.squeeze(squeeze_io.clone(), squeeze_hint) {
134 Some(result) => result,
135 None => {
136 let bytes = Bytes::from(liquid_array.to_bytes());
137 return SqueezeOutcome::Replace {
138 entry: CacheEntry::disk_liquid(
139 liquid_array.original_arrow_data_type(),
140 bytes.len(),
141 ),
142 bytes_to_write: Some(bytes),
143 };
144 }
145 };
146 SqueezeOutcome::Replace {
147 entry: CacheEntry::memory_squeezed_liquid(squeezed_array),
148 bytes_to_write: Some(bytes),
149 }
150 }
151 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
152 let data_type = squeezed_array.original_arrow_data_type();
153 let new_entry = match squeezed_array.disk_backing() {
154 SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
155 SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
156 };
157 SqueezeOutcome::Replace {
158 entry: new_entry,
159 bytes_to_write: None,
160 }
161 }
162 CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => SqueezeOutcome::Remove,
163 }
164 }
165}
166
167#[derive(Debug, Default, Clone)]
169pub struct TranscodeEvict;
170
171impl SqueezePolicy for TranscodeEvict {
172 fn squeeze(
173 &self,
174 entry: &CacheEntry,
175 compressor: &LiquidCompressorStates,
176 _squeeze_hint: Option<&CacheExpression>,
177 _squeeze_io: &Arc<dyn SqueezeIoHandler>,
178 ) -> SqueezeOutcome {
179 match entry {
180 CacheEntry::MemoryArrow(array) => {
181 match transcode_liquid_inner_with_hint(array, compressor, None) {
182 Ok(liquid_array) => SqueezeOutcome::Replace {
183 entry: CacheEntry::memory_liquid(liquid_array),
184 bytes_to_write: None,
185 },
186 Err(_) => {
187 let bytes =
188 arrow_to_bytes(array).expect("failed to convert arrow to bytes");
189 SqueezeOutcome::Replace {
190 entry: CacheEntry::disk_arrow(array.data_type().clone(), bytes.len()),
191 bytes_to_write: Some(bytes),
192 }
193 }
194 }
195 }
196 CacheEntry::MemoryLiquid(liquid_array) => {
197 let bytes = Bytes::from(liquid_array.to_bytes());
198 SqueezeOutcome::Replace {
199 entry: CacheEntry::disk_liquid(
200 liquid_array.original_arrow_data_type(),
201 bytes.len(),
202 ),
203 bytes_to_write: Some(bytes),
204 }
205 }
206 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
207 let data_type = squeezed_array.original_arrow_data_type();
208 let new_entry = match squeezed_array.disk_backing() {
209 SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
210 SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
211 };
212 SqueezeOutcome::Replace {
213 entry: new_entry,
214 bytes_to_write: None,
215 }
216 }
217 CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => SqueezeOutcome::Remove,
218 }
219 }
220}
221
222pub(crate) fn try_variant_squeeze(
223 array: &ArrayRef,
224 requests: &[VariantRequest],
225 compressor: &LiquidCompressorStates,
226) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
227 let struct_array = array.as_any().downcast_ref::<StructArray>()?;
228 let mut variant_array = VariantArray::try_new(struct_array).ok()?;
229 if variant_array.is_empty() {
230 return None;
231 }
232
233 if requests.is_empty() {
234 return None;
235 }
236
237 let mut shredded_array: Option<ArrayRef> = None;
238 if let Some(shredding_type) = build_shredding_schema(struct_array, requests)
239 && let Ok(unshredded) = unshred_variant(&variant_array)
240 && let Ok(shredded) = shred_variant(&unshredded, &shredding_type)
241 {
242 let shredded_struct: ArrayRef = Arc::new(shredded.into_inner());
243 variant_array = VariantArray::try_new(shredded_struct.as_ref()).ok()?;
244 shredded_array = Some(shredded_struct);
245 }
246
247 let typed_root = variant_array.typed_value_field()?;
248 let typed_root = typed_root.as_any().downcast_ref::<StructArray>()?;
249
250 let mut collected = Vec::new();
251 for request in requests {
252 let path = request.path().trim();
253 if path.is_empty() {
254 continue;
255 }
256 let Some(path_struct) = extract_typed_values_for_path(typed_root, path) else {
257 continue;
258 };
259 let path_struct = path_struct.as_any().downcast_ref::<StructArray>()?;
260 let Some(typed_values) = path_struct.column_by_name("typed_value") else {
261 continue;
262 };
263 if typed_values.len() != array.len() {
264 continue;
265 }
266 collected.push((Arc::<str>::from(path.to_string()), typed_values.clone()));
267 }
268
269 if collected.is_empty() {
270 return None;
271 }
272
273 let backing_array = shredded_array.as_ref().unwrap_or(array);
274 let nulls = variant_array.inner().nulls().cloned();
275 let bytes = arrow_to_bytes(backing_array).ok()?;
276 let mut liquid_values = Vec::with_capacity(collected.len());
277 for (path, typed_values) in collected {
278 let Ok(liquid_array) = transcode_liquid_inner(&typed_values, compressor) else {
279 return None;
280 };
281 liquid_values.push((path, liquid_array));
282 }
283 let squeezed = VariantStructSqueezedArray::new(
284 liquid_values,
285 nulls,
286 backing_array.data_type().clone(),
287 bytes.len(),
288 );
289 Some((Arc::new(squeezed) as LiquidSqueezedArrayRef, bytes))
290}
291
292fn build_shredding_schema(
293 variant_struct: &StructArray,
294 requests: &[VariantRequest],
295) -> Option<DataType> {
296 let typed_field = match variant_struct.data_type() {
297 DataType::Struct(fields) => fields
298 .iter()
299 .find(|child| child.name() == "typed_value")
300 .cloned(),
301 _ => None,
302 };
303
304 let mut schema = VariantSchema::new(typed_field.as_deref());
305 for request in requests {
306 let path = request.path().trim();
307 if path.is_empty() {
308 continue;
309 }
310 schema.insert_path(path, request.data_type());
311 }
312 schema.shredding_type()
313}
314
315fn extract_typed_values_for_path(typed_root: &StructArray, path: &str) -> Option<ArrayRef> {
316 let path = VariantPath::try_from(path).ok()?;
317 if path.is_empty() {
318 return None;
319 }
320
321 let mut cursor = typed_root;
322 for (idx, element) in path.iter().enumerate() {
323 let field_name = match element {
324 parquet::variant::VariantPathElement::Field { name } => name.as_ref(),
325 parquet::variant::VariantPathElement::Index { .. } => return None,
326 };
327 let field = cursor.column_by_name(field_name)?;
328 if idx == path.len() - 1 {
329 return Some(field.clone());
330 }
331 let struct_field = field.as_any().downcast_ref::<StructArray>()?;
332 let typed_value = struct_field.column_by_name("typed_value")?;
333 cursor = typed_value.as_any().downcast_ref::<StructArray>()?;
334 }
335
336 None
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342 use crate::cache::cached_batch::CacheEntry;
343 use crate::cache::{CacheExpression, io_context::TestSqueezeIo};
344 use crate::liquid_array::{LiquidSqueezedArray, SqueezedBacking, VariantStructSqueezedArray};
345 use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
346 use arrow_schema::Fields;
347 use arrow_schema::{DataType, Field};
348 use parquet::variant::VariantPath;
349 use parquet_variant_compute::{GetOptions, json_to_variant, variant_get};
350 use std::collections::BTreeMap;
351 use std::sync::Arc;
352
353 fn int_array(n: i32) -> ArrayRef {
354 Arc::new(Int32Array::from_iter_values(0..n))
355 }
356
357 fn decode_arrow(bytes: &Bytes) -> ArrayRef {
358 let cursor = std::io::Cursor::new(bytes.to_vec());
359 let mut reader =
360 arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("arrow stream");
361 let batch = reader
362 .next()
363 .expect("non-empty stream")
364 .expect("read stream");
365 batch.column(0).clone()
366 }
367
368 fn into_replace(outcome: SqueezeOutcome) -> (CacheEntry, Option<Bytes>) {
369 match outcome {
370 SqueezeOutcome::Replace {
371 entry,
372 bytes_to_write,
373 } => (entry, bytes_to_write),
374 SqueezeOutcome::Remove => panic!("expected replacement"),
375 }
376 }
377
378 fn struct_array() -> ArrayRef {
379 let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as ArrayRef;
380 let field = Arc::new(Field::new("value", DataType::Int32, true));
381 Arc::new(StructArray::from(vec![(field, values)]))
382 }
383
/// `Evict` sends every in-memory entry kind to disk and removes disk entries.
#[test]
fn test_squeeze_to_disk_policy() {
    let policy = Evict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

    // In-memory Arrow -> disk Arrow, and the bytes decode back to the input.
    let arr = int_array(8);
    let arrow_entry = CacheEntry::memory_arrow(arr.clone());
    match into_replace(policy.squeeze(&arrow_entry, &states, None, &squeeze_io)) {
        (
            CacheEntry::DiskArrow {
                data_type: dt,
                disk_bytes,
            },
            Some(payload),
        ) => {
            assert_eq!(dt, DataType::Int32);
            assert_eq!(disk_bytes, payload.len());
            let decoded = decode_arrow(&payload);
            assert_eq!(decoded.as_ref(), arr.as_ref());
        }
        other => panic!("unexpected: {other:?}"),
    }

    // In-memory Liquid -> disk Liquid with non-empty bytes.
    let strings = Arc::new(StringArray::from(vec!["a", "b", "a"])) as ArrayRef;
    let liquid = transcode_liquid_inner(&strings, &states).unwrap();
    let liquid_entry = CacheEntry::memory_liquid(liquid.clone());
    match into_replace(policy.squeeze(&liquid_entry, &states, None, &squeeze_io)) {
        (CacheEntry::DiskLiquid { disk_bytes, .. }, Some(payload)) => {
            assert_eq!(disk_bytes, payload.len());
            assert!(!payload.is_empty());
        }
        other => panic!("unexpected: {other:?}"),
    }

    // Squeezed Liquid -> disk Liquid, with nothing new to write.
    let expression = Some(&CacheExpression::PredicateColumn);
    let Some((squeezed, _)) = liquid.squeeze(squeeze_io.clone(), expression) else {
        panic!("squeeze should succeed for byte-view");
    };
    let squeezed_entry = CacheEntry::memory_squeezed_liquid(squeezed);
    match into_replace(policy.squeeze(&squeezed_entry, &states, expression, &squeeze_io)) {
        (CacheEntry::DiskLiquid { .. }, None) => {}
        other => panic!("unexpected: {other:?}"),
    }

    // Entries that already live on disk are removed.
    let arrow_on_disk = CacheEntry::disk_arrow(DataType::Utf8, 1);
    let removed_arrow = policy.squeeze(&arrow_on_disk, &states, expression, &squeeze_io);
    assert!(matches!(removed_arrow, SqueezeOutcome::Remove));

    let liquid_on_disk = CacheEntry::disk_liquid(DataType::Utf8, 1);
    let removed_liquid = policy.squeeze(&liquid_on_disk, &states, expression, &squeeze_io);
    assert!(matches!(removed_liquid, SqueezeOutcome::Remove));
}
472
/// `TranscodeSqueezeEvict` walks an entry down one representation at a time.
#[test]
fn test_squeeze_to_liquid_policy() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());

    // In-memory Arrow -> in-memory Liquid, nothing to write.
    let arr = int_array(8);
    let arrow_entry = CacheEntry::memory_arrow(arr.clone());
    let (replacement, payload) =
        into_replace(policy.squeeze(&arrow_entry, &states, None, &squeeze_io));
    assert!(payload.is_none());
    match replacement {
        CacheEntry::MemoryLiquid(liquid) => {
            assert_eq!(liquid.to_arrow_array().as_ref(), arr.as_ref());
        }
        other => panic!("unexpected: {other:?}"),
    }

    let expression = Some(&CacheExpression::PredicateColumn);

    // In-memory Liquid -> squeezed Liquid with bytes to write.
    let strings = Arc::new(StringArray::from(vec!["x", "y", "x"])) as ArrayRef;
    let liquid = transcode_liquid_inner(&strings, &states).unwrap();
    let liquid_entry = CacheEntry::memory_liquid(liquid);
    match into_replace(policy.squeeze(&liquid_entry, &states, expression, &squeeze_io)) {
        (CacheEntry::MemorySqueezedLiquid(_), Some(payload)) => assert!(!payload.is_empty()),
        other => panic!("unexpected: {other:?}"),
    }

    // Squeezed Liquid -> disk Liquid, with nothing new to write.
    let strings = Arc::new(StringArray::from(vec!["m", "n"])) as ArrayRef;
    let liquid = transcode_liquid_inner(&strings, &states).unwrap();
    let squeezed = liquid.squeeze(squeeze_io.clone(), expression).unwrap().0;
    let squeezed_entry = CacheEntry::memory_squeezed_liquid(squeezed);
    match into_replace(policy.squeeze(&squeezed_entry, &states, expression, &squeeze_io)) {
        (
            CacheEntry::DiskLiquid {
                data_type: DataType::Utf8,
                ..
            },
            None,
        ) => {}
        other => panic!("unexpected: {other:?}"),
    }

    // Entries that already live on disk are removed.
    let arrow_on_disk = CacheEntry::disk_arrow(DataType::Utf8, 1);
    let removed_arrow = policy.squeeze(&arrow_on_disk, &states, expression, &squeeze_io);
    assert!(matches!(removed_arrow, SqueezeOutcome::Remove));

    let liquid_on_disk = CacheEntry::disk_liquid(DataType::Utf8, 1);
    let removed_liquid = policy.squeeze(&liquid_on_disk, &states, expression, &squeeze_io);
    assert!(matches!(removed_liquid, SqueezeOutcome::Remove));
}
547
/// A plain struct array squeezed by `TranscodeSqueezeEvict` without a hint is
/// expected to end up as a disk Arrow entry whose bytes round-trip.
#[test]
fn transcode_squeeze_struct_falls_back_to_disk_arrow() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let struct_arr = struct_array();
    let source_entry = CacheEntry::memory_arrow(struct_arr.clone());

    match into_replace(policy.squeeze(&source_entry, &states, None, &squeeze_io)) {
        (
            CacheEntry::DiskArrow {
                data_type: dt,
                disk_bytes,
            },
            Some(payload),
        ) => {
            assert_eq!(&dt, struct_arr.data_type());
            assert_eq!(disk_bytes, payload.len());
            assert_eq!(decode_arrow(&payload).as_ref(), struct_arr.as_ref());
        }
        other => panic!("expected disk arrow fallback, got {other:?}"),
    }
}
575
/// A plain struct array squeezed by `TranscodeEvict` is expected to end up as
/// a disk Arrow entry whose bytes round-trip.
#[test]
fn transcode_evict_struct_falls_back_to_disk_arrow() {
    let policy = TranscodeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let struct_arr = struct_array();
    let source_entry = CacheEntry::memory_arrow(struct_arr.clone());

    match into_replace(policy.squeeze(&source_entry, &states, None, &squeeze_io)) {
        (
            CacheEntry::DiskArrow {
                data_type: dt,
                disk_bytes,
            },
            Some(payload),
        ) => {
            assert_eq!(&dt, struct_arr.data_type());
            assert_eq!(disk_bytes, payload.len());
            assert_eq!(decode_arrow(&payload).as_ref(), struct_arr.as_ref());
        }
        other => panic!("expected disk arrow fallback, got {other:?}"),
    }
}
603
604 fn enriched_variant_array(path: &str, data_type: DataType) -> ArrayRef {
605 enriched_variant_array_with_paths(&[(path, data_type)])
606 }
607
608 fn enriched_variant_array_with_paths(entries: &[(&str, DataType)]) -> ArrayRef {
609 let values: ArrayRef = Arc::new(StringArray::from(vec![
610 Some(r#"{"name": "Alice", "age": 30}"#),
611 Some(r#"{"name": "Bob", "age": 25}"#),
612 Some(r#"{"name": "Charlie", "age": 35}"#),
613 ]));
614 let base_variant = json_to_variant(&values).unwrap();
615 let base_arr: ArrayRef = Arc::new(base_variant.inner().clone());
616
617 let mut typed_structs: BTreeMap<String, ArrayRef> = BTreeMap::new();
618
619 for (path, data_type) in entries.iter() {
620 let typed_values = variant_get(
621 &base_arr,
622 GetOptions::new_with_path(
623 VariantPath::try_from(*path).expect("variant path should parse"),
624 )
625 .with_as_type(Some(Arc::new(Field::new(
626 "typed_value",
627 data_type.clone(),
628 true,
629 )))),
630 )
631 .unwrap();
632
633 typed_structs
634 .entry(path.to_string())
635 .or_insert(Arc::new(StructArray::new(
636 Fields::from(vec![Arc::new(Field::new(
637 "typed_value",
638 data_type.clone(),
639 true,
640 ))]),
641 vec![typed_values.clone()],
642 None,
643 )));
644 }
645
646 let mut typed_fields: Vec<Arc<Field>> = Vec::new();
647 let mut typed_columns: Vec<ArrayRef> = Vec::new();
648 for (name, tree) in typed_structs {
649 typed_fields.push(Arc::new(Field::new(
650 name.as_str(),
651 tree.data_type().clone(),
652 true,
653 )));
654 typed_columns.push(tree.clone());
655 }
656
657 let typed_struct = Arc::new(StructArray::new(
658 Fields::from(typed_fields),
659 typed_columns,
660 base_variant.inner().nulls().cloned(),
661 ));
662
663 let inner = base_variant.inner();
664 use arrow::array::BinaryViewArray;
665 Arc::new(StructArray::new(
666 Fields::from(vec![
667 Arc::new(Field::new("metadata", DataType::BinaryView, false)),
668 Arc::new(Field::new("value", DataType::BinaryView, true)),
669 Arc::new(Field::new(
670 "typed_value",
671 typed_struct.data_type().clone(),
672 true,
673 )),
674 ]),
675 vec![
676 inner
677 .column_by_name("metadata")
678 .cloned()
679 .unwrap_or_else(|| Arc::new(base_variant.metadata_field().clone()) as ArrayRef),
680 inner.column_by_name("value").cloned().unwrap_or_else(|| {
681 Arc::new(BinaryViewArray::from(vec![None::<&[u8]>; inner.len()])) as ArrayRef
682 }),
683 typed_struct as ArrayRef,
684 ],
685 inner.nulls().cloned(),
686 )) as ArrayRef
687 }
688
689 fn assert_variant_squeezed(
690 squeezed: &LiquidSqueezedArrayRef,
691 expected_path: &str,
692 bytes: &Bytes,
693 ) {
694 use futures::executor::block_on;
695
696 assert!(!bytes.is_empty());
697 assert!(matches!(squeezed.disk_backing(), SqueezedBacking::Arrow(_)));
698 let struct_squeezed = squeezed
699 .as_any()
700 .downcast_ref::<VariantStructSqueezedArray>()
701 .expect("squeezed variant struct");
702 let arrow_array = block_on(struct_squeezed.to_arrow_array());
703 let struct_array = arrow_array
704 .as_any()
705 .downcast_ref::<StructArray>()
706 .expect("variant struct");
707 let value_column = struct_array
708 .column_by_name("value")
709 .expect("value column present");
710 assert_eq!(value_column.len(), value_column.null_count());
711 let typed_struct = struct_array
712 .column_by_name("typed_value")
713 .expect("typed_value column")
714 .as_any()
715 .downcast_ref::<StructArray>()
716 .expect("typed struct");
717 assert!(
718 extract_typed_values_for_path(typed_struct, expected_path).is_some(),
719 "typed path {expected_path} missing from squeezed variant"
720 );
721 }
722
/// A `variant_get("name", Utf8)` hint makes the policy produce a squeezed
/// variant entry that exposes the `name` path.
#[test]
fn test_variant_squeeze_with_hint() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let hint = CacheExpression::variant_get("name", DataType::Utf8);
    let source_entry = CacheEntry::memory_arrow(enriched_variant_array("name", DataType::Utf8));

    match into_replace(policy.squeeze(&source_entry, &states, Some(&hint), &squeeze_io)) {
        (CacheEntry::MemorySqueezedLiquid(squeezed), Some(payload)) => {
            assert_variant_squeezed(&squeezed, "name", &payload);
        }
        other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
    }
}
745
/// A `variant_get("age", Int64)` hint makes the policy produce a squeezed
/// variant entry that exposes the `age` path.
#[test]
fn test_variant_squeeze_with_int64_path() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let hint = CacheExpression::variant_get("age", DataType::Int64);
    let source_entry = CacheEntry::memory_arrow(enriched_variant_array("age", DataType::Int64));

    match into_replace(policy.squeeze(&source_entry, &states, Some(&hint), &squeeze_io)) {
        (CacheEntry::MemorySqueezedLiquid(squeezed), Some(payload)) => {
            assert_variant_squeezed(&squeezed, "age", &payload);
        }
        other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
    }
}
768
/// Squeezes a variant array that carries typed columns for both `name` and
/// `age` while hinting only `name`, then checks which typed columns the
/// materialized result exposes.
#[test]
fn test_variant_squeeze_with_multiple_paths_preserves_all_fields() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let hint = CacheExpression::variant_get("name", DataType::Utf8);
    let source_entry = CacheEntry::memory_arrow(enriched_variant_array_with_paths(&[
        ("name", DataType::Utf8),
        ("age", DataType::Int64),
    ]));

    match into_replace(policy.squeeze(&source_entry, &states, Some(&hint), &squeeze_io)) {
        (CacheEntry::MemorySqueezedLiquid(squeezed), Some(payload)) => {
            assert!(!payload.is_empty());

            let variant_squeezed = squeezed
                .as_any()
                .downcast_ref::<VariantStructSqueezedArray>()
                .unwrap();
            let materialized = futures::executor::block_on(variant_squeezed.to_arrow_array());
            let typed_root = materialized
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
                .column_by_name("typed_value")
                .unwrap()
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap();

            // Only the hinted path is expected among the typed columns.
            assert!(typed_root.column_by_name("name").is_some());
            assert!(typed_root.column_by_name("age").is_none());
        }
        other => panic!("expected MemorySqueezedLiquid with bytes, got {other:?}"),
    }
}
808
/// Without a hint no variant squeeze is attempted: the result must be either
/// a disk Arrow entry with bytes or an in-memory Liquid entry without bytes.
#[test]
fn test_variant_squeeze_without_hint() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let source_entry = CacheEntry::memory_arrow(enriched_variant_array("name", DataType::Utf8));

    match into_replace(policy.squeeze(&source_entry, &states, None, &squeeze_io)) {
        (CacheEntry::DiskArrow { disk_bytes, .. }, Some(payload)) => {
            assert_eq!(disk_bytes, payload.len());
            assert!(!payload.is_empty());
        }
        (CacheEntry::MemoryLiquid(_), None) => {}
        other => panic!("expected DiskArrow with bytes or MemoryLiquid, got {other:?}"),
    }
}
832
/// Hinting a path (`age`) that the array has no typed column for must fall
/// back to a disk Arrow entry that keeps the original data type.
#[test]
fn test_variant_squeeze_skips_when_path_missing() {
    let policy = TranscodeSqueezeEvict;
    let states = LiquidCompressorStates::new();
    let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(TestSqueezeIo::default());
    let variant_arr = enriched_variant_array("name", DataType::Utf8);
    let hint = CacheExpression::variant_get("age", DataType::Int64);
    let source_entry = CacheEntry::memory_arrow(variant_arr.clone());

    match into_replace(policy.squeeze(&source_entry, &states, Some(&hint), &squeeze_io)) {
        (
            CacheEntry::DiskArrow {
                data_type: dt,
                disk_bytes,
            },
            Some(payload),
        ) => {
            assert_eq!(dt, variant_arr.data_type().clone());
            assert_eq!(disk_bytes, payload.len());
            assert!(!payload.is_empty());
        }
        other => panic!("expected DiskArrow fallback when path missing, got {other:?}"),
    }
}
863}