1use arrow::array::{ArrayRef, BooleanArray};
2use arrow::buffer::BooleanBuffer;
3use arrow_schema::DataType;
4use bytes::Bytes;
5use datafusion::physical_plan::PhysicalExpr;
6use futures::StreamExt;
7use std::path::PathBuf;
8
9use super::{
10 budget::BudgetAccounting,
11 builders::{EvaluatePredicate, Get, Insert},
12 cached_batch::{CacheEntry, CachedBatchType},
13 io_context::IoContext,
14 observer::{CacheTracer, InternalEvent, Observer},
15 policies::{CachePolicy, HydrationPolicy, HydrationRequest, MaterializedEntry},
16 utils::CacheConfig,
17};
18use crate::cache::policies::SqueezePolicy;
19use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
20use crate::cache::{CacheExpression, index::ArtIndex, utils::EntryID};
21use crate::cache::{CacheStats, EventTrace};
22use crate::cache::{DefaultSqueezeIo, RuntimeStats};
23use crate::liquid_array::{
24 LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, SqueezedDate32Array,
25 VariantStructSqueezedArray,
26};
27use crate::sync::Arc;
28
/// In-memory + on-disk columnar cache for Arrow/Liquid arrays.
///
/// Entries are keyed by [`EntryID`] and stored in an ART index. Memory and
/// disk consumption are tracked against a budget, while pluggable policies
/// decide what to evict (`cache_policy`), how to shrink entries that do not
/// fit (`squeeze_policy`), and when to promote disk-resident data back into
/// memory (`hydration_policy`).
#[derive(Debug)]
pub struct LiquidCache {
    /// Concurrent index mapping `EntryID` to the cached batch.
    index: ArtIndex,
    /// Static configuration: batch size, byte budget, cache directory.
    config: CacheConfig,
    /// Tracks memory/disk byte usage against the configured budget.
    budget: BudgetAccounting,
    /// Chooses squeeze/eviction victims when an insert does not fit.
    cache_policy: Box<dyn CachePolicy>,
    /// Decides whether materialized disk data should be re-cached in memory.
    hydration_policy: Box<dyn HydrationPolicy>,
    /// Converts in-memory entries into smaller (possibly disk-backed) forms.
    squeeze_policy: Box<dyn SqueezePolicy>,
    /// Metrics counters and event-trace sink.
    observer: Arc<Observer>,
    /// File I/O, per-entry compressor state, and squeeze hints.
    io_context: Arc<dyn IoContext>,
}
62
63impl LiquidCache {
65 pub fn stats(&self) -> CacheStats {
67 let total_entries = self.index.entry_count();
69
70 let mut memory_arrow_entries = 0usize;
71 let mut memory_liquid_entries = 0usize;
72 let mut memory_squeezed_liquid_entries = 0usize;
73 let mut disk_liquid_entries = 0usize;
74 let mut disk_arrow_entries = 0usize;
75
76 let mut memory_arrow_bytes = 0usize;
77 let mut memory_liquid_bytes = 0usize;
78 let mut memory_squeezed_liquid_bytes = 0usize;
79
80 self.index.for_each(|_, batch| match batch {
81 CacheEntry::MemoryArrow(array) => {
82 memory_arrow_entries += 1;
83 memory_arrow_bytes += array.get_array_memory_size();
84 }
85 CacheEntry::MemoryLiquid(array) => {
86 memory_liquid_entries += 1;
87 memory_liquid_bytes += array.get_array_memory_size();
88 }
89 CacheEntry::MemorySqueezedLiquid(array) => {
90 memory_squeezed_liquid_entries += 1;
91 memory_squeezed_liquid_bytes += array.get_array_memory_size();
92 }
93 CacheEntry::DiskLiquid(_) => disk_liquid_entries += 1,
94 CacheEntry::DiskArrow(_) => disk_arrow_entries += 1,
95 });
96
97 let memory_usage_bytes = self.budget.memory_usage_bytes();
98 let disk_usage_bytes = self.budget.disk_usage_bytes();
99 let runtime = self.observer.runtime_snapshot();
100
101 CacheStats {
102 total_entries,
103 memory_arrow_entries,
104 memory_liquid_entries,
105 memory_squeezed_liquid_entries,
106 disk_liquid_entries,
107 disk_arrow_entries,
108 memory_arrow_bytes,
109 memory_liquid_bytes,
110 memory_squeezed_liquid_bytes,
111 memory_usage_bytes,
112 disk_usage_bytes,
113 max_cache_bytes: self.config.max_cache_bytes(),
114 cache_root_dir: self.config.cache_root_dir().clone(),
115 runtime,
116 }
117 }
118
    /// Begin inserting `batch_to_cache` under `entry_id`.
    ///
    /// Returns an [`Insert`] builder; the actual caching work happens when it
    /// is awaited. Takes `&Arc<Self>` so the builder can hold a cache handle.
    pub fn insert<'a>(
        self: &'a Arc<Self>,
        entry_id: EntryID,
        batch_to_cache: ArrayRef,
    ) -> Insert<'a> {
        Insert::new(self, entry_id, batch_to_cache)
    }
127
    /// Begin reading the entry stored under `entry_id`.
    ///
    /// Returns a [`Get`] builder; await it (optionally after attaching an
    /// expression hint, as the tests do) to obtain the Arrow array.
    pub fn get<'a>(&'a self, entry_id: &'a EntryID) -> Get<'a> {
        Get::new(self, entry_id)
    }
132
    /// Begin evaluating `predicate` against the entry under `entry_id`.
    ///
    /// Returns an [`EvaluatePredicate`] builder that drives
    /// `eval_predicate_internal` when awaited.
    pub fn eval_predicate<'a>(
        &'a self,
        entry_id: &'a EntryID,
        predicate: &'a Arc<dyn PhysicalExpr>,
    ) -> EvaluatePredicate<'a> {
        EvaluatePredicate::new(self, entry_id, predicate)
    }
141
    /// Try to obtain the entry as a liquid array without converting to Arrow.
    ///
    /// Returns `None` when the entry is absent, Arrow-backed, or a squeezed
    /// entry whose disk backing is Arrow. Disk-resident liquid data is read
    /// back and offered to the hydration policy.
    pub async fn try_read_liquid(
        &self,
        entry_id: &EntryID,
    ) -> Option<crate::liquid_array::LiquidArrayRef> {
        self.observer.on_try_read_liquid();
        self.trace(InternalEvent::TryReadLiquid { entry: *entry_id });
        let batch = self.index.get(entry_id)?;
        // Every successful lookup counts as an access for the eviction policy.
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));

        match batch.as_ref() {
            CacheEntry::MemoryLiquid(array) => Some(array.clone()),
            entry @ CacheEntry::DiskLiquid(_) => {
                let liquid = self.read_disk_liquid_array(entry_id).await;
                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
                    .await;
                Some(liquid)
            }
            CacheEntry::MemorySqueezedLiquid(array) => match array.disk_backing() {
                SqueezedBacking::Liquid => {
                    // NOTE(review): unlike the DiskLiquid arm above, this path
                    // does not call maybe_hydrate — presumably intentional
                    // (the squeezed entry stays in memory); confirm.
                    let liquid = self.read_disk_liquid_array(entry_id).await;
                    Some(liquid)
                }
                SqueezedBacking::Arrow => None,
            },
            CacheEntry::DiskArrow(_) | CacheEntry::MemoryArrow(_) => None,
        }
    }
172
    /// Visit every `(EntryID, CacheEntry)` pair currently in the index.
    pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CacheEntry)) {
        self.index.for_each(&mut f);
    }
179
    /// Clear the index and reset the budget's usage counters.
    ///
    /// NOTE(review): files already written to the cache directory are
    /// presumably left on disk — confirm against `ArtIndex::reset`.
    pub fn reset(&self) {
        self.index.reset();
        self.budget.reset_usage();
    }
185
    /// Whether an entry (in any storage form) exists for `entry_id`.
    pub fn is_cached(&self, entry_id: &EntryID) -> bool {
        self.index.is_cached(entry_id)
    }
190
    /// The cache's static configuration.
    pub fn config(&self) -> &CacheConfig {
        &self.config
    }
195
    /// The memory/disk budget accounting.
    pub fn budget(&self) -> &BudgetAccounting {
        &self.budget
    }
200
    /// The observer's cache tracer.
    pub fn tracer(&self) -> &CacheTracer {
        self.observer.cache_tracer()
    }
205
    /// The observer collecting metrics and events for this cache.
    pub fn observer(&self) -> &Observer {
        &self.observer
    }
210
    /// Shorthand for the observer's runtime counters.
    fn runtime_stats(&self) -> &RuntimeStats {
        self.observer.runtime_stats()
    }
214
    /// Per-entry compressor state used when (de)serializing liquid data.
    pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
        self.io_context.get_compressor(entry_id)
    }
219
    /// Register an expression hint the squeeze policy may use for `entry_id`.
    pub fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
        self.io_context.add_squeeze_hint(entry_id, expression);
    }
224
225 pub async fn flush_all_to_disk(&self) {
227 let mut entires = Vec::new();
228 self.for_each_entry(|entry_id, batch| {
229 entires.push((*entry_id, batch.clone()));
230 });
231 for (entry_id, batch) in entires {
232 match &batch {
233 CacheEntry::MemoryArrow(array) => {
234 let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
235 self.write_batch_to_disk(entry_id, &batch, bytes).await;
236 self.try_insert(entry_id, CacheEntry::disk_arrow(array.data_type().clone()))
237 .expect("failed to insert disk arrow entry");
238 }
239 CacheEntry::MemoryLiquid(liquid_array) => {
240 let liquid_bytes = liquid_array.to_bytes();
241 self.write_batch_to_disk(entry_id, &batch, Bytes::from(liquid_bytes))
242 .await;
243 self.try_insert(
244 entry_id,
245 CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
246 )
247 .expect("failed to insert disk liquid entry");
248 }
249 CacheEntry::MemorySqueezedLiquid(array) => {
250 let disk_entry = Self::disk_entry_from_squeezed(array);
252 self.try_insert(entry_id, disk_entry)
253 .expect("failed to insert disk entry");
254 }
255 CacheEntry::DiskArrow(_) | CacheEntry::DiskLiquid(_) => {
256 }
258 }
259 }
260 }
261}
262
263impl LiquidCache {
    /// Convert an in-memory entry into its disk-resident (or squeezed)
    /// replacement, writing backing bytes to disk where needed.
    ///
    /// Called by `insert_inner` as the last resort when the policy yields no
    /// eviction victims. Must only be given in-memory variants.
    async fn write_in_memory_batch_to_disk(
        &self,
        entry_id: EntryID,
        batch: CacheEntry,
    ) -> CacheEntry {
        match &batch {
            batch @ CacheEntry::MemoryArrow(_) => {
                // Arrow entries go through the squeeze policy (no hint); it
                // chooses the replacement form and the bytes to persist.
                let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
                    self.io_context.clone(),
                    entry_id,
                    self.observer.clone(),
                ));
                let (new_batch, bytes_to_write) = self.squeeze_policy.squeeze(
                    batch,
                    self.io_context.get_compressor(&entry_id).as_ref(),
                    None,
                    &squeeze_io,
                );
                if let Some(bytes_to_write) = bytes_to_write {
                    self.write_batch_to_disk(entry_id, &new_batch, bytes_to_write)
                        .await;
                }
                new_batch
            }
            CacheEntry::MemoryLiquid(liquid_array) => {
                // Liquid entries serialize directly to their own byte format.
                let liquid_bytes = Bytes::from(liquid_array.to_bytes());
                self.write_batch_to_disk(entry_id, &batch, liquid_bytes)
                    .await;
                CacheEntry::disk_liquid(liquid_array.original_arrow_data_type())
            }
            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
                // Squeezed entries already have bytes on disk; just pick the
                // disk variant matching their backing format.
                let backing = squeezed_array.disk_backing();
                if backing == SqueezedBacking::Liquid {
                    CacheEntry::disk_liquid(squeezed_array.original_arrow_data_type())
                } else {
                    CacheEntry::disk_arrow(squeezed_array.original_arrow_data_type())
                }
            }
            CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => {
                unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
            }
        }
    }
309
    /// Insert `batch_to_cache`, retrying until the budget admits it.
    ///
    /// Each failed attempt either squeezes policy-chosen victims to free
    /// memory, or — when the policy returns no victims — converts the
    /// rejected batch itself to a disk-resident form and retries with that.
    pub(crate) async fn insert_inner(&self, entry_id: EntryID, mut batch_to_cache: CacheEntry) {
        loop {
            let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
                return;
            };
            self.trace(InternalEvent::InsertFailed {
                entry: entry_id,
                kind: CachedBatchType::from(&not_inserted),
            });

            // Ask the policy for up to 8 victims to squeeze.
            let victims = self.cache_policy.find_victim(8);
            if victims.is_empty() {
                // Nothing to evict: push this batch to disk and retry with
                // the (smaller) disk-resident replacement entry.
                let on_disk_batch = self
                    .write_in_memory_batch_to_disk(entry_id, not_inserted)
                    .await;
                batch_to_cache = on_disk_batch;
                continue;
            }
            self.squeeze_victims(victims).await;

            batch_to_cache = not_inserted;
            // Let other tasks interleave under the shuttle model checker.
            crate::utils::yield_now_if_shuttle();
        }
    }
338
    /// Construct a cache with the given limits and policies.
    ///
    /// `max_cache_bytes` seeds the budget; disk-resident entries are written
    /// under `cache_dir` through `io_worker`.
    pub(crate) fn new(
        batch_size: usize,
        max_cache_bytes: usize,
        cache_dir: PathBuf,
        squeeze_policy: Box<dyn SqueezePolicy>,
        cache_policy: Box<dyn CachePolicy>,
        hydration_policy: Box<dyn HydrationPolicy>,
        io_worker: Arc<dyn IoContext>,
    ) -> Self {
        let config = CacheConfig::new(batch_size, max_cache_bytes, cache_dir);
        Self {
            index: ArtIndex::new(),
            budget: BudgetAccounting::new(config.max_cache_bytes()),
            config,
            cache_policy,
            hydration_policy,
            squeeze_policy,
            observer: Arc::new(Observer::new()),
            io_context: io_worker,
        }
    }
361
362 fn try_insert(&self, entry_id: EntryID, to_insert: CacheEntry) -> Result<(), CacheEntry> {
363 let new_memory_size = to_insert.memory_usage_bytes();
364 let cached_batch_type = if let Some(entry) = self.index.get(&entry_id) {
365 let old_memory_size = entry.memory_usage_bytes();
366 if self
367 .budget
368 .try_update_memory_usage(old_memory_size, new_memory_size)
369 .is_err()
370 {
371 return Err(to_insert);
372 }
373 let batch_type = CachedBatchType::from(&to_insert);
374 self.index.insert(&entry_id, to_insert);
375 batch_type
376 } else {
377 if self.budget.try_reserve_memory(new_memory_size).is_err() {
378 return Err(to_insert);
379 }
380 let batch_type = CachedBatchType::from(&to_insert);
381 self.index.insert(&entry_id, to_insert);
382 batch_type
383 };
384
385 self.trace(InternalEvent::InsertSuccess {
386 entry: entry_id,
387 kind: cached_batch_type,
388 });
389 self.cache_policy
390 .notify_insert(&entry_id, cached_batch_type);
391
392 Ok(())
393 }
394
    /// Take the observer's recorded event trace (consuming it).
    pub fn consume_event_trace(&self) -> EventTrace {
        self.observer.consume_event_trace()
    }
399
    /// Record an internal event with the observer.
    pub(crate) fn trace(&self, event: InternalEvent) {
        self.observer.record_internal(event);
    }
403
    /// Test-only access to the underlying index.
    #[cfg(test)]
    pub(crate) fn index(&self) -> &ArtIndex {
        &self.index
    }
409
    /// Squeeze all `victims` concurrently (no concurrency limit).
    #[fastrace::trace]
    async fn squeeze_victims(&self, victims: Vec<EntryID>) {
        self.trace(InternalEvent::SqueezeBegin {
            victims: victims.clone(),
        });
        futures::stream::iter(victims)
            .for_each_concurrent(None, |victim| async move {
                self.squeeze_victim_inner(victim).await;
            })
            .await;
    }
422
    /// Squeeze a single victim into a smaller representation, re-squeezing
    /// the result until the replacement fits the budget.
    async fn squeeze_victim_inner(&self, to_squeeze: EntryID) {
        // The entry may already be gone (e.g. concurrently squeezed or reset).
        let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
            return;
        };
        self.trace(InternalEvent::SqueezeVictim { entry: to_squeeze });
        let compressor = self.io_context.get_compressor(&to_squeeze);
        let squeeze_hint_arc = self.io_context.squeeze_hint(&to_squeeze);
        let squeeze_hint = squeeze_hint_arc.as_deref();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
            self.io_context.clone(),
            to_squeeze,
            self.observer.clone(),
        ));

        loop {
            let (new_batch, bytes_to_write) = self.squeeze_policy.squeeze(
                to_squeeze_batch.as_ref(),
                compressor.as_ref(),
                squeeze_hint,
                &squeeze_io,
            );

            // Persist backing bytes before publishing the squeezed entry.
            if let Some(bytes_to_write) = bytes_to_write {
                self.write_batch_to_disk(to_squeeze, &new_batch, bytes_to_write)
                    .await;
            }
            match self.try_insert(to_squeeze, new_batch) {
                Ok(()) => {
                    break;
                }
                Err(batch) => {
                    // Still does not fit: squeeze the rejected batch again.
                    to_squeeze_batch = Arc::new(batch);
                }
            }
        }
    }
459
460 fn disk_entry_from_squeezed(array: &LiquidSqueezedArrayRef) -> CacheEntry {
461 let constructor: fn(DataType) -> CacheEntry = match array.disk_backing() {
462 SqueezedBacking::Liquid => CacheEntry::disk_liquid,
463 SqueezedBacking::Arrow => CacheEntry::disk_arrow,
464 };
465 constructor(array.original_arrow_data_type())
466 }
467
    /// Ask the hydration policy whether the just-materialized data should be
    /// promoted back into the cache (e.g. disk -> memory); if so, insert the
    /// replacement entry.
    async fn maybe_hydrate(
        &self,
        entry_id: &EntryID,
        cached: &CacheEntry,
        materialized: MaterializedEntry<'_>,
        expression: Option<&CacheExpression>,
    ) {
        let compressor = self.io_context.get_compressor(entry_id);
        if let Some(new_entry) = self.hydration_policy.hydrate(&HydrationRequest {
            entry_id: *entry_id,
            cached,
            materialized,
            expression,
            compressor,
        }) {
            let cached_type = CachedBatchType::from(cached);
            let new_type = CachedBatchType::from(&new_entry);
            self.trace(InternalEvent::Hydrate {
                entry: *entry_id,
                cached: cached_type,
                new: new_type,
            });
            // insert_inner may evict/squeeze other entries to make room.
            self.insert_inner(*entry_id, new_entry).await;
        }
    }
493
494 pub(crate) async fn read_arrow_array(
495 &self,
496 entry_id: &EntryID,
497 selection: Option<&BooleanBuffer>,
498 expression: Option<&CacheExpression>,
499 ) -> Option<ArrayRef> {
500 use arrow::array::BooleanArray;
501
502 let batch = self.index.get(entry_id)?;
503 self.cache_policy
504 .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
505 self.trace(InternalEvent::Read {
506 entry: *entry_id,
507 selection: selection.is_some(),
508 expr: expression.cloned(),
509 cached: CachedBatchType::from(batch.as_ref()),
510 });
511
512 match batch.as_ref() {
513 CacheEntry::MemoryArrow(array) => match selection {
514 Some(selection) => {
515 let selection_array = BooleanArray::new(selection.clone(), None);
516 arrow::compute::filter(array, &selection_array).ok()
517 }
518 None => Some(array.clone()),
519 },
520 CacheEntry::MemoryLiquid(array) => match selection {
521 Some(selection) => Some(array.filter(selection)),
522 None => Some(array.to_arrow_array()),
523 },
524 CacheEntry::DiskArrow(_) | CacheEntry::DiskLiquid(_) => {
525 self.read_disk_array(batch.as_ref(), entry_id, expression, selection)
526 .await
527 }
528 CacheEntry::MemorySqueezedLiquid(array) => {
529 self.read_squeezed_array(array, entry_id, expression, selection)
530 .await
531 }
532 }
533 }
534
    /// Materialize a disk-resident entry as an Arrow array.
    ///
    /// Short-circuits with a typed empty array when the selection selects
    /// nothing, avoiding the disk read entirely. Otherwise reads the full
    /// array, offers it to the hydration policy, and applies the selection.
    async fn read_disk_array(
        &self,
        entry: &CacheEntry,
        entry_id: &EntryID,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        match entry {
            CacheEntry::DiskArrow(data_type) => {
                // All-false selection: skip I/O, return a typed empty array.
                if let Some(selection) = selection
                    && selection.count_set_bits() == 0
                {
                    return Some(arrow::array::new_empty_array(data_type));
                }
                let full_array = self.read_disk_arrow_array(entry_id).await;
                self.maybe_hydrate(
                    entry_id,
                    entry,
                    MaterializedEntry::Arrow(&full_array),
                    expression,
                )
                .await;
                match selection {
                    Some(selection) => {
                        let selection_array = BooleanArray::new(selection.clone(), None);
                        arrow::compute::filter(&full_array, &selection_array).ok()
                    }
                    None => Some(full_array),
                }
            }
            CacheEntry::DiskLiquid(data_type) => {
                // All-false selection: skip I/O, return a typed empty array.
                if let Some(selection) = selection
                    && selection.count_set_bits() == 0
                {
                    return Some(arrow::array::new_empty_array(data_type));
                }
                let liquid = self.read_disk_liquid_array(entry_id).await;
                self.maybe_hydrate(
                    entry_id,
                    entry,
                    MaterializedEntry::Liquid(&liquid),
                    expression,
                )
                .await;
                match selection {
                    Some(selection) => Some(liquid.filter(selection)),
                    None => Some(liquid.to_arrow_array()),
                }
            }
            // Callers only pass DiskArrow/DiskLiquid here.
            _ => unreachable!("Unexpected batch in read_disk_array"),
        }
    }
587
    /// Read a squeezed entry as Arrow, preferring expression-specific fast
    /// paths (date32 component / variant paths) before falling back to the
    /// squeezed array's own (possibly I/O-backed) materialization.
    async fn read_squeezed_array(
        &self,
        array: &LiquidSqueezedArrayRef,
        entry_id: &EntryID,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        if let Some(array) = self.try_read_squeezed_date32_array(array, expression, selection) {
            self.observer.on_get_squeezed_success();
            self.trace(InternalEvent::ReadSqueezedData {
                entry: *entry_id,
                // unwrap is safe: the fast path only fires when an
                // expression hint was supplied.
                expression: expression.unwrap().clone(),
            });
            return Some(array);
        }

        if let Some(array) = self
            .try_read_squeezed_variant_array(array, entry_id, expression, selection)
            .await
        {
            self.observer.on_get_squeezed_success();
            self.trace(InternalEvent::ReadSqueezedData {
                entry: *entry_id,
                // unwrap is safe for the same reason as above.
                expression: expression.unwrap().clone(),
            });
            return Some(array);
        }

        // No fast path applies: let the squeezed array materialize itself.
        let out = match selection {
            Some(selection) => array.filter(selection).await,
            None => array.to_arrow_array().await,
        };
        Some(out)
    }
623
624 fn try_read_squeezed_date32_array(
625 &self,
626 array: &LiquidSqueezedArrayRef,
627 expression: Option<&CacheExpression>,
628 selection: Option<&BooleanBuffer>,
629 ) -> Option<ArrayRef> {
630 if let Some(CacheExpression::ExtractDate32 { field }) = expression
631 && let Some(squeezed) = array.as_any().downcast_ref::<SqueezedDate32Array>()
632 && squeezed.field() == *field
633 {
634 let component = squeezed.to_component_array();
635 self.observer.on_hit_date32_expression();
636 if let Some(selection) = selection {
637 let selection_array = BooleanArray::new(selection.clone(), None);
638 let filtered = arrow::compute::filter(&component, &selection_array).ok()?;
639 return Some(filtered);
640 }
641 return Some(component);
642 }
643 None
644 }
645
    /// Fast path for variant-path expression hints on a
    /// [`VariantStructSqueezedArray`].
    ///
    /// If every requested path is present in the squeezed representation, the
    /// result is assembled without I/O; otherwise the full Arrow array is
    /// read from disk (and offered to the hydration policy). Returns `None`
    /// when the hint or the downcast does not apply.
    async fn try_read_squeezed_variant_array(
        &self,
        array: &LiquidSqueezedArrayRef,
        entry_id: &EntryID,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        let requests = expression.and_then(|expr| expr.variant_requests())?;
        let variant_squeezed = array
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()?;
        let all_paths_present = requests
            .iter()
            .all(|request| variant_squeezed.contains_path(request.path()));

        let full_array = if !all_paths_present {
            // Some requested path is missing: fall back to the full on-disk
            // Arrow data.
            let batch = CacheEntry::MemorySqueezedLiquid(array.clone());
            self.observer.on_get_squeezed_needs_io();
            let full_array = self.read_disk_arrow_array(entry_id).await;
            self.maybe_hydrate(
                entry_id,
                &batch,
                MaterializedEntry::Arrow(&full_array),
                expression,
            )
            .await;
            full_array
        } else {
            let requested_paths = requests.iter().map(|r| r.path());
            variant_squeezed
                .to_arrow_array_with_paths(requested_paths)
                .unwrap()
        };

        match selection {
            Some(selection) => {
                let selection_array = BooleanArray::new(selection.clone(), None);
                arrow::compute::filter(&full_array, &selection_array).ok()
            }
            None => Some(full_array),
        }
    }
688
    /// Persist `bytes` for `entry_id` at its `IoContext`-derived path and add
    /// the written size to the disk budget.
    ///
    /// # Panics
    /// Panics if the underlying write fails.
    async fn write_batch_to_disk(&self, entry_id: EntryID, batch: &CacheEntry, bytes: Bytes) {
        self.trace(InternalEvent::IoWrite {
            entry: entry_id,
            kind: CachedBatchType::from(batch),
            bytes: bytes.len(),
        });
        let path = self.io_context.disk_path(&entry_id);
        // Capture the length before `bytes` is moved into the write call.
        let len = bytes.len();
        self.io_context.write_file(path, bytes).await.unwrap();
        self.budget.add_used_disk_bytes(len);
    }
700
    /// Read an entry's bytes from disk and decode the first column of the
    /// first Arrow IPC record batch.
    ///
    /// # Panics
    /// Panics on I/O or decode failure — disk entries are expected to exist
    /// and be well-formed because this cache wrote them.
    async fn read_disk_arrow_array(&self, entry_id: &EntryID) -> ArrayRef {
        let path = self.io_context.disk_path(entry_id);
        let bytes = self.io_context.read(path, None).await.expect("read failed");
        let cursor = std::io::Cursor::new(bytes.to_vec());
        let mut reader =
            arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("create reader failed");
        let batch = reader.next().unwrap().expect("read batch failed");
        let array = batch.column(0).clone();
        self.trace(InternalEvent::IoReadArrow {
            entry: *entry_id,
            bytes: bytes.len(),
        });
        array
    }
715
    /// Read an entry's liquid bytes from disk and deserialize them using the
    /// entry's FSST compressor state.
    ///
    /// # Panics
    /// Panics on I/O failure, same rationale as `read_disk_arrow_array`.
    async fn read_disk_liquid_array(
        &self,
        entry_id: &EntryID,
    ) -> crate::liquid_array::LiquidArrayRef {
        let path = self.io_context.disk_path(entry_id);
        let bytes = self.io_context.read(path, None).await.expect("read failed");
        self.trace(InternalEvent::IoReadLiquid {
            entry: *entry_id,
            bytes: bytes.len(),
        });
        let compressor_states = self.io_context.get_compressor(entry_id);
        let compressor = compressor_states.fsst_compressor();

        (crate::liquid_array::ipc::read_from_bytes(
            bytes,
            &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
        )) as _
    }
734
    /// Evaluate `predicate` for one cache entry.
    ///
    /// Returns `None` when the entry is not cached. Otherwise:
    /// - `Some(Ok(mask))` — the predicate was evaluated directly on the
    ///   liquid/squeezed representation;
    /// - `Some(Err(filtered))` — only the selected Arrow rows could be
    ///   produced; the caller must run the predicate on them itself.
    pub(crate) async fn eval_predicate_internal(
        &self,
        entry_id: &EntryID,
        selection_opt: Option<&BooleanBuffer>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Option<Result<BooleanArray, ArrayRef>> {
        use arrow::array::BooleanArray;

        self.observer.on_eval_predicate();
        let batch = self.index.get(entry_id)?;
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
        self.trace(InternalEvent::EvalPredicate {
            entry: *entry_id,
            selection: selection_opt.is_some(),
            cached: CachedBatchType::from(batch.as_ref()),
        });

        match batch.as_ref() {
            CacheEntry::MemoryArrow(array) => {
                // Arrow data cannot evaluate the predicate itself: hand the
                // filtered rows back via Err.
                let mut owned = None;
                // Default to an all-true selection when none was provided.
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(array.len()));
                    owned.as_ref().unwrap()
                });
                let selection_array = BooleanArray::new(selection.clone(), None);
                let filtered = arrow::compute::filter(array, &selection_array).ok()?;
                Some(Err(filtered))
            }
            entry @ CacheEntry::DiskArrow(_) => {
                let array = self.read_disk_arrow_array(entry_id).await;
                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Arrow(&array), None)
                    .await;
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(array.len()));
                    owned.as_ref().unwrap()
                });
                let selection_array = BooleanArray::new(selection.clone(), None);
                let filtered = arrow::compute::filter(&array, &selection_array).ok()?;
                Some(Err(filtered))
            }
            CacheEntry::MemoryLiquid(array) => {
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(array.len()));
                    owned.as_ref().unwrap()
                });
                // Prefer evaluating directly on the liquid representation;
                // fall back to returning filtered Arrow rows when unsupported.
                match array.try_eval_predicate(predicate, selection) {
                    Some(buf) => Some(Ok(buf)),
                    None => {
                        self.runtime_stats().incr_eval_predicate_on_liquid_failed();
                        let filtered = array.filter(selection);
                        Some(Err(filtered))
                    }
                }
            }
            entry @ CacheEntry::DiskLiquid(_) => {
                let liquid = self.read_disk_liquid_array(entry_id).await;
                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
                    .await;
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(liquid.len()));
                    owned.as_ref().unwrap()
                });
                match liquid.try_eval_predicate(predicate, selection) {
                    Some(buf) => Some(Ok(buf)),
                    None => {
                        self.runtime_stats().incr_eval_predicate_on_liquid_failed();
                        let filtered = liquid.filter(selection);
                        Some(Err(filtered))
                    }
                }
            }
            CacheEntry::MemorySqueezedLiquid(array) => {
                self.eval_predicate_on_squeezed(array, selection_opt, predicate)
                    .await
            }
        }
    }
816
    /// Predicate evaluation for squeezed entries: try the squeezed
    /// representation first, otherwise hand back the filtered Arrow rows.
    async fn eval_predicate_on_squeezed(
        &self,
        array: &LiquidSqueezedArrayRef,
        selection_opt: Option<&BooleanBuffer>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Option<Result<BooleanArray, ArrayRef>> {
        let mut owned = None;
        // Default to an all-true selection when none was provided.
        let selection = selection_opt.unwrap_or_else(|| {
            owned = Some(BooleanBuffer::new_set(array.len()));
            owned.as_ref().unwrap()
        });
        match array.try_eval_predicate(predicate, selection).await {
            Some(buf) => Some(Ok(buf)),
            None => Some(Err(array.filter(selection).await)),
        }
    }
833}
834
835#[cfg(test)]
836mod tests {
837 use super::*;
838 use crate::cache::{
839 CacheEntry, CacheExpression, CachePolicy, LiquidCacheBuilder, TranscodeSqueezeEvict,
840 policies::LruPolicy,
841 transcode_liquid_inner,
842 utils::{
843 LiquidCompressorStates, create_cache_store, create_test_array, create_test_arrow_array,
844 },
845 };
846 use crate::liquid_array::{
847 Date32Field, LiquidPrimitiveArray, LiquidSqueezedArrayRef, SqueezedDate32Array,
848 };
849 use crate::sync::thread;
850 use arrow::array::{Array, ArrayRef, Date32Array, Int32Array};
851 use arrow::datatypes::Date32Type;
852 use std::future::Future;
853 use std::sync::Arc;
854 use std::sync::atomic::{AtomicUsize, Ordering};
855
    /// Test-only policy that always nominates `target_id` as the victim and
    /// counts how often it was consulted.
    #[derive(Debug)]
    struct TestPolicy {
        target_id: Option<EntryID>,
        advice_count: AtomicUsize,
    }
862
    impl TestPolicy {
        /// `target_id` is the entry this policy will always pick as victim.
        fn new(target_id: Option<EntryID>) -> Self {
            Self {
                target_id,
                advice_count: AtomicUsize::new(0),
            }
        }
    }
871
    impl CachePolicy for TestPolicy {
        /// Always returns the configured target (panics if none was set) and
        /// bumps the consultation counter.
        fn find_victim(&self, _cnt: usize) -> Vec<EntryID> {
            self.advice_count.fetch_add(1, Ordering::SeqCst);
            let id_to_use = self.target_id.unwrap();
            vec![id_to_use]
        }
    }
879
    // Insert/replace/lookup round-trips and exact budget accounting.
    #[tokio::test]
    async fn test_basic_cache_operations() {
        let budget_size = 10 * 1024;
        let store = create_cache_store(budget_size, Box::new(LruPolicy::new()));

        // A fresh store accounts zero bytes.
        assert_eq!(store.budget.memory_usage_bytes(), 0);

        let entry_id1: EntryID = EntryID::from(1);
        let array1 = create_test_array(100);
        let size1 = array1.memory_usage_bytes();
        store.insert_inner(entry_id1, array1).await;

        // Usage grows by exactly the inserted entry's size.
        assert_eq!(store.budget.memory_usage_bytes(), size1);
        let retrieved1 = store.index().get(&entry_id1).unwrap();
        match retrieved1.as_ref() {
            CacheEntry::MemoryArrow(arr) => assert_eq!(arr.len(), 100),
            _ => panic!("Expected ArrowMemory"),
        }

        let entry_id2: EntryID = EntryID::from(2);
        let array2 = create_test_array(200);
        let size2 = array2.memory_usage_bytes();
        store.insert_inner(entry_id2, array2).await;

        assert_eq!(store.budget.memory_usage_bytes(), size1 + size2);

        // Re-inserting under an existing id replaces the old accounting.
        let array3 = create_test_array(150);
        let size3 = array3.memory_usage_bytes();
        store.insert_inner(entry_id1, array3).await;

        assert_eq!(store.budget.memory_usage_bytes(), size3 + size2);
        assert!(store.index().get(&EntryID::from(999)).is_none());
    }
917
    // A matching ExtractDate32 hint is served from the squeezed year component.
    #[tokio::test]
    async fn get_arrow_array_with_expression_extracts_year() {
        let store = create_cache_store(1 << 20, Box::new(LruPolicy::new()));
        let entry_id = EntryID::from(42);

        // Days 0 / 365 / null / 730 since epoch: years 1970 / 1971 / - / 1972.
        let date_values = Date32Array::from(vec![Some(0), Some(365), None, Some(730)]);
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date_values.clone());
        let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, Date32Field::Year);
        let squeezed: LiquidSqueezedArrayRef = Arc::new(squeezed);

        store
            .insert_inner(
                entry_id,
                CacheEntry::memory_squeezed_liquid(squeezed.clone()),
            )
            .await;

        let expr = Arc::new(CacheExpression::extract_date32(Date32Field::Year));
        let result = store
            .get(&entry_id)
            .with_expression_hint(expr)
            .read()
            .await
            .expect("array present");

        let result = result
            .as_any()
            .downcast_ref::<Date32Array>()
            .expect("date32 result");
        assert_eq!(result.len(), 4);
        assert_eq!(result.value(0), 1970);
        assert_eq!(result.value(1), 1971);
        assert!(result.is_null(2));
        assert_eq!(result.value(3), 1972);
    }
953
    // A policy that always targets entry 1 should cause it to be squeezed
    // from Arrow to Liquid when entry 2 overflows a tight budget.
    #[tokio::test]
    async fn test_cache_advice_strategies() {
        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);

        {
            let advisor = TestPolicy::new(Some(entry_id1));
            let store = create_cache_store(8000, Box::new(advisor));
            store.insert_inner(entry_id1, create_test_array(800)).await;
            match store.index().get(&entry_id1).unwrap().as_ref() {
                CacheEntry::MemoryArrow(_) => {}
                other => panic!("Expected ArrowMemory, got {other:?}"),
            }

            store.insert_inner(entry_id2, create_test_array(800)).await;
            match store.index().get(&entry_id1).unwrap().as_ref() {
                CacheEntry::MemoryLiquid(_) => {}
                other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
            }
        }
    }
980
    // Tokio wrapper around the shared concurrent-insert scenario.
    #[tokio::test]
    async fn test_concurrent_cache_operations() {
        concurrent_cache_operations().await;
    }
985
    // Same scenario, explored under the shuttle model checker.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_cache_operations() {
        crate::utils::shuttle_test(|| {
            block_on(concurrent_cache_operations());
        });
    }
993
    /// Run a future to completion on whichever executor the build uses:
    /// shuttle's under the `shuttle` feature, tokio-test otherwise.
    pub fn block_on<F: Future>(future: F) -> F::Output {
        #[cfg(feature = "shuttle")]
        {
            shuttle::future::block_on(future)
        }
        #[cfg(not(feature = "shuttle"))]
        {
            tokio_test::block_on(future)
        }
    }
1004
    // Several threads insert disjoint ids; afterwards every id must still be
    // present (possibly squeezed or on disk).
    async fn concurrent_cache_operations() {
        let num_threads = 3;
        let ops_per_thread = 50;

        // Budget sized to roughly half the data so inserts contend.
        let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
        let store = Arc::new(create_cache_store(budget_size, Box::new(LruPolicy::new())));

        let mut handles = vec![];
        for thread_id in 0..num_threads {
            let store = store.clone();
            handles.push(thread::spawn(move || {
                block_on(async {
                    for i in 0..ops_per_thread {
                        let unique_id = thread_id * ops_per_thread + i;
                        let entry_id: EntryID = EntryID::from(unique_id);
                        let array = create_test_arrow_array(100);
                        store.insert(entry_id, array).await;
                    }
                });
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }

        // Every inserted id must still resolve.
        for thread_id in 0..num_threads {
            for i in 0..ops_per_thread {
                let unique_id = thread_id * ops_per_thread + i;
                let entry_id: EntryID = EntryID::from(unique_id);
                assert!(store.index().get(&entry_id).is_some());
            }
        }

        assert_eq!(store.index().keys().len(), num_threads * ops_per_thread);
    }
1042
    // stats() reflects memory-only usage before a flush and disk usage after.
    #[tokio::test]
    async fn test_cache_stats_memory_and_disk_usage() {
        let storage = LiquidCacheBuilder::new()
            .with_max_cache_bytes(10 * 1024 * 1024)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build();

        let arr1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        let arr2: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
        storage.insert(EntryID::from(1usize), arr1).await;
        storage.insert(EntryID::from(2usize), arr2).await;

        // Before flushing: memory only.
        let s = storage.stats();
        assert_eq!(s.total_entries, 2);
        assert!(s.memory_usage_bytes > 0);
        assert_eq!(s.disk_usage_bytes, 0);
        assert_eq!(s.max_cache_bytes, 10 * 1024 * 1024);

        // After flushing: disk usage appears, memory usage can only shrink.
        storage.flush_all_to_disk().await;
        let s2 = storage.stats();
        assert_eq!(s2.total_entries, 2);
        assert!(s2.disk_usage_bytes > 0);
        assert!(s2.memory_usage_bytes <= s.memory_usage_bytes);
    }
1072
    // Reading a DiskArrow entry hydrates it back to MemoryArrow.
    #[tokio::test]
    async fn hydrate_disk_arrow_on_get_promotes_to_memory() {
        let store = create_cache_store(1 << 20, Box::new(LruPolicy::new()));
        let entry_id = EntryID::from(321usize);
        let array = create_test_arrow_array(8);

        store.insert(entry_id, array.clone()).await;
        store.flush_all_to_disk().await;
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::DiskArrow(_)));
        }

        let result = store.get(&entry_id).await.expect("present");
        assert_eq!(result.as_ref(), array.as_ref());
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::MemoryArrow(_)));
        }
    }
1093
    // Reading a DiskLiquid entry hydrates it back to MemoryLiquid and
    // round-trips the original Arrow data.
    #[tokio::test]
    async fn hydrate_disk_liquid_on_get_promotes_to_memory_liquid() {
        let store = create_cache_store(1 << 20, Box::new(LruPolicy::new()));
        let entry_id = EntryID::from(322usize);
        let arrow_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let compressor = LiquidCompressorStates::new();
        let liquid = transcode_liquid_inner(&arrow_array, &compressor).unwrap();

        store
            .insert_inner(entry_id, CacheEntry::memory_liquid(liquid.clone()))
            .await;
        store.flush_all_to_disk().await;
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::DiskLiquid(_)));
        }

        let result = store.get(&entry_id).await.expect("present");
        assert_eq!(result.as_ref(), arrow_array.as_ref());
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::MemoryLiquid(_)));
        }
    }
1118}