use crate::io::ParquetIoContext;
use crate::reader::{LiquidPredicate, extract_multi_column_or};
use crate::sync::Mutex;
use ahash::AHashMap;
use arrow::array::{BooleanArray, RecordBatch};
use arrow::buffer::BooleanBuffer;
use arrow_schema::{ArrowError, Field, Schema, SchemaRef};
use liquid_cache::cache::squeeze_policies::SqueezePolicy;
use liquid_cache::cache::{
    CachePolicy, EventTrace, HydrationPolicy, LiquidCache, LiquidCacheBuilder,
};
use liquid_cache_common::IoMode;
use parquet::arrow::arrow_reader::ArrowPredicate;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

mod column;
mod id;
mod stats;

pub(crate) use column::InsertArrowArrayError;
pub use column::{CachedColumn, CachedColumnRef};
pub(crate) use id::ColumnAccessPath;
pub use id::{BatchID, ParquetArrayID};

/// Column lookup tables for a row group: the same [`CachedColumn`] is indexed
/// both by its ordinal column id and by its field name.
#[derive(Default, Debug)]
struct ColumnMaps {
    by_id: AHashMap<u64, CachedColumnRef>,
    by_name: AHashMap<String, CachedColumnRef>,
}

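/// All cached columns of one Parquet row group, plus a handle to the shared
/// cache store. Obtained from [`CachedFile::create_row_group`].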
#[derive(Debug)]
pub struct CachedRowGroup {
    columns: ColumnMaps,
    cache_store: Arc<LiquidCache>,
}

impl CachedRowGroup {
    /// Builds the cached row group, pre-creating its on-disk cache directory
    /// and registering one [`CachedColumn`] per `(id, field, is_predicate)`
    /// triple.
    fn new(
        cache_store: Arc<LiquidCache>,
        row_group_idx: u64,
        file_idx: u64,
        columns: &[(u64, Arc<Field>, bool)],
    ) -> Self {
        let cache_dir = cache_store
            .config()
            .cache_root_dir()
            .join(format!("file_{file_idx}"))
            .join(format!("rg_{row_group_idx}"));
        std::fs::create_dir_all(&cache_dir).expect("Failed to create cache directory");

        let mut column_maps = ColumnMaps::default();
        for (column_id, field, is_predicate_column) in columns {
            let column_access_path = ColumnAccessPath::new(file_idx, row_group_idx, *column_id);
            let column = Arc::new(CachedColumn::new(
                Arc::clone(field),
                Arc::clone(&cache_store),
                column_access_path,
                *is_predicate_column,
            ));
            column_maps.by_id.insert(*column_id, column.clone());
            column_maps.by_name.insert(field.name().to_string(), column);
        }

        Self {
            columns: column_maps,
            cache_store,
        }
    }

    /// Returns the configured batch size of the cache.
    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    /// Returns the cached column with the given ordinal id, if registered.
    pub fn get_column(&self, column_id: u64) -> Option<CachedColumnRef> {
        self.columns.by_id.get(&column_id).cloned()
    }

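    /// Looks up a column by name, first with the name as given, then falling
    /// back to the unqualified suffix (e.g. `"t.a"` also resolves `"a"`).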
    pub fn get_column_by_name(&self, column_name: &str) -> Option<CachedColumnRef> {
        if let Some(column) = self.columns.by_name.get(column_name) {
            return Some(column.clone());
        }

        let unqualified = column_name.rsplit('.').next().unwrap_or(column_name);
        self.columns.by_name.get(unqualified).cloned()
    }

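    /// Evaluates `predicate` against the rows of `batch_id` selected by
    /// `selection`, returning `None` when the predicate cannot be answered
    /// from the cache (e.g. a required column or batch is missing). Three
    /// strategies are tried in order: single-column predicate pushdown,
    /// per-column evaluation of a multi-column OR, and a generic fallback
    /// that materializes the columns into a record batch.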
    #[fastrace::trace]
    pub async fn evaluate_selection_with_predicate(
        &self,
        batch_id: BatchID,
        selection: &BooleanBuffer,
        predicate: &mut LiquidPredicate,
    ) -> Option<Result<BooleanArray, ArrowError>> {
        let column_ids = predicate.predicate_column_ids();

        if column_ids.len() == 1 {
            // Fast path: push the predicate down to the single cached column.
            let column_id = column_ids[0];
            let cache = self.get_column(column_id as u64)?;
            return cache
                .eval_predicate_with_filter(batch_id, selection, predicate)
                .await;
        } else if column_ids.len() >= 2 {
            // Try to decompose a multi-column OR so each disjunct can be
            // evaluated directly on its cached liquid column, combining the
            // per-column results with Kleene OR.
            if let Some(column_exprs) =
                extract_multi_column_or(predicate.physical_expr_physical_column_index())
            {
                let mut combined_buffer: Option<BooleanArray> = None;

                for (col_name, expr) in column_exprs {
                    let column = self.get_column_by_name(col_name)?;
                    let entry_id = column.entry_id(batch_id).into();
                    let liquid_array = self.cache_store.try_read_liquid(&entry_id).await;
                    let liquid_array = match liquid_array {
                        None => {
                            combined_buffer = None;
                            break;
                        }
                        Some(array) => array,
                    };
                    let buffer =
                        if let Some(buffer) = liquid_array.try_eval_predicate(&expr, selection) {
                            buffer
                        } else {
                            combined_buffer = None;
                            break;
                        };

                    combined_buffer = Some(match combined_buffer {
                        None => buffer,
                        Some(existing) => {
                            arrow::compute::kernels::boolean::or_kleene(&existing, &buffer).ok()?
                        }
                    });
                }

                if let Some(result) = combined_buffer {
                    return Some(Ok(result));
                }
            }
        }

        // Generic fallback: materialize the filtered columns into a record
        // batch and evaluate the predicate on it.
        let mut arrays = Vec::new();
        let mut fields = Vec::new();
        for column_id in column_ids {
            let column = self.get_column(column_id as u64)?;
            let array = column
                .get_arrow_array_with_filter(batch_id, selection)
                .await?;
            arrays.push(array);
            fields.push(column.field());
        }
        let schema = Arc::new(Schema::new(fields));
        let record_batch = match RecordBatch::try_new(schema, arrays) {
            Ok(batch) => batch,
            Err(e) => return Some(Err(e)),
        };
        Some(predicate.evaluate(record_batch))
    }
}

pub(crate) type CachedRowGroupRef = Arc<CachedRowGroup>;

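/// A registered Parquet file: carries the file id, the full file schema, and
/// the shared cache store, and hands out [`CachedRowGroup`] views.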
#[derive(Debug)]
pub struct CachedFile {
    cache_store: Arc<LiquidCache>,
    file_id: u64,
    file_schema: SchemaRef,
}

impl CachedFile {
    fn new(cache_store: Arc<LiquidCache>, file_id: u64, file_schema: SchemaRef) -> Self {
        Self {
            cache_store,
            file_id,
            file_schema,
        }
    }

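    /// Creates the cached view of one row group. Every column of the file
    /// schema is registered; columns whose ordinal appears in
    /// `predicate_column_ids` are flagged as predicate columns.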
    pub fn create_row_group(
        &self,
        row_group_id: u64,
        predicate_column_ids: Vec<usize>,
    ) -> CachedRowGroupRef {
        let columns: Vec<(u64, Arc<Field>, bool)> = self
            .file_schema
            .fields()
            .iter()
            .enumerate()
            .map(|(idx, field)| {
                let is_predicate_column = predicate_column_ids.contains(&idx);
                (idx as u64, Arc::clone(field), is_predicate_column)
            })
            .collect();

        Arc::new(CachedRowGroup::new(
            self.cache_store.clone(),
            row_group_id,
            self.file_id,
            &columns,
        ))
    }

    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.file_schema)
    }
}

pub(crate) type CachedFileRef = Arc<CachedFile>;

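/// Entry point of the Parquet cache: assigns ids to registered files and
/// hands out [`CachedFile`] views that all share one [`LiquidCache`] store.
///
/// A minimal construction sketch (the policies shown are the ones used in
/// this crate's tests; `schema` is assumed to be an Arrow `SchemaRef`):
///
/// ```ignore
/// let cache = LiquidCacheParquet::new(
///     8192,                            // batch size, must be a power of two
///     1024 * 1024 * 1024,              // max cache bytes
///     std::env::temp_dir().join("lc"), // cache directory
///     Box::new(LiquidPolicy::new()),
///     Box::new(TranscodeSqueezeEvict),
///     Box::new(AlwaysHydrate::new()),
///     IoMode::Uring,
/// );
/// let file = cache.register_or_get_file("data.parquet".to_string(), schema);
/// let row_group = file.create_row_group(0, vec![]);
/// ```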
#[derive(Debug)]
pub struct LiquidCacheParquet {
    /// Maps a registered file path to its assigned file id.
    files: Mutex<AHashMap<String, u64>>,

    cache_store: Arc<LiquidCache>,

    /// The id handed to the next newly registered file.
    current_file_id: AtomicU64,
}

pub type LiquidCacheParquetRef = Arc<LiquidCacheParquet>;

impl LiquidCacheParquet {
    pub fn new(
        batch_size: usize,
        max_cache_bytes: usize,
        cache_dir: PathBuf,
        cache_policy: Box<dyn CachePolicy>,
        squeeze_policy: Box<dyn SqueezePolicy>,
        hydration_policy: Box<dyn HydrationPolicy>,
        io_mode: IoMode,
    ) -> Self {
        // Batch ids are derived from row ids, which requires a
        // power-of-two batch size.
        assert!(batch_size.is_power_of_two());
        let io_context = Arc::new(ParquetIoContext::new(cache_dir.clone(), io_mode));
        let cache_storage_builder = LiquidCacheBuilder::new()
            .with_batch_size(batch_size)
            .with_max_cache_bytes(max_cache_bytes)
            .with_cache_dir(cache_dir.clone())
            .with_squeeze_policy(squeeze_policy)
            .with_cache_policy(cache_policy)
            .with_hydration_policy(hydration_policy)
            .with_io_context(io_context);
        let cache_storage = cache_storage_builder.build();

        LiquidCacheParquet {
            files: Mutex::new(AHashMap::new()),
            cache_store: cache_storage,
            current_file_id: AtomicU64::new(0),
        }
    }

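    /// Registers `file_path` if it has not been seen before, assigning it a
    /// fresh file id; otherwise reuses the existing id. Either way, returns a
    /// [`CachedFile`] view over the shared cache store.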
    pub fn register_or_get_file(
        &self,
        file_path: String,
        full_file_schema: SchemaRef,
    ) -> CachedFileRef {
        let mut files = self.files.lock().unwrap();
        let file_id = *files
            .entry(file_path)
            .or_insert_with(|| self.current_file_id.fetch_add(1, Ordering::Relaxed));
        drop(files);

        Arc::new(CachedFile::new(
            self.cache_store.clone(),
            file_id,
            full_file_schema,
        ))
    }

    pub fn batch_size(&self) -> usize {
        self.cache_store.config().batch_size()
    }

    pub fn max_cache_bytes(&self) -> usize {
        self.cache_store.config().max_cache_bytes()
    }

    pub fn memory_usage_bytes(&self) -> usize {
        self.cache_store.budget().memory_usage_bytes()
    }

    pub fn disk_usage_bytes(&self) -> usize {
        self.cache_store.budget().disk_usage_bytes()
    }

    pub fn flush_trace(&self, to_file: impl AsRef<Path>) {
        self.cache_store.observer().flush_cache_trace(to_file);
    }

    pub fn enable_trace(&self) {
        self.cache_store.observer().enable_cache_trace();
    }

    pub fn disable_trace(&self) {
        self.cache_store.observer().disable_cache_trace();
    }

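    /// Clears the file registry and resets the underlying cache store.
    ///
    /// # Safety
    ///
    /// Callers must ensure that no cached entries (e.g. in-flight
    /// [`CachedRowGroup`] reads) are in use while the cache is being reset.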
    pub unsafe fn reset(&self) {
        let mut files = self.files.lock().unwrap();
        files.clear();
        self.cache_store.reset();
    }

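    /// Flushes all cached data to disk, completing once the writes finish.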
    pub async fn flush_data(&self) {
        self.cache_store.flush_all_to_disk().await;
    }

    /// Direct access to the underlying cache store.
    pub fn storage(&self) -> &Arc<LiquidCache> {
        &self.cache_store
    }

    /// Takes and returns the accumulated cache event trace.
    pub fn consume_event_trace(&self) -> EventTrace {
        self.cache_store.consume_event_trace()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{CachedRowGroupRef, LiquidCacheParquet};
    use crate::reader::FilterCandidateBuilder;
    use arrow::array::Int32Array;
    use arrow::buffer::BooleanBuffer;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion::common::ScalarValue;
    use datafusion::logical_expr::Operator;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion::physical_expr::expressions::{BinaryExpr, Literal};
    use datafusion::physical_plan::expressions::Column;
    use liquid_cache::cache::AlwaysHydrate;
    use liquid_cache::cache::squeeze_policies::TranscodeSqueezeEvict;
    use liquid_cache::cache_policies::LiquidPolicy;
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use std::sync::Arc;

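    /// Builds a cache in a fresh temp directory and returns the cached view of
    /// row group 0 for `schema`, with no columns flagged as predicate columns.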
    fn setup_cache(batch_size: usize, schema: SchemaRef) -> CachedRowGroupRef {
        let tmp_dir = tempfile::tempdir().unwrap();
        let cache = LiquidCacheParquet::new(
            batch_size,
            usize::MAX,
            tmp_dir.path().to_path_buf(),
            Box::new(LiquidPolicy::new()),
            Box::new(TranscodeSqueezeEvict),
            Box::new(AlwaysHydrate::new()),
            IoMode::Uring,
        );
        let file = cache.register_or_get_file("test".to_string(), schema);
        file.create_row_group(0, vec![])
    }

    #[tokio::test]
    async fn evaluate_or_on_cached_columns() {
        let batch_size = 4;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let row_group = setup_cache(batch_size, schema.clone());

        let col_a = row_group.get_column(0).unwrap();
        let col_b = row_group.get_column(1).unwrap();

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));

        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());

        // Write a real parquet file so the filter builder can load reader metadata.
        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // Predicate: a = 3 OR b = 20.
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
        ));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));

        let builder = FilterCandidateBuilder::new(expr, Arc::clone(&schema));
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // b == 20 matches row 1, a == 3 matches row 2.
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 2).into();
        assert_eq!(result, expected);
    }

    #[tokio::test]
    async fn evaluate_three_column_or() {
        let batch_size = 8;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));

        let row_group = setup_cache(batch_size, schema.clone());

        let col_a = row_group.get_column(0).unwrap();
        let col_b = row_group.get_column(1).unwrap();
        let col_c = row_group.get_column(2).unwrap();

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_a = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]));
        let array_b = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50, 60, 70, 80]));
        let array_c = Arc::new(Int32Array::from(vec![
            100, 200, 300, 400, 500, 600, 700, 800,
        ]));

        assert!(col_a.insert(batch_id, array_a.clone()).await.is_ok());
        assert!(col_b.insert(batch_id, array_b.clone()).await.is_ok());
        assert!(col_c.insert(batch_id, array_c.clone()).await.is_ok());

        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_a, array_b, array_c]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // Predicate: a = 2 OR b = 40 OR c = 600.
        let expr_a: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
        ));
        let expr_b: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(40)))),
        ));
        let expr_c: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("c", 2)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(600)))),
        ));

        let expr_ab = Arc::new(BinaryExpr::new(expr_a, Operator::Or, expr_b));
        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(expr_ab, Operator::Or, expr_c));

        let builder = FilterCandidateBuilder::new(expr, Arc::clone(&schema));
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // a == 2 matches row 1, b == 40 matches row 3, c == 600 matches row 5.
        let expected =
            BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3 || i == 5).into();
        assert_eq!(result, expected);
    }

    #[tokio::test]
    async fn evaluate_string_column_or() {
        let batch_size = 8;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8View, false),
            Field::new("city", DataType::Utf8View, false),
        ]));

        let row_group = setup_cache(batch_size, schema.clone());

        let col_name = row_group.get_column(0).unwrap();
        let col_city = row_group.get_column(1).unwrap();

        let batch_id = BatchID::from_row_id(0, batch_size);

        let array_name = Arc::new(arrow::array::StringViewArray::from(vec![
            "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry",
        ]));
        let array_city = Arc::new(arrow::array::StringViewArray::from(vec![
            "New York", "London", "Paris", "Tokyo", "Berlin", "Sydney", "Madrid", "Rome",
        ]));

        assert!(col_name.insert(batch_id, array_name.clone()).await.is_ok());
        assert!(col_city.insert(batch_id, array_city.clone()).await.is_ok());

        let tmp_meta = tempfile::NamedTempFile::new().unwrap();
        let mut writer =
            ArrowWriter::try_new(tmp_meta.reopen().unwrap(), Arc::clone(&schema), None).unwrap();
        let batch =
            RecordBatch::try_new(Arc::clone(&schema), vec![array_name, array_city]).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let file_reader = std::fs::File::open(tmp_meta.path()).unwrap();
        let metadata = ArrowReaderMetadata::load(&file_reader, ArrowReaderOptions::new()).unwrap();

        // Predicate: name = 'Bob' OR city = 'Tokyo'.
        let expr_name: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("name", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some("Bob".to_string())))),
        ));
        let expr_city: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("city", 1)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::Utf8View(Some(
                "Tokyo".to_string(),
            )))),
        ));
        let expr: Arc<dyn PhysicalExpr> =
            Arc::new(BinaryExpr::new(expr_name, Operator::Or, expr_city));

        let builder = FilterCandidateBuilder::new(expr, Arc::clone(&schema));
        let candidate = builder.build(metadata.metadata()).unwrap().unwrap();
        let projection = candidate.projection(metadata.metadata());
        let mut predicate = LiquidPredicate::try_new(candidate, projection).unwrap();

        let selection = BooleanBuffer::new_set(batch_size);
        let result = row_group
            .evaluate_selection_with_predicate(batch_id, &selection, &mut predicate)
            .await
            .unwrap()
            .unwrap();

        // "Bob" matches row 1, "Tokyo" matches row 3.
        let expected = BooleanBuffer::collect_bool(batch_size, |i| i == 1 || i == 3).into();
        assert_eq!(result, expected);
    }
}