Skip to main content

liquid_cache/cache/
core.rs

1use arrow::array::cast::AsArray;
2use arrow::array::{ArrayRef, BooleanArray};
3use arrow::buffer::BooleanBuffer;
4use arrow::record_batch::RecordBatch;
5use arrow_schema::{Field, Schema};
6use bytes::Bytes;
7use futures::StreamExt;
8
9use super::{
10    budget::BudgetAccounting,
11    builders::{EvaluatePredicate, Get, Insert},
12    cached_batch::{CacheEntry, CachedBatchType},
13    io_context::{EntryMetadata, entry_id_to_key},
14    observer::{CacheTracer, InternalEvent, Observer},
15    policies::{CachePolicy, HydrationPolicy, HydrationRequest, MaterializedEntry},
16    utils::CacheConfig,
17};
18use crate::cache::DefaultSqueezeIo;
19use crate::cache::policies::{SqueezeOutcome, SqueezePolicy};
20use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
21use crate::cache::{CacheExpression, LiquidExpr, index::ArtIndex, utils::EntryID};
22use crate::cache::{CacheFull, CacheStats, EventTrace};
23use crate::liquid_array::{
24    LiquidSqueezedArrayRef, SqueezeIoHandler, SqueezedBacking, SqueezedDate32Array,
25    VariantStructSqueezedArray,
26};
27use crate::sync::Arc;
28
29// CacheStats and RuntimeStats moved to stats.rs
30
/// Cache storage for liquid cache.
///
/// Each entry, keyed by [`EntryID`], is held either in memory (as Arrow, Liquid, or
/// squeezed Liquid data) or on disk (as Arrow or Liquid bytes). Memory and disk usage are
/// accounted against the limits in [`CacheConfig`]; when memory runs out, the cache policy
/// picks victims and the squeeze policy decides how to shrink them.
///
/// # Examples
///
/// Async read:
/// ```rust
/// use liquid_cache::cache::{LiquidCacheBuilder, EntryID};
/// use arrow::array::UInt64Array;
/// use std::sync::Arc;
///
/// tokio_test::block_on(async {
/// let storage = LiquidCacheBuilder::new().build().await;
///
/// let entry_id = EntryID::from(0);
/// let arrow_array = Arc::new(UInt64Array::from_iter_values(0..32));
/// storage.insert(entry_id, arrow_array.clone()).await;
///
/// // Get the arrow array back asynchronously
/// let retrieved = storage.get(&entry_id).await.unwrap();
/// assert_eq!(retrieved.as_ref(), arrow_array.as_ref());
/// });
/// ```
#[derive(Debug)]
pub struct LiquidCache {
    /// Maps each entry ID to its current cached representation.
    index: ArtIndex,
    /// Batch size and memory/disk limits the cache was built with.
    config: CacheConfig,
    /// Tracks memory and disk bytes reserved against the configured limits.
    budget: BudgetAccounting,
    /// Chooses memory/disk victims and is notified of inserts, accesses and removals.
    cache_policy: Box<dyn CachePolicy>,
    /// Decides whether an entry read back from disk should be re-inserted in another form.
    hydration_policy: Box<dyn HydrationPolicy>,
    /// Decides how a victim entry is shrunk: replaced by another entry or removed.
    squeeze_policy: Box<dyn SqueezePolicy>,
    /// Runtime statistics and internal event tracing, shared with the budget accounting.
    observer: Arc<Observer>,
    /// Per-entry compressor states and squeeze hints.
    metadata: Arc<dyn EntryMetadata>,
    /// Store holding the on-disk bytes of entries, keyed by `entry_id_to_key`.
    store: t4::Store,
    /// When true, a round of memory victims is squeezed concurrently instead of one by one.
    squeeze_victims_concurrently: bool,
}
64
65/// Builder returned by [`LiquidCache::insert`] for configuring cache writes.
66impl LiquidCache {
67    /// Return current cache statistics: counts and resource usage.
68    pub fn stats(&self) -> CacheStats {
69        // Count entries by residency/format
70        let total_entries = self.index.entry_count();
71
72        let mut memory_arrow_entries = 0usize;
73        let mut memory_liquid_entries = 0usize;
74        let mut memory_squeezed_liquid_entries = 0usize;
75        let mut disk_liquid_entries = 0usize;
76        let mut disk_arrow_entries = 0usize;
77
78        let mut memory_arrow_bytes = 0usize;
79        let mut memory_liquid_bytes = 0usize;
80        let mut memory_squeezed_liquid_bytes = 0usize;
81
82        self.index.for_each(|_, batch| match batch {
83            CacheEntry::MemoryArrow(array) => {
84                memory_arrow_entries += 1;
85                memory_arrow_bytes += array.get_array_memory_size();
86            }
87            CacheEntry::MemoryLiquid(array) => {
88                memory_liquid_entries += 1;
89                memory_liquid_bytes += array.get_array_memory_size();
90            }
91            CacheEntry::MemorySqueezedLiquid(array) => {
92                memory_squeezed_liquid_entries += 1;
93                memory_squeezed_liquid_bytes += array.get_array_memory_size();
94            }
95            CacheEntry::DiskLiquid { .. } => disk_liquid_entries += 1,
96            CacheEntry::DiskArrow { .. } => disk_arrow_entries += 1,
97        });
98
99        let memory_usage_bytes = self.budget.memory_usage_bytes();
100        let disk_usage_bytes = self.budget.disk_usage_bytes();
101        let runtime = self.observer.runtime_snapshot();
102
103        CacheStats {
104            total_entries,
105            memory_arrow_entries,
106            memory_liquid_entries,
107            memory_squeezed_liquid_entries,
108            disk_liquid_entries,
109            disk_arrow_entries,
110            memory_arrow_bytes,
111            memory_liquid_bytes,
112            memory_squeezed_liquid_bytes,
113            memory_usage_bytes,
114            disk_usage_bytes,
115            max_memory_bytes: self.config.max_memory_bytes(),
116            max_disk_bytes: self.config.max_disk_bytes(),
117            runtime,
118        }
119    }
120
121    /// Insert a batch into the cache.
122    pub fn insert<'a>(
123        self: &'a Arc<Self>,
124        entry_id: EntryID,
125        batch_to_cache: ArrayRef,
126    ) -> Insert<'a> {
127        Insert::new(self, entry_id, batch_to_cache)
128    }
129
130    /// Create a [`Get`] builder for the provided entry.
131    pub fn get<'a>(&'a self, entry_id: &'a EntryID) -> Get<'a> {
132        Get::new(self, entry_id)
133    }
134
135    /// Create an [`EvaluatePredicate`] builder for evaluating predicates on cached data.
136    pub fn eval_predicate<'a>(
137        &'a self,
138        entry_id: &'a EntryID,
139        predicate: &'a LiquidExpr,
140    ) -> EvaluatePredicate<'a> {
141        EvaluatePredicate::new(self, entry_id, predicate)
142    }
143
144    /// Try to read a liquid array from the cache.
145    /// Returns None if the cached data is not in liquid format.
146    pub async fn try_read_liquid(
147        &self,
148        entry_id: &EntryID,
149    ) -> Option<crate::liquid_array::LiquidArrayRef> {
150        self.observer.on_try_read_liquid();
151        self.trace(InternalEvent::TryReadLiquid { entry: *entry_id });
152        let batch = self.index.get(entry_id)?;
153        self.cache_policy
154            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
155
156        match batch.as_ref() {
157            CacheEntry::MemoryLiquid(array) => Some(array.clone()),
158            entry @ CacheEntry::DiskLiquid { .. } => {
159                let liquid = self.read_disk_liquid_array(entry_id).await;
160                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
161                    .await;
162                Some(liquid)
163            }
164            CacheEntry::MemorySqueezedLiquid(array) => match array.disk_backing() {
165                SqueezedBacking::Liquid(_) => {
166                    let liquid = self.read_disk_liquid_array(entry_id).await;
167                    Some(liquid)
168                }
169                SqueezedBacking::Arrow(_) => None,
170            },
171            CacheEntry::DiskArrow { .. } | CacheEntry::MemoryArrow(_) => None,
172        }
173    }
174
175    /// Iterate over all entries in the cache.
176    /// No guarantees are made about the order of the entries.
177    /// Isolation level: read-committed
178    pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CacheEntry)) {
179        self.index.for_each(&mut f);
180    }
181
182    /// Reset the cache.
183    pub fn reset(&self) {
184        self.index.reset();
185        self.budget.reset_usage();
186    }
187
188    /// Check if a batch is cached.
189    pub fn is_cached(&self, entry_id: &EntryID) -> bool {
190        self.index.is_cached(entry_id)
191    }
192
193    /// Get the config of the cache.
194    pub fn config(&self) -> &CacheConfig {
195        &self.config
196    }
197
198    /// Get the budget of the cache.
199    pub fn budget(&self) -> &BudgetAccounting {
200        &self.budget
201    }
202
203    /// Get the tracer of the cache.
204    pub fn tracer(&self) -> &CacheTracer {
205        self.observer.cache_tracer()
206    }
207
208    /// Access the cache observer (runtime stats, debug event trace, and optional cache tracing).
209    pub fn observer(&self) -> &Observer {
210        &self.observer
211    }
212
213    /// Get the compressor states of the cache.
214    pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
215        self.metadata.get_compressor(entry_id)
216    }
217
218    /// Add a squeeze hint for an entry.
219    pub fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
220        self.metadata.add_squeeze_hint(entry_id, expression);
221    }
222
223    /// Flush all entries to disk.
224    pub async fn flush_all_to_disk(&self) -> Result<(), CacheFull> {
225        let mut entires = Vec::new();
226        self.for_each_entry(|entry_id, batch| {
227            entires.push((*entry_id, batch.clone()));
228        });
229        for (entry_id, batch) in entires {
230            match &batch {
231                CacheEntry::MemoryArrow(array) => {
232                    let bytes = arrow_to_bytes(array).expect("failed to convert arrow to bytes");
233                    let disk_bytes = bytes.len();
234                    match self.write_batch_to_disk(entry_id, &batch, bytes).await {
235                        Ok(()) => {
236                            self.try_insert(
237                                entry_id,
238                                CacheEntry::disk_arrow(array.data_type().clone(), disk_bytes),
239                            )
240                            .expect("failed to insert disk arrow entry");
241                        }
242                        Err(CacheFull) => self.drop_memory_entry(entry_id, &batch),
243                    }
244                }
245                CacheEntry::MemoryLiquid(liquid_array) => {
246                    let liquid_bytes = liquid_array.to_bytes();
247                    let disk_bytes = liquid_bytes.len();
248                    match self
249                        .write_batch_to_disk(entry_id, &batch, Bytes::from(liquid_bytes))
250                        .await
251                    {
252                        Ok(()) => {
253                            self.try_insert(
254                                entry_id,
255                                CacheEntry::disk_liquid(
256                                    liquid_array.original_arrow_data_type(),
257                                    disk_bytes,
258                                ),
259                            )
260                            .expect("failed to insert disk liquid entry");
261                        }
262                        Err(CacheFull) => self.drop_memory_entry(entry_id, &batch),
263                    }
264                }
265                CacheEntry::MemorySqueezedLiquid(array) => {
266                    // We don't have to do anything, because it's already on disk
267                    let disk_entry = Self::disk_entry_from_squeezed(array);
268                    self.try_insert(entry_id, disk_entry)
269                        .expect("failed to insert disk entry");
270                }
271                CacheEntry::DiskArrow { .. } | CacheEntry::DiskLiquid { .. } => {
272                    // Already on disk, skip
273                }
274            }
275        }
276        Ok(())
277    }
278}
279
280impl LiquidCache {
281    /// returns the batch that was written to disk
282    async fn write_in_memory_batch_to_disk(
283        &self,
284        entry_id: EntryID,
285        batch: CacheEntry,
286    ) -> Result<CacheEntry, CacheFull> {
287        match &batch {
288            batch @ CacheEntry::MemoryArrow(_) => {
289                let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
290                    self.store.clone(),
291                    entry_id,
292                    self.observer.clone(),
293                ));
294                let outcome = self.squeeze_policy.squeeze(
295                    batch,
296                    self.metadata.get_compressor(&entry_id).as_ref(),
297                    None,
298                    &squeeze_io,
299                );
300                let SqueezeOutcome::Replace {
301                    entry: new_batch,
302                    bytes_to_write,
303                } = outcome
304                else {
305                    unreachable!("memory arrow squeeze cannot remove entry");
306                };
307                if let Some(bytes_to_write) = bytes_to_write {
308                    self.write_batch_to_disk(entry_id, &new_batch, bytes_to_write)
309                        .await?;
310                }
311                Ok(new_batch)
312            }
313            CacheEntry::MemoryLiquid(liquid_array) => {
314                let liquid_bytes = Bytes::from(liquid_array.to_bytes());
315                let disk_bytes = liquid_bytes.len();
316                self.write_batch_to_disk(entry_id, &batch, liquid_bytes)
317                    .await?;
318                Ok(CacheEntry::disk_liquid(
319                    liquid_array.original_arrow_data_type(),
320                    disk_bytes,
321                ))
322            }
323            CacheEntry::MemorySqueezedLiquid(squeezed_array) => {
324                // The full data is already on disk, so we just need to mark ourself as disk entry
325                let data_type = squeezed_array.original_arrow_data_type();
326                let entry = match squeezed_array.disk_backing() {
327                    SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
328                    SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
329                };
330                Ok(entry)
331            }
332            CacheEntry::DiskLiquid { .. } | CacheEntry::DiskArrow { .. } => {
333                unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
334            }
335        }
336    }
337
338    /// Insert a batch into the cache, it will run cache replacement policy until the batch is inserted.
339    pub(crate) async fn insert_inner(
340        &self,
341        entry_id: EntryID,
342        mut batch_to_cache: CacheEntry,
343    ) -> Result<(), CacheFull> {
344        loop {
345            let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
346                return Ok(());
347            };
348            self.trace(InternalEvent::InsertFailed {
349                entry: entry_id,
350                kind: CachedBatchType::from(&not_inserted),
351            });
352
353            let victims = self.cache_policy.find_memory_victim(8);
354            if victims.is_empty() {
355                // no advice, because the cache is already empty
356                // this can happen if the entry to be inserted is too large, in that case,
357                // we write it to disk
358                let on_disk_batch = self
359                    .write_in_memory_batch_to_disk(entry_id, not_inserted)
360                    .await?;
361                batch_to_cache = on_disk_batch;
362                continue;
363            }
364            self.squeeze_victims(victims).await?;
365
366            batch_to_cache = not_inserted;
367            crate::utils::yield_now_if_shuttle();
368        }
369    }
370
371    /// Create a new instance of CacheStorage.
372    #[allow(clippy::too_many_arguments)]
373    pub(crate) fn new(
374        batch_size: usize,
375        max_memory_bytes: usize,
376        max_disk_bytes: usize,
377        squeeze_policy: Box<dyn SqueezePolicy>,
378        cache_policy: Box<dyn CachePolicy>,
379        hydration_policy: Box<dyn HydrationPolicy>,
380        metadata: Arc<dyn EntryMetadata>,
381        store: t4::Store,
382        squeeze_victims_concurrently: bool,
383    ) -> Self {
384        let config = CacheConfig::new(batch_size, max_memory_bytes, max_disk_bytes);
385        let observer = Arc::new(Observer::new());
386        Self {
387            index: ArtIndex::new(),
388            budget: BudgetAccounting::new(
389                config.max_memory_bytes(),
390                config.max_disk_bytes(),
391                observer.clone(),
392            ),
393            config,
394            cache_policy,
395            hydration_policy,
396            squeeze_policy,
397            observer,
398            metadata,
399            store,
400            squeeze_victims_concurrently,
401        }
402    }
403
404    fn try_insert(&self, entry_id: EntryID, to_insert: CacheEntry) -> Result<(), CacheEntry> {
405        let new_memory_size = to_insert.memory_usage_bytes();
406        let cached_batch_type = if let Some(entry) = self.index.get(&entry_id) {
407            let old_memory_size = entry.memory_usage_bytes();
408            if self
409                .budget
410                .try_update_memory_usage(old_memory_size, new_memory_size)
411                .is_err()
412            {
413                return Err(to_insert);
414            }
415            let batch_type = CachedBatchType::from(&to_insert);
416            self.index.insert(&entry_id, to_insert);
417            batch_type
418        } else {
419            if self.budget.try_reserve_memory(new_memory_size).is_err() {
420                return Err(to_insert);
421            }
422            let batch_type = CachedBatchType::from(&to_insert);
423            self.index.insert(&entry_id, to_insert);
424            batch_type
425        };
426
427        self.trace(InternalEvent::InsertSuccess {
428            entry: entry_id,
429            kind: cached_batch_type,
430        });
431        self.cache_policy
432            .notify_insert(&entry_id, cached_batch_type);
433
434        Ok(())
435    }
436
437    fn drop_memory_entry(&self, entry_id: EntryID, _expected: &CacheEntry) {
438        let Some(removed) = self.index.remove(&entry_id) else {
439            return;
440        };
441        assert!(
442            matches!(
443                removed.as_ref(),
444                CacheEntry::MemoryArrow(_)
445                    | CacheEntry::MemoryLiquid(_)
446                    | CacheEntry::MemorySqueezedLiquid(_)
447            ),
448            "flush should only drop memory entries"
449        );
450        self.budget
451            .try_update_memory_usage(removed.memory_usage_bytes(), 0)
452            .expect("memory release cannot fail");
453        self.cache_policy.notify_remove(&entry_id);
454    }
455
456    async fn remove_disk_entry(&self, entry_id: EntryID) {
457        let Some(removed) = self.index.remove(&entry_id) else {
458            return;
459        };
460        let disk_bytes = match removed.as_ref() {
461            CacheEntry::DiskLiquid { disk_bytes, .. }
462            | CacheEntry::DiskArrow { disk_bytes, .. } => *disk_bytes,
463            _ => panic!("remove_disk_entry called for non-disk entry"),
464        };
465        self.store
466            .remove(&entry_id_to_key(&entry_id))
467            .await
468            .expect("disk remove failed");
469        self.budget.release_disk(disk_bytes);
470        self.cache_policy.notify_remove(&entry_id);
471        self.trace(InternalEvent::DiskEvict {
472            entry: entry_id,
473            bytes: disk_bytes,
474        });
475    }
476
477    /// Consume the trace of the cache, for testing only.
478    pub fn consume_event_trace(&self) -> EventTrace {
479        self.observer.consume_event_trace()
480    }
481
482    pub(crate) fn trace(&self, event: InternalEvent) {
483        self.observer.record_internal(event);
484    }
485
/// Get the index of the cache.
///
/// Test-only accessor for the entry index that backs this cache.
#[cfg(test)]
pub(crate) fn index(&self) -> &ArtIndex {
    &self.index
}
491
492    #[fastrace::trace]
493    async fn squeeze_victims(&self, victims: Vec<EntryID>) -> Result<(), CacheFull> {
494        self.trace(InternalEvent::SqueezeBegin {
495            victims: victims.clone(),
496        });
497        if self.squeeze_victims_concurrently {
498            let results = futures::stream::iter(victims)
499                .map(|victim| self.squeeze_victim_inner(victim))
500                .buffer_unordered(usize::MAX)
501                .collect::<Vec<_>>()
502                .await;
503            results.into_iter().collect::<Result<Vec<_>, _>>()?;
504        } else {
505            for victim in victims {
506                self.squeeze_victim_inner(victim).await?;
507            }
508        }
509        Ok(())
510    }
511
512    async fn squeeze_victim_inner(&self, to_squeeze: EntryID) -> Result<(), CacheFull> {
513        let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
514            return Ok(());
515        };
516        self.trace(InternalEvent::SqueezeVictim { entry: to_squeeze });
517        let compressor = self.metadata.get_compressor(&to_squeeze);
518        let squeeze_hint_arc = self.metadata.squeeze_hint(&to_squeeze);
519        let squeeze_hint = squeeze_hint_arc.as_deref();
520        let squeeze_io: Arc<dyn SqueezeIoHandler> = Arc::new(DefaultSqueezeIo::new(
521            self.store.clone(),
522            to_squeeze,
523            self.observer.clone(),
524        ));
525
526        loop {
527            let outcome = self.squeeze_policy.squeeze(
528                to_squeeze_batch.as_ref(),
529                compressor.as_ref(),
530                squeeze_hint,
531                &squeeze_io,
532            );
533
534            match outcome {
535                SqueezeOutcome::Replace {
536                    entry: new_batch,
537                    bytes_to_write,
538                } => {
539                    if let Some(bytes_to_write) = bytes_to_write {
540                        self.write_batch_to_disk(to_squeeze, &new_batch, bytes_to_write)
541                            .await?;
542                    }
543                    match self.try_insert(to_squeeze, new_batch) {
544                        Ok(()) => {
545                            break;
546                        }
547                        Err(batch) => {
548                            to_squeeze_batch = Arc::new(batch);
549                        }
550                    }
551                }
552                SqueezeOutcome::Remove => {
553                    self.remove_disk_entry(to_squeeze).await;
554                    break;
555                }
556            }
557        }
558        Ok(())
559    }
560
561    fn disk_entry_from_squeezed(array: &LiquidSqueezedArrayRef) -> CacheEntry {
562        let data_type = array.original_arrow_data_type();
563        match array.disk_backing() {
564            SqueezedBacking::Liquid(n) => CacheEntry::disk_liquid(data_type, n),
565            SqueezedBacking::Arrow(n) => CacheEntry::disk_arrow(data_type, n),
566        }
567    }
568
569    async fn maybe_hydrate(
570        &self,
571        entry_id: &EntryID,
572        cached: &CacheEntry,
573        materialized: MaterializedEntry<'_>,
574        expression: Option<&CacheExpression>,
575    ) {
576        let compressor = self.metadata.get_compressor(entry_id);
577        if let Some(new_entry) = self.hydration_policy.hydrate(&HydrationRequest {
578            entry_id: *entry_id,
579            cached,
580            materialized,
581            expression,
582            compressor,
583        }) {
584            let cached_type = CachedBatchType::from(cached);
585            let new_type = CachedBatchType::from(&new_entry);
586            self.trace(InternalEvent::Hydrate {
587                entry: *entry_id,
588                cached: cached_type,
589                new: new_type,
590            });
591            let _ = self.insert_inner(*entry_id, new_entry).await;
592        }
593    }
594
595    pub(crate) async fn read_arrow_array(
596        &self,
597        entry_id: &EntryID,
598        selection: Option<&BooleanBuffer>,
599        expression: Option<&CacheExpression>,
600    ) -> Option<ArrayRef> {
601        use arrow::array::BooleanArray;
602
603        let batch = self.index.get(entry_id)?;
604        self.cache_policy
605            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
606        self.trace(InternalEvent::Read {
607            entry: *entry_id,
608            selection: selection.is_some(),
609            expr: expression.cloned(),
610            cached: CachedBatchType::from(batch.as_ref()),
611        });
612
613        match batch.as_ref() {
614            CacheEntry::MemoryArrow(array) => match selection {
615                Some(selection) => {
616                    let selection_array = BooleanArray::new(selection.clone(), None);
617                    arrow::compute::filter(array, &selection_array).ok()
618                }
619                None => Some(array.clone()),
620            },
621            CacheEntry::MemoryLiquid(array) => match selection {
622                Some(selection) => Some(array.filter(selection)),
623                None => Some(array.to_arrow_array()),
624            },
625            CacheEntry::DiskArrow { .. } | CacheEntry::DiskLiquid { .. } => {
626                self.read_disk_array(batch.as_ref(), entry_id, expression, selection)
627                    .await
628            }
629            CacheEntry::MemorySqueezedLiquid(array) => {
630                self.read_squeezed_array(array, entry_id, expression, selection)
631                    .await
632            }
633        }
634    }
635
636    async fn read_disk_array(
637        &self,
638        entry: &CacheEntry,
639        entry_id: &EntryID,
640        expression: Option<&CacheExpression>,
641        selection: Option<&BooleanBuffer>,
642    ) -> Option<ArrayRef> {
643        match entry {
644            CacheEntry::DiskArrow { data_type, .. } => {
645                if let Some(selection) = selection
646                    && selection.count_set_bits() == 0
647                {
648                    return Some(arrow::array::new_empty_array(data_type));
649                }
650                let full_array = self.read_disk_arrow_array(entry_id).await;
651                self.maybe_hydrate(
652                    entry_id,
653                    entry,
654                    MaterializedEntry::Arrow(&full_array),
655                    expression,
656                )
657                .await;
658                match selection {
659                    Some(selection) => {
660                        let selection_array = BooleanArray::new(selection.clone(), None);
661                        arrow::compute::filter(&full_array, &selection_array).ok()
662                    }
663                    None => Some(full_array),
664                }
665            }
666            CacheEntry::DiskLiquid { data_type, .. } => {
667                if let Some(selection) = selection
668                    && selection.count_set_bits() == 0
669                {
670                    return Some(arrow::array::new_empty_array(data_type));
671                }
672                let liquid = self.read_disk_liquid_array(entry_id).await;
673                self.maybe_hydrate(
674                    entry_id,
675                    entry,
676                    MaterializedEntry::Liquid(&liquid),
677                    expression,
678                )
679                .await;
680                match selection {
681                    Some(selection) => Some(liquid.filter(selection)),
682                    None => Some(liquid.to_arrow_array()),
683                }
684            }
685            _ => unreachable!("Unexpected batch in read_disk_array"),
686        }
687    }
688
689    async fn read_squeezed_array(
690        &self,
691        array: &LiquidSqueezedArrayRef,
692        entry_id: &EntryID,
693        expression: Option<&CacheExpression>,
694        selection: Option<&BooleanBuffer>,
695    ) -> Option<ArrayRef> {
696        if let Some(array) = self.try_read_squeezed_date32_array(array, expression, selection) {
697            self.observer.on_get_squeezed_success();
698            self.trace(InternalEvent::ReadSqueezedData {
699                entry: *entry_id,
700                expression: expression.unwrap().clone(),
701            });
702            return Some(array);
703        }
704
705        if let Some(array) = self
706            .try_read_squeezed_variant_array(array, entry_id, expression, selection)
707            .await
708        {
709            self.observer.on_get_squeezed_success();
710            self.trace(InternalEvent::ReadSqueezedData {
711                entry: *entry_id,
712                expression: expression.unwrap().clone(),
713            });
714            return Some(array);
715        }
716
717        // no shortcut, needs to read full data
718        let out = match selection {
719            Some(selection) => array.filter(selection).await,
720            None => array.to_arrow_array().await,
721        };
722        Some(out)
723    }
724
725    fn try_read_squeezed_date32_array(
726        &self,
727        array: &LiquidSqueezedArrayRef,
728        expression: Option<&CacheExpression>,
729        selection: Option<&BooleanBuffer>,
730    ) -> Option<ArrayRef> {
731        if let Some(CacheExpression::ExtractDate32 { field }) = expression
732            && let Some(squeezed) = array.as_any().downcast_ref::<SqueezedDate32Array>()
733            && squeezed.field() == *field
734        {
735            let component = squeezed.to_component_array();
736            self.observer.on_hit_date32_expression();
737            if let Some(selection) = selection {
738                let selection_array = BooleanArray::new(selection.clone(), None);
739                let filtered = arrow::compute::filter(&component, &selection_array).ok()?;
740                return Some(filtered);
741            }
742            return Some(component);
743        }
744        None
745    }
746
747    async fn try_read_squeezed_variant_array(
748        &self,
749        array: &LiquidSqueezedArrayRef,
750        entry_id: &EntryID,
751        expression: Option<&CacheExpression>,
752        selection: Option<&BooleanBuffer>,
753    ) -> Option<ArrayRef> {
754        let requests = expression.and_then(|expr| expr.variant_requests())?;
755        let variant_squeezed = array
756            .as_any()
757            .downcast_ref::<VariantStructSqueezedArray>()?;
758        let all_paths_present = requests
759            .iter()
760            .all(|request| variant_squeezed.contains_path(request.path()));
761
762        let full_array = if !all_paths_present {
763            let batch = CacheEntry::MemorySqueezedLiquid(array.clone());
764            self.observer.on_get_squeezed_needs_io();
765            let full_array = self.read_disk_arrow_array(entry_id).await;
766            self.maybe_hydrate(
767                entry_id,
768                &batch,
769                MaterializedEntry::Arrow(&full_array),
770                expression,
771            )
772            .await;
773            full_array
774        } else {
775            let requested_paths = requests.iter().map(|r| r.path());
776            variant_squeezed
777                .to_arrow_array_with_paths(requested_paths)
778                .unwrap()
779        };
780
781        match selection {
782            Some(selection) => {
783                let selection_array = BooleanArray::new(selection.clone(), None);
784                arrow::compute::filter(&full_array, &selection_array).ok()
785            }
786            None => Some(full_array),
787        }
788    }
789
790    async fn write_batch_to_disk(
791        &self,
792        entry_id: EntryID,
793        batch: &CacheEntry,
794        bytes: Bytes,
795    ) -> Result<(), CacheFull> {
796        let len = bytes.len();
797        loop {
798            if self.budget.try_reserve_disk(len).is_ok() {
799                break;
800            }
801            let victims = self.cache_policy.find_disk_victim(8);
802            if victims.is_empty() {
803                return Err(CacheFull);
804            }
805            for victim in victims {
806                self.remove_disk_entry(victim).await;
807            }
808        }
809        self.trace(InternalEvent::IoWrite {
810            entry: entry_id,
811            kind: CachedBatchType::from(batch),
812            bytes: len,
813        });
814        self.store
815            .put(entry_id_to_key(&entry_id), bytes.to_vec())
816            .await
817            .expect("write failed");
818        Ok(())
819    }
820
821    async fn read_disk_arrow_array(&self, entry_id: &EntryID) -> ArrayRef {
822        let bytes = self
823            .store
824            .get(&entry_id_to_key(entry_id))
825            .await
826            .expect("read failed");
827        let bytes_len = bytes.len();
828        let cursor = std::io::Cursor::new(bytes);
829        let mut reader =
830            arrow::ipc::reader::StreamReader::try_new(cursor, None).expect("create reader failed");
831        let batch = reader.next().unwrap().expect("read batch failed");
832        let array = batch.column(0).clone();
833        self.trace(InternalEvent::IoReadArrow {
834            entry: *entry_id,
835            bytes: bytes_len,
836        });
837        array
838    }
839
840    async fn read_disk_liquid_array(
841        &self,
842        entry_id: &EntryID,
843    ) -> crate::liquid_array::LiquidArrayRef {
844        let bytes = self
845            .store
846            .get(&entry_id_to_key(entry_id))
847            .await
848            .expect("read failed");
849        self.trace(InternalEvent::IoReadLiquid {
850            entry: *entry_id,
851            bytes: bytes.len(),
852        });
853        let compressor_states = self.metadata.get_compressor(entry_id);
854        let compressor = compressor_states.fsst_compressor();
855
856        (crate::liquid_array::ipc::read_from_bytes(
857            Bytes::from(bytes),
858            &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
859        )) as _
860    }
861
862    pub(crate) async fn eval_predicate_internal(
863        &self,
864        entry_id: &EntryID,
865        selection_opt: Option<&BooleanBuffer>,
866        predicate: &LiquidExpr,
867    ) -> Option<BooleanArray> {
868        use arrow::array::BooleanArray;
869
870        self.observer.on_eval_predicate();
871        let batch = self.index.get(entry_id)?;
872        self.cache_policy
873            .notify_access(entry_id, CachedBatchType::from(batch.as_ref()));
874        self.trace(InternalEvent::EvalPredicate {
875            entry: *entry_id,
876            selection: selection_opt.is_some(),
877            cached: CachedBatchType::from(batch.as_ref()),
878        });
879
880        match batch.as_ref() {
881            CacheEntry::MemoryArrow(array) => {
882                let mut owned = None;
883                let selection = selection_opt.unwrap_or_else(|| {
884                    owned = Some(BooleanBuffer::new_set(array.len()));
885                    owned.as_ref().unwrap()
886                });
887                let selection_array = BooleanArray::new(selection.clone(), None);
888                let filtered = arrow::compute::filter(array, &selection_array)
889                    .expect("selection must match array length");
890                Some(self.eval_predicate_on_array(filtered, predicate))
891            }
892            entry @ CacheEntry::DiskArrow { .. } => {
893                let array = self.read_disk_arrow_array(entry_id).await;
894                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Arrow(&array), None)
895                    .await;
896                let mut owned = None;
897                let selection = selection_opt.unwrap_or_else(|| {
898                    owned = Some(BooleanBuffer::new_set(array.len()));
899                    owned.as_ref().unwrap()
900                });
901                let selection_array = BooleanArray::new(selection.clone(), None);
902                let filtered = arrow::compute::filter(&array, &selection_array)
903                    .expect("selection must match array length");
904                Some(self.eval_predicate_on_array(filtered, predicate))
905            }
906            CacheEntry::MemoryLiquid(array) => {
907                let mut owned = None;
908                let selection = selection_opt.unwrap_or_else(|| {
909                    owned = Some(BooleanBuffer::new_set(array.len()));
910                    owned.as_ref().unwrap()
911                });
912                Some(array.try_eval_predicate(predicate, selection))
913            }
914            entry @ CacheEntry::DiskLiquid { .. } => {
915                let liquid = self.read_disk_liquid_array(entry_id).await;
916                self.maybe_hydrate(entry_id, entry, MaterializedEntry::Liquid(&liquid), None)
917                    .await;
918                let mut owned = None;
919                let selection = selection_opt.unwrap_or_else(|| {
920                    owned = Some(BooleanBuffer::new_set(liquid.len()));
921                    owned.as_ref().unwrap()
922                });
923                Some(liquid.try_eval_predicate(predicate, selection))
924            }
925            CacheEntry::MemorySqueezedLiquid(array) => {
926                self.eval_predicate_on_squeezed(array, selection_opt, predicate)
927                    .await
928            }
929        }
930    }
931
932    async fn eval_predicate_on_squeezed(
933        &self,
934        array: &LiquidSqueezedArrayRef,
935        selection_opt: Option<&BooleanBuffer>,
936        predicate: &LiquidExpr,
937    ) -> Option<BooleanArray> {
938        let mut owned = None;
939        let selection = selection_opt.unwrap_or_else(|| {
940            owned = Some(BooleanBuffer::new_set(array.len()));
941            owned.as_ref().unwrap()
942        });
943        Some(array.try_eval_predicate(predicate, selection).await)
944    }
945
946    fn eval_predicate_on_array(&self, array: ArrayRef, predicate: &LiquidExpr) -> BooleanArray {
947        let schema = Arc::new(Schema::new(vec![Field::new(
948            "liquid_predicate_col",
949            array.data_type().clone(),
950            true,
951        )]));
952        let record_batch =
953            RecordBatch::try_new(schema, vec![array]).expect("single-column predicate batch");
954        let result = predicate
955            .physical_expr()
956            .evaluate(&record_batch)
957            .expect("validated LiquidExpr must evaluate");
958        let boolean_array = result
959            .into_array(record_batch.num_rows())
960            .expect("predicate output must be an array");
961        boolean_array.as_boolean().clone()
962    }
963}
964
#[cfg(test)]
mod tests {
    // Unit tests for `LiquidCache`:
    // - insertion and memory-budget accounting;
    // - squeezed reads with an expression hint;
    // - policy-driven squeezing;
    // - concurrent inserts;
    // - disk flushing and hydration back to memory;
    // - disk-budget exhaustion and eviction.
    use super::*;
    use crate::cache::{
        CacheEntry, CacheExpression, CachePolicy, LiquidCacheBuilder, LiquidPolicy,
        TranscodeSqueezeEvict, transcode_liquid_inner,
        utils::{
            LiquidCompressorStates, arrow_to_bytes, create_cache_store, create_test_array,
            create_test_arrow_array,
        },
    };
    use crate::liquid_array::{
        Date32Field, LiquidPrimitiveArray, LiquidSqueezedArrayRef, SqueezedDate32Array,
    };
    use crate::sync::thread;
    use arrow::array::{Array, ArrayRef, Date32Array, Int32Array};
    use arrow::datatypes::Date32Type;
    use std::future::Future;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test-only cache policy that always nominates `target_id` as the memory
    /// victim. It counts how many times it was asked in `advice_count`.
    ///
    /// `find_memory_victim` panics if `target_id` is `None`.
    #[derive(Debug)]
    struct TestPolicy {
        target_id: Option<EntryID>,
        advice_count: AtomicUsize,
    }

    impl TestPolicy {
        /// Creates a policy that will nominate `target_id`, with a zeroed call counter.
        fn new(target_id: Option<EntryID>) -> Self {
            Self {
                target_id,
                advice_count: AtomicUsize::new(0),
            }
        }
    }

    // Only `find_memory_victim` is overridden. Every other `CachePolicy`
    // method uses the trait's default implementation.
    impl CachePolicy for TestPolicy {
        fn find_memory_victim(&self, _cnt: usize) -> Vec<EntryID> {
            // Ignores the requested count and always returns exactly one victim.
            self.advice_count.fetch_add(1, Ordering::SeqCst);
            let id_to_use = self.target_id.unwrap();
            vec![id_to_use]
        }
    }

    /// Inserts, overwrites, and looks up entries. Checks that the memory budget
    /// tracks exactly the sizes of the live entries.
    #[tokio::test]
    async fn test_basic_cache_operations() {
        // Test basic insert, get, and size tracking in one test
        let budget_size = 10 * 1024;
        let store = create_cache_store(budget_size, Box::new(LiquidPolicy::new())).await;

        // 1. Initial budget should be empty
        assert_eq!(store.budget.memory_usage_bytes(), 0);

        // 2. Insert and verify first entry
        let entry_id1: EntryID = EntryID::from(1);
        let array1 = create_test_array(100);
        let size1 = array1.memory_usage_bytes();
        store.insert_inner(entry_id1, array1).await.unwrap();

        // Verify budget usage and data correctness
        assert_eq!(store.budget.memory_usage_bytes(), size1);
        let retrieved1 = store.index().get(&entry_id1).unwrap();
        match retrieved1.as_ref() {
            CacheEntry::MemoryArrow(arr) => assert_eq!(arr.len(), 100),
            _ => panic!("Expected ArrowMemory"),
        }

        // 3. A second entry adds its size to the running total
        let entry_id2: EntryID = EntryID::from(2);
        let array2 = create_test_array(200);
        let size2 = array2.memory_usage_bytes();
        store.insert_inner(entry_id2, array2).await.unwrap();

        assert_eq!(store.budget.memory_usage_bytes(), size1 + size2);

        // 4. Re-inserting under entry_id1 replaces its accounted size (size1 -> size3)
        let array3 = create_test_array(150);
        let size3 = array3.memory_usage_bytes();
        store.insert_inner(entry_id1, array3).await.unwrap();

        assert_eq!(store.budget.memory_usage_bytes(), size3 + size2);
        // 5. Unknown IDs are absent from the index
        assert!(store.index().get(&EntryID::from(999)).is_none());
    }

    /// Reading a Year-squeezed Date32 entry with a Year-extraction hint returns
    /// a Date32 array. Each value is the first day of the input value's year,
    /// and nulls are preserved.
    #[tokio::test]
    async fn get_arrow_array_with_expression_extracts_year() {
        let store = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
        let entry_id = EntryID::from(42);

        // Day offsets from the epoch:
        // 2 -> 1970-01-03, 366 -> 1971-01-02, null, 465 -> 1971-04-11.
        let date_values = Date32Array::from(vec![Some(2), Some(365 + 1), None, Some(365 + 100)]);
        let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date_values.clone());
        let squeezed = SqueezedDate32Array::from_liquid_date32(&liquid, Date32Field::Year);
        let squeezed: LiquidSqueezedArrayRef = Arc::new(squeezed);

        store
            .insert_inner(
                entry_id,
                CacheEntry::memory_squeezed_liquid(squeezed.clone()),
            )
            .await
            .unwrap();

        let expr = Arc::new(CacheExpression::extract_date32(Date32Field::Year));
        let result = store
            .get(&entry_id)
            .with_expression_hint(expr)
            .read()
            .await
            .expect("array present");

        let result = result
            .as_any()
            .downcast_ref::<Date32Array>()
            .expect("date32 result");
        // 0 and 365 are the Date32 values of 1970-01-01 and 1971-01-01.
        assert_eq!(result.len(), 4);
        assert_eq!(result.value(0), 0);
        assert_eq!(result.value(1), 365);
        assert!(result.is_null(2));
        assert_eq!(result.value(3), 365);
    }

    /// Drives memory pressure through `TestPolicy`. The second insert is
    /// expected to push usage past the small budget, after which the nominated
    /// entry has moved from `MemoryArrow` to `MemoryLiquid`.
    #[tokio::test]
    async fn test_cache_advice_strategies() {
        // NOTE(review): only the eviction scenario below is exercised, although
        // the test name suggests several advice strategies.

        // Create entry IDs we'll use throughout the test
        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);

        // 1. Test EVICT advice
        {
            let advisor = TestPolicy::new(Some(entry_id1));
            let store = create_cache_store(8000, Box::new(advisor)).await; // Small budget to force advice

            store
                .insert_inner(entry_id1, create_test_array(800))
                .await
                .unwrap();
            match store.index().get(&entry_id1).unwrap().as_ref() {
                CacheEntry::MemoryArrow(_) => {}
                other => panic!("Expected ArrowMemory, got {other:?}"),
            }

            store
                .insert_inner(entry_id2, create_test_array(800))
                .await
                .unwrap();
            match store.index().get(&entry_id1).unwrap().as_ref() {
                CacheEntry::MemoryLiquid(_) => {}
                other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
            }
        }
    }

    /// Runs the multi-threaded insert scenario under the Tokio test runtime.
    #[tokio::test]
    async fn test_concurrent_cache_operations() {
        concurrent_cache_operations().await;
    }

    // NOTE(review): shuttle variant is disabled; the reason is not recorded
    // here. Either re-enable it or delete it.
    // #[cfg(feature = "shuttle")]
    // #[test]
    // fn shuttle_cache_operations() {
    //     crate::utils::shuttle_test(|| {
    //         block_on(concurrent_cache_operations());
    //     });
    // }

    /// Blocks the current thread on `future`. Uses shuttle's executor when the
    /// `shuttle` feature is enabled and `tokio_test::block_on` otherwise.
    pub fn block_on<F: Future>(future: F) -> F::Output {
        #[cfg(feature = "shuttle")]
        {
            shuttle::future::block_on(future)
        }
        #[cfg(not(feature = "shuttle"))]
        {
            tokio_test::block_on(future)
        }
    }

    /// Inserts `num_threads * ops_per_thread` distinct entries.
    ///
    /// The inserts come from `num_threads` threads spawned with
    /// `crate::sync::thread`, each driving its own inserts through `block_on`.
    /// Afterwards the test checks that every entry is indexed and that no extra
    /// keys exist.
    async fn concurrent_cache_operations() {
        let num_threads = 3;
        let ops_per_thread = 50;

        // NOTE(review): presumably about half of the raw payload (100 values of
        // ~8 bytes per insert), so the cache must squeeze or spill. Confirm
        // against the element width of `create_test_arrow_array`.
        let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
        let store = create_cache_store(budget_size, Box::new(LiquidPolicy::new())).await;

        let mut handles = vec![];
        for thread_id in 0..num_threads {
            let store = store.clone();
            handles.push(thread::spawn(move || {
                block_on(async {
                    for i in 0..ops_per_thread {
                        // IDs are disjoint across threads: thread_id selects a block of ops_per_thread.
                        let unique_id = thread_id * ops_per_thread + i;
                        let entry_id: EntryID = EntryID::from(unique_id);
                        let array = create_test_arrow_array(100);
                        store.insert(entry_id, array).await.unwrap();
                    }
                });
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }

        // Invariant 1: Every previously inserted entry can be retrieved
        for thread_id in 0..num_threads {
            for i in 0..ops_per_thread {
                let unique_id = thread_id * ops_per_thread + i;
                let entry_id: EntryID = EntryID::from(unique_id);
                assert!(store.index().get(&entry_id).is_some());
            }
        }

        // Invariant 2: Number of entries matches number of insertions
        assert_eq!(store.index().keys().len(), num_threads * ops_per_thread);
    }

    /// `stats()` reports the entry count, memory/disk usage, and the configured
    /// memory limit. Usage is checked both before and after
    /// `flush_all_to_disk`.
    #[tokio::test]
    async fn test_cache_stats_memory_and_disk_usage() {
        // Build a small cache in blocking liquid mode to avoid background tasks
        let storage = LiquidCacheBuilder::new()
            .with_max_memory_bytes(10 * 1024 * 1024)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build()
            .await;

        // Insert two small batches
        let arr1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        let arr2: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
        storage.insert(EntryID::from(1usize), arr1).await.unwrap();
        storage.insert(EntryID::from(2usize), arr2).await.unwrap();

        // Stats after insert: 2 entries, memory usage > 0, disk usage == 0
        let s = storage.stats();
        assert_eq!(s.total_entries, 2);
        assert!(s.memory_usage_bytes > 0);
        assert_eq!(s.disk_usage_bytes, 0);
        assert_eq!(s.max_memory_bytes, 10 * 1024 * 1024);

        // Flush to disk and verify memory usage drops and disk usage increases
        storage.flush_all_to_disk().await.unwrap();
        let s2 = storage.stats();
        assert_eq!(s2.total_entries, 2);
        assert!(s2.disk_usage_bytes > 0);
        // In-memory usage should be reduced after moving to on-disk formats
        assert!(s2.memory_usage_bytes <= s.memory_usage_bytes);
    }

    /// After a flush, `get` on a `DiskArrow` entry returns the original data
    /// and leaves the entry as `MemoryArrow`.
    #[tokio::test]
    async fn hydrate_disk_arrow_on_get_promotes_to_memory() {
        let store = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
        let entry_id = EntryID::from(321usize);
        let array = create_test_arrow_array(8);

        store.insert(entry_id, array.clone()).await.unwrap();
        store.flush_all_to_disk().await.unwrap();
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::DiskArrow { .. }));
        }

        let result = store.get(&entry_id).await.expect("present");
        assert_eq!(result.as_ref(), array.as_ref());
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::MemoryArrow(_)));
        }
    }

    /// After a flush, `get` on a `DiskLiquid` entry returns the original Arrow
    /// data and leaves the entry as `MemoryLiquid`.
    #[tokio::test]
    async fn hydrate_disk_liquid_on_get_promotes_to_memory_liquid() {
        let store = create_cache_store(1 << 20, Box::new(LiquidPolicy::new())).await;
        let entry_id = EntryID::from(322usize);
        let arrow_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let compressor = LiquidCompressorStates::new();
        let liquid = transcode_liquid_inner(&arrow_array, &compressor).unwrap();

        store
            .insert_inner(entry_id, CacheEntry::memory_liquid(liquid.clone()))
            .await
            .unwrap();
        store.flush_all_to_disk().await.unwrap();
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::DiskLiquid { .. }));
        }

        let result = store.get(&entry_id).await.expect("present");
        assert_eq!(result.as_ref(), arrow_array.as_ref());
        {
            let entry = store.index().get(&entry_id).unwrap();
            assert!(matches!(entry.as_ref(), CacheEntry::MemoryLiquid(_)));
        }
    }

    /// With zero memory and zero disk budget, `insert` fails with `CacheFull`
    /// and nothing is cached.
    #[tokio::test]
    async fn insert_returns_cache_full_when_memory_and_disk_are_saturated() {
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(0)
            .with_max_disk_bytes(0)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build()
            .await;
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));

        let err = cache.insert(EntryID::from(900usize), array).await;

        assert_eq!(err, Err(CacheFull));
        assert!(!cache.is_cached(&EntryID::from(900usize)));
    }

    /// The disk budget fits only the larger of two serialized arrays. Flushing
    /// the second entry must evict the first, and the second ends up as
    /// `DiskArrow`.
    #[tokio::test]
    async fn insert_until_disk_full_then_evicts_oldest_disk_entry() {
        let first_array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let second_array: ArrayRef = Arc::new(Int32Array::from_iter_values(16..32));
        let first_bytes = arrow_to_bytes(&first_array).unwrap().len();
        let second_bytes = arrow_to_bytes(&second_array).unwrap().len();
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(first_bytes.max(second_bytes))
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .with_cache_policy(Box::new(LiquidPolicy::new()))
            .build()
            .await;

        let first = EntryID::from(910usize);
        let second = EntryID::from(911usize);
        cache.insert(first, first_array).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        assert!(cache.is_cached(&first));

        cache.insert(second, second_array).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();

        assert!(!cache.is_cached(&first));
        assert!(matches!(
            cache.index().get(&second).unwrap().as_ref(),
            CacheEntry::DiskArrow { .. }
        ));
    }

    /// With disk room for one serialized array, flushing a second entry leaves
    /// at most one of the two entries cached.
    #[tokio::test]
    async fn flush_all_to_disk_evicts_when_overflow() {
        let first_array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let second_array: ArrayRef = Arc::new(Int32Array::from_iter_values(16..32));
        let disk_bytes = arrow_to_bytes(&first_array).unwrap().len();
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(disk_bytes)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .with_cache_policy(Box::new(LiquidPolicy::new()))
            .build()
            .await;
        let first = EntryID::from(912usize);
        let second = EntryID::from(913usize);
        cache.insert(first, first_array).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        cache.insert(second, second_array).await.unwrap();

        cache.flush_all_to_disk().await.unwrap();

        assert!(!cache.is_cached(&first) || !cache.is_cached(&second));
    }

    /// `remove_disk_entry` returns the entry's serialized size to the disk
    /// budget and drops the entry from the cache.
    #[tokio::test]
    async fn disk_eviction_releases_budget() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        let disk_bytes = arrow_to_bytes(&array).unwrap().len();
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(disk_bytes)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .with_cache_policy(Box::new(LiquidPolicy::new()))
            .build()
            .await;
        let entry = EntryID::from(914usize);
        cache.insert(entry, array).await.unwrap();
        cache.flush_all_to_disk().await.unwrap();
        let before = cache.stats().disk_usage_bytes;

        cache.remove_disk_entry(entry).await;

        assert_eq!(cache.stats().disk_usage_bytes, before - disk_bytes);
        assert!(!cache.is_cached(&entry));
    }

    /// With a zero disk budget, `flush_all_to_disk` still returns `Ok(())` but
    /// drops the entry it could not write.
    #[tokio::test]
    async fn flush_all_to_disk_drops_entry_on_unrecoverable_overflow() {
        let cache = LiquidCacheBuilder::new()
            .with_max_memory_bytes(1 << 20)
            .with_max_disk_bytes(0)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build()
            .await;
        let entry_id = EntryID::from(901usize);
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..16));
        cache.insert(entry_id, array).await.unwrap();

        let result = cache.flush_all_to_disk().await;

        assert_eq!(result, Ok(()));
        assert!(!cache.is_cached(&entry_id));
    }
}