// vortex_datafusion/persistent/format.rs

use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
36use datafusion_expr::dml::InsertOp;
37use datafusion_physical_expr::LexRequirement;
38use datafusion_physical_plan::ExecutionPlan;
39use futures::FutureExt;
40use futures::StreamExt as _;
41use futures::TryStreamExt as _;
42use futures::stream;
43use object_store::ObjectMeta;
44use object_store::ObjectStore;
45use vortex::VortexSessionDefault;
46use vortex::array::memory::MemorySessionExt;
47use vortex::dtype::DType;
48use vortex::dtype::Nullability;
49use vortex::dtype::PType;
50use vortex::dtype::arrow::FromArrowType;
51use vortex::error::VortexExpect;
52use vortex::error::VortexResult;
53use vortex::error::vortex_err;
54use vortex::expr::stats;
55use vortex::expr::stats::Stat;
56use vortex::file::EOF_SIZE;
57use vortex::file::MAX_POSTSCRIPT_SIZE;
58use vortex::file::OpenOptionsSessionExt;
59use vortex::file::VORTEX_FILE_EXTENSION;
60use vortex::io::object_store::ObjectStoreReadAt;
61use vortex::io::session::RuntimeSessionExt;
62use vortex::scalar::Scalar;
63use vortex::session::VortexSession;
64
65use super::cache::CachedVortexMetadata;
66use super::sink::VortexSink;
67use super::source::VortexSource;
68use crate::PrecisionExt as _;
69use crate::convert::TryToDataFusion;
70
/// Default number of trailing bytes fetched when opening a file: enough to
/// cover the largest possible postscript plus the fixed-size EOF marker, so
/// the footer can usually be read in a single request.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
72
/// DataFusion [`FileFormat`] implementation for Vortex files.
pub struct VortexFormat {
    // Vortex session used to open files and drive I/O; not shown in `Debug`.
    session: VortexSession,
    // Table-level options (footer read size, projection pushdown, scan concurrency).
    opts: VortexTableOptions,
}
78
79impl Debug for VortexFormat {
80 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("VortexFormat")
82 .field("opts", &self.opts)
83 .finish()
84 }
85}
86
config_namespace! {
    /// Table-level configuration for reading Vortex files.
    pub struct VortexTableOptions {
        /// Number of trailing bytes read when opening a file's footer; the
        /// default covers the maximum postscript plus the EOF marker.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES

        /// Whether column projections are pushed down into the file scan.
        pub projection_pushdown: bool, default = false

        /// Optional override for the scan concurrency; `None` keeps the
        /// source's default.
        pub scan_concurrency: Option<usize>, default = None
    }
}
113
// Marker impl promoting the equality to full equivalence.
// NOTE(review): assumes `config_namespace!` emits a `PartialEq` impl for this
// struct — confirm against the macro's expansion.
impl Eq for VortexTableOptions {}
115
/// Factory that creates [`VortexFormat`] instances for DataFusion's file
/// format registry (e.g. `STORED AS vortex`).
#[derive(Debug)]
pub struct VortexFormatFactory {
    // Session cloned into every format this factory creates.
    session: VortexSession,
    // Optional preset options; `None` means defaults, possibly overridden by
    // per-table `OPTIONS(...)` in `create`.
    options: Option<VortexTableOptions>,
}
122
123impl GetExt for VortexFormatFactory {
124 fn get_ext(&self) -> String {
125 VORTEX_FILE_EXTENSION.to_string()
126 }
127}
128
129impl VortexFormatFactory {
130 #[expect(
132 clippy::new_without_default,
133 reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
134 )]
135 pub fn new() -> Self {
136 Self {
137 session: VortexSession::default(),
138 options: None,
139 }
140 }
141
142 pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
146 Self {
147 session,
148 options: Some(options),
149 }
150 }
151
152 pub fn with_options(mut self, options: VortexTableOptions) -> Self {
161 self.options = Some(options);
162 self
163 }
164}
165
166impl FileFormatFactory for VortexFormatFactory {
167 #[expect(clippy::disallowed_types, reason = "required by trait signature")]
168 fn create(
169 &self,
170 _state: &dyn Session,
171 format_options: &std::collections::HashMap<String, String>,
172 ) -> DFResult<Arc<dyn FileFormat>> {
173 let mut opts = self.options.clone().unwrap_or_default();
174 for (key, value) in format_options {
175 if let Some(key) = key.strip_prefix("format.") {
176 opts.set(key, value)?;
177 } else {
178 tracing::trace!("Ignoring options '{key}'");
179 }
180 }
181
182 Ok(Arc::new(VortexFormat::new_with_options(
183 self.session.clone(),
184 opts,
185 )))
186 }
187
188 fn default(&self) -> Arc<dyn FileFormat> {
189 Arc::new(VortexFormat::new(self.session.clone()))
190 }
191
192 fn as_any(&self) -> &dyn Any {
193 self
194 }
195}
196
197impl VortexFormat {
198 pub fn new(session: VortexSession) -> Self {
200 Self::new_with_options(session, VortexTableOptions::default())
201 }
202
203 pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
205 Self { session, opts }
206 }
207
208 pub fn options(&self) -> &VortexTableOptions {
210 &self.opts
211 }
212}
213
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // No whole-file compression scheme to report; Vortex handles encoding
    // internally.
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    // Only the uncompressed variant is valid; any external compression request
    // is rejected.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers the table schema by reading (or reusing cached) footers for all
    /// `objects` and merging the per-file Arrow schemas.
    ///
    /// Footer reads run as spawned tasks, bounded by the session's
    /// `meta_fetch_concurrency`; freshly read footers are written back into the
    /// runtime's file-metadata cache.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|object| {
                let store = Arc::clone(store);
                let session = self.session.clone();
                let opts = self.opts.clone();
                let cache = Arc::clone(&file_metadata_cache);

                SpawnedTask::spawn(async move {
                    // Fast path: a cache entry that is still valid for this
                    // object and holds Vortex metadata skips the read entirely.
                    if let Some(entry) = cache.get(&object.location)
                        && entry.is_valid_for(&object)
                        && let Some(cached_vortex) = entry
                            .file_metadata
                            .as_any()
                            .downcast_ref::<CachedVortexMetadata>()
                    {
                        let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
                        return VortexResult::Ok((object.location, inferred_schema));
                    }

                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
                        store,
                        object.location.clone(),
                        session.handle(),
                        session.allocator(),
                    ));

                    // Open the file; the configured initial read size lets the
                    // footer usually be fetched in one request.
                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await?;

                    // Populate the cache so later infer_schema/infer_stats
                    // calls for this object take the fast path.
                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
                    cache.put(&object.location, entry);

                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((object.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // `buffer_unordered` yields in completion order; sort by location so
        // the merged schema does not depend on I/O timing.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Derives DataFusion [`Statistics`] for one file from the stats recorded
    /// in its Vortex footer, consulting the file-metadata cache first.
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = Arc::clone(store);
        let session = self.session.clone();
        let opts = self.opts.clone();
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        SpawnedTask::spawn(async move {
            // Cache lookup: a valid entry yields the file's dtype, per-column
            // stats, and row count without touching storage.
            let cached_metadata = file_metadata_cache
                .get(&object.location)
                .filter(|entry| entry.is_valid_for(&object))
                .and_then(|entry| {
                    entry
                        .file_metadata
                        .as_any()
                        .downcast_ref::<CachedVortexMetadata>()
                        .map(|m| {
                            (
                                m.footer().dtype().clone(),
                                m.footer().statistics().cloned(),
                                m.footer().row_count(),
                            )
                        })
                });

            let (dtype, file_stats, row_count) = match cached_metadata {
                Some(metadata) => metadata,
                None => {
                    // Cache miss: open the file to read its footer, then cache
                    // the metadata for subsequent calls.
                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
                        store,
                        object.location.clone(),
                        session.handle(),
                        session.allocator(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to open Vortex file {}: {e}",
                                object.location
                            ))
                        })?;

                    let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
                    file_metadata_cache.put(&object.location, entry);

                    (
                        vxf.dtype().clone(),
                        vxf.file_stats().cloned(),
                        vxf.row_count(),
                    )
                }
            };

            let struct_dtype = dtype
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Without footer stats we can still report an exact row count;
            // everything else is defaulted.
            // NOTE(review): this branch sizes `column_statistics` by the
            // *file's* field count, while the populated path below uses
            // `table_schema`'s length — confirm these always agree for callers.
            let Some(file_stats) = file_stats else {
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(row_count)
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Fold per-column uncompressed sizes into a total, carrying
            // exact/inexact precision through the sum.
            let mut sum_of_column_byte_sizes = stats::Precision::exact(0_usize);
            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());

            for field in table_schema.fields().iter() {
                // Table-schema columns that this file lacks get default
                // (absent) statistics.
                let Some(col_idx) = struct_dtype.find(field.name()) else {
                    column_statistics.push(ColumnStatistics::default());
                    continue;
                };
                let (stats_set, stats_dtype) = file_stats.get(col_idx);

                // A missing size stat degrades the running total to inexact
                // rather than dropping it.
                let column_size = stats_set
                    .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                    .unwrap_or_else(|| stats::Precision::inexact(0_usize));
                sum_of_column_byte_sizes = sum_of_column_byte_sizes
                    .zip(column_size)
                    .map(|(acc, size)| acc + size);

                // Convert the Vortex min stat into a DataFusion scalar of the
                // field's Arrow type; conversion failures silently drop the
                // stat via `ok()`/`transpose`.
                let min = stats_set.get(Stat::Min).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Min
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                // Same conversion for the max stat.
                let max = stats_set.get(Stat::Max).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Max
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());

                column_statistics.push(ColumnStatistics {
                    null_count: null_count.to_df(),
                    min_value: min.to_df(),
                    max_value: max.to_df(),
                    sum_value: Precision::Absent,
                    // A constant column has exactly one distinct value.
                    // NOTE(review): `as_exact().map(|_| …)` discards the
                    // boolean, so a column *exactly known not constant* would
                    // also report a distinct count of 1 — confirm `IsConstant`
                    // is only recorded when true, or gate on the value here.
                    distinct_count: stats_set
                        .get_as::<bool>(Stat::IsConstant, &DType::Bool(Nullability::NonNullable))
                        .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1)))
                        .unwrap_or(Precision::Absent),
                    byte_size: column_size.to_df(),
                })
            }

            let total_byte_size = sum_of_column_byte_sizes.to_df();

            Ok(Statistics {
                // Panics (via vortex_expect) if the row count exceeds usize —
                // only reachable where usize is narrower than u64.
                num_rows: Precision::Exact(
                    usize::try_from(row_count)
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Builds the scan plan: clones the configured [`VortexSource`], attaches
    /// the runtime's file-metadata cache, and wraps it in a [`DataSourceExec`].
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let mut source = file_scan_config
            .file_source()
            .as_any()
            .downcast_ref::<VortexSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

        source = source
            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());

        let conf = FileScanConfigBuilder::from(file_scan_config)
            .with_source(Arc::new(source))
            .build();

        Ok(DataSourceExec::from_data_source(conf))
    }

    /// Builds the write plan. Only appends are supported; overwrite semantics
    /// are rejected up front.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = Arc::clone(conf.output_schema());
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    /// Creates the [`VortexSource`] for scans, applying the configured
    /// projection-pushdown flag and the optional scan-concurrency override.
    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        let mut source = VortexSource::new(table_schema, self.session.clone())
            .with_projection_pushdown(self.opts.projection_pushdown);

        if let Some(scan_concurrency) = self.opts.scan_concurrency {
            source = source.with_scan_concurrency(scan_concurrency);
        }

        Arc::new(source) as _
    }
}
539
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    // `STORED AS vortex` should resolve to this format and register the table
    // in the session catalog.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/'",
            )
            .await?;

        assert!(ctx.session.table_exist("my_tbl")?);

        Ok(())
    }

    // Options supplied via `OPTIONS(...)` must round-trip through the format
    // factory's `create` without error.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );",
            )
            .await?
            .collect()
            .await?;

        Ok(())
    }

    // `footer_initial_read_size_bytes` set through the options API must be
    // visible on the constructed format.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut opts = VortexTableOptions::default();
        opts.set("footer_initial_read_size_bytes", "12345").unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), opts);
        assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
    }
}
591}