1use crate::cache::io::{ColumnAccessPath, ParquetIoContext};
5use crate::reader::{LiquidPredicate, extract_multi_column_or};
6use crate::sync::{Mutex, RwLock};
7use ahash::AHashMap;
8use arrow::array::{Array, ArrayRef, BooleanArray, RecordBatch};
9use arrow::buffer::BooleanBuffer;
10use arrow::compute::prep_null_mask_filter;
11use arrow_schema::{ArrowError, Field, Schema};
12use liquid_cache_common::IoMode;
13use liquid_cache_storage::cache::GetWithPredicateResult;
14use liquid_cache_storage::cache::squeeze_policies::SqueezePolicy;
15use liquid_cache_storage::cache::{CachePolicy, CacheStorage, CacheStorageBuilder};
16use parquet::arrow::arrow_reader::ArrowPredicate;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21mod id;
22mod io;
23#[cfg(target_os = "linux")]
24mod io_uring;
25mod stats;
26
27pub use id::{BatchID, ParquetArrayID};
28
/// A single cached Parquet column within one row group of one file.
///
/// Cheap to clone via [`LiquidCachedColumnRef`]; all heavy state lives in the
/// shared [`CacheStorage`].
#[derive(Debug)]
pub struct LiquidCachedColumn {
    // Shared backing store for all cached entries of the whole cache.
    cache_store: Arc<CacheStorage>,
    // Arrow field (name + data type + nullability) describing this column.
    field: Arc<Field>,
    // Addressing triple (file id, row group id, column id) for entry ids.
    column_path: ColumnAccessPath,
}
36
/// Shared handle to a [`LiquidCachedColumn`].
pub(crate) type LiquidCachedColumnRef = Arc<LiquidCachedColumn>;
38
/// Dual index over a row group's columns: by column id and by field name.
///
/// Both maps point at the same `Arc`s; the name index exists so predicate
/// evaluation can resolve columns referenced by name.
#[derive(Default, Debug)]
struct ColumnMaps {
    // Columns keyed by their Parquet column index.
    by_id: AHashMap<u64, LiquidCachedColumnRef>,
    // The same columns keyed by field name.
    by_name: AHashMap<String, LiquidCachedColumnRef>,
}
45
/// Errors that can occur when inserting an Arrow array into the cache.
#[derive(Debug)]
pub enum InsertArrowArrayError {
    /// The batch is already present in the cache; the insert was not performed.
    AlreadyCached,
}
52
53impl LiquidCachedColumn {
54 fn new(
55 field: Arc<Field>,
56 cache_store: Arc<CacheStorage>,
57 column_id: u64,
58 row_group_id: u64,
59 file_id: u64,
60 ) -> Self {
61 let column_path = ColumnAccessPath::new(file_id, row_group_id, column_id);
62 column_path.initialize_dir(cache_store.config().cache_root_dir());
63 Self {
64 field,
65 cache_store,
66 column_path,
67 }
68 }
69
70 fn entry_id(&self, batch_id: BatchID) -> ParquetArrayID {
72 self.column_path.entry_id(batch_id)
73 }
74
75 pub(crate) fn is_cached(&self, batch_id: BatchID) -> bool {
76 self.cache_store.is_cached(&self.entry_id(batch_id).into())
77 }
78
79 fn arrow_array_to_record_batch(&self, array: ArrayRef, field: &Arc<Field>) -> RecordBatch {
80 let schema = Arc::new(Schema::new(vec![field.clone()]));
81 RecordBatch::try_new(schema, vec![array]).unwrap()
82 }
83
84 pub fn field(&self) -> Arc<Field> {
86 self.field.clone()
87 }
88
89 pub async fn eval_predicate_with_filter(
91 &self,
92 batch_id: BatchID,
93 filter: &BooleanBuffer,
94 predicate: &mut LiquidPredicate,
95 ) -> Option<Result<BooleanArray, ArrowError>> {
96 let entry_id = self.entry_id(batch_id).into();
97 let result = self
98 .cache_store
99 .get_with_predicate(
100 &entry_id,
101 filter,
102 predicate.physical_expr_physical_column_index(),
103 )
104 .await?;
105
106 match result {
107 GetWithPredicateResult::Evaluated(buffer) => Some(Ok(buffer)),
108 GetWithPredicateResult::Filtered(array) => {
109 let record_batch = self.arrow_array_to_record_batch(array, &self.field);
110 let boolean_array = predicate.evaluate(record_batch).unwrap();
111 let predicate_filter = match boolean_array.null_count() {
112 0 => boolean_array,
113 _ => prep_null_mask_filter(&boolean_array),
114 };
115 Some(Ok(predicate_filter))
116 }
117 }
118 }
119
120 pub async fn get_arrow_array_with_filter(
122 &self,
123 batch_id: BatchID,
124 filter: &BooleanBuffer,
125 ) -> Option<ArrayRef> {
126 let entry_id = self.entry_id(batch_id).into();
127 let result = self
128 .cache_store
129 .get_with_selection(&entry_id, filter)
130 .await?;
131 result.ok()
132 }
133
134 #[cfg(test)]
135 pub(crate) async fn get_arrow_array_test_only(&self, batch_id: BatchID) -> Option<ArrayRef> {
136 let entry_id = self.entry_id(batch_id).into();
137 self.cache_store.get_arrow_array(&entry_id).await
138 }
139
140 pub async fn insert(
142 self: &Arc<Self>,
143 batch_id: BatchID,
144 array: ArrayRef,
145 ) -> Result<(), InsertArrowArrayError> {
146 if self.is_cached(batch_id) {
147 return Err(InsertArrowArrayError::AlreadyCached);
148 }
149 self.cache_store
150 .insert(self.entry_id(batch_id).into(), array)
151 .await;
152 Ok(())
153 }
154}
155
/// Cache view over one row group of one file: a set of columns sharing the
/// same `(file_id, row_group_id)` prefix.
#[derive(Debug)]
pub struct LiquidCachedRowGroup {
    // Columns of this row group, indexed by id and by name.
    columns: RwLock<ColumnMaps>,
    // Shared backing store for all cached entries.
    cache_store: Arc<CacheStorage>,
    // Row group index within the file.
    row_group_id: u64,
    // Id assigned to the owning file on registration.
    file_id: u64,
}
164
165impl LiquidCachedRowGroup {
166 fn new(cache_store: Arc<CacheStorage>, row_group_id: u64, file_id: u64) -> Self {
167 let cache_dir = cache_store
168 .config()
169 .cache_root_dir()
170 .join(format!("file_{file_id}"))
171 .join(format!("rg_{row_group_id}"));
172 std::fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");
173 Self {
174 columns: RwLock::new(ColumnMaps::default()),
175 cache_store,
176 row_group_id,
177 file_id,
178 }
179 }
180
181 pub fn create_column(&self, column_id: u64, field: Arc<Field>) -> LiquidCachedColumnRef {
183 use std::collections::hash_map::Entry;
184 let mut columns = self.columns.write().unwrap();
185
186 match columns.by_id.entry(column_id) {
187 Entry::Occupied(entry) => {
188 let v = entry.get().clone();
189 assert_eq!(v.field, field);
190 columns
191 .by_name
192 .entry(v.field.name().to_string())
193 .or_insert_with(|| v.clone());
194 v
195 }
196 Entry::Vacant(entry) => {
197 let column = Arc::new(LiquidCachedColumn::new(
198 field,
199 self.cache_store.clone(),
200 column_id,
201 self.row_group_id,
202 self.file_id,
203 ));
204 let field_name = column.field.name().to_string();
205 entry.insert(column.clone());
206 if let Some(existing) = columns.by_name.insert(field_name, column.clone()) {
207 assert!(Arc::ptr_eq(&existing, &column), "column name collision");
208 }
209 column
210 }
211 }
212 }
213
214 pub fn batch_size(&self) -> usize {
216 self.cache_store.config().batch_size()
217 }
218
219 pub fn get_column(&self, column_id: u64) -> Option<LiquidCachedColumnRef> {
221 self.columns.read().unwrap().by_id.get(&column_id).cloned()
222 }
223
224 pub fn get_column_by_name(&self, column_name: &str) -> Option<LiquidCachedColumnRef> {
226 self.columns
227 .read()
228 .unwrap()
229 .by_name
230 .get(column_name)
231 .cloned()
232 }
233
234 #[fastrace::trace]
236 pub async fn evaluate_selection_with_predicate(
237 &self,
238 batch_id: BatchID,
239 selection: &BooleanBuffer,
240 predicate: &mut LiquidPredicate,
241 ) -> Option<Result<BooleanArray, ArrowError>> {
242 let column_ids = predicate.predicate_column_ids();
243
244 if column_ids.len() == 1 {
245 let column_id = column_ids[0];
247 let cache = self.get_column(column_id as u64)?;
248 return cache
249 .eval_predicate_with_filter(batch_id, selection, predicate)
250 .await;
251 } else if column_ids.len() >= 2 {
252 if let Some(column_exprs) =
254 extract_multi_column_or(predicate.physical_expr_physical_column_index())
255 {
256 let mut combined_buffer: Option<BooleanArray> = None;
257
258 for (col_name, expr) in column_exprs {
259 let column = self.get_column_by_name(col_name)?;
260 let entry_id = column.entry_id(batch_id).into();
261 let liquid_array = self.cache_store.try_read_liquid(&entry_id).await;
262 let liquid_array = match liquid_array {
263 None => {
264 combined_buffer = None;
265 break;
266 }
267 Some(array) => array,
268 };
269 let buffer =
270 if let Some(buffer) = liquid_array.try_eval_predicate(&expr, selection) {
271 buffer
272 } else {
273 combined_buffer = None;
274 break;
275 };
276
277 combined_buffer = Some(match combined_buffer {
278 None => buffer,
279 Some(existing) => {
280 arrow::compute::kernels::boolean::or_kleene(&existing, &buffer).ok()?
281 }
282 });
283 }
284
285 if let Some(result) = combined_buffer {
286 return Some(Ok(result));
287 }
288 }
289 }
290 let mut arrays = Vec::new();
292 let mut fields = Vec::new();
293 for column_id in column_ids {
294 let column = self.get_column(column_id as u64)?;
295 let array = column
296 .get_arrow_array_with_filter(batch_id, selection)
297 .await?;
298 arrays.push(array);
299 fields.push(column.field.clone());
300 }
301 let schema = Arc::new(Schema::new(fields));
302 let record_batch = RecordBatch::try_new(schema, arrays).unwrap();
303 let boolean_array = predicate.evaluate(record_batch).unwrap();
304 Some(Ok(boolean_array))
305 }
306}
307
/// Shared handle to a [`LiquidCachedRowGroup`].
pub(crate) type LiquidCachedRowGroupRef = Arc<LiquidCachedRowGroup>;
309
/// Cache view over one registered file: lazily created row groups sharing the
/// same `file_id`.
#[derive(Debug)]
pub struct LiquidCachedFile {
    // Row groups created so far, keyed by row group index.
    row_groups: Mutex<AHashMap<u64, Arc<LiquidCachedRowGroup>>>,
    // Shared backing store for all cached entries.
    cache_store: Arc<CacheStorage>,
    // Id assigned to this file on registration.
    file_id: u64,
}
317
318impl LiquidCachedFile {
319 fn new(cache_store: Arc<CacheStorage>, file_id: u64) -> Self {
320 Self {
321 row_groups: Mutex::new(AHashMap::new()),
322 cache_store,
323 file_id,
324 }
325 }
326
327 pub fn row_group(&self, row_group_id: u64) -> LiquidCachedRowGroupRef {
329 let mut row_groups = self.row_groups.lock().unwrap();
330 let row_group = row_groups.entry(row_group_id).or_insert_with(|| {
331 Arc::new(LiquidCachedRowGroup::new(
332 self.cache_store.clone(),
333 row_group_id,
334 self.file_id,
335 ))
336 });
337 row_group.clone()
338 }
339
340 fn reset(&self) {
341 self.cache_store.reset();
342 }
343}
344
/// Shared handle to a [`LiquidCachedFile`].
pub(crate) type LiquidCachedFileRef = Arc<LiquidCachedFile>;
347
/// Top-level cache: registry of files plus the shared storage backend.
#[derive(Debug)]
pub struct LiquidCache {
    // Registered files keyed by their path string.
    files: Mutex<AHashMap<String, Arc<LiquidCachedFile>>>,

    // Shared backing store used by every file/row group/column view.
    cache_store: Arc<CacheStorage>,

    // Monotonic counter handing out unique ids to newly registered files.
    current_file_id: AtomicU64,
}
358
/// Shared handle to the top-level [`LiquidCache`].
pub type LiquidCacheRef = Arc<LiquidCache>;
361
362impl LiquidCache {
363 pub fn new(
365 batch_size: usize,
366 max_cache_bytes: usize,
367 cache_dir: PathBuf,
368 cache_policy: Box<dyn CachePolicy>,
369 squeeze_policy: Box<dyn SqueezePolicy>,
370 io_mode: IoMode,
371 ) -> Self {
372 assert!(batch_size.is_power_of_two());
373 let io_context = Arc::new(ParquetIoContext::new(cache_dir.clone(), io_mode));
374 let cache_storage_builder = CacheStorageBuilder::new()
375 .with_batch_size(batch_size)
376 .with_max_cache_bytes(max_cache_bytes)
377 .with_cache_dir(cache_dir.clone())
378 .with_squeeze_policy(squeeze_policy)
379 .with_cache_policy(cache_policy)
380 .with_io_worker(io_context);
381 let cache_storage = cache_storage_builder.build();
382
383 LiquidCache {
384 files: Mutex::new(AHashMap::new()),
385 cache_store: cache_storage,
386 current_file_id: AtomicU64::new(0),
387 }
388 }
389
390 pub fn register_or_get_file(&self, file_path: String) -> LiquidCachedFileRef {
392 let mut files = self.files.lock().unwrap();
393 let value = files.entry(file_path.clone()).or_insert_with(|| {
394 let file_id = self.current_file_id.fetch_add(1, Ordering::Relaxed);
395 Arc::new(LiquidCachedFile::new(self.cache_store.clone(), file_id))
396 });
397 value.clone()
398 }
399
400 pub fn batch_size(&self) -> usize {
402 self.cache_store.config().batch_size()
403 }
404
405 pub fn max_cache_bytes(&self) -> usize {
407 self.cache_store.config().max_cache_bytes()
408 }
409
410 pub fn memory_usage_bytes(&self) -> usize {
412 self.cache_store.budget().memory_usage_bytes()
413 }
414
415 pub fn disk_usage_bytes(&self) -> usize {
417 self.cache_store.budget().disk_usage_bytes()
418 }
419
420 pub fn flush_trace(&self, to_file: impl AsRef<Path>) {
422 self.cache_store.tracer().flush(to_file);
423 }
424
425 pub fn enable_trace(&self) {
427 self.cache_store.tracer().enable();
428 }
429
430 pub fn disable_trace(&self) {
432 self.cache_store.tracer().disable();
433 }
434
435 pub unsafe fn reset(&self) {
441 let mut files = self.files.lock().unwrap();
442 for file in files.values_mut() {
443 file.reset();
444 }
445 self.cache_store.reset();
446 }
447
448 pub fn flush_data(&self) {
456 self.cache_store.flush_all_to_disk();
457 }
458
459 pub fn storage(&self) -> &Arc<CacheStorage> {
461 &self.cache_store
462 }
463}
464
465#[cfg(test)]
466mod tests {
467 use super::*;
468 use crate::cache::{LiquidCache, LiquidCachedRowGroupRef};
469 use crate::reader::FilterCandidateBuilder;
470 use arrow::array::Int32Array;
471 use arrow::buffer::BooleanBuffer;
472 use arrow::datatypes::{DataType, Field, Schema};
473 use arrow::record_batch::RecordBatch;
474 use datafusion::common::ScalarValue;
475 use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
476 use datafusion::logical_expr::Operator;
477 use datafusion::physical_expr::PhysicalExpr;
478 use datafusion::physical_expr::expressions::{BinaryExpr, Literal};
479 use datafusion::physical_plan::expressions::Column;
480 use liquid_cache_storage::cache::squeeze_policies::TranscodeSqueezeEvict;
481 use liquid_cache_storage::cache_policies::LiquidPolicy;
482 use parquet::arrow::ArrowWriter;
483 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
484 use std::sync::Arc;
485
486 fn setup_cache(batch_size: usize) -> LiquidCachedRowGroupRef {
487 let tmp_dir = tempfile::tempdir().unwrap();
488 let cache = LiquidCache::new(
489 batch_size,
490 usize::MAX,
491 tmp_dir.path().to_path_buf(),
492 Box::new(LiquidPolicy::new()),
493 Box::new(TranscodeSqueezeEvict),
494 IoMode::Uring,
495 );
496 let file = cache.register_or_get_file("test".to_string());
497 file.row_group(0)
498 }
499
    /// OR across two cached Int32 columns: `a = 3 OR b = 20` over a fully
    /// selected 4-row batch must flag rows 1 and 2.
    #[tokio::test]
    async fn evaluate_or_on_cached_columns() {
        let batch_size = 4;
        let row_group = setup_cache(batch_size);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));

        let col_a = row_group.create_column(0, Arc::new(Field::new("a", DataType::Int32, false)));
        let col_b = row_group.create_column(1, Arc::new(Field::new("b", DataType::Int32, false)));

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));

        // First insert of each batch must succeed.
        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());

        // Write the batch to a scratch parquet file purely to obtain reader
        // metadata, which FilterCandidateBuilder needs below.
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // Build `a = 3 OR b = 20`.
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
        ));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));

        let adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let builder = FilterCandidateBuilder::new(
            expr,
            Arc::clone(&schema),
            Arc::clone(&schema),
            adapter_factory,
        );
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        // Evaluate with all rows selected.
        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // a = 3 matches row 2; b = 20 matches row 1.
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 2).into();
        assert_eq!(result, expected);
    }
565
    /// OR across three cached Int32 columns: `a = 2 OR b = 40 OR c = 600`
    /// over a fully selected 8-row batch must flag rows 1, 3, and 5.
    #[tokio::test]
    async fn evaluate_three_column_or() {
        let batch_size = 8;
        let row_group = setup_cache(batch_size);

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));

        let col_a = row_group.create_column(0, Arc::new(Field::new("a", DataType::Int32, false)));
        let col_b = row_group.create_column(1, Arc::new(Field::new("b", DataType::Int32, false)));
        let col_c = row_group.create_column(2, Arc::new(Field::new("c", DataType::Int32, false)));

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80]));
        let array_c = Arc::new(Int32Array::from(vec![
            100, 200, 300, 400, 500, 600, 700, 800,
        ]));

        // First insert of each batch must succeed.
        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());
        assert!(col_c.insert(batch_id, array_c.clone()).await.is_ok());

        // Write the batch to a scratch parquet file purely to obtain reader
        // metadata for the filter-candidate machinery.
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b, array_c]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // Build `a = 2 OR b = 40 OR c = 600` (left-nested OR).
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(40)))),
        ));
        let expr_c: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 2)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(600)))),
        ));

        let expr_ab = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_ab, Operator::Or, expr_c));

        let adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let builder = FilterCandidateBuilder::new(
            expr,
            Arc::clone(&schema),
            Arc::clone(&schema),
            adapter_factory,
        );
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        // Evaluate with all rows selected.
        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // a = 2 matches row 1; b = 40 matches row 3; c = 600 matches row 5.
        let expected =
            BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3 || i == 5).into();
        assert_eq!(result, expected);
    }
648
    /// OR across two cached Utf8View columns: `name = 'Bob' OR city = 'Tokyo'`
    /// over a fully selected 8-row batch must flag rows 1 and 3.
    #[tokio::test]
    async fn evaluate_string_column_or() {
        let batch_size = 8;
        let row_group = setup_cache(batch_size);

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8View, false),
            Field::new("city", DataType::Utf8View, false),
        ]));

        let col_name =
            row_group.create_column(0, Arc::new(Field::new("name", DataType::Utf8View, false)));
        let col_city =
            row_group.create_column(1, Arc::new(Field::new("city", DataType::Utf8View, false)));

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_name = Arc::new(arrow::array::StringViewArray::from(vec![
            "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry",
        ]));
        let array_city = Arc::new(arrow::array::StringViewArray::from(vec![
            "New York", "London", "Paris", "Tokyo", "Berlin", "Sydney", "Madrid", "Rome",
        ]));

        // First insert of each batch must succeed.
        assert!(col_name.insert(batch_id, array_name.clone()).await.is_ok());
        assert!(col_city.insert(batch_id, array_city.clone()).await.is_ok());

        // Write the batch to a scratch parquet file purely to obtain reader
        // metadata for the filter-candidate machinery.
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_name, array_city]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // Build `name = 'Bob' OR city = 'Tokyo'`.
        let expr_name: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("name", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some("Bob".to_string())))),
        ));
        let expr_city: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("city", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some(
                "Tokyo".to_string(),
            )))),
        ));
        let expr: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(expr_name, Operator::Or, expr_city));

        let adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let builder = FilterCandidateBuilder::new(
            expr,
            Arc::clone(&schema),
            Arc::clone(&schema),
            adapter_factory,
        );
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        // Evaluate with all rows selected.
        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // "Bob" is row 1; "Tokyo" is row 3.
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3).into();
        assert_eq!(result, expected);
    }
725}