supertable_core/
reader.rs1use crate::manifest::FileContent;
2use crate::scan::ScanTask;
3use arrow::array::RecordBatch;
4use arrow::array::{Array, AsArray, BooleanArray};
5use arrow::compute::filter_record_batch;
6use arrow::datatypes::SchemaRef;
7use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
8use std::collections::HashSet;
9
10use crate::storage::Storage;
11
12pub struct TableReader {
14 storage: Storage,
15}
16
17impl TableReader {
18 pub fn new(storage: Storage) -> Self {
19 Self { storage }
20 }
21
22 pub async fn read_file(&self, path: &str) -> anyhow::Result<Vec<RecordBatch>> {
24 let data = self.storage.read(path).await?;
25
26 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
28 let reader = builder.build()?;
29
30 let batches: Result<Vec<_>, _> = reader.collect();
31 Ok(batches?)
32 }
33
34 pub async fn read_schema(&self, path: &str) -> anyhow::Result<SchemaRef> {
36 let data = self.storage.read(path).await?;
37
38 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
39 Ok(builder.schema().clone())
40 }
41
42 pub async fn read_task(&self, task: ScanTask) -> anyhow::Result<Vec<RecordBatch>> {
44 let batches = self.read_file(&task.data_file.file_path).await?;
45
46 if task.delete_files.is_empty() {
47 return Ok(batches);
48 }
49
50 let mut deleted_positions = HashSet::new();
52 for delete_file in &task.delete_files {
53 if delete_file.content == FileContent::PositionDeletes {
54 let del_batches = self.read_file(&delete_file.file_path).await?;
55 for batch in del_batches {
56 let path_col = batch.column(0).as_string::<i32>();
60 let pos_col = batch
61 .column(1)
62 .as_primitive::<arrow::datatypes::Int64Type>();
63
64 for i in 0..batch.num_rows() {
65 if !path_col.is_null(i) && path_col.value(i) == task.data_file.file_path {
66 deleted_positions.insert(pos_col.value(i));
67 }
68 }
69 }
70 }
71 }
72
73 if deleted_positions.is_empty() {
74 return Ok(batches);
75 }
76
77 let mut filtered_batches = Vec::new();
79 let mut current_pos = 0;
80
81 for batch in batches {
82 let num_rows = batch.num_rows();
83 let mut mask = Vec::with_capacity(num_rows);
84 for i in 0..num_rows {
85 let is_deleted = deleted_positions.contains(&(current_pos + i as i64));
86 mask.push(!is_deleted);
87 }
88
89 let bool_mask = BooleanArray::from(mask);
90 let filtered = filter_record_batch(&batch, &bool_mask)?;
91 filtered_batches.push(filtered);
92 current_pos += num_rows as i64;
93 }
94
95 Ok(filtered_batches)
96 }
97}