// vortex_datafusion/persistent/source.rs
use std::any::Any;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use std::sync::Weak;
8
9use datafusion_common::Result as DFResult;
10use datafusion_common::config::ConfigOptions;
11use datafusion_datasource::TableSchema;
12use datafusion_datasource::file::FileSource;
13use datafusion_datasource::file_scan_config::FileScanConfig;
14use datafusion_datasource::file_stream::FileOpener;
15use datafusion_execution::cache::cache_manager::FileMetadataCache;
16use datafusion_physical_expr::PhysicalExprRef;
17use datafusion_physical_expr::conjunction;
18use datafusion_physical_expr::projection::ProjectionExprs;
19use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
20use datafusion_physical_expr_common::physical_expr::fmt_sql;
21use datafusion_physical_plan::DisplayFormatType;
22use datafusion_physical_plan::PhysicalExpr;
23use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
24use datafusion_physical_plan::filter_pushdown::PushedDown;
25use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
26use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
27use object_store::ObjectStore;
28use object_store::path::Path;
29use vortex::error::VortexExpect;
30use vortex::file::VORTEX_FILE_EXTENSION;
31use vortex::layout::LayoutReader;
32use vortex::metrics::DefaultMetricsRegistry;
33use vortex::metrics::MetricsRegistry;
34use vortex::session::VortexSession;
35use vortex_utils::aliases::dash_map::DashMap;
36
37use super::opener::VortexOpener;
38use crate::VortexTableOptions;
39use crate::convert::exprs::DefaultExpressionConvertor;
40use crate::convert::exprs::ExpressionConvertor;
41use crate::persistent::reader::DefaultVortexReaderFactory;
42use crate::persistent::reader::VortexReaderFactory;
43
/// DataFusion [`FileSource`] for reading Vortex files.
///
/// Created per table scan and cheaply cloned by the builder-style setters and
/// by DataFusion's pushdown machinery (`try_pushdown_filters`,
/// `try_pushdown_projection`, `with_batch_size` each clone-and-modify).
#[derive(Clone)]
pub struct VortexSource {
    /// Vortex session handed to each file opener.
    pub(crate) session: VortexSession,
    /// Table schema (used both for the opener and for deciding filter pushdown
    /// against the file schema).
    pub(crate) table_schema: TableSchema,
    /// Output projection; starts as the identity over all table columns and is
    /// merged with pushed-down projections in `try_pushdown_projection`.
    pub(crate) projection: ProjectionExprs,
    /// Conjunction of ALL filters ever pushed down (supported or not); the
    /// opener uses it as a file-pruning predicate.
    pub(crate) full_predicate: Option<PhysicalExprRef>,
    /// Conjunction of only the filters the expression convertor reported as
    /// pushable into the Vortex scan itself.
    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
    /// Set by DataFusion via `with_batch_size`; `create_file_opener` expects
    /// it to be present and panics otherwise.
    pub(crate) batch_size: Option<usize>,
    // Only exists to satisfy `FileSource::metrics`; nothing is recorded here —
    // Vortex-side metrics go through `vx_metrics_registry` instead.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    // Per-file-path cache of layout readers, shared with every opener this
    // source creates. Weak refs so the cache does not keep readers alive.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    // Decides which physical exprs can be pushed down (see
    // `try_pushdown_filters`); replaceable via `with_expression_convertor`.
    expression_convertor: Arc<dyn ExpressionConvertor>,
    /// Optional custom reader factory; `create_file_opener` falls back to
    /// `DefaultVortexReaderFactory` over the scan's object store.
    pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
    // Vortex metrics registry, exposed via `metrics_registry()`.
    vx_metrics_registry: Arc<dyn MetricsRegistry>,
    // Optional DataFusion file-metadata cache forwarded to the opener.
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    // Table-level knobs (projection pushdown flag, scan concurrency).
    options: VortexTableOptions,
}
71
72impl VortexSource {
73 pub fn new(table_schema: TableSchema, session: VortexSession) -> Self {
78 let full_schema = table_schema.table_schema();
79 let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
80 let projection = ProjectionExprs::from_indices(&indices, full_schema);
81
82 Self {
83 session,
84 table_schema,
85 projection,
86 full_predicate: None,
87 vortex_predicate: None,
88 batch_size: None,
89 _unused_df_metrics: Default::default(),
90 layout_readers: Arc::new(DashMap::default()),
91 expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
92 vortex_reader_factory: None,
93 vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
94 file_metadata_cache: None,
95 options: VortexTableOptions::default(),
96 }
97 }
98
99 pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
101 self.options.projection_pushdown = enabled;
102 self
103 }
104
105 pub fn with_expression_convertor(
107 mut self,
108 expr_convertor: Arc<dyn ExpressionConvertor>,
109 ) -> Self {
110 self.expression_convertor = expr_convertor;
111 self
112 }
113
114 pub fn with_vortex_reader_factory(
118 mut self,
119 vortex_reader_factory: Arc<dyn VortexReaderFactory>,
120 ) -> Self {
121 self.vortex_reader_factory = Some(vortex_reader_factory);
122 self
123 }
124
125 pub fn metrics_registry(&self) -> &Arc<dyn MetricsRegistry> {
127 &self.vx_metrics_registry
128 }
129
130 pub fn with_file_metadata_cache(
132 mut self,
133 file_metadata_cache: Arc<dyn FileMetadataCache>,
134 ) -> Self {
135 self.file_metadata_cache = Some(file_metadata_cache);
136 self
137 }
138
139 pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
141 self.options.scan_concurrency = Some(scan_concurrency);
142 self
143 }
144
145 pub fn options(&self) -> &VortexTableOptions {
147 &self.options
148 }
149
150 pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
152 self.options = opts;
153 self
154 }
155}
156
157impl FileSource for VortexSource {
158 fn create_file_opener(
159 &self,
160 object_store: Arc<dyn ObjectStore>,
161 base_config: &FileScanConfig,
162 partition: usize,
163 ) -> DFResult<Arc<dyn FileOpener>> {
164 let batch_size = self
165 .batch_size
166 .vortex_expect("batch_size must be supplied to VortexSource");
167
168 let expr_adapter_factory = base_config
169 .expr_adapter_factory
170 .clone()
171 .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
172
173 let vortex_reader_factory = self
174 .vortex_reader_factory
175 .clone()
176 .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
177
178 let opener = VortexOpener {
179 partition,
180 session: self.session.clone(),
181 vortex_reader_factory,
182 projection: self.projection.clone(),
183 filter: self.vortex_predicate.clone(),
184 file_pruning_predicate: self.full_predicate.clone(),
185 expr_adapter_factory,
186 table_schema: self.table_schema.clone(),
187 batch_size,
188 limit: base_config.limit.map(|l| l as u64),
189 metrics_registry: self.vx_metrics_registry.clone(),
190 layout_readers: self.layout_readers.clone(),
191 has_output_ordering: !base_config.output_ordering.is_empty(),
192 expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
193 file_metadata_cache: self.file_metadata_cache.clone(),
194 projection_pushdown: self.options.projection_pushdown,
195 scan_concurrency: self.options.scan_concurrency,
196 };
197
198 Ok(Arc::new(opener))
199 }
200
201 fn as_any(&self) -> &dyn Any {
202 self
203 }
204
205 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
206 let mut source = self.clone();
207 source.batch_size = Some(batch_size);
208 Arc::new(source)
209 }
210
211 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
212 self.vortex_predicate.clone()
213 }
214
215 fn metrics(&self) -> &ExecutionPlanMetricsSet {
216 &self._unused_df_metrics
217 }
218
219 fn file_type(&self) -> &str {
220 VORTEX_FILE_EXTENSION
221 }
222
223 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
224 match t {
225 DisplayFormatType::Default | DisplayFormatType::Verbose => {
226 if let Some(ref predicate) = self.vortex_predicate {
227 write!(f, ", predicate: {predicate}")?;
228 }
229 }
230 DisplayFormatType::TreeRender => {
232 if let Some(ref predicate) = self.vortex_predicate {
233 writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
234 };
235 }
236 }
237 Ok(())
238 }
239
240 fn supports_repartitioning(&self) -> bool {
241 true
242 }
243
244 fn try_pushdown_filters(
245 &self,
246 filters: Vec<Arc<dyn PhysicalExpr>>,
247 _config: &ConfigOptions,
248 ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
249 if filters.is_empty() {
250 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
251 vec![],
252 ));
253 }
254
255 let mut source = self.clone();
256
257 source.full_predicate = match source.full_predicate {
260 Some(predicate) => Some(conjunction(
261 std::iter::once(predicate).chain(filters.clone()),
262 )),
263 None => Some(conjunction(filters.clone())),
264 };
265
266 let supported_filters = filters
267 .into_iter()
268 .map(|expr| {
269 if self
270 .expression_convertor
271 .can_be_pushed_down(&expr, self.table_schema.file_schema())
272 {
273 PushedDownPredicate::supported(expr)
274 } else {
275 PushedDownPredicate::unsupported(expr)
276 }
277 })
278 .collect::<Vec<_>>();
279
280 if supported_filters
281 .iter()
282 .all(|p| matches!(p.discriminant, PushedDown::No))
283 {
284 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
285 vec![PushedDown::No; supported_filters.len()],
286 )
287 .with_updated_node(Arc::new(source) as _));
288 }
289
290 let supported = supported_filters
291 .iter()
292 .filter_map(|p| match p.discriminant {
293 PushedDown::Yes => Some(&p.predicate),
294 PushedDown::No => None,
295 })
296 .cloned();
297
298 let predicate = match source.vortex_predicate {
299 Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
300 None => conjunction(supported),
301 };
302
303 tracing::debug!(%predicate, "Saving predicate");
304
305 source.vortex_predicate = Some(predicate);
306
307 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
308 supported_filters.iter().map(|f| f.discriminant).collect(),
309 )
310 .with_updated_node(Arc::new(source) as _))
311 }
312
313 fn try_pushdown_projection(
314 &self,
315 projection: &ProjectionExprs,
316 ) -> DFResult<Option<Arc<dyn FileSource>>> {
317 let mut source = self.clone();
318 source.projection = self.projection.try_merge(projection)?;
319 Ok(Some(Arc::new(source)))
320 }
321
322 fn projection(&self) -> Option<&ProjectionExprs> {
323 Some(&self.projection)
324 }
325
326 fn table_schema(&self) -> &TableSchema {
327 &self.table_schema
328 }
329}