1use std::ops::Range;
10use std::sync::Arc;
11
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::Columnar;
15use vortex_array::VortexSessionExecute;
16use vortex_array::expr::Expression;
17use vortex_array::expr::pruning::checked_pruning_expr;
18use vortex_dtype::DType;
19use vortex_dtype::Field;
20use vortex_dtype::FieldMask;
21use vortex_dtype::FieldPath;
22use vortex_dtype::FieldPathSet;
23use vortex_error::VortexResult;
24use vortex_layout::LayoutReader;
25use vortex_layout::segments::SegmentSource;
26use vortex_scan::ScanBuilder;
27use vortex_scan::SplitBy;
28use vortex_session::VortexSession;
29use vortex_utils::aliases::hash_map::HashMap;
30
31use crate::FileStatistics;
32use crate::footer::Footer;
33use crate::pruning::extract_relevant_file_stats_as_struct_row;
34
35#[derive(Clone)]
41pub struct VortexFile {
42 pub(crate) footer: Footer,
44 pub(crate) segment_source: Arc<dyn SegmentSource>,
46 pub(crate) session: VortexSession,
48}
49
50impl VortexFile {
51 pub fn footer(&self) -> &Footer {
53 &self.footer
54 }
55
56 pub fn row_count(&self) -> u64 {
58 self.footer.row_count()
59 }
60
61 pub fn dtype(&self) -> &DType {
63 self.footer.dtype()
64 }
65
66 pub fn file_stats(&self) -> Option<&FileStatistics> {
70 self.footer.statistics()
71 }
72
73 pub fn segment_source(&self) -> Arc<dyn SegmentSource> {
78 self.segment_source.clone()
79 }
80
81 pub fn layout_reader(&self) -> VortexResult<Arc<dyn LayoutReader>> {
83 let segment_source = self.segment_source();
84 self.footer
85 .layout()
86 .new_reader("".into(), segment_source, &self.session)
88 }
89
90 pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
92 Ok(ScanBuilder::new(
93 self.session.clone(),
94 self.layout_reader()?,
95 ))
96 }
97
98 pub fn can_prune(&self, filter: &Expression) -> VortexResult<bool> {
100 let Some((stats, fields)) = self
101 .footer
102 .statistics()
103 .zip(self.footer.dtype().as_struct_fields_opt())
104 else {
105 return Ok(false);
106 };
107
108 let set = FieldPathSet::from_iter(
109 fields
110 .names()
111 .iter()
112 .zip(stats.stats_sets().iter())
113 .flat_map(|(name, stats)| {
114 stats.iter().map(|(stat, _)| {
115 FieldPath::from_iter([
116 Field::Name(name.clone()),
117 Field::Name(stat.name().into()),
118 ])
119 })
120 }),
121 );
122
123 let Some((predicate, required_stats)) = checked_pruning_expr(filter, &set) else {
124 return Ok(false);
125 };
126
127 let required_file_stats = HashMap::from_iter(
128 required_stats
129 .map()
130 .iter()
131 .map(|(path, stats)| (path.clone(), stats.clone())),
132 );
133
134 let Some(file_stats) = extract_relevant_file_stats_as_struct_row(
135 &required_file_stats,
136 stats.stats_sets(),
137 fields,
138 )?
139 else {
140 return Ok(false);
141 };
142
143 let mut ctx = self.session.create_execution_ctx();
144 Ok(
145 match file_stats
146 .apply(&predicate)?
147 .execute::<Columnar>(&mut ctx)?
148 {
149 Columnar::Constant(s) => s.scalar().as_bool().value() == Some(true),
150 Columnar::Canonical(_) => false,
151 },
152 )
153 }
154
155 pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
156 let reader = self.layout_reader()?;
157 Ok(SplitBy::Layout
158 .splits(reader.as_ref(), &(0..reader.row_count()), &[FieldMask::All])?
159 .into_iter()
160 .tuple_windows()
161 .map(|(start, end)| start..end)
162 .collect())
163 }
164}