vortex_datafusion/persistent/
source.rs1use std::any::Any;
5use std::fmt::Formatter;
6use std::sync::{Arc, Weak};
7
8use arrow_schema::SchemaRef;
9use datafusion_common::config::ConfigOptions;
10use datafusion_common::{Result as DFResult, Statistics};
11use datafusion_datasource::file::FileSource;
12use datafusion_datasource::file_scan_config::FileScanConfig;
13use datafusion_datasource::file_stream::FileOpener;
14use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
15use datafusion_physical_expr::schema_rewriter::{
16 DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
17};
18use datafusion_physical_expr::{PhysicalExprRef, conjunction};
19use datafusion_physical_plan::filter_pushdown::{
20 FilterPushdownPropagation, PushedDown, PushedDownPredicate,
21};
22use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
23use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
24use object_store::ObjectStore;
25use object_store::path::Path;
26use vortex::error::VortexExpect as _;
27use vortex::file::VORTEX_FILE_EXTENSION;
28use vortex::layout::LayoutReader;
29use vortex::metrics::VortexMetrics;
30use vortex_utils::aliases::dash_map::DashMap;
31
32use super::cache::VortexFileCache;
33use super::metrics::PARTITION_LABEL;
34use super::opener::VortexOpener;
35use crate::convert::exprs::can_be_pushed_down;
36
/// DataFusion [`FileSource`] for scanning Vortex files.
///
/// Configured via the `with_*` builder methods of the [`FileSource`] trait,
/// each of which clones this source; shared state (the file cache and the
/// layout-reader map) lives behind `Arc`s so clones stay cheap.
#[derive(Clone)]
pub struct VortexSource {
    // Shared cache of opened Vortex files, reused across partitions/openers.
    pub(crate) file_cache: VortexFileCache,
    // Conjunction of all filters accepted by `try_pushdown_filters`;
    // `None` when no predicate has been pushed down.
    pub(crate) predicate: Option<PhysicalExprRef>,
    // Must be set (via `with_batch_size`) before `create_file_opener` is
    // called; that method panics otherwise.
    pub(crate) batch_size: Option<usize>,
    // Set via `with_statistics`; `statistics()` panics if still `None`.
    pub(crate) projected_statistics: Option<Statistics>,
    // Arrow file schema set via `with_schema`; required for filter pushdown.
    pub(crate) arrow_file_schema: Option<SchemaRef>,
    // Optional adapter factories; defaults are substituted in
    // `create_file_opener` when unset.
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    // Vortex-native metrics; per-partition children are derived from this.
    pub(crate) metrics: VortexMetrics,
    // Only exists to satisfy `FileSource::metrics`; never populated.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    // Weak handles let readers be shared across openers without keeping
    // files alive once all strong references are dropped.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}
57
58impl VortexSource {
59 pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
60 Self {
61 file_cache,
62 metrics,
63 predicate: None,
64 batch_size: None,
65 projected_statistics: None,
66 arrow_file_schema: None,
67 schema_adapter_factory: None,
68 expr_adapter_factory: None,
69 _unused_df_metrics: Default::default(),
70 layout_readers: Arc::new(DashMap::default()),
71 }
72 }
73
74 pub fn with_expr_adapter_factory(
79 &self,
80 expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
81 ) -> Arc<dyn FileSource> {
82 let mut source = self.clone();
83 source.expr_adapter_factory = Some(expr_adapter_factory);
84 Arc::new(source)
85 }
86}
87
88impl FileSource for VortexSource {
89 fn create_file_opener(
90 &self,
91 object_store: Arc<dyn ObjectStore>,
92 base_config: &FileScanConfig,
93 partition: usize,
94 ) -> Arc<dyn FileOpener> {
95 let partition_metrics = self
96 .metrics
97 .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
98
99 let batch_size = self
100 .batch_size
101 .vortex_expect("batch_size must be supplied to VortexSource");
102
103 let expr_adapter = self
104 .expr_adapter_factory
105 .as_ref()
106 .or(base_config.expr_adapter_factory.as_ref());
107 let schema_adapter = self.schema_adapter_factory.as_ref();
108
109 let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
111 (Some(expr_adapter), Some(schema_adapter)) => {
112 (Some(expr_adapter.clone()), schema_adapter.clone())
113 }
114 (Some(expr_adapter), None) => (
115 Some(expr_adapter.clone()),
116 Arc::new(DefaultSchemaAdapterFactory) as _,
117 ),
118 (None, Some(schema_adapter)) => {
119 (None, schema_adapter.clone())
121 }
122 (None, None) => (
123 Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
124 Arc::new(DefaultSchemaAdapterFactory) as _,
125 ),
126 };
127
128 let projection = base_config.file_column_projection_indices().map(Arc::from);
129
130 let opener = VortexOpener {
131 object_store,
132 projection,
133 filter: self.predicate.clone(),
134 expr_adapter_factory,
135 schema_adapter_factory,
136 partition_fields: base_config.table_partition_cols.clone(),
137 logical_schema: base_config.file_schema.clone(),
138 file_cache: self.file_cache.clone(),
139 batch_size,
140 limit: base_config.limit,
141 metrics: partition_metrics,
142 layout_readers: self.layout_readers.clone(),
143 };
144
145 Arc::new(opener)
146 }
147
148 fn as_any(&self) -> &dyn Any {
149 self
150 }
151
152 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
153 let mut source = self.clone();
154 source.batch_size = Some(batch_size);
155 Arc::new(source)
156 }
157
158 fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
159 let mut source = self.clone();
160 source.arrow_file_schema = Some(schema);
161 Arc::new(source)
162 }
163
164 fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
165 Arc::new(self.clone())
166 }
167
168 fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
169 let mut source = self.clone();
170 source.projected_statistics = Some(statistics);
171 Arc::new(source)
172 }
173
174 fn metrics(&self) -> &ExecutionPlanMetricsSet {
175 &self._unused_df_metrics
176 }
177
178 fn statistics(&self) -> DFResult<Statistics> {
179 let statistics = self
180 .projected_statistics
181 .clone()
182 .vortex_expect("projected_statistics must be set");
183
184 if self.predicate.is_some() {
185 Ok(statistics.to_inexact())
186 } else {
187 Ok(statistics)
188 }
189 }
190
191 fn file_type(&self) -> &str {
192 VORTEX_FILE_EXTENSION
193 }
194
195 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
196 match t {
197 DisplayFormatType::Default | DisplayFormatType::Verbose => {
198 if let Some(ref predicate) = self.predicate {
199 write!(f, ", predicate: {predicate}")?;
200 }
201 }
202 DisplayFormatType::TreeRender => {
204 if let Some(ref predicate) = self.predicate {
205 writeln!(f, "predicate={predicate}")?;
206 };
207 }
208 }
209 Ok(())
210 }
211
212 fn try_pushdown_filters(
213 &self,
214 filters: Vec<Arc<dyn PhysicalExpr>>,
215 _config: &ConfigOptions,
216 ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
217 let Some(schema) = self.arrow_file_schema.as_ref() else {
218 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
219 vec![PushedDown::No; filters.len()],
220 ));
221 };
222
223 let mut source = self.clone();
224
225 let filters = filters
226 .into_iter()
227 .map(|expr| {
228 if can_be_pushed_down(&expr, schema) {
229 PushedDownPredicate::supported(expr)
230 } else {
231 PushedDownPredicate::unsupported(expr)
232 }
233 })
234 .collect::<Vec<_>>();
235
236 if filters
237 .iter()
238 .all(|p| matches!(p.discriminant, PushedDown::No))
239 {
240 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
241 vec![PushedDown::No; filters.len()],
242 ));
243 }
244
245 let supported = filters
246 .iter()
247 .filter_map(|p| match p.discriminant {
248 PushedDown::Yes => Some(&p.predicate),
249 PushedDown::No => None,
250 })
251 .cloned();
252
253 let predicate = match source.predicate {
254 Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
255 None => conjunction(supported),
256 };
257 source.predicate = Some(predicate);
258
259 let pushdown_propagation = if source.predicate.clone().is_some() {
260 FilterPushdownPropagation::with_parent_pushdown_result(
261 filters.iter().map(|f| f.discriminant).collect(),
262 )
263 .with_updated_node(Arc::new(source) as _)
264 } else {
265 FilterPushdownPropagation::with_parent_pushdown_result(vec![
266 PushedDown::No;
267 filters.len()
268 ])
269 };
270
271 Ok(pushdown_propagation)
272 }
273
274 fn with_schema_adapter_factory(
275 &self,
276 factory: Arc<dyn SchemaAdapterFactory>,
277 ) -> DFResult<Arc<dyn FileSource>> {
278 let mut source = self.clone();
279 source.schema_adapter_factory = Some(factory);
280 Ok(Arc::new(source))
281 }
282
283 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
284 None
285 }
286}