datafusion_orc/
file_format.rs1use std::any::Any;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use arrow::datatypes::Schema;
24use datafusion::arrow::datatypes::SchemaRef;
25use datafusion::common::Statistics;
26use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
27use datafusion::datasource::file_format::FileFormat;
28use datafusion::datasource::physical_plan::{FileScanConfig, FileSource};
29use datafusion::error::{DataFusionError, Result};
30use datafusion::physical_plan::ExecutionPlan;
31use futures::TryStreamExt;
32use orc_rust::reader::metadata::read_metadata_async;
33
34use crate::OrcSource;
35use async_trait::async_trait;
36use datafusion::catalog::Session;
37use datafusion::datasource::source::DataSourceExec;
38use futures_util::StreamExt;
39use object_store::path::Path;
40use object_store::{ObjectMeta, ObjectStore};
41
42use super::object_store_reader::ObjectStoreReader;
43
44async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result<(Path, Schema)> {
45 let loc_path = file.location.clone();
46 let mut reader = ObjectStoreReader::new(Arc::clone(store), file.clone());
47 let metadata = read_metadata_async(&mut reader)
48 .await
49 .map_err(|e| DataFusionError::External(Box::new(e)))?;
50 let schema = metadata
51 .root_data_type()
52 .create_arrow_schema(&HashMap::default());
53 Ok((loc_path, schema))
54}
55
/// [`FileFormat`] implementation for ORC files.
///
/// The type carries no state; it can be constructed with `OrcFormat` or
/// `OrcFormat::default()`.
#[derive(Clone, Debug, Default)]
pub struct OrcFormat;
58
59#[async_trait]
60impl FileFormat for OrcFormat {
61 fn as_any(&self) -> &dyn Any {
62 self
63 }
64
65 fn get_ext(&self) -> String {
66 "orc".to_string()
67 }
68
69 fn get_ext_with_compression(&self, _compression: &FileCompressionType) -> Result<String> {
70 Ok("orc".to_string())
71 }
72
73 fn compression_type(&self) -> Option<FileCompressionType> {
74 None
75 }
76
77 async fn infer_schema(
78 &self,
79 state: &dyn Session,
80 store: &Arc<dyn ObjectStore>,
81 objects: &[ObjectMeta],
82 ) -> Result<SchemaRef> {
83 let mut schemas: Vec<_> = futures::stream::iter(objects)
84 .map(|object| fetch_schema(store, object))
85 .boxed() .buffered(state.config_options().execution.meta_fetch_concurrency)
87 .try_collect()
88 .await?;
89
90 schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
97
98 let schemas = schemas
99 .into_iter()
100 .map(|(_, schema)| schema)
101 .collect::<Vec<_>>();
102
103 let schema = Schema::try_merge(schemas)?;
104
105 Ok(Arc::new(schema))
106 }
107
108 async fn infer_stats(
109 &self,
110 _state: &dyn Session,
111 _store: &Arc<dyn ObjectStore>,
112 table_schema: SchemaRef,
113 _object: &ObjectMeta,
114 ) -> Result<Statistics> {
115 Ok(Statistics::new_unknown(&table_schema))
116 }
117
118 async fn create_physical_plan(
119 &self,
120 _state: &dyn Session,
121 conf: FileScanConfig,
122 ) -> Result<Arc<dyn ExecutionPlan>> {
123 Ok(DataSourceExec::from_data_source(conf))
124 }
125
126 fn file_source(&self) -> Arc<dyn FileSource> {
127 Arc::new(OrcSource::default())
128 }
129}