1use std::ops::Range;
10use std::sync::Arc;
11
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::VectorExecutor;
15use vortex_array::expr::Expression;
16use vortex_array::expr::pruning::checked_pruning_expr;
17use vortex_array::stats::StatsSet;
18use vortex_dtype::DType;
19use vortex_dtype::Field;
20use vortex_dtype::FieldMask;
21use vortex_dtype::FieldPath;
22use vortex_dtype::FieldPathSet;
23use vortex_error::VortexResult;
24use vortex_layout::LayoutReader;
25use vortex_layout::layouts::USE_VORTEX_OPERATORS;
26use vortex_layout::segments::SegmentSource;
27use vortex_metrics::VortexMetrics;
28use vortex_scan::ScanBuilder;
29use vortex_scan::SplitBy;
30use vortex_session::VortexSession;
31use vortex_utils::aliases::hash_map::HashMap;
32
33use crate::footer::Footer;
34use crate::pruning::extract_relevant_file_stats_as_struct_row;
35
/// In-memory handle to a Vortex file: its footer plus the pieces needed to build
/// layout readers and scans over it.
///
/// Cloning copies the footer, metrics and session values and clones the `Arc`
/// handle to the segment source.
#[derive(Clone)]
pub struct VortexFile {
    /// Footer of the file; supplies the row count, dtype, file statistics and root layout.
    pub(crate) footer: Footer,
    /// Shared segment source handed to every layout reader created from this file.
    pub(crate) segment_source: Arc<dyn SegmentSource>,
    /// Metrics attached to the scans created from this file.
    pub(crate) metrics: VortexMetrics,
    /// Session passed to layout readers, scan builders and pruning evaluation.
    pub(crate) session: VortexSession,
}
52
53impl VortexFile {
54 pub fn footer(&self) -> &Footer {
56 &self.footer
57 }
58
59 pub fn row_count(&self) -> u64 {
61 self.footer.row_count()
62 }
63
64 pub fn dtype(&self) -> &DType {
66 self.footer.dtype()
67 }
68
69 pub fn file_stats(&self) -> Option<&Arc<[StatsSet]>> {
73 self.footer.statistics()
74 }
75
76 pub fn metrics(&self) -> &VortexMetrics {
78 &self.metrics
79 }
80
81 pub fn segment_source(&self) -> Arc<dyn SegmentSource> {
86 self.segment_source.clone()
87 }
88
89 pub fn layout_reader(&self) -> VortexResult<Arc<dyn LayoutReader>> {
91 let segment_source = self.segment_source();
92 self.footer
93 .layout()
94 .new_reader("".into(), segment_source, &self.session)
96 }
97
98 pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
100 Ok(
101 ScanBuilder::new(self.session.clone(), self.layout_reader()?)
102 .with_metrics(self.metrics.clone()),
103 )
104 }
105
106 #[cfg(gpu_unstable)]
107 pub fn gpu_scan(
108 &self,
109 ctx: Arc<cudarc::driver::CudaContext>,
110 ) -> VortexResult<vortex_scan::gpu::GpuScanBuilder<vortex_gpu::GpuVector>> {
111 let segment_source = self.segment_source();
112 let gpu_reader = self
113 .footer
114 .layout()
115 .new_gpu_reader("".into(), segment_source, ctx)?;
116
117 Ok(vortex_scan::gpu::GpuScanBuilder::new(
118 self.session.clone(),
119 gpu_reader,
120 ))
121 }
122
123 pub fn can_prune(&self, filter: &Expression) -> VortexResult<bool> {
125 let Some((stats, fields)) = self
126 .footer
127 .statistics()
128 .zip(self.footer.dtype().as_struct_fields_opt())
129 else {
130 return Ok(false);
131 };
132
133 let set = FieldPathSet::from_iter(fields.names().iter().zip(stats.iter()).flat_map(
134 |(name, stats)| {
135 stats.iter().map(|(stat, _)| {
136 FieldPath::from_iter([
137 Field::Name(name.clone()),
138 Field::Name(stat.name().into()),
139 ])
140 })
141 },
142 ));
143
144 let Some((predicate, required_stats)) = checked_pruning_expr(filter, &set) else {
145 return Ok(false);
146 };
147
148 let required_file_stats = HashMap::from_iter(
149 required_stats
150 .map()
151 .iter()
152 .map(|(path, stats)| (path.clone(), stats.clone())),
153 );
154
155 let Some(file_stats) =
156 extract_relevant_file_stats_as_struct_row(&required_file_stats, stats, fields)?
157 else {
158 return Ok(false);
159 };
160
161 Ok(if *USE_VORTEX_OPERATORS {
162 file_stats
163 .execute_datum(&self.session)?
164 .into_scalar()
165 .is_some_and(|s| s.as_bool().value() == Some(true))
166 } else {
167 predicate
168 .evaluate(&file_stats)?
169 .as_constant()
170 .is_some_and(|result| result.as_bool().value() == Some(true))
171 })
172 }
173
174 pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
175 let reader = self.layout_reader()?;
176 Ok(SplitBy::Layout
177 .splits(reader.as_ref(), &(0..reader.row_count()), &[FieldMask::All])?
178 .into_iter()
179 .tuple_windows()
180 .map(|(start, end)| start..end)
181 .collect())
182 }
183}