liquid_cache_storage/cache/
core.rs

1use arrow::array::ArrayRef;
2use arrow::buffer::BooleanBuffer;
3use arrow_schema::ArrowError;
4use datafusion::physical_plan::PhysicalExpr;
5use std::path::PathBuf;
6use std::{fmt::Debug, path::Path};
7
8use super::{
9    budget::BudgetAccounting,
10    cache_policies::CachePolicy,
11    cached_batch::{CachedBatch, CachedBatchType, GetWithPredicateResult},
12    tracer::CacheTracer,
13    utils::CacheConfig,
14};
15use crate::cache::squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict};
16use crate::cache::stats::{CacheStats, RuntimeStats};
17use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
18use crate::cache::{index::ArtIndex, utils::EntryID};
19use crate::cache_policies::LiquidPolicy;
20use crate::sync::Arc;
21
22use bytes::Bytes;
23use std::ops::Range;
24
/// A trait for objects that can handle IO operations for the cache.
///
/// Implementations map an [EntryID] to on-disk file paths and perform the
/// reads/writes used to spill and reload cached batches.
#[async_trait::async_trait]
pub trait IoContext: Debug + Send + Sync {
    /// Get the base directory for the cache eviction, i.e., evicted data will be written to this directory.
    fn base_dir(&self) -> &Path;

    /// Get the compressor for an entry.
    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;

    /// Get the path to the arrow file for an entry.
    fn arrow_path(&self, entry_id: &EntryID) -> PathBuf;

    /// Get the path to the liquid file for an entry.
    fn liquid_path(&self, entry_id: &EntryID) -> PathBuf;

    /// Read the entire file at the given path.
    ///
    /// Returns any I/O error from opening or reading the file.
    async fn read_file(&self, path: PathBuf) -> Result<Bytes, std::io::Error>;

    /// Read a range of bytes from a file at the given path.
    ///
    /// `range` is a half-open byte range (`start..end`) into the file.
    async fn read_range(&self, path: PathBuf, range: Range<u64>) -> Result<Bytes, std::io::Error>;

    /// Write the entire buffer to a file at the given path.
    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error>;
}
49
/// A default implementation of [IoContext] that uses the default compressor.
#[derive(Debug)]
pub struct DefaultIoContext {
    // Single compressor state shared by all entries (get_compressor ignores
    // the entry id).
    compressor_state: Arc<LiquidCompressorStates>,
    // Root directory for spilled arrow/liquid files.
    base_dir: PathBuf,
}
56
57impl DefaultIoContext {
58    /// Create a new instance of [DefaultIoContext].
59    pub fn new(base_dir: PathBuf) -> Self {
60        Self {
61            compressor_state: Arc::new(LiquidCompressorStates::new()),
62            base_dir,
63        }
64    }
65}
66
67#[async_trait::async_trait]
68impl IoContext for DefaultIoContext {
69    fn base_dir(&self) -> &Path {
70        &self.base_dir
71    }
72
73    fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
74        self.compressor_state.clone()
75    }
76
77    fn arrow_path(&self, entry_id: &EntryID) -> PathBuf {
78        self.base_dir()
79            .join(format!("{:016x}.arrow", usize::from(*entry_id)))
80    }
81
82    fn liquid_path(&self, entry_id: &EntryID) -> PathBuf {
83        self.base_dir()
84            .join(format!("{:016x}.liquid", usize::from(*entry_id)))
85    }
86
87    async fn read_file(&self, path: PathBuf) -> Result<Bytes, std::io::Error> {
88        use tokio::io::AsyncReadExt;
89        let mut file = tokio::fs::File::open(path).await?;
90        let mut bytes = Vec::new();
91        file.read_to_end(&mut bytes).await?;
92        Ok(Bytes::from(bytes))
93    }
94
95    async fn read_range(&self, path: PathBuf, range: Range<u64>) -> Result<Bytes, std::io::Error> {
96        use tokio::io::{AsyncReadExt, AsyncSeekExt};
97        let mut file = tokio::fs::File::open(path).await?;
98        let len = (range.end - range.start) as usize;
99        let mut bytes = vec![0u8; len];
100        file.seek(tokio::io::SeekFrom::Start(range.start)).await?;
101        file.read_exact(&mut bytes).await?;
102        Ok(Bytes::from(bytes))
103    }
104
105    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
106        use tokio::io::AsyncWriteExt;
107        let mut file = tokio::fs::File::create(path).await?;
108        file.write_all(&data).await?;
109        Ok(())
110    }
111}
112
/// A blocking implementation of [IoContext] that uses the default compressor.
/// This is used for testing purposes as all io operations are blocking.
#[derive(Debug)]
pub struct BlockingIoContext {
    // Single compressor state shared by all entries.
    compressor_state: Arc<LiquidCompressorStates>,
    // Root directory for spilled arrow/liquid files.
    base_dir: PathBuf,
}
120
121impl BlockingIoContext {
122    /// Create a new instance of [BlockingIoContext].
123    pub fn new(base_dir: PathBuf) -> Self {
124        Self {
125            compressor_state: Arc::new(LiquidCompressorStates::new()),
126            base_dir,
127        }
128    }
129}
130
131#[async_trait::async_trait]
132impl IoContext for BlockingIoContext {
133    fn base_dir(&self) -> &Path {
134        &self.base_dir
135    }
136
137    fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
138        self.compressor_state.clone()
139    }
140
141    fn arrow_path(&self, entry_id: &EntryID) -> PathBuf {
142        self.base_dir()
143            .join(format!("{:016x}.arrow", usize::from(*entry_id)))
144    }
145
146    fn liquid_path(&self, entry_id: &EntryID) -> PathBuf {
147        self.base_dir()
148            .join(format!("{:016x}.liquid", usize::from(*entry_id)))
149    }
150
151    async fn read_file(&self, path: PathBuf) -> Result<Bytes, std::io::Error> {
152        let mut file = std::fs::File::open(path)?;
153        let mut bytes = Vec::new();
154        std::io::Read::read_to_end(&mut file, &mut bytes)?;
155        Ok(Bytes::from(bytes))
156    }
157
158    async fn read_range(&self, path: PathBuf, range: Range<u64>) -> Result<Bytes, std::io::Error> {
159        let mut file = std::fs::File::open(path)?;
160        let len = (range.end - range.start) as usize;
161        let mut bytes = vec![0u8; len];
162        std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(range.start))?;
163        std::io::Read::read_exact(&mut file, &mut bytes)?;
164        Ok(Bytes::from(bytes))
165    }
166
167    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
168        let mut file = std::fs::File::create(path)?;
169        std::io::Write::write_all(&mut file, &data)?;
170        Ok(())
171    }
172}
173
174// CacheStats and RuntimeStats moved to stats.rs
175
/// Builder for [CacheStorage].
///
/// Example:
/// ```rust
/// use liquid_cache_storage::cache::CacheStorageBuilder;
/// use liquid_cache_storage::cache_policies::LiquidPolicy;
///
///
/// let _storage = CacheStorageBuilder::new()
///     .with_batch_size(8192)
///     .with_max_cache_bytes(1024 * 1024 * 1024)
///     .with_cache_policy(Box::new(LiquidPolicy::new()))
///     .build();
/// ```
pub struct CacheStorageBuilder {
    // Rows per cached batch (default 8192).
    batch_size: usize,
    // Memory budget in bytes (default 1 GiB).
    max_cache_bytes: usize,
    // None means "create a temporary directory at build()".
    cache_dir: Option<PathBuf>,
    // Replacement policy; chooses eviction victims.
    cache_policy: Box<dyn CachePolicy>,
    // Policy applied to victims when the budget is exceeded.
    squeeze_policy: Box<dyn SqueezePolicy>,
    // None means "use DefaultIoContext rooted at cache_dir".
    io_worker: Option<Arc<dyn IoContext>>,
}
198
impl Default for CacheStorageBuilder {
    fn default() -> Self {
        // Same defaults as `CacheStorageBuilder::new()`.
        Self::new()
    }
}
204
205impl CacheStorageBuilder {
206    /// Create a new instance of CacheStorageBuilder.
207    pub fn new() -> Self {
208        Self {
209            batch_size: 8192,
210            max_cache_bytes: 1024 * 1024 * 1024,
211            cache_dir: None,
212            cache_policy: Box::new(LiquidPolicy::new()),
213            squeeze_policy: Box::new(TranscodeSqueezeEvict),
214            io_worker: None,
215        }
216    }
217
218    /// Set the cache directory for the cache.
219    /// Default is a temporary directory.
220    pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
221        self.cache_dir = Some(cache_dir);
222        self
223    }
224
225    /// Set the batch size for the cache.
226    /// Default is 8192.
227    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
228        self.batch_size = batch_size;
229        self
230    }
231
232    /// Set the max cache bytes for the cache.
233    /// Default is 1GB.
234    pub fn with_max_cache_bytes(mut self, max_cache_bytes: usize) -> Self {
235        self.max_cache_bytes = max_cache_bytes;
236        self
237    }
238
239    /// Set the cache policy for the cache.
240    /// Default is [LiquidPolicy].
241    pub fn with_cache_policy(mut self, policy: Box<dyn CachePolicy>) -> Self {
242        self.cache_policy = policy;
243        self
244    }
245
246    /// Set the squeeze policy for the cache.
247    /// Default is [TranscodeSqueezeEvict].
248    pub fn with_squeeze_policy(mut self, policy: Box<dyn SqueezePolicy>) -> Self {
249        self.squeeze_policy = policy;
250        self
251    }
252
253    /// Set the io worker for the cache.
254    /// Default is [DefaultIoContext].
255    pub fn with_io_worker(mut self, io_worker: Arc<dyn IoContext>) -> Self {
256        self.io_worker = Some(io_worker);
257        self
258    }
259
260    /// Build the cache storage.
261    ///
262    /// The cache storage is wrapped in an [Arc] to allow for concurrent access.
263    pub fn build(self) -> Arc<CacheStorage> {
264        let cache_dir = self
265            .cache_dir
266            .unwrap_or_else(|| tempfile::tempdir().unwrap().keep());
267        let io_worker = self
268            .io_worker
269            .unwrap_or_else(|| Arc::new(DefaultIoContext::new(cache_dir.clone())));
270        Arc::new(CacheStorage::new(
271            self.batch_size,
272            self.max_cache_bytes,
273            cache_dir,
274            self.squeeze_policy,
275            self.cache_policy,
276            io_worker,
277        ))
278    }
279}
280
/// Cache storage for liquid cache.
///
/// Example (async read):
/// ```rust
/// use liquid_cache_storage::cache::{CacheStorageBuilder, EntryID};
/// use arrow::array::UInt64Array;
/// use std::sync::Arc;
///
/// tokio_test::block_on(async {
/// let storage = CacheStorageBuilder::new().build();
///
/// let entry_id = EntryID::from(0);
/// let arrow_array = Arc::new(UInt64Array::from_iter_values(0..32));
/// storage.insert(entry_id, arrow_array.clone()).await;
///
/// // Get the arrow array back asynchronously
/// let retrieved = storage.get_arrow_array(&entry_id).await.unwrap();
/// assert_eq!(retrieved.as_ref(), arrow_array.as_ref());
/// });
/// ```
#[derive(Debug)]
pub struct CacheStorage {
    // Maps EntryID -> CachedBatch.
    index: ArtIndex,
    // Batch size, budget, and cache root directory.
    config: CacheConfig,
    // Memory/disk usage accounting against the configured budget.
    budget: BudgetAccounting,
    // Replacement policy: notified of inserts/accesses, asked for victims.
    cache_policy: Box<dyn CachePolicy>,
    // Applied to victims to reclaim memory (e.g. transcode/evict to disk).
    squeeze_policy: Box<dyn SqueezePolicy>,
    // Tracing hook, exposed via `tracer()`.
    tracer: CacheTracer,
    // Path mapping and file I/O for spilled entries.
    io_context: Arc<dyn IoContext>,
    // Per-operation counters surfaced through `stats()`.
    runtime_stats: RuntimeStats,
}
312
313impl CacheStorage {
    /// Return current cache statistics: counts and resource usage.
    ///
    /// Entry counts are taken in a single pass over the index, so they may be
    /// slightly stale under concurrent writes.
    pub fn stats(&self) -> CacheStats {
        // Count entries by residency/format
        let total_entries = self.index.entry_count();

        let mut memory_arrow_entries = 0usize;
        let mut memory_liquid_entries = 0usize;
        let mut memory_hybrid_liquid_entries = 0usize;
        let mut disk_liquid_entries = 0usize;
        let mut disk_arrow_entries = 0usize;

        // One counter per CachedBatch variant; the exhaustive match keeps this
        // in sync with the enum.
        self.index.for_each(|_, batch| match batch {
            CachedBatch::MemoryArrow(_) => memory_arrow_entries += 1,
            CachedBatch::MemoryLiquid(_) => memory_liquid_entries += 1,
            CachedBatch::MemoryHybridLiquid(_) => memory_hybrid_liquid_entries += 1,
            CachedBatch::DiskLiquid(_) => disk_liquid_entries += 1,
            CachedBatch::DiskArrow(_) => disk_arrow_entries += 1,
        });

        let memory_usage_bytes = self.budget.memory_usage_bytes();
        let disk_usage_bytes = self.budget.disk_usage_bytes();
        // NOTE(review): `consume_snapshot` suggests the runtime counters reset
        // on read — confirm callers expect stats() to be consuming.
        let runtime = self.runtime_stats.consume_snapshot();

        CacheStats {
            total_entries,
            memory_arrow_entries,
            memory_liquid_entries,
            memory_hybrid_liquid_entries,
            disk_liquid_entries,
            disk_arrow_entries,
            memory_usage_bytes,
            disk_usage_bytes,
            max_cache_bytes: self.config.max_cache_bytes(),
            cache_root_dir: self.config.cache_root_dir().clone(),
            runtime,
        }
    }
351
352    /// Insert a batch into the cache.
353    pub async fn insert(self: &Arc<Self>, entry_id: EntryID, batch_to_cache: ArrayRef) {
354        self.insert_inner(entry_id, CachedBatch::MemoryArrow(batch_to_cache))
355            .await;
356    }
357
    /// Get an Arrow array from the cache asynchronously.
    ///
    /// Returns `None` if the entry is absent or if a disk read fails
    /// (I/O errors are swallowed via `.ok()?`).
    pub async fn get_arrow_array(&self, entry_id: &EntryID) -> Option<ArrayRef> {
        self.runtime_stats.incr_get_arrow_array();
        let batch = self.index.get(entry_id)?;
        // Let the replacement policy observe the access before materializing.
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(&batch));

        match batch {
            // Already in memory as Arrow: return as-is.
            CachedBatch::MemoryArrow(array) => Some(array),
            // In-memory liquid: transcode to Arrow on the fly.
            CachedBatch::MemoryLiquid(array) => Some(array.to_arrow_array()),
            // On-disk liquid: read the whole file, decode via liquid IPC with
            // this entry's FSST compressor, then convert to Arrow.
            CachedBatch::DiskLiquid(_) => {
                let path = self.io_context.liquid_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let compressor_states = self.io_context.get_compressor(entry_id);
                let compressor = compressor_states.fsst_compressor();
                let liquid = crate::liquid_array::ipc::read_from_bytes(
                    bytes,
                    &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
                );
                Some(liquid.to_arrow_array())
            }
            // Hybrid: try the in-memory part first; `Err` carries the byte
            // range that must be fetched from disk, which we soak back in
            // before retrying the conversion.
            CachedBatch::MemoryHybridLiquid(array) => match array.to_arrow_array() {
                Ok(arr) => Some(arr),
                Err(io_range) => {
                    let path = self.io_context.liquid_path(entry_id);
                    let bytes = self
                        .io_context
                        .read_range(path, io_range.range().clone())
                        .await
                        .ok()?;
                    let new_array = array.soak(bytes);
                    Some(new_array.to_arrow_array())
                }
            },
            // On-disk Arrow IPC stream: first batch, first column is the array.
            CachedBatch::DiskArrow(_) => {
                let path = self.io_context.arrow_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let cursor = std::io::Cursor::new(bytes.to_vec());
                let mut reader = arrow::ipc::reader::StreamReader::try_new(cursor, None).ok()?;
                let batch = reader.next()?.ok()?;
                Some(batch.column(0).clone())
            }
        }
    }
402
    /// Get an Arrow array with selection pushdown.
    ///
    /// `selection` is a row mask over the cached batch; only selected rows are
    /// returned. Returns `None` if the entry is absent or a disk read fails;
    /// `Some(Err(..))` surfaces Arrow compute errors from filtering.
    pub async fn get_with_selection(
        &self,
        entry_id: &EntryID,
        selection: &BooleanBuffer,
    ) -> Option<Result<ArrayRef, ArrowError>> {
        use arrow::array::BooleanArray;

        self.runtime_stats.incr_get_with_selection();
        let batch = self.index.get(entry_id)?;
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(&batch));

        Some(match batch {
            // In-memory Arrow: apply the mask with arrow's filter kernel.
            CachedBatch::MemoryArrow(array) => {
                let selection_array = BooleanArray::new(selection.clone(), None);
                arrow::compute::filter(&array, &selection_array)
            }
            // In-memory liquid: filter directly in the liquid representation.
            CachedBatch::MemoryLiquid(array) => Ok(array.filter_to_arrow(selection)),
            CachedBatch::DiskLiquid(data_type) => {
                // Nothing selected: skip the disk read entirely and return an
                // empty array of the stored data type.
                let select_any = selection.count_set_bits() > 0;
                if !select_any {
                    let empty_array = arrow::array::new_empty_array(&data_type);
                    return Some(Ok(empty_array));
                }
                let path = self.io_context.liquid_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let compressor_states = self.io_context.get_compressor(entry_id);
                let compressor = compressor_states.fsst_compressor();
                let liquid = crate::liquid_array::ipc::read_from_bytes(
                    bytes,
                    &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
                );
                Ok(liquid.filter_to_arrow(selection))
            }
            // Hybrid: on Err, fetch the missing byte range, soak it in, retry.
            CachedBatch::MemoryHybridLiquid(array) => match array.filter_to_arrow(selection) {
                Ok(arr) => Ok(arr),
                Err(io_range) => {
                    let path = self.io_context.liquid_path(entry_id);
                    let bytes = self
                        .io_context
                        .read_range(path, io_range.range().clone())
                        .await
                        .ok()?;
                    let new_array = array.soak(bytes);
                    Ok(new_array.filter_to_arrow(selection))
                }
            },
            // On-disk Arrow IPC: load first batch's first column, then filter.
            CachedBatch::DiskArrow(_) => {
                let path = self.io_context.arrow_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let cursor = std::io::Cursor::new(bytes.to_vec());
                let mut reader = arrow::ipc::reader::StreamReader::try_new(cursor, None).ok()?;
                let batch = reader.next()?.ok()?;
                let array = batch.column(0).clone();
                let selection_array = BooleanArray::new(selection.clone(), None);
                arrow::compute::filter(&array, &selection_array)
            }
        })
    }
463
    /// Get with predicate pushdown.
    ///
    /// Attempts to evaluate `predicate` directly on the cached (liquid)
    /// representation; when that is unsupported, falls back to returning the
    /// selected rows as Arrow for the caller to evaluate. Returns `None` if
    /// the entry is absent or a disk read fails.
    pub async fn get_with_predicate(
        &self,
        entry_id: &EntryID,
        selection: &BooleanBuffer,
        predicate: &Arc<dyn PhysicalExpr>,
    ) -> Option<GetWithPredicateResult> {
        use arrow::array::BooleanArray;

        self.runtime_stats.incr_get_with_predicate();
        let batch = self.index.get(entry_id)?;
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(&batch));

        Some(match batch {
            // Arrow in memory: no liquid predicate support, just filter.
            CachedBatch::MemoryArrow(array) => {
                let selection_array = BooleanArray::new(selection.clone(), None);
                let selected = arrow::compute::filter(&array, &selection_array).ok()?;
                GetWithPredicateResult::Filtered(selected)
            }
            // Arrow on disk: load first batch's first column, then filter.
            CachedBatch::DiskArrow(_) => {
                let path = self.io_context.arrow_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let cursor = std::io::Cursor::new(bytes.to_vec());
                let mut reader = arrow::ipc::reader::StreamReader::try_new(cursor, None).ok()?;
                let batch = reader.next()?.ok()?;
                let array = batch.column(0).clone();
                let selection_array = BooleanArray::new(selection.clone(), None);
                let filtered = arrow::compute::filter(&array, &selection_array).ok()?;
                GetWithPredicateResult::Filtered(filtered)
            }
            // Liquid in memory: try native predicate eval; `None` means the
            // predicate is unsupported on this representation.
            CachedBatch::MemoryLiquid(array) => {
                match array.try_eval_predicate(predicate, selection) {
                    Some(buf) => GetWithPredicateResult::Evaluated(buf),
                    None => {
                        let filtered = array.filter_to_arrow(selection);
                        GetWithPredicateResult::Filtered(filtered)
                    }
                }
            }
            // Liquid on disk: read + decode, then same try-eval / fallback.
            CachedBatch::DiskLiquid(_) => {
                let path = self.io_context.liquid_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let compressor_states = self.io_context.get_compressor(entry_id);
                let compressor = compressor_states.fsst_compressor();
                let liquid = crate::liquid_array::ipc::read_from_bytes(
                    bytes,
                    &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
                );
                match liquid.try_eval_predicate(predicate, selection) {
                    Some(buf) => GetWithPredicateResult::Evaluated(buf),
                    None => {
                        let filtered = liquid.filter_to_arrow(selection);
                        GetWithPredicateResult::Filtered(filtered)
                    }
                }
            }
            // Hybrid: three outcomes — evaluated in memory, unsupported
            // (fall back to filter, possibly needing disk), or the eval itself
            // needs the spilled bytes. Both needs-IO paths soak the fetched
            // range and retry.
            // NOTE(review): the two soak-and-retry arms below are duplicated;
            // a future refactor could share them via a helper.
            CachedBatch::MemoryHybridLiquid(array) => {
                match array.try_eval_predicate(predicate, selection) {
                    Ok(Some(buf)) => {
                        self.runtime_stats.incr_get_predicate_hybrid_success();
                        GetWithPredicateResult::Evaluated(buf)
                    }
                    Ok(None) => {
                        self.runtime_stats.incr_get_predicate_hybrid_unsupported();
                        match array.filter_to_arrow(selection) {
                            Ok(arr) => GetWithPredicateResult::Filtered(arr),
                            Err(io_range) => {
                                self.runtime_stats.incr_get_predicate_hybrid_needs_io();
                                let path = self.io_context.liquid_path(entry_id);
                                let bytes = self
                                    .io_context
                                    .read_range(path, io_range.range().clone())
                                    .await
                                    .ok()?;
                                let new_array = array.soak(bytes);
                                match new_array.try_eval_predicate(predicate, selection) {
                                    Some(buf) => GetWithPredicateResult::Evaluated(buf),
                                    None => {
                                        let filtered = new_array.filter_to_arrow(selection);
                                        GetWithPredicateResult::Filtered(filtered)
                                    }
                                }
                            }
                        }
                    }
                    Err(io_range) => {
                        self.runtime_stats.incr_get_predicate_hybrid_needs_io();
                        let path = self.io_context.liquid_path(entry_id);
                        let bytes = self
                            .io_context
                            .read_range(path, io_range.range().clone())
                            .await
                            .ok()?;
                        let new_array = array.soak(bytes);
                        match new_array.try_eval_predicate(predicate, selection) {
                            Some(buf) => GetWithPredicateResult::Evaluated(buf),
                            None => {
                                let filtered = new_array.filter_to_arrow(selection);
                                GetWithPredicateResult::Filtered(filtered)
                            }
                        }
                    }
                }
            }
        })
    }
571
    /// Try to read a liquid array from the cache.
    /// Returns None if the cached data is not in liquid format.
    ///
    /// Also returns `None` on disk-read failure (errors swallowed via `.ok()?`).
    pub async fn try_read_liquid(
        &self,
        entry_id: &EntryID,
    ) -> Option<crate::liquid_array::LiquidArrayRef> {
        self.runtime_stats.incr_try_read_liquid();
        let batch = self.index.get(entry_id)?;
        self.cache_policy
            .notify_access(entry_id, CachedBatchType::from(&batch));

        match batch {
            // Already resident as liquid.
            CachedBatch::MemoryLiquid(array) => Some(array),
            // On disk: read the full file and decode with this entry's
            // FSST compressor.
            CachedBatch::DiskLiquid(_) => {
                let path = self.io_context.liquid_path(entry_id);
                let bytes = self.io_context.read_file(path).await.ok()?;
                let compressor_states = self.io_context.get_compressor(entry_id);
                let compressor = compressor_states.fsst_compressor();
                let liquid = crate::liquid_array::ipc::read_from_bytes(
                    bytes,
                    &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
                );
                Some(liquid)
            }
            // Hybrid: fetch the spilled range and soak it back into a fully
            // in-memory liquid array.
            CachedBatch::MemoryHybridLiquid(array) => {
                let io_range = array.to_liquid();
                let path = self.io_context.liquid_path(entry_id);
                let bytes = self
                    .io_context
                    .read_range(path, io_range.range().clone())
                    .await
                    .ok()?;
                Some(array.soak(bytes))
            }
            // Arrow-format entries are not liquid; caller must use the
            // arrow-returning accessors instead.
            CachedBatch::DiskArrow(_) | CachedBatch::MemoryArrow(_) => None,
        }
    }
609
    /// Iterate over all entries in the cache.
    /// No guarantees are made about the order of the entries.
    /// Isolation level: read-committed
    pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CachedBatch)) {
        // `&mut f` lets the index borrow the closure without consuming it.
        self.index.for_each(&mut f);
    }
616
    /// Reset the cache.
    ///
    /// Clears the index and zeroes the budget accounting.
    /// NOTE(review): on-disk spill files are not removed here — confirm
    /// whether cleanup is handled elsewhere.
    pub fn reset(&self) {
        self.index.reset();
        self.budget.reset_usage();
    }
622
    /// Check if a batch is cached.
    pub fn is_cached(&self, entry_id: &EntryID) -> bool {
        self.index.is_cached(entry_id)
    }
627
    /// Get the config of the cache.
    pub fn config(&self) -> &CacheConfig {
        &self.config
    }
632
    /// Get the budget of the cache.
    pub fn budget(&self) -> &BudgetAccounting {
        &self.budget
    }
637
    /// Get the tracer of the cache.
    pub fn tracer(&self) -> &CacheTracer {
        &self.tracer
    }
642
    /// Get the index of the cache.
    // Test-only: exposes internals for assertions in unit tests.
    #[cfg(test)]
    pub(crate) fn index(&self) -> &ArtIndex {
        &self.index
    }
648
    /// Get the compressor states of the cache.
    pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
        // Delegates to the IoContext, which owns compressor state per entry.
        self.io_context.get_compressor(entry_id)
    }
653
    /// Flush all entries to disk.
    ///
    /// Two phases: first snapshot the in-memory entries (collecting inside the
    /// index walk avoids mutating while iterating), then write each to disk
    /// and re-point its index entry at the on-disk representation.
    ///
    /// # Panics
    /// Panics if serialization or the index update fails (`expect`s below).
    pub fn flush_all_to_disk(&self) {
        // Collect all entries that need to be flushed to disk
        let mut entries_to_flush = Vec::new();

        self.for_each_entry(|entry_id, batch| {
            match batch {
                CachedBatch::MemoryArrow(_)
                | CachedBatch::MemoryLiquid(_)
                | CachedBatch::MemoryHybridLiquid(_) => {
                    entries_to_flush.push((*entry_id, batch.clone()));
                }
                CachedBatch::DiskArrow(_) | CachedBatch::DiskLiquid(_) => {
                    // Already on disk, skip
                }
            }
        });

        // Now flush each entry to disk
        for (entry_id, batch) in entries_to_flush {
            match batch {
                // Arrow in memory: serialize to Arrow IPC bytes, write, and
                // keep only the data type in the index.
                CachedBatch::MemoryArrow(array) => {
                    let bytes = arrow_to_bytes(&array).expect("failed to convert arrow to bytes");
                    let path = self.io_context.arrow_path(&entry_id);
                    self.write_to_disk_blocking(&path, &bytes);
                    self.try_insert(entry_id, CachedBatch::DiskArrow(array.data_type().clone()))
                        .expect("failed to insert disk arrow entry");
                }
                // Liquid in memory: serialize the liquid bytes and downgrade
                // the index entry to DiskLiquid.
                CachedBatch::MemoryLiquid(liquid_array) => {
                    let liquid_bytes = liquid_array.to_bytes();
                    let path = self.io_context.liquid_path(&entry_id);
                    self.write_to_disk_blocking(&path, &liquid_bytes);
                    self.try_insert(
                        entry_id,
                        CachedBatch::DiskLiquid(liquid_array.original_arrow_data_type()),
                    )
                    .expect("failed to insert disk liquid entry");
                }
                CachedBatch::MemoryHybridLiquid(array) => {
                    // We don't have to do anything, because it's already on disk
                    self.try_insert(
                        entry_id,
                        CachedBatch::DiskLiquid(array.original_arrow_data_type()),
                    )
                    .expect("failed to insert disk liquid entry");
                }
                CachedBatch::DiskArrow(_) | CachedBatch::DiskLiquid(_) => {
                    // Should not happen since we filtered these out above
                    unreachable!("Unexpected disk batch in flush operation");
                }
            }
        }
    }
707}
708
709impl CacheStorage {
    /// returns the batch that was written to disk
    ///
    /// Serializes an in-memory batch to its on-disk file and returns the
    /// lightweight disk variant (data type only) for the index.
    ///
    /// # Panics
    /// Panics on serialization failure, or if called with a batch that is
    /// already on disk or hybrid (callers must pass purely in-memory batches).
    fn write_in_memory_batch_to_disk(&self, entry_id: EntryID, batch: CachedBatch) -> CachedBatch {
        match batch {
            CachedBatch::MemoryArrow(array) => {
                let bytes = arrow_to_bytes(&array).expect("failed to convert arrow to bytes");
                let path = self.io_context.arrow_path(&entry_id);
                self.write_to_disk_blocking(&path, &bytes);
                CachedBatch::DiskArrow(array.data_type().clone())
            }
            CachedBatch::MemoryLiquid(liquid_array) => {
                let liquid_bytes = liquid_array.to_bytes();
                let path = self.io_context.liquid_path(&entry_id);
                self.write_to_disk_blocking(&path, &liquid_bytes);
                CachedBatch::DiskLiquid(liquid_array.original_arrow_data_type())
            }
            CachedBatch::DiskLiquid(_)
            | CachedBatch::DiskArrow(_)
            | CachedBatch::MemoryHybridLiquid(_) => {
                unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
            }
        }
    }
732
    /// Insert a batch into the cache, it will run cache replacement policy until the batch is inserted.
    ///
    /// Loop invariant: each iteration either inserts successfully, squeezes
    /// victims to free budget, or (when no victims exist) converts the batch
    /// to its on-disk form so a later iteration can insert the small disk
    /// entry.
    pub(crate) async fn insert_inner(&self, entry_id: EntryID, mut batch_to_cache: CachedBatch) {
        loop {
            // Capture the type before the batch is moved into try_insert.
            let batch_type = CachedBatchType::from(&batch_to_cache);
            let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
                self.cache_policy.notify_insert(&entry_id, batch_type);
                return;
            };

            let victims = self.cache_policy.find_victim(8);
            if victims.is_empty() {
                // no advice, because the cache is already empty
                // this can happen if the entry to be inserted is too large, in that case,
                // we write it to disk
                let on_disk_batch = self.write_in_memory_batch_to_disk(entry_id, not_inserted);
                batch_to_cache = on_disk_batch;
                continue;
            }
            self.squeeze_victims(victims).await;

            // Retry the same batch now that the victims have been squeezed.
            batch_to_cache = not_inserted;
            crate::utils::yield_now_if_shuttle();
        }
    }
757
758    /// Create a new instance of CacheStorage.
759    fn new(
760        batch_size: usize,
761        max_cache_bytes: usize,
762        cache_dir: PathBuf,
763        squeeze_policy: Box<dyn SqueezePolicy>,
764        cache_policy: Box<dyn CachePolicy>,
765        io_worker: Arc<dyn IoContext>,
766    ) -> Self {
767        let config = CacheConfig::new(batch_size, max_cache_bytes, cache_dir);
768        Self {
769            index: ArtIndex::new(),
770            budget: BudgetAccounting::new(config.max_cache_bytes()),
771            config,
772            cache_policy,
773            squeeze_policy,
774            tracer: CacheTracer::new(),
775            io_context: io_worker,
776            runtime_stats: RuntimeStats::default(),
777        }
778    }
779
780    fn try_insert(&self, entry_id: EntryID, cached_batch: CachedBatch) -> Result<(), CachedBatch> {
781        let new_memory_size = cached_batch.memory_usage_bytes();
782        if let Some(entry) = self.index.get(&entry_id) {
783            let old_memory_size = entry.memory_usage_bytes();
784            if self
785                .budget
786                .try_update_memory_usage(old_memory_size, new_memory_size)
787                .is_err()
788            {
789                return Err(cached_batch);
790            }
791            self.index.insert(&entry_id, cached_batch);
792        } else {
793            if self.budget.try_reserve_memory(new_memory_size).is_err() {
794                return Err(cached_batch);
795            }
796            self.index.insert(&entry_id, cached_batch);
797        }
798
799        Ok(())
800    }
801
802    #[fastrace::trace]
803    async fn squeeze_victims(&self, victims: Vec<EntryID>) {
804        // Run squeeze operations sequentially using async I/O
805        for victim in victims {
806            self.squeeze_victim_inner(victim).await;
807        }
808    }
809
810    async fn squeeze_victim_inner(&self, to_squeeze: EntryID) {
811        let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
812            return;
813        };
814        let compressor = self.io_context.get_compressor(&to_squeeze);
815
816        loop {
817            let (new_batch, bytes_to_write) = self
818                .squeeze_policy
819                .squeeze(to_squeeze_batch, compressor.as_ref());
820
821            if let Some(bytes_to_write) = bytes_to_write {
822                let path = match new_batch {
823                    CachedBatch::DiskArrow(_) => self.io_context.arrow_path(&to_squeeze),
824                    CachedBatch::DiskLiquid(_) | CachedBatch::MemoryHybridLiquid(_) => {
825                        self.io_context.liquid_path(&to_squeeze)
826                    }
827                    CachedBatch::MemoryArrow(_) | CachedBatch::MemoryLiquid(_) => {
828                        unreachable!()
829                    }
830                };
831                // Use IoContext's write_file for async I/O
832                if let Err(e) = self
833                    .io_context
834                    .write_file(path, bytes_to_write.clone())
835                    .await
836                {
837                    eprintln!("Failed to write to disk: {}", e);
838                    return;
839                }
840                self.budget.add_used_disk_bytes(bytes_to_write.len());
841            }
842            let batch_type = CachedBatchType::from(&new_batch);
843            match self.try_insert(to_squeeze, new_batch) {
844                Ok(()) => {
845                    self.cache_policy.notify_insert(&to_squeeze, batch_type);
846                    break;
847                }
848                Err(batch) => {
849                    to_squeeze_batch = batch;
850                }
851            }
852        }
853    }
854
855    fn write_to_disk_blocking(&self, path: impl AsRef<Path>, bytes: &[u8]) {
856        use std::io::Write;
857        let mut file = std::fs::File::create(&path).expect("failed to create file");
858        file.write_all(bytes).expect("failed to write to file");
859        let disk_usage = bytes.len();
860        self.budget.add_used_disk_bytes(disk_usage);
861    }
862}
863
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{
        cache_policies::{CachePolicy, LruPolicy},
        utils::{create_cache_store, create_test_array, create_test_arrow_array},
    };
    use crate::sync::thread;
    use arrow::array::{Array, Int32Array};
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Test-only policy: always nominates the configured entry as the victim
    // and counts how many times advice was requested.
    #[derive(Debug)]
    struct TestPolicy {
        target_id: Option<EntryID>,
        advice_count: AtomicUsize,
    }

    impl TestPolicy {
        fn new(target_id: Option<EntryID>) -> Self {
            Self {
                target_id,
                advice_count: AtomicUsize::new(0),
            }
        }
    }

    impl CachePolicy for TestPolicy {
        // Returns the fixed target as the sole victim, ignoring the requested
        // count. Panics if the policy was built with `None` — tests that reach
        // this path must supply a target id.
        fn find_victim(&self, _cnt: usize) -> Vec<EntryID> {
            self.advice_count.fetch_add(1, Ordering::SeqCst);
            let id_to_use = self.target_id.unwrap();
            vec![id_to_use]
        }
    }

    #[tokio::test]
    async fn test_basic_cache_operations() {
        // Test basic insert, get, and size tracking in one test
        let budget_size = 10 * 1024;
        let store = create_cache_store(budget_size, Box::new(LruPolicy::new()));

        // 1. Initial budget should be empty
        assert_eq!(store.budget.memory_usage_bytes(), 0);

        // 2. Insert and verify first entry
        let entry_id1: EntryID = EntryID::from(1);
        let array1 = create_test_array(100);
        let size1 = array1.memory_usage_bytes();
        store.insert_inner(entry_id1, array1).await;

        // Verify budget usage and data correctness
        assert_eq!(store.budget.memory_usage_bytes(), size1);
        let retrieved1 = store.index().get(&entry_id1).unwrap();
        match retrieved1 {
            CachedBatch::MemoryArrow(arr) => assert_eq!(arr.len(), 100),
            _ => panic!("Expected ArrowMemory"),
        }

        // 3. Insert a second entry; both sizes should be accounted for
        let entry_id2: EntryID = EntryID::from(2);
        let array2 = create_test_array(200);
        let size2 = array2.memory_usage_bytes();
        store.insert_inner(entry_id2, array2).await;

        assert_eq!(store.budget.memory_usage_bytes(), size1 + size2);

        // 4. Re-inserting under an existing id replaces the entry: the budget
        // reflects the new size (size3) instead of the old one (size1)
        let array3 = create_test_array(150);
        let size3 = array3.memory_usage_bytes();
        store.insert_inner(entry_id1, array3).await;

        assert_eq!(store.budget.memory_usage_bytes(), size3 + size2);
        // 5. Lookup of a never-inserted id misses
        assert!(store.index().get(&EntryID::from(999)).is_none());
    }

    #[tokio::test]
    async fn test_cache_advice_strategies() {
        // NOTE(review): only the evict/squeeze advice path is exercised below,
        // despite the original "all three advice types" wording.

        // Create entry IDs we'll use throughout the test
        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);

        // 1. Test EVICT advice
        {
            let advisor = TestPolicy::new(Some(entry_id1));
            let store = create_cache_store(8000, Box::new(advisor)); // Small budget to force advice

            store.insert_inner(entry_id1, create_test_array(800)).await;
            match store.index().get(&entry_id1).unwrap() {
                CachedBatch::MemoryArrow(_) => {}
                other => panic!("Expected ArrowMemory, got {other:?}"),
            }

            // The second large insert overflows the budget; the policy
            // nominates entry 1, which is squeezed into the liquid format.
            store.insert_inner(entry_id2, create_test_array(800)).await;
            match store.index().get(&entry_id1).unwrap() {
                CachedBatch::MemoryLiquid(_) => {}
                other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
            }
        }
    }

    #[tokio::test]
    async fn test_concurrent_cache_operations() {
        concurrent_cache_operations().await;
    }

    // Same scenario as above, but driven by the shuttle scheduler to explore
    // thread interleavings deterministically.
    #[cfg(feature = "shuttle")]
    #[tokio::test]
    async fn shuttle_cache_operations() {
        crate::utils::shuttle_test(|| {
            tokio::runtime::Runtime::new()
                .unwrap()
                .block_on(concurrent_cache_operations());
        });
    }
    // Block on a future with the executor that matches the build: shuttle's
    // under the "shuttle" feature, tokio_test's otherwise.
    // NOTE(review): relies on `Future` being in scope via a file-level import
    // not visible in this chunk — verify at the top of the file.
    pub fn block_on<F: Future>(future: F) -> F::Output {
        #[cfg(feature = "shuttle")]
        {
            shuttle::future::block_on(future)
        }
        #[cfg(not(feature = "shuttle"))]
        {
            tokio_test::block_on(future)
        }
    }

    // Spawns several threads inserting disjoint entry-id ranges concurrently,
    // then checks that every insert is visible and none were lost.
    async fn concurrent_cache_operations() {
        let num_threads = 3;
        let ops_per_thread = 50;

        // Budget sized to roughly half the total inserted data, so cache
        // replacement is exercised while most entries still fit.
        let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
        let store = Arc::new(create_cache_store(budget_size, Box::new(LruPolicy::new())));

        let mut handles = vec![];
        for thread_id in 0..num_threads {
            let store = store.clone();
            handles.push(thread::spawn(move || {
                block_on(async {
                    for i in 0..ops_per_thread {
                        // Each thread owns a disjoint id range, so inserts never collide.
                        let unique_id = thread_id * ops_per_thread + i;
                        let entry_id: EntryID = EntryID::from(unique_id);
                        let array = create_test_arrow_array(100);
                        store.insert(entry_id, array).await;
                    }
                });
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }

        // Invariant 1: Every previously inserted entry can be retrieved
        for thread_id in 0..num_threads {
            for i in 0..ops_per_thread {
                let unique_id = thread_id * ops_per_thread + i;
                let entry_id: EntryID = EntryID::from(unique_id);
                assert!(store.index().get(&entry_id).is_some());
            }
        }

        // Invariant 2: Number of entries matches number of insertions
        assert_eq!(store.index().keys().len(), num_threads * ops_per_thread);
    }

    #[tokio::test]
    async fn test_cache_stats_memory_and_disk_usage() {
        // Build a small cache in blocking liquid mode to avoid background tasks
        let storage = CacheStorageBuilder::new()
            .with_max_cache_bytes(10 * 1024 * 1024)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build();

        // Insert two small batches
        let arr1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        let arr2: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
        storage.insert(EntryID::from(1usize), arr1).await;
        storage.insert(EntryID::from(2usize), arr2).await;

        // Stats after insert: 2 entries, memory usage > 0, disk usage == 0
        let s = storage.stats();
        assert_eq!(s.total_entries, 2);
        assert!(s.memory_usage_bytes > 0);
        assert_eq!(s.disk_usage_bytes, 0);
        assert_eq!(s.max_cache_bytes, 10 * 1024 * 1024);

        // Flush to disk and verify memory usage drops and disk usage increases
        storage.flush_all_to_disk();
        let s2 = storage.stats();
        assert_eq!(s2.total_entries, 2);
        assert!(s2.disk_usage_bytes > 0);
        // In-memory usage should be reduced after moving to on-disk formats
        assert!(s2.memory_usage_bytes <= s.memory_usage_bytes);
    }
}
1056}