// supertable_core/reader.rs

1use crate::manifest::FileContent;
2use crate::scan::ScanTask;
3use arrow::array::RecordBatch;
4use arrow::array::{Array, AsArray, BooleanArray};
5use arrow::compute::filter_record_batch;
6use arrow::datatypes::SchemaRef;
7use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
8use std::collections::HashSet;
9
10use crate::storage::Storage;
11
12/// Reads Parquet files from storage.
13pub struct TableReader {
14    storage: Storage,
15}
16
17impl TableReader {
18    pub fn new(storage: Storage) -> Self {
19        Self { storage }
20    }
21
22    /// Reads a Parquet file from storage and returns its RecordBatches.
23    pub async fn read_file(&self, path: &str) -> anyhow::Result<Vec<RecordBatch>> {
24        let data = self.storage.read(path).await?;
25
26        // Bytes implements ChunkReader directly
27        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
28        let reader = builder.build()?;
29
30        let batches: Result<Vec<_>, _> = reader.collect();
31        Ok(batches?)
32    }
33
34    /// Get the schema from a Parquet file.
35    pub async fn read_schema(&self, path: &str) -> anyhow::Result<SchemaRef> {
36        let data = self.storage.read(path).await?;
37
38        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
39        Ok(builder.schema().clone())
40    }
41
42    /// Reads a scan task and applies relevant deletes.
43    pub async fn read_task(&self, task: ScanTask) -> anyhow::Result<Vec<RecordBatch>> {
44        let batches = self.read_file(&task.data_file.file_path).await?;
45
46        if task.delete_files.is_empty() {
47            return Ok(batches);
48        }
49
50        // 1. Process position deletes
51        let mut deleted_positions = HashSet::new();
52        for delete_file in &task.delete_files {
53            if delete_file.content == FileContent::PositionDeletes {
54                let del_batches = self.read_file(&delete_file.file_path).await?;
55                for batch in del_batches {
56                    // Position delete files MUST have at least:
57                    // - file_path: UTF8
58                    // - pos: Int64
59                    let path_col = batch.column(0).as_string::<i32>();
60                    let pos_col = batch
61                        .column(1)
62                        .as_primitive::<arrow::datatypes::Int64Type>();
63
64                    for i in 0..batch.num_rows() {
65                        if !path_col.is_null(i) && path_col.value(i) == task.data_file.file_path {
66                            deleted_positions.insert(pos_col.value(i));
67                        }
68                    }
69                }
70            }
71        }
72
73        if deleted_positions.is_empty() {
74            return Ok(batches);
75        }
76
77        // 2. Apply deletes to data batches
78        let mut filtered_batches = Vec::new();
79        let mut current_pos = 0;
80
81        for batch in batches {
82            let num_rows = batch.num_rows();
83            let mut mask = Vec::with_capacity(num_rows);
84            for i in 0..num_rows {
85                let is_deleted = deleted_positions.contains(&(current_pos + i as i64));
86                mask.push(!is_deleted);
87            }
88
89            let bool_mask = BooleanArray::from(mask);
90            let filtered = filter_record_batch(&batch, &bool_mask)?;
91            filtered_batches.push(filtered);
92            current_pos += num_rows as i64;
93        }
94
95        Ok(filtered_batches)
96    }
97}