liquid_cache_datafusion/cache/mod.rs

//! This module contains the cache implementation for the Parquet reader.
//!
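//! A minimal end-to-end sketch of the intended flow (hypothetical variable
//! names; it mirrors the tests at the bottom of this file):
//! ```ignore
//! let file = cache.register_or_get_file("data.parquet".to_string(), schema);
//! let row_group = file.create_row_group(0, vec![0]);
//! let column = row_group.get_column(0).unwrap();
//! column.insert(batch_id, array).await.unwrap();
//! ```
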
use crate::io::ParquetIoContext;
use crate::reader::{LiquidPredicate, extract_multi_column_or};
use crate::sync::Mutex;
use ahash::AHashMap;
use arrow::array::{BooleanArray, RecordBatch};
use arrow::buffer::BooleanBuffer;
use arrow_schema::{ArrowError, Field, Schema, SchemaRef};
use liquid_cache::cache::squeeze_policies::SqueezePolicy;
use liquid_cache::cache::{
    CachePolicy, EventTrace, HydrationPolicy, LiquidCache, LiquidCacheBuilder,
};
use liquid_cache_common::IoMode;
use parquet::arrow::arrow_reader::ArrowPredicate;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

mod column;
mod id;
mod stats;

pub(crate) use column::InsertArrowArrayError;
pub use column::{CachedColumn, CachedColumnRef};
pub(crate) use id::ColumnAccessPath;
pub use id::{BatchID, ParquetArrayID};

#[derive(Default, Debug)]
struct ColumnMaps {
    // invariant: for each column, `by_id[column_id]` and `by_name[field.name()]`
    // point to the same Arc (Arc::ptr_eq holds).
    by_id: AHashMap<u64, CachedColumnRef>,
    by_name: AHashMap<String, CachedColumnRef>,
}

/// A row group in the cache.
#[derive(Debug)]
pub struct CachedRowGroup {
    columns: ColumnMaps,
    cache_store: Arc<LiquidCache>,
}

impl CachedRowGroup {
    /// Create a new row group.
    ///
    /// The column ids are the indices of the columns in the file schema,
    /// so they may not start from 0.
    fn new(
        cache_store: Arc<LiquidCache>,
        row_group_idx: u64,
        file_idx: u64,
        columns: &[(u64, Arc<Field>, bool)],
    ) -> Self {
        let cache_dir = cache_store
            .config()
            .cache_root_dir()
            .join(format!("file_{file_idx}"))
            .join(format!("rg_{row_group_idx}"));
        std::fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");

        let mut column_maps = ColumnMaps::default();
        for (column_id, field, is_predicate_column) in columns {
            let column_access_path = ColumnAccessPath::new(file_idx, row_group_idx, *column_id);
            let column = Arc::new(CachedColumn::new(
                Arc::clone(field),
                Arc::clone(&cache_store),
                column_access_path,
                *is_predicate_column,
            ));
            column_maps.by_id.insert(*column_id, column.clone());
            column_maps.by_name.insert(field.name().to_string(), column);
        }

        Self {
            columns: column_maps,
            cache_store,
        }
    }

    /// Returns the batch size configured for this cached row group.
    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    /// Get a column from the row group.
    pub fn get_column(&self, column_id: u64) -> Option<CachedColumnRef> {
        self.columns.by_id.get(&column_id).cloned()
    }

    /// Get a column from the row group by its field name.
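    ///
    /// A qualified name falls back to its unqualified form; a hypothetical
    /// sketch:
    /// ```ignore
    /// // Both lookups resolve to the same cached column.
    /// let direct = row_group.get_column_by_name("col");
    /// let qualified = row_group.get_column_by_name("table.col");
    /// ```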
    pub fn get_column_by_name(&self, column_name: &str) -> Option<CachedColumnRef> {
        if let Some(column) = self.columns.by_name.get(column_name) {
            return Some(column.clone());
        }

        // DataFusion may carry qualified names in physical expressions
        // (e.g. "table.col"), while cache fields are keyed by file schema names.
        let unqualified = column_name.rsplit('.').next().unwrap_or(column_name);
        self.columns.by_name.get(unqualified).cloned()
    }

    /// Evaluate a predicate against one batch of this row group.
    ///
    /// Tries a fast path first: a single-column predicate is evaluated on the
    /// encoded data, and a multi-column OR of column-literal comparisons is
    /// decomposed and evaluated per column. Otherwise the referenced columns
    /// are materialized as Arrow arrays and the predicate is evaluated on a
    /// `RecordBatch`. Returns `None` if the required data is not cached.
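    ///
    /// A minimal usage sketch (assumes a populated row group and a built
    /// `LiquidPredicate`, as in the tests below):
    /// ```ignore
    /// let selection = BooleanBuffer::new_set(row_group.batch_size());
    /// let filter = row_group
    ///     .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
    ///     .await;
    /// ```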
    #[fastrace::trace]
    pub async fn evaluate_selection_with_predicate(
        &self,
        batch_id: BatchID,
        selection: &BooleanBuffer,
        predicate: &mut LiquidPredicate,
    ) -> Option<Result<BooleanArray, ArrowError>> {
        let column_ids = predicate.predicate_column_ids();

        if column_ids.len() == 1 {
            // If we only have one column, we can short-circuit and try to evaluate the predicate on encoded data.
            let column_id = column_ids[0];
            let cache = self.get_column(column_id as u64)?;
            return cache
                .eval_predicate_with_filter(batch_id, selection, predicate)
                .await;
        } else if column_ids.len() >= 2 {
            // Try to extract multiple column-literal expressions from OR structure
            if let Some(column_exprs) =
                extract_multi_column_or(predicate.physical_expr_physical_column_index())
            {
                let mut combined_buffer: Option<BooleanArray> = None;

                for (col_name, expr) in column_exprs {
                    let column = self.get_column_by_name(col_name)?;
                    let entry_id = column.entry_id(batch_id).into();
                    let liquid_array = self.cache_store.try_read_liquid(&entry_id).await;
                    let liquid_array = match liquid_array {
                        None => {
                            combined_buffer = None;
                            break;
                        }
                        Some(array) => array,
                    };
                    let buffer =
                        if let Some(buffer) = liquid_array.try_eval_predicate(&expr, selection) {
                            buffer
                        } else {
                            combined_buffer = None;
                            break;
                        };

                    combined_buffer = Some(match combined_buffer {
                        None => buffer,
                        Some(existing) => {
                            arrow::compute::kernels::boolean::or_kleene(&existing, &buffer).ok()?
                        }
                    });
                }

                if let Some(result) = combined_buffer {
                    return Some(Ok(result));
                }
            }
        }
        // Otherwise, we need to first convert the data into arrow arrays.
        let mut arrays = Vec::new();
        let mut fields = Vec::new();
        for column_id in column_ids {
            let column = self.get_column(column_id as u64)?;
            let array = column
                .get_arrow_array_with_filter(batch_id, selection)
                .await?;
            arrays.push(array);
            fields.push(column.field());
        }
        let schema = Arc::new(Schema::new(fields));
        // Propagate Arrow errors to the caller instead of panicking.
        let record_batch = match RecordBatch::try_new(schema, arrays) {
            Ok(batch) => batch,
            Err(e) => return Some(Err(e)),
        };
        Some(predicate.evaluate(record_batch))
    }
}

pub(crate) type CachedRowGroupRef = Arc<CachedRowGroup>;

/// A file in the cache.
#[derive(Debug)]
pub struct CachedFile {
    cache_store: Arc<LiquidCache>,
    file_id: u64,
    file_schema: SchemaRef,
}

impl CachedFile {
    fn new(cache_store: Arc<LiquidCache>, file_id: u64, file_schema: SchemaRef) -> Self {
        Self {
            cache_store,
            file_id,
            file_schema,
        }
    }

    /// Create a row group handle scoped to the current query context.
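    ///
    /// `predicate_column_ids` lists the file-schema column indices that feed
    /// predicates; a hypothetical sketch:
    /// ```ignore
    /// // Columns 0 and 2 are used by filters in this query.
    /// let row_group = file.create_row_group(0, vec![0, 2]);
    /// ```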
    pub fn create_row_group(
        &self,
        row_group_id: u64,
        predicate_column_ids: Vec<usize>,
    ) -> CachedRowGroupRef {
        let columns: Vec<(u64, Arc<Field>, bool)> = self
            .file_schema
            .fields()
            .iter()
            .enumerate()
            .map(|(idx, field)| {
                let is_predicate_column = predicate_column_ids.contains(&idx);
                (idx as u64, Arc::clone(field), is_predicate_column)
            })
            .collect();

        Arc::new(CachedRowGroup::new(
            self.cache_store.clone(),
            row_group_id,
            self.file_id,
            &columns,
        ))
    }

    /// Return the configured cache batch size.
    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    /// Return the full file schema tracked by the cache entry.
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.file_schema)
    }
}

/// A reference to a cached file.
pub(crate) type CachedFileRef = Arc<CachedFile>;

/// The main cache structure.
#[derive(Debug)]
pub struct LiquidCacheParquet {
    /// Map file path to file id.
    files: Mutex<AHashMap<String, u64>>,

    cache_store: Arc<LiquidCache>,

    current_file_id: AtomicU64,
}

/// A reference to the main cache structure.
pub type LiquidCacheParquetRef = Arc<LiquidCacheParquet>;

impl LiquidCacheParquet {
    /// Create a new cache for Parquet files.
    ///
    /// # Panics
    /// Panics if `batch_size` is not a power of two.
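    ///
    /// A construction sketch using the policies exercised by this file's
    /// tests (directory and sizes are placeholders):
    /// ```ignore
    /// let cache = LiquidCacheParquet::new(
    ///     8192,                  // batch size; must be a power of two
    ///     1 << 30,               // max cache bytes
    ///     std::env::temp_dir().join("liquid_cache"),
    ///     Box::new(LiquidPolicy::new()),
    ///     Box::new(TranscodeSqueezeEvict),
    ///     Box::new(AlwaysHydrate::new()),
    ///     IoMode::Uring,
    /// );
    /// ```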
    pub fn new(
        batch_size: usize,
        max_cache_bytes: usize,
        cache_dir: PathBuf,
        cache_policy: Box<dyn CachePolicy>,
        squeeze_policy: Box<dyn SqueezePolicy>,
        hydration_policy: Box<dyn HydrationPolicy>,
        io_mode: IoMode,
    ) -> Self {
        assert!(batch_size.is_power_of_two());
        let io_context = Arc::new(ParquetIoContext::new(cache_dir.clone(), io_mode));
        let cache_storage_builder = LiquidCacheBuilder::new()
            .with_batch_size(batch_size)
            .with_max_cache_bytes(max_cache_bytes)
            .with_cache_dir(cache_dir.clone())
            .with_squeeze_policy(squeeze_policy)
            .with_cache_policy(cache_policy)
            .with_hydration_policy(hydration_policy)
            .with_io_context(io_context);
        let cache_storage = cache_storage_builder.build();

        LiquidCacheParquet {
            files: Mutex::new(AHashMap::new()),
            cache_store: cache_storage,
            current_file_id: AtomicU64::new(0),
        }
    }

    /// Register a file in the cache, reusing the existing file id if the
    /// path is already registered.
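    ///
    /// A minimal sketch (hypothetical path; `schema` is the file's full
    /// Arrow schema):
    /// ```ignore
    /// let file = cache.register_or_get_file("data/part-0.parquet".to_string(), schema);
    /// let row_group = file.create_row_group(0, vec![]);
    /// ```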
    pub fn register_or_get_file(
        &self,
        file_path: String,
        full_file_schema: SchemaRef,
    ) -> CachedFileRef {
        let mut files = self.files.lock().unwrap();
        let file_id = *files
            .entry(file_path.clone())
            .or_insert_with(|| self.current_file_id.fetch_add(1, Ordering::Relaxed));
        drop(files);

        Arc::new(CachedFile::new(
            self.cache_store.clone(),
            file_id,
            full_file_schema,
        ))
    }

    /// Get the batch size of the cache.
    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    /// Get the maximum cache size in bytes.
    pub fn max_cache_bytes(&self) -> usize {
        self.cache_store.config().max_cache_bytes()
    }

    /// Get the memory usage of the cache in bytes.
    pub fn memory_usage_bytes(&self) -> usize {
        self.cache_store.budget().memory_usage_bytes()
    }

    /// Get the disk usage of the cache in bytes.
    pub fn disk_usage_bytes(&self) -> usize {
        self.cache_store.budget().disk_usage_bytes()
    }

    /// Flush the cache trace to a file.
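    ///
    /// Typically paired with [`Self::enable_trace`]; a sketch with a
    /// placeholder path:
    /// ```ignore
    /// cache.enable_trace();
    /// // ... run the workload ...
    /// cache.flush_trace("cache_trace.json");
    /// ```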
    pub fn flush_trace(&self, to_file: impl AsRef<Path>) {
        self.cache_store.observer().flush_cache_trace(to_file);
    }

    /// Enable the cache trace.
    pub fn enable_trace(&self) {
        self.cache_store.observer().enable_cache_trace();
    }

    /// Disable the cache trace.
    pub fn disable_trace(&self) {
        self.cache_store.observer().disable_cache_trace();
    }

    /// Reset the cache.
    ///
    /// # Safety
    /// Resetting the cache while other threads are using it may cause
    /// undefined behavior. Only call this when no other thread is accessing
    /// the cache.
    pub unsafe fn reset(&self) {
        let mut files = self.files.lock().unwrap();
        files.clear();
        self.cache_store.reset();
    }

    /// Flush all memory-resident entries to disk while preserving their
    /// format: Arrow entries become DiskArrow, and Liquid entries become
    /// DiskLiquid. Entries already on disk are left unchanged.
    ///
    /// This is for admin use only. There is no guarantee that new entries
    /// will not be inserted in the meantime, or that entries will not be
    /// promoted back to memory. You mostly want to call this when no one
    /// else is using the cache.
    pub async fn flush_data(&self) {
        self.cache_store.flush_all_to_disk().await;
    }

    /// Get the storage of the cache.
    pub fn storage(&self) -> &Arc<LiquidCache> {
        &self.cache_store
    }

    /// Consume the event trace of the cache.
    pub fn consume_event_trace(&self) -> EventTrace {
        self.cache_store.consume_event_trace()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CachedRowGroupRef, LiquidCacheParquet};
    use crate::reader::FilterCandidateBuilder;
    use arrow::array::Int32Array;
    use arrow::buffer::BooleanBuffer;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::common::ScalarValue;
    use datafusion::logical_expr::Operator;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion::physical_expr::expressions::{BinaryExpr, Literal};
    use datafusion::physical_plan::expressions::Column;
    use liquid_cache::cache::AlwaysHydrate;
    use liquid_cache::cache::squeeze_policies::TranscodeSqueezeEvict;
    use liquid_cache::cache_policies::LiquidPolicy;
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use std::sync::Arc;

    fn setup_cache(batch_size: usize, schema: SchemaRef) -> CachedRowGroupRef {
        let tmp_dir = tempfile::tempdir().unwrap();
        let cache = LiquidCacheParquet::new(
            batch_size,
            usize::MAX,
            tmp_dir.path().to_path_buf(),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            Box::new(AlwaysHydrate::new()),
            IoMode::Uring,
        );
        let file = cache.register_or_get_file("test".to_string(), schema);
        file.create_row_group(0, vec![])
    }

    #[tokio::test]
    async fn evaluate_or_on_cached_columns() {
        let batch_size = 4;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let row_group = setup_cache(batch_size, schema.clone());

        let col_a = row_group.get_column(0).unwrap();
        let col_b = row_group.get_column(1).unwrap();

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));

        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());

        // build parquet metadata for predicate construction
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // expression: a = 3 OR b = 20
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
        ));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));

        let builder = FilterCandidateBuilder::new(expr, Arc::clone(&schema));
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();
        // Expected: row 1 (b=20), row 2 (a=3) -> indices 1, 2
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 2).into();
        assert_eq!(result, expected);
    }

    #[tokio::test]
    async fn evaluate_three_column_or() {
        let batch_size = 8;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));

        let row_group = setup_cache(batch_size, schema.clone());

        let col_a = row_group.get_column(0).unwrap();
        let col_b = row_group.get_column(1).unwrap();
        let col_c = row_group.get_column(2).unwrap();

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80]));
        let array_c = Arc::new(Int32Array::from(vec![
            100, 200, 300, 400, 500, 600, 700, 800,
        ]));

        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());
        assert!(col_c.insert(batch_id, array_c.clone()).await.is_ok());

        // build parquet metadata for predicate construction
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b, array_c]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // expression: a = 2 OR b = 40 OR c = 600
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(40)))),
        ));
        let expr_c: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 2)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(600)))),
        ));

        // Build nested OR: (a = 2 OR b = 40) OR c = 600
        let expr_ab = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_ab, Operator::Or, expr_c));

        let builder = FilterCandidateBuilder::new(expr, Arc::clone(&schema));
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // Expected: row 1 (a=2), row 3 (b=40), row 5 (c=600) -> indices 1, 3, 5
        let expected =
            BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3 || i == 5).into();
        assert_eq!(result, expected);
    }

    #[tokio::test]
    async fn evaluate_string_column_or() {
        let batch_size = 8;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8View, false),
            Field::new("city", DataType::Utf8View, false),
        ]));

        let row_group = setup_cache(batch_size, schema.clone());

        let col_name = row_group.get_column(0).unwrap();
        let col_city = row_group.get_column(1).unwrap();

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_name = Arc::new(arrow::array::StringViewArray::from(vec![
            "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry",
        ]));
        let array_city = Arc::new(arrow::array::StringViewArray::from(vec![
            "New York", "London", "Paris", "Tokyo", "Berlin", "Sydney", "Madrid", "Rome",
        ]));

        assert!(col_name.insert(batch_id, array_name.clone()).await.is_ok());
        assert!(col_city.insert(batch_id, array_city.clone()).await.is_ok());

        // build parquet metadata for predicate construction
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_name, array_city]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // expression: name = "Bob" OR city = "Tokyo"
        let expr_name: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("name", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some("Bob".to_string())))),
        ));
        let expr_city: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("city", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some(
                "Tokyo".to_string(),
            )))),
        ));
        let expr: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(expr_name, Operator::Or, expr_city));

        let builder = FilterCandidateBuilder::new(expr, Arc::clone(&schema));
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // Expected: row 1 (name="Bob"), row 3 (city="Tokyo") -> indices 1, 3
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3).into();
        assert_eq!(result, expected);
    }
}