liquid_cache_parquet/cache/
mod.rs

1//! This module contains the cache implementation for the Parquet reader.
2//!
3
4use crate::cache::io::{ColumnAccessPath, ParquetIoContext};
5use crate::reader::{LiquidPredicate, extract_multi_column_or};
6use crate::sync::{Mutex, RwLock};
7use ahash::AHashMap;
8use arrow::array::{Array, ArrayRef, BooleanArray, RecordBatch};
9use arrow::buffer::BooleanBuffer;
10use arrow::compute::prep_null_mask_filter;
11use arrow_schema::{ArrowError, Field, Schema};
12use liquid_cache_common::IoMode;
13use liquid_cache_storage::cache::GetWithPredicateResult;
14use liquid_cache_storage::cache::squeeze_policies::SqueezePolicy;
15use liquid_cache_storage::cache::{CachePolicy, CacheStorage, CacheStorageBuilder};
16use parquet::arrow::arrow_reader::ArrowPredicate;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21mod id;
22mod io;
23#[cfg(target_os = "linux")]
24mod io_uring;
25mod stats;
26
27pub use id::{BatchID, ParquetArrayID};
28
/// A column in the cache.
///
/// Identifies one Parquet column within one row group of one file, and holds
/// a handle to the shared [`CacheStorage`] that owns the cached batches.
#[derive(Debug)]
pub struct LiquidCachedColumn {
    // Shared storage backend that holds the actual cached data.
    cache_store: Arc<CacheStorage>,
    // Arrow field (name / data type / nullability) of this column.
    field: Arc<Field>,
    // (file, row group, column) coordinates used to derive cache entry ids.
    column_path: ColumnAccessPath,
}
36
/// Shared handle to a [`LiquidCachedColumn`].
pub(crate) type LiquidCachedColumnRef = Arc<LiquidCachedColumn>;
38
/// Dual index over the columns of a row group.
///
/// The same `Arc<LiquidCachedColumn>` is stored under both its numeric column
/// id and its field name, so a lookup by either key yields the same instance.
#[derive(Default, Debug)]
struct ColumnMaps {
    // invariant: the Arc stored under a column id in `by_id` is the same
    // allocation (Arc::ptr_eq) as the one stored under that column's field
    // name in `by_name`; see `create_column`, which maintains both maps.
    by_id: AHashMap<u64, LiquidCachedColumnRef>,
    by_name: AHashMap<String, LiquidCachedColumnRef>,
}
45
/// Error type for inserting an arrow array into the cache.
#[derive(Debug)]
pub enum InsertArrowArrayError {
    /// The array is already cached for the target (column, batch) slot;
    /// the existing entry is left untouched.
    AlreadyCached,
}
52
53impl LiquidCachedColumn {
54    fn new(
55        field: Arc<Field>,
56        cache_store: Arc<CacheStorage>,
57        column_id: u64,
58        row_group_id: u64,
59        file_id: u64,
60    ) -> Self {
61        let column_path = ColumnAccessPath::new(file_id, row_group_id, column_id);
62        column_path.initialize_dir(cache_store.config().cache_root_dir());
63        Self {
64            field,
65            cache_store,
66            column_path,
67        }
68    }
69
70    /// row_id must be on a batch boundary.
71    fn entry_id(&self, batch_id: BatchID) -> ParquetArrayID {
72        self.column_path.entry_id(batch_id)
73    }
74
75    pub(crate) fn is_cached(&self, batch_id: BatchID) -> bool {
76        self.cache_store.is_cached(&self.entry_id(batch_id).into())
77    }
78
79    fn arrow_array_to_record_batch(&self, array: ArrayRef, field: &Arc<Field>) -> RecordBatch {
80        let schema = Arc::new(Schema::new(vec![field.clone()]));
81        RecordBatch::try_new(schema, vec![array]).unwrap()
82    }
83
84    /// Returns the Arrow field metadata for this cached column.
85    pub fn field(&self) -> Arc<Field> {
86        self.field.clone()
87    }
88
89    /// Evaluates a predicate on a cached column.
90    pub async fn eval_predicate_with_filter(
91        &self,
92        batch_id: BatchID,
93        filter: &BooleanBuffer,
94        predicate: &mut LiquidPredicate,
95    ) -> Option<Result<BooleanArray, ArrowError>> {
96        let entry_id = self.entry_id(batch_id).into();
97        let result = self
98            .cache_store
99            .get_with_predicate(
100                &entry_id,
101                filter,
102                predicate.physical_expr_physical_column_index(),
103            )
104            .await?;
105
106        match result {
107            GetWithPredicateResult::Evaluated(buffer) => Some(Ok(buffer)),
108            GetWithPredicateResult::Filtered(array) => {
109                let record_batch = self.arrow_array_to_record_batch(array, &self.field);
110                let boolean_array = predicate.evaluate(record_batch).unwrap();
111                let predicate_filter = match boolean_array.null_count() {
112                    0 => boolean_array,
113                    _ => prep_null_mask_filter(&boolean_array),
114                };
115                Some(Ok(predicate_filter))
116            }
117        }
118    }
119
120    /// Get an arrow array with a filter applied.
121    pub async fn get_arrow_array_with_filter(
122        &self,
123        batch_id: BatchID,
124        filter: &BooleanBuffer,
125    ) -> Option<ArrayRef> {
126        let entry_id = self.entry_id(batch_id).into();
127        let result = self
128            .cache_store
129            .get_with_selection(&entry_id, filter)
130            .await?;
131        result.ok()
132    }
133
134    #[cfg(test)]
135    pub(crate) async fn get_arrow_array_test_only(&self, batch_id: BatchID) -> Option<ArrayRef> {
136        let entry_id = self.entry_id(batch_id).into();
137        self.cache_store.get_arrow_array(&entry_id).await
138    }
139
140    /// Insert an array into the cache.
141    pub async fn insert(
142        self: &Arc<Self>,
143        batch_id: BatchID,
144        array: ArrayRef,
145    ) -> Result<(), InsertArrowArrayError> {
146        if self.is_cached(batch_id) {
147            return Err(InsertArrowArrayError::AlreadyCached);
148        }
149        self.cache_store
150            .insert(self.entry_id(batch_id).into(), array)
151            .await;
152        Ok(())
153    }
154}
155
/// A row group in the cache.
#[derive(Debug)]
pub struct LiquidCachedRowGroup {
    // Columns of this row group, indexed both by column id and by field name.
    columns: RwLock<ColumnMaps>,
    // Shared storage backend holding the cached data.
    cache_store: Arc<CacheStorage>,
    // Row group ordinal within the file.
    row_group_id: u64,
    // Id of the file this row group belongs to.
    file_id: u64,
}
164
impl LiquidCachedRowGroup {
    /// Build an empty row-group cache and create its on-disk directory
    /// (`<cache_root>/file_<id>/rg_<id>`).
    ///
    /// # Panics
    /// Panics if the cache directory cannot be created.
    fn new(cache_store: Arc<CacheStorage>, row_group_id: u64, file_id: u64) -> Self {
        let cache_dir = cache_store
            .config()
            .cache_root_dir()
            .join(format!("file_{file_id}"))
            .join(format!("rg_{row_group_id}"));
        std::fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
        Self {
            columns: RwLock::new(ColumnMaps::default()),
            cache_store,
            row_group_id,
            file_id,
        }
    }

    /// Create a column in the row group.
    ///
    /// Idempotent: if the column id is already registered, the existing
    /// column is returned (its field must match, asserted below). The
    /// by-id and by-name maps are kept pointing at the same Arc.
    pub fn create_column(&self, column_id: u64, field: Arc<Field>) -> LiquidCachedColumnRef {
        use std::collections::hash_map::Entry;
        let mut columns = self.columns.write().unwrap();

        match columns.by_id.entry(column_id) {
            Entry::Occupied(entry) => {
                let v = entry.get().clone();
                // Re-registering the same column id with a different field is a caller bug.
                assert_eq!(v.field, field);
                // Backfill the name index in case it is missing for this column.
                columns
                    .by_name
                    .entry(v.field.name().to_string())
                    .or_insert_with(|| v.clone());
                v
            }
            Entry::Vacant(entry) => {
                let column = Arc::new(LiquidCachedColumn::new(
                    field,
                    self.cache_store.clone(),
                    column_id,
                    self.row_group_id,
                    self.file_id,
                ));
                let field_name = column.field.name().to_string();
                entry.insert(column.clone());
                // Two different column ids sharing one field name would break
                // the by-name index; only the exact same Arc is tolerated.
                if let Some(existing) = columns.by_name.insert(field_name, column.clone()) {
                    assert!(Arc::ptr_eq(&existing, &column), "column name collision");
                }
                column
            }
        }
    }

    /// Returns the batch size configured for this cached row group.
    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    /// Get a column from the row group.
    pub fn get_column(&self, column_id: u64) -> Option<LiquidCachedColumnRef> {
        self.columns.read().unwrap().by_id.get(&column_id).cloned()
    }

    /// Get a column from the row group by its field name.
    pub fn get_column_by_name(&self, column_name: &str) -> Option<LiquidCachedColumnRef> {
        self.columns
            .read()
            .unwrap()
            .by_name
            .get(column_name)
            .cloned()
    }

    /// Evaluate a predicate on a row group.
    ///
    /// Strategy, in order of preference:
    /// 1. Single-column predicate: delegate to the column so the predicate can
    ///    be evaluated on encoded data.
    /// 2. Multi-column OR of column-literal terms: evaluate each term on its
    ///    column's liquid array and OR the masks (Kleene semantics).
    /// 3. Fallback: materialize all referenced columns as arrow arrays and
    ///    evaluate the predicate on the assembled record batch.
    ///
    /// Returns `None` when some referenced column/batch is not cached.
    #[fastrace::trace]
    pub async fn evaluate_selection_with_predicate(
        &self,
        batch_id: BatchID,
        selection: &BooleanBuffer,
        predicate: &mut LiquidPredicate,
    ) -> Option<Result<BooleanArray, ArrowError>> {
        let column_ids = predicate.predicate_column_ids();

        if column_ids.len() == 1 {
            // If we only have one column, we can short-circuit and try to evaluate the predicate on encoded data.
            let column_id = column_ids[0];
            let cache = self.get_column(column_id as u64)?;
            return cache
                .eval_predicate_with_filter(batch_id, selection, predicate)
                .await;
        } else if column_ids.len() >= 2 {
            // Try to extract multiple column-literal expressions from OR structure
            if let Some(column_exprs) =
                extract_multi_column_or(predicate.physical_expr_physical_column_index())
            {
                let mut combined_buffer: Option<BooleanArray> = None;

                for (col_name, expr) in column_exprs {
                    // NOTE(review): `?` here returns None for the whole call
                    // when a name lookup misses, instead of falling through to
                    // the generic path below — confirm that is intended.
                    let column = self.get_column_by_name(col_name)?;
                    let entry_id = column.entry_id(batch_id).into();
                    let liquid_array = self.cache_store.try_read_liquid(&entry_id).await;
                    let liquid_array = match liquid_array {
                        None => {
                            // Not available in liquid form: abandon the fast
                            // path and fall through to the generic path.
                            combined_buffer = None;
                            break;
                        }
                        Some(array) => array,
                    };
                    let buffer =
                        if let Some(buffer) = liquid_array.try_eval_predicate(&expr, selection) {
                            buffer
                        } else {
                            // Term not evaluable on encoded data: abandon the fast path.
                            combined_buffer = None;
                            break;
                        };

                    // OR this term's mask into the accumulated result.
                    combined_buffer = Some(match combined_buffer {
                        None => buffer,
                        Some(existing) => {
                            arrow::compute::kernels::boolean::or_kleene(&existing, &buffer).ok()?
                        }
                    });
                }

                if let Some(result) = combined_buffer {
                    return Some(Ok(result));
                }
            }
        }
        // Otherwise, we need to first convert the data into arrow arrays.
        let mut arrays = Vec::new();
        let mut fields = Vec::new();
        for column_id in column_ids {
            let column = self.get_column(column_id as u64)?;
            let array = column
                .get_arrow_array_with_filter(batch_id, selection)
                .await?;
            arrays.push(array);
            fields.push(column.field.clone());
        }
        let schema = Arc::new(Schema::new(fields));
        let record_batch = RecordBatch::try_new(schema, arrays).unwrap();
        let boolean_array = predicate.evaluate(record_batch).unwrap();
        Some(Ok(boolean_array))
    }
}
307
/// Shared handle to a [`LiquidCachedRowGroup`].
pub(crate) type LiquidCachedRowGroupRef = Arc<LiquidCachedRowGroup>;
309
/// A file in the cache.
#[derive(Debug)]
pub struct LiquidCachedFile {
    // Row groups of this file, created lazily on first access.
    row_groups: Mutex<AHashMap<u64, Arc<LiquidCachedRowGroup>>>,
    // Shared storage backend holding the cached data.
    cache_store: Arc<CacheStorage>,
    // Unique id assigned when this file was registered with the cache.
    file_id: u64,
}
317
318impl LiquidCachedFile {
319    fn new(cache_store: Arc<CacheStorage>, file_id: u64) -> Self {
320        Self {
321            row_groups: Mutex::new(AHashMap::new()),
322            cache_store,
323            file_id,
324        }
325    }
326
327    /// Get a row group from the cache.
328    pub fn row_group(&self, row_group_id: u64) -> LiquidCachedRowGroupRef {
329        let mut row_groups = self.row_groups.lock().unwrap();
330        let row_group = row_groups.entry(row_group_id).or_insert_with(|| {
331            Arc::new(LiquidCachedRowGroup::new(
332                self.cache_store.clone(),
333                row_group_id,
334                self.file_id,
335            ))
336        });
337        row_group.clone()
338    }
339
340    fn reset(&self) {
341        self.cache_store.reset();
342    }
343}
344
/// A reference to a cached file.
pub(crate) type LiquidCachedFileRef = Arc<LiquidCachedFile>;
347
/// The main cache structure.
#[derive(Debug)]
pub struct LiquidCache {
    /// Files -> RowGroups -> Columns -> Batches
    files: Mutex<AHashMap<String, Arc<LiquidCachedFile>>>,

    // Shared storage backend that owns all cached data.
    cache_store: Arc<CacheStorage>,

    // Monotonic counter used to hand out ids to newly registered files.
    current_file_id: AtomicU64,
}
358
/// A reference to the main cache structure.
pub type LiquidCacheRef = Arc<LiquidCache>;
361
362impl LiquidCache {
363    /// Create a new cache
364    pub fn new(
365        batch_size: usize,
366        max_cache_bytes: usize,
367        cache_dir: PathBuf,
368        cache_policy: Box<dyn CachePolicy>,
369        squeeze_policy: Box<dyn SqueezePolicy>,
370        io_mode: IoMode,
371    ) -> Self {
372        assert!(batch_size.is_power_of_two());
373        let io_context = Arc::new(ParquetIoContext::new(cache_dir.clone(), io_mode));
374        let cache_storage_builder = CacheStorageBuilder::new()
375            .with_batch_size(batch_size)
376            .with_max_cache_bytes(max_cache_bytes)
377            .with_cache_dir(cache_dir.clone())
378            .with_squeeze_policy(squeeze_policy)
379            .with_cache_policy(cache_policy)
380            .with_io_worker(io_context);
381        let cache_storage = cache_storage_builder.build();
382
383        LiquidCache {
384            files: Mutex::new(AHashMap::new()),
385            cache_store: cache_storage,
386            current_file_id: AtomicU64::new(0),
387        }
388    }
389
390    /// Register a file in the cache.
391    pub fn register_or_get_file(&self, file_path: String) -> LiquidCachedFileRef {
392        let mut files = self.files.lock().unwrap();
393        let value = files.entry(file_path.clone()).or_insert_with(|| {
394            let file_id = self.current_file_id.fetch_add(1, Ordering::Relaxed);
395            Arc::new(LiquidCachedFile::new(self.cache_store.clone(), file_id))
396        });
397        value.clone()
398    }
399
400    /// Get the batch size of the cache.
401    pub fn batch_size(&self) -> usize {
402        self.cache_store.config().batch_size()
403    }
404
405    /// Get the max cache bytes of the cache.
406    pub fn max_cache_bytes(&self) -> usize {
407        self.cache_store.config().max_cache_bytes()
408    }
409
410    /// Get the memory usage of the cache in bytes.
411    pub fn memory_usage_bytes(&self) -> usize {
412        self.cache_store.budget().memory_usage_bytes()
413    }
414
415    /// Get the disk usage of the cache in bytes.
416    pub fn disk_usage_bytes(&self) -> usize {
417        self.cache_store.budget().disk_usage_bytes()
418    }
419
420    /// Flush the cache trace to a file.
421    pub fn flush_trace(&self, to_file: impl AsRef<Path>) {
422        self.cache_store.tracer().flush(to_file);
423    }
424
425    /// Enable the cache trace.
426    pub fn enable_trace(&self) {
427        self.cache_store.tracer().enable();
428    }
429
430    /// Disable the cache trace.
431    pub fn disable_trace(&self) {
432        self.cache_store.tracer().disable();
433    }
434
435    /// Reset the cache.
436    ///
437    /// # Safety
438    /// This is unsafe because resetting the cache while other threads are using the cache may cause undefined behavior.
439    /// You should only call this when no one else is using the cache.
440    pub unsafe fn reset(&self) {
441        let mut files = self.files.lock().unwrap();
442        for file in files.values_mut() {
443            file.reset();
444        }
445        self.cache_store.reset();
446    }
447
448    /// Flush all memory-based entries to disk while preserving their format.
449    /// Arrow entries become DiskArrow, Liquid entries become DiskLiquid.
450    /// Entries already on disk are left unchanged.
451    ///
452    /// This is for admin use only.
453    /// This has no guarantees that some new entry will not be inserted in the meantime, or some entries are promoted to memory again.
454    /// You mostly want to use this when no one else is using the cache.
455    pub fn flush_data(&self) {
456        self.cache_store.flush_all_to_disk();
457    }
458
459    /// Get the storage of the cache.
460    pub fn storage(&self) -> &Arc<CacheStorage> {
461        &self.cache_store
462    }
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{LiquidCache, LiquidCachedRowGroupRef};
    use crate::reader::FilterCandidateBuilder;
    use arrow::array::Int32Array;
    use arrow::buffer::BooleanBuffer;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::common::ScalarValue;
    use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
    use datafusion::logical_expr::Operator;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion::physical_expr::expressions::{BinaryExpr, Literal};
    use datafusion::physical_plan::expressions::Column;
    use liquid_cache_storage::cache::squeeze_policies::TranscodeSqueezeEvict;
    use liquid_cache_storage::cache_policies::LiquidPolicy;
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use std::sync::Arc;

    // Build an unbounded cache in a temp directory and return row group 0 of
    // a registered test file.
    //
    // NOTE(review): the TempDir guard is dropped when this function returns,
    // which deletes the directory while the cache still points at it —
    // confirm these tests never spill to disk, or keep the guard alive.
    // NOTE(review): IoMode::Uring is used unconditionally, but the io_uring
    // module is cfg(target_os = "linux") — verify this runs on other targets.
    fn setup_cache(batch_size: usize) -> LiquidCachedRowGroupRef {
        let tmp_dir = tempfile::tempdir().unwrap();
        let cache = LiquidCache::new(
            batch_size,
            usize::MAX,
            tmp_dir.path().to_path_buf(),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            IoMode::Uring,
        );
        let file = cache.register_or_get_file("test".to_string());
        file.row_group(0)
    }

    // Two-column OR predicate evaluated through the multi-column fast path.
    #[tokio::test]
    async fn evaluate_or_on_cached_columns() {
        let batch_size = 4;
        let row_group = setup_cache(batch_size);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));

        let col_a = row_group.create_column(0, Arc::new(Field::new("a", DataType::Int32, false)));
        let col_b = row_group.create_column(1, Arc::new(Field::new("b", DataType::Int32, false)));

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));

        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());

        // build parquet metadata for predicate construction
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // expression a = 3 OR b = 20
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
        ));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));

        let adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let builder = FilterCandidateBuilder::new(
            expr,
            Arc::clone(&schema),
            Arc::clone(&schema),
            adapter_factory,
        );
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        // Evaluate over a fully-set selection (all rows considered).
        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // a=3 is row 2, b=20 is row 1.
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 2).into();
        assert_eq!(result, expected);
    }

    // Nested three-column OR: exercises recursive OR extraction.
    #[tokio::test]
    async fn evaluate_three_column_or() {
        let batch_size = 8;
        let row_group = setup_cache(batch_size);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));

        let col_a = row_group.create_column(0, Arc::new(Field::new("a", DataType::Int32, false)));
        let col_b = row_group.create_column(1, Arc::new(Field::new("b", DataType::Int32, false)));
        let col_c = row_group.create_column(2, Arc::new(Field::new("c", DataType::Int32, false)));

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80]));
        let array_c = Arc::new(Int32Array::from(vec![
            100, 200, 300, 400, 500, 600, 700, 800,
        ]));

        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());
        assert!(col_c.insert(batch_id, array_c.clone()).await.is_ok());

        // build parquet metadata for predicate construction
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b, array_c]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // expression: a = 2 OR b = 40 OR c = 600
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(40)))),
        ));
        let expr_c: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 2)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(600)))),
        ));

        // Build nested OR: (a = 2 OR b = 40) OR c = 600
        let expr_ab = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_ab, Operator::Or, expr_c));

        let adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let builder = FilterCandidateBuilder::new(
            expr,
            Arc::clone(&schema),
            Arc::clone(&schema),
            adapter_factory,
        );
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // Expected: row 1 (a=2), row 3 (b=40), row 5 (c=600) -> indices 1, 3, 5
        let expected =
            BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3 || i == 5).into();
        assert_eq!(result, expected);
    }

    // Same OR fast path, but over Utf8View string columns.
    #[tokio::test]
    async fn evaluate_string_column_or() {
        let batch_size = 8;
        let row_group = setup_cache(batch_size);

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8View, false),
            Field::new("city", DataType::Utf8View, false),
        ]));

        let col_name =
            row_group.create_column(0, Arc::new(Field::new("name", DataType::Utf8View, false)));
        let col_city =
            row_group.create_column(1, Arc::new(Field::new("city", DataType::Utf8View, false)));

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_name = Arc::new(arrow::array::StringViewArray::from(vec![
            "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry",
        ]));
        let array_city = Arc::new(arrow::array::StringViewArray::from(vec![
            "New York", "London", "Paris", "Tokyo", "Berlin", "Sydney", "Madrid", "Rome",
        ]));

        assert!(col_name.insert(batch_id, array_name.clone()).await.is_ok());
        assert!(col_city.insert(batch_id, array_city.clone()).await.is_ok());

        // build parquet metadata for predicate construction
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_name, array_city]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // expression: name = "Bob" OR city = "Tokyo"
        let expr_name: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("name", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some("Bob".to_string())))),
        ));
        let expr_city: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("city", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some(
                "Tokyo".to_string(),
            )))),
        ));
        let expr: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(expr_name, Operator::Or, expr_city));

        let adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let builder = FilterCandidateBuilder::new(
            expr,
            Arc::clone(&schema),
            Arc::clone(&schema),
            adapter_factory,
        );
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // Expected: row 1 (name="Bob"), row 3 (city="Tokyo") -> indices 1, 3
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3).into();
        assert_eq!(result, expected);
    }
}