// vortex_datafusion/persistent/format.rs

use std::any::Any;
5use std::fmt::{Debug, Formatter};
6use std::sync::Arc;
7
8use arrow_schema::{Schema, SchemaRef};
9use async_trait::async_trait;
10use datafusion_catalog::Session;
11use datafusion_common::config::ConfigField;
12use datafusion_common::parsers::CompressionTypeVariant;
13use datafusion_common::stats::Precision;
14use datafusion_common::{
15 ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics, config_namespace,
16 not_impl_err,
17};
18use datafusion_common_runtime::SpawnedTask;
19use datafusion_datasource::file::FileSource;
20use datafusion_datasource::file_compression_type::FileCompressionType;
21use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
22use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
23use datafusion_datasource::file_sink_config::FileSinkConfig;
24use datafusion_datasource::sink::DataSinkExec;
25use datafusion_datasource::source::DataSourceExec;
26use datafusion_expr::dml::InsertOp;
27use datafusion_physical_expr::LexRequirement;
28use datafusion_physical_plan::ExecutionPlan;
29use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
30use itertools::Itertools;
31use object_store::{ObjectMeta, ObjectStore};
32use vortex::dtype::arrow::FromArrowType;
33use vortex::dtype::{DType, Nullability, PType};
34use vortex::error::{VortexExpect, VortexResult, vortex_err};
35use vortex::file::VORTEX_FILE_EXTENSION;
36use vortex::scalar::Scalar;
37use vortex::session::VortexSession;
38use vortex::stats::{Stat, StatsSet};
39use vortex::{VortexSessionDefault, stats};
40
41use super::cache::VortexFileCache;
42use super::sink::VortexSink;
43use super::source::VortexSource;
44use crate::PrecisionExt as _;
45use crate::convert::TryToDataFusion;
46
/// DataFusion [`FileFormat`] implementation for Vortex files.
///
/// Created either directly via [`VortexFormat::new`] /
/// [`VortexFormat::new_with_options`], or by DataFusion through
/// [`VortexFormatFactory`] when a table is declared `STORED AS vortex`.
pub struct VortexFormat {
    // Vortex session cloned into the sources/sinks this format creates.
    session: VortexSession,
    // Shared cache of opened Vortex files (footers/segments), sized from `opts`.
    file_cache: VortexFileCache,
    // The options this format was configured with; exposed via `options()`.
    opts: VortexOptions,
}
53
54impl Debug for VortexFormat {
55 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("VortexFormat")
57 .field("opts", &self.opts)
58 .finish()
59 }
60}
61
config_namespace! {
    /// Options controlling the Vortex file format. Settable per-table via
    /// `OPTIONS(...)` in `CREATE EXTERNAL TABLE` (keys arrive prefixed with
    /// `format.`, which `VortexFormatFactory::create` strips).
    pub struct VortexOptions {
        /// Size of the file footer cache, in megabytes.
        pub footer_cache_size_mb: usize, default = 64
        /// Size of the segment cache, in megabytes.
        pub segment_cache_size_mb: usize, default = 0
    }
}

// NOTE(review): assumes `config_namespace!` derives `PartialEq` for
// `VortexOptions` — both fields are `usize`, so total equality holds and
// `Eq` is sound.
impl Eq for VortexOptions {}
77
/// Factory registered with DataFusion so that `STORED AS vortex` tables
/// produce [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    // Session cloned into every format created by this factory.
    session: VortexSession,
    // Pre-configured options; `None` means "start from defaults and apply
    // per-table `format.*` overrides in `create()`".
    options: Option<VortexOptions>,
}
84
85impl GetExt for VortexFormatFactory {
86 fn get_ext(&self) -> String {
87 VORTEX_FILE_EXTENSION.to_string()
88 }
89}
90
91impl VortexFormatFactory {
92 #[allow(clippy::new_without_default)] pub fn new() -> Self {
95 Self {
96 session: VortexSession::default(),
97 options: None,
98 }
99 }
100
101 pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self {
105 Self {
106 session,
107 options: Some(options),
108 }
109 }
110
111 pub fn with_options(mut self, options: VortexOptions) -> Self {
120 self.options = Some(options);
121 self
122 }
123}
124
125impl FileFormatFactory for VortexFormatFactory {
126 #[allow(clippy::disallowed_types)]
127 fn create(
128 &self,
129 _state: &dyn Session,
130 format_options: &std::collections::HashMap<String, String>,
131 ) -> DFResult<Arc<dyn FileFormat>> {
132 let mut opts = self.options.clone().unwrap_or_default();
133 for (key, value) in format_options {
134 if let Some(key) = key.strip_prefix("format.") {
135 opts.set(key, value)?;
136 } else {
137 tracing::trace!("Ignoring options '{key}'");
138 }
139 }
140
141 Ok(Arc::new(VortexFormat::new_with_options(
142 self.session.clone(),
143 opts,
144 )))
145 }
146
147 fn default(&self) -> Arc<dyn FileFormat> {
148 Arc::new(VortexFormat::new(self.session.clone()))
149 }
150
151 fn as_any(&self) -> &dyn Any {
152 self
153 }
154}
155
156impl VortexFormat {
157 pub fn new(session: VortexSession) -> Self {
159 Self::new_with_options(session, VortexOptions::default())
160 }
161
162 pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self {
164 Self {
165 session: session.clone(),
166 file_cache: VortexFileCache::new(
167 opts.footer_cache_size_mb,
168 opts.segment_cache_size_mb,
169 session,
170 ),
171 opts,
172 }
173 }
174
175 pub fn options(&self) -> &VortexOptions {
177 &self.opts
178 }
179}
180
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Vortex files have no file-level compression.
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    /// Only `UNCOMPRESSED` is accepted; any other compression variant is an
    /// error because Vortex does not support file-level compression.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers a merged Arrow schema across all `objects`.
    ///
    /// Each file is opened on a spawned task (concurrency bounded by the
    /// session's `meta_fetch_concurrency`), its Vortex dtype converted to an
    /// Arrow schema, and the per-file schemas merged via [`Schema::try_merge`].
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                // Task-spawn failure is a bug, not a recoverable condition.
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // `buffer_unordered` yields in completion order; sort by object
        // location so the merge order (and thus the result) is deterministic.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Derives DataFusion [`Statistics`] for a single file from the stats
    /// recorded in the Vortex file footer.
    ///
    /// Columns are matched to `table_schema` by name; table columns absent
    /// from the file fall back to an empty [`StatsSet`] (i.e. absent stats).
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            // Top-level dtype must be a struct (one field per column);
            // anything else is treated as an invariant violation.
            let struct_dtype = vxf
                .dtype()
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Files written without stats: report only the exact row count.
            let Some(file_stats) = vxf.file_stats() else {
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        // NOTE(review): overflow (u64 -> usize) panics via
                        // vortex_expect rather than surfacing a DataFusionError.
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // One StatsSet per table column, in table-schema order; columns
            // missing from the file get an empty (absent) StatsSet.
            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            // Sum per-column uncompressed sizes. A column with no size stat
            // contributes an inexact 0, which downgrades the whole sum to
            // inexact via `zip`.
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            // Convert the Vortex precision wrapper to DataFusion's Precision.
            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
                    // Min/max: rebuild a typed Scalar from the stored stat
                    // value (typed per the Arrow field), then convert to a
                    // DataFusion scalar; conversion failures become None.
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        // Sum is not tracked in Vortex file stats.
                        sum_value: Precision::Absent,
                        // A column known (exactly) to be constant has exactly
                        // one distinct value; otherwise we can't say.
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant.as_exact().map(|_| Precision::Exact(1))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                // NOTE(review): duplicates the row-count conversion from the
                // no-stats branch above; candidate for a shared helper.
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Builds the scan plan: wraps the file-scan config with a [`VortexSource`]
    /// backed by this format's session and shared file cache.
    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let source = VortexSource::new(self.session.clone(), self.file_cache.clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    /// Builds the write plan (`INSERT INTO`). Only append is supported;
    /// overwrite returns a not-implemented error.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    /// A fresh [`VortexSource`] sharing this format's session and file cache.
    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.session.clone(),
            self.file_cache.clone(),
        ))
    }
}
403
#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    /// Registering the factory makes `STORED AS vortex` resolvable, and an
    /// external table over an empty directory scans as zero rows.
    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory: VortexFormatFactory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                 (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                 STORED AS vortex \
                 LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        // Empty directory => table exists but has no rows.
        assert_eq!(df.count().await.unwrap(), 0);
    }

    /// `OPTIONS(...)` keys are accepted and routed through
    /// `VortexFormatFactory::create` (see the `format.` prefix handling there).
    #[tokio::test]
    async fn configure_format_source() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                 (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                 STORED AS vortex \
                 LOCATION '{}' \
                 OPTIONS( segment_cache_size_mb '5' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}