1use crate::manifest::{DataFile, FileContent, Snapshot};
2use crate::storage::Storage;
3use anyhow::Result;
4
5#[derive(Debug, Clone)]
8pub struct ScanTask {
9 pub data_file: DataFile,
10 pub delete_files: Vec<DataFile>,
11}
12
/// A filter on a single column, identified by its numeric column id.
///
/// Values are carried as raw bytes; the encoding is not defined here
/// (presumably it matches whatever the bloom filters were built from —
/// TODO confirm against the writer).
#[derive(Clone, Debug)]
pub enum Predicate {
    /// The column is compared against one value.
    Eq { column_id: i32, value: Vec<u8> },
    /// The column is compared against a set of candidate values.
    In { column_id: i32, values: Vec<Vec<u8>> },
}
24
25pub struct ScanPlanner<'a> {
27 snapshot: &'a Snapshot,
28 storage: &'a Storage,
29 filter: Option<Predicate>,
30}
31
32impl<'a> ScanPlanner<'a> {
33 pub fn new(snapshot: &'a Snapshot, storage: &'a Storage) -> Self {
34 Self {
35 snapshot,
36 storage,
37 filter: None,
38 }
39 }
40
41 pub fn with_filter(mut self, filter: Predicate) -> Self {
43 self.filter = Some(filter);
44 self
45 }
46
47 pub async fn plan(&self) -> Result<Vec<ScanTask>> {
49 let (data_files, delete_files) = self.snapshot.all_files(self.storage).await?;
50
51 let mut pos_deletes = Vec::new();
53 let mut eq_deletes = Vec::new();
54
55 for df in delete_files {
56 match df.content {
57 FileContent::PositionDeletes => pos_deletes.push(df),
58 FileContent::EqualityDeletes => eq_deletes.push(df),
59 _ => {}
60 }
61 }
62
63 let tasks = data_files
68 .into_iter()
69 .filter(|df| self.should_keep_file(df))
70 .map(|data_file| {
71 let mut relevant_deletes = Vec::new();
74
75 relevant_deletes.extend(eq_deletes.clone());
77
78 relevant_deletes.extend(pos_deletes.clone());
80
81 ScanTask {
82 data_file,
83 delete_files: relevant_deletes,
84 }
85 })
86 .collect();
87
88 Ok(tasks)
89 }
90
91 fn should_keep_file(&self, data_file: &DataFile) -> bool {
92 if let Some(ref filter) = self.filter {
93 match filter {
94 Predicate::Eq { column_id, value } => {
95 if let Some(stats) = data_file.statistics.get(column_id) {
96 if let Some(ref bf) = stats.bloom_filter {
97 return bf.contains(value);
98 }
99 }
100 }
101 Predicate::In { column_id, values } => {
102 if let Some(stats) = data_file.statistics.get(column_id) {
103 if let Some(ref bf) = stats.bloom_filter {
104 return values.iter().any(|v| bf.contains(v));
105 }
106 }
107 }
108 }
109 }
110 true
111 }
112}