// datafusion_datasource_orc/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! ORC reader adapters.
//!
//! Provides an [`AsyncChunkReader`] implementation backed by `object_store`,
//! so `orc-rust` can read ORC files from local or remote object stores.
22
23use bytes::Bytes;
24use datafusion_physical_plan::metrics::Count;
25use futures_util::future::BoxFuture;
26use object_store::path::Path;
27use object_store::ObjectStore;
28use orc_rust::reader::AsyncChunkReader;
29use std::sync::Arc;
30
31/// Adapter to convert ObjectStore to AsyncChunkReader for orc-rust.
32///
33/// This adapter bridges the gap between DataFusion's `ObjectStore` abstraction
34/// and `orc-rust`'s `AsyncChunkReader` trait, enabling ORC files to be read
35/// from any supported object store (local filesystem, S3, GCS, Azure, etc.).
36///
37/// When metrics are provided, it automatically tracks:
38/// - `bytes_scanned`: Total bytes read from the object store
39/// - `io_requests`: Number of I/O operations performed
40pub struct ObjectStoreChunkReader {
41    store: Arc<dyn ObjectStore>,
42    path: Path,
43    file_size: Option<u64>,
44    /// Optional counter for tracking bytes scanned
45    bytes_scanned: Option<Count>,
46    /// Optional counter for tracking I/O requests
47    io_requests: Option<Count>,
48}
49
50impl ObjectStoreChunkReader {
51    /// Create a new ObjectStoreChunkReader
52    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
53        Self {
54            store,
55            path,
56            file_size: None,
57            bytes_scanned: None,
58            io_requests: None,
59        }
60    }
61
62    /// Create with known file size (for optimization)
63    pub fn with_size(store: Arc<dyn ObjectStore>, path: Path, size: u64) -> Self {
64        Self {
65            store,
66            path,
67            file_size: Some(size),
68            bytes_scanned: None,
69            io_requests: None,
70        }
71    }
72
73    /// Attach metrics counters to track I/O statistics.
74    ///
75    /// When metrics are attached, every read operation will automatically
76    /// update the `bytes_scanned` and `io_requests` counters.
77    ///
78    /// # Example
79    ///
80    /// ```ignore
81    /// use datafusion_datasource_orc::metrics::OrcFileMetrics;
82    ///
83    /// let metrics = OrcFileMetrics::new(0, "file.orc", &metrics_set);
84    /// let reader = ObjectStoreChunkReader::with_size(store, path, size)
85    ///     .with_metrics(metrics.bytes_scanned.clone(), metrics.io_requests.clone());
86    /// ```
87    pub fn with_metrics(mut self, bytes_scanned: Count, io_requests: Count) -> Self {
88        self.bytes_scanned = Some(bytes_scanned);
89        self.io_requests = Some(io_requests);
90        self
91    }
92
93    /// Record an I/O request (if metrics are attached)
94    fn record_io_request(&self) {
95        if let Some(ref counter) = self.io_requests {
96            counter.add(1);
97        }
98    }
99}
100
101impl AsyncChunkReader for ObjectStoreChunkReader {
102    fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
103        Box::pin(async move {
104            if let Some(size) = self.file_size {
105                Ok(size)
106            } else {
107                // Fetch metadata to get file size
108                self.record_io_request();
109                let meta =
110                    self.store.head(&self.path).await.map_err(|e| {
111                        std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
112                    })?;
113                self.file_size = Some(meta.size as u64);
114                Ok(meta.size as u64)
115            }
116        })
117    }
118
119    fn get_bytes(
120        &mut self,
121        offset_from_start: u64,
122        length: u64,
123    ) -> BoxFuture<'_, std::io::Result<Bytes>> {
124        let store = Arc::clone(&self.store);
125        let path = self.path.clone();
126        let bytes_scanned = self.bytes_scanned.clone();
127        let io_requests = self.io_requests.clone();
128
129        Box::pin(async move {
130            // Record I/O request
131            if let Some(ref counter) = io_requests {
132                counter.add(1);
133            }
134
135            let range = offset_from_start..(offset_from_start + length);
136            let bytes = store
137                .get_range(&path, range)
138                .await
139                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
140
141            // Record bytes read
142            if let Some(ref counter) = bytes_scanned {
143                counter.add(bytes.len());
144            }
145
146            Ok(bytes)
147        })
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
    use object_store::memory::InMemory;
    use orc_rust::reader::AsyncChunkReader;

    /// Returns an in-memory store holding `data` at `path`.
    async fn store_with_object(path: &Path, data: bytes::Bytes) -> Arc<dyn ObjectStore> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        store
            .put(path, data.into())
            .await
            .expect("Failed to upload test data");
        store
    }

    /// Returns fresh `(bytes_scanned, io_requests)` counters, both starting at zero.
    fn new_counters() -> (Count, Count) {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let bytes_scanned = MetricBuilder::new(&metrics_set).counter("bytes_scanned", 0);
        let io_requests = MetricBuilder::new(&metrics_set).counter("io_requests", 0);
        (bytes_scanned, io_requests)
    }

    #[test]
    fn test_object_store_chunk_reader_new() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.orc");
        let reader = ObjectStoreChunkReader::new(Arc::clone(&store), path.clone());

        assert!(reader.file_size.is_none());
        assert_eq!(reader.path, path);
    }

    #[test]
    fn test_object_store_chunk_reader_with_size() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.orc");
        let reader = ObjectStoreChunkReader::with_size(Arc::clone(&store), path.clone(), 1024);

        assert_eq!(reader.file_size, Some(1024));
        assert_eq!(reader.path, path);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_len_with_known_size() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.orc");
        let mut reader = ObjectStoreChunkReader::with_size(store, path, 2048);

        let len = reader.len().await.unwrap();
        assert_eq!(len, 2048);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_get_bytes() {
        let path = Path::from("test_file.bin");
        let test_data = bytes::Bytes::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let store = store_with_object(&path, test_data.clone()).await;

        let mut reader =
            ObjectStoreChunkReader::with_size(store, path, test_data.len() as u64);

        // Read a portion of the data
        let bytes = reader.get_bytes(2, 4).await.unwrap();
        assert_eq!(bytes.as_ref(), &[2, 3, 4, 5]);

        // Read from the beginning
        let bytes = reader.get_bytes(0, 3).await.unwrap();
        assert_eq!(bytes.as_ref(), &[0, 1, 2]);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_len_fetch_metadata() {
        let path = Path::from("metadata_test.bin");
        let store = store_with_object(&path, bytes::Bytes::from(vec![0u8; 512])).await;

        // Create reader without known size; `len` must fetch metadata.
        let mut reader = ObjectStoreChunkReader::new(store, path);

        let len = reader.len().await.unwrap();
        assert_eq!(len, 512);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_len_caches_size() {
        let path = Path::from("cache_test.bin");
        let store = store_with_object(&path, bytes::Bytes::from(vec![0u8; 64])).await;
        let (bytes_scanned, io_requests) = new_counters();
        let mut reader = ObjectStoreChunkReader::new(store, path)
            .with_metrics(bytes_scanned.clone(), io_requests.clone());

        // The first call issues one `head` request and caches the size.
        assert_eq!(reader.len().await.unwrap(), 64);
        assert_eq!(reader.file_size, Some(64));
        assert_eq!(io_requests.value(), 1);

        // Later calls are answered from the cache without further I/O.
        assert_eq!(reader.len().await.unwrap(), 64);
        assert_eq!(io_requests.value(), 1);
        assert_eq!(bytes_scanned.value(), 0);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_file_not_found() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("nonexistent.orc");

        let mut reader = ObjectStoreChunkReader::new(store, path);

        // Should return error when file doesn't exist
        let result = reader.len().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_get_bytes_missing_object() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (bytes_scanned, io_requests) = new_counters();
        let mut reader =
            ObjectStoreChunkReader::with_size(store, Path::from("missing.bin"), 10)
                .with_metrics(bytes_scanned.clone(), io_requests.clone());

        let err = reader.get_bytes(0, 5).await.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::Other);
        // A failed read must not be reported as scanned bytes.
        assert_eq!(bytes_scanned.value(), 0);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_with_metrics() {
        let path = Path::from("metrics_test.bin");
        let store = store_with_object(&path, bytes::Bytes::from(vec![0u8; 100])).await;
        let (bytes_scanned, io_requests) = new_counters();

        let mut reader = ObjectStoreChunkReader::with_size(store, path, 100)
            .with_metrics(bytes_scanned.clone(), io_requests.clone());

        // Initial state
        assert_eq!(bytes_scanned.value(), 0);
        assert_eq!(io_requests.value(), 0);

        // Read some bytes
        let _ = reader.get_bytes(0, 50).await.unwrap();
        assert_eq!(bytes_scanned.value(), 50);
        assert_eq!(io_requests.value(), 1);

        // Read more bytes
        let _ = reader.get_bytes(50, 30).await.unwrap();
        assert_eq!(bytes_scanned.value(), 80);
        assert_eq!(io_requests.value(), 2);
    }
}