datafusion_datasource_orc/
metadata.rs1use arrow::datatypes::SchemaRef;
24use datafusion_common::stats::Precision;
25use datafusion_common::{DataFusionError, Result, Statistics};
26use object_store::{ObjectMeta, ObjectStore};
27use orc_rust::reader::metadata::read_metadata_async;
28use orc_rust::schema::ArrowSchemaOptions;
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use crate::reader::ObjectStoreChunkReader;
33
34pub async fn read_orc_schema(
36 store: &Arc<dyn ObjectStore>,
37 object: &ObjectMeta,
38) -> Result<SchemaRef> {
39 let mut reader =
40 ObjectStoreChunkReader::with_size(Arc::clone(store), object.location.clone(), object.size);
41
42 let file_metadata = read_metadata_async(&mut reader).await.map_err(|e| {
43 DataFusionError::External(format!("Failed to read ORC metadata: {}", e).into())
44 })?;
45
46 let root_data_type = file_metadata.root_data_type();
48 let metadata: HashMap<String, String> = file_metadata
49 .user_custom_metadata()
50 .iter()
51 .map(|(k, v)| (k.clone(), String::from_utf8_lossy(v).to_string()))
52 .collect();
53
54 let options = ArrowSchemaOptions::new();
55 let schema = root_data_type.create_arrow_schema_with_options(&metadata, options);
56
57 Ok(Arc::new(schema))
58}
59
60pub async fn read_orc_statistics(
65 store: &Arc<dyn ObjectStore>,
66 object: &ObjectMeta,
67 _table_schema: SchemaRef,
68) -> Result<Statistics> {
69 let mut reader =
70 ObjectStoreChunkReader::with_size(Arc::clone(store), object.location.clone(), object.size);
71
72 let file_metadata = read_metadata_async(&mut reader).await.map_err(|e| {
73 DataFusionError::External(format!("Failed to read ORC metadata: {}", e).into())
74 })?;
75
76 let num_rows = file_metadata.number_of_rows();
78
79 Ok(Statistics {
82 num_rows: Precision::Exact(num_rows as usize),
83 total_byte_size: Precision::Exact(object.size as usize),
84 column_statistics: vec![],
85 })
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path as ObjectStorePath;
    use std::path::PathBuf;

    /// Returns the directory that holds the ORC fixture files
    /// (`<crate root>/tests/integration/data`).
    fn get_test_data_dir() -> PathBuf {
        let mut dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        dir.push("tests");
        dir.push("integration");
        dir.push("data");
        dir
    }

    /// Creates a local-filesystem object store and looks up the `ObjectMeta`
    /// of the named fixture file. Panics if the fixture cannot be resolved.
    async fn open_fixture(file_name: &str) -> (Arc<dyn ObjectStore>, ObjectMeta) {
        let fixture_path = get_test_data_dir().join(file_name);
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = ObjectStorePath::from_filesystem_path(&fixture_path).unwrap();
        let object_meta = store.head(&location).await.unwrap();
        (store, object_meta)
    }

    /// The inferred schema of `alltypes.snappy.orc` exposes the expected columns.
    #[tokio::test]
    async fn test_read_orc_schema_alltypes() {
        let (store, object_meta) = open_fixture("alltypes.snappy.orc").await;

        let schema = read_orc_schema(&store, &object_meta).await.unwrap();
        assert!(!schema.fields().is_empty());

        let has_field = |name: &str| schema.fields().iter().any(|f| f.name().as_str() == name);
        assert!(has_field("boolean"));
        assert!(has_field("int8"));
        assert!(has_field("int64"));
        assert!(has_field("utf8"));
    }

    /// Row count and byte size of `alltypes.snappy.orc` are reported as exact values.
    #[tokio::test]
    async fn test_read_orc_statistics_alltypes() {
        let (store, object_meta) = open_fixture("alltypes.snappy.orc").await;

        let schema = read_orc_schema(&store, &object_meta).await.unwrap();
        let stats = read_orc_statistics(&store, &object_meta, schema)
            .await
            .unwrap();

        assert_eq!(stats.num_rows, Precision::Exact(11));

        let Precision::Exact(size) = stats.total_byte_size else {
            panic!("Expected exact byte size");
        };
        assert!(size > 0);
    }

    /// The inferred schema of `map_list.snappy.orc` exposes the expected columns.
    #[tokio::test]
    async fn test_read_orc_schema_map_list() {
        let (store, object_meta) = open_fixture("map_list.snappy.orc").await;

        let schema = read_orc_schema(&store, &object_meta).await.unwrap();
        assert!(!schema.fields().is_empty());

        let has_field = |name: &str| schema.fields().iter().any(|f| f.name().as_str() == name);
        assert!(has_field("id"));
        assert!(has_field("m"));
        assert!(has_field("l"));
    }

    /// Looking up a missing path in the object store fails.
    ///
    /// Note: this only exercises `ObjectStore::head`; `read_orc_schema` itself
    /// is not invoked because no `ObjectMeta` can be obtained for the path.
    #[tokio::test]
    async fn test_read_orc_schema_nonexistent_file() {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let missing = ObjectStorePath::from("/this/path/definitely/does/not/exist.orc");

        let lookup = store.head(&missing).await;
        assert!(lookup.is_err(), "Expected error for non-existent file");
    }
}