1use std::ops::Range;
10use std::sync::Arc;
11
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::expr::Expression;
15use vortex_array::expr::pruning::checked_pruning_expr;
16use vortex_array::stats::StatsSet;
17use vortex_dtype::{DType, Field, FieldMask, FieldPath, FieldPathSet};
18use vortex_error::VortexResult;
19use vortex_layout::LayoutReader;
20use vortex_layout::segments::SegmentSource;
21use vortex_metrics::VortexMetrics;
22use vortex_scan::{ScanBuilder, SplitBy};
23use vortex_session::VortexSession;
24use vortex_utils::aliases::hash_map::HashMap;
25
26use crate::footer::Footer;
27use crate::pruning::extract_relevant_file_stats_as_struct_row;
28
29#[derive(Clone)]
35pub struct VortexFile {
36 pub(crate) footer: Footer,
38 pub(crate) segment_source: Arc<dyn SegmentSource>,
40 pub(crate) metrics: VortexMetrics,
42 pub(crate) session: VortexSession,
44}
45
46impl VortexFile {
47 pub fn footer(&self) -> &Footer {
49 &self.footer
50 }
51
52 pub fn row_count(&self) -> u64 {
54 self.footer.row_count()
55 }
56
57 pub fn dtype(&self) -> &DType {
59 self.footer.dtype()
60 }
61
62 pub fn file_stats(&self) -> Option<&Arc<[StatsSet]>> {
66 self.footer.statistics()
67 }
68
69 pub fn metrics(&self) -> &VortexMetrics {
71 &self.metrics
72 }
73
74 pub fn segment_source(&self) -> Arc<dyn SegmentSource> {
79 self.segment_source.clone()
80 }
81
82 pub fn layout_reader(&self) -> VortexResult<Arc<dyn LayoutReader>> {
84 let segment_source = self.segment_source();
85 self.footer
86 .layout()
87 .new_reader("".into(), segment_source)
89 }
90
91 pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
93 Ok(
94 ScanBuilder::new(self.session.clone(), self.layout_reader()?)
95 .with_metrics(self.metrics.clone()),
96 )
97 }
98
99 #[cfg(gpu_unstable)]
100 pub fn gpu_scan(
101 &self,
102 ctx: Arc<cudarc::driver::CudaContext>,
103 ) -> VortexResult<vortex_scan::gpu::GpuScanBuilder<vortex_gpu::GpuVector>> {
104 let segment_source = self.segment_source();
105 let gpu_reader = self
106 .footer
107 .layout()
108 .new_gpu_reader("".into(), segment_source, ctx)?;
109
110 Ok(vortex_scan::gpu::GpuScanBuilder::new(
111 self.session.clone(),
112 gpu_reader,
113 ))
114 }
115
116 pub fn can_prune(&self, filter: &Expression) -> VortexResult<bool> {
118 let Some((stats, fields)) = self
119 .footer
120 .statistics()
121 .zip(self.footer.dtype().as_struct_fields_opt())
122 else {
123 return Ok(false);
124 };
125
126 let set = FieldPathSet::from_iter(fields.names().iter().zip(stats.iter()).flat_map(
127 |(name, stats)| {
128 stats.iter().map(|(stat, _)| {
129 FieldPath::from_iter([
130 Field::Name(name.clone()),
131 Field::Name(stat.name().into()),
132 ])
133 })
134 },
135 ));
136
137 let Some((predicate, required_stats)) = checked_pruning_expr(filter, &set) else {
138 return Ok(false);
139 };
140
141 let required_file_stats = HashMap::from_iter(
142 required_stats
143 .map()
144 .iter()
145 .map(|(path, stats)| (path.clone(), stats.clone())),
146 );
147
148 let Some(file_stats) =
149 extract_relevant_file_stats_as_struct_row(&required_file_stats, stats, fields)?
150 else {
151 return Ok(false);
152 };
153
154 Ok(predicate
155 .evaluate(&file_stats)?
156 .as_constant()
157 .is_some_and(|result| result.as_bool().value() == Some(true)))
158 }
159
160 pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
161 let reader = self.layout_reader()?;
162 Ok(SplitBy::Layout
163 .splits(reader.as_ref(), &(0..reader.row_count()), &[FieldMask::All])?
164 .into_iter()
165 .tuple_windows()
166 .map(|(start, end)| start..end)
167 .collect())
168 }
169}