1use arrow::array::cast::AsArray;
2use arrow::array::{ArrayRef, BooleanArray};
3use arrow::buffer::BooleanBuffer;
4use arrow::record_batch::RecordBatch;
5use arrow_schema::{Field, Schema};
6use bytes::Bytes;
7use futures::StreamExt;
8
9use super::{
10 budget::BudgetAccounting,
11 builders::{EvaluatePredicate, Get, Insert},
12 cached_batch::{CacheEntry, CachedBatchType},
13 io_context::{EntryMetadata, entry_id_to_key},
14 observer::{CacheTracer, InternalEvent, Observer},
15 policies::{CachePolicy, HydrationPolicy, HydrationRequest, MaterializedEntry},
16 utils::CacheConfig,
17};
18use crate::cache::DefaultSqueezeIo;
19use crate::cache::policies::{SqueezeOutcome, SqueezePolicy};
20use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
21use crate::cache::{CacheExpression, LiquidExpr, index::ArtIndex, utils::EntryID};
22use crate::cache::{CacheFull, CacheStats, EventTrace};
23use crate::liquid_array::{
24 LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, SqueezedDate32Array,
25 VariantStructSqueezedArray,
26};
27use crate::sync::Arc;
28
29#[derive(Debug)]
52pub struct LiquidCache {
53 index: ArtIndex,
54 config: CacheConfig,
55 budget: BudgetAccounting,
56 cache_policy: Box<dyn CachePolicy>,
57 hydration_policy: Box<dyn HydrationPolicy>,
58 squeeze_policy: Box<dyn SqueezePolicy>,
59 observer: Arc<Observer>,
60 metadata: Arc<dyn EntryMetadata>,
61 store: t4::Store,
62 squeeze_victims_concurrently: bool,
63}
64
65impl LiquidCache {
67 pub fn stats(&self) -> CacheStats {
69 let total_entries = self.index.entry_count();
71
72 let mut memory_arrow_entries = 0usize;
73 let mut memory_liquid_entries = 0usize;
74 let mut memory_squeezed_liquid_entries = 0usize;
75 let mut disk_liquid_entries = 0usize;
76 let mut disk_arrow_entries = 0usize;
77
78 let mut memory_arrow_bytes = 0usize;
79 let mut memory_liquid_bytes = 0usize;
80 let mut memory_squeezed_liquid_bytes = 0usize;
81
82 self.index.for_each(|_, batch| match batch {
83 CacheEntry::MemoryArrow(array) => {
84 memory_arrow_entries += 1;
85 memory_arrow_bytes += array.get_array_memory_size();
86 }
87 CacheEntry::MemoryLiquid(array) => {
88 memory_liquid_entries += 1;
89 memory_liquid_bytes += array.get_array_memory_size();
90 }
91 CacheEntry::MemorySqueezedLiquid(array) => {
92 memory_squeezed_liquid_entries += 1;
93 memory_squeezed_liquid_bytes += array.get_array_memory_size();
94 }
95 CacheEntry::DiskLiquid { .. } => disk_liquid_entries += 1,
96 CacheEntry::DiskArrow { .. } => disk_arrow_entries += 1,
97 });
98
99 let memory_usage_bytes = self.budget.memory_usage_bytes();
100 let disk_usage_bytes = self.budget.disk_usage_bytes();
101 let runtime = self.observer.runtime_snapshot();
102
103 CacheStats {
104 total_entries,
105 memory_arrow_entries,
106 memory_liquid_entries,
107 memory_squeezed_liquid_entries,
108 disk_liquid_entries,
109 disk_arrow_entries,
110 memory_arrow_bytes,
111 memory_liquid_bytes,
112 memory_squeezed_liquid_bytes,
113 memory_usage_bytes,
114 disk_usage_bytes,
115 max_memory_bytes: self.config.max_memory_bytes(),
116 max_disk_bytes: self.config.max_disk_bytes(),
117 runtime,
118 }
119 }
120
121 pub fn insert<'a>(
123 self: &'a Arc<Self>,
124 entry_id: EntryID,
125 batch_to_cache: ArrayRef,
126 ) -> Insert<'a> {
127 Insert::new(self, entry_id, batch_to_cache)
128 }
129
130 pub fn get<'a>(&'a self, entry_id: &'a EntryID) -> Get<'a> {
132 Get::new(self, entry_id)
133 }
134
135 pub fn eval_predicate<'a>(
137 &'a self,
138 entry_id: &'a EntryID,
139 predicate: &'a LiquidExpr,
140 ) -> EvaluatePredicate<'a> {
141 EvaluatePredicate::new(self, entry_id, predicate)
142 }
143
144 pub async fn try_read_liquid(
147 &self,
148 entry_id: &EntryID,
149 ) -> Option<crate::liquid_array::LiquidArrayRef> {
150 self.observer.on_try_read_liquid();
151 self.trace(InternalEvent::TryReadLiquid { entry: *entry_id });
152 let batch = self.index.get(entry_id)?;
153 self.cache_policy
154 .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
155
156 match batch.as_ref() {
157 CacheEntry::MemoryLiquid(array) => Some(array.clone()),
158 entry @ CacheEntry::DiskLiquid { .. } => {
159 let liquid = self.read_disk_liquid_array(entry_id).await;
160 self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
161 .await;
162 Some(liquid)
163 }
164 CacheEntry::MemorySqueezedLiquid(array) => match array.disk_backing() {
165 SqueezedBacking::Liquid(_) => {
166 let liquid = self.read_disk_liquid_array(entry_id).await;
167 Some(liquid)
168 }
169 SqueezedBacking::Arrow(_) => None,
170 },
171 CacheEntry::DiskArrow { .. } | CacheEntry::MemoryArrow(_) => None,
172 }
173 }
174
175 pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CacheEntry)) {
179 self.index.for_each(&mut f);
180 }
181
182 pub fn reset(&self) {
184 self.index.reset();
185 self.budget.reset_usage();
186 }
187
188 pub fn is_cached(&self, entry_id: &EntryID) -> bool {
190 self.index.is_cached(entry_id)
191 }
192
193 pub fn config(&self) -> &CacheConfig {
195 &self.config
196 }
197
198 pub fn budget(&self) -> &BudgetAccounting {
200 &self.budget
201 }
202
203 pub fn tracer(&self) -> &CacheTracer {
205 self.observer.cache_tracer()
206 }
207
208 pub fn observer(&self) -> &Observer {
210 &self.observer
211 }
212
213 pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
215 self.metadata.get_compressor(entry_id)
216 }
217
218 pub fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
220 self.metadata.add_squeeze_hint(entry_id, expression);
221 }
222
223 pub async fn flush_all_to_disk(&self) -> Result<(), CacheFull> {
225 let mut entires = Vec::new();
226 self.for_each_entry(|entry_id, batch| {
227 entires.push((*entry_id, batch.clone()));
228 });
229 for (entry_id, batch) in entires {
230 match &batch {
231 CacheEntry::MemoryArrow(array) => {
232 let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
233 let disk_bytes = bytes.len();
234 match self.write_batch_to_disk(entry_id, &batch, bytes).await {
235 Ok(()) => {
236 self.try_insert(
237 entry_id,
238 CacheEntry::disk_arrow(array.data_type().clone(), disk_bytes),
239 )
240 .expect("failed to insert disk arrow entry");
241 }
242 Err(CacheFull) => self.drop_memory_entry(entry_id, &batch),
243 }
244 }
245 CacheEntry::MemoryLiquid(liquid_array) => {
246 let liquid_bytes = liquid_array.to_bytes();
247 let disk_bytes = liquid_bytes.len();
248 match self
249 .write_batch_to_disk(entry_id, &batch, Bytes::from(liquid_bytes))
250 .await
251 {
252 Ok(()) => {
253 self.try_insert(
254 entry_id,
255 CacheEntry::disk_liquid(
256 liquid_array.original_arrow_data_type(),
257 disk_bytes,
258 ),
259 )
260 .expect("failed to insert disk liquid entry");
261 }
262 Err(CacheFull) => self.drop_memory_entry(entry_id, &batch),
263 }
264 }
265 CacheEntry::MemorySqueezedLiquid(array) => {
266 let disk_entry = Self::disk_entry_from_squeezed(array);
268 self.try_insert(entry_id, disk_entry)
269 .expect("failed to insert disk entry");
270 }
271 CacheEntry::DiskArrow { .. } | CacheEntry::DiskLiquid { .. } => {
272 }
274 }
275 }
276 Ok(())
277 }
278}
279
280impl LiquidCache {
281 async fn write_in_memory_batch_to_disk(
283 &self,
284 entry_id: EntryID,
285 batch: CacheEntry,
286 ) -> Result<CacheEntry, CacheFull> {
287 match &batch {
288 batch @ CacheEntry::MemoryArrow(_) => {
289 let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
290 self.store.clone(),
291 entry_id,
292 self.observer.clone(),
293 ));
294 let outcome = self.squeeze_policy.squeeze(
295 batch,
296 self.metadata.get_compressor(&entry_id).as_ref(),
297 None,
298 &squeeze_io,
299 );
300 let SqueezeOutcome::Replace {
301 entry: new_batch,
302 bytes_to_write,
303 } = outcome
304 else {
305 unreachable!("memory arrow squeeze cannot remove entry");
306 };
307 if let Some(bytes_to_write) = bytes_to_write {
308 self.write_batch_to_disk(entry_id, &new_batch, bytes_to_write)
309 .await?;
310 }
311 Ok(new_batch)
312 }
313 CacheEntry::MemoryLiquid(liquid_array) => {
314 let liquid_bytes = Bytes::from(liquid_array.to_bytes());
315 let disk_bytes = liquid_bytes.len();
316 self.write_batch_to_disk(entry_id, &batch, liquid_bytes)
317 .await?;
318 Ok(CacheEntry::disk_liquid(
319 liquid_array.original_arrow_data_type(),
320 disk_bytes,
321 ))
322 }
323 CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
324 let data_type = squeezed_array.original_arrow_data_type();
326 let entry = match squeezed_array.disk_backing() {
327 SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
328 SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
329 };
330 Ok(entry)
331 }
332 CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => {
333 unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
334 }
335 }
336 }
337
338 pub(crate) async fn insert_inner(
340 &self,
341 entry_id: EntryID,
342 mut batch_to_cache: CacheEntry,
343 ) -> Result<(), CacheFull> {
344 loop {
345 let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
346 return Ok(());
347 };
348 self.trace(InternalEvent::InsertFailed {
349 entry: entry_id,
350 kind: CachedBatchType::from(¬_inserted),
351 });
352
353 let victims = self.cache_policy.find_memory_victim(8);
354 if victims.is_empty() {
355 let on_disk_batch = self
359 .write_in_memory_batch_to_disk(entry_id, not_inserted)
360 .await?;
361 batch_to_cache = on_disk_batch;
362 continue;
363 }
364 self.squeeze_victims(victims).await?;
365
366 batch_to_cache = not_inserted;
367 crate::utils::yield_now_if_shuttle();
368 }
369 }
370
371 #[allow(clippy::too_many_arguments)]
373 pub(crate) fn new(
374 batch_size: usize,
375 max_memory_bytes: usize,
376 max_disk_bytes: usize,
377 squeeze_policy: Box<dyn SqueezePolicy>,
378 cache_policy: Box<dyn CachePolicy>,
379 hydration_policy: Box<dyn HydrationPolicy>,
380 metadata: Arc<dyn EntryMetadata>,
381 store: t4::Store,
382 squeeze_victims_concurrently: bool,
383 ) -> Self {
384 let config = CacheConfig::new(batch_size, max_memory_bytes, max_disk_bytes);
385 let observer = Arc::new(Observer::new());
386 Self {
387 index: ArtIndex::new(),
388 budget: BudgetAccounting::new(
389 config.max_memory_bytes(),
390 config.max_disk_bytes(),
391 observer.clone(),
392 ),
393 config,
394 cache_policy,
395 hydration_policy,
396 squeeze_policy,
397 observer,
398 metadata,
399 store,
400 squeeze_victims_concurrently,
401 }
402 }
403
404 fn try_insert(&self, entry_id: EntryID, to_insert: CacheEntry) -> Result<(), CacheEntry> {
405 let new_memory_size = to_insert.memory_usage_bytes();
406 let cached_batch_type = if let Some(entry) = self.index.get(&entry_id) {
407 let old_memory_size = entry.memory_usage_bytes();
408 if self
409 .budget
410 .try_update_memory_usage(old_memory_size, new_memory_size)
411 .is_err()
412 {
413 return Err(to_insert);
414 }
415 let batch_type = CachedBatchType::from(&to_insert);
416 self.index.insert(&entry_id, to_insert);
417 batch_type
418 } else {
419 if self.budget.try_reserve_memory(new_memory_size).is_err() {
420 return Err(to_insert);
421 }
422 let batch_type = CachedBatchType::from(&to_insert);
423 self.index.insert(&entry_id, to_insert);
424 batch_type
425 };
426
427 self.trace(InternalEvent::InsertSuccess {
428 entry: entry_id,
429 kind: cached_batch_type,
430 });
431 self.cache_policy
432 .notify_insert(&entry_id, cached_batch_type);
433
434 Ok(())
435 }
436
437 fn drop_memory_entry(&self, entry_id: EntryID, _expected: &CacheEntry) {
438 let Some(removed) = self.index.remove(&entry_id) else {
439 return;
440 };
441 assert!(
442 matches!(
443 removed.as_ref(),
444 CacheEntry::MemoryArrow(_)
445 | CacheEntry::MemoryLiquid(_)
446 | CacheEntry::MemorySqueezedLiquid(_)
447 ),
448 "flush should only drop memory entries"
449 );
450 self.budget
451 .try_update_memory_usage(removed.memory_usage_bytes(), 0)
452 .expect("memory release cannot fail");
453 self.cache_policy.notify_remove(&entry_id);
454 }
455
456 async fn remove_disk_entry(&self, entry_id: EntryID) {
457 let Some(removed) = self.index.remove(&entry_id) else {
458 return;
459 };
460 let disk_bytes = match removed.as_ref() {
461 CacheEntry::DiskLiquid { disk_bytes, .. }
462 | CacheEntry::DiskArrow { disk_bytes, .. } => *disk_bytes,
463 _ => panic!("remove_disk_entry called for non-disk entry"),
464 };
465 self.store
466 .remove(&entry_id_to_key(&entry_id))
467 .await
468 .expect("disk remove failed");
469 self.budget.release_disk(disk_bytes);
470 self.cache_policy.notify_remove(&entry_id);
471 self.trace(InternalEvent::DiskEvict {
472 entry: entry_id,
473 bytes: disk_bytes,
474 });
475 }
476
477 pub fn consume_event_trace(&self) -> EventTrace {
479 self.observer.consume_event_trace()
480 }
481
482 pub(crate) fn trace(&self, event: InternalEvent) {
483 self.observer.record_internal(event);
484 }
485
/// Test-only access to the underlying index.
#[cfg(test)]
pub(crate) fn index(&self) -> &ArtIndex {
    let Self { index, .. } = self;
    index
}
491
492 #[fastrace::trace]
493 async fn squeeze_victims(&self, victims: Vec<EntryID>) -> Result<(), CacheFull> {
494 self.trace(InternalEvent::SqueezeBegin {
495 victims: victims.clone(),
496 });
497 if self.squeeze_victims_concurrently {
498 let results = futures::stream::iter(victims)
499 .map(|victim| self.squeeze_victim_inner(victim))
500 .buffer_unordered(usize::MAX)
501 .collect::<Vec<_>>()
502 .await;
503 results.into_iter().collect::<Result<Vec<_>, _>>()?;
504 } else {
505 for victim in victims {
506 self.squeeze_victim_inner(victim).await?;
507 }
508 }
509 Ok(())
510 }
511
512 async fn squeeze_victim_inner(&self, to_squeeze: EntryID) -> Result<(), CacheFull> {
513 let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
514 return Ok(());
515 };
516 self.trace(InternalEvent::SqueezeVictim { entry: to_squeeze });
517 let compressor = self.metadata.get_compressor(&to_squeeze);
518 let squeeze_hint_arc = self.metadata.squeeze_hint(&to_squeeze);
519 let squeeze_hint = squeeze_hint_arc.as_deref();
520 let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
521 self.store.clone(),
522 to_squeeze,
523 self.observer.clone(),
524 ));
525
526 loop {
527 let outcome = self.squeeze_policy.squeeze(
528 to_squeeze_batch.as_ref(),
529 compressor.as_ref(),
530 squeeze_hint,
531 &squeeze_io,
532 );
533
534 match outcome {
535 SqueezeOutcome::Replace {
536 entry: new_batch,
537 bytes_to_write,
538 } => {
539 if let Some(bytes_to_write) = bytes_to_write {
540 self.write_batch_to_disk(to_squeeze, &new_batch, bytes_to_write)
541 .await?;
542 }
543 match self.try_insert(to_squeeze, new_batch) {
544 Ok(()) => {
545 break;
546 }
547 Err(batch) => {
548 to_squeeze_batch = Arc::new(batch);
549 }
550 }
551 }
552 SqueezeOutcome::Remove => {
553 self.remove_disk_entry(to_squeeze).await;
554 break;
555 }
556 }
557 }
558 Ok(())
559 }
560
561 fn disk_entry_from_squeezed(array: &LiquidSqueezedArrayRef) -> CacheEntry {
562 let data_type = array.original_arrow_data_type();
563 match array.disk_backing() {
564 SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
565 SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
566 }
567 }
568
569 async fn maybe_hydrate(
570 &self,
571 entry_id: &EntryID,
572 cached: &CacheEntry,
573 materialized: MaterializedEntry<'_>,
574 expression: Option<&CacheExpression>,
575 ) {
576 let compressor = self.metadata.get_compressor(entry_id);
577 if let Some(new_entry) = self.hydration_policy.hydrate(&HydrationRequest {
578 entry_id: *entry_id,
579 cached,
580 materialized,
581 expression,
582 compressor,
583 }) {
584 let cached_type = CachedBatchType::from(cached);
585 let new_type = CachedBatchType::from(&new_entry);
586 self.trace(InternalEvent::Hydrate {
587 entry: *entry_id,
588 cached: cached_type,
589 new: new_type,
590 });
591 let _ = self.insert_inner(*entry_id, new_entry).await;
592 }
593 }
594
595 pub(crate) async fn read_arrow_array(
596 &self,
597 entry_id: &EntryID,
598 selection: Option<&BooleanBuffer>,
599 expression: Option<&CacheExpression>,
600 ) -> Option<ArrayRef> {
601 use arrow::array::BooleanArray;
602
603 let batch = self.index.get(entry_id)?;
604 self.cache_policy
605 .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
606 self.trace(InternalEvent::Read {
607 entry: *entry_id,
608 selection: selection.is_some(),
609 expr: expression.cloned(),
610 cached: CachedBatchType::from(batch.as_ref()),
611 });
612
613 match batch.as_ref() {
614 CacheEntry::MemoryArrow(array) => match selection {
615 Some(selection) => {
616 let selection_array = BooleanArray::new(selection.clone(), None);
617 arrow::compute::filter(array, &selection_array).ok()
618 }
619 None => Some(array.clone()),
620 },
621 CacheEntry::MemoryLiquid(array) => match selection {
622 Some(selection) => Some(array.filter(selection)),
623 None => Some(array.to_arrow_array()),
624 },
625 CacheEntry::DiskArrow { .. } | CacheEntry::DiskLiquid { .. } => {
626 self.read_disk_array(batch.as_ref(), entry_id, expression, selection)
627 .await
628 }
629 CacheEntry::MemorySqueezedLiquid(array) => {
630 self.read_squeezed_array(array, entry_id, expression, selection)
631 .await
632 }
633 }
634 }
635
636 async fn read_disk_array(
637 &self,
638 entry: &CacheEntry,
639 entry_id: &EntryID,
640 expression: Option<&CacheExpression>,
641 selection: Option<&BooleanBuffer>,
642 ) -> Option<ArrayRef> {
643 match entry {
644 CacheEntry::DiskArrow { data_type, .. } => {
645 if let Some(selection) = selection
646 && selection.count_set_bits() == 0
647 {
648 return Some(arrow::array::new_empty_array(data_type));
649 }
650 let full_array = self.read_disk_arrow_array(entry_id).await;
651 self.maybe_hydrate(
652 entry_id,
653 entry,
654 MaterializedEntry::Arrow(&full_array),
655 expression,
656 )
657 .await;
658 match selection {
659 Some(selection) => {
660 let selection_array = BooleanArray::new(selection.clone(), None);
661 arrow::compute::filter(&full_array, &selection_array).ok()
662 }
663 None => Some(full_array),
664 }
665 }
666 CacheEntry::DiskLiquid { data_type, .. } => {
667 if let Some(selection) = selection
668 && selection.count_set_bits() == 0
669 {
670 return Some(arrow::array::new_empty_array(data_type));
671 }
672 let liquid = self.read_disk_liquid_array(entry_id).await;
673 self.maybe_hydrate(
674 entry_id,
675 entry,
676 MaterializedEntry::Liquid(&liquid),
677 expression,
678 )
679 .await;
680 match selection {
681 Some(selection) => Some(liquid.filter(selection)),
682 None => Some(liquid.to_arrow_array()),
683 }
684 }
685 _ => unreachable!("Unexpected batch in read_disk_array"),
686 }
687 }
688
689 async fn read_squeezed_array(
690 &self,
691 array: &LiquidSqueezedArrayRef,
692 entry_id: &EntryID,
693 expression: Option<&CacheExpression>,
694 selection: Option<&BooleanBuffer>,
695 ) -> Option<ArrayRef> {
696 if let Some(array) = self.try_read_squeezed_date32_array(array, expression, selection) {
697 self.observer.on_get_squeezed_success();
698 self.trace(InternalEvent::ReadSqueezedData {
699 entry: *entry_id,
700 expression: expression.unwrap().clone(),
701 });
702 return Some(array);
703 }
704
705 if let Some(array) = self
706 .try_read_squeezed_variant_array(array, entry_id, expression, selection)
707 .await
708 {
709 self.observer.on_get_squeezed_success();
710 self.trace(InternalEvent::ReadSqueezedData {
711 entry: *entry_id,
712 expression: expression.unwrap().clone(),
713 });
714 return Some(array);
715 }
716
717 let out = match selection {
719 Some(selection) => array.filter(selection).await,
720 None => array.to_arrow_array().await,
721 };
722 Some(out)
723 }
724
725 fn try_read_squeezed_date32_array(
726 &self,
727 array: &LiquidSqueezedArrayRef,
728 expression: Option<&CacheExpression>,
729 selection: Option<&BooleanBuffer>,
730 ) -> Option<ArrayRef> {
731 if let Some(CacheExpression::ExtractDate32 { field }) = expression
732 && let Some(squeezed) = array.as_any().downcast_ref::<SqueezedDate32Array>()
733 && squeezed.field() == *field
734 {
735 let component = squeezed.to_component_array();
736 self.observer.on_hit_date32_expression();
737 if let Some(selection) = selection {
738 let selection_array = BooleanArray::new(selection.clone(), None);
739 let filtered = arrow::compute::filter(&component, &selection_array).ok()?;
740 return Some(filtered);
741 }
742 return Some(component);
743 }
744 None
745 }
746
747 async fn try_read_squeezed_variant_array(
748 &self,
749 array: &LiquidSqueezedArrayRef,
750 entry_id: &EntryID,
751 expression: Option<&CacheExpression>,
752 selection: Option<&BooleanBuffer>,
753 ) -> Option<ArrayRef> {
754 let requests = expression.and_then(|expr| expr.variant_requests())?;
755 let variant_squeezed = array
756 .as_any()
757 .downcast_ref::<VariantStructSqueezedArray>()?;
758 let all_paths_present = requests
759 .iter()
760 .all(|request| variant_squeezed.contains_path(request.path()));
761
762 let full_array = if !all_paths_present {
763 let batch = CacheEntry::MemorySqueezedLiquid(array.clone());
764 self.observer.on_get_squeezed_needs_io();
765 let full_array = self.read_disk_arrow_array(entry_id).await;
766 self.maybe_hydrate(
767 entry_id,
768 &batch,
769 MaterializedEntry::Arrow(&full_array),
770 expression,
771 )
772 .await;
773 full_array
774 } else {
775 let requested_paths = requests.iter().map(|r| r.path());
776 variant_squeezed
777 .to_arrow_array_with_paths(requested_paths)
778 .unwrap()
779 };
780
781 match selection {
782 Some(selection) => {
783 let selection_array = BooleanArray::new(selection.clone(), None);
784 arrow::compute::filter(&full_array, &selection_array).ok()
785 }
786 None => Some(full_array),
787 }
788 }
789
790 async fn write_batch_to_disk(
791 &self,
792 entry_id: EntryID,
793 batch: &CacheEntry,
794 bytes: Bytes,
795 ) -> Result<(), CacheFull> {
796 let len = bytes.len();
797 loop {
798 if self.budget.try_reserve_disk(len).is_ok() {
799 break;
800 }
801 let victims = self.cache_policy.find_disk_victim(8);
802 if victims.is_empty() {
803 return Err(CacheFull);
804 }
805 for victim in victims {
806 self.remove_disk_entry(victim).await;
807 }
808 }
809 self.trace(InternalEvent::IoWrite {
810 entry: entry_id,
811 kind: CachedBatchType::from(batch),
812 bytes: len,
813 });
814 self.store
815 .put(entry_id_to_key(&entry_id), bytes.to_vec())
816 .await
817 .expect("write failed");
818 Ok(())
819 }
820
821 async fn read_disk_arrow_array(&self, entry_id: &EntryID) -> ArrayRef {
822 let bytes = self
823 .store
824 .get(&entry_id_to_key(entry_id))
825 .await
826 .expect("read failed");
827 let bytes_len = bytes.len();
828 let cursor = std::io::Cursor::new(bytes);
829 let mut reader =
830 arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("create reader failed");
831 let batch = reader.next().unwrap().expect("read batch failed");
832 let array = batch.column(0).clone();
833 self.trace(InternalEvent::IoReadArrow {
834 entry: *entry_id,
835 bytes: bytes_len,
836 });
837 array
838 }
839
840 async fn read_disk_liquid_array(
841 &self,
842 entry_id: &EntryID,
843 ) -> crate::liquid_array::LiquidArrayRef {
844 let bytes = self
845 .store
846 .get(&entry_id_to_key(entry_id))
847 .await
848 .expect("read failed");
849 self.trace(InternalEvent::IoReadLiquid {
850 entry: *entry_id,
851 bytes: bytes.len(),
852 });
853 let compressor_states = self.metadata.get_compressor(entry_id);
854 let compressor = compressor_states.fsst_compressor();
855
856 (crate::liquid_array::ipc::read_from_bytes(
857 Bytes::from(bytes),
858 &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
859 )) as _
860 }
861
862 pub(crate) async fn eval_predicate_internal(
863 &self,
864 entry_id: &EntryID,
865 selection_opt: Option<&BooleanBuffer>,
866 predicate: &LiquidExpr,
867 ) -> Option<BooleanArray> {
868 use arrow::array::BooleanArray;
869
870 self.observer.on_eval_predicate();
871 let batch = self.index.get(entry_id)?;
872 self.cache_policy
873 .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
874 self.trace(InternalEvent::EvalPredicate {
875 entry: *entry_id,
876 selection: selection_opt.is_some(),
877 cached: CachedBatchType::from(batch.as_ref()),
878 });
879
880 match batch.as_ref() {
881 CacheEntry::MemoryArrow(array) => {
882 let mut owned = None;
883 let selection = selection_opt.unwrap_or_else(|| {
884 owned = Some(BooleanBuffer::new_set(array.len()));
885 owned.as_ref().unwrap()
886 });
887 let selection_array = BooleanArray::new(selection.clone(), None);
888 let filtered = arrow::compute::filter(array, &selection_array)
889 .expect("selection must match array length");
890 Some(self.eval_predicate_on_array(filtered, predicate))
891 }
892 entry @ CacheEntry::DiskArrow { .. } => {
893 let array = self.read_disk_arrow_array(entry_id).await;
894 self.maybe_hydrate(entry_id, entry, MaterializedEntry::Arrow(&array), None)
895 .await;
896 let mut owned = None;
897 let selection = selection_opt.unwrap_or_else(|| {
898 owned = Some(BooleanBuffer::new_set(array.len()));
899 owned.as_ref().unwrap()
900 });
901 let selection_array = BooleanArray::new(selection.clone(), None);
902 let filtered = arrow::compute::filter(&array, &selection_array)
903 .expect("selection must match array length");
904 Some(self.eval_predicate_on_array(filtered, predicate))
905 }
906 CacheEntry::MemoryLiquid(array) => {
907 let mut owned = None;
908 let selection = selection_opt.unwrap_or_else(|| {
909 owned = Some(BooleanBuffer::new_set(array.len()));
910 owned.as_ref().unwrap()
911 });
912 Some(array.try_eval_predicate(predicate, selection))
913 }
914 entry @ CacheEntry::DiskLiquid { .. } => {
915 let liquid = self.read_disk_liquid_array(entry_id).await;
916 self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
917 .await;
918 let mut owned = None;
919 let selection = selection_opt.unwrap_or_else(|| {
920 owned = Some(BooleanBuffer::new_set(liquid.len()));
921 owned.as_ref().unwrap()
922 });
923 Some(liquid.try_eval_predicate(predicate, selection))
924 }
925 CacheEntry::MemorySqueezedLiquid(array) => {
926 self.eval_predicate_on_squeezed(array, selection_opt, predicate)
927 .await
928 }
929 }
930 }
931
932 async fn eval_predicate_on_squeezed(
933 &self,
934 array: &LiquidSqueezedArrayRef,
935 selection_opt: Option<&BooleanBuffer>,
936 predicate: &LiquidExpr,
937 ) -> Option<BooleanArray> {
938 let mut owned = None;
939 let selection = selection_opt.unwrap_or_else(|| {
940 owned = Some(BooleanBuffer::new_set(array.len()));
941 owned.as_ref().unwrap()
942 });
943 Some(array.try_eval_predicate(predicate, selection).await)
944 }
945
946 fn eval_predicate_on_array(&self, array: ArrayRef, predicate: &LiquidExpr) -> BooleanArray {
947 let schema = Arc::new(Schema::new(vec![Field::new(
948 "liquid_predicate_col",
949 array.data_type().clone(),
950 true,
951 )]));
952 let record_batch =
953 RecordBatch::try_new(schema, vec![array]).expect("single-column predicate batch");
954 let result = predicate
955 .physical_expr()
956 .evaluate(&record_batch)
957 .expect("validated LiquidExpr must evaluate");
958 let boolean_array = result
959 .into_array(record_batch.num_rows())
960 .expect("predicate output must be an array");
961 boolean_array.as_boolean().clone()
962 }
963}
964
965#[cfg(test)]
966mod tests {
967 use super::*;
968 use crate::cache::{
969 CacheEntry, CacheExpression, CachePolicy, LiquidCacheBuilder, LiquidPolicy,
970 TranscodeSqueezeEvict, transcode_liquid_inner,
971 utils::{
972 LiquidCompressorStates, arrow_to_bytes, create_cache_store, create_test_array,
973 create_test_arrow_array,
974 },
975 };
976 use crate::liquid_array::{
977 Date32Field, LiquidPrimitiveArray, LiquidSqueezedArrayRef, SqueezedDate32Array,
978 };
979 use crate::sync::thread;
980 use arrow::array::{Array, ArrayRef, Date32Array, Int32Array};
981 use arrow::datatypes::Date32Type;
982 use std::future::Future;
983 use std::sync::Arc;
984 use std::sync::atomic::{AtomicUsize, Ordering};
985
986 #[derive(Debug)]
988 struct TestPolicy {
989 target_id: Option<EntryID>,
990 advice_count: AtomicUsize,
991 }
992
993 impl TestPolicy {
994 fn new(target_id: Option<EntryID>) -> Self {
995 Self {
996 target_id,
997 advice_count: AtomicUsize::new(0),
998 }
999 }
1000 }
1001
1002 impl CachePolicy for TestPolicy {
1003 fn find_memory_victim(&self, _cnt: usize) -> Vec<EntryID> {
1004 self.advice_count.fetch_add(1, Ordering::SeqCst);
1005 let id_to_use = self.target_id.unwrap();
1006 vec![id_to_use]
1007 }
1008 }
1009
1010 #[tokio::test]
1011 async fn test_basic_cache_operations() {
1012 let budget_size = 10 * 1024;
1014 let store = create_cache_store(budget_size, Box::new(LiquidPolicy::new())).await;
1015
1016 assert_eq!(store.budget.memory_usage_bytes(), 0);
1018
1019 let entry_id1: EntryID = EntryID::from(1);
1021 let array1 = create_test_array(100);
1022 let size1 = array1.memory_usage_bytes();
1023 store.insert_inner(entry_id1, array1).await.unwrap();
1024
1025 assert_eq!(store.budget.memory_usage_bytes(), size1);
1027 let retrieved1 = store.index().get(&entry_id1).unwrap();
1028 match retrieved1.as_ref() {
1029 CacheEntry::MemoryArrow(arr) => assert_eq!(arr.len(), 100),
1030 _ => panic!("Expected ArrowMemory"),
1031 }
1032
1033 let entry_id2: EntryID = EntryID::from(2);
1034 let array2 = create_test_array(200);
1035 let size2 = array2.memory_usage_bytes();
1036 store.insert_inner(entry_id2, array2).await.unwrap();
1037
1038 assert_eq!(store.budget.memory_usage_bytes(), size1 + size2);
1039
1040 let array3 = create_test_array(150);
1041 let size3 = array3.memory_usage_bytes();
1042 store.insert_inner(entry_id1, array3).await.unwrap();
1043
1044 assert_eq!(store.budget.memory_usage_bytes(), size3 + size2);
1045 assert!(store.index().get(&EntryID::from(999)).is_none());
1046 }
1047
1048 #[tokio::test]
1049 async fn get_arrow_array_with_expression_extracts_year() {
1050 let store = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
1051 let entry_id = EntryID::from(42);
1052
1053 let date_values = Date32Array::from(vec![Some(2), Some(365 + 1), None, Some(365 + 100)]);
1054 let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date_values.clone());
1055 let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, Date32Field::Year);
1056 let squeezed: LiquidSqueezedArrayRef = Arc::new(squeezed);
1057
1058 store
1059 .insert_inner(
1060 entry_id,
1061 CacheEntry::memory_squeezed_liquid(squeezed.clone()),
1062 )
1063 .await
1064 .unwrap();
1065
1066 let expr = Arc::new(CacheExpression::extract_date32(Date32Field::Year));
1067 let result = store
1068 .get(&entry_id)
1069 .with_expression_hint(expr)
1070 .read()
1071 .await
1072 .expect("array present");
1073
1074 let result = result
1075 .as_any()
1076 .downcast_ref::<Date32Array>()
1077 .expect("date32 result");
1078 assert_eq!(result.len(), 4);
1079 assert_eq!(result.value(0), 0);
1080 assert_eq!(result.value(1), 365);
1081 assert!(result.is_null(2));
1082 assert_eq!(result.value(3), 365);
1083 }
1084
1085 #[tokio::test]
1086 async fn test_cache_advice_strategies() {
1087 let entry_id1 = EntryID::from(1);
1091 let entry_id2 = EntryID::from(2);
1092
1093 {
1095 let advisor = TestPolicy::new(Some(entry_id1));
1096 let store = create_cache_store(8000, Box::new(advisor)).await; store
1099 .insert_inner(entry_id1, create_test_array(800))
1100 .await
1101 .unwrap();
1102 match store.index().get(&entry_id1).unwrap().as_ref() {
1103 CacheEntry::MemoryArrow(_) => {}
1104 other => panic!("Expected ArrowMemory, got {other:?}"),
1105 }
1106
1107 store
1108 .insert_inner(entry_id2, create_test_array(800))
1109 .await
1110 .unwrap();
1111 match store.index().get(&entry_id1).unwrap().as_ref() {
1112 CacheEntry::MemoryLiquid(_) => {}
1113 other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
1114 }
1115 }
1116 }
1117
1118 #[tokio::test]
1119 async fn test_concurrent_cache_operations() {
1120 concurrent_cache_operations().await;
1121 }
1122
1123 pub fn block_on<F: Future>(future: F) -> F::Output {
1132 #[cfg(feature = "shuttle")]
1133 {
1134 shuttle::future::block_on(future)
1135 }
1136 #[cfg(not(feature = "shuttle"))]
1137 {
1138 tokio_test::block_on(future)
1139 }
1140 }
1141
1142 async fn concurrent_cache_operations() {
1143 let num_threads = 3;
1144 let ops_per_thread = 50;
1145
1146 let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
1147 let store = create_cache_store(budget_size, Box::new(LiquidPolicy::new())).await;
1148
1149 let mut handles = vec![];
1150 for thread_id in 0..num_threads {
1151 let store = store.clone();
1152 handles.push(thread::spawn(move || {
1153 block_on(async {
1154 for i in 0..ops_per_thread {
1155 let unique_id = thread_id * ops_per_thread + i;
1156 let entry_id: EntryID = EntryID::from(unique_id);
1157 let array = create_test_arrow_array(100);
1158 store.insert(entry_id, array).await.unwrap();
1159 }
1160 });
1161 }));
1162 }
1163 for handle in handles {
1164 handle.join().unwrap();
1165 }
1166
1167 for thread_id in 0..num_threads {
1169 for i in 0..ops_per_thread {
1170 let unique_id = thread_id * ops_per_thread + i;
1171 let entry_id: EntryID = EntryID::from(unique_id);
1172 assert!(store.index().get(&entry_id).is_some());
1173 }
1174 }
1175
1176 assert_eq!(store.index().keys().len(), num_threads * ops_per_thread);
1178 }
1179
1180 #[tokio::test]
1181 async fn test_cache_stats_memory_and_disk_usage() {
1182 let storage = LiquidCacheBuilder::new()
1184 .with_max_memory_bytes(10 * 1024 * 1024)
1185 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1186 .build()
1187 .await;
1188
1189 let arr1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
1191 let arr2: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
1192 storage.insert(EntryID::from(1usize), arr1).await.unwrap();
1193 storage.insert(EntryID::from(2usize), arr2).await.unwrap();
1194
1195 let s = storage.stats();
1197 assert_eq!(s.total_entries, 2);
1198 assert!(s.memory_usage_bytes > 0);
1199 assert_eq!(s.disk_usage_bytes, 0);
1200 assert_eq!(s.max_memory_bytes, 10 * 1024 * 1024);
1201
1202 storage.flush_all_to_disk().await.unwrap();
1204 let s2 = storage.stats();
1205 assert_eq!(s2.total_entries, 2);
1206 assert!(s2.disk_usage_bytes > 0);
1207 assert!(s2.memory_usage_bytes <= s.memory_usage_bytes);
1209 }
1210
1211 #[tokio::test]
1212 async fn hydrate_disk_arrow_on_get_promotes_to_memory() {
1213 let store = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
1214 let entry_id = EntryID::from(321usize);
1215 let array = create_test_arrow_array(8);
1216
1217 store.insert(entry_id, array.clone()).await.unwrap();
1218 store.flush_all_to_disk().await.unwrap();
1219 {
1220 let entry = store.index().get(&entry_id).unwrap();
1221 assert!(matches!(entry.as_ref(), CacheEntry::DiskArrow { .. }));
1222 }
1223
1224 let result = store.get(&entry_id).await.expect("present");
1225 assert_eq!(result.as_ref(), array.as_ref());
1226 {
1227 let entry = store.index().get(&entry_id).unwrap();
1228 assert!(matches!(entry.as_ref(), CacheEntry::MemoryArrow(_)));
1229 }
1230 }
1231
1232 #[tokio::test]
1233 async fn hydrate_disk_liquid_on_get_promotes_to_memory_liquid() {
1234 let store = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
1235 let entry_id = EntryID::from(322usize);
1236 let arrow_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1237 let compressor = LiquidCompressorStates::new();
1238 let liquid = transcode_liquid_inner(&arrow_array, &compressor).unwrap();
1239
1240 store
1241 .insert_inner(entry_id, CacheEntry::memory_liquid(liquid.clone()))
1242 .await
1243 .unwrap();
1244 store.flush_all_to_disk().await.unwrap();
1245 {
1246 let entry = store.index().get(&entry_id).unwrap();
1247 assert!(matches!(entry.as_ref(), CacheEntry::DiskLiquid { .. }));
1248 }
1249
1250 let result = store.get(&entry_id).await.expect("present");
1251 assert_eq!(result.as_ref(), arrow_array.as_ref());
1252 {
1253 let entry = store.index().get(&entry_id).unwrap();
1254 assert!(matches!(entry.as_ref(), CacheEntry::MemoryLiquid(_)));
1255 }
1256 }
1257
1258 #[tokio::test]
1259 async fn insert_returns_cache_full_when_memory_and_disk_are_saturated() {
1260 let cache = LiquidCacheBuilder::new()
1261 .with_max_memory_bytes(0)
1262 .with_max_disk_bytes(0)
1263 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1264 .build()
1265 .await;
1266 let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
1267
1268 let err = cache.insert(EntryID::from(900usize), array).await;
1269
1270 assert_eq!(err, Err(CacheFull));
1271 assert!(!cache.is_cached(&EntryID::from(900usize)));
1272 }
1273
1274 #[tokio::test]
1275 async fn insert_until_disk_full_then_evicts_oldest_disk_entry() {
1276 let first_array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
1277 let second_array: ArrayRef = Arc::new(Int32Array::from_iter_values(16..32));
1278 let first_bytes = arrow_to_bytes(&first_array).unwrap().len();
1279 let second_bytes = arrow_to_bytes(&second_array).unwrap().len();
1280 let cache = LiquidCacheBuilder::new()
1281 .with_max_memory_bytes(1 << 20)
1282 .with_max_disk_bytes(first_bytes.max(second_bytes))
1283 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1284 .with_cache_policy(Box::new(LiquidPolicy::new()))
1285 .build()
1286 .await;
1287
1288 let first = EntryID::from(910usize);
1289 let second = EntryID::from(911usize);
1290 cache.insert(first, first_array).await.unwrap();
1291 cache.flush_all_to_disk().await.unwrap();
1292 assert!(cache.is_cached(&first));
1293
1294 cache.insert(second, second_array).await.unwrap();
1295 cache.flush_all_to_disk().await.unwrap();
1296
1297 assert!(!cache.is_cached(&first));
1298 assert!(matches!(
1299 cache.index().get(&second).unwrap().as_ref(),
1300 CacheEntry::DiskArrow { .. }
1301 ));
1302 }
1303
1304 #[tokio::test]
1305 async fn flush_all_to_disk_evicts_when_overflow() {
1306 let first_array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
1307 let second_array: ArrayRef = Arc::new(Int32Array::from_iter_values(16..32));
1308 let disk_bytes = arrow_to_bytes(&first_array).unwrap().len();
1309 let cache = LiquidCacheBuilder::new()
1310 .with_max_memory_bytes(1 << 20)
1311 .with_max_disk_bytes(disk_bytes)
1312 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1313 .with_cache_policy(Box::new(LiquidPolicy::new()))
1314 .build()
1315 .await;
1316 let first = EntryID::from(912usize);
1317 let second = EntryID::from(913usize);
1318 cache.insert(first, first_array).await.unwrap();
1319 cache.flush_all_to_disk().await.unwrap();
1320 cache.insert(second, second_array).await.unwrap();
1321
1322 cache.flush_all_to_disk().await.unwrap();
1323
1324 assert!(!cache.is_cached(&first) || !cache.is_cached(&second));
1325 }
1326
1327 #[tokio::test]
1328 async fn disk_eviction_releases_budget() {
1329 let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
1330 let disk_bytes = arrow_to_bytes(&array).unwrap().len();
1331 let cache = LiquidCacheBuilder::new()
1332 .with_max_memory_bytes(1 << 20)
1333 .with_max_disk_bytes(disk_bytes)
1334 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1335 .with_cache_policy(Box::new(LiquidPolicy::new()))
1336 .build()
1337 .await;
1338 let entry = EntryID::from(914usize);
1339 cache.insert(entry, array).await.unwrap();
1340 cache.flush_all_to_disk().await.unwrap();
1341 let before = cache.stats().disk_usage_bytes;
1342
1343 cache.remove_disk_entry(entry).await;
1344
1345 assert_eq!(cache.stats().disk_usage_bytes, before - disk_bytes);
1346 assert!(!cache.is_cached(&entry));
1347 }
1348
1349 #[tokio::test]
1350 async fn flush_all_to_disk_drops_entry_on_unrecoverable_overflow() {
1351 let cache = LiquidCacheBuilder::new()
1352 .with_max_memory_bytes(1 << 20)
1353 .with_max_disk_bytes(0)
1354 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1355 .build()
1356 .await;
1357 let entry_id = EntryID::from(901usize);
1358 let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
1359 cache.insert(entry_id, array).await.unwrap();
1360
1361 let result = cache.flush_all_to_disk().await;
1362
1363 assert_eq!(result, Ok(()));
1364 assert!(!cache.is_cached(&entry_id));
1365 }
1366}