// supertable_core/scan.rs

1use crate::manifest::{DataFile, FileContent, Snapshot};
2use crate::storage::Storage;
3use anyhow::Result;
4
5/// A single unit of work for reading a table.
6/// Combines a data file with its relevant delete files.
7#[derive(Debug, Clone)]
8pub struct ScanTask {
9    pub data_file: DataFile,
10    pub delete_files: Vec<DataFile>,
11}
12
/// A simple predicate used to prune data files during scan planning.
///
/// Values are carried as raw bytes (`Vec<u8>`); this type does not interpret
/// them. Predicates compare by value, so two predicates are equal when they
/// are the same variant with the same column id and the same byte values
/// (for `In`, in the same order).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// Equality predicate: `column_id == value`.
    Eq {
        /// Identifier of the column being tested.
        column_id: i32,
        /// Value the column is compared against, as raw bytes.
        value: Vec<u8>,
    },
    /// Set membership: `column_id IN (values)`.
    In {
        /// Identifier of the column being tested.
        column_id: i32,
        /// Candidate values, each as raw bytes.
        values: Vec<Vec<u8>>,
    },
}
24
25/// Plan scans for a table snapshot.
26pub struct ScanPlanner<'a> {
27    snapshot: &'a Snapshot,
28    storage: &'a Storage,
29    filter: Option<Predicate>,
30}
31
32impl<'a> ScanPlanner<'a> {
33    pub fn new(snapshot: &'a Snapshot, storage: &'a Storage) -> Self {
34        Self {
35            snapshot,
36            storage,
37            filter: None,
38        }
39    }
40
41    /// Adds a filter to the scan planner.
42    pub fn with_filter(mut self, filter: Predicate) -> Self {
43        self.filter = Some(filter);
44        self
45    }
46
47    /// Plans the scan by associating data files with relevant delete files.
48    pub async fn plan(&self) -> Result<Vec<ScanTask>> {
49        let (data_files, delete_files) = self.snapshot.all_files(self.storage).await?;
50
51        // Group delete files by type
52        let mut pos_deletes = Vec::new();
53        let mut eq_deletes = Vec::new();
54
55        for df in delete_files {
56            match df.content {
57                FileContent::PositionDeletes => pos_deletes.push(df),
58                FileContent::EqualityDeletes => eq_deletes.push(df),
59                _ => {}
60            }
61        }
62
63        // For this prototype, we'll associate all equality deletes with all data files
64        // and filter position deletes by file path if we were to read them here.
65        // In a real implementation, we'd use partition pruning for deletes too.
66
67        let tasks = data_files
68            .into_iter()
69            .filter(|df| self.should_keep_file(df))
70            .map(|data_file| {
71                // Find relevant position deletes for this specific file.
72                // (Simplified: In a real system we'd use metadata to avoid searching all)
73                let mut relevant_deletes = Vec::new();
74
75                // Add all equality deletes (conservative)
76                relevant_deletes.extend(eq_deletes.clone());
77
78                // Add all position deletes (reader will filter)
79                relevant_deletes.extend(pos_deletes.clone());
80
81                ScanTask {
82                    data_file,
83                    delete_files: relevant_deletes,
84                }
85            })
86            .collect();
87
88        Ok(tasks)
89    }
90
91    fn should_keep_file(&self, data_file: &DataFile) -> bool {
92        if let Some(ref filter) = self.filter {
93            match filter {
94                Predicate::Eq { column_id, value } => {
95                    if let Some(stats) = data_file.statistics.get(column_id) {
96                        if let Some(ref bf) = stats.bloom_filter {
97                            return bf.contains(value);
98                        }
99                    }
100                }
101                Predicate::In { column_id, values } => {
102                    if let Some(stats) = data_file.statistics.get(column_id) {
103                        if let Some(ref bf) = stats.bloom_filter {
104                            return values.iter().any(|v| bf.contains(v));
105                        }
106                    }
107                }
108            }
109        }
110        true
111    }
112}