use std::any::Any;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use datafusion::arrow::datatypes::{Schema, SchemaRef};
24use datafusion::common::Statistics;
25use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
26use datafusion::datasource::file_format::FileFormat;
27use datafusion::datasource::physical_plan::{FileScanConfig, FileSource};
28use datafusion::error::{DataFusionError, Result};
29use datafusion::physical_plan::ExecutionPlan;
30use futures::TryStreamExt;
31use orc_rust::reader::metadata::read_metadata_async;
32
33use crate::OrcSource;
34use async_trait::async_trait;
35use datafusion::catalog::Session;
36use datafusion::datasource::source::DataSourceExec;
37use futures_util::StreamExt;
38use object_store::path::Path;
39use object_store::{ObjectMeta, ObjectStore};
40
41use super::object_store_reader::ObjectStoreReader;
42
43async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result<(Path, Schema)> {
44 let loc_path = file.location.clone();
45 let mut reader = ObjectStoreReader::new(Arc::clone(store), file.clone());
46 let metadata = read_metadata_async(&mut reader)
47 .await
48 .map_err(|e| DataFusionError::External(Box::new(e)))?;
49 let schema = metadata
50 .root_data_type()
51 .create_arrow_schema(&HashMap::default());
52 Ok((loc_path, schema))
53}
54
/// [`FileFormat`] implementation for Apache ORC files.
///
/// A stateless marker type: all behavior lives in the `FileFormat` trait
/// impl. `Default` is derived so it can be constructed uniformly with the
/// other sources in this crate (e.g. `OrcSource::default()`).
#[derive(Clone, Debug, Default)]
pub struct OrcFormat;
57
58#[async_trait]
59impl FileFormat for OrcFormat {
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 fn get_ext(&self) -> String {
65 "orc".to_string()
66 }
67
68 fn get_ext_with_compression(&self, _compression: &FileCompressionType) -> Result<String> {
69 Ok("orc".to_string())
70 }
71
72 fn compression_type(&self) -> Option<FileCompressionType> {
73 None
74 }
75
76 async fn infer_schema(
77 &self,
78 state: &dyn Session,
79 store: &Arc<dyn ObjectStore>,
80 objects: &[ObjectMeta],
81 ) -> Result<SchemaRef> {
82 let mut schemas: Vec<_> = futures::stream::iter(objects)
83 .map(|object| fetch_schema(store, object))
84 .boxed() .buffered(state.config_options().execution.meta_fetch_concurrency)
86 .try_collect()
87 .await?;
88
89 schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
96
97 let schemas = schemas
98 .into_iter()
99 .map(|(_, schema)| schema)
100 .collect::<Vec<_>>();
101
102 let schema = Schema::try_merge(schemas)?;
103
104 Ok(Arc::new(schema))
105 }
106
107 async fn infer_stats(
108 &self,
109 _state: &dyn Session,
110 _store: &Arc<dyn ObjectStore>,
111 table_schema: SchemaRef,
112 _object: &ObjectMeta,
113 ) -> Result<Statistics> {
114 Ok(Statistics::new_unknown(&table_schema))
115 }
116
117 async fn create_physical_plan(
118 &self,
119 _state: &dyn Session,
120 conf: FileScanConfig,
121 ) -> Result<Arc<dyn ExecutionPlan>> {
122 Ok(DataSourceExec::from_data_source(conf))
123 }
124
125 fn file_source(&self) -> Arc<dyn FileSource> {
126 Arc::new(OrcSource::default())
127 }
128}