// vortex_datafusion/persistent/format.rs
use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
36use datafusion_expr::dml::InsertOp;
37use datafusion_physical_expr::LexRequirement;
38use datafusion_physical_plan::ExecutionPlan;
39use futures::FutureExt;
40use futures::StreamExt as _;
41use futures::TryStreamExt as _;
42use futures::stream;
43use object_store::ObjectMeta;
44use object_store::ObjectStore;
45use vortex::VortexSessionDefault;
46use vortex::dtype::DType;
47use vortex::dtype::Nullability;
48use vortex::dtype::PType;
49use vortex::dtype::arrow::FromArrowType;
50use vortex::error::VortexExpect;
51use vortex::error::VortexResult;
52use vortex::error::vortex_err;
53use vortex::expr::stats;
54use vortex::expr::stats::Stat;
55use vortex::file::EOF_SIZE;
56use vortex::file::MAX_POSTSCRIPT_SIZE;
57use vortex::file::OpenOptionsSessionExt;
58use vortex::file::VORTEX_FILE_EXTENSION;
59use vortex::io::object_store::ObjectStoreReadAt;
60use vortex::io::session::RuntimeSessionExt;
61use vortex::scalar::Scalar;
62use vortex::session::VortexSession;
63
64use super::cache::CachedVortexMetadata;
65use super::sink::VortexSink;
66use super::source::VortexSource;
67use crate::PrecisionExt as _;
68use crate::convert::TryToDataFusion;
69
/// Default for `footer_initial_read_size_bytes`: enough trailing bytes to cover
/// the largest possible postscript plus the fixed-size EOF marker of a Vortex file.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
71
/// Vortex implementation of DataFusion's [`FileFormat`].
pub struct VortexFormat {
    /// Vortex session used to open files and to build sources/sinks.
    session: VortexSession,
    /// Per-table options (footer read size, projection pushdown, scan concurrency).
    opts: VortexTableOptions,
}
77
78impl Debug for VortexFormat {
79 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("VortexFormat")
81 .field("opts", &self.opts)
82 .finish()
83 }
84}
85
config_namespace! {
    /// Options controlling how Vortex tables are read.
    pub struct VortexTableOptions {
        /// Number of trailing bytes fetched when first opening a file, used to
        /// locate and parse the footer.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES

        /// Whether column projections are pushed down into the Vortex source.
        pub projection_pushdown: bool, default = false

        /// Optional cap on concurrent scans; `None` leaves the source's default.
        pub scan_concurrency: Option<usize>, default = None
    }
}
112
// `PartialEq` comes from the `config_namespace!` expansion; all fields
// (`usize`, `bool`, `Option<usize>`) are totally ordered/equatable, so `Eq` is sound.
impl Eq for VortexTableOptions {}
114
/// Factory producing [`VortexFormat`] instances for DataFusion's format registry
/// (e.g. `STORED AS vortex`).
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session handed to every format this factory creates.
    session: VortexSession,
    /// Preset options; `None` means defaults (SQL-level `format.` options may
    /// still override in `create`).
    options: Option<VortexTableOptions>,
}
121
122impl GetExt for VortexFormatFactory {
123 fn get_ext(&self) -> String {
124 VORTEX_FILE_EXTENSION.to_string()
125 }
126}
127
128impl VortexFormatFactory {
129 #[expect(
131 clippy::new_without_default,
132 reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
133 )]
134 pub fn new() -> Self {
135 Self {
136 session: VortexSession::default(),
137 options: None,
138 }
139 }
140
141 pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
145 Self {
146 session,
147 options: Some(options),
148 }
149 }
150
151 pub fn with_options(mut self, options: VortexTableOptions) -> Self {
160 self.options = Some(options);
161 self
162 }
163}
164
165impl FileFormatFactory for VortexFormatFactory {
166 #[expect(clippy::disallowed_types, reason = "required by trait signature")]
167 fn create(
168 &self,
169 _state: &dyn Session,
170 format_options: &std::collections::HashMap<String, String>,
171 ) -> DFResult<Arc<dyn FileFormat>> {
172 let mut opts = self.options.clone().unwrap_or_default();
173 for (key, value) in format_options {
174 if let Some(key) = key.strip_prefix("format.") {
175 opts.set(key, value)?;
176 } else {
177 tracing::trace!("Ignoring options '{key}'");
178 }
179 }
180
181 Ok(Arc::new(VortexFormat::new_with_options(
182 self.session.clone(),
183 opts,
184 )))
185 }
186
187 fn default(&self) -> Arc<dyn FileFormat> {
188 Arc::new(VortexFormat::new(self.session.clone()))
189 }
190
191 fn as_any(&self) -> &dyn Any {
192 self
193 }
194}
195
196impl VortexFormat {
197 pub fn new(session: VortexSession) -> Self {
199 Self::new_with_options(session, VortexTableOptions::default())
200 }
201
202 pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
204 Self { session, opts }
205 }
206
207 pub fn options(&self) -> &VortexTableOptions {
209 &self.opts
210 }
211}
212
213#[async_trait]
214impl FileFormat for VortexFormat {
215 fn as_any(&self) -> &dyn Any {
216 self
217 }
218
219 fn compression_type(&self) -> Option<FileCompressionType> {
220 None
221 }
222
223 fn get_ext(&self) -> String {
224 VORTEX_FILE_EXTENSION.to_string()
225 }
226
227 fn get_ext_with_compression(
228 &self,
229 file_compression_type: &FileCompressionType,
230 ) -> DFResult<String> {
231 match file_compression_type.get_variant() {
232 CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
233 _ => Err(DataFusionError::Internal(
234 "Vortex does not support file level compression.".into(),
235 )),
236 }
237 }
238
239 async fn infer_schema(
240 &self,
241 state: &dyn Session,
242 store: &Arc<dyn ObjectStore>,
243 objects: &[ObjectMeta],
244 ) -> DFResult<SchemaRef> {
245 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
246
247 let mut file_schemas = stream::iter(objects.iter().cloned())
248 .map(|object| {
249 let store = store.clone();
250 let session = self.session.clone();
251 let opts = self.opts.clone();
252 let cache = file_metadata_cache.clone();
253
254 SpawnedTask::spawn(async move {
255 if let Some(entry) = cache.get(&object.location)
257 && entry.is_valid_for(&object)
258 && let Some(cached_vortex) = entry
259 .file_metadata
260 .as_any()
261 .downcast_ref::<CachedVortexMetadata>()
262 {
263 let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
264 return VortexResult::Ok((object.location, inferred_schema));
265 }
266
267 let reader = Arc::new(ObjectStoreReadAt::new(
269 store,
270 object.location.clone(),
271 session.handle(),
272 ));
273
274 let vxf = session
275 .open_options()
276 .with_initial_read_size(opts.footer_initial_read_size_bytes)
277 .with_file_size(object.size)
278 .open_read(reader)
279 .await?;
280
281 let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
283 let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
284 cache.put(&object.location, entry);
285
286 let inferred_schema = vxf.dtype().to_arrow_schema()?;
287 VortexResult::Ok((object.location, inferred_schema))
288 })
289 .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
290 })
291 .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
292 .try_collect::<Vec<_>>()
293 .await
294 .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;
295
296 file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
298 let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);
299
300 Ok(Arc::new(Schema::try_merge(file_schemas)?))
301 }
302
303 #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
304 async fn infer_stats(
305 &self,
306 state: &dyn Session,
307 store: &Arc<dyn ObjectStore>,
308 table_schema: SchemaRef,
309 object: &ObjectMeta,
310 ) -> DFResult<Statistics> {
311 let object = object.clone();
312 let store = store.clone();
313 let session = self.session.clone();
314 let opts = self.opts.clone();
315 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
316
317 SpawnedTask::spawn(async move {
318 let cached_metadata = file_metadata_cache
320 .get(&object.location)
321 .filter(|entry| entry.is_valid_for(&object))
322 .and_then(|entry| {
323 entry
324 .file_metadata
325 .as_any()
326 .downcast_ref::<CachedVortexMetadata>()
327 .map(|m| {
328 (
329 m.footer().dtype().clone(),
330 m.footer().statistics().cloned(),
331 m.footer().row_count(),
332 )
333 })
334 });
335
336 let (dtype, file_stats, row_count) = match cached_metadata {
337 Some(metadata) => metadata,
338 None => {
339 let reader = Arc::new(ObjectStoreReadAt::new(
341 store,
342 object.location.clone(),
343 session.handle(),
344 ));
345
346 let vxf = session
347 .open_options()
348 .with_initial_read_size(opts.footer_initial_read_size_bytes)
349 .with_file_size(object.size)
350 .open_read(reader)
351 .await
352 .map_err(|e| {
353 DataFusionError::Execution(format!(
354 "Failed to open Vortex file {}: {e}",
355 object.location
356 ))
357 })?;
358
359 let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
361 let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
362 file_metadata_cache.put(&object.location, entry);
363
364 (
365 vxf.dtype().clone(),
366 vxf.file_stats().cloned(),
367 vxf.row_count(),
368 )
369 }
370 };
371
372 let struct_dtype = dtype
373 .as_struct_fields_opt()
374 .vortex_expect("dtype is not a struct");
375
376 let Some(file_stats) = file_stats else {
378 return Ok(Statistics {
380 num_rows: Precision::Exact(
381 usize::try_from(row_count)
382 .map_err(|_| vortex_err!("Row count overflow"))
383 .vortex_expect("Row count overflow"),
384 ),
385 total_byte_size: Precision::Absent,
386 column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
387 });
388 };
389
390 let mut sum_of_column_byte_sizes = stats::Precision::exact(0_usize);
391 let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
392
393 for field in table_schema.fields().iter() {
394 let Some(col_idx) = struct_dtype.find(field.name()) else {
397 column_statistics.push(ColumnStatistics::default());
399 continue;
400 };
401 let (stats_set, stats_dtype) = file_stats.get(col_idx);
402
403 let column_size = stats_set
405 .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
406 .unwrap_or_else(|| stats::Precision::inexact(0_usize));
407 sum_of_column_byte_sizes = sum_of_column_byte_sizes
408 .zip(column_size)
409 .map(|(acc, size)| acc + size);
410
411 let min = stats_set.get(Stat::Min).and_then(|pstat_val| {
415 pstat_val
416 .map(|stat_val| {
417 Scalar::try_new(
421 Stat::Min
422 .dtype(stats_dtype)
423 .vortex_expect("must have a valid dtype"),
424 Some(stat_val),
425 )
426 .vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
427 .cast(&DType::from_arrow(field.as_ref()))
428 .vortex_expect("Unable to cast to target type that DataFusion wants")
429 .try_to_df()
430 .ok()
431 })
432 .transpose()
433 });
434
435 let max = stats_set.get(Stat::Max).and_then(|pstat_val| {
437 pstat_val
438 .map(|stat_val| {
439 Scalar::try_new(
440 Stat::Max
441 .dtype(stats_dtype)
442 .vortex_expect("must have a valid dtype"),
443 Some(stat_val),
444 )
445 .vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
446 .cast(&DType::from_arrow(field.as_ref()))
447 .vortex_expect("Unable to cast to target type that DataFusion wants")
448 .try_to_df()
449 .ok()
450 })
451 .transpose()
452 });
453
454 let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
455
456 column_statistics.push(ColumnStatistics {
457 null_count: null_count.to_df(),
458 min_value: min.to_df(),
459 max_value: max.to_df(),
460 sum_value: Precision::Absent,
461 distinct_count: stats_set
462 .get_as::<bool>(Stat::IsConstant, &DType::Bool(Nullability::NonNullable))
463 .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1)))
464 .unwrap_or(Precision::Absent),
465 byte_size: column_size.to_df(),
467 })
468 }
469
470 let total_byte_size = sum_of_column_byte_sizes.to_df();
471
472 Ok(Statistics {
473 num_rows: Precision::Exact(
474 usize::try_from(row_count)
475 .map_err(|_| vortex_err!("Row count overflow"))
476 .vortex_expect("Row count overflow"),
477 ),
478 total_byte_size,
479 column_statistics,
480 })
481 })
482 .await
483 .vortex_expect("Failed to spawn infer_stats")
484 }
485
486 async fn create_physical_plan(
487 &self,
488 state: &dyn Session,
489 file_scan_config: FileScanConfig,
490 ) -> DFResult<Arc<dyn ExecutionPlan>> {
491 let mut source = file_scan_config
492 .file_source()
493 .as_any()
494 .downcast_ref::<VortexSource>()
495 .cloned()
496 .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
497
498 source = source
499 .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
500
501 let conf = FileScanConfigBuilder::from(file_scan_config)
502 .with_source(Arc::new(source))
503 .build();
504
505 Ok(DataSourceExec::from_data_source(conf))
506 }
507
508 async fn create_writer_physical_plan(
509 &self,
510 input: Arc<dyn ExecutionPlan>,
511 _state: &dyn Session,
512 conf: FileSinkConfig,
513 order_requirements: Option<LexRequirement>,
514 ) -> DFResult<Arc<dyn ExecutionPlan>> {
515 if conf.insert_op != InsertOp::Append {
516 return not_impl_err!("Overwrites are not implemented yet for Vortex");
517 }
518
519 let schema = conf.output_schema().clone();
520 let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
521
522 Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
523 }
524
525 fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
526 let mut source = VortexSource::new(table_schema, self.session.clone())
527 .with_projection_pushdown(self.opts.projection_pushdown);
528
529 if let Some(scan_concurrency) = self.opts.scan_concurrency {
530 source = source.with_scan_concurrency(scan_concurrency);
531 }
532
533 Arc::new(source) as _
534 }
535}
536
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `STORED AS vortex` DDL should register the table with the session catalog.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let ddl = "CREATE EXTERNAL TABLE my_tbl \
             (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
             STORED AS vortex \
             LOCATION 'table/'";
        ctx.session.sql(ddl).await?;

        assert!(ctx.session.table_exist("my_tbl")?);

        Ok(())
    }

    /// Table-level OPTIONS(...) should be accepted and plumbed through creation.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let ddl = "CREATE EXTERNAL TABLE my_tbl \
             (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
             STORED AS vortex \
             LOCATION 'table/' \
             OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );";
        let df = ctx.session.sql(ddl).await?;
        df.collect().await?;

        Ok(())
    }

    /// Setting an option by key must be observable through `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut opts = VortexTableOptions::default();
        opts.set("footer_initial_read_size_bytes", "12345").unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), opts);
        assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
    }
}