1use std::ops::Range;
10use std::sync::Arc;
11
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::Columnar;
15use vortex_array::VortexSessionExecute;
16use vortex_array::dtype::DType;
17use vortex_array::dtype::Field;
18use vortex_array::dtype::FieldMask;
19use vortex_array::dtype::FieldPath;
20use vortex_array::dtype::FieldPathSet;
21use vortex_array::expr::Expression;
22use vortex_array::expr::pruning::checked_pruning_expr;
23use vortex_error::VortexResult;
24use vortex_layout::LayoutReader;
25use vortex_layout::segments::SegmentSource;
26use vortex_scan::ScanBuilder;
27use vortex_scan::SplitBy;
28use vortex_scan::api::DataSourceRef;
29use vortex_scan::layout::LayoutReaderDataSource;
30use vortex_session::VortexSession;
31use vortex_utils::aliases::hash_map::HashMap;
32
33use crate::FileStatistics;
34use crate::footer::Footer;
35use crate::pruning::extract_relevant_file_stats_as_struct_row;
36use crate::v2::FileStatsLayoutReader;
37
38#[derive(Clone)]
44pub struct VortexFile {
45 pub(crate) footer: Footer,
47 pub(crate) segment_source: Arc<dyn SegmentSource>,
49 pub(crate) session: VortexSession,
51}
52
53impl VortexFile {
54 pub fn footer(&self) -> &Footer {
56 &self.footer
57 }
58
59 pub fn row_count(&self) -> u64 {
61 self.footer.row_count()
62 }
63
64 pub fn dtype(&self) -> &DType {
66 self.footer.dtype()
67 }
68
69 pub fn file_stats(&self) -> Option<&FileStatistics> {
73 self.footer.statistics()
74 }
75
76 pub fn segment_source(&self) -> Arc<dyn SegmentSource> {
81 self.segment_source.clone()
82 }
83
84 pub fn layout_reader(&self) -> VortexResult<Arc<dyn LayoutReader>> {
86 let segment_source = self.segment_source();
87 self.footer
88 .layout()
89 .new_reader("".into(), segment_source, &self.session)
91 }
92
93 pub fn data_source(&self) -> VortexResult<DataSourceRef> {
98 let mut reader = self.layout_reader()?;
99 if let Some(stats) = self.file_stats().cloned() {
100 reader = Arc::new(FileStatsLayoutReader::new(
101 reader,
102 stats,
103 self.session.clone(),
104 ));
105 }
106 Ok(Arc::new(LayoutReaderDataSource::new(
107 reader,
108 self.session.clone(),
109 )))
110 }
111
112 pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
114 Ok(ScanBuilder::new(
115 self.session.clone(),
116 self.layout_reader()?,
117 ))
118 }
119
120 pub fn can_prune(&self, filter: &Expression) -> VortexResult<bool> {
122 let Some((stats, fields)) = self
123 .footer
124 .statistics()
125 .zip(self.footer.dtype().as_struct_fields_opt())
126 else {
127 return Ok(false);
128 };
129
130 let set = FieldPathSet::from_iter(
131 fields
132 .names()
133 .iter()
134 .zip(stats.stats_sets().iter())
135 .flat_map(|(name, stats)| {
136 stats.iter().map(|(stat, _)| {
137 FieldPath::from_iter([
138 Field::Name(name.clone()),
139 Field::Name(stat.name().into()),
140 ])
141 })
142 }),
143 );
144
145 let Some((predicate, required_stats)) = checked_pruning_expr(filter, &set) else {
146 return Ok(false);
147 };
148
149 let required_file_stats = HashMap::from_iter(
150 required_stats
151 .map()
152 .iter()
153 .map(|(path, stats)| (path.clone(), stats.clone())),
154 );
155
156 let Some(file_stats) = extract_relevant_file_stats_as_struct_row(
157 &required_file_stats,
158 stats.stats_sets(),
159 fields,
160 )?
161 else {
162 return Ok(false);
163 };
164
165 let mut ctx = self.session.create_execution_ctx();
166 Ok(
167 match file_stats
168 .apply(&predicate)?
169 .execute::<Columnar>(&mut ctx)?
170 {
171 Columnar::Constant(s) => s.scalar().as_bool().value() == Some(true),
172 Columnar::Canonical(_) => false,
173 },
174 )
175 }
176
177 pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
178 let reader = self.layout_reader()?;
179 Ok(SplitBy::Layout
180 .splits(reader.as_ref(), &(0..reader.row_count()), &[FieldMask::All])?
181 .into_iter()
182 .tuple_windows()
183 .map(|(start, end)| start..end)
184 .collect())
185 }
186}