orc_rust/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod metadata;
19
20use std::fs::File;
21use std::io::{BufReader, Read, Seek, SeekFrom};
22
23use bytes::{Buf, Bytes};
24
25/// Primary source used for reading required bytes for operations.
26#[allow(clippy::len_without_is_empty)]
27pub trait ChunkReader {
28    type T: Read;
29
30    /// Get total length of bytes. Useful for parsing the metadata located at
31    /// the end of the file.
32    // TODO: this is only used for file tail, so replace with load_metadata?
33    fn len(&self) -> u64;
34
35    /// Get a reader starting at a specific offset.
36    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;
37
38    /// Read bytes from an offset with specific length.
39    fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Bytes> {
40        let mut bytes = vec![0; length as usize];
41        self.get_read(offset_from_start)?
42            .take(length)
43            .read_exact(&mut bytes)?;
44        Ok(bytes.into())
45    }
46}
47
48impl ChunkReader for File {
49    type T = BufReader<File>;
50
51    fn len(&self) -> u64 {
52        self.metadata().map(|m| m.len()).unwrap_or(0u64)
53    }
54
55    /// Care needs to be taken when using this simultaneously as underlying
56    /// file descriptor is the same and will be affected by other invocations.
57    ///
58    /// See [`File::try_clone()`] for more details.
59    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
60        let mut reader = self.try_clone()?;
61        reader.seek(SeekFrom::Start(offset_from_start))?;
62        Ok(BufReader::new(self.try_clone()?))
63    }
64}
65
66impl ChunkReader for Bytes {
67    type T = bytes::buf::Reader<Bytes>;
68
69    fn len(&self) -> u64 {
70        self.len() as u64
71    }
72
73    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
74        Ok(self.slice(offset_from_start as usize..).reader())
75    }
76}
77
78#[cfg(feature = "async")]
79mod async_chunk_reader {
80    use super::*;
81
82    use futures_util::future::BoxFuture;
83    use futures_util::FutureExt;
84    use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
85
86    #[allow(clippy::len_without_is_empty)]
87    pub trait AsyncChunkReader: Send {
88        // TODO: this is only used for file tail, so replace with load_metadata?
89        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>>;
90
91        fn get_bytes(
92            &mut self,
93            offset_from_start: u64,
94            length: u64,
95        ) -> BoxFuture<'_, std::io::Result<Bytes>>;
96    }
97
98    impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncChunkReader for T {
99        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
100            async move { self.seek(SeekFrom::End(0)).await }.boxed()
101        }
102
103        fn get_bytes(
104            &mut self,
105            offset_from_start: u64,
106            length: u64,
107        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
108            async move {
109                self.seek(SeekFrom::Start(offset_from_start)).await?;
110                let mut buffer = vec![0; length as usize];
111                self.read_exact(&mut buffer).await?;
112                Ok(buffer.into())
113            }
114            .boxed()
115        }
116    }
117
118    impl AsyncChunkReader for Box<dyn AsyncChunkReader> {
119        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
120            self.as_mut().len()
121        }
122
123        fn get_bytes(
124            &mut self,
125            offset_from_start: u64,
126            length: u64,
127        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
128            self.as_mut().get_bytes(offset_from_start, length)
129        }
130    }
131}
132
133#[cfg(feature = "async")]
134pub use async_chunk_reader::AsyncChunkReader;
135
136#[cfg(all(feature = "async", feature = "opendal"))]
137mod async_opendal_reader {
138    use crate::reader::AsyncChunkReader;
139    use bytes::Bytes;
140    use futures_util::future::BoxFuture;
141    use opendal::Operator;
142    use std::sync::Arc;
143
144    /// AsyncOpendalReader provides native support for [`opendal`]
145    ///
146    /// ```
147    /// use opendal::Operator;
148    /// use std::io::Result;
149    /// use orc_rust::reader::AsyncOpendalReader;
150    /// use orc_rust::reader::AsyncChunkReader;
151    /// use opendal::services::MemoryConfig;
152    ///
153    /// # async fn test() -> Result<()> {
154    /// let op = Operator::from_config(MemoryConfig::default())?.finish();
155    /// op.write("test", "Hello, world!").await?;
156    ///
157    /// let mut reader = AsyncOpendalReader::new(op, "test");
158    /// let len = reader.len().await?;
159    /// let data = reader.get_bytes(0, len).await?;
160    /// #    Ok(())
161    /// # }
162    /// ```
163    pub struct AsyncOpendalReader {
164        op: Operator,
165        path: Arc<String>,
166    }
167
168    impl AsyncOpendalReader {
169        /// Create a new async opendal reader.
170        pub fn new(op: Operator, path: &str) -> Self {
171            Self {
172                op,
173                path: Arc::new(path.to_string()),
174            }
175        }
176    }
177
178    impl AsyncChunkReader for AsyncOpendalReader {
179        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
180            let path = self.path.clone();
181            Box::pin(async move {
182                let meta = self.op.stat(&path).await?;
183                Ok(meta.content_length())
184            })
185        }
186
187        fn get_bytes(
188            &mut self,
189            offset_from_start: u64,
190            length: u64,
191        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
192            let path = self.path.clone();
193
194            Box::pin(async move {
195                let reader = self
196                    .op
197                    .read_with(&path)
198                    .range(offset_from_start..offset_from_start + length)
199                    .await?;
200                Ok(reader.to_bytes())
201            })
202        }
203    }
204}
205
206#[cfg(all(feature = "async", feature = "opendal"))]
207pub use async_opendal_reader::AsyncOpendalReader;