// vortex_datafusion/persistent/format.rs

use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use arrow_schema::Schema;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::ColumnStatistics;
use datafusion_common::DataFusionError;
use datafusion_common::GetExt;
use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigField;
use datafusion_common::config_namespace;
use datafusion_common::not_impl_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_format::FileFormatFactory;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use futures::FutureExt;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use futures::stream;
use itertools::Itertools;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use vortex::VortexSessionDefault;
use vortex::array::stats::StatsSet;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::dtype::PType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::expr::stats;
use vortex::expr::stats::Stat;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;

use super::cache::VortexFileCache;
use super::sink::VortexSink;
use super::source::VortexSource;
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

/// A DataFusion [`FileFormat`] for reading and writing Vortex files.
pub struct VortexFormat {
    session: VortexSession,
    file_cache: VortexFileCache,
    opts: VortexOptions,
}

impl Debug for VortexFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VortexFormat")
            .field("opts", &self.opts)
            .finish()
    }
}

config_namespace! {
    /// Options controlling the Vortex [`FileFormat`].
    pub struct VortexOptions {
        /// Size of the in-memory file footer cache, in MiB.
        pub footer_cache_size_mb: usize, default = 64
        /// Size of the in-memory segment cache, in MiB.
        pub segment_cache_size_mb: usize, default = 0
    }
}

impl Eq for VortexOptions {}
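
// How the options above are typically set from SQL: DataFusion hands the
// statement's `OPTIONS(...)` map to `VortexFormatFactory::create` below, which
// applies every `format.`-prefixed key via `ConfigField::set` and ignores the
// rest (un-prefixed keys are usually namespaced as format options upstream, as
// in the `configure_format_source` test at the bottom of this file):
//
//     CREATE EXTERNAL TABLE t (c1 VARCHAR NOT NULL) STORED AS vortex
//     LOCATION '/tmp/t' OPTIONS( segment_cache_size_mb '5' );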

/// A [`FileFormatFactory`] that produces [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    session: VortexSession,
    options: Option<VortexOptions>,
}

impl GetExt for VortexFormatFactory {
    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }
}

impl VortexFormatFactory {
    /// Creates a factory with a default [`VortexSession`] and no explicit options.
    #[expect(
        clippy::new_without_default,
        reason = "FormatFactory defines a `default` method, so a `Default` implementation would be confusing"
    )]
    pub fn new() -> Self {
        Self {
            session: VortexSession::default(),
            options: None,
        }
    }

    /// Creates a factory from the given session and options.
    pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self {
        Self {
            session,
            options: Some(options),
        }
    }

    /// Replaces this factory's options.
    pub fn with_options(mut self, options: VortexOptions) -> Self {
        self.options = Some(options);
        self
    }
}
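
// A minimal sketch of configuring a factory before registration; the field
// values here are illustrative, not recommendations:
//
//     let factory = VortexFormatFactory::new().with_options(VortexOptions {
//         footer_cache_size_mb: 128,
//         ..VortexOptions::default()
//     });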

impl FileFormatFactory for VortexFormatFactory {
    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
    fn create(
        &self,
        _state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> DFResult<Arc<dyn FileFormat>> {
        let mut opts = self.options.clone().unwrap_or_default();
        for (key, value) in format_options {
            if let Some(key) = key.strip_prefix("format.") {
                opts.set(key, value)?;
            } else {
                tracing::trace!("Ignoring option '{key}'");
            }
        }

        Ok(Arc::new(VortexFormat::new_with_options(
            self.session.clone(),
            opts,
        )))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(VortexFormat::new(self.session.clone()))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl VortexFormat {
    /// Creates a format with default [`VortexOptions`].
    pub fn new(session: VortexSession) -> Self {
        Self::new_with_options(session, VortexOptions::default())
    }

    /// Creates a format with the given options.
    pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self {
        Self {
            session: session.clone(),
            file_cache: VortexFileCache::new(
                opts.footer_cache_size_mb,
                opts.segment_cache_size_mb,
                session,
            ),
            opts,
        }
    }

    /// Returns the options this format was configured with.
    pub fn options(&self) -> &VortexOptions {
        &self.opts
    }
}
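
// A minimal sketch of constructing the format directly, without going through
// the factory (values illustrative):
//
//     let format = VortexFormat::new_with_options(
//         VortexSession::default(),
//         VortexOptions { segment_cache_size_mb: 8, ..VortexOptions::default() },
//     );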

#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file-level compression.".into(),
            )),
        }
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // `buffer_unordered` yields results in completion order; sort by object
        // path so the merged schema is deterministic.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            let struct_dtype = vxf
                .dtype()
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Without file-level statistics we can still report an exact row
            // count, but every column's statistics are unknown.
            let Some(file_stats) = vxf.file_stats() else {
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Align the file's per-column statistics with the table schema;
            // columns missing from this file get an empty stats set.
            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            // Sum uncompressed sizes across columns. The fold is seeded with an
            // exact zero; any column missing the stat contributes an inexact
            // zero, which `zip` propagates so the total is marked inexact
            // instead of silently under-counting.
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count =
                        stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
                        // Only a column known to be constant has exactly one
                        // distinct value; an exact `false` must not be mapped
                        // to a count of one.
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant
                                    .as_exact()
                                    .and_then(|constant| constant.then_some(Precision::Exact(1)))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let source = VortexSource::new(self.session.clone(), self.file_cache.clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.session.clone(),
            self.file_cache.clone(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn configure_format_source() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}' \
                OPTIONS( segment_cache_size_mb '5' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}
480}