// vortex_datafusion/persistent/format.rs
use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_expr::dml::InsertOp;
36use datafusion_physical_expr::LexRequirement;
37use datafusion_physical_plan::ExecutionPlan;
38use futures::FutureExt;
39use futures::StreamExt as _;
40use futures::TryStreamExt as _;
41use futures::stream;
42use object_store::ObjectMeta;
43use object_store::ObjectStore;
44use vortex::VortexSessionDefault;
45use vortex::dtype::DType;
46use vortex::dtype::Nullability;
47use vortex::dtype::PType;
48use vortex::dtype::arrow::FromArrowType;
49use vortex::error::VortexExpect;
50use vortex::error::VortexResult;
51use vortex::error::vortex_err;
52use vortex::expr::stats;
53use vortex::expr::stats::Stat;
54use vortex::file::EOF_SIZE;
55use vortex::file::MAX_POSTSCRIPT_SIZE;
56use vortex::file::OpenOptionsSessionExt;
57use vortex::file::VORTEX_FILE_EXTENSION;
58use vortex::io::object_store::ObjectStoreReadAt;
59use vortex::io::session::RuntimeSessionExt;
60use vortex::scalar::Scalar;
61use vortex::session::VortexSession;
62
63use super::cache::CachedVortexMetadata;
64use super::sink::VortexSink;
65use super::source::VortexSource;
66use crate::PrecisionExt as _;
67use crate::convert::TryToDataFusion;
68
/// Default size of the initial read issued to locate a file's footer: enough to cover the
/// largest possible postscript plus the fixed-size EOF marker in a single fetch.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
70
/// DataFusion `FileFormat` implementation for Vortex files.
///
/// Holds the shared [`VortexSession`] used to open and write files, together with the
/// table-level options applied to every scan.
pub struct VortexFormat {
    // Shared session handle; cheap to clone into spawned tasks.
    session: VortexSession,
    // Table-level tuning knobs (footer read size, projection pushdown, scan concurrency).
    opts: VortexTableOptions,
}
76
77impl Debug for VortexFormat {
78 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("VortexFormat")
80 .field("opts", &self.opts)
81 .finish()
82 }
83}
84
config_namespace! {
    /// Per-table configuration for reading Vortex files, settable via SQL `OPTIONS(..)`.
    pub struct VortexTableOptions {
        /// Bytes fetched by the initial footer read; default covers the maximum
        /// postscript plus the EOF marker so one request usually suffices.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES

        /// Whether column projections are pushed down into the Vortex scan.
        pub projection_pushdown: bool, default = false

        /// Optional cap on concurrent per-file scans; `None` uses the source default.
        pub scan_concurrency: Option<usize>, default = None
    }
}
111
// `config_namespace!` only derives `PartialEq`; all fields are also `Eq`, so the full
// equivalence relation is sound to assert here.
impl Eq for VortexTableOptions {}
113
/// Factory registered with DataFusion to construct [`VortexFormat`] instances,
/// e.g. when a table is declared with `STORED AS vortex`.
#[derive(Debug)]
pub struct VortexFormatFactory {
    // Session cloned into each created format.
    session: VortexSession,
    // Optional pre-configured defaults; `None` falls back to `VortexTableOptions::default()`.
    options: Option<VortexTableOptions>,
}
120
121impl GetExt for VortexFormatFactory {
122 fn get_ext(&self) -> String {
123 VORTEX_FILE_EXTENSION.to_string()
124 }
125}
126
127impl VortexFormatFactory {
128 #[expect(
130 clippy::new_without_default,
131 reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
132 )]
133 pub fn new() -> Self {
134 Self {
135 session: VortexSession::default(),
136 options: None,
137 }
138 }
139
140 pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
144 Self {
145 session,
146 options: Some(options),
147 }
148 }
149
150 pub fn with_options(mut self, options: VortexTableOptions) -> Self {
159 self.options = Some(options);
160 self
161 }
162}
163
164impl FileFormatFactory for VortexFormatFactory {
165 #[expect(clippy::disallowed_types, reason = "required by trait signature")]
166 fn create(
167 &self,
168 _state: &dyn Session,
169 format_options: &std::collections::HashMap<String, String>,
170 ) -> DFResult<Arc<dyn FileFormat>> {
171 let mut opts = self.options.clone().unwrap_or_default();
172 for (key, value) in format_options {
173 if let Some(key) = key.strip_prefix("format.") {
174 opts.set(key, value)?;
175 } else {
176 tracing::trace!("Ignoring options '{key}'");
177 }
178 }
179
180 Ok(Arc::new(VortexFormat::new_with_options(
181 self.session.clone(),
182 opts,
183 )))
184 }
185
186 fn default(&self) -> Arc<dyn FileFormat> {
187 Arc::new(VortexFormat::new(self.session.clone()))
188 }
189
190 fn as_any(&self) -> &dyn Any {
191 self
192 }
193}
194
195impl VortexFormat {
196 pub fn new(session: VortexSession) -> Self {
198 Self::new_with_options(session, VortexTableOptions::default())
199 }
200
201 pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
203 Self { session, opts }
204 }
205
206 pub fn options(&self) -> &VortexTableOptions {
208 &self.opts
209 }
210}
211
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // Vortex does not support file-level compression (compression is internal to the format).
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    // Only the uncompressed variant maps to a valid extension; anything else is an error.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers a merged Arrow schema across all `objects`.
    ///
    /// Each file's footer is read concurrently (bounded by `meta_fetch_concurrency`),
    /// consulting the runtime's file-metadata cache first and populating it on miss.
    /// Results are sorted by object location before merging so the outcome is
    /// deterministic regardless of task completion order.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|object| {
                // Clone everything the spawned task needs to own.
                let store = store.clone();
                let session = self.session.clone();
                let opts = self.opts.clone();
                let cache = file_metadata_cache.clone();

                SpawnedTask::spawn(async move {
                    // Fast path: schema derived from cached footer metadata, no I/O.
                    if let Some(cached) = cache.get(&object)
                        && let Some(cached_vortex) =
                            cached.as_any().downcast_ref::<CachedVortexMetadata>()
                    {
                        let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
                        return VortexResult::Ok((object.location, inferred_schema));
                    }

                    // Slow path: open the file and read its footer.
                    let reader = Arc::new(ObjectStoreReadAt::new(
                        store,
                        object.location.clone(),
                        session.handle(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await?;

                    // Populate the cache so later schema/stats requests skip the read.
                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    cache.put(&object, cached_metadata);

                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((object.location, inferred_schema))
                })
                // A failed spawn (not a failed task) is a bug, not a recoverable error.
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // `buffer_unordered` yields in completion order; sort by location for determinism.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Computes per-file statistics (row count, per-column min/max/null-count/size)
    /// for DataFusion's optimizer, using cached footer metadata when available.
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        // Own everything so the work can run on a spawned task.
        let object = object.clone();
        let store = store.clone();
        let session = self.session.clone();
        let opts = self.opts.clone();
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        SpawnedTask::spawn(async move {
            // Fast path: pull (dtype, file stats, row count) out of the metadata cache.
            let cached_metadata = file_metadata_cache.get(&object).and_then(|cached| {
                cached
                    .as_any()
                    .downcast_ref::<CachedVortexMetadata>()
                    .map(|m| {
                        (
                            m.footer().dtype().clone(),
                            m.footer().statistics().cloned(),
                            m.footer().row_count(),
                        )
                    })
            });

            let (dtype, file_stats, row_count) = match cached_metadata {
                Some(metadata) => metadata,
                None => {
                    // Cache miss: open the file to read its footer.
                    let reader = Arc::new(ObjectStoreReadAt::new(
                        store,
                        object.location.clone(),
                        session.handle(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to open Vortex file {}: {e}",
                                object.location
                            ))
                        })?;

                    // Store the freshly-read metadata for subsequent calls.
                    let cached = Arc::new(CachedVortexMetadata::new(&vxf));
                    file_metadata_cache.put(&object, cached);

                    (
                        vxf.dtype().clone(),
                        vxf.file_stats().cloned(),
                        vxf.row_count(),
                    )
                }
            };

            // Top-level dtype of a Vortex table file is always a struct of columns.
            let struct_dtype = dtype
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Without file stats we can still report an exact row count; everything
            // else is absent/default.
            let Some(file_stats) = file_stats else {
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(row_count)
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Accumulate total byte size as the sum of per-column uncompressed sizes;
            // precision degrades to inexact if any column's size is inexact.
            let mut sum_of_column_byte_sizes = stats::Precision::exact(0_usize);
            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());

            for field in table_schema.fields().iter() {
                // Columns present in the table schema but missing from this file
                // (schema evolution) get default statistics.
                let Some(col_idx) = struct_dtype.find(field.name()) else {
                    column_statistics.push(ColumnStatistics::default());
                    continue;
                };
                let (stats_set, stats_dtype) = file_stats.get(col_idx);

                let column_size = stats_set
                    .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                    .unwrap_or_else(|| stats::Precision::inexact(0_usize));
                sum_of_column_byte_sizes = sum_of_column_byte_sizes
                    .zip(column_size)
                    .map(|(acc, size)| acc + size);

                // Convert the Vortex min scalar into the Arrow-typed value DataFusion
                // expects; conversion failures simply drop the statistic (`.ok()`).
                let min = stats_set.get(Stat::Min).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Min
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                // Same conversion for the max scalar.
                let max = stats_set.get(Stat::Max).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Max
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());

                column_statistics.push(ColumnStatistics {
                    null_count: null_count.to_df(),
                    min_value: min.to_df(),
                    max_value: max.to_df(),
                    sum_value: Precision::Absent,
                    // A column known (exactly) to be constant has exactly one distinct value.
                    distinct_count: stats_set
                        .get_as::<bool>(Stat::IsConstant, &DType::Bool(Nullability::NonNullable))
                        .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1)))
                        .unwrap_or(Precision::Absent),
                    byte_size: column_size.to_df(),
                })
            }

            let total_byte_size = sum_of_column_byte_sizes.to_df();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(row_count)
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Builds the scan plan, wiring the runtime's file-metadata cache into the
    /// [`VortexSource`] carried by the scan config.
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let mut source = file_scan_config
            .file_source()
            .as_any()
            .downcast_ref::<VortexSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

        source = source
            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());

        let conf = FileScanConfigBuilder::from(file_scan_config)
            .with_source(Arc::new(source))
            .build();

        Ok(DataSourceExec::from_data_source(conf))
    }

    /// Builds the write plan; only `INSERT ... APPEND` is supported.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    /// Builds the per-table [`FileSource`], propagating the configured projection
    /// pushdown flag and optional scan-concurrency cap.
    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        let mut source = VortexSource::new(table_schema, self.session.clone())
            .with_projection_pushdown(self.opts.projection_pushdown);

        if let Some(scan_concurrency) = self.opts.scan_concurrency {
            source = source.with_scan_concurrency(scan_concurrency);
        }

        Arc::new(source) as _
    }
}
526
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `STORED AS vortex` should register the table with the session catalog.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let test_ctx = TestSessionContext::default();

        test_ctx
            .session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/'",
            )
            .await?;

        assert!(test_ctx.session.table_exist("my_tbl")?);

        Ok(())
    }

    /// Format options supplied via SQL `OPTIONS(..)` must be accepted by the factory.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let test_ctx = TestSessionContext::default();

        let df = test_ctx
            .session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );",
            )
            .await?;
        df.collect().await?;

        Ok(())
    }

    /// Options set programmatically must survive into the constructed format.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut table_opts = VortexTableOptions::default();
        table_opts
            .set("footer_initial_read_size_bytes", "12345")
            .unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), table_opts);
        assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
    }
}
578}