1use arrow::array::ArrayRef;
2use arrow::buffer::BooleanBuffer;
3use arrow_schema::ArrowError;
4use datafusion::physical_plan::PhysicalExpr;
5use std::path::PathBuf;
6use std::{fmt::Debug, path::Path};
7
8use super::{
9 budget::BudgetAccounting,
10 cache_policies::CachePolicy,
11 cached_batch::{CachedBatch, CachedBatchType, GetWithPredicateResult},
12 tracer::CacheTracer,
13 utils::CacheConfig,
14};
15use crate::cache::squeeze_policies::{SqueezePolicy, TranscodeSqueezeEvict};
16use crate::cache::stats::{CacheStats, RuntimeStats};
17use crate::cache::utils::{LiquidCompressorStates, arrow_to_bytes};
18use crate::cache::{index::ArtIndex, utils::EntryID};
19use crate::cache_policies::LiquidPolicy;
20use crate::sync::Arc;
21
22use bytes::Bytes;
23use std::ops::Range;
24
/// Abstraction over the I/O backend used by the cache: path layout,
/// per-entry compressor state, and (async) file reads/writes.
///
/// Implementations decide *how* I/O is performed (true async vs. blocking);
/// callers only see the async interface.
#[async_trait::async_trait]
pub trait IoContext: Debug + Send + Sync {
    /// Root directory under which all cache files are created.
    fn base_dir(&self) -> &Path;

    /// Compressor state used to encode/decode liquid payloads for `entry_id`.
    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;

    /// On-disk location of the Arrow IPC payload for `entry_id`.
    fn arrow_path(&self, entry_id: &EntryID) -> PathBuf;

    /// On-disk location of the liquid payload for `entry_id`.
    fn liquid_path(&self, entry_id: &EntryID) -> PathBuf;

    /// Reads the entire file at `path`.
    async fn read_file(&self, path: PathBuf) -> Result<Bytes, std::io::Error>;

    /// Reads exactly the byte `range` from the file at `path`.
    /// Assumes `range.start <= range.end` — TODO confirm callers guarantee this.
    async fn read_range(&self, path: PathBuf, range: Range<u64>) -> Result<Bytes, std::io::Error>;

    /// Creates (or truncates) the file at `path` and writes `data` to it.
    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error>;
}
49
/// Default [`IoContext`] backed by `tokio::fs` (non-blocking I/O).
/// A single compressor state is shared across all entries.
#[derive(Debug)]
pub struct DefaultIoContext {
    // Shared compressor state returned for every entry id.
    compressor_state: Arc<LiquidCompressorStates>,
    // Root directory for all cache files.
    base_dir: PathBuf,
}
56
57impl DefaultIoContext {
58 pub fn new(base_dir: PathBuf) -> Self {
60 Self {
61 compressor_state: Arc::new(LiquidCompressorStates::new()),
62 base_dir,
63 }
64 }
65}
66
67#[async_trait::async_trait]
68impl IoContext for DefaultIoContext {
69 fn base_dir(&self) -> &Path {
70 &self.base_dir
71 }
72
73 fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
74 self.compressor_state.clone()
75 }
76
77 fn arrow_path(&self, entry_id: &EntryID) -> PathBuf {
78 self.base_dir()
79 .join(format!("{:016x}.arrow", usize::from(*entry_id)))
80 }
81
82 fn liquid_path(&self, entry_id: &EntryID) -> PathBuf {
83 self.base_dir()
84 .join(format!("{:016x}.liquid", usize::from(*entry_id)))
85 }
86
87 async fn read_file(&self, path: PathBuf) -> Result<Bytes, std::io::Error> {
88 use tokio::io::AsyncReadExt;
89 let mut file = tokio::fs::File::open(path).await?;
90 let mut bytes = Vec::new();
91 file.read_to_end(&mut bytes).await?;
92 Ok(Bytes::from(bytes))
93 }
94
95 async fn read_range(&self, path: PathBuf, range: Range<u64>) -> Result<Bytes, std::io::Error> {
96 use tokio::io::{AsyncReadExt, AsyncSeekExt};
97 let mut file = tokio::fs::File::open(path).await?;
98 let len = (range.end - range.start) as usize;
99 let mut bytes = vec![0u8; len];
100 file.seek(tokio::io::SeekFrom::Start(range.start)).await?;
101 file.read_exact(&mut bytes).await?;
102 Ok(Bytes::from(bytes))
103 }
104
105 async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
106 use tokio::io::AsyncWriteExt;
107 let mut file = tokio::fs::File::create(path).await?;
108 file.write_all(&data).await?;
109 Ok(())
110 }
111}
112
/// [`IoContext`] that performs *blocking* `std::fs` I/O inside the async
/// trait methods. Useful when the caller runs on a dedicated thread or a
/// runtime where blocking is acceptable; do not use on a shared async
/// executor's worker threads.
#[derive(Debug)]
pub struct BlockingIoContext {
    // Shared compressor state returned for every entry id.
    compressor_state: Arc<LiquidCompressorStates>,
    // Root directory for all cache files.
    base_dir: PathBuf,
}
120
121impl BlockingIoContext {
122 pub fn new(base_dir: PathBuf) -> Self {
124 Self {
125 compressor_state: Arc::new(LiquidCompressorStates::new()),
126 base_dir,
127 }
128 }
129}
130
131#[async_trait::async_trait]
132impl IoContext for BlockingIoContext {
133 fn base_dir(&self) -> &Path {
134 &self.base_dir
135 }
136
137 fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
138 self.compressor_state.clone()
139 }
140
141 fn arrow_path(&self, entry_id: &EntryID) -> PathBuf {
142 self.base_dir()
143 .join(format!("{:016x}.arrow", usize::from(*entry_id)))
144 }
145
146 fn liquid_path(&self, entry_id: &EntryID) -> PathBuf {
147 self.base_dir()
148 .join(format!("{:016x}.liquid", usize::from(*entry_id)))
149 }
150
151 async fn read_file(&self, path: PathBuf) -> Result<Bytes, std::io::Error> {
152 let mut file = std::fs::File::open(path)?;
153 let mut bytes = Vec::new();
154 std::io::Read::read_to_end(&mut file, &mut bytes)?;
155 Ok(Bytes::from(bytes))
156 }
157
158 async fn read_range(&self, path: PathBuf, range: Range<u64>) -> Result<Bytes, std::io::Error> {
159 let mut file = std::fs::File::open(path)?;
160 let len = (range.end - range.start) as usize;
161 let mut bytes = vec![0u8; len];
162 std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(range.start))?;
163 std::io::Read::read_exact(&mut file, &mut bytes)?;
164 Ok(Bytes::from(bytes))
165 }
166
167 async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
168 let mut file = std::fs::File::create(path)?;
169 std::io::Write::write_all(&mut file, &data)?;
170 Ok(())
171 }
172}
173
/// Builder for [`CacheStorage`] with sensible defaults: 8192-row batches,
/// a 1 GiB memory budget, a temporary cache directory, LRU-style liquid
/// policy, and transcode-then-evict squeezing.
pub struct CacheStorageBuilder {
    // Rows per cached batch.
    batch_size: usize,
    // In-memory budget in bytes.
    max_cache_bytes: usize,
    // Cache directory; a temp dir is created at build() time when None.
    cache_dir: Option<PathBuf>,
    // Eviction-candidate selection policy.
    cache_policy: Box<dyn CachePolicy>,
    // How victims are downgraded (transcode / spill / evict).
    squeeze_policy: Box<dyn SqueezePolicy>,
    // I/O backend; defaults to DefaultIoContext over cache_dir when None.
    io_worker: Option<Arc<dyn IoContext>>,
}
198
199impl Default for CacheStorageBuilder {
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205impl CacheStorageBuilder {
206 pub fn new() -> Self {
208 Self {
209 batch_size: 8192,
210 max_cache_bytes: 1024 * 1024 * 1024,
211 cache_dir: None,
212 cache_policy: Box::new(LiquidPolicy::new()),
213 squeeze_policy: Box::new(TranscodeSqueezeEvict),
214 io_worker: None,
215 }
216 }
217
218 pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
221 self.cache_dir = Some(cache_dir);
222 self
223 }
224
225 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
228 self.batch_size = batch_size;
229 self
230 }
231
232 pub fn with_max_cache_bytes(mut self, max_cache_bytes: usize) -> Self {
235 self.max_cache_bytes = max_cache_bytes;
236 self
237 }
238
239 pub fn with_cache_policy(mut self, policy: Box<dyn CachePolicy>) -> Self {
242 self.cache_policy = policy;
243 self
244 }
245
246 pub fn with_squeeze_policy(mut self, policy: Box<dyn SqueezePolicy>) -> Self {
249 self.squeeze_policy = policy;
250 self
251 }
252
253 pub fn with_io_worker(mut self, io_worker: Arc<dyn IoContext>) -> Self {
256 self.io_worker = Some(io_worker);
257 self
258 }
259
260 pub fn build(self) -> Arc<CacheStorage> {
264 let cache_dir = self
265 .cache_dir
266 .unwrap_or_else(|| tempfile::tempdir().unwrap().keep());
267 let io_worker = self
268 .io_worker
269 .unwrap_or_else(|| Arc::new(DefaultIoContext::new(cache_dir.clone())));
270 Arc::new(CacheStorage::new(
271 self.batch_size,
272 self.max_cache_bytes,
273 cache_dir,
274 self.squeeze_policy,
275 self.cache_policy,
276 io_worker,
277 ))
278 }
279}
280
/// The cache storage engine: an index of cached batches plus the budget,
/// policies, tracing, and I/O backend needed to serve reads and to squeeze
/// entries to cheaper representations when memory runs out.
#[derive(Debug)]
pub struct CacheStorage {
    // entry id -> CachedBatch index.
    index: ArtIndex,
    // Immutable configuration (batch size, budget, root dir).
    config: CacheConfig,
    // Memory/disk usage accounting against the budget.
    budget: BudgetAccounting,
    // Chooses eviction victims and receives access/insert notifications.
    cache_policy: Box<dyn CachePolicy>,
    // Downgrades victims (e.g. transcode to liquid, spill to disk, evict).
    squeeze_policy: Box<dyn SqueezePolicy>,
    // Event tracer for diagnostics.
    tracer: CacheTracer,
    // I/O backend for on-disk payloads.
    io_context: Arc<dyn IoContext>,
    // Runtime counters surfaced via stats().
    runtime_stats: RuntimeStats,
}
312
313impl CacheStorage {
314 pub fn stats(&self) -> CacheStats {
316 let total_entries = self.index.entry_count();
318
319 let mut memory_arrow_entries = 0usize;
320 let mut memory_liquid_entries = 0usize;
321 let mut memory_hybrid_liquid_entries = 0usize;
322 let mut disk_liquid_entries = 0usize;
323 let mut disk_arrow_entries = 0usize;
324
325 self.index.for_each(|_, batch| match batch {
326 CachedBatch::MemoryArrow(_) => memory_arrow_entries += 1,
327 CachedBatch::MemoryLiquid(_) => memory_liquid_entries += 1,
328 CachedBatch::MemoryHybridLiquid(_) => memory_hybrid_liquid_entries += 1,
329 CachedBatch::DiskLiquid(_) => disk_liquid_entries += 1,
330 CachedBatch::DiskArrow(_) => disk_arrow_entries += 1,
331 });
332
333 let memory_usage_bytes = self.budget.memory_usage_bytes();
334 let disk_usage_bytes = self.budget.disk_usage_bytes();
335 let runtime = self.runtime_stats.consume_snapshot();
336
337 CacheStats {
338 total_entries,
339 memory_arrow_entries,
340 memory_liquid_entries,
341 memory_hybrid_liquid_entries,
342 disk_liquid_entries,
343 disk_arrow_entries,
344 memory_usage_bytes,
345 disk_usage_bytes,
346 max_cache_bytes: self.config.max_cache_bytes(),
347 cache_root_dir: self.config.cache_root_dir().clone(),
348 runtime,
349 }
350 }
351
352 pub async fn insert(self: &Arc<Self>, entry_id: EntryID, batch_to_cache: ArrayRef) {
354 self.insert_inner(entry_id, CachedBatch::MemoryArrow(batch_to_cache))
355 .await;
356 }
357
358 pub async fn get_arrow_array(&self, entry_id: &EntryID) -> Option<ArrayRef> {
360 self.runtime_stats.incr_get_arrow_array();
361 let batch = self.index.get(entry_id)?;
362 self.cache_policy
363 .notify_access(entry_id, CachedBatchType::from(&batch));
364
365 match batch {
366 CachedBatch::MemoryArrow(array) => Some(array),
367 CachedBatch::MemoryLiquid(array) => Some(array.to_arrow_array()),
368 CachedBatch::DiskLiquid(_) => {
369 let path = self.io_context.liquid_path(entry_id);
370 let bytes = self.io_context.read_file(path).await.ok()?;
371 let compressor_states = self.io_context.get_compressor(entry_id);
372 let compressor = compressor_states.fsst_compressor();
373 let liquid = crate::liquid_array::ipc::read_from_bytes(
374 bytes,
375 &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
376 );
377 Some(liquid.to_arrow_array())
378 }
379 CachedBatch::MemoryHybridLiquid(array) => match array.to_arrow_array() {
380 Ok(arr) => Some(arr),
381 Err(io_range) => {
382 let path = self.io_context.liquid_path(entry_id);
383 let bytes = self
384 .io_context
385 .read_range(path, io_range.range().clone())
386 .await
387 .ok()?;
388 let new_array = array.soak(bytes);
389 Some(new_array.to_arrow_array())
390 }
391 },
392 CachedBatch::DiskArrow(_) => {
393 let path = self.io_context.arrow_path(entry_id);
394 let bytes = self.io_context.read_file(path).await.ok()?;
395 let cursor = std::io::Cursor::new(bytes.to_vec());
396 let mut reader = arrow::ipc::reader::StreamReader::try_new(cursor, None).ok()?;
397 let batch = reader.next()?.ok()?;
398 Some(batch.column(0).clone())
399 }
400 }
401 }
402
403 pub async fn get_with_selection(
405 &self,
406 entry_id: &EntryID,
407 selection: &BooleanBuffer,
408 ) -> Option<Result<ArrayRef, ArrowError>> {
409 use arrow::array::BooleanArray;
410
411 self.runtime_stats.incr_get_with_selection();
412 let batch = self.index.get(entry_id)?;
413 self.cache_policy
414 .notify_access(entry_id, CachedBatchType::from(&batch));
415
416 Some(match batch {
417 CachedBatch::MemoryArrow(array) => {
418 let selection_array = BooleanArray::new(selection.clone(), None);
419 arrow::compute::filter(&array, &selection_array)
420 }
421 CachedBatch::MemoryLiquid(array) => Ok(array.filter_to_arrow(selection)),
422 CachedBatch::DiskLiquid(data_type) => {
423 let select_any = selection.count_set_bits() > 0;
424 if !select_any {
425 let empty_array = arrow::array::new_empty_array(&data_type);
426 return Some(Ok(empty_array));
427 }
428 let path = self.io_context.liquid_path(entry_id);
429 let bytes = self.io_context.read_file(path).await.ok()?;
430 let compressor_states = self.io_context.get_compressor(entry_id);
431 let compressor = compressor_states.fsst_compressor();
432 let liquid = crate::liquid_array::ipc::read_from_bytes(
433 bytes,
434 &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
435 );
436 Ok(liquid.filter_to_arrow(selection))
437 }
438 CachedBatch::MemoryHybridLiquid(array) => match array.filter_to_arrow(selection) {
439 Ok(arr) => Ok(arr),
440 Err(io_range) => {
441 let path = self.io_context.liquid_path(entry_id);
442 let bytes = self
443 .io_context
444 .read_range(path, io_range.range().clone())
445 .await
446 .ok()?;
447 let new_array = array.soak(bytes);
448 Ok(new_array.filter_to_arrow(selection))
449 }
450 },
451 CachedBatch::DiskArrow(_) => {
452 let path = self.io_context.arrow_path(entry_id);
453 let bytes = self.io_context.read_file(path).await.ok()?;
454 let cursor = std::io::Cursor::new(bytes.to_vec());
455 let mut reader = arrow::ipc::reader::StreamReader::try_new(cursor, None).ok()?;
456 let batch = reader.next()?.ok()?;
457 let array = batch.column(0).clone();
458 let selection_array = BooleanArray::new(selection.clone(), None);
459 arrow::compute::filter(&array, &selection_array)
460 }
461 })
462 }
463
464 pub async fn get_with_predicate(
466 &self,
467 entry_id: &EntryID,
468 selection: &BooleanBuffer,
469 predicate: &Arc<dyn PhysicalExpr>,
470 ) -> Option<GetWithPredicateResult> {
471 use arrow::array::BooleanArray;
472
473 self.runtime_stats.incr_get_with_predicate();
474 let batch = self.index.get(entry_id)?;
475 self.cache_policy
476 .notify_access(entry_id, CachedBatchType::from(&batch));
477
478 Some(match batch {
479 CachedBatch::MemoryArrow(array) => {
480 let selection_array = BooleanArray::new(selection.clone(), None);
481 let selected = arrow::compute::filter(&array, &selection_array).ok()?;
482 GetWithPredicateResult::Filtered(selected)
483 }
484 CachedBatch::DiskArrow(_) => {
485 let path = self.io_context.arrow_path(entry_id);
486 let bytes = self.io_context.read_file(path).await.ok()?;
487 let cursor = std::io::Cursor::new(bytes.to_vec());
488 let mut reader = arrow::ipc::reader::StreamReader::try_new(cursor, None).ok()?;
489 let batch = reader.next()?.ok()?;
490 let array = batch.column(0).clone();
491 let selection_array = BooleanArray::new(selection.clone(), None);
492 let filtered = arrow::compute::filter(&array, &selection_array).ok()?;
493 GetWithPredicateResult::Filtered(filtered)
494 }
495 CachedBatch::MemoryLiquid(array) => {
496 match array.try_eval_predicate(predicate, selection) {
497 Some(buf) => GetWithPredicateResult::Evaluated(buf),
498 None => {
499 let filtered = array.filter_to_arrow(selection);
500 GetWithPredicateResult::Filtered(filtered)
501 }
502 }
503 }
504 CachedBatch::DiskLiquid(_) => {
505 let path = self.io_context.liquid_path(entry_id);
506 let bytes = self.io_context.read_file(path).await.ok()?;
507 let compressor_states = self.io_context.get_compressor(entry_id);
508 let compressor = compressor_states.fsst_compressor();
509 let liquid = crate::liquid_array::ipc::read_from_bytes(
510 bytes,
511 &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
512 );
513 match liquid.try_eval_predicate(predicate, selection) {
514 Some(buf) => GetWithPredicateResult::Evaluated(buf),
515 None => {
516 let filtered = liquid.filter_to_arrow(selection);
517 GetWithPredicateResult::Filtered(filtered)
518 }
519 }
520 }
521 CachedBatch::MemoryHybridLiquid(array) => {
522 match array.try_eval_predicate(predicate, selection) {
523 Ok(Some(buf)) => {
524 self.runtime_stats.incr_get_predicate_hybrid_success();
525 GetWithPredicateResult::Evaluated(buf)
526 }
527 Ok(None) => {
528 self.runtime_stats.incr_get_predicate_hybrid_unsupported();
529 match array.filter_to_arrow(selection) {
530 Ok(arr) => GetWithPredicateResult::Filtered(arr),
531 Err(io_range) => {
532 self.runtime_stats.incr_get_predicate_hybrid_needs_io();
533 let path = self.io_context.liquid_path(entry_id);
534 let bytes = self
535 .io_context
536 .read_range(path, io_range.range().clone())
537 .await
538 .ok()?;
539 let new_array = array.soak(bytes);
540 match new_array.try_eval_predicate(predicate, selection) {
541 Some(buf) => GetWithPredicateResult::Evaluated(buf),
542 None => {
543 let filtered = new_array.filter_to_arrow(selection);
544 GetWithPredicateResult::Filtered(filtered)
545 }
546 }
547 }
548 }
549 }
550 Err(io_range) => {
551 self.runtime_stats.incr_get_predicate_hybrid_needs_io();
552 let path = self.io_context.liquid_path(entry_id);
553 let bytes = self
554 .io_context
555 .read_range(path, io_range.range().clone())
556 .await
557 .ok()?;
558 let new_array = array.soak(bytes);
559 match new_array.try_eval_predicate(predicate, selection) {
560 Some(buf) => GetWithPredicateResult::Evaluated(buf),
561 None => {
562 let filtered = new_array.filter_to_arrow(selection);
563 GetWithPredicateResult::Filtered(filtered)
564 }
565 }
566 }
567 }
568 }
569 })
570 }
571
572 pub async fn try_read_liquid(
575 &self,
576 entry_id: &EntryID,
577 ) -> Option<crate::liquid_array::LiquidArrayRef> {
578 self.runtime_stats.incr_try_read_liquid();
579 let batch = self.index.get(entry_id)?;
580 self.cache_policy
581 .notify_access(entry_id, CachedBatchType::from(&batch));
582
583 match batch {
584 CachedBatch::MemoryLiquid(array) => Some(array),
585 CachedBatch::DiskLiquid(_) => {
586 let path = self.io_context.liquid_path(entry_id);
587 let bytes = self.io_context.read_file(path).await.ok()?;
588 let compressor_states = self.io_context.get_compressor(entry_id);
589 let compressor = compressor_states.fsst_compressor();
590 let liquid = crate::liquid_array::ipc::read_from_bytes(
591 bytes,
592 &crate::liquid_array::ipc::LiquidIPCContext::new(compressor),
593 );
594 Some(liquid)
595 }
596 CachedBatch::MemoryHybridLiquid(array) => {
597 let io_range = array.to_liquid();
598 let path = self.io_context.liquid_path(entry_id);
599 let bytes = self
600 .io_context
601 .read_range(path, io_range.range().clone())
602 .await
603 .ok()?;
604 Some(array.soak(bytes))
605 }
606 CachedBatch::DiskArrow(_) | CachedBatch::MemoryArrow(_) => None,
607 }
608 }
609
610 pub fn for_each_entry(&self, mut f: impl FnMut(&EntryID, &CachedBatch)) {
614 self.index.for_each(&mut f);
615 }
616
617 pub fn reset(&self) {
619 self.index.reset();
620 self.budget.reset_usage();
621 }
622
623 pub fn is_cached(&self, entry_id: &EntryID) -> bool {
625 self.index.is_cached(entry_id)
626 }
627
628 pub fn config(&self) -> &CacheConfig {
630 &self.config
631 }
632
633 pub fn budget(&self) -> &BudgetAccounting {
635 &self.budget
636 }
637
638 pub fn tracer(&self) -> &CacheTracer {
640 &self.tracer
641 }
642
643 #[cfg(test)]
645 pub(crate) fn index(&self) -> &ArtIndex {
646 &self.index
647 }
648
649 pub fn compressor_states(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
651 self.io_context.get_compressor(entry_id)
652 }
653
654 pub fn flush_all_to_disk(&self) {
656 let mut entries_to_flush = Vec::new();
658
659 self.for_each_entry(|entry_id, batch| {
660 match batch {
661 CachedBatch::MemoryArrow(_)
662 | CachedBatch::MemoryLiquid(_)
663 | CachedBatch::MemoryHybridLiquid(_) => {
664 entries_to_flush.push((*entry_id, batch.clone()));
665 }
666 CachedBatch::DiskArrow(_) | CachedBatch::DiskLiquid(_) => {
667 }
669 }
670 });
671
672 for (entry_id, batch) in entries_to_flush {
674 match batch {
675 CachedBatch::MemoryArrow(array) => {
676 let bytes = arrow_to_bytes(&array).expect("failed to convert arrow to bytes");
677 let path = self.io_context.arrow_path(&entry_id);
678 self.write_to_disk_blocking(&path, &bytes);
679 self.try_insert(entry_id, CachedBatch::DiskArrow(array.data_type().clone()))
680 .expect("failed to insert disk arrow entry");
681 }
682 CachedBatch::MemoryLiquid(liquid_array) => {
683 let liquid_bytes = liquid_array.to_bytes();
684 let path = self.io_context.liquid_path(&entry_id);
685 self.write_to_disk_blocking(&path, &liquid_bytes);
686 self.try_insert(
687 entry_id,
688 CachedBatch::DiskLiquid(liquid_array.original_arrow_data_type()),
689 )
690 .expect("failed to insert disk liquid entry");
691 }
692 CachedBatch::MemoryHybridLiquid(array) => {
693 self.try_insert(
695 entry_id,
696 CachedBatch::DiskLiquid(array.original_arrow_data_type()),
697 )
698 .expect("failed to insert disk liquid entry");
699 }
700 CachedBatch::DiskArrow(_) | CachedBatch::DiskLiquid(_) => {
701 unreachable!("Unexpected disk batch in flush operation");
703 }
704 }
705 }
706 }
707}
708
impl CacheStorage {
    /// Writes a fully in-memory batch to disk and returns the corresponding
    /// disk-resident `CachedBatch` (carrying only the data type).
    /// Panics if serialization or the blocking write fails.
    fn write_in_memory_batch_to_disk(&self, entry_id: EntryID, batch: CachedBatch) -> CachedBatch {
        match batch {
            CachedBatch::MemoryArrow(array) => {
                let bytes = arrow_to_bytes(&array).expect("failed to convert arrow to bytes");
                let path = self.io_context.arrow_path(&entry_id);
                self.write_to_disk_blocking(&path, &bytes);
                CachedBatch::DiskArrow(array.data_type().clone())
            }
            CachedBatch::MemoryLiquid(liquid_array) => {
                let liquid_bytes = liquid_array.to_bytes();
                let path = self.io_context.liquid_path(&entry_id);
                self.write_to_disk_blocking(&path, &liquid_bytes);
                CachedBatch::DiskLiquid(liquid_array.original_arrow_data_type())
            }
            // Callers only pass fully in-memory batches; hybrid and disk
            // variants are handled elsewhere.
            CachedBatch::DiskLiquid(_)
            | CachedBatch::DiskArrow(_)
            | CachedBatch::MemoryHybridLiquid(_) => {
                unreachable!("Unexpected batch in write_in_memory_batch_to_disk")
            }
        }
    }

    /// Inserts `batch_to_cache`, retrying until it fits the budget:
    /// each failed attempt either squeezes policy-chosen victims, or — when
    /// the policy has no victims left — spills this batch itself to disk
    /// and retries with the (smaller) disk-resident representation.
    pub(crate) async fn insert_inner(&self, entry_id: EntryID, mut batch_to_cache: CachedBatch) {
        loop {
            let batch_type = CachedBatchType::from(&batch_to_cache);
            // try_insert returns the batch back on budget failure.
            let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
                self.cache_policy.notify_insert(&entry_id, batch_type);
                return;
            };

            let victims = self.cache_policy.find_victim(8);
            if victims.is_empty() {
                // No victims available: downgrade this batch to disk so the
                // next iteration's memory footprint is minimal.
                let on_disk_batch = self.write_in_memory_batch_to_disk(entry_id, not_inserted);
                batch_to_cache = on_disk_batch;
                continue;
            }
            self.squeeze_victims(victims).await;

            batch_to_cache = not_inserted;
            // Give other shuttle-scheduled tasks a chance to run in tests.
            crate::utils::yield_now_if_shuttle();
        }
    }

    /// Constructs the storage; used by [`CacheStorageBuilder::build`].
    fn new(
        batch_size: usize,
        max_cache_bytes: usize,
        cache_dir: PathBuf,
        squeeze_policy: Box<dyn SqueezePolicy>,
        cache_policy: Box<dyn CachePolicy>,
        io_worker: Arc<dyn IoContext>,
    ) -> Self {
        let config = CacheConfig::new(batch_size, max_cache_bytes, cache_dir);
        Self {
            index: ArtIndex::new(),
            budget: BudgetAccounting::new(config.max_cache_bytes()),
            config,
            cache_policy,
            squeeze_policy,
            tracer: CacheTracer::new(),
            io_context: io_worker,
            runtime_stats: RuntimeStats::default(),
        }
    }

    /// Attempts to insert without any eviction. On budget failure the batch
    /// is returned to the caller unchanged. Replacing an existing entry
    /// accounts for the delta between old and new sizes.
    fn try_insert(&self, entry_id: EntryID, cached_batch: CachedBatch) -> Result<(), CachedBatch> {
        let new_memory_size = cached_batch.memory_usage_bytes();
        if let Some(entry) = self.index.get(&entry_id) {
            let old_memory_size = entry.memory_usage_bytes();
            if self
                .budget
                .try_update_memory_usage(old_memory_size, new_memory_size)
                .is_err()
            {
                return Err(cached_batch);
            }
            self.index.insert(&entry_id, cached_batch);
        } else {
            if self.budget.try_reserve_memory(new_memory_size).is_err() {
                return Err(cached_batch);
            }
            self.index.insert(&entry_id, cached_batch);
        }

        Ok(())
    }

    /// Squeezes each victim in turn (sequentially).
    #[fastrace::trace]
    async fn squeeze_victims(&self, victims: Vec<EntryID>) {
        for victim in victims {
            self.squeeze_victim_inner(victim).await;
        }
    }

    /// Downgrades one victim via the squeeze policy, writing any spilled
    /// bytes to disk, and re-inserts the squeezed batch — repeating the
    /// squeeze until the result fits the budget. A failed disk write leaves
    /// the victim untouched.
    async fn squeeze_victim_inner(&self, to_squeeze: EntryID) {
        let Some(mut to_squeeze_batch) = self.index.get(&to_squeeze) else {
            return;
        };
        let compressor = self.io_context.get_compressor(&to_squeeze);

        loop {
            let (new_batch, bytes_to_write) = self
                .squeeze_policy
                .squeeze(to_squeeze_batch, compressor.as_ref());

            if let Some(bytes_to_write) = bytes_to_write {
                // Destination depends on the squeezed representation.
                let path = match new_batch {
                    CachedBatch::DiskArrow(_) => self.io_context.arrow_path(&to_squeeze),
                    CachedBatch::DiskLiquid(_) | CachedBatch::MemoryHybridLiquid(_) => {
                        self.io_context.liquid_path(&to_squeeze)
                    }
                    // Squeezing never yields a fully in-memory batch with
                    // bytes to spill.
                    CachedBatch::MemoryArrow(_) | CachedBatch::MemoryLiquid(_) => {
                        unreachable!()
                    }
                };
                if let Err(e) = self
                    .io_context
                    .write_file(path, bytes_to_write.clone())
                    .await
                {
                    eprintln!("Failed to write to disk: {}", e);
                    return;
                }
                self.budget.add_used_disk_bytes(bytes_to_write.len());
            }
            let batch_type = CachedBatchType::from(&new_batch);
            match self.try_insert(to_squeeze, new_batch) {
                Ok(()) => {
                    self.cache_policy.notify_insert(&to_squeeze, batch_type);
                    break;
                }
                // Still too big: squeeze further on the next iteration.
                Err(batch) => {
                    to_squeeze_batch = batch;
                }
            }
        }
    }

    /// Synchronously writes `bytes` to `path` and records the disk usage.
    /// Panics on I/O failure (callers treat flush failures as fatal).
    fn write_to_disk_blocking(&self, path: impl AsRef<Path>, bytes: &[u8]) {
        use std::io::Write;
        let mut file = std::fs::File::create(&path).expect("failed to create file");
        file.write_all(bytes).expect("failed to write to file");
        let disk_usage = bytes.len();
        self.budget.add_used_disk_bytes(disk_usage);
    }
}
863
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{
        cache_policies::{CachePolicy, LruPolicy},
        utils::{create_cache_store, create_test_array, create_test_arrow_array},
    };
    use crate::sync::thread;
    use arrow::array::{Array, Int32Array};
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Policy that always nominates a fixed entry as the eviction victim,
    // counting how often it was consulted.
    #[derive(Debug)]
    struct TestPolicy {
        target_id: Option<EntryID>,
        advice_count: AtomicUsize,
    }

    impl TestPolicy {
        fn new(target_id: Option<EntryID>) -> Self {
            Self {
                target_id,
                advice_count: AtomicUsize::new(0),
            }
        }
    }

    impl CachePolicy for TestPolicy {
        fn find_victim(&self, _cnt: usize) -> Vec<EntryID> {
            self.advice_count.fetch_add(1, Ordering::SeqCst);
            let id_to_use = self.target_id.unwrap();
            vec![id_to_use]
        }
    }

    // Insert, overwrite, and lookup against a store with ample budget.
    #[tokio::test]
    async fn test_basic_cache_operations() {
        let budget_size = 10 * 1024;
        let store = create_cache_store(budget_size, Box::new(LruPolicy::new()));

        assert_eq!(store.budget.memory_usage_bytes(), 0);

        let entry_id1: EntryID = EntryID::from(1);
        let array1 = create_test_array(100);
        let size1 = array1.memory_usage_bytes();
        store.insert_inner(entry_id1, array1).await;

        assert_eq!(store.budget.memory_usage_bytes(), size1);
        let retrieved1 = store.index().get(&entry_id1).unwrap();
        match retrieved1 {
            CachedBatch::MemoryArrow(arr) => assert_eq!(arr.len(), 100),
            _ => panic!("Expected ArrowMemory"),
        }

        let entry_id2: EntryID = EntryID::from(2);
        let array2 = create_test_array(200);
        let size2 = array2.memory_usage_bytes();
        store.insert_inner(entry_id2, array2).await;

        assert_eq!(store.budget.memory_usage_bytes(), size1 + size2);

        // Overwriting entry 1 should swap size1 for size3 in the budget.
        let array3 = create_test_array(150);
        let size3 = array3.memory_usage_bytes();
        store.insert_inner(entry_id1, array3).await;

        assert_eq!(store.budget.memory_usage_bytes(), size3 + size2);
        assert!(store.index().get(&EntryID::from(999)).is_none());
    }

    // Inserting past the budget should squeeze the policy's chosen victim
    // (entry 1) into a liquid in-memory representation.
    #[tokio::test]
    async fn test_cache_advice_strategies() {
        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);

        {
            let advisor = TestPolicy::new(Some(entry_id1));
            let store = create_cache_store(8000, Box::new(advisor)); store.insert_inner(entry_id1, create_test_array(800)).await;
            match store.index().get(&entry_id1).unwrap() {
                CachedBatch::MemoryArrow(_) => {}
                other => panic!("Expected ArrowMemory, got {other:?}"),
            }

            store.insert_inner(entry_id2, create_test_array(800)).await;
            match store.index().get(&entry_id1).unwrap() {
                CachedBatch::MemoryLiquid(_) => {}
                other => panic!("Expected LiquidMemory after eviction, got {other:?}"),
            }
        }
    }

    #[tokio::test]
    async fn test_concurrent_cache_operations() {
        concurrent_cache_operations().await;
    }

    // Same scenario under shuttle's deterministic scheduler.
    #[cfg(feature = "shuttle")]
    #[tokio::test]
    async fn shuttle_cache_operations() {
        crate::utils::shuttle_test(|| {
            tokio::runtime::Runtime::new()
                .unwrap()
                .block_on(concurrent_cache_operations());
        });
    }
    // Runs a future to completion on whichever executor the build uses.
    pub fn block_on<F: Future>(future: F) -> F::Output {
        #[cfg(feature = "shuttle")]
        {
            shuttle::future::block_on(future)
        }
        #[cfg(not(feature = "shuttle"))]
        {
            tokio_test::block_on(future)
        }
    }

    // Several threads insert disjoint entry ids; all must be present after.
    async fn concurrent_cache_operations() {
        let num_threads = 3;
        let ops_per_thread = 50;

        // Budget sized to hold roughly half the data, forcing squeezes.
        let budget_size = num_threads * ops_per_thread * 100 * 8 / 2;
        let store = Arc::new(create_cache_store(budget_size, Box::new(LruPolicy::new())));

        let mut handles = vec![];
        for thread_id in 0..num_threads {
            let store = store.clone();
            handles.push(thread::spawn(move || {
                block_on(async {
                    for i in 0..ops_per_thread {
                        let unique_id = thread_id * ops_per_thread + i;
                        let entry_id: EntryID = EntryID::from(unique_id);
                        let array = create_test_arrow_array(100);
                        store.insert(entry_id, array).await;
                    }
                });
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }

        for thread_id in 0..num_threads {
            for i in 0..ops_per_thread {
                let unique_id = thread_id * ops_per_thread + i;
                let entry_id: EntryID = EntryID::from(unique_id);
                assert!(store.index().get(&entry_id).is_some());
            }
        }

        assert_eq!(store.index().keys().len(), num_threads * ops_per_thread);
    }

    // Flushing moves memory usage to disk usage without losing entries.
    #[tokio::test]
    async fn test_cache_stats_memory_and_disk_usage() {
        let storage = CacheStorageBuilder::new()
            .with_max_cache_bytes(10 * 1024 * 1024)
            .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
            .build();

        let arr1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        let arr2: ArrayRef = Arc::new(Int32Array::from_iter_values(0..128));
        storage.insert(EntryID::from(1usize), arr1).await;
        storage.insert(EntryID::from(2usize), arr2).await;

        let s = storage.stats();
        assert_eq!(s.total_entries, 2);
        assert!(s.memory_usage_bytes > 0);
        assert_eq!(s.disk_usage_bytes, 0);
        assert_eq!(s.max_cache_bytes, 10 * 1024 * 1024);

        storage.flush_all_to_disk();
        let s2 = storage.stats();
        assert_eq!(s2.total_entries, 2);
        assert!(s2.disk_usage_bytes > 0);
        assert!(s2.memory_usage_bytes <= s.memory_usage_bytes);
    }
}