datafusion_datasource_orc/
reader.rs1use bytes::Bytes;
24use datafusion_physical_plan::metrics::Count;
25use futures_util::future::BoxFuture;
26use object_store::path::Path;
27use object_store::ObjectStore;
28use orc_rust::reader::AsyncChunkReader;
29use std::sync::Arc;
30
31pub struct ObjectStoreChunkReader {
41 store: Arc<dyn ObjectStore>,
42 path: Path,
43 file_size: Option<u64>,
44 bytes_scanned: Option<Count>,
46 io_requests: Option<Count>,
48}
49
50impl ObjectStoreChunkReader {
51 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
53 Self {
54 store,
55 path,
56 file_size: None,
57 bytes_scanned: None,
58 io_requests: None,
59 }
60 }
61
62 pub fn with_size(store: Arc<dyn ObjectStore>, path: Path, size: u64) -> Self {
64 Self {
65 store,
66 path,
67 file_size: Some(size),
68 bytes_scanned: None,
69 io_requests: None,
70 }
71 }
72
73 pub fn with_metrics(mut self, bytes_scanned: Count, io_requests: Count) -> Self {
88 self.bytes_scanned = Some(bytes_scanned);
89 self.io_requests = Some(io_requests);
90 self
91 }
92
93 fn record_io_request(&self) {
95 if let Some(ref counter) = self.io_requests {
96 counter.add(1);
97 }
98 }
99}
100
101impl AsyncChunkReader for ObjectStoreChunkReader {
102 fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
103 Box::pin(async move {
104 if let Some(size) = self.file_size {
105 Ok(size)
106 } else {
107 self.record_io_request();
109 let meta =
110 self.store.head(&self.path).await.map_err(|e| {
111 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
112 })?;
113 self.file_size = Some(meta.size as u64);
114 Ok(meta.size as u64)
115 }
116 })
117 }
118
119 fn get_bytes(
120 &mut self,
121 offset_from_start: u64,
122 length: u64,
123 ) -> BoxFuture<'_, std::io::Result<Bytes>> {
124 let store = Arc::clone(&self.store);
125 let path = self.path.clone();
126 let bytes_scanned = self.bytes_scanned.clone();
127 let io_requests = self.io_requests.clone();
128
129 Box::pin(async move {
130 if let Some(ref counter) = io_requests {
132 counter.add(1);
133 }
134
135 let range = offset_from_start..(offset_from_start + length);
136 let bytes = store
137 .get_range(&path, range)
138 .await
139 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
140
141 if let Some(ref counter) = bytes_scanned {
143 counter.add(bytes.len());
144 }
145
146 Ok(bytes)
147 })
148 }
149}
150
#[cfg(test)]
mod tests {
    // Unit tests for `ObjectStoreChunkReader`, run against an in-memory object store.

    use super::*;
    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
    use object_store::memory::InMemory;
    use orc_rust::reader::AsyncChunkReader;

    /// Builds an in-memory store that holds `data` at `path`.
    async fn store_with_object(path: &Path, data: Vec<u8>) -> Arc<dyn ObjectStore> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        store
            .put(path, Bytes::from(data).into())
            .await
            .expect("Failed to upload test data");
        store
    }

    #[test]
    fn test_object_store_chunk_reader_new() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.orc");
        let reader = ObjectStoreChunkReader::new(Arc::clone(&store), path.clone());

        assert!(reader.file_size.is_none());
        assert_eq!(reader.path, path);
    }

    #[test]
    fn test_object_store_chunk_reader_with_size() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.orc");
        let reader = ObjectStoreChunkReader::with_size(Arc::clone(&store), path.clone(), 1024);

        assert_eq!(reader.file_size, Some(1024));
        assert_eq!(reader.path, path);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_len_with_known_size() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.orc");
        let mut reader = ObjectStoreChunkReader::with_size(store, path, 2048);

        let len = reader.len().await.unwrap();
        assert_eq!(len, 2048);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_get_bytes() {
        let path = Path::from("test_file.bin");
        let test_data = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        let size = test_data.len() as u64;
        let store = store_with_object(&path, test_data).await;

        let mut reader = ObjectStoreChunkReader::with_size(store, path, size);

        let bytes = reader.get_bytes(2, 4).await.unwrap();
        assert_eq!(bytes.as_ref(), &[2, 3, 4, 5]);

        let bytes = reader.get_bytes(0, 3).await.unwrap();
        assert_eq!(bytes.as_ref(), &[0, 1, 2]);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_len_fetch_metadata() {
        let path = Path::from("metadata_test.bin");
        let store = store_with_object(&path, vec![0u8; 512]).await;

        let mut reader = ObjectStoreChunkReader::new(store, path);

        let len = reader.len().await.unwrap();
        assert_eq!(len, 512);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_len_is_cached() {
        let path = Path::from("cached_len.bin");
        let store = store_with_object(&path, vec![0u8; 64]).await;

        let metrics_set = ExecutionPlanMetricsSet::new();
        let bytes_scanned = MetricBuilder::new(&metrics_set).counter("bytes_scanned", 0);
        let io_requests = MetricBuilder::new(&metrics_set).counter("io_requests", 0);

        let mut reader = ObjectStoreChunkReader::new(store, path)
            .with_metrics(bytes_scanned.clone(), io_requests.clone());

        // First call has to ask the store for the object's metadata.
        assert_eq!(reader.len().await.unwrap(), 64);
        assert_eq!(io_requests.value(), 1);

        // Second call is answered from the cached size.
        assert_eq!(reader.len().await.unwrap(), 64);
        assert_eq!(io_requests.value(), 1);
        assert_eq!(bytes_scanned.value(), 0);
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_file_not_found() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("nonexistent.orc");

        let mut reader = ObjectStoreChunkReader::new(store, path);

        let result = reader.len().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_get_bytes_file_not_found() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("nonexistent.orc");

        let mut reader = ObjectStoreChunkReader::with_size(store, path, 16);

        assert!(reader.get_bytes(0, 4).await.is_err());
    }

    #[tokio::test]
    async fn test_object_store_chunk_reader_with_metrics() {
        let path = Path::from("metrics_test.bin");
        let store = store_with_object(&path, vec![0u8; 100]).await;

        let metrics_set = ExecutionPlanMetricsSet::new();
        let bytes_scanned = MetricBuilder::new(&metrics_set).counter("bytes_scanned", 0);
        let io_requests = MetricBuilder::new(&metrics_set).counter("io_requests", 0);

        let mut reader = ObjectStoreChunkReader::with_size(store, path, 100)
            .with_metrics(bytes_scanned.clone(), io_requests.clone());

        assert_eq!(bytes_scanned.value(), 0);
        assert_eq!(io_requests.value(), 0);

        let _ = reader.get_bytes(0, 50).await.unwrap();
        assert_eq!(bytes_scanned.value(), 50);
        assert_eq!(io_requests.value(), 1);

        let _ = reader.get_bytes(50, 30).await.unwrap();
        assert_eq!(bytes_scanned.value(), 80);
        assert_eq!(io_requests.value(), 2);
    }
}