// vortex_datafusion/persistent/format.rs

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::config::ConfigField;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics, config_namespace,
    not_impl_err,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
use itertools::Itertools;
use object_store::{ObjectMeta, ObjectStore};
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::{DType, Nullability, PType};
use vortex::error::{VortexExpect, VortexResult, vortex_err};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::metrics::VortexMetrics;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
use vortex::stats;
use vortex::stats::{Stat, StatsSet};

use super::cache::VortexFileCache;
use super::sink::VortexSink;
use super::source::VortexSource;
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

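/// DataFusion [`FileFormat`] implementation for reading and writing Vortex
/// files.
///
/// A minimal construction sketch; the option value is illustrative, and it
/// assumes the `config_namespace!`-generated fields and `Default` impl of
/// [`VortexOptions`]:
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use vortex::session::VortexSession;
///
/// // Assumed for illustration: a 128 MB footer cache instead of the 64 MB default.
/// let mut opts = VortexOptions::default();
/// opts.footer_cache_size_mb = 128;
/// let format = VortexFormat::new_with_options(Arc::new(VortexSession::default()), opts);
/// ```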
pub struct VortexFormat {
    session: Arc<VortexSession>,
    file_cache: VortexFileCache,
    opts: VortexOptions,
}

impl Debug for VortexFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VortexFormat")
            .field("opts", &self.opts)
            .finish()
    }
}

config_namespace! {
    /// Options for the Vortex file format.
    pub struct VortexOptions {
        /// Size of the footer cache, in megabytes.
        pub footer_cache_size_mb: usize, default = 64
        /// Size of the segment cache, in megabytes.
        pub segment_cache_size_mb: usize, default = 0
    }
}

impl Eq for VortexOptions {}

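/// [`FileFormatFactory`] that makes Vortex available to DataFusion as
/// `STORED AS vortex`.
///
/// A minimal registration sketch, mirroring the tests at the bottom of this
/// file:
///
/// ```rust,ignore
/// let factory = VortexFormatFactory::new();
/// let mut builder = SessionStateBuilder::new().with_default_features();
/// register_vortex_format_factory(factory, &mut builder);
/// let ctx = SessionContext::new_with_state(builder.build());
/// // `STORED AS vortex` is now usable in SQL against `ctx`.
/// ```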
#[derive(Debug)]
pub struct VortexFormatFactory {
    session: Arc<VortexSession>,
    options: Option<VortexOptions>,
}

impl GetExt for VortexFormatFactory {
    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }
}

impl VortexFormatFactory {
    /// Creates a factory with a default Vortex session and no preset options.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self {
            session: Arc::new(VortexSession::default()),
            options: None,
        }
    }

    /// Creates a factory with the given session and preset options.
    pub fn new_with_options(session: Arc<VortexSession>, options: VortexOptions) -> Self {
        Self {
            session,
            options: Some(options),
        }
    }

    /// Replaces any preset options on this factory.
    pub fn with_options(mut self, options: VortexOptions) -> Self {
        self.options = Some(options);
        self
    }
}

impl FileFormatFactory for VortexFormatFactory {
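    /// Table-level options arrive here prefixed with `format.`; unprefixed
    /// keys are ignored. A SQL sketch, mirroring the
    /// `configure_format_source` test below:
    ///
    /// ```sql
    /// CREATE EXTERNAL TABLE t (c1 VARCHAR NOT NULL, c2 INT NOT NULL)
    /// STORED AS vortex
    /// LOCATION '/path/to/table'
    /// OPTIONS (segment_cache_size_mb '5');
    /// ```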
    #[allow(clippy::disallowed_types)]
    fn create(
        &self,
        _state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> DFResult<Arc<dyn FileFormat>> {
        let mut opts = self.options.clone().unwrap_or_default();
        for (key, value) in format_options {
            if let Some(key) = key.strip_prefix("format.") {
                opts.set(key, value)?;
            } else {
                tracing::trace!("Ignoring option '{key}'");
            }
        }

        Ok(Arc::new(VortexFormat::new_with_options(
            self.session.clone(),
            opts,
        )))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(VortexFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Default for VortexFormat {
    fn default() -> Self {
        Self::new(Arc::new(VortexSession::default()))
    }
}

impl VortexFormat {
    /// Creates a format with default [`VortexOptions`].
    pub fn new(session: Arc<VortexSession>) -> Self {
        Self::new_with_options(session, VortexOptions::default())
    }

    /// Creates a format with the given options; the shared file cache is
    /// sized from the configured footer and segment cache limits.
    pub fn new_with_options(session: Arc<VortexSession>, opts: VortexOptions) -> Self {
        Self {
            session: session.clone(),
            file_cache: VortexFileCache::new(
                opts.footer_cache_size_mb,
                opts.segment_cache_size_mb,
                session,
            ),
            opts,
        }
    }

    /// Returns the options this format was configured with.
    pub fn options(&self) -> &VortexOptions {
        &self.opts
    }
}

#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file-level compression.".into(),
            )),
        }
    }

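    /// Opens every object concurrently (bounded by the session's
    /// `meta_fetch_concurrency`), converts each file's dtype to an Arrow
    /// schema, and merges the results in path order so the outcome does not
    /// depend on task completion order.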
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Sort by object path so the merged schema is deterministic.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

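    /// Translates Vortex file-level statistics into DataFusion
    /// [`Statistics`]: the row count is always exact, while byte size, null
    /// counts, and min/max are filled in per column where the file recorded
    /// them.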
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            let struct_dtype = vxf
                .dtype()
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Without file-level statistics, report an exact row count and
            // leave everything else absent.
            let Some(file_stats) = vxf.file_stats() else {
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Align the file's stats with the table schema; table fields
            // missing from this file get an empty stats set.
            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            // Sum the per-column uncompressed sizes. Columns without a
            // recorded size contribute an inexact zero, which downgrades the
            // whole total from exact to inexact.
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());

                    // Convert the Vortex min/max scalars into DataFusion
                    // values, dropping any that fail to convert.
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
                        // Only an exact `true` IsConstant stat means the
                        // column has a single distinct value; an exact
                        // `false` tells us nothing about the distinct count.
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant
                                    .as_exact()
                                    .and_then(|c| c.then_some(Precision::Exact(1)))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

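    /// Plans a scan by wrapping this format's shared file cache and metrics
    /// in a [`VortexSource`] handed to a `DataSourceExec`.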
    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Writes currently support appends only.
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.file_cache.clone(),
            VortexMetrics::default(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                 (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                 STORED AS vortex \
                 LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn configure_format_source() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                 (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                 STORED AS vortex \
                 LOCATION '{}' \
                 OPTIONS (segment_cache_size_mb '5');",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}