//! Liquid cache core storage (`liquid_cache/cache/core.rs`).
1use arrow::array::{ArrayRef, BooleanArray};
2use arrow::buffer::BooleanBuffer;
3use arrow_schema::DataType;
4use bytes::Bytes;
5use datafusion::physical_plan::PhysicalExpr;
6use futures::StreamExt;
7use std::path::PathBuf;
8
9use super::{
10    budget::BudgetAccounting,
11    builders::{EvaluatePredicate, Get, Insert},
12    cached_batch::{CacheEntry, CachedBatchType},
13    io_context::IoContext,
14    observer::{CacheTracer, InternalEvent, Observer},
15    policies::{CachePolicy, HydrationPolicy, HydrationRequest, MaterializedEntry},
16    utils::CacheConfig,
17};
18use crate::cache::policies::SqueezePolicy;
19use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
20use crate::cache::{CacheExpression, index::ArtIndex, utils::EntryID};
21use crate::cache::{CacheStats, EventTrace};
22use crate::cache::{DefaultSqueezeIo, RuntimeStats};
23use crate::liquid_array::{
24    LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, SqueezedDate32Array,
25    VariantStructSqueezedArray,
26};
27use crate::sync::Arc;
28
29// CacheStats and RuntimeStats moved to stats.rs
30
/// Cache storage for liquid cache.
///
/// Example (async read):
/// ```rust
/// use liquid_cache::cache::{LiquidCacheBuilder, EntryID};
/// use arrow::array::UInt64Array;
/// use std::sync::Arc;
///
/// tokio_test::block_on(async {
/// let storage = LiquidCacheBuilder::new().build();
///
/// let entry_id = EntryID::from(0);
/// let arrow_array = Arc::new(UInt64Array::from_iter_values(0..32));
/// storage.insert(entry_id, arrow_array.clone()).await;
///
/// // Get the arrow array back asynchronously
/// let retrieved = storage.get(&entry_id).await.unwrap();
/// assert_eq!(retrieved.as_ref(), arrow_array.as_ref());
/// });
/// ```
#[derive(Debug)]
pub struct LiquidCache {
    /// Entry index mapping [`EntryID`] -> cached batch ([`CacheEntry`]).
    index: ArtIndex,
    /// Immutable configuration: batch size, max cache bytes, cache root dir.
    config: CacheConfig,
    /// Accounts memory/disk byte usage against the configured budget.
    budget: BudgetAccounting,
    /// Replacement policy: notified on insert/access, picks eviction victims.
    cache_policy: Box<dyn CachePolicy>,
    /// Decides whether data materialized from disk should be re-inserted
    /// (hydrated) back into memory.
    hydration_policy: Box<dyn HydrationPolicy>,
    /// Decides how an in-memory batch is squeezed (compressed / spilled),
    /// and what bytes, if any, must be written to disk.
    squeeze_policy: Box<dyn SqueezePolicy>,
    /// Runtime stats, internal event trace, and optional cache tracing.
    observer: Arc<Observer>,
    /// Disk paths, file I/O, per-entry compressors, and squeeze hints.
    io_context: Arc<dyn IoContext>,
}
62
63/// Builder returned by [`LiquidCache::insert`] for configuring cache writes.
64impl LiquidCache {
65    /// Return current cache statistics: counts and resource usage.
66    pub fn stats(&self) -> CacheStats {
67        // Count entries by residency/format
68        let total_entries = self.index.entry_count();
69
70        let mut memory_arrow_entries = 0usize;
71        let mut memory_liquid_entries = 0usize;
72        let mut memory_squeezed_liquid_entries = 0usize;
73        let mut disk_liquid_entries = 0usize;
74        let mut disk_arrow_entries = 0usize;
75
76        let mut memory_arrow_bytes = 0usize;
77        let mut memory_liquid_bytes = 0usize;
78        let mut memory_squeezed_liquid_bytes = 0usize;
79
80        self.index.for_each(|_, batch| match batch {
81            CacheEntry::MemoryArrow(array) => {
82                memory_arrow_entries += 1;
83                memory_arrow_bytes += array.get_array_memory_size();
84            }
85            CacheEntry::MemoryLiquid(array) => {
86                memory_liquid_entries += 1;
87                memory_liquid_bytes += array.get_array_memory_size();
88            }
89            CacheEntry::MemorySqueezedLiquid(array) => {
90                memory_squeezed_liquid_entries += 1;
91                memory_squeezed_liquid_bytes += array.get_array_memory_size();
92            }
93            CacheEntry::DiskLiquid(_) => disk_liquid_entries += 1,
94            CacheEntry::DiskArrow(_) => disk_arrow_entries += 1,
95        });
96
97        let memory_usage_bytes = self.budget.memory_usage_bytes();
98        let disk_usage_bytes = self.budget.disk_usage_bytes();
99        let runtime = self.observer.runtime_snapshot();
100
101        CacheStats {
102            total_entries,
103            memory_arrow_entries,
104            memory_liquid_entries,
105            memory_squeezed_liquid_entries,
106            disk_liquid_entries,
107            disk_arrow_entries,
108            memory_arrow_bytes,
109            memory_liquid_bytes,
110            memory_squeezed_liquid_bytes,
111            memory_usage_bytes,
112            disk_usage_bytes,
113            max_cache_bytes: self.config.max_cache_bytes(),
114            cache_root_dir: self.config.cache_root_dir().clone(),
115            runtime,
116        }
117    }
118
119    /// Insert a batch into the cache.
120    pub fn insert<'a>(
121        self: &'a Arc<Self>,
122        entry_id: EntryID,
123        batch_to_cache: ArrayRef,
124    ) -> Insert<'a> {
125        Insert::new(self, entry_id, batch_to_cache)
126    }
127
128    /// Create a [`Get`] builder for the provided entry.
129    pub fn get<'a>(&'a self, entry_id: &'a EntryID) -> Get<'a> {
130        Get::new(self, entry_id)
131    }
132
133    /// Create an [`EvaluatePredicate`] builder for evaluating predicates on cached data.
134    pub fn eval_predicate<'a>(
135        &'a self,
136        entry_id: &'a EntryID,
137        predicate: &'a Arc<dyn PhysicalExpr>,
138    ) -> EvaluatePredicate<'a> {
139        EvaluatePredicate::new(self, entry_id, predicate)
140    }
141
142    /// Try to read a liquid array from the cache.
143    /// Returns None if the cached data is not in liquid format.
144    pub async fn try_read_liquid(
145        &self,
146        entry_id: &EntryID,
147    ) -> Option<crate::liquid_array::LiquidArrayRef> {
148        self.observer.on_try_read_liquid();
149        self.trace(InternalEvent::TryReadLiquid { entry: *entry_id });
150        let batch = self.index.get(entry_id)?;
151        self.cache_policy
152            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
153
154        match batch.as_ref() {
155            CacheEntry::MemoryLiquid(array) => Some(array.clone()),
156            entry @ CacheEntry::DiskLiquid(_) => {
157                let liquid = self.read_disk_liquid_array(entry_id).await;
158                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
159                    .await;
160                Some(liquid)
161            }
162            CacheEntry::MemorySqueezedLiquid(array) => match array.disk_backing() {
163                SqueezedBacking::Liquid => {
164                    let liquid = self.read_disk_liquid_array(entry_id).await;
165                    Some(liquid)
166                }
167                SqueezedBacking::Arrow => None,
168            },
169            CacheEntry::DiskArrow(_) | CacheEntry::MemoryArrow(_) => None,
170        }
171    }
172
173    /// Iterate over all entries in the cache.
174    /// No guarantees are made about the order of the entries.
175    /// Isolation level: read-committed
176    pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CacheEntry)) {
177        self.index.for_each(&mut f);
178    }
179
180    /// Reset the cache.
181    pub fn reset(&self) {
182        self.index.reset();
183        self.budget.reset_usage();
184    }
185
186    /// Check if a batch is cached.
187    pub fn is_cached(&self, entry_id: &EntryID) -> bool {
188        self.index.is_cached(entry_id)
189    }
190
191    /// Get the config of the cache.
192    pub fn config(&self) -> &CacheConfig {
193        &self.config
194    }
195
196    /// Get the budget of the cache.
197    pub fn budget(&self) -> &BudgetAccounting {
198        &self.budget
199    }
200
201    /// Get the tracer of the cache.
202    pub fn tracer(&self) -> &CacheTracer {
203        self.observer.cache_tracer()
204    }
205
206    /// Access the cache observer (runtime stats, debug event trace, and optional cache tracing).
207    pub fn observer(&self) -> &Observer {
208        &self.observer
209    }
210
211    fn runtime_stats(&self) -> &RuntimeStats {
212        self.observer.runtime_stats()
213    }
214
215    /// Get the compressor states of the cache.
216    pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
217        self.io_context.get_compressor(entry_id)
218    }
219
220    /// Add a squeeze hint for an entry.
221    pub fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
222        self.io_context.add_squeeze_hint(entry_id, expression);
223    }
224
225    /// Flush all entries to disk.
226    pub async fn flush_all_to_disk(&self) {
227        let mut entires = Vec::new();
228        self.for_each_entry(|entry_id, batch| {
229            entires.push((*entry_id, batch.clone()));
230        });
231        for (entry_id, batch) in entires {
232            match &batch {
233                CacheEntry::MemoryArrow(array) => {
234                    let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
235                    self.write_batch_to_disk(entry_id, &batch, bytes).await;
236                    self.try_insert(entry_id, CacheEntry::disk_arrow(array.data_type().clone()))
237                        .expect("failed to insert disk arrow entry");
238                }
239                CacheEntry::MemoryLiquid(liquid_array) => {
240                    let liquid_bytes = liquid_array.to_bytes();
241                    self.write_batch_to_disk(entry_id, &batch, Bytes::from(liquid_bytes))
242                        .await;
243                    self.try_insert(
244                        entry_id,
245                        CacheEntry::disk_liquid(liquid_array.original_arrow_data_type()),
246                    )
247                    .expect("failed to insert disk liquid entry");
248                }
249                CacheEntry::MemorySqueezedLiquid(array) => {
250                    // We don't have to do anything, because it's already on disk
251                    let disk_entry = Self::disk_entry_from_squeezed(array);
252                    self.try_insert(entry_id, disk_entry)
253                        .expect("failed to insert disk entry");
254                }
255                CacheEntry::DiskArrow(_) | CacheEntry::DiskLiquid(_) => {
256                    // Already on disk, skip
257                }
258            }
259        }
260    }
261}
262
/// Internal machinery of [`LiquidCache`]: squeeze/evict, hydration, disk I/O,
/// and predicate evaluation over cached entries.
impl LiquidCache {
    /// Returns the batch that was written to disk.
    ///
    /// Converts an in-memory entry into its on-disk representation, writing
    /// bytes where needed; panics (`unreachable!`) if given an entry that is
    /// already disk-resident.
    async fn write_in_memory_batch_to_disk(
        &self,
        entry_id: EntryID,
        batch: CacheEntry,
    ) -> CacheEntry {
        match &batch {
            batch @ CacheEntry::MemoryArrow(_) => {
                // Arrow batches are routed through the squeeze policy, which
                // decides the on-disk form and which bytes to spill.
                let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
                    self.io_context.clone(),
                    entry_id,
                    self.observer.clone(),
                ));
                let (new_batch, bytes_to_write) = self.squeeze_policy.squeeze(
                    batch,
                    self.io_context.get_compressor(&entry_id).as_ref(),
                    None,
                    &squeeze_io,
                );
                if let Some(bytes_to_write) = bytes_to_write {
                    self.write_batch_to_disk(entry_id, &new_batch, bytes_to_write)
                        .await;
                }
                new_batch
            }
            CacheEntry::MemoryLiquid(liquid_array) => {
                let liquid_bytes = Bytes::from(liquid_array.to_bytes());
                self.write_batch_to_disk(entry_id, &batch, liquid_bytes)
                    .await;
                CacheEntry::disk_liquid(liquid_array.original_arrow_data_type())
            }
            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
                // The full data is already on disk, so we just need to mark
                // ourselves as a disk entry of the matching backing format.
                let backing = squeezed_array.disk_backing();
                if backing == SqueezedBacking::Liquid {
                    CacheEntry::disk_liquid(squeezed_array.original_arrow_data_type())
                } else {
                    CacheEntry::disk_arrow(squeezed_array.original_arrow_data_type())
                }
            }
            CacheEntry::DiskLiquid(_) | CacheEntry::DiskArrow(_) => {
                unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
            }
        }
    }

    /// Insert a batch into the cache, it will run cache replacement policy until the batch is inserted.
    ///
    /// Loop invariant: each iteration either inserts the batch, squeezes
    /// victims to free budget, or (when no victims exist) converts the batch
    /// itself to its on-disk form and retries with the smaller disk marker.
    pub(crate) async fn insert_inner(&self, entry_id: EntryID, mut batch_to_cache: CacheEntry) {
        loop {
            let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
                return;
            };
            self.trace(InternalEvent::InsertFailed {
                entry: entry_id,
                kind: CachedBatchType::from(&not_inserted),
            });

            // Ask the replacement policy for up to 8 eviction candidates.
            let victims = self.cache_policy.find_victim(8);
            if victims.is_empty() {
                // no advice, because the cache is already empty
                // this can happen if the entry to be inserted is too large, in that case,
                // we write it to disk
                let on_disk_batch = self
                    .write_in_memory_batch_to_disk(entry_id, not_inserted)
                    .await;
                batch_to_cache = on_disk_batch;
                continue;
            }
            self.squeeze_victims(victims).await;

            batch_to_cache = not_inserted;
            // Cooperative yield point for shuttle-based concurrency testing.
            crate::utils::yield_now_if_shuttle();
        }
    }

    /// Create a new instance of CacheStorage.
    pub(crate) fn new(
        batch_size: usize,
        max_cache_bytes: usize,
        cache_dir: PathBuf,
        squeeze_policy: Box<dyn SqueezePolicy>,
        cache_policy: Box<dyn CachePolicy>,
        hydration_policy: Box<dyn HydrationPolicy>,
        io_worker: Arc<dyn IoContext>,
    ) -> Self {
        let config = CacheConfig::new(batch_size, max_cache_bytes, cache_dir);
        Self {
            index: ArtIndex::new(),
            budget: BudgetAccounting::new(config.max_cache_bytes()),
            config,
            cache_policy,
            hydration_policy,
            squeeze_policy,
            observer: Arc::new(Observer::new()),
            io_context: io_worker,
        }
    }

    /// Attempt a single insert within the memory budget.
    ///
    /// On success the entry is indexed, the event traced, and the replacement
    /// policy notified. On budget rejection the batch is handed back via
    /// `Err` so the caller can evict/squeeze and retry.
    fn try_insert(&self, entry_id: EntryID, to_insert: CacheEntry) -> Result<(), CacheEntry> {
        let new_memory_size = to_insert.memory_usage_bytes();
        let cached_batch_type = if let Some(entry) = self.index.get(&entry_id) {
            // Replacement: account the delta between old and new entry sizes.
            let old_memory_size = entry.memory_usage_bytes();
            if self
                .budget
                .try_update_memory_usage(old_memory_size, new_memory_size)
                .is_err()
            {
                return Err(to_insert);
            }
            let batch_type = CachedBatchType::from(&to_insert);
            self.index.insert(&entry_id, to_insert);
            batch_type
        } else {
            // Fresh entry: reserve the full size.
            if self.budget.try_reserve_memory(new_memory_size).is_err() {
                return Err(to_insert);
            }
            let batch_type = CachedBatchType::from(&to_insert);
            self.index.insert(&entry_id, to_insert);
            batch_type
        };

        self.trace(InternalEvent::InsertSuccess {
            entry: entry_id,
            kind: cached_batch_type,
        });
        self.cache_policy
            .notify_insert(&entry_id, cached_batch_type);

        Ok(())
    }

    /// Consume the trace of the cache, for testing only.
    pub fn consume_event_trace(&self) -> EventTrace {
        self.observer.consume_event_trace()
    }

    /// Record an internal event with the observer.
    pub(crate) fn trace(&self, event: InternalEvent) {
        self.observer.record_internal(event);
    }

    /// Get the index of the cache.
    #[cfg(test)]
    pub(crate) fn index(&self) -> &ArtIndex {
        &self.index
    }

    #[fastrace::trace]
    async fn squeeze_victims(&self, victims: Vec<EntryID>) {
        // Squeeze all victims concurrently (no concurrency limit) using
        // async I/O; awaits until every victim has been processed.
        self.trace(InternalEvent::SqueezeBegin {
            victims: victims.clone(),
        });
        futures::stream::iter(victims)
            .for_each_concurrent(None, |victim| async move {
                self.squeeze_victim_inner(victim).await;
            })
            .await;
    }

    /// Squeeze a single victim entry, repeatedly applying the squeeze policy
    /// until the (progressively smaller) result fits back into the budget.
    async fn squeeze_victim_inner(&self, to_squeeze: EntryID) {
        // Entry may have been removed concurrently; nothing to do then.
        let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
            return;
        };
        self.trace(InternalEvent::SqueezeVictim { entry: to_squeeze });
        let compressor = self.io_context.get_compressor(&to_squeeze);
        let squeeze_hint_arc = self.io_context.squeeze_hint(&to_squeeze);
        let squeeze_hint = squeeze_hint_arc.as_deref();
        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
            self.io_context.clone(),
            to_squeeze,
            self.observer.clone(),
        ));

        loop {
            let (new_batch, bytes_to_write) = self.squeeze_policy.squeeze(
                to_squeeze_batch.as_ref(),
                compressor.as_ref(),
                squeeze_hint,
                &squeeze_io,
            );

            if let Some(bytes_to_write) = bytes_to_write {
                self.write_batch_to_disk(to_squeeze, &new_batch, bytes_to_write)
                    .await;
            }
            match self.try_insert(to_squeeze, new_batch) {
                Ok(()) => {
                    break;
                }
                Err(batch) => {
                    // Still too large: feed the squeezed result back in for
                    // another round of squeezing.
                    to_squeeze_batch = Arc::new(batch);
                }
            }
        }
    }

    /// Build the disk-entry marker matching a squeezed array's backing format.
    fn disk_entry_from_squeezed(array: &LiquidSqueezedArrayRef) -> CacheEntry {
        let constructor: fn(DataType) -> CacheEntry = match array.disk_backing() {
            SqueezedBacking::Liquid => CacheEntry::disk_liquid,
            SqueezedBacking::Arrow => CacheEntry::disk_arrow,
        };
        constructor(array.original_arrow_data_type())
    }

    /// Ask the hydration policy whether the freshly materialized data should
    /// replace the current cached entry; if so, insert it (traced).
    async fn maybe_hydrate(
        &self,
        entry_id: &EntryID,
        cached: &CacheEntry,
        materialized: MaterializedEntry<'_>,
        expression: Option<&CacheExpression>,
    ) {
        let compressor = self.io_context.get_compressor(entry_id);
        if let Some(new_entry) = self.hydration_policy.hydrate(&HydrationRequest {
            entry_id: *entry_id,
            cached,
            materialized,
            expression,
            compressor,
        }) {
            let cached_type = CachedBatchType::from(cached);
            let new_type = CachedBatchType::from(&new_entry);
            self.trace(InternalEvent::Hydrate {
                entry: *entry_id,
                cached: cached_type,
                new: new_type,
            });
            self.insert_inner(*entry_id, new_entry).await;
        }
    }

    /// Read an entry as an arrow array, optionally filtered by `selection`
    /// and guided by a cache `expression`; `None` when the entry is missing
    /// or the filter fails.
    pub(crate) async fn read_arrow_array(
        &self,
        entry_id: &EntryID,
        selection: Option<&BooleanBuffer>,
        expression: Option<&CacheExpression>,
    ) -> Option<ArrayRef> {
        use arrow::array::BooleanArray;

        let batch = self.index.get(entry_id)?;
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
        self.trace(InternalEvent::Read {
            entry: *entry_id,
            selection: selection.is_some(),
            expr: expression.cloned(),
            cached: CachedBatchType::from(batch.as_ref()),
        });

        match batch.as_ref() {
            CacheEntry::MemoryArrow(array) => match selection {
                Some(selection) => {
                    let selection_array = BooleanArray::new(selection.clone(), None);
                    arrow::compute::filter(array, &selection_array).ok()
                }
                None => Some(array.clone()),
            },
            CacheEntry::MemoryLiquid(array) => match selection {
                Some(selection) => Some(array.filter(selection)),
                None => Some(array.to_arrow_array()),
            },
            CacheEntry::DiskArrow(_) | CacheEntry::DiskLiquid(_) => {
                self.read_disk_array(batch.as_ref(), entry_id, expression, selection)
                    .await
            }
            CacheEntry::MemorySqueezedLiquid(array) => {
                self.read_squeezed_array(array, entry_id, expression, selection)
                    .await
            }
        }
    }

    /// Materialize a disk-resident entry as an arrow array, possibly
    /// hydrating it back into memory; short-circuits to an empty array when
    /// the selection selects nothing.
    async fn read_disk_array(
        &self,
        entry: &CacheEntry,
        entry_id: &EntryID,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        match entry {
            CacheEntry::DiskArrow(data_type) => {
                // All-false selection: skip the disk read entirely.
                if let Some(selection) = selection
                    && selection.count_set_bits() == 0
                {
                    return Some(arrow::array::new_empty_array(data_type));
                }
                let full_array = self.read_disk_arrow_array(entry_id).await;
                self.maybe_hydrate(
                    entry_id,
                    entry,
                    MaterializedEntry::Arrow(&full_array),
                    expression,
                )
                .await;
                match selection {
                    Some(selection) => {
                        let selection_array = BooleanArray::new(selection.clone(), None);
                        arrow::compute::filter(&full_array, &selection_array).ok()
                    }
                    None => Some(full_array),
                }
            }
            CacheEntry::DiskLiquid(data_type) => {
                // All-false selection: skip the disk read entirely.
                if let Some(selection) = selection
                    && selection.count_set_bits() == 0
                {
                    return Some(arrow::array::new_empty_array(data_type));
                }
                let liquid = self.read_disk_liquid_array(entry_id).await;
                self.maybe_hydrate(
                    entry_id,
                    entry,
                    MaterializedEntry::Liquid(&liquid),
                    expression,
                )
                .await;
                match selection {
                    Some(selection) => Some(liquid.filter(selection)),
                    None => Some(liquid.to_arrow_array()),
                }
            }
            _ => unreachable!("Unexpected batch in read_disk_array"),
        }
    }

    /// Read a squeezed entry, trying expression-specific shortcuts (date32
    /// component, variant paths) before falling back to the full data.
    async fn read_squeezed_array(
        &self,
        array: &LiquidSqueezedArrayRef,
        entry_id: &EntryID,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        // Shortcut 1: the squeezed date32 component satisfies the expression.
        // (`expression.unwrap()` is safe: the shortcut only fires when the
        // expression was Some.)
        if let Some(array) = self.try_read_squeezed_date32_array(array, expression, selection) {
            self.observer.on_get_squeezed_success();
            self.trace(InternalEvent::ReadSqueezedData {
                entry: *entry_id,
                expression: expression.unwrap().clone(),
            });
            return Some(array);
        }

        // Shortcut 2: the squeezed variant struct satisfies the expression.
        if let Some(array) = self
            .try_read_squeezed_variant_array(array, entry_id, expression, selection)
            .await
        {
            self.observer.on_get_squeezed_success();
            self.trace(InternalEvent::ReadSqueezedData {
                entry: *entry_id,
                expression: expression.unwrap().clone(),
            });
            return Some(array);
        }

        // no shortcut, needs to read full data
        let out = match selection {
            Some(selection) => array.filter(selection).await,
            None => array.to_arrow_array().await,
        };
        Some(out)
    }

    /// Serve an `ExtractDate32` expression directly from a squeezed date32
    /// array's component, avoiding any disk I/O; `None` if not applicable.
    fn try_read_squeezed_date32_array(
        &self,
        array: &LiquidSqueezedArrayRef,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        if let Some(CacheExpression::ExtractDate32 { field }) = expression
            && let Some(squeezed) = array.as_any().downcast_ref::<SqueezedDate32Array>()
            && squeezed.field() == *field
        {
            let component = squeezed.to_component_array();
            self.observer.on_hit_date32_expression();
            if let Some(selection) = selection {
                let selection_array = BooleanArray::new(selection.clone(), None);
                let filtered = arrow::compute::filter(&component, &selection_array).ok()?;
                return Some(filtered);
            }
            return Some(component);
        }
        None
    }

    /// Serve variant-path requests from a squeezed variant struct when all
    /// requested paths are present; otherwise fall back to reading (and
    /// possibly hydrating) the full arrow data from disk.
    async fn try_read_squeezed_variant_array(
        &self,
        array: &LiquidSqueezedArrayRef,
        entry_id: &EntryID,
        expression: Option<&CacheExpression>,
        selection: Option<&BooleanBuffer>,
    ) -> Option<ArrayRef> {
        let requests = expression.and_then(|expr| expr.variant_requests())?;
        let variant_squeezed = array
            .as_any()
            .downcast_ref::<VariantStructSqueezedArray>()?;
        let all_paths_present = requests
            .iter()
            .all(|request| variant_squeezed.contains_path(request.path()));

        let full_array = if !all_paths_present {
            // Missing paths: must go to disk for the full data.
            let batch = CacheEntry::MemorySqueezedLiquid(array.clone());
            self.observer.on_get_squeezed_needs_io();
            let full_array = self.read_disk_arrow_array(entry_id).await;
            self.maybe_hydrate(
                entry_id,
                &batch,
                MaterializedEntry::Arrow(&full_array),
                expression,
            )
            .await;
            full_array
        } else {
            let requested_paths = requests.iter().map(|r| r.path());
            variant_squeezed
                .to_arrow_array_with_paths(requested_paths)
                .unwrap()
        };

        match selection {
            Some(selection) => {
                let selection_array = BooleanArray::new(selection.clone(), None);
                arrow::compute::filter(&full_array, &selection_array).ok()
            }
            None => Some(full_array),
        }
    }

    /// Write serialized entry bytes to this entry's disk path, tracing the
    /// write and accounting the bytes against the disk budget.
    async fn write_batch_to_disk(&self, entry_id: EntryID, batch: &CacheEntry, bytes: Bytes) {
        self.trace(InternalEvent::IoWrite {
            entry: entry_id,
            kind: CachedBatchType::from(batch),
            bytes: bytes.len(),
        });
        let path = self.io_context.disk_path(&entry_id);
        let len = bytes.len();
        self.io_context.write_file(path, bytes).await.unwrap();
        self.budget.add_used_disk_bytes(len);
    }

    /// Read an arrow array back from disk (arrow IPC stream format, first
    /// column of the first batch). Panics on I/O or decode failure.
    async fn read_disk_arrow_array(&self, entry_id: &EntryID) -> ArrayRef {
        let path = self.io_context.disk_path(entry_id);
        let bytes = self.io_context.read(path, None).await.expect("read failed");
        let cursor = std::io::Cursor::new(bytes.to_vec());
        let mut reader =
            arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("create reader failed");
        let batch = reader.next().unwrap().expect("read batch failed");
        let array = batch.column(0).clone();
        self.trace(InternalEvent::IoReadArrow {
            entry: *entry_id,
            bytes: bytes.len(),
        });
        array
    }

    /// Read a liquid array back from disk (liquid IPC format), using this
    /// entry's FSST compressor state for decoding. Panics on I/O failure.
    async fn read_disk_liquid_array(
        &self,
        entry_id: &EntryID,
    ) -> crate::liquid_array::LiquidArrayRef {
        let path = self.io_context.disk_path(entry_id);
        let bytes = self.io_context.read(path, None).await.expect("read failed");
        self.trace(InternalEvent::IoReadLiquid {
            entry: *entry_id,
            bytes: bytes.len(),
        });
        let compressor_states = self.io_context.get_compressor(entry_id);
        let compressor = compressor_states.fsst_compressor();

        (crate::liquid_array::ipc::read_from_bytes(
            bytes,
            &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
        )) as _
    }

    /// Evaluate a predicate against a cached entry.
    ///
    /// Returns `None` if the entry is missing; `Some(Ok(mask))` when the
    /// predicate was evaluated directly on the cached representation;
    /// `Some(Err(filtered))` when it could not be, handing the caller the
    /// selection-filtered array to evaluate itself.
    pub(crate) async fn eval_predicate_internal(
        &self,
        entry_id: &EntryID,
        selection_opt: Option<&BooleanBuffer>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Option<Result<BooleanArray, ArrayRef>> {
        use arrow::array::BooleanArray;

        self.observer.on_eval_predicate();
        let batch = self.index.get(entry_id)?;
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
        self.trace(InternalEvent::EvalPredicate {
            entry: *entry_id,
            selection: selection_opt.is_some(),
            cached: CachedBatchType::from(batch.as_ref()),
        });

        match batch.as_ref() {
            CacheEntry::MemoryArrow(array) => {
                // Lazily materialize an all-true selection when none is given.
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(array.len()));
                    owned.as_ref().unwrap()
                });
                let selection_array = BooleanArray::new(selection.clone(), None);
                let filtered = arrow::compute::filter(array, &selection_array).ok()?;
                Some(Err(filtered))
            }
            entry @ CacheEntry::DiskArrow(_) => {
                let array = self.read_disk_arrow_array(entry_id).await;
                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Arrow(&array), None)
                    .await;
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(array.len()));
                    owned.as_ref().unwrap()
                });
                let selection_array = BooleanArray::new(selection.clone(), None);
                let filtered = arrow::compute::filter(&array, &selection_array).ok()?;
                Some(Err(filtered))
            }
            CacheEntry::MemoryLiquid(array) => {
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(array.len()));
                    owned.as_ref().unwrap()
                });
                match array.try_eval_predicate(predicate, selection) {
                    Some(buf) => Some(Ok(buf)),
                    None => {
                        // Predicate not supported on liquid form: count the
                        // miss and fall back to returning filtered data.
                        self.runtime_stats().incr_eval_predicate_on_liquid_failed();
                        let filtered = array.filter(selection);
                        Some(Err(filtered))
                    }
                }
            }
            entry @ CacheEntry::DiskLiquid(_) => {
                let liquid = self.read_disk_liquid_array(entry_id).await;
                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
                    .await;
                let mut owned = None;
                let selection = selection_opt.unwrap_or_else(|| {
                    owned = Some(BooleanBuffer::new_set(liquid.len()));
                    owned.as_ref().unwrap()
                });
                match liquid.try_eval_predicate(predicate, selection) {
                    Some(buf) => Some(Ok(buf)),
                    None => {
                        self.runtime_stats().incr_eval_predicate_on_liquid_failed();
                        let filtered = liquid.filter(selection);
                        Some(Err(filtered))
                    }
                }
            }
            CacheEntry::MemorySqueezedLiquid(array) => {
                self.eval_predicate_on_squeezed(array, selection_opt, predicate)
                    .await
            }
        }
    }

    /// Evaluate a predicate on a squeezed entry: try direct evaluation, else
    /// return the selection-filtered array for the caller to evaluate.
    async fn eval_predicate_on_squeezed(
        &self,
        array: &LiquidSqueezedArrayRef,
        selection_opt: Option<&BooleanBuffer>,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Option<Result<BooleanArray, ArrayRef>> {
        let mut owned = None;
        let selection = selection_opt.unwrap_or_else(|| {
            owned = Some(BooleanBuffer::new_set(array.len()));
            owned.as_ref().unwrap()
        });
        match array.try_eval_predicate(predicate, selection).await {
            Some(buf) => Some(Ok(buf)),
            None => Some(Err(array.filter(selection).await)),
        }
    }
}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838    use crate::cache::{
839        CacheEntry, CacheExpression, CachePolicy, LiquidCacheBuilder, TranscodeSqueezeEvict,
840        policies::LruPolicy,
841        transcode_liquid_inner,
842        utils::{
843            LiquidCompressorStates, create_cache_store, create_test_array, create_test_arrow_array,
844        },
845    };
846    use crate::liquid_array::{
847        Date32Field, LiquidPrimitiveArray, LiquidSqueezedArrayRef, SqueezedDate32Array,
848    };
849    use crate::sync::thread;
850    use arrow::array::{Array, ArrayRef, Date32Array, Int32Array};
851    use arrow::datatypes::Date32Type;
852    use std::future::Future;
853    use std::sync::Arc;
854    use std::sync::atomic::{AtomicUsize, Ordering};
855
    /// Test-only cache policy that always nominates one preconfigured entry as
    /// the eviction victim, while counting how often it was consulted.
    #[derive(Debug)]
    struct TestPolicy {
        // Entry returned by `find_victim`; `None` makes `find_victim` panic.
        target_id: Option<EntryID>,
        // How many times `find_victim` has been called.
        advice_count: AtomicUsize,
    }
862
863    impl TestPolicy {
864        fn new(target_id: Option<EntryID>) -> Self {
865            Self {
866                target_id,
867                advice_count: AtomicUsize::new(0),
868            }
869        }
870    }
871
872    impl CachePolicy for TestPolicy {
873        fn find_victim(&self, _cnt: usize) -> Vec<EntryID> {
874            self.advice_count.fetch_add(1, Ordering::SeqCst);
875            let id_to_use = self.target_id.unwrap();
876            vec![id_to_use]
877        }
878    }
879
880    #[tokio::test]
881    async fn test_basic_cache_operations() {
882        // Test basic insert, get, and size tracking in one test
883        let budget_size = 10 * 1024;
884        let store = create_cache_store(budget_size, Box::new(LruPolicy::new()));
885
886        // 1. Initial budget should be empty
887        assert_eq!(store.budget.memory_usage_bytes(), 0);
888
889        // 2. Insert and verify first entry
890        let entry_id1: EntryID = EntryID::from(1);
891        let array1 = create_test_array(100);
892        let size1 = array1.memory_usage_bytes();
893        store.insert_inner(entry_id1, array1).await;
894
895        // Verify budget usage and data correctness
896        assert_eq!(store.budget.memory_usage_bytes(), size1);
897        let retrieved1 = store.index().get(&entry_id1).unwrap();
898        match retrieved1.as_ref() {
899            CacheEntry::MemoryArrow(arr) => assert_eq!(arr.len(), 100),
900            _ => panic!("Expected ArrowMemory"),
901        }
902
903        let entry_id2: EntryID = EntryID::from(2);
904        let array2 = create_test_array(200);
905        let size2 = array2.memory_usage_bytes();
906        store.insert_inner(entry_id2, array2).await;
907
908        assert_eq!(store.budget.memory_usage_bytes(), size1 + size2);
909
910        let array3 = create_test_array(150);
911        let size3 = array3.memory_usage_bytes();
912        store.insert_inner(entry_id1, array3).await;
913
914        assert_eq!(store.budget.memory_usage_bytes(), size3 + size2);
915        assert!(store.index().get(&EntryID::from(999)).is_none());
916    }
917
918    #[tokio::test]
919    async fn get_arrow_array_with_expression_extracts_year() {
920        let store = create_cache_store(1 << 20, Box::new(LruPolicy::new()));
921        let entry_id = EntryID::from(42);
922
923        let date_values = Date32Array::from(vec![Some(0), Some(365), None, Some(730)]);
924        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date_values.clone());
925        let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, Date32Field::Year);
926        let squeezed: LiquidSqueezedArrayRef = Arc::new(squeezed);
927
928        store
929            .insert_inner(
930                entry_id,
931                CacheEntry::memory_squeezed_liquid(squeezed.clone()),
932            )
933            .await;
934
935        let expr = Arc::new(CacheExpression::extract_date32(Date32Field::Year));
936        let result = store
937            .get(&entry_id)
938            .with_expression_hint(expr)
939            .read()
940            .await
941            .expect("array present");
942
943        let result = result
944            .as_any()
945            .downcast_ref::<Date32Array>()
946            .expect("date32 result");
947        assert_eq!(result.len(), 4);
948        assert_eq!(result.value(0), 1970);
949        assert_eq!(result.value(1), 1971);
950        assert!(result.is_null(2));
951        assert_eq!(result.value(3), 1972);
952    }
953
954    #[tokio::test]
955    async fn test_cache_advice_strategies() {
956        // Comprehensive test of all three advice types
957
958        // Create entry IDs we'll use throughout the test
959        let entry_id1 = EntryID::from(1);
960        let entry_id2 = EntryID::from(2);
961
962        // 1. Test EVICT advice
963        {
964            let advisor = TestPolicy::new(Some(entry_id1));
965            let store = create_cache_store(8000, Box::new(advisor)); // Small budget to force advice
966
967            store.insert_inner(entry_id1, create_test_array(800)).await;
968            match store.index().get(&entry_id1).unwrap().as_ref() {
969                CacheEntry::MemoryArrow(_) => {}
970                other => panic!("Expected ArrowMemory, got {other:?}"),
971            }
972
973            store.insert_inner(entry_id2, create_test_array(800)).await;
974            match store.index().get(&entry_id1).unwrap().as_ref() {
975                CacheEntry::MemoryLiquid(_) => {}
976                other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
977            }
978        }
979    }
980
    // Tokio-driven entry point for the shared concurrency scenario below.
    #[tokio::test]
    async fn test_concurrent_cache_operations() {
        concurrent_cache_operations().await;
    }
985
    // Shuttle-scheduled variant of the same concurrency scenario: explores
    // thread interleavings deterministically when the `shuttle` feature is on.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_cache_operations() {
        crate::utils::shuttle_test(|| {
            block_on(concurrent_cache_operations());
        });
    }
993
    /// Drive `future` to completion on the current thread, using shuttle's
    /// executor under the `shuttle` feature and `tokio_test` otherwise.
    pub fn block_on<F: Future>(future: F) -> F::Output {
        #[cfg(feature = "shuttle")]
        {
            shuttle::future::block_on(future)
        }
        #[cfg(not(feature = "shuttle"))]
        {
            tokio_test::block_on(future)
        }
    }
1004
1005    async fn concurrent_cache_operations() {
1006        let num_threads = 3;
1007        let ops_per_thread = 50;
1008
1009        let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
1010        let store = Arc::new(create_cache_store(budget_size, Box::new(LruPolicy::new())));
1011
1012        let mut handles = vec![];
1013        for thread_id in 0..num_threads {
1014            let store = store.clone();
1015            handles.push(thread::spawn(move || {
1016                block_on(async {
1017                    for i in 0..ops_per_thread {
1018                        let unique_id = thread_id * ops_per_thread + i;
1019                        let entry_id: EntryID = EntryID::from(unique_id);
1020                        let array = create_test_arrow_array(100);
1021                        store.insert(entry_id, array).await;
1022                    }
1023                });
1024            }));
1025        }
1026        for handle in handles {
1027            handle.join().unwrap();
1028        }
1029
1030        // Invariant 1: Every previously inserted entry can be retrieved
1031        for thread_id in 0..num_threads {
1032            for i in 0..ops_per_thread {
1033                let unique_id = thread_id * ops_per_thread + i;
1034                let entry_id: EntryID = EntryID::from(unique_id);
1035                assert!(store.index().get(&entry_id).is_some());
1036            }
1037        }
1038
1039        // Invariant 2: Number of entries matches number of insertions
1040        assert_eq!(store.index().keys().len(), num_threads * ops_per_thread);
1041    }
1042
1043    #[tokio::test]
1044    async fn test_cache_stats_memory_and_disk_usage() {
1045        // Build a small cache in blocking liquid mode to avoid background tasks
1046        let storage = LiquidCacheBuilder::new()
1047            .with_max_cache_bytes(10 * 1024 * 1024)
1048            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
1049            .build();
1050
1051        // Insert two small batches
1052        let arr1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
1053        let arr2: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
1054        storage.insert(EntryID::from(1usize), arr1).await;
1055        storage.insert(EntryID::from(2usize), arr2).await;
1056
1057        // Stats after insert: 2 entries, memory usage > 0, disk usage == 0
1058        let s = storage.stats();
1059        assert_eq!(s.total_entries, 2);
1060        assert!(s.memory_usage_bytes > 0);
1061        assert_eq!(s.disk_usage_bytes, 0);
1062        assert_eq!(s.max_cache_bytes, 10 * 1024 * 1024);
1063
1064        // Flush to disk and verify memory usage drops and disk usage increases
1065        storage.flush_all_to_disk().await;
1066        let s2 = storage.stats();
1067        assert_eq!(s2.total_entries, 2);
1068        assert!(s2.disk_usage_bytes > 0);
1069        // In-memory usage should be reduced after moving to on-disk formats
1070        assert!(s2.memory_usage_bytes <= s.memory_usage_bytes);
1071    }
1072
1073    #[tokio::test]
1074    async fn hydrate_disk_arrow_on_get_promotes_to_memory() {
1075        let store = create_cache_store(1 << 20, Box::new(LruPolicy::new()));
1076        let entry_id = EntryID::from(321usize);
1077        let array = create_test_arrow_array(8);
1078
1079        store.insert(entry_id, array.clone()).await;
1080        store.flush_all_to_disk().await;
1081        {
1082            let entry = store.index().get(&entry_id).unwrap();
1083            assert!(matches!(entry.as_ref(), CacheEntry::DiskArrow(_)));
1084        }
1085
1086        let result = store.get(&entry_id).await.expect("present");
1087        assert_eq!(result.as_ref(), array.as_ref());
1088        {
1089            let entry = store.index().get(&entry_id).unwrap();
1090            assert!(matches!(entry.as_ref(), CacheEntry::MemoryArrow(_)));
1091        }
1092    }
1093
1094    #[tokio::test]
1095    async fn hydrate_disk_liquid_on_get_promotes_to_memory_liquid() {
1096        let store = create_cache_store(1 << 20, Box::new(LruPolicy::new()));
1097        let entry_id = EntryID::from(322usize);
1098        let arrow_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1099        let compressor = LiquidCompressorStates::new();
1100        let liquid = transcode_liquid_inner(&arrow_array, &compressor).unwrap();
1101
1102        store
1103            .insert_inner(entry_id, CacheEntry::memory_liquid(liquid.clone()))
1104            .await;
1105        store.flush_all_to_disk().await;
1106        {
1107            let entry = store.index().get(&entry_id).unwrap();
1108            assert!(matches!(entry.as_ref(), CacheEntry::DiskLiquid(_)));
1109        }
1110
1111        let result = store.get(&entry_id).await.expect("present");
1112        assert_eq!(result.as_ref(), arrow_array.as_ref());
1113        {
1114            let entry = store.index().get(&entry_id).unwrap();
1115            assert!(matches!(entry.as_ref(), CacheEntry::MemoryLiquid(_)));
1116        }
1117    }
1118}