datafusion_datasource_orc/
metadata.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! ORC metadata processing utilities.
//!
//! This module provides helpers that read ORC file metadata from an object
//! store and derive the Arrow schema and DataFusion statistics for a file.
22
23use arrow::datatypes::SchemaRef;
24use datafusion_common::stats::Precision;
25use datafusion_common::{DataFusionError, Result, Statistics};
26use object_store::{ObjectMeta, ObjectStore};
27use orc_rust::reader::metadata::read_metadata_async;
28use orc_rust::schema::ArrowSchemaOptions;
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use crate::reader::ObjectStoreChunkReader;
33
34/// Read ORC file metadata and extract an Arrow schema.
35pub async fn read_orc_schema(
36    store: &Arc<dyn ObjectStore>,
37    object: &ObjectMeta,
38) -> Result<SchemaRef> {
39    let mut reader =
40        ObjectStoreChunkReader::with_size(Arc::clone(store), object.location.clone(), object.size);
41
42    let file_metadata = read_metadata_async(&mut reader).await.map_err(|e| {
43        DataFusionError::External(format!("Failed to read ORC metadata: {}", e).into())
44    })?;
45
46    // Convert ORC schema to Arrow schema
47    let root_data_type = file_metadata.root_data_type();
48    let metadata: HashMap<String, String> = file_metadata
49        .user_custom_metadata()
50        .iter()
51        .map(|(k, v)| (k.clone(), String::from_utf8_lossy(v).to_string()))
52        .collect();
53
54    let options = ArrowSchemaOptions::new();
55    let schema = root_data_type.create_arrow_schema_with_options(&metadata, options);
56
57    Ok(Arc::new(schema))
58}
59
60/// Read ORC file statistics.
61///
62/// TODO: Extract column-level statistics (min/max/null counts) once the
63/// underlying ORC metadata exposes them in a stable form.
64pub async fn read_orc_statistics(
65    store: &Arc<dyn ObjectStore>,
66    object: &ObjectMeta,
67    _table_schema: SchemaRef,
68) -> Result<Statistics> {
69    let mut reader =
70        ObjectStoreChunkReader::with_size(Arc::clone(store), object.location.clone(), object.size);
71
72    let file_metadata = read_metadata_async(&mut reader).await.map_err(|e| {
73        DataFusionError::External(format!("Failed to read ORC metadata: {}", e).into())
74    })?;
75
76    // Extract statistics from ORC file metadata
77    let num_rows = file_metadata.number_of_rows();
78
79    // TODO: Extract column-level statistics (min/max/null counts) from file_metadata
80    // For now, return basic statistics
81    Ok(Statistics {
82        num_rows: Precision::Exact(num_rows as usize),
83        total_byte_size: Precision::Exact(object.size as usize),
84        column_statistics: vec![],
85    })
86}
87
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path as ObjectStorePath;
    use std::path::{Path, PathBuf};

    /// Directory holding the ORC fixture files used by these tests.
    fn get_test_data_dir() -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests")
            .join("integration")
            .join("data")
    }

    /// Create a local-filesystem store and fetch the `ObjectMeta` for `file`.
    async fn local_object(file: &Path) -> (Arc<dyn ObjectStore>, ObjectMeta) {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let path = ObjectStorePath::from_filesystem_path(file).unwrap();
        let object_meta = store.head(&path).await.unwrap();
        (store, object_meta)
    }

    /// Top-level field names of `schema`, in schema order.
    fn field_names(schema: &SchemaRef) -> Vec<&str> {
        schema.fields().iter().map(|f| f.name().as_str()).collect()
    }

    #[tokio::test]
    async fn test_read_orc_schema_alltypes() {
        let orc_file = get_test_data_dir().join("alltypes.snappy.orc");
        let (store, object_meta) = local_object(&orc_file).await;

        let schema = read_orc_schema(&store, &object_meta).await.unwrap();

        // Verify schema fields
        assert!(!schema.fields().is_empty());

        // Check specific fields exist
        let names = field_names(&schema);
        assert!(names.contains(&"boolean"));
        assert!(names.contains(&"int8"));
        assert!(names.contains(&"int64"));
        assert!(names.contains(&"utf8"));
    }

    #[tokio::test]
    async fn test_read_orc_statistics_alltypes() {
        let orc_file = get_test_data_dir().join("alltypes.snappy.orc");
        let (store, object_meta) = local_object(&orc_file).await;

        let schema = read_orc_schema(&store, &object_meta).await.unwrap();
        let stats = read_orc_statistics(&store, &object_meta, schema)
            .await
            .unwrap();

        // Verify row count
        assert_eq!(stats.num_rows, Precision::Exact(11));

        // Verify byte size is set
        match stats.total_byte_size {
            Precision::Exact(size) => assert!(size > 0),
            _ => panic!("Expected exact byte size"),
        }
    }

    #[tokio::test]
    async fn test_read_orc_schema_map_list() {
        let orc_file = get_test_data_dir().join("map_list.snappy.orc");
        let (store, object_meta) = local_object(&orc_file).await;

        let schema = read_orc_schema(&store, &object_meta).await.unwrap();

        // Verify schema fields
        assert!(!schema.fields().is_empty());

        // Check specific fields exist
        let names = field_names(&schema);
        assert!(names.contains(&"id"));
        assert!(names.contains(&"m")); // Map field
        assert!(names.contains(&"l")); // List field
    }

    #[tokio::test]
    async fn test_read_orc_schema_nonexistent_file() {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());

        // Only the store lookup is exercised here: without an `ObjectMeta`
        // (which `head` cannot produce for a missing file) `read_orc_schema`
        // cannot be called, so the failure surfaces from `head` itself.
        let path = ObjectStorePath::from("/this/path/definitely/does/not/exist.orc");

        let result = store.head(&path).await;
        assert!(result.is_err(), "Expected error for non-existent file");
    }

    #[tokio::test]
    async fn test_read_orc_schema_invalid_file() {
        // The crate manifest always exists but is not an ORC file, so this
        // exercises the metadata error path of `read_orc_schema` itself.
        let not_orc = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("Cargo.toml");
        let (store, object_meta) = local_object(&not_orc).await;

        let result = read_orc_schema(&store, &object_meta).await;
        assert!(result.is_err(), "Expected error when reading a non-ORC file");
    }
}